diff --git a/src/include/rbd/librbd.h b/src/include/rbd/librbd.h
index 9c09547e954..ae430788273 100644
--- a/src/include/rbd/librbd.h
+++ b/src/include/rbd/librbd.h
@@ -61,6 +61,8 @@ typedef void (*rbd_callback_t)(rbd_completion_t cb, void *arg);
 
 typedef int (*librbd_progress_fn_t)(uint64_t offset, uint64_t total, void *ptr);
 
+typedef void (*rbd_update_callback_t)(void *arg);
+
 typedef struct {
   uint64_t id;
   uint64_t size;
@@ -663,6 +665,31 @@ CEPH_RBD_API int rbd_group_create(rados_ioctx_t p, const char *name);
 CEPH_RBD_API int rbd_group_remove(rados_ioctx_t p, const char *name);
 CEPH_RBD_API int rbd_group_list(rados_ioctx_t p, char *names, size_t *size);
 
+/**
+ * Register an image metadata change watcher.
+ *
+ * @param image the image to watch
+ * @param handle where to store the internal id assigned to this watch
+ * @param watch_cb what to do when a notify is received on this image
+ * @param arg opaque value to pass to the callback
+ * @returns 0 on success, negative error code on failure
+ */
+CEPH_RBD_API int rbd_update_watch(rbd_image_t image, uint64_t *handle,
+                                  rbd_update_callback_t watch_cb, void *arg);
+
+/**
+ * Unregister an image watcher.
+ *
+ * Blocks until any in-flight invocation of the watch callback has returned,
+ * so it must not be called from within the callback itself.
+ *
+ * @param image the image to unwatch
+ * @param handle which watch to unregister
+ * @returns 0 on success, negative error code on failure
+ */
+CEPH_RBD_API int rbd_update_unwatch(rbd_image_t image, uint64_t handle);
+
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/src/include/rbd/librbd.hpp b/src/include/rbd/librbd.hpp
index 6b2026d4e81..61fc7bbcf58 100644
--- a/src/include/rbd/librbd.hpp
+++ b/src/include/rbd/librbd.hpp
@@ -181,6 +181,15 @@ private:
   rbd_image_options_t opts;
 };
 
