From 4ac70ae196a7aa05ba98bab46ddc7b516a686577 Mon Sep 17 00:00:00 2001 From: Aishwarya Mathuria Date: Wed, 9 Nov 2022 18:38:29 +0530 Subject: [PATCH 1/2] mClock: Add ability to handle high priority operations There are some cases that may require mClock to handle high priority operations before other items in the queue. In order to make this possible, we are introducing a "High Queue" that will hold high priority operations. The high queue will be dequeued before the mClock queue. High queue has been implemented as a priority queue, operations with higher priority will get preference at time of dequeue. Trello: https://trello.com/c/Kelm8z0x/775-qos-add-ability-to-handle-high-priority-operations Signed-off-by: Aishwarya Mathuria --- src/osd/scheduler/mClockScheduler.cc | 55 ++++++++++++++++-- src/osd/scheduler/mClockScheduler.h | 33 ++++++++++- src/test/osd/TestMClockScheduler.cc | 83 ++++++++++++++++++++++++++++ 3 files changed, 165 insertions(+), 6 deletions(-) diff --git a/src/osd/scheduler/mClockScheduler.cc b/src/osd/scheduler/mClockScheduler.cc index be6e00444bc..5aa4f71530e 100644 --- a/src/osd/scheduler/mClockScheduler.cc +++ b/src/osd/scheduler/mClockScheduler.cc @@ -401,6 +401,7 @@ void mClockScheduler::dump(ceph::Formatter &f) const // Display queue sizes f.open_object_section("queue_sizes"); f.dump_int("immediate", immediate.size()); + f.dump_int("high_priority_queue", high_priority.size()); f.dump_int("scheduler", scheduler.request_count()); f.close_section(); @@ -416,15 +417,27 @@ void mClockScheduler::dump(ceph::Formatter &f) const f.open_object_section("mClockQueues"); f.dump_string("queues", display_queues()); f.close_section(); + + f.open_object_section("HighPriorityQueue"); + for (auto it = high_priority.begin(); + it != high_priority.end(); it++) { + f.dump_int("priority", it->first); + f.dump_int("queue_size", it->second.size()); + } + f.close_section(); } void mClockScheduler::enqueue(OpSchedulerItem&& item) { auto id = get_scheduler_id(item); - + unsigned priority = item.get_priority(); + unsigned cutoff = get_io_prio_cut(cct); + // TODO: move this check into OpSchedulerItem, handle backwards compat if (op_scheduler_class::immediate == id.class_id) { immediate.push_front(std::move(item)); + } else if (priority >= cutoff) { + enqueue_high(priority, std::move(item)); } else { int cost = calc_scaled_cost(item.get_cost()); item.set_qos_cost(cost); @@ -442,6 +455,7 @@ void mClockScheduler::enqueue(OpSchedulerItem&& item) dout(20) << __func__ << " client_count: " << scheduler.client_count() << " queue_sizes: [ imm: " << immediate.size() + << " high_priority_queue: " << high_priority.size() << " sched: " << scheduler.request_count() << " ]" << dendl; dout(30) << __func__ << " mClockClients: " @@ -454,9 +468,30 @@ void mClockScheduler::enqueue(OpSchedulerItem&& item) void mClockScheduler::enqueue_front(OpSchedulerItem&& item) { - immediate.push_back(std::move(item)); - // TODO: item may not be immediate, update mclock machinery to permit - // putting the item back in the queue + unsigned priority = item.get_priority(); + unsigned cutoff = get_io_prio_cut(cct); + auto id = get_scheduler_id(item); + + if (op_scheduler_class::immediate == id.class_id) { + immediate.push_back(std::move(item)); + } else if (priority >= cutoff) { + enqueue_high(priority, std::move(item), true); + } else { + // mClock does not support enqueue at front, so we use + // the high queue with priority 0 + enqueue_high(0, std::move(item), true); + } +} + +void mClockScheduler::enqueue_high(unsigned priority, + OpSchedulerItem&& item, + bool front) +{ + if (front) { + high_priority[priority].push_back(std::move(item)); + } else { + high_priority[priority].push_front(std::move(item)); + } } WorkItem mClockScheduler::dequeue() @@ -465,6 +500,18 @@ WorkItem mClockScheduler::dequeue() WorkItem work_item{std::move(immediate.back())}; immediate.pop_back(); return work_item; + } else if (!high_priority.empty()) { + auto iter = high_priority.begin(); + // invariant: high_priority entries are never empty + assert(!iter->second.empty()); + WorkItem ret{std::move(iter->second.back())}; + iter->second.pop_back(); + if (iter->second.empty()) { + // maintain invariant, high priority entries are never empty + high_priority.erase(iter); + } + ceph_assert(std::get_if(&ret)); + return ret; } else { mclock_queue_t::PullReq result = scheduler.pull_request(); if (result.is_future()) { diff --git a/src/osd/scheduler/mClockScheduler.h b/src/osd/scheduler/mClockScheduler.h index c3b79dba44b..65e9c3e38a6 100644 --- a/src/osd/scheduler/mClockScheduler.h +++ b/src/osd/scheduler/mClockScheduler.h @@ -15,6 +15,7 @@ #pragma once +#include #include #include #include @@ -130,7 +131,19 @@ class mClockScheduler : public OpScheduler, md_config_obs_t { true, true, 2>; + using priority_t = unsigned; + using SubQueue = std::map, + std::greater>; + using SubQueueIter = SubQueue::iterator; mclock_queue_t scheduler; + /** + * high_priority + * + * Holds entries to be dequeued in strict order ahead of mClock + * Invariant: entries are never empty + */ + SubQueue high_priority; std::list immediate; static scheduler_id_t get_scheduler_id(const OpSchedulerItem &item) { @@ -143,6 +156,19 @@ class mClockScheduler : public OpScheduler, md_config_obs_t { }; } + static unsigned int get_io_prio_cut(CephContext *cct) { + if (cct->_conf->osd_op_queue_cut_off == "debug_random") { + std::random_device rd; + std::mt19937 random_gen(rd()); + return (random_gen() % 2 < 1) ? CEPH_MSG_PRIO_HIGH : CEPH_MSG_PRIO_LOW; + } else if (cct->_conf->osd_op_queue_cut_off == "high") { + return CEPH_MSG_PRIO_HIGH; + } else { + // default / catch-all is 'low' + return CEPH_MSG_PRIO_LOW; + } + } + public: mClockScheduler(CephContext *cct, uint32_t num_shards, bool is_rotational); ~mClockScheduler() override; @@ -186,7 +212,7 @@ public: // Enqueue op in the back of the regular queue void enqueue(OpSchedulerItem &&item) final; - // Enqueue the op in the front of the regular queue + // Enqueue the op in the front of the high priority queue or the immediate queue (based on priority) void enqueue_front(OpSchedulerItem &&item) final; // Return an op to be dispatch @@ -194,7 +220,7 @@ public: // Returns if the queue is empty bool empty() const final { - return immediate.empty() && scheduler.empty(); + return immediate.empty() && scheduler.empty() && high_priority.empty(); } // Formatted output of the queue @@ -210,6 +236,9 @@ public: const char** get_tracked_conf_keys() const final; void handle_conf_change(const ConfigProxy& conf, const std::set &changed) final; +private: + // Enqueue the op to the high priority queue + void enqueue_high(unsigned prio, OpSchedulerItem &&item, bool front = false); }; } diff --git a/src/test/osd/TestMClockScheduler.cc b/src/test/osd/TestMClockScheduler.cc index 0feb427ec10..40a830000aa 100644 --- a/src/test/osd/TestMClockScheduler.cc +++ b/src/test/osd/TestMClockScheduler.cc @@ -83,6 +83,18 @@ OpSchedulerItem create_item( utime_t(), owner, e); } +template +OpSchedulerItem create_high_prio_item( + unsigned priority, epoch_t e, uint64_t owner, Args&&... args) +{ + // Create high priority item for testing high prio queue + return OpSchedulerItem( + std::make_unique( + std::forward(args)...), + 12, priority, + utime_t(), owner, e); +} + OpSchedulerItem get_item(WorkItem item) { return std::move(std::get(item)); @@ -169,3 +181,74 @@ TEST_F(mClockSchedulerTest, TestMultiClientOrderedEnqueueDequeue) { } ASSERT_TRUE(q.empty()); } + +TEST_F(mClockSchedulerTest, TestHighPriorityQueueEnqueueDequeue) { + ASSERT_TRUE(q.empty()); + for (unsigned i = 200; i < 205; ++i) { + q.enqueue(create_high_prio_item(i, i, client1, op_scheduler_class::client)); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + + ASSERT_FALSE(q.empty()); + // Higher priority ops should be dequeued first + auto r = get_item(q.dequeue()); + ASSERT_EQ(204u, r.get_map_epoch()); + + r = get_item(q.dequeue()); + ASSERT_EQ(203u, r.get_map_epoch()); + + r = get_item(q.dequeue()); + ASSERT_EQ(202u, r.get_map_epoch()); + + r = get_item(q.dequeue()); + ASSERT_EQ(201u, r.get_map_epoch()); + + r = get_item(q.dequeue()); + ASSERT_EQ(200u, r.get_map_epoch()); + + ASSERT_TRUE(q.empty()); +} + +TEST_F(mClockSchedulerTest, TestAllQueuesEnqueueDequeue) { + ASSERT_TRUE(q.empty()); + + // Insert ops into the mClock queue + for (unsigned i = 100; i < 102; ++i) { + q.enqueue(create_item(i, client1, op_scheduler_class::client)); + std::this_thread::sleep_for(std::chrono::microseconds(1)); + } + + // Insert ops into the immediate queue + for (unsigned i = 103; i < 105; ++i) { + q.enqueue(create_item(i, client1, op_scheduler_class::immediate)); + std::this_thread::sleep_for(std::chrono::microseconds(1)); + } + + // Insert ops into the high queue + for (unsigned i = 200; i < 202; ++i) { + q.enqueue(create_high_prio_item(i, i, client1, op_scheduler_class::client)); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + + ASSERT_FALSE(q.empty()); + auto r = get_item(q.dequeue()); + // Immediate queue should be dequeued first + ASSERT_EQ(103u, r.get_map_epoch()); + r = get_item(q.dequeue()); + ASSERT_EQ(104u, r.get_map_epoch()); + + // High priority queue should be dequeued second + // higher priority operation first + r = get_item(q.dequeue()); + ASSERT_EQ(201u, r.get_map_epoch()); + r = get_item(q.dequeue()); + ASSERT_EQ(200u, r.get_map_epoch()); + + // mClock queue will be dequeued last + r = get_item(q.dequeue()); + ASSERT_EQ(100u, r.get_map_epoch()); + r = get_item(q.dequeue()); + ASSERT_EQ(101u, r.get_map_epoch()); + + ASSERT_TRUE(q.empty()); +} From 02427fe20422a6b486ee515a6c3a71dae833d04a Mon Sep 17 00:00:00 2001 From: Aishwarya Mathuria Date: Mon, 6 Feb 2023 15:53:07 +0530 Subject: [PATCH 2/2] mClock: Replace immediate queue with high priority queue With the introduction of a high priority queue in the mClock workflow, we no longer need a separate queue for ops that need to be processed immediately. We will be using the high_priority queue with a very high priority for all items having op_scheduler_class as immediate. Signed-off-by: Aishwarya Mathuria --- src/osd/scheduler/mClockScheduler.cc | 13 ++++--------- src/osd/scheduler/mClockScheduler.h | 7 +++---- src/test/osd/TestMClockScheduler.cc | 4 ++-- 3 files changed, 9 insertions(+), 15 deletions(-) diff --git a/src/osd/scheduler/mClockScheduler.cc b/src/osd/scheduler/mClockScheduler.cc index 5aa4f71530e..729caa4500f 100644 --- a/src/osd/scheduler/mClockScheduler.cc +++ b/src/osd/scheduler/mClockScheduler.cc @@ -400,7 +400,6 @@ void mClockScheduler::dump(ceph::Formatter &f) const { // Display queue sizes f.open_object_section("queue_sizes"); - f.dump_int("immediate", immediate.size()); f.dump_int("high_priority_queue", high_priority.size()); f.dump_int("scheduler", scheduler.request_count()); f.close_section(); @@ -435,7 +434,7 @@ void mClockScheduler::enqueue(OpSchedulerItem&& item) // TODO: move this check into OpSchedulerItem, handle backwards compat if (op_scheduler_class::immediate == id.class_id) { - immediate.push_front(std::move(item)); + enqueue_high(immediate_class_priority, std::move(item)); } else if (priority >= cutoff) { enqueue_high(priority, std::move(item)); } else { @@ -454,7 +453,7 @@ void mClockScheduler::enqueue(OpSchedulerItem&& item) } dout(20) << __func__ << " client_count: " << scheduler.client_count() - << " queue_sizes: [ imm: " << immediate.size() + << " queue_sizes: [ " << " high_priority_queue: " << high_priority.size() << " sched: " << scheduler.request_count() << " ]" << dendl; @@ -473,7 +472,7 @@ void mClockScheduler::enqueue_front(OpSchedulerItem&& item) auto id = get_scheduler_id(item); if (op_scheduler_class::immediate == id.class_id) { - immediate.push_back(std::move(item)); + enqueue_high(immediate_class_priority, std::move(item), true); } else if (priority >= cutoff) { enqueue_high(priority, std::move(item), true); } else { @@ -496,11 +495,7 @@ void mClockScheduler::enqueue_high(unsigned priority, WorkItem mClockScheduler::dequeue() { - if (!immediate.empty()) { - WorkItem work_item{std::move(immediate.back())}; - immediate.pop_back(); - return work_item; - } else if (!high_priority.empty()) { + if (!high_priority.empty()) { auto iter = high_priority.begin(); // invariant: high_priority entries are never empty assert(!iter->second.empty()); diff --git a/src/osd/scheduler/mClockScheduler.h b/src/osd/scheduler/mClockScheduler.h index 65e9c3e38a6..088df31ee19 100644 --- a/src/osd/scheduler/mClockScheduler.h +++ b/src/osd/scheduler/mClockScheduler.h @@ -135,7 +135,6 @@ class mClockScheduler : public OpScheduler, md_config_obs_t { using SubQueue = std::map, std::greater>; - using SubQueueIter = SubQueue::iterator; mclock_queue_t scheduler; /** * high_priority @@ -144,7 +143,7 @@ class mClockScheduler : public OpScheduler, md_config_obs_t { * Invariant: entries are never empty */ SubQueue high_priority; - std::list immediate; + priority_t immediate_class_priority = std::numeric_limits::max(); static scheduler_id_t get_scheduler_id(const OpSchedulerItem &item) { return scheduler_id_t{ @@ -212,7 +211,7 @@ public: // Enqueue op in the back of the regular queue void enqueue(OpSchedulerItem &&item) final; - // Enqueue the op in the front of the high priority queue or the immediate queue (based on priority) + // Enqueue the op in the front of the high priority queue void enqueue_front(OpSchedulerItem &&item) final; // Return an op to be dispatch @@ -220,7 +219,7 @@ public: // Returns if the queue is empty bool empty() const final { - return immediate.empty() && scheduler.empty() && high_priority.empty(); + return scheduler.empty() && high_priority.empty(); } // Formatted output of the queue diff --git a/src/test/osd/TestMClockScheduler.cc b/src/test/osd/TestMClockScheduler.cc index 40a830000aa..685d20bd842 100644 --- a/src/test/osd/TestMClockScheduler.cc +++ b/src/test/osd/TestMClockScheduler.cc @@ -218,7 +218,7 @@ TEST_F(mClockSchedulerTest, TestAllQueuesEnqueueDequeue) { std::this_thread::sleep_for(std::chrono::microseconds(1)); } - // Insert ops into the immediate queue + // Insert Immediate ops for (unsigned i = 103; i < 105; ++i) { q.enqueue(create_item(i, client1, op_scheduler_class::immediate)); std::this_thread::sleep_for(std::chrono::microseconds(1)); @@ -232,7 +232,7 @@ TEST_F(mClockSchedulerTest, TestAllQueuesEnqueueDequeue) { ASSERT_FALSE(q.empty()); auto r = get_item(q.dequeue()); - // Immediate queue should be dequeued first + // Ops classified as Immediate should be dequeued first ASSERT_EQ(103u, r.get_map_epoch()); r = get_item(q.dequeue()); ASSERT_EQ(104u, r.get_map_epoch());