Merge pull request #34084 from sebastian-philipp/cephadm-host-ls-offline

mgr/cephadm: enhance `host ls`

Reviewed-by: Joshua Schmid <jschmid@suse.de>
Reviewed-by: Sage Weil <sage@redhat.com>
commit c97e7afe5c
Sebastian Wagner, 2020-04-10 22:56:20 +02:00 (committed by GitHub)
6 changed files with 86 additions and 50 deletions
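In short: CephadmOrchestrator now keeps an in-memory offline_hosts set. A failed SSH connection in _run_cephadm (the misleading HostNotFound case) adds the host to the set; every fresh connection attempt, and every `ceph orch host add`, removes it again. `host ls` then reports such hosts with status 'Offline', `orch ps` reports their daemons with status_desc 'host is offline' via the new HostCache.get_daemons_with_volatile_status(), and daemon/device refreshes skip offline hosts. The default SSH config gains ConnectTimeout=30 so unreachable hosts fail fast instead of hanging, the batch _check_hosts() helper is removed, and execnet/remoto join the test requirements so the new offline test can import HostNotFound.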

src/pybind/mgr/cephadm/module.py

@@ -2,7 +2,7 @@ import json
 import errno
 import logging
 import time
-import yaml
+from copy import copy
 from threading import Event
 from functools import wraps
@@ -60,10 +60,13 @@ except ImportError:
 logger = logging.getLogger(__name__)
 
-DEFAULT_SSH_CONFIG = ('Host *\n'
-                      'User root\n'
-                      'StrictHostKeyChecking no\n'
-                      'UserKnownHostsFile /dev/null\n')
+DEFAULT_SSH_CONFIG = """
+Host *
+User root
+StrictHostKeyChecking no
+UserKnownHostsFile /dev/null
+ConnectTimeout=30
+"""
 
 DATEFMT = '%Y-%m-%dT%H:%M:%S.%f'
 CEPH_DATEFMT = '%Y-%m-%dT%H:%M:%S.%fZ'
@@ -156,7 +159,7 @@ class SpecStore():
 class HostCache():
     def __init__(self, mgr):
         # type: (CephadmOrchestrator) -> None
-        self.mgr = mgr
+        self.mgr: CephadmOrchestrator = mgr
         self.daemons = {}  # type: Dict[str, Dict[str, orchestrator.DaemonDescription]]
         self.last_daemon_update = {}  # type: Dict[str, datetime.datetime]
         self.devices = {}  # type: Dict[str, List[inventory.Device]]
@@ -317,6 +320,18 @@ class HostCache():
             r.append(dd)
         return r
 
+    def get_daemons_with_volatile_status(self) -> Iterator[Tuple[str, Dict[str, orchestrator.DaemonDescription]]]:
+        for host, dm in self.daemons.items():
+            if host in self.mgr.offline_hosts:
+                def set_offline(dd: orchestrator.DaemonDescription) -> orchestrator.DaemonDescription:
+                    ret = copy(dd)
+                    ret.status = -1
+                    ret.status_desc = 'host is offline'
+                    return ret
+                yield host, {name: set_offline(d) for name, d in dm.items()}
+            else:
+                yield host, dm
+
     def get_daemons_by_service(self, service_name):
         # type: (str) -> List[orchestrator.DaemonDescription]
         result = []  # type: List[orchestrator.DaemonDescription]
@@ -343,6 +358,9 @@
     def host_needs_daemon_refresh(self, host):
         # type: (str) -> bool
+        if host in self.mgr.offline_hosts:
+            logger.debug(f'Host "{host}" marked as offline. Skipping daemon refresh')
+            return False
         if host in self.daemon_refresh_queue:
             self.daemon_refresh_queue.remove(host)
             return True
@@ -354,6 +372,9 @@
     def host_needs_device_refresh(self, host):
         # type: (str) -> bool
+        if host in self.mgr.offline_hosts:
+            logger.debug(f'Host "{host}" marked as offline. Skipping device refresh')
+            return False
         if host in self.device_refresh_queue:
             self.device_refresh_queue.remove(host)
             return True
@@ -707,6 +728,9 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
             if h not in self.inventory:
                 self.cache.rm_host(h)
 
+        # in-memory only.
+        self.offline_hosts: Set[str] = set()
+
     def shutdown(self):
         self.log.debug('shutdown')
         self._worker_pool.close()
@@ -997,38 +1021,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
             self._save_upgrade_state()
             return
 
-    def _check_hosts(self):
-        self.log.debug('_check_hosts')
-        bad_hosts = []
-        hosts = self.inventory.keys()
-        for host in hosts:
-            if host not in self.inventory:
-                continue
-            self.log.debug(' checking %s' % host)
-            try:
-                out, err, code = self._run_cephadm(
-                    host, 'client', 'check-host', [],
-                    error_ok=True, no_fsid=True)
-                if code:
-                    self.log.debug(' host %s failed check' % host)
-                    if self.warn_on_failed_host_check:
-                        bad_hosts.append('host %s failed check: %s' % (host, err))
-                else:
-                    self.log.debug(' host %s ok' % host)
-            except Exception as e:
-                self.log.debug(' host %s failed check' % host)
-                bad_hosts.append('host %s failed check: %s' % (host, e))
-
-        if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
-            del self.health_checks['CEPHADM_HOST_CHECK_FAILED']
-        if bad_hosts:
-            self.health_checks['CEPHADM_HOST_CHECK_FAILED'] = {
-                'severity': 'warning',
-                'summary': '%d hosts fail cephadm check' % len(bad_hosts),
-                'count': len(bad_hosts),
-                'detail': bad_hosts,
-            }
-        self.set_health_checks(self.health_checks)
-
     def _check_host(self, host):
         if host not in self.inventory:
             return
@@ -1338,6 +1330,11 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
             conn.exit()
         self._cons = {}
 
+    def offline_hosts_remove(self, host):
+        if host in self.offline_hosts:
+            self.offline_hosts.remove(host)
+
     @staticmethod
     def can_run():
         if remoto is not None:
@@ -1564,6 +1561,8 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
         if not addr and host in self.inventory:
             addr = self.inventory[host].get('addr', host)
 
+        self.offline_hosts_remove(host)
+
         try:
             try:
                 conn, connr = self._get_connection(addr)
@@ -1648,6 +1647,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
                 # this is a misleading exception as it seems to be thrown for
                 # any sort of connection failure, even those having nothing to
                 # do with "host not found" (e.g., ssh key permission denied).
+                self.offline_hosts.add(host)
                 user = 'root' if self.mode == 'root' else 'cephadm'
                 msg = f'Failed to connect to {host} ({addr}). ' \
                       f'Check that the host is reachable and accepts connections using the cephadm SSH key\n' \
@@ -1686,6 +1686,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
         self.inventory[spec.hostname] = spec.to_json()
         self._save_inventory()
         self.cache.prime_empty_host(spec.hostname)
+        self.offline_hosts_remove(spec.hostname)
         self.event.set()  # refresh stray health check
         self.log.info('Added host %s' % spec.hostname)
         return "Added host '{}'".format(spec.hostname)
@@ -1732,7 +1733,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
                 hostname,
                 addr=info.get('addr', hostname),
                 labels=info.get('labels', []),
-                status=info.get('status', ''),
+                status='Offline' if hostname in self.offline_hosts else info.get('status', ''),
             ))
         return r
@@ -1865,7 +1866,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
                 self._refresh_host_daemons(host)
         # <service_map>
         sm = {}  # type: Dict[str, orchestrator.ServiceDescription]
-        for h, dm in self.cache.daemons.items():
+        for h, dm in self.cache.get_daemons_with_volatile_status():
             for name, dd in dm.items():
                 if service_type and service_type != dd.daemon_type:
                     continue
@@ -1936,7 +1937,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
             for hostname, hi in self.inventory.items():
                 self._refresh_host_daemons(hostname)
         result = []
-        for h, dm in self.cache.daemons.items():
+        for h, dm in self.cache.get_daemons_with_volatile_status():
             if host and h != host:
                 continue
             for name, dd in dm.items():
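
The module.py changes above all follow one discipline: clear the offline flag before each connection attempt, set it again on any connection failure. A minimal, self-contained model of that bookkeeping (Tracker, run() and the reachable flag are illustrative stand-ins, not cephadm code; only the clear-before-attempt / set-on-failure pattern mirrors the diff):

    from typing import Set

    class HostNotFound(Exception):
        pass

    class Tracker:
        def __init__(self) -> None:
            self.offline_hosts: Set[str] = set()  # in-memory only

        def offline_hosts_remove(self, host: str) -> None:
            if host in self.offline_hosts:
                self.offline_hosts.remove(host)

        def run(self, host: str, reachable: bool) -> None:
            # Optimistically clear the flag before every attempt ...
            self.offline_hosts_remove(host)
            try:
                if not reachable:
                    raise HostNotFound(host)
            except HostNotFound:
                # ... and set it again when the connection fails.
                self.offline_hosts.add(host)

    t = Tracker()
    t.run('node1', reachable=False)
    assert 'node1' in t.offline_hosts
    t.run('node1', reachable=True)
    assert 'node1' not in t.offline_hosts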

src/pybind/mgr/cephadm/tests/fixtures.py

@@ -56,7 +56,6 @@ def cephadm_module():
             mock.patch("cephadm.module.CephadmOrchestrator.remote"),\
             mock.patch("cephadm.module.CephadmOrchestrator.set_store", set_store), \
             mock.patch("cephadm.module.CephadmOrchestrator.get_store", get_store),\
-            mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]')), \
             mock.patch("cephadm.module.HostCache.save_host"), \
             mock.patch("cephadm.module.HostCache.rm_host"), \
             mock.patch("cephadm.module.CephadmOrchestrator.send_command"), \
@@ -76,6 +75,7 @@
         }
         m.__init__('cephadm', 0, 0)
         m._cluster_fsid = "fsid"
+        m.mode = "root"
         yield m
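
With the blanket _run_cephadm mock gone from the fixture, each test that needs it now applies the decorator itself (visible in the test file below). Setting m.mode = "root" matters because the connection-failure path exercised by the new offline test consults self.mode when composing its error message.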

src/pybind/mgr/cephadm/tests/test_cephadm.py

@@ -12,6 +12,8 @@ try:
 except ImportError:
     pass
 
+from execnet.gateway_bootstrap import HostNotFound
+
 from ceph.deployment.service_spec import ServiceSpec, PlacementSpec, RGWSpec, \
     NFSServiceSpec, IscsiServiceSpec
 from orchestrator import ServiceDescription, DaemonDescription, InventoryHost, \
@@ -48,6 +50,7 @@ class TestCephadm(object):
         new_mgr = cephadm_module.get_unique_name('mgr', 'myhost', existing)
         match_glob(new_mgr, 'myhost.*')
 
+    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
     def test_host(self, cephadm_module):
         assert wait(cephadm_module, cephadm_module.get_hosts()) == []
         with self._with_host(cephadm_module, 'test'):
@@ -66,6 +69,7 @@
             assert wait(cephadm_module, cephadm_module.get_hosts()) == [HostSpec('test', 'test')]
         assert wait(cephadm_module, cephadm_module.get_hosts()) == []
 
+    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
     def test_service_ls(self, cephadm_module):
         with self._with_host(cephadm_module, 'test'):
             c = cephadm_module.list_daemons(refresh=True)
@@ -123,6 +127,7 @@
             assert out == expected
             assert [ServiceDescription.from_json(o).to_json() for o in expected] == expected
 
+    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
     def test_device_ls(self, cephadm_module):
         with self._with_host(cephadm_module, 'test'):
             c = cephadm_module.get_inventory()
@@ -153,6 +158,7 @@
             assert wait(cephadm_module, c) == [what + " rgw.myrgw.foobar from host 'test'"]
 
+    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
     def test_mon_add(self, cephadm_module):
         with self._with_host(cephadm_module, 'test'):
             ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
@@ -164,6 +170,7 @@
             c = cephadm_module.add_mon(ServiceSpec('mon', placement=ps))
             wait(cephadm_module, c)
 
+    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
     def test_mgr_update(self, cephadm_module):
         with self._with_host(cephadm_module, 'test'):
             ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
@@ -444,3 +451,22 @@ class TestCephadm(object):
             assert wait(cephadm_module, c) == 'Scheduled node-exporter update...'
             assert [d.spec for d in wait(cephadm_module, cephadm_module.describe_service())] == [spec]
             assert [d.spec for d in wait(cephadm_module, cephadm_module.describe_service(service_name='node-exporter.my_exporter'))] == [spec]
+
+    @mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
+    @mock.patch("remoto.process.check")
+    def test_offline(self, _check, _get_connection, cephadm_module):
+        _check.return_value = '{}', '', 0
+        _get_connection.return_value = mock.Mock(), mock.Mock()
+        with self._with_host(cephadm_module, 'test'):
+            _get_connection.side_effect = HostNotFound
+            code, out, err = cephadm_module.check_host('test')
+            assert out == ''
+            assert 'Failed to connect to test (test)' in err
+
+            out = wait(cephadm_module, cephadm_module.get_hosts())[0].to_json()
+            assert out == HostSpec('test', 'test', status='Offline').to_json()
+
+            _get_connection.side_effect = None
+            assert cephadm_module._check_host('test') is None
+            out = wait(cephadm_module, cephadm_module.get_hosts())[0].to_json()
+            assert out == HostSpec('test', 'test').to_json()
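
The new test drives the full round trip: a mocked _get_connection raising HostNotFound makes check_host fail with 'Failed to connect to test (test)' and get_hosts() report the host as Offline; once the side effect is cleared, a successful _check_host removes the flag and the host reports a clean status again.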

src/pybind/mgr/orchestrator/_interface.py

@@ -1323,6 +1323,10 @@ class DaemonDescription(object):
             c[k] = datetime.datetime.strptime(c[k], DATEFMT)
         return cls(**c)
 
+    def __copy__(self):
+        # feel free to change this:
+        return DaemonDescription.from_json(self.to_json())
+
 class ServiceDescription(object):
     """
     For responding to queries about the status of a particular service,
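
Why route __copy__ through a JSON round trip? copy.copy() then yields a fully independent object, so get_daemons_with_volatile_status() can overwrite status fields without corrupting the cached original. A small stand-alone sketch of the same trick (Thing is hypothetical, not a Ceph class):

    import copy

    class Thing:
        def __init__(self, status: int = 1):
            self.status = status

        def to_json(self) -> dict:
            return {'status': self.status}

        @classmethod
        def from_json(cls, data: dict) -> 'Thing':
            return cls(**data)

        def __copy__(self) -> 'Thing':
            # same trick as DaemonDescription: serialize, then parse again
            return Thing.from_json(self.to_json())

    orig = Thing()
    dup = copy.copy(orig)    # routes through __copy__
    dup.status = -1
    assert orig.status == 1  # the original is untouched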

src/pybind/mgr/orchestrator/module.py

@@ -25,7 +25,7 @@ from ._interface import OrchestratorClientMixin, DeviceLightLoc, _cli_read_comma
     raise_if_exception, _cli_write_command, TrivialReadCompletion, OrchestratorError, \
     NoOrchestrator, OrchestratorValidationError, NFSServiceSpec, \
     RGWSpec, InventoryFilter, InventoryHost, HostSpec, CLICommandMeta, \
-    ServiceDescription, IscsiServiceSpec
+    ServiceDescription, DaemonDescription, IscsiServiceSpec
 
 def nice_delta(now, t, suffix=''):
     if t:
@@ -410,7 +410,7 @@ class OrchestratorCli(OrchestratorClientMixin, MgrModule):
                              refresh=refresh)
         self._orchestrator_wait([completion])
         raise_if_exception(completion)
-        daemons = completion.result
+        daemons: List[DaemonDescription] = completion.result
 
         def ukn(s):
             return '<unknown>' if s is None else s
@@ -432,12 +432,15 @@ class OrchestratorCli(OrchestratorClientMixin, MgrModule):
         table.left_padding_width = 0
         table.right_padding_width = 2
         for s in sorted(daemons, key=lambda s: s.name()):
-            status = {
-                -1: 'error',
-                0: 'stopped',
-                1: 'running',
-                None: '<unknown>'
-            }[s.status]
+            if s.status_desc:
+                status = s.status_desc
+            else:
+                status = {
+                    -1: 'error',
+                    0: 'stopped',
+                    1: 'running',
+                    None: '<unknown>'
+                }[s.status]
             if s.status == 1 and s.started:
                 status += ' (%s)' % to_pretty_timedelta(now - s.started)
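
Distilled, the new rendering rule in `orch ps` is: a non-empty status_desc wins over the numeric mapping. A runnable sketch (render_status is a hypothetical helper, not a function in the module):

    from typing import Optional

    def render_status(status: Optional[int], status_desc: str) -> str:
        if status_desc:
            return status_desc  # e.g. 'host is offline'
        return {-1: 'error', 0: 'stopped', 1: 'running', None: '<unknown>'}[status]

    assert render_status(-1, 'host is offline') == 'host is offline'
    assert render_status(1, '') == 'running'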

src/pybind/mgr/requirements.txt

@@ -5,3 +5,5 @@ requests-mock
 pyyaml
 prettytable
 pyOpenSSL
+execnet
+remoto