JournalingFileStore: move apply/commit sequencing to apply_manager

syncing the filestore requires a stable commit point (i.e., all ops
up to applied_seq must have been applied).  Previously, we used
journal_lock to atomically block new applies while waiting for
the remaining ones to finish.  This creates unnecessary contention.
We now use apply_manager to manage that state atomically with its
own lock.

Signed-off-by: Samuel Just <sam.just@inktank.com>
This commit is contained in:
Samuel Just 2012-10-05 17:33:36 -07:00
parent 1d9f5d27d6
commit 9601b29132
3 changed files with 128 additions and 116 deletions

View File

@ -2222,7 +2222,7 @@ void FileStore::queue_op(OpSequencer *osr, Op *o)
// mark apply start _now_, because we need to drain the entire apply
// queue during commit in order to put the store in a consistent
// state.
op_apply_start(o->op);
apply_manager.op_apply_start(o->op);
op_tp.lock();
osr->queue(o);
@ -2295,7 +2295,7 @@ void FileStore::_do_op(OpSequencer *osr)
dout(5) << "_do_op " << o << " seq " << o->op << " " << *osr << "/" << osr->parent << " start" << dendl;
int r = do_transactions(o->tls, o->op);
op_apply_finish(o->op);
apply_manager.op_apply_finish(o->op);
dout(10) << "_do_op " << o << " seq " << o->op << " r = " << r
<< ", finisher " << o->onreadable << " " << o->onreadable_sync << dendl;
@ -2409,7 +2409,7 @@ int FileStore::queue_transactions(Sequencer *posr, list<Transaction*> &tls,
if (m_filestore_do_dump)
dump_transactions(tls, op, osr);
op_apply_start(op);
apply_manager.op_apply_start(op);
int r = do_transactions(tls, op);
if (r >= 0) {
@ -2427,7 +2427,7 @@ int FileStore::queue_transactions(Sequencer *posr, list<Transaction*> &tls,
op_finisher.queue(onreadable, r);
submit_manager.op_submit_finish(op);
op_apply_finish(op);
apply_manager.op_apply_finish(op);
return r;
}
@ -3647,9 +3647,9 @@ void FileStore::sync_entry()
fin.swap(sync_waiters);
lock.Unlock();
if (commit_start()) {
if (apply_manager.commit_start()) {
utime_t start = ceph_clock_now(g_ceph_context);
uint64_t cp = committing_seq;
uint64_t cp = apply_manager.get_committing_seq();
sync_entry_timeo_lock.Lock();
SyncEntryTimeout *sync_entry_timeo =
@ -3697,7 +3697,7 @@ void FileStore::sync_entry()
snaps.push_back(cp);
commit_started();
apply_manager.commit_started();
// wait for commit
dout(20) << " waiting for transid " << async_args.transid << " to complete" << dendl;
@ -3728,11 +3728,11 @@ void FileStore::sync_entry()
assert(r == 0);
snaps.push_back(cp);
commit_started();
apply_manager.commit_started();
}
} else
{
commit_started();
apply_manager.commit_started();
if (btrfs) {
dout(15) << "sync_entry doing btrfs SYNC" << dendl;
@ -3764,7 +3764,7 @@ void FileStore::sync_entry()
logger->finc(l_os_commit_lat, lat);
logger->finc(l_os_commit_len, dur);
commit_finish();
apply_manager.commit_finish();
logger->set(l_os_committing, 0);

View File

@ -38,12 +38,8 @@ int JournalingObjectStore::journal_replay(uint64_t fs_op_seq)
fs_op_seq = g_conf->journal_replay_from - 1;
}
journal_lock.Lock();
uint64_t op_seq = fs_op_seq;
committed_seq = fs_op_seq;
committing_seq = fs_op_seq;
applied_seq = fs_op_seq;
journal_lock.Unlock();
apply_manager.init_seq(fs_op_seq);
if (!journal)
return 0;
@ -58,8 +54,6 @@ int JournalingObjectStore::journal_replay(uint64_t fs_op_seq)
return err;
}
journal_lock.Lock();
replaying = true;
int count = 0;
@ -85,14 +79,11 @@ int JournalingObjectStore::journal_replay(uint64_t fs_op_seq)
tls.push_back(t);
}
open_ops++;
journal_lock.Unlock();
apply_manager.op_apply_start(seq);
int r = do_transactions(tls, seq);
journal_lock.Lock();
open_ops--;
cond.Signal();
apply_manager.op_apply_finish(seq);
op_seq = applied_seq = seq;
op_seq = seq;
while (!tls.empty()) {
delete tls.front();
@ -100,16 +91,12 @@ int JournalingObjectStore::journal_replay(uint64_t fs_op_seq)
}
dout(3) << "journal_replay: r = " << r << ", op_seq now " << op_seq << dendl;
assert(op_seq == seq);
seq++; // we expect the next op
}
replaying = false;
submit_manager.set_op_seq(op_seq);
journal_lock.Unlock();
// done reading, make writeable.
journal->make_writeable();
@ -119,16 +106,9 @@ int JournalingObjectStore::journal_replay(uint64_t fs_op_seq)
// ------------------------------------
uint64_t JournalingObjectStore::op_apply_start(uint64_t op)
uint64_t JournalingObjectStore::ApplyManager::op_apply_start(uint64_t op)
{
Mutex::Locker l(journal_lock);
return _op_apply_start(op);
}
uint64_t JournalingObjectStore::_op_apply_start(uint64_t op)
{
assert(journal_lock.is_locked());
Mutex::Locker l(apply_lock);
// if we ops are blocked, or there are already people (left) in
// line, get in line.
if (blocked || !ops_apply_blocked.empty()) {
@ -137,7 +117,7 @@ uint64_t JournalingObjectStore::_op_apply_start(uint64_t op)
dout(10) << "op_apply_start " << op << " blocked (getting in back of line)" << dendl;
// sleep until we are not blocked AND we are at the front of line
while (blocked || ops_apply_blocked.front() != &cond)
cond.Wait(journal_lock);
cond.Wait(apply_lock);
dout(10) << "op_apply_start " << op << " woke (at front of line)" << dendl;
ops_apply_blocked.pop_front();
if (!ops_apply_blocked.empty()) {
@ -147,24 +127,24 @@ uint64_t JournalingObjectStore::_op_apply_start(uint64_t op)
}
dout(10) << "op_apply_start " << op << " open_ops " << open_ops << " -> " << (open_ops+1) << dendl;
assert(!blocked);
open_ops++;
return op;
}
void JournalingObjectStore::op_apply_finish(uint64_t op)
void JournalingObjectStore::ApplyManager::op_apply_finish(uint64_t op)
{
journal_lock.Lock();
dout(10) << "op_apply_finish " << op << " open_ops " << open_ops << " -> " << (open_ops-1) << dendl;
Mutex::Locker l(apply_lock);
dout(10) << "op_apply_finish " << op << " open_ops " << open_ops
<< " -> " << (open_ops-1) << dendl;
if (--open_ops == 0)
cond.Signal();
open_ops_cond.Signal();
// there can be multiple applies in flight; track the max value we
// note. note that we can't _read_ this value and learn anything
// meaningful unless/until we've quiesced all in-flight applies.
if (op > applied_seq)
applied_seq = op;
journal_lock.Unlock();
}
uint64_t JournalingObjectStore::SubmitManager::op_submit_start()
@ -192,51 +172,57 @@ void JournalingObjectStore::SubmitManager::op_submit_finish(uint64_t op)
// ------------------------------------------
bool JournalingObjectStore::commit_start()
void JournalingObjectStore::ApplyManager::add_waiter(uint64_t op, Context *c)
{
Mutex::Locker l(com_lock);
assert(c);
commit_waiters[op].push_back(c);
}
bool JournalingObjectStore::ApplyManager::commit_start()
{
bool ret = false;
journal_lock.Lock();
dout(10) << "commit_start op_seq " << submit_manager.get_op_seq()
<< ", applied_seq " << applied_seq
<< ", committed_seq " << committed_seq << dendl;
blocked = true;
while (open_ops > 0) {
dout(10) << "commit_start blocked, waiting for " << open_ops << " open ops" << dendl;
cond.Wait(journal_lock);
{
Mutex::Locker l(apply_lock);
dout(10) << "commit_start "
<< ", applied_seq " << applied_seq << dendl;
blocked = true;
while (open_ops > 0) {
dout(10) << "commit_start blocked, waiting for " << open_ops << " open ops" << dendl;
open_ops_cond.Wait(apply_lock);
}
assert(open_ops == 0);
dout(10) << "commit_start blocked, all open_ops have completed" << dendl;
{
Mutex::Locker l(com_lock);
if (applied_seq == committed_seq) {
dout(10) << "commit_start nothing to do" << dendl;
blocked = false;
if (!ops_apply_blocked.empty())
ops_apply_blocked.front()->Signal();
assert(commit_waiters.empty());
goto out;
}
committing_seq = applied_seq;
dout(10) << "commit_start committing " << committing_seq
<< ", still blocked" << dendl;
}
}
dout(10) << "commit_start blocked, all open_ops have completed" << dendl;
assert(open_ops == 0);
if (applied_seq == committed_seq) {
dout(10) << "commit_start nothing to do" << dendl;
blocked = false;
if (!ops_apply_blocked.empty())
ops_apply_blocked.front()->Signal();
assert(commit_waiters.empty());
goto out;
}
com_lock.Lock();
// we can _only_ read applied_seq here because open_ops == 0 (we've
// quiesced all in-flight applies).
committing_seq = applied_seq;
com_lock.Unlock();
dout(10) << "commit_start committing " << committing_seq << ", still blocked" << dendl;
ret = true;
out:
if (journal)
journal->commit_start(); // tell the journal too
journal_lock.Unlock();
return ret;
}
void JournalingObjectStore::commit_started()
void JournalingObjectStore::ApplyManager::commit_started()
{
Mutex::Locker l(journal_lock);
Mutex::Locker l(apply_lock);
// allow new ops. (underlying fs should now be committing all prior ops)
dout(10) << "commit_started committing " << committing_seq << ", unblocking" << dendl;
blocked = false;
@ -244,21 +230,19 @@ void JournalingObjectStore::commit_started()
ops_apply_blocked.front()->Signal();
}
void JournalingObjectStore::commit_finish()
void JournalingObjectStore::ApplyManager::commit_finish()
{
Mutex::Locker l(journal_lock);
Mutex::Locker l(com_lock);
dout(10) << "commit_finish thru " << committing_seq << dendl;
if (journal)
journal->committed_thru(committing_seq);
com_lock.Lock();
committed_seq = committing_seq;
com_lock.Unlock();
map<version_t, vector<Context*> >::iterator p = commit_waiters.begin();
while (p != commit_waiters.end() &&
p->first <= committing_seq) {
p->first <= committing_seq) {
finisher.queue(p->second);
commit_waiters.erase(p++);
}
@ -285,6 +269,7 @@ void JournalingObjectStore::_op_journal_transactions(
::encode(*t, tbl);
}
journal->submit_entry(op, tbl, data_align, onjournal, osd_op);
} else if (onjournal)
commit_waiters[op].push_back(onjournal);
} else if (onjournal) {
apply_manager.add_waiter(op, onjournal);
}
}

