cls_rbd: Add cg_create, cg_dir_add, cg_dir_remove, cg_dir_list

Signed-off-by: Victor Denisov <denisovenator@gmail.com>
This commit is contained in:
Victor Denisov 2016-05-26 20:59:56 -07:00
parent 2380351e16
commit e0a6b62f41
5 changed files with 392 additions and 1 deletions

View File

@ -132,6 +132,10 @@ cls_method_handle_t h_mirror_image_status_get;
cls_method_handle_t h_mirror_image_status_list;
cls_method_handle_t h_mirror_image_status_get_summary;
cls_method_handle_t h_mirror_image_status_remove_down;
cls_method_handle_t h_group_create;
cls_method_handle_t h_group_dir_list;
cls_method_handle_t h_group_dir_add;
cls_method_handle_t h_group_dir_remove;
#define RBD_MAX_KEYS_READ 64
#define RBD_SNAP_KEY_PREFIX "snapshot_"
@ -139,6 +143,8 @@ cls_method_handle_t h_mirror_image_status_remove_down;
#define RBD_DIR_NAME_KEY_PREFIX "name_"
#define RBD_METADATA_KEY_PREFIX "metadata_"
#define GROUP_SNAP_SEQ "snap_seq"
static int snap_read_header(cls_method_context_t hctx, bufferlist& bl)
{
unsigned snap_count = 0;
@ -4135,6 +4141,212 @@ int mirror_image_status_remove_down(cls_method_context_t hctx, bufferlist *in,
return 0;
}
/**
* Initialize the header with basic metadata.
* Everything is stored as key/value pairs as omaps in the header object.
*
* Input:
* none
*
* Output:
* @return 0 on success, negative error code on failure
*/
int group_create(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
{
bufferlist snap_seqbl;
uint64_t snap_seq = 0;
::encode(snap_seq, snap_seqbl);
int r = cls_cxx_map_set_val(hctx, GROUP_SNAP_SEQ, &snap_seqbl);
if (r < 0)
return r;
return 0;
}
/**
* List consistency groups from the directory.
*
* Input:
* @param start_after (std::string)
* @param max_return (int64_t)
*
* Output:
* @param map of consistency groups (name, id)
* @return 0 on success, negative error code on failure
*/
int group_dir_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
{
string start_after;
uint64_t max_return;
try {
bufferlist::iterator iter = in->begin();
::decode(start_after, iter);
::decode(max_return, iter);
} catch (const buffer::error &err) {
return -EINVAL;
}
int max_read = RBD_MAX_KEYS_READ;
int r = max_read;
map<string, string> groups;
string last_read = dir_key_for_name(start_after);
while (r == max_read && groups.size() < max_return) {
map<string, bufferlist> vals;
CLS_LOG(20, "last_read = '%s'", last_read.c_str());
r = cls_cxx_map_get_vals(hctx, last_read, RBD_DIR_NAME_KEY_PREFIX,
max_read, &vals);
if (r < 0) {
CLS_ERR("error reading directory by name: %s", cpp_strerror(r).c_str());
return r;
}
for (pair<string, bufferlist> val: vals) {
string id;
bufferlist::iterator iter = val.second.begin();
try {
::decode(id, iter);
} catch (const buffer::error &err) {
CLS_ERR("could not decode id of consistency group '%s'", val.first.c_str());
return -EIO;
}
CLS_LOG(20, "adding '%s' -> '%s'", dir_name_from_key(val.first).c_str(), id.c_str());
groups[dir_name_from_key(val.first)] = id;
if (groups.size() >= max_return)
break;
}
if (!vals.empty()) {
last_read = dir_key_for_name(groups.rbegin()->first);
}
}
::encode(groups, *out);
return 0;
}
/**
* Add a consistency group to the directory.
*
* Input:
* @param name (std::string)
* @param id (std::string)
*
* Output:
* @return 0 on success, negative error code on failure
*/
int group_dir_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
{
int r = cls_cxx_create(hctx, false);
if (r < 0) {
CLS_ERR("could not create consistency group directory: %s",
cpp_strerror(r).c_str());
return r;
}
string name, id;
try {
bufferlist::iterator iter = in->begin();
::decode(name, iter);
::decode(id, iter);
} catch (const buffer::error &err) {
return -EINVAL;
}
if (!name.size() || !is_valid_id(id)) {
CLS_ERR("invalid consistency group name '%s' or id '%s'",
name.c_str(), id.c_str());
return -EINVAL;
}
CLS_LOG(20, "group_dir_add name=%s id=%s", name.c_str(), id.c_str());
string tmp;
string name_key = dir_key_for_name(name);
string id_key = dir_key_for_id(id);
r = read_key(hctx, name_key, &tmp);
if (r != -ENOENT) {
CLS_LOG(10, "name already exists");
return -EEXIST;
}
r = read_key(hctx, id_key, &tmp);
if (r != -ENOENT) {
CLS_LOG(10, "id already exists");
return -EBADF;
}
bufferlist id_bl, name_bl;
::encode(id, id_bl);
::encode(name, name_bl);
map<string, bufferlist> omap_vals;
omap_vals[name_key] = id_bl;
omap_vals[id_key] = name_bl;
return cls_cxx_map_set_vals(hctx, &omap_vals);
}
/**
* Remove a consistency group from the directory.
*
* Input:
* @param name (std::string)
* @param id (std::string)
*
* Output:
* @return 0 on success, negative error code on failure
*/
int group_dir_remove(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
{
string name, id;
try {
bufferlist::iterator iter = in->begin();
::decode(name, iter);
::decode(id, iter);
} catch (const buffer::error &err) {
return -EINVAL;
}
CLS_LOG(20, "group_dir_remove name=%s id=%s", name.c_str(), id.c_str());
string stored_name, stored_id;
string name_key = dir_key_for_name(name);
string id_key = dir_key_for_id(id);
int r = read_key(hctx, name_key, &stored_id);
if (r < 0) {
if (r != -ENOENT)
CLS_ERR("error reading name to id mapping: %s", cpp_strerror(r).c_str());
return r;
}
r = read_key(hctx, id_key, &stored_name);
if (r < 0) {
if (r != -ENOENT)
CLS_ERR("error reading id to name mapping: %s", cpp_strerror(r).c_str());
return r;
}
// check if this op raced with a rename
if (stored_name != name || stored_id != id) {
CLS_ERR("stored name '%s' and id '%s' do not match args '%s' and '%s'",
stored_name.c_str(), stored_id.c_str(), name.c_str(), id.c_str());
return -ESTALE;
}
r = cls_cxx_map_remove_key(hctx, name_key);
if (r < 0) {
CLS_ERR("error removing name: %s", cpp_strerror(r).c_str());
return r;
}
r = cls_cxx_map_remove_key(hctx, id_key);
if (r < 0) {
CLS_ERR("error removing id: %s", cpp_strerror(r).c_str());
return r;
}
return 0;
}
void __cls_init()
{
CLS_LOG(20, "Loaded rbd class!");
@ -4349,5 +4561,18 @@ void __cls_init()
CLS_METHOD_RD | CLS_METHOD_WR,
mirror_image_status_remove_down,
&h_mirror_image_status_remove_down);
/* methods for the consistency groups feature */
cls_register_cxx_method(h_class, "group_create",
CLS_METHOD_RD | CLS_METHOD_WR,
group_create, &h_group_create);
cls_register_cxx_method(h_class, "group_dir_list",
CLS_METHOD_RD,
group_dir_list, &h_group_dir_list);
cls_register_cxx_method(h_class, "group_dir_add",
CLS_METHOD_RD | CLS_METHOD_WR,
group_dir_add, &h_group_dir_add);
cls_register_cxx_method(h_class, "group_dir_remove",
CLS_METHOD_RD | CLS_METHOD_WR,
group_dir_remove, &h_group_dir_remove);
return;
}

View File

@ -1433,5 +1433,52 @@ namespace librbd {
op->exec("rbd", "mirror_image_status_remove_down", bl);
}
// Consistency groups functions
int group_create(librados::IoCtx *ioctx, const std::string &oid)
{
bufferlist bl, bl2;
return ioctx->exec(oid, "rbd", "group_create", bl, bl2);
}
int group_dir_list(librados::IoCtx *ioctx, const std::string &oid,
const std::string &start, uint64_t max_return,
map<string, string> *cgs)
{
bufferlist in, out;
::encode(start, in);
::encode(max_return, in);
int r = ioctx->exec(oid, "rbd", "group_dir_list", in, out);
if (r < 0)
return r;
bufferlist::iterator iter = out.begin();
try {
::decode(*cgs, iter);
} catch (const buffer::error &err) {
return -EBADMSG;
}
return 0;
}
int group_dir_add(librados::IoCtx *ioctx, const std::string &oid,
const std::string &name, const std::string &id)
{
bufferlist in, out;
::encode(name, in);
::encode(id, in);
return ioctx->exec(oid, "rbd", "group_dir_add", in, out);
}
int group_dir_remove(librados::IoCtx *ioctx, const std::string &oid,
const std::string &name, const std::string &id)
{
bufferlist in, out;
::encode(name, in);
::encode(id, in);
return ioctx->exec(oid, "rbd", "group_dir_remove", in, out);
}
} // namespace cls_client
} // namespace librbd

