From d4a88488b42dfc8a89634f8db8dcd8ae7fb23a82 Mon Sep 17 00:00:00 2001 From: sageweil Date: Wed, 14 Mar 2007 17:54:32 +0000 Subject: [PATCH] * AnchorTable/AnchorClient update to handle failure for table of initiator. * Simplified anchor ops. * Rollback. git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1231 29311d96-e01e-0410-9327-a35deaab8ce9 --- branches/sage/cephmds2/TODO | 3 +- .../sage/cephmds2/client/SyntheticClient.cc | 90 +++++ .../sage/cephmds2/client/SyntheticClient.h | 6 + branches/sage/cephmds2/doc/anchortable.txt | 53 +++ branches/sage/cephmds2/mds/Anchor.h | 42 ++- branches/sage/cephmds2/mds/AnchorClient.cc | 214 ++++++++--- branches/sage/cephmds2/mds/AnchorClient.h | 58 ++- branches/sage/cephmds2/mds/AnchorTable.cc | 314 +++++++++++----- branches/sage/cephmds2/mds/AnchorTable.h | 40 +- branches/sage/cephmds2/mds/CDir.cc | 6 +- branches/sage/cephmds2/mds/CInode.h | 4 +- branches/sage/cephmds2/mds/MDCache.cc | 53 ++- branches/sage/cephmds2/mds/MDCache.h | 8 +- branches/sage/cephmds2/mds/MDS.cc | 19 +- branches/sage/cephmds2/mds/Server.cc | 352 ++++++++++++------ branches/sage/cephmds2/mds/Server.h | 12 +- branches/sage/cephmds2/mds/events/EAnchor.h | 18 +- branches/sage/cephmds2/mds/events/EMetaBlob.h | 20 +- branches/sage/cephmds2/mds/journal.cc | 59 ++- branches/sage/cephmds2/messages/MAnchor.h | 21 +- branches/sage/cephmds2/messages/MInodeLink.h | 3 + 21 files changed, 1003 insertions(+), 392 deletions(-) create mode 100644 branches/sage/cephmds2/doc/anchortable.txt diff --git a/branches/sage/cephmds2/TODO b/branches/sage/cephmds2/TODO index 6fb7ca156c9..5431bbe3d96 100644 --- a/branches/sage/cephmds2/TODO +++ b/branches/sage/cephmds2/TODO @@ -39,10 +39,11 @@ mds - clean up client op redirection - idempotent ops - journal+recovery + - anchortable, anchorclient + - link - unlink - open(wr cap), open+create - file capabilities i/o - - link - rename - dirfrags vs readdir - redo hard links diff --git a/branches/sage/cephmds2/client/SyntheticClient.cc b/branches/sage/cephmds2/client/SyntheticClient.cc index 8a3fc4e9d79..685ee26740b 100644 --- a/branches/sage/cephmds2/client/SyntheticClient.cc +++ b/branches/sage/cephmds2/client/SyntheticClient.cc @@ -121,6 +121,16 @@ void parse_syn_options(vector& args) syn_sargs.push_back( args[++i] ); syn_iargs.push_back( atoi(args[++i]) ); + } else if (strcmp(args[i],"thrashlinks") == 0) { + syn_modes.push_back( SYNCLIENT_MODE_THRASHLINKS ); + syn_iargs.push_back( atoi(args[++i]) ); + syn_iargs.push_back( atoi(args[++i]) ); + syn_iargs.push_back( atoi(args[++i]) ); + + + } else if (strcmp(args[i],"foo") == 0) { + syn_modes.push_back( SYNCLIENT_MODE_FOO ); + } else if (strcmp(args[i],"until") == 0) { syn_modes.push_back( SYNCLIENT_MODE_UNTIL ); syn_iargs.push_back( atoi(args[++i]) ); @@ -216,6 +226,11 @@ int SyntheticClient::run() dout(3) << "mode " << mode << endl; switch (mode) { + case SYNCLIENT_MODE_FOO: + if (run_me()) + foo(); + break; + case SYNCLIENT_MODE_RANDOMSLEEP: { int iarg1 = iargs.front(); @@ -334,6 +349,21 @@ int SyntheticClient::run() break; + case SYNCLIENT_MODE_THRASHLINKS: + { + string sarg1 = get_sarg(0); + int iarg1 = iargs.front(); iargs.pop_front(); + int iarg2 = iargs.front(); iargs.pop_front(); + int iarg3 = iargs.front(); iargs.pop_front(); + if (run_me()) { + dout(2) << "thrashlinks " << sarg1 << " " << iarg1 << " " << iarg2 << " " << iarg3 << endl; + thrash_links(sarg1.c_str(), iarg1, iarg2, iarg3); + } + } + break; + + + case SYNCLIENT_MODE_MAKEFILES: { int num = iargs.front(); iargs.pop_front(); @@ -1289,3 +1319,63 @@ void SyntheticClient::make_dir_mess(const char *basedir, int n) } + + +void SyntheticClient::foo() +{ + client->mknod("one", 0755); + client->mknod("two", 0755); + client->link("one", "three"); + client->mkdir("dir", 0755); + client->link("two", "/dir/twolink"); + client->link("dir/twolink", "four"); +} + +int SyntheticClient::thrash_links(const char *basedir, int dirs, int files, int depth) +{ + dout(1) << "thrash_links " << basedir << " " << dirs << " " << files << " " << depth << endl; + + if (time_to_stop()) return 0; + + // first make dir/file tree + make_dirs(basedir,dirs,files,depth); + + // now link shit up + int n = files*dirs; + for (int i=0; ilink(file.c_str(), ln.c_str()); + } + + return 0; +} + + diff --git a/branches/sage/cephmds2/client/SyntheticClient.h b/branches/sage/cephmds2/client/SyntheticClient.h index a83f4bc12ae..bf6b6f8f3e8 100644 --- a/branches/sage/cephmds2/client/SyntheticClient.h +++ b/branches/sage/cephmds2/client/SyntheticClient.h @@ -54,6 +54,8 @@ #define SYNCLIENT_MODE_SLEEP 62 +#define SYNCLIENT_MODE_FOO 100 +#define SYNCLIENT_MODE_THRASHLINKS 101 void parse_syn_options(vector& args); @@ -196,6 +198,10 @@ class SyntheticClient { void make_dir_mess(const char *basedir, int n); + void foo(); + + int thrash_links(const char *basedir, int dirs, int files, int depth); + }; #endif diff --git a/branches/sage/cephmds2/doc/anchortable.txt b/branches/sage/cephmds2/doc/anchortable.txt new file mode 100644 index 00000000000..ab036a14877 --- /dev/null +++ b/branches/sage/cephmds2/doc/anchortable.txt @@ -0,0 +1,53 @@ + +ANCHOR TABLE PROTOCOL + +MDS sends an update PREPARE to the anchortable MDS. The prepare is +identified by the ino and operation type; only one for each type +(create, update, destroy) can be pending at any time. Both parties +may actually be the same local node, but for simplicity we treat that +situation the same. (That is, we act as if they may fail +independently, even if they can't.) + +The anchortable journals the proposed update, and responds with an +AGREE and a version number. This uniquely identifies the request. + +The MDS can then update the filesystem metadata however it sees fit. +When it is finished (and the results journaled), it sends a COMMIT to +the anchortable. The table journals the commit, frees any state from +the transaction, and sends an ACK. The initiating MDS should then +journal the ACK to complete the transaction. + + +ANCHOR TABLE FAILURE + +If the AT fails before journaling the PREPARE and sending the AGREE, +the initiating MDS will simply retry the request. + +If the AT fails after journaling PREPARE but before journaling COMMIT, +it will resend AGREE to the initiating MDS. + +If the AT fails after the COMMIT, the transaction has been closed, and it +takes no action. If it receives a COMMIT for which it has no open +transaction, it will reply with ACK. + + +INITIATING MDS FAILURE + +If the MDS fails before the metadata update has been journaled, no +action is taken, since nothing is known about the previously proposed +transaction. If an AGREE message is received and there is no +corresponding PREPARE state, and ROLLBACK is sent to the anchor table. + +If the MDS fails after journaling the metadata update but before +journaling the ACK, it resends COMMIT to the anchor table. If it +receives an AGREE after resending the COMMIT, it simply ignores the +AGREE. The anchortable will respond with an ACK, allowing the +initiating MDS to journal the final ACK and close out the transaction +locally. + +On journal replay, each metadata update (EMetaBlob) encountered that +includes an anchor transaction is noted in the AnchorClient by adding +it to the pending_commit list, and each journaled ACK is removed from +that list. Journal replay may enounter ACKs with no prior metadata +update; these are ignored. When recovery finishes, a COMMIT is sent +for all outstanding transactions. diff --git a/branches/sage/cephmds2/mds/Anchor.h b/branches/sage/cephmds2/mds/Anchor.h index 36496a0e056..ba0092d2c46 100644 --- a/branches/sage/cephmds2/mds/Anchor.h +++ b/branches/sage/cephmds2/mds/Anchor.h @@ -25,34 +25,46 @@ using std::string; // anchor ops #define ANCHOR_OP_LOOKUP 1 #define ANCHOR_OP_LOOKUP_REPLY 2 -#define ANCHOR_OP_CREATE_PREPARE 3 -#define ANCHOR_OP_CREATE_ACK 4 -#define ANCHOR_OP_CREATE_COMMIT 5 -#define ANCHOR_OP_DESTROY_PREPARE 6 -#define ANCHOR_OP_DESTROY_ACK 7 -#define ANCHOR_OP_DESTROY_COMMIT 8 -#define ANCHOR_OP_UPDATE_PREPARE 9 -#define ANCHOR_OP_UPDATE_ACK 10 -#define ANCHOR_OP_UPDATE_COMMIT 11 + +#define ANCHOR_OP_CREATE_PREPARE 11 +#define ANCHOR_OP_CREATE_AGREE 12 + +#define ANCHOR_OP_DESTROY_PREPARE 21 +#define ANCHOR_OP_DESTROY_AGREE 22 + +#define ANCHOR_OP_UPDATE_PREPARE 31 +#define ANCHOR_OP_UPDATE_AGREE 32 + +#define ANCHOR_OP_COMMIT 41 +#define ANCHOR_OP_ACK 42 +#define ANCHOR_OP_ROLLBACK 43 + + inline const char* get_anchor_opname(int o) { switch (o) { case ANCHOR_OP_LOOKUP: return "lookup"; case ANCHOR_OP_LOOKUP_REPLY: return "lookup_reply"; + case ANCHOR_OP_CREATE_PREPARE: return "create_prepare"; - case ANCHOR_OP_CREATE_ACK: return "create_ack"; - case ANCHOR_OP_CREATE_COMMIT: return "create_commit"; + case ANCHOR_OP_CREATE_AGREE: return "create_agree"; case ANCHOR_OP_DESTROY_PREPARE: return "destroy_prepare"; - case ANCHOR_OP_DESTROY_ACK: return "destroy_ack"; - case ANCHOR_OP_DESTROY_COMMIT: return "destroy_commit"; + case ANCHOR_OP_DESTROY_AGREE: return "destroy_agree"; case ANCHOR_OP_UPDATE_PREPARE: return "update_prepare"; - case ANCHOR_OP_UPDATE_ACK: return "update_ack"; - case ANCHOR_OP_UPDATE_COMMIT: return "update_commit"; + case ANCHOR_OP_UPDATE_AGREE: return "update_agree"; + + case ANCHOR_OP_COMMIT: return "commit"; + case ANCHOR_OP_ACK: return "ack"; + case ANCHOR_OP_ROLLBACK: return "rollback"; default: assert(0); } } +// identifies a anchor table mutation + + + // anchor type class Anchor { diff --git a/branches/sage/cephmds2/mds/AnchorClient.cc b/branches/sage/cephmds2/mds/AnchorClient.cc index a94b5c0d741..c28231ca9b1 100644 --- a/branches/sage/cephmds2/mds/AnchorClient.cc +++ b/branches/sage/cephmds2/mds/AnchorClient.cc @@ -24,13 +24,15 @@ using std::endl; #include "msg/Messenger.h" #include "MDS.h" +#include "MDLog.h" +#include "events/EAnchor.h" #include "messages/MAnchor.h" #include "config.h" #undef dout -#define dout(x) if (x <= g_conf.debug) cout << g_clock.now() << " " << messenger->get_myaddr() << ".anchorclient " -#define derr(x) if (x <= g_conf.debug) cout << g_clock.now() << " " << messenger->get_myaddr() << ".anchorclient " +#define dout(x) if (x <= g_conf.debug) cout << g_clock.now() << " " << mds->messenger->get_myaddr() << ".anchorclient " +#define derr(x) if (x <= g_conf.debug) cout << g_clock.now() << " " << mds->messenger->get_myaddr() << ".anchorclient " void AnchorClient::dispatch(Message *m) @@ -47,17 +49,18 @@ void AnchorClient::dispatch(Message *m) void AnchorClient::handle_anchor_reply(class MAnchor *m) { + inodeno_t ino = m->get_ino(); + version_t atid = m->get_atid(); + switch (m->get_op()) { + // lookup case ANCHOR_OP_LOOKUP_REPLY: + assert(pending_lookup.count(ino)); { - assert(pending_lookup_trace.count(m->get_ino()) == 1); - - *(pending_lookup_trace[ m->get_ino() ]) = m->get_trace(); - Context *onfinish = pending_lookup[ m->get_ino() ]; - - pending_lookup_trace.erase(m->get_ino()); - pending_lookup.erase(m->get_ino()); + *pending_lookup[ino].trace = m->get_trace(); + Context *onfinish = pending_lookup[ino].onfinish; + pending_lookup.erase(ino); if (onfinish) { onfinish->finish(0); @@ -66,19 +69,100 @@ void AnchorClient::handle_anchor_reply(class MAnchor *m) } break; - case ANCHOR_OP_UPDATE_ACK: - case ANCHOR_OP_CREATE_ACK: - case ANCHOR_OP_DESTROY_ACK: - { - assert(pending_op.count(m->get_ino()) == 1); + // prepare -> agree + case ANCHOR_OP_CREATE_AGREE: + if (pending_create_prepare.count(ino)) { + dout(10) << "got create_agree on " << ino << " atid " << atid << endl; + Context *onfinish = pending_create_prepare[ino].onfinish; + *pending_create_prepare[ino].patid = atid; + pending_create_prepare.erase(ino); - Context *onfinish = pending_op[m->get_ino()]; - pending_op.erase(m->get_ino()); + pending_commit.insert(atid); + + if (onfinish) { + onfinish->finish(0); + delete onfinish; + } + } else { + dout(10) << "stray create_agree on " << ino + << " atid " << atid + << ", sending ROLLBACK" + << endl; + MAnchor *req = new MAnchor(ANCHOR_OP_ROLLBACK, 0, atid); + mds->messenger->send_message(req, + mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()), + MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT); + } + break; + + case ANCHOR_OP_DESTROY_AGREE: + if (pending_destroy_prepare.count(ino)) { + dout(10) << "got destroy_agree on " << ino << " atid " << atid << endl; + Context *onfinish = pending_destroy_prepare[ino].onfinish; + *pending_destroy_prepare[ino].patid = atid; + pending_destroy_prepare.erase(ino); + + pending_commit.insert(atid); if (onfinish) { onfinish->finish(0); delete onfinish; } + } else { + dout(10) << "stray destroy_agree on " << ino + << " atid " << atid + << ", sending ROLLBACK" + << endl; + MAnchor *req = new MAnchor(ANCHOR_OP_ROLLBACK, 0, atid); + mds->messenger->send_message(req, + mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()), + MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT); + } + break; + + case ANCHOR_OP_UPDATE_AGREE: + if (pending_update_prepare.count(ino)) { + dout(10) << "got update_agree on " << ino << " atid " << atid << endl; + Context *onfinish = pending_update_prepare[ino].onfinish; + *pending_update_prepare[ino].patid = atid; + pending_update_prepare.erase(ino); + + pending_commit.insert(atid); + + if (onfinish) { + onfinish->finish(0); + delete onfinish; + } + } else { + dout(10) << "stray update_agree on " << ino + << " atid " << atid + << ", sending ROLLBACK" + << endl; + MAnchor *req = new MAnchor(ANCHOR_OP_ROLLBACK, 0, atid); + mds->messenger->send_message(req, + mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()), + MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT); + } + break; + + // commit -> ack + case ANCHOR_OP_ACK: + { + dout(10) << "got ack on atid " << atid << ", logging" << endl; + + // remove from committing list + assert(pending_commit.count(atid)); + pending_commit.erase(atid); + + // log ACK. + mds->mdlog->submit_entry(new EAnchor(ANCHOR_OP_ACK, 0, atid)); // ino doesn't matter. + + // kick any waiters + if (ack_waiters.count(atid)) { + dout(15) << "kicking waiters on atid " << atid << endl; + mds->queue_finished(ack_waiters[atid]); + ack_waiters.erase(atid); + } } break; @@ -86,6 +170,7 @@ void AnchorClient::handle_anchor_reply(class MAnchor *m) assert(0); } + delete m; } @@ -105,77 +190,96 @@ void AnchorClient::lookup(inodeno_t ino, vector& trace, Context *onfinis // send message MAnchor *req = new MAnchor(ANCHOR_OP_LOOKUP, ino); - pending_lookup_trace[ino] = &trace; - pending_lookup[ino] = onfinish; + assert(pending_lookup.count(ino) == 0); + pending_lookup[ino].onfinish = onfinish; + pending_lookup[ino].trace = &trace; - messenger->send_message(req, - mdsmap->get_inst(mdsmap->get_anchortable()), + mds->messenger->send_message(req, + mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()), MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT); } -void AnchorClient::prepare_create(inodeno_t ino, vector& trace, Context *onfinish) + +// PREPARE + +void AnchorClient::prepare_create(inodeno_t ino, vector& trace, + version_t *patid, Context *onfinish) { // send message MAnchor *req = new MAnchor(ANCHOR_OP_CREATE_PREPARE, ino); req->set_trace(trace); - pending_op[ino] = onfinish; + pending_create_prepare[ino].trace = trace; + pending_create_prepare[ino].patid = patid; + pending_create_prepare[ino].onfinish = onfinish; - messenger->send_message(req, - mdsmap->get_inst(mdsmap->get_anchortable()), + mds->messenger->send_message(req, + mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()), MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT); } -void AnchorClient::commit_create(inodeno_t ino) -{ - // send message - MAnchor *req = new MAnchor(ANCHOR_OP_CREATE_COMMIT, ino); - messenger->send_message(req, - mdsmap->get_inst(mdsmap->get_anchortable()), - MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT); -} - - -void AnchorClient::prepare_destroy(inodeno_t ino, Context *onfinish) +void AnchorClient::prepare_destroy(inodeno_t ino, + version_t *patid, Context *onfinish) { // send message MAnchor *req = new MAnchor(ANCHOR_OP_DESTROY_PREPARE, ino); - pending_op[ino] = onfinish; - messenger->send_message(req, - mdsmap->get_inst(mdsmap->get_anchortable()), - MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT); -} -void AnchorClient::commit_destroy(inodeno_t ino) -{ - // send message - MAnchor *req = new MAnchor(ANCHOR_OP_DESTROY_COMMIT, ino); - messenger->send_message(req, - mdsmap->get_inst(mdsmap->get_anchortable()), + pending_destroy_prepare[ino].onfinish = onfinish; + pending_destroy_prepare[ino].patid = patid; + mds->messenger->send_message(req, + mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()), MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT); } - -void AnchorClient::prepare_update(inodeno_t ino, vector& trace, Context *onfinish) +void AnchorClient::prepare_update(inodeno_t ino, vector& trace, + version_t *patid, Context *onfinish) { // send message MAnchor *req = new MAnchor(ANCHOR_OP_UPDATE_PREPARE, ino); req->set_trace(trace); - pending_op[ino] = onfinish; + pending_update_prepare[ino].trace = trace; + pending_update_prepare[ino].patid = patid; + pending_update_prepare[ino].onfinish = onfinish; - messenger->send_message(req, - mdsmap->get_inst(mdsmap->get_anchortable()), + mds->messenger->send_message(req, + mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()), MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT); } -void AnchorClient::commit_update(inodeno_t ino) + +// COMMIT + +void AnchorClient::commit(version_t atid) { + dout(10) << "commit " << atid << endl; + + assert(pending_commit.count(atid)); + pending_commit.insert(atid); + // send message - MAnchor *req = new MAnchor(ANCHOR_OP_UPDATE_COMMIT, ino); - messenger->send_message(req, - mdsmap->get_inst(mdsmap->get_anchortable()), + MAnchor *req = new MAnchor(ANCHOR_OP_COMMIT, 0, atid); + mds->messenger->send_message(req, + mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()), MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT); } + +// RECOVERY + +void AnchorClient::finish_recovery() +{ + dout(7) << "finish_recovery - sending COMMIT on un-ACKed atids" << endl; + + for (set::iterator p = pending_commit.begin(); + p != pending_commit.end(); + ++p) { + dout(10) << " sending COMMIT on " << *p << endl; + MAnchor *req = new MAnchor(ANCHOR_OP_COMMIT, 0, *p); + mds->messenger->send_message(req, + mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()), + MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT); + } +} + diff --git a/branches/sage/cephmds2/mds/AnchorClient.h b/branches/sage/cephmds2/mds/AnchorClient.h index 0911c85e5aa..6f036f27ab8 100644 --- a/branches/sage/cephmds2/mds/AnchorClient.h +++ b/branches/sage/cephmds2/mds/AnchorClient.h @@ -24,40 +24,66 @@ using __gnu_cxx::hash_map; #include "Anchor.h" -class Messenger; -class MDSMap; class Context; +class MDS; class AnchorClient : public Dispatcher { - Messenger *messenger; - MDSMap *mdsmap; + MDS *mds; // lookups - hash_map pending_lookup; - hash_map*> pending_lookup_trace; + struct _pending_lookup { + vector *trace; + Context *onfinish; + }; + hash_map pending_lookup; - // updates - hash_map pending_op; + // prepares + struct _pending_prepare { + vector trace; + Context *onfinish; + version_t *patid; // ptr to atid + }; + hash_map pending_create_prepare; + hash_map pending_destroy_prepare; + hash_map pending_update_prepare; + + // pending commits + set pending_commit; + map > ack_waiters; void handle_anchor_reply(class MAnchor *m); - public: - AnchorClient(Messenger *ms, MDSMap *mm) : messenger(ms), mdsmap(mm) {} + AnchorClient(MDS *m) : mds(m) {} void dispatch(Message *m); // async user interface void lookup(inodeno_t ino, vector& trace, Context *onfinish); - void prepare_create(inodeno_t ino, vector& trace, Context *onfinish); - void commit_create(inodeno_t ino); + void prepare_create(inodeno_t ino, vector& trace, version_t *atid, Context *onfinish); + void prepare_destroy(inodeno_t ino, version_t *atid, Context *onfinish); + void prepare_update(inodeno_t ino, vector& trace, version_t *atid, Context *onfinish); - void prepare_destroy(inodeno_t ino, Context *onfinish); - void commit_destroy(inodeno_t ino); + void commit(version_t atid); - void prepare_update(inodeno_t ino, vector& trace, Context *onfinish); - void commit_update(inodeno_t ino); + // for recovery (by other nodes) + void handle_mds_recovery(int mds); // called when someone else recovers + + // for recovery (by me) + void got_journaled_agree(version_t atid) { + pending_commit.insert(atid); + } + void got_journaled_ack(version_t atid) { + pending_commit.erase(atid); + } + bool has_committed(version_t atid) { + return pending_commit.count(atid) == 0; + } + void wait_for_ack(version_t atid, Context *c) { + ack_waiters[atid].push_back(c); + } + void finish_recovery(); // called when i recover and go active }; diff --git a/branches/sage/cephmds2/mds/AnchorTable.cc b/branches/sage/cephmds2/mds/AnchorTable.cc index f58a3c65c44..c20daf79c52 100644 --- a/branches/sage/cephmds2/mds/AnchorTable.cc +++ b/branches/sage/cephmds2/mds/AnchorTable.cc @@ -26,10 +26,19 @@ #include "config.h" #undef dout -#define dout(x) if (x <= g_conf.debug_mds) cout << g_clock.now() << " " << mds->messenger->get_myaddr() << ".anchortable " -#define derr(x) if (x <= g_conf.debug_mds) cerr << g_clock.now() << " " << mds->messenger->get_myaddr() << ".anchortable " +#define dout(x) if (x <= g_conf.debug_mds) cout << g_clock.now() << " " << mds->messenger->get_myname() << ".anchortable " +#define derr(x) if (x <= g_conf.debug_mds) cerr << g_clock.now() << " " << mds->messenger->get_myname() << ".anchortable " +void AnchorTable::dump() +{ + dout(7) << "dump v " << version << endl; + for (hash_map::iterator it = anchor_map.begin(); + it != anchor_map.end(); + it++) + dout(15) << "dump " << it->second << endl; +} + /* * basic updates @@ -37,7 +46,7 @@ bool AnchorTable::add(inodeno_t ino, dirfrag_t dirfrag) { - dout(7) << "add " << ino << " dirfrag " << dirfrag << endl; + dout(17) << "add " << ino << " dirfrag " << dirfrag << endl; // parent should be there assert(dirfrag.ino < MDS_INO_BASE || // system dirino @@ -46,10 +55,10 @@ bool AnchorTable::add(inodeno_t ino, dirfrag_t dirfrag) if (anchor_map.count(ino) == 0) { // new item anchor_map[ino] = Anchor(ino, dirfrag); - dout(10) << " add: added " << anchor_map[ino] << endl; + dout(10) << "add added " << anchor_map[ino] << endl; return true; } else { - dout(10) << " add: had " << anchor_map[ino] << endl; + dout(10) << "add had " << anchor_map[ino] << endl; return false; } } @@ -59,9 +68,9 @@ void AnchorTable::inc(inodeno_t ino) dout(7) << "inc " << ino << endl; assert(anchor_map.count(ino)); - Anchor &anchor = anchor_map[ino]; while (1) { + Anchor &anchor = anchor_map[ino]; anchor.nref++; dout(10) << " inc: record now " << anchor << endl; @@ -69,7 +78,6 @@ void AnchorTable::inc(inodeno_t ino) if (ino == 0) break; if (anchor_map.count(ino) == 0) break; - anchor = anchor_map[ino]; } } @@ -143,63 +151,109 @@ void AnchorTable::create_prepare(inodeno_t ino, vector& trace) for (unsigned i=0; i& trace) { - pending_update[ino] = trace; version++; + pending_update[version].first = ino; + pending_update[version].second = trace; + //dump(); } -void AnchorTable::update_commit(inodeno_t ino) +void AnchorTable::commit(version_t atid) { - // remove old - dec(ino); + if (pending_create.count(atid)) { + dout(7) << "commit " << atid << " create " << pending_create[atid] << endl; + pending_create.erase(atid); + } - // add new - // make sure new trace is in table - vector &trace = pending_update[ino]; - for (unsigned i=0; i &trace = pending_update[atid].second; + + dout(7) << "commit " << atid << " update " << ino << endl; + + // remove old + dec(ino); + + // add new + for (unsigned i=0; i_create_prepare_logged(req); + at->_create_prepare_logged(req, atid); } }; @@ -216,30 +270,22 @@ void AnchorTable::handle_create_prepare(MAnchor *req) EAnchor *le = new EAnchor(ANCHOR_OP_CREATE_PREPARE, ino, version); le->set_trace(trace); mds->mdlog->submit_entry(le, - new C_AT_CreatePrepare(this, req)); + new C_AT_CreatePrepare(this, req, version)); } -void AnchorTable::_create_prepare_logged(MAnchor *req) +void AnchorTable::_create_prepare_logged(MAnchor *req, version_t atid) { inodeno_t ino = req->get_ino(); - dout(7) << "_create_prepare_logged " << ino << endl; + dout(7) << "_create_prepare_logged " << ino << " atid " << atid << endl; // reply - MAnchor *reply = new MAnchor(ANCHOR_OP_CREATE_ACK, ino); + MAnchor *reply = new MAnchor(ANCHOR_OP_CREATE_AGREE, ino, atid); mds->messenger->send_message(reply, req->get_source_inst(), req->get_source_port()); delete req; } -void AnchorTable::handle_create_commit(MAnchor *req) -{ - inodeno_t ino = req->get_ino(); - dout(7) << "handle_create_commit " << ino << endl; - - create_commit(ino); - mds->mdlog->submit_entry(new EAnchor(ANCHOR_OP_CREATE_COMMIT, ino, version)); -} // DESTROY @@ -247,11 +293,12 @@ void AnchorTable::handle_create_commit(MAnchor *req) class C_AT_DestroyPrepare : public Context { AnchorTable *at; MAnchor *req; + version_t atid; public: - C_AT_DestroyPrepare(AnchorTable *a, MAnchor *r) : - at(a), req(r) { } + C_AT_DestroyPrepare(AnchorTable *a, MAnchor *r, version_t t) : + at(a), req(r), atid(t) { } void finish(int r) { - at->_destroy_prepare_logged(req); + at->_destroy_prepare_logged(req, atid); } }; @@ -263,31 +310,20 @@ void AnchorTable::handle_destroy_prepare(MAnchor *req) destroy_prepare(ino); mds->mdlog->submit_entry(new EAnchor(ANCHOR_OP_DESTROY_PREPARE, ino, version), - new C_AT_DestroyPrepare(this, req)); + new C_AT_DestroyPrepare(this, req, version)); } -void AnchorTable::_destroy_prepare_logged(MAnchor *req) +void AnchorTable::_destroy_prepare_logged(MAnchor *req, version_t atid) { inodeno_t ino = req->get_ino(); - dout(7) << "_destroy_prepare_logged " << ino << endl; + dout(7) << "_destroy_prepare_logged " << ino << " atid " << atid << endl; // reply - MAnchor *reply = new MAnchor(ANCHOR_OP_DESTROY_ACK, ino); + MAnchor *reply = new MAnchor(ANCHOR_OP_DESTROY_AGREE, ino, atid); mds->messenger->send_message(reply, req->get_source_inst(), req->get_source_port()); - delete req; } -void AnchorTable::handle_destroy_commit(MAnchor *req) -{ - inodeno_t ino = req->get_ino(); - dout(7) << "handle_destroy_commit " << ino << endl; - - destroy_commit(ino); - - // log - mds->mdlog->submit_entry(new EAnchor(ANCHOR_OP_DESTROY_COMMIT, ino, version)); -} // UPDATE @@ -295,11 +331,12 @@ void AnchorTable::handle_destroy_commit(MAnchor *req) class C_AT_UpdatePrepare : public Context { AnchorTable *at; MAnchor *req; + version_t atid; public: - C_AT_UpdatePrepare(AnchorTable *a, MAnchor *r) : - at(a), req(r) { } + C_AT_UpdatePrepare(AnchorTable *a, MAnchor *r, version_t t) : + at(a), req(r), atid(t) { } void finish(int r) { - at->_update_prepare_logged(req); + at->_update_prepare_logged(req, atid); } }; @@ -316,32 +353,88 @@ void AnchorTable::handle_update_prepare(MAnchor *req) EAnchor *le = new EAnchor(ANCHOR_OP_UPDATE_PREPARE, ino, version); le->set_trace(trace); mds->mdlog->submit_entry(le, - new C_AT_UpdatePrepare(this, req)); + new C_AT_UpdatePrepare(this, req, version)); } -void AnchorTable::_update_prepare_logged(MAnchor *req) +void AnchorTable::_update_prepare_logged(MAnchor *req, version_t atid) { inodeno_t ino = req->get_ino(); - dout(7) << "_update_prepare_logged " << ino << endl; + dout(7) << "_update_prepare_logged " << ino << " atid " << atid << endl; // reply - MAnchor *reply = new MAnchor(ANCHOR_OP_UPDATE_ACK, ino); + MAnchor *reply = new MAnchor(ANCHOR_OP_UPDATE_AGREE, ino, atid); mds->messenger->send_message(reply, req->get_source_inst(), req->get_source_port()); - delete req; } -void AnchorTable::handle_update_commit(MAnchor *req) -{ - inodeno_t ino = req->get_ino(); - dout(7) << "handle_update_commit " << ino << endl; - - update_commit(ino); - mds->mdlog->submit_entry(new EAnchor(ANCHOR_OP_UPDATE_COMMIT, ino, version)); + +// COMMIT + +class C_AT_Commit : public Context { + AnchorTable *at; + MAnchor *req; +public: + C_AT_Commit(AnchorTable *a, MAnchor *r) : + at(a), req(r) { } + void finish(int r) { + at->_commit_logged(req); + } +}; + +void AnchorTable::handle_commit(MAnchor *req) +{ + version_t atid = req->get_atid(); + dout(7) << "handle_commit " << atid << endl; + + if (pending_create.count(atid) || + pending_destroy.count(atid) || + pending_update.count(atid)) { + commit(atid); + mds->mdlog->submit_entry(new EAnchor(ANCHOR_OP_COMMIT, atid, version)); + } + else if (atid <= version) { + dout(0) << "got commit for atid " << atid << " <= " << version + << ", already committed, sending ack." + << endl; + MAnchor *reply = new MAnchor(ANCHOR_OP_ACK, 0, atid); + mds->messenger->send_message(reply, req->get_source_inst(), req->get_source_port()); + delete req; + return; + } + else { + // wtf. + dout(0) << "got commit for atid " << atid << " > " << version << endl; + assert(atid <= version); + } + + // wait for it to journal + mds->mdlog->wait_for_sync(new C_AT_Commit(this, req)); } +void AnchorTable::_commit_logged(MAnchor *req) +{ + dout(7) << "_commit_logged, sending ACK" << endl; + MAnchor *reply = new MAnchor(ANCHOR_OP_ACK, req->get_ino(), req->get_atid()); + mds->messenger->send_message(reply, req->get_source_inst(), req->get_source_port()); + delete req; +} + + + +// ROLLBACK + +void AnchorTable::handle_rollback(MAnchor *req) +{ + version_t atid = req->get_atid(); + dout(7) << "handle_rollback " << atid << endl; + rollback(atid); + delete req; +} + + + /* * messages */ @@ -384,23 +477,15 @@ void AnchorTable::handle_anchor_request(class MAnchor *req) case ANCHOR_OP_CREATE_PREPARE: handle_create_prepare(req); break; - case ANCHOR_OP_CREATE_COMMIT: - handle_create_commit(req); - break; - case ANCHOR_OP_DESTROY_PREPARE: handle_destroy_prepare(req); break; - case ANCHOR_OP_DESTROY_COMMIT: - handle_destroy_commit(req); - break; - - case ANCHOR_OP_UPDATE_PREPARE: handle_update_prepare(req); break; - case ANCHOR_OP_UPDATE_COMMIT: - handle_update_commit(req); + + case ANCHOR_OP_COMMIT: + handle_commit(req); break; default: @@ -416,11 +501,30 @@ void AnchorTable::handle_anchor_request(class MAnchor *req) // load/save entire table for now! +class C_AT_Saved : public Context { + AnchorTable *at; + version_t version; +public: + C_AT_Saved(AnchorTable *a, version_t v) : at(a), version(v) {} + void finish(int r) { + at->_saved(version); + } +}; + void AnchorTable::save(Context *onfinish) { dout(7) << "save v " << version << endl; if (!opened) return; + if (onfinish) + waiting_for_save[version].push_back(onfinish); + + if (committing_version == version) { + dout(7) << "save already committing v " << version << endl; + return; + } + committing_version = version; + // build up write bufferlist bl; @@ -445,18 +549,31 @@ void AnchorTable::save(Context *onfinish) size_t s = pending_update.size(); bl.append((char*)&s, sizeof(s)); - for (map >::iterator p = pending_update.begin(); + for (map > >::iterator p = pending_update.begin(); p != pending_update.end(); ++p) { bl.append((char*)&p->first, sizeof(p->first)); - ::_encode(p->second, bl); + bl.append((char*)&p->second.first, sizeof(p->second.first)); + ::_encode(p->second.second, bl); } // write! mds->objecter->write(object_t(MDS_INO_ANCHORTABLE+mds->get_nodeid(), 0), 0, bl.length(), bl, - NULL, onfinish); + NULL, new C_AT_Saved(this, version)); +} + +void AnchorTable::_saved(version_t v) +{ + dout(7) << "_saved v " << v << endl; + + assert(v <= committing_version); + assert(committed_version < v); + committed_version = v; + + finish_contexts(waiting_for_save[v], 0); + waiting_for_save.erase(v); } @@ -510,10 +627,15 @@ void AnchorTable::_loaded(bufferlist& bl) bl.copy(off, sizeof(s), (char*)&s); off += sizeof(s); for (size_t i=0; i anchor_map; // uncommitted operations - set pending_create; - set pending_destroy; - map > pending_update; + map pending_create; + map pending_destroy; + map > > pending_update; version_t version; // this includes anchor_map AND pending_* state. + version_t committing_version; + version_t committed_version; // load/save state bool opening, opened; // waiters list waiting_for_open; + map > waiting_for_save; protected: @@ -52,46 +55,52 @@ protected: // mid-level void create_prepare(inodeno_t ino, vector& trace); - void create_commit(inodeno_t ino); void destroy_prepare(inodeno_t ino); - void destroy_commit(inodeno_t ino); void update_prepare(inodeno_t ino, vector& trace); - void update_commit(inodeno_t ino); + void commit(version_t atid); + void rollback(version_t atid); friend class EAnchor; // used for journal replay. // high level interface void handle_lookup(MAnchor *req); void handle_create_prepare(MAnchor *req); - void _create_prepare_logged(MAnchor *req); - void handle_create_commit(MAnchor *req); + void _create_prepare_logged(MAnchor *req, version_t atid); friend class C_AT_CreatePrepare; void handle_destroy_prepare(MAnchor *req); - void _destroy_prepare_logged(MAnchor *req); - void handle_destroy_commit(MAnchor *req); + void _destroy_prepare_logged(MAnchor *req, version_t atid); friend class C_AT_DestroyPrepare; void handle_update_prepare(MAnchor *req); - void _update_prepare_logged(MAnchor *req); - void handle_update_commit(MAnchor *req); + void _update_prepare_logged(MAnchor *req, version_t atid); friend class C_AT_UpdatePrepare; + void handle_commit(MAnchor *req); + void _commit_logged(MAnchor *req); + friend class C_AT_Commit; + + void handle_rollback(MAnchor *req); + // messages void handle_anchor_request(MAnchor *m); + void dump(); + public: AnchorTable(MDS *m) : mds(m), - version(0), + version(0), committing_version(0), committed_version(0), opening(false), opened(false) { } void dispatch(class Message *m); version_t get_version() { return version; } + version_t get_committed_version() { return committed_version; } - void create_fresh() { // reset on mkfs() to empty, loaded table. - version = 0; + void create_fresh() { + // reset (i.e. on mkfs) to empty, but unsaved table. + version = 1; opened = true; opening = false; anchor_map.clear(); @@ -102,6 +111,7 @@ public: // load/save entire table for now! void save(Context *onfinish); + void _saved(version_t v); void load(Context *onfinish); void _loaded(bufferlist& bl); diff --git a/branches/sage/cephmds2/mds/CDir.cc b/branches/sage/cephmds2/mds/CDir.cc index e10da3b273f..3de5a60ca70 100644 --- a/branches/sage/cephmds2/mds/CDir.cc +++ b/branches/sage/cephmds2/mds/CDir.cc @@ -284,7 +284,11 @@ void CDir::link_inode_work( CDentry *dn, CInode *in ) void CDir::unlink_inode( CDentry *dn ) { - dout(12) << "unlink_inode " << *dn << " " << *dn->inode << endl; + if (dn->is_remote()) { + dout(12) << "unlink_inode " << *dn << endl; + } else { + dout(12) << "unlink_inode " << *dn << " " << *dn->inode << endl; + } unlink_inode_work(dn); diff --git a/branches/sage/cephmds2/mds/CInode.h b/branches/sage/cephmds2/mds/CInode.h index 0330b0eb8f9..bdab343bb59 100644 --- a/branches/sage/cephmds2/mds/CInode.h +++ b/branches/sage/cephmds2/mds/CInode.h @@ -226,7 +226,9 @@ protected: bool is_dir() { return ((inode.mode & INODE_TYPE_MASK) == INODE_MODE_DIR) ? true:false; } bool is_anchored() { return inode.anchored; } - + bool is_anchoring() { return state_test(STATE_ANCHORING); } + bool is_unanchoring() { return state_test(STATE_UNANCHORING); } + bool is_root() { return state & STATE_ROOT; } bool is_auth() { return state & STATE_AUTH; } diff --git a/branches/sage/cephmds2/mds/MDCache.cc b/branches/sage/cephmds2/mds/MDCache.cc index 6ccb4164b0d..daa1956dd96 100644 --- a/branches/sage/cephmds2/mds/MDCache.cc +++ b/branches/sage/cephmds2/mds/MDCache.cc @@ -3028,9 +3028,10 @@ class C_MDC_AnchorCreatePrepared : public Context { MDCache *cache; CInode *in; public: + version_t atid; C_MDC_AnchorCreatePrepared(MDCache *c, CInode *i) : cache(c), in(i) {} void finish(int r) { - cache->_anchor_create_prepared(in); + cache->_anchor_create_prepared(in, atid); } }; @@ -3058,24 +3059,26 @@ void MDCache::anchor_create(CInode *in, Context *onfinish) in->make_anchor_trace(trace); // do it - mds->anchorclient->prepare_create(in->ino(), trace, - new C_MDC_AnchorCreatePrepared(this, in)); + C_MDC_AnchorCreatePrepared *fin = new C_MDC_AnchorCreatePrepared(this, in); + mds->anchorclient->prepare_create(in->ino(), trace, &fin->atid, fin); } class C_MDC_AnchorCreateLogged : public Context { MDCache *cache; CInode *in; + version_t atid; version_t pdv; public: - C_MDC_AnchorCreateLogged(MDCache *c, CInode *i, version_t v) : cache(c), in(i), pdv(v) {} + C_MDC_AnchorCreateLogged(MDCache *c, CInode *i, version_t t, version_t v) : + cache(c), in(i), atid(t), pdv(v) {} void finish(int r) { - cache->_anchor_create_logged(in, pdv); + cache->_anchor_create_logged(in, atid, pdv); } }; -void MDCache::_anchor_create_prepared(CInode *in) +void MDCache::_anchor_create_prepared(CInode *in, version_t atid) { - dout(10) << "_anchor_create_prepared " << *in << endl; + dout(10) << "_anchor_create_prepared " << *in << " atid " << atid << endl; assert(in->inode.anchored == false); @@ -3090,12 +3093,15 @@ void MDCache::_anchor_create_prepared(CInode *in) pi->anchored = true; pi->version = pdv; + // note anchor transaction + le->metablob.add_anchor_transaction(atid); + // log + wait - mds->mdlog->submit_entry(le, new C_MDC_AnchorCreateLogged(this, in, pdv)); + mds->mdlog->submit_entry(le, new C_MDC_AnchorCreateLogged(this, in, atid, pdv)); } -void MDCache::_anchor_create_logged(CInode *in, version_t pdv) +void MDCache::_anchor_create_logged(CInode *in, version_t atid, version_t pdv) { dout(10) << "_anchor_create_logged pdv " << pdv << " on " << *in << endl; @@ -3106,10 +3112,10 @@ void MDCache::_anchor_create_logged(CInode *in, version_t pdv) // apply update to cache in->inode.anchored = true; - in->inode.version = pdv; + in->mark_dirty(pdv); // tell the anchortable we've committed - mds->anchorclient->commit_create(in->ino()); + mds->anchorclient->commit(atid); // trigger waiters in->finish_waiting(CInode::WAIT_ANCHORED, 0); @@ -3122,9 +3128,10 @@ class C_MDC_AnchorDestroyPrepared : public Context { MDCache *cache; CInode *in; public: + version_t atid; C_MDC_AnchorDestroyPrepared(MDCache *c, CInode *i) : cache(c), in(i) {} void finish(int r) { - cache->_anchor_destroy_prepared(in); + cache->_anchor_destroy_prepared(in, atid); } }; @@ -3149,23 +3156,26 @@ void MDCache::anchor_destroy(CInode *in, Context *onfinish) in->get(CInode::PIN_UNANCHORING); // do it - mds->anchorclient->prepare_destroy(in->ino(), new C_MDC_AnchorDestroyPrepared(this, in)); + C_MDC_AnchorDestroyPrepared *fin = new C_MDC_AnchorDestroyPrepared(this, in); + mds->anchorclient->prepare_destroy(in->ino(), &fin->atid, fin); } class C_MDC_AnchorDestroyLogged : public Context { MDCache *cache; CInode *in; + version_t atid; version_t pdv; public: - C_MDC_AnchorDestroyLogged(MDCache *c, CInode *i, version_t v) : cache(c), in(i), pdv(v) {} + C_MDC_AnchorDestroyLogged(MDCache *c, CInode *i, version_t t, version_t v) : + cache(c), in(i), atid(t), pdv(v) {} void finish(int r) { - cache->_anchor_destroy_logged(in, pdv); + cache->_anchor_destroy_logged(in, atid, pdv); } }; -void MDCache::_anchor_destroy_prepared(CInode *in) +void MDCache::_anchor_destroy_prepared(CInode *in, version_t atid) { - dout(10) << "_anchor_destroy_prepared " << *in << endl; + dout(10) << "_anchor_destroy_prepared " << *in << " atid " << atid << endl; assert(in->inode.anchored == true); @@ -3180,12 +3190,15 @@ void MDCache::_anchor_destroy_prepared(CInode *in) pi->anchored = true; pi->version = pdv; + // note anchor transaction + le->metablob.add_anchor_transaction(atid); + // log + wait - mds->mdlog->submit_entry(le, new C_MDC_AnchorDestroyLogged(this, in, pdv)); + mds->mdlog->submit_entry(le, new C_MDC_AnchorDestroyLogged(this, in, atid, pdv)); } -void MDCache::_anchor_destroy_logged(CInode *in, version_t pdv) +void MDCache::_anchor_destroy_logged(CInode *in, version_t atid, version_t pdv) { dout(10) << "_anchor_destroy_logged pdv " << pdv << " on " << *in << endl; @@ -3199,7 +3212,7 @@ void MDCache::_anchor_destroy_logged(CInode *in, version_t pdv) in->inode.version = pdv; // tell the anchortable we've committed - mds->anchorclient->commit_destroy(in->ino()); + mds->anchorclient->commit(atid); // trigger waiters in->finish_waiting(CInode::WAIT_UNANCHORED, 0); diff --git a/branches/sage/cephmds2/mds/MDCache.h b/branches/sage/cephmds2/mds/MDCache.h index 077bfb31f5d..8d7189e8f2a 100644 --- a/branches/sage/cephmds2/mds/MDCache.h +++ b/branches/sage/cephmds2/mds/MDCache.h @@ -347,10 +347,10 @@ public: void anchor_create(CInode *in, Context *onfinish); void anchor_destroy(CInode *in, Context *onfinish); protected: - void _anchor_create_prepared(CInode *in); - void _anchor_create_logged(CInode *in, version_t pdv); - void _anchor_destroy_prepared(CInode *in); - void _anchor_destroy_logged(CInode *in, version_t pdv); + void _anchor_create_prepared(CInode *in, version_t atid); + void _anchor_create_logged(CInode *in, version_t atid, version_t pdv); + void _anchor_destroy_prepared(CInode *in, version_t atid); + void _anchor_destroy_logged(CInode *in, version_t atid, version_t pdv); friend class C_MDC_AnchorCreatePrepared; friend class C_MDC_AnchorCreateLogged; diff --git a/branches/sage/cephmds2/mds/MDS.cc b/branches/sage/cephmds2/mds/MDS.cc index 6ef96b4f38a..2c4265b812a 100644 --- a/branches/sage/cephmds2/mds/MDS.cc +++ b/branches/sage/cephmds2/mds/MDS.cc @@ -83,7 +83,7 @@ MDS::MDS(int whoami, Messenger *m, MonMap *mm) : timer(mds_lock) { mdlog = new MDLog(this); balancer = new MDBalancer(this); - anchorclient = new AnchorClient(messenger, mdsmap); + anchorclient = new AnchorClient(this); idalloc = new IdAllocator(this); anchortable = new AnchorTable(this); @@ -508,6 +508,16 @@ void MDS::handle_mds_map(MMDSMap *m) // now active? if (is_active()) { + // did i just recover? + if (oldstate == MDSMap::STATE_REJOIN) { + dout(1) << "successful recovery!" << endl; + + // kick anchorclient (resent COMMITs) + anchorclient->finish_recovery(); + + // ... + } + dout(1) << "now active" << endl; finish_contexts(waitfor_active); // kick waiters } @@ -528,13 +538,6 @@ void MDS::handle_mds_map(MMDSMap *m) mdcache->shutdown_start(); - // save anchor table - if (mdsmap->get_anchortable() == whoami) - anchortable->save(0); // FIXME? or detect completion via filer? - - if (idalloc) - idalloc->save(0); // FIXME? or detect completion via filer? - // flush log mdlog->set_max_events(0); mdlog->trim(NULL); diff --git a/branches/sage/cephmds2/mds/Server.cc b/branches/sage/cephmds2/mds/Server.cc index 0b7ce1b0ff9..386e4e3f33d 100644 --- a/branches/sage/cephmds2/mds/Server.cc +++ b/branches/sage/cephmds2/mds/Server.cc @@ -953,6 +953,7 @@ void Server::handle_client_mknod(MClientRequest *req, CInode *diri) assert(dn); // it's a file. + dn->pre_dirty(); newi->inode.mode = req->args.mknod.mode; newi->inode.mode &= ~INODE_TYPE_MASK; newi->inode.mode |= INODE_MODE_FILE; @@ -1038,7 +1039,7 @@ int Server::prepare_mknod(MClientRequest *req, CInode *diri, *pdn = dir->lookup(name); if (*pdn) { if (!(*pdn)->can_read(req)) { - dout(10) << "waiting on (existing!) dentry " << **pdn << endl; + dout(10) << "waiting on (existing!) unreadable dentry " << **pdn << endl; dir->add_waiter(CDir::WAIT_DNREAD, name, new C_MDS_RetryRequest(mds, req, diri)); return 0; } @@ -1064,26 +1065,26 @@ int Server::prepare_mknod(MClientRequest *req, CInode *diri, return 0; } - // make sure dir is pinnable - - - // create inode - *pin = mdcache->create_inode(); - (*pin)->inode.uid = req->get_caller_uid(); - (*pin)->inode.gid = req->get_caller_gid(); - (*pin)->inode.ctime = (*pin)->inode.mtime = (*pin)->inode.atime = g_clock.gettime(); // now - // note: inode.version will get set by finisher's mark_dirty. - - // create dentry + // create null dentry if (!*pdn) *pdn = dir->add_dentry(name, 0); - (*pdn)->pre_dirty(); - // xlock dentry bool res = mds->locker->dentry_xlock_start(*pdn, req, diri); - assert(res == true); + if (!res) + return 0; + // yay! + + // create inode? + if (pin) { + *pin = mdcache->create_inode(); + (*pin)->inode.uid = req->get_caller_uid(); + (*pin)->inode.gid = req->get_caller_gid(); + (*pin)->inode.ctime = (*pin)->inode.mtime = (*pin)->inode.atime = g_clock.gettime(); // now + // note: inode.version will get set by finisher's mark_dirty. + } + // bump modify pop mds->balancer->hit_dir(dir, META_POP_DWR); @@ -1110,6 +1111,7 @@ void Server::handle_client_mkdir(MClientRequest *req, CInode *diri) assert(dn); // it's a directory. + dn->pre_dirty(); newi->inode.mode = req->args.mkdir.mode; newi->inode.mode &= ~INODE_TYPE_MASK; newi->inode.mode |= INODE_MODE_DIR; @@ -1166,6 +1168,7 @@ void Server::handle_client_symlink(MClientRequest *req, CInode *diri) assert(dn); // it's a symlink + dn->pre_dirty(); newi->inode.mode &= ~INODE_TYPE_MASK; newi->inode.mode |= INODE_MODE_SYMLINK; newi->symlink = req->get_sarg(); @@ -1214,20 +1217,6 @@ void Server::handle_client_link(MClientRequest *req, CInode *ref) CDir *dir = validate_new_dentry_dir(req, ref, dname); if (!dir) return; - // dentry exists? - CDentry *dn = dir->lookup(dname); - if (dn && (!dn->is_null() || dn->is_xlockedbyother(req))) { - dout(7) << "handle_client_link dn exists " << *dn << endl; - reply_request(req, -EEXIST); - return; - } - - // xlock dentry - if (!dn->is_xlockedbyme(req)) { - if (!mds->locker->dentry_xlock_start(dn, req, ref)) - return; - } - // discover link target filepath target = req->get_sarg(); dout(7) << "handle_client_link discovering target " << target << endl; @@ -1241,6 +1230,192 @@ void Server::handle_client_link(MClientRequest *req, CInode *ref) } +void Server::handle_client_link_2(int r, MClientRequest *req, CInode *diri, vector& trace) +{ + // target dne? + if (r < 0) { + dout(7) << "target " << req->get_sarg() << " dne" << endl; + reply_request(req, r); + return; + } + assert(r == 0); + + // identify target inode + CInode *targeti = mdcache->get_root(); + if (trace.size()) targeti = trace[trace.size()-1]->inode; + assert(targeti); + + // dir? + dout(7) << "target is " << *targeti << endl; + if (targeti->is_dir()) { + dout(7) << "target is a dir, failing" << endl; + reply_request(req, -EINVAL); + return; + } + + // can we create the dentry? + CDir *dir = 0; + CDentry *dn = 0; + + // make dentry and inode, xlock dentry. + r = prepare_mknod(req, diri, &dir, 0, &dn); + if (!r) + return; // wait on something + assert(dir); + assert(dn); + + // ok! + assert(dn->is_xlockedbyme(req)); + + // local or remote? + if (targeti->is_auth()) + link_local(req, diri, dn, targeti); + else + link_remote(req, diri, dn, targeti); +} + + +class C_MDS_link_local_finish : public Context { + MDS *mds; + MClientRequest *req; + CDentry *dn; + CInode *targeti; + version_t dpv; + time_t tctime; + time_t tpv; +public: + C_MDS_link_local_finish(MDS *m, MClientRequest *r, CDentry *d, CInode *ti, time_t ct) : + mds(m), req(r), dn(d), targeti(ti), + dpv(d->get_projected_version()), + tctime(ct), + tpv(targeti->get_parent_dn()->get_projected_version()) {} + void finish(int r) { + assert(r == 0); + mds->server->_link_local_finish(req, dn, targeti, dpv, tctime, tpv); + } +}; + + +void Server::link_local(MClientRequest *req, CInode *diri, + CDentry *dn, CInode *targeti) +{ + dout(10) << "link_local " << *dn << " to " << *targeti << endl; + + // anchor target? + if (targeti->get_parent_dir() == dn->get_dir()) { + dout(7) << "target is in the same dir, sweet" << endl; + } + else if (targeti->is_anchored() && !targeti->is_unanchoring()) { + dout(7) << "target anchored already (nlink=" << targeti->inode.nlink << "), sweet" << endl; + } else { + dout(7) << "target needs anchor, nlink=" << targeti->inode.nlink << ", creating anchor" << endl; + + mdcache->anchor_create(targeti, + new C_MDS_RetryRequest(mds, req, diri)); + return; + } + + // wrlock the target inode + if (!mds->locker->inode_hard_write_start(targeti, req)) + return; // fw or (wait for) lock + + // ok, let's do it. + // prepare log entry + EUpdate *le = new EUpdate("link_local"); + + // predirty + dn->pre_dirty(); + version_t tpdv = targeti->pre_dirty(); + + // add to event + le->metablob.add_dir_context(dn->get_dir()); + le->metablob.add_dentry(dn, true); + le->metablob.add_dir_context(targeti->get_parent_dir()); + inode_t *pi = le->metablob.add_dentry(targeti->parent, true, targeti); + + // update journaled target inode + pi->nlink++; + pi->ctime = g_clock.gettime(); + pi->version = tpdv; + + // finisher + C_MDS_link_local_finish *fin = new C_MDS_link_local_finish(mds, req, dn, targeti, pi->ctime); + + // log + wait + mdlog->submit_entry(le); + mdlog->wait_for_sync(fin); +} + +void Server::_link_local_finish(MClientRequest *req, CDentry *dn, CInode *targeti, + version_t dpv, time_t tctime, version_t tpv) +{ + dout(10) << "_link_local_finish " << *dn << " to " << *targeti << endl; + + // link and unlock the new dentry + dn->set_remote_ino(targeti->ino()); + dn->set_version(dpv); + dn->mark_dirty(dpv); + + // update the target + targeti->inode.nlink++; + targeti->inode.ctime = tctime; + targeti->mark_dirty(tpv); + + // unlock the new dentry and target inode + mds->locker->dentry_xlock_finish(dn); + mds->locker->inode_hard_write_finish(targeti); + + // bump target popularity + mds->balancer->hit_inode(targeti, META_POP_IWR); + + // reply + MClientReply *reply = new MClientReply(req, 0); + reply_request(req, reply, dn->get_dir()->get_inode()); // FIXME: imprecise ref +} + + + +void Server::link_remote(MClientRequest *req, CInode *ref, + CDentry *dn, CInode *targeti) +{ + dout(10) << "link_remote " << *dn << " to " << *targeti << endl; + + // pin the target replica in our cache + assert(!targeti->is_auth()); + mdcache->request_pin_inode(req, targeti); + + // 1. send LinkPrepare to dest (lock target on dest, journal target update) + + + + + // 2. create+journal new dentry, as with link_local. + // 3. send LinkCommit to dest (unlocks target on dest, journals commit) + + // IMPLEMENT ME + MClientReply *reply = new MClientReply(req, -EAGAIN); + reply_request(req, reply, dn->get_dir()->get_inode()); // FIXME: imprecise ref +} + + +/* +void Server::handle_client_link_finish(MClientRequest *req, CInode *ref, + CDentry *dn, CInode *targeti) +{ + // create remote link + dn->dir->link_inode(dn, targeti->ino()); + dn->link_remote( targeti ); // since we have it + dn->_mark_dirty(); // fixme + + mds->balancer->hit_dir(dn->dir, META_POP_DWR); + + // done! + commit_request(req, new MClientReply(req, 0), ref, + 0); // FIXME i should log something +} +*/ + +/* class C_MDS_RemoteLink : public Context { Server *server; MClientRequest *req; @@ -1271,62 +1446,7 @@ public: } }; -void Server::handle_client_link_2(int r, MClientRequest *req, CInode *diri, vector& trace) -{ - // target dne? - if (r < 0) { - dout(7) << "target " << req->get_sarg() << " dne" << endl; - reply_request(req, r); - return; - } - assert(r == 0); - CInode *targeti = mdcache->get_root(); - if (trace.size()) targeti = trace[trace.size()-1]->inode; - assert(targeti); - - // dir? - dout(7) << "target is " << *targeti << endl; - if (targeti->is_dir()) { - dout(7) << "target is a dir, failing" << endl; - reply_request(req, -EINVAL); - return; - } - - // what was the new dentry again? - string dname = req->get_filepath().last_dentry(); - frag_t fg = diri->pick_dirfrag(dname); - CDir *dir = diri->get_dirfrag(fg); - assert(dir); - CDentry *dn = dir->lookup(dname); - assert(dn); - assert(dn->is_xlockedbyme(req)); - - - // ok! - if (targeti->is_auth()) { - // mine - - // same dir? - if (targeti->get_parent_dir() == dn->get_dir()) { - dout(7) << "target is in the same dir, sweet" << endl; - } - else if (targeti->is_anchored()) { - dout(7) << "target anchored already (nlink=" << targeti->inode.nlink << "), sweet" << endl; - } else { - assert(targeti->inode.nlink == 1); - dout(7) << "target needs anchor, nlink=" << targeti->inode.nlink << ", creating anchor" << endl; - - mdcache->anchor_create(targeti, - new C_MDS_RetryRequest(mds, req, diri)); - return; - } - - // ok, inc link! - targeti->inode.nlink++; - dout(7) << "nlink++, now " << targeti->inode.nlink << " on " << *targeti << endl; - targeti->_mark_dirty(); // fixme - } else { // remote: send nlink++ request, wait dout(7) << "target is remote, sending InodeLink" << endl; @@ -1338,23 +1458,10 @@ void Server::handle_client_link_2(int r, MClientRequest *req, CInode *diri, vect return; } - handle_client_link_finish(req, diri, dn, targeti); -} +*/ + -void Server::handle_client_link_finish(MClientRequest *req, CInode *ref, - CDentry *dn, CInode *targeti) -{ - // create remote link - dn->dir->link_inode(dn, targeti->ino()); - dn->link_remote( targeti ); // since we have it - dn->_mark_dirty(); // fixme - - mds->balancer->hit_dir(dn->dir, META_POP_DWR); - // done! - commit_request(req, new MClientReply(req, 0), ref, - 0); // FIXME i should log something -} // UNLINK @@ -1403,8 +1510,7 @@ void Server::handle_client_unlink(MClientRequest *req, // have it. locked? if (!dn->can_read(req)) { dout(10) << " waiting on " << *dn << endl; - dir->add_waiter(CDir::WAIT_DNREAD, - name, + dir->add_waiter(CDir::WAIT_DNREAD, name, new C_MDS_RetryRequest(mds, req, diri)); return; } @@ -1415,14 +1521,29 @@ void Server::handle_client_unlink(MClientRequest *req, reply_request(req, -ENOENT); return; } + + // remote? if so, open up the inode. + if (!dn->inode) { + assert(dn->is_remote()); + CInode *in = mdcache->get_inode(dn->get_remote_ino()); + if (in) { + dout(7) << "linking in remote in " << *in << endl; + dn->link_remote(in); + } else { + dout(10) << "remote dn, opening inode for " << *dn << endl; + mdcache->open_remote_ino(dn->get_remote_ino(), req, + new C_MDS_RetryRequest(mds, req, diri)); + return; + } + } // ok! CInode *in = dn->inode; assert(in); if (rmdir) { - dout(7) << "handle_client_rmdir on dir " << *in << endl; + dout(7) << "handle_client_rmdir on " << *in << endl; } else { - dout(7) << "handle_client_unlink on non-dir " << *in << endl; + dout(7) << "handle_client_unlink on " << *in << endl; } int inauth = in->authority().first; @@ -1495,10 +1616,9 @@ void Server::handle_client_unlink(MClientRequest *req, } // am i dentry auth? - if (inauth != mds->get_nodeid()) { - // not auth; forward! - dout(7) << "handle_client_unlink not auth for " << *dir << " dn " << dn->name << ", fwd to " << inauth << endl; - mdcache->request_forward(req, inauth); + if (!dn->is_auth()) { + dout(7) << "handle_client_unlink/rmdir not auth for " << *dn << endl; + mdcache->request_forward(req, dn->authority().first); return; } @@ -1508,21 +1628,6 @@ void Server::handle_client_unlink(MClientRequest *req, if (!mds->locker->dentry_xlock_start(dn, req, diri)) return; - // is this a remote link? - if (dn->is_remote() && !dn->inode) { - CInode *in = mdcache->get_inode(dn->get_remote_ino()); - if (in) { - dn->link_remote(in); - } else { - // open inode - dout(7) << "opening target inode first, ino is " << dn->get_remote_ino() << endl; - mdcache->open_remote_ino(dn->get_remote_ino(), req, - new C_MDS_RetryRequest(mds, req, diri)); - return; - } - } - - mds->balancer->hit_dir(dn->dir, META_POP_DWR); // it's locked, unlink! @@ -2167,7 +2272,7 @@ void Server::handle_client_openc(MClientRequest *req, CInode *diri) // make dentry and inode, xlock dentry. int r = prepare_mknod(req, diri, &dir, &in, &dn); - if (!r) + if (r < 0) return; // wait on something assert(dir); assert(in); @@ -2176,6 +2281,7 @@ void Server::handle_client_openc(MClientRequest *req, CInode *diri) if (r == 1) { // created. // it's a file. + dn->pre_dirty(); in->inode.mode = 0644; // FIXME req should have a umask in->inode.mode |= INODE_MODE_FILE; diff --git a/branches/sage/cephmds2/mds/Server.h b/branches/sage/cephmds2/mds/Server.h index 86bd11577c0..9b33a5c2dee 100644 --- a/branches/sage/cephmds2/mds/Server.h +++ b/branches/sage/cephmds2/mds/Server.h @@ -90,12 +90,20 @@ public: // namespace changes void handle_client_mknod(MClientRequest *req, CInode *ref); + void handle_client_link(MClientRequest *req, CInode *ref); void handle_client_link_2(int r, MClientRequest *req, CInode *ref, vector& trace); - void handle_client_link_finish(MClientRequest *req, CInode *ref, - CDentry *dn, CInode *targeti); + void link_local(MClientRequest *req, CInode *diri, + CDentry *dn, CInode *targeti); + void _link_local_finish(MClientRequest *req, + CDentry *dn, CInode *targeti, + version_t, time_t, version_t); + void link_remote(MClientRequest *req, CInode *diri, + CDentry *dn, CInode *targeti); void handle_client_unlink(MClientRequest *req, CInode *ref); + + void handle_client_rename(MClientRequest *req, CInode *ref); void handle_client_rename_2(MClientRequest *req, CInode *ref, diff --git a/branches/sage/cephmds2/mds/events/EAnchor.h b/branches/sage/cephmds2/mds/events/EAnchor.h index 64a07588aa9..cd0098df7c7 100644 --- a/branches/sage/cephmds2/mds/events/EAnchor.h +++ b/branches/sage/cephmds2/mds/events/EAnchor.h @@ -25,6 +25,7 @@ class EAnchor : public LogEvent { protected: int op; inodeno_t ino; + version_t atid; vector trace; version_t version; // anchor table version @@ -32,7 +33,10 @@ protected: EAnchor() : LogEvent(EVENT_ANCHOR) { } EAnchor(int o, inodeno_t i, version_t v) : LogEvent(EVENT_ANCHOR), - op(o), ino(i), version(v) { } + op(o), ino(i), atid(0), version(v) { } + EAnchor(int o, version_t a, version_t v=0) : + LogEvent(EVENT_ANCHOR), + op(o), atid(a), version(v) { } void set_trace(vector& t) { trace = t; } vector& get_trace() { return trace; } @@ -40,6 +44,7 @@ protected: void encode_payload(bufferlist& bl) { bl.append((char*)&op, sizeof(op)); bl.append((char*)&ino, sizeof(ino)); + bl.append((char*)&atid, sizeof(atid)); ::_encode(trace, bl); bl.append((char*)&version, sizeof(version)); } @@ -48,16 +53,19 @@ protected: off += sizeof(op); bl.copy(off, sizeof(ino), (char*)&ino); off += sizeof(ino); + bl.copy(off, sizeof(atid), (char*)&atid); + off += sizeof(atid); ::_decode(trace, bl, off); bl.copy(off, sizeof(version), (char*)&version); off += sizeof(version); } - void print(ostream& out) { - out << "EAnchor " << get_anchor_opname(op) << " " << ino << endl; - } - + out << "EAnchor " << get_anchor_opname(op); + if (ino) out << " " << ino; + if (atid) out << " atid " << atid; + if (version) out << " v " << version; + } bool has_expired(MDS *mds); void expire(MDS *mds, Context *c); diff --git a/branches/sage/cephmds2/mds/events/EMetaBlob.h b/branches/sage/cephmds2/mds/events/EMetaBlob.h index 4fbeba83202..9166167c81d 100644 --- a/branches/sage/cephmds2/mds/events/EMetaBlob.h +++ b/branches/sage/cephmds2/mds/events/EMetaBlob.h @@ -209,7 +209,14 @@ class EMetaBlob { list lump_order; map lump_map; + // anchor transactions included in this update. + list atids; + public: + + void add_anchor_transaction(version_t atid) { + atids.push_back(atid); + } // remote pointer to to-be-journaled inode iff it's a normal (non-remote) dentry inode_t *add_dentry(CDentry *dn, bool dirty, CInode *in=0) { @@ -303,6 +310,7 @@ class EMetaBlob { bl.append((char*)&(*i), sizeof(*i)); lump_map[*i]._encode(bl); } + ::_encode(atids, bl); } void _decode(bufferlist& bl, int& off) { int n; @@ -315,14 +323,16 @@ class EMetaBlob { lump_order.push_back(dirfrag); lump_map[dirfrag]._decode(bl, off); } + ::_decode(atids, bl, off); } void print(ostream& out) const { - if (lump_order.empty()) - out << "[metablob empty]"; - else - out << "[metablob " << lump_order.front() - << ", " << lump_map.size() << " dirs]"; + out << "[metablob"; + if (!lump_order.empty()) + out << lump_order.front() << ", " << lump_map.size() << " dirs"; + if (!atids.empty()) + out << " atids " << atids; + out << "]"; } bool has_expired(MDS *mds); diff --git a/branches/sage/cephmds2/mds/journal.cc b/branches/sage/cephmds2/mds/journal.cc index 74bdbb1c5bb..6ffdf8b68a3 100644 --- a/branches/sage/cephmds2/mds/journal.cc +++ b/branches/sage/cephmds2/mds/journal.cc @@ -33,6 +33,7 @@ #include "MDCache.h" #include "Migrator.h" #include "AnchorTable.h" +#include "AnchorClient.h" #include "config.h" #undef dout @@ -121,6 +122,17 @@ bool EMetaBlob::has_expired(MDS *mds) assert(0); // i goofed the logic } + // have my anchortable ops committed? + for (list::iterator p = atids.begin(); + p != atids.end(); + ++p) { + if (!mds->anchorclient->has_committed(*p)) { + dout(10) << "EMetaBlob.has_expired anchor transaction " << *p + << " not yet acked" << endl; + return false; + } + } + return true; // all dirlumps expired. } @@ -178,12 +190,10 @@ void EMetaBlob::expire(MDS *mds, Context *c) assert(0); // hrm } - // commit or wait for export - // FIXME: what if export aborts? need to retry! - assert(!commit.empty() || !waitfor_export.empty()); - - //C_Gather *gather = new C_Gather(new C_journal_RetryExpire(mds, this, c)); + // set up gather context C_Gather *gather = new C_Gather(c); + + // do or wait for exports and commits for (map::iterator p = commit.begin(); p != commit.end(); ++p) { @@ -198,6 +208,18 @@ void EMetaBlob::expire(MDS *mds, Context *c) p != waitfor_export.end(); ++p) mds->mdcache->migrator->add_export_finish_waiter(*p, gather->new_sub()); + + + // have my anchortable ops committed? + for (list::iterator p = atids.begin(); + p != atids.end(); + ++p) { + if (!mds->anchorclient->has_committed(*p)) { + dout(10) << "EMetaBlob.expire anchor transaction " << *p + << " not yet acked, waiting" << endl; + mds->anchorclient->wait_for_ack(*p, gather->new_sub()); + } + } } void EMetaBlob::replay(MDS *mds) @@ -300,6 +322,14 @@ void EMetaBlob::replay(MDS *mds) } } } + + for (list::iterator p = atids.begin(); + p != atids.end(); + ++p) { + dout(10) << "EMetaBlob.replay noting anchor transaction " << *p << endl; + mds->anchorclient->got_journaled_agree(*p); + } + } // ----------------------- @@ -438,7 +468,7 @@ void EAlloc::replay(MDS *mds) bool EAnchor::has_expired(MDS *mds) { - version_t cv = mds->anchortable->get_version(); + version_t cv = mds->anchortable->get_committed_version(); if (cv < version) { dout(10) << "EAnchor.has_expired v " << version << " > " << cv << ", still dirty" << endl; @@ -467,24 +497,25 @@ void EAnchor::replay(MDS *mds) assert(version-1 == mds->anchortable->get_version()); switch (op) { + // anchortable case ANCHOR_OP_CREATE_PREPARE: mds->anchortable->create_prepare(ino, trace); break; - case ANCHOR_OP_CREATE_COMMIT: - mds->anchortable->create_commit(ino); - break; case ANCHOR_OP_DESTROY_PREPARE: mds->anchortable->destroy_prepare(ino); break; - case ANCHOR_OP_DESTROY_COMMIT: - mds->anchortable->destroy_commit(ino); - break; case ANCHOR_OP_UPDATE_PREPARE: mds->anchortable->update_prepare(ino, trace); break; - case ANCHOR_OP_UPDATE_COMMIT: - mds->anchortable->update_commit(ino); + case ANCHOR_OP_COMMIT: + mds->anchortable->commit(atid); break; + + // anchorclient + case ANCHOR_OP_ACK: + mds->anchorclient->got_journaled_ack(atid); + break; + default: assert(0); } diff --git a/branches/sage/cephmds2/messages/MAnchor.h b/branches/sage/cephmds2/messages/MAnchor.h index edc7b51f6b0..cecf74c015b 100644 --- a/branches/sage/cephmds2/messages/MAnchor.h +++ b/branches/sage/cephmds2/messages/MAnchor.h @@ -18,24 +18,25 @@ #include #include "msg/Message.h" -#include "mds/AnchorTable.h" +#include "mds/Anchor.h" class MAnchor : public Message { int op; inodeno_t ino; vector trace; + version_t atid; // anchor table version. public: MAnchor() {} - MAnchor(int o, inodeno_t i) : - Message(MSG_MDS_ANCHOR), - op(o), ino(i) { } - + MAnchor(int o, inodeno_t i, version_t v=0) : + Message(MSG_MDS_ANCHOR), + op(o), ino(i), atid(v) { } virtual char *get_type_name() { return "anchor"; } void print(ostream& o) { o << "anchor(" << get_anchor_opname(op) << " " << ino; + if (atid) o << " atid " << atid; for (unsigned i=0; i& get_trace() { return trace; } + version_t get_atid() { return atid; } virtual void decode_payload() { int off = 0; @@ -56,19 +58,16 @@ class MAnchor : public Message { off += sizeof(op); payload.copy(off, sizeof(ino), (char*)&ino); off += sizeof(ino); + payload.copy(off, sizeof(atid), (char*)&atid); + off += sizeof(atid); ::_decode(trace, payload, off); } virtual void encode_payload() { payload.append((char*)&op, sizeof(op)); payload.append((char*)&ino, sizeof(ino)); + payload.append((char*)&atid, sizeof(atid)); ::_encode(trace, payload); - /* - int n = trace.size(); - payload.append((char*)&n, sizeof(int)); - for (int i=0; i_encode(payload); - */ } }; diff --git a/branches/sage/cephmds2/messages/MInodeLink.h b/branches/sage/cephmds2/messages/MInodeLink.h index 4c7cf28f4c5..38b7ce2d585 100644 --- a/branches/sage/cephmds2/messages/MInodeLink.h +++ b/branches/sage/cephmds2/messages/MInodeLink.h @@ -21,6 +21,9 @@ typedef struct { } MInodeLink_st; class MInodeLink : public Message { + inodeno_t ino; + filepath link_name; + bool prepare; MInodeLink_st st; public: