rgw: multiple fixes and adjustments related to resharding scheduler

Still WIP, but:
- get rid of some unneeded rados locking
- rename shards to logshards where relevant, to avoid confusion
- remove unused code

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>

parent b757e7e16e
commit 8e1bf1b9e8
@@ -1733,7 +1733,8 @@ OPTION(debug_deliberately_leak_memory, OPT_BOOL, false)
OPTION(rgw_swift_custom_header, OPT_STR, "") // option to enable swift custom headers

/* resharding tunables */
OPTION(rgw_reshard_max_jobs, OPT_INT, 1024)
OPTION(rgw_reshard_num_logs, OPT_INT, 16)
OPTION(rgw_reshard_bucket_lock_duration, OPT_INT, 120) // duration of lock on bucket obj during resharding
OPTION(rgw_dynamic_resharding, OPT_BOOL, true)
OPTION(rgw_max_objs_per_shard, OPT_INT, 100000)
OPTION(rgw_reshard_thread_interval, OPT_U32, 60 * 10) // maximum time between rounds of reshard thread processing
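For context, a minimal ceph.conf sketch exercising these tunables; the values shown are just the defaults declared above, and the instance name client.rgw.gateway1 is a hypothetical stand-in:

    [client.rgw.gateway1]
    rgw dynamic resharding = true
    rgw max objs per shard = 100000
    rgw reshard num logs = 16
    rgw reshard bucket lock duration = 120
    rgw reshard thread interval = 600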
@@ -5626,31 +5626,40 @@ next:

  if (opt_cmd == OPT_RESHARD_LIST) {
    list<cls_rgw_reshard_entry> entries;
    bool is_truncated = true;
    string marker;
    int ret;
    int count = 0;
    if (max_entries < 0) {
      max_entries = 1000;
    }

    int num_logshards = store->ctx()->_conf->rgw_reshard_num_logs;

    RGWReshard reshard(store);

    formatter->open_array_section("reshard");
    do {
      entries.clear();
      ret = reshard.list(marker, max_entries, entries, is_truncated);
      if (ret < 0) {
        cerr << "Error listing resharding buckets: " << cpp_strerror(-ret) << std::endl;
        return ret;
    for (int i = 0; i < num_logshards; i++) {
      bool is_truncated = true;
      string marker;
      do {
        entries.clear();
        ret = reshard.list(i, marker, max_entries, entries, &is_truncated);
        if (ret < 0) {
          cerr << "Error listing resharding buckets: " << cpp_strerror(-ret) << std::endl;
          return ret;
        }
        for (auto iter = entries.begin(); iter != entries.end(); ++iter) {
          cls_rgw_reshard_entry& entry = *iter;
          encode_json("entry", entry, formatter);
        }
        count += entries.size();
        formatter->flush(cout);
#warning marker?
      } while (is_truncated && count < max_entries);

      if (count >= max_entries) {
        break;
      }
      for (auto iter = entries.begin(); iter != entries.end(); ++iter) {
        cls_rgw_reshard_entry& entry = *iter;
        encode_json("entry", entry, formatter);
      }
      count += entries.size();
      formatter->flush(cout);
    } while (is_truncated && count < max_entries);
    }

    formatter->close_section();
    formatter->flush(cout);
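The new loop walks every logshard, paginating within each one via the marker/is_truncated pair and capping total output at max_entries across all logshards. Assuming the conventional radosgw-admin verb mapping for OPT_RESHARD_LIST and the usual --max-entries admin flag, an invocation would look roughly like:

    $ radosgw-admin reshard list --max-entries=100

which prints each queued cls_rgw_reshard_entry as a JSON "entry" object inside a top-level "reshard" array, per the formatter calls above.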
@@ -12,7 +12,7 @@

#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rgw

const string reshard_oid = "reshard";
const string reshard_oid_prefix = "reshard";
const string reshard_lock_name = "reshard_process";
const string bucket_instance_lock_name = "bucket_instance_lock";
@@ -478,126 +478,101 @@ sleep(10);

RGWReshard::RGWReshard(RGWRados* _store): store(_store), instance_lock(bucket_instance_lock_name)
{
  max_jobs = store->ctx()->_conf->rgw_reshard_max_jobs;
  num_logshards = store->ctx()->_conf->rgw_reshard_num_logs;
}

string RGWReshard::get_logshard_key(const string& tenant, const string& bucket_name)
{
  return bucket_name + ":" + tenant; /* transposed on purpose */
}

#define MAX_RESHARD_LOGSHARDS_PRIME 7877

void RGWReshard::get_bucket_logshard_oid(const string& tenant, const string& bucket_name, string *oid)
{
  string key = get_logshard_key(tenant, bucket_name);

  uint32_t sid = ceph_str_hash_linux(key.c_str(), key.size());
  uint32_t sid2 = sid ^ ((sid & 0xFF) << 24);
  sid = sid2 % MAX_RESHARD_LOGSHARDS_PRIME % num_logshards;
  int logshard = sid % num_logshards;

  get_logshard_oid(logshard, oid);
}
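A minimal self-contained sketch of this placement scheme follows. demo_hash merely stands in for ceph_str_hash_linux (which is not reproduced here), the constants mirror the values in this diff, and the two reductions by num_logshards are collapsed into one, since taking the same modulus twice is idempotent:

#include <cstdint>
#include <cstdio>
#include <string>

static const uint32_t kLogshardsPrime = 7877;  // MAX_RESHARD_LOGSHARDS_PRIME
static const int kNumLogshards = 16;           // rgw_reshard_num_logs default

// Stand-in for ceph_str_hash_linux; any stable string hash illustrates the idea.
static uint32_t demo_hash(const std::string& s)
{
  uint32_t h = 0;
  for (unsigned char c : s)
    h = h * 31 + c;
  return h;
}

// Key is "bucket:tenant" (transposed on purpose, as in get_logshard_key),
// then hash, fold the low byte into the high bits, and reduce to a logshard.
std::string bucket_to_logshard_oid(const std::string& tenant,
                                   const std::string& bucket_name)
{
  std::string key = bucket_name + ":" + tenant;
  uint32_t sid = demo_hash(key);
  uint32_t sid2 = sid ^ ((sid & 0xFF) << 24);
  int logshard = (sid2 % kLogshardsPrime) % kNumLogshards;

  char buf[32];
  snprintf(buf, sizeof(buf), "%010u", (unsigned)logshard);
  return std::string("reshard") + buf;  // matches get_logshard_oid below
}

All entries for a given bucket therefore always land on the same logshard object, so a single radosgw can process that bucket's queue without coordinating with holders of the other logshards.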
int RGWReshard::add(cls_rgw_reshard_entry& entry)
{
  rados::cls::lock::Lock l(reshard_lock_name);
  string logshard_oid;

  int ret = l.lock_exclusive(&store->reshard_pool_ctx, reshard_oid);
  if (ret == -EBUSY) {
    ldout(store->ctx(), 0) << "RGWReshard::add failed to acquire lock on " << reshard_oid << dendl;
    return 0;
  }
  if (ret < 0)
    return ret;
  get_bucket_logshard_oid(entry.tenant, entry.bucket_name, &logshard_oid);

  librados::ObjectWriteOperation op;
  cls_rgw_reshard_add(op, entry);

  ret = store->reshard_pool_ctx.operate(reshard_oid, &op);

  l.unlock(&store->reshard_pool_ctx, reshard_oid);
  return ret;
  int ret = store->reshard_pool_ctx.operate(logshard_oid, &op);
  if (ret < 0) {
    lderr(store->ctx()) << "ERROR: failed to add entry to reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant << " bucket=" << entry.bucket_name << dendl;
    return ret;
  }
  return 0;
}


int RGWReshard::list(string& marker, uint32_t max, std::list<cls_rgw_reshard_entry>& entries, bool& is_truncated)
int RGWReshard::list(int logshard_num, string& marker, uint32_t max, std::list<cls_rgw_reshard_entry>& entries, bool *is_truncated)
{
  rados::cls::lock::Lock l(reshard_lock_name);
  string logshard_oid;

  int ret = l.lock_shared(&store->reshard_pool_ctx, reshard_oid);
  if (ret == -EBUSY) {
    ldout(store->ctx(), 0) << "RGWReshard::list failed to acquire lock on " << reshard_oid << dendl;
    return 0;
  }
  if (ret < 0)
  get_logshard_oid(logshard_num, &logshard_oid);

  int ret = cls_rgw_reshard_list(store->reshard_pool_ctx, logshard_oid, marker, max, entries, is_truncated);
  if (ret < 0) {
    lderr(store->ctx()) << "ERROR: failed to list reshard log entries, oid=" << logshard_oid << dendl;
    return ret;

  ret = cls_rgw_reshard_list(store->reshard_pool_ctx, reshard_oid, marker, max, entries, &is_truncated);

  l.unlock(&store->reshard_pool_ctx, reshard_oid);
  return ret;
  }
  return 0;
}
int RGWReshard::get(cls_rgw_reshard_entry& entry)
{
  rados::cls::lock::Lock l(reshard_lock_name);
  string logshard_oid;

  int ret = l.lock_shared(&store->reshard_pool_ctx, reshard_oid);
  if (ret == -EBUSY) {
    ldout(store->ctx(), 0) << "RGWReshardLog::get failed to acquire lock on " << reshard_oid << dendl;
    return 0;
  }
  if (ret < 0)
  get_bucket_logshard_oid(entry.tenant, entry.bucket_name, &logshard_oid);

  int ret = cls_rgw_reshard_get(store->reshard_pool_ctx, logshard_oid, entry);
  if (ret < 0) {
    lderr(store->ctx()) << "ERROR: failed to get entry from reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant << " bucket=" << entry.bucket_name << dendl;
    return ret;
  }

  ret = cls_rgw_reshard_get(store->reshard_pool_ctx, reshard_oid, entry);

  l.unlock(&store->reshard_pool_ctx, reshard_oid);
  return ret;
  return 0;
}

int RGWReshard::remove(cls_rgw_reshard_entry& entry)
{
  rados::cls::lock::Lock l(reshard_lock_name);
  string logshard_oid;

  int ret = l.lock_exclusive(&store->reshard_pool_ctx, reshard_oid);
  if (ret == -EBUSY) {
    ldout(store->ctx(), 0) << "RGWReshardLog::remove failed to acquire lock on " << reshard_oid << dendl;
    return 0;
  }
  if (ret < 0)
    return ret;
  get_bucket_logshard_oid(entry.tenant, entry.bucket_name, &logshard_oid);

  librados::ObjectWriteOperation op;
  cls_rgw_reshard_remove(op, entry);

  ret = store->reshard_pool_ctx.operate(reshard_oid, &op);
  int ret = store->reshard_pool_ctx.operate(logshard_oid, &op);
  if (ret < 0) {
    lderr(store->ctx()) << "ERROR: failed to remove entry from reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant << " bucket=" << entry.bucket_name << dendl;
    return ret;
  }

  l.unlock(&store->reshard_pool_ctx, reshard_oid);
  return ret;
}

std::string create_bucket_index_lock_name(const string& bucket_instance_id) {
  return bucket_instance_lock_name + "." + bucket_instance_id;
}

int RGWReshard::clear_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry)
{
  rados::cls::lock::Lock l(create_bucket_index_lock_name(entry.old_instance_id));

  int ret = l.lock_exclusive(&store->reshard_pool_ctx, bucket_instance_oid);
  if (ret == -EBUSY) {
    ldout(store->ctx(), 0) << "RGWReshardLog::add failed to acquire lock on " << reshard_oid << dendl;
    return 0;
  }
  if (ret < 0)
  int ret = cls_rgw_clear_bucket_resharding(store->reshard_pool_ctx, bucket_instance_oid);
  if (ret < 0) {
    lderr(store->ctx()) << "ERROR: failed to clear bucket resharding, bucket_instance_oid=" << bucket_instance_oid << dendl;
    return ret;

  entry.new_instance_id.clear();

  ret = cls_rgw_clear_bucket_resharding(store->reshard_pool_ctx, bucket_instance_oid);

  l.unlock(&store->reshard_pool_ctx, bucket_instance_oid);
  return ret;
}

int RGWReshard::lock_bucket_index_shared(const string& oid)
{
  int ret = instance_lock.lock_shared(&store->reshard_pool_ctx, oid);
  if (ret == -EBUSY) {
    ldout(store->ctx(), 0) << "RGWReshardLog::add failed to acquire lock on " << reshard_oid << dendl;
    return 0;
  }

  return ret;
}

int RGWReshard::unlock_bucket_index(const string& oid)
{
  instance_lock.unlock(&store->reshard_pool_ctx, oid);
  return 0;
}
@@ -679,32 +654,34 @@ static int create_new_bucket_instance(RGWRados *store,
  return 0;
}

int RGWReshard::process_single_shard(const string& shard)
int RGWReshard::process_single_logshard(int logshard_num)
{
  string marker;
  bool truncated = false;

  CephContext *cct = store->ctx();
  int max_entries = 1000;
  int max_secs = 10;

  int max_secs = 60;

  rados::cls::lock::Lock l(reshard_lock_name);

  utime_t time(max_secs, 0);
  l.set_duration(time);

  int ret = l.lock_exclusive(&store->reshard_pool_ctx, shard);
  string logshard_oid;
  get_logshard_oid(logshard_num, &logshard_oid);

  int ret = l.lock_exclusive(&store->reshard_pool_ctx, logshard_oid);
  if (ret == -EBUSY) { /* already locked by another processor */
    dout(5) << __func__ << "(): failed to acquire lock on " << shard << dendl;
    ldout(store->ctx(), 5) << __func__ << "(): failed to acquire lock on " << logshard_oid << dendl;
    return ret;
  }

  do {
    std::list<cls_rgw_reshard_entry> entries;
    ret = list(marker, max_entries, entries, truncated);
    ret = list(logshard_num, marker, max_entries, entries, &truncated);
    if (ret < 0) {
      ldout(cct, 10) << "cannot list all reshards: " << shard << dendl;
      ldout(cct, 10) << "cannot list all reshards in logshard oid=" << logshard_oid << dendl;
      continue;
    }
@@ -758,33 +735,35 @@ int RGWReshard::process_single_shard(const string& shard)
      }
    }
  }
#warning update marker?
#warning do work here, renew lock
  } while (truncated);

  l.unlock(&store->reshard_pool_ctx, shard);
  l.unlock(&store->reshard_pool_ctx, logshard_oid);
  return 0;
}


void RGWReshard::get_shard(int shard_num, string& shard)
void RGWReshard::get_logshard_oid(int shard_num, string *logshard)
{
  char buf[32];
  snprintf(buf, sizeof(buf), "%010u", (unsigned)shard_num);

  string objname("bucket_reshard.");
  shard = objname + buf;
  string objname(reshard_oid_prefix);
  *logshard = objname + buf;
}
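With reshard_oid_prefix set to "reshard" above, this yields one log object per logshard, named reshard0000000000 through reshard0000000015 for the default of 16 logs, replacing both the single "reshard" object and the old "bucket_reshard." naming.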

int RGWReshard::inspect_all_shards()
int RGWReshard::inspect_all_logshards()
{
  int ret = 0;

  for (int i = 0; i < num_shards; i++) {
    string shard;
    store->objexp_get_shard(i, shard);
  for (int i = 0; i < num_logshards; i++) {
    string logshard;
    get_logshard_oid(i, &logshard);

    ldout(store->ctx(), 20) << "proceeding shard = " << shard << dendl;
    ldout(store->ctx(), 20) << "proceeding logshard = " << logshard << dendl;

    ret = process_single_shard(shard);
    ret = process_single_logshard(i);
    if (ret < 0) {
      return ret;
    }
@@ -820,7 +799,7 @@ void *RGWReshard::ReshardWorker::entry() {
  do {
    utime_t start = ceph_clock_now();
    ldout(cct, 2) << "object expiration: start" << dendl;
    if (reshard->inspect_all_shards()) {
    if (reshard->inspect_all_logshards()) {
      /* All shards have been processed properly. Next time we can start
       * from this moment. */
      last_run = start;
@@ -833,7 +812,7 @@ void *RGWReshard::ReshardWorker::entry() {

  utime_t end = ceph_clock_now();
  end -= start;
  int secs = cct->_conf->rgw_objexp_gc_interval;
  int secs = cct->_conf->rgw_reshard_thread_interval;

  if (secs <= end.sec())
    continue; // next round
@@ -854,6 +833,7 @@ void RGWReshard::ReshardWorker::stop()
  cond.Signal();
}

#if 0
BucketIndexLockGuard::BucketIndexLockGuard(RGWRados* _store,
  const string& bucket_instance_id, const string& _oid, const librados::IoCtx& _io_ctx) :
  store(_store),
@@ -14,6 +14,7 @@ class CephContext;
class RGWRados;


#if 0
/* gets a locked lock, release it when exiting context */
class BucketIndexLockGuard
{
@@ -33,6 +34,7 @@ protected:
  int lock();
  int unlock();
};
#endif


class RGWBucketReshard {
@@ -70,13 +72,10 @@ public:
class RGWReshard {
  RGWRados *store;
  string lock_name;
  int max_jobs;
  rados::cls::lock::Lock instance_lock;
  int num_shards;
  int num_logshards;

  int lock_bucket_index_shared(const string& oid);
  int unlock_bucket_index(const string& oid);
  void get_shard(int shard_num, string& shard);
  void get_logshard_oid(int shard_num, string *shard);
protected:
  class ReshardWorker : public Thread {
    CephContext *cct;
@@ -99,29 +98,32 @@ protected:
  ReshardWorker *worker;
  std::atomic<bool> down_flag = { false };

public:
  RGWReshard(RGWRados* _store);
  int add(cls_rgw_reshard_entry& entry);
  int get(cls_rgw_reshard_entry& entry);
  int remove(cls_rgw_reshard_entry& entry);
  int list(string& marker, uint32_t max, list<cls_rgw_reshard_entry>& entries, bool& is_truncated);
  int clear_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry);
  string get_logshard_key(const string& tenant, const string& bucket_name);
  void get_bucket_logshard_oid(const string& tenant, const string& bucket_name, string *oid);

  int reshard_bucket(Formatter *formatter,
                     int num_shards,
                     rgw_bucket& bucket,
                     RGWBucketInfo& bucket_info,
                     RGWBucketInfo& new_bucket_info,
                     int max_entries,
                     RGWBucketAdminOpState& bucket_op,
                     bool verbose = false);
public:
  RGWReshard(RGWRados* _store);
  int add(cls_rgw_reshard_entry& entry);
  int get(cls_rgw_reshard_entry& entry);
  int remove(cls_rgw_reshard_entry& entry);
  int list(int logshard_num, string& marker, uint32_t max, std::list<cls_rgw_reshard_entry>& entries, bool *is_truncated);
  int clear_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry);

  /* reshard thread */
  int process_single_shard(const std::string& shard);
  int inspect_all_shards();
  bool going_down();
  void start_processor();
  void stop_processor();
  int reshard_bucket(Formatter *formatter,
                     int num_shards,
                     rgw_bucket& bucket,
                     RGWBucketInfo& bucket_info,
                     RGWBucketInfo& new_bucket_info,
                     int max_entries,
                     RGWBucketAdminOpState& bucket_op,
                     bool verbose = false);

  /* reshard thread */
  int process_single_logshard(int logshard_num);
  int inspect_all_logshards();
  bool going_down();
  void start_processor();
  void stop_processor();
};
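Taken together, the flow in this patch: dynamic resharding (or the admin tool) queues a cls_rgw_reshard_entry, and the worker thread drains the per-logshard logs. A rough usage sketch of the public API above, with hypothetical bucket values:

  RGWReshard reshard(store);

  cls_rgw_reshard_entry entry;
  entry.tenant = "";               // hypothetical values
  entry.bucket_name = "mybucket";
  reshard.add(entry);              // hashed onto one of num_logshards log objects

  // The worker started here wakes roughly every rgw_reshard_thread_interval
  // seconds and runs inspect_all_logshards(), which calls
  // process_single_logshard(i) for each log in turn.
  reshard.start_processor();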