Merge PR #31385 into master

* refs/pull/31385/head:
	mgr/ssh: add 'osd rm'
	mgr/ssh: keep inventory and service cache keys in sync
	mgr/orchestrator_cli: fix 'service ls' table format
	mgr/ssh: invalidate service state on create, remove, and service action
	mgr/orchestrator: add invalidate() to OutdatableDictMixin
	mgr/ssh: cache services
	mgr/ssh: drop specialized _remove_{mds,mgr,rgw}
	mgr/ssh: change inventory_cache_timeout to seconds (not minutes)
	mgr/orchestrator: make wait poll every 1s (not 5s)

Reviewed-by: Sebastian Wagner <swagner@suse.com>
Sage Weil 2019-11-06 09:33:36 -06:00
commit 22523d97b6
4 changed files with 103 additions and 70 deletions
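
Taken together, these commits converge on one caching pattern: each host carries a timestamped snapshot of its services, reads refresh a snapshot only once it is older than a seconds-based timeout, and every mutating call invalidates the snapshot so the next read re-queries the host. A minimal standalone sketch of that pattern, with illustrative names rather than the module's actual classes:

    import time

    class HostServiceCache:
        """Per-host cache with seconds-based expiry and explicit invalidation."""

        def __init__(self, timeout=60):
            self.timeout = timeout      # seconds, like service_cache_timeout
            self.entries = {}           # host -> (data, last_refresh)

        def get(self, host, refresh_fn):
            data, ts = self.entries.get(host, (None, 0))
            if time.time() - ts > self.timeout:
                data = refresh_fn(host)             # stale: re-query the host
                self.entries[host] = (data, time.time())
            return data

        def invalidate(self, host):
            # Called after create/remove/service-action so the next read
            # refreshes instead of serving a stale snapshot.
            data, _ = self.entries.get(host, (None, 0))
            self.entries[host] = (data, 0)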

View File

@ -301,7 +301,7 @@ This is an overview of the current implementation status of the orchestrators.
mon update ⚪ ✔ ⚪ ✔
osd create ✔ ✔ ⚪ ✔
osd device {ident,fault}-{on,off} ⚪ ⚪ ⚪ ⚪
osd rm ✔ ⚪ ⚪ ⚪
osd rm ✔ ⚪ ⚪ ✔
device {ident,fault}-{on,off} ⚪ ⚪ ⚪ ⚪
device ls ✔ ✔ ✔ ✔
service ls ⚪ ✔ ✔ ✔

View File

@ -1066,7 +1066,7 @@ class OrchestratorClientMixin(Orchestrator):
self._update_completion_progress(c)
while not self.wait(completions):
if any(c.should_wait for c in completions):
time.sleep(5)
time.sleep(1)
else:
break
for c in completions:
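
Dropping the poll interval from 5s to 1s lets blocking orchestrator CLI calls return up to four seconds sooner, at the cost of re-checking completions more often; note the loop only sleeps while some completion still reports should_wait. A reduced, self-contained sketch of that loop (FakeCompletion is an invented stand-in for the real completion objects):

    import time

    class FakeCompletion:
        """Stand-in completion: done after a fixed number of polls."""
        def __init__(self, polls_needed):
            self.polls_needed = polls_needed
            self.should_wait = True

        def refresh(self):
            self.polls_needed -= 1
            done = self.polls_needed <= 0
            self.should_wait = not done
            return done

    def wait_for_completions(completions, poll=1.0):
        while not all([c.refresh() for c in completions]):
            if any(c.should_wait for c in completions):
                time.sleep(poll)   # was 5s; 1s keeps blocked CLI calls responsive
            else:
                break              # nothing left worth waiting for

    wait_for_completions([FakeCompletion(3)])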
@ -1115,13 +1115,13 @@ class OutdatableData(object):
def from_json(cls, data):
return cls(data['data'], cls.time_from_string(data['last_refresh']))
def outdated(self, timeout_min=None):
if timeout_min is None:
timeout_min = 10
def outdated(self, timeout=None):
if timeout is None:
timeout = 600
if self.last_refresh is None:
return True
cutoff = datetime.datetime.utcnow() - datetime.timedelta(
minutes=timeout_min)
seconds=timeout)
return self.last_refresh < cutoff
def __repr__(self):
@ -1166,6 +1166,10 @@ class OutdatableDictMixin(object):
for o in outdated:
del self[o]
def invalidate(self, key):
self[key] = OutdatableData(self[key].data,
datetime.datetime.fromtimestamp(0))
class OutdatablePersistentDict(OutdatableDictMixin, PersistentStoreDict):
pass
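
outdated() and invalidate() together give each cache entry a time-to-live: an entry is served until its last_refresh is older than the timeout, and invalidate() forces staleness by rewinding last_refresh to the epoch. A self-contained sketch of that arithmetic (Entry is an illustrative stand-in for OutdatableData, not the class itself):

    import datetime

    class Entry:
        """Illustrative stand-in for OutdatableData."""
        def __init__(self, data=None, last_refresh=None):
            self.data = data
            self.last_refresh = last_refresh

        def outdated(self, timeout=600):
            if self.last_refresh is None:
                return True                 # never refreshed
            cutoff = datetime.datetime.utcnow() - datetime.timedelta(
                seconds=timeout)
            return self.last_refresh < cutoff

    e = Entry({'osd.0': 'running'}, datetime.datetime.utcnow())
    assert not e.outdated(600)              # fresh for ten minutes

    # What invalidate() does: the epoch is older than any cutoff.
    e.last_refresh = datetime.datetime.fromtimestamp(0)
    assert e.outdated(600)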

View File

@ -260,8 +260,12 @@ class OrchestratorCli(orchestrator.OrchestratorClientMixin, MgrModule):
return HandleCommandResult(stdout=json.dumps(data))
else:
table = PrettyTable(
['type', 'id', 'host', 'container', 'version', 'status', 'description'],
['TYPE', 'ID', 'HOST', 'CONTAINER', 'VERSION', 'STATUS',
'DESCRIPTION'],
border=False)
table.align = 'l'
table.left_padding_width = 0
table.right_padding_width = 1
for s in services:
if s.service is None:
service_id = s.service_instance
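
The new table settings trade PrettyTable's default bordered look for the compact, left-aligned column listing other ceph CLIs use. A quick standalone demonstration with one invented row:

    from prettytable import PrettyTable

    table = PrettyTable(
        ['TYPE', 'ID', 'HOST', 'CONTAINER', 'VERSION', 'STATUS',
         'DESCRIPTION'],
        border=False)
    table.align = 'l'               # left-align every column
    table.left_padding_width = 0    # no spaces before the first column
    table.right_padding_width = 1   # one space between columns
    table.add_row(['mgr', 'x', 'host1', 'deadbeef', '15.0.0', 'running', ''])
    print(table)                    # compact header + row, no borders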

View File

@ -106,7 +106,7 @@ def log_exceptions(f):
return wrapper
class SSHOrchestrator(MgrModule, orchestrator.Orchestrator):
class SSHOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
_STORE_HOST_PREFIX = "host"
@ -120,10 +120,16 @@ class SSHOrchestrator(MgrModule, orchestrator.Orchestrator):
},
{
'name': 'inventory_cache_timeout',
'type': 'int',
'default': 10,
'type': 'seconds',
'default': 10 * 60,
'desc': 'seconds to cache device inventory',
},
{
'name': 'service_cache_timeout',
'type': 'seconds',
'default': 60,
'desc': 'seconds to cache service (daemon) inventory',
},
]
COMMANDS = [
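
With the option typed as 'seconds' and defaulted to 10 * 60, the module reads it back through the standard MgrModule option machinery; ceph mgr modules commonly mirror each declared option onto an attribute in config_notify(), roughly like this (a sketch of the common pattern, not necessarily this file's exact code):

    def config_notify(self):
        # Mirror every MODULE_OPTIONS entry onto self.<name> so code can
        # read e.g. self.service_cache_timeout without repeated lookups.
        for opt in self.MODULE_OPTIONS:
            setattr(self,
                    opt['name'],
                    self.get_module_option(opt['name']))
            self.log.debug('option %s = %s',
                           opt['name'], getattr(self, opt['name']))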
@ -166,8 +172,17 @@ class SSHOrchestrator(MgrModule, orchestrator.Orchestrator):
self.inventory_cache = orchestrator.OutdatablePersistentDict(
self, self._STORE_HOST_PREFIX + '.devices')
self.daemon_cache = orchestrator.OutdatablePersistentDict(
self, self._STORE_HOST_PREFIX + '.daemons')
self.service_cache = orchestrator.OutdatablePersistentDict(
self, self._STORE_HOST_PREFIX + '.services')
# ensure the host lists are in sync
for h in set(self.inventory_cache.keys()) | set(self.service_cache.keys()):
if h not in self.inventory_cache:
self.log.debug('adding inventory item for %s' % h)
self.inventory_cache[h] = orchestrator.OutdatableData()
if h not in self.service_cache:
self.log.debug('adding service item for %s' % h)
self.service_cache[h] = orchestrator.OutdatableData()
def config_notify(self):
"""
@ -418,6 +433,7 @@ class SSHOrchestrator(MgrModule, orchestrator.Orchestrator):
@log_exceptions
def run(host):
self.inventory_cache[host] = orchestrator.OutdatableData()
self.service_cache[host] = orchestrator.OutdatableData()
return "Added host '{}'".format(host)
return SSHWriteCompletion(
@ -432,6 +448,7 @@ class SSHOrchestrator(MgrModule, orchestrator.Orchestrator):
@log_exceptions
def run(host):
del self.inventory_cache[host]
del self.service_cache[host]
return "Removed host '{}'".format(host)
return SSHWriteCompletion(
@ -450,20 +467,42 @@ class SSHOrchestrator(MgrModule, orchestrator.Orchestrator):
nodes = [orchestrator.InventoryNode(host_name, []) for host_name in self.inventory_cache]
return orchestrator.TrivialReadCompletion(nodes)
def _refresh_host_services(self, host):
out, code = self._run_ceph_daemon(
host, 'mon', 'ls', [], no_fsid=True)
data = json.loads(''.join(out))
self.log.debug('refreshed host %s services: %s' % (host, data))
self.service_cache[host] = orchestrator.OutdatableData(data)
return data
def _get_services(self,
service_type=None,
service_name=None,
service_id=None,
node_name=None):
daemons = {}
for host, _ in self._get_hosts():
self.log.info("refresh stale daemons for '{}'".format(host))
out, code = self._run_ceph_daemon(
host, 'mon', 'ls', [], no_fsid=True)
daemons[host] = json.loads(''.join(out))
node_name=None,
refresh=False):
hosts = []
wait_for = []
for host, host_info in self.service_cache.items_filtered():
hosts.append(host)
if host_info.outdated(self.service_cache_timeout) or refresh:
self.log.info("refresing stale services for '{}'".format(host))
wait_for.append(
SSHReadCompletion(self._worker_pool.apply_async(
self._refresh_host_services, (host,))))
else:
self.log.debug('have recent services for %s: %s' % (
host, host_info.data))
wait_for.append(
orchestrator.TrivialReadCompletion([host_info.data]))
self._orchestrator_wait(wait_for)
services = {}
for host, c in zip(hosts, wait_for):
services[host] = c.result[0]
result = []
for host, ls in daemons.items():
for host, ls in services.items():
for d in ls:
if not d['style'].startswith('ceph-daemon'):
self.log.debug('ignoring non-ceph-daemon on %s: %s' % (host, d))
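
_get_services now serves each host from its cached snapshot unless the entry is stale (or refresh=True), fanning stale refreshes out on the worker pool while wrapping cached data in trivial completions so the final zip lines hosts up with results in order. The shape of that fan-out, sketched standalone with a ThreadPool in place of the module's worker pool and a placeholder refresh function:

    from multiprocessing.pool import ThreadPool

    def refresh_host_services(host):
        # Placeholder for the real per-host SSH query.
        return ['%s: fresh daemon list' % host]

    cache = {'host1': (None, True),                   # (data, stale?)
             'host2': (['host2: cached list'], False)}
    pool = ThreadPool(4)

    hosts, pending = [], []
    for host, (data, stale) in sorted(cache.items()):
        hosts.append(host)
        # Stale entries refresh asynchronously; fresh ones are used as-is.
        pending.append(pool.apply_async(refresh_host_services, (host,))
                       if stale else None)

    services = {host: (p.get() if p else cache[host][0])
                for host, p in zip(hosts, pending)}
    print(services)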
@ -504,7 +543,8 @@ class SSHOrchestrator(MgrModule, orchestrator.Orchestrator):
service_type + " unsupported")
result = self._get_services(service_type,
service_id=service_id,
node_name=node_name)
node_name=node_name,
refresh=refresh)
return orchestrator.TrivialReadCompletion(result)
def service_action(self, action, service_type,
@ -560,6 +600,7 @@ class SSHOrchestrator(MgrModule, orchestrator.Orchestrator):
host, name, 'unit',
['--name', name, a],
error_ok=True)
self.service_cache.invalidate(host)
self.log.debug('_service_action code %s out %s' % (code, out))
return "{} {} from host '{}'".format(action, name, host)
@ -700,6 +741,17 @@ class SSHOrchestrator(MgrModule, orchestrator.Orchestrator):
return SSHWriteCompletion(result)
def remove_osds(self, name):
daemons = self._get_services('osd', service_id=name)
results = []
for d in daemons:
results.append(self._worker_pool.apply_async(
self._remove_daemon,
('osd.%s' % d.service_instance, d.nodename)))
if not results:
raise OrchestratorError('Unable to find osd.%s' % name)
return SSHWriteCompletion(results)
def _create_daemon(self, daemon_type, daemon_id, host, keyring,
extra_args=[]):
conn = self._get_connection(host)
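
remove_osds above is the same fan-out recipe the mds/rgw paths now use: resolve the matching daemons, queue one _remove_daemon per match on the worker pool, and fail fast when nothing matched. Reduced to its skeleton (hypothetical daemon tuples; the AsyncResult list is what SSHWriteCompletion wraps):

    from multiprocessing.pool import ThreadPool

    pool = ThreadPool(4)

    def remove_daemon(name, host):
        # Placeholder for the real SSH 'rm-daemon' call.
        return "Removed %s from host '%s'" % (name, host)

    # Hypothetical result of _get_services('osd', service_id='0')
    daemons = [('0', 'host1')]

    results = [pool.apply_async(remove_daemon, ('osd.%s' % osd_id, host))
               for osd_id, host in daemons]
    if not results:
        raise RuntimeError('Unable to find osd.0')
    print([r.get() for r in results])   # the completions the CLI waits on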
@ -732,7 +784,7 @@ class SSHOrchestrator(MgrModule, orchestrator.Orchestrator):
] + extra_args,
stdin=j)
self.log.debug('create_daemon code %s out %s' % (code, out))
self.service_cache.invalidate(host)
return "(Re)deployed {} on host '{}'".format(name, host)
except Exception as e:
@ -743,26 +795,16 @@ class SSHOrchestrator(MgrModule, orchestrator.Orchestrator):
self.log.info("create_daemon({}): finished".format(host))
conn.exit()
def _remove_daemon(self, daemon_type, daemon_id, host):
def _remove_daemon(self, name, host):
"""
Remove a daemon
"""
conn = self._get_connection(host)
try:
name = '%s.%s' % (daemon_type, daemon_id)
out, code = self._run_ceph_daemon(host, name, 'rm-daemon', [])
self.log.debug('remove_daemon code %s out %s' % (code, out))
return "Removed {} on host '{}'".format(name, host)
except Exception as e:
self.log.error("remove_daemon({}): error: {}".format(host, e))
raise
finally:
self.log.info("remove_daemon({}): finished".format(host))
conn.exit()
out, code = self._run_ceph_daemon(
host, name, 'rm-daemon',
['--name', name])
self.log.debug('_remove_daemon code %s out %s' % (code, out))
self.service_cache.invalidate(host)
return "Removed {} from host '{}'".format(name, host)
def _create_mon(self, host, network):
"""
@ -836,14 +878,6 @@ class SSHOrchestrator(MgrModule, orchestrator.Orchestrator):
return self._create_daemon('mgr', name, host, keyring)
def _remove_mgr(self, mgr_id, host):
name = 'mgr.' + mgr_id
out, code = self._run_ceph_daemon(
host, name, 'rm-daemon',
['--name', name])
self.log.debug('remove_mgr code %s out %s' % (code, out))
return "Removed {} from host '{}'".format(name, host)
def update_mgrs(self, num, hosts):
"""
Adjust the number of cluster managers.
@ -868,11 +902,12 @@ class SSHOrchestrator(MgrModule, orchestrator.Orchestrator):
connected.append(mgr_map['active_name'])
for standby in mgr_map['standbys']:
connected.append(standby['name'])
for daemon in daemons:
if daemon.service_instance not in connected:
for d in daemons:
if d.service_instance not in connected:
result = self._worker_pool.apply_async(
self._remove_mgr,
(daemon.service_instance, daemon.nodename))
self._remove_daemon,
('%s.%s' % (d.service_type, d.service_instance),
d.nodename))
results.append(result)
num_to_remove -= 1
if num_to_remove == 0:
@ -940,7 +975,9 @@ class SSHOrchestrator(MgrModule, orchestrator.Orchestrator):
to_remove = len(daemons) - spec.count
for d in daemons[0:to_remove]:
results.append(self._worker_pool.apply_async(
self._remove_mds, (d.service_instance, d.nodename)))
self._remove_daemon,
('%s.%s' % (d.service_type, d.service_instance),
d.nodename)))
elif len(daemons) < spec.count:
# add some
spec.count -= len(daemons)
@ -964,19 +1001,13 @@ class SSHOrchestrator(MgrModule, orchestrator.Orchestrator):
for d in daemons:
if d.service_instance == name or d.service_instance.startswith(name + '.'):
results.append(self._worker_pool.apply_async(
self._remove_mds, (d.service_instance, d.nodename)))
self._remove_daemon,
('%s.%s' % (d.service_type, d.service_instance),
d.nodename)))
if not results:
raise OrchestratorError('Unable to find mds.%s[-*] daemon(s)' % name)
return SSHWriteCompletion(results)
def _remove_mds(self, mds_id, host):
name = 'mds.' + mds_id
out, code = self._run_ceph_daemon(
host, name, 'rm-daemon',
['--name', name])
self.log.debug('remove_mds code %s out %s' % (code, out))
return "Removed {} from host '{}'".format(name, host)
def add_rgw(self, spec):
if len(spec.placement.nodes) < spec.count:
raise RuntimeError("must specify at least %d hosts" % spec.count)
@ -1023,19 +1054,13 @@ class SSHOrchestrator(MgrModule, orchestrator.Orchestrator):
for d in daemons:
if d.service_instance == name or d.service_instance.startswith(name + '.'):
results.append(self._worker_pool.apply_async(
self._remove_rgw, (d.service_instance, d.nodename)))
self._remove_daemon,
('%s.%s' % (d.service_type, d.service_instance),
d.nodename)))
if not results:
raise RuntimeError('Unable to find rgw.%s[-*] daemon(s)' % name)
return SSHWriteCompletion(results)
def _remove_rgw(self, rgw_id, host):
name = 'rgw.' + rgw_id
out, code = self._run_ceph_daemon(
host, name, 'rm-daemon',
['--name', name])
self.log.debug('remove_rgw code %s out %s' % (code, out))
return "Removed {} from host '{}'".format(name, host)
def update_rgw(self, spec):
daemons = self._get_services('rgw', service_name=spec.name)
results = []