mgr/cephadm: make use of new upgrade control parameters

Fixes: https://tracker.ceph.com/issues/54135

Signed-off-by: Adam King <adking@redhat.com>
This commit is contained in:
Adam King 2022-04-01 09:41:01 -04:00
parent aeaa0b5fd8
commit c1f3497b43
3 changed files with 132 additions and 15 deletions

View File

@ -59,7 +59,12 @@ class UpgradeState:
error: Optional[str] = None,
paused: Optional[bool] = None,
fs_original_max_mds: Optional[Dict[str, int]] = None,
fs_original_allow_standby_replay: Optional[Dict[str, bool]] = None
fs_original_allow_standby_replay: Optional[Dict[str, bool]] = None,
daemon_types: Optional[List[str]] = None,
hosts: Optional[List[str]] = None,
services: Optional[List[str]] = None,
total_count: Optional[int] = None,
remaining_count: Optional[int] = None,
):
self._target_name: str = target_name # Use CephadmUpgrade.target_image instead.
self.progress_id: str = progress_id
@ -71,6 +76,11 @@ class UpgradeState:
self.fs_original_max_mds: Optional[Dict[str, int]] = fs_original_max_mds
self.fs_original_allow_standby_replay: Optional[Dict[str,
bool]] = fs_original_allow_standby_replay
self.daemon_types = daemon_types
self.hosts = hosts
self.services = services
self.total_count = total_count
self.remaining_count = remaining_count
def to_json(self) -> dict:
return {
@ -83,6 +93,11 @@ class UpgradeState:
'fs_original_allow_standby_replay': self.fs_original_allow_standby_replay,
'error': self.error,
'paused': self.paused,
'daemon_types': self.daemon_types,
'hosts': self.hosts,
'services': self.services,
'total_count': self.total_count,
'remaining_count': self.remaining_count,
}
@classmethod
@ -132,6 +147,23 @@ class CephadmUpgrade:
r.target_image = self.target_image
r.in_progress = True
r.progress, r.services_complete = self._get_upgrade_info()
if self.upgrade_state.daemon_types is not None:
which_str = f'Upgrading daemons of type(s) {",".join(self.upgrade_state.daemon_types)}'
if self.upgrade_state.hosts is not None:
which_str += f' on host(s) {",".join(self.upgrade_state.hosts)}'
elif self.upgrade_state.services is not None:
which_str = f'Upgrading daemons in service(s) {",".join(self.upgrade_state.services)}'
if self.upgrade_state.hosts is not None:
which_str += f' on host(s) {",".join(self.upgrade_state.hosts)}'
elif self.upgrade_state.hosts is not None:
which_str = f'Upgrading all daemons on host(s) {",".join(self.upgrade_state.hosts)}'
else:
which_str = 'Upgrading all daemon types on all hosts'
if self.upgrade_state.total_count is not None and self.upgrade_state.remaining_count is not None:
which_str += f'. Upgrade limited to {self.upgrade_state.total_count} daemons ({self.upgrade_state.remaining_count} remaining).'
r.which = which_str
# accessing self.upgrade_info_str will throw an exception if it
# has not been set in _do_upgrade yet
try:
@ -148,7 +180,7 @@ class CephadmUpgrade:
if not self.upgrade_state or not self.upgrade_state.target_digests:
return '', []
daemons = [d for d in self.mgr.cache.get_daemons() if d.daemon_type in CEPH_UPGRADE_ORDER]
daemons = self._get_filtered_daemons()
if any(not d.container_image_digests for d in daemons if d.daemon_type == 'mgr'):
return '', []
@ -163,6 +195,25 @@ class CephadmUpgrade:
return '%s/%s daemons upgraded' % (done, len(daemons)), completed_types
def _get_filtered_daemons(self) -> List[DaemonDescription]:
# Return the set of daemons set to be upgraded with out current
# filtering parameters (or all daemons in upgrade order if no filtering
# parameter are set).
assert self.upgrade_state is not None
if self.upgrade_state.daemon_types is not None:
daemons = [d for d in self.mgr.cache.get_daemons(
) if d.daemon_type in self.upgrade_state.daemon_types]
elif self.upgrade_state.services is not None:
daemons = []
for service in self.upgrade_state.services:
daemons += self.mgr.cache.get_daemons_by_service(service)
else:
daemons = [d for d in self.mgr.cache.get_daemons(
) if d.daemon_type in CEPH_UPGRADE_ORDER]
if self.upgrade_state.hosts is not None:
daemons = [d for d in daemons if d.hostname in self.upgrade_state.hosts]
return daemons
def _get_current_version(self) -> Tuple[int, int, str]:
current_version = self.mgr.version.split('ceph version ')[1]
(current_major, current_minor, _) = current_version.split('-')[0].split('.', 2)
@ -279,7 +330,12 @@ class CephadmUpgrade:
self.mgr.log.info('Upgrade: Started with target %s' % target_name)
self.upgrade_state = UpgradeState(
target_name=target_name,
progress_id=str(uuid.uuid4())
progress_id=str(uuid.uuid4()),
daemon_types=daemon_types,
hosts=hosts,
services=services,
total_count=limit,
remaining_count=limit,
)
self._update_upgrade_progress(0.0)
self._save_upgrade_state()
@ -723,6 +779,9 @@ class CephadmUpgrade:
if target_digests is None:
target_digests = []
for d_entry in to_upgrade:
if self.upgrade_state.remaining_count is not None and self.upgrade_state.remaining_count <= 0 and not d_entry[1]:
self.mgr.log.info(f'Hit upgrade limit of {self.upgrade_state.total_count}. Stopping upgrade')
return
d = d_entry[0]
assert d.daemon_type is not None
assert d.daemon_id is not None
@ -763,8 +822,7 @@ class CephadmUpgrade:
self.upgrade_info_str = 'Currently upgrading %s daemons' % (d.daemon_type)
if len(to_upgrade) > 1:
logger.info('Upgrade: Updating %s.%s (%d/%d)' %
(d.daemon_type, d.daemon_id, num, len(to_upgrade)))
logger.info('Upgrade: Updating %s.%s (%d/%d)' % (d.daemon_type, d.daemon_id, num, min(len(to_upgrade), self.upgrade_state.remaining_count if self.upgrade_state.remaining_count is not None else 9999999)))
else:
logger.info('Upgrade: Updating %s.%s' %
(d.daemon_type, d.daemon_id))
@ -788,6 +846,9 @@ class CephadmUpgrade:
})
return
num += 1
if self.upgrade_state.remaining_count is not None and not d_entry[1]:
self.upgrade_state.remaining_count -= 1
self._save_upgrade_state()
def _handle_need_upgrade_self(self, need_upgrade_self: bool, upgrading_mgrs: bool) -> None:
if need_upgrade_self:
@ -883,6 +944,17 @@ class CephadmUpgrade:
self.upgrade_state.fs_original_allow_standby_replay = {}
self._save_upgrade_state()
def _mark_upgrade_complete(self) -> None:
if not self.upgrade_state:
logger.debug('_mark_upgrade_complete upgrade already marked complete, exiting')
return
logger.info('Upgrade: Complete!')
if self.upgrade_state.progress_id:
self.mgr.remote('progress', 'complete',
self.upgrade_state.progress_id)
self.upgrade_state = None
self._save_upgrade_state()
def _do_upgrade(self):
# type: () -> None
if not self.upgrade_state:
@ -965,14 +1037,53 @@ class CephadmUpgrade:
'who': 'mon',
})
daemons = [d for d in self.mgr.cache.get_daemons() if d.daemon_type in CEPH_UPGRADE_ORDER]
if self.upgrade_state.daemon_types is not None:
logger.debug(f'Filtering daemons to upgrade by daemon types: {self.upgrade_state.daemon_types}')
daemons = [d for d in self.mgr.cache.get_daemons() if d.daemon_type in self.upgrade_state.daemon_types]
elif self.upgrade_state.services is not None:
logger.debug(f'Filtering daemons to upgrade by services: {self.upgrade_state.daemon_types}')
daemons = []
for service in self.upgrade_state.services:
daemons += self.mgr.cache.get_daemons_by_service(service)
else:
daemons = [d for d in self.mgr.cache.get_daemons() if d.daemon_type in CEPH_UPGRADE_ORDER]
if self.upgrade_state.hosts is not None:
logger.debug(f'Filtering daemons to upgrade by hosts: {self.upgrade_state.hosts}')
daemons = [d for d in daemons if d.hostname in self.upgrade_state.hosts]
upgraded_daemon_count: int = 0
for daemon_type in CEPH_UPGRADE_ORDER:
if self.upgrade_state.remaining_count is not None and self.upgrade_state.remaining_count <= 0:
# we hit our limit and should end the upgrade
# except for cases where we only need to redeploy, but not actually upgrade
# the image (which we don't count towards our limit). This case only occurs with mgr
# and monitoring stack daemons. Additionally, this case is only valid if
# the active mgr is already upgraded.
if any(d in target_digests for d in self.mgr.get_active_mgr_digests()):
if daemon_type not in MONITORING_STACK_TYPES and daemon_type != 'mgr':
continue
else:
self._mark_upgrade_complete()
return
logger.debug('Upgrade: Checking %s daemons' % daemon_type)
daemons_of_type = [d for d in daemons if d.daemon_type == daemon_type]
need_upgrade_self, need_upgrade, need_upgrade_deployer, done = self._detect_need_upgrade(daemons, target_digests)
need_upgrade_self, need_upgrade, need_upgrade_deployer, done = self._detect_need_upgrade(daemons_of_type, target_digests)
upgraded_daemon_count += done
self._update_upgrade_progress(upgraded_daemon_count / len(self.mgr.cache.get_daemons()))
self._update_upgrade_progress(upgraded_daemon_count / len(daemons))
# make sure mgr and monitoring stack daemons are properly redeployed in staggered upgrade scenarios
if daemon_type == 'mgr' or daemon_type in MONITORING_STACK_TYPES:
if any(d in target_digests for d in self.mgr.get_active_mgr_digests()):
need_upgrade_names = [d[0].name() for d in need_upgrade] + [d[0].name() for d in need_upgrade_deployer]
dds = [d for d in self.mgr.cache.get_daemons_by_type(daemon_type) if d.name() not in need_upgrade_names]
need_upgrade_active, n1, n2, __ = self._detect_need_upgrade(dds, target_digests)
if not n1:
if not need_upgrade_self and need_upgrade_active:
need_upgrade_self = True
need_upgrade_deployer += n2
else:
# no point in trying to redeploy with new version if active mgr is not on the new version
need_upgrade_deployer = []
if not need_upgrade_self:
# only after the mgr itself is upgraded can we expect daemons to have
@ -997,6 +1108,15 @@ class CephadmUpgrade:
if to_upgrade:
return
self._handle_need_upgrade_self(need_upgrade_self, daemon_type == 'mgr')
# following bits of _do_upgrade are for completing upgrade for given
# types. If we haven't actually finished upgrading all the daemons
# of this type, we should exit the loop here
_, n1, n2, _ = self._detect_need_upgrade(self.mgr.cache.get_daemons_by_type(daemon_type), target_digests)
if n1 or n2:
continue
# complete mon upgrade?
if daemon_type == 'mon':
if not self.mgr.get("have_local_config_map"):
@ -1032,7 +1152,7 @@ class CephadmUpgrade:
self.mgr.agent_helpers._request_ack_all_not_up_to_date()
return
logger.debug('Upgrade: All %s daemons are up to date.' % daemon_type)
logger.debug('Upgrade: Upgraded %s daemon(s).' % daemon_type)
# clean up
logger.info('Upgrade: Finalizing container_image settings')
@ -1051,10 +1171,5 @@ class CephadmUpgrade:
'who': 'mon',
})
logger.info('Upgrade: Complete!')
if self.upgrade_state.progress_id:
self.mgr.remote('progress', 'complete',
self.upgrade_state.progress_id)
self.upgrade_state = None
self._save_upgrade_state()
self._mark_upgrade_complete()
return

View File

@ -780,6 +780,7 @@ class UpgradeStatusSpec(object):
self.in_progress = False # Is an upgrade underway?
self.target_image: Optional[str] = None
self.services_complete: List[str] = [] # Which daemon types are fully updated?
self.which: str = '<unknown>' # for if user specified daemon types, services or hosts
self.progress: Optional[str] = None # How many of the daemons have we upgraded
self.message = "" # Freeform description

View File

@ -1424,6 +1424,7 @@ Usage:
r = {
'target_image': status.target_image,
'in_progress': status.in_progress,
'which': status.which,
'services_complete': status.services_complete,
'progress': status.progress,
'message': status.message,