mirror of
https://github.com/ceph/ceph
synced 2025-02-24 19:47:44 +00:00
* some changes to client cache: readers/writers block properly, wake up when data is flushed
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1250 29311d96-e01e-0410-9327-a35deaab8ce9
This commit is contained in:
parent
974c3fe013
commit
2bcc95a7e3
@ -875,6 +875,8 @@ void Client::handle_file_caps(MClientFileCaps *m)
|
||||
if (in->file_wr_mtime > in->inode.mtime)
|
||||
m->get_inode().mtime = in->inode.mtime = in->file_wr_mtime;
|
||||
|
||||
|
||||
|
||||
if (g_conf.client_oc) {
|
||||
// caching on, use FileCache.
|
||||
Context *onimplement = 0;
|
||||
@ -2274,7 +2276,6 @@ int Client::read(fh_t fh, char *buf, off_t size, off_t offset)
|
||||
tout << size << endl;
|
||||
tout << offset << endl;
|
||||
|
||||
assert(offset >= 0);
|
||||
assert(fh_map.count(fh));
|
||||
Fh *f = fh_map[fh];
|
||||
Inode *in = f->inode;
|
||||
@ -2284,21 +2285,6 @@ int Client::read(fh_t fh, char *buf, off_t size, off_t offset)
|
||||
|
||||
bool lazy = f->mode == FILE_MODE_LAZY;
|
||||
|
||||
// do we have read file cap?
|
||||
while (!lazy && (in->file_caps() & CAP_FILE_RD) == 0) {
|
||||
dout(7) << " don't have read cap, waiting" << endl;
|
||||
Cond cond;
|
||||
in->waitfor_read.push_back(&cond);
|
||||
cond.Wait(client_lock);
|
||||
}
|
||||
// lazy cap?
|
||||
while (lazy && (in->file_caps() & CAP_FILE_LAZYIO) == 0) {
|
||||
dout(7) << " don't have lazy cap, waiting" << endl;
|
||||
Cond cond;
|
||||
in->waitfor_lazy.push_back(&cond);
|
||||
cond.Wait(client_lock);
|
||||
}
|
||||
|
||||
// determine whether read range overlaps with file
|
||||
// ...ONLY if we're doing async io
|
||||
if (!lazy && (in->file_caps() & (CAP_FILE_WRBUFFER|CAP_FILE_RDCACHE))) {
|
||||
@ -2332,6 +2318,23 @@ int Client::read(fh_t fh, char *buf, off_t size, off_t offset)
|
||||
rvalue = r = in->fc.read(offset, size, blist, client_lock); // may block.
|
||||
} else {
|
||||
// object cache OFF -- legacy inconsistent way.
|
||||
|
||||
// do we have read file cap?
|
||||
while (!lazy && (in->file_caps() & CAP_FILE_RD) == 0) {
|
||||
dout(7) << " don't have read cap, waiting" << endl;
|
||||
Cond cond;
|
||||
in->waitfor_read.push_back(&cond);
|
||||
cond.Wait(client_lock);
|
||||
}
|
||||
// lazy cap?
|
||||
while (lazy && (in->file_caps() & CAP_FILE_LAZYIO) == 0) {
|
||||
dout(7) << " don't have lazy cap, waiting" << endl;
|
||||
Cond cond;
|
||||
in->waitfor_lazy.push_back(&cond);
|
||||
cond.Wait(client_lock);
|
||||
}
|
||||
|
||||
// do sync read
|
||||
Cond cond;
|
||||
bool done = false;
|
||||
C_Cond *onfinish = new C_Cond(&cond, &done, &rvalue);
|
||||
@ -2398,7 +2401,6 @@ int Client::write(fh_t fh, const char *buf, off_t size, off_t offset)
|
||||
tout << size << endl;
|
||||
tout << offset << endl;
|
||||
|
||||
assert(offset >= 0);
|
||||
assert(fh_map.count(fh));
|
||||
Fh *f = fh_map[fh];
|
||||
Inode *in = f->inode;
|
||||
@ -2410,23 +2412,6 @@ int Client::write(fh_t fh, const char *buf, off_t size, off_t offset)
|
||||
|
||||
dout(10) << "cur file size is " << in->inode.size << " wr size " << in->file_wr_size << endl;
|
||||
|
||||
// do we have write file cap?
|
||||
while (!lazy && (in->file_caps() & CAP_FILE_WR) == 0) {
|
||||
dout(7) << " don't have write cap, waiting" << endl;
|
||||
Cond cond;
|
||||
in->waitfor_write.push_back(&cond);
|
||||
cond.Wait(client_lock);
|
||||
}
|
||||
while (lazy && (in->file_caps() & CAP_FILE_LAZYIO) == 0) {
|
||||
dout(7) << " don't have lazy cap, waiting" << endl;
|
||||
Cond cond;
|
||||
in->waitfor_lazy.push_back(&cond);
|
||||
cond.Wait(client_lock);
|
||||
}
|
||||
|
||||
// adjust fd pos
|
||||
f->pos = offset+size;
|
||||
|
||||
// time it.
|
||||
utime_t start = g_clock.now();
|
||||
|
||||
@ -2440,11 +2425,28 @@ int Client::write(fh_t fh, const char *buf, off_t size, off_t offset)
|
||||
|
||||
// write (this may block!)
|
||||
in->fc.write(offset, size, blist, client_lock);
|
||||
|
||||
// adjust fd pos
|
||||
f->pos = offset+size;
|
||||
|
||||
} else {
|
||||
// legacy, inconsistent synchronous write.
|
||||
dout(7) << "synchronous write" << endl;
|
||||
|
||||
// do we have write file cap?
|
||||
while (!lazy && (in->file_caps() & CAP_FILE_WR) == 0) {
|
||||
dout(7) << " don't have write cap, waiting" << endl;
|
||||
Cond cond;
|
||||
in->waitfor_write.push_back(&cond);
|
||||
cond.Wait(client_lock);
|
||||
}
|
||||
while (lazy && (in->file_caps() & CAP_FILE_LAZYIO) == 0) {
|
||||
dout(7) << " don't have lazy cap, waiting" << endl;
|
||||
Cond cond;
|
||||
in->waitfor_lazy.push_back(&cond);
|
||||
cond.Wait(client_lock);
|
||||
}
|
||||
|
||||
// prepare write
|
||||
Cond cond;
|
||||
bool done = false;
|
||||
@ -2460,6 +2462,9 @@ int Client::write(fh_t fh, const char *buf, off_t size, off_t offset)
|
||||
//, 1+((int)g_clock.now()) / 10 //f->pos // hack hack test osd revision snapshots
|
||||
);
|
||||
|
||||
// adjust fd pos
|
||||
f->pos = offset+size;
|
||||
|
||||
while (!done) {
|
||||
cond.Wait(client_lock);
|
||||
dout(20) << " sync write bump " << onfinish << endl;
|
||||
|
@ -1,3 +1,15 @@
|
||||
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
|
||||
/*
|
||||
* Ceph - scalable distributed file system
|
||||
*
|
||||
* Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
|
||||
*
|
||||
* This is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU Lesser General Public
|
||||
* License version 2.1, as published by the Free Software
|
||||
* Foundation. See file COPYING.
|
||||
*
|
||||
*/
|
||||
|
||||
#include "config.h"
|
||||
#include "include/types.h"
|
||||
@ -8,8 +20,8 @@
|
||||
#include "msg/Messenger.h"
|
||||
|
||||
#undef dout
|
||||
#define dout(x) if (x <= g_conf.debug_client) cout << g_clock.now() << " " << oc->objecter->messenger->get_myaddr() << ".filecache "
|
||||
#define derr(x) if (x <= g_conf.debug_client) cout << g_clock.now() << " " << oc->objecter->messenger->get_myaddr() << ".filecache "
|
||||
#define dout(x) if (x <= g_conf.debug_client) cout << g_clock.now() << " " << oc->objecter->messenger->get_myname() << ".filecache "
|
||||
#define derr(x) if (x <= g_conf.debug_client) cout << g_clock.now() << " " << oc->objecter->messenger->get_myname() << ".filecache "
|
||||
|
||||
|
||||
// flush/release/clean
|
||||
@ -54,27 +66,51 @@ void FileCache::tear_down()
|
||||
{
|
||||
off_t unclean = release_clean();
|
||||
if (unclean) {
|
||||
dout(0) << "tear_down " << unclean << " unclean bytes, purging" << endl;
|
||||
oc->purge_set(inode.ino);
|
||||
dout(0) << "tear_down " << unclean << " unclean bytes, purging" << endl;
|
||||
oc->purge_set(inode.ino);
|
||||
}
|
||||
}
|
||||
|
||||
// caps
|
||||
|
||||
// Completion context that re-runs FileCache::check_caps() on the FileCache
// it was constructed with.  FileCache::check_caps() hands instances of this
// class to flush_dirty() and empty() as their completion callbacks.
class C_FC_CheckCaps : public Context {
|
||||
// FileCache whose check_caps() is invoked from finish().
FileCache *fc;
|
||||
public:
|
||||
// f: the FileCache to call back into; the pointer is stored as-is.
C_FC_CheckCaps(FileCache *f) : fc(f) {}
|
||||
// Calls fc->check_caps(); the result code r is not used.
void finish(int r) {
|
||||
fc->check_caps();
|
||||
}
|
||||
};
|
||||
|
||||
// Install a new capability mask for this file cache and react to it.
//
//   caps        - new capability mask; stored in latest_caps.
//   onimplement - optional context.  When non-null it is appended to
//                 caps_callbacks[caps]; the callback lists are walked by
//                 check_caps().
//
// After latest_caps is updated, check_caps() is run and every Cond queued in
// waitfor_read / waitfor_write is signalled if can_read() / can_write()
// respectively now hold.
void FileCache::set_caps(int caps, Context *onimplement)
|
||||
{
|
||||
if (onimplement) {
|
||||
dout(10) << "set_caps setting onimplement context for " << cap_string(caps) << endl;
|
||||
// A callback is only expected when at least one bit currently in
// latest_caps is absent from the new mask.
assert(latest_caps & ~caps); // we should be losing caps.
|
||||
caps_callbacks[caps].push_back(onimplement);
|
||||
}
|
||||
|
||||
latest_caps = caps;
|
||||
check_caps();
|
||||
|
||||
// kick waiters? (did we gain caps?)
// The Conds in these sets are inserted by FileCache::read() and
// FileCache::write() while they block, and erased by them after waking.
|
||||
if (can_read() && !waitfor_read.empty())
|
||||
for (set<Cond*>::iterator p = waitfor_read.begin();
|
||||
p != waitfor_read.end();
|
||||
++p)
|
||||
(*p)->Signal();
|
||||
if (can_write() && !waitfor_write.empty())
|
||||
for (set<Cond*>::iterator p = waitfor_write.begin();
|
||||
p != waitfor_write.end();
|
||||
++p)
|
||||
(*p)->Signal();
|
||||
|
||||
}
|
||||
|
||||
|
||||
void FileCache::check_caps()
|
||||
{
|
||||
// calc used
|
||||
int used = 0;
|
||||
if (num_reading) used |= CAP_FILE_RD;
|
||||
if (oc->set_is_cached(inode.ino)) used |= CAP_FILE_RDCACHE;
|
||||
@ -82,6 +118,18 @@ void FileCache::check_caps()
|
||||
if (oc->set_is_dirty_or_committing(inode.ino)) used |= CAP_FILE_WRBUFFER;
|
||||
dout(10) << "check_caps used " << cap_string(used) << endl;
|
||||
|
||||
// try to implement caps?
|
||||
// BUG? latest_caps, not least caps i've seen?
|
||||
if ((latest_caps & CAP_FILE_RDCACHE) == 0 &&
|
||||
(used & CAP_FILE_RDCACHE))
|
||||
release_clean();
|
||||
if ((latest_caps & CAP_FILE_WRBUFFER) == 0 &&
|
||||
(used & CAP_FILE_WRBUFFER))
|
||||
flush_dirty(new C_FC_CheckCaps(this));
|
||||
if (latest_caps == 0 &&
|
||||
used != 0)
|
||||
empty(new C_FC_CheckCaps(this));
|
||||
|
||||
// check callbacks
|
||||
map<int, list<Context*> >::iterator p = caps_callbacks.begin();
|
||||
while (p != caps_callbacks.end()) {
|
||||
@ -109,6 +157,15 @@ int FileCache::read(off_t offset, size_t size, bufferlist& blist, Mutex& client_
|
||||
{
|
||||
int r = 0;
|
||||
|
||||
// can i read?
|
||||
while ((latest_caps & CAP_FILE_RD) == 0) {
|
||||
dout(10) << "read doesn't have RD cap, blocking" << endl;
|
||||
Cond c;
|
||||
waitfor_read.insert(&c);
|
||||
c.Wait(client_lock);
|
||||
waitfor_read.erase(&c);
|
||||
}
|
||||
|
||||
// inc reading counter
|
||||
num_reading++;
|
||||
|
||||
@ -145,6 +202,15 @@ int FileCache::read(off_t offset, size_t size, bufferlist& blist, Mutex& client_
|
||||
|
||||
void FileCache::write(off_t offset, size_t size, bufferlist& blist, Mutex& client_lock)
|
||||
{
|
||||
// can i write
|
||||
while ((latest_caps & CAP_FILE_WR) == 0) {
|
||||
dout(10) << "write doesn't have WR cap, blocking" << endl;
|
||||
Cond c;
|
||||
waitfor_write.insert(&c);
|
||||
c.Wait(client_lock);
|
||||
waitfor_write.erase(&c);
|
||||
}
|
||||
|
||||
// inc writing counter
|
||||
num_writing++;
|
||||
|
||||
|
@ -1,3 +1,16 @@
|
||||
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
|
||||
/*
|
||||
* Ceph - scalable distributed file system
|
||||
*
|
||||
* Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
|
||||
*
|
||||
* This is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU Lesser General Public
|
||||
* License version 2.1, as published by the Free Software
|
||||
* Foundation. See file COPYING.
|
||||
*
|
||||
*/
|
||||
|
||||
#ifndef __FILECACHE_H
|
||||
#define __FILECACHE_H
|
||||
|
||||
@ -22,9 +35,9 @@ class FileCache {
|
||||
//int num_unsafe;
|
||||
|
||||
// waiters
|
||||
list<Cond*> waitfor_read;
|
||||
list<Cond*> waitfor_write;
|
||||
//list<Context*> waitfor_safe;
|
||||
set<Cond*> waitfor_read;
|
||||
set<Cond*> waitfor_write;
|
||||
|
||||
bool waitfor_release;
|
||||
|
||||
public:
|
||||
@ -35,7 +48,7 @@ class FileCache {
|
||||
num_reading(0), num_writing(0),// num_unsafe(0),
|
||||
waitfor_release(false) {}
|
||||
~FileCache() {
|
||||
tear_down();
|
||||
tear_down();
|
||||
}
|
||||
|
||||
// waiters/waiting
|
||||
@ -43,9 +56,7 @@ class FileCache {
|
||||
// True when latest_caps has the CAP_FILE_WR bit set.
bool can_write() { return latest_caps & CAP_FILE_WR; }
|
||||
bool all_safe();// { return num_unsafe == 0; }
|
||||
|
||||
void add_read_waiter(Cond *c) { waitfor_read.push_back(c); }
|
||||
void add_write_waiter(Cond *c) { waitfor_write.push_back(c); }
|
||||
void add_safe_waiter(Context *c);// { waitfor_safe.push_back(c); }
|
||||
void add_safe_waiter(Context *c);
|
||||
|
||||
// ...
|
||||
void flush_dirty(Context *onflush=0);
|
||||
|
Loading…
Reference in New Issue
Block a user