mirror of
https://github.com/ceph/ceph
synced 2025-01-29 14:34:40 +00:00
rgw: object write processors use tail placement rule
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
This commit is contained in:
parent
8cb6de178e
commit
cbfb05660e
@ -3834,6 +3834,7 @@ void RGWPostObj::execute()
|
||||
ldpp_dout(this, 15) << "supplied_md5=" << supplied_md5 << dendl;
|
||||
}
|
||||
|
||||
|
||||
rgw_obj obj(s->bucket, get_current_filename());
|
||||
if (s->bucket_info.versioning_enabled()) {
|
||||
store->gen_rand_obj_instance_name(&obj);
|
||||
@ -3842,6 +3843,7 @@ void RGWPostObj::execute()
|
||||
rgw::AioThrottle aio(s->cct->_conf->rgw_put_obj_min_window_size);
|
||||
using namespace rgw::putobj;
|
||||
AtomicObjectProcessor processor(&aio, store, s->bucket_info,
|
||||
nullptr,
|
||||
s->bucket_owner.get_id(),
|
||||
*static_cast<RGWObjectCtx*>(s->obj_ctx),
|
||||
obj, 0, s->req_id);
|
||||
@ -4707,6 +4709,7 @@ void RGWCopyObj::execute()
|
||||
src_obj,
|
||||
dest_bucket_info,
|
||||
src_bucket_info,
|
||||
nullptr, /* dest placement rule */
|
||||
&src_mtime,
|
||||
&mtime,
|
||||
mod_ptr,
|
||||
@ -6569,7 +6572,7 @@ int RGWBulkUploadOp::handle_file(const boost::string_ref path,
|
||||
rgw::AioThrottle aio(store->ctx()->_conf->rgw_put_obj_min_window_size);
|
||||
using namespace rgw::putobj;
|
||||
|
||||
AtomicObjectProcessor processor(&aio, store, binfo, bowner.get_id(),
|
||||
AtomicObjectProcessor processor(&aio, store, binfo, nullptr, bowner.get_id(),
|
||||
obj_ctx, obj, 0, s->req_id);
|
||||
|
||||
op_ret = processor.prepare();
|
||||
|
@ -211,6 +211,7 @@ int AtomicObjectProcessor::prepare()
|
||||
|
||||
r = manifest_gen.create_begin(store->ctx(), &manifest,
|
||||
bucket_info.placement_rule,
|
||||
ptail_placement_rule,
|
||||
head_obj.bucket, head_obj);
|
||||
if (r < 0) {
|
||||
return r;
|
||||
@ -330,6 +331,7 @@ int MultipartObjectProcessor::prepare_head()
|
||||
{
|
||||
int r = manifest_gen.create_begin(store->ctx(), &manifest,
|
||||
bucket_info.placement_rule,
|
||||
ptail_placement_rule,
|
||||
target_obj.bucket, target_obj);
|
||||
if (r < 0) {
|
||||
return r;
|
||||
|
@ -114,6 +114,7 @@ class ManifestObjectProcessor : public HeadObjectProcessor,
|
||||
protected:
|
||||
RGWRados *const store;
|
||||
const RGWBucketInfo& bucket_info;
|
||||
const string *ptail_placement_rule;
|
||||
const rgw_user& owner;
|
||||
RGWObjectCtx& obj_ctx;
|
||||
rgw_obj head_obj;
|
||||
@ -130,10 +131,12 @@ class ManifestObjectProcessor : public HeadObjectProcessor,
|
||||
public:
|
||||
ManifestObjectProcessor(Aio *aio, RGWRados *store,
|
||||
const RGWBucketInfo& bucket_info,
|
||||
const string *ptail_placement_rule,
|
||||
const rgw_user& owner, RGWObjectCtx& obj_ctx,
|
||||
const rgw_obj& head_obj)
|
||||
: HeadObjectProcessor(0),
|
||||
store(store), bucket_info(bucket_info), owner(owner),
|
||||
store(store), bucket_info(bucket_info),
|
||||
ptail_placement_rule(ptail_placement_rule), owner(owner),
|
||||
obj_ctx(obj_ctx), head_obj(head_obj),
|
||||
writer(aio, store, bucket_info, obj_ctx, head_obj),
|
||||
chunk(&writer, 0), stripe(&chunk, this, 0)
|
||||
@ -151,11 +154,14 @@ class AtomicObjectProcessor : public ManifestObjectProcessor {
|
||||
int process_first_chunk(bufferlist&& data, DataProcessor **processor) override;
|
||||
public:
|
||||
AtomicObjectProcessor(Aio *aio, RGWRados *store,
|
||||
const RGWBucketInfo& bucket_info, const rgw_user& owner,
|
||||
const RGWBucketInfo& bucket_info,
|
||||
const string *ptail_placement_rule,
|
||||
const rgw_user& owner,
|
||||
RGWObjectCtx& obj_ctx, const rgw_obj& head_obj,
|
||||
std::optional<uint64_t> olh_epoch,
|
||||
const std::string& unique_tag)
|
||||
: ManifestObjectProcessor(aio, store, bucket_info, owner, obj_ctx, head_obj),
|
||||
: ManifestObjectProcessor(aio, store, bucket_info, ptail_placement_rule,
|
||||
owner, obj_ctx, head_obj),
|
||||
olh_epoch(olh_epoch), unique_tag(unique_tag)
|
||||
{}
|
||||
|
||||
@ -190,11 +196,13 @@ class MultipartObjectProcessor : public ManifestObjectProcessor {
|
||||
public:
|
||||
MultipartObjectProcessor(Aio *aio, RGWRados *store,
|
||||
const RGWBucketInfo& bucket_info,
|
||||
const string *ptail_placement_rule,
|
||||
const rgw_user& owner, RGWObjectCtx& obj_ctx,
|
||||
const rgw_obj& head_obj,
|
||||
const std::string& upload_id, uint64_t part_num,
|
||||
const std::string& part_num_str)
|
||||
: ManifestObjectProcessor(aio, store, bucket_info, owner, obj_ctx, head_obj),
|
||||
: ManifestObjectProcessor(aio, store, bucket_info, ptail_placement_rule,
|
||||
owner, obj_ctx, head_obj),
|
||||
target_obj(head_obj), upload_id(upload_id),
|
||||
part_num(part_num), part_num_str(part_num_str),
|
||||
mp(head_obj.key.name, upload_id)
|
||||
|
@ -307,12 +307,19 @@ void RGWObjManifest::obj_iterator::operator++()
|
||||
update_location();
|
||||
}
|
||||
|
||||
int RGWObjManifest::generator::create_begin(CephContext *cct, RGWObjManifest *_m, const string& placement_rule, const rgw_bucket& _b, const rgw_obj& _obj)
|
||||
int RGWObjManifest::generator::create_begin(CephContext *cct, RGWObjManifest *_m,
|
||||
const string& head_placement_rule,
|
||||
const string *tail_placement_rule,
|
||||
const rgw_bucket& _b, const rgw_obj& _obj)
|
||||
{
|
||||
manifest = _m;
|
||||
|
||||
manifest->set_tail_placement(placement_rule, _b);
|
||||
manifest->set_head(placement_rule, _obj, 0);
|
||||
if (!tail_placement_rule) {
|
||||
tail_placement_rule = &head_placement_rule;
|
||||
}
|
||||
|
||||
manifest->set_tail_placement(*tail_placement_rule, _b);
|
||||
manifest->set_head(head_placement_rule, _obj, 0);
|
||||
last_ofs = 0;
|
||||
|
||||
if (manifest->get_prefix().empty()) {
|
||||
@ -3377,6 +3384,7 @@ int RGWRados::swift_versioning_copy(RGWObjectCtx& obj_ctx,
|
||||
obj,
|
||||
dest_bucket_info,
|
||||
bucket_info,
|
||||
nullptr, /* const string *tail_rule */
|
||||
NULL, /* time_t *src_mtime */
|
||||
NULL, /* time_t *mtime */
|
||||
NULL, /* const time_t *mod_ptr */
|
||||
@ -3468,6 +3476,7 @@ int RGWRados::swift_versioning_restore(RGWSysObjectCtx& sysobj_ctx,
|
||||
archive_obj, /* src obj */
|
||||
bucket_info, /* dest bucket info */
|
||||
archive_binfo, /* src bucket info */
|
||||
nullptr, /* const string *ptail_rule */
|
||||
nullptr, /* time_t *src_mtime */
|
||||
nullptr, /* time_t *mtime */
|
||||
nullptr, /* const time_t *mod_ptr */
|
||||
@ -3965,7 +3974,7 @@ int RGWRados::rewrite_obj(RGWBucketInfo& dest_bucket_info, const rgw_obj& obj)
|
||||
attrset.erase(RGW_ATTR_ID_TAG);
|
||||
attrset.erase(RGW_ATTR_TAIL_TAG);
|
||||
|
||||
return copy_obj_data(rctx, dest_bucket_info, read_op, obj_size - 1, obj, NULL, mtime, attrset,
|
||||
return copy_obj_data(rctx, dest_bucket_info, nullptr, read_op, obj_size - 1, obj, NULL, mtime, attrset,
|
||||
0, real_time(), NULL);
|
||||
}
|
||||
|
||||
@ -4215,7 +4224,9 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
|
||||
|
||||
rgw::AioThrottle aio(cct->_conf->rgw_put_obj_min_window_size);
|
||||
using namespace rgw::putobj;
|
||||
AtomicObjectProcessor processor(&aio, this, dest_bucket_info, user_id,
|
||||
string *ptail_rule{nullptr};
|
||||
#warning FIXME ptail_rule
|
||||
AtomicObjectProcessor processor(&aio, this, dest_bucket_info, ptail_rule, user_id,
|
||||
obj_ctx, dest_obj, olh_epoch, tag);
|
||||
int ret = processor.prepare();
|
||||
if (ret < 0) {
|
||||
@ -4470,6 +4481,7 @@ int RGWRados::copy_obj(RGWObjectCtx& obj_ctx,
|
||||
rgw_obj& src_obj,
|
||||
RGWBucketInfo& dest_bucket_info,
|
||||
RGWBucketInfo& src_bucket_info,
|
||||
const string *ptail_rule,
|
||||
real_time *src_mtime,
|
||||
real_time *mtime,
|
||||
const real_time *mod_ptr,
|
||||
@ -4582,11 +4594,27 @@ int RGWRados::copy_obj(RGWObjectCtx& obj_ctx,
|
||||
|
||||
rgw_pool src_pool;
|
||||
rgw_pool dest_pool;
|
||||
if (!get_obj_data_pool(src_bucket_info.placement_rule, src_obj, &src_pool)) {
|
||||
|
||||
const string *src_rule{nullptr};
|
||||
|
||||
if (astate->has_manifest) {
|
||||
src_rule = &astate->manifest.get_tail_placement().placement_rule;
|
||||
}
|
||||
|
||||
if (!src_rule || src_rule->empty()) {
|
||||
src_rule = &src_bucket_info.placement_rule;
|
||||
}
|
||||
|
||||
if (!ptail_rule) {
|
||||
ptail_rule = &dest_bucket_info.placement_rule;
|
||||
}
|
||||
|
||||
if (!get_obj_data_pool(*src_rule, src_obj, &src_pool)) {
|
||||
ldout(cct, 0) << "ERROR: failed to locate data pool for " << src_obj << dendl;
|
||||
return -EIO;
|
||||
}
|
||||
if (!get_obj_data_pool(dest_bucket_info.placement_rule, dest_obj, &dest_pool)) {
|
||||
|
||||
if (!get_obj_data_pool(*ptail_rule, dest_obj, &dest_pool)) {
|
||||
ldout(cct, 0) << "ERROR: failed to locate data pool for " << dest_obj << dendl;
|
||||
return -EIO;
|
||||
}
|
||||
@ -4619,7 +4647,7 @@ int RGWRados::copy_obj(RGWObjectCtx& obj_ctx,
|
||||
|
||||
if (copy_data) { /* refcounting tail wouldn't work here, just copy the data */
|
||||
attrs.erase(RGW_ATTR_TAIL_TAG);
|
||||
return copy_obj_data(obj_ctx, dest_bucket_info, read_op, obj_size - 1, dest_obj,
|
||||
return copy_obj_data(obj_ctx, dest_bucket_info, ptail_rule, read_op, obj_size - 1, dest_obj,
|
||||
mtime, real_time(), attrs, olh_epoch, delete_at, petag);
|
||||
}
|
||||
|
||||
@ -4736,6 +4764,7 @@ done_ret:
|
||||
|
||||
int RGWRados::copy_obj_data(RGWObjectCtx& obj_ctx,
|
||||
RGWBucketInfo& dest_bucket_info,
|
||||
const string *ptail_rule,
|
||||
RGWRados::Object::Read& read_op, off_t end,
|
||||
const rgw_obj& dest_obj,
|
||||
real_time *mtime,
|
||||
@ -4750,7 +4779,7 @@ int RGWRados::copy_obj_data(RGWObjectCtx& obj_ctx,
|
||||
|
||||
rgw::AioThrottle aio(cct->_conf->rgw_put_obj_min_window_size);
|
||||
using namespace rgw::putobj;
|
||||
AtomicObjectProcessor processor(&aio, this, dest_bucket_info,
|
||||
AtomicObjectProcessor processor(&aio, this, dest_bucket_info, ptail_rule,
|
||||
dest_bucket_info.owner, obj_ctx,
|
||||
dest_obj, olh_epoch, tag);
|
||||
int ret = processor.prepare();
|
||||
|
@ -807,7 +807,11 @@ public:
|
||||
public:
|
||||
generator() : manifest(NULL), last_ofs(0), cur_part_ofs(0), cur_part_id(0),
|
||||
cur_stripe(0), cur_stripe_size(0) {}
|
||||
int create_begin(CephContext *cct, RGWObjManifest *manifest, const string& placement_rule, const rgw_bucket& bucket, const rgw_obj& obj);
|
||||
int create_begin(CephContext *cct, RGWObjManifest *manifest,
|
||||
const string& head_placement_rule,
|
||||
const string *tail_placement_rule,
|
||||
const rgw_bucket& bucket,
|
||||
const rgw_obj& obj);
|
||||
|
||||
int create_next(uint64_t ofs);
|
||||
|
||||
@ -1934,6 +1938,7 @@ public:
|
||||
rgw_obj& src_obj,
|
||||
RGWBucketInfo& dest_bucket_info,
|
||||
RGWBucketInfo& src_bucket_info,
|
||||
const string *ptail_rule,
|
||||
ceph::real_time *src_mtime,
|
||||
ceph::real_time *mtime,
|
||||
const ceph::real_time *mod_ptr,
|
||||
@ -1955,6 +1960,7 @@ public:
|
||||
|
||||
int copy_obj_data(RGWObjectCtx& obj_ctx,
|
||||
RGWBucketInfo& dest_bucket_info,
|
||||
const string *ptail_rule,
|
||||
RGWRados::Object::Read& read_op, off_t end,
|
||||
const rgw_obj& dest_obj,
|
||||
ceph::real_time *mtime,
|
||||
|
Loading…
Reference in New Issue
Block a user