diff --git a/ceph/client/Client.cc b/ceph/client/Client.cc index 7c1ab446609..357bb995334 100644 --- a/ceph/client/Client.cc +++ b/ceph/client/Client.cc @@ -18,7 +18,10 @@ #include "messages/MOSDWrite.h" #include "messages/MOSDWriteReply.h" +#include "osd/Filer.h" +#include "common/Cond.h" +#include "common/Mutex.h" #include "include/config.h" #undef dout @@ -35,22 +38,27 @@ Client::Client(MDCluster *mdc, int id, Messenger *m) mounted = false; - osdcluster = new OSDCluster(); // initially blank.. see mount() + // + all_files_closed = false; + root = 0; + + set_cache_size(g_conf.client_cache_size); // set up messengers messenger = m; messenger->set_dispatcher(this); - all_files_closed = false; - root = 0; - - set_cache_size(g_conf.client_cache_size); + // osd interfaces + osdcluster = new OSDCluster(); // initially blank.. see mount() + filer = new Filer(messenger, osdcluster); } Client::~Client() { if (messenger) { delete messenger; messenger = 0; } + if (filer) { delete filer; filer = 0; } + if (osdcluster) { delete osdcluster; osdcluster = 0; } } @@ -66,6 +74,9 @@ void Client::shutdown() { +// =================== +// metadata cache stuff + // insert inode info into metadata cache @@ -192,13 +203,26 @@ Dentry *Client::lookup(filepath& path) + + // ------------------------ // incoming messages void Client::dispatch(Message *m) { switch (m->get_type()) { - + // osd + case MSG_OSD_READREPLY: + filer->handle_osd_read_reply((MOSDReadReply*)m); + break; + case MSG_OSD_WRITEREPLY: + filer->handle_osd_write_reply((MOSDWriteReply*)m); + break; + case MSG_OSD_OPREPLY: + filer->handle_osd_op_reply((MOSDOpReply*)m); + break; + + // client case MSG_CLIENT_FILECAPS: handle_file_caps((MClientFileCaps*)m); break; @@ -250,8 +274,6 @@ void Client::handle_file_caps(MClientFileCaps *m) // ------------------- // fs ops - - int Client::mount(int mkfs) { assert(!mounted); // caller is confused? @@ -722,60 +744,61 @@ int Client::close(fileh_t fh) } + +// ------------ +// read, write + +// ------------------------ +// blocking osd interface + +class C_Client_Cond : public Context { +public: + Cond *cond; + Mutex *mutex; + int *rvalue; + C_Client_Cond(Cond *cond, Mutex *mutex, int *rvalue) { + this->cond = cond; + this->mutex = mutex; + this->rvalue = rvalue; + } + void finish(int r) { + //cout << "client_cond finish" << endl; + mutex->Lock(); + //cout << "got mutex" << endl; + *rvalue = r; + cond->Signal(); + mutex->Unlock(); + } +}; + + int Client::read(fileh_t fh, char *buf, size_t size, off_t offset) { - assert(fh_map.count(fh)); inodeno_t ino = fh_map[fh]->ino; - + // check current file mode (are we allowed to read, cache, etc.) // *** + // issue async read + Cond cond; + static Mutex mylock; + int rvalue; - // map to object byte ranges - list extents; - osdcluster->file_to_extents( ino, size, offset, - 1, // 1 replica for now - extents ); + Mutex *mutex = &mylock; + mutex->Lock(); + + C_Client_Cond *onfinish = new C_Client_Cond(&cond, mutex, &rvalue); + filer->read(ino, size, offset, buf, onfinish); - // issue reads (serially for now!) - size_t left = size; // sanity check - size_t totalread = 0; + cond.Wait(*mutex); + mutex->Unlock(); - for (list::iterator it = extents.begin(); - it != extents.end(); - it++) { - // what where who - int osd = it->osds[0]; // only 1 replica for now - dout(7) << "reading range from object " << it->oid << " on " << osd << ": len " << it->len << " offset " << it->offset << endl; - assert(it->len <= left); - - // issue read - MOSDRead *req = new MOSDRead(0, // tid - it->oid, - it->len, it->offset); - MOSDReadReply *reply = (MOSDReadReply*)messenger->sendrecv(req, MSG_ADDR_OSD(osd)); - assert(reply); - - // copy data into *buf - size_t readlen = reply->get_len(); - memcpy(buf, reply->get_buffer(), readlen); - - // move on - left -= readlen; - buf += readlen; - totalread += readlen; - - // short read? - if (readlen < it->len) { - dout(7) << "short read, only got " << readlen << " bytes" << endl; - break; // short read. - } - } - - return totalread; + return rvalue; } + + int Client::write(fileh_t fh, const char *buf, size_t size, off_t offset) { dout(7) << "write fh " << fh << " size " << size << " offset " << offset << endl; @@ -791,42 +814,23 @@ int Client::write(fileh_t fh, const char *buf, size_t size, off_t offset) // check current file mode (are we allowed to write, buffer, etc.) // *** + // issue write + Cond cond; + static Mutex mylock; + int rvalue; - // map to object byte ranges - list extents; - osdcluster->file_to_extents( ino, size, offset, - 1, // 1 replica for now - extents ); + Mutex *mutex = &mylock; + mutex->Lock(); + + C_Client_Cond *onfinish = new C_Client_Cond(&cond, mutex, &rvalue); + filer->write(ino, size, offset, buf, 0, onfinish); - // issue writes (serially for now!) - size_t totalwritten = 0; - - for (list::iterator it = extents.begin(); - it != extents.end(); - it++) { - // what where who - int osd = it->osds[0]; // only 1 replica for now - dout(7) << "writing range to object " << it->oid << " on " << osd << ": len " << it->len << " offset " << it->offset << endl; - - // issue write - MOSDWrite *req = new MOSDWrite(0, // tid - it->oid, - it->len, it->offset, - buf, - 0); // no flags - MOSDWriteReply *reply = (MOSDWriteReply*)messenger->sendrecv(req, MSG_ADDR_OSD(osd)); - assert(reply); - - // don't tolerate osd crankiness yet - assert(it->len == reply->get_result()); // ?? - - // move on - buf += it->len; - totalwritten += it->len; - } - - assert(totalwritten == size); + cond.Wait(*mutex); + mutex->Unlock(); + // assume success for now. FIXME. + size_t totalwritten = size; + // extend file? if (totalwritten + offset > f->size) { f->size = totalwritten + offset; diff --git a/ceph/client/Client.h b/ceph/client/Client.h index 1508d5332e2..bc6510d8d35 100644 --- a/ceph/client/Client.h +++ b/ceph/client/Client.h @@ -27,6 +27,8 @@ using namespace std; using namespace __gnu_cxx; +class Filer; + // ============================================ // types for my local metadata cache @@ -121,6 +123,7 @@ class Client : public Dispatcher { OSDCluster *osdcluster; bool mounted; + Filer *filer; // (non-blocking) osd interface // cache map inode_map; @@ -214,6 +217,10 @@ class Client : public Dispatcher { // find dentry based on filepath Dentry *lookup(filepath& path); + + // blocking osd accessors + int blocking_osd_read(inodeno_t ino, size_t len, size_t offset, char* buffer); + int blocking_osd_write(inodeno_t ino, size_t len, size_t offset, const char* buffer); public: Client(MDCluster *mdc, int id, Messenger *m);