From b1f5565c2f78d9c3f7cad877ea5ceb7611430ade Mon Sep 17 00:00:00 2001 From: Kefu Chai Date: Wed, 17 Jul 2019 16:25:09 +0800 Subject: [PATCH] kv: s/Mutex/ceph::mutex/ Signed-off-by: Kefu Chai --- src/kv/LevelDBStore.cc | 19 +++++++++---------- src/kv/LevelDBStore.h | 6 +++--- src/kv/RocksDBStore.cc | 19 +++++++++---------- src/kv/RocksDBStore.h | 6 +++--- 4 files changed, 24 insertions(+), 26 deletions(-) diff --git a/src/kv/LevelDBStore.cc b/src/kv/LevelDBStore.cc index a02aacf23ff..eba0778a766 100644 --- a/src/kv/LevelDBStore.cc +++ b/src/kv/LevelDBStore.cc @@ -180,14 +180,14 @@ LevelDBStore::~LevelDBStore() void LevelDBStore::close() { // stop compaction thread - compact_queue_lock.Lock(); + compact_queue_lock.lock(); if (compact_thread.is_started()) { compact_queue_stop = true; - compact_queue_cond.Signal(); - compact_queue_lock.Unlock(); + compact_queue_cond.notify_all(); + compact_queue_lock.unlock(); compact_thread.join(); } else { - compact_queue_lock.Unlock(); + compact_queue_lock.unlock(); } if (logger) @@ -383,27 +383,26 @@ void LevelDBStore::compact() void LevelDBStore::compact_thread_entry() { - compact_queue_lock.Lock(); + std::unique_lock l{compact_queue_lock}; while (!compact_queue_stop) { while (!compact_queue.empty()) { pair range = compact_queue.front(); compact_queue.pop_front(); logger->set(l_leveldb_compact_queue_len, compact_queue.size()); - compact_queue_lock.Unlock(); + l.unlock(); logger->inc(l_leveldb_compact_range); if (range.first.empty() && range.second.empty()) { compact(); } else { compact_range(range.first, range.second); } - compact_queue_lock.Lock(); + l.lock(); continue; } if (compact_queue_stop) break; - compact_queue_cond.Wait(compact_queue_lock); + compact_queue_cond.wait(l); } - compact_queue_lock.Unlock(); } void LevelDBStore::compact_range_async(const string& start, const string& end) @@ -440,7 +439,7 @@ void LevelDBStore::compact_range_async(const string& start, const string& end) compact_queue.push_back(make_pair(start, end)); logger->set(l_leveldb_compact_queue_len, compact_queue.size()); } - compact_queue_cond.Signal(); + compact_queue_cond.notify_all(); if (!compact_thread.is_started()) { compact_thread.create("levdbst_compact"); } diff --git a/src/kv/LevelDBStore.h b/src/kv/LevelDBStore.h index 34fdb7a6935..32680f73659 100644 --- a/src/kv/LevelDBStore.h +++ b/src/kv/LevelDBStore.h @@ -69,8 +69,9 @@ class LevelDBStore : public KeyValueDB { int do_open(ostream &out, bool create_if_missing); // manage async compactions - Mutex compact_queue_lock; - Cond compact_queue_cond; + ceph::mutex compact_queue_lock = + ceph::make_mutex("LevelDBStore::compact_thread_lock"); + ceph::condition_variable compact_queue_cond; list< pair > compact_queue; bool compact_queue_stop; class CompactThread : public Thread { @@ -166,7 +167,6 @@ public: #ifdef HAVE_LEVELDB_FILTER_POLICY filterpolicy(NULL), #endif - compact_queue_lock("LevelDBStore::compact_thread_lock"), compact_queue_stop(false), compact_thread(this), options() diff --git a/src/kv/RocksDBStore.cc b/src/kv/RocksDBStore.cc index 4756697f270..0a4be611ce8 100644 --- a/src/kv/RocksDBStore.cc +++ b/src/kv/RocksDBStore.cc @@ -650,14 +650,14 @@ RocksDBStore::~RocksDBStore() void RocksDBStore::close() { // stop compaction thread - compact_queue_lock.Lock(); + compact_queue_lock.lock(); if (compact_thread.is_started()) { compact_queue_stop = true; - compact_queue_cond.Signal(); - compact_queue_lock.Unlock(); + compact_queue_cond.notify_all(); + compact_queue_lock.unlock(); compact_thread.join(); } else { - compact_queue_lock.Unlock(); + compact_queue_lock.unlock(); } if (logger) @@ -1288,25 +1288,24 @@ void RocksDBStore::compact() void RocksDBStore::compact_thread_entry() { - compact_queue_lock.Lock(); + std::unique_lock l{compact_queue_lock}; while (!compact_queue_stop) { while (!compact_queue.empty()) { pair range = compact_queue.front(); compact_queue.pop_front(); logger->set(l_rocksdb_compact_queue_len, compact_queue.size()); - compact_queue_lock.Unlock(); + l.unlock(); logger->inc(l_rocksdb_compact_range); if (range.first.empty() && range.second.empty()) { compact(); } else { compact_range(range.first, range.second); } - compact_queue_lock.Lock(); + l.lock(); continue; } - compact_queue_cond.Wait(compact_queue_lock); + compact_queue_cond.wait(l); } - compact_queue_lock.Unlock(); } void RocksDBStore::compact_range_async(const string& start, const string& end) @@ -1346,7 +1345,7 @@ void RocksDBStore::compact_range_async(const string& start, const string& end) compact_queue.push_back(make_pair(start, end)); logger->set(l_rocksdb_compact_queue_len, compact_queue.size()); } - compact_queue_cond.Signal(); + compact_queue_cond.notify_all(); if (!compact_thread.is_started()) { compact_thread.create("rstore_compact"); } diff --git a/src/kv/RocksDBStore.h b/src/kv/RocksDBStore.h index ea7c77d3c76..e2d8538a581 100644 --- a/src/kv/RocksDBStore.h +++ b/src/kv/RocksDBStore.h @@ -95,8 +95,9 @@ class RocksDBStore : public KeyValueDB { int load_rocksdb_options(bool create_if_missing, rocksdb::Options& opt); // manage async compactions - Mutex compact_queue_lock; - Cond compact_queue_cond; + ceph::mutex compact_queue_lock = + ceph::make_mutex("RocksDBStore::compact_thread_lock"); + ceph::condition_variable compact_queue_cond; list< pair > compact_queue; bool compact_queue_stop; class CompactThread : public Thread { @@ -155,7 +156,6 @@ public: db(NULL), env(static_cast(p)), dbstats(NULL), - compact_queue_lock("RocksDBStore::compact_thread_lock"), compact_queue_stop(false), compact_thread(this), compact_on_mount(false),