Merge branch 'wip-filestore' into next

Reviewed-by: Sam Just <sam.just@inktank.com>
This commit is contained in:
Sage Weil 2012-12-04 15:05:18 -08:00
commit 3ef741ac2d
3 changed files with 30 additions and 24 deletions

View File

@ -1886,10 +1886,9 @@ FileStore::Op *FileStore::build_op(list<Transaction*>& tls,
void FileStore::queue_op(OpSequencer *osr, Op *o)
{
// mark apply start _now_, because we need to drain the entire apply
// queue during commit in order to put the store in a consistent
// state.
apply_manager.op_apply_start(o->op);
// queue op on sequencer, then queue sequencer for the threadpool,
// so that regardless of which order the threads pick up the
// sequencer, the op order will be preserved.
osr->queue(o);
@ -1953,16 +1952,12 @@ void FileStore::_do_op(OpSequencer *osr)
{
osr->apply_lock.Lock();
Op *o = osr->peek_queue();
apply_manager.op_apply_start(o->op);
dout(5) << "_do_op " << o << " seq " << o->op << " " << *osr << "/" << osr->parent << " start" << dendl;
int r = do_transactions(o->tls, o->op);
apply_manager.op_apply_finish(o->op);
dout(10) << "_do_op " << o << " seq " << o->op << " r = " << r
<< ", finisher " << o->onreadable << " " << o->onreadable_sync << dendl;
/*dout(10) << "op_entry finished " << o->bytes << " bytes, queue now "
<< op_queue_len << " ops, " << op_queue_bytes << " bytes" << dendl;
*/
}
void FileStore::_finish_op(OpSequencer *osr)

View File

@ -111,7 +111,8 @@ uint64_t JournalingObjectStore::ApplyManager::op_apply_start(uint64_t op)
Mutex::Locker l(apply_lock);
// if we ops are blocked, or there are already people (left) in
// line, get in line.
if (blocked || !ops_apply_blocked.empty()) {
if (op > max_applying_seq &&
(blocked || !ops_apply_blocked.empty())) {
Cond cond;
ops_apply_blocked.push_back(&cond);
dout(10) << "op_apply_start " << op << " blocked (getting in back of line)" << dendl;
@ -125,9 +126,12 @@ uint64_t JournalingObjectStore::ApplyManager::op_apply_start(uint64_t op)
ops_apply_blocked.front()->Signal();
}
}
dout(10) << "op_apply_start " << op << " open_ops " << open_ops << " -> " << (open_ops+1) << dendl;
assert(!blocked);
dout(10) << "op_apply_start " << op << " open_ops " << open_ops << " -> " << (open_ops+1)
<< ", max_applying_seq " << max_applying_seq << " -> " << MAX(op, max_applying_seq) << dendl;
if (op > max_applying_seq)
max_applying_seq = op;
assert(op > committed_seq);
open_ops++;
return op;
}
@ -136,15 +140,18 @@ void JournalingObjectStore::ApplyManager::op_apply_finish(uint64_t op)
{
Mutex::Locker l(apply_lock);
dout(10) << "op_apply_finish " << op << " open_ops " << open_ops
<< " -> " << (open_ops-1) << dendl;
<< " -> " << (open_ops-1)
<< ", max_applying_seq " << max_applying_seq
<< ", max_applied_seq " << max_applied_seq << " -> " << MAX(op, max_applied_seq)
<< dendl;
if (--open_ops == 0)
open_ops_cond.Signal();
// there can be multiple applies in flight; track the max value we
// note. note that we can't _read_ this value and learn anything
// meaningful unless/until we've quiesced all in-flight applies.
if (op > applied_seq)
applied_seq = op;
if (op > max_applied_seq)
max_applied_seq = op;
}
uint64_t JournalingObjectStore::SubmitManager::op_submit_start()
@ -185,19 +192,21 @@ bool JournalingObjectStore::ApplyManager::commit_start()
{
Mutex::Locker l(apply_lock);
dout(10) << "commit_start "
<< ", applied_seq " << applied_seq << dendl;
dout(10) << "commit_start max_applying_seq " << max_applying_seq
<< ", max_applied_seq " << max_applied_seq
<< dendl;
blocked = true;
while (open_ops > 0) {
dout(10) << "commit_start blocked, waiting for " << open_ops << " open ops" << dendl;
dout(10) << "commit_start blocked, waiting for " << open_ops << " open ops, "
<< " max_applying_seq " << max_applying_seq << " max_applied_seq " << max_applied_seq << dendl;
open_ops_cond.Wait(apply_lock);
}
assert(open_ops == 0);
assert(max_applied_seq == max_applying_seq);
dout(10) << "commit_start blocked, all open_ops have completed" << dendl;
{
Mutex::Locker l(com_lock);
if (applied_seq == committed_seq) {
if (max_applied_seq == committed_seq) {
dout(10) << "commit_start nothing to do" << dendl;
blocked = false;
if (!ops_apply_blocked.empty())
@ -206,7 +215,7 @@ bool JournalingObjectStore::ApplyManager::commit_start()
goto out;
}
committing_seq = applied_seq;
committing_seq = max_applying_seq;
dout(10) << "commit_start committing " << committing_seq
<< ", still blocked" << dendl;

View File

@ -54,7 +54,8 @@ protected:
Cond blocked_cond;
int open_ops;
Cond open_ops_cond;
uint64_t applied_seq;
uint64_t max_applying_seq;
uint64_t max_applied_seq;
Mutex com_lock;
map<version_t, vector<Context*> > commit_waiters;
@ -68,7 +69,8 @@ protected:
apply_lock("JOS::ApplyManager::apply_lock", false, true, false, g_ceph_context),
blocked(false),
open_ops(0),
applied_seq(0),
max_applying_seq(0),
max_applied_seq(0),
com_lock("JOS::ApplyManager::com_lock", false, true, false, g_ceph_context),
committing_seq(0), committed_seq(0) {}
void add_waiter(uint64_t, Context*);
@ -97,7 +99,7 @@ protected:
}
{
Mutex::Locker l(apply_lock);
applied_seq = fs_op_seq;
max_applying_seq = max_applied_seq = fs_op_seq;
}
}
} apply_manager;