rgw: implement the container creation in BulkUpload of Swift API.

Signed-off-by: Radoslaw Zarzynski <rzarzynski@mirantis.com>
This commit is contained in:
Radoslaw Zarzynski 2016-11-24 13:10:56 +01:00
parent 6c1bb7a10e
commit e9afe38665
2 changed files with 221 additions and 0 deletions

View File

@ -5528,6 +5528,220 @@ void RGWBulkUploadOp::pre_exec()
rgw_bucket_object_pre_exec(s);
}
boost::optional<std::pair<std::string, rgw_obj_key>>
RGWBulkUploadOp::parse_path(const boost::string_ref& path)
{
/* We need to skip all slashes at the beginning in order to preserve
* compliance with Swift. */
const size_t start_pos = path.find_first_not_of('/');
if (boost::string_ref::npos != start_pos) {
/* Seperator is the first slash after the leading ones. */
const size_t sep_pos = path.substr(start_pos).find('/');
if (boost::string_ref::npos != sep_pos) {
const auto bucket_name = path.substr(start_pos, sep_pos - start_pos);
const auto obj_name = path.substr(sep_pos + 1);
return std::make_pair(bucket_name.to_string(),
rgw_obj_key(obj_name.to_string()));
} else {
/* It's guaranteed here that bucket name is at least one character
* long and is different than slash. */
return std::make_pair(path.substr(start_pos).to_string(),
rgw_obj_key());
}
}
return boost::none;
}
int RGWBulkUploadOp::handle_dir_verify_permission()
{
if (s->user->max_buckets > 0) {
RGWUserBuckets buckets;
std::string marker;
bool is_truncated;
op_ret = rgw_read_user_buckets(store, s->user->user_id, buckets,
marker, std::string(), s->user->max_buckets,
false, &is_truncated);
if (op_ret < 0) {
return op_ret;
}
if (buckets.count() >= static_cast<size_t>(s->user->max_buckets)) {
return -ERR_TOO_MANY_BUCKETS;
}
}
return 0;
}
int RGWBulkUploadOp::handle_dir(const boost::string_ref path)
{
ldout(s->cct, 20) << "bulk upload: got directory=" << path << dendl;
op_ret = handle_dir_verify_permission();
if (op_ret < 0) {
return op_ret;
}
std::string bucket_name;
rgw_obj_key object_junk;
std::tie(bucket_name, object_junk) = *parse_path(path);
rgw_raw_obj obj(store->get_zone_params().domain_root,
rgw_make_bucket_entry_name(s->bucket_tenant, bucket_name));
/* Swift API doesn't support location constraint. We're just checking here
* whether creation is taking place in the master zone or not. */
if (! store->get_zonegroup().is_master) {
ldout(s->cct, 0) << "creating bucket in a non-master zone." << dendl;
op_ret = -EINVAL;
return op_ret;
}
/* we need to make sure we read bucket info, it's not read before for this
* specific request */
RGWBucketInfo binfo;
std::map<std::string, ceph::bufferlist> battrs;
RGWObjectCtx obj_ctx(store); // = *static_cast<RGWObjectCtx *>(s->obj_ctx);
op_ret = store->get_bucket_info(obj_ctx, s->bucket_tenant, bucket_name,
binfo, NULL, &battrs);
if (op_ret < 0 && op_ret != -ENOENT) {
return op_ret;
}
const bool bucket_exists = (op_ret != -ENOENT);
if (bucket_exists) {
RGWAccessControlPolicy old_policy(s->cct);
int r = get_bucket_policy_from_attr(s->cct, store, binfo,
battrs, &old_policy);
if (r >= 0) {
if (old_policy.get_owner().get_id().compare(s->user->user_id) != 0) {
op_ret = -EEXIST;
return op_ret;
}
}
}
RGWBucketInfo master_info;
rgw_bucket *pmaster_bucket = nullptr;
real_time creation_time;
obj_version objv, ep_objv, *pobjv = nullptr;
if (! store->is_meta_master()) {
JSONParser jp;
ceph::bufferlist in_data;
op_ret = forward_request_to_master(s, nullptr, store, in_data, &jp);
if (op_ret < 0) {
return op_ret;
}
JSONDecoder::decode_json("entry_point_object_ver", ep_objv, &jp);
JSONDecoder::decode_json("object_ver", objv, &jp);
JSONDecoder::decode_json("bucket_info", master_info, &jp);
ldout(s->cct, 20) << "parsed: objv.tag=" << objv.tag << " objv.ver="
<< objv.ver << dendl;
ldout(s->cct, 20) << "got creation_time="<< master_info.creation_time
<< dendl;
pmaster_bucket= &master_info.bucket;
creation_time = master_info.creation_time;
pobjv = &objv;
} else {
pmaster_bucket = nullptr;
}
std::string placement_rule;
if (bucket_exists) {
std::string selected_placement_rule;
rgw_bucket bucket;
bucket.tenant = s->bucket_tenant;
bucket.name = s->bucket_name;
op_ret = store->select_bucket_placement(*(s->user),
store->get_zonegroup().get_id(),
placement_rule,
bucket,
&selected_placement_rule,
nullptr);
if (selected_placement_rule != binfo.placement_rule) {
op_ret = -EEXIST;
ldout(s->cct, 20) << "bulk upload: non-coherent placement rule" << dendl;
return op_ret;
}
}
/* Create metadata: ACLs. */
std::map<std::string, ceph::bufferlist> attrs;
RGWAccessControlPolicy policy;
policy.create_default(s->user->user_id, s->user->display_name);
ceph::bufferlist aclbl;
policy.encode(aclbl);
attrs.emplace(RGW_ATTR_ACL, std::move(aclbl));
RGWQuotaInfo quota_info;
const RGWQuotaInfo * pquota_info = nullptr;
rgw_bucket bucket;
bucket.tenant = s->bucket_tenant; /* ignored if bucket exists */
bucket.name = bucket_name;
RGWBucketInfo out_info;
op_ret = store->create_bucket(*(s->user),
bucket,
store->get_zonegroup().get_id(),
placement_rule, binfo.swift_ver_location,
pquota_info, attrs,
out_info, pobjv, &ep_objv, creation_time,
pmaster_bucket, true);
/* continue if EEXIST and create_bucket will fail below. this way we can
* recover from a partial create by retrying it. */
ldout(s->cct, 20) << "rgw_create_bucket returned ret=" << op_ret
<< ", bucket=" << bucket << dendl;
if (op_ret && op_ret != -EEXIST) {
return op_ret;
}
const bool existed = (op_ret == -EEXIST);
if (existed) {
/* bucket already existed, might have raced with another bucket creation, or
* might be partial bucket creation that never completed. Read existing bucket
* info, verify that the reported bucket owner is the current user.
* If all is ok then update the user's list of buckets.
* Otherwise inform client about a name conflict.
*/
if (out_info.owner.compare(s->user->user_id) != 0) {
op_ret = -EEXIST;
ldout(s->cct, 20) << "bulk upload: conflicting bucket name" << dendl;
return op_ret;
}
bucket = out_info.bucket;
}
op_ret = rgw_link_bucket(store, s->user->user_id, bucket,
out_info.creation_time, false);
if (op_ret && !existed && op_ret != -EEXIST) {
/* if it exists (or previously existed), don't remove it! */
op_ret = rgw_unlink_bucket(store, s->user->user_id,
bucket.tenant, bucket.name);
if (op_ret < 0) {
ldout(s->cct, 0) << "bulk upload: WARNING: failed to unlink bucket: ret="
<< op_ret << dendl;
}
} else if (op_ret == -EEXIST || (op_ret == 0 && existed)) {
ldout(s->cct, 20) << "bulk upload: containers already exists"
<< dendl;
op_ret = -ERR_BUCKET_EXISTS;
}
return op_ret;
}
void RGWBulkUploadOp::execute()
{
ceph::bufferlist buffer(64 * 1024);
@ -5564,6 +5778,8 @@ void RGWBulkUploadOp::execute()
}
case rgw::tar::FileType::DIRECTORY: {
ldout(s->cct, 2) << "bulk upload: handling regular directory" << dendl;
op_ret = handle_dir(header->get_filename());
break;
}
default: {

View File

@ -395,6 +395,11 @@ protected:
virtual std::unique_ptr<StreamGetter> create_stream() = 0;
virtual void send_response() = 0;
boost::optional<std::pair<std::string, rgw_obj_key>>
parse_path(const boost::string_ref& path);
int handle_dir_verify_permission();
int handle_dir(boost::string_ref path);
public:
int verify_permission() override;
void pre_exec() override;