* AnchorTable/AnchorClient update to handle failure for table of initiator.

* Simplified anchor ops.  
* Rollback.


git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1231 29311d96-e01e-0410-9327-a35deaab8ce9
This commit is contained in:
sageweil 2007-03-14 17:54:32 +00:00
parent a29ff7ff72
commit d4a88488b4
21 changed files with 1003 additions and 392 deletions

View File

@ -39,10 +39,11 @@ mds
- clean up client op redirection
- idempotent ops
- journal+recovery
- anchortable, anchorclient
- link
- unlink
- open(wr cap), open+create
- file capabilities i/o
- link
- rename
- dirfrags vs readdir
- redo hard links

View File

@ -121,6 +121,16 @@ void parse_syn_options(vector<char*>& args)
syn_sargs.push_back( args[++i] );
syn_iargs.push_back( atoi(args[++i]) );
} else if (strcmp(args[i],"thrashlinks") == 0) {
syn_modes.push_back( SYNCLIENT_MODE_THRASHLINKS );
syn_iargs.push_back( atoi(args[++i]) );
syn_iargs.push_back( atoi(args[++i]) );
syn_iargs.push_back( atoi(args[++i]) );
} else if (strcmp(args[i],"foo") == 0) {
syn_modes.push_back( SYNCLIENT_MODE_FOO );
} else if (strcmp(args[i],"until") == 0) {
syn_modes.push_back( SYNCLIENT_MODE_UNTIL );
syn_iargs.push_back( atoi(args[++i]) );
@ -216,6 +226,11 @@ int SyntheticClient::run()
dout(3) << "mode " << mode << endl;
switch (mode) {
case SYNCLIENT_MODE_FOO:
if (run_me())
foo();
break;
case SYNCLIENT_MODE_RANDOMSLEEP:
{
int iarg1 = iargs.front();
@ -334,6 +349,21 @@ int SyntheticClient::run()
break;
case SYNCLIENT_MODE_THRASHLINKS:
{
string sarg1 = get_sarg(0);
int iarg1 = iargs.front(); iargs.pop_front();
int iarg2 = iargs.front(); iargs.pop_front();
int iarg3 = iargs.front(); iargs.pop_front();
if (run_me()) {
dout(2) << "thrashlinks " << sarg1 << " " << iarg1 << " " << iarg2 << " " << iarg3 << endl;
thrash_links(sarg1.c_str(), iarg1, iarg2, iarg3);
}
}
break;
case SYNCLIENT_MODE_MAKEFILES:
{
int num = iargs.front(); iargs.pop_front();
@ -1289,3 +1319,63 @@ void SyntheticClient::make_dir_mess(const char *basedir, int n)
}
// Scratch scenario run for SYNCLIENT_MODE_FOO: issues a fixed sequence of
// mknod/mkdir/link calls through the client.  Return values are not checked.
void SyntheticClient::foo()
{
// two plain files to link against
client->mknod("one", 0755);
client->mknod("two", 0755);
// link "one" under a second name in the same directory
client->link("one", "three");
client->mkdir("dir", 0755);
// NOTE(review): the link target here is spelled "/dir/twolink" while the next
// call refers to "dir/twolink" -- presumably both resolve to the same dentry;
// confirm against the client's path handling.
client->link("two", "/dir/twolink");
// link an existing link under yet another name
client->link("dir/twolink", "four");
}
int SyntheticClient::thrash_links(const char *basedir, int dirs, int files, int depth)
{
dout(1) << "thrash_links " << basedir << " " << dirs << " " << files << " " << depth << endl;
if (time_to_stop()) return 0;
// first make dir/file tree
make_dirs(basedir,dirs,files,depth);
// now link shit up
int n = files*dirs;
for (int i=0; i<n; i++) {
if (time_to_stop()) return 0;
char f[20];
// pick a file
string file = basedir;
if (depth) {
int d = rand() % (depth+1);
for (int k=0; k<d; k++) {
sprintf(f, "/dir.%d", rand() % dirs);
file += f;
}
}
sprintf(f, "/file.%d", rand() % files);
file += f;
// pick a dir for our link
string ln = basedir;
if (depth) {
int d = rand() % (depth+1);
for (int k=0; k<d; k++) {
sprintf(f, "/dir.%d", rand() % dirs);
ln += f;
}
}
sprintf(f, "/ln.%d", i);
ln += f;
client->link(file.c_str(), ln.c_str());
}
return 0;
}

View File

@ -54,6 +54,8 @@
#define SYNCLIENT_MODE_SLEEP 62
#define SYNCLIENT_MODE_FOO 100
#define SYNCLIENT_MODE_THRASHLINKS 101
void parse_syn_options(vector<char*>& args);
@ -196,6 +198,10 @@ class SyntheticClient {
void make_dir_mess(const char *basedir, int n);
void foo();
int thrash_links(const char *basedir, int dirs, int files, int depth);
};
#endif

View File

@ -0,0 +1,53 @@
ANCHOR TABLE PROTOCOL
MDS sends an update PREPARE to the anchortable MDS. The prepare is
identified by the ino and operation type; only one for each type
(create, update, destroy) can be pending at any time. Both parties
may actually be the same local node, but for simplicity we treat that
situation the same. (That is, we act as if they may fail
independently, even if they can't.)
The anchortable journals the proposed update, and responds with an
AGREE and a version number. This uniquely identifies the request.
The MDS can then update the filesystem metadata however it sees fit.
When it is finished (and the results journaled), it sends a COMMIT to
the anchortable. The table journals the commit, frees any state from
the transaction, and sends an ACK. The initiating MDS should then
journal the ACK to complete the transaction.
ANCHOR TABLE FAILURE
If the AT fails before journaling the PREPARE and sending the AGREE,
the initiating MDS will simply retry the request.
If the AT fails after journaling PREPARE but before journaling COMMIT,
it will resend AGREE to the initiating MDS.
If the AT fails after the COMMIT, the transaction has been closed, and it
takes no action. If it receives a COMMIT for which it has no open
transaction, it will reply with ACK.
INITIATING MDS FAILURE
If the MDS fails before the metadata update has been journaled, no
action is taken, since nothing is known about the previously proposed
transaction. If an AGREE message is received and there is no
corresponding PREPARE state, a ROLLBACK is sent to the anchor table.
If the MDS fails after journaling the metadata update but before
journaling the ACK, it resends COMMIT to the anchor table. If it
receives an AGREE after resending the COMMIT, it simply ignores the
AGREE. The anchortable will respond with an ACK, allowing the
initiating MDS to journal the final ACK and close out the transaction
locally.
On journal replay, each metadata update (EMetaBlob) encountered that
includes an anchor transaction is noted in the AnchorClient by adding
it to the pending_commit list, and each journaled ACK is removed from
that list. Journal replay may encounter ACKs with no prior metadata
update; these are ignored. When recovery finishes, a COMMIT is sent
for all outstanding transactions.

View File

@ -25,34 +25,46 @@ using std::string;
// anchor ops
#define ANCHOR_OP_LOOKUP 1
#define ANCHOR_OP_LOOKUP_REPLY 2
#define ANCHOR_OP_CREATE_PREPARE 3
#define ANCHOR_OP_CREATE_ACK 4
#define ANCHOR_OP_CREATE_COMMIT 5
#define ANCHOR_OP_DESTROY_PREPARE 6
#define ANCHOR_OP_DESTROY_ACK 7
#define ANCHOR_OP_DESTROY_COMMIT 8
#define ANCHOR_OP_UPDATE_PREPARE 9
#define ANCHOR_OP_UPDATE_ACK 10
#define ANCHOR_OP_UPDATE_COMMIT 11
#define ANCHOR_OP_CREATE_PREPARE 11
#define ANCHOR_OP_CREATE_AGREE 12
#define ANCHOR_OP_DESTROY_PREPARE 21
#define ANCHOR_OP_DESTROY_AGREE 22
#define ANCHOR_OP_UPDATE_PREPARE 31
#define ANCHOR_OP_UPDATE_AGREE 32
#define ANCHOR_OP_COMMIT 41
#define ANCHOR_OP_ACK 42
#define ANCHOR_OP_ROLLBACK 43
inline const char* get_anchor_opname(int o) {
switch (o) {
case ANCHOR_OP_LOOKUP: return "lookup";
case ANCHOR_OP_LOOKUP_REPLY: return "lookup_reply";
case ANCHOR_OP_CREATE_PREPARE: return "create_prepare";
case ANCHOR_OP_CREATE_ACK: return "create_ack";
case ANCHOR_OP_CREATE_COMMIT: return "create_commit";
case ANCHOR_OP_CREATE_AGREE: return "create_agree";
case ANCHOR_OP_DESTROY_PREPARE: return "destroy_prepare";
case ANCHOR_OP_DESTROY_ACK: return "destroy_ack";
case ANCHOR_OP_DESTROY_COMMIT: return "destroy_commit";
case ANCHOR_OP_DESTROY_AGREE: return "destroy_agree";
case ANCHOR_OP_UPDATE_PREPARE: return "update_prepare";
case ANCHOR_OP_UPDATE_ACK: return "update_ack";
case ANCHOR_OP_UPDATE_COMMIT: return "update_commit";
case ANCHOR_OP_UPDATE_AGREE: return "update_agree";
case ANCHOR_OP_COMMIT: return "commit";
case ANCHOR_OP_ACK: return "ack";
case ANCHOR_OP_ROLLBACK: return "rollback";
default: assert(0);
}
}
// identifies a anchor table mutation
// anchor type
class Anchor {

View File

@ -24,13 +24,15 @@ using std::endl;
#include "msg/Messenger.h"
#include "MDS.h"
#include "MDLog.h"
#include "events/EAnchor.h"
#include "messages/MAnchor.h"
#include "config.h"
#undef dout
#define dout(x) if (x <= g_conf.debug) cout << g_clock.now() << " " << messenger->get_myaddr() << ".anchorclient "
#define derr(x) if (x <= g_conf.debug) cout << g_clock.now() << " " << messenger->get_myaddr() << ".anchorclient "
#define dout(x) if (x <= g_conf.debug) cout << g_clock.now() << " " << mds->messenger->get_myaddr() << ".anchorclient "
#define derr(x) if (x <= g_conf.debug) cout << g_clock.now() << " " << mds->messenger->get_myaddr() << ".anchorclient "
void AnchorClient::dispatch(Message *m)
@ -47,17 +49,18 @@ void AnchorClient::dispatch(Message *m)
void AnchorClient::handle_anchor_reply(class MAnchor *m)
{
inodeno_t ino = m->get_ino();
version_t atid = m->get_atid();
switch (m->get_op()) {
// lookup
case ANCHOR_OP_LOOKUP_REPLY:
assert(pending_lookup.count(ino));
{
assert(pending_lookup_trace.count(m->get_ino()) == 1);
*(pending_lookup_trace[ m->get_ino() ]) = m->get_trace();
Context *onfinish = pending_lookup[ m->get_ino() ];
pending_lookup_trace.erase(m->get_ino());
pending_lookup.erase(m->get_ino());
*pending_lookup[ino].trace = m->get_trace();
Context *onfinish = pending_lookup[ino].onfinish;
pending_lookup.erase(ino);
if (onfinish) {
onfinish->finish(0);
@ -66,19 +69,100 @@ void AnchorClient::handle_anchor_reply(class MAnchor *m)
}
break;
case ANCHOR_OP_UPDATE_ACK:
case ANCHOR_OP_CREATE_ACK:
case ANCHOR_OP_DESTROY_ACK:
{
assert(pending_op.count(m->get_ino()) == 1);
// prepare -> agree
case ANCHOR_OP_CREATE_AGREE:
if (pending_create_prepare.count(ino)) {
dout(10) << "got create_agree on " << ino << " atid " << atid << endl;
Context *onfinish = pending_create_prepare[ino].onfinish;
*pending_create_prepare[ino].patid = atid;
pending_create_prepare.erase(ino);
Context *onfinish = pending_op[m->get_ino()];
pending_op.erase(m->get_ino());
pending_commit.insert(atid);
if (onfinish) {
onfinish->finish(0);
delete onfinish;
}
} else {
dout(10) << "stray create_agree on " << ino
<< " atid " << atid
<< ", sending ROLLBACK"
<< endl;
MAnchor *req = new MAnchor(ANCHOR_OP_ROLLBACK, 0, atid);
mds->messenger->send_message(req,
mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()),
MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
}
break;
case ANCHOR_OP_DESTROY_AGREE:
if (pending_destroy_prepare.count(ino)) {
dout(10) << "got destroy_agree on " << ino << " atid " << atid << endl;
Context *onfinish = pending_destroy_prepare[ino].onfinish;
*pending_destroy_prepare[ino].patid = atid;
pending_destroy_prepare.erase(ino);
pending_commit.insert(atid);
if (onfinish) {
onfinish->finish(0);
delete onfinish;
}
} else {
dout(10) << "stray destroy_agree on " << ino
<< " atid " << atid
<< ", sending ROLLBACK"
<< endl;
MAnchor *req = new MAnchor(ANCHOR_OP_ROLLBACK, 0, atid);
mds->messenger->send_message(req,
mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()),
MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
}
break;
case ANCHOR_OP_UPDATE_AGREE:
if (pending_update_prepare.count(ino)) {
dout(10) << "got update_agree on " << ino << " atid " << atid << endl;
Context *onfinish = pending_update_prepare[ino].onfinish;
*pending_update_prepare[ino].patid = atid;
pending_update_prepare.erase(ino);
pending_commit.insert(atid);
if (onfinish) {
onfinish->finish(0);
delete onfinish;
}
} else {
dout(10) << "stray update_agree on " << ino
<< " atid " << atid
<< ", sending ROLLBACK"
<< endl;
MAnchor *req = new MAnchor(ANCHOR_OP_ROLLBACK, 0, atid);
mds->messenger->send_message(req,
mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()),
MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
}
break;
// commit -> ack
case ANCHOR_OP_ACK:
{
dout(10) << "got ack on atid " << atid << ", logging" << endl;
// remove from committing list
assert(pending_commit.count(atid));
pending_commit.erase(atid);
// log ACK.
mds->mdlog->submit_entry(new EAnchor(ANCHOR_OP_ACK, 0, atid)); // ino doesn't matter.
// kick any waiters
if (ack_waiters.count(atid)) {
dout(15) << "kicking waiters on atid " << atid << endl;
mds->queue_finished(ack_waiters[atid]);
ack_waiters.erase(atid);
}
}
break;
@ -86,6 +170,7 @@ void AnchorClient::handle_anchor_reply(class MAnchor *m)
assert(0);
}
delete m;
}
@ -105,77 +190,96 @@ void AnchorClient::lookup(inodeno_t ino, vector<Anchor>& trace, Context *onfinis
// send message
MAnchor *req = new MAnchor(ANCHOR_OP_LOOKUP, ino);
pending_lookup_trace[ino] = &trace;
pending_lookup[ino] = onfinish;
assert(pending_lookup.count(ino) == 0);
pending_lookup[ino].onfinish = onfinish;
pending_lookup[ino].trace = &trace;
messenger->send_message(req,
mdsmap->get_inst(mdsmap->get_anchortable()),
mds->messenger->send_message(req,
mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()),
MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
}
void AnchorClient::prepare_create(inodeno_t ino, vector<Anchor>& trace, Context *onfinish)
// PREPARE
void AnchorClient::prepare_create(inodeno_t ino, vector<Anchor>& trace,
version_t *patid, Context *onfinish)
{
// send message
MAnchor *req = new MAnchor(ANCHOR_OP_CREATE_PREPARE, ino);
req->set_trace(trace);
pending_op[ino] = onfinish;
pending_create_prepare[ino].trace = trace;
pending_create_prepare[ino].patid = patid;
pending_create_prepare[ino].onfinish = onfinish;
messenger->send_message(req,
mdsmap->get_inst(mdsmap->get_anchortable()),
mds->messenger->send_message(req,
mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()),
MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
}
void AnchorClient::commit_create(inodeno_t ino)
{
// send message
MAnchor *req = new MAnchor(ANCHOR_OP_CREATE_COMMIT, ino);
messenger->send_message(req,
mdsmap->get_inst(mdsmap->get_anchortable()),
MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
}
void AnchorClient::prepare_destroy(inodeno_t ino, Context *onfinish)
void AnchorClient::prepare_destroy(inodeno_t ino,
version_t *patid, Context *onfinish)
{
// send message
MAnchor *req = new MAnchor(ANCHOR_OP_DESTROY_PREPARE, ino);
pending_op[ino] = onfinish;
messenger->send_message(req,
mdsmap->get_inst(mdsmap->get_anchortable()),
MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
}
void AnchorClient::commit_destroy(inodeno_t ino)
{
// send message
MAnchor *req = new MAnchor(ANCHOR_OP_DESTROY_COMMIT, ino);
messenger->send_message(req,
mdsmap->get_inst(mdsmap->get_anchortable()),
pending_destroy_prepare[ino].onfinish = onfinish;
pending_destroy_prepare[ino].patid = patid;
mds->messenger->send_message(req,
mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()),
MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
}
void AnchorClient::prepare_update(inodeno_t ino, vector<Anchor>& trace, Context *onfinish)
void AnchorClient::prepare_update(inodeno_t ino, vector<Anchor>& trace,
version_t *patid, Context *onfinish)
{
// send message
MAnchor *req = new MAnchor(ANCHOR_OP_UPDATE_PREPARE, ino);
req->set_trace(trace);
pending_op[ino] = onfinish;
pending_update_prepare[ino].trace = trace;
pending_update_prepare[ino].patid = patid;
pending_update_prepare[ino].onfinish = onfinish;
messenger->send_message(req,
mdsmap->get_inst(mdsmap->get_anchortable()),
mds->messenger->send_message(req,
mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()),
MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
}
void AnchorClient::commit_update(inodeno_t ino)
// COMMIT
void AnchorClient::commit(version_t atid)
{
dout(10) << "commit " << atid << endl;
assert(pending_commit.count(atid));
pending_commit.insert(atid);
// send message
MAnchor *req = new MAnchor(ANCHOR_OP_UPDATE_COMMIT, ino);
messenger->send_message(req,
mdsmap->get_inst(mdsmap->get_anchortable()),
MAnchor *req = new MAnchor(ANCHOR_OP_COMMIT, 0, atid);
mds->messenger->send_message(req,
mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()),
MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
}
// RECOVERY
void AnchorClient::finish_recovery()
{
dout(7) << "finish_recovery - sending COMMIT on un-ACKed atids" << endl;
for (set<version_t>::iterator p = pending_commit.begin();
p != pending_commit.end();
++p) {
dout(10) << " sending COMMIT on " << *p << endl;
MAnchor *req = new MAnchor(ANCHOR_OP_COMMIT, 0, *p);
mds->messenger->send_message(req,
mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()),
MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
}
}

View File

@ -24,40 +24,66 @@ using __gnu_cxx::hash_map;
#include "Anchor.h"
class Messenger;
class MDSMap;
class Context;
class MDS;
class AnchorClient : public Dispatcher {
Messenger *messenger;
MDSMap *mdsmap;
MDS *mds;
// lookups
hash_map<inodeno_t, Context*> pending_lookup;
hash_map<inodeno_t, vector<Anchor>*> pending_lookup_trace;
struct _pending_lookup {
vector<Anchor> *trace;
Context *onfinish;
};
hash_map<inodeno_t, _pending_lookup> pending_lookup;
// updates
hash_map<inodeno_t, Context*> pending_op;
// prepares
struct _pending_prepare {
vector<Anchor> trace;
Context *onfinish;
version_t *patid; // ptr to atid
};
hash_map<inodeno_t, _pending_prepare> pending_create_prepare;
hash_map<inodeno_t, _pending_prepare> pending_destroy_prepare;
hash_map<inodeno_t, _pending_prepare> pending_update_prepare;
// pending commits
set<version_t> pending_commit;
map<version_t, list<Context*> > ack_waiters;
void handle_anchor_reply(class MAnchor *m);
public:
AnchorClient(Messenger *ms, MDSMap *mm) : messenger(ms), mdsmap(mm) {}
AnchorClient(MDS *m) : mds(m) {}
void dispatch(Message *m);
// async user interface
void lookup(inodeno_t ino, vector<Anchor>& trace, Context *onfinish);
void prepare_create(inodeno_t ino, vector<Anchor>& trace, Context *onfinish);
void commit_create(inodeno_t ino);
void prepare_create(inodeno_t ino, vector<Anchor>& trace, version_t *atid, Context *onfinish);
void prepare_destroy(inodeno_t ino, version_t *atid, Context *onfinish);
void prepare_update(inodeno_t ino, vector<Anchor>& trace, version_t *atid, Context *onfinish);
void prepare_destroy(inodeno_t ino, Context *onfinish);
void commit_destroy(inodeno_t ino);
void commit(version_t atid);
void prepare_update(inodeno_t ino, vector<Anchor>& trace, Context *onfinish);
void commit_update(inodeno_t ino);
// for recovery (by other nodes)
void handle_mds_recovery(int mds); // called when someone else recovers
// for recovery (by me)
void got_journaled_agree(version_t atid) {
pending_commit.insert(atid);
}
void got_journaled_ack(version_t atid) {
pending_commit.erase(atid);
}
bool has_committed(version_t atid) {
return pending_commit.count(atid) == 0;
}
void wait_for_ack(version_t atid, Context *c) {
ack_waiters[atid].push_back(c);
}
void finish_recovery(); // called when i recover and go active
};

View File

@ -26,10 +26,19 @@
#include "config.h"
#undef dout
#define dout(x) if (x <= g_conf.debug_mds) cout << g_clock.now() << " " << mds->messenger->get_myaddr() << ".anchortable "
#define derr(x) if (x <= g_conf.debug_mds) cerr << g_clock.now() << " " << mds->messenger->get_myaddr() << ".anchortable "
#define dout(x) if (x <= g_conf.debug_mds) cout << g_clock.now() << " " << mds->messenger->get_myname() << ".anchortable "
#define derr(x) if (x <= g_conf.debug_mds) cerr << g_clock.now() << " " << mds->messenger->get_myname() << ".anchortable "
void AnchorTable::dump()
{
dout(7) << "dump v " << version << endl;
for (hash_map<inodeno_t, Anchor>::iterator it = anchor_map.begin();
it != anchor_map.end();
it++)
dout(15) << "dump " << it->second << endl;
}
/*
* basic updates
@ -37,7 +46,7 @@
bool AnchorTable::add(inodeno_t ino, dirfrag_t dirfrag)
{
dout(7) << "add " << ino << " dirfrag " << dirfrag << endl;
dout(17) << "add " << ino << " dirfrag " << dirfrag << endl;
// parent should be there
assert(dirfrag.ino < MDS_INO_BASE || // system dirino
@ -46,10 +55,10 @@ bool AnchorTable::add(inodeno_t ino, dirfrag_t dirfrag)
if (anchor_map.count(ino) == 0) {
// new item
anchor_map[ino] = Anchor(ino, dirfrag);
dout(10) << " add: added " << anchor_map[ino] << endl;
dout(10) << "add added " << anchor_map[ino] << endl;
return true;
} else {
dout(10) << " add: had " << anchor_map[ino] << endl;
dout(10) << "add had " << anchor_map[ino] << endl;
return false;
}
}
@ -59,9 +68,9 @@ void AnchorTable::inc(inodeno_t ino)
dout(7) << "inc " << ino << endl;
assert(anchor_map.count(ino));
Anchor &anchor = anchor_map[ino];
while (1) {
Anchor &anchor = anchor_map[ino];
anchor.nref++;
dout(10) << " inc: record now " << anchor << endl;
@ -69,7 +78,6 @@ void AnchorTable::inc(inodeno_t ino)
if (ino == 0) break;
if (anchor_map.count(ino) == 0) break;
anchor = anchor_map[ino];
}
}
@ -143,63 +151,109 @@ void AnchorTable::create_prepare(inodeno_t ino, vector<Anchor>& trace)
for (unsigned i=0; i<trace.size(); i++)
add(trace[i].ino, trace[i].dirfrag);
inc(ino);
pending_create.insert(ino); // so we can undo
version++;
}
void AnchorTable::create_commit(inodeno_t ino)
{
pending_create.erase(ino);
version++;
pending_create[version] = ino; // so we can undo
//dump();
}
void AnchorTable::destroy_prepare(inodeno_t ino)
{
pending_destroy.insert(ino);
version++;
}
void AnchorTable::destroy_commit(inodeno_t ino)
{
// apply
dec(ino);
pending_destroy.erase(ino);
version++;
pending_destroy[version] = ino;
//dump();
}
void AnchorTable::update_prepare(inodeno_t ino, vector<Anchor>& trace)
{
pending_update[ino] = trace;
version++;
pending_update[version].first = ino;
pending_update[version].second = trace;
//dump();
}
void AnchorTable::update_commit(inodeno_t ino)
void AnchorTable::commit(version_t atid)
{
// remove old
dec(ino);
if (pending_create.count(atid)) {
dout(7) << "commit " << atid << " create " << pending_create[atid] << endl;
pending_create.erase(atid);
}
// add new
// make sure new trace is in table
vector<Anchor> &trace = pending_update[ino];
for (unsigned i=0; i<trace.size(); i++)
add(trace[i].ino, trace[i].dirfrag);
inc(ino);
else if (pending_destroy.count(atid)) {
inodeno_t ino = pending_destroy[atid];
dout(7) << "commit " << atid << " destroy " << ino << endl;
pending_update.erase(ino);
dec(ino); // destroy
pending_destroy.erase(atid);
}
else if (pending_update.count(atid)) {
inodeno_t ino = pending_update[atid].first;
vector<Anchor> &trace = pending_update[atid].second;
dout(7) << "commit " << atid << " update " << ino << endl;
// remove old
dec(ino);
// add new
for (unsigned i=0; i<trace.size(); i++)
add(trace[i].ino, trace[i].dirfrag);
inc(ino);
pending_update.erase(atid);
}
else
assert(0);
// bump version.
version++;
//dump();
}
// Abandon the pending (prepared but uncommitted) transaction identified by
// atid and drop its state.
//
//  - create:  create_prepare() already bumped the ref count via inc(), so it
//             is undone here with dec() before the record is dropped.
//  - destroy: only the pending record is dropped.
//  - update:  only the pending record is dropped.
//
// Asserts if atid matches no pending transaction.  The table version is bumped
// because it covers the pending_* state as well as anchor_map.
void AnchorTable::rollback(version_t atid) 
{
  if (pending_create.count(atid)) {
    // Look the ino up in pending_create.  (This previously read
    // pending_destroy[atid], which yielded a default ino, dec()'d the wrong
    // inode, and default-inserted a bogus entry into pending_destroy.)
    inodeno_t ino = pending_create[atid];
    dout(7) << "rollback " << atid << " create " << ino << endl;
    dec(ino);   // undo the inc() done at prepare time
    pending_create.erase(atid);
  } 

  else if (pending_destroy.count(atid)) {
    inodeno_t ino = pending_destroy[atid];
    dout(7) << "rollback " << atid << " destroy " << ino << endl;
    pending_destroy.erase(atid);
  }

  else if (pending_update.count(atid)) {
    inodeno_t ino = pending_update[atid].first;
    dout(7) << "rollback " << atid << " update " << ino << endl;
    pending_update.erase(atid);
  }

  else
    assert(0);  // unknown atid

  // bump version.
  version++;
  //dump();
}
// CREATE
class C_AT_CreatePrepare : public Context {
AnchorTable *at;
MAnchor *req;
version_t atid;
public:
C_AT_CreatePrepare(AnchorTable *a, MAnchor *r) :
at(a), req(r) { }
C_AT_CreatePrepare(AnchorTable *a, MAnchor *r, version_t t) :
at(a), req(r), atid(t) { }
void finish(int r) {
at->_create_prepare_logged(req);
at->_create_prepare_logged(req, atid);
}
};
@ -216,30 +270,22 @@ void AnchorTable::handle_create_prepare(MAnchor *req)
EAnchor *le = new EAnchor(ANCHOR_OP_CREATE_PREPARE, ino, version);
le->set_trace(trace);
mds->mdlog->submit_entry(le,
new C_AT_CreatePrepare(this, req));
new C_AT_CreatePrepare(this, req, version));
}
void AnchorTable::_create_prepare_logged(MAnchor *req)
void AnchorTable::_create_prepare_logged(MAnchor *req, version_t atid)
{
inodeno_t ino = req->get_ino();
dout(7) << "_create_prepare_logged " << ino << endl;
dout(7) << "_create_prepare_logged " << ino << " atid " << atid << endl;
// reply
MAnchor *reply = new MAnchor(ANCHOR_OP_CREATE_ACK, ino);
MAnchor *reply = new MAnchor(ANCHOR_OP_CREATE_AGREE, ino, atid);
mds->messenger->send_message(reply, req->get_source_inst(), req->get_source_port());
delete req;
}
void AnchorTable::handle_create_commit(MAnchor *req)
{
inodeno_t ino = req->get_ino();
dout(7) << "handle_create_commit " << ino << endl;
create_commit(ino);
mds->mdlog->submit_entry(new EAnchor(ANCHOR_OP_CREATE_COMMIT, ino, version));
}
// DESTROY
@ -247,11 +293,12 @@ void AnchorTable::handle_create_commit(MAnchor *req)
class C_AT_DestroyPrepare : public Context {
AnchorTable *at;
MAnchor *req;
version_t atid;
public:
C_AT_DestroyPrepare(AnchorTable *a, MAnchor *r) :
at(a), req(r) { }
C_AT_DestroyPrepare(AnchorTable *a, MAnchor *r, version_t t) :
at(a), req(r), atid(t) { }
void finish(int r) {
at->_destroy_prepare_logged(req);
at->_destroy_prepare_logged(req, atid);
}
};
@ -263,31 +310,20 @@ void AnchorTable::handle_destroy_prepare(MAnchor *req)
destroy_prepare(ino);
mds->mdlog->submit_entry(new EAnchor(ANCHOR_OP_DESTROY_PREPARE, ino, version),
new C_AT_DestroyPrepare(this, req));
new C_AT_DestroyPrepare(this, req, version));
}
void AnchorTable::_destroy_prepare_logged(MAnchor *req)
void AnchorTable::_destroy_prepare_logged(MAnchor *req, version_t atid)
{
inodeno_t ino = req->get_ino();
dout(7) << "_destroy_prepare_logged " << ino << endl;
dout(7) << "_destroy_prepare_logged " << ino << " atid " << atid << endl;
// reply
MAnchor *reply = new MAnchor(ANCHOR_OP_DESTROY_ACK, ino);
MAnchor *reply = new MAnchor(ANCHOR_OP_DESTROY_AGREE, ino, atid);
mds->messenger->send_message(reply, req->get_source_inst(), req->get_source_port());
delete req;
}
void AnchorTable::handle_destroy_commit(MAnchor *req)
{
inodeno_t ino = req->get_ino();
dout(7) << "handle_destroy_commit " << ino << endl;
destroy_commit(ino);
// log
mds->mdlog->submit_entry(new EAnchor(ANCHOR_OP_DESTROY_COMMIT, ino, version));
}
// UPDATE
@ -295,11 +331,12 @@ void AnchorTable::handle_destroy_commit(MAnchor *req)
class C_AT_UpdatePrepare : public Context {
AnchorTable *at;
MAnchor *req;
version_t atid;
public:
C_AT_UpdatePrepare(AnchorTable *a, MAnchor *r) :
at(a), req(r) { }
C_AT_UpdatePrepare(AnchorTable *a, MAnchor *r, version_t t) :
at(a), req(r), atid(t) { }
void finish(int r) {
at->_update_prepare_logged(req);
at->_update_prepare_logged(req, atid);
}
};
@ -316,32 +353,88 @@ void AnchorTable::handle_update_prepare(MAnchor *req)
EAnchor *le = new EAnchor(ANCHOR_OP_UPDATE_PREPARE, ino, version);
le->set_trace(trace);
mds->mdlog->submit_entry(le,
new C_AT_UpdatePrepare(this, req));
new C_AT_UpdatePrepare(this, req, version));
}
void AnchorTable::_update_prepare_logged(MAnchor *req)
void AnchorTable::_update_prepare_logged(MAnchor *req, version_t atid)
{
inodeno_t ino = req->get_ino();
dout(7) << "_update_prepare_logged " << ino << endl;
dout(7) << "_update_prepare_logged " << ino << " atid " << atid << endl;
// reply
MAnchor *reply = new MAnchor(ANCHOR_OP_UPDATE_ACK, ino);
MAnchor *reply = new MAnchor(ANCHOR_OP_UPDATE_AGREE, ino, atid);
mds->messenger->send_message(reply, req->get_source_inst(), req->get_source_port());
delete req;
}
void AnchorTable::handle_update_commit(MAnchor *req)
// COMMIT
class C_AT_Commit : public Context {
AnchorTable *at;
MAnchor *req;
public:
C_AT_Commit(AnchorTable *a, MAnchor *r) :
at(a), req(r) { }
void finish(int r) {
at->_commit_logged(req);
}
};
void AnchorTable::handle_commit(MAnchor *req)
{
inodeno_t ino = req->get_ino();
dout(7) << "handle_update_commit " << ino << endl;
version_t atid = req->get_atid();
dout(7) << "handle_commit " << atid << endl;
update_commit(ino);
if (pending_create.count(atid) ||
pending_destroy.count(atid) ||
pending_update.count(atid)) {
commit(atid);
mds->mdlog->submit_entry(new EAnchor(ANCHOR_OP_COMMIT, atid, version));
}
else if (atid <= version) {
dout(0) << "got commit for atid " << atid << " <= " << version
<< ", already committed, sending ack."
<< endl;
MAnchor *reply = new MAnchor(ANCHOR_OP_ACK, 0, atid);
mds->messenger->send_message(reply, req->get_source_inst(), req->get_source_port());
delete req;
return;
}
else {
// wtf.
dout(0) << "got commit for atid " << atid << " > " << version << endl;
assert(atid <= version);
}
mds->mdlog->submit_entry(new EAnchor(ANCHOR_OP_UPDATE_COMMIT, ino, version));
// wait for it to journal
mds->mdlog->wait_for_sync(new C_AT_Commit(this, req));
}
// Send an ACK (carrying the request's ino and atid) back to the sender of
// the COMMIT request, then free the request.
void AnchorTable::_commit_logged(MAnchor *req)
{
  dout(7) << "_commit_logged, sending ACK" << endl;

  MAnchor *ack = new MAnchor(ANCHOR_OP_ACK, req->get_ino(), req->get_atid());
  mds->messenger->send_message(ack,
                               req->get_source_inst(),
                               req->get_source_port());

  delete req;   // this function consumes the request
}
// ROLLBACK
// Handle a ROLLBACK request: roll back the transaction named by the request's
// atid and free the request.  No reply is sent and nothing is journaled here.
void AnchorTable::handle_rollback(MAnchor *req)
{
  const version_t txn = req->get_atid();
  dout(7) << "handle_rollback " << txn << endl;

  rollback(txn);

  delete req;
}
/*
* messages
*/
@ -384,23 +477,15 @@ void AnchorTable::handle_anchor_request(class MAnchor *req)
case ANCHOR_OP_CREATE_PREPARE:
handle_create_prepare(req);
break;
case ANCHOR_OP_CREATE_COMMIT:
handle_create_commit(req);
break;
case ANCHOR_OP_DESTROY_PREPARE:
handle_destroy_prepare(req);
break;
case ANCHOR_OP_DESTROY_COMMIT:
handle_destroy_commit(req);
break;
case ANCHOR_OP_UPDATE_PREPARE:
handle_update_prepare(req);
break;
case ANCHOR_OP_UPDATE_COMMIT:
handle_update_commit(req);
case ANCHOR_OP_COMMIT:
handle_commit(req);
break;
default:
@ -416,11 +501,30 @@ void AnchorTable::handle_anchor_request(class MAnchor *req)
// load/save entire table for now!
class C_AT_Saved : public Context {
AnchorTable *at;
version_t version;
public:
C_AT_Saved(AnchorTable *a, version_t v) : at(a), version(v) {}
void finish(int r) {
at->_saved(version);
}
};
void AnchorTable::save(Context *onfinish)
{
dout(7) << "save v " << version << endl;
if (!opened) return;
if (onfinish)
waiting_for_save[version].push_back(onfinish);
if (committing_version == version) {
dout(7) << "save already committing v " << version << endl;
return;
}
committing_version = version;
// build up write
bufferlist bl;
@ -445,18 +549,31 @@ void AnchorTable::save(Context *onfinish)
size_t s = pending_update.size();
bl.append((char*)&s, sizeof(s));
for (map<inodeno_t, vector<Anchor> >::iterator p = pending_update.begin();
for (map<version_t, pair<inodeno_t, vector<Anchor> > >::iterator p = pending_update.begin();
p != pending_update.end();
++p) {
bl.append((char*)&p->first, sizeof(p->first));
::_encode(p->second, bl);
bl.append((char*)&p->second.first, sizeof(p->second.first));
::_encode(p->second.second, bl);
}
// write!
mds->objecter->write(object_t(MDS_INO_ANCHORTABLE+mds->get_nodeid(), 0),
0, bl.length(),
bl,
NULL, onfinish);
NULL, new C_AT_Saved(this, version));
}
// Record that version v of the table has been saved: advance
// committed_version and complete every context queued in waiting_for_save
// for exactly that version.
//
// Asserts that v does not exceed committing_version and that it is newer than
// the current committed_version.
void AnchorTable::_saved(version_t v)
{
  dout(7) << "_saved v " << v << endl;

  assert(v <= committing_version);
  assert(committed_version < v);
  committed_version = v;

  // finish the waiters for this version, then drop the (now spent) slot
  list<Context*> &waiters = waiting_for_save[v];
  finish_contexts(waiters, 0);
  waiting_for_save.erase(v);
}
@ -510,10 +627,15 @@ void AnchorTable::_loaded(bufferlist& bl)
bl.copy(off, sizeof(s), (char*)&s);
off += sizeof(s);
for (size_t i=0; i<s; i++) {
version_t atid;
bl.copy(off, sizeof(atid), (char*)&atid);
off += sizeof(atid);
inodeno_t ino;
bl.copy(off, sizeof(ino), (char*)&ino);
off += sizeof(ino);
::_decode(pending_update[ino], bl, off);
pending_update[atid].first = ino;
::_decode(pending_update[atid].second, bl, off);
}
assert(off == (int)bl.length());

View File

@ -31,17 +31,20 @@ class AnchorTable {
hash_map<inodeno_t, Anchor> anchor_map;
// uncommitted operations
set<inodeno_t> pending_create;
set<inodeno_t> pending_destroy;
map<inodeno_t, vector<Anchor> > pending_update;
map<version_t, inodeno_t> pending_create;
map<version_t, inodeno_t> pending_destroy;
map<version_t, pair<inodeno_t, vector<Anchor> > > pending_update;
version_t version; // this includes anchor_map AND pending_* state.
version_t committing_version;
version_t committed_version;
// load/save state
bool opening, opened;
// waiters
list<Context*> waiting_for_open;
map<version_t, list<Context*> > waiting_for_save;
protected:
@ -52,46 +55,52 @@ protected:
// mid-level
void create_prepare(inodeno_t ino, vector<Anchor>& trace);
void create_commit(inodeno_t ino);
void destroy_prepare(inodeno_t ino);
void destroy_commit(inodeno_t ino);
void update_prepare(inodeno_t ino, vector<Anchor>& trace);
void update_commit(inodeno_t ino);
void commit(version_t atid);
void rollback(version_t atid);
friend class EAnchor; // used for journal replay.
// high level interface
void handle_lookup(MAnchor *req);
void handle_create_prepare(MAnchor *req);
void _create_prepare_logged(MAnchor *req);
void handle_create_commit(MAnchor *req);
void _create_prepare_logged(MAnchor *req, version_t atid);
friend class C_AT_CreatePrepare;
void handle_destroy_prepare(MAnchor *req);
void _destroy_prepare_logged(MAnchor *req);
void handle_destroy_commit(MAnchor *req);
void _destroy_prepare_logged(MAnchor *req, version_t atid);
friend class C_AT_DestroyPrepare;
void handle_update_prepare(MAnchor *req);
void _update_prepare_logged(MAnchor *req);
void handle_update_commit(MAnchor *req);
void _update_prepare_logged(MAnchor *req, version_t atid);
friend class C_AT_UpdatePrepare;
void handle_commit(MAnchor *req);
void _commit_logged(MAnchor *req);
friend class C_AT_Commit;
void handle_rollback(MAnchor *req);
// messages
void handle_anchor_request(MAnchor *m);
void dump();
public:
AnchorTable(MDS *m) :
mds(m),
version(0),
version(0), committing_version(0), committed_version(0),
opening(false), opened(false) { }
void dispatch(class Message *m);
version_t get_version() { return version; }
version_t get_committed_version() { return committed_version; }
void create_fresh() { // reset on mkfs() to empty, loaded table.
version = 0;
void create_fresh() {
// reset (i.e. on mkfs) to empty, but unsaved table.
version = 1;
opened = true;
opening = false;
anchor_map.clear();
@ -102,6 +111,7 @@ public:
// load/save entire table for now!
void save(Context *onfinish);
void _saved(version_t v);
void load(Context *onfinish);
void _loaded(bufferlist& bl);

View File

@ -284,7 +284,11 @@ void CDir::link_inode_work( CDentry *dn, CInode *in )
void CDir::unlink_inode( CDentry *dn )
{
dout(12) << "unlink_inode " << *dn << " " << *dn->inode << endl;
if (dn->is_remote()) {
dout(12) << "unlink_inode " << *dn << endl;
} else {
dout(12) << "unlink_inode " << *dn << " " << *dn->inode << endl;
}
unlink_inode_work(dn);

View File

@ -226,6 +226,8 @@ protected:
// True iff the type bits of inode.mode identify this inode as a directory.
bool is_dir() { return (inode.mode & INODE_TYPE_MASK) == INODE_MODE_DIR; }
bool is_anchored() { return inode.anchored; }
bool is_anchoring() { return state_test(STATE_ANCHORING); }
bool is_unanchoring() { return state_test(STATE_UNANCHORING); }
bool is_root() { return state & STATE_ROOT; }

View File

@ -3028,9 +3028,10 @@ class C_MDC_AnchorCreatePrepared : public Context {
MDCache *cache;
CInode *in;
public:
version_t atid;
C_MDC_AnchorCreatePrepared(MDCache *c, CInode *i) : cache(c), in(i) {}
void finish(int r) {
cache->_anchor_create_prepared(in);
cache->_anchor_create_prepared(in, atid);
}
};
@ -3058,24 +3059,26 @@ void MDCache::anchor_create(CInode *in, Context *onfinish)
in->make_anchor_trace(trace);
// do it
mds->anchorclient->prepare_create(in->ino(), trace,
new C_MDC_AnchorCreatePrepared(this, in));
C_MDC_AnchorCreatePrepared *fin = new C_MDC_AnchorCreatePrepared(this, in);
mds->anchorclient->prepare_create(in->ino(), trace, &fin->atid, fin);
}
class C_MDC_AnchorCreateLogged : public Context {
MDCache *cache;
CInode *in;
version_t atid;
version_t pdv;
public:
C_MDC_AnchorCreateLogged(MDCache *c, CInode *i, version_t v) : cache(c), in(i), pdv(v) {}
C_MDC_AnchorCreateLogged(MDCache *c, CInode *i, version_t t, version_t v) :
cache(c), in(i), atid(t), pdv(v) {}
void finish(int r) {
cache->_anchor_create_logged(in, pdv);
cache->_anchor_create_logged(in, atid, pdv);
}
};
void MDCache::_anchor_create_prepared(CInode *in)
void MDCache::_anchor_create_prepared(CInode *in, version_t atid)
{
dout(10) << "_anchor_create_prepared " << *in << endl;
dout(10) << "_anchor_create_prepared " << *in << " atid " << atid << endl;
assert(in->inode.anchored == false);
@ -3090,12 +3093,15 @@ void MDCache::_anchor_create_prepared(CInode *in)
pi->anchored = true;
pi->version = pdv;
// note anchor transaction
le->metablob.add_anchor_transaction(atid);
// log + wait
mds->mdlog->submit_entry(le, new C_MDC_AnchorCreateLogged(this, in, pdv));
mds->mdlog->submit_entry(le, new C_MDC_AnchorCreateLogged(this, in, atid, pdv));
}
void MDCache::_anchor_create_logged(CInode *in, version_t pdv)
void MDCache::_anchor_create_logged(CInode *in, version_t atid, version_t pdv)
{
dout(10) << "_anchor_create_logged pdv " << pdv << " on " << *in << endl;
@ -3106,10 +3112,10 @@ void MDCache::_anchor_create_logged(CInode *in, version_t pdv)
// apply update to cache
in->inode.anchored = true;
in->inode.version = pdv;
in->mark_dirty(pdv);
// tell the anchortable we've committed
mds->anchorclient->commit_create(in->ino());
mds->anchorclient->commit(atid);
// trigger waiters
in->finish_waiting(CInode::WAIT_ANCHORED, 0);
@ -3122,9 +3128,10 @@ class C_MDC_AnchorDestroyPrepared : public Context {
MDCache *cache;
CInode *in;
public:
version_t atid;
C_MDC_AnchorDestroyPrepared(MDCache *c, CInode *i) : cache(c), in(i) {}
void finish(int r) {
cache->_anchor_destroy_prepared(in);
cache->_anchor_destroy_prepared(in, atid);
}
};
@ -3149,23 +3156,26 @@ void MDCache::anchor_destroy(CInode *in, Context *onfinish)
in->get(CInode::PIN_UNANCHORING);
// do it
mds->anchorclient->prepare_destroy(in->ino(), new C_MDC_AnchorDestroyPrepared(this, in));
C_MDC_AnchorDestroyPrepared *fin = new C_MDC_AnchorDestroyPrepared(this, in);
mds->anchorclient->prepare_destroy(in->ino(), &fin->atid, fin);
}
class C_MDC_AnchorDestroyLogged : public Context {
MDCache *cache;
CInode *in;
version_t atid;
version_t pdv;
public:
C_MDC_AnchorDestroyLogged(MDCache *c, CInode *i, version_t v) : cache(c), in(i), pdv(v) {}
C_MDC_AnchorDestroyLogged(MDCache *c, CInode *i, version_t t, version_t v) :
cache(c), in(i), atid(t), pdv(v) {}
void finish(int r) {
cache->_anchor_destroy_logged(in, pdv);
cache->_anchor_destroy_logged(in, atid, pdv);
}
};
void MDCache::_anchor_destroy_prepared(CInode *in)
void MDCache::_anchor_destroy_prepared(CInode *in, version_t atid)
{
dout(10) << "_anchor_destroy_prepared " << *in << endl;
dout(10) << "_anchor_destroy_prepared " << *in << " atid " << atid << endl;
assert(in->inode.anchored == true);
@ -3180,12 +3190,15 @@ void MDCache::_anchor_destroy_prepared(CInode *in)
pi->anchored = true;
pi->version = pdv;
// note anchor transaction
le->metablob.add_anchor_transaction(atid);
// log + wait
mds->mdlog->submit_entry(le, new C_MDC_AnchorDestroyLogged(this, in, pdv));
mds->mdlog->submit_entry(le, new C_MDC_AnchorDestroyLogged(this, in, atid, pdv));
}
void MDCache::_anchor_destroy_logged(CInode *in, version_t pdv)
void MDCache::_anchor_destroy_logged(CInode *in, version_t atid, version_t pdv)
{
dout(10) << "_anchor_destroy_logged pdv " << pdv << " on " << *in << endl;
@ -3199,7 +3212,7 @@ void MDCache::_anchor_destroy_logged(CInode *in, version_t pdv)
in->inode.version = pdv;
// tell the anchortable we've committed
mds->anchorclient->commit_destroy(in->ino());
mds->anchorclient->commit(atid);
// trigger waiters
in->finish_waiting(CInode::WAIT_UNANCHORED, 0);

View File

@ -347,10 +347,10 @@ public:
void anchor_create(CInode *in, Context *onfinish);
void anchor_destroy(CInode *in, Context *onfinish);
protected:
void _anchor_create_prepared(CInode *in);
void _anchor_create_logged(CInode *in, version_t pdv);
void _anchor_destroy_prepared(CInode *in);
void _anchor_destroy_logged(CInode *in, version_t pdv);
void _anchor_create_prepared(CInode *in, version_t atid);
void _anchor_create_logged(CInode *in, version_t atid, version_t pdv);
void _anchor_destroy_prepared(CInode *in, version_t atid);
void _anchor_destroy_logged(CInode *in, version_t atid, version_t pdv);
friend class C_MDC_AnchorCreatePrepared;
friend class C_MDC_AnchorCreateLogged;

View File

@ -83,7 +83,7 @@ MDS::MDS(int whoami, Messenger *m, MonMap *mm) : timer(mds_lock) {
mdlog = new MDLog(this);
balancer = new MDBalancer(this);
anchorclient = new AnchorClient(messenger, mdsmap);
anchorclient = new AnchorClient(this);
idalloc = new IdAllocator(this);
anchortable = new AnchorTable(this);
@ -508,6 +508,16 @@ void MDS::handle_mds_map(MMDSMap *m)
// now active?
if (is_active()) {
// did i just recover?
if (oldstate == MDSMap::STATE_REJOIN) {
dout(1) << "successful recovery!" << endl;
// kick anchorclient (resend COMMITs)
anchorclient->finish_recovery();
// ...
}
dout(1) << "now active" << endl;
finish_contexts(waitfor_active); // kick waiters
}
@ -528,13 +538,6 @@ void MDS::handle_mds_map(MMDSMap *m)
mdcache->shutdown_start();
// save anchor table
if (mdsmap->get_anchortable() == whoami)
anchortable->save(0); // FIXME? or detect completion via filer?
if (idalloc)
idalloc->save(0); // FIXME? or detect completion via filer?
// flush log
mdlog->set_max_events(0);
mdlog->trim(NULL);

View File

@ -953,6 +953,7 @@ void Server::handle_client_mknod(MClientRequest *req, CInode *diri)
assert(dn);
// it's a file.
dn->pre_dirty();
newi->inode.mode = req->args.mknod.mode;
newi->inode.mode &= ~INODE_TYPE_MASK;
newi->inode.mode |= INODE_MODE_FILE;
@ -1038,7 +1039,7 @@ int Server::prepare_mknod(MClientRequest *req, CInode *diri,
*pdn = dir->lookup(name);
if (*pdn) {
if (!(*pdn)->can_read(req)) {
dout(10) << "waiting on (existing!) dentry " << **pdn << endl;
dout(10) << "waiting on (existing!) unreadable dentry " << **pdn << endl;
dir->add_waiter(CDir::WAIT_DNREAD, name, new C_MDS_RetryRequest(mds, req, diri));
return 0;
}
@ -1064,25 +1065,25 @@ int Server::prepare_mknod(MClientRequest *req, CInode *diri,
return 0;
}
// make sure dir is pinnable
// create inode
*pin = mdcache->create_inode();
(*pin)->inode.uid = req->get_caller_uid();
(*pin)->inode.gid = req->get_caller_gid();
(*pin)->inode.ctime = (*pin)->inode.mtime = (*pin)->inode.atime = g_clock.gettime(); // now
// note: inode.version will get set by finisher's mark_dirty.
// create dentry
// create null dentry
if (!*pdn)
*pdn = dir->add_dentry(name, 0);
(*pdn)->pre_dirty();
// xlock dentry
bool res = mds->locker->dentry_xlock_start(*pdn, req, diri);
assert(res == true);
if (!res)
return 0;
// yay!
// create inode?
if (pin) {
*pin = mdcache->create_inode();
(*pin)->inode.uid = req->get_caller_uid();
(*pin)->inode.gid = req->get_caller_gid();
(*pin)->inode.ctime = (*pin)->inode.mtime = (*pin)->inode.atime = g_clock.gettime(); // now
// note: inode.version will get set by finisher's mark_dirty.
}
// bump modify pop
mds->balancer->hit_dir(dir, META_POP_DWR);
@ -1110,6 +1111,7 @@ void Server::handle_client_mkdir(MClientRequest *req, CInode *diri)
assert(dn);
// it's a directory.
dn->pre_dirty();
newi->inode.mode = req->args.mkdir.mode;
newi->inode.mode &= ~INODE_TYPE_MASK;
newi->inode.mode |= INODE_MODE_DIR;
@ -1166,6 +1168,7 @@ void Server::handle_client_symlink(MClientRequest *req, CInode *diri)
assert(dn);
// it's a symlink
dn->pre_dirty();
newi->inode.mode &= ~INODE_TYPE_MASK;
newi->inode.mode |= INODE_MODE_SYMLINK;
newi->symlink = req->get_sarg();
@ -1214,20 +1217,6 @@ void Server::handle_client_link(MClientRequest *req, CInode *ref)
CDir *dir = validate_new_dentry_dir(req, ref, dname);
if (!dir) return;
// dentry exists?
CDentry *dn = dir->lookup(dname);
if (dn && (!dn->is_null() || dn->is_xlockedbyother(req))) {
dout(7) << "handle_client_link dn exists " << *dn << endl;
reply_request(req, -EEXIST);
return;
}
// xlock dentry
if (!dn->is_xlockedbyme(req)) {
if (!mds->locker->dentry_xlock_start(dn, req, ref))
return;
}
// discover link target
filepath target = req->get_sarg();
dout(7) << "handle_client_link discovering target " << target << endl;
@ -1241,6 +1230,192 @@ void Server::handle_client_link(MClientRequest *req, CInode *ref)
}
// Second stage of a client link request, entered once the link target path
// has been traversed.
//   r     - traversal result; negative means the target could not be found.
//   req   - the client request being served.
//   diri  - inode of the directory that will hold the new dentry.
//   trace - dentries traversed to reach the target; empty means the root inode.
// Rejects directory targets with -EINVAL, prepares and xlocks the new dentry,
// then hands off to link_local() or link_remote() depending on whether this
// MDS is authoritative for the target inode.
void Server::handle_client_link_2(int r, MClientRequest *req, CInode *diri, vector<CDentry*>& trace)
{
// target does not exist?
if (r < 0) {
dout(7) << "target " << req->get_sarg() << " dne" << endl;
reply_request(req, r);
return;
}
assert(r == 0);
// identify target inode: last dentry of the trace, or the root if the trace is empty
CInode *targeti = mdcache->get_root();
if (trace.size()) targeti = trace[trace.size()-1]->inode;
assert(targeti);
// dir? (directories are refused as link targets)
dout(7) << "target is " << *targeti << endl;
if (targeti->is_dir()) {
dout(7) << "target is a dir, failing" << endl;
reply_request(req, -EINVAL);
return;
}
// can we create the dentry?
CDir *dir = 0;
CDentry *dn = 0;
// look up / create the dentry and xlock it. The inode out-parameter is
// passed as 0, so no new inode is requested here.
// NOTE(review): prepare_mknod's non-zero return values are not distinguished
// here (e.g. "already exists") -- confirm against prepare_mknod.
r = prepare_mknod(req, diri, &dir, 0, &dn);
if (!r)
return; // wait on something
assert(dir);
assert(dn);
// ok!
assert(dn->is_xlockedbyme(req));
// local or remote?
if (targeti->is_auth())
link_local(req, diri, dn, targeti);
else
link_remote(req, diri, dn, targeti);
}
class C_MDS_link_local_finish : public Context {
MDS *mds;
MClientRequest *req;
CDentry *dn;
CInode *targeti;
version_t dpv;
time_t tctime;
time_t tpv;
public:
C_MDS_link_local_finish(MDS *m, MClientRequest *r, CDentry *d, CInode *ti, time_t ct) :
mds(m), req(r), dn(d), targeti(ti),
dpv(d->get_projected_version()),
tctime(ct),
tpv(targeti->get_parent_dn()->get_projected_version()) {}
void finish(int r) {
assert(r == 0);
mds->server->_link_local_finish(req, dn, targeti, dpv, tctime, tpv);
}
};
// Create a hard link (dentry dn) to a target inode this MDS is authoritative
// for.
//   req     - the client request being served.
//   diri    - directory inode passed along when the request has to be retried.
//   dn      - the new dentry that will point at the target.
//   targeti - the inode being linked to.
// If the target lives in a different directory and is not (stably) anchored,
// an anchor is created first and the request is retried afterwards. Otherwise
// the target is write-locked, the change is journaled, and the cache update
// is deferred to C_MDS_link_local_finish.
void Server::link_local(MClientRequest *req, CInode *diri,
CDentry *dn, CInode *targeti)
{
dout(10) << "link_local " << *dn << " to " << *targeti << endl;
// anchor target? not needed when the new dentry sits in the target's own
// parent dir, or when the target is anchored and not being unanchored.
if (targeti->get_parent_dir() == dn->get_dir()) {
dout(7) << "target is in the same dir, sweet" << endl;
}
else if (targeti->is_anchored() && !targeti->is_unanchoring()) {
dout(7) << "target anchored already (nlink=" << targeti->inode.nlink << "), sweet" << endl;
} else {
dout(7) << "target needs anchor, nlink=" << targeti->inode.nlink << ", creating anchor" << endl;
// retry the whole request once the anchor exists
mdcache->anchor_create(targeti,
new C_MDS_RetryRequest(mds, req, diri));
return;
}
// wrlock the target inode
if (!mds->locker->inode_hard_write_start(targeti, req))
return; // fw or (wait for) lock
// ok, let's do it.
// prepare log entry
EUpdate *le = new EUpdate("link_local");
// predirty: must happen before the finisher is constructed, because the
// finisher captures the projected versions in its constructor.
dn->pre_dirty();
version_t tpdv = targeti->pre_dirty();
// add to event: the new dentry, and the target's primary dentry + inode
le->metablob.add_dir_context(dn->get_dir());
le->metablob.add_dentry(dn, true);
le->metablob.add_dir_context(targeti->get_parent_dir());
inode_t *pi = le->metablob.add_dentry(targeti->parent, true, targeti);
// update journaled target inode (the cached inode is updated in the finisher)
pi->nlink++;
pi->ctime = g_clock.gettime();
pi->version = tpdv;
// finisher
C_MDS_link_local_finish *fin = new C_MDS_link_local_finish(mds, req, dn, targeti, pi->ctime);
// log + wait
mdlog->submit_entry(le);
mdlog->wait_for_sync(fin);
}
// Apply a journaled local link to the cache and reply to the client.
//   req     - the client request being answered.
//   dn      - the new dentry; it is pointed at the target's ino.
//   targeti - the inode that gained a link.
//   dpv     - version to set on dn and mark it dirty with.
//   tctime  - ctime to store in the target inode.
//   tpv     - version to mark the target inode dirty with.
void Server::_link_local_finish(MClientRequest *req, CDentry *dn, CInode *targeti,
version_t dpv, time_t tctime, version_t tpv)
{
dout(10) << "_link_local_finish " << *dn << " to " << *targeti << endl;
// make the new dentry a remote link to the target and mark it dirty
dn->set_remote_ino(targeti->ino());
dn->set_version(dpv);
dn->mark_dirty(dpv);
// update the target
targeti->inode.nlink++;
targeti->inode.ctime = tctime;
targeti->mark_dirty(tpv);
// unlock the new dentry and target inode
mds->locker->dentry_xlock_finish(dn);
mds->locker->inode_hard_write_finish(targeti);
// bump target popularity
mds->balancer->hit_inode(targeti, META_POP_IWR);
// reply
MClientReply *reply = new MClientReply(req, 0);
reply_request(req, reply, dn->get_dir()->get_inode()); // FIXME: imprecise ref
}
// Create a hard link to a target inode this MDS is NOT authoritative for.
// Not implemented yet: the target replica is pinned for the request and the
// client is answered with -EAGAIN. The intended protocol is sketched in the
// numbered comments below.
//   req     - the client request being served.
//   ref     - unused in the current stub.
//   dn      - the new dentry (only used to pick the reply's reference inode).
//   targeti - the non-auth inode being linked to.
void Server::link_remote(MClientRequest *req, CInode *ref,
CDentry *dn, CInode *targeti)
{
dout(10) << "link_remote " << *dn << " to " << *targeti << endl;
// pin the target replica in our cache
assert(!targeti->is_auth());
mdcache->request_pin_inode(req, targeti);
// 1. send LinkPrepare to dest (lock target on dest, journal target update)
// 2. create+journal new dentry, as with link_local.
// 3. send LinkCommit to dest (unlocks target on dest, journals commit)
// IMPLEMENT ME
MClientReply *reply = new MClientReply(req, -EAGAIN);
reply_request(req, reply, dn->get_dir()->get_inode()); // FIXME: imprecise ref
}
/*
void Server::handle_client_link_finish(MClientRequest *req, CInode *ref,
CDentry *dn, CInode *targeti)
{
// create remote link
dn->dir->link_inode(dn, targeti->ino());
dn->link_remote( targeti ); // since we have it
dn->_mark_dirty(); // fixme
mds->balancer->hit_dir(dn->dir, META_POP_DWR);
// done!
commit_request(req, new MClientReply(req, 0), ref,
0); // FIXME i should log something
}
*/
/*
class C_MDS_RemoteLink : public Context {
Server *server;
MClientRequest *req;
@ -1271,61 +1446,6 @@ public:
}
};
void Server::handle_client_link_2(int r, MClientRequest *req, CInode *diri, vector<CDentry*>& trace)
{
// target dne?
if (r < 0) {
dout(7) << "target " << req->get_sarg() << " dne" << endl;
reply_request(req, r);
return;
}
assert(r == 0);
CInode *targeti = mdcache->get_root();
if (trace.size()) targeti = trace[trace.size()-1]->inode;
assert(targeti);
// dir?
dout(7) << "target is " << *targeti << endl;
if (targeti->is_dir()) {
dout(7) << "target is a dir, failing" << endl;
reply_request(req, -EINVAL);
return;
}
// what was the new dentry again?
string dname = req->get_filepath().last_dentry();
frag_t fg = diri->pick_dirfrag(dname);
CDir *dir = diri->get_dirfrag(fg);
assert(dir);
CDentry *dn = dir->lookup(dname);
assert(dn);
assert(dn->is_xlockedbyme(req));
// ok!
if (targeti->is_auth()) {
// mine
// same dir?
if (targeti->get_parent_dir() == dn->get_dir()) {
dout(7) << "target is in the same dir, sweet" << endl;
}
else if (targeti->is_anchored()) {
dout(7) << "target anchored already (nlink=" << targeti->inode.nlink << "), sweet" << endl;
} else {
assert(targeti->inode.nlink == 1);
dout(7) << "target needs anchor, nlink=" << targeti->inode.nlink << ", creating anchor" << endl;
mdcache->anchor_create(targeti,
new C_MDS_RetryRequest(mds, req, diri));
return;
}
// ok, inc link!
targeti->inode.nlink++;
dout(7) << "nlink++, now " << targeti->inode.nlink << " on " << *targeti << endl;
targeti->_mark_dirty(); // fixme
} else {
// remote: send nlink++ request, wait
@ -1338,23 +1458,10 @@ void Server::handle_client_link_2(int r, MClientRequest *req, CInode *diri, vect
return;
}
handle_client_link_finish(req, diri, dn, targeti);
}
*/
void Server::handle_client_link_finish(MClientRequest *req, CInode *ref,
CDentry *dn, CInode *targeti)
{
// create remote link
dn->dir->link_inode(dn, targeti->ino());
dn->link_remote( targeti ); // since we have it
dn->_mark_dirty(); // fixme
mds->balancer->hit_dir(dn->dir, META_POP_DWR);
// done!
commit_request(req, new MClientReply(req, 0), ref,
0); // FIXME i should log something
}
// UNLINK
@ -1403,8 +1510,7 @@ void Server::handle_client_unlink(MClientRequest *req,
// have it. locked?
if (!dn->can_read(req)) {
dout(10) << " waiting on " << *dn << endl;
dir->add_waiter(CDir::WAIT_DNREAD,
name,
dir->add_waiter(CDir::WAIT_DNREAD, name,
new C_MDS_RetryRequest(mds, req, diri));
return;
}
@ -1416,13 +1522,28 @@ void Server::handle_client_unlink(MClientRequest *req,
return;
}
// remote? if so, open up the inode.
if (!dn->inode) {
assert(dn->is_remote());
CInode *in = mdcache->get_inode(dn->get_remote_ino());
if (in) {
dout(7) << "linking in remote in " << *in << endl;
dn->link_remote(in);
} else {
dout(10) << "remote dn, opening inode for " << *dn << endl;
mdcache->open_remote_ino(dn->get_remote_ino(), req,
new C_MDS_RetryRequest(mds, req, diri));
return;
}
}
// ok!
CInode *in = dn->inode;
assert(in);
if (rmdir) {
dout(7) << "handle_client_rmdir on dir " << *in << endl;
dout(7) << "handle_client_rmdir on " << *in << endl;
} else {
dout(7) << "handle_client_unlink on non-dir " << *in << endl;
dout(7) << "handle_client_unlink on " << *in << endl;
}
int inauth = in->authority().first;
@ -1495,10 +1616,9 @@ void Server::handle_client_unlink(MClientRequest *req,
}
// am i dentry auth?
if (inauth != mds->get_nodeid()) {
// not auth; forward!
dout(7) << "handle_client_unlink not auth for " << *dir << " dn " << dn->name << ", fwd to " << inauth << endl;
mdcache->request_forward(req, inauth);
if (!dn->is_auth()) {
dout(7) << "handle_client_unlink/rmdir not auth for " << *dn << endl;
mdcache->request_forward(req, dn->authority().first);
return;
}
@ -1508,21 +1628,6 @@ void Server::handle_client_unlink(MClientRequest *req,
if (!mds->locker->dentry_xlock_start(dn, req, diri))
return;
// is this a remote link?
if (dn->is_remote() && !dn->inode) {
CInode *in = mdcache->get_inode(dn->get_remote_ino());
if (in) {
dn->link_remote(in);
} else {
// open inode
dout(7) << "opening target inode first, ino is " << dn->get_remote_ino() << endl;
mdcache->open_remote_ino(dn->get_remote_ino(), req,
new C_MDS_RetryRequest(mds, req, diri));
return;
}
}
mds->balancer->hit_dir(dn->dir, META_POP_DWR);
// it's locked, unlink!
@ -2167,7 +2272,7 @@ void Server::handle_client_openc(MClientRequest *req, CInode *diri)
// make dentry and inode, xlock dentry.
int r = prepare_mknod(req, diri, &dir, &in, &dn);
if (!r)
if (r < 0)
return; // wait on something
assert(dir);
assert(in);
@ -2176,6 +2281,7 @@ void Server::handle_client_openc(MClientRequest *req, CInode *diri)
if (r == 1) {
// created.
// it's a file.
dn->pre_dirty();
in->inode.mode = 0644; // FIXME req should have a umask
in->inode.mode |= INODE_MODE_FILE;

View File

@ -90,12 +90,20 @@ public:
// namespace changes
void handle_client_mknod(MClientRequest *req, CInode *ref);
void handle_client_link(MClientRequest *req, CInode *ref);
void handle_client_link_2(int r, MClientRequest *req, CInode *ref, vector<CDentry*>& trace);
void handle_client_link_finish(MClientRequest *req, CInode *ref,
CDentry *dn, CInode *targeti);
void link_local(MClientRequest *req, CInode *diri,
CDentry *dn, CInode *targeti);
void _link_local_finish(MClientRequest *req,
CDentry *dn, CInode *targeti,
version_t, time_t, version_t);
void link_remote(MClientRequest *req, CInode *diri,
CDentry *dn, CInode *targeti);
void handle_client_unlink(MClientRequest *req, CInode *ref);
void handle_client_rename(MClientRequest *req, CInode *ref);
void handle_client_rename_2(MClientRequest *req,
CInode *ref,

View File

@ -25,6 +25,7 @@ class EAnchor : public LogEvent {
protected:
int op;
inodeno_t ino;
version_t atid;
vector<Anchor> trace;
version_t version; // anchor table version
@ -32,7 +33,10 @@ protected:
EAnchor() : LogEvent(EVENT_ANCHOR) { }
EAnchor(int o, inodeno_t i, version_t v) :
LogEvent(EVENT_ANCHOR),
op(o), ino(i), version(v) { }
op(o), ino(i), atid(0), version(v) { }
EAnchor(int o, version_t a, version_t v=0) :
LogEvent(EVENT_ANCHOR),
op(o), atid(a), version(v) { }
void set_trace(vector<Anchor>& t) { trace = t; }
vector<Anchor>& get_trace() { return trace; }
@ -40,6 +44,7 @@ protected:
void encode_payload(bufferlist& bl) {
bl.append((char*)&op, sizeof(op));
bl.append((char*)&ino, sizeof(ino));
bl.append((char*)&atid, sizeof(atid));
::_encode(trace, bl);
bl.append((char*)&version, sizeof(version));
}
@ -48,17 +53,20 @@ protected:
off += sizeof(op);
bl.copy(off, sizeof(ino), (char*)&ino);
off += sizeof(ino);
bl.copy(off, sizeof(atid), (char*)&atid);
off += sizeof(atid);
::_decode(trace, bl, off);
bl.copy(off, sizeof(version), (char*)&version);
off += sizeof(version);
}
void print(ostream& out) {
out << "EAnchor " << get_anchor_opname(op) << " " << ino << endl;
out << "EAnchor " << get_anchor_opname(op);
if (ino) out << " " << ino;
if (atid) out << " atid " << atid;
if (version) out << " v " << version;
}
bool has_expired(MDS *mds);
void expire(MDS *mds, Context *c);
void replay(MDS *mds);

View File

@ -209,8 +209,15 @@ class EMetaBlob {
list<dirfrag_t> lump_order;
map<dirfrag_t, dirlump> lump_map;
// anchor transactions included in this update.
list<version_t> atids;
public:
// Record anchor table transaction id `atid` as included in this metablob;
// it is appended to the atids list.
void add_anchor_transaction(version_t atid) {
atids.push_back(atid);
}
// return pointer to to-be-journaled inode iff it's a normal (non-remote) dentry
inode_t *add_dentry(CDentry *dn, bool dirty, CInode *in=0) {
CDir *dir = dn->get_dir();
@ -303,6 +310,7 @@ class EMetaBlob {
bl.append((char*)&(*i), sizeof(*i));
lump_map[*i]._encode(bl);
}
::_encode(atids, bl);
}
void _decode(bufferlist& bl, int& off) {
int n;
@ -315,14 +323,16 @@ class EMetaBlob {
lump_order.push_back(dirfrag);
lump_map[dirfrag]._decode(bl, off);
}
::_decode(atids, bl, off);
}
void print(ostream& out) const {
if (lump_order.empty())
out << "[metablob empty]";
else
out << "[metablob " << lump_order.front()
<< ", " << lump_map.size() << " dirs]";
out << "[metablob";
if (!lump_order.empty())
out << lump_order.front() << ", " << lump_map.size() << " dirs";
if (!atids.empty())
out << " atids " << atids;
out << "]";
}
bool has_expired(MDS *mds);

View File

@ -33,6 +33,7 @@
#include "MDCache.h"
#include "Migrator.h"
#include "AnchorTable.h"
#include "AnchorClient.h"
#include "config.h"
#undef dout
@ -121,6 +122,17 @@ bool EMetaBlob::has_expired(MDS *mds)
assert(0); // i goofed the logic
}
// have my anchortable ops committed?
for (list<version_t>::iterator p = atids.begin();
p != atids.end();
++p) {
if (!mds->anchorclient->has_committed(*p)) {
dout(10) << "EMetaBlob.has_expired anchor transaction " << *p
<< " not yet acked" << endl;
return false;
}
}
return true; // all dirlumps expired.
}
@ -178,12 +190,10 @@ void EMetaBlob::expire(MDS *mds, Context *c)
assert(0); // hrm
}
// commit or wait for export
// FIXME: what if export aborts? need to retry!
assert(!commit.empty() || !waitfor_export.empty());
//C_Gather *gather = new C_Gather(new C_journal_RetryExpire(mds, this, c));
// set up gather context
C_Gather *gather = new C_Gather(c);
// do or wait for exports and commits
for (map<CDir*,version_t>::iterator p = commit.begin();
p != commit.end();
++p) {
@ -198,6 +208,18 @@ void EMetaBlob::expire(MDS *mds, Context *c)
p != waitfor_export.end();
++p)
mds->mdcache->migrator->add_export_finish_waiter(*p, gather->new_sub());
// have my anchortable ops committed?
for (list<version_t>::iterator p = atids.begin();
p != atids.end();
++p) {
if (!mds->anchorclient->has_committed(*p)) {
dout(10) << "EMetaBlob.expire anchor transaction " << *p
<< " not yet acked, waiting" << endl;
mds->anchorclient->wait_for_ack(*p, gather->new_sub());
}
}
}
void EMetaBlob::replay(MDS *mds)
@ -300,6 +322,14 @@ void EMetaBlob::replay(MDS *mds)
}
}
}
for (list<version_t>::iterator p = atids.begin();
p != atids.end();
++p) {
dout(10) << "EMetaBlob.replay noting anchor transaction " << *p << endl;
mds->anchorclient->got_journaled_agree(*p);
}
}
// -----------------------
@ -438,7 +468,7 @@ void EAlloc::replay(MDS *mds)
bool EAnchor::has_expired(MDS *mds)
{
version_t cv = mds->anchortable->get_version();
version_t cv = mds->anchortable->get_committed_version();
if (cv < version) {
dout(10) << "EAnchor.has_expired v " << version << " > " << cv
<< ", still dirty" << endl;
@ -467,24 +497,25 @@ void EAnchor::replay(MDS *mds)
assert(version-1 == mds->anchortable->get_version());
switch (op) {
// anchortable
case ANCHOR_OP_CREATE_PREPARE:
mds->anchortable->create_prepare(ino, trace);
break;
case ANCHOR_OP_CREATE_COMMIT:
mds->anchortable->create_commit(ino);
break;
case ANCHOR_OP_DESTROY_PREPARE:
mds->anchortable->destroy_prepare(ino);
break;
case ANCHOR_OP_DESTROY_COMMIT:
mds->anchortable->destroy_commit(ino);
break;
case ANCHOR_OP_UPDATE_PREPARE:
mds->anchortable->update_prepare(ino, trace);
break;
case ANCHOR_OP_UPDATE_COMMIT:
mds->anchortable->update_commit(ino);
case ANCHOR_OP_COMMIT:
mds->anchortable->commit(atid);
break;
// anchorclient
case ANCHOR_OP_ACK:
mds->anchorclient->got_journaled_ack(atid);
break;
default:
assert(0);
}

View File

@ -18,24 +18,25 @@
#include <vector>
#include "msg/Message.h"
#include "mds/AnchorTable.h"
#include "mds/Anchor.h"
class MAnchor : public Message {
int op;
inodeno_t ino;
vector<Anchor> trace;
version_t atid; // anchor table version.
public:
MAnchor() {}
MAnchor(int o, inodeno_t i) :
Message(MSG_MDS_ANCHOR),
op(o), ino(i) { }
MAnchor(int o, inodeno_t i, version_t v=0) :
Message(MSG_MDS_ANCHOR),
op(o), ino(i), atid(v) { }
virtual char *get_type_name() { return "anchor"; }
void print(ostream& o) {
o << "anchor(" << get_anchor_opname(op) << " " << ino;
if (atid) o << " atid " << atid;
for (unsigned i=0; i<trace.size(); i++) {
o << ' ' << trace[i];
}
@ -49,6 +50,7 @@ class MAnchor : public Message {
int get_op() { return op; }
inodeno_t get_ino() { return ino; }
vector<Anchor>& get_trace() { return trace; }
version_t get_atid() { return atid; }
virtual void decode_payload() {
int off = 0;
@ -56,19 +58,16 @@ class MAnchor : public Message {
off += sizeof(op);
payload.copy(off, sizeof(ino), (char*)&ino);
off += sizeof(ino);
payload.copy(off, sizeof(atid), (char*)&atid);
off += sizeof(atid);
::_decode(trace, payload, off);
}
virtual void encode_payload() {
payload.append((char*)&op, sizeof(op));
payload.append((char*)&ino, sizeof(ino));
payload.append((char*)&atid, sizeof(atid));
::_encode(trace, payload);
/*
int n = trace.size();
payload.append((char*)&n, sizeof(int));
for (int i=0; i<n; i++)
trace[i]->_encode(payload);
*/
}
};

View File

@ -21,6 +21,9 @@ typedef struct {
} MInodeLink_st;
class MInodeLink : public Message {
inodeno_t ino;
filepath link_name;
bool prepare;
MInodeLink_st st;
public: