filestore: make OpSequencer::flush() work for writeahead journaling items

It was only waiting for items in the op_queue to complete.  The goal is
to wait for anything we've called queue_transactions(&osr,...) on. If we
do writeahead journaling, though, there might be new ops that are still
journaling but not yet submitted to the fs that are missed.

This adds a journal queue to the OpSequencer, and uses it in the writeahead
case only.

Signed-off-by: Sage Weil <sage@newdream.net>
This commit is contained in:
Sage Weil 2010-12-17 15:12:17 -08:00
parent 285f351b72
commit 3a235b0f21
2 changed files with 46 additions and 22 deletions

View File

@ -1230,7 +1230,7 @@ int FileStore::umount()
/// -----------------------------
void FileStore::queue_op(Sequencer *posr, uint64_t op_seq, list<Transaction*>& tls,
void FileStore::queue_op(OpSequencer *osr, uint64_t op_seq, list<Transaction*>& tls,
Context *onreadable, Context *onreadable_sync)
{
uint64_t bytes = 0, ops = 0;
@ -1260,18 +1260,6 @@ void FileStore::queue_op(Sequencer *posr, uint64_t op_seq, list<Transaction*>& t
op_tp.lock();
OpSequencer *osr;
if (!posr)
posr = &default_osr;
if (posr->p) {
osr = (OpSequencer *)posr->p;
dout(10) << "queue_op existing osr " << osr << "/" << osr->parent << dendl; //<< " w/ q " << osr->q << dendl;
} else {
osr = new OpSequencer;
osr->parent = posr;
posr->p = osr;
dout(10) << "queue_op new osr " << osr << "/" << osr->parent << dendl;
}
osr->queue(o);
op_queue_len++;
@ -1361,13 +1349,13 @@ void FileStore::_finish_op(OpSequencer *osr)
struct C_JournaledAhead : public Context {
FileStore *fs;
ObjectStore::Sequencer *osr;
FileStore::OpSequencer *osr;
uint64_t op;
list<ObjectStore::Transaction*> tls;
Context *onreadable, *onreadable_sync;
Context *ondisk;
C_JournaledAhead(FileStore *f, ObjectStore::Sequencer *os, uint64_t o, list<ObjectStore::Transaction*>& t,
C_JournaledAhead(FileStore *f, FileStore::OpSequencer *os, uint64_t o, list<ObjectStore::Transaction*>& t,
Context *onr, Context *ond, Context *onrsync) :
fs(f), osr(os), op(o), tls(t), onreadable(onr), onreadable_sync(onrsync), ondisk(ond) { }
void finish(int r) {
@ -1382,10 +1370,24 @@ int FileStore::queue_transaction(Sequencer *osr, Transaction *t)
return queue_transactions(osr, tls, new C_DeleteTransaction(t));
}
int FileStore::queue_transactions(Sequencer *osr, list<Transaction*> &tls,
int FileStore::queue_transactions(Sequencer *posr, list<Transaction*> &tls,
Context *onreadable, Context *ondisk,
Context *onreadable_sync)
{
// set up the sequencer
OpSequencer *osr;
if (!posr)
posr = &default_osr;
if (posr->p) {
osr = (OpSequencer *)posr->p;
dout(10) << "queue_transactions existing osr " << osr << "/" << osr->parent << dendl; //<< " w/ q " << osr->q << dendl;
} else {
osr = new OpSequencer;
osr->parent = posr;
posr->p = osr;
dout(10) << "queue_transactions new osr " << osr << "/" << osr->parent << dendl;
}
if (journal && journal->is_writeable()) {
if (g_conf.filestore_journal_parallel) {
@ -1414,6 +1416,7 @@ int FileStore::queue_transactions(Sequencer *osr, list<Transaction*> &tls,
uint64_t op = op_submit_start();
dout(10) << "queue_transactions (writeahead) " << op << " " << tls << dendl;
osr->queue_journal(op);
_op_journal_transactions(tls, op,
new C_JournaledAhead(this, osr, op, tls, onreadable, ondisk, onreadable_sync));
op_submit_finish(op);
@ -1447,7 +1450,7 @@ int FileStore::queue_transactions(Sequencer *osr, list<Transaction*> &tls,
return r;
}
void FileStore::_journaled_ahead(Sequencer *osr, uint64_t op,
void FileStore::_journaled_ahead(OpSequencer *osr, uint64_t op,
list<Transaction*> &tls,
Context *onreadable, Context *ondisk,
Context *onreadable_sync)
@ -1456,6 +1459,8 @@ void FileStore::_journaled_ahead(Sequencer *osr, uint64_t op,
op_queue_throttle();
osr->dequeue_journal();
// this should queue in order because the journal does it's completions in order.
journal_lock.Lock();
queue_op(osr, op, tls, onreadable, onreadable_sync);

View File

@ -101,11 +101,21 @@ class FileStore : public JournalingObjectStore {
class OpSequencer : public Sequencer_impl {
Mutex qlock; // to protect q, for benefit of flush (peek/dequeue also protected by lock)
list<Op*> q;
list<uint64_t> jq;
Cond cond;
public:
Sequencer *parent;
Mutex apply_lock; // for apply mutual exclusion
void queue_journal(uint64_t s) {
Mutex::Locker l(qlock);
jq.push_back(s);
}
void dequeue_journal() {
Mutex::Locker l(qlock);
jq.pop_front();
cond.Signal();
}
void queue(Op *o) {
Mutex::Locker l(qlock);
q.push_back(o);
@ -124,9 +134,18 @@ class FileStore : public JournalingObjectStore {
}
void flush() {
Mutex::Locker l(qlock);
if (!q.empty()) {
uint64_t seq = q.back()->op;
while (!q.empty() && q.front()->op <= seq)
// get max for journal _or_ op queues
uint64_t seq = 0;
if (!q.empty())
seq = q.back()->op;
if (!jq.empty() && jq.back() > seq)
seq = jq.back();
if (seq) {
// everything prior to our watermark to drain through either/both queues
while ((!q.empty() && q.front()->op <= seq) ||
(!jq.empty() && jq.front() <= seq))
cond.Wait(qlock);
}
}
@ -180,9 +199,9 @@ class FileStore : public JournalingObjectStore {
void _do_op(OpSequencer *o);
void _finish_op(OpSequencer *o);
void queue_op(Sequencer *osr, uint64_t op, list<Transaction*>& tls, Context *onreadable, Context *onreadable_sync);
void queue_op(OpSequencer *osr, uint64_t op, list<Transaction*>& tls, Context *onreadable, Context *onreadable_sync);
void op_queue_throttle();
void _journaled_ahead(Sequencer *osr, uint64_t op, list<Transaction*> &tls,
void _journaled_ahead(OpSequencer *osr, uint64_t op, list<Transaction*> &tls,
Context *onreadable, Context *ondisk, Context *onreadable_sync);
friend class C_JournaledAhead;