msg/DispatchQueue: fix locking in dispatch thread

The locking was awkward with locally delivered messages: we dropped the dq
lock, then the inq lock, then re-took the dq lock, etc.  We would also take,
drop, retake, and drop the dq lock again when queuing events.  Blech!

Instead:

 * simplify the queueing of connections (cons) for the local_queue by
   using a single con_q
 * dequeue the con under the original dq lock
 * queue events under a single dq lock interval, by telling
   local_queue.queue() that we already hold the dq lock
   (hold_dq_lock=true).

Signed-off-by: Sage Weil <sage@inktank.com>
This commit is contained in:
Sage Weil 2012-07-17 15:27:27 -07:00
parent 9d94ed1caa
commit 472d14f717
2 changed files with 68 additions and 68 deletions

View File

@ -28,18 +28,26 @@
#undef dout_prefix
#define dout_prefix *_dout << "incomingqueue(" << this << " " << parent << ")."
void IncomingQueue::queue(Message *m, int priority)
void IncomingQueue::queue(Message *m, int priority, bool hold_dq_lock)
{
Mutex::Locker l(lock);
ldout(cct,20) << "queue " << m << " prio " << priority << dendl;
if (in_q.count(priority) == 0) {
// queue inq AND message under inq AND dispatch_queue locks.
lock.Unlock();
dq->lock.Lock();
lock.Lock();
if (!hold_dq_lock) {
lock.Unlock();
dq->lock.Lock();
lock.Lock();
} else {
assert(dq->lock.is_locked());
}
if (halt) {
dq->lock.Unlock();
if (!hold_dq_lock) {
dq->lock.Unlock();
} else {
assert(dq->lock.is_locked());
}
goto halt;
}
@ -65,7 +73,11 @@ void IncomingQueue::queue(Message *m, int priority)
queue.push_back(m);
dq->lock.Unlock();
if (!hold_dq_lock) {
dq->lock.Unlock();
} else {
assert(dq->lock.is_locked());
}
} else {
ldout(cct,20) << "queue " << m << " under existing queue" << dendl;
// just queue message under our lock.
@ -139,6 +151,7 @@ void IncomingQueue::restart_queue()
}
/*******************
* DispatchQueue
*/
@ -149,8 +162,7 @@ void IncomingQueue::restart_queue()
void DispatchQueue::local_delivery(Message *m, int priority)
{
if ((unsigned long)m > 10)
m->set_connection(msgr->local_connection->get());
m->set_connection(msgr->local_connection->get());
local_queue.queue(m, priority);
}
@ -197,65 +209,55 @@ void DispatchQueue::entry()
<< ", moved to end of list" << dendl;
qlist->push_back(inq->queue_items[priority]); // move to end of list
}
lock.Unlock(); //done with the pipe queue for a while
Connection *con = NULL;
if ((long)m < DispatchQueue::D_NUM_CODES) {
assert(inq == &local_queue);
con = con_q.front();
con_q.pop_front();
}
lock.Unlock();
inq->in_qlen--;
qlen.dec();
inq->lock.Unlock(); // done with the pipe's message queue now
inq->lock.Unlock();
if (dequeued)
inq->put();
{
if ((long)m == DispatchQueue::D_BAD_REMOTE_RESET) {
lock.Lock();
Connection *con = remote_reset_q.front();
remote_reset_q.pop_front();
lock.Unlock();
msgr->ms_deliver_handle_remote_reset(con);
con->put();
} else if ((long)m == DispatchQueue::D_CONNECT) {
lock.Lock();
Connection *con = connect_q.front();
connect_q.pop_front();
lock.Unlock();
msgr->ms_deliver_handle_connect(con);
con->put();
} else if ((long)m == DispatchQueue::D_ACCEPT) {
lock.Lock();
Connection *con = accept_q.front();
accept_q.pop_front();
lock.Unlock();
msgr->ms_deliver_handle_accept(con);
con->put();
} else if ((long)m == DispatchQueue::D_BAD_RESET) {
lock.Lock();
Connection *con = reset_q.front();
reset_q.pop_front();
lock.Unlock();
msgr->ms_deliver_handle_reset(con);
con->put();
} else {
uint64_t msize = m->get_dispatch_throttle_size();
m->set_dispatch_throttle_size(0); // clear it out, in case we requeue this message.
if ((long)m == DispatchQueue::D_BAD_REMOTE_RESET) {
msgr->ms_deliver_handle_remote_reset(con);
con->put();
} else if ((long)m == DispatchQueue::D_CONNECT) {
msgr->ms_deliver_handle_connect(con);
con->put();
} else if ((long)m == DispatchQueue::D_ACCEPT) {
msgr->ms_deliver_handle_accept(con);
con->put();
} else if ((long)m == DispatchQueue::D_BAD_RESET) {
msgr->ms_deliver_handle_reset(con);
con->put();
} else {
uint64_t msize = m->get_dispatch_throttle_size();
m->set_dispatch_throttle_size(0); // clear it out, in case we requeue this message.
ldout(cct,1) << "<== " << m->get_source_inst()
<< " " << m->get_seq()
<< " ==== " << *m
<< " ==== " << m->get_payload().length() << "+" << m->get_middle().length()
<< "+" << m->get_data().length()
<< " (" << m->get_footer().front_crc << " " << m->get_footer().middle_crc
<< " " << m->get_footer().data_crc << ")"
<< " " << m << " con " << m->get_connection()
<< dendl;
msgr->ms_deliver_dispatch(m);
ldout(cct,1) << "<== " << m->get_source_inst()
<< " " << m->get_seq()
<< " ==== " << *m
<< " ==== " << m->get_payload().length() << "+" << m->get_middle().length()
<< "+" << m->get_data().length()
<< " (" << m->get_footer().front_crc << " " << m->get_footer().middle_crc
<< " " << m->get_footer().data_crc << ")"
<< " " << m << " con " << m->get_connection()
<< dendl;
msgr->ms_deliver_dispatch(m);
msgr->dispatch_throttle_release(msize);
msgr->dispatch_throttle_release(msize);
ldout(cct,20) << "done calling dispatch on " << m << dendl;
}
ldout(cct,20) << "done calling dispatch on " << m << dendl;
}
lock.Lock();
}
if (!stop)

