mirror of
https://github.com/ceph/ceph
synced 2025-03-11 02:39:05 +00:00
merged trunk changes r1208:1255 into branches/sage/cephmds2
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1256 29311d96-e01e-0410-9327-a35deaab8ce9
This commit is contained in:
parent
6ed8c55ee2
commit
fb147bf247
@ -970,6 +970,8 @@ void Client::handle_file_caps(MClientFileCaps *m)
|
||||
if (in->file_wr_mtime > in->inode.mtime)
|
||||
m->get_inode().mtime = in->inode.mtime = in->file_wr_mtime;
|
||||
|
||||
|
||||
|
||||
if (g_conf.client_oc) {
|
||||
// caching on, use FileCache.
|
||||
Context *onimplement = 0;
|
||||
@ -1886,7 +1888,7 @@ int Client::getdir(const char *relpath, map<string,inode_t>& contents)
|
||||
if (*pdn == ".")
|
||||
continue;
|
||||
|
||||
// count entries
|
||||
// count entries
|
||||
res++;
|
||||
|
||||
// put in cache
|
||||
@ -2184,7 +2186,8 @@ int Client::open(const char *relpath, int flags)
|
||||
dout(7) << "open got caps " << cap_string(new_caps)
|
||||
<< " for " << f->inode->ino()
|
||||
<< " seq " << reply->get_file_caps_seq()
|
||||
<< " from mds" << mds << endl;
|
||||
<< " from mds" << mds
|
||||
<< endl;
|
||||
|
||||
int old_caps = f->inode->caps[mds].caps;
|
||||
f->inode->caps[mds].caps = new_caps;
|
||||
@ -2201,7 +2204,8 @@ int Client::open(const char *relpath, int flags)
|
||||
dout(7) << "open got SAME caps " << cap_string(new_caps)
|
||||
<< " for " << f->inode->ino()
|
||||
<< " seq " << reply->get_file_caps_seq()
|
||||
<< " from mds" << mds << endl;
|
||||
<< " from mds" << mds
|
||||
<< endl;
|
||||
}
|
||||
|
||||
// put in map
|
||||
@ -2367,7 +2371,6 @@ int Client::read(fh_t fh, char *buf, off_t size, off_t offset)
|
||||
tout << size << endl;
|
||||
tout << offset << endl;
|
||||
|
||||
assert(offset >= 0);
|
||||
assert(fh_map.count(fh));
|
||||
Fh *f = fh_map[fh];
|
||||
Inode *in = f->inode;
|
||||
@ -2377,35 +2380,21 @@ int Client::read(fh_t fh, char *buf, off_t size, off_t offset)
|
||||
|
||||
bool lazy = f->mode == FILE_MODE_LAZY;
|
||||
|
||||
// do we have read file cap?
|
||||
while (!lazy && (in->file_caps() & CAP_FILE_RD) == 0) {
|
||||
dout(7) << " don't have read cap, waiting" << endl;
|
||||
Cond cond;
|
||||
in->waitfor_read.push_back(&cond);
|
||||
cond.Wait(client_lock);
|
||||
}
|
||||
// lazy cap?
|
||||
while (lazy && (in->file_caps() & CAP_FILE_LAZYIO) == 0) {
|
||||
dout(7) << " don't have lazy cap, waiting" << endl;
|
||||
Cond cond;
|
||||
in->waitfor_lazy.push_back(&cond);
|
||||
cond.Wait(client_lock);
|
||||
}
|
||||
|
||||
// determine whether read range overlaps with file
|
||||
// ...ONLY if we're doing async io
|
||||
if (!lazy && (in->file_caps() & (CAP_FILE_WRBUFFER|CAP_FILE_RDCACHE))) {
|
||||
// we're doing buffered i/o. make sure we're inside the file.
|
||||
// we can trust size info bc we get accurate info when buffering/caching caps are issued.
|
||||
dout(10) << "file size: " << in->inode.size << endl;
|
||||
dout(-10) << "file size: " << in->inode.size << endl;
|
||||
if (offset > 0 && offset >= in->inode.size) {
|
||||
client_lock.Unlock();
|
||||
return 0;
|
||||
}
|
||||
if (offset + size > (unsigned)in->inode.size) size = (unsigned)in->inode.size - offset;
|
||||
if (offset + size > (off_t)in->inode.size)
|
||||
size = (off_t)in->inode.size - offset;
|
||||
|
||||
if (size == 0) {
|
||||
dout(10) << "read is size=0, returning 0" << endl;
|
||||
dout(-10) << "read is size=0, returning 0" << endl;
|
||||
client_lock.Unlock();
|
||||
return 0;
|
||||
}
|
||||
@ -2416,14 +2405,31 @@ int Client::read(fh_t fh, char *buf, off_t size, off_t offset)
|
||||
}
|
||||
|
||||
bufferlist blist; // data will go here
|
||||
int rvalue = 0;
|
||||
int r = 0;
|
||||
int rvalue = 0;
|
||||
|
||||
if (g_conf.client_oc) {
|
||||
// object cache ON
|
||||
rvalue = r = in->fc.read(offset, size, blist, client_lock); // may block.
|
||||
} else {
|
||||
// object cache OFF -- legacy inconsistent way.
|
||||
|
||||
// do we have read file cap?
|
||||
while (!lazy && (in->file_caps() & CAP_FILE_RD) == 0) {
|
||||
dout(7) << " don't have read cap, waiting" << endl;
|
||||
Cond cond;
|
||||
in->waitfor_read.push_back(&cond);
|
||||
cond.Wait(client_lock);
|
||||
}
|
||||
// lazy cap?
|
||||
while (lazy && (in->file_caps() & CAP_FILE_LAZYIO) == 0) {
|
||||
dout(7) << " don't have lazy cap, waiting" << endl;
|
||||
Cond cond;
|
||||
in->waitfor_lazy.push_back(&cond);
|
||||
cond.Wait(client_lock);
|
||||
}
|
||||
|
||||
// do sync read
|
||||
Cond cond;
|
||||
bool done = false;
|
||||
C_Cond *onfinish = new C_Cond(&cond, &done, &rvalue);
|
||||
@ -2490,7 +2496,6 @@ int Client::write(fh_t fh, const char *buf, off_t size, off_t offset)
|
||||
tout << size << endl;
|
||||
tout << offset << endl;
|
||||
|
||||
assert(offset >= 0);
|
||||
assert(fh_map.count(fh));
|
||||
Fh *f = fh_map[fh];
|
||||
Inode *in = f->inode;
|
||||
@ -2502,23 +2507,6 @@ int Client::write(fh_t fh, const char *buf, off_t size, off_t offset)
|
||||
|
||||
dout(10) << "cur file size is " << in->inode.size << " wr size " << in->file_wr_size << endl;
|
||||
|
||||
// do we have write file cap?
|
||||
while (!lazy && (in->file_caps() & CAP_FILE_WR) == 0) {
|
||||
dout(7) << " don't have write cap, waiting" << endl;
|
||||
Cond cond;
|
||||
in->waitfor_write.push_back(&cond);
|
||||
cond.Wait(client_lock);
|
||||
}
|
||||
while (lazy && (in->file_caps() & CAP_FILE_LAZYIO) == 0) {
|
||||
dout(7) << " don't have lazy cap, waiting" << endl;
|
||||
Cond cond;
|
||||
in->waitfor_lazy.push_back(&cond);
|
||||
cond.Wait(client_lock);
|
||||
}
|
||||
|
||||
// adjust fd pos
|
||||
f->pos = offset+size;
|
||||
|
||||
// time it.
|
||||
utime_t start = g_clock.now();
|
||||
|
||||
@ -2532,11 +2520,28 @@ int Client::write(fh_t fh, const char *buf, off_t size, off_t offset)
|
||||
|
||||
// write (this may block!)
|
||||
in->fc.write(offset, size, blist, client_lock);
|
||||
|
||||
// adjust fd pos
|
||||
f->pos = offset+size;
|
||||
|
||||
} else {
|
||||
// legacy, inconsistent synchronous write.
|
||||
dout(7) << "synchronous write" << endl;
|
||||
|
||||
// do we have write file cap?
|
||||
while (!lazy && (in->file_caps() & CAP_FILE_WR) == 0) {
|
||||
dout(7) << " don't have write cap, waiting" << endl;
|
||||
Cond cond;
|
||||
in->waitfor_write.push_back(&cond);
|
||||
cond.Wait(client_lock);
|
||||
}
|
||||
while (lazy && (in->file_caps() & CAP_FILE_LAZYIO) == 0) {
|
||||
dout(7) << " don't have lazy cap, waiting" << endl;
|
||||
Cond cond;
|
||||
in->waitfor_lazy.push_back(&cond);
|
||||
cond.Wait(client_lock);
|
||||
}
|
||||
|
||||
// prepare write
|
||||
Cond cond;
|
||||
bool done = false;
|
||||
@ -2552,6 +2557,9 @@ int Client::write(fh_t fh, const char *buf, off_t size, off_t offset)
|
||||
//, 1+((int)g_clock.now()) / 10 //f->pos // hack hack test osd revision snapshots
|
||||
);
|
||||
|
||||
// adjust fd pos
|
||||
f->pos = offset+size;
|
||||
|
||||
while (!done) {
|
||||
cond.Wait(client_lock);
|
||||
dout(20) << " sync write bump " << onfinish << endl;
|
||||
|
@ -1,3 +1,15 @@
|
||||
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
|
||||
/*
|
||||
* Ceph - scalable distributed file system
|
||||
*
|
||||
* Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
|
||||
*
|
||||
* This is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU Lesser General Public
|
||||
* License version 2.1, as published by the Free Software
|
||||
* Foundation. See file COPYING.
|
||||
*
|
||||
*/
|
||||
|
||||
#include "config.h"
|
||||
#include "include/types.h"
|
||||
@ -8,8 +20,8 @@
|
||||
#include "msg/Messenger.h"
|
||||
|
||||
#undef dout
|
||||
#define dout(x) if (x <= g_conf.debug_client) cout << g_clock.now() << " " << oc->objecter->messenger->get_myaddr() << ".filecache "
|
||||
#define derr(x) if (x <= g_conf.debug_client) cout << g_clock.now() << " " << oc->objecter->messenger->get_myaddr() << ".filecache "
|
||||
#define dout(x) if (x <= g_conf.debug_client) cout << g_clock.now() << " " << oc->objecter->messenger->get_myname() << ".filecache "
|
||||
#define derr(x) if (x <= g_conf.debug_client) cout << g_clock.now() << " " << oc->objecter->messenger->get_myname() << ".filecache "
|
||||
|
||||
|
||||
// flush/release/clean
|
||||
@ -54,27 +66,51 @@ void FileCache::tear_down()
|
||||
{
|
||||
off_t unclean = release_clean();
|
||||
if (unclean) {
|
||||
dout(0) << "tear_down " << unclean << " unclean bytes, purging" << endl;
|
||||
oc->purge_set(inode.ino);
|
||||
dout(0) << "tear_down " << unclean << " unclean bytes, purging" << endl;
|
||||
oc->purge_set(inode.ino);
|
||||
}
|
||||
}
|
||||
|
||||
// caps
|
||||
|
||||
class C_FC_CheckCaps : public Context {
|
||||
FileCache *fc;
|
||||
public:
|
||||
C_FC_CheckCaps(FileCache *f) : fc(f) {}
|
||||
void finish(int r) {
|
||||
fc->check_caps();
|
||||
}
|
||||
};
|
||||
|
||||
void FileCache::set_caps(int caps, Context *onimplement)
|
||||
{
|
||||
if (onimplement) {
|
||||
dout(10) << "set_caps setting onimplement context for " << cap_string(caps) << endl;
|
||||
assert(latest_caps & ~caps); // we should be losing caps.
|
||||
caps_callbacks[caps].push_back(onimplement);
|
||||
}
|
||||
|
||||
latest_caps = caps;
|
||||
check_caps();
|
||||
|
||||
// kick waiters? (did we gain caps?)
|
||||
if (can_read() && !waitfor_read.empty())
|
||||
for (set<Cond*>::iterator p = waitfor_read.begin();
|
||||
p != waitfor_read.end();
|
||||
++p)
|
||||
(*p)->Signal();
|
||||
if (can_write() && !waitfor_write.empty())
|
||||
for (set<Cond*>::iterator p = waitfor_write.begin();
|
||||
p != waitfor_write.end();
|
||||
++p)
|
||||
(*p)->Signal();
|
||||
|
||||
}
|
||||
|
||||
|
||||
void FileCache::check_caps()
|
||||
{
|
||||
// calc used
|
||||
int used = 0;
|
||||
if (num_reading) used |= CAP_FILE_RD;
|
||||
if (oc->set_is_cached(inode.ino)) used |= CAP_FILE_RDCACHE;
|
||||
@ -82,6 +118,15 @@ void FileCache::check_caps()
|
||||
if (oc->set_is_dirty_or_committing(inode.ino)) used |= CAP_FILE_WRBUFFER;
|
||||
dout(10) << "check_caps used " << cap_string(used) << endl;
|
||||
|
||||
// try to implement caps?
|
||||
// BUG? latest_caps, not least caps i've seen?
|
||||
if ((latest_caps & CAP_FILE_RDCACHE) == 0 &&
|
||||
(used & CAP_FILE_RDCACHE))
|
||||
release_clean();
|
||||
if ((latest_caps & CAP_FILE_WRBUFFER) == 0 &&
|
||||
(used & CAP_FILE_WRBUFFER))
|
||||
flush_dirty(new C_FC_CheckCaps(this));
|
||||
|
||||
// check callbacks
|
||||
map<int, list<Context*> >::iterator p = caps_callbacks.begin();
|
||||
while (p != caps_callbacks.end()) {
|
||||
@ -109,6 +154,15 @@ int FileCache::read(off_t offset, size_t size, bufferlist& blist, Mutex& client_
|
||||
{
|
||||
int r = 0;
|
||||
|
||||
// can i read?
|
||||
while ((latest_caps & CAP_FILE_RD) == 0) {
|
||||
dout(10) << "read doesn't have RD cap, blocking" << endl;
|
||||
Cond c;
|
||||
waitfor_read.insert(&c);
|
||||
c.Wait(client_lock);
|
||||
waitfor_read.erase(&c);
|
||||
}
|
||||
|
||||
// inc reading counter
|
||||
num_reading++;
|
||||
|
||||
@ -145,6 +199,15 @@ int FileCache::read(off_t offset, size_t size, bufferlist& blist, Mutex& client_
|
||||
|
||||
void FileCache::write(off_t offset, size_t size, bufferlist& blist, Mutex& client_lock)
|
||||
{
|
||||
// can i write
|
||||
while ((latest_caps & CAP_FILE_WR) == 0) {
|
||||
dout(10) << "write doesn't have WR cap, blocking" << endl;
|
||||
Cond c;
|
||||
waitfor_write.insert(&c);
|
||||
c.Wait(client_lock);
|
||||
waitfor_write.erase(&c);
|
||||
}
|
||||
|
||||
// inc writing counter
|
||||
num_writing++;
|
||||
|
||||
|
@ -1,3 +1,16 @@
|
||||
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
|
||||
/*
|
||||
* Ceph - scalable distributed file system
|
||||
*
|
||||
* Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
|
||||
*
|
||||
* This is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU Lesser General Public
|
||||
* License version 2.1, as published by the Free Software
|
||||
* Foundation. See file COPYING.
|
||||
*
|
||||
*/
|
||||
|
||||
#ifndef __FILECACHE_H
|
||||
#define __FILECACHE_H
|
||||
|
||||
@ -22,9 +35,9 @@ class FileCache {
|
||||
//int num_unsafe;
|
||||
|
||||
// waiters
|
||||
list<Cond*> waitfor_read;
|
||||
list<Cond*> waitfor_write;
|
||||
//list<Context*> waitfor_safe;
|
||||
set<Cond*> waitfor_read;
|
||||
set<Cond*> waitfor_write;
|
||||
|
||||
bool waitfor_release;
|
||||
|
||||
public:
|
||||
@ -35,7 +48,7 @@ class FileCache {
|
||||
num_reading(0), num_writing(0),// num_unsafe(0),
|
||||
waitfor_release(false) {}
|
||||
~FileCache() {
|
||||
tear_down();
|
||||
tear_down();
|
||||
}
|
||||
|
||||
// waiters/waiting
|
||||
@ -43,9 +56,7 @@ class FileCache {
|
||||
bool can_write() { return latest_caps & CAP_FILE_WR; }
|
||||
bool all_safe();// { return num_unsafe == 0; }
|
||||
|
||||
void add_read_waiter(Cond *c) { waitfor_read.push_back(c); }
|
||||
void add_write_waiter(Cond *c) { waitfor_write.push_back(c); }
|
||||
void add_safe_waiter(Context *c);// { waitfor_safe.push_back(c); }
|
||||
void add_safe_waiter(Context *c);
|
||||
|
||||
// ...
|
||||
void flush_dirty(Context *onflush=0);
|
||||
|
@ -737,6 +737,8 @@ int SyntheticClient::clean_dir(string& basedir)
|
||||
for (map<string, inode_t>::iterator it = contents.begin();
|
||||
it != contents.end();
|
||||
it++) {
|
||||
if (it->first == ".") continue;
|
||||
if (it->first == "..") continue;
|
||||
string file = basedir + "/" + it->first;
|
||||
|
||||
if (time_to_stop()) break;
|
||||
@ -1003,17 +1005,16 @@ int SyntheticClient::write_file(string& fn, int size, int wrsize) // size is i
|
||||
}
|
||||
dout(2) << "writing block " << i << "/" << chunks << endl;
|
||||
|
||||
// fill buf with a fingerprint
|
||||
int *p = (int*)buf;
|
||||
// fill buf with a 16 byte fingerprint
|
||||
// 64 bits : file offset
|
||||
// 64 bits : client id
|
||||
// = 128 bits (16 bytes)
|
||||
__uint64_t *p = (__uint64_t*)buf;
|
||||
while ((char*)p < buf + wrsize) {
|
||||
*p = (char*)p - buf;
|
||||
p++;
|
||||
*p = i;
|
||||
*p = i*wrsize + (char*)p - buf;
|
||||
p++;
|
||||
*p = client->get_nodeid();
|
||||
p++;
|
||||
*p = 0;
|
||||
p++;
|
||||
}
|
||||
|
||||
client->write(fd, buf, wrsize, i*wrsize);
|
||||
@ -1048,42 +1049,33 @@ int SyntheticClient::read_file(string& fn, int size, int rdsize) // size is in
|
||||
for (unsigned i=0; i<chunks; i++) {
|
||||
if (time_to_stop()) break;
|
||||
dout(2) << "reading block " << i << "/" << chunks << endl;
|
||||
client->read(fd, buf, rdsize, i*rdsize);
|
||||
int r = client->read(fd, buf, rdsize, i*rdsize);
|
||||
if (r < rdsize) {
|
||||
dout(1) << "read_file got r = " << r << ", probably end of file" << endl;
|
||||
break;
|
||||
}
|
||||
|
||||
// verify fingerprint
|
||||
int *p = (int*)buf;
|
||||
int bad = 0;
|
||||
int boff, bgoff, bchunk, bclient, bzero;
|
||||
__int64_t *p = (__int64_t*)buf;
|
||||
__int64_t readoff, readclient;
|
||||
while ((char*)p + 32 < buf + rdsize) {
|
||||
boff = *p;
|
||||
bgoff = (int)((char*)p - buf);
|
||||
readoff = *p;
|
||||
__int64_t wantoff = i*rdsize + (__int64_t)((char*)p - buf);
|
||||
p++;
|
||||
bchunk = *p;
|
||||
readclient = *p;
|
||||
p++;
|
||||
bclient = *p;
|
||||
p++;
|
||||
bzero = *p;
|
||||
p++;
|
||||
if (boff != bgoff ||
|
||||
bchunk != (int)i ||
|
||||
bclient != client->get_nodeid() ||
|
||||
bzero != 0) {
|
||||
if (readoff != wantoff ||
|
||||
readclient != client->get_nodeid()) {
|
||||
if (!bad)
|
||||
dout(0) << "WARNING: wrong data from OSD, it should be "
|
||||
<< "(block=" << i
|
||||
<< " offset=" << bgoff
|
||||
<< " client=" << client->get_nodeid() << ")"
|
||||
<< " .. but i read back .. "
|
||||
<< "(block=" << bchunk
|
||||
<< " offset=" << boff
|
||||
<< " client=" << bclient << " zero=" << bzero << ")" << endl;
|
||||
|
||||
dout(0) << "WARNING: wrong data from OSD, block says fileoffset=" << readoff << " client=" << readclient
|
||||
<< ", should be offset " << wantoff << " clietn " << client->get_nodeid()
|
||||
<< endl;
|
||||
bad++;
|
||||
}
|
||||
}
|
||||
if (bad)
|
||||
dout(0) << " + " << (bad-1) << " other bad 16-byte bits in this block" << endl;
|
||||
|
||||
}
|
||||
|
||||
client->close(fd);
|
||||
|
@ -177,13 +177,12 @@ static int ceph_write(const char *path, const char *buf, size_t size,
|
||||
return client->write(fh, buf, size, offset);
|
||||
}
|
||||
|
||||
/*
|
||||
static int ceph_flush(const char *path, struct fuse_file_info *fi)
|
||||
{
|
||||
fh_t fh = fi->fh;
|
||||
return client->flush(fh);
|
||||
//fh_t fh = fi->fh;
|
||||
//return client->flush(fh);
|
||||
return 0;
|
||||
}
|
||||
*/
|
||||
|
||||
|
||||
static int ceph_statfs(const char *path, struct statvfs *stbuf)
|
||||
@ -227,7 +226,7 @@ static struct fuse_operations ceph_oper = {
|
||||
read: ceph_read,
|
||||
write: ceph_write,
|
||||
statfs: ceph_statfs,
|
||||
flush: 0, //ceph_flush,
|
||||
flush: ceph_flush,
|
||||
release: ceph_release,
|
||||
fsync: ceph_fsync
|
||||
};
|
||||
|
@ -1,3 +1,16 @@
|
||||
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
|
||||
/*
|
||||
* Ceph - scalable distributed file system
|
||||
*
|
||||
* Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
|
||||
*
|
||||
* This is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU Lesser General Public
|
||||
* License version 2.1, as published by the Free Software
|
||||
* Foundation. See file COPYING.
|
||||
*
|
||||
*/
|
||||
|
||||
#ifndef __crush_BINARYTREE_H
|
||||
#define __crush_BINARYTREE_H
|
||||
|
||||
|
@ -1,3 +1,16 @@
|
||||
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
|
||||
/*
|
||||
* Ceph - scalable distributed file system
|
||||
*
|
||||
* Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
|
||||
*
|
||||
* This is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU Lesser General Public
|
||||
* License version 2.1, as published by the Free Software
|
||||
* Foundation. See file COPYING.
|
||||
*
|
||||
*/
|
||||
|
||||
#ifndef __crush_BUCKET_H
|
||||
#define __crush_BUCKET_H
|
||||
|
||||
|
@ -1,3 +1,16 @@
|
||||
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
|
||||
/*
|
||||
* Ceph - scalable distributed file system
|
||||
*
|
||||
* Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
|
||||
*
|
||||
* This is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU Lesser General Public
|
||||
* License version 2.1, as published by the Free Software
|
||||
* Foundation. See file COPYING.
|
||||
*
|
||||
*/
|
||||
|
||||
|
||||
// Robert Jenkins' function for mixing 32-bit values
|
||||
// http://burtleburtle.net/bob/hash/evahash.html
|
||||
|
@ -1,3 +1,16 @@
|
||||
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
|
||||
/*
|
||||
* Ceph - scalable distributed file system
|
||||
*
|
||||
* Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
|
||||
*
|
||||
* This is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU Lesser General Public
|
||||
* License version 2.1, as published by the Free Software
|
||||
* Foundation. See file COPYING.
|
||||
*
|
||||
*/
|
||||
|
||||
#ifndef __crush_CRUSH_H
|
||||
#define __crush_CRUSH_H
|
||||
|
||||
@ -93,7 +106,7 @@ namespace crush {
|
||||
int bucketno;
|
||||
Hash h;
|
||||
|
||||
hash_map<int, int> parent_map; // what bucket each leaf/bucket lives in
|
||||
hash_map<int, int> parent_map; // what bucket each leaf/bucket lives in
|
||||
|
||||
public:
|
||||
map<int, Rule> rules;
|
||||
@ -163,28 +176,28 @@ namespace crush {
|
||||
off += sizeof(r);
|
||||
rules[r]._decode(bl,off);
|
||||
}
|
||||
|
||||
// index
|
||||
build_parent_map();
|
||||
|
||||
// index
|
||||
build_parent_map();
|
||||
}
|
||||
|
||||
void build_parent_map() {
|
||||
parent_map.clear();
|
||||
|
||||
// index every bucket
|
||||
for (map<int, Bucket*>::iterator bp = buckets.begin();
|
||||
bp != buckets.end();
|
||||
++bp) {
|
||||
// index bucket items
|
||||
vector<int> items;
|
||||
bp->second->get_items(items);
|
||||
for (vector<int>::iterator ip = items.begin();
|
||||
ip != items.end();
|
||||
++ip)
|
||||
parent_map[*ip] = bp->first;
|
||||
}
|
||||
}
|
||||
|
||||
void build_parent_map() {
|
||||
parent_map.clear();
|
||||
|
||||
// index every bucket
|
||||
for (map<int, Bucket*>::iterator bp = buckets.begin();
|
||||
bp != buckets.end();
|
||||
++bp) {
|
||||
// index bucket items
|
||||
vector<int> items;
|
||||
bp->second->get_items(items);
|
||||
for (vector<int>::iterator ip = items.begin();
|
||||
ip != items.end();
|
||||
++ip)
|
||||
parent_map[*ip] = bp->first;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
public:
|
||||
@ -280,25 +293,25 @@ namespace crush {
|
||||
vector<int>& outvec,
|
||||
bool firstn,
|
||||
set<int>& outset, map<int,float>& overloadmap,
|
||||
bool forcefeed=false,
|
||||
int forcefeedval=-1) {
|
||||
bool forcefeed=false,
|
||||
int forcefeedval=-1) {
|
||||
int off = outvec.size();
|
||||
|
||||
// for each replica
|
||||
for (int rep=0; rep<numrep; rep++) {
|
||||
int outv = -1; // my result
|
||||
|
||||
// forcefeed?
|
||||
if (forcefeed) {
|
||||
forcefeed = false;
|
||||
outvec.push_back(forcefeedval);
|
||||
continue;
|
||||
}
|
||||
|
||||
// forcefeed?
|
||||
if (forcefeed) {
|
||||
forcefeed = false;
|
||||
outvec.push_back(forcefeedval);
|
||||
continue;
|
||||
}
|
||||
|
||||
// keep trying until we get a non-out, non-colliding item
|
||||
int ftotal = 0;
|
||||
bool skip_rep = false;
|
||||
|
||||
|
||||
while (1) {
|
||||
// start with the input bucket
|
||||
Bucket *in = inbucket;
|
||||
@ -415,21 +428,21 @@ namespace crush {
|
||||
//int numresult = 0;
|
||||
result.clear();
|
||||
|
||||
// determine hierarchical context for first.
|
||||
list<int> force_stack;
|
||||
if (forcefeed >= 0) {
|
||||
int t = forcefeed;
|
||||
while (1) {
|
||||
force_stack.push_front(t);
|
||||
if (parent_map.count(t) == 0) break; // reached root, presumably.
|
||||
//cout << " " << t << " parent is " << parent_map[t] << endl;
|
||||
t = parent_map[t];
|
||||
}
|
||||
}
|
||||
|
||||
// determine hierarchical context for first.
|
||||
list<int> force_stack;
|
||||
if (forcefeed >= 0) {
|
||||
int t = forcefeed;
|
||||
while (1) {
|
||||
force_stack.push_front(t);
|
||||
if (parent_map.count(t) == 0) break; // reached root, presumably.
|
||||
//cout << " " << t << " parent is " << parent_map[t] << endl;
|
||||
t = parent_map[t];
|
||||
}
|
||||
}
|
||||
|
||||
// working vector
|
||||
vector<int> w; // working variable
|
||||
|
||||
|
||||
// go through each statement
|
||||
for (vector<RuleStep>::iterator pc = rule.steps.begin();
|
||||
pc != rule.steps.end();
|
||||
@ -442,13 +455,13 @@ namespace crush {
|
||||
{
|
||||
const int arg = pc->args[0];
|
||||
//cout << "take " << arg << endl;
|
||||
|
||||
if (!force_stack.empty()) {
|
||||
int forceval = force_stack.front();
|
||||
force_stack.pop_front();
|
||||
assert(arg == forceval);
|
||||
}
|
||||
|
||||
|
||||
if (!force_stack.empty()) {
|
||||
int forceval = force_stack.front();
|
||||
force_stack.pop_front();
|
||||
assert(arg == forceval);
|
||||
}
|
||||
|
||||
w.clear();
|
||||
w.push_back(arg);
|
||||
}
|
||||
@ -469,26 +482,26 @@ namespace crush {
|
||||
vector<int> out;
|
||||
|
||||
// forcefeeding?
|
||||
bool forcing = false;
|
||||
int forceval;
|
||||
if (!force_stack.empty()) {
|
||||
forceval = force_stack.front();
|
||||
force_stack.pop_front();
|
||||
//cout << "priming out with " << forceval << endl;
|
||||
forcing = true;
|
||||
}
|
||||
|
||||
bool forcing = false;
|
||||
int forceval;
|
||||
if (!force_stack.empty()) {
|
||||
forceval = force_stack.front();
|
||||
force_stack.pop_front();
|
||||
//cout << "priming out with " << forceval << endl;
|
||||
forcing = true;
|
||||
}
|
||||
|
||||
// do each row independently
|
||||
for (vector<int>::iterator i = w.begin();
|
||||
i != w.end();
|
||||
i++) {
|
||||
assert(buckets.count(*i));
|
||||
Bucket *b = buckets[*i];
|
||||
choose(x, numrep, type, b, out, firstn,
|
||||
outset, overloadmap,
|
||||
forcing,
|
||||
forceval);
|
||||
forcing = false; // only once
|
||||
choose(x, numrep, type, b, out, firstn,
|
||||
outset, overloadmap,
|
||||
forcing,
|
||||
forceval);
|
||||
forcing = false; // only once
|
||||
} // for inrow
|
||||
|
||||
// put back into w
|
||||
|
@ -53,31 +53,44 @@ int main(int argc, char **argv, char *envp[]) {
|
||||
// start up network
|
||||
rank.start_rank();
|
||||
|
||||
// start client
|
||||
Client *client = new Client(rank.register_entity(MSG_ADDR_CLIENT_NEW), &monmap);
|
||||
client->init();
|
||||
list<Client*> clients;
|
||||
list<SyntheticClient*> synclients;
|
||||
|
||||
cout << "mounting and starting " << g_conf.num_client << " syn client(s)" << endl;
|
||||
for (int i=0; i<g_conf.num_client; i++) {
|
||||
// start client
|
||||
Client *client = new Client(rank.register_entity(MSG_ADDR_CLIENT_NEW), &monmap);
|
||||
client->init();
|
||||
|
||||
// start syntheticclient
|
||||
SyntheticClient *syn = new SyntheticClient(client);
|
||||
// start syntheticclient
|
||||
SyntheticClient *syn = new SyntheticClient(client);
|
||||
|
||||
// start up fuse
|
||||
// use my argc, argv (make sure you pass a mount point!)
|
||||
cout << "mounting" << endl;
|
||||
client->mount();
|
||||
|
||||
cout << "starting syn client" << endl;
|
||||
syn->start_thread();
|
||||
client->mount();
|
||||
|
||||
syn->start_thread();
|
||||
|
||||
// wait
|
||||
syn->join_thread();
|
||||
clients.push_back(client);
|
||||
synclients.push_back(syn);
|
||||
}
|
||||
|
||||
// unmount
|
||||
client->unmount();
|
||||
cout << "unmounted" << endl;
|
||||
client->shutdown();
|
||||
|
||||
delete client;
|
||||
|
||||
cout << "waiting for client(s) to finish" << endl;
|
||||
while (!clients.empty()) {
|
||||
Client *client = clients.front();
|
||||
SyntheticClient *syn = synclients.front();
|
||||
clients.pop_front();
|
||||
synclients.pop_front();
|
||||
|
||||
// wait
|
||||
syn->join_thread();
|
||||
|
||||
// unmount
|
||||
client->unmount();
|
||||
client->shutdown();
|
||||
|
||||
delete syn;
|
||||
delete client;
|
||||
}
|
||||
|
||||
// wait for messenger to finish
|
||||
rank.wait();
|
||||
|
||||
|
@ -213,6 +213,91 @@ int ObjectCache::find_tx(block_t start, block_t len,
|
||||
}
|
||||
|
||||
|
||||
/*
 * dry-run version of map_read: walk the block range [start, start+len)
 * and report how many pieces of it are not immediately readable from
 * the cache.  nothing in the cache is created or modified here.
 *
 * the count is built as follows:
 *   - a buffer head that is clean, dirty or tx counts as a hit (0)
 *   - a buffer head that is rx or partial counts as 1
 *   - a hole with no buffer head counts as the number of extents
 *     on->map_extents() returns for that hole
 *
 * returns the total count (0 means everything is cached).
 */
int ObjectCache::try_map_read(block_t start, block_t len)
{
  map<block_t, BufferHead*>::iterator p = data.lower_bound(start);

  block_t cur = start;
  block_t left = len;

  // lower_bound gives the first bh starting at or after 'start'; the
  // one before it may still extend into our range, so back up and check.
  if (p != data.begin() &&
      (p == data.end() || p->first > cur)) {
    p--;     // might overlap!
    if (p->first + p->second->length() <= cur)
      p++;   // doesn't overlap.
  }

  int num_missing = 0;

  while (left > 0) {
    // at end?
    if (p == data.end()) {
      // rest is a miss.
      vector<Extent> exv;
      on->map_extents(cur,
                      left,   // no prefetch here!
                      exv);

      num_missing += exv.size();
      left = 0;
      cur = start+len;
      break;
    }

    if (p->first <= cur) {
      // have it (or part of it)
      BufferHead *e = p->second;

      if (e->is_clean() ||
          e->is_dirty() ||
          e->is_tx()) {
        dout(20) << "try_map_read hit " << *e << endl;
      }
      else if (e->is_rx()) {
        dout(20) << "try_map_read rx " << *e << endl;
        num_missing++;
      }
      else if (e->is_partial()) {
        // NOTE(review): negative debug level; presumably a temporary
        // "always print" setting -- confirm before release.
        dout(-20) << "try_map_read partial " << *e << endl;
        num_missing++;
      }
      else {
        dout(0) << "try_map_read got unexpected " << *e << endl;
        assert(0);
      }

      block_t lenfromcur = MIN(e->end() - cur, left);
      cur += lenfromcur;
      left -= lenfromcur;
      p++;
      continue;  // more?
    }

    // gap.. miss  (p->first > cur)
    //
    // the next bh may start beyond the end of the requested range, so
    // clamp the hole to what is actually left.  advancing by the full
    // distance to p->first would over-decrement 'left' and push 'cur'
    // past start+len.
    block_t gap = MIN(p->first - cur, left);
    vector<Extent> exv;
    on->map_extents(cur,
                    gap,   // no prefetch
                    exv);

    // NOTE(review): negative debug level, see above.
    dout(-20) << "try_map_read gap of " << gap << " blocks, "
              << exv.size() << " extents" << endl;
    num_missing += exv.size();
    left -= gap;
    cur += gap;
    // more?
  }

  assert(left == 0);
  assert(cur == start+len);
  return num_missing;
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
/*
|
||||
* map a range of blocks into buffer_heads.
|
||||
@ -283,7 +368,7 @@ int ObjectCache::map_read(block_t start, block_t len,
|
||||
dout(20) << "map_read partial " << *e << endl;
|
||||
}
|
||||
else {
|
||||
dout(0) << "map_read ??? " << *e << endl;
|
||||
dout(0) << "map_read ??? got unexpected " << *e << endl;
|
||||
assert(0);
|
||||
}
|
||||
|
||||
@ -725,7 +810,7 @@ void BufferCache::bh_read(Onode *on, BufferHead *bh, block_t from)
|
||||
{
|
||||
dout(10) << "bh_read " << *on << " on " << *bh << endl;
|
||||
|
||||
if (bh->is_missing()) {
|
||||
if (bh->is_missing()) {
|
||||
mark_rx(bh);
|
||||
} else {
|
||||
assert(bh->is_partial());
|
||||
@ -746,7 +831,7 @@ void BufferCache::bh_read(Onode *on, BufferHead *bh, block_t from)
|
||||
// this should be empty!!
|
||||
assert(bh->rx_ioh == 0);
|
||||
|
||||
dout(20) << "bh_read " << *bh << " from " << ex << endl;
|
||||
dout(20) << "bh_read " << *on << " " << *bh << " from " << ex << endl;
|
||||
|
||||
C_OC_RxFinish *fin = new C_OC_RxFinish(ebofs_lock, on->oc,
|
||||
bh->start(), bh->length(),
|
||||
@ -792,7 +877,7 @@ void BufferCache::bh_write(Onode *on, BufferHead *bh, block_t shouldbe)
|
||||
if (shouldbe)
|
||||
assert(ex.length == 1 && ex.start == shouldbe);
|
||||
|
||||
dout(20) << "bh_write " << *bh << " to " << ex << endl;
|
||||
dout(20) << "bh_write " << *on << " " << *bh << " to " << ex << endl;
|
||||
|
||||
//assert(bh->tx_ioh == 0);
|
||||
|
||||
|
@ -403,6 +403,8 @@ class ObjectCache {
|
||||
map<block_t, BufferHead*>& missing, // read these from disk
|
||||
map<block_t, BufferHead*>& rx, // wait for these to finish reading from disk
|
||||
map<block_t, BufferHead*>& partial); // (maybe) wait for these to read from disk
|
||||
int try_map_read(block_t start, block_t len); // just tell us how many extents we're missing.
|
||||
|
||||
|
||||
int map_write(block_t start, block_t len,
|
||||
interval_set<block_t>& alloc,
|
||||
|
@ -30,6 +30,7 @@
|
||||
#define dout(x) if (x <= g_conf.debug_ebofs) cout << "ebofs(" << dev.get_device_name() << ")."
|
||||
#define derr(x) if (x <= g_conf.debug_ebofs) cerr << "ebofs(" << dev.get_device_name() << ")."
|
||||
|
||||
|
||||
char *nice_blocks(block_t b)
|
||||
{
|
||||
static char s[20];
|
||||
@ -1848,6 +1849,7 @@ int Ebofs::_is_cached(object_t oid, off_t off, size_t len)
|
||||
|
||||
if (!on->have_oc()) {
|
||||
// nothing is cached. return # of extents in file.
|
||||
dout(10) << "_is_cached have onode but no object cache, returning extent count" << endl;
|
||||
return on->extent_map.size();
|
||||
}
|
||||
|
||||
@ -1860,8 +1862,10 @@ int Ebofs::_is_cached(object_t oid, off_t off, size_t len)
|
||||
map<block_t, BufferHead*> missing; // read these
|
||||
map<block_t, BufferHead*> rx; // wait for these
|
||||
map<block_t, BufferHead*> partials; // ??
|
||||
on->get_oc(&bc)->map_read(bstart, blen, hits, missing, rx, partials);
|
||||
return missing.size() + rx.size() + partials.size();
|
||||
|
||||
int num_missing = on->get_oc(&bc)->try_map_read(bstart, blen);
|
||||
dout(7) << "_is_cached try_map_read reports " << num_missing << " missing extents" << endl;
|
||||
return num_missing;
|
||||
|
||||
// FIXME: actually, we should calculate if these extents are contiguous.
|
||||
// and not using map_read, probably...
|
||||
|
@ -71,7 +71,7 @@ public:
|
||||
bool is_suppress() { return suppress; }
|
||||
void set_suppress(bool b) { suppress = b; }
|
||||
|
||||
bool is_null() { return cap_history.empty(); }
|
||||
bool is_null() { return cap_history.empty() && wanted_caps == 0; }
|
||||
|
||||
// most recently issued caps.
|
||||
int pending() {
|
||||
|
@ -939,6 +939,8 @@ void Locker::inode_file_read_finish(CInode *in)
|
||||
|
||||
bool Locker::inode_file_write_start(CInode *in, MClientRequest *m)
|
||||
{
|
||||
dout(7) << "inode_file_write_start on " << *in << endl;
|
||||
|
||||
// can't write?
|
||||
if (!in->filelock.can_write(in->is_auth())) {
|
||||
|
||||
@ -992,7 +994,7 @@ bool Locker::inode_file_write_start(CInode *in, MClientRequest *m)
|
||||
void Locker::inode_file_write_finish(CInode *in)
|
||||
{
|
||||
// drop ref
|
||||
assert(in->filelock.can_write(in->is_auth()));
|
||||
//assert(in->filelock.can_write(in->is_auth()));
|
||||
in->filelock.put_write();
|
||||
dout(7) << "inode_file_write_finish on " << *in << ", filelock=" << in->filelock << endl;
|
||||
|
||||
@ -1866,6 +1868,7 @@ void Locker::dentry_xlock_finish(CDentry *dn, bool quiet)
|
||||
mds->queue_finished(finished);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* onfinish->finish() will be called with
|
||||
* 0 on successful xlock,
|
||||
|
@ -2362,6 +2362,19 @@ void Server::handle_client_open(MClientRequest *req,
|
||||
return;
|
||||
}
|
||||
|
||||
// O_TRUNC
|
||||
if (flags & O_TRUNC) {
|
||||
// write
|
||||
if (!mds->locker->inode_file_write_start(cur, req))
|
||||
return; // fw or (wait for) lock
|
||||
|
||||
// do update
|
||||
cur->inode.size = req->get_sizearg();
|
||||
cur->_mark_dirty(); // fixme
|
||||
|
||||
mds->locker->inode_file_write_finish(cur);
|
||||
}
|
||||
|
||||
|
||||
// hmm, check permissions or something.
|
||||
|
||||
|
@ -156,14 +156,23 @@ int MonitorStore::get_bl_ss(bufferlist& bl, const char *a, const char *b)
|
||||
return 0;
|
||||
}
|
||||
|
||||
// read size
|
||||
__int32_t len = 0;
|
||||
::read(fd, &len, sizeof(len));
|
||||
|
||||
// get size
|
||||
struct stat st;
|
||||
int rc = ::fstat(fd, &st);
|
||||
assert(rc == 0);
|
||||
__int32_t len = st.st_size;
|
||||
|
||||
// read buffer
|
||||
bl.clear();
|
||||
bufferptr bp(len);
|
||||
::read(fd, bp.c_str(), len);
|
||||
int off = 0;
|
||||
while (off < len) {
|
||||
dout(20) << "reading at off " << off << " of " << len << endl;
|
||||
int r = ::read(fd, bp.c_str()+off, len-off);
|
||||
if (r < 0) derr(0) << "errno on read " << strerror(errno) << endl;
|
||||
assert(r>0);
|
||||
off += r;
|
||||
}
|
||||
bl.append(bp);
|
||||
::close(fd);
|
||||
|
||||
@ -193,17 +202,20 @@ int MonitorStore::put_bl_ss(bufferlist& bl, const char *a, const char *b)
|
||||
int fd = ::open(tfn, O_WRONLY|O_CREAT);
|
||||
assert(fd);
|
||||
|
||||
// write size
|
||||
__int32_t len = bl.length();
|
||||
::write(fd, &len, sizeof(len));
|
||||
// chmod
|
||||
::fchmod(fd, 0644);
|
||||
|
||||
// write data
|
||||
for (list<bufferptr>::const_iterator it = bl.buffers().begin();
|
||||
it != bl.buffers().end();
|
||||
it++)
|
||||
::write(fd, it->c_str(), it->length());
|
||||
it++) {
|
||||
int r = ::write(fd, it->c_str(), it->length());
|
||||
if (r != (int)it->length())
|
||||
derr(0) << "put_bl_ss ::write() returned " << r << " not " << it->length() << endl;
|
||||
if (r < 0)
|
||||
derr(0) << "put_bl_ss ::write() errored out, errno is " << strerror(errno) << endl;
|
||||
}
|
||||
|
||||
::fchmod(fd, 0644);
|
||||
::fsync(fd);
|
||||
::close(fd);
|
||||
::rename(tfn, fn);
|
||||
|
@ -306,6 +306,9 @@ private:
|
||||
type = PG_TYPE_STARTOSD;
|
||||
}
|
||||
break;
|
||||
|
||||
default:
|
||||
assert(0);
|
||||
}
|
||||
|
||||
// construct final PG
|
||||
|
Loading…
Reference in New Issue
Block a user