mirror of
https://github.com/ceph/ceph
synced 2025-04-01 14:51:13 +00:00
rgw: split per-bucket resharding logic out from general resharding
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
This commit is contained in:
parent
0bc15671ee
commit
7a34a049ae
@ -894,3 +894,21 @@ void cls_rgw_guard_bucket_resharding(librados::ObjectWriteOperation& op, int ret
|
||||
::encode(call, in);
|
||||
op.exec("rgw", "guard_bucket_resharding", in);
|
||||
}
|
||||
|
||||
static bool issue_set_bucket_resharding(librados::IoCtx& io_ctx, const string& oid,
|
||||
const cls_rgw_bucket_instance_entry& entry,
|
||||
BucketIndexAioManager *manager) {
|
||||
bufferlist in;
|
||||
struct cls_rgw_set_bucket_resharding_op call;
|
||||
call.entry = entry;
|
||||
::encode(call, in);
|
||||
librados::ObjectWriteOperation op;
|
||||
op.exec("rgw", "set_bucket_resharding", in);
|
||||
return manager->aio_operate(io_ctx, oid, &op);
|
||||
}
|
||||
|
||||
int CLSRGWIssueSetBucketResharding::issue_op(int shard_id, const string& oid)
|
||||
{
|
||||
return issue_set_bucket_resharding(io_ctx, oid, entry, &manager);
|
||||
}
|
||||
|
||||
|
@ -456,6 +456,16 @@ public:
|
||||
CLSRGWConcurrentIO(io_ctx, oids, max_aio), result(dir_headers) {}
|
||||
};
|
||||
|
||||
class CLSRGWIssueSetBucketResharding : public CLSRGWConcurrentIO {
|
||||
cls_rgw_bucket_instance_entry entry;
|
||||
protected:
|
||||
int issue_op(int shard_id, const string& oid) override;
|
||||
public:
|
||||
CLSRGWIssueSetBucketResharding(librados::IoCtx& ioc, map<int, string>& _bucket_objs,
|
||||
const cls_rgw_bucket_instance_entry& entry,
|
||||
uint32_t _max_aio) : CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio) {}
|
||||
};
|
||||
|
||||
int cls_rgw_get_dir_header_async(librados::IoCtx& io_ctx, string& oid, RGWGetDirHeader_CB *ctx);
|
||||
|
||||
void cls_rgw_encode_suggestion(char op, rgw_bucket_dir_entry& dirent, bufferlist& updates);
|
||||
|
@ -8250,6 +8250,19 @@ int RGWRados::bucket_rebuild_index(RGWBucketInfo& bucket_info)
|
||||
return CLSRGWIssueBucketRebuild(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)();
|
||||
}
|
||||
|
||||
int RGWRados::bucket_set_reshard(RGWBucketInfo& bucket_info, const cls_rgw_bucket_instance_entry& entry)
|
||||
{
|
||||
librados::IoCtx index_ctx;
|
||||
map<int, string> bucket_objs;
|
||||
|
||||
int r = open_bucket_index(bucket_info, index_ctx, bucket_objs);
|
||||
if (r < 0) {
|
||||
return r;
|
||||
}
|
||||
|
||||
return CLSRGWIssueSetBucketResharding(index_ctx, bucket_objs, entry, cct->_conf->rgw_bucket_index_max_aio)();
|
||||
}
|
||||
|
||||
int RGWRados::defer_gc(void *ctx, const RGWBucketInfo& bucket_info, const rgw_obj& obj)
|
||||
{
|
||||
RGWObjectCtx *rctx = static_cast<RGWObjectCtx *>(ctx);
|
||||
|
@ -2181,6 +2181,7 @@ class RGWRados
|
||||
friend class RGWStateLog;
|
||||
friend class RGWReplicaLogger;
|
||||
friend class RGWReshard;
|
||||
friend class RGWBucketReshard;
|
||||
friend class BucketIndexLockGuard;
|
||||
|
||||
/** Open the pool used as root for this gateway */
|
||||
@ -3427,6 +3428,7 @@ public:
|
||||
map<RGWObjCategory, RGWStorageStats> *existing_stats,
|
||||
map<RGWObjCategory, RGWStorageStats> *calculated_stats);
|
||||
int bucket_rebuild_index(RGWBucketInfo& bucket_info);
|
||||
int bucket_set_reshard(RGWBucketInfo& bucket_info, const cls_rgw_bucket_instance_entry& entry);
|
||||
int remove_objs_from_index(RGWBucketInfo& bucket_info, list<rgw_obj_index_key>& oid_list);
|
||||
int move_rados_obj(librados::IoCtx& src_ioctx,
|
||||
const string& src_oid, const string& src_locator,
|
||||
|
@ -14,6 +14,65 @@ const string reshard_oid = "reshard";
|
||||
const string reshard_lock_name = "reshard_process";
|
||||
const string bucket_instance_lock_name = "bucket_instance_lock";
|
||||
|
||||
RGWBucketReshard::RGWBucketReshard(RGWRados *_store, const RGWBucketInfo& _bucket_info) :
|
||||
store(_store), bucket_info(_bucket_info),
|
||||
reshard_lock(reshard_lock_name) {
|
||||
const rgw_bucket& b = bucket_info.bucket;
|
||||
reshard_oid = b.tenant + (b.tenant.empty() ? "" : ":") + b.name + ":" + b.bucket_id;
|
||||
}
|
||||
|
||||
int RGWBucketReshard::lock_bucket()
|
||||
{
|
||||
#warning set timeout for guard lock
|
||||
|
||||
int ret = reshard_lock.lock_exclusive(&store->reshard_pool_ctx, reshard_oid);
|
||||
if (ret < 0) {
|
||||
ldout(store->ctx(), 0) << "RGWReshard::add failed to acquire lock on " << reshard_oid << " ret=" << ret << dendl;
|
||||
return ret;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
void RGWBucketReshard::unlock_bucket()
|
||||
{
|
||||
int ret = reshard_lock.unlock(&store->reshard_pool_ctx, reshard_oid);
|
||||
if (ret < 0) {
|
||||
ldout(store->ctx(), 0) << "WARNING: RGWReshard::add failed to drop lock on " << reshard_oid << " ret=" << ret << dendl;
|
||||
}
|
||||
}
|
||||
|
||||
int RGWBucketReshard::init_resharding(const cls_rgw_reshard_entry& entry)
|
||||
{
|
||||
if (entry.new_instance_id.empty()) {
|
||||
ldout(store->ctx(), 0) << __func__ << " missing new bucket instance id" << dendl;
|
||||
return -EINVAL;
|
||||
}
|
||||
|
||||
cls_rgw_bucket_instance_entry instance_entry;
|
||||
instance_entry.new_bucket_instance_id = entry.new_instance_id;
|
||||
|
||||
int ret = store->bucket_set_reshard(bucket_info, instance_entry);
|
||||
if (ret < 0) {
|
||||
ldout(store->ctx(), 0) << "RGWReshard::" << __func__ << " ERROR: error setting bucket resharding flag on bucket index: "
|
||||
<< cpp_strerror(-ret) << dendl;
|
||||
return ret;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int RGWBucketReshard::clear_resharding()
|
||||
{
|
||||
cls_rgw_bucket_instance_entry instance_entry;
|
||||
|
||||
int ret = store->bucket_set_reshard(bucket_info, instance_entry);
|
||||
if (ret < 0) {
|
||||
ldout(store->ctx(), 0) << "RGWReshard::" << __func__ << " ERROR: error setting bucket resharding flag on bucket index: "
|
||||
<< cpp_strerror(-ret) << dendl;
|
||||
return ret;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
RGWReshard::RGWReshard(CephContext *_cct, RGWRados* _store):cct(_cct), store(_store),
|
||||
instance_lock(bucket_instance_lock_name)
|
||||
{
|
||||
@ -104,38 +163,6 @@ std::string create_bucket_index_lock_name(const string& bucket_instance_id) {
|
||||
return bucket_instance_lock_name + "." + bucket_instance_id;
|
||||
}
|
||||
|
||||
int RGWReshard::set_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry)
|
||||
{
|
||||
rados::cls::lock::Lock l(create_bucket_index_lock_name(entry.old_instance_id));
|
||||
|
||||
if (entry.new_instance_id.empty()) {
|
||||
ldout(cct, 0) << "RGWReshard::" << __func__ << " missing new bucket instance id" << dendl;
|
||||
return -EEXIST;
|
||||
}
|
||||
|
||||
int ret = l.lock_exclusive(&store->reshard_pool_ctx, bucket_instance_oid);
|
||||
if (ret == -EBUSY) {
|
||||
ldout(cct, 0) << "RGWReshardLog::add failed to acquire lock on " << reshard_oid << dendl;
|
||||
return 0;
|
||||
}
|
||||
if (ret < 0)
|
||||
return ret;
|
||||
|
||||
cls_rgw_bucket_instance_entry instance_entry;
|
||||
instance_entry.new_bucket_instance_id = entry.new_instance_id;
|
||||
|
||||
ret = cls_rgw_set_bucket_resharding(store->reshard_pool_ctx, bucket_instance_oid, instance_entry);
|
||||
if (ret < 0) {
|
||||
ldout(cct, 0) << "RGWReshard::" << __func__ << " ERROR: cls_rgw_set_bucket_resharding: "
|
||||
<< cpp_strerror(-ret) << dendl;
|
||||
goto done;
|
||||
}
|
||||
|
||||
done:
|
||||
l.unlock(&store->reshard_pool_ctx, bucket_instance_oid);
|
||||
return ret;
|
||||
}
|
||||
|
||||
int RGWReshard::clear_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry)
|
||||
{
|
||||
rados::cls::lock::Lock l(create_bucket_index_lock_name(entry.old_instance_id));
|
||||
@ -150,7 +177,7 @@ int RGWReshard::clear_bucket_resharding(const string& bucket_instance_oid, cls_r
|
||||
|
||||
entry.new_instance_id.clear();
|
||||
|
||||
ret = cls_rgw_clear_bucket_resharding(store->reshard_pool_ctx, bucket_instance_oid);
|
||||
ret = cls_rgw_clear_bucket_resharding(store->reshard_pool_ctx, bucket_instance_oid);
|
||||
|
||||
l.unlock(&store->reshard_pool_ctx, bucket_instance_oid);
|
||||
return ret;
|
||||
|
@ -33,6 +33,25 @@ protected:
|
||||
int unlock();
|
||||
};
|
||||
|
||||
|
||||
class RGWBucketReshard {
|
||||
RGWRados *store;
|
||||
RGWBucketInfo bucket_info;
|
||||
|
||||
string reshard_oid;
|
||||
rados::cls::lock::Lock reshard_lock;
|
||||
|
||||
int lock_bucket();
|
||||
void unlock_bucket();
|
||||
int init_resharding(const cls_rgw_reshard_entry& entry);
|
||||
int clear_resharding();
|
||||
public:
|
||||
RGWBucketReshard(RGWRados *_store, const RGWBucketInfo& _bucket_info);
|
||||
|
||||
int reshard();
|
||||
int abort_reshard();
|
||||
};
|
||||
|
||||
class RGWReshard {
|
||||
CephContext *cct;
|
||||
RGWRados *store;
|
||||
@ -49,7 +68,6 @@ class RGWReshard {
|
||||
int get(cls_rgw_reshard_entry& entry);
|
||||
int remove(cls_rgw_reshard_entry& entry);
|
||||
int list(string& marker, uint32_t max, list<cls_rgw_reshard_entry>& entries, bool& is_truncated);
|
||||
int set_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry);
|
||||
int clear_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry);
|
||||
/*
|
||||
if succefull, keeps the bucket index locked. It will be unlocked
|
||||
|
Loading…
Reference in New Issue
Block a user