librgw: re-implement rgw_create

rgw_create hadn't been re-implemented for NFS namespace behavior,
where we expect it to create empty file objects.

It needs to do a full name conflict check with prefix-matching,
to catch conflicts within object names.

Signed-off-by: Matt Benjamin <mbenjamin@redhat.com>
This commit is contained in:
Matt Benjamin 2016-01-11 14:50:48 -05:00
parent ec924b6656
commit 117208c7d4
2 changed files with 62 additions and 23 deletions

View File

@ -643,17 +643,15 @@ static int valid_s3_bucket_name(const string& name, bool relaxed=false)
}
/*
generic create -- creates a regular file
generic create -- create an empty regular file
*/
int rgw_create(struct rgw_fs *rgw_fs,
struct rgw_file_handle *parent_fh,
const char *name, mode_t mode, struct stat *st,
struct rgw_file_handle **fh, uint32_t flags)
{
/* XXX a CREATE operation can be a precursor to the canonical
* OPEN, WRITE*, CLOSE transaction which writes or overwrites
* an object in S3 */
RGWLibFS *fs = static_cast<RGWLibFS*>(rgw_fs->fs_private);
CephContext* cct = static_cast<CephContext*>(rgw_fs->rgw);
RGWFileHandle* parent = get_rgwfh(parent_fh);
if ((! parent) ||
@ -665,27 +663,49 @@ int rgw_create(struct rgw_fs *rgw_fs,
using std::get;
LookupFHResult fhr = fs->lookup_fh(parent, name);
RGWFileHandle* rgw_fh = get<0>(fhr);
rgw_file_handle *lfh;
int rc = rgw_lookup(rgw_fs, parent_fh, name, &lfh,
RGW_LOOKUP_FLAG_NONE);
if (! rc) {
/* conflict! */
rc = rgw_fh_rele(rgw_fs, lfh, RGW_FH_RELE_FLAG_NONE);
return -EEXIST;
} else {
/* expand and check name */
std::string obj_name{parent->relative_object_name() + "/" + name};
if (! valid_s3_object_name(obj_name)) {
return -EINVAL;
} else {
/* create it */
buffer::list bl;
RGWPutObjRequest req(cct, fs->get_user(), parent->bucket_name(),
obj_name, bl);
rc = rgwlib.get_fe()->execute_req(&req);
int rc2 = req.get_ret();
if (! rgw_fh)
return -EINVAL;
/* mark if we created it */
if (get<1>(fhr) & RGWFileHandle::FLAG_CREATE) {
/* fill in stat data */
time_t now = time(0);
rgw_fh->set_times(now);
rgw_fh->open_for_create();
if ((rc == 0) &&
(rc2 == 0)) {
/* XXX atomicity */
LookupFHResult fhr = fs->lookup_fh(parent, name,
RGWFileHandle::FLAG_CREATE);
RGWFileHandle* rgw_fh = get<0>(fhr);
if (rgw_fh) {
if (get<1>(fhr) & RGWFileHandle::FLAG_CREATE) {
/* fill in stat data */
time_t now = time(0);
rgw_fh->set_times(now);
rgw_fh->open_for_create(); // XXX needed?
}
(void) rgw_fh->stat(st);
struct rgw_file_handle *rfh = rgw_fh->get_fh();
*fh = rfh;
} else
rc = -EIO;
}
}
}
(void) rgw_fh->stat(st);
struct rgw_file_handle *rfh = rgw_fh->get_fh();
*fh = rfh;
return 0;
return rc;
}
/*

View File

@ -267,7 +267,7 @@ TEST(LibRGW, SETUP_DIRS1) {
if (! sf.fh) {
if (do_create) {
/* make a new file object */
/* make a new file object (the hard way) */
rc = rgw_lookup(fs, sf.parent_fh, sf.name.c_str(), &sf.fh,
RGW_LOOKUP_FLAG_CREATE);
ASSERT_EQ(rc, 0);
@ -301,6 +301,25 @@ TEST(LibRGW, SETUP_DIRS1) {
} /* dirs1 top-level !exist */
}
TEST(LibRGW, RGW_CREATE_DIRS1) {
/* verify rgw_create (create [empty] file objects the easy way) */
if (do_dirs1) {
int rc;
struct stat st;
for (auto& dirs_rec : dirs_vec) {
/* create 1 more file in each sdir */
obj_rec& dir = get<0>(dirs_rec);
std::string sfname{"sfile_" + to_string(n_dirs1_objs)};
obj_rec sf{sfname, nullptr, dir.fh, nullptr};
rc = rgw_create(fs, sf.parent_fh, sf.name.c_str(), 644, &st, &sf.fh,
RGW_CREATE_FLAG_NONE);
ASSERT_EQ(rc, 0);
sf.sync();
}
n_dirs1_objs++;
}
}
TEST(LibRGW, BAD_DELETES_DIRS1) {
if (do_dirs1) {
int rc;