diff --git a/src/cls_rbd.cc b/src/cls_rbd.cc index 6fcfcd01b5b..c7f697d7825 100644 --- a/src/cls_rbd.cc +++ b/src/cls_rbd.cc @@ -53,6 +53,9 @@ cls_method_handle_t h_set_size; cls_method_handle_t h_get_parent; cls_method_handle_t h_set_parent; cls_method_handle_t h_remove_parent; +cls_method_handle_t h_add_child; +cls_method_handle_t h_remove_child; +cls_method_handle_t h_get_children; cls_method_handle_t h_get_snapcontext; cls_method_handle_t h_get_object_prefix; cls_method_handle_t h_get_snapshot_name; @@ -865,6 +868,204 @@ int remove_parent(cls_method_context_t hctx, bufferlist *in, bufferlist *out) return 0; } +/** + * methods for dealing with rbd_children object + */ + +static int decode_parent_common(bufferlist::iterator& it, uint64_t *poolid, + string *imageid, snapid_t *snapid) +{ + try { + ::decode(*poolid, it); + ::decode(*imageid, it); + ::decode(*snapid, it); + } catch (const buffer::error &err) { + return -EINVAL; + } + return 0; +} + +static int decode_parent(bufferlist *in, uint64_t *poolid, + string *imageid, snapid_t *snapid) +{ + bufferlist::iterator it = in->begin(); + return decode_parent_common(it, poolid, imageid, snapid); +} + +static int decode_parent_and_child(bufferlist *in, uint64_t *poolid, + string *imageid, snapid_t *snapid, + string *c_imageid) +{ + bufferlist::iterator it = in->begin(); + int r = decode_parent_common(it, poolid, imageid, snapid); + if (r < 0) + return r; + try { + ::decode(*c_imageid, it); + } catch (const buffer::error &err) { + return -EINVAL; + } + return 0; +} + +static string parent_key(uint64_t poolid, string imageid, snapid_t snapid) +{ + bufferlist key_bl; + ::encode(poolid, key_bl); + ::encode(imageid, key_bl); + ::encode(snapid, key_bl); + return string(key_bl.c_str(), key_bl.length()); +} + +/** + * add child to rbd_children directory object + * + * rbd_children is a map of (p_poolid, p_imageid, p_snapid) to + * [c_imageid, [c_imageid ... ]] + * + * Input: + * @param p_poolid parent pool id + * @param p_imageid parent image oid + * @param p_snapid parent snapshot id + * @param c_imageid new child image oid to add + * + * @returns 0 on success, negative error on failure + */ + +int add_child(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + int r; + + uint64_t p_poolid; + snapid_t p_snapid; + string p_imageid, c_imageid; + // Use set for ease of erase() for remove_child() + std::set children; + + r = decode_parent_and_child(in, &p_poolid, &p_imageid, &p_snapid, &c_imageid); + if (r < 0) + return r; + + CLS_LOG(20, "add_child %s to (%d, %s, %d)", c_imageid.c_str(), + p_poolid, p_imageid.c_str(), p_snapid); + + string key = parent_key(p_poolid, p_imageid, p_snapid); + + // get current child list for parent, if any + r = read_key(hctx, key, &children); + if ((r < 0) && (r != -ENOENT)) { + CLS_LOG(20, "add_child: omap read failed: %d", r); + return r; + } + + // add new child + children.insert(c_imageid); + + // write back + bufferlist childbl; + ::encode(children, childbl); + r = cls_cxx_map_set_val(hctx, key, &childbl); + if (r < 0) + CLS_LOG(20, "add_child: omap write failed: %d", r); + return r; +} + +/** + * remove child from rbd_children directory object + * + * Input: + * @param p_poolid parent pool id + * @param p_imageid parent image oid + * @param p_snapid parent snapshot id + * @param c_imageid new child image oid to add + * + * @returns 0 on success, negative error on failure + */ + +int remove_child(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + int r; + + uint64_t p_poolid; + snapid_t p_snapid; + string p_imageid, c_imageid; + std::set children; + + r = decode_parent_and_child(in, &p_poolid, &p_imageid, &p_snapid, &c_imageid); + if (r < 0) + return r; + + CLS_LOG(20, "remove_child %s from (%d, %s, %d)", c_imageid.c_str(), + p_poolid, p_imageid.c_str(), p_snapid); + + string key = parent_key(p_poolid, p_imageid, p_snapid); + + // get current child list for parent. Unlike add_child(), an empty list + // is an error (how can we remove something that doesn't exist?) + r = read_key(hctx, key, &children); + if (r < 0) { + CLS_LOG(20, "remove_child: read omap failed: %d", r); + return r; + } + + // find and remove child + children.erase(c_imageid); + + // now empty? remove entry altogether + if (children.empty()) { + r = cls_cxx_map_remove_key(hctx, key); + if (r < 0) + CLS_LOG(20, "remove_child: remove key failed: %d", r); + } else { + // write back shortened children list + bufferlist childbl; + ::encode(children, childbl); + r = cls_cxx_map_set_val(hctx, key, &childbl); + if (r < 0) + CLS_LOG(20, "remove_child: write omap failed: %d ", r); + } + return r; +} + +/** + * Input: + * @param p_poolid parent pool id + * @param p_imageid parent image oid + * @param p_snapid parent snapshot id + * @param c_imageid new child image oid to add + * + * Output: + * @param children set of children + * + * @returns 0 on success, negative error on failure + */ +int get_children(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + int r; + uint64_t p_poolid; + snapid_t p_snapid; + string p_imageid; + std::set children; + + r = decode_parent(in, &p_poolid, &p_imageid, &p_snapid); + if (r < 0) + return r; + + CLS_LOG(20, "get_children of (%d, %s, %d)", + p_poolid, p_imageid.c_str(), p_snapid); + + string key = parent_key(p_poolid, p_imageid, p_snapid); + + r = read_key(hctx, key, &children); + if (r < 0) { + if (r != -ENOENT) + CLS_LOG(20, "remove_child: read omap failed: %d", r); + return r; + } + ::encode(children, *out); + return 0; +} + /** * Get the information needed to create a rados snap context for doing @@ -1861,6 +2062,17 @@ void __cls_init() CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC, remove_parent, &h_remove_parent); + /* methods for the rbd_children object */ + cls_register_cxx_method(h_class, "add_child", + CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC, + add_child, &h_add_child); + cls_register_cxx_method(h_class, "remove_child", + CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC, + remove_child, &h_remove_child); + cls_register_cxx_method(h_class, "get_children", + CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC, + get_children, &h_get_children); + /* methods for the rbd_id.$image_name objects */ cls_register_cxx_method(h_class, "get_id", CLS_METHOD_RD | CLS_METHOD_PUBLIC, diff --git a/src/include/rbd_types.h b/src/include/rbd_types.h index 034aece9af2..6a00cdca09a 100644 --- a/src/include/rbd_types.h +++ b/src/include/rbd_types.h @@ -45,6 +45,22 @@ #define RBD_DIRECTORY "rbd_directory" #define RBD_INFO "rbd_info" +/* + * rbd_children object in each pool contains omap entries + * that map parent (poolid, imageid, snapid) to a list of children + * (imageids; snapids aren't required because we get all the snapshot + * info from a read of the child's header object anyway). + * + * The clone operation writes a new item to this child list, and rm or + * flatten removes an item, and may remove the whole entry if no children + * exist after the rm/flatten. + * + * When attempting to remove a parent, all pools are searched for + * rbd_children objects with entries referring to that parent; if any + * exist (and those children exist), the parent removal is prevented. + */ +#define RBD_CHILDREN "rbd_children" + #define RBD_DEFAULT_OBJ_ORDER 22 /* 4MB */ #define RBD_MAX_OBJ_NAME_SIZE 96 diff --git a/src/librbd/cls_rbd_client.cc b/src/librbd/cls_rbd_client.cc index 543844782b7..d0e78637803 100644 --- a/src/librbd/cls_rbd_client.cc +++ b/src/librbd/cls_rbd_client.cc @@ -1,4 +1,4 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab #include "include/buffer.h" @@ -227,6 +227,53 @@ namespace librbd { return ioctx->exec(oid, "rbd", "remove_parent", inbl, outbl); } + int add_child(librados::IoCtx *ioctx, const std::string &oid, + uint64_t p_poolid, const std::string &p_imageid, + snapid_t p_snapid, const std::string &c_imageid) + { + bufferlist in, out; + ::encode(p_poolid, in); + ::encode(p_imageid, in); + ::encode(p_snapid, in); + ::encode(c_imageid, in); + + return ioctx->exec(oid, "rbd", "add_child", in, out); + } + + int remove_child(librados::IoCtx *ioctx, const std::string &oid, + uint64_t p_poolid, const std::string &p_imageid, + snapid_t p_snapid, const std::string &c_imageid) + { + bufferlist in, out; + ::encode(p_poolid, in); + ::encode(p_imageid, in); + ::encode(p_snapid, in); + ::encode(c_imageid, in); + + return ioctx->exec(oid, "rbd", "remove_child", in, out); + } + + int get_children(librados::IoCtx *ioctx, const std::string &oid, + uint64_t p_poolid, const std::string &p_imageid, + snapid_t p_snapid, set& children) + { + bufferlist in, out; + ::encode(p_poolid, in); + ::encode(p_imageid, in); + ::encode(p_snapid, in); + + int r = ioctx->exec(oid, "rbd", "get_children", in, out); + if (r < 0) + return r; + bufferlist::iterator it = out.begin(); + try { + ::decode(children, it); + } catch (const buffer::error &err) { + return -EBADMSG; + } + return 0; + } + int snapshot_add(librados::IoCtx *ioctx, const std::string &oid, snapid_t snap_id, const std::string &snap_name) { diff --git a/src/librbd/cls_rbd_client.h b/src/librbd/cls_rbd_client.h index 48335bb9646..285ed373961 100644 --- a/src/librbd/cls_rbd_client.h +++ b/src/librbd/cls_rbd_client.h @@ -56,6 +56,15 @@ namespace librbd { int64_t parent_pool, const std::string& parent_image, snapid_t parent_snap_id, uint64_t parent_overlap); int remove_parent(librados::IoCtx *ioctx, const std::string &oid); + int add_child(librados::IoCtx *ioctx, const std::string &oid, + uint64_t p_poolid, const std::string &p_imageid, + snapid_t p_snapid, const std::string &c_imageid); + int remove_child(librados::IoCtx *ioctx, const std::string &oid, + uint64_t p_poolid, const std::string &p_imageid, + snapid_t p_snapid, const std::string &c_imageid); + int get_children(librados::IoCtx *ioctx, const std::string &oid, + uint64_t p_poolid, const std::string &p_imageid, + snapid_t p_snapid, set& children); int snapshot_add(librados::IoCtx *ioctx, const std::string &oid, snapid_t snap_id, const std::string &snap_name); int snapshot_remove(librados::IoCtx *ioctx, const std::string &oid, diff --git a/src/test/rbd/test_cls_rbd.cc b/src/test/rbd/test_cls_rbd.cc index 3b246b82ac5..b9427758e4c 100644 --- a/src/test/rbd/test_cls_rbd.cc +++ b/src/test/rbd/test_cls_rbd.cc @@ -26,6 +26,9 @@ using ::librbd::cls_client::set_parent; using ::librbd::cls_client::remove_parent; using ::librbd::cls_client::snapshot_add; using ::librbd::cls_client::snapshot_remove; +using ::librbd::cls_client::add_child; +using ::librbd::cls_client::remove_child; +using ::librbd::cls_client::get_children; using ::librbd::cls_client::get_snapcontext; using ::librbd::cls_client::snapshot_list; using ::librbd::cls_client::list_locks; @@ -138,6 +141,50 @@ TEST(cls_rbd, get_and_set_id) ASSERT_EQ(0, destroy_one_pool_pp(pool_name, rados)); } +TEST(cls_rbd, add_remove_child) +{ + librados::Rados rados; + librados::IoCtx ioctx; + string pool_name = get_temp_pool_name(); + + ASSERT_EQ("", create_one_pool_pp(pool_name, rados)); + ASSERT_EQ(0, rados.ioctx_create(pool_name.c_str(), ioctx)); + + string oid = "rbd_children_test"; + ASSERT_EQ(0, ioctx.create(oid, true)); + + string parent_image = "parent_id"; + string snapname = "parent_snap"; + snapid_t snapid(10); + string child_image = "child_id"; + uint64_t poolid = ioctx.get_id(); + setchildren; + + // nonexistent children cannot be listed or removed + ASSERT_EQ(-ENOENT, get_children(&ioctx, oid, poolid, parent_image, snapid, + children)); + ASSERT_EQ(-ENOENT, remove_child(&ioctx, oid, poolid, parent_image, snapid, + child_image)); + + // make a parent with a snapshot + ASSERT_EQ(0, create_image(&ioctx, parent_image, 2<<20, 0, + RBD_FEATURE_LAYERING, parent_image)); + ASSERT_EQ(0, snapshot_add(&ioctx, parent_image, snapid, snapname)); + + // add, verify it showed up + ASSERT_EQ(0, add_child(&ioctx, oid, poolid, parent_image, snapid, + child_image)); + ASSERT_EQ(0, get_children(&ioctx, oid, poolid, parent_image, snapid, + children)); + bool found = (children.find(child_image) != children.end()); + ASSERT_EQ(found, true); + + ASSERT_EQ(0, remove_child(&ioctx, oid, poolid, parent_image, snapid, + child_image)); + ioctx.close(); + ASSERT_EQ(0, destroy_one_pool_pp(pool_name, rados)); +} + TEST(cls_rbd, directory_methods) { librados::Rados rados;