msgr: Cover qlen with a Spinlock for faster access and to limit blocking

This commit is contained in:
Greg Farnum 2009-12-10 11:16:40 -08:00
parent ec2720cc9e
commit eb371aa65e
2 changed files with 11 additions and 5 deletions

View File

@ -283,8 +283,10 @@ void SimpleMessenger::Endpoint::dispatch_entry()
}
if (pipe_list.empty())
queued_pipes.erase(priority);
--qlen;
endpoint_lock.Unlock(); //done with the pipe queue for a while
qlen_lock.lock();
--qlen;
qlen_lock.unlock();
//get message from pipe
Message *m = m_queue.front();
@ -1187,6 +1189,7 @@ void SimpleMessenger::Pipe::discard_queue()
i != queue_items.end();
++i)
i->second->remove_myself();
rank->local_endpoint->endpoint_lock.Unlock();
endpoint = true;
pipe_lock.Lock();
}
@ -1200,13 +1203,13 @@ void SimpleMessenger::Pipe::discard_queue()
for (map<int,list<Message*> >::iterator p = in_q.begin(); p != in_q.end(); p++) {
if (endpoint) {
int size = in_q.size();
rank->local_endpoint->qlen_lock.lock();
rank->local_endpoint->qlen -= size;
rank->local_endpoint->qlen_lock.unlock();
}
for (list<Message*>::iterator r = p->second.begin(); r != p->second.end(); r++)
delete *r;
}
if (endpoint)
rank->local_endpoint->endpoint_lock.Unlock();
in_q.clear();
}

View File

@ -26,6 +26,7 @@ using namespace std;
using namespace __gnu_cxx;
#include "common/Mutex.h"
#include "common/Spinlock.h"
#include "common/Cond.h"
#include "common/Thread.h"
@ -232,10 +233,10 @@ private:
pipe_lock.Unlock();
//increment queue length counter
rank->local_endpoint->endpoint_lock.Lock();
rank->local_endpoint->qlen_lock.lock();
++rank->local_endpoint->qlen;
rank->local_endpoint->qlen_lock.unlock();
rank->local_endpoint->cond.Signal();
rank->local_endpoint->endpoint_lock.Unlock();
dout(0) << "finished queuing received message " << m << "in msgr " << rank << dendl;
}
@ -321,6 +322,7 @@ private:
map<int, xlist<Pipe *>::iterator> queued_pipe_iters;
bool stop;
int qlen;
Spinlock qlen_lock;
int my_rank;
class DispatchThread : public Thread {
@ -376,6 +378,7 @@ private:
endpoint_lock("SimpleMessenger::Endpoint::endpoint_lock"),
stop(false),
qlen(0),
qlen_lock("SimpleMessenger::Endpoint::qlen_lock"),
my_rank(rn),
dispatch_thread(this) {
local_pipe = new Pipe(r, Pipe::STATE_OPEN);