diff --git a/src/pybind/mgr/cephadm/inventory.py b/src/pybind/mgr/cephadm/inventory.py index 62ad03f31ef..3cfe74f41cb 100644 --- a/src/pybind/mgr/cephadm/inventory.py +++ b/src/pybind/mgr/cephadm/inventory.py @@ -1,7 +1,14 @@ +import datetime +from copy import copy import json import logging -from typing import TYPE_CHECKING, Dict, List, Iterator +from typing import TYPE_CHECKING, Dict, List, Iterator, Optional, Any, Tuple +import six + +import orchestrator +from ceph.deployment import inventory +from ceph.deployment.service_spec import ServiceSpec from orchestrator import OrchestratorError, HostSpec if TYPE_CHECKING: @@ -10,6 +17,11 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) +HOST_CACHE_PREFIX = "host." +SPEC_STORE_PREFIX = "spec." +DATEFMT = '%Y-%m-%dT%H:%M:%S.%f' + + class Inventory: def __init__(self, mgr: 'CephadmOrchestrator'): self.mgr = mgr @@ -86,3 +98,304 @@ class Inventory: def save(self): self.mgr.set_store('inventory', json.dumps(self._inventory)) + + +class SpecStore(): + def __init__(self, mgr): + # type: (CephadmOrchestrator) -> None + self.mgr = mgr + self.specs = {} # type: Dict[str, ServiceSpec] + self.spec_created = {} # type: Dict[str, datetime.datetime] + + def load(self): + # type: () -> None + for k, v in six.iteritems(self.mgr.get_store_prefix(SPEC_STORE_PREFIX)): + service_name = k[len(SPEC_STORE_PREFIX):] + try: + v = json.loads(v) + spec = ServiceSpec.from_json(v['spec']) + created = datetime.datetime.strptime(v['created'], DATEFMT) + self.specs[service_name] = spec + self.spec_created[service_name] = created + self.mgr.log.debug('SpecStore: loaded spec for %s' % ( + service_name)) + except Exception as e: + self.mgr.log.warning('unable to load spec for %s: %s' % ( + service_name, e)) + pass + + def save(self, spec): + # type: (ServiceSpec) -> None + self.specs[spec.service_name()] = spec + self.spec_created[spec.service_name()] = datetime.datetime.utcnow() + self.mgr.set_store( + SPEC_STORE_PREFIX + spec.service_name(), + json.dumps({ + 'spec': spec.to_json(), + 'created': self.spec_created[spec.service_name()].strftime(DATEFMT), + }, sort_keys=True), + ) + + def rm(self, service_name): + # type: (str) -> bool + found = service_name in self.specs + if found: + del self.specs[service_name] + del self.spec_created[service_name] + self.mgr.set_store(SPEC_STORE_PREFIX + service_name, None) + return found + + def find(self, service_name: Optional[str] = None) -> List[ServiceSpec]: + specs = [] + for sn, spec in self.specs.items(): + if not service_name or \ + sn == service_name or \ + sn.startswith(service_name + '.'): + specs.append(spec) + self.mgr.log.debug('SpecStore: find spec for %s returned: %s' % ( + service_name, specs)) + return specs + +class HostCache(): + def __init__(self, mgr): + # type: (CephadmOrchestrator) -> None + self.mgr: CephadmOrchestrator = mgr + self.daemons = {} # type: Dict[str, Dict[str, orchestrator.DaemonDescription]] + self.last_daemon_update = {} # type: Dict[str, datetime.datetime] + self.devices = {} # type: Dict[str, List[inventory.Device]] + self.networks = {} # type: Dict[str, Dict[str, List[str]]] + self.last_device_update = {} # type: Dict[str, datetime.datetime] + self.daemon_refresh_queue = [] # type: List[str] + self.device_refresh_queue = [] # type: List[str] + self.daemon_config_deps = {} # type: Dict[str, Dict[str, Dict[str,Any]]] + self.last_host_check = {} # type: Dict[str, datetime.datetime] + + def load(self): + # type: () -> None + for k, v in six.iteritems(self.mgr.get_store_prefix(HOST_CACHE_PREFIX)): + host = k[len(HOST_CACHE_PREFIX):] + if host not in self.mgr.inventory: + self.mgr.log.warning('removing stray HostCache host record %s' % ( + host)) + self.mgr.set_store(k, None) + try: + j = json.loads(v) + if 'last_device_update' in j: + self.last_device_update[host] = datetime.datetime.strptime( + j['last_device_update'], DATEFMT) + else: + self.device_refresh_queue.append(host) + # for services, we ignore the persisted last_*_update + # and always trigger a new scrape on mgr restart. + self.daemon_refresh_queue.append(host) + self.daemons[host] = {} + self.devices[host] = [] + self.networks[host] = {} + self.daemon_config_deps[host] = {} + for name, d in j.get('daemons', {}).items(): + self.daemons[host][name] = \ + orchestrator.DaemonDescription.from_json(d) + for d in j.get('devices', []): + self.devices[host].append(inventory.Device.from_json(d)) + self.networks[host] = j.get('networks', {}) + for name, d in j.get('daemon_config_deps', {}).items(): + self.daemon_config_deps[host][name] = { + 'deps': d.get('deps', []), + 'last_config': datetime.datetime.strptime( + d['last_config'], DATEFMT), + } + if 'last_host_check' in j: + self.last_host_check[host] = datetime.datetime.strptime( + j['last_host_check'], DATEFMT) + self.mgr.log.debug( + 'HostCache.load: host %s has %d daemons, ' + '%d devices, %d networks' % ( + host, len(self.daemons[host]), len(self.devices[host]), + len(self.networks[host]))) + except Exception as e: + self.mgr.log.warning('unable to load cached state for %s: %s' % ( + host, e)) + pass + + def update_host_daemons(self, host, dm): + # type: (str, Dict[str, orchestrator.DaemonDescription]) -> None + self.daemons[host] = dm + self.last_daemon_update[host] = datetime.datetime.utcnow() + + def update_host_devices_networks(self, host, dls, nets): + # type: (str, List[inventory.Device], Dict[str,List[str]]) -> None + self.devices[host] = dls + self.networks[host] = nets + self.last_device_update[host] = datetime.datetime.utcnow() + + def update_daemon_config_deps(self, host, name, deps, stamp): + self.daemon_config_deps[host][name] = { + 'deps': deps, + 'last_config': stamp, + } + + def update_last_host_check(self, host): + # type: (str) -> None + self.last_host_check[host] = datetime.datetime.utcnow() + + def prime_empty_host(self, host): + # type: (str) -> None + """ + Install an empty entry for a host + """ + self.daemons[host] = {} + self.devices[host] = [] + self.networks[host] = {} + self.daemon_config_deps[host] = {} + self.daemon_refresh_queue.append(host) + self.device_refresh_queue.append(host) + + def invalidate_host_daemons(self, host): + # type: (str) -> None + self.daemon_refresh_queue.append(host) + if host in self.last_daemon_update: + del self.last_daemon_update[host] + self.mgr.event.set() + + def invalidate_host_devices(self, host): + # type: (str) -> None + self.device_refresh_queue.append(host) + if host in self.last_device_update: + del self.last_device_update[host] + self.mgr.event.set() + + def save_host(self, host): + # type: (str) -> None + j = { # type: ignore + 'daemons': {}, + 'devices': [], + 'daemon_config_deps': {}, + } + if host in self.last_daemon_update: + j['last_daemon_update'] = self.last_daemon_update[host].strftime(DATEFMT) # type: ignore + if host in self.last_device_update: + j['last_device_update'] = self.last_device_update[host].strftime(DATEFMT) # type: ignore + for name, dd in self.daemons[host].items(): + j['daemons'][name] = dd.to_json() # type: ignore + for d in self.devices[host]: + j['devices'].append(d.to_json()) # type: ignore + j['networks'] = self.networks[host] + for name, depi in self.daemon_config_deps[host].items(): + j['daemon_config_deps'][name] = { # type: ignore + 'deps': depi.get('deps', []), + 'last_config': depi['last_config'].strftime(DATEFMT), + } + if host in self.last_host_check: + j['last_host_check']= self.last_host_check[host].strftime(DATEFMT) + self.mgr.set_store(HOST_CACHE_PREFIX + host, json.dumps(j)) + + def rm_host(self, host): + # type: (str) -> None + if host in self.daemons: + del self.daemons[host] + if host in self.devices: + del self.devices[host] + if host in self.networks: + del self.networks[host] + if host in self.last_daemon_update: + del self.last_daemon_update[host] + if host in self.last_device_update: + del self.last_device_update[host] + if host in self.daemon_config_deps: + del self.daemon_config_deps[host] + self.mgr.set_store(HOST_CACHE_PREFIX + host, None) + + def get_hosts(self): + # type: () -> List[str] + r = [] + for host, di in self.daemons.items(): + r.append(host) + return r + + def get_daemons(self): + # type: () -> List[orchestrator.DaemonDescription] + r = [] + for host, dm in self.daemons.items(): + for name, dd in dm.items(): + r.append(dd) + return r + + def get_daemons_with_volatile_status(self) -> Iterator[Tuple[str, Dict[str, orchestrator.DaemonDescription]]]: + for host, dm in self.daemons.items(): + if host in self.mgr.offline_hosts: + def set_offline(dd: orchestrator.DaemonDescription) -> orchestrator.DaemonDescription: + ret = copy(dd) + ret.status = -1 + ret.status_desc = 'host is offline' + return ret + yield host, {name: set_offline(d) for name, d in dm.items()} + else: + yield host, dm + + def get_daemons_by_service(self, service_name): + # type: (str) -> List[orchestrator.DaemonDescription] + result = [] # type: List[orchestrator.DaemonDescription] + for host, dm in self.daemons.items(): + for name, d in dm.items(): + if name.startswith(service_name + '.'): + result.append(d) + return result + + def get_daemon_names(self): + # type: () -> List[str] + r = [] + for host, dm in self.daemons.items(): + for name, dd in dm.items(): + r.append(name) + return r + + def get_daemon_last_config_deps(self, host, name): + if host in self.daemon_config_deps: + if name in self.daemon_config_deps[host]: + return self.daemon_config_deps[host][name].get('deps', []), \ + self.daemon_config_deps[host][name].get('last_config', None) + return None, None + + def host_needs_daemon_refresh(self, host): + # type: (str) -> bool + if host in self.mgr.offline_hosts: + logger.debug(f'Host "{host}" marked as offline. Skipping daemon refresh') + return False + if host in self.daemon_refresh_queue: + self.daemon_refresh_queue.remove(host) + return True + cutoff = datetime.datetime.utcnow() - datetime.timedelta( + seconds=self.mgr.daemon_cache_timeout) + if host not in self.last_daemon_update or self.last_daemon_update[host] < cutoff: + return True + return False + + def host_needs_device_refresh(self, host): + # type: (str) -> bool + if host in self.mgr.offline_hosts: + logger.debug(f'Host "{host}" marked as offline. Skipping device refresh') + return False + if host in self.device_refresh_queue: + self.device_refresh_queue.remove(host) + return True + cutoff = datetime.datetime.utcnow() - datetime.timedelta( + seconds=self.mgr.device_cache_timeout) + if host not in self.last_device_update or self.last_device_update[host] < cutoff: + return True + return False + + def host_needs_check(self, host): + # type: (str) -> bool + cutoff = datetime.datetime.utcnow() - datetime.timedelta( + seconds=self.mgr.host_check_interval) + return host not in self.last_host_check or self.last_host_check[host] < cutoff + + def add_daemon(self, host, dd): + # type: (str, orchestrator.DaemonDescription) -> None + assert host in self.daemons + self.daemons[host][dd.name()] = dd + + def rm_daemon(self, host, name): + if host in self.daemons: + if name in self.daemons[host]: + del self.daemons[host][name] \ No newline at end of file diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py index 07cf44ef0d2..ca6ad58e208 100644 --- a/src/pybind/mgr/cephadm/module.py +++ b/src/pybind/mgr/cephadm/module.py @@ -2,7 +2,6 @@ import json import errno import logging import time -from copy import copy from threading import Event from functools import wraps @@ -42,7 +41,7 @@ from . import remotes from . import utils from .nfs import NFSGanesha from .osd import RemoveUtil, OSDRemoval -from .inventory import Inventory +from .inventory import Inventory, SpecStore, HostCache try: import remoto @@ -70,9 +69,6 @@ Host * DATEFMT = '%Y-%m-%dT%H:%M:%S.%f' CEPH_DATEFMT = '%Y-%m-%dT%H:%M:%S.%fZ' -HOST_CACHE_PREFIX = "host." -SPEC_STORE_PREFIX = "spec." - # ceph daemon types that use the ceph container image. # NOTE: listed in upgrade order! CEPH_UPGRADE_ORDER = ['mgr', 'mon', 'crash', 'osd', 'mds', 'rgw', 'rbd-mirror'] @@ -100,307 +96,6 @@ except ImportError: self.cleanup() -class SpecStore(): - def __init__(self, mgr): - # type: (CephadmOrchestrator) -> None - self.mgr = mgr - self.specs = {} # type: Dict[str, ServiceSpec] - self.spec_created = {} # type: Dict[str, datetime.datetime] - - def load(self): - # type: () -> None - for k, v in six.iteritems(self.mgr.get_store_prefix(SPEC_STORE_PREFIX)): - service_name = k[len(SPEC_STORE_PREFIX):] - try: - v = json.loads(v) - spec = ServiceSpec.from_json(v['spec']) - created = datetime.datetime.strptime(v['created'], DATEFMT) - self.specs[service_name] = spec - self.spec_created[service_name] = created - self.mgr.log.debug('SpecStore: loaded spec for %s' % ( - service_name)) - except Exception as e: - self.mgr.log.warning('unable to load spec for %s: %s' % ( - service_name, e)) - pass - - def save(self, spec): - # type: (ServiceSpec) -> None - self.specs[spec.service_name()] = spec - self.spec_created[spec.service_name()] = datetime.datetime.utcnow() - self.mgr.set_store( - SPEC_STORE_PREFIX + spec.service_name(), - json.dumps({ - 'spec': spec.to_json(), - 'created': self.spec_created[spec.service_name()].strftime(DATEFMT), - }, sort_keys=True), - ) - - def rm(self, service_name): - # type: (str) -> bool - found = service_name in self.specs - if found: - del self.specs[service_name] - del self.spec_created[service_name] - self.mgr.set_store(SPEC_STORE_PREFIX + service_name, None) - return found - - def find(self, service_name: Optional[str] = None) -> List[ServiceSpec]: - specs = [] - for sn, spec in self.specs.items(): - if not service_name or \ - sn == service_name or \ - sn.startswith(service_name + '.'): - specs.append(spec) - self.mgr.log.debug('SpecStore: find spec for %s returned: %s' % ( - service_name, specs)) - return specs - -class HostCache(): - def __init__(self, mgr): - # type: (CephadmOrchestrator) -> None - self.mgr: CephadmOrchestrator = mgr - self.daemons = {} # type: Dict[str, Dict[str, orchestrator.DaemonDescription]] - self.last_daemon_update = {} # type: Dict[str, datetime.datetime] - self.devices = {} # type: Dict[str, List[inventory.Device]] - self.networks = {} # type: Dict[str, Dict[str, List[str]]] - self.last_device_update = {} # type: Dict[str, datetime.datetime] - self.daemon_refresh_queue = [] # type: List[str] - self.device_refresh_queue = [] # type: List[str] - self.daemon_config_deps = {} # type: Dict[str, Dict[str, Dict[str,Any]]] - self.last_host_check = {} # type: Dict[str, datetime.datetime] - - def load(self): - # type: () -> None - for k, v in six.iteritems(self.mgr.get_store_prefix(HOST_CACHE_PREFIX)): - host = k[len(HOST_CACHE_PREFIX):] - if host not in self.mgr.inventory: - self.mgr.log.warning('removing stray HostCache host record %s' % ( - host)) - self.mgr.set_store(k, None) - try: - j = json.loads(v) - if 'last_device_update' in j: - self.last_device_update[host] = datetime.datetime.strptime( - j['last_device_update'], DATEFMT) - else: - self.device_refresh_queue.append(host) - # for services, we ignore the persisted last_*_update - # and always trigger a new scrape on mgr restart. - self.daemon_refresh_queue.append(host) - self.daemons[host] = {} - self.devices[host] = [] - self.networks[host] = {} - self.daemon_config_deps[host] = {} - for name, d in j.get('daemons', {}).items(): - self.daemons[host][name] = \ - orchestrator.DaemonDescription.from_json(d) - for d in j.get('devices', []): - self.devices[host].append(inventory.Device.from_json(d)) - self.networks[host] = j.get('networks', {}) - for name, d in j.get('daemon_config_deps', {}).items(): - self.daemon_config_deps[host][name] = { - 'deps': d.get('deps', []), - 'last_config': datetime.datetime.strptime( - d['last_config'], DATEFMT), - } - if 'last_host_check' in j: - self.last_host_check[host] = datetime.datetime.strptime( - j['last_host_check'], DATEFMT) - self.mgr.log.debug( - 'HostCache.load: host %s has %d daemons, ' - '%d devices, %d networks' % ( - host, len(self.daemons[host]), len(self.devices[host]), - len(self.networks[host]))) - except Exception as e: - self.mgr.log.warning('unable to load cached state for %s: %s' % ( - host, e)) - pass - - def update_host_daemons(self, host, dm): - # type: (str, Dict[str, orchestrator.DaemonDescription]) -> None - self.daemons[host] = dm - self.last_daemon_update[host] = datetime.datetime.utcnow() - - def update_host_devices_networks(self, host, dls, nets): - # type: (str, List[inventory.Device], Dict[str,List[str]]) -> None - self.devices[host] = dls - self.networks[host] = nets - self.last_device_update[host] = datetime.datetime.utcnow() - - def update_daemon_config_deps(self, host, name, deps, stamp): - self.daemon_config_deps[host][name] = { - 'deps': deps, - 'last_config': stamp, - } - - def update_last_host_check(self, host): - # type: (str) -> None - self.last_host_check[host] = datetime.datetime.utcnow() - - def prime_empty_host(self, host): - # type: (str) -> None - """ - Install an empty entry for a host - """ - self.daemons[host] = {} - self.devices[host] = [] - self.networks[host] = {} - self.daemon_config_deps[host] = {} - self.daemon_refresh_queue.append(host) - self.device_refresh_queue.append(host) - - def invalidate_host_daemons(self, host): - # type: (str) -> None - self.daemon_refresh_queue.append(host) - if host in self.last_daemon_update: - del self.last_daemon_update[host] - self.mgr.event.set() - - def invalidate_host_devices(self, host): - # type: (str) -> None - self.device_refresh_queue.append(host) - if host in self.last_device_update: - del self.last_device_update[host] - self.mgr.event.set() - - def save_host(self, host): - # type: (str) -> None - j = { # type: ignore - 'daemons': {}, - 'devices': [], - 'daemon_config_deps': {}, - } - if host in self.last_daemon_update: - j['last_daemon_update'] = self.last_daemon_update[host].strftime(DATEFMT) # type: ignore - if host in self.last_device_update: - j['last_device_update'] = self.last_device_update[host].strftime(DATEFMT) # type: ignore - for name, dd in self.daemons[host].items(): - j['daemons'][name] = dd.to_json() # type: ignore - for d in self.devices[host]: - j['devices'].append(d.to_json()) # type: ignore - j['networks'] = self.networks[host] - for name, depi in self.daemon_config_deps[host].items(): - j['daemon_config_deps'][name] = { # type: ignore - 'deps': depi.get('deps', []), - 'last_config': depi['last_config'].strftime(DATEFMT), - } - if host in self.last_host_check: - j['last_host_check']= self.last_host_check[host].strftime(DATEFMT) - self.mgr.set_store(HOST_CACHE_PREFIX + host, json.dumps(j)) - - def rm_host(self, host): - # type: (str) -> None - if host in self.daemons: - del self.daemons[host] - if host in self.devices: - del self.devices[host] - if host in self.networks: - del self.networks[host] - if host in self.last_daemon_update: - del self.last_daemon_update[host] - if host in self.last_device_update: - del self.last_device_update[host] - if host in self.daemon_config_deps: - del self.daemon_config_deps[host] - self.mgr.set_store(HOST_CACHE_PREFIX + host, None) - - def get_hosts(self): - # type: () -> List[str] - r = [] - for host, di in self.daemons.items(): - r.append(host) - return r - - def get_daemons(self): - # type: () -> List[orchestrator.DaemonDescription] - r = [] - for host, dm in self.daemons.items(): - for name, dd in dm.items(): - r.append(dd) - return r - - def get_daemons_with_volatile_status(self) -> Iterator[Tuple[str, Dict[str, orchestrator.DaemonDescription]]]: - for host, dm in self.daemons.items(): - if host in self.mgr.offline_hosts: - def set_offline(dd: orchestrator.DaemonDescription) -> orchestrator.DaemonDescription: - ret = copy(dd) - ret.status = -1 - ret.status_desc = 'host is offline' - return ret - yield host, {name: set_offline(d) for name, d in dm.items()} - else: - yield host, dm - - def get_daemons_by_service(self, service_name): - # type: (str) -> List[orchestrator.DaemonDescription] - result = [] # type: List[orchestrator.DaemonDescription] - for host, dm in self.daemons.items(): - for name, d in dm.items(): - if name.startswith(service_name + '.'): - result.append(d) - return result - - def get_daemon_names(self): - # type: () -> List[str] - r = [] - for host, dm in self.daemons.items(): - for name, dd in dm.items(): - r.append(name) - return r - - def get_daemon_last_config_deps(self, host, name): - if host in self.daemon_config_deps: - if name in self.daemon_config_deps[host]: - return self.daemon_config_deps[host][name].get('deps', []), \ - self.daemon_config_deps[host][name].get('last_config', None) - return None, None - - def host_needs_daemon_refresh(self, host): - # type: (str) -> bool - if host in self.mgr.offline_hosts: - logger.debug(f'Host "{host}" marked as offline. Skipping daemon refresh') - return False - if host in self.daemon_refresh_queue: - self.daemon_refresh_queue.remove(host) - return True - cutoff = datetime.datetime.utcnow() - datetime.timedelta( - seconds=self.mgr.daemon_cache_timeout) - if host not in self.last_daemon_update or self.last_daemon_update[host] < cutoff: - return True - return False - - def host_needs_device_refresh(self, host): - # type: (str) -> bool - if host in self.mgr.offline_hosts: - logger.debug(f'Host "{host}" marked as offline. Skipping device refresh') - return False - if host in self.device_refresh_queue: - self.device_refresh_queue.remove(host) - return True - cutoff = datetime.datetime.utcnow() - datetime.timedelta( - seconds=self.mgr.device_cache_timeout) - if host not in self.last_device_update or self.last_device_update[host] < cutoff: - return True - return False - - def host_needs_check(self, host): - # type: (str) -> bool - cutoff = datetime.datetime.utcnow() - datetime.timedelta( - seconds=self.mgr.host_check_interval) - return host not in self.last_host_check or self.last_host_check[host] < cutoff - - def add_daemon(self, host, dd): - # type: (str, orchestrator.DaemonDescription) -> None - assert host in self.daemons - self.daemons[host][dd.name()] = dd - - def rm_daemon(self, host, name): - if host in self.daemons: - if name in self.daemons[host]: - del self.daemons[host][name] - - class AsyncCompletion(orchestrator.Completion): def __init__(self, _first_promise=None, # type: Optional[orchestrator.Completion] diff --git a/src/pybind/mgr/cephadm/tests/fixtures.py b/src/pybind/mgr/cephadm/tests/fixtures.py index f4e10d7d0e6..66522925dd5 100644 --- a/src/pybind/mgr/cephadm/tests/fixtures.py +++ b/src/pybind/mgr/cephadm/tests/fixtures.py @@ -56,8 +56,8 @@ def cephadm_module(): mock.patch("cephadm.module.CephadmOrchestrator.remote"),\ mock.patch("cephadm.module.CephadmOrchestrator.set_store", set_store), \ mock.patch("cephadm.module.CephadmOrchestrator.get_store", get_store),\ - mock.patch("cephadm.module.HostCache.save_host"), \ - mock.patch("cephadm.module.HostCache.rm_host"), \ + mock.patch("cephadm.inventory.HostCache.save_host"), \ + mock.patch("cephadm.inventory.HostCache.rm_host"), \ mock.patch("cephadm.module.CephadmOrchestrator.send_command"), \ mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command), \ mock.patch("cephadm.module.CephadmOrchestrator.get_store_prefix", get_store_prefix):