mds: compose and send resolve messages in batch

Resolve messages for all MDSs are the same, so we can compose and
send them in batch.

Signed-off-by: Yan, Zheng <zheng.z.yan@intel.com>
Reviewed-by: Greg Farnum <greg@inktank.com>
Author: Yan, Zheng
Date: 2013-03-13 16:23:30 +08:00
Committed by: Greg Farnum
parent a6d9eb8c58
commit 30dbb1d4e5
2 changed files with 95 additions and 101 deletions
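
The patch replaces the per-peer functions send_resolve_now(), send_slave_resolve(), and send_resolve_later() with two batch functions, send_slave_resolves() and send_subtree_resolves(): each builds one MMDSResolve per recovering peer in a map up front, fills all of them during a single pass over the shared state, then sends them in a final loop. Below is a minimal standalone sketch of that compose-then-send pattern; recovery_set and whoami mirror names from the patch, while Message and my_subtrees are illustrative stand-ins, not Ceph APIs.

#include <iostream>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <vector>

// Stand-in for MMDSResolve: collects whatever payload it is given.
struct Message {
  std::vector<std::string> subtrees;
};

int main() {
  std::set<int> recovery_set = {0, 1, 2};  // peers that need a resolve
  int whoami = 0;                          // our own rank is skipped

  // Phase 1: create one (initially empty) message per peer.
  std::map<int, std::unique_ptr<Message>> resolves;
  for (int who : recovery_set) {
    if (who == whoami)
      continue;
    resolves[who] = std::make_unique<Message>();
  }

  // Phase 2: walk the shared state once, appending each item to every
  // message, instead of rebuilding the same payload separately per peer.
  std::vector<std::string> my_subtrees = {"dirfrag 100.0", "dirfrag 101.0"};
  for (const auto& subtree : my_subtrees)
    for (auto& [who, msg] : resolves)
      msg->subtrees.push_back(subtree);

  // Phase 3: send each composed message to its peer.
  for (const auto& [who, msg] : resolves)
    std::cout << "sending resolve with " << msg->subtrees.size()
              << " subtrees to mds." << who << std::endl;
  return 0;
}

The payoff visible in the real code is that the subtree map is traversed once instead of once per peer, and the deferred-send case collapses to a single resolves_pending flag that maybe_send_pending_resolves() checks once imports/exports finish.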

src/mds/MDCache.cc

@@ -2436,10 +2436,6 @@ void MDCache::resolve_start()
if (rootdir)
adjust_subtree_auth(rootdir, CDIR_AUTH_UNKNOWN);
}
for (map<int, map<metareqid_t, MDSlaveUpdate*> >::iterator p = uncommitted_slave_updates.begin();
p != uncommitted_slave_updates.end(); ++p)
need_resolve_ack.insert(p->first);
}
void MDCache::send_resolves()
@@ -2448,9 +2444,10 @@ void MDCache::send_resolves()
got_resolve.clear();
other_ambiguous_imports.clear();
if (!need_resolve_ack.empty()) {
for (set<int>::iterator p = need_resolve_ack.begin(); p != need_resolve_ack.end(); ++p)
send_slave_resolve(*p);
send_slave_resolves();
if (!need_resolve_ack.empty()) {
dout(10) << "send_resolves still waiting for resolve ack from ("
<< need_resolve_ack << ")" << dendl;
return;
}
if (!need_resolve_rollback.empty()) {
@@ -2458,95 +2455,74 @@ void MDCache::send_resolves()
<< need_resolve_rollback << ")" << dendl;
return;
}
assert(uncommitted_slave_updates.empty());
for (set<int>::iterator p = recovery_set.begin(); p != recovery_set.end(); ++p) {
int who = *p;
if (who == mds->whoami)
continue;
if (migrator->is_importing() ||
migrator->is_exporting())
send_resolve_later(who);
else
send_resolve_now(who);
send_subtree_resolves();
}
void MDCache::send_slave_resolves()
{
dout(10) << "send_slave_resolves" << dendl;
map<int, MMDSResolve*> resolves;
if (mds->is_resolve()) {
for (map<int, map<metareqid_t, MDSlaveUpdate*> >::iterator p = uncommitted_slave_updates.begin();
p != uncommitted_slave_updates.end();
++p) {
resolves[p->first] = new MMDSResolve;
for (map<metareqid_t, MDSlaveUpdate*>::iterator q = p->second.begin();
q != p->second.end();
++q) {
dout(10) << " including uncommitted " << q->first << dendl;
resolves[p->first]->add_slave_request(q->first);
}
}
} else {
set<int> resolve_set;
mds->mdsmap->get_mds_set(resolve_set, MDSMap::STATE_RESOLVE);
for (hash_map<metareqid_t, MDRequest*>::iterator p = active_requests.begin();
p != active_requests.end();
++p) {
if (!p->second->is_slave() || !p->second->slave_did_prepare())
continue;
int master = p->second->slave_to_mds;
if (resolve_set.count(master)) {
dout(10) << " including uncommitted " << *p->second << dendl;
if (!resolves.count(master))
resolves[master] = new MMDSResolve;
resolves[master]->add_slave_request(p->first);
}
}
}
for (map<int, MMDSResolve*>::iterator p = resolves.begin();
p != resolves.end();
++p) {
dout(10) << "sending slave resolve to mds." << p->first << dendl;
mds->send_message_mds(p->second, p->first);
need_resolve_ack.insert(p->first);
}
}
void MDCache::send_resolve_later(int who)
void MDCache::send_subtree_resolves()
{
dout(10) << "send_resolve_later to mds." << who << dendl;
wants_resolve.insert(who);
}
dout(10) << "send_subtree_resolves" << dendl;
void MDCache::maybe_send_pending_resolves()
{
if (wants_resolve.empty())
return; // nothing to send.
// only if it's appropriate!
if (migrator->is_exporting() ||
migrator->is_importing()) {
dout(7) << "maybe_send_pending_resolves waiting, imports/exports still in progress" << dendl;
if (migrator->is_exporting() || migrator->is_importing()) {
dout(7) << "send_subtree_resolves waiting, imports/exports still in progress" << dendl;
migrator->show_importing();
migrator->show_exporting();
resolves_pending = true;
return; // not now
}
// ok, send them.
for (set<int>::iterator p = wants_resolve.begin();
p != wants_resolve.end();
++p)
send_resolve_now(*p);
wants_resolve.clear();
}
class C_MDC_SendResolve : public Context {
MDCache *mdc;
int who;
public:
C_MDC_SendResolve(MDCache *c, int w) : mdc(c), who(w) { }
void finish(int r) {
mdc->send_resolve_now(who);
map<int, MMDSResolve*> resolves;
for (set<int>::iterator p = recovery_set.begin();
p != recovery_set.end();
++p) {
if (*p == mds->whoami)
continue;
resolves[*p] = new MMDSResolve;
}
};
void MDCache::send_slave_resolve(int who)
{
dout(10) << "send_slave_resolve to mds." << who << dendl;
MMDSResolve *m = new MMDSResolve;
// list prepare requests lacking a commit
// [active survivor]
for (hash_map<metareqid_t, MDRequest*>::iterator p = active_requests.begin();
p != active_requests.end();
++p) {
if (p->second->is_slave() && p->second->slave_to_mds == who) {
dout(10) << " including uncommitted " << *p->second << dendl;
m->add_slave_request(p->first);
}
}
// [resolving]
if (uncommitted_slave_updates.count(who) &&
!uncommitted_slave_updates[who].empty()) {
for (map<metareqid_t, MDSlaveUpdate*>::iterator p = uncommitted_slave_updates[who].begin();
p != uncommitted_slave_updates[who].end();
++p) {
dout(10) << " including uncommitted " << p->first << dendl;
m->add_slave_request(p->first);
}
}
assert(!m->slave_requests.empty());
dout(10) << " will need resolve ack from mds." << who << dendl;
mds->send_message_mds(m, who);
}
void MDCache::send_resolve_now(int who)
{
dout(10) << "send_resolve_now to mds." << who << dendl;
MMDSResolve *m = new MMDSResolve;
show_subtrees();
// known
for (map<CDir*,set<CDir*> >::iterator p = subtrees.begin();
@@ -2566,22 +2542,30 @@ void MDCache::send_resolve_now(int who)
set<CDir*> bounds;
get_subtree_bounds(dir, bounds);
vector<dirfrag_t> dfls;
for (set<CDir*>::iterator p = bounds.begin(); p != bounds.end(); ++p)
dfls.push_back((*p)->dirfrag());
m->add_ambiguous_import(dir->dirfrag(), dfls);
for (set<CDir*>::iterator q = bounds.begin(); q != bounds.end(); ++q)
dfls.push_back((*q)->dirfrag());
for (map<int, MMDSResolve*>::iterator q = resolves.begin();
q != resolves.end();
++q)
resolves[q->first]->add_ambiguous_import(dir->dirfrag(), dfls);
dout(10) << " ambig " << dir->dirfrag() << " " << dfls << dendl;
} else {
// not ambiguous.
m->add_subtree(dir->dirfrag());
for (map<int, MMDSResolve*>::iterator q = resolves.begin();
q != resolves.end();
++q)
resolves[q->first]->add_subtree(dir->dirfrag());
// bounds too
vector<dirfrag_t> dfls;
for (set<CDir*>::iterator q = subtrees[dir].begin();
q != subtrees[dir].end();
++q) {
CDir *bound = *q;
m->add_subtree_bound(dir->dirfrag(), bound->dirfrag());
dfls.push_back(bound->dirfrag());
for (map<int, MMDSResolve*>::iterator r = resolves.begin();
r != resolves.end();
++r)
resolves[r->first]->add_subtree_bound(dir->dirfrag(), bound->dirfrag());
}
dout(10) << " claim " << dir->dirfrag() << " " << dfls << dendl;
}
@@ -2591,15 +2575,23 @@ void MDCache::send_resolve_now(int who)
for (map<dirfrag_t, vector<dirfrag_t> >::iterator p = my_ambiguous_imports.begin();
p != my_ambiguous_imports.end();
++p) {
m->add_ambiguous_import(p->first, p->second);
for (map<int, MMDSResolve*>::iterator q = resolves.begin();
q != resolves.end();
++q)
resolves[q->first]->add_ambiguous_import(p->first, p->second);
dout(10) << " ambig " << p->first << " " << p->second << dendl;
}
// send
mds->send_message_mds(m, who);
for (map<int, MMDSResolve*>::iterator p = resolves.begin();
p != resolves.end();
++p) {
dout(10) << "sending subtee resolve to mds." << p->first << dendl;
mds->send_message_mds(p->second, p->first);
}
resolves_pending = false;
}
void MDCache::handle_mds_failure(int who)
{
dout(7) << "handle_mds_failure mds." << who << dendl;
@@ -2635,7 +2627,6 @@ void MDCache::handle_mds_failure(int who)
// slave to the failed node?
if (p->second->slave_to_mds == who) {
if (p->second->slave_did_prepare()) {
need_resolve_ack.insert(who);
dout(10) << " slave request " << *p->second << " uncommitted, will resolve shortly" << dendl;
} else {
dout(10) << " slave request " << *p->second << " has no prepare, finishing up" << dendl;
@@ -3015,7 +3006,7 @@ void MDCache::handle_resolve_ack(MMDSResolveAck *ack)
need_resolve_ack.erase(from);
if (need_resolve_ack.empty() && need_resolve_rollback.empty()) {
send_resolves();
send_subtree_resolves();
process_delayed_resolve();
}
@@ -3082,7 +3073,7 @@ void MDCache::finish_rollback(metareqid_t reqid) {
finish_uncommitted_slave_update(reqid, need_resolve_rollback[reqid]);
need_resolve_rollback.erase(reqid);
if (need_resolve_ack.empty() && need_resolve_rollback.empty()) {
send_resolves();
send_subtree_resolves();
process_delayed_resolve();
}
}

src/mds/MDCache.h

@@ -328,6 +328,7 @@ protected:
friend class ESlaveUpdate;
friend class ECommitted;
bool resolves_pending;
set<int> wants_resolve; // nodes i need to send my resolve to
set<int> got_resolve; // nodes i got resolves from
set<int> need_resolve_ack; // nodes i need a resolve_ack from
@@ -367,10 +368,12 @@ public:
void finish_ambiguous_import(dirfrag_t dirino);
void resolve_start();
void send_resolves();
void send_slave_resolve(int who);
void send_resolve_now(int who);
void send_resolve_later(int who);
void maybe_send_pending_resolves();
void send_slave_resolves();
void send_subtree_resolves();
void maybe_send_pending_resolves() {
if (resolves_pending)
send_subtree_resolves();
}
void _move_subtree_map_bound(dirfrag_t df, dirfrag_t oldparent, dirfrag_t newparent,
map<dirfrag_t,vector<dirfrag_t> >& subtrees);