mirror of
https://github.com/ceph/ceph
synced 2025-01-20 01:51:34 +00:00
Merge pull request #7843 from branch-predictor/bp-smart-async-wp
msg/async: Implement smarter worker thread selection Reviewed-by: Haomai Wang <haomai@xsky.com>
This commit is contained in:
commit
69355b032a
@ -196,7 +196,8 @@ OPTION(ms_inject_delay_probability, OPT_DOUBLE, 0) // range [0, 1]
|
||||
OPTION(ms_inject_internal_delays, OPT_DOUBLE, 0) // seconds
|
||||
OPTION(ms_dump_on_send, OPT_BOOL, false) // hexdump msg to log on send
|
||||
OPTION(ms_dump_corrupt_message_level, OPT_INT, 1) // debug level to hexdump undecodeable messages at
|
||||
OPTION(ms_async_op_threads, OPT_INT, 3)
|
||||
OPTION(ms_async_op_threads, OPT_INT, 3) // number of worker processing threads for async messenger created on init
|
||||
OPTION(ms_async_max_op_threads, OPT_INT, 5) // max number of worker processing threads for async messenger
|
||||
OPTION(ms_async_set_affinity, OPT_BOOL, true)
|
||||
// example: ms_async_affinity_cores = 0,1
|
||||
// The number of coresets is expected to be equal to ms_async_op_threads, otherwise
|
||||
|
@ -2495,6 +2495,12 @@ void AsyncConnection::mark_down()
|
||||
_stop();
|
||||
}
|
||||
|
||||
void AsyncConnection::release_worker()
|
||||
{
|
||||
if (msgr)
|
||||
reinterpret_cast<AsyncMessenger*>(msgr)->release_worker(center);
|
||||
}
|
||||
|
||||
void AsyncConnection::_send_keepalive_or_ack(bool ack, utime_t *tp)
|
||||
{
|
||||
assert(write_lock.is_locked());
|
||||
|
@ -196,6 +196,8 @@ class AsyncConnection : public Connection {
|
||||
Mutex::Locker l(lock);
|
||||
policy.lossy = true;
|
||||
}
|
||||
|
||||
void release_worker();
|
||||
|
||||
private:
|
||||
enum {
|
||||
|
@ -311,11 +311,14 @@ void *Worker::entry()
|
||||
*******************/
|
||||
const string WorkerPool::name = "AsyncMessenger::WorkerPool";
|
||||
|
||||
WorkerPool::WorkerPool(CephContext *c): cct(c), seq(0), started(false),
|
||||
WorkerPool::WorkerPool(CephContext *c): cct(c), started(false),
|
||||
barrier_lock("WorkerPool::WorkerPool::barrier_lock"),
|
||||
barrier_count(0)
|
||||
{
|
||||
assert(cct->_conf->ms_async_op_threads > 0);
|
||||
// make sure user won't try to force some crazy number of worker threads
|
||||
assert(cct->_conf->ms_async_max_op_threads >= cct->_conf->ms_async_op_threads &&
|
||||
cct->_conf->ms_async_op_threads <= 32);
|
||||
for (int i = 0; i < cct->_conf->ms_async_op_threads; ++i) {
|
||||
Worker *w = new Worker(cct, this, i);
|
||||
workers.push_back(w);
|
||||
@ -355,6 +358,70 @@ void WorkerPool::start()
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Select the worker that should serve a new connection.
 *
 * Under pool_spin, scans all workers for the one with the fewest references.
 * If even that worker holds more references than there are workers, and the
 * pool is still below ms_async_max_op_threads, a new worker is created,
 * appended to the pool and started instead. The reference count of the
 * returned worker is incremented before the lock is released;
 * release_worker() performs the matching decrement.
 *
 * @return the selected or newly created worker; never null.
 */
Worker* WorkerPool::get_worker()
{
  ldout(cct, 10) << __func__ << dendl;

  // Sentinel in the counter's own (unsigned) type; the first worker seen
  // always replaces it because of the !current_best check below.
  unsigned min_load = std::numeric_limits<unsigned>::max();
  Worker* current_best = nullptr;

  simple_spin_lock(&pool_spin);
  // Find the worker with the fewest references. Returning early on
  // references == 0 is tempting, but that happens so rarely in practice
  // that it does not deserve a special case.
  for (Worker* w : workers) {
    const unsigned worker_load = w->references.load();
    ldout(cct, 20) << __func__ << " Worker " << w << " load: " << worker_load << dendl;
    if (!current_best || worker_load < min_load) {
      current_best = w;
      min_load = worker_load;
    }
  }

  // Only grow the pool when the least loaded worker already has more
  // references than there are workers: a new worker is not created just
  // because the others have *some* load, creation is deferred until they
  // have *plenty*. The new worker then receives every new connection
  // *unless* some other worker's load drops below it in the meantime.
  // TODO: add more logic and heuristics, so connections known to be
  // of light workload (heartbeat service, etc.) won't overshadow
  // heavy workload (clients, etc).
  const bool can_grow =
    workers.size() < static_cast<unsigned>(cct->_conf->ms_async_max_op_threads);
  if (!current_best || (can_grow && min_load > workers.size())) {
    ldout(cct, 20) << __func__ << " creating worker" << dendl;
    // NOTE(review): the allocation and thread start below run while
    // pool_spin is held; consider whether a spinlock is appropriate for
    // a critical section of this length.
    current_best = new Worker(cct, this, workers.size());
    workers.push_back(current_best);
    current_best->create("ms_async_worker");
  } else {
    ldout(cct, 20) << __func__ << " picked " << current_best
                   << " as best worker with load " << min_load << dendl;
  }

  // Validate before dereferencing, not after.
  assert(current_best);
  ++current_best->references;
  simple_spin_unlock(&pool_spin);

  return current_best;
}
|
||||
|
||||
/**
 * Drop one reference from the worker whose event center is `c`.
 *
 * Walks the worker list under pool_spin, decrements the reference count of
 * the first worker whose `center` member lives at address `c`, and asserts
 * that the count was positive before the decrement. If no worker matches,
 * nothing is changed.
 *
 * @param c address of the event center identifying the worker to release
 */
void WorkerPool::release_worker(EventCenter* c)
{
  ldout(cct, 10) << __func__ << dendl;

  simple_spin_lock(&pool_spin);
  for (Worker* w : workers) {
    // Skip workers that do not own this event center.
    if (&(w->center) != c)
      continue;

    ldout(cct, 10) << __func__ << " found worker, releasing" << dendl;
    int oldref = w->references.fetch_sub(1);
    assert(oldref > 0);
    break;
  }
  simple_spin_unlock(&pool_spin);
}
|
||||
|
||||
void WorkerPool::barrier()
|
||||
{
|
||||
ldout(cct, 10) << __func__ << " started." << dendl;
|
||||
|
@ -36,6 +36,7 @@ using namespace std;
|
||||
#include "include/assert.h"
|
||||
#include "AsyncConnection.h"
|
||||
#include "Event.h"
|
||||
#include "common/simple_spin.h"
|
||||
|
||||
|
||||
class AsyncMessenger;
|
||||
@ -65,8 +66,9 @@ class Worker : public Thread {
|
||||
|
||||
public:
|
||||
EventCenter center;
|
||||
std::atomic_uint references;
|
||||
Worker(CephContext *c, WorkerPool *p, int i)
|
||||
: cct(c), pool(p), done(false), id(i), perf_logger(NULL), center(c) {
|
||||
: cct(c), pool(p), done(false), id(i), perf_logger(NULL), center(c), references(0) {
|
||||
center.init(InitEventNumber);
|
||||
char name[128];
|
||||
sprintf(name, "AsyncMessenger::Worker-%d", id);
|
||||
@ -133,7 +135,6 @@ class WorkerPool {
|
||||
WorkerPool(const WorkerPool &);
|
||||
WorkerPool& operator=(const WorkerPool &);
|
||||
CephContext *cct;
|
||||
uint64_t seq;
|
||||
vector<Worker*> workers;
|
||||
vector<int> coreids;
|
||||
// Used to indicate whether thread started
|
||||
@ -141,6 +142,7 @@ class WorkerPool {
|
||||
Mutex barrier_lock;
|
||||
Cond barrier_cond;
|
||||
atomic_t barrier_count;
|
||||
simple_spinlock_t pool_spin = SIMPLE_SPINLOCK_INITIALIZER;
|
||||
|
||||
class C_barrier : public EventCallback {
|
||||
WorkerPool *pool;
|
||||
@ -158,9 +160,8 @@ class WorkerPool {
|
||||
explicit WorkerPool(CephContext *c);
|
||||
virtual ~WorkerPool();
|
||||
void start();
|
||||
Worker *get_worker() {
|
||||
return workers[(seq++)%workers.size()];
|
||||
}
|
||||
Worker *get_worker();
|
||||
void release_worker(EventCenter* c);
|
||||
int get_cpuid(int id) {
|
||||
if (coreids.empty())
|
||||
return -1;
|
||||
@ -525,6 +526,7 @@ public:
|
||||
*/
|
||||
void unregister_conn(AsyncConnectionRef conn) {
|
||||
Mutex::Locker l(deleted_lock);
|
||||
conn->release_worker();
|
||||
deleted_conns.insert(conn);
|
||||
|
||||
if (deleted_conns.size() >= ReapDeadConnectionThreshold) {
|
||||
@ -540,6 +542,10 @@ public:
|
||||
* See "deleted_conns"
|
||||
*/
|
||||
int reap_dead();
|
||||
|
||||
/**
 * Pass a worker release request on to the pool.
 *
 * @param c event center forwarded unchanged to pool->release_worker()
 */
void release_worker(EventCenter* c)
{
  pool->release_worker(c);
}
|
||||
|
||||
/**
|
||||
* @} // AsyncMessenger Internals
|
||||
|
Loading…
Reference in New Issue
Block a user