+class CEPH_RBD_API UpdateWatchCtx {
+public:
+  virtual ~UpdateWatchCtx() {}
+  /**
+   * Callback activated when we receive a notify event.
+   */
+  virtual void handle_notify() = 0;
+};
+
 class CEPH_RBD_API Image
 {
 public:
@@ -365,6 +374,9 @@ public:
   int mirror_image_get_status(mirror_image_status_t *mirror_image_status,
                               size_t status_size);
 
+  int update_watch(UpdateWatchCtx *ctx, uint64_t *handle);
+  int update_unwatch(uint64_t handle);
+
 private:
   friend class RBD;
 
diff --git a/src/librbd/ImageState.cc b/src/librbd/ImageState.cc
index 39f4ee61066..5b450b0671e 100644
--- a/src/librbd/ImageState.cc
+++ b/src/librbd/ImageState.cc
@@ -2,6 +2,7 @@
 // vim: ts=8 sw=2 smarttab
 
 #include "librbd/ImageState.h"
+#include "include/rbd/librbd.hpp"
 #include "common/dout.h"
 #include "common/errno.h"
 #include "common/Cond.h"
@@ -22,16 +23,249 @@ namespace librbd {
 using util::create_async_context_callback;
 using util::create_context_callback;
 
+/**
+ * Tracks the UpdateWatchCtx callbacks registered against an image and
+ * dispatches update notifications to them from a work queue.
+ *
+ * Every queued notification is recorded in m_in_flight until its callback
+ * has returned; completion of an unregister or shut down request is
+ * deferred while the notifications it depends on are still in flight.
+ */
+class ImageUpdateWatchers {
+public:
+
+  explicit ImageUpdateWatchers(CephContext *cct) : m_cct(cct),
+    m_lock(util::unique_lock_name("librbd::ImageUpdateWatchers::m_lock", this)) {
+  }
+
+  ~ImageUpdateWatchers() {
+    assert(m_watchers.empty());
+    assert(m_in_flight.empty());
+    assert(m_pending_unregister.empty());
+    assert(m_on_shut_down_finish == nullptr);
+
+    destroy_work_queue();
+  }
+
+  // Completes on_finish immediately if no notification is in flight;
+  // otherwise queues the completion on the work queue (presumably so that
+  // it runs after the notifications already queued -- confirm ordering).
+  void flush(Context *on_finish) {
+    ldout(m_cct, 20) << "ImageUpdateWatchers::" << __func__ << dendl;
+    {
+      Mutex::Locker locker(m_lock);
+      if (!m_in_flight.empty()) {
+        Context *ctx = new FunctionContext(
+          [this, on_finish](int r) {
+            ldout(m_cct, 20) << "ImageUpdateWatchers::" << __func__
+                             << ": completing flush" << dendl;
+            on_finish->complete(r);
+          });
+        m_work_queue->queue(ctx, 0);
+        return;
+      }
+    }
+    ldout(m_cct, 20) << "ImageUpdateWatchers::" << __func__
+                     << ": completing flush" << dendl;
+    on_finish->complete(0);
+  }
+
+  // Drops all registered watchers; on_finish completes immediately if no
+  // notification is in flight, otherwise when the last one finishes.
+  void shut_down(Context *on_finish) {
+    ldout(m_cct, 20) << "ImageUpdateWatchers::" << __func__ << dendl;
+    {
+      Mutex::Locker locker(m_lock);
+      assert(m_on_shut_down_finish == nullptr);
+      m_watchers.clear();
+      if (!m_in_flight.empty()) {
+        m_on_shut_down_finish = on_finish;
+        return;
+      }
+    }
+    ldout(m_cct, 20) << "ImageUpdateWatchers::" << __func__
+                     << ": completing shut down" << dendl;
+    on_finish->complete(0);
+  }
+
+  // Stores the watcher under a newly allocated handle, returned via *handle.
+  // The work queue is created lazily on first registration.
+  void register_watcher(UpdateWatchCtx *watcher, uint64_t *handle) {
+    ldout(m_cct, 20) << "ImageUpdateWatchers::" << __func__ << ": watcher="
+                     << watcher << dendl;
+
+    Mutex::Locker locker(m_lock);
+    assert(m_on_shut_down_finish == nullptr);
+
+    create_work_queue();
+
+    *handle = m_next_handle++;
+    m_watchers.insert(std::make_pair(*handle, watcher));
+  }
+
+  // Removes the watcher. on_finish completes with -ENOENT if the handle is
+  // unknown, immediately with 0 if no notification for it is in flight, or
+  // otherwise once its in-flight notifications have finished.
+  void unregister_watcher(uint64_t handle, Context *on_finish) {
+    ldout(m_cct, 20) << "ImageUpdateWatchers::" << __func__ << ": handle="
+                     << handle << dendl;
+    int r = 0;
+    {
+      Mutex::Locker locker(m_lock);
+      auto it = m_watchers.find(handle);
+      if (it == m_watchers.end()) {
+        r = -ENOENT;
+      } else {
+        if (m_in_flight.find(handle) != m_in_flight.end()) {
+          assert(m_pending_unregister.find(handle) == m_pending_unregister.end());
+          m_pending_unregister[handle] = on_finish;
+          on_finish = nullptr;
+        }
+        m_watchers.erase(it);
+      }
+    }
+
+    if (on_finish) {
+      ldout(m_cct, 20) << "ImageUpdateWatchers::" << __func__
+                       << ": completing unregister" << dendl;
+      on_finish->complete(r);
+    }
+  }
+
+  // Queues a notification for every registered watcher.
+  void notify() {
+    ldout(m_cct, 20) << "ImageUpdateWatchers::" << __func__ << dendl;
+
+    Mutex::Locker locker(m_lock);
+    for (const auto &it : m_watchers) {
+      send_notify(it.first, it.second);
+    }
+  }
+
+  // Marks the handle in flight and queues the callback on the work queue.
+  // Caller must hold m_lock.
+  void send_notify(uint64_t handle, UpdateWatchCtx *watcher) {
+    assert(m_lock.is_locked());
+
+    ldout(m_cct, 20) << "ImageUpdateWatchers::" << __func__ << ": handle="
+                     << handle << ", watcher=" << watcher << dendl;
+
+    m_in_flight.insert(handle);
+
+    Context *ctx = new FunctionContext(
+      [this, handle, watcher](int r) {
+        handle_notify(handle, watcher);
+      });
+
+    m_work_queue->queue(ctx, 0);
+  }
+
+  // Work queue handler: invokes the watcher callback (without m_lock held),
+  // then completes any unregister / shut down waiting on it.
+  void handle_notify(uint64_t handle, UpdateWatchCtx *watcher) {
+
+    ldout(m_cct, 20) << "ImageUpdateWatchers::" << __func__ << ": handle="
+                     << handle << ", watcher=" << watcher << dendl;
+
+    watcher->handle_notify();
+
+    Context *on_unregister_finish = nullptr;
+    Context *on_shut_down_finish = nullptr;
+
+    {
+      Mutex::Locker locker(m_lock);
+
+      auto in_flight_it = m_in_flight.find(handle);
+      assert(in_flight_it != m_in_flight.end());
+      m_in_flight.erase(in_flight_it);
+
+      // If there is no more in flight notifications for this watcher
+      // and it is pending unregister, complete it now.
+      if (m_in_flight.find(handle) == m_in_flight.end()) {
+        auto it = m_pending_unregister.find(handle);
+        if (it != m_pending_unregister.end()) {
+          on_unregister_finish = it->second;
+          m_pending_unregister.erase(it);
+        }
+      }
+
+      if (m_in_flight.empty()) {
+        assert(m_pending_unregister.empty());
+        if (m_on_shut_down_finish != nullptr) {
+          std::swap(m_on_shut_down_finish, on_shut_down_finish);
+        }
+      }
+    }
+
+    if (on_unregister_finish != nullptr) {
+      ldout(m_cct, 20) << "ImageUpdateWatchers::" << __func__
+                       << ": completing unregister" << dendl;
+      on_unregister_finish->complete(0);
+    }
+
+    if (on_shut_down_finish != nullptr) {
+      ldout(m_cct, 20) << "ImageUpdateWatchers::" << __func__
+                       << ": completing shut down" << dendl;
+      on_shut_down_finish->complete(0);
+    }
+  }
+
+private:
+  class ThreadPoolSingleton : public ThreadPool {
+  public:
+    explicit ThreadPoolSingleton(CephContext *cct)
+      : ThreadPool(cct, "librbd::ImageUpdateWatchers::thread_pool", "tp_librbd",
+                   1) {
+      start();
+    }
+    virtual ~ThreadPoolSingleton() {
+      stop();
+    }
+  };
+
+  CephContext *m_cct;
+  Mutex m_lock;
+  ContextWQ *m_work_queue = nullptr;
+  std::map<uint64_t, UpdateWatchCtx*> m_watchers;
+  uint64_t m_next_handle = 0;
+  std::multiset<uint64_t> m_in_flight;
+  std::map<uint64_t, Context*> m_pending_unregister;
+  Context *m_on_shut_down_finish = nullptr;
+
+  void create_work_queue() {
+    if (m_work_queue != nullptr) {
+      return;
+    }
+    ThreadPoolSingleton *thread_pool_singleton;
+    m_cct->lookup_or_create_singleton_object<ThreadPoolSingleton>(
+      thread_pool_singleton, "librbd::ImageUpdateWatchers::thread_pool");
+    m_work_queue = new ContextWQ("librbd::ImageUpdateWatchers::op_work_queue",
+                                 m_cct->_conf->rbd_op_thread_timeout,
+                                 thread_pool_singleton);
+  }
+
+  void destroy_work_queue() {
+    if (m_work_queue == nullptr) {
+      return;
+    }
+    m_work_queue->drain();
+    delete m_work_queue;
+    m_work_queue = nullptr;
+  }
+};
+
 template <typename I>
 ImageState<I>::ImageState(I *image_ctx)
   : m_image_ctx(image_ctx), m_state(STATE_UNINITIALIZED),
     m_lock(util::unique_lock_name("librbd::ImageState::m_lock", this)),
-    m_last_refresh(0), m_refresh_seq(0) {
+    m_last_refresh(0), m_refresh_seq(0),
+    m_update_watchers(new ImageUpdateWatchers(image_ctx->cct)) {
 }
 
 template <typename I>
 ImageState<I>::~ImageState() {
   assert(m_state == STATE_UNINITIALIZED || m_state == STATE_CLOSED);
+  delete m_update_watchers;
 }
 
 template <typename I>
@@ -84,8 +318,12 @@ void ImageState<I>::handle_update_notification() {
   ++m_refresh_seq;
 
   CephContext *cct = m_image_ctx->cct;
-  ldout(cct, 20) << "refresh_seq = " << m_refresh_seq << ", "
+  ldout(cct, 20) << __func__ << ": refresh_seq = " << m_refresh_seq << ", "
                  << "last_refresh = " << m_last_refresh << dendl;
+
+  if (m_state == STATE_OPEN) {
+    m_update_watchers->notify();
+  }
 }
 
 template <typename I>
@@ -164,6 +402,44 @@ void ImageState<I>::snap_set(const std::string &snap_name, Context *on_finish) {
   execute_action_unlock(action, on_finish);
 }
 
+template <typename I>
+int ImageState<I>::register_update_watcher(UpdateWatchCtx *watcher,
+                                           uint64_t *handle) {
+  CephContext *cct = m_image_ctx->cct;
+  ldout(cct, 20) << __func__ << dendl;
+
+  m_update_watchers->register_watcher(watcher, handle);
+
+  ldout(cct, 20) << __func__ << ": handle=" << *handle << dendl;
+  return 0;
+}
+
+template <typename I>
+int ImageState<I>::unregister_update_watcher(uint64_t handle) {
+  CephContext *cct = m_image_ctx->cct;
+  ldout(cct, 20) << __func__ << ": handle=" << handle << dendl;
+
+  C_SaferCond ctx;
+  m_update_watchers->unregister_watcher(handle, &ctx);
+  return ctx.wait();
+}
+
+template <typename I>
+void ImageState<I>::flush_update_watchers(Context *on_finish) {
+  CephContext *cct = m_image_ctx->cct;
+  ldout(cct, 20) << __func__ << dendl;
+
+  m_update_watchers->flush(on_finish);
+}
+
+template <typename I>
+void ImageState<I>::shut_down_update_watchers(Context *on_finish) {
+  CephContext *cct = m_image_ctx->cct;
+  ldout(cct, 20) << __func__ << dendl;
+
+  m_update_watchers->shut_down(on_finish);
+}
+
 template <typename I>
 bool ImageState<I>::is_transition_state() const {
   switch (m_state) {
diff --git a/src/librbd/ImageState.h b/src/librbd/ImageState.h
index b60172f0b0a..bad42775d39 100644
--- a/src/librbd/ImageState.h
+++ b/src/librbd/ImageState.h
@@ -16,6 +16,8 @@ class RWLock;
 namespace librbd {
 
 class ImageCtx;
+class ImageUpdateWatchers;
+class UpdateWatchCtx;
 
 template <typename ImageCtxT = ImageCtx>
 class ImageState {
@@ -40,6 +42,11 @@ public:
 
   void snap_set(const std::string &snap_name, Context *on_finish);
 
+  int register_update_watcher(UpdateWatchCtx *watcher, uint64_t *handle);
+  int unregister_update_watcher(uint64_t handle);
+  void flush_update_watchers(Context *on_finish);
+  void shut_down_update_watchers(Context *on_finish);
+
 private:
   enum State {
     STATE_UNINITIALIZED,
@@ -95,6 +102,8 @@ private:
   uint64_t m_last_refresh;
   uint64_t m_refresh_seq;
 
+  ImageUpdateWatchers *m_update_watchers;
+
   bool is_transition_state() const;
   bool is_closed() const;
 
diff --git a/src/librbd/ImageWatcher.cc b/src/librbd/ImageWatcher.cc
index ffe951904a9..3e7072449cd 100644
--- a/src/librbd/ImageWatcher.cc
+++ b/src/librbd/ImageWatcher.cc
@@ -553,6 +553,10 @@ bool ImageWatcher::handle_payload(const HeaderUpdatePayload &payload,
 
   m_image_ctx.state->handle_update_notification();
   m_image_ctx.perfcounter->inc(l_librbd_notify);
+  if (ack_ctx != nullptr) {
+    m_image_ctx.state->flush_update_watchers(new C_ResponseMessage(ack_ctx));
+    return false;
+  }
   return true;
 }
 
diff --git a/src/librbd/image/CloseRequest.cc b/src/librbd/image/CloseRequest.cc
index 77a8ab76bf5..4ee52a58724 100644
--- a/src/librbd/image/CloseRequest.cc
+++ b/src/librbd/image/CloseRequest.cc
@@ -31,6 +31,30 @@ CloseRequest<I>::CloseRequest(I *image_ctx, Context *on_finish)
 
 template <typename I>
 void CloseRequest<I>::send() {
+  send_shut_down_update_watchers();
+}
+
+template <typename I>
+void CloseRequest<I>::send_shut_down_update_watchers() {
+  CephContext *cct = m_image_ctx->cct;
+  ldout(cct, 10) << this << " " << __func__ << dendl;
+
+  m_image_ctx->state->shut_down_update_watchers(create_async_context_callback(
+    *m_image_ctx, create_context_callback<
+      CloseRequest<I>, &CloseRequest<I>::handle_shut_down_update_watchers>(this)));
+}
+
+template <typename I>
+void CloseRequest<I>::handle_shut_down_update_watchers(int r) {
+  CephContext *cct = m_image_ctx->cct;
+  ldout(cct, 10) << this << " " << __func__ << ": r=" << r << dendl;
+
+  save_result(r);
+  if (r < 0) {
+    lderr(cct) << "failed to shut down update watchers: " << cpp_strerror(r)
+               << dendl;
+  }
+
   send_unregister_image_watcher();
 }
 
diff --git a/src/librbd/image/CloseRequest.h b/src/librbd/image/CloseRequest.h
index 8c674f942ff..e7d21845b7a 100644
--- a/src/librbd/image/CloseRequest.h
+++ b/src/librbd/image/CloseRequest.h
@@ -30,6 +30,9 @@ private:
    * <start>
    *    |
    *    v
+   * SHUT_DOWN_UPDATE_WATCHERS
+   *    |
+   *    v
    * UNREGISTER_IMAGE_WATCHER
    *    |
    *    v
@@ -71,6 +74,9 @@ private:
 
   decltype(m_image_ctx->exclusive_lock) m_exclusive_lock;
 
+  void send_shut_down_update_watchers();
+  void handle_shut_down_update_watchers(int r);
+
   void send_unregister_image_watcher();
   void handle_unregister_image_watcher(int r);
 
diff --git a/src/librbd/librbd.cc b/src/librbd/librbd.cc
index 052822f03e5..c518248dbf1 100644
--- a/src/librbd/librbd.cc
+++ b/src/librbd/librbd.cc
@@ -128,6 +128,19 @@ struct C_CloseComplete : public Context {
   }
 };
 
+struct C_UpdateWatchCB : public librbd::UpdateWatchCtx {
+  rbd_update_callback_t watch_cb;
+  void *arg;
+  uint64_t handle = 0;
+
+  C_UpdateWatchCB(rbd_update_callback_t watch_cb, void *arg) :
+    watch_cb(watch_cb), arg(arg) {
+  }
+  void handle_notify() override {
+    watch_cb(arg);
+  }
+};
+
 void mirror_image_info_cpp_to_c(const librbd::mirror_image_info_t &cpp_info,
                                 rbd_mirror_image_info_t *c_info) {
   c_info->global_id = strdup(cpp_info.global_id.c_str());
@@ -1358,6 +1371,22 @@ namespace librbd {
                                                    status_size);
   }
 
+  int Image::update_watch(UpdateWatchCtx *wctx, uint64_t *handle) {
+    ImageCtx *ictx = (ImageCtx *)ctx;
+    tracepoint(librbd, update_watch_enter, ictx, wctx);
+    int r = ictx->state->register_update_watcher(wctx, handle);
+    tracepoint(librbd, update_watch_exit, r, *handle);
+    return r;
+  }
+
+  int Image::update_unwatch(uint64_t handle) {
+    ImageCtx *ictx = (ImageCtx *)ctx;
+    tracepoint(librbd, update_unwatch_enter, ictx, handle);
+    int r = ictx->state->unregister_update_watcher(handle);
+    tracepoint(librbd, update_unwatch_exit, r);
+    return r;
+  }
+
 } // namespace librbd
 
 extern "C" void rbd_version(int *major, int *minor, int *extra)
@@ -2832,6 +2861,36 @@ extern "C" int rbd_mirror_image_get_status(rbd_image_t image,
   return 0;
 }
 
+extern "C" int rbd_update_watch(rbd_image_t image, uint64_t *handle,
+                                rbd_update_callback_t watch_cb, void *arg)
+{
+  librbd::ImageCtx *ictx = (librbd::ImageCtx *)image;
+  C_UpdateWatchCB *wctx = new C_UpdateWatchCB(watch_cb, arg);
+  tracepoint(librbd, update_watch_enter, ictx, wctx);
+  int r = ictx->state->register_update_watcher(wctx, &wctx->handle);
+  tracepoint(librbd, update_watch_exit, r, wctx->handle);
+  if (r < 0) {
+    // registration failed: no handle is handed out, so free the wrapper here
+    delete wctx;
+    return r;
+  }
+  // the opaque handle is the wrapper's address; rbd_update_unwatch() casts
+  // it back and deletes it
+  *handle = reinterpret_cast<uint64_t>(wctx);
+  return r;
+}
+
+extern "C" int rbd_update_unwatch(rbd_image_t image, uint64_t handle)
+{
+  librbd::ImageCtx *ictx = (librbd::ImageCtx *)image;
+  C_UpdateWatchCB *wctx = reinterpret_cast<C_UpdateWatchCB *>(handle);
+  tracepoint(librbd, update_unwatch_enter, ictx, wctx->handle);
+  int r = ictx->state->unregister_update_watcher(wctx->handle);
+  delete wctx;
+  tracepoint(librbd, update_unwatch_exit, r);
+  return r;
+}
+
 extern "C" int rbd_aio_is_complete(rbd_completion_t c)
 {
   librbd::RBD::AioCompletion *comp = (librbd::RBD::AioCompletion *)c;
diff --git a/src/test/librbd/test_librbd.cc b/src/test/librbd/test_librbd.cc
index d4dce59ca3b..b7b48cdbf11 100644
--- a/src/test/librbd/test_librbd.cc
+++ b/src/test/librbd/test_librbd.cc
@@ -23,6 +23,7 @@
 
 #include "gtest/gtest.h"
 
+#include <chrono>
 #include <errno.h>
 #include <stdarg.h>
 #include <stdio.h>
@@ -39,6 +40,8 @@
 #include "test/librados/test.h"
 #include "test/librbd/test_support.h"
 
+#include "common/Cond.h"
+#include "common/Mutex.h"
 #include "common/errno.h"
 #include "common/event_socket.h"
 #include "include/interval_set.h"
@@ -53,6 +56,8 @@
 
 using namespace std;
 
+using std::chrono::seconds;
+
 #define ASSERT_PASSED(x, args...) \
   do {                            \
     bool passed = false;          \
@@ -455,6 +460,111 @@ TEST_F(TestLibRBD, ResizeAndStatPP)
   ioctx.close();
 }
 
+TEST_F(TestLibRBD, UpdateWatchAndResize)
+{
+  rados_ioctx_t ioctx;
+  ASSERT_EQ(0, rados_ioctx_create(_cluster, m_pool_name.c_str(), &ioctx));
+
+  rbd_image_t image;
+  int order = 0;
+  std::string name = get_temp_image_name();
+  uint64_t size = 2 << 20;
+  struct Watcher {
+    static void cb(void *arg) {
+      Watcher *watcher = (Watcher *)arg;
+      watcher->handle_notify();
+    }
+    Watcher(rbd_image_t &image) : m_image(image), m_lock("lock") {}
+    void handle_notify() {
+      rbd_image_info_t info;
+      ASSERT_EQ(0, rbd_stat(m_image, &info, sizeof(info)));
+      Mutex::Locker locker(m_lock);
+      m_size = info.size;
+      m_cond.Signal();
+    }
+    void wait_for_size(size_t size) {
+      Mutex::Locker locker(m_lock);
+      while (m_size != size) {
+        CephContext* cct = reinterpret_cast<CephContext*>(_rados.cct());
+        ASSERT_EQ(0, m_cond.WaitInterval(cct, m_lock, seconds(5)));
+      }
+    }
+    rbd_image_t &m_image;
+    Mutex m_lock;
+    Cond m_cond;
+    size_t m_size = 0;
+  } watcher(image);
+  uint64_t handle;
+
+  ASSERT_EQ(0, create_image(ioctx, name.c_str(), size, &order));
+  ASSERT_EQ(0, rbd_open(ioctx, name.c_str(), &image, NULL));
+
+  ASSERT_EQ(0, rbd_update_watch(image, &handle, Watcher::cb, &watcher));
+
+  ASSERT_EQ(0, rbd_resize(image, size * 4));
+  watcher.wait_for_size(size * 4);
+
+  ASSERT_EQ(0, rbd_resize(image, size / 2));
+  watcher.wait_for_size(size / 2);
+
+  ASSERT_EQ(0, rbd_update_unwatch(image, handle));
+
+  ASSERT_EQ(0, rbd_close(image));
+  rados_ioctx_destroy(ioctx);
+}
+
+TEST_F(TestLibRBD, UpdateWatchAndResizePP)
+{
+  librados::IoCtx ioctx;
+  ASSERT_EQ(0, _rados.ioctx_create(m_pool_name.c_str(), ioctx));
+
+  {
+    librbd::RBD rbd;
+    librbd::Image image;
+    int order = 0;
+    std::string name = get_temp_image_name();
+    uint64_t size = 2 << 20;
+    struct Watcher : public librbd::UpdateWatchCtx {
+      Watcher(librbd::Image &image) : m_image(image), m_lock("lock") {
+      }
+      void handle_notify() override {
+        librbd::image_info_t info;
+        ASSERT_EQ(0, m_image.stat(info, sizeof(info)));
+        Mutex::Locker locker(m_lock);
+        m_size = info.size;
+        m_cond.Signal();
+      }
+      void wait_for_size(size_t size) {
+        Mutex::Locker locker(m_lock);
+        while (m_size != size) {
+          CephContext* cct = reinterpret_cast<CephContext*>(_rados.cct());
+          ASSERT_EQ(0, m_cond.WaitInterval(cct, m_lock, seconds(5)));
+        }
+      }
+      librbd::Image &m_image;
+      Mutex m_lock;
+      Cond m_cond;
+      size_t m_size = 0;
+    } watcher(image);
+    uint64_t handle;
+
+    ASSERT_EQ(0, create_image_pp(rbd, ioctx, name.c_str(), size, &order));
+    ASSERT_EQ(0, rbd.open(ioctx, image, name.c_str(), NULL));
+
+    ASSERT_EQ(0, image.update_watch(&watcher, &handle));
+
+    ASSERT_EQ(0, image.resize(size * 4));
+    watcher.wait_for_size(size * 4);
+
+    ASSERT_EQ(0, image.resize(size / 2));
+    watcher.wait_for_size(size / 2);
+
+    ASSERT_EQ(0, image.update_unwatch(handle));
+  }
+
+  ioctx.close();
+}
+
 int test_ls(rados_ioctx_t io_ctx, size_t num_expected, ...)
 {
   int num_images, i;
diff --git a/src/tracing/librbd.tp b/src/tracing/librbd.tp
index e85aa024a3e..2652ae9fed6 100644
--- a/src/tracing/librbd.tp
+++ b/src/tracing/librbd.tp
@@ -1901,3 +1901,41 @@ TRACEPOINT_EVENT(librbd, group_list_exit,
         ctf_integer(int, retval, retval)
     )
 )
+
+TRACEPOINT_EVENT(librbd, update_watch_enter,
+    TP_ARGS(
+        void*, imagectx,
+        void*, watchctx),
+    TP_FIELDS(
+        ctf_integer_hex(void*, imagectx, imagectx)
+        ctf_integer_hex(void*, watchctx, watchctx)
+    )
+)
+
+TRACEPOINT_EVENT(librbd, update_watch_exit,
+    TP_ARGS(
+        int, retval,
+        uint64_t, handle),
+    TP_FIELDS(
+        ctf_integer(int, retval, retval)
+        ctf_integer(uint64_t, handle, handle)
+    )
+)
+
+TRACEPOINT_EVENT(librbd, update_unwatch_enter,
+    TP_ARGS(
+        void*, imagectx,
+        uint64_t, handle),
+    TP_FIELDS(
+        ctf_integer_hex(void*, imagectx, imagectx)
+        ctf_integer(uint64_t, handle, handle)
+    )
+)
+
+TRACEPOINT_EVENT(librbd, update_unwatch_exit,
+    TP_ARGS(
+        int, retval),
+    TP_FIELDS(
+        ctf_integer(int, retval, retval)
+    )
+)