mirror of
https://github.com/ceph/ceph
synced 2025-02-21 01:47:25 +00:00
msg/Pipe: hold pipe_lock during important parts of accept()
Previously we did not bother with locking for accept() because we were not visible to any other threads. However, we need to close accepting Pipes from mark_down_all(), which means we need to handle interference. Fix up the locking so that we hold pipe_lock when looking at Pipe state and verify that we are still in the ACCEPTING state any time we retake the lock. Signed-off-by: Sage Weil <sage@inktank.com>
This commit is contained in:
parent
687fe888b3
commit
ecab4bb951
@ -210,12 +210,11 @@ void *Pipe::DelayedDelivery::entry()
|
||||
int Pipe::accept()
|
||||
{
|
||||
ldout(msgr->cct,10) << "accept" << dendl;
|
||||
|
||||
set_socket_options();
|
||||
|
||||
// my creater gave me sd via accept()
|
||||
assert(pipe_lock.is_locked());
|
||||
assert(state == STATE_ACCEPTING);
|
||||
|
||||
|
||||
pipe_lock.Unlock();
|
||||
|
||||
// vars
|
||||
bufferlist addrs;
|
||||
entity_addr_t socket_addr;
|
||||
@ -241,6 +240,8 @@ int Pipe::accept()
|
||||
// used for reading in the remote acked seq on connect
|
||||
uint64_t newly_acked_seq = 0;
|
||||
|
||||
set_socket_options();
|
||||
|
||||
// announce myself.
|
||||
r = tcp_write(CEPH_BANNER, strlen(CEPH_BANNER));
|
||||
if (r < 0) {
|
||||
@ -311,7 +312,6 @@ int Pipe::accept()
|
||||
goto fail_unlocked;
|
||||
}
|
||||
|
||||
|
||||
authorizer.clear();
|
||||
if (connect.authorizer_len) {
|
||||
bp = buffer::create(connect.authorizer_len);
|
||||
@ -328,8 +328,12 @@ int Pipe::accept()
|
||||
<< dendl;
|
||||
|
||||
msgr->lock.Lock(); // FIXME
|
||||
pipe_lock.Lock();
|
||||
if (msgr->dispatch_queue.stop)
|
||||
goto shutting_down;
|
||||
if (state != STATE_ACCEPTING) {
|
||||
goto shutting_down;
|
||||
}
|
||||
|
||||
// note peer's type, flags
|
||||
set_peer_type(connect.host_type);
|
||||
@ -391,15 +395,18 @@ int Pipe::accept()
|
||||
|
||||
ldout(msgr->cct,10) << "accept: setting up session_security." << dendl;
|
||||
|
||||
pipe_lock.Unlock();
|
||||
msgr->lock.Lock();
|
||||
pipe_lock.Lock();
|
||||
if (msgr->dispatch_queue.stop)
|
||||
goto shutting_down;
|
||||
|
||||
if (state != STATE_ACCEPTING)
|
||||
goto shutting_down;
|
||||
|
||||
// existing?
|
||||
existing = msgr->_lookup_pipe(peer_addr);
|
||||
if (existing) {
|
||||
existing->pipe_lock.Lock();
|
||||
existing->pipe_lock.Lock(true); // skip lockdep check (we are locking a second Pipe here)
|
||||
|
||||
if (connect.global_seq < existing->peer_global_seq) {
|
||||
ldout(msgr->cct,10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq
|
||||
@ -523,6 +530,8 @@ int Pipe::accept()
|
||||
assert(0);
|
||||
|
||||
retry_session:
|
||||
assert(existing->pipe_lock.is_locked());
|
||||
assert(pipe_lock.is_locked());
|
||||
reply.tag = CEPH_MSGR_TAG_RETRY_SESSION;
|
||||
reply.connect_seq = existing->connect_seq + 1;
|
||||
existing->pipe_lock.Unlock();
|
||||
@ -530,8 +539,10 @@ int Pipe::accept()
|
||||
goto reply;
|
||||
|
||||
reply:
|
||||
assert(pipe_lock.is_locked());
|
||||
reply.features = ((uint64_t)connect.features & policy.features_supported) | policy.features_required;
|
||||
reply.authorizer_len = authorizer_reply.length();
|
||||
pipe_lock.Unlock();
|
||||
r = tcp_write((char*)&reply, sizeof(reply));
|
||||
if (r < 0)
|
||||
goto fail_unlocked;
|
||||
@ -543,6 +554,8 @@ int Pipe::accept()
|
||||
}
|
||||
|
||||
replace:
|
||||
assert(existing->pipe_lock.is_locked());
|
||||
assert(pipe_lock.is_locked());
|
||||
if (connect.features & CEPH_FEATURE_RECONNECT_SEQ) {
|
||||
reply_tag = CEPH_MSGR_TAG_SEQ;
|
||||
existing_seq = existing->in_seq;
|
||||
@ -593,6 +606,7 @@ int Pipe::accept()
|
||||
|
||||
open:
|
||||
// open
|
||||
assert(pipe_lock.is_locked());
|
||||
connect_seq = connect.connect_seq + 1;
|
||||
peer_global_seq = connect.global_seq;
|
||||
assert(state == STATE_ACCEPTING);
|
||||
@ -626,6 +640,7 @@ int Pipe::accept()
|
||||
assert(removed == 1);
|
||||
register_pipe();
|
||||
msgr->lock.Unlock();
|
||||
pipe_lock.Unlock();
|
||||
|
||||
r = tcp_write((char*)&reply, sizeof(reply));
|
||||
if (r < 0) {
|
||||
@ -657,7 +672,6 @@ int Pipe::accept()
|
||||
start_writer();
|
||||
}
|
||||
ldout(msgr->cct,20) << "accept done" << dendl;
|
||||
pipe_lock.Unlock();
|
||||
|
||||
maybe_start_delay_thread();
|
||||
|
||||
@ -690,10 +704,10 @@ int Pipe::accept()
|
||||
if (queued || replaced)
|
||||
start_writer();
|
||||
}
|
||||
pipe_lock.Unlock();
|
||||
return -1;
|
||||
|
||||
shutting_down:
|
||||
assert(pipe_lock.is_locked());
|
||||
msgr->lock.Unlock();
|
||||
|
||||
if (msgr->cct->_conf->ms_inject_internal_delays) {
|
||||
@ -703,11 +717,9 @@ int Pipe::accept()
|
||||
t.sleep();
|
||||
}
|
||||
|
||||
pipe_lock.Lock();
|
||||
state = STATE_CLOSED;
|
||||
state_closed.set(1);
|
||||
fault();
|
||||
pipe_lock.Unlock();
|
||||
return -1;
|
||||
}
|
||||
|
||||
@ -1310,11 +1322,13 @@ void Pipe::stop()
|
||||
*/
|
||||
void Pipe::reader()
|
||||
{
|
||||
if (state == STATE_ACCEPTING)
|
||||
accept();
|
||||
|
||||
pipe_lock.Lock();
|
||||
|
||||
if (state == STATE_ACCEPTING) {
|
||||
accept();
|
||||
assert(pipe_lock.is_locked());
|
||||
}
|
||||
|
||||
// loop.
|
||||
while (state != STATE_CLOSED &&
|
||||
state != STATE_CONNECTING) {
|
||||
|
Loading…
Reference in New Issue
Block a user