librbd: journal: mark entry committed after replay

After replying a journal entry we have to call committed(). Otherwise,
the entries remain with flag committed=false in JournalMetadata
m_pending_commit_tids forever, which prevents commit position update.

Signed-off-by: Mykola Golub <mgolub@mirantis.com>
This commit is contained in:
Mykola Golub 2015-10-30 12:41:15 +02:00
parent e8a584ffc1
commit e72fc02cbd
3 changed files with 77 additions and 43 deletions

View File

@ -55,6 +55,19 @@ struct SetOpRequestTid : public boost::static_visitor<void> {
}
};
struct C_ReplayCommitted : public Context {
::journal::Journaler *journaler;
::journal::ReplayEntry replay_entry;
C_ReplayCommitted(::journal::Journaler *journaler,
::journal::ReplayEntry &&replay_entry) :
journaler(journaler), replay_entry(std::move(replay_entry)) {
}
virtual void finish(int r) {
journaler->committed(replay_entry);
}
};
} // anonymous namespace
Journal::Journal(ImageCtx &image_ctx)
@ -547,7 +560,8 @@ void Journal::handle_replay_ready() {
m_lock.Unlock();
bufferlist data = replay_entry.get_data();
bufferlist::iterator it = data.begin();
int r = m_journal_replay->process(it);
int r = m_journal_replay->process(it, new C_ReplayCommitted(m_journaler,
std::move(replay_entry)));
m_lock.Lock();
if (r < 0) {

View File

@ -21,7 +21,7 @@ JournalReplay::~JournalReplay() {
assert(m_aio_completions.empty());
}
int JournalReplay::process(bufferlist::iterator it) {
int JournalReplay::process(bufferlist::iterator it, Context *on_safe) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << dendl;
@ -34,7 +34,7 @@ int JournalReplay::process(bufferlist::iterator it) {
return -EINVAL;
}
boost::apply_visitor(EventVisitor(this), event_entry.event);
boost::apply_visitor(EventVisitor(this, on_safe), event_entry.event);
return 0;
}
@ -49,95 +49,111 @@ int JournalReplay::flush() {
return m_ret_val;
}
void JournalReplay::handle_event(const journal::AioDiscardEvent &event) {
void JournalReplay::handle_event(const journal::AioDiscardEvent &event,
Context *on_safe) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": AIO discard event" << dendl;
AioCompletion *aio_comp = create_aio_completion();
AioCompletion *aio_comp = create_aio_completion(on_safe);
AioImageRequest::aio_discard(&m_image_ctx, aio_comp, event.offset,
event.length);
}
void JournalReplay::handle_event(const journal::AioWriteEvent &event) {
void JournalReplay::handle_event(const journal::AioWriteEvent &event,
Context *on_safe) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": AIO write event" << dendl;
bufferlist data = event.data;
AioCompletion *aio_comp = create_aio_completion();
AioCompletion *aio_comp = create_aio_completion(on_safe);
AioImageRequest::aio_write(&m_image_ctx, aio_comp, event.offset, event.length,
data.c_str(), 0);
}
void JournalReplay::handle_event(const journal::AioFlushEvent &event) {
void JournalReplay::handle_event(const journal::AioFlushEvent &event,
Context *on_safe) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": AIO flush event" << dendl;
AioCompletion *aio_comp = create_aio_completion();
AioCompletion *aio_comp = create_aio_completion(on_safe);
AioImageRequest::aio_flush(&m_image_ctx, aio_comp);
}
void JournalReplay::handle_event(const journal::OpFinishEvent &event) {
void JournalReplay::handle_event(const journal::OpFinishEvent &event,
Context *on_safe) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": Op finish event" << dendl;
}
void JournalReplay::handle_event(const journal::SnapCreateEvent &event) {
void JournalReplay::handle_event(const journal::SnapCreateEvent &event,
Context *on_safe) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": Snap create event" << dendl;
}
void JournalReplay::handle_event(const journal::SnapRemoveEvent &event) {
void JournalReplay::handle_event(const journal::SnapRemoveEvent &event,
Context *on_safe) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": Snap remove event" << dendl;
}
void JournalReplay::handle_event(const journal::SnapRenameEvent &event) {
void JournalReplay::handle_event(const journal::SnapRenameEvent &event,
Context *on_safe) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": Snap rename event" << dendl;
}
void JournalReplay::handle_event(const journal::SnapProtectEvent &event) {
void JournalReplay::handle_event(const journal::SnapProtectEvent &event,
Context *on_safe) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": Snap protect event" << dendl;
}
void JournalReplay::handle_event(const journal::SnapUnprotectEvent &event) {
void JournalReplay::handle_event(const journal::SnapUnprotectEvent &event,
Context *on_safe) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": Snap unprotect event"
<< dendl;
}
void JournalReplay::handle_event(const journal::SnapRollbackEvent &event) {
void JournalReplay::handle_event(const journal::SnapRollbackEvent &event,
Context *on_safe) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": Snap rollback start event"
<< dendl;
}
void JournalReplay::handle_event(const journal::RenameEvent &event) {
void JournalReplay::handle_event(const journal::RenameEvent &event,
Context *on_safe) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": Rename event" << dendl;
}
void JournalReplay::handle_event(const journal::ResizeEvent &event) {
void JournalReplay::handle_event(const journal::ResizeEvent &event,
Context *on_safe) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": Resize start event" << dendl;
}
void JournalReplay::handle_event(const journal::FlattenEvent &event) {
void JournalReplay::handle_event(const journal::FlattenEvent &event,
Context *on_safe) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": Flatten start event" << dendl;
}
void JournalReplay::handle_event(const journal::UnknownEvent &event) {
void JournalReplay::handle_event(const journal::UnknownEvent &event,
Context *on_safe) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": unknown event" << dendl;
on_safe->complete(0);
}
AioCompletion *JournalReplay::create_aio_completion() {
AioCompletion *JournalReplay::create_aio_completion(Context *on_safe) {
Mutex::Locker locker(m_lock);
AioCompletion *aio_comp = aio_create_completion_internal(
this, &aio_completion_callback);
m_aio_completions.insert(aio_comp);
m_aio_completions.insert(std::pair<AioCompletion*,Context*>(
aio_comp, on_safe));
return aio_comp;
}
@ -153,6 +169,9 @@ void JournalReplay::handle_aio_completion(AioCompletion *aio_comp) {
ldout(cct, 20) << this << " " << __func__ << ": aio_comp=" << aio_comp << ", "
<< "r=" << r << dendl;
Context *on_safe = it->second;
on_safe->complete(r);
if (r < 0 && m_ret_val == 0) {
m_ret_val = r;
}

View File

@ -11,7 +11,7 @@
#include "common/Mutex.h"
#include "librbd/JournalTypes.h"
#include <boost/variant.hpp>
#include <set>
#include <map>
namespace librbd {
@ -23,22 +23,23 @@ public:
JournalReplay(ImageCtx &image_ctx);
~JournalReplay();
int process(bufferlist::iterator it);
int process(bufferlist::iterator it, Context *on_safe = NULL);
int flush();
private:
typedef std::set<AioCompletion *> AioCompletions;
typedef std::map<AioCompletion*,Context*> AioCompletions;
struct EventVisitor : public boost::static_visitor<void> {
JournalReplay *journal_replay;
Context *on_safe;
EventVisitor(JournalReplay *_journal_replay)
: journal_replay(_journal_replay) {
EventVisitor(JournalReplay *_journal_replay, Context *_on_safe)
: journal_replay(_journal_replay), on_safe(_on_safe) {
}
template <typename Event>
inline void operator()(const Event &event) const {
journal_replay->handle_event(event);
journal_replay->handle_event(event, on_safe);
}
};
@ -50,22 +51,22 @@ private:
AioCompletions m_aio_completions;
int m_ret_val;
void handle_event(const journal::AioDiscardEvent &event);
void handle_event(const journal::AioWriteEvent &event);
void handle_event(const journal::AioFlushEvent &event);
void handle_event(const journal::OpFinishEvent &event);
void handle_event(const journal::SnapCreateEvent &event);
void handle_event(const journal::SnapRemoveEvent &event);
void handle_event(const journal::SnapRenameEvent &event);
void handle_event(const journal::SnapProtectEvent &event);
void handle_event(const journal::SnapUnprotectEvent &event);
void handle_event(const journal::SnapRollbackEvent &event);
void handle_event(const journal::RenameEvent &event);
void handle_event(const journal::ResizeEvent &event);
void handle_event(const journal::FlattenEvent &event);
void handle_event(const journal::UnknownEvent &event);
void handle_event(const journal::AioDiscardEvent &event, Context *on_safe);
void handle_event(const journal::AioWriteEvent &event, Context *on_safe);
void handle_event(const journal::AioFlushEvent &event, Context *on_safe);
void handle_event(const journal::OpFinishEvent &event, Context *on_safe);
void handle_event(const journal::SnapCreateEvent &event, Context *on_safe);
void handle_event(const journal::SnapRemoveEvent &event, Context *on_safe);
void handle_event(const journal::SnapRenameEvent &event, Context *on_safe);
void handle_event(const journal::SnapProtectEvent &event, Context *on_safe);
void handle_event(const journal::SnapUnprotectEvent &event, Context *on_safe);
void handle_event(const journal::SnapRollbackEvent &event, Context *on_safe);
void handle_event(const journal::RenameEvent &event, Context *on_safe);
void handle_event(const journal::ResizeEvent &event, Context *on_safe);
void handle_event(const journal::FlattenEvent &event, Context *on_safe);
void handle_event(const journal::UnknownEvent &event, Context *on_safe);
AioCompletion *create_aio_completion();
AioCompletion *create_aio_completion(Context *on_safe);
void handle_aio_completion(AioCompletion *aio_comp);
static void aio_completion_callback(completion_t cb, void *arg);