From 7642998871e114cb42d71f1f5e02636d825031b1 Mon Sep 17 00:00:00 2001 From: sage Date: Fri, 10 Feb 2006 06:46:29 +0000 Subject: [PATCH] fixed op queueing/threading stupidity git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@603 29311d96-e01e-0410-9327-a35deaab8ce9 --- ceph/osd/OSD.cc | 55 ++++++++++++++++++++++++++++++++++--------------- ceph/osd/OSD.h | 17 +++++++-------- 2 files changed, 46 insertions(+), 26 deletions(-) diff --git a/ceph/osd/OSD.cc b/ceph/osd/OSD.cc index ed88766e662..13bc413f5c0 100644 --- a/ceph/osd/OSD.cc +++ b/ceph/osd/OSD.cc @@ -151,10 +151,9 @@ OSD::OSD(int id, Messenger *m) { char name[80]; sprintf(name,"osd%d.threadpool", whoami); - threadpool = new ThreadPool(name, g_conf.osd_maxthreads, - static_doop, - this, - static_dequeueop); + threadpool = new ThreadPool(name, g_conf.osd_maxthreads, + static_dequeueop, + this); } } @@ -218,6 +217,12 @@ int OSD::shutdown() void OSD::lock_object(object_t oid) { osd_lock.Lock(); + _lock_object(oid); + osd_lock.Unlock(); +} + +void OSD::_lock_object(object_t oid) +{ if (object_lock.count(oid)) { Cond c; dout(0) << "lock_object " << hex << oid << dec << " waiting as " << &c << endl; @@ -228,7 +233,6 @@ void OSD::lock_object(object_t oid) dout(15) << "lock_object " << hex << oid << dec << endl; object_lock.insert(oid); } - osd_lock.Unlock(); } void OSD::unlock_object(object_t oid) @@ -1993,33 +1997,50 @@ void OSD::handle_op(MOSDOp *op) } // queue op + object_t oid = op->get_oid(); + op_queue[oid].push_back(op); pending_ops++; + if (g_conf.osd_maxthreads < 1) { + // do it now osd_lock.Unlock(); { - dequeue_op(op); - do_op(op); + dequeue_op(oid); } osd_lock.Lock(); } else { - osd_lock.Unlock(); // because put_op might block, bc threadpool may be calling dequeue_op w/ q_lock - { - threadpool->put_op(op); - } - osd_lock.Lock(); + threadpool->put_op(oid); } } /* - * called serially (but in worker thread) as items are dequeued from the threadpool */ -void OSD::dequeue_op(MOSDOp *op) +void OSD::dequeue_op(object_t oid) { - dout(10) << "dequeue_op " << op << endl; - lock_object(op->get_oid()); + MOSDOp *op; + + osd_lock.Lock(); + { + // lock oid + _lock_object(oid); + + // get pending op + list &ls = op_queue[oid]; + assert(!ls.empty()); + op = ls.front(); + ls.pop_front(); + if (ls.empty()) + op_queue.erase(oid); + } + osd_lock.Unlock(); + + dout(10) << "dequeue_op " << hex << oid << dec << " op " << op << ", " << (pending_ops-1) << " others pending" << endl; + do_op(op); } + + /* * called asynchronously by worker thread after items are dequeued */ @@ -2030,7 +2051,7 @@ void OSD::do_op(MOSDOp *op) logger->inc("op"); object_t oid = op->get_oid(); - //lock_object(oid); // dequeue_op does this now + //lock_object(oid); // done by dequeue_op now // replication ops? if (OSD_OP_IS_REP(op->get_op())) { diff --git a/ceph/osd/OSD.h b/ceph/osd/OSD.h index 5ff731fed1f..c3f6fa8361c 100644 --- a/ceph/osd/OSD.h +++ b/ceph/osd/OSD.h @@ -78,6 +78,7 @@ class OSD : public Dispatcher { hash_set object_lock; hash_map > object_lock_waiters; void lock_object(object_t oid); + void _lock_object(object_t oid); void unlock_object(object_t oid); // finished waiting messages, that will go at tail of dispatch() @@ -92,12 +93,12 @@ class OSD : public Dispatcher { // -- ops -- - class ThreadPool *threadpool; + class ThreadPool *threadpool; + hash_map > op_queue; int pending_ops; bool waiting_for_no_ops; Cond no_pending_ops; - void queue_op(class MOSDOp *m); void wait_for_no_ops(); int apply_write(MOSDOp *op, version_t v, @@ -107,14 +108,12 @@ class OSD : public Dispatcher { void get_repop(OSDReplicaOp*); void put_repop(OSDReplicaOp*); // will send ack/safe msgs, and delete as necessary. - public: void do_op(class MOSDOp *m); - static void static_doop(OSD *o, MOSDOp *op) { - o->do_op(op); - }; - void dequeue_op(class MOSDOp *m); - static void static_dequeueop(OSD *o, MOSDOp *op) { - o->dequeue_op(op); + + public: + void dequeue_op(object_t oid); + static void static_dequeueop(OSD *o, object_t oid) { + o->dequeue_op(oid); }; protected: