mirror of
https://github.com/ceph/ceph
synced 2025-02-24 19:47:44 +00:00
mgr/rbd_support: create mirror snapshots asynchronously
To scale up with number of images. Fixes: https://tracker.ceph.com/issues/47827 Signed-off-by: Mykola Golub <mgolub@suse.com>
This commit is contained in:
parent
13ce488d7a
commit
08151a191f
@ -144,8 +144,269 @@ class Watchers:
|
||||
self.unregister(pool_id, namespace)
|
||||
|
||||
|
||||
class CreateSnapshotRequests:
|
||||
|
||||
lock = Lock()
|
||||
condition = Condition(lock)
|
||||
|
||||
def __init__(self, handler):
|
||||
self.handler = handler
|
||||
self.rados = handler.module.rados
|
||||
self.log = handler.log
|
||||
self.pending = set()
|
||||
self.queue = []
|
||||
self.ioctxs = {}
|
||||
|
||||
def __del__(self):
|
||||
self.wait_for_pending()
|
||||
|
||||
def wait_for_pending(self):
|
||||
with self.lock:
|
||||
while self.pending:
|
||||
self.condition.wait()
|
||||
|
||||
def add(self, pool_id, namespace, image_id):
|
||||
image_spec = (pool_id, namespace, image_id)
|
||||
|
||||
self.log.debug("CreateSnapshotRequests.add: {}/{}/{}".format(
|
||||
pool_id, namespace, image_id))
|
||||
|
||||
max_concurrent = self.handler.module.get_localized_module_option(
|
||||
self.handler.MODULE_OPTION_NAME_MAX_CONCURRENT_SNAP_CREATE)
|
||||
|
||||
with self.lock:
|
||||
if image_spec in self.pending:
|
||||
self.log.info(
|
||||
"CreateSnapshotRequests.add: {}/{}/{}: {}".format(
|
||||
pool_id, namespace, image_id,
|
||||
"previous request is still in progress"))
|
||||
return
|
||||
self.pending.add(image_spec)
|
||||
|
||||
if len(self.pending) > max_concurrent:
|
||||
self.queue.append(image_spec)
|
||||
return
|
||||
|
||||
self.open_image(image_spec)
|
||||
|
||||
def open_image(self, image_spec):
|
||||
pool_id, namespace, image_id = image_spec
|
||||
|
||||
self.log.debug("CreateSnapshotRequests.open_image: {}/{}/{}".format(
|
||||
pool_id, namespace, image_id))
|
||||
|
||||
try:
|
||||
ioctx = self.get_ioctx(image_spec)
|
||||
|
||||
def cb(comp, image):
|
||||
self.handle_open_image(image_spec, comp, image)
|
||||
|
||||
rbd.RBD().aio_open_image(cb, ioctx, image_id=image_id)
|
||||
except Exception as e:
|
||||
self.log.error(
|
||||
"exception when opening {}/{}/{}: {}".format(
|
||||
pool_id, namespace, image_id, e))
|
||||
self.finish(image_spec)
|
||||
|
||||
def handle_open_image(self, image_spec, comp, image):
|
||||
pool_id, namespace, image_id = image_spec
|
||||
|
||||
self.log.debug(
|
||||
"CreateSnapshotRequests.handle_open_image {}/{}/{}: r={}".format(
|
||||
pool_id, namespace, image_id, comp.get_return_value()))
|
||||
|
||||
if comp.get_return_value() < 0:
|
||||
self.log.error(
|
||||
"error when opening {}/{}/{}: {}".format(
|
||||
pool_id, namespace, image_id, comp.get_return_value()))
|
||||
self.finish(image_spec)
|
||||
return
|
||||
|
||||
self.get_mirror_mode(image_spec, image)
|
||||
|
||||
def get_mirror_mode(self, image_spec, image):
|
||||
pool_id, namespace, image_id = image_spec
|
||||
|
||||
self.log.debug("CreateSnapshotRequests.get_mirror_mode: {}/{}/{}".format(
|
||||
pool_id, namespace, image_id))
|
||||
|
||||
def cb(comp, mode):
|
||||
self.handle_get_mirror_mode(image_spec, image, comp, mode)
|
||||
|
||||
try:
|
||||
image.aio_mirror_image_get_mode(cb)
|
||||
except Exception as e:
|
||||
self.log.error(
|
||||
"exception when getting mirror mode for {}/{}/{}: {}".format(
|
||||
pool_id, namespace, image_id, e))
|
||||
self.close_image(image_spec, image)
|
||||
|
||||
def handle_get_mirror_mode(self, image_spec, image, comp, mode):
|
||||
pool_id, namespace, image_id = image_spec
|
||||
|
||||
self.log.debug(
|
||||
"CreateSnapshotRequests.handle_get_mirror_mode {}/{}/{}: r={} mode={}".format(
|
||||
pool_id, namespace, image_id, comp.get_return_value(), mode))
|
||||
|
||||
if comp.get_return_value() < 0:
|
||||
self.log.error(
|
||||
"error when getting mirror mode for {}/{}/{}: {}".format(
|
||||
pool_id, namespace, image_id, comp.get_return_value()))
|
||||
self.close_image(image_spec, image)
|
||||
return
|
||||
|
||||
if mode != rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT:
|
||||
self.log.debug(
|
||||
"CreateSnapshotRequests.handle_get_mirror_mode: {}/{}/{}: {}".format(
|
||||
pool_id, namespace, image_id,
|
||||
"snapshot mirroring is not enabled"))
|
||||
self.close_image(image_spec)
|
||||
|
||||
self.get_mirror_info(image_spec, image)
|
||||
|
||||
def get_mirror_info(self, image_spec, image):
|
||||
pool_id, namespace, image_id = image_spec
|
||||
|
||||
self.log.debug("CreateSnapshotRequests.get_mirror_info: {}/{}/{}".format(
|
||||
pool_id, namespace, image_id))
|
||||
|
||||
def cb(comp, info):
|
||||
self.handle_get_mirror_info(image_spec, image, comp, info)
|
||||
|
||||
try:
|
||||
image.aio_mirror_image_get_info(cb)
|
||||
except Exception as e:
|
||||
self.log.error(
|
||||
"exception when getting mirror info for {}/{}/{}: {}".format(
|
||||
pool_id, namespace, image_id, e))
|
||||
self.close_image(image_spec, image)
|
||||
|
||||
def handle_get_mirror_info(self, image_spec, image, comp, info):
|
||||
pool_id, namespace, image_id = image_spec
|
||||
|
||||
self.log.debug(
|
||||
"CreateSnapshotRequests.handle_get_mirror_info {}/{}/{}: r={} info={}".format(
|
||||
pool_id, namespace, image_id, comp.get_return_value(), info))
|
||||
|
||||
if comp.get_return_value() < 0:
|
||||
self.log.error(
|
||||
"error when getting mirror info for {}/{}/{}: {}".format(
|
||||
pool_id, namespace, image_id, comp.get_return_value()))
|
||||
self.close_image(image_spec, image)
|
||||
return
|
||||
|
||||
self.create_snapshot(image_spec, image)
|
||||
|
||||
def create_snapshot(self, image_spec, image):
|
||||
pool_id, namespace, image_id = image_spec
|
||||
|
||||
self.log.debug(
|
||||
"CreateSnapshotRequests.create_snapshot for {}/{}/{}".format(
|
||||
pool_id, namespace, image_id))
|
||||
|
||||
def cb(comp, snap_id):
|
||||
self.handle_create_snapshot(image_spec, image, comp, snap_id)
|
||||
|
||||
try:
|
||||
image.aio_mirror_image_create_snapshot(0, cb)
|
||||
except Exception as e:
|
||||
self.log.error(
|
||||
"exception when creating snapshot for {}/{}/{}: {}".format(
|
||||
pool_id, namespace, image_id, e))
|
||||
self.close_image(image_spec, image)
|
||||
|
||||
|
||||
def handle_create_snapshot(self, image_spec, image, comp, snap_id):
|
||||
pool_id, namespace, image_id = image_spec
|
||||
|
||||
self.log.debug(
|
||||
"CreateSnapshotRequests.handle_create_snapshot for {}/{}/{}: r={}, snap_id={}".format(
|
||||
pool_id, namespace, image_id, comp.get_return_value(), snap_id))
|
||||
|
||||
if comp.get_return_value() < 0:
|
||||
self.log.error(
|
||||
"error when creating snapshot for {}/{}/{}: {}".format(
|
||||
pool_id, namespace, image_id, comp.get_return_value()))
|
||||
|
||||
self.close_image(image_spec, image)
|
||||
|
||||
def close_image(self, image_spec, image):
|
||||
pool_id, namespace, image_id = image_spec
|
||||
|
||||
self.log.debug(
|
||||
"CreateSnapshotRequests.close_image {}/{}/{}".format(
|
||||
pool_id, namespace, image_id))
|
||||
|
||||
def cb(comp):
|
||||
self.handle_close_image(image_spec, comp)
|
||||
|
||||
try:
|
||||
image.aio_close(cb)
|
||||
except Exception as e:
|
||||
self.log.error(
|
||||
"exception when closing {}/{}/{}: {}".format(
|
||||
pool_id, namespace, image_id, e))
|
||||
self.finish(image_spec)
|
||||
|
||||
def handle_close_image(self, image_spec, comp):
|
||||
pool_id, namespace, image_id = image_spec
|
||||
|
||||
self.log.debug(
|
||||
"CreateSnapshotRequests.handle_close_image {}/{}/{}: r={}".format(
|
||||
pool_id, namespace, image_id, comp.get_return_value()))
|
||||
|
||||
if comp.get_return_value() < 0:
|
||||
self.log.error(
|
||||
"error when closing {}/{}/{}: {}".format(
|
||||
pool_id, namespace, image_id, comp.get_return_value()))
|
||||
|
||||
self.finish(image_spec)
|
||||
|
||||
def finish(self, image_spec):
|
||||
pool_id, namespace, image_id = image_spec
|
||||
|
||||
self.log.debug("CreateSnapshotRequests.finish: {}/{}/{}".format(
|
||||
pool_id, namespace, image_id))
|
||||
|
||||
self.put_ioctx(image_spec)
|
||||
|
||||
with self.lock:
|
||||
self.pending.remove(image_spec)
|
||||
if not self.queue:
|
||||
return
|
||||
image_spec = self.queue.pop(0)
|
||||
|
||||
self.open_image(image_spec)
|
||||
|
||||
def get_ioctx(self, image_spec):
|
||||
pool_id, namespace, image_id = image_spec
|
||||
nspec = (pool_id, namespace)
|
||||
|
||||
with self.lock:
|
||||
ioctx, images = self.ioctxs.get(nspec, (None, None))
|
||||
if not ioctx:
|
||||
ioctx = self.rados.open_ioctx2(int(pool_id))
|
||||
ioctx.set_namespace(namespace)
|
||||
images = set()
|
||||
self.ioctxs[nspec] = (ioctx, images)
|
||||
images.add(image_spec)
|
||||
|
||||
return ioctx
|
||||
|
||||
def put_ioctx(self, image_spec):
|
||||
pool_id, namespace, image_id = image_spec
|
||||
nspec = (pool_id, namespace)
|
||||
|
||||
with self.lock:
|
||||
ioctx, images = self.ioctxs[nspec]
|
||||
images.remove(image_spec)
|
||||
if not images:
|
||||
del self.ioctxs[nspec]
|
||||
|
||||
|
||||
class MirrorSnapshotScheduleHandler:
|
||||
MODULE_OPTION_NAME = "mirror_snapshot_schedule"
|
||||
MODULE_OPTION_NAME_MAX_CONCURRENT_SNAP_CREATE = "max_concurrent_snap_create"
|
||||
SCHEDULE_OID = "rbd_mirror_snapshot_schedule"
|
||||
|
||||
lock = Lock()
|
||||
@ -156,6 +417,7 @@ class MirrorSnapshotScheduleHandler:
|
||||
self.module = module
|
||||
self.log = module.log
|
||||
self.last_refresh_images = datetime(1970, 1, 1)
|
||||
self.create_snapshot_requests = CreateSnapshotRequests(self)
|
||||
|
||||
self.init_schedule_queue()
|
||||
|
||||
@ -164,6 +426,7 @@ class MirrorSnapshotScheduleHandler:
|
||||
|
||||
def _cleanup(self):
|
||||
self.watchers.unregister_all()
|
||||
self.create_snapshot_requests.wait_for_pending()
|
||||
|
||||
def run(self):
|
||||
try:
|
||||
@ -176,7 +439,7 @@ class MirrorSnapshotScheduleHandler:
|
||||
self.condition.wait(min(wait_time, 60))
|
||||
continue
|
||||
pool_id, namespace, image_id = image_spec
|
||||
self.create_snapshot(pool_id, namespace, image_id)
|
||||
self.create_snapshot_requests.add(pool_id, namespace, image_id)
|
||||
with self.lock:
|
||||
self.enqueue(datetime.now(), pool_id, namespace, image_id)
|
||||
|
||||
@ -184,29 +447,6 @@ class MirrorSnapshotScheduleHandler:
|
||||
self.log.fatal("Fatal runtime error: {}\n{}".format(
|
||||
ex, traceback.format_exc()))
|
||||
|
||||
def create_snapshot(self, pool_id, namespace, image_id):
|
||||
try:
|
||||
with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
|
||||
ioctx.set_namespace(namespace)
|
||||
with rbd.Image(ioctx, image_id=image_id) as image:
|
||||
mode = image.mirror_image_get_mode()
|
||||
if mode != rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT:
|
||||
return
|
||||
info = image.mirror_image_get_info()
|
||||
if info['state'] != rbd.RBD_MIRROR_IMAGE_ENABLED or \
|
||||
not info['primary']:
|
||||
return
|
||||
snap_id = image.mirror_image_create_snapshot()
|
||||
self.log.debug(
|
||||
"create_snapshot: {}/{}/{}: snap_id={}".format(
|
||||
ioctx.get_pool_name(), namespace, image.get_name(),
|
||||
snap_id))
|
||||
except Exception as e:
|
||||
self.log.error(
|
||||
"exception when creating snapshot for {}/{}/{}: {}".format(
|
||||
pool_id, namespace, image_id, e))
|
||||
|
||||
|
||||
def init_schedule_queue(self):
|
||||
self.queue = {}
|
||||
self.images = {}
|
||||
|
@ -145,6 +145,7 @@ class Module(MgrModule):
|
||||
]
|
||||
MODULE_OPTIONS = [
|
||||
{'name': MirrorSnapshotScheduleHandler.MODULE_OPTION_NAME},
|
||||
{'name': MirrorSnapshotScheduleHandler.MODULE_OPTION_NAME_MAX_CONCURRENT_SNAP_CREATE, 'type': 'int', 'default': 10},
|
||||
{'name': TrashPurgeScheduleHandler.MODULE_OPTION_NAME},
|
||||
]
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user