From f533dda42f86b2bae75e7c3d1a7754d8727d049c Mon Sep 17 00:00:00 2001 From: Jianpeng Ma Date: Mon, 6 Nov 2017 21:40:12 +0800 Subject: [PATCH] common/WorkQueue: fix a race avoiding drain endless. In function ShardedThreadPool::shardedthreadpool_worker, when call _proces, it don't get shardedpool_lock. So there is a race between return_waiting_threads and _proces. If return_waiting_threads first run and _process will lost this signal and wait until. This may cause ShardedThreadPool::drain can't complete. Signed-off-by: Jianpeng Ma --- src/common/WorkQueue.cc | 2 ++ src/common/WorkQueue.h | 1 + src/osd/OSD.cc | 25 ++++++++++++++++--------- src/osd/OSD.h | 13 +++++++++++++ 4 files changed, 32 insertions(+), 9 deletions(-) diff --git a/src/common/WorkQueue.cc b/src/common/WorkQueue.cc index 7da8c85d374..5c8c772ab49 100644 --- a/src/common/WorkQueue.cc +++ b/src/common/WorkQueue.cc @@ -416,6 +416,7 @@ void ShardedThreadPool::unpause() ldout(cct,10) << "unpause" << dendl; shardedpool_lock.Lock(); pause_threads = false; + wq->stop_return_waiting_threads(); shardedpool_cond.Signal(); shardedpool_lock.Unlock(); ldout(cct,10) << "unpaused" << dendl; @@ -432,6 +433,7 @@ void ShardedThreadPool::drain() wait_cond.Wait(shardedpool_lock); } drain_threads = false; + wq->stop_return_waiting_threads(); shardedpool_cond.Signal(); shardedpool_lock.Unlock(); ldout(cct,10) << "drained" << dendl; diff --git a/src/common/WorkQueue.h b/src/common/WorkQueue.h index 9d055050ad7..acf1787cd4a 100644 --- a/src/common/WorkQueue.h +++ b/src/common/WorkQueue.h @@ -644,6 +644,7 @@ public: virtual void _process(uint32_t thread_index, heartbeat_handle_d *hb ) = 0; virtual void return_waiting_threads() = 0; + virtual void stop_return_waiting_threads() = 0; virtual bool is_shard_empty(uint32_t thread_index) = 0; }; diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 43c13f1e074..298b95b73d2 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -9754,19 +9754,26 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb) // peek at spg_t sdata->sdata_op_ordering_lock.Lock(); if (sdata->pqueue->empty()) { - dout(20) << __func__ << " empty q, waiting" << dendl; - osd->cct->get_heartbeat_map()->clear_timeout(hb); sdata->sdata_lock.Lock(); - sdata->sdata_op_ordering_lock.Unlock(); - sdata->sdata_cond.Wait(sdata->sdata_lock); - sdata->sdata_lock.Unlock(); - sdata->sdata_op_ordering_lock.Lock(); - if (sdata->pqueue->empty()) { + if (!sdata->stop_waiting) { + dout(20) << __func__ << " empty q, waiting" << dendl; + osd->cct->get_heartbeat_map()->clear_timeout(hb); + sdata->sdata_op_ordering_lock.Unlock(); + sdata->sdata_cond.Wait(sdata->sdata_lock); + sdata->sdata_lock.Unlock(); + sdata->sdata_op_ordering_lock.Lock(); + if (sdata->pqueue->empty()) { + sdata->sdata_op_ordering_lock.Unlock(); + return; + } + osd->cct->get_heartbeat_map()->reset_timeout(hb, + osd->cct->_conf->threadpool_default_timeout, 0); + } else { + dout(0) << __func__ << " need return immediately" << dendl; + sdata->sdata_lock.Unlock(); sdata->sdata_op_ordering_lock.Unlock(); return; } - osd->cct->get_heartbeat_map()->reset_timeout(hb, - osd->cct->_conf->threadpool_default_timeout, 0); } OpQueueItem item = sdata->pqueue->dequeue(); if (osd->is_stopping()) { diff --git a/src/osd/OSD.h b/src/osd/OSD.h index eb59f9095c4..9e3d4a0c84e 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -1628,6 +1628,8 @@ private: /// priority queue std::unique_ptr> pqueue; + bool stop_waiting = false; + void _enqueue_front(OpQueueItem&& item, unsigned cutoff) { unsigned priority = item.get_priority(); unsigned cost = item.get_cost(); @@ -1723,11 +1725,22 @@ private: ShardData* sdata = shard_list[i]; assert (NULL != sdata); sdata->sdata_lock.Lock(); + sdata->stop_waiting = true; sdata->sdata_cond.Signal(); sdata->sdata_lock.Unlock(); } } + void stop_return_waiting_threads() override { + for(uint32_t i = 0; i < num_shards; i++) { + ShardData* sdata = shard_list[i]; + assert (NULL != sdata); + sdata->sdata_lock.Lock(); + sdata->stop_waiting = false; + sdata->sdata_lock.Unlock(); + } + } + void dump(Formatter *f) { for(uint32_t i = 0; i < num_shards; i++) { auto &&sdata = shard_list[i];