Merge branch 'wip_omap'

Reviewed-by: Sage Weil <sage.weil@dreamhost.com>
This commit is contained in:
Samuel Just 2012-03-06 11:46:24 -08:00
commit 98f8219dd3
15 changed files with 1250 additions and 47 deletions

View File

@ -229,6 +229,13 @@ OPTION(max_mds, OPT_INT, 1)
OPTION(mds_standby_for_name, OPT_STR, "")
OPTION(mds_standby_for_rank, OPT_INT, -1)
OPTION(mds_standby_replay, OPT_BOOL, false)
// If true, uses tmap as initial value for omap on old objects
OPTION(osd_auto_upgrade_tmap, OPT_BOOL, false)
// If true, TMAPPUT sets uses_tmap DEBUGGING ONLY
OPTION(osd_tmapput_sets_uses_tmap, OPT_BOOL, false)
OPTION(osd_data, OPT_STR, "")
OPTION(osd_journal, OPT_STR, "")
OPTION(osd_journal_size, OPT_INT, 0) // in mb

View File

@ -79,6 +79,14 @@ const char *ceph_osd_op_name(int op)
case CEPH_OSD_OP_PGLS: return "pgls";
case CEPH_OSD_OP_PGLS_FILTER: return "pgls-filter";
case CEPH_OSD_OP_OMAPGETKEYS: return "omap-get-keys";
case CEPH_OSD_OP_OMAPGETVALS: return "omap-get-vals";
case CEPH_OSD_OP_OMAPGETHEADER: return "omap-get-header";
case CEPH_OSD_OP_OMAPGETVALSBYKEY: return "omap-get-vals-by-key";
case CEPH_OSD_OP_OMAPSETVALS: return "omap-set-vals";
case CEPH_OSD_OP_OMAPSETHEADER: return "omap-set-header";
case CEPH_OSD_OP_OMAPCLEAR: return "omap-clear";
case CEPH_OSD_OP_OMAPRMKEYS: return "omap-rm-keys";
}
return "???";
}

View File

@ -197,6 +197,17 @@ enum {
CEPH_OSD_OP_WATCH = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 15,
/* omap */
CEPH_OSD_OP_OMAPGETKEYS = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 17,
CEPH_OSD_OP_OMAPGETVALS = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 18,
CEPH_OSD_OP_OMAPGETHEADER = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 19,
CEPH_OSD_OP_OMAPGETVALSBYKEY =
CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 20,
CEPH_OSD_OP_OMAPSETVALS = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 21,
CEPH_OSD_OP_OMAPSETHEADER = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 22,
CEPH_OSD_OP_OMAPCLEAR = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 23,
CEPH_OSD_OP_OMAPRMKEYS = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 24,
/** multi **/
CEPH_OSD_OP_CLONERANGE = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_MULTI | 1,
CEPH_OSD_OP_ASSERT_SRC_VERSION = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_MULTI | 2,
@ -290,7 +301,6 @@ static inline int ceph_osd_op_mode_modify(int op)
extern const char *ceph_osd_op_name(int op);
/*
* osd op flags
*

View File

@ -5,6 +5,7 @@
#include <string>
#include <list>
#include <map>
#include <set>
#include <tr1/memory>
#include <vector>
#include <utility>
@ -168,10 +169,37 @@ namespace librados
void rmxattr(const char *name);
void setxattr(const char *name, const bufferlist& bl);
void tmap_update(const bufferlist& cmdbl);
void tmap_put(const bufferlist& bl);
void clone_range(uint64_t dst_off,
const std::string& src_oid, uint64_t src_off,
size_t len);
/**
* set keys and values according to map
*
* @param map [in] keys and values to set
*/
void omap_set(const std::map<std::string, bufferlist> &map);
/**
* set header
*
* @param bl [in] header to set
*/
void omap_set_header(const bufferlist &bl);
/**
* Clears omap contents
*/
void omap_clear();
/**
* Clears keys in to_rm
*
* @param to_rm [in] keys to remove
*/
void omap_rm_keys(const std::set<std::string> &to_rm);
friend class IoCtx;
};
@ -191,6 +219,57 @@ namespace librados
void getxattrs(std::map<std::string, bufferlist> *pattrs, int *prval);
void read(size_t off, uint64_t len, bufferlist *pbl, int *prval);
void tmap_get(bufferlist *pbl, int *prval);
/**
* omap_get_vals: keys and values from the object omap
*
* Get up to max_return keys and values beginning after start_after
*
* @param start_after [in] list no keys smaller than start_after
* @parem max_return [in] list no more than max_return key/value pairs
* @param out_vals [out] place returned values in out_vals on completion
* @param prval [out] place error code in prval upon completion
*/
void omap_get_vals(
const std::string &start_after,
uint64_t max_return,
std::map<std::string, bufferlist> *out_vals,
int *prval);
/**
* omap_get_keys: keys from the object omap
*
* Get up to max_return keys beginning after start_after
*
* @param start_after [in] list no keys smaller than start_after
* @parem max_return [in] list no more than max_return keys
* @param out_keys [out] place returned values in out_keys on completion
* @param prval [out] place error code in prval upon completion
*/
void omap_get_keys(const std::string &start_after,
uint64_t max_return,
std::set<std::string> *out_keys,
int *prval);
/**
* omap_get_header: get header from object omap
*
* @param header [out] place header here upon completion
* @param prval [out] place error code in prval upon completion
*/
void omap_get_header(bufferlist *header, int *prval);
/**
* get key/value paris for specified keys
*
* @param to_get [in] keys to get
* @param out_vals [out] place key/value pairs found here on completion
* @param prval [out] place error code in prval upon completion
*/
void omap_get_vals_by_key(const std::set<std::string> &keys,
std::map<std::string, bufferlist> *map,
int *prval);
};

View File

