Merge branch 'wip-oc-error-handling' into next

Reviewed-by: Sage Weil <sage@inktank.com>
This commit is contained in:
Josh Durgin 2012-05-16 17:42:21 -07:00
commit 4277d4d337
5 changed files with 153 additions and 62 deletions

View File

@ -375,11 +375,11 @@ OPTION(journal_queue_max_bytes, OPT_INT, 100 << 20)
OPTION(journal_align_min_size, OPT_INT, 64 << 10) // align data payloads >= this.
OPTION(journal_replay_from, OPT_INT, 0)
OPTION(journal_zero_on_create, OPT_BOOL, false)
OPTION(rbd_cache, OPT_BOOL, false) // whether to enable writeback caching
OPTION(rbd_cache_size, OPT_LONGLONG, 32<<20) // cache size
OPTION(rbd_cache_max_dirty, OPT_LONGLONG, 24<<20) // dirty limit
OPTION(rbd_cache_target_dirty, OPT_LONGLONG, 16<<20) // target dirty limit
OPTION(rbd_cache_max_dirty_age, OPT_FLOAT, 1.0) // age in cache before writeback starts
OPTION(rbd_cache, OPT_BOOL, false) // whether to enable caching (writeback unless rbd_cache_max_dirty is 0)
OPTION(rbd_cache_size, OPT_LONGLONG, 32<<20) // cache size in bytes
OPTION(rbd_cache_max_dirty, OPT_LONGLONG, 24<<20) // dirty limit in bytes - set to 0 for write-through caching
OPTION(rbd_cache_target_dirty, OPT_LONGLONG, 16<<20) // target dirty limit in bytes
OPTION(rbd_cache_max_dirty_age, OPT_FLOAT, 1.0) // seconds in cache before writeback starts
OPTION(rgw_cache_enabled, OPT_BOOL, true) // rgw cache enabled
OPTION(rgw_cache_lru_size, OPT_INT, 10000) // num of entries in rgw cache
OPTION(rgw_socket_path, OPT_STR, "") // path to unix domain socket, if not specified, rgw will not run as external fcgi

View File

