librados: new AIO version of notify API

Allow watch/notify notifications to be sent asynchronously so
that they can be safely sent from a librados op callback.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
This commit is contained in:
Jason Dillaman 2015-06-10 15:33:17 -04:00
parent 8cee9a27f9
commit c9b873ac3d
7 changed files with 238 additions and 65 deletions

View File

@ -2092,6 +2092,7 @@ CEPH_RADOS_API int rados_notify(rados_ioctx_t io, const char *o, uint64_t ver,
* -ETIMEDOUT).
*
* @param io the pool the object is in
* @param completion what to do when operation has been attempted
* @param o the name of the object
* @param buf data to send to watchers
* @param buf_len length of buf in bytes
@ -2104,6 +2105,11 @@ CEPH_RADOS_API int rados_notify2(rados_ioctx_t io, const char *o,
const char *buf, int buf_len,
uint64_t timeout_ms,
char **reply_buffer, size_t *reply_buffer_len);
/**
 * Asynchronous variant of rados_notify2().
 *
 * The notify is issued without blocking; the outcome is delivered through
 * the supplied completion rather than through the return value.
 *
 * @param io the pool the object is in
 * @param o the name of the object
 * @param completion what to do when operation has been attempted
 * @param buf data to send to watchers (may be NULL for an empty payload)
 * @param buf_len length of buf in bytes
 * @param timeout_ms notify timeout (in ms)
 * @param reply_buffer where to store a pointer to the reply data; the
 *        buffer is allocated with malloc() by the library when the reply is
 *        non-empty and set to NULL otherwise
 * @param reply_buffer_len where to store the length of the reply data
 * @returns 0 once the notify has been issued
 */
CEPH_RADOS_API int rados_aio_notify(rados_ioctx_t io, const char *o,
rados_completion_t completion,
const char *buf, int buf_len,
uint64_t timeout_ms, char **reply_buffer,
size_t *reply_buffer_len);
/**
* Acknowledge receipt of a notify

View File

@ -988,6 +988,12 @@ namespace librados
bufferlist& bl, ///< optional broadcast payload
uint64_t timeout_ms, ///< timeout (in ms)
bufferlist *pbl); ///< reply buffer
/// Asynchronous notify: the result is reported through the completion `c`
/// instead of the return value. `pbl` may be NULL if the caller does not
/// want the watchers' replies.
int aio_notify(const std::string& o, ///< object
AioCompletion *c, ///< completion when notify completes
bufferlist& bl, ///< optional broadcast payload
uint64_t timeout_ms, ///< timeout (in ms)
bufferlist *pbl); ///< reply buffer
int list_watchers(const std::string& o, std::list<obj_watch_t> *out_watchers);
int list_snaps(const std::string& o, snap_set_t *out_snaps);
void set_notify_timeout(uint32_t timeout);

View File

@ -25,6 +25,121 @@
#undef dout_prefix
#define dout_prefix *_dout << "librados: "
namespace librados {
namespace {
/**
 * Context fired when a notify (linger op) has finished.
 *
 * On construction it registers itself as the linger op's
 * `on_notify_finish` handler and points the op's `notify_result_bl` at its
 * own `reply_bl`. When finished it hands the collected reply back to the
 * caller through whichever of the optional outputs were supplied, then
 * completes the wrapped context `ctx` with the result code.
 *
 * Outputs (all optional, may be NULL):
 *  - preply_bl      receives the reply via bufferlist::claim()
 *  - preply_buf     receives a malloc()ed copy of the reply, or NULL when
 *                   the reply is empty or the copy could not be allocated
 *  - preply_buf_len receives the number of reply bytes reported to the
 *                   caller (0 if the malloc()ed copy could not be made)
 */
struct C_notify_Finish : public Context {
  CephContext *cct;
  Context *ctx;
  Objecter *objecter;
  Objecter::LingerOp *linger_op;
  bufferlist reply_bl;
  bufferlist *preply_bl;
  char **preply_buf;
  size_t *preply_buf_len;

  C_notify_Finish(CephContext *_cct, Context *_ctx, Objecter *_objecter,
                  Objecter::LingerOp *_linger_op, bufferlist *_preply_bl,
                  char **_preply_buf, size_t *_preply_buf_len)
    : cct(_cct), ctx(_ctx), objecter(_objecter), linger_op(_linger_op),
      preply_bl(_preply_bl), preply_buf(_preply_buf),
      preply_buf_len(_preply_buf_len)
  {
    linger_op->on_notify_finish = this;
    linger_op->notify_result_bl = &reply_bl;
  }

