mirror of
https://github.com/ceph/ceph
synced 2025-02-21 09:57:26 +00:00
msgr: Remove the SimpleMessenger start/start_with_nonce distinction.
Instead, have a settable nonce value that you can fill in any time after construction and that it uses during regular start(). Signed-off-by: Greg Farnum <gregory.farnum@dreamhost.com>
This commit is contained in:
parent
ffa595598d
commit
ef244773ee
@ -90,7 +90,9 @@ int main(int argc, const char **argv, const char *envp[]) {
|
||||
return -1;
|
||||
|
||||
// start up network
|
||||
SimpleMessenger *messenger = new SimpleMessenger(g_ceph_context, entity_name_t::CLIENT());
|
||||
SimpleMessenger *messenger = new SimpleMessenger(g_ceph_context,
|
||||
entity_name_t::CLIENT());
|
||||
messenger->set_nonce(getpid());
|
||||
Client *client = new Client(messenger, &mc);
|
||||
if (filer_flags) {
|
||||
client->set_filer_flags(filer_flags);
|
||||
@ -116,7 +118,7 @@ int main(int argc, const char **argv, const char *envp[]) {
|
||||
::close(fd[0]);
|
||||
|
||||
cout << "ceph-fuse[" << getpid() << "]: starting ceph client" << std::endl;
|
||||
messenger->start_with_nonce(getpid());
|
||||
messenger->start();
|
||||
|
||||
// start client
|
||||
client->init();
|
||||
|
@ -72,6 +72,7 @@ static int do_cmds_special_action(const std::string &action,
|
||||
SimpleMessenger *messenger = new SimpleMessenger(g_ceph_context,
|
||||
entity_name_t::CLIENT());
|
||||
int r = messenger->bind(g_conf->public_addr, getpid());
|
||||
messenger->set_nonce(getpid());
|
||||
if (r < 0)
|
||||
return r;
|
||||
MonClient mc(g_ceph_context);
|
||||
|
@ -302,10 +302,11 @@ int main(int argc, const char **argv)
|
||||
|
||||
SimpleMessenger *client_messenger = new SimpleMessenger(g_ceph_context, entity_name_t::OSD(whoami));
|
||||
SimpleMessenger *cluster_messenger = new SimpleMessenger(g_ceph_context, entity_name_t::OSD(whoami));
|
||||
SimpleMessenger *messenger_hbin = new SimpleMessenger(g_ceph_context, entity_name_t::OSD(whoami));
|
||||
SimpleMessenger *messenger_hbin = new SimpleMessenger(g_ceph_context,entity_name_t::OSD(whoami));
|
||||
SimpleMessenger *messenger_hbout = new SimpleMessenger(g_ceph_context, entity_name_t::OSD(whoami));
|
||||
cluster_messenger->set_cluster_protocol(CEPH_OSD_PROTOCOL);
|
||||
messenger_hbin->set_cluster_protocol(CEPH_OSD_PROTOCOL);
|
||||
messenger_hbin->set_nonce(getpid());
|
||||
messenger_hbout->set_cluster_protocol(CEPH_OSD_PROTOCOL);
|
||||
|
||||
r = client_messenger->bind(g_conf->public_addr, getpid());
|
||||
@ -394,7 +395,7 @@ int main(int argc, const char **argv)
|
||||
global_init_shutdown_stderr(g_ceph_context);
|
||||
|
||||
client_messenger->start();
|
||||
messenger_hbin->start_with_nonce(getpid());
|
||||
messenger_hbin->start();
|
||||
messenger_hbout->start();
|
||||
cluster_messenger->start();
|
||||
|
||||
|
@ -78,6 +78,7 @@ public:
|
||||
|
||||
//network connection
|
||||
messenger = new SimpleMessenger(cct, entity_name_t::CLIENT());
|
||||
messenger->set_nonce(msgr_nonce);
|
||||
|
||||
//at last the client
|
||||
ret = -1002;
|
||||
@ -86,7 +87,7 @@ public:
|
||||
goto fail;
|
||||
|
||||
ret = -1003;
|
||||
if (messenger->start_with_nonce(msgr_nonce) != 0)
|
||||
if (messenger->start() != 0)
|
||||
goto fail;
|
||||
|
||||
ret = client->init();
|
||||
|
@ -937,10 +937,12 @@ int librados::RadosClient::connect()
|
||||
goto out;
|
||||
|
||||
err = -ENOMEM;
|
||||
nonce = getpid() + (1000000 * (uint64_t)rados_instance.inc());
|
||||
messenger = new SimpleMessenger(cct, entity_name_t::CLIENT(-1));
|
||||
if (!messenger)
|
||||
goto out;
|
||||
|
||||
messenger->set_nonce(nonce);
|
||||
// require OSDREPLYMUX feature. this means we will fail to talk to
|
||||
// old servers. this is necessary because otherwise we won't know
|
||||
// how to decompose the reply data into its consituent pieces.
|
||||
@ -960,9 +962,7 @@ int librados::RadosClient::connect()
|
||||
|
||||
messenger->add_dispatcher_head(this);
|
||||
|
||||
nonce = getpid() + (1000000 * (uint64_t)rados_instance.inc());
|
||||
|
||||
messenger->start_with_nonce(nonce);
|
||||
messenger->start();
|
||||
messenger->add_dispatcher_head(this);
|
||||
|
||||
ldout(cct, 1) << "setting wanted keys" << dendl;
|
||||
|
@ -56,7 +56,7 @@ void Dumper::init(int rank)
|
||||
objecter->set_client_incarnation(0);
|
||||
|
||||
messenger->add_dispatcher_head(this);
|
||||
messenger->start_with_nonce(getpid());
|
||||
messenger->start();
|
||||
|
||||
monc->set_want_keys(CEPH_ENTITY_TYPE_MON|CEPH_ENTITY_TYPE_OSD|CEPH_ENTITY_TYPE_MDS);
|
||||
monc->set_messenger(messenger);
|
||||
|
@ -66,7 +66,7 @@ void Resetter::init(int rank)
|
||||
objecter->set_client_incarnation(0);
|
||||
|
||||
messenger->add_dispatcher_head(this);
|
||||
messenger->start_with_nonce(getpid());
|
||||
messenger->start();
|
||||
|
||||
monc->set_want_keys(CEPH_ENTITY_TYPE_MON|CEPH_ENTITY_TYPE_OSD|CEPH_ENTITY_TYPE_MDS);
|
||||
monc->set_messenger(messenger);
|
||||
|
@ -224,8 +224,9 @@ int MonClient::get_monmap_privately()
|
||||
SimpleMessenger* smessenger = NULL;
|
||||
if (!messenger) {
|
||||
messenger = smessenger = new SimpleMessenger(cct, entity_name_t::CLIENT(-1));
|
||||
smessenger->set_nonce(getpid());
|
||||
messenger->add_dispatcher_head(this);
|
||||
smessenger->start_with_nonce(getpid());
|
||||
smessenger->start();
|
||||
temp_msgr = true;
|
||||
}
|
||||
|
||||
|
@ -2402,13 +2402,13 @@ int SimpleMessenger::rebind(int avoid_port)
|
||||
return accepter.rebind(avoid_port);
|
||||
}
|
||||
|
||||
int SimpleMessenger::start_with_nonce(uint64_t nonce)
|
||||
int SimpleMessenger::start()
|
||||
{
|
||||
lock.Lock();
|
||||
ldout(cct,1) << "messenger.start" << dendl;
|
||||
|
||||
// register at least one entity, first!
|
||||
assert(my_type >= 0);
|
||||
assert(my_type >= 0);
|
||||
|
||||
assert(!started);
|
||||
started = true;
|
||||
|
@ -392,6 +392,8 @@ private:
|
||||
// where i listen
|
||||
bool need_addr;
|
||||
entity_addr_t ms_addr;
|
||||
uint64_t nonce;
|
||||
void set_nonce(uint64_t new_nonce) { nonce = new_nonce; }
|
||||
|
||||
// local
|
||||
bool destination_stopped;
|
||||
@ -512,7 +514,7 @@ public:
|
||||
accepter(this),
|
||||
lock("SimpleMessenger::lock"), started(false), did_bind(false),
|
||||
dispatch_throttler(cct->_conf->ms_dispatch_throttle_bytes), need_addr(true),
|
||||
destination_stopped(false), my_type(name.type()),
|
||||
nonce(0), destination_stopped(false), my_type(name.type()),
|
||||
global_seq_lock("SimpleMessenger::global_seq_lock"), global_seq(0),
|
||||
reaper_thread(this), reaper_started(false), reaper_stop(false),
|
||||
dispatch_thread(this), msgr(this),
|
||||
@ -527,14 +529,8 @@ public:
|
||||
delete dispatch_queue.local_pipe;
|
||||
}
|
||||
|
||||
//void set_listen_addr(tcpaddr_t& a);
|
||||
|
||||
int bind(entity_addr_t bind_addr, int64_t nonce);
|
||||
int start_with_nonce(uint64_t nonce); // if we didn't bind
|
||||
virtual int start() { // if we did
|
||||
assert(did_bind);
|
||||
return start_with_nonce(0);
|
||||
}
|
||||
virtual int start();
|
||||
virtual void wait();
|
||||
|
||||
void set_cluster_protocol(int p) {
|
||||
|
@ -651,7 +651,8 @@ CephToolCtx* ceph_tool_common_init(ceph_tool_mode_t mode, bool concise)
|
||||
|
||||
// start up network
|
||||
messenger = new SimpleMessenger(g_ceph_context, entity_name_t::CLIENT());
|
||||
messenger->start_with_nonce(getpid());
|
||||
messenger->set_nonce(getpid());
|
||||
messenger->start();
|
||||
ctx->dispatcher = new Admin(ctx.get());
|
||||
messenger->add_dispatcher_head(ctx->dispatcher);
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user