kv: s/Mutex/ceph::mutex/

Signed-off-by: Kefu Chai <kchai@redhat.com>
This commit is contained in:
Kefu Chai 2019-07-17 16:25:09 +08:00
parent de68b2cbbc
commit b1f5565c2f
4 changed files with 24 additions and 26 deletions

View File

@ -180,14 +180,14 @@ LevelDBStore::~LevelDBStore()
void LevelDBStore::close()
{
// stop compaction thread
compact_queue_lock.Lock();
compact_queue_lock.lock();
if (compact_thread.is_started()) {
compact_queue_stop = true;
compact_queue_cond.Signal();
compact_queue_lock.Unlock();
compact_queue_cond.notify_all();
compact_queue_lock.unlock();
compact_thread.join();
} else {
compact_queue_lock.Unlock();
compact_queue_lock.unlock();
}
if (logger)
@ -383,27 +383,26 @@ void LevelDBStore::compact()
void LevelDBStore::compact_thread_entry()
{
compact_queue_lock.Lock();
std::unique_lock l{compact_queue_lock};
while (!compact_queue_stop) {
while (!compact_queue.empty()) {
pair<string,string> range = compact_queue.front();
compact_queue.pop_front();
logger->set(l_leveldb_compact_queue_len, compact_queue.size());
compact_queue_lock.Unlock();
l.unlock();
logger->inc(l_leveldb_compact_range);
if (range.first.empty() && range.second.empty()) {
compact();
} else {
compact_range(range.first, range.second);
}
compact_queue_lock.Lock();
l.lock();
continue;
}
if (compact_queue_stop)
break;
compact_queue_cond.Wait(compact_queue_lock);
compact_queue_cond.wait(l);
}
compact_queue_lock.Unlock();
}
void LevelDBStore::compact_range_async(const string& start, const string& end)
@ -440,7 +439,7 @@ void LevelDBStore::compact_range_async(const string& start, const string& end)
compact_queue.push_back(make_pair(start, end));
logger->set(l_leveldb_compact_queue_len, compact_queue.size());
}
compact_queue_cond.Signal();
compact_queue_cond.notify_all();
if (!compact_thread.is_started()) {
compact_thread.create("levdbst_compact");
}

View File

@ -69,8 +69,9 @@ class LevelDBStore : public KeyValueDB {
int do_open(ostream &out, bool create_if_missing);
// manage async compactions
Mutex compact_queue_lock;
Cond compact_queue_cond;
ceph::mutex compact_queue_lock =
ceph::make_mutex("LevelDBStore::compact_thread_lock");
ceph::condition_variable compact_queue_cond;
list< pair<string,string> > compact_queue;
bool compact_queue_stop;
class CompactThread : public Thread {
@ -166,7 +167,6 @@ public:
#ifdef HAVE_LEVELDB_FILTER_POLICY
filterpolicy(NULL),
#endif
compact_queue_lock("LevelDBStore::compact_thread_lock"),
compact_queue_stop(false),
compact_thread(this),
options()

View File

@ -650,14 +650,14 @@ RocksDBStore::~RocksDBStore()
void RocksDBStore::close()
{
// stop compaction thread
compact_queue_lock.Lock();
compact_queue_lock.lock();
if (compact_thread.is_started()) {
compact_queue_stop = true;
compact_queue_cond.Signal();
compact_queue_lock.Unlock();
compact_queue_cond.notify_all();
compact_queue_lock.unlock();
compact_thread.join();
} else {
compact_queue_lock.Unlock();
compact_queue_lock.unlock();
}
if (logger)
@ -1288,25 +1288,24 @@ void RocksDBStore::compact()
void RocksDBStore::compact_thread_entry()
{
compact_queue_lock.Lock();
std::unique_lock l{compact_queue_lock};
while (!compact_queue_stop) {
while (!compact_queue.empty()) {
pair<string,string> range = compact_queue.front();
compact_queue.pop_front();
logger->set(l_rocksdb_compact_queue_len, compact_queue.size());
compact_queue_lock.Unlock();
l.unlock();
logger->inc(l_rocksdb_compact_range);
if (range.first.empty() && range.second.empty()) {
compact();
} else {
compact_range(range.first, range.second);
}
compact_queue_lock.Lock();
l.lock();
continue;
}
compact_queue_cond.Wait(compact_queue_lock);
compact_queue_cond.wait(l);
}
compact_queue_lock.Unlock();
}
void RocksDBStore::compact_range_async(const string& start, const string& end)
@ -1346,7 +1345,7 @@ void RocksDBStore::compact_range_async(const string& start, const string& end)
compact_queue.push_back(make_pair(start, end));
logger->set(l_rocksdb_compact_queue_len, compact_queue.size());
}
compact_queue_cond.Signal();
compact_queue_cond.notify_all();
if (!compact_thread.is_started()) {
compact_thread.create("rstore_compact");
}

View File

@ -95,8 +95,9 @@ class RocksDBStore : public KeyValueDB {
int load_rocksdb_options(bool create_if_missing, rocksdb::Options& opt);
// manage async compactions
Mutex compact_queue_lock;
Cond compact_queue_cond;
ceph::mutex compact_queue_lock =
ceph::make_mutex("RocksDBStore::compact_thread_lock");
ceph::condition_variable compact_queue_cond;
list< pair<string,string> > compact_queue;
bool compact_queue_stop;
class CompactThread : public Thread {
@ -155,7 +156,6 @@ public:
db(NULL),
env(static_cast<rocksdb::Env*>(p)),
dbstats(NULL),
compact_queue_lock("RocksDBStore::compact_thread_lock"),
compact_queue_stop(false),
compact_thread(this),
compact_on_mount(false),