mirror of
https://github.com/ceph/ceph
synced 2024-12-18 09:25:49 +00:00
Merge pull request #14072 from trociny/wip-notify
librbd: Notifier::notify API improvement Reviewed-by: Jason Dillaman <dillaman@redhat.com>
This commit is contained in:
commit
b1f1df4f37
@ -1,6 +1,7 @@
|
||||
add_library(rbd_types STATIC
|
||||
journal/Types.cc
|
||||
mirroring_watcher/Types.cc
|
||||
watcher/Types.cc
|
||||
WatchNotifyTypes.cc)
|
||||
|
||||
set(librbd_internal_srcs
|
||||
@ -100,7 +101,6 @@ set(librbd_internal_srcs
|
||||
operation/TrimRequest.cc
|
||||
watcher/Notifier.cc
|
||||
watcher/RewatchRequest.cc
|
||||
watcher/Types.cc
|
||||
${CMAKE_SOURCE_DIR}/src/common/ContextCompletion.cc)
|
||||
|
||||
add_library(rbd_api STATIC librbd.cc)
|
||||
|
@ -12,6 +12,7 @@
|
||||
#include "librbd/exclusive_lock/Policy.h"
|
||||
#include "librbd/image_watcher/NotifyLockOwner.h"
|
||||
#include "librbd/io/AioCompletion.h"
|
||||
#include "librbd/watcher/Utils.h"
|
||||
#include "include/encoding.h"
|
||||
#include "common/errno.h"
|
||||
#include "common/WorkQueue.h"
|
||||
@ -28,8 +29,7 @@ using namespace watch_notify;
|
||||
using util::create_async_context_callback;
|
||||
using util::create_context_callback;
|
||||
using util::create_rados_callback;
|
||||
using librbd::watcher::HandlePayloadVisitor;
|
||||
using librbd::watcher::C_NotifyAck;
|
||||
using librbd::watcher::util::HandlePayloadVisitor;
|
||||
|
||||
static const double RETRY_DELAY_SECONDS = 1.0;
|
||||
|
||||
|
@ -20,15 +20,17 @@ class entity_name_t;
|
||||
namespace librbd {
|
||||
|
||||
namespace watcher {
|
||||
namespace util {
|
||||
template <typename> struct HandlePayloadVisitor;
|
||||
}
|
||||
}
|
||||
|
||||
class ImageCtx;
|
||||
template <typename> class TaskFinisher;
|
||||
|
||||
template <typename ImageCtxT = ImageCtx>
|
||||
class ImageWatcher : public Watcher {
|
||||
friend struct watcher::HandlePayloadVisitor<ImageWatcher<ImageCtxT>>;
|
||||
friend struct watcher::util::HandlePayloadVisitor<ImageWatcher<ImageCtxT>>;
|
||||
|
||||
public:
|
||||
ImageWatcher(ImageCtxT& image_ctx);
|
||||
@ -161,9 +163,9 @@ private:
|
||||
};
|
||||
|
||||
struct C_ResponseMessage : public Context {
|
||||
watcher::C_NotifyAck *notify_ack;
|
||||
C_NotifyAck *notify_ack;
|
||||
|
||||
C_ResponseMessage(watcher::C_NotifyAck *notify_ack) : notify_ack(notify_ack) {
|
||||
C_ResponseMessage(C_NotifyAck *notify_ack) : notify_ack(notify_ack) {
|
||||
}
|
||||
void finish(int r) override;
|
||||
};
|
||||
@ -215,41 +217,41 @@ private:
|
||||
ProgressContext** prog_ctx);
|
||||
|
||||
bool handle_payload(const watch_notify::HeaderUpdatePayload& payload,
|
||||
watcher::C_NotifyAck *ctx);
|
||||
C_NotifyAck *ctx);
|
||||
bool handle_payload(const watch_notify::AcquiredLockPayload& payload,
|
||||
watcher::C_NotifyAck *ctx);
|
||||
C_NotifyAck *ctx);
|
||||
bool handle_payload(const watch_notify::ReleasedLockPayload& payload,
|
||||
watcher::C_NotifyAck *ctx);
|
||||
C_NotifyAck *ctx);
|
||||
bool handle_payload(const watch_notify::RequestLockPayload& payload,
|
||||
watcher::C_NotifyAck *ctx);
|
||||
C_NotifyAck *ctx);
|
||||
bool handle_payload(const watch_notify::AsyncProgressPayload& payload,
|
||||
watcher::C_NotifyAck *ctx);
|
||||
C_NotifyAck *ctx);
|
||||
bool handle_payload(const watch_notify::AsyncCompletePayload& payload,
|
||||
watcher::C_NotifyAck *ctx);
|
||||
C_NotifyAck *ctx);
|
||||
bool handle_payload(const watch_notify::FlattenPayload& payload,
|
||||
watcher::C_NotifyAck *ctx);
|
||||
C_NotifyAck *ctx);
|
||||
bool handle_payload(const watch_notify::ResizePayload& payload,
|
||||
watcher::C_NotifyAck *ctx);
|
||||
C_NotifyAck *ctx);
|
||||
bool handle_payload(const watch_notify::SnapCreatePayload& payload,
|
||||
watcher::C_NotifyAck *ctx);
|
||||
C_NotifyAck *ctx);
|
||||
bool handle_payload(const watch_notify::SnapRenamePayload& payload,
|
||||
watcher::C_NotifyAck *ctx);
|
||||
C_NotifyAck *ctx);
|
||||
bool handle_payload(const watch_notify::SnapRemovePayload& payload,
|
||||
watcher::C_NotifyAck *ctx);
|
||||
C_NotifyAck *ctx);
|
||||
bool handle_payload(const watch_notify::SnapProtectPayload& payload,
|
||||
watcher::C_NotifyAck *ctx);
|
||||
C_NotifyAck *ctx);
|
||||
bool handle_payload(const watch_notify::SnapUnprotectPayload& payload,
|
||||
watcher::C_NotifyAck *ctx);
|
||||
C_NotifyAck *ctx);
|
||||
bool handle_payload(const watch_notify::RebuildObjectMapPayload& payload,
|
||||
watcher::C_NotifyAck *ctx);
|
||||
C_NotifyAck *ctx);
|
||||
bool handle_payload(const watch_notify::RenamePayload& payload,
|
||||
watcher::C_NotifyAck *ctx);
|
||||
C_NotifyAck *ctx);
|
||||
bool handle_payload(const watch_notify::UpdateFeaturesPayload& payload,
|
||||
watcher::C_NotifyAck *ctx);
|
||||
C_NotifyAck *ctx);
|
||||
bool handle_payload(const watch_notify::UnknownPayload& payload,
|
||||
watcher::C_NotifyAck *ctx);
|
||||
C_NotifyAck *ctx);
|
||||
void process_payload(uint64_t notify_id, uint64_t handle,
|
||||
const watch_notify::Payload &payload, int r);
|
||||
const watch_notify::Payload &payload, int r);
|
||||
|
||||
void handle_notify(uint64_t notify_id, uint64_t handle,
|
||||
uint64_t notifier_id, bufferlist &bl) override;
|
||||
|
@ -6,6 +6,7 @@
|
||||
#include "include/rados/librados.hpp"
|
||||
#include "common/errno.h"
|
||||
#include "librbd/Utils.h"
|
||||
#include "librbd/watcher/Utils.h"
|
||||
|
||||
#define dout_subsys ceph_subsys_rbd
|
||||
#undef dout_prefix
|
||||
@ -16,6 +17,8 @@ namespace librbd {
|
||||
using namespace mirroring_watcher;
|
||||
using namespace watcher;
|
||||
|
||||
using librbd::util::create_rados_callback;
|
||||
|
||||
namespace {
|
||||
|
||||
static const uint64_t NOTIFY_TIMEOUT_MS = 5000;
|
||||
@ -46,7 +49,7 @@ void MirroringWatcher<I>::notify_mode_updated(librados::IoCtx &io_ctx,
|
||||
bufferlist bl;
|
||||
::encode(NotifyMessage{ModeUpdatedPayload{mirror_mode}}, bl);
|
||||
|
||||
librados::AioCompletion *comp = util::create_rados_callback(on_finish);
|
||||
librados::AioCompletion *comp = create_rados_callback(on_finish);
|
||||
int r = io_ctx.aio_notify(RBD_MIRRORING, comp, bl, NOTIFY_TIMEOUT_MS,
|
||||
nullptr);
|
||||
assert(r == 0);
|
||||
@ -76,7 +79,7 @@ void MirroringWatcher<I>::notify_image_updated(
|
||||
::encode(NotifyMessage{ImageUpdatedPayload{
|
||||
mirror_image_state, image_id, global_image_id}}, bl);
|
||||
|
||||
librados::AioCompletion *comp = util::create_rados_callback(on_finish);
|
||||
librados::AioCompletion *comp = create_rados_callback(on_finish);
|
||||
int r = io_ctx.aio_notify(RBD_MIRRORING, comp, bl, NOTIFY_TIMEOUT_MS,
|
||||
nullptr);
|
||||
assert(r == 0);
|
||||
@ -104,9 +107,8 @@ void MirroringWatcher<I>::handle_notify(uint64_t notify_id, uint64_t handle,
|
||||
return;
|
||||
}
|
||||
|
||||
apply_visitor(HandlePayloadVisitor<MirroringWatcher<I>>(this, notify_id,
|
||||
handle),
|
||||
notify_message.payload);
|
||||
apply_visitor(watcher::util::HandlePayloadVisitor<MirroringWatcher<I>>(
|
||||
this, notify_id, handle), notify_message.payload);
|
||||
}
|
||||
|
||||
template <typename I>
|
||||
|
@ -17,12 +17,14 @@ namespace librados {
|
||||
namespace librbd {
|
||||
|
||||
namespace watcher {
|
||||
namespace util {
|
||||
template <typename> struct HandlePayloadVisitor;
|
||||
}
|
||||
}
|
||||
|
||||
template <typename ImageCtxT = librbd::ImageCtx>
|
||||
class MirroringWatcher : public Watcher {
|
||||
friend struct watcher::HandlePayloadVisitor<MirroringWatcher<ImageCtxT>>;
|
||||
friend struct watcher::util::HandlePayloadVisitor<MirroringWatcher<ImageCtxT>>;
|
||||
|
||||
public:
|
||||
MirroringWatcher(librados::IoCtx &io_ctx, ContextWQ *work_queue);
|
||||
|
@ -2,11 +2,11 @@
|
||||
// vim: ts=8 sw=2 smarttab
|
||||
|
||||
#include "cls/rbd/cls_rbd_types.h"
|
||||
#include "librbd/WatchNotifyTypes.h"
|
||||
#include "librbd/watcher/Types.h"
|
||||
#include "common/Formatter.h"
|
||||
#include "include/assert.h"
|
||||
#include "include/stringify.h"
|
||||
#include "common/Formatter.h"
|
||||
#include "librbd/WatchNotifyTypes.h"
|
||||
#include "librbd/watcher/Utils.h"
|
||||
|
||||
namespace librbd {
|
||||
namespace watch_notify {
|
||||
@ -38,21 +38,6 @@ private:
|
||||
|
||||
} // anonymous namespace
|
||||
|
||||
void ClientId::encode(bufferlist &bl) const {
|
||||
::encode(gid, bl);
|
||||
::encode(handle, bl);
|
||||
}
|
||||
|
||||
void ClientId::decode(bufferlist::iterator &iter) {
|
||||
::decode(gid, iter);
|
||||
::decode(handle, iter);
|
||||
}
|
||||
|
||||
void ClientId::dump(Formatter *f) const {
|
||||
f->dump_unsigned("gid", gid);
|
||||
f->dump_unsigned("handle", handle);
|
||||
}
|
||||
|
||||
void AsyncRequestId::encode(bufferlist &bl) const {
|
||||
::encode(client_id, bl);
|
||||
::encode(request_id, bl);
|
||||
@ -295,7 +280,7 @@ bool NotifyMessage::check_for_refresh() const {
|
||||
|
||||
void NotifyMessage::encode(bufferlist& bl) const {
|
||||
ENCODE_START(6, 1, bl);
|
||||
boost::apply_visitor(watcher::EncodePayloadVisitor(bl), payload);
|
||||
boost::apply_visitor(watcher::util::EncodePayloadVisitor(bl), payload);
|
||||
ENCODE_FINISH(bl);
|
||||
}
|
||||
|
||||
@ -360,7 +345,7 @@ void NotifyMessage::decode(bufferlist::iterator& iter) {
|
||||
break;
|
||||
}
|
||||
|
||||
apply_visitor(watcher::DecodePayloadVisitor(struct_v, iter), payload);
|
||||
apply_visitor(watcher::util::DecodePayloadVisitor(struct_v, iter), payload);
|
||||
DECODE_FINISH(iter);
|
||||
}
|
||||
|
||||
@ -470,12 +455,6 @@ std::ostream &operator<<(std::ostream &out,
|
||||
return out;
|
||||
}
|
||||
|
||||
std::ostream &operator<<(std::ostream &out,
|
||||
const librbd::watch_notify::ClientId &client_id) {
|
||||
out << "[" << client_id.gid << "," << client_id.handle << "]";
|
||||
return out;
|
||||
}
|
||||
|
||||
std::ostream &operator<<(std::ostream &out,
|
||||
const librbd::watch_notify::AsyncRequestId &request) {
|
||||
out << "[" << request.client_id.gid << "," << request.client_id.handle << ","
|
||||
|
@ -7,6 +7,7 @@
|
||||
#include "include/int_types.h"
|
||||
#include "include/buffer_fwd.h"
|
||||
#include "include/encoding.h"
|
||||
#include "librbd/watcher/Types.h"
|
||||
#include <iosfwd>
|
||||
#include <list>
|
||||
#include <string>
|
||||
@ -19,35 +20,9 @@ class Formatter;
|
||||
namespace librbd {
|
||||
namespace watch_notify {
|
||||
|
||||
struct ClientId {
|
||||
uint64_t gid;
|
||||
uint64_t handle;
|
||||
using librbd::watcher::ClientId;
|
||||
|
||||
ClientId() : gid(0), handle(0) {}
|
||||
ClientId(uint64_t gid_, uint64_t handle_) : gid(gid_), handle(handle_) {}
|
||||
|
||||
void encode(bufferlist& bl) const;
|
||||
void decode(bufferlist::iterator& it);
|
||||
void dump(Formatter *f) const;
|
||||
|
||||
inline bool is_valid() const {
|
||||
return (*this != ClientId());
|
||||
}
|
||||
|
||||
inline bool operator==(const ClientId &rhs) const {
|
||||
return (gid == rhs.gid && handle == rhs.handle);
|
||||
}
|
||||
inline bool operator!=(const ClientId &rhs) const {
|
||||
return !(*this == rhs);
|
||||
}
|
||||
inline bool operator<(const ClientId &rhs) const {
|
||||
if (gid != rhs.gid) {
|
||||
return gid < rhs.gid;
|
||||
} else {
|
||||
return handle < rhs.handle;
|
||||
}
|
||||
}
|
||||
};
|
||||
WRITE_CLASS_ENCODER(ClientId);
|
||||
|
||||
struct AsyncRequestId {
|
||||
ClientId client_id;
|
||||
@ -386,12 +361,9 @@ struct ResponseMessage {
|
||||
|
||||
std::ostream &operator<<(std::ostream &out,
|
||||
const librbd::watch_notify::NotifyOp &op);
|
||||
std::ostream &operator<<(std::ostream &out,
|
||||
const librbd::watch_notify::ClientId &client);
|
||||
std::ostream &operator<<(std::ostream &out,
|
||||
const librbd::watch_notify::AsyncRequestId &request);
|
||||
|
||||
WRITE_CLASS_ENCODER(librbd::watch_notify::ClientId);
|
||||
WRITE_CLASS_ENCODER(librbd::watch_notify::AsyncRequestId);
|
||||
WRITE_CLASS_ENCODER(librbd::watch_notify::NotifyMessage);
|
||||
WRITE_CLASS_ENCODER(librbd::watch_notify::ResponseMessage);
|
||||
|
@ -11,9 +11,6 @@
|
||||
#include <boost/bind.hpp>
|
||||
|
||||
#define dout_subsys ceph_subsys_rbd
|
||||
#undef dout_prefix
|
||||
#define dout_prefix *_dout << "librbd::Watcher: " << this << " " << __func__ \
|
||||
<< ": "
|
||||
|
||||
namespace librbd {
|
||||
|
||||
@ -66,6 +63,27 @@ struct C_UnwatchAndFlush : public Context {
|
||||
|
||||
} // anonymous namespace
|
||||
|
||||
#undef dout_prefix
|
||||
#define dout_prefix *_dout << "librbd::Watcher::C_NotifyAck " << this << " " \
|
||||
<< __func__ << ": "
|
||||
|
||||
Watcher::C_NotifyAck::C_NotifyAck(Watcher *watcher, uint64_t notify_id,
|
||||
uint64_t handle)
|
||||
: watcher(watcher), cct(watcher->m_cct), notify_id(notify_id),
|
||||
handle(handle) {
|
||||
ldout(cct, 10) << "id=" << notify_id << ", " << "handle=" << handle << dendl;
|
||||
}
|
||||
|
||||
void Watcher::C_NotifyAck::finish(int r) {
|
||||
ldout(cct, 10) << "r=" << r << dendl;
|
||||
assert(r == 0);
|
||||
watcher->acknowledge_notify(notify_id, handle, out);
|
||||
}
|
||||
|
||||
#undef dout_prefix
|
||||
#define dout_prefix *_dout << "librbd::Watcher: " << this << " " << __func__ \
|
||||
<< ": "
|
||||
|
||||
Watcher::Watcher(librados::IoCtx& ioctx, ContextWQ *work_queue,
|
||||
const string& oid)
|
||||
: m_ioctx(ioctx), m_work_queue(work_queue), m_oid(oid),
|
||||
@ -233,9 +251,10 @@ void Watcher::handle_rewatch(int r) {
|
||||
}
|
||||
}
|
||||
|
||||
void Watcher::send_notify(bufferlist& payload, bufferlist *out_bl,
|
||||
void Watcher::send_notify(bufferlist& payload,
|
||||
watcher::NotifyResponse *response,
|
||||
Context *on_finish) {
|
||||
m_notifier.notify(payload, out_bl, on_finish);
|
||||
m_notifier.notify(payload, response, on_finish);
|
||||
}
|
||||
|
||||
void Watcher::WatchCtx::handle_notify(uint64_t notify_id,
|
||||
|
@ -16,10 +16,21 @@ class ContextWQ;
|
||||
|
||||
namespace librbd {
|
||||
|
||||
class Watcher {
|
||||
friend struct watcher::C_NotifyAck;
|
||||
namespace watcher { struct NotifyResponse; }
|
||||
|
||||
class Watcher {
|
||||
public:
|
||||
struct C_NotifyAck : public Context {
|
||||
Watcher *watcher;
|
||||
CephContext *cct;
|
||||
uint64_t notify_id;
|
||||
uint64_t handle;
|
||||
bufferlist out;
|
||||
|
||||
C_NotifyAck(Watcher *watcher, uint64_t notify_id, uint64_t handle);
|
||||
void finish(int r) override;
|
||||
};
|
||||
|
||||
Watcher(librados::IoCtx& ioctx, ContextWQ *work_queue,
|
||||
const std::string& oid);
|
||||
virtual ~Watcher();
|
||||
@ -63,7 +74,8 @@ protected:
|
||||
watcher::Notifier m_notifier;
|
||||
WatchState m_watch_state;
|
||||
|
||||
void send_notify(bufferlist &payload, bufferlist *out_bl = nullptr,
|
||||
void send_notify(bufferlist &payload,
|
||||
watcher::NotifyResponse *response = nullptr,
|
||||
Context *on_finish = nullptr);
|
||||
|
||||
virtual void handle_notify(uint64_t notify_id, uint64_t handle,
|
||||
|
@ -37,7 +37,7 @@ void NotifyLockOwner::send_notify() {
|
||||
ldout(cct, 20) << dendl;
|
||||
|
||||
assert(m_image_ctx.owner_lock.is_locked());
|
||||
m_notifier.notify(m_bl, &m_out_bl, create_context_callback<
|
||||
m_notifier.notify(m_bl, &m_notify_response, create_context_callback<
|
||||
NotifyLockOwner, &NotifyLockOwner::handle_notify>(this));
|
||||
}
|
||||
|
||||
@ -52,30 +52,17 @@ void NotifyLockOwner::handle_notify(int r) {
|
||||
return;
|
||||
}
|
||||
|
||||
typedef std::map<std::pair<uint64_t, uint64_t>, bufferlist> responses_t;
|
||||
responses_t responses;
|
||||
if (m_out_bl.length() > 0) {
|
||||
try {
|
||||
bufferlist::iterator iter = m_out_bl.begin();
|
||||
::decode(responses, iter);
|
||||
} catch (const buffer::error &err) {
|
||||
lderr(cct) << ": failed to decode response" << dendl;
|
||||
finish(-EINVAL);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
bufferlist response;
|
||||
bool lock_owner_responded = false;
|
||||
for (responses_t::iterator i = responses.begin(); i != responses.end(); ++i) {
|
||||
if (i->second.length() > 0) {
|
||||
for (auto &it : m_notify_response.acks) {
|
||||
if (it.second.length() > 0) {
|
||||
if (lock_owner_responded) {
|
||||
lderr(cct) << ": duplicate lock owners detected" << dendl;
|
||||
finish(-EINVAL);
|
||||
return;
|
||||
}
|
||||
lock_owner_responded = true;
|
||||
response.claim(i->second);
|
||||
response.claim(it.second);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -5,6 +5,7 @@
|
||||
#define CEPH_LIBRBD_IMAGE_WATCHER_NOTIFY_LOCK_OWNER_H
|
||||
|
||||
#include "include/buffer.h"
|
||||
#include "librbd/watcher/Types.h"
|
||||
|
||||
class Context;
|
||||
|
||||
@ -34,7 +35,7 @@ private:
|
||||
watcher::Notifier &m_notifier;
|
||||
|
||||
bufferlist m_bl;
|
||||
bufferlist m_out_bl;
|
||||
watcher::NotifyResponse m_notify_response;
|
||||
Context *m_on_finish;
|
||||
|
||||
void send_notify();
|
||||
|
@ -1,11 +1,11 @@
|
||||
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
|
||||
// vim: ts=8 sw=2 smarttab
|
||||
|
||||
#include "librbd/mirroring_watcher/Types.h"
|
||||
#include "librbd/watcher/Types.h"
|
||||
#include "common/Formatter.h"
|
||||
#include "include/assert.h"
|
||||
#include "include/stringify.h"
|
||||
#include "common/Formatter.h"
|
||||
#include "librbd/mirroring_watcher/Types.h"
|
||||
#include "librbd/watcher/Utils.h"
|
||||
|
||||
namespace librbd {
|
||||
namespace mirroring_watcher {
|
||||
@ -76,7 +76,7 @@ void UnknownPayload::dump(Formatter *f) const {
|
||||
|
||||
void NotifyMessage::encode(bufferlist& bl) const {
|
||||
ENCODE_START(1, 1, bl);
|
||||
boost::apply_visitor(watcher::EncodePayloadVisitor(bl), payload);
|
||||
boost::apply_visitor(watcher::util::EncodePayloadVisitor(bl), payload);
|
||||
ENCODE_FINISH(bl);
|
||||
}
|
||||
|
||||
@ -99,7 +99,7 @@ void NotifyMessage::decode(bufferlist::iterator& iter) {
|
||||
break;
|
||||
}
|
||||
|
||||
apply_visitor(watcher::DecodePayloadVisitor(struct_v, iter), payload);
|
||||
apply_visitor(watcher::util::DecodePayloadVisitor(struct_v, iter), payload);
|
||||
DECODE_FINISH(iter);
|
||||
}
|
||||
|
||||
|
@ -5,6 +5,7 @@
|
||||
#include "common/WorkQueue.h"
|
||||
#include "librbd/ImageCtx.h"
|
||||
#include "librbd/Utils.h"
|
||||
#include "librbd/watcher/Types.h"
|
||||
|
||||
#define dout_subsys ceph_subsys_rbd
|
||||
#undef dout_prefix
|
||||
@ -15,6 +16,25 @@ namespace watcher {
|
||||
|
||||
const uint64_t Notifier::NOTIFY_TIMEOUT = 5000;
|
||||
|
||||
Notifier::C_AioNotify::C_AioNotify(Notifier *notifier, NotifyResponse *response,
|
||||
Context *on_finish)
|
||||
: notifier(notifier), response(response), on_finish(on_finish) {
|
||||
}
|
||||
|
||||
void Notifier::C_AioNotify::finish(int r) {
|
||||
if (response != nullptr) {
|
||||
if (r == 0 || r == -ETIMEDOUT) {
|
||||
try {
|
||||
bufferlist::iterator it = out_bl.begin();
|
||||
::decode(*response, it);
|
||||
} catch (const buffer::error &err) {
|
||||
r = -EBADMSG;
|
||||
}
|
||||
}
|
||||
}
|
||||
notifier->handle_notify(r, on_finish);
|
||||
}
|
||||
|
||||
Notifier::Notifier(ContextWQ *work_queue, IoCtx &ioctx, const std::string &oid)
|
||||
: m_work_queue(work_queue), m_ioctx(ioctx), m_oid(oid),
|
||||
m_aio_notify_lock(util::unique_lock_name(
|
||||
@ -37,7 +57,8 @@ void Notifier::flush(Context *on_finish) {
|
||||
m_aio_notify_flush_ctxs.push_back(on_finish);
|
||||
}
|
||||
|
||||
void Notifier::notify(bufferlist &bl, bufferlist *out_bl, Context *on_finish) {
|
||||
void Notifier::notify(bufferlist &bl, NotifyResponse *response,
|
||||
Context *on_finish) {
|
||||
{
|
||||
Mutex::Locker aio_notify_locker(m_aio_notify_lock);
|
||||
++m_pending_aio_notifies;
|
||||
@ -46,9 +67,9 @@ void Notifier::notify(bufferlist &bl, bufferlist *out_bl, Context *on_finish) {
|
||||
<< dendl;
|
||||
}
|
||||
|
||||
C_AioNotify *ctx = new C_AioNotify(this, on_finish);
|
||||
C_AioNotify *ctx = new C_AioNotify(this, response, on_finish);
|
||||
librados::AioCompletion *comp = util::create_rados_callback(ctx);
|
||||
int r = m_ioctx.aio_notify(m_oid, comp, bl, NOTIFY_TIMEOUT, out_bl);
|
||||
int r = m_ioctx.aio_notify(m_oid, comp, bl, NOTIFY_TIMEOUT, &ctx->out_bl);
|
||||
assert(r == 0);
|
||||
comp->release();
|
||||
}
|
||||
|
@ -16,6 +16,8 @@ namespace librbd {
|
||||
|
||||
namespace watcher {
|
||||
|
||||
struct NotifyResponse;
|
||||
|
||||
class Notifier {
|
||||
public:
|
||||
static const uint64_t NOTIFY_TIMEOUT;
|
||||
@ -25,21 +27,21 @@ public:
|
||||
~Notifier();
|
||||
|
||||
void flush(Context *on_finish);
|
||||
void notify(bufferlist &bl, bufferlist *out_bl, Context *on_finish);
|
||||
void notify(bufferlist &bl, NotifyResponse *response, Context *on_finish);
|
||||
|
||||
private:
|
||||
typedef std::list<Context*> Contexts;
|
||||
|
||||
struct C_AioNotify : public Context {
|
||||
Notifier *notifier;
|
||||
NotifyResponse *response;
|
||||
Context *on_finish;
|
||||
bufferlist out_bl;
|
||||
|
||||
C_AioNotify(Notifier *notifier, Context *on_finish)
|
||||
: notifier(notifier), on_finish(on_finish) {
|
||||
}
|
||||
void finish(int r) override {
|
||||
notifier->handle_notify(r, on_finish);
|
||||
}
|
||||
C_AioNotify(Notifier *notifier, NotifyResponse *response,
|
||||
Context *on_finish);
|
||||
|
||||
void finish(int r) override;
|
||||
};
|
||||
|
||||
ContextWQ *m_work_queue;
|
||||
|
@ -2,40 +2,43 @@
|
||||
// vim: ts=8 sw=2 smarttab
|
||||
|
||||
#include "librbd/watcher/Types.h"
|
||||
#include "librbd/Watcher.h"
|
||||
#include "common/dout.h"
|
||||
|
||||
#include "librbd/ImageCtx.h"
|
||||
#include "librbd/MirroringWatcher.h"
|
||||
#include "librbd/ImageWatcher.h"
|
||||
|
||||
#define dout_subsys ceph_subsys_rbd
|
||||
#undef dout_prefix
|
||||
#define dout_prefix *_dout << "librbd::Watcher: "
|
||||
#include "common/Formatter.h"
|
||||
|
||||
namespace librbd {
|
||||
namespace watcher {
|
||||
|
||||
C_NotifyAck::C_NotifyAck(Watcher *watcher, uint64_t notify_id,
|
||||
uint64_t handle)
|
||||
: watcher(watcher), cct(watcher->m_cct), notify_id(notify_id),
|
||||
handle(handle) {
|
||||
ldout(cct, 10) << this << " C_NotifyAck start: id=" << notify_id << ", "
|
||||
<< "handle=" << handle << dendl;
|
||||
void ClientId::encode(bufferlist &bl) const {
|
||||
::encode(gid, bl);
|
||||
::encode(handle, bl);
|
||||
}
|
||||
|
||||
void C_NotifyAck::finish(int r) {
|
||||
assert(r == 0);
|
||||
ldout(cct, 10) << this << " C_NotifyAck finish: id=" << notify_id << ", "
|
||||
<< "handle=" << handle << dendl;
|
||||
watcher->acknowledge_notify(notify_id, handle, out);
|
||||
void ClientId::decode(bufferlist::iterator &iter) {
|
||||
::decode(gid, iter);
|
||||
::decode(handle, iter);
|
||||
}
|
||||
|
||||
void ClientId::dump(Formatter *f) const {
|
||||
f->dump_unsigned("gid", gid);
|
||||
f->dump_unsigned("handle", handle);
|
||||
}
|
||||
|
||||
WRITE_CLASS_ENCODER(ClientId);
|
||||
|
||||
void NotifyResponse::encode(bufferlist& bl) const {
|
||||
::encode(acks, bl);
|
||||
::encode(timeouts, bl);
|
||||
}
|
||||
|
||||
void NotifyResponse::decode(bufferlist::iterator& iter) {
|
||||
::decode(acks, iter);
|
||||
::decode(timeouts, iter);
|
||||
}
|
||||
|
||||
} // namespace watcher
|
||||
} // namespace librbd
|
||||
|
||||
template struct librbd::watcher::HandlePayloadVisitor<
|
||||
librbd::MirroringWatcher<librbd::ImageCtx>>;
|
||||
|
||||
template struct librbd::watcher::HandlePayloadVisitor<
|
||||
librbd::ImageWatcher<librbd::ImageCtx>>;
|
||||
std::ostream &operator<<(std::ostream &out,
|
||||
const librbd::watcher::ClientId &client_id) {
|
||||
out << "[" << client_id.gid << "," << client_id.handle << "]";
|
||||
return out;
|
||||
}
|
||||
|
@ -4,13 +4,11 @@
|
||||
#ifndef CEPH_LIBRBD_WATCHER_TYPES_H
|
||||
#define CEPH_LIBRBD_WATCHER_TYPES_H
|
||||
|
||||
#include "include/int_types.h"
|
||||
#include "include/buffer_fwd.h"
|
||||
#include "include/encoding.h"
|
||||
#include "include/Context.h"
|
||||
|
||||
namespace ceph {
|
||||
class Formatter;
|
||||
}
|
||||
namespace ceph { class Formatter; }
|
||||
|
||||
namespace librbd {
|
||||
|
||||
@ -18,65 +16,42 @@ class Watcher;
|
||||
|
||||
namespace watcher {
|
||||
|
||||
struct C_NotifyAck : public Context {
|
||||
Watcher *watcher;
|
||||
CephContext *cct;
|
||||
uint64_t notify_id;
|
||||
uint64_t handle;
|
||||
bufferlist out;
|
||||
|
||||
C_NotifyAck(Watcher *watcher, uint64_t notify_id, uint64_t handle);
|
||||
void finish(int r) override;
|
||||
};
|
||||
|
||||
template <typename Watcher>
|
||||
struct HandlePayloadVisitor : public boost::static_visitor<void> {
|
||||
Watcher *watcher;
|
||||
uint64_t notify_id;
|
||||
struct ClientId {
|
||||
uint64_t gid;
|
||||
uint64_t handle;
|
||||
|
||||
HandlePayloadVisitor(Watcher *watcher_, uint64_t notify_id_,
|
||||
uint64_t handle_)
|
||||
: watcher(watcher_), notify_id(notify_id_), handle(handle_)
|
||||
{
|
||||
ClientId() : gid(0), handle(0) {}
|
||||
ClientId(uint64_t gid, uint64_t handle) : gid(gid), handle(handle) {}
|
||||
|
||||
void encode(bufferlist& bl) const;
|
||||
void decode(bufferlist::iterator& it);
|
||||
void dump(Formatter *f) const;
|
||||
|
||||
inline bool is_valid() const {
|
||||
return (*this != ClientId());
|
||||
}
|
||||
|
||||
template <typename P>
|
||||
inline void operator()(const P &payload) const {
|
||||
C_NotifyAck *ctx = new C_NotifyAck(watcher, notify_id, handle);
|
||||
if (watcher->handle_payload(payload, ctx)) {
|
||||
ctx->complete(0);
|
||||
inline bool operator==(const ClientId &rhs) const {
|
||||
return (gid == rhs.gid && handle == rhs.handle);
|
||||
}
|
||||
inline bool operator!=(const ClientId &rhs) const {
|
||||
return !(*this == rhs);
|
||||
}
|
||||
inline bool operator<(const ClientId &rhs) const {
|
||||
if (gid != rhs.gid) {
|
||||
return gid < rhs.gid;
|
||||
} else {
|
||||
return handle < rhs.handle;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
class EncodePayloadVisitor : public boost::static_visitor<void> {
|
||||
public:
|
||||
explicit EncodePayloadVisitor(bufferlist &bl) : m_bl(bl) {}
|
||||
struct NotifyResponse {
|
||||
std::map<ClientId, bufferlist> acks;
|
||||
std::vector<ClientId> timeouts;
|
||||
|
||||
template <typename P>
|
||||
inline void operator()(const P &payload) const {
|
||||
::encode(static_cast<uint32_t>(P::NOTIFY_OP), m_bl);
|
||||
payload.encode(m_bl);
|
||||
}
|
||||
|
||||
private:
|
||||
bufferlist &m_bl;
|
||||
};
|
||||
|
||||
class DecodePayloadVisitor : public boost::static_visitor<void> {
|
||||
public:
|
||||
DecodePayloadVisitor(__u8 version, bufferlist::iterator &iter)
|
||||
: m_version(version), m_iter(iter) {}
|
||||
|
||||
template <typename P>
|
||||
inline void operator()(P &payload) const {
|
||||
payload.decode(m_version, m_iter);
|
||||
}
|
||||
|
||||
private:
|
||||
__u8 m_version;
|
||||
bufferlist::iterator &m_iter;
|
||||
void encode(bufferlist& bl) const;
|
||||
void decode(bufferlist::iterator& it);
|
||||
};
|
||||
|
||||
template <typename ImageCtxT>
|
||||
@ -87,4 +62,10 @@ struct Traits {
|
||||
} // namespace watcher
|
||||
} // namespace librbd
|
||||
|
||||
std::ostream &operator<<(std::ostream &out,
|
||||
const librbd::watcher::ClientId &client);
|
||||
|
||||
WRITE_CLASS_ENCODER(librbd::watcher::ClientId);
|
||||
WRITE_CLASS_ENCODER(librbd::watcher::NotifyResponse);
|
||||
|
||||
#endif // CEPH_LIBRBD_WATCHER_TYPES_H
|
||||
|
73
src/librbd/watcher/Utils.h
Normal file
73
src/librbd/watcher/Utils.h
Normal file
@ -0,0 +1,73 @@
|
||||
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
|
||||
// vim: ts=8 sw=2 smarttab
|
||||
|
||||
#ifndef CEPH_LIBRBD_WATCHER_UTILS_H
|
||||
#define CEPH_LIBRBD_WATCHER_UTILS_H
|
||||
|
||||
#include "include/buffer_fwd.h"
|
||||
#include "include/encoding.h"
|
||||
#include "include/Context.h"
|
||||
#include "librbd/Watcher.h"
|
||||
|
||||
namespace ceph { class Formatter; }
|
||||
|
||||
namespace librbd {
|
||||
namespace watcher {
|
||||
namespace util {
|
||||
|
||||
template <typename Watcher>
|
||||
struct HandlePayloadVisitor : public boost::static_visitor<void> {
|
||||
Watcher *watcher;
|
||||
uint64_t notify_id;
|
||||
uint64_t handle;
|
||||
|
||||
HandlePayloadVisitor(Watcher *watcher_, uint64_t notify_id_,
|
||||
uint64_t handle_)
|
||||
: watcher(watcher_), notify_id(notify_id_), handle(handle_)
|
||||
{
|
||||
}
|
||||
|
||||
template <typename P>
|
||||
inline void operator()(const P &payload) const {
|
||||
typename Watcher::C_NotifyAck *ctx =
|
||||
new typename Watcher::C_NotifyAck(watcher, notify_id, handle);
|
||||
if (watcher->handle_payload(payload, ctx)) {
|
||||
ctx->complete(0);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
class EncodePayloadVisitor : public boost::static_visitor<void> {
|
||||
public:
|
||||
explicit EncodePayloadVisitor(bufferlist &bl) : m_bl(bl) {}
|
||||
|
||||
template <typename P>
|
||||
inline void operator()(const P &payload) const {
|
||||
::encode(static_cast<uint32_t>(P::NOTIFY_OP), m_bl);
|
||||
payload.encode(m_bl);
|
||||
}
|
||||
|
||||
private:
|
||||
bufferlist &m_bl;
|
||||
};
|
||||
|
||||
class DecodePayloadVisitor : public boost::static_visitor<void> {
|
||||
public:
|
||||
DecodePayloadVisitor(__u8 version, bufferlist::iterator &iter)
|
||||
: m_version(version), m_iter(iter) {}
|
||||
|
||||
template <typename P>
|
||||
inline void operator()(P &payload) const {
|
||||
payload.decode(m_version, m_iter);
|
||||
}
|
||||
|
||||
private:
|
||||
__u8 m_version;
|
||||
bufferlist::iterator &m_iter;
|
||||
};
|
||||
|
||||
} // namespace util
|
||||
} // namespace watcher
|
||||
} // namespace librbd
|
||||
|
||||
#endif // CEPH_LIBRBD_WATCHER_UTILS_H
|
@ -903,8 +903,8 @@ void LeaderWatcher<I>::notify_heartbeat() {
|
||||
bufferlist bl;
|
||||
::encode(NotifyMessage{HeartbeatPayload{}}, bl);
|
||||
|
||||
m_heartbeat_ack_bl.clear();
|
||||
send_notify(bl, &m_heartbeat_ack_bl, ctx);
|
||||
m_heartbeat_response.acks.clear();
|
||||
send_notify(bl, &m_heartbeat_response, ctx);
|
||||
}
|
||||
|
||||
template <typename I>
|
||||
@ -930,31 +930,17 @@ void LeaderWatcher<I>::handle_notify_heartbeat(int r) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
bufferlist::iterator iter = m_heartbeat_ack_bl.begin();
|
||||
uint32_t num_acks;
|
||||
::decode(num_acks, iter);
|
||||
dout(20) << m_heartbeat_response.acks.size() << " acks received, "
|
||||
<< m_heartbeat_response.timeouts.size() << " timed out" << dendl;
|
||||
|
||||
dout(20) << num_acks << " acks received" << dendl;
|
||||
|
||||
for (uint32_t i = 0; i < num_acks; i++) {
|
||||
uint64_t notifier_id;
|
||||
uint64_t cookie;
|
||||
bufferlist reply_bl;
|
||||
|
||||
::decode(notifier_id, iter);
|
||||
::decode(cookie, iter);
|
||||
::decode(reply_bl, iter);
|
||||
|
||||
if (notifier_id == m_notifier_id) {
|
||||
continue;
|
||||
}
|
||||
|
||||
std::string instance_id = stringify(notifier_id);
|
||||
m_instances->notify(instance_id);
|
||||
for (auto &it: m_heartbeat_response.acks) {
|
||||
uint64_t notifier_id = it.first.gid;
|
||||
if (notifier_id == m_notifier_id) {
|
||||
continue;
|
||||
}
|
||||
} catch (const buffer::error &err) {
|
||||
derr << ": error decoding heartbeat acks: " << err.what() << dendl;
|
||||
|
||||
std::string instance_id = stringify(notifier_id);
|
||||
m_instances->notify(instance_id);
|
||||
}
|
||||
|
||||
schedule_timer_task("heartbeat", 1, true,
|
||||
@ -1022,7 +1008,7 @@ void LeaderWatcher<I>::handle_notify(uint64_t notify_id, uint64_t handle,
|
||||
dout(20) << "notify_id=" << notify_id << ", handle=" << handle << ", "
|
||||
<< "notifier_id=" << notifier_id << dendl;
|
||||
|
||||
Context *ctx = new librbd::watcher::C_NotifyAck(this, notify_id, handle);
|
||||
Context *ctx = new C_NotifyAck(this, notify_id, handle);
|
||||
|
||||
if (notifier_id == m_notifier_id) {
|
||||
dout(20) << "our own notification, ignoring" << dendl;
|
||||
|
@ -10,8 +10,9 @@
|
||||
|
||||
#include "common/AsyncOpTracker.h"
|
||||
#include "librbd/ManagedLock.h"
|
||||
#include "librbd/managed_lock/Types.h"
|
||||
#include "librbd/Watcher.h"
|
||||
#include "librbd/managed_lock/Types.h"
|
||||
#include "librbd/watcher/Types.h"
|
||||
#include "Instances.h"
|
||||
#include "MirrorStatusWatcher.h"
|
||||
#include "tools/rbd_mirror/leader_watcher/Types.h"
|
||||
@ -195,7 +196,7 @@ private:
|
||||
Context *m_timer_task = nullptr;
|
||||
C_TimerGate *m_timer_gate = nullptr;
|
||||
|
||||
bufferlist m_heartbeat_ack_bl;
|
||||
librbd::watcher::NotifyResponse m_heartbeat_response;
|
||||
|
||||
bool is_leader(Mutex &m_lock);
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user