rgwlc: permit lifecycle processing for a single bucket

Permit a --bucket option to be passed to radosgw-admin lc process,
and propagate the bucket name to lifecycle processing, and process
only the named bucket if one is provided.

Fixes: https://tracker.ceph.com/issues/53430

Signed-off-by: Matt Benjamin <mbenjamin@redhat.com>
This commit is contained in:
Matt Benjamin 2021-11-29 20:25:41 -05:00
parent 4f3a33af02
commit 9ddc223e63
6 changed files with 25 additions and 11 deletions

View File

@ -365,7 +365,8 @@ which are as follows:
List all bucket lifecycle progress.
:command:`lc process`
Manually process lifecycle.
Manually process lifecycle. If bucket specified with --bucket=<bucket>,
only <bucket> is processed.
:command:`metadata get`
Get metadata info.

View File

@ -7549,7 +7549,7 @@ next:
}
if (opt_cmd == OPT::LC_PROCESS) {
int ret = static_cast<rgw::sal::RadosStore*>(store)->getRados()->process_lc();
int ret = static_cast<rgw::sal::RadosStore*>(store)->getRados()->process_lc(bucket_name);
if (ret < 0) {
cerr << "ERROR: lc processing returned error: " << cpp_strerror(-ret) << std::endl;
return 1;

View File

@ -214,10 +214,11 @@ bool RGWLifecycleConfiguration::valid()
void *RGWLC::LCWorker::entry() {
do {
std::string bucket_name{""}; // empty restriction, all buckets
utime_t start = ceph_clock_now();
if (should_work(start)) {
ldpp_dout(dpp, 2) << "life cycle: start" << dendl;
int r = lc->process(this, false /* once */);
int r = lc->process(this, bucket_name, false /* once */);
if (r < 0) {
ldpp_dout(dpp, 0) << "ERROR: do life cycle process() returned error r="
<< r << dendl;
@ -1881,7 +1882,8 @@ static inline vector<int> random_sequence(uint32_t n)
return v;
}
int RGWLC::process(LCWorker* worker, bool once = false)
int RGWLC::process(LCWorker* worker, const std::string& bucket_name,
bool once = false)
{
int max_secs = cct->_conf->rgw_lc_lock_max_time;
@ -1889,7 +1891,7 @@ int RGWLC::process(LCWorker* worker, bool once = false)
* that might be running in parallel */
vector<int> shard_seq = random_sequence(max_objs);
for (auto index : shard_seq) {
int ret = process(index, max_secs, worker, once);
int ret = process(index, max_secs, worker, bucket_name, once);
if (ret < 0)
return ret;
}
@ -1924,12 +1926,13 @@ time_t RGWLC::thread_stop_at()
}
int RGWLC::process(int index, int max_lock_secs, LCWorker* worker,
bool once = false)
const std::string& bucket_name, bool once = false)
{
ldpp_dout(this, 5) << "RGWLC::process(): ENTER: "
<< "index: " << index << " worker ix: " << worker->ix
<< dendl;
std::string bucket_prefix = fmt::format(":{}:", bucket_name);
rgw::sal::LCSerializer* lock = sal_lc->get_serializer(lc_index_lock_name,
obj_names[index],
std::string());
@ -1994,6 +1997,7 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker,
}
}
next_bucket:
ret = sal_lc->get_next_entry(obj_names[index], head.marker, entry);
if (ret < 0) {
ldpp_dout(this, 0) << "RGWLC::process() failed to get obj entry "
@ -2005,6 +2009,14 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker,
if (entry.bucket.empty())
goto exit;
/* skip over bucket if processing for only a single bucket was
* requested, and this one isn't it */
if (! bucket_name.empty()) {
if (! boost::algorithm::starts_with(entry.bucket, bucket_prefix)) {
goto next_bucket;
}
}
ldpp_dout(this, 5) << "RGWLC::process(): START entry 1: " << entry
<< " index: " << index << " worker ix: " << worker->ix
<< dendl;

View File

@ -515,8 +515,9 @@ public:
void initialize(CephContext *_cct, rgw::sal::Store* _store);
void finalize();
int process(LCWorker* worker, bool once);
int process(int index, int max_secs, LCWorker* worker, bool once);
int process(LCWorker* worker, const std::string& bucket_name, bool once);
int process(int index, int max_secs, LCWorker* worker,
const std::string& bucket_name, bool once);
bool if_already_run_today(time_t start_date);
bool expired_session(time_t started);
time_t thread_stop_at();

View File

@ -8336,12 +8336,12 @@ int RGWRados::list_lc_progress(string& marker, uint32_t max_entries,
return lc->list_lc_progress(marker, max_entries, progress_map, index);
}
int RGWRados::process_lc()
int RGWRados::process_lc(const std::string& bucket_name)
{
RGWLC lc;
lc.initialize(cct, this->store);
RGWLC::LCWorker worker(&lc, cct, &lc, 0);
auto ret = lc.process(&worker, true /* once */);
auto ret = lc.process(&worker, bucket_name, true /* once */);
lc.stop_processor(); // sets down_flag, but returns immediately
return ret;
}

View File

@ -1450,7 +1450,7 @@ public:
bool process_expire_objects(const DoutPrefixProvider *dpp);
int defer_gc(const DoutPrefixProvider *dpp, void *ctx, const RGWBucketInfo& bucket_info, const rgw_obj& obj, optional_yield y);
int process_lc();
int process_lc(const std::string& bucket_name);
int list_lc_progress(std::string& marker, uint32_t max_entries,
std::vector<rgw::sal::Lifecycle::LCEntry>& progress_map, int& index);