diff --git a/src/os/FileStore.cc b/src/os/FileStore.cc index 3feb92924a9..2c66a5ea7db 100644 --- a/src/os/FileStore.cc +++ b/src/os/FileStore.cc @@ -1886,10 +1886,9 @@ FileStore::Op *FileStore::build_op(list& tls, void FileStore::queue_op(OpSequencer *osr, Op *o) { - // mark apply start _now_, because we need to drain the entire apply - // queue during commit in order to put the store in a consistent - // state. - apply_manager.op_apply_start(o->op); + // queue op on sequencer, then queue sequencer for the threadpool, + // so that regardless of which order the threads pick up the + // sequencer, the op order will be preserved. osr->queue(o); @@ -1953,16 +1952,12 @@ void FileStore::_do_op(OpSequencer *osr) { osr->apply_lock.Lock(); Op *o = osr->peek_queue(); - + apply_manager.op_apply_start(o->op); dout(5) << "_do_op " << o << " seq " << o->op << " " << *osr << "/" << osr->parent << " start" << dendl; int r = do_transactions(o->tls, o->op); apply_manager.op_apply_finish(o->op); dout(10) << "_do_op " << o << " seq " << o->op << " r = " << r << ", finisher " << o->onreadable << " " << o->onreadable_sync << dendl; - - /*dout(10) << "op_entry finished " << o->bytes << " bytes, queue now " - << op_queue_len << " ops, " << op_queue_bytes << " bytes" << dendl; - */ } void FileStore::_finish_op(OpSequencer *osr) diff --git a/src/os/JournalingObjectStore.cc b/src/os/JournalingObjectStore.cc index 42b95c96a58..b1aee62eca8 100644 --- a/src/os/JournalingObjectStore.cc +++ b/src/os/JournalingObjectStore.cc @@ -111,7 +111,8 @@ uint64_t JournalingObjectStore::ApplyManager::op_apply_start(uint64_t op) Mutex::Locker l(apply_lock); // if we ops are blocked, or there are already people (left) in // line, get in line. - if (blocked || !ops_apply_blocked.empty()) { + if (op > max_applying_seq && + (blocked || !ops_apply_blocked.empty())) { Cond cond; ops_apply_blocked.push_back(&cond); dout(10) << "op_apply_start " << op << " blocked (getting in back of line)" << dendl; @@ -125,9 +126,12 @@ uint64_t JournalingObjectStore::ApplyManager::op_apply_start(uint64_t op) ops_apply_blocked.front()->Signal(); } } - dout(10) << "op_apply_start " << op << " open_ops " << open_ops << " -> " << (open_ops+1) << dendl; - assert(!blocked); + dout(10) << "op_apply_start " << op << " open_ops " << open_ops << " -> " << (open_ops+1) + << ", max_applying_seq " << max_applying_seq << " -> " << MAX(op, max_applying_seq) << dendl; + if (op > max_applying_seq) + max_applying_seq = op; + assert(op > committed_seq); open_ops++; return op; } @@ -136,15 +140,18 @@ void JournalingObjectStore::ApplyManager::op_apply_finish(uint64_t op) { Mutex::Locker l(apply_lock); dout(10) << "op_apply_finish " << op << " open_ops " << open_ops - << " -> " << (open_ops-1) << dendl; + << " -> " << (open_ops-1) + << ", max_applying_seq " << max_applying_seq + << ", max_applied_seq " << max_applied_seq << " -> " << MAX(op, max_applied_seq) + << dendl; if (--open_ops == 0) open_ops_cond.Signal(); // there can be multiple applies in flight; track the max value we // note. note that we can't _read_ this value and learn anything // meaningful unless/until we've quiesced all in-flight applies. - if (op > applied_seq) - applied_seq = op; + if (op > max_applied_seq) + max_applied_seq = op; } uint64_t JournalingObjectStore::SubmitManager::op_submit_start() @@ -185,19 +192,21 @@ bool JournalingObjectStore::ApplyManager::commit_start() { Mutex::Locker l(apply_lock); - dout(10) << "commit_start " - << ", applied_seq " << applied_seq << dendl; + dout(10) << "commit_start max_applying_seq " << max_applying_seq + << ", max_applied_seq " << max_applied_seq + << dendl; blocked = true; while (open_ops > 0) { - dout(10) << "commit_start blocked, waiting for " << open_ops << " open ops" << dendl; + dout(10) << "commit_start blocked, waiting for " << open_ops << " open ops, " + << " max_applying_seq " << max_applying_seq << " max_applied_seq " << max_applied_seq << dendl; open_ops_cond.Wait(apply_lock); } assert(open_ops == 0); - + assert(max_applied_seq == max_applying_seq); dout(10) << "commit_start blocked, all open_ops have completed" << dendl; { Mutex::Locker l(com_lock); - if (applied_seq == committed_seq) { + if (max_applied_seq == committed_seq) { dout(10) << "commit_start nothing to do" << dendl; blocked = false; if (!ops_apply_blocked.empty()) @@ -206,7 +215,7 @@ bool JournalingObjectStore::ApplyManager::commit_start() goto out; } - committing_seq = applied_seq; + committing_seq = max_applying_seq; dout(10) << "commit_start committing " << committing_seq << ", still blocked" << dendl; diff --git a/src/os/JournalingObjectStore.h b/src/os/JournalingObjectStore.h index dff49d43cbb..ae74c32cd25 100644 --- a/src/os/JournalingObjectStore.h +++ b/src/os/JournalingObjectStore.h @@ -54,7 +54,8 @@ protected: Cond blocked_cond; int open_ops; Cond open_ops_cond; - uint64_t applied_seq; + uint64_t max_applying_seq; + uint64_t max_applied_seq; Mutex com_lock; map > commit_waiters; @@ -68,7 +69,8 @@ protected: apply_lock("JOS::ApplyManager::apply_lock", false, true, false, g_ceph_context), blocked(false), open_ops(0), - applied_seq(0), + max_applying_seq(0), + max_applied_seq(0), com_lock("JOS::ApplyManager::com_lock", false, true, false, g_ceph_context), committing_seq(0), committed_seq(0) {} void add_waiter(uint64_t, Context*); @@ -97,7 +99,7 @@ protected: } { Mutex::Locker l(apply_lock); - applied_seq = fs_op_seq; + max_applying_seq = max_applied_seq = fs_op_seq; } } } apply_manager;