Merge pull request #34887 from sebastian-philipp/cephadm-move-specstore

mgr/cephadm: move SpecStore and HostCache to inventory.py

Reviewed-by: Joshua Schmid <jschmid@suse.de>
This commit is contained in:
Sebastian Wagner 2020-05-06 17:10:17 +02:00 committed by GitHub
commit b1a572e8fe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 317 additions and 309 deletions

View File

@ -1,7 +1,14 @@
import datetime
from copy import copy
import json
import logging
from typing import TYPE_CHECKING, Dict, List, Iterator
from typing import TYPE_CHECKING, Dict, List, Iterator, Optional, Any, Tuple
import six
import orchestrator
from ceph.deployment import inventory
from ceph.deployment.service_spec import ServiceSpec
from orchestrator import OrchestratorError, HostSpec
if TYPE_CHECKING:
@ -10,6 +17,11 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
HOST_CACHE_PREFIX = "host."
SPEC_STORE_PREFIX = "spec."
DATEFMT = '%Y-%m-%dT%H:%M:%S.%f'
class Inventory:
def __init__(self, mgr: 'CephadmOrchestrator'):
self.mgr = mgr
@ -86,3 +98,304 @@ class Inventory:
def save(self):
self.mgr.set_store('inventory', json.dumps(self._inventory))
class SpecStore():
    """In-memory view of the stored ServiceSpecs, persisted in the mgr KV store.

    Keys in the KV store are SPEC_STORE_PREFIX + service_name; each record
    holds the spec's JSON plus the timestamp it was last saved.
    """
    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr = mgr
        # service_name -> deserialized spec
        self.specs = {}  # type: Dict[str, ServiceSpec]
        # service_name -> time the spec was last (re-)saved
        self.spec_created = {}  # type: Dict[str, datetime.datetime]

    def load(self):
        # type: () -> None
        """Populate the in-memory maps from the mgr KV store.

        A record that fails to parse is logged and skipped; it does not
        abort loading of the remaining specs.
        """
        prefix_len = len(SPEC_STORE_PREFIX)
        for key, raw in self.mgr.get_store_prefix(SPEC_STORE_PREFIX).items():
            service_name = key[prefix_len:]
            try:
                record = json.loads(raw)
                self.specs[service_name] = ServiceSpec.from_json(record['spec'])
                self.spec_created[service_name] = datetime.datetime.strptime(
                    record['created'], DATEFMT)
                self.mgr.log.debug('SpecStore: loaded spec for %s' % (
                    service_name))
            except Exception as e:
                self.mgr.log.warning('unable to load spec for %s: %s' % (
                    service_name, e))

    def save(self, spec):
        # type: (ServiceSpec) -> None
        """Persist *spec*, recording the save time as its creation stamp."""
        name = spec.service_name()
        now = datetime.datetime.utcnow()
        self.specs[name] = spec
        self.spec_created[name] = now
        payload = json.dumps({
            'spec': spec.to_json(),
            'created': now.strftime(DATEFMT),
        }, sort_keys=True)
        self.mgr.set_store(SPEC_STORE_PREFIX + name, payload)

    def rm(self, service_name):
        # type: (str) -> bool
        """Remove *service_name* from memory and the KV store.

        Returns True when a spec was actually removed.
        """
        if service_name not in self.specs:
            return False
        del self.specs[service_name]
        del self.spec_created[service_name]
        self.mgr.set_store(SPEC_STORE_PREFIX + service_name, None)
        return True

    def find(self, service_name: Optional[str] = None) -> List[ServiceSpec]:
        """Return all specs matching *service_name* (all specs when None).

        A stored name matches on exact equality or on a
        '<service_name>.' prefix.
        """
        def _matches(sn):
            return (not service_name
                    or sn == service_name
                    or sn.startswith(service_name + '.'))

        specs = [spec for sn, spec in self.specs.items() if _matches(sn)]
        self.mgr.log.debug('SpecStore: find spec for %s returned: %s' % (
            service_name, specs))
        return specs
