librbd: API: methods to watch image stat update

Signed-off-by: Mykola Golub <mgolub@mirantis.com>
This commit is contained in:
Mykola Golub 2016-05-24 15:50:22 +03:00
parent 44a4651bbd
commit 9952b75a0c
10 changed files with 532 additions and 2 deletions

View File

@ -61,6 +61,8 @@ typedef void (*rbd_callback_t)(rbd_completion_t cb, void *arg);
typedef int (*librbd_progress_fn_t)(uint64_t offset, uint64_t total, void *ptr);
typedef void (*rbd_update_callback_t)(void *arg);
typedef struct {
uint64_t id;
uint64_t size;
@ -663,6 +665,28 @@ CEPH_RBD_API int rbd_group_create(rados_ioctx_t p, const char *name);
CEPH_RBD_API int rbd_group_remove(rados_ioctx_t p, const char *name);
CEPH_RBD_API int rbd_group_list(rados_ioctx_t p, char *names, size_t *size);
/**
* Register an image metadata change watcher.
*
* @param image the image to watch
* @param handle where to store the internal id assigned to this watch
* @param watch_cb what to do when a notify is received on this image
* @param arg opaque value to pass to the callback
* @returns 0 on success, negative error code on failure
*/
CEPH_RBD_API int rbd_update_watch(rbd_image_t image, uint64_t *handle,
rbd_update_callback_t watch_cb, void *arg);
/**
* Unregister an image watcher.
*
* @param image the image to unwatch
* @param handle which watch to unregister
* @returns 0 on success, negative error code on failure
*/
CEPH_RBD_API int rbd_update_unwatch(rbd_image_t image, uint64_t handle);
#ifdef __cplusplus
}
#endif

View File

