Merge pull request #12023 from yuyuyu101/wip-msgr-type

msg: allow different ms type for cluster network and public network

Reviewed-by: Adir Lev <adirl@mellanox.com>
Reviewed-by: Sage Weil <sage@redhat.com>
This commit is contained in:
Yuri Weinstein 2017-02-03 08:50:14 -08:00 committed by GitHub
commit 7582a03650
13 changed files with 51 additions and 30 deletions

View File

@ -143,7 +143,8 @@ int main(int argc, const char **argv)
uint64_t nonce = 0;
get_random_bytes((char*)&nonce, sizeof(nonce));
Messenger *msgr = Messenger::create(g_ceph_context, g_conf->ms_type,
std::string public_msgr_type = g_conf->ms_public_type.empty() ? g_conf->ms_type : g_conf->ms_public_type;
Messenger *msgr = Messenger::create(g_ceph_context, public_msgr_type,
entity_name_t::MDS(-1), "mds",
nonce, Messenger::HAS_MANY_CONNECTIONS);
if (!msgr)

View File

@ -651,7 +651,8 @@ int main(int argc, const char **argv)
// bind
int rank = monmap.get_rank(g_conf->name.get_id());
Messenger *msgr = Messenger::create(g_ceph_context, g_conf->ms_type,
std::string public_msgr_type = g_conf->ms_public_type.empty() ? g_conf->ms_type : g_conf->ms_public_type;
Messenger *msgr = Messenger::create(g_ceph_context, public_msgr_type,
entity_name_t::MON(rank), "mon",
0, Messenger::HAS_MANY_CONNECTIONS);
if (!msgr)

View File

@ -440,29 +440,31 @@ int main(int argc, const char **argv)
<< TEXT_NORMAL << dendl;
}
Messenger *ms_public = Messenger::create(g_ceph_context, g_conf->ms_type,
std::string public_msgr_type = g_conf->ms_public_type.empty() ? g_conf->ms_type : g_conf->ms_public_type;
std::string cluster_msgr_type = g_conf->ms_cluster_type.empty() ? g_conf->ms_type : g_conf->ms_cluster_type;
Messenger *ms_public = Messenger::create(g_ceph_context, public_msgr_type,
entity_name_t::OSD(whoami), "client",
getpid(),
Messenger::HAS_HEAVY_TRAFFIC |
Messenger::HAS_MANY_CONNECTIONS);
Messenger *ms_cluster = Messenger::create(g_ceph_context, g_conf->ms_type,
Messenger *ms_cluster = Messenger::create(g_ceph_context, cluster_msgr_type,
entity_name_t::OSD(whoami), "cluster",
getpid(),
Messenger::HAS_HEAVY_TRAFFIC |
Messenger::HAS_MANY_CONNECTIONS);
Messenger *ms_hb_back_client = Messenger::create(g_ceph_context, g_conf->ms_type,
Messenger *ms_hb_back_client = Messenger::create(g_ceph_context, cluster_msgr_type,
entity_name_t::OSD(whoami), "hb_back_client",
getpid(), Messenger::HEARTBEAT);
Messenger *ms_hb_front_client = Messenger::create(g_ceph_context, g_conf->ms_type,
Messenger *ms_hb_front_client = Messenger::create(g_ceph_context, public_msgr_type,
entity_name_t::OSD(whoami), "hb_front_client",
getpid(), Messenger::HEARTBEAT);
Messenger *ms_hb_back_server = Messenger::create(g_ceph_context, g_conf->ms_type,
Messenger *ms_hb_back_server = Messenger::create(g_ceph_context, cluster_msgr_type,
entity_name_t::OSD(whoami), "hb_back_server",
getpid(), Messenger::HEARTBEAT);
Messenger *ms_hb_front_server = Messenger::create(g_ceph_context, g_conf->ms_type,
Messenger *ms_hb_front_server = Messenger::create(g_ceph_context, public_msgr_type,
entity_name_t::OSD(whoami), "hb_front_server",
getpid(), Messenger::HEARTBEAT);
Messenger *ms_objecter = Messenger::create(g_ceph_context, g_conf->ms_type,
Messenger *ms_objecter = Messenger::create(g_ceph_context, public_msgr_type,
entity_name_t::OSD(whoami), "ms_objecter",
getpid(), 0);
if (!ms_public || !ms_cluster || !ms_hb_front_client || !ms_hb_back_client || !ms_hb_back_server || !ms_hb_front_server || !ms_objecter)

View File

@ -175,7 +175,9 @@ OPTION(heartbeat_file, OPT_STR, "")
OPTION(heartbeat_inject_failure, OPT_INT, 0) // force an unhealthy heartbeat for N seconds
OPTION(perf, OPT_BOOL, true) // enable internal perf counters
OPTION(ms_type, OPT_STR, "async") // messenger backend
OPTION(ms_type, OPT_STR, "async+posix") // messenger backend
OPTION(ms_public_type, OPT_STR, "") // messenger backend
OPTION(ms_cluster_type, OPT_STR, "") // messenger backend
OPTION(ms_tcp_nodelay, OPT_BOOL, true)
OPTION(ms_tcp_rcvbuf, OPT_INT, 0)
OPTION(ms_tcp_prefetch_max_size, OPT_INT, 4096) // max prefetch size, we limit this to avoid extra memcpy
@ -212,7 +214,6 @@ OPTION(ms_inject_delay_probability, OPT_DOUBLE, 0) // range [0, 1]
OPTION(ms_inject_internal_delays, OPT_DOUBLE, 0) // seconds
OPTION(ms_dump_on_send, OPT_BOOL, false) // hexdump msg to log on send
OPTION(ms_dump_corrupt_message_level, OPT_INT, 1) // debug level to hexdump undecodeable messages at
OPTION(ms_async_transport_type, OPT_STR, "posix")
OPTION(ms_async_op_threads, OPT_U64, 3) // number of worker processing threads for async messenger created on init
OPTION(ms_async_max_op_threads, OPT_U64, 5) // max number of worker processing threads for async messenger
OPTION(ms_async_set_affinity, OPT_BOOL, true)

View File

@ -14,9 +14,10 @@
Messenger *Messenger::create_client_messenger(CephContext *cct, string lname)
{
std::string public_msgr_type = cct->_conf->ms_public_type.empty() ? cct->_conf->ms_type : cct->_conf->ms_public_type;
uint64_t nonce = 0;
get_random_bytes((char*)&nonce, sizeof(nonce));
return Messenger::create(cct, cct->_conf->ms_type, entity_name_t::CLIENT(),
return Messenger::create(cct, public_msgr_type, entity_name_t::CLIENT(),
std::move(lname), nonce, 0);
}
@ -36,8 +37,8 @@ Messenger *Messenger::create(CephContext *cct, const string &type,
}
if (r == 0 || type == "simple")
return new SimpleMessenger(cct, name, std::move(lname), nonce);
else if (r == 1 || type == "async")
return new AsyncMessenger(cct, name, std::move(lname), nonce);
else if (r == 1 || type.find("async") != std::string::npos)
return new AsyncMessenger(cct, name, type, std::move(lname), nonce);
#ifdef HAVE_XIO
else if ((type == "xio") &&
cct->check_experimental_feature_enabled("ms-type-xio"))

View File

@ -213,9 +213,13 @@ void Processor::stop()
struct StackSingleton {
CephContext *cct;
std::shared_ptr<NetworkStack> stack;
StackSingleton(CephContext *c) {
stack = NetworkStack::create(c, c->_conf->ms_async_transport_type);
StackSingleton(CephContext *c): cct(c) {}
void ready(std::string &type) {
if (!stack)
stack = NetworkStack::create(cct, type);
}
~StackSingleton() {
stack->stop();
@ -239,7 +243,7 @@ class C_handle_reap : public EventCallback {
*/
AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
string mname, uint64_t _nonce)
const std::string &type, string mname, uint64_t _nonce)
: SimplePolicyMessenger(cct, name,mname, _nonce),
dispatch_queue(cct, this, mname),
lock("AsyncMessenger::lock"),
@ -247,9 +251,16 @@ AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
global_seq(0), deleted_lock("AsyncMessenger::deleted_lock"),
cluster_protocol(0), stopped(true)
{
std::string transport_type = "posix";
if (type.find("rdma") != std::string::npos)
transport_type = "rdma";
else if (type.find("dpdk") != std::string::npos)
transport_type = "dpdk";
ceph_spin_init(&global_seq_lock);
StackSingleton *single;
cct->lookup_or_create_singleton_object<StackSingleton>(single, "AsyncMessenger::NetworkStack");
cct->lookup_or_create_singleton_object<StackSingleton>(single, "AsyncMessenger::NetworkStack::"+transport_type);
single->ready(transport_type);
stack = single->stack.get();
stack->start();
local_worker = stack->get_worker();

View File

@ -82,7 +82,7 @@ public:
* _nonce A unique ID to use for this AsyncMessenger. It should not
* be a value that will be repeated if the daemon restarts.
*/
AsyncMessenger(CephContext *cct, entity_name_t name,
AsyncMessenger(CephContext *cct, entity_name_t name, const std::string &type,
string mname, uint64_t _nonce);
/**
@ -225,6 +225,8 @@ private:
// the worker run messenger's cron jobs
Worker *local_worker;
std::string ms_type;
/// overall lock used for AsyncMessenger data structures
Mutex lock;
// AsyncMessenger stuff

View File

@ -99,14 +99,15 @@ ostream& EventCenter::_event_prefix(std::ostream *_dout)
<< " time_id=" << time_event_next_id << ").";
}
int EventCenter::init(int n, unsigned i)
int EventCenter::init(int n, unsigned i, const std::string &t)
{
// can't init multi times
assert(nevent == 0);
type = t;
idx = i;
if (cct->_conf->ms_async_transport_type == "dpdk") {
if (t == "dpdk") {
#ifdef HAVE_DPDK
driver = new DPDKDriver(cct);
#endif
@ -189,7 +190,7 @@ void EventCenter::set_owner()
ldout(cct, 2) << __func__ << " idx=" << idx << " owner=" << owner << dendl;
if (!global_centers) {
cct->lookup_or_create_singleton_object<EventCenter::AssociatedCenters>(
global_centers, "AsyncMessenger::EventCenter::global_center");
global_centers, "AsyncMessenger::EventCenter::global_center::"+type);
assert(global_centers);
global_centers->centers[idx] = this;
if (driver->need_wakeup()) {

View File

@ -152,6 +152,7 @@ class EventCenter {
private:
CephContext *cct;
std::string type;
int nevent;
// Used only to external event
pthread_t owner;
@ -190,7 +191,7 @@ class EventCenter {
~EventCenter();
ostream& _event_prefix(std::ostream *_dout);
int init(int nevent, unsigned idx);
int init(int nevent, unsigned idx, const std::string &t);
void set_owner();
pthread_t get_owner() const { return owner; }
unsigned get_id() const { return idx; }

View File

@ -107,7 +107,7 @@ NetworkStack::NetworkStack(CephContext *c, const string &t): type(t), started(fa
for (unsigned i = 0; i < num_workers; ++i) {
Worker *w = create_worker(cct, type, i);
w->center.init(InitEventNumber, i);
w->center.init(InitEventNumber, i, type);
workers.push_back(w);
}
cct->register_fork_watcher(this);

View File

@ -256,7 +256,7 @@ class FakeEvent : public EventCallback {
TEST(EventCenterTest, FileEventExpansion) {
vector<int> sds;
EventCenter center(g_ceph_context);
center.init(100, 0);
center.init(100, 0, "posix");
center.set_owner();
EventCallbackRef e(new FakeEvent());
for (int i = 0; i < 300; i++) {
@ -277,7 +277,7 @@ class Worker : public Thread {
public:
EventCenter center;
explicit Worker(CephContext *c, int idx): cct(c), done(false), center(c) {
center.init(100, idx);
center.init(100, idx, "posix");
}
void stop() {
done = true;

View File

@ -41,11 +41,11 @@ class NetworkWorkerTest : public ::testing::TestWithParam<const char*> {
virtual void SetUp() {
cerr << __func__ << " start set up " << GetParam() << std::endl;
if (strncmp(GetParam(), "dpdk", 4)) {
g_ceph_context->_conf->set_val("ms_async_transport_type", "posix", false, false);
g_ceph_context->_conf->set_val("ms_type", "async+posix", false, false);
addr = "127.0.0.1:15000";
port_addr = "127.0.0.1:15001";
} else {
g_ceph_context->_conf->set_val("ms_async_transport_type", "dpdk", false, false);
g_ceph_context->_conf->set_val("ms_type", "async+dpdk", false, false);
g_ceph_context->_conf->set_val("ms_dpdk_debug_allow_loopback", "true", false, false);
g_ceph_context->_conf->set_val("ms_async_op_threads", "2", false, false);
g_ceph_context->_conf->set_val("ms_dpdk_coremask", "0x7", false, false);

View File

@ -450,7 +450,7 @@ double eventcenter_poll()
{
int count = 1000000;
EventCenter center(g_ceph_context);
center.init(1000, 0);
center.init(1000, 0, "posix");
center.set_owner();
uint64_t start = Cycles::rdtsc();
for (int i = 0; i < count; i++) {
@ -467,7 +467,7 @@ class CenterWorker : public Thread {
public:
EventCenter center;
explicit CenterWorker(CephContext *c): cct(c), done(false), center(c) {
center.init(100, 0);
center.init(100, 0, "posix");
}
void stop() {
done = true;