Merge pull request #13803 from trociny/wip-18785

rbd-mirror: separate ImageReplayer handling from Replayer

Reviewed-by: Jason Dillaman <dillaman@redhat.com>
This commit is contained in:
Jason Dillaman 2017-04-09 09:59:57 -04:00 committed by GitHub
commit 0b61e113fd
12 changed files with 1040 additions and 273 deletions

View File

@ -19,6 +19,7 @@ add_executable(unittest_rbd_mirror
test_mock_ImageReplayer.cc
test_mock_ImageSync.cc
test_mock_ImageSyncThrottler.cc
test_mock_InstanceReplayer.cc
test_mock_InstanceWatcher.cc
test_mock_LeaderWatcher.cc
test_mock_PoolWatcher.cc

View File

@ -0,0 +1,200 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#include "test/librbd/mock/MockImageCtx.h"
#include "test/rbd_mirror/test_mock_fixture.h"
#include "tools/rbd_mirror/ImageReplayer.h"
#include "tools/rbd_mirror/ImageSyncThrottler.h"
#include "tools/rbd_mirror/InstanceReplayer.h"
#include "tools/rbd_mirror/Threads.h"
namespace librbd {
namespace {
struct MockTestImageCtx : public MockImageCtx {
MockTestImageCtx(librbd::ImageCtx &image_ctx)
: librbd::MockImageCtx(image_ctx) {
}
};
} // anonymous namespace
} // namespace librbd
namespace rbd {
namespace mirror {
template <>
struct Threads<librbd::MockTestImageCtx> {
Mutex &timer_lock;
SafeTimer *timer;
ContextWQ *work_queue;
Threads(Threads<librbd::ImageCtx> *threads)
: timer_lock(threads->timer_lock), timer(threads->timer),
work_queue(threads->work_queue) {
}
};
template<>
struct ImageReplayer<librbd::MockTestImageCtx> {
static ImageReplayer* s_instance;
std::string global_image_id;
static ImageReplayer *create(
Threads<librbd::MockTestImageCtx> *threads,
std::shared_ptr<ImageDeleter> image_deleter,
ImageSyncThrottlerRef<librbd::MockTestImageCtx> image_sync_throttler,
RadosRef local, const std::string &local_mirror_uuid, int64_t local_pool_id,
const std::string &global_image_id) {
assert(s_instance != nullptr);
s_instance->global_image_id = global_image_id;
return s_instance;
}
ImageReplayer() {
assert(s_instance == nullptr);
s_instance = this;
}
virtual ~ImageReplayer() {
assert(s_instance == this);
s_instance = nullptr;
}
MOCK_METHOD0(destroy, void());
MOCK_METHOD2(start, void(Context *, bool));
MOCK_METHOD2(stop, void(Context *, bool));
MOCK_METHOD0(restart, void());
MOCK_METHOD0(flush, void());
MOCK_METHOD2(print_status, void(Formatter *, stringstream *));
MOCK_METHOD1(set_remote_images, void(const PeerImages &));
MOCK_METHOD2(remove_remote_image, void(const std::string &,
const std::string &));
MOCK_METHOD0(remote_images_empty, bool());
MOCK_METHOD0(get_global_image_id, const std::string &());
MOCK_METHOD0(get_local_image_id, const std::string &());
MOCK_METHOD0(is_running, bool());
MOCK_METHOD0(is_stopped, bool());
MOCK_METHOD0(is_blacklisted, bool());
};
template<>
struct ImageSyncThrottler<librbd::MockTestImageCtx> {
ImageSyncThrottler() {
}
virtual ~ImageSyncThrottler() {
}
};
ImageReplayer<librbd::MockTestImageCtx>* ImageReplayer<librbd::MockTestImageCtx>::s_instance = nullptr;
} // namespace mirror
} // namespace rbd
// template definitions
#include "tools/rbd_mirror/InstanceReplayer.cc"
namespace rbd {
namespace mirror {
using ::testing::_;
using ::testing::InSequence;
using ::testing::Invoke;
using ::testing::Return;
using ::testing::ReturnRef;
class TestMockInstanceReplayer : public TestMockFixture {
public:
typedef ImageReplayer<librbd::MockTestImageCtx> MockImageReplayer;
typedef InstanceReplayer<librbd::MockTestImageCtx> MockInstanceReplayer;
typedef Threads<librbd::MockTestImageCtx> MockThreads;
void SetUp() override {
TestMockFixture::SetUp();
m_mock_threads = new MockThreads(m_threads);
m_image_deleter.reset(
new rbd::mirror::ImageDeleter(m_threads->work_queue, m_threads->timer,
&m_threads->timer_lock));
m_image_sync_throttler.reset(
new rbd::mirror::ImageSyncThrottler<librbd::MockTestImageCtx>());
}
void TearDown() override {
delete m_mock_threads;
TestMockFixture::TearDown();
}
MockThreads *m_mock_threads;
std::shared_ptr<rbd::mirror::ImageDeleter> m_image_deleter;
std::shared_ptr<rbd::mirror::ImageSyncThrottler<librbd::MockTestImageCtx>>
m_image_sync_throttler;
};
TEST_F(TestMockInstanceReplayer, AcquireReleaseImage) {
MockImageReplayer mock_image_replayer;
MockInstanceReplayer instance_replayer(
m_mock_threads, m_image_deleter, m_image_sync_throttler,
rbd::mirror::RadosRef(new librados::Rados(m_local_io_ctx)),
"local_mirror_uuid", m_local_io_ctx.get_id());
std::string global_image_id("global_image_id");
rbd::mirror::instance_watcher::PeerImageIds peer_image_ids =
{{"remote_mirror_uuid", "remote_image_id"}};
EXPECT_CALL(mock_image_replayer, get_global_image_id())
.WillRepeatedly(ReturnRef(global_image_id));
EXPECT_CALL(mock_image_replayer, is_blacklisted())
.WillRepeatedly(Return(false));
InSequence seq;
instance_replayer.init();
instance_replayer.set_peers({{"remote_mirror_uuid", m_remote_io_ctx}});
// Acquire
C_SaferCond on_acquire;
EXPECT_CALL(mock_image_replayer, set_remote_images(_));
EXPECT_CALL(mock_image_replayer, is_stopped())
.WillOnce(Return(true));
EXPECT_CALL(mock_image_replayer, start(nullptr, false));
instance_replayer.acquire_image(global_image_id, peer_image_ids, &on_acquire);
ASSERT_EQ(0, on_acquire.wait());
// Release
C_SaferCond on_release;
EXPECT_CALL(mock_image_replayer,
remove_remote_image("remote_mirror_uuid", "remote_image_id"));
EXPECT_CALL(mock_image_replayer, remote_images_empty())
.WillOnce(Return(true));
EXPECT_CALL(mock_image_replayer, is_stopped())
.WillOnce(Return(false));
EXPECT_CALL(mock_image_replayer, is_running())
.WillOnce(Return(false));
EXPECT_CALL(mock_image_replayer, is_stopped())
.WillOnce(Return(false));
EXPECT_CALL(mock_image_replayer, is_running())
.WillOnce(Return(true));
EXPECT_CALL(mock_image_replayer, stop(_, false))
.WillOnce(CompleteContext(0));
EXPECT_CALL(mock_image_replayer, is_stopped())
.WillOnce(Return(true));
EXPECT_CALL(mock_image_replayer, destroy());
instance_replayer.release_image("global_image_id", peer_image_ids, false,
&on_release);
ASSERT_EQ(0, on_release.wait());
instance_replayer.shut_down();
}
} // namespace mirror
} // namespace rbd

View File

@ -1,4 +1,5 @@
add_library(rbd_mirror_types STATIC
instance_watcher/Types.cc
leader_watcher/Types.cc)
set(rbd_mirror_internal
@ -7,6 +8,7 @@ set(rbd_mirror_internal
ImageReplayer.cc
ImageSync.cc
ImageSyncThrottler.cc
InstanceReplayer.cc
InstanceWatcher.cc
Instances.cc
LeaderWatcher.cc

View File

@ -326,7 +326,7 @@ void ImageReplayer<I>::add_remote_image(const std::string &mirror_uuid,
librados::IoCtx &io_ctx) {
Mutex::Locker locker(m_lock);
RemoteImage remote_image(mirror_uuid, image_id, io_ctx);
PeerImage remote_image(mirror_uuid, io_ctx, image_id);
auto it = m_remote_images.find(remote_image);
if (it == m_remote_images.end()) {
m_remote_images.insert(remote_image);
@ -340,6 +340,12 @@ void ImageReplayer<I>::remove_remote_image(const std::string &mirror_uuid,
m_remote_images.erase({mirror_uuid, image_id});
}
template <typename I>
void ImageReplayer<I>::set_remote_images(const PeerImages &remote_images) {
Mutex::Locker locker(m_lock);
m_remote_images = remote_images;
}
template <typename I>
bool ImageReplayer<I>::remote_images_empty() const {
Mutex::Locker locker(m_lock);

View File

@ -69,6 +69,20 @@ public:
STATE_STOPPED,
};
static ImageReplayer *create(
Threads<librbd::ImageCtx> *threads,
std::shared_ptr<ImageDeleter> image_deleter,
ImageSyncThrottlerRef<ImageCtxT> image_sync_throttler,
RadosRef local, const std::string &local_mirror_uuid, int64_t local_pool_id,
const std::string &global_image_id) {
return new ImageReplayer(threads, image_deleter, image_sync_throttler,
local, local_mirror_uuid, local_pool_id,
global_image_id);
}
void destroy() {
delete this;
}
ImageReplayer(Threads<librbd::ImageCtx> *threads,
std::shared_ptr<ImageDeleter> image_deleter,
ImageSyncThrottlerRef<ImageCtxT> image_sync_throttler,
@ -96,6 +110,8 @@ public:
librados::IoCtx &remote_io_ctx);
void remove_remote_image(const std::string &remote_mirror_uuid,
const std::string &remote_image_id);
void set_remote_images(const PeerImages &remote_images);
bool remote_images_empty() const;
inline int64_t get_local_pool_id() const {
@ -204,37 +220,6 @@ protected:
bool on_replay_interrupted();
private:
struct RemoteImage {
std::string mirror_uuid;
std::string image_id;
librados::IoCtx io_ctx;
RemoteImage() {
}
RemoteImage(const std::string &mirror_uuid,
const std::string &image_id)
: mirror_uuid(mirror_uuid), image_id(image_id) {
}
RemoteImage(const std::string &mirror_uuid,
const std::string &image_id,
librados::IoCtx &io_ctx)
: mirror_uuid(mirror_uuid), image_id(image_id), io_ctx(io_ctx) {
}
inline bool operator<(const RemoteImage &rhs) const {
if (mirror_uuid != rhs.mirror_uuid) {
return mirror_uuid < rhs.mirror_uuid;
} else {
return image_id < rhs.image_id;
}
}
inline bool operator==(const RemoteImage &rhs) const {
return (mirror_uuid == rhs.mirror_uuid && image_id == rhs.image_id);
}
};
typedef std::set<RemoteImage> RemoteImages;
typedef typename librbd::journal::TypeTraits<ImageCtxT>::Journaler Journaler;
typedef boost::optional<State> OptionalState;
@ -274,8 +259,8 @@ private:
std::shared_ptr<ImageDeleter> m_image_deleter;
ImageSyncThrottlerRef<ImageCtxT> m_image_sync_throttler;
RemoteImages m_remote_images;
RemoteImage m_remote_image;
PeerImages m_remote_images;
PeerImage m_remote_image;
RadosRef m_local;
std::string m_local_mirror_uuid;

View File

@ -0,0 +1,496 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#include "include/stringify.h"
#include "common/Timer.h"
#include "common/debug.h"
#include "common/errno.h"
#include "librbd/Utils.h"
#include "ImageReplayer.h"
#include "InstanceReplayer.h"
#include "Threads.h"
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rbd_mirror
#undef dout_prefix
#define dout_prefix *_dout << "rbd::mirror::InstanceReplayer: " \
<< this << " " << __func__ << ": "
namespace rbd {
namespace mirror {
using librbd::util::create_async_context_callback;
using librbd::util::create_context_callback;
template <typename I>
InstanceReplayer<I>::InstanceReplayer(
Threads<I> *threads, std::shared_ptr<ImageDeleter> image_deleter,
ImageSyncThrottlerRef<I> image_sync_throttler, RadosRef local_rados,
const std::string &local_mirror_uuid, int64_t local_pool_id)
: m_threads(threads), m_image_deleter(image_deleter),
m_image_sync_throttler(image_sync_throttler), m_local_rados(local_rados),
m_local_mirror_uuid(local_mirror_uuid), m_local_pool_id(local_pool_id),
m_lock("rbd::mirror::InstanceReplayer " + stringify(local_pool_id)) {
}
template <typename I>
InstanceReplayer<I>::~InstanceReplayer() {
assert(m_image_state_check_task == nullptr);
assert(m_async_op_tracker.empty());
assert(m_image_replayers.empty());
}
template <typename I>
int InstanceReplayer<I>::init() {
C_SaferCond init_ctx;
init(&init_ctx);
return init_ctx.wait();
}
template <typename I>
void InstanceReplayer<I>::init(Context *on_finish) {
dout(20) << dendl;
Context *ctx = new FunctionContext(
[this, on_finish] (int r) {
{
Mutex::Locker timer_locker(m_threads->timer_lock);
schedule_image_state_check_task();
}
on_finish->complete(0);
});
m_threads->work_queue->queue(ctx, 0);
}
template <typename I>
void InstanceReplayer<I>::shut_down() {
C_SaferCond shut_down_ctx;
shut_down(&shut_down_ctx);
int r = shut_down_ctx.wait();
assert(r == 0);
}
template <typename I>
void InstanceReplayer<I>::shut_down(Context *on_finish) {
dout(20) << dendl;
Mutex::Locker locker(m_lock);
assert(m_on_shut_down == nullptr);
m_on_shut_down = on_finish;
Context *ctx = new FunctionContext(
[this] (int r) {
cancel_image_state_check_task();
wait_for_ops();
});
m_threads->work_queue->queue(ctx, 0);
}
template <typename I>
void InstanceReplayer<I>::set_peers(const Peers &peers) {
dout(20) << dendl;
Mutex::Locker locker(m_lock);
m_peers = peers;
}
template <typename I>
void InstanceReplayer<I>::release_all(Context *on_finish) {
dout(20) << dendl;
Mutex::Locker locker(m_lock);
C_Gather *gather_ctx = new C_Gather(g_ceph_context, on_finish);
for (auto it = m_image_replayers.begin(); it != m_image_replayers.end();
it = m_image_replayers.erase(it)) {
auto image_replayer = it->second;
auto ctx = gather_ctx->new_sub();
ctx = new FunctionContext(
[image_replayer, ctx] (int r) {
image_replayer->destroy();
ctx->complete(0);
});
stop_image_replayer(image_replayer, ctx);
}
gather_ctx->activate();
}
template <typename I>
void InstanceReplayer<I>::acquire_image(
const std::string &global_image_id,
const instance_watcher::PeerImageIds &peers, Context *on_finish) {
dout(20) << "global_image_id=" << global_image_id << dendl;
Mutex::Locker locker(m_lock);
assert(m_on_shut_down == nullptr);
auto it = m_image_replayers.find(global_image_id);
if (it == m_image_replayers.end()) {
auto image_replayer = ImageReplayer<I>::create(
m_threads, m_image_deleter, m_image_sync_throttler, m_local_rados,
m_local_mirror_uuid, m_local_pool_id, global_image_id);
dout(20) << global_image_id << ": creating replayer " << image_replayer
<< dendl;
it = m_image_replayers.insert(std::make_pair(global_image_id,
image_replayer)).first;
}
auto image_replayer = it->second;
PeerImages remote_images;
for (auto &peer : peers) {
auto it = m_peers.find(Peer(peer.mirror_uuid));
assert(it != m_peers.end());
auto io_ctx = it->io_ctx;
remote_images.insert({peer.mirror_uuid, io_ctx, peer.image_id});
}
image_replayer->set_remote_images(remote_images);
start_image_replayer(image_replayer);
m_threads->work_queue->queue(on_finish, 0);
}
template <typename I>
void InstanceReplayer<I>::release_image(
const std::string &global_image_id,
const instance_watcher::PeerImageIds &peers, bool schedule_delete,
Context *on_finish) {
dout(20) << "global_image_id=" << global_image_id << ", "
<< "schedule_delete=" << schedule_delete << dendl;
Mutex::Locker locker(m_lock);
assert(m_on_shut_down == nullptr);
auto it = m_image_replayers.find(global_image_id);
if (it == m_image_replayers.end()) {
dout(20) << global_image_id << ": not found" << dendl;
m_threads->work_queue->queue(on_finish, 0);
return;
}
auto image_replayer = it->second;
for (auto &peer : peers) {
image_replayer->remove_remote_image(peer.mirror_uuid, peer.image_id);
}
if (!image_replayer->remote_images_empty()) {
dout(20) << global_image_id << ": still has remote images" << dendl;
m_threads->work_queue->queue(on_finish, 0);
return;
}
m_image_replayers.erase(it);
on_finish = new FunctionContext(
[image_replayer, on_finish] (int r) {
image_replayer->destroy();
on_finish->complete(0);
});
if (schedule_delete) {
on_finish = new FunctionContext(
[this, image_replayer, on_finish] (int r) {
auto global_image_id = image_replayer->get_global_image_id();
auto local_image_id = image_replayer->get_local_image_id();
if (local_image_id.empty()) {
dout(20) << global_image_id << ": unknown local_image_id"
<< " (image does not exist or primary), skipping delete"
<< dendl;
} else {
m_image_deleter->schedule_image_delete(
m_local_rados, m_local_pool_id, local_image_id, global_image_id);
}
on_finish->complete(0);
});
}
stop_image_replayer(image_replayer, on_finish);
}
template <typename I>
void InstanceReplayer<I>::print_status(Formatter *f, stringstream *ss) {
dout(20) << dendl;
if (!f) {
return;
}
Mutex::Locker locker(m_lock);
f->open_array_section("image_replayers");
for (auto &kv : m_image_replayers) {
auto &image_replayer = kv.second;
image_replayer->print_status(f, ss);
}
f->close_section();
}
template <typename I>
void InstanceReplayer<I>::start()
{
dout(20) << dendl;
Mutex::Locker locker(m_lock);
m_manual_stop = false;
for (auto &kv : m_image_replayers) {
auto &image_replayer = kv.second;
image_replayer->start(nullptr, true);
}
}
template <typename I>
void InstanceReplayer<I>::stop()
{
dout(20) << dendl;
Mutex::Locker locker(m_lock);
m_manual_stop = true;
for (auto &kv : m_image_replayers) {
auto &image_replayer = kv.second;
image_replayer->stop(nullptr, true);
}
}
template <typename I>
void InstanceReplayer<I>::restart()
{
dout(20) << dendl;
Mutex::Locker locker(m_lock);
m_manual_stop = false;
for (auto &kv : m_image_replayers) {
auto &image_replayer = kv.second;
image_replayer->restart();
}
}
template <typename I>
void InstanceReplayer<I>::flush()
{
dout(20) << "enter" << dendl;
Mutex::Locker locker(m_lock);
for (auto &kv : m_image_replayers) {
auto &image_replayer = kv.second;
image_replayer->flush();
}
}
template <typename I>
void InstanceReplayer<I>::start_image_replayer(
ImageReplayer<I> *image_replayer) {
assert(m_lock.is_locked());
std::string global_image_id = image_replayer->get_global_image_id();
dout(20) << "global_image_id=" << global_image_id << dendl;
if (!image_replayer->is_stopped()) {
return;
} else if (image_replayer->is_blacklisted()) {
derr << "blacklisted detected during image replay" << dendl;
return;
}
FunctionContext *ctx = new FunctionContext(
[this, global_image_id] (int r) {
dout(20) << "image deleter result: r=" << r << ", "
<< "global_image_id=" << global_image_id << dendl;
Mutex::Locker locker(m_lock);
m_async_op_tracker.finish_op();
if (r == -ESTALE || r == -ECANCELED) {
return;
}
auto it = m_image_replayers.find(global_image_id);
if (it == m_image_replayers.end()) {
return;
}
auto image_replayer = it->second;
if (r >= 0) {
image_replayer->start(nullptr, false);
} else {
start_image_replayer(image_replayer);
}
});
m_async_op_tracker.start_op();
m_image_deleter->wait_for_scheduled_deletion(
m_local_pool_id, image_replayer->get_global_image_id(), ctx, false);
}
template <typename I>
void InstanceReplayer<I>::start_image_replayers() {
dout(20) << dendl;
Context *ctx = new FunctionContext(
[this] (int r) {
Mutex::Locker locker(m_lock);
m_async_op_tracker.finish_op();
if (m_on_shut_down != nullptr) {
return;
}
for (auto &it : m_image_replayers) {
start_image_replayer(it.second);
}
});
m_async_op_tracker.start_op();
m_threads->work_queue->queue(ctx, 0);
}
template <typename I>
void InstanceReplayer<I>::stop_image_replayer(ImageReplayer<I> *image_replayer,
Context *on_finish) {
dout(20) << image_replayer << " global_image_id="
<< image_replayer->get_global_image_id() << ", on_finish="
<< on_finish << dendl;
if (image_replayer->is_stopped()) {
m_threads->work_queue->queue(on_finish, 0);
return;
}
m_async_op_tracker.start_op();
Context *ctx = create_async_context_callback(
m_threads->work_queue, new FunctionContext(
[this, image_replayer, on_finish] (int r) {
stop_image_replayer(image_replayer, on_finish);
m_async_op_tracker.finish_op();
}));
if (image_replayer->is_running()) {
image_replayer->stop(ctx, false);
} else {
int after = 1;
dout(20) << "scheduling image replayer " << image_replayer << " stop after "
<< after << " sec (task " << ctx << ")" << dendl;
ctx = new FunctionContext(
[this, after, ctx] (int r) {
Mutex::Locker timer_locker(m_threads->timer_lock);
m_threads->timer->add_event_after(after, ctx);
});
m_threads->work_queue->queue(ctx, 0);
}
}
template <typename I>
void InstanceReplayer<I>::wait_for_ops() {
dout(20) << dendl;
Context *ctx = create_context_callback<
InstanceReplayer, &InstanceReplayer<I>::handle_wait_for_ops>(this);
m_async_op_tracker.wait_for_ops(ctx);
}
template <typename I>
void InstanceReplayer<I>::handle_wait_for_ops(int r) {
dout(20) << "r=" << r << dendl;
assert(r == 0);
Mutex::Locker locker(m_lock);
stop_image_replayers();
}
template <typename I>
void InstanceReplayer<I>::stop_image_replayers() {
dout(20) << dendl;
assert(m_lock.is_locked());
Context *ctx = create_async_context_callback(
m_threads->work_queue, create_context_callback<InstanceReplayer<I>,
&InstanceReplayer<I>::handle_stop_image_replayers>(this));
C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx);
for (auto &it : m_image_replayers) {
stop_image_replayer(it.second, gather_ctx->new_sub());
}
gather_ctx->activate();
}
template <typename I>
void InstanceReplayer<I>::handle_stop_image_replayers(int r) {
dout(20) << "r=" << r << dendl;
assert(r == 0);
Context *on_finish = nullptr;
{
Mutex::Locker locker(m_lock);
for (auto &it : m_image_replayers) {
assert(it.second->is_stopped());
it.second->destroy();
}
m_image_replayers.clear();
assert(m_on_shut_down != nullptr);
std::swap(on_finish, m_on_shut_down);
}
on_finish->complete(r);
}
template <typename I>
void InstanceReplayer<I>::cancel_image_state_check_task() {
Mutex::Locker timer_locker(m_threads->timer_lock);
if (m_image_state_check_task == nullptr) {
return;
}
dout(20) << m_image_state_check_task << dendl;
bool canceled = m_threads->timer->cancel_event(m_image_state_check_task);
assert(canceled);
m_image_state_check_task = nullptr;
}
template <typename I>
void InstanceReplayer<I>::schedule_image_state_check_task() {
assert(m_threads->timer_lock.is_locked());
assert(m_image_state_check_task == nullptr);
m_image_state_check_task = new FunctionContext(
[this](int r) {
assert(m_threads->timer_lock.is_locked());
m_image_state_check_task = nullptr;
schedule_image_state_check_task();
start_image_replayers();
});
int after =
max(1, g_ceph_context->_conf->rbd_mirror_image_state_check_interval);
dout(20) << "scheduling image state check after " << after << " sec (task "
<< m_image_state_check_task << ")" << dendl;
m_threads->timer->add_event_after(after, m_image_state_check_task);
}
} // namespace mirror
} // namespace rbd
template class rbd::mirror::InstanceReplayer<librbd::ImageCtx>;

View File

@ -0,0 +1,120 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#ifndef RBD_MIRROR_INSTANCE_REPLAYER_H
#define RBD_MIRROR_INSTANCE_REPLAYER_H
#include <map>
#include <sstream>
#include "common/AsyncOpTracker.h"
#include "common/Formatter.h"
#include "common/Mutex.h"
#include "tools/rbd_mirror/instance_watcher/Types.h"
#include "types.h"
namespace librbd { class ImageCtx; }
namespace rbd {
namespace mirror {
class ImageDeleter;
template <typename> class ImageReplayer;
template <typename> struct Threads;
template <typename ImageCtxT = librbd::ImageCtx>
class InstanceReplayer {
public:
static InstanceReplayer* create(
Threads<ImageCtxT> *threads, std::shared_ptr<ImageDeleter> image_deleter,
ImageSyncThrottlerRef<ImageCtxT> image_sync_throttler, RadosRef local_rados,
const std::string &local_mirror_uuid, int64_t local_pool_id) {
return new InstanceReplayer(threads, image_deleter, image_sync_throttler,
local_rados, local_mirror_uuid, local_pool_id);
}
void destroy() {
delete this;
}
InstanceReplayer(Threads<ImageCtxT> *threads,
std::shared_ptr<ImageDeleter> image_deleter,
ImageSyncThrottlerRef<ImageCtxT> image_sync_throttler,
RadosRef local_rados, const std::string &local_mirror_uuid,
int64_t local_pool_id);
~InstanceReplayer();
int init();
void shut_down();
void init(Context *on_finish);
void shut_down(Context *on_finish);
void set_peers(const Peers &peers);
void acquire_image(const std::string &global_image_id,
const instance_watcher::PeerImageIds &peers,
Context *on_finish);
void release_image(const std::string &global_image_id,
const instance_watcher::PeerImageIds &peers,
bool schedule_delete, Context *on_finish);
void release_all(Context *on_finish);
void print_status(Formatter *f, stringstream *ss);
void start();
void stop();
void restart();
void flush();
private:
/**
* @verbatim
*
* <uninitialized> <-------------------\
* | (init) | (repeat for each
* v STOP_IMAGE_REPLAYER ---\ image replayer)
* SCHEDULE_IMAGE_STATE_CHECK_TASK ^ ^ |
* | | | |
* v (shut_down) | \---------/
* <initialized> -----------------> WAIT_FOR_OPS
*
* @endverbatim
*/
Threads<ImageCtxT> *m_threads;
std::shared_ptr<ImageDeleter> m_image_deleter;
ImageSyncThrottlerRef<ImageCtxT> m_image_sync_throttler;
RadosRef m_local_rados;
std::string m_local_mirror_uuid;
int64_t m_local_pool_id;
Mutex m_lock;
AsyncOpTracker m_async_op_tracker;
std::map<std::string, ImageReplayer<ImageCtxT> *> m_image_replayers;
Peers m_peers;
Context *m_image_state_check_task = nullptr;
Context *m_on_shut_down = nullptr;
bool m_manual_stop = false;
void wait_for_ops();
void handle_wait_for_ops(int r);
void start_image_replayer(ImageReplayer<ImageCtxT> *image_replayer);
void start_image_replayers();
void stop_image_replayer(ImageReplayer<ImageCtxT> *image_replayer,
Context *on_finish);
void stop_image_replayers();
void handle_stop_image_replayers(int r);
void schedule_image_state_check_task();
void cancel_image_state_check_task();
};
} // namespace mirror
} // namespace rbd
extern template class rbd::mirror::InstanceReplayer<librbd::ImageCtx>;
#endif // RBD_MIRROR_INSTANCE_REPLAYER_H

View File

@ -17,6 +17,7 @@
#include "librbd/Utils.h"
#include "librbd/Watcher.h"
#include "librbd/api/Mirror.h"
#include "InstanceReplayer.h"
#include "InstanceWatcher.h"
#include "LeaderWatcher.h"
#include "Replayer.h"
@ -258,6 +259,9 @@ Replayer::~Replayer()
if (m_instance_watcher) {
m_instance_watcher->shut_down();
}
if (m_instance_replayer) {
m_instance_replayer->shut_down();
}
assert(!m_pool_watcher);
}
@ -297,6 +301,15 @@ int Replayer::init()
return r;
}
std::string local_mirror_uuid;
r = librbd::cls_client::mirror_uuid_get(&m_local_io_ctx,
&local_mirror_uuid);
if (r < 0) {
derr << "failed to retrieve local mirror uuid from pool "
<< m_local_io_ctx.get_pool_name() << ": " << cpp_strerror(r) << dendl;
return r;
}
r = m_remote_rados->ioctx_create(m_local_io_ctx.get_pool_name().c_str(),
m_remote_io_ctx);
if (r < 0) {
@ -304,10 +317,15 @@ int Replayer::init()
<< ": " << cpp_strerror(r) << dendl;
return r;
}
m_remote_pool_id = m_remote_io_ctx.get_id();
dout(20) << "connected to " << m_peer << dendl;
m_instance_replayer.reset(
InstanceReplayer<>::create(m_threads, m_image_deleter,
m_image_sync_throttler, m_local_rados,
local_mirror_uuid, m_local_pool_id));
m_instance_replayer->init();
m_leader_watcher.reset(new LeaderWatcher<>(m_threads, m_local_io_ctx,
&m_leader_listener));
r = m_leader_watcher->init();
@ -422,27 +440,7 @@ void Replayer::run()
break;
}
for (auto image_it = m_image_replayers.begin();
image_it != m_image_replayers.end(); ) {
if (image_it->second->remote_images_empty()) {
if (stop_image_replayer(image_it->second)) {
image_it = m_image_replayers.erase(image_it);
continue;
}
} else {
start_image_replayer(image_it->second);
}
++image_it;
}
m_cond.WaitInterval(m_lock,
utime_t(g_ceph_context->_conf->
rbd_mirror_image_state_check_interval, 0));
}
Mutex::Locker locker(m_lock);
while (!m_image_replayers.empty()) {
stop_image_replayers();
m_cond.WaitInterval(m_lock, utime_t(1, 0));
}
}
@ -471,14 +469,9 @@ void Replayer::print_status(Formatter *f, stringstream *ss)
}
f->close_section();
}
f->open_array_section("image_replayers");
for (auto &kv : m_image_replayers) {
auto &image_replayer = kv.second;
image_replayer->print_status(f, ss);
}
m_instance_replayer->print_status(f, ss);
f->close_section();
f->close_section();
f->flush(*ss);
}
@ -493,12 +486,7 @@ void Replayer::start()
return;
}
m_manual_stop = false;
for (auto &kv : m_image_replayers) {
auto &image_replayer = kv.second;
image_replayer->start(nullptr, true);
}
m_instance_replayer->start();
}
void Replayer::stop(bool manual)
@ -514,11 +502,7 @@ void Replayer::stop(bool manual)
return;
}
m_manual_stop = true;
for (auto &kv : m_image_replayers) {
auto &image_replayer = kv.second;
image_replayer->stop(nullptr, true);
}
m_instance_replayer->stop();
}
void Replayer::restart()
@ -531,12 +515,7 @@ void Replayer::restart()
return;
}
m_manual_stop = false;
for (auto &kv : m_image_replayers) {
auto &image_replayer = kv.second;
image_replayer->restart();
}
m_instance_replayer->restart();
}
void Replayer::flush()
@ -549,10 +528,7 @@ void Replayer::flush()
return;
}
for (auto &kv : m_image_replayers) {
auto &image_replayer = kv.second;
image_replayer->flush();
}
m_instance_replayer->flush();
}
void Replayer::release_leader()
@ -582,6 +558,8 @@ void Replayer::handle_update(const std::string &mirror_uuid,
return;
}
m_instance_replayer->set_peers({{mirror_uuid, m_remote_io_ctx}});
// first callback will be a full directory -- so see if we need to remove
// any local images that no longer exist on the remote side
if (!m_init_image_ids.empty()) {
@ -603,193 +581,27 @@ void Replayer::handle_update(const std::string &mirror_uuid,
m_init_image_ids.clear();
}
// shut down replayers for non-mirrored images
m_update_op_tracker.start_op();
Context *ctx = new FunctionContext([this](int r) {
dout(20) << "complete handle_update: r=" << r << dendl;
m_update_op_tracker.finish_op();
});
C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx);
for (auto &image_id : removed_image_ids) {
auto image_it = m_image_replayers.find(image_id.global_id);
if (image_it != m_image_replayers.end()) {
image_it->second->remove_remote_image(mirror_uuid, image_id.id);
if (image_it->second->is_running()) {
dout(20) << "stop image replayer for remote image "
<< image_id.id << " (" << image_id.global_id << ")"
<< dendl;
}
if (image_it->second->remote_images_empty() &&
stop_image_replayer(image_it->second)) {
// no additional remotes registered for this image
m_image_replayers.erase(image_it);
}
}
m_instance_replayer->release_image(image_id.global_id,
{{mirror_uuid, image_id.id}}, true,
gather_ctx->new_sub());
}
// prune previously stopped image replayers
for (auto image_it = m_image_replayers.begin();
image_it != m_image_replayers.end(); ) {
if (image_it->second->remote_images_empty() &&
stop_image_replayer(image_it->second)) {
image_it = m_image_replayers.erase(image_it);
} else {
++image_it;
}
}
if (added_image_ids.empty()) {
return;
}
std::string local_mirror_uuid;
int r = librbd::cls_client::mirror_uuid_get(&m_local_io_ctx,
&local_mirror_uuid);
if (r < 0 || local_mirror_uuid.empty()) {
derr << "failed to retrieve local mirror uuid from pool "
<< m_local_io_ctx.get_pool_name() << ": " << cpp_strerror(r) << dendl;
return;
}
// start replayers for newly added remote image sources
for (auto &image_id : added_image_ids) {
auto it = m_image_replayers.find(image_id.global_id);
if (it == m_image_replayers.end()) {
unique_ptr<ImageReplayer<> > image_replayer(new ImageReplayer<>(
m_threads, m_image_deleter, m_image_sync_throttler, m_local_rados,
local_mirror_uuid, m_local_pool_id, image_id.global_id));
if (m_manual_stop) {
image_replayer->stop(nullptr, true);
}
it = m_image_replayers.insert(
std::make_pair(image_id.global_id, std::move(image_replayer))).first;
}
it->second->add_remote_image(mirror_uuid, image_id.id,
m_remote_io_ctx);
if (!it->second->is_running()) {
dout(20) << "starting image replayer for remote image "
<< image_id.global_id << dendl;
}
start_image_replayer(it->second);
}
}
void Replayer::start_image_replayer(unique_ptr<ImageReplayer<> > &image_replayer)
{
assert(m_lock.is_locked());
if (!image_replayer->is_stopped() || image_replayer->remote_images_empty()) {
return;
} else if (image_replayer->is_blacklisted()) {
derr << "blacklisted detected during image replay" << dendl;
m_blacklisted = true;
m_stopping.set(1);
return;
m_instance_replayer->acquire_image(image_id.global_id,
{{mirror_uuid, image_id.id}},
gather_ctx->new_sub());
}
std::string global_image_id = image_replayer->get_global_image_id();
dout(20) << "global_image_id=" << global_image_id << dendl;
FunctionContext *ctx = new FunctionContext(
[this, global_image_id] (int r) {
dout(20) << "image deleter result: r=" << r << ", "
<< "global_image_id=" << global_image_id << dendl;
if (r == -ESTALE || r == -ECANCELED) {
return;
}
Mutex::Locker locker(m_lock);
auto it = m_image_replayers.find(global_image_id);
if (it == m_image_replayers.end()) {
return;
}
auto &image_replayer = it->second;
if (r >= 0) {
image_replayer->start();
} else {
start_image_replayer(image_replayer);
}
}
);
m_image_deleter->wait_for_scheduled_deletion(
m_local_pool_id, image_replayer->get_global_image_id(), ctx, false);
}
bool Replayer::stop_image_replayer(unique_ptr<ImageReplayer<> > &image_replayer)
{
assert(m_lock.is_locked());
dout(20) << "global_image_id=" << image_replayer->get_global_image_id()
<< dendl;
// TODO: check how long it is stopping and alert if it is too long.
if (image_replayer->is_stopped()) {
m_image_deleter->cancel_waiter(m_local_pool_id,
image_replayer->get_global_image_id());
if (!m_stopping.read() && m_leader_watcher->is_leader()) {
dout(20) << "scheduling delete" << dendl;
m_image_deleter->schedule_image_delete(
m_local_rados,
image_replayer->get_local_pool_id(),
image_replayer->get_local_image_id(),
image_replayer->get_global_image_id());
}
return true;
} else {
if (!m_stopping.read()) {
dout(20) << "scheduling delete after image replayer stopped" << dendl;
}
FunctionContext *ctx = new FunctionContext(
[&image_replayer, this] (int r) {
if (!m_stopping.read() && m_leader_watcher->is_leader() && r >= 0) {
m_image_deleter->schedule_image_delete(
m_local_rados,
image_replayer->get_local_pool_id(),
image_replayer->get_local_image_id(),
image_replayer->get_global_image_id());
}
}
);
image_replayer->stop(ctx);
}
return false;
}
void Replayer::stop_image_replayers() {
dout(20) << dendl;
assert(m_lock.is_locked());
for (auto image_it = m_image_replayers.begin();
image_it != m_image_replayers.end();) {
if (stop_image_replayer(image_it->second)) {
image_it = m_image_replayers.erase(image_it);
continue;
}
++image_it;
}
}
void Replayer::stop_image_replayers(Context *on_finish) {
dout(20) << dendl;
{
Mutex::Locker locker(m_lock);
stop_image_replayers();
if (!m_image_replayers.empty()) {
Context *ctx = new FunctionContext([this, on_finish](int r) {
assert(r == 0);
stop_image_replayers(on_finish);
});
ctx = create_async_context_callback(m_threads->work_queue, ctx);
Mutex::Locker timer_locker(m_threads->timer_lock);
m_threads->timer->add_event_after(1, ctx);
return;
}
}
on_finish->complete(0);
gather_ctx->activate();
}
void Replayer::handle_post_acquire_leader(Context *on_finish) {
@ -871,8 +683,29 @@ void Replayer::handle_shut_down_pool_watcher(int r, Context *on_finish) {
assert(m_pool_watcher);
m_pool_watcher.reset();
}
wait_for_update_ops(on_finish);
}
stop_image_replayers(on_finish);
void Replayer::wait_for_update_ops(Context *on_finish) {
dout(20) << dendl;
Mutex::Locker locker(m_lock);
Context *ctx = new FunctionContext([this, on_finish](int r) {
handle_wait_for_update_ops(r, on_finish);
});
ctx = create_async_context_callback(m_threads->work_queue, ctx);
m_update_op_tracker.wait_for_ops(ctx);
}
void Replayer::handle_wait_for_update_ops(int r, Context *on_finish) {
dout(20) << "r=" << r << dendl;
assert(r == 0);
Mutex::Locker locker(m_lock);
m_instance_replayer->release_all(on_finish);
}
} // namespace mirror

