mirror of
https://github.com/ceph/ceph
synced 2025-02-24 03:27:10 +00:00
throttle: Minimal destructor fix for Luminous
Get rid of the undefined behavior of destroying condition variables while they're being waited on. Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
This commit is contained in:
parent
05b182c1c2
commit
0f52f486ef
@ -1,6 +1,8 @@
|
||||
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
|
||||
// vim: ts=8 sw=2 smarttab
|
||||
|
||||
#include "include/scope_guard.h"
|
||||
|
||||
#include "common/Throttle.h"
|
||||
#include "common/perf_counters.h"
|
||||
|
||||
@ -63,10 +65,9 @@ Throttle::Throttle(CephContext *cct, const std::string& n, int64_t m, bool _use_
|
||||
|
||||
Throttle::~Throttle()
|
||||
{
|
||||
while (!cond.empty()) {
|
||||
Cond *cv = cond.front();
|
||||
delete cv;
|
||||
cond.pop_front();
|
||||
{
|
||||
Mutex::Locker l(lock);
|
||||
assert(cond.empty());
|
||||
}
|
||||
|
||||
if (!use_perf)
|
||||
@ -95,26 +96,27 @@ bool Throttle::_wait(int64_t c)
|
||||
utime_t start;
|
||||
bool waited = false;
|
||||
if (_should_wait(c) || !cond.empty()) { // always wait behind other waiters.
|
||||
Cond *cv = new Cond;
|
||||
cond.push_back(cv);
|
||||
waited = true;
|
||||
ldout(cct, 2) << "_wait waiting..." << dendl;
|
||||
if (logger)
|
||||
start = ceph_clock_now();
|
||||
{
|
||||
auto cv = cond.insert(cond.end(), new Cond);
|
||||
auto w = make_scope_guard([this, cv]() {
|
||||
delete *cv;
|
||||
cond.erase(cv);
|
||||
});
|
||||
waited = true;
|
||||
ldout(cct, 2) << "_wait waiting..." << dendl;
|
||||
if (logger)
|
||||
start = ceph_clock_now();
|
||||
|
||||
do {
|
||||
cv->Wait(lock);
|
||||
} while (_should_wait(c) || cv != cond.front());
|
||||
do {
|
||||
(*cv)->Wait(lock);
|
||||
} while ((_should_wait(c) || cv != cond.begin()));
|
||||
|
||||
ldout(cct, 2) << "_wait finished waiting" << dendl;
|
||||
if (logger) {
|
||||
utime_t dur = ceph_clock_now() - start;
|
||||
logger->tinc(l_throttle_wait, dur);
|
||||
ldout(cct, 2) << "_wait finished waiting" << dendl;
|
||||
if (logger) {
|
||||
utime_t dur = ceph_clock_now() - start;
|
||||
logger->tinc(l_throttle_wait, dur);
|
||||
}
|
||||
}
|
||||
|
||||
delete cv;
|
||||
cond.pop_front();
|
||||
|
||||
// wake up the next guy
|
||||
if (!cond.empty())
|
||||
cond.front()->SignalOne();
|
||||
@ -291,6 +293,11 @@ BackoffThrottle::BackoffThrottle(CephContext *cct, const std::string& n, unsigne
|
||||
|
||||
BackoffThrottle::~BackoffThrottle()
|
||||
{
|
||||
{
|
||||
locker l(lock);
|
||||
assert(waiters.empty());
|
||||
}
|
||||
|
||||
if (!use_perf)
|
||||
return;
|
||||
|
||||
@ -537,13 +544,17 @@ SimpleThrottle::~SimpleThrottle()
|
||||
{
|
||||
Mutex::Locker l(m_lock);
|
||||
assert(m_current == 0);
|
||||
assert(waiters == 0);
|
||||
}
|
||||
|
||||
void SimpleThrottle::start_op()
|
||||
{
|
||||
Mutex::Locker l(m_lock);
|
||||
while (m_max == m_current)
|
||||
while (m_max == m_current) {
|
||||
waiters++;
|
||||
m_cond.Wait(m_lock);
|
||||
waiters--;
|
||||
}
|
||||
++m_current;
|
||||
}
|
||||
|
||||
@ -565,8 +576,11 @@ bool SimpleThrottle::pending_error() const
|
||||
int SimpleThrottle::wait_for_ret()
|
||||
{
|
||||
Mutex::Locker l(m_lock);
|
||||
while (m_current > 0)
|
||||
while (m_current > 0) {
|
||||
waiters++;
|
||||
m_cond.Wait(m_lock);
|
||||
waiters--;
|
||||
}
|
||||
return m_ret;
|
||||
}
|
||||
|
||||
@ -579,6 +593,11 @@ OrderedThrottle::OrderedThrottle(uint64_t max, bool ignore_enoent)
|
||||
m_ignore_enoent(ignore_enoent), m_next_tid(0), m_complete_tid(0) {
|
||||
}
|
||||
|
||||
OrderedThrottle::~OrderedThrottle() {
|
||||
Mutex::Locker locker(m_lock);
|
||||
assert(waiters == 0);
|
||||
}
|
||||
|
||||
C_OrderedThrottle *OrderedThrottle::start_op(Context *on_finish) {
|
||||
assert(on_finish != NULL);
|
||||
|
||||
@ -589,7 +608,9 @@ C_OrderedThrottle *OrderedThrottle::start_op(Context *on_finish) {
|
||||
|
||||
complete_pending_ops();
|
||||
while (m_max == m_current) {
|
||||
++waiters;
|
||||
m_cond.Wait(m_lock);
|
||||
--waiters;
|
||||
complete_pending_ops();
|
||||
}
|
||||
++m_current;
|
||||
@ -629,7 +650,9 @@ int OrderedThrottle::wait_for_ret() {
|
||||
complete_pending_ops();
|
||||
|
||||
while (m_current > 0) {
|
||||
++waiters;
|
||||
m_cond.Wait(m_lock);
|
||||
--waiters;
|
||||
complete_pending_ops();
|
||||
}
|
||||
return m_ret_val;
|
||||
|
@ -10,6 +10,7 @@
|
||||
#include <atomic>
|
||||
#include <iostream>
|
||||
#include <condition_variable>
|
||||
#include <stdexcept>
|
||||
|
||||
#include "Cond.h"
|
||||
#include "include/Context.h"
|
||||
@ -33,6 +34,8 @@ class Throttle {
|
||||
Mutex lock;
|
||||
list<Cond*> cond;
|
||||
const bool use_perf;
|
||||
bool shutting_down = false;
|
||||
Cond shutdown_clear;
|
||||
|
||||
public:
|
||||
Throttle(CephContext *cct, const std::string& n, int64_t m = 0, bool _use_perf = true);
|
||||
@ -259,6 +262,7 @@ private:
|
||||
uint64_t m_current;
|
||||
int m_ret;
|
||||
bool m_ignore_enoent;
|
||||
uint32_t waiters = 0;
|
||||
};
|
||||
|
||||
|
||||
@ -288,6 +292,7 @@ private:
|
||||
class OrderedThrottle {
|
||||
public:
|
||||
OrderedThrottle(uint64_t max, bool ignore_enoent);
|
||||
~OrderedThrottle();
|
||||
|
||||
C_OrderedThrottle *start_op(Context *on_finish);
|
||||
void end_op(int r);
|
||||
@ -326,6 +331,7 @@ private:
|
||||
TidResult m_tid_result;
|
||||
|
||||
void complete_pending_ops();
|
||||
uint32_t waiters = 0;
|
||||
};
|
||||
|
||||
#endif
|
||||
|
@ -26,6 +26,7 @@
|
||||
#include "common/Thread.h"
|
||||
#include "common/Throttle.h"
|
||||
#include "common/ceph_argparse.h"
|
||||
#include "common/backport14.h"
|
||||
|
||||
#include <thread>
|
||||
#include <atomic>
|
||||
@ -57,7 +58,6 @@ protected:
|
||||
return NULL;
|
||||
}
|
||||
};
|
||||
|
||||
};
|
||||
|
||||
TEST_F(ThrottleTest, Throttle) {
|
||||
@ -216,42 +216,30 @@ TEST_F(ThrottleTest, wait) {
|
||||
} while(!waited);
|
||||
}
|
||||
|
||||
|
||||
TEST_F(ThrottleTest, destructor) {
|
||||
Thread_get *t;
|
||||
{
|
||||
int64_t throttle_max = 10;
|
||||
Throttle *throttle = new Throttle(g_ceph_context, "throttle", throttle_max);
|
||||
EXPECT_DEATH({
|
||||
int64_t throttle_max = 10;
|
||||
auto throttle = ceph::make_unique<Throttle>(g_ceph_context, "throttle",
|
||||
throttle_max);
|
||||
|
||||
ASSERT_FALSE(throttle->get(5));
|
||||
|
||||
t = new Thread_get(*throttle, 7);
|
||||
t->create("t_throttle");
|
||||
bool blocked;
|
||||
useconds_t delay = 1;
|
||||
do {
|
||||
usleep(delay);
|
||||
if (throttle->get_or_fail(1)) {
|
||||
throttle->put(1);
|
||||
blocked = false;
|
||||
} else {
|
||||
blocked = true;
|
||||
}
|
||||
delay *= 2;
|
||||
} while(!blocked);
|
||||
delete throttle;
|
||||
}
|
||||
|
||||
{ //
|
||||
// The thread is left hanging, otherwise it will abort().
|
||||
// Deleting the Throttle on which it is waiting creates a
|
||||
// inconsistency that will be detected: the Throttle object that
|
||||
// it references no longer exists.
|
||||
//
|
||||
pthread_t id = t->get_thread_id();
|
||||
ASSERT_EQ(pthread_kill(id, 0), 0);
|
||||
delete t;
|
||||
ASSERT_EQ(pthread_kill(id, 0), 0);
|
||||
}
|
||||
ASSERT_FALSE(throttle->get(5));
|
||||
unique_ptr<Thread_get> t = ceph::make_unique<Thread_get>(*throttle, 7);
|
||||
t->create("t_throttle");
|
||||
bool blocked;
|
||||
useconds_t delay = 1;
|
||||
do {
|
||||
usleep(delay);
|
||||
if (throttle->get_or_fail(1)) {
|
||||
throttle->put(1);
|
||||
blocked = false;
|
||||
} else {
|
||||
blocked = true;
|
||||
}
|
||||
delay *= 2;
|
||||
} while (!blocked);
|
||||
}, ".*");
|
||||
}
|
||||
|
||||
std::pair<double, std::chrono::duration<double> > test_backoff(
|
||||
@ -363,6 +351,25 @@ std::pair<double, std::chrono::duration<double> > test_backoff(
|
||||
wait_time / waits);
|
||||
}
|
||||
|
||||
TEST(BackoffThrottle, destruct) {
|
||||
EXPECT_DEATH({
|
||||
auto throttle = ceph::make_unique<BackoffThrottle>(
|
||||
g_ceph_context, "destructor test", 10);
|
||||
ASSERT_TRUE(throttle->set_params(0.4, 0.6, 1000, 2, 10, 6, nullptr));
|
||||
|
||||
throttle->get(5);
|
||||
{
|
||||
auto& t = *throttle;
|
||||
std::thread([&t]() {
|
||||
usleep(5);
|
||||
t.get(6);
|
||||
});
|
||||
}
|
||||
// No equivalent of get_or_fail()
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(250));
|
||||
}, ".*");
|
||||
}
|
||||
|
||||
TEST(BackoffThrottle, undersaturated)
|
||||
{
|
||||
auto results = test_backoff(
|
||||
@ -417,6 +424,22 @@ TEST(BackoffThrottle, oversaturated)
|
||||
ASSERT_GT(results.second.count(), 0.0005);
|
||||
}
|
||||
|
||||
TEST(OrderedThrottle, destruct) {
|
||||
EXPECT_DEATH({
|
||||
auto throttle = ceph::make_unique<OrderedThrottle>(1, false);
|
||||
throttle->start_op(nullptr);
|
||||
{
|
||||
auto& t = *throttle;
|
||||
std::thread([&t]() {
|
||||
usleep(5);
|
||||
t.start_op(nullptr);
|
||||
});
|
||||
}
|
||||
// No equivalent of get_or_fail()
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(250));
|
||||
}, ".*");
|
||||
}
|
||||
|
||||
/*
|
||||
* Local Variables:
|
||||
* compile-command: "cd ../.. ;
|
||||
@ -426,4 +449,3 @@ TEST(BackoffThrottle, oversaturated)
|
||||
* "
|
||||
* End:
|
||||
*/
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user