From 330dba00ba3153ba2862eef52714e0dceae05192 Mon Sep 17 00:00:00 2001 From: Mykola Golub Date: Wed, 27 Jul 2016 13:45:32 +0300 Subject: [PATCH] rbd-mirror: stop replay when client is disconnected Signed-off-by: Mykola Golub --- qa/workunits/rbd/rbd_mirror.sh | 47 +++++++++++++ qa/workunits/rbd/rbd_mirror_helpers.sh | 38 +++++++++-- src/test/journal/mock/MockJournaler.h | 8 ++- src/test/rbd_mirror/test_ImageReplayer.cc | 68 +++++++++++++++++++ src/tools/rbd_mirror/ImageReplayer.cc | 67 ++++++++++++++++-- src/tools/rbd_mirror/ImageReplayer.h | 15 +++- .../image_replayer/BootstrapRequest.cc | 6 ++ 7 files changed, 234 insertions(+), 15 deletions(-) diff --git a/qa/workunits/rbd/rbd_mirror.sh b/qa/workunits/rbd/rbd_mirror.sh index f071c64e12e..71f8d7efae1 100755 --- a/qa/workunits/rbd/rbd_mirror.sh +++ b/qa/workunits/rbd/rbd_mirror.sh @@ -256,4 +256,51 @@ wait_for_image_replay_started ${CLUSTER1} ${POOL} ${image} test_status_in_pool_dir ${CLUSTER1} ${POOL} ${image} 'up+replaying' 'master_position' compare_images ${POOL} ${image} +testlog "TEST: client disconnect" +image=laggy +create_image ${CLUSTER2} ${POOL} ${image} 128 --journal-object-size 64K +write_image ${CLUSTER2} ${POOL} ${image} 10 + +testlog " - replay stopped after disconnect" +wait_for_image_replay_started ${CLUSTER1} ${POOL} ${image} +wait_for_replay_complete ${CLUSTER1} ${CLUSTER2} ${POOL} ${image} +test -n "$(get_mirror_position ${CLUSTER2} ${POOL} ${image})" +disconnect_image ${CLUSTER2} ${POOL} ${image} +test -z "$(get_mirror_position ${CLUSTER2} ${POOL} ${image})" +wait_for_image_replay_stopped ${CLUSTER1} ${POOL} ${image} +test_status_in_pool_dir ${CLUSTER1} ${POOL} ${image} 'up+error' 'disconnected' + +testlog " - replay started after resync requested" +request_resync_image ${CLUSTER1} ${POOL} ${image} +wait_for_image_present ${CLUSTER1} ${POOL} ${image} 'deleted' +wait_for_image_replay_started ${CLUSTER1} ${POOL} ${image} +wait_for_replay_complete ${CLUSTER1} ${CLUSTER2} ${POOL} ${image} +test -n "$(get_mirror_position ${CLUSTER2} ${POOL} ${image})" +compare_images ${POOL} ${image} + +testlog " - disconnected after max_concurrent_object_sets reached" +admin_daemon ${CLUSTER1} rbd mirror stop ${POOL}/${image} +wait_for_image_replay_stopped ${CLUSTER1} ${POOL} ${image} +test -n "$(get_mirror_position ${CLUSTER2} ${POOL} ${image})" +set_image_meta ${CLUSTER2} ${POOL} ${image} \ + conf_rbd_journal_max_concurrent_object_sets 1 +write_image ${CLUSTER2} ${POOL} ${image} 20 16384 +write_image ${CLUSTER2} ${POOL} ${image} 20 16384 +test -z "$(get_mirror_position ${CLUSTER2} ${POOL} ${image})" +set_image_meta ${CLUSTER2} ${POOL} ${image} \ + conf_rbd_journal_max_concurrent_object_sets 0 + +testlog " - replay is still stopped (disconnected) after restart" +admin_daemon ${CLUSTER1} rbd mirror start ${POOL}/${image} +wait_for_image_replay_stopped ${CLUSTER1} ${POOL} ${image} +test_status_in_pool_dir ${CLUSTER1} ${POOL} ${image} 'up+error' 'disconnected' + +testlog " - replay started after resync requested" +request_resync_image ${CLUSTER1} ${POOL} ${image} +wait_for_image_present ${CLUSTER1} ${POOL} ${image} 'deleted' +wait_for_image_replay_started ${CLUSTER1} ${POOL} ${image} +wait_for_replay_complete ${CLUSTER1} ${CLUSTER2} ${POOL} ${image} +test -n "$(get_mirror_position ${CLUSTER2} ${POOL} ${image})" +compare_images ${POOL} ${image} + echo OK diff --git a/qa/workunits/rbd/rbd_mirror_helpers.sh b/qa/workunits/rbd/rbd_mirror_helpers.sh index 8a362a16fbc..2b3e5807394 100755 --- a/qa/workunits/rbd/rbd_mirror_helpers.sh +++ b/qa/workunits/rbd/rbd_mirror_helpers.sh @@ -418,7 +418,7 @@ get_position() local status_log=${TEMPDIR}/${CLUSTER2}-${pool}-${image}.status rbd --cluster ${cluster} -p ${pool} journal status --image ${image} | tee ${status_log} >&2 - sed -nEe 's/^.*\[id='"${id_regexp}"',.*positions=\[\[([^]]*)\],.*$/\1/p' \ + sed -nEe 's/^.*\[id='"${id_regexp}"',.*positions=\[\[([^]]*)\],.*state=connected.*$/\1/p' \ ${status_log} } @@ -488,13 +488,30 @@ test_status_in_pool_dir() } create_image() +{ + local cluster=$1 ; shift + local pool=$1 ; shift + local image=$1 ; shift + local size=128 + + if [ -n "$1" ]; then + size=$1 + shift + fi + + rbd --cluster ${cluster} -p ${pool} create --size ${size} \ + --image-feature layering,exclusive-lock,journaling $@ ${image} +} + +set_image_meta() { local cluster=$1 local pool=$2 local image=$3 + local key=$4 + local val=$5 - rbd --cluster ${cluster} -p ${pool} create --size 128 \ - --image-feature layering,exclusive-lock,journaling ${image} + rbd --cluster ${cluster} -p ${pool} image-meta set ${image} $key $val } remove_image() @@ -532,6 +549,16 @@ clone_image() ${clone_pool}/${clone_image} --image-feature layering,exclusive-lock,journaling } +disconnect_image() +{ + local cluster=$1 + local pool=$2 + local image=$3 + + rbd --cluster ${cluster} -p ${pool} journal client disconnect \ + --image ${image} +} + create_snapshot() { local cluster=$1 @@ -614,9 +641,12 @@ write_image() local pool=$2 local image=$3 local count=$4 + local size=$5 + + test -n "${size}" || size=4096 rbd --cluster ${cluster} -p ${pool} bench-write ${image} \ - --io-size 4096 --io-threads 1 --io-total $((4096 * count)) \ + --io-size ${size} --io-threads 1 --io-total $((size * count)) \ --io-pattern rand } diff --git a/src/test/journal/mock/MockJournaler.h b/src/test/journal/mock/MockJournaler.h index f4eceebe8e6..54867666cd9 100644 --- a/src/test/journal/mock/MockJournaler.h +++ b/src/test/journal/mock/MockJournaler.h @@ -102,6 +102,7 @@ struct MockJournaler { Context*)); MOCK_METHOD2(register_client, void(const bufferlist &, Context *)); + MOCK_METHOD1(unregister_client, void(Context *)); MOCK_METHOD3(get_client, void(const std::string &, cls::journal::Client *, Context *)); MOCK_METHOD2(get_cached_client, int(const std::string&, cls::journal::Client*)); @@ -159,9 +160,6 @@ struct MockJournalerProxy { int register_client(const bufferlist &data) { return -EINVAL; } - void unregister_client(Context *ctx) { - ctx->complete(-EINVAL); - } void allocate_tag(uint64_t, const bufferlist &, cls::journal::Tag*, Context *on_finish) { @@ -196,6 +194,10 @@ struct MockJournalerProxy { MockJournaler::get_instance().register_client(data, on_finish); } + void unregister_client(Context *on_finish) { + MockJournaler::get_instance().unregister_client(on_finish); + } + void get_client(const std::string &client_id, cls::journal::Client *client, Context *on_finish) { MockJournaler::get_instance().get_client(client_id, client, on_finish); diff --git a/src/test/rbd_mirror/test_ImageReplayer.cc b/src/test/rbd_mirror/test_ImageReplayer.cc index 94ce1ef3f72..49592c3774b 100644 --- a/src/test/rbd_mirror/test_ImageReplayer.cc +++ b/src/test/rbd_mirror/test_ImageReplayer.cc @@ -233,6 +233,9 @@ public: std::set::const_iterator c; for (c = registered_clients.begin(); c != registered_clients.end(); c++) { std::cout << __func__ << ": client: " << *c << std::endl; + if (c->state != cls::journal::CLIENT_STATE_CONNECTED) { + continue; + } cls::journal::ObjectPositions object_positions = c->commit_position.object_positions; cls::journal::ObjectPositions::const_iterator p = @@ -822,3 +825,68 @@ TEST_F(TestImageReplayer, MultipleReplayFailures_MultiEpoch) { close_image(ictx); } +TEST_F(TestImageReplayer, Disconnect) +{ + bootstrap(); + + // Test start fails if disconnected + + librbd::ImageCtx *ictx; + + generate_test_data(); + open_remote_image(&ictx); + for (int i = 0; i < TEST_IO_COUNT; ++i) { + write_test_data(ictx, m_test_data, TEST_IO_SIZE * i, TEST_IO_SIZE); + } + flush(ictx); + close_image(ictx); + + std::string oid = ::journal::Journaler::header_oid(m_remote_image_id); + ASSERT_EQ(0, cls::journal::client::client_update_state(m_remote_ioctx, oid, + m_local_mirror_uuid, cls::journal::CLIENT_STATE_DISCONNECTED)); + + C_SaferCond cond1; + m_replayer->start(&cond1); + ASSERT_EQ(-ENOTCONN, cond1.wait()); + + // Test start succeeds after resync + + open_local_image(&ictx); + librbd::Journal<>::request_resync(ictx); + close_image(ictx); + C_SaferCond cond2; + m_replayer->start(&cond2); + ASSERT_EQ(-ENOTCONN, cond2.wait()); + C_SaferCond delete_cond; + m_image_deleter->wait_for_scheduled_deletion( + m_local_ioctx.get_id(), m_replayer->get_global_image_id(), &delete_cond); + EXPECT_EQ(0, delete_cond.wait()); + + start(); + wait_for_replay_complete(); + + // Test replay stopped after disconnect + + open_remote_image(&ictx); + for (int i = TEST_IO_COUNT; i < 2 * TEST_IO_COUNT; ++i) { + write_test_data(ictx, m_test_data, TEST_IO_SIZE * i, TEST_IO_SIZE); + } + flush(ictx); + close_image(ictx); + + ASSERT_EQ(0, cls::journal::client::client_update_state(m_remote_ioctx, oid, + m_local_mirror_uuid, cls::journal::CLIENT_STATE_DISCONNECTED)); + bufferlist bl; + ASSERT_EQ(0, m_remote_ioctx.notify2(oid, bl, 5000, NULL)); + + wait_for_stopped(); + + // Test start fails after disconnect + + C_SaferCond cond3; + m_replayer->start(&cond3); + ASSERT_EQ(-ENOTCONN, cond3.wait()); + C_SaferCond cond4; + m_replayer->start(&cond4); + ASSERT_EQ(-ENOTCONN, cond4.wait()); +} diff --git a/src/tools/rbd_mirror/ImageReplayer.cc b/src/tools/rbd_mirror/ImageReplayer.cc index 3a78457c0d7..6d3a6154d6d 100644 --- a/src/tools/rbd_mirror/ImageReplayer.cc +++ b/src/tools/rbd_mirror/ImageReplayer.cc @@ -251,6 +251,15 @@ void ImageReplayer::BootstrapProgressContext::update_progress( } } +template +void ImageReplayer::RemoteJournalerListener::handle_update( + ::journal::JournalMetadata *) { + FunctionContext *ctx = new FunctionContext([this](int r) { + replayer->handle_remote_journal_metadata_updated(); + }); + replayer->m_threads->work_queue->queue(ctx, 0); +} + template ImageReplayer::ImageReplayer(Threads *threads, shared_ptr image_deleter, @@ -277,7 +286,8 @@ ImageReplayer::ImageReplayer(Threads *threads, m_lock("rbd::mirror::ImageReplayer " + stringify(remote_pool_id) + " " + remote_image_id), m_progress_cxt(this), - m_resync_listener(new ResyncListener(this)) + m_resync_listener(new ResyncListener(this)), + m_remote_listener(this) { // Register asok commands using a temporary "remote_pool_name/global_image_id" // name. When the image name becomes known on start the asok commands will be @@ -509,6 +519,23 @@ void ImageReplayer::handle_init_remote_journaler(int r) { return; } + m_remote_journaler->add_listener(&m_remote_listener); + + cls::journal::Client client; + r = m_remote_journaler->get_cached_client(m_local_mirror_uuid, &client); + if (r < 0) { + derr << "error retrieving remote journal client: " << cpp_strerror(r) + << dendl; + on_start_fail(r, "error retrieving remote journal client"); + return; + } + + if (client.state != cls::journal::CLIENT_STATE_CONNECTED) { + dout(5) << "client flagged disconnected, stopping image replay" << dendl; + on_start_fail(-ENOTCONN, "disconnected"); + return; + } + start_replay(); } @@ -637,15 +664,18 @@ bool ImageReplayer::on_start_interrupted() } template -void ImageReplayer::stop(Context *on_finish, bool manual) +void ImageReplayer::stop(Context *on_finish, bool manual, int r, + const std::string& desc) { - dout(20) << "on_finish=" << on_finish << dendl; + dout(20) << "on_finish=" << on_finish << ", manual=" << manual + << ", desc=" << desc << dendl; image_replayer::BootstrapRequest *bootstrap_request = nullptr; bool shut_down_replay = false; bool running = true; { Mutex::Locker locker(m_lock); + if (!is_running_()) { running = false; } else { @@ -684,14 +714,14 @@ void ImageReplayer::stop(Context *on_finish, bool manual) } if (shut_down_replay) { - on_stop_journal_replay(); + on_stop_journal_replay(r, desc); } else if (on_finish != nullptr) { on_finish->complete(0); } } template -void ImageReplayer::on_stop_journal_replay() +void ImageReplayer::on_stop_journal_replay(int r, const std::string &desc) { dout(20) << "enter" << dendl; @@ -705,7 +735,7 @@ void ImageReplayer::on_stop_journal_replay() m_state = STATE_STOPPING; } - set_state_description(0, ""); + set_state_description(r, desc); update_mirror_image_status(false, boost::none); reschedule_update_status_task(-1); shut_down(0); @@ -1343,6 +1373,7 @@ void ImageReplayer::shut_down(int r) { ctx->complete(0); }); ctx = new FunctionContext([this, ctx](int r) { + m_remote_journaler->remove_listener(&m_remote_listener); m_remote_journaler->shut_down(ctx); }); if (m_stopping_for_resync) { @@ -1443,6 +1474,30 @@ void ImageReplayer::handle_shut_down(int r) { } } +template +void ImageReplayer::handle_remote_journal_metadata_updated() { + dout(20) << dendl; + + cls::journal::Client client; + { + Mutex::Locker locker(m_lock); + if (!is_running_()) { + return; + } + + int r = m_remote_journaler->get_cached_client(m_local_mirror_uuid, &client); + if (r < 0) { + derr << "failed to retrieve client: " << cpp_strerror(r) << dendl; + return; + } + } + + if (client.state != cls::journal::CLIENT_STATE_CONNECTED) { + dout(0) << "client flagged disconnected, stopping image replay" << dendl; + stop(nullptr, false, -ENOTCONN, "disconnected"); + } +} + template std::string ImageReplayer::to_string(const State state) { switch (state) { diff --git a/src/tools/rbd_mirror/ImageReplayer.h b/src/tools/rbd_mirror/ImageReplayer.h index c0c60cfa510..ba81deaa1ee 100644 --- a/src/tools/rbd_mirror/ImageReplayer.h +++ b/src/tools/rbd_mirror/ImageReplayer.h @@ -14,6 +14,7 @@ #include "include/rados/librados.hpp" #include "cls/journal/cls_journal_types.h" #include "cls/rbd/cls_rbd_types.h" +#include "journal/JournalMetadataListener.h" #include "journal/ReplayEntry.h" #include "librbd/ImageCtx.h" #include "librbd/journal/Types.h" @@ -111,7 +112,8 @@ public: } void start(Context *on_finish = nullptr, bool manual = false); - void stop(Context *on_finish = nullptr, bool manual = false); + void stop(Context *on_finish = nullptr, bool manual = false, + int r = 0, const std::string& desc = ""); void restart(Context *on_finish = nullptr); void flush(Context *on_finish = nullptr); @@ -190,7 +192,7 @@ protected: virtual void on_start_fail(int r, const std::string &desc = ""); virtual bool on_start_interrupted(); - virtual void on_stop_journal_replay(); + virtual void on_stop_journal_replay(int r = 0, const std::string &desc = ""); virtual void on_flush_local_replay_flush_start(Context *on_flush); virtual void on_flush_local_replay_flush_finish(Context *on_flush, int r); @@ -268,6 +270,14 @@ private: librbd::journal::TagData m_replay_tag_data; librbd::journal::EventEntry m_event_entry; + struct RemoteJournalerListener : public ::journal::JournalMetadataListener { + ImageReplayer *replayer; + + RemoteJournalerListener(ImageReplayer *replayer) : replayer(replayer) { } + + void handle_update(::journal::JournalMetadata *); + } m_remote_listener; + struct C_ReplayCommitted : public Context { ImageReplayer *replayer; ReplayEntry replay_entry; @@ -307,6 +317,7 @@ private: void shut_down(int r); void handle_shut_down(int r); + void handle_remote_journal_metadata_updated(); void bootstrap(); void handle_bootstrap(int r); diff --git a/src/tools/rbd_mirror/image_replayer/BootstrapRequest.cc b/src/tools/rbd_mirror/image_replayer/BootstrapRequest.cc index 13004ebbe0d..15e76091a77 100644 --- a/src/tools/rbd_mirror/image_replayer/BootstrapRequest.cc +++ b/src/tools/rbd_mirror/image_replayer/BootstrapRequest.cc @@ -370,6 +370,12 @@ void BootstrapRequest::handle_open_local_image(int r) { m_ret_val = r; close_remote_image(); return; + } if (m_client.state == cls::journal::CLIENT_STATE_DISCONNECTED) { + dout(10) << ": client flagged disconnected -- skipping bootstrap" << dendl; + // The caller is expected to detect disconnect initializing remote journal. + m_ret_val = 0; + close_remote_image(); + return; } update_client_image();