mirror of
https://github.com/ceph/ceph
synced 2025-02-20 17:37:29 +00:00
frag bugs, migrator bugs (!), other frag+migration work
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1552 29311d96-e01e-0410-9327-a35deaab8ce9
This commit is contained in:
parent
3624203b23
commit
ab4b8a5fca
@ -113,7 +113,7 @@ class frag_t {
|
||||
|
||||
// binary splitting
|
||||
frag_t get_sibling() const {
|
||||
assert(bits() > 0);
|
||||
assert(!is_root());
|
||||
return frag_t(_enc ^ (1 << (bits()-1)));
|
||||
}
|
||||
bool is_left() const {
|
||||
@ -511,7 +511,8 @@ public:
|
||||
bool clean = true;
|
||||
set<frag_t>::iterator p = _set.begin();
|
||||
while (p != _set.end()) {
|
||||
if (_set.count(p->get_sibling())) {
|
||||
if (!p->is_root() &&
|
||||
_set.count(p->get_sibling())) {
|
||||
_set.erase(p->get_sibling());
|
||||
_set.insert(p->parent());
|
||||
_set.erase(p++);
|
||||
|
@ -465,11 +465,15 @@ void CDir::purge_stolen(list<Context*>& waiters)
|
||||
// take waiters _before_ unfreeze...
|
||||
take_waiting(WAIT_ANY, waiters);
|
||||
|
||||
assert(is_frozen_dir());
|
||||
unfreeze_dir();
|
||||
if (is_auth()) {
|
||||
assert(is_frozen_dir());
|
||||
unfreeze_dir();
|
||||
}
|
||||
|
||||
nnull = nitems = 0;
|
||||
|
||||
if (is_auth())
|
||||
clear_replica_map();
|
||||
if (is_dirty()) mark_clean();
|
||||
if (state_test(STATE_EXPORT)) put(PIN_EXPORT);
|
||||
if (state_test(STATE_IMPORTBOUND)) put(PIN_IMPORTBOUND);
|
||||
@ -477,23 +481,23 @@ void CDir::purge_stolen(list<Context*>& waiters)
|
||||
|
||||
if (auth_pins > 0) put(PIN_AUTHPIN);
|
||||
|
||||
assert(get_num_ref() == 0);
|
||||
assert(get_num_ref() == (state_test(STATE_STICKY) ? 1:0));
|
||||
}
|
||||
|
||||
void CDir::init_fragment_pins()
|
||||
{
|
||||
if (!replica_map.empty()) get(PIN_REPLICATED);
|
||||
if (state_test(STATE_DIRTY)) get(PIN_DIRTY);
|
||||
if (state_test(STATE_EXPORT)) get(PIN_EXPORT);
|
||||
if (state_test(STATE_EXPORTBOUND)) get(PIN_EXPORTBOUND);
|
||||
if (state_test(STATE_IMPORTBOUND)) get(PIN_IMPORTBOUND);
|
||||
if (state_test(STATE_STICKY)) get(PIN_STICKY);
|
||||
}
|
||||
|
||||
void CDir::split(int bits, list<CDir*>& subs, list<Context*>& waiters)
|
||||
{
|
||||
dout(10) << "split by " << bits << " bits" << endl;
|
||||
dout(10) << "split by " << bits << " bits on " << *this << endl;
|
||||
|
||||
assert(is_complete());
|
||||
assert(is_complete() || !is_auth());
|
||||
|
||||
list<frag_t> frags;
|
||||
frag.split(bits, frags);
|
||||
@ -505,11 +509,11 @@ void CDir::split(int bits, list<CDir*>& subs, list<Context*>& waiters)
|
||||
for (list<frag_t>::iterator p = frags.begin(); p != frags.end(); ++p) {
|
||||
CDir *f = new CDir(inode, *p, cache, is_auth());
|
||||
f->state_set(state & MASK_STATE_FRAGMENT_KEPT);
|
||||
f->replica_map = replica_map;
|
||||
f->dir_auth = dir_auth;
|
||||
f->init_fragment_pins();
|
||||
f->version = version;
|
||||
f->projected_version = projected_version;
|
||||
f->replica_map = replica_map;
|
||||
f->freeze_dir(0);
|
||||
dout(10) << " subfrag " << *p << " " << *f << endl;
|
||||
subfrags[n++] = f;
|
||||
subs.push_back(f);
|
||||
@ -556,6 +560,7 @@ void CDir::merge(int bits, list<Context*>& waiters)
|
||||
|
||||
// merge state
|
||||
state_set(dir->get_state() & MASK_STATE_FRAGMENT_KEPT);
|
||||
dir_auth = dir->dir_auth;
|
||||
|
||||
dir->purge_stolen(waiters);
|
||||
inode->close_dirfrag(dir->get_frag());
|
||||
@ -1377,7 +1382,6 @@ void CDir::freeze_tree(Context *c)
|
||||
assert(!is_freezing());
|
||||
|
||||
if (is_freezeable()) {
|
||||
dout(10) << "freeze_tree " << *this << endl;
|
||||
_freeze_tree(c);
|
||||
} else {
|
||||
state_set(STATE_FREEZINGTREE);
|
||||
@ -1523,7 +1527,6 @@ void CDir::freeze_dir(Context *c)
|
||||
assert(!is_freezing());
|
||||
|
||||
if (is_freezeable_dir()) {
|
||||
dout(10) << "freeze_dir " << *this << endl;
|
||||
_freeze_dir(c);
|
||||
} else {
|
||||
state_set(STATE_FREEZINGDIR);
|
||||
@ -1538,8 +1541,6 @@ void CDir::_freeze_dir(Context *c)
|
||||
{
|
||||
dout(10) << "_freeze_dir " << *this << endl;
|
||||
|
||||
assert(is_freezeable_dir());
|
||||
|
||||
state_clear(STATE_FREEZINGDIR);
|
||||
state_set(STATE_FROZENDIR);
|
||||
get(PIN_FROZEN);
|
||||
|
@ -125,8 +125,7 @@ class CDir : public MDSCacheObject {
|
||||
STATE_COMPLETE |
|
||||
STATE_EXPORT |
|
||||
STATE_EXPORTBOUND |
|
||||
STATE_IMPORTBOUND |
|
||||
STATE_STICKY);
|
||||
STATE_IMPORTBOUND);
|
||||
|
||||
// -- rep spec --
|
||||
static const int REP_NONE = 0;
|
||||
|
@ -144,6 +144,25 @@ void CInode::get_dirfrags_under(frag_t fg, list<CDir*>& ls)
|
||||
ls.push_back(dirfrags[*p]);
|
||||
}
|
||||
|
||||
/**
 * get_approx_dirfrag -- find an open dirfrag at, under, or above a frag
 *
 * @fg - the frag we would like a dirfrag for
 *
 * Lookup order:
 *  1. the dirfrag for fg itself, as returned by get_dirfrag();
 *  2. otherwise the first dirfrag reported by get_dirfrags_under() for fg;
 *  3. otherwise the dirfrag of the nearest ancestor frag that has one.
 *
 * Returns 0 if none of the above yields a dirfrag, so callers must check
 * the result.
 */
CDir *CInode::get_approx_dirfrag(frag_t fg)
{
  CDir *dir = get_dirfrag(fg);
  if (dir) return dir;

  // find a child?
  list<CDir*> ls;
  get_dirfrags_under(fg, ls);
  if (!ls.empty())
    return ls.front();

  // try parents?
  // stop once we reach the root frag: there is nothing above it to try, and
  // an unconditional loop here would never terminate when no ancestor
  // dirfrag is open.
  while (!fg.is_root()) {
    fg = fg.parent();
    dir = get_dirfrag(fg);
    if (dir) return dir;
  }

  // nothing at, under, or above fg.
  return 0;
}
|
||||
|
||||
void CInode::get_dirfrags(list<CDir*>& ls)
|
||||
{
|
||||
// all dirfrags
|
||||
|
@ -151,6 +151,7 @@ public:
|
||||
return 0;
|
||||
}
|
||||
void get_dirfrags_under(frag_t fg, list<CDir*>& ls);
|
||||
CDir* get_approx_dirfrag(frag_t fg);
|
||||
void get_dirfrags(list<CDir*>& ls);
|
||||
void get_nested_dirfrags(list<CDir*>& ls);
|
||||
void get_subtree_dirfrags(list<CDir*>& ls);
|
||||
|
@ -3208,8 +3208,10 @@ void MDCache::handle_cache_expire(MCacheExpire *m)
|
||||
++p) {
|
||||
// check container?
|
||||
if (p->first.ino > 0) {
|
||||
CDir *con = get_dirfrag(p->first);
|
||||
assert(con); // we had better have this.
|
||||
CInode *coni = get_inode(p->first.ino);
|
||||
assert(coni); // we had better have this.
|
||||
CDir *con = coni->get_approx_dirfrag(p->first.frag);
|
||||
assert(con);
|
||||
|
||||
if (!con->is_auth() ||
|
||||
(con->is_auth() && con->is_exporting() &&
|
||||
@ -3294,20 +3296,31 @@ void MDCache::handle_cache_expire(MCacheExpire *m)
|
||||
pd != p->second.dentries.end();
|
||||
++pd) {
|
||||
dout(0) << " dn expires in dir " << pd->first << endl;
|
||||
CDir *dir = get_dirfrag(pd->first);
|
||||
CInode *diri = get_inode(pd->first.ino);
|
||||
assert(diri);
|
||||
CDir *dir = diri->get_dirfrag(pd->first.frag);
|
||||
|
||||
if (!dir) {
|
||||
dout(0) << " dn expires on " << pd->first << " from " << from << ", don't have it" << endl;
|
||||
assert(dir);
|
||||
}
|
||||
assert(dir->is_auth());
|
||||
dout(0) << " dn expires on " << pd->first << " from " << from << ", must have refragmented" << endl;
|
||||
} else {
|
||||
assert(dir->is_auth());
|
||||
}
|
||||
|
||||
for (map<string,int>::iterator p = pd->second.begin();
|
||||
p != pd->second.end();
|
||||
++p) {
|
||||
int nonce = p->second;
|
||||
CDentry *dn;
|
||||
|
||||
CDentry *dn = dir->lookup(p->first);
|
||||
if (dir) {
|
||||
dn = dir->lookup(p->first);
|
||||
} else {
|
||||
// which dirfrag for this dentry?
|
||||
CDir *dir = diri->get_dirfrag(diri->pick_dirfrag(p->first));
|
||||
assert(dir->is_auth());
|
||||
dn = dir->lookup(p->first);
|
||||
}
|
||||
|
||||
if (!dn)
|
||||
dout(0) << " missing dentry for " << p->first << " in " << *dir << endl;
|
||||
assert(dn);
|
||||
@ -5446,17 +5459,17 @@ void MDCache::handle_dentry_unlink(MDentryUnlink *m)
|
||||
|
||||
|
||||
/**
|
||||
* _refragment_dir -- adjust fragmentation for a directory
|
||||
* adjust_dir_fragments -- adjust fragmentation for a directory
|
||||
*
|
||||
* @diri - directory inode
|
||||
* @basefrag - base fragment
|
||||
* @bits - bit adjustment. positive for split, negative for merge.
|
||||
*/
|
||||
void MDCache::_refragment_dir(CInode *diri, frag_t basefrag, int bits,
|
||||
list<CDir*>& resultfrags,
|
||||
list<Context*>& waiters)
|
||||
void MDCache::adjust_dir_fragments(CInode *diri, frag_t basefrag, int bits,
|
||||
list<CDir*>& resultfrags,
|
||||
list<Context*>& waiters)
|
||||
{
|
||||
dout(10) << "_refragment_dir " << basefrag << " " << bits
|
||||
dout(10) << "adjust_dir_fragments " << basefrag << " " << bits
|
||||
<< " on " << *diri << endl;
|
||||
|
||||
// adjust fragtree
|
||||
@ -5466,12 +5479,45 @@ void MDCache::_refragment_dir(CInode *diri, frag_t basefrag, int bits,
|
||||
CDir *base = diri->get_or_open_dirfrag(this, basefrag);
|
||||
|
||||
if (bits > 0) {
|
||||
if (base)
|
||||
if (base) {
|
||||
CDir *baseparent = base->get_parent_dir();
|
||||
|
||||
base->split(bits, resultfrags, waiters);
|
||||
|
||||
// did i change the subtree map?
|
||||
if (base->is_subtree_root()) {
|
||||
// am i a bound?
|
||||
if (baseparent) {
|
||||
CDir *parent = get_subtree_root(baseparent);
|
||||
assert(subtrees[parent].count(base));
|
||||
subtrees[parent].erase(base);
|
||||
for (list<CDir*>::iterator p = resultfrags.begin();
|
||||
p != resultfrags.end();
|
||||
++p) {
|
||||
subtrees[parent].insert(*p);
|
||||
subtrees[*p].clear(); // new frag is now its own subtree
|
||||
}
|
||||
}
|
||||
|
||||
// adjust my bounds.
|
||||
set<CDir*> bounds;
|
||||
bounds.swap(subtrees[base]);
|
||||
subtrees.erase(base);
|
||||
for (set<CDir*>::iterator p = bounds.begin();
|
||||
p != bounds.end();
|
||||
++p) {
|
||||
CDir *frag = get_subtree_root((*p)->get_parent_dir());
|
||||
subtrees[frag].insert(*p);
|
||||
}
|
||||
|
||||
show_subtrees(10);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
assert(base);
|
||||
base->merge(bits, waiters);
|
||||
resultfrags.push_back(base);
|
||||
assert(0); // FIXME adjust subtree map! and clean up this code, probably.
|
||||
}
|
||||
}
|
||||
|
||||
@ -5523,6 +5569,11 @@ void MDCache::split_dir(CDir *dir, int bits)
|
||||
|
||||
/*
|
||||
 * initiate the freeze, blocking with an auth_pin.
|
||||
*
|
||||
* some reason(s) we have to freeze:
|
||||
* - on merge, version/projected version are unified from all fragments;
|
||||
* concurrent pipelined updates in the directory will have divergent
|
||||
* versioning... and that's no good.
|
||||
*/
|
||||
void MDCache::fragment_freeze(CInode *diri, list<CDir*>& frags, frag_t basefrag, int bits)
|
||||
{
|
||||
@ -5621,10 +5672,10 @@ void MDCache::fragment_go(CInode *diri, list<CDir*>& startfrags, frag_t basefrag
|
||||
// refragment
|
||||
list<CDir*> resultfrags;
|
||||
list<Context*> waiters;
|
||||
_refragment_dir(diri, basefrag, bits, resultfrags, waiters);
|
||||
adjust_dir_fragments(diri, basefrag, bits, resultfrags, waiters);
|
||||
mds->queue_waiters(waiters);
|
||||
|
||||
// dirty resulting frags
|
||||
// freeze and dirty resulting frags
|
||||
set<int> peers;
|
||||
vector<version_t> pvs;
|
||||
for (list<CDir*>::iterator p = resultfrags.begin();
|
||||
@ -5633,6 +5684,8 @@ void MDCache::fragment_go(CInode *diri, list<CDir*>& startfrags, frag_t basefrag
|
||||
CDir *dir = *p;
|
||||
dout(10) << " result frag " << *dir << endl;
|
||||
|
||||
dir->_freeze_dir();
|
||||
|
||||
// first time only,
|
||||
if (p == resultfrags.begin()) {
|
||||
le->metablob.add_dir_context(dir);
|
||||
@ -5733,7 +5786,7 @@ void MDCache::handle_fragment_notify(MMDSFragmentNotify *notify)
|
||||
list<Context*> waiters;
|
||||
|
||||
// add replica dir (for merge)?
|
||||
// (_refragment_dir expects base to already exist, if non-auth)
|
||||
// (adjust_dir_fragments expects base to already exist, if non-auth)
|
||||
if (notify->get_bits() < 0) {
|
||||
CDirDiscover basedis;
|
||||
int off = 0;
|
||||
@ -5744,17 +5797,9 @@ void MDCache::handle_fragment_notify(MMDSFragmentNotify *notify)
|
||||
|
||||
// refragment
|
||||
list<CDir*> resultfrags;
|
||||
_refragment_dir(diri, notify->get_basefrag(), notify->get_bits(),
|
||||
resultfrags, waiters);
|
||||
adjust_dir_fragments(diri, notify->get_basefrag(), notify->get_bits(),
|
||||
resultfrags, waiters);
|
||||
mds->queue_waiters(waiters);
|
||||
|
||||
// writebehind?
|
||||
if (diri->is_auth()) {
|
||||
LogEvent *le = new EFragment(diri->ino(),
|
||||
notify->get_basefrag(),
|
||||
notify->get_bits());
|
||||
mds->mdlog->submit_entry(le);
|
||||
}
|
||||
}
|
||||
|
||||
delete notify;
|
||||
|
@ -598,9 +598,8 @@ protected:
|
||||
|
||||
// -- fragmenting --
|
||||
private:
|
||||
void _refragment_dir(CInode *diri, frag_t basefrag, int bits,
|
||||
list<CDir*>& frags,
|
||||
list<Context*>& waiters);
|
||||
void adjust_dir_fragments(CInode *diri, frag_t basefrag, int bits,
|
||||
list<CDir*>& frags, list<Context*>& waiters);
|
||||
friend class EFragment;
|
||||
|
||||
public:
|
||||
|
@ -1262,27 +1262,19 @@ void Migrator::handle_export_discover(MExportDirDiscover *m)
|
||||
dout(7) << "handle_export_discover_2 failed to discover or not dir " << m->get_path() << ", NAK" << endl;
|
||||
assert(0); // this shouldn't happen if the auth pins his path properly!!!!
|
||||
}
|
||||
|
||||
CInode *in;
|
||||
if (trace.empty()) {
|
||||
in = cache->get_root();
|
||||
if (!in) {
|
||||
cache->open_root(new C_MDS_RetryMessage(mds, m));
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
in = trace[trace.size()-1]->inode;
|
||||
}
|
||||
|
||||
assert(0); // this shouldn't happen; the get_inode above would have succeeded.
|
||||
}
|
||||
|
||||
// yay
|
||||
|
||||
dout(7) << "handle_export_discover have " << df << " inode " << *in << endl;
|
||||
|
||||
import_state[m->get_dirfrag()] = IMPORT_DISCOVERED;
|
||||
|
||||
// pin inode in the cache (for now)
|
||||
assert(in->is_dir());
|
||||
in->get(CInode::PIN_IMPORTING);
|
||||
|
||||
|
||||
// reply
|
||||
dout(7) << " sending export_discover_ack on " << *in << endl;
|
||||
mds->send_message_mds(new MExportDirDiscoverAck(df),
|
||||
@ -1315,7 +1307,8 @@ void Migrator::handle_export_prep(MExportDirPrep *m)
|
||||
|
||||
// make sure we didn't abort
|
||||
if (import_state.count(m->get_dirfrag()) == 0 ||
|
||||
import_state[m->get_dirfrag()] != IMPORT_DISCOVERED ||
|
||||
(import_state[m->get_dirfrag()] != IMPORT_DISCOVERED &&
|
||||
import_state[m->get_dirfrag()] != IMPORT_PREPPING) ||
|
||||
import_peer[m->get_dirfrag()] != oldauth) {
|
||||
dout(10) << "handle_export_prep import has aborted, dropping" << endl;
|
||||
delete m;
|
||||
|
@ -898,7 +898,7 @@ void EFragment::replay(MDS *mds)
|
||||
|
||||
list<CDir*> resultfrags;
|
||||
list<Context*> waiters;
|
||||
mds->mdcache->_refragment_dir(in, basefrag, bits, resultfrags, waiters);
|
||||
mds->mdcache->adjust_dir_fragments(in, basefrag, bits, resultfrags, waiters);
|
||||
|
||||
metablob.replay(mds);
|
||||
}
|
||||
|
@ -379,7 +379,8 @@ int FakeMessenger::send_message(Message *m, entity_inst_t inst, int port, int fr
|
||||
// queue
|
||||
if (directory.count(inst.addr) &&
|
||||
shutdown_set.count(inst.addr) == 0) {
|
||||
dout(1) << "--> " << get_myname() << " -> " << inst.name << " --- " << *m << endl;
|
||||
dout(1) << "--> " << get_myname() << " -> " << inst.name << " --- " << *m << " -- " << m
|
||||
<< endl;
|
||||
directory[inst.addr]->queue_incoming(m);
|
||||
} else {
|
||||
dout(0) << "--> " << get_myname() << " -> " << inst.name << " " << *m << " -- " << m
|
||||
|
Loading…
Reference in New Issue
Block a user