mirror of
https://github.com/ceph/ceph
synced 2025-02-21 18:17:42 +00:00
* mds.migrator: fixed auth_pin vs exporting deadlock by allowing discover stage to abort.
* mds/journal: fixed importing bug git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1323 29311d96-e01e-0410-9327-a35deaab8ce9
This commit is contained in:
parent
b7af8016c0
commit
a1dc12af20
@ -147,6 +147,8 @@ bool Locker::acquire_locks(MDRequest *mdr,
|
||||
p != dentry_xlocks.end();
|
||||
++p) {
|
||||
CDir *dir = (*p)->dir;
|
||||
dout(10) << "might auth_pin " << *dir << endl;
|
||||
|
||||
if (!dir->is_auth()) continue;
|
||||
if (!mdr->is_auth_pinned(dir) &&
|
||||
!dir->can_auth_pin()) {
|
||||
@ -177,6 +179,7 @@ bool Locker::acquire_locks(MDRequest *mdr,
|
||||
++p) {
|
||||
CDir *dir = (*p)->dir;
|
||||
if (!dir->is_auth()) continue;
|
||||
dout(10) << "auth_pinning " << *dir << endl;
|
||||
mdr->auth_pin(dir);
|
||||
}
|
||||
for (set<CInode*>::iterator p = inode_hard_xlocks.begin();
|
||||
@ -184,6 +187,7 @@ bool Locker::acquire_locks(MDRequest *mdr,
|
||||
++p) {
|
||||
CInode *in = *p;
|
||||
if (!in->is_auth()) continue;
|
||||
dout(10) << "auth_pinning " << *in << endl;
|
||||
mdr->auth_pin(in);
|
||||
}
|
||||
|
||||
|
@ -144,6 +144,7 @@ void MDBalancer::send_heartbeat()
|
||||
CDir *im = *it;
|
||||
int from = im->inode->authority().first;
|
||||
if (from == mds->get_nodeid()) continue;
|
||||
if (im->get_inode()->is_stray()) continue;
|
||||
import_map[from] += im->popularity[MDS_POP_CURDOM].meta_load();
|
||||
}
|
||||
mds_import_map[ mds->get_nodeid() ] = import_map;
|
||||
@ -431,6 +432,7 @@ void MDBalancer::do_rebalance(int beat)
|
||||
it != fullauthsubs.end();
|
||||
it++) {
|
||||
CDir *im = *it;
|
||||
if (im->get_inode()->is_stray()) continue;
|
||||
|
||||
double pop = im->popularity[MDS_POP_CURDOM].meta_load();
|
||||
if (pop < g_conf.mds_bal_idle_threshold &&
|
||||
@ -552,6 +554,7 @@ void MDBalancer::do_rebalance(int beat)
|
||||
for (set<CDir*>::iterator pot = candidates.begin();
|
||||
pot != candidates.end();
|
||||
pot++) {
|
||||
if ((*pot)->get_inode()->is_stray()) continue;
|
||||
find_exports(*pot, amount, exports, have, already_exporting);
|
||||
if (have > amount-MIN_OFFLOAD) {
|
||||
break;
|
||||
|
@ -2284,6 +2284,7 @@ bool MDCache::shutdown_pass()
|
||||
it != subtrees.end();
|
||||
it++) {
|
||||
CDir *dir = it->first;
|
||||
if (dir->get_inode()->is_stray()) continue;
|
||||
if (dir->is_frozen() || dir->is_freezing()) continue;
|
||||
if (!dir->is_fullauth()) continue;
|
||||
ls.push_back(dir);
|
||||
@ -2307,6 +2308,10 @@ bool MDCache::shutdown_pass()
|
||||
assert(!migrator->is_importing());
|
||||
|
||||
|
||||
// empty out stray contents
|
||||
// FIXME
|
||||
dout(7) << "FIXME: i need to empty out stray dir contents..." << endl;
|
||||
|
||||
// LOG
|
||||
mds->mdlog->trim(0);
|
||||
|
||||
|
@ -126,20 +126,20 @@ struct MDRequest {
|
||||
}
|
||||
|
||||
// auth pins
|
||||
bool is_auth_pinned(CInode *in) { return inode_auth_pins.count(in); }
|
||||
bool is_auth_pinned(CDir *dir) { return dir_auth_pins.count(dir); }
|
||||
void auth_pin(CInode *in) {
|
||||
if (inode_auth_pins.count(in)) {
|
||||
if (!is_auth_pinned(in)) {
|
||||
in->auth_pin();
|
||||
inode_auth_pins.insert(in);
|
||||
}
|
||||
}
|
||||
void auth_pin(CDir *dir) {
|
||||
if (dir_auth_pins.count(dir)) {
|
||||
if (!is_auth_pinned(dir)) {
|
||||
dir->auth_pin();
|
||||
dir_auth_pins.insert(dir);
|
||||
}
|
||||
}
|
||||
bool is_auth_pinned(CInode *in) { return inode_auth_pins.count(in); }
|
||||
bool is_auth_pinned(CDir *dir) { return dir_auth_pins.count(dir); }
|
||||
void drop_auth_pins() {
|
||||
for (set<CInode*>::iterator it = inode_auth_pins.begin();
|
||||
it != inode_auth_pins.end();
|
||||
|
@ -37,6 +37,7 @@
|
||||
|
||||
#include "messages/MExportDirDiscover.h"
|
||||
#include "messages/MExportDirDiscoverAck.h"
|
||||
#include "messages/MExportDirCancel.h"
|
||||
#include "messages/MExportDirPrep.h"
|
||||
#include "messages/MExportDirPrepAck.h"
|
||||
#include "messages/MExportDir.h"
|
||||
@ -177,32 +178,37 @@ void Migrator::handle_mds_failure(int who)
|
||||
next++;
|
||||
CDir *dir = p->first;
|
||||
|
||||
if (export_peer[dir] == who) {
|
||||
// the guy i'm exporting to failed.
|
||||
// clean up.
|
||||
// abort exports:
|
||||
// - that are going to the failed node
|
||||
// - that aren't frozen yet (to about auth_pin deadlock)
|
||||
if (export_peer[dir] == who ||
|
||||
p->second == EXPORT_DISCOVERING || p->second == EXPORT_FREEZING) {
|
||||
// the guy i'm exporting to failed, or we're just freezing.
|
||||
dout(10) << "cleaning up export state " << p->second << " of " << *dir << endl;
|
||||
|
||||
switch (p->second) {
|
||||
case EXPORT_DISCOVERING:
|
||||
dout(10) << "export state=discovering : canceling freeze and removing auth_pin" << endl;
|
||||
dir->unfreeze_tree(); // cancel the freeze
|
||||
dir->auth_unpin(); // remove the auth_pin (that was holding up the freeze)
|
||||
dir->auth_unpin();
|
||||
export_state.erase(dir); // clean up
|
||||
dir->state_clear(CDir::STATE_EXPORTING);
|
||||
if (export_peer[dir] != who) // tell them.
|
||||
mds->send_message_mds(new MExportDirCancel(dir->dirfrag()), who, MDS_PORT_MIGRATOR);
|
||||
break;
|
||||
|
||||
|
||||
case EXPORT_FREEZING:
|
||||
dout(10) << "export state=freezing : canceling freeze" << endl;
|
||||
dir->unfreeze_tree(); // cancel the freeze
|
||||
export_state.erase(dir); // clean up
|
||||
dir->state_clear(CDir::STATE_EXPORTING);
|
||||
if (export_peer[dir] != who) // tell them.
|
||||
mds->send_message_mds(new MExportDirCancel(dir->dirfrag()), who, MDS_PORT_MIGRATOR);
|
||||
break;
|
||||
|
||||
// NOTE: state order reversal, warning comes after loggingstart+prepping
|
||||
case EXPORT_WARNING:
|
||||
dout(10) << "export state=warning : unpinning bounds, unfreezing, notifying" << endl;
|
||||
//export_notify_abort(dir); // tell peers about abort
|
||||
|
||||
// fall-thru
|
||||
|
||||
//case EXPORT_LOGGINGSTART:
|
||||
@ -304,6 +310,12 @@ void Migrator::handle_mds_failure(int who)
|
||||
|
||||
if (import_peer[df] == who) {
|
||||
switch (import_state[df]) {
|
||||
case IMPORT_DISCOVERING:
|
||||
dout(10) << "import state=discovering : clearing state" << endl;
|
||||
import_state.erase(df);
|
||||
import_peer.erase(df);
|
||||
break;
|
||||
|
||||
case IMPORT_DISCOVERED:
|
||||
dout(10) << "import state=discovered : unpinning inode " << *diri << endl;
|
||||
assert(diri);
|
||||
@ -373,6 +385,8 @@ void Migrator::audit()
|
||||
for (map<dirfrag_t,int>::iterator p = import_state.begin();
|
||||
p != import_state.end();
|
||||
p++) {
|
||||
if (p->second == IMPORT_DISCOVERING)
|
||||
continue;
|
||||
if (p->second == IMPORT_DISCOVERED) {
|
||||
CInode *in = cache->get_inode(p->first.ino);
|
||||
assert(in);
|
||||
@ -380,7 +394,8 @@ void Migrator::audit()
|
||||
}
|
||||
CDir *dir = cache->get_dirfrag(p->first);
|
||||
assert(dir);
|
||||
if (p->second == IMPORT_PREPPING) continue;
|
||||
if (p->second == IMPORT_PREPPING)
|
||||
continue;
|
||||
assert(dir->auth_is_ambiguous());
|
||||
assert(dir->authority().first == mds->get_nodeid() ||
|
||||
dir->authority().second == mds->get_nodeid());
|
||||
@ -414,25 +429,22 @@ void Migrator::audit()
|
||||
class C_MDC_ExportFreeze : public Context {
|
||||
Migrator *mig;
|
||||
CDir *ex; // dir i'm exporting
|
||||
int dest;
|
||||
|
||||
public:
|
||||
C_MDC_ExportFreeze(Migrator *m, CDir *e, int d) :
|
||||
mig(m), ex(e), dest(d) {}
|
||||
C_MDC_ExportFreeze(Migrator *m, CDir *e) :
|
||||
mig(m), ex(e) {}
|
||||
virtual void finish(int r) {
|
||||
if (r >= 0)
|
||||
mig->export_frozen(ex, dest);
|
||||
mig->export_frozen(ex);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
|
||||
/** export_dir(dir, dest)
|
||||
* public method to initiate an export.
|
||||
* will fail if the directory is freezing, frozen, unpinnable, or root.
|
||||
*/
|
||||
void Migrator::export_dir(CDir *dir,
|
||||
int dest)
|
||||
void Migrator::export_dir(CDir *dir, int dest)
|
||||
{
|
||||
dout(7) << "export_dir " << *dir << " to " << dest << endl;
|
||||
assert(dir->is_auth());
|
||||
@ -459,8 +471,11 @@ void Migrator::export_dir(CDir *dir,
|
||||
dout(7) << "can't export hashed dir right now. implement me carefully later." << endl;
|
||||
return;
|
||||
}
|
||||
if (dir->state_test(CDir::STATE_EXPORTING)) {
|
||||
dout(7) << "already exporting" << endl;
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
// pin path?
|
||||
vector<CDentry*> trace;
|
||||
cache->make_trace(trace, dir->inode);
|
||||
@ -468,25 +483,21 @@ void Migrator::export_dir(CDir *dir,
|
||||
dout(7) << "export_dir couldn't pin path, failing." << endl;
|
||||
return;
|
||||
}
|
||||
mds->locker->dentry_anon_rdlock_trace_start(trace);
|
||||
|
||||
// ok, let's go.
|
||||
// ok.
|
||||
mds->locker->dentry_anon_rdlock_trace_start(trace);
|
||||
assert(export_state.count(dir) == 0);
|
||||
export_state[dir] = EXPORT_DISCOVERING;
|
||||
export_peer[dir] = dest;
|
||||
|
||||
assert(!dir->state_test(CDir::STATE_EXPORTING));
|
||||
dir->state_set(CDir::STATE_EXPORTING);
|
||||
|
||||
// send ExportDirDiscover (ask target)
|
||||
mds->send_message_mds(new MExportDirDiscover(dir), dest, MDS_PORT_MIGRATOR);
|
||||
dir->auth_pin(); // pin dir, to hang up our freeze (unpin on discover ack)
|
||||
mds->send_message_mds(new MExportDirDiscover(dir), export_peer[dir], MDS_PORT_MIGRATOR);
|
||||
|
||||
// take away the popularity we're sending. FIXME: do this later?
|
||||
mds->balancer->subtract_export(dir);
|
||||
|
||||
// freeze the subtree
|
||||
dir->freeze_tree(new C_MDC_ExportFreeze(this, dir, dest));
|
||||
// start the freeze, but hold it up with an auth_pin.
|
||||
dir->auth_pin();
|
||||
dir->freeze_tree(new C_MDC_ExportFreeze(this, dir));
|
||||
}
|
||||
|
||||
|
||||
@ -500,33 +511,28 @@ void Migrator::handle_export_discover_ack(MExportDirDiscoverAck *m)
|
||||
assert(dir);
|
||||
|
||||
dout(7) << "export_discover_ack from " << m->get_source()
|
||||
<< " on " << *dir << ", releasing auth_pin" << endl;
|
||||
<< " on " << *dir << endl;
|
||||
|
||||
export_state[dir] = EXPORT_FREEZING;
|
||||
|
||||
dir->auth_unpin(); // unpin to allow freeze to complete
|
||||
if (export_state.count(dir) == 0 ||
|
||||
export_state[dir] != EXPORT_DISCOVERING ||
|
||||
export_peer[dir] != m->get_source().num()) {
|
||||
dout(7) << "must have aborted" << endl;
|
||||
} else {
|
||||
// freeze the subtree
|
||||
export_state[dir] = EXPORT_FREEZING;
|
||||
dir->auth_unpin();
|
||||
}
|
||||
|
||||
delete m; // done
|
||||
}
|
||||
|
||||
|
||||
void Migrator::export_frozen(CDir *dir,
|
||||
int dest)
|
||||
void Migrator::export_frozen(CDir *dir)
|
||||
{
|
||||
// subtree is now frozen!
|
||||
dout(7) << "export_frozen on " << *dir << " to " << dest << endl;
|
||||
|
||||
if (export_state.count(dir) == 0 ||
|
||||
export_state[dir] != EXPORT_FREEZING) {
|
||||
dout(7) << "dest must have failed, aborted" << endl;
|
||||
return;
|
||||
}
|
||||
|
||||
dout(7) << "export_frozen on " << *dir << endl;
|
||||
assert(dir->is_frozen());
|
||||
int dest = export_peer[dir];
|
||||
|
||||
// ok!
|
||||
//export_state[dir] = EXPORT_LOGGINGSTART;
|
||||
|
||||
cache->show_subtrees();
|
||||
|
||||
// note the bounds.
|
||||
@ -708,6 +714,9 @@ void Migrator::export_go(CDir *dir)
|
||||
// queue up the finisher
|
||||
dir->add_waiter( CDir::WAIT_UNFREEZE, fin );
|
||||
|
||||
// take away the popularity we're sending. FIXME: do this later?
|
||||
mds->balancer->subtract_export(dir);
|
||||
|
||||
// stats
|
||||
if (mds->logger) mds->logger->inc("ex");
|
||||
if (mds->logger) mds->logger->inc("iex", num_exported_inodes);
|
||||
@ -1005,9 +1014,6 @@ void Migrator::export_reverse(CDir *dir)
|
||||
// process delayed expires
|
||||
cache->process_delayed_expire(dir);
|
||||
|
||||
// tell peers
|
||||
//export_notify_abort(dir);
|
||||
|
||||
// unfreeze
|
||||
dir->unfreeze_tree();
|
||||
|
||||
@ -1020,23 +1026,6 @@ void Migrator::export_reverse(CDir *dir)
|
||||
cache->show_cache();
|
||||
}
|
||||
|
||||
/*
|
||||
void Migrator::export_notify_abort(CDir* dir)
|
||||
{
|
||||
dout(10) << "export_notify_abort " << *dir << endl;
|
||||
|
||||
// send out notify(abort) to bystanders. no ack necessary.
|
||||
for (set<int>::iterator p = export_notify_ack_waiting[dir].begin();
|
||||
p != export_notify_ack_waiting[dir].end();
|
||||
++p) {
|
||||
MExportDirNotify *notify = new MExportDirNotify(dir->ino(), false,
|
||||
pair<int,int>(mds->get_nodeid(), export_peer[dir]),
|
||||
pair<int,int>(mds->get_nodeid(), CDIR_AUTH_UNKNOWN));
|
||||
notify->copy_bounds(export_bounds[dir]);
|
||||
mds->send_message_mds(notify, *p, MDS_PORT_MIGRATOR);
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
/*
|
||||
* once i get the ack, and logged the EExportFinish(true),
|
||||
@ -1215,46 +1204,89 @@ void Migrator::handle_export_discover(MExportDirDiscover *m)
|
||||
|
||||
dout(7) << "handle_export_discover on " << m->get_path() << endl;
|
||||
|
||||
// must discover it!
|
||||
filepath fpath(m->get_path());
|
||||
vector<CDentry*> trace;
|
||||
int r = cache->path_traverse(0,
|
||||
0,
|
||||
fpath, trace, true,
|
||||
m, new C_MDS_RetryMessage(mds,m), // on delay/retry
|
||||
MDS_TRAVERSE_DISCOVER);
|
||||
if (r > 0) return; // wait
|
||||
if (r < 0) {
|
||||
dout(7) << "handle_export_discover_2 failed to discover or not dir " << m->get_path() << ", NAK" << endl;
|
||||
assert(0); // this shouldn't happen if the auth pins his path properly!!!!
|
||||
}
|
||||
|
||||
CInode *in;
|
||||
if (trace.empty()) {
|
||||
in = cache->get_root();
|
||||
if (!in) {
|
||||
cache->open_root(new C_MDS_RetryMessage(mds, m));
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
in = trace[trace.size()-1]->inode;
|
||||
}
|
||||
assert(in->is_dir());
|
||||
|
||||
// pin inode in the cache (for now)
|
||||
in->get(CInode::PIN_IMPORTING);
|
||||
|
||||
// note import state
|
||||
import_state[m->get_dirfrag()] = IMPORT_DISCOVERED;
|
||||
import_peer[m->get_dirfrag()] = m->get_source().num();
|
||||
dirfrag_t df = m->get_dirfrag();
|
||||
|
||||
// only start discovering on this message once.
|
||||
if (!m->started) {
|
||||
m->started = true;
|
||||
import_state[df] = IMPORT_DISCOVERING;
|
||||
import_peer[df] = m->get_source().num();
|
||||
}
|
||||
|
||||
// reply
|
||||
dout(7) << " sending export_discover_ack on " << *in << endl;
|
||||
mds->send_message_mds(new MExportDirDiscoverAck(m->get_dirfrag()),
|
||||
m->get_source().num(), MDS_PORT_MIGRATOR);
|
||||
// am i retrying after ancient path_traverse results?
|
||||
if (import_state.count(df) == 0 &&
|
||||
import_state[df] != IMPORT_DISCOVERING) {
|
||||
dout(7) << "hmm import_state is off, i must be obsolete lookup" << endl;
|
||||
delete m;
|
||||
return;
|
||||
}
|
||||
|
||||
// do we have it?
|
||||
CInode *in = cache->get_inode(m->get_dirfrag().ino);
|
||||
if (!in) {
|
||||
// must discover it!
|
||||
filepath fpath(m->get_path());
|
||||
vector<CDentry*> trace;
|
||||
int r = cache->path_traverse(0,
|
||||
0,
|
||||
fpath, trace, true,
|
||||
m, new C_MDS_RetryMessage(mds, m), // on delay/retry
|
||||
MDS_TRAVERSE_DISCOVER);
|
||||
if (r > 0) return; // wait
|
||||
if (r < 0) {
|
||||
dout(7) << "handle_export_discover_2 failed to discover or not dir " << m->get_path() << ", NAK" << endl;
|
||||
assert(0); // this shouldn't happen if the auth pins his path properly!!!!
|
||||
}
|
||||
|
||||
CInode *in;
|
||||
if (trace.empty()) {
|
||||
in = cache->get_root();
|
||||
if (!in) {
|
||||
cache->open_root(new C_MDS_RetryMessage(mds, m));
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
in = trace[trace.size()-1]->inode;
|
||||
}
|
||||
}
|
||||
|
||||
// yay
|
||||
import_discovered(in, df);
|
||||
delete m;
|
||||
}
|
||||
|
||||
void Migrator::import_discovered(CInode *in, dirfrag_t df)
|
||||
{
|
||||
dout(7) << "import_discovered " << df << " inode " << *in << endl;
|
||||
|
||||
// pin inode in the cache (for now)
|
||||
assert(in->is_dir());
|
||||
in->get(CInode::PIN_IMPORTING);
|
||||
|
||||
// reply
|
||||
dout(7) << " sending export_discover_ack on " << *in << endl;
|
||||
mds->send_message_mds(new MExportDirDiscoverAck(df),
|
||||
import_peer[df], MDS_PORT_MIGRATOR);
|
||||
}
|
||||
|
||||
void Migrator::handle_export_cancel(MExportDirCancel *m)
|
||||
{
|
||||
dout(7) << "handle_export_cancel on " << m->get_dirfrag() << endl;
|
||||
|
||||
if (import_state[m->get_dirfrag()] == IMPORT_DISCOVERED) {
|
||||
CInode *in = cache->get_inode(m->get_dirfrag().ino);
|
||||
assert(in);
|
||||
in->put(CInode::PIN_IMPORTING);
|
||||
} else {
|
||||
assert(import_state[m->get_dirfrag()] == IMPORT_DISCOVERING);
|
||||
}
|
||||
|
||||
import_state.erase(m->get_dirfrag());
|
||||
import_peer.erase(m->get_dirfrag());
|
||||
|
||||
delete m;
|
||||
}
|
||||
|
||||
|
||||
void Migrator::handle_export_prep(MExportDirPrep *m)
|
||||
|
@ -31,6 +31,7 @@ class CDentry;
|
||||
|
||||
class MExportDirDiscover;
|
||||
class MExportDirDiscoverAck;
|
||||
class MExportDirCancel;
|
||||
class MExportDirPrep;
|
||||
class MExportDirPrepAck;
|
||||
class MExportDir;
|
||||
@ -39,21 +40,6 @@ class MExportDirNotify;
|
||||
class MExportDirNotifyAck;
|
||||
class MExportDirFinish;
|
||||
|
||||
class MHashDirDiscover;
|
||||
class MHashDirDiscoverAck;
|
||||
class MHashDirPrep;
|
||||
class MHashDirPrepAck;
|
||||
class MHashDir;
|
||||
class MHashDirAck;
|
||||
class MHashDirNotify;
|
||||
|
||||
class MUnhashDirPrep;
|
||||
class MUnhashDirPrepAck;
|
||||
class MUnhashDir;
|
||||
class MUnhashDirAck;
|
||||
class MUnhashDirNotify;
|
||||
class MUnhashDirNotifyAck;
|
||||
|
||||
class EImportStart;
|
||||
|
||||
class Migrator {
|
||||
@ -88,13 +74,14 @@ protected:
|
||||
|
||||
// -- imports --
|
||||
public:
|
||||
const static int IMPORT_DISCOVERED = 1; // waiting for prep
|
||||
const static int IMPORT_PREPPING = 2; // opening dirs on bounds
|
||||
const static int IMPORT_PREPPED = 3; // opened bounds, waiting for import
|
||||
const static int IMPORT_LOGGINGSTART = 4; // got import, logging EImportStart
|
||||
const static int IMPORT_ACKING = 5; // logged EImportStart, sent ack, waiting for finish
|
||||
//const static int IMPORT_LOGGINGFINISH = 6; // logging EImportFinish
|
||||
const static int IMPORT_ABORTING = 7; // notifying bystanders of an abort before unfreezing
|
||||
const static int IMPORT_DISCOVERING = 1; // waiting for prep
|
||||
const static int IMPORT_DISCOVERED = 2; // waiting for prep
|
||||
const static int IMPORT_PREPPING = 3; // opening dirs on bounds
|
||||
const static int IMPORT_PREPPED = 4; // opened bounds, waiting for import
|
||||
const static int IMPORT_LOGGINGSTART = 5; // got import, logging EImportStart
|
||||
const static int IMPORT_ACKING = 6; // logged EImportStart, sent ack, waiting for finish
|
||||
//const static int IMPORT_LOGGINGFINISH = 7; // logging EImportFinish
|
||||
const static int IMPORT_ABORTING = 8; // notifying bystanders of an abort before unfreezing
|
||||
|
||||
protected:
|
||||
map<dirfrag_t,int> import_state; // FIXME make these dirfrags
|
||||
@ -104,7 +91,6 @@ protected:
|
||||
map<CDir*,set<int> > import_bystanders;
|
||||
|
||||
|
||||
|
||||
/*
|
||||
// -- hashing madness --
|
||||
multimap<CDir*, int> unhash_waiting; // nodes i am waiting for UnhashDirAck's from
|
||||
@ -173,8 +159,7 @@ public:
|
||||
// -- import/export --
|
||||
// exporter
|
||||
public:
|
||||
void export_dir(CDir *dir,
|
||||
int mds);
|
||||
void export_dir(CDir *dir, int dest);
|
||||
void export_empty_import(CDir *dir);
|
||||
|
||||
void encode_export_inode(CInode *in, bufferlist& enc_state, int newauth);
|
||||
@ -187,7 +172,7 @@ public:
|
||||
|
||||
protected:
|
||||
void handle_export_discover_ack(MExportDirDiscoverAck *m);
|
||||
void export_frozen(CDir *dir, int dest);
|
||||
void export_frozen(CDir *dir);
|
||||
void handle_export_prep_ack(MExportDirPrepAck *m);
|
||||
void export_go(CDir *dir);
|
||||
int encode_export_dir(list<bufferlist>& dirstatelist,
|
||||
@ -196,7 +181,6 @@ public:
|
||||
CDir *dir,
|
||||
int newauth);
|
||||
void export_reverse(CDir *dir);
|
||||
void export_notify_abort(CDir* dir);
|
||||
void handle_export_ack(MExportDirAck *m);
|
||||
void export_logged_finish(CDir *dir);
|
||||
void handle_export_notify_ack(MExportDirNotifyAck *m);
|
||||
@ -204,14 +188,20 @@ public:
|
||||
|
||||
friend class C_MDC_ExportFreeze;
|
||||
friend class C_MDS_ExportFinishLogged;
|
||||
|
||||
|
||||
// importer
|
||||
void handle_export_discover(MExportDirDiscover *m);
|
||||
void handle_export_cancel(MExportDirCancel *m);
|
||||
void import_discovered(CInode *in, dirfrag_t df);
|
||||
void handle_export_prep(MExportDirPrep *m);
|
||||
void handle_export_dir(MExportDir *m);
|
||||
int decode_import_dir(bufferlist& bl,
|
||||
int oldauth,
|
||||
CDir *import_root,
|
||||
EImportStart *le);
|
||||
|
||||
|
||||
public:
|
||||
void import_reverse(CDir *dir, bool fix_dir_auth=true);
|
||||
protected:
|
||||
@ -230,69 +220,6 @@ protected:
|
||||
// bystander
|
||||
void handle_export_notify(MExportDirNotify *m);
|
||||
|
||||
|
||||
// -- hashed directories --
|
||||
|
||||
/*
|
||||
// HASH
|
||||
public:
|
||||
void hash_dir(CDir *dir); // on auth
|
||||
protected:
|
||||
map< CDir*, set<int> > hash_gather;
|
||||
map< CDir*, map< int, set<int> > > hash_notify_gather;
|
||||
map< CDir*, list<CInode*> > hash_proxy_inos;
|
||||
|
||||
// hash on auth
|
||||
void handle_hash_dir_discover_ack(MHashDirDiscoverAck *m);
|
||||
void hash_dir_complete(CDir *dir);
|
||||
void hash_dir_frozen(CDir *dir);
|
||||
void handle_hash_dir_prep_ack(MHashDirPrepAck *m);
|
||||
void hash_dir_go(CDir *dir);
|
||||
void handle_hash_dir_ack(MHashDirAck *m);
|
||||
void hash_dir_finish(CDir *dir);
|
||||
friend class C_MDC_HashFreeze;
|
||||
friend class C_MDC_HashComplete;
|
||||
|
||||
// auth and non-auth
|
||||
void handle_hash_dir_notify(MHashDirNotify *m);
|
||||
|
||||
// hash on non-auth
|
||||
void handle_hash_dir_discover(MHashDirDiscover *m);
|
||||
void handle_hash_dir_discover_2(MHashDirDiscover *m, CInode *in, int r);
|
||||
void handle_hash_dir_prep(MHashDirPrep *m);
|
||||
void handle_hash_dir(MHashDir *m);
|
||||
friend class C_MDC_HashDirDiscover;
|
||||
|
||||
// UNHASH
|
||||
public:
|
||||
void unhash_dir(CDir *dir); // on auth
|
||||
protected:
|
||||
map< CDir*, list<MUnhashDirAck*> > unhash_content;
|
||||
void import_hashed_content(CDir *dir, bufferlist& bl, int nden, int oldauth);
|
||||
|
||||
// unhash on auth
|
||||
void unhash_dir_frozen(CDir *dir);
|
||||
void unhash_dir_prep(CDir *dir);
|
||||
void handle_unhash_dir_prep_ack(MUnhashDirPrepAck *m);
|
||||
void unhash_dir_go(CDir *dir);
|
||||
void handle_unhash_dir_ack(MUnhashDirAck *m);
|
||||
void handle_unhash_dir_notify_ack(MUnhashDirNotifyAck *m);
|
||||
void unhash_dir_finish(CDir *dir);
|
||||
friend class C_MDC_UnhashFreeze;
|
||||
friend class C_MDC_UnhashComplete;
|
||||
|
||||
// unhash on all
|
||||
void unhash_dir_complete(CDir *dir);
|
||||
|
||||
// unhash on non-auth
|
||||
void handle_unhash_dir_prep(MUnhashDirPrep *m);
|
||||
void unhash_dir_prep_frozen(CDir *dir);
|
||||
void unhash_dir_prep_finish(CDir *dir);
|
||||
void handle_unhash_dir(MUnhashDir *m);
|
||||
void handle_unhash_dir_notify(MUnhashDirNotify *m);
|
||||
friend class C_MDC_UnhashPrepFreeze;
|
||||
*/
|
||||
|
||||
};
|
||||
|
||||
|
||||
|
@ -591,6 +591,7 @@ CDentry* Server::rdlock_path_xlock_dentry(MDRequest *mdr, bool okexist, bool mus
|
||||
|
||||
vector<CDentry*> trace;
|
||||
CDir *dir = traverse_to_auth_dir(mdr, trace, req->get_filepath());
|
||||
if (!dir) return 0;
|
||||
dout(10) << "rdlock_path_xlock_dentry dir " << *dir << endl;
|
||||
|
||||
// make sure we can auth_pin dir
|
||||
|
@ -105,12 +105,12 @@ bool EMetaBlob::has_expired(MDS *mds)
|
||||
if (ex->is_exporting()) {
|
||||
// wait until export is acked (logged on remote) and committed (logged locally)
|
||||
dout(10) << "EMetaBlob.has_expired ambiguous auth for " << *dir
|
||||
<< ", exporting (not yet safe) on " << *ex << endl;
|
||||
<< ", exporting on " << *ex << endl;
|
||||
return false;
|
||||
} else {
|
||||
dout(10) << "EMetaBlob.has_expired ambiguous auth for " << *dir
|
||||
<< ", _importing_ (safe) on " << *ex << endl;
|
||||
return true;
|
||||
<< ", importing on " << *ex << endl;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@ -153,6 +153,7 @@ void EMetaBlob::expire(MDS *mds, Context *c)
|
||||
{
|
||||
map<CDir*,version_t> commit; // dir -> version needed
|
||||
list<CDir*> waitfor_export;
|
||||
list<CDir*> waitfor_import;
|
||||
int ncommit = 0;
|
||||
|
||||
// examine dirv's for my lumps
|
||||
@ -187,7 +188,8 @@ void EMetaBlob::expire(MDS *mds, Context *c)
|
||||
continue;
|
||||
} else {
|
||||
dout(10) << "EMetaBlob.expire ambiguous auth for " << *dir
|
||||
<< ", but _importing_, we're safe on " << *ex << endl;
|
||||
<< ", waiting for import finish on " << *ex << endl;
|
||||
waitfor_import.push_back(ex);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
@ -220,7 +222,11 @@ void EMetaBlob::expire(MDS *mds, Context *c)
|
||||
p != waitfor_export.end();
|
||||
++p)
|
||||
mds->mdcache->migrator->add_export_finish_waiter(*p, gather->new_sub());
|
||||
|
||||
for (list<CDir*>::iterator p = waitfor_import.begin();
|
||||
p != waitfor_import.end();
|
||||
++p)
|
||||
(*p)->add_waiter(CDir::WAIT_IMPORTED, gather->new_sub());
|
||||
|
||||
|
||||
// have my anchortable ops committed?
|
||||
for (list<version_t>::iterator p = atids.begin();
|
||||
|
48
branches/sage/cephmds2/messages/MExportDirCancel.h
Normal file
48
branches/sage/cephmds2/messages/MExportDirCancel.h
Normal file
@ -0,0 +1,48 @@
|
||||
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
|
||||
/*
|
||||
* Ceph - scalable distributed file system
|
||||
*
|
||||
* Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
|
||||
*
|
||||
* This is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU Lesser General Public
|
||||
* License version 2.1, as published by the Free Software
|
||||
* Foundation. See file COPYING.
|
||||
*
|
||||
*/
|
||||
|
||||
#ifndef __MEXPORTDIRCANCEL_H
|
||||
#define __MEXPORTDIRCANCEL_H
|
||||
|
||||
#include "msg/Message.h"
|
||||
#include "mds/CInode.h"
|
||||
#include "include/types.h"
|
||||
|
||||
class MExportDirCancel : public Message {
|
||||
dirfrag_t dirfrag;
|
||||
|
||||
public:
|
||||
dirfrag_t get_dirfrag() { return dirfrag; }
|
||||
|
||||
MExportDirCancel() {}
|
||||
MExportDirCancel(dirfrag_t df) :
|
||||
Message(MSG_MDS_EXPORTDIRCANCEL),
|
||||
dirfrag(df) { }
|
||||
|
||||
virtual char *get_type_name() { return "ExCancel"; }
|
||||
void print(ostream& o) {
|
||||
o << "export_cancel(" << dirfrag << ")";
|
||||
}
|
||||
|
||||
virtual void decode_payload() {
|
||||
int off = 0;
|
||||
payload.copy(off, sizeof(dirfrag), (char*)&dirfrag);
|
||||
off += sizeof(dirfrag);
|
||||
}
|
||||
|
||||
virtual void encode_payload() {
|
||||
payload.append((char*)&dirfrag, sizeof(dirfrag));
|
||||
}
|
||||
};
|
||||
|
||||
#endif
|
@ -27,9 +27,14 @@ class MExportDirDiscover : public Message {
|
||||
dirfrag_t get_dirfrag() { return dirfrag; }
|
||||
string& get_path() { return path; }
|
||||
|
||||
MExportDirDiscover() {}
|
||||
bool started;
|
||||
|
||||
MExportDirDiscover() :
|
||||
Message(MSG_MDS_EXPORTDIRDISCOVER),
|
||||
started(false) { }
|
||||
MExportDirDiscover(CDir *dir) :
|
||||
Message(MSG_MDS_EXPORTDIRDISCOVER) {
|
||||
Message(MSG_MDS_EXPORTDIRDISCOVER),
|
||||
started(false) {
|
||||
dir->get_inode()->make_path(path);
|
||||
dirfrag = dir->dirfrag();
|
||||
}
|
||||
|
@ -67,6 +67,7 @@ using namespace std;
|
||||
|
||||
#include "messages/MExportDirDiscover.h"
|
||||
#include "messages/MExportDirDiscoverAck.h"
|
||||
#include "messages/MExportDirCancel.h"
|
||||
#include "messages/MExportDirPrep.h"
|
||||
#include "messages/MExportDirPrepAck.h"
|
||||
#include "messages/MExportDirWarning.h"
|
||||
@ -77,24 +78,6 @@ using namespace std;
|
||||
#include "messages/MExportDirNotifyAck.h"
|
||||
#include "messages/MExportDirFinish.h"
|
||||
|
||||
#include "messages/MHashReaddir.h"
|
||||
#include "messages/MHashReaddirReply.h"
|
||||
|
||||
#include "messages/MHashDirDiscover.h"
|
||||
#include "messages/MHashDirDiscoverAck.h"
|
||||
#include "messages/MHashDirPrep.h"
|
||||
#include "messages/MHashDirPrepAck.h"
|
||||
#include "messages/MHashDir.h"
|
||||
#include "messages/MHashDirAck.h"
|
||||
#include "messages/MHashDirNotify.h"
|
||||
|
||||
#include "messages/MUnhashDirPrep.h"
|
||||
#include "messages/MUnhashDirPrepAck.h"
|
||||
#include "messages/MUnhashDir.h"
|
||||
#include "messages/MUnhashDirAck.h"
|
||||
#include "messages/MUnhashDirNotify.h"
|
||||
#include "messages/MUnhashDirNotifyAck.h"
|
||||
|
||||
#include "messages/MRenameWarning.h"
|
||||
#include "messages/MRenameNotify.h"
|
||||
#include "messages/MRenameNotifyAck.h"
|
||||
@ -297,6 +280,9 @@ decode_message(msg_envelope_t& env, bufferlist& payload)
|
||||
case MSG_MDS_EXPORTDIRDISCOVERACK:
|
||||
m = new MExportDirDiscoverAck();
|
||||
break;
|
||||
case MSG_MDS_EXPORTDIRCANCEL:
|
||||
m = new MExportDirCancel();
|
||||
break;
|
||||
|
||||
case MSG_MDS_EXPORTDIR:
|
||||
m = new MExportDir;
|
||||
@ -332,54 +318,6 @@ decode_message(msg_envelope_t& env, bufferlist& payload)
|
||||
break;
|
||||
|
||||
|
||||
case MSG_MDS_HASHREADDIR:
|
||||
m = new MHashReaddir();
|
||||
break;
|
||||
case MSG_MDS_HASHREADDIRREPLY:
|
||||
m = new MHashReaddirReply();
|
||||
break;
|
||||
|
||||
case MSG_MDS_HASHDIRDISCOVER:
|
||||
m = new MHashDirDiscover();
|
||||
break;
|
||||
case MSG_MDS_HASHDIRDISCOVERACK:
|
||||
m = new MHashDirDiscoverAck();
|
||||
break;
|
||||
case MSG_MDS_HASHDIRPREP:
|
||||
m = new MHashDirPrep();
|
||||
break;
|
||||
case MSG_MDS_HASHDIRPREPACK:
|
||||
m = new MHashDirPrepAck();
|
||||
break;
|
||||
case MSG_MDS_HASHDIR:
|
||||
m = new MHashDir();
|
||||
break;
|
||||
case MSG_MDS_HASHDIRACK:
|
||||
m = new MHashDirAck();
|
||||
break;
|
||||
case MSG_MDS_HASHDIRNOTIFY:
|
||||
m = new MHashDirNotify();
|
||||
break;
|
||||
|
||||
case MSG_MDS_UNHASHDIRPREP:
|
||||
m = new MUnhashDirPrep();
|
||||
break;
|
||||
case MSG_MDS_UNHASHDIRPREPACK:
|
||||
m = new MUnhashDirPrepAck();
|
||||
break;
|
||||
case MSG_MDS_UNHASHDIR:
|
||||
m = new MUnhashDir();
|
||||
break;
|
||||
case MSG_MDS_UNHASHDIRACK:
|
||||
m = new MUnhashDirAck();
|
||||
break;
|
||||
case MSG_MDS_UNHASHDIRNOTIFY:
|
||||
m = new MUnhashDirNotify();
|
||||
break;
|
||||
case MSG_MDS_UNHASHDIRNOTIFYACK:
|
||||
m = new MUnhashDirNotifyAck();
|
||||
break;
|
||||
|
||||
case MSG_MDS_RENAMEWARNING:
|
||||
m = new MRenameWarning();
|
||||
break;
|
||||
|
@ -119,8 +119,9 @@
|
||||
#define MSG_MDS_INODEUNLINK 142
|
||||
#define MSG_MDS_INODEUNLINKACK 143
|
||||
|
||||
#define MSG_MDS_EXPORTDIRDISCOVER 150
|
||||
#define MSG_MDS_EXPORTDIRDISCOVERACK 151
|
||||
#define MSG_MDS_EXPORTDIRDISCOVER 149
|
||||
#define MSG_MDS_EXPORTDIRDISCOVERACK 150
|
||||
#define MSG_MDS_EXPORTDIRCANCEL 151
|
||||
#define MSG_MDS_EXPORTDIRPREP 152
|
||||
#define MSG_MDS_EXPORTDIRPREPACK 153
|
||||
#define MSG_MDS_EXPORTDIRWARNING 154
|
||||
|
Loading…
Reference in New Issue
Block a user