mirror of
https://github.com/ceph/ceph
synced 2024-12-29 15:03:33 +00:00
Merge pull request #34950 from sebastian-philipp/cephadm-upgrade.py
mgr/cephadm: move upgrade code to upgrade.py Reviewed-by: Michael Fritch <mfritch@suse.com>
This commit is contained in:
commit
bceb02a31f
@ -22,7 +22,6 @@ import tempfile
|
||||
import multiprocessing.pool
|
||||
import shutil
|
||||
import subprocess
|
||||
import uuid
|
||||
|
||||
from ceph.deployment import inventory
|
||||
from ceph.deployment.drive_group import DriveGroupSpec
|
||||
@ -43,6 +42,7 @@ from .services.osd import RemoveUtil, OSDRemoval, OSDService
|
||||
from .services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \
|
||||
NodeExporterService
|
||||
from .inventory import Inventory, SpecStore, HostCache
|
||||
from .upgrade import CEPH_UPGRADE_ORDER, CephadmUpgrade
|
||||
|
||||
try:
|
||||
import remoto
|
||||
@ -70,9 +70,6 @@ Host *
|
||||
DATEFMT = '%Y-%m-%dT%H:%M:%S.%f'
|
||||
CEPH_DATEFMT = '%Y-%m-%dT%H:%M:%S.%fZ'
|
||||
|
||||
# ceph daemon types that use the ceph container image.
|
||||
# NOTE: listed in upgrade order!
|
||||
CEPH_UPGRADE_ORDER = ['mgr', 'mon', 'crash', 'osd', 'mds', 'rgw', 'rbd-mirror']
|
||||
CEPH_TYPES = set(CEPH_UPGRADE_ORDER)
|
||||
|
||||
|
||||
@ -391,11 +388,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
|
||||
|
||||
CephadmOrchestrator.instance = self
|
||||
|
||||
t = self.get_store('upgrade_state')
|
||||
if t:
|
||||
self.upgrade_state = json.loads(t)
|
||||
else:
|
||||
self.upgrade_state = None
|
||||
self.upgrade = CephadmUpgrade(self)
|
||||
|
||||
self.health_checks = {}
|
||||
|
||||
@ -469,261 +462,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
|
||||
return
|
||||
raise OrchestratorError('Removing %s would break mon quorum (new quorum %s, new mons %s)' % (mon_id, new_quorum, new_mons))
|
||||
|
||||
def _wait_for_ok_to_stop(self, s):
|
||||
# only wait a little bit; the service might go away for something
|
||||
tries = 4
|
||||
while tries > 0:
|
||||
if s.daemon_type not in ['mon', 'osd', 'mds']:
|
||||
self.log.info('Upgrade: It is presumed safe to stop %s.%s' %
|
||||
(s.daemon_type, s.daemon_id))
|
||||
return True
|
||||
ret, out, err = self.mon_command({
|
||||
'prefix': '%s ok-to-stop' % s.daemon_type,
|
||||
'ids': [s.daemon_id],
|
||||
})
|
||||
if not self.upgrade_state or self.upgrade_state.get('paused'):
|
||||
return False
|
||||
if ret:
|
||||
self.log.info('Upgrade: It is NOT safe to stop %s.%s' %
|
||||
(s.daemon_type, s.daemon_id))
|
||||
time.sleep(15)
|
||||
tries -= 1
|
||||
else:
|
||||
self.log.info('Upgrade: It is safe to stop %s.%s' %
|
||||
(s.daemon_type, s.daemon_id))
|
||||
return True
|
||||
return False
|
||||
|
||||
def _clear_upgrade_health_checks(self):
|
||||
for k in ['UPGRADE_NO_STANDBY_MGR',
|
||||
'UPGRADE_FAILED_PULL']:
|
||||
if k in self.health_checks:
|
||||
del self.health_checks[k]
|
||||
self.set_health_checks(self.health_checks)
|
||||
|
||||
def _fail_upgrade(self, alert_id, alert):
|
||||
self.log.error('Upgrade: Paused due to %s: %s' % (alert_id,
|
||||
alert['summary']))
|
||||
self.upgrade_state['error'] = alert_id + ': ' + alert['summary']
|
||||
self.upgrade_state['paused'] = True
|
||||
self._save_upgrade_state()
|
||||
self.health_checks[alert_id] = alert
|
||||
self.set_health_checks(self.health_checks)
|
||||
|
||||
def _update_upgrade_progress(self, progress):
|
||||
if 'progress_id' not in self.upgrade_state:
|
||||
self.upgrade_state['progress_id'] = str(uuid.uuid4())
|
||||
self._save_upgrade_state()
|
||||
self.remote('progress', 'update', self.upgrade_state['progress_id'],
|
||||
ev_msg='Upgrade to %s' % self.upgrade_state['target_name'],
|
||||
ev_progress=progress)
|
||||
|
||||
def _do_upgrade(self):
|
||||
# type: () -> None
|
||||
if not self.upgrade_state:
|
||||
self.log.debug('_do_upgrade no state, exiting')
|
||||
return
|
||||
|
||||
target_name = self.upgrade_state.get('target_name')
|
||||
target_id = self.upgrade_state.get('target_id', None)
|
||||
if not target_id:
|
||||
# need to learn the container hash
|
||||
self.log.info('Upgrade: First pull of %s' % target_name)
|
||||
try:
|
||||
target_id, target_version = self._get_container_image_id(target_name)
|
||||
except OrchestratorError as e:
|
||||
self._fail_upgrade('UPGRADE_FAILED_PULL', {
|
||||
'severity': 'warning',
|
||||
'summary': 'Upgrade: failed to pull target image',
|
||||
'count': 1,
|
||||
'detail': [str(e)],
|
||||
})
|
||||
return
|
||||
self.upgrade_state['target_id'] = target_id
|
||||
self.upgrade_state['target_version'] = target_version
|
||||
self._save_upgrade_state()
|
||||
target_version = self.upgrade_state.get('target_version')
|
||||
self.log.info('Upgrade: Target is %s with id %s' % (target_name,
|
||||
target_id))
|
||||
|
||||
# get all distinct container_image settings
|
||||
image_settings = {}
|
||||
ret, out, err = self.mon_command({
|
||||
'prefix': 'config dump',
|
||||
'format': 'json',
|
||||
})
|
||||
config = json.loads(out)
|
||||
for opt in config:
|
||||
if opt['name'] == 'container_image':
|
||||
image_settings[opt['section']] = opt['value']
|
||||
|
||||
daemons = self.cache.get_daemons()
|
||||
done = 0
|
||||
for daemon_type in CEPH_UPGRADE_ORDER:
|
||||
self.log.info('Upgrade: Checking %s daemons...' % daemon_type)
|
||||
need_upgrade_self = False
|
||||
for d in daemons:
|
||||
if d.daemon_type != daemon_type:
|
||||
continue
|
||||
if d.container_image_id == target_id:
|
||||
self.log.debug('daemon %s.%s version correct' % (
|
||||
daemon_type, d.daemon_id))
|
||||
done += 1
|
||||
continue
|
||||
self.log.debug('daemon %s.%s not correct (%s, %s, %s)' % (
|
||||
daemon_type, d.daemon_id,
|
||||
d.container_image_name, d.container_image_id, d.version))
|
||||
|
||||
if daemon_type == 'mgr' and \
|
||||
d.daemon_id == self.get_mgr_id():
|
||||
self.log.info('Upgrade: Need to upgrade myself (mgr.%s)' %
|
||||
self.get_mgr_id())
|
||||
need_upgrade_self = True
|
||||
continue
|
||||
|
||||
# make sure host has latest container image
|
||||
out, err, code = self._run_cephadm(
|
||||
d.hostname, None, 'inspect-image', [],
|
||||
image=target_name, no_fsid=True, error_ok=True)
|
||||
if code or json.loads(''.join(out)).get('image_id') != target_id:
|
||||
self.log.info('Upgrade: Pulling %s on %s' % (target_name,
|
||||
d.hostname))
|
||||
out, err, code = self._run_cephadm(
|
||||
d.hostname, None, 'pull', [],
|
||||
image=target_name, no_fsid=True, error_ok=True)
|
||||
if code:
|
||||
self._fail_upgrade('UPGRADE_FAILED_PULL', {
|
||||
'severity': 'warning',
|
||||
'summary': 'Upgrade: failed to pull target image',
|
||||
'count': 1,
|
||||
'detail': [
|
||||
'failed to pull %s on host %s' % (target_name,
|
||||
d.hostname)],
|
||||
})
|
||||
return
|
||||
r = json.loads(''.join(out))
|
||||
if r.get('image_id') != target_id:
|
||||
self.log.info('Upgrade: image %s pull on %s got new image %s (not %s), restarting' % (target_name, d.hostname, r['image_id'], target_id))
|
||||
self.upgrade_state['target_id'] = r['image_id']
|
||||
self._save_upgrade_state()
|
||||
return
|
||||
|
||||
self._update_upgrade_progress(done / len(daemons))
|
||||
|
||||
if not d.container_image_id:
|
||||
if d.container_image_name == target_name:
|
||||
self.log.debug('daemon %s has unknown container_image_id but has correct image name' % (d.name()))
|
||||
continue
|
||||
if not self._wait_for_ok_to_stop(d):
|
||||
return
|
||||
self.log.info('Upgrade: Redeploying %s.%s' %
|
||||
(d.daemon_type, d.daemon_id))
|
||||
ret, out, err = self.mon_command({
|
||||
'prefix': 'config set',
|
||||
'name': 'container_image',
|
||||
'value': target_name,
|
||||
'who': utils.name_to_config_section(daemon_type + '.' + d.daemon_id),
|
||||
})
|
||||
self._daemon_action(
|
||||
d.daemon_type,
|
||||
d.daemon_id,
|
||||
d.hostname,
|
||||
'redeploy'
|
||||
)
|
||||
return
|
||||
|
||||
if need_upgrade_self:
|
||||
mgr_map = self.get('mgr_map')
|
||||
num = len(mgr_map.get('standbys'))
|
||||
if not num:
|
||||
self._fail_upgrade('UPGRADE_NO_STANDBY_MGR', {
|
||||
'severity': 'warning',
|
||||
'summary': 'Upgrade: Need standby mgr daemon',
|
||||
'count': 1,
|
||||
'detail': [
|
||||
'The upgrade process needs to upgrade the mgr, '
|
||||
'but it needs at least one standby to proceed.',
|
||||
],
|
||||
})
|
||||
return
|
||||
|
||||
self.log.info('Upgrade: there are %d other already-upgraded '
|
||||
'standby mgrs, failing over' % num)
|
||||
|
||||
self._update_upgrade_progress(done / len(daemons))
|
||||
|
||||
# fail over
|
||||
ret, out, err = self.mon_command({
|
||||
'prefix': 'mgr fail',
|
||||
'who': self.get_mgr_id(),
|
||||
})
|
||||
return
|
||||
elif daemon_type == 'mgr':
|
||||
if 'UPGRADE_NO_STANDBY_MGR' in self.health_checks:
|
||||
del self.health_checks['UPGRADE_NO_STANDBY_MGR']
|
||||
self.set_health_checks(self.health_checks)
|
||||
|
||||
# make sure 'ceph versions' agrees
|
||||
ret, out, err = self.mon_command({
|
||||
'prefix': 'versions',
|
||||
})
|
||||
j = json.loads(out)
|
||||
for version, count in j.get(daemon_type, {}).items():
|
||||
if version != target_version:
|
||||
self.log.warning(
|
||||
'Upgrade: %d %s daemon(s) are %s != target %s' %
|
||||
(count, daemon_type, version, target_version))
|
||||
|
||||
# push down configs
|
||||
if image_settings.get(daemon_type) != target_name:
|
||||
self.log.info('Upgrade: Setting container_image for all %s...' %
|
||||
daemon_type)
|
||||
ret, out, err = self.mon_command({
|
||||
'prefix': 'config set',
|
||||
'name': 'container_image',
|
||||
'value': target_name,
|
||||
'who': daemon_type,
|
||||
})
|
||||
to_clean = []
|
||||
for section in image_settings.keys():
|
||||
if section.startswith(utils.name_to_config_section(daemon_type) + '.'):
|
||||
to_clean.append(section)
|
||||
if to_clean:
|
||||
self.log.debug('Upgrade: Cleaning up container_image for %s...' %
|
||||
to_clean)
|
||||
for section in to_clean:
|
||||
ret, image, err = self.mon_command({
|
||||
'prefix': 'config rm',
|
||||
'name': 'container_image',
|
||||
'who': section,
|
||||
})
|
||||
|
||||
self.log.info('Upgrade: All %s daemons are up to date.' %
|
||||
daemon_type)
|
||||
|
||||
# clean up
|
||||
self.log.info('Upgrade: Finalizing container_image settings')
|
||||
ret, out, err = self.mon_command({
|
||||
'prefix': 'config set',
|
||||
'name': 'container_image',
|
||||
'value': target_name,
|
||||
'who': 'global',
|
||||
})
|
||||
for daemon_type in CEPH_UPGRADE_ORDER:
|
||||
ret, image, err = self.mon_command({
|
||||
'prefix': 'config rm',
|
||||
'name': 'container_image',
|
||||
'who': utils.name_to_config_section(daemon_type),
|
||||
})
|
||||
|
||||
self.log.info('Upgrade: Complete!')
|
||||
if 'progress_id' in self.upgrade_state:
|
||||
self.remote('progress', 'complete',
|
||||
self.upgrade_state['progress_id'])
|
||||
self.upgrade_state = None
|
||||
self._save_upgrade_state()
|
||||
return
|
||||
|
||||
def _check_host(self, host):
|
||||
if host not in self.inventory:
|
||||
return
|
||||
@ -872,8 +610,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
|
||||
|
||||
self._check_daemons()
|
||||
|
||||
if self.upgrade_state and not self.upgrade_state.get('paused'):
|
||||
self._do_upgrade()
|
||||
if self.upgrade.continue_upgrade():
|
||||
continue
|
||||
|
||||
self._serve_sleep()
|
||||
@ -950,9 +687,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
|
||||
continue
|
||||
return name
|
||||
|
||||
def _save_upgrade_state(self):
|
||||
self.set_store('upgrade_state', json.dumps(self.upgrade_state))
|
||||
|
||||
def _reconfig_ssh(self):
|
||||
temp_files = [] # type: list
|
||||
ssh_options = [] # type: List[str]
|
||||
@ -2376,89 +2110,23 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
|
||||
|
||||
@trivial_completion
|
||||
def upgrade_status(self):
|
||||
r = orchestrator.UpgradeStatusSpec()
|
||||
if self.upgrade_state:
|
||||
r.target_image = self.upgrade_state.get('target_name')
|
||||
r.in_progress = True
|
||||
if self.upgrade_state.get('error'):
|
||||
r.message = 'Error: ' + self.upgrade_state.get('error')
|
||||
elif self.upgrade_state.get('paused'):
|
||||
r.message = 'Upgrade paused'
|
||||
return r
|
||||
return self.upgrade.upgrade_status()
|
||||
|
||||
@trivial_completion
|
||||
def upgrade_start(self, image, version):
|
||||
if self.mode != 'root':
|
||||
raise OrchestratorError('upgrade is not supported in %s mode' % (
|
||||
self.mode))
|
||||
if version:
|
||||
try:
|
||||
(major, minor, patch) = version.split('.')
|
||||
assert int(minor) >= 0
|
||||
assert int(patch) >= 0
|
||||
except:
|
||||
raise OrchestratorError('version must be in the form X.Y.Z (e.g., 15.2.3)')
|
||||
if int(major) < 15 or (int(major) == 15 and int(minor) < 2):
|
||||
raise OrchestratorError('cephadm only supports octopus (15.2.0) or later')
|
||||
target_name = self.container_image_base + ':v' + version
|
||||
elif image:
|
||||
target_name = image
|
||||
else:
|
||||
raise OrchestratorError('must specify either image or version')
|
||||
if self.upgrade_state:
|
||||
if self.upgrade_state.get('target_name') != target_name:
|
||||
raise OrchestratorError(
|
||||
'Upgrade to %s (not %s) already in progress' %
|
||||
(self.upgrade_state.get('target_name'), target_name))
|
||||
if self.upgrade_state.get('paused'):
|
||||
del self.upgrade_state['paused']
|
||||
self._save_upgrade_state()
|
||||
return 'Resumed upgrade to %s' % self.upgrade_state.get('target_name')
|
||||
return 'Upgrade to %s in progress' % self.upgrade_state.get('target_name')
|
||||
self.upgrade_state = {
|
||||
'target_name': target_name,
|
||||
'progress_id': str(uuid.uuid4()),
|
||||
}
|
||||
self._update_upgrade_progress(0.0)
|
||||
self._save_upgrade_state()
|
||||
self._clear_upgrade_health_checks()
|
||||
self.event.set()
|
||||
return 'Initiating upgrade to %s' % (target_name)
|
||||
return self.upgrade.upgrade_start(image, version)
|
||||
|
||||
@trivial_completion
|
||||
def upgrade_pause(self):
|
||||
if not self.upgrade_state:
|
||||
raise OrchestratorError('No upgrade in progress')
|
||||
if self.upgrade_state.get('paused'):
|
||||
return 'Upgrade to %s already paused' % self.upgrade_state.get('target_name')
|
||||
self.upgrade_state['paused'] = True
|
||||
self._save_upgrade_state()
|
||||
return 'Paused upgrade to %s' % self.upgrade_state.get('target_name')
|
||||
return self.upgrade.upgrade_pause()
|
||||
|
||||
@trivial_completion
|
||||
def upgrade_resume(self):
|
||||
if not self.upgrade_state:
|
||||
raise OrchestratorError('No upgrade in progress')
|
||||
if not self.upgrade_state.get('paused'):
|
||||
return 'Upgrade to %s not paused' % self.upgrade_state.get('target_name')
|
||||
del self.upgrade_state['paused']
|
||||
self._save_upgrade_state()
|
||||
self.event.set()
|
||||
return 'Resumed upgrade to %s' % self.upgrade_state.get('target_name')
|
||||
return self.upgrade.upgrade_resume()
|
||||
|
||||
@trivial_completion
|
||||
def upgrade_stop(self):
|
||||
if not self.upgrade_state:
|
||||
return 'No upgrade in progress'
|
||||
target_name = self.upgrade_state.get('target_name')
|
||||
if 'progress_id' in self.upgrade_state:
|
||||
self.remote('progress', 'complete',
|
||||
self.upgrade_state['progress_id'])
|
||||
self.upgrade_state = None
|
||||
self._save_upgrade_state()
|
||||
self._clear_upgrade_health_checks()
|
||||
self.event.set()
|
||||
return 'Stopped upgrade to %s' % target_name
|
||||
return self.upgrade.upgrade_stop()
|
||||
|
||||
@trivial_completion
|
||||
def remove_osds(self, osd_ids: List[str],
|
||||
|
378
src/pybind/mgr/cephadm/upgrade.py
Normal file
378
src/pybind/mgr/cephadm/upgrade.py
Normal file
@ -0,0 +1,378 @@
|
||||
import json
|
||||
import logging
|
||||
import time
|
||||
import uuid
|
||||
from typing import TYPE_CHECKING, Optional
|
||||
|
||||
import orchestrator
|
||||
from cephadm.utils import name_to_config_section
|
||||
from orchestrator import OrchestratorError
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .module import CephadmOrchestrator
|
||||
|
||||
|
||||
# ceph daemon types that use the ceph container image.
|
||||
# NOTE: listed in upgrade order!
|
||||
CEPH_UPGRADE_ORDER = ['mgr', 'mon', 'crash', 'osd', 'mds', 'rgw', 'rbd-mirror']
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class CephadmUpgrade:
    """Drives a rolling upgrade of the ceph daemons managed by cephadm.

    Progress is tracked in ``self.upgrade_state`` (a plain dict, or ``None``
    when no upgrade is in progress) and persisted in the mgr key/value store
    under the key ``upgrade_state`` so it survives mgr restarts/failovers.
    The actual work happens incrementally: the mgr serve loop repeatedly
    calls :meth:`continue_upgrade`, and each :meth:`_do_upgrade` pass makes
    at most one daemon's worth of progress before returning.
    """

    def __init__(self, mgr: "CephadmOrchestrator"):
        self.mgr = mgr

        # Restore any in-flight upgrade that survived a mgr restart.
        t = self.mgr.get_store('upgrade_state')
        if t:
            self.upgrade_state = json.loads(t)
        else:
            self.upgrade_state = None

    def upgrade_status(self) -> orchestrator.UpgradeStatusSpec:
        """Return a status summary of the current upgrade (if any)."""
        r = orchestrator.UpgradeStatusSpec()
        if self.upgrade_state:
            r.target_image = self.upgrade_state.get('target_name')
            r.in_progress = True
            if self.upgrade_state.get('error'):
                r.message = 'Error: ' + self.upgrade_state.get('error')
            elif self.upgrade_state.get('paused'):
                r.message = 'Upgrade paused'
        return r

    def upgrade_start(self, image, version) -> str:
        """Begin (or resume) an upgrade to the given image or version.

        Exactly one of *image* / *version* must be provided.  Raises
        OrchestratorError on invalid input or a conflicting in-progress
        upgrade; returns a human-readable status string otherwise.
        """
        if self.mgr.mode != 'root':
            raise OrchestratorError('upgrade is not supported in %s mode' % (
                self.mgr.mode))
        if version:
            try:
                (major, minor, patch) = version.split('.')
                assert int(minor) >= 0
                assert int(patch) >= 0
            # Narrowed from a bare `except:`: the block above can only raise
            # ValueError (bad split/int) or AssertionError (negative parts);
            # a bare except would also swallow KeyboardInterrupt/SystemExit.
            except (ValueError, AssertionError):
                raise OrchestratorError('version must be in the form X.Y.Z (e.g., 15.2.3)')
            if int(major) < 15 or (int(major) == 15 and int(minor) < 2):
                raise OrchestratorError('cephadm only supports octopus (15.2.0) or later')
            target_name = self.mgr.container_image_base + ':v' + version
        elif image:
            target_name = image
        else:
            raise OrchestratorError('must specify either image or version')
        if self.upgrade_state:
            # An upgrade is already underway: only allow resuming the same
            # target, never silently switching targets mid-flight.
            if self.upgrade_state.get('target_name') != target_name:
                raise OrchestratorError(
                    'Upgrade to %s (not %s) already in progress' %
                    (self.upgrade_state.get('target_name'), target_name))
            if self.upgrade_state.get('paused'):
                del self.upgrade_state['paused']
                self._save_upgrade_state()
                return 'Resumed upgrade to %s' % self.upgrade_state.get('target_name')
            return 'Upgrade to %s in progress' % self.upgrade_state.get('target_name')
        self.upgrade_state = {
            'target_name': target_name,
            'progress_id': str(uuid.uuid4()),
        }
        self._update_upgrade_progress(0.0)
        self._save_upgrade_state()
        self._clear_upgrade_health_checks()
        # Wake the serve loop so the upgrade starts immediately.
        self.mgr.event.set()
        return 'Initiating upgrade to %s' % (target_name)

    def upgrade_pause(self) -> str:
        """Pause the in-progress upgrade; raises if none is running."""
        if not self.upgrade_state:
            raise OrchestratorError('No upgrade in progress')
        if self.upgrade_state.get('paused'):
            return 'Upgrade to %s already paused' % self.upgrade_state.get('target_name')
        self.upgrade_state['paused'] = True
        self._save_upgrade_state()
        return 'Paused upgrade to %s' % self.upgrade_state.get('target_name')

    def upgrade_resume(self) -> str:
        """Resume a paused upgrade; raises if none is running."""
        if not self.upgrade_state:
            raise OrchestratorError('No upgrade in progress')
        if not self.upgrade_state.get('paused'):
            return 'Upgrade to %s not paused' % self.upgrade_state.get('target_name')
        del self.upgrade_state['paused']
        self._save_upgrade_state()
        self.mgr.event.set()
        return 'Resumed upgrade to %s' % self.upgrade_state.get('target_name')

    def upgrade_stop(self) -> str:
        """Abort the current upgrade and clear all associated state."""
        if not self.upgrade_state:
            return 'No upgrade in progress'
        target_name = self.upgrade_state.get('target_name')
        if 'progress_id' in self.upgrade_state:
            # Close out the mgr progress-module event for this upgrade.
            self.mgr.remote('progress', 'complete',
                            self.upgrade_state['progress_id'])
        self.upgrade_state = None
        self._save_upgrade_state()
        self._clear_upgrade_health_checks()
        self.mgr.event.set()
        return 'Stopped upgrade to %s' % target_name

    def continue_upgrade(self) -> bool:
        """Run one upgrade iteration if an unpaused upgrade is active.

        Returns false, if nothing was done.
        :return:
        """
        if self.upgrade_state and not self.upgrade_state.get('paused'):
            self._do_upgrade()
            return True
        return False

    def _wait_for_ok_to_stop(self, s) -> bool:
        """Poll `<type> ok-to-stop` for daemon *s*; True when safe to stop.

        Daemon types other than mon/osd/mds are presumed safe immediately.
        Gives up after 4 attempts (15s apart) or if the upgrade was
        stopped/paused while waiting.
        """
        # only wait a little bit; the service might go away for something
        tries = 4
        while tries > 0:
            if s.daemon_type not in ['mon', 'osd', 'mds']:
                logger.info('Upgrade: It is presumed safe to stop %s.%s' %
                            (s.daemon_type, s.daemon_id))
                return True
            ret, out, err = self.mgr.mon_command({
                'prefix': '%s ok-to-stop' % s.daemon_type,
                'ids': [s.daemon_id],
            })
            if not self.upgrade_state or self.upgrade_state.get('paused'):
                return False
            if ret:
                logger.info('Upgrade: It is NOT safe to stop %s.%s' %
                            (s.daemon_type, s.daemon_id))
                time.sleep(15)
                tries -= 1
            else:
                logger.info('Upgrade: It is safe to stop %s.%s' %
                            (s.daemon_type, s.daemon_id))
                return True
        return False

    def _clear_upgrade_health_checks(self) -> None:
        """Remove any upgrade-related health alerts from the mgr."""
        for k in ['UPGRADE_NO_STANDBY_MGR',
                  'UPGRADE_FAILED_PULL']:
            if k in self.mgr.health_checks:
                del self.mgr.health_checks[k]
        self.mgr.set_health_checks(self.mgr.health_checks)

    def _fail_upgrade(self, alert_id, alert) -> None:
        """Pause the upgrade and surface *alert* as a mgr health check.

        NOTE(review): assumes an upgrade is active (``upgrade_state`` is a
        dict) — callers only invoke this from within an active upgrade pass.
        """
        logger.error('Upgrade: Paused due to %s: %s' % (alert_id,
                                                        alert['summary']))
        self.upgrade_state['error'] = alert_id + ': ' + alert['summary']
        self.upgrade_state['paused'] = True
        self._save_upgrade_state()
        self.mgr.health_checks[alert_id] = alert
        self.mgr.set_health_checks(self.mgr.health_checks)

    def _update_upgrade_progress(self, progress) -> None:
        """Report *progress* (0.0-1.0) to the mgr progress module."""
        if 'progress_id' not in self.upgrade_state:
            self.upgrade_state['progress_id'] = str(uuid.uuid4())
            self._save_upgrade_state()
        self.mgr.remote('progress', 'update', self.upgrade_state['progress_id'],
                        ev_msg='Upgrade to %s' % self.upgrade_state['target_name'],
                        ev_progress=progress)

    def _save_upgrade_state(self) -> None:
        """Persist (or clear) upgrade_state in the mgr key/value store."""
        self.mgr.set_store('upgrade_state', json.dumps(self.upgrade_state))

    def _do_upgrade(self) -> None:
        """Make one pass of upgrade progress.

        Walks daemon types in CEPH_UPGRADE_ORDER; redeploys at most one
        out-of-date daemon per call (returning early each time real work is
        started), so the serve loop can interleave other duties.  When all
        daemons match the target image, finalizes config and clears state.
        """
        if not self.upgrade_state:
            logger.debug('_do_upgrade no state, exiting')
            return

        target_name = self.upgrade_state.get('target_name')
        target_id = self.upgrade_state.get('target_id', None)
        if not target_id:
            # need to learn the container hash
            logger.info('Upgrade: First pull of %s' % target_name)
            try:
                target_id, target_version = self.mgr._get_container_image_id(target_name)
            except OrchestratorError as e:
                self._fail_upgrade('UPGRADE_FAILED_PULL', {
                    'severity': 'warning',
                    'summary': 'Upgrade: failed to pull target image',
                    'count': 1,
                    'detail': [str(e)],
                })
                return
            self.upgrade_state['target_id'] = target_id
            self.upgrade_state['target_version'] = target_version
            self._save_upgrade_state()
        target_version = self.upgrade_state.get('target_version')
        logger.info('Upgrade: Target is %s with id %s' % (target_name,
                                                          target_id))

        # get all distinct container_image settings
        image_settings = {}
        ret, out, err = self.mgr.mon_command({
            'prefix': 'config dump',
            'format': 'json',
        })
        config = json.loads(out)
        for opt in config:
            if opt['name'] == 'container_image':
                image_settings[opt['section']] = opt['value']

        daemons = self.mgr.cache.get_daemons()
        done = 0
        for daemon_type in CEPH_UPGRADE_ORDER:
            logger.info('Upgrade: Checking %s daemons...' % daemon_type)
            need_upgrade_self = False
            for d in daemons:
                if d.daemon_type != daemon_type:
                    continue
                if d.container_image_id == target_id:
                    logger.debug('daemon %s.%s version correct' % (
                        daemon_type, d.daemon_id))
                    done += 1
                    continue
                logger.debug('daemon %s.%s not correct (%s, %s, %s)' % (
                    daemon_type, d.daemon_id,
                    d.container_image_name, d.container_image_id, d.version))

                # Upgrading the active mgr is special: it must be handled by
                # failing over to an already-upgraded standby (below).
                if daemon_type == 'mgr' and \
                   d.daemon_id == self.mgr.get_mgr_id():
                    logger.info('Upgrade: Need to upgrade myself (mgr.%s)' %
                                self.mgr.get_mgr_id())
                    need_upgrade_self = True
                    continue

                # make sure host has latest container image
                out, err, code = self.mgr._run_cephadm(
                    d.hostname, None, 'inspect-image', [],
                    image=target_name, no_fsid=True, error_ok=True)
                if code or json.loads(''.join(out)).get('image_id') != target_id:
                    logger.info('Upgrade: Pulling %s on %s' % (target_name,
                                                               d.hostname))
                    out, err, code = self.mgr._run_cephadm(
                        d.hostname, None, 'pull', [],
                        image=target_name, no_fsid=True, error_ok=True)
                    if code:
                        self._fail_upgrade('UPGRADE_FAILED_PULL', {
                            'severity': 'warning',
                            'summary': 'Upgrade: failed to pull target image',
                            'count': 1,
                            'detail': [
                                'failed to pull %s on host %s' % (target_name,
                                                                  d.hostname)],
                        })
                        return
                    r = json.loads(''.join(out))
                    if r.get('image_id') != target_id:
                        # The tag moved (e.g. :latest re-pushed); chase the
                        # new digest and restart this pass from scratch.
                        logger.info('Upgrade: image %s pull on %s got new image %s (not %s), restarting' % (target_name, d.hostname, r['image_id'], target_id))
                        self.upgrade_state['target_id'] = r['image_id']
                        self._save_upgrade_state()
                        return

                self._update_upgrade_progress(done / len(daemons))

                if not d.container_image_id:
                    if d.container_image_name == target_name:
                        logger.debug('daemon %s has unknown container_image_id but has correct image name' % (d.name()))
                        continue
                if not self._wait_for_ok_to_stop(d):
                    return
                logger.info('Upgrade: Redeploying %s.%s' %
                            (d.daemon_type, d.daemon_id))
                # Pin this daemon's container_image so the redeploy picks up
                # the target image.
                ret, out, err = self.mgr.mon_command({
                    'prefix': 'config set',
                    'name': 'container_image',
                    'value': target_name,
                    'who': name_to_config_section(daemon_type + '.' + d.daemon_id),
                })
                self.mgr._daemon_action(
                    d.daemon_type,
                    d.daemon_id,
                    d.hostname,
                    'redeploy'
                )
                return

            if need_upgrade_self:
                mgr_map = self.mgr.get('mgr_map')
                num = len(mgr_map.get('standbys'))
                if not num:
                    self._fail_upgrade('UPGRADE_NO_STANDBY_MGR', {
                        'severity': 'warning',
                        'summary': 'Upgrade: Need standby mgr daemon',
                        'count': 1,
                        'detail': [
                            'The upgrade process needs to upgrade the mgr, '
                            'but it needs at least one standby to proceed.',
                        ],
                    })
                    return

                logger.info('Upgrade: there are %d other already-upgraded '
                            'standby mgrs, failing over' % num)

                self._update_upgrade_progress(done / len(daemons))

                # fail over
                ret, out, err = self.mgr.mon_command({
                    'prefix': 'mgr fail',
                    'who': self.mgr.get_mgr_id(),
                })
                return
            elif daemon_type == 'mgr':
                # All mgrs are upgraded; retire any stale standby alert.
                if 'UPGRADE_NO_STANDBY_MGR' in self.mgr.health_checks:
                    del self.mgr.health_checks['UPGRADE_NO_STANDBY_MGR']
                    self.mgr.set_health_checks(self.mgr.health_checks)

            # make sure 'ceph versions' agrees
            ret, out, err = self.mgr.mon_command({
                'prefix': 'versions',
            })
            j = json.loads(out)
            for version, count in j.get(daemon_type, {}).items():
                if version != target_version:
                    logger.warning(
                        'Upgrade: %d %s daemon(s) are %s != target %s' %
                        (count, daemon_type, version, target_version))

            # push down configs
            if image_settings.get(daemon_type) != target_name:
                logger.info('Upgrade: Setting container_image for all %s...' %
                            daemon_type)
                ret, out, err = self.mgr.mon_command({
                    'prefix': 'config set',
                    'name': 'container_image',
                    'value': target_name,
                    'who': daemon_type,
                })
            to_clean = []
            for section in image_settings.keys():
                if section.startswith(name_to_config_section(daemon_type) + '.'):
                    to_clean.append(section)
            if to_clean:
                logger.debug('Upgrade: Cleaning up container_image for %s...' %
                             to_clean)
                for section in to_clean:
                    ret, image, err = self.mgr.mon_command({
                        'prefix': 'config rm',
                        'name': 'container_image',
                        'who': section,
                    })

            logger.info('Upgrade: All %s daemons are up to date.' %
                        daemon_type)

        # clean up
        logger.info('Upgrade: Finalizing container_image settings')
        ret, out, err = self.mgr.mon_command({
            'prefix': 'config set',
            'name': 'container_image',
            'value': target_name,
            'who': 'global',
        })
        for daemon_type in CEPH_UPGRADE_ORDER:
            ret, image, err = self.mgr.mon_command({
                'prefix': 'config rm',
                'name': 'container_image',
                'who': name_to_config_section(daemon_type),
            })

        logger.info('Upgrade: Complete!')
        if 'progress_id' in self.upgrade_state:
            self.mgr.remote('progress', 'complete',
                            self.upgrade_state['progress_id'])
        self.upgrade_state = None
        self._save_upgrade_state()
        return
|
Loading…
Reference in New Issue
Block a user