rbd-mirror: preprocess journal events prior to applying

Fixes: http://tracker.ceph.com/issues/16622
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
This commit is contained in:
Jason Dillaman 2016-07-11 15:32:45 -04:00
parent fdfca55737
commit 4df913d10b
3 changed files with 82 additions and 13 deletions

View File

@ -6,6 +6,7 @@
#include "tools/rbd_mirror/ImageReplayer.h"
#include "tools/rbd_mirror/image_replayer/BootstrapRequest.h"
#include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
#include "tools/rbd_mirror/image_replayer/EventPreprocessor.h"
#include "tools/rbd_mirror/ImageSyncThrottler.h"
#include "test/journal/mock/MockJournaler.h"
#include "test/librbd/mock/MockImageCtx.h"
@ -116,6 +117,28 @@ struct CloseImageRequest<librbd::MockTestImageCtx> {
MOCK_METHOD0(send, void());
};
template<>
struct EventPreprocessor<librbd::MockTestImageCtx> {
static EventPreprocessor *s_instance;
static EventPreprocessor *create(librbd::MockTestImageCtx &local_image_ctx,
::journal::MockJournalerProxy &remote_journaler,
const std::string &local_mirror_uuid,
librbd::journal::MirrorPeerClientMeta *client_meta,
ContextWQ *work_queue) {
assert(s_instance != nullptr);
return s_instance;
}
EventPreprocessor() {
assert(s_instance == nullptr);
s_instance = this;
}
MOCK_METHOD1(is_required, bool(const librbd::journal::EventEntry &));
MOCK_METHOD2(preprocess, void(librbd::journal::EventEntry *, Context *));
};
template<>
struct ReplayStatusFormatter<librbd::MockTestImageCtx> {
static ReplayStatusFormatter* s_instance;
@ -136,6 +159,7 @@ struct ReplayStatusFormatter<librbd::MockTestImageCtx> {
BootstrapRequest<librbd::MockTestImageCtx>* BootstrapRequest<librbd::MockTestImageCtx>::s_instance = nullptr;
CloseImageRequest<librbd::MockTestImageCtx>* CloseImageRequest<librbd::MockTestImageCtx>::s_instance = nullptr;
EventPreprocessor<librbd::MockTestImageCtx>* EventPreprocessor<librbd::MockTestImageCtx>::s_instance = nullptr;
ReplayStatusFormatter<librbd::MockTestImageCtx>* ReplayStatusFormatter<librbd::MockTestImageCtx>::s_instance = nullptr;
} // namespace image_replayer

View File

@ -23,6 +23,7 @@
#include "Threads.h"
#include "tools/rbd_mirror/image_replayer/BootstrapRequest.h"
#include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
#include "tools/rbd_mirror/image_replayer/EventPreprocessor.h"
#include "tools/rbd_mirror/image_replayer/ReplayStatusFormatter.h"
#define dout_subsys ceph_subsys_rbd_mirror
@ -296,6 +297,7 @@ ImageReplayer<I>::ImageReplayer(Threads *threads,
template <typename I>
ImageReplayer<I>::~ImageReplayer()
{
assert(m_event_preprocessor == nullptr);
assert(m_replay_status_formatter == nullptr);
assert(m_local_image_ctx == nullptr);
assert(m_local_replay == nullptr);
@ -539,6 +541,9 @@ void ImageReplayer<I>::handle_start_replay(int r) {
std::swap(m_on_start_finish, on_finish);
}
m_event_preprocessor = EventPreprocessor<I>::create(
*m_local_image_ctx, *m_remote_journaler, m_local_mirror_uuid,
&m_client_meta, m_threads->work_queue);
m_replay_status_formatter =
ReplayStatusFormatter<I>::create(m_remote_journaler, m_local_mirror_uuid);
@ -703,7 +708,7 @@ void ImageReplayer<I>::handle_replay_ready()
}
if (m_replay_tag_valid && m_replay_tag.tid == m_replay_tag_tid) {
process_entry();
preprocess_entry();
return;
}
@ -978,6 +983,43 @@ void ImageReplayer<I>::handle_allocate_local_tag(int r) {
return;
}
preprocess_entry();
}
template <typename I>
void ImageReplayer<I>::preprocess_entry() {
dout(20) << "preprocessing entry tid=" << m_replay_entry.get_commit_tid()
<< dendl;
bufferlist data = m_replay_entry.get_data();
bufferlist::iterator it = data.begin();
int r = m_local_replay->decode(&it, &m_event_entry);
if (r < 0) {
derr << "failed to decode journal event" << dendl;
handle_replay_complete(r, "failed to decode journal event");
return;
}
if (!m_event_preprocessor->is_required(m_event_entry)) {
process_entry();
return;
}
Context *ctx = create_context_callback<
ImageReplayer, &ImageReplayer<I>::handle_preprocess_entry>(this);
m_event_preprocessor->preprocess(&m_event_entry, ctx);
}
template <typename I>
void ImageReplayer<I>::handle_preprocess_entry(int r) {
dout(20) << "r=" << r << dendl;
if (r < 0) {
derr << "failed to preprocess journal event" << dendl;
handle_replay_complete(r, "failed to preprocess journal event");
return;
}
process_entry();
}
@ -986,21 +1028,11 @@ void ImageReplayer<I>::process_entry() {
dout(20) << "processing entry tid=" << m_replay_entry.get_commit_tid()
<< dendl;
bufferlist data = m_replay_entry.get_data();
bufferlist::iterator it = data.begin();
librbd::journal::EventEntry event_entry;
int r = m_local_replay->decode(&it, &event_entry);
if (r < 0) {
derr << "failed to decode journal event" << dendl;
handle_replay_complete(r, "failed to decode journal event");
return;
}
Context *on_ready = create_context_callback<
ImageReplayer, &ImageReplayer<I>::handle_process_entry_ready>(this);
Context *on_commit = new C_ReplayCommitted(this, std::move(m_replay_entry));
m_local_replay->process(event_entry, on_ready, on_commit);
m_local_replay->process(m_event_entry, on_ready, on_commit);
m_event_entry = {};
}
template <typename I>
@ -1305,6 +1337,10 @@ void ImageReplayer<I>::shut_down(int r, Context *on_start) {
m_local_journal->stop_external_replay();
m_local_journal = nullptr;
m_local_replay = nullptr;
delete m_event_preprocessor;
m_event_preprocessor = nullptr;
ctx->complete(0);
});
ctx = new FunctionContext([this, ctx](int r) {

View File

@ -45,6 +45,7 @@ namespace mirror {
struct Threads;
namespace image_replayer { template <typename> class BootstrapRequest; }
namespace image_replayer { template <typename> class EventPreprocessor; }
namespace image_replayer { template <typename> class ReplayStatusFormatter; }
/**
@ -165,6 +166,9 @@ protected:
* | ALLOCATE_LOCAL_TAG * * * * * *
* | | | *
* | v (error) *
* | PREPROCESS_ENTRY * * * * * * *
* | | | *
* | v (error) *
* | PROCESS_ENTRY * * * * * * * * *
* | | | *
* | \---------------------/ *
@ -228,6 +232,7 @@ private:
int m_last_r = 0;
std::string m_state_desc;
BootstrapProgressContext m_progress_cxt;
image_replayer::EventPreprocessor<ImageCtxT> *m_event_preprocessor = nullptr;
image_replayer::ReplayStatusFormatter<ImageCtxT> *m_replay_status_formatter =
nullptr;
librados::IoCtx m_local_ioctx, m_remote_ioctx;
@ -263,6 +268,7 @@ private:
uint64_t m_replay_tag_tid = 0;
cls::journal::Tag m_replay_tag;
librbd::journal::TagData m_replay_tag_data;
librbd::journal::EventEntry m_event_entry;
struct C_ReplayCommitted : public Context {
ImageReplayer *replayer;
@ -322,6 +328,9 @@ private:
void allocate_local_tag();
void handle_allocate_local_tag(int r);
void preprocess_entry();
void handle_preprocess_entry(int r);
void process_entry();
void handle_process_entry_ready(int r);
void handle_process_entry_safe(const ReplayEntry& replay_entry, int r);