Merge pull request #37789 from trociny/wip-47827

mgr/rbd_support: create mirror snapshots asynchronously

Reviewed-by: Jason Dillaman <dillaman@redhat.com>
This commit is contained in:
Jason Dillaman 2020-10-30 09:40:10 -04:00 committed by GitHub
commit 0cd14ccff2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 792 additions and 52 deletions

View File

@ -1259,6 +1259,9 @@ CEPH_RBD_API int rbd_aio_mirror_image_get_info(rbd_image_t image,
rbd_mirror_image_info_t *mirror_image_info,
size_t info_size,
rbd_completion_t c);
CEPH_RBD_API int rbd_aio_mirror_image_get_mode(rbd_image_t image,
rbd_mirror_image_mode_t *mode,
rbd_completion_t c);
CEPH_RBD_API int rbd_aio_mirror_image_get_global_status(
rbd_image_t image,
@ -1269,6 +1272,11 @@ CEPH_RBD_API int rbd_aio_mirror_image_get_status(
size_t status_size, rbd_completion_t c)
CEPH_RBD_DEPRECATED;
CEPH_RBD_API int rbd_aio_mirror_image_create_snapshot(rbd_image_t image,
uint32_t flags,
uint64_t *snap_id,
rbd_completion_t c);
// RBD groups support functions
CEPH_RBD_API int rbd_group_create(rados_ioctx_t p, const char *name);
CEPH_RBD_API int rbd_group_remove(rados_ioctx_t p, const char *name);

View File

@ -786,6 +786,8 @@ public:
mirror_image_status_t *mirror_image_status, size_t status_size,
RBD::AioCompletion *c)
CEPH_RBD_DEPRECATED;
int aio_mirror_image_create_snapshot(uint32_t flags, uint64_t *snap_id,
RBD::AioCompletion *c);
int update_watch(UpdateWatchCtx *ctx, uint64_t *handle);
int update_unwatch(uint64_t handle);

View File