@ -181,6 +181,15 @@ private:
rbd_image_options_t opts;
};
class CEPH_RBD_API UpdateWatchCtx {
public:
virtual ~UpdateWatchCtx() {}
/**
* Callback activated when we receive a notify event.
*/
virtual void handle_notify() = 0;
};
class CEPH_RBD_API Image
{
public:
@ -365,6 +374,9 @@ public:
int mirror_image_get_status(mirror_image_status_t *mirror_image_status,
size_t status_size);
int update_watch(UpdateWatchCtx *ctx, uint64_t *handle);
int update_unwatch(uint64_t handle);
private:
friend class RBD;

View File

@ -2,6 +2,7 @@
// vim: ts=8 sw=2 smarttab
#include "librbd/ImageState.h"
#include "include/rbd/librbd.hpp"
#include "common/dout.h"
#include "common/errno.h"
#include "common/Cond.h"
@ -22,16 +23,224 @@ namespace librbd {
using util::create_async_context_callback;
using util::create_context_callback;
class ImageUpdateWatchers {
public:
ImageUpdateWatchers(CephContext *cct) : m_cct(cct),
m_lock(util::unique_lock_name("librbd::ImageUpdateWatchers::m_lock", this)) {
}
~ImageUpdateWatchers() {
assert(m_watchers.empty());
assert(m_in_flight.empty());
assert(m_pending_unregister.empty());
assert(m_on_shut_down_finish == nullptr);
destroy_work_queue();
}
void flush(Context *on_finish) {
ldout(m_cct, 20) << "ImageUpdateWatchers::" << __func__ << dendl;
{
Mutex::Locker locker(m_lock);
if (!m_in_flight.empty()) {
Context *ctx = new FunctionContext(
[this, on_finish](int r) {
ldout(m_cct, 20) << "ImageUpdateWatchers::" << __func__
<< ": completing flush" << dendl;
on_finish->complete(r);
});
m_work_queue->queue(ctx, 0);
return;
}
}
ldout(m_cct, 20) << "ImageUpdateWatchers::" << __func__
<< ": completing flush" << dendl;
on_finish->complete(0);
}
void shut_down(Context *on_finish) {
ldout(m_cct, 20) << "ImageUpdateWatchers::" << __func__ << dendl;
{
Mutex::Locker locker(m_lock);
assert(m_on_shut_down_finish == nullptr);
m_watchers.clear();
if (!m_in_flight.empty()) {
m_on_shut_down_finish = on_finish;
return;
}
}
ldout(m_cct, 20) << "ImageUpdateWatchers::" << __func__
<< ": completing shut down" << dendl;
on_finish->complete(0);
}
void register_watcher(UpdateWatchCtx *watcher, uint64_t *handle) {
ldout(m_cct, 20) << __func__ << ": watcher=" << watcher << dendl;
Mutex::Locker locker(m_lock);
assert(m_on_shut_down_finish == nullptr);
create_work_queue();
*handle = m_next_handle++;
m_watchers.insert(std::make_pair(*handle, watcher));
}
void unregister_watcher(uint64_t handle, Context *on_finish) {
ldout(m_cct, 20) << "ImageUpdateWatchers::" << __func__ << ": handle="
<< handle << dendl;
int r = 0;
{
Mutex::Locker locker(m_lock);
auto it = m_watchers.find(handle);
if (it == m_watchers.end()) {
r = -ENOENT;
} else {
if (m_in_flight.find(handle) != m_in_flight.end()) {
assert(m_pending_unregister.find(handle) == m_pending_unregister.end());
m_pending_unregister[handle] = on_finish;
on_finish = nullptr;
}
m_watchers.erase(it);
}
}
if (on_finish) {
ldout(m_cct, 20) << "ImageUpdateWatchers::" << __func__
<< ": completing unregister" << dendl;
on_finish->complete(r);
}
}
void notify() {
ldout(m_cct, 20) << "ImageUpdateWatchers::" << __func__ << dendl;
Mutex::Locker locker(m_lock);
for (auto it : m_watchers) {
send_notify(it.first, it.second);
}
}
void send_notify(uint64_t handle, UpdateWatchCtx *watcher) {
assert(m_lock.is_locked());
ldout(m_cct, 20) << "ImageUpdateWatchers::" << __func__ << ": handle="
<< handle << ", watcher=" << watcher << dendl;
m_in_flight.insert(handle);
Context *ctx = new FunctionContext(
[this, handle, watcher](int r) {
handle_notify(handle, watcher);
});
m_work_queue->queue(ctx, 0);
}
void handle_notify(uint64_t handle, UpdateWatchCtx *watcher) {
ldout(m_cct, 20) << "ImageUpdateWatchers::" << __func__ << ": handle="
<< handle << ", watcher=" << watcher << dendl;
watcher->handle_notify();
Context *on_unregister_finish = nullptr;
Context *on_shut_down_finish = nullptr;
{
Mutex::Locker locker(m_lock);
auto in_flight_it = m_in_flight.find(handle);
assert(in_flight_it != m_in_flight.end());
m_in_flight.erase(in_flight_it);
// If there is no more in flight notifications for this watcher
// and it is pending unregister, complete it now.
if (m_in_flight.find(handle) == m_in_flight.end()) {
auto it = m_pending_unregister.find(handle);
if (it != m_pending_unregister.end()) {
on_unregister_finish = it->second;
m_pending_unregister.erase(it);
}
}
if (m_in_flight.empty()) {
assert(m_pending_unregister.empty());
if (m_on_shut_down_finish != nullptr) {
std::swap(m_on_shut_down_finish, on_shut_down_finish);
}
}
}
if (on_unregister_finish != nullptr) {
ldout(m_cct, 20) << "ImageUpdateWatchers::" << __func__
<< ": completing unregister" << dendl;
on_unregister_finish->complete(0);
}
if (on_shut_down_finish != nullptr) {
ldout(m_cct, 20) << "ImageUpdateWatchers::" << __func__
<< ": completing shut down" << dendl;
on_shut_down_finish->complete(0);
}
}
private:
class ThreadPoolSingleton : public ThreadPool {
public:
explicit ThreadPoolSingleton(CephContext *cct)
: ThreadPool(cct, "librbd::ImageUpdateWatchers::thread_pool", "tp_librbd",
1) {
start();
}
virtual ~ThreadPoolSingleton() {
stop();
}
};
CephContext *m_cct;
Mutex m_lock;
ContextWQ *m_work_queue = nullptr;
std::map<uint64_t, UpdateWatchCtx*> m_watchers;
uint64_t m_next_handle = 0;
std::multiset<uint64_t> m_in_flight;
std::map<uint64_t, Context*> m_pending_unregister;
Context *m_on_shut_down_finish = nullptr;
void create_work_queue() {
if (m_work_queue != nullptr) {
return;
}
ThreadPoolSingleton *thread_pool_singleton;
m_cct->lookup_or_create_singleton_object<ThreadPoolSingleton>(
thread_pool_singleton, "librbd::ImageUpdateWatchers::thread_pool");
m_work_queue = new ContextWQ("librbd::ImageUpdateWatchers::op_work_queue",
m_cct->_conf->rbd_op_thread_timeout,
thread_pool_singleton);
}
void destroy_work_queue() {
if (m_work_queue == nullptr) {
return;
}
m_work_queue->drain();
delete m_work_queue;
}
};
template <typename I>
ImageState<I>::ImageState(I *image_ctx)
: m_image_ctx(image_ctx), m_state(STATE_UNINITIALIZED),
m_lock(util::unique_lock_name("librbd::ImageState::m_lock", this)),
m_last_refresh(0), m_refresh_seq(0) {
m_last_refresh(0), m_refresh_seq(0),
m_update_watchers(new ImageUpdateWatchers(image_ctx->cct)) {
}
template <typename I>
ImageState<I>::~ImageState() {
assert(m_state == STATE_UNINITIALIZED || m_state == STATE_CLOSED);
delete m_update_watchers;
}
template <typename I>
@ -84,8 +293,12 @@ void ImageState<I>::handle_update_notification() {
++m_refresh_seq;
CephContext *cct = m_image_ctx->cct;
ldout(cct, 20) << "refresh_seq = " << m_refresh_seq << ", "
ldout(cct, 20) << __func__ << ": refresh_seq = " << m_refresh_seq << ", "
<< "last_refresh = " << m_last_refresh << dendl;
if (m_state == STATE_OPEN) {
m_update_watchers->notify();
}
}
template <typename I>
@ -164,6 +377,44 @@ void ImageState<I>::snap_set(const std::string &snap_name, Context *on_finish) {
execute_action_unlock(action, on_finish);
}
template <typename I>
int ImageState<I>::register_update_watcher(UpdateWatchCtx *watcher,
uint64_t *handle) {
CephContext *cct = m_image_ctx->cct;
ldout(cct, 20) << __func__ << dendl;
m_update_watchers->register_watcher(watcher, handle);
ldout(cct, 20) << __func__ << ": handle=" << *handle << dendl;
return 0;
}
template <typename I>
int ImageState<I>::unregister_update_watcher(uint64_t handle) {
CephContext *cct = m_image_ctx->cct;
ldout(cct, 20) << __func__ << ": handle=" << handle << dendl;
C_SaferCond ctx;
m_update_watchers->unregister_watcher(handle, &ctx);
return ctx.wait();
}
template <typename I>
void ImageState<I>::flush_update_watchers(Context *on_finish) {
CephContext *cct = m_image_ctx->cct;
ldout(cct, 20) << __func__ << dendl;
m_update_watchers->flush(on_finish);
}
template <typename I>
void ImageState<I>::shut_down_update_watchers(Context *on_finish) {
CephContext *cct = m_image_ctx->cct;
ldout(cct, 20) << __func__ << dendl;
m_update_watchers->shut_down(on_finish);
}
template <typename I>
bool ImageState<I>::is_transition_state() const {
switch (m_state) {

View File

@ -16,6 +16,8 @@ class RWLock;
namespace librbd {
class ImageCtx;
class ImageUpdateWatchers;
class UpdateWatchCtx;
template <typename ImageCtxT = ImageCtx>
class ImageState {
@ -40,6 +42,11 @@ public:
void snap_set(const std::string &snap_name, Context *on_finish);
int register_update_watcher(UpdateWatchCtx *watcher, uint64_t *handle);
int unregister_update_watcher(uint64_t handle);
void flush_update_watchers(Context *on_finish);
void shut_down_update_watchers(Context *on_finish);
private:
enum State {
STATE_UNINITIALIZED,
@ -95,6 +102,8 @@ private:
uint64_t m_last_refresh;
uint64_t m_refresh_seq;
ImageUpdateWatchers *m_update_watchers;
bool is_transition_state() const;
bool is_closed() const;

View File

@ -553,6 +553,10 @@ bool ImageWatcher::handle_payload(const HeaderUpdatePayload &payload,
m_image_ctx.state->handle_update_notification();
m_image_ctx.perfcounter->inc(l_librbd_notify);
if (ack_ctx != nullptr) {
m_image_ctx.state->flush_update_watchers(new C_ResponseMessage(ack_ctx));
return false;
}
return true;
}

View File

@ -31,6 +31,30 @@ CloseRequest<I>::CloseRequest(I *image_ctx, Context *on_finish)
template <typename I>
void CloseRequest<I>::send() {
send_shut_down_update_watchers();
}
template <typename I>
void CloseRequest<I>::send_shut_down_update_watchers() {
CephContext *cct = m_image_ctx->cct;
ldout(cct, 10) << this << " " << __func__ << dendl;
m_image_ctx->state->shut_down_update_watchers(create_async_context_callback(
*m_image_ctx, create_context_callback<
CloseRequest<I>, &CloseRequest<I>::handle_shut_down_update_watchers>(this)));
}
template <typename I>
void CloseRequest<I>::handle_shut_down_update_watchers(int r) {
CephContext *cct = m_image_ctx->cct;
ldout(cct, 10) << this << " " << __func__ << ": r=" << r << dendl;
save_result(r);
if (r < 0) {
lderr(cct) << "failed to shut down update watchers: " << cpp_strerror(r)
<< dendl;
}
send_unregister_image_watcher();
}

View File

@ -30,6 +30,9 @@ private:
* <start>
* |
* v
* SHUT_DOWN_UPDATE_WATCHERS
* |
* v
* UNREGISTER_IMAGE_WATCHER
* |
* v
@ -71,6 +74,9 @@ private:
decltype(m_image_ctx->exclusive_lock) m_exclusive_lock;
void send_shut_down_update_watchers();
void handle_shut_down_update_watchers(int r);
void send_unregister_image_watcher();
void handle_unregister_image_watcher(int r);

View File

@ -128,6 +128,19 @@ struct C_CloseComplete : public Context {
}
};
struct C_UpdateWatchCB : public librbd::UpdateWatchCtx {
rbd_update_callback_t watch_cb;
void *arg;
uint64_t handle = 0;
C_UpdateWatchCB(rbd_update_callback_t watch_cb, void *arg) :
watch_cb(watch_cb), arg(arg) {
}
void handle_notify() {
watch_cb(arg);
}
};
void mirror_image_info_cpp_to_c(const librbd::mirror_image_info_t &cpp_info,
rbd_mirror_image_info_t *c_info) {
c_info->global_id = strdup(cpp_info.global_id.c_str());
@ -1358,6 +1371,22 @@ namespace librbd {
status_size);
}
int Image::update_watch(UpdateWatchCtx *wctx, uint64_t *handle) {
ImageCtx *ictx = (ImageCtx *)ctx;
tracepoint(librbd, update_watch_enter, ictx, wctx);
int r = ictx->state->register_update_watcher(wctx, handle);
tracepoint(librbd, update_watch_exit, r, *handle);
return r;
}
int Image::update_unwatch(uint64_t handle) {
ImageCtx *ictx = (ImageCtx *)ctx;
tracepoint(librbd, update_unwatch_enter, ictx, handle);
int r = ictx->state->unregister_update_watcher(handle);
tracepoint(librbd, update_unwatch_exit, r);
return r;
}
} // namespace librbd
extern "C" void rbd_version(int *major, int *minor, int *extra)
@ -2832,6 +2861,29 @@ extern "C" int rbd_mirror_image_get_status(rbd_image_t image,
return 0;
}
extern "C" int rbd_update_watch(rbd_image_t image, uint64_t *handle,
rbd_update_callback_t watch_cb, void *arg)
{
librbd::ImageCtx *ictx = (librbd::ImageCtx *)image;
C_UpdateWatchCB *wctx = new C_UpdateWatchCB(watch_cb, arg);
tracepoint(librbd, update_watch_enter, ictx, wctx);
int r = ictx->state->register_update_watcher(wctx, &wctx->handle);
tracepoint(librbd, update_watch_exit, r, wctx->handle);
*handle = reinterpret_cast<uint64_t>(wctx);
return r;
}
extern "C" int rbd_update_unwatch(rbd_image_t image, uint64_t handle)
{
librbd::ImageCtx *ictx = (librbd::ImageCtx *)image;
C_UpdateWatchCB *wctx = reinterpret_cast<C_UpdateWatchCB *>(handle);
tracepoint(librbd, update_unwatch_enter, ictx, wctx->handle);
int r = ictx->state->unregister_update_watcher(wctx->handle);
delete wctx;
tracepoint(librbd, update_unwatch_exit, r);
return r;
}
extern "C" int rbd_aio_is_complete(rbd_completion_t c)
{
librbd::RBD::AioCompletion *comp = (librbd::RBD::AioCompletion *)c;

View File

@ -23,6 +23,7 @@
#include "gtest/gtest.h"
#include <chrono>
#include <errno.h>
#include <stdarg.h>
#include <stdio.h>
@ -39,6 +40,8 @@
#include "test/librados/test.h"
#include "test/librbd/test_support.h"
#include "common/Cond.h"
#include "common/Mutex.h"
#include "common/errno.h"
#include "common/event_socket.h"
#include "include/interval_set.h"
@ -53,6 +56,8 @@
using namespace std;
using std::chrono::seconds;
#define ASSERT_PASSED(x, args...) \
do { \
bool passed = false; \
@ -455,6 +460,111 @@ TEST_F(TestLibRBD, ResizeAndStatPP)
ioctx.close();
}
TEST_F(TestLibRBD, UpdateWatchAndResize)
{
rados_ioctx_t ioctx;
rados_ioctx_create(_cluster, m_pool_name.c_str(), &ioctx);
rbd_image_t image;
int order = 0;
std::string name = get_temp_image_name();
uint64_t size = 2 << 20;
struct Watcher {
static void cb(void *arg) {
Watcher *watcher = (Watcher *)arg;
watcher->handle_notify();
}
Watcher(rbd_image_t &image) : m_image(image), m_lock("lock") {}
void handle_notify() {
rbd_image_info_t info;
ASSERT_EQ(0, rbd_stat(m_image, &info, sizeof(info)));
Mutex::Locker locker(m_lock);
m_size = info.size;
m_cond.Signal();
}
void wait_for_size(size_t size) {
Mutex::Locker locker(m_lock);
while (m_size != size) {
CephContext* cct = reinterpret_cast<CephContext*>(_rados.cct());
ASSERT_EQ(0, m_cond.WaitInterval(cct, m_lock, seconds(5)));
}
}
rbd_image_t &m_image;
Mutex m_lock;
Cond m_cond;
size_t m_size = 0;
} watcher(image);
uint64_t handle;
ASSERT_EQ(0, create_image(ioctx, name.c_str(), size, &order));
ASSERT_EQ(0, rbd_open(ioctx, name.c_str(), &image, NULL));
ASSERT_EQ(0, rbd_update_watch(image, &handle, Watcher::cb, &watcher));
ASSERT_EQ(0, rbd_resize(image, size * 4));
watcher.wait_for_size(size * 4);
ASSERT_EQ(0, rbd_resize(image, size / 2));
watcher.wait_for_size(size / 2);
ASSERT_EQ(0, rbd_update_unwatch(image, handle));
ASSERT_EQ(0, rbd_close(image));
rados_ioctx_destroy(ioctx);
}
TEST_F(TestLibRBD, UpdateWatchAndResizePP)
{
librados::IoCtx ioctx;
ASSERT_EQ(0, _rados.ioctx_create(m_pool_name.c_str(), ioctx));
{
librbd::RBD rbd;
librbd::Image image;
int order = 0;
std::string name = get_temp_image_name();
uint64_t size = 2 << 20;
struct Watcher : public librbd::UpdateWatchCtx {
Watcher(librbd::Image &image) : m_image(image), m_lock("lock") {
}
void handle_notify() {
librbd::image_info_t info;
ASSERT_EQ(0, m_image.stat(info, sizeof(info)));
Mutex::Locker locker(m_lock);
m_size = info.size;
m_cond.Signal();
}
void wait_for_size(size_t size) {
Mutex::Locker locker(m_lock);
while (m_size != size) {
CephContext* cct = reinterpret_cast<CephContext*>(_rados.cct());
ASSERT_EQ(0, m_cond.WaitInterval(cct, m_lock, seconds(5)));
}
}
librbd::Image &m_image;
Mutex m_lock;
Cond m_cond;
size_t m_size = 0;
} watcher(image);
uint64_t handle;
ASSERT_EQ(0, create_image_pp(rbd, ioctx, name.c_str(), size, &order));
ASSERT_EQ(0, rbd.open(ioctx, image, name.c_str(), NULL));
ASSERT_EQ(0, image.update_watch(&watcher, &handle));
ASSERT_EQ(0, image.resize(size * 4));
watcher.wait_for_size(size * 4);
ASSERT_EQ(0, image.resize(size / 2));
watcher.wait_for_size(size / 2);
ASSERT_EQ(0, image.update_unwatch(handle));
}
ioctx.close();
}
int test_ls(rados_ioctx_t io_ctx, size_t num_expected, ...)
{
int num_images, i;

View File

@ -1901,3 +1901,41 @@ TRACEPOINT_EVENT(librbd, group_list_exit,
ctf_integer(int, retval, retval)
)
)
TRACEPOINT_EVENT(librbd, update_watch_enter,
TP_ARGS(
void*, imagectx,
void*, watchctx),
TP_FIELDS(
ctf_integer_hex(void*, imagctx, imagectx)
ctf_integer_hex(void*, watchctx, watchctx)
)
)
TRACEPOINT_EVENT(librbd, update_watch_exit,
TP_ARGS(
int, retval,
uint64_t, handle),
TP_FIELDS(
ctf_integer(int, retval, retval)
ctf_integer(uint64_t, handle, handle)
)
)
TRACEPOINT_EVENT(librbd, update_unwatch_enter,
TP_ARGS(
void*, imagectx,
uint64_t, handle),
TP_FIELDS(
ctf_integer_hex(void*, imagctx, imagectx)
ctf_integer(uint64_t, handle, handle)
)
)
TRACEPOINT_EVENT(librbd, update_unwatch_exit,
TP_ARGS(
int, retval),
TP_FIELDS(
ctf_integer(int, retval, retval)
)
)