Modified Files:

Makefile client/Client.cc client/Client.h include/buffer.h
	include/bufferlist.h

Buffercache-related changes in client/Client.* and include/buffer*
The buffer cache is still buggy -- compile with -DBUFFERCACHE to enable the buffer-cache code in client/Client.cc.


git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@399 29311d96-e01e-0410-9327-a35deaab8ce9
This commit is contained in:
carlosm 2005-07-05 19:21:26 +00:00
parent 54491e317c
commit fff39240db
5 changed files with 263 additions and 83 deletions

View File

@ -68,13 +68,13 @@ gprof-helper.so: test/gprof-helper.c
import: mds/allmds.o osd/OSD.o msg/FakeMessenger.o import.cc ${COMMON_OBJS}
${CC} ${CFLAGS} ${LIBS} $^ -o $@
singleclient: mds/allmds.o osd/OSD.o fakesingleclient.o client/Client.o msg/FakeMessenger.o fsck.o ${COMMON_OBJS}
singleclient: mds/allmds.o osd/OSD.o fakesingleclient.o client/Client.o client/Buffercache.o msg/FakeMessenger.o fsck.o ${COMMON_OBJS}
${CC} ${CFLAGS} ${LIBS} $^ -o $@
tp: osd/tp.o
${CC} ${CFLAGS} ${LIBS} $^ -o $@
fuseclient: client/Client.o client/fuse.o msg/FakeMessenger.o ${COMMON_OBJS}
fuseclient: client/Client.o client/Buffercache.o client/fuse.o msg/FakeMessenger.o ${COMMON_OBJS}
${CC} ${CFLAGS} ${LIBS} -lfuse $^ -o $@
fakemds: test/fakemds.cc msg/FakeMessenger.o fakeclient/FakeClient.o osd/OSD.o mds/allmds.o ${COMMON_OBJS}
@ -86,25 +86,25 @@ mpitest: test/mpitest.o msg/MPIMessenger.cc mds/allmds.o osd/OSD.o fakeclient/Fa
mttest: test/mttest.cc msg/MTMessenger.cc ${COMMON_OBJS}
${MPICC} ${CFLAGS} ${LIBS} $^ -o $@
mpifuse: mpifuse.cc mds/allmds.o client/Client.o client/SyntheticClient.o osd/OSD.o client/fuse.o msg/MPIMessenger.cc ${COMMON_OBJS}
mpifuse: mpifuse.cc mds/allmds.o client/Client.o client/Buffercache.o client/SyntheticClient.o osd/OSD.o client/fuse.o msg/MPIMessenger.cc ${COMMON_OBJS}
${MPICC} ${CFLAGS} ${LIBS} -lfuse $^ -o $@
tcpfuse: tcpfuse.cc mds/allmds.o client/Client.o client/SyntheticClient.o osd/OSD.o client/fuse.o msg/TCPMessenger.cc ${COMMON_OBJS}
tcpfuse: tcpfuse.cc mds/allmds.o client/Client.o client/Buffercache.o client/SyntheticClient.o osd/OSD.o client/fuse.o msg/TCPMessenger.cc ${COMMON_OBJS}
${MPICC} ${CFLAGS} ${LIBS} -lfuse $^ -o $@
mpisyn: mpisyn.cc mds/allmds.o client/Client.o client/SyntheticClient.o osd/OSD.o msg/MPIMessenger.cc ${COMMON_OBJS}
mpisyn: mpisyn.cc mds/allmds.o client/Client.o client/Buffercache.o client/SyntheticClient.o osd/OSD.o msg/MPIMessenger.cc ${COMMON_OBJS}
${MPICC} ${MPICFLAGS} ${MPILIBS} $^ -o $@
tcpsyn: tcpsyn.cc mds/allmds.o client/Client.o client/SyntheticClient.o osd/OSD.o msg/TCPMessenger.cc ${COMMON_OBJS}
tcpsyn: tcpsyn.cc mds/allmds.o client/Client.o client/Buffercache.o client/SyntheticClient.o osd/OSD.o msg/TCPMessenger.cc ${COMMON_OBJS}
${MPICC} ${MPICFLAGS} ${MPILIBS} $^ -o $@
obfstest: tcpsyn.cc mds/allmds.o client/Client.o client/SyntheticClient.o osd/OSD.cc osd/OBFSStore.o msg/TCPMessenger.cc ${COMMON_OBJS}
obfstest: tcpsyn.cc mds/allmds.o client/Client.o client/Buffercache.o client/SyntheticClient.o osd/OSD.cc osd/OBFSStore.o msg/TCPMessenger.cc ${COMMON_OBJS}
${MPICC} -DUSE_OBFS ${MPICFLAGS} ${MPILIBS} $^ -o $@ ../uofs/uofs.a
fakesyn: fakesyn.cc mds/allmds.o client/Client.o client/SyntheticClient.o osd/OSD.o msg/FakeMessenger.o ${COMMON_OBJS}
fakesyn: fakesyn.cc mds/allmds.o client/Client.o client/Buffercache.o client/SyntheticClient.o osd/OSD.o msg/FakeMessenger.o ${COMMON_OBJS}
${CC} -pg ${CFLAGS} ${LIBS} $^ -o $@
fakefuse: fakefuse.cc mds/allmds.o client/Client.o osd/OSD.o client/fuse.o msg/FakeMessenger.cc ${COMMON_OBJS}
fakefuse: fakefuse.cc mds/allmds.o client/Client.o client/Buffercache.o osd/OSD.o client/fuse.o msg/FakeMessenger.cc ${COMMON_OBJS}
${CC} -pg ${CFLAGS} ${LIBS} -lfuse $^ -o $@
testmpi: test/testmpi.cc msg/MPIMessenger.cc config.o common/Timer.o common/clock.o msg/Messenger.o msg/Dispatcher.o msg/error.o

