msgr: change the delay queue flushing semantics

Since we're doing fast_dispatch out of the delay queue, we don't want to
flush while holding the pipe lock. Instead, make flush set it up for instant
delivery, and steal the delay queue when replacing pipes. If we're shutting
down a pipe, wait until flushing has completed before doing so.

Signed-off-by: Greg Farnum <greg@inktank.com>
This commit is contained in:
Greg Farnum 2014-04-10 16:32:56 -07:00
parent 69fc6b2b66
commit 09bf5e80d5
2 changed files with 37 additions and 18 deletions

View File

@ -191,17 +191,8 @@ void Pipe::DelayedDelivery::flush()
{ {
lgeneric_subdout(pipe->msgr->cct, ms, 20) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::flush" << dendl; lgeneric_subdout(pipe->msgr->cct, ms, 20) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::flush" << dendl;
Mutex::Locker l(delay_lock); Mutex::Locker l(delay_lock);
while (!delay_queue.empty()) { flush_count = delay_queue.size();
Message *m = delay_queue.front().second; delay_cond.Signal();
delay_queue.pop_front();
if (pipe->in_q->can_fast_dispatch(m)) {
delay_lock.Unlock();
pipe->in_q->fast_dispatch(m);
delay_lock.Lock();
} else {
pipe->in_q->enqueue(m, m->get_priority(), pipe->conn_id);
}
}
} }
void *Pipe::DelayedDelivery::entry() void *Pipe::DelayedDelivery::entry()
@ -218,14 +209,19 @@ void *Pipe::DelayedDelivery::entry()
utime_t release = delay_queue.front().first; utime_t release = delay_queue.front().first;
Message *m = delay_queue.front().second; Message *m = delay_queue.front().second;
string delay_msg_type = pipe->msgr->cct->_conf->ms_inject_delay_msg_type; string delay_msg_type = pipe->msgr->cct->_conf->ms_inject_delay_msg_type;
if (release > ceph_clock_now(pipe->msgr->cct) && if (!flush_count &&
(delay_msg_type.empty() || m->get_type_name() == delay_msg_type)) { (release > ceph_clock_now(pipe->msgr->cct) &&
(delay_msg_type.empty() || m->get_type_name() == delay_msg_type))) {
lgeneric_subdout(pipe->msgr->cct, ms, 10) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::entry sleeping on delay_cond until " << release << dendl; lgeneric_subdout(pipe->msgr->cct, ms, 10) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::entry sleeping on delay_cond until " << release << dendl;
delay_cond.WaitUntil(delay_lock, release); delay_cond.WaitUntil(delay_lock, release);
continue; continue;
} }
lgeneric_subdout(pipe->msgr->cct, ms, 10) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::entry dequeuing message " << m << " for delivery, past " << release << dendl; lgeneric_subdout(pipe->msgr->cct, ms, 10) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::entry dequeuing message " << m << " for delivery, past " << release << dendl;
delay_queue.pop_front(); delay_queue.pop_front();
if (flush_count > 0) {
--flush_count;
active_flush = true;
}
if (pipe->in_q->can_fast_dispatch(m)) { if (pipe->in_q->can_fast_dispatch(m)) {
delay_lock.Unlock(); delay_lock.Unlock();
pipe->in_q->fast_dispatch(m); pipe->in_q->fast_dispatch(m);
@ -233,6 +229,7 @@ void *Pipe::DelayedDelivery::entry()
} else { } else {
pipe->in_q->enqueue(m, m->get_priority(), pipe->conn_id); pipe->in_q->enqueue(m, m->get_priority(), pipe->conn_id);
} }
active_flush = false;
} }
lgeneric_subdout(pipe->msgr->cct, ms, 20) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::entry stop" << dendl; lgeneric_subdout(pipe->msgr->cct, ms, 20) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::entry stop" << dendl;
return NULL; return NULL;
@ -611,7 +608,7 @@ int Pipe::accept()
if (existing->connection_state->clear_pipe(existing)) if (existing->connection_state->clear_pipe(existing))
msgr->dispatch_queue.queue_reset(existing->connection_state.get()); msgr->dispatch_queue.queue_reset(existing->connection_state.get());
} else { } else {
// queue a reset on the old connection // queue a reset on the new connection, which we're dumping for the old
msgr->dispatch_queue.queue_reset(connection_state.get()); msgr->dispatch_queue.queue_reset(connection_state.get());
// drop my Connection, and take a ref to the existing one. do not // drop my Connection, and take a ref to the existing one. do not
@ -622,9 +619,12 @@ int Pipe::accept()
// make existing Connection reference us // make existing Connection reference us
connection_state->reset_pipe(this); connection_state->reset_pipe(this);
// flush/queue any existing delayed messages if (existing->delay_thread) {
if (existing->delay_thread) existing->delay_thread->steal_for_pipe(this);
existing->delay_thread->flush(); delay_thread = existing->delay_thread;
existing->delay_thread = NULL;
delay_thread->flush();
}
// steal incoming queue // steal incoming queue
uint64_t replaced_conn_id = conn_id; uint64_t replaced_conn_id = conn_id;
@ -1754,6 +1754,9 @@ void Pipe::unlock_maybe_reap()
if (!reader_running && !writer_running) { if (!reader_running && !writer_running) {
shutdown_socket(); shutdown_socket();
pipe_lock.Unlock(); pipe_lock.Unlock();
if (delay_thread && delay_thread->is_flushing()) {
delay_thread->wait_for_flush();
}
msgr->queue_reap(this); msgr->queue_reap(this);
} else { } else {
pipe_lock.Unlock(); pipe_lock.Unlock();

View File

@ -75,12 +75,15 @@ class DispatchQueue;
std::deque< pair<utime_t,Message*> > delay_queue; std::deque< pair<utime_t,Message*> > delay_queue;
Mutex delay_lock; Mutex delay_lock;
Cond delay_cond; Cond delay_cond;
int flush_count;
bool active_flush;
bool stop_delayed_delivery; bool stop_delayed_delivery;
public: public:
DelayedDelivery(Pipe *p) DelayedDelivery(Pipe *p)
: pipe(p), : pipe(p),
delay_lock("Pipe::DelayedDelivery::delay_lock"), delay_lock("Pipe::DelayedDelivery::delay_lock"), flush_count(0),
active_flush(false),
stop_delayed_delivery(false) { } stop_delayed_delivery(false) { }
~DelayedDelivery() { ~DelayedDelivery() {
discard(); discard();
@ -93,12 +96,25 @@ class DispatchQueue;
} }
void discard(); void discard();
void flush(); void flush();
bool is_flushing() {
Mutex::Locker l(delay_lock);
return flush_count > 0 || active_flush;
}
void wait_for_flush() {
Mutex::Locker l(delay_lock);
while (flush_count > 0 || active_flush)
delay_cond.Wait(delay_lock);
}
void stop() { void stop() {
delay_lock.Lock(); delay_lock.Lock();
stop_delayed_delivery = true; stop_delayed_delivery = true;
delay_cond.Signal(); delay_cond.Signal();
delay_lock.Unlock(); delay_lock.Unlock();
} }
void steal_for_pipe(Pipe *new_owner) {
Mutex::Locker l(delay_lock);
pipe = new_owner;
}
} *delay_thread; } *delay_thread;
friend class DelayedDelivery; friend class DelayedDelivery;