common: s/Mutex/ceph::mutex/

Signed-off-by: Kefu Chai <kchai@redhat.com>
Kefu Chai 2019-07-07 11:20:13 +08:00
parent 79a722402a
commit d5cbb94bcb
8 changed files with 50 additions and 51 deletions
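Every file below follows the same conversion: the hand-rolled Mutex/Cond/RWLock wrappers give way to ceph::mutex, ceph::condition_variable and ceph::shared_mutex from common/ceph_mutex.h, constructed through ceph::make_mutex()/ceph::make_shared_mutex() (the name argument presumably only matters to the debug/lockdep variants). A minimal before/after sketch of the pattern, using made-up names rather than code taken from the diff:

    // before: old wrappers (API as seen in the removed lines below)
    Mutex lock("example_lock");
    Cond cond;
    bool done = false;
    lock.lock();
    while (!done)              // another thread sets done and calls cond.Signal()
      cond.Wait(lock);
    lock.unlock();

    // after: standard-library-style types from common/ceph_mutex.h
    ceph::mutex lock = ceph::make_mutex("example_lock");
    ceph::condition_variable cond;
    bool done = false;
    std::unique_lock l{lock};
    cond.wait(l, [&] { return done; });   // woken by cond.notify_all()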


@@ -126,17 +126,17 @@ class Cond {
* assume the caller is holding the appropriate lock.
*/
class C_Cond : public Context {
Cond *cond; ///< Cond to signal
ceph::condition_variable& cond; ///< Cond to signal
bool *done; ///< true if finish() has been called
int *rval; ///< return value
public:
C_Cond(Cond *c, bool *d, int *r) : cond(c), done(d), rval(r) {
C_Cond(ceph::condition_variable &c, bool *d, int *r) : cond(c), done(d), rval(r) {
*done = false;
}
void finish(int r) override {
*done = true;
*rval = r;
cond->Signal();
cond.notify_all();
}
};
@@ -148,21 +148,21 @@ public:
* already hold it.
*/
class C_SafeCond : public Context {
Mutex *lock; ///< Mutex to take
Cond *cond; ///< Cond to signal
ceph::mutex& lock; ///< Mutex to take
ceph::condition_variable& cond; ///< Cond to signal
bool *done; ///< true after finish() has been called
int *rval; ///< return value (optional)
public:
C_SafeCond(Mutex *l, Cond *c, bool *d, int *r=0) : lock(l), cond(c), done(d), rval(r) {
C_SafeCond(ceph::mutex& l, ceph::condition_variable& c, bool *d, int *r=0)
: lock(l), cond(c), done(d), rval(r) {
*done = false;
}
void finish(int r) override {
lock->lock();
std::lock_guard l{lock};
if (rval)
*rval = r;
*done = true;
cond->Signal();
lock->unlock();
cond.notify_all();
}
};
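For orientation, a hedged sketch of how a caller might drive the new reference-taking C_SafeCond (the variable names and start_async_operation() are illustrative, not from this commit). Unlike C_Cond above, C_SafeCond takes the mutex itself in finish(), so the waiter only needs the lock around its own wait:

    ceph::mutex mylock = ceph::make_mutex("example");   // hypothetical names
    ceph::condition_variable mycond;
    bool done = false;
    int r = 0;
    Context *onfinish = new C_SafeCond(mylock, mycond, &done, &r);

    start_async_operation(onfinish);   // hypothetical async call that completes onfinish

    std::unique_lock l{mylock};
    mycond.wait(l, [&done] { return done; });
    // r now holds the value the operation passed to onfinish->complete()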
@@ -173,13 +173,16 @@ public:
* until wait() returns.
*/
class C_SaferCond : public Context {
Mutex lock; ///< Mutex to take
Cond cond; ///< Cond to signal
bool done; ///< true after finish() has been called
int rval; ///< return value
ceph::mutex lock; ///< Mutex to take
ceph::condition_variable cond; ///< Cond to signal
bool done = false; ///< true after finish() has been called
int rval = 0; ///< return value
public:
C_SaferCond() : lock("C_SaferCond"), done(false), rval(0) {}
explicit C_SaferCond(const std::string &name) : lock(name), done(false), rval(0) {}
C_SaferCond() :
C_SaferCond("C_SaferCond")
{}
explicit C_SaferCond(const std::string &name)
: lock(ceph::make_mutex(name)) {}
void finish(int r) override { complete(r); }
/// We overload complete in order to not delete the context
@@ -187,27 +190,27 @@ public:
std::lock_guard l(lock);
done = true;
rval = r;
cond.Signal();
cond.notify_all();
}
/// Returns rval once the Context is called
int wait() {
std::lock_guard l(lock);
while (!done)
cond.Wait(lock);
std::unique_lock l{lock};
cond.wait(l, [this] { return done;});
return rval;
}
/// Wait until the \c secs expires or \c complete() is called
int wait_for(double secs) {
utime_t interval;
interval.set_from_double(secs);
std::lock_guard l{lock};
std::unique_lock l{lock};
if (done) {
return rval;
}
cond.WaitInterval(lock, interval);
return done ? rval : ETIMEDOUT;
if (cond.wait_for(l, ceph::make_timespan(secs), [this] { return done; })) {
return rval;
} else {
return ETIMEDOUT;
}
}
};
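C_SaferCond now carries its own ceph::mutex and ceph::condition_variable, wait() uses the predicate overload of wait(), and wait_for() builds its deadline with ceph::make_timespan() instead of utime_t. A short usage sketch (the async call is a placeholder, not part of the diff):

    C_SaferCond ctx;                 // or C_SaferCond("some_name") to name the internal lock
    start_async_operation(&ctx);     // hypothetical; anything that completes the Context
    int r = ctx.wait();              // blocks until complete(r) is called

    // or give up after a bound; wait_for() returns ETIMEDOUT on expiry
    int r2 = ctx.wait_for(5.0);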


@@ -28,7 +28,6 @@ namespace ceph {
HeartbeatMap::HeartbeatMap(CephContext *cct)
: m_cct(cct),
m_rwlock("HeartbeatMap::m_rwlock"),
m_unhealthy_workers(0),
m_total_workers(0)
{
@@ -41,7 +40,7 @@ HeartbeatMap::~HeartbeatMap()
heartbeat_handle_d *HeartbeatMap::add_worker(const string& name, pthread_t thread_id)
{
m_rwlock.get_write();
std::unique_lock locker{m_rwlock};
ldout(m_cct, 10) << "add_worker '" << name << "'" << dendl;
heartbeat_handle_d *h = new heartbeat_handle_d(name);
ANNOTATE_BENIGN_RACE_SIZED(&h->timeout, sizeof(h->timeout),
@@ -51,16 +50,14 @@ heartbeat_handle_d *HeartbeatMap::add_worker(const string& name, pthread_t threa
m_workers.push_front(h);
h->list_item = m_workers.begin();
h->thread_id = thread_id;
m_rwlock.put_write();
return h;
}
void HeartbeatMap::remove_worker(const heartbeat_handle_d *h)
{
m_rwlock.get_write();
std::unique_lock locker{m_rwlock};
ldout(m_cct, 10) << "remove_worker '" << h->name << "'" << dendl;
m_workers.erase(h->list_item);
m_rwlock.put_write();
delete h;
}
@@ -119,7 +116,7 @@ bool HeartbeatMap::is_healthy()
{
int unhealthy = 0;
int total = 0;
m_rwlock.get_read();
m_rwlock.lock_shared();
auto now = ceph::coarse_mono_clock::now();
if (m_cct->_conf->heartbeat_inject_failure) {
ldout(m_cct, 0) << "is_healthy injecting failure for next " << m_cct->_conf->heartbeat_inject_failure << " seconds" << dendl;
@@ -146,7 +143,7 @@ bool HeartbeatMap::is_healthy()
}
total++;
}
m_rwlock.put_read();
m_rwlock.unlock_shared();
m_unhealthy_workers = unhealthy;
m_total_workers = total;
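In HeartbeatMap the RWLock member becomes a ceph::shared_mutex: the writers (add_worker()/remove_worker()) switch to RAII std::unique_lock, while is_healthy() keeps explicit lock_shared()/unlock_shared() calls. For reference, the equivalent scoped-reader form, which other files in this commit do use (see the std::shared_lock changes in OpTracker below), would look roughly like this sketch; the commit itself keeps the explicit calls here:

    {
      std::shared_lock locker{m_rwlock};   // reader lock, released at end of scope
      for (auto h : m_workers) {
        // ... check each worker's timeout ...
      }
    }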


@@ -21,7 +21,7 @@
#include <pthread.h>
#include "common/ceph_time.h"
#include "RWLock.h"
#include "common/ceph_mutex.h"
class CephContext;
@@ -81,7 +81,8 @@ class HeartbeatMap {
private:
CephContext *m_cct;
RWLock m_rwlock;
ceph::shared_mutex m_rwlock =
ceph::make_shared_mutex("HeartbeatMap::m_rwlock");
ceph::coarse_mono_clock::time_point m_inject_unhealthy_until;
std::list<heartbeat_handle_d*> m_workers;
std::atomic<unsigned> m_unhealthy_workers = { 0 };


@@ -708,11 +708,11 @@ TokenBucketThrottle::TokenBucketThrottle(
uint64_t capacity,
uint64_t avg,
SafeTimer *timer,
Mutex *timer_lock)
ceph::mutex *timer_lock)
: m_cct(cct), m_name(name),
m_throttle(m_cct, name + "_bucket", capacity),
m_avg(avg), m_timer(timer), m_timer_lock(timer_lock),
m_lock(name + "_lock")
m_lock(ceph::make_mutex(name + "_lock"))
{}
TokenBucketThrottle::~TokenBucketThrottle() {
@@ -735,7 +735,7 @@ TokenBucketThrottle::~TokenBucketThrottle() {
int TokenBucketThrottle::set_limit(uint64_t average, uint64_t burst) {
{
std::lock_guard<Mutex> lock(m_lock);
std::lock_guard lock{m_lock};
if (0 < burst && burst < average) {
// the burst should never be less than the average.
@@ -769,7 +769,7 @@ int TokenBucketThrottle::set_limit(uint64_t average, uint64_t burst) {
// The schedule period will be changed when the average rate is set.
{
std::lock_guard<Mutex> timer_locker(*m_timer_lock);
std::lock_guard timer_locker{*m_timer_lock};
cancel_timer();
schedule_timer();
}


@@ -362,10 +362,10 @@ class TokenBucketThrottle {
uint64_t m_avg = 0;
uint64_t m_burst = 0;
SafeTimer *m_timer;
Mutex *m_timer_lock;
ceph::mutex *m_timer_lock;
FunctionContext *m_token_ctx = nullptr;
std::list<Blocker> m_blockers;
Mutex m_lock;
ceph::mutex m_lock;
// minimum of the filling period.
uint64_t m_tick_min = 50;
@@ -409,7 +409,7 @@ class TokenBucketThrottle {
public:
TokenBucketThrottle(CephContext *cct, const std::string &name,
uint64_t capacity, uint64_t avg,
SafeTimer *timer, Mutex *timer_lock);
SafeTimer *timer, ceph::mutex *timer_lock);
~TokenBucketThrottle();


@@ -149,7 +149,7 @@ OpTracker::OpTracker(CephContext *cct_, bool tracking, uint32_t num_shards):
num_optracker_shards(num_shards),
complaint_time(0), log_threshold(0),
tracking_enabled(tracking),
lock("OpTracker::lock"), cct(cct_) {
cct(cct_) {
for (uint32_t i = 0; i < num_optracker_shards; i++) {
char lock_name[32] = {0};
snprintf(lock_name, sizeof(lock_name), "%s:%" PRIu32, "OpTracker::ShardedLock", i);
@@ -171,7 +171,7 @@ bool OpTracker::dump_historic_ops(Formatter *f, bool by_duration, set<string> fi
if (!tracking_enabled)
return false;
RWLock::RLocker l(lock);
std::shared_lock l{lock};
utime_t now = ceph_clock_now();
history.dump_ops(now, f, filters, by_duration);
return true;
@@ -206,7 +206,7 @@ bool OpTracker::dump_historic_slow_ops(Formatter *f, set<string> filters)
if (!tracking_enabled)
return false;
RWLock::RLocker l(lock);
std::shared_lock l{lock};
utime_t now = ceph_clock_now();
history.dump_slow_ops(now, f, filters);
return true;
@@ -217,7 +217,7 @@ bool OpTracker::dump_ops_in_flight(Formatter *f, bool print_only_blocked, set<st
if (!tracking_enabled)
return false;
RWLock::RLocker l(lock);
std::shared_lock l{lock};
f->open_object_section("ops_in_flight"); // overall dump
uint64_t total_ops_in_flight = 0;
f->open_array_section("ops"); // list of TrackedOps
@@ -252,7 +252,7 @@ bool OpTracker::register_inflight_op(TrackedOp *i)
if (!tracking_enabled)
return false;
RWLock::RLocker l(lock);
std::shared_lock l{lock};
uint64_t current_seq = ++seq;
uint32_t shard_index = current_seq % num_optracker_shards;
ShardedTrackingData* sdata = sharded_in_flight_list[shard_index];
@@ -282,7 +282,7 @@ void OpTracker::unregister_inflight_op(TrackedOp* const i)
void OpTracker::record_history_op(TrackedOpRef&& i)
{
RWLock::RLocker l(lock);
std::shared_lock l{lock};
history.insert(ceph_clock_now(), std::move(i));
}
@@ -296,7 +296,7 @@ bool OpTracker::visit_ops_in_flight(utime_t* oldest_secs,
utime_t oldest_op = now;
uint64_t total_ops_in_flight = 0;
RWLock::RLocker l(lock);
std::shared_lock l{lock};
for (const auto sdata : sharded_in_flight_list) {
ceph_assert(sdata);
std::lock_guard locker(sdata->ops_in_flight_lock_sharded);


@@ -15,11 +15,10 @@
#define TRACKEDREQUEST_H_
#include <atomic>
#include "common/ceph_mutex.h"
#include "common/histogram.h"
#include "common/RWLock.h"
#include "common/Thread.h"
#include "common/Clock.h"
#include "common/ceph_mutex.h"
#include "include/spinlock.h"
#include "msg/Message.h"
@@ -108,7 +107,7 @@ class OpTracker {
float complaint_time;
int log_threshold;
std::atomic<bool> tracking_enabled;
RWLock lock;
ceph::shared_mutex lock = ceph::make_shared_mutex("OpTracker::lock");
public:
CephContext *cct;


@@ -12,7 +12,6 @@
*
*/
#include "common/Mutex.h"
#include "common/HeartbeatMap.h"
#include "common/ceph_context.h"
#include "common/config.h"