View File

@ -21,7 +21,7 @@
#include "include/config.h"
#undef dout
#define dout(l) if (l<=g_conf.debug) cout << "client" << whoami << "." << pthread_self() << " "
#define dout(l) if (l<=g_conf.debug) cout << "client" << "." << pthread_self() << " "
@ -286,9 +286,6 @@ Dentry *Client::lookup(filepath& path)
return dn;
}
// -------
MClientReply *Client::make_request(MClientRequest *req)
@ -360,37 +357,111 @@ void Client::dispatch(Message *m)
client_lock.Unlock();
}
/*
* flush inode (write cached) buffers to disk
*/
class C_Client_FileFlushFinish : public Context {
public:
Filecache *fc;
Bufferhead *bh;
C_Client_FileFlushFinish(Filecache *fc, Bufferhead *bh) {
this->fc = fc;
this->bh = bh;
}
void finish(int r) {
bh->flush_finish();
if (fc->dirty_buffers.empty()) {
// wake up flush waiters
for (list<Cond*>::iterator it = fc->waitfor_flushed.begin();
it != fc->waitfor_flushed.end();
it++) {
(*it)->Signal();
}
fc->waitfor_flushed.clear();
}
}
};
int Client::flush_inode_buffers(Inode *in)
{
if (in->inflight_buffers.size()
/* || in->dirty_buffers.size() */) {
dout(7) << "inflight buffers, waiting" << endl;
Cond *cond = new Cond;
in->waitfor_flushed.push_back(cond);
cond->Wait(client_lock);
delete cond;
assert(in->inflight_buffers.empty());
dout(7) << "inflight buffers flushed" << endl;
if (!in->inflight_buffers.empty()) {
dout(7) << "inflight buffers of sync write, waiting" << endl;
Cond *cond = new Cond;
in->waitfor_flushed.push_back(cond);
cond->Wait(client_lock);
delete cond;
assert(in->inflight_buffers.empty());
dout(7) << "inflight buffers flushed" << endl;
#ifdef BUFFERCACHE
} else if (!bc.get_fc(in->inode.ino)->dirty_buffers.empty()) {
dout(7) << "inode " << in->inode.ino << " has dirty buffers" << endl;
Filecache *fc = bc.get_fc(in->inode.ino);
fc->simplify();
for (set<Bufferhead*>::iterator it = fc->dirty_buffers.begin();
it != fc->dirty_buffers.end();
it++) {
(*it)->flush_start();
C_Client_FileFlushFinish *onfinish = new C_Client_FileFlushFinish(fc, *it);
filer->write(in->inode.ino, g_OSD_FileLayout, (*it)->bl.length(), (*it)->offset, (*it)->bl, 0, onfinish);
}
dout(7) << "dirty buffers, waiting" << endl;
fc->wait_for_flush(client_lock);
#endif
} else {
dout(7) << "no inflight buffers" << endl;
}
}
class C_Client_FlushFinish : public Context {
public:
Buffercache *bc;
Bufferhead *bh;
C_Client_FlushFinish(Buffercache *bc, Bufferhead *bh) {
this->bc = bc;
this->bh = bh;
}
void finish(int r) {
bh->flush_finish();
if (bc->dirty_buffers.empty()) {
// wake up flush waiters
for (list<Cond*>::iterator it = bc->waitfor_flushed.begin();
it != bc->waitfor_flushed.end();
it++) {
(*it)->Signal();
}
bc->waitfor_flushed.clear();
}
}
};
int Client::flush_buffers()
{
if (!bc.dirty_buffers.empty()) {
for (set<Bufferhead*>::iterator it = bc.dirty_buffers.begin();
it != bc.dirty_buffers.end();
it++) {
(*it)->flush_start();
C_Client_FlushFinish *onfinish = new C_Client_FlushFinish(&bc, *it);
filer->write((*it)->ino, g_OSD_FileLayout, (*it)->bl.length(), (*it)->offset, (*it)->bl, 0, onfinish);
}
dout(7) << "dirty buffers, waiting" << endl;
Cond cond;
bc.waitfor_flushed.push_back(&cond);
cond.Wait(client_lock);
} else {
dout(7) << "no dirty buffers" << endl;
}
}
/*
* release inode (read cached) buffers from memory
*/
int Client::release_inode_buffers(Inode *in)
{
  // Without BUFFERCACHE defined this only logs; with it, the inode's
  // buffers are handed to bc.release_file().
  dout(2) << "release_inode_buffers IMPLEMENT ME" << endl;
#ifdef BUFFERCACHE
  bc.release_file(in->inode.ino);
#endif
  // Declared int: return a value instead of falling off the end
  // (undefined behaviour).
  return 0;
}
@ -475,8 +546,7 @@ int Client::mount(int mkfs)
if (mkfs) m->set_mkfs(mkfs);
client_lock.Unlock();
MClientMountAck *reply = (MClientMountAck*)messenger->sendrecv(m,
MSG_ADDR_MDS(0), MDS_PORT_SERVER);
MClientMountAck *reply = (MClientMountAck*)messenger->sendrecv(m, MSG_ADDR_MDS(0), MDS_PORT_SERVER);
client_lock.Lock();
assert(reply);
@ -1026,6 +1096,9 @@ int Client::close(fileh_t fh)
// FIXME where does FUSE maintain user information
req->set_caller_uid(getuid());
req->set_caller_gid(getgid());
// Make sure buffers are all clean!
flush_inode_buffers(in);
// take note of the fact that we're mid-close
/* mds may ack our close() after reissuing same fh to another open; remove from
@ -1061,17 +1134,38 @@ public:
Mutex *mutex;
int *rvalue;
bool finished;
C_Client_Cond(Cond *cond, Mutex *mutex, int *rvalue) {
this->cond = cond;
C_Client_Cond(Cond *cond, Mutex *mutex, int *rvalue) {
this->cond = cond;
this->mutex = mutex;
this->rvalue = rvalue;
this->finished = false;
}
void finish(int r) {
//mutex->Lock();
*rvalue = r;
finished = true;
cond->Signal();
//mutex->Unlock();
}
};
class C_Client_MissFinish : public Context {
public:
Bufferhead *bh;
Mutex *mutex;
int *rvalue;
bool finished;
C_Client_MissFinish(Bufferhead *bh, Mutex *mutex, int *rvalue) {
this->bh = bh;
this->mutex = mutex;
this->rvalue = rvalue;
this->finished = false;
}
void finish(int r) {
//mutex->Lock();
*rvalue = r;
*rvalue += r;
finished = true;
cond->Signal();
bh->miss_finish();
//mutex->Unlock();
}
};
@ -1081,6 +1175,7 @@ int Client::read(fileh_t fh, char *buf, size_t size, off_t offset)
{
client_lock.Lock();
dout(7) << "read len: " << size << " off: " << offset << endl;
assert(fh_map.count(fh));
Fh *f = fh_map[fh];
Inode *in = f->inode;
@ -1095,30 +1190,94 @@ int Client::read(fileh_t fh, char *buf, size_t size, off_t offset)
}
if (cond) delete cond;
// determine whether read range overlaps with file
// FIXME: maybe we should stat the file again?
dout(10) << "file size: " << in->inode.size << endl;
if (offset >= in->inode.size) {
client_lock.Unlock();
return 0;
}
if (size > in->inode.size) size = in->inode.size;
int rvalue = 0;
if (0) {
// (some of) read from buffer?
// .... bleh ....
} else {
// issue read
Cond cond;
#ifndef BUFFERCACHE
{
Cond cond;
bufferlist blist; // data will go here
bufferlist blist; // data will go here
C_Client_Cond *onfinish = new C_Client_Cond(&cond, &client_lock, &rvalue);
filer->read(in->inode.ino, g_OSD_FileLayout, size, offset, &blist, onfinish);
cond.Wait(client_lock);
C_Client_Cond *onfinish = new C_Client_Cond(&cond, &client_lock, &rvalue);
filer->read(in->inode.ino, g_OSD_FileLayout, size, offset, &blist, onfinish);
cond.Wait(client_lock);
// copy data into caller's buf
blist.copy(0, blist.length(), buf);
// copy data into caller's buf
blist.copy(0, blist.length(), buf);
}
client_lock.Unlock();
return rvalue;
}
#else
// map buffercache
map<off_t, Bufferhead*> hits, inflight;
map<off_t, Bufferhead*>::iterator curbuf;
map<off_t, size_t> holes;
map<off_t, size_t>::iterator hole;
Filecache *fc = bc.get_fc(in->inode.ino);
curbuf = fc->map_existing(size, offset, hits, inflight, holes);
if (curbuf != fc->buffer_map.end() && hits.count(curbuf->first)) {
// sweet -- we can return stuff immediately: find out how much
dout(7) << "read bc hit" << endl;
rvalue = (int)bc.touch_continuous(hits, size, offset);
assert(rvalue > 0);
rvalue = fc->copy_out((size_t)rvalue, offset, buf);
assert(rvalue > 0);
dout(7) << "read bc hit: immediately returning " << rvalue << " bytes" << endl;
}
// issue reads for holes
int hole_rvalue = 0; //FIXME: don't really need to track rvalue in MissFinish context
for (hole = holes.begin(); hole != holes.end(); hole++) {
dout(7) << "read bc miss" << endl;
off_t hole_offset = hole->first;
size_t hole_size = hole->second;
// insert new bufferhead without allocating buffers (Filer::handle_osd_read_reply allocates them)
Bufferhead *bh = new Bufferhead(in->inode.ino, hole_offset, &bc);
// read into the buffercache: when finished transition state from inflight to clean
bh->miss_start();
C_Client_MissFinish *onfinish = new C_Client_MissFinish(bh, &client_lock, &hole_rvalue);
filer->read(in->inode.ino, g_OSD_FileLayout, hole_size, hole_offset, &(bh->bl), onfinish);
dout(7) << "read bc miss: issued osd read len: " << hole_size << " off: " << hole_offset << endl;
}
if (rvalue == 0) {
// we need to wait for the first buffer
dout(7) << "read bc miss: waiting for first buffer" << endl;
Bufferhead *bh;
if (curbuf == fc->buffer_map.end() && fc->buffer_map.count(offset)) {
dout(10) << "first buffer is currently read in" << endl;
bh = fc->buffer_map[offset];
} else {
dout(10) << "first buffer is either hit or inflight" << endl;
bh = curbuf->second;
}
if (bh->state == BUFHD_STATE_INFLIGHT) {
bh->wait_for_read(client_lock);
}
// buffer is filled -- see how much we can return
hits.clear(); inflight.clear(); holes.clear();
fc->map_existing(size, offset, hits, inflight, holes); // FIXME: overkill
assert(hits.count(bh->offset));
rvalue = bc.touch_continuous(hits, size, offset);
fc->copy_out(rvalue, offset, buf);
dout(7) << "read bc no hit: returned first " << rvalue << " bytes" << endl;
}
#endif
// done!
client_lock.Unlock();
return rvalue;
}
// hack.. see async write() below
class C_Client_WriteBuffer : public Context {
@ -1126,25 +1285,26 @@ public:
Inode *in;
bufferlist *blist;
C_Client_WriteBuffer(Inode *in, bufferlist *blist) {
this->in = in;
this->blist = blist;
this->in = in;
this->blist = blist;
}
void finish(int r) {
in->inflight_buffers.erase(blist);
delete blist;
if (in->inflight_buffers.empty()) {
// wake up flush waiters
for (list<Cond*>::iterator it = in->waitfor_flushed.begin();
it != in->waitfor_flushed.end();
it++) {
(*it)->Signal();
}
in->waitfor_flushed.clear();
}
in->inflight_buffers.erase(blist);
delete blist;
if (in->inflight_buffers.empty()) {
// wake up flush waiters
for (list<Cond*>::iterator it = in->waitfor_flushed.begin();
it != in->waitfor_flushed.end();
it++) {
(*it)->Signal();
}
in->waitfor_flushed.clear();
}
}
};
int Client::write(fileh_t fh, const char *buf, size_t size, off_t offset)
{
client_lock.Lock();
@ -1170,28 +1330,43 @@ int Client::write(fileh_t fh, const char *buf, size_t size, off_t offset)
// buffered write?
if (false && f->caps & CFILE_CAP_WRBUFFER) {
if (f->caps & CFILE_CAP_WRBUFFER) {
// buffered write
dout(10) << "buffered/async write" << endl;
dout(7) << "buffered/async write" << endl;
#ifdef BUFFERCACHE
// map buffercache for writing
map<off_t, Bufferhead*> buffers, inflight;
bc.map_or_alloc(in->inode.ino, size, offset, buffers, inflight);
// wait for inflight buffers
while (!inflight.empty()) {
inflight.begin()->second->wait_for_write(client_lock);
buffers.clear(); inflight.clear();
bc.map_or_alloc(in->inode.ino, size, offset, buffers, inflight); // FIXME: overkill
}
bc.dirty(in->inode.ino, size, offset, buf);
#else
/*
hack for now.. replace this with a real buffer cache
/*
hack for now.. replace this with a real buffer cache
just copy the buffer, send the write off, and return immediately.
flush() will block until all outstanding writes complete.
*/
just copy the buffer, send the write off, and return immediately.
flush() will block until all outstanding writes complete.
*/
bufferlist *blist = new bufferlist;
blist->push_back( new buffer(buf, size, BUFFER_MODE_COPY|BUFFER_MODE_FREE) );
bufferlist *blist = new bufferlist;
blist->push_back( new buffer(buf, size, BUFFER_MODE_COPY|BUFFER_MODE_FREE) );
in->inflight_buffers.insert(blist);
in->inflight_buffers.insert(blist);
Context *onfinish = new C_Client_WriteBuffer( in, blist );
filer->write(in->inode.ino, g_OSD_FileLayout, size, offset, *blist, 0, onfinish);
Context *onfinish = new C_Client_WriteBuffer( in, blist );
filer->write(in->inode.ino, g_OSD_FileLayout, size, offset, *blist, 0, onfinish);
#endif
} else {
// synchronous write
dout(10) << "synchronous write" << endl;
// FIXME: do not bypass buffercache
dout(7) << "synchronous write" << endl;
// create a buffer that refers to *buf, but doesn't try to free it when it's done.
bufferlist blist;

View File

@ -1,6 +1,8 @@
#ifndef __CLIENT_H
#define __CLIENT_H
#include "Buffercache.h"
#include "mds/MDCluster.h"
#include "osd/OSDCluster.h"
@ -240,6 +242,9 @@ class Client : public Dispatcher {
// buffer cache
Buffercache bc;
int flush_buffers(); // flush dirty buffers
int flush_inode_buffers(Inode *in); // flush buffered writes
int release_inode_buffers(Inode *in); // release cached reads

View File

@ -278,7 +278,7 @@ class bufferptr {
assert(len >= 0 && off + len <= _len);
memcpy(dest, c_str() + off, len);
}
void copy_in(int off, int len, char *src) {
void copy_in(int off, int len, const char *src) {
assert(off >= 0 && off <= _len);
assert(len >= 0 && off + len <= _len);
memcpy(c_str() + off, src, len);

View File

@ -161,7 +161,7 @@ class bufferlist {
}
}
void copy_in(int off, int len, char *src) {
void copy_in(int off, int len, const char *src) {
assert(off >= 0);
assert(off + len <= length());