1
0
mirror of https://github.com/ceph/ceph synced 2025-01-24 03:53:54 +00:00

osd: add cmpxattr op handling

This commit is contained in:
Yehuda Sadeh 2010-05-10 16:18:52 -07:00
parent 4d667dde83
commit f857a2e138
4 changed files with 107 additions and 0 deletions

View File

@ -44,6 +44,7 @@ const char *ceph_osd_op_name(int op)
case CEPH_OSD_OP_SETXATTRS: return "setxattrs";
case CEPH_OSD_OP_RESETXATTRS: return "resetxattrs";
case CEPH_OSD_OP_RMXATTR: return "rmxattr";
case CEPH_OSD_OP_CMPXATTR: return "cmpxattr";
case CEPH_OSD_OP_PULL: return "pull";
case CEPH_OSD_OP_PUSH: return "push";

View File

@ -307,6 +307,7 @@ enum {
/* xattr comparison */
enum {
CEPH_OSD_CMPXATTR_OP_NOP = 0,
CEPH_OSD_CMPXATTR_OP_EQ = 1,
CEPH_OSD_CMPXATTR_OP_NE = 2,
CEPH_OSD_CMPXATTR_OP_GT = 3,

View File

@ -871,6 +871,59 @@ bool ReplicatedPG::snap_trimmer()
return true;
}
int ReplicatedPG::do_xattr_cmp_u64(int op, __u64 v1, bufferlist& xattr)
{
__u64 v2;
if (xattr.length())
v2 = atoll(xattr.c_str());
else
v2 = 0;
switch (op) {
case CEPH_OSD_CMPXATTR_OP_EQ:
return (v1 == v2);
case CEPH_OSD_CMPXATTR_OP_NE:
return (v1 != v2);
case CEPH_OSD_CMPXATTR_OP_GT:
return (v1 > v2);
case CEPH_OSD_CMPXATTR_OP_GTE:
return (v1 >= v2);
case CEPH_OSD_CMPXATTR_OP_LT:
return (v1 < v2);
case CEPH_OSD_CMPXATTR_OP_LTE:
return (v1 <= v2);
}
assert(0);
}
int ReplicatedPG::do_xattr_cmp_str(int op, nstring& v1s, bufferlist& xattr)
{
const char *v1, *v2;
v1 = v1s.data();
if (xattr.length())
v2 = xattr.c_str();
else
v2 = "";
switch (op) {
case CEPH_OSD_CMPXATTR_OP_EQ:
return (strcmp(v1, v2) == 0);
case CEPH_OSD_CMPXATTR_OP_NE:
return (strcmp(v1, v2) != 0);
case CEPH_OSD_CMPXATTR_OP_GT:
return (strcmp(v1, v2) > 0);
case CEPH_OSD_CMPXATTR_OP_GTE:
return (strcmp(v1, v2) >= 0);
case CEPH_OSD_CMPXATTR_OP_LT:
return (strcmp(v1, v2) < 0);
case CEPH_OSD_CMPXATTR_OP_LTE:
return (strcmp(v1, v2) <= 0);
}
assert(0);
}
// ========================================================================
// low level osd ops
@ -897,6 +950,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops,
// modify?
bool is_modify;
string cname, mname;
dout(0) << "osd_op.data.length=" << osd_op.data.length() << dendl;
bufferlist::iterator bp = osd_op.data.begin();
switch (op.op) {
case CEPH_OSD_OP_CALL:
@ -929,6 +983,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops,
case CEPH_OSD_OP_READ:
{
dout(0) << "CEPH_OSD_OP_READ" << dendl;
// read into a buffer
bufferlist bl;
int r = osd->store->read(coll_t::build_pg_coll(info.pgid), soid, op.extent.offset, op.extent.length, bl);
@ -1045,6 +1100,54 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops,
}
break;
case CEPH_OSD_OP_CMPXATTR:
{
dout(0) << "CEPH_OSD_OP_CMPXATTR" << dendl;
nstring name(op.xattr.name_len + 1);
nstring val(op.xattr.value_len + 1);
__u64 u64val;
name[0] = '_';
bp.copy(op.xattr.name_len, name.data()+1);
bufferlist xattr;
int result = osd->store->getattr(coll_t::build_pg_coll(info.pgid), soid, name.c_str(), xattr);
if (result < 0 && result != -EEXIST && result !=-ENODATA)
break;
result = 0;
switch (op.xattr.cmp_mode) {
case CEPH_OSD_CMPXATTR_MODE_STRING:
{
bp.copy(op.xattr.value_len, val.data());
val[op.xattr.value_len] = 0; /* just making sure that we're null terminated */
dout(0) << "CEPH_OSD_OP_CMPXATTR name=" << name << " val=" << val << " op=" << (int)op.xattr.cmp_op << " mode=" << (int)op.xattr.cmp_mode << dendl;
__u32 len;
int r = do_xattr_cmp_str(op.xattr.cmp_op, val, xattr);
op.xattr.value_len = len;
if (!r)
result = -ECANCELED;
}
break;
case CEPH_OSD_CMPXATTR_MODE_U64:
::decode(u64val, bp);
dout(0) << "CEPH_OSD_OP_CMPXATTR name=" << name << " val=" << u64val << " op=" << (int)op.xattr.cmp_op << " mode=" << (int)op.xattr.cmp_mode << dendl;
int r = do_xattr_cmp_u64(op.xattr.cmp_op, u64val, xattr);
if (!r) {
dout(0) << "comparison returned false" << dendl;
result = -ECANCELED;
} else {
dout(0) << "comparison returned true" << dendl;
}
break;
}
if (result)
break;
info.stats.num_rd++;
}
break;
// --- WRITES ---

View File

@ -565,6 +565,8 @@ protected:
void apply_and_flush_repops(bool requeue);
void calc_trim_to();
int do_xattr_cmp_u64(int op, __u64 v1, bufferlist& xattr);
int do_xattr_cmp_str(int op, nstring& v1s, bufferlist& xattr);
public:
ReplicatedPG(OSD *o, PGPool *_pool, pg_t p, const sobject_t& oid) :