filestore: split xattrs to multiple chunks

This commit is contained in:
Yehuda Sadeh 2010-10-22 10:09:32 -07:00
parent 515efd5a2f
commit f96eb80522
2 changed files with 248 additions and 41 deletions

View File

@ -47,7 +47,8 @@
#include <sstream>
#define ATTR_MAX 80
#define ATTR_MAX_NAME_LEN 128
#define ATTR_MAX_BLOCK_LEN 512 // (3*1024)
#define COMMIT_SNAP_ITEM "snap_%lld"
@ -70,42 +71,229 @@
#include <map>
/*
* xattr portability stupidity. hide errno, while we're at it.
*/
int do_getxattr(const char *fn, const char *name, void *val, size_t size) {
#ifdef DARWIN
static int sys_getxattr(const char *fn, const char *name, void *val, size_t size)
{
int r = ::getxattr(fn, name, val, size, 0, 0);
#else
int r = ::getxattr(fn, name, val, size);
#endif
return r < 0 ? -errno:r;
return (r < 0 ? -errno : r);
}
int do_setxattr(const char *fn, const char *name, const void *val, size_t size) {
#ifdef DARWIN
static int sys_setxattr(const char *fn, const char *name, const void *val, size_t size)
{
int r = ::setxattr(fn, name, val, size, 0, 0);
#else
int r = ::setxattr(fn, name, val, size, 0);
#endif
return r < 0 ? -errno:r;
return (r < 0 ? -errno : r);
}
int do_removexattr(const char *fn, const char *name) {
#ifdef DARWIN
static int sys_removexattr(const char *fn, const char *name)
{
int r = ::removexattr(fn, name, 0);
#else
int r = ::removexattr(fn, name);
#endif
return r < 0 ? -errno:r;
return (r < 0 ? -errno : r);
}
int do_listxattr(const char *fn, char *names, size_t len) {
#ifdef DARWIN
int sys_removexattr(const char *fn, const char *name)
{
int r = ::removexattr(fn, name, 0);
return (r < 0 ? -errno : r);
}
int sys_listxattr(const char *fn, char *names, size_t len)
{
int r = ::listxattr(fn, names, len, 0);
return (r < 0 ? -errno : r);
}
#else
static int sys_getxattr(const char *fn, const char *name, void *val, size_t size)
{
int r = ::getxattr(fn, name, val, size);
return (r < 0 ? -errno : r);
}
static int sys_setxattr(const char *fn, const char *name, const void *val, size_t size)
{
int r = ::setxattr(fn, name, val, size, 0);
return (r < 0 ? -errno : r);
}
static int sys_removexattr(const char *fn, const char *name)
{
int r = ::removexattr(fn, name);
return (r < 0 ? -errno : r);
}
int sys_listxattr(const char *fn, char *names, size_t len)
{
int r = ::listxattr(fn, names, len);
return (r < 0 ? -errno : r);
}
#endif
return r < 0 ? -errno:r;
static void get_raw_xattr_name(const char *name, int i, char *raw_name, int raw_len)
{
int r;
if (!i)
r = snprintf(raw_name, raw_len, "%s", name);
else
r = snprintf(raw_name, raw_len, "%s@%d", name, i);
assert(r < raw_len);
}
int do_getxattr_len(const char *fn, const char *name)
{
int i = 0, total = 0;
char raw_name[ATTR_MAX_NAME_LEN * 2 + 16];
int r;
do {
get_raw_xattr_name(name, i, raw_name, sizeof(raw_name));
r = sys_getxattr(fn, raw_name, 0, 0);
if (!i && r < 0) {
return r;
}
if (r < 0)
break;
total += r;
i++;
} while (r == ATTR_MAX_BLOCK_LEN);
return total;
}
int do_getxattr(const char *fn, const char *name, void *val, size_t size)
{
int i = 0, pos = 0;
char raw_name[ATTR_MAX_NAME_LEN * 2 + 16];
int ret = 0;
int r;
size_t chunk_size;
if (!size)
return do_getxattr_len(fn, name);
do {
chunk_size = (size < ATTR_MAX_BLOCK_LEN ? size : ATTR_MAX_BLOCK_LEN);
get_raw_xattr_name(name, i, raw_name, sizeof(raw_name));
size -= chunk_size;
r = sys_getxattr(fn, raw_name, (char *)val + pos, chunk_size);
if (r < 0) {
ret = r;
break;
}
if (r > 0)
pos += r;
i++;
} while (size && r == ATTR_MAX_BLOCK_LEN);
if (r >= 0) {
ret = pos;
/* is there another chunk? that can happen if the last read size span over
exactly one block */
if (chunk_size == ATTR_MAX_BLOCK_LEN) {
get_raw_xattr_name(name, i, raw_name, sizeof(raw_name));
r = sys_getxattr(fn, raw_name, 0, 0);
if (r > 0) { // there's another chunk.. the original buffer was too small
ret = -ERANGE;
}
}
}
return ret;
}
int do_setxattr(const char *fn, const char *name, const void *val, size_t size) {
int i = 0, pos = 0;
char raw_name[ATTR_MAX_NAME_LEN * 2 + 16];
int ret = 0;
size_t chunk_size;
do {
chunk_size = (size < ATTR_MAX_BLOCK_LEN ? size : ATTR_MAX_BLOCK_LEN);
get_raw_xattr_name(name, i, raw_name, sizeof(raw_name));
size -= chunk_size;
int r = sys_setxattr(fn, raw_name, (char *)val + pos, chunk_size);
if (r < 0) {
ret = r;
break;
}
pos += chunk_size;
ret = pos;
i++;
} while (size);
/* if we're exactly at a chunk size, remove the next one (if wasn't removed
before) */
if (ret >= 0 && chunk_size == ATTR_MAX_BLOCK_LEN) {
get_raw_xattr_name(name, i, raw_name, sizeof(raw_name));
sys_removexattr(fn, raw_name);
}
return ret;
}
int do_removexattr(const char *fn, const char *name) {
int i = 0;
char raw_name[ATTR_MAX_NAME_LEN * 2 + 16];
int r;
do {
get_raw_xattr_name(name, i, raw_name, sizeof(raw_name));
r = sys_removexattr(fn, raw_name);
if (!i && r < 0) {
return r;
}
i++;
} while (r >= 0);
return 0;
}
int do_listxattr(const char *fn, char *names, size_t len) {
int r;
if (!len)
return sys_listxattr(fn, names, len);
r = sys_listxattr(fn, 0, 0);
if (r < 0)
return r;
size_t total_len = r * 2; // should be enough
char *full_buf = (char *)malloc(total_len * 2);
if (!full_buf)
return -ENOMEM;
r = sys_listxattr(fn, full_buf, total_len);
if (r < 0)
return r;
char *p = full_buf;
char *end = full_buf + r;
char *dest = names;
char *dest_end = names + len;
while (p < end) {
int attr_len = strlen(p);
if (!strchr(p, '@')) {
if (dest + attr_len > dest_end) {
r = -ERANGE;
goto done;
}
strcpy(dest, p);
dest += attr_len + 1;
}
p += attr_len + 1;
}
r = dest - names;
done:
free(full_buf);
return r;
}
@ -116,6 +304,7 @@ static void get_attrname(const char *name, char *buf, int len)
{
snprintf(buf, len, "user.ceph.%s", name);
}
bool parse_attrname(char **name)
{
if (strncmp(*name, "user.ceph.", 10) == 0) {
@ -1985,8 +2174,8 @@ int FileStore::getattr(coll_t cid, const sobject_t& oid, const char *name,
char fn[PATH_MAX];
get_coname(cid, oid, fn, sizeof(fn));
dout(15) << "getattr " << fn << " '" << name << "' len " << size << dendl;
char n[ATTR_MAX];
get_attrname(name, n, ATTR_MAX);
char n[ATTR_MAX_NAME_LEN];
get_attrname(name, n, ATTR_MAX_NAME_LEN);
int r = do_getxattr(fn, n, value, size);
dout(10) << "getattr " << fn << " '" << name << "' len " << size << " = " << r << dendl;
return r;
@ -1999,8 +2188,8 @@ int FileStore::getattr(coll_t cid, const sobject_t& oid, const char *name, buffe
char fn[PATH_MAX];
get_coname(cid, oid, fn, sizeof(fn));
dout(15) << "getattr " << fn << " '" << name << "'" << dendl;
char n[ATTR_MAX];
get_attrname(name, n, ATTR_MAX);
char n[ATTR_MAX_NAME_LEN];
get_attrname(name, n, ATTR_MAX_NAME_LEN);
int r = _getattr(fn, n, bp);
dout(10) << "getattr " << fn << " '" << name << "' = " << r << dendl;
return r;
@ -2030,8 +2219,8 @@ int FileStore::_setattr(coll_t cid, const sobject_t& oid, const char *name,
char fn[PATH_MAX];
get_coname(cid, oid, fn, sizeof(fn));
dout(15) << "setattr " << fn << " '" << name << "' len " << size << dendl;
char n[ATTR_MAX];
get_attrname(name, n, ATTR_MAX);
char n[ATTR_MAX_NAME_LEN];
get_attrname(name, n, ATTR_MAX_NAME_LEN);
int r = do_setxattr(fn, n, value, size);
dout(10) << "setattr " << fn << " '" << name << "' len " << size << " = " << r << dendl;
return r;
@ -2048,8 +2237,8 @@ int FileStore::_setattrs(coll_t cid, const sobject_t& oid, map<string,bufferptr>
for (map<string,bufferptr>::iterator p = aset.begin();
p != aset.end();
++p) {
char n[ATTR_MAX];
get_attrname(p->first.c_str(), n, ATTR_MAX);
char n[ATTR_MAX_NAME_LEN];
get_attrname(p->first.c_str(), n, ATTR_MAX_NAME_LEN);
const char *val;
if (p->second.length())
val = p->second.c_str();
@ -2074,8 +2263,8 @@ int FileStore::_rmattr(coll_t cid, const sobject_t& oid, const char *name)
char fn[PATH_MAX];
get_coname(cid, oid, fn, sizeof(fn));
dout(15) << "rmattr " << fn << " '" << name << "'" << dendl;
char n[ATTR_MAX];
get_attrname(name, n, ATTR_MAX);
char n[ATTR_MAX_NAME_LEN];
get_attrname(name, n, ATTR_MAX_NAME_LEN);
int r = do_removexattr(fn, n);
dout(10) << "rmattr " << fn << " '" << name << "' = " << r << dendl;
return r;
@ -2094,8 +2283,8 @@ int FileStore::_rmattrs(coll_t cid, const sobject_t& oid)
int r = _getattrs(fn, aset);
if (r >= 0) {
for (map<string,bufferptr>::iterator p = aset.begin(); p != aset.end(); p++) {
char n[ATTR_MAX];
get_attrname(p->first.c_str(), n, ATTR_MAX);
char n[ATTR_MAX_NAME_LEN];
get_attrname(p->first.c_str(), n, ATTR_MAX_NAME_LEN);
r = do_removexattr(fn, n);
if (r < 0)
break;

View File

@ -47,8 +47,9 @@ void usage()
cerr << " get objname [outfile] -- fetch object\n";
cerr << " put objname [infile] -- write object\n";
cerr << " rm objname -- remove object\n";
cerr << " getxattr objame attr\n";
cerr << " setxattr objame attr val\n";
cerr << " listxattr objname\n";
cerr << " getxattr objname attr\n";
cerr << " setxattr objname attr val\n";
cerr << " ls -- list objects in pool\n\n";
cerr << " chown 123 -- change the pool owner to auid 123\n";
@ -317,11 +318,28 @@ int main(int argc, const char **argv)
bufferlist bl;
ret = rados.getxattr(p, oid, attr_name.c_str(), bl);
if (ret < 0) {
cerr << "error setting xattr " << pool << "/" << oid << "/" << attr_name << ": " << strerror_r(-ret, buf, sizeof(buf)) << std::endl;
cerr << "error getting xattr " << pool << "/" << oid << "/" << attr_name << ": " << strerror_r(-ret, buf, sizeof(buf)) << std::endl;
goto out;
}
string s(bl.c_str(), bl.length());
cout << s << std::endl;
} else if (strcmp(nargs[0], "listxattr") == 0) {
if (!pool || nargs.size() < 2)
usage();
string oid(nargs[1]);
map<std::string, bufferlist> attrset;
bufferlist bl;
ret = rados.getxattrs(p, oid, attrset);
if (ret < 0) {
cerr << "error getting xattr set " << pool << "/" << oid << ": " << strerror_r(-ret, buf, sizeof(buf)) << std::endl;
goto out;
}
for (map<std::string, bufferlist>::iterator iter = attrset.begin();
iter != attrset.end(); ++iter) {
cout << iter->first << std::endl;
}
}
else if (strcmp(nargs[0], "rm") == 0) {
if (!pool || nargs.size() < 2)