1
0
mirror of https://github.com/ceph/ceph synced 2025-04-01 23:02:17 +00:00

constifying entity_inst_t, starting to rework messenger lookup stuff...

git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@906 29311d96-e01e-0410-9327-a35deaab8ce9
This commit is contained in:
sageweil 2006-10-03 20:13:56 +00:00
parent f55e7d6bea
commit c533fac1dd
7 changed files with 110 additions and 36 deletions

View File

@ -289,14 +289,14 @@ class entity_inst_t {
entity_inst_t(tcpaddr_t& a, int r) : addr(a), rank(r) {}
};
inline bool operator==(entity_inst_t& a, entity_inst_t& b) { return a.rank == b.rank && a.addr == b.addr; }
inline bool operator!=(entity_inst_t& a, entity_inst_t& b) { return !(a == b); }
inline bool operator>(entity_inst_t& a, entity_inst_t& b) { return a.rank > b.rank; }
inline bool operator>=(entity_inst_t& a, entity_inst_t& b) { return a.rank >= b.rank; }
inline bool operator<(entity_inst_t& a, entity_inst_t& b) { return a.rank < b.rank; }
inline bool operator<=(entity_inst_t& a, entity_inst_t& b) { return a.rank <= b.rank; }
inline bool operator==(const entity_inst_t& a, const entity_inst_t& b) { return a.rank == b.rank && a.addr == b.addr; }
inline bool operator!=(const entity_inst_t& a, const entity_inst_t& b) { return !(a == b); }
inline bool operator>(const entity_inst_t& a, const entity_inst_t& b) { return a.rank > b.rank; }
inline bool operator>=(const entity_inst_t& a, const entity_inst_t& b) { return a.rank >= b.rank; }
inline bool operator<(const entity_inst_t& a, const entity_inst_t& b) { return a.rank < b.rank; }
inline bool operator<=(const entity_inst_t& a, const entity_inst_t& b) { return a.rank <= b.rank; }
inline ostream& operator<<(ostream& out, entity_inst_t &i)
inline ostream& operator<<(ostream& out, const entity_inst_t &i)
{
return out << "rank" << i.rank << "_" << i.addr;
}

View File

