From fb147bf2473ec0c84a9bcdf03d3a2e353899ffd2 Mon Sep 17 00:00:00 2001 From: sageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9> Date: Fri, 16 Mar 2007 19:17:48 +0000 Subject: [PATCH] merged trunk changes r1208:1255 into branches/sage/cephmds2 git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1256 29311d96-e01e-0410-9327-a35deaab8ce9 --- branches/sage/cephmds2/client/Client.cc | 90 ++++++----- branches/sage/cephmds2/client/FileCache.cc | 71 ++++++++- branches/sage/cephmds2/client/FileCache.h | 25 ++- .../sage/cephmds2/client/SyntheticClient.cc | 54 +++---- branches/sage/cephmds2/client/fuse.cc | 9 +- branches/sage/cephmds2/crush/BinaryTree.h | 13 ++ branches/sage/cephmds2/crush/Bucket.h | 13 ++ branches/sage/cephmds2/crush/Hash.h | 13 ++ branches/sage/cephmds2/crush/crush.h | 143 ++++++++++-------- branches/sage/cephmds2/csyn.cc | 55 ++++--- branches/sage/cephmds2/ebofs/BufferCache.cc | 93 +++++++++++- branches/sage/cephmds2/ebofs/BufferCache.h | 2 + branches/sage/cephmds2/ebofs/Ebofs.cc | 8 +- branches/sage/cephmds2/mds/Capability.h | 2 +- branches/sage/cephmds2/mds/Locker.cc | 5 +- branches/sage/cephmds2/mds/Server.cc | 13 ++ branches/sage/cephmds2/mon/MonitorStore.cc | 34 +++-- branches/sage/cephmds2/osd/OSDMap.h | 3 + 18 files changed, 453 insertions(+), 193 deletions(-) diff --git a/branches/sage/cephmds2/client/Client.cc b/branches/sage/cephmds2/client/Client.cc index 736a5960bb7..9885bab970e 100644 --- a/branches/sage/cephmds2/client/Client.cc +++ b/branches/sage/cephmds2/client/Client.cc @@ -970,6 +970,8 @@ void Client::handle_file_caps(MClientFileCaps *m) if (in->file_wr_mtime > in->inode.mtime) m->get_inode().mtime = in->inode.mtime = in->file_wr_mtime; + + if (g_conf.client_oc) { // caching on, use FileCache. Context *onimplement = 0; @@ -1886,7 +1888,7 @@ int Client::getdir(const char *relpath, map<string,inode_t>& contents) if (*pdn == ".") continue; - // count entries + // count entries res++; // put in cache @@ -2184,7 +2186,8 @@ int Client::open(const char *relpath, int flags) dout(7) << "open got caps " << cap_string(new_caps) << " for " << f->inode->ino() << " seq " << reply->get_file_caps_seq() - << " from mds" << mds << endl; + << " from mds" << mds + << endl; int old_caps = f->inode->caps[mds].caps; f->inode->caps[mds].caps = new_caps; @@ -2201,7 +2204,8 @@ int Client::open(const char *relpath, int flags) dout(7) << "open got SAME caps " << cap_string(new_caps) << " for " << f->inode->ino() << " seq " << reply->get_file_caps_seq() - << " from mds" << mds << endl; + << " from mds" << mds + << endl; } // put in map @@ -2367,7 +2371,6 @@ int Client::read(fh_t fh, char *buf, off_t size, off_t offset) tout << size << endl; tout << offset << endl; - assert(offset >= 0); assert(fh_map.count(fh)); Fh *f = fh_map[fh]; Inode *in = f->inode; @@ -2377,35 +2380,21 @@ int Client::read(fh_t fh, char *buf, off_t size, off_t offset) bool lazy = f->mode == FILE_MODE_LAZY; - // do we have read file cap? - while (!lazy && (in->file_caps() & CAP_FILE_RD) == 0) { - dout(7) << " don't have read cap, waiting" << endl; - Cond cond; - in->waitfor_read.push_back(&cond); - cond.Wait(client_lock); - } - // lazy cap? - while (lazy && (in->file_caps() & CAP_FILE_LAZYIO) == 0) { - dout(7) << " don't have lazy cap, waiting" << endl; - Cond cond; - in->waitfor_lazy.push_back(&cond); - cond.Wait(client_lock); - } - // determine whether read range overlaps with file // ...ONLY if we're doing async io if (!lazy && (in->file_caps() & (CAP_FILE_WRBUFFER|CAP_FILE_RDCACHE))) { // we're doing buffered i/o. make sure we're inside the file. // we can trust size info bc we get accurate info when buffering/caching caps are issued. - dout(10) << "file size: " << in->inode.size << endl; + dout(-10) << "file size: " << in->inode.size << endl; if (offset > 0 && offset >= in->inode.size) { client_lock.Unlock(); return 0; } - if (offset + size > (unsigned)in->inode.size) size = (unsigned)in->inode.size - offset; + if (offset + size > (off_t)in->inode.size) + size = (off_t)in->inode.size - offset; if (size == 0) { - dout(10) << "read is size=0, returning 0" << endl; + dout(-10) << "read is size=0, returning 0" << endl; client_lock.Unlock(); return 0; } @@ -2416,14 +2405,31 @@ int Client::read(fh_t fh, char *buf, off_t size, off_t offset) } bufferlist blist; // data will go here - int rvalue = 0; int r = 0; + int rvalue = 0; if (g_conf.client_oc) { // object cache ON rvalue = r = in->fc.read(offset, size, blist, client_lock); // may block. } else { // object cache OFF -- legacy inconsistent way. + + // do we have read file cap? + while (!lazy && (in->file_caps() & CAP_FILE_RD) == 0) { + dout(7) << " don't have read cap, waiting" << endl; + Cond cond; + in->waitfor_read.push_back(&cond); + cond.Wait(client_lock); + } + // lazy cap? + while (lazy && (in->file_caps() & CAP_FILE_LAZYIO) == 0) { + dout(7) << " don't have lazy cap, waiting" << endl; + Cond cond; + in->waitfor_lazy.push_back(&cond); + cond.Wait(client_lock); + } + + // do sync read Cond cond; bool done = false; C_Cond *onfinish = new C_Cond(&cond, &done, &rvalue); @@ -2490,7 +2496,6 @@ int Client::write(fh_t fh, const char *buf, off_t size, off_t offset) tout << size << endl; tout << offset << endl; - assert(offset >= 0); assert(fh_map.count(fh)); Fh *f = fh_map[fh]; Inode *in = f->inode; @@ -2502,23 +2507,6 @@ int Client::write(fh_t fh, const char *buf, off_t size, off_t offset) dout(10) << "cur file size is " << in->inode.size << " wr size " << in->file_wr_size << endl; - // do we have write file cap? - while (!lazy && (in->file_caps() & CAP_FILE_WR) == 0) { - dout(7) << " don't have write cap, waiting" << endl; - Cond cond; - in->waitfor_write.push_back(&cond); - cond.Wait(client_lock); - } - while (lazy && (in->file_caps() & CAP_FILE_LAZYIO) == 0) { - dout(7) << " don't have lazy cap, waiting" << endl; - Cond cond; - in->waitfor_lazy.push_back(&cond); - cond.Wait(client_lock); - } - - // adjust fd pos - f->pos = offset+size; - // time it. utime_t start = g_clock.now(); @@ -2532,11 +2520,28 @@ int Client::write(fh_t fh, const char *buf, off_t size, off_t offset) // write (this may block!) in->fc.write(offset, size, blist, client_lock); + + // adjust fd pos + f->pos = offset+size; } else { // legacy, inconsistent synchronous write. dout(7) << "synchronous write" << endl; + // do we have write file cap? + while (!lazy && (in->file_caps() & CAP_FILE_WR) == 0) { + dout(7) << " don't have write cap, waiting" << endl; + Cond cond; + in->waitfor_write.push_back(&cond); + cond.Wait(client_lock); + } + while (lazy && (in->file_caps() & CAP_FILE_LAZYIO) == 0) { + dout(7) << " don't have lazy cap, waiting" << endl; + Cond cond; + in->waitfor_lazy.push_back(&cond); + cond.Wait(client_lock); + } + // prepare write Cond cond; bool done = false; @@ -2552,6 +2557,9 @@ int Client::write(fh_t fh, const char *buf, off_t size, off_t offset) //, 1+((int)g_clock.now()) / 10 //f->pos // hack hack test osd revision snapshots ); + // adjust fd pos + f->pos = offset+size; + while (!done) { cond.Wait(client_lock); dout(20) << " sync write bump " << onfinish << endl; diff --git a/branches/sage/cephmds2/client/FileCache.cc b/branches/sage/cephmds2/client/FileCache.cc index 2a1dd1576ae..5cc6d9ff796 100644 --- a/branches/sage/cephmds2/client/FileCache.cc +++ b/branches/sage/cephmds2/client/FileCache.cc @@ -1,3 +1,15 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ #include "config.h" #include "include/types.h" @@ -8,8 +20,8 @@ #include "msg/Messenger.h" #undef dout -#define dout(x) if (x <= g_conf.debug_client) cout << g_clock.now() << " " << oc->objecter->messenger->get_myaddr() << ".filecache " -#define derr(x) if (x <= g_conf.debug_client) cout << g_clock.now() << " " << oc->objecter->messenger->get_myaddr() << ".filecache " +#define dout(x) if (x <= g_conf.debug_client) cout << g_clock.now() << " " << oc->objecter->messenger->get_myname() << ".filecache " +#define derr(x) if (x <= g_conf.debug_client) cout << g_clock.now() << " " << oc->objecter->messenger->get_myname() << ".filecache " // flush/release/clean @@ -54,27 +66,51 @@ void FileCache::tear_down() { off_t unclean = release_clean(); if (unclean) { - dout(0) << "tear_down " << unclean << " unclean bytes, purging" << endl; - oc->purge_set(inode.ino); + dout(0) << "tear_down " << unclean << " unclean bytes, purging" << endl; + oc->purge_set(inode.ino); } } // caps +class C_FC_CheckCaps : public Context { + FileCache *fc; +public: + C_FC_CheckCaps(FileCache *f) : fc(f) {} + void finish(int r) { + fc->check_caps(); + } +}; + void FileCache::set_caps(int caps, Context *onimplement) { if (onimplement) { + dout(10) << "set_caps setting onimplement context for " << cap_string(caps) << endl; assert(latest_caps & ~caps); // we should be losing caps. caps_callbacks[caps].push_back(onimplement); } latest_caps = caps; check_caps(); + + // kick waiters? (did we gain caps?) + if (can_read() && !waitfor_read.empty()) + for (set<Cond*>::iterator p = waitfor_read.begin(); + p != waitfor_read.end(); + ++p) + (*p)->Signal(); + if (can_write() && !waitfor_write.empty()) + for (set<Cond*>::iterator p = waitfor_write.begin(); + p != waitfor_write.end(); + ++p) + (*p)->Signal(); + } void FileCache::check_caps() { + // calc used int used = 0; if (num_reading) used |= CAP_FILE_RD; if (oc->set_is_cached(inode.ino)) used |= CAP_FILE_RDCACHE; @@ -82,6 +118,15 @@ void FileCache::check_caps() if (oc->set_is_dirty_or_committing(inode.ino)) used |= CAP_FILE_WRBUFFER; dout(10) << "check_caps used " << cap_string(used) << endl; + // try to implement caps? + // BUG? latest_caps, not least caps i've seen? + if ((latest_caps & CAP_FILE_RDCACHE) == 0 && + (used & CAP_FILE_RDCACHE)) + release_clean(); + if ((latest_caps & CAP_FILE_WRBUFFER) == 0 && + (used & CAP_FILE_WRBUFFER)) + flush_dirty(new C_FC_CheckCaps(this)); + // check callbacks map<int, list<Context*> >::iterator p = caps_callbacks.begin(); while (p != caps_callbacks.end()) { @@ -109,6 +154,15 @@ int FileCache::read(off_t offset, size_t size, bufferlist& blist, Mutex& client_ { int r = 0; + // can i read? + while ((latest_caps & CAP_FILE_RD) == 0) { + dout(10) << "read doesn't have RD cap, blocking" << endl; + Cond c; + waitfor_read.insert(&c); + c.Wait(client_lock); + waitfor_read.erase(&c); + } + // inc reading counter num_reading++; @@ -145,6 +199,15 @@ int FileCache::read(off_t offset, size_t size, bufferlist& blist, Mutex& client_ void FileCache::write(off_t offset, size_t size, bufferlist& blist, Mutex& client_lock) { + // can i write + while ((latest_caps & CAP_FILE_WR) == 0) { + dout(10) << "write doesn't have WR cap, blocking" << endl; + Cond c; + waitfor_write.insert(&c); + c.Wait(client_lock); + waitfor_write.erase(&c); + } + // inc writing counter num_writing++; diff --git a/branches/sage/cephmds2/client/FileCache.h b/branches/sage/cephmds2/client/FileCache.h index 6bef22f4e0c..d710d38c073 100644 --- a/branches/sage/cephmds2/client/FileCache.h +++ b/branches/sage/cephmds2/client/FileCache.h @@ -1,3 +1,16 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + #ifndef __FILECACHE_H #define __FILECACHE_H @@ -22,9 +35,9 @@ class FileCache { //int num_unsafe; // waiters - list<Cond*> waitfor_read; - list<Cond*> waitfor_write; - //list<Context*> waitfor_safe; + set<Cond*> waitfor_read; + set<Cond*> waitfor_write; + bool waitfor_release; public: @@ -35,7 +48,7 @@ class FileCache { num_reading(0), num_writing(0),// num_unsafe(0), waitfor_release(false) {} ~FileCache() { - tear_down(); + tear_down(); } // waiters/waiting @@ -43,9 +56,7 @@ class FileCache { bool can_write() { return latest_caps & CAP_FILE_WR; } bool all_safe();// { return num_unsafe == 0; } - void add_read_waiter(Cond *c) { waitfor_read.push_back(c); } - void add_write_waiter(Cond *c) { waitfor_write.push_back(c); } - void add_safe_waiter(Context *c);// { waitfor_safe.push_back(c); } + void add_safe_waiter(Context *c); // ... void flush_dirty(Context *onflush=0); diff --git a/branches/sage/cephmds2/client/SyntheticClient.cc b/branches/sage/cephmds2/client/SyntheticClient.cc index 4da2e572d9c..101451e8b26 100644 --- a/branches/sage/cephmds2/client/SyntheticClient.cc +++ b/branches/sage/cephmds2/client/SyntheticClient.cc @@ -737,6 +737,8 @@ int SyntheticClient::clean_dir(string& basedir) for (map<string, inode_t>::iterator it = contents.begin(); it != contents.end(); it++) { + if (it->first == ".") continue; + if (it->first == "..") continue; string file = basedir + "/" + it->first; if (time_to_stop()) break; @@ -1003,17 +1005,16 @@ int SyntheticClient::write_file(string& fn, int size, int wrsize) // size is i } dout(2) << "writing block " << i << "/" << chunks << endl; - // fill buf with a fingerprint - int *p = (int*)buf; + // fill buf with a 16 byte fingerprint + // 64 bits : file offset + // 64 bits : client id + // = 128 bits (16 bytes) + __uint64_t *p = (__uint64_t*)buf; while ((char*)p < buf + wrsize) { - *p = (char*)p - buf; - p++; - *p = i; + *p = i*wrsize + (char*)p - buf; p++; *p = client->get_nodeid(); p++; - *p = 0; - p++; } client->write(fd, buf, wrsize, i*wrsize); @@ -1048,42 +1049,33 @@ int SyntheticClient::read_file(string& fn, int size, int rdsize) // size is in for (unsigned i=0; i<chunks; i++) { if (time_to_stop()) break; dout(2) << "reading block " << i << "/" << chunks << endl; - client->read(fd, buf, rdsize, i*rdsize); + int r = client->read(fd, buf, rdsize, i*rdsize); + if (r < rdsize) { + dout(1) << "read_file got r = " << r << ", probably end of file" << endl; + break; + } // verify fingerprint - int *p = (int*)buf; int bad = 0; - int boff, bgoff, bchunk, bclient, bzero; + __int64_t *p = (__int64_t*)buf; + __int64_t readoff, readclient; while ((char*)p + 32 < buf + rdsize) { - boff = *p; - bgoff = (int)((char*)p - buf); + readoff = *p; + __int64_t wantoff = i*rdsize + (__int64_t)((char*)p - buf); p++; - bchunk = *p; + readclient = *p; p++; - bclient = *p; - p++; - bzero = *p; - p++; - if (boff != bgoff || - bchunk != (int)i || - bclient != client->get_nodeid() || - bzero != 0) { + if (readoff != wantoff || + readclient != client->get_nodeid()) { if (!bad) - dout(0) << "WARNING: wrong data from OSD, it should be " - << "(block=" << i - << " offset=" << bgoff - << " client=" << client->get_nodeid() << ")" - << " .. but i read back .. " - << "(block=" << bchunk - << " offset=" << boff - << " client=" << bclient << " zero=" << bzero << ")" << endl; - + dout(0) << "WARNING: wrong data from OSD, block says fileoffset=" << readoff << " client=" << readclient + << ", should be offset " << wantoff << " clietn " << client->get_nodeid() + << endl; bad++; } } if (bad) dout(0) << " + " << (bad-1) << " other bad 16-byte bits in this block" << endl; - } client->close(fd); diff --git a/branches/sage/cephmds2/client/fuse.cc b/branches/sage/cephmds2/client/fuse.cc index f4a1c2d3f77..2feb7472d1c 100644 --- a/branches/sage/cephmds2/client/fuse.cc +++ b/branches/sage/cephmds2/client/fuse.cc @@ -177,13 +177,12 @@ static int ceph_write(const char *path, const char *buf, size_t size, return client->write(fh, buf, size, offset); } -/* static int ceph_flush(const char *path, struct fuse_file_info *fi) { - fh_t fh = fi->fh; - return client->flush(fh); +//fh_t fh = fi->fh; + //return client->flush(fh); + return 0; } -*/ static int ceph_statfs(const char *path, struct statvfs *stbuf) @@ -227,7 +226,7 @@ static struct fuse_operations ceph_oper = { read: ceph_read, write: ceph_write, statfs: ceph_statfs, - flush: 0, //ceph_flush, + flush: ceph_flush, release: ceph_release, fsync: ceph_fsync }; diff --git a/branches/sage/cephmds2/crush/BinaryTree.h b/branches/sage/cephmds2/crush/BinaryTree.h index 4f8524bf4dd..f13f3f1e565 100644 --- a/branches/sage/cephmds2/crush/BinaryTree.h +++ b/branches/sage/cephmds2/crush/BinaryTree.h @@ -1,3 +1,16 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + #ifndef __crush_BINARYTREE_H #define __crush_BINARYTREE_H diff --git a/branches/sage/cephmds2/crush/Bucket.h b/branches/sage/cephmds2/crush/Bucket.h index cdae5bfce8a..5b2d3259e09 100644 --- a/branches/sage/cephmds2/crush/Bucket.h +++ b/branches/sage/cephmds2/crush/Bucket.h @@ -1,3 +1,16 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + #ifndef __crush_BUCKET_H #define __crush_BUCKET_H diff --git a/branches/sage/cephmds2/crush/Hash.h b/branches/sage/cephmds2/crush/Hash.h index cd3bb0a02cd..a321624925d 100644 --- a/branches/sage/cephmds2/crush/Hash.h +++ b/branches/sage/cephmds2/crush/Hash.h @@ -1,3 +1,16 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + // Robert Jenkins' function for mixing 32-bit values // http://burtleburtle.net/bob/hash/evahash.html diff --git a/branches/sage/cephmds2/crush/crush.h b/branches/sage/cephmds2/crush/crush.h index b1e245f1b6a..aa93031beb5 100644 --- a/branches/sage/cephmds2/crush/crush.h +++ b/branches/sage/cephmds2/crush/crush.h @@ -1,3 +1,16 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + #ifndef __crush_CRUSH_H #define __crush_CRUSH_H @@ -93,7 +106,7 @@ namespace crush { int bucketno; Hash h; - hash_map<int, int> parent_map; // what bucket each leaf/bucket lives in + hash_map<int, int> parent_map; // what bucket each leaf/bucket lives in public: map<int, Rule> rules; @@ -163,28 +176,28 @@ namespace crush { off += sizeof(r); rules[r]._decode(bl,off); } - - // index - build_parent_map(); + + // index + build_parent_map(); } - void build_parent_map() { - parent_map.clear(); - - // index every bucket - for (map<int, Bucket*>::iterator bp = buckets.begin(); - bp != buckets.end(); - ++bp) { - // index bucket items - vector<int> items; - bp->second->get_items(items); - for (vector<int>::iterator ip = items.begin(); - ip != items.end(); - ++ip) - parent_map[*ip] = bp->first; - } - } - + void build_parent_map() { + parent_map.clear(); + + // index every bucket + for (map<int, Bucket*>::iterator bp = buckets.begin(); + bp != buckets.end(); + ++bp) { + // index bucket items + vector<int> items; + bp->second->get_items(items); + for (vector<int>::iterator ip = items.begin(); + ip != items.end(); + ++ip) + parent_map[*ip] = bp->first; + } + } + public: @@ -280,25 +293,25 @@ namespace crush { vector<int>& outvec, bool firstn, set<int>& outset, map<int,float>& overloadmap, - bool forcefeed=false, - int forcefeedval=-1) { + bool forcefeed=false, + int forcefeedval=-1) { int off = outvec.size(); // for each replica for (int rep=0; rep<numrep; rep++) { int outv = -1; // my result - // forcefeed? - if (forcefeed) { - forcefeed = false; - outvec.push_back(forcefeedval); - continue; - } - + // forcefeed? + if (forcefeed) { + forcefeed = false; + outvec.push_back(forcefeedval); + continue; + } + // keep trying until we get a non-out, non-colliding item int ftotal = 0; bool skip_rep = false; - + while (1) { // start with the input bucket Bucket *in = inbucket; @@ -415,21 +428,21 @@ namespace crush { //int numresult = 0; result.clear(); - // determine hierarchical context for first. - list<int> force_stack; - if (forcefeed >= 0) { - int t = forcefeed; - while (1) { - force_stack.push_front(t); - if (parent_map.count(t) == 0) break; // reached root, presumably. - //cout << " " << t << " parent is " << parent_map[t] << endl; - t = parent_map[t]; - } - } - + // determine hierarchical context for first. + list<int> force_stack; + if (forcefeed >= 0) { + int t = forcefeed; + while (1) { + force_stack.push_front(t); + if (parent_map.count(t) == 0) break; // reached root, presumably. + //cout << " " << t << " parent is " << parent_map[t] << endl; + t = parent_map[t]; + } + } + // working vector vector<int> w; // working variable - + // go through each statement for (vector<RuleStep>::iterator pc = rule.steps.begin(); pc != rule.steps.end(); @@ -442,13 +455,13 @@ namespace crush { { const int arg = pc->args[0]; //cout << "take " << arg << endl; - - if (!force_stack.empty()) { - int forceval = force_stack.front(); - force_stack.pop_front(); - assert(arg == forceval); - } - + + if (!force_stack.empty()) { + int forceval = force_stack.front(); + force_stack.pop_front(); + assert(arg == forceval); + } + w.clear(); w.push_back(arg); } @@ -469,26 +482,26 @@ namespace crush { vector<int> out; // forcefeeding? - bool forcing = false; - int forceval; - if (!force_stack.empty()) { - forceval = force_stack.front(); - force_stack.pop_front(); - //cout << "priming out with " << forceval << endl; - forcing = true; - } - + bool forcing = false; + int forceval; + if (!force_stack.empty()) { + forceval = force_stack.front(); + force_stack.pop_front(); + //cout << "priming out with " << forceval << endl; + forcing = true; + } + // do each row independently for (vector<int>::iterator i = w.begin(); i != w.end(); i++) { assert(buckets.count(*i)); Bucket *b = buckets[*i]; - choose(x, numrep, type, b, out, firstn, - outset, overloadmap, - forcing, - forceval); - forcing = false; // only once + choose(x, numrep, type, b, out, firstn, + outset, overloadmap, + forcing, + forceval); + forcing = false; // only once } // for inrow // put back into w diff --git a/branches/sage/cephmds2/csyn.cc b/branches/sage/cephmds2/csyn.cc index b5e4892cb9a..0f95ee56b26 100644 --- a/branches/sage/cephmds2/csyn.cc +++ b/branches/sage/cephmds2/csyn.cc @@ -53,31 +53,44 @@ int main(int argc, char **argv, char *envp[]) { // start up network rank.start_rank(); - // start client - Client *client = new Client(rank.register_entity(MSG_ADDR_CLIENT_NEW), &monmap); - client->init(); + list<Client*> clients; + list<SyntheticClient*> synclients; + + cout << "mounting and starting " << g_conf.num_client << " syn client(s)" << endl; + for (int i=0; i<g_conf.num_client; i++) { + // start client + Client *client = new Client(rank.register_entity(MSG_ADDR_CLIENT_NEW), &monmap); + client->init(); - // start syntheticclient - SyntheticClient *syn = new SyntheticClient(client); + // start syntheticclient + SyntheticClient *syn = new SyntheticClient(client); - // start up fuse - // use my argc, argv (make sure you pass a mount point!) - cout << "mounting" << endl; - client->mount(); - - cout << "starting syn client" << endl; - syn->start_thread(); + client->mount(); + + syn->start_thread(); - // wait - syn->join_thread(); + clients.push_back(client); + synclients.push_back(syn); + } - // unmount - client->unmount(); - cout << "unmounted" << endl; - client->shutdown(); - - delete client; - + cout << "waiting for client(s) to finish" << endl; + while (!clients.empty()) { + Client *client = clients.front(); + SyntheticClient *syn = synclients.front(); + clients.pop_front(); + synclients.pop_front(); + + // wait + syn->join_thread(); + + // unmount + client->unmount(); + client->shutdown(); + + delete syn; + delete client; + } + // wait for messenger to finish rank.wait(); diff --git a/branches/sage/cephmds2/ebofs/BufferCache.cc b/branches/sage/cephmds2/ebofs/BufferCache.cc index fa48c08b18a..4ad22b3a5d0 100644 --- a/branches/sage/cephmds2/ebofs/BufferCache.cc +++ b/branches/sage/cephmds2/ebofs/BufferCache.cc @@ -213,6 +213,91 @@ int ObjectCache::find_tx(block_t start, block_t len, } +int ObjectCache::try_map_read(block_t start, block_t len) +{ + map<block_t, BufferHead*>::iterator p = data.lower_bound(start); + + block_t cur = start; + block_t left = len; + + if (p != data.begin() && + (p == data.end() || p->first > cur)) { + p--; // might overlap! + if (p->first + p->second->length() <= cur) + p++; // doesn't overlap. + } + + int num_missing = 0; + + while (left > 0) { + // at end? + if (p == data.end()) { + // rest is a miss. + vector<Extent> exv; + on->map_extents(cur, + left, // no prefetch here! + exv); + + num_missing += exv.size(); + left = 0; + cur = start+len; + break; + } + + if (p->first <= cur) { + // have it (or part of it) + BufferHead *e = p->second; + + if (e->is_clean() || + e->is_dirty() || + e->is_tx()) { + dout(20) << "try_map_read hit " << *e << endl; + } + else if (e->is_rx()) { + dout(20) << "try_map_read rx " << *e << endl; + num_missing++; + } + else if (e->is_partial()) { + dout(-20) << "try_map_read partial " << *e << endl; + num_missing++; + } + else { + dout(0) << "try_map_read got unexpected " << *e << endl; + assert(0); + } + + block_t lenfromcur = MIN(e->end() - cur, left); + cur += lenfromcur; + left -= lenfromcur; + p++; + continue; // more? + } else if (p->first > cur) { + // gap.. miss + block_t next = p->first; + vector<Extent> exv; + on->map_extents(cur, + MIN(next-cur, left), // no prefetch + exv); + + dout(-20) << "try_map_read gap of " << p->first-cur << " blocks, " + << exv.size() << " extents" << endl; + num_missing += exv.size(); + left -= (p->first - cur); + cur = p->first; + continue; // more? + } + else + assert(0); + } + + assert(left == 0); + assert(cur == start+len); + return num_missing; +} + + + + /* * map a range of blocks into buffer_heads. @@ -283,7 +368,7 @@ int ObjectCache::map_read(block_t start, block_t len, dout(20) << "map_read partial " << *e << endl; } else { - dout(0) << "map_read ??? " << *e << endl; + dout(0) << "map_read ??? got unexpected " << *e << endl; assert(0); } @@ -725,7 +810,7 @@ void BufferCache::bh_read(Onode *on, BufferHead *bh, block_t from) { dout(10) << "bh_read " << *on << " on " << *bh << endl; - if (bh->is_missing()) { + if (bh->is_missing()) { mark_rx(bh); } else { assert(bh->is_partial()); @@ -746,7 +831,7 @@ void BufferCache::bh_read(Onode *on, BufferHead *bh, block_t from) // this should be empty!! assert(bh->rx_ioh == 0); - dout(20) << "bh_read " << *bh << " from " << ex << endl; + dout(20) << "bh_read " << *on << " " << *bh << " from " << ex << endl; C_OC_RxFinish *fin = new C_OC_RxFinish(ebofs_lock, on->oc, bh->start(), bh->length(), @@ -792,7 +877,7 @@ void BufferCache::bh_write(Onode *on, BufferHead *bh, block_t shouldbe) if (shouldbe) assert(ex.length == 1 && ex.start == shouldbe); - dout(20) << "bh_write " << *bh << " to " << ex << endl; + dout(20) << "bh_write " << *on << " " << *bh << " to " << ex << endl; //assert(bh->tx_ioh == 0); diff --git a/branches/sage/cephmds2/ebofs/BufferCache.h b/branches/sage/cephmds2/ebofs/BufferCache.h index 84680973510..563b3e5791c 100644 --- a/branches/sage/cephmds2/ebofs/BufferCache.h +++ b/branches/sage/cephmds2/ebofs/BufferCache.h @@ -403,6 +403,8 @@ class ObjectCache { map<block_t, BufferHead*>& missing, // read these from disk map<block_t, BufferHead*>& rx, // wait for these to finish reading from disk map<block_t, BufferHead*>& partial); // (maybe) wait for these to read from disk + int try_map_read(block_t start, block_t len); // just tell us how many extents we're missing. + int map_write(block_t start, block_t len, interval_set<block_t>& alloc, diff --git a/branches/sage/cephmds2/ebofs/Ebofs.cc b/branches/sage/cephmds2/ebofs/Ebofs.cc index 468a18178bb..2008d1961bf 100644 --- a/branches/sage/cephmds2/ebofs/Ebofs.cc +++ b/branches/sage/cephmds2/ebofs/Ebofs.cc @@ -30,6 +30,7 @@ #define dout(x) if (x <= g_conf.debug_ebofs) cout << "ebofs(" << dev.get_device_name() << ")." #define derr(x) if (x <= g_conf.debug_ebofs) cerr << "ebofs(" << dev.get_device_name() << ")." + char *nice_blocks(block_t b) { static char s[20]; @@ -1848,6 +1849,7 @@ int Ebofs::_is_cached(object_t oid, off_t off, size_t len) if (!on->have_oc()) { // nothing is cached. return # of extents in file. + dout(10) << "_is_cached have onode but no object cache, returning extent count" << endl; return on->extent_map.size(); } @@ -1860,8 +1862,10 @@ int Ebofs::_is_cached(object_t oid, off_t off, size_t len) map<block_t, BufferHead*> missing; // read these map<block_t, BufferHead*> rx; // wait for these map<block_t, BufferHead*> partials; // ?? - on->get_oc(&bc)->map_read(bstart, blen, hits, missing, rx, partials); - return missing.size() + rx.size() + partials.size(); + + int num_missing = on->get_oc(&bc)->try_map_read(bstart, blen); + dout(7) << "_is_cached try_map_read reports " << num_missing << " missing extents" << endl; + return num_missing; // FIXME: actually, we should calculate if these extents are contiguous. // and not using map_read, probably... diff --git a/branches/sage/cephmds2/mds/Capability.h b/branches/sage/cephmds2/mds/Capability.h index e011dbe43e8..25e658b0131 100644 --- a/branches/sage/cephmds2/mds/Capability.h +++ b/branches/sage/cephmds2/mds/Capability.h @@ -71,7 +71,7 @@ public: bool is_suppress() { return suppress; } void set_suppress(bool b) { suppress = b; } - bool is_null() { return cap_history.empty(); } + bool is_null() { return cap_history.empty() && wanted_caps == 0; } // most recently issued caps. int pending() { diff --git a/branches/sage/cephmds2/mds/Locker.cc b/branches/sage/cephmds2/mds/Locker.cc index 67441cc3cc4..7089176f16e 100644 --- a/branches/sage/cephmds2/mds/Locker.cc +++ b/branches/sage/cephmds2/mds/Locker.cc @@ -939,6 +939,8 @@ void Locker::inode_file_read_finish(CInode *in) bool Locker::inode_file_write_start(CInode *in, MClientRequest *m) { + dout(7) << "inode_file_write_start on " << *in << endl; + // can't write? if (!in->filelock.can_write(in->is_auth())) { @@ -992,7 +994,7 @@ bool Locker::inode_file_write_start(CInode *in, MClientRequest *m) void Locker::inode_file_write_finish(CInode *in) { // drop ref - assert(in->filelock.can_write(in->is_auth())); + //assert(in->filelock.can_write(in->is_auth())); in->filelock.put_write(); dout(7) << "inode_file_write_finish on " << *in << ", filelock=" << in->filelock << endl; @@ -1866,6 +1868,7 @@ void Locker::dentry_xlock_finish(CDentry *dn, bool quiet) mds->queue_finished(finished); } + /* * onfinish->finish() will be called with * 0 on successful xlock, diff --git a/branches/sage/cephmds2/mds/Server.cc b/branches/sage/cephmds2/mds/Server.cc index 04868664ac6..084b02d4384 100644 --- a/branches/sage/cephmds2/mds/Server.cc +++ b/branches/sage/cephmds2/mds/Server.cc @@ -2362,6 +2362,19 @@ void Server::handle_client_open(MClientRequest *req, return; } + // O_TRUNC + if (flags & O_TRUNC) { + // write + if (!mds->locker->inode_file_write_start(cur, req)) + return; // fw or (wait for) lock + + // do update + cur->inode.size = req->get_sizearg(); + cur->_mark_dirty(); // fixme + + mds->locker->inode_file_write_finish(cur); + } + // hmm, check permissions or something. diff --git a/branches/sage/cephmds2/mon/MonitorStore.cc b/branches/sage/cephmds2/mon/MonitorStore.cc index 55389973c8c..f5a10696c7a 100644 --- a/branches/sage/cephmds2/mon/MonitorStore.cc +++ b/branches/sage/cephmds2/mon/MonitorStore.cc @@ -156,14 +156,23 @@ int MonitorStore::get_bl_ss(bufferlist& bl, const char *a, const char *b) return 0; } - // read size - __int32_t len = 0; - ::read(fd, &len, sizeof(len)); - + // get size + struct stat st; + int rc = ::fstat(fd, &st); + assert(rc == 0); + __int32_t len = st.st_size; + // read buffer bl.clear(); bufferptr bp(len); - ::read(fd, bp.c_str(), len); + int off = 0; + while (off < len) { + dout(20) << "reading at off " << off << " of " << len << endl; + int r = ::read(fd, bp.c_str()+off, len-off); + if (r < 0) derr(0) << "errno on read " << strerror(errno) << endl; + assert(r>0); + off += r; + } bl.append(bp); ::close(fd); @@ -193,17 +202,20 @@ int MonitorStore::put_bl_ss(bufferlist& bl, const char *a, const char *b) int fd = ::open(tfn, O_WRONLY|O_CREAT); assert(fd); - // write size - __int32_t len = bl.length(); - ::write(fd, &len, sizeof(len)); + // chmod + ::fchmod(fd, 0644); // write data for (list<bufferptr>::const_iterator it = bl.buffers().begin(); it != bl.buffers().end(); - it++) - ::write(fd, it->c_str(), it->length()); + it++) { + int r = ::write(fd, it->c_str(), it->length()); + if (r != (int)it->length()) + derr(0) << "put_bl_ss ::write() returned " << r << " not " << it->length() << endl; + if (r < 0) + derr(0) << "put_bl_ss ::write() errored out, errno is " << strerror(errno) << endl; + } - ::fchmod(fd, 0644); ::fsync(fd); ::close(fd); ::rename(tfn, fn); diff --git a/branches/sage/cephmds2/osd/OSDMap.h b/branches/sage/cephmds2/osd/OSDMap.h index 48e080eeecc..163c14e65ed 100644 --- a/branches/sage/cephmds2/osd/OSDMap.h +++ b/branches/sage/cephmds2/osd/OSDMap.h @@ -306,6 +306,9 @@ private: type = PG_TYPE_STARTOSD; } break; + + default: + assert(0); } // construct final PG