Merge pull request #21635 from yangdongsheng/qos_read_write

librbd: support bps throttle and throttle read and write separately.

Reviewed-by: Jason Dillaman <dillaman@redhat.com>
This commit is contained in:
Jason Dillaman 2018-06-19 17:32:15 -04:00 committed by GitHub
commit c707f0fa88
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 319 additions and 54 deletions

View File

@ -685,7 +685,7 @@ uint64_t TokenBucketThrottle::Bucket::put(uint64_t c) {
}
void TokenBucketThrottle::Bucket::set_max(uint64_t m) {
if (remain > m)
if (remain > m || max == 0)
remain = m;
max = m;
}

View File

@ -373,28 +373,40 @@ public:
SafeTimer *timer, Mutex *timer_lock);
~TokenBucketThrottle();
template <typename T, typename I, void(T::*MF)(int, I*, uint64_t)>
void add_blocker(uint64_t c, T *handler, I *item, uint64_t flag) {
Context *ctx = new FunctionContext([handler, item, flag](int r) {
(handler->*MF)(r, item, flag);
});
m_blockers.emplace_back(c, ctx);
}
template <typename T, typename I, void(T::*MF)(int, I*)>
bool get(uint64_t c, T *handler, I *item) {
template <typename T, typename I, void(T::*MF)(int, I*, uint64_t)>
bool get(uint64_t c, T *handler, I *item, uint64_t flag) {
if (0 == m_throttle.max)
return false;
bool waited = false;
bool wait = false;
uint64_t got = 0;
Mutex::Locker lock(m_lock);
uint64_t got = m_throttle.get(c);
if (got < c) {
// Not enough tokens, add a blocker for it.
Context *ctx = new FunctionContext([handler, item](int r) {
(handler->*MF)(r, item);
});
m_blockers.emplace_back(c - got, ctx);
waited = true;
if (!m_blockers.empty()) {
// Keep the order of requests, add item after previous blocked requests.
wait = true;
} else {
got = m_throttle.get(c);
if (got < c) {
// Not enough tokens, add a blocker for it.
wait = true;
}
}
return waited;
if (wait)
add_blocker<T, I, MF>(c - got, handler, item, flag);
return wait;
}
void set_max(uint64_t m);
void set_average(uint64_t avg);

View File

@ -6466,6 +6466,26 @@ static std::vector<Option> get_rbd_options() {
.set_default(0)
.set_description("the desired limit of IO operations per second"),
Option("rbd_qos_bps_limit", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
.set_default(0)
.set_description("the desired limit of IO bytes per second"),
Option("rbd_qos_read_iops_limit", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
.set_default(0)
.set_description("the desired limit of read operations per second"),
Option("rbd_qos_write_iops_limit", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
.set_default(0)
.set_description("the desired limit of write operations per second"),
Option("rbd_qos_read_bps_limit", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
.set_default(0)
.set_description("the desired limit of read bytes per second"),
Option("rbd_qos_write_bps_limit", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
.set_default(0)
.set_description("the desired limit of write bytes per second"),
Option("rbd_discard_on_zeroed_write_same", Option::TYPE_BOOL, Option::LEVEL_ADVANCED)
.set_default(true)
.set_description("discard data on zeroed write same instead of writing zero"),

View File

@ -771,7 +771,12 @@ public:
"rbd_mirroring_delete_delay", false)(
"rbd_mirroring_replay_delay", false)(
"rbd_skip_partial_discard", false)(
"rbd_qos_iops_limit", false);
"rbd_qos_iops_limit", false)(
"rbd_qos_bps_limit", false)(
"rbd_qos_read_iops_limit", false)(
"rbd_qos_write_iops_limit", false)(
"rbd_qos_read_bps_limit", false)(
"rbd_qos_write_bps_limit", false);
md_config_t local_config_t;
std::map<std::string, bufferlist> res;
@ -834,6 +839,11 @@ public:
ASSIGN_OPTION(skip_partial_discard, bool);
ASSIGN_OPTION(blkin_trace_all, bool);
ASSIGN_OPTION(qos_iops_limit, uint64_t);
ASSIGN_OPTION(qos_bps_limit, uint64_t);
ASSIGN_OPTION(qos_read_iops_limit, uint64_t);
ASSIGN_OPTION(qos_write_iops_limit, uint64_t);
ASSIGN_OPTION(qos_read_bps_limit, uint64_t);
ASSIGN_OPTION(qos_write_bps_limit, uint64_t);
if (thread_safe) {
ASSIGN_OPTION(journal_pool, std::string);
@ -843,7 +853,12 @@ public:
sparse_read_threshold_bytes = get_object_size();
}
io_work_queue->apply_qos_iops_limit(qos_iops_limit);
io_work_queue->apply_qos_limit(qos_iops_limit, RBD_QOS_IOPS_THROTTLE);
io_work_queue->apply_qos_limit(qos_bps_limit, RBD_QOS_BPS_THROTTLE);
io_work_queue->apply_qos_limit(qos_read_iops_limit, RBD_QOS_READ_IOPS_THROTTLE);
io_work_queue->apply_qos_limit(qos_write_iops_limit, RBD_QOS_WRITE_IOPS_THROTTLE);
io_work_queue->apply_qos_limit(qos_read_bps_limit, RBD_QOS_READ_BPS_THROTTLE);
io_work_queue->apply_qos_limit(qos_write_bps_limit, RBD_QOS_WRITE_BPS_THROTTLE);
}
ExclusiveLock<ImageCtx> *ImageCtx::create_exclusive_lock() {

View File

@ -198,6 +198,11 @@ namespace librbd {
bool skip_partial_discard;
bool blkin_trace_all;
uint64_t qos_iops_limit;
uint64_t qos_bps_limit;
uint64_t qos_read_iops_limit;
uint64_t qos_write_iops_limit;
uint64_t qos_read_bps_limit;
uint64_t qos_write_bps_limit;
LibrbdAdminSocketHook *asok_hook;

View File

@ -71,6 +71,45 @@ struct ImageDispatchSpec<I>::IsWriteOpVisitor
}
};
template <typename I>
struct ImageDispatchSpec<I>::TokenRequestedVisitor
  : public boost::static_visitor<uint64_t> {
  // Computes how many throttle tokens this request consumes for the
  // single throttle category identified by 'flag' (an RBD_QOS_*_THROTTLE
  // bit): 0 when exempt, request length for byte throttles, 1 for IOPS.
  ImageDispatchSpec* spec;
  uint64_t flag;

  TokenRequestedVisitor(ImageDispatchSpec* spec, uint64_t _flag)
    : spec(spec), flag(_flag) {
  }

  uint64_t operator()(const Read&) const {
    // Reads are exempt from write-only throttles.
    if (flag & RBD_QOS_WRITE_MASK) {
      return 0;
    }
    if (flag & RBD_QOS_BPS_MASK) {
      return spec->extents_length();
    }
    return 1;
  }

  // NOTE: must NOT be a template. A template parameter that cannot be
  // deduced from the arguments would remove this overload from overload
  // resolution, and flushes would be billed through the generic write
  // overload below instead of passing for free.
  uint64_t operator()(const Flush&) const {
    // Flushes move no data; never charge tokens for them.
    return 0;
  }

  template <typename T>
  uint64_t operator()(const T&) const {
    // Every remaining request type is a write-like op.
    if (flag & RBD_QOS_READ_MASK) {
      return 0;
    }
    if (flag & RBD_QOS_BPS_MASK) {
      return spec->extents_length();
    }
    return 1;
  }
};
template <typename I>
void ImageDispatchSpec<I>::send() {
boost::apply_visitor(SendVisitor{this}, m_request);
@ -82,11 +121,27 @@ void ImageDispatchSpec<I>::fail(int r) {
m_aio_comp->fail(r);
}
template <typename I>
uint64_t ImageDispatchSpec<I>::extents_length() {
  // Total byte count covered by this request's image extents.
  uint64_t total = 0;
  for (const auto& image_extent : this->m_image_extents) {
    total += image_extent.second;
  }
  return total;
}
template <typename I>
bool ImageDispatchSpec<I>::is_write_op() const {
  // Classify the request by visiting the underlying variant.
  IsWriteOpVisitor visitor;
  return boost::apply_visitor(visitor, m_request);
}
template <typename I>
uint64_t ImageDispatchSpec<I>::tokens_requested(uint64_t flag) {
  // Delegate the per-category token cost to the visitor.
  TokenRequestedVisitor visitor{this, flag};
  return boost::apply_visitor(visitor, m_request);
}
template <typename I>
void ImageDispatchSpec<I>::start_op() {
m_aio_comp->start_op();

View File

@ -129,11 +129,18 @@ public:
void start_op();
bool was_throttled() {
return m_throttled;
uint64_t tokens_requested(uint64_t flag);
bool was_throttled(uint64_t flag) {
return m_throttled_flag & flag;
}
void set_throttled() {
m_throttled = true;
void set_throttled(uint64_t flag) {
m_throttled_flag |= flag;
}
bool were_all_throttled() {
return m_throttled_flag & RBD_QOS_MASK;
}
private:
@ -146,6 +153,7 @@ private:
struct SendVisitor;
struct IsWriteOpVisitor;
struct TokenRequestedVisitor;
ImageDispatchSpec(ImageCtxT& image_ctx, AioCompletion* aio_comp,
Extents&& image_extents, Request&& request,
@ -161,9 +169,9 @@ private:
Request m_request;
int m_op_flags;
ZTracer::Trace m_parent_trace;
std::atomic<uint64_t> m_throttled_flag = 0;
bool m_throttled = false;
uint64_t extents_length();
};
} // namespace io

View File

@ -77,6 +77,15 @@ struct ImageRequestWQ<I>::C_RefreshFinish : public Context {
}
};
static std::list<uint64_t> throttle_flags = {
RBD_QOS_IOPS_THROTTLE,
RBD_QOS_BPS_THROTTLE,
RBD_QOS_READ_IOPS_THROTTLE,
RBD_QOS_WRITE_IOPS_THROTTLE,
RBD_QOS_READ_BPS_THROTTLE,
RBD_QOS_WRITE_BPS_THROTTLE
};
template <typename I>
ImageRequestWQ<I>::ImageRequestWQ(I *image_ctx, const string &name,
time_t ti, ThreadPool *tp)
@ -90,14 +99,19 @@ ImageRequestWQ<I>::ImageRequestWQ(I *image_ctx, const string &name,
Mutex *timer_lock;
ImageCtx::get_timer_instance(cct, &timer, &timer_lock);
iops_throttle = new TokenBucketThrottle(
cct, 0, 0, timer, timer_lock);
for (auto flag : throttle_flags) {
m_throttles.push_back(make_pair(
flag, new TokenBucketThrottle(cct, 0, 0, timer, timer_lock)));
}
this->register_work_queue();
}
template <typename I>
ImageRequestWQ<I>::~ImageRequestWQ() {
  // Release the per-category token-bucket throttles owned by this queue.
  for (auto &throttle_pair : m_throttles) {
    delete throttle_pair.second;
  }
}
template <typename I>
@ -577,22 +591,66 @@ void ImageRequestWQ<I>::set_require_lock(Direction direction, bool enabled) {
}
template <typename I>
void ImageRequestWQ<I>::apply_qos_iops_limit(uint64_t limit) {
iops_throttle->set_max(limit);
iops_throttle->set_average(limit);
void ImageRequestWQ<I>::apply_qos_limit(uint64_t limit, const uint64_t flag) {
TokenBucketThrottle *throttle = nullptr;
for (auto pair : m_throttles) {
if (flag == pair.first) {
throttle = pair.second;
break;
}
}
assert(throttle != nullptr);
throttle->set_max(limit);
throttle->set_average(limit);
if (limit)
m_qos_enabled_flag |= flag;
else
m_qos_enabled_flag &= ~flag;
}
template <typename I>
void ImageRequestWQ<I>::handle_iops_throttle_ready(
int r, ImageDispatchSpec<I> *item) {
void ImageRequestWQ<I>::handle_throttle_ready(int r, ImageDispatchSpec<I> *item, uint64_t flag) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 15) << "r=" << r << ", " << "req=" << item << dendl;
assert(m_io_blockers.load() > 0);
--m_io_blockers;
item->set_throttled();
this->requeue(item);
this->signal();
assert(m_io_throttled.load() > 0);
item->set_throttled(flag);
if (item->were_all_throttled()) {
this->requeue(item);
--m_io_throttled;
this->signal();
}
}
template <typename I>
bool ImageRequestWQ<I>::needs_throttle(ImageDispatchSpec<I> *item) {
  // Walk every QoS category and try to take the tokens this request needs
  // from each enabled throttle.  Returns true when at least one throttle
  // could not satisfy the request immediately; the item is then parked
  // until handle_throttle_ready() fires for the blocking category.
  bool blocked = false;
  for (auto &t : m_throttles) {
    const uint64_t flag = t.first;
    if (item->was_throttled(flag)) {
      // Already accounted for this category (e.g. on a requeue pass).
      continue;
    }
    if (!(m_qos_enabled_flag & flag)) {
      // Throttle disabled: mark the category satisfied so
      // were_all_throttled() can eventually report completion.
      item->set_throttled(flag);
      continue;
    }
    TokenBucketThrottle *throttle = t.second;
    const uint64_t tokens = item->tokens_requested(flag);
    if (throttle->get<ImageRequestWQ<I>, ImageDispatchSpec<I>,
                      &ImageRequestWQ<I>::handle_throttle_ready>(
          tokens, this, item, flag)) {
      blocked = true;
    } else {
      item->set_throttled(flag);
    }
  }
  return blocked;
}
template <typename I>
@ -605,15 +663,12 @@ void *ImageRequestWQ<I>::_void_dequeue() {
return nullptr;
}
if (!peek_item->was_throttled() &&
iops_throttle->get<
ImageRequestWQ<I>, ImageDispatchSpec<I>,
&ImageRequestWQ<I>::handle_iops_throttle_ready>(1, this, peek_item)) {
if (needs_throttle(peek_item)) {
ldout(cct, 15) << "throttling IO " << peek_item << dendl;
// dequeue the throttled item and block future IO
++m_io_throttled;
// dequeue the throttled item
ThreadPool::PointerWQ<ImageDispatchSpec<I> >::_void_dequeue();
++m_io_blockers;
return nullptr;
}

View File

@ -71,11 +71,16 @@ public:
void set_require_lock(Direction direction, bool enabled);
void apply_qos_iops_limit(uint64_t limit);
void apply_qos_limit(uint64_t limit, const uint64_t flag);
protected:
void *_void_dequeue() override;
void process(ImageDispatchSpec<ImageCtxT> *req) override;
bool _empty() override {
return (ThreadPool::PointerWQ<ImageDispatchSpec<ImageCtxT>>::_empty() &&
m_io_throttled.load() == 0);
}
private:
typedef std::list<Context *> Contexts;
@ -95,8 +100,10 @@ private:
std::atomic<unsigned> m_in_flight_ios { 0 };
std::atomic<unsigned> m_in_flight_writes { 0 };
std::atomic<unsigned> m_io_blockers { 0 };
std::atomic<unsigned> m_io_throttled { 0 };
TokenBucketThrottle *iops_throttle;
std::list<std::pair<uint64_t, TokenBucketThrottle*> > m_throttles;
uint64_t m_qos_enabled_flag = 0;
bool m_shutdown = false;
Context *m_on_shutdown = nullptr;
@ -112,6 +119,8 @@ private:
return (m_queued_writes == 0);
}
bool needs_throttle(ImageDispatchSpec<ImageCtxT> *item);
void finish_queued_io(ImageDispatchSpec<ImageCtxT> *req);
void finish_in_flight_write();
@ -125,7 +134,7 @@ private:
void handle_refreshed(int r, ImageDispatchSpec<ImageCtxT> *req);
void handle_blocked_writes(int r);
void handle_iops_throttle_ready(int r, ImageDispatchSpec<ImageCtxT> *item);
void handle_throttle_ready(int r, ImageDispatchSpec<ImageCtxT> *item, uint64_t flag);
};
} // namespace io

View File

@ -11,6 +11,20 @@
namespace librbd {
namespace io {
// QoS throttle categories: one bit per token-bucket throttle.  Each value
// is parenthesized so the macros compose safely inside larger expressions
// (e.g. ~RBD_QOS_WRITE_IOPS_THROTTLE would otherwise parse as (~1) << 3).
#define RBD_QOS_IOPS_THROTTLE       (1 << 0)
#define RBD_QOS_BPS_THROTTLE        (1 << 1)
#define RBD_QOS_READ_IOPS_THROTTLE  (1 << 2)
#define RBD_QOS_WRITE_IOPS_THROTTLE (1 << 3)
#define RBD_QOS_READ_BPS_THROTTLE   (1 << 4)
#define RBD_QOS_WRITE_BPS_THROTTLE  (1 << 5)

// Convenience masks grouping related throttle categories.
#define RBD_QOS_BPS_MASK (RBD_QOS_BPS_THROTTLE | RBD_QOS_READ_BPS_THROTTLE | RBD_QOS_WRITE_BPS_THROTTLE)
#define RBD_QOS_IOPS_MASK (RBD_QOS_IOPS_THROTTLE | RBD_QOS_READ_IOPS_THROTTLE | RBD_QOS_WRITE_IOPS_THROTTLE)
#define RBD_QOS_READ_MASK (RBD_QOS_READ_BPS_THROTTLE | RBD_QOS_READ_IOPS_THROTTLE)
#define RBD_QOS_WRITE_MASK (RBD_QOS_WRITE_BPS_THROTTLE | RBD_QOS_WRITE_IOPS_THROTTLE)
#define RBD_QOS_MASK (RBD_QOS_BPS_MASK | RBD_QOS_IOPS_MASK)
typedef enum {
AIO_TYPE_NONE = 0,
AIO_TYPE_GENERIC,

View File

@ -62,8 +62,10 @@ struct ImageDispatchSpec<librbd::MockTestImageCtx> {
MOCK_CONST_METHOD0(start_op, void());
MOCK_CONST_METHOD0(send, void());
MOCK_CONST_METHOD1(fail, void(int));
MOCK_CONST_METHOD0(was_throttled, bool());
MOCK_CONST_METHOD0(set_throttled, void());
MOCK_CONST_METHOD1(was_throttled, bool(uint64_t));
MOCK_CONST_METHOD0(were_all_throttled, bool());
MOCK_CONST_METHOD1(set_throttled, void(uint64_t));
MOCK_CONST_METHOD1(tokens_requested, uint64_t(uint64_t));
ImageDispatchSpec() {
s_instance = this;
@ -97,6 +99,7 @@ struct ThreadPool::PointerWQ<librbd::io::ImageDispatchSpec<librbd::MockTestImage
MOCK_METHOD0(drain, void());
MOCK_METHOD0(empty, bool());
MOCK_METHOD0(mock_empty, bool());
MOCK_METHOD0(signal, void());
MOCK_METHOD0(process_finish, void());
@ -125,6 +128,9 @@ struct ThreadPool::PointerWQ<librbd::io::ImageDispatchSpec<librbd::MockTestImage
return dequeue();
}
virtual void process(ImageDispatchSpec *req) = 0;
virtual bool _empty() {
return mock_empty();
}
};
@ -164,6 +170,10 @@ struct TestMockIoImageRequestWQ : public TestMockFixture {
EXPECT_CALL(image_request_wq, queue(_));
}
// Expect the work queue to requeue a throttled request for later dispatch.
void expect_requeue(MockImageRequestWQ &image_request_wq) {
EXPECT_CALL(image_request_wq, requeue(_));
}
void expect_front(MockImageRequestWQ &image_request_wq,
MockImageDispatchSpec *image_request) {
EXPECT_CALL(image_request_wq, front()).WillOnce(Return(image_request));
@ -218,10 +228,24 @@ struct TestMockIoImageRequestWQ : public TestMockFixture {
}));
}
void expect_was_throttled(MockImageDispatchSpec &mock_image_request,
bool throttled) {
EXPECT_CALL(mock_image_request, was_throttled())
.WillOnce(Return(throttled));
// Expect set_throttled() once per QoS category (6 throttle flags total).
void expect_set_throttled(MockImageDispatchSpec &mock_image_request) {
EXPECT_CALL(mock_image_request, set_throttled(_)).Times(6);
}
// Expect was_throttled() to be queried for each of the 6 QoS categories,
// returning 'value' every time.
void expect_was_throttled(MockImageDispatchSpec &mock_image_request, bool value) {
EXPECT_CALL(mock_image_request, was_throttled(_)).Times(6).WillRepeatedly(Return(value));
}
// Expect a single token-cost query, answering with 'value' tokens.
void expect_tokens_requested(MockImageDispatchSpec &mock_image_request, uint64_t value) {
EXPECT_CALL(mock_image_request, tokens_requested(_)).WillOnce(Return(value));
}
// Expect a single were_all_throttled() check, answering 'value'.
void expect_all_throttled(MockImageDispatchSpec &mock_image_request, bool value) {
EXPECT_CALL(mock_image_request, were_all_throttled()).WillOnce(Return(value));
}
// Expect the request to be started exactly once after dequeue.
void expect_start_op(MockImageDispatchSpec &mock_image_request) {
EXPECT_CALL(mock_image_request, start_op()).Times(1);
}
};
@ -248,7 +272,6 @@ TEST_F(TestMockIoImageRequestWQ, AcquireLockError) {
librbd::exclusive_lock::MockPolicy mock_exclusive_lock_policy;
expect_front(mock_image_request_wq, mock_queued_image_request);
expect_was_throttled(*mock_queued_image_request, false);
expect_is_refresh_request(mock_image_ctx, false);
expect_is_write_op(*mock_queued_image_request, true);
expect_dequeue(mock_image_request_wq, mock_queued_image_request);
@ -276,17 +299,19 @@ TEST_F(TestMockIoImageRequestWQ, RefreshError) {
MockTestImageCtx mock_image_ctx(*ictx);
auto mock_queued_image_request = new MockImageDispatchSpec();
expect_was_throttled(*mock_queued_image_request, false);
expect_set_throttled(*mock_queued_image_request);
InSequence seq;
MockImageRequestWQ mock_image_request_wq(&mock_image_ctx, "io", 60, nullptr);
auto mock_queued_image_request = new MockImageDispatchSpec();
expect_is_write_op(*mock_queued_image_request, true);
expect_queue(mock_image_request_wq);
auto *aio_comp = new librbd::io::AioCompletion();
mock_image_request_wq.aio_write(aio_comp, 0, 0, {}, 0);
expect_front(mock_image_request_wq, mock_queued_image_request);
expect_was_throttled(*mock_queued_image_request, false);
expect_is_refresh_request(mock_image_ctx, true);
expect_is_write_op(*mock_queued_image_request, true);
expect_dequeue(mock_image_request_wq, mock_queued_image_request);
@ -306,5 +331,52 @@ TEST_F(TestMockIoImageRequestWQ, RefreshError) {
aio_comp->release();
}
// A QoS limit of 0 disables the throttle: the queued request must be
// dequeued immediately without being parked by any throttle category.
TEST_F(TestMockIoImageRequestWQ, QosNoLimit) {
librbd::ImageCtx *ictx;
ASSERT_EQ(0, open_image(m_image_name, &ictx));

MockTestImageCtx mock_image_ctx(*ictx);

MockImageDispatchSpec mock_queued_image_request;
// No category has throttled this request yet, and every category gets
// marked satisfied (all throttles are disabled).
expect_was_throttled(mock_queued_image_request, false);
expect_set_throttled(mock_queued_image_request);

InSequence seq;
MockImageRequestWQ mock_image_request_wq(&mock_image_ctx, "io", 60, nullptr);
// Explicitly configure a zero (i.e. disabled) BPS limit.
mock_image_request_wq.apply_qos_limit(0, RBD_QOS_BPS_THROTTLE);

expect_front(mock_image_request_wq, &mock_queued_image_request);
expect_is_refresh_request(mock_image_ctx, false);
expect_is_write_op(mock_queued_image_request, true);
expect_dequeue(mock_image_request_wq, &mock_queued_image_request);
expect_start_op(mock_queued_image_request);
// With no active throttle the request flows straight through dequeue.
ASSERT_TRUE(mock_image_request_wq.invoke_dequeue() == &mock_queued_image_request);
}
// A BPS limit smaller than the request's byte cost must block the request:
// it is dequeued from the front, requeued for later, and dequeue yields
// nullptr to the caller.
TEST_F(TestMockIoImageRequestWQ, BPSQos) {
librbd::ImageCtx *ictx;
ASSERT_EQ(0, open_image(m_image_name, &ictx));

MockTestImageCtx mock_image_ctx(*ictx);

MockImageDispatchSpec mock_queued_image_request;
expect_was_throttled(mock_queued_image_request, false);
expect_set_throttled(mock_queued_image_request);

InSequence seq;
MockImageRequestWQ mock_image_request_wq(&mock_image_ctx, "io", 60, nullptr);
// Limit of 1 byte/sec; the request below asks for 2 tokens, so it cannot
// be satisfied immediately.
mock_image_request_wq.apply_qos_limit(1, RBD_QOS_BPS_THROTTLE);

expect_front(mock_image_request_wq, &mock_queued_image_request);
expect_tokens_requested(mock_queued_image_request, 2);
expect_dequeue(mock_image_request_wq, &mock_queued_image_request);
expect_all_throttled(mock_queued_image_request, true);
expect_requeue(mock_image_request_wq);
expect_signal(mock_image_request_wq);
// Throttled: dequeue returns nothing while the request waits for tokens.
ASSERT_TRUE(mock_image_request_wq.invoke_dequeue() == nullptr);
}
} // namespace io
} // namespace librbd