AsyncConnection: Lock existing's lock in advance avoid existing's state changed

Signed-off-by: Haomai Wang <haomaiwang@gmail.com>
This commit is contained in:
Haomai Wang 2015-03-13 12:21:32 +08:00
parent 18c35878b3
commit f7a1fdb563

View File

@ -272,6 +272,7 @@ int AsyncConnection::do_sendmsg(struct msghdr &msg, int len, bool more)
// else return < 0 means error
int AsyncConnection::_try_send(bufferlist send_bl, bool send)
{
assert(lock.is_locked());
if (send_bl.length()) {
if (outcoming_bl.length())
outcoming_bl.claim_append(send_bl);
@ -1587,11 +1588,27 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
if (existing == this)
existing = NULL;
if (existing) {
// There is no possible that existing connection will acquire this
// connection's lock
existing->lock.Lock();
if (existing->replacing || existing->state == STATE_CLOSED) {
ldout(async_msgr->cct, 1) << __func__ << " existing racing replace or mark_down happened while replacing."
<< " state=" << get_state_name(existing->state) << dendl;
reply.global_seq = existing->peer_global_seq;
r = _reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply);
existing->lock.Unlock();
if (r < 0)
goto fail;
return 0;
}
if (connect.global_seq < existing->peer_global_seq) {
ldout(async_msgr->cct, 10) << __func__ << " accept existing " << existing
<< ".gseq " << existing->peer_global_seq << " > "
<< connect.global_seq << ", RETRY_GLOBAL" << dendl;
reply.global_seq = existing->peer_global_seq; // so we can send it below..
existing->lock.Unlock();
return _reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply);
} else {
ldout(async_msgr->cct, 10) << __func__ << " accept existing " << existing
@ -1625,6 +1642,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
<< existing->connect_seq << " > " << connect.connect_seq
<< ", RETRY_SESSION" << dendl;
reply.connect_seq = existing->connect_seq + 1;
existing->lock.Unlock();
return _reply_accept(CEPH_MSGR_TAG_RETRY_SESSION, connect, reply, authorizer_reply);
}
@ -1639,6 +1657,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
<< ".cseq " << existing->connect_seq << " == "
<< connect.connect_seq << ", OPEN|STANDBY, RETRY_SESSION" << dendl;
reply.connect_seq = existing->connect_seq + 1;
existing->lock.Unlock();
return _reply_accept(CEPH_MSGR_TAG_RETRY_SESSION, connect, reply, authorizer_reply);
}
@ -1657,6 +1676,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
assert(peer_addr > async_msgr->get_myaddr());
// make sure our outgoing connection will follow through
existing->send_keepalive();
existing->lock.Unlock();
return _reply_accept(CEPH_MSGR_TAG_WAIT, connect, reply, authorizer_reply);
}
}
@ -1668,6 +1688,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
ldout(async_msgr->cct, 0) << __func__ << " accept we reset (peer sent cseq "
<< connect.connect_seq << ", " << existing << ".cseq = "
<< existing->connect_seq << "), sending RESETSESSION" << dendl;
existing->lock.Unlock();
return _reply_accept(CEPH_MSGR_TAG_RESETSESSION, connect, reply, authorizer_reply);
}
@ -1700,20 +1721,6 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
t.sleep();
}
// There is no possible that existing connection will acquire this lock
existing->lock.Lock();
if (existing->replacing || existing->state == STATE_CLOSED) {
ldout(async_msgr->cct, 1) << __func__ << " existing racing replace or mark_down happened while replacing."
<< " state=" << get_state_name(existing->state) << dendl;
reply.global_seq = existing->peer_global_seq;
r = _reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply);
existing->lock.Unlock();
if (r < 0)
goto fail;
return 0;
}
if (existing->policy.lossy) {
// disconnect from the Connection
existing->center->dispatch_event_external(existing->reset_handler);