uclient: handle short sync reads vs eof

This commit is contained in:
Sage Weil 2009-08-13 15:30:14 -07:00
parent 772d49e744
commit f021e18a76
2 changed files with 210 additions and 5 deletions

View File

@ -3799,8 +3799,36 @@ void Client::unlock_fh_pos(Fh *f)
}
//
//char *hackbuf = 0;
int Client::_get_caps(Inode *in, int need, int want, int *got, loff_t endoff)
{
while (1) {
if (endoff > 0) {
// ...
}
int implemented;
int have = in->caps_issued(&implemented);
if ((have & need) == need) {
int butnot = want & ~(have & need);
int revoking = implemented & ~have;
dout(10) << "get_caps " << *in << " have " << ccap_string(have)
<< " need " << ccap_string(need) << " want " << ccap_string(want)
<< " but not " << ccap_string(butnot) << " revoking " << ccap_string(revoking)
<< dendl;
if ((revoking & butnot) == 0) {
*got = need | (have & want);
in->get_cap_ref(need);
return 0;
}
}
// wait
dout(10) << "get_caps " << *in << " waiting" << dendl;
wait_on_list(in->waitfor_caps);
}
}
// blocking osd interface
@ -3829,6 +3857,13 @@ int Client::_read(Fh *f, __s64 offset, __u64 size, bufferlist *bl)
{
Inode *in = f->inode;
//bool lazy = f->mode == CEPH_FILE_MODE_LAZY;
int got;
int r = _get_caps(in, CEPH_CAP_FILE_RD, CEPH_CAP_FILE_CACHE, &got, -1);
if (r < 0)
return r;
bool movepos = false;
if (offset < 0) {
lock_fh_pos(f);
@ -3836,7 +3871,168 @@ int Client::_read(Fh *f, __s64 offset, __u64 size, bufferlist *bl)
movepos = true;
}
bool lazy = f->mode == CEPH_FILE_MODE_LAZY;
if (got & CEPH_CAP_FILE_CACHE)
r = _read_async(f, offset, size, bl);
else
r = _read_sync(f, offset, size, bl);
if (movepos) {
// adjust fd pos
f->pos = offset+bl->length();
unlock_fh_pos(f);
}
// adjust readahead state
if (f->last_pos != offset) {
f->nr_consec_read = f->consec_read_bytes = 0;
} else {
f->nr_consec_read++;
}
f->consec_read_bytes += bl->length();
dout(10) << "readahead nr_consec_read " << f->nr_consec_read
<< " for " << f->consec_read_bytes << " bytes"
<< " .. last_pos " << f->last_pos << " .. offset " << offset
<< dendl;
f->last_pos = offset+bl->length();
// done!
put_cap_ref(in, got);
return r;
}
int Client::_read_async(Fh *f, __u64 off, __u64 len, bufferlist *bl)
{
Inode *in = f->inode;
bool readahead = true;
dout(10) << "_read_async " << *in << " " << off << "~" << len << dendl;
// trim read based on file size?
if (off >= in->size)
return 0;
if (off + len > in->size) {
len = in->size - off;
readahead = false;
}
// we will populate the cache here
if (in->cap_refs[CEPH_CAP_FILE_CACHE] == 0)
in->get_cap_ref(CEPH_CAP_FILE_CACHE);
// readahead?
if (readahead &&
f->nr_consec_read &&
(g_conf.client_readahead_max_bytes ||
g_conf.client_readahead_max_periods)) {
loff_t l = f->consec_read_bytes * 2;
if (g_conf.client_readahead_min)
l = MAX(l, g_conf.client_readahead_min);
if (g_conf.client_readahead_max_bytes)
l = MIN(l, g_conf.client_readahead_max_bytes);
loff_t p = ceph_file_layout_period(in->layout);
if (g_conf.client_readahead_max_periods)
l = MIN(l, g_conf.client_readahead_max_periods * p);
if (l >= 2*p)
// align with period
l -= (off+l) % p;
// don't read past end of file
if (off+l > in->size)
l = in->size - off;
dout(10) << "readahead " << f->nr_consec_read << " reads "
<< f->consec_read_bytes << " bytes ... readahead " << off << "~" << l
<< " (caller wants " << off << "~" << len << ")" << dendl;
if (l > 0) {
objectcacher->file_read(in->ino, &in->layout, in->snapid,
off, l, NULL, 0, 0);
dout(10) << "readahead initiated" << dendl;
}
}
// read (and possibly block)
int r, rvalue = 0;
Mutex flock("Client::_read_async flock");
Cond cond;
bool done = false;
Context *onfinish = new C_SafeCond(&flock, &cond, &done, &rvalue);
if (in->snapid == CEPH_NOSNAP)
r = objectcacher->file_read(in->ino, &in->layout, in->snapid,
off, len, bl, 0, onfinish);
else
r = objectcacher->file_read(in->ino, &in->layout, in->snapid,
off, len, bl, 0, onfinish);
if (r == 0) {
while (!done)
cond.Wait(client_lock);
r = rvalue;
} else {
// it was cached.
delete onfinish;
}
return r;
}
int Client::_read_sync(Fh *f, __u64 off, __u64 len, bufferlist *bl)
{
Inode *in = f->inode;
__u64 pos = off;
int left = len;
int read = 0;
dout(10) << "_read_sync " << *in << " " << off << "~" << len << dendl;
int flags = 0;
if (in->hack_balance_reads || g_conf.client_hack_balance_reads)
flags |= CEPH_OSD_FLAG_BALANCE_READS;
Mutex flock("Client::_read_sync flock");
Cond cond;
while (1) {
int r = 0;
bool done = false;
Context *onfinish = new C_SafeCond(&flock, &cond, &done, &r);
bufferlist tbl;
int wanted = left;
filer->read(in->ino, &in->layout, in->snapid,
pos, left, &tbl, flags, onfinish);
while (!done)
cond.Wait(client_lock);
if (r < 0)
return r;
if (tbl.length()) {
r = tbl.length();
read += r;
pos += r;
left -= r;
bl->claim_append(tbl);
}
// short read?
if (r >= 0 && r < wanted) {
if (pos + left <= in->size) {
// hole, zero and return.
bufferptr z(left);
z.zero();
bl->push_back(z);
read += left;
return read;
}
// reverify size
r = _getattr(in, CEPH_STAT_CAP_SIZE);
if (r < 0)
return r;
// eof? short read.
if (pos >= in->size)
return read;
}
}
}
#if 0
// wait for RD cap and/or a valid file size
int issued;
@ -3995,7 +4191,7 @@ int Client::_read(Fh *f, __s64 offset, __u64 size, bufferlist *bl)
return rvalue;
}
#endif
/*

View File

@ -453,13 +453,18 @@ class Inode {
return true;
}
int caps_issued() {
int caps_issued(int *implemented = 0) {
int c = exporting_issued | snap_caps;
int i = 0;
for (map<int,InodeCap*>::iterator it = caps.begin();
it != caps.end();
it++)
if (cap_is_valid(it->second))
if (cap_is_valid(it->second)) {
c |= it->second->issued;
i |= it->second->implemented;
}
if (implemented)
*implemented = i;
return c;
}
void touch_cap(InodeCap *cap) {
@ -1024,6 +1029,10 @@ private:
int _ll_put(Inode *in, int num);
void _ll_drop_pins();
int _get_caps(Inode *in, int need, int want, int *got, loff_t endoff);
int _read_sync(Fh *f, __u64 off, __u64 len, bufferlist *bl);
int _read_async(Fh *f, __u64 off, __u64 len, bufferlist *bl);
// internal interface
// call these with client_lock held!
int _do_lookup(Inode *dir, const char *name, Inode **target);