mirror of
https://github.com/ceph/ceph
synced 2024-12-18 01:16:55 +00:00
Modified Files:
config.cc, client/Buffercache.cc, client/Buffercache.h, client/Client.cc, include/config.h. Much more stable, but there are still some bugs. Client::flush_buffers is currently blocking until the buffer cache puts state changes in critical regions. git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@440 29311d96-e01e-0410-9327-a35deaab8ce9
This commit is contained in:
parent
54b7bc4a8f
commit
92f0573cca
@ -73,15 +73,17 @@ void Bufferhead::alloc_buffers(size_t size)
|
||||
void Bufferhead::miss_start(size_t miss_len)
|
||||
{
|
||||
assert(state == BUFHD_STATE_CLEAN);
|
||||
state = BUFHD_STATE_INFLIGHT;
|
||||
state = BUFHD_STATE_RX;
|
||||
this->miss_len = miss_len;
|
||||
bc->lru.lru_touch(this);
|
||||
}
|
||||
|
||||
void Bufferhead::miss_finish()
|
||||
{
|
||||
assert(state == BUFHD_STATE_INFLIGHT);
|
||||
assert(state == BUFHD_STATE_RX);
|
||||
state = BUFHD_STATE_CLEAN;
|
||||
bc->increase_size(bl.length());
|
||||
dout(6) << "bc: miss_finish: clean_size: " << bc->get_clean_size() << " dirty_size: " << bc->get_dirty_size() << " rx_size: " << bc->get_rx_size() << " tx_size: " << bc->get_tx_size() << " age: " << bc->dirty_buffers.get_age() << endl;
|
||||
//assert(bl.length() == miss_len);
|
||||
wakeup_read_waiters();
|
||||
wakeup_write_waiters();
|
||||
@ -90,16 +92,17 @@ void Bufferhead::miss_finish()
|
||||
void Bufferhead::dirty()
|
||||
{
|
||||
if (state == BUFHD_STATE_CLEAN) {
|
||||
dout(10) << "bc: dirtying clean buffer size: " << bl.length() << endl;
|
||||
dout(6) << "bc: dirtying clean buffer size: " << bl.length() << endl;
|
||||
state = BUFHD_STATE_DIRTY;
|
||||
dirty_since = time(NULL); // start clock for dirty buffer here
|
||||
bc->lru.lru_touch(this);
|
||||
dout(6) << "bc: dirty before: clean_size: " << bc->get_clean_size() << " dirty_size: " << bc->get_dirty_size() << " rx_size: " << bc->get_rx_size() << " tx_size: " << bc->get_tx_size() << " age: " << bc->dirty_buffers.get_age() << endl;
|
||||
bc->clean_to_dirty(bl.length());
|
||||
dout(6) << "bc: clean_size: " << bc->get_clean_size() << " dirty_size: " << bc->get_dirty_size() << " flushing_size: " << bc->get_flushing_size() << " age: " << bc->dirty_buffers.get_age() << endl;
|
||||
dout(6) << "bc: dirty after: clean_size: " << bc->get_clean_size() << " dirty_size: " << bc->get_dirty_size() << " rx_size: " << bc->get_rx_size() << " tx_size: " << bc->get_tx_size() << " age: " << bc->dirty_buffers.get_age() << endl;
|
||||
assert(!bc->dirty_buffers.exist(this));
|
||||
bc->dirty_buffers.insert(this);
|
||||
get();
|
||||
assert(!fc->dirty_buffers.exist(this));
|
||||
assert(!fc->dirty_buffers.count(this));
|
||||
fc->dirty_buffers.insert(this);
|
||||
get();
|
||||
} else {
|
||||
@ -113,7 +116,7 @@ void Bufferhead::dirtybuffers_erase()
|
||||
assert(bc->dirty_buffers.exist(this));
|
||||
bc->dirty_buffers.erase(this);
|
||||
put();
|
||||
assert(fc->dirty_buffers.exist(this));
|
||||
assert(fc->dirty_buffers.count(this));
|
||||
fc->dirty_buffers.erase(this);
|
||||
put();
|
||||
}
|
||||
@ -122,19 +125,27 @@ void Bufferhead::flush_start()
|
||||
{
|
||||
dout(10) << "bc: flush_start" << endl;
|
||||
assert(state == BUFHD_STATE_DIRTY);
|
||||
state = BUFHD_STATE_INFLIGHT;
|
||||
state = BUFHD_STATE_TX;
|
||||
dirtybuffers_erase();
|
||||
bc->dirty_to_flushing(bl.length());
|
||||
dout(6) << "bc: clean_size: " << bc->get_clean_size() << " dirty_size: " << bc->get_dirty_size() << " flushing_size: " << bc->get_flushing_size() << " age: " << bc->dirty_buffers.get_age() << endl;
|
||||
assert(!bc->inflight_buffers.count(this));
|
||||
bc->inflight_buffers.insert(this);
|
||||
bc->dirty_to_tx(bl.length());
|
||||
dout(6) << "bc: flush_start: clean_size: " << bc->get_clean_size() << " dirty_size: " << bc->get_dirty_size() << " rx_size: " << bc->get_rx_size() << " tx_size: " << bc->get_tx_size() << " age: " << bc->dirty_buffers.get_age() << endl;
|
||||
assert(!fc->inflight_buffers.count(this));
|
||||
fc->inflight_buffers.insert(this);
|
||||
}
|
||||
|
||||
void Bufferhead::flush_finish()
|
||||
{
|
||||
dout(10) << "bc: flush_finish" << endl;
|
||||
assert(state == BUFHD_STATE_INFLIGHT);
|
||||
assert(state == BUFHD_STATE_TX);
|
||||
state = BUFHD_STATE_CLEAN;
|
||||
bc->flushing_to_clean(bl.length());
|
||||
dout(6) << "bc: clean_size: " << bc->get_clean_size() << " dirty_size: " << bc->get_dirty_size() << " flushing_size: " << bc->get_flushing_size() << " age: " << bc->dirty_buffers.get_age() << endl;
|
||||
assert(bc->inflight_buffers.count(this));
|
||||
bc->inflight_buffers.erase(this);
|
||||
bc->tx_to_clean(bl.length());
|
||||
dout(6) << "bc: flush_finish: clean_size: " << bc->get_clean_size() << " dirty_size: " << bc->get_dirty_size() << " rx_size: " << bc->get_rx_size() << " tx_size: " << bc->get_tx_size() << " age: " << bc->dirty_buffers.get_age() << endl;
|
||||
assert(fc->inflight_buffers.count(this));
|
||||
fc->inflight_buffers.erase(this);
|
||||
wakeup_write_waiters(); // readers never wait on flushes
|
||||
}
|
||||
|
||||
@ -185,17 +196,19 @@ bool Dirtybuffers::exist(Bufferhead* bh)
|
||||
}
|
||||
|
||||
|
||||
void Dirtybuffers::get_expired(time_t ttl, size_t left_dirty, list<Bufferhead*>& to_flush)
|
||||
void Dirtybuffers::get_expired(time_t ttl, size_t left_dirty, set<Bufferhead*>& to_flush)
|
||||
{
|
||||
dout(6) << "bc: get_expired ttl: " << ttl << " left_dirty: " << left_dirty << endl;
|
||||
time_t now = time(NULL);
|
||||
for (multimap<time_t, Bufferhead*>::iterator it = _dbufs.begin();
|
||||
it != _dbufs.end();
|
||||
it++) {
|
||||
if (ttl > now - it->second->dirty_since &&
|
||||
left_dirty <= it->second->bc->get_dirty_size()) break;
|
||||
to_flush.push_back(it->second);
|
||||
left_dirty >= it->second->bc->get_dirty_size()) break;
|
||||
to_flush.insert(it->second);
|
||||
left_dirty -= it->second->bl.length();
|
||||
}
|
||||
dout(6) << "bc: get_expired to_flush.size(): " << to_flush.size() << endl;
|
||||
}
|
||||
|
||||
// -- Filecache methods
|
||||
@ -228,7 +241,8 @@ map<off_t, Bufferhead*>::iterator
|
||||
Filecache::map_existing(size_t len,
|
||||
off_t start_off,
|
||||
map<off_t, Bufferhead*>& hits,
|
||||
map<off_t, Bufferhead*>& inflight,
|
||||
map<off_t, Bufferhead*>& rx,
|
||||
map<off_t, Bufferhead*>& tx,
|
||||
map<off_t, size_t>& holes)
|
||||
{
|
||||
dout(7) << "bc: map_existing len: " << len << " off: " << start_off << endl;
|
||||
@ -245,9 +259,12 @@ Filecache::map_existing(size_t len,
|
||||
holes[need_off] = (size_t) (actual_off - need_off);
|
||||
dout(10) << "bc: map: hole " << need_off << " " << holes[need_off] << endl;
|
||||
}
|
||||
if (bh->state == BUFHD_STATE_INFLIGHT) {
|
||||
inflight[actual_off] = bh;
|
||||
dout(10) << "bc: map: inflight " << actual_off << " " << inflight[actual_off]->miss_len << endl;
|
||||
if (bh->state == BUFHD_STATE_RX) {
|
||||
rx[actual_off] = bh;
|
||||
dout(10) << "bc: map: rx " << actual_off << " " << rx[actual_off]->miss_len << endl;
|
||||
} else if (bh->state == BUFHD_STATE_TX) {
|
||||
tx[actual_off] = bh;
|
||||
dout(10) << "bc: map: tx " << actual_off << " " << tx[actual_off]->bl.length() << endl;
|
||||
} else {
|
||||
hits[actual_off] = bh;
|
||||
dout(10) << "bc: map: hits " << actual_off << " " << hits[actual_off]->bl.length() << endl;
|
||||
@ -272,7 +289,8 @@ void Filecache::simplify()
|
||||
while (start != buffer_map.end()) {
|
||||
next++;
|
||||
while (next != buffer_map.end() &&
|
||||
start->second->state != BUFHD_STATE_INFLIGHT &&
|
||||
start->second->state != BUFHD_STATE_RX &&
|
||||
start->second->state != BUFHD_STATE_TX &&
|
||||
start->second->state == next->second->state &&
|
||||
start->second->offset + start->second->bl.length() == next->second->offset &&
|
||||
next->second->read_waiters.empty() &&
|
||||
@ -348,11 +366,10 @@ int Filecache::copy_out(size_t size, off_t offset, char *dst)
|
||||
|
||||
void Buffercache::dirty(inodeno_t ino, size_t size, off_t offset, const char *src)
|
||||
{
|
||||
dout(7) << "bc: dirty ino: " << ino << " size: " << size << " offset: " << offset << endl;
|
||||
dout(6) << "bc: dirty ino: " << ino << " size: " << size << " offset: " << offset << endl;
|
||||
assert(bcache_map.count(ino)); // filecache has to be already allocated!!
|
||||
Filecache *fc = get_fc(ino);
|
||||
assert(offset >= 0);
|
||||
assert(offset + size <= fc->length());
|
||||
|
||||
map<off_t, Bufferhead*>::iterator curbuf = fc->overlap(size, offset);
|
||||
offset -= curbuf->first;
|
||||
@ -398,12 +415,13 @@ int Buffercache::touch_continuous(map<off_t, Bufferhead*>& hits, size_t size, of
|
||||
|
||||
void Buffercache::map_or_alloc(inodeno_t ino, size_t size, off_t offset,
|
||||
map<off_t, Bufferhead*>& buffers,
|
||||
map<off_t, Bufferhead*>& inflight)
|
||||
map<off_t, Bufferhead*>& rx,
|
||||
map<off_t, Bufferhead*>& tx)
|
||||
{
|
||||
dout(7) << "bc: map_or_alloc len: " << size << " off: " << offset << endl;
|
||||
Filecache *fc = get_fc(ino);
|
||||
map<off_t, size_t> holes;
|
||||
fc->map_existing(size, offset, buffers, inflight, holes);
|
||||
fc->map_existing(size, offset, buffers, rx, tx, holes);
|
||||
// stuff buffers into holes
|
||||
for (map<off_t, size_t>::iterator hole = holes.begin();
|
||||
hole != holes.end();
|
||||
@ -429,7 +447,7 @@ void Buffercache::release_file(inodeno_t ino)
|
||||
|
||||
decrease_size(it->second->bl.length());
|
||||
|
||||
dout(6) << "bc: clean_size: " << get_clean_size() << " dirty_size: " << get_dirty_size() << " flushing_size: " << get_flushing_size() << " age: " << dirty_buffers.get_age() << endl;
|
||||
dout(6) << "bc: release_file: clean_size: " << get_clean_size() << " dirty_size: " << get_dirty_size() << " rx_size: " << get_rx_size() << " tx_size: " << get_tx_size() << " age: " << dirty_buffers.get_age() << endl;
|
||||
assert(clean_size >= 0);
|
||||
delete it->second;
|
||||
}
|
||||
@ -455,17 +473,20 @@ size_t Buffercache::reclaim(size_t min_size)
|
||||
{
|
||||
dout(7) << "bc: reclaim min_size: " << min_size << endl;
|
||||
size_t freed_size = 0;
|
||||
while (freed_size >= min_size) {
|
||||
while (freed_size <= min_size) {
|
||||
Bufferhead *bh = (Bufferhead*)lru.lru_expire();
|
||||
if (!bh) {
|
||||
dout(6) << "bc: nothing more to reclaim -- freed_size: " << freed_size << endl;
|
||||
assert(0);
|
||||
break; // nothing more to reclaim
|
||||
} else {
|
||||
dout(6) << "bc: reclaim: offset: " << bh->offset << " len: " << bh->bl.length() << endl;
|
||||
assert(bh->state == BUFHD_STATE_CLEAN);
|
||||
freed_size += bh->bl.length();
|
||||
|
||||
decrease_size(bh->bl.length());
|
||||
|
||||
dout(6) << "bc: clean_size: " << get_clean_size() << " dirty_size: " << get_dirty_size() << " flushing_size: " << get_flushing_size() << " age: " << dirty_buffers.get_age() << endl;
|
||||
dout(6) << "bc: reclaim: clean_size: " << get_clean_size() << " dirty_size: " << get_dirty_size() << " rx_size: " << get_rx_size() << " tx_size: " << get_tx_size() << " age: " << dirty_buffers.get_age() << endl;
|
||||
assert(clean_size >= 0);
|
||||
bh->fc->buffer_map.erase(bh->offset);
|
||||
if (bh->fc->buffer_map.empty()) {
|
||||
|
@ -18,7 +18,8 @@ using namespace std;
|
||||
// Bufferhead states
|
||||
#define BUFHD_STATE_CLEAN 1
|
||||
#define BUFHD_STATE_DIRTY 2
|
||||
#define BUFHD_STATE_INFLIGHT 3
|
||||
#define BUFHD_STATE_RX 3
|
||||
#define BUFHD_STATE_TX 4
|
||||
|
||||
#undef dout
|
||||
#define dout(l) if (l<=g_conf.debug) cout << "client" << "." << pthread_self() << " "
|
||||
@ -111,30 +112,52 @@ class Bufferhead : public LRUObject {
|
||||
class Dirtybuffers {
|
||||
private:
|
||||
multimap<time_t, Bufferhead*> _dbufs;
|
||||
// DEBUG
|
||||
time_t former_age;
|
||||
|
||||
public:
|
||||
Dirtybuffers() {
|
||||
former_age = 0;
|
||||
dout(5) << "Dirtybuffers() former_age: " << former_age << endl;
|
||||
}
|
||||
Dirtybuffers(const Dirtybuffers& other);
|
||||
Dirtybuffers& operator=(const Dirtybuffers& other);
|
||||
void erase(Bufferhead* bh);
|
||||
void insert(Bufferhead* bh);
|
||||
bool empty() { return _dbufs.empty(); }
|
||||
bool exist(Bufferhead* bh);
|
||||
void get_expired(time_t ttl, size_t left_dirty, list<Bufferhead*>& to_flush);
|
||||
void get_expired(time_t ttl, size_t left_dirty, set<Bufferhead*>& to_flush);
|
||||
time_t get_age() {
|
||||
if (!_dbufs.empty()) return time(NULL) - _dbufs.begin()->second->dirty_since;
|
||||
time_t age;
|
||||
if (_dbufs.empty()) {
|
||||
age = 0;
|
||||
} else {
|
||||
age = time(NULL) - _dbufs.begin()->second->dirty_since;
|
||||
}
|
||||
dout(10) << "former age: " << former_age << " age: " << age << endl;
|
||||
assert((!(former_age > 30)) || (age > 0));
|
||||
former_age = age;
|
||||
return age;
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
class Filecache {
|
||||
private:
|
||||
list<Cond*> inflight_waiters;
|
||||
|
||||
public:
|
||||
map<off_t, Bufferhead*> buffer_map;
|
||||
Dirtybuffers dirty_buffers;
|
||||
list<Cond*> waitfor_flushed;
|
||||
set<Bufferhead*> dirty_buffers;
|
||||
set<Bufferhead*> inflight_buffers;
|
||||
Buffercache *bc;
|
||||
|
||||
Filecache(Buffercache *bc) {
|
||||
this->bc = bc;
|
||||
buffer_map.clear();
|
||||
}
|
||||
Filecache(const Filecache& other);
|
||||
Filecache& operator=(const Filecache& other);
|
||||
|
||||
~Filecache() {
|
||||
for (map<off_t, Bufferhead*>::iterator it = buffer_map.begin();
|
||||
@ -154,34 +177,43 @@ class Filecache {
|
||||
return len;
|
||||
}
|
||||
|
||||
void wait_for_flush(Mutex &lock) {
|
||||
void wait_for_inflight(Mutex &lock) {
|
||||
Cond cond;
|
||||
waitfor_flushed.push_back(&cond);
|
||||
inflight_waiters.push_back(&cond);
|
||||
cond.Wait(lock);
|
||||
}
|
||||
|
||||
void wakeup_inflight_waiters() {
|
||||
for (list<Cond*>::iterator it = inflight_waiters.begin();
|
||||
it != inflight_waiters.end();
|
||||
it++) {
|
||||
(*it)->Signal();
|
||||
}
|
||||
inflight_waiters.clear();
|
||||
}
|
||||
|
||||
map<off_t, Bufferhead*>::iterator overlap(size_t len, off_t off);
|
||||
int copy_out(size_t size, off_t offset, char *dst);
|
||||
map<off_t, Bufferhead*>::iterator map_existing(size_t len, off_t start_off,
|
||||
map<off_t, Bufferhead*>& hits,
|
||||
map<off_t, Bufferhead*>& inflight,
|
||||
map<off_t, Bufferhead*>& rx,
|
||||
map<off_t, Bufferhead*>& tx,
|
||||
map<off_t, size_t>& holes);
|
||||
void simplify();
|
||||
};
|
||||
|
||||
class Buffercache {
|
||||
private:
|
||||
Mutex buffercache_lock;
|
||||
size_t dirty_size, flushing_size, clean_size;
|
||||
size_t dirty_size, rx_size, tx_size, clean_size;
|
||||
list<Cond*> inflight_waiters;
|
||||
|
||||
public:
|
||||
map<inodeno_t, Filecache*> bcache_map;
|
||||
LRU lru;
|
||||
Dirtybuffers dirty_buffers;
|
||||
list<Cond*> waitfor_flushed;
|
||||
set<Bufferhead*> flushing_buffers;
|
||||
set<Bufferhead*> inflight_buffers;
|
||||
|
||||
Buffercache() : dirty_size(0), flushing_size(0), clean_size(0) { }
|
||||
Buffercache() : dirty_size(0), rx_size(0), tx_size(0), clean_size(0) { }
|
||||
|
||||
// FIXME: constructor & destructor need to mesh with allocator scheme
|
||||
~Buffercache() {
|
||||
@ -192,6 +224,8 @@ class Buffercache {
|
||||
delete it->second;
|
||||
}
|
||||
}
|
||||
Buffercache(const Buffercache& other);
|
||||
Buffercache& operator=(const Buffercache& other);
|
||||
|
||||
Filecache *get_fc(inodeno_t ino) {
|
||||
if (!bcache_map.count(ino)) {
|
||||
@ -200,10 +234,19 @@ class Buffercache {
|
||||
return bcache_map[ino];
|
||||
}
|
||||
|
||||
void wait_for_flush(Mutex &lock) {
|
||||
Cond cond;
|
||||
waitfor_flushed.push_back(&cond);
|
||||
cond.Wait(lock);
|
||||
void wait_for_inflight(Mutex &lock) {
|
||||
Cond cond;
|
||||
inflight_waiters.push_back(&cond);
|
||||
cond.Wait(lock);
|
||||
}
|
||||
|
||||
void wakeup_inflight_waiters() {
|
||||
for (list<Cond*>::iterator it = inflight_waiters.begin();
|
||||
it != inflight_waiters.end();
|
||||
it++) {
|
||||
(*it)->Signal();
|
||||
}
|
||||
inflight_waiters.clear();
|
||||
}
|
||||
|
||||
void clean_to_dirty(size_t size) {
|
||||
@ -211,19 +254,19 @@ class Buffercache {
|
||||
assert(clean_size >= 0);
|
||||
dirty_size += size;
|
||||
}
|
||||
void dirty_to_flushing(size_t size) {
|
||||
void dirty_to_tx(size_t size) {
|
||||
dirty_size -= size;
|
||||
assert(dirty_size >= 0);
|
||||
flushing_size += size;
|
||||
tx_size += size;
|
||||
}
|
||||
void flushing_to_dirty(size_t size) {
|
||||
flushing_size -= size;
|
||||
assert(flushing_size >= 0);
|
||||
void tx_to_dirty(size_t size) {
|
||||
tx_size -= size;
|
||||
assert(tx_size >= 0);
|
||||
dirty_size += size;
|
||||
}
|
||||
void flushing_to_clean(size_t size) {
|
||||
flushing_size -= size;
|
||||
assert(flushing_size >= 0);
|
||||
void tx_to_clean(size_t size) {
|
||||
tx_size -= size;
|
||||
assert(tx_size >= 0);
|
||||
clean_size += size;
|
||||
}
|
||||
void increase_size(size_t size) {
|
||||
@ -235,8 +278,9 @@ class Buffercache {
|
||||
}
|
||||
size_t get_clean_size() { return clean_size; }
|
||||
size_t get_dirty_size() { return dirty_size; }
|
||||
size_t get_flushing_size() { return flushing_size; }
|
||||
size_t get_total_size() { return clean_size + dirty_size + flushing_size; }
|
||||
size_t get_rx_size() { return rx_size; }
|
||||
size_t get_tx_size() { return tx_size; }
|
||||
size_t get_total_size() { return clean_size + dirty_size + rx_size + tx_size; }
|
||||
void get_reclaimable(size_t min_size, list<Bufferhead*>&);
|
||||
|
||||
void insert(Bufferhead *bh);
|
||||
@ -244,7 +288,8 @@ class Buffercache {
|
||||
int touch_continuous(map<off_t, Bufferhead*>& hits, size_t size, off_t offset);
|
||||
void map_or_alloc(inodeno_t ino, size_t len, off_t off,
|
||||
map<off_t, Bufferhead*>& buffers,
|
||||
map<off_t, Bufferhead*>& inflight);
|
||||
map<off_t, Bufferhead*>& rx,
|
||||
map<off_t, Bufferhead*>& tx);
|
||||
void release_file(inodeno_t ino);
|
||||
size_t reclaim(size_t min_size);
|
||||
};
|
||||
|
@ -400,23 +400,13 @@ void Client::dispatch(Message *m)
|
||||
*/
|
||||
class C_Client_FileFlushFinish : public Context {
|
||||
public:
|
||||
Filecache *fc;
|
||||
Bufferhead *bh;
|
||||
C_Client_FileFlushFinish(Filecache *fc, Bufferhead *bh) {
|
||||
this->fc = fc;
|
||||
C_Client_FileFlushFinish(Bufferhead *bh) {
|
||||
this->bh = bh;
|
||||
}
|
||||
void finish(int r) {
|
||||
bh->flush_finish();
|
||||
if (fc->dirty_buffers.empty()) {
|
||||
// wake up flush waiters
|
||||
for (list<Cond*>::iterator it = fc->waitfor_flushed.begin();
|
||||
it != fc->waitfor_flushed.end();
|
||||
it++) {
|
||||
(*it)->Signal();
|
||||
}
|
||||
fc->waitfor_flushed.clear();
|
||||
}
|
||||
if (bh->fc->inflight_buffers.empty()) bh->fc->wakeup_inflight_waiters();
|
||||
}
|
||||
};
|
||||
|
||||
@ -433,20 +423,20 @@ void Client::flush_inode_buffers(Inode *in)
|
||||
dout(7) << "inflight buffers flushed" << endl;
|
||||
#ifdef BUFFERCACHE
|
||||
} else if (!bc.get_fc(in->inode.ino)->dirty_buffers.empty()) {
|
||||
dout(7) << "inode " << in->inode.ino << " has dirty buffers" << endl;
|
||||
Filecache *fc = bc.get_fc(in->inode.ino);
|
||||
fc->simplify();
|
||||
list<Bufferhead*> expired;
|
||||
fc->dirty_buffers.get_expired(0, 0, expired);
|
||||
for (list<Bufferhead*>::iterator it = expired.begin();
|
||||
it != expired.end();
|
||||
dout(7) << "bc: flush_inode_buffers: inode " << in->inode.ino << " has " << fc->dirty_buffers.size() << " dirty buffers" << endl;
|
||||
//fc->simplify();
|
||||
dout(10) << "bc: flush_inode_buffers: after simplify: inode " << in->inode.ino << " has " << fc->dirty_buffers.size() << " dirty buffers" << endl;
|
||||
set<Bufferhead*> to_flush = fc->dirty_buffers;
|
||||
for (set<Bufferhead*>::iterator it = to_flush.begin();
|
||||
it != to_flush.end();
|
||||
it++) {
|
||||
(*it)->flush_start();
|
||||
C_Client_FileFlushFinish *onfinish = new C_Client_FileFlushFinish(fc, *it);
|
||||
(*it)->flush_start(); // Note: invalidates dirty_buffer entries!!!
|
||||
C_Client_FileFlushFinish *onfinish = new C_Client_FileFlushFinish(*it);
|
||||
filer->write(in->inode.ino, g_OSD_FileLayout, (*it)->bl.length(), (*it)->offset, (*it)->bl, 0, onfinish);
|
||||
}
|
||||
dout(7) << "dirty buffers, waiting" << endl;
|
||||
fc->wait_for_flush(client_lock);
|
||||
dout(7) << "flush_inode_buffers: dirty buffers, waiting" << endl;
|
||||
fc->wait_for_inflight(client_lock);
|
||||
#endif
|
||||
} else {
|
||||
dout(7) << "no inflight buffers" << endl;
|
||||
@ -458,19 +448,10 @@ public:
|
||||
Bufferhead *bh;
|
||||
C_Client_FlushFinish(Bufferhead *bh) {
|
||||
this->bh = bh;
|
||||
bh->bc->flushing_buffers.insert(bh);
|
||||
}
|
||||
void finish(int r) {
|
||||
bh->flush_finish();
|
||||
bh->bc->flushing_buffers.erase(bh);
|
||||
if (bh->bc->flushing_buffers.empty()) {
|
||||
for (list<Cond*>::iterator it = bh->bc->waitfor_flushed.begin();
|
||||
it != bh->bc->waitfor_flushed.end();
|
||||
it++) {
|
||||
(*it)->Signal();
|
||||
}
|
||||
bh->bc->waitfor_flushed.clear();
|
||||
}
|
||||
if (bh->bc->inflight_buffers.empty()) bh->bc->wakeup_inflight_waiters();
|
||||
}
|
||||
};
|
||||
|
||||
@ -478,16 +459,20 @@ void Client::flush_buffers(int ttl, size_t dirty_size)
|
||||
{
|
||||
// ttl = 0 or dirty_size = 0: flush all
|
||||
if (!bc.dirty_buffers.empty()) {
|
||||
list<Bufferhead*> expired;
|
||||
dout(6) << "bc: flush_buffers ttl: " << ttl << " dirty_size: " << dirty_size << endl;
|
||||
set<Bufferhead*> expired;
|
||||
bc.dirty_buffers.get_expired(ttl, dirty_size, expired);
|
||||
for (list<Bufferhead*>::iterator it = expired.begin();
|
||||
it != expired.end();
|
||||
it++) {
|
||||
assert(!expired.empty());
|
||||
for (set<Bufferhead*>::iterator it = expired.begin();
|
||||
it != expired.end();
|
||||
it++) {
|
||||
(*it)->flush_start();
|
||||
C_Client_FlushFinish *onfinish = new C_Client_FlushFinish(*it);
|
||||
filer->write((*it)->ino, g_OSD_FileLayout, (*it)->bl.length(), (*it)->offset, (*it)->bl, 0, onfinish);
|
||||
bc.wait_for_flush(client_lock);
|
||||
}
|
||||
dout(7) << "flush_buffers: dirty buffers, waiting" << endl;
|
||||
assert(!bc.inflight_buffers.empty());
|
||||
bc.wait_for_inflight(client_lock);
|
||||
} else {
|
||||
dout(7) << "no dirty buffers" << endl;
|
||||
}
|
||||
@ -497,13 +482,14 @@ void Client::trim_bcache()
|
||||
{
|
||||
if (bc.get_total_size() > g_conf.client_bcache_size) {
|
||||
// need to free buffers
|
||||
if (bc.get_dirty_size() > g_conf.client_bcache_hiwater * bc.get_total_size()) {
|
||||
if (bc.get_dirty_size() > g_conf.client_bcache_hiwater * g_conf.client_bcache_size / 100) {
|
||||
// flush buffers until we have low water mark
|
||||
size_t want_target_size = (size_t) g_conf.client_bcache_lowater * bc.get_total_size();
|
||||
size_t want_target_size = (size_t) g_conf.client_bcache_lowater * g_conf.client_bcache_size / 100;
|
||||
flush_buffers(g_conf.client_bcache_ttl, want_target_size);
|
||||
}
|
||||
// Now reclaim buffers
|
||||
bc.reclaim(bc.get_total_size() - g_conf.client_bcache_size);
|
||||
dout(6) << "bc: trim_bcache: reclaim: " << bc.get_total_size() - g_conf.client_bcache_size * g_conf.client_bcache_hiwater / 100 << endl;
|
||||
bc.reclaim(bc.get_total_size() - g_conf.client_bcache_size * g_conf.client_bcache_hiwater / 100);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1349,13 +1335,13 @@ int Client::read(fileh_t fh, char *buf, size_t size, off_t offset)
|
||||
|
||||
#else
|
||||
// map buffercache
|
||||
map<off_t, Bufferhead*> hits, inflight;
|
||||
map<off_t, Bufferhead*> hits, rx, tx;
|
||||
map<off_t, Bufferhead*>::iterator curbuf;
|
||||
map<off_t, size_t> holes;
|
||||
map<off_t, size_t>::iterator hole;
|
||||
|
||||
Filecache *fc = bc.get_fc(in->inode.ino);
|
||||
curbuf = fc->map_existing(size, offset, hits, inflight, holes);
|
||||
curbuf = fc->map_existing(size, offset, hits, rx, tx, holes);
|
||||
|
||||
if (curbuf != fc->buffer_map.end() && hits.count(curbuf->first)) {
|
||||
// sweet -- we can return stuff immediately: find out how much
|
||||
@ -1394,13 +1380,14 @@ int Client::read(fileh_t fh, char *buf, size_t size, off_t offset)
|
||||
dout(10) << "first buffer is either hit or inflight" << endl;
|
||||
bh = curbuf->second;
|
||||
}
|
||||
if (bh->state == BUFHD_STATE_INFLIGHT) {
|
||||
if (bh->state == BUFHD_STATE_RX || bh->state == BUFHD_STATE_TX) {
|
||||
dout(10) << "waiting for first buffer" << endl;
|
||||
bh->wait_for_read(client_lock);
|
||||
}
|
||||
|
||||
// buffer is filled -- see how much we can return
|
||||
hits.clear(); inflight.clear(); holes.clear();
|
||||
fc->map_existing(size, offset, hits, inflight, holes); // FIXME: overkill
|
||||
hits.clear(); rx.clear(); tx.clear(); holes.clear();
|
||||
fc->map_existing(size, offset, hits, rx, tx, holes); // FIXME: overkill
|
||||
assert(hits.count(bh->offset));
|
||||
rvalue = bc.touch_continuous(hits, size, offset);
|
||||
fc->copy_out(rvalue, offset, buf);
|
||||
@ -1469,14 +1456,18 @@ int Client::write(fileh_t fh, const char *buf, size_t size, off_t offset)
|
||||
dout(7) << "buffered/async write" << endl;
|
||||
|
||||
// map buffercache for writing
|
||||
map<off_t, Bufferhead*> buffers, inflight;
|
||||
bc.map_or_alloc(in->inode.ino, size, offset, buffers, inflight);
|
||||
map<off_t, Bufferhead*> buffers, rx, tx;
|
||||
bc.map_or_alloc(in->inode.ino, size, offset, buffers, rx, tx);
|
||||
|
||||
// wait for inflight buffers
|
||||
while (!inflight.empty()) {
|
||||
inflight.begin()->second->wait_for_write(client_lock);
|
||||
buffers.clear(); inflight.clear();
|
||||
bc.map_or_alloc(in->inode.ino, size, offset, buffers, inflight); // FIXME: overkill
|
||||
// wait for rx and tx buffers -- FIXME: don't need to wait for tx buffers
|
||||
while (!rx.empty() || !tx.empty()) {
|
||||
if (!rx.empty()) {
|
||||
rx.begin()->second->wait_for_write(client_lock);
|
||||
} else {
|
||||
tx.begin()->second->wait_for_write(client_lock);
|
||||
}
|
||||
buffers.clear(); tx.clear(); rx.clear();
|
||||
bc.map_or_alloc(in->inode.ino, size, offset, buffers, rx, tx); // FIXME: overkill
|
||||
}
|
||||
bc.dirty(in->inode.ino, size, offset, buf);
|
||||
|
||||
|
@ -64,9 +64,9 @@ md_config_t g_conf = {
|
||||
client_bcache_alloc_minsize: 1024,
|
||||
client_bcache_alloc_maxsize: 262144,
|
||||
client_bcache_ttl: 30, // seconds until dirty buffers are written to disk
|
||||
client_bcache_size: 10485760, // 10MB *for testing*
|
||||
client_bcache_lowater: .6, // fraction of size
|
||||
client_bcache_hiwater: .8,
|
||||
client_bcache_size: 2147483648, // 2GB
|
||||
client_bcache_lowater: 60, // % of size
|
||||
client_bcache_hiwater: 80, // % of size
|
||||
client_bcache_maxfrag: 10, // max actual relative # of bheads over opt rel # of bheads
|
||||
|
||||
client_trace: 0,
|
||||
|
@ -40,8 +40,8 @@ struct md_config_t {
|
||||
int client_bcache_alloc_maxsize;
|
||||
int client_bcache_ttl;
|
||||
int client_bcache_size;
|
||||
float client_bcache_lowater;
|
||||
float client_bcache_hiwater;
|
||||
int client_bcache_lowater;
|
||||
int client_bcache_hiwater;
|
||||
int client_bcache_maxfrag;
|
||||
|
||||
int client_trace;
|
||||
|
Loading…
Reference in New Issue
Block a user