client: killed FileCache

This commit is contained in:
Sage Weil 2008-05-15 13:37:20 -07:00
parent 675a8ad2e3
commit 636edccd19
5 changed files with 101 additions and 471 deletions

View File

@ -93,7 +93,6 @@ libCrushWrapper.so: crush/CrushWrapper_wrap.cxx libcrush_so.a
## libcephclient.so
libcephclient_so_a_SOURCES = \
client/FileCache.cc \
client/Client.cc \
client/SyntheticClient.cc \
client/Trace.cc \
@ -232,7 +231,6 @@ libosdc_a_SOURCES = \
osdc/Journaler.cc
libclient_a_SOURCES = \
client/FileCache.cc \
client/Client.cc \
client/SyntheticClient.cc \
client/Trace.cc
@ -245,7 +243,6 @@ noinst_HEADERS = \
client/SyntheticClient.h\
client/fuse.h\
client/fuse_ll.h\
client/FileCache.h\
client/Client.h\
common/Clock.h\
common/Cond.h\

View File

@ -77,20 +77,6 @@ Logger *client_logger = 0;
class C_Client_CloseRelease : public Context {
Client *cl;
Inode *in;
public:
C_Client_CloseRelease(Client *c, Inode *i) : cl(c), in(i) {
in->get();
}
void finish(int) {
cl->close_release(in);
}
};
ostream& operator<<(ostream &out, Inode &in)
{
out << in.inode.ino << "("
@ -440,7 +426,7 @@ Inode* Client::insert_dentry_inode(Dir *dir, const string& dname, LeaseStat *dle
}
if (!dn) {
Inode *in = new Inode(ist->ino, &ist->layout, objectcacher);
Inode *in = new Inode(ist->ino, &ist->layout);
inode_map[ist->ino] = in;
dn = link(dir, dname, in);
dout(12) << " new dentry+node with ino " << ist->ino << dendl;
@ -551,7 +537,7 @@ Inode* Client::insert_trace(MClientReply *reply, utime_t from)
Inode *curi = 0;
inodeno_t ino = ist[0].ino;
if (!root && ino == 1) {
curi = root = new Inode(ino, &ist[0].layout, objectcacher);
curi = root = new Inode(ino, &ist[0].layout);
dout(10) << "insert_trace new root is " << root << dendl;
inode_map[ino] = root;
root->dir_auth = 0;
@ -1408,31 +1394,56 @@ void Client::signal_cond_list(list<Cond*>& ls)
// release clean data / flush dirty data (from objectcacher)
// Drop this inode's cached objects from the object cacher, but only when a
// CEPH_CAP_RDCACHE cap reference is currently held; that reference is then
// released.
//
// in         - inode whose object set (keyed by in->inode.ino) is released.
// checkafter - true:  drop the ref through Client::put_cap_ref();
//              false: drop it directly on the inode.
//              NOTE(review): presumably the Client-level variant re-checks
//              caps afterwards -- confirm against put_cap_ref().
void Client::_release(Inode *in, bool checkafter)
{
  if (!in->cap_refs[CEPH_CAP_RDCACHE])
    return;   // no RDCACHE ref held, nothing to do

  objectcacher->release_set(in->inode.ino);

  if (!checkafter) {
    in->put_cap_ref(CEPH_CAP_RDCACHE);
    return;
  }
  put_cap_ref(in, CEPH_CAP_RDCACHE);
}
struct C_Flush : public Context {
Client *client;
Inode *in;
C_Flush(Client *c, Inode *i) : client(c), in(i) {}
bool checkafter;
C_Flush(Client *c, Inode *i, bool ch) : client(c), in(i), checkafter(ch) {}
void finish(int r) {
client->_flushed(in);
client->_flushed(in, checkafter);
}
};
void Client::_flush(Inode *in)
void Client::_flush(Inode *in, bool checkafter)
{
dout(10) << "_flush " << *in << dendl;
if (in->cap_refs[CEPH_CAP_WRBUFFER] == 1) {
in->get_cap_ref(CEPH_CAP_WRBUFFER); // for the (one!) waiter
in->fc.flush_dirty(0);
in->fc.add_safe_waiter(new C_Flush(this, in));
Context *c = new C_Flush(this, in, checkafter);
bool safe = objectcacher->commit_set(in->inode.ino, c);
if (safe) {
c->finish(0);
delete c;
}
}
}
void Client::_flushed(Inode *in)
void Client::_flushed(Inode *in, bool checkafter)
{
dout(10) << "_flushed " << *in << dendl;
assert(in->cap_refs[CEPH_CAP_WRBUFFER] == 2);
// release clean pages too, if we dont hold RDCACHE reference
if (in->cap_refs[CEPH_CAP_RDCACHE] == 0)
objectcacher->release_set(in->inode.ino);
put_cap_ref(in, CEPH_CAP_WRBUFFER);
put_cap_ref(in, CEPH_CAP_WRBUFFER);
if (checkafter)
put_cap_ref(in, CEPH_CAP_WRBUFFER);
else
in->put_cap_ref(CEPH_CAP_WRBUFFER);
}
@ -1542,8 +1553,15 @@ void Client::handle_file_caps(MClientFileCaps *m)
<< " size " << in->inode.size << " -> " << m->get_size()
<< dendl;
// trim filecache?
if (g_conf.client_oc)
in->fc.truncate(in->inode.size, m->get_size());
if (g_conf.client_oc &&
m->get_size() < in->inode.size) {
// map range to objects
list<ObjectExtent> ls;
filer->file_to_extents(in->inode.ino, &in->inode.layout,
m->get_size(), in->inode.size - m->get_size(),
ls);
objectcacher->truncate_set(in->inode.ino, ls);
}
in->inode.size = m->get_size();
delete m;
@ -1623,10 +1641,10 @@ void Client::handle_file_caps(MClientFileCaps *m)
cap.issued = new_caps;
if ((cap.issued & ~new_caps) & CEPH_CAP_RDCACHE)
in->fc.release_clean();
_release(in, false);
if ((used & ~new_caps) & CEPH_CAP_WRBUFFER)
_flush(in);
_flush(in, false);
else {
ack = true;
cap.implemented = new_caps;
@ -1802,14 +1820,8 @@ int Client::unmount()
p++) {
Inode *in = p->second;
if (!in->caps.empty()) {
in->fc.release_clean();
if (in->fc.is_dirty()) {
dout(10) << "unmount residual caps on " << in->ino() << ", flushing" << dendl;
in->fc.empty(new C_Client_CloseRelease(this, in));
} else {
dout(10) << "unmount residual caps on " << in->ino() << ", releasing" << dendl;
check_caps(in);
}
_release(in);
_flush(in);
}
}
}
@ -2801,8 +2813,7 @@ int Client::_open(const filepath &path, int flags, mode_t mode, Fh **fhp, int ui
if (!dn)
in->get_open_ref(f->mode); // i may have alrady added it above!
dout(10) << in->inode.ino << " mode " << cmode
<< " dirty " << in->fc.is_dirty() << " cached " << in->fc.is_cached() << dendl;
dout(10) << in->inode.ino << " mode " << cmode << dendl;
// add the cap
int mds = reply->get_source().num();
@ -2835,9 +2846,6 @@ int Client::_open(const filepath &path, int flags, mode_t mode, Fh **fhp, int ui
if (new_caps & ~old_caps)
signal_cond_list(in->waitfor_caps);
if (g_conf.client_oc)
in->fc.set_caps(new_caps);
} else {
dout(7) << "open got SAME caps " << cap_string(new_caps)
<< " for " << in->ino()
@ -2860,24 +2868,6 @@ int Client::_open(const filepath &path, int flags, mode_t mode, Fh **fhp, int ui
// Finish closing an inode: log its file-cache state, run check_caps() on it,
// then call put_inode() on it. Invoked from C_Client_CloseRelease::finish().
void Client::close_release(Inode *in)
{
dout(10) << "close_release on " << in->ino() << dendl;
// the is_dirty()/is_cached() queries are operands of the dout expression
dout(10) << in->inode.ino
<< " dirty " << in->fc.is_dirty() << " cached " << in->fc.is_cached() << dendl;
check_caps(in);
// `in` is not touched again after this call
put_inode(in);
}
// Called once an inode's data is safe: logs, calls put_inode() on the inode
// and, if an unmount is in progress, signals mount_cond.
void Client::close_safe(Inode *in)
{
  dout(10) << "close_safe on " << in->ino() << dendl;

  put_inode(in);

  if (!unmounting)
    return;
  mount_cond.Signal();
}
int Client::close(int fd)
{
@ -3014,24 +3004,26 @@ int Client::_read(Fh *f, __s64 offset, __u64 size, bufferlist *bl)
bool lazy = f->mode == CEPH_FILE_MODE_LAZY;
// wait for RD cap and/or a valid file size
int issued;
while (1) {
issued = in->caps_issued();
if (lazy) {
// wait for lazy cap
if ((in->caps_issued() & CEPH_CAP_LAZYIO) == 0) {
if ((issued & CEPH_CAP_LAZYIO) == 0) {
dout(7) << " don't have lazy cap, waiting" << dendl;
goto wait;
}
} else {
// wait for RD cap?
while ((in->caps_issued() & CEPH_CAP_RD) == 0) {
while ((issued & CEPH_CAP_RD) == 0) {
dout(7) << " don't have read cap, waiting" << dendl;
goto wait;
}
}
// async i/o?
if ((in->caps_issued() & (CEPH_CAP_WRBUFFER|CEPH_CAP_RDCACHE))) {
if ((issued & (CEPH_CAP_WRBUFFER|CEPH_CAP_RDCACHE))) {
// FIXME: this logic needs to move into FileCache!
@ -3056,39 +3048,57 @@ int Client::_read(Fh *f, __s64 offset, __u64 size, bufferlist *bl)
}
break;
} else {
// unbuffered, sync i/o. defer to osd.
// unbuffered, sync i/o. we will defer to osd.
break;
}
wait:
wait_on_list(in->waitfor_caps);
}
in->get_cap_ref(CEPH_CAP_RD);
int rvalue = 0;
Cond cond;
bool done = false;
Context *onfinish = new C_SafeCond(&client_lock, &cond, &done, &rvalue);
int r = 0;
int rvalue = 0;
if (g_conf.client_oc) {
// object cache ON
rvalue = r = in->fc.read(offset, size, *bl, client_lock); // may block.
if (issued & CEPH_CAP_RDCACHE) {
// we will populate the cache here
if (in->cap_refs[CEPH_CAP_RDCACHE] == 0)
in->get_cap_ref(CEPH_CAP_RDCACHE);
// read (and possibly block)
r = objectcacher->file_read(in->inode.ino, &in->inode.layout, offset, size, bl, 0, onfinish);
if (r == 0) {
while (!done)
cond.Wait(client_lock);
r = rvalue;
} else {
// it was cached.
delete onfinish;
}
} else {
r = objectcacher->file_atomic_sync_read(in->inode.ino, &in->inode.layout, offset, size, bl, 0, client_lock);
}
} else {
// object cache OFF -- legacy inconsistent way.
// object cache OFF -- non-atomic sync read from osd
// do sync read
Cond cond;
bool done = false;
Context *onfinish = new C_SafeCond(&client_lock, &cond, &done, &rvalue);
Objecter::OSDRead *rd = filer->prepare_read(in->inode, offset, size, bl, 0);
if (in->hack_balance_reads || g_conf.client_hack_balance_reads)
rd->flags |= CEPH_OSD_OP_BALANCE_READS;
r = objecter->readx(rd, onfinish);
assert(r >= 0);
// wait!
while (!done)
cond.Wait(client_lock);
r = rvalue;
}
if (movepos) {
@ -3185,8 +3195,8 @@ int Client::_write(Fh *f, __s64 offset, __u64 size, const char *buf)
// copy into fresh buffer (since our write may be resub, async)
bufferptr bp;
if (size > 0) bp = buffer::copy(buf, size);
bufferlist blist;
blist.push_back( bp );
bufferlist bl;
bl.push_back( bp );
// request larger max_size?
__u64 endoff = offset + size;
@ -3220,13 +3230,13 @@ int Client::_write(Fh *f, __s64 offset, __u64 size, const char *buf)
in->get_cap_ref(CEPH_CAP_WRBUFFER);
// wait? (this may block!)
oc->wait_for_write(size, client_lock);
objectcacher->wait_for_write(size, client_lock);
// async, caching, non-blocking.
oc->file_write(ino, &layout, offset, size, blist, 0);
objectcacher->file_write(in->inode.ino, &in->inode.layout, offset, size, bl, 0);
} else {
// atomic, synchronous, blocking.
oc->file_atomic_sync_write(ino, &layout, offset, size, blist, 0, client_lock);
objectcacher->file_atomic_sync_write(in->inode.ino, &in->inode.layout, offset, size, bl, 0, client_lock);
}
} else {
// simple, non-atomic sync write
@ -3238,8 +3248,7 @@ int Client::_write(Fh *f, __s64 offset, __u64 size, const char *buf)
unsafe_sync_write++;
in->get_cap_ref(CEPH_CAP_WRBUFFER);
filer->write(in->inode, offset, size, blist, 0,
onfinish, onsafe);
filer->write(in->inode, offset, size, bl, 0, onfinish, onsafe);
while (!done)
cond.Wait(client_lock);
@ -3344,7 +3353,7 @@ int Client::_fsync(Fh *f, bool syncdataonly)
Inode *in = f->inode;
dout(3) << "_fsync(" << f << ", " << (syndataonly ? "dataonly)":"data+metadata)") << dendl;
dout(3) << "_fsync(" << f << ", " << (syncdataonly ? "dataonly)":"data+metadata)") << dendl;
// metadata?
if (!syncdataonly)
@ -3464,27 +3473,9 @@ int Client::lazyio_propogate(int fd, off_t offset, size_t count)
assert(fd_map.count(fd));
Fh *f = fd_map[fd];
Inode *in = f->inode;
if (f->mode & CEPH_FILE_MODE_LAZY) {
// wait for lazy cap
while ((in->caps_issued() & CEPH_CAP_LAZYIO) == 0) {
dout(7) << " don't have lazy cap, waiting" << dendl;
wait_on_list(in->waitfor_caps);
}
if (g_conf.client_oc) {
Cond cond;
bool done = false;
in->fc.flush_dirty(new C_SafeCond(&client_lock, &cond, &done));
while (!done)
cond.Wait(client_lock);
} else {
// mmm, nothin to do.
}
}
// for now
_fsync(f, true);
client_lock.Unlock();
return 0;
@ -3500,20 +3491,8 @@ int Client::lazyio_synchronize(int fd, off_t offset, size_t count)
Fh *f = fd_map[fd];
Inode *in = f->inode;
if (f->mode & CEPH_FILE_MODE_LAZY) {
// wait for lazy cap
while ((in->caps_issued() & CEPH_CAP_LAZYIO) == 0) {
dout(7) << " don't have lazy cap, waiting" << dendl;
wait_on_list(in->waitfor_caps);
}
if (g_conf.client_oc) {
in->fc.flush_dirty(0); // flush to invalidate.
in->fc.release_clean();
} else {
// mm, nothin to do.
}
}
_fsync(f, true);
_release(in);
client_lock.Unlock();
return 0;

View File

@ -35,7 +35,7 @@
#include "common/Mutex.h"
#include "common/Timer.h"
#include "FileCache.h"
//#include "FileCache.h"
// stl
@ -163,10 +163,6 @@ class Inode {
fragtree_t dirfragtree;
map<frag_t,int> fragmap; // known frag -> mds mappings
// for caching i/o mode
FileCache fc;
list<Cond*> waitfor_caps;
list<Cond*> waitfor_commit;
@ -200,14 +196,13 @@ class Inode {
ll_ref -= n;
}
Inode(inodeno_t ino, ceph_file_layout *layout, ObjectCacher *_oc) :
Inode(inodeno_t ino, ceph_file_layout *layout) :
//inode(_inode),
lease_mask(0), lease_mds(-1),
dir_auth(-1), dir_hashed(false), dir_replicated(false),
wanted_max_size(0), requested_max_size(0),
ref(0), ll_ref(0),
dir(0), dn(0), symlink(0),
fc(_oc, ino, layout),
hack_balance_reads(false)
{
//memset(open_by_mode, 0, sizeof(int)*CEPH_FILE_MODE_NUM);
@ -749,8 +744,10 @@ protected:
void handle_file_caps(class MClientFileCaps *m);
void check_caps(Inode *in);
void put_cap_ref(Inode *in, int cap);
void _flush(Inode *in);
void _flushed(Inode *in);
void _release(Inode *in, bool checkafter=true);
void _flush(Inode *in, bool checkafter=true);
void _flushed(Inode *in, bool checkafter);
void close_release(Inode *in);
void close_safe(Inode *in);

View File

@ -1,257 +0,0 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#include "include/types.h"
#include "FileCache.h"
#include "osdc/ObjectCacher.h"
#include "msg/Messenger.h"
#include "config.h"
// Logging helpers for the file cache. A message at level x is written only
// when x <= g_conf.debug_client; each line is prefixed with the current time
// and "<messenger name>.filecache". dout writes to *_dout, derr to *_derr.
//
// Both macros reference `oc`, so they can only be used where an ObjectCacher
// pointer named `oc` is in scope.
//
// NOTE: each macro expands to an unbraced `if (...) stream << ...`. Using it
// as the sole statement of an un-braced `if` that is followed by `else` would
// bind that `else` to the macro's hidden `if`; keep such uses inside braces.
#define dout(x) if (x <= g_conf.debug_client) *_dout << dbeginl << g_clock.now() << " " << oc->objecter->messenger->get_myname() << ".filecache "
#define derr(x) if (x <= g_conf.debug_client) *_derr << dbeginl << g_clock.now() << " " << oc->objecter->messenger->get_myname() << ".filecache "
// flush/release/clean
// Ask the object cacher to flush this file's object set.
//
// onflush - optional completion callback (may be null; the declaration
//           defaults it to 0). It is passed to ObjectCacher::flush_set().
//           If flush_set() returns true the callback is completed right here
//           with result 0 and deleted; otherwise it is left with the cacher.
//
// The null check matters: callers legitimately pass no callback, and without
// it a true return from flush_set() would dereference a null pointer.
void FileCache::flush_dirty(Context *onflush)
{
  if (oc->flush_set(ino, onflush)) {
    if (onflush) {
      onflush->finish(0);
      delete onflush;
    }
  }
}
// Release this file's object set from the object cacher.
// Returns whatever ObjectCacher::release_set() reports; tear_down() treats a
// non-zero result as a count of "unclean bytes".
off_t FileCache::release_clean()
{
  off_t unclean = oc->release_set(ino);
  return unclean;
}
// True when the object cacher reports this file's object set as cached.
bool FileCache::is_cached()
{
  bool cached = oc->set_is_cached(ino);
  return cached;
}
// True when the object cacher reports this file's object set as dirty or
// still committing.
bool FileCache::is_dirty()
{
  bool dirty = oc->set_is_dirty_or_committing(ino);
  return dirty;
}
// Release this file's object set and flush what remains.
//
// onempty - optional completion callback (may be null; the declaration
//           defaults it to 0). It is handed to ObjectCacher::flush_set();
//           if that returns true, the callback is completed here with
//           result 0 and deleted.
//
// The assert encodes the expectation that flush_set() reports "clean"
// exactly when release_clean() left no unclean bytes behind.
void FileCache::empty(Context *onempty)
{
  off_t unclean = release_clean();
  bool clean = oc->flush_set(ino, onempty);
  assert(!unclean == clean);

  // Guard against the default null callback before completing it.
  if (clean && onempty) {
    onempty->finish(0);
    delete onempty;
  }
}
void FileCache::tear_down()
{
off_t unclean = release_clean();
if (unclean) {
dout(0) << "tear_down " << unclean << " unclean bytes, purging" << dendl;
oc->purge_set(ino);
}
}
// truncate
// Trim cached data after the file shrinks from `olds` to `news` bytes.
//
// olds - previous file size.
// news - new file size.
//
// The byte range [news, olds) is mapped to object extents using the file
// layout and handed to ObjectCacher::truncate_set(). When the file did not
// shrink (news >= olds) there is no range to trim, and olds-news would be
// zero or negative, so the call is a no-op after logging.
void FileCache::truncate(off_t olds, off_t news)
{
  dout(5) << "truncate " << olds << " -> " << news << dendl;

  if (news >= olds)
    return;   // growing or unchanged: nothing cached beyond the new size to drop

  // map range to objects
  list<ObjectExtent> ls;
  oc->filer.file_to_extents(ino, &layout, news, olds - news, ls);
  oc->truncate_set(ino, ls);
}
// caps
class C_FC_CheckCaps : public Context {
FileCache *fc;
public:
C_FC_CheckCaps(FileCache *f) : fc(f) {}
void finish(int r) {
fc->check_caps();
}
};
void FileCache::set_caps(int caps, Context *onimplement)
{
if (onimplement) {
dout(10) << "set_caps setting onimplement context for " << cap_string(caps) << dendl;
assert(latest_caps & ~caps); // we should be losing caps.
caps_callbacks[caps].push_back(onimplement);
}
latest_caps = caps;
check_caps();
// kick waiters? (did we gain caps?)
if (can_read() && !waitfor_read.empty())
for (set<Cond*>::iterator p = waitfor_read.begin();
p != waitfor_read.end();
++p)
(*p)->Signal();
if (can_write() && !waitfor_write.empty())
for (set<Cond*>::iterator p = waitfor_write.begin();
p != waitfor_write.end();
++p)
(*p)->Signal();
}
// Compute the capability bits this file is actively relying on:
//   CEPH_CAP_RD        - at least one read in progress (num_reading)
//   CEPH_CAP_RDCACHE   - the object set is cached
//   CEPH_CAP_WR        - at least one write in progress (num_writing)
//   CEPH_CAP_WRBUFFER  - the object set is dirty or committing
int FileCache::get_used_caps()
{
  int mask = 0;

  mask |= num_reading ? CEPH_CAP_RD : 0;
  mask |= oc->set_is_cached(ino) ? CEPH_CAP_RDCACHE : 0;
  mask |= num_writing ? CEPH_CAP_WR : 0;
  mask |= oc->set_is_dirty_or_committing(ino) ? CEPH_CAP_WRBUFFER : 0;

  return mask;
}
// Reconcile what the cache is using with the caps in latest_caps.
//
// 1. If RDCACHE / WRBUFFER are in use but absent from latest_caps, release
//    clean data / start flushing dirty data.
// 2. Recompute usage, then run and remove every callback list in
//    caps_callbacks whose key mask covers the caps still in use.
//
// Re-entrancy: the flush below is given a C_FC_CheckCaps callback, and
// flush_dirty() completes its callback synchronously when flush_set()
// returns true, so this function can be entered again before reaching the
// callback loop.
void FileCache::check_caps()
{
// calc used
int used = get_used_caps();
dout(10) << "check_caps used was " << cap_string(used) << dendl;
// try to implement caps?
// BUG? latest_caps, not least caps i've seen?
if ((latest_caps & CEPH_CAP_RDCACHE) == 0 &&
(used & CEPH_CAP_RDCACHE))
release_clean();
if ((latest_caps & CEPH_CAP_WRBUFFER) == 0 &&
(used & CEPH_CAP_WRBUFFER))
flush_dirty(new C_FC_CheckCaps(this));
// usage may have changed as a result of the release/flush above
used = get_used_caps();
dout(10) << "check_caps used now " << cap_string(used) << dendl;
// check callbacks
map<int, list<Context*> >::iterator p = caps_callbacks.begin();
while (p != caps_callbacks.end()) {
// satisfied when nothing is in use, or every used bit is within p->first
if (used == 0 || (~(p->first) & used) == 0) {
// implemented.
dout(10) << "check_caps used is " << cap_string(used)
<< ", caps " << cap_string(p->first) << " implemented, doing callback(s)" << dendl;
finish_contexts(p->second);
// advance before erasing so the loop iterator stays valid
map<int, list<Context*> >::iterator o = p;
p++;
caps_callbacks.erase(o);
} else {
dout(10) << "check_caps used is " << cap_string(used)
<< ", caps " << cap_string(p->first) << " not yet implemented" << dendl;
p++;
}
}
}
// read/write
// Read `size` bytes at `offset` into `blist`; may block.
//
// offset, size - byte range to read.
// blist        - destination buffer list.
// client_lock  - mutex passed to the condition-variable waits and to the
//                synchronous read path.
//
// Returns the result of the read: the value from file_read() when it is
// non-zero, otherwise the value delivered to the completion callback; or the
// result of file_atomic_sync_read() on the uncached path.
int FileCache::read(off_t offset, size_t size, bufferlist& blist, Mutex& client_lock)
{
int r = 0;
// can i read?
// Wait until latest_caps includes CEPH_CAP_RD. Each pass registers a
// stack-allocated Cond in waitfor_read (signalled by set_caps()) and
// removes it again before the condition is re-tested.
while ((latest_caps & CEPH_CAP_RD) == 0) {
dout(10) << "read doesn't have RD cap, blocking" << dendl;
Cond c;
waitfor_read.insert(&c);
c.Wait(client_lock);
waitfor_read.erase(&c);
}
// inc reading counter
// (a non-zero num_reading makes get_used_caps() report CEPH_CAP_RD)
num_reading++;
if (latest_caps & CEPH_CAP_RDCACHE) {
// read (and block)
Cond cond;
bool done = false;
int rvalue = 0;
C_Cond *onfinish = new C_Cond(&cond, &done, &rvalue);
r = oc->file_read(ino, &layout, offset, size, &blist, 0, onfinish);
if (r == 0) {
// block
// wait for onfinish to set `done`, then pick up the result from rvalue
while (!done)
cond.Wait(client_lock);
r = rvalue;
} else {
// it was cached.
// the callback is deleted here rather than run
delete onfinish;
}
} else {
r = oc->file_atomic_sync_read(ino, &layout, offset, size, &blist, 0, client_lock);
}
// dec reading counter
num_reading--;
// last reader out: give pending cap callbacks a chance to run
if (num_reading == 0 && !caps_callbacks.empty())
check_caps();
return r;
}
// Write `size` bytes from `blist` at `offset`; may block.
//
// With CEPH_CAP_WRBUFFER in latest_caps the write goes through the object
// cacher's buffered path (after wait_for_write()); otherwise it uses the
// atomic synchronous path. A zero-length write touches only the counters.
// When the last in-flight writer finishes and cap callbacks are pending,
// check_caps() is run.
void FileCache::write(off_t offset, size_t size, bufferlist& blist, Mutex& client_lock)
{
  ++num_writing;

  if (size > 0) {
    const bool buffered = (latest_caps & CEPH_CAP_WRBUFFER) != 0;
    if (!buffered) {
      // atomic, synchronous, blocking.
      oc->file_atomic_sync_write(ino, &layout, offset, size, blist, 0, client_lock);
    } else {
      // wait? (this may block!)
      oc->wait_for_write(size, client_lock);
      // async, caching, non-blocking.
      oc->file_write(ino, &layout, offset, size, blist, 0);
    }
  }

  --num_writing;

  if (num_writing != 0 || caps_callbacks.empty())
    return;
  check_caps();
}
// True when the object cacher reports nothing dirty or committing for this
// file's object set.
bool FileCache::all_safe()
{
  bool pending = oc->set_is_dirty_or_committing(ino);
  return !pending;
}
// Register `c` with ObjectCacher::commit_set() for this file's object set.
// If commit_set() returns true, the callback is completed here with result 0
// and deleted; otherwise it is left with the cacher.
void FileCache::add_safe_waiter(Context *c)
{
  if (!oc->commit_set(ino, c))
    return;

  c->finish(0);
  delete c;
}

View File

@ -1,86 +0,0 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#ifndef __FILECACHE_H
#define __FILECACHE_H
#include <iostream>
using std::iostream;
#include "common/Cond.h"
#include "mds/Capability.h"
class ObjectCacher;
// Per-file front end to an ObjectCacher: tracks the capability mask most
// recently set for the file, counts in-flight reads/writes, and forwards
// flush/release/truncate/read/write requests for the file's object set
// (keyed by `ino`) to the cacher.
//
// NOTE(review): the destructor calls tear_down() and no copy operations are
// declared, so copying an instance would tear the same object set down twice.
class FileCache {
// cacher all operations are forwarded to (not owned: never deleted here)
ObjectCacher *oc;
// inode number identifying this file's object set in the cacher
inodeno_t ino;
// private copy of the file layout (copied from *l in the constructor)
ceph_file_layout layout;
// caps
// mask passed to the most recent set_caps() call
int latest_caps;
// callbacks waiting for usage to fit within a given cap mask (the map key)
map<int, list<Context*> > caps_callbacks;
// number of read() / write() calls currently in progress
int num_reading;
int num_writing;
//int num_unsafe;
// waiters
// condition variables signalled by set_caps() when reading/writing is allowed
set<Cond*> waitfor_read;
set<Cond*> waitfor_write;
// initialised to false; not otherwise referenced in this header
bool waitfor_release;
public:
// _oc: cacher to use; i: inode number; l: layout, copied by value (must be non-null)
FileCache(ObjectCacher *_oc, inodeno_t i, ceph_file_layout *l) :
oc(_oc),
ino(i), layout(*l),
latest_caps(0),
num_reading(0), num_writing(0),// num_unsafe(0),
waitfor_release(false) {}
~FileCache() {
tear_down();
}
// waiters/waiting
// whether latest_caps currently includes the RD / WR bit
bool can_read() { return latest_caps & CEPH_CAP_RD; }
bool can_write() { return latest_caps & CEPH_CAP_WR; }
bool all_safe();// { return num_unsafe == 0; }
void add_safe_waiter(Context *c);
// trim cached data when the file size changes from olds to news
void truncate(off_t olds, off_t news);
// ...
// onflush / onempty are optional completion callbacks (default: none)
void flush_dirty(Context *onflush=0);
off_t release_clean();
void empty(Context *onempty=0);
bool is_empty() { return !(is_cached() || is_dirty()); }
bool is_cached();
bool is_dirty();
void tear_down();
int get_caps() { return latest_caps; }
int get_used_caps();
// onimplement, if given, is queued in caps_callbacks under `caps`
void set_caps(int caps, Context *onimplement=0);
void check_caps();
int read(off_t offset, size_t size, bufferlist& blist, Mutex& client_lock); // may block.
void write(off_t offset, size_t size, bufferlist& blist, Mutex& client_lock); // may block.
};
#endif