@ -86,6 +86,10 @@ class Messenger {
// send message
virtual void prepare_send_message(msg_addr_t dest) {}
virtual int send_message(Message *m, msg_addr_t dest, int port=0, int fromport=0) = 0;
virtual int send_message(Message *m, msg_addr_t dest, const entity_inst_t& inst) {
return send_message(m, dest); // overload me!
}
// make a procedure call
virtual Message* sendrecv(Message *m, msg_addr_t dest, int port=0);

View File

@ -904,7 +904,7 @@ int Rank::start_rank(tcpaddr_t& ns)
/* connect_rank
* NOTE: assumes rank.lock held.
*/
Rank::Sender *Rank::connect_rank(entity_inst_t& inst)
Rank::Sender *Rank::connect_rank(const entity_inst_t& inst)
{
assert(rank.lock.is_locked());
assert(inst != rank.my_inst);
@ -1091,6 +1091,63 @@ void Rank::prepare_dest(msg_addr_t dest)
lock.Unlock();
}
void Rank::submit_message(Message *m, const entity_inst_t& dest_inst)
{
const msg_addr_t dest = m->get_dest();
// lookup
EntityMessenger *entity = 0;
Sender *sender = 0;
lock.Lock();
{
// local?
if (dest_inst.rank == my_inst.rank) {
if (local.count(dest)) {
// local
dout(20) << "submit_message " << *m << " dest " << dest << " local" << endl;
if (g_conf.ms_single_dispatch) {
_submit_single_dispatch(m);
} else {
entity = local[dest];
}
} else {
// mid-register
dout(20) << "submit_message " << *m << " dest " << dest << " local but mid-register, waiting." << endl;
assert(0);
waiting_for_lookup[dest].push_back(m);
}
}
else {
// remote.
if (rank_sender.count( dest_inst.rank )) {
//&&
//rank_sender[dest_inst.rank]->inst == dest_inst) {
dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_inst << ", connected." << endl;
// connected.
sender = rank_sender[ dest_inst.rank ];
} else {
dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_inst << ", connecting." << endl;
// not connected.
sender = connect_rank( dest_inst );
}
}
}
lock.Unlock();
// do it
if (entity) {
// local!
dout(20) << "submit_message " << *m << " dest " << dest << " local, queueing" << endl;
entity->queue_message(m);
}
else if (sender) {
// remote!
dout(20) << "submit_message " << *m << " dest " << dest << " remote, sending" << endl;
sender->send(m);
}
}
void Rank::submit_message(Message *m)
{
@ -1507,19 +1564,37 @@ void Rank::EntityMessenger::prepare_send_message(msg_addr_t dest)
rank.prepare_dest(dest);
}
int Rank::EntityMessenger::send_message(Message *m, msg_addr_t dest, const entity_inst_t& inst)
{
// set envelope
m->set_source(get_myaddr(), 0);
m->set_dest(dest, 0);
dout(1) << "--> "
<< m->get_source() //<< ':' << m->get_source_port()
<< " to " << m->get_dest() //<< ':' << m->get_dest_port()
<< " ---- " << m->get_type_name()
<< " ---- " << rank.my_inst << " --> " << inst
<< " ---- " << m
<< endl;
rank.submit_message(m, inst);
return 0;
}
int Rank::EntityMessenger::send_message(Message *m, msg_addr_t dest, int port, int fromport)
{
// set envelope
m->set_source(get_myaddr(), fromport);
m->set_dest(dest, port);
m->set_lamport_send_stamp( get_lamport() );
dout(1) << "--> "
<< m->get_source() << ':' << m->get_source_port()
<< " to " << m->get_dest() << ':' << m->get_dest_port()
<< m->get_source() //<< ':' << m->get_source_port()
<< " to " << m->get_dest() //<< ':' << m->get_dest_port()
<< " ---- " << m->get_type_name()
<< " ---- " << rank.my_inst
<< " ---- " << rank.my_inst << " --> ?"
<< " ---- " << m
<< endl;
@ -1609,14 +1684,3 @@ void Rank::mark_up(msg_addr_t a, entity_inst_t& i)
lock.Unlock();
}
/*void Rank::EntityMessenger::reset(msg_addr_t a)
{
assert(a != get_myaddr());
if (rank.my_rank == 0) return;
rank.lock.Lock();
rank.down.erase(a);
rank.reset_peer(a);
rank.lock.Unlock();
}
*/

View File

@ -106,7 +106,7 @@ class Rank : public Dispatcher {
Mutex lock;
Cond cond;
Sender(entity_inst_t& i) : inst(i), done(false), sd(0) {}
Sender(const entity_inst_t& i) : inst(i), done(false), sd(0) {}
virtual ~Sender() {}
void *entry();
@ -187,6 +187,7 @@ class Rank : public Dispatcher {
virtual int shutdown();
virtual void prepare_send_message(msg_addr_t dest);
virtual int send_message(Message *m, msg_addr_t dest, int port=0, int fromport=0);
virtual int send_message(Message *m, msg_addr_t dest, const entity_inst_t& inst);
virtual void mark_down(msg_addr_t a, entity_inst_t& i);
virtual void mark_up(msg_addr_t a, entity_inst_t& i);
@ -263,7 +264,7 @@ class Rank : public Dispatcher {
void handle_register_ack(class MNSRegisterAck *m);
void handle_lookup_reply(class MNSLookupReply *m);
Sender *connect_rank(entity_inst_t& inst);
Sender *connect_rank(const entity_inst_t& inst);
void mark_down(msg_addr_t addr, entity_inst_t& i);
void mark_up(msg_addr_t addr, entity_inst_t& i);
@ -285,6 +286,7 @@ public:
EntityMessenger *register_entity(msg_addr_t addr);
void unregister_entity(EntityMessenger *ms);
void submit_message(Message *m, const entity_inst_t& inst);
void prepare_dest(msg_addr_t dest);
void submit_message(Message *m);
void submit_messages(list<Message*>& ls);

View File

@ -8,7 +8,7 @@
typedef struct sockaddr_in tcpaddr_t;
inline ostream& operator<<(ostream& out, tcpaddr_t &a)
inline ostream& operator<<(ostream& out, const tcpaddr_t &a)
{
unsigned char addr[4];
memcpy((char*)addr, (char*)&a.sin_addr.s_addr, 4);

View File

@ -67,7 +67,7 @@
#define derr(l) if (l<=g_conf.debug || l<=g_conf.debug_osd) cerr << g_clock.now() << " osd" << whoami << " " << (osdmap ? osdmap->get_epoch():0) << " "
char *osd_base_path = "./osddata";
char *ebofs_base_path = "./ebofsdev";
char *ebofs_base_path = "./dev";
@ -123,9 +123,9 @@ OSD::OSD(int id, Messenger *m, char *dev)
// init object store
// try in this order:
// ebofsdev/$num
// ebofsdev/$hostname
// ebofsdev/all
// dev/osd$num
// dev/osd.$hostname
// dev/osd.all
if (dev) {
strcpy(dev_path,dev);
@ -134,14 +134,14 @@ OSD::OSD(int id, Messenger *m, char *dev)
hostname[0] = 0;
gethostname(hostname,100);
sprintf(dev_path, "%s/%d", ebofs_base_path, whoami);
sprintf(dev_path, "%s/osd%d", ebofs_base_path, whoami);
struct stat sta;
if (::lstat(dev_path, &sta) != 0)
sprintf(dev_path, "%s/%s", ebofs_base_path, hostname);
sprintf(dev_path, "%s/osd.%s", ebofs_base_path, hostname);
if (::lstat(dev_path, &sta) != 0)
sprintf(dev_path, "%s/all", ebofs_base_path);
sprintf(dev_path, "%s/osd.all", ebofs_base_path);
}
if (g_conf.ebofs) {
@ -484,7 +484,7 @@ void OSD::heartbeat()
i++) {
_share_map_outgoing( MSG_ADDR_OSD(*i) );
messenger->send_message(new MOSDPing(osdmap->get_epoch()),
MSG_ADDR_OSD(*i));
MSG_ADDR_OSD(*i), osdmap->get_osd_inst(*i));
}
if (logger) logger->set("pingset", pingset.size());

View File

@ -143,7 +143,11 @@ private:
bool is_up(int osd) { return !is_down(osd); }
bool is_out(int osd) { return out_osds.count(osd); }
bool is_in(int osd) { return !is_out(osd); }
const entity_inst_t& get_inst(int osd) {
assert(osd_inst.count(osd));
return osd_inst[osd];
}
bool get_inst(int osd, entity_inst_t& inst) {
if (osd_inst.count(osd)) {
inst = osd_inst[osd];