Merge pull request #4204 from ceph/wip-10368-mk3

mds: separate MDLog::safe_pos from journaler

Reviewed-by: Greg Farnum <gfarnum@redhat.com>
This commit is contained in:
Gregory Farnum 2015-04-19 17:51:12 -07:00
commit b25e9e130b
2 changed files with 43 additions and 8 deletions

View File

@ -315,6 +315,32 @@ void MDLog::_submit_entry(LogEvent *le, MDSInternalContextBase *c)
}
}
/**
* Invoked on the flush after each entry submitted
*/
class C_MDL_Flushed : public MDSIOContextBase {
protected:
MDLog *mdlog;
MDS *get_mds() {return mdlog->mds;}
uint64_t flushed_to;
MDSInternalContextBase *wrapped;
void finish(int r) {
if (wrapped) {
wrapped->complete(r);
}
mdlog->submit_mutex.Lock();
assert(mdlog->safe_pos <= flushed_to);
mdlog->safe_pos = flushed_to;
mdlog->submit_mutex.Unlock();
}
public:
C_MDL_Flushed(MDLog *m, uint64_t ft, MDSInternalContextBase *w)
: mdlog(m), flushed_to(ft), wrapped(w) {}
};
void MDLog::_submit_thread()
{
dout(10) << "_submit_thread start" << dendl;
@ -355,11 +381,12 @@ void MDLog::_submit_thread()
<< " : " << *le << dendl;
// journal it.
journaler->append_entry(bl); // bl is destroyed.
ls->end = journaler->get_write_pos();
const uint64_t new_write_pos = journaler->append_entry(bl); // bl is destroyed.
ls->end = new_write_pos;
journaler->wait_for_flush(new C_MDL_Flushed(
this, new_write_pos, data.fin));
if (data.fin)
journaler->wait_for_flush(new C_IO_Wrapper(mds, data.fin));
if (data.flush)
journaler->flush();
@ -368,8 +395,8 @@ void MDLog::_submit_thread()
delete le;
} else {
if (data.fin)
journaler->wait_for_flush(new C_IO_Wrapper(mds, data.fin));
journaler->wait_for_flush(new C_MDL_Flushed(
this, journaler->get_write_pos(), data.fin));
if (data.flush)
journaler->flush();
}
@ -541,7 +568,7 @@ void MDLog::trim(int m)
++p;
if (pending_events.count(ls->seq) ||
ls->end > journaler->get_write_safe_pos()) {
ls->end > safe_pos) {
dout(5) << "trim segment " << ls->seq << "/" << ls->offset << ", not fully flushed yet, safe "
<< journaler->get_write_safe_pos() << " < end " << ls->end << dendl;
break;
@ -597,7 +624,6 @@ int MDLog::trim_all()
<< "/" << expiring_segments.size()
<< "/" << expired_segments.size() << dendl;
uint64_t safe_pos = journaler->get_write_safe_pos();
uint64_t last_seq = 0;
if (!segments.empty())
last_seq = get_last_segment_seq();
@ -1244,6 +1270,8 @@ void MDLog::_replay_thread()
logger->set(l_mdl_expos, journaler->get_expire_pos());
}
safe_pos = journaler->get_write_safe_pos();
dout(10) << "_replay_thread kicking waiters" << dendl;
mds->mds_lock.Lock();
finish_contexts(g_ceph_context, waitfor_replay, r);

View File

@ -74,6 +74,11 @@ protected:
bool stopping;
// Log position which is persistent *and* for which
// submit_entry wait_for_safe callbacks have already
// been called.
uint64_t safe_pos;
inodeno_t ino;
Journaler *journaler;
@ -184,6 +189,7 @@ public:
unflushed(0),
capped(false),
stopping(false),
safe_pos(0),
journaler(0),
logger(0),
replay_thread(this),
@ -290,6 +296,7 @@ private:
void _trim_expired_segments();
friend class C_MaybeExpiredSegment;
friend class C_MDL_Flushed;
public:
void trim_expired_segments();