mirror of
https://github.com/ceph/ceph
synced 2025-01-29 22:43:40 +00:00
Merge PR #24296 into master
* refs/pull/24296/head: osd: Handle is_stopping() by discarding item so it doesn't loop osd: Simplify _process() logic recently added Reviewed-by: Jianpeng Ma <jianpeng.ma@intel.com> Reviewed-by: Sage Weil <sage@redhat.com>
This commit is contained in:
commit
b624cea60a
@ -10251,30 +10251,8 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
|
||||
|
||||
// peek at spg_t
|
||||
sdata->shard_lock.Lock();
|
||||
if (is_smallest_thread_index) {
|
||||
if (sdata->pqueue->empty() && sdata->context_queue.empty()) {
|
||||
sdata->sdata_wait_lock.Lock();
|
||||
if (!sdata->stop_waiting) {
|
||||
dout(20) << __func__ << " empty q, waiting" << dendl;
|
||||
osd->cct->get_heartbeat_map()->clear_timeout(hb);
|
||||
sdata->shard_lock.Unlock();
|
||||
sdata->sdata_cond.Wait(sdata->sdata_wait_lock);
|
||||
sdata->sdata_wait_lock.Unlock();
|
||||
sdata->shard_lock.Lock();
|
||||
if (sdata->pqueue->empty() && sdata->context_queue.empty()) {
|
||||
sdata->shard_lock.Unlock();
|
||||
return;
|
||||
}
|
||||
osd->cct->get_heartbeat_map()->reset_timeout(hb,
|
||||
osd->cct->_conf->threadpool_default_timeout, 0);
|
||||
} else {
|
||||
dout(20) << __func__ << " need return immediately" << dendl;
|
||||
sdata->sdata_wait_lock.Unlock();
|
||||
sdata->shard_lock.Unlock();
|
||||
return;
|
||||
}
|
||||
}
|
||||
} else if (sdata->pqueue->empty()) {
|
||||
if (sdata->pqueue->empty() &&
|
||||
!(is_smallest_thread_index && !sdata->context_queue.empty())) {
|
||||
sdata->sdata_wait_lock.Lock();
|
||||
if (!sdata->stop_waiting) {
|
||||
dout(20) << __func__ << " empty q, waiting" << dendl;
|
||||
@ -10283,7 +10261,8 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
|
||||
sdata->sdata_cond.Wait(sdata->sdata_wait_lock);
|
||||
sdata->sdata_wait_lock.Unlock();
|
||||
sdata->shard_lock.Lock();
|
||||
if (sdata->pqueue->empty()) {
|
||||
if (sdata->pqueue->empty() &&
|
||||
!(is_smallest_thread_index && !sdata->context_queue.empty())) {
|
||||
sdata->shard_lock.Unlock();
|
||||
return;
|
||||
}
|
||||
@ -10297,23 +10276,26 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
|
||||
}
|
||||
}
|
||||
|
||||
if (osd->is_stopping()) {
|
||||
sdata->shard_lock.Unlock();
|
||||
return; // OSD shutdown, discard.
|
||||
}
|
||||
|
||||
list<Context *> oncommits;
|
||||
if (is_smallest_thread_index && !sdata->context_queue.empty()) {
|
||||
sdata->context_queue.swap(oncommits);
|
||||
}
|
||||
|
||||
if (sdata->pqueue->empty()) {
|
||||
if (osd->is_stopping()) {
|
||||
sdata->shard_lock.Unlock();
|
||||
return; // OSD shutdown, discard.
|
||||
}
|
||||
sdata->shard_lock.Unlock();
|
||||
handle_oncommits(oncommits);
|
||||
return;
|
||||
}
|
||||
|
||||
OpQueueItem item = sdata->pqueue->dequeue();
|
||||
if (osd->is_stopping()) {
|
||||
sdata->shard_lock.Unlock();
|
||||
return; // OSD shutdown, discard.
|
||||
}
|
||||
|
||||
const auto token = item.get_ordering_token();
|
||||
auto r = sdata->pg_slots.emplace(token, nullptr);
|
||||
|
Loading…
Reference in New Issue
Block a user