class HostCache():
    """Per-host cache of daemon, device and network state.

    One KV-store record per host, saved under HOST_CACHE_PREFIX + host.
    The last_*_update timestamps and the *_refresh_queue lists together
    drive the decision of when a host needs to be re-scraped (see
    host_needs_daemon_refresh() / host_needs_device_refresh()).
    """
    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr: CephadmOrchestrator = mgr
        # host -> daemon name -> daemon description
        self.daemons = {} # type: Dict[str, Dict[str, orchestrator.DaemonDescription]]
        self.last_daemon_update = {} # type: Dict[str, datetime.datetime]
        # host -> list of storage devices seen on that host
        self.devices = {} # type: Dict[str, List[inventory.Device]]
        # host -> network -> list of addresses
        self.networks = {} # type: Dict[str, Dict[str, List[str]]]
        self.last_device_update = {} # type: Dict[str, datetime.datetime]
        # hosts queued for an immediate (out-of-cycle) refresh
        self.daemon_refresh_queue = [] # type: List[str]
        self.device_refresh_queue = [] # type: List[str]
        # host -> daemon name -> {'deps': [...], 'last_config': datetime}
        self.daemon_config_deps = {} # type: Dict[str, Dict[str, Dict[str,Any]]]
        self.last_host_check = {} # type: Dict[str, datetime.datetime]
    def load(self):
        # type: () -> None
        """Populate the cache from the mgr KV store.

        Daemon state is always queued for a fresh scrape on mgr restart;
        device state is reused until device_cache_timeout elapses.
        A record that fails to parse is logged and skipped.
        """
        for k, v in six.iteritems(self.mgr.get_store_prefix(HOST_CACHE_PREFIX)):
            host = k[len(HOST_CACHE_PREFIX):]
            if host not in self.mgr.inventory:
                self.mgr.log.warning('removing stray HostCache host record %s' % (
                    host))
                self.mgr.set_store(k, None)
                # NOTE(review): no `continue` here, so the stray host's
                # record is still parsed and loaded into the in-memory
                # cache below — confirm whether a `continue` is intended.
            try:
                j = json.loads(v)
                if 'last_device_update' in j:
                    self.last_device_update[host] = datetime.datetime.strptime(
                        j['last_device_update'], DATEFMT)
                else:
                    self.device_refresh_queue.append(host)
                # for services, we ignore the persisted last_*_update
                # and always trigger a new scrape on mgr restart.
                self.daemon_refresh_queue.append(host)
                self.daemons[host] = {}
                self.devices[host] = []
                self.networks[host] = {}
                self.daemon_config_deps[host] = {}
                for name, d in j.get('daemons', {}).items():
                    self.daemons[host][name] = \
                        orchestrator.DaemonDescription.from_json(d)
                for d in j.get('devices', []):
                    self.devices[host].append(inventory.Device.from_json(d))
                self.networks[host] = j.get('networks', {})
                for name, d in j.get('daemon_config_deps', {}).items():
                    self.daemon_config_deps[host][name] = {
                        'deps': d.get('deps', []),
                        'last_config': datetime.datetime.strptime(
                            d['last_config'], DATEFMT),
                    }
                if 'last_host_check' in j:
                    self.last_host_check[host] = datetime.datetime.strptime(
                        j['last_host_check'], DATEFMT)
                self.mgr.log.debug(
                    'HostCache.load: host %s has %d daemons, '
                    '%d devices, %d networks' % (
                        host, len(self.daemons[host]), len(self.devices[host]),
                        len(self.networks[host])))
            except Exception as e:
                self.mgr.log.warning('unable to load cached state for %s: %s' % (
                    host, e))
                pass
    def update_host_daemons(self, host, dm):
        # type: (str, Dict[str, orchestrator.DaemonDescription]) -> None
        """Replace the cached daemon map for *host* and stamp the update time."""
        self.daemons[host] = dm
        self.last_daemon_update[host] = datetime.datetime.utcnow()
    def update_host_devices_networks(self, host, dls, nets):
        # type: (str, List[inventory.Device], Dict[str,List[str]]) -> None
        """Replace cached devices and networks for *host*; stamp update time."""
        self.devices[host] = dls
        self.networks[host] = nets
        self.last_device_update[host] = datetime.datetime.utcnow()
    def update_daemon_config_deps(self, host, name, deps, stamp):
        # type: (str, str, List[str], datetime.datetime) -> None
        """Record the config dependencies applied to daemon *name* at *stamp*."""
        self.daemon_config_deps[host][name] = {
            'deps': deps,
            'last_config': stamp,
        }
    def update_last_host_check(self, host):
        # type: (str) -> None
        """Stamp *host* as checked now (see host_needs_check)."""
        self.last_host_check[host] = datetime.datetime.utcnow()
    def prime_empty_host(self, host):
        # type: (str) -> None
        """
        Install an empty entry for a host
        """
        self.daemons[host] = {}
        self.devices[host] = []
        self.networks[host] = {}
        self.daemon_config_deps[host] = {}
        # queue immediate scrapes so the empty entry gets filled
        self.daemon_refresh_queue.append(host)
        self.device_refresh_queue.append(host)
    def invalidate_host_daemons(self, host):
        # type: (str) -> None
        """Force a daemon re-scrape of *host* and wake the serve loop."""
        self.daemon_refresh_queue.append(host)
        if host in self.last_daemon_update:
            del self.last_daemon_update[host]
        self.mgr.event.set()
    def invalidate_host_devices(self, host):
        # type: (str) -> None
        """Force a device re-scrape of *host* and wake the serve loop."""
        self.device_refresh_queue.append(host)
        if host in self.last_device_update:
            del self.last_device_update[host]
        self.mgr.event.set()
    def save_host(self, host):
        # type: (str) -> None
        """Serialize all cached state for *host* into its KV-store record."""
        j = {   # type: ignore
            'daemons': {},
            'devices': [],
            'daemon_config_deps': {},
        }
        if host in self.last_daemon_update:
            j['last_daemon_update'] = self.last_daemon_update[host].strftime(DATEFMT) # type: ignore
        if host in self.last_device_update:
            j['last_device_update'] = self.last_device_update[host].strftime(DATEFMT) # type: ignore
        for name, dd in self.daemons[host].items():
            j['daemons'][name] = dd.to_json()  # type: ignore
        for d in self.devices[host]:
            j['devices'].append(d.to_json())  # type: ignore
        j['networks'] = self.networks[host]
        for name, depi in self.daemon_config_deps[host].items():
            j['daemon_config_deps'][name] = {   # type: ignore
                'deps': depi.get('deps', []),
                'last_config': depi['last_config'].strftime(DATEFMT),
            }
        if host in self.last_host_check:
            j['last_host_check']= self.last_host_check[host].strftime(DATEFMT)
        self.mgr.set_store(HOST_CACHE_PREFIX + host, json.dumps(j))
    def rm_host(self, host):
        # type: (str) -> None
        """Drop every trace of *host* from memory and the KV store."""
        if host in self.daemons:
            del self.daemons[host]
        if host in self.devices:
            del self.devices[host]
        if host in self.networks:
            del self.networks[host]
        if host in self.last_daemon_update:
            del self.last_daemon_update[host]
        if host in self.last_device_update:
            del self.last_device_update[host]
        if host in self.daemon_config_deps:
            del self.daemon_config_deps[host]
        self.mgr.set_store(HOST_CACHE_PREFIX + host, None)
    def get_hosts(self):
        # type: () -> List[str]
        """Return the hosts currently present in the daemon cache."""
        r = []
        for host, di in self.daemons.items():
            r.append(host)
        return r
    def get_daemons(self):
        # type: () -> List[orchestrator.DaemonDescription]
        """Return all cached daemon descriptions across all hosts."""
        r = []
        for host, dm in self.daemons.items():
            for name, dd in dm.items():
                r.append(dd)
        return r
    def get_daemons_with_volatile_status(self) -> Iterator[Tuple[str, Dict[str, orchestrator.DaemonDescription]]]:
        """Yield (host, daemon map) pairs; daemons on hosts in
        mgr.offline_hosts are yielded as copies with status -1 /
        'host is offline' so the cached entries stay untouched."""
        for host, dm in self.daemons.items():
            if host in self.mgr.offline_hosts:
                def set_offline(dd: orchestrator.DaemonDescription) -> orchestrator.DaemonDescription:
                    # copy so the cached description is not mutated
                    ret = copy(dd)
                    ret.status = -1
                    ret.status_desc = 'host is offline'
                    return ret
                yield host, {name: set_offline(d) for name, d in dm.items()}
            else:
                yield host, dm
    def get_daemons_by_service(self, service_name):
        # type: (str) -> List[orchestrator.DaemonDescription]
        """Return daemons whose name starts with '<service_name>.'."""
        result = []   # type: List[orchestrator.DaemonDescription]
        for host, dm in self.daemons.items():
            for name, d in dm.items():
                if name.startswith(service_name + '.'):
                    result.append(d)
        return result
    def get_daemon_names(self):
        # type: () -> List[str]
        """Return the names of all cached daemons across all hosts."""
        r = []
        for host, dm in self.daemons.items():
            for name, dd in dm.items():
                r.append(name)
        return r
    def get_daemon_last_config_deps(self, host, name):
        # type: (str, str) -> Tuple[Optional[List[str]], Optional[datetime.datetime]]
        """Return (deps, last_config time) for daemon *name* on *host*,
        or (None, None) when nothing is recorded."""
        if host in self.daemon_config_deps:
            if name in self.daemon_config_deps[host]:
                return self.daemon_config_deps[host][name].get('deps', []), \
                    self.daemon_config_deps[host][name].get('last_config', None)
        return None, None
    def host_needs_daemon_refresh(self, host):
        # type: (str) -> bool
        """True when *host* is due for a daemon scrape: explicitly queued
        (the queue entry is consumed) or its data is older than
        daemon_cache_timeout. Offline hosts are never refreshed."""
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping daemon refresh')
            return False
        if host in self.daemon_refresh_queue:
            self.daemon_refresh_queue.remove(host)
            return True
        cutoff = datetime.datetime.utcnow() - datetime.timedelta(
            seconds=self.mgr.daemon_cache_timeout)
        if host not in self.last_daemon_update or self.last_daemon_update[host] < cutoff:
            return True
        return False
    def host_needs_device_refresh(self, host):
        # type: (str) -> bool
        """True when *host* is due for a device scrape; same rules as
        host_needs_daemon_refresh but with device_cache_timeout."""
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping device refresh')
            return False
        if host in self.device_refresh_queue:
            self.device_refresh_queue.remove(host)
            return True
        cutoff = datetime.datetime.utcnow() - datetime.timedelta(
            seconds=self.mgr.device_cache_timeout)
        if host not in self.last_device_update or self.last_device_update[host] < cutoff:
            return True
        return False
    def host_needs_check(self, host):
        # type: (str) -> bool
        """True when the last host check is missing or older than
        host_check_interval seconds."""
        cutoff = datetime.datetime.utcnow() - datetime.timedelta(
            seconds=self.mgr.host_check_interval)
        return host not in self.last_host_check or self.last_host_check[host] < cutoff
    def add_daemon(self, host, dd):
        # type: (str, orchestrator.DaemonDescription) -> None
        """Insert/replace a single daemon description for an already-known host."""
        assert host in self.daemons
        self.daemons[host][dd.name()] = dd
    def rm_daemon(self, host, name):
        # type: (str, str) -> None
        """Remove daemon *name* from *host*'s cache, if present."""
        if host in self.daemons:
            if name in self.daemons[host]:
                del self.daemons[host][name]

