diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index da88c394070..0b05353bd1f 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -3383,7 +3383,9 @@ int RGWRados::get_max_chunk_size(const string& placement_rule, const rgw_obj& ob class RGWIndexCompletionManager; struct complete_op_data { + Mutex lock{"complete_op_data"}; AioCompletion *rados_completion{nullptr}; + int manager_shard_id{-1}; RGWIndexCompletionManager *manager{nullptr}; rgw_obj obj; RGWModifyOp op; @@ -3394,6 +3396,13 @@ struct complete_op_data { list remove_objs; bool log_op; uint16_t bilog_op; + + bool stopped{false}; + + void stop() { + Mutex::Locker l(lock); + stopped = true; + } }; class RGWIndexCompletionThread : public RGWRadosThread { @@ -3472,20 +3481,43 @@ int RGWIndexCompletionThread::process() class RGWIndexCompletionManager { RGWRados *store{nullptr}; - Mutex lock{"RGWIndexCompletionManager::lock"}; - set completions; + vector locks; + vector > completions; RGWIndexCompletionThread *completion_thread{nullptr}; + int num_shards; + + atomic_t cur_shard; + public: - RGWIndexCompletionManager(RGWRados *_store) : store(_store) {} + RGWIndexCompletionManager(RGWRados *_store) : store(_store) { + num_shards = store->ctx()->_conf->rgw_thread_pool_size; + + for (int i = 0; i < num_shards; i++) { + char buf[64]; + snprintf(buf, sizeof(buf), "RGWIndexCompletionManager::lock::%d", i); + locks.push_back(new Mutex(buf)); + } + + completions.resize(num_shards); + } ~RGWIndexCompletionManager() { stop(); + + for (auto l : locks) { + delete l; + } } - void create_completion(AioCompletion *rados_completion, - const rgw_obj& obj, + int next_shard() { + int result = cur_shard.read() % num_shards; + cur_shard.inc(); + return result; + } + + void create_completion(const rgw_obj& obj, RGWModifyOp op, string& tag, rgw_bucket_entry_ver& ver, const cls_rgw_obj_key& key, @@ -3510,17 +3542,31 @@ public: delete completion_thread; } - Mutex::Locker l(lock); - for (auto c : completions) { - c->rados_completion->release(); - delete c; + for (int i = 0; i < num_shards; ++i) { + Mutex::Locker l(*locks[i]); + for (auto c : completions[i]) { + Mutex::Locker cl(c->lock); + c->stop(); + } } completions.clear(); } }; -void RGWIndexCompletionManager::create_completion(AioCompletion *rados_completion, - const rgw_obj& obj, +static void obj_complete_cb(completion_t cb, void *arg) +{ + complete_op_data *completion = (complete_op_data *)arg; + completion->lock.Lock(); + if (completion->stopped) { + completion->lock.Unlock(); /* can drop lock, no one else is referencing us */ + delete completion; + } + ((complete_op_data *)arg)->manager->handle_completion(cb, completion); + completion->lock.Unlock(); +} + + +void RGWIndexCompletionManager::create_completion(const rgw_obj& obj, RGWModifyOp op, string& tag, rgw_bucket_entry_ver& ver, const cls_rgw_obj_key& key, @@ -3531,7 +3577,9 @@ void RGWIndexCompletionManager::create_completion(AioCompletion *rados_completio { complete_op_data *entry = new complete_op_data; - entry->rados_completion = rados_completion; + int shard_id = next_shard(); + + entry->manager_shard_id = shard_id; entry->manager = this; entry->obj = obj; entry->op = op; @@ -3545,21 +3593,26 @@ void RGWIndexCompletionManager::create_completion(AioCompletion *rados_completio *result = entry; - Mutex::Locker l(lock); - completions.insert(entry); + entry->rados_completion = librados::Rados::aio_create_completion(entry, NULL, obj_complete_cb); + + Mutex::Locker l(*locks[shard_id]); + completions[shard_id].insert(entry); } void RGWIndexCompletionManager::handle_completion(completion_t cb, complete_op_data *arg) { + int shard_id = arg->manager_shard_id; { - Mutex::Locker l(lock); + Mutex::Locker l(*locks[shard_id]); - auto iter = completions.find(arg); - if (iter == completions.end()) { + auto& comps = completions[shard_id]; + + auto iter = comps.find(arg); + if (iter == comps.end()) { return; } - completions.erase(iter); + comps.erase(iter); } int r = rados_aio_get_return_value(cb); @@ -12420,11 +12473,6 @@ int RGWRados::cls_obj_prepare_op(BucketShard& bs, RGWModifyOp op, string& tag, return bs.index_ctx.operate(bs.bucket_obj, &o); } -static void obj_complete_cb(completion_t cb, void *arg) -{ - ((complete_op_data *)arg)->manager->handle_completion(cb, (complete_op_data *)arg); -} - int RGWRados::cls_obj_complete_op(BucketShard& bs, const rgw_obj& obj, RGWModifyOp op, string& tag, int64_t pool, uint64_t epoch, rgw_bucket_dir_entry& ent, RGWObjCategory category, @@ -12462,11 +12510,10 @@ int RGWRados::cls_obj_complete_op(BucketShard& bs, const rgw_obj& obj, RGWModify get_zone().log_data, bilog_flags, zones_trace); complete_op_data *arg; - AioCompletion *c = librados::Rados::aio_create_completion(arg, NULL, obj_complete_cb); - index_completion_manager->create_completion(c, obj, op, tag, ver, key, dir_meta, ro, + index_completion_manager->create_completion(obj, op, tag, ver, key, dir_meta, ro, get_zone().log_data, bilog_flags, &arg); - int ret = bs.index_ctx.aio_operate(bs.bucket_obj, c, &o); - c->release(); + int ret = bs.index_ctx.aio_operate(bs.bucket_obj, arg->rados_completion, &o); + arg->rados_completion->release(); return ret; }