From 09bf5e80d5b357dc11b37a498ecd31dce97e10e9 Mon Sep 17 00:00:00 2001 From: Greg Farnum Date: Thu, 10 Apr 2014 16:32:56 -0700 Subject: [PATCH] msgr: change the delay queue flushing semantics Since we're doing fast_dispatch out of the delay queue, we don't want to flush while holding the pipe lock. Instead, make flush set it up for instant delivery, and steal the delay queue when replacing pipes. If we're shutting down a pipe, wait until flushing has completed before doing so. Signed-off-by: Greg Farnum --- src/msg/Pipe.cc | 37 ++++++++++++++++++++----------------- src/msg/Pipe.h | 18 +++++++++++++++++- 2 files changed, 37 insertions(+), 18 deletions(-) diff --git a/src/msg/Pipe.cc b/src/msg/Pipe.cc index fd2fbcef70e..eb748ea3c7f 100644 --- a/src/msg/Pipe.cc +++ b/src/msg/Pipe.cc @@ -191,17 +191,8 @@ void Pipe::DelayedDelivery::flush() { lgeneric_subdout(pipe->msgr->cct, ms, 20) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::flush" << dendl; Mutex::Locker l(delay_lock); - while (!delay_queue.empty()) { - Message *m = delay_queue.front().second; - delay_queue.pop_front(); - if (pipe->in_q->can_fast_dispatch(m)) { - delay_lock.Unlock(); - pipe->in_q->fast_dispatch(m); - delay_lock.Lock(); - } else { - pipe->in_q->enqueue(m, m->get_priority(), pipe->conn_id); - } - } + flush_count = delay_queue.size(); + delay_cond.Signal(); } void *Pipe::DelayedDelivery::entry() @@ -218,14 +209,19 @@ void *Pipe::DelayedDelivery::entry() utime_t release = delay_queue.front().first; Message *m = delay_queue.front().second; string delay_msg_type = pipe->msgr->cct->_conf->ms_inject_delay_msg_type; - if (release > ceph_clock_now(pipe->msgr->cct) && - (delay_msg_type.empty() || m->get_type_name() == delay_msg_type)) { + if (!flush_count && + (release > ceph_clock_now(pipe->msgr->cct) && + (delay_msg_type.empty() || m->get_type_name() == delay_msg_type))) { lgeneric_subdout(pipe->msgr->cct, ms, 10) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::entry sleeping on delay_cond until " << release << dendl; delay_cond.WaitUntil(delay_lock, release); continue; } lgeneric_subdout(pipe->msgr->cct, ms, 10) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::entry dequeuing message " << m << " for delivery, past " << release << dendl; delay_queue.pop_front(); + if (flush_count > 0) { + --flush_count; + active_flush = true; + } if (pipe->in_q->can_fast_dispatch(m)) { delay_lock.Unlock(); pipe->in_q->fast_dispatch(m); @@ -233,6 +229,7 @@ void *Pipe::DelayedDelivery::entry() } else { pipe->in_q->enqueue(m, m->get_priority(), pipe->conn_id); } + active_flush = false; } lgeneric_subdout(pipe->msgr->cct, ms, 20) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::entry stop" << dendl; return NULL; @@ -611,7 +608,7 @@ int Pipe::accept() if (existing->connection_state->clear_pipe(existing)) msgr->dispatch_queue.queue_reset(existing->connection_state.get()); } else { - // queue a reset on the old connection + // queue a reset on the new connection, which we're dumping for the old msgr->dispatch_queue.queue_reset(connection_state.get()); // drop my Connection, and take a ref to the existing one. do not @@ -622,9 +619,12 @@ int Pipe::accept() // make existing Connection reference us connection_state->reset_pipe(this); - // flush/queue any existing delayed messages - if (existing->delay_thread) - existing->delay_thread->flush(); + if (existing->delay_thread) { + existing->delay_thread->steal_for_pipe(this); + delay_thread = existing->delay_thread; + existing->delay_thread = NULL; + delay_thread->flush(); + } // steal incoming queue uint64_t replaced_conn_id = conn_id; @@ -1754,6 +1754,9 @@ void Pipe::unlock_maybe_reap() if (!reader_running && !writer_running) { shutdown_socket(); pipe_lock.Unlock(); + if (delay_thread && delay_thread->is_flushing()) { + delay_thread->wait_for_flush(); + } msgr->queue_reap(this); } else { pipe_lock.Unlock(); diff --git a/src/msg/Pipe.h b/src/msg/Pipe.h index 468a6a52f2c..0bd7febae3b 100644 --- a/src/msg/Pipe.h +++ b/src/msg/Pipe.h @@ -75,12 +75,15 @@ class DispatchQueue; std::deque< pair > delay_queue; Mutex delay_lock; Cond delay_cond; + int flush_count; + bool active_flush; bool stop_delayed_delivery; public: DelayedDelivery(Pipe *p) : pipe(p), - delay_lock("Pipe::DelayedDelivery::delay_lock"), + delay_lock("Pipe::DelayedDelivery::delay_lock"), flush_count(0), + active_flush(false), stop_delayed_delivery(false) { } ~DelayedDelivery() { discard(); @@ -93,12 +96,25 @@ class DispatchQueue; } void discard(); void flush(); + bool is_flushing() { + Mutex::Locker l(delay_lock); + return flush_count > 0 || active_flush; + } + void wait_for_flush() { + Mutex::Locker l(delay_lock); + while (flush_count > 0 || active_flush) + delay_cond.Wait(delay_lock); + } void stop() { delay_lock.Lock(); stop_delayed_delivery = true; delay_cond.Signal(); delay_lock.Unlock(); } + void steal_for_pipe(Pipe *new_owner) { + Mutex::Locker l(delay_lock); + pipe = new_owner; + } } *delay_thread; friend class DelayedDelivery;