From 3e80f8d74a535e14d4092b27ea5417bacff8394e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Da=C5=82ek?= Date: Fri, 26 Feb 2016 13:54:20 +0100 Subject: [PATCH] msg/async: Implement smarter worker thread selection MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This changeset makes AsyncMessenger a bit smarter when it comes to assigning worker threads to AsyncConnections. Each time a worker is assigned, its reference count is increased. The next time AsyncMessenger needs to assign a worker to a new AsyncConnection, it picks the one with the lowest reference count. If it cannot find an idle one, and the number of currently instantiated workers is less than the value of "ms_async_max_op_threads", a new worker is created and returned. Once an AsyncConnection goes away, the reference count on its assigned worker is decreased. This does not prevent, but greatly reduces, the chances of a single async worker thread doing most (or even all) of the ops, and also removes the need to manually tune the "ms_async_op_threads" option. 
Signed-off-by: Piotr Dałek --- src/common/config_opts.h | 3 +- src/msg/async/AsyncConnection.cc | 6 +++ src/msg/async/AsyncConnection.h | 2 + src/msg/async/AsyncMessenger.cc | 69 +++++++++++++++++++++++++++++++- src/msg/async/AsyncMessenger.h | 16 +++++--- 5 files changed, 89 insertions(+), 7 deletions(-) diff --git a/src/common/config_opts.h b/src/common/config_opts.h index e018f1e3ebc..2f67509f9ee 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -196,7 +196,8 @@ OPTION(ms_inject_delay_probability, OPT_DOUBLE, 0) // range [0, 1] OPTION(ms_inject_internal_delays, OPT_DOUBLE, 0) // seconds OPTION(ms_dump_on_send, OPT_BOOL, false) // hexdump msg to log on send OPTION(ms_dump_corrupt_message_level, OPT_INT, 1) // debug level to hexdump undecodeable messages at -OPTION(ms_async_op_threads, OPT_INT, 3) +OPTION(ms_async_op_threads, OPT_INT, 3) // number of worker processing threads for async messenger created on init +OPTION(ms_async_max_op_threads, OPT_INT, 5) // max number of worker processing threads for async messenger OPTION(ms_async_set_affinity, OPT_BOOL, true) // example: ms_async_affinity_cores = 0,1 // The number of coreset is expected to equal to ms_async_op_threads, otherwise diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index 0d7ca05d5d2..18430066d3f 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -2493,6 +2493,12 @@ void AsyncConnection::mark_down() _stop(); } +void AsyncConnection::release_worker() +{ + if (msgr) + reinterpret_cast<AsyncMessenger*>(msgr)->release_worker(center); +} + void AsyncConnection::_send_keepalive_or_ack(bool ack, utime_t *tp) { assert(write_lock.is_locked()); diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index 23577db36b4..416bccba42a 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -196,6 +196,8 @@ class AsyncConnection : public Connection { Mutex::Locker l(lock); policy.lossy = 
true; } + + void release_worker(); private: enum { diff --git a/src/msg/async/AsyncMessenger.cc b/src/msg/async/AsyncMessenger.cc index c12ee8e21bb..8ae2cffb81a 100644 --- a/src/msg/async/AsyncMessenger.cc +++ b/src/msg/async/AsyncMessenger.cc @@ -307,11 +307,14 @@ void *Worker::entry() *******************/ const string WorkerPool::name = "AsyncMessenger::WorkerPool"; -WorkerPool::WorkerPool(CephContext *c): cct(c), seq(0), started(false), +WorkerPool::WorkerPool(CephContext *c): cct(c), started(false), barrier_lock("WorkerPool::WorkerPool::barrier_lock"), barrier_count(0) { assert(cct->_conf->ms_async_op_threads > 0); + // make sure user won't try to force some crazy number of worker threads + assert(cct->_conf->ms_async_max_op_threads >= cct->_conf->ms_async_op_threads && + cct->_conf->ms_async_op_threads <= 32); for (int i = 0; i < cct->_conf->ms_async_op_threads; ++i) { Worker *w = new Worker(cct, this, i); workers.push_back(w); @@ -351,6 +354,70 @@ void WorkerPool::start() } } +Worker* WorkerPool::get_worker() +{ + ldout(cct, 10) << __func__ << dendl; + + // start with some reasonably large number + unsigned min_load = std::numeric_limits<unsigned>::max(); + Worker* current_best = nullptr; + + simple_spin_lock(&pool_spin); + // find worker with least references + // tempting case is returning on references == 0, but in reality + // this will happen so rarely that there's no need for special case. + for (auto p = workers.begin(); p != workers.end(); ++p) { + unsigned worker_load = (*p)->references.load(); + ldout(cct, 20) << __func__ << " Worker " << *p << " load: " << worker_load << dendl; + if (worker_load < min_load) { + current_best = *p; + min_load = worker_load; + } + } + + // if minimum load exceeds amount of workers, make a new worker + // logic behind this is that we're not going to create new worker + // just because others have *some* load, we'll defer worker creation + // until others have *plenty* of load. 
This will cause new worker + // to get assigned to all new connections *unless* one or more + // of workers get their load reduced - in that case, this worker + // will be assigned to new connection. + // TODO: add more logic and heuristics, so connections known to be + // of light workload (heartbeat service, etc.) won't overshadow + // heavy workload (clients, etc). + if (!current_best || ((workers.size() < (unsigned)cct->_conf->ms_async_max_op_threads) + && (min_load > workers.size()))) { + ldout(cct, 20) << __func__ << " creating worker" << dendl; + current_best = new Worker(cct, this, workers.size()); + workers.push_back(current_best); + current_best->create("ms_async_worker"); + } else { + ldout(cct, 20) << __func__ << " picked " << current_best + << " as best worker with load " << min_load << dendl; + } + + ++current_best->references; + simple_spin_unlock(&pool_spin); + + assert(current_best); + return current_best; +} + +void WorkerPool::release_worker(EventCenter* c) +{ + ldout(cct, 10) << __func__ << dendl; + simple_spin_lock(&pool_spin); + for (auto p = workers.begin(); p != workers.end(); ++p) { + if (&((*p)->center) == c) { + ldout(cct, 10) << __func__ << " found worker, releasing" << dendl; + int oldref = (*p)->references.fetch_sub(1); + assert(oldref > 0); + break; + } + } + simple_spin_unlock(&pool_spin); +} + void WorkerPool::barrier() { ldout(cct, 10) << __func__ << " started." 
<< dendl; diff --git a/src/msg/async/AsyncMessenger.h b/src/msg/async/AsyncMessenger.h index 3c7aa0aa25e..52d93d7cd1e 100644 --- a/src/msg/async/AsyncMessenger.h +++ b/src/msg/async/AsyncMessenger.h @@ -36,6 +36,7 @@ using namespace std; #include "include/assert.h" #include "AsyncConnection.h" #include "Event.h" +#include "common/simple_spin.h" class AsyncMessenger; @@ -65,8 +66,9 @@ class Worker : public Thread { public: EventCenter center; + std::atomic_uint references; Worker(CephContext *c, WorkerPool *p, int i) - : cct(c), pool(p), done(false), id(i), perf_logger(NULL), center(c) { + : cct(c), pool(p), done(false), id(i), perf_logger(NULL), center(c), references(0) { center.init(InitEventNumber); char name[128]; sprintf(name, "AsyncMessenger::Worker-%d", id); @@ -133,7 +135,6 @@ class WorkerPool { WorkerPool(const WorkerPool &); WorkerPool& operator=(const WorkerPool &); CephContext *cct; - uint64_t seq; vector<Worker*> workers; vector<int> coreids; // Used to indicate whether thread started @@ -141,6 +142,7 @@ class WorkerPool { Mutex barrier_lock; Cond barrier_cond; atomic_t barrier_count; + simple_spinlock_t pool_spin = SIMPLE_SPINLOCK_INITIALIZER; class C_barrier : public EventCallback { WorkerPool *pool; @@ -158,9 +160,8 @@ class WorkerPool { explicit WorkerPool(CephContext *c); virtual ~WorkerPool(); void start(); - Worker *get_worker() { - return workers[(seq++)%workers.size()]; - } + Worker *get_worker(); + void release_worker(EventCenter* c); int get_cpuid(int id) { if (coreids.empty()) return -1; @@ -525,6 +526,7 @@ public: */ void unregister_conn(AsyncConnectionRef conn) { Mutex::Locker l(deleted_lock); + conn->release_worker(); deleted_conns.insert(conn); if (deleted_conns.size() >= ReapDeadConnectionThreshold) { @@ -540,6 +542,10 @@ public: * See "deleted_conns" */ int reap_dead(); + + void release_worker(EventCenter* c) { + pool->release_worker(c); + } /** * @} // AsyncMessenger Internals