Modified Files:

Buffercache.cc Client.cc

Introduced more parallelism:

(1) reads hit on flushing buffers in addition to clean and dirty ones
(2) trim_bcache is not blocking unless it cannot reclaim a single buffer in
which case it waits until all buffers are flushed


git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@469 29311d96-e01e-0410-9327-a35deaab8ce9
This commit is contained in:
carlosm 2005-07-23 21:40:55 +00:00
parent bdf5994bbe
commit 42e438f127
2 changed files with 41 additions and 21 deletions

View File

@ -76,6 +76,7 @@ void Bufferhead::alloc_buffers(size_t size)
void Bufferhead::miss_start(size_t miss_len)
{
assert(state == BUFHD_STATE_CLEAN);
get();
state = BUFHD_STATE_RX;
this->miss_len = miss_len;
bc->lru.lru_touch(this);
@ -96,11 +97,13 @@ void Bufferhead::miss_finish()
//assert(bl.length() == miss_len);
wakeup_read_waiters();
wakeup_write_waiters();
put();
}
void Bufferhead::dirty()
{
if (state == BUFHD_STATE_CLEAN) {
get();
dout(6) << "bc: dirtying clean buffer size: " << bl.length() << endl;
state = BUFHD_STATE_DIRTY;
dirty_since = time(NULL); // start clock for dirty buffer here
@ -110,10 +113,8 @@ void Bufferhead::dirty()
dout(6) << "bc: dirty after: clean_size: " << bc->get_clean_size() << " dirty_size: " << bc->get_dirty_size() << " rx_size: " << bc->get_rx_size() << " tx_size: " << bc->get_tx_size() << " age: " << bc->dirty_buffers.get_age() << endl;
assert(!bc->dirty_buffers.exist(this));
bc->dirty_buffers.insert(this);
get();
assert(!fc->dirty_buffers.count(this));
fc->dirty_buffers.insert(this);
get();
} else {
dout(10) << "bc: dirtying dirty buffer size: " << bl.length() << endl;
}
@ -124,7 +125,6 @@ void Bufferhead::dirtybuffers_erase()
dout(10) << "bc: erase in dirtybuffers size: " << bl.length() << " in state " << state << endl;
assert(bc->dirty_buffers.exist(this));
bc->dirty_buffers.erase(this);
put();
assert(fc->dirty_buffers.count(this));
fc->dirty_buffers.erase(this);
put();
@ -134,6 +134,7 @@ void Bufferhead::flush_start()
{
dout(10) << "bc: flush_start" << endl;
assert(state == BUFHD_STATE_DIRTY);
get();
state = BUFHD_STATE_TX;
dirtybuffers_erase();
assert(!bc->inflight_buffers.count(this));
@ -156,6 +157,7 @@ void Bufferhead::flush_finish()
assert(fc->inflight_buffers.count(this));
fc->inflight_buffers.erase(this);
wakeup_write_waiters(); // readers never wait on flushes
put();
}
void Bufferhead::claim_append(Bufferhead *other)
@ -214,7 +216,7 @@ void Dirtybuffers::get_expired(time_t ttl, int left_dirty, set<Bufferhead*>& to_
it != _dbufs.end() && left_dirty > 0;
it++) {
if (ttl > now - it->second->dirty_since &&
left_dirty >= it->second->bc->get_dirty_size()) break;
left_dirty >= (int)it->second->bc->get_dirty_size()) break;
to_flush.insert(it->second);
left_dirty -= it->second->length();
}
@ -395,23 +397,28 @@ int Filecache::copy_out(size_t size, off_t offset, char *dst)
map<off_t, Bufferhead*>::iterator curbuf = buffer_map.lower_bound(offset);
if (curbuf == buffer_map.end() || curbuf->first > offset) {
return -1;
}
if (curbuf == buffer_map.begin()) {
return -1;
} else {
curbuf--;
}
}
offset -= curbuf->first;
if (offset < 0) dout(5) << "bc: copy_out: curbuf offset: " << curbuf->first << endl;
dout(6) << "bc: copy_out: curbuf offset: " << curbuf->first << " offset: " << offset << endl;
assert(offset >= 0);
while (size > 0) {
Bufferhead *bh = curbuf->second;
if (offset + size <= bh->length()) {
dout(10) << "bc: copy_out bh len: " << bh->length() << endl;
dout(6) << "bc: copy_out bh len: " << bh->length() << " size: " << size << endl;
dout(10) << "bc: want to copy off: " << offset << " size: " << size << endl;
bh->bl.copy(offset, size, dst);
size = 0;
break;
}
int howmuch = bh->length() - offset;
dout(10) << "bc: copy_out bh len: " << bh->length() << endl;
dout(6) << "bc: copy_out bh len: " << bh->length() << " size: " << size << endl;
dout(10) << "bc: want to copy off: " << offset << " size: " << howmuch << endl;
bh->bl.copy(offset, howmuch, dst);
@ -424,7 +431,7 @@ int Filecache::copy_out(size_t size, off_t offset, char *dst)
assert(curbuf != buffer_map.end());
}
}
return rvalue;
return rvalue - size;
}
// -- Buffercache methods
@ -468,8 +475,12 @@ size_t Buffercache::touch_continuous(map<off_t, Bufferhead*>& hits, size_t size,
{
dout(7) << "bc: touch_continuous size: " << size << " offset: " << offset << endl;
off_t next_off = offset;
if (hits.begin()->first > offset ||
hits.begin()->first + hits.begin()->second->length() <= offset) {
return 0;
}
for (map<off_t, Bufferhead*>::iterator curbuf = hits.begin();
curbuf != hits.end();
curbuf != hits.end();
curbuf++) {
if (curbuf == hits.begin()) {
next_off = curbuf->first;
@ -477,7 +488,7 @@ size_t Buffercache::touch_continuous(map<off_t, Bufferhead*>& hits, size_t size,
break;
}
lru.lru_touch(curbuf->second);
next_off += curbuf->second->bl.length();
next_off += curbuf->second->length();
}
return (size_t)(next_off - offset) >= size ? size : (next_off - offset);
}
@ -545,7 +556,6 @@ size_t Buffercache::reclaim(size_t min_size)
Bufferhead *bh = (Bufferhead*)lru.lru_expire();
if (!bh) {
dout(6) << "bc: nothing more to reclaim -- freed_size: " << freed_size << endl;
assert(0);
break; // nothing more to reclaim
} else {
dout(6) << "bc: reclaim: offset: " << bh->offset << " len: " << bh->length() << endl;

View File

@ -517,9 +517,11 @@ void Client::flush_buffers(int ttl, int dirty_size)
C_Client_FlushFinish *onfinish = new C_Client_FlushFinish(*it);
filer->write((*it)->ino, g_OSD_FileLayout, (*it)->bl.length(), (*it)->offset, (*it)->bl, 0, onfinish);
}
#if 0
dout(7) << "flush_buffers: dirty buffers, waiting" << endl;
assert(!bc.inflight_buffers.empty());
bc.wait_for_inflight(client_lock);
#endif
} else {
dout(7) << "no dirty buffers" << endl;
}
@ -536,7 +538,13 @@ void Client::trim_bcache()
}
// Now reclaim buffers
dout(6) << "bc: trim_bcache: reclaim: " << bc.get_total_size() - g_conf.client_bcache_size * g_conf.client_bcache_hiwater / 100 << endl;
bc.reclaim(bc.get_total_size() - g_conf.client_bcache_size * g_conf.client_bcache_hiwater / 100);
while (bc.reclaim(bc.get_total_size() -
g_conf.client_bcache_size *
g_conf.client_bcache_hiwater / 100) == 0) {
// cannot reclaim any buffers: wait for flush to finish
assert(!bc.inflight_buffers.empty());
bc.wait_for_inflight(client_lock);
}
}
}
@ -1455,14 +1463,17 @@ int Client::read(fh_t fh, char *buf, size_t size, off_t offset)
hits.clear(); rx.clear(); tx.clear(); holes.clear();
fc->map_existing(size, offset, hits, rx, tx, holes);
if (hits.count(offset)) {
// sweet -- we can return stuff immediately: find out how much
dout(6) << "read bc hit" << endl;
rvalue = (int)bc.touch_continuous(hits, size, offset);
assert(rvalue > 0);
// see whether there are initial buffer that can be read immmediately
if ((rvalue = (int)bc.touch_continuous(hits, size, offset)) > 0) {
dout(6) << "read bc hit on clean or dirty buffer, rvalue: " << rvalue << endl;
} else if ((rvalue = (int)bc.touch_continuous(tx, size, offset)) > 0) {
dout(6) << "read bc hit on tx buffer, rvalue: " << rvalue << endl;
}
if (rvalue > 0) {
// sweet -- we can return stuff immediately
rvalue = fc->copy_out((size_t)rvalue, offset, buf);
dout(6) << "read bc hit: immediately returning " << rvalue << " bytes" << endl;
assert(rvalue > 0);
dout(7) << "read bc hit: immediately returning " << rvalue << " bytes" << endl;
}
assert(!(rvalue >= 0 && (size_t)rvalue == size) || holes.empty());
@ -1513,7 +1524,6 @@ int Client::read(fh_t fh, char *buf, size_t size, off_t offset)
// buffer is filled -- see how much we can return
hits.clear(); rx.clear(); tx.clear(); holes.clear();
fc->map_existing(size, offset, hits, rx, tx, holes); // FIXME: overkill
//assert(hits.count(offset));
rvalue = (int)bc.touch_continuous(hits, size, offset);
fc->copy_out(rvalue, offset, buf);
dout(7) << "read bc no hit: returned first " << rvalue << " bytes" << endl;