rbd-mirror: allocate local journal tags from mirror remote peer tags

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
This commit is contained in:
Jason Dillaman 2016-03-27 23:00:33 -04:00
parent bdee02cb57
commit 276e2eb21b
6 changed files with 129 additions and 34 deletions

View File

@ -17,7 +17,12 @@ struct MockJournal {
MOCK_METHOD1(wait_for_journal_ready, void(Context *));
MOCK_CONST_METHOD0(is_tag_owner, bool());
MOCK_METHOD2(allocate_tag, void(const std::string &, Context *));
MOCK_METHOD6(allocate_tag, void(const std::string &mirror_uuid,
const std::string &predecessor_mirror_uuid,
bool predecessor_commit_valid,
uint64_t predecessor_tag_tid,
uint64_t predecessor_entry_tid,
Context *on_finish));
MOCK_METHOD1(open, void(Context *));
MOCK_METHOD1(close, void(Context *));

View File

@ -66,7 +66,7 @@ struct MockJournaler {
std::set<cls::journal::Client> *,
Context*));
MOCK_METHOD1(try_pop_front, bool(MockReplayEntryProxy *));
MOCK_METHOD2(try_pop_front, bool(MockReplayEntryProxy *, uint64_t *));
MOCK_METHOD2(start_live_replay, void(ReplayHandler *, double));
MOCK_METHOD0(stop_replay, void());
@ -74,6 +74,8 @@ struct MockJournaler {
MOCK_METHOD1(flush_commit_position, void(Context*));
MOCK_METHOD2(update_client, void(const bufferlist&, Context *on_safe));
MOCK_METHOD3(get_tag, void(uint64_t, cls::journal::Tag *, Context *));
};
struct MockJournalerProxy {
@ -100,8 +102,8 @@ struct MockJournalerProxy {
on_finish);
}
bool try_pop_front(MockReplayEntryProxy *entry) {
return MockJournaler::get_instance().try_pop_front(entry);
bool try_pop_front(MockReplayEntryProxy *entry, uint64_t *tag_tid) {
return MockJournaler::get_instance().try_pop_front(entry, tag_tid);
}
void start_live_replay(ReplayHandler *handler, double interval) {
MockJournaler::get_instance().start_live_replay(handler, interval);
@ -120,6 +122,10 @@ struct MockJournalerProxy {
void update_client(const bufferlist& data, Context *on_safe) {
MockJournaler::get_instance().update_client(data, on_safe);
}
void get_tag(uint64_t tag_tid, cls::journal::Tag *tag, Context *on_finish) {
MockJournaler::get_instance().get_tag(tag_tid, tag, on_finish);
}
};
std::ostream &operator<<(std::ostream &os, const MockJournalerProxy &);

View File

@ -71,7 +71,7 @@ public:
}
};
TestImageReplayer() : m_client_id("TestImageReplayer"), m_watch_handle(0)
TestImageReplayer() : m_watch_handle(0)
{
EXPECT_EQ("", connect_cluster_pp(m_local_cluster));
@ -116,8 +116,8 @@ public:
m_replayer = new ImageReplayerT(m_threads,
rbd::mirror::RadosRef(new librados::Rados(m_local_ioctx)),
rbd::mirror::RadosRef(new librados::Rados(m_remote_ioctx)),
m_client_id, m_local_ioctx.get_id(), m_remote_pool_id, m_remote_image_id,
"global image id");
m_local_mirror_uuid, m_remote_mirror_uuid, m_local_ioctx.get_id(),
m_remote_pool_id, m_remote_image_id, "global image id");
}
void start(rbd::mirror::ImageReplayer<>::BootstrapParams *bootstap_params =
@ -199,7 +199,7 @@ public:
cls::journal::ObjectPosition *mirror_position)
{
std::string master_client_id = "";
std::string mirror_client_id = m_client_id;
std::string mirror_client_id = m_local_mirror_uuid;
C_SaferCond cond;
uint64_t minimum_set;
@ -324,7 +324,8 @@ public:
rbd::mirror::Threads *m_threads = nullptr;
librados::Rados m_local_cluster, m_remote_cluster;
std::string m_client_id;
std::string m_local_mirror_uuid = "local mirror uuid";
std::string m_remote_mirror_uuid = "remote mirror uuid";
std::string m_local_pool_name, m_remote_pool_name;
librados::IoCtx m_local_ioctx, m_remote_ioctx;
std::string m_image_name;
@ -531,12 +532,15 @@ class ImageReplayer : public rbd::mirror::ImageReplayer<> {
public:
ImageReplayer(rbd::mirror::Threads *threads,
rbd::mirror::RadosRef local, rbd::mirror::RadosRef remote,
const std::string &client_id, int64_t local_pool_id,
const std::string &local_mirror_uuid,
const std::string &remote_mirror_uuid,
int64_t local_pool_id,
int64_t remote_pool_id, const std::string &remote_image_id,
const std::string &global_image_id)
: rbd::mirror::ImageReplayer<>(threads, local, remote, client_id,
local_pool_id, remote_pool_id,
remote_image_id, global_image_id)
: rbd::mirror::ImageReplayer<>(threads, local, remote, local_mirror_uuid,
remote_mirror_uuid, local_pool_id,
remote_pool_id, remote_image_id,
global_image_id)
{}
void set_error(const std::string &state, int r) {

View File

@ -156,7 +156,8 @@ private:
template <typename I>
ImageReplayer<I>::ImageReplayer(Threads *threads, RadosRef local, RadosRef remote,
const std::string &mirror_uuid,
const std::string &local_mirror_uuid,
const std::string &remote_mirror_uuid,
int64_t local_pool_id,
int64_t remote_pool_id,
const std::string &remote_image_id,
@ -164,7 +165,8 @@ ImageReplayer<I>::ImageReplayer(Threads *threads, RadosRef local, RadosRef remot
m_threads(threads),
m_local(local),
m_remote(remote),
m_mirror_uuid(mirror_uuid),
m_local_mirror_uuid(local_mirror_uuid),
m_remote_mirror_uuid(remote_mirror_uuid),
m_remote_pool_id(remote_pool_id),
m_local_pool_id(local_pool_id),
m_remote_image_id(remote_image_id),
@ -234,7 +236,7 @@ void ImageReplayer<I>::start(Context *on_finish,
m_remote_journaler = new Journaler(m_threads->work_queue,
m_threads->timer,
&m_threads->timer_lock, m_remote_ioctx,
m_remote_image_id, m_mirror_uuid,
m_remote_image_id, m_local_mirror_uuid,
commit_interval);
bootstrap();
@ -252,7 +254,7 @@ void ImageReplayer<I>::bootstrap() {
m_local_ioctx, m_remote_ioctx, &m_local_image_ctx,
m_local_image_name, m_remote_image_id, m_global_image_id,
m_threads->work_queue, m_threads->timer, &m_threads->timer_lock,
m_mirror_uuid, m_remote_journaler, &m_client_meta, ctx);
m_local_mirror_uuid, m_remote_journaler, &m_client_meta, ctx);
request->send();
}
@ -497,6 +499,8 @@ void ImageReplayer<I>::on_stop_journal_replay_shut_down_finish(int r)
assert(m_state == STATE_STOPPING);
m_local_image_ctx->journal->stop_external_replay();
m_local_replay = nullptr;
m_replay_entry = ReplayEntry();
m_replay_tag_valid = false;
}
on_stop_local_image_close_start();
@ -569,12 +573,16 @@ void ImageReplayer<I>::handle_replay_ready()
return;
}
if (!m_remote_journaler->try_pop_front(&m_replay_entry)) {
if (!m_remote_journaler->try_pop_front(&m_replay_entry, &m_replay_tag_tid)) {
return;
}
// TODO
process_entry();
if (m_replay_tag_valid && m_replay_tag.tid == m_replay_tag_tid) {
process_entry();
return;
}
replay_flush();
}
template <typename I>
@ -592,6 +600,7 @@ void ImageReplayer<I>::flush(Context *on_finish)
}
});
on_flush_local_replay_flush_start(ctx);
return;
}
}
@ -704,42 +713,99 @@ template <typename I>
void ImageReplayer<I>::replay_flush() {
dout(20) << dendl;
// TODO
Context *ctx = create_context_callback<
ImageReplayer<I>, &ImageReplayer<I>::handle_replay_flush>(this);
flush(ctx);
}
template <typename I>
void ImageReplayer<I>::handle_replay_flush(int r) {
dout(20) << "r=" << r << dendl;
// TODO
if (r < 0) {
derr << "replay flush encountered an error: " << cpp_strerror(r) << dendl;
handle_replay_complete(r);
return;
}
get_remote_tag();
}
template <typename I>
void ImageReplayer<I>::get_remote_tag() {
dout(20) << dendl;
dout(20) << "tag_tid: " << m_replay_tag_tid << dendl;
// TODO
Context *ctx = create_context_callback<
ImageReplayer, &ImageReplayer<I>::handle_get_remote_tag>(this);
m_remote_journaler->get_tag(m_replay_tag_tid, &m_replay_tag, ctx);
}
template <typename I>
void ImageReplayer<I>::handle_get_remote_tag(int r) {
dout(20) << "r=" << r << dendl;
// TODO
if (r == 0) {
try {
bufferlist::iterator it = m_replay_tag.data.begin();
::decode(m_replay_tag_data, it);
} catch (const buffer::error &err) {
r = -EBADMSG;
}
}
if (r < 0) {
derr << "failed to retrieve remote tag " << m_replay_tag_tid << ": "
<< cpp_strerror(r) << dendl;
handle_replay_complete(r);
return;
}
m_replay_tag_valid = true;
dout(20) << "decoded remote tag " << m_replay_tag_tid << ": "
<< m_replay_tag_data << dendl;
allocate_local_tag();
}
template <typename I>
void ImageReplayer<I>::allocate_local_tag() {
dout(20) << dendl;
// TODO
std::string mirror_uuid = m_replay_tag_data.mirror_uuid;
if (mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID ||
mirror_uuid == m_local_mirror_uuid) {
mirror_uuid = m_remote_mirror_uuid;
}
std::string predecessor_mirror_uuid =
m_replay_tag_data.predecessor_mirror_uuid;
if (predecessor_mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID) {
mirror_uuid = m_remote_mirror_uuid;
} else if (predecessor_mirror_uuid == m_local_mirror_uuid) {
predecessor_mirror_uuid = librbd::Journal<>::LOCAL_MIRROR_UUID;
}
Context *ctx = create_context_callback<
ImageReplayer, &ImageReplayer<I>::handle_allocate_local_tag>(this);
m_local_image_ctx->journal->allocate_tag(
mirror_uuid, predecessor_mirror_uuid,
m_replay_tag_data.predecessor_commit_valid,
m_replay_tag_data.predecessor_tag_tid,
m_replay_tag_data.predecessor_entry_tid,
ctx);
}
template <typename I>
void ImageReplayer<I>::handle_allocate_local_tag(int r) {
dout(20) << "r=" << r << dendl;
// TODO
if (r < 0) {
derr << "failed to allocate journal tag: " << cpp_strerror(r) << dendl;
handle_replay_complete(r);
return;
}
process_entry();
}
template <typename I>

