filestore: adjust op_queue throttle max during fs commit
The underlying FS (btrfs at least) will block writes for a period while it is doing a commit. If an OSD workload is write limited, we should raise the op_queue max (operations that are queued to be applied to disk) during the commit period.

For example, for a normally journal throughput limited (writeahead mode) workload:
- journal queue throttle normally limits things.
- sync starts
- journaled items getting moved to op_queue soon fills up op_queue max
- all writes stop
- sync completes
- op_queue drains, new writes come in again
- journal queue throttle fills up, again starts limiting tput

For an fs throughput limited workload (writeahead):
- kernel buffer cache hits dirty limit
- op_queue throttle limits tput
- sync starts
- opq stalls, new writes stall on throttler
- sync completes
- opq drains (quickly: kernel has no dirty pages)
- new writes flood in
- etc.

(Actually this isn't super realistic, because hitting the kernel dirty limit will do all sorts of other weird things with userland memory allocations.)

In both cases, the commit phase blocks up the op queue, and raising the limit temporarily will keep things flowing. This should be ok because the disks are still busy during this period; they're just flushing dirty data and metadata. Once the sync completes the opq will quickly dump dirty data into the kernel page cache and "catch up".

Signed-off-by: Sage Weil <sage@newdream.net>
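The core of the change, restated outside the diff below: the op_queue admission throttle computes its effective caps from filestore_queue_max_* plus a bonus that applies only while the backing fs is committing. The following is a minimal, self-contained sketch of that idea; it uses standard C++ primitives instead of Ceph's Mutex/Cond, and the names (OpQueueThrottle, set_committing) are illustrative, not Ceph's API.

    // Illustrative only: a self-contained model of the committing-aware throttle.
    // OpQueueThrottle and its methods are hypothetical names, not Ceph code.
    #include <condition_variable>
    #include <cstdint>
    #include <mutex>

    struct OpQueueThrottle {
      std::mutex lock;
      std::condition_variable cond;

      uint64_t queue_ops = 0, queue_bytes = 0;        // current op_queue depth
      uint64_t max_ops = 500, max_bytes = 100 << 20;  // filestore_queue_max_*
      // extra headroom granted only while the backing fs is committing
      uint64_t committing_max_ops = 500, committing_max_bytes = 100 << 20;
      bool committing = false;

      // Block a submitter until the queue is under the effective limits.
      void throttle() {
        std::unique_lock<std::mutex> l(lock);
        cond.wait(l, [&] {
          uint64_t ops_cap = max_ops + (committing ? committing_max_ops : 0);
          uint64_t bytes_cap = max_bytes + (committing ? committing_max_bytes : 0);
          return queue_ops < ops_cap && queue_bytes < bytes_cap;
        });
      }

      // Called at commit start/finish; raising the cap must wake waiters.
      void set_committing(bool c) {
        {
          std::lock_guard<std::mutex> l(lock);
          committing = c;
        }
        cond.notify_all();
      }

      // Bookkeeping as ops enter/leave the queue (dequeue also wakes waiters).
      void queued(uint64_t bytes) {
        std::lock_guard<std::mutex> l(lock);
        queue_ops++;
        queue_bytes += bytes;
      }
      void dequeued(uint64_t bytes) {
        {
          std::lock_guard<std::mutex> l(lock);
          queue_ops--;
          queue_bytes -= bytes;
        }
        cond.notify_all();
      }
    };

Whoever drains the queue or flips the committing flag must wake waiters, which is why set_committing() and dequeued() notify after updating state.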
@@ -364,6 +364,8 @@ struct config_option config_optionsp[] = {
   OPTION(filestore_journal_trailing, 0, OPT_BOOL, false),
   OPTION(filestore_queue_max_ops, 0, OPT_INT, 500),
   OPTION(filestore_queue_max_bytes, 0, OPT_INT, 100 << 20),
+  OPTION(filestore_queue_committing_max_ops, 0, OPT_INT, 500),         // this is ON TOP of filestore_queue_max_*
+  OPTION(filestore_queue_committing_max_bytes, 0, OPT_INT, 100 << 20), // "
   OPTION(filestore_op_threads, 0, OPT_INT, 2),
   OPTION(filestore_commit_timeout, 0, OPT_FLOAT, 600),
   OPTION(ebofs, 0, OPT_BOOL, false),
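For reference, the two new OPTION entries above could be tuned per OSD in ceph.conf. The snippet below is only a sketch, assuming the usual ini-style syntax and an [osd] section; the values shown are just the defaults from the table above.

    [osd]
        filestore queue max ops = 500
        filestore queue max bytes = 104857600
        # extra headroom allowed only while the fs is committing
        filestore queue committing max ops = 500
        filestore queue committing max bytes = 104857600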
@@ -413,6 +413,8 @@ struct md_config_t
   bool filestore_journal_trailing;
   int filestore_queue_max_ops;
   int filestore_queue_max_bytes;
+  int filestore_queue_committing_max_ops;
+  int filestore_queue_committing_max_bytes;
   int filestore_op_threads;
   float filestore_commit_timeout;

@@ -1481,11 +1481,19 @@ void FileStore::queue_op(OpSequencer *osr, uint64_t op_seq, list<Transaction*>&
 void FileStore::op_queue_throttle()
 {
   op_tp.lock();
-  while ((g_conf.filestore_queue_max_ops && op_queue_len >= (unsigned)g_conf.filestore_queue_max_ops) ||
-         (g_conf.filestore_queue_max_bytes && op_queue_bytes >= (unsigned)g_conf.filestore_queue_max_bytes)) {
-    dout(2) << "throttle: "
-            << op_queue_len << " > " << g_conf.filestore_queue_max_ops << " ops || "
-            << op_queue_bytes << " > " << g_conf.filestore_queue_max_bytes << dendl;
+
+  uint64_t max_ops = g_conf.filestore_queue_max_ops;
+  uint64_t max_bytes = g_conf.filestore_queue_max_bytes;
+  if (is_committing()) {
+    max_ops += g_conf.filestore_queue_committing_max_ops;
+    max_bytes += g_conf.filestore_queue_committing_max_bytes;
+  }
+
+  while ((max_ops && op_queue_len >= max_ops) ||
+         (max_bytes && op_queue_bytes >= max_bytes)) {
+    dout(2) << "op_queue_throttle waiting: "
+            << op_queue_len << " > " << max_ops << " ops || "
+            << op_queue_bytes << " > " << max_bytes << dendl;
     op_tp.wait(op_throttle_cond);
   }
   op_tp.unlock();
@@ -186,9 +186,11 @@ bool JournalingObjectStore::commit_start()
     goto out;
   }

+  com_lock.Lock();
   // we can _only_ read applied_seq here because open_ops == 0 (we've
   // quiesced all in-flight applies).
   committing_seq = applied_seq;
+  com_lock.Unlock();

   dout(10) << "commit_start committing " << committing_seq << ", still blocked" << dendl;
   ret = true;
@@ -217,7 +219,10 @@ void JournalingObjectStore::commit_finish()

   if (journal)
     journal->committed_thru(committing_seq);
+
+  com_lock.Lock();
   committed_seq = committing_seq;
+  com_lock.Unlock();

   map<version_t, vector<Context*> >::iterator p = commit_waiters.begin();
   while (p != commit_waiters.end() &&
@@ -33,6 +33,7 @@ protected:

   Cond cond;
   Mutex journal_lock;
+  Mutex com_lock;

   list<uint64_t> ops_submitting;
   list<Cond*> ops_apply_blocked;
@@ -58,14 +59,19 @@ protected:
   bool commit_start();
   void commit_started();  // allow new ops (underlying fs should now be committing all prior ops)
   void commit_finish();

+  bool is_committing() {
+    Mutex::Locker l(com_lock);
+    return committing_seq != committed_seq;
+  }

 public:
   JournalingObjectStore() : op_seq(0),
                             applied_seq(0), committing_seq(0), committed_seq(0),
                             open_ops(0), blocked(false),
                             journal(NULL),
-                            journal_lock("JournalingObjectStore::journal_lock") { }
+                            journal_lock("JournalingObjectStore::journal_lock"),
+                            com_lock("JournalingObjectStore::com_lock") { }

 };
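Taken together, the JournalingObjectStore hunks track commit progress with two sequence numbers guarded by the new com_lock, and is_committing() is the check op_queue_throttle() consults to grant the extra headroom. Below is a compact sketch of that state tracking, using std::mutex in place of Ceph's Mutex; CommitState is a hypothetical name used only for illustration.

    // Illustrative only: the commit-progress tracking added above, modeled with
    // standard C++ instead of Ceph's Mutex/Locker types.
    #include <cstdint>
    #include <mutex>

    struct CommitState {
      std::mutex com_lock;          // guards the two sequence numbers
      uint64_t committing_seq = 0;  // highest seq handed to the in-flight fs sync
      uint64_t committed_seq = 0;   // highest seq known durable on disk

      // commit_start(): snapshot how far we have applied; the upcoming fs sync
      // will make everything up to this seq durable.
      void start(uint64_t applied_seq) {
        std::lock_guard<std::mutex> l(com_lock);
        committing_seq = applied_seq;
      }

      // commit_finish(): the sync is done, so committed catches up to committing.
      void finish() {
        std::lock_guard<std::mutex> l(com_lock);
        committed_seq = committing_seq;
      }

      // is_committing(): a sync is in flight iff the two seqs differ; this is
      // what decides whether the op_queue throttle adds the committing headroom.
      bool is_committing() {
        std::lock_guard<std::mutex> l(com_lock);
        return committing_seq != committed_seq;
      }
    };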