mirror of
https://github.com/ceph/ceph
synced 2025-02-22 18:47:18 +00:00
Merge pull request #34930 from trociny/wip-45409
rbd-mirror: wait for events to replay before shut down journal replay Reviewed-by: Jason Dillaman <dillaman@redhat.com>
This commit is contained in:
commit
e2d9c0a1b3
@ -1088,12 +1088,14 @@ TEST_F(TestMockImageReplayerJournalReplayer, Replay) {
|
||||
EXPECT_CALL(mock_local_journal_replay, decode(_, _)).WillOnce(Return(0));
|
||||
expect_preprocess(mock_event_preprocessor, false, 0);
|
||||
expect_process(mock_local_journal_replay, 0, 0);
|
||||
EXPECT_CALL(mock_replay_status_formatter, handle_entry_processed(_));
|
||||
|
||||
// the next event with preprocess
|
||||
expect_try_pop_front(mock_remote_journaler, tag.tid, true);
|
||||
EXPECT_CALL(mock_local_journal_replay, decode(_, _)).WillOnce(Return(0));
|
||||
expect_preprocess(mock_event_preprocessor, true, 0);
|
||||
expect_process(mock_local_journal_replay, 0, 0);
|
||||
EXPECT_CALL(mock_replay_status_formatter, handle_entry_processed(_));
|
||||
|
||||
// attempt to process the next event
|
||||
C_SaferCond replay_ctx;
|
||||
@ -1249,6 +1251,7 @@ TEST_F(TestMockImageReplayerJournalReplayer, DelayedReplay) {
|
||||
ReturnArg<1>()));
|
||||
expect_preprocess(mock_event_preprocessor, false, 0);
|
||||
expect_process(mock_local_journal_replay, 0, 0);
|
||||
EXPECT_CALL(mock_replay_status_formatter, handle_entry_processed(_));
|
||||
|
||||
// attempt to process the next event
|
||||
C_SaferCond replay_ctx;
|
||||
@ -1830,6 +1833,7 @@ TEST_F(TestMockImageReplayerJournalReplayer, AllocateTagDemotion) {
|
||||
EXPECT_CALL(mock_local_journal_replay, decode(_, _)).WillOnce(Return(0));
|
||||
expect_preprocess(mock_event_preprocessor, false, 0);
|
||||
expect_process(mock_local_journal_replay, 0, 0);
|
||||
EXPECT_CALL(mock_replay_status_formatter, handle_entry_processed(_));
|
||||
|
||||
remote_replay_handler->handle_entries_available();
|
||||
wait_for_notification();
|
||||
@ -2017,6 +2021,7 @@ TEST_F(TestMockImageReplayerJournalReplayer, ProcessError) {
|
||||
EXPECT_CALL(mock_local_journal_replay, decode(_, _)).WillOnce(Return(0));
|
||||
expect_preprocess(mock_event_preprocessor, false, 0);
|
||||
expect_process(mock_local_journal_replay, 0, -EINVAL);
|
||||
EXPECT_CALL(mock_replay_status_formatter, handle_entry_processed(_));
|
||||
|
||||
// attempt to process the next event
|
||||
C_SaferCond replay_ctx;
|
||||
@ -2089,6 +2094,7 @@ TEST_F(TestMockImageReplayerJournalReplayer, ImageNameUpdated) {
|
||||
EXPECT_CALL(mock_local_journal_replay, decode(_, _)).WillOnce(Return(0));
|
||||
expect_preprocess(mock_event_preprocessor, false, 0);
|
||||
expect_process(mock_local_journal_replay, 0, 0);
|
||||
EXPECT_CALL(mock_replay_status_formatter, handle_entry_processed(_));
|
||||
|
||||
// attempt to process the next event
|
||||
C_SaferCond replay_ctx;
|
||||
|
@ -226,7 +226,7 @@ void Replayer<I>::shut_down(Context* on_finish) {
|
||||
|
||||
cancel_delayed_preprocess_task();
|
||||
cancel_flush_local_replay_task();
|
||||
shut_down_local_journal_replay();
|
||||
wait_for_event_replay();
|
||||
}
|
||||
|
||||
template <typename I>
|
||||
@ -404,12 +404,31 @@ bool Replayer<I>::notify_init_complete(std::unique_lock<ceph::mutex>& locker) {
|
||||
return true;
|
||||
}
|
||||
|
||||
template <typename I>
|
||||
void Replayer<I>::wait_for_event_replay() {
|
||||
ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
|
||||
|
||||
dout(10) << dendl;
|
||||
auto ctx = create_async_context_callback(
|
||||
m_threads->work_queue, create_context_callback<
|
||||
Replayer<I>, &Replayer<I>::handle_wait_for_event_replay>(this));
|
||||
m_event_replay_tracker.wait_for_ops(ctx);
|
||||
}
|
||||
|
||||
template <typename I>
|
||||
void Replayer<I>::handle_wait_for_event_replay(int r) {
|
||||
dout(10) << "r=" << r << dendl;
|
||||
|
||||
std::unique_lock locker{m_lock};
|
||||
shut_down_local_journal_replay();
|
||||
}
|
||||
|
||||
template <typename I>
|
||||
void Replayer<I>::shut_down_local_journal_replay() {
|
||||
ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
|
||||
|
||||
if (m_local_journal_replay == nullptr) {
|
||||
wait_for_event_replay();
|
||||
close_local_image();
|
||||
return;
|
||||
}
|
||||
|
||||
@ -429,25 +448,6 @@ void Replayer<I>::handle_shut_down_local_journal_replay(int r) {
|
||||
handle_replay_error(r, "failed to shut down local journal replay");
|
||||
}
|
||||
|
||||
wait_for_event_replay();
|
||||
}
|
||||
|
||||
template <typename I>
|
||||
void Replayer<I>::wait_for_event_replay() {
|
||||
ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
|
||||
|
||||
dout(10) << dendl;
|
||||
auto ctx = create_async_context_callback(
|
||||
m_threads->work_queue, create_context_callback<
|
||||
Replayer<I>, &Replayer<I>::handle_wait_for_event_replay>(this));
|
||||
m_event_replay_tracker.wait_for_ops(ctx);
|
||||
}
|
||||
|
||||
template <typename I>
|
||||
void Replayer<I>::handle_wait_for_event_replay(int r) {
|
||||
dout(10) << "r=" << r << dendl;
|
||||
|
||||
std::unique_lock locker{m_lock};
|
||||
close_local_image();
|
||||
}
|
||||
|
||||
|
@ -145,10 +145,10 @@ private:
|
||||
* REPLAY_COMPLETE < * * * * * * * * * * * * * * * * * * * *
|
||||
* | *
|
||||
* v *
|
||||
* SHUT_DOWN_LOCAL_JOURNAL_REPLAY *
|
||||
* WAIT_FOR_REPLAY *
|
||||
* | *
|
||||
* v *
|
||||
* WAIT_FOR_REPLAY *
|
||||
* SHUT_DOWN_LOCAL_JOURNAL_REPLAY *
|
||||
* | *
|
||||
* v *
|
||||
* CLOSE_LOCAL_IMAGE < * * * * * * * * * * * * * * * * * * * *
|
||||
|
Loading…
Reference in New Issue
Block a user