mirror of
https://github.com/ceph/ceph
synced 2024-12-16 00:15:35 +00:00
msgr: make nonce a required part of the SimpleMessenger constructor.
With that, remove the set_nonce function and the gratuitous passing of nonce around through layers of functions. Signed-off-by: Greg Farnum <gregory.farnum@dreamhost.com>
This commit is contained in:
parent
26e48f4234
commit
091b176016
@ -91,8 +91,8 @@ int main(int argc, const char **argv, const char *envp[]) {
|
||||
|
||||
// start up network
|
||||
SimpleMessenger *messenger = new SimpleMessenger(g_ceph_context,
|
||||
entity_name_t::CLIENT());
|
||||
messenger->set_nonce(getpid());
|
||||
entity_name_t::CLIENT(),
|
||||
getpid());
|
||||
Client *client = new Client(messenger, &mc);
|
||||
if (filer_flags) {
|
||||
client->set_filer_flags(filer_flags);
|
||||
|
@ -70,8 +70,8 @@ static int do_cmds_special_action(const std::string &action,
|
||||
{
|
||||
common_init_finish(g_ceph_context);
|
||||
SimpleMessenger *messenger = new SimpleMessenger(g_ceph_context,
|
||||
entity_name_t::CLIENT());
|
||||
messenger->set_nonce(getpid());
|
||||
entity_name_t::CLIENT(),
|
||||
getpid());
|
||||
int r = messenger->bind(g_conf->public_addr);
|
||||
if (r < 0)
|
||||
return r;
|
||||
@ -232,9 +232,9 @@ int main(int argc, const char **argv)
|
||||
global_print_banner();
|
||||
|
||||
SimpleMessenger *messenger = new SimpleMessenger(g_ceph_context,
|
||||
entity_name_t::MDS(-1));
|
||||
entity_name_t::MDS(-1),
|
||||
getpid());
|
||||
messenger->set_cluster_protocol(CEPH_MDS_PROTOCOL);
|
||||
messenger->set_nonce(getpid());
|
||||
|
||||
cout << "starting " << g_conf->name << " at " << messenger->get_ms_addr()
|
||||
<< std::endl;
|
||||
|
@ -368,7 +368,8 @@ int main(int argc, const char **argv)
|
||||
// bind
|
||||
int rank = monmap.get_rank(g_conf->name.get_id());
|
||||
SimpleMessenger *messenger = new SimpleMessenger(g_ceph_context,
|
||||
entity_name_t::MON(rank));
|
||||
entity_name_t::MON(rank),
|
||||
0);
|
||||
messenger->set_cluster_protocol(CEPH_MON_PROTOCOL);
|
||||
messenger->set_default_send_priority(CEPH_MSG_PRIO_HIGH);
|
||||
|
||||
|
@ -300,14 +300,18 @@ int main(int argc, const char **argv)
|
||||
<< TEXT_NORMAL << dendl;
|
||||
}
|
||||
|
||||
SimpleMessenger *client_messenger = new SimpleMessenger(g_ceph_context, entity_name_t::OSD(whoami));
|
||||
client_messenger->set_nonce(getpid());
|
||||
SimpleMessenger *cluster_messenger = new SimpleMessenger(g_ceph_context, entity_name_t::OSD(whoami));
|
||||
cluster_messenger->set_nonce(getpid());
|
||||
SimpleMessenger *messenger_hbin = new SimpleMessenger(g_ceph_context,entity_name_t::OSD(whoami));
|
||||
messenger_hbin->set_nonce(getpid());
|
||||
SimpleMessenger *messenger_hbout = new SimpleMessenger(g_ceph_context, entity_name_t::OSD(whoami));
|
||||
messenger_hbout->set_nonce(getpid());
|
||||
SimpleMessenger *client_messenger = new SimpleMessenger(g_ceph_context,
|
||||
entity_name_t::OSD(whoami),
|
||||
getpid());
|
||||
SimpleMessenger *cluster_messenger = new SimpleMessenger(g_ceph_context,
|
||||
entity_name_t::OSD(whoami),
|
||||
getpid());
|
||||
SimpleMessenger *messenger_hbin = new SimpleMessenger(g_ceph_context,
|
||||
entity_name_t::OSD(whoami),
|
||||
getpid());
|
||||
SimpleMessenger *messenger_hbout = new SimpleMessenger(g_ceph_context,
|
||||
entity_name_t::OSD(whoami),
|
||||
getpid());
|
||||
cluster_messenger->set_cluster_protocol(CEPH_OSD_PROTOCOL);
|
||||
messenger_hbin->set_cluster_protocol(CEPH_OSD_PROTOCOL);
|
||||
messenger_hbout->set_cluster_protocol(CEPH_OSD_PROTOCOL);
|
||||
|
@ -68,8 +68,9 @@ int main(int argc, const char **argv, char *envp[])
|
||||
|
||||
cout << "ceph-syn: starting " << g_conf->num_client << " syn client(s)" << std::endl;
|
||||
for (int i=0; i<g_conf->num_client; i++) {
|
||||
messengers[i] = new SimpleMessenger(g_ceph_context, entity_name_t(entity_name_t::TYPE_CLIENT,-1));
|
||||
messengers[i]->set_nonce(i * 1000000 + getpid());
|
||||
messengers[i] = new SimpleMessenger(g_ceph_context,
|
||||
entity_name_t(entity_name_t::TYPE_CLIENT,-1),
|
||||
i * 1000000 + getpid());
|
||||
messengers[i]->bind(g_conf->public_addr);
|
||||
mclients[i] = new MonClient(g_ceph_context);
|
||||
mclients[i]->build_initial_monmap();
|
||||
|
@ -77,8 +77,7 @@ public:
|
||||
goto fail;
|
||||
|
||||
//network connection
|
||||
messenger = new SimpleMessenger(cct, entity_name_t::CLIENT());
|
||||
messenger->set_nonce(msgr_nonce);
|
||||
messenger = new SimpleMessenger(cct, entity_name_t::CLIENT(), msgr_nonce);
|
||||
|
||||
//at last the client
|
||||
ret = -1002;
|
||||
|
@ -938,11 +938,10 @@ int librados::RadosClient::connect()
|
||||
|
||||
err = -ENOMEM;
|
||||
nonce = getpid() + (1000000 * (uint64_t)rados_instance.inc());
|
||||
messenger = new SimpleMessenger(cct, entity_name_t::CLIENT(-1));
|
||||
messenger = new SimpleMessenger(cct, entity_name_t::CLIENT(-1), nonce);
|
||||
if (!messenger)
|
||||
goto out;
|
||||
|
||||
messenger->set_nonce(nonce);
|
||||
// require OSDREPLYMUX feature. this means we will fail to talk to
|
||||
// old servers. this is necessary because otherwise we won't know
|
||||
// how to decompose the reply data into its consituent pieces.
|
||||
|
@ -223,8 +223,9 @@ int MonClient::get_monmap_privately()
|
||||
bool temp_msgr = false;
|
||||
SimpleMessenger* smessenger = NULL;
|
||||
if (!messenger) {
|
||||
messenger = smessenger = new SimpleMessenger(cct, entity_name_t::CLIENT(-1));
|
||||
smessenger->set_nonce(getpid());
|
||||
messenger = smessenger = new SimpleMessenger(cct,
|
||||
entity_name_t::CLIENT(-1),
|
||||
getpid());
|
||||
messenger->add_dispatcher_head(this);
|
||||
smessenger->start();
|
||||
temp_msgr = true;
|
||||
|
@ -55,7 +55,7 @@ static ostream& _prefix(std::ostream *_dout, SimpleMessenger *msgr) {
|
||||
* Accepter
|
||||
*/
|
||||
|
||||
int SimpleMessenger::Accepter::bind(uint64_t nonce, entity_addr_t &bind_addr, int avoid_port1, int avoid_port2)
|
||||
int SimpleMessenger::Accepter::bind(entity_addr_t &bind_addr, int avoid_port1, int avoid_port2)
|
||||
{
|
||||
const md_config_t *conf = msgr->cct->_conf;
|
||||
// bind to a socket
|
||||
@ -154,7 +154,7 @@ int SimpleMessenger::Accepter::bind(uint64_t nonce, entity_addr_t &bind_addr, in
|
||||
|
||||
if (msgr->ms_addr.get_port() == 0) {
|
||||
msgr->ms_addr = listen_addr;
|
||||
msgr->ms_addr.nonce = nonce;
|
||||
msgr->ms_addr.nonce = msgr->nonce;
|
||||
}
|
||||
|
||||
msgr->init_local_pipe();
|
||||
@ -176,7 +176,7 @@ int SimpleMessenger::Accepter::rebind(int avoid_port)
|
||||
addr.set_port(0);
|
||||
|
||||
ldout(msgr->cct,10) << " will try " << addr << dendl;
|
||||
int r = bind(addr.get_nonce(), addr, old_port, avoid_port);
|
||||
int r = bind(addr, old_port, avoid_port);
|
||||
if (r == 0)
|
||||
start();
|
||||
return r;
|
||||
@ -2392,7 +2392,7 @@ int SimpleMessenger::bind(entity_addr_t bind_addr)
|
||||
lock.Unlock();
|
||||
|
||||
// bind to a socket
|
||||
return accepter.bind(nonce, bind_addr);
|
||||
return accepter.bind(bind_addr);
|
||||
}
|
||||
|
||||
int SimpleMessenger::rebind(int avoid_port)
|
||||
|
@ -66,7 +66,7 @@ private:
|
||||
|
||||
void *entry();
|
||||
void stop();
|
||||
int bind(uint64_t nonce, entity_addr_t &bind_addr, int avoid_port1=0, int avoid_port2=0);
|
||||
int bind(entity_addr_t &bind_addr, int avoid_port1=0, int avoid_port2=0);
|
||||
int rebind(int avoid_port);
|
||||
int start();
|
||||
} accepter;
|
||||
@ -392,7 +392,6 @@ private:
|
||||
bool need_addr;
|
||||
entity_addr_t ms_addr;
|
||||
uint64_t nonce;
|
||||
void set_nonce(uint64_t new_nonce) { nonce = new_nonce; }
|
||||
|
||||
// local
|
||||
bool destination_stopped;
|
||||
@ -538,12 +537,12 @@ private:
|
||||
int get_proto_version(int peer_type, bool connect);
|
||||
|
||||
public:
|
||||
SimpleMessenger(CephContext *cct, entity_name_t name) :
|
||||
SimpleMessenger(CephContext *cct, entity_name_t name, uint64_t _nonce) :
|
||||
Messenger(cct, name),
|
||||
accepter(this),
|
||||
lock("SimpleMessenger::lock"), did_bind(false),
|
||||
dispatch_throttler(cct->_conf->ms_dispatch_throttle_bytes), need_addr(true),
|
||||
nonce(0), destination_stopped(false), my_type(name.type()),
|
||||
nonce(_nonce), destination_stopped(false), my_type(name.type()),
|
||||
global_seq_lock("SimpleMessenger::global_seq_lock"), global_seq(0),
|
||||
reaper_thread(this), reaper_started(false), reaper_stop(false),
|
||||
dispatch_thread(this), msgr(this),
|
||||
|
@ -97,8 +97,8 @@ int main(int argc, const char **argv, const char *envp[]) {
|
||||
g_ceph_context->_conf->set_val("public_addr", sss.c_str());
|
||||
g_ceph_context->_conf->apply_changes(NULL);
|
||||
SimpleMessenger *rank = new SimpleMessenger(g_ceph_context,
|
||||
entity_name_t::MON(whoami));
|
||||
rank->set_nonce(getpid());
|
||||
entity_name_t::MON(whoami),
|
||||
getpid());
|
||||
int err = rank->bind(g_ceph_context->_conf->public_addr);
|
||||
if (err < 0)
|
||||
return 1;
|
||||
|
@ -650,8 +650,8 @@ CephToolCtx* ceph_tool_common_init(ceph_tool_mode_t mode, bool concise)
|
||||
tok = tok_init(NULL);
|
||||
|
||||
// start up network
|
||||
messenger = new SimpleMessenger(g_ceph_context, entity_name_t::CLIENT());
|
||||
messenger->set_nonce(getpid());
|
||||
messenger = new SimpleMessenger(g_ceph_context, entity_name_t::CLIENT(),
|
||||
getpid());
|
||||
messenger->start();
|
||||
ctx->dispatcher = new Admin(ctx.get());
|
||||
messenger->add_dispatcher_head(ctx->dispatcher);
|
||||
|
Loading…
Reference in New Issue
Block a user