mirror of
https://github.com/ceph/ceph
synced 2025-02-21 01:47:25 +00:00
*** empty log message ***
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@226 29311d96-e01e-0410-9327-a35deaab8ce9
This commit is contained in:
parent
a6e5fa53dd
commit
cb323e6209
@ -1,8 +1,15 @@
|
||||
|
||||
#include "Filer.h"
|
||||
|
||||
#include "messages/MOSDRead.h"
|
||||
#include "messages/MOSDReadReply.h"
|
||||
#include "messages/MOSDWrite.h"
|
||||
#include "messages/MOSDWriteReply.h"
|
||||
#include "messages/MOSDOp.h"
|
||||
|
||||
|
||||
// read
|
||||
|
||||
// OSD fun ------------------------
|
||||
|
||||
int
|
||||
Filer::read(inodeno_t ino,
|
||||
@ -28,9 +35,10 @@ Filer::read(inodeno_t ino,
|
||||
it != extents.end();
|
||||
it++) {
|
||||
int r = 0; // pick a replica
|
||||
|
||||
last_tid++;
|
||||
|
||||
// issue read
|
||||
MOSDRead *m = new MOSDRead(++last_tid, it->oid, it->len, it->offset);
|
||||
MOSDRead *m = new MOSDRead(last_tid, it->oid, it->len, it->offset);
|
||||
messenger->send_message(m, MSG_ADDR_OSD(it->osd[r]), 0);
|
||||
|
||||
// add to gather set
|
||||
@ -40,11 +48,9 @@ Filer::read(inodeno_t ino,
|
||||
}
|
||||
|
||||
|
||||
int Filer::handle_osd_read_reply(MOSDReadReply *m)
|
||||
int
|
||||
Filer::handle_osd_read_reply(MOSDReadReply *m)
|
||||
{
|
||||
MOSDReadReply *m = (MOSDReadReply*)rawm;
|
||||
|
||||
assert(m->get_result() >= 0);
|
||||
|
||||
// get pio
|
||||
tid_t = m->get_tid();
|
||||
@ -56,18 +62,34 @@ int Filer::handle_osd_read_reply(MOSDReadReply *m)
|
||||
|
||||
if (p->outstanding_ops.empty()) {
|
||||
// all done
|
||||
p->buffer->clear();
|
||||
|
||||
// assemble result
|
||||
p->buffer->append( m->get_buffer() );
|
||||
p->buffer = 0;
|
||||
long result = m->get_len();
|
||||
|
||||
p->buffer->clear();
|
||||
if (p->finished_reads.empty()) {
|
||||
// single read, easy
|
||||
p->buffer->append( m->get_buffer() );
|
||||
} else {
|
||||
// multiple reads
|
||||
crope *partial = new crope;
|
||||
*partial = m->get_buffer();
|
||||
p->finished_reads[ m->get_offset() ] = partial;
|
||||
|
||||
for (map<size_t, crope*>::iterator it = p->finished_reads.begin();
|
||||
it != p->finished_reads.end();
|
||||
it++) {
|
||||
p->buffer->append( *it->second ); // FIXME: fill in holes!
|
||||
delete it->second;
|
||||
}
|
||||
}
|
||||
|
||||
long result = p->buffer->length();
|
||||
|
||||
// finish, clean up
|
||||
Context *onfinish = p->onfinish;
|
||||
delete p; // del pendingOsdRead_t
|
||||
delete m; // del message
|
||||
delete m;
|
||||
|
||||
// done
|
||||
if (onfinish) {
|
||||
onfinish->finish(result);
|
||||
delete onfinish;
|
||||
@ -76,51 +98,78 @@ int Filer::handle_osd_read_reply(MOSDReadReply *m)
|
||||
// partial result
|
||||
crope *partial = new crope;
|
||||
*partial = m->get_buffer();
|
||||
p->finished_reads[ m->get_offset() ] = buffer;
|
||||
p->finished_reads[ m->get_offset() ] = partial;
|
||||
delete m;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
// -- osd_write
|
||||
// write
|
||||
|
||||
int
|
||||
MDS::osd_write(int osd,
|
||||
object_t oid,
|
||||
size_t len,
|
||||
size_t offset,
|
||||
crope& buffer,
|
||||
int flags,
|
||||
Context *c)
|
||||
Filer::write(inodeno_t ino,
|
||||
size_t len,
|
||||
size_t offset,
|
||||
crope& buffer,
|
||||
int flags,
|
||||
Context *onfinish)
|
||||
{
|
||||
osd_last_tid++;
|
||||
last_tid++;
|
||||
int num_rep = 1;
|
||||
|
||||
MOSDWrite *m = new MOSDWrite(osd_last_tid,
|
||||
oid,
|
||||
len, offset,
|
||||
buffer, flags);
|
||||
osd_writes[ osd_last_tid ] = c;
|
||||
// pending write record
|
||||
PendingOSDWrite_t *p = new PendingOSDWrite_t;
|
||||
p->onfinish = onfinish;
|
||||
|
||||
// find data
|
||||
list<OSDExtent> extents;
|
||||
osdcluster->file_to_extents(ino, len, offset, num_rep, extents);
|
||||
|
||||
dout(10) << "sending MOSDWrite " << m->get_type() << endl;
|
||||
messenger->send_message(m,
|
||||
MSG_ADDR_OSD(osd),
|
||||
0, MDS_PORT_MAIN);
|
||||
}
|
||||
dout(7) << "osd write in " << extents.size() << " object extents on " << num_rep << " replicas" << endl;
|
||||
|
||||
size_t off = offset; // ptr into buffer
|
||||
|
||||
int MDS::osd_write_finish(Message *rawm)
|
||||
{
|
||||
MOSDWriteReply *m = (MOSDWriteReply *)rawm;
|
||||
|
||||
Context *c = osd_writes[ m->get_tid() ];
|
||||
osd_writes.erase(m->get_tid());
|
||||
|
||||
long result = m->get_result();
|
||||
delete m;
|
||||
|
||||
dout(10) << " finishing osd_write" << endl;
|
||||
|
||||
if (c) {
|
||||
c->finish(result);
|
||||
delete c;
|
||||
for (list<OSDExtent>::iterator it = extents.begin();
|
||||
it != extents.end();
|
||||
it++) {
|
||||
int r = 0; // pick a replica
|
||||
last_tid++;
|
||||
|
||||
// issue write
|
||||
crope partial = buffer.substr(off, it->len);
|
||||
off += it->len;
|
||||
|
||||
MOSDWrite *m = new MOSDWrite(last_tid, it->oid, it->len, it->offset,
|
||||
partial);
|
||||
messenger->send_message(m, MSG_ADDR_OSD(it->osd[r]), 0);
|
||||
|
||||
// add to gather set
|
||||
p->outstanding_ops.insert(last_tid);
|
||||
osd_writes[last_tid] = p;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
int Filer::handle_osd_write_reply(MOSDWriteReply *m)
|
||||
{
|
||||
// get pio
|
||||
tid_t = m->get_tid();
|
||||
PendingOSDWrite_t *p = osd_writes[ tid ];
|
||||
osd_writes.erase( tid );
|
||||
|
||||
// our op finished
|
||||
p->outstanding_ops.erase(tid);
|
||||
|
||||
if (p->outstanding_ops.empty()) {
|
||||
// all done
|
||||
Context *onfinish = p->onfinish;
|
||||
delete p;
|
||||
|
||||
if (onfinish) {
|
||||
onfinish->finish(0);
|
||||
delete onfinish;
|
||||
}
|
||||
}
|
||||
delete m;
|
||||
}
|
||||
|
@ -2,6 +2,8 @@
|
||||
#define __FILER_H
|
||||
|
||||
/*** Filer
|
||||
*
|
||||
* client/mds interface to access "files" in OSD cluster
|
||||
*
|
||||
* generic non-blocking interface for reading/writing to osds, using
|
||||
* the file-to-object mappings defined by OSDCluster.
|
||||
@ -13,6 +15,10 @@
|
||||
|
||||
#include "OSDCluster.h"
|
||||
|
||||
#include <set>
|
||||
#include <map>
|
||||
using namespace std;
|
||||
|
||||
#include <ext/hash_map>
|
||||
#include <ext/rope>
|
||||
using namespace __gnu_cxx;
|
||||
@ -32,14 +38,14 @@ typedef struct {
|
||||
Context *onfinish;
|
||||
} PendingOSDWrite_t;
|
||||
|
||||
class Filer {
|
||||
|
||||
OSDCluster *osdcluster; // what osds am i dealing with?
|
||||
|
||||
|
||||
class Filer {
|
||||
OSDCluster *osdcluster; // what osds am i dealing with?
|
||||
|
||||
__uint64_t osd_last_tid;
|
||||
__uint64_t last_tid;
|
||||
hash_map<tid_t,PendingOSDRead_t*> osd_reads;
|
||||
hash_map<tid_t,Context*> osd_writes;
|
||||
hash_map<tid_t,PendingOSDWrite_t*> osd_writes;
|
||||
|
||||
public:
|
||||
Filer(OSDCluster *osdcluster);
|
||||
@ -54,8 +60,8 @@ class Filer {
|
||||
crope *buffer,
|
||||
Context *c);
|
||||
protected:
|
||||
int read_finish(Message *m);
|
||||
|
||||
void handle_osd_read_reply(class MOSDReadReply *m);
|
||||
|
||||
public:
|
||||
int write(inodeno_t ino,
|
||||
size_t len,
|
||||
@ -64,8 +70,12 @@ class Filer {
|
||||
int flags,
|
||||
Context *c);
|
||||
protected:
|
||||
int write_finish(Message *m);
|
||||
void handle_osd_write_reply(class MOSDWriteReply *m);
|
||||
|
||||
|
||||
public:
|
||||
int zero(inodeno_t ino, size_t len, size_t offset, Context *c); // delete, if len==offset==0
|
||||
|
||||
};
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user