rbd-mirror A/A: coordinate image syncs with leader

Fixes: http://tracker.ceph.com/issues/18789
Signed-off-by: Mykola Golub <mgolub@mirantis.com>
This commit is contained in:
Mykola Golub 2017-04-17 21:05:25 +02:00
parent 55f9c625bd
commit 4db1ee39c0
32 changed files with 1612 additions and 777 deletions

View File

@ -3,7 +3,7 @@
#include "test/rbd_mirror/test_mock_fixture.h"
#include "librbd/journal/TypeTraits.h"
#include "tools/rbd_mirror/ImageSyncThrottler.h"
#include "tools/rbd_mirror/InstanceWatcher.h"
#include "tools/rbd_mirror/Threads.h"
#include "tools/rbd_mirror/image_replayer/BootstrapRequest.h"
#include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
@ -44,16 +44,42 @@ namespace mirror {
class ProgressContext;
template<>
struct ImageSyncThrottler<librbd::MockTestImageCtx> {
MOCK_METHOD10(start_sync, void(librbd::MockTestImageCtx *local_image_ctx,
librbd::MockTestImageCtx *remote_image_ctx,
SafeTimer *timer, Mutex *timer_lock,
const std::string &mirror_uuid,
::journal::MockJournaler *journaler,
librbd::journal::MirrorPeerClientMeta *client_meta,
ContextWQ *work_queue, Context *on_finish,
ProgressContext *progress_ctx));
MOCK_METHOD1(cancel_sync, void(const std::string& mirror_uuid));
struct ImageSync<librbd::MockTestImageCtx> {
static ImageSync* s_instance;
Context *on_finish = nullptr;
static ImageSync* create(
librbd::MockTestImageCtx *local_image_ctx,
librbd::MockTestImageCtx *remote_image_ctx,
SafeTimer *timer, Mutex *timer_lock, const std::string &mirror_uuid,
::journal::MockJournaler *journaler,
librbd::journal::MirrorPeerClientMeta *client_meta, ContextWQ *work_queue,
InstanceWatcher<librbd::MockTestImageCtx> *instance_watcher,
Context *on_finish, ProgressContext *progress_ctx) {
assert(s_instance != nullptr);
s_instance->on_finish = on_finish;
return s_instance;
}
ImageSync() {
assert(s_instance == nullptr);
s_instance = this;
}
~ImageSync() {
s_instance = nullptr;
}
MOCK_METHOD0(get, void());
MOCK_METHOD0(put, void());
MOCK_METHOD0(send, void());
MOCK_METHOD0(cancel, void());
};
ImageSync<librbd::MockTestImageCtx>*
ImageSync<librbd::MockTestImageCtx>::s_instance = nullptr;
template<>
struct InstanceWatcher<librbd::MockTestImageCtx> {
};
namespace image_replayer {
@ -240,10 +266,11 @@ MATCHER_P(IsSameIoCtx, io_ctx, "") {
class TestMockImageReplayerBootstrapRequest : public TestMockFixture {
public:
typedef ImageSyncThrottlerRef<librbd::MockTestImageCtx> MockImageSyncThrottler;
typedef BootstrapRequest<librbd::MockTestImageCtx> MockBootstrapRequest;
typedef CloseImageRequest<librbd::MockTestImageCtx> MockCloseImageRequest;
typedef CreateImageRequest<librbd::MockTestImageCtx> MockCreateImageRequest;
typedef ImageSync<librbd::MockTestImageCtx> MockImageSync;
typedef InstanceWatcher<librbd::MockTestImageCtx> MockInstanceWatcher;
typedef IsPrimaryRequest<librbd::MockTestImageCtx> MockIsPrimaryRequest;
typedef OpenImageRequest<librbd::MockTestImageCtx> MockOpenImageRequest;
typedef OpenLocalImageRequest<librbd::MockTestImageCtx> MockOpenLocalImageRequest;
@ -380,14 +407,13 @@ public:
}));
}
void expect_image_sync(MockImageSyncThrottler image_sync_throttler,
int r) {
EXPECT_CALL(*image_sync_throttler, start_sync(_, _, _, _,
StrEq("local mirror uuid"),
_, _, _, _, _))
.WillOnce(WithArg<8>(Invoke([this, r](Context *on_finish) {
m_threads->work_queue->queue(on_finish, r);
})));
void expect_image_sync(MockImageSync &mock_image_sync, int r) {
EXPECT_CALL(mock_image_sync, get());
EXPECT_CALL(mock_image_sync, send())
.WillOnce(Invoke([this, &mock_image_sync, r]() {
m_threads->work_queue->queue(mock_image_sync.on_finish, r);
}));
EXPECT_CALL(mock_image_sync, put());
}
bufferlist encode_tag_data(const librbd::journal::TagData &tag_data) {
@ -396,7 +422,7 @@ public:
return bl;
}
MockBootstrapRequest *create_request(MockImageSyncThrottler mock_image_sync_throttler,
MockBootstrapRequest *create_request(MockInstanceWatcher *mock_instance_watcher,
::journal::MockJournaler &mock_journaler,
const std::string &local_image_id,
const std::string &remote_image_id,
@ -406,7 +432,7 @@ public:
Context *on_finish) {
return new MockBootstrapRequest(m_local_io_ctx,
m_remote_io_ctx,
mock_image_sync_throttler,
mock_instance_watcher,
&m_local_test_image_ctx,
local_image_id,
remote_image_id,
@ -472,10 +498,9 @@ TEST_F(TestMockImageReplayerBootstrapRequest, NonPrimaryRemoteSyncingState) {
expect_close_image(mock_close_image_request, mock_remote_image_ctx, 0);
C_SaferCond ctx;
MockImageSyncThrottler mock_image_sync_throttler(
new ImageSyncThrottler<librbd::MockTestImageCtx>());
MockInstanceWatcher mock_instance_watcher;
MockBootstrapRequest *request = create_request(
mock_image_sync_throttler, mock_journaler, mock_local_image_ctx.id,
&mock_instance_watcher, mock_journaler, mock_local_image_ctx.id,
mock_remote_image_ctx.id, "global image id", "local mirror uuid",
"remote mirror uuid", &ctx);
request->send();
@ -547,10 +572,9 @@ TEST_F(TestMockImageReplayerBootstrapRequest, RemoteDemotePromote) {
expect_close_image(mock_close_image_request, mock_remote_image_ctx, 0);
C_SaferCond ctx;
MockImageSyncThrottler mock_image_sync_throttler(
new ImageSyncThrottler<librbd::MockTestImageCtx>());
MockInstanceWatcher mock_instance_watcher;
MockBootstrapRequest *request = create_request(
mock_image_sync_throttler, mock_journaler, mock_local_image_ctx.id,
&mock_instance_watcher, mock_journaler, mock_local_image_ctx.id,
mock_remote_image_ctx.id, "global image id", "local mirror uuid",
"remote mirror uuid", &ctx);
request->send();
@ -632,10 +656,9 @@ TEST_F(TestMockImageReplayerBootstrapRequest, MultipleRemoteDemotePromotes) {
expect_close_image(mock_close_image_request, mock_remote_image_ctx, 0);
C_SaferCond ctx;
MockImageSyncThrottler mock_image_sync_throttler(
new ImageSyncThrottler<librbd::MockTestImageCtx>());
MockInstanceWatcher mock_instance_watcher;
MockBootstrapRequest *request = create_request(
mock_image_sync_throttler, mock_journaler, mock_local_image_ctx.id,
&mock_instance_watcher, mock_journaler, mock_local_image_ctx.id,
mock_remote_image_ctx.id, "global image id", "local mirror uuid",
"remote mirror uuid", &ctx);
request->send();
@ -705,10 +728,9 @@ TEST_F(TestMockImageReplayerBootstrapRequest, LocalDemoteRemotePromote) {
expect_close_image(mock_close_image_request, mock_remote_image_ctx, 0);
C_SaferCond ctx;
MockImageSyncThrottler mock_image_sync_throttler(
new ImageSyncThrottler<librbd::MockTestImageCtx>());
MockInstanceWatcher mock_instance_watcher;
MockBootstrapRequest *request = create_request(
mock_image_sync_throttler, mock_journaler, mock_local_image_ctx.id,
&mock_instance_watcher, mock_journaler, mock_local_image_ctx.id,
mock_remote_image_ctx.id, "global image id", "local mirror uuid",
"remote mirror uuid", &ctx);
request->send();
@ -777,10 +799,9 @@ TEST_F(TestMockImageReplayerBootstrapRequest, SplitBrainForcePromote) {
expect_close_image(mock_close_image_request, mock_remote_image_ctx, 0);
C_SaferCond ctx;
MockImageSyncThrottler mock_image_sync_throttler(
new ImageSyncThrottler<librbd::MockTestImageCtx>());
MockInstanceWatcher mock_instance_watcher;
MockBootstrapRequest *request = create_request(
mock_image_sync_throttler, mock_journaler, mock_local_image_ctx.id,
&mock_instance_watcher, mock_journaler, mock_local_image_ctx.id,
mock_remote_image_ctx.id, "global image id", "local mirror uuid",
"remote mirror uuid", &ctx);
request->send();
@ -837,10 +858,9 @@ TEST_F(TestMockImageReplayerBootstrapRequest, ResyncRequested) {
expect_close_image(mock_close_image_request, mock_remote_image_ctx, 0);
C_SaferCond ctx;
MockImageSyncThrottler mock_image_sync_throttler(
new ImageSyncThrottler<librbd::MockTestImageCtx>());
MockInstanceWatcher mock_instance_watcher;
MockBootstrapRequest *request = create_request(
mock_image_sync_throttler, mock_journaler, mock_local_image_ctx.id,
&mock_instance_watcher, mock_journaler, mock_local_image_ctx.id,
mock_remote_image_ctx.id, "global image id", "local mirror uuid",
"remote mirror uuid", &ctx);
m_do_resync = false;
@ -905,16 +925,16 @@ TEST_F(TestMockImageReplayerBootstrapRequest, PrimaryRemote) {
expect_journaler_update_client(mock_journaler, client_data, 0);
// sync the remote image to the local image
MockImageSyncThrottler mock_image_sync_throttler(
new ImageSyncThrottler<librbd::MockTestImageCtx>());
expect_image_sync(mock_image_sync_throttler, 0);
MockImageSync mock_image_sync;
expect_image_sync(mock_image_sync, 0);
MockCloseImageRequest mock_close_image_request;
expect_close_image(mock_close_image_request, mock_remote_image_ctx, 0);
C_SaferCond ctx;
MockInstanceWatcher mock_instance_watcher;
MockBootstrapRequest *request = create_request(
mock_image_sync_throttler, mock_journaler, "",
&mock_instance_watcher, mock_journaler, "",
mock_remote_image_ctx.id, "global image id", "local mirror uuid",
"remote mirror uuid", &ctx);
request->send();
@ -981,16 +1001,16 @@ TEST_F(TestMockImageReplayerBootstrapRequest, PrimaryRemoteLocalDeleted) {
expect_journaler_update_client(mock_journaler, client_data, 0);
// sync the remote image to the local image
MockImageSyncThrottler mock_image_sync_throttler(
new ImageSyncThrottler<librbd::MockTestImageCtx>());
expect_image_sync(mock_image_sync_throttler, 0);
MockImageSync mock_image_sync;
expect_image_sync(mock_image_sync, 0);
MockCloseImageRequest mock_close_image_request;
expect_close_image(mock_close_image_request, mock_remote_image_ctx, 0);
C_SaferCond ctx;
MockInstanceWatcher mock_instance_watcher;
MockBootstrapRequest *request = create_request(
mock_image_sync_throttler, mock_journaler, "",
&mock_instance_watcher, mock_journaler, "",
mock_remote_image_ctx.id, "global image id", "local mirror uuid",
"remote mirror uuid", &ctx);
request->send();

View File

@ -36,7 +36,7 @@
#include "librbd/io/ReadResult.h"
#include "tools/rbd_mirror/types.h"
#include "tools/rbd_mirror/ImageReplayer.h"
#include "tools/rbd_mirror/ImageSyncThrottler.h"
#include "tools/rbd_mirror/InstanceWatcher.h"
#include "tools/rbd_mirror/Threads.h"
#include "tools/rbd_mirror/ImageDeleter.h"
@ -118,14 +118,19 @@ public:
m_image_deleter.reset(new rbd::mirror::ImageDeleter(m_threads->work_queue,
m_threads->timer,
&m_threads->timer_lock));
m_image_sync_throttler.reset(new rbd::mirror::ImageSyncThrottler<>());
m_instance_watcher = rbd::mirror::InstanceWatcher<>::create(
m_local_ioctx, m_threads->work_queue, nullptr);
m_instance_watcher->handle_acquire_leader();
}
~TestImageReplayer() override
~TestImageReplayer() override
{
unwatch();
m_instance_watcher->handle_release_leader();
delete m_replayer;
delete m_instance_watcher;
delete m_threads;
EXPECT_EQ(0, m_remote_cluster.pool_delete(m_remote_pool_name.c_str()));
@ -134,9 +139,10 @@ public:
template <typename ImageReplayerT = rbd::mirror::ImageReplayer<> >
void create_replayer() {
m_replayer = new ImageReplayerT(m_threads, m_image_deleter, m_image_sync_throttler,
rbd::mirror::RadosRef(new librados::Rados(m_local_ioctx)),
m_local_mirror_uuid, m_local_ioctx.get_id(), "global image id");
m_replayer = new ImageReplayerT(
m_threads, m_image_deleter, m_instance_watcher,
rbd::mirror::RadosRef(new librados::Rados(m_local_ioctx)),
m_local_mirror_uuid, m_local_ioctx.get_id(), "global image id");
m_replayer->add_remote_image(m_remote_mirror_uuid, m_remote_image_id,
m_remote_ioctx);
}
@ -364,7 +370,7 @@ public:
std::shared_ptr<rbd::mirror::ImageDeleter> m_image_deleter;
std::shared_ptr<librados::Rados> m_local_cluster;
librados::Rados m_remote_cluster;
std::shared_ptr<rbd::mirror::ImageSyncThrottler<>> m_image_sync_throttler;
rbd::mirror::InstanceWatcher<> *m_instance_watcher;
std::string m_local_mirror_uuid = "local mirror uuid";
std::string m_remote_mirror_uuid = "remote mirror uuid";
std::string m_local_pool_name, m_remote_pool_name;

View File

@ -15,6 +15,7 @@
#include "librbd/io/ReadResult.h"
#include "librbd/journal/Types.h"
#include "tools/rbd_mirror/ImageSync.h"
#include "tools/rbd_mirror/InstanceWatcher.h"
#include "tools/rbd_mirror/Threads.h"
void register_test_image_sync() {
@ -55,6 +56,10 @@ public:
create_and_open(m_local_io_ctx, &m_local_image_ctx);
create_and_open(m_remote_io_ctx, &m_remote_image_ctx);
m_instance_watcher = rbd::mirror::InstanceWatcher<>::create(
m_local_io_ctx, m_threads->work_queue, nullptr);
m_instance_watcher->handle_acquire_leader();
m_remote_journaler = new ::journal::Journaler(
m_threads->work_queue, m_threads->timer, &m_threads->timer_lock,
m_remote_io_ctx, m_remote_image_ctx->id, "mirror-uuid", {});
@ -70,7 +75,11 @@ public:
void TearDown() override {
TestFixture::TearDown();
m_instance_watcher->handle_release_leader();
delete m_remote_journaler;
delete m_instance_watcher;
}
void create_and_open(librados::IoCtx &io_ctx, librbd::ImageCtx **image_ctx) {
@ -91,11 +100,12 @@ public:
return new ImageSync<>(m_local_image_ctx, m_remote_image_ctx,
m_threads->timer, &m_threads->timer_lock,
"mirror-uuid", m_remote_journaler, &m_client_meta,
m_threads->work_queue, ctx);
m_threads->work_queue, m_instance_watcher, ctx);
}
librbd::ImageCtx *m_remote_image_ctx;
librbd::ImageCtx *m_local_image_ctx;
rbd::mirror::InstanceWatcher<> *m_instance_watcher;
::journal::Journaler *m_remote_journaler;
librbd::journal::MirrorPeerClientMeta m_client_meta;
};

View File

@ -71,6 +71,9 @@ public:
}
}
void update_leader_handler(const std::string &leader_instance_id) override {
}
private:
mutable Mutex m_test_lock;
int m_acquire_count = 0;

View File

@ -5,11 +5,11 @@
#include "librbd/journal/Replay.h"
#include "librbd/journal/Types.h"
#include "tools/rbd_mirror/ImageReplayer.h"
#include "tools/rbd_mirror/InstanceWatcher.h"
#include "tools/rbd_mirror/image_replayer/BootstrapRequest.h"
#include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
#include "tools/rbd_mirror/image_replayer/EventPreprocessor.h"
#include "tools/rbd_mirror/image_replayer/PrepareLocalImageRequest.h"
#include "tools/rbd_mirror/ImageSyncThrottler.h"
#include "test/rbd_mirror/test_mock_fixture.h"
#include "test/journal/mock/MockJournaler.h"
#include "test/librbd/mock/MockImageCtx.h"
@ -61,7 +61,7 @@ namespace rbd {
namespace mirror {
template<>
class ImageSyncThrottler<librbd::MockTestImageCtx> {
class InstanceWatcher<librbd::MockTestImageCtx> {
};
namespace image_replayer {
@ -110,22 +110,18 @@ struct BootstrapRequest<librbd::MockTestImageCtx> {
Context *on_finish = nullptr;
bool *do_resync = nullptr;
static BootstrapRequest* create(librados::IoCtx &local_io_ctx,
librados::IoCtx &remote_io_ctx,
rbd::mirror::ImageSyncThrottlerRef<librbd::MockTestImageCtx> image_sync_throttler,
librbd::MockTestImageCtx **local_image_ctx,
const std::string &local_image_name,
const std::string &remote_image_id,
const std::string &global_image_id,
ContextWQ *work_queue, SafeTimer *timer,
Mutex *timer_lock,
const std::string &local_mirror_uuid,
const std::string &remote_mirror_uuid,
::journal::MockJournalerProxy *journaler,
librbd::journal::MirrorPeerClientMeta *client_meta,
Context *on_finish,
bool *do_resync,
rbd::mirror::ProgressContext *progress_ctx = nullptr) {
static BootstrapRequest* create(
librados::IoCtx &local_io_ctx, librados::IoCtx &remote_io_ctx,
rbd::mirror::InstanceWatcher<librbd::MockTestImageCtx> *instance_watcher,
librbd::MockTestImageCtx **local_image_ctx,
const std::string &local_image_name, const std::string &remote_image_id,
const std::string &global_image_id, ContextWQ *work_queue,
SafeTimer *timer, Mutex *timer_lock, const std::string &local_mirror_uuid,
const std::string &remote_mirror_uuid,
::journal::MockJournalerProxy *journaler,
librbd::journal::MirrorPeerClientMeta *client_meta,
Context *on_finish, bool *do_resync,
rbd::mirror::ProgressContext *progress_ctx = nullptr) {
assert(s_instance != nullptr);
s_instance->image_ctx = local_image_ctx;
s_instance->on_finish = on_finish;
@ -261,6 +257,7 @@ public:
typedef ReplayStatusFormatter<librbd::MockTestImageCtx> MockReplayStatusFormatter;
typedef librbd::journal::Replay<librbd::MockTestImageCtx> MockReplay;
typedef ImageReplayer<librbd::MockTestImageCtx> MockImageReplayer;
typedef InstanceWatcher<librbd::MockTestImageCtx> MockInstanceWatcher;
void SetUp() override {
TestMockFixture::SetUp();
@ -272,11 +269,8 @@ public:
m_image_deleter.reset(new rbd::mirror::ImageDeleter(m_threads->work_queue,
m_threads->timer,
&m_threads->timer_lock));
m_image_sync_throttler.reset(
new rbd::mirror::ImageSyncThrottler<librbd::MockTestImageCtx>());
m_image_replayer = new MockImageReplayer(
m_threads, m_image_deleter, m_image_sync_throttler,
m_threads, m_image_deleter, &m_instance_watcher,
rbd::mirror::RadosRef(new librados::Rados(m_local_io_ctx)),
"local_mirror_uuid", m_local_io_ctx.get_id(), "global image id");
m_image_replayer->add_remote_image(
@ -455,7 +449,7 @@ public:
librbd::ImageCtx *m_remote_image_ctx;
librbd::ImageCtx *m_local_image_ctx = nullptr;
std::shared_ptr<rbd::mirror::ImageDeleter> m_image_deleter;
std::shared_ptr<rbd::mirror::ImageSyncThrottler<librbd::MockTestImageCtx>> m_image_sync_throttler;
MockInstanceWatcher m_instance_watcher;
MockImageReplayer *m_image_replayer;
};

View File

@ -45,6 +45,13 @@ template class rbd::mirror::ImageSync<librbd::MockTestImageCtx>;
namespace rbd {
namespace mirror {
template<>
struct InstanceWatcher<librbd::MockTestImageCtx> {
MOCK_METHOD2(notify_sync_request, void(const std::string, Context *));
MOCK_METHOD1(cancel_sync_request, bool(const std::string &));
MOCK_METHOD1(notify_sync_complete, void(const std::string &));
};
namespace image_sync {
template <>
@ -176,6 +183,7 @@ using ::testing::InvokeWithoutArgs;
class TestMockImageSync : public TestMockFixture {
public:
typedef ImageSync<librbd::MockTestImageCtx> MockImageSync;
typedef InstanceWatcher<librbd::MockTestImageCtx> MockInstanceWatcher;
typedef image_sync::ImageCopyRequest<librbd::MockTestImageCtx> MockImageCopyRequest;
typedef image_sync::SnapshotCopyRequest<librbd::MockTestImageCtx> MockSnapshotCopyRequest;
typedef image_sync::SyncPointCreateRequest<librbd::MockTestImageCtx> MockSyncPointCreateRequest;
@ -197,6 +205,25 @@ public:
ReturnNew<FunctionContext>([](int) {}));
}
void expect_notify_sync_request(MockInstanceWatcher &mock_instance_watcher,
const std::string &sync_id, int r) {
EXPECT_CALL(mock_instance_watcher, notify_sync_request(sync_id, _))
.WillOnce(Invoke([this, r](const std::string &, Context *on_sync_start) {
m_threads->work_queue->queue(on_sync_start, r);
}));
}
void expect_cancel_sync_request(MockInstanceWatcher &mock_instance_watcher,
const std::string &sync_id, bool canceled) {
EXPECT_CALL(mock_instance_watcher, cancel_sync_request(sync_id))
.WillOnce(Return(canceled));
}
void expect_notify_sync_complete(MockInstanceWatcher &mock_instance_watcher,
const std::string &sync_id) {
EXPECT_CALL(mock_instance_watcher, notify_sync_complete(sync_id));
}
void expect_create_sync_point(librbd::MockTestImageCtx &mock_local_image_ctx,
MockSyncPointCreateRequest &mock_sync_point_create_request,
int r) {
@ -271,11 +298,13 @@ public:
MockImageSync *create_request(librbd::MockTestImageCtx &mock_remote_image_ctx,
librbd::MockTestImageCtx &mock_local_image_ctx,
journal::MockJournaler &mock_journaler,
MockInstanceWatcher &mock_instance_watcher,
Context *ctx) {
return new MockImageSync(&mock_local_image_ctx, &mock_remote_image_ctx,
m_threads->timer, &m_threads->timer_lock,
"mirror-uuid", &mock_journaler, &m_client_meta,
m_threads->work_queue, ctx);
m_threads->work_queue, &mock_instance_watcher,
ctx);
}
librbd::ImageCtx *m_remote_image_ctx;
@ -287,6 +316,7 @@ TEST_F(TestMockImageSync, SimpleSync) {
librbd::MockTestImageCtx mock_remote_image_ctx(*m_remote_image_ctx);
librbd::MockTestImageCtx mock_local_image_ctx(*m_local_image_ctx);
journal::MockJournaler mock_journaler;
MockInstanceWatcher mock_instance_watcher;
MockImageCopyRequest mock_image_copy_request;
MockSnapshotCopyRequest mock_snapshot_copy_request;
MockSyncPointCreateRequest mock_sync_point_create_request;
@ -300,6 +330,7 @@ TEST_F(TestMockImageSync, SimpleSync) {
expect_test_features(mock_local_image_ctx);
InSequence seq;
expect_notify_sync_request(mock_instance_watcher, mock_local_image_ctx.id, 0);
expect_create_sync_point(mock_local_image_ctx, mock_sync_point_create_request, 0);
expect_copy_snapshots(mock_snapshot_copy_request, 0);
expect_copy_image(mock_image_copy_request, 0);
@ -308,11 +339,12 @@ TEST_F(TestMockImageSync, SimpleSync) {
expect_create_object_map(mock_local_image_ctx, mock_object_map);
expect_open_object_map(mock_local_image_ctx, *mock_object_map);
expect_prune_sync_point(mock_sync_point_prune_request, true, 0);
expect_notify_sync_complete(mock_instance_watcher, mock_local_image_ctx.id);
C_SaferCond ctx;
MockImageSync *request = create_request(mock_remote_image_ctx,
mock_local_image_ctx,
mock_journaler, &ctx);
mock_local_image_ctx, mock_journaler,
mock_instance_watcher, &ctx);
request->send();
ASSERT_EQ(0, ctx.wait());
}
@ -321,6 +353,7 @@ TEST_F(TestMockImageSync, RestartSync) {
librbd::MockTestImageCtx mock_remote_image_ctx(*m_remote_image_ctx);
librbd::MockTestImageCtx mock_local_image_ctx(*m_local_image_ctx);
journal::MockJournaler mock_journaler;
MockInstanceWatcher mock_instance_watcher;
MockImageCopyRequest mock_image_copy_request;
MockSnapshotCopyRequest mock_snapshot_copy_request;
MockSyncPointCreateRequest mock_sync_point_create_request;
@ -339,6 +372,7 @@ TEST_F(TestMockImageSync, RestartSync) {
expect_test_features(mock_local_image_ctx);
InSequence seq;
expect_notify_sync_request(mock_instance_watcher, mock_local_image_ctx.id, 0);
expect_prune_sync_point(mock_sync_point_prune_request, false, 0);
expect_copy_snapshots(mock_snapshot_copy_request, 0);
expect_copy_image(mock_image_copy_request, 0);
@ -347,19 +381,60 @@ TEST_F(TestMockImageSync, RestartSync) {
expect_create_object_map(mock_local_image_ctx, mock_object_map);
expect_open_object_map(mock_local_image_ctx, *mock_object_map);
expect_prune_sync_point(mock_sync_point_prune_request, true, 0);
expect_notify_sync_complete(mock_instance_watcher, mock_local_image_ctx.id);
C_SaferCond ctx;
MockImageSync *request = create_request(mock_remote_image_ctx,
mock_local_image_ctx,
mock_journaler, &ctx);
mock_local_image_ctx, mock_journaler,
mock_instance_watcher, &ctx);
request->send();
ASSERT_EQ(0, ctx.wait());
}
TEST_F(TestMockImageSync, CancelNotifySyncRequest) {
librbd::MockTestImageCtx mock_remote_image_ctx(*m_remote_image_ctx);
librbd::MockTestImageCtx mock_local_image_ctx(*m_local_image_ctx);
journal::MockJournaler mock_journaler;
MockInstanceWatcher mock_instance_watcher;
InSequence seq;
Context *on_sync_start = nullptr;
C_SaferCond notify_sync_ctx;
EXPECT_CALL(mock_instance_watcher,
notify_sync_request(mock_local_image_ctx.id, _))
.WillOnce(Invoke([this, &on_sync_start, &notify_sync_ctx](
const std::string &, Context *ctx) {
on_sync_start = ctx;
notify_sync_ctx.complete(0);
}));
EXPECT_CALL(mock_instance_watcher,
cancel_sync_request(mock_local_image_ctx.id))
.WillOnce(Invoke([this, &on_sync_start](const std::string &) {
EXPECT_NE(nullptr, on_sync_start);
on_sync_start->complete(-ECANCELED);
return true;
}));
C_SaferCond ctx;
MockImageSync *request = create_request(mock_remote_image_ctx,
mock_local_image_ctx, mock_journaler,
mock_instance_watcher, &ctx);
request->get();
request->send();
// cancel the notify sync request once it starts
ASSERT_EQ(0, notify_sync_ctx.wait());
request->cancel();
request->put();
ASSERT_EQ(-ECANCELED, ctx.wait());
}
TEST_F(TestMockImageSync, CancelImageCopy) {
librbd::MockTestImageCtx mock_remote_image_ctx(*m_remote_image_ctx);
librbd::MockTestImageCtx mock_local_image_ctx(*m_local_image_ctx);
journal::MockJournaler mock_journaler;
MockInstanceWatcher mock_instance_watcher;
MockImageCopyRequest mock_image_copy_request;
MockSnapshotCopyRequest mock_snapshot_copy_request;
MockSyncPointCreateRequest mock_sync_point_create_request;
@ -371,6 +446,7 @@ TEST_F(TestMockImageSync, CancelImageCopy) {
m_client_meta.sync_points = {{cls::rbd::UserSnapshotNamespace(), "snap1", boost::none}};
InSequence seq;
expect_notify_sync_request(mock_instance_watcher, mock_local_image_ctx.id, 0);
expect_prune_sync_point(mock_sync_point_prune_request, false, 0);
expect_copy_snapshots(mock_snapshot_copy_request, 0);
@ -379,12 +455,15 @@ TEST_F(TestMockImageSync, CancelImageCopy) {
.WillOnce(Invoke([&image_copy_ctx]() {
image_copy_ctx.complete(0);
}));
expect_cancel_sync_request(mock_instance_watcher, mock_local_image_ctx.id,
false);
EXPECT_CALL(mock_image_copy_request, cancel());
expect_notify_sync_complete(mock_instance_watcher, mock_local_image_ctx.id);
C_SaferCond ctx;
MockImageSync *request = create_request(mock_remote_image_ctx,
mock_local_image_ctx,
mock_journaler, &ctx);
mock_local_image_ctx, mock_journaler,
mock_instance_watcher, &ctx);
request->get();
request->send();
@ -401,6 +480,7 @@ TEST_F(TestMockImageSync, CancelAfterCopySnapshots) {
librbd::MockTestImageCtx mock_remote_image_ctx(*m_remote_image_ctx);
librbd::MockTestImageCtx mock_local_image_ctx(*m_local_image_ctx);
journal::MockJournaler mock_journaler;
MockInstanceWatcher mock_instance_watcher;
MockSnapshotCopyRequest mock_snapshot_copy_request;
MockSyncPointCreateRequest mock_sync_point_create_request;
@ -413,9 +493,10 @@ TEST_F(TestMockImageSync, CancelAfterCopySnapshots) {
C_SaferCond ctx;
MockImageSync *request = create_request(mock_remote_image_ctx,
mock_local_image_ctx,
mock_journaler, &ctx);
mock_local_image_ctx, mock_journaler,
mock_instance_watcher, &ctx);
InSequence seq;
expect_notify_sync_request(mock_instance_watcher, mock_local_image_ctx.id, 0);
expect_create_sync_point(mock_local_image_ctx, mock_sync_point_create_request, 0);
EXPECT_CALL(mock_snapshot_copy_request, send())
.WillOnce((DoAll(InvokeWithoutArgs([request]() {
@ -424,7 +505,10 @@ TEST_F(TestMockImageSync, CancelAfterCopySnapshots) {
Invoke([this, &mock_snapshot_copy_request]() {
m_threads->work_queue->queue(mock_snapshot_copy_request.on_finish, 0);
}))));
expect_cancel_sync_request(mock_instance_watcher, mock_local_image_ctx.id,
false);
EXPECT_CALL(mock_snapshot_copy_request, cancel());
expect_notify_sync_complete(mock_instance_watcher, mock_local_image_ctx.id);
request->send();
ASSERT_EQ(-ECANCELED, ctx.wait());
@ -434,6 +518,7 @@ TEST_F(TestMockImageSync, CancelAfterCopyImage) {
librbd::MockTestImageCtx mock_remote_image_ctx(*m_remote_image_ctx);
librbd::MockTestImageCtx mock_local_image_ctx(*m_local_image_ctx);
journal::MockJournaler mock_journaler;
MockInstanceWatcher mock_instance_watcher;
MockImageCopyRequest mock_image_copy_request;
MockSnapshotCopyRequest mock_snapshot_copy_request;
MockSyncPointCreateRequest mock_sync_point_create_request;
@ -448,9 +533,10 @@ TEST_F(TestMockImageSync, CancelAfterCopyImage) {
C_SaferCond ctx;
MockImageSync *request = create_request(mock_remote_image_ctx,
mock_local_image_ctx,
mock_journaler, &ctx);
mock_local_image_ctx, mock_journaler,
mock_instance_watcher, &ctx);
InSequence seq;
expect_notify_sync_request(mock_instance_watcher, mock_local_image_ctx.id, 0);
expect_create_sync_point(mock_local_image_ctx, mock_sync_point_create_request, 0);
expect_copy_snapshots(mock_snapshot_copy_request, 0);
EXPECT_CALL(mock_image_copy_request, send())
@ -460,7 +546,10 @@ TEST_F(TestMockImageSync, CancelAfterCopyImage) {
Invoke([this, &mock_image_copy_request]() {
m_threads->work_queue->queue(mock_image_copy_request.on_finish, 0);
}))));
expect_cancel_sync_request(mock_instance_watcher, mock_local_image_ctx.id,
false);
EXPECT_CALL(mock_image_copy_request, cancel());
expect_notify_sync_complete(mock_instance_watcher, mock_local_image_ctx.id);
request->send();
ASSERT_EQ(-ECANCELED, ctx.wait());

View File

@ -13,12 +13,7 @@
*/
#include "test/rbd_mirror/test_mock_fixture.h"
#include "librbd/journal/TypeTraits.h"
#include "test/journal/mock/MockJournaler.h"
#include "test/librbd/mock/MockImageCtx.h"
#include "librbd/ImageState.h"
#include "tools/rbd_mirror/Threads.h"
#include "tools/rbd_mirror/ImageSync.h"
namespace librbd {
@ -32,72 +27,8 @@ struct MockTestImageCtx : public librbd::MockImageCtx {
} // anonymous namespace
namespace journal {
template <>
struct TypeTraits<librbd::MockTestImageCtx> {
typedef ::journal::MockJournaler Journaler;
};
} // namespace journal
} // namespace librbd
namespace rbd {
namespace mirror {
using ::testing::Invoke;
typedef ImageSync<librbd::MockTestImageCtx> MockImageSync;
template<>
class ImageSync<librbd::MockTestImageCtx> {
public:
static std::vector<MockImageSync *> instances;
Context *on_finish;
bool syncing = false;
static ImageSync* create(librbd::MockTestImageCtx *local_image_ctx,
librbd::MockTestImageCtx *remote_image_ctx,
SafeTimer *timer, Mutex *timer_lock,
const std::string &mirror_uuid,
journal::MockJournaler *journaler,
librbd::journal::MirrorPeerClientMeta *client_meta,
ContextWQ *work_queue, Context *on_finish,
ProgressContext *progress_ctx = nullptr) {
ImageSync *sync = new ImageSync();
sync->on_finish = on_finish;
EXPECT_CALL(*sync, send())
.WillRepeatedly(Invoke([sync]() {
sync->syncing = true;
}));
return sync;
}
void finish(int r) {
on_finish->complete(r);
}
void get() {
instances.push_back(this);
}
void put() { delete this; }
MOCK_METHOD0(cancel, void());
MOCK_METHOD0(send, void());
};
std::vector<MockImageSync *> MockImageSync::instances;
} // namespace mirror
} // namespace rbd
// template definitions
#include "tools/rbd_mirror/ImageSyncThrottler.cc"
@ -108,300 +39,144 @@ class TestMockImageSyncThrottler : public TestMockFixture {
public:
typedef ImageSyncThrottler<librbd::MockTestImageCtx> MockImageSyncThrottler;
void SetUp() override {
TestMockFixture::SetUp();
librbd::RBD rbd;
ASSERT_EQ(0, create_image(rbd, m_remote_io_ctx, m_image_name, m_image_size));
ASSERT_EQ(0, open_image(m_remote_io_ctx, m_image_name, &m_remote_image_ctx));
ASSERT_EQ(0, create_image(rbd, m_local_io_ctx, m_image_name, m_image_size));
ASSERT_EQ(0, open_image(m_local_io_ctx, m_image_name, &m_local_image_ctx));
mock_sync_throttler = new MockImageSyncThrottler();
m_mock_local_image_ctx = new librbd::MockTestImageCtx(*m_local_image_ctx);
m_mock_remote_image_ctx = new librbd::MockTestImageCtx(*m_remote_image_ctx);
m_mock_journaler = new journal::MockJournaler();
}
void TearDown() override {
MockImageSync::instances.clear();
delete mock_sync_throttler;
delete m_mock_local_image_ctx;
delete m_mock_remote_image_ctx;
delete m_mock_journaler;
TestMockFixture::TearDown();
}
void start_sync(const std::string& image_id, Context *ctx) {
m_mock_local_image_ctx->id = image_id;
mock_sync_throttler->start_sync(m_mock_local_image_ctx,
m_mock_remote_image_ctx,
m_threads->timer,
&m_threads->timer_lock,
"mirror_uuid",
m_mock_journaler,
&m_client_meta,
m_threads->work_queue,
ctx);
}
void cancel(const std::string& mirror_uuid, MockImageSync *sync,
bool running=true) {
if (running) {
EXPECT_CALL(*sync, cancel())
.WillOnce(Invoke([sync]() {
sync->finish(-ECANCELED);
}));
} else {
EXPECT_CALL(*sync, cancel()).Times(0);
}
mock_sync_throttler->cancel_sync(mirror_uuid);
}
librbd::ImageCtx *m_remote_image_ctx;
librbd::ImageCtx *m_local_image_ctx;
librbd::MockTestImageCtx *m_mock_local_image_ctx;
librbd::MockTestImageCtx *m_mock_remote_image_ctx;
journal::MockJournaler *m_mock_journaler;
librbd::journal::MirrorPeerClientMeta m_client_meta;
MockImageSyncThrottler *mock_sync_throttler;
};
TEST_F(TestMockImageSyncThrottler, Single_Sync) {
C_SaferCond ctx;
start_sync("image_id", &ctx);
ASSERT_EQ(1u, MockImageSync::instances.size());
MockImageSync *sync = MockImageSync::instances[0];
ASSERT_EQ(true, sync->syncing);
sync->finish(0);
ASSERT_EQ(0, ctx.wait());
MockImageSyncThrottler throttler;
C_SaferCond on_start;
throttler.start_op("id", &on_start);
ASSERT_EQ(0, on_start.wait());
throttler.finish_op("id");
}
TEST_F(TestMockImageSyncThrottler, Multiple_Syncs) {
mock_sync_throttler->set_max_concurrent_syncs(2);
MockImageSyncThrottler throttler;
throttler.set_max_concurrent_syncs(2);
C_SaferCond ctx1;
start_sync("image_id_1", &ctx1);
C_SaferCond ctx2;
start_sync("image_id_2", &ctx2);
C_SaferCond ctx3;
start_sync("image_id_3", &ctx3);
C_SaferCond ctx4;
start_sync("image_id_4", &ctx4);
C_SaferCond on_start1;
throttler.start_op("id1", &on_start1);
C_SaferCond on_start2;
throttler.start_op("id2", &on_start2);
C_SaferCond on_start3;
throttler.start_op("id3", &on_start3);
C_SaferCond on_start4;
throttler.start_op("id4", &on_start4);
ASSERT_EQ(4u, MockImageSync::instances.size());
MockImageSync *sync1 = MockImageSync::instances[0];
ASSERT_TRUE(sync1->syncing);
MockImageSync *sync2 = MockImageSync::instances[1];
ASSERT_TRUE(sync2->syncing);
MockImageSync *sync3 = MockImageSync::instances[2];
ASSERT_FALSE(sync3->syncing);
MockImageSync *sync4 = MockImageSync::instances[3];
ASSERT_FALSE(sync4->syncing);
sync1->finish(0);
ASSERT_EQ(0, ctx1.wait());
ASSERT_TRUE(sync3->syncing);
sync3->finish(-EINVAL);
ASSERT_EQ(-EINVAL, ctx3.wait());
ASSERT_TRUE(sync4->syncing);
sync2->finish(0);
ASSERT_EQ(0, ctx2.wait());
sync4->finish(0);
ASSERT_EQ(0, ctx4.wait());
ASSERT_EQ(0, on_start2.wait());
throttler.finish_op("id2");
ASSERT_EQ(0, on_start3.wait());
throttler.finish_op("id3");
ASSERT_EQ(0, on_start1.wait());
throttler.finish_op("id1");
ASSERT_EQ(0, on_start4.wait());
throttler.finish_op("id4");
}
TEST_F(TestMockImageSyncThrottler, Cancel_Running_Sync) {
C_SaferCond ctx1;
start_sync("image_id_1", &ctx1);
C_SaferCond ctx2;
start_sync("image_id_2", &ctx2);
ASSERT_EQ(2u, MockImageSync::instances.size());
MockImageSync *sync1 = MockImageSync::instances[0];
ASSERT_TRUE(sync1->syncing);
MockImageSync *sync2 = MockImageSync::instances[1];
ASSERT_TRUE(sync2->syncing);
cancel("image_id_2", sync2);
ASSERT_EQ(-ECANCELED, ctx2.wait());
sync1->finish(0);
ASSERT_EQ(0, ctx1.wait());
MockImageSyncThrottler throttler;
C_SaferCond on_start;
throttler.start_op("id", &on_start);
ASSERT_EQ(0, on_start.wait());
ASSERT_FALSE(throttler.cancel_op("id"));
throttler.finish_op("id");
}
TEST_F(TestMockImageSyncThrottler, Cancel_Waiting_Sync) {
mock_sync_throttler->set_max_concurrent_syncs(1);
MockImageSyncThrottler throttler;
throttler.set_max_concurrent_syncs(1);
C_SaferCond ctx1;
start_sync("image_id_1", &ctx1);
C_SaferCond ctx2;
start_sync("image_id_2", &ctx2);
C_SaferCond on_start1;
throttler.start_op("id1", &on_start1);
C_SaferCond on_start2;
throttler.start_op("id2", &on_start2);
ASSERT_EQ(2u, MockImageSync::instances.size());
MockImageSync *sync1 = MockImageSync::instances[0];
ASSERT_TRUE(sync1->syncing);
MockImageSync *sync2 = MockImageSync::instances[1];
ASSERT_FALSE(sync2->syncing);
cancel("image_id_2", sync2, false);
ASSERT_EQ(-ECANCELED, ctx2.wait());
sync1->finish(0);
ASSERT_EQ(0, ctx1.wait());
ASSERT_EQ(0, on_start1.wait());
ASSERT_TRUE(throttler.cancel_op("id2"));
ASSERT_EQ(-ECANCELED, on_start2.wait());
throttler.finish_op("id1");
}
TEST_F(TestMockImageSyncThrottler, Cancel_Running_Sync_Start_Waiting) {
mock_sync_throttler->set_max_concurrent_syncs(1);
MockImageSyncThrottler throttler;
throttler.set_max_concurrent_syncs(1);
C_SaferCond ctx1;
start_sync("image_id_1", &ctx1);
C_SaferCond ctx2;
start_sync("image_id_2", &ctx2);
C_SaferCond on_start1;
throttler.start_op("id1", &on_start1);
C_SaferCond on_start2;
throttler.start_op("id2", &on_start2);
ASSERT_EQ(2u, MockImageSync::instances.size());
MockImageSync *sync1 = MockImageSync::instances[0];
ASSERT_TRUE(sync1->syncing);
MockImageSync *sync2 = MockImageSync::instances[1];
ASSERT_FALSE(sync2->syncing);
cancel("image_id_1", sync1);
ASSERT_EQ(-ECANCELED, ctx1.wait());
ASSERT_TRUE(sync2->syncing);
sync2->finish(0);
ASSERT_EQ(0, ctx2.wait());
ASSERT_EQ(0, on_start1.wait());
ASSERT_FALSE(throttler.cancel_op("id1"));
throttler.finish_op("id1");
ASSERT_EQ(0, on_start2.wait());
throttler.finish_op("id2");
}
TEST_F(TestMockImageSyncThrottler, Increase_Max_Concurrent_Syncs) {
mock_sync_throttler->set_max_concurrent_syncs(2);
MockImageSyncThrottler throttler;
throttler.set_max_concurrent_syncs(2);
C_SaferCond ctx1;
start_sync("image_id_1", &ctx1);
C_SaferCond ctx2;
start_sync("image_id_2", &ctx2);
C_SaferCond ctx3;
start_sync("image_id_3", &ctx3);
C_SaferCond ctx4;
start_sync("image_id_4", &ctx4);
C_SaferCond ctx5;
start_sync("image_id_5", &ctx5);
C_SaferCond on_start1;
throttler.start_op("id1", &on_start1);
C_SaferCond on_start2;
throttler.start_op("id2", &on_start2);
C_SaferCond on_start3;
throttler.start_op("id3", &on_start3);
C_SaferCond on_start4;
throttler.start_op("id4", &on_start4);
C_SaferCond on_start5;
throttler.start_op("id5", &on_start5);
ASSERT_EQ(5u, MockImageSync::instances.size());
ASSERT_EQ(0, on_start1.wait());
ASSERT_EQ(0, on_start2.wait());
MockImageSync *sync1 = MockImageSync::instances[0];
ASSERT_TRUE(sync1->syncing);
throttler.set_max_concurrent_syncs(4);
MockImageSync *sync2 = MockImageSync::instances[1];
ASSERT_TRUE(sync2->syncing);
ASSERT_EQ(0, on_start3.wait());
ASSERT_EQ(0, on_start4.wait());
MockImageSync *sync3 = MockImageSync::instances[2];
ASSERT_FALSE(sync3->syncing);
throttler.finish_op("id4");
ASSERT_EQ(0, on_start5.wait());
MockImageSync *sync4 = MockImageSync::instances[3];
ASSERT_FALSE(sync4->syncing);
MockImageSync *sync5 = MockImageSync::instances[4];
ASSERT_FALSE(sync5->syncing);
mock_sync_throttler->set_max_concurrent_syncs(4);
ASSERT_TRUE(sync3->syncing);
ASSERT_TRUE(sync4->syncing);
ASSERT_FALSE(sync5->syncing);
sync1->finish(0);
ASSERT_EQ(0, ctx1.wait());
ASSERT_TRUE(sync5->syncing);
sync5->finish(-EINVAL);
ASSERT_EQ(-EINVAL, ctx5.wait());
sync2->finish(0);
ASSERT_EQ(0, ctx2.wait());
sync3->finish(0);
ASSERT_EQ(0, ctx3.wait());
sync4->finish(0);
ASSERT_EQ(0, ctx4.wait());
throttler.finish_op("id1");
throttler.finish_op("id2");
throttler.finish_op("id3");
throttler.finish_op("id5");
}
TEST_F(TestMockImageSyncThrottler, Decrease_Max_Concurrent_Syncs) {
mock_sync_throttler->set_max_concurrent_syncs(4);
MockImageSyncThrottler throttler;
throttler.set_max_concurrent_syncs(4);
C_SaferCond ctx1;
start_sync("image_id_1", &ctx1);
C_SaferCond ctx2;
start_sync("image_id_2", &ctx2);
C_SaferCond ctx3;
start_sync("image_id_3", &ctx3);
C_SaferCond ctx4;
start_sync("image_id_4", &ctx4);
C_SaferCond ctx5;
start_sync("image_id_5", &ctx5);
C_SaferCond on_start1;
throttler.start_op("id1", &on_start1);
C_SaferCond on_start2;
throttler.start_op("id2", &on_start2);
C_SaferCond on_start3;
throttler.start_op("id3", &on_start3);
C_SaferCond on_start4;
throttler.start_op("id4", &on_start4);
C_SaferCond on_start5;
throttler.start_op("id5", &on_start5);
ASSERT_EQ(5u, MockImageSync::instances.size());
ASSERT_EQ(0, on_start1.wait());
ASSERT_EQ(0, on_start2.wait());
ASSERT_EQ(0, on_start3.wait());
ASSERT_EQ(0, on_start4.wait());
MockImageSync *sync1 = MockImageSync::instances[0];
ASSERT_TRUE(sync1->syncing);
throttler.set_max_concurrent_syncs(2);
MockImageSync *sync2 = MockImageSync::instances[1];
ASSERT_TRUE(sync2->syncing);
throttler.finish_op("id1");
throttler.finish_op("id2");
throttler.finish_op("id3");
MockImageSync *sync3 = MockImageSync::instances[2];
ASSERT_TRUE(sync3->syncing);
ASSERT_EQ(0, on_start5.wait());
MockImageSync *sync4 = MockImageSync::instances[3];
ASSERT_TRUE(sync4->syncing);
MockImageSync *sync5 = MockImageSync::instances[4];
ASSERT_FALSE(sync5->syncing);
mock_sync_throttler->set_max_concurrent_syncs(2);
ASSERT_FALSE(sync5->syncing);
sync1->finish(0);
ASSERT_EQ(0, ctx1.wait());
ASSERT_FALSE(sync5->syncing);
sync2->finish(0);
ASSERT_EQ(0, ctx2.wait());
ASSERT_FALSE(sync5->syncing);
sync3->finish(0);
ASSERT_EQ(0, ctx3.wait());
ASSERT_TRUE(sync5->syncing);
sync4->finish(0);
ASSERT_EQ(0, ctx4.wait());
sync5->finish(0);
ASSERT_EQ(0, ctx5.wait());
throttler.finish_op("id4");
throttler.finish_op("id5");
}
} // namespace mirror
} // namespace rbd

View File

@ -4,7 +4,7 @@
#include "test/librbd/mock/MockImageCtx.h"
#include "test/rbd_mirror/test_mock_fixture.h"
#include "tools/rbd_mirror/ImageReplayer.h"
#include "tools/rbd_mirror/ImageSyncThrottler.h"
#include "tools/rbd_mirror/InstanceWatcher.h"
#include "tools/rbd_mirror/InstanceReplayer.h"
#include "tools/rbd_mirror/Threads.h"
@ -37,6 +37,10 @@ struct Threads<librbd::MockTestImageCtx> {
}
};
template<>
struct InstanceWatcher<librbd::MockTestImageCtx> {
};
template<>
struct ImageReplayer<librbd::MockTestImageCtx> {
static ImageReplayer* s_instance;
@ -45,7 +49,7 @@ struct ImageReplayer<librbd::MockTestImageCtx> {
static ImageReplayer *create(
Threads<librbd::MockTestImageCtx> *threads,
std::shared_ptr<ImageDeleter> image_deleter,
ImageSyncThrottlerRef<librbd::MockTestImageCtx> image_sync_throttler,
InstanceWatcher<librbd::MockTestImageCtx> *instance_watcher,
RadosRef local, const std::string &local_mirror_uuid, int64_t local_pool_id,
const std::string &global_image_id) {
assert(s_instance != nullptr);
@ -83,14 +87,6 @@ struct ImageReplayer<librbd::MockTestImageCtx> {
MOCK_METHOD0(is_blacklisted, bool());
};
template<>
struct ImageSyncThrottler<librbd::MockTestImageCtx> {
ImageSyncThrottler() {
}
virtual ~ImageSyncThrottler() {
}
};
ImageReplayer<librbd::MockTestImageCtx>* ImageReplayer<librbd::MockTestImageCtx>::s_instance = nullptr;
} // namespace mirror
@ -112,6 +108,7 @@ class TestMockInstanceReplayer : public TestMockFixture {
public:
typedef ImageReplayer<librbd::MockTestImageCtx> MockImageReplayer;
typedef InstanceReplayer<librbd::MockTestImageCtx> MockInstanceReplayer;
typedef InstanceWatcher<librbd::MockTestImageCtx> MockInstanceWatcher;
typedef Threads<librbd::MockTestImageCtx> MockThreads;
void SetUp() override {
@ -122,8 +119,6 @@ public:
m_image_deleter.reset(
new rbd::mirror::ImageDeleter(m_threads->work_queue, m_threads->timer,
&m_threads->timer_lock));
m_image_sync_throttler.reset(
new rbd::mirror::ImageSyncThrottler<librbd::MockTestImageCtx>());
}
void TearDown() override {
@ -133,14 +128,13 @@ public:
MockThreads *m_mock_threads;
std::shared_ptr<rbd::mirror::ImageDeleter> m_image_deleter;
std::shared_ptr<rbd::mirror::ImageSyncThrottler<librbd::MockTestImageCtx>>
m_image_sync_throttler;
};
TEST_F(TestMockInstanceReplayer, AcquireReleaseImage) {
MockInstanceWatcher mock_instance_watcher;
MockImageReplayer mock_image_replayer;
MockInstanceReplayer instance_replayer(
m_mock_threads, m_image_deleter, m_image_sync_throttler,
m_mock_threads, m_image_deleter,
rbd::mirror::RadosRef(new librados::Rados(m_local_io_ctx)),
"local_mirror_uuid", m_local_io_ctx.get_id());
@ -166,8 +160,9 @@ TEST_F(TestMockInstanceReplayer, AcquireReleaseImage) {
.WillOnce(Return(true));
EXPECT_CALL(mock_image_replayer, start(nullptr, false));
instance_replayer.acquire_image(global_image_id, "remote_mirror_uuid",
"remote_image_id", &on_acquire);
instance_replayer.acquire_image(&mock_instance_watcher, global_image_id,
"remote_mirror_uuid", "remote_image_id",
&on_acquire);
ASSERT_EQ(0, on_acquire.wait());
// Release

View File

@ -9,6 +9,7 @@
#include "test/librbd/mock/MockImageCtx.h"
#include "test/rbd_mirror/test_mock_fixture.h"
#include "tools/rbd_mirror/InstanceReplayer.h"
#include "tools/rbd_mirror/ImageSyncThrottler.h"
#include "tools/rbd_mirror/InstanceWatcher.h"
#include "tools/rbd_mirror/Threads.h"
@ -75,12 +76,40 @@ struct Threads<librbd::MockTestImageCtx> {
template <>
struct InstanceReplayer<librbd::MockTestImageCtx> {
MOCK_METHOD4(acquire_image, void(const std::string &, const std::string &,
MOCK_METHOD5(acquire_image, void(InstanceWatcher<librbd::MockTestImageCtx> *,
const std::string &, const std::string &,
const std::string &, Context *));
MOCK_METHOD5(release_image, void(const std::string &, const std::string &,
const std::string &, bool, Context *));
};
template <>
struct ImageSyncThrottler<librbd::MockTestImageCtx> {
static ImageSyncThrottler* s_instance;
static ImageSyncThrottler *create() {
assert(s_instance != nullptr);
return s_instance;
}
ImageSyncThrottler() {
assert(s_instance == nullptr);
s_instance = this;
}
virtual ~ImageSyncThrottler() {
assert(s_instance == this);
s_instance = nullptr;
}
MOCK_METHOD0(destroy, void());
MOCK_METHOD1(drain, void(int));
MOCK_METHOD2(start_op, void(const std::string &, Context *));
MOCK_METHOD1(finish_op, void(const std::string &));
};
ImageSyncThrottler<librbd::MockTestImageCtx>* ImageSyncThrottler<librbd::MockTestImageCtx>::s_instance = nullptr;
} // namespace mirror
} // namespace rbd
@ -335,16 +364,18 @@ TEST_F(TestMockInstanceWatcher, ImageAcquireRelease) {
ASSERT_EQ(0, instance_watcher2->init());
// Acquire Image on the the same instance
EXPECT_CALL(mock_instance_replayer1, acquire_image("gid", "uuid", "id", _))
.WillOnce(WithArg<3>(CompleteContext(0)));
EXPECT_CALL(mock_instance_replayer1, acquire_image(instance_watcher1, "gid",
"uuid", "id", _))
.WillOnce(WithArg<4>(CompleteContext(0)));
C_SaferCond on_acquire1;
instance_watcher1->notify_image_acquire(instance_id1, "gid", "uuid", "id",
&on_acquire1);
ASSERT_EQ(0, on_acquire1.wait());
// Acquire Image on the other instance
EXPECT_CALL(mock_instance_replayer2, acquire_image("gid", "uuid", "id", _))
.WillOnce(WithArg<3>(CompleteContext(0)));
EXPECT_CALL(mock_instance_replayer2, acquire_image(instance_watcher2, "gid",
"uuid", "id", _))
.WillOnce(WithArg<4>(CompleteContext(0)));
C_SaferCond on_acquire2;
instance_watcher1->notify_image_acquire(instance_id2, "gid", "uuid", "id",
&on_acquire2);
@ -455,5 +486,333 @@ TEST_F(TestMockInstanceWatcher, ImageAcquireReleaseCancel) {
delete instance_watcher;
}
class TestMockInstanceWatcher_NotifySync : public TestMockInstanceWatcher {
public:
typedef ImageSyncThrottler<librbd::MockTestImageCtx> MockImageSyncThrottler;
MockManagedLock mock_managed_lock;
MockImageSyncThrottler mock_image_sync_throttler;
std::string instance_id1;
std::string instance_id2;
librados::Rados cluster;
librados::IoCtx io_ctx2;
MockInstanceWatcher *instance_watcher1;
MockInstanceWatcher *instance_watcher2;
void SetUp() override {
TestMockInstanceWatcher::SetUp();
instance_id1 = m_instance_id;
librados::IoCtx& io_ctx1 = m_local_io_ctx;
librados::MockTestMemIoCtxImpl &mock_io_ctx1(get_mock_io_ctx(io_ctx1));
instance_watcher1 = MockInstanceWatcher::create(io_ctx1,
m_mock_threads->work_queue,
nullptr);
EXPECT_EQ("", connect_cluster_pp(cluster));
EXPECT_EQ(0, cluster.ioctx_create(_local_pool_name.c_str(), io_ctx2));
instance_id2 = stringify(io_ctx2.get_instance_id());
librados::MockTestMemIoCtxImpl &mock_io_ctx2(get_mock_io_ctx(io_ctx2));
instance_watcher2 = MockInstanceWatcher::create(io_ctx2,
m_mock_threads->work_queue,
nullptr);
InSequence seq;
// Init instance watcher 1 (leader)
expect_register_instance(mock_io_ctx1, 0);
expect_register_watch(mock_io_ctx1, instance_id1);
expect_acquire_lock(mock_managed_lock, 0);
EXPECT_EQ(0, instance_watcher1->init());
instance_watcher1->handle_acquire_leader();
// Init instance watcher 2
expect_register_instance(mock_io_ctx2, 0);
expect_register_watch(mock_io_ctx2, instance_id2);
expect_acquire_lock(mock_managed_lock, 0);
EXPECT_EQ(0, instance_watcher2->init());
instance_watcher2->handle_update_leader(instance_id1);
}
void TearDown() override {
librados::IoCtx& io_ctx1 = m_local_io_ctx;
librados::MockTestMemIoCtxImpl &mock_io_ctx1(get_mock_io_ctx(io_ctx1));
librados::MockTestMemIoCtxImpl &mock_io_ctx2(get_mock_io_ctx(io_ctx2));
InSequence seq;
expect_throttler_destroy();
instance_watcher1->handle_release_leader();
// Shutdown instance watcher 1
expect_release_lock(mock_managed_lock, 0);
expect_unregister_watch(mock_io_ctx1);
expect_unregister_instance(mock_io_ctx1, 0);
instance_watcher1->shut_down();
expect_destroy_lock(mock_managed_lock);
delete instance_watcher1;
// Shutdown instance watcher 2
expect_release_lock(mock_managed_lock, 0);
expect_unregister_watch(mock_io_ctx2);
expect_unregister_instance(mock_io_ctx2, 0);
instance_watcher2->shut_down();
expect_destroy_lock(mock_managed_lock);
delete instance_watcher2;
TestMockInstanceWatcher::TearDown();
}
void expect_throttler_destroy(
std::vector<Context *> *throttler_queue = nullptr) {
EXPECT_CALL(mock_image_sync_throttler, drain(-ESTALE))
.WillOnce(Invoke([throttler_queue] (int r) {
if (throttler_queue != nullptr) {
for (auto ctx : *throttler_queue) {
ctx->complete(r);
}
}
}));
EXPECT_CALL(mock_image_sync_throttler, destroy());
}
void expect_throttler_start_op(const std::string &sync_id,
Context *on_call = nullptr,
Context **on_start_ctx = nullptr) {
EXPECT_CALL(mock_image_sync_throttler, start_op(sync_id, _))
.WillOnce(Invoke([on_call, on_start_ctx] (const std::string &,
Context *ctx) {
if (on_call != nullptr) {
on_call->complete(0);
}
if (on_start_ctx != nullptr) {
*on_start_ctx = ctx;
} else {
ctx->complete(0);
}
}));
}
void expect_throttler_finish_op(const std::string &sync_id,
Context *on_finish) {
EXPECT_CALL(mock_image_sync_throttler, finish_op("sync_id"))
.WillOnce(Invoke([on_finish](const std::string &) {
on_finish->complete(0);
}));
}
};
TEST_F(TestMockInstanceWatcher_NotifySync, StartStopOnLeader) {
InSequence seq;
expect_throttler_start_op("sync_id");
C_SaferCond on_start;
instance_watcher1->notify_sync_request("sync_id", &on_start);
ASSERT_EQ(0, on_start.wait());
C_SaferCond on_finish;
expect_throttler_finish_op("sync_id", &on_finish);
instance_watcher1->notify_sync_complete("sync_id");
ASSERT_EQ(0, on_finish.wait());
}
TEST_F(TestMockInstanceWatcher_NotifySync, CancelStartedOnLeader) {
InSequence seq;
expect_throttler_start_op("sync_id");
C_SaferCond on_start;
instance_watcher1->notify_sync_request("sync_id", &on_start);
ASSERT_EQ(0, on_start.wait());
ASSERT_FALSE(instance_watcher1->cancel_sync_request("sync_id"));
C_SaferCond on_finish;
expect_throttler_finish_op("sync_id", &on_finish);
instance_watcher1->notify_sync_complete("sync_id");
ASSERT_EQ(0, on_finish.wait());
}
TEST_F(TestMockInstanceWatcher_NotifySync, StartStopOnNonLeader) {
InSequence seq;
expect_throttler_start_op("sync_id");
C_SaferCond on_start;
instance_watcher2->notify_sync_request("sync_id", &on_start);
ASSERT_EQ(0, on_start.wait());
C_SaferCond on_finish;
expect_throttler_finish_op("sync_id", &on_finish);
instance_watcher2->notify_sync_complete("sync_id");
ASSERT_EQ(0, on_finish.wait());
}
TEST_F(TestMockInstanceWatcher_NotifySync, CancelStartedOnNonLeader) {
InSequence seq;
expect_throttler_start_op("sync_id");
C_SaferCond on_start;
instance_watcher2->notify_sync_request("sync_id", &on_start);
ASSERT_EQ(0, on_start.wait());
ASSERT_FALSE(instance_watcher2->cancel_sync_request("sync_id"));
C_SaferCond on_finish;
expect_throttler_finish_op("sync_id", &on_finish);
instance_watcher2->notify_sync_complete("sync_id");
ASSERT_EQ(0, on_finish.wait());
}
TEST_F(TestMockInstanceWatcher_NotifySync, CancelWaitingOnNonLeader) {
InSequence seq;
C_SaferCond on_start_op_called;
Context *on_start_ctx;
expect_throttler_start_op("sync_id", &on_start_op_called,
&on_start_ctx);
C_SaferCond on_start;
instance_watcher2->notify_sync_request("sync_id", &on_start);
ASSERT_EQ(0, on_start_op_called.wait());
ASSERT_TRUE(instance_watcher2->cancel_sync_request("sync_id"));
// emulate watcher timeout
on_start_ctx->complete(-ETIMEDOUT);
ASSERT_EQ(-ECANCELED, on_start.wait());
}
TEST_F(TestMockInstanceWatcher_NotifySync, InFlightPrevNotification) {
// start sync when previous notification is still in flight
InSequence seq;
expect_throttler_start_op("sync_id");
C_SaferCond on_start1;
instance_watcher2->notify_sync_request("sync_id", &on_start1);
ASSERT_EQ(0, on_start1.wait());
C_SaferCond on_start2;
EXPECT_CALL(mock_image_sync_throttler, finish_op("sync_id"))
.WillOnce(Invoke([this, &on_start2](const std::string &) {
instance_watcher2->notify_sync_request("sync_id", &on_start2);
}));
expect_throttler_start_op("sync_id");
instance_watcher2->notify_sync_complete("sync_id");
ASSERT_EQ(0, on_start2.wait());
C_SaferCond on_finish;
expect_throttler_finish_op("sync_id", &on_finish);
instance_watcher2->notify_sync_complete("sync_id");
ASSERT_EQ(0, on_finish.wait());
}
TEST_F(TestMockInstanceWatcher_NotifySync, NoInFlightReleaseAcquireLeader) {
InSequence seq;
expect_throttler_destroy();
instance_watcher1->handle_release_leader();
instance_watcher1->handle_acquire_leader();
}
TEST_F(TestMockInstanceWatcher_NotifySync, StartedOnLeaderReleaseLeader) {
InSequence seq;
expect_throttler_destroy();
instance_watcher1->handle_release_leader();
instance_watcher2->handle_acquire_leader();
expect_throttler_start_op("sync_id");
C_SaferCond on_start;
instance_watcher2->notify_sync_request("sync_id", &on_start);
ASSERT_EQ(0, on_start.wait());
expect_throttler_destroy();
instance_watcher2->handle_release_leader();
instance_watcher2->notify_sync_complete("sync_id");
instance_watcher1->handle_acquire_leader();
}
TEST_F(TestMockInstanceWatcher_NotifySync, WaitingOnLeaderReleaseLeader) {
InSequence seq;
C_SaferCond on_start_op_called;
Context *on_start_ctx;
expect_throttler_start_op("sync_id", &on_start_op_called,
&on_start_ctx);
C_SaferCond on_start;
instance_watcher1->notify_sync_request("sync_id", &on_start);
ASSERT_EQ(0, on_start_op_called.wait());
std::vector<Context *> throttler_queue = {on_start_ctx};
expect_throttler_destroy(&throttler_queue);
instance_watcher1->handle_release_leader();
instance_watcher2->handle_acquire_leader();
instance_watcher1->handle_update_leader(instance_id2);
expect_throttler_start_op("sync_id");
ASSERT_EQ(0, on_start.wait());
C_SaferCond on_finish;
expect_throttler_finish_op("sync_id", &on_finish);
instance_watcher1->notify_sync_complete("sync_id");
ASSERT_EQ(0, on_finish.wait());
expect_throttler_destroy();
instance_watcher2->handle_release_leader();
instance_watcher1->handle_acquire_leader();
}
TEST_F(TestMockInstanceWatcher_NotifySync, StartedOnNonLeaderAcquireLeader) {
InSequence seq;
expect_throttler_destroy();
instance_watcher1->handle_release_leader();
instance_watcher2->handle_acquire_leader();
instance_watcher1->handle_update_leader(instance_id2);
expect_throttler_start_op("sync_id");
C_SaferCond on_start;
instance_watcher1->notify_sync_request("sync_id", &on_start);
ASSERT_EQ(0, on_start.wait());
expect_throttler_destroy();
instance_watcher2->handle_release_leader();
instance_watcher1->handle_acquire_leader();
instance_watcher2->handle_update_leader(instance_id2);
instance_watcher1->notify_sync_complete("sync_id");
}
TEST_F(TestMockInstanceWatcher_NotifySync, WaitingOnNonLeaderAcquireLeader) {
InSequence seq;
C_SaferCond on_start_op_called;
Context *on_start_ctx;
expect_throttler_start_op("sync_id", &on_start_op_called,
&on_start_ctx);
C_SaferCond on_start;
instance_watcher2->notify_sync_request("sync_id", &on_start);
ASSERT_EQ(0, on_start_op_called.wait());
std::vector<Context *> throttler_queue = {on_start_ctx};
expect_throttler_destroy(&throttler_queue);
instance_watcher1->handle_release_leader();
EXPECT_CALL(mock_image_sync_throttler, start_op("sync_id", _))
.WillOnce(WithArg<1>(CompleteContext(0)));
instance_watcher2->handle_acquire_leader();
instance_watcher1->handle_update_leader(instance_id2);
ASSERT_EQ(0, on_start.wait());
C_SaferCond on_finish;
expect_throttler_finish_op("sync_id", &on_finish);
instance_watcher2->notify_sync_complete("sync_id");
ASSERT_EQ(0, on_finish.wait());
expect_throttler_destroy();
instance_watcher2->handle_release_leader();
instance_watcher1->handle_acquire_leader();
}
} // namespace mirror
} // namespace rbd

View File

@ -50,6 +50,7 @@ struct MockManagedLock {
MOCK_CONST_METHOD0(is_shutdown, bool());
MOCK_CONST_METHOD0(is_state_post_acquiring, bool());
MOCK_CONST_METHOD0(is_state_pre_releasing, bool());
MOCK_CONST_METHOD0(is_state_locked, bool());
};
@ -147,6 +148,10 @@ struct ManagedLock<MockTestImageCtx> {
return MockManagedLock::get_instance().is_state_post_acquiring();
}
bool is_state_pre_releasing() const {
return MockManagedLock::get_instance().is_state_pre_releasing();
}
bool is_state_locked() const {
return MockManagedLock::get_instance().is_state_locked();
}
@ -264,6 +269,8 @@ struct MockListener : public LeaderWatcher<librbd::MockTestImageCtx>::Listener {
MOCK_METHOD1(post_acquire_handler, void(Context *));
MOCK_METHOD1(pre_release_handler, void(Context *));
MOCK_METHOD1(update_leader_handler, void(const std::string &));
};
MockListener *MockListener::s_instance = nullptr;
@ -371,6 +378,8 @@ public:
.Times(AtLeast(0)).WillRepeatedly(Return(false));
EXPECT_CALL(mock_managed_lock, is_state_locked())
.Times(AtLeast(0)).WillRepeatedly(Return(false));
EXPECT_CALL(mock_managed_lock, is_state_pre_releasing())
.Times(AtLeast(0)).WillRepeatedly(Return(false));
}
void expect_notify_heartbeat(MockManagedLock &mock_managed_lock,
@ -627,6 +636,7 @@ TEST_F(TestMockLeaderWatcher, Break) {
expect_is_shutdown(mock_managed_lock);
expect_is_leader(mock_managed_lock);
expect_destroy(mock_managed_lock);
EXPECT_CALL(listener, update_leader_handler(_));
InSequence seq;

View File

@ -20,7 +20,7 @@ public:
virtual void cancel() {}
protected:
void finish(int r) {
virtual void finish(int r) {
if (m_cct) {
lsubdout(m_cct, rbd_mirror, 20) << m_name << "::finish: r=" << r << dendl;
}

View File

@ -21,7 +21,6 @@
#include "librbd/Utils.h"
#include "librbd/journal/Replay.h"
#include "ImageReplayer.h"
#include "ImageSync.h"
#include "Threads.h"
#include "tools/rbd_mirror/image_replayer/BootstrapRequest.h"
#include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
@ -273,14 +272,14 @@ void ImageReplayer<I>::RemoteJournalerListener::handle_update(
template <typename I>
ImageReplayer<I>::ImageReplayer(Threads<librbd::ImageCtx> *threads,
shared_ptr<ImageDeleter> image_deleter,
ImageSyncThrottlerRef<I> image_sync_throttler,
InstanceWatcher<I> *instance_watcher,
RadosRef local,
const std::string &local_mirror_uuid,
int64_t local_pool_id,
const std::string &global_image_id) :
m_threads(threads),
m_image_deleter(image_deleter),
m_image_sync_throttler(image_sync_throttler),
m_instance_watcher(instance_watcher),
m_local(local),
m_local_mirror_uuid(local_mirror_uuid),
m_local_pool_id(local_pool_id),
@ -469,7 +468,7 @@ void ImageReplayer<I>::bootstrap() {
ImageReplayer, &ImageReplayer<I>::handle_bootstrap>(this);
BootstrapRequest<I> *request = BootstrapRequest<I>::create(
m_local_ioctx, m_remote_image.io_ctx, m_image_sync_throttler,
m_local_ioctx, m_remote_image.io_ctx, m_instance_watcher,
&m_local_image_ctx, m_local_image_id, m_remote_image.image_id,
m_global_image_id, m_threads->work_queue, m_threads->timer,
&m_threads->timer_lock, m_local_mirror_uuid, m_remote_image.mirror_uuid,

View File

@ -47,6 +47,7 @@ namespace journal { template <typename> class Replay; }
namespace rbd {
namespace mirror {
template <typename> struct InstanceWatcher;
template <typename> struct Threads;
namespace image_replayer { template <typename> class BootstrapRequest; }
@ -73,10 +74,10 @@ public:
static ImageReplayer *create(
Threads<librbd::ImageCtx> *threads,
std::shared_ptr<ImageDeleter> image_deleter,
ImageSyncThrottlerRef<ImageCtxT> image_sync_throttler,
InstanceWatcher<ImageCtxT> *instance_watcher,
RadosRef local, const std::string &local_mirror_uuid, int64_t local_pool_id,
const std::string &global_image_id) {
return new ImageReplayer(threads, image_deleter, image_sync_throttler,
return new ImageReplayer(threads, image_deleter, instance_watcher,
local, local_mirror_uuid, local_pool_id,
global_image_id);
}
@ -86,7 +87,7 @@ public:
ImageReplayer(Threads<librbd::ImageCtx> *threads,
std::shared_ptr<ImageDeleter> image_deleter,
ImageSyncThrottlerRef<ImageCtxT> image_sync_throttler,
InstanceWatcher<ImageCtxT> *instance_watcher,
RadosRef local, const std::string &local_mirror_uuid,
int64_t local_pool_id, const std::string &global_image_id);
virtual ~ImageReplayer();
@ -283,7 +284,7 @@ private:
Threads<librbd::ImageCtx> *m_threads;
std::shared_ptr<ImageDeleter> m_image_deleter;
ImageSyncThrottlerRef<ImageCtxT> m_image_sync_throttler;
InstanceWatcher<ImageCtxT> *m_instance_watcher;
RemoteImages m_remote_images;
RemoteImage m_remote_image;

View File

@ -2,6 +2,7 @@
// vim: ts=8 sw=2 smarttab
#include "ImageSync.h"
#include "InstanceWatcher.h"
#include "ProgressContext.h"
#include "common/errno.h"
#include "journal/Journaler.h"
@ -33,13 +34,15 @@ ImageSync<I>::ImageSync(I *local_image_ctx, I *remote_image_ctx,
SafeTimer *timer, Mutex *timer_lock,
const std::string &mirror_uuid, Journaler *journaler,
MirrorPeerClientMeta *client_meta,
ContextWQ *work_queue, Context *on_finish,
ProgressContext *progress_ctx)
ContextWQ *work_queue,
InstanceWatcher<I> *instance_watcher,
Context *on_finish, ProgressContext *progress_ctx)
: BaseRequest("rbd::mirror::ImageSync", local_image_ctx->cct, on_finish),
m_local_image_ctx(local_image_ctx), m_remote_image_ctx(remote_image_ctx),
m_timer(timer), m_timer_lock(timer_lock), m_mirror_uuid(mirror_uuid),
m_journaler(journaler), m_client_meta(client_meta),
m_work_queue(work_queue), m_progress_ctx(progress_ctx),
m_work_queue(work_queue), m_instance_watcher(instance_watcher),
m_progress_ctx(progress_ctx),
m_lock(unique_lock_name("ImageSync::m_lock", this)) {
}
@ -51,7 +54,7 @@ ImageSync<I>::~ImageSync() {
template <typename I>
void ImageSync<I>::send() {
send_prune_catch_up_sync_point();
send_notify_sync_request();
}
template <typename I>
@ -62,6 +65,10 @@ void ImageSync<I>::cancel() {
m_canceled = true;
if (m_instance_watcher->cancel_sync_request(m_local_image_ctx->id)) {
return;
}
if (m_snapshot_copy_request != nullptr) {
m_snapshot_copy_request->cancel();
}
@ -71,6 +78,29 @@ void ImageSync<I>::cancel() {
}
}
template <typename I>
void ImageSync<I>::send_notify_sync_request() {
update_progress("NOTIFY_SYNC_REQUEST");
dout(20) << dendl;
Context *ctx = create_context_callback<
ImageSync<I>, &ImageSync<I>::handle_notify_sync_request>(this);
m_instance_watcher->notify_sync_request(m_local_image_ctx->id, ctx);
}
template <typename I>
void ImageSync<I>::handle_notify_sync_request(int r) {
dout(20) << ": r=" << r << dendl;
if (r < 0) {
BaseRequest::finish(r);
return;
}
send_prune_catch_up_sync_point();
}
template <typename I>
void ImageSync<I>::send_prune_catch_up_sync_point() {
update_progress("PRUNE_CATCH_UP_SYNC_POINT");
@ -366,6 +396,14 @@ void ImageSync<I>::update_progress(const std::string &description) {
}
}
template <typename I>
void ImageSync<I>::finish(int r) {
dout(20) << ": r=" << r << dendl;
m_instance_watcher->notify_sync_complete(m_local_image_ctx->id);
BaseRequest::finish(r);
}
} // namespace mirror
} // namespace rbd

View File

@ -24,6 +24,8 @@ namespace mirror {
class ProgressContext;
template <typename> class InstanceWatcher;
namespace image_sync { template <typename> class ImageCopyRequest; }
namespace image_sync { template <typename> class SnapshotCopyRequest; }
@ -39,23 +41,28 @@ public:
Mutex *timer_lock, const std::string &mirror_uuid,
Journaler *journaler,
MirrorPeerClientMeta *client_meta,
ContextWQ *work_queue, Context *on_finish,
ProgressContext *progress_ctx = nullptr) {
ContextWQ *work_queue,
InstanceWatcher<ImageCtxT> *instance_watcher,
Context *on_finish,
ProgressContext *progress_ctx = nullptr) {
return new ImageSync(local_image_ctx, remote_image_ctx, timer, timer_lock,
mirror_uuid, journaler, client_meta, work_queue,
on_finish, progress_ctx);
instance_watcher, on_finish, progress_ctx);
}
ImageSync(ImageCtxT *local_image_ctx, ImageCtxT *remote_image_ctx,
SafeTimer *timer, Mutex *timer_lock, const std::string &mirror_uuid,
Journaler *journaler, MirrorPeerClientMeta *client_meta,
ContextWQ *work_queue, Context *on_finish,
ProgressContext *progress_ctx = nullptr);
ContextWQ *work_queue, InstanceWatcher<ImageCtxT> *instance_watcher,
Context *on_finish, ProgressContext *progress_ctx = nullptr);
~ImageSync() override;
void send() override;
void cancel() override;
protected:
void finish(int r) override;
private:
/**
* @verbatim
@ -63,6 +70,9 @@ private:
* <start>
* |
* v
* NOTIFY_SYNC_REQUEST
* |
* v
* PRUNE_CATCH_UP_SYNC_POINT
* |
* v
@ -100,6 +110,7 @@ private:
Journaler *m_journaler;
MirrorPeerClientMeta *m_client_meta;
ContextWQ *m_work_queue;
InstanceWatcher<ImageCtxT> *m_instance_watcher;
ProgressContext *m_progress_ctx;
SnapMap m_snap_map;
@ -111,6 +122,9 @@ private:
image_sync::ImageCopyRequest<ImageCtxT> *m_image_copy_request = nullptr;
decltype(ImageCtxT::object_map) m_object_map = nullptr;
void send_notify_sync_request();
void handle_notify_sync_request(int r);
void send_prune_catch_up_sync_point();
void handle_prune_catch_up_sync_point(int r);

View File

@ -13,8 +13,9 @@
*/
#include "ImageSyncThrottler.h"
#include "ImageSync.h"
#include "common/ceph_context.h"
#include "common/Formatter.h"
#include "common/debug.h"
#include "common/errno.h"
#include "librbd/Utils.h"
#define dout_context g_ceph_context
@ -22,227 +23,172 @@
#undef dout_prefix
#define dout_prefix *_dout << "rbd::mirror::ImageSyncThrottler:: " << this \
<< " " << __func__ << ": "
using std::unique_ptr;
using std::string;
using std::set;
namespace rbd {
namespace mirror {
template <typename ImageCtxT>
struct ImageSyncThrottler<ImageCtxT>::C_SyncHolder : public Context {
ImageSyncThrottler<ImageCtxT> *m_sync_throttler;
std::string m_local_image_id;
ImageSync<ImageCtxT> *m_sync = nullptr;
Context *m_on_finish;
C_SyncHolder(ImageSyncThrottler<ImageCtxT> *sync_throttler,
const std::string &local_image_id, Context *on_finish)
: m_sync_throttler(sync_throttler),
m_local_image_id(local_image_id), m_on_finish(on_finish) {
}
void finish(int r) override {
m_sync->put();
m_sync_throttler->handle_sync_finished(this);
m_on_finish->complete(r);
}
};
template <typename I>
ImageSyncThrottler<I>::ImageSyncThrottler()
: m_max_concurrent_syncs(g_ceph_context->_conf->rbd_mirror_concurrent_image_syncs),
m_lock(librbd::util::unique_lock_name("rbd::mirror::ImageSyncThrottler", this))
{
dout(20) << "Initialized max_concurrent_syncs=" << m_max_concurrent_syncs
<< dendl;
: m_lock(librbd::util::unique_lock_name("rbd::mirror::ImageSyncThrottler",
this)),
m_max_concurrent_syncs(
g_ceph_context->_conf->rbd_mirror_concurrent_image_syncs) {
dout(20) << "max_concurrent_syncs=" << m_max_concurrent_syncs << dendl;
g_ceph_context->_conf->add_observer(this);
}
template <typename I>
ImageSyncThrottler<I>::~ImageSyncThrottler() {
{
Mutex::Locker l(m_lock);
assert(m_sync_queue.empty());
assert(m_inflight_syncs.empty());
}
g_ceph_context->_conf->remove_observer(this);
Mutex::Locker locker(m_lock);
assert(m_inflight_ops.empty());
assert(m_queue.empty());
}
template <typename I>
void ImageSyncThrottler<I>::start_sync(I *local_image_ctx, I *remote_image_ctx,
SafeTimer *timer, Mutex *timer_lock,
const std::string &mirror_uuid,
Journaler *journaler,
MirrorPeerClientMeta *client_meta,
ContextWQ *work_queue,
Context *on_finish,
ProgressContext *progress_ctx) {
dout(20) << dendl;
void ImageSyncThrottler<I>::start_op(const std::string &id, Context *on_start) {
dout(20) << "id=" << id << dendl;
C_SyncHolder *sync_holder_ctx = new C_SyncHolder(this, local_image_ctx->id,
on_finish);
sync_holder_ctx->m_sync = ImageSync<I>::create(local_image_ctx,
remote_image_ctx, timer,
timer_lock, mirror_uuid,
journaler, client_meta,
work_queue, sync_holder_ctx,
progress_ctx);
sync_holder_ctx->m_sync->get();
bool start = false;
{
Mutex::Locker l(m_lock);
Mutex::Locker locker(m_lock);
if (m_inflight_syncs.size() < m_max_concurrent_syncs) {
assert(m_inflight_syncs.count(local_image_ctx->id) == 0);
m_inflight_syncs[local_image_ctx->id] = sync_holder_ctx;
start = true;
dout(10) << "ready to start image sync for local_image_id "
<< local_image_ctx->id << " [" << m_inflight_syncs.size() << "/"
<< m_max_concurrent_syncs << "]" << dendl;
if (m_inflight_ops.count(id) > 0) {
dout(20) << "duplicate for already started op " << id << dendl;
} else if (m_max_concurrent_syncs == 0 ||
m_inflight_ops.size() < m_max_concurrent_syncs) {
assert(m_queue.empty());
m_inflight_ops.insert(id);
dout(20) << "ready to start sync for " << id << " ["
<< m_inflight_ops.size() << "/" << m_max_concurrent_syncs << "]"
<< dendl;
} else {
m_sync_queue.push_front(sync_holder_ctx);
dout(10) << "image sync for local_image_id " << local_image_ctx->id
<< " has been queued" << dendl;
m_queue.push_back(std::make_pair(id, on_start));
on_start = nullptr;
dout(20) << "image sync for " << id << " has been queued" << dendl;
}
}
if (start) {
sync_holder_ctx->m_sync->send();
if (on_start != nullptr) {
on_start->complete(0);
}
}
template <typename I>
void ImageSyncThrottler<I>::cancel_sync(const std::string &local_image_id) {
dout(20) << dendl;
C_SyncHolder *sync_holder = nullptr;
bool running_sync = true;
bool ImageSyncThrottler<I>::cancel_op(const std::string &id) {
dout(20) << "id=" << id << dendl;
Context *on_start = nullptr;
{
Mutex::Locker l(m_lock);
if (m_inflight_syncs.empty()) {
// no image sync currently running and neither waiting
return;
}
auto it = m_inflight_syncs.find(local_image_id);
if (it != m_inflight_syncs.end()) {
sync_holder = it->second;
}
if (!sync_holder) {
for (auto it = m_sync_queue.begin(); it != m_sync_queue.end(); ++it) {
if ((*it)->m_local_image_id == local_image_id) {
sync_holder = (*it);
m_sync_queue.erase(it);
running_sync = false;
break;
}
Mutex::Locker locker(m_lock);
for (auto it = m_queue.begin(); it != m_queue.end(); ++it) {
if (it->first == id) {
on_start = it->second;
dout(20) << "canceled queued sync for " << id << dendl;
m_queue.erase(it);
break;
}
}
}
if (sync_holder) {
if (running_sync) {
dout(10) << "canceled running image sync for local_image_id "
<< sync_holder->m_local_image_id << dendl;
sync_holder->m_sync->cancel();
} else {
dout(10) << "canceled waiting image sync for local_image_id "
<< sync_holder->m_local_image_id << dendl;
sync_holder->m_on_finish->complete(-ECANCELED);
sync_holder->m_sync->put();
delete sync_holder;
if (on_start == nullptr) {
return false;
}
on_start->complete(-ECANCELED);
return true;
}
template <typename I>
void ImageSyncThrottler<I>::finish_op(const std::string &id) {
dout(20) << "id=" << id << dendl;
if (cancel_op(id)) {
return;
}
Context *on_start = nullptr;
{
Mutex::Locker locker(m_lock);
m_inflight_ops.erase(id);
if (m_inflight_ops.size() < m_max_concurrent_syncs && !m_queue.empty()) {
auto pair = m_queue.front();
m_inflight_ops.insert(pair.first);
dout(20) << "ready to start sync for " << pair.first << " ["
<< m_inflight_ops.size() << "/" << m_max_concurrent_syncs << "]"
<< dendl;
on_start= pair.second;
m_queue.pop_front();
}
}
if (on_start != nullptr) {
on_start->complete(0);
}
}
template <typename I>
void ImageSyncThrottler<I>::handle_sync_finished(C_SyncHolder *sync_holder) {
void ImageSyncThrottler<I>::drain(int r) {
dout(20) << dendl;
C_SyncHolder *next_sync_holder = nullptr;
std::list<std::pair<std::string, Context *>> queue;
{
Mutex::Locker l(m_lock);
m_inflight_syncs.erase(sync_holder->m_local_image_id);
if (m_inflight_syncs.size() < m_max_concurrent_syncs &&
!m_sync_queue.empty()) {
next_sync_holder = m_sync_queue.back();
m_sync_queue.pop_back();
assert(
m_inflight_syncs.count(next_sync_holder->m_local_image_id) == 0);
m_inflight_syncs[next_sync_holder->m_local_image_id] =
next_sync_holder;
dout(10) << "ready to start image sync for local_image_id "
<< next_sync_holder->m_local_image_id << " ["
<< m_inflight_syncs.size() << "/" << m_max_concurrent_syncs
<< "]" << dendl;
}
dout(10) << "currently running image syncs [" << m_inflight_syncs.size()
<< "/" << m_max_concurrent_syncs << "]" << dendl;
Mutex::Locker locker(m_lock);
std::swap(m_queue, queue);
m_inflight_ops.clear();
}
if (next_sync_holder) {
next_sync_holder->m_sync->send();
for (auto &pair : queue) {
pair.second->complete(r);
}
}
template <typename I>
void ImageSyncThrottler<I>::set_max_concurrent_syncs(uint32_t max) {
dout(20) << " max=" << max << dendl;
dout(20) << "max=" << max << dendl;
assert(max > 0);
std::list<C_SyncHolder *> next_sync_holders;
std::list<Context *> ops;
{
Mutex::Locker l(m_lock);
this->m_max_concurrent_syncs = max;
Mutex::Locker locker(m_lock);
m_max_concurrent_syncs = max;
// Start waiting syncs in the case of available free slots
while(m_inflight_syncs.size() < m_max_concurrent_syncs
&& !m_sync_queue.empty()) {
C_SyncHolder *next_sync_holder = m_sync_queue.back();
next_sync_holders.push_back(next_sync_holder);
m_sync_queue.pop_back();
assert(
m_inflight_syncs.count(next_sync_holder->m_local_image_id) == 0);
m_inflight_syncs[next_sync_holder->m_local_image_id] = next_sync_holder;
dout(10) << "ready to start image sync for local_image_id "
<< next_sync_holder->m_local_image_id << " ["
<< m_inflight_syncs.size() << "/" << m_max_concurrent_syncs
<< "]" << dendl;
// Start waiting ops in the case of available free slots
while ((m_max_concurrent_syncs == 0 ||
m_inflight_ops.size() < m_max_concurrent_syncs) &&
!m_queue.empty()) {
auto pair = m_queue.front();
m_inflight_ops.insert(pair.first);
dout(20) << "ready to start sync for " << pair.first << " ["
<< m_inflight_ops.size() << "/" << m_max_concurrent_syncs << "]"
<< dendl;
ops.push_back(pair.second);
m_queue.pop_front();
}
}
for (const auto& sync_holder : next_sync_holders) {
sync_holder->m_sync->send();
for (const auto& ctx : ops) {
ctx->complete(0);
}
}
template <typename I>
void ImageSyncThrottler<I>::print_status(Formatter *f, stringstream *ss) {
Mutex::Locker l(m_lock);
void ImageSyncThrottler<I>::print_status(Formatter *f, std::stringstream *ss) {
dout(20) << dendl;
Mutex::Locker locker(m_lock);
if (f) {
f->dump_int("max_parallel_syncs", m_max_concurrent_syncs);
f->dump_int("running_syncs", m_inflight_syncs.size());
f->dump_int("waiting_syncs", m_sync_queue.size());
f->dump_int("running_syncs", m_inflight_ops.size());
f->dump_int("waiting_syncs", m_queue.size());
f->flush(*ss);
} else {
*ss << "[ ";
*ss << "max_parallel_syncs=" << m_max_concurrent_syncs << ", ";
*ss << "running_syncs=" << m_inflight_syncs.size() << ", ";
*ss << "waiting_syncs=" << m_sync_queue.size() << " ]";
*ss << "running_syncs=" << m_inflight_ops.size() << ", ";
*ss << "waiting_syncs=" << m_queue.size() << " ]";
}
}
@ -256,9 +202,8 @@ const char** ImageSyncThrottler<I>::get_tracked_conf_keys() const {
}
template <typename I>
void ImageSyncThrottler<I>::handle_conf_change(
const struct md_config_t *conf,
const set<string> &changed) {
void ImageSyncThrottler<I>::handle_conf_change(const struct md_config_t *conf,
const set<string> &changed) {
if (changed.count("rbd_mirror_concurrent_image_syncs")) {
set_max_concurrent_syncs(conf->rbd_mirror_concurrent_image_syncs);
}

View File

@ -1,88 +1,61 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2016 SUSE LINUX GmbH
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#ifndef CEPH_RBD_MIRROR_IMAGE_SYNC_THROTTLER_H
#define CEPH_RBD_MIRROR_IMAGE_SYNC_THROTTLER_H
#ifndef RBD_MIRROR_IMAGE_SYNC_THROTTLER_H
#define RBD_MIRROR_IMAGE_SYNC_THROTTLER_H
#include <list>
#include <map>
#include <set>
#include <sstream>
#include <string>
#include <utility>
#include "common/Mutex.h"
#include "librbd/ImageCtx.h"
#include "include/Context.h"
#include "librbd/journal/TypeTraits.h"
class CephContext;
#include "common/Mutex.h"
#include "common/config_obs.h"
class Context;
class ContextWQ;
class SafeTimer;
namespace journal { class Journaler; }
namespace librbd { namespace journal { struct MirrorPeerClientMeta; } }
namespace ceph { class Formatter; }
namespace librbd { class ImageCtx; }
namespace rbd {
namespace mirror {
template <typename> class ImageSync;
class ProgressContext;
/**
* Manage concurrent image-syncs
*/
template <typename ImageCtxT = librbd::ImageCtx>
class ImageSyncThrottler : public md_config_obs_t {
public:
typedef librbd::journal::TypeTraits<ImageCtxT> TypeTraits;
typedef typename TypeTraits::Journaler Journaler;
typedef librbd::journal::MirrorPeerClientMeta MirrorPeerClientMeta;
static ImageSyncThrottler *create() {
return new ImageSyncThrottler();
}
void destroy() {
delete this;
}
ImageSyncThrottler();
~ImageSyncThrottler() override;
ImageSyncThrottler(const ImageSyncThrottler&) = delete;
ImageSyncThrottler& operator=(const ImageSyncThrottler&) = delete;
void start_sync(ImageCtxT *local_image_ctx,
ImageCtxT *remote_image_ctx, SafeTimer *timer,
Mutex *timer_lock, const std::string &mirror_uuid,
Journaler *journaler, MirrorPeerClientMeta *client_meta,
ContextWQ *work_queue, Context *on_finish,
ProgressContext *progress_ctx = nullptr);
void cancel_sync(const std::string &local_image_id);
void set_max_concurrent_syncs(uint32_t max);
void start_op(const std::string &id, Context *on_start);
bool cancel_op(const std::string &id);
void finish_op(const std::string &id);
void drain(int r);
void print_status(Formatter *f, std::stringstream *ss);
private:
struct C_SyncHolder;
void handle_sync_finished(C_SyncHolder *sync_holder);
Mutex m_lock;
uint32_t m_max_concurrent_syncs;
std::list<std::pair<std::string, Context *>> m_queue;
std::set<std::string> m_inflight_ops;
const char **get_tracked_conf_keys() const override;
void handle_conf_change(const struct md_config_t *conf,
const std::set<std::string> &changed) override;
uint32_t m_max_concurrent_syncs;
Mutex m_lock;
std::list<C_SyncHolder *> m_sync_queue;
std::map<std::string, C_SyncHolder *> m_inflight_syncs;
};
} // namespace mirror
} // namespace rbd
#endif // CEPH_RBD_MIRROR_IMAGE_SYNC_THROTTLER_H
extern template class rbd::mirror::ImageSyncThrottler<librbd::ImageCtx>;
#endif // RBD_MIRROR_IMAGE_SYNC_THROTTLER_H

View File

@ -25,12 +25,12 @@ using librbd::util::create_context_callback;
template <typename I>
InstanceReplayer<I>::InstanceReplayer(
Threads<I> *threads, std::shared_ptr<ImageDeleter> image_deleter,
ImageSyncThrottlerRef<I> image_sync_throttler, RadosRef local_rados,
const std::string &local_mirror_uuid, int64_t local_pool_id)
: m_threads(threads), m_image_deleter(image_deleter),
m_image_sync_throttler(image_sync_throttler), m_local_rados(local_rados),
m_local_mirror_uuid(local_mirror_uuid), m_local_pool_id(local_pool_id),
m_lock("rbd::mirror::InstanceReplayer " + stringify(local_pool_id)) {
RadosRef local_rados, const std::string &local_mirror_uuid,
int64_t local_pool_id)
: m_threads(threads), m_image_deleter(image_deleter),
m_local_rados(local_rados), m_local_mirror_uuid(local_mirror_uuid),
m_local_pool_id(local_pool_id),
m_lock("rbd::mirror::InstanceReplayer " + stringify(local_pool_id)) {
}
template <typename I>
@ -130,7 +130,8 @@ void InstanceReplayer<I>::release_all(Context *on_finish) {
}
template <typename I>
void InstanceReplayer<I>::acquire_image(const std::string &global_image_id,
void InstanceReplayer<I>::acquire_image(InstanceWatcher<I> *instance_watcher,
const std::string &global_image_id,
const std::string &peer_mirror_uuid,
const std::string &peer_image_id,
Context *on_finish) {
@ -145,8 +146,8 @@ void InstanceReplayer<I>::acquire_image(const std::string &global_image_id,
if (it == m_image_replayers.end()) {
auto image_replayer = ImageReplayer<I>::create(
m_threads, m_image_deleter, m_image_sync_throttler, m_local_rados,
m_local_mirror_uuid, m_local_pool_id, global_image_id);
m_threads, m_image_deleter, instance_watcher, m_local_rados,
m_local_mirror_uuid, m_local_pool_id, global_image_id);
dout(20) << global_image_id << ": creating replayer " << image_replayer
<< dendl;

View File

@ -20,6 +20,7 @@ namespace mirror {
class ImageDeleter;
template <typename> class ImageReplayer;
template <typename> class InstanceWatcher;
template <typename> struct Threads;
template <typename ImageCtxT = librbd::ImageCtx>
@ -27,10 +28,10 @@ class InstanceReplayer {
public:
static InstanceReplayer* create(
Threads<ImageCtxT> *threads, std::shared_ptr<ImageDeleter> image_deleter,
ImageSyncThrottlerRef<ImageCtxT> image_sync_throttler, RadosRef local_rados,
const std::string &local_mirror_uuid, int64_t local_pool_id) {
return new InstanceReplayer(threads, image_deleter, image_sync_throttler,
local_rados, local_mirror_uuid, local_pool_id);
RadosRef local_rados, const std::string &local_mirror_uuid,
int64_t local_pool_id) {
return new InstanceReplayer(threads, image_deleter, local_rados,
local_mirror_uuid, local_pool_id);
}
void destroy() {
delete this;
@ -38,7 +39,6 @@ public:
InstanceReplayer(Threads<ImageCtxT> *threads,
std::shared_ptr<ImageDeleter> image_deleter,
ImageSyncThrottlerRef<ImageCtxT> image_sync_throttler,
RadosRef local_rados, const std::string &local_mirror_uuid,
int64_t local_pool_id);
~InstanceReplayer();
@ -52,7 +52,8 @@ public:
void add_peer(std::string mirror_uuid, librados::IoCtx io_ctx);
void remove_peer(std::string mirror_uuid);
void acquire_image(const std::string &global_image_id,
void acquire_image(InstanceWatcher<ImageCtxT> *instance_watcher,
const std::string &global_image_id,
const std::string &peer_mirror_uuid,
const std::string &peer_image_id,
Context *on_finish);
@ -109,7 +110,6 @@ private:
Threads<ImageCtxT> *m_threads;
std::shared_ptr<ImageDeleter> m_image_deleter;
ImageSyncThrottlerRef<ImageCtxT> m_image_sync_throttler;
RadosRef m_local_rados;
std::string m_local_mirror_uuid;
int64_t m_local_pool_id;

View File

@ -10,6 +10,7 @@
#include "librbd/ManagedLock.h"
#include "librbd/Utils.h"
#include "InstanceReplayer.h"
#include "ImageSyncThrottler.h"
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rbd_mirror
@ -82,24 +83,36 @@ struct C_RemoveInstanceRequest : public Context {
template <typename I>
struct InstanceWatcher<I>::C_NotifyInstanceRequest : public Context {
InstanceWatcher<I> *instance_watcher;
librbd::watcher::Notifier notifier;
std::string instance_id;
uint64_t request_id;
bufferlist bl;
Context *on_finish;
bool send_to_leader;
std::unique_ptr<librbd::watcher::Notifier> notifier;
librbd::watcher::NotifyResponse response;
atomic_t canceling;
bool canceling = false;
C_NotifyInstanceRequest(InstanceWatcher<I> *instance_watcher,
const std::string &instance_id, uint64_t request_id,
bufferlist &&bl, Context *on_finish)
: instance_watcher(instance_watcher),
notifier(instance_watcher->m_work_queue, instance_watcher->m_ioctx,
RBD_MIRROR_INSTANCE_PREFIX + instance_id),
instance_id(instance_id), request_id(request_id), bl(bl),
on_finish(on_finish) {
instance_watcher->m_notify_op_tracker.start_op();
: instance_watcher(instance_watcher), instance_id(instance_id),
request_id(request_id), bl(bl), on_finish(on_finish),
send_to_leader(instance_id.empty()) {
dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__
<< ": instance_watcher=" << instance_watcher << ", instance_id="
<< instance_id << ", request_id=" << request_id << dendl;
assert(instance_watcher->m_lock.is_locked());
if (!send_to_leader) {
assert((!instance_id.empty()));
notifier.reset(new librbd::watcher::Notifier(
instance_watcher->m_work_queue,
instance_watcher->m_ioctx,
RBD_MIRROR_INSTANCE_PREFIX + instance_id));
}
instance_watcher->m_notify_op_tracker.start_op();
auto result = instance_watcher->m_notify_ops.insert(
std::make_pair(instance_id, this)).second;
assert(result);
@ -108,13 +121,53 @@ struct InstanceWatcher<I>::C_NotifyInstanceRequest : public Context {
void send() {
dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__ << dendl;
notifier.notify(bl, &response, this);
assert(instance_watcher->m_lock.is_locked());
if (canceling) {
dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__
<< ": canceling" << dendl;
instance_watcher->m_work_queue->queue(this, -ECANCELED);
return;
}
if (send_to_leader) {
if (instance_watcher->m_leader_instance_id.empty()) {
dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__
<< ": suspending" << dendl;
instance_watcher->suspend_notify_request(this);
return;
}
if (instance_watcher->m_leader_instance_id != instance_id) {
auto count = instance_watcher->m_notify_ops.erase(
std::make_pair(instance_id, this));
assert(count > 0);
instance_id = instance_watcher->m_leader_instance_id;
auto result = instance_watcher->m_notify_ops.insert(
std::make_pair(instance_id, this)).second;
assert(result);
notifier.reset(new librbd::watcher::Notifier(
instance_watcher->m_work_queue,
instance_watcher->m_ioctx,
RBD_MIRROR_INSTANCE_PREFIX + instance_id));
}
}
dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__
<< ": sendding to " << instance_id << dendl;
notifier->notify(bl, &response, this);
}
void cancel() {
dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__ << dendl;
canceling.set(1);
assert(instance_watcher->m_lock.is_locked());
canceling = true;
instance_watcher->unsuspend_notify_request(this);
}
void finish(int r) override {
@ -158,27 +211,35 @@ struct InstanceWatcher<I>::C_NotifyInstanceRequest : public Context {
if (!found) {
if (r == -ETIMEDOUT) {
if (canceling.read()) {
r = -ECANCELED;
} else {
derr << "C_NotifyInstanceRequest: " << this << " " << __func__
<< ": resending after timeout" << dendl;
send();
return;
}
derr << "C_NotifyInstanceRequest: " << this << " " << __func__
<< ": resending after timeout" << dendl;
Mutex::Locker locker(instance_watcher->m_lock);
send();
return;
} else {
r = -EINVAL;
}
} else {
if (r == -ESTALE && send_to_leader) {
derr << "C_NotifyInstanceRequest: " << this << " " << __func__
<< ": resending due to leader change" << dendl;
Mutex::Locker locker(instance_watcher->m_lock);
send();
return;
}
}
}
instance_watcher->m_notify_op_tracker.finish_op();
on_finish->complete(r);
Mutex::Locker locker(instance_watcher->m_lock);
auto result = instance_watcher->m_notify_ops.erase(
{
Mutex::Locker locker(instance_watcher->m_lock);
auto result = instance_watcher->m_notify_ops.erase(
std::make_pair(instance_id, this));
assert(result > 0);
assert(result > 0);
instance_watcher->m_notify_op_tracker.finish_op();
}
delete this;
}
@ -187,6 +248,40 @@ struct InstanceWatcher<I>::C_NotifyInstanceRequest : public Context {
}
};
template <typename I>
struct InstanceWatcher<I>::C_SyncRequest : public Context {
InstanceWatcher<I> *instance_watcher;
std::string sync_id;
Context *on_start;
Context *on_complete = nullptr;
C_NotifyInstanceRequest *req = nullptr;
C_SyncRequest(InstanceWatcher<I> *instance_watcher,
const std::string &sync_id, Context *on_start)
: instance_watcher(instance_watcher), sync_id(sync_id),
on_start(on_start) {
dout(20) << "C_SyncRequest: " << this << " " << __func__ << ": sync_id="
<< sync_id << dendl;
}
void finish(int r) override {
dout(20) << "C_SyncRequest: " << this << " " << __func__ << ": r="
<< r << dendl;
if (on_start != nullptr) {
instance_watcher->handle_notify_sync_request(this, r);
} else {
instance_watcher->handle_notify_sync_complete(this, r);
delete this;
}
}
// called twice
void complete(int r) override {
finish(r);
}
};
#undef dout_prefix
#define dout_prefix *_dout << "rbd::mirror::InstanceWatcher: " \
<< this << " " << __func__ << ": "
@ -237,6 +332,11 @@ InstanceWatcher<I>::InstanceWatcher(librados::IoCtx &io_ctx,
template <typename I>
InstanceWatcher<I>::~InstanceWatcher() {
assert(m_notify_ops.empty());
assert(m_notify_op_tracker.empty());
assert(m_suspended_ops.empty());
assert(m_inflight_sync_reqs.empty());
assert(m_image_sync_throttler == nullptr);
m_instance_lock->destroy();
}
@ -348,6 +448,180 @@ void InstanceWatcher<I>::notify_image_release(
}
}
template <typename I>
void InstanceWatcher<I>::notify_sync_request(const std::string &sync_id,
Context *on_sync_start) {
dout(20) << "sync_id=" << sync_id << dendl;
Mutex::Locker locker(m_lock);
assert(m_inflight_sync_reqs.count(sync_id) == 0);
uint64_t request_id = ++m_request_seq;
bufferlist bl;
::encode(NotifyMessage{SyncRequestPayload{request_id, sync_id}}, bl);
auto sync_ctx = new C_SyncRequest(this, sync_id, on_sync_start);
sync_ctx->req = new C_NotifyInstanceRequest(this, "", request_id,
std::move(bl), sync_ctx);
m_inflight_sync_reqs[sync_id] = sync_ctx;
sync_ctx->req->send();
}
template <typename I>
bool InstanceWatcher<I>::cancel_sync_request(const std::string &sync_id) {
dout(20) << "sync_id=" << sync_id << dendl;
Mutex::Locker locker(m_lock);
auto it = m_inflight_sync_reqs.find(sync_id);
if (it == m_inflight_sync_reqs.end()) {
return false;
}
auto sync_ctx = it->second;
if (sync_ctx->on_start == nullptr) {
return false;
}
assert(sync_ctx->req != nullptr);
sync_ctx->req->cancel();
return true;
}
template <typename I>
void InstanceWatcher<I>::notify_sync_start(const std::string &instance_id,
const std::string &sync_id) {
dout(20) << "sync_id=" << sync_id << dendl;
Mutex::Locker locker(m_lock);
uint64_t request_id = ++m_request_seq;
bufferlist bl;
::encode(NotifyMessage{SyncStartPayload{request_id, sync_id}}, bl);
auto ctx = new FunctionContext(
[this, sync_id] (int r) {
dout(20) << "finish: sync_id=" << sync_id << ", r=" << r << dendl;
Mutex::Locker locker(m_lock);
if (r != -ESTALE && m_image_sync_throttler != nullptr) {
m_image_sync_throttler->finish_op(sync_id);
}
});
auto req = new C_NotifyInstanceRequest(this, instance_id, request_id,
std::move(bl), ctx);
req->send();
}
template <typename I>
void InstanceWatcher<I>::notify_sync_complete(const std::string &sync_id) {
dout(20) << "sync_id=" << sync_id << dendl;
Mutex::Locker locker(m_lock);
auto it = m_inflight_sync_reqs.find(sync_id);
assert(it != m_inflight_sync_reqs.end());
auto sync_ctx = it->second;
assert(sync_ctx->req == nullptr);
m_inflight_sync_reqs.erase(it);
m_work_queue->queue(sync_ctx, 0);
}
template <typename I>
void InstanceWatcher<I>::handle_notify_sync_request(C_SyncRequest *sync_ctx,
int r) {
dout(20) << "sync_id=" << sync_ctx->sync_id << ", r=" << r << dendl;
Context *on_start = nullptr;
{
Mutex::Locker locker(m_lock);
assert(sync_ctx->req != nullptr);
assert(sync_ctx->on_start != nullptr);
if (sync_ctx->req->canceling) {
r = -ECANCELED;
}
std::swap(sync_ctx->on_start, on_start);
sync_ctx->req = nullptr;
}
on_start->complete(r == -ECANCELED ? r : 0);
if (r == -ECANCELED) {
notify_sync_complete(sync_ctx->sync_id);
}
}
template <typename I>
void InstanceWatcher<I>::handle_notify_sync_complete(C_SyncRequest *sync_ctx,
int r) {
dout(20) << "sync_id=" << sync_ctx->sync_id << ", r=" << r << dendl;
if (sync_ctx->on_complete != nullptr) {
sync_ctx->on_complete->complete(r);
}
}
template <typename I>
void InstanceWatcher<I>::print_sync_status(Formatter *f, stringstream *ss) {
dout(20) << dendl;
Mutex::Locker locker(m_lock);
if (m_image_sync_throttler != nullptr) {
m_image_sync_throttler->print_status(f, ss);
}
}
template <typename I>
void InstanceWatcher<I>::handle_acquire_leader() {
dout(20) << dendl;
Mutex::Locker locker(m_lock);
assert(m_image_sync_throttler == nullptr);
m_image_sync_throttler = ImageSyncThrottler<I>::create();
m_leader_instance_id = m_instance_id;
unsuspend_notify_requests();
}
template <typename I>
void InstanceWatcher<I>::handle_release_leader() {
dout(20) << dendl;
Mutex::Locker locker(m_lock);
assert(m_image_sync_throttler != nullptr);
m_leader_instance_id.clear();
m_image_sync_throttler->drain(-ESTALE);
m_image_sync_throttler->destroy();
m_image_sync_throttler = nullptr;
}
template <typename I>
void InstanceWatcher<I>::handle_update_leader(
const std::string &leader_instance_id) {
dout(20) << "leader_instance_id=" << leader_instance_id << dendl;
Mutex::Locker locker(m_lock);
m_leader_instance_id = leader_instance_id;
if (!m_leader_instance_id.empty()) {
unsuspend_notify_requests();
}
}
template <typename I>
void InstanceWatcher<I>::cancel_notify_requests(
const std::string &instance_id) {
@ -356,13 +630,12 @@ void InstanceWatcher<I>::cancel_notify_requests(
Mutex::Locker locker(m_lock);
for (auto op : m_notify_ops) {
if (op.first == instance_id) {
if (op.first == instance_id && !op.second->send_to_leader) {
op.second->cancel();
}
}
}
template <typename I>
void InstanceWatcher<I>::register_instance() {
assert(m_lock.is_locked());
@ -715,6 +988,46 @@ void InstanceWatcher<I>::handle_break_instance_lock(int r) {
remove_instance_object();
}
template <typename I>
void InstanceWatcher<I>::suspend_notify_request(C_NotifyInstanceRequest *req) {
dout(20) << req << dendl;
assert(m_lock.is_locked());
auto result = m_suspended_ops.insert(req).second;
assert(result);
}
template <typename I>
bool InstanceWatcher<I>::unsuspend_notify_request(
C_NotifyInstanceRequest *req) {
dout(20) << req << dendl;
assert(m_lock.is_locked());
auto result = m_suspended_ops.erase(req);
if (result == 0) {
return false;
}
req->send();
return true;
}
template <typename I>
void InstanceWatcher<I>::unsuspend_notify_requests() {
dout(20) << dendl;
assert(m_lock.is_locked());
std::set<C_NotifyInstanceRequest *> suspended_ops;
std::swap(m_suspended_ops, suspended_ops);
for (auto op : suspended_ops) {
op->send();
}
}
template <typename I>
Context *InstanceWatcher<I>::prepare_request(const std::string &instance_id,
uint64_t request_id,
@ -733,23 +1046,11 @@ Context *InstanceWatcher<I>::prepare_request(const std::string &instance_id,
delete it->on_notify_ack;
m_requests.erase(it);
} else {
ctx = new FunctionContext(
[this, instance_id, request_id] (int r) {
C_NotifyAck *on_notify_ack = nullptr;
{
// update request state in the requests list
Mutex::Locker locker(m_lock);
Request request(instance_id, request_id);
auto it = m_requests.find(request);
assert(it != m_requests.end());
on_notify_ack = it->on_notify_ack;
m_requests.erase(it);
}
::encode(NotifyAckPayload(instance_id, request_id, r),
on_notify_ack->out);
on_notify_ack->complete(0);
});
ctx = create_async_context_callback(
m_work_queue, new FunctionContext(
[this, instance_id, request_id] (int r) {
complete_request(instance_id, request_id, r);
}));
}
request.on_notify_ack = on_notify_ack;
@ -757,6 +1058,26 @@ Context *InstanceWatcher<I>::prepare_request(const std::string &instance_id,
return ctx;
}
template <typename I>
void InstanceWatcher<I>::complete_request(const std::string &instance_id,
uint64_t request_id, int r) {
dout(20) << "instance_id=" << instance_id << ", request_id=" << request_id
<< dendl;
C_NotifyAck *on_notify_ack;
{
Mutex::Locker locker(m_lock);
Request request(instance_id, request_id);
auto it = m_requests.find(request);
assert(it != m_requests.end());
on_notify_ack = it->on_notify_ack;
m_requests.erase(it);
}
::encode(NotifyAckPayload(instance_id, request_id, r), on_notify_ack->out);
on_notify_ack->complete(0);
}
template <typename I>
void InstanceWatcher<I>::handle_notify(uint64_t notify_id, uint64_t handle,
uint64_t notifier_id, bufferlist &bl) {
@ -788,8 +1109,9 @@ void InstanceWatcher<I>::handle_image_acquire(
auto ctx = new FunctionContext(
[this, global_image_id, peer_mirror_uuid, peer_image_id,
on_finish] (int r) {
m_instance_replayer->acquire_image(global_image_id, peer_mirror_uuid,
peer_image_id, on_finish);
m_instance_replayer->acquire_image(this, global_image_id,
peer_mirror_uuid, peer_image_id,
on_finish);
m_notify_op_tracker.finish_op();
});
@ -816,6 +1138,58 @@ void InstanceWatcher<I>::handle_image_release(
m_work_queue->queue(ctx, 0);
}
template <typename I>
void InstanceWatcher<I>::handle_sync_request(const std::string &instance_id,
const std::string &sync_id,
Context *on_finish) {
dout(20) << "instance_id=" << instance_id << ", sync_id=" << sync_id << dendl;
Mutex::Locker locker(m_lock);
if (m_image_sync_throttler == nullptr) {
dout(20) << "sync request for non-leader" << dendl;
m_work_queue->queue(on_finish, -ESTALE);
return;
}
Context *on_start = create_async_context_callback(
m_work_queue, new FunctionContext(
[this, instance_id, sync_id, on_finish] (int r) {
dout(20) << "handle_sync_request: finish: instance_id=" << instance_id
<< ", sync_id=" << sync_id << ", r=" << r << dendl;
if (r == 0) {
notify_sync_start(instance_id, sync_id);
}
on_finish->complete(r);
}));
m_image_sync_throttler->start_op(sync_id, on_start);
}
template <typename I>
void InstanceWatcher<I>::handle_sync_start(const std::string &instance_id,
const std::string &sync_id,
Context *on_finish) {
dout(20) << "instance_id=" << instance_id << ", sync_id=" << sync_id << dendl;
Mutex::Locker locker(m_lock);
auto it = m_inflight_sync_reqs.find(sync_id);
if (it == m_inflight_sync_reqs.end()) {
dout(20) << "not found" << dendl;
m_work_queue->queue(on_finish, 0);
return;
}
auto sync_ctx = it->second;
if (sync_ctx->on_complete != nullptr) {
dout(20) << "duplicate request" << dendl;
m_work_queue->queue(sync_ctx->on_complete, -ESTALE);
}
sync_ctx->on_complete = on_finish;
}
template <typename I>
void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
const ImageAcquirePayload &payload,
@ -847,6 +1221,38 @@ void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
}
}
template <typename I>
void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
const SyncRequestPayload &payload,
C_NotifyAck *on_notify_ack) {
dout(20) << "sync_request: instance_id=" << instance_id << ", "
<< "request_id=" << payload.request_id << dendl;
auto on_finish = prepare_request(instance_id, payload.request_id,
on_notify_ack);
if (on_finish == nullptr) {
return;
}
handle_sync_request(instance_id, payload.sync_id, on_finish);
}
template <typename I>
void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
const SyncStartPayload &payload,
C_NotifyAck *on_notify_ack) {
dout(20) << "sync_start: instance_id=" << instance_id << ", "
<< "request_id=" << payload.request_id << dendl;
auto on_finish = prepare_request(instance_id, payload.request_id,
on_notify_ack);
if (on_finish == nullptr) {
return;
}
handle_sync_start(instance_id, payload.sync_id, on_finish);
}
template <typename I>
void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
const UnknownPayload &payload,

View File

@ -5,6 +5,7 @@
#define CEPH_RBD_MIRROR_INSTANCE_WATCHER_H
#include <map>
#include <memory>
#include <set>
#include <string>
#include <vector>
@ -24,6 +25,7 @@ template <typename> class ManagedLock;
namespace rbd {
namespace mirror {
template <typename> class ImageSyncThrottler;
template <typename> class InstanceReplayer;
template <typename> struct Threads;
@ -72,8 +74,18 @@ public:
const std::string &peer_image_id,
bool schedule_delete, Context *on_notify_ack);
void notify_sync_request(const std::string &sync_id, Context *on_sync_start);
bool cancel_sync_request(const std::string &sync_id);
void notify_sync_complete(const std::string &sync_id);
void print_sync_status(Formatter *f, stringstream *ss);
void cancel_notify_requests(const std::string &instance_id);
void handle_acquire_leader();
void handle_release_leader();
void handle_update_leader(const std::string &leader_instance_id);
private:
/**
* @verbatim
@ -105,6 +117,9 @@ private:
*/
struct C_NotifyInstanceRequest;
struct C_SyncRequest;
typedef std::pair<std::string, std::string> Id;
struct HandlePayloadVisitor : public boost::static_visitor<void> {
InstanceWatcher *instance_watcher;
@ -148,11 +163,15 @@ private:
Context *m_on_finish = nullptr;
int m_ret_val = 0;
bool m_removing = false;
std::string m_leader_instance_id;
librbd::managed_lock::Locker m_instance_locker;
std::set<std::pair<std::string, C_NotifyInstanceRequest *>> m_notify_ops;
AsyncOpTracker m_notify_op_tracker;
uint64_t m_request_seq = 0;
std::set<Request> m_requests;
std::set<C_NotifyInstanceRequest *> m_suspended_ops;
std::map<std::string, C_SyncRequest *> m_inflight_sync_reqs;
ImageSyncThrottler<ImageCtxT> *m_image_sync_throttler = nullptr;
void register_instance();
void handle_register_instance(int r);
@ -187,8 +206,20 @@ private:
void break_instance_lock();
void handle_break_instance_lock(int r);
void suspend_notify_request(C_NotifyInstanceRequest *req);
bool unsuspend_notify_request(C_NotifyInstanceRequest *req);
void unsuspend_notify_requests();
void handle_notify_sync_request(C_SyncRequest *sync_ctx, int r);
void handle_notify_sync_complete(C_SyncRequest *sync_ctx, int r);
void notify_sync_start(const std::string &instance_id,
const std::string &sync_id);
Context *prepare_request(const std::string &instance_id, uint64_t request_id,
C_NotifyAck *on_notify_ack);
void complete_request(const std::string &instance_id, uint64_t request_id,
int r);
void handle_notify(uint64_t notify_id, uint64_t handle,
uint64_t notifier_id, bufferlist &bl) override;
@ -202,12 +233,23 @@ private:
const std::string &peer_image_id,
bool schedule_delete, Context *on_finish);
void handle_sync_request(const std::string &instance_id,
const std::string &sync_id, Context *on_finish);
void handle_sync_start(const std::string &instance_id,
const std::string &sync_id, Context *on_finish);
void handle_payload(const std::string &instance_id,
const instance_watcher::ImageAcquirePayload &payload,
C_NotifyAck *on_notify_ack);
void handle_payload(const std::string &instance_id,
const instance_watcher::ImageReleasePayload &payload,
C_NotifyAck *on_notify_ack);
void handle_payload(const std::string &instance_id,
const instance_watcher::SyncRequestPayload &payload,
C_NotifyAck *on_notify_ack);
void handle_payload(const std::string &instance_id,
const instance_watcher::SyncStartPayload &payload,
C_NotifyAck *on_notify_ack);
void handle_payload(const std::string &instance_id,
const instance_watcher::UnknownPayload &payload,
C_NotifyAck *on_notify_ack);

View File

@ -45,6 +45,11 @@ LeaderWatcher<I>::~LeaderWatcher() {
delete m_leader_lock;
}
template <typename I>
std::string LeaderWatcher<I>::get_instance_id() {
return stringify(m_notifier_id);
}
template <typename I>
int LeaderWatcher<I>::init() {
C_SaferCond init_ctx;
@ -552,8 +557,10 @@ void LeaderWatcher<I>::handle_get_locker(int r,
return;
}
bool notify_listener = false;
if (m_locker != locker) {
m_locker = locker;
notify_listener = true;
if (m_acquire_attempts > 1) {
dout(10) << "new lock owner detected -- resetting heartbeat counter"
<< dendl;
@ -566,10 +573,27 @@ void LeaderWatcher<I>::handle_get_locker(int r,
dout(0) << "breaking leader lock after " << m_acquire_attempts << " "
<< "failed attempts to acquire" << dendl;
break_leader_lock();
} else {
schedule_acquire_leader_lock(1);
m_timer_op_tracker.finish_op();
return;
}
schedule_acquire_leader_lock(1);
if (!notify_listener) {
m_timer_op_tracker.finish_op();
return;
}
auto ctx = new FunctionContext(
[this](int r) {
std::string instance_id;
if (get_leader_instance_id(&instance_id)) {
m_listener->update_leader_handler(instance_id);
}
Mutex::Locker timer_locker(m_threads->timer_lock);
Mutex::Locker locker(m_lock);
m_timer_op_tracker.finish_op();
});
m_work_queue->queue(ctx, 0);
}
template <typename I>

View File

@ -33,6 +33,9 @@ public:
virtual void post_acquire_handler(Context *on_finish) = 0;
virtual void pre_release_handler(Context *on_finish) = 0;
virtual void update_leader_handler(
const std::string &leader_instance_id) = 0;
};
LeaderWatcher(Threads<ImageCtxT> *threads, librados::IoCtx &io_ctx,
@ -51,6 +54,8 @@ public:
void release_leader();
void list_instances(std::vector<std::string> *instance_ids);
std::string get_instance_id();
private:
/**
* @verbatim

View File

@ -303,11 +303,8 @@ int PoolReplayer::init()
dout(20) << "connected to " << m_peer << dendl;
m_image_sync_throttler.reset(new ImageSyncThrottler<>());
m_instance_replayer.reset(
InstanceReplayer<>::create(m_threads, m_image_deleter,
m_image_sync_throttler, m_local_rados,
InstanceReplayer<>::create(m_threads, m_image_deleter, m_local_rados,
local_mirror_uuid, m_local_pool_id));
m_instance_replayer->init();
m_instance_replayer->add_peer(m_peer.uuid, m_remote_io_ctx);
@ -323,6 +320,7 @@ int PoolReplayer::init()
m_leader_watcher.reset(new LeaderWatcher<>(m_threads, m_local_io_ctx,
&m_leader_listener));
r = m_leader_watcher->init();
if (r < 0) {
derr << "error initializing leader watcher: " << cpp_strerror(r) << dendl;
@ -477,7 +475,7 @@ void PoolReplayer::print_status(Formatter *f, stringstream *ss)
admin_socket);
f->open_object_section("sync_throttler");
m_image_sync_throttler->print_status(f, ss);
m_instance_watcher->print_sync_status(f, ss);
f->close_section();
m_instance_replayer->print_status(f, ss);
@ -626,11 +624,15 @@ void PoolReplayer::handle_update(const std::string &mirror_uuid,
void PoolReplayer::handle_post_acquire_leader(Context *on_finish) {
dout(20) << dendl;
m_instance_watcher->handle_acquire_leader();
init_local_pool_watcher(on_finish);
}
void PoolReplayer::handle_pre_release_leader(Context *on_finish) {
dout(20) << dendl;
m_instance_watcher->handle_release_leader();
shut_down_pool_watchers(on_finish);
}
@ -737,5 +739,11 @@ void PoolReplayer::handle_wait_for_update_ops(int r, Context *on_finish) {
m_instance_replayer->release_all(on_finish);
}
void PoolReplayer::handle_update_leader(const std::string &leader_instance_id) {
dout(20) << "leader_instance_id=" << leader_instance_id << dendl;
m_instance_watcher->handle_update_leader(leader_instance_id);
}
} // namespace mirror
} // namespace rbd

View File

@ -99,9 +99,10 @@ private:
void wait_for_update_ops(Context *on_finish);
void handle_wait_for_update_ops(int r, Context *on_finish);
void handle_update_leader(const std::string &leader_instance_id);
Threads<librbd::ImageCtx> *m_threads;
std::shared_ptr<ImageDeleter> m_image_deleter;
ImageSyncThrottlerRef<> m_image_sync_throttler;
mutable Mutex m_lock;
Cond m_cond;
std::atomic<bool> m_stopping = { false };
@ -158,6 +159,11 @@ private:
m_pool_replayer->handle_pre_release_leader(on_finish);
}
void update_leader_handler(
const std::string &leader_instance_id) override {
m_pool_replayer->handle_update_leader(leader_instance_id);
}
private:
PoolReplayer *m_pool_replayer;
} m_leader_listener;

View File

@ -21,7 +21,7 @@
#include "librbd/Utils.h"
#include "librbd/journal/Types.h"
#include "tools/rbd_mirror/ProgressContext.h"
#include "tools/rbd_mirror/ImageSyncThrottler.h"
#include "tools/rbd_mirror/ImageSync.h"
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rbd_mirror
@ -41,7 +41,7 @@ template <typename I>
BootstrapRequest<I>::BootstrapRequest(
librados::IoCtx &local_io_ctx,
librados::IoCtx &remote_io_ctx,
std::shared_ptr<ImageSyncThrottler<I>> image_sync_throttler,
InstanceWatcher<I> *instance_watcher,
I **local_image_ctx,
const std::string &local_image_id,
const std::string &remote_image_id,
@ -58,10 +58,10 @@ BootstrapRequest<I>::BootstrapRequest(
: BaseRequest("rbd::mirror::image_replayer::BootstrapRequest",
reinterpret_cast<CephContext*>(local_io_ctx.cct()), on_finish),
m_local_io_ctx(local_io_ctx), m_remote_io_ctx(remote_io_ctx),
m_image_sync_throttler(image_sync_throttler),
m_local_image_ctx(local_image_ctx), m_local_image_id(local_image_id),
m_remote_image_id(remote_image_id), m_global_image_id(global_image_id),
m_work_queue(work_queue), m_timer(timer), m_timer_lock(timer_lock),
m_instance_watcher(instance_watcher), m_local_image_ctx(local_image_ctx),
m_local_image_id(local_image_id), m_remote_image_id(remote_image_id),
m_global_image_id(global_image_id), m_work_queue(work_queue),
m_timer(timer), m_timer_lock(timer_lock),
m_local_mirror_uuid(local_mirror_uuid),
m_remote_mirror_uuid(remote_mirror_uuid), m_journaler(journaler),
m_client_meta(client_meta), m_progress_ctx(progress_ctx),
@ -88,7 +88,9 @@ void BootstrapRequest<I>::cancel() {
Mutex::Locker locker(m_lock);
m_canceled = true;
m_image_sync_throttler->cancel_sync(m_local_image_id);
if (m_image_sync != nullptr) {
m_image_sync->cancel();
}
}
template <typename I>
@ -643,19 +645,22 @@ void BootstrapRequest<I>::image_sync() {
{
Mutex::Locker locker(m_lock);
if (!m_canceled) {
m_image_sync_throttler->start_sync(*m_local_image_ctx,
m_remote_image_ctx, m_timer,
m_timer_lock,
m_local_mirror_uuid, m_journaler,
m_client_meta, m_work_queue, ctx,
m_progress_ctx);
if (m_canceled) {
m_ret_val = -ECANCELED;
} else {
assert(m_image_sync == nullptr);
m_image_sync = ImageSync<I>::create(
*m_local_image_ctx, m_remote_image_ctx, m_timer, m_timer_lock,
m_local_mirror_uuid, m_journaler, m_client_meta, m_work_queue,
m_instance_watcher, ctx, m_progress_ctx);
m_image_sync->get();
m_image_sync->send();
return;
}
}
dout(10) << ": request canceled" << dendl;
m_ret_val = -ECANCELED;
close_remote_image();
}
@ -663,14 +668,21 @@ template <typename I>
void BootstrapRequest<I>::handle_image_sync(int r) {
dout(20) << ": r=" << r << dendl;
if (m_canceled) {
dout(10) << ": request canceled" << dendl;
m_ret_val = -ECANCELED;
}
{
Mutex::Locker locker(m_lock);
if (r < 0) {
derr << ": failed to sync remote image: " << cpp_strerror(r) << dendl;
m_ret_val = r;
m_image_sync->put();
m_image_sync = nullptr;
if (m_canceled) {
dout(10) << ": request canceled" << dendl;
m_ret_val = -ECANCELED;
}
if (r < 0) {
derr << ": failed to sync remote image: " << cpp_strerror(r) << dendl;
m_ret_val = r;
}
}
close_remote_image();

View File

@ -27,6 +27,9 @@ namespace mirror {
class ProgressContext;
template <typename> class ImageSync;
template <typename> class InstanceWatcher;
namespace image_replayer {
template <typename ImageCtxT = librbd::ImageCtx>
@ -40,7 +43,7 @@ public:
static BootstrapRequest* create(
librados::IoCtx &local_io_ctx,
librados::IoCtx &remote_io_ctx,
ImageSyncThrottlerRef<ImageCtxT> image_sync_throttler,
InstanceWatcher<ImageCtxT> *instance_watcher,
ImageCtxT **local_image_ctx,
const std::string &local_image_id,
const std::string &remote_image_id,
@ -55,7 +58,7 @@ public:
bool *do_resync,
ProgressContext *progress_ctx = nullptr) {
return new BootstrapRequest(local_io_ctx, remote_io_ctx,
image_sync_throttler, local_image_ctx,
instance_watcher, local_image_ctx,
local_image_id, remote_image_id,
global_image_id, work_queue, timer, timer_lock,
local_mirror_uuid, remote_mirror_uuid,
@ -65,7 +68,7 @@ public:
BootstrapRequest(librados::IoCtx &local_io_ctx,
librados::IoCtx &remote_io_ctx,
ImageSyncThrottlerRef<ImageCtxT> image_sync_throttler,
InstanceWatcher<ImageCtxT> *instance_watcher,
ImageCtxT **local_image_ctx,
const std::string &local_image_id,
const std::string &remote_image_id,
@ -145,7 +148,7 @@ private:
librados::IoCtx &m_local_io_ctx;
librados::IoCtx &m_remote_io_ctx;
ImageSyncThrottlerRef<ImageCtxT> m_image_sync_throttler;
InstanceWatcher<ImageCtxT> *m_instance_watcher;
ImageCtxT **m_local_image_ctx;
std::string m_local_image_id;
std::string m_remote_image_id;
@ -159,6 +162,7 @@ private:
MirrorPeerClientMeta *m_client_meta;
ProgressContext *m_progress_ctx;
bool *m_do_resync;
Mutex m_lock;
bool m_canceled = false;
@ -168,6 +172,7 @@ private:
ImageCtxT *m_remote_image_ctx = nullptr;
bool m_primary = false;
int m_ret_val = 0;
ImageSync<ImageCtxT> *m_image_sync = nullptr;
bufferlist m_out_bl;

View File

@ -58,22 +58,34 @@ private:
} // anonymous namespace
void ImagePayloadBase::encode(bufferlist &bl) const {
void PayloadBase::encode(bufferlist &bl) const {
::encode(request_id, bl);
}
void PayloadBase::decode(__u8 version, bufferlist::iterator &iter) {
::decode(request_id, iter);
}
void PayloadBase::dump(Formatter *f) const {
f->dump_unsigned("request_id", request_id);
}
void ImagePayloadBase::encode(bufferlist &bl) const {
PayloadBase::encode(bl);
::encode(global_image_id, bl);
::encode(peer_mirror_uuid, bl);
::encode(peer_image_id, bl);
}
void ImagePayloadBase::decode(__u8 version, bufferlist::iterator &iter) {
::decode(request_id, iter);
PayloadBase::decode(version, iter);
::decode(global_image_id, iter);
::decode(peer_mirror_uuid, iter);
::decode(peer_image_id, iter);
}
void ImagePayloadBase::dump(Formatter *f) const {
f->dump_unsigned("request_id", request_id);
PayloadBase::dump(f);
f->dump_string("global_image_id", global_image_id);
f->dump_string("peer_mirror_uuid", peer_mirror_uuid);
f->dump_string("peer_image_id", peer_image_id);
@ -94,6 +106,21 @@ void ImageReleasePayload::dump(Formatter *f) const {
f->dump_bool("schedule_delete", schedule_delete);
}
void SyncPayloadBase::encode(bufferlist &bl) const {
PayloadBase::encode(bl);
::encode(sync_id, bl);
}
void SyncPayloadBase::decode(__u8 version, bufferlist::iterator &iter) {
PayloadBase::decode(version, iter);
::decode(sync_id, iter);
}
void SyncPayloadBase::dump(Formatter *f) const {
PayloadBase::dump(f);
f->dump_string("sync_id", sync_id);
}
void UnknownPayload::encode(bufferlist &bl) const {
assert(false);
}
@ -124,6 +151,12 @@ void NotifyMessage::decode(bufferlist::iterator& iter) {
case NOTIFY_OP_IMAGE_RELEASE:
payload = ImageReleasePayload();
break;
case NOTIFY_OP_SYNC_REQUEST:
payload = SyncRequestPayload();
break;
case NOTIFY_OP_SYNC_START:
payload = SyncStartPayload();
break;
default:
payload = UnknownPayload();
break;
@ -144,6 +177,12 @@ void NotifyMessage::generate_test_instances(std::list<NotifyMessage *> &o) {
o.push_back(new NotifyMessage(ImageReleasePayload()));
o.push_back(new NotifyMessage(ImageReleasePayload(1, "gid", "uuid", "id",
true)));
o.push_back(new NotifyMessage(SyncRequestPayload()));
o.push_back(new NotifyMessage(SyncRequestPayload(1, "sync_id")));
o.push_back(new NotifyMessage(SyncStartPayload()));
o.push_back(new NotifyMessage(SyncStartPayload(1, "sync_id")));
}
std::ostream &operator<<(std::ostream &out, const NotifyOp &op) {
@ -154,6 +193,12 @@ std::ostream &operator<<(std::ostream &out, const NotifyOp &op) {
case NOTIFY_OP_IMAGE_RELEASE:
out << "ImageRelease";
break;
case NOTIFY_OP_SYNC_REQUEST:
out << "SyncRequest";
break;
case NOTIFY_OP_SYNC_START:
out << "SyncStart";
break;
default:
out << "Unknown (" << static_cast<uint32_t>(op) << ")";
break;

View File

@ -21,21 +21,36 @@ namespace instance_watcher {
enum NotifyOp {
NOTIFY_OP_IMAGE_ACQUIRE = 0,
NOTIFY_OP_IMAGE_RELEASE = 1,
NOTIFY_OP_SYNC_REQUEST = 2,
NOTIFY_OP_SYNC_START = 3,
};
struct ImagePayloadBase {
struct PayloadBase {
uint64_t request_id;
PayloadBase() : request_id(0) {
}
PayloadBase(uint64_t request_id) : request_id(request_id) {
}
void encode(bufferlist &bl) const;
void decode(__u8 version, bufferlist::iterator &iter);
void dump(Formatter *f) const;
};
struct ImagePayloadBase : public PayloadBase {
std::string global_image_id;
std::string peer_mirror_uuid;
std::string peer_image_id;
ImagePayloadBase() : request_id(0) {
ImagePayloadBase() : PayloadBase() {
}
ImagePayloadBase(uint64_t request_id, const std::string &global_image_id,
const std::string &peer_mirror_uuid,
const std::string &peer_image_id)
: request_id(request_id), global_image_id(global_image_id),
: PayloadBase(request_id), global_image_id(global_image_id),
peer_mirror_uuid(peer_mirror_uuid), peer_image_id(peer_image_id) {
}
@ -79,6 +94,43 @@ struct ImageReleasePayload : public ImagePayloadBase {
void dump(Formatter *f) const;
};
struct SyncPayloadBase : public PayloadBase {
std::string sync_id;
SyncPayloadBase() : PayloadBase() {
}
SyncPayloadBase(uint64_t request_id, const std::string &sync_id)
: PayloadBase(request_id), sync_id(sync_id) {
}
void encode(bufferlist &bl) const;
void decode(__u8 version, bufferlist::iterator &iter);
void dump(Formatter *f) const;
};
struct SyncRequestPayload : public SyncPayloadBase {
static const NotifyOp NOTIFY_OP = NOTIFY_OP_SYNC_REQUEST;
SyncRequestPayload() : SyncPayloadBase() {
}
SyncRequestPayload(uint64_t request_id, const std::string &sync_id)
: SyncPayloadBase(request_id, sync_id) {
}
};
struct SyncStartPayload : public SyncPayloadBase {
static const NotifyOp NOTIFY_OP = NOTIFY_OP_SYNC_START;
SyncStartPayload() : SyncPayloadBase() {
}
SyncStartPayload(uint64_t request_id, const std::string &sync_id)
: SyncPayloadBase(request_id, sync_id) {
}
};
struct UnknownPayload {
static const NotifyOp NOTIFY_OP = static_cast<NotifyOp>(-1);
@ -92,6 +144,8 @@ struct UnknownPayload {
typedef boost::variant<ImageAcquirePayload,
ImageReleasePayload,
SyncRequestPayload,
SyncStartPayload,
UnknownPayload> Payload;
struct NotifyMessage {

View File

@ -16,9 +16,9 @@ namespace mirror {
namespace leader_watcher {
enum NotifyOp {
NOTIFY_OP_HEARTBEAT = 0,
NOTIFY_OP_LOCK_ACQUIRED = 1,
NOTIFY_OP_LOCK_RELEASED = 2,
NOTIFY_OP_HEARTBEAT = 0,
NOTIFY_OP_LOCK_ACQUIRED = 1,
NOTIFY_OP_LOCK_RELEASED = 2,
};
struct HeartbeatPayload {

View File

@ -2,7 +2,7 @@
// vim: ts=8 sw=2 smarttab
#include "tools/rbd_mirror/pool_watcher/RefreshImagesRequest.h"
#include "common/dout.h"
#include "common/debug.h"
#include "common/errno.h"
#include "cls/rbd/cls_rbd_client.h"
#include "librbd/Utils.h"

View File

@ -11,7 +11,6 @@
#include <vector>
#include "include/rbd/librbd.hpp"
#include "ImageSyncThrottler.h"
namespace rbd {
namespace mirror {
@ -20,9 +19,6 @@ typedef shared_ptr<librados::Rados> RadosRef;
typedef shared_ptr<librados::IoCtx> IoCtxRef;
typedef shared_ptr<librbd::Image> ImageRef;
template <typename I = librbd::ImageCtx>
using ImageSyncThrottlerRef = std::shared_ptr<ImageSyncThrottler<I>>;
struct ImageId {
std::string global_id;
std::string id;