mirror of
https://github.com/ceph/ceph
synced 2025-01-20 18:21:57 +00:00
librbd: limit IO per second by TokenBucketThrottle
Signed-off-by: Dongsheng Yang <dongsheng.yang@easystack.cn>
This commit is contained in:
parent
8366ebceb5
commit
bf4e454a22
9
qa/suites/rbd/thrash/workloads/rbd_fsx_rate_limit.yaml
Normal file
9
qa/suites/rbd/thrash/workloads/rbd_fsx_rate_limit.yaml
Normal file
@ -0,0 +1,9 @@
|
||||
tasks:
|
||||
- rbd_fsx:
|
||||
clients: [client.0]
|
||||
ops: 6000
|
||||
overrides:
|
||||
ceph:
|
||||
conf:
|
||||
client:
|
||||
rbd qos iops limit: 50
|
@ -5419,6 +5419,10 @@ static std::vector<Option> get_rbd_options() {
|
||||
Option("rbd_journal_max_concurrent_object_sets", Option::TYPE_INT, Option::LEVEL_ADVANCED)
|
||||
.set_default(0)
|
||||
.set_description("maximum number of object sets a journal client can be behind before it is automatically unregistered"),
|
||||
|
||||
Option("rbd_qos_iops_limit", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
|
||||
.set_default(0)
|
||||
.set_description("the desired limit of IO operations per second"),
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -1003,7 +1003,8 @@ struct C_InvalidateCache : public Context {
|
||||
"rbd_journal_max_concurrent_object_sets", false)(
|
||||
"rbd_mirroring_resync_after_disconnect", false)(
|
||||
"rbd_mirroring_replay_delay", false)(
|
||||
"rbd_skip_partial_discard", false);
|
||||
"rbd_skip_partial_discard", false)(
|
||||
"rbd_qos_iops_limit", false);
|
||||
|
||||
md_config_t local_config_t;
|
||||
std::map<std::string, bufferlist> res;
|
||||
@ -1064,6 +1065,7 @@ struct C_InvalidateCache : public Context {
|
||||
ASSIGN_OPTION(mirroring_replay_delay, int64_t);
|
||||
ASSIGN_OPTION(skip_partial_discard, bool);
|
||||
ASSIGN_OPTION(blkin_trace_all, bool);
|
||||
ASSIGN_OPTION(qos_iops_limit, uint64_t);
|
||||
|
||||
if (thread_safe) {
|
||||
ASSIGN_OPTION(journal_pool, std::string);
|
||||
@ -1072,6 +1074,8 @@ struct C_InvalidateCache : public Context {
|
||||
if (sparse_read_threshold_bytes == 0) {
|
||||
sparse_read_threshold_bytes = get_object_size();
|
||||
}
|
||||
|
||||
io_work_queue->apply_qos_iops_limit(qos_iops_limit);
|
||||
}
|
||||
|
||||
ExclusiveLock<ImageCtx> *ImageCtx::create_exclusive_lock() {
|
||||
|
@ -198,6 +198,7 @@ namespace librbd {
|
||||
int mirroring_replay_delay;
|
||||
bool skip_partial_discard;
|
||||
bool blkin_trace_all;
|
||||
uint64_t qos_iops_limit;
|
||||
|
||||
LibrbdAdminSocketHook *asok_hook;
|
||||
|
||||
|
@ -99,6 +99,14 @@ public:
|
||||
return m_trace;
|
||||
}
|
||||
|
||||
bool was_throttled() {
|
||||
return m_throttled;
|
||||
}
|
||||
|
||||
void set_throttled() {
|
||||
m_throttled = true;
|
||||
}
|
||||
|
||||
protected:
|
||||
typedef std::list<ObjectRequestHandle *> ObjectRequests;
|
||||
|
||||
@ -107,6 +115,7 @@ protected:
|
||||
Extents m_image_extents;
|
||||
ZTracer::Trace m_trace;
|
||||
bool m_bypass_image_cache = false;
|
||||
bool m_throttled = false;
|
||||
|
||||
ImageRequest(ImageCtxT &image_ctx, AioCompletion *aio_comp,
|
||||
Extents &&image_extents, const char *trace_name,
|
||||
|
@ -69,9 +69,21 @@ ImageRequestWQ<I>::ImageRequestWQ(I *image_ctx, const string &name,
|
||||
m_lock(util::unique_lock_name("ImageRequestWQ<I>::m_lock", this)) {
|
||||
CephContext *cct = m_image_ctx.cct;
|
||||
ldout(cct, 5) << "ictx=" << image_ctx << dendl;
|
||||
|
||||
SafeTimer *timer;
|
||||
Mutex *timer_lock;
|
||||
ImageCtx::get_timer_instance(cct, &timer, &timer_lock);
|
||||
|
||||
iops_throttle = new TokenBucketThrottle(
|
||||
cct, 0, 0, timer, timer_lock);
|
||||
this->register_work_queue();
|
||||
}
|
||||
|
||||
template <typename I>
|
||||
ImageRequestWQ<I>::~ImageRequestWQ() {
|
||||
delete iops_throttle;
|
||||
}
|
||||
|
||||
template <typename I>
|
||||
ssize_t ImageRequestWQ<I>::read(uint64_t off, uint64_t len,
|
||||
ReadResult &&read_result, int op_flags) {
|
||||
@ -541,6 +553,21 @@ void ImageRequestWQ<I>::set_require_lock(Direction direction, bool enabled) {
|
||||
}
|
||||
}
|
||||
|
||||
template <typename I>
|
||||
void ImageRequestWQ<I>::apply_qos_iops_limit(uint64_t limit) {
|
||||
iops_throttle->set_max(limit);
|
||||
iops_throttle->set_average(limit);
|
||||
}
|
||||
|
||||
template <typename I>
|
||||
void ImageRequestWQ<I>::handle_iops_throttle_ready(int r, ImageRequest<I> *item) {
|
||||
assert(m_io_blockers.load() > 0);
|
||||
--m_io_blockers;
|
||||
item->set_throttled();
|
||||
this->requeue(item);
|
||||
this->signal();
|
||||
}
|
||||
|
||||
template <typename I>
|
||||
void *ImageRequestWQ<I>::_void_dequeue() {
|
||||
CephContext *cct = m_image_ctx.cct;
|
||||
@ -574,6 +601,14 @@ void *ImageRequestWQ<I>::_void_dequeue() {
|
||||
ThreadPool::PointerWQ<ImageRequest<I> >::_void_dequeue());
|
||||
assert(peek_item == item);
|
||||
|
||||
if (!item->was_throttled() &&
|
||||
iops_throttle->get<ImageRequestWQ<I>, ImageRequest<I>,
|
||||
&ImageRequestWQ<I>::handle_iops_throttle_ready>(1, this, item)) {
|
||||
// io was queued into blockers list and wait for tokens.
|
||||
++m_io_blockers;
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
if (lock_required) {
|
||||
this->get_pool_lock().Unlock();
|
||||
m_image_ctx.owner_lock.get_read();
|
||||
|
@ -6,6 +6,7 @@
|
||||
|
||||
#include "include/Context.h"
|
||||
#include "common/RWLock.h"
|
||||
#include "common/Throttle.h"
|
||||
#include "common/WorkQueue.h"
|
||||
#include "librbd/io/Types.h"
|
||||
|
||||
@ -28,6 +29,7 @@ class ImageRequestWQ
|
||||
public:
|
||||
ImageRequestWQ(ImageCtxT *image_ctx, const string &name, time_t ti,
|
||||
ThreadPool *tp);
|
||||
~ImageRequestWQ();
|
||||
|
||||
ssize_t read(uint64_t off, uint64_t len, ReadResult &&read_result,
|
||||
int op_flags);
|
||||
@ -70,6 +72,8 @@ public:
|
||||
|
||||
void set_require_lock(Direction direction, bool enabled);
|
||||
|
||||
void apply_qos_iops_limit(uint64_t limit);
|
||||
|
||||
protected:
|
||||
void *_void_dequeue() override;
|
||||
void process(ImageRequest<ImageCtxT> *req) override;
|
||||
@ -93,6 +97,8 @@ private:
|
||||
std::atomic<unsigned> m_in_flight_writes { 0 };
|
||||
std::atomic<unsigned> m_io_blockers { 0 };
|
||||
|
||||
TokenBucketThrottle *iops_throttle;
|
||||
|
||||
bool m_shutdown = false;
|
||||
Context *m_on_shutdown = nullptr;
|
||||
|
||||
@ -119,6 +125,8 @@ private:
|
||||
void handle_acquire_lock(int r, ImageRequest<ImageCtxT> *req);
|
||||
void handle_refreshed(int r, ImageRequest<ImageCtxT> *req);
|
||||
void handle_blocked_writes(int r);
|
||||
|
||||
void handle_iops_throttle_ready(int r, ImageRequest<ImageCtxT> *item);
|
||||
};
|
||||
|
||||
} // namespace io
|
||||
|
@ -44,6 +44,8 @@ struct ImageRequest<librbd::MockTestImageCtx> {
|
||||
MOCK_CONST_METHOD0(start_op, void());
|
||||
MOCK_CONST_METHOD0(send, void());
|
||||
MOCK_CONST_METHOD1(fail, void(int));
|
||||
MOCK_CONST_METHOD0(was_throttled, bool());
|
||||
MOCK_CONST_METHOD0(set_throttled, void());
|
||||
|
||||
ImageRequest() {
|
||||
s_instance = this;
|
||||
|
Loading…
Reference in New Issue
Block a user