rgw_file: fix mv/rename cases broken by zipper integration

There were two problems.  First, leaf object names must be
expressed as fully-qualified to the bucket as input to the
copy-object step.  Second, handle s->object in the same step
indicates the being-created destination object of the copy,
this was correct in the original zipper change but broken
later.

* add a rename/mv unit test

Tests for the following cases added:

1. move between two sub-directory paths in a single bucket
2. move between two names at the top level of a single bucket
3. move between sub-directory paths in different buckets (cross-bucket rename)

Fixes: https://tracker.ceph.com/issues/64950

Signed-off-by: Matt Benjamin <mbenjamin@redhat.com>
This commit is contained in:
Matt Benjamin 2024-03-13 20:19:01 -04:00
parent 6185d084fa
commit f11cdb1e18
5 changed files with 330 additions and 6 deletions

View File

@ -65,4 +65,8 @@ ceph_test_librgw_file_gp ${K} --get --stat --put --create
echo "phase 5.2"
ceph_test_librgw_file_gp ${K} --delete
# rename tests
echo "phase 6.1"
ceph_test_librgw_file_rename ${K} --create
exit 0

View File

@ -2574,6 +2574,8 @@ public:
RGWFileHandle* dst_parent;
const std::string& src_name;
const std::string& dst_name;
std::string src_obj_name;
std::string dst_obj_name;
RGWCopyObjRequest(CephContext* _cct, std::unique_ptr<rgw::sal::User> _user,
RGWFileHandle* _src_parent, RGWFileHandle* _dst_parent,
@ -2607,13 +2609,14 @@ public:
state->src_bucket_name = src_parent->bucket_name();
state->bucket_name = dst_parent->bucket_name();
std::string dest_obj_name = dst_parent->format_child_name(dst_name, false);
src_obj_name = src_parent->format_child_name(src_name, false /* is_dir */);
dst_obj_name = dst_parent->format_child_name(dst_name, false);
int rc = valid_s3_object_name(dest_obj_name);
int rc = valid_s3_object_name(dst_obj_name);
if (rc != 0)
return rc;
state->object = RGWHandler::driver->get_object(rgw_obj_key(dest_obj_name));
state->object = RGWHandler::driver->get_object(rgw_obj_key(dst_obj_name));
/* XXX and fixup key attr (could optimize w/string ref and
* dest_obj_name) */
@ -2638,9 +2641,8 @@ public:
/* we don't have (any) headers, so just create default ACLs */
dest_policy.create_default(s->owner.id, s->owner.display_name);
/* src_object required before RGWCopyObj::verify_permissions() */
rgw_obj_key k = rgw_obj_key(src_name);
rgw_obj_key k = rgw_obj_key(src_obj_name);
s->src_object = s->bucket->get_object(k);
s->object = s->src_object->clone(); // needed to avoid trap at rgw_op.cc:5150
return 0;
}

View File

