msgr: add ms_handle_connect callback

Called when an outgoing connection succeeds.
This commit is contained in:
Sage Weil 2009-10-13 11:51:32 -07:00
parent d8f771e981
commit 7e5d162edd
4 changed files with 33 additions and 5 deletions

View File

@ -29,6 +29,9 @@ public:
// how i receive messages
virtual bool ms_dispatch(Message *m) = 0;
// after a connection connects
virtual void ms_handle_connect(Connection *con) { };
/*
* on any connection reset.
* this indicates that the ordered+reliable delivery semantics have

View File

@ -104,6 +104,12 @@ protected:
<< dendl;
assert(0);
}
void ms_deliver_handle_connect(Connection *con) {
for (list<Dispatcher*>::iterator p = dispatchers.begin();
p != dispatchers.end();
p++)
(*p)->ms_handle_connect(con);
}
void ms_deliver_handle_reset(Connection *con) {
for (list<Dispatcher*>::iterator p = dispatchers.begin();
p != dispatchers.end();

View File

@ -287,14 +287,21 @@ void SimpleMessenger::Endpoint::dispatch_entry()
}
Message *m = ls.front();
ls.pop_front();
if ((long)m == BAD_REMOTE_RESET) {
if ((long)m == D_BAD_REMOTE_RESET) {
lock.Lock();
Connection *con = remote_reset_q.front();
remote_reset_q.pop_front();
lock.Unlock();
ms_deliver_handle_remote_reset(con);
con->put();
} else if ((long)m == BAD_RESET) {
} else if ((long)m == D_CONNECT) {
lock.Lock();
Connection *con = connect_q.front();
connect_q.pop_front();
lock.Unlock();
ms_deliver_handle_connect(con);
con->put();
} else if ((long)m == D_BAD_RESET) {
lock.Lock();
Connection *con = reset_q.front();
reset_q.pop_front();
@ -1002,6 +1009,10 @@ int SimpleMessenger::Pipe::connect()
backoff = utime_t();
dout(20) << "connect success " << connect_seq << ", lossy = " << policy.lossy << dendl;
for (unsigned i=0; i<rank->local.size(); i++)
if (rank->local[i])
rank->local[i]->queue_connect(connection_state->get());
if (!reader_running) {
dout(20) << "connect starting reader" << dendl;
start_reader();

View File

@ -296,21 +296,29 @@ private:
lock.Unlock();
}
enum { BAD_REMOTE_RESET, BAD_RESET };
enum { D_CONNECT, D_BAD_REMOTE_RESET, D_BAD_RESET };
list<Connection*> connect_q;
list<Connection*> remote_reset_q;
list<Connection*> reset_q;
void queue_connect(Connection *con) {
lock.Lock();
connect_q.push_back(con);
dispatch_queue[CEPH_MSG_PRIO_HIGHEST].push_back((Message*)D_CONNECT);
cond.Signal();
lock.Unlock();
}
void queue_remote_reset(Connection *con) {
lock.Lock();
remote_reset_q.push_back(con);
dispatch_queue[CEPH_MSG_PRIO_HIGHEST].push_back((Message*)BAD_REMOTE_RESET);
dispatch_queue[CEPH_MSG_PRIO_HIGHEST].push_back((Message*)D_BAD_REMOTE_RESET);
cond.Signal();
lock.Unlock();
}
void queue_reset(Connection *con) {
lock.Lock();
reset_q.push_back(con);
dispatch_queue[CEPH_MSG_PRIO_HIGHEST].push_back((Message*)BAD_RESET);
dispatch_queue[CEPH_MSG_PRIO_HIGHEST].push_back((Message*)D_BAD_RESET);
cond.Signal();
lock.Unlock();
}