mirror of
https://github.com/ceph/ceph
synced 2025-03-06 08:20:12 +00:00
fixed op queueing/threading stupidity
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@603 29311d96-e01e-0410-9327-a35deaab8ce9
This commit is contained in:
parent
7f141a724c
commit
7642998871
@ -151,10 +151,9 @@ OSD::OSD(int id, Messenger *m)
|
||||
{
|
||||
char name[80];
|
||||
sprintf(name,"osd%d.threadpool", whoami);
|
||||
threadpool = new ThreadPool<OSD, MOSDOp>(name, g_conf.osd_maxthreads,
|
||||
static_doop,
|
||||
this,
|
||||
static_dequeueop);
|
||||
threadpool = new ThreadPool<OSD*, object_t>(name, g_conf.osd_maxthreads,
|
||||
static_dequeueop,
|
||||
this);
|
||||
}
|
||||
}
|
||||
|
||||
@ -218,6 +217,12 @@ int OSD::shutdown()
|
||||
void OSD::lock_object(object_t oid)
|
||||
{
|
||||
osd_lock.Lock();
|
||||
_lock_object(oid);
|
||||
osd_lock.Unlock();
|
||||
}
|
||||
|
||||
void OSD::_lock_object(object_t oid)
|
||||
{
|
||||
if (object_lock.count(oid)) {
|
||||
Cond c;
|
||||
dout(0) << "lock_object " << hex << oid << dec << " waiting as " << &c << endl;
|
||||
@ -228,7 +233,6 @@ void OSD::lock_object(object_t oid)
|
||||
dout(15) << "lock_object " << hex << oid << dec << endl;
|
||||
object_lock.insert(oid);
|
||||
}
|
||||
osd_lock.Unlock();
|
||||
}
|
||||
|
||||
void OSD::unlock_object(object_t oid)
|
||||
@ -1993,33 +1997,50 @@ void OSD::handle_op(MOSDOp *op)
|
||||
}
|
||||
|
||||
// queue op
|
||||
object_t oid = op->get_oid();
|
||||
op_queue[oid].push_back(op);
|
||||
pending_ops++;
|
||||
|
||||
if (g_conf.osd_maxthreads < 1) {
|
||||
// do it now
|
||||
osd_lock.Unlock();
|
||||
{
|
||||
dequeue_op(op);
|
||||
do_op(op);
|
||||
dequeue_op(oid);
|
||||
}
|
||||
osd_lock.Lock();
|
||||
} else {
|
||||
osd_lock.Unlock(); // because put_op might block, bc threadpool may be calling dequeue_op w/ q_lock
|
||||
{
|
||||
threadpool->put_op(op);
|
||||
}
|
||||
osd_lock.Lock();
|
||||
threadpool->put_op(oid);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* called serially (but in worker thread) as items are dequeued from the threadpool
|
||||
*/
|
||||
void OSD::dequeue_op(MOSDOp *op)
|
||||
void OSD::dequeue_op(object_t oid)
|
||||
{
|
||||
dout(10) << "dequeue_op " << op << endl;
|
||||
lock_object(op->get_oid());
|
||||
MOSDOp *op;
|
||||
|
||||
osd_lock.Lock();
|
||||
{
|
||||
// lock oid
|
||||
_lock_object(oid);
|
||||
|
||||
// get pending op
|
||||
list<MOSDOp*> &ls = op_queue[oid];
|
||||
assert(!ls.empty());
|
||||
op = ls.front();
|
||||
ls.pop_front();
|
||||
if (ls.empty())
|
||||
op_queue.erase(oid);
|
||||
}
|
||||
osd_lock.Unlock();
|
||||
|
||||
dout(10) << "dequeue_op " << hex << oid << dec << " op " << op << ", " << (pending_ops-1) << " others pending" << endl;
|
||||
do_op(op);
|
||||
}
|
||||
|
||||
|
||||
|
||||
/*
|
||||
* called asynchronously by worker thread after items are dequeued
|
||||
*/
|
||||
@ -2030,7 +2051,7 @@ void OSD::do_op(MOSDOp *op)
|
||||
logger->inc("op");
|
||||
|
||||
object_t oid = op->get_oid();
|
||||
//lock_object(oid); // dequeue_op does this now
|
||||
//lock_object(oid); // done by dequeue_op now
|
||||
|
||||
// replication ops?
|
||||
if (OSD_OP_IS_REP(op->get_op())) {
|
||||
|
@ -78,6 +78,7 @@ class OSD : public Dispatcher {
|
||||
hash_set<object_t> object_lock;
|
||||
hash_map<object_t, list<Cond*> > object_lock_waiters;
|
||||
void lock_object(object_t oid);
|
||||
void _lock_object(object_t oid);
|
||||
void unlock_object(object_t oid);
|
||||
|
||||
// finished waiting messages, that will go at tail of dispatch()
|
||||
@ -92,12 +93,12 @@ class OSD : public Dispatcher {
|
||||
|
||||
|
||||
// -- ops --
|
||||
class ThreadPool<class OSD, class MOSDOp> *threadpool;
|
||||
class ThreadPool<class OSD*, object_t> *threadpool;
|
||||
hash_map<object_t, list<MOSDOp*> > op_queue;
|
||||
int pending_ops;
|
||||
bool waiting_for_no_ops;
|
||||
Cond no_pending_ops;
|
||||
|
||||
void queue_op(class MOSDOp *m);
|
||||
void wait_for_no_ops();
|
||||
|
||||
int apply_write(MOSDOp *op, version_t v,
|
||||
@ -107,14 +108,12 @@ class OSD : public Dispatcher {
|
||||
void get_repop(OSDReplicaOp*);
|
||||
void put_repop(OSDReplicaOp*); // will send ack/safe msgs, and delete as necessary.
|
||||
|
||||
public:
|
||||
void do_op(class MOSDOp *m);
|
||||
static void static_doop(OSD *o, MOSDOp *op) {
|
||||
o->do_op(op);
|
||||
};
|
||||
void dequeue_op(class MOSDOp *m);
|
||||
static void static_dequeueop(OSD *o, MOSDOp *op) {
|
||||
o->dequeue_op(op);
|
||||
|
||||
public:
|
||||
void dequeue_op(object_t oid);
|
||||
static void static_dequeueop(OSD *o, object_t oid) {
|
||||
o->dequeue_op(oid);
|
||||
};
|
||||
|
||||
protected:
|
||||
|
Loading…
Reference in New Issue
Block a user