Mirror of https://github.com/ceph/ceph (synced 2025-03-09 09:48:09 +00:00)
Merge pull request #15284 from majianpeng/bluestore-batch-throotle
os/bluestore: batch throttle

Reviewed-by: Igor Fedotov <ifedotov@mirantis.com>
Reviewed-by: Sage Weil <sage@redhat.com>
This commit is contained in:
commit
0b0c7e56a1
@ -7516,6 +7516,9 @@ void BlueStore::_txc_state_proc(TransContext *txc)
|
||||
kv_queue_unsubmitted.push_back(txc);
|
||||
++txc->osr->kv_committing_serially;
|
||||
}
|
||||
if (txc->had_ios)
|
||||
kv_ios++;
|
||||
kv_throttle_costs += txc->cost;
|
||||
}
|
||||
return;
|
||||
case TransContext::STATE_KV_SUBMITTED:
|
||||
@ -7962,6 +7965,8 @@ void BlueStore::_kv_sync_thread()
|
||||
} else {
|
||||
deque<TransContext*> kv_submitting;
|
||||
deque<DeferredBatch*> deferred_done, deferred_stable;
|
||||
uint64_t aios = 0, costs = 0;
|
||||
|
||||
dout(20) << __func__ << " committing " << kv_queue.size()
|
||||
<< " submitting " << kv_queue_unsubmitted.size()
|
||||
<< " deferred done " << deferred_done_queue.size()
|
||||
@ -7971,6 +7976,10 @@ void BlueStore::_kv_sync_thread()
|
||||
kv_submitting.swap(kv_queue_unsubmitted);
|
||||
deferred_done.swap(deferred_done_queue);
|
||||
deferred_stable.swap(deferred_stable_queue);
|
||||
aios = kv_ios;
|
||||
costs = kv_throttle_costs;
|
||||
kv_ios = 0;
|
||||
kv_throttle_costs = 0;
|
||||
utime_t start = ceph_clock_now();
|
||||
l.unlock();
|
||||
|
||||
@ -7979,20 +7988,13 @@ void BlueStore::_kv_sync_thread()
|
||||
dout(30) << __func__ << " deferred_done " << deferred_done << dendl;
|
||||
dout(30) << __func__ << " deferred_stable " << deferred_stable << dendl;
|
||||
|
||||
int num_aios = 0;
|
||||
for (auto txc : kv_committing) {
|
||||
if (txc->had_ios) {
|
||||
++num_aios;
|
||||
}
|
||||
}
|
||||
|
||||
bool force_flush = false;
|
||||
// if bluefs is sharing the same device as data (only), then we
|
||||
// can rely on the bluefs commit to flush the device and make
|
||||
// deferred aios stable. that means that if we do have done deferred
|
||||
// txcs AND we are not on a single device, we need to force a flush.
|
||||
if (bluefs_single_shared_device && bluefs) {
|
||||
if (num_aios) {
|
||||
if (aios) {
|
||||
force_flush = true;
|
||||
} else if (kv_committing.empty() && kv_submitting.empty() &&
|
||||
deferred_stable.empty()) {
|
||||
@ -8004,7 +8006,7 @@ void BlueStore::_kv_sync_thread()
|
||||
force_flush = true;
|
||||
|
||||
if (force_flush) {
|
||||
dout(20) << __func__ << " num_aios=" << num_aios
|
||||
dout(20) << __func__ << " num_aios=" << aios
|
||||
<< " force_flush=" << (int)force_flush
|
||||
<< ", flushing, deferred done->stable" << dendl;
|
||||
// flush/barrier on block device
|
||||
@ -8063,15 +8065,16 @@ void BlueStore::_kv_sync_thread()
|
||||
--txc->osr->txc_with_unstable_io;
|
||||
}
|
||||
txc->log_state_latency(logger, l_bluestore_state_kv_queued_lat);
|
||||
// release throttle *before* we commit. this allows new ops
|
||||
// to be prepared and enter pipeline while we are waiting on
|
||||
// the kv commit sync/flush. then hopefully on the next
|
||||
// iteration there will already be ops awake. otherwise, we
|
||||
// end up going to sleep, and then wake up when the very first
|
||||
// transaction is ready for commit.
|
||||
throttle_bytes.put(txc->cost);
|
||||
}
|
||||
|
||||
// release throttle *before* we commit. this allows new ops
|
||||
// to be prepared and enter pipeline while we are waiting on
|
||||
// the kv commit sync/flush. then hopefully on the next
|
||||
// iteration there will already be ops awake. otherwise, we
|
||||
// end up going to sleep, and then wake up when the very first
|
||||
// transaction is ready for commit.
|
||||
throttle_bytes.put(costs);
|
||||
|
||||
PExtentVector bluefs_gift_extents;
|
||||
if (bluefs &&
|
||||
after_flush - bluefs_last_balance >
|
||||
@ -8362,13 +8365,15 @@ void BlueStore::_deferred_aio_finish(OpSequencer *osr)
|
||||
}
|
||||
|
||||
{
|
||||
uint64_t costs = 0;
|
||||
std::lock_guard<std::mutex> l2(osr->qlock);
|
||||
for (auto& i : b->txcs) {
|
||||
TransContext *txc = &i;
|
||||
txc->state = TransContext::STATE_DEFERRED_CLEANUP;
|
||||
txc->osr->qcond.notify_all();
|
||||
throttle_deferred_bytes.put(txc->cost);
|
||||
costs += txc->cost;
|
||||
}
|
||||
osr->qcond.notify_all();
|
||||
throttle_deferred_bytes.put(costs);
|
||||
std::lock_guard<std::mutex> l(kv_lock);
|
||||
deferred_done_queue.emplace_back(b);
|
||||
}
|
||||
|
@ -1863,6 +1863,9 @@ private:
|
||||
|
||||
std::atomic<uint64_t> max_blob_size = {0}; ///< maximum blob size
|
||||
|
||||
uint64_t kv_ios = 0;
|
||||
uint64_t kv_throttle_costs = 0;
|
||||
|
||||
// cache trim control
|
||||
|
||||
// note that these update in a racy way, but we don't *really* care if
|
||||
|
Loading…
Reference in New Issue
Block a user