From c6c8fce46b6bd0b96a10355329fd923cbfe1663a Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Mon, 15 Oct 2012 15:39:55 -0700 Subject: [PATCH] FileJournal: break writeq locking from queue_lock This prevents the relatively long process of queueing finishers from preventing op submission. In submit_entry, we no longer check for full before placing the write in the writeq, committed_thru should work anyway, and we don't want to grab the required lock. Signed-off-by: Samuel Just --- src/os/FileJournal.cc | 90 +++++++++++++++++++------------------------ src/os/FileJournal.h | 26 ++++++++++++- 2 files changed, 65 insertions(+), 51 deletions(-) diff --git a/src/os/FileJournal.cc b/src/os/FileJournal.cc index 2ef43798c52..730dac6e8bb 100644 --- a/src/os/FileJournal.cc +++ b/src/os/FileJournal.cc @@ -626,10 +626,10 @@ void FileJournal::stop_writer() #ifdef HAVE_LIBAIO Mutex::Locker q(aio_lock); #endif - Mutex::Locker p(queue_lock); + Mutex::Locker p(writeq_lock); write_stop = true; write_cond.Signal(); - queue_cond.Signal(); + writeq_cond.Signal(); #ifdef HAVE_LIBAIO aio_cond.Signal(); write_finish_cond.Signal(); @@ -829,22 +829,24 @@ void FileJournal::queue_completions_thru(uint64_t seq) { assert(queue_lock.is_locked()); utime_t now = ceph_clock_now(g_ceph_context); - while (!completions.empty() && - completions.front().seq <= seq) { + while (!completions_empty()) { + completion_item next = completion_peek_front(); + if (next.seq > seq) + break; + completion_pop_front(); utime_t lat = now; - lat -= completions.front().start; + lat -= next.start; dout(10) << "queue_completions_thru seq " << seq - << " queueing seq " << completions.front().seq - << " " << completions.front().finish + << " queueing seq " << next.seq + << " " << next.finish << " lat " << lat << dendl; if (logger) { logger->finc(l_os_j_lat, lat); } - if (completions.front().finish) - finisher->queue(completions.front().finish); - if (completions.front().tracked_op) - completions.front().tracked_op->mark_event("journaled_completion_queued"); - completions.pop_front(); + if (next.finish) + finisher->queue(next.finish); + if (next.tracked_op) + next.tracked_op->mark_event("journaled_completion_queued"); } queue_cond.Signal(); } @@ -1084,7 +1086,7 @@ void FileJournal::flush() dout(5) << "waiting for completions to empty" << dendl; { Mutex::Locker l(queue_lock); - while (!completions.empty()) + while (!completions_empty()) queue_cond.Wait(queue_lock); } dout(5) << "flush waiting for finisher" << dendl; @@ -1098,16 +1100,12 @@ void FileJournal::write_thread_entry() dout(10) << "write_thread_entry start" << dendl; while (1) { { - Mutex::Locker locker(queue_lock); + Mutex::Locker locker(writeq_lock); if (writeq.empty()) { if (write_stop) break; dout(20) << "write_thread_entry going to sleep" << dendl; - { - if (writeq.empty()) { - queue_cond.Wait(queue_lock); - } - } + writeq_cond.Wait(writeq_lock); dout(20) << "write_thread_entry woke up" << dendl; continue; } @@ -1391,60 +1389,52 @@ void FileJournal::check_aio_completion() void FileJournal::submit_entry(uint64_t seq, bufferlist& e, int alignment, Context *oncommit, TrackedOpRef osd_op) { - Mutex::Locker locker(queue_lock); // ** lock ** - // dump on queue dout(5) << "submit_entry seq " << seq - << " len " << e.length() - << " (" << oncommit << ")" << dendl; + << " len " << e.length() + << " (" << oncommit << ")" << dendl; assert(e.length() > 0); - completions.push_back( - completion_item( - seq, oncommit, ceph_clock_now(g_ceph_context), osd_op)); - - if (full_state == FULL_NOTFULL) { - if (osd_op) - osd_op->mark_event("commit_queued_for_journal_write"); - // queue and kick writer thread - dout(30) << "XXX throttle take " << e.length() << dendl; - throttle_ops.take(1); - throttle_bytes.take(e.length()); - - if (logger) { - logger->set(l_os_jq_max_ops, throttle_ops.get_max()); - logger->set(l_os_jq_max_bytes, throttle_bytes.get_max()); - logger->set(l_os_jq_ops, throttle_ops.get_current()); - logger->set(l_os_jq_bytes, throttle_bytes.get_current()); - } + dout(30) << "XXX throttle take " << e.length() << dendl; + throttle_ops.take(1); + throttle_bytes.take(e.length()); + if (osd_op) + osd_op->mark_event("commit_queued_for_journal_write"); + if (logger) { + logger->set(l_os_jq_max_ops, throttle_ops.get_max()); + logger->set(l_os_jq_max_bytes, throttle_bytes.get_max()); + logger->set(l_os_jq_ops, throttle_ops.get_current()); + logger->set(l_os_jq_bytes, throttle_bytes.get_current()); + } + { + Mutex::Locker l1(writeq_lock); // ** lock ** + Mutex::Locker l2(completions_lock); // ** lock ** + completions.push_back( + completion_item( + seq, oncommit, ceph_clock_now(g_ceph_context), osd_op)); writeq.push_back(write_item(seq, e, alignment, osd_op)); - queue_cond.Signal(); - } else { - if (osd_op) - osd_op->mark_event("commit_blocked_by_journal_full"); - // not journaling this. restart writing no sooner than seq + 1. - dout(10) << " journal is/was full" << dendl; + writeq_cond.Signal(); } } bool FileJournal::writeq_empty() { - Mutex::Locker locker(queue_lock); + Mutex::Locker locker(writeq_lock); return writeq.empty(); } FileJournal::write_item &FileJournal::peek_write() { assert(write_lock.is_locked()); - Mutex::Locker locker(queue_lock); + Mutex::Locker locker(writeq_lock); return writeq.front(); } void FileJournal::pop_write() { assert(write_lock.is_locked()); - Mutex::Locker locker(queue_lock); + Mutex::Locker locker(writeq_lock); writeq.pop_front(); } diff --git a/src/os/FileJournal.h b/src/os/FileJournal.h index 5b3e6ee9544..cd13330c468 100644 --- a/src/os/FileJournal.h +++ b/src/os/FileJournal.h @@ -58,15 +58,36 @@ public: } write_item() : seq(0), alignment(0) {} }; + Mutex queue_lock; Cond queue_cond; uint64_t journaled_seq; bool plug_journal_completions; + + Mutex writeq_lock; + Cond writeq_cond; deque writeq; - deque completions; bool writeq_empty(); write_item &peek_write(); void pop_write(); + + Mutex completions_lock; + deque completions; + bool completions_empty() { + Mutex::Locker l(completions_lock); + return completions.empty(); + } + completion_item completion_peek_front() { + Mutex::Locker l(completions_lock); + assert(!completions.empty()); + return completions.front(); + } + void completion_pop_front() { + Mutex::Locker l(completions_lock); + assert(!completions.empty()); + completions.pop_front(); + } + void submit_entry(uint64_t seq, bufferlist& bl, int alignment, Context *oncommit, TrackedOpRef osd_op = TrackedOpRef()); @@ -295,6 +316,9 @@ private: queue_lock("FileJournal::queue_lock", false, true, false, g_ceph_context), journaled_seq(0), plug_journal_completions(false), + writeq_lock("FileJournal::writeq_lock", false, true, false, g_ceph_context), + completions_lock( + "FileJournal::completions_lock", false, true, false, g_ceph_context), fn(f), zero_buf(NULL), max_size(0), block_size(0),