librados: add cmpext API

The compare-extent (cmpext) operation allows callers to compare existing
object contents with an arbitrary buffer. cmpext requests can be
compounded with read and write operations, allowing for atomic object
content updates. The operation returns 0 on success, a negative error
code on failure, or (-MAX_ERRNO - mismatch_off) on mismatch.

This commit is based on Mike Christie's initial C++ API, with the
addition of AIO support and a C API. Response marshalling was also
reworked, so that the miscompare offset is unmarshalled transparently to
the caller.

Signed-off-by: Zhengyong Wang <wangzhengyong@cmss.chinamobile.com>
Signed-off-by: David Disseldorp <ddiss@suse.de>
This commit is contained in:
wangzhengyong 2017-04-21 17:01:11 +08:00
parent 351f78d7b9
commit 0ccebc5c9b
7 changed files with 377 additions and 4 deletions

View File

@ -317,7 +317,7 @@ struct rados_cluster_stat_t {
* - Creating objects: rados_write_op_create()
* - IO on objects: rados_write_op_append(), rados_write_op_write(), rados_write_op_zero
* rados_write_op_write_full(), rados_write_op_writesame(), rados_write_op_remove,
* rados_write_op_truncate(), rados_write_op_zero()
* rados_write_op_truncate(), rados_write_op_zero(), rados_write_op_cmpext()
* - Hints: rados_write_op_set_alloc_hint()
* - Performing the operation: rados_write_op_operate(), rados_aio_write_op_operate()
*/
@ -336,7 +336,8 @@ typedef void *rados_write_op_t;
* rados_read_op_omap_cmp()
* - Object properties: rados_read_op_stat(), rados_read_op_assert_exists(),
* rados_read_op_assert_version()
* - IO on objects: rados_read_op_read(), rados_read_op_checksum()
* - IO on objects: rados_read_op_read(), rados_read_op_checksum(),
* rados_read_op_cmpext()
* - Custom operations: rados_read_op_exec(), rados_read_op_exec_user_buf()
* - Request properties: rados_read_op_set_flags()
* - Performing the operation: rados_read_op_operate(),
@ -1519,6 +1520,21 @@ CEPH_RADOS_API int rados_remove(rados_ioctx_t io, const char *oid);
CEPH_RADOS_API int rados_trunc(rados_ioctx_t io, const char *oid,
uint64_t size);
/**
* Compare an on-disk object range with a buffer
*
* The first @p cmp_len bytes of @p cmp_buf are compared with the object
* contents starting at byte offset @p off.
*
* @param io the context in which to perform the comparison
* @param o name of the object
* @param cmp_buf buffer containing bytes to be compared with object contents
* @param cmp_len length to compare and size of @p cmp_buf in bytes
* @param off object byte offset at which to start the comparison
* @returns 0 on success, negative error code on failure (-E2BIG when the
* compare buffer is longer than UINT_MAX/2 bytes),
* (-MAX_ERRNO - mismatch_off) on mismatch
*/
CEPH_RADOS_API int rados_cmpext(rados_ioctx_t io, const char *o,
const char *cmp_buf, size_t cmp_len,
uint64_t off);
/**
* @name Xattrs
* Extended attributes are stored as extended attributes on the files
@ -2110,6 +2126,24 @@ CEPH_RADOS_API int rados_aio_stat(rados_ioctx_t io, const char *o,
rados_completion_t completion,
uint64_t *psize, time_t *pmtime);
/**
* Asynchronously compare an on-disk object range with a buffer
*
* @param io the context in which to perform the comparison
* @param o the name of the object to compare with
* @param completion what to do when the comparison is complete
* @param cmp_buf buffer containing bytes to be compared with object contents
* @param cmp_len length to compare and size of @p cmp_buf in bytes
* @param off object byte offset at which to start the comparison
* @returns 0 once the request has been submitted, -E2BIG when @p cmp_len is
* larger than UINT_MAX/2.
* NOTE(review): the comparison outcome (0 on success, negative error code on
* failure, (-MAX_ERRNO - mismatch_off) on mismatch) is presumably reported
* through @p completion rather than through this return value -- confirm.
*/
CEPH_RADOS_API int rados_aio_cmpext(rados_ioctx_t io, const char *o,
rados_completion_t completion,
const char *cmp_buf,
size_t cmp_len,
uint64_t off);
/**
* Cancel async operation
*
@ -2722,6 +2756,22 @@ CEPH_RADOS_API void rados_write_op_assert_exists(rados_write_op_t write_op);
*/
CEPH_RADOS_API void rados_write_op_assert_version(rados_write_op_t write_op, uint64_t ver);
/**
* Ensure that given object range (extent) satisfies comparison.
*
* @param write_op operation to add this action to
* @param cmp_buf buffer containing bytes to be compared with object contents
* @param cmp_len length to compare and size of @p cmp_buf in bytes
* @param off object byte offset at which to start the comparison
* @param prval returned result of comparison, 0 on success, negative error code
* on failure, (-MAX_ERRNO - mismatch_off) on mismatch. The pointer is
* stored with the operation and written when the comparison result
* arrives, so it must stay valid until then.
*/
CEPH_RADOS_API void rados_write_op_cmpext(rados_write_op_t write_op,
const char *cmp_buf,
size_t cmp_len,
uint64_t off,
int *prval);
/**
* Ensure that given xattr satisfies comparison.
* If the comparison is not satisfied, the return code of the
@ -3023,6 +3073,22 @@ CEPH_RADOS_API void rados_read_op_assert_exists(rados_read_op_t read_op);
*/
CEPH_RADOS_API void rados_read_op_assert_version(rados_read_op_t read_op, uint64_t ver);
/**
* Ensure that given object range (extent) satisfies comparison.
*
* @param read_op operation to add this action to
* @param cmp_buf buffer containing bytes to be compared with object contents
* @param cmp_len length to compare and size of @p cmp_buf in bytes
* @param off object byte offset at which to start the comparison
* @param prval returned result of comparison, 0 on success, negative error code
* on failure, (-MAX_ERRNO - mismatch_off) on mismatch. The pointer is
* stored with the operation and written when the comparison result
* arrives, so it must stay valid until then.
*/
CEPH_RADOS_API void rados_read_op_cmpext(rados_read_op_t read_op,
const char *cmp_buf,
size_t cmp_len,
uint64_t off,
int *prval);
/**
* Ensure that the an xattr satisfies a comparison
* If the comparison is not satisfied, the return code of the

View File

@ -301,6 +301,7 @@ namespace librados
//flag mean ObjectOperationFlags
void set_op_flags2(int flags);
/**
 * Add a compare-extent step: compare the object contents starting at byte
 * offset off with the bytes in cmp_bl.
 *
 * @param off object byte offset at which to start the comparison
 * @param cmp_bl buffer containing bytes to be compared with object contents
 * @param prval where the comparison result is stored once available
 */
void cmpext(uint64_t off, bufferlist& cmp_bl, int *prval);
void cmpxattr(const char *name, uint8_t op, const bufferlist& val);
void cmpxattr(const char *name, uint8_t op, uint64_t v);
void exec(const char *cls, const char *method, bufferlist& inbl);
@ -755,6 +756,7 @@ namespace librados
int remove(const std::string& oid, int flags);
int trunc(const std::string& oid, uint64_t size);
int mapext(const std::string& o, uint64_t off, size_t len, std::map<uint64_t,uint64_t>& m);
/**
 * Compare an on-disk object range with a buffer
 *
 * @param o name of the object
 * @param off object byte offset at which to start the comparison
 * @param cmp_bl buffer containing bytes to be compared with object contents
 * @returns 0 on success, negative error code on failure (-E2BIG when cmp_bl
 *  is longer than UINT_MAX/2 bytes), (-MAX_ERRNO - mismatch_off) on mismatch
 */
int cmpext(const std::string& o, uint64_t off, bufferlist& cmp_bl);
int sparse_read(const std::string& o, std::map<uint64_t,uint64_t>& m, bufferlist& bl, size_t len, uint64_t off);
int getxattr(const std::string& oid, const char *name, bufferlist& bl);
int getxattrs(const std::string& oid, std::map<std::string, bufferlist>& attrset);
@ -991,6 +993,20 @@ namespace librados
int aio_sparse_read(const std::string& oid, AioCompletion *c,
std::map<uint64_t,uint64_t> *m, bufferlist *data_bl,
size_t len, uint64_t off, uint64_t snapid);
/**
* Asynchronously compare an on-disk object range with a buffer
*
* @param oid the name of the object to compare with
* @param c what to do when the comparison is complete
* @param off object byte offset at which to start the comparison
* @param cmp_bl buffer containing bytes to be compared with object contents
* @returns 0 once the request has been submitted, -E2BIG when cmp_bl is
*  longer than UINT_MAX/2 bytes.
*  NOTE(review): the comparison outcome (0 on success, negative error code
*  on failure, (-MAX_ERRNO - mismatch_off) on mismatch) is presumably
*  reported through c rather than through this return value -- confirm.
*/
int aio_cmpext(const std::string& oid,
librados::AioCompletion *c,
uint64_t off,
bufferlist& cmp_bl);
int aio_write(const std::string& oid, AioCompletion *c, const bufferlist& bl,
size_t len, uint64_t off);
int aio_append(const std::string& oid, AioCompletion *c, const bufferlist& bl,

View File

@ -909,6 +909,54 @@ int librados::IoCtxImpl::aio_sparse_read(const object_t oid,
return 0;
}
int librados::IoCtxImpl::aio_cmpext(const object_t& oid,
AioCompletionImpl *c,
uint64_t off,
bufferlist& cmp_bl)
{
if (cmp_bl.length() > UINT_MAX/2)
return -E2BIG;
Context *onack = new C_aio_Complete(c);
c->is_read = true;
c->io = this;
Objecter::Op *o = objecter->prepare_cmpext_op(
oid, oloc, off, cmp_bl, snap_seq, 0,
onack, &c->objver);
objecter->op_submit(o, &c->tid);
return 0;
}
/*
 * Submit an asynchronous compare-extent request for the non-bufferlist C
 * API, using m_ops.cmpext() + prepare_read_op().
 *
 * Returns -E2BIG without submitting anything when cmp_len is larger than
 * UINT_MAX/2, otherwise hands the op to the objecter and returns 0.
 */
int librados::IoCtxImpl::aio_cmpext(const object_t& oid,
                                    AioCompletionImpl *c,
                                    const char *cmp_buf,
                                    size_t cmp_len,
                                    uint64_t off)
{
  if (cmp_len > UINT_MAX/2)
    return -E2BIG;

  Context *nested = new C_aio_Complete(c);
  C_ObjectOperation *onack = new C_ObjectOperation(nested);

  c->is_read = true;
  c->io = this;

  // m_ops.cmpext() builds its own bufferlist from cmp_buf, so no local
  // copy of the compare data is made here.
  onack->m_ops.cmpext(off, cmp_len, cmp_buf, NULL);

  Objecter::Op *o = objecter->prepare_read_op(
    oid, oloc, onack->m_ops, snap_seq, NULL, 0, onack, &c->objver);
  objecter->op_submit(o, &c->tid);
  return 0;
}
int librados::IoCtxImpl::aio_write(const object_t &oid, AioCompletionImpl *c,
const bufferlist& bl, size_t len,
uint64_t off)
@ -1376,6 +1424,18 @@ int librados::IoCtxImpl::read(const object_t& oid,
return bl.length();
}
int librados::IoCtxImpl::cmpext(const object_t& oid, uint64_t off,
bufferlist& cmp_bl)
{
if (cmp_bl.length() > UINT_MAX/2)
return -E2BIG;
::ObjectOperation op;
prepare_assert_ops(&op);
op.cmpext(off, cmp_bl, NULL);
return operate_read(oid, &op, NULL);
}
int librados::IoCtxImpl::mapext(const object_t& oid,
uint64_t off, size_t len,
std::map<uint64_t,uint64_t>& m)

View File

@ -138,6 +138,7 @@ struct librados::IoCtxImpl {
int stat(const object_t& oid, uint64_t *psize, time_t *pmtime);
int stat2(const object_t& oid, uint64_t *psize, struct timespec *pts);
int trunc(const object_t& oid, uint64_t size);
// compare the object contents starting at byte offset off with cmp_bl
int cmpext(const object_t& oid, uint64_t off, bufferlist& cmp_bl);
int tmap_update(const object_t& oid, bufferlist& cmdbl);
int tmap_put(const object_t& oid, bufferlist& bl);
@ -191,6 +192,10 @@ struct librados::IoCtxImpl {
int aio_sparse_read(const object_t oid, AioCompletionImpl *c,
std::map<uint64_t,uint64_t> *m, bufferlist *data_bl,
size_t len, uint64_t off, uint64_t snapid);
// asynchronous compare-extent taking the compare data as a bufferlist
int aio_cmpext(const object_t& oid, AioCompletionImpl *c, uint64_t off,
bufferlist& cmp_bl);
// asynchronous compare-extent taking the compare data as a raw buffer of
// cmp_len bytes (used by the C API)
int aio_cmpext(const object_t& oid, AioCompletionImpl *c,
const char *cmp_buf, size_t cmp_len, uint64_t off);
int aio_write(const object_t &oid, AioCompletionImpl *c,
const bufferlist& bl, size_t len, uint64_t off);
int aio_append(const object_t &oid, AioCompletionImpl *c,

View File

@ -153,6 +153,14 @@ void librados::ObjectOperation::set_op_flags2(int flags)
::set_op_flags(o, flags);
}
void librados::ObjectOperation::cmpext(uint64_t off,
bufferlist &cmp_bl,
int *prval)
{
::ObjectOperation *o = &impl->o;
o->cmpext(off, cmp_bl, prval);
}
void librados::ObjectOperation::cmpxattr(const char *name, uint8_t op, const bufferlist& v)
{
::ObjectOperation *o = &impl->o;
@ -1222,6 +1230,12 @@ int librados::IoCtx::mapext(const std::string& oid, uint64_t off, size_t len,
return io_ctx_impl->mapext(obj, off, len, m);
}
int librados::IoCtx::cmpext(const std::string& oid, uint64_t off, bufferlist& cmp_bl)
{
object_t obj(oid);
return io_ctx_impl->cmpext(obj, off, cmp_bl);
}
int librados::IoCtx::sparse_read(const std::string& oid, std::map<uint64_t,uint64_t>& m,
bufferlist& bl, size_t len, uint64_t off)
{
@ -1829,6 +1843,14 @@ int librados::IoCtx::aio_exec(const std::string& oid,
return io_ctx_impl->aio_exec(obj, c->pc, cls, method, inbl, outbl);
}
int librados::IoCtx::aio_cmpext(const std::string& oid,
librados::AioCompletion *c,
uint64_t off,
bufferlist& cmp_bl)
{
return io_ctx_impl->aio_cmpext(oid, c->pc, off, cmp_bl);
}
int librados::IoCtx::aio_sparse_read(const std::string& oid, librados::AioCompletion *c,
std::map<uint64_t,uint64_t> *m, bufferlist *data_bl,
size_t len, uint64_t off)
@ -3920,6 +3942,23 @@ extern "C" int rados_ioctx_snap_get_stamp(rados_ioctx_t io, rados_snap_t id, tim
return retval;
}
// C API entry point: wrap cmp_buf in a bufferlist and run a synchronous
// compare-extent against object o, emitting enter/exit tracepoints.
extern "C" int rados_cmpext(rados_ioctx_t io, const char *o,
                            const char *cmp_buf, size_t cmp_len, uint64_t off)
{
  tracepoint(librados, rados_cmpext_enter, io, o, cmp_buf, cmp_len, off);

  librados::IoCtxImpl *impl = (librados::IoCtxImpl *)io;
  object_t target(o);

  bufferlist expected;
  expected.append(cmp_buf, cmp_len);

  int retval = impl->cmpext(target, off, expected);

  tracepoint(librados, rados_cmpext_exit, retval);
  return retval;
}
extern "C" int rados_getxattr(rados_ioctx_t io, const char *o, const char *name,
char *buf, size_t len)
{
@ -4723,7 +4762,7 @@ extern "C" int rados_aio_rmxattr(rados_ioctx_t io, const char *o,
return retval;
}
extern "C" int rados_aio_stat(rados_ioctx_t io, const char *o,
extern "C" int rados_aio_stat(rados_ioctx_t io, const char *o,
rados_completion_t completion,
uint64_t *psize, time_t *pmtime)
{
@ -4736,6 +4775,20 @@ extern "C" int rados_aio_stat(rados_ioctx_t io, const char *o,
return retval;
}
// C API entry point: submit an asynchronous compare-extent against object o
// through the raw-buffer IoCtxImpl::aio_cmpext(), emitting enter/exit
// tracepoints.
extern "C" int rados_aio_cmpext(rados_ioctx_t io, const char *o,
                                rados_completion_t completion,
                                const char *cmp_buf,
                                size_t cmp_len, uint64_t off)
{
  tracepoint(librados, rados_aio_cmpext_enter, io, o, completion, cmp_buf,
             cmp_len, off);

  librados::IoCtxImpl *impl = (librados::IoCtxImpl *)io;
  object_t target(o);
  librados::AioCompletionImpl *comp = (librados::AioCompletionImpl*)completion;

  int rc = impl->aio_cmpext(target, comp, cmp_buf, cmp_len, off);

  tracepoint(librados, rados_aio_cmpext_exit, rc);
  return rc;
}
extern "C" int rados_aio_cancel(rados_ioctx_t io, rados_completion_t completion)
{
librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
@ -5196,6 +5249,18 @@ extern "C" void rados_write_op_assert_exists(rados_write_op_t write_op)
tracepoint(librados, rados_write_op_assert_exists_exit);
}
// C API entry point: append a compare-extent step to write_op, emitting
// enter/exit tracepoints.
extern "C" void rados_write_op_cmpext(rados_write_op_t write_op,
                                      const char *cmp_buf,
                                      size_t cmp_len,
                                      uint64_t off,
                                      int *prval)
{
  tracepoint(librados, rados_write_op_cmpext_enter, write_op, cmp_buf,
             cmp_len, off, prval);

  ::ObjectOperation *target_op = (::ObjectOperation *)write_op;
  target_op->cmpext(off, cmp_len, cmp_buf, prval);

  tracepoint(librados, rados_write_op_cmpext_exit);
}
extern "C" void rados_write_op_cmpxattr(rados_write_op_t write_op,
const char *name,
uint8_t comparison_operator,
@ -5515,6 +5580,18 @@ extern "C" void rados_read_op_assert_exists(rados_read_op_t read_op)
tracepoint(librados, rados_read_op_assert_exists_exit);
}
// C API entry point: append a compare-extent step to read_op, emitting
// enter/exit tracepoints.
extern "C" void rados_read_op_cmpext(rados_read_op_t read_op,
                                     const char *cmp_buf,
                                     size_t cmp_len,
                                     uint64_t off,
                                     int *prval)
{
  tracepoint(librados, rados_read_op_cmpext_enter, read_op, cmp_buf,
             cmp_len, off, prval);

  ::ObjectOperation *target_op = (::ObjectOperation *)read_op;
  target_op->cmpext(off, cmp_len, cmp_buf, prval);

  tracepoint(librados, rados_read_op_cmpext_exit);
}
extern "C" void rados_read_op_cmpxattr(rados_read_op_t read_op,
const char *name,
uint8_t comparison_operator,

View File

@ -282,7 +282,37 @@ struct ObjectOperation {
out_handler[p] = h;
out_rval[p] = prval;
}
// object data
// object cmpext
struct C_ObjectOperation_cmpext : public Context {
int *prval;
C_ObjectOperation_cmpext(int *prval)
: prval(prval) {}
void finish(int r) {
if (prval)
*prval = r;
}
};
// Append a CEPH_OSD_OP_CMPEXT op comparing the object contents at off with
// cmp_bl, and register prval to receive the op's result.
void cmpext(uint64_t off, bufferlist& cmp_bl, int *prval) {
  add_data(CEPH_OSD_OP_CMPEXT, off, cmp_bl.length(), cmp_bl);

  // the op that was just added is the last one in the list
  const unsigned idx = ops.size() - 1;
  out_handler[idx] = new C_ObjectOperation_cmpext(prval);
  out_rval[idx] = prval;
}
// Used by C API
void cmpext(uint64_t off, uint64_t cmp_len, const char *cmp_buf, int *prval) {
bufferlist cmp_bl;
cmp_bl.append(cmp_buf, cmp_len);
add_data(CEPH_OSD_OP_CMPEXT, off, cmp_len, cmp_bl);
unsigned p = ops.size() - 1;
C_ObjectOperation_cmpext *h = new C_ObjectOperation_cmpext(prval);
out_handler[p] = h;
out_rval[p] = prval;
}
void read(uint64_t off, uint64_t len, bufferlist *pbl, int *prval,
Context* ctx) {
bufferlist bl;
@ -2342,6 +2372,38 @@ public:
return tid;
}
// Build (but do not submit) a read-flagged Op carrying a single
// CEPH_OSD_OP_CMPEXT that compares the object contents at off with cmp_bl.
// Ops from extra_ops, if any, are set up by init_ops().
Op *prepare_cmpext_op(
  const object_t& oid, const object_locator_t& oloc,
  uint64_t off, bufferlist &cmp_bl,
  snapid_t snap, int flags, Context *onfinish, version_t *objver = NULL,
  ObjectOperation *extra_ops = NULL, int op_flags = 0) {
  vector<OSDOp> ops;
  const int idx = init_ops(ops, 1, extra_ops);

  OSDOp& cmp = ops[idx];
  cmp.op.op = CEPH_OSD_OP_CMPEXT;
  cmp.op.extent.offset = off;
  cmp.op.extent.length = cmp_bl.length();
  cmp.op.extent.truncate_size = 0;
  cmp.op.extent.truncate_seq = 0;
  cmp.indata = cmp_bl;
  cmp.op.flags = op_flags;

  Op *prepared = new Op(oid, oloc, ops,
                        flags | global_op_flags.read() | CEPH_OSD_FLAG_READ,
                        onfinish, objver);
  prepared->snapid = snap;
  return prepared;
}
// Prepare a compare-extent Op via prepare_cmpext_op() and submit it
// immediately; returns the tid filled in by op_submit().
ceph_tid_t cmpext(
  const object_t& oid, const object_locator_t& oloc,
  uint64_t off, bufferlist &cmp_bl,
  snapid_t snap, int flags, Context *onfinish, version_t *objver = NULL,
  ObjectOperation *extra_ops = NULL, int op_flags = 0) {
  ceph_tid_t submitted_tid;
  Op *prepared = prepare_cmpext_op(oid, oloc, off, cmp_bl, snap,
                                   flags, onfinish, objver, extra_ops,
                                   op_flags);
  op_submit(prepared, &submitted_tid);
  return submitted_tid;
}
ceph_tid_t read_trunc(const object_t& oid, const object_locator_t& oloc,
uint64_t off, uint64_t len, snapid_t snap,
bufferlist *pbl, int flags, uint64_t trunc_size,

View File

@ -1488,6 +1488,29 @@ TRACEPOINT_EVENT(librados, rados_ioctx_snap_get_stamp_exit,
)
)
/* Entry event for rados_cmpext(): records the io context, object name,
 * compare buffer and object offset. */
TRACEPOINT_EVENT(librados, rados_cmpext_enter,
TP_ARGS(
rados_ioctx_t, ioctx,
const char*, oid,
const char*, cmp_buf,
size_t, cmp_len,
uint64_t, off),
TP_FIELDS(
ctf_integer_hex(rados_ioctx_t, ioctx, ioctx)
ctf_string(oid, oid)
ceph_ctf_sequence(unsigned char, cmp_buf, cmp_buf, size_t, cmp_len)
ctf_integer(uint64_t, off, off)
)
)
/* Exit event for rados_cmpext(): records the value returned to the caller. */
TRACEPOINT_EVENT(librados, rados_cmpext_exit,
TP_ARGS(
int, retval),
TP_FIELDS(
ctf_integer(int, retval, retval)
)
)
TRACEPOINT_EVENT(librados, rados_getxattr_enter,
TP_ARGS(
rados_ioctx_t, ioctx,
@ -2466,6 +2489,28 @@ TRACEPOINT_EVENT(librados, rados_aio_exec_exit,
)
)
/* Entry event for rados_aio_cmpext(): records the io context, object name,
 * completion handle, compare buffer and object offset, so the event can be
 * correlated with the object and completion it belongs to. */
TRACEPOINT_EVENT(librados, rados_aio_cmpext_enter,
    TP_ARGS(
        rados_ioctx_t, ioctx,
        const char*, oid,
        rados_completion_t, completion,
        const char*, cmp_buf,
        size_t, cmp_len,
        uint64_t, off),
    TP_FIELDS(
        ctf_integer_hex(rados_ioctx_t, ioctx, ioctx)
        ctf_string(oid, oid)
        ctf_integer_hex(rados_completion_t, completion, completion)
        ceph_ctf_sequence(unsigned char, cmp_buf, cmp_buf, size_t, cmp_len)
        ctf_integer(uint64_t, off, off)
    )
)
/* Exit event for rados_aio_cmpext(): records the value returned to the
 * caller. */
TRACEPOINT_EVENT(librados, rados_aio_cmpext_exit,
TP_ARGS(
int, retval),
TP_FIELDS(
ctf_integer(int, retval, retval)
)
)
TRACEPOINT_EVENT(librados, rados_watch_enter,
TP_ARGS(
rados_ioctx_t, ioctx,
@ -3051,6 +3096,27 @@ TRACEPOINT_EVENT(librados, rados_write_op_assert_exists_exit,
TP_FIELDS()
)
/* Entry event for rados_write_op_cmpext(): records the write op handle,
 * compare buffer and its length, object offset and the prval pointer. */
TRACEPOINT_EVENT(librados, rados_write_op_cmpext_enter,
TP_ARGS(
rados_write_op_t, op,
const char*, cmp_buffer,
size_t, cmp_len,
uint64_t, offset,
int*, prval),
TP_FIELDS(
ctf_integer_hex(rados_write_op_t, op, op)
ceph_ctf_sequence(unsigned char, cmp_buffer, cmp_buffer, size_t, cmp_len)
ctf_integer(size_t, cmp_len, cmp_len)
ctf_integer(uint64_t, offset, offset)
ctf_integer_hex(void*, prval, prval)
)
)
/* Exit event for rados_write_op_cmpext(); carries no payload. */
TRACEPOINT_EVENT(librados, rados_write_op_cmpext_exit,
TP_ARGS(),
TP_FIELDS()
)
TRACEPOINT_EVENT(librados, rados_write_op_cmpxattr_enter,
TP_ARGS(
rados_write_op_t, op,
@ -3511,6 +3577,27 @@ TRACEPOINT_EVENT(librados, rados_read_op_assert_exists_exit,
TP_FIELDS()
)
/* Entry event for rados_read_op_cmpext(): records the read op handle,
 * compare buffer and its length, object offset and the prval pointer. */
TRACEPOINT_EVENT(librados, rados_read_op_cmpext_enter,
TP_ARGS(
rados_read_op_t, op,
const char*, cmp_buffer,
size_t, cmp_len,
uint64_t, offset,
int*, prval),
TP_FIELDS(
ctf_integer_hex(rados_read_op_t, op, op)
ceph_ctf_sequence(unsigned char, cmp_buffer, cmp_buffer, size_t, cmp_len)
ctf_integer(size_t, cmp_len, cmp_len)
ctf_integer(uint64_t, offset, offset)
ctf_integer_hex(void*, prval, prval)
)
)
/* Exit event for rados_read_op_cmpext(); carries no payload. */
TRACEPOINT_EVENT(librados, rados_read_op_cmpext_exit,
TP_ARGS(),
TP_FIELDS()
)
TRACEPOINT_EVENT(librados, rados_read_op_cmpxattr_enter,
TP_ARGS(
rados_read_op_t, read_op,