mirror of
https://github.com/ceph/ceph
synced 2025-01-04 10:12:30 +00:00
rbd: ImageWatcher refactoring using Watcher super class
Signed-off-by: Ricardo Dias <rdias@suse.com>
This commit is contained in:
parent
653df23bf0
commit
7295969a30
@ -44,7 +44,6 @@ set(librbd_internal_srcs
|
||||
image/SetFlagsRequest.cc
|
||||
image/SetSnapRequest.cc
|
||||
image_watcher/NotifyLockOwner.cc
|
||||
image_watcher/RewatchRequest.cc
|
||||
journal/RemoveRequest.cc
|
||||
journal/CreateRequest.cc
|
||||
journal/OpenRequest.cc
|
||||
|
@ -12,7 +12,6 @@
|
||||
#include "librbd/Utils.h"
|
||||
#include "librbd/exclusive_lock/Policy.h"
|
||||
#include "librbd/image_watcher/NotifyLockOwner.h"
|
||||
#include "librbd/image_watcher/RewatchRequest.h"
|
||||
#include "include/encoding.h"
|
||||
#include "common/errno.h"
|
||||
#include "common/WorkQueue.h"
|
||||
@ -29,62 +28,18 @@ using namespace watch_notify;
|
||||
using util::create_async_context_callback;
|
||||
using util::create_context_callback;
|
||||
using util::create_rados_safe_callback;
|
||||
|
||||
namespace {
|
||||
|
||||
struct C_UnwatchAndFlush : public Context {
|
||||
librados::Rados rados;
|
||||
Context *on_finish;
|
||||
bool flushing = false;
|
||||
int ret_val = 0;
|
||||
|
||||
C_UnwatchAndFlush(librados::IoCtx &io_ctx, Context *on_finish)
|
||||
: rados(io_ctx), on_finish(on_finish) {
|
||||
}
|
||||
|
||||
virtual void complete(int r) override {
|
||||
if (ret_val == 0 && r < 0) {
|
||||
ret_val = r;
|
||||
}
|
||||
|
||||
if (!flushing) {
|
||||
flushing = true;
|
||||
|
||||
librados::AioCompletion *aio_comp = create_rados_safe_callback(this);
|
||||
r = rados.aio_watch_flush(aio_comp);
|
||||
assert(r == 0);
|
||||
aio_comp->release();
|
||||
return;
|
||||
}
|
||||
|
||||
// ensure our reference to the RadosClient is released prior
|
||||
// to completing the callback to avoid racing an explicit
|
||||
// librados shutdown
|
||||
Context *ctx = on_finish;
|
||||
r = ret_val;
|
||||
delete this;
|
||||
|
||||
ctx->complete(r);
|
||||
}
|
||||
|
||||
virtual void finish(int r) override {
|
||||
}
|
||||
};
|
||||
|
||||
} // anonymous namespace
|
||||
using librbd::watcher::HandlePayloadVisitor;
|
||||
using librbd::watcher::C_NotifyAck;
|
||||
|
||||
static const double RETRY_DELAY_SECONDS = 1.0;
|
||||
|
||||
template <typename I>
|
||||
ImageWatcher<I>::ImageWatcher(I &image_ctx)
|
||||
: m_image_ctx(image_ctx),
|
||||
m_watch_lock(util::unique_lock_name("librbd::ImageWatcher::m_watch_lock", this)),
|
||||
m_watch_ctx(*this), m_watch_handle(0),
|
||||
m_watch_state(WATCH_STATE_UNREGISTERED),
|
||||
: Watcher(image_ctx.md_ctx, image_ctx.op_work_queue, image_ctx.header_oid),
|
||||
m_image_ctx(image_ctx),
|
||||
m_task_finisher(new TaskFinisher<Task>(*m_image_ctx.cct)),
|
||||
m_async_request_lock(util::unique_lock_name("librbd::ImageWatcher::m_async_request_lock", this)),
|
||||
m_owner_client_id_lock(util::unique_lock_name("librbd::ImageWatcher::m_owner_client_id_lock", this)),
|
||||
m_notifier(image_ctx.op_work_queue, image_ctx.md_ctx, image_ctx.header_oid)
|
||||
m_owner_client_id_lock(util::unique_lock_name("librbd::ImageWatcher::m_owner_client_id_lock", this))
|
||||
{
|
||||
}
|
||||
|
||||
@ -92,35 +47,6 @@ template <typename I>
|
||||
ImageWatcher<I>::~ImageWatcher()
|
||||
{
|
||||
delete m_task_finisher;
|
||||
{
|
||||
RWLock::RLocker l(m_watch_lock);
|
||||
assert(m_watch_state != WATCH_STATE_REGISTERED);
|
||||
}
|
||||
}
|
||||
|
||||
template <typename I>
|
||||
void ImageWatcher<I>::register_watch(Context *on_finish) {
|
||||
ldout(m_image_ctx.cct, 10) << this << " registering image watcher" << dendl;
|
||||
|
||||
RWLock::RLocker watch_locker(m_watch_lock);
|
||||
assert(m_watch_state == WATCH_STATE_UNREGISTERED);
|
||||
librados::AioCompletion *aio_comp = create_rados_safe_callback(
|
||||
new C_RegisterWatch(this, on_finish));
|
||||
int r = m_image_ctx.md_ctx.aio_watch(m_image_ctx.header_oid, aio_comp,
|
||||
&m_watch_handle, &m_watch_ctx);
|
||||
assert(r == 0);
|
||||
aio_comp->release();
|
||||
}
|
||||
|
||||
template <typename I>
|
||||
void ImageWatcher<I>::handle_register_watch(int r) {
|
||||
RWLock::WLocker watch_locker(m_watch_lock);
|
||||
assert(m_watch_state == WATCH_STATE_UNREGISTERED);
|
||||
if (r < 0) {
|
||||
m_watch_handle = 0;
|
||||
} else if (r >= 0) {
|
||||
m_watch_state = WATCH_STATE_REGISTERED;
|
||||
}
|
||||
}
|
||||
|
||||
template <typename I>
|
||||
@ -130,42 +56,10 @@ void ImageWatcher<I>::unregister_watch(Context *on_finish) {
|
||||
|
||||
cancel_async_requests();
|
||||
|
||||
C_Gather *gather_ctx = nullptr;
|
||||
{
|
||||
RWLock::WLocker watch_locker(m_watch_lock);
|
||||
if (m_watch_state == WATCH_STATE_REWATCHING) {
|
||||
ldout(cct, 10) << this << " delaying unregister until rewatch completed"
|
||||
<< dendl;
|
||||
|
||||
assert(m_unregister_watch_ctx == nullptr);
|
||||
m_unregister_watch_ctx = new FunctionContext([this, on_finish](int r) {
|
||||
unregister_watch(on_finish);
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
gather_ctx = new C_Gather(m_image_ctx.cct, create_async_context_callback(
|
||||
m_image_ctx, on_finish));
|
||||
if (m_watch_state == WATCH_STATE_REGISTERED ||
|
||||
m_watch_state == WATCH_STATE_ERROR) {
|
||||
m_watch_state = WATCH_STATE_UNREGISTERED;
|
||||
|
||||
librados::AioCompletion *aio_comp = create_rados_safe_callback(
|
||||
new C_UnwatchAndFlush(m_image_ctx.md_ctx, gather_ctx->new_sub()));
|
||||
int r = m_image_ctx.md_ctx.aio_unwatch(m_watch_handle, aio_comp);
|
||||
assert(r == 0);
|
||||
aio_comp->release();
|
||||
}
|
||||
}
|
||||
|
||||
assert(gather_ctx != nullptr);
|
||||
m_task_finisher->cancel_all(gather_ctx->new_sub());
|
||||
gather_ctx->activate();
|
||||
}
|
||||
|
||||
template <typename I>
|
||||
void ImageWatcher<I>::flush(Context *on_finish) {
|
||||
m_notifier.flush(on_finish);
|
||||
FunctionContext *ctx = new FunctionContext([this, on_finish](int r) {
|
||||
m_task_finisher->cancel_all(on_finish);
|
||||
});
|
||||
Watcher::unregister_watch(ctx);
|
||||
}
|
||||
|
||||
template <typename I>
|
||||
@ -184,9 +78,7 @@ int ImageWatcher<I>::notify_async_progress(const AsyncRequestId &request,
|
||||
<< request << " @ " << offset
|
||||
<< "/" << total << dendl;
|
||||
|
||||
bufferlist bl;
|
||||
::encode(NotifyMessage(AsyncProgressPayload(request, offset, total)), bl);
|
||||
m_notifier.notify(bl, nullptr, nullptr);
|
||||
send_notify(AsyncProgressPayload(request, offset, total));
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -204,11 +96,9 @@ void ImageWatcher<I>::notify_async_complete(const AsyncRequestId &request,
|
||||
ldout(m_image_ctx.cct, 20) << this << " remote async request finished: "
|
||||
<< request << " = " << r << dendl;
|
||||
|
||||
bufferlist bl;
|
||||
::encode(NotifyMessage(AsyncCompletePayload(request, r)), bl);
|
||||
m_notifier.notify(bl, nullptr, new FunctionContext(
|
||||
boost::bind(&ImageWatcher<I>::handle_async_complete, this, request, r,
|
||||
_1)));
|
||||
send_notify(AsyncCompletePayload(request, r),
|
||||
new FunctionContext(boost::bind(&ImageWatcher<I>::handle_async_complete,
|
||||
this, request, r, _1)));
|
||||
}
|
||||
|
||||
template <typename I>
|
||||
@ -239,9 +129,8 @@ void ImageWatcher<I>::notify_flatten(uint64_t request_id,
|
||||
|
||||
AsyncRequestId async_request_id(get_client_id(), request_id);
|
||||
|
||||
bufferlist bl;
|
||||
::encode(NotifyMessage(FlattenPayload(async_request_id)), bl);
|
||||
notify_async_request(async_request_id, std::move(bl), prog_ctx, on_finish);
|
||||
notify_async_request(async_request_id, FlattenPayload(async_request_id),
|
||||
prog_ctx, on_finish);
|
||||
}
|
||||
|
||||
template <typename I>
|
||||
@ -255,9 +144,9 @@ void ImageWatcher<I>::notify_resize(uint64_t request_id, uint64_t size,
|
||||
|
||||
AsyncRequestId async_request_id(get_client_id(), request_id);
|
||||
|
||||
bufferlist bl;
|
||||
::encode(NotifyMessage(ResizePayload(size, allow_shrink, async_request_id)), bl);
|
||||
notify_async_request(async_request_id, std::move(bl), prog_ctx, on_finish);
|
||||
notify_async_request(async_request_id,
|
||||
ResizePayload(size, allow_shrink, async_request_id),
|
||||
prog_ctx, on_finish);
|
||||
}
|
||||
|
||||
template <typename I>
|
||||
@ -268,9 +157,7 @@ void ImageWatcher<I>::notify_snap_create(const std::string &snap_name,
|
||||
assert(m_image_ctx.exclusive_lock &&
|
||||
!m_image_ctx.exclusive_lock->is_lock_owner());
|
||||
|
||||
bufferlist bl;
|
||||
::encode(NotifyMessage(SnapCreatePayload(snap_name, snap_namespace)), bl);
|
||||
notify_lock_owner(std::move(bl), on_finish);
|
||||
notify_lock_owner(SnapCreatePayload(snap_name, snap_namespace), on_finish);
|
||||
}
|
||||
|
||||
template <typename I>
|
||||
@ -281,9 +168,7 @@ void ImageWatcher<I>::notify_snap_rename(const snapid_t &src_snap_id,
|
||||
assert(m_image_ctx.exclusive_lock &&
|
||||
!m_image_ctx.exclusive_lock->is_lock_owner());
|
||||
|
||||
bufferlist bl;
|
||||
::encode(NotifyMessage(SnapRenamePayload(src_snap_id, dst_snap_name)), bl);
|
||||
notify_lock_owner(std::move(bl), on_finish);
|
||||
notify_lock_owner(SnapRenamePayload(src_snap_id, dst_snap_name), on_finish);
|
||||
}
|
||||
|
||||
template <typename I>
|
||||
@ -293,9 +178,7 @@ void ImageWatcher<I>::notify_snap_remove(const std::string &snap_name,
|
||||
assert(m_image_ctx.exclusive_lock &&
|
||||
!m_image_ctx.exclusive_lock->is_lock_owner());
|
||||
|
||||
bufferlist bl;
|
||||
::encode(NotifyMessage(SnapRemovePayload(snap_name)), bl);
|
||||
notify_lock_owner(std::move(bl), on_finish);
|
||||
notify_lock_owner(SnapRemovePayload(snap_name), on_finish);
|
||||
}
|
||||
|
||||
template <typename I>
|
||||
@ -305,9 +188,7 @@ void ImageWatcher<I>::notify_snap_protect(const std::string &snap_name,
|
||||
assert(m_image_ctx.exclusive_lock &&
|
||||
!m_image_ctx.exclusive_lock->is_lock_owner());
|
||||
|
||||
bufferlist bl;
|
||||
::encode(NotifyMessage(SnapProtectPayload(snap_name)), bl);
|
||||
notify_lock_owner(std::move(bl), on_finish);
|
||||
notify_lock_owner(SnapProtectPayload(snap_name), on_finish);
|
||||
}
|
||||
|
||||
template <typename I>
|
||||
@ -317,9 +198,7 @@ void ImageWatcher<I>::notify_snap_unprotect(const std::string &snap_name,
|
||||
assert(m_image_ctx.exclusive_lock &&
|
||||
!m_image_ctx.exclusive_lock->is_lock_owner());
|
||||
|
||||
bufferlist bl;
|
||||
::encode(NotifyMessage(SnapUnprotectPayload(snap_name)), bl);
|
||||
notify_lock_owner(std::move(bl), on_finish);
|
||||
notify_lock_owner(SnapUnprotectPayload(snap_name), on_finish);
|
||||
}
|
||||
|
||||
template <typename I>
|
||||
@ -332,9 +211,9 @@ void ImageWatcher<I>::notify_rebuild_object_map(uint64_t request_id,
|
||||
|
||||
AsyncRequestId async_request_id(get_client_id(), request_id);
|
||||
|
||||
bufferlist bl;
|
||||
::encode(NotifyMessage(RebuildObjectMapPayload(async_request_id)), bl);
|
||||
notify_async_request(async_request_id, std::move(bl), prog_ctx, on_finish);
|
||||
notify_async_request(async_request_id,
|
||||
RebuildObjectMapPayload(async_request_id),
|
||||
prog_ctx, on_finish);
|
||||
}
|
||||
|
||||
template <typename I>
|
||||
@ -344,9 +223,7 @@ void ImageWatcher<I>::notify_rename(const std::string &image_name,
|
||||
assert(m_image_ctx.exclusive_lock &&
|
||||
!m_image_ctx.exclusive_lock->is_lock_owner());
|
||||
|
||||
bufferlist bl;
|
||||
::encode(NotifyMessage(RenamePayload(image_name)), bl);
|
||||
notify_lock_owner(std::move(bl), on_finish);
|
||||
notify_lock_owner(RenamePayload(image_name), on_finish);
|
||||
}
|
||||
|
||||
template <typename I>
|
||||
@ -356,9 +233,7 @@ void ImageWatcher<I>::notify_update_features(uint64_t features, bool enabled,
|
||||
assert(m_image_ctx.exclusive_lock &&
|
||||
!m_image_ctx.exclusive_lock->is_lock_owner());
|
||||
|
||||
bufferlist bl;
|
||||
::encode(NotifyMessage(UpdateFeaturesPayload(features, enabled)), bl);
|
||||
notify_lock_owner(std::move(bl), on_finish);
|
||||
notify_lock_owner(UpdateFeaturesPayload(features, enabled), on_finish);
|
||||
}
|
||||
|
||||
template <typename I>
|
||||
@ -366,9 +241,7 @@ void ImageWatcher<I>::notify_header_update(Context *on_finish) {
|
||||
ldout(m_image_ctx.cct, 10) << this << ": " << __func__ << dendl;
|
||||
|
||||
// supports legacy (empty buffer) clients
|
||||
bufferlist bl;
|
||||
::encode(NotifyMessage(HeaderUpdatePayload()), bl);
|
||||
m_notifier.notify(bl, nullptr, on_finish);
|
||||
send_notify(HeaderUpdatePayload(), on_finish);
|
||||
}
|
||||
|
||||
template <typename I>
|
||||
@ -377,7 +250,7 @@ void ImageWatcher<I>::notify_header_update(librados::IoCtx &io_ctx,
|
||||
// supports legacy (empty buffer) clients
|
||||
bufferlist bl;
|
||||
::encode(NotifyMessage(HeaderUpdatePayload()), bl);
|
||||
io_ctx.notify2(oid, bl, object_watcher::Notifier::NOTIFY_TIMEOUT, nullptr);
|
||||
io_ctx.notify2(oid, bl, watcher::Notifier::NOTIFY_TIMEOUT, nullptr);
|
||||
}
|
||||
|
||||
template <typename I>
|
||||
@ -408,8 +281,8 @@ void ImageWatcher<I>::set_owner_client_id(const ClientId& client_id) {
|
||||
|
||||
template <typename I>
|
||||
ClientId ImageWatcher<I>::get_client_id() {
|
||||
RWLock::RLocker l(m_watch_lock);
|
||||
return ClientId(m_image_ctx.md_ctx.get_instance_id(), m_watch_handle);
|
||||
RWLock::RLocker l(this->m_watch_lock);
|
||||
return ClientId(m_image_ctx.md_ctx.get_instance_id(), this->m_watch_handle);
|
||||
}
|
||||
|
||||
template <typename I>
|
||||
@ -422,9 +295,7 @@ void ImageWatcher<I>::notify_acquired_lock() {
|
||||
set_owner_client_id(client_id);
|
||||
}
|
||||
|
||||
bufferlist bl;
|
||||
::encode(NotifyMessage(AcquiredLockPayload(client_id)), bl);
|
||||
m_notifier.notify(bl, nullptr, nullptr);
|
||||
send_notify(AcquiredLockPayload(client_id));
|
||||
}
|
||||
|
||||
template <typename I>
|
||||
@ -436,9 +307,7 @@ void ImageWatcher<I>::notify_released_lock() {
|
||||
set_owner_client_id(ClientId());
|
||||
}
|
||||
|
||||
bufferlist bl;
|
||||
::encode(NotifyMessage(ReleasedLockPayload(get_client_id())), bl);
|
||||
m_notifier.notify(bl, nullptr, nullptr);
|
||||
send_notify(ReleasedLockPayload(get_client_id()));
|
||||
}
|
||||
|
||||
template <typename I>
|
||||
@ -452,8 +321,8 @@ void ImageWatcher<I>::schedule_request_lock(bool use_timer, int timer_delay) {
|
||||
assert(m_image_ctx.exclusive_lock &&
|
||||
!m_image_ctx.exclusive_lock->is_lock_owner());
|
||||
|
||||
RWLock::RLocker watch_locker(m_watch_lock);
|
||||
if (m_watch_state == WATCH_STATE_REGISTERED) {
|
||||
RWLock::RLocker watch_locker(this->m_watch_lock);
|
||||
if (this->m_watch_state == Watcher::WATCH_STATE_REGISTERED) {
|
||||
ldout(m_image_ctx.cct, 15) << this << " requesting exclusive lock" << dendl;
|
||||
|
||||
FunctionContext *ctx = new FunctionContext(
|
||||
@ -462,8 +331,8 @@ void ImageWatcher<I>::schedule_request_lock(bool use_timer, int timer_delay) {
|
||||
if (timer_delay < 0) {
|
||||
timer_delay = RETRY_DELAY_SECONDS;
|
||||
}
|
||||
m_task_finisher->add_event_after(TASK_CODE_REQUEST_LOCK, timer_delay,
|
||||
ctx);
|
||||
m_task_finisher->add_event_after(TASK_CODE_REQUEST_LOCK,
|
||||
timer_delay, ctx);
|
||||
} else {
|
||||
m_task_finisher->queue(TASK_CODE_REQUEST_LOCK, ctx);
|
||||
}
|
||||
@ -484,10 +353,9 @@ void ImageWatcher<I>::notify_request_lock() {
|
||||
|
||||
ldout(m_image_ctx.cct, 10) << this << " notify request lock" << dendl;
|
||||
|
||||
bufferlist bl;
|
||||
::encode(NotifyMessage(RequestLockPayload(get_client_id(), false)), bl);
|
||||
notify_lock_owner(std::move(bl), create_context_callback<
|
||||
ImageWatcher, &ImageWatcher<I>::handle_request_lock>(this));
|
||||
notify_lock_owner(RequestLockPayload(get_client_id(), false),
|
||||
create_context_callback<
|
||||
ImageWatcher, &ImageWatcher<I>::handle_request_lock>(this));
|
||||
}
|
||||
|
||||
template <typename I>
|
||||
@ -521,11 +389,16 @@ void ImageWatcher<I>::handle_request_lock(int r) {
|
||||
}
|
||||
|
||||
template <typename I>
|
||||
void ImageWatcher<I>::notify_lock_owner(bufferlist &&bl, Context *on_finish) {
|
||||
void ImageWatcher<I>::notify_lock_owner(const Payload& payload,
|
||||
Context *on_finish) {
|
||||
assert(on_finish != nullptr);
|
||||
assert(m_image_ctx.owner_lock.is_locked());
|
||||
|
||||
bufferlist bl;
|
||||
::encode(NotifyMessage(payload), bl);
|
||||
|
||||
NotifyLockOwner *notify_lock_owner = NotifyLockOwner::create(
|
||||
m_image_ctx, m_notifier, std::move(bl), on_finish);
|
||||
m_image_ctx, this->m_notifier, std::move(bl), on_finish);
|
||||
notify_lock_owner->send();
|
||||
}
|
||||
|
||||
@ -552,7 +425,8 @@ void ImageWatcher<I>::schedule_async_request_timed_out(const AsyncRequestId &id)
|
||||
Task task(TASK_CODE_ASYNC_REQUEST, id);
|
||||
m_task_finisher->cancel(task);
|
||||
|
||||
m_task_finisher->add_event_after(task, m_image_ctx.request_timed_out_seconds, ctx);
|
||||
m_task_finisher->add_event_after(task, m_image_ctx.request_timed_out_seconds,
|
||||
ctx);
|
||||
}
|
||||
|
||||
template <typename I>
|
||||
@ -566,9 +440,9 @@ void ImageWatcher<I>::async_request_timed_out(const AsyncRequestId &id) {
|
||||
|
||||
template <typename I>
|
||||
void ImageWatcher<I>::notify_async_request(const AsyncRequestId &async_request_id,
|
||||
bufferlist &&in,
|
||||
ProgressContext& prog_ctx,
|
||||
Context *on_finish) {
|
||||
const Payload& payload,
|
||||
ProgressContext& prog_ctx,
|
||||
Context *on_finish) {
|
||||
assert(on_finish != nullptr);
|
||||
assert(m_image_ctx.owner_lock.is_locked());
|
||||
|
||||
@ -584,6 +458,7 @@ void ImageWatcher<I>::notify_async_request(const AsyncRequestId &async_request_i
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Context *on_complete = new FunctionContext(
|
||||
[this, async_request_id, on_finish](int r) {
|
||||
m_task_finisher->cancel(Task(TASK_CODE_ASYNC_REQUEST, async_request_id));
|
||||
@ -596,7 +471,7 @@ void ImageWatcher<I>::notify_async_request(const AsyncRequestId &async_request_i
|
||||
}
|
||||
|
||||
schedule_async_request_timed_out(async_request_id);
|
||||
notify_lock_owner(std::move(in), on_notify);
|
||||
notify_lock_owner(payload, on_notify);
|
||||
}
|
||||
|
||||
template <typename I>
|
||||
@ -1003,9 +878,11 @@ void ImageWatcher<I>::process_payload(uint64_t notify_id, uint64_t handle,
|
||||
const Payload &payload, int r) {
|
||||
if (r < 0) {
|
||||
bufferlist out_bl;
|
||||
acknowledge_notify(notify_id, handle, out_bl);
|
||||
this->acknowledge_notify(notify_id, handle, out_bl);
|
||||
} else {
|
||||
apply_visitor(HandlePayloadVisitor(this, notify_id, handle), payload);
|
||||
apply_visitor(HandlePayloadVisitor<ImageWatcher<I>>(this, notify_id,
|
||||
handle),
|
||||
payload);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1047,81 +924,32 @@ void ImageWatcher<I>::handle_error(uint64_t handle, int err) {
|
||||
set_owner_client_id(ClientId());
|
||||
}
|
||||
|
||||
RWLock::WLocker l(m_watch_lock);
|
||||
if (m_watch_state == WATCH_STATE_REGISTERED) {
|
||||
m_watch_state = WATCH_STATE_ERROR;
|
||||
|
||||
FunctionContext *ctx = new FunctionContext(
|
||||
boost::bind(&ImageWatcher<I>::rewatch, this));
|
||||
m_task_finisher->queue(TASK_CODE_REREGISTER_WATCH, ctx);
|
||||
}
|
||||
Watcher::handle_error(handle, err);
|
||||
}
|
||||
|
||||
template <typename I>
|
||||
void ImageWatcher<I>::acknowledge_notify(uint64_t notify_id, uint64_t handle,
|
||||
bufferlist &out) {
|
||||
m_image_ctx.md_ctx.notify_ack(m_image_ctx.header_oid, notify_id, handle, out);
|
||||
}
|
||||
|
||||
template <typename I>
|
||||
void ImageWatcher<I>::rewatch() {
|
||||
ldout(m_image_ctx.cct, 10) << this << " re-registering image watch" << dendl;
|
||||
|
||||
RWLock::WLocker l(m_watch_lock);
|
||||
if (m_watch_state != WATCH_STATE_ERROR) {
|
||||
return;
|
||||
}
|
||||
m_watch_state = WATCH_STATE_REWATCHING;
|
||||
|
||||
Context *ctx = create_context_callback<
|
||||
ImageWatcher<I>, &ImageWatcher<I>::handle_rewatch>(this);
|
||||
RewatchRequest<I> *req = RewatchRequest<I>::create(m_image_ctx, m_watch_lock,
|
||||
&m_watch_ctx,
|
||||
&m_watch_handle, ctx);
|
||||
req->send();
|
||||
}
|
||||
|
||||
template <typename I>
|
||||
void ImageWatcher<I>::handle_rewatch(int r) {
|
||||
void ImageWatcher<I>::handle_rewatch_complete(int r) {
|
||||
CephContext *cct = m_image_ctx.cct;
|
||||
ldout(cct, 10) << this << " " << __func__ << ": r=" << r << dendl;
|
||||
|
||||
WatchState next_watch_state = WATCH_STATE_REGISTERED;
|
||||
if (r < 0) {
|
||||
// only EBLACKLISTED or ENOENT can be returned
|
||||
assert(r == -EBLACKLISTED || r == -ENOENT);
|
||||
next_watch_state = WATCH_STATE_UNREGISTERED;
|
||||
}
|
||||
|
||||
Context *unregister_watch_ctx = nullptr;
|
||||
{
|
||||
RWLock::WLocker watch_locker(m_watch_lock);
|
||||
assert(m_watch_state == WATCH_STATE_REWATCHING);
|
||||
m_watch_state = next_watch_state;
|
||||
|
||||
std::swap(unregister_watch_ctx, m_unregister_watch_ctx);
|
||||
|
||||
// image might have been updated while we didn't have active watch
|
||||
handle_payload(HeaderUpdatePayload(), nullptr);
|
||||
RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
|
||||
if (m_image_ctx.exclusive_lock != nullptr) {
|
||||
// update the lock cookie with the new watch handle
|
||||
m_image_ctx.exclusive_lock->reacquire_lock();
|
||||
}
|
||||
}
|
||||
|
||||
// wake up pending unregister request
|
||||
if (unregister_watch_ctx != nullptr) {
|
||||
unregister_watch_ctx->complete(0);
|
||||
}
|
||||
// image might have been updated while we didn't have active watch
|
||||
handle_payload(HeaderUpdatePayload(), nullptr);
|
||||
}
|
||||
|
||||
template <typename I>
|
||||
void ImageWatcher<I>::WatchCtx::handle_notify(uint64_t notify_id,
|
||||
uint64_t handle,
|
||||
uint64_t notifier_id,
|
||||
bufferlist& bl) {
|
||||
image_watcher.handle_notify(notify_id, handle, bl);
|
||||
}
|
||||
void ImageWatcher<I>::send_notify(const Payload &payload, Context *ctx) {
|
||||
bufferlist bl;
|
||||
|
||||
template <typename I>
|
||||
void ImageWatcher<I>::WatchCtx::handle_error(uint64_t handle, int err) {
|
||||
image_watcher.handle_error(handle, err);
|
||||
::encode(NotifyMessage(payload), bl);
|
||||
Watcher::send_notify(bl, nullptr, ctx);
|
||||
}
|
||||
|
||||
template <typename I>
|
||||
@ -1129,28 +957,9 @@ void ImageWatcher<I>::RemoteContext::finish(int r) {
|
||||
m_image_watcher.schedule_async_complete(m_async_request_id, r);
|
||||
}
|
||||
|
||||
template <typename I>
|
||||
ImageWatcher<I>::C_NotifyAck::C_NotifyAck(ImageWatcher *image_watcher,
|
||||
uint64_t notify_id, uint64_t handle)
|
||||
: image_watcher(image_watcher), notify_id(notify_id), handle(handle) {
|
||||
CephContext *cct = image_watcher->m_image_ctx.cct;
|
||||
ldout(cct, 10) << this << " C_NotifyAck start: id=" << notify_id << ", "
|
||||
<< "handle=" << handle << dendl;
|
||||
}
|
||||
|
||||
template <typename I>
|
||||
void ImageWatcher<I>::C_NotifyAck::finish(int r) {
|
||||
assert(r == 0);
|
||||
CephContext *cct = image_watcher->m_image_ctx.cct;
|
||||
ldout(cct, 10) << this << " C_NotifyAck finish: id=" << notify_id << ", "
|
||||
<< "handle=" << handle << dendl;
|
||||
|
||||
image_watcher->acknowledge_notify(notify_id, handle, out);
|
||||
}
|
||||
|
||||
template <typename I>
|
||||
void ImageWatcher<I>::C_ResponseMessage::finish(int r) {
|
||||
CephContext *cct = notify_ack->image_watcher->m_image_ctx.cct;
|
||||
CephContext *cct = notify_ack->cct;
|
||||
ldout(cct, 10) << this << " C_ResponseMessage: r=" << r << dendl;
|
||||
|
||||
::encode(ResponseMessage(r), notify_ack->out);
|
||||
|
@ -9,29 +9,32 @@
|
||||
#include "common/RWLock.h"
|
||||
#include "include/Context.h"
|
||||
#include "include/rbd/librbd.hpp"
|
||||
#include "librbd/object_watcher/Notifier.h"
|
||||
#include "librbd/Watcher.h"
|
||||
#include "librbd/WatchNotifyTypes.h"
|
||||
#include <set>
|
||||
#include <string>
|
||||
#include <utility>
|
||||
#include <boost/variant.hpp>
|
||||
|
||||
class entity_name_t;
|
||||
|
||||
namespace librbd {
|
||||
|
||||
namespace watcher {
|
||||
template <typename> struct HandlePayloadVisitor;
|
||||
}
|
||||
|
||||
class ImageCtx;
|
||||
template <typename T> class TaskFinisher;
|
||||
template <typename> class TaskFinisher;
|
||||
|
||||
template <typename ImageCtxT = ImageCtx>
|
||||
class ImageWatcher {
|
||||
class ImageWatcher : public Watcher {
|
||||
friend struct watcher::HandlePayloadVisitor<ImageWatcher<ImageCtxT>>;
|
||||
|
||||
public:
|
||||
ImageWatcher(ImageCtxT& image_ctx);
|
||||
~ImageWatcher();
|
||||
virtual ~ImageWatcher();
|
||||
|
||||
void register_watch(Context *on_finish);
|
||||
void unregister_watch(Context *on_finish);
|
||||
void flush(Context *on_finish);
|
||||
|
||||
void notify_flatten(uint64_t request_id, ProgressContext &prog_ctx,
|
||||
Context *on_finish);
|
||||
@ -61,19 +64,7 @@ public:
|
||||
static void notify_header_update(librados::IoCtx &io_ctx,
|
||||
const std::string &oid);
|
||||
|
||||
uint64_t get_watch_handle() const {
|
||||
RWLock::RLocker watch_locker(m_watch_lock);
|
||||
return m_watch_handle;
|
||||
}
|
||||
|
||||
private:
|
||||
enum WatchState {
|
||||
WATCH_STATE_UNREGISTERED,
|
||||
WATCH_STATE_REGISTERED,
|
||||
WATCH_STATE_ERROR,
|
||||
WATCH_STATE_REWATCHING
|
||||
};
|
||||
|
||||
enum TaskCode {
|
||||
TASK_CODE_REQUEST_LOCK,
|
||||
TASK_CODE_CANCEL_ASYNC_REQUESTS,
|
||||
@ -105,18 +96,6 @@ private:
|
||||
watch_notify::AsyncRequestId m_async_request_id;
|
||||
};
|
||||
|
||||
struct WatchCtx : public librados::WatchCtx2 {
|
||||
ImageWatcher &image_watcher;
|
||||
|
||||
WatchCtx(ImageWatcher &parent) : image_watcher(parent) {}
|
||||
|
||||
virtual void handle_notify(uint64_t notify_id,
|
||||
uint64_t handle,
|
||||
uint64_t notifier_id,
|
||||
bufferlist& bl);
|
||||
virtual void handle_error(uint64_t handle, int err);
|
||||
};
|
||||
|
||||
class RemoteProgressContext : public ProgressContext {
|
||||
public:
|
||||
RemoteProgressContext(ImageWatcher &image_watcher,
|
||||
@ -158,37 +137,6 @@ private:
|
||||
ProgressContext *m_prog_ctx;
|
||||
};
|
||||
|
||||
struct C_RegisterWatch : public Context {
|
||||
ImageWatcher *image_watcher;
|
||||
Context *on_finish;
|
||||
|
||||
C_RegisterWatch(ImageWatcher *image_watcher, Context *on_finish)
|
||||
: image_watcher(image_watcher), on_finish(on_finish) {
|
||||
}
|
||||
virtual void finish(int r) override {
|
||||
image_watcher->handle_register_watch(r);
|
||||
on_finish->complete(r);
|
||||
}
|
||||
};
|
||||
struct C_NotifyAck : public Context {
|
||||
ImageWatcher *image_watcher;
|
||||
uint64_t notify_id;
|
||||
uint64_t handle;
|
||||
bufferlist out;
|
||||
|
||||
C_NotifyAck(ImageWatcher *image_watcher, uint64_t notify_id,
|
||||
uint64_t handle);
|
||||
virtual void finish(int r);
|
||||
};
|
||||
|
||||
struct C_ResponseMessage : public Context {
|
||||
C_NotifyAck *notify_ack;
|
||||
|
||||
C_ResponseMessage(C_NotifyAck *notify_ack) : notify_ack(notify_ack) {
|
||||
}
|
||||
virtual void finish(int r);
|
||||
};
|
||||
|
||||
struct C_ProcessPayload : public Context {
|
||||
ImageWatcher *image_watcher;
|
||||
uint64_t notify_id;
|
||||
@ -206,35 +154,16 @@ private:
|
||||
}
|
||||
};
|
||||
|
||||
struct HandlePayloadVisitor : public boost::static_visitor<void> {
|
||||
ImageWatcher *image_watcher;
|
||||
uint64_t notify_id;
|
||||
uint64_t handle;
|
||||
struct C_ResponseMessage : public Context {
|
||||
watcher::C_NotifyAck *notify_ack;
|
||||
|
||||
HandlePayloadVisitor(ImageWatcher *image_watcher_, uint64_t notify_id_,
|
||||
uint64_t handle_)
|
||||
: image_watcher(image_watcher_), notify_id(notify_id_), handle(handle_)
|
||||
{
|
||||
}
|
||||
|
||||
template <typename Payload>
|
||||
inline void operator()(const Payload &payload) const {
|
||||
C_NotifyAck *ctx = new C_NotifyAck(image_watcher, notify_id,
|
||||
handle);
|
||||
if (image_watcher->handle_payload(payload, ctx)) {
|
||||
ctx->complete(0);
|
||||
}
|
||||
C_ResponseMessage(watcher::C_NotifyAck *notify_ack) : notify_ack(notify_ack) {
|
||||
}
|
||||
virtual void finish(int r);
|
||||
};
|
||||
|
||||
ImageCtxT &m_image_ctx;
|
||||
|
||||
mutable RWLock m_watch_lock;
|
||||
WatchCtx m_watch_ctx;
|
||||
uint64_t m_watch_handle;
|
||||
WatchState m_watch_state;
|
||||
Context *m_unregister_watch_ctx = nullptr;
|
||||
|
||||
TaskFinisher<Task> *m_task_finisher;
|
||||
|
||||
RWLock m_async_request_lock;
|
||||
@ -244,8 +173,6 @@ private:
|
||||
Mutex m_owner_client_id_lock;
|
||||
watch_notify::ClientId m_owner_client_id;
|
||||
|
||||
object_watcher::Notifier m_notifier;
|
||||
|
||||
void handle_register_watch(int r);
|
||||
|
||||
void schedule_cancel_async_requests();
|
||||
@ -257,13 +184,15 @@ private:
|
||||
void handle_request_lock(int r);
|
||||
void schedule_request_lock(bool use_timer, int timer_delay = -1);
|
||||
|
||||
void notify_lock_owner(bufferlist &&bl, Context *on_finish);
|
||||
void notify_lock_owner(const watch_notify::Payload& payload,
|
||||
Context *on_finish);
|
||||
|
||||
Context *remove_async_request(const watch_notify::AsyncRequestId &id);
|
||||
void schedule_async_request_timed_out(const watch_notify::AsyncRequestId &id);
|
||||
void async_request_timed_out(const watch_notify::AsyncRequestId &id);
|
||||
void notify_async_request(const watch_notify::AsyncRequestId &id,
|
||||
bufferlist &&in, ProgressContext& prog_ctx,
|
||||
const watch_notify::Payload &payload,
|
||||
ProgressContext& prog_ctx,
|
||||
Context *on_finish);
|
||||
|
||||
void schedule_async_progress(const watch_notify::AsyncRequestId &id,
|
||||
@ -280,48 +209,50 @@ private:
|
||||
ProgressContext** prog_ctx);
|
||||
|
||||
bool handle_payload(const watch_notify::HeaderUpdatePayload& payload,
|
||||
C_NotifyAck *ctx);
|
||||
watcher::C_NotifyAck *ctx);
|
||||
bool handle_payload(const watch_notify::AcquiredLockPayload& payload,
|
||||
C_NotifyAck *ctx);
|
||||
watcher::C_NotifyAck *ctx);
|
||||
bool handle_payload(const watch_notify::ReleasedLockPayload& payload,
|
||||
C_NotifyAck *ctx);
|
||||
watcher::C_NotifyAck *ctx);
|
||||
bool handle_payload(const watch_notify::RequestLockPayload& payload,
|
||||
C_NotifyAck *ctx);
|
||||
watcher::C_NotifyAck *ctx);
|
||||
bool handle_payload(const watch_notify::AsyncProgressPayload& payload,
|
||||
C_NotifyAck *ctx);
|
||||
watcher::C_NotifyAck *ctx);
|
||||
bool handle_payload(const watch_notify::AsyncCompletePayload& payload,
|
||||
C_NotifyAck *ctx);
|
||||
watcher::C_NotifyAck *ctx);
|
||||
bool handle_payload(const watch_notify::FlattenPayload& payload,
|
||||
C_NotifyAck *ctx);
|
||||
watcher::C_NotifyAck *ctx);
|
||||
bool handle_payload(const watch_notify::ResizePayload& payload,
|
||||
C_NotifyAck *ctx);
|
||||
watcher::C_NotifyAck *ctx);
|
||||
bool handle_payload(const watch_notify::SnapCreatePayload& payload,
|
||||
C_NotifyAck *ctx);
|
||||
watcher::C_NotifyAck *ctx);
|
||||
bool handle_payload(const watch_notify::SnapRenamePayload& payload,
|
||||
C_NotifyAck *ctx);
|
||||
watcher::C_NotifyAck *ctx);
|
||||
bool handle_payload(const watch_notify::SnapRemovePayload& payload,
|
||||
C_NotifyAck *ctx);
|
||||
watcher::C_NotifyAck *ctx);
|
||||
bool handle_payload(const watch_notify::SnapProtectPayload& payload,
|
||||
C_NotifyAck *ctx);
|
||||
watcher::C_NotifyAck *ctx);
|
||||
bool handle_payload(const watch_notify::SnapUnprotectPayload& payload,
|
||||
C_NotifyAck *ctx);
|
||||
watcher::C_NotifyAck *ctx);
|
||||
bool handle_payload(const watch_notify::RebuildObjectMapPayload& payload,
|
||||
C_NotifyAck *ctx);
|
||||
watcher::C_NotifyAck *ctx);
|
||||
bool handle_payload(const watch_notify::RenamePayload& payload,
|
||||
C_NotifyAck *ctx);
|
||||
watcher::C_NotifyAck *ctx);
|
||||
bool handle_payload(const watch_notify::UpdateFeaturesPayload& payload,
|
||||
C_NotifyAck *ctx);
|
||||
watcher::C_NotifyAck *ctx);
|
||||
bool handle_payload(const watch_notify::UnknownPayload& payload,
|
||||
C_NotifyAck *ctx);
|
||||
watcher::C_NotifyAck *ctx);
|
||||
void process_payload(uint64_t notify_id, uint64_t handle,
|
||||
const watch_notify::Payload &payload, int r);
|
||||
const watch_notify::Payload &payload, int r);
|
||||
|
||||
void handle_notify(uint64_t notify_id, uint64_t handle, bufferlist &bl);
|
||||
void handle_error(uint64_t cookie, int err);
|
||||
void acknowledge_notify(uint64_t notify_id, uint64_t handle, bufferlist &out);
|
||||
virtual void handle_notify(uint64_t notify_id, uint64_t handle,
|
||||
bufferlist &bl);
|
||||
virtual void handle_error(uint64_t cookie, int err);
|
||||
virtual void handle_rewatch_complete(int r);
|
||||
|
||||
void send_notify(const watch_notify::Payload& payload,
|
||||
Context *ctx = nullptr);
|
||||
|
||||
void rewatch();
|
||||
void handle_rewatch(int r);
|
||||
};
|
||||
|
||||
} // namespace librbd
|
||||
|
@ -3,6 +3,7 @@
|
||||
|
||||
#include "cls/rbd/cls_rbd_types.h"
|
||||
#include "librbd/WatchNotifyTypes.h"
|
||||
#include "librbd/watcher/Types.h"
|
||||
#include "include/assert.h"
|
||||
#include "include/stringify.h"
|
||||
#include "common/Formatter.h"
|
||||
@ -20,35 +21,6 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
class EncodePayloadVisitor : public boost::static_visitor<void> {
|
||||
public:
|
||||
explicit EncodePayloadVisitor(bufferlist &bl) : m_bl(bl) {}
|
||||
|
||||
template <typename Payload>
|
||||
inline void operator()(const Payload &payload) const {
|
||||
::encode(static_cast<uint32_t>(Payload::NOTIFY_OP), m_bl);
|
||||
payload.encode(m_bl);
|
||||
}
|
||||
|
||||
private:
|
||||
bufferlist &m_bl;
|
||||
};
|
||||
|
||||
class DecodePayloadVisitor : public boost::static_visitor<void> {
|
||||
public:
|
||||
DecodePayloadVisitor(__u8 version, bufferlist::iterator &iter)
|
||||
: m_version(version), m_iter(iter) {}
|
||||
|
||||
template <typename Payload>
|
||||
inline void operator()(Payload &payload) const {
|
||||
payload.decode(m_version, m_iter);
|
||||
}
|
||||
|
||||
private:
|
||||
__u8 m_version;
|
||||
bufferlist::iterator &m_iter;
|
||||
};
|
||||
|
||||
class DumpPayloadVisitor : public boost::static_visitor<void> {
|
||||
public:
|
||||
explicit DumpPayloadVisitor(Formatter *formatter) : m_formatter(formatter) {}
|
||||
@ -64,7 +36,7 @@ private:
|
||||
ceph::Formatter *m_formatter;
|
||||
};
|
||||
|
||||
}
|
||||
} // anonymous namespace
|
||||
|
||||
void ClientId::encode(bufferlist &bl) const {
|
||||
::encode(gid, bl);
|
||||
@ -318,7 +290,7 @@ bool NotifyMessage::check_for_refresh() const {
|
||||
|
||||
void NotifyMessage::encode(bufferlist& bl) const {
|
||||
ENCODE_START(5, 1, bl);
|
||||
boost::apply_visitor(EncodePayloadVisitor(bl), payload);
|
||||
boost::apply_visitor(watcher::EncodePayloadVisitor(bl), payload);
|
||||
ENCODE_FINISH(bl);
|
||||
}
|
||||
|
||||
@ -383,7 +355,7 @@ void NotifyMessage::decode(bufferlist::iterator& iter) {
|
||||
break;
|
||||
}
|
||||
|
||||
apply_visitor(DecodePayloadVisitor(struct_v, iter), payload);
|
||||
apply_visitor(watcher::DecodePayloadVisitor(struct_v, iter), payload);
|
||||
DECODE_FINISH(iter);
|
||||
}
|
||||
|
||||
|
@ -129,6 +129,8 @@ void Watcher::unregister_watch(Context *on_finish) {
|
||||
int r = m_ioctx.aio_unwatch(m_watch_handle, aio_comp);
|
||||
assert(r == 0);
|
||||
aio_comp->release();
|
||||
} else {
|
||||
on_finish->complete(0);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -6,7 +6,7 @@
|
||||
#include "librbd/ImageCtx.h"
|
||||
#include "librbd/Utils.h"
|
||||
#include "librbd/WatchNotifyTypes.h"
|
||||
#include "librbd/object_watcher/Notifier.h"
|
||||
#include "librbd/watcher/Notifier.h"
|
||||
#include <map>
|
||||
|
||||
#define dout_subsys ceph_subsys_rbd
|
||||
@ -15,13 +15,14 @@
|
||||
<< this << " " << __func__
|
||||
|
||||
namespace librbd {
|
||||
|
||||
namespace image_watcher {
|
||||
|
||||
using namespace watch_notify;
|
||||
using util::create_context_callback;
|
||||
|
||||
NotifyLockOwner::NotifyLockOwner(ImageCtx &image_ctx,
|
||||
object_watcher::Notifier ¬ifier,
|
||||
watcher::Notifier ¬ifier,
|
||||
bufferlist &&bl, Context *on_finish)
|
||||
: m_image_ctx(image_ctx), m_notifier(notifier), m_bl(std::move(bl)),
|
||||
m_on_finish(on_finish) {
|
||||
|
@ -12,26 +12,26 @@ namespace librbd {
|
||||
|
||||
struct ImageCtx;
|
||||
|
||||
namespace object_watcher { class Notifier; }
|
||||
namespace watcher { class Notifier; }
|
||||
|
||||
namespace image_watcher {
|
||||
|
||||
class NotifyLockOwner {
|
||||
public:
|
||||
static NotifyLockOwner *create(ImageCtx &image_ctx,
|
||||
object_watcher::Notifier ¬ifier,
|
||||
watcher::Notifier ¬ifier,
|
||||
bufferlist &&bl, Context *on_finish) {
|
||||
return new NotifyLockOwner(image_ctx, notifier, std::move(bl), on_finish);
|
||||
}
|
||||
|
||||
NotifyLockOwner(ImageCtx &image_ctx, object_watcher::Notifier ¬ifier,
|
||||
NotifyLockOwner(ImageCtx &image_ctx, watcher::Notifier ¬ifier,
|
||||
bufferlist &&bl, Context *on_finish);
|
||||
|
||||
void send();
|
||||
|
||||
private:
|
||||
ImageCtx &m_image_ctx;
|
||||
object_watcher::Notifier &m_notifier;
|
||||
watcher::Notifier &m_notifier;
|
||||
|
||||
bufferlist m_bl;
|
||||
bufferlist m_out_bl;
|
||||
|
@ -1,126 +0,0 @@
|
||||
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
|
||||
// vim: ts=8 sw=2 smarttab
|
||||
|
||||
#include "librbd/image_watcher/RewatchRequest.h"
|
||||
#include "common/errno.h"
|
||||
#include "librbd/ExclusiveLock.h"
|
||||
#include "librbd/ImageCtx.h"
|
||||
#include "librbd/Utils.h"
|
||||
|
||||
#define dout_subsys ceph_subsys_rbd
|
||||
#undef dout_prefix
|
||||
#define dout_prefix *_dout << "librbd::image_watcher::RewatchRequest: " \
|
||||
<< this << ": " << __func__
|
||||
|
||||
namespace librbd {
|
||||
namespace image_watcher {
|
||||
|
||||
using librbd::util::create_context_callback;
|
||||
using librbd::util::create_rados_safe_callback;
|
||||
|
||||
template <typename I>
|
||||
RewatchRequest<I>::RewatchRequest(I &image_ctx, RWLock &watch_lock,
|
||||
librados::WatchCtx2 *watch_ctx,
|
||||
uint64_t *watch_handle, Context *on_finish)
|
||||
: m_image_ctx(image_ctx), m_watch_lock(watch_lock), m_watch_ctx(watch_ctx),
|
||||
m_watch_handle(watch_handle), m_on_finish(on_finish) {
|
||||
}
|
||||
|
||||
template <typename I>
|
||||
void RewatchRequest<I>::send() {
|
||||
unwatch();
|
||||
}
|
||||
|
||||
template <typename I>
|
||||
void RewatchRequest<I>::unwatch() {
|
||||
assert(m_watch_lock.is_wlocked());
|
||||
assert(*m_watch_handle != 0);
|
||||
|
||||
CephContext *cct = m_image_ctx.cct;
|
||||
ldout(cct, 10) << dendl;
|
||||
|
||||
librados::AioCompletion *aio_comp = create_rados_safe_callback<
|
||||
RewatchRequest<I>, &RewatchRequest<I>::handle_unwatch>(this);
|
||||
int r = m_image_ctx.md_ctx.aio_unwatch(*m_watch_handle, aio_comp);
|
||||
assert(r == 0);
|
||||
aio_comp->release();
|
||||
|
||||
*m_watch_handle = 0;
|
||||
}
|
||||
|
||||
template <typename I>
|
||||
void RewatchRequest<I>::handle_unwatch(int r) {
|
||||
CephContext *cct = m_image_ctx.cct;
|
||||
ldout(cct, 10) << "r=" << r << dendl;
|
||||
|
||||
if (r == -EBLACKLISTED) {
|
||||
lderr(cct) << "client blacklisted" << dendl;
|
||||
finish(r);
|
||||
return;
|
||||
} else if (r < 0) {
|
||||
lderr(cct) << "failed to unwatch: " << cpp_strerror(r) << dendl;
|
||||
}
|
||||
rewatch();
|
||||
}
|
||||
|
||||
template <typename I>
|
||||
void RewatchRequest<I>::rewatch() {
|
||||
CephContext *cct = m_image_ctx.cct;
|
||||
ldout(cct, 10) << dendl;
|
||||
|
||||
librados::AioCompletion *aio_comp = create_rados_safe_callback<
|
||||
RewatchRequest<I>, &RewatchRequest<I>::handle_rewatch>(this);
|
||||
int r = m_image_ctx.md_ctx.aio_watch(m_image_ctx.header_oid, aio_comp,
|
||||
&m_rewatch_handle, m_watch_ctx);
|
||||
assert(r == 0);
|
||||
aio_comp->release();
|
||||
}
|
||||
|
||||
template <typename I>
|
||||
void RewatchRequest<I>::handle_rewatch(int r) {
|
||||
CephContext *cct = m_image_ctx.cct;
|
||||
ldout(cct, 10) << "r=" << r << dendl;
|
||||
|
||||
if (r == -EBLACKLISTED) {
|
||||
lderr(cct) << "client blacklisted" << dendl;
|
||||
finish(r);
|
||||
return;
|
||||
} else if (r == -ENOENT) {
|
||||
ldout(cct, 5) << "image header deleted" << dendl;
|
||||
finish(r);
|
||||
return;
|
||||
} else if (r < 0) {
|
||||
lderr(cct) << "failed to watch image header: " << cpp_strerror(r)
|
||||
<< dendl;
|
||||
rewatch();
|
||||
return;
|
||||
}
|
||||
|
||||
{
|
||||
RWLock::WLocker watch_locker(m_watch_lock);
|
||||
*m_watch_handle = m_rewatch_handle;
|
||||
}
|
||||
|
||||
{
|
||||
RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
|
||||
if (m_image_ctx.exclusive_lock != nullptr) {
|
||||
// update the lock cookie with the new watch handle
|
||||
m_image_ctx.exclusive_lock->reacquire_lock();
|
||||
}
|
||||
}
|
||||
finish(0);
|
||||
}
|
||||
|
||||
template <typename I>
|
||||
void RewatchRequest<I>::finish(int r) {
|
||||
CephContext *cct = m_image_ctx.cct;
|
||||
ldout(cct, 10) << "r=" << r << dendl;
|
||||
|
||||
m_on_finish->complete(r);
|
||||
delete this;
|
||||
}
|
||||
|
||||
} // namespace image_watcher
|
||||
} // namespace librbd
|
||||
|
||||
template class librbd::image_watcher::RewatchRequest<librbd::ImageCtx>;
|
@ -1,78 +0,0 @@
|
||||
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
|
||||
// vim: ts=8 sw=2 smarttab
|
||||
|
||||
#ifndef CEPH_LIBRBD_IMAGE_WATCHER_REWATCH_REQUEST_H
|
||||
#define CEPH_LIBRBD_IMAGE_WATCHER_REWATCH_REQUEST_H
|
||||
|
||||
#include "include/int_types.h"
|
||||
#include "include/rados/librados.hpp"
|
||||
|
||||
struct Context;
|
||||
struct RWLock;
|
||||
|
||||
namespace librbd {
|
||||
|
||||
class ImageCtx;
|
||||
|
||||
namespace image_watcher {
|
||||
|
||||
template <typename ImageCtxT = librbd::ImageCtx>
|
||||
class RewatchRequest {
|
||||
public:
|
||||
|
||||
static RewatchRequest *create(ImageCtxT &image_ctx, RWLock &watch_lock,
|
||||
librados::WatchCtx2 *watch_ctx,
|
||||
uint64_t *watch_handle, Context *on_finish) {
|
||||
return new RewatchRequest(image_ctx, watch_lock, watch_ctx, watch_handle,
|
||||
on_finish);
|
||||
}
|
||||
|
||||
RewatchRequest(ImageCtxT &image_ctx, RWLock &watch_lock,
|
||||
librados::WatchCtx2 *watch_ctx, uint64_t *watch_handle,
|
||||
Context *on_finish);
|
||||
|
||||
void send();
|
||||
|
||||
private:
|
||||
/**
|
||||
* @verbatim
|
||||
*
|
||||
* <start>
|
||||
* |
|
||||
* v
|
||||
* UNWATCH
|
||||
* |
|
||||
* | . . . .
|
||||
* | . . (recoverable error)
|
||||
* v v .
|
||||
* REWATCH . . .
|
||||
* |
|
||||
* v
|
||||
* <finish>
|
||||
*
|
||||
* @endverbatim
|
||||
*/
|
||||
|
||||
ImageCtxT &m_image_ctx;
|
||||
RWLock &m_watch_lock;
|
||||
librados::WatchCtx2 *m_watch_ctx;
|
||||
uint64_t *m_watch_handle;
|
||||
Context *m_on_finish;
|
||||
|
||||
uint64_t m_rewatch_handle = 0;
|
||||
|
||||
void unwatch();
|
||||
void handle_unwatch(int r);
|
||||
|
||||
void rewatch();
|
||||
void handle_rewatch(int r);
|
||||
|
||||
void finish(int r);
|
||||
};
|
||||
|
||||
} // namespace image_watcher
|
||||
} // namespace librbd
|
||||
|
||||
extern template class librbd::image_watcher::RewatchRequest<librbd::ImageCtx>;
|
||||
|
||||
#endif // CEPH_LIBRBD_IMAGE_WATCHER_REWATCH_REQUEST_H
|
@ -7,6 +7,7 @@
|
||||
|
||||
#include "librbd/ImageCtx.h"
|
||||
#include "librbd/MirroringWatcher.h"
|
||||
#include "librbd/ImageWatcher.h"
|
||||
|
||||
#define dout_subsys ceph_subsys_rbd
|
||||
#undef dout_prefix
|
||||
@ -35,3 +36,6 @@ void C_NotifyAck::finish(int r) {
|
||||
|
||||
template struct librbd::watcher::HandlePayloadVisitor<
|
||||
librbd::MirroringWatcher<librbd::ImageCtx>>;
|
||||
|
||||
template struct librbd::watcher::HandlePayloadVisitor<
|
||||
librbd::ImageWatcher<librbd::ImageCtx>>;
|
||||
|
@ -35,7 +35,6 @@ set(unittest_librbd_srcs
|
||||
exclusive_lock/test_mock_ReacquireRequest.cc
|
||||
exclusive_lock/test_mock_ReleaseRequest.cc
|
||||
image/test_mock_RefreshRequest.cc
|
||||
image_watcher/test_mock_RewatchRequest.cc
|
||||
journal/test_mock_OpenRequest.cc
|
||||
journal/test_mock_PromoteRequest.cc
|
||||
journal/test_mock_Replay.cc
|
||||
|
@ -1,215 +0,0 @@
|
||||
// -*- mode:C; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
|
||||
// vim: ts=8 sw=2 smarttab
|
||||
|
||||
#include "test/librbd/test_mock_fixture.h"
|
||||
#include "include/rados/librados.hpp"
|
||||
#include "test/librados_test_stub/MockTestMemIoCtxImpl.h"
|
||||
#include "test/librados_test_stub/MockTestMemRadosClient.h"
|
||||
#include "test/librbd/test_support.h"
|
||||
#include "test/librbd/mock/MockExclusiveLock.h"
|
||||
#include "test/librbd/mock/MockImageCtx.h"
|
||||
#include "librados/AioCompletionImpl.h"
|
||||
#include "librbd/image_watcher/RewatchRequest.h"
|
||||
|
||||
namespace librbd {
|
||||
namespace {
|
||||
|
||||
struct MockTestImageCtx : public MockImageCtx {
|
||||
MockTestImageCtx(ImageCtx &image_ctx) : MockImageCtx(image_ctx) {
|
||||
}
|
||||
};
|
||||
|
||||
} // anonymous namespace
|
||||
} // namespace librbd
|
||||
|
||||
#include "librbd/image_watcher/RewatchRequest.cc"
|
||||
|
||||
namespace librbd {
|
||||
namespace image_watcher {
|
||||
|
||||
using ::testing::_;
|
||||
using ::testing::DoAll;
|
||||
using ::testing::InSequence;
|
||||
using ::testing::Invoke;
|
||||
using ::testing::Return;
|
||||
using ::testing::WithArg;
|
||||
|
||||
struct TestMockImageWatcherRewatchRequest : public TestMockFixture {
|
||||
typedef RewatchRequest<librbd::MockTestImageCtx> MockRewatchRequest;
|
||||
|
||||
TestMockImageWatcherRewatchRequest()
|
||||
: m_watch_lock("watch_lock") {
|
||||
}
|
||||
|
||||
void expect_aio_watch(MockImageCtx &mock_image_ctx, int r) {
|
||||
librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(
|
||||
mock_image_ctx.md_ctx));
|
||||
|
||||
EXPECT_CALL(mock_io_ctx, aio_watch(mock_image_ctx.header_oid, _, _, _))
|
||||
.WillOnce(DoAll(WithArg<1>(Invoke([&mock_image_ctx, &mock_io_ctx, r](librados::AioCompletionImpl *c) {
|
||||
c->get();
|
||||
mock_image_ctx.image_ctx->op_work_queue->queue(new FunctionContext([&mock_io_ctx, c](int r) {
|
||||
mock_io_ctx.get_mock_rados_client()->finish_aio_completion(c, r);
|
||||
}), r);
|
||||
})),
|
||||
Return(0)));
|
||||
}
|
||||
|
||||
void expect_aio_unwatch(MockImageCtx &mock_image_ctx, int r) {
|
||||
librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(
|
||||
mock_image_ctx.md_ctx));
|
||||
|
||||
EXPECT_CALL(mock_io_ctx, aio_unwatch(m_watch_handle, _))
|
||||
.WillOnce(DoAll(Invoke([&mock_image_ctx, &mock_io_ctx, r](uint64_t handle,
|
||||
librados::AioCompletionImpl *c) {
|
||||
c->get();
|
||||
mock_image_ctx.image_ctx->op_work_queue->queue(new FunctionContext([&mock_io_ctx, c](int r) {
|
||||
mock_io_ctx.get_mock_rados_client()->finish_aio_completion(c, r);
|
||||
}), r);
|
||||
}),
|
||||
Return(0)));
|
||||
}
|
||||
|
||||
void expect_reacquire_lock(MockExclusiveLock &mock_exclusive_lock) {
|
||||
EXPECT_CALL(mock_exclusive_lock, reacquire_lock());
|
||||
}
|
||||
|
||||
struct WatchCtx : public librados::WatchCtx2 {
|
||||
virtual void handle_notify(uint64_t, uint64_t, uint64_t,
|
||||
ceph::bufferlist&) {
|
||||
assert(false);
|
||||
}
|
||||
virtual void handle_error(uint64_t, int) {
|
||||
assert(false);
|
||||
}
|
||||
};
|
||||
|
||||
RWLock m_watch_lock;
|
||||
WatchCtx m_watch_ctx;
|
||||
uint64_t m_watch_handle = 123;
|
||||
};
|
||||
|
||||
TEST_F(TestMockImageWatcherRewatchRequest, Success) {
|
||||
librbd::ImageCtx *ictx;
|
||||
ASSERT_EQ(0, open_image(m_image_name, &ictx));
|
||||
|
||||
MockTestImageCtx mock_image_ctx(*ictx);
|
||||
|
||||
InSequence seq;
|
||||
expect_aio_unwatch(mock_image_ctx, 0);
|
||||
expect_aio_watch(mock_image_ctx, 0);
|
||||
|
||||
MockExclusiveLock mock_exclusive_lock;
|
||||
if (ictx->test_features(RBD_FEATURE_EXCLUSIVE_LOCK)) {
|
||||
mock_image_ctx.exclusive_lock = &mock_exclusive_lock;
|
||||
expect_reacquire_lock(mock_exclusive_lock);
|
||||
}
|
||||
|
||||
C_SaferCond ctx;
|
||||
MockRewatchRequest *req = MockRewatchRequest::create(mock_image_ctx,
|
||||
m_watch_lock,
|
||||
&m_watch_ctx,
|
||||
&m_watch_handle,
|
||||
&ctx);
|
||||
{
|
||||
RWLock::WLocker watch_locker(m_watch_lock);
|
||||
req->send();
|
||||
}
|
||||
ASSERT_EQ(0, ctx.wait());
|
||||
}
|
||||
|
||||
TEST_F(TestMockImageWatcherRewatchRequest, UnwatchError) {
|
||||
librbd::ImageCtx *ictx;
|
||||
ASSERT_EQ(0, open_image(m_image_name, &ictx));
|
||||
|
||||
MockTestImageCtx mock_image_ctx(*ictx);
|
||||
|
||||
InSequence seq;
|
||||
expect_aio_unwatch(mock_image_ctx, -EINVAL);
|
||||
expect_aio_watch(mock_image_ctx, 0);
|
||||
|
||||
C_SaferCond ctx;
|
||||
MockRewatchRequest *req = MockRewatchRequest::create(mock_image_ctx,
|
||||
m_watch_lock,
|
||||
&m_watch_ctx,
|
||||
&m_watch_handle,
|
||||
&ctx);
|
||||
{
|
||||
RWLock::WLocker watch_locker(m_watch_lock);
|
||||
req->send();
|
||||
}
|
||||
ASSERT_EQ(0, ctx.wait());
|
||||
}
|
||||
|
||||
TEST_F(TestMockImageWatcherRewatchRequest, WatchBlacklist) {
|
||||
librbd::ImageCtx *ictx;
|
||||
ASSERT_EQ(0, open_image(m_image_name, &ictx));
|
||||
|
||||
MockTestImageCtx mock_image_ctx(*ictx);
|
||||
|
||||
InSequence seq;
|
||||
expect_aio_unwatch(mock_image_ctx, 0);
|
||||
expect_aio_watch(mock_image_ctx, -EBLACKLISTED);
|
||||
|
||||
C_SaferCond ctx;
|
||||
MockRewatchRequest *req = MockRewatchRequest::create(mock_image_ctx,
|
||||
m_watch_lock,
|
||||
&m_watch_ctx,
|
||||
&m_watch_handle,
|
||||
&ctx);
|
||||
{
|
||||
RWLock::WLocker watch_locker(m_watch_lock);
|
||||
req->send();
|
||||
}
|
||||
ASSERT_EQ(-EBLACKLISTED, ctx.wait());
|
||||
}
|
||||
|
||||
TEST_F(TestMockImageWatcherRewatchRequest, WatchDNE) {
|
||||
librbd::ImageCtx *ictx;
|
||||
ASSERT_EQ(0, open_image(m_image_name, &ictx));
|
||||
|
||||
MockTestImageCtx mock_image_ctx(*ictx);
|
||||
|
||||
InSequence seq;
|
||||
expect_aio_unwatch(mock_image_ctx, 0);
|
||||
expect_aio_watch(mock_image_ctx, -ENOENT);
|
||||
|
||||
C_SaferCond ctx;
|
||||
MockRewatchRequest *req = MockRewatchRequest::create(mock_image_ctx,
|
||||
m_watch_lock,
|
||||
&m_watch_ctx,
|
||||
&m_watch_handle,
|
||||
&ctx);
|
||||
{
|
||||
RWLock::WLocker watch_locker(m_watch_lock);
|
||||
req->send();
|
||||
}
|
||||
ASSERT_EQ(-ENOENT, ctx.wait());
|
||||
}
|
||||
|
||||
TEST_F(TestMockImageWatcherRewatchRequest, WatchError) {
|
||||
librbd::ImageCtx *ictx;
|
||||
ASSERT_EQ(0, open_image(m_image_name, &ictx));
|
||||
|
||||
MockTestImageCtx mock_image_ctx(*ictx);
|
||||
|
||||
InSequence seq;
|
||||
expect_aio_unwatch(mock_image_ctx, 0);
|
||||
expect_aio_watch(mock_image_ctx, -EINVAL);
|
||||
expect_aio_watch(mock_image_ctx, 0);
|
||||
|
||||
C_SaferCond ctx;
|
||||
MockRewatchRequest *req = MockRewatchRequest::create(mock_image_ctx,
|
||||
m_watch_lock,
|
||||
&m_watch_ctx,
|
||||
&m_watch_handle,
|
||||
&ctx);
|
||||
{
|
||||
RWLock::WLocker watch_locker(m_watch_lock);
|
||||
req->send();
|
||||
}
|
||||
ASSERT_EQ(0, ctx.wait());
|
||||
}
|
||||
|
||||
} // namespace image_watcher
|
||||
} // namespace librbd
|
@ -11,6 +11,7 @@ class Context;
|
||||
namespace librbd {
|
||||
|
||||
struct MockImageWatcher {
|
||||
MOCK_METHOD0(is_registered, bool());
|
||||
MOCK_METHOD0(unregister_watch, void());
|
||||
MOCK_METHOD1(flush, void(Context *));
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user