Merge PR #18697 into master

* refs/pull/18697/head:
	mds: limit size of MExportDir message
	mds: optimize MDCache::try_subtree_merge
	mds: optimize import/export state access

Reviewed-by: Patrick Donnelly <pdonnell@redhat.com>
This commit is contained in:
Patrick Donnelly 2017-11-22 13:40:21 -08:00
commit 23a86a404a
No known key found for this signature in database
GPG Key ID: 3A2A7E25BEA8AADB
4 changed files with 130 additions and 56 deletions

View File

@ -6209,6 +6209,10 @@ std::vector<Option> get_mds_options() {
.set_default(0)
.set_description(""),
Option("mds_max_export_size", Option::TYPE_UINT, Option::LEVEL_DEV)
.set_default(1_G)
.set_description(""),
Option("mds_kill_export_at", Option::TYPE_INT, Option::LEVEL_DEV)
.set_default(0)
.set_description(""),

View File

@ -881,8 +881,8 @@ void MDCache::adjust_subtree_auth(CDir *dir, mds_authority_t auth)
void MDCache::try_subtree_merge(CDir *dir)
{
dout(7) << "try_subtree_merge " << *dir << dendl;
assert(subtrees.count(dir));
set<CDir*> oldbounds = subtrees[dir];
// record my old bounds
auto oldbounds = subtrees.at(dir);
set<CInode*> to_eval;
// try merge at my root
@ -911,31 +911,32 @@ public:
void MDCache::try_subtree_merge_at(CDir *dir, set<CInode*> *to_eval)
{
dout(10) << "try_subtree_merge_at " << *dir << dendl;
assert(subtrees.count(dir));
if (dir->dir_auth.second != CDIR_AUTH_UNKNOWN ||
dir->state_test(CDir::STATE_EXPORTBOUND) ||
dir->state_test(CDir::STATE_AUXSUBTREE))
return;
auto it = subtrees.find(dir);
assert(it != subtrees.end());
// merge with parent?
CDir *parent = dir;
if (!dir->inode->is_base())
parent = get_subtree_root(dir->get_parent_dir());
if (parent != dir && // we have a parent,
parent->dir_auth == dir->dir_auth && // auth matches,
dir->dir_auth.second == CDIR_AUTH_UNKNOWN && // auth is unambiguous,
!dir->state_test(CDir::STATE_EXPORTBOUND) && // not an exportbound,
!dir->state_test(CDir::STATE_AUXSUBTREE)) { // not aux subtree
if (parent != dir && // we have a parent,
parent->dir_auth == dir->dir_auth) { // auth matches,
// merge with parent.
dout(10) << " subtree merge at " << *dir << dendl;
dir->set_dir_auth(CDIR_AUTH_DEFAULT);
// move our bounds under the parent
for (set<CDir*>::iterator p = subtrees[dir].begin();
p != subtrees[dir].end();
++p)
subtrees[parent].insert(*p);
subtrees[parent].insert(it->second.begin(), it->second.end());
// we are no longer a subtree or bound
dir->put(CDir::PIN_SUBTREE);
subtrees.erase(dir);
subtrees.erase(it);
subtrees[parent].erase(dir);
// adjust popularity?

View File

@ -321,13 +321,17 @@ void Migrator::export_try_cancel(CDir *dir, bool notify_peer)
}
if (state == EXPORT_WARNING) {
// notify bystanders
export_notify_abort(dir, bounds);
export_notify_abort(dir, it->second, bounds);
// process delayed expires
cache->process_delayed_expire(dir);
}
}
dir->unfreeze_tree();
cache->try_subtree_merge(dir);
for (auto bd : it->second.residual_dirs) {
bd->unfreeze_tree();
cache->try_subtree_merge(bd);
}
if (notify_peer &&
(!mds->is_cluster_degraded() ||
mds->mdsmap->is_clientreplay_or_active_or_stopping(it->second.peer))) // tell them.
@ -337,7 +341,7 @@ void Migrator::export_try_cancel(CDir *dir, bool notify_peer)
case EXPORT_EXPORTING:
dout(10) << "export state=exporting : reversing, and unfreezing" << dendl;
it->second.state = EXPORT_CANCELLING;
export_reverse(dir);
export_reverse(dir, it->second);
break;
case EXPORT_LOGGINGFINISH:
@ -506,7 +510,7 @@ void Migrator::handle_mds_failure_or_stop(mds_rank_t who)
case IMPORT_PREPPING:
assert(dir);
dout(10) << "import state=prepping : unpinning base+bounds " << *dir << dendl;
import_reverse_prepping(dir);
import_reverse_prepping(dir, q->second);
break;
case IMPORT_PREPPED:
@ -521,7 +525,7 @@ void Migrator::handle_mds_failure_or_stop(mds_rank_t who)
cache->adjust_subtree_auth(dir, q->second.peer);
// notify bystanders ; wait in aborting state
import_state[df].state = IMPORT_ABORTING;
q->second.state = IMPORT_ABORTING;
import_notify_abort(dir, bounds);
assert(g_conf->mds_kill_import_at != 10);
}
@ -1083,6 +1087,10 @@ void Migrator::export_frozen(CDir *dir, uint64_t tid)
// CDir::_freeze_tree() should have forced it into subtree.
assert(dir->get_dir_auth() == mds_authority_t(mds->get_nodeid(), mds->get_nodeid()));
set<client_t> export_client_set;
check_export_size(dir, it->second, export_client_set);
// note the bounds.
set<CDir*> bounds;
cache->get_subtree_bounds(dir, bounds);
@ -1120,8 +1128,7 @@ void Migrator::export_frozen(CDir *dir, uint64_t tid)
CDir *bound = *p;
// pin it.
bound->get(CDir::PIN_EXPORTBOUND);
bound->state_set(CDir::STATE_EXPORTBOUND);
assert(bound->state_test(CDir::STATE_EXPORTBOUND));
dout(7) << " export bound " << *bound << dendl;
prep->add_bound( bound->dirfrag() );
@ -1129,8 +1136,14 @@ void Migrator::export_frozen(CDir *dir, uint64_t tid)
// trace to bound
bufferlist tracebl;
CDir *cur = bound;
char start = '-';
if (it->second.residual_dirs.count(bound)) {
start = 'f';
cache->replicate_dir(bound, it->second.peer, tracebl);
dout(7) << " added " << *bound << dendl;
}
while (1) {
// don't repeat inodes
if (inodes_added.count(cur->inode->ino()))
@ -1182,9 +1195,6 @@ void Migrator::export_frozen(CDir *dir, uint64_t tid)
// make sure any new instantiations of caps are flushed out
assert(it->second.warning_ack_waiting.empty());
set<client_t> export_client_set;
get_export_client_set(dir, export_client_set);
MDSGatherBuilder gather(g_ceph_context);
mds->server->flush_client_sessions(export_client_set, gather);
if (gather.has_subs()) {
@ -1194,35 +1204,79 @@ void Migrator::export_frozen(CDir *dir, uint64_t tid)
}
}
void Migrator::get_export_client_set(CDir *dir, set<client_t>& client_set)
void Migrator::check_export_size(CDir *dir, export_state_t& stat, set<client_t>& client_set)
{
const unsigned frag_size = 800;
const unsigned inode_size = 1000;
const unsigned cap_size = 80;
const unsigned link_size = 10;
const unsigned null_size = 1;
uint64_t max_size = g_conf->get_val<uint64_t>("mds_max_export_size");
uint64_t approx_size = 0;
list<CDir*> dfs;
dfs.push_back(dir);
while (!dfs.empty()) {
CDir *dir = dfs.front();
dfs.pop_front();
approx_size += frag_size;
for (CDir::map_t::iterator p = dir->begin(); p != dir->end(); ++p) {
CDentry *dn = p->second;
if (!dn->get_linkage()->is_primary())
if (dn->get_linkage()->is_null()) {
approx_size += null_size;
continue;
}
if (dn->get_linkage()->is_remote()) {
approx_size += link_size;
continue;
}
approx_size += inode_size;
CInode *in = dn->get_linkage()->get_inode();
if (in->is_dir()) {
// directory?
list<CDir*> ls;
in->get_dirfrags(ls);
for (list<CDir*>::iterator q = ls.begin(); q != ls.end(); ++q) {
if (!(*q)->state_test(CDir::STATE_EXPORTBOUND)) {
for (auto q : ls) {
if (q->is_subtree_root()) {
q->state_set(CDir::STATE_EXPORTBOUND);
q->get(CDir::PIN_EXPORTBOUND);
} else {
// include nested dirfrag
assert((*q)->get_dir_auth().first == CDIR_AUTH_PARENT);
dfs.push_back(*q); // it's ours, recurse (later)
assert(q->get_dir_auth().first == CDIR_AUTH_PARENT);
dfs.push_front(q);
}
}
}
for (map<client_t, Capability*>::iterator q = in->client_caps.begin();
q != in->client_caps.end();
++q)
++q) {
approx_size += cap_size;
client_set.insert(q->first);
}
}
if (approx_size >= max_size)
break;
}
while (!dfs.empty()) {
CDir *dir = dfs.front();
dfs.pop_front();
dout(7) << "check_export_size: creating bound " << *dir << dendl;
assert(dir->is_auth());
dir->state_set(CDir::STATE_EXPORTBOUND);
dir->get(CDir::PIN_EXPORTBOUND);
mds->mdcache->adjust_subtree_auth(dir, mds->get_nodeid());
// Another choice here is finishing all WAIT_UNFREEZE contexts and keeping
// the newly created subtree unfreeze.
dir->_freeze_tree();
stat.residual_dirs.insert(dir);
}
}
@ -1318,11 +1372,12 @@ public:
void Migrator::export_go(CDir *dir)
{
assert(export_state.count(dir));
dout(7) << "export_go " << *dir << " to " << export_state[dir].peer << dendl;
auto it = export_state.find(dir);
assert(it != export_state.end());
dout(7) << "export_go " << *dir << " to " << it->second.peer << dendl;
// first sync log to flush out e.g. any cap imports
mds->mdlog->wait_for_safe(new C_M_ExportGo(this, dir, export_state[dir].tid));
mds->mdlog->wait_for_safe(new C_M_ExportGo(this, dir, it->second.tid));
mds->mdlog->flush();
}
@ -1594,7 +1649,7 @@ uint64_t Migrator::encode_export_dir(bufferlist& exportbl,
if (!t->state_test(CDir::STATE_EXPORTBOUND)) {
// include nested dirfrag
assert(t->get_dir_auth().first == CDIR_AUTH_PARENT);
subdirs.push_back(t); // it's ours, recurse (later)
subdirs.push_front(t); // it's ours, recurse (later)
}
}
}
@ -1726,11 +1781,10 @@ void Migrator::handle_export_ack(MExportDirAck *m)
m->put();
}
void Migrator::export_notify_abort(CDir *dir, set<CDir*>& bounds)
void Migrator::export_notify_abort(CDir *dir, export_state_t& stat, set<CDir*>& bounds)
{
dout(7) << "export_notify_abort " << *dir << dendl;
export_state_t& stat = export_state[dir];
assert(stat.state == EXPORT_CANCELLING);
if (stat.notify_ack_waiting.empty()) {
@ -1743,9 +1797,9 @@ void Migrator::export_notify_abort(CDir *dir, set<CDir*>& bounds)
for (set<mds_rank_t>::iterator p = stat.notify_ack_waiting.begin();
p != stat.notify_ack_waiting.end();
++p) {
MExportDirNotify *notify = new MExportDirNotify(dir->dirfrag(),stat.tid, true,
pair<int,int>(mds->get_nodeid(),stat.peer),
pair<int,int>(mds->get_nodeid(),CDIR_AUTH_UNKNOWN));
MExportDirNotify *notify = new MExportDirNotify(dir->dirfrag(), stat.tid, true,
pair<int,int>(mds->get_nodeid(), stat.peer),
pair<int,int>(mds->get_nodeid(), CDIR_AUTH_UNKNOWN));
for (set<CDir*>::iterator i = bounds.begin(); i != bounds.end(); ++i)
notify->get_bounds().push_back((*i)->dirfrag());
mds->send_message_mds(notify, *p);
@ -1757,7 +1811,7 @@ void Migrator::export_notify_abort(CDir *dir, set<CDir*>& bounds)
* that is, we don't know they safely received and logged it, so we reverse our changes
* and go on.
*/
void Migrator::export_reverse(CDir *dir)
void Migrator::export_reverse(CDir *dir, export_state_t& stat)
{
dout(7) << "export_reverse " << *dir << dendl;
@ -1789,13 +1843,13 @@ void Migrator::export_reverse(CDir *dir)
}
// unpin bounds
for (const auto &bd : bounds) {
for (auto bd : bounds) {
bd->put(CDir::PIN_EXPORTBOUND);
bd->state_clear(CDir::STATE_EXPORTBOUND);
}
// notify bystanders
export_notify_abort(dir, bounds);
export_notify_abort(dir, stat, bounds);
// unfreeze tree, with possible subtree merge.
cache->adjust_subtree_auth(dir, mds->get_nodeid(), mds->get_nodeid());
@ -1805,6 +1859,10 @@ void Migrator::export_reverse(CDir *dir)
dir->unfreeze_tree();
cache->try_subtree_merge(dir);
for (auto bd : stat.residual_dirs) {
bd->unfreeze_tree();
cache->try_subtree_merge(bd);
}
// revoke/resume stale caps
for (auto in : to_eval) {
@ -1991,6 +2049,12 @@ void Migrator::export_finish(CDir *dir)
// (we do this _after_ removing EXPORTBOUND pins, to allow merges)
dir->unfreeze_tree();
cache->try_subtree_merge(dir);
for (auto bd : it->second.residual_dirs) {
export_queue.push_front(pair<dirfrag_t,mds_rank_t>(bd->dirfrag(), it->second.peer));
bd->take_waiting(CDir::WAIT_ANY_MASK, finished);
bd->unfreeze_tree();
cache->try_subtree_merge(bd);
}
// no more auth subtree? clear scatter dirty
if (!dir->get_inode()->is_auth() &&
@ -2053,13 +2117,15 @@ void Migrator::handle_export_discover(MExportDirDiscover *m)
}
// only start discovering on this message once.
import_state_t *p_state;
map<dirfrag_t,import_state_t>::iterator it = import_state.find(df);
if (!m->started) {
assert(it == import_state.end());
m->started = true;
import_state[df].state = IMPORT_DISCOVERING;
import_state[df].peer = from;
import_state[df].tid = m->get_tid();
p_state = &import_state[df];
p_state->state = IMPORT_DISCOVERING;
p_state->peer = from;
p_state->tid = m->get_tid();
} else {
// am i retrying after ancient path_traverse results?
if (it == import_state.end() ||
@ -2070,6 +2136,7 @@ void Migrator::handle_export_discover(MExportDirDiscover *m)
return;
}
assert(it->second.state == IMPORT_DISCOVERING);
p_state = &it->second;
}
if (!mds->mdcache->is_open()) {
@ -2100,7 +2167,7 @@ void Migrator::handle_export_discover(MExportDirDiscover *m)
// yay
dout(7) << "handle_export_discover have " << df << " inode " << *in << dendl;
import_state[df].state = IMPORT_DISCOVERED;
p_state->state = IMPORT_DISCOVERED;
// pin inode in the cache (for now)
assert(in->is_dir());
@ -2108,7 +2175,7 @@ void Migrator::handle_export_discover(MExportDirDiscover *m)
// reply
dout(7) << " sending export_discover_ack on " << *in << dendl;
mds->send_message_mds(new MExportDirDiscoverAck(df, m->get_tid()), import_state[df].peer);
mds->send_message_mds(new MExportDirDiscoverAck(df, m->get_tid()), p_state->peer);
m->put();
assert (g_conf->mds_kill_import_at != 2);
}
@ -2125,10 +2192,10 @@ void Migrator::import_reverse_discovered(dirfrag_t df, CInode *diri)
import_state.erase(df);
}
void Migrator::import_reverse_prepping(CDir *dir)
void Migrator::import_reverse_prepping(CDir *dir, import_state_t& stat)
{
set<CDir*> bounds;
cache->map_dirfrag_set(import_state[dir->dirfrag()].bound_ls, bounds);
cache->map_dirfrag_set(stat.bound_ls, bounds);
import_remove_pins(dir, bounds);
import_reverse_final(dir);
}
@ -2150,7 +2217,7 @@ void Migrator::handle_export_cancel(MExportDirCancel *m)
} else if (it->second.state == IMPORT_PREPPING) {
CDir *dir = mds->mdcache->get_dirfrag(df);
assert(dir);
import_reverse_prepping(dir);
import_reverse_prepping(dir, it->second);
} else if (it->second.state == IMPORT_PREPPED) {
CDir *dir = mds->mdcache->get_dirfrag(df);
assert(dir);
@ -2263,7 +2330,7 @@ void Migrator::handle_export_prep(MExportDirPrep *m)
} else
assert(0 == "unrecognized start char");
while (start != '-') {
while (!q.end()) {
CDentry *dn = cache->add_replica_dentry(q, cur, finished);
dout(10) << " added " << *dn << dendl;
CInode *in = cache->add_replica_inode(q, dn, finished);
@ -2360,7 +2427,7 @@ void Migrator::handle_export_prep(MExportDirPrep *m)
}
if (!success)
import_reverse_prepping(dir);
import_reverse_prepping(dir, it->second);
// ok!
dout(7) << " sending export_prep_ack on " << *dir << dendl;

View File

@ -114,6 +114,8 @@ protected:
set<mds_rank_t> warning_ack_waiting;
set<mds_rank_t> notify_ack_waiting;
map<inodeno_t,map<client_t,Capability::Import> > peer_imported;
set<CDir*> residual_dirs;
MutationRef mut;
// for freeze tree deadlock detection
utime_t last_cum_auth_pins_change;
@ -145,14 +147,15 @@ protected:
void handle_export_discover_ack(MExportDirDiscoverAck *m);
void export_frozen(CDir *dir, uint64_t tid);
void check_export_size(CDir *dir, export_state_t& stat, set<client_t> &client_set);
void handle_export_prep_ack(MExportDirPrepAck *m);
void export_sessions_flushed(CDir *dir, uint64_t tid);
void export_go(CDir *dir);
void export_go_synced(CDir *dir, uint64_t tid);
void export_try_cancel(CDir *dir, bool notify_peer=true);
void export_cancel_finish(CDir *dir);
void export_reverse(CDir *dir);
void export_notify_abort(CDir *dir, set<CDir*>& bounds);
void export_reverse(CDir *dir, export_state_t& stat);
void export_notify_abort(CDir *dir, export_state_t& stat, set<CDir*>& bounds);
void handle_export_ack(MExportDirAck *m);
void export_logged_finish(CDir *dir);
void handle_export_notify_ack(MExportDirNotifyAck *m);
@ -175,7 +178,7 @@ protected:
void import_reverse_discovering(dirfrag_t df);
void import_reverse_discovered(dirfrag_t df, CInode *diri);
void import_reverse_prepping(CDir *dir);
void import_reverse_prepping(CDir *dir, import_state_t& stat);
void import_remove_pins(CDir *dir, set<CDir*>& bounds);
void import_reverse_unfreeze(CDir *dir);
void import_reverse_final(CDir *dir);
@ -295,7 +298,6 @@ public:
}
void get_export_lock_set(CDir *dir, set<SimpleLock*>& locks);
void get_export_client_set(CDir *dir, set<client_t> &client_set);
void get_export_client_set(CInode *in, set<client_t> &client_set);
void encode_export_inode(CInode *in, bufferlist& bl,