os/newstore: throttle over entire write lifecycle

Take a global throttle when we submit ops and release when they complete.
The first throttles cover the period from submit to commit, while the wal
ones also cover the async post-commit wal work.  The configs are additive
since the wal ones cover both periods; this should make them reasonably
idiot-proof.

Signed-off-by: Sage Weil <sage@redhat.com>
This commit is contained in:
Sage Weil 2015-04-23 14:51:51 -07:00
parent b1136fbd33
commit 41886c5420
3 changed files with 39 additions and 32 deletions

View File

@ -796,14 +796,17 @@ OPTION(newstore_backend, OPT_STR, "rocksdb")
OPTION(newstore_fail_eio, OPT_BOOL, true)
OPTION(newstore_sync_io, OPT_BOOL, false) // perform initial io synchronously
OPTION(newstore_sync_transaction, OPT_BOOL, false) // perform kv txn synchronously
OPTION(newstore_sync_wal_apply, OPT_BOOL, true) // perform initial wal work synchronously (possibly in combination with aio so we only *queue* ios)
OPTION(newstore_fsync_threads, OPT_INT, 16) // num threads calling fsync
OPTION(newstore_fsync_thread_timeout, OPT_INT, 30) // thread timeout value
OPTION(newstore_fsync_thread_suicide_timeout, OPT_INT, 120) // suicide timeout value
OPTION(newstore_wal_threads, OPT_INT, 4)
OPTION(newstore_wal_thread_timeout, OPT_INT, 30)
OPTION(newstore_wal_thread_suicide_timeout, OPT_INT, 120)
OPTION(newstore_wal_max_ops, OPT_U64, 64)
OPTION(newstore_wal_max_bytes, OPT_U64, 64*1024*1024)
OPTION(newstore_max_ops, OPT_U64, 512)
OPTION(newstore_max_bytes, OPT_U64, 64*1024*1024)
OPTION(newstore_wal_max_ops, OPT_U64, 512)
OPTION(newstore_wal_max_bytes, OPT_U64, 128*1024*1024)
OPTION(newstore_fid_prealloc, OPT_INT, 1024)
OPTION(newstore_nid_prealloc, OPT_INT, 1024)
OPTION(newstore_overlay_max_length, OPT_INT, 65536)

View File

