Merge PR #36603 into master

* refs/pull/36603/head:
	test: add test for setting ceph mirror virtual xattr
	tests/pybind/cephfs: cleanup xattrs before starting tests
	client: filter ceph.* xattrs from listing
	client: force an attr fetch for ceph xattrs
	client: changes for ceph.mirror.info xattr
	mds: restrict setting/removing certain xattrs in ceph namespace
	mds: introduce ceph.mirror.info virtual xattr
	mds: customize xattr handling using dispatch handlers
	mds: introduce is_ceph_vxattr() helper

Reviewed-by: Patrick Donnelly <pdonnell@redhat.com>
This commit is contained in:
Patrick Donnelly 2020-10-18 18:44:51 -07:00
commit 21e63cd4ae
No known key found for this signature in database
GPG Key ID: 3A2A7E25BEA8AADB
5 changed files with 399 additions and 29 deletions

View File

@ -11728,7 +11728,7 @@ int Client::_getxattr(Inode *in, const char *name, void *value, size_t size,
if (vxattr->flags & VXATTR_RSTAT) {
flags |= CEPH_STAT_RSTAT;
}
r = _getattr(in, flags, perms, true);
r = _getattr(in, flags | CEPH_STAT_CAP_XATTR, perms, true);
if (r != 0) {
// Error from getattr!
return r;
@ -11821,8 +11821,12 @@ int Client::_listxattr(Inode *in, char *name, size_t size,
}
r = 0;
for (const auto& p : in->xattrs) {
size_t this_len = p.first.length() + 1;
for ([[maybe_unused]] const auto &[xattr_name, xattr_value_bl] : in->xattrs) {
if (xattr_name.rfind("ceph.", 0) == 0) {
continue;
}
size_t this_len = xattr_name.length() + 1;
r += this_len;
if (len_only)
continue;
@ -11832,7 +11836,7 @@ int Client::_listxattr(Inode *in, char *name, size_t size,
goto out;
}
memcpy(name, p.first.c_str(), this_len);
memcpy(name, xattr_name.c_str(), this_len);
name += this_len;
size -= this_len;
}
@ -12266,6 +12270,21 @@ size_t Client::_vxattrcb_snap_btime(Inode *in, char *val, size_t size)
(long unsigned)in->snap_btime.nsec());
}
bool Client::_vxattrcb_mirror_info_exists(Inode *in)
{
// checking one of the xattrs would suffice
return in->xattrs.count("ceph.mirror.info.cluster_id") != 0;
}
size_t Client::_vxattrcb_mirror_info(Inode *in, char *val, size_t size)
{
return snprintf(val, size, "cluster_id=%.*s fs_id=%.*s",
in->xattrs["ceph.mirror.info.cluster_id"].length(),
in->xattrs["ceph.mirror.info.cluster_id"].c_str(),
in->xattrs["ceph.mirror.info.fs_id"].length(),
in->xattrs["ceph.mirror.info.fs_id"].c_str());
}
#define CEPH_XATTR_NAME(_type, _name) "ceph." #_type "." #_name
#define CEPH_XATTR_NAME2(_type, _name, _name2) "ceph." #_type "." #_name "." #_name2
@ -12347,6 +12366,13 @@ const Client::VXattr Client::_dir_vxattrs[] = {
exists_cb: &Client::_vxattrcb_snap_btime_exists,
flags: 0,
},
{
name: "ceph.mirror.info",
getxattr_cb: &Client::_vxattrcb_mirror_info,
readonly: false,
exists_cb: &Client::_vxattrcb_mirror_info_exists,
flags: 0,
},
{ name: "" } /* Required table terminator */
};

View File

@ -1294,6 +1294,9 @@ private:
bool _vxattrcb_snap_btime_exists(Inode *in);
size_t _vxattrcb_snap_btime(Inode *in, char *val, size_t size);
bool _vxattrcb_mirror_info_exists(Inode *in);
size_t _vxattrcb_mirror_info(Inode *in, char *val, size_t size);
static const VXattr *_get_vxattrs(Inode *in);
static const VXattr *_match_vxattr(Inode *in, const char *name);

View File

@ -56,7 +56,9 @@
#include <list>
#include <iostream>
#include <regex>
#include <string_view>
#include <functional>
#include "common/config.h"
@ -5816,13 +5818,191 @@ void Server::handle_remove_vxattr(MDRequestRef& mdr, CInode *cur)
respond_to_request(mdr, -ENODATA);
}
const Server::XattrHandler Server::xattr_handlers[] = {
{
xattr_name: Server::DEFAULT_HANDLER,
description: "default xattr handler",
validate: &Server::default_xattr_validate,
setxattr: &Server::default_setxattr_handler,
removexattr: &Server::default_removexattr_handler,
},
{
xattr_name: "ceph.mirror.info",
description: "mirror info xattr handler",
validate: &Server::mirror_info_xattr_validate,
setxattr: &Server::mirror_info_setxattr_handler,
removexattr: &Server::mirror_info_removexattr_handler
},
};
const Server::XattrHandler* Server::get_xattr_or_default_handler(std::string_view xattr_name) {
const XattrHandler *default_xattr_handler = nullptr;
for (auto &handler : xattr_handlers) {
if (handler.xattr_name == Server::DEFAULT_HANDLER) {
ceph_assert(default_xattr_handler == nullptr);
default_xattr_handler = &handler;
}
if (handler.xattr_name == xattr_name) {
dout(20) << "handler=" << handler.description << dendl;
return &handler;
}
}
ceph_assert(default_xattr_handler != nullptr);
dout(20) << "handler=" << default_xattr_handler->description << dendl;
return default_xattr_handler;
}
int Server::xattr_validate(CInode *cur, const InodeStoreBase::xattr_map_const_ptr xattrs,
const std::string &xattr_name, int op, int flags) {
if (op == CEPH_MDS_OP_SETXATTR) {
if (xattrs) {
if ((flags & CEPH_XATTR_CREATE) && xattrs->count(mempool::mds_co::string(xattr_name))) {
dout(10) << "setxattr '" << xattr_name << "' XATTR_CREATE and EEXIST on " << *cur << dendl;
return -EEXIST;
}
}
if ((flags & CEPH_XATTR_REPLACE) && !(xattrs && xattrs->count(mempool::mds_co::string(xattr_name)))) {
dout(10) << "setxattr '" << xattr_name << "' XATTR_REPLACE and ENODATA on " << *cur << dendl;
return -ENODATA;
}
return 0;
}
if (op == CEPH_MDS_OP_RMXATTR) {
if (xattrs && xattrs->count(mempool::mds_co::string(xattr_name)) == 0) {
dout(10) << "removexattr '" << xattr_name << "' and ENODATA on " << *cur << dendl;
return -ENODATA;
}
return 0;
}
derr << ": unhandled validation for: " << xattr_name << dendl;
return -EINVAL;
}
void Server::xattr_set(InodeStoreBase::xattr_map_ptr xattrs, const std::string &xattr_name,
const bufferlist &xattr_value) {
size_t len = xattr_value.length();
bufferptr b = buffer::create(len);
if (len) {
xattr_value.begin().copy(len, b.c_str());
}
auto em = xattrs->emplace(std::piecewise_construct,
std::forward_as_tuple(mempool::mds_co::string(xattr_name)),
std::forward_as_tuple(b));
if (!em.second) {
em.first->second = b;
}
}
void Server::xattr_rm(InodeStoreBase::xattr_map_ptr xattrs, const std::string &xattr_name) {
xattrs->erase(mempool::mds_co::string(xattr_name));
}
int Server::default_xattr_validate(CInode *cur, const InodeStoreBase::xattr_map_const_ptr xattrs,
XattrOp *xattr_op) {
return xattr_validate(cur, xattrs, xattr_op->xattr_name, xattr_op->op, xattr_op->flags);
}
void Server::default_setxattr_handler(CInode *cur, InodeStoreBase::xattr_map_ptr xattrs,
const XattrOp &xattr_op) {
xattr_set(xattrs, xattr_op.xattr_name, xattr_op.xattr_value);
}
void Server::default_removexattr_handler(CInode *cur, InodeStoreBase::xattr_map_ptr xattrs,
const XattrOp &xattr_op) {
xattr_rm(xattrs, xattr_op.xattr_name);
}
// mirror info xattr handlers
const std::string Server::MirrorXattrInfo::MIRROR_INFO_REGEX = "^cluster_id=([a-f0-9]{8}-" \
"[a-f0-9]{4}-[a-f0-9]{4}-" \
"[a-f0-9]{4}-[a-f0-9]{12})" \
" fs_id=(\\d+)$";
const std::string Server::MirrorXattrInfo::CLUSTER_ID = "ceph.mirror.info.cluster_id";
const std::string Server::MirrorXattrInfo::FS_ID = "ceph.mirror.info.fs_id";
int Server::parse_mirror_info_xattr(const std::string &name, const std::string &value,
std::string &cluster_id, std::string &fs_id) {
dout(20) << "parsing name=" << name << ", value=" << value << dendl;
static const std::regex regex(Server::MirrorXattrInfo::MIRROR_INFO_REGEX);
std::smatch match;
std::regex_search(value, match, regex);
if (match.size() != 3) {
derr << "mirror info parse error" << dendl;
return -EINVAL;
}
cluster_id = match[1];
fs_id = match[2];
dout(20) << " parsed cluster_id=" << cluster_id << ", fs_id=" << fs_id << dendl;
return 0;
}
int Server::mirror_info_xattr_validate(CInode *cur, const InodeStoreBase::xattr_map_const_ptr xattrs,
XattrOp *xattr_op) {
if (!cur->is_root()) {
return -EINVAL;
}
int v1 = xattr_validate(cur, xattrs, Server::MirrorXattrInfo::CLUSTER_ID, xattr_op->op, xattr_op->flags);
int v2 = xattr_validate(cur, xattrs, Server::MirrorXattrInfo::FS_ID, xattr_op->op, xattr_op->flags);
if (v1 != v2) {
derr << "inconsistent mirror info state (" << v1 << "," << v2 << ")" << dendl;
return -EINVAL;
}
if (v1 < 0) {
return v1;
}
if (xattr_op->op == CEPH_MDS_OP_RMXATTR) {
return 0;
}
std::string cluster_id;
std::string fs_id;
int r = parse_mirror_info_xattr(xattr_op->xattr_name, xattr_op->xattr_value.to_str(),
cluster_id, fs_id);
if (r < 0) {
return r;
}
xattr_op->xinfo = std::make_unique<MirrorXattrInfo>(cluster_id, fs_id);
return 0;
}
void Server::mirror_info_setxattr_handler(CInode *cur, InodeStoreBase::xattr_map_ptr xattrs,
const XattrOp &xattr_op) {
auto mirror_info = dynamic_cast<MirrorXattrInfo&>(*(xattr_op.xinfo));
bufferlist bl;
bl.append(mirror_info.cluster_id.c_str(), mirror_info.cluster_id.length());
xattr_set(xattrs, Server::MirrorXattrInfo::CLUSTER_ID, bl);
bl.clear();
bl.append(mirror_info.fs_id.c_str(), mirror_info.fs_id.length());
xattr_set(xattrs, Server::MirrorXattrInfo::FS_ID, bl);
}
void Server::mirror_info_removexattr_handler(CInode *cur, InodeStoreBase::xattr_map_ptr xattrs,
const XattrOp &xattr_op) {
xattr_rm(xattrs, Server::MirrorXattrInfo::CLUSTER_ID);
xattr_rm(xattrs, Server::MirrorXattrInfo::FS_ID);
}
void Server::handle_client_setxattr(MDRequestRef& mdr)
{
const cref_t<MClientRequest> &req = mdr->client_request;
string name(req->get_path2());
// magic ceph.* namespace?
if (name.compare(0, 5, "ceph.") == 0) {
// is a ceph virtual xattr?
if (is_ceph_vxattr(name)) {
// can't use rdlock_path_pin_ref because we need to xlock snaplock/policylock
CInode *cur = try_get_auth_inode(mdr, req->get_filepath().get_ino());
if (!cur)
@ -5832,6 +6012,11 @@ void Server::handle_client_setxattr(MDRequestRef& mdr)
return;
}
if (!is_allowed_ceph_xattr(name)) {
respond_to_request(mdr, -EINVAL);
return;
}
CInode *cur = rdlock_path_pin_ref(mdr, true);
if (!cur)
return;
@ -5854,6 +6039,7 @@ void Server::handle_client_setxattr(MDRequestRef& mdr)
size_t len = req->get_data().length();
size_t inc = len + name.length();
auto handler = Server::get_xattr_or_default_handler(name);
const auto& pxattrs = cur->get_projected_xattrs();
if (pxattrs) {
// check xattrs kv pairs size
@ -5871,18 +6057,12 @@ void Server::handle_client_setxattr(MDRequestRef& mdr)
respond_to_request(mdr, -ENOSPC);
return;
}
if ((flags & CEPH_XATTR_CREATE) && pxattrs->count(mempool::mds_co::string(name))) {
dout(10) << "setxattr '" << name << "' XATTR_CREATE and EEXIST on " << *cur << dendl;
respond_to_request(mdr, -EEXIST);
return;
}
}
if ((flags & CEPH_XATTR_REPLACE) &&
!(pxattrs && pxattrs->count(mempool::mds_co::string(name)))) {
dout(10) << "setxattr '" << name << "' XATTR_REPLACE and ENODATA on " << *cur << dendl;
respond_to_request(mdr, -ENODATA);
XattrOp xattr_op(CEPH_MDS_OP_SETXATTR, name, req->get_data(), flags);
int r = std::invoke(handler->validate, this, cur, pxattrs, &xattr_op);
if (r < 0) {
respond_to_request(mdr, r);
return;
}
@ -5896,15 +6076,11 @@ void Server::handle_client_setxattr(MDRequestRef& mdr)
pi.inode->rstat.rctime = mdr->get_op_stamp();
pi.inode->change_attr++;
pi.inode->xattr_version++;
if ((flags & CEPH_XATTR_REMOVE)) {
pi.xattrs->erase(mempool::mds_co::string(name));
std::invoke(handler->removexattr, this, cur, pi.xattrs, xattr_op);
} else {
bufferptr b = buffer::create(len);
if (len)
req->get_data().begin().copy(len, b.c_str());
auto em = pi.xattrs->emplace(std::piecewise_construct, std::forward_as_tuple(mempool::mds_co::string(name)), std::forward_as_tuple(b));
if (!em.second)
em.first->second = b;
std::invoke(handler->setxattr, this, cur, pi.xattrs, xattr_op);
}
// log + wait
@ -5923,7 +6099,8 @@ void Server::handle_client_removexattr(MDRequestRef& mdr)
const cref_t<MClientRequest> &req = mdr->client_request;
std::string name(req->get_path2());
if (name.compare(0, 5, "ceph.") == 0) {
// is a ceph virtual xattr?
if (is_ceph_vxattr(name)) {
// can't use rdlock_path_pin_ref because we need to xlock snaplock/policylock
CInode *cur = try_get_auth_inode(mdr, req->get_filepath().get_ino());
if (!cur)
@ -5933,6 +6110,11 @@ void Server::handle_client_removexattr(MDRequestRef& mdr)
return;
}
if (!is_allowed_ceph_xattr(name)) {
respond_to_request(mdr, -EINVAL);
return;
}
CInode* cur = rdlock_path_pin_ref(mdr, true);
if (!cur)
return;
@ -5947,10 +6129,15 @@ void Server::handle_client_removexattr(MDRequestRef& mdr)
if (!mds->locker->acquire_locks(mdr, lov))
return;
auto handler = Server::get_xattr_or_default_handler(name);
bufferlist bl;
XattrOp xattr_op(CEPH_MDS_OP_RMXATTR, name, bl, 0);
const auto& pxattrs = cur->get_projected_xattrs();
if (pxattrs && pxattrs->count(mempool::mds_co::string(name)) == 0) {
dout(10) << "removexattr '" << name << "' and ENODATA on " << *cur << dendl;
respond_to_request(mdr, -ENODATA);
int r = std::invoke(handler->validate, this, cur, pxattrs, &xattr_op);
if (r < 0) {
respond_to_request(mdr, r);
return;
}
@ -5958,14 +6145,13 @@ void Server::handle_client_removexattr(MDRequestRef& mdr)
// project update
auto pi = cur->project_inode(mdr, true);
auto &px = *pi.xattrs;
pi.inode->version = cur->pre_dirty();
pi.inode->ctime = mdr->get_op_stamp();
if (mdr->get_op_stamp() > pi.inode->rstat.rctime)
pi.inode->rstat.rctime = mdr->get_op_stamp();
pi.inode->change_attr++;
pi.inode->xattr_version++;
px.erase(mempool::mds_co::string(name));
std::invoke(handler->removexattr, this, cur, pi.xattrs, xattr_op);
// log + wait
mdr->ls = mdlog->get_current_segment();

View File

@ -30,6 +30,7 @@
#include "messages/MClientReclaimReply.h"
#include "messages/MLock.h"
#include "CInode.h"
#include "MDSRank.h"
#include "Mutation.h"
#include "MDSContext.h"
@ -318,6 +319,113 @@ private:
friend class ServerLogContext;
friend class Batch_Getattr_Lookup;
// placeholder for validation handler to store xattr specific
// data
struct XattrInfo {
virtual ~XattrInfo() {
}
};
struct MirrorXattrInfo : XattrInfo {
std::string cluster_id;
std::string fs_id;
static const std::string MIRROR_INFO_REGEX;
static const std::string CLUSTER_ID;
static const std::string FS_ID;
MirrorXattrInfo(std::string_view cluster_id,
std::string_view fs_id)
: cluster_id(cluster_id),
fs_id(fs_id) {
}
};
struct XattrOp {
int op;
std::string xattr_name;
const bufferlist &xattr_value;
int flags = 0;
std::unique_ptr<XattrInfo> xinfo;
XattrOp(int op, std::string_view xattr_name, const bufferlist &xattr_value, int flags)
: op(op),
xattr_name(xattr_name),
xattr_value(xattr_value),
flags (flags) {
}
};
struct XattrHandler {
const std::string xattr_name;
const std::string description;
// basic checks are to be done in this handler. return -errno to
// reject xattr request (set or remove), zero to proceed. handlers
// may parse xattr value for verification if needed and have an
// option to store custom data in XattrOp::xinfo.
int (Server::*validate)(CInode *cur, const InodeStoreBase::xattr_map_const_ptr xattrs,
XattrOp *xattr_op);
// set xattr for an inode in xattr_map
void (Server::*setxattr)(CInode *cur, InodeStoreBase::xattr_map_ptr xattrs,
const XattrOp &xattr_op);
// remove xattr for an inode from xattr_map
void (Server::*removexattr)(CInode *cur, InodeStoreBase::xattr_map_ptr xattrs,
const XattrOp &xattr_op);
};
inline static const std::string DEFAULT_HANDLER = "<default>";
static const XattrHandler xattr_handlers[];
const XattrHandler* get_xattr_or_default_handler(std::string_view xattr_name);
// generic variant to set/remove xattr in/from xattr_map
int xattr_validate(CInode *cur, const InodeStoreBase::xattr_map_const_ptr xattrs,
const std::string &xattr_name, int op, int flags);
void xattr_set(InodeStoreBase::xattr_map_ptr xattrs, const std::string &xattr_name,
const bufferlist &xattr_value);
void xattr_rm(InodeStoreBase::xattr_map_ptr xattrs, const std::string &xattr_name);
// default xattr handlers
int default_xattr_validate(CInode *cur, const InodeStoreBase::xattr_map_const_ptr xattrs,
XattrOp *xattr_op);
void default_setxattr_handler(CInode *cur, InodeStoreBase::xattr_map_ptr xattrs,
const XattrOp &xattr_op);
void default_removexattr_handler(CInode *cur, InodeStoreBase::xattr_map_ptr xattrs,
const XattrOp &xattr_op);
// mirror info xattr handler
int parse_mirror_info_xattr(const std::string &name, const std::string &value,
std::string &cluster_id, std::string &fs_id);
int mirror_info_xattr_validate(CInode *cur, const InodeStoreBase::xattr_map_const_ptr xattrs,
XattrOp *xattr_op);
void mirror_info_setxattr_handler(CInode *cur, InodeStoreBase::xattr_map_ptr xattrs,
const XattrOp &xattr_op);
void mirror_info_removexattr_handler(CInode *cur, InodeStoreBase::xattr_map_ptr xattrs,
const XattrOp &xattr_op);
static bool is_ceph_vxattr(std::string_view xattr_name) {
return xattr_name.rfind("ceph.dir.layout", 0) == 0 ||
xattr_name.rfind("ceph.file.layout", 0) == 0 ||
xattr_name.rfind("ceph.quota", 0) == 0 ||
xattr_name == "ceph.dir.subvolume"sv ||
xattr_name == "ceph.dir.pin"sv ||
xattr_name == "ceph.dir.pin.random"sv ||
xattr_name == "ceph.dir.pin.distributed"sv;
}
static bool is_allowed_ceph_xattr(std::string_view xattr_name) {
// not a ceph xattr -- allow!
if (xattr_name.rfind("ceph.", 0) != 0) {
return true;
}
return xattr_name == "ceph.mirror.info";
}
void reply_client_request(MDRequestRef& mdr, const ref_t<MClientReply> &reply);
void flush_session(Session *session, MDSGatherBuilder& gather);

View File

@ -3,6 +3,7 @@ from nose.tools import assert_raises, assert_equal, assert_greater, with_setup
import cephfs as libcephfs
import fcntl
import os
import random
import time
import stat
import uuid
@ -34,6 +35,11 @@ def setup_test():
cephfs.closedir(d)
cephfs.chdir(b"/")
_, ret_buf = cephfs.listxattr("/")
print(f'ret_buf={ret_buf}')
xattrs = ret_buf.decode('utf-8').split('\x00')
for xattr in xattrs[:-1]:
cephfs.removexattr("/", xattr)
@with_setup(setup_test)
def test_conf_get():
@ -141,6 +147,47 @@ def test_xattr():
assert_equal(9, ret_val)
assert_equal("user.big\x00", ret_buff.decode('utf-8'))
@with_setup(setup_test)
def test_ceph_mirror_xattr():
def gen_mirror_xattr():
cluster_id = str(uuid.uuid4())
fs_id = random.randint(1, 10)
mirror_xattr = f'cluster_id={cluster_id} fs_id={fs_id}'
return mirror_xattr.encode('utf-8')
mirror_xattr_enc_1 = gen_mirror_xattr()
# mirror xattr is only allowed on root
cephfs.mkdir('/d0', 0o755)
assert_raises(libcephfs.InvalidValue, cephfs.setxattr,
'/d0', 'ceph.mirror.info', mirror_xattr_enc_1, os.XATTR_CREATE)
cephfs.rmdir('/d0')
cephfs.setxattr('/', 'ceph.mirror.info', mirror_xattr_enc_1, os.XATTR_CREATE)
assert_equal(mirror_xattr_enc_1, cephfs.getxattr('/', 'ceph.mirror.info'))
# setting again with XATTR_CREATE should fail
assert_raises(libcephfs.ObjectExists, cephfs.setxattr,
'/', 'ceph.mirror.info', mirror_xattr_enc_1, os.XATTR_CREATE)
# ceph.mirror.info should not show up in listing
ret_val, _ = cephfs.listxattr("/")
assert_equal(0, ret_val)
mirror_xattr_enc_2 = gen_mirror_xattr()
cephfs.setxattr('/', 'ceph.mirror.info', mirror_xattr_enc_2, os.XATTR_REPLACE)
assert_equal(mirror_xattr_enc_2, cephfs.getxattr('/', 'ceph.mirror.info'))
cephfs.removexattr('/', 'ceph.mirror.info')
# ceph.mirror.info is already removed
assert_raises(libcephfs.NoData, cephfs.getxattr, '/', 'ceph.mirror.info')
# removing again should throw error
assert_raises(libcephfs.NoData, cephfs.removexattr, "/", "ceph.mirror.info")
# check mirror info xattr format
assert_raises(libcephfs.InvalidValue, cephfs.setxattr, '/', 'ceph.mirror.info', b"unknown", 0)
@with_setup(setup_test)
def test_fxattr():
fd = cephfs.open(b'/file-fxattr', 'w', 0o755)