From 50771dd7e6a1830cd12c0c45db5b28c6b9216436 Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Tue, 13 Jan 2015 11:54:54 +0800 Subject: [PATCH] AsyncConnection: Enhance replace process Make handle_connect_msg follow the locking rule: release any held lock before acquiring the messenger's lock; otherwise, a deadlock can happen. Strengthen the condition checks after re-locking, because the connection's state may change between unlocking itself and locking again. Signed-off-by: Haomai Wang --- src/msg/async/AsyncConnection.cc | 73 ++++++++++++++++++++++---------- src/msg/async/AsyncConnection.h | 5 +-- src/msg/async/AsyncMessenger.h | 15 ++++++- 3 files changed, 66 insertions(+), 27 deletions(-) diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index 5673c179e16..b588c4fe6e8 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -1469,7 +1469,7 @@ int AsyncConnection::handle_connect_reply(ceph_msg_connect &connect, ceph_msg_co int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &authorizer_bl, bufferlist &authorizer_reply) { - int r; + int r = 0; ceph_msg_connect_reply reply; bufferlist reply_bl; uint64_t existing_seq = -1; @@ -1521,7 +1521,16 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a ldout(async_msgr->cct, 10) << __func__ << " accept: setting up session_security." << dendl; // existing? 
+ lock.Unlock(); AsyncConnectionRef existing = async_msgr->lookup_conn(peer_addr); + lock.Lock(); + if (state != STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH) { + ldout(async_msgr->cct, 1) << __func__ << " state changed while accept, it must be mark_down, state=" + << get_state_name(state) << dendl; + assert(state == STATE_CLOSED); + goto fail; + } + if (existing) { if (connect.global_seq < existing->peer_global_seq) { ldout(async_msgr->cct, 10) << __func__ << " accept existing " << existing @@ -1633,18 +1642,21 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a } ldout(async_msgr->cct, 10) << __func__ << " accept replacing " << existing << dendl; - // In order to avoid dead lock, here need to lock in ordering. - // It may be another thread access this connection between unlock and lock - // call, this is rely to EventCenter to guarantee only one thread can access - // one connection. - lock.Unlock(); - if (existing->sd > sd) { - existing->lock.Lock(); - lock.Lock(); - } else { - lock.Lock(); - existing->lock.Lock(); + // There is no possible that existing connection will acquire this lock + existing->lock.Lock(); + + if (existing->replacing || existing->state == STATE_CLOSED) { + ldout(async_msgr->cct, 1) << __func__ << " existing racing replace or mark_down happened while replacing." + << " state=" << get_state_name(existing->state) << dendl; + reply.connect_seq = existing->connect_seq + 1; + r = _reply_accept(CEPH_MSGR_TAG_RETRY_SESSION, connect, reply, authorizer_reply); + existing->lock.Unlock(); + if (r < 0) { + goto fail; + } + return r; } + // Here we use "_stop" instead of "mark_down" because "mark_down" is a async // operation, but now we need ensure all variables in `existing` is cleaned up // and we will reuse it next. 
@@ -1665,14 +1677,19 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a existing->outcoming_bl.clear(); existing->requeue_sent(); reply.connect_seq = connect.connect_seq + 1; - if (_reply_accept(CEPH_MSGR_TAG_RETRY_SESSION, connect, reply, authorizer_reply) < 0) + if (_reply_accept(CEPH_MSGR_TAG_RETRY_SESSION, connect, reply, authorizer_reply) < 0) { + existing->lock.Unlock(); goto fail; + } // Now existing connection will be alive and the current connection will // exchange socket with existing connection because we want to maintain // original "connection_state" center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE); - center->create_file_event(sd, EVENT_READABLE, existing->read_handler); + existing->center->create_file_event(sd, EVENT_READABLE, existing->read_handler); + + existing->center->delete_file_event(existing->sd, EVENT_READABLE|EVENT_WRITABLE); + center->create_file_event(existing->sd, EVENT_READABLE, read_handler); swap(existing->sd, sd); existing->state = STATE_ACCEPTING_WAIT_CONNECT_MSG; @@ -1709,13 +1726,6 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a get_auth_session_handler(async_msgr->cct, connect.authorizer_protocol, session_key, get_features())); - // notify - center->dispatch_event_external(accept_handler); - async_msgr->ms_deliver_handle_fast_accept(this); - - // ok! - async_msgr->accept_conn(this); - reply_bl.append((char*)&reply, sizeof(reply)); if (reply.authorizer_len) @@ -1731,10 +1741,29 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a discard_requeued_up_to(0); } - r = _try_send(reply_bl); + // if replacing, this con is alreadly accepted. 
+ lock.Unlock(); + r = async_msgr->accept_conn(this); + lock.Lock(); if (r < 0) { + ldout(async_msgr->cct, 1) << __func__ << " existing race replacing process for addr=" << peer_addr + << " just fail later one(this)" << dendl; goto fail; } + if (state != STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH) { + ldout(async_msgr->cct, 1) << __func__ << " state changed while accept_conn, it must be mark_down, state=" + << get_state_name(state) << dendl; + assert(state == STATE_CLOSED); + goto fail; + } + + // notify + center->dispatch_event_external(accept_handler); + async_msgr->ms_deliver_handle_fast_accept(this); + + r = _try_send(reply_bl); + if (r < 0) + goto fail; if (r == 0) { state = next_state; diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index d6c1771495d..515b1d700d2 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -157,7 +157,6 @@ class AsyncConnection : public Connection { STATE_CONNECTING_WAIT_ACK_SEQ, STATE_CONNECTING_READY, STATE_ACCEPTING, - STATE_ACCEPTING_HANDLE_CONNECT, STATE_ACCEPTING_WAIT_BANNER_ADDR, STATE_ACCEPTING_WAIT_CONNECT_MSG, STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH, @@ -194,7 +193,6 @@ class AsyncConnection : public Connection { "STATE_CONNECTING_WAIT_ACK_SEQ", "STATE_CONNECTING_READY", "STATE_ACCEPTING", - "STATE_ACCEPTING_HANDLE_CONNECT", "STATE_ACCEPTING_WAIT_BANNER_ADDR", "STATE_ACCEPTING_WAIT_CONNECT_MSG", "STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH", @@ -202,8 +200,7 @@ class AsyncConnection : public Connection { "STATE_ACCEPTING_READY", "STATE_STANDBY", "STATE_CLOSED", - "STATE_WAIT", - "STATE_FAULT"}; + "STATE_WAIT"}; return statenames[state]; } diff --git a/src/msg/async/AsyncMessenger.h b/src/msg/async/AsyncMessenger.h index 972934ad8ef..0a6e9089be2 100644 --- a/src/msg/async/AsyncMessenger.h +++ b/src/msg/async/AsyncMessenger.h @@ -383,10 +383,23 @@ public: return _lookup_conn(k); } - void accept_conn(AsyncConnectionRef conn) { + int accept_conn(AsyncConnectionRef conn) { 
Mutex::Locker l(lock); + if (conns.count(conn->peer_addr)) { + AsyncConnectionRef existing = conns[conn->peer_addr]; + + // lazy delete, see "deleted_conns" + Mutex::Locker l(deleted_lock); + if (deleted_conns.count(existing)) { + deleted_conns.erase(existing); + existing->put(); + } else if (conn != existing) { + return -1; + } + } conns[conn->peer_addr] = conn; accepting_conns.erase(conn); + return 0; } void learned_addr(const entity_addr_t &peer_addr_for_me);