mirror of
https://github.com/ceph/ceph
synced 2025-03-11 02:39:05 +00:00
common/timer: s/Mutex/ceph::mutex/
Signed-off-by: Kefu Chai <kchai@redhat.com>
This commit is contained in:
parent
bcabe52695
commit
649891ae71
src
@ -33,11 +33,7 @@ public:
|
||||
};
|
||||
|
||||
|
||||
|
||||
typedef std::multimap < utime_t, Context *> scheduled_map_t;
|
||||
typedef std::map < Context*, scheduled_map_t::iterator > event_lookup_map_t;
|
||||
|
||||
SafeTimer::SafeTimer(CephContext *cct_, Mutex &l, bool safe_callbacks)
|
||||
SafeTimer::SafeTimer(CephContext *cct_, ceph::mutex &l, bool safe_callbacks)
|
||||
: cct(cct_), lock(l),
|
||||
safe_callbacks(safe_callbacks),
|
||||
thread(NULL),
|
||||
@ -61,10 +57,10 @@ void SafeTimer::shutdown()
|
||||
{
|
||||
ldout(cct,10) << "shutdown" << dendl;
|
||||
if (thread) {
|
||||
ceph_assert(lock.is_locked());
|
||||
ceph_assert(ceph_mutex_is_locked(lock));
|
||||
cancel_all_events();
|
||||
stopping = true;
|
||||
cond.Signal();
|
||||
cond.notify_all();
|
||||
lock.unlock();
|
||||
thread->join();
|
||||
lock.lock();
|
||||
@ -75,13 +71,13 @@ void SafeTimer::shutdown()
|
||||
|
||||
void SafeTimer::timer_thread()
|
||||
{
|
||||
lock.lock();
|
||||
std::unique_lock l{lock};
|
||||
ldout(cct,10) << "timer_thread starting" << dendl;
|
||||
while (!stopping) {
|
||||
utime_t now = ceph_clock_now();
|
||||
auto now = clock_t::now();
|
||||
|
||||
while (!schedule.empty()) {
|
||||
scheduled_map_t::iterator p = schedule.begin();
|
||||
auto p = schedule.begin();
|
||||
|
||||
// is the future now?
|
||||
if (p->first > now)
|
||||
@ -92,11 +88,13 @@ void SafeTimer::timer_thread()
|
||||
schedule.erase(p);
|
||||
ldout(cct,10) << "timer_thread executing " << callback << dendl;
|
||||
|
||||
if (!safe_callbacks)
|
||||
lock.unlock();
|
||||
callback->complete(0);
|
||||
if (!safe_callbacks)
|
||||
lock.lock();
|
||||
if (!safe_callbacks) {
|
||||
l.unlock();
|
||||
callback->complete(0);
|
||||
l.lock();
|
||||
} else {
|
||||
callback->complete(0);
|
||||
}
|
||||
}
|
||||
|
||||
// recheck stopping if we dropped the lock
|
||||
@ -104,28 +102,27 @@ void SafeTimer::timer_thread()
|
||||
break;
|
||||
|
||||
ldout(cct,20) << "timer_thread going to sleep" << dendl;
|
||||
if (schedule.empty())
|
||||
cond.Wait(lock);
|
||||
else
|
||||
cond.WaitUntil(lock, schedule.begin()->first);
|
||||
if (schedule.empty()) {
|
||||
cond.wait(l);
|
||||
} else {
|
||||
cond.wait_until(l, schedule.begin()->first);
|
||||
}
|
||||
ldout(cct,20) << "timer_thread awake" << dendl;
|
||||
}
|
||||
ldout(cct,10) << "timer_thread exiting" << dendl;
|
||||
lock.unlock();
|
||||
}
|
||||
|
||||
Context* SafeTimer::add_event_after(double seconds, Context *callback)
|
||||
{
|
||||
ceph_assert(lock.is_locked());
|
||||
ceph_assert(ceph_mutex_is_locked(lock));
|
||||
|
||||
utime_t when = ceph_clock_now();
|
||||
when += seconds;
|
||||
auto when = clock_t::now() + ceph::make_timespan(seconds);
|
||||
return add_event_at(when, callback);
|
||||
}
|
||||
|
||||
Context* SafeTimer::add_event_at(utime_t when, Context *callback)
|
||||
Context* SafeTimer::add_event_at(SafeTimer::clock_t::time_point when, Context *callback)
|
||||
{
|
||||
ceph_assert(lock.is_locked());
|
||||
ceph_assert(ceph_mutex_is_locked(lock));
|
||||
ldout(cct,10) << __func__ << " " << when << " -> " << callback << dendl;
|
||||
if (stopping) {
|
||||
ldout(cct,5) << __func__ << " already shutdown, event not added" << dendl;
|
||||
@ -144,13 +141,13 @@ Context* SafeTimer::add_event_at(utime_t when, Context *callback)
|
||||
/* If the event we have just inserted comes before everything else, we need to
|
||||
* adjust our timeout. */
|
||||
if (i == schedule.begin())
|
||||
cond.Signal();
|
||||
cond.notify_all();
|
||||
return callback;
|
||||
}
|
||||
|
||||
bool SafeTimer::cancel_event(Context *callback)
|
||||
{
|
||||
ceph_assert(lock.is_locked());
|
||||
ceph_assert(ceph_mutex_is_locked(lock));
|
||||
|
||||
auto p = events.find(callback);
|
||||
if (p == events.end()) {
|
||||
@ -169,7 +166,7 @@ bool SafeTimer::cancel_event(Context *callback)
|
||||
void SafeTimer::cancel_all_events()
|
||||
{
|
||||
ldout(cct,10) << "cancel_all_events" << dendl;
|
||||
ceph_assert(lock.is_locked());
|
||||
ceph_assert(ceph_mutex_is_locked(lock));
|
||||
|
||||
while (!events.empty()) {
|
||||
auto p = events.begin();
|
||||
|
@ -15,8 +15,7 @@
|
||||
#ifndef CEPH_TIMER_H
|
||||
#define CEPH_TIMER_H
|
||||
|
||||
#include "Cond.h"
|
||||
#include "Mutex.h"
|
||||
#include "ceph_mutex.h"
|
||||
|
||||
class CephContext;
|
||||
class Context;
|
||||
@ -25,8 +24,8 @@ class SafeTimerThread;
|
||||
class SafeTimer
|
||||
{
|
||||
CephContext *cct;
|
||||
Mutex& lock;
|
||||
Cond cond;
|
||||
ceph::mutex& lock;
|
||||
ceph::condition_variable cond;
|
||||
bool safe_callbacks;
|
||||
|
||||
friend class SafeTimerThread;
|
||||
@ -35,8 +34,11 @@ class SafeTimer
|
||||
void timer_thread();
|
||||
void _shutdown();
|
||||
|
||||
std::multimap<utime_t, Context*> schedule;
|
||||
std::map<Context*, std::multimap<utime_t, Context*>::iterator> events;
|
||||
using clock_t = ceph::real_clock;
|
||||
using scheduled_map_t = std::multimap<clock_t::time_point, Context*>;
|
||||
scheduled_map_t schedule;
|
||||
using event_lookup_map_t = std::map<Context*, scheduled_map_t::iterator>;
|
||||
event_lookup_map_t events;
|
||||
bool stopping;
|
||||
|
||||
void dump(const char *caller = 0) const;
|
||||
@ -56,7 +58,7 @@ public:
|
||||
* If you are able to relax requirements on cancelled callbacks, then
|
||||
* setting safe_callbacks = false eliminates the lock cycle issue.
|
||||
* */
|
||||
SafeTimer(CephContext *cct, Mutex &l, bool safe_callbacks=true);
|
||||
SafeTimer(CephContext *cct, ceph::mutex &l, bool safe_callbacks=true);
|
||||
virtual ~SafeTimer();
|
||||
|
||||
/* Call with the event_lock UNLOCKED.
|
||||
@ -71,7 +73,7 @@ public:
|
||||
/* Schedule an event in the future
|
||||
* Call with the event_lock LOCKED */
|
||||
Context* add_event_after(double seconds, Context *callback);
|
||||
Context* add_event_at(utime_t when, Context *callback);
|
||||
Context* add_event_at(clock_t::time_point when, Context *callback);
|
||||
|
||||
/* Cancel an event.
|
||||
* Call with the event_lock LOCKED
|
||||
|
@ -1,7 +1,8 @@
|
||||
#include "common/ceph_argparse.h"
|
||||
#include "common/Mutex.h"
|
||||
#include "common/ceph_mutex.h"
|
||||
#include "common/Timer.h"
|
||||
#include "global/global_init.h"
|
||||
#include "include/Context.h"
|
||||
|
||||
#include <iostream>
|
||||
|
||||
@ -20,7 +21,7 @@ namespace
|
||||
int array_idx;
|
||||
TestContext* test_contexts[MAX_TEST_CONTEXTS];
|
||||
|
||||
Mutex array_lock("test_timers_mutex");
|
||||
ceph::mutex array_lock = ceph::make_mutex("test_timers_mutex");
|
||||
}
|
||||
|
||||
class TestContext : public Context
|
||||
@ -33,10 +34,9 @@ public:
|
||||
|
||||
void finish(int r) override
|
||||
{
|
||||
array_lock.Lock();
|
||||
lock_guard locker{array_lock};
|
||||
cout << "TestContext " << num << std::endl;
|
||||
test_array[array_idx++] = num;
|
||||
array_lock.Unlock();
|
||||
}
|
||||
|
||||
~TestContext() override
|
||||
@ -57,10 +57,9 @@ public:
|
||||
|
||||
void finish(int r) override
|
||||
{
|
||||
array_lock.Lock();
|
||||
std::lock_guard locker{array_lock};
|
||||
cout << "StrictOrderTestContext " << num << std::endl;
|
||||
test_array[num] = num;
|
||||
array_lock.Unlock();
|
||||
}
|
||||
|
||||
~StrictOrderTestContext() override
|
||||
@ -76,7 +75,7 @@ static void print_status(const char *str, int ret)
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
static int basic_timer_test(T &timer, Mutex *lock)
|
||||
static int basic_timer_test(T &timer, ceph::mutex *lock)
|
||||
{
|
||||
int ret = 0;
|
||||
memset(&test_array, 0, sizeof(test_array));
|
||||
@ -92,20 +91,18 @@ static int basic_timer_test(T &timer, Mutex *lock)
|
||||
|
||||
for (int i = 0; i < MAX_TEST_CONTEXTS; ++i) {
|
||||
if (lock)
|
||||
lock->Lock();
|
||||
utime_t inc(2 * i, 0);
|
||||
utime_t t = ceph_clock_now() + inc;
|
||||
lock->lock();
|
||||
auto t = ceph::real_clock::now() + std::chrono::seconds(2 * i);
|
||||
timer.add_event_at(t, test_contexts[i]);
|
||||
if (lock)
|
||||
lock->Unlock();
|
||||
lock->unlock();
|
||||
}
|
||||
|
||||
bool done = false;
|
||||
do {
|
||||
sleep(1);
|
||||
array_lock.Lock();
|
||||
std::lock_guard locker{array_lock};
|
||||
done = (array_idx == MAX_TEST_CONTEXTS);
|
||||
array_lock.Unlock();
|
||||
} while (!done);
|
||||
|
||||
for (int i = 0; i < MAX_TEST_CONTEXTS; ++i) {
|
||||
@ -119,7 +116,7 @@ static int basic_timer_test(T &timer, Mutex *lock)
|
||||
return ret;
|
||||
}
|
||||
|
||||
static int test_out_of_order_insertion(SafeTimer &timer, Mutex *lock)
|
||||
static int test_out_of_order_insertion(SafeTimer &timer, ceph::mutex *lock)
|
||||
{
|
||||
int ret = 0;
|
||||
memset(&test_array, 0, sizeof(test_array));
|
||||
@ -132,27 +129,23 @@ static int test_out_of_order_insertion(SafeTimer &timer, Mutex *lock)
|
||||
test_contexts[1] = new StrictOrderTestContext(1);
|
||||
|
||||
{
|
||||
utime_t inc(100, 0);
|
||||
utime_t t = ceph_clock_now() + inc;
|
||||
lock->Lock();
|
||||
auto t = ceph::real_clock::now() + 100s;
|
||||
std::lock_guard locker{*lock};
|
||||
timer.add_event_at(t, test_contexts[0]);
|
||||
lock->Unlock();
|
||||
}
|
||||
|
||||
{
|
||||
utime_t inc(2, 0);
|
||||
utime_t t = ceph_clock_now() + inc;
|
||||
lock->Lock();
|
||||
auto t = ceph::real_clock::now() + 2s;
|
||||
std::lock_guard locker{*lock};
|
||||
timer.add_event_at(t, test_contexts[1]);
|
||||
lock->Unlock();
|
||||
}
|
||||
|
||||
int secs = 0;
|
||||
for (; secs < 100 ; ++secs) {
|
||||
sleep(1);
|
||||
array_lock.Lock();
|
||||
array_lock.lock();
|
||||
int a = test_array[1];
|
||||
array_lock.Unlock();
|
||||
array_lock.unlock();
|
||||
if (a == 1)
|
||||
break;
|
||||
}
|
||||
@ -166,7 +159,8 @@ static int test_out_of_order_insertion(SafeTimer &timer, Mutex *lock)
|
||||
return ret;
|
||||
}
|
||||
|
||||
static int safe_timer_cancel_all_test(SafeTimer &safe_timer, Mutex& safe_timer_lock)
|
||||
static int safe_timer_cancel_all_test(SafeTimer &safe_timer,
|
||||
ceph::mutex& safe_timer_lock)
|
||||
{
|
||||
cout << __PRETTY_FUNCTION__ << std::endl;
|
||||
|
||||
@ -179,19 +173,18 @@ static int safe_timer_cancel_all_test(SafeTimer &safe_timer, Mutex& safe_timer_l
|
||||
test_contexts[i] = new TestContext(i);
|
||||
}
|
||||
|
||||
safe_timer_lock.Lock();
|
||||
safe_timer_lock.lock();
|
||||
for (int i = 0; i < MAX_TEST_CONTEXTS; ++i) {
|
||||
utime_t inc(4 * i, 0);
|
||||
utime_t t = ceph_clock_now() + inc;
|
||||
auto t = ceph::real_clock::now() + std::chrono::seconds(4 * i);
|
||||
safe_timer.add_event_at(t, test_contexts[i]);
|
||||
}
|
||||
safe_timer_lock.Unlock();
|
||||
safe_timer_lock.unlock();
|
||||
|
||||
sleep(10);
|
||||
|
||||
safe_timer_lock.Lock();
|
||||
safe_timer_lock.lock();
|
||||
safe_timer.cancel_all_events();
|
||||
safe_timer_lock.Unlock();
|
||||
safe_timer_lock.unlock();
|
||||
|
||||
for (int i = 0; i < array_idx; ++i) {
|
||||
if (test_array[i] != i) {
|
||||
@ -204,7 +197,8 @@ static int safe_timer_cancel_all_test(SafeTimer &safe_timer, Mutex& safe_timer_l
|
||||
return ret;
|
||||
}
|
||||
|
||||
static int safe_timer_cancellation_test(SafeTimer &safe_timer, Mutex& safe_timer_lock)
|
||||
static int safe_timer_cancellation_test(SafeTimer &safe_timer,
|
||||
ceph::mutex& safe_timer_lock)
|
||||
{
|
||||
cout << __PRETTY_FUNCTION__ << std::endl;
|
||||
|
||||
@ -217,26 +211,25 @@ static int safe_timer_cancellation_test(SafeTimer &safe_timer, Mutex& safe_timer
|
||||
test_contexts[i] = new StrictOrderTestContext(i);
|
||||
}
|
||||
|
||||
safe_timer_lock.Lock();
|
||||
safe_timer_lock.lock();
|
||||
for (int i = 0; i < MAX_TEST_CONTEXTS; ++i) {
|
||||
utime_t inc(4 * i, 0);
|
||||
utime_t t = ceph_clock_now() + inc;
|
||||
auto t = ceph::real_clock::now() + std::chrono::seconds(4 * i);
|
||||
safe_timer.add_event_at(t, test_contexts[i]);
|
||||
}
|
||||
safe_timer_lock.Unlock();
|
||||
safe_timer_lock.unlock();
|
||||
|
||||
// cancel the even-numbered events
|
||||
for (int i = 0; i < MAX_TEST_CONTEXTS; i += 2) {
|
||||
safe_timer_lock.Lock();
|
||||
safe_timer_lock.lock();
|
||||
safe_timer.cancel_event(test_contexts[i]);
|
||||
safe_timer_lock.Unlock();
|
||||
safe_timer_lock.unlock();
|
||||
}
|
||||
|
||||
sleep(20);
|
||||
|
||||
safe_timer_lock.Lock();
|
||||
safe_timer_lock.lock();
|
||||
safe_timer.cancel_all_events();
|
||||
safe_timer_lock.Unlock();
|
||||
safe_timer_lock.unlock();
|
||||
|
||||
for (int i = 1; i < array_idx; i += 2) {
|
||||
if (test_array[i] != i) {
|
||||
@ -260,7 +253,7 @@ int main(int argc, const char **argv)
|
||||
common_init_finish(g_ceph_context);
|
||||
|
||||
int ret;
|
||||
Mutex safe_timer_lock("safe_timer_lock");
|
||||
ceph::mutex safe_timer_lock = ceph::make_mutex("safe_timer_lock");
|
||||
SafeTimer safe_timer(g_ceph_context, safe_timer_lock);
|
||||
|
||||
ret = basic_timer_test <SafeTimer>(safe_timer, &safe_timer_lock);
|
||||
|
Loading…
Reference in New Issue
Block a user