View File

@ -288,6 +288,16 @@ namespace librbd {
int mirror_image_status_remove_down(librados::IoCtx *ioctx);
void mirror_image_status_remove_down(librados::ObjectWriteOperation *op);
// Consistency groups functions
int group_create(librados::IoCtx *ioctx, const std::string &oid);
int group_dir_list(librados::IoCtx *ioctx, const std::string &oid,
const std::string &start, uint64_t max_return,
map<string, string> *groups);
int group_dir_add(librados::IoCtx *ioctx, const std::string &oid,
const std::string &name, const std::string &id);
int group_dir_remove(librados::IoCtx *ioctx, const std::string &oid,
const std::string &name, const std::string &id);
} // namespace cls_client
} // namespace librbd
#endif // CEPH_LIBRBD_CLS_RBD_CLIENT_H

View File

@ -74,6 +74,10 @@
#define RBD_HEADER_SIGNATURE "RBD"
#define RBD_HEADER_VERSION "001.005"
#define RBD_GROUP_HEADER_PREFIX "rbd_group_header."
#define RBD_GROUP_DIRECTORY "rbd_group_directory"
struct rbd_info {
__le64 max_id;
} __attribute__ ((packed));
@ -102,5 +106,4 @@ struct rbd_obj_header_ondisk {
struct rbd_obj_snap_ondisk snaps[0];
} __attribute__((packed));
#endif

