kclient: use reference counted ceph_buffer for xattr blob, msg middle

This commit is contained in:
Sage Weil 2009-08-18 14:59:06 -07:00
parent b2bf914cc5
commit 81a8fe8316
10 changed files with 230 additions and 156 deletions

View File

@ -450,6 +450,7 @@ noinst_HEADERS = \
kernel/Kconfig\
kernel/Makefile\
kernel/addr.c\
kernel/buffer.h\
kernel/caps.c\
kernel/ceph_debug.h\
kernel/ceph_fs.h\

93
src/kernel/buffer.h Normal file
View File

@ -0,0 +1,93 @@
#ifndef __FS_CEPH_BUFFER_H
#define __FS_CEPH_BUFFER_H
#include <linux/mm.h>
#include <linux/types.h>
#include <linux/vmalloc.h>
#include "ceph_debug.h"
/*
 * A simple reference counted buffer.
 *
 * The payload is obtained with kmalloc when it fits in one page
 * (<= PAGE_SIZE) and with vmalloc when it is larger; is_vmalloc
 * records which, so the matching free routine can be used later.
 */
struct ceph_buffer {
	atomic_t nref;		/* reference count; see ceph_buffer_get/put */
	struct kvec vec;	/* payload pointer (iov_base) and length (iov_len) */
	size_t alloc_len;	/* length requested when the payload was allocated */
	bool is_static;		/* set by ceph_buffer_init_static, cleared by ceph_buffer_new */
	bool is_vmalloc;	/* payload came from __vmalloc rather than kmalloc */
};
static inline void ceph_buffer_init_static(struct ceph_buffer *b)
{
atomic_set(&b->nref, 1);
b->vec.iov_base = NULL;
b->vec.iov_len = 0;
b->alloc_len = 0;
b->is_static = true;
}
/*
 * Allocate an empty, non-static ceph_buffer header.
 *
 * @gfp: allocation flags passed to kmalloc for the header.
 *
 * Returns the new header holding one reference and no payload, or
 * NULL if the header could not be allocated.  kmalloc does not zero
 * memory, so every field is set explicitly here, including
 * is_vmalloc.
 */
static inline struct ceph_buffer *ceph_buffer_new(gfp_t gfp)
{
	struct ceph_buffer *b;

	b = kmalloc(sizeof(*b), gfp);
	if (!b)
		return NULL;

	atomic_set(&b->nref, 1);
	b->vec.iov_base = NULL;
	b->vec.iov_len = 0;
	b->alloc_len = 0;
	b->is_static = false;
	b->is_vmalloc = false;	/* no payload yet */
	return b;
}
/*
 * Allocate a payload of @len bytes for buffer @b.
 *
 * Payloads larger than PAGE_SIZE come from __vmalloc; anything else
 * comes from kmalloc.  is_vmalloc records the choice.
 *
 * Returns 0 on success, with alloc_len and vec.iov_len both set to
 * @len, or -ENOMEM if the allocation failed (the lengths are left
 * untouched in that case).
 */
static inline int ceph_buffer_alloc(struct ceph_buffer *b, int len, gfp_t gfp)
{
	bool too_big_for_kmalloc = len > PAGE_SIZE;
	void *payload;

	if (too_big_for_kmalloc)
		payload = __vmalloc(len, gfp, PAGE_KERNEL);
	else
		payload = kmalloc(len, gfp);

	b->vec.iov_base = payload;
	b->is_vmalloc = too_big_for_kmalloc;
	if (payload == NULL)
		return -ENOMEM;

	b->vec.iov_len = len;
	b->alloc_len = len;
	return 0;
}
static inline struct ceph_buffer *ceph_buffer_get(struct ceph_buffer *b)
{
atomic_inc(&b->nref);
return b;
}
static inline void ceph_buffer_put(struct ceph_buffer *b)
{
if (b && atomic_dec_and_test(&b->nref)) {
if (b->vec.iov_base) {
if (b->is_vmalloc)
vfree(b->vec.iov_base);
else
kfree(b->vec.iov_base);
}
kfree(b);
}
}
/*
 * Allocate a buffer header and a @len byte payload in one step.
 *
 * Returns the new buffer holding one reference, or NULL if either
 * the header or the payload allocation failed.  On payload failure
 * the freshly created header is released again via ceph_buffer_put().
 */