  virtual void finish(int r)
  {
    ldout(cct, 10) << __func__ << " completed notify (linger op "
                   << linger_op << "), r = " << r << dendl;

    // pass result back to user
    // NOTE: we do this regardless of what error code we return
    size_t reply_len = reply_bl.length();
    if (preply_buf) {
      *preply_buf = NULL;
      if (reply_len) {
        char *copy = (char*)malloc(reply_len);
        if (copy) {
          memcpy(copy, reply_bl.c_str(), reply_len);
          *preply_buf = copy;
        } else {
          // Allocation failed: never memcpy() into NULL. Report an empty
          // buffer and surface -ENOMEM unless an earlier error is already
          // being returned.
          reply_len = 0;
          if (r >= 0)
            r = -ENOMEM;
        }
      }
    }
    if (preply_buf_len)
      *preply_buf_len = reply_len;
    if (preply_bl)
      preply_bl->claim(reply_bl);

    ctx->complete(r);
  }
};
/**
 * Deferred cancellation of a linger op.
 *
 * When run, asks the objecter to cancel the linger op that was captured at
 * construction time. The completion code passed to finish() is ignored.
 */
struct C_aio_linger_cancel : public Context {
  Objecter *objecter;
  Objecter::LingerOp *linger_op;

  C_aio_linger_cancel(Objecter *objecter_, Objecter::LingerOp *op_)
    : objecter(objecter_),
      linger_op(op_)
  {
  }

  virtual void finish(int /* r: unused */)
  {
    objecter->linger_cancel(linger_op);
  }
};
// Completion context for an asynchronous notify.
//
// Records the final result on the AioCompletionImpl, wakes any waiter and
// schedules the user's callbacks. The linger op is not cancelled inline;
// a C_aio_linger_cancel is queued on the client's finisher instead.
struct C_aio_notify_Complete : public Context {
AioCompletionImpl *c;
Objecter::LingerOp *linger_op;
C_aio_notify_Complete(AioCompletionImpl *_c, Objecter::LingerOp *_linger_op)
: c(_c), linger_op(_linger_op)
{
// take a reference on the completion for the lifetime of this context
c->get();
}
virtual void finish(int r) {
// queue the linger op cancellation first, ahead of any user callbacks
// queued below on the same finisher
c->io->client->finisher.queue(new C_aio_linger_cancel(c->io->objecter,
linger_op));
c->lock.Lock();
// publish the result and mark the op both acked and safe
c->rval = r;
c->ack = true;
c->safe = true;
c->cond.Signal();
// user callbacks are dispatched via the finisher, not invoked directly here
if (c->callback_complete) {
c->io->client->finisher.queue(new C_AioComplete(c));
}
if (c->callback_safe) {
c->io->client->finisher.queue(new C_AioSafe(c));
}
// releases c->lock; presumably also drops the reference taken in the
// constructor -- confirm against AioCompletionImpl::put_unlock()
c->put_unlock();
}
};
/**
 * Context run when the notify op itself has been acked.
 *
 * A negative result means the notify could not be initiated, so the
 * associated C_notify_Finish is completed immediately with that error.
 * A non-negative result needs no action here; the finish context is
 * expected to be completed separately once the notify finishes.
 */
struct C_aio_notify_Ack : public Context {
  CephContext *cct;
  C_notify_Finish *f;

  C_aio_notify_Ack(CephContext *cct_, C_notify_Finish *finish_)
    : cct(cct_),
      f(finish_)
  {
  }

