common/WorkQueue: fix a race that could make drain wait endlessly.

ShardedThreadPool::shardedthreadpool_worker calls _process without
holding shardedpool_lock, so there is a race between
return_waiting_threads and _process: if return_waiting_threads runs
first, _process misses the signal and waits forever. As a result,
ShardedThreadPool::drain may never complete.

Signed-off-by: Jianpeng Ma <jianpeng.ma@intel.com>
parent fb6eca1fb2
commit f533dda42f
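The underlying bug is a classic lost wakeup. Here is a minimal standalone sketch (plain std::mutex/std::condition_variable, not Ceph code; worker and the flag handling are illustrative) of the pattern the commit fixes: a thread that waits unconditionally misses a signal delivered before it began waiting, while a wait guarded by a flag like the new stop_waiting does not.

#include <condition_variable>
#include <mutex>
#include <thread>
#include <iostream>

std::mutex m;
std::condition_variable cv;
bool stop_waiting = false;   // plays the role of ShardData::stop_waiting

void worker() {
  std::unique_lock<std::mutex> l(m);
  // Pre-fix behaviour is a bare cv.wait(l): if notify_all had already run,
  // nothing would ever wake this thread again. Guarding the wait with a
  // predicate lets a request sent before this point still take effect.
  cv.wait(l, [] { return stop_waiting; });
  std::cout << "worker returned to pool" << std::endl;
}

void return_waiting_threads() {   // the drain()/pause() side
  std::lock_guard<std::mutex> l(m);
  stop_waiting = true;            // record the request under the lock...
  cv.notify_all();                // ...so it cannot slip past a waiter
}

int main() {
  return_waiting_threads();   // signal *before* the worker starts waiting
  std::thread t(worker);      // with the flag the worker still returns;
  t.join();                   // with a bare wait it would hang forever
}

Because stop_waiting is written under the same mutex the waiter holds while evaluating the predicate, the request is sticky: either the waiter sees the flag and skips the wait entirely, or it is already waiting when notify_all runs.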
src/common/WorkQueue.cc
@@ -416,6 +416,7 @@ void ShardedThreadPool::unpause()
   ldout(cct,10) << "unpause" << dendl;
   shardedpool_lock.Lock();
   pause_threads = false;
+  wq->stop_return_waiting_threads();
   shardedpool_cond.Signal();
   shardedpool_lock.Unlock();
   ldout(cct,10) << "unpaused" << dendl;
@@ -432,6 +433,7 @@ void ShardedThreadPool::drain()
     wait_cond.Wait(shardedpool_lock);
   }
   drain_threads = false;
+  wq->stop_return_waiting_threads();
   shardedpool_cond.Signal();
   shardedpool_lock.Unlock();
   ldout(cct,10) << "drained" << dendl;
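Both unpause() and drain() now finish by calling wq->stop_return_waiting_threads(), clearing the per-shard stop_waiting flag that return_waiting_threads() set (see the OSD.h hunks below). Without this reset, workers would keep bailing out of _process immediately after a pause or drain completed, instead of going back to blocking on their empty queues.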
src/common/WorkQueue.h
@@ -644,6 +644,7 @@ public:

   virtual void _process(uint32_t thread_index, heartbeat_handle_d *hb ) = 0;
   virtual void return_waiting_threads() = 0;
+  virtual void stop_return_waiting_threads() = 0;
   virtual bool is_shard_empty(uint32_t thread_index) = 0;
 };

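Since stop_return_waiting_threads() is a pure virtual on the base sharded work-queue class, every concrete sharded queue must implement it alongside return_waiting_threads(); the ShardedOpWQ implementation is in the OSD.h hunk further down.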
src/osd/OSD.cc
@@ -9754,19 +9754,26 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
   // peek at spg_t
   sdata->sdata_op_ordering_lock.Lock();
   if (sdata->pqueue->empty()) {
-    dout(20) << __func__ << " empty q, waiting" << dendl;
-    osd->cct->get_heartbeat_map()->clear_timeout(hb);
     sdata->sdata_lock.Lock();
-    sdata->sdata_op_ordering_lock.Unlock();
-    sdata->sdata_cond.Wait(sdata->sdata_lock);
-    sdata->sdata_lock.Unlock();
-    sdata->sdata_op_ordering_lock.Lock();
-    if (sdata->pqueue->empty()) {
+    if (!sdata->stop_waiting) {
+      dout(20) << __func__ << " empty q, waiting" << dendl;
+      osd->cct->get_heartbeat_map()->clear_timeout(hb);
+      sdata->sdata_op_ordering_lock.Unlock();
+      sdata->sdata_cond.Wait(sdata->sdata_lock);
+      sdata->sdata_lock.Unlock();
+      sdata->sdata_op_ordering_lock.Lock();
+      if (sdata->pqueue->empty()) {
+        sdata->sdata_op_ordering_lock.Unlock();
+        return;
+      }
+      osd->cct->get_heartbeat_map()->reset_timeout(hb,
+        osd->cct->_conf->threadpool_default_timeout, 0);
+    } else {
+      dout(0) << __func__ << " need return immediately" << dendl;
+      sdata->sdata_lock.Unlock();
       sdata->sdata_op_ordering_lock.Unlock();
       return;
     }
-    osd->cct->get_heartbeat_map()->reset_timeout(hb,
-      osd->cct->_conf->threadpool_default_timeout, 0);
   }
   OpQueueItem item = sdata->pqueue->dequeue();
   if (osd->is_stopping()) {
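The reworked _process now acquires sdata_lock before releasing sdata_op_ordering_lock and checks stop_waiting while holding it. If return_waiting_threads() already ran, the worker takes the else branch, logs "need return immediately", and returns without waiting; previously it would have blocked on a signal that had already fired. As before, a worker that wakes to a still-empty queue returns instead of dequeueing.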
src/osd/OSD.h
@@ -1628,6 +1628,8 @@ private:
     /// priority queue
     std::unique_ptr<OpQueue<OpQueueItem, uint64_t>> pqueue;

+    bool stop_waiting = false;
+
     void _enqueue_front(OpQueueItem&& item, unsigned cutoff) {
       unsigned priority = item.get_priority();
       unsigned cost = item.get_cost();
@@ -1723,11 +1725,22 @@ private:
       ShardData* sdata = shard_list[i];
       assert (NULL != sdata);
       sdata->sdata_lock.Lock();
+      sdata->stop_waiting = true;
       sdata->sdata_cond.Signal();
       sdata->sdata_lock.Unlock();
     }
   }

+  void stop_return_waiting_threads() override {
+    for(uint32_t i = 0; i < num_shards; i++) {
+      ShardData* sdata = shard_list[i];
+      assert (NULL != sdata);
+      sdata->sdata_lock.Lock();
+      sdata->stop_waiting = false;
+      sdata->sdata_lock.Unlock();
+    }
+  }
+
   void dump(Formatter *f) {
     for(uint32_t i = 0; i < num_shards; i++) {
       auto &&sdata = shard_list[i];
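Note that stop_return_waiting_threads() only flips stop_waiting back to false under sdata_lock and does not signal: nothing ever blocks waiting for the flag to clear, so no wakeup is needed; workers simply observe the cleared flag the next time they find an empty queue.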