From c9b873ac3d3cba1aaac75c3446eadbecb95e79f8 Mon Sep 17 00:00:00 2001 From: Jason Dillaman Date: Wed, 10 Jun 2015 15:33:17 -0400 Subject: [PATCH] librados: new AIO version of notify API Allow watch/notify notifications to be sent asynchronously so that they can be safely sent from a librados op callback. Signed-off-by: Jason Dillaman --- src/include/rados/librados.h | 6 + src/include/rados/librados.hpp | 6 + src/librados/IoCtxImpl.cc | 224 +++++++++++++++++++++++---------- src/librados/IoCtxImpl.h | 3 + src/librados/librados.cc | 31 +++++ src/osdc/Objecter.h | 8 +- src/tracing/librados.tp | 25 ++++ 7 files changed, 238 insertions(+), 65 deletions(-) diff --git a/src/include/rados/librados.h b/src/include/rados/librados.h index 5d4ac753a96..1ce385dde4f 100644 --- a/src/include/rados/librados.h +++ b/src/include/rados/librados.h @@ -2092,6 +2092,7 @@ CEPH_RADOS_API int rados_notify(rados_ioctx_t io, const char *o, uint64_t ver, * -ETIMEDOUT). * * @param io the pool the object is in + * @param completion what to do when operation has been attempted * @param o the name of the object * @param buf data to send to watchers * @param buf_len length of buf in bytes @@ -2104,6 +2105,11 @@ CEPH_RADOS_API int rados_notify2(rados_ioctx_t io, const char *o, const char *buf, int buf_len, uint64_t timeout_ms, char **reply_buffer, size_t *reply_buffer_len); +CEPH_RADOS_API int rados_aio_notify(rados_ioctx_t io, const char *o, + rados_completion_t completion, + const char *buf, int buf_len, + uint64_t timeout_ms, char **reply_buffer, + size_t *reply_buffer_len); /** * Acknolwedge receipt of a notify diff --git a/src/include/rados/librados.hpp b/src/include/rados/librados.hpp index 52ce642934c..0af06544dde 100644 --- a/src/include/rados/librados.hpp +++ b/src/include/rados/librados.hpp @@ -988,6 +988,12 @@ namespace librados bufferlist& bl, ///< optional broadcast payload uint64_t timeout_ms, ///< timeout (in ms) bufferlist *pbl); ///< reply buffer + int aio_notify(const std::string& o, ///< object + AioCompletion *c, ///< completion when notify completes + bufferlist& bl, ///< optional broadcast payload + uint64_t timeout_ms, ///< timeout (in ms) + bufferlist *pbl); ///< reply buffer + int list_watchers(const std::string& o, std::list *out_watchers); int list_snaps(const std::string& o, snap_set_t *out_snaps); void set_notify_timeout(uint32_t timeout); diff --git a/src/librados/IoCtxImpl.cc b/src/librados/IoCtxImpl.cc index 517eeb8200b..f12c9214a30 100644 --- a/src/librados/IoCtxImpl.cc +++ b/src/librados/IoCtxImpl.cc @@ -25,6 +25,121 @@ #undef dout_prefix #define dout_prefix *_dout << "librados: " +namespace librados { +namespace { + +struct C_notify_Finish : public Context { + CephContext *cct; + Context *ctx; + Objecter *objecter; + Objecter::LingerOp *linger_op; + bufferlist reply_bl; + bufferlist *preply_bl; + char **preply_buf; + size_t *preply_buf_len; + + C_notify_Finish(CephContext *_cct, Context *_ctx, Objecter *_objecter, + Objecter::LingerOp *_linger_op, bufferlist *_preply_bl, + char **_preply_buf, size_t *_preply_buf_len) + : cct(_cct), ctx(_ctx), objecter(_objecter), linger_op(_linger_op), + preply_bl(_preply_bl), preply_buf(_preply_buf), + preply_buf_len(_preply_buf_len) + { + linger_op->on_notify_finish = this; + linger_op->notify_result_bl = &reply_bl; + } + + virtual void finish(int r) + { + ldout(cct, 10) << __func__ << " completed notify (linger op " + << linger_op << "), r = " << r << dendl; + + // pass result back to user + // NOTE: we do this regardless of what error code we return + if (preply_buf) { + if (reply_bl.length()) { + *preply_buf = (char*)malloc(reply_bl.length()); + memcpy(*preply_buf, reply_bl.c_str(), reply_bl.length()); + } else { + *preply_buf = NULL; + } + } + if (preply_buf_len) + *preply_buf_len = reply_bl.length(); + if (preply_bl) + preply_bl->claim(reply_bl); + + ctx->complete(r); + } +}; + +struct C_aio_linger_cancel : public Context { + Objecter *objecter; + Objecter::LingerOp *linger_op; + + C_aio_linger_cancel(Objecter *_objecter, Objecter::LingerOp *_linger_op) + : objecter(_objecter), linger_op(_linger_op) + { + } + + virtual void finish(int r) + { + objecter->linger_cancel(linger_op); + } +}; + +struct C_aio_notify_Complete : public Context { + AioCompletionImpl *c; + Objecter::LingerOp *linger_op; + + C_aio_notify_Complete(AioCompletionImpl *_c, Objecter::LingerOp *_linger_op) + : c(_c), linger_op(_linger_op) + { + c->get(); + } + + virtual void finish(int r) { + c->io->client->finisher.queue(new C_aio_linger_cancel(c->io->objecter, + linger_op)); + + c->lock.Lock(); + c->rval = r; + c->ack = true; + c->safe = true; + c->cond.Signal(); + + if (c->callback_complete) { + c->io->client->finisher.queue(new C_AioComplete(c)); + } + if (c->callback_safe) { + c->io->client->finisher.queue(new C_AioSafe(c)); + } + c->put_unlock(); + } +}; + +struct C_aio_notify_Ack : public Context { + CephContext *cct; + C_notify_Finish *f; + + C_aio_notify_Ack(CephContext *_cct, C_notify_Finish *_f) + : cct(_cct), f(_f) + { + } + + virtual void finish(int r) + { + ldout(cct, 10) << __func__ << " linger op " << f->linger_op << " acked (" + << r << ")" << dendl; + if (r < 0) { + f->complete(r); + } + } +}; + +} // anonymous namespace +} // namespace librados + librados::IoCtxImpl::IoCtxImpl() : ref_cnt(0), client(NULL), poolid(0), assert_ver(0), last_objver(0), notify_timeout(30), aio_write_list_lock("librados::IoCtxImpl::aio_write_list_lock"), @@ -1169,52 +1284,22 @@ int librados::IoCtxImpl::notify(const object_t& oid, bufferlist& bl, bufferlist *preply_bl, char **preply_buf, size_t *preply_buf_len) { - bufferlist inbl; - - struct C_NotifyFinish : public Context { - Cond cond; - Mutex lock; - bool done; - int result; - bufferlist reply_bl; - - C_NotifyFinish() - : lock("IoCtxImpl::notify::C_NotifyFinish::lock"), - done(false), - result(0) { } - - void finish(int r) {} - void complete(int r) { - lock.Lock(); - done = true; - result = r; - cond.Signal(); - lock.Unlock(); - } - void wait() { - lock.Lock(); - while (!done) - cond.Wait(lock); - lock.Unlock(); - } - } notify_private; - Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc, 0); - linger_op->on_notify_finish = ¬ify_private; - linger_op->notify_result_bl = ¬ify_private.reply_bl; - uint32_t prot_ver = 1; + C_SaferCond notify_finish_cond; + Context *notify_finish = new C_notify_Finish(client->cct, ¬ify_finish_cond, + objecter, linger_op, preply_bl, + preply_buf, preply_buf_len); + uint32_t timeout = notify_timeout; if (timeout_ms) timeout = timeout_ms / 1000; - ::encode(prot_ver, inbl); - ::encode(timeout, inbl); - ::encode(bl, inbl); // Construct RADOS op ::ObjectOperation rd; prepare_assert_ops(&rd); - rd.notify(linger_op->get_cookie(), inbl); + bufferlist inbl; + rd.notify(linger_op->get_cookie(), 1, timeout, bl, &inbl); // Issue RADOS op C_SaferCond onack; @@ -1224,44 +1309,58 @@ int librados::IoCtxImpl::notify(const object_t& oid, bufferlist& bl, &onack, &objver); ldout(client->cct, 10) << __func__ << " issued linger op " << linger_op << dendl; - int r_issue = onack.wait(); + int r = onack.wait(); ldout(client->cct, 10) << __func__ << " linger op " << linger_op - << " acked (" << r_issue << ")" << dendl; + << " acked (" << r << ")" << dendl; - if (r_issue == 0) { + if (r == 0) { ldout(client->cct, 10) << __func__ << " waiting for watch_notify finish " << linger_op << dendl; - notify_private.wait(); + r = notify_finish_cond.wait(); - ldout(client->cct, 10) << __func__ << " completed notify (linger op " - << linger_op << "), r = " << notify_private.result - << dendl; } else { ldout(client->cct, 10) << __func__ << " failed to initiate notify, r = " - << r_issue << dendl; + << r << dendl; + notify_finish->complete(r); } - // pass result back to user - // NOTE: we do this regardless of what error code we return - if (preply_buf) { - if (notify_private.reply_bl.length()) { - *preply_buf = (char*)malloc(notify_private.reply_bl.length()); - memcpy(*preply_buf, notify_private.reply_bl.c_str(), - notify_private.reply_bl.length()); - } else { - *preply_buf = NULL; - } - } - if (preply_buf_len) - *preply_buf_len = notify_private.reply_bl.length(); - if (preply_bl) - preply_bl->claim(notify_private.reply_bl); - objecter->linger_cancel(linger_op); set_sync_op_version(objver); + return r; +} - return r_issue ? r_issue : notify_private.result; +int librados::IoCtxImpl::aio_notify(const object_t& oid, AioCompletionImpl *c, + bufferlist& bl, uint64_t timeout_ms, + bufferlist *preply_bl, char **preply_buf, + size_t *preply_buf_len) +{ + Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc, 0); + + c->io = this; + + Context *oncomplete = new C_aio_notify_Complete(c, linger_op); + C_notify_Finish *onnotify = new C_notify_Finish(client->cct, oncomplete, + objecter, linger_op, + preply_bl, preply_buf, + preply_buf_len); + Context *onack = new C_aio_notify_Ack(client->cct, onnotify); + + uint32_t timeout = notify_timeout; + if (timeout_ms) + timeout = timeout_ms / 1000; + + // Construct RADOS op + ::ObjectOperation rd; + prepare_assert_ops(&rd); + bufferlist inbl; + rd.notify(linger_op->get_cookie(), 1, timeout, bl, &inbl); + + // Issue RADOS op + objecter->linger_notify(linger_op, + rd, snap_seq, inbl, NULL, + onack, NULL); + return 0; } int librados::IoCtxImpl::set_alloc_hint(const object_t& oid, @@ -1395,4 +1494,3 @@ void librados::IoCtxImpl::C_aio_Safe::finish(int r) c->put_unlock(); } - diff --git a/src/librados/IoCtxImpl.h b/src/librados/IoCtxImpl.h index 45bfdfdbee7..b0a1b19c234 100644 --- a/src/librados/IoCtxImpl.h +++ b/src/librados/IoCtxImpl.h @@ -206,6 +206,9 @@ struct librados::IoCtxImpl { bufferlist *preplybl, char **preply_buf, size_t *preply_buf_len); int notify_ack(const object_t& oid, uint64_t notify_id, uint64_t cookie, bufferlist& bl); + int aio_notify(const object_t& oid, AioCompletionImpl *c, bufferlist& bl, + uint64_t timeout_ms, bufferlist *preplybl, char **preply_buf, + size_t *preply_buf_len); int set_alloc_hint(const object_t& oid, uint64_t expected_object_size, diff --git a/src/librados/librados.cc b/src/librados/librados.cc index 86badc2bc84..c55fa996384 100644 --- a/src/librados/librados.cc +++ b/src/librados/librados.cc @@ -1811,6 +1811,15 @@ int librados::IoCtx::notify2(const string& oid, bufferlist& bl, return io_ctx_impl->notify(obj, bl, timeout_ms, preplybl, NULL, NULL); } +int librados::IoCtx::aio_notify(const string& oid, AioCompletion *c, + bufferlist& bl, uint64_t timeout_ms, + bufferlist *preplybl) +{ + object_t obj(oid); + return io_ctx_impl->aio_notify(obj, c->pc, bl, timeout_ms, preplybl, NULL, + NULL); +} + void librados::IoCtx::notify_ack(const std::string& o, uint64_t notify_id, uint64_t handle, bufferlist& bl) @@ -4072,6 +4081,28 @@ extern "C" int rados_notify2(rados_ioctx_t io, const char *o, return ret; } +extern "C" int rados_aio_notify(rados_ioctx_t io, const char *o, + rados_completion_t completion, + const char *buf, int buf_len, + uint64_t timeout_ms, char **reply_buffer, + size_t *reply_buffer_len) +{ + tracepoint(librados, rados_aio_notify_enter, io, o, completion, buf, buf_len, + timeout_ms); + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + object_t oid(o); + bufferlist bl; + if (buf) { + bl.push_back(buffer::copy(buf, buf_len)); + } + librados::AioCompletionImpl *c = + reinterpret_cast(completion); + int ret = ctx->aio_notify(oid, c, bl, timeout_ms, NULL, reply_buffer, + reply_buffer_len); + tracepoint(librados, rados_aio_notify_exit, ret); + return ret; +} + extern "C" int rados_notify_ack(rados_ioctx_t io, const char *o, uint64_t notify_id, uint64_t handle, const char *buf, int buf_len) diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h index 97506a9899a..ac09e70b7dd 100644 --- a/src/osdc/Objecter.h +++ b/src/osdc/Objecter.h @@ -904,10 +904,14 @@ struct ObjectOperation { osd_op.op.watch.op = op; } - void notify(uint64_t cookie, bufferlist& inbl) { + void notify(uint64_t cookie, uint32_t prot_ver, uint32_t timeout, + bufferlist &bl, bufferlist *inbl) { OSDOp& osd_op = add_op(CEPH_OSD_OP_NOTIFY); osd_op.op.notify.cookie = cookie; - osd_op.indata.append(inbl); + ::encode(prot_ver, *inbl); + ::encode(timeout, *inbl); + ::encode(bl, *inbl); + osd_op.indata.append(*inbl); } void notify_ack(uint64_t notify_id, uint64_t cookie, diff --git a/src/tracing/librados.tp b/src/tracing/librados.tp index 0ba22ea0410..dbb5d8dabbf 100644 --- a/src/tracing/librados.tp +++ b/src/tracing/librados.tp @@ -2339,6 +2339,31 @@ TRACEPOINT_EVENT(librados, rados_notify2_exit, ) ) +TRACEPOINT_EVENT(librados, rados_aio_notify_enter, + TP_ARGS( + rados_ioctx_t, ioctx, + const char*, oid, + rados_completion_t, completion, + const char*, buf, + int, buf_len, + uint64_t, timeout_ms), + TP_FIELDS( + ctf_integer_hex(rados_ioctx_t, ioctx, ioctx) + ctf_string(oid, oid) + ctf_integer_hex(rados_completion_t, completion, completion) + ceph_ctf_sequence(unsigned char, buf, buf, size_t, buf_len) + ctf_integer(uint64_t, timeout_ms, timeout_ms) + ) +) + +TRACEPOINT_EVENT(librados, rados_aio_notify_exit, + TP_ARGS( + int, retval), + TP_FIELDS( + ctf_integer(int, retval, retval) + ) +) + TRACEPOINT_EVENT(librados, rados_notify_ack_enter, TP_ARGS( rados_ioctx_t, ioctx,