Zipper - Assorted cleanups

- Move cluste stat into public header, allowing more zipper cleanup
- Swift versioning
- Implement a MPSerializer for Zipper.
- Add Lifecycle APIs to Zipper.

Signed-off-by: Daniel Gryniewicz <dang@redhat.com>
This commit is contained in:
Daniel Gryniewicz 2020-09-15 07:59:39 -04:00
parent 0905425cf1
commit c590759da0
13 changed files with 578 additions and 276 deletions

View File

@ -7224,7 +7224,7 @@ next:
if (opt_cmd == OPT::LC_LIST) {
formatter->open_array_section("lifecycle_list");
vector<cls_rgw_lc_entry> bucket_lc_map;
vector<rgw::sal::Lifecycle::LCEntry> bucket_lc_map;
string marker;
int index{0};
#define MAX_LC_LIST_ENTRIES 100

View File

@ -1819,7 +1819,8 @@ static int fix_single_bucket_lc(rgw::sal::RGWRadosStore *store,
return ret;
}
return rgw::lc::fix_lc_shard_entry(store, bucket_info, bucket_attrs);
return rgw::lc::fix_lc_shard_entry(store, store->get_rgwlc()->get_lc(), bucket_info,
bucket_attrs);
}
static void format_lc_status(Formatter* formatter,

View File

@ -18,7 +18,6 @@
#include "common/containers.h"
#include <common/errno.h>
#include "include/random.h"
#include "cls/rgw/cls_rgw_client.h"
#include "cls/lock/cls_lock_client.h"
#include "rgw_perf_counters.h"
#include "rgw_common.h"
@ -243,6 +242,7 @@ void *RGWLC::LCWorker::entry() {
void RGWLC::initialize(CephContext *_cct, rgw::sal::RGWRadosStore *_store) {
cct = _cct;
store = _store;
sal_lc = std::move(store->get_lifecycle());
max_objs = cct->_conf->rgw_lc_max_objs;
if (max_objs > HASH_PRIME)
max_objs = HASH_PRIME;
@ -291,7 +291,7 @@ bool RGWLC::if_already_run_today(time_t start_date)
return false;
}
static inline std::ostream& operator<<(std::ostream &os, cls_rgw_lc_entry& ent) {
static inline std::ostream& operator<<(std::ostream &os, rgw::sal::Lifecycle::LCEntry& ent) {
os << "<ent: bucket=";
os << ent.bucket;
os << "; start_time=";
@ -304,7 +304,7 @@ static inline std::ostream& operator<<(std::ostream &os, cls_rgw_lc_entry& ent)
int RGWLC::bucket_lc_prepare(int index, LCWorker* worker)
{
vector<cls_rgw_lc_entry> entries;
vector<rgw::sal::Lifecycle::LCEntry> entries;
string marker;
dout(5) << "RGWLC::bucket_lc_prepare(): PREPARE "
@ -313,16 +313,14 @@ int RGWLC::bucket_lc_prepare(int index, LCWorker* worker)
#define MAX_LC_LIST_ENTRIES 100
do {
int ret = cls_rgw_lc_list(store->getRados()->lc_pool_ctx, obj_names[index],
marker, MAX_LC_LIST_ENTRIES, entries);
int ret = sal_lc->list_entries(obj_names[index], marker, MAX_LC_LIST_ENTRIES, entries);
if (ret < 0)
return ret;
for (auto& entry : entries) {
entry.start_time = ceph_clock_now();
entry.status = lc_uninitial; // lc_uninitial? really?
ret = cls_rgw_lc_set_entry(store->getRados()->lc_pool_ctx,
obj_names[index], entry);
ret = sal_lc->set_entry(obj_names[index], entry);
if (ret < 0) {
ldpp_dout(this, 0)
<< "RGWLC::bucket_lc_prepare() failed to set entry on "
@ -370,17 +368,13 @@ static bool obj_has_expired(CephContext *cct, ceph::real_time mtime, int days,
return (timediff >= cmp);
}
static bool pass_object_lock_check(RGWRados *store, RGWBucketInfo& bucket_info,
rgw_obj& obj, RGWObjectCtx& ctx)
static bool pass_object_lock_check(rgw::sal::RGWStore* store, rgw::sal::RGWObject* obj, RGWObjectCtx& ctx)
{
if (!bucket_info.obj_lock_enabled()) {
if (!obj->get_bucket()->get_info().obj_lock_enabled()) {
return true;
}
RGWRados::Object op_target(store, bucket_info, ctx, obj);
RGWRados::Object::Read read_op(&op_target);
map<string, bufferlist> attrs;
read_op.params.attrs = &attrs;
int ret = read_op.prepare(null_yield);
std::unique_ptr<rgw::sal::RGWObject::ReadOp> read_op = obj->get_read_op(&ctx);
int ret = read_op->prepare(null_yield);
if (ret < 0) {
if (ret == -ENOENT) {
return true;
@ -388,8 +382,8 @@ static bool pass_object_lock_check(RGWRados *store, RGWBucketInfo& bucket_info,
return false;
}
} else {
auto iter = attrs.find(RGW_ATTR_OBJECT_RETENTION);
if (iter != attrs.end()) {
auto iter = obj->get_attrs().find(RGW_ATTR_OBJECT_RETENTION);
if (iter != obj->get_attrs().end()) {
RGWObjectRetention retention;
try {
decode(retention, iter->second);
@ -403,8 +397,8 @@ static bool pass_object_lock_check(RGWRados *store, RGWBucketInfo& bucket_info,
return false;
}
}
iter = attrs.find(RGW_ATTR_OBJECT_LEGAL_HOLD);
if (iter != attrs.end()) {
iter = obj->get_attrs().find(RGW_ATTR_OBJECT_LEGAL_HOLD);
if (iter != obj->get_attrs().end()) {
RGWObjectLegalHold obj_legal_hold;
try {
decode(obj_legal_hold, iter->second);
@ -422,30 +416,26 @@ static bool pass_object_lock_check(RGWRados *store, RGWBucketInfo& bucket_info,
}
class LCObjsLister {
rgw::sal::RGWRadosStore *store;
RGWBucketInfo& bucket_info;
RGWRados::Bucket target;
RGWRados::Bucket::List list_op;
bool is_truncated{false};
rgw_obj_key next_marker;
rgw::sal::RGWStore *store;
rgw::sal::RGWBucket* bucket;
rgw::sal::RGWBucket::ListParams list_params;
rgw::sal::RGWBucket::ListResults list_results;
string prefix;
vector<rgw_bucket_dir_entry> objs;
vector<rgw_bucket_dir_entry>::iterator obj_iter;
rgw_bucket_dir_entry pre_obj;
int64_t delay_ms;
public:
LCObjsLister(rgw::sal::RGWRadosStore *_store, RGWBucketInfo& _bucket_info) :
store(_store), bucket_info(_bucket_info),
target(store->getRados(), bucket_info), list_op(&target) {
list_op.params.list_versions = bucket_info.versioned();
list_op.params.allow_unordered = true;
LCObjsLister(rgw::sal::RGWStore *_store, rgw::sal::RGWBucket* _bucket) :
store(_store), bucket(_bucket) {
list_params.list_versions = bucket->versioned();
list_params.allow_unordered = true;
delay_ms = store->ctx()->_conf.get_val<int64_t>("rgw_lc_thread_delay");
}
void set_prefix(const string& p) {
prefix = p;
list_op.params.prefix = prefix;
list_params.prefix = prefix;
}
int init() {
@ -453,13 +443,12 @@ public:
}
int fetch() {
int ret = list_op.list_objects(
1000, &objs, NULL, &is_truncated, null_yield);
int ret = bucket->list(list_params, 1000, list_results, null_yield);
if (ret < 0) {
return ret;
}
obj_iter = objs.begin();
obj_iter = list_results.objs.begin();
return 0;
}
@ -471,13 +460,13 @@ public:
bool get_obj(rgw_bucket_dir_entry **obj,
std::function<void(void)> fetch_barrier
= []() { /* nada */}) {
if (obj_iter == objs.end()) {
if (!is_truncated) {
if (obj_iter == list_results.objs.end()) {
if (!list_results.is_truncated) {
delay();
return false;
} else {
fetch_barrier();
list_op.params.marker = pre_obj.key;
list_params.marker = pre_obj.key;
int ret = fetch();
if (ret < 0) {
ldout(store->ctx(), 0) << "ERROR: list_op returned ret=" << ret
@ -489,7 +478,7 @@ public:
}
/* returning address of entry in objs */
*obj = &(*obj_iter);
return obj_iter != objs.end();
return obj_iter != list_results.objs.end();
}
rgw_bucket_dir_entry get_prev_obj() {
@ -502,8 +491,8 @@ public:
}
boost::optional<std::string> next_key_name() {
if (obj_iter == objs.end() ||
(obj_iter + 1) == objs.end()) {
if (obj_iter == list_results.objs.end() ||
(obj_iter + 1) == list_results.objs.end()) {
/* this should have been called after get_obj() was called, so this should
* only happen if is_truncated is false */
return boost::none;
@ -521,12 +510,12 @@ struct op_env {
lc_op op;
rgw::sal::RGWRadosStore *store;
LCWorker* worker;
RGWBucketInfo& bucket_info;
rgw::sal::RGWBucket* bucket;
LCObjsLister& ol;
op_env(lc_op& _op, rgw::sal::RGWRadosStore *_store, LCWorker* _worker,
RGWBucketInfo& _bucket_info, LCObjsLister& _ol)
: op(_op), store(_store), worker(_worker), bucket_info(_bucket_info),
rgw::sal::RGWBucket* _bucket, LCObjsLister& _ol)
: op(_op), store(_store), worker(_worker), bucket(_bucket),
ol(_ol) {}
}; /* op_env */
@ -541,11 +530,11 @@ struct lc_op_ctx {
ceph::real_time effective_mtime;
rgw::sal::RGWRadosStore *store;
RGWBucketInfo& bucket_info;
rgw::sal::RGWBucket* bucket;
lc_op& op; // ok--refers to expanded env.op
LCObjsLister& ol;
rgw_obj obj;
std::unique_ptr<rgw::sal::RGWObject> obj;
RGWObjectCtx rctx;
const DoutPrefixProvider *dpp;
WorkQ* wq;
@ -556,9 +545,11 @@ struct lc_op_ctx {
const DoutPrefixProvider *dpp, WorkQ* wq)
: cct(env.store->ctx()), env(env), o(o), next_key_name(next_key_name),
effective_mtime(effective_mtime),
store(env.store), bucket_info(env.bucket_info), op(env.op), ol(env.ol),
obj(env.bucket_info.bucket, o.key), rctx(env.store), dpp(dpp), wq(wq)
{}
store(env.store), bucket(env.bucket), op(env.op), ol(env.ol),
rctx(env.store), dpp(dpp), wq(wq)
{
obj = bucket->get_object(o.key);
}
bool next_has_same_name(const std::string& key_name) {
return (next_key_name && key_name.compare(
@ -570,10 +561,12 @@ struct lc_op_ctx {
static int remove_expired_obj(lc_op_ctx& oc, bool remove_indeed)
{
auto& store = oc.store;
auto& bucket_info = oc.bucket_info;
auto& bucket_info = oc.bucket->get_info();
auto& o = oc.o;
auto obj_key = o.key;
auto& meta = o.meta;
int ret;
std::string version_id;
if (!remove_indeed) {
obj_key.instance.clear();
@ -581,20 +574,24 @@ static int remove_expired_obj(lc_op_ctx& oc, bool remove_indeed)
obj_key.instance = "null";
}
rgw_obj obj(bucket_info.bucket, obj_key);
std::unique_ptr<rgw::sal::RGWBucket> bucket;
std::unique_ptr<rgw::sal::RGWObject> obj;
ret = store->get_bucket(nullptr, bucket_info, &bucket);
if (ret < 0) {
return ret;
}
obj = bucket->get_object(obj_key);
ACLOwner obj_owner;
obj_owner.set_id(rgw_user {meta.owner});
obj_owner.set_name(meta.owner_display_name);
ACLOwner bucket_owner;
bucket_owner.set_id(bucket_info.owner);
RGWRados::Object del_target(store->getRados(), bucket_info, oc.rctx, obj);
RGWRados::Object::Delete del_op(&del_target);
del_op.params.bucket_owner = bucket_info.owner;
del_op.params.versioning_status = bucket_info.versioning_status();
del_op.params.obj_owner = obj_owner;
del_op.params.unmod_since = meta.mtime;
return del_op.delete_obj(null_yield);
return obj->delete_object(&oc.rctx, obj_owner, bucket_owner, meta.mtime, false, 0,
version_id, null_yield);
} /* remove_expired_obj */
class LCOpAction {
@ -822,24 +819,23 @@ static inline bool worker_should_stop(time_t stop_at, bool once)
return !once && stop_at < time(nullptr);
}
int RGWLC::handle_multipart_expiration(
RGWRados::Bucket *target, const multimap<string, lc_op>& prefix_map,
LCWorker* worker, time_t stop_at, bool once)
int RGWLC::handle_multipart_expiration(rgw::sal::RGWBucket* target,
const multimap<string, lc_op>& prefix_map,
LCWorker* worker, time_t stop_at, bool once)
{
MultipartMetaFilter mp_filter;
vector<rgw_bucket_dir_entry> objs;
bool is_truncated;
int ret;
RGWBucketInfo& bucket_info = target->get_bucket_info();
RGWRados::Bucket::List list_op(target);
rgw::sal::RGWBucket::ListParams params;
rgw::sal::RGWBucket::ListResults results;
auto delay_ms = cct->_conf.get_val<int64_t>("rgw_lc_thread_delay");
list_op.params.list_versions = false;
params.list_versions = false;
/* lifecycle processing does not depend on total order, so can
* take advantage of unordered listing optimizations--such as
* operating on one shard at a time */
list_op.params.allow_unordered = true;
list_op.params.ns = RGW_OBJ_NS_MULTIPART;
list_op.params.filter = &mp_filter;
params.allow_unordered = true;
params.ns = RGW_OBJ_NS_MULTIPART;
params.filter = &mp_filter;
auto pf = [&](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) {
auto wt = boost::get<std::tuple<lc_op, rgw_bucket_dir_entry>>(wi);
@ -851,7 +847,7 @@ int RGWLC::handle_multipart_expiration(
return;
}
RGWObjectCtx rctx(store);
int ret = abort_multipart_upload(store, cct, &rctx, bucket_info, mp_obj);
int ret = abort_multipart_upload(store, cct, &rctx, target->get_info(), mp_obj);
if (ret == 0) {
if (perfcounter) {
perfcounter->inc(l_rgw_lc_abort_mpu, 1);
@ -889,11 +885,10 @@ int RGWLC::handle_multipart_expiration(
if (!prefix_iter->second.status || prefix_iter->second.mp_expiration <= 0) {
continue;
}
list_op.params.prefix = prefix_iter->first;
params.prefix = prefix_iter->first;
do {
objs.clear();
list_op.params.marker = list_op.get_next_marker();
ret = list_op.list_objects(1000, &objs, NULL, &is_truncated, null_yield);
ret = target->list(params, 1000, results, null_yield);
if (ret < 0) {
if (ret == (-ENOENT))
return 0;
@ -911,20 +906,18 @@ int RGWLC::handle_multipart_expiration(
} /* for objs */
std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
} while(is_truncated);
} while(results.is_truncated);
} /* for prefix_map */
worker->workpool->drain();
return 0;
}
static int read_obj_tags(RGWRados *store, RGWBucketInfo& bucket_info,
rgw_obj& obj, RGWObjectCtx& ctx, bufferlist& tags_bl)
static int read_obj_tags(rgw::sal::RGWObject* obj, RGWObjectCtx& ctx, bufferlist& tags_bl)
{
RGWRados::Object op_target(store, bucket_info, ctx, obj);
RGWRados::Object::Read read_op(&op_target);
std::unique_ptr<rgw::sal::RGWObject::ReadOp> rop = obj->get_read_op(&ctx);
return read_op.get_attr(RGW_ATTR_TAGS, tags_bl, null_yield);
return rop->get_attr(RGW_ATTR_TAGS, tags_bl, null_yield);
}
static bool is_valid_op(const lc_op& op)
@ -968,8 +961,7 @@ static int check_tags(lc_op_ctx& oc, bool *skip)
*skip = true;
bufferlist tags_bl;
int ret = read_obj_tags(oc.store->getRados(), oc.bucket_info, oc.obj,
oc.rctx, tags_bl);
int ret = read_obj_tags(oc.obj.get(), oc.rctx, tags_bl);
if (ret < 0) {
if (ret != -ENODATA) {
ldout(oc.cct, 5) << "ERROR: read_obj_tags returned r="
@ -1084,20 +1076,20 @@ public:
r = remove_expired_obj(oc, true);
if (r < 0) {
ldout(oc.cct, 0) << "ERROR: current is-dm remove_expired_obj "
<< oc.bucket_info.bucket << ":" << o.key
<< oc.bucket << ":" << o.key
<< " " << cpp_strerror(r) << " "
<< oc.wq->thr_name() << dendl;
return r;
}
ldout(oc.cct, 2) << "DELETED: current is-dm "
<< oc.bucket_info.bucket << ":" << o.key
<< oc.bucket << ":" << o.key
<< " " << oc.wq->thr_name() << dendl;
} else {
/* ! o.is_delete_marker() */
r = remove_expired_obj(oc, !oc.bucket_info.versioned());
r = remove_expired_obj(oc, !oc.bucket->versioned());
if (r < 0) {
ldout(oc.cct, 0) << "ERROR: remove_expired_obj "
<< oc.bucket_info.bucket << ":" << o.key
<< oc.bucket << ":" << o.key
<< " " << cpp_strerror(r) << " "
<< oc.wq->thr_name() << dendl;
return r;
@ -1105,7 +1097,7 @@ public:
if (perfcounter) {
perfcounter->inc(l_rgw_lc_expire_current, 1);
}
ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key
ldout(oc.cct, 2) << "DELETED:" << oc.bucket << ":" << o.key
<< " " << oc.wq->thr_name() << dendl;
}
return 0;
@ -1136,8 +1128,7 @@ public:
<< oc.wq->thr_name() << dendl;
return is_expired &&
pass_object_lock_check(oc.store->getRados(),
oc.bucket_info, oc.obj, oc.rctx);
pass_object_lock_check(oc.store, oc.obj.get(), oc.rctx);
}
int process(lc_op_ctx& oc) {
@ -1145,7 +1136,7 @@ public:
int r = remove_expired_obj(oc, true);
if (r < 0) {
ldout(oc.cct, 0) << "ERROR: remove_expired_obj (non-current expiration) "
<< oc.bucket_info.bucket << ":" << o.key
<< oc.bucket << ":" << o.key
<< " " << cpp_strerror(r)
<< " " << oc.wq->thr_name() << dendl;
return r;
@ -1153,7 +1144,7 @@ public:
if (perfcounter) {
perfcounter->inc(l_rgw_lc_expire_noncurrent, 1);
}
ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key
ldout(oc.cct, 2) << "DELETED:" << oc.bucket << ":" << o.key
<< " (non-current expiration) "
<< oc.wq->thr_name() << dendl;
return 0;
@ -1189,7 +1180,7 @@ public:
int r = remove_expired_obj(oc, true);
if (r < 0) {
ldout(oc.cct, 0) << "ERROR: remove_expired_obj (delete marker expiration) "
<< oc.bucket_info.bucket << ":" << o.key
<< oc.bucket << ":" << o.key
<< " " << cpp_strerror(r)
<< " " << oc.wq->thr_name()
<< dendl;
@ -1198,7 +1189,7 @@ public:
if (perfcounter) {
perfcounter->inc(l_rgw_lc_expire_dm, 1);
}
ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key
ldout(oc.cct, 2) << "DELETED:" << oc.bucket << ":" << o.key
<< " (delete marker expiration) "
<< oc.wq->thr_name() << dendl;
return 0;
@ -1262,33 +1253,30 @@ public:
auto& o = oc.o;
rgw_placement_rule target_placement;
target_placement.inherit_from(oc.bucket_info.placement_rule);
target_placement.inherit_from(oc.bucket->get_placement_rule());
target_placement.storage_class = transition.storage_class;
if (!oc.store->svc()->zone->get_zone_params().
valid_placement(target_placement)) {
ldpp_dout(oc.dpp, 0) << "ERROR: non existent dest placement: "
<< target_placement
<< " bucket="<< oc.bucket_info.bucket
<< " bucket="<< oc.bucket
<< " rule_id=" << oc.op.id
<< " " << oc.wq->thr_name() << dendl;
return -EINVAL;
}
rgw::sal::RGWRadosBucket bucket(oc.store, oc.bucket_info);
rgw::sal::RGWRadosObject obj(oc.store, oc.obj.key, &bucket);
int r = oc.store->getRados()->transition_obj(
oc.rctx, &bucket, obj, target_placement, o.meta.mtime,
o.versioned_epoch, oc.dpp, null_yield);
int r = oc.obj->transition(oc.rctx, oc.bucket, target_placement, o.meta.mtime,
o.versioned_epoch, oc.dpp, null_yield);
if (r < 0) {
ldpp_dout(oc.dpp, 0) << "ERROR: failed to transition obj "
<< oc.bucket_info.bucket << ":" << o.key
<< oc.bucket << ":" << o.key
<< " -> " << transition.storage_class
<< " " << cpp_strerror(r)
<< " " << oc.wq->thr_name() << dendl;
return r;
}
ldpp_dout(oc.dpp, 2) << "TRANSITIONED:" << oc.bucket_info.bucket
ldpp_dout(oc.dpp, 2) << "TRANSITIONED:" << oc.bucket
<< ":" << o.key << " -> "
<< transition.storage_class
<< " " << oc.wq->thr_name() << dendl;
@ -1427,12 +1415,12 @@ int LCOpRule::process(rgw_bucket_dir_entry& o,
int r = (*selected)->process(ctx);
if (r < 0) {
ldpp_dout(dpp, 0) << "ERROR: remove_expired_obj "
<< env.bucket_info.bucket << ":" << o.key
<< env.bucket << ":" << o.key
<< " " << cpp_strerror(r)
<< " " << wq->thr_name() << dendl;
return r;
}
ldpp_dout(dpp, 20) << "processed:" << env.bucket_info.bucket << ":"
ldpp_dout(dpp, 20) << "processed:" << env.bucket << ":"
<< o.key << " " << wq->thr_name() << dendl;
}
@ -1444,8 +1432,7 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker,
time_t stop_at, bool once)
{
RGWLifecycleConfiguration config(cct);
RGWBucketInfo bucket_info;
map<string, bufferlist> bucket_attrs;
std::unique_ptr<rgw::sal::RGWBucket> bucket;
string no_ns, list_versions;
vector<rgw_bucket_dir_entry> objs;
vector<std::string> result;
@ -1453,9 +1440,14 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker,
string bucket_tenant = result[0];
string bucket_name = result[1];
string bucket_marker = result[2];
int ret = store->getRados()->get_bucket_info(
store->svc(), bucket_tenant, bucket_name, bucket_info, NULL, null_yield,
&bucket_attrs);
int ret = store->get_bucket(nullptr, bucket_tenant, bucket_name, &bucket, null_yield);
if (ret < 0) {
ldpp_dout(this, 0) << "LC:get_bucket for " << bucket_name
<< " failed" << dendl;
return ret;
}
ret = bucket->get_bucket_info(null_yield);
if (ret < 0) {
ldpp_dout(this, 0) << "LC:get_bucket_info for " << bucket_name
<< " failed" << dendl;
@ -1469,18 +1461,16 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker,
}
);
if (bucket_info.bucket.marker != bucket_marker) {
if (bucket->get_marker() != bucket_marker) {
ldpp_dout(this, 1) << "LC: deleting stale entry found for bucket="
<< bucket_tenant << ":" << bucket_name
<< " cur_marker=" << bucket_info.bucket.marker
<< " cur_marker=" << bucket->get_marker()
<< " orig_marker=" << bucket_marker << dendl;
return -ENOENT;
}
RGWRados::Bucket target(store->getRados(), bucket_info);
map<string, bufferlist>::iterator aiter = bucket_attrs.find(RGW_ATTR_LC);
if (aiter == bucket_attrs.end())
map<string, bufferlist>::iterator aiter = bucket->get_attrs().find(RGW_ATTR_LC);
if (aiter == bucket->get_attrs().end())
return 0;
bufferlist::const_iterator iter{&aiter->second};
@ -1541,7 +1531,7 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker,
pre_marker = next_marker;
}
LCObjsLister ol(store, bucket_info);
LCObjsLister ol(store, bucket.get());
ol.set_prefix(prefix_iter->first);
ret = ol.init();
@ -1552,7 +1542,7 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker,
return ret;
}
op_env oenv(op, store, worker, bucket_info, ol);
op_env oenv(op, store, worker, bucket.get(), ol);
LCOpRule orule(oenv);
orule.build(); // why can't ctor do it?
rgw_bucket_dir_entry* o{nullptr};
@ -1564,27 +1554,26 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker,
worker->workpool->drain();
}
ret = handle_multipart_expiration(&target, prefix_map, worker, stop_at, once);
ret = handle_multipart_expiration(bucket.get(), prefix_map, worker, stop_at, once);
return ret;
}
int RGWLC::bucket_lc_post(int index, int max_lock_sec,
cls_rgw_lc_entry& entry, int& result,
rgw::sal::Lifecycle::LCEntry& entry, int& result,
LCWorker* worker)
{
utime_t lock_duration(cct->_conf->rgw_lc_lock_max_time, 0);
rados::cls::lock::Lock l(lc_index_lock_name);
l.set_cookie(cookie);
l.set_duration(lock_duration);
rgw::sal::LCSerializer* lock = sal_lc->get_serializer(lc_index_lock_name,
obj_names[index],
cookie);
dout(5) << "RGWLC::bucket_lc_post(): POST " << entry
<< " index: " << index << " worker ix: " << worker->ix
<< dendl;
do {
int ret = l.lock_exclusive(
&store->getRados()->lc_pool_ctx, obj_names[index]);
int ret = lock->try_lock(lock_duration, null_yield);
if (ret == -EBUSY || ret == -EEXIST) {
/* already locked by another lc processor */
ldpp_dout(this, 0) << "RGWLC::bucket_lc_post() failed to acquire lock on "
@ -1597,8 +1586,7 @@ int RGWLC::bucket_lc_post(int index, int max_lock_sec,
ldpp_dout(this, 20) << "RGWLC::bucket_lc_post() lock " << obj_names[index]
<< dendl;
if (result == -ENOENT) {
ret = cls_rgw_lc_rm_entry(store->getRados()->lc_pool_ctx,
obj_names[index], entry);
ret = sal_lc->rm_entry(obj_names[index], entry);
if (ret < 0) {
ldpp_dout(this, 0) << "RGWLC::bucket_lc_post() failed to remove entry "
<< obj_names[index] << dendl;
@ -1610,14 +1598,14 @@ int RGWLC::bucket_lc_post(int index, int max_lock_sec,
entry.status = lc_complete;
}
ret = cls_rgw_lc_set_entry(store->getRados()->lc_pool_ctx,
obj_names[index], entry);
ret = sal_lc->set_entry(obj_names[index], entry);
if (ret < 0) {
ldpp_dout(this, 0) << "RGWLC::process() failed to set entry on "
<< obj_names[index] << dendl;
}
clean:
l.unlock(&store->getRados()->lc_pool_ctx, obj_names[index]);
lock->unlock();
delete lock;
ldpp_dout(this, 20) << "RGWLC::bucket_lc_post() unlock "
<< obj_names[index] << dendl;
return 0;
@ -1625,15 +1613,13 @@ clean:
}
int RGWLC::list_lc_progress(string& marker, uint32_t max_entries,
vector<cls_rgw_lc_entry>& progress_map,
vector<rgw::sal::Lifecycle::LCEntry>& progress_map,
int& index)
{
progress_map.clear();
for(; index < max_objs; index++, marker="") {
vector<cls_rgw_lc_entry> entries;
int ret =
cls_rgw_lc_list(store->getRados()->lc_pool_ctx, obj_names[index], marker,
max_entries, entries);
vector<rgw::sal::Lifecycle::LCEntry> entries;
int ret = sal_lc->list_entries(obj_names[index], marker, max_entries, entries);
if (ret < 0) {
if (ret == -ENOENT) {
ldpp_dout(this, 10) << __func__ << "() ignoring unfound lc object="
@ -1718,19 +1704,19 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker,
<< "index: " << index << " worker ix: " << worker->ix
<< dendl;
rados::cls::lock::Lock l(lc_index_lock_name);
rgw::sal::LCSerializer* lock = sal_lc->get_serializer(lc_index_lock_name,
obj_names[index],
std::string());
do {
utime_t now = ceph_clock_now();
//string = bucket_name:bucket_id, start_time, int = LC_BUCKET_STATUS
cls_rgw_lc_entry entry;
rgw::sal::Lifecycle::LCEntry entry;
if (max_lock_secs <= 0)
return -EAGAIN;
utime_t time(max_lock_secs, 0);
l.set_duration(time);
int ret = l.lock_exclusive(&store->getRados()->lc_pool_ctx,
obj_names[index]);
int ret = lock->try_lock(time, null_yield);
if (ret == -EBUSY || ret == -EEXIST) {
/* already locked by another lc processor */
ldpp_dout(this, 0) << "RGWLC::process() failed to acquire lock on "
@ -1741,9 +1727,8 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker,
if (ret < 0)
return 0;
cls_rgw_lc_obj_head head;
ret = cls_rgw_lc_get_head(store->getRados()->lc_pool_ctx, obj_names[index],
head);
rgw::sal::Lifecycle::LCHead head;
ret = sal_lc->get_head(obj_names[index], head);
if (ret < 0) {
ldpp_dout(this, 0) << "RGWLC::process() failed to get obj head "
<< obj_names[index] << ", ret=" << ret << dendl;
@ -1751,8 +1736,7 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker,
}
if (! (cct->_conf->rgw_lc_lock_max_time == 9969)) {
ret = cls_rgw_lc_get_entry(store->getRados()->lc_pool_ctx,
obj_names[index], head.marker, entry);
ret = sal_lc->get_entry(obj_names[index], head.marker, entry);
if (ret >= 0) {
if (entry.status == lc_processing) {
if (expired_session(entry.start_time)) {
@ -1784,8 +1768,7 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker,
}
}
ret = cls_rgw_lc_get_next_entry(store->getRados()->lc_pool_ctx,
obj_names[index], head.marker, entry);
ret = sal_lc->get_next_entry(obj_names[index], head.marker, entry);
if (ret < 0) {
ldpp_dout(this, 0) << "RGWLC::process() failed to get obj entry "
<< obj_names[index] << dendl;
@ -1801,8 +1784,7 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker,
<< dendl;
entry.status = lc_processing;
ret = cls_rgw_lc_set_entry(store->getRados()->lc_pool_ctx,
obj_names[index], entry);
ret = sal_lc->set_entry(obj_names[index], entry);
if (ret < 0) {
ldpp_dout(this, 0) << "RGWLC::process() failed to set obj entry "
<< obj_names[index] << entry.bucket << entry.status << dendl;
@ -1810,8 +1792,7 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker,
}
head.marker = entry.bucket;
ret = cls_rgw_lc_put_head(store->getRados()->lc_pool_ctx,
obj_names[index], head);
ret = sal_lc->put_head(obj_names[index], head);
if (ret < 0) {
ldpp_dout(this, 0) << "RGWLC::process() failed to put head "
<< obj_names[index]
@ -1823,7 +1804,8 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker,
<< " index: " << index << " worker ix: " << worker->ix
<< dendl;
l.unlock(&store->getRados()->lc_pool_ctx, obj_names[index]);
lock->unlock();
delete lock;
ret = bucket_lc_process(entry.bucket, worker, thread_stop_at(), once);
bucket_lc_post(index, max_lock_secs, entry, ret, worker);
} while(1 && !once);
@ -1831,7 +1813,8 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker,
return 0;
exit:
l.unlock(&store->getRados()->lc_pool_ctx, obj_names[index]);
lock->unlock();
delete lock;
return 0;
}
@ -1967,6 +1950,7 @@ static std::string get_lc_shard_name(const rgw_bucket& bucket){
template<typename F>
static int guard_lc_modify(rgw::sal::RGWRadosStore* store,
rgw::sal::Lifecycle* sal_lc,
const rgw_bucket& bucket, const string& cookie,
const F& f) {
CephContext *cct = store->ctx();
@ -1977,21 +1961,20 @@ static int guard_lc_modify(rgw::sal::RGWRadosStore* store,
get_lc_oid(cct, shard_id, &oid);
/* XXX it makes sense to take shard_id for a bucket_id? */
cls_rgw_lc_entry entry;
rgw::sal::Lifecycle::LCEntry entry;
entry.bucket = shard_id;
entry.status = lc_uninitial;
int max_lock_secs = cct->_conf->rgw_lc_lock_max_time;
rados::cls::lock::Lock l(lc_index_lock_name);
rgw::sal::LCSerializer* lock = sal_lc->get_serializer(lc_index_lock_name,
oid,
cookie);
utime_t time(max_lock_secs, 0);
l.set_duration(time);
l.set_cookie(cookie);
librados::IoCtx *ctx = store->getRados()->get_lc_pool_ctx();
int ret;
do {
ret = l.lock_exclusive(ctx, oid);
ret = lock->try_lock(time, null_yield);
if (ret == -EBUSY || ret == -EEXIST) {
ldout(cct, 0) << "RGWLC::RGWPutLC() failed to acquire lock on "
<< oid << ", sleep 5, try again" << dendl;
@ -2003,14 +1986,15 @@ static int guard_lc_modify(rgw::sal::RGWRadosStore* store,
<< oid << ", ret=" << ret << dendl;
break;
}
ret = f(ctx, oid, entry);
ret = f(sal_lc, oid, entry);
if (ret < 0) {
ldout(cct, 0) << "RGWLC::RGWPutLC() failed to set entry on "
<< oid << ", ret=" << ret << dendl;
}
break;
} while(true);
l.unlock(ctx, oid);
lock->unlock();
delete lock;
return ret;
}
@ -2033,10 +2017,10 @@ int RGWLC::set_bucket_config(RGWBucketInfo& bucket_info,
rgw_bucket& bucket = bucket_info.bucket;
ret = guard_lc_modify(store, bucket, cookie,
[&](librados::IoCtx *ctx, const string& oid,
const cls_rgw_lc_entry& entry) {
return cls_rgw_lc_set_entry(*ctx, oid, entry);
ret = guard_lc_modify(store, sal_lc.get(), bucket, cookie,
[&](rgw::sal::Lifecycle* sal_lc, const string& oid,
const rgw::sal::Lifecycle::LCEntry& entry) {
return sal_lc->set_entry(oid, entry);
});
return ret;
@ -2060,10 +2044,10 @@ int RGWLC::remove_bucket_config(RGWBucketInfo& bucket_info,
}
ret = guard_lc_modify(store, bucket, cookie,
[&](librados::IoCtx *ctx, const string& oid,
const cls_rgw_lc_entry& entry) {
return cls_rgw_lc_rm_entry(*ctx, oid, entry);
ret = guard_lc_modify(store, sal_lc.get(), bucket, cookie,
[&](rgw::sal::Lifecycle* sal_lc, const string& oid,
const rgw::sal::Lifecycle::LCEntry& entry) {
return sal_lc->rm_entry(oid, entry);
});
return ret;
@ -2078,6 +2062,7 @@ RGWLC::~RGWLC()
namespace rgw::lc {
int fix_lc_shard_entry(rgw::sal::RGWRadosStore* store,
rgw::sal::Lifecycle* sal_lc,
const RGWBucketInfo& bucket_info,
const map<std::string,bufferlist>& battrs)
{
@ -2090,20 +2075,18 @@ int fix_lc_shard_entry(rgw::sal::RGWRadosStore* store,
std::string lc_oid;
get_lc_oid(store->ctx(), shard_name, &lc_oid);
cls_rgw_lc_entry entry;
rgw::sal::Lifecycle::LCEntry entry;
// There are multiple cases we need to encounter here
// 1. entry exists and is already set to marker, happens in plain buckets & newly resharded buckets
// 2. entry doesn't exist, which usually happens when reshard has happened prior to update and next LC process has already dropped the update
// 3. entry exists matching the current bucket id which was after a reshard (needs to be updated to the marker)
// We are not dropping the old marker here as that would be caught by the next LC process update
auto lc_pool_ctx = store->getRados()->get_lc_pool_ctx();
int ret = cls_rgw_lc_get_entry(*lc_pool_ctx,
lc_oid, shard_name, entry);
int ret = sal_lc->get_entry(lc_oid, shard_name, entry);
if (ret == 0) {
ldout(store->ctx(), 5) << "Entry already exists, nothing to do" << dendl;
return ret; // entry is already existing correctly set to marker
}
ldout(store->ctx(), 5) << "cls_rgw_lc_get_entry errored ret code=" << ret << dendl;
ldout(store->ctx(), 5) << "lc_get_entry errored ret code=" << ret << dendl;
if (ret == -ENOENT) {
ldout(store->ctx(), 1) << "No entry for bucket=" << bucket_info.bucket.name
<< " creating " << dendl;
@ -2113,11 +2096,11 @@ int fix_lc_shard_entry(rgw::sal::RGWRadosStore* store,
std::string cookie = cookie_buf;
ret = guard_lc_modify(
store, bucket_info.bucket, cookie,
[&lc_pool_ctx, &lc_oid](librados::IoCtx* ctx,
store, sal_lc, bucket_info.bucket, cookie,
[&sal_lc, &lc_oid](rgw::sal::Lifecycle* slc,
const string& oid,
const cls_rgw_lc_entry& entry) {
return cls_rgw_lc_set_entry(*lc_pool_ctx, lc_oid, entry);
const rgw::sal::Lifecycle::LCEntry& entry) {
return slc->set_entry(lc_oid, entry);
});
}

View File

@ -462,6 +462,7 @@ WRITE_CLASS_ENCODER(RGWLifecycleConfiguration)
class RGWLC : public DoutPrefixProvider {
CephContext *cct;
rgw::sal::RGWRadosStore *store;
std::unique_ptr<rgw::sal::Lifecycle> sal_lc;
int max_objs{0};
string *obj_names{nullptr};
std::atomic<bool> down_flag = { false };
@ -516,12 +517,12 @@ public:
bool expired_session(time_t started);
time_t thread_stop_at();
int list_lc_progress(string& marker, uint32_t max_entries,
vector<cls_rgw_lc_entry>&, int& index);
vector<rgw::sal::Lifecycle::LCEntry>&, int& index);
int bucket_lc_prepare(int index, LCWorker* worker);
int bucket_lc_process(string& shard_id, LCWorker* worker, time_t stop_at,
bool once);
int bucket_lc_post(int index, int max_lock_sec,
cls_rgw_lc_entry& entry, int& result, LCWorker* worker);
rgw::sal::Lifecycle::LCEntry& entry, int& result, LCWorker* worker);
bool going_down();
void start_processor();
void stop_processor();
@ -532,19 +533,22 @@ public:
const map<string, bufferlist>& bucket_attrs);
CephContext *get_cct() const override { return cct; }
rgw::sal::Lifecycle *get_lc() const { return sal_lc.get(); }
unsigned get_subsys() const;
std::ostream& gen_prefix(std::ostream& out) const;
private:
int handle_multipart_expiration(RGWRados::Bucket *target,
int handle_multipart_expiration(rgw::sal::RGWBucket* target,
const multimap<string, lc_op>& prefix_map,
LCWorker* worker, time_t stop_at, bool once);
};
namespace rgw::lc {
int fix_lc_shard_entry(rgw::sal::RGWRadosStore *store, const RGWBucketInfo& bucket_info,
int fix_lc_shard_entry(rgw::sal::RGWRadosStore *store,
rgw::sal::Lifecycle* sal_lc,
const RGWBucketInfo& bucket_info,
const map<std::string,bufferlist>& battrs);
std::string s3_expiration_header(

View File

@ -313,7 +313,7 @@ void LCRule_S3::dump_xml(Formatter *f) const {
}
}
int RGWLifecycleConfiguration_S3::rebuild(RGWRados *store, RGWLifecycleConfiguration& dest)
int RGWLifecycleConfiguration_S3::rebuild(RGWLifecycleConfiguration& dest)
{
int ret = 0;
multimap<string, LCRule>::iterator iter;

View File

@ -95,7 +95,7 @@ public:
RGWLifecycleConfiguration_S3() : RGWLifecycleConfiguration(nullptr) {}
void decode_xml(XMLObj *obj);
int rebuild(RGWRados *store, RGWLifecycleConfiguration& dest);
int rebuild(RGWLifecycleConfiguration& dest);
void dump_xml(Formatter *f) const;
};

View File

@ -3371,7 +3371,6 @@ void RGWDeleteBucket::execute(optional_yield y)
int RGWPutObj::init_processing(optional_yield y) {
copy_source = url_decode(s->info.env->get("HTTP_X_AMZ_COPY_SOURCE", ""));
copy_source_range = s->info.env->get("HTTP_X_AMZ_COPY_SOURCE_RANGE");
map<string, bufferlist> src_attrs;
size_t pos;
int ret;
@ -3413,15 +3412,20 @@ int RGWPutObj::init_processing(optional_yield y) {
return ret;
}
}
ret = store->getRados()->get_bucket_info(store->svc(),
copy_source_tenant_name,
copy_source_bucket_name,
copy_source_bucket_info,
NULL, s->yield, &src_attrs);
std::unique_ptr<rgw::sal::RGWBucket> bucket;
ret = store->get_bucket(s->user.get(), copy_source_tenant_name, copy_source_bucket_name,
&bucket, s->yield);
if (ret < 0) {
ldpp_dout(this, 5) << __func__ << "(): get_bucket() returned ret=" << ret << dendl;
return ret;
}
ret = bucket->get_bucket_info(s->yield);
if (ret < 0) {
ldpp_dout(this, 5) << __func__ << "(): get_bucket_info() returned ret=" << ret << dendl;
return ret;
}
copy_source_bucket_info = bucket->get_info();
/* handle x-amz-copy-source-range */
if (copy_source_range) {
@ -3790,12 +3794,7 @@ void RGWPutObj::execute(optional_yield y)
/* Handle object versioning of Swift API. */
if (! multipart) {
op_ret = store->getRados()->swift_versioning_copy(obj_ctx,
s->bucket_owner.get_id(),
s->bucket.get(),
s->object.get(),
this,
s->yield);
op_ret = s->object->swift_versioning_copy(s->obj_ctx, this, s->yield);
if (op_ret < 0) {
return;
}
@ -4836,10 +4835,7 @@ void RGWDeleteObj::execute(optional_yield y)
s->object->set_atomic(s->obj_ctx);
bool ver_restored = false;
op_ret = store->getRados()->swift_versioning_restore(*obj_ctx, s->bucket_owner.get_id(),
s->bucket.get(),
s->object.get(),
ver_restored, this);
op_ret = s->object->swift_versioning_restore(s->obj_ctx, ver_restored, this);
if (op_ret < 0) {
return;
}
@ -5157,15 +5153,14 @@ void RGWCopyObj::execute(optional_yield y)
return;
}
RGWObjectCtx& obj_ctx = *static_cast<RGWObjectCtx *>(s->obj_ctx);
if ( ! version_id.empty()) {
dest_object->set_instance(version_id);
} else if (dest_bucket->versioning_enabled()) {
dest_object->gen_rand_obj_instance_name();
}
src_object->set_atomic(&obj_ctx);
dest_object->set_atomic(&obj_ctx);
src_object->set_atomic(s->obj_ctx);
dest_object->set_atomic(s->obj_ctx);
encode_delete_at_attr(delete_at, attrs);
@ -5189,16 +5184,12 @@ void RGWCopyObj::execute(optional_yield y)
/* Handle object versioning of Swift API. In case of copying to remote this
* should fail gently (op_ret == 0) as the dst_obj will not exist here. */
op_ret = store->getRados()->swift_versioning_copy(obj_ctx,
dest_bucket->get_info().owner,
dest_bucket.get(),
dest_object.get(),
this,
s->yield);
op_ret = dest_object->swift_versioning_copy(s->obj_ctx, this, s->yield);
if (op_ret < 0) {
return;
}
RGWObjectCtx& obj_ctx = *static_cast<RGWObjectCtx *>(s->obj_ctx);
op_ret = src_object->copy_object(obj_ctx,
s->user.get(),
&s->info,
@ -5557,7 +5548,7 @@ void RGWPutLC::execute(optional_yield y)
return;
}
op_ret = config.rebuild(store->getRados(), new_config);
op_ret = config.rebuild(new_config);
if (op_ret < 0)
return;
@ -5575,7 +5566,7 @@ void RGWPutLC::execute(optional_yield y)
return;
}
op_ret = store->getRados()->get_lc()->set_bucket_config(s->bucket->get_info(), s->bucket_attrs, &new_config);
op_ret = store->get_rgwlc()->set_bucket_config(s->bucket->get_info(), s->bucket_attrs, &new_config);
if (op_ret < 0) {
return;
}
@ -5591,7 +5582,7 @@ void RGWDeleteLC::execute(optional_yield y)
return;
}
op_ret = store->getRados()->get_lc()->remove_bucket_config(s->bucket->get_info(), s->bucket_attrs);
op_ret = store->get_rgwlc()->remove_bucket_config(s->bucket->get_info(), s->bucket_attrs);
if (op_ret < 0) {
return;
}
@ -6027,18 +6018,12 @@ void RGWCompleteMultipart::execute(optional_yield y)
/*take a cls lock on meta_obj to prevent racing completions (or retries)
from deleting the parts*/
rgw_pool meta_pool;
rgw_raw_obj raw_obj;
int max_lock_secs_mp =
s->cct->_conf.get_val<int64_t>("rgw_mp_lock_max_time");
utime_t dur(max_lock_secs_mp, 0);
store->getRados()->obj_to_raw((s->bucket->get_info()).placement_rule, meta_obj->get_obj(), &raw_obj);
store->getRados()->get_obj_data_pool((s->bucket->get_info()).placement_rule,
meta_obj->get_obj(), &meta_pool);
store->getRados()->open_pool_ctx(meta_pool, serializer.ioctx, true);
op_ret = serializer.try_lock(raw_obj.oid, dur, y);
serializer = meta_obj->get_serializer("RGWCompleteMultipart");
op_ret = serializer->try_lock(dur, y);
if (op_ret < 0) {
ldpp_dout(this, 0) << "failed to acquire lock" << dendl;
op_ret = -ERR_INTERNAL_ERROR;
@ -6208,11 +6193,11 @@ void RGWCompleteMultipart::execute(optional_yield y)
return;
// remove the upload obj
int r = store->getRados()->delete_obj(*static_cast<RGWObjectCtx *>(s->obj_ctx),
s->bucket->get_info(), meta_obj->get_obj(), 0);
string version_id;
int r = meta_obj->delete_object(s->obj_ctx, ACLOwner(), ACLOwner(), ceph::real_time(), false, 0, version_id, null_yield);
if (r >= 0) {
/* serializer's exclusive lock is released */
serializer.clear_locked();
serializer->clear_locked();
} else {
ldpp_dout(this, 0) << "WARNING: failed to remove object " << meta_obj << dendl;
}
@ -6225,28 +6210,13 @@ void RGWCompleteMultipart::execute(optional_yield y)
}
}
int RGWCompleteMultipart::MPSerializer::try_lock(
const std::string& _oid,
utime_t dur, optional_yield y)
{
oid = _oid;
op.assert_exists();
lock.set_duration(dur);
lock.lock_exclusive(&op);
int ret = rgw_rados_operate(ioctx, oid, &op, y);
if (! ret) {
locked = true;
}
return ret;
}
void RGWCompleteMultipart::complete()
{
/* release exclusive lock iff not already */
if (unlikely(serializer.locked)) {
int r = serializer.unlock();
if (unlikely(serializer && serializer->locked)) {
int r = serializer->unlock();
if (r < 0) {
ldpp_dout(this, 0) << "WARNING: failed to unlock " << serializer.oid << dendl;
ldpp_dout(this, 0) << "WARNING: failed to unlock " << serializer->oid << dendl;
}
}
send_response();
@ -7941,7 +7911,7 @@ void RGWGetObjLegalHold::execute(optional_yield y)
void RGWGetClusterStat::execute(optional_yield y)
{
op_ret = this->store->getRados()->get_rados_handle()->cluster_stat(stats_op);
op_ret = store->cluster_stat(stats_op);
}

View File

@ -48,7 +48,6 @@
#include "rgw_torrent.h"
#include "rgw_tag.h"
#include "rgw_object_lock.h"
#include "cls/lock/cls_lock_client.h"
#include "cls/rgw/cls_rgw_client.h"
#include "rgw_public_access.h"
@ -1741,31 +1740,11 @@ protected:
string etag;
string version_id;
bufferlist data;
struct MPSerializer {
librados::IoCtx ioctx;
rados::cls::lock::Lock lock;
librados::ObjectWriteOperation op;
std::string oid;
bool locked;
MPSerializer() : lock("RGWCompleteMultipart"), locked(false)
{}
int try_lock(const std::string& oid, utime_t dur, optional_yield y);
int unlock() {
return lock.unlock(&ioctx, oid);
}
void clear_locked() {
locked = false;
}
} serializer;
rgw::sal::MPSerializer* serializer;
public:
RGWCompleteMultipart() {}
~RGWCompleteMultipart() override {}
RGWCompleteMultipart() : serializer(nullptr) {}
~RGWCompleteMultipart() override { delete serializer; }
int verify_permission(optional_yield y) override;
void pre_exec() override;
@ -2375,7 +2354,7 @@ public:
class RGWGetClusterStat : public RGWOp {
protected:
struct rados_cluster_stat_t stats_op;
RGWClusterStat stats_op;
public:
RGWGetClusterStat() {}

View File

@ -8135,7 +8135,7 @@ int RGWRados::process_gc(bool expired_only)
}
int RGWRados::list_lc_progress(string& marker, uint32_t max_entries,
vector<cls_rgw_lc_entry>& progress_map,
vector<rgw::sal::Lifecycle::LCEntry>& progress_map,
int& index)
{
return lc->list_lc_progress(marker, max_entries, progress_map, index);

View File

@ -370,7 +370,11 @@ public:
class RGWGetDirHeader_CB;
class RGWGetUserHeader_CB;
namespace rgw { namespace sal { class RGWRadosStore; } }
namespace rgw { namespace sal {
class RGWRadosStore;
class MPRadosSerializer;
class LCRadosSerializer;
} }
class RGWAsyncRadosProcessor;
@ -396,7 +400,6 @@ class RGWRados
friend class RGWGC;
friend class RGWMetaNotifier;
friend class RGWDataNotifier;
friend class RGWLC;
friend class RGWObjectExpirer;
friend class RGWMetaSyncProcessorThread;
friend class RGWDataSyncProcessorThread;
@ -404,7 +407,8 @@ class RGWRados
friend class RGWBucketReshard;
friend class RGWBucketReshardLock;
friend class BucketIndexLockGuard;
friend class RGWCompleteMultipart;
friend class rgw::sal::MPRadosSerializer;
friend class rgw::sal::LCRadosSerializer;
friend class rgw::sal::RGWRadosStore;
/** Open the pool used as root for this gateway */
@ -1446,7 +1450,7 @@ public:
int process_lc();
int list_lc_progress(string& marker, uint32_t max_entries,
vector<cls_rgw_lc_entry>& progress_map, int& index);
vector<rgw::sal::Lifecycle::LCEntry>& progress_map, int& index);
int bucket_check_index(RGWBucketInfo& bucket_info,
map<RGWObjCategory, RGWStorageStats> *existing_stats,

View File

@ -21,6 +21,7 @@
class RGWGetDataCB;
struct RGWObjState;
class RGWAccessListFilter;
class RGWLC;
struct RGWUsageIter {
string read_iter;
@ -29,6 +30,22 @@ struct RGWUsageIter {
RGWUsageIter() : index(0) {}
};
/**
* @struct RGWClusterStat
* Cluster-wide usage information
*/
struct RGWClusterStat {
/// total device size
uint64_t kb;
/// total used
uint64_t kb_used;
/// total available/free
uint64_t kb_avail;
/// number of objects
uint64_t num_objects;
};
namespace rgw { namespace sal {
#define RGW_SAL_VERSION 1
@ -37,6 +54,8 @@ class RGWUser;
class RGWBucket;
class RGWObject;
class RGWBucketList;
struct MPSerializer;
class Lifecycle;
enum AttrsMod {
ATTRSMOD_NONE = 0,
@ -55,7 +74,7 @@ class RGWStore : public DoutPrefixProvider {
virtual std::unique_ptr<RGWObject> get_object(const rgw_obj_key& k) = 0;
virtual int get_bucket(RGWUser* u, const rgw_bucket& b, std::unique_ptr<RGWBucket>* bucket, optional_yield y) = 0;
virtual int get_bucket(RGWUser* u, const RGWBucketInfo& i, std::unique_ptr<RGWBucket>* bucket) = 0;
virtual int get_bucket(RGWUser* u, const std::string& tenant, const std::string&name, std::unique_ptr<RGWBucket>* bucket, optional_yield y) = 0;
virtual int get_bucket(RGWUser* u, const std::string& tenant, const std::string& name, std::unique_ptr<RGWBucket>* bucket, optional_yield y) = 0;
virtual int create_bucket(RGWUser& u, const rgw_bucket& b,
const std::string& zonegroup_id,
rgw_placement_rule& placement_rule,
@ -80,6 +99,9 @@ class RGWStore : public DoutPrefixProvider {
optional_yield y) = 0;
virtual const RGWZoneGroup& get_zonegroup() = 0;
virtual int get_zonegroup(const string& id, RGWZoneGroup& zonegroup) = 0;
virtual int cluster_stat(RGWClusterStat& stats) = 0;
virtual std::unique_ptr<Lifecycle> get_lifecycle(void) = 0;
virtual RGWLC* get_rgwlc(void) = 0;
virtual void finalize(void)=0;
@ -162,7 +184,7 @@ class RGWBucket {
struct ListResults {
vector<rgw_bucket_dir_entry> objs;
map<std::string, bool> common_prefixes;
bool is_truncated;
bool is_truncated{false};
rgw_obj_key next_marker;
};
@ -205,6 +227,7 @@ class RGWBucket {
virtual int chown(RGWUser* new_user, RGWUser* old_user, optional_yield y) = 0;
virtual int put_instance_info(bool exclusive, ceph::real_time mtime) = 0;
virtual bool is_owner(RGWUser* user) = 0;
virtual RGWUser* get_owner(void) { return owner; };
virtual int check_empty(optional_yield y) = 0;
virtual int check_quota(RGWQuotaInfo& user_quota, RGWQuotaInfo& bucket_quota, uint64_t obj_size, optional_yield y, bool check_size_only = false) = 0;
virtual int set_instance_attrs(RGWAttrs& attrs, optional_yield y) = 0;
@ -432,6 +455,7 @@ class RGWObject {
virtual int delete_obj_attrs(RGWObjectCtx *rctx, const char *attr_name, optional_yield y) = 0;
virtual int copy_obj_data(RGWObjectCtx& rctx, RGWBucket* dest_bucket, RGWObject* dest_obj, uint16_t olh_epoch, std::string* petag, const DoutPrefixProvider *dpp, optional_yield y) = 0;
virtual bool is_expired() = 0;
virtual MPSerializer* get_serializer(const std::string& lock_name) = 0;
RGWAttrs& get_attrs(void) { return attrs; }
ceph::real_time get_mtime(void) const { return mtime; }
@ -446,6 +470,14 @@ class RGWObject {
void set_in_extra_data(bool i) { in_extra_data = i; }
int range_to_ofs(uint64_t obj_size, int64_t &ofs, int64_t &end);
/* Swift versioning */
virtual int swift_versioning_restore(RGWObjectCtx* obj_ctx,
bool& restored, /* out */
const DoutPrefixProvider *dpp) = 0;
virtual int swift_versioning_copy(RGWObjectCtx* obj_ctx,
const DoutPrefixProvider *dpp,
optional_yield y) = 0;
/* OPs */
virtual std::unique_ptr<ReadOp> get_read_op(RGWObjectCtx*) = 0;
virtual std::unique_ptr<WriteOp> get_write_op(RGWObjectCtx*) = 0;
@ -469,6 +501,14 @@ class RGWObject {
}
virtual void gen_rand_obj_instance_name() = 0;
virtual void raw_obj_to_obj(const rgw_raw_obj& raw_obj) = 0;
virtual void get_raw_obj(rgw_raw_obj* raw_obj) = 0;
virtual int transition(RGWObjectCtx& rctx,
RGWBucket* bucket,
const rgw_placement_rule& placement_rule,
const real_time& mtime,
uint64_t olh_epoch,
const DoutPrefixProvider *dpp,
optional_yield y) = 0;
/* dang - This is temporary, until the API is completed */
rgw_obj_key& get_key() { return key; }
@ -493,5 +533,63 @@ class RGWObject {
}
};
struct Serializer {
Serializer() = default;
virtual ~Serializer() = default;
virtual int try_lock(utime_t dur, optional_yield y) = 0;
virtual int unlock() = 0;
};
struct MPSerializer : Serializer {
bool locked;
std::string oid;
MPSerializer() : locked(false) {}
virtual ~MPSerializer() = default;
void clear_locked() {
locked = false;
}
};
struct LCSerializer : Serializer {
LCSerializer() {}
virtual ~LCSerializer() = default;
};
class Lifecycle {
public:
struct LCHead {
time_t start_date{0};
std::string marker;
LCHead() = default;
LCHead(time_t _date, std::string& _marker) : start_date(_date), marker(_marker) {}
};
struct LCEntry {
std::string bucket;
uint64_t start_time{0};
uint32_t status{0};
LCEntry() = default;
LCEntry(std::string& _bucket, uint64_t _time, uint32_t _status) : bucket(_bucket), start_time(_time), status(_status) {}
};
Lifecycle() = default;
virtual ~Lifecycle() = default;
virtual int get_entry(const string& oid, const std::string& marker, LCEntry& entry) = 0;
virtual int get_next_entry(const string& oid, std::string& marker, LCEntry& entry) = 0;
virtual int set_entry(const string& oid, const LCEntry& entry) = 0;
virtual int list_entries(const string& oid, const string& marker,
uint32_t max_entries, vector<LCEntry>& entries) = 0;
virtual int rm_entry(const string& oid, const LCEntry& entry) = 0;
virtual int get_head(const string& oid, LCHead& head) = 0;
virtual int put_head(const string& oid, const LCHead& head) = 0;
virtual LCSerializer* get_serializer(const std::string& lock_name, const std::string& oid, const std::string& cookie) = 0;
};
} } // namespace rgw::sal

View File

@ -28,12 +28,12 @@
#include "rgw_multi.h"
#include "rgw_acl_s3.h"
/* Stuff for RGWRadosStore. Move to separate file when store split out */
#include "rgw_zone.h"
#include "rgw_rest_conn.h"
#include "services/svc_sys_obj.h"
#include "services/svc_zone.h"
#include "services/svc_tier_rados.h"
#include "cls/rgw/cls_rgw_client.h"
#define dout_subsys ceph_subsys_rgw
@ -538,6 +538,11 @@ void RGWRadosObject::raw_obj_to_obj(const rgw_raw_obj& raw_obj)
set_key(tobj.key);
}
void RGWRadosObject::get_raw_obj(rgw_raw_obj* raw_obj)
{
store->getRados()->obj_to_raw((bucket->get_info()).placement_rule, get_obj(), raw_obj);
}
int RGWRadosObject::omap_get_vals_by_keys(const std::string& oid,
const std::set<std::string>& keys,
RGWAttrs *vals)
@ -556,6 +561,22 @@ int RGWRadosObject::omap_get_vals_by_keys(const std::string& oid,
return cur_ioctx.omap_get_vals_by_keys(oid, keys, vals);
}
MPSerializer* RGWRadosObject::get_serializer(const std::string& lock_name)
{
return new MPRadosSerializer(store, this, lock_name);
}
int RGWRadosObject::transition(RGWObjectCtx& rctx,
RGWBucket* bucket,
const rgw_placement_rule& placement_rule,
const real_time& mtime,
uint64_t olh_epoch,
const DoutPrefixProvider *dpp,
optional_yield y)
{
return store->getRados()->transition_obj(rctx, bucket, *this, placement_rule, mtime, olh_epoch, dpp, y);
}
std::unique_ptr<RGWObject::ReadOp> RGWRadosObject::get_read_op(RGWObjectCtx *ctx)
{
return std::unique_ptr<RGWObject::ReadOp>(new RGWRadosObject::RadosReadOp(this, ctx));
@ -752,6 +773,30 @@ int RGWRadosObject::RadosWriteOp::write_meta(uint64_t size, uint64_t accounted_s
return ret;
}
int RGWRadosObject::swift_versioning_restore(RGWObjectCtx* obj_ctx,
bool& restored,
const DoutPrefixProvider *dpp)
{
return store->getRados()->swift_versioning_restore(*obj_ctx,
bucket->get_owner()->get_id(),
bucket,
this,
restored,
dpp);
}
int RGWRadosObject::swift_versioning_copy(RGWObjectCtx* obj_ctx,
const DoutPrefixProvider *dpp,
optional_yield y)
{
return store->getRados()->swift_versioning_copy(*obj_ctx,
bucket->get_info().owner,
bucket,
this,
dpp,
y);
}
int RGWRadosStore::get_bucket(RGWUser* u, const rgw_bucket& b, std::unique_ptr<RGWBucket>* bucket, optional_yield y)
{
int ret;
@ -886,6 +931,23 @@ int RGWRadosStore::get_zonegroup(const string& id, RGWZoneGroup& zonegroup)
return rados->svc.zone->get_zonegroup(id, zonegroup);
}
int RGWRadosStore::cluster_stat(RGWClusterStat& stats)
{
rados_cluster_stat_t rados_stats;
int ret;
ret = rados->get_rados_handle()->cluster_stat(rados_stats);
if (ret < 0)
return ret;
stats.kb = rados_stats.kb;
stats.kb_used = rados_stats.kb_used;
stats.kb_avail = rados_stats.kb_avail;
stats.num_objects = rados_stats.num_objects;
return ret;
}
int RGWRadosStore::create_bucket(RGWUser& u, const rgw_bucket& b,
const string& zonegroup_id,
rgw_placement_rule& placement_rule,
@ -1002,6 +1064,141 @@ int RGWRadosStore::create_bucket(RGWUser& u, const rgw_bucket& b,
return ret;
}
std::unique_ptr<Lifecycle> RGWRadosStore::get_lifecycle(void)
{
return std::unique_ptr<Lifecycle>(new RadosLifecycle(this));
}
MPRadosSerializer::MPRadosSerializer(RGWRadosStore* store, RGWRadosObject* obj, const std::string& lock_name) :
lock(lock_name)
{
rgw_pool meta_pool;
rgw_raw_obj raw_obj;
obj->get_raw_obj(&raw_obj);
oid = raw_obj.oid;
store->getRados()->get_obj_data_pool(obj->get_bucket()->get_placement_rule(),
obj->get_obj(), &meta_pool);
store->getRados()->open_pool_ctx(meta_pool, ioctx, true);
}
int MPRadosSerializer::try_lock(utime_t dur, optional_yield y)
{
op.assert_exists();
lock.set_duration(dur);
lock.lock_exclusive(&op);
int ret = rgw_rados_operate(ioctx, oid, &op, y);
if (! ret) {
locked = true;
}
return ret;
}
LCRadosSerializer::LCRadosSerializer(RGWRadosStore* store, const std::string& _oid, const std::string& lock_name, const std::string& cookie) :
lock(lock_name), oid(_oid)
{
ioctx = &store->getRados()->lc_pool_ctx;
lock.set_cookie(cookie);
}
int LCRadosSerializer::try_lock(utime_t dur, optional_yield y)
{
lock.set_duration(dur);
return lock.lock_exclusive(ioctx, oid);
}
int RadosLifecycle::get_entry(const string& oid, const std::string& marker,
LCEntry& entry)
{
cls_rgw_lc_entry cls_entry;
int ret = cls_rgw_lc_get_entry(*store->getRados()->get_lc_pool_ctx(), oid, marker, cls_entry);
entry.bucket = cls_entry.bucket;
entry.start_time = cls_entry.start_time;
entry.status = cls_entry.status;
return ret;
}
int RadosLifecycle::get_next_entry(const string& oid, std::string& marker,
LCEntry& entry)
{
cls_rgw_lc_entry cls_entry;
int ret = cls_rgw_lc_get_next_entry(*store->getRados()->get_lc_pool_ctx(), oid, marker,
cls_entry);
entry.bucket = cls_entry.bucket;
entry.start_time = cls_entry.start_time;
entry.status = cls_entry.status;
return ret;
}
int RadosLifecycle::set_entry(const string& oid, const LCEntry& entry)
{
cls_rgw_lc_entry cls_entry;
cls_entry.bucket = entry.bucket;
cls_entry.start_time = entry.start_time;
cls_entry.status = entry.status;
return cls_rgw_lc_set_entry(*store->getRados()->get_lc_pool_ctx(), oid, cls_entry);
}
int RadosLifecycle::list_entries(const string& oid, const string& marker,
uint32_t max_entries, vector<LCEntry>& entries)
{
vector<cls_rgw_lc_entry> cls_entries;
int ret = cls_rgw_lc_list(*store->getRados()->get_lc_pool_ctx(), oid, marker, max_entries, cls_entries);
if (ret < 0)
return ret;
for (auto& entry : cls_entries) {
entries.push_back(LCEntry(entry.bucket, entry.start_time, entry.status));
}
return ret;
}
int RadosLifecycle::rm_entry(const string& oid, const LCEntry& entry)
{
cls_rgw_lc_entry cls_entry;
cls_entry.bucket = entry.bucket;
cls_entry.start_time = entry.start_time;
cls_entry.status = entry.status;
return cls_rgw_lc_rm_entry(*store->getRados()->get_lc_pool_ctx(), oid, cls_entry);
}
int RadosLifecycle::get_head(const string& oid, LCHead& head)
{
cls_rgw_lc_obj_head cls_head;
int ret = cls_rgw_lc_get_head(*store->getRados()->get_lc_pool_ctx(), oid, cls_head);
head.marker = cls_head.marker;
head.start_date = cls_head.start_date;
return ret;
}
int RadosLifecycle::put_head(const string& oid, const LCHead& head)
{
cls_rgw_lc_obj_head cls_head;
cls_head.marker = head.marker;
cls_head.start_date = head.start_date;
return cls_rgw_lc_put_head(*store->getRados()->get_lc_pool_ctx(), oid, cls_head);
}
LCSerializer* RadosLifecycle::get_serializer(const std::string& lock_name, const std::string& oid, const std::string& cookie)
{
return new LCRadosSerializer(store, oid, lock_name, cookie);
}
} // namespace rgw::sal
rgw::sal::RGWRadosStore *RGWStoreManager::init_storage_provider(CephContext *cct, bool use_gc_thread, bool use_lc_thread, bool quota_threads, bool run_sync_thread, bool run_reshard_thread, bool use_cache)

View File

@ -17,6 +17,7 @@
#include "rgw_sal.h"
#include "rgw_rados.h"
#include "cls/lock/cls_lock_client.h"
namespace rgw { namespace sal {
@ -131,9 +132,26 @@ class RGWRadosObject : public RGWObject {
virtual bool is_expired() override;
virtual void gen_rand_obj_instance_name() override;
virtual void raw_obj_to_obj(const rgw_raw_obj& raw_obj) override;
virtual void get_raw_obj(rgw_raw_obj* raw_obj) override;
virtual std::unique_ptr<RGWObject> clone() {
return std::unique_ptr<RGWObject>(new RGWRadosObject(*this));
}
virtual MPSerializer* get_serializer(const std::string& lock_name) override;
virtual int transition(RGWObjectCtx& rctx,
RGWBucket* bucket,
const rgw_placement_rule& placement_rule,
const real_time& mtime,
uint64_t olh_epoch,
const DoutPrefixProvider *dpp,
optional_yield y) override;
/* Swift versioning */
virtual int swift_versioning_restore(RGWObjectCtx* obj_ctx,
bool& restored,
const DoutPrefixProvider *dpp) override;
virtual int swift_versioning_copy(RGWObjectCtx* obj_ctx,
const DoutPrefixProvider *dpp,
optional_yield y) override;
/* OPs */
virtual std::unique_ptr<ReadOp> get_read_op(RGWObjectCtx *) override;
@ -276,6 +294,9 @@ class RGWRadosStore : public RGWStore {
optional_yield y) override;
virtual const RGWZoneGroup& get_zonegroup() override;
virtual int get_zonegroup(const string& id, RGWZoneGroup& zonegroup) override;
virtual int cluster_stat(RGWClusterStat& stats) override;
virtual std::unique_ptr<Lifecycle> get_lifecycle(void) override;
virtual RGWLC* get_rgwlc(void) { return rados->get_lc(); }
void setRados(RGWRados * st) { rados = st; }
RGWRados *getRados(void) { return rados; }
@ -302,6 +323,51 @@ class RGWRadosStore : public RGWStore {
};
class MPRadosSerializer : public MPSerializer {
librados::IoCtx ioctx;
rados::cls::lock::Lock lock;
librados::ObjectWriteOperation op;
public:
MPRadosSerializer(RGWRadosStore* store, RGWRadosObject* obj, const std::string& lock_name);
virtual int try_lock(utime_t dur, optional_yield y) override;
int unlock() {
return lock.unlock(&ioctx, oid);
}
};
class LCRadosSerializer : public LCSerializer {
librados::IoCtx* ioctx;
rados::cls::lock::Lock lock;
const std::string oid;
public:
LCRadosSerializer(RGWRadosStore* store, const std::string& oid, const std::string& lock_name, const std::string& cookie);
virtual int try_lock(utime_t dur, optional_yield y) override;
int unlock() {
return lock.unlock(ioctx, oid);
}
};
class RadosLifecycle : public Lifecycle {
RGWRadosStore* store;
public:
RadosLifecycle(RGWRadosStore* _st) : store(_st) {}
virtual int get_entry(const string& oid, const std::string& marker, LCEntry& entry) override;
virtual int get_next_entry(const string& oid, std::string& marker, LCEntry& entry) override;
virtual int set_entry(const string& oid, const LCEntry& entry) override;
virtual int list_entries(const string& oid, const string& marker,
uint32_t max_entries, vector<LCEntry>& entries) override;
virtual int rm_entry(const string& oid, const LCEntry& entry) override;
virtual int get_head(const string& oid, LCHead& head) override;
virtual int put_head(const string& oid, const LCHead& head) override;
virtual LCSerializer* get_serializer(const std::string& lock_name, const std::string& oid, const std::string& cookie) override;
};
} } // namespace rgw::sal
class RGWStoreManager {