os/newstore: throttle wal work

Signed-off-by: Sage Weil <sage@redhat.com>
This commit is contained in:
Sage Weil 2015-04-16 16:30:31 -07:00
parent efe218b4aa
commit 143d48570f
5 changed files with 45 additions and 3 deletions

View File

@ -322,6 +322,10 @@ public:
pool->_lock.Unlock(); pool->_lock.Unlock();
} }
Mutex &get_lock() {
return pool->_lock;
}
void lock() { void lock() {
pool->lock(); pool->lock();
} }

View File

@ -798,9 +798,11 @@ OPTION(newstore_sync_queue_transaction, OPT_BOOL, false) // perform write synch
OPTION(newstore_fsync_threads, OPT_INT, 16) // num threads calling fsync OPTION(newstore_fsync_threads, OPT_INT, 16) // num threads calling fsync
OPTION(newstore_fsync_thread_timeout, OPT_INT, 30) // thread timeout value OPTION(newstore_fsync_thread_timeout, OPT_INT, 30) // thread timeout value
OPTION(newstore_fsync_thread_suicide_timeout, OPT_INT, 120) // suicide timeout value OPTION(newstore_fsync_thread_suicide_timeout, OPT_INT, 120) // suicide timeout value
OPTION(newstore_wal_threads, OPT_INT, 2) OPTION(newstore_wal_threads, OPT_INT, 4)
OPTION(newstore_wal_thread_timeout, OPT_INT, 30) OPTION(newstore_wal_thread_timeout, OPT_INT, 30)
OPTION(newstore_wal_thread_suicide_timeout, OPT_INT, 120) OPTION(newstore_wal_thread_suicide_timeout, OPT_INT, 120)
OPTION(newstore_wal_max_ops, OPT_U64, 64)
OPTION(newstore_wal_max_bytes, OPT_U64, 64*1024*1024)
OPTION(newstore_fid_prealloc, OPT_INT, 1024) OPTION(newstore_fid_prealloc, OPT_INT, 1024)
OPTION(newstore_nid_prealloc, OPT_INT, 1024) OPTION(newstore_nid_prealloc, OPT_INT, 1024)
OPTION(newstore_overlay_max_length, OPT_INT, 65536) OPTION(newstore_overlay_max_length, OPT_INT, 65536)

View File

@ -2419,6 +2419,10 @@ int NewStore::queue_transactions(
tls, &onreadable, &ondisk, &onreadable_sync); tls, &onreadable, &ondisk, &onreadable_sync);
int r; int r;
// throttle wal work
wal_wq.throttle(g_conf->newstore_wal_max_ops,
g_conf->newstore_wal_max_bytes);
// set up the sequencer // set up the sequencer
OpSequencer *osr; OpSequencer *osr;
if (!posr) if (!posr)

View File

@ -367,6 +367,8 @@ public:
private: private:
NewStore *store; NewStore *store;
wal_osr_queue_t wal_queue; wal_osr_queue_t wal_queue;
uint64_t ops, bytes;
Cond throttle_cond;
public: public:
WALWQ(NewStore *s, time_t ti, time_t sti, ThreadPool *tp) WALWQ(NewStore *s, time_t ti, time_t sti, ThreadPool *tp)
@ -381,6 +383,8 @@ public:
wal_queue.push_back(*i->osr); wal_queue.push_back(*i->osr);
} }
i->osr->wal_q.push_back(*i); i->osr->wal_q.push_back(*i);
++ops;
bytes += i->wal_txn->get_bytes();
return true; return true;
} }
void _dequeue(TransContext *p) { void _dequeue(TransContext *p) {
@ -397,12 +401,18 @@ public:
// requeue at the end to minimize contention // requeue at the end to minimize contention
wal_queue.push_back(*i->osr); wal_queue.push_back(*i->osr);
} }
--ops;
bytes -= i->wal_txn->get_bytes();
throttle_cond.Signal();
// preserve wal ordering for this sequencer by taking the lock
// while still holding the queue lock
i->osr->wal_apply_lock.Lock();
return i; return i;
} }
void _process(TransContext *i, ThreadPool::TPHandle &handle) { void _process(TransContext *i, ThreadPool::TPHandle &handle) {
// preserve wal ordering for this sequencer
Mutex::Locker l(i->osr->wal_apply_lock);
store->_apply_wal_transaction(i); store->_apply_wal_transaction(i);
i->osr->wal_apply_lock.Unlock();
} }
void _clear() { void _clear() {
assert(wal_queue.empty()); assert(wal_queue.empty());
@ -416,6 +426,14 @@ public:
unlock(); unlock();
drain(); drain();
} }
void throttle(uint64_t max_ops, uint64_t max_bytes) {
Mutex& lock = get_lock();
Mutex::Locker l(lock);
while (ops > max_ops || bytes > max_bytes) {
throttle_cond.Wait(lock);
}
}
}; };
struct KVSyncThread : public Thread { struct KVSyncThread : public Thread {

View File

@ -165,6 +165,20 @@ struct wal_transaction_t {
uint64_t seq; uint64_t seq;
list<wal_op_t> ops; list<wal_op_t> ops;
int64_t _bytes; ///< cached byte count
wal_transaction_t() : _bytes(-1) {}
uint64_t get_bytes() {
if (_bytes < 0) {
_bytes = 0;
for (list<wal_op_t>::iterator p = ops.begin(); p != ops.end(); ++p) {
_bytes += p->length;
}
}
return _bytes;
}
void encode(bufferlist& bl) const; void encode(bufferlist& bl) const;
void decode(bufferlist::iterator& p); void decode(bufferlist::iterator& p);
void dump(Formatter *f) const; void dump(Formatter *f) const;