rgw: reshard complete handling, use multiple locks

to reduce contention. Also, fix completions cleanup.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
This commit is contained in:
Yehuda Sadeh 2017-05-16 18:10:15 -07:00
parent da1f5bc910
commit 94e3c380ab

View File

@ -3383,7 +3383,9 @@ int RGWRados::get_max_chunk_size(const string& placement_rule, const rgw_obj& ob
class RGWIndexCompletionManager;
struct complete_op_data {
Mutex lock{"complete_op_data"};
AioCompletion *rados_completion{nullptr};
int manager_shard_id{-1};
RGWIndexCompletionManager *manager{nullptr};
rgw_obj obj;
RGWModifyOp op;
@ -3394,6 +3396,13 @@ struct complete_op_data {
list<cls_rgw_obj_key> remove_objs;
bool log_op;
uint16_t bilog_op;
/* Set by stop(). obj_complete_cb() reads this flag while holding `lock` and deletes the entry when it is set. */
bool stopped{false};
/* Flag this entry as stopped. Takes `lock` itself, so the caller must not already hold it. */
void stop() {
Mutex::Locker l(lock);
stopped = true;
}
};
class RGWIndexCompletionThread : public RGWRadosThread {
@ -3472,20 +3481,43 @@ int RGWIndexCompletionThread::process()
class RGWIndexCompletionManager {
RGWRados *store{nullptr};
Mutex lock{"RGWIndexCompletionManager::lock"};
set<complete_op_data *> completions;
vector<Mutex *> locks;
vector<set<complete_op_data *> > completions;
RGWIndexCompletionThread *completion_thread{nullptr};
int num_shards;
atomic_t cur_shard;
public:
RGWIndexCompletionManager(RGWRados *_store) : store(_store) {}
/**
 * Build the manager with one lock and one completion set per shard.
 *
 * The shard count is read from the rgw_thread_pool_size config value and
 * clamped to at least 1: next_shard() uses it as a modulus and both
 * containers are sized from it, so zero or a negative value must never
 * get through.
 *
 * @param _store  owning RGWRados instance; also the source of the config.
 */
RGWIndexCompletionManager(RGWRados *_store) : store(_store) {
  num_shards = store->ctx()->_conf->rgw_thread_pool_size;
  if (num_shards < 1) {
    num_shards = 1;
  }
  locks.reserve(num_shards);
  for (int i = 0; i < num_shards; i++) {
    char buf[64];
    snprintf(buf, sizeof(buf), "RGWIndexCompletionManager::lock::%d", i);
    locks.push_back(new Mutex(buf));
  }
  completions.resize(num_shards);
}
/* Stop the manager first, then free the per-shard mutexes allocated by the constructor. */
~RGWIndexCompletionManager() {
  stop();
  for (Mutex *shard_lock : locks) {
    delete shard_lock;
  }
}
void create_completion(AioCompletion *rados_completion,
const rgw_obj& obj,
/*
 * Choose the shard for the next completion: the current counter value
 * modulo the shard count. The counter is bumped afterwards with a separate
 * call, so the read and the increment are two distinct operations.
 */
int next_shard() {
  const int shard = cur_shard.read() % num_shards;
  cur_shard.inc();
  return shard;
}
void create_completion(const rgw_obj& obj,
RGWModifyOp op, string& tag,
rgw_bucket_entry_ver& ver,
const cls_rgw_obj_key& key,
@ -3510,17 +3542,31 @@ public:
delete completion_thread;
}
Mutex::Locker l(lock);
for (auto c : completions) {
c->rados_completion->release();
delete c;
/* Flag every tracked completion as stopped, one shard at a time. */
for (int i = 0; i < num_shards; ++i) {
  Mutex::Locker l(*locks[i]);
  for (auto c : completions[i]) {
    /*
     * complete_op_data::stop() takes c->lock itself; taking it here as
     * well would lock the same non-recursive mutex twice on this thread.
     */
    c->stop();
  }
}
completions.clear();
}
};
void RGWIndexCompletionManager::create_completion(AioCompletion *rados_completion,
const rgw_obj& obj,
/**
 * librados AIO callback for a bucket index completion op.
 *
 * @param cb   librados completion handle, forwarded to the manager.
 * @param arg  the complete_op_data that was registered with the completion.
 *
 * If the entry has been flagged as stopped, it is deleted here and the
 * callback returns immediately. Otherwise the result is handed to the
 * owning manager while the entry's lock is held.
 */
static void obj_complete_cb(completion_t cb, void *arg)
{
  complete_op_data *completion = static_cast<complete_op_data *>(arg);
  completion->lock.Lock();
  if (completion->stopped) {
    completion->lock.Unlock(); /* can drop lock, no one else is referencing us */
    delete completion;
    /* must not fall through: the entry and its lock no longer exist */
    return;
  }
  completion->manager->handle_completion(cb, completion);
  completion->lock.Unlock();
}
void RGWIndexCompletionManager::create_completion(const rgw_obj& obj,
RGWModifyOp op, string& tag,
rgw_bucket_entry_ver& ver,
const cls_rgw_obj_key& key,
@ -3531,7 +3577,9 @@ void RGWIndexCompletionManager::create_completion(AioCompletion *rados_completio
{
complete_op_data *entry = new complete_op_data;
entry->rados_completion = rados_completion;
int shard_id = next_shard();
entry->manager_shard_id = shard_id;
entry->manager = this;
entry->obj = obj;
entry->op = op;
@ -3545,21 +3593,26 @@ void RGWIndexCompletionManager::create_completion(AioCompletion *rados_completio
*result = entry;
Mutex::Locker l(lock);
completions.insert(entry);
entry->rados_completion = librados::Rados::aio_create_completion(entry, NULL, obj_complete_cb);
Mutex::Locker l(*locks[shard_id]);
completions[shard_id].insert(entry);
}
void RGWIndexCompletionManager::handle_completion(completion_t cb, complete_op_data *arg)
{
int shard_id = arg->manager_shard_id;
{
Mutex::Locker l(lock);
Mutex::Locker l(*locks[shard_id]);
auto iter = completions.find(arg);
if (iter == completions.end()) {
auto& comps = completions[shard_id];
auto iter = comps.find(arg);
if (iter == comps.end()) {
return;
}
completions.erase(iter);
comps.erase(iter);
}
int r = rados_aio_get_return_value(cb);
@ -12420,11 +12473,6 @@ int RGWRados::cls_obj_prepare_op(BucketShard& bs, RGWModifyOp op, string& tag,
return bs.index_ctx.operate(bs.bucket_obj, &o);
}
static void obj_complete_cb(completion_t cb, void *arg)
{
((complete_op_data *)arg)->manager->handle_completion(cb, (complete_op_data *)arg);
}
int RGWRados::cls_obj_complete_op(BucketShard& bs, const rgw_obj& obj, RGWModifyOp op, string& tag,
int64_t pool, uint64_t epoch,
rgw_bucket_dir_entry& ent, RGWObjCategory category,
@ -12462,11 +12510,10 @@ int RGWRados::cls_obj_complete_op(BucketShard& bs, const rgw_obj& obj, RGWModify
get_zone().log_data, bilog_flags, zones_trace);
complete_op_data *arg;
AioCompletion *c = librados::Rados::aio_create_completion(arg, NULL, obj_complete_cb);
index_completion_manager->create_completion(c, obj, op, tag, ver, key, dir_meta, ro,
index_completion_manager->create_completion(obj, op, tag, ver, key, dir_meta, ro,
get_zone().log_data, bilog_flags, &arg);
int ret = bs.index_ctx.aio_operate(bs.bucket_obj, c, &o);
c->release();
int ret = bs.index_ctx.aio_operate(bs.bucket_obj, arg->rados_completion, &o);
arg->rados_completion->release();
return ret;
}