View File

@ -9,6 +9,7 @@
#include <set>
#include <string>
#include "common/AsyncOpTracker.h"
#include "common/Cond.h"
#include "common/Mutex.h"
#include "common/WorkQueue.h"
@ -16,7 +17,6 @@
#include "include/rados/librados.hpp"
#include "ClusterWatcher.h"
#include "ImageReplayer.h"
#include "LeaderWatcher.h"
#include "PoolWatcher.h"
#include "ImageDeleter.h"
@ -30,6 +30,7 @@ namespace rbd {
namespace mirror {
template <typename> struct Threads;
template <typename> class InstanceReplayer;
template <typename> class InstanceWatcher;
/**
@ -79,11 +80,6 @@ private:
const ImageIds &added_image_ids,
const ImageIds &removed_image_ids);
void start_image_replayer(unique_ptr<ImageReplayer<> > &image_replayer);
bool stop_image_replayer(unique_ptr<ImageReplayer<> > &image_replayer);
void stop_image_replayers();
void stop_image_replayers(Context *on_finish);
int init_rados(const std::string &cluster_name,
const std::string &client_name,
const std::string &description, RadosRef *rados_ref);
@ -99,6 +95,9 @@ private:
void shut_down_pool_watcher(Context *on_finish);
void handle_shut_down_pool_watcher(int r, Context *on_finish);
void wait_for_update_ops(Context *on_finish);
void handle_wait_for_update_ops(int r, Context *on_finish);
Threads<librbd::ImageCtx> *m_threads;
std::shared_ptr<ImageDeleter> m_image_deleter;
ImageSyncThrottlerRef<> m_image_sync_throttler;
@ -117,12 +116,11 @@ private:
librados::IoCtx m_remote_io_ctx;
int64_t m_local_pool_id = -1;
int64_t m_remote_pool_id = -1;
PoolWatcherListener m_pool_watcher_listener;
std::unique_ptr<PoolWatcher<> > m_pool_watcher;
std::map<std::string, std::unique_ptr<ImageReplayer<> > > m_image_replayers;
std::unique_ptr<InstanceReplayer<librbd::ImageCtx>> m_instance_replayer;
std::string m_asok_hook_name;
AdminSocketHook *m_asok_hook;
@ -159,6 +157,7 @@ private:
std::unique_ptr<LeaderWatcher<> > m_leader_watcher;
std::unique_ptr<InstanceWatcher<librbd::ImageCtx> > m_instance_watcher;
AsyncOpTracker m_update_op_tracker;
};
} // namespace mirror

View File

@ -0,0 +1,28 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#include "Types.h"
#include "common/Formatter.h"
namespace rbd {
namespace mirror {
namespace instance_watcher {
void PeerImageId::encode(bufferlist &bl) const {
::encode(mirror_uuid, bl);
::encode(image_id, bl);
}
void PeerImageId::decode(bufferlist::iterator &iter) {
::decode(mirror_uuid, iter);
::decode(image_id, iter);
}
void PeerImageId::dump(Formatter *f) const {
f->dump_string("mirror_uuid", mirror_uuid);
f->dump_string("image_id", image_id);
}
} // namespace instance_watcher
} // namespace mirror
} // namespace rbd

View File

@ -0,0 +1,46 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#ifndef RBD_MIRROR_INSTANCE_WATCHER_TYPES_H
#define RBD_MIRROR_INSTANCE_WATCHER_TYPES_H
#include <string>
#include <set>
#include <boost/variant.hpp>
#include "include/buffer_fwd.h"
#include "include/encoding.h"
#include "include/int_types.h"
namespace ceph { class Formatter; }
namespace rbd {
namespace mirror {
namespace instance_watcher {
struct PeerImageId {
std::string mirror_uuid;
std::string image_id;
inline bool operator<(const PeerImageId &rhs) const {
return mirror_uuid < rhs.mirror_uuid;
}
inline bool operator==(const PeerImageId &rhs) const {
return (mirror_uuid == rhs.mirror_uuid && image_id == rhs.image_id);
}
void encode(bufferlist& bl) const;
void decode(bufferlist::iterator& it);
void dump(Formatter *f) const;
};
WRITE_CLASS_ENCODER(PeerImageId);
typedef std::set<PeerImageId> PeerImageIds;
} // namespace instance_watcher
} // namespace mirror
} // namespace librbd
#endif // RBD_MIRROR_INSTANCE_WATCHER_TYPES_H

View File

@ -51,6 +51,57 @@ std::ostream &operator<<(std::ostream &, const ImageId &image_id);
typedef std::set<ImageId> ImageIds;
struct Peer {
std::string mirror_uuid;
librados::IoCtx io_ctx;
Peer() {
}
Peer(const std::string &mirror_uuid) : mirror_uuid(mirror_uuid) {
}
Peer(const std::string &mirror_uuid, librados::IoCtx &io_ctx)
: mirror_uuid(mirror_uuid), io_ctx(io_ctx) {
}
inline bool operator<(const Peer &rhs) const {
return mirror_uuid < rhs.mirror_uuid;
}
inline bool operator==(const Peer &rhs) const {
return mirror_uuid == rhs.mirror_uuid;
}
};
typedef std::set<Peer> Peers;
struct PeerImage : public Peer {
std::string image_id;
PeerImage() {
}
PeerImage(const std::string &mirror_uuid, const std::string &image_id)
: Peer(mirror_uuid), image_id(image_id) {
}
PeerImage(const std::string &mirror_uuid, librados::IoCtx &io_ctx,
const std::string &image_id)
: Peer(mirror_uuid, io_ctx), image_id(image_id) {
}
inline bool operator<(const PeerImage &rhs) const {
if (mirror_uuid != rhs.mirror_uuid) {
return mirror_uuid < rhs.mirror_uuid;
} else {
return image_id < rhs.image_id;
}
}
inline bool operator==(const PeerImage &rhs) const {
return (mirror_uuid == rhs.mirror_uuid && image_id == rhs.image_id);
}
};
typedef std::set<PeerImage> PeerImages;
struct peer_t {
peer_t() = default;
peer_t(const std::string &uuid, const std::string &cluster_name,