mirror of
https://github.com/ceph/ceph
synced 2025-04-11 04:02:04 +00:00
msg/DispatchQueue: do not discard queued events on stop
When the shutdown/stop flag is set, continue to work through the queue. Process events, but discard messages. This avoids the loss of reset events on shutdown that are necessary to clean up ref cycles. Signed-off-by: Sage Weil <sage@inktank.com>
This commit is contained in:
parent
de64bc50f2
commit
ea6880f8a2
@ -78,8 +78,8 @@ void DispatchQueue::local_delivery(Message *m, int priority)
|
||||
void DispatchQueue::entry()
|
||||
{
|
||||
lock.Lock();
|
||||
while (!stop) {
|
||||
while (!mqueue.empty() && !stop) {
|
||||
while (true) {
|
||||
while (!mqueue.empty()) {
|
||||
QueueItem qitem = mqueue.dequeue();
|
||||
if (!qitem.is_code())
|
||||
remove_arrival(qitem.get_message());
|
||||
@ -104,29 +104,37 @@ void DispatchQueue::entry()
|
||||
}
|
||||
} else {
|
||||
Message *m = qitem.get_message();
|
||||
uint64_t msize = m->get_dispatch_throttle_size();
|
||||
m->set_dispatch_throttle_size(0); // clear it out, in case we requeue this message.
|
||||
if (stop) {
|
||||
ldout(cct,10) << " stop flag set, discarding " << m << " " << *m << dendl;
|
||||
m->put();
|
||||
} else {
|
||||
uint64_t msize = m->get_dispatch_throttle_size();
|
||||
m->set_dispatch_throttle_size(0); // clear it out, in case we requeue this message.
|
||||
|
||||
ldout(cct,1) << "<== " << m->get_source_inst()
|
||||
<< " " << m->get_seq()
|
||||
<< " ==== " << *m
|
||||
<< " ==== " << m->get_payload().length() << "+" << m->get_middle().length()
|
||||
<< "+" << m->get_data().length()
|
||||
<< " (" << m->get_footer().front_crc << " " << m->get_footer().middle_crc
|
||||
<< " " << m->get_footer().data_crc << ")"
|
||||
<< " " << m << " con " << m->get_connection()
|
||||
<< dendl;
|
||||
msgr->ms_deliver_dispatch(m);
|
||||
ldout(cct,1) << "<== " << m->get_source_inst()
|
||||
<< " " << m->get_seq()
|
||||
<< " ==== " << *m
|
||||
<< " ==== " << m->get_payload().length() << "+" << m->get_middle().length()
|
||||
<< "+" << m->get_data().length()
|
||||
<< " (" << m->get_footer().front_crc << " " << m->get_footer().middle_crc
|
||||
<< " " << m->get_footer().data_crc << ")"
|
||||
<< " " << m << " con " << m->get_connection()
|
||||
<< dendl;
|
||||
msgr->ms_deliver_dispatch(m);
|
||||
|
||||
msgr->dispatch_throttle_release(msize);
|
||||
msgr->dispatch_throttle_release(msize);
|
||||
|
||||
ldout(cct,20) << "done calling dispatch on " << m << dendl;
|
||||
ldout(cct,20) << "done calling dispatch on " << m << dendl;
|
||||
}
|
||||
}
|
||||
|
||||
lock.Lock();
|
||||
}
|
||||
if (!stop)
|
||||
cond.Wait(lock); //wait for something to be put on queue
|
||||
if (stop)
|
||||
break;
|
||||
|
||||
// wait for something to be put on queue
|
||||
cond.Wait(lock);
|
||||
}
|
||||
lock.Unlock();
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user