kclient: revalidate dentries while constructing paths.

This commit is contained in:
Sage Weil 2008-06-16 21:04:28 -07:00
parent 7e6326227b
commit bb367cbf45
6 changed files with 67 additions and 49 deletions

View File

@ -1,6 +1,5 @@
v0.3
- client rsync bug
- document nested accounting
v0.4
- ENOSPC
@ -8,7 +7,7 @@ v0.4
big items
- ENOSPC
- quotas
- accounting
/ - accounting
- enforcement
- rados cow/snapshot infrastructure
- mds snapshots

View File

@ -10,12 +10,24 @@ const struct inode_operations ceph_dir_iops;
const struct file_operations ceph_dir_fops;
struct dentry_operations ceph_dentry_ops;
static int ceph_dentry_revalidate(struct dentry *dentry, struct nameidata *nd);
/*
* build a dentry's path, relative to sb root. allocate on
* heap; caller must kfree.
* (based on build_path_from_dentry in fs/cifs/dir.c)
* build a dentry's path. allocate on heap; caller must kfree. based
* on build_path_from_dentry in fs/cifs/dir.c.
*
* stop path construction as soon as we hit a dentry we do not have a
* valid lease over. races aside, this ensures we describe the
* operation relative to a base inode that is likely to be cached by
* the MDS, using a relative path that is known to be valid (e.g., not
* munged up by a directory rename on another client).
*
* this is, unfortunately, both racy and inefficient. dentries are
* revalidated during path traversal, and revalidated _again_ when we
* reconstruct the reverse path. lame. unfortunately the VFS doesn't
* tell us the path it traversed, so i'm not sure we can do any better.
*/
char *ceph_build_dentry_path(struct dentry *dentry, int *plen)
char *ceph_build_dentry_path(struct dentry *dentry, int *plen, __u64 *base)
{
struct dentry *temp;
char *path;
@ -27,6 +39,9 @@ char *ceph_build_dentry_path(struct dentry *dentry, int *plen)
retry:
len = 0;
for (temp = dentry; !IS_ROOT(temp);) {
if (temp->d_inode &&
!ceph_dentry_revalidate(temp, 0))
break;
len += 1 + temp->d_name.len;
temp = temp->d_parent;
if (temp == NULL) {
@ -42,7 +57,7 @@ retry:
return ERR_PTR(-ENOMEM);
pos = len;
path[pos] = 0; /* trailing null */
for (temp = dentry; !IS_ROOT(temp);) {
for (temp = dentry; !IS_ROOT(temp) && pos != 0; ) {
pos -= temp->d_name.len;
if (pos < 0) {
break;
@ -63,7 +78,7 @@ retry:
}
if (pos != 0) {
derr(1, "did not end path lookup where expected, "
"namelen is %d\n", len);
"namelen is %d, pos is %d\n", len, pos);
/* presumably this is only possible if racing with a
rename of one of the parent directories (we can not
lock the dentries above us to prevent this, but
@ -72,9 +87,10 @@ retry:
goto retry;
}
dout(10, "build_path_dentry on %p %d build '%.*s'\n",
dentry, atomic_read(&dentry->d_count), len, path);
*base = ceph_ino(temp->d_inode);
*plen = len;
dout(10, "build_path_dentry on %p %d built %llx '%.*s'\n",
dentry, atomic_read(&dentry->d_count), *base, len, path);
return path;
}
@ -257,12 +273,12 @@ struct dentry *ceph_do_lookup(struct super_block *sb, struct dentry *dentry,
dentry, USE_CAP_MDS);
} else {
/* build path */
path = ceph_build_dentry_path(dentry, &pathlen);
u64 pathbase;
path = ceph_build_dentry_path(dentry, &pathlen, &pathbase);
if (IS_ERR(path))
return ERR_PTR(PTR_ERR(path));
req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LSTAT,
ceph_ino(sb->s_root->d_inode),
path, 0, 0,
pathbase, path, 0, 0,
dentry, USE_ANY_MDS);
kfree(path);
}
@ -327,16 +343,16 @@ static int ceph_mknod(struct inode *dir, struct dentry *dentry,
struct ceph_mds_request_head *rhead;
char *path;
int pathlen;
u64 pathbase;
int err;
dout(5, "dir_mknod in dir %p dentry %p mode 0%o rdev %d\n",
dir, dentry, mode, rdev);
path = ceph_build_dentry_path(dentry, &pathlen);
path = ceph_build_dentry_path(dentry, &pathlen, &pathbase);
if (IS_ERR(path))
return PTR_ERR(path);
req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_MKNOD,
ceph_ino(dir->i_sb->s_root->d_inode),
path, 0, 0,
pathbase, path, 0, 0,
dentry, USE_AUTH_MDS);
kfree(path);
if (IS_ERR(req)) {
@ -394,15 +410,15 @@ static int ceph_symlink(struct inode *dir, struct dentry *dentry,
struct ceph_mds_request *req;
char *path;
int pathlen;
u64 pathbase;
int err;
dout(5, "dir_symlink in dir %p dentry %p to '%s'\n", dir, dentry, dest);
path = ceph_build_dentry_path(dentry, &pathlen);
path = ceph_build_dentry_path(dentry, &pathlen, &pathbase);
if (IS_ERR(path))
return PTR_ERR(path);
req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SYMLINK,
ceph_ino(dir->i_sb->s_root->d_inode),
path, 0, dest,
pathbase, path, 0, dest,
dentry, USE_AUTH_MDS);
kfree(path);
if (IS_ERR(req)) {
@ -428,15 +444,15 @@ static int ceph_mkdir(struct inode *dir, struct dentry *dentry, int mode)
struct ceph_mds_request_head *rhead;
char *path;
int pathlen;
u64 pathbase;
int err;
dout(5, "dir_mkdir in dir %p dentry %p mode 0%o\n", dir, dentry, mode);
path = ceph_build_dentry_path(dentry, &pathlen);
path = ceph_build_dentry_path(dentry, &pathlen, &pathbase);
if (IS_ERR(path))
return PTR_ERR(path);
req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_MKDIR,
ceph_ino(dir->i_sb->s_root->d_inode),
path, 0, 0,
pathbase, path, 0, 0,
dentry, USE_AUTH_MDS);
kfree(path);
if (IS_ERR(req)) {
@ -464,23 +480,22 @@ static int ceph_link(struct dentry *old_dentry, struct inode *dir,
struct ceph_mds_request *req;
char *oldpath, *path;
int oldpathlen, pathlen;
u64 oldpathbase, pathbase;
int err;
dout(5, "dir_link in dir %p old_dentry %p dentry %p\n", dir,
old_dentry, dentry);
oldpath = ceph_build_dentry_path(old_dentry, &oldpathlen);
oldpath = ceph_build_dentry_path(old_dentry, &oldpathlen, &oldpathbase);
if (IS_ERR(oldpath))
return PTR_ERR(oldpath);
path = ceph_build_dentry_path(dentry, &pathlen);
path = ceph_build_dentry_path(dentry, &pathlen, &pathbase);
if (IS_ERR(path)) {
kfree(oldpath);
return PTR_ERR(path);
}
req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LINK,
ceph_ino(dir->i_sb->s_root->d_inode),
path,
ceph_ino(dir->i_sb->s_root->d_inode),
oldpath,
pathbase, path,
oldpathbase, oldpath,
dentry, USE_AUTH_MDS);
kfree(oldpath);
kfree(path);
@ -518,18 +533,18 @@ static int ceph_unlink(struct inode *dir, struct dentry *dentry)
struct ceph_mds_request *req;
char *path;
int pathlen;
u64 pathbase;
int err;
int op = ((dentry->d_inode->i_mode & S_IFMT) == S_IFDIR) ?
CEPH_MDS_OP_RMDIR : CEPH_MDS_OP_UNLINK;
dout(5, "dir_unlink/rmdir in dir %p dentry %p inode %p\n",
dir, dentry, inode);
path = ceph_build_dentry_path(dentry, &pathlen);
path = ceph_build_dentry_path(dentry, &pathlen, &pathbase);
if (IS_ERR(path))
return PTR_ERR(path);
req = ceph_mdsc_create_request(mdsc, op,
ceph_ino(dir->i_sb->s_root->d_inode),
path, 0, 0,
pathbase, path, 0, 0,
dentry, USE_AUTH_MDS);
kfree(path);
if (IS_ERR(req))
@ -557,24 +572,24 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry,
struct ceph_client *client = ceph_sb_to_client(old_dir->i_sb);
struct ceph_mds_client *mdsc = &client->mdsc;
struct ceph_mds_request *req;
struct dentry *root = old_dir->i_sb->s_root;
char *oldpath, *newpath;
int oldpathlen, newpathlen;
u64 oldpathbase, newpathbase;
int err;
dout(5, "dir_rename in dir %p dentry %p to dir %p dentry %p\n",
old_dir, old_dentry, new_dir, new_dentry);
oldpath = ceph_build_dentry_path(old_dentry, &oldpathlen);
oldpath = ceph_build_dentry_path(old_dentry, &oldpathlen, &oldpathbase);
if (IS_ERR(oldpath))
return PTR_ERR(oldpath);
newpath = ceph_build_dentry_path(new_dentry, &newpathlen);
newpath = ceph_build_dentry_path(new_dentry, &newpathlen, &newpathbase);
if (IS_ERR(newpath)) {
kfree(oldpath);
return PTR_ERR(newpath);
}
req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_RENAME,
ceph_ino(root->d_inode), oldpath,
ceph_ino(root->d_inode), newpath,
oldpathbase, oldpath,
newpathbase, newpath,
new_dentry, USE_AUTH_MDS);
kfree(oldpath);
kfree(newpath);

View File

@ -33,8 +33,7 @@ prepare_open_request(struct super_block *sb, struct dentry *dentry,
dout(5, "prepare_open_request dentry %p name '%s' flags %d\n", dentry,
dentry->d_name.name, flags);
pathbase = ceph_ino(sb->s_root->d_inode);
path = ceph_build_dentry_path(dentry, &pathlen);
path = ceph_build_dentry_path(dentry, &pathlen, &pathbase);
if (IS_ERR(path))
return ERR_PTR(PTR_ERR(path));
req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_OPEN, pathbase, path,

View File

@ -613,6 +613,9 @@ int ceph_dentry_lease_valid(struct dentry *dentry)
* changes needed to properly reflect the completed operation (e.g.,
* call d_move). make note of the distribution of metadata across the
* mds cluster.
*
* FIXME: we should check inode.version to avoid races between traces
* from multiple MDSs after, say, a ancestor directory is renamed.
*/
int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
struct ceph_mds_session *session)
@ -1696,7 +1699,7 @@ static struct ceph_mds_request *prepare_setattr(struct ceph_mds_client *mdsc,
char *path;
int pathlen;
struct ceph_mds_request *req;
__u64 baseino = ceph_ino(dentry->d_inode->i_sb->s_root->d_inode);
u64 pathbase;
if (ia_valid & ATTR_FILE) {
dout(5, "prepare_setattr dentry %p (inode %llx)\n", dentry,
@ -1707,10 +1710,10 @@ static struct ceph_mds_request *prepare_setattr(struct ceph_mds_client *mdsc,
dentry, USE_CAP_MDS);
} else {
dout(5, "prepare_setattr dentry %p (full path)\n", dentry);
path = ceph_build_dentry_path(dentry, &pathlen);
path = ceph_build_dentry_path(dentry, &pathlen, &pathbase);
if (IS_ERR(path))
return ERR_PTR(PTR_ERR(path));
req = ceph_mdsc_create_request(mdsc, op, baseino, path, 0, 0,
req = ceph_mdsc_create_request(mdsc, op, pathbase, path, 0, 0,
dentry, USE_ANY_MDS);
kfree(path);
}
@ -2135,6 +2138,7 @@ int ceph_setxattr(struct dentry *dentry, const char *name,
struct ceph_mds_request_head *rhead;
char *path;
int pathlen;
u64 pathbase;
int err;
int i, nr_pages;
struct page **pages = 0;
@ -2164,12 +2168,11 @@ int ceph_setxattr(struct dentry *dentry, const char *name,
}
/* do request */
path = ceph_build_dentry_path(dentry, &pathlen);
path = ceph_build_dentry_path(dentry, &pathlen, &pathbase);
if (IS_ERR(path))
return PTR_ERR(path);
req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LSETXATTR,
ceph_ino(dentry->d_sb->s_root->d_inode),
path, 0, name,
pathbase, path, 0, name,
dentry, USE_AUTH_MDS);
kfree(path);
if (IS_ERR(req))
@ -2203,18 +2206,18 @@ int ceph_removexattr(struct dentry *dentry, const char *name)
struct ceph_mds_request *req;
char *path;
int pathlen;
u64 pathbase;
int err;
/* only support user.* xattrs, for now */
if (strncmp(name, "user.", 5) != 0)
return -EOPNOTSUPP;
path = ceph_build_dentry_path(dentry, &pathlen);
path = ceph_build_dentry_path(dentry, &pathlen, &pathbase);
if (IS_ERR(path))
return PTR_ERR(path);
req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LRMXATTR,
ceph_ino(dentry->d_sb->s_root->d_inode),
path, 0, name,
pathbase, path, 0, name,
dentry, USE_AUTH_MDS);
kfree(path);
if (IS_ERR(req))

View File

@ -1274,6 +1274,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds)
struct ceph_inode_cap *cap;
char *path;
int pathlen, err;
u64 pathbase;
struct dentry *dentry;
struct ceph_inode_info *ci;
struct ceph_mds_cap_reconnect *rec;
@ -1347,7 +1348,8 @@ retry:
dentry = d_find_alias(&ci->vfs_inode);
if (dentry) {
path = ceph_build_dentry_path(dentry, &pathlen);
path = ceph_build_dentry_path(dentry, &pathlen,
&pathbase);
if (IS_ERR(path)) {
err = PTR_ERR(path);
BUG_ON(err);

View File

@ -499,7 +499,7 @@ extern const struct inode_operations ceph_dir_iops;
extern const struct file_operations ceph_dir_fops;
extern struct dentry_operations ceph_dentry_ops;
extern char *ceph_build_dentry_path(struct dentry *dentry, int *len);
extern char *ceph_build_dentry_path(struct dentry *dn, int *len, __u64 *base);
extern struct dentry *ceph_do_lookup(struct super_block *sb,
struct dentry *dentry,
int mask, int on_inode);