  virtual void finish(int r)
  {
    ldout(cct, 10) << __func__ << " linger op " << f->linger_op << " acked ("
                   << r << ")" << dendl;

    // successful ack: nothing more to do at this point
    if (r >= 0) {
      return;
    }

    // failed to initiate the notify: propagate the error right away
    f->complete(r);
  }
};
} // anonymous namespace
} // namespace librados
librados::IoCtxImpl::IoCtxImpl() :
ref_cnt(0), client(NULL), poolid(0), assert_ver(0), last_objver(0),
notify_timeout(30), aio_write_list_lock("librados::IoCtxImpl::aio_write_list_lock"),
@ -1169,52 +1284,22 @@ int librados::IoCtxImpl::notify(const object_t& oid, bufferlist& bl,
bufferlist *preply_bl,
char **preply_buf, size_t *preply_buf_len)
{
bufferlist inbl;
struct C_NotifyFinish : public Context {
Cond cond;
Mutex lock;
bool done;
int result;
bufferlist reply_bl;
C_NotifyFinish()
: lock("IoCtxImpl::notify::C_NotifyFinish::lock"),
done(false),
result(0) { }
void finish(int r) {}
void complete(int r) {
lock.Lock();
done = true;
result = r;
cond.Signal();
lock.Unlock();
}
void wait() {
lock.Lock();
while (!done)
cond.Wait(lock);
lock.Unlock();
}
} notify_private;
Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc, 0);
linger_op->on_notify_finish = &notify_private;
linger_op->notify_result_bl = &notify_private.reply_bl;
uint32_t prot_ver = 1;
C_SaferCond notify_finish_cond;
Context *notify_finish = new C_notify_Finish(client->cct, &notify_finish_cond,
objecter, linger_op, preply_bl,
preply_buf, preply_buf_len);
uint32_t timeout = notify_timeout;
if (timeout_ms)
timeout = timeout_ms / 1000;
::encode(prot_ver, inbl);
::encode(timeout, inbl);
::encode(bl, inbl);
// Construct RADOS op
::ObjectOperation rd;
prepare_assert_ops(&rd);
rd.notify(linger_op->get_cookie(), inbl);
bufferlist inbl;
rd.notify(linger_op->get_cookie(), 1, timeout, bl, &inbl);
// Issue RADOS op
C_SaferCond onack;
@ -1224,44 +1309,58 @@ int librados::IoCtxImpl::notify(const object_t& oid, bufferlist& bl,
&onack, &objver);
ldout(client->cct, 10) << __func__ << " issued linger op " << linger_op << dendl;
int r_issue = onack.wait();
int r = onack.wait();
ldout(client->cct, 10) << __func__ << " linger op " << linger_op
<< " acked (" << r_issue << ")" << dendl;
<< " acked (" << r << ")" << dendl;
if (r_issue == 0) {
if (r == 0) {
ldout(client->cct, 10) << __func__ << " waiting for watch_notify finish "
<< linger_op << dendl;
notify_private.wait();
r = notify_finish_cond.wait();
ldout(client->cct, 10) << __func__ << " completed notify (linger op "
<< linger_op << "), r = " << notify_private.result
<< dendl;
} else {
ldout(client->cct, 10) << __func__ << " failed to initiate notify, r = "
<< r_issue << dendl;
<< r << dendl;
notify_finish->complete(r);
}
// pass result back to user
// NOTE: we do this regardless of what error code we return
if (preply_buf) {
if (notify_private.reply_bl.length()) {
*preply_buf = (char*)malloc(notify_private.reply_bl.length());
memcpy(*preply_buf, notify_private.reply_bl.c_str(),
notify_private.reply_bl.length());
} else {
*preply_buf = NULL;
}
}
if (preply_buf_len)
*preply_buf_len = notify_private.reply_bl.length();
if (preply_bl)
preply_bl->claim(notify_private.reply_bl);
objecter->linger_cancel(linger_op);
set_sync_op_version(objver);
return r;
}
return r_issue ? r_issue : notify_private.result;
int librados::IoCtxImpl::aio_notify(const object_t& oid, AioCompletionImpl *c,
bufferlist& bl, uint64_t timeout_ms,
bufferlist *preply_bl, char **preply_buf,
size_t *preply_buf_len)
{
Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc, 0);
c->io = this;
Context *oncomplete = new C_aio_notify_Complete(c, linger_op);
C_notify_Finish *onnotify = new C_notify_Finish(client->cct, oncomplete,
objecter, linger_op,
preply_bl, preply_buf,
preply_buf_len);
Context *onack = new C_aio_notify_Ack(client->cct, onnotify);
uint32_t timeout = notify_timeout;
if (timeout_ms)
timeout = timeout_ms / 1000;
// Construct RADOS op
::ObjectOperation rd;
prepare_assert_ops(&rd);
bufferlist inbl;
rd.notify(linger_op->get_cookie(), 1, timeout, bl, &inbl);
// Issue RADOS op
objecter->linger_notify(linger_op,
rd, snap_seq, inbl, NULL,
onack, NULL);
return 0;
}
int librados::IoCtxImpl::set_alloc_hint(const object_t& oid,
@ -1395,4 +1494,3 @@ void librados::IoCtxImpl::C_aio_Safe::finish(int r)
c->put_unlock();
}

View File

