*** empty log message ***

git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@329 29311d96-e01e-0410-9327-a35deaab8ce9
This commit is contained in:
sbrandt 2005-06-22 03:13:02 +00:00
parent 4434bcad24
commit e880236c84
3 changed files with 153 additions and 107 deletions

View File

@ -1,99 +0,0 @@
#include "include/types.h"
#include "OSD.h"
#include "FakeStore.h"
#include "OSDCluster.h"
#include "mds/MDS.h"
#include "msg/Messenger.h"
#include "msg/Message.h"
#include "msg/HostMonitor.h"
#include "messages/MGenericMessage.h"
#include "messages/MPing.h"
#include "messages/MPingAck.h"
#include "messages/MOSDOp.h"
#include "messages/MOSDOpReply.h"
#include "messages/MOSDGetClusterAck.h"
#include "common/Logger.h"
#include "common/LogType.h"
#include "common/Mutex.h"
#include "OSD/ThreadPool.h"
#include <queue>
#include <iostream>
#include <cassert>
#include <errno.h>
#include <sys/stat.h>
void main(int argc, char *argv) {
ThreadPool t(10);
}
ThreadPool::Threadpool(int howmany) {
num_ops = 0;
num_threads = 0;
int status;
num_threads = howmany;
for(int i = 0; i < howmany; i++) {
status = pthread_create(thread[i], NULL, do_ops, (void *)&i);
}
}
ThreadPool::~Threadpool() {
queue_lock.Lock();
for(int i = num_ops; i > 0; i--)
get_op();
for(int i = 0; i < num_threads; i++) {
put_op((MOSDOp *)NULL);
}
for(int i = 0; i < num_threads; i++) {
cout << "Waiting for thread " << i << " to die";
pthread_join(threads[i]);
}
queue_lock.Unlock();
}
void do_ops(void *whoami) {
MOSDOp *op;
cout << "Thread " << (int)i << " ready for action\n";
while(1) {
op = get_op();
if(op == NULL) {
cout << "Thread " << (int)i << " dying";
pthread_exit(0);
}
OSD.do_op(op);
}
}
MOSDOp *get_op() {
MOSDOp *op;
queue_lock.Lock();
op = op_queue.pop();
num_ops--;
queue_lock.Unlock();
}
void put_op(MOSDOp *op) {
queue_lock.Lock();
opqueue.push(op);
num_ops++;
queue_lock.Unlock();
}

View File

@ -1,19 +1,118 @@
#ifndef THREADPOOL
#define THREADPOOL
#include <queue>
#include<semaphore.h>
#include <pthread.h>
using namespace std;
#define MAX_THREADS 1000
class Semaphore {
sem_t sem;
public:
Semaphore(int i) {
sem_init(&sem, 0, i);
}
void get() {
sem_wait(&sem);
}
void put() {
sem_post(&sem);
}
};
template <class T>
class ThreadPool {
queue<MOSDOp *> op_queue;
Mutex queue_lock;
private:
queue<T *> q;
Semaphore q_lock(1);
Semaphore q_sem(0);
pthread_t thread[MAX_THREADS];
int num_ops;
int num_threads;
void (*func)(T*);
ThreadPool::Threadpool(int howmany);
static void *foo(void *arg) {
ThreadPool *t = (ThreadPool *)arg;
t->do_ops(arg);
}
ThreadPool::~Threadpool();
void * do_ops(void *nothing) {
T* op;
void put_op(MOSDOp *op);
cout << "Thread ready for action\n";
while(1) {
q_sem.get();
op = get_op();
void do_ops(void *whoami);
if(op == NULL) {
cout << "Thread exiting\n";
pthread_exit(0);
}
func(op);
}
}
MOSDOp *get_op();
}
T* get_op() {
T* op;
q_lock.get();
op = q.front();
q.pop();
num_ops--;
q_lock.put();
return op;
}
public:
ThreadPool(int howmany, void (*f)(T*)) {
num_ops = 0;
num_threads = 0;
int status;
func = f;
num_threads = howmany;
for(int i = 0; i < howmany; i++) {
status = pthread_create(&thread[i], NULL, (void*(*)(void *))&ThreadPool::foo, this);
}
}
~ThreadPool() {
q_lock.get();
for(int i = num_ops; i > 0; i--)
get_op();
for(int i = 0; i < num_threads; i++) {
put_op((T*)NULL);
}
q_lock.put();
for(int i = 0; i < num_threads; i++) {
cout << "Waiting for thread " << i << " to die\n";
pthread_join(thread[i], NULL);
}
}
void put_op(T* op) {
q_lock.get();
q.push(op);
num_ops++;
q_sem.put();
q_lock.put();
}
};
#endif

46
ceph/osd/tp.cc Normal file
View File

@ -0,0 +1,46 @@
#include <iostream>
#include <string>
#include <stdlib.h>
using namespace std;
#include "common/Mutex.h"
#include "osd/ThreadPool.h"
#include <thread.h>
class Op {
int i;
public:
Op(int i) {
this->i = i;
}
int get() {
return i;
}
};
void foo(Op *o) {
cout << "Thread "<< thr_self() << ": " << o->get() << "\n";
usleep(1);
}
int main(int argc, char *argv) {
ThreadPool<Op> *t = new ThreadPool<Op>(10, foo);
sleep(1);
for(int i = 0; i < 100; i++) {
Op *o = new Op(i);
t->put_op(o);
}
sleep(1);
delete(t);
return 0;
}