mgr/cephadm: replace PersistentStoreDict with SpecStore

Explicit implementation of the dict of specs.

Signed-off-by: Sage Weil <sage@redhat.com>
This commit is contained in:
Sage Weil 2020-02-28 16:38:26 -06:00
parent f44c11d1a5
commit 1980250ab8
3 changed files with 82 additions and 85 deletions

View File

@ -26,7 +26,6 @@ import re
import shutil
import subprocess
import uuid
from mgr_module import PersistentStoreDict
from ceph.deployment import inventory, translate
from ceph.deployment.drive_group import DriveGroupSpec
@ -64,6 +63,7 @@ DEFAULT_SSH_CONFIG = ('Host *\n'
DATEFMT = '%Y-%m-%dT%H:%M:%S.%f'
HOST_CACHE_PREFIX = "host."
SPEC_STORE_PREFIX = "spec."
# for py2 compat
try:
@ -126,6 +126,46 @@ def assert_valid_host(name):
raise OrchestratorError(e)
class SpecStore():
def __init__(self, mgr):
# type: (CephadmOrchestrator) -> None
self.mgr = mgr
self.specs = {} # type: Dict[str, orchestrator.ServiceSpec]
def load(self):
# type: () -> None
for k, v in six.iteritems(self.mgr.get_store_prefix(SPEC_STORE_PREFIX)):
service_name = k[len(SPEC_STORE_PREFIX):]
try:
spec = ServiceSpec.from_json(json.loads(v))
self.specs[service_name] = spec
self.mgr.log.debug('SpecStore: loaded spec for %s' % (
service_name))
except Exception as e:
self.mgr.log.warning('unable to load spec for %s: %s' % (
service_name, e))
pass
def save(self, spec):
# type: (orchestrator.ServiceSpec) -> None
self.specs[spec.service_name()] = spec
self.mgr.set_store(SPEC_STORE_PREFIX + spec.service_name(),
spec.to_json())
def rm(self, service_name):
# type: (str) -> None
if service_name in self.specs:
del self.specs[service_name]
self.mgr.set_store(SPEC_STORE_PREFIX + service_name, None)
def find(self, service_name):
# type: (str) -> List[orchestrator.ServiceSpec]
specs = []
for sn, spec in self.specs.items():
if sn == service_name or sn.startswith(service_name + '.'):
specs.append(spec)
return specs
class HostCache():
def __init__(self, mgr):
# type: (CephadmOrchestrator) -> None
@ -586,7 +626,8 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
self.osd_removal_report: dict = dict()
self.rm_util = RemoveUtil(self)
self.service_spec_store = PersistentStoreDict(self, 'service_spec')
self.spec_store = SpecStore(self)
self.spec_store.load()
# ensure the host lists are in sync
for h in self.inventory.keys():
@ -1605,10 +1646,9 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
n: str = dd.service_name()
if service_name and service_name != n:
continue
try:
_ = self.service_spec_store[dd.service_name()]
if dd.service_name() in self.spec_store.specs:
spec_presence = "present"
except IndexError:
else:
spec_presence = "absent"
if dd.daemon_type == 'osd':
spec_presence = "not applicable"
@ -1723,7 +1763,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
args.append(
(d.name(), d.hostname, force)
)
self._remove_key_from_store(d.service_name())
self.spec_store.rm(d.service_name())
if not args:
raise OrchestratorError('Unable to find daemons in %s service' % (
service_name))
@ -1731,10 +1771,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
service_name, [a[0] for a in args]))
return self._remove_daemon(args)
def _remove_key_from_store(self, spec_name: str) -> None:
self.log.debug(f"Removing {spec_name} from the service_spec store")
del self.service_spec_store[spec_name]
def get_inventory(self, host_filter=None, refresh=False):
"""
Return the storage inventory of hosts matching the given filter.
@ -2153,7 +2189,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
return self._add_daemon('mgr', spec, self._create_mgr)
def apply_mgr(self, spec):
self.save_spec(spec)
self.spec_store.save(spec)
self._kick_serve_loop()
return trivial_result("Scheduled MGR creation..")
@ -2166,7 +2202,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
return self._add_daemon('mds', spec, self._create_mds, self._config_mds)
def apply_mds(self, spec: orchestrator.ServiceSpec) -> orchestrator.Completion:
self.save_spec(spec)
self.spec_store.save(spec)
self._kick_serve_loop()
return trivial_result("Scheduled MDS creation..")
@ -2227,7 +2263,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
return self._create_daemon('rgw', rgw_id, host, keyring=keyring)
def apply_rgw(self, spec):
self.save_spec(spec)
self.spec_store.save(spec)
self._kick_serve_loop()
return trivial_result("Scheduled RGW creation..")
@ -2251,7 +2287,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
keyring=keyring)
def apply_rbd_mirror(self, spec):
self.save_spec(spec)
self.spec_store.save(spec)
self._kick_serve_loop()
return trivial_result("Scheduled rbd-mirror creation..")
@ -2460,7 +2496,7 @@ receivers:
return self._apply_service('prometheus', spec, self._create_prometheus)
def apply_prometheus(self, spec):
self.save_spec(spec)
self.spec_store.save(spec)
self._kick_serve_loop()
return trivial_result("Scheduled prometheus creation..")
@ -2470,7 +2506,7 @@ receivers:
self._create_node_exporter)
def apply_node_exporter(self, spec):
self.save_spec(spec)
self.spec_store.save(spec)
self._kick_serve_loop()
return trivial_result("Scheduled node-exporter creation..")
@ -2736,34 +2772,12 @@ receivers:
"""
return trivial_result(self.osd_removal_report)
def save_spec(self, spec: ServiceSpec) -> None:
"""
Attempts to save a ServiceSpec.
There are two ways to manipulate the service_specs stored in the mon_store
1) Using a global definition (i.e. `ceph orch apply -i <specs_file.yaml>`)
This will usually contain multiple definitions of services and daemons.
2) Using the CLI for specific service types (i.e. `ceph orch apply rgw 3 --realm foo --zone bar`)
Raises `OrchestratorError` if an entry with the same named identifier already exist to prevent overwrites from two different sources.
"""
full_spec_name = f"{spec.service_type}.{spec.name}"
try:
_ = self.service_spec_store[full_spec_name]
raise orchestrator.OrchestratorError(f"Specification for {full_spec_name} already exists. "
"Please review your existing specs with "
"`ceph orch spec dump` and try again.")
except IndexError:
self.log.info(f"New spec found. Saving <{full_spec_name}> to the store.")
_ = self.service_spec_store[full_spec_name] = spec.to_json()
def list_specs(self) -> orchestrator.Completion:
"""
Loads all entries from the service_spec mon_store root.
"""
specs = list()
for _, spec in self.service_spec_store.items():
for service_name, spec in self.spec_store.specs.items():
specs.append('---')
specs.append(yaml.dump(spec))
return trivial_result(specs)
@ -2773,7 +2787,7 @@ receivers:
Parse a multi document yaml file (represented in a inbuf object)
and loads it with it's respective ServiceSpec to validate the
initial input.
If no errors are raised `save_spec` is called.
If no errors are raised, save them.
"""
content: Iterator[Any] = yaml.load_all(spec_document)
# Load all specs from a multi document yaml file.
@ -2783,18 +2797,19 @@ receivers:
spec_o = ServiceSpec.from_json(spec)
loaded_specs.append(spec_o)
for spec in loaded_specs:
self.save_spec(spec)
self.spec_store.save(spec)
self._kick_serve_loop()
return trivial_result("ServiceSpecs saved")
def trigger_deployment(self, service_name: str,
def trigger_deployment(self,
service_name: str,
func: Callable[[ServiceSpec], orchestrator.Completion]) -> List[orchestrator.Completion]:
"""
Triggers a corresponding deployment method `func` to `service_name`
Services can have multiple entries. (i.e. different RGW configurations)
"""
self.log.debug(f"starting async {service_name} deployment")
specs = self.find_json_specs(service_name)
specs = self.spec_store.find(service_name)
completions = list()
for spec in specs:
completions.append(func(spec))
@ -2802,29 +2817,6 @@ receivers:
return completions
return [trivial_result("Nothing to do..")]
def find_json_specs(self, find_service_type: str) -> List[ServiceSpec]:
"""
Inspects the mon_store and gathers entries for the `find_service_type`
(i.e. 'mgr', 'rgw') service.
Some services have individual ServiceSpecs (rgw->RGWSpec, nfs->NFSServiceSpec)
so we need to make the distinction.
"""
specs = list()
self.log.debug(f"Checking for type {find_service_type}")
for spec_key, json_specs in self.service_spec_store.items():
if not spec_key.split('.')[0].startswith(find_service_type):
continue
if isinstance(json_specs, dict):
self.log.debug(f"Found dict in json_specs: No need to decode")
elif isinstance(json_specs, str):
self.log.debug(f"Found str in json_specs: Decoding from json")
json_specs = json.loads(json_specs)
self.log.debug(f"Found service_type: {spec_key} in the k-v store. Adding..")
specs.append(ServiceSpec.from_json(json_specs))
self.log.debug(f"Found {specs} specs.")
return specs
def _apply_services(self) -> List[orchestrator.Completion]:
"""
This is a method that is supposed to run continuously in the

