From 08151a191fd41427a751d966d523cacead5fe8fe Mon Sep 17 00:00:00 2001 From: Mykola Golub Date: Wed, 21 Oct 2020 18:07:16 +0100 Subject: [PATCH] mgr/rbd_support: create mirror snapshots asynchronously To scale up with number of images. Fixes: https://tracker.ceph.com/issues/47827 Signed-off-by: Mykola Golub --- .../rbd_support/mirror_snapshot_schedule.py | 288 ++++++++++++++++-- src/pybind/mgr/rbd_support/module.py | 1 + 2 files changed, 265 insertions(+), 24 deletions(-) diff --git a/src/pybind/mgr/rbd_support/mirror_snapshot_schedule.py b/src/pybind/mgr/rbd_support/mirror_snapshot_schedule.py index e9cde79b6e2..4223894b026 100644 --- a/src/pybind/mgr/rbd_support/mirror_snapshot_schedule.py +++ b/src/pybind/mgr/rbd_support/mirror_snapshot_schedule.py @@ -144,8 +144,269 @@ class Watchers: self.unregister(pool_id, namespace) +class CreateSnapshotRequests: + + lock = Lock() + condition = Condition(lock) + + def __init__(self, handler): + self.handler = handler + self.rados = handler.module.rados + self.log = handler.log + self.pending = set() + self.queue = [] + self.ioctxs = {} + + def __del__(self): + self.wait_for_pending() + + def wait_for_pending(self): + with self.lock: + while self.pending: + self.condition.wait() + + def add(self, pool_id, namespace, image_id): + image_spec = (pool_id, namespace, image_id) + + self.log.debug("CreateSnapshotRequests.add: {}/{}/{}".format( + pool_id, namespace, image_id)) + + max_concurrent = self.handler.module.get_localized_module_option( + self.handler.MODULE_OPTION_NAME_MAX_CONCURRENT_SNAP_CREATE) + + with self.lock: + if image_spec in self.pending: + self.log.info( + "CreateSnapshotRequests.add: {}/{}/{}: {}".format( + pool_id, namespace, image_id, + "previous request is still in progress")) + return + self.pending.add(image_spec) + + if len(self.pending) > max_concurrent: + self.queue.append(image_spec) + return + + self.open_image(image_spec) + + def open_image(self, image_spec): + pool_id, namespace, image_id = image_spec + + self.log.debug("CreateSnapshotRequests.open_image: {}/{}/{}".format( + pool_id, namespace, image_id)) + + try: + ioctx = self.get_ioctx(image_spec) + + def cb(comp, image): + self.handle_open_image(image_spec, comp, image) + + rbd.RBD().aio_open_image(cb, ioctx, image_id=image_id) + except Exception as e: + self.log.error( + "exception when opening {}/{}/{}: {}".format( + pool_id, namespace, image_id, e)) + self.finish(image_spec) + + def handle_open_image(self, image_spec, comp, image): + pool_id, namespace, image_id = image_spec + + self.log.debug( + "CreateSnapshotRequests.handle_open_image {}/{}/{}: r={}".format( + pool_id, namespace, image_id, comp.get_return_value())) + + if comp.get_return_value() < 0: + self.log.error( + "error when opening {}/{}/{}: {}".format( + pool_id, namespace, image_id, comp.get_return_value())) + self.finish(image_spec) + return + + self.get_mirror_mode(image_spec, image) + + def get_mirror_mode(self, image_spec, image): + pool_id, namespace, image_id = image_spec + + self.log.debug("CreateSnapshotRequests.get_mirror_mode: {}/{}/{}".format( + pool_id, namespace, image_id)) + + def cb(comp, mode): + self.handle_get_mirror_mode(image_spec, image, comp, mode) + + try: + image.aio_mirror_image_get_mode(cb) + except Exception as e: + self.log.error( + "exception when getting mirror mode for {}/{}/{}: {}".format( + pool_id, namespace, image_id, e)) + self.close_image(image_spec, image) + + def handle_get_mirror_mode(self, image_spec, image, comp, mode): + pool_id, namespace, image_id = image_spec + + self.log.debug( + "CreateSnapshotRequests.handle_get_mirror_mode {}/{}/{}: r={} mode={}".format( + pool_id, namespace, image_id, comp.get_return_value(), mode)) + + if comp.get_return_value() < 0: + self.log.error( + "error when getting mirror mode for {}/{}/{}: {}".format( + pool_id, namespace, image_id, comp.get_return_value())) + self.close_image(image_spec, image) + return + + if mode != rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT: + self.log.debug( + "CreateSnapshotRequests.handle_get_mirror_mode: {}/{}/{}: {}".format( + pool_id, namespace, image_id, + "snapshot mirroring is not enabled")) + self.close_image(image_spec) + + self.get_mirror_info(image_spec, image) + + def get_mirror_info(self, image_spec, image): + pool_id, namespace, image_id = image_spec + + self.log.debug("CreateSnapshotRequests.get_mirror_info: {}/{}/{}".format( + pool_id, namespace, image_id)) + + def cb(comp, info): + self.handle_get_mirror_info(image_spec, image, comp, info) + + try: + image.aio_mirror_image_get_info(cb) + except Exception as e: + self.log.error( + "exception when getting mirror info for {}/{}/{}: {}".format( + pool_id, namespace, image_id, e)) + self.close_image(image_spec, image) + + def handle_get_mirror_info(self, image_spec, image, comp, info): + pool_id, namespace, image_id = image_spec + + self.log.debug( + "CreateSnapshotRequests.handle_get_mirror_info {}/{}/{}: r={} info={}".format( + pool_id, namespace, image_id, comp.get_return_value(), info)) + + if comp.get_return_value() < 0: + self.log.error( + "error when getting mirror info for {}/{}/{}: {}".format( + pool_id, namespace, image_id, comp.get_return_value())) + self.close_image(image_spec, image) + return + + self.create_snapshot(image_spec, image) + + def create_snapshot(self, image_spec, image): + pool_id, namespace, image_id = image_spec + + self.log.debug( + "CreateSnapshotRequests.create_snapshot for {}/{}/{}".format( + pool_id, namespace, image_id)) + + def cb(comp, snap_id): + self.handle_create_snapshot(image_spec, image, comp, snap_id) + + try: + image.aio_mirror_image_create_snapshot(0, cb) + except Exception as e: + self.log.error( + "exception when creating snapshot for {}/{}/{}: {}".format( + pool_id, namespace, image_id, e)) + self.close_image(image_spec, image) + + + def handle_create_snapshot(self, image_spec, image, comp, snap_id): + pool_id, namespace, image_id = image_spec + + self.log.debug( + "CreateSnapshotRequests.handle_create_snapshot for {}/{}/{}: r={}, snap_id={}".format( + pool_id, namespace, image_id, comp.get_return_value(), snap_id)) + + if comp.get_return_value() < 0: + self.log.error( + "error when creating snapshot for {}/{}/{}: {}".format( + pool_id, namespace, image_id, comp.get_return_value())) + + self.close_image(image_spec, image) + + def close_image(self, image_spec, image): + pool_id, namespace, image_id = image_spec + + self.log.debug( + "CreateSnapshotRequests.close_image {}/{}/{}".format( + pool_id, namespace, image_id)) + + def cb(comp): + self.handle_close_image(image_spec, comp) + + try: + image.aio_close(cb) + except Exception as e: + self.log.error( + "exception when closing {}/{}/{}: {}".format( + pool_id, namespace, image_id, e)) + self.finish(image_spec) + + def handle_close_image(self, image_spec, comp): + pool_id, namespace, image_id = image_spec + + self.log.debug( + "CreateSnapshotRequests.handle_close_image {}/{}/{}: r={}".format( + pool_id, namespace, image_id, comp.get_return_value())) + + if comp.get_return_value() < 0: + self.log.error( + "error when closing {}/{}/{}: {}".format( + pool_id, namespace, image_id, comp.get_return_value())) + + self.finish(image_spec) + + def finish(self, image_spec): + pool_id, namespace, image_id = image_spec + + self.log.debug("CreateSnapshotRequests.finish: {}/{}/{}".format( + pool_id, namespace, image_id)) + + self.put_ioctx(image_spec) + + with self.lock: + self.pending.remove(image_spec) + if not self.queue: + return + image_spec = self.queue.pop(0) + + self.open_image(image_spec) + + def get_ioctx(self, image_spec): + pool_id, namespace, image_id = image_spec + nspec = (pool_id, namespace) + + with self.lock: + ioctx, images = self.ioctxs.get(nspec, (None, None)) + if not ioctx: + ioctx = self.rados.open_ioctx2(int(pool_id)) + ioctx.set_namespace(namespace) + images = set() + self.ioctxs[nspec] = (ioctx, images) + images.add(image_spec) + + return ioctx + + def put_ioctx(self, image_spec): + pool_id, namespace, image_id = image_spec + nspec = (pool_id, namespace) + + with self.lock: + ioctx, images = self.ioctxs[nspec] + images.remove(image_spec) + if not images: + del self.ioctxs[nspec] + + class MirrorSnapshotScheduleHandler: MODULE_OPTION_NAME = "mirror_snapshot_schedule" + MODULE_OPTION_NAME_MAX_CONCURRENT_SNAP_CREATE = "max_concurrent_snap_create" SCHEDULE_OID = "rbd_mirror_snapshot_schedule" lock = Lock() @@ -156,6 +417,7 @@ class MirrorSnapshotScheduleHandler: self.module = module self.log = module.log self.last_refresh_images = datetime(1970, 1, 1) + self.create_snapshot_requests = CreateSnapshotRequests(self) self.init_schedule_queue() @@ -164,6 +426,7 @@ class MirrorSnapshotScheduleHandler: def _cleanup(self): self.watchers.unregister_all() + self.create_snapshot_requests.wait_for_pending() def run(self): try: @@ -176,7 +439,7 @@ class MirrorSnapshotScheduleHandler: self.condition.wait(min(wait_time, 60)) continue pool_id, namespace, image_id = image_spec - self.create_snapshot(pool_id, namespace, image_id) + self.create_snapshot_requests.add(pool_id, namespace, image_id) with self.lock: self.enqueue(datetime.now(), pool_id, namespace, image_id) @@ -184,29 +447,6 @@ class MirrorSnapshotScheduleHandler: self.log.fatal("Fatal runtime error: {}\n{}".format( ex, traceback.format_exc())) - def create_snapshot(self, pool_id, namespace, image_id): - try: - with self.module.rados.open_ioctx2(int(pool_id)) as ioctx: - ioctx.set_namespace(namespace) - with rbd.Image(ioctx, image_id=image_id) as image: - mode = image.mirror_image_get_mode() - if mode != rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT: - return - info = image.mirror_image_get_info() - if info['state'] != rbd.RBD_MIRROR_IMAGE_ENABLED or \ - not info['primary']: - return - snap_id = image.mirror_image_create_snapshot() - self.log.debug( - "create_snapshot: {}/{}/{}: snap_id={}".format( - ioctx.get_pool_name(), namespace, image.get_name(), - snap_id)) - except Exception as e: - self.log.error( - "exception when creating snapshot for {}/{}/{}: {}".format( - pool_id, namespace, image_id, e)) - - def init_schedule_queue(self): self.queue = {} self.images = {} diff --git a/src/pybind/mgr/rbd_support/module.py b/src/pybind/mgr/rbd_support/module.py index 6062ceffa78..82bd06e6238 100644 --- a/src/pybind/mgr/rbd_support/module.py +++ b/src/pybind/mgr/rbd_support/module.py @@ -145,6 +145,7 @@ class Module(MgrModule): ] MODULE_OPTIONS = [ {'name': MirrorSnapshotScheduleHandler.MODULE_OPTION_NAME}, + {'name': MirrorSnapshotScheduleHandler.MODULE_OPTION_NAME_MAX_CONCURRENT_SNAP_CREATE, 'type': 'int', 'default': 10}, {'name': TrashPurgeScheduleHandler.MODULE_OPTION_NAME}, ]