View File

@ -2,7 +2,6 @@ import json
import errno
import logging
import time
from copy import copy
from threading import Event
from functools import wraps
@ -42,7 +41,7 @@ from . import remotes
from . import utils
from .nfs import NFSGanesha
from .osd import RemoveUtil, OSDRemoval
from .inventory import Inventory
from .inventory import Inventory, SpecStore, HostCache
try:
import remoto
@ -70,9 +69,6 @@ Host *
DATEFMT = '%Y-%m-%dT%H:%M:%S.%f'
CEPH_DATEFMT = '%Y-%m-%dT%H:%M:%S.%fZ'
HOST_CACHE_PREFIX = "host."
SPEC_STORE_PREFIX = "spec."
# ceph daemon types that use the ceph container image.
# NOTE: listed in upgrade order!
CEPH_UPGRADE_ORDER = ['mgr', 'mon', 'crash', 'osd', 'mds', 'rgw', 'rbd-mirror']
@ -100,307 +96,6 @@ except ImportError:
self.cleanup()
class SpecStore():
def __init__(self, mgr):
# type: (CephadmOrchestrator) -> None
self.mgr = mgr
self.specs = {} # type: Dict[str, ServiceSpec]
self.spec_created = {} # type: Dict[str, datetime.datetime]
def load(self):
# type: () -> None
for k, v in six.iteritems(self.mgr.get_store_prefix(SPEC_STORE_PREFIX)):
service_name = k[len(SPEC_STORE_PREFIX):]
try:
v = json.loads(v)
spec = ServiceSpec.from_json(v['spec'])
created = datetime.datetime.strptime(v['created'], DATEFMT)
self.specs[service_name] = spec
self.spec_created[service_name] = created
self.mgr.log.debug('SpecStore: loaded spec for %s' % (
service_name))
except Exception as e:
self.mgr.log.warning('unable to load spec for %s: %s' % (
service_name, e))
pass
def save(self, spec):
# type: (ServiceSpec) -> None
self.specs[spec.service_name()] = spec
self.spec_created[spec.service_name()] = datetime.datetime.utcnow()
self.mgr.set_store(
SPEC_STORE_PREFIX + spec.service_name(),
json.dumps({
'spec': spec.to_json(),
'created': self.spec_created[spec.service_name()].strftime(DATEFMT),
}, sort_keys=True),
)
def rm(self, service_name):
# type: (str) -> bool
found = service_name in self.specs
if found:
del self.specs[service_name]
del self.spec_created[service_name]
self.mgr.set_store(SPEC_STORE_PREFIX + service_name, None)
return found
def find(self, service_name: Optional[str] = None) -> List[ServiceSpec]:
specs = []
for sn, spec in self.specs.items():
if not service_name or \
sn == service_name or \
sn.startswith(service_name + '.'):
specs.append(spec)
self.mgr.log.debug('SpecStore: find spec for %s returned: %s' % (
service_name, specs))
return specs
class HostCache():
def __init__(self, mgr):
# type: (CephadmOrchestrator) -> None
self.mgr: CephadmOrchestrator = mgr
self.daemons = {} # type: Dict[str, Dict[str, orchestrator.DaemonDescription]]
self.last_daemon_update = {} # type: Dict[str, datetime.datetime]
self.devices = {} # type: Dict[str, List[inventory.Device]]
self.networks = {} # type: Dict[str, Dict[str, List[str]]]
self.last_device_update = {} # type: Dict[str, datetime.datetime]
self.daemon_refresh_queue = [] # type: List[str]
self.device_refresh_queue = [] # type: List[str]
self.daemon_config_deps = {} # type: Dict[str, Dict[str, Dict[str,Any]]]
self.last_host_check = {} # type: Dict[str, datetime.datetime]
def load(self):
# type: () -> None
for k, v in six.iteritems(self.mgr.get_store_prefix(HOST_CACHE_PREFIX)):
host = k[len(HOST_CACHE_PREFIX):]
if host not in self.mgr.inventory:
self.mgr.log.warning('removing stray HostCache host record %s' % (
host))
self.mgr.set_store(k, None)
try:
j = json.loads(v)
if 'last_device_update' in j:
self.last_device_update[host] = datetime.datetime.strptime(
j['last_device_update'], DATEFMT)
else:
self.device_refresh_queue.append(host)
# for services, we ignore the persisted last_*_update
# and always trigger a new scrape on mgr restart.
self.daemon_refresh_queue.append(host)
self.daemons[host] = {}
self.devices[host] = []
self.networks[host] = {}
self.daemon_config_deps[host] = {}
for name, d in j.get('daemons', {}).items():
self.daemons[host][name] = \
orchestrator.DaemonDescription.from_json(d)
for d in j.get('devices', []):
self.devices[host].append(inventory.Device.from_json(d))
self.networks[host] = j.get('networks', {})
for name, d in j.get('daemon_config_deps', {}).items():
self.daemon_config_deps[host][name] = {
'deps': d.get('deps', []),
'last_config': datetime.datetime.strptime(
d['last_config'], DATEFMT),
}
if 'last_host_check' in j:
self.last_host_check[host] = datetime.datetime.strptime(
j['last_host_check'], DATEFMT)
self.mgr.log.debug(
'HostCache.load: host %s has %d daemons, '
'%d devices, %d networks' % (
host, len(self.daemons[host]), len(self.devices[host]),
len(self.networks[host])))
except Exception as e:
self.mgr.log.warning('unable to load cached state for %s: %s' % (
host, e))
pass
def update_host_daemons(self, host, dm):
# type: (str, Dict[str, orchestrator.DaemonDescription]) -> None
self.daemons[host] = dm
self.last_daemon_update[host] = datetime.datetime.utcnow()
def update_host_devices_networks(self, host, dls, nets):
# type: (str, List[inventory.Device], Dict[str,List[str]]) -> None
self.devices[host] = dls
self.networks[host] = nets
self.last_device_update[host] = datetime.datetime.utcnow()
def update_daemon_config_deps(self, host, name, deps, stamp):
self.daemon_config_deps[host][name] = {
'deps': deps,
'last_config': stamp,
}
def update_last_host_check(self, host):
# type: (str) -> None
self.last_host_check[host] = datetime.datetime.utcnow()
def prime_empty_host(self, host):
# type: (str) -> None
"""
Install an empty entry for a host
"""
self.daemons[host] = {}
self.devices[host] = []
self.networks[host] = {}
self.daemon_config_deps[host] = {}
self.daemon_refresh_queue.append(host)
self.device_refresh_queue.append(host)
def invalidate_host_daemons(self, host):
# type: (str) -> None
self.daemon_refresh_queue.append(host)
if host in self.last_daemon_update:
del self.last_daemon_update[host]
self.mgr.event.set()
def invalidate_host_devices(self, host):
# type: (str) -> None
self.device_refresh_queue.append(host)
if host in self.last_device_update:
del self.last_device_update[host]
self.mgr.event.set()
def save_host(self, host):
# type: (str) -> None
j = { # type: ignore
'daemons': {},
'devices': [],
'daemon_config_deps': {},
}
if host in self.last_daemon_update:
j['last_daemon_update'] = self.last_daemon_update[host].strftime(DATEFMT) # type: ignore
if host in self.last_device_update:
j['last_device_update'] = self.last_device_update[host].strftime(DATEFMT) # type: ignore
for name, dd in self.daemons[host].items():
j['daemons'][name] = dd.to_json() # type: ignore
for d in self.devices[host]:
j['devices'].append(d.to_json()) # type: ignore
j['networks'] = self.networks[host]
for name, depi in self.daemon_config_deps[host].items():
j['daemon_config_deps'][name] = { # type: ignore
'deps': depi.get('deps', []),
'last_config': depi['last_config'].strftime(DATEFMT),
}
if host in self.last_host_check:
j['last_host_check']= self.last_host_check[host].strftime(DATEFMT)
self.mgr.set_store(HOST_CACHE_PREFIX + host, json.dumps(j))
def rm_host(self, host):
# type: (str) -> None
if host in self.daemons:
del self.daemons[host]
if host in self.devices:
del self.devices[host]
if host in self.networks:
del self.networks[host]
if host in self.last_daemon_update:
del self.last_daemon_update[host]
if host in self.last_device_update:
del self.last_device_update[host]
if host in self.daemon_config_deps:
del self.daemon_config_deps[host]
self.mgr.set_store(HOST_CACHE_PREFIX + host, None)
def get_hosts(self):
# type: () -> List[str]
r = []
for host, di in self.daemons.items():
r.append(host)
return r
def get_daemons(self):
# type: () -> List[orchestrator.DaemonDescription]
r = []
for host, dm in self.daemons.items():
for name, dd in dm.items():
r.append(dd)
return r
def get_daemons_with_volatile_status(self) -> Iterator[Tuple[str, Dict[str, orchestrator.DaemonDescription]]]:
for host, dm in self.daemons.items():
if host in self.mgr.offline_hosts:
def set_offline(dd: orchestrator.DaemonDescription) -> orchestrator.DaemonDescription:
ret = copy(dd)
ret.status = -1
ret.status_desc = 'host is offline'
return ret
yield host, {name: set_offline(d) for name, d in dm.items()}
else:
yield host, dm
def get_daemons_by_service(self, service_name):
# type: (str) -> List[orchestrator.DaemonDescription]
result = [] # type: List[orchestrator.DaemonDescription]
for host, dm in self.daemons.items():
for name, d in dm.items():
if name.startswith(service_name + '.'):
result.append(d)
return result
def get_daemon_names(self):
# type: () -> List[str]
r = []
for host, dm in self.daemons.items():
for name, dd in dm.items():
r.append(name)
return r
def get_daemon_last_config_deps(self, host, name):
if host in self.daemon_config_deps:
if name in self.daemon_config_deps[host]:
return self.daemon_config_deps[host][name].get('deps', []), \
self.daemon_config_deps[host][name].get('last_config', None)
return None, None
def host_needs_daemon_refresh(self, host):
# type: (str) -> bool
if host in self.mgr.offline_hosts:
logger.debug(f'Host "{host}" marked as offline. Skipping daemon refresh')
return False
if host in self.daemon_refresh_queue:
self.daemon_refresh_queue.remove(host)
return True
cutoff = datetime.datetime.utcnow() - datetime.timedelta(
seconds=self.mgr.daemon_cache_timeout)
if host not in self.last_daemon_update or self.last_daemon_update[host] < cutoff:
return True
return False
def host_needs_device_refresh(self, host):
# type: (str) -> bool
if host in self.mgr.offline_hosts:
logger.debug(f'Host "{host}" marked as offline. Skipping device refresh')
return False
if host in self.device_refresh_queue:
self.device_refresh_queue.remove(host)
return True
cutoff = datetime.datetime.utcnow() - datetime.timedelta(
seconds=self.mgr.device_cache_timeout)
if host not in self.last_device_update or self.last_device_update[host] < cutoff:
return True
return False
def host_needs_check(self, host):
# type: (str) -> bool
cutoff = datetime.datetime.utcnow() - datetime.timedelta(
seconds=self.mgr.host_check_interval)
return host not in self.last_host_check or self.last_host_check[host] < cutoff
def add_daemon(self, host, dd):
# type: (str, orchestrator.DaemonDescription) -> None
assert host in self.daemons
self.daemons[host][dd.name()] = dd
def rm_daemon(self, host, name):
if host in self.daemons:
if name in self.daemons[host]:
del self.daemons[host][name]
class AsyncCompletion(orchestrator.Completion):
def __init__(self,
_first_promise=None, # type: Optional[orchestrator.Completion]

View File

@ -56,8 +56,8 @@ def cephadm_module():
mock.patch("cephadm.module.CephadmOrchestrator.remote"),\
mock.patch("cephadm.module.CephadmOrchestrator.set_store", set_store), \
mock.patch("cephadm.module.CephadmOrchestrator.get_store", get_store),\
mock.patch("cephadm.module.HostCache.save_host"), \
mock.patch("cephadm.module.HostCache.rm_host"), \
mock.patch("cephadm.inventory.HostCache.save_host"), \
mock.patch("cephadm.inventory.HostCache.rm_host"), \
mock.patch("cephadm.module.CephadmOrchestrator.send_command"), \
mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command), \
mock.patch("cephadm.module.CephadmOrchestrator.get_store_prefix", get_store_prefix):