librbd: don't use boost::variant for image watcher messages

We are reached 20 types limit and are not able to add new message
types.

Signed-off-by: Mykola Golub <mgolub@suse.com>
This commit is contained in:
Mykola Golub 2020-04-03 11:36:47 +01:00
parent f3590d6334
commit 34bde6542f
8 changed files with 380 additions and 285 deletions

View File

@ -13,7 +13,6 @@
#include "librbd/exclusive_lock/Policy.h"
#include "librbd/image_watcher/NotifyLockOwner.h"
#include "librbd/io/AioCompletion.h"
#include "librbd/watcher/Utils.h"
#include "include/encoding.h"
#include "common/errno.h"
#include "common/WorkQueue.h"
@ -30,7 +29,6 @@ using namespace watch_notify;
using util::create_async_context_callback;
using util::create_context_callback;
using util::create_rados_callback;
using librbd::watcher::util::HandlePayloadVisitor;
using ceph::encode;
using ceph::decode;
@ -42,12 +40,13 @@ struct ImageWatcher<I>::C_ProcessPayload : public Context {
ImageWatcher *image_watcher;
uint64_t notify_id;
uint64_t handle;
watch_notify::Payload payload;
std::unique_ptr<watch_notify::Payload> payload;
C_ProcessPayload(ImageWatcher *image_watcher_, uint64_t notify_id_,
uint64_t handle_, const watch_notify::Payload &payload)
: image_watcher(image_watcher_), notify_id(notify_id_), handle(handle_),
payload(payload) {
C_ProcessPayload(ImageWatcher *image_watcher, uint64_t notify_id,
uint64_t handle,
std::unique_ptr<watch_notify::Payload> &&payload)
: image_watcher(image_watcher), notify_id(notify_id), handle(handle),
payload(std::move(payload)) {
}
void finish(int r) override {
@ -57,7 +56,7 @@ struct ImageWatcher<I>::C_ProcessPayload : public Context {
bufferlist bl;
image_watcher->acknowledge_notify(notify_id, handle, bl);
} else {
image_watcher->process_payload(notify_id, handle, payload);
image_watcher->process_payload(notify_id, handle, payload.get());
}
image_watcher->m_async_op_tracker.finish_op();
}
@ -122,7 +121,7 @@ int ImageWatcher<I>::notify_async_progress(const AsyncRequestId &request,
<< request << " @ " << offset
<< "/" << total << dendl;
send_notify(AsyncProgressPayload(request, offset, total));
send_notify(new AsyncProgressPayload(request, offset, total));
return 0;
}
@ -140,7 +139,7 @@ void ImageWatcher<I>::notify_async_complete(const AsyncRequestId &request,
ldout(m_image_ctx.cct, 20) << this << " remote async request finished: "
<< request << " = " << r << dendl;
send_notify(AsyncCompletePayload(request, r),
send_notify(new AsyncCompletePayload(request, r),
new LambdaContext(boost::bind(&ImageWatcher<I>::handle_async_complete,
this, request, r, _1)));
}
@ -173,7 +172,7 @@ void ImageWatcher<I>::notify_flatten(uint64_t request_id,
AsyncRequestId async_request_id(get_client_id(), request_id);
notify_async_request(async_request_id, FlattenPayload(async_request_id),
notify_async_request(async_request_id, new FlattenPayload(async_request_id),
prog_ctx, on_finish);
}
@ -189,7 +188,7 @@ void ImageWatcher<I>::notify_resize(uint64_t request_id, uint64_t size,
AsyncRequestId async_request_id(get_client_id(), request_id);
notify_async_request(async_request_id,
ResizePayload(size, allow_shrink, async_request_id),
new ResizePayload(size, allow_shrink, async_request_id),
prog_ctx, on_finish);
}
@ -201,7 +200,8 @@ void ImageWatcher<I>::notify_snap_create(const cls::rbd::SnapshotNamespace &snap
ceph_assert(m_image_ctx.exclusive_lock &&
!m_image_ctx.exclusive_lock->is_lock_owner());
notify_lock_owner(SnapCreatePayload(snap_namespace, snap_name), on_finish);
notify_lock_owner(new SnapCreatePayload(snap_namespace, snap_name),
on_finish);
}
template <typename I>
@ -212,7 +212,8 @@ void ImageWatcher<I>::notify_snap_rename(const snapid_t &src_snap_id,
ceph_assert(m_image_ctx.exclusive_lock &&
!m_image_ctx.exclusive_lock->is_lock_owner());
notify_lock_owner(SnapRenamePayload(src_snap_id, dst_snap_name), on_finish);
notify_lock_owner(new SnapRenamePayload(src_snap_id, dst_snap_name),
on_finish);
}
template <typename I>
@ -223,7 +224,8 @@ void ImageWatcher<I>::notify_snap_remove(const cls::rbd::SnapshotNamespace &snap
ceph_assert(m_image_ctx.exclusive_lock &&
!m_image_ctx.exclusive_lock->is_lock_owner());
notify_lock_owner(SnapRemovePayload(snap_namespace, snap_name), on_finish);
notify_lock_owner(new SnapRemovePayload(snap_namespace, snap_name),
on_finish);
}
template <typename I>
@ -234,7 +236,8 @@ void ImageWatcher<I>::notify_snap_protect(const cls::rbd::SnapshotNamespace &sna
ceph_assert(m_image_ctx.exclusive_lock &&
!m_image_ctx.exclusive_lock->is_lock_owner());
notify_lock_owner(SnapProtectPayload(snap_namespace, snap_name), on_finish);
notify_lock_owner(new SnapProtectPayload(snap_namespace, snap_name),
on_finish);
}
template <typename I>
@ -245,7 +248,8 @@ void ImageWatcher<I>::notify_snap_unprotect(const cls::rbd::SnapshotNamespace &s
ceph_assert(m_image_ctx.exclusive_lock &&
!m_image_ctx.exclusive_lock->is_lock_owner());
notify_lock_owner(SnapUnprotectPayload(snap_namespace, snap_name), on_finish);
notify_lock_owner(new SnapUnprotectPayload(snap_namespace, snap_name),
on_finish);
}
template <typename I>
@ -259,7 +263,7 @@ void ImageWatcher<I>::notify_rebuild_object_map(uint64_t request_id,
AsyncRequestId async_request_id(get_client_id(), request_id);
notify_async_request(async_request_id,
RebuildObjectMapPayload(async_request_id),
new RebuildObjectMapPayload(async_request_id),
prog_ctx, on_finish);
}
@ -270,7 +274,7 @@ void ImageWatcher<I>::notify_rename(const std::string &image_name,
ceph_assert(m_image_ctx.exclusive_lock &&
!m_image_ctx.exclusive_lock->is_lock_owner());
notify_lock_owner(RenamePayload(image_name), on_finish);
notify_lock_owner(new RenamePayload(image_name), on_finish);
}
template <typename I>
@ -280,7 +284,7 @@ void ImageWatcher<I>::notify_update_features(uint64_t features, bool enabled,
ceph_assert(m_image_ctx.exclusive_lock &&
!m_image_ctx.exclusive_lock->is_lock_owner());
notify_lock_owner(UpdateFeaturesPayload(features, enabled), on_finish);
notify_lock_owner(new UpdateFeaturesPayload(features, enabled), on_finish);
}
template <typename I>
@ -293,7 +297,7 @@ void ImageWatcher<I>::notify_migrate(uint64_t request_id,
AsyncRequestId async_request_id(get_client_id(), request_id);
notify_async_request(async_request_id, MigratePayload(async_request_id),
notify_async_request(async_request_id, new MigratePayload(async_request_id),
prog_ctx, on_finish);
}
@ -308,8 +312,8 @@ void ImageWatcher<I>::notify_sparsify(uint64_t request_id, size_t sparse_size,
AsyncRequestId async_request_id(get_client_id(), request_id);
notify_async_request(async_request_id,
SparsifyPayload(async_request_id, sparse_size), prog_ctx,
on_finish);
new SparsifyPayload(async_request_id, sparse_size),
prog_ctx, on_finish);
}
template <typename I>
@ -317,7 +321,7 @@ void ImageWatcher<I>::notify_header_update(Context *on_finish) {
ldout(m_image_ctx.cct, 10) << this << ": " << __func__ << dendl;
// supports legacy (empty buffer) clients
send_notify(HeaderUpdatePayload(), on_finish);
send_notify(new HeaderUpdatePayload(), on_finish);
}
template <typename I>
@ -325,7 +329,7 @@ void ImageWatcher<I>::notify_header_update(librados::IoCtx &io_ctx,
const std::string &oid) {
// supports legacy (empty buffer) clients
bufferlist bl;
encode(NotifyMessage(HeaderUpdatePayload()), bl);
encode(NotifyMessage(new HeaderUpdatePayload()), bl);
io_ctx.notify2(oid, bl, watcher::Notifier::NOTIFY_TIMEOUT, nullptr);
}
@ -371,7 +375,7 @@ void ImageWatcher<I>::notify_acquired_lock() {
set_owner_client_id(client_id);
}
send_notify(AcquiredLockPayload(client_id));
send_notify(new AcquiredLockPayload(client_id));
}
template <typename I>
@ -383,7 +387,7 @@ void ImageWatcher<I>::notify_released_lock() {
set_owner_client_id(ClientId());
}
send_notify(ReleasedLockPayload(get_client_id()));
send_notify(new ReleasedLockPayload(get_client_id()));
}
template <typename I>
@ -429,7 +433,7 @@ void ImageWatcher<I>::notify_request_lock() {
ldout(m_image_ctx.cct, 10) << this << " notify request lock" << dendl;
notify_lock_owner(RequestLockPayload(get_client_id(), false),
notify_lock_owner(new RequestLockPayload(get_client_id(), false),
create_context_callback<
ImageWatcher, &ImageWatcher<I>::handle_request_lock>(this));
}
@ -469,8 +473,7 @@ void ImageWatcher<I>::handle_request_lock(int r) {
}
template <typename I>
void ImageWatcher<I>::notify_lock_owner(const Payload& payload,
Context *on_finish) {
void ImageWatcher<I>::notify_lock_owner(Payload *payload, Context *on_finish) {
ceph_assert(on_finish != nullptr);
ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock));
@ -520,10 +523,9 @@ void ImageWatcher<I>::async_request_timed_out(const AsyncRequestId &id) {
}
template <typename I>
void ImageWatcher<I>::notify_async_request(const AsyncRequestId &async_request_id,
const Payload& payload,
ProgressContext& prog_ctx,
Context *on_finish) {
void ImageWatcher<I>::notify_async_request(
const AsyncRequestId &async_request_id, Payload *payload,
ProgressContext& prog_ctx, Context *on_finish) {
ceph_assert(on_finish != nullptr);
ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock));
@ -1039,9 +1041,86 @@ bool ImageWatcher<I>::handle_payload(const UnknownPayload &payload,
template <typename I>
void ImageWatcher<I>::process_payload(uint64_t notify_id, uint64_t handle,
const Payload &payload) {
apply_visitor(HandlePayloadVisitor<ImageWatcher<I>>(this, notify_id, handle),
payload);
Payload *payload) {
auto ctx = new Watcher::C_NotifyAck(this, notify_id, handle);
bool complete;
switch (payload->get_notify_op()) {
case NOTIFY_OP_ACQUIRED_LOCK:
complete = handle_payload(*(static_cast<AcquiredLockPayload *>(payload)),
ctx);
break;
case NOTIFY_OP_RELEASED_LOCK:
complete = handle_payload(*(static_cast<ReleasedLockPayload *>(payload)),
ctx);
break;
case NOTIFY_OP_REQUEST_LOCK:
complete = handle_payload(*(static_cast<RequestLockPayload *>(payload)),
ctx);
break;
case NOTIFY_OP_HEADER_UPDATE:
complete = handle_payload(*(static_cast<HeaderUpdatePayload *>(payload)),
ctx);
break;
case NOTIFY_OP_ASYNC_PROGRESS:
complete = handle_payload(*(static_cast<AsyncProgressPayload *>(payload)),
ctx);
break;
case NOTIFY_OP_ASYNC_COMPLETE:
complete = handle_payload(*(static_cast<AsyncCompletePayload *>(payload)),
ctx);
break;
case NOTIFY_OP_FLATTEN:
complete = handle_payload(*(static_cast<FlattenPayload *>(payload)), ctx);
break;
case NOTIFY_OP_RESIZE:
complete = handle_payload(*(static_cast<ResizePayload *>(payload)), ctx);
break;
case NOTIFY_OP_SNAP_CREATE:
complete = handle_payload(*(static_cast<SnapCreatePayload *>(payload)),
ctx);
break;
case NOTIFY_OP_SNAP_REMOVE:
complete = handle_payload(*(static_cast<SnapRemovePayload *>(payload)),
ctx);
break;
case NOTIFY_OP_SNAP_RENAME:
complete = handle_payload(*(static_cast<SnapRenamePayload *>(payload)),
ctx);
break;
case NOTIFY_OP_SNAP_PROTECT:
complete = handle_payload(*(static_cast<SnapProtectPayload *>(payload)),
ctx);
break;
case NOTIFY_OP_SNAP_UNPROTECT:
complete = handle_payload(*(static_cast<SnapUnprotectPayload *>(payload)),
ctx);
break;
case NOTIFY_OP_REBUILD_OBJECT_MAP:
complete = handle_payload(*(static_cast<RebuildObjectMapPayload *>(payload)),
ctx);
break;
case NOTIFY_OP_RENAME:
complete = handle_payload(*(static_cast<RenamePayload *>(payload)), ctx);
break;
case NOTIFY_OP_UPDATE_FEATURES:
complete = handle_payload(*(static_cast<UpdateFeaturesPayload *>(payload)),
ctx);
break;
case NOTIFY_OP_MIGRATE:
complete = handle_payload(*(static_cast<MigratePayload *>(payload)), ctx);
break;
case NOTIFY_OP_SPARSIFY:
complete = handle_payload(*(static_cast<SparsifyPayload *>(payload)), ctx);
break;
default:
ceph_assert(payload->get_notify_op() == static_cast<NotifyOp>(-1));
complete = handle_payload(*(static_cast<UnknownPayload *>(payload)), ctx);
}
if (complete) {
ctx->complete(0);
}
}
template <typename I>
@ -1050,7 +1129,7 @@ void ImageWatcher<I>::handle_notify(uint64_t notify_id, uint64_t handle,
NotifyMessage notify_message;
if (bl.length() == 0) {
// legacy notification for header updates
notify_message = NotifyMessage(HeaderUpdatePayload());
notify_message = NotifyMessage(new HeaderUpdatePayload());
} else {
try {
auto iter = bl.cbegin();
@ -1065,10 +1144,11 @@ void ImageWatcher<I>::handle_notify(uint64_t notify_id, uint64_t handle,
// if an image refresh is required, refresh before processing the request
if (notify_message.check_for_refresh() &&
m_image_ctx.state->is_refresh_required()) {
m_image_ctx.state->refresh(new C_ProcessPayload(this, notify_id, handle,
notify_message.payload));
m_image_ctx.state->refresh(
new C_ProcessPayload(this, notify_id, handle,
std::move(notify_message.payload)));
} else {
process_payload(notify_id, handle, notify_message.payload);
process_payload(notify_id, handle, notify_message.payload.get());
}
}
@ -1103,7 +1183,7 @@ void ImageWatcher<I>::handle_rewatch_complete(int r) {
}
template <typename I>
void ImageWatcher<I>::send_notify(const Payload &payload, Context *ctx) {
void ImageWatcher<I>::send_notify(Payload *payload, Context *ctx) {
bufferlist bl;
encode(NotifyMessage(payload), bl);

View File

@ -18,18 +18,11 @@ class entity_name_t;
namespace librbd {
namespace watcher {
namespace util {
template <typename> struct HandlePayloadVisitor;
}
}
class ImageCtx;
template <typename> class TaskFinisher;
template <typename ImageCtxT = ImageCtx>
class ImageWatcher : public Watcher {
friend struct watcher::util::HandlePayloadVisitor<ImageWatcher<ImageCtxT>>;
public:
ImageWatcher(ImageCtxT& image_ctx);
@ -182,14 +175,13 @@ private:
void handle_request_lock(int r);
void schedule_request_lock(bool use_timer, int timer_delay = -1);
void notify_lock_owner(const watch_notify::Payload& payload,
Context *on_finish);
void notify_lock_owner(watch_notify::Payload *payload, Context *on_finish);
Context *remove_async_request(const watch_notify::AsyncRequestId &id);
void schedule_async_request_timed_out(const watch_notify::AsyncRequestId &id);
void async_request_timed_out(const watch_notify::AsyncRequestId &id);
void notify_async_request(const watch_notify::AsyncRequestId &id,
const watch_notify::Payload &payload,
watch_notify::Payload *payload,
ProgressContext& prog_ctx,
Context *on_finish);
@ -245,15 +237,14 @@ private:
bool handle_payload(const watch_notify::UnknownPayload& payload,
C_NotifyAck *ctx);
void process_payload(uint64_t notify_id, uint64_t handle,
const watch_notify::Payload &payload);
watch_notify::Payload *payload);
void handle_notify(uint64_t notify_id, uint64_t handle,
uint64_t notifier_id, bufferlist &bl) override;
void handle_error(uint64_t cookie, int err) override;
void handle_rewatch_complete(int r) override;
void send_notify(const watch_notify::Payload& payload,
Context *ctx = nullptr);
void send_notify(watch_notify::Payload *payload, Context *ctx = nullptr);
};

View File

@ -6,46 +6,10 @@
#include "include/ceph_assert.h"
#include "include/stringify.h"
#include "librbd/WatchNotifyTypes.h"
#include "librbd/watcher/Utils.h"
namespace librbd {
namespace watch_notify {
namespace {
class CheckForRefreshVisitor : public boost::static_visitor<bool> {
public:
template <typename Payload>
inline bool operator()(const Payload &payload) const {
return Payload::CHECK_FOR_REFRESH;
}
};
class GetNotifyOpVisitor : public boost::static_visitor<NotifyOp> {
public:
template <typename Payload>
NotifyOp operator()(const Payload &payload) const {
return Payload::NOTIFY_OP;
}
};
class DumpPayloadVisitor : public boost::static_visitor<void> {
public:
explicit DumpPayloadVisitor(Formatter *formatter) : m_formatter(formatter) {}
template <typename Payload>
inline void operator()(const Payload &payload) const {
NotifyOp notify_op = Payload::NOTIFY_OP;
m_formatter->dump_string("notify_op", stringify(notify_op));
payload.dump(m_formatter);
}
private:
ceph::Formatter *m_formatter;
};
} // anonymous namespace
void AsyncRequestId::encode(bufferlist &bl) const {
using ceph::encode;
encode(client_id, bl);
@ -320,12 +284,13 @@ void UnknownPayload::dump(Formatter *f) const {
}
bool NotifyMessage::check_for_refresh() const {
return boost::apply_visitor(CheckForRefreshVisitor(), payload);
return payload->check_for_refresh();
}
void NotifyMessage::encode(bufferlist& bl) const {
ENCODE_START(6, 1, bl);
boost::apply_visitor(watcher::util::EncodePayloadVisitor(bl), payload);
encode(static_cast<uint32_t>(payload->get_notify_op()), bl);
payload->encode(bl);
ENCODE_FINISH(bl);
}
@ -338,95 +303,92 @@ void NotifyMessage::decode(bufferlist::const_iterator& iter) {
// select the correct payload variant based upon the encoded op
switch (notify_op) {
case NOTIFY_OP_ACQUIRED_LOCK:
payload = AcquiredLockPayload();
payload.reset(new AcquiredLockPayload());
break;
case NOTIFY_OP_RELEASED_LOCK:
payload = ReleasedLockPayload();
payload.reset(new ReleasedLockPayload());
break;
case NOTIFY_OP_REQUEST_LOCK:
payload = RequestLockPayload();
payload.reset(new RequestLockPayload());
break;
case NOTIFY_OP_HEADER_UPDATE:
payload = HeaderUpdatePayload();
payload.reset(new HeaderUpdatePayload());
break;
case NOTIFY_OP_ASYNC_PROGRESS:
payload = AsyncProgressPayload();
payload.reset(new AsyncProgressPayload());
break;
case NOTIFY_OP_ASYNC_COMPLETE:
payload = AsyncCompletePayload();
payload.reset(new AsyncCompletePayload());
break;
case NOTIFY_OP_FLATTEN:
payload = FlattenPayload();
payload.reset(new FlattenPayload());
break;
case NOTIFY_OP_RESIZE:
payload = ResizePayload();
payload.reset(new ResizePayload());
break;
case NOTIFY_OP_SNAP_CREATE:
payload = SnapCreatePayload();
payload.reset(new SnapCreatePayload());
break;
case NOTIFY_OP_SNAP_REMOVE:
payload = SnapRemovePayload();
payload.reset(new SnapRemovePayload());
break;
case NOTIFY_OP_SNAP_RENAME:
payload = SnapRenamePayload();
payload.reset(new SnapRenamePayload());
break;
case NOTIFY_OP_SNAP_PROTECT:
payload = SnapProtectPayload();
payload.reset(new SnapProtectPayload());
break;
case NOTIFY_OP_SNAP_UNPROTECT:
payload = SnapUnprotectPayload();
payload.reset(new SnapUnprotectPayload());
break;
case NOTIFY_OP_REBUILD_OBJECT_MAP:
payload = RebuildObjectMapPayload();
payload.reset(new RebuildObjectMapPayload());
break;
case NOTIFY_OP_RENAME:
payload = RenamePayload();
payload.reset(new RenamePayload());
break;
case NOTIFY_OP_UPDATE_FEATURES:
payload = UpdateFeaturesPayload();
payload.reset(new UpdateFeaturesPayload());
break;
case NOTIFY_OP_MIGRATE:
payload = MigratePayload();
payload.reset(new MigratePayload());
break;
case NOTIFY_OP_SPARSIFY:
payload = SparsifyPayload();
break;
default:
payload = UnknownPayload();
payload.reset(new SparsifyPayload());
break;
}
apply_visitor(watcher::util::DecodePayloadVisitor(struct_v, iter), payload);
payload->decode(struct_v, iter);
DECODE_FINISH(iter);
}
void NotifyMessage::dump(Formatter *f) const {
apply_visitor(DumpPayloadVisitor(f), payload);
payload->dump(f);
}
NotifyOp NotifyMessage::get_notify_op() const {
return apply_visitor(GetNotifyOpVisitor(), payload);
return payload->get_notify_op();
}
void NotifyMessage::generate_test_instances(std::list<NotifyMessage *> &o) {
o.push_back(new NotifyMessage(AcquiredLockPayload(ClientId(1, 2))));
o.push_back(new NotifyMessage(ReleasedLockPayload(ClientId(1, 2))));
o.push_back(new NotifyMessage(RequestLockPayload(ClientId(1, 2), true)));
o.push_back(new NotifyMessage(HeaderUpdatePayload()));
o.push_back(new NotifyMessage(AsyncProgressPayload(AsyncRequestId(ClientId(0, 1), 2), 3, 4)));
o.push_back(new NotifyMessage(AsyncCompletePayload(AsyncRequestId(ClientId(0, 1), 2), 3)));
o.push_back(new NotifyMessage(FlattenPayload(AsyncRequestId(ClientId(0, 1), 2))));
o.push_back(new NotifyMessage(ResizePayload(123, true, AsyncRequestId(ClientId(0, 1), 2))));
o.push_back(new NotifyMessage(SnapCreatePayload(cls::rbd::UserSnapshotNamespace(),
"foo")));
o.push_back(new NotifyMessage(SnapRemovePayload(cls::rbd::UserSnapshotNamespace(), "foo")));
o.push_back(new NotifyMessage(SnapProtectPayload(cls::rbd::UserSnapshotNamespace(), "foo")));
o.push_back(new NotifyMessage(SnapUnprotectPayload(cls::rbd::UserSnapshotNamespace(), "foo")));
o.push_back(new NotifyMessage(RebuildObjectMapPayload(AsyncRequestId(ClientId(0, 1), 2))));
o.push_back(new NotifyMessage(RenamePayload("foo")));
o.push_back(new NotifyMessage(UpdateFeaturesPayload(1, true)));
o.push_back(new NotifyMessage(MigratePayload(AsyncRequestId(ClientId(0, 1), 2))));
o.push_back(new NotifyMessage(SparsifyPayload(AsyncRequestId(ClientId(0, 1), 2), 1)));
o.push_back(new NotifyMessage(new AcquiredLockPayload(ClientId(1, 2))));
o.push_back(new NotifyMessage(new ReleasedLockPayload(ClientId(1, 2))));
o.push_back(new NotifyMessage(new RequestLockPayload(ClientId(1, 2), true)));
o.push_back(new NotifyMessage(new HeaderUpdatePayload()));
o.push_back(new NotifyMessage(new AsyncProgressPayload(AsyncRequestId(ClientId(0, 1), 2), 3, 4)));
o.push_back(new NotifyMessage(new AsyncCompletePayload(AsyncRequestId(ClientId(0, 1), 2), 3)));
o.push_back(new NotifyMessage(new FlattenPayload(AsyncRequestId(ClientId(0, 1), 2))));
o.push_back(new NotifyMessage(new ResizePayload(123, true, AsyncRequestId(ClientId(0, 1), 2))));
o.push_back(new NotifyMessage(new SnapCreatePayload(cls::rbd::UserSnapshotNamespace(),
"foo")));
o.push_back(new NotifyMessage(new SnapRemovePayload(cls::rbd::UserSnapshotNamespace(), "foo")));
o.push_back(new NotifyMessage(new SnapProtectPayload(cls::rbd::UserSnapshotNamespace(), "foo")));
o.push_back(new NotifyMessage(new SnapUnprotectPayload(cls::rbd::UserSnapshotNamespace(), "foo")));
o.push_back(new NotifyMessage(new RebuildObjectMapPayload(AsyncRequestId(ClientId(0, 1), 2))));
o.push_back(new NotifyMessage(new RenamePayload("foo")));
o.push_back(new NotifyMessage(new UpdateFeaturesPayload(1, true)));
o.push_back(new NotifyMessage(new MigratePayload(AsyncRequestId(ClientId(0, 1), 2))));
o.push_back(new NotifyMessage(new SparsifyPayload(AsyncRequestId(ClientId(0, 1), 2), 1)));
}
void ResponseMessage::encode(bufferlist& bl) const {

View File

@ -10,6 +10,7 @@
#include "librbd/watcher/Types.h"
#include <iosfwd>
#include <list>
#include <memory>
#include <string>
#include <boost/variant.hpp>
@ -69,67 +70,94 @@ enum NotifyOp {
NOTIFY_OP_SPARSIFY = 17,
};
struct AcquiredLockPayload {
static const NotifyOp NOTIFY_OP = NOTIFY_OP_ACQUIRED_LOCK;
static const bool CHECK_FOR_REFRESH = false;
struct Payload {
virtual ~Payload() {}
virtual NotifyOp get_notify_op() const = 0;
virtual bool check_for_refresh() const = 0;
virtual void encode(bufferlist &bl) const = 0;
virtual void decode(__u8 version, bufferlist::const_iterator &iter) = 0;
virtual void dump(Formatter *f) const = 0;
};
struct AcquiredLockPayload : public Payload {
ClientId client_id;
AcquiredLockPayload() {}
AcquiredLockPayload(const ClientId &client_id_) : client_id(client_id_) {}
AcquiredLockPayload(const ClientId &client_id) : client_id(client_id) {}
void encode(bufferlist &bl) const;
void decode(__u8 version, bufferlist::const_iterator &iter);
void dump(Formatter *f) const;
NotifyOp get_notify_op() const override {
return NOTIFY_OP_ACQUIRED_LOCK;
}
bool check_for_refresh() const override {
return false;
}
void encode(bufferlist &bl) const override;
void decode(__u8 version, bufferlist::const_iterator &iter) override;
void dump(Formatter *f) const override;
};
struct ReleasedLockPayload {
static const NotifyOp NOTIFY_OP = NOTIFY_OP_RELEASED_LOCK;
static const bool CHECK_FOR_REFRESH = false;
struct ReleasedLockPayload : public Payload {
ClientId client_id;
ReleasedLockPayload() {}
ReleasedLockPayload(const ClientId &client_id_) : client_id(client_id_) {}
ReleasedLockPayload(const ClientId &client_id) : client_id(client_id) {}
void encode(bufferlist &bl) const;
void decode(__u8 version, bufferlist::const_iterator &iter);
void dump(Formatter *f) const;
NotifyOp get_notify_op() const override {
return NOTIFY_OP_RELEASED_LOCK;
}
bool check_for_refresh() const override {
return false;
}
void encode(bufferlist &bl) const override;
void decode(__u8 version, bufferlist::const_iterator &iter) override;
void dump(Formatter *f) const override;
};
struct RequestLockPayload {
static const NotifyOp NOTIFY_OP = NOTIFY_OP_REQUEST_LOCK;
static const bool CHECK_FOR_REFRESH = false;
struct RequestLockPayload : public Payload {
ClientId client_id;
bool force = false;
RequestLockPayload() {}
RequestLockPayload(const ClientId &client_id_, bool force_)
: client_id(client_id_), force(force_) {
RequestLockPayload(const ClientId &client_id, bool force)
: client_id(client_id), force(force) {
}
void encode(bufferlist &bl) const;
void decode(__u8 version, bufferlist::const_iterator &iter);
void dump(Formatter *f) const;
NotifyOp get_notify_op() const override {
return NOTIFY_OP_REQUEST_LOCK;
}
bool check_for_refresh() const override {
return false;
}
void encode(bufferlist &bl) const override;
void decode(__u8 version, bufferlist::const_iterator &iter) override;
void dump(Formatter *f) const override;
};
struct HeaderUpdatePayload {
static const NotifyOp NOTIFY_OP = NOTIFY_OP_HEADER_UPDATE;
static const bool CHECK_FOR_REFRESH = false;
struct HeaderUpdatePayload : public Payload {
NotifyOp get_notify_op() const override {
return NOTIFY_OP_HEADER_UPDATE;
}
bool check_for_refresh() const override {
return false;
}
void encode(bufferlist &bl) const;
void decode(__u8 version, bufferlist::const_iterator &iter);
void dump(Formatter *f) const;
void encode(bufferlist &bl) const override;
void decode(__u8 version, bufferlist::const_iterator &iter) override;
void dump(Formatter *f) const override;
};
struct AsyncRequestPayloadBase {
struct AsyncRequestPayloadBase : public Payload {
public:
AsyncRequestId async_request_id;
void encode(bufferlist &bl) const;
void decode(__u8 version, bufferlist::const_iterator &iter);
void dump(Formatter *f) const;
void encode(bufferlist &bl) const override;
void decode(__u8 version, bufferlist::const_iterator &iter) override;
void dump(Formatter *f) const override;
protected:
AsyncRequestPayloadBase() {}
@ -137,229 +165,262 @@ protected:
};
struct AsyncProgressPayload : public AsyncRequestPayloadBase {
static const NotifyOp NOTIFY_OP = NOTIFY_OP_ASYNC_PROGRESS;
static const bool CHECK_FOR_REFRESH = false;
uint64_t offset = 0;
uint64_t total = 0;
AsyncProgressPayload() : offset(0), total(0) {}
AsyncProgressPayload(const AsyncRequestId &id, uint64_t offset_, uint64_t total_)
: AsyncRequestPayloadBase(id), offset(offset_), total(total_) {}
AsyncProgressPayload() {}
AsyncProgressPayload(const AsyncRequestId &id, uint64_t offset, uint64_t total)
: AsyncRequestPayloadBase(id), offset(offset), total(total) {}
uint64_t offset;
uint64_t total;
NotifyOp get_notify_op() const override {
return NOTIFY_OP_ASYNC_PROGRESS;
}
bool check_for_refresh() const override {
return false;
}
void encode(bufferlist &bl) const;
void decode(__u8 version, bufferlist::const_iterator &iter);
void dump(Formatter *f) const;
void encode(bufferlist &bl) const override;
void decode(__u8 version, bufferlist::const_iterator &iter) override;
void dump(Formatter *f) const override;
};
struct AsyncCompletePayload : public AsyncRequestPayloadBase {
static const NotifyOp NOTIFY_OP = NOTIFY_OP_ASYNC_COMPLETE;
static const bool CHECK_FOR_REFRESH = false;
int result = 0;
AsyncCompletePayload() : result(0) {}
AsyncCompletePayload() {}
AsyncCompletePayload(const AsyncRequestId &id, int r)
: AsyncRequestPayloadBase(id), result(r) {}
int result;
NotifyOp get_notify_op() const override {
return NOTIFY_OP_ASYNC_COMPLETE;
}
bool check_for_refresh() const override {
return false;
}
void encode(bufferlist &bl) const;
void decode(__u8 version, bufferlist::const_iterator &iter);
void dump(Formatter *f) const;
void encode(bufferlist &bl) const override;
void decode(__u8 version, bufferlist::const_iterator &iter) override;
void dump(Formatter *f) const override;
};
struct FlattenPayload : public AsyncRequestPayloadBase {
static const NotifyOp NOTIFY_OP = NOTIFY_OP_FLATTEN;
static const bool CHECK_FOR_REFRESH = true;
FlattenPayload() {}
FlattenPayload(const AsyncRequestId &id) : AsyncRequestPayloadBase(id) {}
NotifyOp get_notify_op() const override {
return NOTIFY_OP_FLATTEN;
}
bool check_for_refresh() const override {
return true;
}
};
struct ResizePayload : public AsyncRequestPayloadBase {
static const NotifyOp NOTIFY_OP = NOTIFY_OP_RESIZE;
static const bool CHECK_FOR_REFRESH = true;
uint64_t size = 0;
bool allow_shrink = true;
ResizePayload() : size(0), allow_shrink(true) {}
ResizePayload(uint64_t size_, bool allow_shrink_, const AsyncRequestId &id)
: AsyncRequestPayloadBase(id), size(size_), allow_shrink(allow_shrink_) {}
ResizePayload() {}
ResizePayload(uint64_t size, bool allow_shrink, const AsyncRequestId &id)
: AsyncRequestPayloadBase(id), size(size), allow_shrink(allow_shrink) {}
uint64_t size;
bool allow_shrink;
NotifyOp get_notify_op() const override {
return NOTIFY_OP_RESIZE;
}
bool check_for_refresh() const override {
return true;
}
void encode(bufferlist &bl) const;
void decode(__u8 version, bufferlist::const_iterator &iter);
void dump(Formatter *f) const;
void encode(bufferlist &bl) const override;
void decode(__u8 version, bufferlist::const_iterator &iter) override;
void dump(Formatter *f) const override;
};
struct SnapPayloadBase {
struct SnapPayloadBase : public Payload {
public:
static const bool CHECK_FOR_REFRESH = true;
cls::rbd::SnapshotNamespace snap_namespace;
std::string snap_name;
void encode(bufferlist &bl) const;
void decode(__u8 version, bufferlist::const_iterator &iter);
void dump(Formatter *f) const;
bool check_for_refresh() const override {
return true;
}
void encode(bufferlist &bl) const override;
void decode(__u8 version, bufferlist::const_iterator &iter) override;
void dump(Formatter *f) const override;
protected:
SnapPayloadBase() {}
SnapPayloadBase(const cls::rbd::SnapshotNamespace& _snap_namespace,
SnapPayloadBase(const cls::rbd::SnapshotNamespace& snap_namespace,
const std::string &name)
: snap_namespace(_snap_namespace), snap_name(name) {}
: snap_namespace(snap_namespace), snap_name(name) {}
};
struct SnapCreatePayload : public SnapPayloadBase {
static const NotifyOp NOTIFY_OP = NOTIFY_OP_SNAP_CREATE;
SnapCreatePayload() {}
SnapCreatePayload(const cls::rbd::SnapshotNamespace &_snap_namespace,
const std::string &name)
: SnapPayloadBase(_snap_namespace, name) {}
void encode(bufferlist &bl) const;
void decode(__u8 version, bufferlist::const_iterator &iter);
void dump(Formatter *f) const;
NotifyOp get_notify_op() const override {
return NOTIFY_OP_SNAP_CREATE;
}
void encode(bufferlist &bl) const override;
void decode(__u8 version, bufferlist::const_iterator &iter) override;
void dump(Formatter *f) const override;
};
struct SnapRenamePayload : public SnapPayloadBase {
static const NotifyOp NOTIFY_OP = NOTIFY_OP_SNAP_RENAME;
uint64_t snap_id = 0;
SnapRenamePayload() {}
SnapRenamePayload(const uint64_t &src_snap_id,
const std::string &dst_name)
: SnapPayloadBase(cls::rbd::UserSnapshotNamespace(), dst_name), snap_id(src_snap_id) {}
uint64_t snap_id = 0;
NotifyOp get_notify_op() const override {
return NOTIFY_OP_SNAP_RENAME;
}
void encode(bufferlist &bl) const;
void decode(__u8 version, bufferlist::const_iterator &iter);
void dump(Formatter *f) const;
void encode(bufferlist &bl) const override;
void decode(__u8 version, bufferlist::const_iterator &iter) override;
void dump(Formatter *f) const override;
};
struct SnapRemovePayload : public SnapPayloadBase {
static const NotifyOp NOTIFY_OP = NOTIFY_OP_SNAP_REMOVE;
SnapRemovePayload() {}
SnapRemovePayload(const cls::rbd::SnapshotNamespace& snap_namespace,
const std::string &name)
: SnapPayloadBase(snap_namespace, name) {}
NotifyOp get_notify_op() const override {
return NOTIFY_OP_SNAP_REMOVE;
}
};
struct SnapProtectPayload : public SnapPayloadBase {
static const NotifyOp NOTIFY_OP = NOTIFY_OP_SNAP_PROTECT;
SnapProtectPayload() {}
SnapProtectPayload(const cls::rbd::SnapshotNamespace& snap_namespace,
const std::string &name)
: SnapPayloadBase(snap_namespace, name) {}
NotifyOp get_notify_op() const override {
return NOTIFY_OP_SNAP_PROTECT;
}
};
struct SnapUnprotectPayload : public SnapPayloadBase {
static const NotifyOp NOTIFY_OP = NOTIFY_OP_SNAP_UNPROTECT;
SnapUnprotectPayload() {}
SnapUnprotectPayload(const cls::rbd::SnapshotNamespace& snap_namespace,
const std::string &name)
: SnapPayloadBase(snap_namespace, name) {}
NotifyOp get_notify_op() const override {
return NOTIFY_OP_SNAP_UNPROTECT;
}
};
struct RebuildObjectMapPayload : public AsyncRequestPayloadBase {
static const NotifyOp NOTIFY_OP = NOTIFY_OP_REBUILD_OBJECT_MAP;
static const bool CHECK_FOR_REFRESH = true;
RebuildObjectMapPayload() {}
RebuildObjectMapPayload(const AsyncRequestId &id)
: AsyncRequestPayloadBase(id) {}
NotifyOp get_notify_op() const override {
return NOTIFY_OP_REBUILD_OBJECT_MAP;
}
bool check_for_refresh() const override {
return true;
}
};
struct RenamePayload {
static const NotifyOp NOTIFY_OP = NOTIFY_OP_RENAME;
static const bool CHECK_FOR_REFRESH = true;
struct RenamePayload : public Payload {
std::string image_name;
RenamePayload() {}
RenamePayload(const std::string _image_name) : image_name(_image_name) {}
std::string image_name;
NotifyOp get_notify_op() const override {
return NOTIFY_OP_RENAME;
}
bool check_for_refresh() const override {
return true;
}
void encode(bufferlist &bl) const;
void decode(__u8 version, bufferlist::const_iterator &iter);
void dump(Formatter *f) const;
};
struct UpdateFeaturesPayload {
static const NotifyOp NOTIFY_OP = NOTIFY_OP_UPDATE_FEATURES;
static const bool CHECK_FOR_REFRESH = true;
struct UpdateFeaturesPayload : public Payload {
uint64_t features = 0;
bool enabled = false;
UpdateFeaturesPayload() : features(0), enabled(false) {}
UpdateFeaturesPayload(uint64_t features_, bool enabled_)
: features(features_), enabled(enabled_) {}
UpdateFeaturesPayload() {}
UpdateFeaturesPayload(uint64_t features, bool enabled)
: features(features), enabled(enabled) {}
uint64_t features;
bool enabled;
NotifyOp get_notify_op() const override {
return NOTIFY_OP_UPDATE_FEATURES;
}
bool check_for_refresh() const override {
return true;
}
void encode(bufferlist &bl) const;
void decode(__u8 version, bufferlist::const_iterator &iter);
void dump(Formatter *f) const;
void encode(bufferlist &bl) const override;
void decode(__u8 version, bufferlist::const_iterator &iter) override;
void dump(Formatter *f) const override;
};
struct MigratePayload : public AsyncRequestPayloadBase {
static const NotifyOp NOTIFY_OP = NOTIFY_OP_MIGRATE;
static const bool CHECK_FOR_REFRESH = true;
MigratePayload() {}
MigratePayload(const AsyncRequestId &id) : AsyncRequestPayloadBase(id) {}
NotifyOp get_notify_op() const override {
return NOTIFY_OP_MIGRATE;
}
bool check_for_refresh() const override {
return true;
}
};
struct SparsifyPayload : public AsyncRequestPayloadBase {
static const NotifyOp NOTIFY_OP = NOTIFY_OP_SPARSIFY;
static const bool CHECK_FOR_REFRESH = true;
size_t sparse_size = 0;
SparsifyPayload() {}
SparsifyPayload(const AsyncRequestId &id, size_t sparse_size)
: AsyncRequestPayloadBase(id), sparse_size(sparse_size) {}
: AsyncRequestPayloadBase(id), sparse_size(sparse_size) {
}
size_t sparse_size = 0;
NotifyOp get_notify_op() const override {
return NOTIFY_OP_SPARSIFY;
}
bool check_for_refresh() const override {
return true;
}
void encode(bufferlist &bl) const;
void decode(__u8 version, bufferlist::const_iterator &iter);
void dump(Formatter *f) const;
void encode(bufferlist &bl) const override;
void decode(__u8 version, bufferlist::const_iterator &iter) override;
void dump(Formatter *f) const override;
};
struct UnknownPayload {
static const NotifyOp NOTIFY_OP = static_cast<NotifyOp>(-1);
static const bool CHECK_FOR_REFRESH = false;
struct UnknownPayload : public Payload {
NotifyOp get_notify_op() const override {
return static_cast<NotifyOp>(-1);
}
bool check_for_refresh() const override {
return false;
}
void encode(bufferlist &bl) const;
void decode(__u8 version, bufferlist::const_iterator &iter);
void dump(Formatter *f) const;
void encode(bufferlist &bl) const override;
void decode(__u8 version, bufferlist::const_iterator &iter) override;
void dump(Formatter *f) const override;
};
typedef boost::variant<AcquiredLockPayload,
ReleasedLockPayload,
RequestLockPayload,
HeaderUpdatePayload,
AsyncProgressPayload,
AsyncCompletePayload,
FlattenPayload,
ResizePayload,
SnapCreatePayload,
SnapRemovePayload,
SnapRenamePayload,
SnapProtectPayload,
SnapUnprotectPayload,
RebuildObjectMapPayload,
RenamePayload,
UpdateFeaturesPayload,
MigratePayload,
SparsifyPayload,
UnknownPayload> Payload;
struct NotifyMessage {
NotifyMessage() : payload(UnknownPayload()) {}
NotifyMessage(const Payload &payload_) : payload(payload_) {}
NotifyMessage() : payload(new UnknownPayload()) {}
NotifyMessage(Payload *payload) : payload(payload) {}
Payload payload;
std::unique_ptr<Payload> payload;
bool check_for_refresh() const;

View File

@ -142,7 +142,8 @@ void ImageMeta<I>::notify_update(Context* on_finish) {
// open a non-primary image read/write and therefore cannot re-use
// the ImageWatcher to send the notification
bufferlist bl;
encode(watch_notify::NotifyMessage(watch_notify::HeaderUpdatePayload()), bl);
encode(watch_notify::NotifyMessage(new watch_notify::HeaderUpdatePayload()),
bl);
m_out_bl.clear();
auto ctx = new LambdaContext([this, on_finish](int r) {

View File

@ -186,14 +186,14 @@ public:
int notify_async_progress(librbd::ImageCtx *ictx, const AsyncRequestId &id,
uint64_t offset, uint64_t total) {
bufferlist bl;
encode(NotifyMessage(AsyncProgressPayload(id, offset, total)), bl);
encode(NotifyMessage(new AsyncProgressPayload(id, offset, total)), bl);
return m_ioctx.notify2(ictx->header_oid, bl, 5000, NULL);
}
int notify_async_complete(librbd::ImageCtx *ictx, const AsyncRequestId &id,
int r) {
bufferlist bl;
encode(NotifyMessage(AsyncCompletePayload(id, r)), bl);
encode(NotifyMessage(new AsyncCompletePayload(id, r)), bl);
return m_ioctx.notify2(ictx->header_oid, bl, 5000, NULL);
}

View File

@ -8,7 +8,7 @@ TYPE(librbd::mirroring_watcher::NotifyMessage)
#include "librbd/trash_watcher/Types.h"
TYPE(librbd::mirroring_watcher::NotifyMessage)
#include "librbd/WatchNotifyTypes.h"
TYPE(librbd::watch_notify::NotifyMessage)
TYPE_NOCOPY(librbd::watch_notify::NotifyMessage)
TYPE(librbd::watch_notify::ResponseMessage)
#include "rbd_replay/ActionTypes.h"

View File

@ -34,7 +34,7 @@ public:
using namespace librbd::watch_notify;
NotifyMessage notify_message;
if (bl.length() == 0) {
notify_message = NotifyMessage(HeaderUpdatePayload());
notify_message = NotifyMessage(new HeaderUpdatePayload());
} else {
try {
auto iter = bl.cbegin();