Merge pull request #17032 from yangdongsheng/rbd_qos

rbd: implement image qos in tokenbucket algorithm

Reviewed-by: Jason Dillaman <dillaman@redhat.com>
This commit is contained in:
Jason Dillaman 2017-11-14 18:57:16 -05:00
commit 09070016f5
10 changed files with 278 additions and 1 deletion

View File

@ -0,0 +1,9 @@
tasks:
- rbd_fsx:
clients: [client.0]
ops: 6000
overrides:
ceph:
conf:
client:
rbd qos iops limit: 50

View File

@ -650,3 +650,127 @@ void OrderedThrottle::complete_pending_ops(UNIQUE_LOCK_T(m_lock)& l) {
++m_complete_tid; ++m_complete_tid;
} }
} }
// Take up to c tokens out of the bucket.
// Returns the number actually granted: c when enough tokens remain,
// otherwise whatever is left (possibly 0). A zero 'max' means the
// throttle is disabled and nothing is ever handed out.
uint64_t TokenBucketThrottle::Bucket::get(uint64_t c) {
  if (0 == max) {
    return 0;
  }

  // Grant min(remain, c) and deduct it from the bucket.
  uint64_t available = remain;
  uint64_t granted = (available < c) ? available : c;
  remain -= granted;
  return granted;
}
// Add c tokens to the bucket, clamping at the bucket capacity 'max'.
// Returns the resulting token count. A zero 'max' disables the
// throttle, so nothing is accumulated.
uint64_t TokenBucketThrottle::Bucket::put(uint64_t c) {
  if (0 == max) {
    return 0;
  }

  if (c) {
    // put c tokens into bucket.
    // Compare via subtraction so that (current + c) cannot wrap around
    // uint64_t before the capacity check; 'remain <= max' is an
    // invariant maintained by every mutator of this struct.
    uint64_t current = remain;
    if (c <= (uint64_t)max - current) {
      remain += c;
    } else {
      remain = (uint64_t)max;
    }
  }
  return remain;
}
// Change the bucket capacity. The current token count is clamped
// first so that 'remain' can never exceed the new capacity.
void TokenBucketThrottle::Bucket::set_max(uint64_t m) {
  if (remain > m) {
    remain = m;
  }
  max = m;
}
// Construct a token bucket throttle.
//   cct        - ceph context used by the internal bucket.
//   capacity   - bucket capacity: maximum tokens it can hold (burst size).
//   avg        - tokens added per timer tick (the sustained rate).
//   timer      - shared SafeTimer driving the periodic refill.
//   timer_lock - lock protecting 'timer'; must NOT already be held by
//                the caller, since we acquire it here.
TokenBucketThrottle::TokenBucketThrottle(
  CephContext *cct,
  uint64_t capacity,
  uint64_t avg,
  SafeTimer *timer,
  Mutex *timer_lock)
  : m_cct(cct), m_throttle(m_cct, "token_bucket_throttle", capacity),
    m_avg(avg), m_timer(timer), m_timer_lock(timer_lock),
    m_lock("token_bucket_throttle_lock")
{
  // schedule_timer() requires the timer lock to be held.
  Mutex::Locker timer_locker(*m_timer_lock);
  schedule_timer();
}
TokenBucketThrottle::~TokenBucketThrottle()
{
  // cancel the timer events.
  {
    Mutex::Locker timer_locker(*m_timer_lock);
    cancel_timer();
  }

  // Move the still-queued blockers out from under m_lock so their
  // contexts can be completed without holding any throttle lock
  // (a completion may re-enter the throttle or take other locks).
  list<Blocker> tmp_blockers;
  {
    Mutex::Locker blockers_lock(m_lock);
    tmp_blockers.splice(tmp_blockers.begin(), m_blockers, m_blockers.begin(), m_blockers.end());
  }

  // Release every pending request; r=0 lets the blocked IO proceed.
  for (auto b : tmp_blockers) {
    b.ctx->complete(0);
  }
}
// Update the bucket capacity (burst limit). Thread-safe: serialized
// against add_tokens() and get() by m_lock.
void TokenBucketThrottle::set_max(uint64_t m) {
  Mutex::Locker lock(m_lock);
  m_throttle.set_max(m);
}
// Update the refill rate (tokens added per timer tick).
// Take m_lock: m_avg is a plain uint64_t that add_tokens() reads from
// the timer thread under the same lock, so an unlocked write here
// would be a data race.
void TokenBucketThrottle::set_average(uint64_t avg) {
  Mutex::Locker lock(m_lock);
  m_avg = avg;
}
// Timer-tick handler: refill the bucket with m_avg tokens, then wake
// any queued blockers (in FIFO order) whose full token demand can now
// be satisfied. Completions run outside m_lock.
void TokenBucketThrottle::add_tokens() {
  list<Blocker> tmp_blockers;
  {
    // put m_avg tokens into bucket.
    Mutex::Locker lock(m_lock);
    m_throttle.put(m_avg);
    // check the m_blockers from head to tail, if blocker can get
    // enough tokens, let it go.
    while (!m_blockers.empty()) {
      // NOTE: must be a reference. When the front blocker can only be
      // partially satisfied we record the tokens it already received
      // by decrementing tokens_requested in place; operating on a copy
      // (as the original code did) silently discards that decrement,
      // leaking the granted tokens and stalling the blocker forever.
      Blocker &blocker = m_blockers.front();
      uint64_t got = m_throttle.get(blocker.tokens_requested);
      if (got == blocker.tokens_requested) {
        // got enough tokens for front.
        tmp_blockers.splice(tmp_blockers.end(), m_blockers, m_blockers.begin());
      } else {
        // there are no more tokens; remember the partial grant.
        blocker.tokens_requested -= got;
        break;
      }
    }
  }

  // Complete the released requests without holding m_lock.
  for (auto &b : tmp_blockers) {
    b.ctx->complete(0);
  }
}
// Refill the bucket now and re-arm a one-second refill event.
// Caller must hold *m_timer_lock (SafeTimer requirement); the initial
// call comes from the constructor, subsequent calls from the timer
// callback itself.
void TokenBucketThrottle::schedule_timer() {
  add_tokens();

  m_token_ctx = new FunctionContext(
    [this](int r) {
      schedule_timer();
    });

  // The timer owns m_token_ctx and frees it once fired or canceled.
  m_timer->add_event_after(1, m_token_ctx);
}
// Cancel the pending refill event. Caller must hold *m_timer_lock;
// cancel_event also frees m_token_ctx if it has not fired yet.
void TokenBucketThrottle::cancel_timer() {
  m_timer->cancel_event(m_token_ctx);
}

