cls_rbd: add initial support for separate data pools

Fixes: http://tracker.ceph.com/issues/17422
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
This commit is contained in:
Jason Dillaman 2016-10-04 23:38:12 -04:00
parent c5f2290047
commit 06448acf3d
4 changed files with 150 additions and 37 deletions

View File

@ -84,6 +84,7 @@ cls_method_handle_t h_remove_child;
cls_method_handle_t h_get_children;
cls_method_handle_t h_get_snapcontext;
cls_method_handle_t h_get_object_prefix;
cls_method_handle_t h_get_data_pool;
cls_method_handle_t h_get_snapshot_name;
cls_method_handle_t h_snapshot_add;
cls_method_handle_t h_snapshot_remove;
@ -261,6 +262,7 @@ static bool is_valid_id(const string &id) {
* @param order bits to shift to determine the size of data objects (uint8_t)
* @param features what optional things this image will use (uint64_t)
* @param object_prefix a prefix for all the data objects
* @param data_pool_id pool id where data objects is stored (int64_t)
*
* Output:
* @return 0 on success, negative error code on failure
@ -270,6 +272,7 @@ int create(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
string object_prefix;
uint64_t features, size;
uint8_t order;
int64_t data_pool_id = -1;
try {
bufferlist::iterator iter = in->begin();
@ -277,6 +280,9 @@ int create(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
::decode(order, iter);
::decode(features, iter);
::decode(object_prefix, iter);
if (!iter.end()) {
::decode(data_pool_id, iter);
}
} catch (const buffer::error &err) {
return -EINVAL;
}
@ -318,6 +324,21 @@ int create(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
omap_vals["features"] = featuresbl;
omap_vals["object_prefix"] = object_prefixbl;
omap_vals["snap_seq"] = snap_seqbl;
if (features & RBD_FEATURE_DATA_POOL) {
if (data_pool_id == -1) {
CLS_ERR("data pool not provided with feature enabled");
return -EINVAL;
}
bufferlist data_pool_id_bl;
::encode(data_pool_id, data_pool_id_bl);
omap_vals["data_pool_id"] = data_pool_id_bl;
} else if (data_pool_id != -1) {
CLS_ERR("data pool provided with feature disabled");
return -EINVAL;
}
r = cls_cxx_map_set_vals(hctx, &omap_vals);
if (r < 0)
return r;
@ -1463,6 +1484,31 @@ int get_object_prefix(cls_method_context_t hctx, bufferlist *in, bufferlist *out
return 0;
}
/**
* Input:
* none
*
* Output:
* @param pool_id (int64_t) of data pool or -1 if none
* @returns 0 on success, negative error code on failure
*/
int get_data_pool(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
{
CLS_LOG(20, "get_data_pool");
int64_t data_pool_id;
int r = read_key(hctx, "data_pool_id", &data_pool_id);
if (r == -ENOENT) {
data_pool_id = -1;
} else if (r < 0) {
CLS_ERR("error reading image data pool id: %s", cpp_strerror(r).c_str());
return r;
}
::encode(data_pool_id, *out);
return 0;
}
int get_snapshot_name(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
{
uint64_t snap_id;
@ -4730,6 +4776,8 @@ void __cls_init()
cls_register_cxx_method(h_class, "get_object_prefix",
CLS_METHOD_RD,
get_object_prefix, &h_get_object_prefix);
cls_register_cxx_method(h_class, "get_data_pool", CLS_METHOD_RD,
get_data_pool, &h_get_data_pool);
cls_register_cxx_method(h_class, "get_snapshot_name",
CLS_METHOD_RD,
get_snapshot_name, &h_get_snapshot_name);

View File

@ -156,24 +156,26 @@ namespace librbd {
parent, group_ref);
}
void create_image(librados::ObjectWriteOperation *op, uint64_t size, uint8_t order,
uint64_t features, const std::string &object_prefix)
void create_image(librados::ObjectWriteOperation *op, uint64_t size,
uint8_t order, uint64_t features,
const std::string &object_prefix, int64_t data_pool_id)
{
bufferlist bl;
::encode(size, bl);
::encode(order, bl);
::encode(features, bl);
::encode(object_prefix, (bl));
::encode(object_prefix, bl);
::encode(data_pool_id, bl);
op->exec("rbd", "create", bl);
}
int create_image(librados::IoCtx *ioctx, const std::string &oid,
uint64_t size, uint8_t order, uint64_t features,
const std::string &object_prefix)
const std::string &object_prefix, int64_t data_pool_id)
{
librados::ObjectWriteOperation op;
create_image(&op, size, order, features, object_prefix);
create_image(&op, size, order, features, object_prefix, data_pool_id);
return ioctx->operate(oid, &op);
}
@ -235,6 +237,35 @@ namespace librbd {
return 0;
}
void get_data_pool_start(librados::ObjectReadOperation *op) {
bufferlist bl;
op->exec("rbd", "get_data_pool", bl);
}
int get_data_pool_finish(bufferlist::iterator *it, int64_t *data_pool_id) {
try {
::decode(*data_pool_id, *it);
} catch (const buffer::error &err) {
return -EBADMSG;
}
return 0;
}
int get_data_pool(librados::IoCtx *ioctx, const std::string &oid,
int64_t *data_pool_id) {
librados::ObjectReadOperation op;
get_data_pool_start(&op);
bufferlist out_bl;
int r = ioctx->operate(oid, &op, &out_bl);
if (r < 0) {
return r;
}
bufferlist::iterator it = out_bl.begin();
return get_data_pool_finish(&it, data_pool_id);
}
int get_size(librados::IoCtx *ioctx, const std::string &oid,
snapid_t snap_id, uint64_t *size, uint8_t *order)
{

View File

@ -49,11 +49,12 @@ namespace librbd {
cls::rbd::GroupSpec *uplink);
// low-level interface (mainly for testing)
void create_image(librados::ObjectWriteOperation *op, uint64_t size, uint8_t order,
uint64_t features, const std::string &object_prefix);
void create_image(librados::ObjectWriteOperation *op, uint64_t size,
uint8_t order, uint64_t features,
const std::string &object_prefix, int64_t data_pool_id);
int create_image(librados::IoCtx *ioctx, const std::string &oid,
uint64_t size, uint8_t order, uint64_t features,
const std::string &object_prefix);
const std::string &object_prefix, int64_t data_pool_id);
int get_features(librados::IoCtx *ioctx, const std::string &oid,
snapid_t snap_id, uint64_t *features);
void set_features(librados::ObjectWriteOperation *op, uint64_t features,
@ -62,6 +63,10 @@ namespace librbd {
uint64_t features, uint64_t mask);
int get_object_prefix(librados::IoCtx *ioctx, const std::string &oid,
std::string *object_prefix);
void get_data_pool_start(librados::ObjectReadOperation *op);
int get_data_pool_finish(bufferlist::iterator *it, int64_t *data_pool_id);
int get_data_pool(librados::IoCtx *ioctx, const std::string &oid,
int64_t *data_pool_id);
int get_size(librados::IoCtx *ioctx, const std::string &oid,
snapid_t snap_id, uint64_t *size, uint8_t *order);
int set_size(librados::IoCtx *ioctx, const std::string &oid,

View File

@ -185,7 +185,7 @@ TEST_F(TestClsRbd, add_remove_child)
// create the parent and snapshot
ASSERT_EQ(0, create_image(&ioctx, parent_image, 2<<20, 0,
RBD_FEATURE_LAYERING, parent_image));
RBD_FEATURE_LAYERING, parent_image, -1));
ASSERT_EQ(0, snapshot_add(&ioctx, parent_image, snapid, snapname));
// add child to it, verify it showed up
@ -332,23 +332,31 @@ TEST_F(TestClsRbd, create)
string object_prefix = oid;
ASSERT_EQ(0, create_image(&ioctx, oid, size, order,
features, object_prefix));
features, object_prefix, -1));
ASSERT_EQ(-EEXIST, create_image(&ioctx, oid, size, order,
features, object_prefix));
features, object_prefix, -1));
ASSERT_EQ(0, ioctx.remove(oid));
ASSERT_EQ(-EINVAL, create_image(&ioctx, oid, size, order,
features, ""));
features, "", -1));
ASSERT_EQ(-ENOENT, ioctx.remove(oid));
ASSERT_EQ(0, create_image(&ioctx, oid, 0, order,
features, object_prefix));
features, object_prefix, -1));
ASSERT_EQ(0, ioctx.remove(oid));
ASSERT_EQ(-ENOSYS, create_image(&ioctx, oid, size, order,
-1, object_prefix));
-1, object_prefix, -1));
ASSERT_EQ(-ENOENT, ioctx.remove(oid));
ASSERT_EQ(0, create_image(&ioctx, oid, size, order, RBD_FEATURE_DATA_POOL,
object_prefix, 123));
ASSERT_EQ(0, ioctx.remove(oid));
ASSERT_EQ(-EINVAL, create_image(&ioctx, oid, size, order,
RBD_FEATURE_DATA_POOL, object_prefix, -1));
ASSERT_EQ(-EINVAL, create_image(&ioctx, oid, size, order, 0, object_prefix,
123));
bufferlist inbl, outbl;
ASSERT_EQ(-EINVAL, ioctx.exec(oid, "rbd", "create", inbl, outbl));
@ -365,7 +373,7 @@ TEST_F(TestClsRbd, get_features)
uint64_t features;
ASSERT_EQ(-ENOENT, get_features(&ioctx, oid, CEPH_NOSNAP, &features));
ASSERT_EQ(0, create_image(&ioctx, oid, 0, 22, 0, oid));
ASSERT_EQ(0, create_image(&ioctx, oid, 0, 22, 0, oid, -1));
ASSERT_EQ(0, get_features(&ioctx, oid, CEPH_NOSNAP, &features));
ASSERT_EQ(0u, features);
@ -390,13 +398,32 @@ TEST_F(TestClsRbd, get_object_prefix)
string object_prefix;
ASSERT_EQ(-ENOENT, get_object_prefix(&ioctx, oid, &object_prefix));
ASSERT_EQ(0, create_image(&ioctx, oid, 0, 22, 0, oid));
ASSERT_EQ(0, create_image(&ioctx, oid, 0, 22, 0, oid, -1));
ASSERT_EQ(0, get_object_prefix(&ioctx, oid, &object_prefix));
ASSERT_EQ(oid, object_prefix);
ioctx.close();
}
TEST_F(TestClsRbd, get_data_pool)
{
librados::IoCtx ioctx;
ASSERT_EQ(0, _rados.ioctx_create(_pool_name.c_str(), ioctx));
string oid = get_temp_image_name();
int64_t data_pool_id;
ASSERT_EQ(0, ioctx.create(oid, true));
ASSERT_EQ(0, get_data_pool(&ioctx, oid, &data_pool_id));
ASSERT_EQ(-1, data_pool_id);
ASSERT_EQ(0, ioctx.remove(oid));
ASSERT_EQ(0, create_image(&ioctx, oid, 0, 22, RBD_FEATURE_DATA_POOL, oid,
12));
ASSERT_EQ(0, get_data_pool(&ioctx, oid, &data_pool_id));
ASSERT_EQ(12, data_pool_id);
}
TEST_F(TestClsRbd, get_size)
{
librados::IoCtx ioctx;
@ -407,13 +434,13 @@ TEST_F(TestClsRbd, get_size)
uint8_t order;
ASSERT_EQ(-ENOENT, get_size(&ioctx, oid, CEPH_NOSNAP, &size, &order));
ASSERT_EQ(0, create_image(&ioctx, oid, 0, 22, 0, oid));
ASSERT_EQ(0, create_image(&ioctx, oid, 0, 22, 0, oid, -1));
ASSERT_EQ(0, get_size(&ioctx, oid, CEPH_NOSNAP, &size, &order));
ASSERT_EQ(0u, size);
ASSERT_EQ(22, order);
ASSERT_EQ(0, ioctx.remove(oid));
ASSERT_EQ(0, create_image(&ioctx, oid, 2 << 22, 0, 0, oid));
ASSERT_EQ(0, create_image(&ioctx, oid, 2 << 22, 0, 0, oid, -1));
ASSERT_EQ(0, get_size(&ioctx, oid, CEPH_NOSNAP, &size, &order));
ASSERT_EQ(2u << 22, size);
ASSERT_EQ(0, order);
@ -433,7 +460,7 @@ TEST_F(TestClsRbd, set_size)
uint64_t size;
uint8_t order;
ASSERT_EQ(0, create_image(&ioctx, oid, 0, 22, 0, oid));
ASSERT_EQ(0, create_image(&ioctx, oid, 0, 22, 0, oid, -1));
ASSERT_EQ(0, get_size(&ioctx, oid, CEPH_NOSNAP, &size, &order));
ASSERT_EQ(0u, size);
ASSERT_EQ(22, order);
@ -464,8 +491,8 @@ TEST_F(TestClsRbd, protection_status)
ASSERT_EQ(-ENOENT, set_protection_status(&ioctx, oid,
CEPH_NOSNAP, status));
ASSERT_EQ(0, create_image(&ioctx, oid, 0, 22, RBD_FEATURE_LAYERING, oid));
ASSERT_EQ(0, create_image(&ioctx, oid2, 0, 22, 0, oid));
ASSERT_EQ(0, create_image(&ioctx, oid, 0, 22, RBD_FEATURE_LAYERING, oid, -1));
ASSERT_EQ(0, create_image(&ioctx, oid2, 0, 22, 0, oid, -1));
ASSERT_EQ(-EINVAL, get_protection_status(&ioctx, oid2,
CEPH_NOSNAP, &status));
ASSERT_EQ(-ENOEXEC, set_protection_status(&ioctx, oid2,
@ -528,10 +555,10 @@ TEST_F(TestClsRbd, snapshot_limits)
librados::ObjectWriteOperation op;
string oid = get_temp_image_name();
uint64_t limit;
ASSERT_EQ(-ENOENT, snapshot_get_limit(&ioctx, oid, &limit));
ASSERT_EQ(0, create_image(&ioctx, oid, 0, 22, RBD_FEATURE_LAYERING, oid));
ASSERT_EQ(0, create_image(&ioctx, oid, 0, 22, RBD_FEATURE_LAYERING, oid, -1));
snapshot_set_limit(&op, 2);
@ -562,7 +589,7 @@ TEST_F(TestClsRbd, parents)
ASSERT_EQ(-ENOENT, get_parent(&ioctx, "doesnotexist", CEPH_NOSNAP, &pspec, &size));
// old image should fail
ASSERT_EQ(0, create_image(&ioctx, "old", 33<<20, 22, 0, "old_blk."));
ASSERT_EQ(0, create_image(&ioctx, "old", 33<<20, 22, 0, "old_blk.", -1));
// get nonexistent parent: succeed, return (-1, "", CEPH_NOSNAP), overlap 0
ASSERT_EQ(0, get_parent(&ioctx, "old", CEPH_NOSNAP, &pspec, &size));
ASSERT_EQ(pspec.pool_id, -1);
@ -574,7 +601,8 @@ TEST_F(TestClsRbd, parents)
ASSERT_EQ(-ENOEXEC, remove_parent(&ioctx, "old"));
// new image will work
ASSERT_EQ(0, create_image(&ioctx, oid, 33<<20, 22, RBD_FEATURE_LAYERING, "foo."));
ASSERT_EQ(0, create_image(&ioctx, oid, 33<<20, 22, RBD_FEATURE_LAYERING,
"foo.", -1));
ASSERT_EQ(0, get_parent(&ioctx, oid, CEPH_NOSNAP, &pspec, &size));
ASSERT_EQ(-1, pspec.pool_id);
@ -707,7 +735,7 @@ TEST_F(TestClsRbd, parents)
ASSERT_EQ(0, ioctx.remove(oid));
ASSERT_EQ(0, create_image(&ioctx, oid, 33<<20, 22,
RBD_FEATURE_LAYERING | RBD_FEATURE_DEEP_FLATTEN,
"foo."));
"foo.", -1));
ASSERT_EQ(0, set_parent(&ioctx, oid, parent_spec(1, "parent", 3), 100<<20));
ASSERT_EQ(0, snapshot_add(&ioctx, oid, 1, "snap1"));
ASSERT_EQ(0, snapshot_add(&ioctx, oid, 2, "snap2"));
@ -731,7 +759,7 @@ TEST_F(TestClsRbd, snapshots)
string oid = get_temp_image_name();
ASSERT_EQ(-ENOENT, snapshot_add(&ioctx, oid, 0, "snap1"));
ASSERT_EQ(0, create_image(&ioctx, oid, 10, 22, 0, oid));
ASSERT_EQ(0, create_image(&ioctx, oid, 10, 22, 0, oid, -1));
vector<string> snap_names;
vector<uint64_t> snap_sizes;
@ -908,13 +936,14 @@ TEST_F(TestClsRbd, stripingv2)
string oid = get_temp_image_name();
string oid2 = get_temp_image_name();
ASSERT_EQ(0, create_image(&ioctx, oid, 10, 22, 0, oid));
ASSERT_EQ(0, create_image(&ioctx, oid, 10, 22, 0, oid, -1));
uint64_t su = 65536, sc = 12;
ASSERT_EQ(-ENOEXEC, get_stripe_unit_count(&ioctx, oid, &su, &sc));
ASSERT_EQ(-ENOEXEC, set_stripe_unit_count(&ioctx, oid, su, sc));
ASSERT_EQ(0, create_image(&ioctx, oid2, 10, 22, RBD_FEATURE_STRIPINGV2, oid2));
ASSERT_EQ(0, create_image(&ioctx, oid2, 10, 22, RBD_FEATURE_STRIPINGV2,
oid2, -1));
ASSERT_EQ(0, get_stripe_unit_count(&ioctx, oid2, &su, &sc));
ASSERT_EQ(1ull << 22, su);
ASSERT_EQ(1ull, sc);
@ -945,7 +974,7 @@ TEST_F(TestClsRbd, get_mutable_metadata_features)
string oid = get_temp_image_name();
ASSERT_EQ(0, create_image(&ioctx, oid, 10, 22, RBD_FEATURE_EXCLUSIVE_LOCK,
oid));
oid, -1));
uint64_t size, features, incompatible_features;
std::map<rados::cls::lock::locker_id_t,
@ -1193,7 +1222,7 @@ TEST_F(TestClsRbd, flags)
ASSERT_EQ(0, _rados.ioctx_create(_pool_name.c_str(), ioctx));
string oid = get_temp_image_name();
ASSERT_EQ(0, create_image(&ioctx, oid, 0, 22, 0, oid));
ASSERT_EQ(0, create_image(&ioctx, oid, 0, 22, 0, oid, -1));
uint64_t flags;
std::vector<snapid_t> snap_ids;
@ -1229,7 +1258,7 @@ TEST_F(TestClsRbd, metadata)
ASSERT_EQ(0, _rados.ioctx_create(_pool_name.c_str(), ioctx));
string oid = get_temp_image_name();
ASSERT_EQ(0, create_image(&ioctx, oid, 0, 22, 0, oid));
ASSERT_EQ(0, create_image(&ioctx, oid, 0, 22, 0, oid, -1));
map<string, bufferlist> pairs;
string value;
@ -1294,7 +1323,7 @@ TEST_F(TestClsRbd, set_features)
string oid = get_temp_image_name();
uint64_t base_features = RBD_FEATURE_LAYERING | RBD_FEATURE_DEEP_FLATTEN;
ASSERT_EQ(0, create_image(&ioctx, oid, 0, 22, base_features, oid));
ASSERT_EQ(0, create_image(&ioctx, oid, 0, 22, base_features, oid, -1));
uint64_t features = RBD_FEATURES_MUTABLE;
uint64_t mask = RBD_FEATURES_MUTABLE;
@ -1952,7 +1981,7 @@ TEST_F(TestClsRbd, image_add_group) {
string image_id = "imageid";
ASSERT_EQ(0, create_image(&ioctx, image_id, 2<<20, 0,
RBD_FEATURE_LAYERING, image_id));
RBD_FEATURE_LAYERING, image_id, -1));
string group_id = "group_id";
@ -1978,7 +2007,7 @@ TEST_F(TestClsRbd, image_remove_group) {
string image_id = "image_id";
ASSERT_EQ(0, create_image(&ioctx, image_id, 2<<20, 0,
RBD_FEATURE_LAYERING, image_id));
RBD_FEATURE_LAYERING, image_id, -1));
string group_id = "group_id";
@ -2002,7 +2031,7 @@ TEST_F(TestClsRbd, image_get_group) {
string image_id = "imageidgroupspec";
ASSERT_EQ(0, create_image(&ioctx, image_id, 2<<20, 0,
RBD_FEATURE_LAYERING, image_id));
RBD_FEATURE_LAYERING, image_id, -1));
string group_id = "group_id_get_group_spec";