mirror of
https://github.com/ceph/ceph
synced 2025-03-25 11:48:05 +00:00
Merge pull request #14035 from liewegas/wip-bluestore-kv-finisher
os/bluestore: separate kv_sync_thread into two parts
Reviewed-by: Igor Fedotov <ifedotov@mirantis.com>
Reviewed-by: Mark Nelson <mnelson@redhat.com>
This commit is contained in:
commit
ff04315240
@ -3281,6 +3281,7 @@ BlueStore::BlueStore(CephContext *cct, const string& path)
|
||||
cct->_conf->bluestore_throttle_bytes +
|
||||
cct->_conf->bluestore_throttle_deferred_bytes),
|
||||
kv_sync_thread(this),
|
||||
kv_finalize_thread(this),
|
||||
mempool_thread(this)
|
||||
{
|
||||
_init_logger();
|
||||
@ -3309,6 +3310,7 @@ BlueStore::BlueStore(CephContext *cct,
|
||||
cct->_conf->bluestore_throttle_bytes +
|
||||
cct->_conf->bluestore_throttle_deferred_bytes),
|
||||
kv_sync_thread(this),
|
||||
kv_finalize_thread(this),
|
||||
min_alloc_size(_min_alloc_size),
|
||||
min_alloc_size_order(ctz(_min_alloc_size)),
|
||||
mempool_thread(this)
|
||||
@ -5012,6 +5014,7 @@ int BlueStore::_mount(bool kv_only)
|
||||
f->start();
|
||||
}
|
||||
kv_sync_thread.create("bstore_kv_sync");
|
||||
kv_finalize_thread.create("bstore_kv_final");
|
||||
|
||||
r = _deferred_replay();
|
||||
if (r < 0)
|
||||
@ -7901,6 +7904,10 @@ void BlueStore::_osr_drain_all()
|
||||
std::lock_guard<std::mutex> l(kv_lock);
|
||||
kv_cond.notify_one();
|
||||
}
|
||||
{
|
||||
std::lock_guard<std::mutex> l(kv_finalize_lock);
|
||||
kv_finalize_cond.notify_one();
|
||||
}
|
||||
for (auto osr : s) {
|
||||
dout(20) << __func__ << " drain " << osr << dendl;
|
||||
osr->drain();
|
||||
@ -8128,32 +8135,6 @@ void BlueStore::_kv_sync_thread()
|
||||
logger->tinc(l_bluestore_kv_commit_lat, dur_kv);
|
||||
logger->tinc(l_bluestore_kv_lat, dur);
|
||||
}
|
||||
while (!kv_committing.empty()) {
|
||||
TransContext *txc = kv_committing.front();
|
||||
assert(txc->state == TransContext::STATE_KV_SUBMITTED);
|
||||
_txc_state_proc(txc);
|
||||
kv_committing.pop_front();
|
||||
}
|
||||
for (auto b : deferred_stable) {
|
||||
auto p = b->txcs.begin();
|
||||
while (p != b->txcs.end()) {
|
||||
TransContext *txc = &*p;
|
||||
p = b->txcs.erase(p); // unlink here because
|
||||
_txc_state_proc(txc); // this may destroy txc
|
||||
}
|
||||
delete b;
|
||||
}
|
||||
|
||||
if (!deferred_aggressive) {
|
||||
std::lock_guard<std::mutex> l(deferred_lock);
|
||||
if (deferred_queue_size >= deferred_batch_ops ||
|
||||
throttle_deferred_bytes.past_midpoint()) {
|
||||
_deferred_try_submit();
|
||||
}
|
||||
}
|
||||
|
||||
// this is as good a place as any ...
|
||||
_reap_collections();
|
||||
|
||||
if (bluefs) {
|
||||
if (!bluefs_gift_extents.empty()) {
|
||||
@ -8170,6 +8151,29 @@ void BlueStore::_kv_sync_thread()
|
||||
bluefs_extents_reclaiming.clear();
|
||||
}
|
||||
|
||||
{
|
||||
std::unique_lock<std::mutex> m(kv_finalize_lock);
|
||||
if (kv_committing_to_finalize.empty()) {
|
||||
kv_committing_to_finalize.swap(kv_committing);
|
||||
} else {
|
||||
kv_committing_to_finalize.insert(
|
||||
kv_committing_to_finalize.end(),
|
||||
kv_committing.begin(),
|
||||
kv_committing.end());
|
||||
kv_committing.clear();
|
||||
}
|
||||
if (deferred_stable_to_finalize.empty()) {
|
||||
deferred_stable_to_finalize.swap(deferred_stable);
|
||||
} else {
|
||||
deferred_stable_to_finalize.insert(
|
||||
deferred_stable_to_finalize.end(),
|
||||
deferred_stable.begin(),
|
||||
deferred_stable.end());
|
||||
deferred_stable.clear();
|
||||
}
|
||||
kv_finalize_cond.notify_one();
|
||||
}
|
||||
|
||||
l.lock();
|
||||
// previously deferred "done" are now "stable" by virtue of this
|
||||
// commit cycle.
|
||||
@ -8179,6 +8183,63 @@ void BlueStore::_kv_sync_thread()
|
||||
dout(10) << __func__ << " finish" << dendl;
|
||||
}
|
||||
|
||||
void BlueStore::_kv_finalize_thread()
|
||||
{
|
||||
deque<TransContext*> kv_committed;
|
||||
deque<DeferredBatch*> deferred_stable;
|
||||
dout(10) << __func__ << " start" << dendl;
|
||||
std::unique_lock<std::mutex> l(kv_finalize_lock);
|
||||
while (true) {
|
||||
assert(kv_committed.empty());
|
||||
assert(deferred_stable.empty());
|
||||
if (kv_committing_to_finalize.empty() &&
|
||||
deferred_stable_to_finalize.empty()) {
|
||||
if (kv_stop)
|
||||
break;
|
||||
dout(20) << __func__ << " sleep" << dendl;
|
||||
kv_finalize_cond.wait(l);
|
||||
dout(20) << __func__ << " wake" << dendl;
|
||||
} else {
|
||||
kv_committed.swap(kv_committing_to_finalize);
|
||||
deferred_stable.swap(deferred_stable_to_finalize);
|
||||
l.unlock();
|
||||
dout(20) << __func__ << " kv_committed " << kv_committed << dendl;
|
||||
dout(20) << __func__ << " deferred_stable " << deferred_stable << dendl;
|
||||
|
||||
while (!kv_committed.empty()) {
|
||||
TransContext *txc = kv_committed.front();
|
||||
assert(txc->state == TransContext::STATE_KV_SUBMITTED);
|
||||
_txc_state_proc(txc);
|
||||
kv_committed.pop_front();
|
||||
}
|
||||
for (auto b : deferred_stable) {
|
||||
auto p = b->txcs.begin();
|
||||
while (p != b->txcs.end()) {
|
||||
TransContext *txc = &*p;
|
||||
p = b->txcs.erase(p); // unlink here because
|
||||
_txc_state_proc(txc); // this may destroy txc
|
||||
}
|
||||
delete b;
|
||||
}
|
||||
deferred_stable.clear();
|
||||
|
||||
if (!deferred_aggressive) {
|
||||
std::lock_guard<std::mutex> l(deferred_lock);
|
||||
if (deferred_queue_size >= deferred_batch_ops ||
|
||||
throttle_deferred_bytes.past_midpoint()) {
|
||||
_deferred_try_submit();
|
||||
}
|
||||
}
|
||||
|
||||
// this is as good a place as any ...
|
||||
_reap_collections();
|
||||
|
||||
l.lock();
|
||||
}
|
||||
}
|
||||
dout(10) << __func__ << " finish" << dendl;
|
||||
}
|
||||
|
||||
bluestore_deferred_op_t *BlueStore::_get_deferred_op(
|
||||
TransContext *txc, OnodeRef o)
|
||||
{
|
||||
|
@ -1729,6 +1729,14 @@ public:
|
||||
return NULL;
|
||||
}
|
||||
};
|
||||
struct KVFinalizeThread : public Thread {
|
||||
BlueStore *store;
|
||||
explicit KVFinalizeThread(BlueStore *s) : store(s) {}
|
||||
void *entry() {
|
||||
store->_kv_finalize_thread();
|
||||
return NULL;
|
||||
}
|
||||
};
|
||||
|
||||
struct DBHistogram {
|
||||
struct value_dist {
|
||||
@ -1809,6 +1817,12 @@ private:
|
||||
deque<DeferredBatch*> deferred_done_queue; ///< deferred ios done
|
||||
deque<DeferredBatch*> deferred_stable_queue; ///< deferred ios done + stable
|
||||
|
||||
KVFinalizeThread kv_finalize_thread;
|
||||
std::mutex kv_finalize_lock;
|
||||
std::condition_variable kv_finalize_cond;
|
||||
deque<TransContext*> kv_committing_to_finalize; ///< pending finalization
|
||||
deque<DeferredBatch*> deferred_stable_to_finalize; ///< pending finalization
|
||||
|
||||
PerfCounters *logger = nullptr;
|
||||
|
||||
std::mutex reap_lock;
|
||||
@ -1968,13 +1982,20 @@ private:
|
||||
void _osr_unregister_all();
|
||||
|
||||
void _kv_sync_thread();
|
||||
void _kv_finalize_thread();
|
||||
void _kv_stop() {
|
||||
{
|
||||
std::lock_guard<std::mutex> l(kv_lock);
|
||||
kv_stop = true;
|
||||
kv_cond.notify_all();
|
||||
}
|
||||
{
|
||||
std::lock_guard<std::mutex> l(kv_finalize_lock);
|
||||
kv_finalize_cond.notify_all();
|
||||
}
|
||||
|
||||
kv_sync_thread.join();
|
||||
kv_finalize_thread.join();
|
||||
{
|
||||
std::lock_guard<std::mutex> l(kv_lock);
|
||||
kv_stop = false;
|
||||
|
@ -106,6 +106,8 @@ bluestore=0
|
||||
rgw_frontend="civetweb"
|
||||
lockdep=${LOCKDEP:-1}
|
||||
|
||||
filestore_path=
|
||||
|
||||
VSTART_SEC="client.vstart.sh"
|
||||
|
||||
MON_ADDR=""
|
||||
@ -249,6 +251,10 @@ case $1 in
|
||||
rgw_frontend=$2
|
||||
shift
|
||||
;;
|
||||
--filestore_path )
|
||||
filestore_path=$2
|
||||
shift
|
||||
;;
|
||||
-m )
|
||||
[ -z "$2" ] && usage_exit
|
||||
MON_ADDR=$2
|
||||
@ -575,7 +581,11 @@ EOF
|
||||
if command -v btrfs > /dev/null; then
|
||||
for f in $CEPH_DEV_DIR/osd$osd/*; do btrfs sub delete $f &> /dev/null || true; done
|
||||
fi
|
||||
mkdir -p $CEPH_DEV_DIR/osd$osd
|
||||
if [ -n "$filestore_path" ]; then
|
||||
ln -s $filestore_path $CEPH_DEV_DIR/osd$osd
|
||||
else
|
||||
mkdir -p $CEPH_DEV_DIR/osd$osd
|
||||
fi
|
||||
|
||||
local uuid=`uuidgen`
|
||||
echo "add osd$osd $uuid"
|
||||
|
Loading…
Reference in New Issue
Block a user