View File

@ -13,6 +13,7 @@
#include <mutex> #include <mutex>
#include "include/Context.h" #include "include/Context.h"
#include "common/Timer.h"
#include "common/convenience.h" #include "common/convenience.h"
#include "common/perf_counters.h" #include "common/perf_counters.h"
@ -330,4 +331,76 @@ private:
uint32_t waiters = 0; uint32_t waiters = 0;
}; };
/**
 * TokenBucketThrottle: rate limiter based on the token-bucket
 * algorithm. A SafeTimer tick adds m_avg tokens per second into a
 * bucket capped at a burst capacity; callers consume tokens via get()
 * and, when the bucket runs dry, are queued as Blockers and resumed
 * from the timer thread once enough tokens accumulate.
 */
class TokenBucketThrottle {
  // The bucket itself: 'remain' tokens currently available, capped at
  // 'max'. max == 0 means the throttle is disabled (get() grants 0,
  // and the public get() below short-circuits to "not throttled").
  struct Bucket {
    CephContext *cct;
    const std::string name;

    // atomics so 'max' can be inspected outside m_lock in get().
    std::atomic<uint64_t> remain = { 0 }, max = { 0 };

    // Start full: a new bucket holds its whole capacity.
    Bucket(CephContext *cct, const std::string& n, uint64_t m)
      : cct(cct), name(n),
        remain(m), max(m)
    {
    }

    uint64_t get(uint64_t c);      // take up to c tokens, return granted
    uint64_t put(uint64_t c);      // add c tokens, clamped at max
    void set_max(uint64_t m);      // resize capacity, clamping remain
  };