View File

@ -21,9 +21,9 @@
class JournalingObjectStore : public ObjectStore {
protected:
uint64_t applied_seq;
uint64_t committing_seq, committed_seq;
map<version_t, vector<Context*> > commit_waiters;
Journal *journal;
Finisher finisher;
class SubmitManager {
Mutex lock;
@ -38,25 +38,69 @@ protected:
void op_submit_finish(uint64_t op);
void set_op_seq(uint64_t seq) {
Mutex::Locker l(lock);
seq = op_seq;
op_seq = seq;
}
uint64_t get_op_seq() {
return op_seq;
}
} submit_manager;
int open_ops;
bool blocked;
class ApplyManager {
Journal *&journal;
Finisher &finisher;
Journal *journal;
Finisher finisher;
Mutex apply_lock;
bool blocked;
Cond blocked_cond;
int open_ops;
Cond open_ops_cond;
uint64_t applied_seq;
Cond cond;
Mutex journal_lock;
Mutex com_lock;
Mutex com_lock;
map<version_t, vector<Context*> > commit_waiters;
uint64_t committing_seq, committed_seq;
list<uint64_t> ops_submitting;
list<Cond*> ops_apply_blocked;
list<uint64_t> ops_submitting;
list<Cond*> ops_apply_blocked;
public:
ApplyManager(Journal *&j, Finisher &f) :
journal(j), finisher(f),
apply_lock("JOS::ApplyManager::apply_lock"),
blocked(false),
open_ops(0),
applied_seq(0),
com_lock("JOS::ApplyManager::com_lock"),
committing_seq(0), committed_seq(0) {}
void add_waiter(uint64_t, Context*);
uint64_t op_apply_start(uint64_t op);
void op_apply_finish(uint64_t op);
bool commit_start();
void commit_started();
void commit_finish();
bool is_committing() {
Mutex::Locker l(com_lock);
return committing_seq != committed_seq;
}
uint64_t get_committed_seq() {
Mutex::Locker l(com_lock);
return committed_seq;
}
uint64_t get_committing_seq() {
Mutex::Locker l(com_lock);
return committing_seq;
}
void init_seq(uint64_t fs_op_seq) {
{
Mutex::Locker l(com_lock);
committed_seq = fs_op_seq;
committing_seq = fs_op_seq;
}
{
Mutex::Locker l(apply_lock);
applied_seq = fs_op_seq;
}
}
} apply_manager;
bool replaying;
@ -65,39 +109,22 @@ protected:
void journal_stop();
int journal_replay(uint64_t fs_op_seq);
// --
uint64_t op_submit_start();
void op_submit_finish(uint64_t op_seq);
uint64_t op_apply_start(uint64_t op);
uint64_t _op_apply_start(uint64_t op);
void op_apply_finish(uint64_t op);
void _op_journal_transactions(list<ObjectStore::Transaction*>& tls, uint64_t op,
Context *onjournal, TrackedOpRef osd_op);
virtual int do_transactions(list<ObjectStore::Transaction*>& tls, uint64_t op_seq) = 0;
bool commit_start();
void commit_started(); // allow new ops (underlying fs should now be committing all prior ops)
void commit_finish();
public:
bool is_committing() {
Mutex::Locker l(com_lock);
return committing_seq != committed_seq;
return apply_manager.is_committing();
}
uint64_t get_committed_seq() {
Mutex::Locker l(com_lock);
return committed_seq;
return apply_manager.get_committed_seq();
}
public:
JournalingObjectStore() : applied_seq(0), committing_seq(0), committed_seq(0),
open_ops(0), blocked(false),
journal(NULL), finisher(g_ceph_context),
journal_lock("JournalingObjectStore::journal_lock"),
com_lock("JournalingObjectStore::com_lock"),
JournalingObjectStore() : journal(NULL), finisher(g_ceph_context),
apply_manager(journal, finisher),
replaying(false) {}
};