Merge pull request #38883 from sebastian-philipp/cephadm-keep-deleted-specs

mgr/cephadm: Purge deleted services

Reviewed-by: Adam King <adking@redhat.com>
This commit is contained in:
Sebastian Wagner 2021-02-12 01:07:15 +01:00 committed by GitHub
commit 3016de8771
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 269 additions and 124 deletions

View File

@ -3544,10 +3544,17 @@ def prepare_ssh(
except RuntimeError as e:
raise Error('Failed to add host <%s>: %s' % (host, e))
if not ctx.orphan_initial_daemons:
for t in ['mon', 'mgr', 'crash']:
for t in ['mon', 'mgr', 'crash']:
if ctx.orphan_initial_daemons:
logger.info('Deploying %s service with default placement...' % t)
cli(['orch', 'apply', t])
cli(['orch', 'apply', t, '--unmanaged'])
else:
logger.info('Deploying unmanaged %s service...' % t)
cli(['orch', 'apply', t, '--unmanaged'])
if not ctx.orphan_initial_daemons:
logger.info('Deploying crash service with default placement...')
cli(['orch', 'apply', 'crash'])
if not ctx.skip_monitoring_stack:
logger.info('Enabling mgr prometheus module...')
@ -7389,7 +7396,7 @@ def _get_parser():
parser_bootstrap.add_argument(
'--orphan-initial-daemons',
action='store_true',
help='Do not create initial mon, mgr, and crash service specs')
help='Set mon and mgr service to `unmanaged`, Do not create the crash service')
parser_bootstrap.add_argument(
'--skip-monitoring-stack',
action='store_true',

View File

@ -2,7 +2,8 @@ import datetime
from copy import copy
import json
import logging
from typing import cast, TYPE_CHECKING, Dict, List, Iterator, Optional, Any, Tuple, Set
from typing import TYPE_CHECKING, Dict, List, Iterator, Optional, Any, Tuple, Set, Mapping, cast, \
NamedTuple
import orchestrator
from ceph.deployment import inventory
@ -113,14 +114,42 @@ class Inventory:
self.mgr.set_store('inventory', json.dumps(self._inventory))
class SpecDescription(NamedTuple):
spec: ServiceSpec
created: datetime.datetime
deleted: Optional[datetime.datetime]
class SpecStore():
def __init__(self, mgr):
# type: (CephadmOrchestrator) -> None
self.mgr = mgr
self.specs = {} # type: Dict[str, ServiceSpec]
self._specs = {} # type: Dict[str, ServiceSpec]
self.spec_created = {} # type: Dict[str, datetime.datetime]
self.spec_deleted = {} # type: Dict[str, datetime.datetime]
self.spec_preview = {} # type: Dict[str, ServiceSpec]
@property
def all_specs(self) -> Mapping[str, ServiceSpec]:
"""
returns active and deleted specs. Returns read-only dict.
"""
return self._specs
def __contains__(self, name: str) -> bool:
return name in self._specs
def __getitem__(self, name: str) -> SpecDescription:
if name not in self._specs:
raise OrchestratorError(f'Service {name} not found.')
return SpecDescription(self._specs[name],
self.spec_created[name],
self.spec_deleted.get(name, None))
@property
def active_specs(self) -> Mapping[str, ServiceSpec]:
return {k: v for k, v in self._specs.items() if k not in self.spec_deleted}
def load(self):
# type: () -> None
for k, v in self.mgr.get_store_prefix(SPEC_STORE_PREFIX).items():
@ -129,8 +158,13 @@ class SpecStore():
j = cast(Dict[str, dict], json.loads(v))
spec = ServiceSpec.from_json(j['spec'])
created = str_to_datetime(cast(str, j['created']))
self.specs[service_name] = spec
self._specs[service_name] = spec
self.spec_created[service_name] = created
if 'deleted' in v:
deleted = str_to_datetime(cast(str, j['deleted']))
self.spec_deleted[service_name] = deleted
self.mgr.log.debug('SpecStore: loaded spec for %s' % (
service_name))
except Exception as e:
@ -138,42 +172,52 @@ class SpecStore():
service_name, e))
pass
def save(self, spec):
# type: (ServiceSpec) -> None
def save(self, spec: ServiceSpec, update_create: bool = True) -> None:
name = spec.service_name()
if spec.preview_only:
self.spec_preview[spec.service_name()] = spec
self.spec_preview[name] = spec
return None
self.specs[spec.service_name()] = spec
self.spec_created[spec.service_name()] = datetime_now()
self._specs[name] = spec
if update_create:
self.spec_created[name] = datetime_now()
data = {
'spec': spec.to_json(),
'created': datetime_to_str(self.spec_created[name]),
}
if name in self.spec_deleted:
data['deleted'] = datetime_to_str(self.spec_deleted[name])
self.mgr.set_store(
SPEC_STORE_PREFIX + spec.service_name(),
json.dumps({
'spec': spec.to_json(),
'created': datetime_to_str(self.spec_created[spec.service_name()]),
}, sort_keys=True),
SPEC_STORE_PREFIX + name,
json.dumps(data, sort_keys=True),
)
self.mgr.events.for_service(spec, OrchestratorEvent.INFO, 'service was created')
def rm(self, service_name):
def rm(self, service_name: str) -> bool:
if service_name not in self._specs:
return False
if self._specs[service_name].preview_only:
self.finally_rm(service_name)
return True
self.spec_deleted[service_name] = datetime_now()
self.save(self._specs[service_name], update_create=False)
return True
def finally_rm(self, service_name):
# type: (str) -> bool
found = service_name in self.specs
found = service_name in self._specs
if found:
del self.specs[service_name]
del self._specs[service_name]
del self.spec_created[service_name]
if service_name in self.spec_deleted:
del self.spec_deleted[service_name]
self.mgr.set_store(SPEC_STORE_PREFIX + service_name, None)
return found
def find(self, service_name: Optional[str] = None) -> List[ServiceSpec]:
specs = []
for sn, spec in self.specs.items():
if not service_name or \
sn == service_name or \
sn.startswith(service_name + '.'):
specs.append(spec)
self.mgr.log.debug('SpecStore: find spec for %s returned: %s' % (
service_name, specs))
return specs
def get_created(self, spec: ServiceSpec) -> Optional[datetime.datetime]:
return self.spec_created.get(spec.service_name())
@ -746,7 +790,7 @@ class EventStore():
unknowns: List[str] = []
daemons = self.mgr.cache.get_daemon_names()
specs = self.mgr.spec_store.specs.keys()
specs = self.mgr.spec_store.all_specs.keys()
for k_s, v in self.events.items():
kind, subject = k_s.split(':')
if kind == 'service':

