mgr/cephadm: autopep8 changes

Signed-off-by: Adam King <adking@redhat.com>
commit 62a06c4647 (parent: c1f3497b43)
Author: Adam King
Date:   2022-04-05 17:38:20 -04:00

6 changed files with 50 additions and 25 deletions

@@ -2719,7 +2719,8 @@ Then run the following:
         if limit is not None:
             if limit < 1:
-                raise OrchestratorError(f'Upgrade aborted - --limit arg must be a positive integer, not {limit}')
+                raise OrchestratorError(
+                    f'Upgrade aborted - --limit arg must be a positive integer, not {limit}')
         return self.upgrade.upgrade_start(image, version, daemon_types, hosts, services, limit)
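Every hunk in this commit has the same mechanical shape: autopep8 splits an over-long statement after an open bracket and indents the continuation. A minimal sketch of reproducing one such wrap through autopep8's Python API (the options are illustrative; the Ceph tree carries its own autopep8 configuration):

    import autopep8  # the tool behind the reflowing in this commit

    # a single over-long statement, as a source string (not executed)
    src = ("raise OrchestratorError(f'Upgrade aborted - --limit arg must be "
           "a positive integer, not {limit}')\n")

    # autopep8's E501 handling tries to make lines fit within max_line_length,
    # splitting after an open bracket just like the hunk above
    print(autopep8.fix_code(src, options={'max_line_length': 79}))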

@@ -298,7 +298,8 @@ class OSDService(CephService):
                 # driveselection for host
                 cmds: List[str] = self.driveselection_to_ceph_volume(ds,
-                                                                     osd_id_claims.filtered_by_host(host),
+                                                                     osd_id_claims.filtered_by_host(
+                                                                         host),
                                                                      preview=True)
                 if not cmds:
                     logger.debug("No data_devices, skipping DriveGroup: {}".format(

@@ -166,9 +166,11 @@ class TestCephadm(object):
         resolve_ip.side_effect = ['192.168.122.1', '127.0.0.1', '127.0.0.1']
         assert wait(cephadm_module, cephadm_module.get_hosts()) == []
         cephadm_module._add_host(HostSpec('test', '192.168.122.1'))
-        assert wait(cephadm_module, cephadm_module.get_hosts()) == [HostSpec('test', '192.168.122.1')]
+        assert wait(cephadm_module, cephadm_module.get_hosts()) == [
+            HostSpec('test', '192.168.122.1')]
         cephadm_module._add_host(HostSpec('test'))
-        assert wait(cephadm_module, cephadm_module.get_hosts()) == [HostSpec('test', '192.168.122.1')]
+        assert wait(cephadm_module, cephadm_module.get_hosts()) == [
+            HostSpec('test', '192.168.122.1')]
         with pytest.raises(OrchestratorError):
             cephadm_module._add_host(HostSpec('test2'))
@@ -894,7 +896,8 @@ class TestCephadm(object):
         ds = DriveSelection(dg, Devices([Device(path) for path in devices]))
         preview = preview
         out = cephadm_module.osd_service.driveselection_to_ceph_volume(ds, [], preview)
-        assert all(any(cmd in exp_cmd for exp_cmd in exp_commands) for cmd in out), f'Expected cmds from f{out} in {exp_commands}'
+        assert all(any(cmd in exp_cmd for exp_cmd in exp_commands)
+                   for cmd in out), f'Expected cmds from f{out} in {exp_commands}'
 
     @pytest.mark.parametrize(
         "devices, preview, exp_commands",
@@ -919,7 +922,8 @@ class TestCephadm(object):
         ds = DriveSelection(dg, Devices([Device(path) for path in devices]))
         preview = preview
         out = cephadm_module.osd_service.driveselection_to_ceph_volume(ds, [], preview)
-        assert all(any(cmd in exp_cmd for exp_cmd in exp_commands) for cmd in out), f'Expected cmds from f{out} in {exp_commands}'
+        assert all(any(cmd in exp_cmd for exp_cmd in exp_commands)
+                   for cmd in out), f'Expected cmds from f{out} in {exp_commands}'
 
     @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm(
         json.dumps([

@@ -368,15 +368,18 @@ class CephadmUpgrade:
             return [d for d in candidates if d.daemon_type in earlier_types]
 
         if self.upgrade_state:
-            raise OrchestratorError('Cannot set values for --daemon-types, --services or --hosts when upgrade already in progress.')
+            raise OrchestratorError(
+                'Cannot set values for --daemon-types, --services or --hosts when upgrade already in progress.')
         try:
-            target_id, target_version, target_digests = self.mgr.wait_async(CephadmServe(self.mgr)._get_container_image_info(target_name))
+            target_id, target_version, target_digests = self.mgr.wait_async(
+                CephadmServe(self.mgr)._get_container_image_info(target_name))
         except OrchestratorError as e:
             raise OrchestratorError(f'Failed to pull {target_name}: {str(e)}')
         # what we need to do here is build a list of daemons that must already be upgraded
         # in order for the user's selection of daemons to upgrade to be valid. for example,
         # if they say --daemon-types 'osd,mds' but mons have not been upgraded, we block.
-        daemons = [d for d in self.mgr.cache.get_daemons() if d.daemon_type not in MONITORING_STACK_TYPES]
+        daemons = [d for d in self.mgr.cache.get_daemons(
+        ) if d.daemon_type not in MONITORING_STACK_TYPES]
         err_msg_base = 'Cannot start upgrade. '
         # "dtypes" will later be filled in with the types of daemons that will be upgraded with the given parameters
         dtypes = []
@@ -394,7 +397,8 @@ class CephadmUpgrade:
             # for our purposes here we can effectively convert our list of services into the
             # set of daemon types the services contain. This works because we don't allow --services
             # and --daemon-types at the same time and we only allow services of the same type
-            sspecs = [self.mgr.spec_store[s].spec for s in services if self.mgr.spec_store[s].spec is not None]
+            sspecs = [
+                self.mgr.spec_store[s].spec for s in services if self.mgr.spec_store[s].spec is not None]
             stypes = list(set([s.service_type for s in sspecs]))
             if len(stypes) != 1:
                 raise OrchestratorError('Doing upgrade by service only support services of one type at '
@@ -414,15 +418,18 @@ class CephadmUpgrade:
             # that reside on hosts in the list of hosts we will upgrade. Then take the type from
             # that list that is latest in the upgrade order and check if any daemons on hosts not in the
             # provided list of hosts have a daemon with a type earlier in the upgrade order that is not upgraded.
-            dtypes = list(set([d.daemon_type for d in daemons if d.daemon_type is not None and d.hostname in hosts]))
-            other_hosts_daemons = [d for d in daemons if d.hostname is not None and d.hostname not in hosts]
+            dtypes = list(
+                set([d.daemon_type for d in daemons if d.daemon_type is not None and d.hostname in hosts]))
+            other_hosts_daemons = [
+                d for d in daemons if d.hostname is not None and d.hostname not in hosts]
             daemons = _get_earlier_daemons([_latest_type(dtypes)], other_hosts_daemons)
             err_msg_base += 'Daemons with types earlier in upgrade order than daemons on given host need upgrading.\n'
         need_upgrade_self, n1, n2, _ = self._detect_need_upgrade(daemons, target_digests)
         if need_upgrade_self and ('mgr' not in dtypes or (daemon_types is None and services is None)):
             # also report active mgr as needing to be upgraded. It is not included in the resulting list
             # by default as it is treated special and handled via the need_upgrade_self bool
-            n1.insert(0, (self.mgr.mgr_service.get_active_daemon(self.mgr.cache.get_daemons_by_type('mgr')), True))
+            n1.insert(0, (self.mgr.mgr_service.get_active_daemon(
+                self.mgr.cache.get_daemons_by_type('mgr')), True))
         if n1 or n2:
             raise OrchestratorError(f'{err_msg_base}Please first upgrade '
                                     f'{", ".join(list(set([d[0].name() for d in n1] + [d[0].name() for d in n2])))}\n'
@@ -780,7 +787,8 @@ class CephadmUpgrade:
         target_digests = []
         for d_entry in to_upgrade:
             if self.upgrade_state.remaining_count is not None and self.upgrade_state.remaining_count <= 0 and not d_entry[1]:
-                self.mgr.log.info(f'Hit upgrade limit of {self.upgrade_state.total_count}. Stopping upgrade')
+                self.mgr.log.info(
+                    f'Hit upgrade limit of {self.upgrade_state.total_count}. Stopping upgrade')
                 return
             d = d_entry[0]
             assert d.daemon_type is not None
@@ -822,7 +830,8 @@ class CephadmUpgrade:
                 self.upgrade_info_str = 'Currently upgrading %s daemons' % (d.daemon_type)
 
             if len(to_upgrade) > 1:
-                logger.info('Upgrade: Updating %s.%s (%d/%d)' % (d.daemon_type, d.daemon_id, num, min(len(to_upgrade), self.upgrade_state.remaining_count if self.upgrade_state.remaining_count is not None else 9999999)))
+                logger.info('Upgrade: Updating %s.%s (%d/%d)' % (d.daemon_type, d.daemon_id, num, min(len(to_upgrade),
+                            self.upgrade_state.remaining_count if self.upgrade_state.remaining_count is not None else 9999999)))
             else:
                 logger.info('Upgrade: Updating %s.%s' %
                             (d.daemon_type, d.daemon_id))
@@ -1038,15 +1047,19 @@ class CephadmUpgrade:
                 })
 
         if self.upgrade_state.daemon_types is not None:
-            logger.debug(f'Filtering daemons to upgrade by daemon types: {self.upgrade_state.daemon_types}')
-            daemons = [d for d in self.mgr.cache.get_daemons() if d.daemon_type in self.upgrade_state.daemon_types]
+            logger.debug(
+                f'Filtering daemons to upgrade by daemon types: {self.upgrade_state.daemon_types}')
+            daemons = [d for d in self.mgr.cache.get_daemons(
+            ) if d.daemon_type in self.upgrade_state.daemon_types]
         elif self.upgrade_state.services is not None:
-            logger.debug(f'Filtering daemons to upgrade by services: {self.upgrade_state.daemon_types}')
+            logger.debug(
+                f'Filtering daemons to upgrade by services: {self.upgrade_state.daemon_types}')
             daemons = []
             for service in self.upgrade_state.services:
                 daemons += self.mgr.cache.get_daemons_by_service(service)
         else:
-            daemons = [d for d in self.mgr.cache.get_daemons() if d.daemon_type in CEPH_UPGRADE_ORDER]
+            daemons = [d for d in self.mgr.cache.get_daemons(
+            ) if d.daemon_type in CEPH_UPGRADE_ORDER]
         if self.upgrade_state.hosts is not None:
             logger.debug(f'Filtering daemons to upgrade by hosts: {self.upgrade_state.hosts}')
             daemons = [d for d in daemons if d.hostname in self.upgrade_state.hosts]
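Behind the reflow, this block picks upgrade candidates in one of three mutually exclusive ways (explicit daemon types, services, or everything in the upgrade order) and then narrows by host. A reduced sketch of the type-and-host path, with a stand-in dataclass instead of cephadm's DaemonDescription:

    from dataclasses import dataclass
    from typing import List, Optional

    @dataclass
    class Daemon:  # stand-in for cephadm's DaemonDescription
        daemon_type: str
        hostname: str

    def select(all_daemons: List[Daemon],
               daemon_types: Optional[List[str]],
               hosts: Optional[List[str]]) -> List[Daemon]:
        # an explicit type list wins; otherwise every daemon is a candidate
        if daemon_types is not None:
            daemons = [d for d in all_daemons if d.daemon_type in daemon_types]
        else:
            daemons = list(all_daemons)
        # the host filter applies on top of whichever selection was made
        if hosts is not None:
            daemons = [d for d in daemons if d.hostname in hosts]
        return daemons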
@@ -1067,15 +1080,18 @@ class CephadmUpgrade:
             logger.debug('Upgrade: Checking %s daemons' % daemon_type)
             daemons_of_type = [d for d in daemons if d.daemon_type == daemon_type]
 
-            need_upgrade_self, need_upgrade, need_upgrade_deployer, done = self._detect_need_upgrade(daemons_of_type, target_digests)
+            need_upgrade_self, need_upgrade, need_upgrade_deployer, done = self._detect_need_upgrade(
+                daemons_of_type, target_digests)
             upgraded_daemon_count += done
             self._update_upgrade_progress(upgraded_daemon_count / len(daemons))
 
             # make sure mgr and monitoring stack daemons are properly redeployed in staggered upgrade scenarios
             if daemon_type == 'mgr' or daemon_type in MONITORING_STACK_TYPES:
                 if any(d in target_digests for d in self.mgr.get_active_mgr_digests()):
-                    need_upgrade_names = [d[0].name() for d in need_upgrade] + [d[0].name() for d in need_upgrade_deployer]
-                    dds = [d for d in self.mgr.cache.get_daemons_by_type(daemon_type) if d.name() not in need_upgrade_names]
+                    need_upgrade_names = [d[0].name() for d in need_upgrade] + \
+                        [d[0].name() for d in need_upgrade_deployer]
+                    dds = [d for d in self.mgr.cache.get_daemons_by_type(
+                        daemon_type) if d.name() not in need_upgrade_names]
                     need_upgrade_active, n1, n2, __ = self._detect_need_upgrade(dds, target_digests)
                     if not n1:
                         if not need_upgrade_self and need_upgrade_active:
@@ -1113,7 +1129,8 @@ class CephadmUpgrade:
                 # following bits of _do_upgrade are for completing upgrade for given
                 # types. If we haven't actually finished upgrading all the daemons
                 # of this type, we should exit the loop here
-                _, n1, n2, _ = self._detect_need_upgrade(self.mgr.cache.get_daemons_by_type(daemon_type), target_digests)
+                _, n1, n2, _ = self._detect_need_upgrade(
+                    self.mgr.cache.get_daemons_by_type(daemon_type), target_digests)
                 if n1 or n2:
                     continue

@@ -822,7 +822,8 @@ Usage:
                 values.remove(v)
         for dev_type in ['data_devices', 'db_devices', 'wal_devices', 'journal_devices']:
-            drive_group_spec[dev_type] = DeviceSelection(paths=drive_group_spec[dev_type]) if drive_group_spec.get(dev_type) else None
+            drive_group_spec[dev_type] = DeviceSelection(
+                paths=drive_group_spec[dev_type]) if drive_group_spec.get(dev_type) else None
 
         drive_group = DriveGroupSpec(
             placement=PlacementSpec(host_pattern=host_name),
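For context, the reflowed loop normalizes each device category: a plain list of paths becomes a DeviceSelection, and absent categories stay None. A standalone sketch with made-up paths, assuming the usual ceph.deployment import:

    from ceph.deployment.drive_group import DeviceSelection

    drive_group_spec = {'data_devices': ['/dev/sdb', '/dev/sdc']}  # made-up input
    for dev_type in ['data_devices', 'db_devices', 'wal_devices', 'journal_devices']:
        drive_group_spec[dev_type] = (DeviceSelection(paths=drive_group_spec[dev_type])
                                      if drive_group_spec.get(dev_type) else None)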

@@ -1062,7 +1062,8 @@ class Module(MgrModule):
         for server in self.list_servers():
             host = cast(str, server.get('hostname', ''))
             for service in cast(List[ServiceInfoT], server.get('services', [])):
-                ret.update({(service['id'], service['type']): (host, service['ceph_version'], service.get('name', ''))})
+                ret.update({(service['id'], service['type']): (
+                    host, service['ceph_version'], service.get('name', ''))})
         return ret
 
     @profile_method()
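The wrapped ret.update(...) populates the mapping this method returns: keys are (service id, service type) pairs, values are (host, ceph version string, daemon name) triples. Illustratively, with made-up entries:

    # hypothetical shape of 'ret', purely for illustration
    {
        ('a', 'mon'): ('host1', 'ceph version 17.2.0 (...) quincy (stable)', ''),
        ('0', 'osd'): ('host2', 'ceph version 17.2.0 (...) quincy (stable)', 'osd.0'),
    }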