mirror of
https://github.com/ceph/ceph
synced 2025-02-21 09:57:26 +00:00
*** empty log message ***
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@182 29311d96-e01e-0410-9327-a35deaab8ce9
This commit is contained in:
parent
2c0a604c68
commit
e24938b12c
@ -5,9 +5,9 @@ CFLAGS=-g -I. -pg -D_FILE_OFFSET_BITS=64
|
||||
#CFLAGS=-D __gnu_cxx=std -g -I. #-I/usr/lib/mpi/include -L/usr/lib/mpi/lib
|
||||
LIBS=-lpthread
|
||||
|
||||
MPICC=g++
|
||||
MPICC=/usr/local/mpich-1.2.6/bin/mpicxx
|
||||
MPICFLAGS=${CFLAGS} -I/usr/local/mpich-1.2.6/include -L/usr/local/mpich-1.2.6/lib
|
||||
MPILIBS= -lelan -lmpi ${LIBS}
|
||||
MPILIBS=-lmpich ${LIBS}
|
||||
|
||||
LEAKTRACER=
|
||||
#LEAKTRACER=$(HOME)/lib/LeakTracer.o
|
||||
@ -31,7 +31,7 @@ OBJS=osd/OSD.o\
|
||||
clock.o\
|
||||
config.o
|
||||
|
||||
TARGETS=test import pmds
|
||||
TARGETS=test import mpitest
|
||||
|
||||
all: depend ${TARGETS}
|
||||
|
||||
@ -43,8 +43,8 @@ test: test.cc msg/FakeMessenger.o pmds.o
|
||||
import: pmds.o msg/FakeMessenger.o import.cc
|
||||
${CC} ${CFLAGS} ${LIBS} pmds.o msg/FakeMessenger.o import.cc ${LEAKTRACER} -o import
|
||||
|
||||
pmds: mpitest.cc msg/MPIMessenger.cc pmds.o
|
||||
${MPICC} ${MPICFLAGS} ${MPILIBS} mpitest.cc msg/MPIMessenger.cc pmds.o -o pmds
|
||||
mpitest: mpitest.o msg/MPIMessenger.cc pmds.o
|
||||
${MPICC} ${CFLAGS} ${MPILIBS} mpitest.o msg/MPIMessenger.cc pmds.o -o mpitest
|
||||
|
||||
singleclient: pmds.o fakesingleclient.o client/Client.o msg/CheesySerializer.o msg/FakeMessenger.o
|
||||
${CC} ${CFLAGS} ${LIBS} pmds.o client/Client.o msg/FakeMessenger.o msg/CheesySerializer.o fakesingleclient.o ${LEAKTRACER} -o singleclient
|
||||
|
@ -1,7 +1,7 @@
|
||||
|
||||
me, soon:
|
||||
- foriegn renames
|
||||
- symlinks --> hard!
|
||||
- symlinks
|
||||
|
||||
- fix logging model for data safety
|
||||
- inodeupdate
|
||||
|
@ -5,6 +5,7 @@
|
||||
|
||||
#include "msg/Message.h"
|
||||
#include "msg/Dispatcher.h"
|
||||
#include "msg/Messenger.h"
|
||||
#include "msg/SerialMessenger.h"
|
||||
|
||||
//#include "msgthread.h"
|
||||
|
@ -12,9 +12,9 @@
|
||||
|
||||
|
||||
md_config_t g_conf = {
|
||||
num_mds: 13,
|
||||
num_mds: 3,
|
||||
num_osd: 10,
|
||||
num_client: 55,
|
||||
num_client: 3,
|
||||
|
||||
osd_cow: false, // crashy? true,
|
||||
|
||||
@ -59,7 +59,7 @@ md_config_t g_conf = {
|
||||
client_op_mknod: 10,
|
||||
client_op_link: false,
|
||||
client_op_unlink: 10,
|
||||
client_op_rename: 100,
|
||||
client_op_rename: 00,
|
||||
|
||||
client_op_mkdir: 10,
|
||||
client_op_rmdir: 10,
|
||||
|
@ -1076,7 +1076,7 @@ void MDCache::make_trace(vector<CDentry*>& trace, CInode *in)
|
||||
}
|
||||
|
||||
|
||||
bool MDCache::request_start(MClientRequest *req,
|
||||
bool MDCache::request_start(Message *req,
|
||||
CInode *ref,
|
||||
vector<CDentry*>& trace)
|
||||
{
|
||||
@ -1097,7 +1097,7 @@ bool MDCache::request_start(MClientRequest *req,
|
||||
}
|
||||
|
||||
|
||||
void MDCache::request_cleanup(MClientRequest *req)
|
||||
void MDCache::request_cleanup(Message *req)
|
||||
{
|
||||
assert(active_requests.count(req) == 1);
|
||||
|
||||
@ -1137,7 +1137,7 @@ void MDCache::request_cleanup(MClientRequest *req)
|
||||
active_requests.erase(req);
|
||||
}
|
||||
|
||||
void MDCache::request_finish(MClientRequest *req)
|
||||
void MDCache::request_finish(Message *req)
|
||||
{
|
||||
dout(7) << "request_finish " << *req << endl;
|
||||
request_cleanup(req);
|
||||
@ -1147,7 +1147,7 @@ void MDCache::request_finish(MClientRequest *req)
|
||||
}
|
||||
|
||||
|
||||
void MDCache::request_forward(MClientRequest *req, int who)
|
||||
void MDCache::request_forward(Message *req, int who)
|
||||
{
|
||||
dout(7) << "request_forward to " << who << " req " << *req << endl;
|
||||
request_cleanup(req);
|
||||
@ -3372,7 +3372,7 @@ void MDCache::handle_lock_dir(MLock *m)
|
||||
|
||||
// DENTRY
|
||||
|
||||
bool MDCache::dentry_xlock_start(CDentry *dn, MClientRequest *m, CInode *ref, bool all_nodes)
|
||||
bool MDCache::dentry_xlock_start(CDentry *dn, Message *m, CInode *ref, bool all_nodes)
|
||||
{
|
||||
dout(7) << "dentry_xlock_start on " << *dn << endl;
|
||||
|
||||
@ -3477,7 +3477,7 @@ bool MDCache::dentry_xlock_start(CDentry *dn, MClientRequest *m, CInode *ref, bo
|
||||
}
|
||||
|
||||
// wait
|
||||
dout(7) << "dentry_xlock_start locking, waitigg for replicas " << endl;
|
||||
dout(7) << "dentry_xlock_start locking, waiting for replicas " << endl;
|
||||
dn->dir->add_waiter(CDIR_WAIT_DNLOCK, dn->name,
|
||||
new C_MDS_RetryRequest(mds, m, ref));
|
||||
return false;
|
||||
@ -3534,22 +3534,39 @@ void MDCache::handle_lock_dn(MLock *m)
|
||||
|
||||
if (LOCK_AC_FOR_AUTH(m->get_action())) {
|
||||
// auth
|
||||
assert(diri && dir);
|
||||
int dauth = dir->dentry_authority(dname);
|
||||
assert(dauth == mds->get_nodeid() ||
|
||||
dir->is_proxy());
|
||||
|
||||
if (dir->is_proxy()) {
|
||||
// fw
|
||||
assert(dauth >= 0);
|
||||
dout(7) << "handle_lock_dn " << m << " " << m->get_ino() << " dname " << dname << " from " << from << ": proxy, fw to " << dauth << endl;
|
||||
mds->messenger->send_message(m,
|
||||
MSG_ADDR_MDS(dauth), MDS_PORT_CACHE,
|
||||
MDS_PORT_CACHE);
|
||||
return;
|
||||
|
||||
// normally we have it always
|
||||
if (diri && dir) {
|
||||
int dauth = dir->dentry_authority(dname);
|
||||
assert(dauth == mds->get_nodeid() ||
|
||||
dir->is_proxy());
|
||||
|
||||
if (dir->is_proxy()) {
|
||||
// fw
|
||||
assert(dauth >= 0);
|
||||
dout(7) << "handle_lock_dn " << m << " " << m->get_ino() << " dname " << dname << " from " << from << ": proxy, fw to " << dauth << endl;
|
||||
mds->messenger->send_message(m,
|
||||
MSG_ADDR_MDS(dauth), MDS_PORT_CACHE,
|
||||
MDS_PORT_CACHE);
|
||||
return;
|
||||
}
|
||||
|
||||
dn = dir->lookup(dname);
|
||||
}
|
||||
|
||||
dn = dir->lookup(dname);
|
||||
// except with.. an xlock request?
|
||||
if (!dn) {
|
||||
assert(m->get_action() == LOCK_AC_REQXLOCK);
|
||||
// send nak
|
||||
dout(7) << "handle_lock_dn reqxlock on " << dname << " in " << *dir << " dne, nak" << endl;
|
||||
MLock *reply = new MLock(LOCK_AC_REQXLOCKNAK, mds->get_nodeid());
|
||||
reply->set_dn(dir->ino(), dname);
|
||||
mds->messenger->send_message(reply,
|
||||
MSG_ADDR_MDS(m->get_asker()), MDS_PORT_CACHE,
|
||||
MDS_PORT_CACHE);
|
||||
delete m;
|
||||
return;
|
||||
}
|
||||
assert(dn);
|
||||
} else {
|
||||
// replica
|
||||
@ -3645,6 +3662,31 @@ void MDCache::handle_lock_dn(MLock *m)
|
||||
}
|
||||
break;
|
||||
|
||||
case LOCK_AC_REQXLOCK:
|
||||
dout(7) << "handle_lock_dn reqxlock on " << *dn << endl;
|
||||
|
||||
// start request?
|
||||
if (!active_requests.count(m)) {
|
||||
vector<CDentry*> trace;
|
||||
if (!request_start(m, dir->inode, trace))
|
||||
return; // waiting for pin
|
||||
}
|
||||
|
||||
// try to xlock!
|
||||
if (!dentry_xlock_start(dn, m, dir->inode, true))
|
||||
return; // waiting for xlock
|
||||
|
||||
{
|
||||
// ACK
|
||||
MLock *reply = new MLock(LOCK_AC_REQXLOCKACK, mds->get_nodeid());
|
||||
reply->set_dn(dir->ino(), dname);
|
||||
mds->messenger->send_message(reply,
|
||||
MSG_ADDR_MDS(m->get_asker()), MDS_PORT_CACHE,
|
||||
MDS_PORT_CACHE);
|
||||
}
|
||||
break;
|
||||
|
||||
|
||||
default:
|
||||
assert(0);
|
||||
}
|
||||
@ -3658,6 +3700,9 @@ void MDCache::handle_lock_dn(MLock *m)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
// ino proxy
|
||||
|
||||
int MDCache::ino_proxy_auth(inodeno_t ino,
|
||||
|
@ -55,7 +55,7 @@ typedef hash_map<inodeno_t, CInode*> inode_map_t;
|
||||
typedef const char* pchar;
|
||||
|
||||
typedef struct {
|
||||
CInode *ref; // reference inode
|
||||
CInode *ref; // reference inode
|
||||
map< CDentry*, vector<CDentry*> > traces; // path pins held
|
||||
} active_request_t;
|
||||
|
||||
@ -98,8 +98,8 @@ class MDCache {
|
||||
|
||||
// active MDS requests
|
||||
public:
|
||||
map<MClientRequest*, active_request_t> active_requests;
|
||||
map<Message*, set<CDentry*> > active_request_xlocks;
|
||||
map<Message*, active_request_t> active_requests;
|
||||
map<Message*, set<CDentry*> > active_request_xlocks;
|
||||
|
||||
|
||||
friend class MDBalancer;
|
||||
@ -178,12 +178,12 @@ class MDCache {
|
||||
void path_unpin(vector<CDentry*>& trace, Message *m);
|
||||
void make_trace(vector<CDentry*>& trace, CInode *in);
|
||||
|
||||
bool request_start(MClientRequest *req,
|
||||
bool request_start(Message *req,
|
||||
CInode *ref,
|
||||
vector<CDentry*>& trace);
|
||||
void request_cleanup(MClientRequest *req);
|
||||
void request_finish(MClientRequest *req);
|
||||
void request_forward(MClientRequest *req, int mds);
|
||||
void request_cleanup(Message *req);
|
||||
void request_finish(Message *req);
|
||||
void request_forward(Message *req, int mds);
|
||||
|
||||
|
||||
// == messages ==
|
||||
@ -335,7 +335,7 @@ class MDCache {
|
||||
|
||||
// dentry
|
||||
bool dentry_xlock_start(CDentry *dn,
|
||||
MClientRequest *m, CInode *ref,
|
||||
Message *m, CInode *ref,
|
||||
bool allnodes=false);
|
||||
void dentry_xlock_finish(CDentry *dn, bool quiet=false);
|
||||
void handle_lock_dn(MLock *m);
|
||||
|
@ -29,8 +29,10 @@
|
||||
#include "messages/MClientRequest.h"
|
||||
#include "messages/MClientReply.h"
|
||||
|
||||
#include "messages/MLock.h"
|
||||
#include "messages/MInodeWriterClosed.h"
|
||||
|
||||
|
||||
#include "events/EInodeUpdate.h"
|
||||
|
||||
#include <errno.h>
|
||||
@ -457,8 +459,28 @@ void MDS::handle_client_request(MClientRequest *req)
|
||||
|
||||
|
||||
|
||||
void MDS::dispatch_request(MClientRequest *req, CInode *ref)
|
||||
void MDS::dispatch_request(Message *m, CInode *ref)
|
||||
{
|
||||
MClientRequest *req = 0;
|
||||
|
||||
// MLock or MClientRequest?
|
||||
/* this is a little weird.
|
||||
client requests and mlocks both initial dentry xlocks, path pins, etc.,
|
||||
and thus both make use of the context C_MDS_RetryRequest.
|
||||
*/
|
||||
switch (m->get_type()) {
|
||||
case MSG_CLIENT_REQUEST:
|
||||
req = (MClientRequest*)m;
|
||||
break;
|
||||
case MSG_MDS_LOCK:
|
||||
mdcache->handle_lock_dn((MLock*)m);
|
||||
break;
|
||||
default:
|
||||
assert(0); // shouldn't get here
|
||||
}
|
||||
|
||||
// MClientRequest.
|
||||
|
||||
// bump popularity
|
||||
balancer->hit_inode(ref, MDS_POP_ANY);
|
||||
|
||||
@ -1468,42 +1490,22 @@ void MDS::handle_client_rename_2(MClientRequest *req,
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
|
||||
locking
|
||||
|
||||
if (!locked && flag=renaming)
|
||||
(maybe) if (!locked && flag=renamingto)
|
||||
|
||||
|
||||
|
||||
basic protocol with replicas:
|
||||
|
||||
> Lock (possibly x2?)
|
||||
< LockAck (possibly x2?)
|
||||
> Rename
|
||||
src ino
|
||||
dst dir
|
||||
either dst ino (is unlinked)
|
||||
or dst name
|
||||
< RenameAck
|
||||
(implicitly unlocks, unlinks, etc.)
|
||||
|
||||
*/
|
||||
|
||||
class C_MDS_RenameFinish : public Context{
|
||||
MDS *mds;
|
||||
MClientRequest *req;
|
||||
CInode *renamedi;
|
||||
public:
|
||||
C_MDS_RenameFinish(MDS *mds, MClientRequest *req) {
|
||||
C_MDS_RenameFinish(MDS *mds, MClientRequest *req, CInode *renamedi) {
|
||||
this->mds = mds;
|
||||
this->req = req;
|
||||
this->renamedi = renamedi;
|
||||
}
|
||||
virtual void finish(int r) {
|
||||
MClientReply *reply = new MClientReply(req, r);
|
||||
|
||||
// include trace?
|
||||
// XXX FIXME?
|
||||
// include trace of renamed inode (so client can update their cache structure)
|
||||
reply->set_trace_dist( renamedi, mds->get_nodeid() );
|
||||
|
||||
mds->messenger->send_message(reply,
|
||||
MSG_ADDR_CLIENT(req->get_client()), 0,
|
||||
@ -1512,8 +1514,6 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
|
||||
void MDS::handle_client_rename_local(MClientRequest *req,
|
||||
CInode *ref,
|
||||
string& srcpath,
|
||||
@ -1528,10 +1528,11 @@ void MDS::handle_client_rename_local(MClientRequest *req,
|
||||
if (true || srcdn->inode->is_dir()) {
|
||||
/* overkill warning: lock w/ everyone for simplicity.
|
||||
i could limit this to cases where something beneath me is exported.
|
||||
could possibly limit the list.
|
||||
main constrained is that, regardless of the order i do the xlocks, and whatever
|
||||
could possibly limit the list. (maybe.)
|
||||
Underlying constraint is that, regardless of the order i do the xlocks, and whatever
|
||||
imports/exports might happen in the process, the destdir _must_ exist on any node
|
||||
importing something beneath me when rename finishes.
|
||||
importing something beneath me when rename finishes, or else mayhem ensues when
|
||||
their import is dangling in the cache.
|
||||
*/
|
||||
dout(7) << "handle_client_rename_local: overkill? doing xlocks with _all_ nodes" << endl;
|
||||
everybody = true;
|
||||
@ -1571,10 +1572,9 @@ void MDS::handle_client_rename_local(MClientRequest *req,
|
||||
dosrc = !dosrc;
|
||||
}
|
||||
|
||||
// we're a go.
|
||||
|
||||
// we're golden (everything is xlocked by use, we rule, etc.)
|
||||
mdcache->file_rename( srcdn, destdn,
|
||||
new C_MDS_RenameFinish(this, req),
|
||||
new C_MDS_RenameFinish(this, req, srcdn->inode),
|
||||
everybody );
|
||||
}
|
||||
|
||||
|
@ -158,7 +158,7 @@ class MDS : public Dispatcher {
|
||||
void handle_client_fstat(MClientRequest *req);
|
||||
|
||||
// requests
|
||||
void dispatch_request(MClientRequest *req, CInode *ref);
|
||||
void dispatch_request(Message *m, CInode *ref);
|
||||
|
||||
// inode request *req, CInode *ref;
|
||||
void handle_client_stat(MClientRequest *req, CInode *ref);
|
||||
@ -237,10 +237,10 @@ class MDS : public Dispatcher {
|
||||
|
||||
class C_MDS_RetryRequest : public Context {
|
||||
MDS *mds;
|
||||
MClientRequest *req;
|
||||
Message *req; // MClientRequest or MLock
|
||||
CInode *ref;
|
||||
public:
|
||||
C_MDS_RetryRequest(MDS *mds, MClientRequest *req, CInode *ref) {
|
||||
C_MDS_RetryRequest(MDS *mds, Message *req, CInode *ref) {
|
||||
assert(ref);
|
||||
this->mds = mds;
|
||||
this->req = req;
|
||||
|
@ -53,13 +53,17 @@ class MClientRequest : public Message {
|
||||
public:
|
||||
MClientRequest() {}
|
||||
MClientRequest(long tid, int op, int client) : Message(MSG_CLIENT_REQUEST) {
|
||||
this->st.tid = tid;
|
||||
set_tid(tid);
|
||||
this->st.op = op;
|
||||
this->st.client = client;
|
||||
this->st.iarg = 0;
|
||||
}
|
||||
virtual char *get_type_name() { return "creq"; }
|
||||
|
||||
void set_tid(long tid) {
|
||||
this->st.tid = tid;
|
||||
}
|
||||
|
||||
void set_path(string& p) { path.set_path(p); }
|
||||
void set_path(const char *p) { path.set_path(p); }
|
||||
void set_caller_uid(int u) { st.caller_uid = u; }
|
||||
|
@ -39,6 +39,10 @@
|
||||
#define LOCK_AC_REQREAD 17
|
||||
#define LOCK_AC_REQWRITE 18
|
||||
|
||||
#define LOCK_AC_REQXLOCK 20
|
||||
#define LOCK_AC_REQXLOCKACK 21
|
||||
#define LOCK_AC_REQXLOCKNAK 21
|
||||
|
||||
#define lock_ac_name(x)
|
||||
|
||||
|
||||
|
@ -1,47 +1,87 @@
|
||||
|
||||
#include "CheesySerializer.h"
|
||||
#include "Message.h"
|
||||
#include "Messenger.h"
|
||||
|
||||
#include <iostream>
|
||||
using namespace std;
|
||||
|
||||
|
||||
// ---------
|
||||
// incoming messages
|
||||
|
||||
void CheesySerializer::dispatch(Message *m)
|
||||
{
|
||||
// i better be expecting it
|
||||
assert(waiting_for_reply);
|
||||
long tid = m->get_tid();
|
||||
|
||||
cout << "serializer: dispatch got " << reply << ", waking up waiter" << endl;
|
||||
reply = m;
|
||||
waiter.Post();
|
||||
lock.Lock();
|
||||
|
||||
// was i expecting it?
|
||||
if (call_sem.count(tid)) {
|
||||
// yes, this is a reply to a pending call.
|
||||
cout << "serializer: dispatch got reply for " << tid << " " << m << endl;
|
||||
call_reply[tid] = m; // set reply
|
||||
call_sem[tid]->Post();
|
||||
lock.Unlock();
|
||||
} else {
|
||||
// no, this is an unsolicited message.
|
||||
lock.Unlock();
|
||||
cout << "serializer: dispatch got unsolicited message" << m << endl;
|
||||
dispatcher->dispatch(m);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// ---------
|
||||
// outgoing messages
|
||||
|
||||
void CheesySerializer::send(Message *m, msg_addr_t dest, int port, int fromport)
|
||||
{
|
||||
// just pass it on to the messenger
|
||||
cout << "serializer: send " << m << endl;
|
||||
messenger->send_message(m, dest, port, fromport);
|
||||
}
|
||||
|
||||
Message *CheesySerializer::sendrecv(Message *m, msg_addr_t dest, int port, int fromport)
|
||||
{
|
||||
cout << "serializer: sendrecv " << m << endl;
|
||||
Semaphore *sem = new Semaphore();
|
||||
|
||||
// make up a transaction number that is unique (to me!)
|
||||
/* NOTE: since request+replies are matched up on tid's alone, it means that
|
||||
two nodes using this mechanism can't do calls of each other or else their
|
||||
tid's might overlap.
|
||||
This should be fine.. only the Client uses this so far!
|
||||
*/
|
||||
long tid = ++last_tid;
|
||||
m->set_tid(tid);
|
||||
|
||||
cout << "serializer: sendrecv sending " << m << " on tid " << tid << endl;
|
||||
|
||||
// add call records
|
||||
lock.Lock();
|
||||
assert(call_sem.count(tid) == 0); // tid should be UNIQUE
|
||||
call_sem[tid] = sem;
|
||||
call_reply[tid] = 0; // no reply yet
|
||||
lock.Unlock();
|
||||
|
||||
// send
|
||||
messenger->send_message(m, dest, port, fromport);
|
||||
waiting_for_reply = true;
|
||||
cout << "serializer: sendrecv waiting " << endl;
|
||||
waiter.Wait();
|
||||
cout << "serializer: sendrecv got " << reply << endl;
|
||||
|
||||
// wait
|
||||
cout << "serializer: sendrecv waiting for reply on tid " << tid << endl;
|
||||
sem->Wait();
|
||||
|
||||
// pick up reply
|
||||
lock.Lock();
|
||||
Message *reply = call_reply[tid];
|
||||
assert(reply);
|
||||
call_reply.erase(tid); // remove from call map
|
||||
call_sem.erase(tid);
|
||||
lock.Unlock();
|
||||
|
||||
delete sem;
|
||||
|
||||
cout << "serializer: sendrecv got reply " << reply << " on tid " << tid << endl;
|
||||
return reply;
|
||||
}
|
||||
|
||||
|
||||
// thread crap
|
||||
void *cheesyserializer_starter(void *pthis)
|
||||
{
|
||||
CheesySerializer *pt = (CheesySerializer*)pthis;
|
||||
pt->message_thread();
|
||||
}
|
||||
|
||||
void CheesySerializer::message_thread()
|
||||
{
|
||||
|
||||
}
|
||||
|
@ -1,36 +1,44 @@
|
||||
#ifndef __CHEESY_MESSENGER_H
|
||||
#define __CHEESY_MESSENGER_H
|
||||
|
||||
#include "Dispatcher.h"
|
||||
#include "Message.h"
|
||||
|
||||
#include "SerialMessenger.h"
|
||||
#include "Messenger.h"
|
||||
#include "Dispatcher.h"
|
||||
|
||||
#include "Semaphore.h"
|
||||
#include "Mutex.h"
|
||||
|
||||
#include <map>
|
||||
using namespace std;
|
||||
|
||||
class CheesySerializer : public SerialMessenger {
|
||||
protected:
|
||||
long last_tid;
|
||||
|
||||
// my state, whatever
|
||||
Messenger *messenger; // this is how i communicate
|
||||
Dispatcher *dispatcher; // this is where i send unsolicited messages
|
||||
|
||||
bool waiting_for_reply;
|
||||
Message *reply;
|
||||
Semaphore waiter;
|
||||
|
||||
Mutex lock; // protect call_sem, call_reply
|
||||
map<long, Semaphore*> call_sem;
|
||||
map<long, Message*> call_reply;
|
||||
|
||||
public:
|
||||
CheesySerializer(Messenger *msg, Dispatcher *me) {
|
||||
this->messenger = msg;
|
||||
this->dispatcher = me;
|
||||
waiting_for_reply = false;
|
||||
reply = 0;
|
||||
last_tid = 1;
|
||||
}
|
||||
|
||||
// i receive my messages here
|
||||
// incoming messages
|
||||
void dispatch(Message *m);
|
||||
|
||||
// my stuff
|
||||
void start(); // start my thread
|
||||
void message_thread();
|
||||
|
||||
void send(Message *m, msg_addr_t dest, int port=0, int fromport=0); // doesn't block
|
||||
Message *sendrecv(Message *m, msg_addr_t dest, int port=0, int fromport=0); // blocks for matching reply
|
||||
// outgoing messages
|
||||
void send(Message *m, msg_addr_t dest,
|
||||
int port=0, int fromport=0); // doesn't block
|
||||
Message *sendrecv(Message *m, msg_addr_t dest,
|
||||
int port=0, int fromport=0); // blocks for matching reply
|
||||
};
|
||||
|
||||
#endif
|
||||
|
Binary file not shown.
@ -6,8 +6,10 @@
|
||||
|
||||
#include <iostream>
|
||||
#include <cassert>
|
||||
#include <ext/hash_map>
|
||||
using namespace std;
|
||||
#include <ext/hash_map>
|
||||
using namespace __gnu_cxx;
|
||||
|
||||
#include <unistd.h>
|
||||
|
||||
#include "mpi.h"
|
||||
|
@ -124,6 +124,12 @@ class Message {
|
||||
}
|
||||
virtual ~Message() {}
|
||||
|
||||
|
||||
// for rpc-type procedural messages
|
||||
virtual long get_tid() { return 0; }
|
||||
virtual void set_tid(long t) { assert(0); } // overload me
|
||||
|
||||
|
||||
// ENVELOPE ----
|
||||
|
||||
// type
|
||||
@ -174,4 +180,6 @@ class Message {
|
||||
};
|
||||
|
||||
|
||||
ostream& operator<<(ostream& out, Message& m);
|
||||
|
||||
#endif
|
||||
|
@ -48,6 +48,19 @@ using namespace std;
|
||||
|
||||
#include "messages/MLock.h"
|
||||
|
||||
|
||||
ostream& operator<<(ostream& out, Message& m)
|
||||
{
|
||||
// some messages define << themselves
|
||||
if (m.get_type() == MSG_CLIENT_REQUEST) return out << *((MClientRequest*)&m);
|
||||
|
||||
// generic
|
||||
return out << "message(type=" << m.get_type() << ")";
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
Message *
|
||||
decode_message(crope& ser)
|
||||
{
|
||||
|
@ -23,6 +23,8 @@ class Messenger {
|
||||
remove_dispatcher();
|
||||
}
|
||||
|
||||
// adminstrative
|
||||
virtual int init(Dispatcher *d) = 0;
|
||||
void set_dispatcher(Dispatcher *d) {
|
||||
dispatcher = d;
|
||||
}
|
||||
@ -31,14 +33,15 @@ class Messenger {
|
||||
dispatcher = 0;
|
||||
return t;
|
||||
}
|
||||
|
||||
// ...
|
||||
virtual int init(Dispatcher *d) = 0;
|
||||
virtual int shutdown() = 0;
|
||||
virtual void done() {}
|
||||
|
||||
// message interface
|
||||
virtual int send_message(Message *m, msg_addr_t dest, int port=0, int fromport=0) = 0;
|
||||
virtual int wait_message(time_t seconds) = 0;
|
||||
virtual void done() {}
|
||||
|
||||
|
||||
// misc crap
|
||||
/*
|
||||
virtual int loop() {
|
||||
while (1) {
|
||||
if (wait_message(0) < 0)
|
||||
@ -49,9 +52,9 @@ class Messenger {
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
*/
|
||||
|
||||
|
||||
// queue
|
||||
// incoming queue
|
||||
Message *get_message() {
|
||||
if (incoming.size() > 0) {
|
||||
cout << incoming.size() << " messages, taking first" << endl;
|
||||
@ -61,7 +64,6 @@ class Messenger {
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
bool queue_incoming(Message *m) {
|
||||
incoming.push_back(m);
|
||||
return true;
|
||||
@ -69,15 +71,10 @@ class Messenger {
|
||||
int num_incoming() {
|
||||
return incoming.size();
|
||||
}
|
||||
|
||||
|
||||
void dispatch(Message *m) {
|
||||
dispatcher->dispatch(m);
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
};
|
||||
|
||||
|
||||
|
@ -3,11 +3,12 @@
|
||||
#include <sys/stat.h>
|
||||
#include <iostream>
|
||||
#include <string>
|
||||
using namespace std;
|
||||
|
||||
#include "mds/MDCluster.h"
|
||||
#include "mds/MDS.h"
|
||||
#include "osd/OSD.h"
|
||||
#include "client/Client.h"
|
||||
#include "fakeclient/FakeClient.h"
|
||||
|
||||
#include "mds/MDCache.h"
|
||||
#include "mds/MDStore.h"
|
||||
@ -16,7 +17,6 @@
|
||||
|
||||
#include "messages/MPing.h"
|
||||
|
||||
using namespace std;
|
||||
|
||||
__uint64_t ino = 1;
|
||||
|
||||
@ -55,10 +55,10 @@ int main(int argc, char **argv) {
|
||||
}
|
||||
|
||||
// create clients
|
||||
Client *client[NUMCLIENT];
|
||||
FakeClient *client[NUMCLIENT];
|
||||
for (int i=0; i<NUMCLIENT; i++) {
|
||||
if (myrank != MPI_DEST_TO_RANK(MSG_ADDR_CLIENT(i),world)) continue;
|
||||
client[i] = new Client(mdc, i, new MPIMessenger(MSG_ADDR_CLIENT(i)), g_conf.client_requests);
|
||||
client[i] = new FakeClient(mdc, i, new MPIMessenger(MSG_ADDR_CLIENT(i)), g_conf.client_requests);
|
||||
client[i]->init();
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user