From c1f3497b43bff6f7640161807dce01dc089ce405 Mon Sep 17 00:00:00 2001 From: Adam King Date: Fri, 1 Apr 2022 09:41:01 -0400 Subject: [PATCH] mgr/cephadm: make use of new upgrade control parameters Fixes: https://tracker.ceph.com/issues/54135 Signed-off-by: Adam King --- src/pybind/mgr/cephadm/upgrade.py | 145 +++++++++++++++++++--- src/pybind/mgr/orchestrator/_interface.py | 1 + src/pybind/mgr/orchestrator/module.py | 1 + 3 files changed, 132 insertions(+), 15 deletions(-) diff --git a/src/pybind/mgr/cephadm/upgrade.py b/src/pybind/mgr/cephadm/upgrade.py index b3c9d3d1541..4b06ef455f5 100644 --- a/src/pybind/mgr/cephadm/upgrade.py +++ b/src/pybind/mgr/cephadm/upgrade.py @@ -59,7 +59,12 @@ class UpgradeState: error: Optional[str] = None, paused: Optional[bool] = None, fs_original_max_mds: Optional[Dict[str, int]] = None, - fs_original_allow_standby_replay: Optional[Dict[str, bool]] = None + fs_original_allow_standby_replay: Optional[Dict[str, bool]] = None, + daemon_types: Optional[List[str]] = None, + hosts: Optional[List[str]] = None, + services: Optional[List[str]] = None, + total_count: Optional[int] = None, + remaining_count: Optional[int] = None, ): self._target_name: str = target_name # Use CephadmUpgrade.target_image instead. 
self.progress_id: str = progress_id @@ -71,6 +76,11 @@ class UpgradeState: self.fs_original_max_mds: Optional[Dict[str, int]] = fs_original_max_mds self.fs_original_allow_standby_replay: Optional[Dict[str, bool]] = fs_original_allow_standby_replay + self.daemon_types = daemon_types + self.hosts = hosts + self.services = services + self.total_count = total_count + self.remaining_count = remaining_count def to_json(self) -> dict: return { @@ -83,6 +93,11 @@ class UpgradeState: 'fs_original_allow_standby_replay': self.fs_original_allow_standby_replay, 'error': self.error, 'paused': self.paused, + 'daemon_types': self.daemon_types, + 'hosts': self.hosts, + 'services': self.services, + 'total_count': self.total_count, + 'remaining_count': self.remaining_count, } @classmethod @@ -132,6 +147,23 @@ class CephadmUpgrade: r.target_image = self.target_image r.in_progress = True r.progress, r.services_complete = self._get_upgrade_info() + + if self.upgrade_state.daemon_types is not None: + which_str = f'Upgrading daemons of type(s) {",".join(self.upgrade_state.daemon_types)}' + if self.upgrade_state.hosts is not None: + which_str += f' on host(s) {",".join(self.upgrade_state.hosts)}' + elif self.upgrade_state.services is not None: + which_str = f'Upgrading daemons in service(s) {",".join(self.upgrade_state.services)}' + if self.upgrade_state.hosts is not None: + which_str += f' on host(s) {",".join(self.upgrade_state.hosts)}' + elif self.upgrade_state.hosts is not None: + which_str = f'Upgrading all daemons on host(s) {",".join(self.upgrade_state.hosts)}' + else: + which_str = 'Upgrading all daemon types on all hosts' + if self.upgrade_state.total_count is not None and self.upgrade_state.remaining_count is not None: + which_str += f'. Upgrade limited to {self.upgrade_state.total_count} daemons ({self.upgrade_state.remaining_count} remaining).' 
+ r.which = which_str + # accessing self.upgrade_info_str will throw an exception if it # has not been set in _do_upgrade yet try: @@ -148,7 +180,7 @@ class CephadmUpgrade: if not self.upgrade_state or not self.upgrade_state.target_digests: return '', [] - daemons = [d for d in self.mgr.cache.get_daemons() if d.daemon_type in CEPH_UPGRADE_ORDER] + daemons = self._get_filtered_daemons() if any(not d.container_image_digests for d in daemons if d.daemon_type == 'mgr'): return '', [] @@ -163,6 +195,25 @@ class CephadmUpgrade: return '%s/%s daemons upgraded' % (done, len(daemons)), completed_types + def _get_filtered_daemons(self) -> List[DaemonDescription]: + # Return the set of daemons set to be upgraded with our current + # filtering parameters (or all daemons in upgrade order if no filtering + # parameters are set). + assert self.upgrade_state is not None + if self.upgrade_state.daemon_types is not None: + daemons = [d for d in self.mgr.cache.get_daemons( + ) if d.daemon_type in self.upgrade_state.daemon_types] + elif self.upgrade_state.services is not None: + daemons = [] + for service in self.upgrade_state.services: + daemons += self.mgr.cache.get_daemons_by_service(service) + else: + daemons = [d for d in self.mgr.cache.get_daemons( + ) if d.daemon_type in CEPH_UPGRADE_ORDER] + if self.upgrade_state.hosts is not None: + daemons = [d for d in daemons if d.hostname in self.upgrade_state.hosts] + return daemons + def _get_current_version(self) -> Tuple[int, int, str]: current_version = self.mgr.version.split('ceph version ')[1] (current_major, current_minor, _) = current_version.split('-')[0].split('.', 2) @@ -279,7 +330,12 @@ class CephadmUpgrade: self.mgr.log.info('Upgrade: Started with target %s' % target_name) self.upgrade_state = UpgradeState( target_name=target_name, - progress_id=str(uuid.uuid4()) + progress_id=str(uuid.uuid4()), + daemon_types=daemon_types, + hosts=hosts, + services=services, + total_count=limit, + remaining_count=limit, )
self._update_upgrade_progress(0.0) self._save_upgrade_state() @@ -723,6 +779,9 @@ class CephadmUpgrade: if target_digests is None: target_digests = [] for d_entry in to_upgrade: + if self.upgrade_state.remaining_count is not None and self.upgrade_state.remaining_count <= 0 and not d_entry[1]: + self.mgr.log.info(f'Hit upgrade limit of {self.upgrade_state.total_count}. Stopping upgrade') + return d = d_entry[0] assert d.daemon_type is not None assert d.daemon_id is not None @@ -763,8 +822,7 @@ class CephadmUpgrade: self.upgrade_info_str = 'Currently upgrading %s daemons' % (d.daemon_type) if len(to_upgrade) > 1: - logger.info('Upgrade: Updating %s.%s (%d/%d)' % - (d.daemon_type, d.daemon_id, num, len(to_upgrade))) + logger.info('Upgrade: Updating %s.%s (%d/%d)' % (d.daemon_type, d.daemon_id, num, min(len(to_upgrade), self.upgrade_state.remaining_count if self.upgrade_state.remaining_count is not None else 9999999))) else: logger.info('Upgrade: Updating %s.%s' % (d.daemon_type, d.daemon_id)) @@ -788,6 +846,9 @@ class CephadmUpgrade: }) return num += 1 + if self.upgrade_state.remaining_count is not None and not d_entry[1]: + self.upgrade_state.remaining_count -= 1 + self._save_upgrade_state() def _handle_need_upgrade_self(self, need_upgrade_self: bool, upgrading_mgrs: bool) -> None: if need_upgrade_self: @@ -883,6 +944,17 @@ class CephadmUpgrade: self.upgrade_state.fs_original_allow_standby_replay = {} self._save_upgrade_state() + def _mark_upgrade_complete(self) -> None: + if not self.upgrade_state: + logger.debug('_mark_upgrade_complete upgrade already marked complete, exiting') + return + logger.info('Upgrade: Complete!') + if self.upgrade_state.progress_id: + self.mgr.remote('progress', 'complete', + self.upgrade_state.progress_id) + self.upgrade_state = None + self._save_upgrade_state() + def _do_upgrade(self): # type: () -> None if not self.upgrade_state: @@ -965,14 +1037,53 @@ class CephadmUpgrade: 'who': 'mon', }) - daemons = [d for d in 
self.mgr.cache.get_daemons() if d.daemon_type in CEPH_UPGRADE_ORDER] + if self.upgrade_state.daemon_types is not None: + logger.debug(f'Filtering daemons to upgrade by daemon types: {self.upgrade_state.daemon_types}') + daemons = [d for d in self.mgr.cache.get_daemons() if d.daemon_type in self.upgrade_state.daemon_types] + elif self.upgrade_state.services is not None: + logger.debug(f'Filtering daemons to upgrade by services: {self.upgrade_state.services}') + daemons = [] + for service in self.upgrade_state.services: + daemons += self.mgr.cache.get_daemons_by_service(service) + else: + daemons = [d for d in self.mgr.cache.get_daemons() if d.daemon_type in CEPH_UPGRADE_ORDER] + if self.upgrade_state.hosts is not None: + logger.debug(f'Filtering daemons to upgrade by hosts: {self.upgrade_state.hosts}') + daemons = [d for d in daemons if d.hostname in self.upgrade_state.hosts] upgraded_daemon_count: int = 0 for daemon_type in CEPH_UPGRADE_ORDER: + if self.upgrade_state.remaining_count is not None and self.upgrade_state.remaining_count <= 0: + # we hit our limit and should end the upgrade + # except for cases where we only need to redeploy, but not actually upgrade + # the image (which we don't count towards our limit). This case only occurs with mgr + # and monitoring stack daemons. Additionally, this case is only valid if + # the active mgr is already upgraded.
+ if any(d in target_digests for d in self.mgr.get_active_mgr_digests()): + if daemon_type not in MONITORING_STACK_TYPES and daemon_type != 'mgr': + continue + else: + self._mark_upgrade_complete() + return logger.debug('Upgrade: Checking %s daemons' % daemon_type) + daemons_of_type = [d for d in daemons if d.daemon_type == daemon_type] - need_upgrade_self, need_upgrade, need_upgrade_deployer, done = self._detect_need_upgrade(daemons, target_digests) + need_upgrade_self, need_upgrade, need_upgrade_deployer, done = self._detect_need_upgrade(daemons_of_type, target_digests) upgraded_daemon_count += done - self._update_upgrade_progress(upgraded_daemon_count / len(self.mgr.cache.get_daemons())) + self._update_upgrade_progress(upgraded_daemon_count / len(daemons)) + + # make sure mgr and monitoring stack daemons are properly redeployed in staggered upgrade scenarios + if daemon_type == 'mgr' or daemon_type in MONITORING_STACK_TYPES: + if any(d in target_digests for d in self.mgr.get_active_mgr_digests()): + need_upgrade_names = [d[0].name() for d in need_upgrade] + [d[0].name() for d in need_upgrade_deployer] + dds = [d for d in self.mgr.cache.get_daemons_by_type(daemon_type) if d.name() not in need_upgrade_names] + need_upgrade_active, n1, n2, __ = self._detect_need_upgrade(dds, target_digests) + if not n1: + if not need_upgrade_self and need_upgrade_active: + need_upgrade_self = True + need_upgrade_deployer += n2 + else: + # no point in trying to redeploy with new version if active mgr is not on the new version + need_upgrade_deployer = [] if not need_upgrade_self: # only after the mgr itself is upgraded can we expect daemons to have @@ -997,6 +1108,15 @@ class CephadmUpgrade: if to_upgrade: return + self._handle_need_upgrade_self(need_upgrade_self, daemon_type == 'mgr') + + # following bits of _do_upgrade are for completing upgrade for given + # types. 
If we haven't actually finished upgrading all the daemons + # of this type, we should exit the loop here + _, n1, n2, _ = self._detect_need_upgrade(self.mgr.cache.get_daemons_by_type(daemon_type), target_digests) + if n1 or n2: + continue + # complete mon upgrade? if daemon_type == 'mon': if not self.mgr.get("have_local_config_map"): @@ -1032,7 +1152,7 @@ class CephadmUpgrade: self.mgr.agent_helpers._request_ack_all_not_up_to_date() return - logger.debug('Upgrade: All %s daemons are up to date.' % daemon_type) + logger.debug('Upgrade: Upgraded %s daemon(s).' % daemon_type) # clean up logger.info('Upgrade: Finalizing container_image settings') @@ -1051,10 +1171,5 @@ class CephadmUpgrade: 'who': 'mon', }) - logger.info('Upgrade: Complete!') - if self.upgrade_state.progress_id: - self.mgr.remote('progress', 'complete', - self.upgrade_state.progress_id) - self.upgrade_state = None - self._save_upgrade_state() + self._mark_upgrade_complete() return diff --git a/src/pybind/mgr/orchestrator/_interface.py b/src/pybind/mgr/orchestrator/_interface.py index 6622372f87c..b0ccf73570b 100644 --- a/src/pybind/mgr/orchestrator/_interface.py +++ b/src/pybind/mgr/orchestrator/_interface.py @@ -780,6 +780,7 @@ class UpgradeStatusSpec(object): self.in_progress = False # Is an upgrade underway? self.target_image: Optional[str] = None self.services_complete: List[str] = [] # Which daemon types are fully updated? 
+ self.which: str = '' # for if user specified daemon types, services or hosts self.progress: Optional[str] = None # How many of the daemons have we upgraded self.message = "" # Freeform description diff --git a/src/pybind/mgr/orchestrator/module.py b/src/pybind/mgr/orchestrator/module.py index 3ce7fb907fb..d105c586903 100644 --- a/src/pybind/mgr/orchestrator/module.py +++ b/src/pybind/mgr/orchestrator/module.py @@ -1424,6 +1424,7 @@ Usage: r = { 'target_image': status.target_image, 'in_progress': status.in_progress, + 'which': status.which, 'services_complete': status.services_complete, 'progress': status.progress, 'message': status.message,