build fat nfs handles, and add mds FINDINODE op to resolve them

This commit is contained in:
Sage Weil 2008-05-09 22:47:04 -07:00
parent 1930d85a55
commit ed1748f726
11 changed files with 266 additions and 93 deletions

View File

@ -1,5 +1,6 @@
userspace client
- rm -rf on fragmented directory
- move the size check(s) on read from _read() into FileCache
- time out caps, wake up waiters on renewal
- validate dn leases
@ -11,9 +12,8 @@ userspace client
- also needs to cope with mds failures
kernel client
- large directories
- frag_map vs frag_tree in ci
- readdir
- direct mds requests intelligently
- readdir on large directories
- flush caps on sync, fsync, etc.
- do we need to block?
- timeout mds session close on umount

View File

@ -69,8 +69,6 @@ struct ceph_timespec {
/*
* dir fragments
*/
typedef __le32 ceph_frag_t;
/* Build a fragment id: bit count b in the top byte, the low b bits of v below. */
static inline __u32 frag_make(__u32 b, __u32 v)
{
	__u32 value_mask = 0xffffffu >> (24 - b);

	return (b << 24) | (v & value_mask);
}

/* Bit count stored in the top byte of fragment id f. */
static inline __u32 frag_bits(__u32 f)
{
	return f >> 24;
}

/* Low 24 bits (the value portion) of fragment id f. */
static inline __u32 frag_value(__u32 f)
{
	return f & 0xffffffu;
}
@ -190,6 +188,45 @@ struct ceph_eversion {
/*
* string hash.
*
* taken from Linux, tho we should probably take care to use this one
* in case the upstream hash changes.
*/
/*
 * Name hashing routines.
 * Hash courtesy of the R5 hash in reiserfs modulo sign bits.
 */

/* Initial hash value. */
#define ceph_init_name_hash()		0

/*
 * Partial hash update: fold character c into the running hash prevhash
 * and return the new running hash.  Assumes roughly 4 bits per character.
 */
static inline unsigned long
ceph_partial_name_hash(unsigned long c, unsigned long prevhash)
{
	unsigned long mixed = prevhash;

	mixed += c << 4;
	mixed += c >> 4;
	return mixed * 11;
}

/*
 * Finish a hash: cut the accumulated value down to the width of an
 * unsigned int.
 */
static inline unsigned long ceph_end_name_hash(unsigned long hash)
{
	unsigned int narrowed = (unsigned int)hash;

	return narrowed;
}

/* Compute the hash of the len bytes starting at name. */
static inline unsigned int
ceph_full_name_hash(const unsigned char *name, unsigned int len)
{
	unsigned long acc = ceph_init_name_hash();
	unsigned int i;

	for (i = 0; i < len; i++)
		acc = ceph_partial_name_hash(name[i], acc);
	return ceph_end_name_hash(acc);
}
/*********************************************
* message types
*/
@ -394,10 +431,14 @@ struct ceph_mds_session_head {
* mds ops.
* & 0x1000 -> write op
* & 0x10000 -> follow symlink (e.g. stat(), not lstat()).
* & 0x100000 -> use weird ino/path trace
*/
#define CEPH_MDS_OP_WRITE 0x1000
#define CEPH_MDS_OP_WRITE 0x01000
#define CEPH_MDS_OP_FOLLOW_LINK 0x10000
#define CEPH_MDS_OP_INO_PATH 0x100000
enum {
CEPH_MDS_OP_FINDINODE = 0x100100,
CEPH_MDS_OP_LSTAT = 0x00100,
CEPH_MDS_OP_LUTIME = 0x01101,
CEPH_MDS_OP_LCHMOD = 0x01102,
@ -442,7 +483,7 @@ struct ceph_mds_request_head {
__le32 mask;
} __attribute__ ((packed)) fstat;
struct {
ceph_frag_t frag;
__le32 frag;
} __attribute__ ((packed)) readdir;
struct {
struct ceph_timespec mtime;
@ -474,6 +515,10 @@ struct ceph_mds_request_head {
} __attribute__ ((packed)) args;
} __attribute__ ((packed));
/*
 * One element of the inode path carried by a FINDINODE request: an inode
 * number paired with a dentry name hash.  Packed, with little-endian
 * fields.
 */
struct ceph_inopath_item {
__le64 ino; /* inode number */
__le32 dname_hash; /* dentry name hash that accompanies ino */
} __attribute__ ((packed));
/* client reply */
struct ceph_mds_reply_head {

View File

@ -118,6 +118,7 @@ WRITE_RAW_ENCODER(ceph_mds_lease)
WRITE_RAW_ENCODER(ceph_mds_reply_head)
WRITE_RAW_ENCODER(ceph_mds_reply_inode)
WRITE_RAW_ENCODER(ceph_frag_tree_split)
WRITE_RAW_ENCODER(ceph_inopath_item)
WRITE_RAW_ENCODER(ceph_osd_request_head)
WRITE_RAW_ENCODER(ceph_osd_reply_head)

View File

@ -7,105 +7,106 @@ int ceph_debug_export = -1;
#define DOUT_PREFIX "export: "
#include "super.h"
/*
* fh is N tuples of
* <ino, parent's d_name.hash>
*/
/*
 * Encode an NFS file handle for @dentry into @fh.
 *
 * The handle is a sequence of 3-word tuples, each a 64-bit inode number
 * followed by a 32-bit dentry name hash.  The first tuple describes
 * @dentry itself; one more tuple is appended per ancestor, following
 * d_parent, until the root has been added or @fh has no room for another
 * tuple.
 *
 * @max_len: on entry, capacity of @fh in 32-bit words; on successful
 *           return, the number of words written.
 * @connectable: if set, room for at least two tuples is required.
 *
 * Returns 1 if only @dentry was encoded, 2 if at least one ancestor was
 * added, or -ENOSPC if @fh is too small.
 */
int ceph_encode_fh(struct dentry *dentry, __u32 *fh, int *max_len,
		   int connectable)
{
	int type = 1;
	int len;

	dout(10, "encode_fh %p max_len %d%s\n", dentry, *max_len,
	     connectable ? " connectable":"");

	if (*max_len < 3 || (connectable && *max_len < 6))
		return -ENOSPC;

	/*
	 * pretty sure this is racy
	 */
	/* note: caller holds dentry->d_lock */
	*(u64 *)fh = ceph_ino(dentry->d_inode);
	fh[2] = dentry->d_name.hash;
	len = 3;

	/* append <ino, name hash> for each ancestor while a tuple still fits */
	while (len + 3 <= *max_len) {
		dentry = dentry->d_parent;
		if (!dentry)
			break;
		*(u64 *)(fh + len) = ceph_ino(dentry->d_inode);
		fh[len + 2] = dentry->d_name.hash;
		len += 3;
		type = 2;
		if (IS_ROOT(dentry))
			break;
	}

	*max_len = len;
	return type;
}
/*
 * Turn a file handle fragment into a dentry.
 *
 * @fh holds @fh_len 32-bit words, read as 3-word tuples of a 64-bit inode
 * number followed by a 32-bit name hash; the first tuple names the inode
 * to return.  If ceph_find_inode() does not find that inode, the whole
 * fragment (fh_len/3 tuples) is sent to the MDS in a FINDINODE request
 * and the lookup is retried once.
 *
 * Returns a dentry from d_alloc_anon() for the inode, or an ERR_PTR:
 * -ESTALE if the handle is shorter than one tuple or the inode cannot be
 * found, -ENOMEM if no dentry could be allocated, or the error from
 * creating or issuing the request.
 */
struct dentry *__fh_to_dentry(struct super_block *sb, u32 *fh, int fh_len)
{
	struct ceph_mds_client *mdsc = &ceph_client(sb)->mdsc;
	struct inode *inode;
	struct dentry *dentry;
	u64 ino;
	u32 hash;
	int err;

	/* the length is supplied by the caller; need one full tuple to read */
	if (fh_len < 3)
		return ERR_PTR(-ESTALE);
	ino = *(u64 *)fh;
	hash = fh[2];

	inode = ceph_find_inode(sb, ino);
	if (!inode) {
		struct ceph_mds_request *req;

		derr(10, "__fh_to_dentry %llx.%x -- no inode\n", ino, hash);
		/* for FINDINODE, ino1 carries the tuple count and path1 the tuples */
		req = ceph_mdsc_create_request(mdsc,
					       CEPH_MDS_OP_FINDINODE,
					       fh_len/3, (char *)fh, 0, 0);
		if (IS_ERR(req))
			return ERR_PTR(PTR_ERR(req));
		err = ceph_mdsc_do_request(mdsc, req);
		ceph_mdsc_put_request(req);

		/* look again now that the request has completed */
		inode = ceph_find_inode(sb, ino);
		if (!inode)
			return ERR_PTR(err ? err : -ESTALE);
	}

	dentry = d_alloc_anon(inode);
	if (!dentry) {
		derr(10, "__fh_to_dentry %llx.%x -- inode %p but ENOMEM\n",
		     ino, hash, inode);
		iput(inode);
		return ERR_PTR(-ENOMEM);
	}
	dout(10, "__fh_to_dentry %llx.%x -- inode %p dentry %p\n", ino, hash,
	     inode, dentry);
	return dentry;
}
/*
 * export_operations hook: map a file handle to the dentry of the object
 * it names.
 *
 * The handle's third word is a dentry name hash (see ceph_encode_fh), not
 * an inode generation, so no generation comparison is done here; the raw
 * handle is passed to __fh_to_dentry(), which resolves its first tuple.
 * @fh_type is not used.
 */
struct dentry *ceph_fh_to_dentry(struct super_block *sb, struct fid *fid,
				 int fh_len, int fh_type)
{
	u32 *fh = fid->raw;

	return __fh_to_dentry(sb, fh, fh_len);
}
/*
 * export_operations hook: map a file handle to the dentry of the parent
 * of the object it names.
 *
 * The parent is described by the handle's second 3-word tuple, so the
 * handle must hold at least 6 words; shorter handles yield -ESTALE.  The
 * remainder of the handle, starting at that second tuple, is passed to
 * __fh_to_dentry().  @fh_type is not used.
 */
struct dentry *ceph_fh_to_parent(struct super_block *sb, struct fid *fid,
				 int fh_len, int fh_type)
{
	u32 *fh = fid->raw;
	u64 ino;
	u32 hash;

	/* check the length before reading any word of the handle */
	if (fh_len < 6)
		return ERR_PTR(-ESTALE);

	/* logged values are the first tuple, i.e. the child being resolved */
	ino = *(u64 *)fh;
	hash = fh[2];
	derr(10, "fh_to_parent %llx.%x\n", ino, hash);

	return __fh_to_dentry(sb, fh + 3, fh_len - 3);
}
const struct export_operations ceph_export_ops = {

View File

@ -20,6 +20,7 @@ int ceph_debug_mdsc = -1;
const char *ceph_mds_op_name(int op)
{
switch (op) {
case CEPH_MDS_OP_FINDINODE: return "findinode";
case CEPH_MDS_OP_STAT: return "stat";
case CEPH_MDS_OP_LSTAT: return "lstat";
case CEPH_MDS_OP_UTIME: return "utime";
@ -855,6 +856,11 @@ bad:
/* exported functions */
/*
* slight hacky weirdness: if op is a FINDINODE, ino1 is the number of
* ceph_inopath_item entries in path1, and path1 isn't null terminated
* (it's an nfs filehandle fragment). path2 is not used.
*/
struct ceph_mds_request *
ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op,
ceph_ino_t ino1, const char *path1,
@ -866,11 +872,15 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op,
void *p, *end;
int pathlen;
pathlen = 2*(sizeof(ino1) + sizeof(__u32));
if (path1)
pathlen += strlen(path1);
if (path2)
pathlen += strlen(path2);
if (op == CEPH_MDS_OP_FINDINODE)
pathlen = sizeof(u32) + ino1*sizeof(struct ceph_inopath_item);
else {
pathlen = 2*(sizeof(ino1) + sizeof(__u32));
if (path1)
pathlen += strlen(path1);
if (path2)
pathlen += strlen(path2);
}
msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST,
sizeof(struct ceph_mds_request_head) + pathlen,
@ -898,14 +908,24 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op,
head->caller_gid = cpu_to_le32(current->egid);
/* encode paths */
ceph_encode_filepath(&p, end, ino1, path1);
ceph_encode_filepath(&p, end, ino2, path2);
if (op == CEPH_MDS_OP_FINDINODE) {
derr(10,"p %p\n", p);
ceph_encode_32(&p, ino1);
memcpy(p, path1, ino1 * sizeof(struct ceph_inopath_item));
p += ino1 * sizeof(struct ceph_inopath_item);
derr(10, " p %p end %p len %d\n", p, end, (int)ino1);
} else {
ceph_encode_filepath(&p, end, ino1, path1);
ceph_encode_filepath(&p, end, ino2, path2);
if (path1)
dout(10, "create_request path1 %llx/%s\n",
ino1, path1);
if (path2)
dout(10, "create_request path2 %llx/%s\n",
ino2, path2);
}
dout(10, "create_request op %d=%s -> %p\n", op,
ceph_mds_op_name(op), req);
if (path1)
dout(10, "create_request path1 %llx/%s\n", ino1, path1);
if (path2)
dout(10, "create_request path2 %llx/%s\n", ino2, path2);
BUG_ON(p != end);
return req;

View File

@ -139,8 +139,8 @@ frag_t CInode::pick_dirfrag(const string& dn)
if (dirfragtree.empty())
return frag_t(); // avoid the string hash if we can.
static hash<string> H;
return dirfragtree[H(dn)];
__u32 h = ceph_full_name_hash((const unsigned char *)dn.data(), dn.length());
return dirfragtree[h];
}
void CInode::get_dirfrags_under(frag_t fg, list<CDir*>& ls)

View File

@ -4166,6 +4166,78 @@ bool MDCache::path_is_mine(filepath& path)
return cur->is_auth();
}
// Resolve an inode path (as carried by a FINDINODE request) to an inode.
//
// inopath is scanned from index 0 for the first inode present in cache.
//
// Return values, as produced below:
//   0         inopath[0] is in cache; it is pinned and stored in mdr->ref
//   1         a waiter for mdr was queued (unfreeze, single auth, remote
//             dirfrag open, or dirfrag fetch)
//   2         the request was forwarded to the dirfrag's authority
//   -ESTALE   none of the listed inodes is in cache, or the chosen dirfrag
//             is already complete
//   -ENOTDIR  the first cached inode found (not inopath[0]) is not a directory
int MDCache::inopath_traverse(MDRequest *mdr, vector<ceph_inopath_item> &inopath)
{
dout(10) << "inopath_traverse mdr " << *mdr << " inopath " << inopath << dendl;
// find first...
int i;
CInode *cur = 0;
for (i=0; i<(int)inopath.size(); i++) {
cur = get_inode(inodeno_t(inopath[i].ino));
if (cur) break;
dout(10) << " don't have " << inopath[i].ino << dendl;
}
if (!cur)
return -ESTALE;
// i == 0 means the first item itself was found; nothing more to load
if (i == 0) {
dout(10) << " found " << *cur << dendl;
mdr->pin(cur);
mdr->ref = cur;
return 0; // yay
}
dout(10) << " have ancestor " << *cur << dendl;
// load up subdir
if (!cur->is_dir())
return -ENOTDIR;
// NOTE(review): cur is inopath[i] and the inode still missing is
// inopath[i-1]; this picks the frag from inopath[i].dname_hash. Confirm
// whether the hash of the missing child (inopath[i-1]) was intended.
frag_t fg = cur->dirfragtree[frag_t(inopath[i].dname_hash)];
dout(10) << " hash " << inopath[i].dname_hash << " is frag " << fg << dendl;
CDir *curdir = cur->get_dirfrag(fg);
if (!curdir) {
if (cur->is_auth()) {
// parent dir frozen_dir?
// NOTE(review): the log line and waiter below go through cur's parent
// dir/dentry; presumably cur always has one here -- confirm for root.
if (cur->is_frozen_dir()) {
dout(7) << "inopath_traverse: " << *cur->get_parent_dir() << " is frozen_dir, waiting" << dendl;
cur->get_parent_dn()->get_dir()->add_waiter(CDir::WAIT_UNFREEZE, _get_waiter(mdr, 0));
return 1;
}
curdir = cur->get_or_open_dirfrag(this, fg);
} else {
// not auth for cur: open the dirfrag remotely, with a waiter for mdr
open_remote_dirfrag(cur, fg, _get_waiter(mdr, 0));
return 1;
}
}
assert(curdir);
// forward to dir auth?
if (!curdir->is_auth()) {
if (curdir->is_ambiguous_auth()) {
// wait
dout(7) << "traverse: waiting for single auth in " << *curdir << dendl;
curdir->add_waiter(CDir::WAIT_SINGLEAUTH, _get_waiter(mdr, 0));
return 1;
}
request_forward(mdr, curdir->authority().first);
return 2;
}
// a complete dirfrag is not fetched again; the lookup is reported stale
if (curdir->is_complete())
return -ESTALE; // give up? :( we _could_ try other frags...
touch_inode(cur);
// fetch the dirfrag, with a waiter for mdr
curdir->fetch(_get_waiter(mdr, 0));
return 1;
}
/**
* path_traverse_to_dir -- traverse to deepest dir we have
*

View File

@ -613,6 +613,8 @@ public:
return path_is_mine(path);
}
CDir *path_traverse_to_dir(filepath& path);
int inopath_traverse(MDRequest *mdr, vector<ceph_inopath_item>& inopath);
void open_remote_dirfrag(CInode *diri, frag_t fg, Context *fin);
CInode *get_dentry_inode(CDentry *dn, MDRequest *mdr);

View File

@ -724,6 +724,9 @@ void Server::dispatch_client_request(MDRequest *mdr)
assert(mdr->more()->waiting_on_slave.empty());
switch (req->get_op()) {
case CEPH_MDS_OP_FINDINODE:
handle_client_findinode(mdr);
break;
// inodes ops.
case CEPH_MDS_OP_STAT:
@ -1565,6 +1568,18 @@ void Server::handle_client_stat(MDRequest *mdr)
}
/*
 * Handle a FINDINODE request: run the request's inopath through
 * MDCache::inopath_traverse() and reply with its result.
 *
 * A positive result means the request was delayed or forwarded, so no
 * reply is sent now.  Zero and negative results are sent back to the
 * client as the reply's result code.
 */
void Server::handle_client_findinode(MDRequest *mdr)
{
  MClientRequest *req = mdr->client_request;
  int r = mdcache->inopath_traverse(mdr, req->inopath);
  if (r > 0)
    return; // delayed

  // inopath_traverse only sets mdr->ref when it returns 0, so it must not
  // be dereferenced on the error path.
  if (r == 0) {
    dout(10) << "reply to findinode on " << *mdr->ref << dendl;
  } else {
    dout(10) << "reply to findinode, error " << r << dendl;
  }

  MClientReply *reply = new MClientReply(req, r);
  reply_request(mdr, reply);
}
// ===============================================================================

View File

@ -103,6 +103,7 @@ public:
// requests on existing inodes.
void handle_client_stat(MDRequest *mdr);
void handle_client_findinode(MDRequest *mdr);
void handle_client_utime(MDRequest *mdr);
void handle_client_chmod(MDRequest *mdr);
void handle_client_chown(MDRequest *mdr);

View File

@ -46,6 +46,7 @@
static inline const char* ceph_mds_op_name(int op) {
switch (op) {
case CEPH_MDS_OP_FINDINODE: return "findinode";
case CEPH_MDS_OP_STAT: return "stat";
case CEPH_MDS_OP_LSTAT: return "lstat";
case CEPH_MDS_OP_UTIME: return "utime";
@ -72,12 +73,17 @@ static inline const char* ceph_mds_op_name(int op) {
// metadata ops.
// Print an inopath item as "<ino>.<dname_hash>".
static inline ostream& operator<<(ostream &out, const ceph_inopath_item &i) {
  out << i.ino << "." << i.dname_hash;
  return out;
}
class MClientRequest : public Message {
public:
struct ceph_mds_request_head head;
// path arguments
filepath path, path2;
vector<ceph_inopath_item> inopath;
public:
// cons
@ -159,14 +165,22 @@ public:
// Decode the request from payload: the fixed head first, then the
// op-specific arguments.  A FINDINODE request carries an inopath vector;
// every other op carries the two filepaths.  Each argument is decoded
// exactly once so the iterator stays in step with the encoder.
void decode_payload() {
  bufferlist::iterator p = payload.begin();
  ::decode(head, p);
  if (head.op == CEPH_MDS_OP_FINDINODE) {
    ::decode(inopath, p);
  } else {
    ::decode(path, p);
    ::decode(path2, p);
  }
}
// Encode the request into payload: the fixed head first, then the
// op-specific arguments.  This mirrors decode_payload(): a FINDINODE
// request carries the inopath vector, every other op carries the two
// filepaths.
void encode_payload() {
  ::encode(head, payload);
  if (head.op == CEPH_MDS_OP_FINDINODE) {
    ::encode(inopath, payload);
  } else {
    ::encode(path, payload);
    ::encode(path2, payload);
  }
}
const char *get_type_name() { return "creq"; }
@ -175,9 +189,11 @@ public:
<< "." << get_tid()
<< " " << ceph_mds_op_name(get_op());
//if (!get_filepath().empty())
out << " " << get_filepath();
out << " " << get_filepath();
if (!get_filepath2().empty())
out << " " << get_filepath2();
if (!inopath.empty())
out << " " << inopath;
if (head.retry_attempt)
out << " RETRY=" << head.retry_attempt;
out << ")";