mgr/rbd_support: recover from rados client blocklisting

In certain scenarios the OSDs were slow to process RBD requests.
This led to the rbd_support module's RBD client not being able to
gracefully hand over an RBD exclusive lock to another RBD client.
After the condition persisted for some time, the other RBD client
forcefully acquired the lock by blocklisting the rbd_support module's
RBD client, and consequently blocklisted the module's RADOS client.
The rbd_support module stopped working. To recover the module, the
entire mgr service had to be restarted, which also reloaded all other
mgr modules.

Instead of recovering the rbd_support module from client blocklisting
in a way that disrupts other mgr modules, recover the module
automatically without restarting the mgr service. When the module's
client is blocklisted, shut down the module's handlers and the
blocklisted client, create a new RADOS client for the module, and
start new handlers.
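
The mechanism reduces to a single threading.Event shared between the
handlers and a dedicated recovery thread. A minimal sketch of the loop,
simplified from the Module class changes below (setup/shutdown bodies
are stand-ins; the names match the diff):

    from threading import Event, Thread

    class Module:
        def __init__(self) -> None:
            self.client_blocklisted = Event()
            # daemon=True only so this sketch process can exit;
            # the real module keeps the thread for its lifetime
            self.recovery_thread = Thread(target=self.run, daemon=True)
            self.recovery_thread.start()
            self.setup()

        def setup(self) -> None:
            # stands in for: create RADOS client, start handler threads
            print("setup: new client, new handlers")

        def shutdown(self) -> None:
            # stands in for: stop handler threads, drop the dead client
            print("shutdown: handlers stopped, client dropped")

        def run(self) -> None:
            while True:
                # block until a handler observes ConnectionShutdown
                self.client_blocklisted.wait()
                self.shutdown()
                self.client_blocklisted.clear()
                self.setup()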

Fixes: https://tracker.ceph.com/issues/56724
Signed-off-by: Ramana Raja <rraja@redhat.com>
Ramana Raja <rraja@redhat.com>, 2023-02-15 10:12:54 -05:00
commit cc0468738e (parent e452899013)
7 changed files with 107 additions and 9 deletions

src/pybind/mgr/mgr_module.py

@@ -1342,6 +1342,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
addrs = self._rados.get_addrs()
self._rados.shutdown()
self._ceph_unregister_client(addrs)
self._rados = None
@API.expose
def get(self, data_name: str) -> Any:
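
Nulling out self._rados after shutdown is what makes recovery possible:
the module reaches its client through a rados property, so clearing the
cached handle lets the next access construct and register a fresh
client. A hedged sketch of that shape (_open_client is a hypothetical
helper; the real property in mgr_module.py builds the client from the
mgr context):

    @property
    def rados(self) -> "rados.Rados":
        # recreate the client lazily once _rados has been cleared
        if self._rados is None:
            self._rados = self._open_client()  # hypothetical helper
        return self._rados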

src/pybind/mgr/rbd_support/mirror_snapshot_schedule.py

@@ -48,6 +48,7 @@ class CreateSnapshotRequests:
self.wait_for_pending()
def wait_for_pending(self) -> None:
self.log.debug("CreateSnapshotRequests.wait_for_pending")
with self.lock:
while self.pending:
self.condition.wait()
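
Draining in-flight snapshot requests this way means shutdown cannot
complete while request callbacks are still running: wait_for_pending()
blocks on the condition variable until the pending collection empties.
A hedged sketch of the signaling side (the method name and the exact
type of self.pending are assumptions):

    def _on_complete(self, image_spec: str) -> None:  # hypothetical name
        with self.lock:
            self.pending.remove(image_spec)  # assumes a set of specs
            if not self.pending:
                self.condition.notify()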
@@ -330,7 +331,6 @@ class MirrorSnapshotScheduleHandler:
lock = Lock()
condition = Condition(lock)
thread = None
def __init__(self, module: Any) -> None:
self.module = module
@@ -340,16 +340,23 @@ class MirrorSnapshotScheduleHandler:
self.init_schedule_queue()
self.stop_thread = False
self.thread = Thread(target=self.run)
self.thread.start()
def _cleanup(self) -> None:
def shutdown(self) -> None:
self.log.info("MirrorSnapshotScheduleHandler: shutting down")
self.stop_thread = True
if self.thread.is_alive():
self.log.debug("MirrorSnapshotScheduleHandler: joining thread")
self.thread.join()
self.create_snapshot_requests.wait_for_pending()
self.log.info("MirrorSnapshotScheduleHandler: shut down")
def run(self) -> None:
try:
self.log.info("MirrorSnapshotScheduleHandler: starting")
while True:
while not self.stop_thread:
refresh_delay = self.refresh_images()
with self.lock:
(image_spec, wait_time) = self.dequeue()
@@ -361,6 +368,9 @@ class MirrorSnapshotScheduleHandler:
with self.lock:
self.enqueue(datetime.now(), pool_id, namespace, image_id)
except (rados.ConnectionShutdown, rbd.ConnectionShutdown):
self.log.exception("MirrorSnapshotScheduleHandler: client blocklisted")
self.module.client_blocklisted.set()
except Exception as ex:
self.log.fatal("Fatal runtime error: {}\n{}".format(
ex, traceback.format_exc()))
@@ -451,6 +461,8 @@ class MirrorSnapshotScheduleHandler:
self.log.debug(
"load_pool_images: adding image {}".format(name))
images[pool_id][namespace][image_id] = name
except rbd.ConnectionShutdown:
raise
except Exception as e:
self.log.error(
"load_pool_images: exception when scanning pool {}: {}".format(

src/pybind/mgr/rbd_support/module.py

@@ -8,9 +8,11 @@ import functools
import inspect
import rados
import rbd
import traceback
from typing import cast, Any, Callable, Optional, Tuple, TypeVar
from mgr_module import CLIReadCommand, CLIWriteCommand, MgrModule, Option
from threading import Thread, Event
from .common import NotAuthorizedError
from .mirror_snapshot_schedule import image_validator, namespace_validator, \
@@ -35,6 +37,8 @@ FuncT = TypeVar('FuncT', bound=Callable)
def with_latest_osdmap(func: FuncT) -> FuncT:
@functools.wraps(func)
def wrapper(self: 'Module', *args: Any, **kwargs: Any) -> Tuple[int, str, str]:
if not self.module_ready:
return -errno.EAGAIN, "", ""
# ensure we have latest pools available
self.rados.wait_for_latest_osdmap()
try:
@@ -46,6 +50,10 @@ def with_latest_osdmap(func: FuncT) -> FuncT:
# log the full traceback but don't send it to the CLI user
self.log.exception("Fatal runtime error: ")
raise
except (rados.ConnectionShutdown, rbd.ConnectionShutdown) as ex:
self.log.debug("with_latest_osdmap: client blocklisted")
self.client_blocklisted.set()
return -errno.EAGAIN, "", str(ex)
except rados.Error as ex:
return -ex.errno, "", str(ex)
except rbd.OSError as ex:
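
Every CLI entry point in the module passes through this decorator, so a
still-recovering module (module_ready is False) or a blocklist hit in
mid-command degrades to -EAGAIN instead of an unhandled exception. A
hedged usage sketch (the command name is invented for illustration):

    @CLIReadCommand('rbd dummy ping')  # hypothetical command
    @with_latest_osdmap
    def ping(self) -> Tuple[int, str, str]:
        # runs only when module_ready is True and the osdmap is current
        return 0, 'pong', ''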
@@ -74,11 +82,46 @@ class Module(MgrModule):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super(Module, self).__init__(*args, **kwargs)
self.client_blocklisted = Event()
self.recovery_thread = Thread(target=self.run)
self.recovery_thread.start()
self.setup()
def setup(self) -> None:
self.log.info("starting setup")
# new client is created and registered in the MgrMap implicitly
# as 'rados' is a property attribute.
self.rados.wait_for_latest_osdmap()
self.mirror_snapshot_schedule = MirrorSnapshotScheduleHandler(self)
self.perf = PerfHandler(self)
self.task = TaskHandler(self)
self.trash_purge_schedule = TrashPurgeScheduleHandler(self)
self.log.info("setup complete")
self.module_ready = True
def run(self) -> None:
self.log.info("recovery thread starting")
try:
while True:
# block until rados client is blocklisted
self.client_blocklisted.wait()
self.log.info("restarting")
self.shutdown()
self.client_blocklisted.clear()
self.setup()
self.log.info("restarted")
except Exception as ex:
self.log.fatal("Fatal runtime error: {}\n{}".format(
ex, traceback.format_exc()))
def shutdown(self) -> None:
self.module_ready = False
self.mirror_snapshot_schedule.shutdown()
self.trash_purge_schedule.shutdown()
self.task.shutdown()
self.perf.shutdown()
# shut down client and deregister it from MgrMap
super().shutdown()
@CLIWriteCommand('rbd mirror snapshot schedule add')
@with_latest_osdmap

src/pybind/mgr/rbd_support/perf.py

@@ -71,7 +71,6 @@ class PerfHandler:
lock = Lock()
query_condition = Condition(lock)
refresh_condition = Condition(lock)
thread = None
image_name_cache: Dict[Tuple[int, str], Dict[str, str]] = {}
image_name_refresh_time = datetime.fromtimestamp(0)
@@ -118,13 +117,22 @@
self.module = module
self.log = module.log
self.stop_thread = False
self.thread = Thread(target=self.run)
self.thread.start()
def shutdown(self) -> None:
self.log.info("PerfHandler: shutting down")
self.stop_thread = True
if self.thread.is_alive():
self.log.debug("PerfHandler: joining thread")
self.thread.join()
self.log.info("PerfHandler: shut down")
def run(self) -> None:
try:
self.log.info("PerfHandler: starting")
while True:
while not self.stop_thread:
with self.lock:
self.scrub_expired_queries()
self.process_raw_osd_perf_counters()
@@ -135,6 +143,9 @@
self.log.debug("PerfHandler: tick")
except (rados.ConnectionShutdown, rbd.ConnectionShutdown):
self.log.exception("PerfHandler: client blocklisted")
self.module.client_blocklisted.set()
except Exception as ex:
self.log.fatal("Fatal runtime error: {}\n{}".format(
ex, traceback.format_exc()))

src/pybind/mgr/rbd_support/schedule.py

@@ -418,6 +418,8 @@ class Schedules:
with self.handler.module.rados.open_ioctx2(int(pool_id)) as ioctx:
self.load_from_pool(ioctx, namespace_validator,
image_validator)
except rados.ConnectionShutdown:
raise
except rados.Error as e:
self.handler.log.error(
"Failed to load schedules for pool {}: {}".format(

src/pybind/mgr/rbd_support/task.py

@@ -153,7 +153,6 @@ MigrationStatusT = Dict[str, str]
class TaskHandler:
lock = Lock()
condition = Condition(lock)
thread = None
in_progress_task = None
tasks_by_sequence: Dict[int, Task] = dict()
@@ -170,6 +169,7 @@ class TaskHandler:
with self.lock:
self.init_task_queue()
self.stop_thread = False
self.thread = Thread(target=self.run)
self.thread.start()
@@ -191,10 +191,18 @@
return (match.group(1) or self.default_pool_name, match.group(2) or '',
match.group(3))
def shutdown(self) -> None:
self.log.info("TaskHandler: shutting down")
self.stop_thread = True
if self.thread.is_alive():
self.log.debug("TaskHandler: joining thread")
self.thread.join()
self.log.info("TaskHandler: shut down")
def run(self) -> None:
try:
self.log.info("TaskHandler: starting")
while True:
while not self.stop_thread:
with self.lock:
now = datetime.now()
for sequence in sorted([sequence for sequence, task
@@ -205,6 +213,9 @@
self.condition.wait(5)
self.log.debug("TaskHandler: tick")
except (rados.ConnectionShutdown, rbd.ConnectionShutdown):
self.log.exception("TaskHandler: client blocklisted")
self.module.client_blocklisted.set()
except Exception as ex:
self.log.fatal("Fatal runtime error: {}\n{}".format(
ex, traceback.format_exc()))
@@ -428,6 +439,9 @@ class TaskHandler:
self.complete_progress(task)
self.remove_task(None, task)
except (rados.ConnectionShutdown, rbd.ConnectionShutdown):
raise
except (rados.Error, rbd.Error) as e:
self.log.error("execute_task: {}".format(e))
task.retry_message = "{}".format(e)

src/pybind/mgr/rbd_support/trash_purge_schedule.py

@@ -18,7 +18,6 @@ class TrashPurgeScheduleHandler:
lock = Lock()
condition = Condition(lock)
thread = None
def __init__(self, module: Any) -> None:
self.module = module
@@ -27,13 +26,22 @@
self.init_schedule_queue()
self.stop_thread = False
self.thread = Thread(target=self.run)
self.thread.start()
def shutdown(self) -> None:
self.log.info("TrashPurgeScheduleHandler: shutting down")
self.stop_thread = True
if self.thread.is_alive():
self.log.debug("TrashPurgeScheduleHandler: joining thread")
self.thread.join()
self.log.info("TrashPurgeScheduleHandler: shut down")
def run(self) -> None:
try:
self.log.info("TrashPurgeScheduleHandler: starting")
while True:
while not self.stop_thread:
refresh_delay = self.refresh_pools()
with self.lock:
(ns_spec, wait_time) = self.dequeue()
@@ -45,6 +53,9 @@
with self.lock:
self.enqueue(datetime.now(), pool_id, namespace)
except (rados.ConnectionShutdown, rbd.ConnectionShutdown):
self.log.exception("TrashPurgeScheduleHandler: client blocklisted")
self.module.client_blocklisted.set()
except Exception as ex:
self.log.fatal("Fatal runtime error: {}\n{}".format(
ex, traceback.format_exc()))
@@ -54,6 +65,8 @@
with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
ioctx.set_namespace(namespace)
rbd.RBD().trash_purge(ioctx, datetime.now())
except (rados.ConnectionShutdown, rbd.ConnectionShutdown):
raise
except Exception as e:
self.log.error("exception when purging {}/{}: {}".format(
pool_id, namespace, e))
@@ -114,6 +127,8 @@
pool_namespaces += rbd.RBD().namespace_list(ioctx)
except rbd.OperationNotSupported:
self.log.debug("namespaces not supported")
except rbd.ConnectionShutdown:
raise
except Exception as e:
self.log.error("exception when scanning pool {}: {}".format(
pool_name, e))