diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py
index 157126a3aa2..ed186899c1e 100644
--- a/src/pybind/mgr/cephadm/module.py
+++ b/src/pybind/mgr/cephadm/module.py
@@ -22,7 +22,6 @@ import tempfile
 import multiprocessing.pool
 import shutil
 import subprocess
-import uuid
 
 from ceph.deployment import inventory
 from ceph.deployment.drive_group import DriveGroupSpec
@@ -43,6 +42,7 @@ from .services.osd import RemoveUtil, OSDRemoval, OSDService
 from .services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \
     NodeExporterService
 from .inventory import Inventory, SpecStore, HostCache
+from .upgrade import CEPH_UPGRADE_ORDER, CephadmUpgrade
 
 try:
     import remoto
@@ -70,9 +70,6 @@ Host *
 DATEFMT = '%Y-%m-%dT%H:%M:%S.%f'
 CEPH_DATEFMT = '%Y-%m-%dT%H:%M:%S.%fZ'
 
-# ceph daemon types that use the ceph container image.
-# NOTE: listed in upgrade order!
-CEPH_UPGRADE_ORDER = ['mgr', 'mon', 'crash', 'osd', 'mds', 'rgw', 'rbd-mirror']
 CEPH_TYPES = set(CEPH_UPGRADE_ORDER)
 
@@ -391,11 +388,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
 
         CephadmOrchestrator.instance = self
 
-        t = self.get_store('upgrade_state')
-        if t:
-            self.upgrade_state = json.loads(t)
-        else:
-            self.upgrade_state = None
+        self.upgrade = CephadmUpgrade(self)
 
         self.health_checks = {}
 
@@ -469,261 +462,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
                 return
         raise OrchestratorError('Removing %s would break mon quorum (new quorum %s, new mons %s)' % (mon_id, new_quorum, new_mons))
 
-    def _wait_for_ok_to_stop(self, s):
-        # only wait a little bit; the service might go away for something
-        tries = 4
-        while tries > 0:
-            if s.daemon_type not in ['mon', 'osd', 'mds']:
-                self.log.info('Upgrade: It is presumed safe to stop %s.%s' %
-                              (s.daemon_type, s.daemon_id))
-                return True
-            ret, out, err = self.mon_command({
-                'prefix': '%s ok-to-stop' % s.daemon_type,
-                'ids': [s.daemon_id],
-            })
-            if not self.upgrade_state or self.upgrade_state.get('paused'):
-                return False
-            if ret:
-                self.log.info('Upgrade: It is NOT safe to stop %s.%s' %
-                              (s.daemon_type, s.daemon_id))
-                time.sleep(15)
-                tries -= 1
-            else:
-                self.log.info('Upgrade: It is safe to stop %s.%s' %
-                              (s.daemon_type, s.daemon_id))
-                return True
-        return False
-
-    def _clear_upgrade_health_checks(self):
-        for k in ['UPGRADE_NO_STANDBY_MGR',
-                  'UPGRADE_FAILED_PULL']:
-            if k in self.health_checks:
-                del self.health_checks[k]
-        self.set_health_checks(self.health_checks)
-
-    def _fail_upgrade(self, alert_id, alert):
-        self.log.error('Upgrade: Paused due to %s: %s' % (alert_id,
-                                                          alert['summary']))
-        self.upgrade_state['error'] = alert_id + ': ' + alert['summary']
-        self.upgrade_state['paused'] = True
-        self._save_upgrade_state()
-        self.health_checks[alert_id] = alert
-        self.set_health_checks(self.health_checks)
-
-    def _update_upgrade_progress(self, progress):
-        if 'progress_id' not in self.upgrade_state:
-            self.upgrade_state['progress_id'] = str(uuid.uuid4())
-            self._save_upgrade_state()
-        self.remote('progress', 'update', self.upgrade_state['progress_id'],
-                    ev_msg='Upgrade to %s' % self.upgrade_state['target_name'],
-                    ev_progress=progress)
-
-    def _do_upgrade(self):
-        # type: () -> None
-        if not self.upgrade_state:
-            self.log.debug('_do_upgrade no state, exiting')
-            return
-
-        target_name = self.upgrade_state.get('target_name')
-        target_id = self.upgrade_state.get('target_id', None)
-        if not target_id:
-            # need to learn the container hash
-            self.log.info('Upgrade: First pull of %s' % target_name)
-            try:
-                target_id, target_version = self._get_container_image_id(target_name)
-            except OrchestratorError as e:
-                self._fail_upgrade('UPGRADE_FAILED_PULL', {
-                    'severity': 'warning',
-                    'summary': 'Upgrade: failed to pull target image',
-                    'count': 1,
-                    'detail': [str(e)],
-                })
-                return
-            self.upgrade_state['target_id'] = target_id
-            self.upgrade_state['target_version'] = target_version
-            self._save_upgrade_state()
-        target_version = self.upgrade_state.get('target_version')
-        self.log.info('Upgrade: Target is %s with id %s' % (target_name,
-                                                            target_id))
-
-        # get all distinct container_image settings
-        image_settings = {}
-        ret, out, err = self.mon_command({
-            'prefix': 'config dump',
-            'format': 'json',
-        })
-        config = json.loads(out)
-        for opt in config:
-            if opt['name'] == 'container_image':
-                image_settings[opt['section']] = opt['value']
-
-        daemons = self.cache.get_daemons()
-        done = 0
-        for daemon_type in CEPH_UPGRADE_ORDER:
-            self.log.info('Upgrade: Checking %s daemons...' % daemon_type)
-            need_upgrade_self = False
-            for d in daemons:
-                if d.daemon_type != daemon_type:
-                    continue
-                if d.container_image_id == target_id:
-                    self.log.debug('daemon %s.%s version correct' % (
-                        daemon_type, d.daemon_id))
-                    done += 1
-                    continue
-                self.log.debug('daemon %s.%s not correct (%s, %s, %s)' % (
-                    daemon_type, d.daemon_id,
-                    d.container_image_name, d.container_image_id, d.version))
-
-                if daemon_type == 'mgr' and \
-                   d.daemon_id == self.get_mgr_id():
-                    self.log.info('Upgrade: Need to upgrade myself (mgr.%s)' %
-                                  self.get_mgr_id())
-                    need_upgrade_self = True
-                    continue
-
-                # make sure host has latest container image
-                out, err, code = self._run_cephadm(
-                    d.hostname, None, 'inspect-image', [],
-                    image=target_name, no_fsid=True, error_ok=True)
-                if code or json.loads(''.join(out)).get('image_id') != target_id:
-                    self.log.info('Upgrade: Pulling %s on %s' % (target_name,
-                                                                 d.hostname))
-                    out, err, code = self._run_cephadm(
-                        d.hostname, None, 'pull', [],
-                        image=target_name, no_fsid=True, error_ok=True)
-                    if code:
-                        self._fail_upgrade('UPGRADE_FAILED_PULL', {
-                            'severity': 'warning',
-                            'summary': 'Upgrade: failed to pull target image',
-                            'count': 1,
-                            'detail': [
-                                'failed to pull %s on host %s' % (target_name,
-                                                                  d.hostname)],
-                        })
-                        return
-                    r = json.loads(''.join(out))
-                    if r.get('image_id') != target_id:
-                        self.log.info('Upgrade: image %s pull on %s got new image %s (not %s), restarting' % (target_name, d.hostname, r['image_id'], target_id))
-                        self.upgrade_state['target_id'] = r['image_id']
-                        self._save_upgrade_state()
-                        return
-
-                self._update_upgrade_progress(done / len(daemons))
-
-                if not d.container_image_id:
-                    if d.container_image_name == target_name:
-                        self.log.debug('daemon %s has unknown container_image_id but has correct image name' % (d.name()))
-                        continue
-                if not self._wait_for_ok_to_stop(d):
-                    return
-                self.log.info('Upgrade: Redeploying %s.%s' %
-                              (d.daemon_type, d.daemon_id))
-                ret, out, err = self.mon_command({
-                    'prefix': 'config set',
-                    'name': 'container_image',
-                    'value': target_name,
-                    'who': utils.name_to_config_section(daemon_type + '.'
-                                                        + d.daemon_id),
-                })
-                self._daemon_action(
-                    d.daemon_type,
-                    d.daemon_id,
-                    d.hostname,
-                    'redeploy'
-                )
-                return
-
-            if need_upgrade_self:
-                mgr_map = self.get('mgr_map')
-                num = len(mgr_map.get('standbys'))
-                if not num:
-                    self._fail_upgrade('UPGRADE_NO_STANDBY_MGR', {
-                        'severity': 'warning',
-                        'summary': 'Upgrade: Need standby mgr daemon',
-                        'count': 1,
-                        'detail': [
-                            'The upgrade process needs to upgrade the mgr, '
-                            'but it needs at least one standby to proceed.',
-                        ],
-                    })
-                    return
-
-                self.log.info('Upgrade: there are %d other already-upgraded '
-                              'standby mgrs, failing over' % num)
-
-                self._update_upgrade_progress(done / len(daemons))
-
-                # fail over
-                ret, out, err = self.mon_command({
-                    'prefix': 'mgr fail',
-                    'who': self.get_mgr_id(),
-                })
-                return
-            elif daemon_type == 'mgr':
-                if 'UPGRADE_NO_STANDBY_MGR' in self.health_checks:
-                    del self.health_checks['UPGRADE_NO_STANDBY_MGR']
-                    self.set_health_checks(self.health_checks)
-
-            # make sure 'ceph versions' agrees
-            ret, out, err = self.mon_command({
-                'prefix': 'versions',
-            })
-            j = json.loads(out)
-            for version, count in j.get(daemon_type, {}).items():
-                if version != target_version:
-                    self.log.warning(
-                        'Upgrade: %d %s daemon(s) are %s != target %s' %
-                        (count, daemon_type, version, target_version))
-
-            # push down configs
-            if image_settings.get(daemon_type) != target_name:
-                self.log.info('Upgrade: Setting container_image for all %s...' %
-                              daemon_type)
-                ret, out, err = self.mon_command({
-                    'prefix': 'config set',
-                    'name': 'container_image',
-                    'value': target_name,
-                    'who': daemon_type,
-                })
-            to_clean = []
-            for section in image_settings.keys():
-                if section.startswith(utils.name_to_config_section(daemon_type) + '.'):
-                    to_clean.append(section)
-            if to_clean:
-                self.log.debug('Upgrade: Cleaning up container_image for %s...' %
-                               to_clean)
-                for section in to_clean:
-                    ret, image, err = self.mon_command({
-                        'prefix': 'config rm',
-                        'name': 'container_image',
-                        'who': section,
-                    })
-
-            self.log.info('Upgrade: All %s daemons are up to date.' %
-                          daemon_type)
-
-        # clean up
-        self.log.info('Upgrade: Finalizing container_image settings')
-        ret, out, err = self.mon_command({
-            'prefix': 'config set',
-            'name': 'container_image',
-            'value': target_name,
-            'who': 'global',
-        })
-        for daemon_type in CEPH_UPGRADE_ORDER:
-            ret, image, err = self.mon_command({
-                'prefix': 'config rm',
-                'name': 'container_image',
-                'who': utils.name_to_config_section(daemon_type),
-            })
-
-        self.log.info('Upgrade: Complete!')
-        if 'progress_id' in self.upgrade_state:
-            self.remote('progress', 'complete',
-                        self.upgrade_state['progress_id'])
-        self.upgrade_state = None
-        self._save_upgrade_state()
-        return
-
     def _check_host(self, host):
         if host not in self.inventory:
             return
@@ -872,8 +610,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
 
             self._check_daemons()
 
-            if self.upgrade_state and not self.upgrade_state.get('paused'):
-                self._do_upgrade()
+            if self.upgrade.continue_upgrade():
                 continue
 
             self._serve_sleep()
@@ -950,9 +687,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
                 continue
             return name
 
-    def _save_upgrade_state(self):
-        self.set_store('upgrade_state', json.dumps(self.upgrade_state))
-
     def _reconfig_ssh(self):
        temp_files = []  # type: list
        ssh_options = []  # type: List[str]
@@ -2376,89 +2110,23 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
 
     @trivial_completion
     def upgrade_status(self):
-        r = orchestrator.UpgradeStatusSpec()
-        if self.upgrade_state:
-            r.target_image = self.upgrade_state.get('target_name')
-            r.in_progress = True
-            if self.upgrade_state.get('error'):
-                r.message = 'Error: ' + self.upgrade_state.get('error')
-            elif self.upgrade_state.get('paused'):
-                r.message = 'Upgrade paused'
-        return r
+        return self.upgrade.upgrade_status()
 
     @trivial_completion
     def upgrade_start(self, image, version):
-        if self.mode != 'root':
-            raise OrchestratorError('upgrade is not supported in %s mode' % (
-                self.mode))
-        if version:
-            try:
-                (major, minor, patch) = version.split('.')
-                assert int(minor) >= 0
-                assert int(patch) >= 0
-            except:
-                raise OrchestratorError('version must be in the form X.Y.Z (e.g., 15.2.3)')
-            if int(major) < 15 or (int(major) == 15 and int(minor) < 2):
-                raise OrchestratorError('cephadm only supports octopus (15.2.0) or later')
-            target_name = self.container_image_base + ':v' + version
-        elif image:
-            target_name = image
-        else:
-            raise OrchestratorError('must specify either image or version')
-        if self.upgrade_state:
-            if self.upgrade_state.get('target_name') != target_name:
-                raise OrchestratorError(
-                    'Upgrade to %s (not %s) already in progress' %
-                    (self.upgrade_state.get('target_name'), target_name))
-            if self.upgrade_state.get('paused'):
-                del self.upgrade_state['paused']
-                self._save_upgrade_state()
-                return 'Resumed upgrade to %s' % self.upgrade_state.get('target_name')
-            return 'Upgrade to %s in progress' % self.upgrade_state.get('target_name')
-        self.upgrade_state = {
-            'target_name': target_name,
-            'progress_id': str(uuid.uuid4()),
-        }
-        self._update_upgrade_progress(0.0)
-        self._save_upgrade_state()
-        self._clear_upgrade_health_checks()
-        self.event.set()
-        return 'Initiating upgrade to %s' % (target_name)
+        return self.upgrade.upgrade_start(image, version)
 
     @trivial_completion
     def upgrade_pause(self):
-        if not self.upgrade_state:
-            raise OrchestratorError('No upgrade in progress')
-        if self.upgrade_state.get('paused'):
-            return 'Upgrade to %s already paused' % self.upgrade_state.get('target_name')
-        self.upgrade_state['paused'] = True
-        self._save_upgrade_state()
-        return 'Paused upgrade to %s' % self.upgrade_state.get('target_name')
+        return self.upgrade.upgrade_pause()
 
     @trivial_completion
     def upgrade_resume(self):
-        if not self.upgrade_state:
-            raise OrchestratorError('No upgrade in progress')
-        if not self.upgrade_state.get('paused'):
-            return 'Upgrade to %s not paused' % self.upgrade_state.get('target_name')
-        del self.upgrade_state['paused']
-        self._save_upgrade_state()
-        self.event.set()
-        return 'Resumed upgrade to %s' % self.upgrade_state.get('target_name')
+        return self.upgrade.upgrade_resume()
 
     @trivial_completion
     def upgrade_stop(self):
-        if not self.upgrade_state:
-            return 'No upgrade in progress'
-        target_name = self.upgrade_state.get('target_name')
-        if 'progress_id' in self.upgrade_state:
-            self.remote('progress', 'complete',
-                        self.upgrade_state['progress_id'])
-        self.upgrade_state = None
-        self._save_upgrade_state()
-        self._clear_upgrade_health_checks()
-        self.event.set()
-        return 'Stopped upgrade to %s' % target_name
+        return self.upgrade.upgrade_stop()
 
     @trivial_completion
     def remove_osds(self, osd_ids: List[str],
diff --git a/src/pybind/mgr/cephadm/upgrade.py b/src/pybind/mgr/cephadm/upgrade.py
new file mode 100644
index 00000000000..65dd99b5fb5
--- /dev/null
+++ b/src/pybind/mgr/cephadm/upgrade.py
@@ -0,0 +1,378 @@
+import json
+import logging
+import time
+import uuid
+from typing import TYPE_CHECKING, Optional
+
+import orchestrator
+from cephadm.utils import name_to_config_section
+from orchestrator import OrchestratorError
+
+if TYPE_CHECKING:
+    from .module import CephadmOrchestrator
+
+
+# ceph daemon types that use the ceph container image.
+# NOTE: listed in upgrade order!
+CEPH_UPGRADE_ORDER = ['mgr', 'mon', 'crash', 'osd', 'mds', 'rgw', 'rbd-mirror']
+
+logger = logging.getLogger(__name__)
+
+
+class CephadmUpgrade:
+    def __init__(self, mgr: "CephadmOrchestrator"):
+        self.mgr = mgr
+
+        t = self.mgr.get_store('upgrade_state')
+        self.upgrade_state: Optional[dict] = json.loads(t) if t else None
+
+    def upgrade_status(self) -> orchestrator.UpgradeStatusSpec:
+        r = orchestrator.UpgradeStatusSpec()
+        if self.upgrade_state:
+            r.target_image = self.upgrade_state.get('target_name')
+            r.in_progress = True
+            if self.upgrade_state.get('error'):
+                r.message = 'Error: ' + self.upgrade_state.get('error')
+            elif self.upgrade_state.get('paused'):
+                r.message = 'Upgrade paused'
+        return r
+
+    def upgrade_start(self, image, version) -> str:
+        if self.mgr.mode != 'root':
+            raise OrchestratorError('upgrade is not supported in %s mode' % (
+                self.mgr.mode))
+        if version:
+            try:
+                (major, minor, patch) = version.split('.')
+                assert int(minor) >= 0
+                assert int(patch) >= 0
+            except (ValueError, AssertionError):
+                raise OrchestratorError('version must be in the form X.Y.Z (e.g., 15.2.3)')
+            if int(major) < 15 or (int(major) == 15 and int(minor) < 2):
+                raise OrchestratorError('cephadm only supports octopus (15.2.0) or later')
+            target_name = self.mgr.container_image_base + ':v' + version
+        elif image:
+            target_name = image
+        else:
+            raise OrchestratorError('must specify either image or version')
+        if self.upgrade_state:
+            if self.upgrade_state.get('target_name') != target_name:
+                raise OrchestratorError(
+                    'Upgrade to %s (not %s) already in progress' %
+                    (self.upgrade_state.get('target_name'), target_name))
+            if self.upgrade_state.get('paused'):
+                del self.upgrade_state['paused']
+                self._save_upgrade_state()
+                return 'Resumed upgrade to %s' % self.upgrade_state.get('target_name')
+            return 'Upgrade to %s in progress' % self.upgrade_state.get('target_name')
+        self.upgrade_state = {
+            'target_name': target_name,
+            'progress_id': str(uuid.uuid4()),
+        }
+        self._update_upgrade_progress(0.0)
+        self._save_upgrade_state()
+        self._clear_upgrade_health_checks()
+        self.mgr.event.set()
+        return 'Initiating upgrade to %s' % target_name
+
+    def upgrade_pause(self) -> str:
+        if not self.upgrade_state:
+            raise OrchestratorError('No upgrade in progress')
+        if self.upgrade_state.get('paused'):
+            return 'Upgrade to %s already paused' % self.upgrade_state.get('target_name')
+        self.upgrade_state['paused'] = True
+        self._save_upgrade_state()
+        return 'Paused upgrade to %s' % self.upgrade_state.get('target_name')
+
+    def upgrade_resume(self) -> str:
+        if not self.upgrade_state:
+            raise OrchestratorError('No upgrade in progress')
+        if not self.upgrade_state.get('paused'):
+            return 'Upgrade to %s not paused' % self.upgrade_state.get('target_name')
+        del self.upgrade_state['paused']
+        self._save_upgrade_state()
+        self.mgr.event.set()
+        return 'Resumed upgrade to %s' % self.upgrade_state.get('target_name')
+
+    def upgrade_stop(self) -> str:
+        if not self.upgrade_state:
+            return 'No upgrade in progress'
+        target_name = self.upgrade_state.get('target_name')
+        if 'progress_id' in self.upgrade_state:
+            self.mgr.remote('progress', 'complete',
+                            self.upgrade_state['progress_id'])
+        self.upgrade_state = None
+        self._save_upgrade_state()
+        self._clear_upgrade_health_checks()
+        self.mgr.event.set()
+        return 'Stopped upgrade to %s' % target_name
+
+    def continue_upgrade(self) -> bool:
+        """
+        Run one iteration of the upgrade state machine, if an upgrade is
+        in progress and not paused. Returns False if nothing was done.
+        """
+        if self.upgrade_state and not self.upgrade_state.get('paused'):
+            self._do_upgrade()
+            return True
+        return False
+
+    def _wait_for_ok_to_stop(self, s) -> bool:
+        # only wait a little bit; the daemon might go away for some reason
+        tries = 4
+        while tries > 0:
+            if s.daemon_type not in ['mon', 'osd', 'mds']:
+                logger.info('Upgrade: It is presumed safe to stop %s.%s' %
+                            (s.daemon_type, s.daemon_id))
+                return True
+            ret, out, err = self.mgr.mon_command({
+                'prefix': '%s ok-to-stop' % s.daemon_type,
+                'ids': [s.daemon_id],
+            })
+            if not self.upgrade_state or self.upgrade_state.get('paused'):
+                return False
+            if ret:
+                logger.info('Upgrade: It is NOT safe to stop %s.%s' %
+                            (s.daemon_type, s.daemon_id))
+                time.sleep(15)
+                tries -= 1
+            else:
+                logger.info('Upgrade: It is safe to stop %s.%s' %
+                            (s.daemon_type, s.daemon_id))
+                return True
+        return False
+
+    def _clear_upgrade_health_checks(self) -> None:
+        for k in ['UPGRADE_NO_STANDBY_MGR',
+                  'UPGRADE_FAILED_PULL']:
+            if k in self.mgr.health_checks:
+                del self.mgr.health_checks[k]
+        self.mgr.set_health_checks(self.mgr.health_checks)
+
+    def _fail_upgrade(self, alert_id, alert) -> None:
+        logger.error('Upgrade: Paused due to %s: %s' % (alert_id,
+                                                        alert['summary']))
+        self.upgrade_state['error'] = alert_id + ': ' + alert['summary']
+        self.upgrade_state['paused'] = True
+        self._save_upgrade_state()
+        self.mgr.health_checks[alert_id] = alert
+        self.mgr.set_health_checks(self.mgr.health_checks)
+
+    def _update_upgrade_progress(self, progress) -> None:
+        if 'progress_id' not in self.upgrade_state:
+            self.upgrade_state['progress_id'] = str(uuid.uuid4())
+            self._save_upgrade_state()
+        self.mgr.remote('progress', 'update', self.upgrade_state['progress_id'],
+                        ev_msg='Upgrade to %s' % self.upgrade_state['target_name'],
+                        ev_progress=progress)
+
+    def _save_upgrade_state(self) -> None:
+        self.mgr.set_store('upgrade_state', json.dumps(self.upgrade_state))
+
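+    # NOTE: _do_upgrade() below is deliberately restartable: it performs at
+    # most one upgrade action per invocation (pulling an image, redeploying
+    # a daemon, or failing over the mgr) and then returns, relying on the
+    # serve loop to call continue_upgrade() again. All durable progress
+    # lives in the persisted upgrade_state, so an in-flight upgrade
+    # survives a mgr failover.
+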
+    def _do_upgrade(self) -> None:
+        if not self.upgrade_state:
+            logger.debug('_do_upgrade no state, exiting')
+            return
+
+        target_name = self.upgrade_state.get('target_name')
+        target_id = self.upgrade_state.get('target_id', None)
+        if not target_id:
+            # need to learn the container hash
+            logger.info('Upgrade: First pull of %s' % target_name)
+            try:
+                target_id, target_version = self.mgr._get_container_image_id(target_name)
+            except OrchestratorError as e:
+                self._fail_upgrade('UPGRADE_FAILED_PULL', {
+                    'severity': 'warning',
+                    'summary': 'Upgrade: failed to pull target image',
+                    'count': 1,
+                    'detail': [str(e)],
+                })
+                return
+            self.upgrade_state['target_id'] = target_id
+            self.upgrade_state['target_version'] = target_version
+            self._save_upgrade_state()
+        target_version = self.upgrade_state.get('target_version')
+        logger.info('Upgrade: Target is %s with id %s' % (target_name,
+                                                          target_id))
+
+        # get all distinct container_image settings
+        image_settings = {}
+        ret, out, err = self.mgr.mon_command({
+            'prefix': 'config dump',
+            'format': 'json',
+        })
+        config = json.loads(out)
+        for opt in config:
+            if opt['name'] == 'container_image':
+                image_settings[opt['section']] = opt['value']
+
+        daemons = self.mgr.cache.get_daemons()
+        done = 0
+        for daemon_type in CEPH_UPGRADE_ORDER:
+            logger.info('Upgrade: Checking %s daemons...' % daemon_type)
+            need_upgrade_self = False
+            for d in daemons:
+                if d.daemon_type != daemon_type:
+                    continue
+                if d.container_image_id == target_id:
+                    logger.debug('daemon %s.%s version correct' % (
+                        daemon_type, d.daemon_id))
+                    done += 1
+                    continue
+                logger.debug('daemon %s.%s not correct (%s, %s, %s)' % (
+                    daemon_type, d.daemon_id,
+                    d.container_image_name, d.container_image_id, d.version))
+
+                if daemon_type == 'mgr' and \
+                   d.daemon_id == self.mgr.get_mgr_id():
+                    logger.info('Upgrade: Need to upgrade myself (mgr.%s)' %
+                                self.mgr.get_mgr_id())
+                    need_upgrade_self = True
+                    continue
+
+                # make sure host has latest container image
+                out, err, code = self.mgr._run_cephadm(
+                    d.hostname, None, 'inspect-image', [],
+                    image=target_name, no_fsid=True, error_ok=True)
+                if code or json.loads(''.join(out)).get('image_id') != target_id:
+                    logger.info('Upgrade: Pulling %s on %s' % (target_name,
+                                                               d.hostname))
+                    out, err, code = self.mgr._run_cephadm(
+                        d.hostname, None, 'pull', [],
+                        image=target_name, no_fsid=True, error_ok=True)
+                    if code:
+                        self._fail_upgrade('UPGRADE_FAILED_PULL', {
+                            'severity': 'warning',
+                            'summary': 'Upgrade: failed to pull target image',
+                            'count': 1,
+                            'detail': [
+                                'failed to pull %s on host %s' % (target_name,
+                                                                  d.hostname)],
+                        })
+                        return
+                    r = json.loads(''.join(out))
+                    if r.get('image_id') != target_id:
+                        logger.info('Upgrade: image %s pull on %s got new image %s (not %s), restarting' % (target_name, d.hostname, r['image_id'], target_id))
+                        self.upgrade_state['target_id'] = r['image_id']
+                        self._save_upgrade_state()
+                        return
+
+                self._update_upgrade_progress(done / len(daemons))
+
+                if not d.container_image_id:
+                    if d.container_image_name == target_name:
+                        logger.debug('daemon %s has unknown container_image_id but has correct image name' % (d.name()))
+                        continue
+                if not self._wait_for_ok_to_stop(d):
+                    return
+                logger.info('Upgrade: Redeploying %s.%s' %
+                            (d.daemon_type, d.daemon_id))
+                ret, out, err = self.mgr.mon_command({
+                    'prefix': 'config set',
+                    'name': 'container_image',
+                    'value': target_name,
+                    'who': name_to_config_section(daemon_type + '.'
+                                                  + d.daemon_id),
+                })
+                self.mgr._daemon_action(
+                    d.daemon_type,
+                    d.daemon_id,
+                    d.hostname,
+                    'redeploy'
+                )
+                return
+
+            if need_upgrade_self:
+                mgr_map = self.mgr.get('mgr_map')
+                num = len(mgr_map.get('standbys'))
+                if not num:
+                    self._fail_upgrade('UPGRADE_NO_STANDBY_MGR', {
+                        'severity': 'warning',
+                        'summary': 'Upgrade: Need standby mgr daemon',
+                        'count': 1,
+                        'detail': [
+                            'The upgrade process needs to upgrade the mgr, '
+                            'but it needs at least one standby to proceed.',
+                        ],
+                    })
+                    return
+
+                logger.info('Upgrade: there are %d other already-upgraded '
+                            'standby mgrs, failing over' % num)
+
+                self._update_upgrade_progress(done / len(daemons))
+
+                # fail over
+                ret, out, err = self.mgr.mon_command({
+                    'prefix': 'mgr fail',
+                    'who': self.mgr.get_mgr_id(),
+                })
+                return
+            elif daemon_type == 'mgr':
+                if 'UPGRADE_NO_STANDBY_MGR' in self.mgr.health_checks:
+                    del self.mgr.health_checks['UPGRADE_NO_STANDBY_MGR']
+                    self.mgr.set_health_checks(self.mgr.health_checks)
+
+            # make sure 'ceph versions' agrees
+            ret, out, err = self.mgr.mon_command({
+                'prefix': 'versions',
+            })
+            j = json.loads(out)
+            for version, count in j.get(daemon_type, {}).items():
+                if version != target_version:
+                    logger.warning(
+                        'Upgrade: %d %s daemon(s) are %s != target %s' %
+                        (count, daemon_type, version, target_version))
+
+            # push down configs
+            if image_settings.get(daemon_type) != target_name:
+                logger.info('Upgrade: Setting container_image for all %s...' %
+                            daemon_type)
+                ret, out, err = self.mgr.mon_command({
+                    'prefix': 'config set',
+                    'name': 'container_image',
+                    'value': target_name,
+                    'who': daemon_type,
+                })
+            to_clean = []
+            for section in image_settings.keys():
+                if section.startswith(name_to_config_section(daemon_type) + '.'):
+                    to_clean.append(section)
+            if to_clean:
+                logger.debug('Upgrade: Cleaning up container_image for %s...' %
+                             to_clean)
+                for section in to_clean:
+                    ret, image, err = self.mgr.mon_command({
+                        'prefix': 'config rm',
+                        'name': 'container_image',
+                        'who': section,
+                    })
+
+            logger.info('Upgrade: All %s daemons are up to date.' %
+                        daemon_type)
+
+        # clean up
+        logger.info('Upgrade: Finalizing container_image settings')
+        ret, out, err = self.mgr.mon_command({
+            'prefix': 'config set',
+            'name': 'container_image',
+            'value': target_name,
+            'who': 'global',
+        })
+        for daemon_type in CEPH_UPGRADE_ORDER:
+            ret, image, err = self.mgr.mon_command({
+                'prefix': 'config rm',
+                'name': 'container_image',
+                'who': name_to_config_section(daemon_type),
+            })
+
+        logger.info('Upgrade: Complete!')
+        if 'progress_id' in self.upgrade_state:
+            self.mgr.remote('progress', 'complete',
+                            self.upgrade_state['progress_id'])
+        self.upgrade_state = None
+        self._save_upgrade_state()
+        return
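
Reviewer note (not part of the patch): the pattern the refactor standardizes on is persisted upgrade_state plus a continue_upgrade() that does one bounded step per serve-loop tick. The minimal, self-contained sketch below illustrates that contract outside of a cluster. FakeStore and SketchUpgrade are hypothetical stand-ins invented for this note; only the upgrade_state persistence shape and the continue_upgrade() semantics mirror the patch.

import json
from typing import Optional


class FakeStore:
    """Hypothetical stand-in for MgrModule.get_store()/set_store()."""

    def __init__(self) -> None:
        self._kv: dict = {}

    def get_store(self, key: str) -> Optional[str]:
        return self._kv.get(key)

    def set_store(self, key: str, value: str) -> None:
        self._kv[key] = value


class SketchUpgrade:
    """Same persistence contract as CephadmUpgrade, minus the cluster I/O."""

    def __init__(self, mgr: FakeStore) -> None:
        self.mgr = mgr
        t = self.mgr.get_store('upgrade_state')
        self.upgrade_state: Optional[dict] = json.loads(t) if t else None

    def _save_upgrade_state(self) -> None:
        self.mgr.set_store('upgrade_state', json.dumps(self.upgrade_state))

    def continue_upgrade(self) -> bool:
        # One bounded step per serve-loop tick; all progress is persisted.
        if self.upgrade_state and not self.upgrade_state.get('paused'):
            self.upgrade_state['steps'] = self.upgrade_state.get('steps', 0) + 1
            self._save_upgrade_state()
            return True
        return False


store = FakeStore()
first = SketchUpgrade(store)
first.upgrade_state = {'target_name': 'ceph/ceph:v15.2.3'}
first._save_upgrade_state()
assert first.continue_upgrade()

# A new instance (e.g., after a mgr failover) resumes from the stored state:
second = SketchUpgrade(store)
assert second.upgrade_state is not None
assert second.upgrade_state['steps'] == 1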