1
0
mirror of https://github.com/ceph/ceph synced 2025-01-09 20:52:09 +00:00

Merge pull request from majianpeng/optimize-mutex-contention

os/bluestore: optimize mutex contention

Reviewed-by: Igor Fedotov <ifedotov@suse.com>
This commit is contained in:
Kefu Chai 2021-01-01 12:12:43 +08:00 committed by GitHub
commit d2d1c23a51
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 99 additions and 57 deletions

View File

@ -191,25 +191,31 @@ bool Throttle::get_or_fail(int64_t c)
}
assert (c >= 0);
std::lock_guard l(lock);
if (_should_wait(c) || !conds.empty()) {
ldout(cct, 10) << "get_or_fail " << c << " failed" << dendl;
if (logger) {
logger->inc(l_throttle_get_or_fail_fail);
bool result = false;
{
std::lock_guard l(lock);
if (_should_wait(c) || !conds.empty()) {
ldout(cct, 10) << "get_or_fail " << c << " failed" << dendl;
result = false;
} else {
ldout(cct, 10) << "get_or_fail " << c << " success (" << count.load()
<< " -> " << (count.load() + c) << ")" << dendl;
count += c;
result = true;
}
return false;
} else {
ldout(cct, 10) << "get_or_fail " << c << " success (" << count.load()
<< " -> " << (count.load() + c) << ")" << dendl;
count += c;
if (logger) {
}
if (logger) {
if (result) {
logger->inc(l_throttle_get_or_fail_success);
logger->inc(l_throttle_get);
logger->inc(l_throttle_get_sum, c);
logger->set(l_throttle_val, count);
} else {
logger->inc(l_throttle_get_or_fail_fail);
}
return true;
}
return result;
}
int64_t Throttle::put(int64_t c)
@ -222,19 +228,22 @@ int64_t Throttle::put(int64_t c)
ceph_assert(c >= 0);
ldout(cct, 10) << "put " << c << " (" << count.load() << " -> "
<< (count.load()-c) << ")" << dendl;
std::lock_guard l(lock);
if (c) {
if (!conds.empty())
conds.front().notify_one();
// if count goes negative, we failed somewhere!
ceph_assert(count >= c);
count -= c;
if (logger) {
logger->inc(l_throttle_put);
logger->inc(l_throttle_put_sum, c);
logger->set(l_throttle_val, count);
{
std::lock_guard l(lock);
if (c) {
if (!conds.empty())
conds.front().notify_one();
// if count goes negative, we failed somewhere!
ceph_assert(count >= c);
count -= c;
}
}
if (logger) {
logger->inc(l_throttle_put);
logger->inc(l_throttle_put_sum, c);
logger->set(l_throttle_val, count);
}
return count;
}

View File

@ -11676,11 +11676,11 @@ void BlueStore::_osr_drain_preceding(TransContext *txc)
++deferred_aggressive; // FIXME: maybe osr-local aggressive flag?
{
// submit anything pending
deferred_lock.lock();
osr->deferred_lock.lock();
if (osr->deferred_pending && !osr->deferred_running) {
_deferred_submit_unlock(osr);
} else {
deferred_lock.unlock();
osr->deferred_lock.unlock();
}
}
{
@ -11702,11 +11702,11 @@ void BlueStore::_osr_drain(OpSequencer *osr)
++deferred_aggressive; // FIXME: maybe osr-local aggressive flag?
{
// submit anything pending
deferred_lock.lock();
osr->deferred_lock.lock();
if (osr->deferred_pending && !osr->deferred_running) {
_deferred_submit_unlock(osr);
} else {
deferred_lock.unlock();
osr->deferred_lock.unlock();
}
}
{
@ -12238,59 +12238,86 @@ bluestore_deferred_op_t *BlueStore::_get_deferred_op(
void BlueStore::_deferred_queue(TransContext *txc)
{
dout(20) << __func__ << " txc " << txc << " osr " << txc->osr << dendl;
deferred_lock.lock();
if (!txc->osr->deferred_pending &&
!txc->osr->deferred_running) {
deferred_queue.push_back(*txc->osr);
DeferredBatch *tmp;
{
txc->osr->deferred_lock.lock();
if (!txc->osr->deferred_pending) {
tmp = new DeferredBatch(cct, txc->osr.get());
} else {
tmp = txc->osr->deferred_pending;
txc->osr->deferred_pending = nullptr;
}
txc->osr->deferred_lock.unlock();
}
if (!txc->osr->deferred_pending) {
txc->osr->deferred_pending = new DeferredBatch(cct, txc->osr.get());
}
++deferred_queue_size;
txc->osr->deferred_pending->txcs.push_back(*txc);
tmp->txcs.push_back(*txc);
bluestore_deferred_transaction_t& wt = *txc->deferred_txn;
for (auto opi = wt.ops.begin(); opi != wt.ops.end(); ++opi) {
const auto& op = *opi;
ceph_assert(op.op == bluestore_deferred_op_t::OP_WRITE);
bufferlist::const_iterator p = op.data.begin();
for (auto e : op.extents) {
txc->osr->deferred_pending->prepare_write(
cct, wt.seq, e.offset, e.length, p);
tmp->prepare_write(cct, wt.seq, e.offset, e.length, p);
}
}
if (deferred_aggressive &&
!txc->osr->deferred_running) {
_deferred_submit_unlock(txc->osr.get());
} else {
deferred_lock.unlock();
{
txc->osr->deferred_lock.lock();
++deferred_queue_size;
txc->osr->deferred_pending = tmp;
// The condition "tmp->txcs.size() == 1" means deferred_pending was originally empty,
// so we should add osr to deferred_queue.
if (!txc->osr->deferred_running && (tmp->txcs.size() == 1)) {
deferred_lock.lock();
deferred_queue.push_back(*txc->osr);
deferred_lock.unlock();
}
if (deferred_aggressive &&
!txc->osr->deferred_running) {
_deferred_submit_unlock(txc->osr.get());
} else {
txc->osr->deferred_lock.unlock();
}
}
}
}
void BlueStore::deferred_try_submit()
{
dout(20) << __func__ << " " << deferred_queue.size() << " osrs, "
<< deferred_queue_size << " txcs" << dendl;
std::lock_guard l(deferred_lock);
vector<OpSequencerRef> osrs;
osrs.reserve(deferred_queue.size());
for (auto& osr : deferred_queue) {
osrs.push_back(&osr);
{
std::lock_guard l(deferred_lock);
osrs.reserve(deferred_queue.size());
for (auto& osr : deferred_queue) {
osrs.push_back(&osr);
}
}
for (auto& osr : osrs) {
osr->deferred_lock.lock();
if (osr->deferred_pending) {
if (!osr->deferred_running) {
_deferred_submit_unlock(osr.get());
deferred_lock.lock();
} else {
osr->deferred_lock.unlock();
dout(20) << __func__ << " osr " << osr << " already has running"
<< dendl;
}
} else {
osr->deferred_lock.unlock();
dout(20) << __func__ << " osr " << osr << " has no pending" << dendl;
}
}
deferred_last_submitted = ceph_clock_now();
{
std::lock_guard l(deferred_lock);
deferred_last_submitted = ceph_clock_now();
}
}
void BlueStore::_deferred_submit_unlock(OpSequencer *osr)
@ -12308,7 +12335,7 @@ void BlueStore::_deferred_submit_unlock(OpSequencer *osr)
osr->deferred_running = osr->deferred_pending;
osr->deferred_pending = nullptr;
deferred_lock.unlock();
osr->deferred_lock.unlock();
for (auto& txc : b->txcs) {
throttle.log_state_latency(txc, logger, l_bluestore_state_deferred_queued_lat);
@ -12365,16 +12392,20 @@ void BlueStore::_deferred_aio_finish(OpSequencer *osr)
DeferredBatch *b = osr->deferred_running;
{
deferred_lock.lock();
osr->deferred_lock.lock();
ceph_assert(osr->deferred_running == b);
osr->deferred_running = nullptr;
if (!osr->deferred_pending) {
dout(20) << __func__ << " dequeueing" << dendl;
auto q = deferred_queue.iterator_to(*osr);
deferred_queue.erase(q);
deferred_lock.unlock();
{
deferred_lock.lock();
auto q = deferred_queue.iterator_to(*osr);
deferred_queue.erase(q);
deferred_lock.unlock();
}
osr->deferred_lock.unlock();
} else {
deferred_lock.unlock();
osr->deferred_lock.unlock();
if (deferred_aggressive) {
dout(20) << __func__ << " queuing async deferred_try_submit" << dendl;
finisher.queue(new C_DeferredTrySubmit(this));

View File

@ -1853,6 +1853,8 @@ public:
DeferredBatch *deferred_running = nullptr;
DeferredBatch *deferred_pending = nullptr;
ceph::mutex deferred_lock = ceph::make_mutex("BlueStore::OpSequencer::deferred_lock");
BlueStore *store;
coll_t cid;
@ -2082,7 +2084,7 @@ private:
ceph::make_mutex("BlueStore::atomic_alloc_and_submit_lock");
std::atomic<uint64_t> deferred_seq = {0};
deferred_osr_queue_t deferred_queue; ///< osr's with deferred io pending
int deferred_queue_size = 0; ///< num txc's queued across all osrs
std::atomic_int deferred_queue_size = {0}; ///< num txc's queued across all osrs
std::atomic_int deferred_aggressive = {0}; ///< aggressive wakeup of kv thread
Finisher finisher;
utime_t deferred_last_submitted = utime_t();