mirror of
https://github.com/ceph/ceph
synced 2025-01-19 17:41:39 +00:00
*** empty log message ***
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@335 29311d96-e01e-0410-9327-a35deaab8ce9
This commit is contained in:
parent
b26640967b
commit
f5ae8145fa
@ -11,7 +11,7 @@ using namespace std;
|
||||
|
||||
#define MAX_THREADS 1000
|
||||
|
||||
template <class T>
|
||||
template <class U, class T>
|
||||
class ThreadPool {
|
||||
|
||||
private:
|
||||
@ -21,7 +21,8 @@ class ThreadPool {
|
||||
pthread_t *thread;
|
||||
int num_ops;
|
||||
int num_threads;
|
||||
void (*func)(T*);
|
||||
void (*func)(U*,T*);
|
||||
U *u;
|
||||
|
||||
static void *foo(void *arg)
|
||||
{
|
||||
@ -33,7 +34,7 @@ class ThreadPool {
|
||||
{
|
||||
T* op;
|
||||
|
||||
cout << "Thread ready for action\n";
|
||||
cout << "Thread "<< pthread_self() << " ready for action\n";
|
||||
while(1) {
|
||||
q_sem.Get();
|
||||
op = get_op();
|
||||
@ -42,7 +43,8 @@ class ThreadPool {
|
||||
cout << "Thread exiting\n";
|
||||
pthread_exit(0);
|
||||
}
|
||||
func(op);
|
||||
cout << "Thread "<< pthread_self() << " calling the function\n";
|
||||
func(u, op);
|
||||
}
|
||||
}
|
||||
|
||||
@ -62,10 +64,11 @@ class ThreadPool {
|
||||
|
||||
public:
|
||||
|
||||
ThreadPool(int howmany, void (*f)(T*))
|
||||
ThreadPool(int howmany, void (*f)(U*,T*), U *obj)
|
||||
{
|
||||
int status;
|
||||
|
||||
u = obj;
|
||||
num_ops = 0;
|
||||
func = f;
|
||||
num_threads = howmany;
|
||||
|
@ -22,6 +22,8 @@
|
||||
#include "common/Logger.h"
|
||||
#include "common/LogType.h"
|
||||
|
||||
#include "common/ThreadPool.h"
|
||||
|
||||
#include <iostream>
|
||||
#include <cassert>
|
||||
#include <errno.h>
|
||||
@ -40,6 +42,7 @@ char *osd_base_path = "./osddata";
|
||||
|
||||
LogType osd_logtype;
|
||||
|
||||
|
||||
OSD::OSD(int id, Messenger *m)
|
||||
{
|
||||
whoami = id;
|
||||
@ -70,13 +73,13 @@ OSD::OSD(int id, Messenger *m)
|
||||
|
||||
monitor->get_notify().insert(MSG_ADDR_MDS(0));
|
||||
|
||||
|
||||
|
||||
// log
|
||||
char name[80];
|
||||
sprintf(name, "osd%02d", whoami);
|
||||
logger = new Logger(name, (LogType*)&osd_logtype);
|
||||
|
||||
// Thread pool
|
||||
threadpool = new ThreadPool<OSD, MOSDOp>(10, (void (*)(OSD*, MOSDOp*))doop, this);
|
||||
}
|
||||
|
||||
OSD::~OSD()
|
||||
@ -241,31 +244,36 @@ void OSD::handle_op(MOSDOp *op)
|
||||
|
||||
// am i the right rg_role?
|
||||
if (0) {
|
||||
repgroup_t rg = op->get_rg();
|
||||
if (op->get_rg_role() == 0) {
|
||||
// PRIMARY
|
||||
repgroup_t rg = op->get_rg();
|
||||
if (op->get_rg_role() == 0) {
|
||||
// PRIMARY
|
||||
|
||||
// verify that we are primary, or acting primary
|
||||
int acting_primary = osdcluster->get_rg_acting_primary( op->get_rg() );
|
||||
if (acting_primary != whoami) {
|
||||
dout(7) << " acting primary is " << acting_primary << ", forwarding" << endl;
|
||||
messenger->send_message(op, MSG_ADDR_OSD(acting_primary), 0);
|
||||
logger->inc("fwd");
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
// REPLICA
|
||||
int my_role = osdcluster->get_rg_role(rg, whoami);
|
||||
|
||||
dout(7) << "rg " << rg << " my_role " << my_role << " wants " << op->get_rg_role() << endl;
|
||||
|
||||
if (my_role != op->get_rg_role()) {
|
||||
assert(0);
|
||||
}
|
||||
}
|
||||
// verify that we are primary, or acting primary
|
||||
int acting_primary = osdcluster->get_rg_acting_primary( op->get_rg() );
|
||||
if (acting_primary != whoami) {
|
||||
dout(7) << " acting primary is " << acting_primary << ", forwarding" << endl;
|
||||
messenger->send_message(op, MSG_ADDR_OSD(acting_primary), 0);
|
||||
logger->inc("fwd");
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
// REPLICA
|
||||
int my_role = osdcluster->get_rg_role(rg, whoami);
|
||||
|
||||
dout(7) << "rg " << rg << " my_role " << my_role << " wants " << op->get_rg_role() << endl;
|
||||
|
||||
if (my_role != op->get_rg_role()) {
|
||||
assert(0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
do_op(op);
|
||||
queue_op(op);
|
||||
// do_op(op);
|
||||
}
|
||||
|
||||
void OSD::queue_op(MOSDOp *op) {
|
||||
threadpool->put_op(op);
|
||||
}
|
||||
|
||||
void OSD::do_op(MOSDOp *op)
|
||||
@ -437,3 +445,7 @@ void OSD::op_stat(MOSDOp *op)
|
||||
logger->inc("stat");
|
||||
delete op;
|
||||
}
|
||||
|
||||
void doop(OSD *u, MOSDOp *p) {
|
||||
u->do_op(p);
|
||||
}
|
||||
|
@ -5,6 +5,7 @@
|
||||
#include "msg/Dispatcher.h"
|
||||
|
||||
#include "common/Mutex.h"
|
||||
#include "common/ThreadPool.h"
|
||||
|
||||
#include <map>
|
||||
using namespace std;
|
||||
@ -44,7 +45,7 @@ class OSD : public Dispatcher {
|
||||
class ObjectStore *store;
|
||||
class HostMonitor *monitor;
|
||||
class Logger *logger;
|
||||
class ThreadPool *threadpool;
|
||||
class ThreadPool<class OSD, class MOSDOp> *threadpool;
|
||||
|
||||
list<class MOSDOp*> waiting_for_osdcluster;
|
||||
|
||||
@ -61,7 +62,12 @@ class OSD : public Dispatcher {
|
||||
// OSDCluster
|
||||
void update_osd_cluster(__uint64_t ocv, bufferlist& blist);
|
||||
|
||||
void queue_op(class MOSDOp *m);
|
||||
void do_op(class MOSDOp *m);
|
||||
static void doop(OSD *o, MOSDOp *op) {
|
||||
o->do_op(op);
|
||||
};
|
||||
|
||||
|
||||
// messages
|
||||
virtual void dispatch(Message *m);
|
||||
|
@ -5,7 +5,7 @@
|
||||
using namespace std;
|
||||
|
||||
#include "common/Mutex.h"
|
||||
#include "osd/ThreadPool.h"
|
||||
#include "common/ThreadPool.h"
|
||||
// #include <thread.h>
|
||||
|
||||
class Op {
|
||||
@ -24,27 +24,43 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
void foo(Op *o)
|
||||
{
|
||||
cout << "Thread "<< pthread_self() << ": " << o->get() << "\n";
|
||||
usleep(1);
|
||||
|
||||
// sched_yield();
|
||||
}
|
||||
void foop(class TP *t, class Op *o);
|
||||
|
||||
int main(int argc, char *argv)
|
||||
{
|
||||
ThreadPool<Op> *t = new ThreadPool<Op>(10, foo);
|
||||
class TP {
|
||||
public:
|
||||
|
||||
for(int i = 0; i < 100; i++) {
|
||||
Op *o = new Op(i);
|
||||
t->put_op(o);
|
||||
void foo(Op *o)
|
||||
{
|
||||
cout << "Thread "<< pthread_self() << ": " << o->get() << "\n";
|
||||
usleep(1);
|
||||
|
||||
// sched_yield();
|
||||
}
|
||||
|
||||
sleep(1);
|
||||
int main(int argc, char *argv)
|
||||
{
|
||||
ThreadPool<TP,Op> *t = new ThreadPool<TP,Op>(10, (void (*)(TP*, Op*))foop, this);
|
||||
|
||||
for(int i = 0; i < 100; i++) {
|
||||
Op *o = new Op(i);
|
||||
t->put_op(o);
|
||||
}
|
||||
|
||||
sleep(1);
|
||||
|
||||
delete(t);
|
||||
|
||||
return 0;
|
||||
}
|
||||
};
|
||||
|
||||
delete(t);
|
||||
|
||||
return 0;
|
||||
void foop(class TP *t, class Op *o) {
|
||||
t->foo(o);
|
||||
}
|
||||
|
||||
int main(int argc, char *argv) {
|
||||
TP t;
|
||||
|
||||
t.main(argc,argv);
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user