Merge pull request #13227 from ukernel/wip-multimds-misc

mds: misc multimds fixes

Reviewed-by: John Spray <john.spray@redhat.com>
This commit is contained in:
John Spray 2017-02-26 11:16:40 +00:00 committed by GitHub
commit 477ddea8a8
16 changed files with 397 additions and 219 deletions

View File

@ -315,6 +315,7 @@ public:
_mark_dirty(ls);
if (!replica_map.empty())
get(PIN_REPLICATED);
replica_nonce = 0;
}
// -- locking --

View File

@ -2363,7 +2363,8 @@ void CDir::decode_import(bufferlist::iterator& blp, utime_t now, LogSegment *ls)
unsigned s;
::decode(s, blp);
state &= MASK_STATE_IMPORT_KEPT;
state |= (s & MASK_STATE_EXPORTED);
state_set(STATE_AUTH | (s & MASK_STATE_EXPORTED));
if (is_dirty()) {
get(PIN_DIRTY);
_mark_dirty(ls);

View File

@ -1421,6 +1421,7 @@ void CInode::encode_lock_state(int type, bufferlist& bl)
case CEPH_LOCK_IFILE:
if (is_auth()) {
::encode(inode.version, bl);
::encode(inode.ctime, bl);
::encode(inode.mtime, bl);
::encode(inode.atime, bl);
::encode(inode.time_warp_seq, bl);
@ -1504,11 +1505,13 @@ void CInode::encode_lock_state(int type, bufferlist& bl)
case CEPH_LOCK_IXATTR:
::encode(inode.version, bl);
::encode(inode.ctime, bl);
::encode(xattrs, bl);
break;
case CEPH_LOCK_ISNAP:
::encode(inode.version, bl);
::encode(inode.ctime, bl);
encode_snap(bl);
break;
@ -1520,6 +1523,7 @@ void CInode::encode_lock_state(int type, bufferlist& bl)
case CEPH_LOCK_IPOLICY:
if (inode.is_dir()) {
::encode(inode.version, bl);
::encode(inode.ctime, bl);
::encode(inode.layout, bl, mdcache->mds->mdsmap->get_up_features());
::encode(inode.quota, bl);
}
@ -1617,6 +1621,8 @@ void CInode::decode_lock_state(int type, bufferlist& bl)
case CEPH_LOCK_IFILE:
if (!is_auth()) {
::decode(inode.version, p);
::decode(tm, p);
if (inode.ctime < tm) inode.ctime = tm;
::decode(inode.mtime, p);
::decode(inode.atime, p);
::decode(inode.time_warp_seq, p);
@ -1751,12 +1757,16 @@ void CInode::decode_lock_state(int type, bufferlist& bl)
case CEPH_LOCK_IXATTR:
::decode(inode.version, p);
::decode(tm, p);
if (inode.ctime < tm) inode.ctime = tm;
::decode(xattrs, p);
break;
case CEPH_LOCK_ISNAP:
{
::decode(inode.version, p);
::decode(tm, p);
if (inode.ctime < tm) inode.ctime = tm;
snapid_t seq = 0;
if (snaprealm)
seq = snaprealm->srnode.seq;
@ -1774,6 +1784,8 @@ void CInode::decode_lock_state(int type, bufferlist& bl)
case CEPH_LOCK_IPOLICY:
if (inode.is_dir()) {
::decode(inode.version, p);
::decode(tm, p);
if (inode.ctime < tm) inode.ctime = tm;
::decode(inode.layout, p);
::decode(inode.quota, p);
}
@ -3052,18 +3064,6 @@ int CInode::encode_inodestat(bufferlist& bl, Session *session,
bool valid = true;
// do not issue caps if inode differs from readdir snaprealm
SnapRealm *realm = find_snaprealm();
bool no_caps = session->is_stale() ||
(realm && dir_realm && realm != dir_realm) ||
is_frozen() || state_test(CInode::STATE_EXPORTINGCAPS);
if (no_caps)
dout(20) << "encode_inodestat no caps"
<< (session->is_stale()?", session stale ":"")
<< ((realm && dir_realm && realm != dir_realm)?", snaprealm differs ":"")
<< (state_test(CInode::STATE_EXPORTINGCAPS)?", exporting caps":"")
<< (is_frozen()?", frozen inode":"") << dendl;
// pick a version!
inode_t *oi = &inode;
inode_t *pi = get_projected_inode();
@ -3103,6 +3103,23 @@ int CInode::encode_inodestat(bufferlist& bl, Session *session,
}
}
SnapRealm *realm = find_snaprealm();
bool no_caps = !valid ||
session->is_stale() ||
(dir_realm && realm != dir_realm) ||
is_frozen() ||
state_test(CInode::STATE_EXPORTINGCAPS);
if (no_caps)
dout(20) << "encode_inodestat no caps"
<< (!valid?", !valid":"")
<< (session->is_stale()?", session stale ":"")
<< ((dir_realm && realm != dir_realm)?", snaprealm differs ":"")
<< (is_frozen()?", frozen inode":"")
<< (state_test(CInode::STATE_EXPORTINGCAPS)?", exporting caps":"")
<< dendl;
// "fake" a version that is old (stable) version, +1 if projected.
version_t version = (oi->version * 2) + is_projected();
@ -3224,7 +3241,7 @@ int CInode::encode_inodestat(bufferlist& bl, Session *session,
ecap.mseq = 0;
ecap.realm = 0;
} else {
if (!no_caps && valid && !cap) {
if (!no_caps && !cap) {
// add a new cap
cap = add_client_cap(client, session, realm);
if (is_auth()) {
@ -3235,12 +3252,26 @@ int CInode::encode_inodestat(bufferlist& bl, Session *session,
}
}
if (!no_caps && valid && cap) {
int issue = 0;
if (!no_caps && cap) {
int likes = get_caps_liked();
int allowed = get_caps_allowed_for_client(session, file_i);
int issue = (cap->wanted() | likes) & allowed;
issue = (cap->wanted() | likes) & allowed;
cap->issue_norevoke(issue);
issue = cap->pending();
dout(10) << "encode_inodestat issuing " << ccap_string(issue)
<< " seq " << cap->get_last_seq() << dendl;
} else if (cap && cap->is_new() && !dir_realm) {
// alway issue new caps to client, otherwise the caps get lost
assert(cap->is_stale());
issue = cap->pending() | CEPH_CAP_PIN;
cap->issue_norevoke(issue);
dout(10) << "encode_inodestat issuing " << ccap_string(issue)
<< " seq " << cap->get_last_seq()
<< "(stale|new caps)" << dendl;
}
if (issue) {
cap->set_last_issue();
cap->set_last_issue_stamp(ceph_clock_now());
cap->clear_new();
@ -3248,13 +3279,9 @@ int CInode::encode_inodestat(bufferlist& bl, Session *session,
ecap.wanted = cap->wanted();
ecap.cap_id = cap->get_cap_id();
ecap.seq = cap->get_last_seq();
dout(10) << "encode_inodestat issuing " << ccap_string(issue)
<< " seq " << cap->get_last_seq() << dendl;
ecap.mseq = cap->get_mseq();
ecap.realm = realm->inode->ino();
} else {
if (cap)
cap->clear_new();
ecap.cap_id = 0;
ecap.caps = 0;
ecap.seq = 0;
@ -3595,7 +3622,8 @@ void CInode::decode_import(bufferlist::iterator& p,
unsigned s;
::decode(s, p);
state |= (s & MASK_STATE_EXPORTED);
state_set(STATE_AUTH | (s & MASK_STATE_EXPORTED));
if (is_dirty()) {
get(PIN_DIRTY);
_mark_dirty(ls);
@ -3610,6 +3638,7 @@ void CInode::decode_import(bufferlist::iterator& p,
::decode(replica_map, p);
if (!replica_map.empty())
get(PIN_REPLICATED);
replica_nonce = 0;
// decode fragstat info on bounding cdirs
bufferlist bounding;

View File

@ -107,6 +107,7 @@ public:
const static unsigned STATE_STALE = (1<<0);
const static unsigned STATE_NEW = (1<<1);
const static unsigned STATE_IMPORTING = (1<<2);
Capability(CInode *i = NULL, uint64_t id = 0, client_t c = 0) :
@ -253,6 +254,9 @@ public:
bool is_new() { return state & STATE_NEW; }
void mark_new() { state |= STATE_NEW; }
void clear_new() { state &= ~STATE_NEW; }
bool is_importing() { return state & STATE_IMPORTING; }
void mark_importing() { state |= STATE_IMPORTING; }
void clear_importing() { state &= ~STATE_IMPORTING; }
CInode *get_inode() { return inode; }
client_t get_client() const { return client; }

View File

@ -330,8 +330,16 @@ bool Locker::acquire_locks(MDRequestRef& mdr,
dout(10) << " must authpin " << *object << dendl;
if (mdr->is_auth_pinned(object))
continue;
if (mdr->is_auth_pinned(object)) {
if (object != (MDSCacheObject*)auth_pin_freeze)
continue;
if (mdr->more()->is_remote_frozen_authpin) {
if (mdr->more()->rename_inode == auth_pin_freeze)
continue;
// unfreeze auth pin for the wrong inode
mustpin_remote[mdr->more()->rename_inode->authority().first].size();
}
}
if (!object->is_auth()) {
if (!mdr->locks.empty())
@ -531,7 +539,8 @@ bool Locker::acquire_locks(MDRequestRef& mdr,
remote_wrlock_start(*p, (*remote_wrlocks)[*p], mdr);
goto out;
}
if (!wrlock_start(*p, mdr))
// nowait if we have already gotten remote wrlock
if (!wrlock_start(*p, mdr, need_remote_wrlock))
goto out;
dout(10) << " got wrlock on " << **p << " " << *(*p)->get_parent() << dendl;
}

View File

@ -2512,12 +2512,12 @@ ESubtreeMap *MDCache::create_subtree_map()
ESubtreeMap *le = new ESubtreeMap();
mds->mdlog->_start_entry(le);
CDir *mydir = 0;
if (myin) {
mydir = myin->get_dirfrag(frag_t());
}
map<dirfrag_t, CDir*> dirs_to_add;
list<CDir*> maybe;
if (myin) {
CDir* mydir = myin->get_dirfrag(frag_t());
dirs_to_add[mydir->dirfrag()] = mydir;
}
// include all auth subtrees, and their bounds.
// and a spanning tree to tie it to the root.
@ -2544,21 +2544,9 @@ ESubtreeMap *MDCache::create_subtree_map()
dout(15) << " subtree " << *dir << dendl;
}
dirs_to_add[dir->dirfrag()] = dir;
le->subtrees[dir->dirfrag()].clear();
if (dir->get_dir_auth().second != CDIR_AUTH_UNKNOWN &&
le->ambiguous_subtrees.count(dir->dirfrag()) == 0 &&
p->second.empty()) {
dout(10) << " maybe journal " << *dir << dendl;
maybe.push_back(dir);
continue;
}
le->metablob.add_dir_context(dir, EMetaBlob::TO_ROOT);
le->metablob.add_dir(dir, false);
if (mydir == dir)
mydir = NULL;
// bounds
for (set<CDir*>::iterator q = p->second.begin();
@ -2566,9 +2554,8 @@ ESubtreeMap *MDCache::create_subtree_map()
++q) {
CDir *bound = *q;
dout(15) << " subtree bound " << *bound << dendl;
dirs_to_add[bound->dirfrag()] = bound;
le->subtrees[dir->dirfrag()].push_back(bound->dirfrag());
le->metablob.add_dir_context(bound, EMetaBlob::TO_ROOT);
le->metablob.add_dir(bound, false);
}
}
@ -2598,11 +2585,10 @@ ESubtreeMap *MDCache::create_subtree_map()
continue;
}
bool journal_dir = false;
if (dir->is_subtree_root()) {
if (le->subtrees.count(newparent->dirfrag()) &&
oldparent->get_dir_auth() != newparent->get_dir_auth())
journal_dir = true;
dirs_to_add[dir->dirfrag()] = dir;
// children are fine. change parent.
_move_subtree_map_bound(dir->dirfrag(), oldparent->dirfrag(), newparent->dirfrag(),
le->subtrees);
@ -2613,13 +2599,13 @@ ESubtreeMap *MDCache::create_subtree_map()
dout(10) << " creating subtree for " << dir->dirfrag() << dendl;
// if oldparent is auth, subtree is mine; include it.
if (le->subtrees.count(oldparent->dirfrag())) {
dirs_to_add[dir->dirfrag()] = dir;
le->subtrees[dir->dirfrag()].clear();
journal_dir = true;
}
// if newparent is auth, subtree is a new bound
if (le->subtrees.count(newparent->dirfrag())) {
dirs_to_add[dir->dirfrag()] = dir;
le->subtrees[newparent->dirfrag()].push_back(dir->dirfrag()); // newparent is auth; new bound
journal_dir = true;
}
newparent = dir;
}
@ -2634,10 +2620,6 @@ ESubtreeMap *MDCache::create_subtree_map()
le->subtrees);
}
}
if (journal_dir) {
le->metablob.add_dir_context(dir, EMetaBlob::TO_ROOT);
le->metablob.add_dir(dir, false);
}
}
}
}
@ -2658,6 +2640,7 @@ ESubtreeMap *MDCache::create_subtree_map()
dout(10) << "simplify: " << p->first << " swallowing " << b << " with bounds " << bb << dendl;
for (vector<dirfrag_t>::iterator r = bb.begin(); r != bb.end(); ++r)
p->second.push_back(*r);
dirs_to_add.erase(b);
le->subtrees.erase(b);
p->second.erase(p->second.begin() + i);
} else {
@ -2666,26 +2649,15 @@ ESubtreeMap *MDCache::create_subtree_map()
}
}
for (list<CDir*>::iterator p = maybe.begin(); p != maybe.end(); ++p) {
CDir *dir = *p;
if (le->subtrees.count(dir->dirfrag())) {
// not swallowed by above code
le->metablob.add_dir_context(dir, EMetaBlob::TO_ROOT);
le->metablob.add_dir(dir, false);
} else {
dout(10) << "simplify: not journal " << *dir << dendl;
}
for (auto p : dirs_to_add) {
CDir *dir = p.second;
le->metablob.add_dir_context(dir, EMetaBlob::TO_ROOT);
le->metablob.add_dir(dir, false);
}
dout(15) << " subtrees " << le->subtrees << dendl;
dout(15) << " ambiguous_subtrees " << le->ambiguous_subtrees << dendl;
if (mydir) {
// include my dir
le->metablob.add_dir_context(mydir, EMetaBlob::TO_ROOT);
le->metablob.add_dir(mydir, false);
}
//le->metablob.print(cout);
le->expire_pos = mds->mdlog->journaler->get_expire_pos();
return le;
@ -2748,7 +2720,7 @@ void MDCache::send_slave_resolves()
q != p->second.end();
++q) {
dout(10) << " including uncommitted " << q->first << dendl;
resolves[p->first]->add_slave_request(q->first);
resolves[p->first]->add_slave_request(q->first, false);
}
}
} else {
@ -2758,14 +2730,18 @@ void MDCache::send_slave_resolves()
p != active_requests.end();
++p) {
MDRequestRef& mdr = p->second;
if (!mdr->is_slave() || !mdr->slave_did_prepare())
if (!mdr->is_slave())
continue;
if (!mdr->slave_did_prepare() && !mdr->committing) {
continue;
}
mds_rank_t master = mdr->slave_to_mds;
if (resolve_set.count(master) || is_ambiguous_slave_update(p->first, master)) {
dout(10) << " including uncommitted " << *mdr << dendl;
if (!resolves.count(master))
resolves[master] = new MMDSResolve;
if (mdr->has_more() && mdr->more()->is_inode_exporter) {
if (!mdr->committing &&
mdr->has_more() && mdr->more()->is_inode_exporter) {
// re-send cap exports
CInode *in = mdr->more()->rename_inode;
map<client_t, Capability::Export> cap_map;
@ -2775,7 +2751,7 @@ void MDCache::send_slave_resolves()
::encode(cap_map, bl);
resolves[master]->add_slave_request(p->first, bl);
} else {
resolves[master]->add_slave_request(p->first);
resolves[master]->add_slave_request(p->first, mdr->committing);
}
}
}
@ -2914,6 +2890,9 @@ void MDCache::handle_mds_failure(mds_rank_t who)
if (mdr->slave_to_mds == who) {
if (mdr->slave_did_prepare()) {
dout(10) << " slave request " << *mdr << " uncommitted, will resolve shortly" << dendl;
if (is_ambiguous_slave_update(p->first, mdr->slave_to_mds))
remove_ambiguous_slave_update(p->first, mdr->slave_to_mds);
if (!mdr->more()->waiting_on_slave.empty()) {
assert(mdr->more()->srcdn_auth_mds == mds->get_nodeid());
// will rollback, no need to wait
@ -2923,9 +2902,9 @@ void MDCache::handle_mds_failure(mds_rank_t who)
}
mdr->more()->waiting_on_slave.clear();
}
} else {
} else if (!mdr->committing) {
dout(10) << " slave request " << *mdr << " has no prepare, finishing up" << dendl;
if (mdr->slave_request)
if (mdr->slave_request || mdr->slave_rolling_back())
mdr->aborted = true;
else
finish.push_back(mdr);
@ -3091,7 +3070,6 @@ void MDCache::handle_mds_recovery(mds_rank_t who)
}
}
kick_discovers(who);
kick_open_ino_peers(who);
kick_find_ino_peers(who);
@ -3133,36 +3111,41 @@ void MDCache::handle_resolve(MMDSResolve *m)
// ambiguous slave requests?
if (!m->slave_requests.empty()) {
for (map<metareqid_t, bufferlist>::iterator p = m->slave_requests.begin();
p != m->slave_requests.end();
++p) {
if (uncommitted_masters.count(p->first) && !uncommitted_masters[p->first].safe)
pending_masters.insert(p->first);
}
if (mds->is_clientreplay() || mds->is_active() || mds->is_stopping()) {
for (auto p = m->slave_requests.begin(); p != m->slave_requests.end(); ++p) {
if (uncommitted_masters.count(p->first) && !uncommitted_masters[p->first].safe) {
assert(!p->second.committing);
pending_masters.insert(p->first);
}
}
if (!pending_masters.empty()) {
dout(10) << " still have pending updates, delay processing slave resolve" << dendl;
delayed_resolve[from] = m;
return;
if (!pending_masters.empty()) {
dout(10) << " still have pending updates, delay processing slave resolve" << dendl;
delayed_resolve[from] = m;
return;
}
}
MMDSResolveAck *ack = new MMDSResolveAck;
for (map<metareqid_t, bufferlist>::iterator p = m->slave_requests.begin();
p != m->slave_requests.end();
++p) {
for (auto p = m->slave_requests.begin(); p != m->slave_requests.end(); ++p) {
if (uncommitted_masters.count(p->first)) { //mds->sessionmap.have_completed_request(p->first)) {
// COMMIT
dout(10) << " ambiguous slave request " << *p << " will COMMIT" << dendl;
ack->add_commit(p->first);
if (p->second.committing) {
// already committing, waiting for the OP_COMMITTED slave reply
dout(10) << " already committing slave request " << *p << " noop "<< dendl;
} else {
dout(10) << " ambiguous slave request " << *p << " will COMMIT" << dendl;
ack->add_commit(p->first);
}
uncommitted_masters[p->first].slaves.insert(from); // wait for slave OP_COMMITTED before we log ECommitted
if (p->second.length() > 0) {
if (p->second.inode_caps.length() > 0) {
// slave wants to export caps (rename)
assert(mds->is_resolve());
inodeno_t ino;
map<client_t,Capability::Export> cap_exports;
bufferlist::iterator q = p->second.begin();
bufferlist::iterator q = p->second.inode_caps.begin();
::decode(ino, q);
::decode(cap_exports, q);
@ -3187,6 +3170,7 @@ void MDCache::handle_resolve(MMDSResolve *m)
} else {
// ABORT
dout(10) << " ambiguous slave request " << *p << " will ABORT" << dendl;
assert(!p->second.committing);
ack->add_abort(p->first);
}
}
@ -3315,7 +3299,7 @@ void MDCache::maybe_resolve_finish()
}
dout(10) << "maybe_resolve_finish got all resolves+resolve_acks, done." << dendl;
disambiguate_imports();
disambiguate_my_imports();
finish_committed_masters();
if (resolve_done) {
@ -3503,11 +3487,11 @@ void MDCache::finish_rollback(metareqid_t reqid) {
}
}
void MDCache::disambiguate_imports()
void MDCache::disambiguate_other_imports()
{
dout(10) << "disambiguate_imports" << dendl;
dout(10) << "disambiguate_other_imports" << dendl;
bool is_resolve = mds->is_resolve();
bool recovering = !(mds->is_clientreplay() || mds->is_active() || mds->is_stopping());
// other nodes' ambiguous imports
for (map<mds_rank_t, map<dirfrag_t, vector<dirfrag_t> > >::iterator p = other_ambiguous_imports.begin();
p != other_ambiguous_imports.end();
@ -3520,7 +3504,7 @@ void MDCache::disambiguate_imports()
++q) {
dout(10) << " ambiguous import " << q->first << " bounds " << q->second << dendl;
// an ambiguous import will not race with a refragmentation; it's appropriate to force here.
CDir *dir = get_force_dirfrag(q->first, is_resolve);
CDir *dir = get_force_dirfrag(q->first, recovering);
if (!dir) continue;
if (dir->is_ambiguous_auth() || // works for me_ambig or if i am a surviving bystander
@ -3534,6 +3518,18 @@ void MDCache::disambiguate_imports()
}
}
other_ambiguous_imports.clear();
}
void MDCache::disambiguate_my_imports()
{
dout(10) << "disambiguate_my_imports" << dendl;
if (!mds->is_resolve()) {
assert(my_ambiguous_imports.empty());
return;
}
disambiguate_other_imports();
// my ambiguous imports
mds_authority_t me_ambig(mds->get_nodeid(), mds->get_nodeid());
@ -3565,18 +3561,15 @@ void MDCache::disambiguate_imports()
assert(my_ambiguous_imports.empty());
mds->mdlog->flush();
if (is_resolve) {
// verify all my subtrees are unambiguous!
for (map<CDir*,set<CDir*> >::iterator p = subtrees.begin();
p != subtrees.end();
++p) {
CDir *dir = p->first;
if (dir->is_ambiguous_dir_auth()) {
dout(0) << "disambiguate_imports uh oh, dir_auth is still ambiguous for " << *dir << dendl;
show_subtrees();
}
assert(!dir->is_ambiguous_dir_auth());
// verify all my subtrees are unambiguous!
for (map<CDir*,set<CDir*> >::iterator p = subtrees.begin();
p != subtrees.end();
++p) {
CDir *dir = p->first;
if (dir->is_ambiguous_dir_auth()) {
dout(0) << "disambiguate_imports uh oh, dir_auth is still ambiguous for " << *dir << dendl;
}
assert(!dir->is_ambiguous_dir_auth());
}
show_subtrees();
@ -3940,6 +3933,13 @@ void MDCache::rejoin_send_rejoins()
return;
}
assert(!migrator->is_importing());
assert(!migrator->is_exporting());
if (!mds->is_rejoin()) {
disambiguate_other_imports();
}
map<mds_rank_t, MMDSCacheRejoin*> rejoins;
@ -3976,8 +3976,6 @@ void MDCache::rejoin_send_rejoins()
}
}
assert(!migrator->is_importing());
assert(!migrator->is_exporting());
// check all subtrees
for (map<CDir*, set<CDir*> >::iterator p = subtrees.begin();
@ -4902,6 +4900,7 @@ void MDCache::handle_cache_rejoin_ack(MMDSCacheRejoin *ack)
diri = new CInode(this, false);
diri->inode.ino = p->first.ino;
diri->inode.mode = S_IFDIR;
diri->inode.dir_layout.dl_dir_hash = g_conf->mds_default_dir_hash;
add_inode(diri);
if (MDS_INO_MDSDIR(from) == p->first.ino) {
diri->inode_auth = mds_authority_t(from, CDIR_AUTH_UNKNOWN);
@ -4973,6 +4972,7 @@ void MDCache::handle_cache_rejoin_ack(MMDSCacheRejoin *ack)
in = new CInode(this, false, q->second.first, q->first.snapid);
in->inode.ino = q->second.ino;
in->inode.mode = S_IFDIR;
in->inode.dir_layout.dl_dir_hash = g_conf->mds_default_dir_hash;
add_inode(in);
dout(10) << " add inode " << *in << dendl;
} else if (in->get_parent_dn()) {
@ -6566,11 +6566,20 @@ bool MDCache::trim_dentry(CDentry *dn, map<mds_rank_t, MCacheExpire*>& expiremap
assert(dn->is_auth());
}
// If replica dentry is not readable, it's likely we will receive
// MDentryLink/MDentryUnlink message soon (It's possible we first
// receive a MDentryUnlink message, then MDentryLink message)
// MDentryLink message only replicates an inode, so we should
// avoid trimming the inode's parent dentry. This is because that
// unconnected replicas are problematic for subtree migration.
if (!dn->is_auth() && !dn->lock.can_read(-1) &&
!dn->get_dir()->get_inode()->is_stray())
return true;
// adjust the dir state
// NOTE: we can safely remove a clean, null dentry without effecting
// directory completeness.
// (check this _before_ we unlink the inode, below!)
bool null_dentry = false;
bool clear_complete = false;
if (!(dnl->is_null() && dn->is_clean()))
clear_complete = true;
@ -6587,7 +6596,6 @@ bool MDCache::trim_dentry(CDentry *dn, map<mds_rank_t, MCacheExpire*>& expiremap
return true; // purging stray instead of trimming
} else {
assert(dnl->is_null());
null_dentry = true;
}
if (dn->is_auth()) {
@ -6596,16 +6604,6 @@ bool MDCache::trim_dentry(CDentry *dn, map<mds_rank_t, MCacheExpire*>& expiremap
}
} else {
// notify dentry authority.
// If null replica dentry is not readable, it's likely we will
// receive a MDentryLink message soon. MDentryLink message only
// replicates an inode, so we should avoid trimming the inode's
// parent dentry. This is because that unconnected replicas are
// problematic for subtree migration.
if (null_dentry && !dn->lock.can_read(-1) &&
!dn->get_dir()->get_inode()->is_stray())
return true;
mds_authority_t auth = dn->authority();
for (int p=0; p<2; p++) {
@ -8639,6 +8637,12 @@ void MDCache::do_open_ino_peer(inodeno_t ino, open_ino_info_t& info)
void MDCache::handle_open_ino(MMDSOpenIno *m, int err)
{
if (mds->get_state() < MDSMap::STATE_REJOIN &&
mds->get_want_state() != CEPH_MDS_STATE_REJOIN) {
m->put();
return;
}
dout(10) << "handle_open_ino " << *m << " err " << err << dendl;
inodeno_t ino = m->ino;
@ -8680,11 +8684,10 @@ void MDCache::handle_open_ino_reply(MMDSOpenInoReply *m)
inodeno_t ino = m->ino;
mds_rank_t from = mds_rank_t(m->get_source().num());
if (opening_inodes.count(ino)) {
open_ino_info_t& info = opening_inodes[ino];
if (info.checking == from)
info.checking = MDS_RANK_NONE;
auto it = opening_inodes.find(ino);
if (it != opening_inodes.end() && it->second.checking == from) {
open_ino_info_t& info = it->second;
info.checking = MDS_RANK_NONE;
info.checked.insert(from);
CInode *in = get_inode(ino);
@ -8840,6 +8843,11 @@ void MDCache::_do_find_ino_peer(find_ino_peer_info_t& fip)
void MDCache::handle_find_ino(MMDSFindIno *m)
{
if (mds->get_state() < MDSMap::STATE_REJOIN) {
m->put();
return;
}
dout(10) << "handle_find_ino " << *m << dendl;
MMDSFindInoReply *r = new MMDSFindInoReply(m->tid);
CInode *in = get_inode(m->ino);
@ -9015,8 +9023,15 @@ void MDCache::request_finish(MDRequestRef& mdr)
if (mdr->has_more() && mdr->more()->slave_commit) {
Context *fin = mdr->more()->slave_commit;
mdr->more()->slave_commit = 0;
int ret = mdr->aborted ? -1 : 0;
mdr->aborted = false;
int ret;
if (mdr->aborted) {
mdr->aborted = false;
ret = -1;
mdr->more()->slave_rolling_back = true;
} else {
ret = 0;
mdr->committing = true;
}
fin->complete(ret); // this must re-call request_finish.
return;
}
@ -9703,18 +9718,22 @@ void MDCache::kick_discovers(mds_rank_t who)
void MDCache::handle_discover(MDiscover *dis)
{
mds_rank_t whoami = mds->get_nodeid();
mds_rank_t from = mds_rank_t(dis->get_source_inst().name._num);
mds_rank_t from = mds_rank_t(dis->get_source().num());
assert(from != whoami);
if (mds->get_state() <= MDSMap::STATE_REJOIN) {
mds_rank_t from = mds_rank_t(dis->get_source().num());
if (mds->get_state() < MDSMap::STATE_REJOIN &&
mds->get_want_state() != CEPH_MDS_STATE_REJOIN) {
dis->put();
return;
}
// proceed if requester is in the REJOIN stage, the request is from parallel_fetch().
// delay processing request from survivor because we may not yet choose lock states.
if (mds->get_state() < MDSMap::STATE_REJOIN ||
!mds->mdsmap->is_rejoin(from)) {
if (!mds->mdsmap->is_rejoin(from)) {
dout(0) << "discover_reply not yet active(|still rejoining), delaying" << dendl;
mds->wait_for_active(new C_MDS_RetryMessage(mds, dis));
mds->wait_for_replay(new C_MDS_RetryMessage(mds, dis));
return;
}
}

View File

@ -381,8 +381,9 @@ public:
void wait_for_uncommitted_master(metareqid_t reqid, MDSInternalContextBase *c) {
uncommitted_masters[reqid].waiters.push_back(c);
}
bool have_uncommitted_master(metareqid_t reqid) {
return uncommitted_masters.count(reqid);
bool have_uncommitted_master(metareqid_t reqid, mds_rank_t from) {
auto p = uncommitted_masters.find(reqid);
return p != uncommitted_masters.end() && p->second.slaves.count(from) > 0;
}
void log_master_commit(metareqid_t reqid);
void logged_master_update(metareqid_t reqid);
@ -441,7 +442,8 @@ protected:
void process_delayed_resolve();
void discard_delayed_resolve(mds_rank_t who);
void maybe_resolve_finish();
void disambiguate_imports();
void disambiguate_my_imports();
void disambiguate_other_imports();
void trim_unlinked_inodes();
void add_uncommitted_slave_update(metareqid_t reqid, mds_rank_t master, MDSlaveUpdate*);
void finish_uncommitted_slave_update(metareqid_t reqid, mds_rank_t master);
@ -451,17 +453,19 @@ public:
void remove_inode_recursive(CInode *in);
bool is_ambiguous_slave_update(metareqid_t reqid, mds_rank_t master) {
return ambiguous_slave_updates.count(master) &&
ambiguous_slave_updates[master].count(reqid);
auto p = ambiguous_slave_updates.find(master);
return p != ambiguous_slave_updates.end() && p->second.count(reqid);
}
void add_ambiguous_slave_update(metareqid_t reqid, mds_rank_t master) {
ambiguous_slave_updates[master].insert(reqid);
}
void remove_ambiguous_slave_update(metareqid_t reqid, mds_rank_t master) {
assert(ambiguous_slave_updates[master].count(reqid));
ambiguous_slave_updates[master].erase(reqid);
if (ambiguous_slave_updates[master].empty())
ambiguous_slave_updates.erase(master);
auto p = ambiguous_slave_updates.find(master);
auto q = p->second.find(reqid);
assert(q != p->second.end());
p->second.erase(q);
if (p->second.empty())
ambiguous_slave_updates.erase(p);
}
void add_rollback(metareqid_t reqid, mds_rank_t master) {

View File

@ -295,13 +295,15 @@ void Migrator::export_try_cancel(CDir *dir, bool notify_peer)
// NOTE: state order reversal, warning comes after prepping
case EXPORT_WARNING:
dout(10) << "export state=warning : unpinning bounds, unfreezing, notifying" << dendl;
it->second.state = EXPORT_CANCELLING;
// fall-thru
case EXPORT_PREPPING:
if (state != EXPORT_WARNING)
if (state != EXPORT_WARNING) {
dout(10) << "export state=prepping : unpinning bounds, unfreezing" << dendl;
it->second.state = EXPORT_CANCELLED;
}
it->second.state = EXPORT_CANCELLED;
{
// unpin bounds
set<CDir*> bounds;
@ -329,7 +331,7 @@ void Migrator::export_try_cancel(CDir *dir, bool notify_peer)
case EXPORT_EXPORTING:
dout(10) << "export state=exporting : reversing, and unfreezing" << dendl;
it->second.state = EXPORT_CANCELLED;
it->second.state = EXPORT_CANCELLING;
export_reverse(dir);
break;
@ -338,23 +340,25 @@ void Migrator::export_try_cancel(CDir *dir, bool notify_peer)
dout(10) << "export state=loggingfinish|notifying : ignoring dest failure, we were successful." << dendl;
// leave export_state, don't clean up now.
break;
case EXPORT_CANCELLING:
break;
default:
ceph_abort();
}
// finish clean-up?
if (it->second.state == EXPORT_CANCELLED) {
// wake up any waiters
mds->queue_waiters(it->second.waiting_for_finish);
if (it->second.state == EXPORT_CANCELLING ||
it->second.state == EXPORT_CANCELLED) {
MutationRef mut;
mut.swap(it->second.mut);
MutationRef mut = it->second.mut;
export_state.erase(it);
dir->state_clear(CDir::STATE_EXPORTING);
// send pending import_maps? (these need to go out when all exports have finished.)
cache->maybe_send_pending_resolves();
if (it->second.state == EXPORT_CANCELLED) {
export_state.erase(it);
dir->state_clear(CDir::STATE_EXPORTING);
// send pending import_maps?
cache->maybe_send_pending_resolves();
}
// drop locks
if (state == EXPORT_LOCKING || state == EXPORT_DISCOVERING) {
@ -373,6 +377,17 @@ void Migrator::export_try_cancel(CDir *dir, bool notify_peer)
}
}
void Migrator::export_cancel_finish(CDir *dir)
{
assert(dir->state_test(CDir::STATE_EXPORTING));
dir->state_clear(CDir::STATE_EXPORTING);
// pinned by Migrator::export_notify_abort()
dir->auth_unpin(this);
// send pending import_maps? (these need to go out when all exports have finished.)
cache->maybe_send_pending_resolves();
}
// ==========================================================
// mds failure handling
@ -409,7 +424,8 @@ void Migrator::handle_mds_failure_or_stop(mds_rank_t who)
// - that are going to the failed node
// - that aren't frozen yet (to avoid auth_pin deadlock)
// - they havne't prepped yet (they may need to discover bounds to do that)
if (p->second.peer == who ||
if ((p->second.peer == who &&
p->second.state != EXPORT_CANCELLING) ||
p->second.state == EXPORT_LOCKING ||
p->second.state == EXPORT_DISCOVERING ||
p->second.state == EXPORT_FREEZING ||
@ -418,11 +434,11 @@ void Migrator::handle_mds_failure_or_stop(mds_rank_t who)
dout(10) << "cleaning up export state (" << p->second.state << ")"
<< get_export_statename(p->second.state) << " of " << *dir << dendl;
export_try_cancel(dir);
} else {
} else if (p->second.peer != who) {
// bystander failed.
if (p->second.warning_ack_waiting.erase(who)) {
p->second.notify_ack_waiting.erase(who); // they won't get a notify either.
if (p->second.state == EXPORT_WARNING) {
p->second.notify_ack_waiting.erase(who); // they won't get a notify either.
// exporter waiting for warning acks, let's fake theirs.
dout(10) << "faking export_warning_ack from mds." << who
<< " on " << *dir << " to mds." << p->second.peer
@ -432,13 +448,18 @@ void Migrator::handle_mds_failure_or_stop(mds_rank_t who)
}
}
if (p->second.notify_ack_waiting.erase(who)) {
// exporter is waiting for notify acks, fake it
dout(10) << "faking export_notify_ack from mds." << who
<< " on " << *dir << " to mds." << p->second.peer
<< dendl;
if (p->second.state == EXPORT_NOTIFYING) {
// exporter is waiting for notify acks, fake it
dout(10) << "faking export_notify_ack from mds." << who
<< " on " << *dir << " to mds." << p->second.peer
<< dendl;
if (p->second.notify_ack_waiting.empty())
export_finish(dir);
} else if (p->second.state == EXPORT_CANCELLING) {
if (p->second.notify_ack_waiting.empty()) {
export_state.erase(p);
export_cancel_finish(dir);
}
}
}
}
@ -631,7 +652,9 @@ void Migrator::audit()
CDir *dir = p->first;
if (p->second.state == EXPORT_LOCKING ||
p->second.state == EXPORT_DISCOVERING ||
p->second.state == EXPORT_FREEZING) continue;
p->second.state == EXPORT_FREEZING ||
p->second.state == EXPORT_CANCELLING)
continue;
assert(dir->is_ambiguous_dir_auth());
assert(dir->authority().first == mds->get_nodeid() ||
dir->authority().second == mds->get_nodeid());
@ -902,7 +925,9 @@ void Migrator::export_sessions_flushed(CDir *dir, uint64_t tid)
dout(7) << "export_sessions_flushed " << *dir << dendl;
map<CDir*,export_state_t>::iterator it = export_state.find(dir);
if (it == export_state.end() || it->second.tid != tid) {
if (it == export_state.end() ||
it->second.state == EXPORT_CANCELLING ||
it->second.tid != tid) {
// export must have aborted.
dout(7) << "export must have aborted on " << dir << dendl;
return;
@ -944,7 +969,6 @@ void Migrator::export_frozen(CDir *dir, uint64_t tid)
// .. unwind ..
dir->unfreeze_tree();
dir->state_clear(CDir::STATE_EXPORTING);
mds->queue_waiters(it->second.waiting_for_finish);
mds->send_message_mds(new MExportDirCancel(dir->dirfrag(), it->second.tid), it->second.peer);
@ -1210,6 +1234,7 @@ void Migrator::export_go_synced(CDir *dir, uint64_t tid)
map<CDir*,export_state_t>::iterator it = export_state.find(dir);
if (it == export_state.end() ||
it->second.state == EXPORT_CANCELLING ||
it->second.tid != tid) {
// export must have aborted.
dout(7) << "export must have aborted on " << dir << dendl;
@ -1610,11 +1635,19 @@ void Migrator::export_notify_abort(CDir *dir, set<CDir*>& bounds)
dout(7) << "export_notify_abort " << *dir << dendl;
export_state_t& stat = export_state[dir];
assert(stat.state == EXPORT_CANCELLING);
if (stat.notify_ack_waiting.empty()) {
stat.state = EXPORT_CANCELLED;
return;
}
dir->auth_pin(this);
for (set<mds_rank_t>::iterator p = stat.notify_ack_waiting.begin();
p != stat.notify_ack_waiting.end();
++p) {
MExportDirNotify *notify = new MExportDirNotify(dir->dirfrag(),stat.tid,false,
MExportDirNotify *notify = new MExportDirNotify(dir->dirfrag(),stat.tid, true,
pair<int,int>(mds->get_nodeid(),stat.peer),
pair<int,int>(mds->get_nodeid(),CDIR_AUTH_UNKNOWN));
for (set<CDir*>::iterator i = bounds.begin(); i != bounds.end(); ++i)
@ -1765,22 +1798,29 @@ void Migrator::handle_export_notify_ack(MExportDirNotifyAck *m)
auto export_state_entry = export_state.find(dir);
if (export_state_entry != export_state.end()) {
export_state_t& stat = export_state_entry->second;
if (stat.state == EXPORT_WARNING) {
if (stat.state == EXPORT_WARNING &&
stat.warning_ack_waiting.erase(from)) {
// exporting. process warning.
dout(7) << "handle_export_notify_ack from " << m->get_source()
<< ": exporting, processing warning on " << *dir << dendl;
stat.warning_ack_waiting.erase(from);
if (stat.warning_ack_waiting.empty())
export_go(dir); // start export.
} else if (stat.state == EXPORT_NOTIFYING) {
} else if (stat.state == EXPORT_NOTIFYING &&
stat.notify_ack_waiting.erase(from)) {
// exporting. process notify.
dout(7) << "handle_export_notify_ack from " << m->get_source()
<< ": exporting, processing notify on " << *dir << dendl;
stat.notify_ack_waiting.erase(from);
if (stat.notify_ack_waiting.empty())
export_finish(dir);
} else if (stat.state == EXPORT_CANCELLING &&
m->get_new_auth().second == CDIR_AUTH_UNKNOWN && // not warning ack
stat.notify_ack_waiting.erase(from)) {
dout(7) << "handle_export_notify_ack from " << m->get_source()
<< ": cancelling export, processing notify on " << *dir << dendl;
if (stat.notify_ack_waiting.empty()) {
export_state.erase(export_state_entry);
export_cancel_finish(dir);
}
}
}
else {
@ -1860,9 +1900,6 @@ void Migrator::export_finish(CDir *dir)
// discard delayed expires
cache->discard_delayed_expire(dir);
// queue finishers
mds->queue_waiters(it->second.waiting_for_finish);
MutationRef mut = it->second.mut;
// remove from exporting list, clean up state
export_state.erase(it);
@ -2410,6 +2447,7 @@ void Migrator::import_reverse(CDir *dir)
cur->state_clear(CDir::STATE_AUTH);
cur->remove_bloom();
cur->clear_replica_map();
cur->set_replica_nonce(CDir::EXPORT_NONCE);
if (cur->is_dirty())
cur->mark_clean();
@ -2420,6 +2458,7 @@ void Migrator::import_reverse(CDir *dir)
// dentry
dn->state_clear(CDentry::STATE_AUTH);
dn->clear_replica_map();
dn->set_replica_nonce(CDentry::EXPORT_NONCE);
if (dn->is_dirty())
dn->mark_clean();
@ -2428,6 +2467,7 @@ void Migrator::import_reverse(CDir *dir)
CInode *in = dn->get_linkage()->get_inode();
in->state_clear(CDentry::STATE_AUTH);
in->clear_replica_map();
in->set_replica_nonce(CInode::EXPORT_NONCE);
if (in->is_dirty())
in->mark_clean();
in->clear_dirty_rstat();
@ -2471,7 +2511,7 @@ void Migrator::import_reverse(CDir *dir)
++q) {
Capability *cap = in->get_client_cap(q->first);
assert(cap);
if (cap->is_new())
if (cap->is_importing())
in->remove_client_cap(q->first);
}
in->put(CInode::PIN_IMPORTINGCAPS);
@ -2665,6 +2705,7 @@ void Migrator::import_finish(CDir *dir, bool notify, bool last)
Capability *cap = in->get_client_cap(q->first);
assert(cap);
cap->merge(q->second, true);
cap->clear_importing();
mds->mdcache->do_cap_import(session, in, cap, q->second.cap_id, q->second.seq,
q->second.mseq - 1, it->second.peer, CEPH_CAP_FLAG_AUTH);
}
@ -2764,8 +2805,6 @@ void Migrator::decode_import_inode(CDentry *dn, bufferlist::iterator& blp,
if (!in) {
in = new CInode(mds->mdcache, true, 1, last);
added = true;
} else {
in->state_set(CInode::STATE_AUTH);
}
// state after link -- or not! -sage
@ -2841,7 +2880,7 @@ void Migrator::finish_import_inode_caps(CInode *in, mds_rank_t peer, bool auth_c
if (!cap) {
cap = in->add_client_cap(it->first, session);
if (peer < 0)
cap->mark_new();
cap->mark_importing();
}
Capability::Import& im = import_map[it->first];
@ -2885,10 +2924,6 @@ int Migrator::decode_import_dir(bufferlist::iterator& blp,
// assimilate state
dir->decode_import(blp, now, ls);
// mark (may already be marked from get_or_open_dir() above)
if (!dir->is_auth())
dir->state_set(CDir::STATE_AUTH);
// adjust replica list
//assert(!dir->is_replica(oldauth)); // not true on failed export
dir->add_replica(oldauth, CDir::EXPORT_NONCE);
@ -3027,7 +3062,7 @@ void Migrator::handle_export_notify(MExportDirNotify *m)
// send ack
if (m->wants_ack()) {
mds->send_message_mds(new MExportDirNotifyAck(m->get_dirfrag(), m->get_tid()), from);
mds->send_message_mds(new MExportDirNotifyAck(m->get_dirfrag(), m->get_tid(), m->get_new_auth()), from);
} else {
// aborted. no ack.
dout(7) << "handle_export_notify no ack requested" << dendl;

View File

@ -58,16 +58,18 @@ private:
public:
// export stages. used to clean up intelligently if there's a failure.
const static int EXPORT_CANCELLED = 0; // cancelled
const static int EXPORT_LOCKING = 1; // acquiring locks
const static int EXPORT_DISCOVERING = 2; // dest is disovering export dir
const static int EXPORT_FREEZING = 3; // we're freezing the dir tree
const static int EXPORT_PREPPING = 4; // sending dest spanning tree to export bounds
const static int EXPORT_WARNING = 5; // warning bystanders of dir_auth_pending
const static int EXPORT_EXPORTING = 6; // sent actual export, waiting for ack
const static int EXPORT_LOGGINGFINISH = 7; // logging EExportFinish
const static int EXPORT_NOTIFYING = 8; // waiting for notifyacks
const static int EXPORT_CANCELLING = 1; // waiting for cancel notifyacks
const static int EXPORT_LOCKING = 2; // acquiring locks
const static int EXPORT_DISCOVERING = 3; // dest is disovering export dir
const static int EXPORT_FREEZING = 4; // we're freezing the dir tree
const static int EXPORT_PREPPING = 5; // sending dest spanning tree to export bounds
const static int EXPORT_WARNING = 6; // warning bystanders of dir_auth_pending
const static int EXPORT_EXPORTING = 7; // sent actual export, waiting for ack
const static int EXPORT_LOGGINGFINISH = 8; // logging EExportFinish
const static int EXPORT_NOTIFYING = 9; // waiting for notifyacks
static const char *get_export_statename(int s) {
switch (s) {
case EXPORT_CANCELLING: return "cancelling";
case EXPORT_LOCKING: return "locking";
case EXPORT_DISCOVERING: return "discovering";
case EXPORT_FREEZING: return "freezing";
@ -89,7 +91,6 @@ protected:
set<mds_rank_t> warning_ack_waiting;
set<mds_rank_t> notify_ack_waiting;
map<inodeno_t,map<client_t,Capability::Import> > peer_imported;
list<MDSInternalContextBase*> waiting_for_finish;
MutationRef mut;
// for freeze tree deadlock detection
utime_t last_cum_auth_pins_change;
@ -263,11 +264,6 @@ public:
map<inodeno_t,map<client_t,Capability::Import> >& peer_imported,
list<MDSInternalContextBase*>& finished, int *num_dentries);
void add_export_finish_waiter(CDir *dir, MDSInternalContextBase *c) {
map<CDir*, export_state_t>::iterator it = export_state.find(dir);
assert(it != export_state.end());
it->second.waiting_for_finish.push_back(c);
}
void clear_export_proxy_pins(CDir *dir);
void export_caps(CInode *in);
@ -280,6 +276,7 @@ public:
void export_go(CDir *dir);
void export_go_synced(CDir *dir, uint64_t tid);
void export_try_cancel(CDir *dir, bool notify_peer=true);
void export_cancel_finish(CDir *dir);
void export_reverse(CDir *dir);
void export_notify_abort(CDir *dir, set<CDir*>& bounds);
void handle_export_ack(MExportDirAck *m);

View File

@ -211,6 +211,11 @@ bool MDRequestImpl::slave_did_prepare()
return has_more() && more()->slave_commit;
}
bool MDRequestImpl::slave_rolling_back()
{
return has_more() && more()->slave_rolling_back;
}
bool MDRequestImpl::did_ino_allocation() const
{
return alloc_ino || used_prealloc_ino || prealloc_inos.size();

View File

@ -230,6 +230,7 @@ struct MDRequestImpl : public MutationImpl {
bool has_journaled_slaves;
bool slave_update_journaled;
bool slave_rolling_back;
// for rename
set<mds_rank_t> extra_witnesses; // replica list from srcdn auth (rename)
@ -270,6 +271,7 @@ struct MDRequestImpl : public MutationImpl {
More() :
slave_error(0),
has_journaled_slaves(false), slave_update_journaled(false),
slave_rolling_back(false),
srcdn_auth_mds(-1), inode_import_v(0), rename_inode(0),
is_freeze_authpin(false), is_ambiguous_auth(false),
is_remote_frozen_authpin(false), is_inode_exporter(false),
@ -317,6 +319,7 @@ struct MDRequestImpl : public MutationImpl {
bool has_more() const;
bool has_witnesses();
bool slave_did_prepare();
bool slave_rolling_back();
bool did_ino_allocation() const;
bool freeze_auth_pin(CInode *inode);
void unfreeze_auth_pin(bool clear_inode=false);

View File

@ -1737,8 +1737,9 @@ void Server::handle_slave_request_reply(MMDSSlaveRequest *m)
if (!mds->is_clientreplay() && !mds->is_active() && !mds->is_stopping()) {
metareqid_t r = m->get_reqid();
if (!mdcache->have_uncommitted_master(r)) {
dout(10) << "handle_slave_request_reply ignoring reply from unknown reqid " << r << dendl;
if (!mdcache->have_uncommitted_master(r, from)) {
dout(10) << "handle_slave_request_reply ignoring slave reply from mds."
<< from << " reqid " << r << dendl;
m->put();
return;
}
@ -2074,11 +2075,10 @@ void Server::handle_slave_auth_pin(MDRequestRef& mdr)
MDSCacheObjectInfo info;
(*p)->set_object_info(info);
reply->get_authpins().push_back(info);
if (*p == (MDSCacheObject*)auth_pin_freeze)
auth_pin_freeze->set_object_info(reply->get_authpin_freeze());
}
if (auth_pin_freeze)
auth_pin_freeze->set_object_info(reply->get_authpin_freeze());
if (wouldblock)
reply->mark_error_wouldblock();
if (readonly)
@ -2113,6 +2113,16 @@ void Server::handle_slave_auth_pin_ack(MDRequestRef& mdr, MMDSSlaveRequest *ack)
pinned.insert(object);
}
// removed frozen auth pin ?
if (mdr->more()->is_remote_frozen_authpin &&
ack->get_authpin_freeze() == MDSCacheObjectInfo()) {
auto p = mdr->remote_auth_pins.find(mdr->more()->rename_inode);
assert(p != mdr->remote_auth_pins.end());
if (p->second == from) {
mdr->more()->is_remote_frozen_authpin = false;
}
}
// removed auth pins?
map<MDSCacheObject*, mds_rank_t>::iterator p = mdr->remote_auth_pins.begin();
while (p != mdr->remote_auth_pins.end()) {
@ -5323,7 +5333,8 @@ void Server::_commit_slave_link(MDRequestRef& mdr, int r, CInode *targeti)
// write a commit to the journal
ESlaveUpdate *le = new ESlaveUpdate(mdlog, "slave_link_commit", mdr->reqid, mdr->slave_to_mds,
ESlaveUpdate::OP_COMMIT, ESlaveUpdate::LINK);
mdlog->start_submit_entry(le, new C_MDS_CommittedSlave(this, mdr));
mdlog->start_entry(le);
submit_mdlog_entry(le, new C_MDS_CommittedSlave(this, mdr), mdr, __func__);
mdlog->flush();
} else {
do_link_rollback(mdr->more()->rollback_bl, mdr->slave_to_mds, mdr);
@ -6483,7 +6494,8 @@ void Server::handle_client_rename(MDRequestRef& mdr)
// are involved in the rename operation.
if (srcdnl->is_primary() && !mdr->more()->is_ambiguous_auth) {
dout(10) << " preparing ambiguous auth for srci" << dendl;
mdr->set_ambiguous_auth(srci);
assert(mdr->more()->is_remote_frozen_authpin);
assert(mdr->more()->rename_inode == srci);
_rename_prepare_witness(mdr, last, witnesses, srctrace, desttrace, straydn);
return;
}
@ -7804,12 +7816,22 @@ void Server::do_rename_rollback(bufferlist &rbl, mds_rank_t master, MDRequestRef
le->commit.add_primary_dentry(target->get_projected_parent_dn(), target, true);
}
if (force_journal_dest) {
dout(10) << " noting rename target ino " << target->ino() << " in metablob" << dendl;
le->commit.renamed_dirino = target->ino();
} else if (force_journal_src || (in && in->is_dir() && srcdn->authority().first == whoami)) {
if (in && in->is_dir() && (srcdn->authority().first == whoami || force_journal_src)) {
dout(10) << " noting renamed dir ino " << in->ino() << " in metablob" << dendl;
le->commit.renamed_dirino = in->ino();
if (srcdn->authority().first == whoami) {
list<CDir*> ls;
in->get_dirfrags(ls);
for (list<CDir*>::iterator p = ls.begin(); p != ls.end(); ++p) {
CDir *dir = *p;
if (!dir->is_auth())
le->commit.renamed_dir_frags.push_back(dir->get_frag());
}
dout(10) << " noting renamed dir open frags " << le->commit.renamed_dir_frags << dendl;
}
} else if (force_journal_dest) {
dout(10) << " noting rename target ino " << target->ino() << " in metablob" << dendl;
le->commit.renamed_dirino = target->ino();
}
if (target && target->is_dir()) {
@ -7900,8 +7922,10 @@ void Server::_rename_rollback_finish(MutationRef& mut, MDRequestRef& mdr, CDentr
mdr->more()->is_ambiguous_auth = false;
}
mds->queue_waiters(finished);
if (finish_mdr)
if (finish_mdr || mdr->aborted)
mdcache->request_finish(mdr);
else
mdr->more()->slave_rolling_back = false;
}
mdcache->finish_rollback(mut->reqid);
@ -7919,6 +7943,11 @@ void Server::handle_slave_rename_prep_ack(MDRequestRef& mdr, MMDSSlaveRequest *a
// note slave
mdr->more()->slaves.insert(from);
if (mdr->more()->srcdn_auth_mds == from &&
mdr->more()->is_remote_frozen_authpin &&
!mdr->more()->is_ambiguous_auth) {
mdr->set_ambiguous_auth(mdr->more()->rename_inode);
}
// witnessed? or add extra witnesses?
assert(mdr->more()->witnessed.count(from) == 0);

View File

@ -318,6 +318,11 @@ void StrayManager::_purge_stray_logged(CDentry *dn, version_t pdv, LogSegment *l
void StrayManager::enqueue(CDentry *dn, bool trunc)
{
if (aborted) {
dout(10) << __func__ << ": aborted, skip purging: " << *dn << dendl;
return;
}
CDentry::linkage_t *dnl = dn->get_projected_linkage();
assert(dnl);
CInode *in = dnl->get_inode();
@ -372,6 +377,9 @@ public:
void StrayManager::_advance()
{
if (aborted)
return;
std::map<CDir*, std::set<dentry_key_t> > to_fetch;
for (auto p = ready_for_purge.begin();
@ -850,7 +858,7 @@ void StrayManager::migrate_stray(CDentry *dn, mds_rank_t to)
StrayManager::StrayManager(MDSRank *mds)
: delayed_eval_stray(member_offset(CDentry, item_stray)),
mds(mds), logger(NULL), started(false),
mds(mds), logger(NULL), started(false), aborted(false),
ops_in_flight(0), files_purging(0),
max_purge_ops(0),
num_strays(0), num_strays_purging(0), num_strays_delayed(0),
@ -886,6 +894,8 @@ void StrayManager::abort_queue()
trimmed_strays.clear();
fetching_strays.clear();
aborted = true;
}
void StrayManager::truncate(CDentry *dn, uint32_t op_allowance)
@ -992,6 +1002,9 @@ void StrayManager::notify_stray_loaded(CDentry *dn)
if (in->inode.nlink == 0)
in->state_set(CInode::STATE_ORPHAN);
if (aborted)
return;
auto p = trimmed_strays.find(dn->name);
if (p != trimmed_strays.end()) {
dn->state_set(CDentry::STATE_PURGING);
@ -1011,5 +1024,8 @@ void StrayManager::notify_stray_trimmed(CDentry *dn)
{
dout(10) << __func__ << ": " << *dn << dendl;
if (aborted)
return;
trimmed_strays.insert(dn->name);
}

View File

@ -56,6 +56,7 @@ class StrayManager
PerfCounters *logger;
bool started;
bool aborted;
// Throttled allowances
uint64_t ops_in_flight;

View File

@ -19,13 +19,15 @@
class MExportDirNotifyAck : public Message {
dirfrag_t dirfrag;
pair<__s32,__s32> new_auth;
public:
dirfrag_t get_dirfrag() { return dirfrag; }
pair<__s32,__s32> get_new_auth() { return new_auth; }
MExportDirNotifyAck() {}
MExportDirNotifyAck(dirfrag_t df, uint64_t tid) :
Message(MSG_MDS_EXPORTDIRNOTIFYACK), dirfrag(df) {
MExportDirNotifyAck(dirfrag_t df, uint64_t tid, pair<__s32,__s32> na) :
Message(MSG_MDS_EXPORTDIRNOTIFYACK), dirfrag(df), new_auth(na) {
set_tid(tid);
}
private:
@ -39,10 +41,12 @@ public:
void encode_payload(uint64_t features) {
::encode(dirfrag, payload);
::encode(new_auth, payload);
}
void decode_payload() {
bufferlist::iterator p = payload.begin();
::decode(dirfrag, p);
::decode(new_auth, p);
}
};

View File

@ -20,10 +20,26 @@
#include "include/types.h"
class MMDSResolve : public Message {
public:
public:
map<dirfrag_t, vector<dirfrag_t> > subtrees;
map<dirfrag_t, vector<dirfrag_t> > ambiguous_imports;
map<metareqid_t, bufferlist> slave_requests;
struct slave_request {
bufferlist inode_caps;
bool committing;
slave_request() : committing(false) {}
void encode(bufferlist &bl) const {
::encode(inode_caps, bl);
::encode(committing, bl);
}
void decode(bufferlist::iterator &bl) {
::decode(inode_caps, bl);
::decode(committing, bl);
}
};
WRITE_CLASS_ENCODER(slave_request)
map<metareqid_t, slave_request> slave_requests;
MMDSResolve() : Message(MSG_MDS_RESOLVE) {}
private:
@ -49,12 +65,12 @@ public:
ambiguous_imports[im] = m;
}
void add_slave_request(metareqid_t reqid) {
slave_requests[reqid].clear();
void add_slave_request(metareqid_t reqid, bool committing) {
slave_requests[reqid].committing = committing;
}
void add_slave_request(metareqid_t reqid, bufferlist& bl) {
slave_requests[reqid].claim(bl);
slave_requests[reqid].inode_caps.claim(bl);
}
void encode_payload(uint64_t features) {
@ -70,4 +86,9 @@ public:
}
};
inline ostream& operator<<(ostream& out, const MMDSResolve::slave_request) {
return out;
}
WRITE_CLASS_ENCODER(MMDSResolve::slave_request)
#endif