View File

@ -76,7 +76,7 @@ class Migrations:
"""
def interesting_specs() -> Iterator[ServiceSpec]:
for s in self.mgr.spec_store.specs.values():
for s in self.mgr.spec_store.all_specs.values():
if s.unmanaged:
continue
p = s.placement
@ -147,17 +147,17 @@ class Migrations:
This fixes the data structure consistency
"""
bad_specs = {}
for name, spec in self.mgr.spec_store.specs.items():
for name, spec in self.mgr.spec_store.all_specs.items():
if name != spec.service_name():
bad_specs[name] = (spec.service_name(), spec)
for old, (new, old_spec) in bad_specs.items():
if new not in self.mgr.spec_store.specs:
if new not in self.mgr.spec_store.all_specs:
spec = old_spec
else:
spec = self.mgr.spec_store.specs[new]
spec = self.mgr.spec_store.all_specs[new]
spec.unmanaged = True
self.mgr.spec_store.save(spec)
self.mgr.spec_store.rm(old)
self.mgr.spec_store.finally_rm(old)
return True

View File

@ -1486,8 +1486,8 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
if not dd.osdspec_affinity:
# If there is no osdspec_affinity, the spec should suffice for displaying
continue
if n in self.spec_store.specs:
spec = self.spec_store.specs[n]
if n in self.spec_store.all_specs:
spec = self.spec_store.all_specs[n]
else:
spec = ServiceSpec(
unmanaged=True,
@ -1505,7 +1505,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
spec=spec,
events=self.events.get_for_service(spec.service_name()),
)
if n in self.spec_store.specs:
if n in self.spec_store.all_specs:
if dd.daemon_type == 'osd':
"""
The osd count can't be determined by the Placement spec.
@ -1519,6 +1519,8 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
self.inventory.all_specs())
sm[n].created = self.spec_store.spec_created[n]
sm[n].deleted = self.spec_store.spec_deleted.get(n, None)
if service_type == 'nfs':
spec = cast(NFSServiceSpec, spec)
sm[n].rados_config_location = spec.rados_config_location()
@ -1535,7 +1537,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
if dd.daemon_type == 'haproxy' or dd.daemon_type == 'keepalived':
# ha-rgw has 2 daemons running per host
sm[n].size = sm[n].size * 2
for n, spec in self.spec_store.specs.items():
for n, spec in self.spec_store.all_specs.items():
if n in sm:
continue
if service_type is not None and service_type != spec.service_type:
@ -1693,12 +1695,13 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
def remove_service(self, service_name: str) -> str:
self.log.info('Remove service %s' % service_name)
self._trigger_preview_refresh(service_name=service_name)
if service_name in self.spec_store:
if self.spec_store[service_name].spec.service_type in ('mon', 'mgr'):
return f'Unable to remove {service_name} service.\n' \
f'Note, you might want to mark the {service_name} service as "unmanaged"'
found = self.spec_store.rm(service_name)
if found:
self._kick_serve_loop()
service = self.cephadm_services.get(service_name, None)
if service:
service.purge()
return 'Removed service %s' % service_name
else:
# must be idempotent: still a success.
@ -1897,6 +1900,11 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
Add (and place) a daemon. Require explicit host placement. Do not
schedule, and do not apply the related scheduling limitations.
"""
if spec.service_name() not in self.spec_store:
raise OrchestratorError('Unable to add a Daemon without Service.\n'
'Please use `ceph orch apply ...` to create a Service.\n'
'Note, you might want to create the service with "unmanaged=true"')
self.log.debug('_add_daemon %s spec %s' % (daemon_type, spec.placement))
if not spec.placement.hosts:
raise OrchestratorError('must specify host(s) to deploy on')