@ -680,6 +680,8 @@ int librados::IoCtxImpl::aio_operate(const object_t& oid,
int librados::IoCtxImpl::aio_read(const object_t oid, AioCompletionImpl *c,
bufferlist *pbl, size_t len, uint64_t off)
{
if (len > (size_t) INT_MAX)
return -EDOM;
Context *onack = new C_aio_Ack(c);
eversion_t ver;
@ -698,6 +700,9 @@ int librados::IoCtxImpl::aio_read(const object_t oid, AioCompletionImpl *c,
int librados::IoCtxImpl::aio_read(const object_t oid, AioCompletionImpl *c,
char *buf, size_t len, uint64_t off)
{
if (len > (size_t) INT_MAX)
return -EDOM;
Context *onack = new C_aio_Ack(c);
c->is_read = true;
@ -719,6 +724,8 @@ int librados::IoCtxImpl::aio_sparse_read(const object_t oid,
bufferlist *data_bl, size_t len,
uint64_t off)
{
if (len > (size_t) INT_MAX)
return -EDOM;
C_aio_sparse_read_Ack *onack = new C_aio_sparse_read_Ack(c);
onack->m = m;
@ -1028,6 +1035,9 @@ int librados::IoCtxImpl::aio_exec(const object_t& oid, AioCompletionImpl *c,
int librados::IoCtxImpl::read(const object_t& oid,
bufferlist& bl, size_t len, uint64_t off)
{
if (len > (size_t) INT_MAX)
return -EDOM;
Mutex mylock("IoCtxImpl::read::mylock");
Cond cond;
bool done;
@ -1035,6 +1045,7 @@ int librados::IoCtxImpl::read(const object_t& oid,
Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
eversion_t ver;
::ObjectOperation op;
::ObjectOperation *pop = prepare_assert_ops(&op);
@ -1101,6 +1112,9 @@ int librados::IoCtxImpl::sparse_read(const object_t& oid,
bufferlist& data_bl, size_t len,
uint64_t off)
{
if (len > (size_t) INT_MAX)
return -EDOM;
bufferlist bl;
Mutex mylock("IoCtxImpl::read::mylock");

View File

@ -310,8 +310,8 @@ namespace librbd {
return r;
}
void flush_cache() {
int r;
int flush_cache() {
int r = 0;
Mutex mylock("librbd::ImageCtx::flush_cache");
Cond cond;
bool done;
@ -328,6 +328,7 @@ namespace librbd {
mylock.Unlock();
ldout(cct, 20) << "finished flushing cache" << dendl;
}
return r;
}
void shutdown_cache() {
@ -344,11 +345,14 @@ namespace librbd {
cache_lock.Lock();
object_cacher->release_set(object_set);
cache_lock.Unlock();
flush_cache();
int r = flush_cache();
if (r)
lderr(cct) << "flush_cache returned " << r << dendl;
cache_lock.Lock();
bool unclean = object_cacher->release_set(object_set);
cache_lock.Unlock();
assert(!unclean);
if (unclean)
lderr(cct) << "could not release all objects from cache" << dendl;
}
};
@ -1729,7 +1733,7 @@ ssize_t handle_sparse_read(CephContext *cct,
/* last hole */
if (buf_left > 0) {
ldout(cct, 0) << "<3>zeroing " << buf_ofs << "~" << buf_left << dendl;
ldout(cct, 10) << "<3>zeroing " << buf_ofs << "~" << buf_left << dendl;
r = cb(buf_ofs, buf_left, NULL, arg);
if (r < 0) {
return r;
@ -1809,14 +1813,13 @@ int _flush(ImageCtx *ictx)
int r;
// flush any outstanding writes
if (ictx->object_cacher) {
ictx->flush_cache();
r = 0;
r = ictx->flush_cache();
} else {
r = ictx->data_ctx.aio_flush();
}
if (r)
ldout(cct, 10) << "aio_flush " << ictx << " r = " << r << dendl;
lderr(cct) << "_flush " << ictx << " r = " << r << dendl;
return r;
}

View File

@ -4,6 +4,7 @@
#include "msg/Messenger.h"
#include "ObjectCacher.h"
#include "WritebackHandler.h"
#include "common/errno.h"
#include "common/perf_counters.h"
@ -27,7 +28,7 @@ ObjectCacher::BufferHead *ObjectCacher::Object::split(BufferHead *left, loff_t o
right->last_write_tid = left->last_write_tid;
right->set_state(left->get_state());
right->snapc = left->snapc;
loff_t newleftlen = off - left->start();
right->set_start(off);
right->set_length(left->length() - newleftlen);
@ -114,15 +115,16 @@ void ObjectCacher::Object::try_merge_bh(BufferHead *bh)
p->second->get_state() == bh->get_state()) {
merge_left(p->second, bh);
bh = p->second;
} else
} else {
p++;
}
}
// to the right?
assert(p->second == bh);
p++;
if (p != data.end() &&
p->second->start() == bh->end() &&
p->second->get_state() == bh->get_state())
p->second->get_state() == bh->get_state())
merge_left(bh, p->second);
}
@ -160,7 +162,8 @@ bool ObjectCacher::Object::is_cached(loff_t cur, loff_t left)
int ObjectCacher::Object::map_read(OSDRead *rd,
map<loff_t, BufferHead*>& hits,
map<loff_t, BufferHead*>& missing,
map<loff_t, BufferHead*>& rx)
map<loff_t, BufferHead*>& rx,
map<loff_t, BufferHead*>& errors)
{
for (vector<ObjectExtent>::iterator ex_it = rd->extents.begin();
ex_it != rd->extents.end();
@ -169,7 +172,8 @@ int ObjectCacher::Object::map_read(OSDRead *rd,
if (ex_it->oid != oid.oid) continue;
ldout(oc->cct, 10) << "map_read " << ex_it->oid
<< " " << ex_it->offset << "~" << ex_it->length << dendl;
<< " " << ex_it->offset << "~" << ex_it->length
<< dendl;
loff_t cur = ex_it->offset;
loff_t left = ex_it->length;
@ -195,18 +199,21 @@ int ObjectCacher::Object::map_read(OSDRead *rd,
if (p->first <= cur) {
// have it (or part of it)
BufferHead *e = p->second;
if (e->is_clean() ||
e->is_dirty() ||
e->is_tx()) {
hits[cur] = e; // readable!
ldout(oc->cct, 20) << "map_read hit " << *e << dendl;
}
else if (e->is_rx()) {
} else if (e->is_rx()) {
rx[cur] = e; // missing, not readable.
ldout(oc->cct, 20) << "map_read rx " << *e << dendl;
}
else assert(0);
} else if (e->is_error()) {
errors[cur] = e;
ldout(oc->cct, 20) << "map_read error " << *e << dendl;
} else {
assert(0);
}
loff_t lenfromcur = MIN(e->end() - cur, left);
cur += lenfromcur;
@ -226,12 +233,12 @@ int ObjectCacher::Object::map_read(OSDRead *rd,
left -= MIN(left, n->length());
ldout(oc->cct, 20) << "map_read gap " << *n << dendl;
continue; // more?
}
else
} else {
assert(0);
}
}
}
return(0);
return 0;
}
/*
@ -251,7 +258,7 @@ ObjectCacher::BufferHead *ObjectCacher::Object::map_write(OSDWrite *wr)
if (ex_it->oid != oid.oid) continue;
ldout(oc->cct, 10) << "map_write oex " << ex_it->oid
<< " " << ex_it->offset << "~" << ex_it->length << dendl;
<< " " << ex_it->offset << "~" << ex_it->length << dendl;
loff_t cur = ex_it->offset;
loff_t left = ex_it->length;
@ -312,8 +319,9 @@ ObjectCacher::BufferHead *ObjectCacher::Object::map_write(OSDWrite *wr)
p--; // move iterator back to final
assert(p->second == final);
merge_left(final, bh);
} else
} else {
final = bh;
}
}
// keep going.
@ -418,7 +426,8 @@ ObjectCacher::ObjectCacher(CephContext *cct_, string name, WritebackHandler& wb,
max_dirty(max_dirty), target_dirty(target_dirty), max_size(max_size),
flush_set_callback(flush_callback), flush_set_callback_arg(flush_callback_arg),
flusher_stop(false), flusher_thread(this),
stat_clean(0), stat_dirty(0), stat_rx(0), stat_tx(0), stat_missing(0), stat_dirty_waiting(0)
stat_clean(0), stat_dirty(0), stat_rx(0), stat_tx(0), stat_missing(0),
stat_error(0), stat_dirty_waiting(0)
{
this->max_dirty_age.set_from_double(max_dirty_age);
perf_start();
@ -527,10 +536,6 @@ void ObjectCacher::bh_read_finish(int64_t poolid, sobject_t oid, loff_t start,
<< " returned " << r
<< dendl;
if (r < 0) {
// TODO: fix bad read case
}
if (bl.length() < length) {
bufferptr bp(length - bl.length());
bp.zero();
@ -574,7 +579,14 @@ void ObjectCacher::bh_read_finish(int64_t poolid, sobject_t oid, loff_t start,
bh->bl.substr_of(bl,
opos-bh->start(),
bh->length());
mark_clean(bh);
if (r < 0 && r != -ENOENT) {
mark_error(bh);
bh->error = r;
} else {
mark_clean(bh);
}
ldout(cct, 10) << "bh_read_finish read " << *bh << dendl;
opos = bh->end();
@ -588,7 +600,7 @@ void ObjectCacher::bh_read_finish(int64_t poolid, sobject_t oid, loff_t start,
p++)
ls.splice(ls.end(), p->second);
bh->waitfor_read.clear();
finish_contexts(cct, ls);
finish_contexts(cct, ls, bh->error);
// clean up?
ob->try_merge_bh(bh);
@ -696,10 +708,8 @@ void ObjectCacher::bh_write_commit(int64_t poolid, sobject_t oid, loff_t start,
<< oid
<< " tid " << tid
<< " " << start << "~" << length
<< " returned " << r
<< dendl;
if (r < 0) {
// TODO: handle write error
}
if (objects[poolid].count(oid) == 0) {
ldout(cct, 7) << "bh_write_commit no object cache" << dendl;
@ -733,12 +743,19 @@ void ObjectCacher::bh_write_commit(int64_t poolid, sobject_t oid, loff_t start,
ldout(cct, 10) << "bh_write_commit newer tid on " << *bh << dendl;
continue;
}
// ok! mark bh clean.
mark_clean(bh);
ldout(cct, 10) << "bh_write_commit clean " << *bh << dendl;
if (r >= 0) {
// ok! mark bh clean and error-free
mark_clean(bh);
ldout(cct, 10) << "bh_write_commit clean " << *bh << dendl;
} else {
mark_dirty(bh);
ldout(cct, 10) << "bh_write_commit marking dirty again due to error "
<< *bh << " r = " << r << " " << cpp_strerror(-r)
<< dendl;
}
}
// update last_commit.
assert(ob->last_commit_tid < tid);
ob->last_commit_tid = tid;
@ -748,7 +765,7 @@ void ObjectCacher::bh_write_commit(int64_t poolid, sobject_t oid, loff_t start,
list<Context*> ls;
ls.splice(ls.begin(), ob->waitfor_commit[tid]);
ob->waitfor_commit.erase(tid);
finish_contexts(cct, ls);
finish_contexts(cct, ls, r);
}
// is the entire object set now clean and fully committed?
@ -846,9 +863,16 @@ bool ObjectCacher::is_cached(ObjectSet *oset, vector<ObjectExtent>& extents, sna
* returns 0 if doing async read
*/
int ObjectCacher::readx(OSDRead *rd, ObjectSet *oset, Context *onfinish)
{
// Public entry point: forwards to _readx() with external_call = true.
// Retries issued from C_RetryRead call _readx() directly with false.
return _readx(rd, oset, onfinish, true);
}
int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
bool external_call)
{
assert(lock.is_locked());
bool success = true;
int error = 0;
list<BufferHead*> hit_ls;
uint64_t bytes_in_cache = 0;
uint64_t bytes_not_in_cache = 0;
@ -867,8 +891,17 @@ int ObjectCacher::readx(OSDRead *rd, ObjectSet *oset, Context *onfinish)
Object *o = get_object(soid, oset, ex_it->oloc);
// map extent into bufferheads
map<loff_t, BufferHead*> hits, missing, rx;
o->map_read(rd, hits, missing, rx);
map<loff_t, BufferHead*> hits, missing, rx, errors;
o->map_read(rd, hits, missing, rx, errors);
if (external_call) {
// retry reading error buffers
missing.insert(errors.begin(), errors.end());
} else {
// some reads had errors, fail later so completions
// are cleaned up properly
// TODO: make read path not call _readx for every completion
hits.insert(errors.begin(), errors.end());
}
if (!missing.empty() || !rx.empty()) {
// read missing
@ -906,6 +939,8 @@ int ObjectCacher::readx(OSDRead *rd, ObjectSet *oset, Context *onfinish)
bh_it != hits.end();
bh_it++) {
ldout(cct, 10) << "readx hit bh " << *bh_it->second << dendl;
if (bh_it->second->is_error() && bh_it->second->error)
error = bh_it->second->error;
hit_ls.push_back(bh_it->second);
bytes_in_cache += bh_it->second->length();
}
@ -964,7 +999,7 @@ int ObjectCacher::readx(OSDRead *rd, ObjectSet *oset, Context *onfinish)
touch_bh(*bhit);
if (!success) {
if (perfcounter) {
if (perfcounter && external_call) {
perfcounter->inc(l_objectcacher_data_read, total_bytes_read);
perfcounter->inc(l_objectcacher_cache_bytes_hit, bytes_in_cache);
perfcounter->inc(l_objectcacher_cache_bytes_miss, bytes_not_in_cache);
@ -972,7 +1007,7 @@ int ObjectCacher::readx(OSDRead *rd, ObjectSet *oset, Context *onfinish)
}
return 0; // wait!
}
if (perfcounter) {
if (perfcounter && external_call) {
perfcounter->inc(l_objectcacher_data_read, total_bytes_read);
perfcounter->inc(l_objectcacher_cache_bytes_hit, bytes_in_cache);
perfcounter->inc(l_objectcacher_cache_ops_hit);
@ -984,7 +1019,7 @@ int ObjectCacher::readx(OSDRead *rd, ObjectSet *oset, Context *onfinish)
// ok, assemble into result buffer.
uint64_t pos = 0;
if (rd->bl) {
if (rd->bl && !error) {
rd->bl->clear();
for (map<uint64_t,bufferlist>::iterator i = stripe_map.begin();
i != stripe_map.end();
@ -1004,8 +1039,10 @@ int ObjectCacher::readx(OSDRead *rd, ObjectSet *oset, Context *onfinish)
delete rd;
trim();
return pos;
assert(pos <= (uint64_t) INT_MAX);
return error ? error : pos;
}
@ -1720,7 +1757,7 @@ void ObjectCacher::verify_stats() const
{
ldout(cct, 10) << "verify_stats" << dendl;
loff_t clean = 0, dirty = 0, rx = 0, tx = 0, missing = 0;
loff_t clean = 0, dirty = 0, rx = 0, tx = 0, missing = 0, error = 0;
for (vector<hash_map<sobject_t, Object*> >::const_iterator i = objects.begin();
i != objects.end();
++i) {
@ -1748,6 +1785,9 @@ void ObjectCacher::verify_stats() const
case BufferHead::STATE_RX:
rx += bh->length();
break;
case BufferHead::STATE_ERROR:
error += bh->length();
break;
default:
assert(0);
}
@ -1760,12 +1800,14 @@ void ObjectCacher::verify_stats() const
<< " tx " << tx
<< " dirty " << dirty
<< " missing " << missing
<< " error " << error
<< dendl;
assert(clean == stat_clean);
assert(rx == stat_rx);
assert(tx == stat_tx);
assert(dirty == stat_dirty);
assert(missing == stat_missing);
assert(error == stat_error);
}
void ObjectCacher::bh_stat_add(BufferHead *bh)
@ -1790,6 +1832,11 @@ void ObjectCacher::bh_stat_add(BufferHead *bh)
case BufferHead::STATE_RX:
stat_rx += bh->length();
break;
case BufferHead::STATE_ERROR:
stat_error += bh->length();
break;
default:
assert(0 == "bh_stat_add: invalid bufferhead state");
}
if (get_stat_dirty_waiting() > 0)
stat_cond.Signal();
@ -1817,6 +1864,11 @@ void ObjectCacher::bh_stat_sub(BufferHead *bh)
case BufferHead::STATE_RX:
stat_rx -= bh->length();
break;
case BufferHead::STATE_ERROR:
stat_error -= bh->length();
break;
default:
assert(0 == "bh_stat_sub: invalid bufferhead state");
}
}
@ -1833,6 +1885,9 @@ void ObjectCacher::bh_set_state(BufferHead *bh, int s)
lru_rest.lru_insert_top(bh);
dirty_bh.erase(bh);
}
if (s != BufferHead::STATE_ERROR && bh->get_state() == BufferHead::STATE_ERROR) {
bh->error = 0;
}
// set state
bh_stat_sub(bh);

View File

@ -87,7 +87,8 @@ class ObjectCacher {
static const int STATE_DIRTY = 2;
static const int STATE_RX = 3;
static const int STATE_TX = 4;
static const int STATE_ERROR = 5; // a read error occurred
private:
// my fields
int state;
@ -102,16 +103,17 @@ class ObjectCacher {
tid_t last_write_tid; // version of bh (if non-zero)
utime_t last_write;
SnapContext snapc;
int error; // holds return value for failed reads
map< loff_t, list<Context*> > waitfor_read;
public:
// cons
BufferHead(Object *o) :
state(STATE_MISSING),
ref(0),
ob(o),
last_write_tid(0) {}
last_write_tid(0),
error(0) {}
// extent
loff_t start() const { return ex.start; }
@ -134,6 +136,7 @@ class ObjectCacher {
bool is_clean() { return state == STATE_CLEAN; }
bool is_tx() { return state == STATE_TX; }
bool is_rx() { return state == STATE_RX; }
bool is_error() { return state == STATE_ERROR; }
// reference counting
int get() {
@ -148,7 +151,6 @@ class ObjectCacher {
return ref;
}
};
// ******* Object *********
class Object {
@ -251,6 +253,7 @@ class ObjectCacher {
assert(data.count(bh->start()));
data.erase(bh->start());
}
bool is_empty() { return data.empty(); }
// mid-level
@ -262,7 +265,8 @@ class ObjectCacher {
int map_read(OSDRead *rd,
map<loff_t, BufferHead*>& hits,
map<loff_t, BufferHead*>& missing,
map<loff_t, BufferHead*>& rx);
map<loff_t, BufferHead*>& rx,
map<loff_t, BufferHead*>& errors);
BufferHead *map_write(OSDWrite *wr);
void truncate(loff_t s);
@ -341,6 +345,7 @@ class ObjectCacher {
loff_t stat_rx;
loff_t stat_tx;
loff_t stat_missing;
loff_t stat_error;
loff_t stat_dirty_waiting; // bytes that writers are waiting on to write
void verify_stats() const;
@ -370,6 +375,7 @@ class ObjectCacher {
void mark_clean(BufferHead *bh) { bh_set_state(bh, BufferHead::STATE_CLEAN); };
void mark_rx(BufferHead *bh) { bh_set_state(bh, BufferHead::STATE_RX); };
void mark_tx(BufferHead *bh) { bh_set_state(bh, BufferHead::STATE_TX); };
// Move bh into STATE_ERROR via bh_set_state(). Callers assign bh->error
// separately (bh_read_finish does so right after this call).
void mark_error(BufferHead *bh) { bh_set_state(bh, BufferHead::STATE_ERROR); };
void mark_dirty(BufferHead *bh) {
bh_set_state(bh, BufferHead::STATE_DIRTY);
lru_dirty.lru_touch(bh);
@ -406,6 +412,9 @@ class ObjectCacher {
void wrlock(Object *o);
void wrunlock(Object *o);
// Implementation behind readx(). external_call is true when reached through
// readx() and false when re-invoked from C_RetryRead:
//  - true:  buffers in the error state are added to the missing set so the
//           read is retried, and perf counters are updated.
//  - false: error buffers are treated as hits so their stored error is
//           returned to the caller, and perf counters are not updated.
int _readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
bool external_call);
public:
void bh_read_finish(int64_t poolid, sobject_t oid, loff_t offset,
uint64_t length, bufferlist &bl, int r);
@ -488,11 +497,15 @@ class ObjectCacher {
Context *onfinish;
public:
C_RetryRead(ObjectCacher *_oc, OSDRead *r, ObjectSet *os, Context *c) : oc(_oc), rd(r), oset(os), onfinish(c) {}
void finish(int) {
int r = oc->readx(rd, oset, onfinish);
if (r > 0 && onfinish) {
onfinish->finish(r);
delete onfinish;
void finish(int r) {
if (r < 0) {
if (onfinish)
onfinish->complete(r);
return;
}
int ret = oc->_readx(rd, oset, onfinish, false);
if (ret != 0 && onfinish) {
onfinish->complete(ret);
}
}
};
@ -500,6 +513,11 @@ class ObjectCacher {
// non-blocking. async.
/**
* @note total read size must be <= INT_MAX, since
* the return value is total bytes read
*/
int readx(OSDRead *rd, ObjectSet *oset, Context *onfinish);
int writex(OSDWrite *wr, ObjectSet *oset, Mutex& wait_on_lock);
bool is_cached(ObjectSet *oset, vector<ObjectExtent>& extents, snapid_t snapid);
@ -583,6 +601,7 @@ inline ostream& operator<<(ostream& out, ObjectCacher::BufferHead &bh)
if (bh.is_clean()) out << " clean";
if (bh.is_missing()) out << " missing";
if (bh.bl.length() > 0) out << " firstbyte=" << (int)bh.bl[0];
if (bh.error) out << " error=" << bh.error;
out << "]";
return out;
}