msg/async: refine worker creation in NetworkStack

This commit updates NetworkStack::create_worker() to a virtual
function to reduce type check redundancy.

Since calling a pure virtual function in a constructor method is
strictly prohibited, NetworkStack::create_worker() is currently
not implemented as a virtual function.
It requires duplicated type check with NetworkStack::create(),
making the code a little bit tricky.

Considering NetworkStack instances can only be instantiated
through NetworkStack::create(), this commit moves worker creation
out of its constructor to NetworkStack::create(), makes it a pure
virtual, and lets inherited classes implement it to remove the
type check redundancy. By making it a non-static, we can also
reduce unnecessary class member function exposure.

Signed-off-by: Insu Jang <insu_jang@tmax.co.kr>
This commit is contained in:
Insu Jang 2020-12-03 11:09:25 +09:00
parent 84d50e7d63
commit ff65c800b3
5 changed files with 35 additions and 35 deletions

View File

@ -40,6 +40,10 @@ class PosixWorker : public Worker {
class PosixNetworkStack : public NetworkStack {
std::vector<std::thread> threads;
virtual Worker* create_worker(CephContext *c, unsigned worker_id) override {
return new PosixWorker(c, worker_id);
}
public:
explicit PosixNetworkStack(CephContext *c, const std::string &t);

View File

@ -66,47 +66,42 @@ std::function<void ()> NetworkStack::add_thread(unsigned worker_id)
std::shared_ptr<NetworkStack> NetworkStack::create(CephContext *c,
const std::string &t)
{
std::shared_ptr<NetworkStack> stack = nullptr;
if (t == "posix")
return std::make_shared<PosixNetworkStack>(c, t);
stack.reset(new PosixNetworkStack(c, t));
#ifdef HAVE_RDMA
else if (t == "rdma")
return std::make_shared<RDMAStack>(c, t);
stack.reset(new RDMAStack(c, t));
#endif
#ifdef HAVE_DPDK
else if (t == "dpdk")
return std::make_shared<DPDKStack>(c, t);
stack.reset(new DPDKStack(c, t));
#endif
lderr(c) << __func__ << " ms_async_transport_type " << t <<
if (stack == nullptr) {
lderr(c) << __func__ << " ms_async_transport_type " << t <<
" is not supported! " << dendl;
ceph_abort();
return nullptr;
}
ceph_abort();
return nullptr;
}
const int InitEventNumber = 5000;
for (unsigned worker_id = 0; worker_id < stack->num_workers; ++worker_id) {
Worker *w = stack->create_worker(c, worker_id);
int ret = w->center.init(InitEventNumber, worker_id, t);
if (ret)
throw std::system_error(-ret, std::generic_category());
stack->workers.push_back(w);
}
Worker* NetworkStack::create_worker(CephContext *c, const std::string &type, unsigned worker_id)
{
if (type == "posix")
return new PosixWorker(c, worker_id);
#ifdef HAVE_RDMA
else if (type == "rdma")
return new RDMAWorker(c, worker_id);
#endif
#ifdef HAVE_DPDK
else if (type == "dpdk")
return new DPDKWorker(c, worker_id);
#endif
lderr(c) << __func__ << " ms_async_transport_type " << type <<
" is not supported! " << dendl;
ceph_abort();
return nullptr;
return stack;
}
NetworkStack::NetworkStack(CephContext *c, const std:: string &t): type(t), started(false), cct(c)
{
ceph_assert(cct->_conf->ms_async_op_threads > 0);
const int InitEventNumber = 5000;
num_workers = cct->_conf->ms_async_op_threads;
if (num_workers >= EventCenter::MAX_EVENTCENTER) {
ldout(cct, 0) << __func__ << " max thread limit is "
@ -115,14 +110,6 @@ NetworkStack::NetworkStack(CephContext *c, const std:: string &t): type(t), star
<< dendl;
num_workers = EventCenter::MAX_EVENTCENTER;
}
for (unsigned worker_id = 0; worker_id < num_workers; ++worker_id) {
Worker *w = create_worker(cct, type, worker_id);
int ret = w->center.init(InitEventNumber, worker_id, type);
if (ret)
throw std::system_error(-ret, std::generic_category());
workers.push_back(w);
}
}
void NetworkStack::start()

View File

@ -300,6 +300,8 @@ class NetworkStack {
std::function<void ()> add_thread(unsigned i);
virtual Worker* create_worker(CephContext *c, unsigned i) = 0;
protected:
CephContext *cct;
std::vector<Worker*> workers;
@ -316,8 +318,6 @@ class NetworkStack {
static std::shared_ptr<NetworkStack> create(
CephContext *c, const std::string &type);
static Worker* create_worker(
CephContext *c, const std::string &t, unsigned i);
// backend need to override this method if backend doesn't support shared
// listen table.
// For example, posix backend has in kernel global listen table. If one

View File

@ -248,6 +248,11 @@ class DPDKWorker : public Worker {
class DPDKStack : public NetworkStack {
vector<std::function<void()> > funcs;
virtual Worker* create_worker(CephContext *c, unsigned worker_id) override {
return new DPDKWorker(c, worker_id);
}
public:
explicit DPDKStack(CephContext *cct, const string &t): NetworkStack(cct, t) {
funcs.resize(cct->_conf->ms_async_max_op_threads);

View File

@ -326,6 +326,10 @@ class RDMAStack : public NetworkStack {
std::atomic<bool> fork_finished = {false};
virtual Worker* create_worker(CephContext *c, unsigned worker_id) override {
return new RDMAWorker(c, worker_id);
}
public:
explicit RDMAStack(CephContext *cct, const std::string &t);
virtual ~RDMAStack();