mirror of
https://github.com/ceph/ceph
synced 2025-04-01 23:02:17 +00:00
msg/Pipe: flush delayed messages when stealing/failing pipes
If we are failing a pipe, flush the incoming messages before we try to reconnect. Similarly, flush queued messages on an existing pipe beore we replace it. This ensures that when we get a socket failure and reconnect the delayed messages are handled in the normal fashion. Specifically, it fixes a situation like: - read msg, update in_seq etc. - delay msg - pipe faults - peer reconnects, we replace existing pipe, discard delayed msgs - peer resends msgs - we discard, because they are < in_seq Signed-off-by: Sage Weil <sage@inktank.com>
This commit is contained in:
parent
6d65fa4ef9
commit
8dcc6c399c
@ -160,6 +160,7 @@ void Pipe::join_reader()
|
||||
|
||||
void Pipe::DelayedDelivery::discard()
|
||||
{
|
||||
lgeneric_subdout(pipe->msgr->cct, ms, 20) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::discard" << dendl;
|
||||
Mutex::Locker l(delay_lock);
|
||||
while (!delay_queue.empty()) {
|
||||
Message *m = delay_queue.front().second;
|
||||
@ -169,6 +170,17 @@ void Pipe::DelayedDelivery::discard()
|
||||
}
|
||||
}
|
||||
|
||||
void Pipe::DelayedDelivery::flush()
|
||||
{
|
||||
lgeneric_subdout(pipe->msgr->cct, ms, 20) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::flush" << dendl;
|
||||
Mutex::Locker l(delay_lock);
|
||||
while (!delay_queue.empty()) {
|
||||
Message *m = delay_queue.front().second;
|
||||
delay_queue.pop_front();
|
||||
pipe->in_q->enqueue(m, m->get_priority(), pipe->conn_id);
|
||||
}
|
||||
}
|
||||
|
||||
void *Pipe::DelayedDelivery::entry()
|
||||
{
|
||||
Mutex::Locker locker(delay_lock);
|
||||
@ -543,7 +555,11 @@ int Pipe::accept()
|
||||
|
||||
// make existing Connection reference us
|
||||
existing->connection_state->reset_pipe(this);
|
||||
|
||||
|
||||
// flush/queue any existing delayed messages
|
||||
if (existing->delay_thread)
|
||||
existing->delay_thread->flush();
|
||||
|
||||
// steal incoming queue
|
||||
uint64_t replaced_conn_id = conn_id;
|
||||
conn_id = existing->conn_id;
|
||||
@ -1113,6 +1129,10 @@ void Pipe::fault(bool onread)
|
||||
return;
|
||||
}
|
||||
|
||||
// queue delayed items immediately
|
||||
if (delay_thread)
|
||||
delay_thread->flush();
|
||||
|
||||
// requeue sent items
|
||||
requeue_sent();
|
||||
|
||||
@ -1120,7 +1140,7 @@ void Pipe::fault(bool onread)
|
||||
ldout(msgr->cct,0) << "fault with nothing to send, going to standby" << dendl;
|
||||
state = STATE_STANDBY;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (state != STATE_CONNECTING) {
|
||||
if (policy.server) {
|
||||
|
@ -90,6 +90,7 @@ class DispatchQueue;
|
||||
delay_cond.Signal();
|
||||
}
|
||||
void discard();
|
||||
void flush();
|
||||
void stop() {
|
||||
delay_lock.Lock();
|
||||
stop_delayed_delivery = true;
|
||||
|
Loading…
Reference in New Issue
Block a user