diff --git a/src/ceph_fuse.cc b/src/ceph_fuse.cc index fbdc508df68..c37d0be37be 100644 --- a/src/ceph_fuse.cc +++ b/src/ceph_fuse.cc @@ -91,8 +91,8 @@ int main(int argc, const char **argv, const char *envp[]) { // start up network SimpleMessenger *messenger = new SimpleMessenger(g_ceph_context, - entity_name_t::CLIENT()); - messenger->set_nonce(getpid()); + entity_name_t::CLIENT(), + getpid()); Client *client = new Client(messenger, &mc); if (filer_flags) { client->set_filer_flags(filer_flags); diff --git a/src/ceph_mds.cc b/src/ceph_mds.cc index ed85338e1f5..bfa3698e6c9 100644 --- a/src/ceph_mds.cc +++ b/src/ceph_mds.cc @@ -70,8 +70,8 @@ static int do_cmds_special_action(const std::string &action, { common_init_finish(g_ceph_context); SimpleMessenger *messenger = new SimpleMessenger(g_ceph_context, - entity_name_t::CLIENT()); - messenger->set_nonce(getpid()); + entity_name_t::CLIENT(), + getpid()); int r = messenger->bind(g_conf->public_addr); if (r < 0) return r; @@ -232,9 +232,9 @@ int main(int argc, const char **argv) global_print_banner(); SimpleMessenger *messenger = new SimpleMessenger(g_ceph_context, - entity_name_t::MDS(-1)); + entity_name_t::MDS(-1), + getpid()); messenger->set_cluster_protocol(CEPH_MDS_PROTOCOL); - messenger->set_nonce(getpid()); cout << "starting " << g_conf->name << " at " << messenger->get_ms_addr() << std::endl; diff --git a/src/ceph_mon.cc b/src/ceph_mon.cc index c9e6d37cc7a..218650e9de9 100644 --- a/src/ceph_mon.cc +++ b/src/ceph_mon.cc @@ -368,7 +368,8 @@ int main(int argc, const char **argv) // bind int rank = monmap.get_rank(g_conf->name.get_id()); SimpleMessenger *messenger = new SimpleMessenger(g_ceph_context, - entity_name_t::MON(rank)); + entity_name_t::MON(rank), + 0); messenger->set_cluster_protocol(CEPH_MON_PROTOCOL); messenger->set_default_send_priority(CEPH_MSG_PRIO_HIGH); diff --git a/src/ceph_osd.cc b/src/ceph_osd.cc index 3bf1cbaf286..18a8847e796 100644 --- a/src/ceph_osd.cc +++ b/src/ceph_osd.cc @@ -300,14 +300,18 @@ int main(int argc, const char **argv) << TEXT_NORMAL << dendl; } - SimpleMessenger *client_messenger = new SimpleMessenger(g_ceph_context, entity_name_t::OSD(whoami)); - client_messenger->set_nonce(getpid()); - SimpleMessenger *cluster_messenger = new SimpleMessenger(g_ceph_context, entity_name_t::OSD(whoami)); - cluster_messenger->set_nonce(getpid()); - SimpleMessenger *messenger_hbin = new SimpleMessenger(g_ceph_context,entity_name_t::OSD(whoami)); - messenger_hbin->set_nonce(getpid()); - SimpleMessenger *messenger_hbout = new SimpleMessenger(g_ceph_context, entity_name_t::OSD(whoami)); - messenger_hbout->set_nonce(getpid()); + SimpleMessenger *client_messenger = new SimpleMessenger(g_ceph_context, + entity_name_t::OSD(whoami), + getpid()); + SimpleMessenger *cluster_messenger = new SimpleMessenger(g_ceph_context, + entity_name_t::OSD(whoami), + getpid()); + SimpleMessenger *messenger_hbin = new SimpleMessenger(g_ceph_context, + entity_name_t::OSD(whoami), + getpid()); + SimpleMessenger *messenger_hbout = new SimpleMessenger(g_ceph_context, + entity_name_t::OSD(whoami), + getpid()); cluster_messenger->set_cluster_protocol(CEPH_OSD_PROTOCOL); messenger_hbin->set_cluster_protocol(CEPH_OSD_PROTOCOL); messenger_hbout->set_cluster_protocol(CEPH_OSD_PROTOCOL); diff --git a/src/ceph_syn.cc b/src/ceph_syn.cc index db90373fc57..fb5f22f31ca 100644 --- a/src/ceph_syn.cc +++ b/src/ceph_syn.cc @@ -68,8 +68,9 @@ int main(int argc, const char **argv, char *envp[]) cout << "ceph-syn: starting " << g_conf->num_client << " syn client(s)" << std::endl; for (int i=0; inum_client; i++) { - messengers[i] = new SimpleMessenger(g_ceph_context, entity_name_t(entity_name_t::TYPE_CLIENT,-1)); - messengers[i]->set_nonce(i * 1000000 + getpid()); + messengers[i] = new SimpleMessenger(g_ceph_context, + entity_name_t(entity_name_t::TYPE_CLIENT,-1), + i * 1000000 + getpid()); messengers[i]->bind(g_conf->public_addr); mclients[i] = new MonClient(g_ceph_context); mclients[i]->build_initial_monmap(); diff --git a/src/libcephfs.cc b/src/libcephfs.cc index 377ab45db40..efb1733cc59 100644 --- a/src/libcephfs.cc +++ b/src/libcephfs.cc @@ -77,8 +77,7 @@ public: goto fail; //network connection - messenger = new SimpleMessenger(cct, entity_name_t::CLIENT()); - messenger->set_nonce(msgr_nonce); + messenger = new SimpleMessenger(cct, entity_name_t::CLIENT(), msgr_nonce); //at last the client ret = -1002; diff --git a/src/librados.cc b/src/librados.cc index a494dc4fda0..ef8d3c9012d 100644 --- a/src/librados.cc +++ b/src/librados.cc @@ -938,11 +938,10 @@ int librados::RadosClient::connect() err = -ENOMEM; nonce = getpid() + (1000000 * (uint64_t)rados_instance.inc()); - messenger = new SimpleMessenger(cct, entity_name_t::CLIENT(-1)); + messenger = new SimpleMessenger(cct, entity_name_t::CLIENT(-1), nonce); if (!messenger) goto out; - messenger->set_nonce(nonce); // require OSDREPLYMUX feature. this means we will fail to talk to // old servers. this is necessary because otherwise we won't know // how to decompose the reply data into its consituent pieces. diff --git a/src/mon/MonClient.cc b/src/mon/MonClient.cc index db1b805df99..e89f1ec66f3 100644 --- a/src/mon/MonClient.cc +++ b/src/mon/MonClient.cc @@ -223,8 +223,9 @@ int MonClient::get_monmap_privately() bool temp_msgr = false; SimpleMessenger* smessenger = NULL; if (!messenger) { - messenger = smessenger = new SimpleMessenger(cct, entity_name_t::CLIENT(-1)); - smessenger->set_nonce(getpid()); + messenger = smessenger = new SimpleMessenger(cct, + entity_name_t::CLIENT(-1), + getpid()); messenger->add_dispatcher_head(this); smessenger->start(); temp_msgr = true; diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index ae93e76a63c..20d70582416 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -55,7 +55,7 @@ static ostream& _prefix(std::ostream *_dout, SimpleMessenger *msgr) { * Accepter */ -int SimpleMessenger::Accepter::bind(uint64_t nonce, entity_addr_t &bind_addr, int avoid_port1, int avoid_port2) +int SimpleMessenger::Accepter::bind(entity_addr_t &bind_addr, int avoid_port1, int avoid_port2) { const md_config_t *conf = msgr->cct->_conf; // bind to a socket @@ -154,7 +154,7 @@ int SimpleMessenger::Accepter::bind(uint64_t nonce, entity_addr_t &bind_addr, in if (msgr->ms_addr.get_port() == 0) { msgr->ms_addr = listen_addr; - msgr->ms_addr.nonce = nonce; + msgr->ms_addr.nonce = msgr->nonce; } msgr->init_local_pipe(); @@ -176,7 +176,7 @@ int SimpleMessenger::Accepter::rebind(int avoid_port) addr.set_port(0); ldout(msgr->cct,10) << " will try " << addr << dendl; - int r = bind(addr.get_nonce(), addr, old_port, avoid_port); + int r = bind(addr, old_port, avoid_port); if (r == 0) start(); return r; @@ -2392,7 +2392,7 @@ int SimpleMessenger::bind(entity_addr_t bind_addr) lock.Unlock(); // bind to a socket - return accepter.bind(nonce, bind_addr); + return accepter.bind(bind_addr); } int SimpleMessenger::rebind(int avoid_port) diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h index 6aa218efad0..0e2fc065242 100644 --- a/src/msg/SimpleMessenger.h +++ b/src/msg/SimpleMessenger.h @@ -66,7 +66,7 @@ private: void *entry(); void stop(); - int bind(uint64_t nonce, entity_addr_t &bind_addr, int avoid_port1=0, int avoid_port2=0); + int bind(entity_addr_t &bind_addr, int avoid_port1=0, int avoid_port2=0); int rebind(int avoid_port); int start(); } accepter; @@ -392,7 +392,6 @@ private: bool need_addr; entity_addr_t ms_addr; uint64_t nonce; - void set_nonce(uint64_t new_nonce) { nonce = new_nonce; } // local bool destination_stopped; @@ -538,12 +537,12 @@ private: int get_proto_version(int peer_type, bool connect); public: - SimpleMessenger(CephContext *cct, entity_name_t name) : + SimpleMessenger(CephContext *cct, entity_name_t name, uint64_t _nonce) : Messenger(cct, name), accepter(this), lock("SimpleMessenger::lock"), did_bind(false), dispatch_throttler(cct->_conf->ms_dispatch_throttle_bytes), need_addr(true), - nonce(0), destination_stopped(false), my_type(name.type()), + nonce(_nonce), destination_stopped(false), my_type(name.type()), global_seq_lock("SimpleMessenger::global_seq_lock"), global_seq(0), reaper_thread(this), reaper_started(false), reaper_stop(false), dispatch_thread(this), msgr(this), diff --git a/src/testmsgr.cc b/src/testmsgr.cc index 8ff28f3c41f..a7a0d52a032 100644 --- a/src/testmsgr.cc +++ b/src/testmsgr.cc @@ -97,8 +97,8 @@ int main(int argc, const char **argv, const char *envp[]) { g_ceph_context->_conf->set_val("public_addr", sss.c_str()); g_ceph_context->_conf->apply_changes(NULL); SimpleMessenger *rank = new SimpleMessenger(g_ceph_context, - entity_name_t::MON(whoami)); - rank->set_nonce(getpid()); + entity_name_t::MON(whoami), + getpid()); int err = rank->bind(g_ceph_context->_conf->public_addr); if (err < 0) return 1; diff --git a/src/tools/common.cc b/src/tools/common.cc index 376c4103d87..e27c2f67f87 100644 --- a/src/tools/common.cc +++ b/src/tools/common.cc @@ -650,8 +650,8 @@ CephToolCtx* ceph_tool_common_init(ceph_tool_mode_t mode, bool concise) tok = tok_init(NULL); // start up network - messenger = new SimpleMessenger(g_ceph_context, entity_name_t::CLIENT()); - messenger->set_nonce(getpid()); + messenger = new SimpleMessenger(g_ceph_context, entity_name_t::CLIENT(), + getpid()); messenger->start(); ctx->dispatcher = new Admin(ctx.get()); messenger->add_dispatcher_head(ctx->dispatcher);