mirror of
https://github.com/ceph/ceph
synced 2025-01-01 08:32:24 +00:00
Merge pull request #34615 from dillaman/wip-45072
rbd-mirror: image replayer stop might race with instance replayer shut down Reviewed-by: Mykola Golub <mgolub@suse.com>
This commit is contained in:
commit
5e540f1191
@ -5,8 +5,7 @@
|
||||
#define CEPH_ASYNC_OP_TRACKER_H
|
||||
|
||||
#include "common/ceph_mutex.h"
|
||||
|
||||
struct Context;
|
||||
#include "include/Context.h"
|
||||
|
||||
class AsyncOpTracker {
|
||||
public:
|
||||
@ -27,4 +26,23 @@ private:
|
||||
|
||||
};
|
||||
|
||||
class C_TrackedOp : public Context {
|
||||
public:
|
||||
C_TrackedOp(AsyncOpTracker& async_op_tracker, Context* on_finish)
|
||||
: m_async_op_tracker(async_op_tracker), m_on_finish(on_finish) {
|
||||
m_async_op_tracker.start_op();
|
||||
}
|
||||
|
||||
void finish(int r) override {
|
||||
if (m_on_finish != nullptr) {
|
||||
m_on_finish->complete(r);
|
||||
}
|
||||
m_async_op_tracker.finish_op();
|
||||
}
|
||||
|
||||
private:
|
||||
AsyncOpTracker& m_async_op_tracker;
|
||||
Context* m_on_finish;
|
||||
};
|
||||
|
||||
#endif // CEPH_ASYNC_OP_TRACKER_H
|
||||
|
@ -90,7 +90,7 @@ struct ImageReplayer<librbd::MockTestImageCtx> {
|
||||
MOCK_METHOD0(destroy, void());
|
||||
MOCK_METHOD2(start, void(Context *, bool));
|
||||
MOCK_METHOD2(stop, void(Context *, bool));
|
||||
MOCK_METHOD0(restart, void());
|
||||
MOCK_METHOD1(restart, void(Context*));
|
||||
MOCK_METHOD0(flush, void());
|
||||
MOCK_METHOD1(print_status, void(Formatter *));
|
||||
MOCK_METHOD1(add_peer, void(const Peer<librbd::MockTestImageCtx>& peer));
|
||||
@ -201,7 +201,8 @@ TEST_F(TestMockInstanceReplayer, AcquireReleaseImage) {
|
||||
EXPECT_CALL(mock_image_replayer, is_stopped()).WillOnce(Return(true));
|
||||
EXPECT_CALL(mock_image_replayer, is_blacklisted()).WillOnce(Return(false));
|
||||
EXPECT_CALL(mock_image_replayer, is_finished()).WillOnce(Return(false));
|
||||
EXPECT_CALL(mock_image_replayer, start(nullptr, false));
|
||||
EXPECT_CALL(mock_image_replayer, start(_, false))
|
||||
.WillOnce(CompleteContext(0));
|
||||
expect_work_queue(mock_threads);
|
||||
|
||||
instance_replayer.acquire_image(&mock_instance_watcher, global_image_id,
|
||||
@ -271,7 +272,8 @@ TEST_F(TestMockInstanceReplayer, RemoveFinishedImage) {
|
||||
EXPECT_CALL(mock_image_replayer, is_stopped()).WillOnce(Return(true));
|
||||
EXPECT_CALL(mock_image_replayer, is_blacklisted()).WillOnce(Return(false));
|
||||
EXPECT_CALL(mock_image_replayer, is_finished()).WillOnce(Return(false));
|
||||
EXPECT_CALL(mock_image_replayer, start(nullptr, false));
|
||||
EXPECT_CALL(mock_image_replayer, start(_, false))
|
||||
.WillOnce(CompleteContext(0));
|
||||
expect_work_queue(mock_threads);
|
||||
|
||||
instance_replayer.acquire_image(&mock_instance_watcher, global_image_id,
|
||||
@ -344,7 +346,8 @@ TEST_F(TestMockInstanceReplayer, Reacquire) {
|
||||
EXPECT_CALL(mock_image_replayer, is_stopped()).WillOnce(Return(true));
|
||||
EXPECT_CALL(mock_image_replayer, is_blacklisted()).WillOnce(Return(false));
|
||||
EXPECT_CALL(mock_image_replayer, is_finished()).WillOnce(Return(false));
|
||||
EXPECT_CALL(mock_image_replayer, start(nullptr, false));
|
||||
EXPECT_CALL(mock_image_replayer, start(_, false))
|
||||
.WillOnce(CompleteContext(0));
|
||||
expect_work_queue(mock_threads);
|
||||
|
||||
C_SaferCond on_acquire1;
|
||||
@ -354,7 +357,8 @@ TEST_F(TestMockInstanceReplayer, Reacquire) {
|
||||
|
||||
// Re-acquire
|
||||
EXPECT_CALL(mock_image_replayer, set_finished(false));
|
||||
EXPECT_CALL(mock_image_replayer, restart());
|
||||
EXPECT_CALL(mock_image_replayer, restart(_))
|
||||
.WillOnce(CompleteContext(0));
|
||||
expect_work_queue(mock_threads);
|
||||
|
||||
C_SaferCond on_acquire2;
|
||||
|
@ -175,7 +175,7 @@ void InstanceReplayer<I>::acquire_image(InstanceWatcher<I> *instance_watcher,
|
||||
// detect if the image has been deleted while the leader was offline
|
||||
auto& image_replayer = it->second;
|
||||
image_replayer->set_finished(false);
|
||||
image_replayer->restart();
|
||||
image_replayer->restart(new C_TrackedOp(m_async_op_tracker, nullptr));
|
||||
}
|
||||
|
||||
m_threads->work_queue->queue(on_finish, 0);
|
||||
@ -224,7 +224,7 @@ void InstanceReplayer<I>::remove_peer_image(const std::string &global_image_id,
|
||||
// it will eventually detect that the peer image is missing and
|
||||
// determine if a delete propagation is required.
|
||||
auto image_replayer = it->second;
|
||||
image_replayer->restart();
|
||||
image_replayer->restart(new C_TrackedOp(m_async_op_tracker, nullptr));
|
||||
}
|
||||
m_threads->work_queue->queue(on_finish, 0);
|
||||
}
|
||||
@ -252,25 +252,21 @@ void InstanceReplayer<I>::start()
|
||||
|
||||
m_manual_stop = false;
|
||||
|
||||
auto cct = static_cast<CephContext *>(m_local_io_ctx.cct());
|
||||
auto gather_ctx = new C_Gather(
|
||||
cct, new C_TrackedOp(m_async_op_tracker, nullptr));
|
||||
for (auto &kv : m_image_replayers) {
|
||||
auto &image_replayer = kv.second;
|
||||
image_replayer->start(nullptr, true);
|
||||
image_replayer->start(gather_ctx->new_sub(), true);
|
||||
}
|
||||
|
||||
gather_ctx->activate();
|
||||
}
|
||||
|
||||
template <typename I>
|
||||
void InstanceReplayer<I>::stop()
|
||||
{
|
||||
dout(10) << dendl;
|
||||
|
||||
std::lock_guard locker{m_lock};
|
||||
|
||||
m_manual_stop = true;
|
||||
|
||||
for (auto &kv : m_image_replayers) {
|
||||
auto &image_replayer = kv.second;
|
||||
image_replayer->stop(nullptr, true);
|
||||
}
|
||||
stop(nullptr);
|
||||
}
|
||||
|
||||
template <typename I>
|
||||
@ -279,7 +275,8 @@ void InstanceReplayer<I>::stop(Context *on_finish)
|
||||
dout(10) << dendl;
|
||||
|
||||
auto cct = static_cast<CephContext *>(m_local_io_ctx.cct());
|
||||
auto gather_ctx = new C_Gather(cct, on_finish);
|
||||
auto gather_ctx = new C_Gather(
|
||||
cct, new C_TrackedOp(m_async_op_tracker, on_finish));
|
||||
{
|
||||
std::lock_guard locker{m_lock};
|
||||
|
||||
@ -305,7 +302,7 @@ void InstanceReplayer<I>::restart()
|
||||
|
||||
for (auto &kv : m_image_replayers) {
|
||||
auto &image_replayer = kv.second;
|
||||
image_replayer->restart();
|
||||
image_replayer->restart(new C_TrackedOp(m_async_op_tracker, nullptr));
|
||||
}
|
||||
}
|
||||
|
||||
@ -347,7 +344,7 @@ void InstanceReplayer<I>::start_image_replayer(
|
||||
}
|
||||
|
||||
dout(10) << "global_image_id=" << global_image_id << dendl;
|
||||
image_replayer->start(nullptr, false);
|
||||
image_replayer->start(new C_TrackedOp(m_async_op_tracker, nullptr), false);
|
||||
}
|
||||
|
||||
template <typename I>
|
||||
|
@ -75,22 +75,6 @@ struct Replayer<I>::C_ReplayCommitted : public Context {
|
||||
}
|
||||
};
|
||||
|
||||
template <typename I>
|
||||
struct Replayer<I>::C_TrackedOp : public Context {
|
||||
Replayer *replayer;
|
||||
Context* ctx;
|
||||
|
||||
C_TrackedOp(Replayer* replayer, Context* ctx)
|
||||
: replayer(replayer), ctx(ctx) {
|
||||
replayer->m_in_flight_op_tracker.start_op();
|
||||
}
|
||||
|
||||
void finish(int r) override {
|
||||
ctx->complete(r);
|
||||
replayer->m_in_flight_op_tracker.finish_op();
|
||||
}
|
||||
};
|
||||
|
||||
template <typename I>
|
||||
struct Replayer<I>::RemoteJournalerListener
|
||||
: public ::journal::JournalMetadataListener {
|
||||
@ -99,7 +83,9 @@ struct Replayer<I>::RemoteJournalerListener
|
||||
RemoteJournalerListener(Replayer* replayer) : replayer(replayer) {}
|
||||
|
||||
void handle_update(::journal::JournalMetadata*) override {
|
||||
auto ctx = new C_TrackedOp(replayer, new LambdaContext([this](int r) {
|
||||
auto ctx = new C_TrackedOp(
|
||||
replayer->m_in_flight_op_tracker,
|
||||
new LambdaContext([this](int r) {
|
||||
replayer->handle_remote_journal_metadata_updated();
|
||||
}));
|
||||
replayer->m_threads->work_queue->queue(ctx, 0);
|
||||
@ -247,7 +233,7 @@ template <typename I>
|
||||
void Replayer<I>::flush(Context* on_finish) {
|
||||
dout(10) << dendl;
|
||||
|
||||
flush_local_replay(new C_TrackedOp(this, on_finish));
|
||||
flush_local_replay(new C_TrackedOp(m_in_flight_op_tracker, on_finish));
|
||||
}
|
||||
|
||||
template <typename I>
|
||||
@ -264,7 +250,7 @@ bool Replayer<I>::get_replay_status(std::string* description,
|
||||
return false;
|
||||
}
|
||||
|
||||
on_finish = new C_TrackedOp(this, on_finish);
|
||||
on_finish = new C_TrackedOp(m_in_flight_op_tracker, on_finish);
|
||||
return m_replay_status_formatter->get_or_send_update(description,
|
||||
on_finish);
|
||||
}
|
||||
@ -1136,7 +1122,7 @@ void Replayer<I>::notify_status_updated() {
|
||||
|
||||
dout(10) << dendl;
|
||||
|
||||
auto ctx = new C_TrackedOp(this, new LambdaContext(
|
||||
auto ctx = new C_TrackedOp(m_in_flight_op_tracker, new LambdaContext(
|
||||
[this](int) {
|
||||
m_replayer_listener->handle_notification();
|
||||
}));
|
||||
|
@ -174,7 +174,6 @@ private:
|
||||
};
|
||||
|
||||
struct C_ReplayCommitted;
|
||||
struct C_TrackedOp;
|
||||
struct RemoteJournalerListener;
|
||||
struct RemoteReplayHandler;
|
||||
struct LocalJournalListener;
|
||||
|
@ -87,22 +87,6 @@ struct Replayer<I>::C_UpdateWatchCtx : public librbd::UpdateWatchCtx {
|
||||
}
|
||||
};
|
||||
|
||||
template <typename I>
|
||||
struct Replayer<I>::C_TrackedOp : public Context {
|
||||
Replayer *replayer;
|
||||
Context* ctx;
|
||||
|
||||
C_TrackedOp(Replayer* replayer, Context* ctx)
|
||||
: replayer(replayer), ctx(ctx) {
|
||||
replayer->m_in_flight_op_tracker.start_op();
|
||||
}
|
||||
|
||||
void finish(int r) override {
|
||||
ctx->complete(r);
|
||||
replayer->m_in_flight_op_tracker.finish_op();
|
||||
}
|
||||
};
|
||||
|
||||
template <typename I>
|
||||
struct Replayer<I>::DeepCopyHandler : public librbd::deep_copy::Handler {
|
||||
Replayer *replayer;
|
||||
@ -1101,7 +1085,8 @@ void Replayer<I>::update_non_primary_snapshot(bool complete) {
|
||||
&op, m_local_snap_id_end, m_local_mirror_snap_ns.complete,
|
||||
m_local_mirror_snap_ns.last_copied_object_number);
|
||||
|
||||
auto ctx = new C_TrackedOp(this, new LambdaContext([this, complete](int r) {
|
||||
auto ctx = new C_TrackedOp(
|
||||
m_in_flight_op_tracker, new LambdaContext([this, complete](int r) {
|
||||
handle_update_non_primary_snapshot(complete, r);
|
||||
}));
|
||||
auto aio_comp = create_rados_callback(ctx);
|
||||
@ -1419,7 +1404,7 @@ void Replayer<I>::notify_status_updated() {
|
||||
ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
|
||||
|
||||
dout(10) << dendl;
|
||||
auto ctx = new C_TrackedOp(this, new LambdaContext(
|
||||
auto ctx = new C_TrackedOp(m_in_flight_op_tracker, new LambdaContext(
|
||||
[this](int) {
|
||||
m_replayer_listener->handle_notification();
|
||||
}));
|
||||
|
@ -189,7 +189,6 @@ private:
|
||||
};
|
||||
|
||||
struct C_UpdateWatchCtx;
|
||||
struct C_TrackedOp;
|
||||
struct DeepCopyHandler;
|
||||
|
||||
Threads<ImageCtxT>* m_threads;
|
||||
|
Loading…
Reference in New Issue
Block a user