@ -5497,7 +5497,8 @@ int RGWCopyObj::verify_permission(optional_yield y)
rgw_placement_rule src_placement;
/* check source object permissions */
op_ret = read_obj_policy(this, driver, s, src_bucket->get_info(), src_bucket->get_attrs(), src_acl, &src_placement.storage_class,
op_ret = read_obj_policy(this, driver, s, src_bucket->get_info(),
src_bucket->get_attrs(), src_acl, &src_placement.storage_class,
src_policy, src_bucket.get(), s->src_object.get(), y);
if (op_ret < 0) {
return op_ret;

View File

@ -287,6 +287,7 @@ install(TARGETS ceph_test_librgw_file DESTINATION ${CMAKE_INSTALL_BINDIR})
add_dependencies(ceph_test_librgw_file
ceph_test_librgw_file_cd
ceph_test_librgw_file_gp
ceph_test_librgw_file_rename
ceph_test_librgw_file_nfsns
ceph_test_librgw_file_aw
ceph_test_librgw_file_marker)
@ -391,6 +392,20 @@ target_link_libraries(ceph_test_librgw_file_xattr
)
target_link_libraries(ceph_test_librgw_file_xattr spawn)
# ceph_test_librgw_file_rename (mv/rename tests)
add_executable(ceph_test_librgw_file_rename
librgw_file_rename.cc
)
target_link_libraries(ceph_test_librgw_file_rename
rgw
librados
ceph-common
${UNITTEST_LIBS}
${EXTRALIBS}
${ALLOC_LIBS}
)
install(TARGETS ceph_test_librgw_file_rename DESTINATION ${CMAKE_INSTALL_BINDIR})
# ceph_test_rgw_token
add_executable(ceph_test_rgw_token
test_rgw_token.cc

View File

@ -0,0 +1,302 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2015 Red Hat, Inc.
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#include <stdint.h>
#include <tuple>
#include <iostream>
#include "include/rados/librgw.h"
#include "include/rados/rgw_file.h"
#include "gtest/gtest.h"
#include "common/ceph_argparse.h"
#include "common/debug.h"
#define dout_subsys ceph_subsys_rgw
using namespace std;
namespace {
librgw_t rgw = nullptr;
string userid("testuser");
string access_key("");
string secret_key("");
struct rgw_fs *fs = nullptr;
uint32_t owner_uid = 867;
uint32_t owner_gid = 5309;
uint32_t create_mask = RGW_SETATTR_UID | RGW_SETATTR_GID | RGW_SETATTR_MODE;
bool do_create = false;
bool do_delete = false;
string bucket1_name = "wyndemere";
string bucket2_name = "galahad";
string obj_name1 = "tommy1";
string obj_name2 = "ricky1";
string obj_name3 = "zoot";
struct rgw_file_handle* bucket1_fh = nullptr;
struct rgw_file_handle* bucket2_fh = nullptr;
struct rgw_file_handle* object_fh = nullptr;
string subdir1_name = "meep";
struct rgw_file_handle* subdir1_fh;
string subdir2_name = "mork";
struct rgw_file_handle* subdir2_fh;
struct {
int argc;
char **argv;
} saved_args;
}
TEST(LibRGW, INIT) {
int ret = librgw_create(&rgw, saved_args.argc, saved_args.argv);
ASSERT_EQ(ret, 0);
ASSERT_NE(rgw, nullptr);
}
TEST(LibRGW, MOUNT) {
int ret = rgw_mount2(rgw, userid.c_str(), access_key.c_str(),
secret_key.c_str(), "/", &fs, RGW_MOUNT_FLAG_NONE);
ASSERT_EQ(ret, 0);
ASSERT_NE(fs, nullptr);
}
TEST(LibRGW, CREATE_BUCKETS) {
if (do_create) {
struct stat st;
int ret{0};
st.st_uid = owner_uid;
st.st_gid = owner_gid;
st.st_mode = 755;
ret = rgw_mkdir(fs, fs->root_fh, bucket1_name.c_str(), &st, create_mask,
&bucket1_fh, RGW_MKDIR_FLAG_NONE);
ASSERT_EQ(ret, 0);
ret = rgw_fh_rele(fs, bucket1_fh, 0 /* flags */);
ASSERT_EQ(ret, 0);
ret = rgw_mkdir(fs, fs->root_fh, bucket2_name.c_str(), &st, create_mask,
&bucket2_fh, RGW_MKDIR_FLAG_NONE);
ASSERT_EQ(ret, 0);
ret = rgw_fh_rele(fs, bucket2_fh, 0 /* flags */);
ASSERT_EQ(ret, 0);
}
}
TEST(LibRGW, LOOKUP_BUCKETS) {
int ret{0};
ret = rgw_lookup(fs, fs->root_fh, bucket1_name.c_str(), &bucket1_fh,
nullptr, 0, RGW_LOOKUP_FLAG_NONE);
ASSERT_EQ(ret, 0);
ret = rgw_lookup(fs, fs->root_fh, bucket2_name.c_str(), &bucket2_fh,
nullptr, 0, RGW_LOOKUP_FLAG_NONE);
ASSERT_EQ(ret, 0);
}
static inline
int make_object(struct rgw_file_handle* parent_fh, const string& name) {
int ret{0};
ret = rgw_lookup(fs, parent_fh, name.c_str(), &object_fh,
nullptr, 0, RGW_LOOKUP_FLAG_CREATE);
ret = rgw_open(fs, object_fh, 0 /* posix flags */, 0 /* flags */);
size_t nbytes;
string data = "hi mom";
ret = rgw_write(fs, object_fh, 0, data.length(), &nbytes,
(void*) data.c_str(), RGW_WRITE_FLAG_NONE);
/* commit write transaction */
ret = rgw_close(fs, object_fh, 0 /* flags */);
return ret;
}
TEST(LibRGW, TOPDIR_RENAME) {
/* rename a file directly residing at the bucket */
int ret{0};
ret = make_object(bucket1_fh, obj_name1);
ASSERT_EQ(ret, 0);
/* now move it */
ret = rgw_rename(fs,
bucket1_fh, obj_name1.c_str(),
bucket1_fh, obj_name2.c_str(),
0 /* flags */);
ASSERT_EQ(ret, 0);
/* now check the result */
struct rgw_file_handle* name2_fh;
ret = rgw_lookup(fs, bucket1_fh, obj_name2.c_str(), &name2_fh,
nullptr, 0, RGW_LOOKUP_FLAG_NONE);
ASSERT_EQ(ret, 0);
/* release file handle */
ret = rgw_fh_rele(fs, name2_fh, 0 /* flags */);
ASSERT_EQ(ret, 0);
}
TEST(LibRGW, SUBDIR_RENAME) {
int ret{0};
if (do_create) {
struct stat st;
st.st_uid = owner_uid;
st.st_gid = owner_gid;
st.st_mode = 755;
ret = rgw_mkdir(fs, bucket1_fh, subdir1_name.c_str(), &st, create_mask,
&subdir1_fh, RGW_MKDIR_FLAG_NONE);
ASSERT_EQ(ret, 0);
ret = make_object(subdir1_fh, obj_name1);
ASSERT_EQ(ret, 0);
} else {
ret = rgw_lookup(fs, bucket1_fh, subdir1_name.c_str(), &subdir1_fh,
nullptr, 0, RGW_LOOKUP_FLAG_NONE);
ASSERT_EQ(ret, 0);
}
/* now move it */
ret = rgw_rename(fs,
subdir1_fh, obj_name1.c_str(),
subdir1_fh, obj_name2.c_str(),
0 /* flags */);
ASSERT_EQ(ret, 0);
/* now check the result */
struct rgw_file_handle* name2_fh;
ret = rgw_lookup(fs, subdir1_fh, obj_name2.c_str(), &name2_fh,
nullptr, 0, RGW_LOOKUP_FLAG_NONE);
ASSERT_EQ(ret, 0);
/* release file handle */
ret = rgw_fh_rele(fs, name2_fh, 0 /* flags */);
ASSERT_EQ(ret, 0);
/* we'll re-use subdir1_fh */
}
TEST(LibRGW, CROSS_BUCKET_RENAME) {
/* rename a file across bucket boundaries */
int ret{0};
if (do_create) {
struct stat st;
st.st_uid = owner_uid;
st.st_gid = owner_gid;
st.st_mode = 755;
ret = rgw_mkdir(fs, bucket2_fh, subdir2_name.c_str(), &st, create_mask,
&subdir2_fh, RGW_MKDIR_FLAG_NONE); // galahad/mork
ASSERT_EQ(ret, 0);
ret = make_object(subdir1_fh, obj_name1); // wyndemere/meep/tommy1
ASSERT_EQ(ret, 0);
} else {
ret = rgw_lookup(fs, bucket2_fh, subdir2_name.c_str(), &subdir2_fh,
nullptr, 0, RGW_LOOKUP_FLAG_NONE);
ASSERT_EQ(ret, 0);
}
/* now move it -- subdir2 is directory mork in bucket galahad */
ret = rgw_rename(fs,
subdir1_fh, obj_name1.c_str(),
subdir2_fh, obj_name3.c_str(),
0 /* flags */);
ASSERT_EQ(ret, 0);
/* now check the result */
struct rgw_file_handle* name3_fh; // galahad/mork/zoot
ret = rgw_lookup(fs, subdir2_fh, obj_name3.c_str(), &name3_fh,
nullptr, 0, RGW_LOOKUP_FLAG_NONE);
ASSERT_EQ(ret, 0);
/* release file handle */
ret = rgw_fh_rele(fs, name3_fh, 0 /* flags */);
ASSERT_EQ(ret, 0);
}
TEST(LibRGW, CLEANUP) {
// do nothing
}
TEST(LibRGW, UMOUNT) {
if (! fs)
return;
int ret = rgw_umount(fs, RGW_UMOUNT_FLAG_NONE);
ASSERT_EQ(ret, 0);
}
TEST(LibRGW, SHUTDOWN) {
librgw_shutdown(rgw);
}
int main(int argc, char *argv[])
{
auto args = argv_to_vec(argc, argv);
env_to_vec(args);
char* v = getenv("AWS_ACCESS_KEY_ID");
if (v) {
access_key = v;
}
v = getenv("AWS_SECRET_ACCESS_KEY");
if (v) {
secret_key = v;
}
string val;
for (auto arg_iter = args.begin(); arg_iter != args.end();) {
if (ceph_argparse_witharg(args, arg_iter, &val, "--access",
(char*) nullptr)) {
access_key = val;
} else if (ceph_argparse_witharg(args, arg_iter, &val, "--secret",
(char*) nullptr)) {
secret_key = val;
} else if (ceph_argparse_witharg(args, arg_iter, &val, "--userid",
(char*) nullptr)) {
userid = val;
} else if (ceph_argparse_witharg(args, arg_iter, &val, "--uid",
(char*) nullptr)) {
owner_uid = std::stoi(val);
} else if (ceph_argparse_witharg(args, arg_iter, &val, "--gid",
(char*) nullptr)) {
owner_gid = std::stoi(val);
} else if (ceph_argparse_flag(args, arg_iter, "--create",
(char*) nullptr)) {
do_create = true;
} else if (ceph_argparse_flag(args, arg_iter, "--delete",
(char*) nullptr)) {
do_delete = true;
} else {
++arg_iter;
}
}
/* don't accidentally run as anonymous */
if ((access_key == "") ||
(secret_key == "")) {
std::cout << argv[0] << " no AWS credentials, exiting" << std::endl;
return EPERM;
}
saved_args.argc = argc;
saved_args.argv = argv;
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}