mirror of
https://github.com/ceph/ceph
synced 2025-01-03 17:42:36 +00:00
progress
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@35 29311d96-e01e-0410-9327-a35deaab8ce9
This commit is contained in:
parent
4405bf66d3
commit
a8ab173d4a
@ -3,6 +3,7 @@
|
||||
#define __DECAYCOUNTER_H
|
||||
|
||||
#include <math.h>
|
||||
#include "include/Clock.h"
|
||||
|
||||
class DecayCounter {
|
||||
protected:
|
||||
@ -34,8 +35,7 @@ class DecayCounter {
|
||||
}
|
||||
|
||||
double getnow() {
|
||||
// ??
|
||||
return 1.0;
|
||||
return g_clock.gettime();
|
||||
}
|
||||
|
||||
void reset() {
|
||||
@ -46,7 +46,7 @@ class DecayCounter {
|
||||
void decay() {
|
||||
double tnow = getnow();
|
||||
double el = tnow - last_decay;
|
||||
if (el > .1) {
|
||||
if (el > .5) {
|
||||
val = val * exp(el * k);
|
||||
last_decay = tnow;
|
||||
}
|
||||
|
@ -10,6 +10,7 @@ class LRUObject {
|
||||
protected:
|
||||
LRUObject *lru_next, *lru_prev;
|
||||
bool lru_in_top;
|
||||
bool lru_in_lru;
|
||||
bool lru_expireable;
|
||||
|
||||
public:
|
||||
@ -76,6 +77,7 @@ class LRU {
|
||||
lru_tophead = o;
|
||||
lru_ntop++;
|
||||
lru_num++;
|
||||
o->lru_in_lru = true;
|
||||
|
||||
lru_adjust();
|
||||
}
|
||||
@ -93,6 +95,7 @@ class LRU {
|
||||
lru_bothead = o;
|
||||
lru_nbot++;
|
||||
lru_num++;
|
||||
o->lru_in_lru = true;
|
||||
}
|
||||
|
||||
|
||||
@ -113,6 +116,9 @@ class LRU {
|
||||
|
||||
// remove an item
|
||||
LRUObject *lru_remove(LRUObject *o) {
|
||||
// not in list
|
||||
if (!o->lru_in_lru) return o;
|
||||
|
||||
if (o->lru_in_top) {
|
||||
//cout << "removing " << o << " from top" << endl;
|
||||
// top
|
||||
@ -140,6 +146,7 @@ class LRU {
|
||||
}
|
||||
lru_num--;
|
||||
o->lru_next = o->lru_prev = NULL;
|
||||
o->lru_in_lru = false;
|
||||
return o;
|
||||
}
|
||||
|
||||
|
@ -144,8 +144,8 @@ void CDir::hard_unpin() {
|
||||
inode->adjust_nested_hard_pinned( -1 );
|
||||
|
||||
// pending freeze?
|
||||
if (hard_pinned + nested_hard_pinned == 0 &&
|
||||
waiting_on_freeze)
|
||||
if (waiting_to_freeze.size() &&
|
||||
hard_pinned + nested_hard_pinned == 0)
|
||||
freeze_finish();
|
||||
}
|
||||
|
||||
@ -154,8 +154,8 @@ int CDir::adjust_nested_hard_pinned(int a) {
|
||||
inode->adjust_nested_hard_pinned(a);
|
||||
|
||||
// pending freeze?
|
||||
if (hard_pinned + nested_hard_pinned == 0 &&
|
||||
waiting_on_freeze)
|
||||
if (waiting_to_freeze.size() &&
|
||||
hard_pinned + nested_hard_pinned == 0)
|
||||
freeze_finish();
|
||||
}
|
||||
|
||||
@ -182,30 +182,36 @@ void CDir::add_freeze_waiter(Context *c)
|
||||
|
||||
void CDir::freeze(Context *c)
|
||||
{
|
||||
cout << " state " << state << endl;
|
||||
assert((state & (CDIR_MASK_FROZEN|CDIR_MASK_FREEZING)) == 0);
|
||||
|
||||
state_set(CDIR_MASK_FROZEN);
|
||||
inode->get();
|
||||
|
||||
if (nested_hard_pinned == 0) {
|
||||
cout << "freeze " << *inode << endl;
|
||||
|
||||
state_set(CDIR_MASK_FROZEN);
|
||||
inode->hard_pin(); // hard_pin for duration of freeze
|
||||
|
||||
// easy, we're frozen
|
||||
c->finish(0);
|
||||
delete c;
|
||||
|
||||
} else {
|
||||
// need to wait for pins to expire
|
||||
state_set(CDIR_MASK_FREEZING);
|
||||
waiting_on_freeze = c;
|
||||
cout << "freeze + wait " << *inode << endl;
|
||||
// need to wait for pins to expire
|
||||
waiting_to_freeze.push_back(c);
|
||||
}
|
||||
}
|
||||
|
||||
void CDir::freeze_finish()
|
||||
{
|
||||
Context *c = waiting_on_freeze;
|
||||
waiting_on_freeze = NULL;
|
||||
cout << "freeze_finish " << *inode << endl;
|
||||
|
||||
inode->hard_pin(); // hard_pin for duration of freeze
|
||||
|
||||
Context *c = waiting_to_freeze.front();
|
||||
waiting_to_freeze.pop_front();
|
||||
state_clear(CDIR_MASK_FREEZING);
|
||||
state_set(CDIR_MASK_FROZEN);
|
||||
|
||||
if (c) {
|
||||
c->finish(0);
|
||||
@ -215,8 +221,9 @@ void CDir::freeze_finish()
|
||||
|
||||
void CDir::unfreeze() // thaw?
|
||||
{
|
||||
cout << "unfreeze " << *inode << endl;
|
||||
state_clear(CDIR_MASK_FROZEN);
|
||||
inode->put();
|
||||
inode->hard_unpin();
|
||||
|
||||
list<Context*> finished;
|
||||
take_waiting(finished);
|
||||
|
@ -64,13 +64,13 @@ class CDir {
|
||||
int dir_auth;
|
||||
int dir_rep;
|
||||
set<int> dir_rep_by; // if dir_rep == CDIR_REP_LIST
|
||||
bool is_import, is_export;
|
||||
//bool is_import, is_export;
|
||||
|
||||
|
||||
// lock nesting, freeze
|
||||
int hard_pinned;
|
||||
int nested_hard_pinned;
|
||||
Context *waiting_on_freeze; // freezer
|
||||
list<Context*> waiting_to_freeze; // wannabe freezer, NOT waiting for *this to thaw
|
||||
|
||||
DecayCounter popularity;
|
||||
|
||||
@ -89,7 +89,6 @@ class CDir {
|
||||
|
||||
hard_pinned = 0;
|
||||
nested_hard_pinned = 0;
|
||||
waiting_on_freeze = NULL;
|
||||
|
||||
dir_auth = CDIR_AUTH_PARENT;
|
||||
dir_rep = CDIR_REP_NONE;
|
||||
|
@ -22,6 +22,7 @@ CInode::CInode() : LRUObject() {
|
||||
|
||||
hard_pinned = 0;
|
||||
nested_hard_pinned = 0;
|
||||
// state = 0;
|
||||
|
||||
mid_fetch = false;
|
||||
}
|
||||
@ -37,6 +38,8 @@ CInode *CInode::get_parent_inode()
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
|
||||
void CInode::make_path(string& s)
|
||||
{
|
||||
if (parent) {
|
||||
@ -47,6 +50,13 @@ void CInode::make_path(string& s)
|
||||
s = ""; // root
|
||||
}
|
||||
|
||||
ostream& operator<<(ostream& out, CInode& in)
|
||||
{
|
||||
string path;
|
||||
in.make_path(path);
|
||||
return out << "[" << in.inode.ino << "]" << path;
|
||||
}
|
||||
|
||||
|
||||
void CInode::hit()
|
||||
{
|
||||
|
@ -12,6 +12,7 @@
|
||||
#include <list>
|
||||
#include <vector>
|
||||
#include <set>
|
||||
#include <iostream>
|
||||
using namespace std;
|
||||
|
||||
|
||||
@ -21,6 +22,9 @@ using namespace std;
|
||||
|
||||
#define CINODE_MASK_SYNC (CINODE_SYNC_START|CINODE_SYNC_LOCK|CINODE_SYNC_FINISH)
|
||||
|
||||
#define CINODE_MASK_IMPORT 16
|
||||
#define CINODE_MASK_EXPORT 32
|
||||
|
||||
|
||||
class Context;
|
||||
class CDentry;
|
||||
@ -29,6 +33,12 @@ class MDS;
|
||||
class MDCluster;
|
||||
class Message;
|
||||
|
||||
|
||||
|
||||
class CInode;
|
||||
ostream& operator<<(ostream& out, CInode& in);
|
||||
|
||||
|
||||
// cached inode wrapper
|
||||
class CInode : LRUObject {
|
||||
public:
|
||||
@ -52,7 +62,7 @@ class CInode : LRUObject {
|
||||
|
||||
// distributed caching
|
||||
set<int> cached_by; // mds's that cache me. not well defined on replicas.
|
||||
//int state;
|
||||
//unsigned state;
|
||||
//set<int> sync_waiting_for_ack;
|
||||
|
||||
// waiters
|
||||
@ -78,13 +88,28 @@ class CInode : LRUObject {
|
||||
|
||||
|
||||
CInode *get_parent_inode();
|
||||
CInode *get_realm_root(); // import, hash, or root
|
||||
|
||||
// fun
|
||||
bool is_dir() { return inode.isdir; }
|
||||
void make_path(string& s);
|
||||
|
||||
bool is_root() { return (bool)(!parent); }
|
||||
|
||||
void hit();
|
||||
|
||||
inodeno_t ino() { return inode.ino; }
|
||||
|
||||
// state
|
||||
/*
|
||||
unsigned get_state() { return state; }
|
||||
void reset_state(unsigned s) { state = s; }
|
||||
void state_clear(unsigned mask) { state &= ~mask; }
|
||||
void state_set(unsigned mask) { state |= mask; }
|
||||
|
||||
bool is_export() { return state & CINODE_MASK_EXPORT; }
|
||||
bool is_import() { return state & CINODE_MASK_IMPORT; }
|
||||
*/
|
||||
|
||||
// sync
|
||||
/*
|
||||
int get_sync() { return state & CINODE_MASK_SYNC; }
|
||||
@ -121,11 +146,13 @@ class CInode : LRUObject {
|
||||
ref--;
|
||||
if (ref == 0)
|
||||
lru_unpin();
|
||||
cout << "put " << *this << " now " << ref << endl;
|
||||
}
|
||||
void get() {
|
||||
if (ref == 0)
|
||||
lru_pin();
|
||||
ref++;
|
||||
cout << "get " << *this << " now " << ref << endl;
|
||||
}
|
||||
|
||||
// --- hierarchy stuff
|
||||
|
@ -17,8 +17,8 @@ using namespace std;
|
||||
|
||||
|
||||
#define MIN_LOAD 50 // ??
|
||||
#define MIN_REEXPORT 25 // will automatically reexport
|
||||
#define MIN_OFFLOAD 50 // point at which i stop trying, close enough
|
||||
#define MIN_REEXPORT 5 // will automatically reexport
|
||||
#define MIN_OFFLOAD 10 // point at which i stop trying, close enough
|
||||
|
||||
ostream& operator<<( ostream& out, mds_load_t& load )
|
||||
{
|
||||
@ -147,6 +147,7 @@ void MDBalancer::do_rebalance()
|
||||
|
||||
if (my_load < target_load.root_pop) {
|
||||
cout << " i am underloaded, doing nothing." << endl;
|
||||
mds->mdcache->show_imports();
|
||||
return;
|
||||
}
|
||||
|
||||
@ -198,7 +199,9 @@ void MDBalancer::do_rebalance()
|
||||
it != mds->mdcache->imports.end();
|
||||
it++) {
|
||||
import_pop_map.insert(pair<double,CInode*>((*it)->popularity.get(), *it));
|
||||
import_from_map.insert(pair<int,CInode*>((*it)->authority(mds->get_cluster()), *it));
|
||||
int from = (*it)->authority(mds->get_cluster());
|
||||
cout << "map i imported " << **it << " from " << from << endl;
|
||||
import_from_map.insert(pair<int,CInode*>(from, *it));
|
||||
}
|
||||
|
||||
// do my exports!
|
||||
@ -212,24 +215,29 @@ void MDBalancer::do_rebalance()
|
||||
|
||||
cout << "mds" << whoami << " sending " << amount << " to " << target << endl;
|
||||
|
||||
mds->mdcache->show_imports();
|
||||
|
||||
// search imports from target
|
||||
if (import_from_map.count(target)) {
|
||||
cout << " aha, looking through imports from target " << endl;
|
||||
cout << " aha, looking through imports from target mds" << target << endl;
|
||||
pair<multimap<int,CInode*>::iterator, multimap<int,CInode*>::iterator> p;
|
||||
for (p = import_from_map.equal_range(import_from_map.begin(), import_from_map.end(), target);
|
||||
for (p = import_from_map.equal_range(target);
|
||||
p.first != p.second;
|
||||
p.first++) {
|
||||
CInode *in = (*p.first).second;
|
||||
if (in->is_root()) continue;
|
||||
double pop = in->popularity.get();
|
||||
assert(in->authority(mds->get_cluster()) == target);
|
||||
cout << "considering " << *in << " from " << (*p.first).first << endl;
|
||||
assert(in->authority(mds->get_cluster()) == target); // cuz that's how i put it in the map, dummy
|
||||
|
||||
if (pop <= amount) {
|
||||
cout << "reexporting " << in->inode.ino << " pop " << pop << " back to " << target << endl;
|
||||
cout << "reexporting " << *in << " pop " << pop << " back to " << target << endl;
|
||||
mds->mdcache->export_dir(in, target);
|
||||
amount -= pop;
|
||||
import_from_map.erase(target);
|
||||
import_pop_map.erase(pop);
|
||||
} else {
|
||||
cout << "can't reexport " << in->inode.ino << ", too big " << pop << endl;
|
||||
cout << "can't reexport " << *in << ", too big " << pop << endl;
|
||||
}
|
||||
if (amount < MIN_OFFLOAD) break;
|
||||
}
|
||||
@ -240,12 +248,15 @@ void MDBalancer::do_rebalance()
|
||||
for (map<double,CInode*>::iterator import = import_pop_map.begin();
|
||||
import != import_pop_map.end();
|
||||
import++) {
|
||||
CInode *imp = (*import).second;
|
||||
if (imp->is_root()) continue;
|
||||
|
||||
double pop = (*import).first;
|
||||
if (pop < amount ||
|
||||
pop < MIN_REEXPORT) {
|
||||
cout << "reexporting " << ((*import).second)->inode.ino << " pop " << pop << endl;
|
||||
cout << "reexporting " << *imp << " pop " << pop << endl;
|
||||
amount -= pop;
|
||||
mds->mdcache->export_dir((*import).second, ((*import).second)->authority(mds->get_cluster()));
|
||||
mds->mdcache->export_dir(imp, imp->authority(mds->get_cluster()));
|
||||
}
|
||||
if (amount < MIN_OFFLOAD) break;
|
||||
}
|
||||
@ -253,8 +264,6 @@ void MDBalancer::do_rebalance()
|
||||
|
||||
// okay, search for fragments of my workload
|
||||
set<CInode*> candidates = mds->mdcache->imports;
|
||||
if (whoami == 0)
|
||||
candidates.insert(mds->mdcache->get_root());
|
||||
|
||||
list<CInode*> exports;
|
||||
double have = 0;
|
||||
@ -267,12 +276,13 @@ void MDBalancer::do_rebalance()
|
||||
}
|
||||
|
||||
for (list<CInode*>::iterator it = exports.begin(); it != exports.end(); it++) {
|
||||
string name = "";
|
||||
(*it)->make_path(name);
|
||||
cout << " exporting fragment " << name << " pop " << (*it)->popularity.get() << endl;
|
||||
cout << " exporting fragment " << **it << " pop " << (*it)->popularity.get() << endl;
|
||||
mds->mdcache->export_dir(*it, target);
|
||||
}
|
||||
}
|
||||
|
||||
cout << "rebalance done" << endl;
|
||||
mds->mdcache->show_imports();
|
||||
|
||||
}
|
||||
|
||||
@ -296,7 +306,7 @@ void MDBalancer::find_exports(CInode *idir,
|
||||
list<CInode*> bigger;
|
||||
multimap<double, CInode*> smaller;
|
||||
|
||||
cout << " need " << need << " (" << needmin << " - " << needmax << endl;
|
||||
cout << " find_exports in " << *idir << " need " << need << " (" << needmin << " - " << needmax << ")" << endl;
|
||||
|
||||
for (CDir_map_t::iterator it = idir->dir->begin();
|
||||
it != idir->dir->end();
|
||||
@ -305,7 +315,7 @@ void MDBalancer::find_exports(CInode *idir,
|
||||
|
||||
if (!in->is_dir()) continue;
|
||||
if (!in->dir) continue; // clearly not popular
|
||||
if (in->dir->dir_auth != CDIR_AUTH_PARENT) continue;
|
||||
if (mds->mdcache->exports.count(in)) continue;
|
||||
if (in->dir->is_freeze_root()) continue; // can't export this right now!
|
||||
|
||||
double pop = in->popularity.get();
|
||||
@ -336,7 +346,7 @@ void MDBalancer::find_exports(CInode *idir,
|
||||
if ((*it).first < midchunk)
|
||||
break; // try later
|
||||
|
||||
cout << " taking smaller " << (*it).first << endl;
|
||||
cout << " taking smaller " << *(*it).second << endl;
|
||||
|
||||
exports.push_back((*it).second);
|
||||
have += (*it).first;
|
||||
@ -348,7 +358,7 @@ void MDBalancer::find_exports(CInode *idir,
|
||||
for (list<CInode*>::iterator it = bigger.begin();
|
||||
it != bigger.end();
|
||||
it++) {
|
||||
cout << " descending into " << (*it)->inode.ino << endl;
|
||||
cout << " descending into " << **it << endl;
|
||||
find_exports(*it, amount, exports, have);
|
||||
if (have > needmin)
|
||||
return;
|
||||
@ -359,7 +369,7 @@ void MDBalancer::find_exports(CInode *idir,
|
||||
it != smaller.rend();
|
||||
it++) {
|
||||
|
||||
cout << " taking (much) smaller " << (*it).first << endl;
|
||||
cout << " taking (much) smaller " << *(*it).second << endl;
|
||||
|
||||
exports.push_back((*it).second);
|
||||
have += (*it).first;
|
||||
|
@ -25,6 +25,7 @@
|
||||
#include <errno.h>
|
||||
#include <iostream>
|
||||
#include <string>
|
||||
#include <map>
|
||||
using namespace std;
|
||||
|
||||
MDCache::MDCache(MDS *m)
|
||||
@ -103,7 +104,7 @@ bool MDCache::trim(__int32_t max) {
|
||||
in->parent->dir->state_clear(CDIR_MASK_COMPLETE);
|
||||
|
||||
// remove it
|
||||
cout << "mds" << mds->get_nodeid() << " deleting " << in->inode.ino << " " << in << endl;
|
||||
cout << "mds" << mds->get_nodeid() << " trim deleting " << *in << " " << in << endl;
|
||||
remove_inode(in);
|
||||
delete in;
|
||||
|
||||
@ -219,6 +220,9 @@ int MDCache::open_root(Context *c)
|
||||
|
||||
set_root( root );
|
||||
|
||||
// root is technically an import (from a vacuum)
|
||||
imports.insert( root );
|
||||
|
||||
if (c) {
|
||||
c->finish(0);
|
||||
delete c;
|
||||
@ -245,6 +249,34 @@ int MDCache::open_root(Context *c)
|
||||
}
|
||||
|
||||
|
||||
CInode *MDCache::get_containing_import(CInode *in)
|
||||
{
|
||||
CInode *imp = in; // might be *in
|
||||
|
||||
// find the underlying import!
|
||||
while (imp &&
|
||||
imports.count(imp) == 0) {
|
||||
imp = imp->get_parent_inode();
|
||||
}
|
||||
|
||||
assert(imp);
|
||||
return imp;
|
||||
}
|
||||
|
||||
CInode *MDCache::get_containing_export(CInode *in)
|
||||
{
|
||||
CInode *ex = in; // might be *in
|
||||
|
||||
// find the underlying import!
|
||||
while (ex && // white not at root,
|
||||
exports.count(ex) == 0) { // we didn't find an export,
|
||||
ex = ex->get_parent_inode();
|
||||
}
|
||||
|
||||
return ex;
|
||||
}
|
||||
|
||||
|
||||
// SYNC
|
||||
|
||||
/*
|
||||
@ -452,7 +484,7 @@ int MDCache::path_traverse(string& path,
|
||||
// frozen?
|
||||
if (cur->dir->is_freeze_root()) {
|
||||
// doh!
|
||||
cout << "mds" << whoami << " dir is frozen, waiting" << endl;
|
||||
cout << "mds" << whoami << " dir " << *cur << " is frozen, waiting" << endl;
|
||||
cur->dir->add_freeze_waiter(new C_MDS_RetryMessage(mds, req));
|
||||
return 1;
|
||||
}
|
||||
@ -475,7 +507,7 @@ int MDCache::path_traverse(string& path,
|
||||
return -ENOENT;
|
||||
} else {
|
||||
// directory isn't complete; reload
|
||||
cout << "mds" << whoami << " incomplete dir contents for " << cur->inode.ino << ", fetching" << endl;
|
||||
cout << "mds" << whoami << " incomplete dir contents for " << *cur << ", fetching" << endl;
|
||||
mds->mdstore->fetch_dir(cur, new C_MDS_RetryMessage(mds, req));
|
||||
return 1;
|
||||
}
|
||||
@ -491,7 +523,7 @@ int MDCache::path_traverse(string& path,
|
||||
for (int i=depth; i<path_bits.size(); i++)
|
||||
want->push_back(path_bits[i]);
|
||||
|
||||
cur->get(); // pin discoveree
|
||||
lru->lru_touch(cur); // touch discoveree
|
||||
|
||||
mds->messenger->send_message(new MDiscover(whoami, have_clean, want),
|
||||
MSG_ADDR_MDS(dauth), MDS_PORT_CACHE,
|
||||
@ -516,7 +548,7 @@ int MDCache::path_traverse(string& path,
|
||||
}
|
||||
}
|
||||
} else {
|
||||
cout << cur->inode.ino << " not a dir " << cur->inode.isdir << endl;
|
||||
cout << *cur << " not a dir " << cur->inode.isdir << endl;
|
||||
return -ENOTDIR;
|
||||
}
|
||||
|
||||
@ -539,7 +571,7 @@ int MDCache::handle_discover(MDiscover *dis)
|
||||
// this is a result
|
||||
|
||||
if (dis->want == 0) {
|
||||
cout << "got root" << endl;
|
||||
cout << "mds" << mds->get_nodeid() << " handle_discover got root" << endl;
|
||||
|
||||
CInode *root = new CInode();
|
||||
root->inode = dis->trace[0].inode;
|
||||
@ -574,13 +606,14 @@ int MDCache::handle_discover(MDiscover *dis)
|
||||
// traverse to start point
|
||||
vector<CInode*> trav;
|
||||
|
||||
cout << "mds" << mds->get_nodeid() << " handle_discover got result" << endl;
|
||||
|
||||
int r = path_traverse(dis->basepath, trav, NULL, MDS_TRAVERSE_FAIL); // FIXME BUG
|
||||
if (r != 0) throw "wtf";
|
||||
|
||||
CInode *cur = trav[trav.size()-1];
|
||||
CInode *start = cur;
|
||||
|
||||
cur->put(); // unpin
|
||||
|
||||
list<Context*> finished;
|
||||
|
||||
@ -609,6 +642,7 @@ int MDCache::handle_discover(MDiscover *dis)
|
||||
|
||||
add_inode( in );
|
||||
link_inode( cur, (*dis->want)[i], in );
|
||||
cout << " adding " << *in << endl;
|
||||
}
|
||||
|
||||
cur->dir->take_waiting((*dis->want)[i],
|
||||
@ -629,6 +663,9 @@ int MDCache::handle_discover(MDiscover *dis)
|
||||
}
|
||||
|
||||
} else {
|
||||
|
||||
cout << "mds" << mds->get_nodeid() << " handle_discover from mds" << dis->asker << " current_need() " << dis->current_need() << endl;
|
||||
|
||||
// this is a request
|
||||
if (!root) {
|
||||
open_root(new C_MDS_RetryMessage(mds, dis));
|
||||
@ -714,7 +751,7 @@ int MDCache::handle_discover(MDiscover *dis)
|
||||
throw "implement me";
|
||||
} else {
|
||||
// readdir
|
||||
cout << "mds" << whoami << " incomplete dir contents for " << cur->inode.ino << ", fetching" << endl;
|
||||
cout << "mds" << whoami << " incomplete dir contents for " << *cur << ", fetching" << endl;
|
||||
mds->mdstore->fetch_dir(cur, new C_MDS_RetryMessage(mds, dis));
|
||||
return 0;
|
||||
}
|
||||
@ -737,7 +774,7 @@ int MDCache::send_inode_updates(CInode *in)
|
||||
{
|
||||
set<int>::iterator it;
|
||||
for (it = in->cached_by.begin(); it != in->cached_by.end(); it++) {
|
||||
cout << "mds" << mds->get_nodeid() << " sending inode_update on " << in->inode.ino << " to " << *it << endl;
|
||||
cout << "mds" << mds->get_nodeid() << " sending inode_update on " << *in << " to " << *it << endl;
|
||||
mds->messenger->send_message(new MInodeUpdate(in->inode,
|
||||
in->cached_by),
|
||||
MSG_ADDR_MDS(*it), MDS_PORT_CACHE,
|
||||
@ -763,7 +800,7 @@ void MDCache::handle_inode_update(MInodeUpdate *m)
|
||||
}
|
||||
|
||||
// update!
|
||||
cout << "mds" << mds->get_nodeid() << " inode_update on " << m->inode.ino << endl;
|
||||
cout << "mds" << mds->get_nodeid() << " inode_update on " << *in << endl;
|
||||
|
||||
in->inode = m->inode;
|
||||
in->cached_by = m->cached_by;
|
||||
@ -791,10 +828,11 @@ void MDCache::handle_inode_expire(MInodeExpire *m)
|
||||
// remove
|
||||
cout << "mds" << mds->get_nodeid() << " inode_expire on " << m->ino << " from mds" << m->get_source() << endl;
|
||||
|
||||
if (in->cached_by.count(m->get_source()))
|
||||
if (!in->cached_by.empty()) {
|
||||
in->cached_by.erase(m->get_source());
|
||||
if (in->cached_by.empty())
|
||||
in->put();
|
||||
if (in->cached_by.empty())
|
||||
in->put();
|
||||
}
|
||||
|
||||
delete m;
|
||||
}
|
||||
@ -803,11 +841,12 @@ void MDCache::handle_inode_expire(MInodeExpire *m)
|
||||
int MDCache::send_dir_updates(CDir *dir, int except)
|
||||
{
|
||||
int whoami = mds->get_nodeid();
|
||||
set<int>::iterator it;
|
||||
for (it = dir->inode->cached_by.begin(); it != dir->inode->cached_by.end(); it++) {
|
||||
for (set<int>::iterator it = dir->inode->cached_by.begin();
|
||||
it != dir->inode->cached_by.end();
|
||||
it++) {
|
||||
if (*it == whoami) continue;
|
||||
if (*it == except) continue;
|
||||
cout << "mds" << whoami << " sending dir_update on " << dir->inode->inode.ino << " to " << *it << endl;
|
||||
cout << "mds" << whoami << " sending dir_update on " << *(dir->inode) << " to " << *it << endl;
|
||||
mds->messenger->send_message(new MDirUpdate(dir->inode->inode.ino,
|
||||
dir->dir_auth,
|
||||
dir->dir_rep,
|
||||
@ -865,7 +904,7 @@ void MDCache::handle_inode_sync_start(MInodeSyncStart *m)
|
||||
// we shouldn't be authoritative...
|
||||
assert(m->authority != mds->get_nodeid());
|
||||
|
||||
cout << "mds" << mds->get_nodeid() << " sync_start " << in->inode.ino << ", sending ack" << endl;
|
||||
cout << "mds" << mds->get_nodeid() << " sync_start " << *in << ", sending ack" << endl;
|
||||
|
||||
// lock it
|
||||
in->get();
|
||||
@ -1015,9 +1054,13 @@ void MDCache::export_dir(CInode *in,
|
||||
{
|
||||
if (!in->dir) in->dir = new CDir(in);
|
||||
|
||||
string path;
|
||||
in->make_path(path);
|
||||
cout << "mds" << mds->get_nodeid() << " export_dir " << path << " to " << dest << ", freezing" << endl;
|
||||
if (!in->parent) {
|
||||
cout << "i won't export root" << endl;
|
||||
throw "asdf";
|
||||
return;
|
||||
}
|
||||
|
||||
cout << "mds" << mds->get_nodeid() << " export_dir " << *in << " to " << dest << ", freezing" << endl;
|
||||
|
||||
if (in->dir->is_frozen()) {
|
||||
cout << " can't export, frozen!" << endl;
|
||||
@ -1039,29 +1082,65 @@ void MDCache::export_dir(CInode *in,
|
||||
in->dir->freeze(new C_MDS_ExportFreeze(mds, in, dest, pop));
|
||||
}
|
||||
|
||||
|
||||
void MDCache::export_dir_frozen(CInode *in,
|
||||
int dest,
|
||||
double pop)
|
||||
{
|
||||
// subtree is now frozen!
|
||||
string path;
|
||||
in->make_path(path);
|
||||
cout << "mds" << mds->get_nodeid() << " export_dir " << path << " to " << dest << ", frozen" << endl;
|
||||
cout << "mds" << mds->get_nodeid() << " export_dir " << *in << " to " << dest << ", frozen" << endl;
|
||||
|
||||
show_imports();
|
||||
|
||||
|
||||
// update imports/exports
|
||||
CInode *containing_import = get_containing_import(in);
|
||||
if (containing_import == in) {
|
||||
cout << " i'm rexporting a previous import" << endl;
|
||||
imports.erase(in);
|
||||
|
||||
in->put(); // unpin, no longer an import
|
||||
|
||||
// discard nested exports (that we're handing off
|
||||
pair<multimap<CInode*,CInode*>::iterator, multimap<CInode*,CInode*>::iterator> p =
|
||||
nested_exports.equal_range(in);
|
||||
while (p.first != p.second) {
|
||||
CInode *nested = (*p.first).second;
|
||||
|
||||
// nested beneath our new export *in; remove!
|
||||
cout << " export " << *nested << " was nested beneath us; removing from export list(s)" << endl;
|
||||
exports.erase(nested);
|
||||
nested_exports.erase(p.first++); // note this increments before call to erase
|
||||
}
|
||||
|
||||
// give it away
|
||||
in->dir->dir_auth = dest;
|
||||
if (imports.count(in)) {
|
||||
cout << " i'm rexporting this previous import" << endl;
|
||||
imports.remove(in);
|
||||
} else {
|
||||
cout << " i'm a subdir nested under import " << *containing_import << endl;
|
||||
exports.insert(in);
|
||||
in.get(); // and must keep it pinned
|
||||
nested_exports.insert(pair<CInode*,CInode*>(containing_import, in));
|
||||
|
||||
in->get(); // i must keep it pinned
|
||||
|
||||
// discard nested exports (that we're handing off)
|
||||
pair<multimap<CInode*,CInode*>::iterator, multimap<CInode*,CInode*>::iterator> p =
|
||||
nested_exports.equal_range(containing_import);
|
||||
while (p.first != p.second) {
|
||||
CInode *nested = (*p.first).second;
|
||||
CInode *containing_export = get_containing_export(nested);
|
||||
if (containing_export == in && nested != in) {
|
||||
// nested beneath our new export *in; remove!
|
||||
cout << " export " << *nested << " was nested beneath us; removing from export list(s)" << endl;
|
||||
exports.erase(nested);
|
||||
nested_exports.erase(p.first++); // note this increments before call to erase
|
||||
} else
|
||||
p.first++;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
// FIXME ****** look for subdirs exported to same target, and merge!
|
||||
|
||||
// give dir away
|
||||
in->dir->dir_auth = dest;
|
||||
|
||||
// build export message
|
||||
MExportDir *req = new MExportDir(in, pop); // include pop
|
||||
|
||||
// fill with relevant cache data
|
||||
@ -1083,7 +1162,7 @@ void MDCache::export_dir_walk(MExportDir *req,
|
||||
if (!idir->dir)
|
||||
return; // we don't ahve anything, obviously
|
||||
|
||||
cout << "export_dir_walk on " << idir->inode.ino << endl;
|
||||
cout << "export_dir_walk on " << *idir << " " << idir->dir->nitems << " items" << endl;
|
||||
|
||||
// dir
|
||||
crope dir_rope;
|
||||
@ -1117,7 +1196,7 @@ void MDCache::export_dir_walk(MExportDir *req,
|
||||
CDir_map_t::iterator it;
|
||||
for (it = idir->dir->begin(); it != idir->dir->end(); it++) {
|
||||
CInode *in = it->second->inode;
|
||||
|
||||
|
||||
// dir?
|
||||
if (in->is_dir())
|
||||
subdirs.push_back(in);
|
||||
@ -1130,8 +1209,13 @@ void MDCache::export_dir_walk(MExportDir *req,
|
||||
istate.inode = in->inode;
|
||||
istate.version = in->version;
|
||||
istate.popularity = in->popularity;
|
||||
istate.ref = in->ref;
|
||||
//istate.ref = in->ref;
|
||||
istate.ncached_by = in->cached_by.size();
|
||||
if (exports.count(in))
|
||||
istate.dir_auth = in->dir->dir_auth;
|
||||
else
|
||||
istate.dir_auth = -1;
|
||||
|
||||
dir_rope.append( (char*)&istate, sizeof(istate) );
|
||||
|
||||
for (set<int>::iterator it = in->cached_by.begin();
|
||||
@ -1163,17 +1247,18 @@ void MDCache::handle_export_dir_ack(MExportDirAck *m)
|
||||
// exported!
|
||||
CInode *in = mds->mdcache->get_inode(m->ino);
|
||||
|
||||
string path;
|
||||
in->make_path(path);
|
||||
cout << "mds" << mds->get_nodeid() << " export_dir_ack " << path << endl;
|
||||
cout << "mds" << mds->get_nodeid() << " export_dir_ack " << *in << endl;
|
||||
|
||||
// remove the metadata from the cache
|
||||
if (in->dir)
|
||||
export_dir_purge( in );
|
||||
|
||||
// unfreeze
|
||||
cout << "mds" << mds->get_nodeid() << " export_dir_ack " << *in << " unfrozen" << endl;
|
||||
in->dir->unfreeze();
|
||||
|
||||
show_imports();
|
||||
|
||||
// done
|
||||
delete m;
|
||||
}
|
||||
@ -1182,28 +1267,40 @@ void MDCache::handle_export_dir_ack(MExportDirAck *m)
|
||||
// called by handle_expirt_dir_ack
|
||||
void MDCache::export_dir_purge(CInode *idir)
|
||||
{
|
||||
cout << "export_dir_purge on " << idir->inode.ino << endl;
|
||||
cout << "export_dir_purge on " << *idir << endl;
|
||||
|
||||
CDir_map_t::iterator it;
|
||||
for (it = idir->dir->begin(); it != idir->dir->end(); it++) {
|
||||
CDir_map_t::iterator it = idir->dir->begin();
|
||||
while (it != idir->dir->end()) {
|
||||
CInode *in = it->second->inode;
|
||||
it++;
|
||||
|
||||
if (in->is_dir() && in->dir)
|
||||
export_dir_purge(in);
|
||||
|
||||
|
||||
// dir incomplete!
|
||||
in->parent->dir->state_clear(CDIR_MASK_COMPLETE);
|
||||
|
||||
cout << "mds" << mds->get_nodeid() << " deleting " << in->inode.ino << " " << in << endl;
|
||||
remove_inode(in);
|
||||
delete in;
|
||||
if (!in->cached_by.empty()) {
|
||||
in->cached_by.clear(); // only get to do this once, because we're newly non-authoritative.
|
||||
in->put();
|
||||
}
|
||||
|
||||
if (in->lru_expireable) {
|
||||
lru->lru_remove(in);
|
||||
cout << "mds" << mds->get_nodeid() << " export_dir_purge deleting " << *in << " " << in << endl;
|
||||
remove_inode(in);
|
||||
delete in;
|
||||
} else {
|
||||
cout << "mds" << mds->get_nodeid() << " export_dir_purge not deleting non-expireable " << *in << " " << in << endl;
|
||||
}
|
||||
}
|
||||
|
||||
cout << "export_dir_purge on " << idir->inode.ino << " done" << endl;
|
||||
cout << "export_dir_purge on " << *idir << " done" << endl;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
// IMPORTS
|
||||
|
||||
|
||||
@ -1211,6 +1308,8 @@ void MDCache::handle_export_dir(MExportDir *m)
|
||||
{
|
||||
cout << "mds" << mds->get_nodeid() << " import_dir " << m->path << endl;
|
||||
|
||||
show_imports();
|
||||
|
||||
vector<CInode*> trav;
|
||||
|
||||
int r = path_traverse(m->path, trav, m, MDS_TRAVERSE_DISCOVER); // FIXME BUG
|
||||
@ -1219,30 +1318,50 @@ void MDCache::handle_export_dir(MExportDir *m)
|
||||
|
||||
CInode *in = trav[trav.size()-1];
|
||||
|
||||
in->get(); // pin for the import process only.
|
||||
|
||||
if (!in->dir) in->dir = new CDir(in);
|
||||
|
||||
// it's mine, now!
|
||||
in->dir->dir_auth = mds->get_nodeid();
|
||||
|
||||
CInode *containing_import;
|
||||
if (exports.count(in)) {
|
||||
// reimporting
|
||||
cout << " i'm reimporting this dir!" << endl;
|
||||
exports.remove(in);
|
||||
exports.erase(in);
|
||||
|
||||
in->put(); // unpin, no longer an export
|
||||
|
||||
containing_import = get_containing_import(in);
|
||||
cout << " it is nested under import " << containing_import << endl;
|
||||
for (pair< multimap<CInode*,CInode*>::iterator, multimap<CInode*,CInode*>::iterator > p =
|
||||
nested_exports.equal_range( containing_import );
|
||||
p.first != p.second;
|
||||
p.first++) {
|
||||
if ((*p.first).second == in) {
|
||||
nested_exports.erase(p.first);
|
||||
break;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// new import
|
||||
imports.insert(in);
|
||||
in.get(); // must keep it pinned
|
||||
|
||||
in->get(); // must keep it pinned
|
||||
|
||||
containing_import = in; // imported exports nested under *in
|
||||
}
|
||||
|
||||
|
||||
// **** FIXME **** look for subdirs that i reimported, and merge.
|
||||
|
||||
|
||||
// add this crap to my cache
|
||||
const char *p = m->state.c_str();
|
||||
for (int i = 0; i < m->ndirs; i++)
|
||||
import_dir_block(p);
|
||||
import_dir_block(p, containing_import);
|
||||
|
||||
// can i simplify dir_auth?
|
||||
if (in->authority(mds->get_cluster()) == in->dir->dir_auth)
|
||||
in->dir->dir_auth = CDIR_AUTH_PARENT;
|
||||
|
||||
// ignore "frozen" state of the main dir; it's from the authority
|
||||
in->dir->state_clear(CDIR_MASK_FROZEN);
|
||||
|
||||
@ -1268,14 +1387,19 @@ void MDCache::handle_export_dir(MExportDir *m)
|
||||
// tell everyone the news
|
||||
send_dir_updates(in->dir, m->get_source());
|
||||
|
||||
in->put(); // import done, unpin.
|
||||
|
||||
show_imports();
|
||||
|
||||
// done
|
||||
delete m;
|
||||
}
|
||||
|
||||
void MDCache::import_dir_block(pchar& p)
|
||||
void MDCache::import_dir_block(pchar& p, CInode *containing_import)
|
||||
{
|
||||
// set up dir
|
||||
Dir_Export_State_t *dstate = (Dir_Export_State_t*)p;
|
||||
cout << " import_dir_block " << dstate->ino << " " << dstate->nitems << " items" << endl;
|
||||
CInode *idir = get_inode(dstate->ino);
|
||||
assert(idir);
|
||||
|
||||
@ -1301,24 +1425,92 @@ void MDCache::import_dir_block(pchar& p)
|
||||
|
||||
// inode
|
||||
Inode_Export_State_t *istate = (Inode_Export_State_t*)p;
|
||||
CInode *in = new CInode;
|
||||
in->inode = istate->inode;
|
||||
CInode *in = get_inode(istate->inode.ino);
|
||||
if (!in) {
|
||||
in = new CInode;
|
||||
in->inode = istate->inode;
|
||||
|
||||
// add
|
||||
add_inode(in);
|
||||
link_inode(idir, dname, in);
|
||||
cout << " adding " << *in << endl;
|
||||
} else {
|
||||
cout << " already had " << *in << endl;
|
||||
in->inode = istate->inode;
|
||||
}
|
||||
|
||||
// update inode state with authoritative info
|
||||
in->version = istate->version;
|
||||
in->popularity = istate->popularity;
|
||||
|
||||
p += sizeof(*istate);
|
||||
|
||||
in->cached_by.clear();
|
||||
for (int nby = istate->ncached_by; nby>0; nby--) {
|
||||
in->cached_by.insert( *((int*)p) );
|
||||
p += sizeof(int);
|
||||
}
|
||||
|
||||
// ... ?
|
||||
// other state? ... ?
|
||||
|
||||
// add
|
||||
add_inode(in);
|
||||
link_inode(idir, dname, in);
|
||||
// was this an export?
|
||||
if (istate->dir_auth >= 0) {
|
||||
cout << "mds" << mds->get_nodeid() << " importing nested export " << *in << " to " << istate->dir_auth << endl;
|
||||
if (!in->dir) in->dir = new CDir(in);
|
||||
|
||||
// to us?
|
||||
if (in->dir->dir_auth == mds->get_nodeid()) {
|
||||
// adjust the import
|
||||
cout << " it was exported to me. adjusting\n";
|
||||
imports.erase(in);
|
||||
|
||||
// move nested exports under containing_import
|
||||
for (pair<multimap<CInode*,CInode*>::iterator, multimap<CInode*,CInode*>::iterator> p =
|
||||
nested_exports.equal_range(in);
|
||||
p.first != p.second;
|
||||
p.first++) {
|
||||
CInode *nested = (*p.first).second;
|
||||
cout << " moving nested export " << nested << " under " << containing_import << endl;
|
||||
nested_exports.insert(pair<CInode*,CInode*>(containing_import, nested));
|
||||
}
|
||||
|
||||
// de-list under old import
|
||||
nested_exports.erase(in);
|
||||
|
||||
in->dir->dir_auth = CDIR_AUTH_PARENT;
|
||||
} else {
|
||||
// add this export
|
||||
in->dir->dir_auth = istate->dir_auth;
|
||||
exports.insert(in);
|
||||
nested_exports.insert(pair<CInode*,CInode*>(containing_import, in));
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
void MDCache::show_imports()
|
||||
{
|
||||
if (imports.size() == 0) {
|
||||
cout << "mds" << mds->get_nodeid() << " no imports/exports" << endl;
|
||||
return;
|
||||
}
|
||||
cout << "mds" << mds->get_nodeid() << " imports/exports:" << endl;
|
||||
for (set<CInode*>::iterator it = imports.begin();
|
||||
it != imports.end();
|
||||
it++) {
|
||||
cout << "mds" << mds->get_nodeid() << " + import " << **it << endl;
|
||||
|
||||
for (pair< multimap<CInode*,CInode*>::iterator, multimap<CInode*,CInode*>::iterator > p =
|
||||
nested_exports.equal_range( *it );
|
||||
p.first != p.second;
|
||||
p.first++) {
|
||||
CInode *exp = (*p.first).second;
|
||||
cout << "mds" << mds->get_nodeid() << " - ex " << *exp << " to " << exp->dir->dir_auth << endl;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -43,8 +43,10 @@ class MDCache {
|
||||
bool opening_root;
|
||||
list<Context*> waiting_for_root;
|
||||
|
||||
set<CInode*> imports;
|
||||
set<CInode*> imports; // includes root (on mds0)
|
||||
set<CInode*> exports;
|
||||
multimap<CInode*,CInode*> nested_exports; // nested exports of (imports|root)
|
||||
|
||||
|
||||
friend class MDBalancer;
|
||||
|
||||
@ -85,6 +87,10 @@ class MDCache {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
CInode *get_containing_import(CInode *in);
|
||||
CInode *get_containing_export(CInode *in);
|
||||
|
||||
|
||||
// adding/removing
|
||||
bool remove_inode(CInode *in);
|
||||
bool add_inode(CInode *in);
|
||||
@ -115,7 +121,7 @@ class MDCache {
|
||||
void export_dir_purge(CInode *idir);
|
||||
|
||||
void handle_export_dir(MExportDir *m);
|
||||
void import_dir_block(pchar& p);
|
||||
void import_dir_block(pchar& p, CInode *containing_import);
|
||||
|
||||
int send_inode_updates(CInode *in);
|
||||
void handle_inode_update(MInodeUpdate *m);
|
||||
@ -144,6 +150,8 @@ class MDCache {
|
||||
if (root) root->dump();
|
||||
}
|
||||
|
||||
void show_imports();
|
||||
|
||||
void dump_to_disk(MDS *m) {
|
||||
if (root) root->dump_to_disk(m);
|
||||
}
|
||||
|
@ -82,9 +82,7 @@ bool MDStore::fetch_dir( CInode *in,
|
||||
|
||||
bool MDStore::fetch_dir_2( int result, char *buf, size_t buflen, CInode *dir)
|
||||
{
|
||||
string dirname;
|
||||
dir->make_path(dirname);
|
||||
cout << "fetch_dir_2 on " << dirname << endl;
|
||||
cout << "fetch_dir_2 on " << *dir << endl;
|
||||
|
||||
// make sure we have a CDir
|
||||
if (dir->dir == NULL) {
|
||||
@ -93,6 +91,7 @@ bool MDStore::fetch_dir_2( int result, char *buf, size_t buflen, CInode *dir)
|
||||
|
||||
// parse buffer contents into cache
|
||||
__uint32_t num = *((__uint32_t*)buf);
|
||||
cout << " " << num << " items" << endl;
|
||||
size_t p = 4;
|
||||
while (p < buflen && num > 0) {
|
||||
// dentry
|
||||
|
@ -15,7 +15,8 @@ typedef struct {
|
||||
inode_t inode;
|
||||
__uint64_t version;
|
||||
DecayCounter popularity;
|
||||
int ref; // fyi for debugging?
|
||||
//int ref; // hmm, fyi for debugging?
|
||||
int dir_auth;
|
||||
|
||||
int ncached_by; // ints follow
|
||||
} Inode_Export_State_t;
|
||||
|
Loading…
Reference in New Issue
Block a user