View File

@ -1699,3 +1699,109 @@ TEST_F(TestClsRbd, mirror_image_status) {
ASSERT_EQ(0U, images.size());
ASSERT_EQ(0U, statuses.size());
}
TEST_F(TestClsRbd, group_create) {
librados::IoCtx ioctx;
ASSERT_EQ(0, _rados.ioctx_create(_pool_name.c_str(), ioctx));
string group_id = "group_id";
ASSERT_EQ(0, group_create(&ioctx, group_id));
uint64_t psize;
time_t pmtime;
ASSERT_EQ(0, ioctx.stat(group_id, &psize, &pmtime));
}
TEST_F(TestClsRbd, group_dir_list) {
librados::IoCtx ioctx;
ASSERT_EQ(0, _rados.ioctx_create(_pool_name.c_str(), ioctx));
string group_id1 = "cgid1";
string group_name1 = "cgname1";
string group_id2 = "cgid2";
string group_name2 = "cgname2";
ASSERT_EQ(0, group_dir_add(&ioctx, RBD_GROUP_DIRECTORY, group_name1, group_id1));
ASSERT_EQ(0, group_dir_add(&ioctx, RBD_GROUP_DIRECTORY, group_name2, group_id2));
map<string, string> cgs;
ASSERT_EQ(0, group_dir_list(&ioctx, RBD_GROUP_DIRECTORY, "", 10, &cgs));
ASSERT_EQ(2, cgs.size());
auto it = cgs.begin();
ASSERT_EQ(group_id1, it->second);
ASSERT_EQ(group_name1, it->first);
++it;
ASSERT_EQ(group_id2, it->second);
ASSERT_EQ(group_name2, it->first);
}
void add_group_to_dir(librados::IoCtx ioctx, string group_id, string group_name) {
ASSERT_EQ(0, group_dir_add(&ioctx, RBD_GROUP_DIRECTORY, group_name, group_id));
set<string> keys;
ASSERT_EQ(0, ioctx.omap_get_keys(RBD_GROUP_DIRECTORY, "", 10, &keys));
ASSERT_EQ(2, keys.size());
ASSERT_EQ("id_" + group_id, *keys.begin());
ASSERT_EQ("name_" + group_name, *keys.rbegin());
}
TEST_F(TestClsRbd, group_dir_add) {
librados::IoCtx ioctx;
ASSERT_EQ(0, _rados.ioctx_create(_pool_name.c_str(), ioctx));
ioctx.remove(RBD_GROUP_DIRECTORY);
string group_id = "cgid";
string group_name = "cgname";
add_group_to_dir(ioctx, group_id, group_name);
}
TEST_F(TestClsRbd, dir_add_already_existing) {
librados::IoCtx ioctx;
ASSERT_EQ(0, _rados.ioctx_create(_pool_name.c_str(), ioctx));
ioctx.remove(RBD_GROUP_DIRECTORY);
string group_id = "cgidexisting";
string group_name = "cgnameexisting";
add_group_to_dir(ioctx, group_id, group_name);
ASSERT_EQ(-EEXIST, group_dir_add(&ioctx, RBD_GROUP_DIRECTORY, group_name, group_id));
}
TEST_F(TestClsRbd, group_dir_remove) {
librados::IoCtx ioctx;
ASSERT_EQ(0, _rados.ioctx_create(_pool_name.c_str(), ioctx));
ioctx.remove(RBD_GROUP_DIRECTORY);
string group_id = "cgidtodel";
string group_name = "cgnametodel";
add_group_to_dir(ioctx, group_id, group_name);
ASSERT_EQ(0, group_dir_remove(&ioctx, RBD_GROUP_DIRECTORY, group_name, group_id));
set<string> keys;
ASSERT_EQ(0, ioctx.omap_get_keys(RBD_GROUP_DIRECTORY, "", 10, &keys));
ASSERT_EQ(0, keys.size());
}
TEST_F(TestClsRbd, group_dir_remove_missing) {
librados::IoCtx ioctx;
ASSERT_EQ(0, _rados.ioctx_create(_pool_name.c_str(), ioctx));
ioctx.remove(RBD_GROUP_DIRECTORY);
string group_id = "cgidtodelmissing";
string group_name = "cgnametodelmissing";
// These two lines ensure that RBD_GROUP_DIRECTORY exists. It's important for the
// last two lines.
add_group_to_dir(ioctx, group_id, group_name);
ASSERT_EQ(0, group_dir_remove(&ioctx, RBD_GROUP_DIRECTORY, group_name, group_id));
// Removing missing
ASSERT_EQ(-ENOENT, group_dir_remove(&ioctx, RBD_GROUP_DIRECTORY, group_name, group_id));
set<string> keys;
ASSERT_EQ(0, ioctx.omap_get_keys(RBD_GROUP_DIRECTORY, "", 10, &keys));
ASSERT_EQ(0, keys.size());
}