msgr: add a delay_until queue that is used to delay deliveries.

Its life-cycle matches that of delay_queue, and the delayed_delivery
function respects it. For now queue_received is just setting it to
delay everything by 1 second.

Signed-off-by: Greg Farnum <greg@inktank.com>
This commit is contained in:
Greg Farnum 2012-11-27 10:05:47 -08:00 committed by Sage Weil
parent 01059e9b43
commit b97aaca387
2 changed files with 23 additions and 5 deletions

View File

@ -54,7 +54,7 @@ ostream& Pipe::_pipe_prefix(std::ostream *_dout) {
Pipe::Pipe(SimpleMessenger *r, int st, Connection *con)
: reader_thread(this), writer_thread(this),
dispatch_thread(NULL), delay_queue(NULL),
dispatch_thread(NULL), delay_queue(NULL), delay_until(NULL),
delay_lock(NULL), delay_cond(NULL), stop_delayed_delivery(true),
msgr(r),
conn_id(r->dispatch_queue.get_id()),
@ -100,6 +100,8 @@ Pipe::~Pipe()
delete dispatch_thread;
assert(delay_queue->empty());
delete delay_queue;
assert(delay_until->empty());
delete delay_until;
assert(!delay_lock->is_locked());
delete delay_lock;
delete delay_cond;
@ -141,6 +143,7 @@ void Pipe::start_reader()
lsubdout(msgr->cct, ms, 1) << "setting up a delay queue on Pipe " << this << dendl;
dispatch_thread = new DelayedDelivery(this);
delay_queue = new std::deque< Message * >();
delay_until = new std::deque< utime_t>();
delay_lock = new Mutex("delay_lock");
delay_cond = new Cond();
} else
@ -184,6 +187,9 @@ void Pipe::queue_received(Message *m, int priority)
lsubdout(msgr->cct, ms, 1) << "queuing message " << m << " for delayed delivery" << dendl;
Mutex::Locker locker(*delay_lock);
delay_queue->push_back(m);
utime_t delay = ceph_clock_now(msgr->cct);
delay += 1.0;
delay_until->push_back(delay);
delay_cond->Signal();
return;
}
@ -192,13 +198,23 @@ void Pipe::queue_received(Message *m, int priority)
void Pipe::delayed_delivery() {
Mutex::Locker locker(*delay_lock);
if (delay_queue->empty())
lsubdout(msgr->cct, ms, 1) << "sleeping on delay_cond because delay queue is empty" << dendl;
delay_cond->Wait(*delay_lock);
while (!stop_delayed_delivery) {
if (delay_queue->empty()) {
lsubdout(msgr->cct, ms, 1) << "sleeping on delay_cond because delay queue is empty" << dendl;
delay_cond->Wait(*delay_lock);
continue;
}
if (delay_until->front() > ceph_clock_now(msgr->cct)) {
lsubdout(msgr->cct, ms, 1) << "sleeping on delay_cond until message " << delay_queue->front()
<< " delay passes" << dendl;
delay_cond->WaitUntil(*delay_lock, delay_until->front());
continue;
}
Message *m = delay_queue->front();
lsubdout(msgr->cct, ms, 1) << "dequeuing message " << m << " for delayed delivery" << dendl;
lsubdout(msgr->cct, ms, 1) << "dequeuing message " << m << " for delivery because delay until "
<< delay_until->front() << " has passed" << dendl;
delay_queue->pop_front();
delay_until->pop_front();
in_q->enqueue(m, m->get_priority(), conn_id);
if (delay_queue->empty()) {
lsubdout(msgr->cct, ms, 1) << "sleeping on delay_cond" << dendl;
@ -1200,6 +1216,7 @@ void Pipe::stop()
while (!delay_queue->empty()) {
delay_queue->front()->put();
delay_queue->pop_front();
delay_until->pop_front();
}
delay_cond->Signal();
}

View File

@ -79,6 +79,7 @@ class DispatchQueue;
DelayedDelivery *dispatch_thread;
// TODO: clean up the delay_queue better on shutdown
std::deque< Message * > *delay_queue;
std::deque< utime_t > *delay_until;
Mutex *delay_lock;
Cond *delay_cond;
bool stop_delayed_delivery;