  // A queued request waiting for tokens_requested more tokens; ctx is
  // completed (r=0) when they become available or on destruction.
  struct Blocker {
    uint64_t tokens_requested;
    Context *ctx;

    Blocker(uint64_t _tokens_requested, Context* _ctx)
      : tokens_requested(_tokens_requested), ctx(_ctx) {}
  };

  CephContext *m_cct;
  Bucket m_throttle;
  uint64_t m_avg = 0;              // tokens added per timer tick
  SafeTimer *m_timer;              // shared timer driving the refill
  Mutex *m_timer_lock;             // guards m_timer (SafeTimer contract)
  FunctionContext *m_token_ctx = nullptr;  // pending refill event
  list<Blocker> m_blockers;        // FIFO of throttled requests
  Mutex m_lock;                    // guards bucket + m_blockers

public:
  TokenBucketThrottle(CephContext *cct, uint64_t capacity, uint64_t avg,
                      SafeTimer *timer, Mutex *timer_lock);

  ~TokenBucketThrottle();

  // Try to take c tokens. Returns false if the request can proceed
  // immediately (tokens granted, or throttle disabled). Returns true
  // if the caller must wait: a Blocker is queued and (handler->*MF)(r,
  // item) will be invoked from the timer thread once c tokens have
  // been accumulated.
  template <typename T, typename I, void(T::*MF)(int, I*)>
  bool get(uint64_t c, T *handler, I *item) {
    // Fast path: max == 0 means the throttle is switched off.
    if (0 == m_throttle.max)
      return false;

    bool waited = false;

    Mutex::Locker lock(m_lock);
    uint64_t got = m_throttle.get(c);
    if (got < c) {
      // Not enough tokens, add a blocker for it.
      Context *ctx = new FunctionContext([this, handler, item](int r) {
        (handler->*MF)(r, item);
        });
      m_blockers.emplace_back(c - got, ctx);
      waited = true;
    }
    return waited;
  }

  void set_max(uint64_t m);
  void set_average(uint64_t avg);

private:
  void add_tokens();
  void schedule_timer();
  void cancel_timer();
};
#endif #endif

View File

@ -5814,6 +5814,10 @@ static std::vector<Option> get_rbd_options() {
Option("rbd_journal_max_concurrent_object_sets", Option::TYPE_INT, Option::LEVEL_ADVANCED) Option("rbd_journal_max_concurrent_object_sets", Option::TYPE_INT, Option::LEVEL_ADVANCED)
.set_default(0) .set_default(0)
.set_description("maximum number of object sets a journal client can be behind before it is automatically unregistered"), .set_description("maximum number of object sets a journal client can be behind before it is automatically unregistered"),
Option("rbd_qos_iops_limit", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
.set_default(0)
.set_description("the desired limit of IO operations per second"),
}); });
} }

View File

