diff --git a/src/ceph_mds.cc b/src/ceph_mds.cc index 4cd951918ad..4ba914b2d18 100644 --- a/src/ceph_mds.cc +++ b/src/ceph_mds.cc @@ -143,7 +143,8 @@ int main(int argc, const char **argv) uint64_t nonce = 0; get_random_bytes((char*)&nonce, sizeof(nonce)); - Messenger *msgr = Messenger::create(g_ceph_context, g_conf->ms_type, + std::string public_msgr_type = g_conf->ms_public_type.empty() ? g_conf->ms_type : g_conf->ms_public_type; + Messenger *msgr = Messenger::create(g_ceph_context, public_msgr_type, entity_name_t::MDS(-1), "mds", nonce, Messenger::HAS_MANY_CONNECTIONS); if (!msgr) diff --git a/src/ceph_mon.cc b/src/ceph_mon.cc index 31606130378..489cb4137a1 100644 --- a/src/ceph_mon.cc +++ b/src/ceph_mon.cc @@ -651,7 +651,8 @@ int main(int argc, const char **argv) // bind int rank = monmap.get_rank(g_conf->name.get_id()); - Messenger *msgr = Messenger::create(g_ceph_context, g_conf->ms_type, + std::string public_msgr_type = g_conf->ms_public_type.empty() ? g_conf->ms_type : g_conf->ms_public_type; + Messenger *msgr = Messenger::create(g_ceph_context, public_msgr_type, entity_name_t::MON(rank), "mon", 0, Messenger::HAS_MANY_CONNECTIONS); if (!msgr) diff --git a/src/ceph_osd.cc b/src/ceph_osd.cc index 49f7b2e6250..6d796056e16 100644 --- a/src/ceph_osd.cc +++ b/src/ceph_osd.cc @@ -440,29 +440,31 @@ int main(int argc, const char **argv) << TEXT_NORMAL << dendl; } - Messenger *ms_public = Messenger::create(g_ceph_context, g_conf->ms_type, + std::string public_msgr_type = g_conf->ms_public_type.empty() ? g_conf->ms_type : g_conf->ms_public_type; + std::string cluster_msgr_type = g_conf->ms_cluster_type.empty() ? g_conf->ms_type : g_conf->ms_cluster_type; + Messenger *ms_public = Messenger::create(g_ceph_context, public_msgr_type, entity_name_t::OSD(whoami), "client", getpid(), Messenger::HAS_HEAVY_TRAFFIC | Messenger::HAS_MANY_CONNECTIONS); - Messenger *ms_cluster = Messenger::create(g_ceph_context, g_conf->ms_type, + Messenger *ms_cluster = Messenger::create(g_ceph_context, cluster_msgr_type, entity_name_t::OSD(whoami), "cluster", getpid(), Messenger::HAS_HEAVY_TRAFFIC | Messenger::HAS_MANY_CONNECTIONS); - Messenger *ms_hb_back_client = Messenger::create(g_ceph_context, g_conf->ms_type, + Messenger *ms_hb_back_client = Messenger::create(g_ceph_context, cluster_msgr_type, entity_name_t::OSD(whoami), "hb_back_client", getpid(), Messenger::HEARTBEAT); - Messenger *ms_hb_front_client = Messenger::create(g_ceph_context, g_conf->ms_type, + Messenger *ms_hb_front_client = Messenger::create(g_ceph_context, public_msgr_type, entity_name_t::OSD(whoami), "hb_front_client", getpid(), Messenger::HEARTBEAT); - Messenger *ms_hb_back_server = Messenger::create(g_ceph_context, g_conf->ms_type, + Messenger *ms_hb_back_server = Messenger::create(g_ceph_context, cluster_msgr_type, entity_name_t::OSD(whoami), "hb_back_server", getpid(), Messenger::HEARTBEAT); - Messenger *ms_hb_front_server = Messenger::create(g_ceph_context, g_conf->ms_type, + Messenger *ms_hb_front_server = Messenger::create(g_ceph_context, public_msgr_type, entity_name_t::OSD(whoami), "hb_front_server", getpid(), Messenger::HEARTBEAT); - Messenger *ms_objecter = Messenger::create(g_ceph_context, g_conf->ms_type, + Messenger *ms_objecter = Messenger::create(g_ceph_context, public_msgr_type, entity_name_t::OSD(whoami), "ms_objecter", getpid(), 0); if (!ms_public || !ms_cluster || !ms_hb_front_client || !ms_hb_back_client || !ms_hb_back_server || !ms_hb_front_server || !ms_objecter) diff --git a/src/common/config_opts.h b/src/common/config_opts.h index b64ab212c00..7db3dc642cd 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -175,7 +175,9 @@ OPTION(heartbeat_file, OPT_STR, "") OPTION(heartbeat_inject_failure, OPT_INT, 0) // force an unhealthy heartbeat for N seconds OPTION(perf, OPT_BOOL, true) // enable internal perf counters -OPTION(ms_type, OPT_STR, "async") // messenger backend +OPTION(ms_type, OPT_STR, "async+posix") // messenger backend +OPTION(ms_public_type, OPT_STR, "") // messenger backend +OPTION(ms_cluster_type, OPT_STR, "") // messenger backend OPTION(ms_tcp_nodelay, OPT_BOOL, true) OPTION(ms_tcp_rcvbuf, OPT_INT, 0) OPTION(ms_tcp_prefetch_max_size, OPT_INT, 4096) // max prefetch size, we limit this to avoid extra memcpy @@ -212,7 +214,6 @@ OPTION(ms_inject_delay_probability, OPT_DOUBLE, 0) // range [0, 1] OPTION(ms_inject_internal_delays, OPT_DOUBLE, 0) // seconds OPTION(ms_dump_on_send, OPT_BOOL, false) // hexdump msg to log on send OPTION(ms_dump_corrupt_message_level, OPT_INT, 1) // debug level to hexdump undecodeable messages at -OPTION(ms_async_transport_type, OPT_STR, "posix") OPTION(ms_async_op_threads, OPT_U64, 3) // number of worker processing threads for async messenger created on init OPTION(ms_async_max_op_threads, OPT_U64, 5) // max number of worker processing threads for async messenger OPTION(ms_async_set_affinity, OPT_BOOL, true) diff --git a/src/msg/Messenger.cc b/src/msg/Messenger.cc index bc6509914ba..ff9812e3261 100644 --- a/src/msg/Messenger.cc +++ b/src/msg/Messenger.cc @@ -14,9 +14,10 @@ Messenger *Messenger::create_client_messenger(CephContext *cct, string lname) { + std::string public_msgr_type = cct->_conf->ms_public_type.empty() ? cct->_conf->ms_type : cct->_conf->ms_public_type; uint64_t nonce = 0; get_random_bytes((char*)&nonce, sizeof(nonce)); - return Messenger::create(cct, cct->_conf->ms_type, entity_name_t::CLIENT(), + return Messenger::create(cct, public_msgr_type, entity_name_t::CLIENT(), std::move(lname), nonce, 0); } @@ -36,8 +37,8 @@ Messenger *Messenger::create(CephContext *cct, const string &type, } if (r == 0 || type == "simple") return new SimpleMessenger(cct, name, std::move(lname), nonce); - else if (r == 1 || type == "async") - return new AsyncMessenger(cct, name, std::move(lname), nonce); + else if (r == 1 || type.find("async") != std::string::npos) + return new AsyncMessenger(cct, name, type, std::move(lname), nonce); #ifdef HAVE_XIO else if ((type == "xio") && cct->check_experimental_feature_enabled("ms-type-xio")) diff --git a/src/msg/async/AsyncMessenger.cc b/src/msg/async/AsyncMessenger.cc index 2b2eb2c0c9e..506b71be471 100644 --- a/src/msg/async/AsyncMessenger.cc +++ b/src/msg/async/AsyncMessenger.cc @@ -213,9 +213,13 @@ void Processor::stop() struct StackSingleton { + CephContext *cct; std::shared_ptr stack; - StackSingleton(CephContext *c) { - stack = NetworkStack::create(c, c->_conf->ms_async_transport_type); + + StackSingleton(CephContext *c): cct(c) {} + void ready(std::string &type) { + if (!stack) + stack = NetworkStack::create(cct, type); } ~StackSingleton() { stack->stop(); @@ -239,7 +243,7 @@ class C_handle_reap : public EventCallback { */ AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name, - string mname, uint64_t _nonce) + const std::string &type, string mname, uint64_t _nonce) : SimplePolicyMessenger(cct, name,mname, _nonce), dispatch_queue(cct, this, mname), lock("AsyncMessenger::lock"), @@ -247,9 +251,16 @@ AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name, global_seq(0), deleted_lock("AsyncMessenger::deleted_lock"), cluster_protocol(0), stopped(true) { + std::string transport_type = "posix"; + if (type.find("rdma") != std::string::npos) + transport_type = "rdma"; + else if (type.find("dpdk") != std::string::npos) + transport_type = "dpdk"; + ceph_spin_init(&global_seq_lock); StackSingleton *single; - cct->lookup_or_create_singleton_object(single, "AsyncMessenger::NetworkStack"); + cct->lookup_or_create_singleton_object(single, "AsyncMessenger::NetworkStack::"+transport_type); + single->ready(transport_type); stack = single->stack.get(); stack->start(); local_worker = stack->get_worker(); diff --git a/src/msg/async/AsyncMessenger.h b/src/msg/async/AsyncMessenger.h index 384e4465a7b..771dfdbe8a7 100644 --- a/src/msg/async/AsyncMessenger.h +++ b/src/msg/async/AsyncMessenger.h @@ -82,7 +82,7 @@ public: * _nonce A unique ID to use for this AsyncMessenger. It should not * be a value that will be repeated if the daemon restarts. */ - AsyncMessenger(CephContext *cct, entity_name_t name, + AsyncMessenger(CephContext *cct, entity_name_t name, const std::string &type, string mname, uint64_t _nonce); /** @@ -225,6 +225,8 @@ private: // the worker run messenger's cron jobs Worker *local_worker; + std::string ms_type; + /// overall lock used for AsyncMessenger data structures Mutex lock; // AsyncMessenger stuff diff --git a/src/msg/async/Event.cc b/src/msg/async/Event.cc index 5999b0d77e8..ca48d38be87 100644 --- a/src/msg/async/Event.cc +++ b/src/msg/async/Event.cc @@ -99,14 +99,15 @@ ostream& EventCenter::_event_prefix(std::ostream *_dout) << " time_id=" << time_event_next_id << ")."; } -int EventCenter::init(int n, unsigned i) +int EventCenter::init(int n, unsigned i, const std::string &t) { // can't init multi times assert(nevent == 0); + type = t; idx = i; - if (cct->_conf->ms_async_transport_type == "dpdk") { + if (t == "dpdk") { #ifdef HAVE_DPDK driver = new DPDKDriver(cct); #endif @@ -189,7 +190,7 @@ void EventCenter::set_owner() ldout(cct, 2) << __func__ << " idx=" << idx << " owner=" << owner << dendl; if (!global_centers) { cct->lookup_or_create_singleton_object( - global_centers, "AsyncMessenger::EventCenter::global_center"); + global_centers, "AsyncMessenger::EventCenter::global_center::"+type); assert(global_centers); global_centers->centers[idx] = this; if (driver->need_wakeup()) { diff --git a/src/msg/async/Event.h b/src/msg/async/Event.h index 5b210efe068..d1c7350cb0d 100644 --- a/src/msg/async/Event.h +++ b/src/msg/async/Event.h @@ -152,6 +152,7 @@ class EventCenter { private: CephContext *cct; + std::string type; int nevent; // Used only to external event pthread_t owner; @@ -190,7 +191,7 @@ class EventCenter { ~EventCenter(); ostream& _event_prefix(std::ostream *_dout); - int init(int nevent, unsigned idx); + int init(int nevent, unsigned idx, const std::string &t); void set_owner(); pthread_t get_owner() const { return owner; } unsigned get_id() const { return idx; } diff --git a/src/msg/async/Stack.cc b/src/msg/async/Stack.cc index 26d119b999c..625e472d808 100644 --- a/src/msg/async/Stack.cc +++ b/src/msg/async/Stack.cc @@ -107,7 +107,7 @@ NetworkStack::NetworkStack(CephContext *c, const string &t): type(t), started(fa for (unsigned i = 0; i < num_workers; ++i) { Worker *w = create_worker(cct, type, i); - w->center.init(InitEventNumber, i); + w->center.init(InitEventNumber, i, type); workers.push_back(w); } cct->register_fork_watcher(this); diff --git a/src/test/msgr/test_async_driver.cc b/src/test/msgr/test_async_driver.cc index 87aac346eed..7773db77c57 100644 --- a/src/test/msgr/test_async_driver.cc +++ b/src/test/msgr/test_async_driver.cc @@ -256,7 +256,7 @@ class FakeEvent : public EventCallback { TEST(EventCenterTest, FileEventExpansion) { vector sds; EventCenter center(g_ceph_context); - center.init(100, 0); + center.init(100, 0, "posix"); center.set_owner(); EventCallbackRef e(new FakeEvent()); for (int i = 0; i < 300; i++) { @@ -277,7 +277,7 @@ class Worker : public Thread { public: EventCenter center; explicit Worker(CephContext *c, int idx): cct(c), done(false), center(c) { - center.init(100, idx); + center.init(100, idx, "posix"); } void stop() { done = true; diff --git a/src/test/msgr/test_async_networkstack.cc b/src/test/msgr/test_async_networkstack.cc index af39f79c47c..585077c764b 100644 --- a/src/test/msgr/test_async_networkstack.cc +++ b/src/test/msgr/test_async_networkstack.cc @@ -41,11 +41,11 @@ class NetworkWorkerTest : public ::testing::TestWithParam { virtual void SetUp() { cerr << __func__ << " start set up " << GetParam() << std::endl; if (strncmp(GetParam(), "dpdk", 4)) { - g_ceph_context->_conf->set_val("ms_async_transport_type", "posix", false, false); + g_ceph_context->_conf->set_val("ms_type", "async+posix", false, false); addr = "127.0.0.1:15000"; port_addr = "127.0.0.1:15001"; } else { - g_ceph_context->_conf->set_val("ms_async_transport_type", "dpdk", false, false); + g_ceph_context->_conf->set_val("ms_type", "async+dpdk", false, false); g_ceph_context->_conf->set_val("ms_dpdk_debug_allow_loopback", "true", false, false); g_ceph_context->_conf->set_val("ms_async_op_threads", "2", false, false); g_ceph_context->_conf->set_val("ms_dpdk_coremask", "0x7", false, false); diff --git a/src/test/perf_local.cc b/src/test/perf_local.cc index 2fec73e0014..5fba4b443d1 100644 --- a/src/test/perf_local.cc +++ b/src/test/perf_local.cc @@ -450,7 +450,7 @@ double eventcenter_poll() { int count = 1000000; EventCenter center(g_ceph_context); - center.init(1000, 0); + center.init(1000, 0, "posix"); center.set_owner(); uint64_t start = Cycles::rdtsc(); for (int i = 0; i < count; i++) { @@ -467,7 +467,7 @@ class CenterWorker : public Thread { public: EventCenter center; explicit CenterWorker(CephContext *c): cct(c), done(false), center(c) { - center.init(100, 0); + center.init(100, 0, "posix"); } void stop() { done = true;