*** empty log message ***

git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@170 29311d96-e01e-0410-9327-a35deaab8ce9
This commit is contained in:
sage 2005-04-27 21:15:33 +00:00
parent 26023cdc1a
commit 08b2cd0d30
15 changed files with 360 additions and 43 deletions

View File

@ -3,14 +3,14 @@ CC=g++
#CC=distcc g++
CFLAGS=-g -I. -pg
#CFLAGS=-D __gnu_cxx=std -g -I. #-I/usr/lib/mpi/include -L/usr/lib/mpi/lib
LIBS=
LIBS=-lpthread
MPICC=g++
MPICFLAGS=${CFLAGS} -I/usr/lib/mpi/include -L/usr/lib/mpi/mpi_gnu/lib
MPILIBS= -lelan -lmpi ${LIBS}
#LEAKTRACER=
LEAKTRACER=$(HOME)/lib/LeakTracer.o
LEAKTRACER=
#LEAKTRACER=$(HOME)/lib/LeakTracer.o
SRCS=*.cc */*.cc
OBJS=osd/OSD.o\
@ -46,6 +46,9 @@ import: pmds.o msg/FakeMessenger.o import.cc
pmds: mpitest.cc msg/MPIMessenger.cc pmds.o
${MPICC} ${MPICFLAGS} ${MPILIBS} mpitest.cc msg/MPIMessenger.cc pmds.o -o pmds
singleclient: pmds.o fakesingleclient.o client/Client.o msg/CheesySerializer.o msg/FakeMessenger.o
${CC} ${CFLAGS} ${LIBS} pmds.o client/Client.o msg/FakeMessenger.o msg/CheesySerializer.o fakesingleclient.o ${LEAKTRACER} -o singleclient
clean:
rm *.o */*.o ${TARGETS}

View File

@ -4,6 +4,14 @@ me, soon:
- test marshalling
- symlinks
- fix logging model for data safety
- inodeupdate
- dentryunlink
- dentrylink
- put path to item in log entry
/- export null dentries

View File