View File

@ -67,7 +67,8 @@ public:
};
ImageReplayer(Threads *threads, RadosRef local, RadosRef remote,
const std::string &mirror_uuid, int64_t local_pool_id,
const std::string &local_mirror_uuid,
const std::string &remote_mirror_uuid, int64_t local_pool_id,
int64_t remote_pool_id, const std::string &remote_image_id,
const std::string &global_image_id);
virtual ~ImageReplayer();
@ -181,7 +182,8 @@ private:
Threads *m_threads;
RadosRef m_local, m_remote;
std::string m_mirror_uuid;
std::string m_local_mirror_uuid;
std::string m_remote_mirror_uuid;
int64_t m_remote_pool_id, m_local_pool_id;
std::string m_remote_image_id, m_local_image_id, m_global_image_id;
std::string m_local_image_name;
@ -203,6 +205,10 @@ private:
librbd::journal::MirrorPeerClientMeta m_client_meta;
ReplayEntry m_replay_entry;
bool m_replay_tag_valid = false;
uint64_t m_replay_tag_tid = 0;
cls::journal::Tag m_replay_tag;
librbd::journal::TagData m_replay_tag_data;
struct C_ReplayCommitted : public Context {
ImageReplayer *replayer;

View File

@ -319,22 +319,30 @@ void Replayer::set_sources(const PoolImageIds &pool_image_ids)
continue;
}
std::string mirror_uuid;
r = librbd::cls_client::mirror_uuid_get(&local_ioctx, &mirror_uuid);
std::string local_mirror_uuid;
r = librbd::cls_client::mirror_uuid_get(&local_ioctx, &local_mirror_uuid);
if (r < 0) {
derr << "failed to retrieve mirror uuid from pool "
derr << "failed to retrieve local mirror uuid from pool "
<< local_ioctx.get_pool_name() << ": " << cpp_strerror(r) << dendl;
continue;
}
std::string remote_mirror_uuid;
r = librbd::cls_client::mirror_uuid_get(&remote_ioctx, &remote_mirror_uuid);
if (r < 0) {
derr << "failed to retrieve remote mirror uuid from pool "
<< remote_ioctx.get_pool_name() << ": " << cpp_strerror(r) << dendl;
continue;
}
// create entry for pool if it doesn't exist
auto &pool_replayers = m_images[pool_id];
for (const auto &image_id : kv.second) {
auto it = pool_replayers.find(image_id.id);
if (it == pool_replayers.end()) {
unique_ptr<ImageReplayer<> > image_replayer(new ImageReplayer<>(
m_threads, m_local, m_remote, mirror_uuid, local_ioctx.get_id(),
pool_id, image_id.id, image_id.global_id));
m_threads, m_local, m_remote, local_mirror_uuid, remote_mirror_uuid,
local_ioctx.get_id(), pool_id, image_id.id, image_id.global_id));
it = pool_replayers.insert(
std::make_pair(image_id.id, std::move(image_replayer))).first;
}