View File

@ -285,8 +285,7 @@ class TestCephadm(object):
))
@mock.patch("cephadm.module.HostCache.save_host")
@mock.patch("cephadm.module.HostCache.rm_host")
@mock.patch("cephadm.module.CephadmOrchestrator._remove_key_from_store")
def test_remove_daemon(self, rm_key, _rm_host, _save_spec, cephadm_module):
def test_remove_daemon(self, _rm_host, _save_spec, cephadm_module):
with self._with_host(cephadm_module, 'test'):
c = cephadm_module.list_daemons(refresh=True)
wait(cephadm_module, c)
@ -308,8 +307,8 @@ class TestCephadm(object):
))
@mock.patch("cephadm.module.HostCache.save_host")
@mock.patch("cephadm.module.HostCache.rm_host")
@mock.patch("cephadm.module.CephadmOrchestrator._remove_key_from_store")
def test_remove_service(self, _rm_key, _rm_host, _save_spec, cephadm_module):
@mock.patch("cephadm.module.SpecStore.rm")
def test_remove_service(self, _rm_spec, _rm_host, _save_spec, cephadm_module):
with self._with_host(cephadm_module, 'test'):
c = cephadm_module.list_daemons(refresh=True)
wait(cephadm_module, c)
@ -406,7 +405,7 @@ class TestCephadm(object):
@mock.patch("cephadm.module.CephadmOrchestrator.send_command")
@mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command)
@mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
@mock.patch("cephadm.module.CephadmOrchestrator.save_spec")
@mock.patch("cephadm.module.SpecStore.save")
@mock.patch("cephadm.module.HostCache.save_host")
@mock.patch("cephadm.module.HostCache.rm_host")
def test_apply_mgr_save(self, _send_command, _get_connection, _save_spec, _save_host, _rm_host, cephadm_module):
@ -421,7 +420,7 @@ class TestCephadm(object):
@mock.patch("cephadm.module.CephadmOrchestrator.send_command")
@mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command)
@mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
@mock.patch("cephadm.module.CephadmOrchestrator.save_spec")
@mock.patch("cephadm.module.SpecStore.save")
@mock.patch("cephadm.module.HostCache.save_host")
@mock.patch("cephadm.module.HostCache.rm_host")
def test_apply_mds_save(self, _send_command, _get_connection, _save_spec, _save_host, _rm_host, cephadm_module):
@ -436,7 +435,7 @@ class TestCephadm(object):
@mock.patch("cephadm.module.CephadmOrchestrator.send_command")
@mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command)
@mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
@mock.patch("cephadm.module.CephadmOrchestrator.save_spec")
@mock.patch("cephadm.module.SpecStore.save")
@mock.patch("cephadm.module.HostCache.save_host")
@mock.patch("cephadm.module.HostCache.rm_host")
def test_apply_rgw_save(self, _send_command, _get_connection, _save_spec, _save_host, _rm_host, cephadm_module):
@ -451,7 +450,7 @@ class TestCephadm(object):
@mock.patch("cephadm.module.CephadmOrchestrator.send_command")
@mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command)
@mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
@mock.patch("cephadm.module.CephadmOrchestrator.save_spec")
@mock.patch("cephadm.module.SpecStore.save")
@mock.patch("cephadm.module.HostCache.save_host")
@mock.patch("cephadm.module.HostCache.rm_host")
def test_apply_rbd_mirror_save(self, _send_command, _get_connection, _save_spec, _save_host, _rm_host, cephadm_module):
@ -466,7 +465,7 @@ class TestCephadm(object):
@mock.patch("cephadm.module.CephadmOrchestrator.send_command")
@mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command)
@mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
@mock.patch("cephadm.module.CephadmOrchestrator.save_spec")
@mock.patch("cephadm.module.SpecStore.save")
@mock.patch("cephadm.module.HostCache.save_host")
@mock.patch("cephadm.module.HostCache.rm_host")
def test_apply_prometheus_save(self, _send_command, _get_connection, _save_spec, _save_host, _rm_host, cephadm_module):
@ -481,7 +480,7 @@ class TestCephadm(object):
@mock.patch("cephadm.module.CephadmOrchestrator.send_command")
@mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command)
@mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
@mock.patch("cephadm.module.CephadmOrchestrator.save_spec")
@mock.patch("cephadm.module.SpecStore.save")
@mock.patch("cephadm.module.HostCache.save_host")
@mock.patch("cephadm.module.HostCache.rm_host")
def test_apply_node_exporter_save(self, _send_command, _get_connection, _save_spec, _save_host, _rm_host, cephadm_module):
@ -496,7 +495,7 @@ class TestCephadm(object):
@mock.patch("cephadm.module.CephadmOrchestrator.send_command")
@mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command)
@mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
@mock.patch("cephadm.module.CephadmOrchestrator.save_spec")
@mock.patch("cephadm.module.SpecStore.save")
@mock.patch("cephadm.module.HostCache.save_host")
@mock.patch("cephadm.module.HostCache.rm_host")
@mock.patch("cephadm.module.yaml.load_all", return_value=[{'service_type': 'rgw', 'placement': {'count': 1}, 'spec': {'rgw_realm': 'realm1', 'rgw_zone': 'zone1'}}])
@ -515,12 +514,12 @@ class TestCephadm(object):
@mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
@mock.patch("cephadm.module.HostCache.save_host")
@mock.patch("cephadm.module.HostCache.rm_host")
@mock.patch("cephadm.module.CephadmOrchestrator.find_json_specs")
def test_trigger_deployment_todo(self, _find_json_spec, _send_command, _get_connection, _save_host, _rm_host, cephadm_module):
@mock.patch("cephadm.module.SpecStore.find")
def test_trigger_deployment_todo(self, _find, _send_command, _get_connection, _save_host, _rm_host, cephadm_module):
with self._with_host(cephadm_module, 'test'):
_find_json_spec.return_value = ['something']
_find.return_value = ['something']
c = cephadm_module.trigger_deployment('foo', lambda x: x)
_find_json_spec.assert_called_with('foo')
_find.assert_called_with('foo')
assert c == ['something']
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
@ -529,12 +528,12 @@ class TestCephadm(object):
@mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
@mock.patch("cephadm.module.HostCache.save_host")
@mock.patch("cephadm.module.HostCache.rm_host")
@mock.patch("cephadm.module.CephadmOrchestrator.find_json_specs")
def test_trigger_deployment_no_todo(self, _find_json_spec, _send_command, _get_connection, _save_host, _rm_host, cephadm_module):
@mock.patch("cephadm.module.SpecStore.find")
def test_trigger_deployment_no_todo(self, _find, _send_command, _get_connection, _save_host, _rm_host, cephadm_module):
with self._with_host(cephadm_module, 'test'):
_find_json_spec.return_value = []
_find.return_value = []
c = cephadm_module.trigger_deployment('foo', lambda x: x)
_find_json_spec.assert_called_with('foo')
_find.assert_called_with('foo')
assert wait(cephadm_module, c[0]) == 'Nothing to do..'
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))

View File

@ -1577,6 +1577,12 @@ class ServiceSpec(object):
args.update({k: v})
return _cls(**args) # type: ignore
def service_name(self):
n = self.service_type
if self.name:
n += '.' + self.name
return n
def to_json(self):
return json.dumps(self, default=lambda o: o.__dict__,
sort_keys=True, indent=4)