objectcacher: flush range, set

Add ability to flush a range of an object, or a vector of ObjectExtents.  Flush
any buffers that intersect the specified range, or the entire object if len==0.

Signed-off-by: Sage Weil <sage@newdream.net>
This commit is contained in:
Sage Weil 2012-05-05 16:25:26 -07:00
parent 203a7d67aa
commit c8bd471b59
2 changed files with 79 additions and 9 deletions

View File

@ -1340,7 +1340,7 @@ void ObjectCacher::wrunlock(Object *o)
return;
}
flush(o); // flush first
flush(o, 0, 0); // flush first
int op = 0;
if (o->rdlock_ref > 0) {
@ -1425,19 +1425,31 @@ void ObjectCacher::purge(Object *ob)
// flush. non-blocking. no callback.
// true if clean, already flushed.
// false if we wrote something.
bool ObjectCacher::flush(Object *ob)
// be sloppy about the ranges and flush any buffer it touches
bool ObjectCacher::flush(Object *ob, loff_t offset, loff_t length)
{
bool clean = true;
for (map<loff_t,BufferHead*>::iterator p = ob->data.begin();
p != ob->data.end();
p++) {
ldout(cct, 10) << "flush " << *ob << " " << offset << "~" << length << dendl;
map<loff_t,BufferHead*>::iterator p = ob->data.lower_bound(offset);
if (p != ob->data.begin() &&
(p == ob->data.end() || p->first > offset)) {
p--; // might overlap!
if (p->first + p->second->length() <= offset)
p++; // doesn't overlap.
}
for ( ; p != ob->data.end(); p++) {
BufferHead *bh = p->second;
ldout(cct, 20) << "flush " << *bh << dendl;
if (length && bh->start() > offset+length) {
break;
}
if (bh->is_tx()) {
clean = false;
continue;
}
if (!bh->is_dirty()) continue;
if (!bh->is_dirty()) {
continue;
}
bh_write(bh);
clean = false;
}
@ -1463,7 +1475,7 @@ bool ObjectCacher::flush_set(ObjectSet *oset, Context *onfinish)
!i.end(); ++i) {
Object *ob = *i;
if (!flush(ob)) {
if (!flush(ob, 0, 0)) {
// we'll need to gather...
safe = false;
@ -1485,6 +1497,52 @@ bool ObjectCacher::flush_set(ObjectSet *oset, Context *onfinish)
return false;
}
// flush. non-blocking, takes callback.
// returns true if already flushed
bool ObjectCacher::flush_set(ObjectSet *oset, vector<ObjectExtent>& exv, Context *onfinish)
{
if (oset->objects.empty()) {
ldout(cct, 10) << "flush_set on " << oset << " dne" << dendl;
return true;
}
ldout(cct, 10) << "flush_set " << oset << " on " << exv.size() << " ObjectExtents" << dendl;
// we'll need to wait for all objects to flush!
C_GatherBuilder gather(cct, onfinish);
bool safe = true;
for (vector<ObjectExtent>::iterator p = exv.begin();
p != exv.end();
++p) {
ObjectExtent &ex = *p;
sobject_t soid(ex.oid, CEPH_NOSNAP);
if (objects[oset->poolid].count(soid) == 0)
continue;
Object *ob = objects[oset->poolid][soid];
ldout(cct, 20) << "flush_set " << oset << " ex " << ex << " ob " << soid << " " << ob << dendl;
if (!flush(ob, ex.offset, ex.length)) {
// we'll need to gather...
safe = false;
ldout(cct, 10) << "flush_set " << oset << " will wait for ack tid "
<< ob->last_write_tid << " on " << *ob << dendl;
if (onfinish != NULL)
ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
}
}
if (onfinish != NULL)
gather.activate();
if (safe) {
ldout(cct, 10) << "flush_set " << oset << " has no dirty|tx bhs" << dendl;
return true;
}
return false;
}
// commit. non-blocking, takes callback.
// return true if already flushed.

View File

@ -366,7 +366,18 @@ class ObjectCacher {
void trim(loff_t max=-1);
void flush(loff_t amount=0);
bool flush(Object *o);
/**
* flush a range of buffers
*
* Flush any buffers that intersect the specified extent. If len==0,
* flush *all* buffers for the object.
*
* @param o object
* @param off start offset
* @param len extent length, or 0 for entire object
* @return true if object was already clean/flushed.
*/
bool flush(Object *o, loff_t off, loff_t len);
loff_t release(Object *o);
void purge(Object *o);
@ -481,6 +492,7 @@ public:
bool set_is_dirty_or_committing(ObjectSet *oset);
bool flush_set(ObjectSet *oset, Context *onfinish=0);
bool flush_set(ObjectSet *oset, vector<ObjectExtent>& ex, Context *onfinish=0);
void flush_all(Context *onfinish=0);
bool commit_set(ObjectSet *oset, Context *oncommit);