@ -1021,7 +1021,8 @@ struct C_InvalidateCache : public Context {
"rbd_journal_max_concurrent_object_sets", false)( "rbd_journal_max_concurrent_object_sets", false)(
"rbd_mirroring_resync_after_disconnect", false)( "rbd_mirroring_resync_after_disconnect", false)(
"rbd_mirroring_replay_delay", false)( "rbd_mirroring_replay_delay", false)(
"rbd_skip_partial_discard", false); "rbd_skip_partial_discard", false)(
"rbd_qos_iops_limit", false);
md_config_t local_config_t; md_config_t local_config_t;
std::map<std::string, bufferlist> res; std::map<std::string, bufferlist> res;
@ -1082,6 +1083,7 @@ struct C_InvalidateCache : public Context {
ASSIGN_OPTION(mirroring_replay_delay, int64_t); ASSIGN_OPTION(mirroring_replay_delay, int64_t);
ASSIGN_OPTION(skip_partial_discard, bool); ASSIGN_OPTION(skip_partial_discard, bool);
ASSIGN_OPTION(blkin_trace_all, bool); ASSIGN_OPTION(blkin_trace_all, bool);
ASSIGN_OPTION(qos_iops_limit, uint64_t);
if (thread_safe) { if (thread_safe) {
ASSIGN_OPTION(journal_pool, std::string); ASSIGN_OPTION(journal_pool, std::string);
@ -1090,6 +1092,8 @@ struct C_InvalidateCache : public Context {
if (sparse_read_threshold_bytes == 0) { if (sparse_read_threshold_bytes == 0) {
sparse_read_threshold_bytes = get_object_size(); sparse_read_threshold_bytes = get_object_size();
} }
io_work_queue->apply_qos_iops_limit(qos_iops_limit);
} }
ExclusiveLock<ImageCtx> *ImageCtx::create_exclusive_lock() { ExclusiveLock<ImageCtx> *ImageCtx::create_exclusive_lock() {

View File

@ -199,6 +199,7 @@ namespace librbd {
int mirroring_replay_delay; int mirroring_replay_delay;
bool skip_partial_discard; bool skip_partial_discard;
bool blkin_trace_all; bool blkin_trace_all;
uint64_t qos_iops_limit;
LibrbdAdminSocketHook *asok_hook; LibrbdAdminSocketHook *asok_hook;

View File

@ -99,6 +99,14 @@ public:
return m_trace; return m_trace;
} }
// True once this request has been delayed by the QoS throttle; the
// work queue checks this to avoid throttling the same request twice
// after a requeue.
bool was_throttled() {
  return m_throttled;
}
// Mark this request as having passed through the QoS throttle.
void set_throttled() {
  m_throttled = true;
}
protected: protected:
typedef std::list<ObjectRequestHandle *> ObjectRequests; typedef std::list<ObjectRequestHandle *> ObjectRequests;
@ -107,6 +115,7 @@ protected:
Extents m_image_extents; Extents m_image_extents;
ZTracer::Trace m_trace; ZTracer::Trace m_trace;
bool m_bypass_image_cache = false; bool m_bypass_image_cache = false;
bool m_throttled = false;
ImageRequest(ImageCtxT &image_ctx, AioCompletion *aio_comp, ImageRequest(ImageCtxT &image_ctx, AioCompletion *aio_comp,
Extents &&image_extents, const char *trace_name, Extents &&image_extents, const char *trace_name,

View File

@ -69,9 +69,21 @@ ImageRequestWQ<I>::ImageRequestWQ(I *image_ctx, const string &name,
m_lock(util::unique_lock_name("ImageRequestWQ<I>::m_lock", this)) { m_lock(util::unique_lock_name("ImageRequestWQ<I>::m_lock", this)) {
CephContext *cct = m_image_ctx.cct; CephContext *cct = m_image_ctx.cct;
ldout(cct, 5) << "ictx=" << image_ctx << dendl; ldout(cct, 5) << "ictx=" << image_ctx << dendl;
SafeTimer *timer;
Mutex *timer_lock;
ImageCtx::get_timer_instance(cct, &timer, &timer_lock);
iops_throttle = new TokenBucketThrottle(
cct, 0, 0, timer, timer_lock);
this->register_work_queue(); this->register_work_queue();
} }
template <typename I>
ImageRequestWQ<I>::~ImageRequestWQ() {
  // The queue owns the throttle created in the constructor.
  delete iops_throttle;
}
template <typename I> template <typename I>
ssize_t ImageRequestWQ<I>::read(uint64_t off, uint64_t len, ssize_t ImageRequestWQ<I>::read(uint64_t off, uint64_t len,
ReadResult &&read_result, int op_flags) { ReadResult &&read_result, int op_flags) {
@ -541,6 +553,25 @@ void ImageRequestWQ<I>::set_require_lock(Direction direction, bool enabled) {
} }
} }
template <typename I>
void ImageRequestWQ<I>::apply_qos_iops_limit(uint64_t limit) {
  // One knob drives both burst capacity and refill rate, so the bucket
  // holds at most one second's worth of IOPS. limit == 0 disables the
  // throttle entirely.
  iops_throttle->set_max(limit);
  iops_throttle->set_average(limit);
}
// Callback from the IOPS throttle (runs on the timer thread) once the
// previously-throttled request 'item' has obtained its token: drop the
// IO blocker that _void_dequeue() added, requeue the request marked as
// throttled (so it is not throttled again), and wake a worker.
template <typename I>
void ImageRequestWQ<I>::handle_iops_throttle_ready(int r,
                                                   ImageRequest<I> *item) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 15) << "r=" << r << ", " << "req=" << item << dendl;

  assert(m_io_blockers.load() > 0);
  --m_io_blockers;

  item->set_throttled();
  this->requeue(item);
  this->signal();
}
template <typename I> template <typename I>
void *ImageRequestWQ<I>::_void_dequeue() { void *ImageRequestWQ<I>::_void_dequeue() {
CephContext *cct = m_image_ctx.cct; CephContext *cct = m_image_ctx.cct;
@ -551,6 +582,18 @@ void *ImageRequestWQ<I>::_void_dequeue() {
return nullptr; return nullptr;
} }
if (!peek_item->was_throttled() &&
iops_throttle->get<
ImageRequestWQ<I>, ImageRequest<I>,
&ImageRequestWQ<I>::handle_iops_throttle_ready>(1, this, peek_item)) {
ldout(cct, 15) << "throttling IO " << peek_item << dendl;
// dequeue the throttled item and block future IO
ThreadPool::PointerWQ<ImageRequest<I> >::_void_dequeue();
++m_io_blockers;
return nullptr;
}
bool lock_required; bool lock_required;
bool refresh_required = m_image_ctx.state->is_refresh_required(); bool refresh_required = m_image_ctx.state->is_refresh_required();
{ {

View File

@ -6,6 +6,7 @@
#include "include/Context.h" #include "include/Context.h"
#include "common/RWLock.h" #include "common/RWLock.h"
#include "common/Throttle.h"
#include "common/WorkQueue.h" #include "common/WorkQueue.h"
#include "librbd/io/Types.h" #include "librbd/io/Types.h"
@ -28,6 +29,7 @@ class ImageRequestWQ
public: public:
ImageRequestWQ(ImageCtxT *image_ctx, const string &name, time_t ti, ImageRequestWQ(ImageCtxT *image_ctx, const string &name, time_t ti,
ThreadPool *tp); ThreadPool *tp);
~ImageRequestWQ();
ssize_t read(uint64_t off, uint64_t len, ReadResult &&read_result, ssize_t read(uint64_t off, uint64_t len, ReadResult &&read_result,
int op_flags); int op_flags);
@ -70,6 +72,8 @@ public:
void set_require_lock(Direction direction, bool enabled); void set_require_lock(Direction direction, bool enabled);
void apply_qos_iops_limit(uint64_t limit);
protected: protected:
void *_void_dequeue() override; void *_void_dequeue() override;
void process(ImageRequest<ImageCtxT> *req) override; void process(ImageRequest<ImageCtxT> *req) override;
@ -93,6 +97,8 @@ private:
std::atomic<unsigned> m_in_flight_writes { 0 }; std::atomic<unsigned> m_in_flight_writes { 0 };
std::atomic<unsigned> m_io_blockers { 0 }; std::atomic<unsigned> m_io_blockers { 0 };
TokenBucketThrottle *iops_throttle;
bool m_shutdown = false; bool m_shutdown = false;
Context *m_on_shutdown = nullptr; Context *m_on_shutdown = nullptr;
@ -119,6 +125,8 @@ private:
void handle_acquire_lock(int r, ImageRequest<ImageCtxT> *req); void handle_acquire_lock(int r, ImageRequest<ImageCtxT> *req);
void handle_refreshed(int r, ImageRequest<ImageCtxT> *req); void handle_refreshed(int r, ImageRequest<ImageCtxT> *req);
void handle_blocked_writes(int r); void handle_blocked_writes(int r);
void handle_iops_throttle_ready(int r, ImageRequest<ImageCtxT> *item);
}; };
} // namespace io } // namespace io

View File

@ -44,6 +44,8 @@ struct ImageRequest<librbd::MockTestImageCtx> {
MOCK_CONST_METHOD0(start_op, void()); MOCK_CONST_METHOD0(start_op, void());
MOCK_CONST_METHOD0(send, void()); MOCK_CONST_METHOD0(send, void());
MOCK_CONST_METHOD1(fail, void(int)); MOCK_CONST_METHOD1(fail, void(int));
MOCK_CONST_METHOD0(was_throttled, bool());
MOCK_CONST_METHOD0(set_throttled, void());
ImageRequest() { ImageRequest() {
s_instance = this; s_instance = this;