AsyncConnection: Enhance replace process

Make handle_connect_msg follow the locking rule: release any held lock before
acquiring the messenger's lock. Otherwise, a deadlock can occur.

Strengthen the state checks performed after re-locking, because the connection's
state may change between releasing its own lock and acquiring it again.

Signed-off-by: Haomai Wang <haomaiwang@gmail.com>
This commit is contained in:
Haomai Wang 2015-01-13 11:54:54 +08:00
parent a1753902dc
commit 50771dd7e6
3 changed files with 66 additions and 27 deletions

View File

@ -1469,7 +1469,7 @@ int AsyncConnection::handle_connect_reply(ceph_msg_connect &connect, ceph_msg_co
int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &authorizer_bl, int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &authorizer_bl,
bufferlist &authorizer_reply) bufferlist &authorizer_reply)
{ {
int r; int r = 0;
ceph_msg_connect_reply reply; ceph_msg_connect_reply reply;
bufferlist reply_bl; bufferlist reply_bl;
uint64_t existing_seq = -1; uint64_t existing_seq = -1;
@ -1521,7 +1521,16 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
ldout(async_msgr->cct, 10) << __func__ << " accept: setting up session_security." << dendl; ldout(async_msgr->cct, 10) << __func__ << " accept: setting up session_security." << dendl;
// existing? // existing?
lock.Unlock();
AsyncConnectionRef existing = async_msgr->lookup_conn(peer_addr); AsyncConnectionRef existing = async_msgr->lookup_conn(peer_addr);
lock.Lock();
if (state != STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
ldout(async_msgr->cct, 1) << __func__ << " state changed while accept, it must be mark_down, state="
<< get_state_name(state) << dendl;
assert(state == STATE_CLOSED);
goto fail;
}
if (existing) { if (existing) {
if (connect.global_seq < existing->peer_global_seq) { if (connect.global_seq < existing->peer_global_seq) {
ldout(async_msgr->cct, 10) << __func__ << " accept existing " << existing ldout(async_msgr->cct, 10) << __func__ << " accept existing " << existing
@ -1633,18 +1642,21 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
} }
ldout(async_msgr->cct, 10) << __func__ << " accept replacing " << existing << dendl; ldout(async_msgr->cct, 10) << __func__ << " accept replacing " << existing << dendl;
// In order to avoid dead lock, here need to lock in ordering. // There is no possible that existing connection will acquire this lock
// It may be another thread access this connection between unlock and lock
// call, this is rely to EventCenter to guarantee only one thread can access
// one connection.
lock.Unlock();
if (existing->sd > sd) {
existing->lock.Lock();
lock.Lock();
} else {
lock.Lock();
existing->lock.Lock(); existing->lock.Lock();
if (existing->replacing || existing->state == STATE_CLOSED) {
ldout(async_msgr->cct, 1) << __func__ << " existing racing replace or mark_down happened while replacing."
<< " state=" << get_state_name(existing->state) << dendl;
reply.connect_seq = existing->connect_seq + 1;
r = _reply_accept(CEPH_MSGR_TAG_RETRY_SESSION, connect, reply, authorizer_reply);
existing->lock.Unlock();
if (r < 0) {
goto fail;
} }
return r;
}
// Here we use "_stop" instead of "mark_down" because "mark_down" is a async // Here we use "_stop" instead of "mark_down" because "mark_down" is a async
// operation, but now we need ensure all variables in `existing` is cleaned up // operation, but now we need ensure all variables in `existing` is cleaned up
// and we will reuse it next. // and we will reuse it next.
@ -1665,14 +1677,19 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
existing->outcoming_bl.clear(); existing->outcoming_bl.clear();
existing->requeue_sent(); existing->requeue_sent();
reply.connect_seq = connect.connect_seq + 1; reply.connect_seq = connect.connect_seq + 1;
if (_reply_accept(CEPH_MSGR_TAG_RETRY_SESSION, connect, reply, authorizer_reply) < 0) if (_reply_accept(CEPH_MSGR_TAG_RETRY_SESSION, connect, reply, authorizer_reply) < 0) {
existing->lock.Unlock();
goto fail; goto fail;
}
// Now existing connection will be alive and the current connection will // Now existing connection will be alive and the current connection will
// exchange socket with existing connection because we want to maintain // exchange socket with existing connection because we want to maintain
// original "connection_state" // original "connection_state"
center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE); center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
center->create_file_event(sd, EVENT_READABLE, existing->read_handler); existing->center->create_file_event(sd, EVENT_READABLE, existing->read_handler);
existing->center->delete_file_event(existing->sd, EVENT_READABLE|EVENT_WRITABLE);
center->create_file_event(existing->sd, EVENT_READABLE, read_handler);
swap(existing->sd, sd); swap(existing->sd, sd);
existing->state = STATE_ACCEPTING_WAIT_CONNECT_MSG; existing->state = STATE_ACCEPTING_WAIT_CONNECT_MSG;
@ -1709,13 +1726,6 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
get_auth_session_handler(async_msgr->cct, connect.authorizer_protocol, get_auth_session_handler(async_msgr->cct, connect.authorizer_protocol,
session_key, get_features())); session_key, get_features()));
// notify
center->dispatch_event_external(accept_handler);
async_msgr->ms_deliver_handle_fast_accept(this);
// ok!
async_msgr->accept_conn(this);
reply_bl.append((char*)&reply, sizeof(reply)); reply_bl.append((char*)&reply, sizeof(reply));
if (reply.authorizer_len) if (reply.authorizer_len)
@ -1731,10 +1741,29 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
discard_requeued_up_to(0); discard_requeued_up_to(0);
} }
r = _try_send(reply_bl); // if replacing, this con is alreadly accepted.
lock.Unlock();
r = async_msgr->accept_conn(this);
lock.Lock();
if (r < 0) { if (r < 0) {
ldout(async_msgr->cct, 1) << __func__ << " existing race replacing process for addr=" << peer_addr
<< " just fail later one(this)" << dendl;
goto fail; goto fail;
} }
if (state != STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
ldout(async_msgr->cct, 1) << __func__ << " state changed while accept_conn, it must be mark_down, state="
<< get_state_name(state) << dendl;
assert(state == STATE_CLOSED);
goto fail;
}
// notify
center->dispatch_event_external(accept_handler);
async_msgr->ms_deliver_handle_fast_accept(this);
r = _try_send(reply_bl);
if (r < 0)
goto fail;
if (r == 0) { if (r == 0) {
state = next_state; state = next_state;

View File

@ -157,7 +157,6 @@ class AsyncConnection : public Connection {
STATE_CONNECTING_WAIT_ACK_SEQ, STATE_CONNECTING_WAIT_ACK_SEQ,
STATE_CONNECTING_READY, STATE_CONNECTING_READY,
STATE_ACCEPTING, STATE_ACCEPTING,
STATE_ACCEPTING_HANDLE_CONNECT,
STATE_ACCEPTING_WAIT_BANNER_ADDR, STATE_ACCEPTING_WAIT_BANNER_ADDR,
STATE_ACCEPTING_WAIT_CONNECT_MSG, STATE_ACCEPTING_WAIT_CONNECT_MSG,
STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH, STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH,
@ -194,7 +193,6 @@ class AsyncConnection : public Connection {
"STATE_CONNECTING_WAIT_ACK_SEQ", "STATE_CONNECTING_WAIT_ACK_SEQ",
"STATE_CONNECTING_READY", "STATE_CONNECTING_READY",
"STATE_ACCEPTING", "STATE_ACCEPTING",
"STATE_ACCEPTING_HANDLE_CONNECT",
"STATE_ACCEPTING_WAIT_BANNER_ADDR", "STATE_ACCEPTING_WAIT_BANNER_ADDR",
"STATE_ACCEPTING_WAIT_CONNECT_MSG", "STATE_ACCEPTING_WAIT_CONNECT_MSG",
"STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH", "STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH",
@ -202,8 +200,7 @@ class AsyncConnection : public Connection {
"STATE_ACCEPTING_READY", "STATE_ACCEPTING_READY",
"STATE_STANDBY", "STATE_STANDBY",
"STATE_CLOSED", "STATE_CLOSED",
"STATE_WAIT", "STATE_WAIT"};
"STATE_FAULT"};
return statenames[state]; return statenames[state];
} }

View File

@ -383,10 +383,23 @@ public:
return _lookup_conn(k); return _lookup_conn(k);
} }
void accept_conn(AsyncConnectionRef conn) { int accept_conn(AsyncConnectionRef conn) {
Mutex::Locker l(lock); Mutex::Locker l(lock);
if (conns.count(conn->peer_addr)) {
AsyncConnectionRef existing = conns[conn->peer_addr];
// lazy delete, see "deleted_conns"
Mutex::Locker l(deleted_lock);
if (deleted_conns.count(existing)) {
deleted_conns.erase(existing);
existing->put();
} else if (conn != existing) {
return -1;
}
}
conns[conn->peer_addr] = conn; conns[conn->peer_addr] = conn;
accepting_conns.erase(conn); accepting_conns.erase(conn);
return 0;
} }
void learned_addr(const entity_addr_t &peer_addr_for_me); void learned_addr(const entity_addr_t &peer_addr_for_me);