@ -354,6 +354,43 @@ struct C_ImageGetGlobalStatus : public C_ImageGetInfo {
}
};
template <typename I>
struct C_ImageSnapshotCreate : public Context {
I *ictx;
uint64_t snap_create_flags;
uint64_t *snap_id;
Context *on_finish;
cls::rbd::MirrorImage mirror_image;
mirror::PromotionState promotion_state;
std::string primary_mirror_uuid;
C_ImageSnapshotCreate(I *ictx, uint64_t snap_create_flags, uint64_t *snap_id,
Context *on_finish)
: ictx(ictx), snap_create_flags(snap_create_flags), snap_id(snap_id),
on_finish(on_finish) {
}
void finish(int r) override {
if (r < 0 && r != -ENOENT) {
on_finish->complete(r);
return;
}
if (mirror_image.mode != cls::rbd::MIRROR_IMAGE_MODE_SNAPSHOT ||
mirror_image.state != cls::rbd::MIRROR_IMAGE_STATE_ENABLED) {
lderr(ictx->cct) << "snapshot based mirroring is not enabled" << dendl;
on_finish->complete(-EINVAL);
return;
}
auto req = mirror::snapshot::CreatePrimaryRequest<I>::create(
ictx, mirror_image.global_image_id, CEPH_NOSNAP, snap_create_flags, 0U,
snap_id, on_finish);
req->send();
}
};
} // anonymous namespace
template <typename I>
@ -1978,6 +2015,15 @@ int Mirror<I>::image_info_list(
template <typename I>
int Mirror<I>::image_snapshot_create(I *ictx, uint32_t flags,
uint64_t *snap_id) {
C_SaferCond ctx;
Mirror<I>::image_snapshot_create(ictx, flags, snap_id, &ctx);
return ctx.wait();
}
template <typename I>
void Mirror<I>::image_snapshot_create(I *ictx, uint32_t flags,
uint64_t *snap_id, Context *on_finish) {
CephContext *cct = ictx->cct;
ldout(cct, 20) << "ictx=" << ictx << dendl;
@ -1985,36 +2031,32 @@ int Mirror<I>::image_snapshot_create(I *ictx, uint32_t flags,
int r = util::snap_create_flags_api_to_internal(cct, flags,
&snap_create_flags);
if (r < 0) {
return r;
on_finish->complete(r);
return;
}
r = ictx->state->refresh_if_required();
if (r < 0) {
return r;
}
auto on_refresh = new LambdaContext(
[ictx, snap_create_flags, snap_id, on_finish](int r) {
if (r < 0) {
lderr(ictx->cct) << "refresh failed: " << cpp_strerror(r) << dendl;
on_finish->complete(r);
return;
}
cls::rbd::MirrorImage mirror_image;
r = cls_client::mirror_image_get(&ictx->md_ctx, ictx->id,
&mirror_image);
if (r == -ENOENT) {
return -EINVAL;
} else if (r < 0) {
lderr(cct) << "failed to retrieve mirror image" << dendl;
return r;
}
auto ctx = new C_ImageSnapshotCreate<I>(ictx, snap_create_flags, snap_id,
on_finish);
auto req = mirror::GetInfoRequest<I>::create(*ictx, &ctx->mirror_image,
&ctx->promotion_state,
&ctx->primary_mirror_uuid,
ctx);
req->send();
});
if (mirror_image.mode != cls::rbd::MIRROR_IMAGE_MODE_SNAPSHOT ||
mirror_image.state != cls::rbd::MIRROR_IMAGE_STATE_ENABLED) {
lderr(cct) << "snapshot based mirroring is not enabled" << dendl;
return -EINVAL;
if (ictx->state->is_refresh_required()) {
ictx->state->refresh(on_refresh);
} else {
on_refresh->complete(0);
}
C_SaferCond on_finish;
auto req = mirror::snapshot::CreatePrimaryRequest<I>::create(
ictx, mirror_image.global_image_id, CEPH_NOSNAP, snap_create_flags, 0U,
snap_id, &on_finish);
req->send();
return on_finish.wait();
}
} // namespace api

View File

@ -114,6 +114,8 @@ struct Mirror {
static int image_snapshot_create(ImageCtxT *ictx, uint32_t flags,
uint64_t *snap_id);
static void image_snapshot_create(ImageCtxT *ictx, uint32_t flags,
uint64_t *snap_id, Context *on_finish);
};
} // namespace api

View File

@ -3029,6 +3029,17 @@ namespace librbd {
#pragma GCC diagnostic pop
int Image::aio_mirror_image_create_snapshot(uint32_t flags, uint64_t *snap_id,
RBD::AioCompletion *c) {
ImageCtx *ictx = (ImageCtx *)ctx;
librbd::api::Mirror<>::image_snapshot_create(
ictx, flags, snap_id, new C_AioCompletion(ictx,
librbd::io::AIO_TYPE_GENERIC,
get_aio_completion(c)));
return 0;
}
int Image::update_watch(UpdateWatchCtx *wctx, uint64_t *handle) {
ImageCtx *ictx = (ImageCtx *)ctx;
tracepoint(librbd, update_watch_enter, ictx, wctx);
@ -6610,6 +6621,20 @@ extern "C" int rbd_aio_mirror_image_get_status(
#pragma GCC diagnostic pop
extern "C" int rbd_aio_mirror_image_create_snapshot(rbd_image_t image,
uint32_t flags,
uint64_t *snap_id,
rbd_completion_t c) {
librbd::ImageCtx *ictx = (librbd::ImageCtx *)image;
librbd::RBD::AioCompletion *comp = (librbd::RBD::AioCompletion *)c;
librbd::api::Mirror<>::image_snapshot_create(
ictx, flags, snap_id, new C_AioCompletion(ictx,
librbd::io::AIO_TYPE_GENERIC,
get_aio_completion(comp)));
return 0;
}
extern "C" int rbd_update_watch(rbd_image_t image, uint64_t *handle,
rbd_update_callback_t watch_cb, void *arg)
{

View File

@ -144,8 +144,269 @@ class Watchers:
self.unregister(pool_id, namespace)
class CreateSnapshotRequests:
lock = Lock()
condition = Condition(lock)
def __init__(self, handler):
self.handler = handler
self.rados = handler.module.rados
self.log = handler.log
self.pending = set()
self.queue = []
self.ioctxs = {}
def __del__(self):
self.wait_for_pending()
def wait_for_pending(self):
with self.lock:
while self.pending:
self.condition.wait()
def add(self, pool_id, namespace, image_id):
image_spec = (pool_id, namespace, image_id)
self.log.debug("CreateSnapshotRequests.add: {}/{}/{}".format(
pool_id, namespace, image_id))
max_concurrent = self.handler.module.get_localized_module_option(
self.handler.MODULE_OPTION_NAME_MAX_CONCURRENT_SNAP_CREATE)
with self.lock:
if image_spec in self.pending:
self.log.info(
"CreateSnapshotRequests.add: {}/{}/{}: {}".format(
pool_id, namespace, image_id,
"previous request is still in progress"))
return
self.pending.add(image_spec)
if len(self.pending) > max_concurrent:
self.queue.append(image_spec)
return
self.open_image(image_spec)
def open_image(self, image_spec):
pool_id, namespace, image_id = image_spec
self.log.debug("CreateSnapshotRequests.open_image: {}/{}/{}".format(
pool_id, namespace, image_id))
try:
ioctx = self.get_ioctx(image_spec)
def cb(comp, image):
self.handle_open_image(image_spec, comp, image)
rbd.RBD().aio_open_image(cb, ioctx, image_id=image_id)
except Exception as e:
self.log.error(
"exception when opening {}/{}/{}: {}".format(
pool_id, namespace, image_id, e))
self.finish(image_spec)
def handle_open_image(self, image_spec, comp, image):
pool_id, namespace, image_id = image_spec
self.log.debug(
"CreateSnapshotRequests.handle_open_image {}/{}/{}: r={}".format(
pool_id, namespace, image_id, comp.get_return_value()))
if comp.get_return_value() < 0:
self.log.error(
"error when opening {}/{}/{}: {}".format(
pool_id, namespace, image_id, comp.get_return_value()))
self.finish(image_spec)
return
self.get_mirror_mode(image_spec, image)
def get_mirror_mode(self, image_spec, image):
pool_id, namespace, image_id = image_spec
self.log.debug("CreateSnapshotRequests.get_mirror_mode: {}/{}/{}".format(
pool_id, namespace, image_id))
def cb(comp, mode):
self.handle_get_mirror_mode(image_spec, image, comp, mode)
try:
image.aio_mirror_image_get_mode(cb)
except Exception as e:
self.log.error(
"exception when getting mirror mode for {}/{}/{}: {}".format(
pool_id, namespace, image_id, e))
self.close_image(image_spec, image)
def handle_get_mirror_mode(self, image_spec, image, comp, mode):
pool_id, namespace, image_id = image_spec
self.log.debug(
"CreateSnapshotRequests.handle_get_mirror_mode {}/{}/{}: r={} mode={}".format(
pool_id, namespace, image_id, comp.get_return_value(), mode))
if comp.get_return_value() < 0:
self.log.error(
"error when getting mirror mode for {}/{}/{}: {}".format(
pool_id, namespace, image_id, comp.get_return_value()))
self.close_image(image_spec, image)
return
if mode != rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT:
self.log.debug(
"CreateSnapshotRequests.handle_get_mirror_mode: {}/{}/{}: {}".format(
pool_id, namespace, image_id,
"snapshot mirroring is not enabled"))
self.close_image(image_spec)
self.get_mirror_info(image_spec, image)
def get_mirror_info(self, image_spec, image):
pool_id, namespace, image_id = image_spec
self.log.debug("CreateSnapshotRequests.get_mirror_info: {}/{}/{}".format(
pool_id, namespace, image_id))
def cb(comp, info):
self.handle_get_mirror_info(image_spec, image, comp, info)
try:
image.aio_mirror_image_get_info(cb)
except Exception as e:
self.log.error(
"exception when getting mirror info for {}/{}/{}: {}".format(
pool_id, namespace, image_id, e))
self.close_image(image_spec, image)
def handle_get_mirror_info(self, image_spec, image, comp, info):
pool_id, namespace, image_id = image_spec
self.log.debug(
"CreateSnapshotRequests.handle_get_mirror_info {}/{}/{}: r={} info={}".format(
pool_id, namespace, image_id, comp.get_return_value(), info))
if comp.get_return_value() < 0:
self.log.error(
"error when getting mirror info for {}/{}/{}: {}".format(
pool_id, namespace, image_id, comp.get_return_value()))
self.close_image(image_spec, image)
return
self.create_snapshot(image_spec, image)
def create_snapshot(self, image_spec, image):
pool_id, namespace, image_id = image_spec
self.log.debug(
"CreateSnapshotRequests.create_snapshot for {}/{}/{}".format(
pool_id, namespace, image_id))
def cb(comp, snap_id):
self.handle_create_snapshot(image_spec, image, comp, snap_id)
try:
image.aio_mirror_image_create_snapshot(0, cb)
except Exception as e:
self.log.error(
"exception when creating snapshot for {}/{}/{}: {}".format(
pool_id, namespace, image_id, e))
self.close_image(image_spec, image)
def handle_create_snapshot(self, image_spec, image, comp, snap_id):
pool_id, namespace, image_id = image_spec
self.log.debug(
"CreateSnapshotRequests.handle_create_snapshot for {}/{}/{}: r={}, snap_id={}".format(
pool_id, namespace, image_id, comp.get_return_value(), snap_id))
if comp.get_return_value() < 0:
self.log.error(
"error when creating snapshot for {}/{}/{}: {}".format(
pool_id, namespace, image_id, comp.get_return_value()))
self.close_image(image_spec, image)
def close_image(self, image_spec, image):
pool_id, namespace, image_id = image_spec
self.log.debug(
"CreateSnapshotRequests.close_image {}/{}/{}".format(
pool_id, namespace, image_id))
def cb(comp):
self.handle_close_image(image_spec, comp)
try:
image.aio_close(cb)
except Exception as e:
self.log.error(
"exception when closing {}/{}/{}: {}".format(
pool_id, namespace, image_id, e))
self.finish(image_spec)
def handle_close_image(self, image_spec, comp):
pool_id, namespace, image_id = image_spec
self.log.debug(
"CreateSnapshotRequests.handle_close_image {}/{}/{}: r={}".format(
pool_id, namespace, image_id, comp.get_return_value()))
if comp.get_return_value() < 0:
self.log.error(
"error when closing {}/{}/{}: {}".format(
pool_id, namespace, image_id, comp.get_return_value()))
self.finish(image_spec)
def finish(self, image_spec):
pool_id, namespace, image_id = image_spec
self.log.debug("CreateSnapshotRequests.finish: {}/{}/{}".format(
pool_id, namespace, image_id))
self.put_ioctx(image_spec)
with self.lock:
self.pending.remove(image_spec)
if not self.queue:
return
image_spec = self.queue.pop(0)
self.open_image(image_spec)
def get_ioctx(self, image_spec):
pool_id, namespace, image_id = image_spec
nspec = (pool_id, namespace)
with self.lock:
ioctx, images = self.ioctxs.get(nspec, (None, None))
if not ioctx:
ioctx = self.rados.open_ioctx2(int(pool_id))
ioctx.set_namespace(namespace)
images = set()
self.ioctxs[nspec] = (ioctx, images)
images.add(image_spec)
return ioctx
def put_ioctx(self, image_spec):
pool_id, namespace, image_id = image_spec
nspec = (pool_id, namespace)
with self.lock:
ioctx, images = self.ioctxs[nspec]
images.remove(image_spec)
if not images:
del self.ioctxs[nspec]
class MirrorSnapshotScheduleHandler:
MODULE_OPTION_NAME = "mirror_snapshot_schedule"
MODULE_OPTION_NAME_MAX_CONCURRENT_SNAP_CREATE = "max_concurrent_snap_create"
SCHEDULE_OID = "rbd_mirror_snapshot_schedule"
lock = Lock()
@ -156,6 +417,7 @@ class MirrorSnapshotScheduleHandler:
self.module = module
self.log = module.log
self.last_refresh_images = datetime(1970, 1, 1)
self.create_snapshot_requests = CreateSnapshotRequests(self)
self.init_schedule_queue()
@ -164,6 +426,7 @@ class MirrorSnapshotScheduleHandler:
def _cleanup(self):
self.watchers.unregister_all()
self.create_snapshot_requests.wait_for_pending()
def run(self):
try:
@ -176,7 +439,7 @@ class MirrorSnapshotScheduleHandler:
self.condition.wait(min(wait_time, 60))
continue
pool_id, namespace, image_id = image_spec
self.create_snapshot(pool_id, namespace, image_id)
self.create_snapshot_requests.add(pool_id, namespace, image_id)
with self.lock:
self.enqueue(datetime.now(), pool_id, namespace, image_id)
@ -184,29 +447,6 @@ class MirrorSnapshotScheduleHandler:
self.log.fatal("Fatal runtime error: {}\n{}".format(
ex, traceback.format_exc()))
def create_snapshot(self, pool_id, namespace, image_id):
try:
with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
ioctx.set_namespace(namespace)
with rbd.Image(ioctx, image_id=image_id) as image:
mode = image.mirror_image_get_mode()
if mode != rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT:
return
info = image.mirror_image_get_info()
if info['state'] != rbd.RBD_MIRROR_IMAGE_ENABLED or \
not info['primary']:
return
snap_id = image.mirror_image_create_snapshot()
self.log.debug(
"create_snapshot: {}/{}/{}: snap_id={}".format(
ioctx.get_pool_name(), namespace, image.get_name(),
snap_id))
except Exception as e:
self.log.error(
"exception when creating snapshot for {}/{}/{}: {}".format(
pool_id, namespace, image_id, e))
def init_schedule_queue(self):
self.queue = {}
self.images = {}

View File

@ -145,6 +145,7 @@ class Module(MgrModule):
]
MODULE_OPTIONS = [
{'name': MirrorSnapshotScheduleHandler.MODULE_OPTION_NAME},
{'name': MirrorSnapshotScheduleHandler.MODULE_OPTION_NAME_MAX_CONCURRENT_SNAP_CREATE, 'type': 'int', 'default': 10},
{'name': TrashPurgeScheduleHandler.MODULE_OPTION_NAME},
]

View File

@ -472,9 +472,20 @@ cdef extern from "rbd/librbd.h" nogil:
rbd_image_t *image, const char *snap_name)
int rbd_open_by_id_read_only(rados_ioctx_t io, const char *image_id,
rbd_image_t *image, const char *snap_name)
int rbd_aio_open(rados_ioctx_t io, const char *name, rbd_image_t *image,
const char *snap_name, rbd_completion_t c)
int rbd_aio_open_by_id(rados_ioctx_t io, const char *id, rbd_image_t *image,
const char *snap_name, rbd_completion_t c)
int rbd_aio_open_read_only(rados_ioctx_t io, const char *name,
rbd_image_t *image, const char *snap_name,
rbd_completion_t c)
int rbd_aio_open_by_id_read_only(rados_ioctx_t io, const char *id,
rbd_image_t *image, const char *snap_name,
rbd_completion_t c)
int rbd_features_to_string(uint64_t features, char *str_features, size_t *size)
int rbd_features_from_string(const char *str_features, uint64_t *features)
int rbd_close(rbd_image_t image)
int rbd_aio_close(rbd_image_t image, rbd_completion_t c)
int rbd_resize2(rbd_image_t image, uint64_t size, bint allow_shrink,
librbd_progress_fn_t cb, void *cbdata)
int rbd_stat(rbd_image_t image, rbd_image_info_t *info, size_t infosize)
@ -613,13 +624,22 @@ cdef extern from "rbd/librbd.h" nogil:
int rbd_mirror_image_resync(rbd_image_t image)
int rbd_mirror_image_create_snapshot2(rbd_image_t image, uint32_t flags,
uint64_t *snap_id)
int rbd_aio_mirror_image_create_snapshot(rbd_image_t image, uint32_t flags,
uint64_t *snap_id,
rbd_completion_t c)
int rbd_mirror_image_get_info(rbd_image_t image,
rbd_mirror_image_info_t *mirror_image_info,
size_t info_size)
void rbd_mirror_image_get_info_cleanup(
rbd_mirror_image_info_t *mirror_image_info)
int rbd_aio_mirror_image_get_info(
rbd_image_t image, rbd_mirror_image_info_t *mirror_image_info,
size_t info_size, rbd_completion_t c)
int rbd_mirror_image_get_mode(rbd_image_t image,
rbd_mirror_image_mode_t *mode)
int rbd_aio_mirror_image_get_mode(rbd_image_t image,
rbd_mirror_image_mode_t *mode,
rbd_completion_t c)
int rbd_mirror_image_get_global_status(
rbd_image_t image,
rbd_mirror_image_global_status_t *mirror_image_global_status,
@ -2689,6 +2709,45 @@ class RBD(object):
raise make_ex(ret, 'error getting features bitmask from str')
return features
def aio_open_image(self, oncomplete, ioctx, name=None, snapshot=None,
read_only=False, image_id=None):
"""
Asynchronously open the image at the given snapshot.
Specify either name or id, otherwise :class:`InvalidArgument` is raised.
oncomplete will be called with the created Image object as
well as the completion:
oncomplete(completion, image)
If a snapshot is specified, the image will be read-only, unless
:func:`Image.set_snap` is called later.
If read-only mode is used, metadata for the :class:`Image`
object (such as which snapshots exist) may become obsolete. See
the C api for more details.
To clean up from opening the image, :func:`Image.close` or
:func:`Image.aio_close` should be called.
:param oncomplete: what to do when open is complete
:type oncomplete: completion
:param ioctx: determines which RADOS pool the image is in
:type ioctx: :class:`rados.Ioctx`
:param name: the name of the image
:type name: str
:param snapshot: which snapshot to read from
:type snaphshot: str
:param read_only: whether to open the image in read-only mode
:type read_only: bool
:param image_id: the id of the image
:type image_id: str
:returns: :class:`Completion` - the completion object
"""
image = Image(ioctx, name, snapshot, read_only, image_id, oncomplete)
comp, image._open_completion = image._open_completion, None
return comp
cdef class MirrorPeerIterator(object):
"""
@ -3331,9 +3390,10 @@ cdef class Image(object):
cdef object name
cdef object ioctx
cdef rados_ioctx_t _ioctx
cdef Completion _open_completion
def __init__(self, ioctx, name=None, snapshot=None,
read_only=False, image_id=None):
read_only=False, image_id=None, _oncomplete=None):
"""
Open the image at the given snapshot.
Specify either name or id, otherwise :class:`InvalidArgument` is raised.
@ -3379,6 +3439,51 @@ cdef class Image(object):
char *_name = opt_str(name)
char *_image_id = opt_str(image_id)
char *_snapshot = opt_str(snapshot)
cdef Completion completion
if _oncomplete:
def oncomplete(completion_v):
cdef Completion _completion_v = completion_v
return_value = _completion_v.get_return_value()
if return_value == 0:
self.closed = False
if name is None:
self.name = self.get_name()
return _oncomplete(_completion_v, self)
completion = self.__get_completion(oncomplete)
try:
completion.__persist()
if read_only:
with nogil:
if name is not None:
ret = rbd_aio_open_read_only(
_ioctx, _name, &self.image, _snapshot,
completion.rbd_comp)
else:
ret = rbd_aio_open_by_id_read_only(
_ioctx, _image_id, &self.image, _snapshot,
completion.rbd_comp)
else:
with nogil:
if name is not None:
ret = rbd_aio_open(
_ioctx, _name, &self.image, _snapshot,
completion.rbd_comp)
else:
ret = rbd_aio_open_by_id(
_ioctx, _image_id, &self.image, _snapshot,
completion.rbd_comp)
if ret != 0:
raise make_ex(ret, 'error opening image %s at snapshot %s' %
(self.name, snapshot))
except:
completion.__unpersist()
raise
self._open_completion = completion
return
if read_only:
with nogil:
if name is not None:
@ -3455,6 +3560,31 @@ cdef class Image(object):
raise make_ex(ret, 'error while closing image %s' % (
self.name,))
@requires_not_closed
def aio_close(self, oncomplete):
"""
Asynchronously close the image.
After this is called, this object should not be used.
:param oncomplete: what to do when close is complete
:type oncomplete: completion
:returns: :class:`Completion` - the completion object
"""
cdef Completion completion = self.__get_completion(oncomplete)
self.closed = True
try:
completion.__persist()
with nogil:
ret = rbd_aio_close(self.image, completion.rbd_comp)
if ret < 0:
raise make_ex(ret, 'error while closing image %s' %
self.name)
except:
completion.__unpersist()
raise
return completion
def __dealloc__(self):
self.close()
@ -4871,8 +5001,8 @@ written." % (self.name, ret, length))
"""
Create mirror snapshot.
:param force: ignore mirror snapshot limit
:type force: bool
:param flags: create snapshot flags
:type flags: int
:returns: int - the snapshot Id
"""
cdef:
@ -4886,6 +5016,54 @@ written." % (self.name, ret, length))
self.name)
return snap_id
@requires_not_closed
def aio_mirror_image_create_snapshot(self, flags, oncomplete):
"""
Asynchronously create mirror snapshot.
Raises :class:`InvalidArgument` if the image is not in mirror
snapshot mode.
oncomplete will be called with the created snap ID as
well as the completion:
oncomplete(completion, snap_id)
:param flags: create snapshot flags
:type flags: int
:param oncomplete: what to do when the read is complete
:type oncomplete: completion
:returns: :class:`Completion` - the completion object
:raises: :class:`InvalidArgument`
"""
cdef:
uint32_t _flags = flags
Completion completion
def oncomplete_(completion_v):
cdef Completion _completion_v = completion_v
return_value = _completion_v.get_return_value()
snap_id = <object>(<uint64_t *>_completion_v.buf)[0] \
if return_value >= 0 else None
return oncomplete(_completion_v, snap_id)
completion = self.__get_completion(oncomplete_)
completion.buf = PyBytes_FromStringAndSize(NULL, sizeof(uint64_t))
try:
completion.__persist()
with nogil:
ret = rbd_aio_mirror_image_create_snapshot(self.image, _flags,
<uint64_t *>completion.buf,
completion.rbd_comp)
if ret < 0:
raise make_ex(ret, 'error creating mirror snapshot for image %s' %
self.name)
except:
completion.__unpersist()
raise
return completion
@requires_not_closed
def mirror_image_get_info(self):
"""
@ -4912,6 +5090,53 @@ written." % (self.name, ret, length))
rbd_mirror_image_get_info_cleanup(&c_info)
return info
@requires_not_closed
def aio_mirror_image_get_info(self, oncomplete):
"""
Asynchronously get mirror info for the image.
oncomplete will be called with the returned info as
well as the completion:
oncomplete(completion, info)
:param oncomplete: what to do when get info is complete
:type oncomplete: completion
:returns: :class:`Completion` - the completion object
"""
cdef:
Completion completion
def oncomplete_(completion_v):
cdef:
Completion _completion_v = completion_v
rbd_mirror_image_info_t *c_info = <rbd_mirror_image_info_t *>_completion_v.buf
info = {
'global_id' : decode_cstr(c_info[0].global_id),
'state' : int(c_info[0].state),
'primary' : c_info[0].primary,
}
rbd_mirror_image_get_info_cleanup(c_info)
return oncomplete(_completion_v, info)
completion = self.__get_completion(oncomplete_)
completion.buf = PyBytes_FromStringAndSize(
NULL, sizeof(rbd_mirror_image_info_t))
try:
completion.__persist()
with nogil:
ret = rbd_aio_mirror_image_get_info(
self.image, <rbd_mirror_image_info_t *>completion.buf,
sizeof(rbd_mirror_image_info_t), completion.rbd_comp)
if ret != 0:
raise make_ex(
ret, 'error getting mirror info for image %s' % self.name)
except:
completion.__unpersist()
raise
return completion
@requires_not_closed
def mirror_image_get_mode(self):
"""
@ -4926,6 +5151,48 @@ written." % (self.name, ret, length))
raise make_ex(ret, 'error getting mirror mode for image %s' % self.name)
return int(c_mode)
@requires_not_closed
def aio_mirror_image_get_mode(self, oncomplete):
"""
Asynchronously get mirror mode for the image.
oncomplete will be called with the returned mode as
well as the completion:
oncomplete(completion, mode)
:param oncomplete: what to do when get info is complete
:type oncomplete: completion
:returns: :class:`Completion` - the completion object
"""
cdef:
Completion completion
def oncomplete_(completion_v):
cdef Completion _completion_v = completion_v
return_value = _completion_v.get_return_value()
mode = int((<rbd_mirror_image_mode_t *>_completion_v.buf)[0]) \
if return_value >= 0 else None
return oncomplete(_completion_v, mode)
completion = self.__get_completion(oncomplete_)
completion.buf = PyBytes_FromStringAndSize(
NULL, sizeof(rbd_mirror_image_mode_t))
try:
completion.__persist()
with nogil:
ret = rbd_aio_mirror_image_get_mode(
self.image, <rbd_mirror_image_mode_t *>completion.buf,
completion.rbd_comp)
if ret != 0:
raise make_ex(
ret, 'error getting mirror mode for image %s' % self.name)
except:
completion.__unpersist()
raise
return completion
@requires_not_closed
def mirror_image_get_status(self):
"""

View File

@ -1470,3 +1470,66 @@ TEST_F(TestMirroring, SnapshotPromoteDemote)
ASSERT_EQ(0, m_rbd.mirror_peer_site_remove(m_ioctx, peer_uuid));
ASSERT_EQ(0, m_rbd.mirror_mode_set(m_ioctx, RBD_MIRROR_MODE_DISABLED));
}
TEST_F(TestMirroring, AioSnapshotCreate)
{
REQUIRE_FORMAT_V2();
std::list<std::string> image_names;
for (size_t idx = 0; idx < 10; ++idx) {
image_names.push_back(get_temp_image_name());
}
ASSERT_EQ(0, m_rbd.mirror_mode_set(m_ioctx, RBD_MIRROR_MODE_IMAGE));
std::string peer_uuid;
ASSERT_EQ(0, m_rbd.mirror_peer_site_add(m_ioctx, &peer_uuid,
RBD_MIRROR_PEER_DIRECTION_RX_TX,
"cluster", "client"));
// create mirror images
uint64_t features;
ASSERT_TRUE(get_features(&features));
int order = 20;
std::list<librbd::Image> images;
for (auto &image_name : image_names) {
ASSERT_EQ(0, m_rbd.create2(m_ioctx, image_name.c_str(), 2048, features,
&order));
images.emplace_back();
ASSERT_EQ(0, m_rbd.open(m_ioctx, images.back(), image_name.c_str()));
ASSERT_EQ(0, images.back().mirror_image_enable2(
RBD_MIRROR_IMAGE_MODE_SNAPSHOT));
}
// create snapshots
std::list<uint64_t> snap_ids;
std::list<librbd::RBD::AioCompletion *> aio_comps;
for (auto &image : images) {
snap_ids.emplace_back();
aio_comps.push_back(new librbd::RBD::AioCompletion(nullptr, nullptr));
ASSERT_EQ(0, image.aio_mirror_image_create_snapshot(0, &snap_ids.back(),
aio_comps.back()));
}
for (auto aio_comp : aio_comps) {
ASSERT_EQ(0, aio_comp->wait_for_complete());
ASSERT_EQ(1, aio_comp->is_complete());
ASSERT_EQ(0, aio_comp->get_return_value());
aio_comp->release();
}
aio_comps.clear();
// verify
for (auto &image : images) {
vector<librbd::snap_info_t> snaps;
ASSERT_EQ(0, image.snap_list(snaps));
ASSERT_EQ(2U, snaps.size());
ASSERT_EQ(snaps[1].id, snap_ids.front());
std::string image_name;
ASSERT_EQ(0, image.get_name(&image_name));
ASSERT_EQ(0, image.close());
ASSERT_EQ(0, m_rbd.remove(m_ioctx, image_name.c_str()));
snap_ids.pop_front();
}
ASSERT_EQ(0, m_rbd.mirror_peer_site_remove(m_ioctx, peer_uuid));
ASSERT_EQ(0, m_rbd.mirror_mode_set(m_ioctx, RBD_MIRROR_MODE_DISABLED));
}

View File

@ -325,6 +325,40 @@ def test_open_by_id():
eq(image.get_name(), image_name)
RBD().remove(ioctx, image_name)
def test_aio_open():
with Rados(conffile='') as cluster:
with cluster.open_ioctx(pool_name) as ioctx:
image_name = get_temp_image_name()
order = 20
RBD().create(ioctx, image_name, IMG_SIZE, order)
# this is a list so that the open_cb() can modify it
image = [None]
def open_cb(_, image_):
image[0] = image_
comp = RBD().aio_open_image(open_cb, ioctx, image_name)
comp.wait_for_complete_and_cb()
eq(comp.get_return_value(), 0)
eq(sys.getrefcount(comp), 2)
assert_not_equal(image[0], None)
image = image[0]
eq(image.get_name(), image_name)
check_stat(image.stat(), IMG_SIZE, order)
closed = [False]
def close_cb(_):
closed[0] = True
comp = image.aio_close(close_cb)
comp.wait_for_complete_and_cb()
eq(comp.get_return_value(), 0)
eq(sys.getrefcount(comp), 2)
eq(closed[0], True)
RBD().remove(ioctx, image_name)
def test_remove_dne():
assert_raises(ImageNotFound, remove_image)
@ -2206,6 +2240,62 @@ class TestMirroring(object):
self.rbd.mirror_peer_remove(ioctx, peer2_uuid)
self.image.mirror_image_promote(False)
def test_aio_mirror_image_create_snapshot(self):
peer_uuid = self.rbd.mirror_peer_add(ioctx, "cluster", "client")
self.rbd.mirror_mode_set(ioctx, RBD_MIRROR_MODE_IMAGE)
self.image.mirror_image_disable(False)
self.image.mirror_image_enable(RBD_MIRROR_IMAGE_MODE_SNAPSHOT)
snaps = list(self.image.list_snaps())
eq(1, len(snaps))
snap = snaps[0]
eq(snap['namespace'], RBD_SNAP_NAMESPACE_TYPE_MIRROR)
eq(RBD_SNAP_MIRROR_STATE_PRIMARY, snap['mirror']['state'])
# this is a list so that the local cb() can modify it
info = [None]
def cb(_, _info):
info[0] = _info
comp = self.image.aio_mirror_image_get_info(cb)
comp.wait_for_complete_and_cb()
assert_not_equal(info[0], None)
eq(comp.get_return_value(), 0)
eq(sys.getrefcount(comp), 2)
info = info[0]
global_id = info['global_id']
self.check_info(info, global_id, RBD_MIRROR_IMAGE_ENABLED, True)
mode = [None]
def cb(_, _mode):
mode[0] = _mode
comp = self.image.aio_mirror_image_get_mode(cb)
comp.wait_for_complete_and_cb()
eq(comp.get_return_value(), 0)
eq(sys.getrefcount(comp), 2)
eq(mode[0], RBD_MIRROR_IMAGE_MODE_SNAPSHOT)
snap_id = [None]
def cb(_, _snap_id):
snap_id[0] = _snap_id
comp = self.image.aio_mirror_image_create_snapshot(0, cb)
comp.wait_for_complete_and_cb()
assert_not_equal(snap_id[0], None)
eq(comp.get_return_value(), 0)
eq(sys.getrefcount(comp), 2)
snaps = list(self.image.list_snaps())
eq(2, len(snaps))
snap = snaps[1]
eq(snap['id'], snap_id[0])
eq(snap['namespace'], RBD_SNAP_NAMESPACE_TYPE_MIRROR)
eq(RBD_SNAP_MIRROR_STATE_PRIMARY, snap['mirror']['state'])
eq([peer_uuid], snap['mirror']['mirror_peer_uuids'])
self.rbd.mirror_peer_remove(ioctx, peer_uuid)
class TestTrash(object):
def setUp(self):