From e880236c84dfd32157f5880aaa211ffa6089e63b Mon Sep 17 00:00:00 2001 From: sbrandt Date: Wed, 22 Jun 2005 03:13:02 +0000 Subject: [PATCH] *** empty log message *** git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@329 29311d96-e01e-0410-9327-a35deaab8ce9 --- ceph/osd/ThreadPool.cc | 99 ----------------------------------- ceph/osd/ThreadPool.h | 115 ++++++++++++++++++++++++++++++++++++++--- ceph/osd/tp.cc | 46 +++++++++++++++++ 3 files changed, 153 insertions(+), 107 deletions(-) delete mode 100644 ceph/osd/ThreadPool.cc create mode 100644 ceph/osd/tp.cc diff --git a/ceph/osd/ThreadPool.cc b/ceph/osd/ThreadPool.cc deleted file mode 100644 index 19523df535b..00000000000 --- a/ceph/osd/ThreadPool.cc +++ /dev/null @@ -1,99 +0,0 @@ -#include "include/types.h" - -#include "OSD.h" -#include "FakeStore.h" -#include "OSDCluster.h" - -#include "mds/MDS.h" - -#include "msg/Messenger.h" -#include "msg/Message.h" - -#include "msg/HostMonitor.h" - -#include "messages/MGenericMessage.h" -#include "messages/MPing.h" -#include "messages/MPingAck.h" -#include "messages/MOSDOp.h" -#include "messages/MOSDOpReply.h" -#include "messages/MOSDGetClusterAck.h" - -#include "common/Logger.h" -#include "common/LogType.h" -#include "common/Mutex.h" - -#include "OSD/ThreadPool.h" - -#include - -#include -#include -#include -#include - -void main(int argc, char *argv) { - ThreadPool t(10); - -} - -ThreadPool::Threadpool(int howmany) { - num_ops = 0; - num_threads = 0; - - int status; - - num_threads = howmany; - - for(int i = 0; i < howmany; i++) { - status = pthread_create(thread[i], NULL, do_ops, (void *)&i); - } -} - -ThreadPool::~Threadpool() { - queue_lock.Lock(); - for(int i = num_ops; i > 0; i--) - get_op(); - - for(int i = 0; i < num_threads; i++) { - put_op((MOSDOp *)NULL); - } - - for(int i = 0; i < num_threads; i++) { - cout << "Waiting for thread " << i << " to die"; - pthread_join(threads[i]); - } - - queue_lock.Unlock(); -} - -void do_ops(void *whoami) { - MOSDOp *op; - - cout << "Thread " << (int)i << " ready for action\n"; - while(1) { - op = get_op(); - - if(op == NULL) { - cout << "Thread " << (int)i << " dying"; - pthread_exit(0); - } - - OSD.do_op(op); - } -} - -MOSDOp *get_op() { - MOSDOp *op; - queue_lock.Lock(); - op = op_queue.pop(); - num_ops--; - queue_lock.Unlock(); -} - -void put_op(MOSDOp *op) { - queue_lock.Lock(); - opqueue.push(op); - num_ops++; - queue_lock.Unlock(); -} - diff --git a/ceph/osd/ThreadPool.h b/ceph/osd/ThreadPool.h index 135c5deae9d..99c1bc48d99 100644 --- a/ceph/osd/ThreadPool.h +++ b/ceph/osd/ThreadPool.h @@ -1,19 +1,118 @@ +#ifndef THREADPOOL +#define THREADPOOL + +#include +#include +#include + +using namespace std; + #define MAX_THREADS 1000 +class Semaphore { + sem_t sem; + + public: + + Semaphore(int i) { + sem_init(&sem, 0, i); + } + + void get() { + sem_wait(&sem); + } + + void put() { + sem_post(&sem); + } +}; + +template class ThreadPool { - queue op_queue; - Mutex queue_lock; + + private: + queue q; + Semaphore q_lock(1); + Semaphore q_sem(0); pthread_t thread[MAX_THREADS]; int num_ops; int num_threads; + void (*func)(T*); - ThreadPool::Threadpool(int howmany); + static void *foo(void *arg) { + ThreadPool *t = (ThreadPool *)arg; + t->do_ops(arg); + } - ThreadPool::~Threadpool(); + void * do_ops(void *nothing) { + T* op; - void put_op(MOSDOp *op); + cout << "Thread ready for action\n"; + while(1) { + q_sem.get(); + op = get_op(); - void do_ops(void *whoami); + if(op == NULL) { + cout << "Thread exiting\n"; + pthread_exit(0); + } + func(op); + } + } - MOSDOp *get_op(); -} + + T* get_op() { + T* op; + + q_lock.get(); + op = q.front(); + q.pop(); + num_ops--; + q_lock.put(); + + return op; + } + + public: + ThreadPool(int howmany, void (*f)(T*)) { + num_ops = 0; + num_threads = 0; + + int status; + + func = f; + + num_threads = howmany; + + for(int i = 0; i < howmany; i++) { + status = pthread_create(&thread[i], NULL, (void*(*)(void *))&ThreadPool::foo, this); + } + } + + ~ThreadPool() { + q_lock.get(); + for(int i = num_ops; i > 0; i--) + get_op(); + + for(int i = 0; i < num_threads; i++) { + put_op((T*)NULL); + } + q_lock.put(); + + for(int i = 0; i < num_threads; i++) { + cout << "Waiting for thread " << i << " to die\n"; + pthread_join(thread[i], NULL); + } + + } + + void put_op(T* op) { + q_lock.get(); + q.push(op); + num_ops++; + q_sem.put(); + q_lock.put(); + } + +}; +#endif diff --git a/ceph/osd/tp.cc b/ceph/osd/tp.cc new file mode 100644 index 00000000000..c95a6465061 --- /dev/null +++ b/ceph/osd/tp.cc @@ -0,0 +1,46 @@ + +#include +#include +#include + +using namespace std; + +#include "common/Mutex.h" +#include "osd/ThreadPool.h" +#include + +class Op { + int i; + +public: + + Op(int i) { + this->i = i; + } + + int get() { + return i; + } +}; + +void foo(Op *o) { + cout << "Thread "<< thr_self() << ": " << o->get() << "\n"; + usleep(1); +} + +int main(int argc, char *argv) { + ThreadPool *t = new ThreadPool(10, foo); + + sleep(1); + + for(int i = 0; i < 100; i++) { + Op *o = new Op(i); + t->put_op(o); + } + + sleep(1); + delete(t); + + return 0; +} +