msgr: explicitly specify internal cluster protocol

Replace case statement based on my_type.

Signed-off-by: Sage Weil <sage@newdream.net>
This commit is contained in:
Sage Weil 2012-01-04 13:54:11 -08:00
parent b8be338226
commit 9986553c71
5 changed files with 41 additions and 26 deletions

View File

@ -211,6 +211,8 @@ int main(int argc, const char **argv)
global_print_banner();
SimpleMessenger *messenger = new SimpleMessenger(g_ceph_context);
messenger->set_cluster_protocol(CEPH_MDS_PROTOCOL);
messenger->bind(g_conf->public_addr, getpid());
cout << "starting " << g_conf->name << " at " << messenger->get_ms_addr()
<< std::endl;

View File

@ -353,6 +353,8 @@ int main(int argc, const char **argv)
// bind
SimpleMessenger *messenger = new SimpleMessenger(g_ceph_context);
messenger->set_cluster_protocol(CEPH_MON_PROTOCOL);
int rank = monmap.get_rank(g_conf->name.get_id());
global_print_banner();

View File

@ -275,6 +275,10 @@ int main(int argc, const char **argv)
SimpleMessenger *messenger_hbin = new SimpleMessenger(g_ceph_context);
SimpleMessenger *messenger_hbout = new SimpleMessenger(g_ceph_context);
cluster_messenger->set_cluster_protocol(CEPH_OSD_PROTOCOL);
messenger_hbin->set_cluster_protocol(CEPH_OSD_PROTOCOL);
messenger_hbout->set_cluster_protocol(CEPH_OSD_PROTOCOL);
client_messenger->bind(g_conf->public_addr, getpid());
cluster_messenger->bind(g_conf->cluster_addr, getpid());

View File

@ -500,33 +500,12 @@ void SimpleMessenger::set_ip(entity_addr_t &addr)
}
}
/**************************************
* Pipe
*/
#undef dout_prefix
#define dout_prefix _pipe_prefix(_dout)
ostream& SimpleMessenger::Pipe::_pipe_prefix(std::ostream *_dout) {
return *_dout << "-- " << msgr->ms_addr << " >> " << peer_addr << " pipe(" << this
<< " sd=" << sd
<< " pgs=" << peer_global_seq
<< " cs=" << connect_seq
<< " l=" << policy.lossy
<< ").";
}
static int get_proto_version(int my_type, int peer_type, bool connect)
int SimpleMessenger::get_proto_version(int peer_type, bool connect)
{
// set reply protocol version
if (peer_type == my_type) {
// internal
switch (my_type) {
case CEPH_ENTITY_TYPE_OSD: return CEPH_OSD_PROTOCOL;
case CEPH_ENTITY_TYPE_MDS: return CEPH_MDS_PROTOCOL;
case CEPH_ENTITY_TYPE_MON: return CEPH_MON_PROTOCOL;
}
return cluster_protocol;
} else {
// public
if (connect) {
@ -546,6 +525,22 @@ static int get_proto_version(int my_type, int peer_type, bool connect)
return 0;
}
/**************************************
* Pipe
*/
#undef dout_prefix
#define dout_prefix _pipe_prefix(_dout)
ostream& SimpleMessenger::Pipe::_pipe_prefix(std::ostream *_dout) {
return *_dout << "-- " << msgr->ms_addr << " >> " << peer_addr << " pipe(" << this
<< " sd=" << sd
<< " pgs=" << peer_global_seq
<< " cs=" << connect_seq
<< " l=" << policy.lossy
<< ").";
}
void SimpleMessenger::Pipe::queue_received(Message *m, int priority)
{
assert(pipe_lock.is_locked());
@ -736,7 +731,7 @@ int SimpleMessenger::Pipe::accept()
<< dendl;
memset(&reply, 0, sizeof(reply));
reply.protocol_version = get_proto_version(msgr->my_type, peer_type, false);
reply.protocol_version = msgr->get_proto_version(peer_type, false);
// mismatch?
ldout(msgr->cct,10) << "accept my proto " << reply.protocol_version
@ -1137,7 +1132,7 @@ int SimpleMessenger::Pipe::connect()
connect.host_type = msgr->my_type;
connect.global_seq = gseq;
connect.connect_seq = cseq;
connect.protocol_version = get_proto_version(msgr->my_type, peer_type, true);
connect.protocol_version = msgr->get_proto_version(peer_type, true);
connect.authorizer_protocol = authorizer ? authorizer->protocol : 0;
connect.authorizer_len = authorizer ? authorizer->bl.length() : 0;
if (authorizer)

View File

@ -551,6 +551,11 @@ private:
SimpleMessenger *msgr; //hack to make dout macro work, will fix
int timeout;
/// internal cluster protocol version, if any, for talking to entities of the same type.
int cluster_protocol;
int get_proto_version(int peer_type, bool connect);
public:
SimpleMessenger(CephContext *cct) :
@ -561,7 +566,10 @@ public:
destination_stopped(true), my_type(-1),
global_seq_lock("SimpleMessenger::global_seq_lock"), global_seq(0),
reaper_thread(this), reaper_started(false), reaper_stop(false),
dispatch_thread(this), msgr(this) {
dispatch_thread(this), msgr(this),
timeout(0),
cluster_protocol(0)
{
// for local dmsg delivery
dispatch_queue.local_pipe = new Pipe(this, Pipe::STATE_OPEN);
}
@ -579,6 +587,10 @@ public:
}
void wait();
void set_cluster_protocol(int p) {
cluster_protocol = p;
}
int write_pid_file(int pid);
int rebind(int avoid_port);