mirror of https://github.com/ceph/ceph
use Filer for reads/writes
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@256 29311d96-e01e-0410-9327-a35deaab8ce9
This commit is contained in:
parent
4275d6500d
commit
eb2674f6a3
|
@ -18,7 +18,10 @@
|
|||
#include "messages/MOSDWrite.h"
|
||||
#include "messages/MOSDWriteReply.h"
|
||||
|
||||
#include "osd/Filer.h"
|
||||
|
||||
#include "common/Cond.h"
|
||||
#include "common/Mutex.h"
|
||||
|
||||
#include "include/config.h"
|
||||
#undef dout
|
||||
|
@ -35,22 +38,27 @@ Client::Client(MDCluster *mdc, int id, Messenger *m)
|
|||
|
||||
mounted = false;
|
||||
|
||||
osdcluster = new OSDCluster(); // initially blank.. see mount()
|
||||
//
|
||||
all_files_closed = false;
|
||||
root = 0;
|
||||
|
||||
set_cache_size(g_conf.client_cache_size);
|
||||
|
||||
// set up messengers
|
||||
messenger = m;
|
||||
messenger->set_dispatcher(this);
|
||||
|
||||
all_files_closed = false;
|
||||
root = 0;
|
||||
|
||||
set_cache_size(g_conf.client_cache_size);
|
||||
// osd interfaces
|
||||
osdcluster = new OSDCluster(); // initially blank.. see mount()
|
||||
filer = new Filer(messenger, osdcluster);
|
||||
}
|
||||
|
||||
|
||||
Client::~Client()
|
||||
{
|
||||
if (messenger) { delete messenger; messenger = 0; }
|
||||
if (filer) { delete filer; filer = 0; }
|
||||
if (osdcluster) { delete osdcluster; osdcluster = 0; }
|
||||
}
|
||||
|
||||
|
||||
|
@ -66,6 +74,9 @@ void Client::shutdown() {
|
|||
|
||||
|
||||
|
||||
// ===================
|
||||
// metadata cache stuff
|
||||
|
||||
|
||||
// insert inode info into metadata cache
|
||||
|
||||
|
@ -192,13 +203,26 @@ Dentry *Client::lookup(filepath& path)
|
|||
|
||||
|
||||
|
||||
|
||||
|
||||
// ------------------------
|
||||
// incoming messages
|
||||
|
||||
void Client::dispatch(Message *m)
|
||||
{
|
||||
switch (m->get_type()) {
|
||||
|
||||
// osd
|
||||
case MSG_OSD_READREPLY:
|
||||
filer->handle_osd_read_reply((MOSDReadReply*)m);
|
||||
break;
|
||||
case MSG_OSD_WRITEREPLY:
|
||||
filer->handle_osd_write_reply((MOSDWriteReply*)m);
|
||||
break;
|
||||
case MSG_OSD_OPREPLY:
|
||||
filer->handle_osd_op_reply((MOSDOpReply*)m);
|
||||
break;
|
||||
|
||||
// client
|
||||
case MSG_CLIENT_FILECAPS:
|
||||
handle_file_caps((MClientFileCaps*)m);
|
||||
break;
|
||||
|
@ -250,8 +274,6 @@ void Client::handle_file_caps(MClientFileCaps *m)
|
|||
// -------------------
|
||||
// fs ops
|
||||
|
||||
|
||||
|
||||
int Client::mount(int mkfs)
|
||||
{
|
||||
assert(!mounted); // caller is confused?
|
||||
|
@ -722,60 +744,61 @@ int Client::close(fileh_t fh)
|
|||
}
|
||||
|
||||
|
||||
|
||||
// ------------
|
||||
// read, write
|
||||
|
||||
// ------------------------
|
||||
// blocking osd interface
|
||||
|
||||
class C_Client_Cond : public Context {
|
||||
public:
|
||||
Cond *cond;
|
||||
Mutex *mutex;
|
||||
int *rvalue;
|
||||
C_Client_Cond(Cond *cond, Mutex *mutex, int *rvalue) {
|
||||
this->cond = cond;
|
||||
this->mutex = mutex;
|
||||
this->rvalue = rvalue;
|
||||
}
|
||||
void finish(int r) {
|
||||
//cout << "client_cond finish" << endl;
|
||||
mutex->Lock();
|
||||
//cout << "got mutex" << endl;
|
||||
*rvalue = r;
|
||||
cond->Signal();
|
||||
mutex->Unlock();
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
int Client::read(fileh_t fh, char *buf, size_t size, off_t offset)
|
||||
{
|
||||
|
||||
assert(fh_map.count(fh));
|
||||
inodeno_t ino = fh_map[fh]->ino;
|
||||
|
||||
|
||||
// check current file mode (are we allowed to read, cache, etc.)
|
||||
// ***
|
||||
|
||||
// issue async read
|
||||
Cond cond;
|
||||
static Mutex mylock;
|
||||
int rvalue;
|
||||
|
||||
// map to object byte ranges
|
||||
list<OSDExtent> extents;
|
||||
osdcluster->file_to_extents( ino, size, offset,
|
||||
1, // 1 replica for now
|
||||
extents );
|
||||
Mutex *mutex = &mylock;
|
||||
mutex->Lock();
|
||||
|
||||
C_Client_Cond *onfinish = new C_Client_Cond(&cond, mutex, &rvalue);
|
||||
filer->read(ino, size, offset, buf, onfinish);
|
||||
|
||||
// issue reads (serially for now!)
|
||||
size_t left = size; // sanity check
|
||||
size_t totalread = 0;
|
||||
cond.Wait(*mutex);
|
||||
mutex->Unlock();
|
||||
|
||||
for (list<OSDExtent>::iterator it = extents.begin();
|
||||
it != extents.end();
|
||||
it++) {
|
||||
// what where who
|
||||
int osd = it->osds[0]; // only 1 replica for now
|
||||
dout(7) << "reading range from object " << it->oid << " on " << osd << ": len " << it->len << " offset " << it->offset << endl;
|
||||
assert(it->len <= left);
|
||||
|
||||
// issue read
|
||||
MOSDRead *req = new MOSDRead(0, // tid
|
||||
it->oid,
|
||||
it->len, it->offset);
|
||||
MOSDReadReply *reply = (MOSDReadReply*)messenger->sendrecv(req, MSG_ADDR_OSD(osd));
|
||||
assert(reply);
|
||||
|
||||
// copy data into *buf
|
||||
size_t readlen = reply->get_len();
|
||||
memcpy(buf, reply->get_buffer(), readlen);
|
||||
|
||||
// move on
|
||||
left -= readlen;
|
||||
buf += readlen;
|
||||
totalread += readlen;
|
||||
|
||||
// short read?
|
||||
if (readlen < it->len) {
|
||||
dout(7) << "short read, only got " << readlen << " bytes" << endl;
|
||||
break; // short read.
|
||||
}
|
||||
}
|
||||
|
||||
return totalread;
|
||||
return rvalue;
|
||||
}
|
||||
|
||||
|
||||
|
||||
int Client::write(fileh_t fh, const char *buf, size_t size, off_t offset)
|
||||
{
|
||||
dout(7) << "write fh " << fh << " size " << size << " offset " << offset << endl;
|
||||
|
@ -791,42 +814,23 @@ int Client::write(fileh_t fh, const char *buf, size_t size, off_t offset)
|
|||
// check current file mode (are we allowed to write, buffer, etc.)
|
||||
// ***
|
||||
|
||||
// issue write
|
||||
Cond cond;
|
||||
static Mutex mylock;
|
||||
int rvalue;
|
||||
|
||||
// map to object byte ranges
|
||||
list<OSDExtent> extents;
|
||||
osdcluster->file_to_extents( ino, size, offset,
|
||||
1, // 1 replica for now
|
||||
extents );
|
||||
Mutex *mutex = &mylock;
|
||||
mutex->Lock();
|
||||
|
||||
C_Client_Cond *onfinish = new C_Client_Cond(&cond, mutex, &rvalue);
|
||||
filer->write(ino, size, offset, buf, 0, onfinish);
|
||||
|
||||
// issue writes (serially for now!)
|
||||
size_t totalwritten = 0;
|
||||
|
||||
for (list<OSDExtent>::iterator it = extents.begin();
|
||||
it != extents.end();
|
||||
it++) {
|
||||
// what where who
|
||||
int osd = it->osds[0]; // only 1 replica for now
|
||||
dout(7) << "writing range to object " << it->oid << " on " << osd << ": len " << it->len << " offset " << it->offset << endl;
|
||||
|
||||
// issue write
|
||||
MOSDWrite *req = new MOSDWrite(0, // tid
|
||||
it->oid,
|
||||
it->len, it->offset,
|
||||
buf,
|
||||
0); // no flags
|
||||
MOSDWriteReply *reply = (MOSDWriteReply*)messenger->sendrecv(req, MSG_ADDR_OSD(osd));
|
||||
assert(reply);
|
||||
|
||||
// don't tolerate osd crankiness yet
|
||||
assert(it->len == reply->get_result()); // ??
|
||||
|
||||
// move on
|
||||
buf += it->len;
|
||||
totalwritten += it->len;
|
||||
}
|
||||
|
||||
assert(totalwritten == size);
|
||||
cond.Wait(*mutex);
|
||||
mutex->Unlock();
|
||||
|
||||
// assume success for now. FIXME.
|
||||
size_t totalwritten = size;
|
||||
|
||||
// extend file?
|
||||
if (totalwritten + offset > f->size) {
|
||||
f->size = totalwritten + offset;
|
||||
|
|
|
@ -27,6 +27,8 @@ using namespace std;
|
|||
using namespace __gnu_cxx;
|
||||
|
||||
|
||||
class Filer;
|
||||
|
||||
|
||||
// ============================================
|
||||
// types for my local metadata cache
|
||||
|
@ -121,6 +123,7 @@ class Client : public Dispatcher {
|
|||
OSDCluster *osdcluster;
|
||||
bool mounted;
|
||||
|
||||
Filer *filer; // (non-blocking) osd interface
|
||||
|
||||
// cache
|
||||
map<inodeno_t, Inode*> inode_map;
|
||||
|
@ -214,6 +217,10 @@ class Client : public Dispatcher {
|
|||
|
||||
// find dentry based on filepath
|
||||
Dentry *lookup(filepath& path);
|
||||
|
||||
// blocking osd accessors
|
||||
int blocking_osd_read(inodeno_t ino, size_t len, size_t offset, char* buffer);
|
||||
int blocking_osd_write(inodeno_t ino, size_t len, size_t offset, const char* buffer);
|
||||
|
||||
public:
|
||||
Client(MDCluster *mdc, int id, Messenger *m);
|
||||
|
|
Loading…
Reference in New Issue