Merge pull request #49482 from amathuria/wip-amathuria-mclock-high-prio-queue

mClock: Add ability to handle high priority operations
Laura Flores 2023-02-27 17:55:43 -06:00 committed by GitHub
commit fac345d9fe
3 changed files with 167 additions and 14 deletions

@@ -414,7 +414,7 @@ void mClockScheduler::dump(ceph::Formatter &f) const
{
// Display queue sizes
f.open_object_section("queue_sizes");
f.dump_int("immediate", immediate.size());
f.dump_int("high_priority_queue", high_priority.size());
f.dump_int("scheduler", scheduler.request_count());
f.close_section();
@@ -430,15 +430,27 @@ void mClockScheduler::dump(ceph::Formatter &f) const
f.open_object_section("mClockQueues");
f.dump_string("queues", display_queues());
f.close_section();
f.open_object_section("HighPriorityQueue");
for (auto it = high_priority.begin();
it != high_priority.end(); it++) {
f.dump_int("priority", it->first);
f.dump_int("queue_size", it->second.size());
}
f.close_section();
}
void mClockScheduler::enqueue(OpSchedulerItem&& item)
{
auto id = get_scheduler_id(item);
unsigned priority = item.get_priority();
unsigned cutoff = get_io_prio_cut(cct);
// TODO: move this check into OpSchedulerItem, handle backwards compat
if (op_scheduler_class::immediate == id.class_id) {
immediate.push_front(std::move(item));
enqueue_high(immediate_class_priority, std::move(item));
} else if (priority >= cutoff) {
enqueue_high(priority, std::move(item));
} else {
int cost = calc_scaled_cost(item.get_cost());
item.set_qos_cost(cost);
@@ -455,7 +467,8 @@ void mClockScheduler::enqueue(OpSchedulerItem&& item)
}
dout(20) << __func__ << " client_count: " << scheduler.client_count()
<< " queue_sizes: [ imm: " << immediate.size()
<< " queue_sizes: [ "
<< " high_priority_queue: " << high_priority.size()
<< " sched: " << scheduler.request_count() << " ]"
<< dendl;
dout(30) << __func__ << " mClockClients: "
@@ -468,17 +481,46 @@ void mClockScheduler::enqueue(OpSchedulerItem&& item)
void mClockScheduler::enqueue_front(OpSchedulerItem&& item)
{
immediate.push_back(std::move(item));
// TODO: item may not be immediate, update mclock machinery to permit
// putting the item back in the queue
unsigned priority = item.get_priority();
unsigned cutoff = get_io_prio_cut(cct);
auto id = get_scheduler_id(item);
if (op_scheduler_class::immediate == id.class_id) {
enqueue_high(immediate_class_priority, std::move(item), true);
} else if (priority >= cutoff) {
enqueue_high(priority, std::move(item), true);
} else {
// mClock does not support enqueue at front, so we use
// the high queue with priority 0
enqueue_high(0, std::move(item), true);
}
}
void mClockScheduler::enqueue_high(unsigned priority,
OpSchedulerItem&& item,
bool front)
{
if (front) {
high_priority[priority].push_back(std::move(item));
} else {
high_priority[priority].push_front(std::move(item));
}
}
WorkItem mClockScheduler::dequeue()
{
if (!immediate.empty()) {
WorkItem work_item{std::move(immediate.back())};
immediate.pop_back();
return work_item;
if (!high_priority.empty()) {
auto iter = high_priority.begin();
// invariant: high_priority entries are never empty
assert(!iter->second.empty());
WorkItem ret{std::move(iter->second.back())};
iter->second.pop_back();
if (iter->second.empty()) {
// maintain invariant, high priority entries are never empty
high_priority.erase(iter);
}
ceph_assert(std::get_if<OpSchedulerItem>(&ret));
return ret;
} else {
mclock_queue_t::PullReq result = scheduler.pull_request();
if (result.is_future()) {
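
The routing in enqueue() and enqueue_front() above boils down to three paths. The following self-contained sketch mirrors that decision; Item, CUTOFF, the lambda, and the stub containers are simplifications introduced here for illustration, not the Ceph classes.

#include <functional>
#include <limits>
#include <list>
#include <map>
#include <utility>

struct Item {
  bool immediate_class;  // stands in for op_scheduler_class::immediate
  unsigned priority;     // stands in for OpSchedulerItem::get_priority()
};

using SubQueue = std::map<unsigned, std::list<Item>, std::greater<unsigned>>;

constexpr unsigned CUTOFF = 196;  // placeholder for get_io_prio_cut(cct)
constexpr unsigned IMMEDIATE_CLASS_PRIORITY =
    std::numeric_limits<unsigned>::max();

SubQueue high_priority;
std::list<Item> mclock_stub;  // stands in for the dmclock scheduler

void enqueue(Item&& item, bool front = false) {
  auto push = [&](unsigned prio) {
    // enqueue_high(): dequeue() pops from back(), so pushing to the back
    // makes the item next in line ("front" of the logical queue)
    if (front) {
      high_priority[prio].push_back(std::move(item));
    } else {
      high_priority[prio].push_front(std::move(item));
    }
  };
  if (item.immediate_class) {
    push(IMMEDIATE_CLASS_PRIORITY);   // always ahead of everything else
  } else if (item.priority >= CUTOFF) {
    push(item.priority);              // high message priority bypasses mClock
  } else if (front) {
    push(0);                          // enqueue_front() for a low-priority op
  } else {
    mclock_stub.push_back(std::move(item));  // normal mClock path
  }
}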


@@ -15,6 +15,7 @@
#pragma once
#include <functional>
#include <ostream>
#include <map>
#include <vector>
@@ -133,8 +134,19 @@ class mClockScheduler : public OpScheduler, md_config_obs_t {
true,
true,
2>;
using priority_t = unsigned;
using SubQueue = std::map<priority_t,
std::list<OpSchedulerItem>,
std::greater<priority_t>>;
mclock_queue_t scheduler;
std::list<OpSchedulerItem> immediate;
/**
* high_priority
*
* Holds entries to be dequeued in strict order ahead of mClock
* Invariant: entries are never empty
*/
SubQueue high_priority;
priority_t immediate_class_priority = std::numeric_limits<priority_t>::max();
static scheduler_id_t get_scheduler_id(const OpSchedulerItem &item) {
return scheduler_id_t{
@@ -146,6 +158,19 @@ class mClockScheduler : public OpScheduler, md_config_obs_t {
};
}
static unsigned int get_io_prio_cut(CephContext *cct) {
if (cct->_conf->osd_op_queue_cut_off == "debug_random") {
std::random_device rd;
std::mt19937 random_gen(rd());
return (random_gen() % 2 < 1) ? CEPH_MSG_PRIO_HIGH : CEPH_MSG_PRIO_LOW;
} else if (cct->_conf->osd_op_queue_cut_off == "high") {
return CEPH_MSG_PRIO_HIGH;
} else {
// default / catch-all is 'low'
return CEPH_MSG_PRIO_LOW;
}
}
public:
mClockScheduler(CephContext *cct, int whoami, uint32_t num_shards,
int shard_id, bool is_rotational, MonClient *monc);
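
get_io_prio_cut() above only chooses a threshold; whether an op actually skips mClock is decided at enqueue time by comparing its priority against that threshold. Below is a standalone mirror of the selection, with placeholder constants standing in for CEPH_MSG_PRIO_HIGH and CEPH_MSG_PRIO_LOW (the real values come from the Ceph headers).

#include <random>
#include <string>

constexpr unsigned PRIO_HIGH = 196;  // placeholder for CEPH_MSG_PRIO_HIGH
constexpr unsigned PRIO_LOW  = 64;   // placeholder for CEPH_MSG_PRIO_LOW

unsigned io_prio_cut(const std::string& osd_op_queue_cut_off) {
  if (osd_op_queue_cut_off == "debug_random") {
    // 50/50 coin flip; only useful for exercising both paths in testing
    std::random_device rd;
    std::mt19937 gen(rd());
    return (gen() % 2 == 0) ? PRIO_HIGH : PRIO_LOW;
  } else if (osd_op_queue_cut_off == "high") {
    return PRIO_HIGH;
  }
  return PRIO_LOW;  // default / catch-all
}

// An op whose get_priority() is at or above io_prio_cut(...) goes to the
// high-priority queue instead of the mClock scheduler; see the earlier sketch.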
@@ -190,7 +215,7 @@ public:
// Enqueue op in the back of the regular queue
void enqueue(OpSchedulerItem &&item) final;
// Enqueue the op in the front of the regular queue
// Enqueue the op in the front of the high priority queue
void enqueue_front(OpSchedulerItem &&item) final;
// Return an op to be dispatch
@@ -198,7 +223,7 @@ public:
// Returns if the queue is empty
bool empty() const final {
return immediate.empty() && scheduler.empty();
return scheduler.empty() && high_priority.empty();
}
// Formatted output of the queue
@@ -214,6 +239,9 @@ public:
const char** get_tracked_conf_keys() const final;
void handle_conf_change(const ConfigProxy& conf,
const std::set<std::string> &changed) final;
private:
// Enqueue the op to the high priority queue
void enqueue_high(unsigned prio, OpSchedulerItem &&item, bool front = false);
};
}
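
The SubQueue comment above captures the two properties the dequeue path relies on: buckets are visited in descending priority order, and a bucket is erased as soon as it empties. A minimal, runnable illustration of the resulting drain order, using plain ints instead of OpSchedulerItem:

#include <cassert>
#include <functional>
#include <iostream>
#include <list>
#include <map>

int main() {
  std::map<unsigned, std::list<int>, std::greater<unsigned>> high_priority;

  // enqueue() pushes to the front; dequeue() pops from the back,
  // so each bucket drains FIFO.
  high_priority[200].push_front(1);   // first op at priority 200
  high_priority[200].push_front(2);   // second op at priority 200
  high_priority[205].push_front(3);   // op at a higher priority

  while (!high_priority.empty()) {
    auto iter = high_priority.begin();        // highest priority bucket
    assert(!iter->second.empty());            // invariant from the patch
    int op = iter->second.back();             // oldest op in that bucket
    iter->second.pop_back();
    if (iter->second.empty()) {
      high_priority.erase(iter);              // maintain the invariant
    }
    std::cout << op << '\n';                  // prints 3, then 1, then 2
  }
  // Only once high_priority is empty would the real scheduler fall back
  // to pulling a request from the mClock queue.
}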


@@ -89,6 +89,18 @@ OpSchedulerItem create_item(
utime_t(), owner, e);
}
template <typename... Args>
OpSchedulerItem create_high_prio_item(
unsigned priority, epoch_t e, uint64_t owner, Args&&... args)
{
// Create high priority item for testing high prio queue
return OpSchedulerItem(
std::make_unique<mClockSchedulerTest::MockDmclockItem>(
std::forward<Args>(args)...),
12, priority,
utime_t(), owner, e);
}
OpSchedulerItem get_item(WorkItem item)
{
return std::move(std::get<OpSchedulerItem>(item));
@@ -175,3 +187,74 @@ TEST_F(mClockSchedulerTest, TestMultiClientOrderedEnqueueDequeue) {
}
ASSERT_TRUE(q.empty());
}
TEST_F(mClockSchedulerTest, TestHighPriorityQueueEnqueueDequeue) {
ASSERT_TRUE(q.empty());
for (unsigned i = 200; i < 205; ++i) {
q.enqueue(create_high_prio_item(i, i, client1, op_scheduler_class::client));
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
ASSERT_FALSE(q.empty());
// Higher priority ops should be dequeued first
auto r = get_item(q.dequeue());
ASSERT_EQ(204u, r.get_map_epoch());
r = get_item(q.dequeue());
ASSERT_EQ(203u, r.get_map_epoch());
r = get_item(q.dequeue());
ASSERT_EQ(202u, r.get_map_epoch());
r = get_item(q.dequeue());
ASSERT_EQ(201u, r.get_map_epoch());
r = get_item(q.dequeue());
ASSERT_EQ(200u, r.get_map_epoch());
ASSERT_TRUE(q.empty());
}
TEST_F(mClockSchedulerTest, TestAllQueuesEnqueueDequeue) {
ASSERT_TRUE(q.empty());
// Insert ops into the mClock queue
for (unsigned i = 100; i < 102; ++i) {
q.enqueue(create_item(i, client1, op_scheduler_class::client));
std::this_thread::sleep_for(std::chrono::microseconds(1));
}
// Insert Immediate ops
for (unsigned i = 103; i < 105; ++i) {
q.enqueue(create_item(i, client1, op_scheduler_class::immediate));
std::this_thread::sleep_for(std::chrono::microseconds(1));
}
// Insert ops into the high queue
for (unsigned i = 200; i < 202; ++i) {
q.enqueue(create_high_prio_item(i, i, client1, op_scheduler_class::client));
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
ASSERT_FALSE(q.empty());
auto r = get_item(q.dequeue());
// Ops classified as Immediate should be dequeued first
ASSERT_EQ(103u, r.get_map_epoch());
r = get_item(q.dequeue());
ASSERT_EQ(104u, r.get_map_epoch());
// High priority queue should be dequeued second
// higher priority operation first
r = get_item(q.dequeue());
ASSERT_EQ(201u, r.get_map_epoch());
r = get_item(q.dequeue());
ASSERT_EQ(200u, r.get_map_epoch());
// mClock queue will be dequeued last
r = get_item(q.dequeue());
ASSERT_EQ(100u, r.get_map_epoch());
r = get_item(q.dequeue());
ASSERT_EQ(101u, r.get_map_epoch());
ASSERT_TRUE(q.empty());
}