Merge pull request #45248 from adk3798/offline-thread

mgr/cephadm: offline host watcher

Reviewed-by: Michael Fritch <mfritch@suse.com>
Adam King 2022-03-30 13:50:07 -04:00 committed by GitHub
commit 7e41eb9f93
7 changed files with 82 additions and 6 deletions

src/pybind/mgr/cephadm/module.py

@@ -58,8 +58,10 @@ from .inventory import Inventory, SpecStore, HostCache, AgentCache, EventStore,
     ClientKeyringStore, ClientKeyringSpec
 from .upgrade import CephadmUpgrade
 from .template import TemplateMgr
-from .utils import CEPH_IMAGE_TYPES, forall_hosts, cephadmNoImage
+from .utils import CEPH_IMAGE_TYPES, RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES, forall_hosts, \
+    cephadmNoImage
 from .configchecks import CephadmConfigChecks
+from .offline_watcher import OfflineHostWatcher
 
 try:
     import asyncssh
@@ -544,11 +546,15 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
         if self.use_agent:
             self.agent_helpers._apply_agent()
 
+        self.offline_watcher = OfflineHostWatcher(self)
+        self.offline_watcher.start()
+
     def shutdown(self) -> None:
         self.log.debug('shutdown')
         self._worker_pool.close()
         self._worker_pool.join()
         self.cherrypy_thread.shutdown()
+        self.offline_watcher.shutdown()
         self.run = False
         self.event.set()
 
@@ -774,6 +780,12 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
             self.cache.save_host(host)
         return None
 
+    def update_watched_hosts(self) -> None:
+        # currently, we are watching hosts with nfs daemons
+        hosts_to_watch = [d.hostname for d in self.cache.get_daemons(
+        ) if d.daemon_type in RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES]
+        self.offline_watcher.set_hosts(list(set([h for h in hosts_to_watch if h is not None])))
+
     def offline_hosts_remove(self, host: str) -> None:
         if host in self.offline_hosts:
             self.offline_hosts.remove(host)

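For context on the hunk above: update_watched_hosts() is what feeds the watcher. It collects the hostname of every daemon whose type appears in RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES (only 'nfs' at this point in the series), drops unset hostnames, and de-duplicates before handing the result to set_hosts(). A minimal standalone sketch of that filtering, using hypothetical (daemon_type, hostname) tuples in place of cephadm's DaemonDescription objects:

    # Sketch only: plain tuples stand in for cephadm's DaemonDescription objects.
    RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES = ['nfs']

    daemons = [
        ('nfs', 'host1'),
        ('mgr', 'host1'),
        ('nfs', 'host2'),
        ('nfs', None),     # a daemon with no hostname recorded is skipped
    ]

    hosts_to_watch = [h for (t, h) in daemons if t in RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES]
    watched = list(set(h for h in hosts_to_watch if h is not None))
    print(sorted(watched))   # ['host1', 'host2'] -- set_hosts() sorts the list it receives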
src/pybind/mgr/cephadm/offline_watcher.py

@@ -0,0 +1,60 @@
+import logging
+from typing import List, Optional, TYPE_CHECKING
+
+import multiprocessing as mp
+import threading
+
+if TYPE_CHECKING:
+    from cephadm.module import CephadmOrchestrator
+
+logger = logging.getLogger(__name__)
+
+
+class OfflineHostWatcher(threading.Thread):
+    def __init__(self, mgr: "CephadmOrchestrator") -> None:
+        self.mgr = mgr
+        self.hosts: Optional[List[str]] = None
+        self.new_hosts: Optional[List[str]] = None
+        self.stop = False
+        self.event = threading.Event()
+        super(OfflineHostWatcher, self).__init__(target=self.run)
+
+    def run(self) -> None:
+        self.thread_pool = mp.pool.ThreadPool(10)
+        while not self.stop:
+            # only need to take action if we have hosts to check
+            if self.hosts or self.new_hosts:
+                if self.new_hosts:
+                    self.hosts = self.new_hosts
+                    self.new_hosts = None
+                logger.debug(f'OfflineHostDetector: Checking if hosts: {self.hosts} are offline.')
+                assert self.hosts is not None
+                self.thread_pool.map(self.check_host, self.hosts)
+            self.event.wait(20)
+            self.event.clear()
+        self.thread_pool.close()
+        self.thread_pool.join()
+
+    def check_host(self, host: str) -> None:
+        if host not in self.mgr.offline_hosts:
+            try:
+                self.mgr.ssh.check_execute_command(host, ['true'])
+            except Exception:
+                logger.debug(f'OfflineHostDetector: detected {host} to be offline')
+                # kick serve loop in case corrective action must be taken for offline host
+                self.mgr._kick_serve_loop()
+
+    def set_hosts(self, hosts: List[str]) -> None:
+        hosts.sort()
+        if (not self.hosts or self.hosts != hosts) and hosts:
+            self.new_hosts = hosts
+            logger.debug(
+                f'OfflineHostDetector: Hosts to check if offline swapped to: {self.new_hosts}.')
+            self.wakeup()
+
+    def wakeup(self) -> None:
+        self.event.set()
+
+    def shutdown(self) -> None:
+        self.stop = True
+        self.wakeup()

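For illustration only, not part of the diff: a minimal sketch of how the watcher's interface behaves, calling set_hosts() and check_host() directly against a hypothetical stub manager instead of a real CephadmOrchestrator, so the 20-second run() loop never starts. The StubSSH/StubMgr names are invented for this sketch, and it assumes the cephadm mgr package is importable, as it is in the test environment patched in tests/fixtures.py below.

    from cephadm.offline_watcher import OfflineHostWatcher

    class StubSSH:
        # hypothetical stand-in for cephadm's SSHManager
        def check_execute_command(self, host, cmd):
            if host == 'host2':
                raise RuntimeError('connection refused')  # pretend host2 is unreachable

    class StubMgr:
        # hypothetical stand-in for CephadmOrchestrator; only the attributes
        # the watcher touches are provided
        offline_hosts = set()   # hosts cephadm already knows are offline
        ssh = StubSSH()
        def _kick_serve_loop(self):
            print('serve loop kicked')

    watcher = OfflineHostWatcher(StubMgr())
    watcher.set_hosts(['host2', 'host1'])  # sorted, staged as new_hosts, event set
    watcher.check_host('host1')            # ssh probe succeeds -> nothing happens
    watcher.check_host('host2')            # ssh probe raises -> prints 'serve loop kicked'
    watcher.shutdown()                     # sets the stop flag and wakes the loop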
src/pybind/mgr/cephadm/schedule.py

@@ -447,5 +447,6 @@ class HostAssignment(object):
                 continue
             in_maintenance[h.hostname] = False
         unreachable_hosts = [h.hostname for h in self.unreachable_hosts]
-        candidates = [c for c in candidates if c.hostname not in unreachable_hosts or in_maintenance[c.hostname]]
+        candidates = [
+            c for c in candidates if c.hostname not in unreachable_hosts or in_maintenance[c.hostname]]
         return candidates

src/pybind/mgr/cephadm/serve.py

@@ -513,7 +513,7 @@ class CephadmServe:
                 f"Failed to apply {len(self.mgr.apply_spec_fails)} service(s): {','.join(x[0] for x in self.mgr.apply_spec_fails)}",
                 len(self.mgr.apply_spec_fails),
                 warnings)
+        self.mgr.update_watched_hosts()
         return r
 
     def _apply_service_config(self, spec: ServiceSpec) -> None:

src/pybind/mgr/cephadm/ssh.py

@@ -75,7 +75,8 @@ class SSHManager:
         with self.redirect_log(host, addr):
             try:
-                ssh_options = asyncssh.SSHClientConnectionOptions(keepalive_interval=7, keepalive_count_max=3)
+                ssh_options = asyncssh.SSHClientConnectionOptions(
+                    keepalive_interval=7, keepalive_count_max=3)
                 conn = await asyncssh.connect(addr, username=self.mgr.ssh_user, client_keys=[self.mgr.tkey.name],
                                               known_hosts=None, config=[self.mgr.ssh_config_fname],
                                               preferred_auth=['publickey'], options=ssh_options)

src/pybind/mgr/cephadm/tests/fixtures.py

@@ -98,7 +98,8 @@ def with_cephadm_module(module_options=None, store=None):
             mock.patch("cephadm.agent.CephadmAgentHelpers._request_agent_acks"), \
             mock.patch("cephadm.agent.CephadmAgentHelpers._apply_agent", return_value=False), \
             mock.patch("cephadm.agent.CephadmAgentHelpers._agent_down", return_value=False), \
-            mock.patch('cephadm.agent.CherryPyThread.run'):
+            mock.patch('cephadm.agent.CherryPyThread.run'), \
+            mock.patch('cephadm.offline_watcher.OfflineHostWatcher.run'):
 
         m = CephadmOrchestrator.__new__(CephadmOrchestrator)
         if module_options is not None:

src/pybind/mgr/cephadm/utils.py

@@ -23,7 +23,8 @@ class CephadmNoImage(Enum):
 # NOTE: order important here as these are used for upgrade order
 CEPH_TYPES = ['mgr', 'mon', 'crash', 'osd', 'mds', 'rgw', 'rbd-mirror', 'cephfs-mirror']
 GATEWAY_TYPES = ['iscsi', 'nfs']
-MONITORING_STACK_TYPES = ['node-exporter', 'prometheus', 'alertmanager', 'grafana', 'loki', 'promtail']
+MONITORING_STACK_TYPES = ['node-exporter', 'prometheus',
+                          'alertmanager', 'grafana', 'loki', 'promtail']
+RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES = ['nfs']
 
 CEPH_UPGRADE_ORDER = CEPH_TYPES + GATEWAY_TYPES + MONITORING_STACK_TYPES