Merge PR #39654 into master

* refs/pull/39654/head:
	common/options: drop ms_async_max_op_threads
	msg/async: drop Stack::num_workers
	msg/async: s/num_workers/workers.size()/
	msg/async: use range-based loop in NetworkStack
	msg/async: do not pass worker id to Stack::spawn_worker()
	async/Stack: pass Worker* to NetworkStack::add_thread()
	async/rdma: do not reference worker id in RDMAStack::spawn_worker()
	async/dpdk: do not use worker id when creating worker
	async/PosixStack: do not reference worker id in ctor
	async/rdma: initialize worker in RDMAStack::create_worker()
	async/rdma: move RDMAStack::create_worker() to .cc

Reviewed-by: luo runbing <luo.runbing@zte.com.cn>
Reviewed-by: Haomai Wang <haomai@xsky.com>
Sage Weil 2021-03-03 08:38:20 -05:00
commit 67e842d519
11 changed files with 59 additions and 81 deletions
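The theme across all eleven files: NetworkStack previously cached a num_workers counter next to its workers vector and threaded worker ids through spawn_worker(); after this series the count is always derived from the container and new threads are simply appended. A minimal, self-contained sketch of the resulting shape (simplified stand-in types, not the actual Ceph classes):

    #include <functional>
    #include <thread>
    #include <vector>

    struct Worker {};  // stand-in for the real Worker

    struct Stack {
      std::vector<Worker*> workers;
      std::vector<std::thread> threads;

      // Always derived from the container, so it cannot drift out of
      // sync the way a separately cached counter can.
      unsigned get_num_worker() const { return workers.size(); }

      // New-style spawn: no worker id, just append.
      void spawn_worker(std::function<void ()> &&func) {
        threads.emplace_back(std::move(func));
      }
    };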

View File

@@ -133,3 +133,4 @@ zdover23 Zac Dover <zac.dover@gmail.com>
 ShyamsundarR Shyamsundar R <srangana@redhat.com>
 sunnyku Sunny Kumar <sunkumar@redhat.com>
 adk3798 Adam King <adking@redhat.com>
+runsisi luo runbing <luo.runbing@zte.com.cn>

View File

@@ -83,14 +83,3 @@ Async messenger options
 :Type: 64-bit Unsigned Integer
 :Required: No
 :Default: ``3``
-
-``ms_async_max_op_threads``
-
-:Description: Maximum number of worker threads used by each Async Messenger instance.
-              Set to lower values when your machine has limited CPU count, and increase
-              when your CPUs are underutilized (i. e. one or more of CPUs are
-              constantly on 100% load during I/O operations).
-:Type: 64-bit Unsigned Integer
-:Required: No
-:Default: ``5``

View File

@@ -142,7 +142,6 @@ OPTION(ms_blackhole_client, OPT_BOOL)
 OPTION(ms_dump_on_send, OPT_BOOL) // hexdump msg to log on send
 OPTION(ms_dump_corrupt_message_level, OPT_INT) // debug level to hexdump undecodeable messages at
 OPTION(ms_async_op_threads, OPT_U64) // number of worker processing threads for async messenger created on init
-OPTION(ms_async_max_op_threads, OPT_U64) // max number of worker processing threads for async messenger
 OPTION(ms_async_rdma_device_name, OPT_STR)
 OPTION(ms_async_rdma_enable_hugepage, OPT_BOOL)
 OPTION(ms_async_rdma_buffer_size, OPT_INT)

View File

@@ -1149,11 +1149,6 @@ std::vector<Option> get_global_options() {
     .set_min_max(1, 24)
     .set_description("Threadpool size for AsyncMessenger (ms_type=async)"),

-    Option("ms_async_max_op_threads", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
-    .set_default(5)
-    .set_description("Maximum threadpool size of AsyncMessenger")
-    .add_see_also("ms_async_op_threads"),
-
     Option("ms_async_rdma_device_name", Option::TYPE_STR, Option::LEVEL_ADVANCED)
     .set_default("")
     .set_description(""),

View File

@@ -47,9 +47,8 @@ class PosixNetworkStack : public NetworkStack {
  public:
   explicit PosixNetworkStack(CephContext *c);

-  void spawn_worker(unsigned i, std::function<void ()> &&func) override {
-    threads.resize(i+1);
-    threads[i] = std::thread(func);
+  void spawn_worker(std::function<void ()> &&func) override {
+    threads.emplace_back(std::move(func));
   }
   void join_worker(unsigned i) override {
     ceph_assert(threads.size() > i && threads[i].joinable());
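Why the append form is preferable, in a brief sketch (illustrative only, not part of the commit): resize(i+1) assumes ids arrive in order and leaves default-constructed, non-joinable gaps when one is skipped, while emplace_back() is order-independent and keeps the vector dense:

    #include <functional>
    #include <thread>
    #include <vector>

    std::vector<std::thread> threads;

    // Old pattern: calling with i == 2 first leaves threads[0] and
    // threads[1] as default-constructed, non-joinable gaps.
    void spawn_indexed(unsigned i, std::function<void ()> &&func) {
      threads.resize(i + 1);
      threads[i] = std::thread(std::move(func));
    }

    // New pattern: no id needed; a thread's index is its position.
    void spawn_appended(std::function<void ()> &&func) {
      threads.emplace_back(std::move(func));
    }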

View File

@@ -34,9 +34,8 @@
 #undef dout_prefix
 #define dout_prefix *_dout << "stack "

-std::function<void ()> NetworkStack::add_thread(unsigned worker_id)
+std::function<void ()> NetworkStack::add_thread(Worker* w)
 {
-  Worker *w = workers[worker_id];
   return [this, w]() {
     char tp_name[16];
     sprintf(tp_name, "msgr-worker-%u", w->id);
@@ -86,8 +85,17 @@ std::shared_ptr<NetworkStack> NetworkStack::create(CephContext *c,
     return nullptr;
   }

+  unsigned num_workers = c->_conf->ms_async_op_threads;
+  ceph_assert(num_workers > 0);
+  if (num_workers >= EventCenter::MAX_EVENTCENTER) {
+    ldout(c, 0) << __func__ << " max thread limit is "
+                << EventCenter::MAX_EVENTCENTER << ", switching to this now. "
+                << "Higher thread values are unnecessary and currently unsupported."
+                << dendl;
+    num_workers = EventCenter::MAX_EVENTCENTER;
+  }
   const int InitEventNumber = 5000;
-  for (unsigned worker_id = 0; worker_id < stack->num_workers; ++worker_id) {
+  for (unsigned worker_id = 0; worker_id < num_workers; ++worker_id) {
     Worker *w = stack->create_worker(c, worker_id);
     int ret = w->center.init(InitEventNumber, worker_id, t);
     if (ret)
@@ -100,18 +108,7 @@ std::shared_ptr<NetworkStack> NetworkStack::create(CephContext *c,
 NetworkStack::NetworkStack(CephContext *c)
   : cct(c)
-{
-  ceph_assert(cct->_conf->ms_async_op_threads > 0);
-
-  num_workers = cct->_conf->ms_async_op_threads;
-  if (num_workers >= EventCenter::MAX_EVENTCENTER) {
-    ldout(cct, 0) << __func__ << " max thread limit is "
-                  << EventCenter::MAX_EVENTCENTER << ", switching to this now. "
-                  << "Higher thread values are unnecessary and currently unsupported."
-                  << dendl;
-    num_workers = EventCenter::MAX_EVENTCENTER;
-  }
-}
+{}

 void NetworkStack::start()
 {
@@ -121,17 +118,17 @@ void NetworkStack::start()
     return ;
   }

-  for (unsigned i = 0; i < num_workers; ++i) {
-    if (workers[i]->is_init())
+  for (Worker* worker : workers) {
+    if (worker->is_init())
       continue;
-    std::function<void ()> thread = add_thread(i);
-    spawn_worker(i, std::move(thread));
+    spawn_worker(add_thread(worker));
   }
   started = true;
   lk.unlock();

-  for (unsigned i = 0; i < num_workers; ++i)
-    workers[i]->wait_for_init();
+  for (Worker* worker : workers) {
+    worker->wait_for_init();
+  }
 }

 Worker* NetworkStack::get_worker()
@@ -146,10 +143,10 @@ Worker* NetworkStack::get_worker()
   // find worker with least references
   // tempting case is returning on references == 0, but in reality
   // this will happen so rarely that there's no need for special case.
-  for (unsigned i = 0; i < num_workers; ++i) {
-    unsigned worker_load = workers[i]->references.load();
+  for (Worker* worker : workers) {
+    unsigned worker_load = worker->references.load();
     if (worker_load < min_load) {
-      current_best = workers[i];
+      current_best = worker;
       min_load = worker_load;
     }
   }
@@ -163,10 +160,11 @@ Worker* NetworkStack::get_worker()
 void NetworkStack::stop()
 {
   std::lock_guard lk(pool_spin);
-  for (unsigned i = 0; i < num_workers; ++i) {
-    workers[i]->done = true;
-    workers[i]->center.wakeup();
-    join_worker(i);
+  unsigned i = 0;
+  for (Worker* worker : workers) {
+    worker->done = true;
+    worker->center.wakeup();
+    join_worker(i++);
   }
   started = false;
 }
@@ -195,10 +193,10 @@ void NetworkStack::drain()
   ldout(cct, 30) << __func__ << " started." << dendl;
   pthread_t cur = pthread_self();
   pool_spin.lock();
-  C_drain drain(num_workers);
-  for (unsigned i = 0; i < num_workers; ++i) {
-    ceph_assert(cur != workers[i]->center.get_owner());
-    workers[i]->center.dispatch_event_external(EventCallbackRef(&drain));
+  C_drain drain(get_num_worker());
+  for (Worker* worker : workers) {
+    ceph_assert(cur != worker->center.get_owner());
+    worker->center.dispatch_event_external(EventCallbackRef(&drain));
   }
   pool_spin.unlock();
   drain.wait();
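Two details of the hunks above: stop() keeps a side counter because join_worker() is still index-based, and drain() now sizes C_drain via get_num_worker(), which reads workers.size(). A minimal sketch of the counter idiom, assuming an index-based join API (stand-in types, not the Ceph code):

    #include <vector>

    struct Worker { bool done = false; };

    void join_worker(unsigned /*i*/) { /* stand-in for the real join */ }

    void stop_all(std::vector<Worker*> &workers) {
      unsigned i = 0;
      for (Worker *worker : workers) {
        worker->done = true;
        // join_worker() takes an index, so track the element's position
        // alongside the range-based loop instead of indexing twice.
        join_worker(i++);
      }
    }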

View File

@@ -293,11 +293,10 @@ class Worker {
 };

 class NetworkStack {
-  unsigned num_workers = 0;
   ceph::spinlock pool_spin;
   bool started = false;

-  std::function<void ()> add_thread(unsigned i);
+  std::function<void ()> add_thread(Worker* w);

   virtual Worker* create_worker(CephContext *c, unsigned i) = 0;
@@ -334,11 +333,11 @@ class NetworkStack {
   }
   void drain();
   unsigned get_num_worker() const {
-    return num_workers;
+    return workers.size();
   }

   // direct is used in tests only
-  virtual void spawn_worker(unsigned i, std::function<void ()> &&) = 0;
+  virtual void spawn_worker(std::function<void ()> &&) = 0;
   virtual void join_worker(unsigned i) = 0;
   virtual bool is_ready() { return true; };

View File

@@ -242,11 +242,11 @@ int DPDKWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket)
   return r;
 }

-void DPDKStack::spawn_worker(unsigned i, std::function<void ()> &&func)
+void DPDKStack::spawn_worker(std::function<void ()> &&func)
 {
   // create a extra master thread
   //
-  funcs[i] = std::move(func);
+  funcs.push_back(std::move(func));
   int r = 0;
   r = dpdk::eal::init(cct);
   if (r < 0) {
if (r < 0) {
@@ -255,16 +255,17 @@ void DPDKStack::spawn_worker(unsigned i, std::function<void ()> &&func)
   }
   // if dpdk::eal::init already called by NVMEDevice, we will select 1..n
   // cores
-  ceph_assert(rte_lcore_count() >= i + 1);
+  unsigned nr_worker = funcs.size();
+  ceph_assert(rte_lcore_count() >= nr_worker);
   unsigned core_id;
-  int j = i;
   RTE_LCORE_FOREACH_SLAVE(core_id) {
-    if (i-- == 0) {
+    if (--nr_worker == 0) {
       break;
     }
   }
-  dpdk::eal::execute_on_master([&]() {
-    r = rte_eal_remote_launch(dpdk_thread_adaptor, static_cast<void*>(&funcs[j]), core_id);
+  void *adapted_func = static_cast<void*>(&funcs.back());
+  dpdk::eal::execute_on_master([adapted_func, core_id, this]() {
+    int r = rte_eal_remote_launch(dpdk_thread_adaptor, adapted_func, core_id);
     if (r < 0) {
       lderr(cct) << __func__ << " remote launch failed, r=" << r << dendl;
       ceph_abort();
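The capture-list change matters because rte_eal_remote_launch() runs the function on another lcore, possibly after spawn_worker() has returned: the old [&] lambda referenced stack locals (r, j) that may be gone by then, while the new one captures adapted_func and core_id by value. A generic sketch of the hazard, independent of DPDK:

    #include <functional>
    #include <iostream>

    std::function<void ()> deferred;

    void schedule() {
      int core_id = 3;
      // BAD: [&] captures core_id by reference; once schedule() returns,
      // running `deferred` would read a dangling stack variable:
      //   deferred = [&]() { std::cout << core_id << "\n"; };

      // GOOD: capture by value; the lambda owns its own copy.
      deferred = [core_id]() { std::cout << core_id << "\n"; };
    }

    int main() {
      schedule();
      deferred();  // safely prints 3
    }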

View File

@@ -254,12 +254,11 @@ class DPDKStack : public NetworkStack {
   }

 public:
-  explicit DPDKStack(CephContext *cct): NetworkStack(cct) {
-    funcs.resize(cct->_conf->ms_async_max_op_threads);
-  }
+  explicit DPDKStack(CephContext *cct): NetworkStack(cct)
+  {}
   virtual bool support_local_listen_table() const override { return true; }

-  virtual void spawn_worker(unsigned i, std::function<void ()> &&func) override;
+  virtual void spawn_worker(std::function<void ()> &&func) override;
   virtual void join_worker(unsigned i) override;
 };

View File

@@ -782,13 +782,6 @@ RDMAStack::RDMAStack(CephContext *cct)
     rdma_dispatcher(std::make_shared<RDMADispatcher>(cct, ib))
 {
   ldout(cct, 20) << __func__ << " constructing RDMAStack..." << dendl;
-
-  unsigned num = get_num_worker();
-  for (unsigned i = 0; i < num; ++i) {
-    RDMAWorker* w = dynamic_cast<RDMAWorker*>(get_worker(i));
-    w->set_dispatcher(rdma_dispatcher);
-    w->set_ib(ib);
-  }
   ldout(cct, 20) << " creating RDMAStack:" << this << " with dispatcher:" << rdma_dispatcher.get() << dendl;
 }
@@ -799,10 +792,17 @@ RDMAStack::~RDMAStack()
   }
 }

-void RDMAStack::spawn_worker(unsigned i, std::function<void ()> &&func)
+Worker* RDMAStack::create_worker(CephContext *c, unsigned worker_id)
 {
-  threads.resize(i+1);
-  threads[i] = std::thread(func);
+  auto w = new RDMAWorker(c, worker_id);
+  w->set_dispatcher(rdma_dispatcher);
+  w->set_ib(ib);
+  return w;
+}
+
+void RDMAStack::spawn_worker(std::function<void ()> &&func)
+{
+  threads.emplace_back(std::move(func));
 }

 void RDMAStack::join_worker(unsigned i)
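Folding set_dispatcher()/set_ib() into create_worker() makes it a complete factory method: each RDMAWorker leaves the call fully wired, so the RDMAStack constructor no longer needs its second, dynamic_cast-based pass over the workers. A minimal sketch of that shape, with simplified stand-in types:

    struct Worker { virtual ~Worker() = default; };

    struct MyWorker : Worker {
      void *dispatcher = nullptr;  // stand-ins for dispatcher/device state
      void *ib = nullptr;
    };

    struct Stack {
      virtual ~Stack() = default;
      virtual Worker* create_worker(unsigned id) = 0;
    };

    struct MyStack : Stack {
      void *rdma_dispatcher = nullptr;
      void *ib_device = nullptr;

      // Factory method: the worker is fully initialized at creation, so
      // no post-hoc fixup loop is needed in the Stack constructor.
      Worker* create_worker(unsigned /*id*/) override {
        auto w = new MyWorker;
        w->dispatcher = rdma_dispatcher;
        w->ib = ib_device;
        return w;
      }
    };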

View File

@@ -326,16 +326,14 @@ class RDMAStack : public NetworkStack {
   std::atomic<bool> fork_finished = {false};

-  virtual Worker* create_worker(CephContext *c, unsigned worker_id) override {
-    return new RDMAWorker(c, worker_id);
-  }
+  virtual Worker* create_worker(CephContext *c, unsigned worker_id) override;

 public:
   explicit RDMAStack(CephContext *cct);
   virtual ~RDMAStack();

   virtual bool nonblock_connect_need_writable_event() const override { return false; }
-  virtual void spawn_worker(unsigned i, std::function<void ()> &&func) override;
+  virtual void spawn_worker(std::function<void ()> &&func) override;
   virtual void join_worker(unsigned i) override;
   virtual bool is_ready() override { return fork_finished.load(); };
   virtual void ready() override { fork_finished = true; };