@ -212,6 +212,41 @@ void librados::ObjectReadOperation::getxattr(const char *name, bufferlist *pbl,
o->getxattr(name, pbl, prval);
}
void librados::ObjectReadOperation::omap_get_vals(
const std::string &start_after,
uint64_t max_return,
std::map<std::string, bufferlist> *out_vals,
int *prval)
{
::ObjectOperation *o = (::ObjectOperation *)impl;
o->omap_get_vals(start_after, max_return, out_vals, prval);
}
void librados::ObjectReadOperation::omap_get_keys(
const std::string &start_after,
uint64_t max_return,
std::set<std::string> *out_keys,
int *prval)
{
::ObjectOperation *o = (::ObjectOperation *)impl;
o->omap_get_keys(start_after, max_return, out_keys, prval);
}
void librados::ObjectReadOperation::omap_get_header(bufferlist *bl, int *prval)
{
::ObjectOperation *o = (::ObjectOperation *)impl;
o->omap_get_header(bl, prval);
}
void librados::ObjectReadOperation::omap_get_vals_by_key(
const std::set<std::string> &keys,
std::map<std::string, bufferlist> *map,
int *prval)
{
::ObjectOperation *o = (::ObjectOperation *)impl;
o->omap_get_vals_by_key(keys, map, prval);
}
void librados::ObjectReadOperation::getxattrs(map<string, bufferlist> *pattrs, int *prval)
{
::ObjectOperation *o = (::ObjectOperation *)impl;
@ -281,6 +316,13 @@ void librados::ObjectWriteOperation::setxattr(const char *name, const bufferlist
o->setxattr(name, v);
}
void librados::ObjectWriteOperation::tmap_put(const bufferlist &bl)
{
::ObjectOperation *o = (::ObjectOperation *)impl;
bufferlist c = bl;
o->tmap_put(c);
}
void librados::ObjectWriteOperation::tmap_update(const bufferlist& cmdbl)
{
::ObjectOperation *o = (::ObjectOperation *)impl;
@ -296,6 +338,33 @@ void librados::ObjectWriteOperation::clone_range(uint64_t dst_off,
o->clone_range(src_oid, src_off, len, dst_off);
}
void librados::ObjectWriteOperation::omap_set(
const map<string, bufferlist> &map)
{
::ObjectOperation *o = (::ObjectOperation *)impl;
o->omap_set(map);
}
void librados::ObjectWriteOperation::omap_set_header(const bufferlist &bl)
{
bufferlist c = bl;
::ObjectOperation *o = (::ObjectOperation *)impl;
o->omap_set_header(c);
}
void librados::ObjectWriteOperation::omap_clear()
{
::ObjectOperation *o = (::ObjectOperation *)impl;
o->omap_clear();
}
void librados::ObjectWriteOperation::omap_rm_keys(
const std::set<std::string> &to_rm)
{
::ObjectOperation *o = (::ObjectOperation *)impl;
o->omap_rm_keys(to_rm);
}
librados::WatchCtx::
~WatchCtx()
{

View File

@ -25,7 +25,7 @@
class MOSDSubOp : public Message {
static const int HEAD_VERSION = 5;
static const int HEAD_VERSION = 6;
static const int COMPAT_VERSION = 1;
public:
@ -81,6 +81,7 @@ public:
ObjectRecoveryProgress current_progress;
map<string,bufferlist> omap_entries;
bufferlist omap_header;
virtual void decode_payload() {
bufferlist::iterator p = payload.begin();
@ -130,6 +131,8 @@ public:
}
if (header.version >= 5)
::decode(omap_entries, p);
if (header.version >= 6)
::decode(omap_header, p);
}
virtual void encode_payload(uint64_t features) {
@ -173,6 +176,7 @@ public:
::encode(recovery_progress, payload);
::encode(current_progress, payload);
::encode(omap_entries, payload);
::encode(omap_header, payload);
}
MOSDSubOp()

View File

@ -2229,6 +2229,11 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
assert(bp.end());
}
if (g_conf->osd_tmapput_sets_uses_tmap) {
assert(g_conf->osd_auto_upgrade_tmap);
oi.uses_tmap = true;
}
// write it
vector<OSDOp> nops(1);
OSDOp& newop = nops[0];
@ -2402,7 +2407,206 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
}
break;
// OMAP Read ops
case CEPH_OSD_OP_OMAPGETKEYS:
{
string start_after;
uint64_t max_return;
::decode(start_after, bp);
::decode(max_return, bp);
set<string> out_set;
if (oi.uses_tmap && g_conf->osd_auto_upgrade_tmap) {
dout(20) << "CEPH_OSD_OP_OMAPGETKEYS: "
<< " Reading " << oi.soid << " omap from tmap" << dendl;
map<string, bufferlist> vals;
bufferlist header;
int r = _get_tmap(ctx, &vals, &header);
if (r == 0) {
map<string, bufferlist>::iterator iter =
vals.upper_bound(start_after);
for (uint64_t i = 0;
i < max_return && iter != vals.end();
++i, iter++) {
out_set.insert(iter->first);
}
::encode(out_set, osd_op.outdata);
break;
}
dout(10) << "failed, reading from omap" << dendl;
// No valid tmap, use omap
}
{
ObjectMap::ObjectMapIterator iter = osd->store->get_omap_iterator(
coll, soid
);
assert(iter);
iter->upper_bound(start_after);
for (uint64_t i = 0;
i < max_return && iter->valid();
++i, iter->next()) {
out_set.insert(iter->key());
}
}
::encode(out_set, osd_op.outdata);
}
break;
case CEPH_OSD_OP_OMAPGETVALS:
{
string start_after;
uint64_t max_return;
::decode(start_after, bp);
::decode(max_return, bp);
map<string, bufferlist> out_set;
if (oi.uses_tmap && g_conf->osd_auto_upgrade_tmap) {
dout(20) << "CEPH_OSD_OP_OMAPGETVALS: "
<< " Reading " << oi.soid << " omap from tmap" << dendl;
map<string, bufferlist> vals;
bufferlist header;
int r = _get_tmap(ctx, &vals, &header);
if (r == 0) {
map<string, bufferlist>::iterator iter = vals.upper_bound(start_after);
for (uint64_t i = 0;
i < max_return && iter != vals.end();
++i, iter++) {
out_set.insert(*iter);
}
::encode(out_set, osd_op.outdata);
break;
}
// No valid tmap, use omap
dout(10) << "failed, reading from omap" << dendl;
}
{
ObjectMap::ObjectMapIterator iter = osd->store->get_omap_iterator(
coll, soid
);
assert(iter);
iter->upper_bound(start_after);
for (uint64_t i = 0;
i < max_return && iter->valid();
++i, iter->next()) {
dout(20) << "Found key " << iter->key() << dendl;
out_set.insert(make_pair(iter->key(), iter->value()));
}
}
::encode(out_set, osd_op.outdata);
}
break;
case CEPH_OSD_OP_OMAPGETHEADER:
{
if (oi.uses_tmap && g_conf->osd_auto_upgrade_tmap) {
dout(20) << "CEPH_OSD_OP_OMAPGETHEADER: "
<< " Reading " << oi.soid << " omap from tmap" << dendl;
map<string, bufferlist> vals;
bufferlist header;
int r = _get_tmap(ctx, &vals, &header);
if (r == 0) {
osd_op.outdata.claim(header);
break;
}
// No valid tmap, fall through to omap
dout(10) << "failed, reading from omap" << dendl;
}
osd->store->omap_get_header(coll, soid, &osd_op.outdata);
}
break;
case CEPH_OSD_OP_OMAPGETVALSBYKEY:
{
set<string> keys_to_get;
::decode(keys_to_get, bp);
map<string, bufferlist> out;
if (oi.uses_tmap && g_conf->osd_auto_upgrade_tmap) {
dout(20) << "CEPH_OSD_OP_OMAPGET: "
<< " Reading " << oi.soid << " omap from tmap" << dendl;
map<string, bufferlist> vals;
bufferlist header;
int r = _get_tmap(ctx, &vals, &header);
if (r == 0) {
for (set<string>::iterator iter = keys_to_get.begin();
iter != keys_to_get.end();
iter++) {
if (vals.count(*iter)) {
out.insert(*(vals.find(*iter)));
}
}
::encode(out, osd_op.outdata);
break;
}
// No valid tmap, use omap
dout(10) << "failed, reading from omap" << dendl;
}
osd->store->omap_get_values(coll, soid, keys_to_get, &out);
::encode(out, osd_op.outdata);
}
break;
// OMAP Write ops
case CEPH_OSD_OP_OMAPSETVALS:
{
if (oi.uses_tmap && g_conf->osd_auto_upgrade_tmap) {
_copy_up_tmap(ctx);
}
if (!obs.exists) {
ctx->delta_stats.num_objects++;
obs.exists = true;
}
t.touch(coll, soid);
map<string, bufferlist> to_set;
::decode(to_set, bp);
dout(20) << "setting vals: " << dendl;
for (map<string, bufferlist>::iterator i = to_set.begin();
i != to_set.end();
++i) {
dout(20) << "\t" << i->first << dendl;
}
t.omap_setkeys(coll, soid, to_set);
}
break;
case CEPH_OSD_OP_OMAPSETHEADER:
{
if (oi.uses_tmap && g_conf->osd_auto_upgrade_tmap) {
_copy_up_tmap(ctx);
}
if (!obs.exists) {
ctx->delta_stats.num_objects++;
obs.exists = true;
}
t.touch(coll, soid);
t.omap_setheader(coll, soid, osd_op.indata);
}
break;
case CEPH_OSD_OP_OMAPCLEAR:
{
if (!obs.exists) {
result = -ENOENT;
break;
}
if (oi.uses_tmap && g_conf->osd_auto_upgrade_tmap) {
_copy_up_tmap(ctx);
}
t.touch(coll, soid);
t.omap_clear(coll, soid);
}
break;
case CEPH_OSD_OP_OMAPRMKEYS:
{
if (!obs.exists) {
result = -ENOENT;
break;
}
if (oi.uses_tmap && g_conf->osd_auto_upgrade_tmap) {
_copy_up_tmap(ctx);
}
t.touch(coll, soid);
set<string> to_rm;
::decode(to_rm, bp);
t.omap_rmkeys(coll, soid, to_rm);
}
break;
default:
dout(1) << "unrecognized osd op " << op.op
<< " " << ceph_osd_op_name(op.op)
@ -2421,6 +2625,44 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
return result;
}
int ReplicatedPG::_get_tmap(OpContext *ctx,
map<string, bufferlist> *out,
bufferlist *header)
{
vector<OSDOp> nops(1);
OSDOp &newop = nops[0];
newop.op.op = CEPH_OSD_OP_TMAPGET;
do_osd_ops(ctx, nops);
try {
bufferlist::iterator i = newop.outdata.begin();
::decode(*header, i);
::decode(*out, i);
} catch (...) {
dout(20) << "unsuccessful at decoding tmap for " << ctx->new_obs.oi.soid
<< dendl;
return -EINVAL;
}
dout(20) << "successful at decoding tmap for " << ctx->new_obs.oi.soid
<< dendl;
return 0;
}
int ReplicatedPG::_copy_up_tmap(OpContext *ctx)
{
dout(20) << "copying up tmap for " << ctx->new_obs.oi.soid << dendl;
ctx->new_obs.oi.uses_tmap = false;
map<string, bufferlist> vals;
bufferlist header;
int r = _get_tmap(ctx, &vals, &header);
if (r < 0)
return 0;
ctx->op_t.omap_setkeys(coll, ctx->new_obs.oi.soid,
vals);
ctx->op_t.omap_setheader(coll, ctx->new_obs.oi.soid,
header);
return 0;
}
inline int ReplicatedPG::_delete_head(OpContext *ctx)
{
SnapSet& snapset = ctx->new_snapset;
@ -4352,6 +4594,7 @@ void ReplicatedPG::submit_push_data(
bool first,
const interval_set<uint64_t> &intervals_included,
bufferlist data_included,
bufferlist omap_header,
map<string, bufferptr> &attrs,
map<string, bufferlist> &omap_entries,
ObjectStore::Transaction *t)
@ -4359,6 +4602,7 @@ void ReplicatedPG::submit_push_data(
if (first) {
t->remove(coll_t::TEMP_COLL, recovery_info.soid);
t->touch(coll_t::TEMP_COLL, recovery_info.soid);
t->omap_setheader(coll_t::TEMP_COLL, recovery_info.soid, omap_header);
}
uint64_t off = 0;
for (interval_set<uint64_t>::const_iterator p = intervals_included.begin();
@ -4529,7 +4773,9 @@ void ReplicatedPG::handle_pull_response(OpRequest *op)
Context *onreadable = 0;
Context *onreadable_sync = 0;
submit_push_data(pi.recovery_info, first,
data_included, data, m->attrset,
data_included, data,
m->omap_header,
m->attrset,
m->omap_entries,
t);
@ -4600,6 +4846,7 @@ void ReplicatedPG::handle_push(OpRequest *op)
first,
m->data_included,
data,
m->omap_header,
m->attrset,
m->omap_entries,
t);
@ -4647,6 +4894,7 @@ int ReplicatedPG::send_push(int peer,
subop->ops[0].op.op = CEPH_OSD_OP_PUSH;
if (progress.first) {
osd->store->omap_get_header(coll, recovery_info.soid, &subop->omap_header);
osd->store->getattrs(coll, recovery_info.soid, subop->attrset);
// Debug

View File

@ -580,6 +580,7 @@ protected:
bool first,
const interval_set<uint64_t> &intervals_included,
bufferlist data_included,
bufferlist omap_header,
map<string, bufferptr> &attrs,
map<string, bufferlist> &omap_entries,
ObjectStore::Transaction *t);
@ -870,6 +871,9 @@ private:
boost::statechart::result react(const SnapTrim&);
};
int _get_tmap(OpContext *ctx, map<string, bufferlist> *out,
bufferlist *header);
int _copy_up_tmap(OpContext *ctx);
int _delete_head(OpContext *ctx);
int _rollback_to(OpContext *ctx, ceph_osd_op& op);
public:

View File

@ -1931,6 +1931,7 @@ void object_info_t::copy_user_bits(const object_info_t& other)
truncate_size = other.truncate_size;
lost = other.lost;
category = other.category;
uses_tmap = other.uses_tmap;
}
ps_t object_info_t::legacy_object_locator_to_ps(const object_t &oid,
@ -1952,7 +1953,7 @@ ps_t object_info_t::legacy_object_locator_to_ps(const object_t &oid,
void object_info_t::encode(bufferlist& bl) const
{
ENCODE_START(8, 8, bl);
ENCODE_START(9, 8, bl);
::encode(soid, bl);
::encode(oloc, bl);
::encode(category, bl);
@ -1970,12 +1971,13 @@ void object_info_t::encode(bufferlist& bl) const
::encode(lost, bl);
::encode(watchers, bl);
::encode(user_version, bl);
::encode(uses_tmap, bl);
ENCODE_FINISH(bl);
}
void object_info_t::decode(bufferlist::iterator& bl)
{
DECODE_START_LEGACY_COMPAT_LEN(8, 8, 8, bl);
DECODE_START_LEGACY_COMPAT_LEN(9, 8, 8, bl);
if (struct_v >= 2 && struct_v <= 5) {
sobject_t obj;
::decode(obj, bl);
@ -2012,6 +2014,10 @@ void object_info_t::decode(bufferlist::iterator& bl)
::decode(watchers, bl);
::decode(user_version, bl);
}
if (struct_v >= 9)
::decode(uses_tmap, bl);
else
uses_tmap = true;
DECODE_FINISH(bl);
}

View File

@ -1582,6 +1582,7 @@ struct object_info_t {
map<entity_name_t, watch_info_t> watchers;
bool uses_tmap;
void copy_user_bits(const object_info_t& other);
@ -1599,12 +1600,12 @@ struct object_info_t {
explicit object_info_t()
: size(0), lost(false),
truncate_seq(0), truncate_size(0)
truncate_seq(0), truncate_size(0), uses_tmap(false)
{}
object_info_t(const hobject_t& s, const object_locator_t& o)
: soid(s), oloc(o), size(0),
lost(false), truncate_seq(0), truncate_size(0) {}
lost(false), truncate_seq(0), truncate_size(0), uses_tmap(false) {}
object_info_t(bufferlist& bl) {
decode(bl);

View File

@ -267,11 +267,31 @@ struct ObjectOperation {
out_bl[p] = pbl;
out_rval[p] = prval;
}
struct C_ObjectOperation_getxattrs : public Context {
struct C_ObjectOperation_decodevals : public Context {
bufferlist bl;
std::map<std::string,bufferlist> *pattrs;
int *prval;
C_ObjectOperation_getxattrs(std::map<std::string,bufferlist> *pa, int *pr)
C_ObjectOperation_decodevals(std::map<std::string,bufferlist> *pa, int *pr)
: pattrs(pa), prval(pr) {}
void finish(int r) {
if (r >= 0) {
bufferlist::iterator p = bl.begin();
try {
if (pattrs)
::decode(*pattrs, p);
}
catch (buffer::error& e) {
if (prval)
*prval = -EIO;
}
}
}
};
struct C_ObjectOperation_decodekeys : public Context {
bufferlist bl;
std::set<std::string> *pattrs;
int *prval;
C_ObjectOperation_decodekeys(std::set<std::string> *pa, int *pr)
: pattrs(pa), prval(pr) {}
void finish(int r) {
if (r >= 0) {
@ -291,7 +311,7 @@ struct ObjectOperation {
add_op(CEPH_OSD_OP_GETXATTRS);
if (pattrs || prval) {
unsigned p = ops.size() - 1;
C_ObjectOperation_getxattrs *h = new C_ObjectOperation_getxattrs(pattrs, prval);
C_ObjectOperation_decodevals *h = new C_ObjectOperation_decodevals(pattrs, prval);
out_handler[p] = h;
out_bl[p] = &h->bl;
out_rval[p] = prval;
@ -340,6 +360,95 @@ struct ObjectOperation {
add_op(CEPH_OSD_OP_TMAPGET);
}
// objectmap
void omap_get_keys(const string &start_after,
uint64_t max_to_get,
std::set<std::string> *out_set,
int *prval) {
OSDOp &op = add_op(CEPH_OSD_OP_OMAPGETKEYS);
bufferlist bl;
::encode(start_after, bl);
::encode(max_to_get, bl);
op.op.extent.offset = 0;
op.op.extent.length = bl.length();
op.indata.claim_append(bl);
if (prval || out_set) {
unsigned p = ops.size() - 1;
C_ObjectOperation_decodekeys *h =
new C_ObjectOperation_decodekeys(out_set, prval);
out_handler[p] = h;
out_bl[p] = &h->bl;
out_rval[p] = prval;
}
}
void omap_get_vals(const string &start_after,
uint64_t max_to_get,
std::map<std::string, bufferlist> *out_set,
int *prval) {
OSDOp &op = add_op(CEPH_OSD_OP_OMAPGETVALS);
bufferlist bl;
::encode(start_after, bl);
::encode(max_to_get, bl);
op.op.extent.offset = 0;
op.op.extent.length = bl.length();
op.indata.claim_append(bl);
if (prval || out_set) {
unsigned p = ops.size() - 1;
C_ObjectOperation_decodevals *h =
new C_ObjectOperation_decodevals(out_set, prval);
out_handler[p] = h;
out_bl[p] = &h->bl;
out_rval[p] = prval;
}
}
void omap_get_vals_by_key(const std::set<std::string> &to_get,
std::map<std::string, bufferlist> *out_set,
int *prval) {
OSDOp &op = add_op(CEPH_OSD_OP_OMAPGETVALSBYKEY);
bufferlist bl;
::encode(to_get, bl);
op.op.extent.offset = 0;
op.op.extent.length = bl.length();
op.indata.claim_append(bl);
if (prval || out_set) {
unsigned p = ops.size() - 1;
C_ObjectOperation_decodevals *h =
new C_ObjectOperation_decodevals(out_set, prval);
out_handler[p] = h;
out_bl[p] = &h->bl;
out_rval[p] = prval;
}
}
void omap_get_header(bufferlist *bl, int *prval) {
add_op(CEPH_OSD_OP_OMAPGETHEADER);
unsigned p = ops.size() - 1;
out_bl[p] = bl;
out_rval[p] = prval;
}
void omap_set(const map<string, bufferlist> &map) {
bufferlist bl;
::encode(map, bl);
add_data(CEPH_OSD_OP_OMAPSETVALS, 0, bl.length(), bl);
}
void omap_set_header(bufferlist &bl) {
add_data(CEPH_OSD_OP_OMAPSETHEADER, 0, bl.length(), bl);
}
void omap_clear() {
add_op(CEPH_OSD_OP_OMAPCLEAR);
}
void omap_rm_keys(const std::set<std::string> &to_remove) {
bufferlist bl;
::encode(to_remove, bl);
add_data(CEPH_OSD_OP_OMAPRMKEYS, 0, bl.length(), bl);
}
// object classes
void call(const char *cname, const char *method, bufferlist &indata) {
add_call(CEPH_OSD_OP_CALL, cname, method, indata);

View File

@ -95,6 +95,10 @@ public:
virtual uint64_t get_length(const ContDesc &in) = 0;
virtual uint64_t get_attr_length(const ContDesc &in) = 0;
virtual bufferlist gen_attribute(const ContDesc &in) = 0;
virtual void get_ranges(const ContDesc &in, interval_set<uint64_t> &ranges) = 0;
virtual iterator_impl *get_iterator_impl(const ContDesc &in) = 0;
@ -147,11 +151,7 @@ public:
virtual uint64_t get_pos() const { return pos; }
iterator_impl &operator++() {
assert(!end());
pos++;
if (end()) {
return *this;
}
if (header_pos.end()) {
current = rand();
} else {
@ -207,22 +207,42 @@ public:
return (rand() % length) + get_header_length(in);
}
bufferlist gen_attribute(const ContDesc &in) {
bufferlist header;
write_header(in, header);
ContentsGenerator::iterator iter = get_iterator(in);
for (uint64_t to_write = get_attr_length(in); to_write > 0;
--to_write) {
header.append(*iter);
++iter;
}
return header;
}
uint64_t get_attr_length(const ContDesc &in) {
RandWrap rand(in.seqnum);
return (rand() % attr_length) + get_header_length(in);
}
void write_header(const ContDesc &in, bufferlist &output);
bool read_header(bufferlist::iterator &p, ContDesc &out);
uint64_t length;
uint64_t attr_length;
uint64_t min_stride_size;
uint64_t max_stride_size;
VarLenGenerator(uint64_t length, uint64_t min_stride_size, uint64_t max_stride_size) :
length(length), min_stride_size(min_stride_size), max_stride_size(max_stride_size) {}
VarLenGenerator(uint64_t length, uint64_t min_stride_size, uint64_t max_stride_size, uint64_t attr_length = 2000) :
length(length), attr_length(attr_length),
min_stride_size(min_stride_size), max_stride_size(max_stride_size) {}
};
class ObjectDesc {
public:
ObjectDesc(ContentsGenerator *cont_gen) :
layers(), cont_gen(cont_gen) {};
exists(false), tmap(false), layers(), cont_gen(cont_gen) {};
ObjectDesc(const ContDesc &init, ContentsGenerator *cont_gen) :
layers(), cont_gen(cont_gen) {
exists(false), tmap(false), layers(), cont_gen(cont_gen) {
layers.push_front(init);
};
@ -249,7 +269,9 @@ public:
}
char operator*() {
if (cur_cont == obj.layers.end()) {
if (cur_cont == obj.layers.end() && pos < obj.tmap_contents.length()) {
return obj.tmap_contents[pos];
} else if (cur_cont == obj.layers.end()) {
return '\0';
} else {
map<ContDesc,ContentsGenerator::iterator>::iterator j = cont_iters.find(*cur_cont);
@ -259,7 +281,7 @@ public:
}
bool end() {
return pos == cont_gen->get_length(*obj.layers.begin());
return pos >= cont_gen->get_length(*obj.layers.begin());
}
void seek(uint64_t _pos) {
@ -277,12 +299,21 @@ public:
}
bool deleted() {
return !layers.size(); // No layers indicates missing object
return !exists;
}
bool has_contents() {
return layers.size();
}
void update(const ContDesc &next);
bool check(bufferlist &to_check);
const ContDesc &most_recent();
map<string, ContDesc> attrs; // Both omap and xattrs
bufferlist header;
bool exists;
bool tmap;
bufferlist tmap_contents;
private:
list<ContDesc> layers;
ContentsGenerator *cont_gen;

View File

@ -15,6 +15,7 @@
#include <time.h>
#include "Object.h"
#include "TestOpStat.h"
#include "inttypes.h"
#ifndef RADOSMODEL_H
#define RADOSMODEL_H
@ -42,7 +43,10 @@ enum TestOpType {
TEST_OP_DELETE,
TEST_OP_SNAP_CREATE,
TEST_OP_SNAP_REMOVE,
TEST_OP_ROLLBACK
TEST_OP_ROLLBACK,
TEST_OP_SETATTR,
TEST_OP_RMATTR,
TEST_OP_TMAPPUT
};
class TestOp {
@ -221,6 +225,87 @@ public:
wait_cond.Signal();
}
void rm_object_attrs(const string &oid, const set<string> &attrs)
{
ObjectDesc new_obj(&cont_gen);
for (map<int, map<string,ObjectDesc> >::reverse_iterator i =
pool_obj_cont.rbegin();
i != pool_obj_cont.rend();
++i) {
map<string,ObjectDesc>::iterator j = i->second.find(oid);
if (j != i->second.end()) {
new_obj = j->second;
break;
}
}
for (set<string>::const_iterator i = attrs.begin();
i != attrs.end();
++i) {
new_obj.attrs.erase(*i);
}
new_obj.tmap = false;
pool_obj_cont[current_snap].erase(oid);
pool_obj_cont[current_snap].insert(pair<string,ObjectDesc>(oid, new_obj));
}
void update_object_header(const string &oid, const bufferlist &bl)
{
ObjectDesc new_obj(&cont_gen);
for (map<int, map<string,ObjectDesc> >::reverse_iterator i =
pool_obj_cont.rbegin();
i != pool_obj_cont.rend();
++i) {
map<string,ObjectDesc>::iterator j = i->second.find(oid);
if (j != i->second.end()) {
new_obj = j->second;
break;
}
}
new_obj.header = bl;
new_obj.tmap = false;
new_obj.exists = true;
pool_obj_cont[current_snap].erase(oid);
pool_obj_cont[current_snap].insert(pair<string,ObjectDesc>(oid, new_obj));
}
void set_object_tmap(const string &oid, const map<string, ContDesc> &attrs,
bufferlist header,
bufferlist tmap_contents)
{
ObjectDesc new_obj(&cont_gen);
pool_obj_cont[current_snap].erase(oid);
new_obj.attrs = attrs;
new_obj.tmap_contents = tmap_contents;
new_obj.header = header;
new_obj.tmap = true;
new_obj.exists = true;
pool_obj_cont[current_snap].insert(pair<string,ObjectDesc>(oid, new_obj));
}
void update_object_attrs(const string &oid, const map<string, ContDesc> &attrs)
{
ObjectDesc new_obj(&cont_gen);
for (map<int, map<string,ObjectDesc> >::reverse_iterator i =
pool_obj_cont.rbegin();
i != pool_obj_cont.rend();
++i) {
map<string,ObjectDesc>::iterator j = i->second.find(oid);
if (j != i->second.end()) {
new_obj = j->second;
break;
}
}
for (map<string, ContDesc>::const_iterator i = attrs.begin();
i != attrs.end();
++i) {
new_obj.attrs[i->first] = i->second;
}
new_obj.tmap = false;
new_obj.exists = true;
pool_obj_cont[current_snap].erase(oid);
pool_obj_cont[current_snap].insert(pair<string,ObjectDesc>(oid, new_obj));
}
void update_object(const string &oid, const ContDesc &contents)
{
ObjectDesc new_obj(&cont_gen);
@ -234,6 +319,12 @@ public:
break;
}
}
new_obj.exists = true;
if (new_obj.tmap) {
new_obj.tmap = false;
new_obj.attrs.clear();
new_obj.header = bufferlist();
}
new_obj.update(contents);
pool_obj_cont[current_snap].erase(oid);
pool_obj_cont[current_snap].insert(pair<string,ObjectDesc>(oid, new_obj));
@ -300,6 +391,274 @@ public:
void read_callback(librados::completion_t comp, void *arg);
void write_callback(librados::completion_t comp, void *arg);
class RemoveAttrsOp : public TestOp {
public:
string oid;
librados::ObjectWriteOperation op;
librados::AioCompletion *comp;
bool done;
RemoveAttrsOp(RadosTestContext *context,
const string &oid,
TestOpStat *stat) :
TestOp(context, stat), oid(oid), done(false)
{}
void _begin()
{
ContDesc cont;
set<string> to_remove;
{
Mutex::Locker l(context->state_lock);
ObjectDesc obj(&context->cont_gen);
if (!context->find_object(oid, &obj)) {
context->kick();
done = true;
return;
}
cont = ContDesc(context->seq_num, context->current_snap,
context->seq_num, "");
context->oid_in_use.insert(oid);
context->oid_not_in_use.erase(oid);
if (rand() % 30) {
ContentsGenerator::iterator iter = context->cont_gen.get_iterator(cont);
for (map<string, ContDesc>::iterator i = obj.attrs.begin();
i != obj.attrs.end();
++i, ++iter) {
if (!(*iter % 3)) {
//op.rmxattr(i->first.c_str());
to_remove.insert(i->first);
}
}
if (!to_remove.size()) {
context->kick();
context->oid_in_use.erase(oid);
context->oid_not_in_use.insert(oid);
done = true;
return;
}
op.omap_rm_keys(to_remove);
} else {
op.omap_clear();
for (map<string, ContDesc>::iterator i = obj.attrs.begin();
i != obj.attrs.end();
++i) {
to_remove.insert(i->first);
}
context->update_object_header(oid, bufferlist());
}
context->rm_object_attrs(oid, to_remove);
}
pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
new pair<TestOp*, TestOp::CallbackInfo*>(this,
new TestOp::CallbackInfo(0));
comp = context->rados.aio_create_completion((void*) cb_arg, &write_callback,
NULL);
context->io_ctx.aio_operate(context->prefix+oid, comp, &op);
}
void _finish(CallbackInfo *info)
{
Mutex::Locker l(context->state_lock);
done = true;
context->oid_in_use.erase(oid);
context->oid_not_in_use.insert(oid);
context->kick();
}
bool finished()
{
return done;
}
string getType()
{
return "RemoveAttrsOp";
}
};
class TmapPutOp : public TestOp {
public:
string oid;
librados::ObjectWriteOperation op;
librados::AioCompletion *comp;
bool done;
TmapPutOp(RadosTestContext *context,
const string &oid,
TestOpStat *stat) :
TestOp(context, stat), oid(oid), done(false)
{}
void _begin()
{
ContDesc cont;
stringstream acc;
{
Mutex::Locker l(context->state_lock);
cont = ContDesc(context->seq_num, context->current_snap,
context->seq_num, "");
context->oid_in_use.insert(oid);
context->oid_not_in_use.erase(oid);
}
map<string, bufferlist> omap_contents;
map<string, ContDesc> omap;
bufferlist header;
ContentsGenerator::iterator keygen = context->cont_gen.get_iterator(cont);
while (!*keygen) ++keygen;
while (*keygen) {
header.append(*keygen);
++keygen;
}
for (int i = 0; i < 20; ++i) {
string key;
while (!*keygen) ++keygen;
while (*keygen && key.size() < 40) {
key.push_back((*keygen % 20) + 'a');
++keygen;
}
ContDesc val(cont);
val.seqnum += context->cont_gen.get_length(cont);
val.prefix = ("oid: " + oid);
omap[key] = val;
bufferlist val_buffer = context->cont_gen.gen_attribute(val);
omap_contents[key] = val_buffer;
}
bufferlist tmap_contents;
::encode(header, tmap_contents);
::encode(omap_contents, tmap_contents);
op.remove();
op.tmap_put(tmap_contents);
{
Mutex::Locker l(context->state_lock);
context->set_object_tmap(oid, omap, header,
tmap_contents);
}
pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
new pair<TestOp*, TestOp::CallbackInfo*>(this,
new TestOp::CallbackInfo(0));
comp = context->rados.aio_create_completion((void*) cb_arg, &write_callback,
NULL);
context->io_ctx.aio_operate(context->prefix+oid, comp, &op);
}
void _finish(CallbackInfo *info)
{
Mutex::Locker l(context->state_lock);
int r;
if ((r = comp->get_return_value())) {
cerr << "err " << r << std::endl;
assert(0);
}
done = true;
context->oid_in_use.erase(oid);
context->oid_not_in_use.insert(oid);
context->kick();
}
bool finished()
{
return done;
}
string getType()
{
return "TmapPutOp";
}
};
class SetAttrsOp : public TestOp {
public:
string oid;
librados::ObjectWriteOperation op;
librados::AioCompletion *comp;
bool done;
SetAttrsOp(RadosTestContext *context,
const string &oid,
TestOpStat *stat) :
TestOp(context, stat), oid(oid), done(false)
{}
void _begin()
{
ContDesc cont;
stringstream acc;
{
Mutex::Locker l(context->state_lock);
cont = ContDesc(context->seq_num, context->current_snap,
context->seq_num, "");
context->oid_in_use.insert(oid);
context->oid_not_in_use.erase(oid);
}
map<string, bufferlist> omap_contents;
map<string, ContDesc> omap;
bufferlist header;
ContentsGenerator::iterator keygen = context->cont_gen.get_iterator(cont);
while (!*keygen) ++keygen;
while (*keygen) {
header.append(*keygen);
++keygen;
}
for (int i = 0; i < 20; ++i) {
string key;
while (!*keygen) ++keygen;
while (*keygen && key.size() < 40) {
key.push_back((*keygen % 20) + 'a');
++keygen;
}
ContDesc val(cont);
val.seqnum += context->cont_gen.get_length(cont);
val.prefix = ("oid: " + oid);
omap[key] = val;
bufferlist val_buffer = context->cont_gen.gen_attribute(val);
omap_contents[key] = val_buffer;
}
op.omap_set_header(header);
op.omap_set(omap_contents);
{
Mutex::Locker l(context->state_lock);
context->update_object_header(oid, header);
context->update_object_attrs(oid, omap);
}
pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
new pair<TestOp*, TestOp::CallbackInfo*>(this,
new TestOp::CallbackInfo(0));
comp = context->rados.aio_create_completion((void*) cb_arg, &write_callback,
NULL);
context->io_ctx.aio_operate(context->prefix+oid, comp, &op);
}
void _finish(CallbackInfo *info)
{
Mutex::Locker l(context->state_lock);
int r;
if ((r = comp->get_return_value())) {
cerr << "err " << r << std::endl;
assert(0);
}
done = true;
context->oid_in_use.erase(oid);
context->oid_not_in_use.insert(oid);
context->kick();
}
bool finished()
{
return done;
}
string getType()
{
return "SetAttrsOp";
}
};
class WriteOp : public TestOp {
public:
string oid;
@ -328,9 +687,7 @@ public:
context->update_object(oid, cont);
context->oid_in_use.insert(oid);
if (context->oid_not_in_use.count(oid) != 0) {
context->oid_not_in_use.erase(oid);
}
context->oid_not_in_use.erase(oid);
context->seq_num++;
@ -451,9 +808,7 @@ public:
}
context->oid_in_use.insert(oid);
if (context->oid_not_in_use.count(oid) != 0) {
context->oid_not_in_use.erase(oid);
}
context->oid_not_in_use.erase(oid);
context->seq_num++;
context->remove_object(oid);
@ -483,6 +838,7 @@ public:
context->state_lock.Lock();
context->oid_in_use.erase(oid);
context->oid_not_in_use.insert(oid);
context->kick();
context->state_lock.Unlock();
}
@ -495,16 +851,30 @@ public:
class ReadOp : public TestOp {
public:
librados::AioCompletion *completion;
librados::ObjectReadOperation op;
string oid;
bufferlist result;
ObjectDesc old_value;
int snap;
bufferlist result;
int retval;
map<string, bufferlist> attrs;
int attrretval;
set<string> omap_requested_keys;
map<string, bufferlist> omap_returned_values;
set<string> omap_keys;
map<string, bufferlist> omap;
bufferlist header;
ReadOp(RadosTestContext *context,
const string &oid,
TestOpStat *stat = 0) :
TestOp(context, stat),
oid(oid),
old_value(&context->cont_gen)
old_value(&context->cont_gen),
retval(0),
attrretval(0)
{}
void _begin()
@ -526,9 +896,31 @@ public:
if (snap >= 0) {
context->io_ctx.snap_set_read(context->snaps[snap]);
}
context->io_ctx.aio_read(context->prefix+oid, completion,
&result,
old_value.deleted() ? 0 : context->cont_gen.get_length(old_value.most_recent()), 0);
if (!old_value.tmap) {
op.read(0,
!old_value.has_contents() ? 0 :
context->cont_gen.get_length(old_value.most_recent()),
&result,
&retval);
}
for (map<string, ContDesc>::iterator i = old_value.attrs.begin();
i != old_value.attrs.end();
++i) {
if (rand() % 2) {
string key = i->first;
if (rand() % 2)
key.push_back((rand() % 26) + 'a');
omap_requested_keys.insert(key);
}
}
op.omap_get_vals_by_key(omap_requested_keys, &omap_returned_values, 0);
op.omap_get_keys("", -1, &omap_keys, 0);
op.omap_get_vals("", -1, &omap, 0);
op.omap_get_header(&header, 0);
assert(!context->io_ctx.aio_operate(context->prefix+oid, completion, &op, 0));
if (snap >= 0) {
context->io_ctx.snap_set_read(0);
}
@ -540,6 +932,7 @@ public:
assert(!done);
context->oid_in_use.erase(oid);
context->oid_not_in_use.insert(oid);
assert(completion->is_complete());
if (int err = completion->get_return_value()) {
if (!(err == -ENOENT && old_value.deleted())) {
cerr << "Error: oid " << oid << " read returned error code "
@ -547,22 +940,72 @@ public:
}
} else {
assert(!old_value.deleted());
ContDesc to_check;
bufferlist::iterator p = result.begin();
if (!context->cont_gen.read_header(p, to_check)) {
cerr << "Unable to decode oid " << oid << " at snap " << context->current_snap << std::endl;
context->errors++;
if (old_value.has_contents()) {
ContDesc to_check;
bufferlist::iterator p = result.begin();
if (!context->cont_gen.read_header(p, to_check)) {
cerr << "Unable to decode oid " << oid << " at snap " << context->current_snap << std::endl;
context->errors++;
}
if (to_check != old_value.most_recent()) {
cerr << "Found incorrect object contents " << to_check
<< ", expected " << old_value.most_recent() << " oid " << oid << std::endl;
context->errors++;
}
if (!old_value.check(result)) {
cerr << "Object " << oid << " contents " << to_check << " corrupt" << std::endl;
context->errors++;
}
if (context->errors) assert(0);
}
if (to_check != old_value.most_recent()) {
cerr << "Found incorrect object contents " << to_check
<< ", expected " << old_value.most_recent() << " oid " << oid << std::endl;
context->errors++;
// Attributes
if (!(old_value.header == header)) {
cerr << "oid: " << oid << " header does not match, old size: "
<< old_value.header.length() << " new size " << header.length()
<< std::endl;
assert(old_value.header == header);
}
if (!old_value.check(result)) {
cerr << "Object " << oid << " contents " << to_check << " corrupt" << std::endl;
context->errors++;
if (omap.size() != old_value.attrs.size()) {
cerr << "oid: " << oid << " tmap.size() is " << omap.size()
<< " and old is " << old_value.attrs.size() << std::endl;
assert(omap.size() == old_value.attrs.size());
}
if (omap_keys.size() != old_value.attrs.size()) {
cerr << "oid: " << oid << " tmap.size() is " << omap_keys.size()
<< " and old is " << old_value.attrs.size() << std::endl;
assert(omap_keys.size() == old_value.attrs.size());
}
for (map<string, bufferlist>::iterator omap_iter = omap.begin();
omap_iter != omap.end();
++omap_iter) {
assert(old_value.attrs.count(omap_iter->first));
bufferlist bl = context->cont_gen.gen_attribute(
old_value.attrs[omap_iter->first]);
assert(bl.length() == omap_iter->second.length());
bufferlist::iterator k = bl.begin();
for(bufferlist::iterator l = omap_iter->second.begin();
!k.end() && !l.end();
++k, ++l) {
assert(*l == *k);
}
}
for (set<string>::iterator i = omap_requested_keys.begin();
i != omap_requested_keys.end();
++i) {
if (!omap_returned_values.count(*i))
assert(!old_value.attrs.count(*i));
if (!old_value.attrs.count(*i))
assert(!omap_returned_values.count(*i));
}
for (map<string, bufferlist>::iterator i = omap_returned_values.begin();
i != omap_returned_values.end();
++i) {
assert(omap_requested_keys.count(i->first));
assert(omap.count(i->first));
assert(old_value.attrs.count(i->first));
assert(i->second == omap[i->first]);
}
if (context->errors) assert(0);
}
context->kick();
done = true;
@ -594,7 +1037,21 @@ public:
context->state_lock.Lock();
context->add_snap(snap);
vector<uint64_t> snapset(context->snaps.size());
int j = 0;
for (map<int,uint64_t>::reverse_iterator i = context->snaps.rbegin();
i != context->snaps.rend();
++i, ++j) {
snapset[j] = i->second;
}
context->state_lock.Unlock();
int r = context->io_ctx.selfmanaged_snap_set_write_ctx(context->seq, snapset);
if (r) {
cerr << "r is " << r << " snapset is " << snapset << " seq is " << context->seq << std::endl;
assert(0);
}
}
string getType()
@ -621,6 +1078,20 @@ public:
context->state_lock.Unlock();
assert(!context->io_ctx.selfmanaged_snap_remove(snap));
vector<uint64_t> snapset(context->snaps.size());
int j = 0;
for (map<int,uint64_t>::reverse_iterator i = context->snaps.rbegin();
i != context->snaps.rend();
++i, ++j) {
snapset[j] = i->second;
}
int r = context->io_ctx.selfmanaged_snap_set_write_ctx(context->seq, snapset);
if (r) {
cerr << "r is " << r << " snapset is " << snapset << " seq is " << context->seq << std::endl;
assert(0);
}
}
string getType()
@ -646,6 +1117,7 @@ public:
{
context->state_lock.Lock();
context->oid_in_use.insert(oid);
context->oid_not_in_use.erase(oid);
context->roll_back(oid, roll_back_to);
uint64_t snap = context->snaps[roll_back_to];
@ -666,6 +1138,12 @@ public:
cerr << "r is " << r << std::endl;
assert(0);
}
{
Mutex::Locker l(context->state_lock);
context->oid_in_use.erase(oid);
context->oid_not_in_use.insert(oid);
}
}
string getType()

View File

@ -83,6 +83,8 @@ private:
TestOp *gen_op(RadosTestContext &context, TestOpType type)
{
string oid;
cout << "oids not in use " << context.oid_not_in_use.size() << std::endl;
assert(context.oid_not_in_use.size());
switch (type) {
case TEST_OP_READ:
oid = *(rand_choose(context.oid_not_in_use));
@ -121,10 +123,27 @@ private:
int snap = rand_choose(context.snaps)->first;
string oid = *(rand_choose(context.oid_not_in_use));
cout << "RollingBack " << oid << " to " << snap << std::endl;
m_nextop = new ReadOp(&context, oid, m_stats);
return new RollbackOp(&context, oid, snap);
}
case TEST_OP_SETATTR:
oid = *(rand_choose(context.oid_not_in_use));
cout << "Setting attrs on " << oid
<< " current snap is " << context.current_snap << std::endl;
return new SetAttrsOp(&context, oid, m_stats);
case TEST_OP_RMATTR:
oid = *(rand_choose(context.oid_not_in_use));
cout << "Removing attrs on " << oid
<< " current snap is " << context.current_snap << std::endl;
return new RemoveAttrsOp(&context, oid, m_stats);
case TEST_OP_TMAPPUT:
oid = *(rand_choose(context.oid_not_in_use));
cout << "Setting tmap on " << oid
<< " current snap is " << context.current_snap << std::endl;
return new TmapPutOp(&context, oid, m_stats);
default:
cerr << "Invalid op type " << type << std::endl;
assert(0);
@ -161,6 +180,9 @@ int main(int argc, char **argv)
{ TEST_OP_SNAP_CREATE, "snap_create" },
{ TEST_OP_SNAP_REMOVE, "snap_remove" },
{ TEST_OP_ROLLBACK, "rollback" },
{ TEST_OP_SETATTR, "setattr" },
{ TEST_OP_RMATTR, "rmattr" },
{ TEST_OP_TMAPPUT, "tmapput" },
{ TEST_OP_READ /* grr */, NULL },
};

View File

@ -7,6 +7,7 @@
#include <semaphore.h>
#include <sstream>
#include <string>
#include <boost/scoped_ptr.hpp>
using std::ostringstream;
using namespace librados;
@ -691,3 +692,129 @@ TEST(LibRadosAio, RoundTripWriteFullPP) {
delete my_completion2;
delete my_completion3;
}
using std::string;
using std::map;
using std::set;
TEST(LibRadosAio, OmapPP) {
Rados cluster;
std::string pool_name = get_temp_pool_name();
ASSERT_EQ("", create_one_pool_pp(pool_name, cluster));
IoCtx ioctx;
cluster.ioctx_create(pool_name.c_str(), ioctx);
string header_str = "baz";
bufferptr bp(header_str.c_str(), header_str.size() + 1);
bufferlist header_to_set;
header_to_set.push_back(bp);
map<string, bufferlist> to_set;
{
boost::scoped_ptr<AioCompletion> my_completion(cluster.aio_create_completion(0, 0, 0));
ObjectWriteOperation op;
string val = "bar";
to_set["foo"] = header_to_set;
to_set["foo2"] = header_to_set;
to_set["foo3"] = header_to_set;
op.omap_set(to_set);
op.omap_set_header(header_to_set);
ioctx.aio_operate("test_obj", my_completion.get(), &op);
{
TestAlarm alarm;
ASSERT_EQ(0, my_completion->wait_for_complete());
}
}
{
boost::scoped_ptr<AioCompletion> my_completion(cluster.aio_create_completion(0, 0, 0));
ObjectReadOperation op;
set<string> set_got;
map<string, bufferlist> map_got;
set<string> to_get;
map<string, bufferlist> got3;
bufferlist header;
op.omap_get_keys("", 1, &set_got, 0);
op.omap_get_vals("foo", 1, &map_got, 0);
to_get.insert("foo");
to_get.insert("foo3");
op.omap_get_vals_by_key(to_get, &got3, 0);
op.omap_get_header(&header, 0);
ioctx.aio_operate("test_obj", my_completion.get(), &op, 0);
{
TestAlarm alarm;
ASSERT_EQ(0, my_completion->wait_for_complete());
}
ASSERT_EQ(header.length(), header_to_set.length());
ASSERT_EQ(set_got.size(), (unsigned)1);
ASSERT_EQ(*set_got.begin(), "foo");
ASSERT_EQ(map_got.size(), (unsigned)1);
ASSERT_EQ(map_got.begin()->first, "foo2");
ASSERT_EQ(got3.size(), (unsigned)2);
ASSERT_EQ(got3.begin()->first, "foo");
ASSERT_EQ(got3.rbegin()->first, "foo3");
}
{
boost::scoped_ptr<AioCompletion> my_completion(cluster.aio_create_completion(0, 0, 0));
ObjectWriteOperation op;
set<string> to_remove;
to_remove.insert("foo2");
op.omap_rm_keys(to_remove);
ioctx.aio_operate("test_obj", my_completion.get(), &op);
{
TestAlarm alarm;
ASSERT_EQ(0, my_completion->wait_for_complete());
}
}
{
boost::scoped_ptr<AioCompletion> my_completion(cluster.aio_create_completion(0, 0, 0));
ObjectReadOperation op;
set<string> set_got;
op.omap_get_keys("", -1, &set_got, 0);
ioctx.aio_operate("test_obj", my_completion.get(), &op, 0);
{
TestAlarm alarm;
ASSERT_EQ(0, my_completion->wait_for_complete());
}
ASSERT_EQ(set_got.size(), (unsigned)2);
}
{
boost::scoped_ptr<AioCompletion> my_completion(cluster.aio_create_completion(0, 0, 0));
ObjectWriteOperation op;
op.omap_clear();
ioctx.aio_operate("test_obj", my_completion.get(), &op);
{
TestAlarm alarm;
ASSERT_EQ(0, my_completion->wait_for_complete());
}
}
{
boost::scoped_ptr<AioCompletion> my_completion(cluster.aio_create_completion(0, 0, 0));
ObjectReadOperation op;
set<string> set_got;
op.omap_get_keys("", -1, &set_got, 0);
ioctx.aio_operate("test_obj", my_completion.get(), &op, 0);
{
TestAlarm alarm;
ASSERT_EQ(0, my_completion->wait_for_complete());
}
ASSERT_EQ(set_got.size(), (unsigned)0);
}
ioctx.remove("test_obj");
}