*** empty log message ***

git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@52 29311d96-e01e-0410-9327-a35deaab8ce9
This commit is contained in:
sage 2004-07-29 20:50:05 +00:00
parent a539bcfafc
commit cdb047631d
19 changed files with 431 additions and 146 deletions

View File

@ -1,3 +1,4 @@
// random crap
#define NUMMDS 30
#define NUMOSD 10
@ -12,10 +13,10 @@
#define MAX_TRIMMING 16 // max events to be retiring simultaneously
#define LOGSTREAM_READ_INC 4096 // make this bigger than biggest event
//#define FAKE_CLOCK
#define FAKE_CLOCK
#define NUMCLIENT 100
#define CLIENT_REQUESTS 1000
#define NUMCLIENT 1000
#define CLIENT_REQUESTS 100
#define DEBUG_LEVEL 10

View File

@ -31,7 +31,7 @@ typedef __uint64_t inodeno_t; // ino
typedef __uint64_t mdloc_t; // dir locator?
struct inode_t {
inodeno_t ino;
inodeno_t ino; // NOTE: this must come first
__uint32_t touched;
__uint64_t size;

View File

@ -80,6 +80,53 @@ int CDir::dentry_authority(string& dn, MDCluster *mdc)
}
// state
crope CDir::encode_basic_state()
{
crope r;
// dir rep
r.append((char*)&dir_rep, sizeof(int));
// dir_rep_by
int n = dir_rep_by.size();
r.append((char*)&n, sizeof(int));
for (set<int>::iterator it = dir_rep_by.begin();
it != dir_rep_by.end();
it++) {
int j = *it;
r.append((char*)&j, sizeof(j));
}
return r;
}
int CDir::decode_basic_state(crope r, int off)
{
// dir_rep
r.copy(off, sizeof(int), (char*)&dir_rep);
off += sizeof(int);
// dir_rep_by
int n;
r.copy(off, sizeof(int), (char*)&n);
off += sizeof(int);
for (int i=0; i<n; i++) {
int j;
r.copy(off, sizeof(int), (char*)&j);
dir_rep_by.insert(j);
off += sizeof(int);
}
return off;
}
// wiating
void CDir::add_waiter(string& dentry,
@ -138,7 +185,7 @@ void CDir::take_waiting(list<Context*>& ls)
void CDir::add_hard_pin_waiter(Context *c) {
if (state & CDIR_MASK_FROZEN)
if (state_test(CDIR_STATE_FROZEN))
add_waiter(c);
else
inode->parent->dir->add_hard_pin_waiter(c);
@ -188,7 +235,7 @@ bool CDir::is_frozen()
bool CDir::is_freezing()
{
if (state & CDIR_MASK_FREEZING)
if (state_test(CDIR_STATE_FREEZING))
return true;
if (inode->parent)
return inode->parent->dir->is_freezing();
@ -207,12 +254,12 @@ void CDir::add_freeze_waiter(Context *c)
void CDir::freeze(Context *c)
{
assert((state & (CDIR_MASK_FROZEN|CDIR_MASK_FREEZING)) == 0);
assert((state_test(CDIR_STATE_FROZEN|CDIR_STATE_FREEZING)) == 0);
if (hard_pinned + nested_hard_pinned == 0) {
cout << "freeze " << *inode << endl;
state_set(CDIR_MASK_FROZEN);
state_set(CDIR_STATE_FROZEN);
inode->hard_pin(); // hard_pin for duration of freeze
// easy, we're frozen
@ -220,7 +267,7 @@ void CDir::freeze(Context *c)
delete c;
} else {
state_set(CDIR_MASK_FREEZING);
state_set(CDIR_STATE_FREEZING);
cout << "freeze + wait " << *inode << endl;
// need to wait for pins to expire
waiting_to_freeze.push_back(c);
@ -236,8 +283,8 @@ void CDir::freeze_finish()
Context *c = waiting_to_freeze.front();
waiting_to_freeze.pop_front();
if (waiting_to_freeze.empty())
state_clear(CDIR_MASK_FREEZING);
state_set(CDIR_MASK_FROZEN);
state_clear(CDIR_STATE_FREEZING);
state_set(CDIR_STATE_FROZEN);
if (c) {
c->finish(0);
@ -248,7 +295,7 @@ void CDir::freeze_finish()
void CDir::unfreeze() // thaw?
{
cout << "unfreeze " << *inode << endl;
state_clear(CDIR_MASK_FROZEN);
state_clear(CDIR_STATE_FROZEN);
inode->hard_unpin();
list<Context*> finished;
@ -280,9 +327,9 @@ void CDir::dump(int depth) {
iter++;
}
if (!(state & CDIR_MASK_COMPLETE))
if (!(state_test(CDIR_STATE_COMPLETE)))
cout << ind << "..." << endl;
if (state & CDIR_MASK_DIRTY)
if (state_test(CDIR_STATE_DIRTY))
cout << ind << "[dirty]" << endl;
}

View File

@ -6,15 +6,15 @@
#include "include/DecayCounter.h"
#include <map>
#include <ext/hash_map>
#include <string>
#include <iostream>
#include <cassert>
#include <ext/rope>
#include <list>
#include <set>
#include <map>
#include <ext/hash_map>
#include <string>
using namespace std;
class CInode;
@ -24,13 +24,19 @@ class MDCluster;
class Context;
// state bits
#define CDIR_MASK_COMPLETE 1 // the complete contents are in cache
#define CDIR_MASK_COMPLETE_LOCK 2 // complete contents are in cache, and locked that way! (not yet implemented)
#define CDIR_MASK_DIRTY 4 // has been modified since last commit
#define CDIR_MASK_MID_COMMIT 8 // mid-commit
#define CDIR_STATE_COMPLETE 1 // the complete contents are in cache
#define CDIR_STATE_COMPLETE_LOCK 2 // complete contents are in cache, and locked that way! (not yet implemented)
#define CDIR_STATE_DIRTY 4 // has been modified since last commit
#define CDIR_STATE_MID_COMMIT 8 // mid-commit
#define CDIR_MASK_FROZEN 16 // root of a freeze
#define CDIR_MASK_FREEZING 32 // in process of freezing
#define CDIR_STATE_FROZEN 16 // root of a freeze
#define CDIR_STATE_FREEZING 32 // in process of freezing
#define CDIR_STATE_FETCHING 64 // currenting fetching
// these state bits are preserved by an import/export
#define CDIR_MASK_STATE_EXPORTED (CDIR_STATE_COMPLETE\
|CDIR_STATE_DIRTY)
#define CDIR_MASK_STATE_EXPORT_KEPT 0
// common states
#define CDIR_STATE_CLEAN 0
@ -108,13 +114,38 @@ class CDir {
void reset_state(unsigned s) { state = s; }
void state_clear(unsigned mask) { state &= ~mask; }
void state_set(unsigned mask) { state |= mask; }
unsigned state_test(unsigned mask) { state & mask; }
bool is_complete() { return state & CDIR_MASK_COMPLETE; }
bool is_freeze_root() { return state & CDIR_MASK_FROZEN; }
bool is_complete() { return state & CDIR_STATE_COMPLETE; }
bool is_freeze_root() { return state & CDIR_STATE_FROZEN; }
// dirtyness
// invariant: if clean, my version >= all inode versions
__uint64_t get_version() {
return version;
}
//void touch_version() { version++; }
void float_version(__uint64_t ge) {
if (version < ge)
version = ge;
}
void mark_dirty() {
if (!state_test(CDIR_STATE_DIRTY)) {
version++;
state_set(CDIR_STATE_DIRTY);
}
}
void mark_clean() {
state_clear(CDIR_STATE_DIRTY);
}
bool is_clean() {
return !state_test(CDIR_STATE_DIRTY);
}
void hit();
crope encode_basic_state();
int decode_basic_state(crope r, int off=0);
// waiters
@ -141,13 +172,6 @@ class CDir {
// version
__uint64_t get_version() {
return version;
}
void touch_version() {
version++;
}
CInode *get_inode() { return inode; }

View File

@ -40,7 +40,7 @@ CInode::CInode() : LRUObject() {
nested_hard_pinned = 0;
// state = 0;
mid_fetch = false;
auth = true; // by default.
}
CInode::~CInode() {
@ -84,6 +84,79 @@ void CInode::hit()
}
void CInode::mark_dirty() {
if (!ref_set.count(CINODE_PIN_DIRTY))
get(CINODE_PIN_DIRTY);
if (parent) {
// dir is now dirty (if it wasn't already)
parent->dir->mark_dirty();
if (parent->dir->get_version() >= version)
version = parent->dir->get_version(); // we're as dirty as the dir
else {
version++;
parent->dir->float_version(version); // dir is at least as dirty as us.
}
} else
version++; // i'm root.
}
// state
crope CInode::encode_basic_state()
{
crope r;
// inode
r.append((char*)&inode, sizeof(inode));
// cached_by
int n = cached_by.size();
r.append((char*)&n, sizeof(int));
for (set<int>::iterator it = cached_by.begin();
it != cached_by.end();
it++) {
int j = *it;
r.append((char*)&j, sizeof(j));
}
// dir_auth
r.append((char*)&dir_auth, sizeof(int));
return r;
}
int CInode::decode_basic_state(crope r, int off)
{
// inode
r.copy(0,sizeof(inode_t), (char*)&inode);
off += sizeof(inode_t);
// cached_by --- although really this is rep_by,
// since we're non-authoritative
int n;
r.copy(off, sizeof(int), (char*)&n);
off += sizeof(int);
cached_by.clear();
for (int i=0; i<n; i++) {
int j;
r.copy(off, sizeof(int), (char*)&j);
cached_by.insert(j);
off += sizeof(int);
}
// dir_auth
r.copy(off, sizeof(int), (char*)&dir_auth);
off += sizeof(int);
return off;
}
// waiting
void CInode::add_write_waiter(Context *c) {

View File

@ -13,6 +13,7 @@
#include <list>
#include <vector>
#include <set>
#include <ext/rope>
#include <iostream>
using namespace std;
@ -68,10 +69,9 @@ class CInode : LRUObject {
int dir_auth; // authority for child dir
protected:
int ref; // reference count (???????)
int ref; // reference count
set<int> ref_set;
__uint32_t version;
__uint64_t version;
// parent dentries in cache
int nparents;
@ -81,13 +81,14 @@ class CInode : LRUObject {
// dcache lru
CInode *lru_next, *lru_prev;
// used by MDStore
bool mid_fetch;
// distributed caching
set<int> cached_by; // mds's that cache me. not well defined on replicas.
//unsigned state;
//set<int> sync_waiting_for_ack;
bool auth; // safety check; true if this is authoritative.
set<int> cached_by; // mds's that cache me.
/* NOTE: on replicas, this doubles as replicated_by, but the
cached_by_* access methods below should NOT be used in those
cases, as the semantics are different! */
//
private:
// waiters
@ -101,32 +102,35 @@ class CInode : LRUObject {
public:
DecayCounter popularity;
friend class MDCache;
friend class CDir;
friend class MDStore;
friend class MDS;
friend class MDiscover;
public:
CInode();
~CInode();
CInode *get_parent_inode();
CInode *get_realm_root(); // import, hash, or root
// fun
bool is_dir() { return inode.isdir; }
void make_path(string& s);
bool is_root() { return (bool)(!parent); }
bool is_auth() { return auth; }
inodeno_t ino() { return inode.ino; }
void make_path(string& s);
void hit();
void mark_dirty() {
if (!ref_set.count(CINODE_PIN_DIRTY))
get(CINODE_PIN_DIRTY);
// dirtyness
__uint64_t get_version() { return version; }
void float_version(__uint64_t ge) {
if (version < ge)
version = ge;
}
//void touch_version(); // mark dirty instead.
void mark_dirty();
void mark_clean() {
if (ref_set.count(CINODE_PIN_DIRTY))
put(CINODE_PIN_DIRTY);
@ -134,8 +138,48 @@ class CInode : LRUObject {
bool is_dirty() {
return ref_set.count(CINODE_PIN_DIRTY);
}
bool is_clean() {
return !ref_set.count(CINODE_PIN_DIRTY);
}
inodeno_t ino() { return inode.ino; }
// state
crope encode_basic_state();
int decode_basic_state(crope r, int off=0);
// cached_by -- to be used ONLY when we're authoritative!
bool is_cached_by_anyone() {
return !cached_by.empty();
}
bool is_cached_by(int mds) {
return cached_by.count(mds);
}
void cached_by_add(int mds) {
if (is_cached_by(mds)) return;
if (cached_by.empty())
get(CINODE_PIN_CACHED);
cached_by.insert(mds);
}
void cached_by_remove(int mds) {
if (!is_cached_by(mds)) return;
cached_by.erase(mds);
if (cached_by.empty())
put(CINODE_PIN_CACHED);
}
void cached_by_clear() {
if (cached_by.size())
put(CINODE_PIN_CACHED);
cached_by.clear();
}
set<int>::iterator cached_by_begin() {
return cached_by.begin();
}
set<int>::iterator cached_by_end() {
return cached_by.end();
}
set<int>& get_cached_by() {
return cached_by;
}
// state
/*
@ -156,7 +200,6 @@ class CInode : LRUObject {
}
*/
__uint32_t get_version() { return version; }
// dist cache
int authority(MDCluster *mdc);
@ -202,6 +245,9 @@ class CInode : LRUObject {
ref_set.insert(by);
cout << " get " << *this << " by " << by << " now " << ref << " (" << ref_set << ")" << endl;
}
bool is_pinned_by(int by) {
return ref_set.count(by);
}
// --- hierarchy stuff
void add_parent(CDentry *p);

View File

@ -57,8 +57,12 @@ int LogStream::read_next(LogEvent **le, Context *c, int step)
// does buffer have what we want?
if (buf_start > cur_pos ||
buf_start+buffer.length() < cur_pos+4) {
// make sure block is being read
if (reading_block) {
dout(5) << "read_next already reading log head from disk, offset " << cur_pos << endl;
assert(0);
//waiting_for_read_block.push_back(new C_LS_ReadNext(this, le, c));
} else {
dout(5) << "read_next reading log head from disk, offset " << cur_pos << endl;
// nope. read a chunk
@ -109,5 +113,21 @@ int LogStream::read_next(LogEvent **le, Context *c, int step)
c->finish(0);
delete c;
}
/*
// any other waiters too!
list<Context*> finished = waiting_for_read_block;
waiting_for_read_block.clear();
for (list<Context*>::iterator it = finished.begin();
it != finished.end();
it++) {
Context *c = *it;
if (c) {
c->finish(0);
delete c;
}
}
*/
}
}

View File

@ -17,6 +17,8 @@ class LogStream {
object_t oid;
bool reading_block;
//list<Context*> waiting_for_read_block;
crope buffer;
off_t buf_start;
public:

View File

@ -134,7 +134,7 @@ bool MDCache::trim(__int32_t max) {
if (idir) {
// dir incomplete!
idir->dir->state_clear(CDIR_MASK_COMPLETE);
idir->dir->state_clear(CDIR_STATE_COMPLETE);
// reexport?
if (imports.count(idir) && // import
@ -183,6 +183,32 @@ bool MDCache::shutdown_pass()
} else {
dout(7) << "log is empty; flushing cache" << endl;
trim(0);
if (mds->get_nodeid() == 0) {
// unpin inodes on shut down nodes.
// NOTE: this happens when they expire during an export; expires reference inodes, and can thus
// be missed.
bool didsomething = false;
for (hash_map<inodeno_t, CInode*>::iterator it = inode_map.begin();
it != inode_map.end();
it++) {
CInode *in = it->second;
if (in->is_auth() &&
in->is_cached_by_anyone()) {
for (set<int>::iterator by = in->cached_by.begin();
by != in->cached_by.end();
by++) {
if (mds->is_shut_down(*by)) {
in->cached_by_remove(*by);
didsomething = true;
}
}
}
}
if (didsomething)
trim(0);
}
}
dout(7) << "cache size now " << lru->lru_get_size() << endl;
@ -293,6 +319,10 @@ int MDCache::link_inode( CInode *parent, string& dname, CInode *in )
// add to dir
parent->dir->add_child(dn);
// fix up versions
parent->dir->float_version(inode->get_version()); // unlikely
in->float_version(in->get_version()); // likely
return 0;
}
@ -465,7 +495,7 @@ int MDCache::write_start(CInode *in, Message *m)
if (auth == whoami) {
// we are the authority.
if (in->cached_by.size() == 0) {
if (!in->cached_by_anyone()) {
// it's just us!
in->sync_set(CINODE_SYNC_LOCK);
in->get();
@ -481,7 +511,7 @@ int MDCache::write_start(CInode *in, Message *m)
// send sync_start
set<int>::iterator it;
for (it = in->cached_by.begin(); it != in->cached_by.end(); it++) {
for (it = in->cached_by_begin(); it != in->cached_by_end(); it++) {
mds->messenger->send_message(new MInodeSyncStart(in->inode.ino, auth),
MSG_ADDR_MDS(*it), MDS_PORT_CACHE,
MDS_PORT_CACHE);
@ -508,7 +538,7 @@ int MDCache::write_finish(CInode *in)
in->put(); // unpin
//
if (in->cached_by.size()) {
if (in->cached_by_anyone()) {
// release
set<int>::iterator it;
for (it = in->cached_by.begin(); it != in->cached_by.end(); it++) {
@ -747,7 +777,8 @@ int MDCache::handle_discover(MDiscover *dis)
root->dir = new CDir(root);
root->dir->dir_rep = trace[0].dir_rep;
root->dir->dir_rep_by = trace[0].dir_rep_by;
root->auth = false;
set_root( root );
opening_root = false;
@ -812,6 +843,7 @@ int MDCache::handle_discover(MDiscover *dis)
in->dir->dir_rep = trace[i].dir_rep;
in->dir->dir_rep_by = trace[i].dir_rep_by;
}
in->auth = false;
// link in
add_inode( in );
@ -864,9 +896,7 @@ int MDCache::handle_discover(MDiscover *dis)
CInode *root = get_root();
dis->add_bit( root, 0 );
if (root->cached_by.empty())
root->get(CINODE_PIN_CACHED);
root->cached_by.insert( dis->get_asker() );
root->cached_by_add(dis->get_asker());
}
// add bits
@ -899,9 +929,7 @@ int MDCache::handle_discover(MDiscover *dis)
dis->add_bit( next, whoami );
// remember who is caching this!
if (next->cached_by.empty())
next->get(CINODE_PIN_CACHED);
next->cached_by.insert( dis->get_asker() );
next->cached_by_add( dis->get_asker() );
cur = next; // continue!
} else {
@ -951,13 +979,12 @@ int MDCache::handle_discover(MDiscover *dis)
int MDCache::send_inode_updates(CInode *in)
{
set<int>::iterator it;
for (it = in->cached_by.begin(); it != in->cached_by.end(); it++) {
for (set<int>::iterator it = in->cached_by_begin();
it != in->cached_by_end();
it++) {
dout(7) << "sending inode_update on " << *in << " to " << *it << endl;
assert(*it != mds->get_nodeid());
mds->messenger->send_message(new MInodeUpdate(in->inode,
in->cached_by,
in->dir_auth),
mds->messenger->send_message(new MInodeUpdate(in),
MSG_ADDR_MDS(*it), MDS_PORT_CACHE,
MDS_PORT_CACHE);
}
@ -968,11 +995,11 @@ int MDCache::send_inode_updates(CInode *in)
void MDCache::handle_inode_update(MInodeUpdate *m)
{
CInode *in = get_inode(m->get_inode().ino);
CInode *in = get_inode(m->get_ino());
if (!in) {
dout(7) << "got inode_update on " << m->get_inode().ino << ", don't have it, sending expire" << endl;
dout(7) << "got inode_update on " << m->get_ino() << ", don't have it, sending expire" << endl;
mds->messenger->send_message(new MInodeExpire(m->get_inode().ino, mds->get_nodeid(), true),
mds->messenger->send_message(new MInodeExpire(m->get_ino(), mds->get_nodeid(), true),
m->get_source(), MDS_PORT_CACHE,
MDS_PORT_CACHE);
@ -989,9 +1016,7 @@ void MDCache::handle_inode_update(MInodeUpdate *m)
// update!
dout(7) << "got inode_update on " << *in << endl;
in->inode = m->get_inode();
in->cached_by = m->get_cached_by();
in->dir_auth = m->get_dir_auth();
in->decode_basic_state(m->get_payload());
// done
delete m;
@ -1016,15 +1041,13 @@ void MDCache::handle_inode_expire(MInodeExpire *m)
}
// remove from our cached_by
if (!in->cached_by.count(from)) {
if (!in->is_cached_by(from)) {
dout(7) << "got inode_expire on " << *in << " from mds" << from << ", but they're not in cached_by "<< in->cached_by << endl;
goto out;
}
dout(7) << "got inode_expire on " << *in << " from mds" << from << " cached_by now " << in->cached_by << endl;
in->cached_by.erase(from);
if (in->cached_by.empty())
in->put(CINODE_PIN_CACHED);
in->cached_by_remove(from);
// done
@ -1057,9 +1080,12 @@ void MDCache::handle_inode_expire(MInodeExpire *m)
int MDCache::send_dir_updates(CDir *dir, int except)
{
// FIXME
int whoami = mds->get_nodeid();
for (set<int>::iterator it = dir->inode->cached_by.begin();
it != dir->inode->cached_by.end();
for (set<int>::iterator it = dir->inode->cached_by_begin();
it != dir->inode->cached_by_end();
it++) {
if (*it == whoami) continue;
if (*it == except) continue;
@ -1487,6 +1513,7 @@ void MDCache::export_dir_walk(MExportDir *req,
} else
istate.dir_auth = -1;
// cached_by
dir_rope.append( (char*)&istate, sizeof(istate) );
for (set<int>::iterator it = in->cached_by.begin();
@ -1496,12 +1523,11 @@ void MDCache::export_dir_walk(MExportDir *req,
dir_rope.append( (char*)&i, sizeof(int) );
}
// unpin cached_by
if (!in->cached_by.empty()) {
in->cached_by.clear(); // only get to do this once, because we're newly non-authoritative.
in->put(CINODE_PIN_CACHED); // non-authorities are not allowed to pin this!
}
// clear/unpin cached_by (we're no longer the authority)
in->cached_by_clear();
assert(in->auth == true);
in->auth = false;
// other state too!.. open files, etc...
@ -1549,6 +1575,10 @@ void MDCache::export_dir_purge(CInode *idir, int newauth)
{
dout(7) << "export_dir_purge on " << *idir << endl;
// discard most dir state
idir->dir->state &= CDIR_MASK_STATE_EXPORT_KEPT; // i only retain a few things.
// contents:
CDir_map_t::iterator it = idir->dir->begin();
while (it != idir->dir->end()) {
CInode *in = it->second->inode;
@ -1557,9 +1587,6 @@ void MDCache::export_dir_purge(CInode *idir, int newauth)
if (in->is_dir() && in->dir)
export_dir_purge(in, newauth);
// dir incomplete!
in->parent->dir->state_clear(CDIR_MASK_COMPLETE);
dout(7) << "sending inode_expire to mds" << newauth << " on " << *in << endl;
mds->messenger->send_message(new MInodeExpire(in->inode.ino, mds->get_nodeid()),
MSG_ADDR_MDS(newauth), MDS_PORT_CACHE,
@ -1672,9 +1699,6 @@ void MDCache::handle_export_dir(MExportDir *m)
if (in->authority(mds->get_cluster()) == in->dir_auth)
in->dir_auth = CDIR_AUTH_PARENT;
// ignore "frozen" state of the main dir; it's from the authority
in->dir->state_clear(CDIR_MASK_FROZEN);
double newpop = m->get_ipop() - in->popularity.get();
dout(7) << " imported popularity jump by " << newpop << endl;
if (newpop > 0) { // duh
@ -1747,7 +1771,7 @@ void MDCache::import_dir_block(pchar& p,
if (!idir->dir) idir->dir = new CDir(idir);
idir->dir->version = dstate->version;
idir->dir->state = dstate->state;
idir->dir->state = dstate->state & CDIR_MASK_STATE_EXPORTED; // we only import certain state
idir->dir->dir_rep = dstate->dir_rep;
idir->dir->popularity = dstate->popularity;
@ -1782,6 +1806,9 @@ void MDCache::import_dir_block(pchar& p,
} else {
dout(7) << " import_dir_block already had " << *in << endl;
in->inode = istate->inode;
assert(in->auth == false);
in->auth = true;
}
// update inode state with authoritative info
@ -1793,17 +1820,14 @@ void MDCache::import_dir_block(pchar& p,
p += sizeof(*istate);
in->cached_by.clear();
in->cached_by.clear(); // HACK i'm cheating...
for (int nby = istate->ncached_by; nby>0; nby--) {
if (*((int*)p) != mds->get_nodeid())
in->cached_by.insert( *((int*)p) );
in->cached_by_add( *((int*)p) );
p += sizeof(int);
}
in->cached_by.insert(oldauth); // old auth still has it too!
if (in->cached_by.size())
in->get(CINODE_PIN_CACHED); // pin bc of cached_by
in->cached_by_add(oldauth); // old auth still has it too!
// other state? ... ?

View File

@ -6,6 +6,9 @@
#include <iostream>
using namespace std;
#include <sys/types.h>
#include <unistd.h>
MDCluster::MDCluster(int num_mds, int num_osd)
{
this->num_mds = num_mds;
@ -63,5 +66,5 @@ int MDCluster::get_log_osd(int mds)
object_t MDCluster::get_log_oid(int mds)
{
return 1000 + mds;
return ((object_t)1000*(object_t)getpid()) + (object_t)mds;
}

View File

@ -135,6 +135,11 @@ int MDLog::trim(Context *c)
void MDLog::trim_readnext()
{
if (trim_reading) {
dout(10) << "trim_readnext already reading." << endl;
return;
}
dout(10) << "trim_readnext" << endl;
trim_reading = true;
C_MDL_Trim *readfin = new C_MDL_Trim(this);

View File

@ -416,7 +416,7 @@ MClientReply *MDS::handle_client_touch(MClientRequest *req,
// do update
cur->inode.mtime++; // whatever
cur->inode.touched++;
cur->version++;
cur->mark_dirty();
// tell replicas
mdcache->send_inode_updates(cur);

View File

@ -121,7 +121,11 @@ class MDS : public Dispatcher {
mds_load_t get_load();
bool is_shutting_down() { return shutting_down; }
bool is_shut_down() { return shut_down; }
bool is_shut_down(int who=-1) {
if (who<0)
return shut_down;
return did_shut_down.count(who);
}
int init();
int shutdown_start();

View File

@ -57,16 +57,18 @@ class MDFetchDirContext : public Context {
bool MDStore::fetch_dir( CInode *in,
Context *c )
{
assert(in->is_auth());
dout(7) << "fetch_dir " << in->inode.ino << " context is " << c << endl;
if (c)
in->dir->add_waiter(c);
// already fetching?
if (in->mid_fetch) {
if (in->dir->state_test(CDIR_STATE_FETCHING)) {
dout(7) << "already fetching " << in->inode.ino << "; waiting" << endl;
return true;
}
in->mid_fetch = true;
in->dir->set_state(CDIR_STATE_FETCHING);
// create return context
MDFetchDirContext *fin = new MDFetchDirContext( this, in->ino() );
@ -165,14 +167,14 @@ bool MDStore::fetch_dir_2( int result,
}
// dir is now complete
idir->dir->state_set(CDIR_MASK_COMPLETE);
idir->dir->state_set(CDIR_STATE_COMPLETE);
}
// finish
list<Context*> finished;
idir->dir->take_waiting(finished);
idir->mid_fetch = false;
idir->dir->state_cleaer(CDIR_STATE_FETCHING);
list<Context*>::iterator it = finished.begin();
while (it != finished.end()) {
@ -269,6 +271,8 @@ public:
bool MDStore::commit_dir( CInode *in,
Context *c )
{
assert(in->is_auth());
// already committing?
if (in->dir->get_state() & CDIR_MASK_MID_COMMIT) {
// already mid-commit!
@ -342,10 +346,20 @@ bool MDStore::commit_dir_2( int result,
// is the dir now clean?
if (committed_version == in->dir->get_version()) {
in->dir->state_clear(CDIR_MASK_DIRTY); // clear dirty bit
mark_clean();
}
in->dir->state_clear(CDIR_MASK_MID_COMMIT);
// mark inodes clean too (if we committed them!)
for (CDir_map_t::iterator it = in->dir->begin();
it != in->dir->end();
it++) {
if (it->second->get_version() <= committed_version) {
assert(it->second->is_dirty());
it->second->mark_clean();
}
}
// unpin
in->dir->hard_unpin();

View File

@ -8,6 +8,46 @@
#include "../MDCache.h"
#include "../MDStore.h"
/* so we can verify the inode is in fact flushed to disk
after a commit_dir finishes (the commit could have started before
and been in progress when we asked. */
class C_EIU_VerifyInodeUpdate : public Context {
MDS *mds;
inodeno_t ino;
__uint64_t version;
Context *fin;
public:
C_EIU_VerifyInodeUpdate(MDS *mds, inodeno_t ino, __uint64_t version, Context *fin) {
this->mds = mds;
this->ino = ino;
this->version = version;
this->fin = fin;
}
virtual void finish(int r) {
CInode *in = mds->mdcache->get_inode(ino);
if (in) {
// make sure it's clean, or a different version.
if (in->is_dirty() &&
in->get_version() == version) {
cout << "ARGH, did EInodeUpdate commit but inode is still dirty" << endl;
// damnit
mds->mdstore->commit_dir(in->get_parent_inode(),
new C_EIU_VerifyInodeUpdate(mds,
in->ino(),
in->get_version(),
fin));
return;
}
}
// we're fine.
if (fin) {
fin->finish(0);
delete fin;
}
}
};
class EInodeUpdate : public LogEvent {
protected:
inode_t inode;
@ -55,7 +95,10 @@ class EInodeUpdate : public LogEvent {
// okay!
cout << "commiting containing dir for " << inode.ino << endl;
mds->mdstore->commit_dir(parent,
c);
new C_EIU_VerifyInodeUpdate(mds,
in->ino(),
in->get_version(),
c));
} else {
// oh, i'm the root inode
cout << "don't know how to commit the root inode" << endl;

View File

@ -12,9 +12,9 @@ using namespace std;
struct MDiscoverRec_t {
inode_t inode;
set<int> cached_by;
int dir_auth;
// dir stuff
int dir_auth;
int dir_rep;
set<int> dir_rep_by;
@ -159,7 +159,7 @@ class MDiscover : public Message {
MDiscoverRec_t bit;
bit.inode = in->inode;
bit.cached_by = in->cached_by;
bit.cached_by = in->get_cached_by();
bit.cached_by.insert( auth ); // obviously the authority has it too
bit.dir_auth = in->dir_auth;
if (in->is_dir() && in->dir) {

View File

@ -34,6 +34,7 @@ class MInodeExpire : public Message {
virtual crope get_payload() {
crope s;
s.append((char*)&st,sizeof(st));
return s;
}
};

View File

@ -6,49 +6,27 @@
#include <set>
using namespace std;
typedef struct {
inode_t inode;
int dir_auth;
int ncached_by;
} MInodeUpdate_st;
class MInodeUpdate : public Message {
MInodeUpdate_st st;
set<int> cached_by;
crope inode_basic_state;
public:
inode_t& get_inode() { return st.inode; }
set<int>& get_cached_by() { return cached_by; }
int get_dir_auth() { return st.dir_auth; }
inodeno_t get_ino() {
inodeno_t ino = inode_basic_state.copy(0, sizeof(inodeno_t), (char*)&ino);
return ino;
}
MInodeUpdate() {}
MInodeUpdate(inode_t& inode, set<int>cached_by, int dir_auth) :
MInodeUpdate(CInode *in) :
Message(MSG_MDS_INODEUPDATE) {
this->st.inode = inode;
this->st.dir_auth = dir_auth;
this->cached_by = cached_by;
inode_basic_state = in->encode_basic_state();
}
virtual char *get_type_name() { return "iup"; }
virtual char *get_type_name() { return "Iup"; }
virtual int decode_payload(crope s) {
s.copy(0, sizeof(st), (char*)&st);
for (int i=0; i<st.ncached_by; i++) {
int j;
s.copy(sizeof(st) + i*sizeof(int), sizeof(int), (char*)&j);
cached_by.insert(j);
}
inode_basic_state = s;
}
virtual crope get_payload() {
crope s;
st.ncached_by = cached_by.size();
s.append((char*)&st, sizeof(st));
for (set<int>::iterator it = cached_by.begin();
it != cached_by.end();
it++) {
int j = *it;
s.append((char*)&j, sizeof(int));
}
return s;
return inode_basic_state;
}
};

View File

@ -17,7 +17,7 @@
using namespace std;
#define SERIALIZE
//#define SERIALIZE
#include "include/config.h"