cls_rbd, cls_rbd_client: add/remove/get children

Manipulate the new rbd_children object, which holds mappings between
parent snapshots and their children clone images

Signed-off-by: Dan Mick <dan.mick@inktank.com>
This commit is contained in:
Dan Mick 2012-08-02 17:50:06 -07:00
parent 4fd393f3ce
commit 93fed220d5
5 changed files with 332 additions and 1 deletions

View File

@ -53,6 +53,9 @@ cls_method_handle_t h_set_size;
cls_method_handle_t h_get_parent;
cls_method_handle_t h_set_parent;
cls_method_handle_t h_remove_parent;
cls_method_handle_t h_add_child;
cls_method_handle_t h_remove_child;
cls_method_handle_t h_get_children;
cls_method_handle_t h_get_snapcontext;
cls_method_handle_t h_get_object_prefix;
cls_method_handle_t h_get_snapshot_name;
@ -865,6 +868,204 @@ int remove_parent(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
return 0;
}
/**
* methods for dealing with rbd_children object
*/
static int decode_parent_common(bufferlist::iterator& it, uint64_t *poolid,
string *imageid, snapid_t *snapid)
{
try {
::decode(*poolid, it);
::decode(*imageid, it);
::decode(*snapid, it);
} catch (const buffer::error &err) {
return -EINVAL;
}
return 0;
}
static int decode_parent(bufferlist *in, uint64_t *poolid,
string *imageid, snapid_t *snapid)
{
bufferlist::iterator it = in->begin();
return decode_parent_common(it, poolid, imageid, snapid);
}
static int decode_parent_and_child(bufferlist *in, uint64_t *poolid,
string *imageid, snapid_t *snapid,
string *c_imageid)
{
bufferlist::iterator it = in->begin();
int r = decode_parent_common(it, poolid, imageid, snapid);
if (r < 0)
return r;
try {
::decode(*c_imageid, it);
} catch (const buffer::error &err) {
return -EINVAL;
}
return 0;
}
static string parent_key(uint64_t poolid, string imageid, snapid_t snapid)
{
bufferlist key_bl;
::encode(poolid, key_bl);
::encode(imageid, key_bl);
::encode(snapid, key_bl);
return string(key_bl.c_str(), key_bl.length());
}
/**
* add child to rbd_children directory object
*
* rbd_children is a map of (p_poolid, p_imageid, p_snapid) to
* [c_imageid, [c_imageid ... ]]
*
* Input:
* @param p_poolid parent pool id
* @param p_imageid parent image oid
* @param p_snapid parent snapshot id
* @param c_imageid new child image oid to add
*
* @returns 0 on success, negative error on failure
*/
int add_child(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
{
int r;
uint64_t p_poolid;
snapid_t p_snapid;
string p_imageid, c_imageid;
// Use set for ease of erase() for remove_child()
std::set<string> children;
r = decode_parent_and_child(in, &p_poolid, &p_imageid, &p_snapid, &c_imageid);
if (r < 0)
return r;
CLS_LOG(20, "add_child %s to (%d, %s, %d)", c_imageid.c_str(),
p_poolid, p_imageid.c_str(), p_snapid);
string key = parent_key(p_poolid, p_imageid, p_snapid);
// get current child list for parent, if any
r = read_key(hctx, key, &children);
if ((r < 0) && (r != -ENOENT)) {
CLS_LOG(20, "add_child: omap read failed: %d", r);
return r;
}
// add new child
children.insert(c_imageid);
// write back
bufferlist childbl;
::encode(children, childbl);
r = cls_cxx_map_set_val(hctx, key, &childbl);
if (r < 0)
CLS_LOG(20, "add_child: omap write failed: %d", r);
return r;
}
/**
* remove child from rbd_children directory object
*
* Input:
* @param p_poolid parent pool id
* @param p_imageid parent image oid
* @param p_snapid parent snapshot id
* @param c_imageid new child image oid to add
*
* @returns 0 on success, negative error on failure
*/
int remove_child(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
{
int r;
uint64_t p_poolid;
snapid_t p_snapid;
string p_imageid, c_imageid;
std::set<string> children;
r = decode_parent_and_child(in, &p_poolid, &p_imageid, &p_snapid, &c_imageid);
if (r < 0)
return r;
CLS_LOG(20, "remove_child %s from (%d, %s, %d)", c_imageid.c_str(),
p_poolid, p_imageid.c_str(), p_snapid);
string key = parent_key(p_poolid, p_imageid, p_snapid);
// get current child list for parent. Unlike add_child(), an empty list
// is an error (how can we remove something that doesn't exist?)
r = read_key(hctx, key, &children);
if (r < 0) {
CLS_LOG(20, "remove_child: read omap failed: %d", r);
return r;
}
// find and remove child
children.erase(c_imageid);
// now empty? remove entry altogether
if (children.empty()) {
r = cls_cxx_map_remove_key(hctx, key);
if (r < 0)
CLS_LOG(20, "remove_child: remove key failed: %d", r);
} else {
// write back shortened children list
bufferlist childbl;
::encode(children, childbl);
r = cls_cxx_map_set_val(hctx, key, &childbl);
if (r < 0)
CLS_LOG(20, "remove_child: write omap failed: %d ", r);
}
return r;
}
/**
* Input:
* @param p_poolid parent pool id
* @param p_imageid parent image oid
* @param p_snapid parent snapshot id
* @param c_imageid new child image oid to add
*
* Output:
* @param children set<string> of children
*
* @returns 0 on success, negative error on failure
*/
int get_children(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
{
int r;
uint64_t p_poolid;
snapid_t p_snapid;
string p_imageid;
std::set<string> children;
r = decode_parent(in, &p_poolid, &p_imageid, &p_snapid);
if (r < 0)
return r;
CLS_LOG(20, "get_children of (%d, %s, %d)",
p_poolid, p_imageid.c_str(), p_snapid);
string key = parent_key(p_poolid, p_imageid, p_snapid);
r = read_key(hctx, key, &children);
if (r < 0) {
if (r != -ENOENT)
CLS_LOG(20, "remove_child: read omap failed: %d", r);
return r;
}
::encode(children, *out);
return 0;
}
/**
* Get the information needed to create a rados snap context for doing
@ -1861,6 +2062,17 @@ void __cls_init()
CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC,
remove_parent, &h_remove_parent);
/* methods for the rbd_children object */
cls_register_cxx_method(h_class, "add_child",
CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC,
add_child, &h_add_child);
cls_register_cxx_method(h_class, "remove_child",
CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC,
remove_child, &h_remove_child);
cls_register_cxx_method(h_class, "get_children",
CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC,
get_children, &h_get_children);
/* methods for the rbd_id.$image_name objects */
cls_register_cxx_method(h_class, "get_id",
CLS_METHOD_RD | CLS_METHOD_PUBLIC,

View File

@ -45,6 +45,22 @@
#define RBD_DIRECTORY "rbd_directory"
#define RBD_INFO "rbd_info"
/*
* rbd_children object in each pool contains omap entries
* that map parent (poolid, imageid, snapid) to a list of children
* (imageids; snapids aren't required because we get all the snapshot
* info from a read of the child's header object anyway).
*
* The clone operation writes a new item to this child list, and rm or
* flatten removes an item, and may remove the whole entry if no children
* exist after the rm/flatten.
*
* When attempting to remove a parent, all pools are searched for
* rbd_children objects with entries referring to that parent; if any
* exist (and those children exist), the parent removal is prevented.
*/
#define RBD_CHILDREN "rbd_children"
#define RBD_DEFAULT_OBJ_ORDER 22 /* 4MB */
#define RBD_MAX_OBJ_NAME_SIZE 96

View File

@ -1,4 +1,4 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#include "include/buffer.h"
@ -227,6 +227,53 @@ namespace librbd {
return ioctx->exec(oid, "rbd", "remove_parent", inbl, outbl);
}
int add_child(librados::IoCtx *ioctx, const std::string &oid,
uint64_t p_poolid, const std::string &p_imageid,
snapid_t p_snapid, const std::string &c_imageid)
{
bufferlist in, out;
::encode(p_poolid, in);
::encode(p_imageid, in);
::encode(p_snapid, in);
::encode(c_imageid, in);
return ioctx->exec(oid, "rbd", "add_child", in, out);
}
int remove_child(librados::IoCtx *ioctx, const std::string &oid,
uint64_t p_poolid, const std::string &p_imageid,
snapid_t p_snapid, const std::string &c_imageid)
{
bufferlist in, out;
::encode(p_poolid, in);
::encode(p_imageid, in);
::encode(p_snapid, in);
::encode(c_imageid, in);
return ioctx->exec(oid, "rbd", "remove_child", in, out);
}
int get_children(librados::IoCtx *ioctx, const std::string &oid,
uint64_t p_poolid, const std::string &p_imageid,
snapid_t p_snapid, set<string>& children)
{
bufferlist in, out;
::encode(p_poolid, in);
::encode(p_imageid, in);
::encode(p_snapid, in);
int r = ioctx->exec(oid, "rbd", "get_children", in, out);
if (r < 0)
return r;
bufferlist::iterator it = out.begin();
try {
::decode(children, it);
} catch (const buffer::error &err) {
return -EBADMSG;
}
return 0;
}
int snapshot_add(librados::IoCtx *ioctx, const std::string &oid,
snapid_t snap_id, const std::string &snap_name)
{

View File

@ -56,6 +56,15 @@ namespace librbd {
int64_t parent_pool, const std::string& parent_image,
snapid_t parent_snap_id, uint64_t parent_overlap);
int remove_parent(librados::IoCtx *ioctx, const std::string &oid);
int add_child(librados::IoCtx *ioctx, const std::string &oid,
uint64_t p_poolid, const std::string &p_imageid,
snapid_t p_snapid, const std::string &c_imageid);
int remove_child(librados::IoCtx *ioctx, const std::string &oid,
uint64_t p_poolid, const std::string &p_imageid,
snapid_t p_snapid, const std::string &c_imageid);
int get_children(librados::IoCtx *ioctx, const std::string &oid,
uint64_t p_poolid, const std::string &p_imageid,
snapid_t p_snapid, set<string>& children);
int snapshot_add(librados::IoCtx *ioctx, const std::string &oid,
snapid_t snap_id, const std::string &snap_name);
int snapshot_remove(librados::IoCtx *ioctx, const std::string &oid,

View File

@ -26,6 +26,9 @@ using ::librbd::cls_client::set_parent;
using ::librbd::cls_client::remove_parent;
using ::librbd::cls_client::snapshot_add;
using ::librbd::cls_client::snapshot_remove;
using ::librbd::cls_client::add_child;
using ::librbd::cls_client::remove_child;
using ::librbd::cls_client::get_children;
using ::librbd::cls_client::get_snapcontext;
using ::librbd::cls_client::snapshot_list;
using ::librbd::cls_client::list_locks;
@ -138,6 +141,50 @@ TEST(cls_rbd, get_and_set_id)
ASSERT_EQ(0, destroy_one_pool_pp(pool_name, rados));
}
TEST(cls_rbd, add_remove_child)
{
librados::Rados rados;
librados::IoCtx ioctx;
string pool_name = get_temp_pool_name();
ASSERT_EQ("", create_one_pool_pp(pool_name, rados));
ASSERT_EQ(0, rados.ioctx_create(pool_name.c_str(), ioctx));
string oid = "rbd_children_test";
ASSERT_EQ(0, ioctx.create(oid, true));
string parent_image = "parent_id";
string snapname = "parent_snap";
snapid_t snapid(10);
string child_image = "child_id";
uint64_t poolid = ioctx.get_id();
set<string>children;
// nonexistent children cannot be listed or removed
ASSERT_EQ(-ENOENT, get_children(&ioctx, oid, poolid, parent_image, snapid,
children));
ASSERT_EQ(-ENOENT, remove_child(&ioctx, oid, poolid, parent_image, snapid,
child_image));
// make a parent with a snapshot
ASSERT_EQ(0, create_image(&ioctx, parent_image, 2<<20, 0,
RBD_FEATURE_LAYERING, parent_image));
ASSERT_EQ(0, snapshot_add(&ioctx, parent_image, snapid, snapname));
// add, verify it showed up
ASSERT_EQ(0, add_child(&ioctx, oid, poolid, parent_image, snapid,
child_image));
ASSERT_EQ(0, get_children(&ioctx, oid, poolid, parent_image, snapid,
children));
bool found = (children.find(child_image) != children.end());
ASSERT_EQ(found, true);
ASSERT_EQ(0, remove_child(&ioctx, oid, poolid, parent_image, snapid,
child_image));
ioctx.close();
ASSERT_EQ(0, destroy_one_pool_pp(pool_name, rados));
}
TEST(cls_rbd, directory_methods)
{
librados::Rados rados;