diff --git a/src/test/librbd/mock/MockJournal.h b/src/test/librbd/mock/MockJournal.h index 1393399d555..a4637a4c6c5 100644 --- a/src/test/librbd/mock/MockJournal.h +++ b/src/test/librbd/mock/MockJournal.h @@ -17,7 +17,12 @@ struct MockJournal { MOCK_METHOD1(wait_for_journal_ready, void(Context *)); MOCK_CONST_METHOD0(is_tag_owner, bool()); - MOCK_METHOD2(allocate_tag, void(const std::string &, Context *)); + MOCK_METHOD6(allocate_tag, void(const std::string &mirror_uuid, + const std::string &predecessor_mirror_uuid, + bool predecessor_commit_valid, + uint64_t predecessor_tag_tid, + uint64_t predecessor_entry_tid, + Context *on_finish)); MOCK_METHOD1(open, void(Context *)); MOCK_METHOD1(close, void(Context *)); diff --git a/src/test/rbd_mirror/mock/MockJournaler.h b/src/test/rbd_mirror/mock/MockJournaler.h index 7279d26586d..5f08c12d82e 100644 --- a/src/test/rbd_mirror/mock/MockJournaler.h +++ b/src/test/rbd_mirror/mock/MockJournaler.h @@ -66,7 +66,7 @@ struct MockJournaler { std::set *, Context*)); - MOCK_METHOD1(try_pop_front, bool(MockReplayEntryProxy *)); + MOCK_METHOD2(try_pop_front, bool(MockReplayEntryProxy *, uint64_t *)); MOCK_METHOD2(start_live_replay, void(ReplayHandler *, double)); MOCK_METHOD0(stop_replay, void()); @@ -74,6 +74,8 @@ struct MockJournaler { MOCK_METHOD1(flush_commit_position, void(Context*)); MOCK_METHOD2(update_client, void(const bufferlist&, Context *on_safe)); + + MOCK_METHOD3(get_tag, void(uint64_t, cls::journal::Tag *, Context *)); }; struct MockJournalerProxy { @@ -100,8 +102,8 @@ struct MockJournalerProxy { on_finish); } - bool try_pop_front(MockReplayEntryProxy *entry) { - return MockJournaler::get_instance().try_pop_front(entry); + bool try_pop_front(MockReplayEntryProxy *entry, uint64_t *tag_tid) { + return MockJournaler::get_instance().try_pop_front(entry, tag_tid); } void start_live_replay(ReplayHandler *handler, double interval) { MockJournaler::get_instance().start_live_replay(handler, interval); @@ -120,6 +122,10 @@ struct MockJournalerProxy { void update_client(const bufferlist& data, Context *on_safe) { MockJournaler::get_instance().update_client(data, on_safe); } + + void get_tag(uint64_t tag_tid, cls::journal::Tag *tag, Context *on_finish) { + MockJournaler::get_instance().get_tag(tag_tid, tag, on_finish); + } }; std::ostream &operator<<(std::ostream &os, const MockJournalerProxy &); diff --git a/src/test/rbd_mirror/test_ImageReplayer.cc b/src/test/rbd_mirror/test_ImageReplayer.cc index d5de0284361..18f1441682a 100644 --- a/src/test/rbd_mirror/test_ImageReplayer.cc +++ b/src/test/rbd_mirror/test_ImageReplayer.cc @@ -71,7 +71,7 @@ public: } }; - TestImageReplayer() : m_client_id("TestImageReplayer"), m_watch_handle(0) + TestImageReplayer() : m_watch_handle(0) { EXPECT_EQ("", connect_cluster_pp(m_local_cluster)); @@ -116,8 +116,8 @@ public: m_replayer = new ImageReplayerT(m_threads, rbd::mirror::RadosRef(new librados::Rados(m_local_ioctx)), rbd::mirror::RadosRef(new librados::Rados(m_remote_ioctx)), - m_client_id, m_local_ioctx.get_id(), m_remote_pool_id, m_remote_image_id, - "global image id"); + m_local_mirror_uuid, m_remote_mirror_uuid, m_local_ioctx.get_id(), + m_remote_pool_id, m_remote_image_id, "global image id"); } void start(rbd::mirror::ImageReplayer<>::BootstrapParams *bootstap_params = @@ -199,7 +199,7 @@ public: cls::journal::ObjectPosition *mirror_position) { std::string master_client_id = ""; - std::string mirror_client_id = m_client_id; + std::string mirror_client_id = m_local_mirror_uuid; C_SaferCond cond; uint64_t minimum_set; @@ -324,7 +324,8 @@ public: rbd::mirror::Threads *m_threads = nullptr; librados::Rados m_local_cluster, m_remote_cluster; - std::string m_client_id; + std::string m_local_mirror_uuid = "local mirror uuid"; + std::string m_remote_mirror_uuid = "remote mirror uuid"; std::string m_local_pool_name, m_remote_pool_name; librados::IoCtx m_local_ioctx, m_remote_ioctx; std::string m_image_name; @@ -531,12 +532,15 @@ class ImageReplayer : public rbd::mirror::ImageReplayer<> { public: ImageReplayer(rbd::mirror::Threads *threads, rbd::mirror::RadosRef local, rbd::mirror::RadosRef remote, - const std::string &client_id, int64_t local_pool_id, + const std::string &local_mirror_uuid, + const std::string &remote_mirror_uuid, + int64_t local_pool_id, int64_t remote_pool_id, const std::string &remote_image_id, const std::string &global_image_id) - : rbd::mirror::ImageReplayer<>(threads, local, remote, client_id, - local_pool_id, remote_pool_id, - remote_image_id, global_image_id) + : rbd::mirror::ImageReplayer<>(threads, local, remote, local_mirror_uuid, + remote_mirror_uuid, local_pool_id, + remote_pool_id, remote_image_id, + global_image_id) {} void set_error(const std::string &state, int r) { diff --git a/src/tools/rbd_mirror/ImageReplayer.cc b/src/tools/rbd_mirror/ImageReplayer.cc index 14564cf206f..9f9830b8c1b 100644 --- a/src/tools/rbd_mirror/ImageReplayer.cc +++ b/src/tools/rbd_mirror/ImageReplayer.cc @@ -156,7 +156,8 @@ private: template ImageReplayer::ImageReplayer(Threads *threads, RadosRef local, RadosRef remote, - const std::string &mirror_uuid, + const std::string &local_mirror_uuid, + const std::string &remote_mirror_uuid, int64_t local_pool_id, int64_t remote_pool_id, const std::string &remote_image_id, @@ -164,7 +165,8 @@ ImageReplayer::ImageReplayer(Threads *threads, RadosRef local, RadosRef remot m_threads(threads), m_local(local), m_remote(remote), - m_mirror_uuid(mirror_uuid), + m_local_mirror_uuid(local_mirror_uuid), + m_remote_mirror_uuid(remote_mirror_uuid), m_remote_pool_id(remote_pool_id), m_local_pool_id(local_pool_id), m_remote_image_id(remote_image_id), @@ -234,7 +236,7 @@ void ImageReplayer::start(Context *on_finish, m_remote_journaler = new Journaler(m_threads->work_queue, m_threads->timer, &m_threads->timer_lock, m_remote_ioctx, - m_remote_image_id, m_mirror_uuid, + m_remote_image_id, m_local_mirror_uuid, commit_interval); bootstrap(); @@ -252,7 +254,7 @@ void ImageReplayer::bootstrap() { m_local_ioctx, m_remote_ioctx, &m_local_image_ctx, m_local_image_name, m_remote_image_id, m_global_image_id, m_threads->work_queue, m_threads->timer, &m_threads->timer_lock, - m_mirror_uuid, m_remote_journaler, &m_client_meta, ctx); + m_local_mirror_uuid, m_remote_journaler, &m_client_meta, ctx); request->send(); } @@ -497,6 +499,8 @@ void ImageReplayer::on_stop_journal_replay_shut_down_finish(int r) assert(m_state == STATE_STOPPING); m_local_image_ctx->journal->stop_external_replay(); m_local_replay = nullptr; + m_replay_entry = ReplayEntry(); + m_replay_tag_valid = false; } on_stop_local_image_close_start(); @@ -569,12 +573,16 @@ void ImageReplayer::handle_replay_ready() return; } - if (!m_remote_journaler->try_pop_front(&m_replay_entry)) { + if (!m_remote_journaler->try_pop_front(&m_replay_entry, &m_replay_tag_tid)) { return; } - // TODO - process_entry(); + if (m_replay_tag_valid && m_replay_tag.tid == m_replay_tag_tid) { + process_entry(); + return; + } + + replay_flush(); } template @@ -592,6 +600,7 @@ void ImageReplayer::flush(Context *on_finish) } }); on_flush_local_replay_flush_start(ctx); + return; } } @@ -704,42 +713,99 @@ template void ImageReplayer::replay_flush() { dout(20) << dendl; - // TODO + Context *ctx = create_context_callback< + ImageReplayer, &ImageReplayer::handle_replay_flush>(this); + flush(ctx); } template void ImageReplayer::handle_replay_flush(int r) { dout(20) << "r=" << r << dendl; - // TODO + if (r < 0) { + derr << "replay flush encountered an error: " << cpp_strerror(r) << dendl; + handle_replay_complete(r); + return; + } + + get_remote_tag(); } template void ImageReplayer::get_remote_tag() { - dout(20) << dendl; + dout(20) << "tag_tid: " << m_replay_tag_tid << dendl; - // TODO + Context *ctx = create_context_callback< + ImageReplayer, &ImageReplayer::handle_get_remote_tag>(this); + m_remote_journaler->get_tag(m_replay_tag_tid, &m_replay_tag, ctx); } template void ImageReplayer::handle_get_remote_tag(int r) { dout(20) << "r=" << r << dendl; - // TODO + if (r == 0) { + try { + bufferlist::iterator it = m_replay_tag.data.begin(); + ::decode(m_replay_tag_data, it); + } catch (const buffer::error &err) { + r = -EBADMSG; + } + } + + if (r < 0) { + derr << "failed to retrieve remote tag " << m_replay_tag_tid << ": " + << cpp_strerror(r) << dendl; + handle_replay_complete(r); + return; + } + + m_replay_tag_valid = true; + dout(20) << "decoded remote tag " << m_replay_tag_tid << ": " + << m_replay_tag_data << dendl; + + allocate_local_tag(); } template void ImageReplayer::allocate_local_tag() { dout(20) << dendl; - // TODO + std::string mirror_uuid = m_replay_tag_data.mirror_uuid; + if (mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID || + mirror_uuid == m_local_mirror_uuid) { + mirror_uuid = m_remote_mirror_uuid; + } + + std::string predecessor_mirror_uuid = + m_replay_tag_data.predecessor_mirror_uuid; + if (predecessor_mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID) { + mirror_uuid = m_remote_mirror_uuid; + } else if (predecessor_mirror_uuid == m_local_mirror_uuid) { + predecessor_mirror_uuid = librbd::Journal<>::LOCAL_MIRROR_UUID; + } + + Context *ctx = create_context_callback< + ImageReplayer, &ImageReplayer::handle_allocate_local_tag>(this); + m_local_image_ctx->journal->allocate_tag( + mirror_uuid, predecessor_mirror_uuid, + m_replay_tag_data.predecessor_commit_valid, + m_replay_tag_data.predecessor_tag_tid, + m_replay_tag_data.predecessor_entry_tid, + ctx); } template void ImageReplayer::handle_allocate_local_tag(int r) { dout(20) << "r=" << r << dendl; - // TODO + if (r < 0) { + derr << "failed to allocate journal tag: " << cpp_strerror(r) << dendl; + handle_replay_complete(r); + return; + } + + process_entry(); } template diff --git a/src/tools/rbd_mirror/ImageReplayer.h b/src/tools/rbd_mirror/ImageReplayer.h index 9073a5b69ed..17315add7dc 100644 --- a/src/tools/rbd_mirror/ImageReplayer.h +++ b/src/tools/rbd_mirror/ImageReplayer.h @@ -67,7 +67,8 @@ public: }; ImageReplayer(Threads *threads, RadosRef local, RadosRef remote, - const std::string &mirror_uuid, int64_t local_pool_id, + const std::string &local_mirror_uuid, + const std::string &remote_mirror_uuid, int64_t local_pool_id, int64_t remote_pool_id, const std::string &remote_image_id, const std::string &global_image_id); virtual ~ImageReplayer(); @@ -181,7 +182,8 @@ private: Threads *m_threads; RadosRef m_local, m_remote; - std::string m_mirror_uuid; + std::string m_local_mirror_uuid; + std::string m_remote_mirror_uuid; int64_t m_remote_pool_id, m_local_pool_id; std::string m_remote_image_id, m_local_image_id, m_global_image_id; std::string m_local_image_name; @@ -203,6 +205,10 @@ private: librbd::journal::MirrorPeerClientMeta m_client_meta; ReplayEntry m_replay_entry; + bool m_replay_tag_valid = false; + uint64_t m_replay_tag_tid = 0; + cls::journal::Tag m_replay_tag; + librbd::journal::TagData m_replay_tag_data; struct C_ReplayCommitted : public Context { ImageReplayer *replayer; diff --git a/src/tools/rbd_mirror/Replayer.cc b/src/tools/rbd_mirror/Replayer.cc index 7935d269162..234a19cd1cb 100644 --- a/src/tools/rbd_mirror/Replayer.cc +++ b/src/tools/rbd_mirror/Replayer.cc @@ -319,22 +319,30 @@ void Replayer::set_sources(const PoolImageIds &pool_image_ids) continue; } - std::string mirror_uuid; - r = librbd::cls_client::mirror_uuid_get(&local_ioctx, &mirror_uuid); + std::string local_mirror_uuid; + r = librbd::cls_client::mirror_uuid_get(&local_ioctx, &local_mirror_uuid); if (r < 0) { - derr << "failed to retrieve mirror uuid from pool " + derr << "failed to retrieve local mirror uuid from pool " << local_ioctx.get_pool_name() << ": " << cpp_strerror(r) << dendl; continue; } + std::string remote_mirror_uuid; + r = librbd::cls_client::mirror_uuid_get(&remote_ioctx, &remote_mirror_uuid); + if (r < 0) { + derr << "failed to retrieve remote mirror uuid from pool " + << remote_ioctx.get_pool_name() << ": " << cpp_strerror(r) << dendl; + continue; + } + // create entry for pool if it doesn't exist auto &pool_replayers = m_images[pool_id]; for (const auto &image_id : kv.second) { auto it = pool_replayers.find(image_id.id); if (it == pool_replayers.end()) { unique_ptr > image_replayer(new ImageReplayer<>( - m_threads, m_local, m_remote, mirror_uuid, local_ioctx.get_id(), - pool_id, image_id.id, image_id.global_id)); + m_threads, m_local, m_remote, local_mirror_uuid, remote_mirror_uuid, + local_ioctx.get_id(), pool_id, image_id.id, image_id.global_id)); it = pool_replayers.insert( std::make_pair(image_id.id, std::move(image_replayer))).first; }