@ -1,9 +1,18 @@
// ceph stuff
#include "Client.h"
#include "msg/CheesySerializer.h"
#include "messages/MClientRequest.h"
#include "messages/MClientReply.h"
// unix-ey fs stuff
#include <sys/types.h>
#include <utime.h>
// cons/des
Client::Client(MDCluster *mdc, int id, Messenger *m)
@ -11,6 +20,8 @@ Client::Client(MDCluster *mdc, int id, Messenger *m)
mdcluster = mdc;
whoami = id;
messenger = m;
serial_messenger = new CheesySerializer(m, this);
all_files_closed = false;
tid = 0;
root = 0;
@ -22,6 +33,7 @@ Client::Client(MDCluster *mdc, int id, Messenger *m)
Client::~Client()
{
if (messenger) { delete messenger; messenger = 0; }
if (serial_messenger) { delete serial_messenger; serial_messenger = 0; }
}
@ -36,33 +48,31 @@ int Client::lstat(const char *path, struct stat *stbuf)
{
MClientRequest *req = new MClientRequest(tid++, MDS_OP_STAT, whoami);
MClientReply *reply;
inode_t inode;
vector<c_inode_info*> *trace;
req->set_path(string(path)); //FIXME correct construction of string?
req->set_path(path);
// FIXME where does FUSE maintain user information
req->set_caller_uid(getuid());
req->set_caller_gid(getgid());
reply = messenger->sendrecv(req);
reply = (MClientReply*)serial_messenger->sendrecv(req, MSG_ADDR_MDS(0));
res = reply->get_result();
int res = reply->get_result();
if (res != 0) return res;
//Transfer information from reply to stbuf
trace = reply->get_trace();
inode = trace[trace.size()-1]->inode;
vector<c_inode_info*> trace = reply->get_trace();
inode_t inode = trace[trace.size()-1]->inode;
//stbuf->st_dev =
stbuf->st_ino = inode->ino;
stbuf->st_mode = inode->mode;
stbuf->st_ino = inode.ino;
stbuf->st_mode = inode.mode;
//stbuf->st_nlink =
stbuf->st_uid = inode->uid;
stbuf->st_gid = inode->gid;
stbuf->st_ctime = inode->ctime;
stbuf->st_atime = inode->atime;
stbuf->st_mtime = inode->mtime;
stbuf->st_size = (off_t) inode->size; //FIXME off_t is signed 64 vs size is unsigned 64
stbuf->st_uid = inode.uid;
stbuf->st_gid = inode.gid;
stbuf->st_ctime = inode.ctime;
stbuf->st_atime = inode.atime;
stbuf->st_mtime = inode.mtime;
stbuf->st_size = (off_t) inode.size; //FIXME off_t is signed 64 vs size is unsigned 64
//stbuf->st_blocks =
//stbuf->st_blksize =
//stbuf->st_flags =
@ -77,15 +87,15 @@ int Client::chmod(const char *path, mode_t mode)
MClientRequest *req = new MClientRequest(tid++, MDS_OP_CHMOD, whoami);
MClientReply *reply;
req->set_path(string(path)); //FIXME correct construction of string?
req->set_path(path); //FIXME correct construction of string?
// FIXME where does FUSE maintain user information
req->set_caller_uid(getuid());
req->set_caller_gid(getgid());
req->set_iarg = (int) mode;
req->set_iarg( (int)mode );
reply = messenger->sendrecv(req);
reply = (MClientReply*)serial_messenger->sendrecv(req, MSG_ADDR_MDS(0));
return reply->get_result();
}
@ -94,7 +104,7 @@ int Client::chown(const char *path, uid_t uid, gid_t gid)
MClientRequest *req = new MClientRequest(tid++, MDS_OP_CHOWN, whoami);
MClientReply *reply;
req->set_path(string(path)); //FIXME correct construction of string?
req->set_path(path); //FIXME correct construction of string?
// FIXME where does FUSE maintain user information
req->set_caller_uid(getuid());
@ -102,10 +112,10 @@ int Client::chown(const char *path, uid_t uid, gid_t gid)
//FIXME enforce caller uid rights?
req->set_iarg = (int) uid;
req->set_iarg2 = (int) gid;
req->set_iarg( (int)uid );
req->set_iarg2( (int)gid );
reply = messenger->sendrecv(req);
reply = (MClientReply*)serial_messenger->sendrecv(req, MSG_ADDR_MDS(0));
return reply->get_result();
}
@ -114,7 +124,7 @@ int Client::utime(const char *path, struct utimbuf *buf)
MClientRequest *req = new MClientRequest(tid++, MDS_OP_UTIME, whoami);
MClientReply *reply;
req->set_path(string(path)); //FIXME correct construction of string?
req->set_path(path); //FIXME correct construction of string?
// FIXME where does FUSE maintain user information
req->set_caller_uid(getuid());
@ -122,10 +132,10 @@ int Client::utime(const char *path, struct utimbuf *buf)
//FIXME enforce caller uid rights?
req->set_targ = utimbuf->modtime;
req->set_targ2 = utimbuf->actime;
req->set_targ( buf->modtime );
req->set_targ2( buf->actime );
reply = messenger->sendrecv(req);
reply = (MClientReply*)serial_messenger->sendrecv(req, MSG_ADDR_MDS(0));
return reply->get_result();
}
@ -136,5 +146,7 @@ int Client::utime(const char *path, struct utimbuf *buf)
//
// getdir
/*
typedef int (*fuse_dirfil_t) (fuse_dirh_t h, const char *name, int type,
ino_t ino);
*/

View File