View File

@ -85,6 +85,8 @@ class CephadmServe:
self._check_daemons()
self._purge_deleted_services()
if self.mgr.upgrade.continue_upgrade():
continue
@ -420,7 +422,7 @@ class CephadmServe:
def _apply_all_services(self) -> bool:
r = False
specs = [] # type: List[ServiceSpec]
for sn, spec in self.mgr.spec_store.specs.items():
for sn, spec in self.mgr.spec_store.active_specs.items():
specs.append(spec)
for spec in specs:
try:
@ -607,7 +609,7 @@ class CephadmServe:
daemons_post: Dict[str, List[orchestrator.DaemonDescription]] = defaultdict(list)
for dd in daemons:
# orphan?
spec = self.mgr.spec_store.specs.get(dd.service_name(), None)
spec = self.mgr.spec_store.active_specs.get(dd.service_name(), None)
assert dd.hostname is not None
assert dd.daemon_type is not None
assert dd.daemon_id is not None
@ -686,6 +688,21 @@ class CephadmServe:
self.mgr._get_cephadm_service(daemon_type_to_service(
daemon_type)).daemon_check_post(daemon_descs)
def _purge_deleted_services(self) -> None:
existing_services = self.mgr.spec_store.all_specs.items()
for service_name, spec in list(existing_services):
if service_name not in self.mgr.spec_store.spec_deleted:
continue
if self.mgr.cache.get_daemons_by_service(service_name):
continue
if spec.service_type in ['mon', 'mgr']:
continue
logger.info(f'Purge service {service_name}')
self.mgr.cephadm_services[spec.service_type].purge(service_name)
self.mgr.spec_store.finally_rm(service_name)
def convert_tags_to_repo_digest(self) -> None:
if not self.mgr.use_repo_digest:
return

View File

@ -280,7 +280,7 @@ class CephadmService(metaclass=ABCMeta):
assert self.TYPE == daemon_type_to_service(daemon.daemon_type)
logger.debug(f'Post remove daemon {self.TYPE}.{daemon.daemon_id}')
def purge(self) -> None:
def purge(self, service_name: str) -> None:
"""Called to carry out any purge tasks following service removal"""
logger.debug(f'Purge called for {self.TYPE} - no action taken')
@ -594,6 +594,13 @@ class MdsService(CephService):
# if no mds found, return empty Daemon Desc
return DaemonDescription()
def purge(self, service_name: str) -> None:
self.mgr.check_mon_command({
'prefix': 'config rm',
'who': service_name,
'name': 'mds_join_fs',
})
class RgwService(CephService):
TYPE = 'rgw'
@ -654,6 +661,7 @@ class RgwService(CephService):
'val': key_data,
})
# TODO: fail, if we don't have a spec
logger.info('Saving service %s spec with placement %s' % (
spec.service_name(), spec.placement.pretty_str()))
self.mgr.spec_store.save(spec)
@ -973,6 +981,6 @@ class CephadmExporter(CephadmService):
}
return config, deps
def purge(self) -> None:
def purge(self, service_name: str) -> None:
logger.info("Purging cephadm-exporter settings from mon K/V store")
self.mgr._clear_exporter_config_settings()

