*** empty log message ***

git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@519 29311d96-e01e-0410-9327-a35deaab8ce9
This commit is contained in:
sage 2005-12-10 00:40:04 +00:00
parent 0b76b3d0e9
commit 4be66aee51
14 changed files with 1316 additions and 467 deletions

View File

@ -4,12 +4,14 @@ client
ofs
- object map : object_t -> block_t
- col map : coll_t -> block_t
- freelists : block_t -> block_t
- on disk btree nodes need super_version
- commit thread
- reallocate on write
-
- objects
- collections set<object_t>
- Ebofs.free_blocks
- Ebofs.readonly
- real error codes

View File

@ -85,7 +85,7 @@ int Allocator::allocate(Extent& ex, block_t num, block_t near)
ex.start += left.length;
ex.length -= left.length;
assert(ex.length == num);
release(left);
release_now(left);
} else {
// take middle part.
Extent left,right;
@ -95,8 +95,8 @@ int Allocator::allocate(Extent& ex, block_t num, block_t near)
right.start = ex.start + num;
right.length = ex.length - left.length - num;
ex.length = num;
release(left);
release(right);
release_now(left);
release_now(right);
}
}
else {
@ -105,7 +105,7 @@ int Allocator::allocate(Extent& ex, block_t num, block_t near)
right.start = ex.start + num;
right.length = ex.length - num;
ex.length = num;
release(right);
release_now(right);
}
}
@ -135,7 +135,25 @@ int Allocator::allocate(Extent& ex, block_t num, block_t near)
return -1;
}
int Allocator::release(Extent& ex)
// Deferred release: the extent is parked in "limbo" rather than being
// returned to the free tables immediately.  Limbo extents are handed to
// release_now() later via release_limbo() (called from the commit path),
// so blocks still referenced by the previous checkpoint are not reused
// before the next superblock is written.
int Allocator::release(Extent& ex)
{
dout(1) << "release " << ex << " (into limbo)" << endl;
limbo.insert(ex.start, ex.length);
return 0;
}
// Flush every extent parked in limbo to the free lists via release_now().
// NOTE(review): limbo is not cleared after this loop -- a second call
// would release the same extents again.  Confirm the caller (commit
// path) resets or re-creates the limbo set, or clear it here.
int Allocator::release_limbo()
{
for (map<block_t,block_t>::iterator i = limbo.m.begin();
i != limbo.m.end();
i++) {
Extent ex(i->first, i->second);
release_now(ex);
}
return 0;
}
int Allocator::release_now(Extent& ex)
{
Extent newex = ex;

View File

@ -3,12 +3,16 @@
#include "types.h"
#include "include/interval_set.h"
class Ebofs;
class Allocator {
protected:
Ebofs *fs;
interval_set<block_t> limbo;
static int pick_bucket(block_t num) {
int b = 0;
while (num > 1) {
@ -29,6 +33,9 @@ class Allocator {
int allocate(Extent& ex, block_t num, block_t near=0);
int release(Extent& ex);
int release_now(Extent& ex);
int release_limbo();
};
#endif

View File

@ -268,7 +268,7 @@ int BlockDevice::cancel_io(ioh_t ioh)
// FIXME?
if (r == 0 && pbio->context) {
pbio->context->finish(0);
//pbio->context->finish(0); ******HELP!!!
delete pbio->context;
delete pbio;
}

View File

@ -92,7 +92,12 @@ void ObjectCache::tx_finish(ioh_t ioh, block_t start, block_t length, version_t
p->second->set_last_flushed(version);
bc->mark_clean(p->second);
if (p->second->ioh == ioh) p->second->ioh = 0;
if (p->second->ioh == ioh) {
p->second->ioh = 0;
}
else if (p->second->shadow_ioh == ioh) {
p->second->shadow_ioh = 0;
}
// trigger waiters
waiters.splice(waiters.begin(), p->second->waitfor_flush);
@ -188,6 +193,12 @@ int ObjectCache::map_read(block_t start, block_t len,
return 0;
}
/*
* map a range of pages on an object's buffer cache.
*
* - break up bufferheads that don't fall completely within the range
*/
int ObjectCache::map_write(block_t start, block_t len,
map<block_t, BufferHead*>& hits)
{

View File

@ -27,7 +27,12 @@ class BufferHead : public LRUObject {
public:
ObjectCache *oc;
bufferlist data;
bufferlist data, shadow_data;
ioh_t ioh, shadow_ioh; // any pending read/write op
version_t tx_epoch; // epoch this write is in
list<Context*> waitfor_read;
list<Context*> waitfor_flush;
private:
map<off_t, bufferlist> partial; // partial dirty content overlayed onto incoming data
@ -36,20 +41,16 @@ class BufferHead : public LRUObject {
int state;
version_t version; // current version in cache
version_t last_flushed; // last version flushed to disk
Extent object_loc; // block position _in_object_
utime_t dirty_stamp;
public:
ioh_t ioh; // any pending read/write op
list<Context*> waitfor_read;
list<Context*> waitfor_flush;
public:
BufferHead(ObjectCache *o) :
oc(o), ref(0), state(STATE_MISSING), version(0), last_flushed(0), ioh(0) {}
oc(o), ioh(0), shadow_ioh(0), tx_epoch(0),
ref(0), state(STATE_MISSING), version(0), last_flushed(0)
{}
ObjectCache *get_oc() { return oc; }
@ -337,10 +338,9 @@ public:
class BufferCache {
public:
Mutex lock;
BlockDevice& dev;
AlignedBufferPool& bufferpool;
Mutex &lock; // hack: ref to global lock
BlockDevice &dev;
AlignedBufferPool &bufferpool;
set<BufferHead*> dirty_bh;
@ -358,7 +358,8 @@ class BufferCache {
off_t stat_missing;
public:
BufferCache(BlockDevice& d, AlignedBufferPool& bp) : dev(d), bufferpool(bp),
BufferCache(BlockDevice& d, AlignedBufferPool& bp, Mutex& glock) :
lock(glock), dev(d), bufferpool(bp),
stat_waiter(0),
stat_clean(0), stat_dirty(0), stat_rx(0), stat_tx(0), stat_partial(0), stat_missing(0)
{}

View File

@ -8,49 +8,65 @@
int Ebofs::mount()
{
// note: this will fail in mount -> unmount -> mount type situations, bc
// prior state isn't fully cleaned up.
ebofs_lock.Lock();
assert(!mounted);
// read super
bufferptr bp1, bp2;
bufferptr bp1 = bufferpool.alloc(EBOFS_BLOCK_SIZE);
bufferptr bp2 = bufferpool.alloc(EBOFS_BLOCK_SIZE);
dev.read(0, 1, bp1);
dev.read(1, 1, bp2);
struct ebofs_super *sb1 = (struct ebofs_super*)bp1.c_str();
struct ebofs_super *sb2 = (struct ebofs_super*)bp2.c_str();
struct ebofs_super *sb = 0;
dout(2) << "mount super @0 epoch " << sb1->epoch << endl;
dout(2) << "mount super @1 epoch " << sb2->epoch << endl;
// pick newest super
dout(2) << "mount super @0 v " << sb1->version << endl;
dout(2) << "mount super @1 v " << sb2->version << endl;
if (sb1->version > sb2->version)
struct ebofs_super *sb = 0;
if (sb1->epoch > sb2->epoch)
sb = sb1;
else
sb = sb2;
super_version = sb->version;
super_epoch = sb->epoch;
dout(2) << "mount epoch " << super_epoch << endl;
assert(super_epoch == sb->epoch);
// init node pools
dout(2) << "mount table nodepool" << endl;
table_nodepool.read( dev, &sb->table_nodepool );
dout(2) << "mount nodepool" << endl;
nodepool.init( &sb->nodepool );
nodepool.read_usemap( dev, super_epoch );
nodepool.read_clean_nodes( dev );
// open tables
dout(2) << "mount opening tables" << endl;
object_tab = new Table<object_t, Extent>( table_nodepool, sb->object_tab );
object_tab = new Table<object_t, Extent>( nodepool, sb->object_tab );
for (int i=0; i<EBOFS_NUM_FREE_BUCKETS; i++)
free_tab[i] = new Table<block_t, block_t>( table_nodepool, sb->free_tab[i] );
collection_tab = new Table<coll_t, Extent>( table_nodepool, sb->collection_tab );
oc_tab = new Table<idpair_t, bool>( table_nodepool, sb->oc_tab );
co_tab = new Table<idpair_t, bool>( table_nodepool, sb->co_tab );
free_tab[i] = new Table<block_t, block_t>( nodepool, sb->free_tab[i] );
collection_tab = new Table<coll_t, Extent>( nodepool, sb->collection_tab );
oc_tab = new Table<idpair_t, bool>( nodepool, sb->oc_tab );
co_tab = new Table<idpair_t, bool>( nodepool, sb->co_tab );
dout(2) << "mount starting commit thread" << endl;
commit_thread.create();
dout(2) << "mount mounted" << endl;
mounted = true;
ebofs_lock.Unlock();
return 0;
}
int Ebofs::mkfs()
{
ebofs_lock.Lock();
assert(!mounted);
block_t num_blocks = dev.get_num_blocks();
// create first noderegion
@ -58,89 +74,110 @@ int Ebofs::mkfs()
nr.start = 2;
nr.length = num_blocks / 10000;
if (nr.length < 10) nr.length = 10;
NodeRegion *r = new NodeRegion(0,nr);
table_nodepool.add_region(r);
table_nodepool.init_all_free();
nodepool.add_region(nr);
dout(1) << "mkfs: first node region at " << nr << endl;
// allocate two usemaps
block_t usemap_len = ((nr.length-1) / 8 / EBOFS_BLOCK_SIZE) + 1;
nodepool.usemap_even.start = nr.end();
nodepool.usemap_even.length = usemap_len;
nodepool.usemap_odd.start = nodepool.usemap_even.end();
nodepool.usemap_odd.length = usemap_len;
dout(1) << "mkfs: even usemap at " << nodepool.usemap_even << endl;
dout(1) << "mkfs: odd usemap at " << nodepool.usemap_odd << endl;
// init tables
struct ebofs_table empty;
empty.num_keys = 0;
empty.root = -1;
empty.depth = 0;
object_tab = new Table<object_t, Extent>( table_nodepool, empty );
collection_tab = new Table<coll_t, Extent>( table_nodepool, empty );
object_tab = new Table<object_t, Extent>( nodepool, empty );
collection_tab = new Table<coll_t, Extent>( nodepool, empty );
for (int i=0; i<EBOFS_NUM_FREE_BUCKETS; i++)
free_tab[i] = new Table<block_t,block_t>( table_nodepool, empty );
oc_tab = new Table<idpair_t, bool>( table_nodepool, empty );
co_tab = new Table<idpair_t, bool>( table_nodepool, empty );
free_tab[i] = new Table<block_t,block_t>( nodepool, empty );
oc_tab = new Table<idpair_t, bool>( nodepool, empty );
co_tab = new Table<idpair_t, bool>( nodepool, empty );
// add free space
Extent left;
left.start = nr.start + nr.length;
left.start = nodepool.usemap_odd.end();
left.length = num_blocks - left.start;
dout(1) << "mkfs: free blocks at " << left << endl;
allocator.release( left );
allocator.release_now( left );
// write nodes
dout(1) << "mkfs: flushing nodepool" << endl;
table_nodepool.induce_full_flush();
table_nodepool.flush( dev );
// write nodes, super, 2x
dout(1) << "mkfs: flushing nodepool and superblocks (2x)" << endl;
nodepool.commit_start( dev, 0 );
nodepool.commit_wait();
bufferptr superbp0;
prepare_super(0, superbp0);
write_super(0, superbp0);
// write super (2x)
dout(1) << "mkfs: writing superblocks" << endl;
super_version = 0;
write_super();
write_super();
nodepool.commit_start( dev, 1 );
nodepool.commit_wait();
bufferptr superbp1;
prepare_super(1, superbp1);
write_super(1, superbp1);
mounted = true;
// free memory
dout(1) << "mkfs: cleaning up" << endl;
close_tables();
dout(1) << "mkfs: done" << endl;
ebofs_lock.Unlock();
return 0;
}
int Ebofs::umount()
void Ebofs::close_tables()
{
mounted = false;
// wait
// flush
dout(1) << "umount: flushing nodepool" << endl;
table_nodepool.flush( dev );
// super
dout(1) << "umount: writing superblocks" << endl;
write_super();
// close tables
delete object_tab;
for (int i=0; i<EBOFS_NUM_FREE_BUCKETS; i++)
delete free_tab[i];
delete collection_tab;
delete oc_tab;
delete co_tab;
}
int Ebofs::umount()
{
ebofs_lock.Lock();
// mark unmounting
readonly = true;
unmounting = true;
// kick commit thread
commit_cond.Signal();
// wait
ebofs_lock.Unlock();
commit_thread.join();
// free memory
close_tables();
ebofs_lock.Unlock();
return 0;
}
int Ebofs::write_super()
void Ebofs::prepare_super(version_t epoch, bufferptr& bp)
{
struct ebofs_super sb;
super_version++;
block_t bno = super_version % 2;
dout(1) << "write_super v" << super_version << " to b" << bno << endl;
dout(1) << "prepare_super v" << epoch << endl;
// fill in super
memset(&sb, 0, sizeof(sb));
sb.s_magic = EBOFS_MAGIC;
sb.version = super_version;
sb.epoch = epoch;
sb.num_blocks = dev.get_num_blocks();
sb.free_blocks = free_blocks;
//sb.num_objects = num_objects;
@ -170,18 +207,80 @@ int Ebofs::write_super()
sb.co_tab.depth = co_tab->get_depth();
// pools
sb.table_nodepool.num_regions = table_nodepool.get_num_regions();
for (int i=0; i<table_nodepool.get_num_regions(); i++) {
sb.table_nodepool.region_loc[i] = table_nodepool.get_region_loc(i);
sb.nodepool.num_regions = nodepool.region_loc.size();
for (unsigned i=0; i<nodepool.region_loc.size(); i++) {
sb.nodepool.region_loc[i] = nodepool.region_loc[i];
}
bufferptr bp = bufferpool.alloc(EBOFS_BLOCK_SIZE);
sb.nodepool.node_usemap_even = nodepool.usemap_even;
sb.nodepool.node_usemap_odd = nodepool.usemap_odd;
// put in a buffer
bp = bufferpool.alloc(EBOFS_BLOCK_SIZE);
memcpy(bp.c_str(), (const char*)&sb, sizeof(sb));
dev.write(bno, 1, bp);
return 0;
}
// Synchronously write a previously prepared superblock buffer.
// The two superblock slots (disk blocks 0 and 1) alternate by epoch
// parity, so the prior epoch's super always survives a crash mid-write.
void Ebofs::write_super(version_t epoch, bufferptr& bp)
{
  block_t block_no = epoch & 1;   // even epochs -> block 0, odd -> block 1
  dout(1) << "write_super v" << epoch << " to b" << block_no << endl;
  dev.write(block_no, 1, bp);
}
// Body of the commit thread: repeatedly waits for a kick on commit_cond,
// then performs a full checkpoint (epoch bump, async inode + btree node
// flush, then a synchronous superblock write).  Runs until umount sets
// 'unmounting', which triggers one final commit pass before exit.
// Caller context: started via CommitThread; holds ebofs_lock except
// around the final synchronous write_super call.
int Ebofs::commit_thread_entry()
{
dout(10) << "commit_thread start" << endl;
ebofs_lock.Lock();
while (mounted) {
// wait
//commit_cond.Wait(ebofs_lock, utime_t(EBOFS_COMMIT_INTERVAL,0)); // wait for kick, or 10s.
commit_cond.Wait(ebofs_lock);//, utime_t(EBOFS_COMMIT_INTERVAL,0)); // wait for kick, or 10s.
// umount() signals us with unmounting set; do one last pass, then stop.
if (unmounting) {
dout(10) << "commit_thread unmounting: final commit pass" << endl;
assert(readonly);
unmounting = false;
mounted = false;
}
super_epoch++;
dout(10) << "commit_thread commit start, new epoch is " << super_epoch << endl;
// (async) write onodes+condes (do this first; it currently involves inode reallocation)
commit_inodes_start();
// extents freed during the prior epoch can now really be freed
allocator.release_limbo();
// (async) write btree nodes
nodepool.commit_start( dev, super_epoch );
// prepare super (before any changes get made!)
bufferptr superbp;
prepare_super(super_epoch, superbp);
// wait it all to flush (drops global lock)
commit_inodes_wait();
nodepool.commit_wait();
commit_bc_wait(super_epoch-1);
// ok, now (synchronously) write the prior super!
// (safe to drop the lock: all state for this epoch is already captured in superbp)
ebofs_lock.Unlock();
write_super(super_epoch, superbp);
ebofs_lock.Lock();
dout(10) << "commit_thread commit finish" << endl;
}
ebofs_lock.Unlock();
dout(10) << "commit_thread finish" << endl;
return 0;
}
// *** onodes ***
@ -253,21 +352,20 @@ Onode* Ebofs::get_onode(object_t oid)
}
void Ebofs::write_onode(Onode *on)
void Ebofs::write_onode(Onode *on, Context *c)
{
// allocate
// buffer
int bytes = sizeof(ebofs_onode) + on->get_attr_bytes() + on->get_extent_bytes();
unsigned blocks = (bytes-1)/EBOFS_BLOCK_SIZE + 1;
bufferlist bl;
bufferpool.alloc( EBOFS_BLOCK_SIZE*blocks, bl );
// place on disk
if (on->onode_loc.length < blocks) {
// relocate onode!
// (always) relocate onode
if (1) {
if (on->onode_loc.length)
allocator.release(on->onode_loc);
block_t first = 0;
if (on->extents.size())
first = on->extents[0].start;
@ -276,7 +374,7 @@ void Ebofs::write_onode(Onode *on)
object_tab->remove( on->object_id );
object_tab->insert( on->object_id, on->onode_loc );
}
struct ebofs_onode eo;
eo.onode_loc = on->onode_loc;
eo.object_id = on->object_id;
@ -306,7 +404,7 @@ void Ebofs::write_onode(Onode *on)
}
// write
dev.write( on->onode_loc.start, on->onode_loc.length, bl );
dev.write( on->onode_loc.start, on->onode_loc.length, bl, c );
}
void Ebofs::remove_onode(Onode *on)
@ -330,7 +428,13 @@ void Ebofs::put_onode(Onode *on)
on->put();
}
// Flag an onode for the next commit pass; idempotent if already dirty.
void Ebofs::dirty_onode(Onode *on)
{
  if (on->is_dirty()) return;   // already queued for commit
  on->mark_dirty();
  dirty_onodes.insert(on);
}
void Ebofs::trim_onode_cache()
{
@ -408,21 +512,20 @@ Cnode* Ebofs::get_cnode(object_t cid)
return cn;
}
void Ebofs::write_cnode(Cnode *cn)
void Ebofs::write_cnode(Cnode *cn, Context *c)
{
// allocate
// allocate buffer
int bytes = sizeof(ebofs_cnode) + cn->get_attr_bytes();
unsigned blocks = (bytes-1)/EBOFS_BLOCK_SIZE + 1;
bufferlist bl;
bufferpool.alloc( EBOFS_BLOCK_SIZE*blocks, bl );
// place on disk
if (cn->cnode_loc.length < blocks) {
// relocate cnode!
// (always) relocate cnode!
if (1) {
if (cn->cnode_loc.length)
allocator.release(cn->cnode_loc);
allocator.allocate(cn->cnode_loc, blocks, 0);
collection_tab->remove( cn->coll_id );
collection_tab->insert( cn->coll_id, cn->cnode_loc );
@ -449,7 +552,7 @@ void Ebofs::write_cnode(Cnode *cn)
}
// write
dev.write( cn->cnode_loc.start, cn->cnode_loc.length, bl );
dev.write( cn->cnode_loc.start, cn->cnode_loc.length, bl, c );
}
void Ebofs::remove_cnode(Cnode *cn)
@ -472,11 +575,73 @@ void Ebofs::put_cnode(Cnode *cn)
cn->put();
}
// Flag a cnode for the next commit pass; idempotent if already dirty.
void Ebofs::dirty_cnode(Cnode *cn)
{
  if (cn->is_dirty()) return;   // already queued for commit
  cn->mark_dirty();
  dirty_cnodes.insert(cn);
}
// Completion callback attached to each async onode/cnode write issued by
// commit_inodes_start(); forwards to Ebofs::flush_inode_finish(), which
// decrements the in-flight counter.
class C_E_InodeFlush : public Context {
Ebofs *ebofs;
public:
C_E_InodeFlush(Ebofs *e) : ebofs(e) {}
void finish(int r) {
ebofs->flush_inode_finish();   // io result 'r' is ignored here
}
};
// Called (via C_E_InodeFlush) when one async onode/cnode write completes.
// Decrements the in-flight counter under ebofs_lock and wakes
// commit_inodes_wait() once the last write has landed.
void Ebofs::flush_inode_finish()
{
ebofs_lock.Lock();
inodes_flushing--;
if (inodes_flushing == 0)
inode_commit_cond.Signal();
ebofs_lock.Unlock();
}
void Ebofs::commit_inodes_start()
{
dout(10) << "commit_inodes_start" << endl;
assert(inodes_flushing == 0);
// onodes
for (set<Onode*>::iterator i = dirty_onodes.begin();
i != dirty_onodes.end();
i++) {
Onode *on = *i;
inodes_flushing++;
write_onode(on, new C_E_InodeFlush(this));
on->mark_clean();
}
// cnodes
for (set<Cnode*>::iterator i = dirty_cnodes.begin();
i != dirty_cnodes.end();
i++) {
Cnode *cn = *i;
inodes_flushing++;
write_cnode(cn, new C_E_InodeFlush(this));
cn->mark_clean();
}
dout(10) << "commit_inodes_start writing " << inodes_flushing << " onodes+cnodes" << endl;
}
// Block until every async inode write started by commit_inodes_start()
// has completed; flush_inode_finish() decrements the counter and signals.
void Ebofs::commit_inodes_wait()
{
// caller must hold ebofs_lock
while (inodes_flushing > 0) {
dout(10) << "commit_inodes_wait for " << inodes_flushing << " onodes+cnodes to flush" << endl;
inode_commit_cond.Wait(ebofs_lock);
}
dout(10) << "commit_inodes_wait all flushed" << endl;
}
@ -486,9 +651,10 @@ void Ebofs::put_cnode(Cnode *cn)
// *** buffer cache ***
// ... should already hold lock ...
void Ebofs::trim_buffer_cache()
{
bc.lock.Lock();
//ebofs_lock.Lock();
// flush any dirty items?
while (bc.lru_dirty.lru_get_size() > bc.lru_dirty.lru_get_max()) {
@ -528,10 +694,16 @@ void Ebofs::trim_buffer_cache()
<< bc.lru_rest.lru_get_size() << " rest + "
<< bc.lru_dirty.lru_get_size() << " dirty " << endl;
bc.lock.Unlock();
//ebofs_lock.Unlock();
}
// Stub: intended to wait for all buffer-cache writes belonging to the
// given epoch to reach disk; currently only logs and ignores 'epoch'.
void Ebofs::commit_bc_wait(version_t epoch)
{
dout(1) << "commit_bc_wait" << endl;
}
void Ebofs::flush_all()
{
// FIXME what about partial heads?
@ -627,6 +799,7 @@ void Ebofs::bh_write(Onode *on, BufferHead *bh)
assert(bh->is_dirty());
bc.mark_tx(bh);
bh->tx_epoch = super_epoch; // note the epoch!
// get extents
vector<Extent> ex;
@ -651,6 +824,101 @@ void Ebofs::bh_write(Onode *on, BufferHead *bh)
}
/*
* allocate a write to blocks on disk.
* take care to not overwrite any "safe" data blocks.
* break up bufferheads in bh_hits that span realloc boundaries.
* final bufferhead set stored in final!
*/
void Ebofs::alloc_write(Onode *on,
                        block_t start, block_t len,
                        map<block_t, BufferHead*>& hits)
{
  // first decide what pages to (re)allocate: everything in the range
  // except the parts already uncommitted in this epoch (safe to
  // overwrite in place).
  interval_set<block_t> alloc;
  on->map_alloc_regions(start, len, alloc);

  dout(10) << "alloc_write need to alloc " << alloc << endl;

  // merge alloc into onode uncommitted map
  // (was a stray unconditional 'cout' -- use the dout logging convention)
  dout(10) << "union of " << on->uncommitted << " and " << alloc << endl;
  on->uncommitted.union_of(alloc);

  dout(10) << "alloc_write onode uncommitted now " << on->uncommitted << endl;

  // allocate the space
  for (map<block_t,block_t>::iterator i = alloc.m.begin();
       i != alloc.m.end();
       i++) {
    // release the old region (goes into limbo, so prior-epoch data
    // stays intact until the next commit)
    vector<Extent> old;
    on->map_extents(i->first, i->second, old);
    for (unsigned o=0; o<old.size(); o++)
      allocator.release(old[o]);

    // allocate new space; may come back as several extents
    block_t left = i->second;
    block_t cur = i->first;
    while (left > 0) {
      Extent ex;
      allocator.allocate(ex, left, 0);
      dout(10) << "alloc_write got " << ex << " for object offset " << cur << endl;
      on->set_extent(cur, ex);      // map object to new region
      left -= ex.length;
      cur += ex.length;
    }
  }

  // now break up the bh's as necessary, so that no bufferhead spans a
  // realloc/no-realloc boundary.
  block_t cur = start;
  block_t left = len;

  map<block_t,BufferHead*>::iterator bhp = hits.begin();
  map<block_t,block_t>::iterator ap = alloc.m.begin();
  block_t aoff = 0;                   // offset within the current alloc run
  while (left > 0) {
    assert(cur == bhp->first);
    BufferHead *bh = bhp->second;
    assert(cur == bh->start());
    assert(left >= bh->length());

    assert(ap->first+aoff == bh->start());
    if (ap->second-aoff == bh->length()) {
      // perfect: bh ends exactly at the alloc-run boundary.
      cur += bh->length();
      left -= bh->length();
      ap++;
      aoff = 0;
      bhp++;
      continue;
    }
    if (bh->length() < ap->second-aoff) {
      // bh is within alloc range.
      cur += bh->length();
      left -= bh->length();
      aoff += bh->length();
      bhp++;
      continue;
    }

    // bh spans alloc boundary, split it!
    assert(bh->length() > ap->second - aoff);
    BufferHead *n = bc.split(bh, bh->start() + ap->second-aoff);
    hits[n->start()] = n;   // add new guy to hit map

    // bh is now shortened to end exactly at the boundary...
    cur += bh->length();
    left -= bh->length();
    assert(ap->second == aoff + bh->length());
    aoff = 0;
    ap++;
    continue;
  }
}
void Ebofs::apply_write(Onode *on, size_t len, off_t off, bufferlist& bl)
{
@ -673,7 +941,7 @@ void Ebofs::apply_write(Onode *on, size_t len, off_t off, bufferlist& bl)
dout(10) << "apply_write extending object size " << on->object_size
<< " -> " << off+len << endl;
on->object_size = off+len;
on->mark_dirty();
dirty_onode(on);
}
if (zleft)
dout(10) << "apply_write zeroing first " << zleft << " bytes" << endl;
@ -681,11 +949,13 @@ void Ebofs::apply_write(Onode *on, size_t len, off_t off, bufferlist& bl)
block_t blast = (len+off-1) / EBOFS_BLOCK_SIZE;
block_t blen = blast-bstart+1;
bc.lock.Lock();
// map b range onto buffer_heads
map<block_t, BufferHead*> hits;
oc->map_write(bstart, blen, hits);
// allocate write on disk. break buffer_heads across realloc/no realloc boundaries
alloc_write(on, bstart, blen, hits);
// get current versions
version_t lowv, highv;
oc->scan_versions(bstart, blen, lowv, highv);
@ -702,10 +972,20 @@ void Ebofs::apply_write(Onode *on, size_t len, off_t off, bufferlist& bl)
bh->set_version(highv+1);
// cancel old io?
if (bh->ioh) {
dout(10) << "apply_write canceling old io on " << *bh << endl;
bc.dev.cancel_io( bh->ioh );
bh->ioh = 0;
if (bh->is_tx()) {
if (bh->tx_epoch == super_epoch) {
// try to cancel the old io (just bc it's a waste)
dout(10) << "apply_write canceling old io on " << *bh << endl;
bc.dev.cancel_io(bh->ioh);
bh->ioh = 0;
} else {
// this tx is from prior epoch! shadow+copy the buffer before we modify it.
bh->shadow_data.claim(bh->data);
bc.bufferpool.alloc(EBOFS_BLOCK_SIZE*bh->length(), bh->data); // new buffers!
bh->data.copy_in(0, bh->length()*EBOFS_BLOCK_SIZE, bh->shadow_data);
bh->shadow_ioh = bh->ioh;
bh->ioh = 0;
}
}
// partial at head or tail?
@ -753,6 +1033,7 @@ void Ebofs::apply_write(Onode *on, size_t len, off_t off, bufferlist& bl)
bc.dev.cancel_io( bh->ioh );
bh->ioh = 0;
}
bh_write(on, bh);
}
else if (bh->is_rx()) {
dout(10) << "apply_write rx -> partial " << *bh << endl;
@ -793,8 +1074,10 @@ void Ebofs::apply_write(Onode *on, size_t len, off_t off, bufferlist& bl)
left -= len_in_bh-z;
opos += len_in_bh-z;
if (!bh->is_dirty())
if (!bh->is_dirty())
bc.mark_dirty(bh);
bh_write(on, bh);
}
continue;
}
@ -834,13 +1117,14 @@ void Ebofs::apply_write(Onode *on, size_t len, off_t off, bufferlist& bl)
// mark dirty
if (!bh->is_dirty())
bc.mark_dirty(bh);
bh_write(on, bh);
}
assert(zleft == 0);
assert(left == 0);
assert(opos == off+len);
//assert(blpos == bl.length());
bc.lock.Unlock();
}
@ -967,31 +1251,34 @@ int Ebofs::read(object_t oid,
size_t len, off_t off,
bufferlist& bl)
{
ebofs_lock.Lock();
Onode *on = get_onode(oid);
if (!on)
if (!on) {
ebofs_lock.Unlock();
return -1; // object dne?
}
// read data into bl. block as necessary.
Cond cond;
bc.lock.Lock();
while (1) {
// check size bound
if (off >= on->object_size) {
put_onode(on);
if (off >= on->object_size)
break;
}
size_t will_read = MIN( off+len, on->object_size ) - off;
size_t will_read = MIN(off+len, on->object_size) - off;
if (attempt_read(on, will_read, off, bl, &cond))
break; // yay
// wait
cond.Wait(bc.lock);
cond.Wait(ebofs_lock);
}
bc.lock.Unlock();
put_onode(on);
ebofs_lock.Unlock();
return 0;
}
@ -1000,71 +1287,70 @@ int Ebofs::write(object_t oid,
size_t len, off_t off,
bufferlist& bl, bool fsync)
{
// FIXME
return write(oid, len, off, bl, (Context*)0);
// wait?
if (fsync) {
// wait for flush.
// FIXME. wait on a Cond or whatever! be careful about ebofs_lock.
return write(oid, len, off, bl, (Context*)0);
} else {
// don't wait.
return write(oid, len, off, bl, (Context*)0);
}
}
int Ebofs::write(object_t oid,
size_t len, off_t off,
bufferlist& bl, Context *onflush)
{
ebofs_lock.Lock();
assert(len > 0);
// get inode
Onode *on = get_onode(oid);
if (!on)
on = new_onode(oid); // new inode!
// allocate more space?
block_t bnum = (len+off-1) / EBOFS_BLOCK_SIZE + 1;
if (bnum > on->object_blocks) {
block_t need = bnum - on->object_blocks;
block_t near = 0;
if (on->extents.size())
near = on->extents[on->extents.size()-1].end();
while (need > 0) {
Extent ex;
allocator.allocate(ex, need, near);
dout(10) << "apply_write allocated " << ex << " near " << near << endl;
on->extents.push_back(ex);
on->object_blocks += ex.length;
need -= ex.length;
near = ex.end();
}
}
// apply to buffer cache
// apply write to buffer cache
apply_write(on, len, off, bl);
// attr changes
// apply attribute changes
// ***
// prepare (eventual) journal entry.
// set up onfinish waiter
if (onflush) {
}
// done
put_onode(on);
ebofs_lock.Unlock();
return 0;
}
int Ebofs::remove(object_t oid)
{
ebofs_lock.Lock();
// get inode
Onode *on = get_onode(oid);
if (!on) return -1;
if (!on) {
ebofs_lock.Unlock();
return -1;
}
// FIXME locking, buffer, flushing etc.
assert(0);
remove_onode(on);
ebofs_lock.Unlock();
return 0;
}
@ -1077,23 +1363,28 @@ int Ebofs::truncate(object_t oid, off_t size)
bool Ebofs::exists(object_t oid)
{
ebofs_lock.Lock();
Onode *on = get_onode(oid);
if (!on)
return false;
put_onode(on);
return true;
if (on) put_onode(on);
ebofs_lock.Unlock();
return on ? true:false;
}
int Ebofs::stat(object_t oid, struct stat *st)
{
ebofs_lock.Lock();
Onode *on = get_onode(oid);
if (!on)
if (!on) {
ebofs_lock.Unlock();
return -1;
}
// ??
st->st_size = on->object_size;
put_onode(on);
ebofs_lock.Unlock();
return 0;
}
@ -1107,7 +1398,7 @@ int Ebofs::setattr(object_t oid, const char *name, void *value, size_t size)
string n(name);
AttrVal val((char*)value, size);
on->attr[n] = val;
on->mark_dirty();
dirty_onode(on);
put_onode(on);
return 0;
@ -1122,7 +1413,7 @@ int Ebofs::getattr(object_t oid, const char *name, void *value, size_t size)
if (on->attr.count(n) == 0) return -1;
memcpy(value, on->attr[n].data, MIN( on->attr[n].len, (int)size ));
on->mark_dirty();
dirty_onode(on);
put_onode(on);
return 0;
}
@ -1135,7 +1426,7 @@ int Ebofs::rmattr(object_t oid, const char *name)
string n(name);
on->attr.erase(n);
on->mark_dirty();
dirty_onode(on);
put_onode(on);
return 0;
}
@ -1257,7 +1548,7 @@ int Ebofs::collection_setattr(coll_t cid, const char *name, void *value, size_t
string n(name);
AttrVal val((char*)value, size);
cn->attr[n] = val;
cn->mark_dirty();
dirty_cnode(cn);
put_cnode(cn);
return 0;
@ -1272,7 +1563,6 @@ int Ebofs::collection_getattr(coll_t cid, const char *name, void *value, size_t
if (cn->attr.count(n) == 0) return -1;
memcpy(value, cn->attr[n].data, MIN( cn->attr[n].len, (int)size ));
cn->mark_dirty();
put_cnode(cn);
return 0;
}
@ -1285,7 +1575,7 @@ int Ebofs::collection_rmattr(coll_t cid, const char *name)
string n(name);
cn->attr.erase(n);
cn->mark_dirty();
dirty_cnode(cn);
put_cnode(cn);
return 0;
}

View File

@ -26,16 +26,35 @@ inline ostream& operator<<(ostream& out, idpair_t oc) {
}
const int EBOFS_COMMIT_INTERVAL = 10; // whatever
class Ebofs : public ObjectStore {
protected:
Mutex ebofs_lock; // a beautiful global lock
// ** super **
bool mounted;
BlockDevice &dev;
version_t super_version;
bool mounted, unmounting;
bool readonly;
version_t super_epoch;
void prepare_super(version_t epoch, bufferptr& bp);
void write_super(version_t epoch, bufferptr& bp);
Cond commit_cond; // to wake up the commit thread
int commit_thread_entry();
class CommitThread : public Thread {
Ebofs *ebofs;
public:
CommitThread(Ebofs *e) : ebofs(e) {}
void *entry() {
ebofs->commit_thread_entry();
return 0;
}
} commit_thread;
int write_super();
// ** allocator **
block_t free_blocks;
@ -48,38 +67,50 @@ class Ebofs : public ObjectStore {
// ** tables and sets **
// nodes
NodePool table_nodepool; // for primary tables.
NodePool nodepool; // for all tables...
// tables
Table<object_t, Extent> *object_tab;
Table<block_t,block_t> *free_tab[EBOFS_NUM_FREE_BUCKETS];
Table<object_t, Extent> *object_tab;
Table<block_t,block_t> *free_tab[EBOFS_NUM_FREE_BUCKETS];
// collections
Table<coll_t, Extent> *collection_tab;
Table<coll_t, Extent> *collection_tab;
Table<idpair_t, bool> *oc_tab;
Table<idpair_t, bool> *co_tab;
void close_tables();
// ** onode cache **
hash_map<object_t, Onode*> onode_map; // onode cache
LRU onode_lru;
set<Onode*> dirty_onodes;
Onode* new_onode(object_t oid); // make new onode. ref++.
Onode* get_onode(object_t oid); // get cached onode, or read from disk. ref++.
void write_onode(Onode *on);
void write_onode(Onode *on, Context *c);
void remove_onode(Onode *on);
void put_onode(Onode* o); // put it back down. ref--.
void dirty_onode(Onode* o);
// ** cnodes **
hash_map<coll_t, Cnode*> cnode_map;
LRU cnode_lru;
set<Cnode*> dirty_cnodes;
int inodes_flushing;
Cond inode_commit_cond;
Cnode* new_cnode(coll_t cid);
Cnode* get_cnode(coll_t cid);
void write_cnode(Cnode *cn);
void write_cnode(Cnode *cn, Context *c);
void remove_cnode(Cnode *cn);
void put_cnode(Cnode *cn);
void dirty_cnode(Cnode *cn);
void flush_inode_finish();
void commit_inodes_start();
void commit_inodes_wait();
friend class C_E_InodeFlush;
public:
void trim_onode_cache();
@ -89,12 +120,17 @@ class Ebofs : public ObjectStore {
BufferCache bc;
pthread_t flushd_thread_id;
void commit_bc_wait(version_t epoch);
public:
void trim_buffer_cache();
void flush_all();
protected:
void zero(Onode *on, size_t len, off_t off, off_t write_thru);
//void zero(Onode *on, size_t len, off_t off, off_t write_thru);
void alloc_write(Onode *on,
block_t start, block_t len,
map<block_t, BufferHead*>& hits);
void apply_write(Onode *on, size_t len, off_t off, bufferlist& bl);
bool attempt_read(Onode *on, size_t len, off_t off, bufferlist& bl, Cond *will_wait_on);
@ -111,11 +147,14 @@ class Ebofs : public ObjectStore {
public:
Ebofs(BlockDevice& d) :
dev(d),
dev(d),
mounted(false), unmounting(false), readonly(false), super_epoch(0),
commit_thread(this),
free_blocks(0), allocator(this),
bufferpool(EBOFS_BLOCK_SIZE),
object_tab(0), collection_tab(0), oc_tab(0), co_tab(0),
bc(dev, bufferpool) {
inodes_flushing(0),
bc(dev, bufferpool, ebofs_lock) {
for (int i=0; i<EBOFS_NUM_FREE_BUCKETS; i++)
free_tab[i] = 0;
}

View File

@ -6,6 +6,9 @@
#include "types.h"
#include "BufferCache.h"
#include "include/interval_set.h"
/*
* object node (like an inode)
*
@ -34,6 +37,8 @@ public:
map<string, AttrVal > attr;
vector<Extent> extents;
interval_set<block_t> uncommitted;
ObjectCache *oc;
bool dirty;
@ -94,6 +99,81 @@ public:
// allocation
// Append a disk extent to this object's extent list, coalescing with the
// tail extent when the two are contiguous on disk.
void _append_extent(Extent& ex) {
  if (!extents.empty() && extents.back().end() == ex.start)
    extents.back().length += ex.length;   // contiguous: grow the tail
  else
    extents.push_back(ex);
}
// Map the object block range [offset, offset+ex.length) onto disk extent
// ex, rebuilding the extent list around it.  Grows the object if the new
// extent reaches past the current end.
// Fixes vs. prior version: (1) 'cur' was never advanced in the prefix
// loop (infinite loop for any offset > 0); (2) the skip loop used
// MIN(ex.length, ...) instead of MIN(sleft, ...), underflowing the
// unsigned 'sleft' when the new extent spans several old ones; (3) the
// trailing copy used 'if' instead of 'while', dropping all but one of
// the remaining old extents.
void set_extent(block_t offset, Extent ex) {
  assert(offset <= object_blocks);

  // at the end?  just append.
  if (offset == object_blocks) {
    _append_extent(ex);
    object_blocks += ex.length;
    return;
  }

  // nope.  ok, rebuild the extent list.
  vector<Extent> old;
  old.swap(extents);
  assert(extents.empty());

  unsigned oldex = 0;   // index into old
  block_t oldoff = 0;   // offset within old[oldex]
  block_t cur = 0;      // object block position copied so far

  // copy up to offset
  while (cur < offset) {
    Extent t;
    t.start = old[oldex].start+oldoff;
    t.length = MIN(offset-cur, old[oldex].length-oldoff);
    _append_extent(t);
    cur += t.length;          // fix (1): advance the copy position
    oldoff += t.length;
    if (oldoff == old[oldex].length) {
      oldex++;
      oldoff = 0;
    }
  }

  // add our new extent
  _append_extent(ex);
  if (offset + ex.length > object_blocks)
    object_blocks = offset + ex.length;

  // skip past the old blocks the new extent replaces
  block_t sleft = ex.length;
  while (sleft > 0) {
    block_t skip = MIN(sleft, old[oldex].length-oldoff);  // fix (2)
    sleft -= skip;
    oldoff += skip;
    if (oldoff == old[oldex].length) {
      oldex++;
      oldoff = 0;
      if (oldex == old.size()) break;
    }
  }

  // copy anything left?
  while (oldex < old.size()) {   // fix (3): copy ALL remaining extents
    if (oldoff) {
      Extent t;
      t.start = old[oldex].start+oldoff;
      t.length = old[oldex].length-oldoff;
      _append_extent(t);
      oldoff = 0;
      oldex++;
    } else {
      _append_extent(old[oldex++]);
    }
  }
}
/* map_extents(start, len, ls)
* map teh given page range into extents on disk.
*/
int map_extents(block_t start, block_t len, vector<Extent>& ls) {
block_t cur = 0;
for (unsigned i=0; i<extents.size(); i++) {
@ -113,17 +193,87 @@ public:
return 0;
}
// attr
int getxattr(char *name, void *value, int size) {
return 0;
/* map_alloc_regions(start, len, map)
* map range into regions that need to be (re)allocated on disk
* because they overlap "safe" (or unallocated) parts of the object
*/
// Compute which parts of [start, start+len) must be (re)allocated on
// disk: the whole range minus the pieces already marked uncommitted in
// this epoch (those are safe to overwrite in place).
void map_alloc_regions(block_t start, block_t len, 
                       interval_set<block_t>& alloc) {
  alloc.insert(start, len);                  // begin with the whole range
  interval_set<block_t> keep;
  keep.intersection_of(alloc, uncommitted);  // parts already uncommitted
  alloc.subtract(keep);                      // ...need no reallocation
}
int setxattr(char *name, void *value, int size) {
return 0;
/*
// (un)committed ranges
void dirty_range(block_t start, block_t len) {
// frame affected area (used for simplify later)
block_t first = start;
block_t last = start+len;
// put in uncommitted range
map<block_t,block_t>::iterator p = uncommitted.lower_bound(start);
if (p != uncommitted.begin() &&
(p->first > start || p == uncommitted.end())) {
p--;
first = p->first;
if (p->first + p->second <= start)
p++;
}
for (; len>0; p++) {
if (p == uncommitted.end()) {
uncommitted[start] = len;
break;
}
if (p->first <= start) {
block_t skip = start - p->first;
block_t overlap = MIN( p->second - skip, len );
start += overlap;
len -= overlap;
} else if (p->first > start) {
block_t add = MIN(len, p->first - start);
uncommitted[start] = add;
start += add;
len -= add;
}
}
// simplify uncommitted
map<block_t,block_t>::iterator p = uncommitted.lower_bound(first);
block_t prevstart = p->first;
block_t prevend = p->first + p->second;
p++;
while (p != uncommitted.end()) {
if (prevend == p->first) {
uncommitted[prevstart] += p->second;
prevend += p->second;
block_t t = p->first;
p++;
uncommitted.erase(t);
} else {
prevstart = p->first;
prevend = p->first + p->second;
if (prevend >= last) break; // we've gone past our updates
p++;
}
}
}
int removexattr(char *name) {
return 0;
void mark_committed() {
uncommitted.clear();
}
bool is_uncommitted(block_t b) {
}
*/
// pack/unpack
int get_attr_bytes() {

View File

@ -473,8 +473,7 @@ class Table : public _Table {
// create a root node (leaf!)
assert(root == -1);
assert(depth == 0);
Nodeptr newroot( pool.new_node() );
newroot.set_type(Node::TYPE_LEAF);
Nodeptr newroot( pool.new_node(Node::TYPE_LEAF) );
root = newroot.get_id();
depth++;
}
@ -529,9 +528,8 @@ class Table : public _Table {
cursor.dirty();
// split
Nodeptr newnode( pool.new_node() );
Nodeptr leftnode = cursor.open[cursor.level];
newnode.set_type( leftnode.node->get_type() );
Nodeptr newnode( pool.new_node(leftnode.node->get_type()) );
leftnode.split( newnode );
/* insert our item */
@ -558,10 +556,9 @@ class Table : public _Table {
if (cursor.level == 0) {
/* split root. */
dbtout << "that split was the root " << root << endl;
Nodeptr newroot( pool.new_node() );
Nodeptr newroot( pool.new_node(Node::TYPE_INDEX) );
/* new root node */
newroot.set_type(Node::TYPE_INDEX);
newroot.set_size(2);
newroot.index_item(0).key = leftnode.key(0);
newroot.index_item(0).node = root;

View File

@ -18,8 +18,12 @@ int main(int argc, char **argv)
}
// mkfs
Ebofs mfs(dev);
mfs.mkfs();
// test-o-rama!
Ebofs fs(dev);
fs.mkfs();
fs.mount();
if (0) { // test
bufferlist bl;

View File

@ -8,32 +8,34 @@
#include "AlignedBufferPool.h"
/* node status on disk, in memory
*
* DISK MEMORY ON LISTS EVENT
*
* claim cycle:
* free free free -
* free dirty dirty mark_dirty()
* free dirty committing start commit
* inuse dirty committing (write happens)
* inuse clean - finish commit
*
* release cycle:
* inuse clean - -
* inuse free limbo release
* inuse free committing start_write
* free free committing (write happens)
* free free free finish_write
*/
/*
disk wire memory
free free -> free can alloc
free used -> dirty can modify
free used used -> tx
free used free -> limbo
used used -> clean
used free -> limbo
// meaningless
used free free -> free can alloc
used free used __DNE__
*/
class Node {
public:
static const int STATUS_FREE = 0;
static const int STATUS_DIRTY = 1;
static const int STATUS_CLEAN = 2;
// bit fields
static const int STATE_CLEAN = 1;
static const int STATE_DIRTY = 2;
static const int STATE_TX = 3;
static const int ITEM_LEN = EBOFS_NODE_BYTES - sizeof(int) - sizeof(int) - sizeof(int);
@ -42,55 +44,91 @@ class Node {
protected:
nodeid_t id;
int state; // use bit fields above!
bufferptr bptr;
int *nrecs;
int *status;
bufferptr shadow_bptr;
// in disk buffer
int *type;
int *nrecs;
public:
Node(nodeid_t i) : id(i) {
bptr = new buffer(EBOFS_NODE_BYTES);
Node(nodeid_t i, bufferptr& b, int s) : id(i), state(s), bptr(b) {
nrecs = (int*)(bptr.c_str());
status = (int*)(bptr.c_str() + sizeof(*nrecs));
type = (int*)(bptr.c_str() + sizeof(*status) + sizeof(*nrecs));
clear();
}
Node(nodeid_t i, bufferptr& b) : id(i), bptr(b) {
nrecs = (int*)(bptr.c_str());
status = (int*)(bptr.c_str() + sizeof(*nrecs));
type = (int*)(bptr.c_str() + sizeof(*status) + sizeof(*nrecs));
type = (int*)(bptr.c_str() + sizeof(*nrecs));
}
void clear() {
*nrecs = 0;
*status = STATUS_FREE;
*type = 0;
}
// id
nodeid_t get_id() const { return id; }
void set_id(nodeid_t n) { id = n; }
// buffer
bufferptr& get_buffer() { return bptr; }
char *item_ptr() { return bptr.c_str() + sizeof(*nrecs) + sizeof(*type); }
// size
int size() { return *nrecs; }
void set_size(int s) { *nrecs = s; }
char *item_ptr() { return bptr.c_str() + sizeof(*status) + sizeof(*nrecs) + sizeof(*type); }
int& get_status() { return *status; }
bool is_dirty() const { return *status == STATUS_DIRTY; }
void set_status(int s) { *status = s; }
// type
int& get_type() { return *type; }
void set_type(int t) { *type = t; }
bool is_index() { return *type == TYPE_INDEX; }
bool is_leaf() { return *type == TYPE_LEAF; }
// state
bool is_dirty() { return state == STATE_DIRTY; }
bool is_tx() { return state == STATE_TX; }
bool is_clean() { return state == STATE_CLEAN; }
void set_state(int s) { state = s; }
void make_shadow(AlignedBufferPool& bufferpool) {
assert(is_tx());
shadow_bptr = bptr;
// new buffer
bptr = bufferpool.alloc(EBOFS_NODE_BYTES);
nrecs = (int*)(bptr.c_str());
type = (int*)(bptr.c_str() + sizeof(*nrecs));
// copy contents!
memcpy(bptr.c_str(), shadow_bptr.c_str(), EBOFS_NODE_BYTES);
}
};
class NodeRegion {
class NodePool {
protected:
AlignedBufferPool bufferpool; // our own memory allocator for node buffers
map<nodeid_t, Node*> node_map; // open node map
public:
vector<Extent> region_loc; // region locations
Extent usemap_even;
Extent usemap_odd;
protected:
// on-disk block states
set<nodeid_t> free;
set<nodeid_t> dirty;
set<nodeid_t> tx;
set<nodeid_t> clean; // aka used
set<nodeid_t> limbo;
Mutex lock;
Cond commit_cond;
int flushing;
static int make_nodeid(int region, int offset) {
return (region << 24) | offset;
@ -101,219 +139,276 @@ class NodeRegion {
static int nodeid_offset(nodeid_t nid) {
return nid & (0xffffffffUL >> 24);
}
protected:
int region_id;
int num_nodes;
public:
Extent location;
// free -> dirty -> committing -> clean
// dirty -> free
// or !dirty -> limbo -> free
set<nodeid_t> free;
set<nodeid_t> dirty;
set<nodeid_t> committing;
set<nodeid_t> limbo;
public:
NodeRegion(int id, Extent& loc) : region_id(id) {
location = loc;
num_nodes = location.length / EBOFS_NODE_BLOCKS;
}
int get_region_id() const { return region_id; }
void init_all_free() {
for (unsigned i=0; i<location.length; i++) {
nodeid_t nid = make_nodeid(region_id, i);
free.insert(nid);
}
NodePool() : bufferpool(EBOFS_NODE_BYTES),
flushing(0) {}
~NodePool() {
// nodes
release_all();
}
/* force every node block to be rewritten at the next commit:
 * push everything currently free through limbo so it gets written
 * out (as free) along with the rest. */
void induce_full_flush() {
  // free -> limbo : so they get written out as such
  limbo.insert(free.begin(), free.end());
  free.clear();
}
int size() const {
return num_nodes;
//return (location.length / EBOFS_NODE_BLOCKS); // FIXME THIS IS WRONG
}
int num_free() const {
int num_free() {
return free.size();
}
// new/open node
nodeid_t new_nodeid() {
assert(num_free());
nodeid_t nid = *(free.begin());
free.erase(nid);
dirty.insert(nid);
return nid;
}
void release(nodeid_t nid) {
if (dirty.count(nid)) {
dirty.erase(nid);
free.insert(nid);
}
else {
if (committing.count(nid))
committing.erase(nid);
limbo.insert(nid);
// the caller had better adjust usemap locations...
void add_region(Extent ex) {
int region = region_loc.size();
region_loc.push_back(ex);
for (unsigned o = 0; o < ex.length; o++) {
free.insert( make_nodeid(region, o) );
}
}
};
class NodePool {
protected:
int num_regions;
map<int, NodeRegion*> node_regions; // regions
map<nodeid_t, Node*> node_map; // open node map
AlignedBufferPool bufferpool; // memory pool
public:
NodePool() : num_regions(0), bufferpool(EBOFS_NODE_BYTES) {}
~NodePool() {
// nodes
set<Node*> left;
for (map<nodeid_t,Node*>::iterator i = node_map.begin();
i != node_map.end();
i++)
left.insert(i->second);
for (set<Node*>::iterator i = left.begin();
i != left.end();
i++)
release( *i );
assert(node_map.empty());
// regions
for (map<int,NodeRegion*>::iterator i = node_regions.begin();
i != node_regions.end();
i++)
delete i->second;
node_regions.clear();
}
void add_region(NodeRegion *r) {
node_regions[r->get_region_id()] = r;
num_regions++;
}
int get_num_regions() {
return num_regions;
}
Extent& get_region_loc(int r) {
return node_regions[r]->location;
}
int num_free() {
int f = 0;
for (map<int,NodeRegion*>::iterator i = node_regions.begin();
i != node_regions.end();
i++)
f += i->second->num_free();
return f;
}
// ***
int read(BlockDevice& dev, struct ebofs_nodepool *np) {
int init(struct ebofs_nodepool *np) {
// regions
for (int i=0; i<np->num_regions; i++) {
dout(3) << " region " << i << " at " << np->region_loc[i] << endl;
NodeRegion *r = new NodeRegion(i, np->region_loc[i]);
add_region(r);
for (block_t boff = 0; boff < r->location.length; boff += EBOFS_NODE_BLOCKS) {
nodeid_t nid = NodeRegion::make_nodeid(r->get_region_id(), boff);
dout(3) << "init region " << i << " at " << np->region_loc[i] << endl;
region_loc.push_back( np->region_loc[i] );
}
// usemap
usemap_even = np->node_usemap_even;
usemap_odd = np->node_usemap_odd;
dout(3) << "init even map at " << usemap_even << endl;
dout(3) << "init odd map at " << usemap_odd << endl;
return 0;
}
// *** blocking i/o routines ***
/* read_usemap(dev, epoch)
 * blocking read of the node usemap for the given superblock epoch
 * (even epochs live at usemap_even, odd at usemap_odd), rebuilding the
 * in-memory free/clean sets from it.
 * one bit per node id, most-significant bit first within each byte:
 * 1 -> in use (clean), 0 -> free.  always returns 0.
 */
int read_usemap(BlockDevice& dev, version_t epoch) {
  // read map
  Extent loc;
  if (epoch & 1)
    loc = usemap_odd;
  else
    loc = usemap_even;
  bufferptr bp = bufferpool.alloc(EBOFS_BLOCK_SIZE*loc.length);
  dev.read(loc.start, loc.length, bp);

  // parse
  unsigned region = 0;   // current region
  unsigned roff = 0;     // offset in region
  for (unsigned byte = 0; byte<bp.length(); byte++) {   // each byte
    // get byte
    int x = *(unsigned char*)(bp.c_str() + byte);

    int mask = 0x80;     // left-most bit
    for (unsigned bit=0; bit<8; bit++) {
      nodeid_t nid = make_nodeid(region, roff);
      if (x & mask)
        clean.insert(nid);
      else
        free.insert(nid);
      mask = mask >> 1;  // move one bit right.

      roff++;
      if (roff == region_loc[region].length) {
        // next region!  each region starts on a fresh byte; the rest
        // of this byte is padding (the writer breaks here too).
        roff = 0;
        region++;
        break;
      }
    }
    if (region == region_loc.size()) break;   // all regions parsed
  }
  return 0;
}
int read_clean_nodes(BlockDevice& dev) {
/*
this relies on the clean set begin defined so that we know which nodes
to read. so it only really works when called from mount()!
*/
for (unsigned r=0; r<region_loc.size(); r++) {
dout(3) << "read region " << r << " at " << region_loc[r] << endl;
for (block_t boff = 0; boff < region_loc[r].length; boff++) {
nodeid_t nid = make_nodeid(r, boff);
if (!clean.count(nid)) continue;
dout(20) << "read node " << nid << endl;
bufferptr bp = bufferpool.alloc(EBOFS_NODE_BYTES);
dev.read(r->location.start + (block_t)boff, EBOFS_NODE_BLOCKS,
dev.read(region_loc[r].start + (block_t)boff, EBOFS_NODE_BLOCKS,
bp);
Node *n = new Node(nid, bp);
if (n->get_status() == Node::STATUS_FREE) {
dout(5) << " node " << nid << " free" << endl;
r->free.insert(nid);
delete n;
} else {
dout(5) << " node " << nid << " in use" << endl;
node_map[nid] = n;
}
Node *n = new Node(nid, bp, Node::STATE_CLEAN);
node_map[nid] = n;
}
}
return 0;
}
void init_all_free() {
for (map<int,NodeRegion*>::iterator i = node_regions.begin();
i != node_regions.end();
i++) {
i->second->init_all_free();
// **** non-blocking i/o ****
private:
/* completion context attached to the usemap write; bounces the i/o
 * completion back into NodePool::flushed_usemap(). */
class C_NP_FlushUsemap : public Context {
  NodePool *pool;   // owning pool to notify
public:
  C_NP_FlushUsemap(NodePool *p) :
    pool(p) {}
  void finish(int r) {
    pool->flushed_usemap();
  }
};
/* called when the usemap write completes: one fewer i/o in flight;
 * wake up commit_wait() when the count reaches zero. */
void flushed_usemap() {
  lock.Lock();
  flushing--;
  if (flushing == 0)
    commit_cond.Signal();   // last outstanding write has landed
  lock.Unlock();
}
void induce_full_flush() {
for (map<int,NodeRegion*>::iterator i = node_regions.begin();
i != node_regions.end();
i++) {
i->second->induce_full_flush();
}
}
public:
int write_usemap(BlockDevice& dev, version_t version) {
// alloc
Extent loc;
if (version & 1)
loc = usemap_odd;
else
loc = usemap_even;
bufferptr bp = bufferpool.alloc(EBOFS_BLOCK_SIZE*loc.length);
void flush(BlockDevice& dev) {
// flush dirty items, change them to limbo status
for (map<int,NodeRegion*>::iterator i = node_regions.begin();
i != node_regions.end();
i++) {
NodeRegion *r = i->second;
// dirty -> clean
r->committing = r->dirty;
r->dirty.clear();
// fill in
unsigned region = 0; // current region
unsigned roff = 0; // offset in region
for (unsigned byte = 0; byte<bp.length(); byte++) { // each byte
int x = 0; // start with empty byte
int mask = 0x80; // left-most bit
for (unsigned bit=0; bit<8; bit++) {
nodeid_t nid = make_nodeid(region, roff);
if (clean.count(nid) ||
dirty.count(nid))
x |= mask;
for (set<int>::iterator i = r->committing.begin();
i != r->committing.end();
i++) {
Node *n = get_node(*i);
assert(n); // it's dirty, we better have it
n->set_status(Node::STATUS_CLEAN);
block_t off = NodeRegion::nodeid_offset(*i);
dev.write(r->location.start + off, EBOFS_NODE_BLOCKS, n->get_buffer());
roff++;
mask = mask >> 1;
if (roff == region_loc[region].length) {
// next region!
roff = 0;
region++;
break;
}
}
r->committing.clear();
// limbo -> free
r->committing = r->limbo;
r->limbo.clear();
bufferptr freebuffer = bufferpool.alloc(EBOFS_NODE_BYTES);
Node freenode(1, freebuffer);
freenode.set_status(Node::STATUS_FREE);
for (set<int>::iterator i = r->committing.begin();
i != r->committing.end();
i++) {
freenode.set_id( *i );
block_t off = NodeRegion::nodeid_offset(*i);
dev.write(r->location.start + off, EBOFS_NODE_BLOCKS, freenode.get_buffer());
}
for (set<int>::iterator i = r->committing.begin();
i != r->committing.end();
i++)
r->free.insert(*i);
r->committing.clear();
*(unsigned char*)(bp.c_str() + byte) = x;
if (region == region_loc.size()) break;
}
// write
bufferlist bl;
bl.append(bp);
dev.write(loc.start, loc.length, bl,
new C_NP_FlushUsemap(this));
return 0;
}
// *** node commit ***
private:
bool is_committing() {
return (flushing > 0);
}
class C_NP_FlushNode : public Context {
NodePool *pool;
nodeid_t nid;
public:
C_NP_FlushNode(NodePool *p, nodeid_t n) :
pool(p), nid(n) {}
void finish(int r) {
pool->flushed_node(nid);
}
};
/* called when a node block write completes: move the node tx -> clean
 * and decrement the in-flight count, signalling commit_wait() when
 * everything has landed. */
void flushed_node(nodeid_t nid) {
  lock.Lock();
  assert(tx.count(nid));
  // NOTE(review): the assert above makes the else branch below
  // unreachable in debug builds; either the assert or the limbo
  // fallback looks vestigial -- confirm which is intended.
  if (tx.count(nid)) {
    tx.erase(nid);
    clean.insert(nid);
  }
  else {
    assert(limbo.count(nid));
  }
  flushing--;
  if (flushing == 0)
    commit_cond.Signal();   // commit fully flushed
  lock.Unlock();
}
public:
/* commit_start(dev, version)
 * kick off an asynchronous commit: write the usemap for this version,
 * write every dirty node (dirty -> tx) with a completion callback, and
 * move limbo ids back onto the free list.  each write in flight bumps
 * 'flushing'; commit_wait() blocks until the callbacks drain it to 0.
 */
void commit_start(BlockDevice& dev, version_t version) {
  lock.Lock();
  assert(!is_committing());   // only one commit at a time

  // write map
  flushing++;
  write_usemap(dev,version & 1);

  // dirty -> tx (write to disk)
  for (set<nodeid_t>::iterator i = dirty.begin();
       i != dirty.end();
       i++) {
    Node *n = get_node(*i);
    assert(n); // it's dirty, we better have it
    assert(n->is_dirty());
    n->set_state(Node::STATE_TX);

    // translate node id into an on-disk (region, offset) location
    unsigned region = nodeid_region(*i);
    block_t off = nodeid_offset(*i);

    bufferlist bl;
    bl.append(n->get_buffer());
    dev.write(region_loc[region].start + off, EBOFS_NODE_BLOCKS,
              bl,
              new C_NP_FlushNode(this, *i));   // callback -> flushed_node()
    flushing++;
    tx.insert(*i);
  }
  dirty.clear();

  // limbo -> free
  for (set<nodeid_t>::iterator i = limbo.begin();
       i != limbo.end();
       i++) {
    free.insert(*i);
  }
  limbo.clear();

  lock.Unlock();
}
/* block until all i/o started by commit_start() has completed. */
void commit_wait() {
  lock.Lock();
  while (is_committing()) {     // i.e. while flushing > 0
    commit_cond.Wait(lock);
  }
  lock.Unlock();
}
@ -326,7 +421,7 @@ class NodePool {
}
// unopened node
/*
/* not implemented yet!!
Node* open_node(nodeid_t nid) {
Node *n = node_regions[ NodeRegion::nodeid_region(nid) ]->open_node(nid);
dbtout << "pool.open_node " << n->get_id() << endl;
@ -335,47 +430,86 @@ class NodePool {
}
*/
// new node
nodeid_t new_nodeid() {
for (map<int,NodeRegion*>::iterator i = node_regions.begin();
i != node_regions.end();
i++) {
if (i->second->num_free())
return i->second->new_nodeid();
}
assert(0); // full!
return -1;
// allocate id/block on disk. always free -> dirty.
// returns a fresh node id; the caller must ensure the pool isn't full.
nodeid_t alloc_id() {
  // pick node id
  assert(!free.empty());
  nodeid_t nid = *(free.begin());
  free.erase(nid);
  dirty.insert(nid);
  return nid;
}
Node* new_node() {
// new node
Node* new_node(int type) {
nodeid_t nid = alloc_id();
dbtout << "pool.new_node " << nid << endl;
// alloc node
bufferptr bp = bufferpool.alloc(EBOFS_NODE_BYTES);
Node *n = new Node(new_nodeid(), bp);
n->clear();
dbtout << "pool.new_node " << n->get_id() << endl;
assert(node_map.count(n->get_id()) == 0);
node_map[n->get_id()] = n;
Node *n = new Node(nid, bp, Node::STATE_DIRTY);
n->set_type(type);
n->set_size(0);
assert(node_map.count(nid) == 0);
node_map[nid] = n;
return n;
}
void release_nodeid(nodeid_t nid) {
dbtout << "pool.release_nodeid on " << nid << endl;
assert(node_map.count(nid) == 0);
node_regions[ NodeRegion::nodeid_region(nid) ]->release(nid);
return;
}
void release(Node *n) {
dbtout << "pool.release on " << n->get_id() << endl;
node_map.erase(n->get_id());
release_nodeid(n->get_id());
const nodeid_t nid = n->get_id();
dbtout << "pool.release on " << nid << endl;
node_map.erase(nid);
if (n->is_dirty()) {
dirty.erase(nid);
free.insert(nid);
} else if (n->is_clean()) {
clean.erase(nid);
limbo.insert(nid);
} else
assert(0);
delete n;
}
/* release every open node.  pointers are snapshotted into 'left' first
 * because release() erases entries from node_map as it goes. */
void release_all() {
  set<Node*> left;
  for (map<nodeid_t,Node*>::iterator i = node_map.begin();
       i != node_map.end();
       i++)
    left.insert(i->second);
  for (set<Node*>::iterator i = left.begin();
       i != left.end();
       i++)
    release( *i );
  assert(node_map.empty());   // release() removed them all
}
void dirty_node(Node *n) {
assert(!n->is_dirty());
n->set_status(Node::STATUS_DIRTY);
nodeid_t newid = new_nodeid();
dbtout << "pool.dirty_node on " << n->get_id() << " now " << newid << endl;
node_map.erase(n->get_id());
release_nodeid(n->get_id());
// get new node id?
nodeid_t oldid = n->get_id();
nodeid_t newid = alloc_id();
dbtout << "pool.dirty_node on " << oldid << " now " << newid << endl;
// release old block
if (n->is_clean()) {
assert(clean.count(oldid));
clean.erase(oldid);
} else {
assert(n->is_tx());
assert(tx.count(oldid));
tx.erase(oldid);
// move/copy current -> shadow buffer as necessary
n->make_shadow(bufferpool);
}
limbo.insert(oldid);
node_map.erase(oldid);
n->set_state(Node::STATE_DIRTY);
// move to new one!
n->set_id(newid);
node_map[newid] = n;
}

View File

@ -75,6 +75,9 @@ static const int EBOFS_NODE_BYTES = EBOFS_NODE_BLOCKS * EBOFS_BLOCK_SIZE;
static const int EBOFS_MAX_NODE_REGIONS = 10; // pick a better value!
struct ebofs_nodepool {
Extent node_usemap_even; // for even sb versions
Extent node_usemap_odd; // for odd sb versions
int num_regions;
Extent region_loc[EBOFS_MAX_NODE_REGIONS];
};
@ -127,24 +130,23 @@ static const int EBOFS_NUM_FREE_BUCKETS = 16; /* see alloc.h for bucket constr
struct ebofs_super {
unsigned s_magic;
unsigned version; // version of this superblock.
unsigned epoch; // version of this superblock.
unsigned num_blocks; /* # blocks in filesystem */
// basic stats, for kicks
unsigned free_blocks; /* unused blocks */
//unsigned num_btree_blocks; /* blocks devoted to btrees (sum of chunks) */
unsigned num_objects;
/* stupid stuff */
unsigned num_fragmented;
struct ebofs_table object_tab; // object directory
struct ebofs_table free_tab[EBOFS_NUM_FREE_BUCKETS];
struct ebofs_nodepool nodepool;
// tables
struct ebofs_table free_tab[EBOFS_NUM_FREE_BUCKETS];
struct ebofs_table object_tab; // object directory
struct ebofs_table collection_tab; // collection directory
struct ebofs_table oc_tab;
struct ebofs_table co_tab;
struct ebofs_nodepool table_nodepool;
};

194
ceph/include/interval_set.h Normal file
View File

@ -0,0 +1,194 @@
#ifndef __INTERVAL_SET_H
#define __INTERVAL_SET_H
#include <algorithm>
#include <cassert>
#include <map>
#include <ostream>
using namespace std;
/* interval_set<T>
 * a set of non-overlapping, non-adjacent half-open intervals over an
 * integral type T, stored as a map of start -> length.
 */
template<typename T>
class interval_set {
 public:
  map<T,T> m;   // map start -> len

  // helpers
 private:
  /* return the interval containing 'start', or else the first interval
   * after it (m.end() if none). */
  typename map<T,T>::iterator find_inc(T start) {
    typename map<T,T>::iterator p = m.lower_bound(start);
    if (p != m.begin() &&
        (p == m.end() || p->first > start)) {   // FIX: test end() before dereferencing
      p--;   // might overlap?
      if (p->first + p->second <= start)
        p++; // it doesn't.
    }
    return p;
  }

  /* like find_inc, but also accept an interval that merely touches
   * 'start' (i.e. ends exactly at it). */
  typename map<T,T>::iterator find_adj(T start) {
    typename map<T,T>::iterator p = m.lower_bound(start);
    if (p != m.begin() &&
        (p == m.end() || p->first > start)) {   // FIX: test end() before dereferencing
      p--;   // might touch?
      if (p->first + p->second < start)
        p++; // it doesn't.
    }
    return p;
  }

 public:
  void clear() {
    m.clear();
  }

  // true iff point i is covered.
  bool contains(T i) {
    typename map<T,T>::iterator p = find_inc(i);
    if (p == m.end()) return false;          // FIX: was end(), which isn't declared
    if (p->first > i) return false;
    if (p->first+p->second <= i) return false;
    assert(p->first <= i && p->first+p->second > i);
    return true;
  }
  // true iff [start,start+len) is entirely covered by one interval.
  bool contains(T start, T len) {
    typename map<T,T>::iterator p = find_inc(start);
    if (p == m.end()) return false;          // FIX: was end(), which isn't declared
    if (p->first > start) return false;
    if (p->first+p->second <= start) return false;
    assert(p->first <= start && p->first+p->second > start);
    if (p->first+p->second < start+len) return false;
    return true;
  }

  /* insert [start,start+len).  the range must not overlap anything
   * already present (asserts otherwise); adjacent intervals coalesce. */
  void insert(T start, T len) {
    typename map<T,T>::iterator p = find_adj(start);
    if (p == m.end()) {
      m[start] = len;                    // new interval
    } else {
      if (p->first < start) {
        assert(p->first + p->second == start);
        p->second += len;                // append to end
        typename map<T,T>::iterator n = p;
        n++;
        if (n != m.end() &&              // FIX: guard before dereferencing next
            start+len == n->first) {     // combine with next, too!
          p->second += n->second;
          m.erase(n);
        }
      } else {
        if (start+len == p->first) {
          m[start] = len + p->second;    // append to front
          m.erase(p);
        } else {
          assert(p->first > start+len);
          m[start] = len;                // new interval
        }
      }
    }
  }

  /* remove [start,start+len); the range must lie entirely within a
   * single existing interval (asserts otherwise). */
  void erase(T start, T len) {
    typename map<T,T>::iterator p = find_inc(start);

    assert(p != m.end());
    assert(p->first <= start);

    T before = start - p->first;
    assert(p->second >= before+len);
    T after = p->second - before - len;

    if (before)
      p->second = before;   // shorten bit before
    else
      m.erase(p);
    if (after)
      m[start+len] = after; // keep bit after
  }

  // this -= a.  every interval of a must be contained in this.
  void subtract(interval_set &a) {
    for (typename map<T,T>::iterator p = a.m.begin();
         p != a.m.end();
         p++)
      erase(p->first, p->second);
  }

  // this += a.  a must not overlap this.
  void insert(interval_set &a) {
    for (typename map<T,T>::iterator p = a.m.begin();
         p != a.m.end();
         p++)
      insert(p->first, p->second);
  }

  /* this = a intersect b (this should be empty on entry).
   * two-pointer sweep; after emitting an overlap we advance only the
   * interval that ends first, so one interval of a can intersect
   * several of b (the old code advanced both and missed those). */
  void intersection_of(interval_set &a, interval_set &b) {
    typename map<T,T>::iterator pa = a.m.begin();
    typename map<T,T>::iterator pb = b.m.begin();

    while (pa != a.m.end() && pb != b.m.end()) {
      // passing?
      if (pa->first + pa->second <= pb->first)
        { pa++;  continue; }
      if (pb->first + pb->second <= pa->first)
        { pb++;  continue; }
      T start = std::max(pa->first, pb->first);
      T end = std::min(pa->first+pa->second, pb->first+pb->second);
      assert(end > start);
      insert(start, end-start);
      // advance whichever interval ends first
      if (pa->first+pa->second <= pb->first+pb->second)
        pa++;
      else
        pb++;
    }
  }

  /* this = a union b (this should be empty on entry).
   * built from the simpler primitives: all of a, plus the parts of b
   * that a doesn't already cover.  (the old sweep dereferenced end
   * iterators and mishandled chained overlaps.) */
  void union_of(interval_set &a, interval_set &b) {
    interval_set common;
    common.intersection_of(a, b);   // parts covered by both
    interval_set b_only;
    b_only.m = b.m;
    b_only.subtract(common);        // parts only b covers
    insert(a);
    insert(b_only);
  }
  // this = this union a
  void union_of(interval_set &a) {
    interval_set b;
    b.m.swap(m);
    union_of(a, b);
  }

  // true iff every interval of this is covered by big.
  bool subset_of(interval_set &big) {
    for (typename map<T,T>::iterator i = m.begin();
         i != m.end();
         i++)
      if (!big.contains(i->first, i->second)) return false;
    return true;
  }
};
// print as "[s1~l1,s2~l2,...]"
template<class T>
inline ostream& operator<<(ostream& out, interval_set<T> &s) {
  out << "[";
  const char *sep = "";
  for (typename map<T,T>::iterator i = s.m.begin();
       i != s.m.end();
       i++) {
    out << sep << i->first << "~" << i->second;
    sep = ",";
  }
  out << "]";
  return out;
}
#endif