omg renames actually work (when on teh same mds at least).

getting closer!


git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@162 29311d96-e01e-0410-9327-a35deaab8ce9
This commit is contained in:
sage 2005-04-26 19:57:25 +00:00
parent 160f8daf38
commit 71d15dc3ba
13 changed files with 484 additions and 287 deletions

View File

@ -14,7 +14,7 @@
md_config_t g_conf = {
num_mds: 13,
num_osd: 10,
num_client: 55,
num_client: 550,
osd_cow: false, // crashy? true,
@ -59,7 +59,7 @@ md_config_t g_conf = {
client_op_mknod: 10,
client_op_link: false,
client_op_unlink: 10,
client_op_rename: 00,
client_op_rename: 100,
client_op_mkdir: 10,
client_op_rmdir: 10,

View File

@ -1,5 +1,6 @@
#include "CDentry.h"
#include "CInode.h"
#include "CDir.h"
#include <cassert>
@ -13,7 +14,10 @@ ostream& operator<<(ostream& out, CDentry& dn)
if (dn.get_lockstate() == DN_LOCK_UNPINNING) out << " unpinning";
if (dn.get_lockstate() == DN_LOCK_PREXLOCK) out << " prexlock g=" << dn.get_gather_set();
if (dn.get_lockstate() == DN_LOCK_XLOCK) out << " xlock";
out << " in " << *dn.get_dir() << "]";
out << " inode=" << dn.get_inode();
out << " " << &dn;
out << " in " << *dn.get_dir();
out << "]";
return out;
}
@ -25,20 +29,35 @@ CDentry::CDentry(const CDentry& m) {
void CDentry::mark_dirty()
{
dout(10) << " mark_dirty " << *this << endl;
dirty = true;
// dir is now dirty (if it wasn't already)
dir->mark_dirty();
// pin inode?
if (inode && !dirty) inode->get(CINODE_PIN_DNDIRTY);
// i now live in that (potentially newly dirty) version
parent_dir_version = dir->get_version();
dirty = true;
}
void CDentry::mark_clean() {
dout(10) << " mark_clean " << *this << endl;
if (dirty && inode) inode->put(CINODE_PIN_DNDIRTY);
dirty = false;
}
void CDentry::make_path(string& s)
{
if (dir->inode->get_parent_dn())
dir->inode->get_parent_dn()->make_path(s);
s += "/";
s += name;
}
// =
const CDentry& CDentry::operator= (const CDentry& right) {
assert(0); //std::cerr << "copy op called, implement me" << endl;

View File

@ -76,6 +76,9 @@ class CDentry {
bool operator>= (const CDentry& right) const;
bool operator<= (const CDentry& right) const;
// misc
void make_path(string& p);
// -- state
__uint64_t get_parent_dir_version() { return parent_dir_version; }
void float_parent_dir_version(__uint64_t ge) {

View File

@ -20,6 +20,7 @@ ostream& operator<<(ostream& out, CDir& dir)
string path;
dir.get_inode()->make_path(path);
out << "[dir " << dir.ino() << " " << path << "/";
if (dir.is_dirty()) out << " dirty";
if (dir.is_import()) out << " import";
if (dir.is_export()) out << " export";
if (dir.is_auth()) {
@ -47,6 +48,8 @@ ostream& operator<<(ostream& out, CDir& dir)
out << " dir_auth=" << dir.get_dir_auth();
out << " state=" << dir.get_state();
out << " sz=" << dir.get_nitems() << "+" << dir.get_nnull();
out << " " << &dir;
return out << "]";
}
@ -112,32 +115,7 @@ int CDir::get_rep_count(MDCluster *mdc)
void CDir::add_child(CDentry *d)
{
dout(12) << "add_child " << *d << " to " << *this << endl;
}
void CDir::remove_child(CDentry *d) {
dout(12) << "remove_child " << *d << endl;
assert(items.count(d->name));
items.erase(d->name);
}
void CDir::inc_size(CDentry *dn)
{
nitems++;
if (nitems == 1)
get(CDIR_PIN_CHILD); // pin parent
}
void CDir::dec_size(CDentry *dn)
{
nitems--;
if (nitems == 0)
put(CDIR_PIN_CHILD); // release parent.
}
CDentry* CDir::lookup(const string& n) {
//cout << " lookup " << n << " in " << this << endl;
@ -160,6 +138,7 @@ CDentry* CDir::add_dentry( const string& dname, CInode *in )
// create dentry
CDentry* dn = new CDentry(dname, in);
dn->dir = this;
dn->parent_dir_version = version;
// add to dir
assert(items.count(dn->name) == 0);
@ -170,10 +149,18 @@ CDentry* CDir::add_dentry( const string& dname, CInode *in )
if (in) {
link_inode_work( dn, in );
} else {
assert(dn->inode == 0);
null_items[dn->name] = dn;
nnull++;
}
dout(10) << "add_dentry " << *dn << endl;
// pin?
if (nnull + nitems == 1) get(CDIR_PIN_CHILD);
assert(nnull + nitems == items.size());
assert(nnull == null_items.size());
return dn;
}
@ -181,6 +168,8 @@ CDentry* CDir::add_dentry( const string& dname, CInode *in )
void CDir::remove_dentry(CDentry *dn)
{
dout(10) << "remove_dentry " << *dn << endl;
if (dn->inode) {
// detach inode and dentry
unlink_inode_work(dn);
@ -196,17 +185,27 @@ void CDir::remove_dentry(CDentry *dn)
items.erase(dn->name);
delete dn;
// unpin?
if (nnull + nitems == 0) put(CDIR_PIN_CHILD);
assert(nnull + nitems == items.size());
assert(nnull == null_items.size());
}
void CDir::link_inode( CDentry *dn, CInode *in )
{
link_inode_work(dn,in);
dout(10) << "link_inode " << *dn << " " << *in << endl;
// remove from null list
assert(null_items.count(dn->name) == 1);
null_items.erase(dn->name);
nnull--;
assert(nnull + nitems == items.size());
assert(nnull == null_items.size());
}
void CDir::link_inode_work( CDentry *dn, CInode *in )
@ -222,6 +221,9 @@ void CDir::link_inode_work( CDentry *dn, CInode *in )
// clear dangling
in->state_clear(CINODE_STATE_DANGLING);
// dn dirty?
if (dn->is_dirty()) in->get(CINODE_PIN_DNDIRTY);
// adjust auth pin count
if (in->auth_pins + in->nested_auth_pins)
adjust_nested_auth_pins( in->auth_pins + in->nested_auth_pins );
@ -229,12 +231,17 @@ void CDir::link_inode_work( CDentry *dn, CInode *in )
void CDir::unlink_inode( CDentry *dn )
{
dout(10) << "unlink_inode " << *dn << " " << *dn->inode << endl;
unlink_inode_work(dn);
// add to null list
assert(null_items.count(dn->name) == 0);
null_items[dn->name] = dn;
nnull++;
assert(nnull + nitems == items.size());
assert(nnull == null_items.size());
}
void CDir::unlink_inode_work( CDentry *dn )
@ -243,7 +250,7 @@ void CDir::unlink_inode_work( CDentry *dn )
// explicitly define auth
in->dangling_auth = in->authority();
dout(10) << "unlink_inode " << *in << " dangling_auth now " << in->dangling_auth << endl;
//dout(10) << "unlink_inode " << *in << " dangling_auth now " << in->dangling_auth << endl;
// unlink auth_pin count
if (in->auth_pins + in->nested_auth_pins)
@ -251,6 +258,9 @@ void CDir::unlink_inode_work( CDentry *dn )
// set dangling flag
in->state_set(CINODE_STATE_DANGLING);
// dn dirty?
if (dn->is_dirty()) in->put(CINODE_PIN_DNDIRTY);
// detach inode
in->remove_parent(dn);
@ -266,7 +276,28 @@ void CDir::unlink_inode_work( CDentry *dn )
nitems--; // adjust dir size
}
void CDir::remove_null_dentries() {
dout(10) << "remove_null_dentries " << *this << endl;
list<CDentry*> dns;
for (CDir_map_t::iterator it = null_items.begin();
it != null_items.end();
it++) {
dns.push_back(it->second);
}
for (list<CDentry*>::iterator it = dns.begin();
it != dns.end();
it++) {
CDentry *dn = *it;
assert(dn->is_sync());
assert(dn->is_null());
remove_dentry(dn);
}
assert(nnull == 0);
assert(nnull + nitems == items.size());
assert(nnull == null_items.size());
}
@ -481,6 +512,7 @@ void CDir::mark_clean()
{
dout(10) << "mark_clean " << *this << " version " << version << endl;
state_clear(CDIR_STATE_DIRTY);
//assert(nnull == 0); // what about requests in progres... null dentries but dir isn't marked dirty (yet)
}
@ -829,14 +861,18 @@ void CDir::unfreeze_dir()
void CDir::dump(int depth) {
string ind(depth, '\t');
dout(10) << "dump:" << ind << *this << endl;
map<string,CDentry*>::iterator iter = items.begin();
while (iter != items.end()) {
CDentry* d = iter->second;
if (d->inode) {
char isdir = ' ';
if (d->inode->dir != NULL) isdir = '/';
dout(10) << ind << d->inode->inode.ino << " " << d->name << isdir << endl;
dout(10) << "dump: " << ind << *d << " = " << *d->inode << endl;
d->inode->dump(depth+1);
} else {
dout(10) << "dump: " << ind << *d << " = [null]" << endl;
}
iter++;
}

View File

@ -253,6 +253,8 @@ class CDir {
return nitems;
}
size_t get_nitems() { return nitems; }
size_t get_nnull() { return nnull; }
/*
size_t get_auth_size() {
assert(nauthitems <= nitems);
@ -266,13 +268,8 @@ class CDir {
// -- manipulation --
void add_child(CDentry *d);
void remove_child(CDentry *d);
void inc_size(CDentry *dn =0);
void dec_size(CDentry *dn =0);
CDentry* lookup(const string& n);
// dentries and inodes
public:
CDentry* add_dentry( const string& dname, CInode *in=0 );
@ -283,6 +280,7 @@ class CDir {
void link_inode_work( CDentry *dn, CInode *in );
void unlink_inode_work( CDentry *dn );
void remove_null_dentries(); // on empty, clean dir
// -- authority --
public:

View File

@ -51,6 +51,7 @@ ostream& operator<<(ostream& out, CInode& in)
else
out << " " << *it;
}
out << " " << &in;
out << "]";
return out;
}
@ -145,9 +146,7 @@ void CInode::set_auth(bool a)
void CInode::make_path(string& s)
{
if (parent) {
parent->dir->inode->make_path(s);
s += "/";
s += parent->name;
parent->make_path(s);
}
else if (is_root()) {
s = ""; // root

View File

@ -46,7 +46,7 @@ using namespace __gnu_cxx;
#define CINODE_PIN_OPENRD 5
#define CINODE_PIN_OPENWR 6
//#define CINODE_PIN_DNLOCK 7 // dentry is in funny lock state.
#define CINODE_PIN_DNDIRTY 7 // dentry is dirty
#define CINODE_PIN_AUTHPIN 8
@ -72,7 +72,7 @@ static char *cinode_pin_names[CINODE_NUM_PINS] = {
"waiter",
"openrd",
"openwr",
"request",
"dndirty",
"authpin",
"importing",
"presync",
@ -566,7 +566,7 @@ class CInode : LRUObject {
// for giving to clients
void get_dist_spec(set<int>& ls, int auth) {
ls = cached_by;
ls.insert(ls.begin(), auth);
ls.insert(auth);
}

View File

@ -129,7 +129,8 @@ void MDCache::remove_inode(CInode *o)
if (o->get_parent_dn()) {
// FIXME: multiple parents?
CDentry *dn = o->get_parent_dn();
dn->dir->unlink_inode(dn); // unlink
assert(!dn->is_dirty());
dn->dir->remove_dentry(dn); // unlink and hose dentry
}
inode_map.erase(o->ino()); // remove from map
lru->lru_remove(o); // remove from lru
@ -453,18 +454,18 @@ void MDCache::shutdown_start()
it != inode_map.end();
it++) {
CInode *in = it->second;
// commit any dirty dir that's ours
if (in->is_dir() && in->dir && in->dir->is_auth() && in->dir->is_dirty())
mds->mdstore->commit_dir(in->dir, NULL);
//drop locks?
if (in->is_auth()) {
//if (in->is_syncbyme()) inode_sync_release(in);
//if (in->is_lockbyme()) inode_lock_release(in);
}
}
// make sure sticky sync is off
// WHY: if sync sticks it may not unravel of its own accord; sticky
// relies on additional requests/etc. to trigger an unsync when
// needed, but we're just trimming caches.
//g_conf.mdcache_sticky_sync_normal = false;
}
bool MDCache::shutdown_pass()
@ -544,6 +545,7 @@ bool MDCache::shutdown_pass()
} else {
dout(7) << "there's still stuff in the cache: " << lru->lru_get_size() << endl;
show_cache();
dump();
}
return false;
}
@ -986,10 +988,8 @@ int MDCache::path_traverse(filepath& path,
bool MDCache::path_pin(vector<CDentry*>& trace,
Message *req)
Context *c)
{
assert(has_path_pinned.count(req) == 0);
// verify everything is pinnable
for (vector<CDentry*>::iterator it = trace.begin();
it != trace.end();
@ -997,11 +997,11 @@ bool MDCache::path_pin(vector<CDentry*>& trace,
CDentry *dn = *it;
if (!dn->is_pinnable()) {
// wait
if (req) {
if (c) {
dout(7) << "path_pin can't pin " << *dn << ", waiting" << endl;
dn->dir->add_waiter(CDIR_WAIT_DNREAD,
dn->name,
new C_MDS_RetryMessage(mds, req));
c);
} else {
dout(7) << "path_pin can't pin, no waiter, failing." << endl;
}
@ -1016,14 +1016,13 @@ bool MDCache::path_pin(vector<CDentry*>& trace,
(*it)->pin();
dout(10) << "path_pinned " << *(*it) << endl;
}
if (req) has_path_pinned.insert(req);
delete c;
return true;
}
void MDCache::path_unpin(vector<CDentry*>& trace,
Message *req)
void MDCache::path_unpin(vector<CDentry*>& trace)
{
list<Context*> finished;
@ -1038,9 +1037,7 @@ void MDCache::path_unpin(vector<CDentry*>& trace,
if (dn->lockstate == DN_LOCK_UNPINNING && !dn->is_pinned())
dn->dir->take_waiting(CDIR_WAIT_DNUNPINNED, dn->name, finished);
}
if (req) has_path_pinned.erase(req);
// finish any contexts
if (!finished.empty()) finish_contexts(finished);
}
@ -1066,13 +1063,15 @@ bool MDCache::request_start(MClientRequest *req,
assert(active_requests.count(req) == 0);
// pin path
if (!path_pin(trace, req)) return false;
if (trace.size()) {
if (!path_pin(trace, new C_MDS_RetryMessage(mds,req))) return false;
}
dout(7) << "request_start " << *req << endl;
// add to map
active_requests[req].ref = ref;
active_requests[req].trace = trace;
if (trace.size()) active_requests[req].traces[trace[trace.size()-1]] = trace;
return true;
}
@ -1095,22 +1094,24 @@ void MDCache::request_cleanup(MClientRequest *req)
dentry_xlock_finish(dn);
// null? FIXME: should dentry_xlock_finish do this?
if (dn->inode == NULL)
dn->dir->remove_child(dn);
// finish
dn->dir->finish_waiting(CDIR_WAIT_ANY, dn->name);
if (dn->inode == NULL)
delete dn;
// remove clean, null dentry? (from a failed rename or whatever)
if (dn->is_null() && !dn->is_dirty()) {
dn->dir->remove_dentry(dn);
}
}
active_request_xlocks.erase(req);
}
// unpin path
path_unpin(active_requests[req].trace, req);
// unpin paths
for (map< CDentry*, vector<CDentry*> >::iterator it = active_requests[req].traces.begin();
it != active_requests[req].traces.end();
it++) {
path_unpin(it->second);
}
// remove from map
active_requests.erase(req);
@ -1276,6 +1277,7 @@ void MDCache::handle_discover(MDiscover *dis)
// get inode
CDentry *dn = cur->dir->lookup( dis->get_dentry(i) );
/*
if (dn && !dn->can_read()) { // xlocked?
dout(7) << "waiting on " << *dn << endl;
cur->dir->add_waiter(CDIR_WAIT_DNREAD,
@ -1283,7 +1285,22 @@ void MDCache::handle_discover(MDiscover *dis)
new C_MDS_RetryMessage(mds, dis));
return;
}
if (dn && dn->inode) {
*/
if (dn) {
if (!dn->inode && dn->is_sync()) {
dout(7) << "mds" << whoami << " dentry " << dis->get_dentry(i) << " null in " << *cur->dir << ", returning error" << endl;
reply->set_flag_error( dis->get_dentry(i) );
break; // don't replicate null but non-locked dentries.
}
reply->add_dentry( dis->get_dentry(i), !dn->can_read() );
dout(7) << "added dentry " << *dn << endl;
if (!dn->inode) break; // we're done.
}
if (dn && dn->inode) {
CInode *next = dn->inode;
assert(next->is_auth());
@ -1292,17 +1309,16 @@ void MDCache::handle_discover(MDiscover *dis)
next->replicate_relax_locks();
}
// add dentry + inode
reply->add_dentry( dis->get_dentry(i) );
// add inode
int nonce = next->cached_by_add(dis->get_asker());
reply->add_inode( new CInodeDiscover(next,
nonce) );
dout(7) << "added dentry " << dn << " + " << *next << " nonce=" << nonce<< endl;
dout(7) << "added inode " << *next << " nonce=" << nonce<< endl;
// descend
cur = next;
} else {
// don't have it?
// don't have inode?
if (cur->dir->is_complete()) {
// set error flag in reply
dout(7) << "mds" << whoami << " dentry " << dis->get_dentry(i) << " not found in " << *cur->dir << ", returning error" << endl;
@ -1368,6 +1384,21 @@ void MDCache::handle_discover_reply(MDiscoverReply *m)
// nowhere!
dout(7) << "discover_reply root + " << m->get_path() << " " << m->get_num_inodes() << " inodes" << endl;
assert(!root);
assert(m->get_base_ino() == 0);
assert(!m->has_base_dentry());
assert(!m->has_base_dir());
// add in root
cur = new CInode(false);
m->get_inode(0).update_inode(cur);
// root
cur->state_set(CINODE_STATE_ROOT);
set_root( cur );
dout(7) << " got root: " << *cur << endl;
// take waiters
finished.swap(waiting_for_root);
} else {
// grab inode
@ -1382,20 +1413,19 @@ void MDCache::handle_discover_reply(MDiscoverReply *m)
dout(7) << "discover_reply " << *cur << " + " << m->get_path() << ", have " << m->get_num_inodes() << " inodes" << endl;
}
if (m->is_flag_error()) {
dout(7) << " flag error, dentry = " << m->get_error_dentry() << endl;
}
// fyi
if (m->is_flag_error()) dout(7) << " flag error, dentry = " << m->get_error_dentry() << endl;
// start this loop even if we have no inodes, but just the base_dir
int plus_root_dir = (m->get_num_inodes() == 1 && m->has_root()) ? 1:0; //ugly, sorry, see handle_discover
for (int i=0; i<(m->get_num_inodes() + m->has_base_dir() + plus_root_dir + m->is_flag_error()); i++) {
// loop over discover results.
// indexese follow each ([[dir] dentry] inode)
// can start, end with any type.
for (int i=m->has_root(); i<m->get_depth(); i++) {
// dir
if ((i > 0 && i < m->get_num_inodes()) ||
(i == 0 && m->has_base_dir()) ||
(i == 1 && m->has_root())) {
if ((i > 0) ||
(i == 0 && m->has_base_dir())) {
if (cur->dir) {
// had it
/* this is strange, but it happens when:
@ -1417,6 +1447,7 @@ void MDCache::handle_discover_reply(MDiscoverReply *m)
}
}
// dentry error?
if (m->is_flag_error()) {
// error!
assert(cur->is_dir());
@ -1432,99 +1463,71 @@ void MDCache::handle_discover_reply(MDiscoverReply *m)
break;
}
if (i >= m->get_num_inodes()) break;
if (i >= m->get_num_dentries()) break;
// dentry
dout(7) << "i = " << i << " dentry is " << m->get_dentry(i) << endl;
dout(7) << "i = " << i << " ino is " << m->get_ino(i) << endl;
// lookup dentry
CInode *in = 0;
CDentry *dn = 0;
if (i || m->has_base_dentry()) {
if (i > 0 ||
m->has_base_dentry()) {
dn = cur->dir->lookup( m->get_dentry(i) );
if (dn) in = dn->get_inode();
if (dn) {
dout(7) << "had " << *dn << endl;
} else {
dn = cur->dir->add_dentry( m->get_dentry(i) );
if (m->get_dentry_xlock(i)) {
dout(7) << " new dentry is xlock " << *dn << endl;
dn->lockstate = DN_LOCK_XLOCK;
}
dout(7) << "added " << *dn << endl;
}
cur->dir->take_waiting(CDIR_WAIT_DENTRY,
m->get_dentry(i),
finished);
}
if (i >= m->get_num_inodes()) break;
// inode
if (dn) {
// had dentry
/*
this should _only_ happen if the dentry was created after we sent our request,
which should _only_ happen if a rename took place.
so, DON'T overwrite the dentry, the discover has _old_ info!
*/
dout(7) << "had " << *dn << ", hopefully because it got renamed into place?" << endl;
dout(7) << "dentry currently linked to " << *in << endl;
dout(7) << "discover gave me ino " << m->get_ino(i) << endl;
if (in->ino() == m->get_ino(i)) {
in->replica_nonce = m->get_inode(i).get_replica_nonce();
dout(7) << "they match, just updating nonce for " << *in << endl;
} else {
// ????
CInode *o = get_inode(in->ino());
if (o) {
dout(7) << "well, i do have the inode: " << *o << endl;
o->replica_nonce = m->get_inode(i).get_replica_nonce();
}
// why does this happen:::: FIXME
assert(0);
}
// -> if it is the same, pbly my discover got hung up.
//assert(in->ino() != m->get_ino(i) );
//assert(0); // just so i can see this happen and make sure it looks right!
break; // i'm done.
//assert(0); // ?? XXX
//m->get_inode(i).update_inode(in);
//dout2(7) << ", now " << *in << endl;
} else {
// add inode
in = get_inode( m->get_inode(i).get_ino() );
if (in) {
dout(7) << "i have this inode: " << *in << " but it's not linked via dentry " << m->get_dentry(i) << endl;
// fix nonce
dout(7) << " my nonce is " << in->replica_nonce << ", taking from discover, which has " << m->get_inode(i).get_replica_nonce() << endl;
in->replica_nonce = m->get_inode(i).get_replica_nonce();
// link
cur->dir->add_dentry( m->get_dentry(i), in );
dout(7) << "XXX linked " << *in << endl;
cur->dir->take_waiting(CDIR_WAIT_DENTRY,
m->get_dentry(i),
finished);
} else {
// didn't have it.
in = new CInode(false);
m->get_inode(i).update_inode(in);
if (!i && m->has_root()) {
// root
in->state_set(CINODE_STATE_ROOT);
set_root( in );
dout(7) << " got root: " << *in << endl;
finished.splice(finished.end(), waiting_for_root);
} else {
// link in
add_inode( in );
cur->dir->add_dentry( m->get_dentry(i), in );
cur->dir->take_waiting(CDIR_WAIT_DENTRY,
m->get_dentry(i),
finished);
}
dout(7) << "i = " << i << " ino is " << m->get_ino(i) << endl;
CInode *in = get_inode( m->get_inode(i).get_ino() );
assert(dn);
if (in) {
dout(7) << "had " << *in << endl;
dout(7) << "added " << *in << " nonce " << in->replica_nonce << endl;
// fix nonce
dout(7) << " my nonce is " << in->replica_nonce << ", taking from discover, which has " << m->get_inode(i).get_replica_nonce() << endl;
in->replica_nonce = m->get_inode(i).get_replica_nonce();
if (dn && in != dn->inode) {
dout(7) << " but it's not linked via dentry " << *dn << endl;
// link
if (dn->inode) {
dout(7) << "dentry WAS linked to " << *dn->inode << endl;
assert(0); // WTF.
}
dn->dir->link_inode(dn, in);
}
}
else {
assert(dn->inode == 0); // better not be something else linked to this dentry...
// didn't have it.
in = new CInode(false);
m->get_inode(i).update_inode(in);
// link in
add_inode( in );
dn->dir->link_inode(dn, in);
dout(7) << "added " << *in << " nonce " << in->replica_nonce << endl;
}
// onward!
cur = in;
}
@ -1820,6 +1823,12 @@ void MDCache::dentry_unlink(CDentry *dn, Context *c)
// don't need ack.
}
if (dn->inode && dn->inode->dir) {
// mark dir clean, since it dne!
dn->inode->dir->remove_null_dentries();
dn->inode->dir->mark_clean();
}
// unlink
dn->dir->unlink_inode( dn );
dn->mark_dirty();
@ -1853,6 +1862,10 @@ void MDCache::handle_dentry_unlink(MDentryUnlink *m)
} else {
dout(7) << "handle_dentry_unlink on " << *dn << endl;
// dir?
if (dn->inode && dn->inode->dir)
dn->inode->dir->remove_null_dentries();
string dname = dn->name;
// unlink
@ -1895,7 +1908,7 @@ public:
};
*/
void MDCache::file_rename(CDentry *srcdn, CDentry *destdn, Context *c)
void MDCache::file_rename(CDentry *srcdn, CDentry *destdn, Context *c, bool everyone)
{
assert(srcdn->is_xlocked()); // by me
assert(destdn->is_xlocked()); // by me
@ -1909,6 +1922,8 @@ void MDCache::file_rename(CDentry *srcdn, CDentry *destdn, Context *c)
CInode *in = srcdn->inode;
Message *req = srcdn->xlockedby;
dump();
// update our cache
rename_file(srcdn, destdn);
@ -1924,12 +1939,18 @@ void MDCache::file_rename(CDentry *srcdn, CDentry *destdn, Context *c)
// tell replicas (no need to wait for ack) to do the same, and un-xlock.
// make list
set<int> who = srcdir->get_open_by();
for (set<int>::iterator it = destdir->open_by_begin();
it != destdir->open_by_end();
it++)
if (who.count(*it) == 0) who.insert(*it);
set<int> who;
if (everyone) {
for (int i=0; i<mds->get_cluster()->get_num_mds(); i++)
if (i != mds->get_nodeid()) who.insert(i);
} else {
who = srcdir->get_open_by();
for (set<int>::iterator it = destdir->open_by_begin();
it != destdir->open_by_end();
it++)
if (who.count(*it) == 0) who.insert(*it);
}
// tell
for (set<int>::iterator it = who.begin();
it != who.end();
@ -1985,30 +2006,37 @@ void MDCache::handle_rename_local_file(MRenameLocalFile*m)
if (srcdn && destdir) {
CInode *in = srcdn->inode;
if (!destdn) destdn = destdir->add_dentry(m->get_destname()); // create null dentry
if (!destdn) {
destdn = destdir->add_dentry(m->get_destname()); // create null dentry
destdn->lockstate = DN_LOCK_XLOCK; // that's xlocked!
assert(!srcdn->inode->is_dir());
}
dout(7) << "handle_rename_local_file renaming " << *srcdn << " to " << *destdn << endl;
rename_file(srcdn, destdn);
srcdir->remove_dentry(srcdn);
// remove src dentry
srcdn->dir->remove_dentry(srcdn);
// update imports/exports?
if (in->is_dir() && in->dir)
fix_renamed_dir(srcdir, in, destdir, false); // auth didnt change
srcdir->finish_waiting(CDIR_WAIT_ANY, srcdn->name);
destdir->finish_waiting(CDIR_WAIT_ANY, destdn->name);
srcdir->finish_waiting(CDIR_WAIT_ANY, m->get_srcname());
destdir->finish_waiting(CDIR_WAIT_ANY, m->get_destname());
}
else if (srcdn) {
dout(7) << "handle_rename_local_file unlinking src only " << *srcdn << endl;
assert(!srcdn->inode->is_dir());
srcdir->remove_dentry(srcdn); // will leave inode dangling.
srcdir->finish_waiting(CDIR_WAIT_ANY, m->get_srcname());
}
else if (destdn) {
dout(7) << "handle_rename_local_file unlinking dst only " << *destdn << endl;
srcdir->remove_dentry(destdn);
destdir->remove_dentry(destdn);
destdir->finish_waiting(CDIR_WAIT_ANY, m->get_destname());
}
@ -3322,7 +3350,7 @@ void MDCache::handle_lock_dir(MLock *m)
// DENTRY
bool MDCache::dentry_xlock_start(CDentry *dn, const string& path, MClientRequest *m, CInode *ref)
bool MDCache::dentry_xlock_start(CDentry *dn, MClientRequest *m, CInode *ref, bool all_nodes)
{
dout(7) << "dentry_xlock_start on " << *dn << endl;
@ -3340,11 +3368,11 @@ bool MDCache::dentry_xlock_start(CDentry *dn, const string& path, MClientRequest
// prelock?
if (dn->lockstate == DN_LOCK_PREXLOCK) {
if (dn->xlockedby == m) {
dout(7) << "dentry " << *dn << " prelock by me" << endl;
dout(7) << "dentry " << *dn << " prexlock by me" << endl;
dn->dir->add_waiter(CDIR_WAIT_DNLOCK, dn->name,
new C_MDS_RetryRequest(mds,m,ref));
} else {
dout(7) << "dentry " << *dn << " prelock by someone else" << endl;
dout(7) << "dentry " << *dn << " prexlock by someone else" << endl;
dn->dir->add_waiter(CDIR_WAIT_DNREAD, dn->name,
new C_MDS_RetryRequest(mds,m,ref));
}
@ -3364,7 +3392,7 @@ bool MDCache::dentry_xlock_start(CDentry *dn, const string& path, MClientRequest
return false;
}
// path pinned?
// is dentry path pinned?
if (dn->is_pinned()) {
dout(7) << "dentry " << *dn << " pinned, waiting" << endl;
dn->lockstate = DN_LOCK_UNPINNING;
@ -3374,18 +3402,49 @@ bool MDCache::dentry_xlock_start(CDentry *dn, const string& path, MClientRequest
return false;
}
// pin path up to dentry! (if success, point of no return)
CDentry *pdn = dn->dir->inode->get_parent_dn();
if (pdn) {
if (active_requests[m].traces.count(pdn)) {
dout(7) << "already path pinned parent dentry " << *pdn << endl;
} else {
dout(7) << "pinning parent dentry " << *pdn << endl;
vector<CDentry*> trace;
make_trace(trace, pdn->inode);
assert(trace.size());
if (!path_pin(trace, new C_MDS_RetryRequest(mds, m, ref))) return false;
active_requests[m].traces[trace[trace.size()-1]] = trace;
}
}
// pin dir!
dn->dir->auth_pin();
// mine!
dn->xlockedby = m;
if (dn->dir->is_open_by_anyone()) {
dn->gather_set = dn->dir->get_open_by();
if (dn->dir->is_open_by_anyone() || all_nodes) {
dn->lockstate = DN_LOCK_PREXLOCK;
// xlock with whom?
set<int> who;
if (all_nodes) {
for (int i=0; i<mds->get_cluster()->get_num_mds(); i++)
if (i != mds->get_nodeid()) who.insert(i);
} else {
who = dn->dir->get_open_by();
}
dn->gather_set = who;
for (set<int>::iterator it = dn->dir->open_by_begin();
it != dn->dir->open_by_end();
// make path
string path;
dn->make_path(path);
dout(10) << "path is " << path << " for " << *dn << endl;
for (set<int>::iterator it = who.begin();
it != who.end();
it++) {
MLock *m = new MLock(LOCK_AC_LOCK, mds->get_nodeid());
m->set_dn(dn->dir->ino(), dn->name);
@ -3419,7 +3478,7 @@ void MDCache::dentry_xlock_finish(CDentry *dn, bool quiet)
if (!quiet) {
// tell replicas?
if (dn->inode && // if NULL, don't bother.
if (//dn->inode && // if NULL, don't bother.
dn->dir->is_open_by_anyone()) {
for (set<int>::iterator it = dn->dir->open_by_begin();
it != dn->dir->open_by_end();
@ -3435,12 +3494,18 @@ void MDCache::dentry_xlock_finish(CDentry *dn, bool quiet)
// unpin dir
dn->dir->auth_unpin();
dump();
}
CDentry* MDCache::create_xlocked_dentry(CDir *dir, const string& dname, MClientRequest *req, CInode *ref)
/*
CDentry* MDCache::create_xlocked_dentry(CDir *dir, const string& dname, MClientRequest *req, CInode *ref, bool all_nodes)
{
assert(dir->lookup(dname) == 0);
dir->add_dentry(dname);
return dentry_xlock_start
// dir pinnable?
if (!dir->can_auth_pin()) {
dout(7) << "create_xlocked_dentry dir" << *dir << " not pinnable, waiting" << endl;
@ -3465,21 +3530,16 @@ CDentry* MDCache::create_xlocked_dentry(CDir *dir, const string& dname, MClientR
return dn;
}
/*
bool MDCache::dentry_sync(CDentry *dn)
{
}
*/
void MDCache::handle_lock_dn(MLock *m)
{
assert(m->get_otype() == LOCK_OTYPE_DN);
CInode *diri = get_inode(m->get_ino());
CDir *dir = diri->dir; // may be null
CInode *diri = get_inode(m->get_ino()); // may be null
CDir *dir = 0;
if (diri) dir = diri->dir; // may be null
string dname = m->get_dn();
int from = m->get_asker();
CDentry *dn = 0;
@ -3494,7 +3554,7 @@ void MDCache::handle_lock_dn(MLock *m)
if (dir->is_proxy()) {
// fw
assert(dauth >= 0);
dout(7) << "handle_lock_dn " << m->get_ino() << " dname " << dname << " from " << from << ": proxy, fw to " << dauth << endl;
dout(7) << "handle_lock_dn " << m << " " << m->get_ino() << " dname " << dname << " from " << from << ": proxy, fw to " << dauth << endl;
mds->messenger->send_message(m,
MSG_ADDR_MDS(dauth), MDS_PORT_CACHE,
MDS_PORT_CACHE);
@ -3507,8 +3567,9 @@ void MDCache::handle_lock_dn(MLock *m)
// replica
if (dir) dn = dir->lookup(dname);
if (!dn) {
dout(7) << "handle_lock_dn don't have " << m->get_ino() << " dname " << dname << endl;
if (0) {
dout(7) << "handle_lock_dn " << m << " don't have " << m->get_ino() << " dname " << dname << endl;
if (m->get_action() == LOCK_AC_LOCK) {
dout(7) << "handle_lock_dn don't have " << m->get_path() << ", discovering" << endl;
vector<CDentry*> trace;
@ -3517,8 +3578,10 @@ void MDCache::handle_lock_dn(MLock *m)
m, new C_MDS_RetryMessage(mds,m),
MDS_TRAVERSE_DISCOVER);
assert(r>0);
} else {
dout(7) << "safely ignoring." << endl;
}
if (1) {
if (0) {
if (m->get_action() == LOCK_AC_LOCK) {
// NAK
MLock *reply = new MLock(LOCK_AC_LOCKNAK, mds->get_nodeid());
@ -3539,7 +3602,8 @@ void MDCache::handle_lock_dn(MLock *m)
// -- replica --
case LOCK_AC_LOCK:
assert(dn->lockstate == DN_LOCK_SYNC ||
dn->lockstate == DN_LOCK_UNPINNING);
dn->lockstate == DN_LOCK_UNPINNING ||
dn->lockstate == DN_LOCK_XLOCK); // <-- bc the handle_lock_dn did the discover!
if (dn->is_pinned()) {
dn->lockstate = DN_LOCK_UNPINNING;
@ -4181,6 +4245,7 @@ void MDCache::export_dir_walk(MExportDir *req,
CInode *in = dn->inode;
// -- dentry
dout(7) << "export_dir_walk exporting " << *dn << endl;
dir_rope.append( it->first.c_str(), it->first.length()+1 );
if (dn->is_dirty())
@ -4338,6 +4403,9 @@ void MDCache::handle_export_dir_notify_ack(MExportDirNotifyAck *m)
if (dn->is_null()) {
assert(dn->is_sync());
dir->remove_dentry(dn);
} else {
dout(10) << "export_dir_notify_ack leaving xlocked neg " << *dn << endl;
dn->mark_clean();
}
}
}
@ -4366,10 +4434,10 @@ void MDCache::export_dir_finish(CDir *dir)
dir->unfreeze_tree();
// unpin path
dout(7) << "export_dir_finish unpinnning path" << endl;
dout(7) << "export_dir_finish unpinning path" << endl;
vector<CDentry*> trace;
make_trace(trace, dir->inode);
path_unpin(trace, 0);
path_unpin(trace);
show_imports();
}
@ -4913,13 +4981,14 @@ void MDCache::import_dir_block(crope& r,
if (!dn)
dn = dir->add_dentry(dname); // null
if (dirty == 'D') dn->mark_dirty();
// mark dn dirty _after_ we link the inode (scroll down)
if (icode == 'N') {
// null dentry
assert(dn->is_null());
// fall thru
}
else if (icode == 'I') {
// inode
@ -4979,6 +5048,9 @@ void MDCache::import_dir_block(crope& r,
NULL); // FIXME pay attention to completion?
}
}
// mark dentry dirty? (only _after_ we link the inode)
if (dirty == 'D') dn->mark_dirty();
}
@ -5807,12 +5879,14 @@ void MDCache::show_cache()
it != inode_map.end();
it++) {
dout(7) << "cache " << *((*it).second);
/*
if ((*it).second->ref)
dout2(7) << " pin " << (*it).second->ref_set;
if ((*it).second->cached_by.size())
dout2(7) << " cache_by " << (*it).second->cached_by;
*/
if ((*it).second->dir)
dout2(7) << " " << *(*it).second->dir;
dout2(7) << " ... " << *(*it).second->dir;
dout2(7) << endl;
}
}

View File

@ -56,7 +56,7 @@ typedef const char* pchar;
typedef struct {
CInode *ref; // reference inode
vector<CDentry*> trace; // path trace
map< CDentry*, vector<CDentry*> > traces; // path pins held
} active_request_t;
@ -97,6 +97,7 @@ class MDCache {
// active MDS requests
public:
map<MClientRequest*, active_request_t> active_requests;
map<Message*, set<CDentry*> > active_request_xlocks;
@ -173,11 +174,8 @@ class MDCache {
Context *onfinish=0);
void open_remote_dir(CInode *diri, Context *fin);
set<Message*> has_path_pinned;
bool path_pin(vector<CDentry*>& trace,
Message *req);
void path_unpin(vector<CDentry*>& trace,
Message *req);
bool path_pin(vector<CDentry*>& trace, Context *c);
void path_unpin(vector<CDentry*>& trace);
void make_trace(vector<CDentry*>& trace, CInode *in);
bool request_start(MClientRequest *req,
@ -202,7 +200,7 @@ class MDCache {
void dentry_unlink(CDentry *in, Context *c);
void handle_dentry_unlink(MDentryUnlink *m);
void file_rename(CDentry *srcdn, CDentry *destdn, Context *c);
void file_rename(CDentry *srcdn, CDentry *destdn, Context *c, bool everyone);
void handle_rename_local_file(MRenameLocalFile*m);
@ -336,9 +334,10 @@ class MDCache {
void handle_lock_dir(MLock *m);
// dentry
bool dentry_xlock_start(CDentry *dn, const string& path, MClientRequest *m, CInode *ref);
bool dentry_xlock_start(CDentry *dn,
MClientRequest *m, CInode *ref,
bool allnodes=false);
void dentry_xlock_finish(CDentry *dn, bool quiet=false);
CDentry* create_xlocked_dentry(CDir *dir, const string& name, MClientRequest *req, CInode *ref);
void handle_lock_dn(MLock *m);

View File

@ -432,6 +432,9 @@ void MDS::handle_client_request(MClientRequest *req)
dout(10) << "ref is " << *ref << endl;
// rename doesn't pin a path (initially)
if (req->get_op() == MDS_OP_RENAME) trace.clear();
// register
if (!mdcache->request_start(req, ref, trace))
return;
@ -789,8 +792,10 @@ void MDS::handle_client_readdir(MClientRequest *req,
CDir_map_t::iterator it;
int numfiles = 0;
for (it = cur->dir->begin(); it != cur->dir->end(); it++) {
CInode *in = it->second->inode;
if (!in) continue; // xlocked pre-rename?
//string name = it->first;
CDentry *dn = it->second;
CInode *in = dn->inode;
if (!in) continue; // null
c_inode_info *i = new c_inode_info;
i->inode = in->inode;
in->get_dist_spec(i->dist, whoami);
@ -884,21 +889,23 @@ CInode *MDS::mknod(MClientRequest *req, CInode *diri, bool okexist)
// make sure name doesn't already exist
CDentry *dn = dir->lookup(name);
if (dn != 0) {
if (dn) {
if (!dn->can_read(req)) {
dout(10) << "waiting on (existing!) dentry " << *dn << endl;
dir->add_waiter(CDIR_WAIT_DNREAD, name, new C_MDS_RetryRequest(this, req, diri));
return 0;
}
// name already exists
if (okexist) {
dout(10) << "dentry " << name << " exists in " << *dir << endl;
return dn->inode;
} else {
dout(10) << "dentry " << name << " exists in " << *dir << endl;
reply_request(req, -EEXIST);
return 0;
if (!dn->is_null()) {
// name already exists
if (okexist) {
dout(10) << "dentry " << name << " exists in " << *dir << endl;
return dn->inode;
} else {
dout(10) << "dentry " << name << " exists in " << *dir << endl;
reply_request(req, -EEXIST);
return 0;
}
}
}
@ -919,7 +926,10 @@ CInode *MDS::mknod(MClientRequest *req, CInode *diri, bool okexist)
newi->inode.atime = 1; // now, FIXME
// link
dn = dir->add_dentry(name, newi);
if (!dn)
dn = dir->add_dentry(name, newi);
else
dir->link_inode(dn, newi);
// mark dirty
dn->mark_dirty();
@ -1014,7 +1024,7 @@ void MDS::handle_client_unlink(MClientRequest *req,
// null?
if (dn->is_null()) {
dout(10) << " it's null " << *dn << endl;
dout(10) << "unlink on null dn " << *dn << endl;
reply_request(req, -ENOENT);
return;
}
@ -1105,7 +1115,7 @@ void MDS::handle_client_unlink(MClientRequest *req,
dout(7) << "handle_client_unlink/rmdir on " << *in << endl;
// xlock dentry
if (!mdcache->dentry_xlock_start(dn, req->get_path(), req, diri))
if (!mdcache->dentry_xlock_start(dn, req, diri))
return;
// it's locked, unlink!
@ -1206,27 +1216,31 @@ void MDS::handle_client_rename(MClientRequest *req,
// src dentry
CDentry *srcdn = srcdir->lookup(srcname); // FIXME for hard links
if (!srcdn) {
if (srcdir->is_complete()) {
dout(10) << "handle_client_rename src dne " << endl;
reply_request(req, -EEXIST);
return;
} else {
dout(10) << "readding incomplete dir" << endl;
mdstore->fetch_dir(srcdir,
new C_MDS_RetryRequest(this, req, srcdiri));
return;
}
}
// xlocked?
if (!srcdn->can_read(req)) {
if (srcdn && !srcdn->can_read(req)) {
dout(10) << " waiting on " << *srcdn << endl;
srcdir->add_waiter(CDIR_WAIT_DNREAD,
srcname,
new C_MDS_RetryRequest(this, req, srcdiri));
return;
}
if ((srcdn && !srcdn->inode) ||
(!srcdn && srcdir->is_complete())) {
dout(10) << "handle_client_rename src dne " << endl;
reply_request(req, -EEXIST);
return;
}
if (!srcdn && !srcdir->is_complete()) {
dout(10) << "readding incomplete dir" << endl;
mdstore->fetch_dir(srcdir,
new C_MDS_RetryRequest(this, req, srcdiri));
return;
}
assert(srcdn && srcdn->inode);
dout(10) << "handle_client_rename " << *srcdn << " to " << req->get_sarg() << endl;
@ -1261,6 +1275,7 @@ void MDS::handle_client_rename_2(MClientRequest *req,
CDir* destdir = 0;
string destname;
int destauth = -1;
bool result;
// what is the dest? (dir or file or complete filename)
// note: trace includes root, destpath doesn't (include leading /)
@ -1475,51 +1490,53 @@ void MDS::handle_client_rename_file(MClientRequest *req,
dout(7) << "dest dne" << endl;
}
bool everybody = false;
if (true || srcdn->inode->is_dir()) {
/* overkill warning: lock w/ everyone for simplicity.
i could limit this to cases where something beneath me is exported.
could possibly limit the list.
main constrained is that, regardless of the order i do the xlocks, and whatever
imports/exports might happen in the process, the destdir _must_ exist on any node
importing something beneath me when rename finishes.
*/
dout(7) << "overkill? doing xlocks with _all_ nodes" << endl;
everybody = true;
}
// pick lock ordering
if (req->get_path() < req->get_sarg()) {
// src
if (!srcdn->is_xlockedbyme(req) &&
!mdcache->dentry_xlock_start(srcdn, req->get_path(), req, srcdiri))
!mdcache->dentry_xlock_start(srcdn, req, srcdiri, everybody))
return;
dout(7) << "srcdn is xlock " << *srcdn << endl;
// dest
if (destdn) {
if (!destdn->is_xlockedbyme(req) &&
!mdcache->dentry_xlock_start(destdn, req->get_sarg(), req, srcdiri))
return;
dout(7) << "destdn is xlock " << *srcdn << endl;
} else {
destdn = mdcache->create_xlocked_dentry(destdir, destname, req, srcdiri);
if (!destdn) return;
dout(7) << "created destdn " << *destdn << endl;
}
if (!destdn) destdn = destdir->add_dentry(destname);
if (!destdn->is_xlockedbyme(req) &&
!mdcache->dentry_xlock_start(destdn, req, srcdiri, everybody))
return;
dout(7) << "destdn is xlock " << *destdn << endl;
} else {
// dest
if (destdn) {
if (!destdn->is_xlockedbyme(req) &&
!mdcache->dentry_xlock_start(destdn, req->get_sarg(), req, srcdiri))
return;
dout(7) << "destdn is xlock " << *srcdn << endl;
} else {
destdn = mdcache->create_xlocked_dentry(destdir, destname, req, srcdiri);
if (!destdn) return;
dout(7) << "created destdn " << *destdn << endl;
}
if (!destdn) destdn = destdir->add_dentry(destname);
if (!destdn->is_xlockedbyme(req) &&
!mdcache->dentry_xlock_start(destdn, req, srcdiri, everybody))
return;
dout(7) << "destdn is xlock " << *destdn << endl;
// src
if (!srcdn->is_xlockedbyme(req) &&
!mdcache->dentry_xlock_start(srcdn, req->get_path(), req, srcdiri))
!mdcache->dentry_xlock_start(srcdn, req, srcdiri, everybody))
return;
dout(7) << "srcdn is xlock " << *srcdn << endl;
}
// we're a go.
// forget about my locks!
mdcache->file_rename( srcdn, destdn,
new C_MDS_RenameFinish(this, req) );
new C_MDS_RenameFinish(this, req),
everybody );
}

View File

@ -410,13 +410,14 @@ void MDStore::do_commit_dir_2( int result,
dout(11) << "do_commit_dir_2 hashcode " << hashcode << " " << *dir << endl;
// mark inodes and dentries clean too (if we committed them!)
list<CDentry*> null_clean;
for (CDir_map_t::iterator it = dir->begin();
it != dir->end();
it++) {
it != dir->end(); ) {
CDentry *dn = it->second;
it++;
if (hashcode >= 0) {
int dentryhashcode = mds->get_cluster()->hash_dentry( dir->ino(), it->first );
int dentryhashcode = mds->get_cluster()->hash_dentry( dir->ino(), dn->get_name() );
if (dentryhashcode != hashcode) continue;
}
@ -432,7 +433,7 @@ void MDStore::do_commit_dir_2( int result,
// remove, if it's null and unlocked
if (dn->is_null() && dn->is_sync()) {
dout(5) << " removing clean and null " << *dn << endl;
dir->remove_dentry(dn);
null_clean.push_back(dn);
continue;
}
} else {
@ -463,6 +464,12 @@ void MDStore::do_commit_dir_2( int result,
assert(in->is_dirty());
}
}
// remove null clean dentries
for (list<CDentry*>::iterator it = null_clean.begin();
it != null_clean.end();
it++)
dir->remove_dentry(*it);
// unpin
dir->auth_unpin();

View File

@ -14,7 +14,7 @@ using namespace std;
// root: inode + dir
// dir dis: dir
#define max(a,b) ((a)>(b) ? (a):(b))
class MDiscoverReply : public Message {
inodeno_t base_ino;
@ -24,18 +24,37 @@ class MDiscoverReply : public Message {
bool flag_error;
string error_dentry; // dentry that was not found (to trigger waiters on asker)
// ... + dir + dentry + inode
// inode [ + ... ], base_ino = 0 : discover base_ino=0, start w/ root ino
// dentry + inode [ + ... ] : discover want_base_dir=false
// (dir + dentry + inode) + : discover want_base_dir=true
// in general,
// dir [ + dentry [ + inode [ + ...repeat... ]]]
// where that (dir, dentry, inode) share the same index.
// we can start and end with any type.
// dir [ + ... ] : discover want_base_dir=true
// dentry [ + inode [ + ... ] ] : discover want_base_dir=false
// no_base_dir=true
// -> we only exclude inode if dentry is null+xlock
// inode [ + ... ], base_ino = 0 : discover base_ino=0, start w/ root ino,
// no_base_dir=no_base_dentry=true
vector<CDirDiscover*> dirs; // not inode-aligned if no_base_dir = true.
filepath path; // not inode-aligned in no_base_dentry = true
filepath path; // not inode-aligned if no_base_dentry = true
vector<bool> path_xlock;
vector<CInodeDiscover*> inodes;
public:
// accessors
inodeno_t get_base_ino() { return base_ino; }
int get_num_inodes() { return inodes.size(); }
int get_num_dentries() { return path.depth(); }
int get_num_dirs() { return dirs.size(); }
int get_depth() { // return depth of deepest object (in dir/dentry/inode units)
return max( inodes.size(),
max( path.depth() + no_base_dentry + is_flag_error(),
dirs.size() + no_base_dir ));
}
bool has_base_dir() { return !no_base_dir; }
bool has_base_dentry() { return !no_base_dentry; }
@ -47,14 +66,16 @@ class MDiscoverReply : public Message {
return false;
}
string& get_path() { return path.get_path(); }
bool get_path_xlock(int i) { return path_xlock[i]; }
bool is_flag_forward() { return flag_forward; }
bool is_flag_error() { return flag_error; }
string& get_error_dentry() { return error_dentry; }
// these index _arguments_ are aligned to the inodes.
// these index _arguments_ are aligned to each ([[dir, ] dentry, ] inode) set.
CDirDiscover& get_dir(int n) { return *(dirs[n - no_base_dir]); }
string& get_dentry(int n) { return path[n - no_base_dentry]; }
bool get_dentry_xlock(int n) { return path_xlock[n - no_base_dentry]; }
CInodeDiscover& get_inode(int n) { return *(inodes[n]); }
inodeno_t get_ino(int n) { return inodes[n]->get_ino(); }
@ -80,12 +101,13 @@ class MDiscoverReply : public Message {
// builders
bool is_empty() {
return dirs.empty() && inodes.empty() && !flag_forward && !flag_error;
return dirs.empty() && path.depth() == 0 && inodes.empty() && !flag_forward && !flag_error;
}
void set_path(filepath& dp) { path = dp; }
void add_dentry(string& dn) {
void add_dentry(string& dn, bool xlock) {
if (inodes.empty() && path.depth() == 0) no_base_dentry = false;
path.add_dentry(dn);
path_xlock.push_back(xlock);
}
void add_inode(CInodeDiscover* din) {
@ -139,6 +161,17 @@ class MDiscoverReply : public Message {
// filepath
off = path._unrope(r, off);
dout(12) << path.depth() << " dentries out" << endl;
// xlock
r.copy(off, sizeof(int), (char*)&n);
off += sizeof(int);
for (int i=0; i<n; i++) {
bool b;
r.copy(off, sizeof(bool), (char*)&b);
off += sizeof(bool);
path_xlock.push_back(b);
}
}
virtual void encode_payload(crope& r) {
r.append((char*)&base_ino, sizeof(base_ino));
@ -170,6 +203,15 @@ class MDiscoverReply : public Message {
// path
r.append(path._rope());
dout(12) << path.depth() << " dentries in" << endl;
// path_xlock
n = path_xlock.size();
for (vector<bool>::iterator it = path_xlock.begin();
it != path_xlock.end();
it++) {
bool b = *it;
r.append((char*)&b, sizeof(bool));
}
}
};

View File

@ -9,8 +9,9 @@ while (<>) {
# cinode:auth_pin on inode [1000000002625 /gnu/blah_client_created. 0x89b7700] count now 1 + 0
if (/path_pinned /) {
my ($what) = /\[dentry (\S+) \d+ pins /;
# print "add_waiter $c $what\n";
my ($dname, $dir) = /\[dentry (\S+) .* in \[dir (\d+) /;
$what = "$dname $dir";
#print "$l pin $what\n";
$pin{$what}++;
$hist{$what} .= "$l: $_";
push( @pins, $what ) unless grep {$_ eq $what} @pins;
@ -19,7 +20,9 @@ while (<>) {
# cinode:auth_unpin on inode [1000000002625 (dangling) 0x89b7700] count now 0 + 0
if (/path_unpinned/) {
my ($what) = /\[dentry (\S+) /;# / on (.*\])/;
my ($dname, $dir) = /\[dentry (\S+) .* in \[dir (\d+) /;
$what = "$dname $dir";
#print "$l unpin $what\n";
$pin{$what}--;
$hist{$what} .= "$l: $_";
unless ($pin{$what}) {