@ -206,6 +206,9 @@ struct librados::IoCtxImpl {
bufferlist *preplybl, char **preply_buf, size_t *preply_buf_len);
int notify_ack(const object_t& oid, uint64_t notify_id, uint64_t cookie,
bufferlist& bl);
// Asynchronous notify; the result is reported through completion `c`.
// The reply can be returned via `preplybl` and/or as a malloc()ed copy via
// `preply_buf` / `preply_buf_len`; each output may be NULL.
int aio_notify(const object_t& oid, AioCompletionImpl *c, bufferlist& bl,
uint64_t timeout_ms, bufferlist *preplybl, char **preply_buf,
size_t *preply_buf_len);
int set_alloc_hint(const object_t& oid,
uint64_t expected_object_size,

View File

@ -1811,6 +1811,15 @@ int librados::IoCtx::notify2(const string& oid, bufferlist& bl,
return io_ctx_impl->notify(obj, bl, timeout_ms, preplybl, NULL, NULL);
}
/**
 * C++ API entry point for an asynchronous notify.
 *
 * Forwards to IoCtxImpl::aio_notify() using the completion's internal
 * implementation object. Replies are only returned through `preplybl`;
 * the raw C-style reply buffer outputs are not used.
 */
int librados::IoCtx::aio_notify(const string& oid, AioCompletion *c,
                                bufferlist& bl, uint64_t timeout_ms,
                                bufferlist *preplybl)
{
  // the C++ interface never asks for a malloc()ed reply buffer
  char **no_reply_buf = NULL;
  size_t *no_reply_len = NULL;

  object_t obj(oid);
  return io_ctx_impl->aio_notify(obj, c->pc, bl, timeout_ms, preplybl,
                                 no_reply_buf, no_reply_len);
}
void librados::IoCtx::notify_ack(const std::string& o,
uint64_t notify_id, uint64_t handle,
bufferlist& bl)
@ -4072,6 +4081,28 @@ extern "C" int rados_notify2(rados_ioctx_t io, const char *o,
return ret;
}
/**
 * C API entry point for an asynchronous notify.
 *
 * Copies the optional payload into a bufferlist, forwards the request to
 * IoCtxImpl::aio_notify() and brackets the call with the
 * rados_aio_notify_enter / rados_aio_notify_exit tracepoints. Replies are
 * requested through reply_buffer / reply_buffer_len only.
 */
extern "C" int rados_aio_notify(rados_ioctx_t io, const char *o,
                                rados_completion_t completion,
                                const char *buf, int buf_len,
                                uint64_t timeout_ms, char **reply_buffer,
                                size_t *reply_buffer_len)
{
  tracepoint(librados, rados_aio_notify_enter, io, o, completion, buf, buf_len,
             timeout_ms);

  // recover the implementation objects behind the opaque C handles
  librados::IoCtxImpl *impl = reinterpret_cast<librados::IoCtxImpl*>(io);
  librados::AioCompletionImpl *comp =
    reinterpret_cast<librados::AioCompletionImpl*>(completion);

  // a NULL buf means "no payload": send an empty bufferlist
  bufferlist payload;
  if (buf != NULL) {
    payload.push_back(buffer::copy(buf, buf_len));
  }

  object_t oid(o);
  int ret = impl->aio_notify(oid, comp, payload, timeout_ms, NULL,
                             reply_buffer, reply_buffer_len);

  tracepoint(librados, rados_aio_notify_exit, ret);
  return ret;
}
extern "C" int rados_notify_ack(rados_ioctx_t io, const char *o,
uint64_t notify_id, uint64_t handle,
const char *buf, int buf_len)

View File

@ -904,10 +904,14 @@ struct ObjectOperation {
osd_op.op.watch.op = op;
}
void notify(uint64_t cookie, bufferlist& inbl) {
void notify(uint64_t cookie, uint32_t prot_ver, uint32_t timeout,
bufferlist &bl, bufferlist *inbl) {
OSDOp& osd_op = add_op(CEPH_OSD_OP_NOTIFY);
osd_op.op.notify.cookie = cookie;
osd_op.indata.append(inbl);
::encode(prot_ver, *inbl);
::encode(timeout, *inbl);
::encode(bl, *inbl);
osd_op.indata.append(*inbl);
}
void notify_ack(uint64_t notify_id, uint64_t cookie,

View File

@ -2339,6 +2339,31 @@ TRACEPOINT_EVENT(librados, rados_notify2_exit,
)
)
/*
 * Tracepoint recorded on entry to rados_aio_notify(): captures the ioctx and
 * completion handles (as hex integers), the object name, the payload bytes
 * and the requested timeout in milliseconds. The reply buffer arguments are
 * not recorded.
 */
TRACEPOINT_EVENT(librados, rados_aio_notify_enter,
TP_ARGS(
rados_ioctx_t, ioctx,
const char*, oid,
rados_completion_t, completion,
const char*, buf,
int, buf_len,
uint64_t, timeout_ms),
TP_FIELDS(
ctf_integer_hex(rados_ioctx_t, ioctx, ioctx)
ctf_string(oid, oid)
ctf_integer_hex(rados_completion_t, completion, completion)
ceph_ctf_sequence(unsigned char, buf, buf, size_t, buf_len)
ctf_integer(uint64_t, timeout_ms, timeout_ms)
)
)
/*
 * Tracepoint recorded when rados_aio_notify() returns: captures the integer
 * return value.
 */
TRACEPOINT_EVENT(librados, rados_aio_notify_exit,
TP_ARGS(
int, retval),
TP_FIELDS(
ctf_integer(int, retval, retval)
)
)
TRACEPOINT_EVENT(librados, rados_notify_ack_enter,
TP_ARGS(
rados_ioctx_t, ioctx,