View File

@ -63,14 +63,15 @@ class HA_RGWService(CephService):
service_name: str = "ha-rgw." + daemon_id.split('.')[0]
# if no service spec, return empty config
if not daemon_spec.spec and service_name not in self.mgr.spec_store.specs:
# TODO: fail, if we don't have any spec
if not daemon_spec.spec and service_name not in self.mgr.spec_store.all_specs:
config_files = {'files': {}} # type: Dict[str, Any]
return config_files, []
elif daemon_spec.spec:
spec = daemon_spec.spec
else:
# service spec is not attached to daemon spec but is in spec store
spec = cast(HA_RGWSpec, self.mgr.spec_store.specs[service_name])
spec = cast(HA_RGWSpec, self.mgr.spec_store.all_specs[service_name])
rgw_daemons = self.mgr.cache.get_daemons_by_type('rgw')
rgw_servers = []
@ -110,14 +111,15 @@ class HA_RGWService(CephService):
service_name: str = "ha-rgw." + daemon_id.split('.')[0]
# if no service spec, return empty config
if not daemon_spec.spec and service_name not in self.mgr.spec_store.specs:
# TODO: In case of no spec, fail here
if not daemon_spec.spec and service_name not in self.mgr.spec_store.all_specs:
config_file = {'files': {}} # type: Dict[str, Any]
return config_file, []
elif daemon_spec.spec:
spec = daemon_spec.spec
else:
# service spec is not attached to daemon spec but is in spec store
spec = cast(HA_RGWSpec, self.mgr.spec_store.specs[service_name])
spec = cast(HA_RGWSpec, self.mgr.spec_store.all_specs[service_name])
all_hosts = []
for h, network, name in spec.definitive_host_list:

View File

@ -19,6 +19,7 @@ class IscsiService(CephService):
assert spec.pool
self.mgr._check_pool_exists(spec.pool, spec.service_name())
# TODO: remove this:
logger.info('Saving service %s spec with placement %s' % (
spec.service_name(), spec.placement.pretty_str()))
self.mgr.spec_store.save(spec)
@ -77,8 +78,9 @@ class IscsiService(CephService):
def get_set_cmd_dicts(out: str) -> List[dict]:
gateways = json.loads(out)['gateways']
cmd_dicts = []
# TODO: fail, if we don't have a spec
spec = cast(IscsiServiceSpec,
self.mgr.spec_store.specs.get(daemon_descrs[0].service_name(), None))
self.mgr.spec_store.all_specs.get(daemon_descrs[0].service_name(), None))
if spec.api_secure and spec.ssl_cert and spec.ssl_key:
cmd_dicts.append({
'prefix': 'dashboard set-iscsi-api-ssl-verification',
@ -91,8 +93,9 @@ class IscsiService(CephService):
})
for dd in daemon_descrs:
assert dd.hostname is not None
# todo: this can fail:
spec = cast(IscsiServiceSpec,
self.mgr.spec_store.specs.get(dd.service_name(), None))
self.mgr.spec_store.all_specs.get(dd.service_name(), None))
if not spec:
logger.warning('No ServiceSpec found for %s', dd)
continue

View File

@ -19,6 +19,7 @@ class NFSService(CephService):
assert spec.pool
self.mgr._check_pool_exists(spec.pool, spec.service_name())
# TODO: Fail here, in case of no spec
logger.info('Saving service %s spec with placement %s' % (
spec.service_name(), spec.placement.pretty_str()))
self.mgr.spec_store.save(spec)

View File

