mirror of
https://github.com/ceph/ceph
synced 2025-01-03 17:42:36 +00:00
msgr: avoid copying Pipe* xlist
Signed-off-by: Sage Weil <sage.weil@dreamhost.com>
This commit is contained in:
parent
ce6f5788c2
commit
d3d115964f
@ -290,12 +290,12 @@ void SimpleMessenger::dispatch_entry()
|
||||
while (!dispatch_queue.stop) {
|
||||
while (!dispatch_queue.queued_pipes.empty() && !dispatch_queue.stop) {
|
||||
//get highest-priority pipe
|
||||
map<int, xlist<Pipe *> >::reverse_iterator high_iter =
|
||||
map<int, xlist<Pipe *>* >::reverse_iterator high_iter =
|
||||
dispatch_queue.queued_pipes.rbegin();
|
||||
int priority = high_iter->first;
|
||||
xlist<Pipe *>& pipe_list = high_iter->second;
|
||||
xlist<Pipe *> *pipe_list = high_iter->second;
|
||||
|
||||
Pipe *pipe = pipe_list.front();
|
||||
Pipe *pipe = pipe_list->front();
|
||||
//move pipe to back of line -- or just take off if no more messages
|
||||
pipe->pipe_lock.Lock();
|
||||
list<Message *>& m_queue = pipe->in_q[priority];
|
||||
@ -303,11 +303,13 @@ void SimpleMessenger::dispatch_entry()
|
||||
m_queue.pop_front();
|
||||
|
||||
if (m_queue.empty()) {
|
||||
pipe_list.pop_front(); // pipe is done
|
||||
if (pipe_list.empty())
|
||||
pipe_list->pop_front(); // pipe is done
|
||||
if (pipe_list->empty()) {
|
||||
delete pipe_list;
|
||||
dispatch_queue.queued_pipes.erase(priority);
|
||||
}
|
||||
} else {
|
||||
pipe_list.push_back(pipe->queue_items[priority]); // move to end of list
|
||||
pipe_list->push_back(pipe->queue_items[priority]); // move to end of list
|
||||
}
|
||||
ldout(cct,20) << "dispatch_entry pipe " << pipe << " dequeued " << m << dendl;
|
||||
dispatch_queue.lock.Unlock(); //done with the pipe queue for a while
|
||||
@ -554,7 +556,16 @@ void SimpleMessenger::Pipe::queue_received(Message *m, int priority)
|
||||
queue_items[priority] = new xlist<Pipe *>::item(this);
|
||||
if (msgr->dispatch_queue.queued_pipes.empty())
|
||||
msgr->dispatch_queue.cond.Signal();
|
||||
msgr->dispatch_queue.queued_pipes[priority].push_back(queue_items[priority]);
|
||||
|
||||
map<int, xlist<Pipe*>*>::iterator p = msgr->dispatch_queue.queued_pipes.find(priority);
|
||||
xlist<Pipe*> *pipe_list;
|
||||
if (p != msgr->dispatch_queue.queued_pipes.end())
|
||||
pipe_list = p->second;
|
||||
else {
|
||||
pipe_list = new xlist<Pipe*>;
|
||||
msgr->dispatch_queue.queued_pipes[priority] = pipe_list;
|
||||
}
|
||||
pipe_list->push_back(queue_items[priority]);
|
||||
}
|
||||
|
||||
queue.push_back(m);
|
||||
@ -1362,8 +1373,10 @@ void SimpleMessenger::Pipe::discard_queue()
|
||||
xlist<Pipe *>* list_on;
|
||||
if ((list_on = i->second->get_list())) { //if in round-robin
|
||||
i->second->remove_myself(); //take off
|
||||
if (list_on->empty()) //if round-robin queue is empty
|
||||
if (list_on->empty()) { //if round-robin queue is empty
|
||||
delete list_on;
|
||||
q.queued_pipes.erase(i->first); //remove from map
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -215,6 +215,9 @@ private:
|
||||
friend class Writer;
|
||||
|
||||
public:
|
||||
Pipe(const Pipe& other);
|
||||
const Pipe& operator=(const Pipe& other);
|
||||
|
||||
Pipe(SimpleMessenger *r, int st) :
|
||||
msgr(r),
|
||||
sd(-1),
|
||||
@ -366,7 +369,7 @@ private:
|
||||
Cond cond;
|
||||
bool stop;
|
||||
|
||||
map<int, xlist<Pipe *> > queued_pipes;
|
||||
map<int, xlist<Pipe *>* > queued_pipes;
|
||||
map<int, xlist<Pipe *>::iterator> queued_pipe_iters;
|
||||
atomic_t qlen;
|
||||
|
||||
@ -414,10 +417,11 @@ private:
|
||||
local_pipe(NULL)
|
||||
{}
|
||||
~DispatchQueue() {
|
||||
for (map< int, xlist<Pipe *> >::iterator i = queued_pipes.begin();
|
||||
for (map< int, xlist<Pipe *>* >::iterator i = queued_pipes.begin();
|
||||
i != queued_pipes.end();
|
||||
++i) {
|
||||
i->second.clear();
|
||||
i->second->clear();
|
||||
delete i->second;
|
||||
}
|
||||
}
|
||||
} dispatch_queue;
|
||||
|
Loading…
Reference in New Issue
Block a user