diff --git a/src/msg/DispatchQueue.cc b/src/msg/DispatchQueue.cc index f0e8ef0be07..437f767429e 100644 --- a/src/msg/DispatchQueue.cc +++ b/src/msg/DispatchQueue.cc @@ -28,18 +28,26 @@ #undef dout_prefix #define dout_prefix *_dout << "incomingqueue(" << this << " " << parent << ")." -void IncomingQueue::queue(Message *m, int priority) +void IncomingQueue::queue(Message *m, int priority, bool hold_dq_lock) { Mutex::Locker l(lock); ldout(cct,20) << "queue " << m << " prio " << priority << dendl; if (in_q.count(priority) == 0) { // queue inq AND message under inq AND dispatch_queue locks. - lock.Unlock(); - dq->lock.Lock(); - lock.Lock(); + if (!hold_dq_lock) { + lock.Unlock(); + dq->lock.Lock(); + lock.Lock(); + } else { + assert(dq->lock.is_locked()); + } if (halt) { - dq->lock.Unlock(); + if (!hold_dq_lock) { + dq->lock.Unlock(); + } else { + assert(dq->lock.is_locked()); + } goto halt; } @@ -65,7 +73,11 @@ void IncomingQueue::queue(Message *m, int priority) queue.push_back(m); - dq->lock.Unlock(); + if (!hold_dq_lock) { + dq->lock.Unlock(); + } else { + assert(dq->lock.is_locked()); + } } else { ldout(cct,20) << "queue " << m << " under existing queue" << dendl; // just queue message under our lock. @@ -139,6 +151,7 @@ void IncomingQueue::restart_queue() } + /******************* * DispatchQueue */ @@ -149,8 +162,7 @@ void IncomingQueue::restart_queue() void DispatchQueue::local_delivery(Message *m, int priority) { - if ((unsigned long)m > 10) - m->set_connection(msgr->local_connection->get()); + m->set_connection(msgr->local_connection->get()); local_queue.queue(m, priority); } @@ -197,65 +209,55 @@ void DispatchQueue::entry() << ", moved to end of list" << dendl; qlist->push_back(inq->queue_items[priority]); // move to end of list } - lock.Unlock(); //done with the pipe queue for a while + + Connection *con = NULL; + if ((long)m < DispatchQueue::D_NUM_CODES) { + assert(inq == &local_queue); + con = con_q.front(); + con_q.pop_front(); + } + + lock.Unlock(); inq->in_qlen--; qlen.dec(); - inq->lock.Unlock(); // done with the pipe's message queue now - + inq->lock.Unlock(); if (dequeued) inq->put(); - { - if ((long)m == DispatchQueue::D_BAD_REMOTE_RESET) { - lock.Lock(); - Connection *con = remote_reset_q.front(); - remote_reset_q.pop_front(); - lock.Unlock(); - msgr->ms_deliver_handle_remote_reset(con); - con->put(); - } else if ((long)m == DispatchQueue::D_CONNECT) { - lock.Lock(); - Connection *con = connect_q.front(); - connect_q.pop_front(); - lock.Unlock(); - msgr->ms_deliver_handle_connect(con); - con->put(); - } else if ((long)m == DispatchQueue::D_ACCEPT) { - lock.Lock(); - Connection *con = accept_q.front(); - accept_q.pop_front(); - lock.Unlock(); - msgr->ms_deliver_handle_accept(con); - con->put(); - } else if ((long)m == DispatchQueue::D_BAD_RESET) { - lock.Lock(); - Connection *con = reset_q.front(); - reset_q.pop_front(); - lock.Unlock(); - msgr->ms_deliver_handle_reset(con); - con->put(); - } else { - uint64_t msize = m->get_dispatch_throttle_size(); - m->set_dispatch_throttle_size(0); // clear it out, in case we requeue this message. + if ((long)m == DispatchQueue::D_BAD_REMOTE_RESET) { + msgr->ms_deliver_handle_remote_reset(con); + con->put(); + } else if ((long)m == DispatchQueue::D_CONNECT) { + msgr->ms_deliver_handle_connect(con); + con->put(); + } else if ((long)m == DispatchQueue::D_ACCEPT) { + msgr->ms_deliver_handle_accept(con); + con->put(); + } else if ((long)m == DispatchQueue::D_BAD_RESET) { + msgr->ms_deliver_handle_reset(con); + con->put(); + } else { + uint64_t msize = m->get_dispatch_throttle_size(); + m->set_dispatch_throttle_size(0); // clear it out, in case we requeue this message. - ldout(cct,1) << "<== " << m->get_source_inst() - << " " << m->get_seq() - << " ==== " << *m - << " ==== " << m->get_payload().length() << "+" << m->get_middle().length() - << "+" << m->get_data().length() - << " (" << m->get_footer().front_crc << " " << m->get_footer().middle_crc - << " " << m->get_footer().data_crc << ")" - << " " << m << " con " << m->get_connection() - << dendl; - msgr->ms_deliver_dispatch(m); + ldout(cct,1) << "<== " << m->get_source_inst() + << " " << m->get_seq() + << " ==== " << *m + << " ==== " << m->get_payload().length() << "+" << m->get_middle().length() + << "+" << m->get_data().length() + << " (" << m->get_footer().front_crc << " " << m->get_footer().middle_crc + << " " << m->get_footer().data_crc << ")" + << " " << m << " con " << m->get_connection() + << dendl; + msgr->ms_deliver_dispatch(m); - msgr->dispatch_throttle_release(msize); + msgr->dispatch_throttle_release(msize); - ldout(cct,20) << "done calling dispatch on " << m << dendl; - } + ldout(cct,20) << "done calling dispatch on " << m << dendl; } + lock.Lock(); } if (!stop) diff --git a/src/msg/DispatchQueue.h b/src/msg/DispatchQueue.h index 37853a7d45a..a0a25940f5f 100644 --- a/src/msg/DispatchQueue.h +++ b/src/msg/DispatchQueue.h @@ -41,7 +41,7 @@ struct IncomingQueue : public RefCountedObject { map::item* > queue_items; // protected by pipe_lock AND q.lock bool halt; - void queue(Message *m, int priority); + void queue(Message *m, int priority, bool hold_dq_lock=false); void discard_queue(); void restart_queue(); @@ -86,9 +86,7 @@ struct DispatchQueue { atomic_t qlen; enum { D_CONNECT = 1, D_ACCEPT, D_BAD_REMOTE_RESET, D_BAD_RESET, D_NUM_CODES }; - list connect_q, accept_q; - list remote_reset_q; - list reset_q; + list con_q; IncomingQueue local_queue; @@ -121,9 +119,9 @@ struct DispatchQueue { lock.Unlock(); return; } - connect_q.push_back(con->get()); + con_q.push_back(con->get()); + local_queue.queue((Message*)D_CONNECT, CEPH_MSG_PRIO_HIGHEST, true); lock.Unlock(); - local_delivery((Message*)D_CONNECT, CEPH_MSG_PRIO_HIGHEST); } void queue_accept(Connection *con) { lock.Lock(); @@ -131,9 +129,9 @@ struct DispatchQueue { lock.Unlock(); return; } - accept_q.push_back(con->get()); + con_q.push_back(con->get()); + local_queue.queue((Message*)D_ACCEPT, CEPH_MSG_PRIO_HIGHEST, true); lock.Unlock(); - local_delivery((Message*)D_ACCEPT, CEPH_MSG_PRIO_HIGHEST); } void queue_remote_reset(Connection *con) { lock.Lock(); @@ -141,9 +139,9 @@ struct DispatchQueue { lock.Unlock(); return; } - remote_reset_q.push_back(con->get()); + con_q.push_back(con->get()); + local_queue.queue((Message*)D_BAD_REMOTE_RESET, CEPH_MSG_PRIO_HIGHEST, true); lock.Unlock(); - local_delivery((Message*)D_BAD_REMOTE_RESET, CEPH_MSG_PRIO_HIGHEST); } void queue_reset(Connection *con) { lock.Lock(); @@ -151,9 +149,9 @@ struct DispatchQueue { lock.Unlock(); return; } - reset_q.push_back(con->get()); + con_q.push_back(con->get()); + local_queue.queue((Message*)D_BAD_RESET, CEPH_MSG_PRIO_HIGHEST, true); lock.Unlock(); - local_delivery((Message*)D_BAD_RESET, CEPH_MSG_PRIO_HIGHEST); } void start();