@ -101,17 +101,33 @@ def with_host(m: CephadmOrchestrator, name, refresh_hosts=True):
wait(m, m.remove_host(name))
def assert_rm_service(cephadm, srv_name):
def assert_rm_service(cephadm: CephadmOrchestrator, srv_name):
mon_or_mgr = cephadm.spec_store[srv_name].spec.service_type in ('mon', 'mgr')
if mon_or_mgr:
assert 'Unable' in wait(cephadm, cephadm.remove_service(srv_name))
return
assert wait(cephadm, cephadm.remove_service(srv_name)) == f'Removed service {srv_name}'
assert cephadm.spec_store[srv_name].deleted is not None
CephadmServe(cephadm)._check_daemons()
CephadmServe(cephadm)._apply_all_services()
assert cephadm.spec_store[srv_name].deleted
unmanaged = cephadm.spec_store[srv_name].spec.unmanaged
CephadmServe(cephadm)._purge_deleted_services()
if not unmanaged: # cause then we're not deleting daemons
assert srv_name not in cephadm.spec_store, f'{cephadm.spec_store[srv_name]!r}'
@contextmanager
def with_service(cephadm_module: CephadmOrchestrator, spec: ServiceSpec, meth, host: str) -> Iterator[List[str]]:
if spec.placement.is_empty():
def with_service(cephadm_module: CephadmOrchestrator, spec: ServiceSpec, meth=None, host: str = '') -> Iterator[List[str]]:
if spec.placement.is_empty() and host:
spec.placement = PlacementSpec(hosts=[host], count=1)
c = meth(cephadm_module, spec)
assert wait(cephadm_module, c) == f'Scheduled {spec.service_name()} update...'
if meth is not None:
c = meth(cephadm_module, spec)
assert wait(cephadm_module, c) == f'Scheduled {spec.service_name()} update...'
else:
c = cephadm_module.apply([spec])
assert wait(cephadm_module, c) == [f'Scheduled {spec.service_name()} update...']
specs = [d.spec for d in wait(cephadm_module, cephadm_module.describe_service())]
assert spec in specs
@ -119,7 +135,8 @@ def with_service(cephadm_module: CephadmOrchestrator, spec: ServiceSpec, meth, h
dds = wait(cephadm_module, cephadm_module.list_daemons())
own_dds = [dd for dd in dds if dd.service_name() == spec.service_name()]
assert own_dds
if host:
assert own_dds
yield [dd.name() for dd in own_dds]

View File

@ -24,7 +24,7 @@ from orchestrator import DaemonDescription, InventoryHost, \
HostSpec, OrchestratorError
from tests import mock
from .fixtures import wait, _run_cephadm, match_glob, with_host, \
with_cephadm_module, with_service, assert_rm_service, _deploy_cephadm_binary
with_cephadm_module, with_service, _deploy_cephadm_binary
from cephadm.module import CephadmOrchestrator
"""
@ -106,8 +106,8 @@ class TestCephadm(object):
with with_host(cephadm_module, 'test'):
c = cephadm_module.list_daemons(refresh=True)
assert wait(cephadm_module, c) == []
with with_daemon(cephadm_module, ServiceSpec('mds', 'name'), CephadmOrchestrator.add_mds, 'test'):
with with_service(cephadm_module, ServiceSpec('mds', 'name', unmanaged=True)) as _, \
with_daemon(cephadm_module, ServiceSpec('mds', 'name'), CephadmOrchestrator.add_mds, 'test') as _:
c = cephadm_module.list_daemons()
@ -132,11 +132,11 @@ class TestCephadm(object):
out = [dict(o.to_json()) for o in wait(cephadm_module, c)]
expected = [
{
'placement': {'hosts': ['test']},
'placement': {'count': 2},
'service_id': 'name',
'service_name': 'mds.name',
'service_type': 'mds',
'status': {'running': 1, 'size': 0},
'status': {'created': mock.ANY, 'running': 1, 'size': 2},
'unmanaged': True
},
{
@ -189,7 +189,8 @@ class TestCephadm(object):
def test_daemon_action(self, cephadm_module: CephadmOrchestrator):
cephadm_module.service_cache_timeout = 10
with with_host(cephadm_module, 'test'):
with with_daemon(cephadm_module, RGWSpec(service_id='myrgw.foobar'), CephadmOrchestrator.add_rgw, 'test') as daemon_id:
with with_service(cephadm_module, RGWSpec(service_id='myrgw.foobar', unmanaged=True)) as _, \
with_daemon(cephadm_module, RGWSpec(service_id='myrgw.foobar'), CephadmOrchestrator.add_rgw, 'test') as daemon_id:
c = cephadm_module.daemon_action('redeploy', 'rgw.' + daemon_id)
assert wait(cephadm_module,
@ -214,7 +215,8 @@ class TestCephadm(object):
def test_daemon_action_fail(self, cephadm_module: CephadmOrchestrator):
cephadm_module.service_cache_timeout = 10
with with_host(cephadm_module, 'test'):
with with_daemon(cephadm_module, RGWSpec(service_id='myrgw.foobar'), CephadmOrchestrator.add_rgw, 'test') as daemon_id:
with with_service(cephadm_module, RGWSpec(service_id='myrgw.foobar', unmanaged=True)) as _, \
with_daemon(cephadm_module, RGWSpec(service_id='myrgw.foobar'), CephadmOrchestrator.add_rgw, 'test') as daemon_id:
with mock.patch('ceph_module.BaseMgrModule._ceph_send_command') as _ceph_send_command:
_ceph_send_command.side_effect = Exception("myerror")
@ -318,14 +320,15 @@ class TestCephadm(object):
@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
def test_mon_add(self, cephadm_module):
with with_host(cephadm_module, 'test'):
ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
c = cephadm_module.add_mon(ServiceSpec('mon', placement=ps))
assert wait(cephadm_module, c) == ["Deployed mon.a on host 'test'"]
with pytest.raises(OrchestratorError, match="Must set public_network config option or specify a CIDR network,"):
ps = PlacementSpec(hosts=['test'], count=1)
with with_service(cephadm_module, ServiceSpec(service_type='mon', unmanaged=True)):
ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
c = cephadm_module.add_mon(ServiceSpec('mon', placement=ps))
wait(cephadm_module, c)
assert wait(cephadm_module, c) == ["Deployed mon.a on host 'test'"]
with pytest.raises(OrchestratorError, match="Must set public_network config option or specify a CIDR network,"):
ps = PlacementSpec(hosts=['test'], count=1)
c = cephadm_module.add_mon(ServiceSpec('mon', placement=ps))
wait(cephadm_module, c)
@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
def test_mgr_update(self, cephadm_module):
@ -527,19 +530,20 @@ class TestCephadm(object):
def test_rgw_update(self, cephadm_module):
with with_host(cephadm_module, 'host1'):
with with_host(cephadm_module, 'host2'):
ps = PlacementSpec(hosts=['host1'], count=1)
c = cephadm_module.add_rgw(
RGWSpec(rgw_realm='realm', rgw_zone='zone1', placement=ps))
[out] = wait(cephadm_module, c)
match_glob(out, "Deployed rgw.realm.zone1.host1.* on host 'host1'")
with with_service(cephadm_module, RGWSpec(rgw_realm='realm', rgw_zone='zone1', unmanaged=True)):
ps = PlacementSpec(hosts=['host1'], count=1)
c = cephadm_module.add_rgw(
RGWSpec(rgw_realm='realm', rgw_zone='zone1', placement=ps))
[out] = wait(cephadm_module, c)
match_glob(out, "Deployed rgw.realm.zone1.host1.* on host 'host1'")
ps = PlacementSpec(hosts=['host1', 'host2'], count=2)
r = CephadmServe(cephadm_module)._apply_service(
RGWSpec(rgw_realm='realm', rgw_zone='zone1', placement=ps))
assert r
ps = PlacementSpec(hosts=['host1', 'host2'], count=2)
r = CephadmServe(cephadm_module)._apply_service(
RGWSpec(rgw_realm='realm', rgw_zone='zone1', placement=ps))
assert r
assert_rm_daemon(cephadm_module, 'rgw.realm.zone1', 'host1')
assert_rm_daemon(cephadm_module, 'rgw.realm.zone1', 'host2')
assert_rm_daemon(cephadm_module, 'rgw.realm.zone1', 'host1')
assert_rm_daemon(cephadm_module, 'rgw.realm.zone1', 'host2')
@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm(
json.dumps([
@ -580,9 +584,12 @@ class TestCephadm(object):
@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
@mock.patch("cephadm.services.cephadmservice.RgwService.create_realm_zonegroup_zone", lambda _, __, ___: None)
def test_daemon_add(self, spec: ServiceSpec, meth, cephadm_module):
unmanaged_spec = ServiceSpec.from_json(spec.to_json())
unmanaged_spec.unmanaged = True
with with_host(cephadm_module, 'test'):
with with_daemon(cephadm_module, spec, meth, 'test'):
pass
with with_service(cephadm_module, unmanaged_spec):
with with_daemon(cephadm_module, spec, meth, 'test'):
pass
@mock.patch("cephadm.serve.CephadmServe._run_cephadm")
def test_daemon_add_fail(self, _run_cephadm, cephadm_module):
@ -590,15 +597,17 @@ class TestCephadm(object):
with with_host(cephadm_module, 'test'):
spec = ServiceSpec(
service_type='mgr',
placement=PlacementSpec(hosts=[HostPlacementSpec('test', '', 'x')], count=1)
placement=PlacementSpec(hosts=[HostPlacementSpec('test', '', 'x')], count=1),
unmanaged=True
)
_run_cephadm.side_effect = OrchestratorError('fail')
with pytest.raises(OrchestratorError):
wait(cephadm_module, cephadm_module.add_mgr(spec))
cephadm_module.assert_issued_mon_command({
'prefix': 'auth rm',
'entity': 'mgr.x',
})
with with_service(cephadm_module, spec):
_run_cephadm.side_effect = OrchestratorError('fail')
with pytest.raises(OrchestratorError):
wait(cephadm_module, cephadm_module.add_mgr(spec))
cephadm_module.assert_issued_mon_command({
'prefix': 'auth rm',
'entity': 'mgr.x',
})
@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
@mock.patch("cephadm.module.CephadmOrchestrator.rados", mock.MagicMock())
@ -610,16 +619,14 @@ class TestCephadm(object):
pool='pool',
namespace='namespace',
placement=ps)
c = cephadm_module.add_nfs(spec)
[out] = wait(cephadm_module, c)
match_glob(out, "Deployed nfs.name.* on host 'test'")
unmanaged_spec = ServiceSpec.from_json(spec.to_json())
unmanaged_spec.unmanaged = True
with with_service(cephadm_module, unmanaged_spec):
c = cephadm_module.add_nfs(spec)
[out] = wait(cephadm_module, c)
match_glob(out, "Deployed nfs.name.* on host 'test'")
assert_rm_daemon(cephadm_module, 'nfs.name.test', 'test')
# Hack. We never created the service, but we now need to remove it.
# this is in contrast to the other services, which don't create this service
# automatically.
assert_rm_service(cephadm_module, 'nfs.name')
assert_rm_daemon(cephadm_module, 'nfs.name.test', 'test')
@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
@mock.patch("cephadm.module.CephadmOrchestrator.rados", mock.MagicMock())
@ -632,16 +639,15 @@ class TestCephadm(object):
api_user='user',
api_password='password',
placement=ps)
c = cephadm_module.add_iscsi(spec)
[out] = wait(cephadm_module, c)
match_glob(out, "Deployed iscsi.name.* on host 'test'")
unmanaged_spec = ServiceSpec.from_json(spec.to_json())
unmanaged_spec.unmanaged = True
with with_service(cephadm_module, unmanaged_spec):
assert_rm_daemon(cephadm_module, 'iscsi.name.test', 'test')
c = cephadm_module.add_iscsi(spec)
[out] = wait(cephadm_module, c)
match_glob(out, "Deployed iscsi.name.* on host 'test'")
# Hack. We never created the service, but we now need to remove it.
# this is in contrast to the other services, which don't create this service
# automatically.
assert_rm_service(cephadm_module, 'iscsi.name')
assert_rm_daemon(cephadm_module, 'iscsi.name.test', 'test')
@pytest.mark.parametrize(
"on_bool",
@ -767,6 +773,25 @@ class TestCephadm(object):
with with_service(cephadm_module, spec, meth, 'test'):
pass
@mock.patch("cephadm.serve.CephadmServe._deploy_cephadm_binary", _deploy_cephadm_binary('test'))
@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
def test_mds_config_purge(self, cephadm_module: CephadmOrchestrator):
spec = ServiceSpec('mds', service_id='fsname')
with with_host(cephadm_module, 'test'):
with with_service(cephadm_module, spec, host='test'):
ret, out, err = cephadm_module.check_mon_command({
'prefix': 'config get',
'who': spec.service_name(),
'key': 'mds_join_fs',
})
assert out == 'fsname'
ret, out, err = cephadm_module.check_mon_command({
'prefix': 'config get',
'who': spec.service_name(),
'key': 'mds_join_fs',
})
assert not out
@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
@mock.patch("cephadm.services.cephadmservice.CephadmService.ok_to_stop")
def test_daemon_ok_to_stop(self, ok_to_stop, cephadm_module: CephadmOrchestrator):

View File

@ -74,15 +74,15 @@ def test_migrate_service_id_mon_one(cephadm_module: CephadmOrchestrator):
cephadm_module.spec_store.load()
assert len(cephadm_module.spec_store.specs) == 1
assert cephadm_module.spec_store.specs['mon.wrong'].service_name() == 'mon'
assert len(cephadm_module.spec_store.all_specs) == 1
assert cephadm_module.spec_store.all_specs['mon.wrong'].service_name() == 'mon'
cephadm_module.migration_current = 1
cephadm_module.migration.migrate()
assert cephadm_module.migration_current == 2
assert len(cephadm_module.spec_store.specs) == 1
assert cephadm_module.spec_store.specs['mon'] == ServiceSpec(
assert len(cephadm_module.spec_store.all_specs) == 1
assert cephadm_module.spec_store.all_specs['mon'] == ServiceSpec(
service_type='mon',
unmanaged=True,
placement=PlacementSpec(hosts=['host1'])
@ -116,16 +116,16 @@ def test_migrate_service_id_mon_two(cephadm_module: CephadmOrchestrator):
cephadm_module.spec_store.load()
assert len(cephadm_module.spec_store.specs) == 2
assert cephadm_module.spec_store.specs['mon.wrong'].service_name() == 'mon'
assert cephadm_module.spec_store.specs['mon'].service_name() == 'mon'
assert len(cephadm_module.spec_store.all_specs) == 2
assert cephadm_module.spec_store.all_specs['mon.wrong'].service_name() == 'mon'
assert cephadm_module.spec_store.all_specs['mon'].service_name() == 'mon'
cephadm_module.migration_current = 1
cephadm_module.migration.migrate()
assert cephadm_module.migration_current == 2
assert len(cephadm_module.spec_store.specs) == 1
assert cephadm_module.spec_store.specs['mon'] == ServiceSpec(
assert len(cephadm_module.spec_store.all_specs) == 1
assert cephadm_module.spec_store.all_specs['mon'] == ServiceSpec(
service_type='mon',
unmanaged=True,
placement=PlacementSpec(count=5)
@ -149,4 +149,4 @@ def test_migrate_service_id_mds_one(cephadm_module: CephadmOrchestrator):
cephadm_module.spec_store.load()
# there is nothing to migrate, as the spec is gone now.
assert len(cephadm_module.spec_store.specs) == 0
assert len(cephadm_module.spec_store.all_specs) == 0

View File

@ -1480,6 +1480,7 @@ class ServiceDescription(object):
service_url: Optional[str] = None,
last_refresh: Optional[datetime.datetime] = None,
created: Optional[datetime.datetime] = None,
deleted: Optional[datetime.datetime] = None,
size: int = 0,
running: int = 0,
events: Optional[List['OrchestratorEvent']] = None) -> None:
@ -1506,6 +1507,7 @@ class ServiceDescription(object):
# datetime when this info was last refreshed
self.last_refresh: Optional[datetime.datetime] = last_refresh
self.created: Optional[datetime.datetime] = created
self.deleted: Optional[datetime.datetime] = deleted
self.spec: ServiceSpec = spec

View File

@ -560,7 +560,7 @@ class OrchestratorCli(OrchestratorClientMixin, MgrModule,
return HandleCommandResult(stdout="No services reported")
elif format != Format.plain:
if export:
data = [s.spec for s in services]
data = [s.spec for s in services if s.deleted is None]
return HandleCommandResult(stdout=to_format(data, format, many=True, cls=ServiceSpec))
else:
return HandleCommandResult(stdout=to_format(services, format, many=True, cls=ServiceDescription))
@ -588,10 +588,15 @@ class OrchestratorCli(OrchestratorClientMixin, MgrModule,
pl = '<unmanaged>'
else:
pl = s.spec.placement.pretty_str()
if s.deleted:
refreshed = '<deleting>'
else:
refreshed = nice_delta(now, s.last_refresh, ' ago')
table.add_row((
s.spec.service_name(),
'%d/%d' % (s.running, s.size),
nice_delta(now, s.last_refresh, ' ago'),
refreshed,
nice_delta(now, s.created),
pl,
ukn(s.container_image_name),

View File

@ -117,6 +117,10 @@ if 'UNITTEST' in os.environ:
self.mock_store_set('config', f'{cmd["who"]}/{cmd["name"]}', cmd['value'])
return ''
def config_rm():
self.mock_store_set('config', f'{cmd["who"]}/{cmd["name"]}', None)
return ''
def config_dump():
r = []
for prefix, value in self.mock_store_preifx('config', '').items():
@ -135,6 +139,8 @@ if 'UNITTEST' in os.environ:
outb = config_set()
elif cmd['prefix'] == 'config dump':
outb = config_dump()
elif cmd['prefix'] == 'config rm':
outb = config_rm()
elif hasattr(self, '_mon_command_mock_' + cmd['prefix'].replace(' ', '_')):
a = getattr(self, '_mon_command_mock_' + cmd['prefix'].replace(' ', '_'))
outb = a(cmd)