mgr/cephadm/upgrade: match against any repo_digest, not image_id

The image id can vary across hosts and (most notably) docker vs podman.
Instead, use the repo_digest as an image identifier.

Unfortunately, a single image may have multiple digests, even within the
same registry, so keep a list of the digests for the image we are
upgrading to, and ensure that each container has a digest that matches at
least one of them.

This allows upgrade to proceed in mixed docker+podman clusters.  However,
it does not yet address a cluster with mixed CPU architectures, because
the container image will have different digest(s) for each architecture
build.

Signed-off-by: Sage Weil <sage@newdream.net>
This commit is contained in:
Sage Weil 2021-01-27 08:54:00 -06:00
parent e9333972c1
commit fe72946729
2 changed files with 28 additions and 19 deletions

View File

@ -81,6 +81,7 @@ def test_upgrade_run(use_repo_digest, cephadm_module: CephadmOrchestrator):
fsid='fsid',
container_id='container_id',
container_image_id='image_id',
container_image_digests=['to_image@repo_digest'],
version='version',
state='running',
)
@ -88,7 +89,12 @@ def test_upgrade_run(use_repo_digest, cephadm_module: CephadmOrchestrator):
)):
CephadmServe(cephadm_module)._refresh_hosts_and_daemons()
cephadm_module.upgrade._do_upgrade()
with mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm(json.dumps({
'image_id': 'image_id',
'repo_digests': ['to_image@repo_digest'],
'ceph_version': 'ceph version 18.2.3 (hash)',
}))):
cephadm_module.upgrade._do_upgrade()
_, image, _ = cephadm_module.check_mon_command({
'prefix': 'config get',

View File

@ -21,7 +21,7 @@ class UpgradeState:
target_name: str,
progress_id: str,
target_id: Optional[str] = None,
repo_digests: Optional[List[str]] = None,
target_digests: Optional[List[str]] = None,
target_version: Optional[str] = None,
error: Optional[str] = None,
paused: Optional[bool] = None,
@ -29,7 +29,7 @@ class UpgradeState:
self._target_name: str = target_name # Use CephadmUpgrade.target_image instead.
self.progress_id: str = progress_id
self.target_id: Optional[str] = target_id
self.repo_digests: Optional[List[str]] = repo_digests
self.target_digests: Optional[List[str]] = target_digests
self.target_version: Optional[str] = target_version
self.error: Optional[str] = error
self.paused: bool = paused or False
@ -39,7 +39,7 @@ class UpgradeState:
'target_name': self._target_name,
'progress_id': self.progress_id,
'target_id': self.target_id,
'repo_digests': self.repo_digests,
'target_digests': self.target_digests,
'target_version': self.target_version,
'error': self.error,
'paused': self.paused,
@ -75,11 +75,11 @@ class CephadmUpgrade:
assert self.upgrade_state
if not self.mgr.use_repo_digest:
return self.upgrade_state._target_name
if not self.upgrade_state.repo_digests:
if not self.upgrade_state.target_digests:
return self.upgrade_state._target_name
# FIXME: we assume the first digest is the best one to use
return self.upgrade_state.repo_digests[0]
return self.upgrade_state.target_digests[0]
def upgrade_status(self) -> orchestrator.UpgradeStatusSpec:
r = orchestrator.UpgradeStatusSpec()
@ -282,12 +282,13 @@ class CephadmUpgrade:
target_image = self.target_image
target_id = self.upgrade_state.target_id
target_digests = self.upgrade_state.target_digests
target_version = self.upgrade_state.target_version
if not target_id or not target_version or (self.mgr.use_repo_digest and not self.upgrade_state.repo_digests):
if not target_id or not target_version or not target_digests:
# need to learn the container hash
logger.info('Upgrade: First pull of %s' % target_image)
try:
target_id, target_version, repo_digests = CephadmServe(self.mgr)._get_container_image_info(
target_id, target_version, target_digests = CephadmServe(self.mgr)._get_container_image_info(
target_image)
except OrchestratorError as e:
self._fail_upgrade('UPGRADE_FAILED_PULL', {
@ -308,17 +309,19 @@ class CephadmUpgrade:
self.upgrade_state.target_id = target_id
# extract the version portion of 'ceph version {version} ({sha1})'
self.upgrade_state.target_version = target_version.split(' ')[2]
self.upgrade_state.repo_digests = repo_digests
self.upgrade_state.target_digests = target_digests
self._save_upgrade_state()
target_image = self.target_image
if target_digests is None:
target_digests = []
if target_version.startswith('ceph version '):
# tolerate/fix upgrade state from older version
self.upgrade_state.target_version = target_version.split(' ')[2]
target_version = self.upgrade_state.target_version
target_major, target_minor, target_patch = target_version.split('.')
target_major_name = self.mgr.lookup_release_name(int(target_major))
logger.info('Upgrade: Target is version %s (%s), container %s id %s' % (
target_version, target_major_name, target_image, target_id))
logger.info('Upgrade: Target is version %s (%s), container %s digests %s' % (
target_version, target_major_name, target_image, target_digests))
version_error = self._check_target_version(target_version)
if version_error:
@ -340,14 +343,14 @@ class CephadmUpgrade:
for d in daemons:
if d.daemon_type != daemon_type:
continue
if d.container_image_id == target_id:
logger.debug('daemon %s.%s version correct' % (
if any(d in target_digests for d in (d.container_image_digests or [])):
logger.debug('daemon %s.%s container digest correct' % (
daemon_type, d.daemon_id))
done += 1
continue
logger.debug('daemon %s.%s not correct (%s, %s, %s)' % (
daemon_type, d.daemon_id,
d.container_image_name, d.container_image_id, d.version))
d.container_image_name, d.container_image_digests, d.version))
assert d.daemon_type is not None
assert d.daemon_id is not None
@ -363,7 +366,7 @@ class CephadmUpgrade:
out, errs, code = CephadmServe(self.mgr)._run_cephadm(
d.hostname, '', 'inspect-image', [],
image=target_image, no_fsid=True, error_ok=True)
if code or json.loads(''.join(out)).get('image_id') != target_id:
if code or not any(d in target_digests for d in json.loads(''.join(out)).get('repo_digests', [])):
logger.info('Upgrade: Pulling %s on %s' % (target_image,
d.hostname))
out, errs, code = CephadmServe(self.mgr)._run_cephadm(
@ -380,10 +383,10 @@ class CephadmUpgrade:
})
return
r = json.loads(''.join(out))
if r.get('image_id') != target_id:
logger.info('Upgrade: image %s pull on %s got new image %s (not %s), restarting' % (
target_image, d.hostname, r['image_id'], target_id))
self.upgrade_state.target_id = r['image_id']
if not any(d in target_digests for d in r.get('repo_digests', [])):
logger.info('Upgrade: image %s pull on %s got new digests %s (not %s), restarting' % (
target_image, d.hostname, r['repo_digests'], target_digests))
self.upgrade_state.target_digests = r['repo_digests']
self._save_upgrade_state()
return