FileJournal: break writeq locking from queue_lock

This keeps the relatively long process of queueing
finishers from blocking op submission.

In submit_entry, we no longer check whether the journal is full
before placing the write in the writeq: committed_thru should work
anyway, and we don't want to grab the required lock.

Signed-off-by: Samuel Just <sam.just@inktank.com>
This commit is contained in:
Samuel Just 2012-10-15 15:39:55 -07:00
parent 2646a8fe06
commit c6c8fce46b
2 changed files with 65 additions and 51 deletions

View File

@ -626,10 +626,10 @@ void FileJournal::stop_writer()
#ifdef HAVE_LIBAIO
Mutex::Locker q(aio_lock);
#endif
Mutex::Locker p(queue_lock);
Mutex::Locker p(writeq_lock);
write_stop = true;
write_cond.Signal();
queue_cond.Signal();
writeq_cond.Signal();
#ifdef HAVE_LIBAIO
aio_cond.Signal();
write_finish_cond.Signal();
@ -829,22 +829,24 @@ void FileJournal::queue_completions_thru(uint64_t seq)
{
assert(queue_lock.is_locked());
utime_t now = ceph_clock_now(g_ceph_context);
while (!completions.empty() &&
completions.front().seq <= seq) {
while (!completions_empty()) {
completion_item next = completion_peek_front();
if (next.seq > seq)
break;
completion_pop_front();
utime_t lat = now;
lat -= completions.front().start;
lat -= next.start;
dout(10) << "queue_completions_thru seq " << seq
<< " queueing seq " << completions.front().seq
<< " " << completions.front().finish
<< " queueing seq " << next.seq
<< " " << next.finish
<< " lat " << lat << dendl;
if (logger) {
logger->finc(l_os_j_lat, lat);
}
if (completions.front().finish)
finisher->queue(completions.front().finish);
if (completions.front().tracked_op)
completions.front().tracked_op->mark_event("journaled_completion_queued");
completions.pop_front();
if (next.finish)
finisher->queue(next.finish);
if (next.tracked_op)
next.tracked_op->mark_event("journaled_completion_queued");
}
queue_cond.Signal();
}
@ -1084,7 +1086,7 @@ void FileJournal::flush()
dout(5) << "waiting for completions to empty" << dendl;
{
Mutex::Locker l(queue_lock);
while (!completions.empty())
while (!completions_empty())
queue_cond.Wait(queue_lock);
}
dout(5) << "flush waiting for finisher" << dendl;
@ -1098,16 +1100,12 @@ void FileJournal::write_thread_entry()
dout(10) << "write_thread_entry start" << dendl;
while (1) {
{
Mutex::Locker locker(queue_lock);
Mutex::Locker locker(writeq_lock);
if (writeq.empty()) {
if (write_stop)
break;
dout(20) << "write_thread_entry going to sleep" << dendl;
{
if (writeq.empty()) {
queue_cond.Wait(queue_lock);
}
}
writeq_cond.Wait(writeq_lock);
dout(20) << "write_thread_entry woke up" << dendl;
continue;
}
@ -1391,60 +1389,52 @@ void FileJournal::check_aio_completion()
void FileJournal::submit_entry(uint64_t seq, bufferlist& e, int alignment,
Context *oncommit, TrackedOpRef osd_op)
{
Mutex::Locker locker(queue_lock); // ** lock **
// dump on queue
dout(5) << "submit_entry seq " << seq
<< " len " << e.length()
<< " (" << oncommit << ")" << dendl;
<< " len " << e.length()
<< " (" << oncommit << ")" << dendl;
assert(e.length() > 0);
completions.push_back(
completion_item(
seq, oncommit, ceph_clock_now(g_ceph_context), osd_op));
if (full_state == FULL_NOTFULL) {
if (osd_op)
osd_op->mark_event("commit_queued_for_journal_write");
// queue and kick writer thread
dout(30) << "XXX throttle take " << e.length() << dendl;
throttle_ops.take(1);
throttle_bytes.take(e.length());
if (logger) {
logger->set(l_os_jq_max_ops, throttle_ops.get_max());
logger->set(l_os_jq_max_bytes, throttle_bytes.get_max());
logger->set(l_os_jq_ops, throttle_ops.get_current());
logger->set(l_os_jq_bytes, throttle_bytes.get_current());
}
dout(30) << "XXX throttle take " << e.length() << dendl;
throttle_ops.take(1);
throttle_bytes.take(e.length());
if (osd_op)
osd_op->mark_event("commit_queued_for_journal_write");
if (logger) {
logger->set(l_os_jq_max_ops, throttle_ops.get_max());
logger->set(l_os_jq_max_bytes, throttle_bytes.get_max());
logger->set(l_os_jq_ops, throttle_ops.get_current());
logger->set(l_os_jq_bytes, throttle_bytes.get_current());
}
{
Mutex::Locker l1(writeq_lock); // ** lock **
Mutex::Locker l2(completions_lock); // ** lock **
completions.push_back(
completion_item(
seq, oncommit, ceph_clock_now(g_ceph_context), osd_op));
writeq.push_back(write_item(seq, e, alignment, osd_op));
queue_cond.Signal();
} else {
if (osd_op)
osd_op->mark_event("commit_blocked_by_journal_full");
// not journaling this. restart writing no sooner than seq + 1.
dout(10) << " journal is/was full" << dendl;
writeq_cond.Signal();
}
}
bool FileJournal::writeq_empty()
{
Mutex::Locker locker(queue_lock);
Mutex::Locker locker(writeq_lock);
return writeq.empty();
}
FileJournal::write_item &FileJournal::peek_write()
{
assert(write_lock.is_locked());
Mutex::Locker locker(queue_lock);
Mutex::Locker locker(writeq_lock);
return writeq.front();
}
void FileJournal::pop_write()
{
assert(write_lock.is_locked());
Mutex::Locker locker(queue_lock);
Mutex::Locker locker(writeq_lock);
writeq.pop_front();
}

View File

@ -58,15 +58,36 @@ public:
}
write_item() : seq(0), alignment(0) {}
};
Mutex queue_lock;
Cond queue_cond;
uint64_t journaled_seq;
bool plug_journal_completions;
Mutex writeq_lock;
Cond writeq_cond;
deque<write_item> writeq;
deque<completion_item> completions;
bool writeq_empty();
write_item &peek_write();
void pop_write();
Mutex completions_lock;
deque<completion_item> completions;
/// Report whether the completions deque is currently empty.
/// Holds completions_lock for the duration of the check only, so the
/// answer may be stale by the time the caller acts on it.
bool completions_empty() {
Mutex::Locker l(completions_lock);
return completions.empty();
}
/// Return a copy of the oldest queued completion without removing it.
/// Returns by value, so the result stays usable after completions_lock
/// is released. Asserts that the deque is non-empty.
completion_item completion_peek_front() {
Mutex::Locker l(completions_lock);
assert(!completions.empty());
return completions.front();
}
/// Remove the oldest queued completion while holding completions_lock.
/// Asserts that the deque is non-empty. The lock is taken independently of
/// any earlier completion_peek_front() call, so a peek followed by a pop is
/// not atomic as a pair.
void completion_pop_front() {
Mutex::Locker l(completions_lock);
assert(!completions.empty());
completions.pop_front();
}
void submit_entry(uint64_t seq, bufferlist& bl, int alignment,
Context *oncommit,
TrackedOpRef osd_op = TrackedOpRef());
@ -295,6 +316,9 @@ private:
queue_lock("FileJournal::queue_lock", false, true, false, g_ceph_context),
journaled_seq(0),
plug_journal_completions(false),
writeq_lock("FileJournal::writeq_lock", false, true, false, g_ceph_context),
completions_lock(
"FileJournal::completions_lock", false, true, false, g_ceph_context),
fn(f),
zero_buf(NULL),
max_size(0), block_size(0),