@ -584,6 +584,14 @@ NewStore::NewStore(CephContext *cct, const string& path)
fid_lock("NewStore::fid_lock"),
nid_lock("NewStore::nid_lock"),
nid_max(0),
throttle_ops(cct, "newstore_max_ops", cct->_conf->newstore_max_ops),
throttle_bytes(cct, "newstore_max_bytes", cct->_conf->newstore_max_bytes),
throttle_wal_ops(cct, "newstore_wal_max_ops",
cct->_conf->newstore_max_ops +
cct->_conf->newstore_wal_max_ops),
throttle_wal_bytes(cct, "newstore_wal_max_bytes",
cct->_conf->newstore_max_bytes +
cct->_conf->newstore_wal_max_bytes),
wal_lock("NewStore::wal_lock"),
wal_seq(0),
wal_tp(cct,
@ -2088,7 +2096,11 @@ void NewStore::_txc_state_proc(TransContext *txc)
case TransContext::STATE_KV_DONE:
if (txc->wal_txn) {
txc->state = TransContext::STATE_WAL_QUEUED;
wal_wq.queue(txc);
if (g_conf->newstore_sync_wal_apply) {
_wal_apply(txc);
} else {
wal_wq.queue(txc);
}
return;
}
txc->state = TransContext::STATE_FINISHING;
@ -2253,6 +2265,9 @@ void NewStore::_txc_finish_kv(TransContext *txc)
finisher.queue(txc->oncommits.front());
txc->oncommits.pop_front();
}
throttle_ops.put(txc->ops);
throttle_bytes.put(txc->bytes);
}
void NewStore::_txc_finish(TransContext *txc)
@ -2280,6 +2295,9 @@ void NewStore::_txc_finish(TransContext *txc)
txc->removed_collections.pop_front();
}
throttle_wal_ops.put(txc->ops);
throttle_wal_bytes.put(txc->bytes);
OpSequencerRef osr = txc->osr;
osr->qlock.Lock();
txc->state = TransContext::STATE_DONE;
@ -2402,8 +2420,6 @@ int NewStore::_wal_finish(TransContext *txc)
wal_transaction_t& wt = *txc->wal_txn;
dout(20) << __func__ << " txc " << " seq " << wt.seq << txc << dendl;
wal_wq.release_throttle(txc);
string key;
get_wal_key(wt.seq, &key);
KeyValueDB::Transaction cleanup = db->get_transaction();
@ -2584,10 +2600,6 @@ int NewStore::queue_transactions(
tls, &onreadable, &ondisk, &onreadable_sync);
int r;
// throttle on wal work
wal_wq.throttle(g_conf->newstore_wal_max_ops,
g_conf->newstore_wal_max_bytes);
// set up the sequencer
OpSequencer *osr;
if (!posr)
@ -2610,12 +2622,19 @@ int NewStore::queue_transactions(
for (list<Transaction*>::iterator p = tls.begin(); p != tls.end(); ++p) {
(*p)->set_osr(osr);
txc->ops += (*p)->get_num_ops();
txc->bytes += (*p)->get_num_bytes();
_txc_add_transaction(txc, *p);
}
r = _txc_finalize(osr, txc);
assert(r == 0);
throttle_ops.get(txc->ops);
throttle_bytes.get(txc->bytes);
throttle_wal_ops.get(txc->ops);
throttle_wal_bytes.get(txc->bytes);
// execute (start)
_txc_state_proc(txc);
return 0;

View File

@ -185,6 +185,8 @@ public:
OpSequencerRef osr;
boost::intrusive::list_member_hook<> sequencer_item;
uint64_t ops, bytes;
list<fsync_item> fds; ///< these fds need to be synced
set<OnodeRef> onodes; ///< these onodes need to be updated/written
KeyValueDB::Transaction t; ///< then we will commit this
@ -210,6 +212,8 @@ public:
TransContext(OpSequencer *o)
: state(STATE_PREPARE),
osr(o),
ops(0),
bytes(0),
oncommit(NULL),
onreadable(NULL),
onreadable_sync(NULL),
@ -376,15 +380,11 @@ public:
private:
NewStore *store;
wal_osr_queue_t wal_queue;
uint64_t ops, bytes;
Cond throttle_cond;
public:
WALWQ(NewStore *s, time_t ti, time_t sti, ThreadPool *tp)
: ThreadPool::WorkQueue<TransContext>("NewStore::WALWQ", ti, sti, tp),
store(s),
ops(0),
bytes(0) {
store(s) {
}
bool _empty() {
return wal_queue.empty();
@ -394,8 +394,6 @@ public:
wal_queue.push_back(*i->osr);
}
i->osr->wal_q.push_back(*i);
++ops;
bytes += i->wal_txn->get_bytes();
return true;
}
void _dequeue(TransContext *p) {
@ -434,22 +432,6 @@ public:
unlock();
drain();
}
void throttle(uint64_t max_ops, uint64_t max_bytes) {
Mutex& lock = get_lock();
Mutex::Locker l(lock);
while (ops > max_ops || bytes > max_bytes) {
throttle_cond.Wait(lock);
}
}
void release_throttle(TransContext *txc) {
lock();
--ops;
bytes -= txc->wal_txn->get_bytes();
throttle_cond.Signal();
unlock();
}
};
struct KVSyncThread : public Thread {
@ -495,6 +477,9 @@ private:
uint64_t nid_last;
uint64_t nid_max;
Throttle throttle_ops, throttle_bytes; ///< submit to commit
Throttle throttle_wal_ops, throttle_wal_bytes; ///< submit to wal complete
Mutex wal_lock;
atomic64_t wal_seq;
ThreadPool wal_tp;