static inline struct ceph_buffer *ceph_buffer_new_alloc(int len, gfp_t gfp)
{
	struct ceph_buffer *buf;

	buf = ceph_buffer_new(gfp);
	if (!buf)
		return NULL;

	if (ceph_buffer_alloc(buf, len, gfp) < 0) {
		ceph_buffer_put(buf);
		return NULL;
	}

	return buf;
}
#endif

View File

@ -1,6 +1,7 @@
#include <linux/fs.h>
#include <linux/kernel.h>
#include <linux/sched.h>
#include <linux/vmalloc.h>
#include <linux/wait.h>
#include "ceph_debug.h"
@ -846,7 +847,7 @@ static void send_cap_msg(struct ceph_mds_client *mdsc, u64 ino, u64 cid, int op,
u64 time_warp_seq,
uid_t uid, gid_t gid, mode_t mode,
u64 xattr_version,
void *xattrs_blob, int xattrs_blob_size,
struct ceph_buffer *xattrs_buf,
u64 follows, int mds)
{
struct ceph_mds_caps *fc;
@ -858,10 +859,9 @@ static void send_cap_msg(struct ceph_mds_client *mdsc, u64 ino, u64 cid, int op,
cid, ino, ceph_cap_string(caps), ceph_cap_string(wanted),
ceph_cap_string(dirty),
seq, issue_seq, mseq, follows, size, max_size,
xattr_version, xattrs_blob_size);
xattr_version, xattrs_buf ? (int)xattrs_buf->vec.iov_len : 0);
msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc) + xattrs_blob_size,
0, 0, NULL);
msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc), 0, 0, NULL);
if (IS_ERR(msg))
return;
@ -894,12 +894,10 @@ static void send_cap_msg(struct ceph_mds_client *mdsc, u64 ino, u64 cid, int op,
fc->mode = cpu_to_le32(mode);
fc->xattr_version = cpu_to_le64(xattr_version);
if (xattrs_blob) {
char *dst = (char *)fc;
dst += sizeof(*fc);
fc->xattr_len = cpu_to_le32(xattrs_blob_size);
memcpy(dst, xattrs_blob, xattrs_blob_size);
if (xattrs_buf) {
msg->middle = ceph_buffer_get(xattrs_buf);
fc->xattr_len = cpu_to_le32(xattrs_buf->vec.iov_len);
msg->hdr.middle_len = cpu_to_le32(xattrs_buf->vec.iov_len);
}
ceph_send_msg_mds(mdsc, msg, mds);
@ -997,8 +995,6 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
uid_t uid;
gid_t gid;
int mds = cap->session->s_mds;
void *xattrs_blob = NULL;
int xattrs_blob_size = 0;
u64 xattr_version = 0;
int delayed = 0;
u64 flush_tid = 0;
@ -1071,9 +1067,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
mode = inode->i_mode;
if (dropping & CEPH_CAP_XATTR_EXCL) {
__ceph_build_xattrs_blob(ci, &xattrs_blob, &xattrs_blob_size);
ci->i_xattrs.prealloc_blob = NULL;
ci->i_xattrs.prealloc_size = 0;
__ceph_build_xattrs_blob(ci);
xattr_version = ci->i_xattrs.version + 1;
}
@ -1090,11 +1084,9 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
size, max_size, &mtime, &atime, time_warp_seq,
uid, gid, mode,
xattr_version,
xattrs_blob, xattrs_blob_size,
(flushing & CEPH_CAP_XATTR_EXCL) ? ci->i_xattrs.blob : NULL,
follows, mds);
kfree(xattrs_blob);
if (wake)
wake_up(&ci->i_cap_wq);
@ -1184,7 +1176,7 @@ retry:
&capsnap->mtime, &capsnap->atime,
capsnap->time_warp_seq,
capsnap->uid, capsnap->gid, capsnap->mode,
0, NULL, 0,
0, NULL,
capsnap->follows, mds);
next_follows = capsnap->follows + 1;
@ -2058,7 +2050,7 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
static int handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
struct ceph_mds_session *session,
struct ceph_cap *cap,
void **xattr_data)
struct ceph_buffer *xattr_buf)
__releases(inode->i_lock)
{
@ -2148,15 +2140,13 @@ start:
int len = le32_to_cpu(grant->xattr_len);
u64 version = le64_to_cpu(grant->xattr_version);
if (!(len > 4 && *xattr_data == NULL) && /* ENOMEM in caller */
version > ci->i_xattrs.version) {
if (version > ci->i_xattrs.version) {
dout(" got new xattrs v%llu on %p len %d\n",
version, inode, len);
kfree(ci->i_xattrs.data);
ci->i_xattrs.len = len;
if (ci->i_xattrs.blob)
ceph_buffer_put(ci->i_xattrs.blob);
ci->i_xattrs.blob = ceph_buffer_get(xattr_buf);
ci->i_xattrs.version = version;
ci->i_xattrs.data = *xattr_data;
*xattr_data = NULL;
}
}
@ -2528,7 +2518,6 @@ void ceph_handle_caps(struct ceph_mds_client *mdsc,
u64 cap_id;
u64 size, max_size;
int check_caps = 0;
void *xattr_data = NULL;
int r;
dout("handle_caps from mds%d\n", mds);
@ -2579,16 +2568,12 @@ void ceph_handle_caps(struct ceph_mds_client *mdsc,
case CEPH_CAP_OP_IMPORT:
handle_cap_import(mdsc, inode, h, session,
msg->front.iov_base + sizeof(*h),
msg->middle,
le32_to_cpu(h->snap_trace_len));
check_caps = 1; /* we may have sent a RELEASE to the old auth */
goto done;
}
/* preallocate space for xattrs? */
if (le32_to_cpu(h->xattr_len) > 4)
xattr_data = kmalloc(le32_to_cpu(h->xattr_len), GFP_NOFS);
/* the rest require a cap */
spin_lock(&inode->i_lock);
cap = __get_cap_for_mds(ceph_inode(inode), mds);
@ -2603,7 +2588,7 @@ void ceph_handle_caps(struct ceph_mds_client *mdsc,
switch (op) {
case CEPH_CAP_OP_REVOKE:
case CEPH_CAP_OP_GRANT:
r = handle_cap_grant(inode, h, session, cap, &xattr_data);
r = handle_cap_grant(inode, h, session, cap, msg->middle);
if (r == 1) {
dout(" sending reply back to mds%d\n", mds);
ceph_msg_get(msg);
@ -2633,7 +2618,6 @@ done:
mutex_unlock(&session->s_mutex);
ceph_put_mds_session(session);
kfree(xattr_data);
if (check_caps)
ceph_check_caps(ceph_inode(inode), CHECK_CAPS_NODELAY, NULL);
if (inode)

View File

@ -64,6 +64,17 @@ monitor clients, and the messaging layer.
EOF
git add $target/ceph/buffer.h
git commit -s -F - <<EOF
ceph: ref counted buffer
struct ceph_buffer is a simple ref-counted buffer. We transparently
choose between kmalloc for small buffers and vmalloc for large ones.
This is used for allocating memory for xattr data, among other things.
EOF
git add $target/ceph/super.c
git commit -s -F - <<EOF
ceph: super.c

View File

@ -7,6 +7,7 @@
#include <linux/kernel.h>
#include <linux/namei.h>
#include <linux/writeback.h>
#include <linux/vmalloc.h>
#include "ceph_debug.h"
#include "super.h"
@ -273,17 +274,15 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
ci->i_fragtree = RB_ROOT;
mutex_init(&ci->i_fragtree_mutex);
ci->i_xattrs.xattrs = RB_ROOT;
ci->i_xattrs.len = 0;
ci->i_xattrs.version = 0;
ci->i_xattrs.index_version = 0;
ci->i_xattrs.data = NULL;
ci->i_xattrs.blob = NULL;
ci->i_xattrs.prealloc_blob = NULL;
ci->i_xattrs.dirty = false;
ci->i_xattrs.index = RB_ROOT;
ci->i_xattrs.count = 0;
ci->i_xattrs.names_size = 0;
ci->i_xattrs.vals_size = 0;
ci->i_xattrs.prealloc_blob = NULL;
ci->i_xattrs.prealloc_size = 0;
ci->i_xattrs.dirty = 0;
ci->i_xattrs.version = 0;
ci->i_xattrs.index_version = 0;
ci->i_caps = RB_ROOT;
ci->i_auth_cap = NULL;
@ -358,8 +357,11 @@ void ceph_destroy_inode(struct inode *inode)
rb_erase(n, &ci->i_fragtree);
kfree(frag);
}
kfree(ci->i_xattrs.data);
__destroy_xattrs(ci);
ceph_buffer_put(ci->i_xattrs.blob);
ceph_buffer_put(ci->i_xattrs.prealloc_blob);
kmem_cache_free(ceph_inode_cachep, ci);
}
@ -486,7 +488,7 @@ static int fill_inode(struct inode *inode,
int issued, implemented;
struct timespec mtime, atime, ctime;
u32 nsplits;
void *xattr_data = NULL;
struct ceph_buffer *xattr_blob = NULL;
int err = 0;
int queue_trunc = 0;
@ -499,11 +501,11 @@ static int fill_inode(struct inode *inode,
* if len > 4 (meaning there are actually xattrs; the first 4
* bytes are the xattr count).
*/
if (iinfo->xattr_len > 4 && iinfo->xattr_len != ci->i_xattrs.len) {
xattr_data = kmalloc(iinfo->xattr_len, GFP_NOFS);
if (!xattr_data)
if (iinfo->xattr_len > 4) {
xattr_blob = ceph_buffer_new_alloc(iinfo->xattr_len, GFP_NOFS);
if (!xattr_blob)
pr_err("ceph fill_inode ENOMEM xattr blob %d bytes\n",
ci->i_xattrs.len);
iinfo->xattr_len);
}
spin_lock(&inode->i_lock);
@ -555,18 +557,15 @@ static int fill_inode(struct inode *inode,
/* xattrs */
/* note that if i_xattrs.len <= 4, i_xattrs.data will still be NULL. */
if (iinfo->xattr_len && (issued & CEPH_CAP_XATTR_EXCL) == 0 &&
if ((issued & CEPH_CAP_XATTR_EXCL) == 0 &&
le64_to_cpu(info->xattr_version) > ci->i_xattrs.version) {
if (ci->i_xattrs.len != iinfo->xattr_len) {
kfree(ci->i_xattrs.data);
ci->i_xattrs.len = iinfo->xattr_len;
ci->i_xattrs.version = le64_to_cpu(info->xattr_version);
ci->i_xattrs.data = xattr_data;
xattr_data = NULL;
}
if (ci->i_xattrs.len > 4)
memcpy(ci->i_xattrs.data, iinfo->xattr_data,
ci->i_xattrs.len);
if (ci->i_xattrs.blob)
ceph_buffer_put(ci->i_xattrs.blob);
ci->i_xattrs.blob = xattr_blob;
if (xattr_blob)
memcpy(ci->i_xattrs.blob->vec.iov_base,
iinfo->xattr_data, iinfo->xattr_len);
ci->i_xattrs.version = le64_to_cpu(info->xattr_version);
}
inode->i_mapping->a_ops = &ceph_aops;
@ -686,7 +685,7 @@ no_change:
err = 0;
out:
kfree(xattr_data);
ceph_buffer_put(xattr_blob);
return err;
}
@ -1699,7 +1698,7 @@ static int __set_xattr(struct ceph_inode_info *ci,
int c;
int new = 0;
p = &ci->i_xattrs.xattrs.rb_node;
p = &ci->i_xattrs.index.rb_node;
while (*p) {
parent = *p;
xattr = rb_entry(parent, struct ceph_inode_xattr, node);
@ -1760,7 +1759,7 @@ static int __set_xattr(struct ceph_inode_info *ci,
if (new) {
rb_link_node(&xattr->node, parent, p);
rb_insert_color(&xattr->node, &ci->i_xattrs.xattrs);
rb_insert_color(&xattr->node, &ci->i_xattrs.index);
dout("__set_xattr_val p=%p\n", p);
}
@ -1778,7 +1777,7 @@ static struct ceph_inode_xattr *__get_xattr(struct ceph_inode_info *ci,
struct ceph_inode_xattr *xattr = NULL;
int c;
p = &ci->i_xattrs.xattrs.rb_node;
p = &ci->i_xattrs.index.rb_node;
while (*p) {
parent = *p;
xattr = rb_entry(parent, struct ceph_inode_xattr, node);
@ -1817,7 +1816,7 @@ static int __remove_xattr(struct ceph_inode_info *ci,
if (!xattr)
return -EOPNOTSUPP;
rb_erase(&xattr->node, &ci->i_xattrs.xattrs);
rb_erase(&xattr->node, &ci->i_xattrs.index);
if (xattr->should_free_name)
kfree((void *)xattr->name);
@ -1839,7 +1838,7 @@ static int __remove_xattr_by_name(struct ceph_inode_info *ci,
struct ceph_inode_xattr *xattr;
int err;
p = &ci->i_xattrs.xattrs.rb_node;
p = &ci->i_xattrs.index.rb_node;
xattr = __get_xattr(ci, name);
err = __remove_xattr(ci, xattr);
return err;
@ -1851,7 +1850,7 @@ static char *__copy_xattr_names(struct ceph_inode_info *ci,
struct rb_node *p;
struct ceph_inode_xattr *xattr = NULL;
p = rb_first(&ci->i_xattrs.xattrs);
p = rb_first(&ci->i_xattrs.index);
dout("__copy_xattr_names count=%d\n", ci->i_xattrs.count);
while (p) {
@ -1874,7 +1873,7 @@ static void __destroy_xattrs(struct ceph_inode_info *ci)
struct rb_node *p, *tmp;
struct ceph_inode_xattr *xattr = NULL;
p = rb_first(&ci->i_xattrs.xattrs);
p = rb_first(&ci->i_xattrs.index);
dout("__destroy_xattrs p=%p\n", p);
@ -1884,7 +1883,7 @@ static void __destroy_xattrs(struct ceph_inode_info *ci)
p = rb_next(tmp);
dout("__destroy_xattrs next p=%p (%.*s)\n", p,
xattr->name_len, xattr->name);
rb_erase(tmp, &ci->i_xattrs.xattrs);
rb_erase(tmp, &ci->i_xattrs.index);
__free_xattr(xattr);
}
@ -1893,7 +1892,7 @@ static void __destroy_xattrs(struct ceph_inode_info *ci)
ci->i_xattrs.vals_size = 0;
ci->i_xattrs.index_version = 0;
ci->i_xattrs.count = 0;
ci->i_xattrs.xattrs = RB_ROOT;
ci->i_xattrs.index = RB_ROOT;
}
static int __build_xattrs(struct inode *inode)
@ -1909,7 +1908,8 @@ static int __build_xattrs(struct inode *inode)
int err;
int i;
dout("__build_xattrs(): ci->i_xattrs.len=%d\n", ci->i_xattrs.len);
dout("__build_xattrs() len=%d\n",
ci->i_xattrs.blob ? (int)ci->i_xattrs.blob->vec.iov_len : 0);
if (ci->i_xattrs.index_version >= ci->i_xattrs.version)
return 0; /* already built */
@ -1918,9 +1918,9 @@ static int __build_xattrs(struct inode *inode)
start:
/* updated internal xattr rb tree */
if (ci->i_xattrs.len > 4) {
p = ci->i_xattrs.data;
end = p + ci->i_xattrs.len;
if (ci->i_xattrs.blob && ci->i_xattrs.blob->vec.iov_len > 4) {
p = ci->i_xattrs.blob->vec.iov_base;
end = p + ci->i_xattrs.blob->vec.iov_len;
ceph_decode_32_safe(&p, end, numattr, bad);
xattr_version = ci->i_xattrs.version;
spin_unlock(&inode->i_lock);
@ -1965,7 +1965,7 @@ start:
kfree(xattrs);
}
ci->i_xattrs.index_version = ci->i_xattrs.version;
ci->i_xattrs.dirty = 0;
ci->i_xattrs.dirty = false;
return err;
bad_lock:
@ -2000,24 +2000,26 @@ static int __get_required_blob_size(struct ceph_inode_info *ci, int name_size,
return size;
}
void __ceph_build_xattrs_blob(struct ceph_inode_info *ci,
void **xattrs_blob,
int *blob_size)
/*
* If there are dirty xattrs, reencode xattrs into the prealloc_blob
* and swap into place.
*/
void __ceph_build_xattrs_blob(struct ceph_inode_info *ci)
{
struct rb_node *p;
struct ceph_inode_xattr *xattr = NULL;
void *dest;
dout("__build_xattrs_blob %p\n", &ci->vfs_inode);
if (ci->i_xattrs.dirty) {
int required_blob_size = __get_required_blob_size(ci, 0, 0);
int need = __get_required_blob_size(ci, 0, 0);
BUG_ON(required_blob_size > ci->i_xattrs.prealloc_size);
BUG_ON(need > ci->i_xattrs.prealloc_blob->alloc_len);
p = rb_first(&ci->i_xattrs.xattrs);
p = rb_first(&ci->i_xattrs.index);
dest = ci->i_xattrs.prealloc_blob->vec.iov_base;
dest = ci->i_xattrs.prealloc_blob;
ceph_encode_32(&dest, ci->i_xattrs.count);
while (p) {
xattr = rb_entry(p, struct ceph_inode_xattr, node);
@ -2031,13 +2033,14 @@ void __ceph_build_xattrs_blob(struct ceph_inode_info *ci,
p = rb_next(p);
}
*xattrs_blob = ci->i_xattrs.prealloc_blob;
*blob_size = ci->i_xattrs.prealloc_size;
} else {
/* actually, we're using the same data that we got from the
mds, don't build anything */
*xattrs_blob = NULL;
*blob_size = 0;
/* adjust buffer len; it may be larger than we need */
ci->i_xattrs.prealloc_blob->vec.iov_len =
dest - ci->i_xattrs.prealloc_blob->vec.iov_base;
ceph_buffer_put(ci->i_xattrs.blob);
ci->i_xattrs.blob = ci->i_xattrs.prealloc_blob;
ci->i_xattrs.prealloc_blob = NULL;
ci->i_xattrs.dirty = false;
}
}
@ -2237,7 +2240,6 @@ int ceph_setxattr(struct dentry *dentry, const char *name,
struct ceph_inode_xattr *xattr = NULL;
int issued;
int required_blob_size;
void *prealloc_blob = NULL;
if (ceph_snap(inode) != CEPH_NOSNAP)
return -EROFS;
@ -2249,6 +2251,7 @@ int ceph_setxattr(struct dentry *dentry, const char *name,
if (_ceph_match_vir_xattr(name) != NULL)
return -EOPNOTSUPP;
/* preallocate memory for xattr name, value, index node */
err = -ENOMEM;
newname = kmalloc(name_len + 1, GFP_NOFS);
if (!newname)
@ -2269,33 +2272,25 @@ int ceph_setxattr(struct dentry *dentry, const char *name,
spin_lock(&inode->i_lock);
retry:
__build_xattrs(inode);
issued = __ceph_caps_issued(ci, NULL);
if (!(issued & CEPH_CAP_XATTR_EXCL))
goto do_sync;
__build_xattrs(inode);
required_blob_size = __get_required_blob_size(ci, name_len, val_len);
if (required_blob_size > ci->i_xattrs.prealloc_size) {
int prealloc_len = required_blob_size;
if (!ci->i_xattrs.prealloc_blob ||
required_blob_size > ci->i_xattrs.prealloc_blob->alloc_len) {
struct ceph_buffer *blob = NULL;
spin_unlock(&inode->i_lock);
dout(" required_blob_size=%d\n", required_blob_size);
prealloc_blob = kmalloc(prealloc_len, GFP_NOFS);
if (!prealloc_blob)
dout(" preaallocating new blob size=%d\n", required_blob_size);
blob = ceph_buffer_new_alloc(required_blob_size, GFP_NOFS);
if (!blob)
goto out;
spin_lock(&inode->i_lock);
required_blob_size = __get_required_blob_size(ci, name_len,
val_len);
if (prealloc_len < required_blob_size) {
/* lost a race and preallocated buffer is too small */
kfree(prealloc_blob);
} else {
kfree(ci->i_xattrs.prealloc_blob);
ci->i_xattrs.prealloc_blob = prealloc_blob;
ci->i_xattrs.prealloc_size = prealloc_len;
}
ceph_buffer_put(ci->i_xattrs.prealloc_blob);
ci->i_xattrs.prealloc_blob = blob;
goto retry;
}
@ -2303,7 +2298,7 @@ retry:
err = __set_xattr(ci, newname, name_len, newval,
val_len, 1, 1, 1, &xattr);
__ceph_mark_dirty_caps(ci, CEPH_CAP_XATTR_EXCL);
ci->i_xattrs.dirty = 1;
ci->i_xattrs.dirty = true;
inode->i_ctime = CURRENT_TIME;
spin_unlock(&inode->i_lock);
@ -2369,7 +2364,7 @@ int ceph_removexattr(struct dentry *dentry, const char *name)
err = __remove_xattr_by_name(ceph_inode(inode), name);
__ceph_mark_dirty_caps(ci, CEPH_CAP_XATTR_EXCL);
ci->i_xattrs.dirty = 1;
ci->i_xattrs.dirty = true;
inode->i_ctime = CURRENT_TIME;
spin_unlock(&inode->i_lock);

View File

@ -658,11 +658,11 @@ static void prepare_write_message(struct ceph_connection *con)
con->out_kvec[v].iov_base = &m->hdr;
con->out_kvec[v++].iov_len = sizeof(m->hdr);
con->out_kvec[v++] = m->front;
if (m->middle.iov_len)
con->out_kvec[v++] = m->middle;
if (m->middle)
con->out_kvec[v++] = m->middle->vec;
con->out_kvec_left = v;
con->out_kvec_bytes += 1 + sizeof(m->hdr) + m->front.iov_len +
m->middle.iov_len;
(m->middle ? m->middle->vec.iov_len : 0);
con->out_kvec_cur = con->out_kvec;
/* fill in crc (except data pages), footer */
@ -672,10 +672,10 @@ static void prepare_write_message(struct ceph_connection *con)
con->out_msg->footer.flags = 0;
con->out_msg->footer.front_crc =
cpu_to_le32(crc32c(0, m->front.iov_base, m->front.iov_len));
if (m->middle.iov_base)
if (m->middle)
con->out_msg->footer.middle_crc =
cpu_to_le32(crc32c(0, m->middle.iov_base,
m->middle.iov_len));
cpu_to_le32(crc32c(0, m->middle->vec.iov_base,
m->middle->vec.iov_len));
else
con->out_msg->footer.middle_crc = 0;
con->out_msg->footer.data_crc = 0;
@ -1461,7 +1461,6 @@ static int read_partial_message(struct ceph_connection *con)
}
m = con->in_msg;
m->front.iov_len = 0; /* haven't read it yet */
m->middle.iov_len = 0; /* haven't read it yet */
memcpy(&m->hdr, &con->in_hdr, sizeof(con->in_hdr));
}
@ -1480,8 +1479,9 @@ static int read_partial_message(struct ceph_connection *con)
}
/* middle */
while (m->middle.iov_len < middle_len) {
if (m->middle.iov_base == NULL) {
while (middle_len > 0 && (!m->middle ||
m->middle->vec.iov_len < middle_len)) {
if (m->middle == NULL) {
BUG_ON(!con->msgr->alloc_middle);
ret = con->msgr->alloc_middle(con->msgr->parent, m);
if (ret < 0) {
@ -1493,16 +1493,18 @@ static int read_partial_message(struct ceph_connection *con)
con->in_tag = CEPH_MSGR_TAG_READY;
return 0;
}
m->middle->vec.iov_len = 0;
}
left = middle_len - m->middle.iov_len;
ret = ceph_tcp_recvmsg(con->sock, (char *)m->middle.iov_base +
m->middle.iov_len, left);
left = middle_len - m->middle->vec.iov_len;
ret = ceph_tcp_recvmsg(con->sock,
(char *)m->middle->vec.iov_base +
m->middle->vec.iov_len, left);
if (ret <= 0)
return ret;
m->middle.iov_len += ret;
if (m->middle.iov_len == middle_len)
con->in_middle_crc = crc32c(0, m->middle.iov_base,
m->middle.iov_len);
m->middle->vec.iov_len += ret;
if (m->middle->vec.iov_len == middle_len)
con->in_middle_crc = crc32c(0, m->middle->vec.iov_base,
m->middle->vec.iov_len);
}
/* (page) data */
@ -2360,8 +2362,7 @@ struct ceph_msg *ceph_msg_new(int type, int front_len,
m->front.iov_len = front_len;
/* middle */
m->middle.iov_base = NULL;
m->middle.iov_len = 0;
m->middle = NULL;
/* data */
m->nr_pages = calc_pages_for(page_off, page_len);
@ -2388,11 +2389,7 @@ void ceph_msg_kfree(struct ceph_msg *m)
vfree(m->front.iov_base);
else
kfree(m->front.iov_base);
if (m->middle.iov_base) {
dout("vfree %p\n", m->middle.iov_base);
vfree(m->middle.iov_base);
dout("vfree done\n");
}
ceph_buffer_put(m->middle);
kfree(m);
dout("msg_kfree %p done\n", m);
}

View File

@ -9,6 +9,7 @@
#include <linux/workqueue.h>
#include "types.h"
#include "buffer.h"
struct ceph_msg;
@ -97,7 +98,8 @@ struct ceph_messenger {
struct ceph_msg {
struct ceph_msg_header hdr; /* header */
struct ceph_msg_footer footer; /* footer */
struct kvec front, middle; /* unaligned blobs of message */
struct kvec front; /* unaligned blobs of message */
struct ceph_buffer *middle;
struct mutex page_mutex;
struct page **pages; /* data payload. NOT OWNER. */
unsigned nr_pages; /* size of page array */

View File

@ -133,10 +133,9 @@ void ceph_msgpool_put(struct ceph_msg_pool *pool, struct ceph_msg *msg)
spin_lock(&pool->lock);
if (pool->num < pool->min) {
/* drop middle, if any */
if (msg->middle.iov_base) {
vfree(msg->middle.iov_base);
msg->middle.iov_base = NULL;
msg->middle.iov_len = 0;
if (msg->middle) {
ceph_buffer_put(msg->middle);
msg->middle = NULL;
}
ceph_msg_get(msg); /* retake a single ref */
list_add(&msg->list_head, &pool->msgs);

View File

@ -1048,10 +1048,10 @@ static int ceph_alloc_middle(void *p, struct ceph_msg *msg)
dout("alloc_middle %p type %d %s middle_len %d\n", msg, type,
ceph_msg_type_name(type), middle_len);
BUG_ON(!middle_len);
BUG_ON(msg->middle.iov_base);
BUG_ON(msg->middle);
msg->middle.iov_base = __vmalloc(middle_len, GFP_NOFS, PAGE_KERNEL);
if (!msg->middle.iov_base)
msg->middle = ceph_buffer_new_alloc(middle_len, GFP_NOFS);
if (!msg->middle)
return -ENOMEM;
return 0;
}

View File

@ -269,27 +269,21 @@ struct ceph_inode_xattr {
};
struct ceph_inode_xattrs_info {
struct rb_root xattrs;
/*
* (still encoded) xattr blob. we avoid the overhead of parsing
* this until someone actually calls getxattr, etc.
*
* if i_xattrs.len == 0 or 4, i_xattrs.data == NULL.
* i_xattrs.len == 4 implies there are no xattrs; 0 means we
* don't know.
* blob->vec.iov_len == 4 implies there are no xattrs; blob ==
* NULL means we don't know.
*/
int len;
char *data;
struct ceph_buffer *blob, *prealloc_blob;
struct rb_root index;
bool dirty;
int count;
int names_size;
int vals_size;
u64 version;
u64 index_version;
int dirty;
void *prealloc_blob;
int prealloc_size;
u64 version, index_version;
};
/*
@ -851,9 +845,7 @@ extern int ceph_setxattr(struct dentry *, const char *, const void *,
extern ssize_t ceph_getxattr(struct dentry *, const char *, void *, size_t);
extern ssize_t ceph_listxattr(struct dentry *, char *, size_t);
extern int ceph_removexattr(struct dentry *, const char *);
extern void __ceph_build_xattrs_blob(struct ceph_inode_info *ci,
void **xattrs_blob,
int *blob_size);
extern void __ceph_build_xattrs_blob(struct ceph_inode_info *ci);
/* caps.c */
extern const char *ceph_cap_string(int c);