diff --git a/src/common/config_opts.h b/src/common/config_opts.h index c25c7fc0fdc..2445c23464a 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -196,7 +196,8 @@ OPTION(ms_inject_delay_probability, OPT_DOUBLE, 0) // range [0, 1] OPTION(ms_inject_internal_delays, OPT_DOUBLE, 0) // seconds OPTION(ms_dump_on_send, OPT_BOOL, false) // hexdump msg to log on send OPTION(ms_dump_corrupt_message_level, OPT_INT, 1) // debug level to hexdump undecodeable messages at -OPTION(ms_async_op_threads, OPT_INT, 3) +OPTION(ms_async_op_threads, OPT_INT, 3) // number of worker processing threads for async messenger created on init +OPTION(ms_async_max_op_threads, OPT_INT, 5) // max number of worker processing threads for async messenger OPTION(ms_async_set_affinity, OPT_BOOL, true) // example: ms_async_affinity_cores = 0,1 // The number of coreset is expected to equal to ms_async_op_threads, otherwise diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index 31f5701dc67..bcf8e6a3f67 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -2495,6 +2495,12 @@ void AsyncConnection::mark_down() _stop(); } +void AsyncConnection::release_worker() +{ + if (msgr) + reinterpret_cast<AsyncMessenger*>(msgr)->release_worker(center); +} + void AsyncConnection::_send_keepalive_or_ack(bool ack, utime_t *tp) { assert(write_lock.is_locked()); diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index 23577db36b4..416bccba42a 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -196,6 +196,8 @@ class AsyncConnection : public Connection { Mutex::Locker l(lock); policy.lossy = true; } + + void release_worker(); private: enum { diff --git a/src/msg/async/AsyncMessenger.cc b/src/msg/async/AsyncMessenger.cc index 03d8cbfd3b0..2ae5ee14825 100644 --- a/src/msg/async/AsyncMessenger.cc +++ b/src/msg/async/AsyncMessenger.cc @@ -311,11 +311,14 @@ void *Worker::entry() *******/ 
const string WorkerPool::name = "AsyncMessenger::WorkerPool"; -WorkerPool::WorkerPool(CephContext *c): cct(c), seq(0), started(false), +WorkerPool::WorkerPool(CephContext *c): cct(c), started(false), barrier_lock("WorkerPool::WorkerPool::barrier_lock"), barrier_count(0) { assert(cct->_conf->ms_async_op_threads > 0); + // make sure user won't try to force some crazy number of worker threads + assert(cct->_conf->ms_async_max_op_threads >= cct->_conf->ms_async_op_threads && + cct->_conf->ms_async_op_threads <= 32); for (int i = 0; i < cct->_conf->ms_async_op_threads; ++i) { Worker *w = new Worker(cct, this, i); workers.push_back(w); @@ -355,6 +358,70 @@ void WorkerPool::start() } } +Worker* WorkerPool::get_worker() +{ + ldout(cct, 10) << __func__ << dendl; + + // start with some reasonably large number + unsigned min_load = std::numeric_limits<int>::max(); + Worker* current_best = nullptr; + + simple_spin_lock(&pool_spin); + // find worker with least references + // tempting case is returning on references == 0, but in reality + // this will happen so rarely that there's no need for special case. + for (auto p = workers.begin(); p != workers.end(); ++p) { + unsigned worker_load = (*p)->references.load(); + ldout(cct, 20) << __func__ << " Worker " << *p << " load: " << worker_load << dendl; + if (worker_load < min_load) { + current_best = *p; + min_load = worker_load; + } + } + + // if minimum load exceeds amount of workers, make a new worker + // logic behind this is that we're not going to create new worker + // just because others have *some* load, we'll defer worker creation + // until others have *plenty* of load. This will cause new worker + // to get assigned to all new connections *unless* one or more + // of workers get their load reduced - in that case, this worker + // will be assigned to new connection. + // TODO: add more logic and heuristics, so connections known to be + // of light workload (heartbeat service, etc.) 
won't overshadow + // heavy workload (clients, etc). + if (!current_best || ((workers.size() < (unsigned)cct->_conf->ms_async_max_op_threads) + && (min_load > workers.size()))) { + ldout(cct, 20) << __func__ << " creating worker" << dendl; + current_best = new Worker(cct, this, workers.size()); + workers.push_back(current_best); + current_best->create("ms_async_worker"); + } else { + ldout(cct, 20) << __func__ << " picked " << current_best + << " as best worker with load " << min_load << dendl; + } + + ++current_best->references; + simple_spin_unlock(&pool_spin); + + assert(current_best); + return current_best; +} + +void WorkerPool::release_worker(EventCenter* c) +{ + ldout(cct, 10) << __func__ << dendl; + simple_spin_lock(&pool_spin); + for (auto p = workers.begin(); p != workers.end(); ++p) { + if (&((*p)->center) == c) { + ldout(cct, 10) << __func__ << " found worker, releasing" << dendl; + int oldref = (*p)->references.fetch_sub(1); + assert(oldref > 0); + break; + } + } + simple_spin_unlock(&pool_spin); +} + void WorkerPool::barrier() { ldout(cct, 10) << __func__ << " started." 
<< dendl; diff --git a/src/msg/async/AsyncMessenger.h b/src/msg/async/AsyncMessenger.h index 3c7aa0aa25e..52d93d7cd1e 100644 --- a/src/msg/async/AsyncMessenger.h +++ b/src/msg/async/AsyncMessenger.h @@ -36,6 +36,7 @@ using namespace std; #include "include/assert.h" #include "AsyncConnection.h" #include "Event.h" +#include "common/simple_spin.h" class AsyncMessenger; @@ -65,8 +66,9 @@ class Worker : public Thread { public: EventCenter center; + std::atomic_uint references; Worker(CephContext *c, WorkerPool *p, int i) - : cct(c), pool(p), done(false), id(i), perf_logger(NULL), center(c) { + : cct(c), pool(p), done(false), id(i), perf_logger(NULL), center(c), references(0) { center.init(InitEventNumber); char name[128]; sprintf(name, "AsyncMessenger::Worker-%d", id); @@ -133,7 +135,6 @@ class WorkerPool { WorkerPool(const WorkerPool &); WorkerPool& operator=(const WorkerPool &); CephContext *cct; - uint64_t seq; vector<Worker*> workers; vector<int> coreids; // Used to indicate whether thread started @@ -141,6 +142,7 @@ class WorkerPool { Mutex barrier_lock; Cond barrier_cond; atomic_t barrier_count; + simple_spinlock_t pool_spin = SIMPLE_SPINLOCK_INITIALIZER; class C_barrier : public EventCallback { WorkerPool *pool; @@ -158,9 +160,8 @@ explicit WorkerPool(CephContext *c); virtual ~WorkerPool(); void start(); - Worker *get_worker() { - return workers[(seq++)%workers.size()]; - } + Worker *get_worker(); + void release_worker(EventCenter* c); int get_cpuid(int id) { if (coreids.empty()) return -1; @@ -525,6 +526,7 @@ public: */ void unregister_conn(AsyncConnectionRef conn) { Mutex::Locker l(deleted_lock); + conn->release_worker(); deleted_conns.insert(conn); if (deleted_conns.size() >= ReapDeadConnectionThreshold) { @@ -540,6 +542,10 @@ public: * See "deleted_conns" */ int reap_dead(); + + void release_worker(EventCenter* c) { + pool->release_worker(c); + } /** * @} // AsyncMessenger Internals