View File

@ -41,7 +41,7 @@ struct IncomingQueue : public RefCountedObject {
map<int, xlist<IncomingQueue *>::item* > queue_items; // protected by pipe_lock AND q.lock
bool halt;
void queue(Message *m, int priority);
void queue(Message *m, int priority, bool hold_dq_lock=false);
void discard_queue();
void restart_queue();
@ -86,9 +86,7 @@ struct DispatchQueue {
atomic_t qlen;
enum { D_CONNECT = 1, D_ACCEPT, D_BAD_REMOTE_RESET, D_BAD_RESET, D_NUM_CODES };
list<Connection*> connect_q, accept_q;
list<Connection*> remote_reset_q;
list<Connection*> reset_q;
list<Connection*> con_q;
IncomingQueue local_queue;
@ -121,9 +119,9 @@ struct DispatchQueue {
lock.Unlock();
return;
}
connect_q.push_back(con->get());
con_q.push_back(con->get());
local_queue.queue((Message*)D_CONNECT, CEPH_MSG_PRIO_HIGHEST, true);
lock.Unlock();
local_delivery((Message*)D_CONNECT, CEPH_MSG_PRIO_HIGHEST);
}
void queue_accept(Connection *con) {
lock.Lock();
@ -131,9 +129,9 @@ struct DispatchQueue {
lock.Unlock();
return;
}
accept_q.push_back(con->get());
con_q.push_back(con->get());
local_queue.queue((Message*)D_ACCEPT, CEPH_MSG_PRIO_HIGHEST, true);
lock.Unlock();
local_delivery((Message*)D_ACCEPT, CEPH_MSG_PRIO_HIGHEST);
}
void queue_remote_reset(Connection *con) {
lock.Lock();
@ -141,9 +139,9 @@ struct DispatchQueue {
lock.Unlock();
return;
}
remote_reset_q.push_back(con->get());
con_q.push_back(con->get());
local_queue.queue((Message*)D_BAD_REMOTE_RESET, CEPH_MSG_PRIO_HIGHEST, true);
lock.Unlock();
local_delivery((Message*)D_BAD_REMOTE_RESET, CEPH_MSG_PRIO_HIGHEST);
}
void queue_reset(Connection *con) {
lock.Lock();
@ -151,9 +149,9 @@ struct DispatchQueue {
lock.Unlock();
return;
}
reset_q.push_back(con->get());
con_q.push_back(con->get());
local_queue.queue((Message*)D_BAD_RESET, CEPH_MSG_PRIO_HIGHEST, true);
lock.Unlock();
local_delivery((Message*)D_BAD_RESET, CEPH_MSG_PRIO_HIGHEST);
}
void start();