
osd/PG: Add two new mClock implementations of the PG sharded operator queue

Create an mClock priority queue, which can in turn be used for two new
implementations of the PG shards operator queue. The first
(mClockOpClassQueue) prioritizes operations based on which class they
belong to (recovery, scrub, snaptrim, client op, osd subop). The
second (mClockClientQueue) also incorporates the client identifier, in
order to promote fairness between clients.

In addition, remove OpQueue's remove_by_filter along with its
subclass implementations and associated tests.

Signed-off-by: J. Eric Ivancich <ivancich@redhat.com>
Author: J. Eric Ivancich
Date: 2017-06-07 14:44:13 -04:00
parent 503de20927
commit bf70c4b9b0
21 changed files with 1946 additions and 397 deletions
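
For orientation before the diffs: dmClock schedules each scheduling entity by a (reservation, weight, limit) triple. The sketch below shows the triples the new defaults (see the config_opts hunk below) assign to the five op classes; the struct and map are illustrative only, not the actual crimson::dmclock::ClientInfo API.

#include <map>
#include <string>

// Illustrative sketch: the (res, wgt, lim) triples that the new
// osd_op_queue_mclock_* options feed into crimson::dmclock::ClientInfo.
struct QosTriple {
  double res;  // reservation: throughput guaranteed before weights apply
  double wgt;  // weight: share of the remaining capacity
  double lim;  // limit: throughput cap (0.0 disables the limit in dmclock)
};

static const std::map<std::string, QosTriple> mclock_default_qos = {
  {"client_op", {1000.0, 500.0, 0.0}},
  {"osd_subop", {1000.0, 500.0, 0.0}},
  {"snaptrim",  {0.0, 1.0, 0.001}},
  {"recov",     {0.0, 1.0, 0.001}},
  {"scrub",     {0.0, 1.0, 0.001}},
};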

@@ -37,10 +37,11 @@ class OpQueue {
public:
// How many Ops are in the queue
virtual unsigned length() const = 0;
// Ops will be removed if f evaluates to true; f may have side effects
virtual void remove_by_filter(
std::function<bool (T)> f) = 0;
// Ops of this priority should be deleted immediately
// Ops of this class should be deleted immediately. If out isn't
// nullptr then items should be added to the front in
// front-to-back order. The typical strategy is to visit items in
// the queue in *reverse* order and to use *push_front* to insert
// them into out.
virtual void remove_by_class(K k, std::list<T> *out) = 0;
// Enqueue op in the back of the strict queue
virtual void enqueue_strict(K cl, unsigned priority, T item) = 0;
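
The reverse-visit/push_front contract described in the comment above is what leaves the "out" list in front-to-back order; a toy sketch of the pattern with plain ints (not the real queue types):

#include <cassert>
#include <list>

// Visit the class's items back-to-front and push_front each into out, so
// out ends up in the original front-to-back order, ready to be
// re-enqueued at the front of another queue.
static void drain_class(std::list<int>& q, std::list<int>* out) {
  for (auto it = q.rbegin(); it != q.rend(); ++it) {
    if (out) out->push_front(*it);
  }
  q.clear();
}

int main() {
  std::list<int> q = {1, 2, 3};  // 1 is the front
  std::list<int> out;
  drain_class(q, &out);
  assert((out == std::list<int>{1, 2, 3}));  // order preserved
}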

@@ -45,24 +45,6 @@ class PrioritizedQueue : public OpQueue <T, K> {
int64_t min_cost;
typedef std::list<std::pair<unsigned, T> > ListPairs;
static unsigned filter_list_pairs(
ListPairs *l,
std::function<bool (T)> f) {
unsigned ret = 0;
for (typename ListPairs::iterator i = l->end();
i != l->begin();
) {
auto next = i;
--next;
if (f(next->second)) {
++ret;
l->erase(next);
} else {
i = next;
}
}
return ret;
}
struct SubQueue {
private:
@@ -142,24 +124,6 @@ class PrioritizedQueue : public OpQueue <T, K> {
bool empty() const {
return q.empty();
}
void remove_by_filter(
std::function<bool (T)> f) {
for (typename Classes::iterator i = q.begin();
i != q.end();
) {
size -= filter_list_pairs(&(i->second), f);
if (i->second.empty()) {
if (cur == i) {
++cur;
}
q.erase(i++);
} else {
++i;
}
}
if (cur == q.end())
cur = q.begin();
}
void remove_by_class(K k, std::list<T> *out) {
typename Classes::iterator i = q.find(k);
if (i == q.end()) {
@@ -251,33 +215,6 @@ public:
return total;
}
void remove_by_filter(
std::function<bool (T)> f) final {
for (typename SubQueues::iterator i = queue.begin();
i != queue.end();
) {
unsigned priority = i->first;
i->second.remove_by_filter(f);
if (i->second.empty()) {
++i;
remove_queue(priority);
} else {
++i;
}
}
for (typename SubQueues::iterator i = high_queue.begin();
i != high_queue.end();
) {
i->second.remove_by_filter(f);
if (i->second.empty()) {
high_queue.erase(i++);
} else {
++i;
}
}
}
void remove_by_class(K k, std::list<T> *out = 0) final {
for (typename SubQueues::iterator i = queue.begin();
i != queue.end();

@@ -99,22 +99,6 @@ class WeightedPriorityQueue : public OpQueue <T, K>
unsigned get_size() const {
return lp.size();
}
unsigned filter_list_pairs(std::function<bool (T)>& f) {
unsigned count = 0;
// intrusive containers can't erase with a reverse_iterator
// so we have to walk backwards on our own. Since there is
// no iterator before begin, we have to test at the end.
for (Lit i = --lp.end();; --i) {
if (f(i->item)) {
i = lp.erase_and_dispose(i, DelItem<ListPair>());
++count;
}
if (i == lp.begin()) {
break;
}
}
return count;
}
unsigned filter_class(std::list<T>* out) {
unsigned count = 0;
for (Lit i = --lp.end();; --i) {
@@ -180,25 +164,6 @@ class WeightedPriorityQueue : public OpQueue <T, K>
check_end();
return ret;
}
unsigned filter_list_pairs(std::function<bool (T)>& f) {
unsigned count = 0;
// intrusive containers can't erase with a reverse_iterator
// so we have to walk backwards on our own. Since there is
// no iterator before begin, we have to test at the end.
for (Kit i = klasses.begin(); i != klasses.end();) {
count += i->filter_list_pairs(f);
if (i->empty()) {
if (next == i) {
++next;
}
i = klasses.erase_and_dispose(i, DelItem<Klass>());
} else {
++i;
}
}
check_end();
return count;
}
unsigned filter_class(K& cl, std::list<T>* out) {
unsigned count = 0;
Kit i = klasses.find(cl, MapKey<Klass, K>());
@@ -291,17 +256,6 @@ class WeightedPriorityQueue : public OpQueue <T, K>
}
return ret;
}
void filter_list_pairs(std::function<bool (T)>& f) {
for (Sit i = queues.begin(); i != queues.end();) {
size -= i->filter_list_pairs(f);
if (i->empty()) {
total_prio -= i->key;
i = queues.erase_and_dispose(i, DelItem<SubQueue>());
} else {
++i;
}
}
}
void filter_class(K& cl, std::list<T>* out) {
for (Sit i = queues.begin(); i != queues.end();) {
size -= i->filter_class(cl, out);
@@ -338,10 +292,6 @@ class WeightedPriorityQueue : public OpQueue <T, K>
unsigned length() const final {
return strict.size + normal.size;
}
void remove_by_filter(std::function<bool (T)> f) final {
strict.filter_list_pairs(f);
normal.filter_list_pairs(f);
}
void remove_by_class(K cl, std::list<T>* removed = 0) final {
strict.filter_class(cl, removed);
normal.filter_class(cl, removed);

@@ -768,9 +768,35 @@ OPTION(osd_op_num_threads_per_shard_ssd, OPT_INT, 2)
OPTION(osd_op_num_shards, OPT_INT, 0)
OPTION(osd_op_num_shards_hdd, OPT_INT, 5)
OPTION(osd_op_num_shards_ssd, OPT_INT, 8)
OPTION(osd_op_queue, OPT_STR, "wpq") // PrioritizedQueue (prio), Weighted Priority Queue (wpq), or debug_random
// PrioritizedQueue (prio), Weighted Priority Queue (wpq; default),
// mclock_opclass, mclock_client, or debug_random. "mclock_opclass"
// and "mclock_client" are based on the mClock/dmClock algorithm
// (Gulati, et al. 2010). "mclock_opclass" prioritizes based on the
// class the operation belongs to. "mclock_client" does the same but
// also works to enforce fairness between clients. "debug_random"
// chooses among all four with equal probability.
OPTION(osd_op_queue, OPT_STR, "wpq")
OPTION(osd_op_queue_cut_off, OPT_STR, "low") // Min priority to go to strict queue. (low, high, debug_random)
// mClock priority queue parameters for five types of ops
OPTION(osd_op_queue_mclock_client_op_res, OPT_DOUBLE, 1000.0)
OPTION(osd_op_queue_mclock_client_op_wgt, OPT_DOUBLE, 500.0)
OPTION(osd_op_queue_mclock_client_op_lim, OPT_DOUBLE, 0.0)
OPTION(osd_op_queue_mclock_osd_subop_res, OPT_DOUBLE, 1000.0)
OPTION(osd_op_queue_mclock_osd_subop_wgt, OPT_DOUBLE, 500.0)
OPTION(osd_op_queue_mclock_osd_subop_lim, OPT_DOUBLE, 0.0)
OPTION(osd_op_queue_mclock_snap_res, OPT_DOUBLE, 0.0)
OPTION(osd_op_queue_mclock_snap_wgt, OPT_DOUBLE, 1.0)
OPTION(osd_op_queue_mclock_snap_lim, OPT_DOUBLE, 0.001)
OPTION(osd_op_queue_mclock_recov_res, OPT_DOUBLE, 0.0)
OPTION(osd_op_queue_mclock_recov_wgt, OPT_DOUBLE, 1.0)
OPTION(osd_op_queue_mclock_recov_lim, OPT_DOUBLE, 0.001)
OPTION(osd_op_queue_mclock_scrub_res, OPT_DOUBLE, 0.0)
OPTION(osd_op_queue_mclock_scrub_wgt, OPT_DOUBLE, 1.0)
OPTION(osd_op_queue_mclock_scrub_lim, OPT_DOUBLE, 0.001)
OPTION(osd_ignore_stale_divergent_priors, OPT_BOOL, false) // do not assert on divergent_prior entries which aren't in the log and whose on-disk objects are newer
// Set to true for testing. Users should NOT set this.
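
A hedged sketch of how these options might be set in ceph.conf (an [osd] section with illustrative values, not tuning guidance):

[osd]
# select the op-class-based mClock queue added by this commit
osd op queue = mclock_opclass
# illustrative override: give snap trimming a small reservation
osd op queue mclock snap res = 10.0
osd op queue mclock snap wgt = 1.0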

src/common/mClockPriorityQueue.h (new file)
@@ -0,0 +1,361 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2016 Red Hat Inc.
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#pragma once
#include <functional>
#include <map>
#include <list>
#include <cmath>
#include "common/Formatter.h"
#include "common/OpQueue.h"
#include "dmclock/src/dmclock_server.h"
// the following is done to unclobber _ASSERT_H so it returns to the
// way ceph likes it
#include "include/assert.h"
namespace ceph {
namespace dmc = crimson::dmclock;
template <typename T, typename K>
class mClockQueue : public OpQueue <T, K> {
using priority_t = unsigned;
using cost_t = unsigned;
typedef std::list<std::pair<cost_t, T> > ListPairs;
static unsigned filter_list_pairs(ListPairs *l,
std::function<bool (const T&)> f,
std::list<T>* out = nullptr) {
unsigned ret = 0;
for (typename ListPairs::iterator i = l->end();
i != l->begin();
/* no inc */
) {
auto next = i;
--next;
if (f(next->second)) {
++ret;
if (out) out->push_back(next->second);
l->erase(next);
} else {
i = next;
}
}
return ret;
}
struct SubQueue {
private:
typedef std::map<K, ListPairs> Classes;
// client-class to ordered queue
Classes q;
unsigned tokens, max_tokens;
int64_t size;
typename Classes::iterator cur;
public:
SubQueue(const SubQueue &other)
: q(other.q),
tokens(other.tokens),
max_tokens(other.max_tokens),
size(other.size),
cur(q.begin()) {}
SubQueue()
: tokens(0),
max_tokens(0),
size(0), cur(q.begin()) {}
void set_max_tokens(unsigned mt) {
max_tokens = mt;
}
unsigned get_max_tokens() const {
return max_tokens;
}
unsigned num_tokens() const {
return tokens;
}
void put_tokens(unsigned t) {
tokens += t;
if (tokens > max_tokens) {
tokens = max_tokens;
}
}
void take_tokens(unsigned t) {
if (tokens > t) {
tokens -= t;
} else {
tokens = 0;
}
}
void enqueue(K cl, cost_t cost, T item) {
q[cl].push_back(std::make_pair(cost, item));
if (cur == q.end())
cur = q.begin();
size++;
}
void enqueue_front(K cl, cost_t cost, T item) {
q[cl].push_front(std::make_pair(cost, item));
if (cur == q.end())
cur = q.begin();
size++;
}
std::pair<cost_t, T> front() const {
assert(!(q.empty()));
assert(cur != q.end());
return cur->second.front();
}
void pop_front() {
assert(!(q.empty()));
assert(cur != q.end());
cur->second.pop_front();
if (cur->second.empty()) {
auto i = cur;
++cur;
q.erase(i);
} else {
++cur;
}
if (cur == q.end()) {
cur = q.begin();
}
size--;
}
unsigned length() const {
assert(size >= 0);
return (unsigned)size;
}
bool empty() const {
return q.empty();
}
void remove_by_filter(std::function<bool (const T&)> f) {
for (typename Classes::iterator i = q.begin();
i != q.end();
/* no-inc */) {
size -= filter_list_pairs(&(i->second), f);
if (i->second.empty()) {
if (cur == i) {
++cur;
}
i = q.erase(i);
} else {
++i;
}
}
if (cur == q.end()) cur = q.begin();
}
void remove_by_class(K k, std::list<T> *out) {
typename Classes::iterator i = q.find(k);
if (i == q.end()) {
return;
}
size -= i->second.size();
if (i == cur) {
++cur;
}
if (out) {
for (auto j = i->second.rbegin(); j != i->second.rend(); ++j) {
out->push_front(j->second);
}
}
q.erase(i);
if (cur == q.end()) cur = q.begin();
}
void dump(ceph::Formatter *f) const {
f->dump_int("size", size);
f->dump_int("num_keys", q.size());
}
};
using SubQueues = std::map<priority_t, SubQueue>;
SubQueues high_queue;
dmc::PullPriorityQueue<K,T> queue;
// when enqueue_front is called, rather than try to re-calc tags
// to put in the mClock priority queue, we'll just keep a separate
// list from which we dequeue items first, and only when it's
// empty do we use queue.
std::list<std::pair<K,T>> queue_front;
public:
mClockQueue(
const typename dmc::PullPriorityQueue<K,T>::ClientInfoFunc& info_func) :
queue(info_func, true)
{
// empty
}
unsigned length() const override final {
unsigned total = 0;
total += queue_front.size();
total += queue.request_count();
for (auto i = high_queue.cbegin(); i != high_queue.cend(); ++i) {
assert(i->second.length());
total += i->second.length();
}
return total;
}
// be sure to do things in reverse priority order and push_front
// to the list so items end up on the list in front-to-back priority
// order
void remove_by_filter(std::function<bool (const T&)> filter_accum) {
queue.remove_by_req_filter(filter_accum, true);
for (auto i = queue_front.rbegin(); i != queue_front.rend(); /* no-inc */) {
if (filter_accum(i->second)) {
i = decltype(i){ queue_front.erase(std::next(i).base()) };
} else {
++i;
}
}
for (typename SubQueues::iterator i = high_queue.begin();
i != high_queue.end();
/* no-inc */ ) {
i->second.remove_by_filter(filter_accum);
if (i->second.empty()) {
i = high_queue.erase(i);
} else {
++i;
}
}
}
void remove_by_class(K k, std::list<T> *out = nullptr) override final {
if (out) {
queue.remove_by_client(k,
true,
[&out] (const T& t) { out->push_front(t); });
} else {
queue.remove_by_client(k, true);
}
for (auto i = queue_front.rbegin(); i != queue_front.rend(); /* no-inc */) {
if (k == i->first) {
if (nullptr != out) out->push_front(i->second);
i = decltype(i){ queue_front.erase(std::next(i).base()) };
} else {
++i;
}
}
for (auto i = high_queue.begin(); i != high_queue.end(); /* no-inc */) {
i->second.remove_by_class(k, out);
if (i->second.empty()) {
i = high_queue.erase(i);
} else {
++i;
}
}
}
void enqueue_strict(K cl, unsigned priority, T item) override final {
high_queue[priority].enqueue(cl, 0, item);
}
void enqueue_strict_front(K cl, unsigned priority, T item) override final {
high_queue[priority].enqueue_front(cl, 0, item);
}
void enqueue(K cl, unsigned priority, unsigned cost, T item) override final {
// priority is ignored
queue.add_request(item, cl, cost);
}
void enqueue_front(K cl,
unsigned priority,
unsigned cost,
T item) override final {
queue_front.emplace_front(std::pair<K,T>(cl, item));
}
bool empty() const override final {
return queue.empty() && high_queue.empty() && queue_front.empty();
}
T dequeue() override final {
assert(!empty());
if (!(high_queue.empty())) {
T ret = high_queue.rbegin()->second.front().second;
high_queue.rbegin()->second.pop_front();
if (high_queue.rbegin()->second.empty()) {
high_queue.erase(high_queue.rbegin()->first);
}
return ret;
}
if (!queue_front.empty()) {
T ret = queue_front.front().second;
queue_front.pop_front();
return ret;
}
auto pr = queue.pull_request();
assert(pr.is_retn());
auto& retn = pr.get_retn();
return *(retn.request);
}
void dump(ceph::Formatter *f) const override final {
f->open_array_section("high_queues");
for (typename SubQueues::const_iterator p = high_queue.begin();
p != high_queue.end();
++p) {
f->open_object_section("subqueue");
f->dump_int("priority", p->first);
p->second.dump(f);
f->close_section();
}
f->close_section();
f->open_object_section("queue_front");
f->dump_int("size", queue_front.size());
f->close_section();
f->open_object_section("queue");
f->dump_int("size", queue.request_count());
f->close_section();
} // dump
};
} // namespace ceph
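
A minimal usage sketch of the new queue, modeled on the unit test added later in this commit (toy Request/Client types; a constant ClientInfo for every client):

#include "common/mClockPriorityQueue.h"

struct Request { int value; };

struct Client {
  int id;
  friend bool operator<(const Client& a, const Client& b) { return a.id < b.id; }
  friend bool operator==(const Client& a, const Client& b) { return a.id == b.id; }
};

// same reservation/weight/limit for every client, as in the unit test
crimson::dmclock::ClientInfo info_func(const Client&) {
  static const crimson::dmclock::ClientInfo info(10.0, 10.0, 10.0);
  return info;
}

void example() {
  ceph::mClockQueue<Request, Client> q(&info_func);
  q.enqueue_strict(Client{1}, 2, Request{1});  // strict: priority honored
  q.enqueue(Client{2}, 0, 1, Request{2});      // normal: mClock-tagged; priority ignored
  while (!q.empty()) {
    Request r = q.dequeue();  // strict queue drains before the mClock queue
    (void)r.value;
  }
}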

@@ -29,6 +29,9 @@ set(osd_srcs
osd_types.cc
ECUtil.cc
ExtentCache.cc
mClockOpClassQueue.cc
mClockClientQueue.cc
PGQueueable.cc
${CMAKE_SOURCE_DIR}/src/common/TrackedOp.cc
${osd_cyg_functions_src}
${osdc_osd_srcs})
@@ -40,7 +43,7 @@ add_library(osd STATIC ${osd_srcs}
$<TARGET_OBJECTS:cls_references_objs>
$<TARGET_OBJECTS:global_common_objs>
$<TARGET_OBJECTS:heap_profiler_objs>)
target_link_libraries(osd ${LEVELDB_LIBRARIES} ${CMAKE_DL_LIBS} ${ALLOC_LIBS})
target_link_libraries(osd ${LEVELDB_LIBRARIES} dmclock ${CMAKE_DL_LIBS} ${ALLOC_LIBS})
if(WITH_LTTNG)
add_dependencies(osd osd-tp pg-tp)
endif()

@@ -166,22 +166,6 @@ static ostream& _prefix(std::ostream* _dout, int whoami, epoch_t epoch) {
return *_dout << "osd." << whoami << " " << epoch << " ";
}
void PGQueueable::RunVis::operator()(const OpRequestRef &op) {
return osd->dequeue_op(pg, op, handle);
}
void PGQueueable::RunVis::operator()(const PGSnapTrim &op) {
return pg->snap_trimmer(op.epoch_queued);
}
void PGQueueable::RunVis::operator()(const PGScrub &op) {
return pg->scrub(op.epoch_queued, handle);
}
void PGQueueable::RunVis::operator()(const PGRecovery &op) {
return osd->do_recovery(pg.get(), op.epoch_queued, op.reserved_pushes, handle);
}
//Initial features in new superblock.
//Features here are also automatically upgraded
CompatSet OSD::get_osd_initial_compat_set() {
@@ -9726,6 +9710,7 @@ void OSD::PeeringWQ::_dequeue(list<PG*> *out) {
in_use.insert(out->begin(), out->end());
}
// =============================================================
#undef dout_context
@@ -10118,3 +10103,21 @@ int heap(CephContext& cct, cmdmap_t& cmdmap, Formatter& f, std::ostream& os)
}} // namespace ceph::osd_cmds
std::ostream& operator<<(std::ostream& out, const OSD::io_queue& q) {
switch(q) {
case OSD::io_queue::prioritized:
out << "prioritized";
break;
case OSD::io_queue::weightedpriority:
out << "weightedpriority";
break;
case OSD::io_queue::mclock_opclass:
out << "mclock_opclass";
break;
case OSD::io_queue::mclock_client:
out << "mclock_client";
break;
}
return out;
}

@@ -40,6 +40,8 @@
#include "OpRequest.h"
#include "Session.h"
#include "osd/PGQueueable.h"
#include <atomic>
#include <map>
#include <memory>
@@ -53,6 +55,8 @@ using namespace std;
#include "common/sharedptr_registry.hpp"
#include "common/WeightedPriorityQueue.h"
#include "common/PrioritizedQueue.h"
#include "osd/mClockOpClassQueue.h"
#include "osd/mClockClientQueue.h"
#include "messages/MOSDOp.h"
#include "include/Spinlock.h"
#include "common/EventTrace.h"
@@ -337,123 +341,6 @@ typedef ceph::shared_ptr<DeletingState> DeletingStateRef;
class OSD;
struct PGScrub {
epoch_t epoch_queued;
explicit PGScrub(epoch_t e) : epoch_queued(e) {}
ostream &operator<<(ostream &rhs) {
return rhs << "PGScrub";
}
};
struct PGSnapTrim {
epoch_t epoch_queued;
explicit PGSnapTrim(epoch_t e) : epoch_queued(e) {}
ostream &operator<<(ostream &rhs) {
return rhs << "PGSnapTrim";
}
};
struct PGRecovery {
epoch_t epoch_queued;
uint64_t reserved_pushes;
PGRecovery(epoch_t e, uint64_t reserved_pushes)
: epoch_queued(e), reserved_pushes(reserved_pushes) {}
ostream &operator<<(ostream &rhs) {
return rhs << "PGRecovery(epoch=" << epoch_queued
<< ", reserved_pushes: " << reserved_pushes << ")";
}
};
class PGQueueable {
typedef boost::variant<
OpRequestRef,
PGSnapTrim,
PGScrub,
PGRecovery
> QVariant;
QVariant qvariant;
int cost;
unsigned priority;
utime_t start_time;
entity_inst_t owner;
epoch_t map_epoch; ///< an epoch we expect the PG to exist in
struct RunVis : public boost::static_visitor<> {
OSD *osd;
PGRef &pg;
ThreadPool::TPHandle &handle;
RunVis(OSD *osd, PGRef &pg, ThreadPool::TPHandle &handle)
: osd(osd), pg(pg), handle(handle) {}
void operator()(const OpRequestRef &op);
void operator()(const PGSnapTrim &op);
void operator()(const PGScrub &op);
void operator()(const PGRecovery &op);
};
struct StringifyVis : public boost::static_visitor<std::string> {
std::string operator()(const OpRequestRef &op) {
return stringify(op);
}
std::string operator()(const PGSnapTrim &op) {
return "PGSnapTrim";
}
std::string operator()(const PGScrub &op) {
return "PGScrub";
}
std::string operator()(const PGRecovery &op) {
return "PGRecovery";
}
};
friend ostream& operator<<(ostream& out, const PGQueueable& q) {
StringifyVis v;
return out << "PGQueueable(" << boost::apply_visitor(v, q.qvariant)
<< " prio " << q.priority << " cost " << q.cost
<< " e" << q.map_epoch << ")";
}
public:
// cppcheck-suppress noExplicitConstructor
PGQueueable(OpRequestRef op, epoch_t e)
: qvariant(op), cost(op->get_req()->get_cost()),
priority(op->get_req()->get_priority()),
start_time(op->get_req()->get_recv_stamp()),
owner(op->get_req()->get_source_inst()),
map_epoch(e)
{}
PGQueueable(
const PGSnapTrim &op, int cost, unsigned priority, utime_t start_time,
const entity_inst_t &owner, epoch_t e)
: qvariant(op), cost(cost), priority(priority), start_time(start_time),
owner(owner), map_epoch(e) {}
PGQueueable(
const PGScrub &op, int cost, unsigned priority, utime_t start_time,
const entity_inst_t &owner, epoch_t e)
: qvariant(op), cost(cost), priority(priority), start_time(start_time),
owner(owner), map_epoch(e) {}
PGQueueable(
const PGRecovery &op, int cost, unsigned priority, utime_t start_time,
const entity_inst_t &owner, epoch_t e)
: qvariant(op), cost(cost), priority(priority), start_time(start_time),
owner(owner), map_epoch(e) {}
const boost::optional<OpRequestRef> maybe_get_op() const {
const OpRequestRef *op = boost::get<OpRequestRef>(&qvariant);
return op ? OpRequestRef(*op) : boost::optional<OpRequestRef>();
}
uint64_t get_reserved_pushes() const {
const PGRecovery *op = boost::get<PGRecovery>(&qvariant);
return op ? op->reserved_pushes : 0;
}
void run(OSD *osd, PGRef &pg, ThreadPool::TPHandle &handle) {
RunVis v(osd, pg, handle);
boost::apply_visitor(v, qvariant);
}
unsigned get_priority() const { return priority; }
int get_cost() const { return cost; }
utime_t get_start_time() const { return start_time; }
entity_inst_t get_owner() const { return owner; }
epoch_t get_map_epoch() const { return map_epoch; }
};
class OSDService {
public:
OSD *osd;
@@ -1694,10 +1581,14 @@ private:
friend struct C_OpenPGs;
// -- op queue --
enum io_queue {
enum class io_queue {
prioritized,
weightedpriority
weightedpriority,
mclock_opclass,
mclock_client,
};
friend std::ostream& operator<<(std::ostream& out, const OSD::io_queue& q);
const io_queue op_queue;
const unsigned int op_prio_cutoff;
@@ -1722,6 +1613,7 @@ private:
* and already requeued the items.
*/
friend class PGQueueable;
class ShardedOpWQ
: public ShardedThreadPool::ShardedWQ<pair<spg_t,PGQueueable>>
{
@@ -1774,19 +1666,25 @@ private:
: sdata_lock(lock_name.c_str(), false, true, false, cct),
sdata_op_ordering_lock(ordering_lock.c_str(), false, true,
false, cct) {
if (opqueue == weightedpriority) {
if (opqueue == io_queue::weightedpriority) {
pqueue = std::unique_ptr
<WeightedPriorityQueue<pair<spg_t,PGQueueable>,entity_inst_t>>(
new WeightedPriorityQueue<pair<spg_t,PGQueueable>,entity_inst_t>(
max_tok_per_prio, min_cost));
} else if (opqueue == prioritized) {
} else if (opqueue == io_queue::prioritized) {
pqueue = std::unique_ptr
<PrioritizedQueue<pair<spg_t,PGQueueable>,entity_inst_t>>(
new PrioritizedQueue<pair<spg_t,PGQueueable>,entity_inst_t>(
max_tok_per_prio, min_cost));
} else if (opqueue == io_queue::mclock_opclass) {
pqueue = std::unique_ptr
<ceph::mClockOpClassQueue>(new ceph::mClockOpClassQueue(cct));
} else if (opqueue == io_queue::mclock_client) {
pqueue = std::unique_ptr
<ceph::mClockClientQueue>(new ceph::mClockClientQueue(cct));
}
}
};
}; // struct ShardData
vector<ShardData*> shard_list;
OSD *osd;
@@ -1977,7 +1875,7 @@ private:
OSDMapRef get_osdmap() {
return osdmap;
}
epoch_t get_osdmap_epoch() {
epoch_t get_osdmap_epoch() const {
return osdmap ? osdmap->get_epoch() : 0;
}
@@ -2368,7 +2266,7 @@ protected:
}
} remove_wq;
private:
private:
bool ms_can_fast_dispatch_any() const override { return true; }
bool ms_can_fast_dispatch(const Message *m) const override {
switch (m->get_type()) {
@@ -2414,12 +2312,21 @@ protected:
io_queue get_io_queue() const {
if (cct->_conf->osd_op_queue == "debug_random") {
static io_queue index_lookup[] = { io_queue::prioritized,
io_queue::weightedpriority,
io_queue::mclock_opclass,
io_queue::mclock_client };
srand(time(NULL));
return (rand() % 2 < 1) ? prioritized : weightedpriority;
unsigned which = rand() % (sizeof(index_lookup) / sizeof(index_lookup[0]));
return index_lookup[which];
} else if (cct->_conf->osd_op_queue == "wpq") {
return weightedpriority;
return io_queue::weightedpriority;
} else if (cct->_conf->osd_op_queue == "mclock_opclass") {
return io_queue::mclock_opclass;
} else if (cct->_conf->osd_op_queue == "mclock_client") {
return io_queue::mclock_client;
} else {
return prioritized;
return io_queue::prioritized;
}
}
@@ -2507,9 +2414,13 @@ public:
friend class OSDService;
};
std::ostream& operator<<(std::ostream& out, const OSD::io_queue& q);
//compatibility of the executable
extern const CompatSet::Feature ceph_osd_feature_compat[];
extern const CompatSet::Feature ceph_osd_feature_ro_compat[];
extern const CompatSet::Feature ceph_osd_feature_incompat[];
#endif
#endif // CEPH_OSD_H

src/osd/PGQueueable.cc (new file)
@@ -0,0 +1,35 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2016 Red Hat Inc.
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#include "PG.h"
#include "PGQueueable.h"
#include "OSD.h"
void PGQueueable::RunVis::operator()(const OpRequestRef &op) {
osd->dequeue_op(pg, op, handle);
}
void PGQueueable::RunVis::operator()(const PGSnapTrim &op) {
pg->snap_trimmer(op.epoch_queued);
}
void PGQueueable::RunVis::operator()(const PGScrub &op) {
pg->scrub(op.epoch_queued, handle);
}
void PGQueueable::RunVis::operator()(const PGRecovery &op) {
osd->do_recovery(pg.get(), op.epoch_queued, op.reserved_pushes, handle);
}

src/osd/PGQueueable.h (new file)
@@ -0,0 +1,148 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2016 Red Hat Inc.
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#pragma once
#include <ostream>
#include "include/types.h"
#include "include/utime.h"
#include "osd/OpRequest.h"
#include "osd/PG.h"
class OSD;
struct PGScrub {
epoch_t epoch_queued;
explicit PGScrub(epoch_t e) : epoch_queued(e) {}
ostream &operator<<(ostream &rhs) {
return rhs << "PGScrub";
}
};
struct PGSnapTrim {
epoch_t epoch_queued;
explicit PGSnapTrim(epoch_t e) : epoch_queued(e) {}
ostream &operator<<(ostream &rhs) {
return rhs << "PGSnapTrim";
}
};
struct PGRecovery {
epoch_t epoch_queued;
uint64_t reserved_pushes;
PGRecovery(epoch_t e, uint64_t reserved_pushes)
: epoch_queued(e), reserved_pushes(reserved_pushes) {}
ostream &operator<<(ostream &rhs) {
return rhs << "PGRecovery(epoch=" << epoch_queued
<< ", reserved_pushes: " << reserved_pushes << ")";
}
};
class PGQueueable {
typedef boost::variant<
OpRequestRef,
PGSnapTrim,
PGScrub,
PGRecovery
> QVariant;
QVariant qvariant;
int cost;
unsigned priority;
utime_t start_time;
entity_inst_t owner;
epoch_t map_epoch; ///< an epoch we expect the PG to exist in
struct RunVis : public boost::static_visitor<> {
OSD *osd;
PGRef &pg;
ThreadPool::TPHandle &handle;
RunVis(OSD *osd, PGRef &pg, ThreadPool::TPHandle &handle)
: osd(osd), pg(pg), handle(handle) {}
void operator()(const OpRequestRef &op);
void operator()(const PGSnapTrim &op);
void operator()(const PGScrub &op);
void operator()(const PGRecovery &op);
}; // struct RunVis
struct StringifyVis : public boost::static_visitor<std::string> {
std::string operator()(const OpRequestRef &op) {
return stringify(op);
}
std::string operator()(const PGSnapTrim &op) {
return "PGSnapTrim";
}
std::string operator()(const PGScrub &op) {
return "PGScrub";
}
std::string operator()(const PGRecovery &op) {
return "PGRecovery";
}
};
friend ostream& operator<<(ostream& out, const PGQueueable& q) {
StringifyVis v;
return out << "PGQueueable(" << boost::apply_visitor(v, q.qvariant)
<< " prio " << q.priority << " cost " << q.cost
<< " e" << q.map_epoch << ")";
}
public:
PGQueueable(OpRequestRef op, epoch_t e)
: qvariant(op), cost(op->get_req()->get_cost()),
priority(op->get_req()->get_priority()),
start_time(op->get_req()->get_recv_stamp()),
owner(op->get_req()->get_source_inst()),
map_epoch(e)
{}
PGQueueable(
const PGSnapTrim &op, int cost, unsigned priority, utime_t start_time,
const entity_inst_t &owner, epoch_t e)
: qvariant(op), cost(cost), priority(priority), start_time(start_time),
owner(owner), map_epoch(e) {}
PGQueueable(
const PGScrub &op, int cost, unsigned priority, utime_t start_time,
const entity_inst_t &owner, epoch_t e)
: qvariant(op), cost(cost), priority(priority), start_time(start_time),
owner(owner), map_epoch(e) {}
PGQueueable(
const PGRecovery &op, int cost, unsigned priority, utime_t start_time,
const entity_inst_t &owner, epoch_t e)
: qvariant(op), cost(cost), priority(priority), start_time(start_time),
owner(owner), map_epoch(e) {}
const boost::optional<OpRequestRef> maybe_get_op() const {
const OpRequestRef *op = boost::get<OpRequestRef>(&qvariant);
return op ? OpRequestRef(*op) : boost::optional<OpRequestRef>();
}
uint64_t get_reserved_pushes() const {
const PGRecovery *op = boost::get<PGRecovery>(&qvariant);
return op ? op->reserved_pushes : 0;
}
void run(OSD *osd, PGRef &pg, ThreadPool::TPHandle &handle) {
RunVis v(osd, pg, handle);
boost::apply_visitor(v, qvariant);
}
unsigned get_priority() const { return priority; }
int get_cost() const { return cost; }
utime_t get_start_time() const { return start_time; }
entity_inst_t get_owner() const { return owner; }
epoch_t get_map_epoch() const { return map_epoch; }
const QVariant& get_variant() const { return qvariant; }
}; // struct PGQueueable

src/osd/mClockClientQueue.cc (new file)
@@ -0,0 +1,165 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2016 Red Hat Inc.
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#include <memory>
#include "osd/mClockClientQueue.h"
#include "common/dout.h"
namespace dmc = crimson::dmclock;
#define dout_context cct
#define dout_subsys ceph_subsys_osd
#undef dout_prefix
#define dout_prefix *_dout
namespace ceph {
mClockClientQueue::mclock_op_tags_t::mclock_op_tags_t(CephContext *cct) :
client_op(cct->_conf->osd_op_queue_mclock_client_op_res,
cct->_conf->osd_op_queue_mclock_client_op_wgt,
cct->_conf->osd_op_queue_mclock_client_op_lim),
osd_subop(cct->_conf->osd_op_queue_mclock_osd_subop_res,
cct->_conf->osd_op_queue_mclock_osd_subop_wgt,
cct->_conf->osd_op_queue_mclock_osd_subop_lim),
snaptrim(cct->_conf->osd_op_queue_mclock_snap_res,
cct->_conf->osd_op_queue_mclock_snap_wgt,
cct->_conf->osd_op_queue_mclock_snap_lim),
recov(cct->_conf->osd_op_queue_mclock_recov_res,
cct->_conf->osd_op_queue_mclock_recov_wgt,
cct->_conf->osd_op_queue_mclock_recov_lim),
scrub(cct->_conf->osd_op_queue_mclock_scrub_res,
cct->_conf->osd_op_queue_mclock_scrub_wgt,
cct->_conf->osd_op_queue_mclock_scrub_lim)
{
dout(20) <<
"mClockClientQueue settings:: " <<
"client_op:" << client_op <<
"; osd_subop:" << osd_subop <<
"; snaptrim:" << snaptrim <<
"; recov:" << recov <<
"; scrub:" << scrub <<
dendl;
}
dmc::ClientInfo
mClockClientQueue::op_class_client_info_f(
const mClockClientQueue::InnerClient& client)
{
switch(client.second) {
case osd_op_type_t::client_op:
return mclock_op_tags->client_op;
case osd_op_type_t::osd_subop:
return mclock_op_tags->osd_subop;
case osd_op_type_t::bg_snaptrim:
return mclock_op_tags->snaptrim;
case osd_op_type_t::bg_recovery:
return mclock_op_tags->recov;
case osd_op_type_t::bg_scrub:
return mclock_op_tags->scrub;
default:
assert(0);
return dmc::ClientInfo(-1, -1, -1);
}
}
/*
* class mClockClientQueue
*/
std::unique_ptr<mClockClientQueue::mclock_op_tags_t>
mClockClientQueue::mclock_op_tags(nullptr);
mClockClientQueue::pg_queueable_visitor_t
mClockClientQueue::pg_queueable_visitor;
mClockClientQueue::mClockClientQueue(CephContext *cct) :
queue(&mClockClientQueue::op_class_client_info_f)
{
// manage the singleton
if (!mclock_op_tags) {
mclock_op_tags.reset(new mclock_op_tags_t(cct));
}
}
mClockClientQueue::osd_op_type_t
mClockClientQueue::get_osd_op_type(const Request& request) {
osd_op_type_t type =
boost::apply_visitor(pg_queueable_visitor, request.second.get_variant());
// if we got client_op back then we need to distinguish between
// a client op and an osd subop.
if (osd_op_type_t::client_op != type) {
return type;
} else if (MSG_OSD_SUBOP ==
boost::get<OpRequestRef>(
request.second.get_variant())->get_req()->get_header().type) {
return osd_op_type_t::osd_subop;
} else {
return osd_op_type_t::client_op;
}
}
mClockClientQueue::InnerClient
inline mClockClientQueue::get_inner_client(const Client& cl,
const Request& request) {
return InnerClient(cl, get_osd_op_type(request));
}
// Formatted output of the queue
inline void mClockClientQueue::dump(ceph::Formatter *f) const {
queue.dump(f);
}
inline void mClockClientQueue::enqueue_strict(Client cl,
unsigned priority,
Request item) {
queue.enqueue_strict(get_inner_client(cl, item), priority, item);
}
// Enqueue op in the front of the strict queue
inline void mClockClientQueue::enqueue_strict_front(Client cl,
unsigned priority,
Request item) {
queue.enqueue_strict_front(get_inner_client(cl, item), priority, item);
}
// Enqueue op in the back of the regular queue
inline void mClockClientQueue::enqueue(Client cl,
unsigned priority,
unsigned cost,
Request item) {
queue.enqueue(get_inner_client(cl, item), priority, cost, item);
}
// Enqueue the op in the front of the regular queue
inline void mClockClientQueue::enqueue_front(Client cl,
unsigned priority,
unsigned cost,
Request item) {
queue.enqueue_front(get_inner_client(cl, item), priority, cost, item);
}
// Return an op to be dispatched
inline Request mClockClientQueue::dequeue() {
return queue.dequeue();
}
} // namespace ceph

src/osd/mClockClientQueue.h (new file)
@@ -0,0 +1,146 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2016 Red Hat Inc.
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#pragma once
#include <ostream>
#include "boost/variant.hpp"
#include "common/config.h"
#include "common/ceph_context.h"
#include "osd/PGQueueable.h"
#include "common/mClockPriorityQueue.h"
namespace ceph {
using Request = std::pair<spg_t, PGQueueable>;
using Client = entity_inst_t;
// This class exists to bridge the ceph code, which treats the class
// as the client, and the queue, where the class is
// osd_op_type_t. So this adapter class will transform calls
// appropriately.
class mClockClientQueue : public OpQueue<Request, Client> {
enum class osd_op_type_t {
client_op, osd_subop, bg_snaptrim, bg_recovery, bg_scrub };
using InnerClient = std::pair<entity_inst_t,osd_op_type_t>;
using queue_t = mClockQueue<Request, InnerClient>;
queue_t queue;
struct mclock_op_tags_t {
crimson::dmclock::ClientInfo client_op;
crimson::dmclock::ClientInfo osd_subop;
crimson::dmclock::ClientInfo snaptrim;
crimson::dmclock::ClientInfo recov;
crimson::dmclock::ClientInfo scrub;
mclock_op_tags_t(CephContext *cct);
};
static std::unique_ptr<mclock_op_tags_t> mclock_op_tags;
public:
mClockClientQueue(CephContext *cct);
static crimson::dmclock::ClientInfo
op_class_client_info_f(const InnerClient& client);
inline unsigned length() const override final {
return queue.length();
}
// Ops from this client should be deleted immediately
inline void remove_by_class(Client cl,
std::list<Request> *out) override final {
queue.remove_by_filter(
[&cl, out] (const Request& r) -> bool {
if (cl == r.second.get_owner()) {
if (out) out->push_front(r);
return true;
} else {
return false;
}
});
}
void enqueue_strict(Client cl,
unsigned priority,
Request item) override final;
// Enqueue op in the front of the strict queue
void enqueue_strict_front(Client cl,
unsigned priority,
Request item) override final;
// Enqueue op in the back of the regular queue
void enqueue(Client cl,
unsigned priority,
unsigned cost,
Request item) override final;
// Enqueue the op in the front of the regular queue
void enqueue_front(Client cl,
unsigned priority,
unsigned cost,
Request item) override final;
// Return an op to be dispatched
Request dequeue() override final;
// Returns whether the queue is empty
inline bool empty() const override final {
return queue.empty();
}
// Formatted output of the queue
void dump(ceph::Formatter *f) const override final;
protected:
struct pg_queueable_visitor_t : public boost::static_visitor<osd_op_type_t> {
osd_op_type_t operator()(const OpRequestRef& o) const {
// can't tell a client_op from an osd_subop here;
// get_osd_op_type disambiguates via the message type
return osd_op_type_t::client_op;
}
osd_op_type_t operator()(const PGSnapTrim& o) const {
return osd_op_type_t::bg_snaptrim;
}
osd_op_type_t operator()(const PGScrub& o) const {
return osd_op_type_t::bg_scrub;
}
osd_op_type_t operator()(const PGRecovery& o) const {
return osd_op_type_t::bg_recovery;
}
}; // class pg_queueable_visitor_t
static pg_queueable_visitor_t pg_queueable_visitor;
osd_op_type_t get_osd_op_type(const Request& request);
InnerClient get_inner_client(const Client& cl, const Request& request);
}; // class mClockClientQueue
} // namespace ceph
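
The substantive difference from mClockOpClassQueue (below) is only the scheduling key; a toy sketch of why folding the client into the key matters (illustrative types, not the real classes):

#include <utility>

// mClockOpClassQueue keys dmClock state by op type alone, so all clients'
// client_ops share one tag stream. mClockClientQueue keys it by
// (client, op type), so each client gets its own stream and the client_op
// share is divided evenly across clients.
enum class op_type_sketch { client_op, osd_subop, bg_snaptrim, bg_recovery, bg_scrub };
struct client_sketch { int id; };

using opclass_key = op_type_sketch;                           // mclock_opclass
using client_key = std::pair<client_sketch, op_type_sketch>;  // mclock_client

// In both cases only the op type selects the QoS triple; compare
// op_class_client_info_f above, which switches on client.second.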

src/osd/mClockOpClassQueue.cc (new file)
@@ -0,0 +1,123 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2016 Red Hat Inc.
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#include <memory>
#include "osd/mClockOpClassQueue.h"
#include "common/dout.h"
namespace dmc = crimson::dmclock;
#define dout_context cct
#define dout_subsys ceph_subsys_osd
#undef dout_prefix
#define dout_prefix *_dout
namespace ceph {
mClockOpClassQueue::mclock_op_tags_t::mclock_op_tags_t(CephContext *cct) :
client_op(cct->_conf->osd_op_queue_mclock_client_op_res,
cct->_conf->osd_op_queue_mclock_client_op_wgt,
cct->_conf->osd_op_queue_mclock_client_op_lim),
osd_subop(cct->_conf->osd_op_queue_mclock_osd_subop_res,
cct->_conf->osd_op_queue_mclock_osd_subop_wgt,
cct->_conf->osd_op_queue_mclock_osd_subop_lim),
snaptrim(cct->_conf->osd_op_queue_mclock_snap_res,
cct->_conf->osd_op_queue_mclock_snap_wgt,
cct->_conf->osd_op_queue_mclock_snap_lim),
recov(cct->_conf->osd_op_queue_mclock_recov_res,
cct->_conf->osd_op_queue_mclock_recov_wgt,
cct->_conf->osd_op_queue_mclock_recov_lim),
scrub(cct->_conf->osd_op_queue_mclock_scrub_res,
cct->_conf->osd_op_queue_mclock_scrub_wgt,
cct->_conf->osd_op_queue_mclock_scrub_lim)
{
dout(20) <<
"mClockOpClassQueue settings:: " <<
"client_op:" << client_op <<
"; osd_subop:" << osd_subop <<
"; snaptrim:" << snaptrim <<
"; recov:" << recov <<
"; scrub:" << scrub <<
dendl;
}
dmc::ClientInfo
mClockOpClassQueue::op_class_client_info_f(const osd_op_type_t& op_type) {
switch(op_type) {
case osd_op_type_t::client_op:
return mclock_op_tags->client_op;
case osd_op_type_t::osd_subop:
return mclock_op_tags->osd_subop;
case osd_op_type_t::bg_snaptrim:
return mclock_op_tags->snaptrim;
case osd_op_type_t::bg_recovery:
return mclock_op_tags->recov;
case osd_op_type_t::bg_scrub:
return mclock_op_tags->scrub;
default:
assert(0);
return dmc::ClientInfo(-1, -1, -1);
}
}
/*
* class mClockOpClassQueue
*/
std::unique_ptr<mClockOpClassQueue::mclock_op_tags_t>
mClockOpClassQueue::mclock_op_tags(nullptr);
mClockOpClassQueue::pg_queueable_visitor_t
mClockOpClassQueue::pg_queueable_visitor;
mClockOpClassQueue::mClockOpClassQueue(CephContext *cct) :
queue(&mClockOpClassQueue::op_class_client_info_f)
{
// manage the singleton
if (!mclock_op_tags) {
mclock_op_tags.reset(new mclock_op_tags_t(cct));
}
}
mClockOpClassQueue::osd_op_type_t
mClockOpClassQueue::get_osd_op_type(const Request& request) {
osd_op_type_t type =
boost::apply_visitor(pg_queueable_visitor, request.second.get_variant());
// if we got client_op back then we need to distinguish between
// a client op and an osd subop.
if (osd_op_type_t::client_op != type) {
return type;
} else if (MSG_OSD_SUBOP ==
boost::get<OpRequestRef>(
request.second.get_variant())->get_req()->get_header().type) {
return osd_op_type_t::osd_subop;
} else {
return osd_op_type_t::client_op;
}
}
// Formatted output of the queue
void mClockOpClassQueue::dump(ceph::Formatter *f) const {
queue.dump(f);
}
} // namespace ceph

src/osd/mClockOpClassQueue.h (new file)
@@ -0,0 +1,153 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2016 Red Hat Inc.
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#pragma once
#include <ostream>
#include "boost/variant.hpp"
#include "common/config.h"
#include "common/ceph_context.h"
#include "osd/PGQueueable.h"
#include "common/mClockPriorityQueue.h"
namespace ceph {
using Request = std::pair<spg_t, PGQueueable>;
using Client = entity_inst_t;
// This class exists to bridge the ceph code, which treats the class
// as the client, and the queue, where the class is
// osd_op_type_t. So this adapter class will transform calls
// appropriately.
class mClockOpClassQueue : public OpQueue<Request, Client> {
enum class osd_op_type_t {
client_op, osd_subop, bg_snaptrim, bg_recovery, bg_scrub };
using queue_t = mClockQueue<Request, osd_op_type_t>;
queue_t queue;
struct mclock_op_tags_t {
crimson::dmclock::ClientInfo client_op;
crimson::dmclock::ClientInfo osd_subop;
crimson::dmclock::ClientInfo snaptrim;
crimson::dmclock::ClientInfo recov;
crimson::dmclock::ClientInfo scrub;
mclock_op_tags_t(CephContext *cct);
};
static std::unique_ptr<mclock_op_tags_t> mclock_op_tags;
public:
mClockOpClassQueue(CephContext *cct);
static crimson::dmclock::ClientInfo
op_class_client_info_f(const osd_op_type_t& op_type);
inline unsigned length() const override final {
return queue.length();
}
// Ops from this client should be deleted immediately
inline void remove_by_class(Client cl,
std::list<Request> *out) override final {
queue.remove_by_filter(
[&cl, out] (const Request& r) -> bool {
if (cl == r.second.get_owner()) {
if (out) out->push_front(r);
return true;
} else {
return false;
}
});
}
inline void enqueue_strict(Client cl,
unsigned priority,
Request item) override final {
queue.enqueue_strict(get_osd_op_type(item), priority, item);
}
// Enqueue op in the front of the strict queue
inline void enqueue_strict_front(Client cl,
unsigned priority,
Request item) override final {
queue.enqueue_strict_front(get_osd_op_type(item), priority, item);
}
// Enqueue op in the back of the regular queue
inline void enqueue(Client cl,
unsigned priority,
unsigned cost,
Request item) override final {
queue.enqueue(get_osd_op_type(item), priority, cost, item);
}
// Enqueue the op in the front of the regular queue
inline void enqueue_front(Client cl,
unsigned priority,
unsigned cost,
Request item) override final {
queue.enqueue_front(get_osd_op_type(item), priority, cost, item);
}
// Returns whether the queue is empty
inline bool empty() const override final {
return queue.empty();
}
// Return an op to be dispatched
inline Request dequeue() override final {
return queue.dequeue();
}
// Formatted output of the queue
void dump(ceph::Formatter *f) const override final;
protected:
struct pg_queueable_visitor_t : public boost::static_visitor<osd_op_type_t> {
osd_op_type_t operator()(const OpRequestRef& o) const {
// can't tell a client_op from an osd_subop here;
// get_osd_op_type disambiguates via the message type
return osd_op_type_t::client_op;
}
osd_op_type_t operator()(const PGSnapTrim& o) const {
return osd_op_type_t::bg_snaptrim;
}
osd_op_type_t operator()(const PGScrub& o) const {
return osd_op_type_t::bg_scrub;
}
osd_op_type_t operator()(const PGRecovery& o) const {
return osd_op_type_t::bg_recovery;
}
}; // class pg_queueable_visitor_t
static pg_queueable_visitor_t pg_queueable_visitor;
osd_op_type_t get_osd_op_type(const Request& request);
}; // class mClockOpClassQueue
} // namespace ceph

@@ -42,6 +42,13 @@ add_executable(unittest_prioritized_queue
add_ceph_unittest(unittest_prioritized_queue ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/unittest_prioritized_queue)
target_link_libraries(unittest_prioritized_queue global ${BLKID_LIBRARIES})
# unittest_mclock_priority_queue
add_executable(unittest_mclock_priority_queue EXCLUDE_FROM_ALL
test_mclock_priority_queue.cc
)
add_ceph_unittest(unittest_mclock_priority_queue ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/unittest_mclock_priority_queue)
target_link_libraries(unittest_mclock_priority_queue global ${BLKID_LIBRARIES} dmclock)
# unittest_str_map
add_executable(unittest_str_map
test_str_map.cc

src/test/common/test_mclock_priority_queue.cc (new file)
@@ -0,0 +1,318 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2017 Red Hat Inc.
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#include <thread>
#include <chrono>
#include <iostream>
#include "gtest/gtest.h"
#include "common/mClockPriorityQueue.h"
struct Request {
int value;
Request() = default;
Request(const Request& o) = default;
Request(int value) :
value(value)
{}
};
struct Client {
int client_num;
Client() :
Client(-1)
{}
Client(int client_num) :
client_num(client_num)
{}
friend bool operator<(const Client& r1, const Client& r2) {
return r1.client_num < r2.client_num;
}
friend bool operator==(const Client& r1, const Client& r2) {
return r1.client_num == r2.client_num;
}
};
crimson::dmclock::ClientInfo client_info_func(const Client& c) {
static const crimson::dmclock::ClientInfo
the_info(10.0, 10.0, 10.0);
return the_info;
}
TEST(mClockPriorityQueue, Create)
{
ceph::mClockQueue<Request,Client> q(&client_info_func);
}
TEST(mClockPriorityQueue, Sizes)
{
ceph::mClockQueue<Request,Client> q(&client_info_func);
ASSERT_TRUE(q.empty());
ASSERT_EQ(0u, q.length());
Client c1(1);
Client c2(2);
q.enqueue_strict(c1, 1, Request(1));
q.enqueue_strict(c2, 2, Request(2));
q.enqueue_strict(c1, 2, Request(3));
q.enqueue(c2, 1, 0, Request(4));
q.enqueue(c1, 2, 0, Request(5));
q.enqueue_strict(c2, 1, Request(6));
ASSERT_FALSE(q.empty());
ASSERT_EQ(6u, q.length());
for (int i = 0; i < 6; ++i) {
(void) q.dequeue();
}
ASSERT_TRUE(q.empty());
ASSERT_EQ(0u, q.length());
}
TEST(mClockPriorityQueue, JustStrict)
{
ceph::mClockQueue<Request,Client> q(&client_info_func);
Client c1(1);
Client c2(2);
q.enqueue_strict(c1, 1, Request(1));
q.enqueue_strict(c2, 2, Request(2));
q.enqueue_strict(c1, 2, Request(3));
q.enqueue_strict(c2, 1, Request(4));
Request r;
r = q.dequeue();
ASSERT_EQ(2, r.value);
r = q.dequeue();
ASSERT_EQ(3, r.value);
r = q.dequeue();
ASSERT_EQ(1, r.value);
r = q.dequeue();
ASSERT_EQ(4, r.value);
}
TEST(mClockPriorityQueue, StrictPriorities)
{
ceph::mClockQueue<Request,Client> q(&client_info_func);
Client c1(1);
Client c2(2);
q.enqueue_strict(c1, 1, Request(1));
q.enqueue_strict(c2, 2, Request(2));
q.enqueue_strict(c1, 3, Request(3));
q.enqueue_strict(c2, 4, Request(4));
Request r;
r = q.dequeue();
ASSERT_EQ(4, r.value);
r = q.dequeue();
ASSERT_EQ(3, r.value);
r = q.dequeue();
ASSERT_EQ(2, r.value);
r = q.dequeue();
ASSERT_EQ(1, r.value);
}
TEST(mClockPriorityQueue, JustNotStrict)
{
ceph::mClockQueue<Request,Client> q(&client_info_func);
Client c1(1);
Client c2(2);
// non-strict queue ignores priorities, but will divide between
// clients evenly and maintain order within each client
q.enqueue(c1, 1, 0, Request(1));
q.enqueue(c1, 2, 0, Request(2));
q.enqueue(c2, 3, 0, Request(3));
q.enqueue(c2, 4, 0, Request(4));
Request r1, r2;
r1 = q.dequeue();
ASSERT_TRUE(1 == r1.value || 3 == r1.value);
r2 = q.dequeue();
ASSERT_TRUE(1 == r2.value || 3 == r2.value);
ASSERT_NE(r1.value, r2.value);
r1 = q.dequeue();
ASSERT_TRUE(2 == r1.value || 4 == r1.value);
r2 = q.dequeue();
ASSERT_TRUE(2 == r2.value || 4 == r2.value);
ASSERT_NE(r1.value, r2.value);
}
TEST(mClockPriorityQueue, EnqueueFront)
{
ceph::mClockQueue<Request,Client> q(&client_info_func);
Client c1(1);
Client c2(2);
// non-strict queue ignores priorities, but will divide between
// clients evenly and maintain order within each client
q.enqueue(c1, 1, 0, Request(1));
q.enqueue(c1, 2, 0, Request(2));
q.enqueue(c2, 3, 0, Request(3));
q.enqueue(c2, 4, 0, Request(4));
q.enqueue_strict(c2, 6, Request(6));
q.enqueue_strict(c1, 7, Request(7));
std::list<Request> reqs;
for (uint i = 0; i < 4; ++i) {
reqs.emplace_back(q.dequeue());
}
for (uint i = 0; i < 4; ++i) {
Request& r = reqs.front();
if (r.value > 5) {
q.enqueue_strict_front(r.value == 6 ? c2 : c1, r.value, r);
} else {
q.enqueue_front(r.value <= 2 ? c1 : c2, r.value, 0, r);
}
reqs.pop_front();
}
Request r;
r = q.dequeue();
ASSERT_EQ(7, r.value);
r = q.dequeue();
ASSERT_EQ(6, r.value);
r = q.dequeue();
ASSERT_TRUE(1 == r.value || 3 == r.value);
r = q.dequeue();
ASSERT_TRUE(1 == r.value || 3 == r.value);
r = q.dequeue();
ASSERT_TRUE(2 == r.value || 4 == r.value);
r = q.dequeue();
ASSERT_TRUE(2 == r.value || 4 == r.value);
}
TEST(mClockPriorityQueue, RemoveByClass)
{
ceph::mClockQueue<Request,Client> q(&client_info_func);
Client c1(1);
Client c2(2);
Client c3(3);
q.enqueue(c1, 1, 0, Request(1));
q.enqueue(c2, 1, 0, Request(2));
q.enqueue(c3, 1, 0, Request(4));
q.enqueue_strict(c1, 2, Request(8));
q.enqueue_strict(c2, 1, Request(16));
q.enqueue_strict(c3, 3, Request(32));
q.enqueue(c3, 1, 0, Request(64));
q.enqueue(c2, 1, 0, Request(128));
q.enqueue(c1, 1, 0, Request(256));
int out_mask = 2 | 16 | 128;
int in_mask = 1 | 8 | 256;
std::list<Request> out;
q.remove_by_class(c2, &out);
ASSERT_EQ(3u, out.size());
while (!out.empty()) {
ASSERT_TRUE((out.front().value & out_mask) > 0) <<
"had value that was not expected after first removal";
out.pop_front();
}
ASSERT_EQ(6u, q.length()) << "after removal of three from client c2";
q.remove_by_class(c3);
ASSERT_EQ(3u, q.length()) << "after removal of three from client c3";
while (!q.empty()) {
Request r = q.dequeue();
ASSERT_TRUE((r.value & in_mask) > 0) <<
"had value that was not expected after two removals";
}
}
TEST(mClockPriorityQueue, RemoveByFilter)
{
ceph::mClockQueue<Request,Client> q(&client_info_func);
Client c1(1);
Client c2(2);
Client c3(3);
q.enqueue(c1, 1, 0, Request(1));
q.enqueue(c2, 1, 0, Request(2));
q.enqueue(c3, 1, 0, Request(3));
q.enqueue_strict(c1, 2, Request(4));
q.enqueue_strict(c2, 1, Request(5));
q.enqueue_strict(c3, 3, Request(6));
q.enqueue(c3, 1, 0, Request(7));
q.enqueue(c2, 1, 0, Request(8));
q.enqueue(c1, 1, 0, Request(9));
std::list<Request> filtered;
q.remove_by_filter([&](const Request& r) -> bool {
if (r.value & 2) {
filtered.push_back(r);
return true;
} else {
return false;
}
});
ASSERT_EQ(4u, filtered.size()) <<
"filter should have removed four elements";
while (!filtered.empty()) {
ASSERT_TRUE((filtered.front().value & 2) > 0) <<
"expect this value to have been filtered out";
filtered.pop_front();
}
ASSERT_EQ(5u, q.length()) <<
"filter should have left five remaining elements";
while (!q.empty()) {
Request r = q.dequeue();
ASSERT_TRUE((r.value & 2) == 0) <<
"expect this value to have been left in";
}
}

@@ -161,60 +161,6 @@ TEST_F(PrioritizedQueueTest, fairness_by_class) {
}
}
template <typename T>
struct Greater {
const T rhs;
std::list<T> *removed;
explicit Greater(const T& v, std::list<T> *removed) : rhs(v), removed(removed)
{}
bool operator()(const T& lhs) {
if (lhs > rhs) {
if (removed)
removed->push_back(lhs);
return true;
} else {
return false;
}
}
};
TEST_F(PrioritizedQueueTest, remove_by_filter) {
const unsigned min_cost = 1;
const unsigned max_tokens_per_subqueue = 50;
PQ pq(max_tokens_per_subqueue, min_cost);
Greater<Item> pred(item_size/2, nullptr);
unsigned num_to_remove = 0;
for (unsigned i = 0; i < item_size; i++) {
const Item& item = items[i];
pq.enqueue(Klass(1), 0, 10, item);
if (pred(item)) {
num_to_remove++;
}
}
std::list<Item> removed;
Greater<Item> pred2(item_size/2, &removed);
pq.remove_by_filter(pred2);
// see if the removed items are expected ones.
for (std::list<Item>::iterator it = removed.begin();
it != removed.end();
++it) {
const Item& item = *it;
EXPECT_TRUE(pred(item));
items.erase(remove(items.begin(), items.end(), item), items.end());
}
EXPECT_EQ(num_to_remove, removed.size());
EXPECT_EQ(item_size - num_to_remove, pq.length());
EXPECT_EQ(item_size - num_to_remove, items.size());
// see if the remainder are expected also.
while (!pq.empty()) {
const Item item = pq.dequeue();
EXPECT_FALSE(pred(item));
items.erase(remove(items.begin(), items.end(), item), items.end());
}
EXPECT_TRUE(items.empty());
}
TEST_F(PrioritizedQueueTest, remove_by_class) {
const unsigned min_cost = 1;

@@ -197,86 +197,6 @@ TEST_F(WeightedPriorityQueueTest, wpq_test_random) {
test_queue(rand() % 500 + 500, true);
}
template <typename T>
struct Greater {
const T rhs;
std::list<T> *removed;
Greater(const T &v, std::list<T> *removed) : rhs(v), removed(removed) {}
bool operator()(const T &lhs) {
if (std::get<2>(lhs) > std::get<2>(rhs)) {
if (removed)
removed->push_back(lhs);
return true;
} else {
return false;
}
}
};
TEST_F(WeightedPriorityQueueTest, wpq_test_remove_by_filter_null) {
WQ wq(0, 0);
LQ strictq, normq;
unsigned num_items = 100;
fill_queue(wq, strictq, normq, num_items);
// Pick a value that we didn't enqueue
Removed wq_removed;
Greater<Item> pred(std::make_tuple(0, 0, 1 << 17), &wq_removed);
wq.remove_by_filter(pred);
EXPECT_EQ(0u, wq_removed.size());
}
TEST_F(WeightedPriorityQueueTest, wpq_test_remove_by_filter) {
WQ wq(0, 0);
LQ strictq, normq;
unsigned num_items = 1000;
fill_queue(wq, strictq, normq, num_items);
Greater<Item> pred2(std::make_tuple(0, 0, (1 << 16) - (1 << 16)/10), nullptr);
Removed r_strictq, r_normq;
unsigned num_to_remove = 0;
// Figure out from what has been queued what we
// expect to be removed
for (LQ::iterator pi = strictq.begin();
pi != strictq.end(); ++pi) {
for (KlassItem::iterator ki = pi->second.begin();
ki != pi->second.end(); ++ki) {
for (ItemList::iterator li = ki->second.begin();
li != ki->second.end(); ++li) {
if (pred2(li->second)) {
++num_to_remove;
}
}
}
}
for (LQ::iterator pi = normq.begin();
pi != normq.end(); ++pi) {
for (KlassItem::iterator ki = pi->second.begin();
ki != pi->second.end(); ++ki) {
for (ItemList::iterator li = ki->second.begin();
li != ki->second.end(); ++li) {
if (pred2(li->second)) {
++num_to_remove;
}
}
}
}
Removed wq_removed;
Greater<Item> pred(
std::make_tuple(0, 0, (1 << 16) - (1 << 16)/10),
&wq_removed);
wq.remove_by_filter(pred);
// Check that what was removed was correct
for (Removed::iterator it = wq_removed.begin();
it != wq_removed.end(); ++it) {
EXPECT_TRUE(pred2(*it));
}
EXPECT_EQ(num_to_remove, wq_removed.size());
EXPECT_EQ(num_items - num_to_remove, wq.length());
// Make sure that none were missed
while (!(wq.empty())) {
EXPECT_FALSE(pred(wq.dequeue()));
}
}
TEST_F(WeightedPriorityQueueTest, wpq_test_remove_by_class_null) {
WQ wq(0, 0);
LQ strictq, normq;

@@ -106,3 +106,25 @@ add_executable(unittest_ec_transaction
)
add_ceph_unittest(unittest_ec_transaction ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/unittest_ec_transaction)
target_link_libraries(unittest_ec_transaction osd global ${BLKID_LIBRARIES})
# unittest_mclock_op_class_queue
add_executable(unittest_mclock_op_class_queue
TestMClockOpClassQueue.cc
)
add_ceph_unittest(unittest_mclock_op_class_queue
${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/unittest_mclock_op_class_queue
)
target_link_libraries(unittest_mclock_op_class_queue
global osd dmclock
)
# unittest_mclock_client_queue
add_executable(unittest_mclock_client_queue
TestMClockClientQueue.cc
)
add_ceph_unittest(unittest_mclock_client_queue
${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/unittest_mclock_client_queue
)
target_link_libraries(unittest_mclock_client_queue
global osd dmclock
)

src/test/osd/TestMClockClientQueue.cc (new file)
@@ -0,0 +1,187 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
#include <iostream>
#include "gtest/gtest.h"
#include "global/global_init.h"
#include "common/common_init.h"
#include "osd/mClockClientQueue.h"
int main(int argc, char **argv) {
std::vector<const char*> args(argv, argv+argc);
auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_OSD,
CODE_ENVIRONMENT_UTILITY,
CINIT_FLAG_NO_DEFAULT_CONFIG_FILE);
common_init_finish(g_ceph_context);
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
class MClockClientQueueTest : public testing::Test {
public:
mClockClientQueue q;
entity_inst_t client1;
entity_inst_t client2;
entity_inst_t client3;
MClockClientQueueTest() :
q(g_ceph_context),
client1(entity_name_t(CEPH_ENTITY_TYPE_OSD, 1), entity_addr_t()),
client2(entity_name_t(CEPH_ENTITY_TYPE_OSD, 2), entity_addr_t()),
client3(entity_name_t(CEPH_ENTITY_TYPE_CLIENT, 1), entity_addr_t())
{}
#if 0 // more work needed here
Request create_client_op(epoch_t e, const entity_inst_t& owner) {
return Request(spg_t(), PGQueueable(OpRequestRef(), e));
}
#endif
Request create_snaptrim(epoch_t e, const entity_inst_t& owner) {
return Request(spg_t(),
PGQueueable(PGSnapTrim(e),
12, 12,
utime_t(), owner, e));
}
Request create_scrub(epoch_t e, const entity_inst_t& owner) {
return Request(spg_t(),
PGQueueable(PGScrub(e),
12, 12,
utime_t(), owner, e));
}
Request create_recovery(epoch_t e, const entity_inst_t& owner) {
return Request(spg_t(),
PGQueueable(PGRecovery(e, 64),
12, 12,
utime_t(), owner, e));
}
};
TEST_F(MClockClientQueueTest, TestSize) {
ASSERT_TRUE(q.empty());
ASSERT_EQ(0u, q.length());
q.enqueue(client1, 12, 0, create_snaptrim(100, client1));
q.enqueue_strict(client2, 12, create_snaptrim(101, client2));
q.enqueue(client2, 12, 0, create_snaptrim(102, client2));
q.enqueue_strict(client3, 12, create_snaptrim(103, client3));
q.enqueue(client1, 12, 0, create_snaptrim(104, client1));
ASSERT_FALSE(q.empty());
ASSERT_EQ(5u, q.length());
std::list<Request> reqs;
reqs.push_back(q.dequeue());
reqs.push_back(q.dequeue());
reqs.push_back(q.dequeue());
ASSERT_FALSE(q.empty());
ASSERT_EQ(2u, q.length());
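// Re-insert the three dequeued requests at the front, exercising both the
// normal and strict front-insertion paths.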
q.enqueue_front(client2, 12, 0, reqs.back());
reqs.pop_back();
q.enqueue_strict_front(client3, 12, reqs.back());
reqs.pop_back();
q.enqueue_strict_front(client2, 12, reqs.back());
reqs.pop_back();
ASSERT_FALSE(q.empty());
ASSERT_EQ(5u, q.length());
for (int i = 0; i < 5; ++i) {
(void) q.dequeue();
}
ASSERT_TRUE(q.empty());
ASSERT_EQ(0u, q.length());
}
TEST_F(MClockClientQueueTest, TestEnqueue) {
q.enqueue(client1, 12, 0, create_snaptrim(100, client1));
q.enqueue(client2, 12, 0, create_snaptrim(101, client2));
q.enqueue(client2, 12, 0, create_snaptrim(102, client2));
q.enqueue(client3, 12, 0, create_snaptrim(103, client3));
q.enqueue(client1, 12, 0, create_snaptrim(104, client1));
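// With per-client fairness, the first dequeues should interleave the three
// clients (100 from client1, 101 from client2, 103 from client3); the two
// leftovers from client1 and client2 (104 and 102) may drain in either order.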
Request r = q.dequeue();
ASSERT_EQ(100u, r.second.get_map_epoch());
r = q.dequeue();
ASSERT_EQ(101u, r.second.get_map_epoch());
r = q.dequeue();
ASSERT_EQ(103u, r.second.get_map_epoch());
r = q.dequeue();
ASSERT_TRUE(r.second.get_map_epoch() == 102u ||
r.second.get_map_epoch() == 104u);
r = q.dequeue();
ASSERT_TRUE(r.second.get_map_epoch() == 102u ||
r.second.get_map_epoch() == 104u);
}
TEST_F(MClockClientQueueTest, TestEnqueueStrict) {
q.enqueue_strict(client1, 12, create_snaptrim(100, client1));
q.enqueue_strict(client2, 13, create_snaptrim(101, client2));
q.enqueue_strict(client2, 16, create_snaptrim(102, client2));
q.enqueue_strict(client3, 14, create_snaptrim(103, client3));
q.enqueue_strict(client1, 15, create_snaptrim(104, client1));
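// Strictly queued items drain in descending priority order (16, 15, 14,
// 13, 12) regardless of which client submitted them.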
Request r = q.dequeue();
ASSERT_EQ(102u, r.second.get_map_epoch());
r = q.dequeue();
ASSERT_EQ(104u, r.second.get_map_epoch());
r = q.dequeue();
ASSERT_EQ(103u, r.second.get_map_epoch());
r = q.dequeue();
ASSERT_EQ(101u, r.second.get_map_epoch());
r = q.dequeue();
ASSERT_EQ(100u, r.second.get_map_epoch());
}
TEST_F(MClockClientQueueTest, TestRemoveByClass) {
q.enqueue(client1, 12, 0, create_snaptrim(100, client1));
q.enqueue_strict(client2, 12, create_snaptrim(101, client2));
q.enqueue(client2, 12, 0, create_snaptrim(102, client2));
q.enqueue_strict(client3, 12, create_snaptrim(103, client3));
q.enqueue(client1, 12, 0, create_snaptrim(104, client1));
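// remove_by_class(client2) should pull client2's requests from both the
// strict queue (epoch 101) and the normal queue (epoch 102), leaving the
// other clients' three requests in place.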
std::list<Request> filtered_out;
q.remove_by_class(client2, &filtered_out);
ASSERT_EQ(2u, filtered_out.size());
while (!filtered_out.empty()) {
auto e = filtered_out.front().second.get_map_epoch();
ASSERT_TRUE(e == 101u || e == 102u);
filtered_out.pop_front();
}
ASSERT_EQ(3u, q.length());
Request r = q.dequeue();
ASSERT_EQ(103u, r.second.get_map_epoch());
r = q.dequeue();
ASSERT_EQ(100u, r.second.get_map_epoch());
r = q.dequeue();
ASSERT_EQ(104u, r.second.get_map_epoch());
}


@ -0,0 +1,187 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
#include <iostream>
#include "gtest/gtest.h"
#include "global/global_context.h"
#include "global/global_init.h"
#include "common/common_init.h"
#include "osd/mClockOpClassQueue.h"
int main(int argc, char **argv) {
std::vector<const char*> args(argv, argv+argc);
auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_OSD,
CODE_ENVIRONMENT_UTILITY,
CINIT_FLAG_NO_DEFAULT_CONFIG_FILE);
common_init_finish(g_ceph_context);
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
class MClockOpClassQueueTest : public testing::Test {
public:
mClockOpClassQueue q;
entity_inst_t client1;
entity_inst_t client2;
entity_inst_t client3;
MClockOpClassQueueTest() :
q(g_ceph_context),
client1(entity_name_t(CEPH_ENTITY_TYPE_OSD, 1), entity_addr_t()),
client2(entity_name_t(CEPH_ENTITY_TYPE_OSD, 2), entity_addr_t()),
client3(entity_name_t(CEPH_ENTITY_TYPE_CLIENT, 1), entity_addr_t())
{}
#if 0 // more work needed here
Request create_client_op(epoch_t e, const entity_inst_t& owner) {
return Request(spg_t(), PGQueueable(OpRequestRef(), e));
}
#endif
Request create_snaptrim(epoch_t e, const entity_inst_t& owner) {
return Request(spg_t(),
PGQueueable(PGSnapTrim(e),
12, 12,
utime_t(), owner, e));
}
Request create_scrub(epoch_t e, const entity_inst_t& owner) {
return Request(spg_t(),
PGQueueable(PGScrub(e),
12, 12,
utime_t(), owner, e));
}
Request create_recovery(epoch_t e, const entity_inst_t& owner) {
return Request(spg_t(),
PGQueueable(PGRecovery(e, 64),
12, 12,
utime_t(), owner, e));
}
};
TEST_F(MClockOpClassQueueTest, TestSize) {
ASSERT_TRUE(q.empty());
ASSERT_EQ(0u, q.length());
q.enqueue(client1, 12, 0, create_snaptrim(100, client1));
q.enqueue_strict(client2, 12, create_snaptrim(101, client2));
q.enqueue(client2, 12, 0, create_snaptrim(102, client2));
q.enqueue_strict(client3, 12, create_snaptrim(103, client3));
q.enqueue(client1, 12, 0, create_snaptrim(104, client1));
ASSERT_FALSE(q.empty());
ASSERT_EQ(5u, q.length());
std::list<Request> reqs;
reqs.push_back(q.dequeue());
reqs.push_back(q.dequeue());
reqs.push_back(q.dequeue());
ASSERT_FALSE(q.empty());
ASSERT_EQ(2u, q.length());
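// Requeue the three dequeued requests at the front through the strict and
// normal front-insert variants; the total length should return to five.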
q.enqueue_front(client2, 12, 0, reqs.back());
reqs.pop_back();
q.enqueue_strict_front(client3, 12, reqs.back());
reqs.pop_back();
q.enqueue_strict_front(client2, 12, reqs.back());
reqs.pop_back();
ASSERT_FALSE(q.empty());
ASSERT_EQ(5u, q.length());
for (int i = 0; i < 5; ++i) {
(void) q.dequeue();
}
ASSERT_TRUE(q.empty());
ASSERT_EQ(0u, q.length());
}
TEST_F(MClockOpClassQueueTest, TestEnqueue) {
q.enqueue(client1, 12, 0, create_snaptrim(100, client1));
q.enqueue(client2, 12, 0, create_snaptrim(101, client2));
q.enqueue(client2, 12, 0, create_snaptrim(102, client2));
q.enqueue(client3, 12, 0, create_snaptrim(103, client3));
q.enqueue(client1, 12, 0, create_snaptrim(104, client1));
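// Every request here is a snaptrim op, so all five share a single op class;
// with no class-based differentiation the queue should behave FIFO,
// returning epochs 100 through 104 in order.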
Request r = q.dequeue();
ASSERT_EQ(100u, r.second.get_map_epoch());
r = q.dequeue();
ASSERT_EQ(101u, r.second.get_map_epoch());
r = q.dequeue();
ASSERT_EQ(102u, r.second.get_map_epoch());
r = q.dequeue();
ASSERT_EQ(103u, r.second.get_map_epoch());
r = q.dequeue();
ASSERT_EQ(104u, r.second.get_map_epoch());
}
TEST_F(MClockOpClassQueueTest, TestEnqueueStrict) {
q.enqueue_strict(client1, 12, create_snaptrim(100, client1));
q.enqueue_strict(client2, 13, create_snaptrim(101, client2));
q.enqueue_strict(client2, 16, create_snaptrim(102, client2));
q.enqueue_strict(client3, 14, create_snaptrim(103, client3));
q.enqueue_strict(client1, 15, create_snaptrim(104, client1));
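// As in the client-queue test, strict items drain purely by descending
// priority: 102 (prio 16), 104 (15), 103 (14), 101 (13), 100 (12).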
Request r = q.dequeue();
ASSERT_EQ(102u, r.second.get_map_epoch());
r = q.dequeue();
ASSERT_EQ(104u, r.second.get_map_epoch());
r = q.dequeue();
ASSERT_EQ(103u, r.second.get_map_epoch());
r = q.dequeue();
ASSERT_EQ(101u, r.second.get_map_epoch());
r = q.dequeue();
ASSERT_EQ(100u, r.second.get_map_epoch());
}
TEST_F(MClockOpClassQueueTest, TestRemoveByClass) {
q.enqueue(client1, 12, 0, create_snaptrim(100, client1));
q.enqueue_strict(client2, 12, create_snaptrim(101, client2));
q.enqueue(client2, 12, 0, create_snaptrim(102, client2));
q.enqueue_strict(client3, 12, create_snaptrim(103, client3));
q.enqueue(client1, 12, 0, create_snaptrim(104, client1));
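// Even though mClockOpClassQueue schedules by op class rather than client,
// remove_by_class still keys on the client identity, so both of client2's
// requests (strict 101 and normal 102) should be filtered out.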
std::list<Request> filtered_out;
q.remove_by_class(client2, &filtered_out);
ASSERT_EQ(2u, filtered_out.size());
while (!filtered_out.empty()) {
auto e = filtered_out.front().second.get_map_epoch();
ASSERT_TRUE(e == 101u || e == 102u);
filtered_out.pop_front();
}
ASSERT_EQ(3u, q.length());
Request r = q.dequeue();
ASSERT_EQ(103u, r.second.get_map_epoch());
r = q.dequeue();
ASSERT_EQ(100u, r.second.get_map_epoch());
r = q.dequeue();
ASSERT_EQ(104u, r.second.get_map_epoch());
}