AsyncConnection: Avoid calling callback after delteing AsyncMessenger

Now when calling mark_down/mark_down_all, it will dispatch a reset event.
If we call Messenger::shutdown/wait, and it will let reset event called after
Messenger dealloc.

Signed-off-by: Haomai Wang <haomaiwang@gmail.com>
This commit is contained in:
Haomai Wang 2015-01-10 21:35:04 +08:00
parent 9a84a905fd
commit 34cbd4c76c
5 changed files with 43 additions and 19 deletions

View File

@ -2111,7 +2111,6 @@ void AsyncConnection::mark_down()
stopping.set(1);
Mutex::Locker l(lock);
_stop();
center->dispatch_event_external(reset_handler);
}
void AsyncConnection::_send_keepalive_or_ack(bool ack, utime_t *tp)

View File

@ -286,6 +286,10 @@ class AsyncConnection : public Connection {
void process();
void wakeup_from(uint64_t id);
void local_deliver();
void stop() {
mark_down();
center->dispatch_event_external(reset_handler);
}
}; /* AsyncConnection */
typedef boost::intrusive_ptr<AsyncConnection> AsyncConnectionRef;

View File

@ -106,13 +106,19 @@ int Processor::bind(const entity_addr_t &bind_addr, const set<int>& avoid_ports)
return -errno;
}
int r = net.set_nonblock(listen_sd);
if (r < 0) {
::close(listen_sd);
listen_sd = -1;
return -errno;
}
// use whatever user specified (if anything)
entity_addr_t listen_addr = bind_addr;
listen_addr.set_family(family);
/* bind to port */
int rc = -1;
int r = -1;
r = -1;
for (int i = 0; i < conf->ms_bind_retry_count; i++) {
if (i > 0) {
@ -167,6 +173,8 @@ int Processor::bind(const entity_addr_t &bind_addr, const set<int>& avoid_ports)
if (rc < 0) {
lderr(msgr->cct) << __func__ << " was unable to bind after " << conf->ms_bind_retry_count
<< " attempts: " << cpp_strerror(errno) << dendl;
::close(listen_sd);
listen_sd = -1;
return r;
}
@ -176,6 +184,8 @@ int Processor::bind(const entity_addr_t &bind_addr, const set<int>& avoid_ports)
if (rc < 0) {
rc = -errno;
lderr(msgr->cct) << __func__ << " failed getsockname: " << cpp_strerror(rc) << dendl;
::close(listen_sd);
listen_sd = -1;
return rc;
}
@ -187,6 +197,8 @@ int Processor::bind(const entity_addr_t &bind_addr, const set<int>& avoid_ports)
rc = -errno;
lderr(msgr->cct) << __func__ << " unable to listen on " << listen_addr
<< ": " << cpp_strerror(rc) << dendl;
::close(listen_sd);
listen_sd = -1;
return rc;
}
@ -252,10 +264,17 @@ void Processor::accept()
ldout(msgr->cct, 10) << __func__ << " accepted incoming on sd " << sd << dendl;
msgr->add_accept(sd);
break;
continue;
} else {
ldout(msgr->cct, 0) << __func__ << " no incoming connection? sd = " << sd
<< " errno " << errno << " " << cpp_strerror(errno) << dendl;
if (errno == EINTR) {
continue;
} else if (errno == EAGAIN) {
break;
} else {
errors++;
ldout(msgr->cct, 20) << __func__ << " no incoming connection? sd = " << sd
<< " errno " << errno << " " << cpp_strerror(errno) << dendl;
}
}
}
}
@ -371,7 +390,6 @@ void WorkerPool::barrier()
{
ldout(cct, 10) << __func__ << " started." << dendl;
pthread_t cur = pthread_self();
uint64_t send = 0;
for (vector<Worker*>::iterator it = workers.begin(); it != workers.end(); ++it) {
assert(cur != (*it)->center.get_owner());
(*it)->center.dispatch_event_external(EventCallbackRef(new C_barrier(this)));
@ -393,7 +411,7 @@ void WorkerPool::barrier()
AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
string mname, uint64_t _nonce)
: SimplePolicyMessenger(cct, name,mname, _nonce),
processor(this, _nonce),
processor(this, cct, _nonce),
lock("AsyncMessenger::lock"),
nonce(_nonce), need_addr(true), did_bind(false),
global_seq(0), deleted_lock("AsyncMessenger::deleted_lock"),
@ -663,8 +681,8 @@ void AsyncMessenger::mark_down_all()
for (set<AsyncConnectionRef>::iterator q = accepting_conns.begin();
q != accepting_conns.end(); ++q) {
AsyncConnectionRef p = *q;
ldout(cct, 5) << __func__ << " accepting_conn " << p << dendl;
p->mark_down();
ldout(cct, 5) << __func__ << " accepting_conn " << p.get() << dendl;
p->stop();
}
accepting_conns.clear();
@ -673,15 +691,18 @@ void AsyncMessenger::mark_down_all()
AsyncConnectionRef p = it->second;
ldout(cct, 5) << __func__ << " mark down " << it->first << " " << p << dendl;
conns.erase(it);
p->mark_down();
p->stop();
}
while (!deleted_conns.empty()) {
set<AsyncConnectionRef>::iterator it = deleted_conns.begin();
AsyncConnectionRef p = *it;
ldout(cct, 5) << __func__ << " delete " << p << dendl;
p->put();
deleted_conns.erase(it);
{
Mutex::Locker l(deleted_lock);
while (!deleted_conns.empty()) {
set<AsyncConnectionRef>::iterator it = deleted_conns.begin();
AsyncConnectionRef p = *it;
ldout(cct, 5) << __func__ << " delete " << p << dendl;
p->put();
deleted_conns.erase(it);
}
}
lock.Unlock();
}
@ -692,7 +713,7 @@ void AsyncMessenger::mark_down(const entity_addr_t& addr)
AsyncConnectionRef p = _lookup_conn(addr);
if (p) {
ldout(cct, 1) << __func__ << " " << addr << " -- " << p << dendl;
p->mark_down();
p->stop();
} else {
ldout(cct, 1) << __func__ << " " << addr << " -- connection dne" << dendl;
}

View File

@ -63,12 +63,13 @@ class Worker : public Thread {
*/
class Processor {
AsyncMessenger *msgr;
NetHandler net;
Worker *worker;
int listen_sd;
uint64_t nonce;
public:
Processor(AsyncMessenger *r, uint64_t n): msgr(r), worker(NULL), listen_sd(-1), nonce(n) {}
Processor(AsyncMessenger *r, CephContext *c, uint64_t n): msgr(r), net(c), worker(NULL), listen_sd(-1), nonce(n) {}
void stop();
int bind(const entity_addr_t &bind_addr, const set<int>& avoid_ports);

View File

@ -149,7 +149,6 @@ class FakeDispatcher : public Dispatcher {
cerr << __func__ << con << std::endl;
Session *s = static_cast<Session*>(con->get_priv());
if (s) {
Mutex::Locker l(s->lock);
s->con.reset(NULL); // break con <-> session ref cycle
con->set_priv(NULL); // break ref <-> session cycle, if any
s->put();