diff --git a/src/common/config_opts.h b/src/common/config_opts.h
index 8d26707c79b..e9020757aba 100644
--- a/src/common/config_opts.h
+++ b/src/common/config_opts.h
@@ -794,7 +794,8 @@ OPTION(newstore_max_dir_size, OPT_U32, 1000000)
 OPTION(newstore_onode_map_size, OPT_U32, 1024)  // onodes per collection
 OPTION(newstore_backend, OPT_STR, "rocksdb")
 OPTION(newstore_fail_eio, OPT_BOOL, true)
-OPTION(newstore_sync_queue_transaction, OPT_BOOL, false)  // perform write synchronously from queue_transaction
+OPTION(newstore_sync_io, OPT_BOOL, false)  // perform initial io synchronously
+OPTION(newstore_sync_transaction, OPT_BOOL, false)  // perform kv txn synchronously
 OPTION(newstore_fsync_threads, OPT_INT, 16)  // num threads calling fsync
 OPTION(newstore_fsync_thread_timeout, OPT_INT, 30)  // thread timeout value
 OPTION(newstore_fsync_thread_suicide_timeout, OPT_INT, 120)  // suicide timeout value
diff --git a/src/os/newstore/NewStore.cc b/src/os/newstore/NewStore.cc
index 246ff186992..25eb6412134 100644
--- a/src/os/newstore/NewStore.cc
+++ b/src/os/newstore/NewStore.cc
@@ -991,7 +991,7 @@ int NewStore::mount()
   if (r < 0)
     goto out_db;
 
-  r = _replay_wal();
+  r = _wal_replay();
   if (r < 0)
     goto out_aio;
 
@@ -2036,6 +2036,88 @@ NewStore::TransContext *NewStore::_txc_create(OpSequencer *osr)
   return txc;
 }
 
+void NewStore::_txc_state_proc(TransContext *txc)
+{
+  while (true) {
+    dout(10) << __func__ << " txc " << txc
+             << " " << txc->get_state_name() << dendl;
+    switch (txc->state) {
+    case TransContext::STATE_PREPARE:
+      if (!txc->aios.empty()) {
+        txc->state = TransContext::STATE_AIO_WAIT;
+        _txc_aio_submit(txc);
+        return;
+      }
+      // ** fall-thru **
+
+    case TransContext::STATE_AIO_WAIT:
+      if (!txc->fds.empty()) {
+        txc->state = TransContext::STATE_FSYNC_WAIT;
+        if (!g_conf->newstore_sync_io) {
+          _txc_queue_fsync(txc);
+          return;
+        }
+        _txc_do_sync_fsync(txc);
+      }
+      _txc_finish_io(txc);  // may trigger blocked txc's too
+      return;
+
+    case TransContext::STATE_IO_DONE:
+      assert(txc->osr->qlock.is_locked());  // see _txc_finish_io
+      txc->state = TransContext::STATE_KV_QUEUED;
+      if (!g_conf->newstore_sync_transaction) {
+        Mutex::Locker l(kv_lock);
+        db->submit_transaction(txc->t);
+        kv_queue.push_back(txc);
+        kv_cond.SignalOne();
+        return;
+      }
+      db->submit_transaction_sync(txc->t);
+      break;
+
+    case TransContext::STATE_KV_QUEUED:
+      txc->state = TransContext::STATE_KV_DONE;
+      _txc_finish_kv(txc);
+      // ** fall-thru **
+
+    case TransContext::STATE_KV_DONE:
+      if (txc->wal_txn) {
+        txc->state = TransContext::STATE_WAL_QUEUED;
+        wal_wq.queue(txc);
+        return;
+      }
+      txc->state = TransContext::STATE_FINISHING;
+      break;
+
+    case TransContext::STATE_WAL_APPLYING:
+      if (!txc->aios.empty()) {
+        txc->state = TransContext::STATE_WAL_AIO_WAIT;
+        _txc_aio_submit(txc);
+        return;
+      }
+      // ** fall-thru **
+
+    case TransContext::STATE_WAL_AIO_WAIT:
+      _wal_finish(txc);
+      return;
+
+    case TransContext::STATE_WAL_CLEANUP:
+      txc->state = TransContext::STATE_FINISHING;
+      // ** fall-thru **
+
+    case TransContext::STATE_FINISHING:
+      _txc_finish(txc);
+      return;
+
+    default:
+      derr << __func__ << " unexpected txc " << txc
+           << " state " << txc->get_state_name() << dendl;
+      assert(0 == "unexpected txc state");
+      return;
+    }
+  }
+}
+
 void NewStore::_txc_process_fsync(fsync_item *i)
 {
   dout(20) << __func__ << " txc " << i->txc << dendl;
@@ -2049,12 +2131,12 @@ void NewStore::_txc_process_fsync(fsync_item *i)
   }
   VOID_TEMP_FAILURE_RETRY(::close(i->fd));
   if (i->txc->finish_fsync()) {
-    _txc_finish_fsync(i->txc);
+    _txc_finish_io(i->txc);
   }
dout(20) << __func__ << " txc " << i->txc << " done" << dendl; } -void NewStore::_txc_finish_fsync(TransContext *txc) +void NewStore::_txc_finish_io(TransContext *txc) { dout(20) << __func__ << " " << txc << dendl; @@ -2065,25 +2147,25 @@ void NewStore::_txc_finish_fsync(TransContext *txc) OpSequencer *osr = txc->osr.get(); Mutex::Locker l(osr->qlock); - txc->state = TransContext::STATE_FSYNC_DONE; + txc->state = TransContext::STATE_IO_DONE; OpSequencer::q_list_t::iterator p = osr->q.iterator_to(*txc); while (p != osr->q.begin()) { --p; - if (p->state < TransContext::STATE_FSYNC_DONE) { + if (p->state < TransContext::STATE_IO_DONE) { dout(20) << __func__ << " " << txc << " blocked by " << &*p << " " << p->get_state_name() << dendl; return; } - if (p->state > TransContext::STATE_FSYNC_DONE) { + if (p->state > TransContext::STATE_IO_DONE) { ++p; break; } } do { - _txc_submit_kv(&*p++); + _txc_state_proc(&*p++); } while (p != osr->q.end() && - p->state == TransContext::STATE_FSYNC_DONE); + p->state == TransContext::STATE_IO_DONE); } int NewStore::_txc_finalize(OpSequencer *osr, TransContext *txc) @@ -2119,7 +2201,6 @@ int NewStore::_txc_finalize(OpSequencer *osr, TransContext *txc) void NewStore::_txc_queue_fsync(TransContext *txc) { dout(20) << __func__ << " txc " << txc << dendl; - txc->state = TransContext::STATE_FSYNC_QUEUED; fsync_wq.lock(); for (list::iterator p = txc->fds.begin(); p != txc->fds.end(); @@ -2130,22 +2211,25 @@ void NewStore::_txc_queue_fsync(TransContext *txc) fsync_wq.unlock(); } -void NewStore::_txc_submit_kv(TransContext *txc) +void NewStore::_txc_do_sync_fsync(TransContext *txc) { dout(20) << __func__ << " txc " << txc << dendl; - txc->state = TransContext::STATE_KV_QUEUED; - - Mutex::Locker l(kv_lock); - db->submit_transaction(txc->t); - kv_queue.push_back(txc); - kv_cond.SignalOne(); + for (list::iterator p = txc->fds.begin(); + p != txc->fds.end(); ++p) { + dout(30) << __func__ << " fsync " << p->fd << dendl; + int r = ::fdatasync(p->fd); + if (r < 0) { + r = -errno; + derr << __func__ << " fsync: " << cpp_strerror(r) << dendl; + assert(0 == "fsync error"); + } + VOID_TEMP_FAILURE_RETRY(::close(p->fd)); + } } void NewStore::_txc_finish_kv(TransContext *txc) { dout(20) << __func__ << " txc " << txc << dendl; - txc->osr->qlock.Lock(); - txc->state = TransContext::STATE_KV_DONE; // warning: we're calling onreadable_sync inside the sequencer lock if (txc->onreadable_sync) { @@ -2164,20 +2248,9 @@ void NewStore::_txc_finish_kv(TransContext *txc) finisher.queue(txc->oncommits.front()); txc->oncommits.pop_front(); } - - if (txc->wal_txn) { - dout(20) << __func__ << " starting wal apply" << dendl; - txc->state = TransContext::STATE_WAL_QUEUED; - txc->osr->qlock.Unlock(); - wal_wq.queue(txc); - } else { - txc->state = TransContext::STATE_FINISHING; - txc->osr->qlock.Unlock(); - _txc_finish_apply(txc); - } } -void NewStore::_txc_finish_apply(TransContext *txc) +void NewStore::_txc_finish(TransContext *txc) { dout(20) << __func__ << " " << txc << " onodes " << txc->onodes << dendl; assert(txc->state == TransContext::STATE_FINISHING); @@ -2250,23 +2323,7 @@ void NewStore::_aio_thread() << left << " aios left" << dendl; VOID_TEMP_FAILURE_RETRY(::close(aio->fd)); if (left == 0) { - switch (txc->state) { - case TransContext::STATE_AIO_QUEUED: - txc->state = TransContext::STATE_AIO_DONE; - if (!txc->fds.empty()) { - _txc_queue_fsync(txc); - } else { - _txc_finish_fsync(txc); - } - break; - - case TransContext::STATE_WAL_AIO_WAIT: - _wal_finish(txc); - break; - - default: - 
assert(0 == "unexpected txc state on aio completion"); - } + _txc_state_proc(txc); } } } @@ -2298,18 +2355,7 @@ void NewStore::_kv_sync_thread() << " in " << dur << dendl; while (!kv_committing.empty()) { TransContext *txc = kv_committing.front(); - if (txc->state == TransContext::STATE_WAL_CLEANUP) { - txc->osr->qlock.Lock(); - txc->state = TransContext::STATE_FINISHING; - txc->osr->qlock.Unlock(); - _txc_finish_apply(txc); - } else if (txc->state == TransContext::STATE_KV_QUEUED) { - _txc_finish_kv(txc); - } else { - derr << __func__ << " unexpected txc state " << txc->get_state_name() - << dendl; - assert(0); - } + _txc_state_proc(txc); kv_committing.pop_front(); } @@ -2340,16 +2386,10 @@ int NewStore::_wal_apply(TransContext *txc) txc->aios.clear(); int r = _do_wal_transaction(wt, txc); - if (r < 0) - return r; + assert(r == 0); - if (!txc->aios.empty()) { - _txc_aio_submit(txc); - txc->state = TransContext::STATE_WAL_AIO_WAIT; - return 0; - } else { - return _wal_finish(txc); - } + _txc_state_proc(txc); + return 0; } int NewStore::_wal_finish(TransContext *txc) @@ -2362,9 +2402,7 @@ int NewStore::_wal_finish(TransContext *txc) KeyValueDB::Transaction cleanup = db->get_transaction(); cleanup->rmkey(PREFIX_WAL, key); - txc->osr->qlock.Lock(); txc->state = TransContext::STATE_WAL_CLEANUP; - txc->osr->qlock.Unlock(); Mutex::Locker l(kv_lock); db->submit_transaction(cleanup); @@ -2490,7 +2528,7 @@ int NewStore::_do_wal_transaction(wal_transaction_t& wt, return 0; } -int NewStore::_replay_wal() +int NewStore::_wal_replay() { dout(10) << __func__ << " start" << dendl; KeyValueDB::Iterator it = db->get_iterator(PREFIX_WAL); @@ -2539,7 +2577,7 @@ int NewStore::queue_transactions( tls, &onreadable, &ondisk, &onreadable_sync); int r; - // throttle wal work + // throttle on wal work wal_wq.throttle(g_conf->newstore_wal_max_ops, g_conf->newstore_wal_max_bytes); @@ -2557,54 +2595,22 @@ int NewStore::queue_transactions( dout(5) << __func__ << " new " << *osr << "/" << osr->parent << dendl; } + // prepare TransContext *txc = _txc_create(osr); - - // XXX do it sync for now; this is not crash safe - for (list::iterator p = tls.begin(); p != tls.end(); ++p) { - (*p)->set_osr(osr); - _do_transaction(*p, txc, handle); - } - txc->onreadable = onreadable; txc->onreadable_sync = onreadable_sync; txc->oncommit = ondisk; + for (list::iterator p = tls.begin(); p != tls.end(); ++p) { + (*p)->set_osr(osr); + _txc_add_transaction(txc, *p); + } + r = _txc_finalize(osr, txc); assert(r == 0); - if (g_conf->newstore_sync_queue_transaction) { - // do it syncrhonously. for example, if we have a *very* fast backend. 
-
-    // sync
-    txc->state = TransContext::STATE_FSYNC_FSYNCING;
-    for (list<fsync_item>::iterator p = txc->fds.begin();
-         p != txc->fds.end(); ++p) {
-      dout(30) << __func__ << " fsync " << p->fd << dendl;
-      int r = ::fdatasync(p->fd);
-      if (r < 0) {
-        r = -errno;
-        derr << __func__ << " fsync: " << cpp_strerror(r) << dendl;
-        return r;
-      }
-      VOID_TEMP_FAILURE_RETRY(::close(p->fd));
-    }
-
-    txc->state = TransContext::STATE_KV_COMMITTING;
-    db->submit_transaction_sync(txc->t);
-
-    _txc_finish_kv(txc);
-  } else {
-    // async path
-    if (!txc->aios.empty()) {
-      _txc_aio_submit(txc);
-      txc->state = TransContext::STATE_AIO_QUEUED;
-    } else if (!txc->fds.empty()) {
-      _txc_queue_fsync(txc);
-    } else {
-      _txc_finish_fsync(txc);
-    }
-  }
-
+  // execute (start)
+  _txc_state_proc(txc);
 
   return 0;
 }
@@ -2617,7 +2623,7 @@ void NewStore::_txc_aio_submit(TransContext *txc)
        p != txc->aios.end();
        ++p) {
     FS::aio_t& aio = *p;
-    dout(20) << __func__ << " submitting aio " << &aio << dendl;
+    dout(20) << __func__ << " aio " << &aio << " fd " << aio.fd << dendl;
     for (vector<iovec>::iterator q = aio.iov.begin(); q != aio.iov.end(); ++q)
       dout(30) << __func__ << " iov " << (void*)q->iov_base << " len "
               << q->iov_len << dendl;
@@ -2631,9 +2637,7 @@
   }
 }
 
-int NewStore::_do_transaction(Transaction *t,
-                              TransContext *txc,
-                              ThreadPool::TPHandle *handle)
+int NewStore::_txc_add_transaction(TransContext *txc, Transaction *t)
 {
   Transaction::iterator i = t->begin();
   int pos = 0;
diff --git a/src/os/newstore/NewStore.h b/src/os/newstore/NewStore.h
index 93e547566b3..961b55cce1e 100644
--- a/src/os/newstore/NewStore.h
+++ b/src/os/newstore/NewStore.h
@@ -145,11 +145,9 @@ public:
   struct TransContext {
     typedef enum {
       STATE_PREPARE,
-      STATE_AIO_QUEUED,
-      STATE_AIO_DONE,
-      STATE_FSYNC_QUEUED,
-      STATE_FSYNC_FSYNCING,
-      STATE_FSYNC_DONE,
+      STATE_FSYNC_WAIT,
+      STATE_AIO_WAIT,
+      STATE_IO_DONE,
       STATE_KV_QUEUED,
       STATE_KV_COMMITTING,
       STATE_KV_DONE,
@@ -167,11 +165,9 @@ public:
     const char *get_state_name() {
      switch (state) {
      case STATE_PREPARE: return "prepare";
-     case STATE_FSYNC_QUEUED: return "fsync_queued";
-     case STATE_FSYNC_FSYNCING: return "fsync_fsyncing";
-     case STATE_FSYNC_DONE: return "fsync_done";
-     case STATE_AIO_QUEUED: return "aio_queued";
-     case STATE_AIO_DONE: return "aio_done";
+     case STATE_FSYNC_WAIT: return "fsync_wait";
+     case STATE_AIO_WAIT: return "aio_wait";
+     case STATE_IO_DONE: return "io_done";
      case STATE_KV_QUEUED: return "kv_queued";
      case STATE_KV_COMMITTING: return "kv_committing";
      case STATE_KV_DONE: return "kv_done";
@@ -558,14 +554,16 @@ private:
   int _clean_fid_tail(TransContext *txc, const fragment_t& f);
 
   TransContext *_txc_create(OpSequencer *osr);
+  int _txc_add_transaction(TransContext *txc, Transaction *t);
   int _txc_finalize(OpSequencer *osr, TransContext *txc);
+  void _txc_state_proc(TransContext *txc);
   void _txc_aio_submit(TransContext *txc);
+  void _txc_do_sync_fsync(TransContext *txc);
   void _txc_queue_fsync(TransContext *txc);
   void _txc_process_fsync(fsync_item *i);
-  void _txc_finish_fsync(TransContext *txc);
-  void _txc_submit_kv(TransContext *txc);
+  void _txc_finish_io(TransContext *txc);
   void _txc_finish_kv(TransContext *txc);
-  void _txc_finish_apply(TransContext *txc);
+  void _txc_finish(TransContext *txc);
 
   void _osr_reap_done(OpSequencer *osr);
 
@@ -588,9 +586,7 @@ private:
   int _wal_apply(TransContext *txc);
   int _wal_finish(TransContext *txc);
   int _do_wal_transaction(wal_transaction_t& wt, TransContext *txc);
-  void _wait_object_wal(OnodeRef onode);
-  int _replay_wal();
-  friend class C_ApplyWAL;
+  int _wal_replay();
 
 public:
   NewStore(CephContext *cct, const string& path);
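
As an illustration of the control flow the new _txc_state_proc() introduces, the standalone sketch below (not part of the patch, and not NewStore code) models a single dispatch loop that advances a transaction until it must hand off to an asynchronous service, with newstore_sync_io and newstore_sync_transaction short-circuiting the fsync and kv hand-offs. Only the state names and the two option names mirror the diff; TxcSim, the boolean flags, the printf stand-ins, and main() are illustrative assumptions.

// Illustrative sketch only -- not part of the patch and not NewStore code.
#include <cstdio>

enum State {
  STATE_PREPARE,
  STATE_AIO_WAIT,
  STATE_FSYNC_WAIT,
  STATE_IO_DONE,
  STATE_KV_QUEUED,
  STATE_KV_DONE,
  STATE_WAL_QUEUED,
  STATE_FINISHING,
  STATE_DONE
};

struct TxcSim {
  State state = STATE_PREPARE;
  bool has_aio = true;    // stands in for !txc->aios.empty()
  bool has_fds = true;    // stands in for !txc->fds.empty()
  bool has_wal = false;   // stands in for txc->wal_txn
};

// Stand-ins for the newstore_sync_io / newstore_sync_transaction options.
static const bool sync_io = false;
static const bool sync_transaction = false;

// One loop advances the txc until it completes or hands off to an
// asynchronous service (aio completion, fsync thread, kv commit thread, wal queue).
static void state_proc(TxcSim *txc) {
  while (true) {
    switch (txc->state) {
    case STATE_PREPARE:
      if (txc->has_aio) {
        txc->state = STATE_AIO_WAIT;
        printf("submit aio, resume from the aio completion thread\n");
        return;
      }
      // fall through
    case STATE_AIO_WAIT:
      if (txc->has_fds) {
        txc->state = STATE_FSYNC_WAIT;
        if (!sync_io) {
          printf("queue fsyncs, resume once the fsync thread finishes\n");
          return;
        }
        printf("fdatasync inline (newstore_sync_io)\n");
      }
      // the real code routes through _txc_finish_io() here, which also
      // enforces per-OpSequencer ordering before moving on
      txc->state = STATE_IO_DONE;
      // fall through
    case STATE_IO_DONE:
      txc->state = STATE_KV_QUEUED;
      if (!sync_transaction) {
        printf("queue kv txn, resume from the kv commit thread\n");
        return;
      }
      printf("submit kv txn synchronously (newstore_sync_transaction)\n");
      break;  // loop again and dispatch STATE_KV_QUEUED
    case STATE_KV_QUEUED:
      txc->state = STATE_KV_DONE;
      printf("kv commit done, fire onreadable/oncommit callbacks\n");
      // fall through
    case STATE_KV_DONE:
      if (txc->has_wal) {
        txc->state = STATE_WAL_QUEUED;
        printf("queue wal apply work\n");
        return;
      }
      txc->state = STATE_FINISHING;
      // fall through
    case STATE_FINISHING:
      txc->state = STATE_DONE;
      printf("finish: release onodes, reap the sequencer\n");
      return;
    default:
      return;
    }
  }
}

int main() {
  TxcSim txc;
  state_proc(&txc);           // queue_transactions() kicks the machine once
  state_proc(&txc);           // as if the aio completion thread re-entered it
  txc.state = STATE_IO_DONE;  // as if the fsync path finished (_txc_finish_io)
  state_proc(&txc);
  state_proc(&txc);           // as if the kv commit thread re-entered at KV_QUEUED
  return 0;
}

The practical effect mirrored here is that queue_transactions(), the aio completion thread, the fsync threads, and the kv commit thread all funnel back into one dispatch function instead of each advancing the state on its own, which is what lets the two new sync options collapse those hand-offs into inline calls.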