@ -1,11 +1,18 @@
#ifndef __CLIENT_H
#define __CLIENT_H
#include "mds/MDCluster.h"
#include "msg/Message.h"
#include "msgthread.h"
#include "msg/Dispatcher.h"
#include "msg/SerialMessenger.h"
//#include "msgthread.h"
#include "include/types.h"
#include "include/lru.h"
// stl
#include <set>
#include <map>
using namespace std;
@ -27,22 +34,24 @@ using namespace __gnu_cxx;
*/
class Dentry;
class Inode;
class Dir {
public:
Inode *parent_inode; // my inode
hash_map< string, Dentry* > dentries;
Dir(Inode* in) { inode = in; }
Dir(Inode* in) { parent_inode = in; }
};
class Inode {
public:
inodeno_t inode;
inode_t inode; // the actual inode
time_t last_updated;
int ref; // ref count. 1 for each dentry, fh or dir that links to me
Dir *dir; // if i'm a dir.
Dentry *dn; // if i'm linked to a dentry.
void get() { ref++; }
void put() { ref--; assert(ref >= 0); }
@ -76,10 +85,11 @@ struct Fh {
// ========================================================
// client interface
class Client {
class Client : public Dispatcher {
protected:
MDCluster *mdcluster;
Messenger *messenger;
SerialMessenger *serial_messenger;
long tid;
int whoami;
bool all_files_closed;
@ -90,7 +100,7 @@ class Client {
LRU lru; // lru list of Dentry's in our local metadata cache.
// file handles
map<fh_t, Fh*> fh_map;
map<fileh_t, Fh*> fh_map;
// global semaphore/mutex protecting cache+fh structures
@ -128,12 +138,18 @@ class Client {
// link to inode
dn->inode = in;
in->dn = dn;
in->get();
lru.lru_insert_mid(dn); // mid or top?
return dn;
}
void unlink(Dentry *dn) {
Inode *in = dn->inode;
// unlink from inode
dn->inode = 0;
in->dn = 0;
in->put();
// unlink from dir
@ -146,6 +162,9 @@ class Client {
delete dn->dir;
}
dn->dir = 0;
// delete den
lru.lru_remove(dn);
delete dn;
}
@ -166,6 +185,12 @@ class Client {
Client(MDCluster *mdc, int id, Messenger *m);
~Client();
// messaging
void dispatch(Message *m) {
cout << "dispatch not implemented" << endl;
}
// ----------------------
// fs ops.
// these shoud (more or less) mirror the actual system calls.
@ -173,7 +198,7 @@ class Client {
// namespace ops
//?int getdir(const char *path, fuse_dirh_t h, fuse_dirfil_t filler);
int link(const char *existing, const char *new);
int link(const char *existing, const char *newname);
int unlink(const char *path);
int rename(const char *from, const char *to);
@ -183,7 +208,7 @@ class Client {
// symlinks
int readlink(const char *path, char *buf, size_t size);
int symlink(const char *existing, const char *new);
int symlink(const char *existing, const char *newname);
// inode stuff
int lstat(const char *path, struct stat *stbuf);
@ -194,10 +219,10 @@ class Client {
// file ops
int mknod(const char *path, mode_t mode);
int open(const char *path, int mode);
int read(fh_t fh, char *buf, size_t size, off_t offset);
int write(fh_t fh, const char *buf, size_t size, off_t offset);
int truncate(fh_t fh, off_t size);
int fsync(fh_t fh);
int read(fileh_t fh, char *buf, size_t size, off_t offset);
int write(fileh_t fh, const char *buf, size_t size, off_t offset);
int truncate(fileh_t fh, off_t size);
int fsync(fileh_t fh);
};

45
ceph/common/Mutex.h Executable file
View File

@ -0,0 +1,45 @@
/////////////////////////////////////////////////////////////////////
// Written by Phillip Sitbon
// Copyright 2003
//
// Posix/Mutex.h
// - Resource locking mechanism using Posix mutexes
//
/////////////////////////////////////////////////////////////////////
#ifndef _Mutex_Posix_
#define _Mutex_Posix_
#include <pthread.h>
class Mutex
{
mutable pthread_mutex_t M;
void operator=(Mutex &M) {}
Mutex( const Mutex &M ) {}
public:
Mutex()
{
pthread_mutexattr_t attr;
pthread_mutexattr_init(&attr);
pthread_mutexattr_settype(&attr,PTHREAD_MUTEX_RECURSIVE);
pthread_mutex_init(&M,&attr);
pthread_mutexattr_destroy(&attr);
}
virtual ~Mutex()
{ pthread_mutex_unlock(&M); pthread_mutex_destroy(&M); }
int Lock() const
{ return pthread_mutex_lock(&M); }
int Lock_Try() const
{ return pthread_mutex_trylock(&M); }
int Unlock() const
{ return pthread_mutex_unlock(&M); }
};
#endif // !_Mutex_Posix_

View File

@ -49,7 +49,7 @@ md_config_t g_conf = {
client_op_statfs: false,
client_op_stat: 10,
client_op_stat: 100,
client_op_lstat: false,
client_op_utime: 10, // untested
client_op_chmod: 10,

View File

@ -61,6 +61,7 @@ class MClientRequest : public Message {
virtual char *get_type_name() { return "creq"; }
void set_path(string& p) { path.set_path(p); }
void set_path(const char *p) { path.set_path(p); }
void set_caller_uid(int u) { st.caller_uid = u; }
void set_caller_gid(int g) { st.caller_uid = g; }
void set_ino(inodeno_t ino) { st.ino = ino; }

View File

@ -0,0 +1,43 @@
#include "CheesySerializer.h"
#include "Message.h"
#include <iostream>
using namespace std;
void CheesySerializer::dispatch(Message *m)
{
// i better be expecting it
assert(waiting_for_reply);
cout << "dispatch got " << reply << ", waking up waiter" << endl;
reply = m;
waiter.Post();
}
void CheesySerializer::send(Message *m, msg_addr_t dest, int port, int fromport)
{
messenger->send_message(m, dest, port, fromport);
}
Message *CheesySerializer::sendrecv(Message *m, msg_addr_t dest, int port, int fromport)
{
messenger->send_message(m, dest, port, fromport);
waiting_for_reply = true;
waiter.Wait();
return reply;
}
// thread crap
void *cheesyserializer_starter(void *pthis)
{
CheesySerializer *pt = (CheesySerializer*)pthis;
pt->message_thread();
}
void CheesySerializer::message_thread()
{
}

View File

@ -0,0 +1,36 @@
#include "SerialMessenger.h"
#include "Messenger.h"
#include "Dispatcher.h"
#include "Semaphore.h"
class CheesySerializer : public SerialMessenger {
// my state, whatever
Messenger *messenger; // this is how i communicate
Dispatcher *dispatcher; // this is where i send unsolicited messages
bool waiting_for_reply;
Message *reply;
Semaphore waiter;
public:
CheesySerializer(Messenger *msg, Dispatcher *me) {
this->messenger = msg;
this->dispatcher = me;
waiting_for_reply = false;
reply = 0;
}
// i receive my messages here
void dispatch(Message *m);
// my stuff
void start(); // start my thread
void message_thread();
void send(Message *m, msg_addr_t dest, int port=0, int fromport=0); // doesn't block
Message *sendrecv(Message *m, msg_addr_t dest, int port=0, int fromport=0); // blocks for matching reply
};

BIN
ceph/msg/CheesySerializer.o Normal file

Binary file not shown.

View File

@ -19,6 +19,9 @@
using namespace std;
#include "Semaphore.h"
#include "Mutex.h"
#include <pthread.h>
#include "include/config.h"
@ -29,11 +32,49 @@ map<int, FakeMessenger*> directory;
hash_map<int, Logger*> loggers;
LogType fakemsg_logtype;
Semaphore sem;
Semaphore shutdownsem;
bool awake = false;
bool shutdown = false;
pthread_t thread_id;
void *fakemessenger_thread(void *ptr)
{
dout(1) << "thread start" << endl;
while (1) {
awake = false;
sem.Wait();
if (shutdown) break;
fakemessenger_do_loop();
}
dout(1) << "thread finish (i woke up but no messages, bye)" << endl;
shutdownsem.Post();
}
void fakemessenger_startthread() {
pthread_create(&thread_id, NULL, fakemessenger_thread, 0);
}
void fakemessenger_stopthread() {
shutdown = true;
sem.Post();
shutdownsem.Wait();
}
// lame main looper
int fakemessenger_do_loop()
{
dout(1) << "do_loop begin." << endl;
while (1) {
bool didone = false;
@ -77,6 +118,7 @@ int fakemessenger_do_loop()
break;
}
dout(1) << "do_loop end (no more messages)." << endl;
return 0;
}
@ -157,6 +199,13 @@ int FakeMessenger::send_message(Message *m, msg_addr_t dest, int port, int fromp
cout << "no destination " << dest << endl;
assert(0);
}
// wake up loop?
if (!awake) {
dout(1) << "waking up fakemessenger" << endl;
awake = true;
sem.Post();
}
}
int FakeMessenger::wait_message(time_t seconds)

View File

@ -13,7 +13,7 @@ class FakeMessenger : public Messenger {
int whoami;
class Logger *logger;
public:
FakeMessenger(long me);
~FakeMessenger();
@ -27,7 +27,10 @@ class FakeMessenger : public Messenger {
};
int fakemessenger_do_loop();
void fakemessenger_startthread();
void fakemessenger_stopthread();
#endif

35
ceph/msg/RWLock.h Normal file
View File

@ -0,0 +1,35 @@
#ifndef _RWLock_Posix_
#define _RWLock_Posix_
#include <pthread.h>
class RWLock
{
mutable pthread_rwlock_t L;
public:
RWLock() {
pthread_rwlock_init(&L, NULL);
}
virtual ~RWLock() {
pthread_rwlock_unlock(&L);
pthread_rwlock_destroy(&L);
}
void unlock() {
pthread_rwlock_unlock(&L);
}
void get_read() {
pthread_rwlock_rdlock(&L);
}
void put_read() { unlock(); }
void get_write() {
pthread_rwlock_wrlock(&L);
}
void put_write() { unlock(); }
};
#endif // !_Mutex_Posix_

43
ceph/msg/Semaphore.h Executable file
View File

@ -0,0 +1,43 @@
/////////////////////////////////////////////////////////////////////
// Written by Phillip Sitbon
// Copyright 2003
//
// Posix/Semaphore.h
// - Resource counting mechanism
//
/////////////////////////////////////////////////////////////////////
#ifndef _Semaphore_Posix_
#define _Semaphore_Posix_
#include <semaphore.h>
#include <errno.h>
class Semaphore
{
sem_t S;
public:
Semaphore( int init = 0 )
{ sem_init(&S,0,init); }
virtual ~Semaphore()
{ sem_destroy(&S); }
void Wait() const
{ sem_wait((sem_t *)&S); }
int Wait_Try() const
{ return (sem_trywait((sem_t *)&S)?errno:0); }
int Post() const
{ return (sem_post((sem_t *)&S)?errno:0); }
int Value() const
{ int V = -1; sem_getvalue((sem_t *)&S,&V); return V; }
void Reset( int init = 0 )
{ sem_destroy(&S); sem_init(&S,0,init); }
};
#endif // !_Semaphore_Posix_

View File

@ -0,0 +1,14 @@
#ifndef __SERIAL_MESSENGER_H
#define __SERIAL_MESSENGER_H
#include "Dispatcher.h"
#include "Message.h"
class SerialMessenger : public Dispatcher {
public:
virtual void dispatch(Message *m) = 0; // i receive my messages here
virtual void send(Message *m, msg_addr_t dest, int port=0, int fromport=0) = 0; // doesn't block
virtual Message *sendrecv(Message *m, msg_addr_t dest, int port=0, int fromport=0) = 0; // blocks for matching reply
};
#endif