Merge pull request #53531 from ronen-fr/wip-rf-squeue2

osd/scrub: extract scrub initiation code out of the OSD

Reviewed-by: Samuel Just <sjust@redhat.com>
Ronen Friedman 2023-09-22 16:46:38 +03:00 committed by GitHub
commit 5ecd20f620
21 changed files with 1509 additions and 911 deletions
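In outline, the scrub-initiation call chain after this change (a sketch assembled from the diffs below; see OSD::tick_without_osd_lock() and the new scrubber/osd_scrub.cc):

// OSD::tick_without_osd_lock()
//   -> OsdScrub::initiate_scrub(is_recovery_active)       // replaces OSD::sched_scrub()
//        -> restrictions_on_scrubbing(...)                // OSD-wide preconditions
//        -> ScrubQueue::ready_to_scrub(...)               // sorted list of ripe candidates
//        -> initiate_a_scrub(pgid, ...)                   // per-PG checks
//             -> ScrubSchedListener::get_locked_pg(pgid)  // RAII PGLockWrapper
//             -> PG::sched_scrub()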

View File

@ -49,9 +49,12 @@ function TEST_scrub_test() {
run_mon $dir a --osd_pool_default_size=3 || return 1
run_mgr $dir x || return 1
local ceph_osd_args="--osd-scrub-interval-randomize-ratio=0 --osd-deep-scrub-randomize-ratio=0 "
ceph_osd_args+="--osd_scrub_backoff_ratio=0 --osd_stats_update_period_not_scrubbing=3 "
ceph_osd_args+="--osd_stats_update_period_scrubbing=2"
for osd in $(seq 0 $(expr $OSDS - 1))
do
run_osd $dir $osd || return 1
run_osd $dir $osd $ceph_osd_args || return 1
done
# Create a pool with a single pg
@ -211,16 +214,17 @@ function TEST_scrub_extended_sleep() {
run_mon $dir a --osd_pool_default_size=3 || return 1
run_mgr $dir x || return 1
local ceph_osd_args="--osd-scrub-interval-randomize-ratio=0 --osd-deep-scrub-randomize-ratio=0 "
ceph_osd_args+="--osd_scrub_backoff_ratio=0 --osd_stats_update_period_not_scrubbing=3 "
ceph_osd_args+="--osd_stats_update_period_scrubbing=2 --osd_scrub_sleep=0 "
ceph_osd_args+="--osd_scrub_extended_sleep=20 --osd_scrub_begin_week_day=$DAY_START "
ceph_osd_args+="--osd_op_queue=wpq --osd_scrub_end_week_day=$DAY_END "
ceph_osd_args+="--bluestore_cache_autotune=false" # why needed?
for osd in $(seq 0 $(expr $OSDS - 1))
do
run_osd $dir $osd --osd_scrub_sleep=0 \
--osd_scrub_extended_sleep=20 \
--bluestore_cache_autotune=false \
--osd_deep_scrub_randomize_ratio=0.0 \
--osd_scrub_interval_randomize_ratio=0 \
--osd_scrub_begin_week_day=$DAY_START \
--osd_scrub_end_week_day=$DAY_END \
|| return 1
run_osd $dir $osd $ceph_osd_args || return 1
done
# Create a pool with a single pg
@ -527,6 +531,8 @@ function TEST_dump_scrub_schedule() {
--osd_scrub_interval_randomize_ratio=0 \
--osd_scrub_backoff_ratio=0.0 \
--osd_op_queue=wpq \
--osd_stats_update_period_not_scrubbing=3 \
--osd_stats_update_period_scrubbing=2 \
--osd_scrub_sleep=0.2"
for osd in $(seq 0 $(expr $OSDS - 1))

View File

@ -22,9 +22,12 @@ set(osd_srcs
PGBackend.cc
OSDCap.cc
scrubber/pg_scrubber.cc
scrubber/osd_scrub.cc
scrubber/osd_scrub_sched.cc
scrubber/PrimaryLogScrub.cc
scrubber/scrub_job.cc
scrubber/scrub_machine.cc
scrubber/scrub_resources.cc
scrubber/ScrubStore.cc
scrubber/scrub_backend.cc
Watch.cc

View File

@ -246,7 +246,7 @@ OSDService::OSDService(OSD *osd, ceph::async::io_context_pool& poolctx) :
osd_skip_data_digest(cct->_conf, "osd_skip_data_digest"),
publish_lock{ceph::make_mutex("OSDService::publish_lock")},
pre_publish_lock{ceph::make_mutex("OSDService::pre_publish_lock")},
m_scrub_queue{cct, *this},
m_osd_scrub{cct, *this, cct->_conf},
agent_valid_iterator(false),
agent_ops(0),
flush_mode_high_count(0),
@ -2853,7 +2853,7 @@ will start to track new ops received afterwards.";
f->close_section();
} else if (prefix == "dump_scrub_reservations") {
f->open_object_section("scrub_reservations");
service.get_scrub_services().dump_scrub_reservations(f);
service.get_scrub_services().resource_bookkeeper().dump_scrub_reservations(f);
f->close_section();
} else if (prefix == "get_latest_osdmap") {
get_latest_osdmap();
@ -6282,9 +6282,7 @@ void OSD::tick_without_osd_lock()
}
if (is_active()) {
if (!scrub_random_backoff()) {
sched_scrub();
}
service.get_scrub_services().initiate_scrub(service.is_recovery_active());
service.promote_throttle_recalibrate();
resume_creating_pg();
bool need_send_beacon = false;
@ -7597,131 +7595,17 @@ void OSD::handle_fast_scrub(MOSDScrub2 *m)
m->put();
}
bool OSD::scrub_random_backoff()
std::optional<PGLockWrapper> OSDService::get_locked_pg(spg_t pgid)
{
bool coin_flip = (rand() / (double)RAND_MAX >=
cct->_conf->osd_scrub_backoff_ratio);
if (!coin_flip) {
dout(20) << "scrub_random_backoff lost coin flip, randomly backing off (ratio: "
<< cct->_conf->osd_scrub_backoff_ratio << ")" << dendl;
return true;
auto pg = osd->lookup_lock_pg(pgid);
if (pg) {
return PGLockWrapper{std::move(pg)};
} else {
return std::nullopt;
}
return false;
}
void OSD::sched_scrub()
{
auto& scrub_scheduler = service.get_scrub_services();
if (auto blocked_pgs = scrub_scheduler.get_blocked_pgs_count();
blocked_pgs > 0) {
// some PGs managed by this OSD were blocked by a locked object during
// scrub. This means we might not have the resources needed to scrub now.
dout(10)
<< fmt::format(
"{}: PGs are blocked while scrubbing due to locked objects ({} PGs)",
__func__,
blocked_pgs)
<< dendl;
}
// fail fast if no resources are available
if (!scrub_scheduler.can_inc_scrubs()) {
dout(20) << __func__ << ": OSD cannot inc scrubs" << dendl;
return;
}
// if there is a PG that is just now trying to reserve scrub replica resources -
// we should wait and not initiate a new scrub
if (scrub_scheduler.is_reserving_now()) {
dout(20) << __func__ << ": scrub resources reservation in progress" << dendl;
return;
}
Scrub::ScrubPreconds env_conditions;
if (service.is_recovery_active() && !cct->_conf->osd_scrub_during_recovery) {
if (!cct->_conf->osd_repair_during_recovery) {
dout(15) << __func__ << ": not scheduling scrubs due to active recovery"
<< dendl;
return;
}
dout(10) << __func__
<< " will only schedule explicitly requested repair due to active recovery"
<< dendl;
env_conditions.allow_requested_repair_only = true;
}
if (g_conf()->subsys.should_gather<ceph_subsys_osd, 20>()) {
dout(20) << __func__ << " sched_scrub starts" << dendl;
auto all_jobs = scrub_scheduler.list_registered_jobs();
for (const auto& sj : all_jobs) {
dout(20) << "sched_scrub scrub-queue jobs: " << *sj << dendl;
}
}
auto was_started = scrub_scheduler.select_pg_and_scrub(env_conditions);
dout(20) << "sched_scrub done (" << ScrubQueue::attempt_res_text(was_started)
<< ")" << dendl;
}
Scrub::schedule_result_t OSDService::initiate_a_scrub(spg_t pgid,
bool allow_requested_repair_only)
{
dout(20) << __func__ << " trying " << pgid << dendl;
// we have a candidate to scrub. We need some PG information to know if scrubbing is
// allowed
PGRef pg = osd->lookup_lock_pg(pgid);
if (!pg) {
// the PG was dequeued in the short timespan between creating the candidates list
// (collect_ripe_jobs()) and here
dout(5) << __func__ << " pg " << pgid << " not found" << dendl;
return Scrub::schedule_result_t::no_such_pg;
}
// This has already started, so go on to the next scrub job
if (pg->is_scrub_queued_or_active()) {
pg->unlock();
dout(20) << __func__ << ": already in progress pgid " << pgid << dendl;
return Scrub::schedule_result_t::already_started;
}
// Skip other kinds of scrubbing if only explicitly requested repairing is allowed
if (allow_requested_repair_only && !pg->get_planned_scrub().must_repair) {
pg->unlock();
dout(10) << __func__ << " skip " << pgid
<< " because repairing is not explicitly requested on it" << dendl;
return Scrub::schedule_result_t::preconditions;
}
auto scrub_attempt = pg->sched_scrub();
pg->unlock();
return scrub_attempt;
}
void OSD::resched_all_scrubs()
{
dout(10) << __func__ << ": start" << dendl;
auto all_jobs = service.get_scrub_services().list_registered_jobs();
for (auto& e : all_jobs) {
auto& job = *e;
dout(20) << __func__ << ": examine " << job.pgid << dendl;
PGRef pg = _lookup_lock_pg(job.pgid);
if (!pg)
continue;
dout(15) << __func__ << ": updating scrub schedule on " << job.pgid << dendl;
pg->on_scrub_schedule_input_change();
pg->unlock();
}
dout(10) << __func__ << ": done" << dendl;
}
MPGStats* OSD::collect_pg_stats()
{
dout(15) << __func__ << dendl;
@ -9955,10 +9839,17 @@ void OSD::handle_conf_change(const ConfigProxy& conf,
}
if (changed.count("osd_scrub_min_interval") ||
changed.count("osd_scrub_max_interval")) {
resched_all_scrubs();
dout(0) << __func__ << ": scrub interval change" << dendl;
changed.count("osd_scrub_max_interval") ||
changed.count("osd_deep_scrub_interval")) {
service.get_scrub_services().on_config_change();
dout(0) << fmt::format(
"{}: scrub interval change (min:{} deep:{} max:{})",
__func__, cct->_conf->osd_scrub_min_interval,
cct->_conf->osd_deep_scrub_interval,
cct->_conf->osd_scrub_max_interval)
<< dendl;
}
check_config();
if (changed.count("osd_asio_thread_count")) {
service.poolctx.stop();

View File

@ -53,7 +53,7 @@
#include "common/EventTrace.h"
#include "osd/osd_perf_counters.h"
#include "common/Finisher.h"
#include "scrubber/osd_scrub_sched.h"
#include "scrubber/osd_scrub.h"
#define CEPH_OSD_PROTOCOL 10 /* cluster internal */
@ -239,30 +239,18 @@ public:
void handle_misdirected_op(PG *pg, OpRequestRef op);
private:
/**
* The entity that maintains the set of PGs we may scrub (i.e. - those for
* which we are the primary), and schedules their scrubbing.
*/
ScrubQueue m_scrub_queue;
/// the entity that offloads all scrubbing-related operations
OsdScrub m_osd_scrub;
public:
ScrubQueue& get_scrub_services() { return m_scrub_queue; }
OsdScrub& get_scrub_services() { return m_osd_scrub; }
/**
* A callback used by the ScrubQueue object to initiate a scrub on a specific PG.
*
* The request might fail for multiple reasons, as ScrubQueue cannot by its own
* check some of the PG-specific preconditions and those are checked here. See
* attempt_t definition.
*
* @param pgid to scrub
* @param allow_requested_repair_only
* @return a Scrub::attempt_t detailing either a success, or the failure reason.
* locks the named PG, returning an RAII wrapper that unlocks the PG upon
* destruction.
* Returns nullopt if the PG could not be found and locked.
*/
Scrub::schedule_result_t initiate_a_scrub(
spg_t pgid,
bool allow_requested_repair_only) final;
std::optional<PGLockWrapper> get_locked_pg(spg_t pgid) final;
private:
// -- agent shared state --
@ -1867,9 +1855,7 @@ protected:
// -- scrubbing --
void sched_scrub();
void resched_all_scrubs();
bool scrub_random_backoff();
// -- status reporting --
MPGStats *collect_pg_stats();

View File

@ -1348,11 +1348,14 @@ Scrub::schedule_result_t PG::sched_scrub()
ceph_assert(m_scrubber);
if (is_scrub_queued_or_active()) {
return schedule_result_t::already_started;
dout(10) << __func__ << ": already scrubbing" << dendl;
return schedule_result_t::target_specific_failure;
}
if (!is_primary() || !is_active() || !is_clean()) {
return schedule_result_t::bad_pg_state;
dout(10) << __func__ << ": cannot scrub (not a clean and active primary)"
<< dendl;
return schedule_result_t::target_specific_failure;
}
if (state_test(PG_STATE_SNAPTRIM) || state_test(PG_STATE_SNAPTRIM_WAIT)) {
@ -1360,7 +1363,7 @@ Scrub::schedule_result_t PG::sched_scrub()
// (on the transition from NotTrimming to Trimming/WaitReservation),
// i.e. some time before setting 'snaptrim'.
dout(10) << __func__ << ": cannot scrub while snap-trimming" << dendl;
return schedule_result_t::bad_pg_state;
return schedule_result_t::target_specific_failure;
}
// analyse the combination of the requested scrub flags, the osd/pool configuration
@ -1372,14 +1375,14 @@ Scrub::schedule_result_t PG::sched_scrub()
// (due to configuration or priority issues)
// The reason was already reported by the callee.
dout(10) << __func__ << ": failed to initiate a scrub" << dendl;
return schedule_result_t::preconditions;
return schedule_result_t::target_specific_failure;
}
// try to reserve the local OSD resources. If failing: no harm. We will
// be retried by the OSD later on.
if (!m_scrubber->reserve_local()) {
dout(10) << __func__ << ": failed to reserve locally" << dendl;
return schedule_result_t::no_local_resources;
return schedule_result_t::osd_wide_failure;
}
// can commit to the updated flags now, as nothing will stop the scrub
@ -2836,3 +2839,11 @@ void PG::with_heartbeat_peers(std::function<void(int)>&& f)
uint64_t PG::get_min_alloc_size() const {
return osd->store->get_min_alloc_size();
}
PGLockWrapper::~PGLockWrapper()
{
if (m_pg) {
// otherwise - we were 'moved from'
m_pg->unlock();
}
}

View File

@ -1450,4 +1450,22 @@ public:
}
};
/**
* Initialized with a locked PG. That PG is unlocked in the
* destructor.
* Used by OsdScrub when initiating a scrub.
*/
class PGLockWrapper {
public:
explicit PGLockWrapper(PGRef locked_pg) : m_pg{locked_pg} {}
PGRef pg() { return m_pg; }
~PGLockWrapper();
PGLockWrapper(PGLockWrapper&& rhs) : m_pg(std::move(rhs.m_pg)) {
rhs.m_pg = nullptr;
}
PGLockWrapper(const PGLockWrapper& rhs) = delete;
private:
PGRef m_pg;
};
#endif
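A sketch of the intended usage pattern, mirroring OsdScrub::initiate_a_scrub() in the new osd_scrub.cc further below (the surrounding scaffolding here is illustrative):

// 'm_osd_svc' is the ScrubSchedListener (implemented by OSDService); 'pgid'
// is assumed in scope. The PG stays locked for exactly the lifetime of
// 'locked_pg'; moving the wrapper transfers that ownership - hence the
// move-ctor nulling rhs.m_pg above.
auto locked_pg = m_osd_svc.get_locked_pg(pgid);
if (!locked_pg) {
  return Scrub::schedule_result_t::target_specific_failure;  // the PG is gone
}
auto res = locked_pg->pg()->sched_scrub();
return res;  // 'locked_pg' is destroyed here, unlocking the PG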

View File

@ -0,0 +1,477 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#include "./osd_scrub.h"
#include "osd/OSD.h"
#include "osdc/Objecter.h"
#include "pg_scrubber.h"
using namespace ::std::chrono;
using namespace ::std::chrono_literals;
using namespace ::std::literals;
using schedule_result_t = Scrub::schedule_result_t;
#define dout_subsys ceph_subsys_osd
#undef dout_context
#define dout_context (cct)
#undef dout_prefix
#define dout_prefix _prefix_fn(_dout, this, __func__)
template <class T>
static std::ostream& _prefix_fn(std::ostream* _dout, T* t, std::string fn = "")
{
return t->gen_prefix(*_dout, fn);
}
OsdScrub::OsdScrub(
CephContext* cct,
Scrub::ScrubSchedListener& osd_svc,
const ceph::common::ConfigProxy& config)
: cct{cct}
, m_osd_svc{osd_svc}
, conf{config}
, m_resource_bookkeeper{[this](std::string msg) { log_fwd(msg); }, conf}
, m_queue{cct, m_osd_svc}
, m_log_prefix{fmt::format("osd.{} osd-scrub:", m_osd_svc.get_nodeid())}
, m_load_tracker{cct, conf, m_osd_svc.get_nodeid()}
{}
std::ostream& OsdScrub::gen_prefix(std::ostream& out, std::string_view fn) const
{
return out << m_log_prefix << fn << ": ";
}
void OsdScrub::dump_scrubs(ceph::Formatter* f) const
{
m_queue.dump_scrubs(f);
}
void OsdScrub::log_fwd(std::string_view text)
{
dout(20) << text << dendl;
}
bool OsdScrub::scrub_random_backoff() const
{
if (random_bool_with_probability(conf->osd_scrub_backoff_ratio)) {
dout(20) << fmt::format(
"lost coin flip, randomly backing off (ratio: {:.3f})",
conf->osd_scrub_backoff_ratio)
<< dendl;
return true; // backing off
}
return false;
}
void OsdScrub::initiate_scrub(bool is_recovery_active)
{
if (scrub_random_backoff()) {
// dice-roll says we should not scrub now
return;
}
if (auto blocked_pgs = get_blocked_pgs_count(); blocked_pgs > 0) {
// some PGs managed by this OSD were blocked by a locked object during
// scrub. This means we might not have the resources needed to scrub now.
dout(10)
<< fmt::format(
"PGs are blocked while scrubbing due to locked objects ({} PGs)",
blocked_pgs)
<< dendl;
}
// fail fast if no resources are available
if (!m_resource_bookkeeper.can_inc_scrubs()) {
dout(20) << "too many scrubs already running on this OSD" << dendl;
return;
}
// if there is a PG that is just now trying to reserve scrub replica resources -
// we should wait and not initiate a new scrub
if (m_queue.is_reserving_now()) {
dout(10) << "scrub resources reservation in progress" << dendl;
return;
}
utime_t scrub_time = ceph_clock_now();
dout(10) << fmt::format(
"time now:{}, recover is active?:{}", scrub_time,
is_recovery_active)
<< dendl;
// check the OSD-wide environment conditions (scrub resources, time, etc.).
// These may restrict the type of scrubs we are allowed to start, or just
// prevent us from starting any scrub at all.
auto env_restrictions =
restrictions_on_scrubbing(is_recovery_active, scrub_time);
if (!env_restrictions) {
return;
}
if (g_conf()->subsys.should_gather<ceph_subsys_osd, 20>()) {
dout(20) << "scrub scheduling (@tick) starts" << dendl;
auto all_jobs = m_queue.list_registered_jobs();
for (const auto& sj : all_jobs) {
dout(20) << fmt::format("\tscrub-queue jobs: {}", *sj) << dendl;
}
}
// At this phase of the refactoring we keep the changes to the queue
// interface minimal: we ask for a list of eligible targets (based on
// the known restrictions), and try the elements of that list until a
// (possibly only temporary) success.
auto candidates = m_queue.ready_to_scrub(*env_restrictions, scrub_time);
if (candidates.empty()) {
dout(20) << "no PGs are ready for scrubbing" << dendl;
return;
}
for (const auto& candidate : candidates) {
dout(20) << fmt::format("initiating scrub on pg[{}]", candidate) << dendl;
// we have a candidate to scrub. But we may fail when trying to initiate that
// scrub. For some failures - we can continue with the next candidate. For
// others - we should stop trying to scrub at this tick.
auto res = initiate_a_scrub(
candidate, env_restrictions->allow_requested_repair_only);
if (res == schedule_result_t::target_specific_failure) {
// continue with the next job.
// \todo: consider separate handling of "no such PG", as - later on -
// we should be removing both related targets.
continue;
} else if (res == schedule_result_t::osd_wide_failure) {
// no point in trying the other candidates at this time
break;
} else {
// the happy path. We are done
dout(20) << fmt::format("scrub initiated for pg[{}]", candidate.pgid)
<< dendl;
break;
}
}
}
std::optional<Scrub::OSDRestrictions> OsdScrub::restrictions_on_scrubbing(
bool is_recovery_active,
utime_t scrub_clock_now) const
{
// our local OSD may already be running too many scrubs
if (!m_resource_bookkeeper.can_inc_scrubs()) {
dout(10) << "OSD cannot inc scrubs" << dendl;
return std::nullopt;
}
// if there is a PG that is just now trying to reserve scrub replica resources
// - we should wait and not initiate a new scrub
if (m_queue.is_reserving_now()) {
dout(10) << "scrub resources reservation in progress" << dendl;
return std::nullopt;
}
Scrub::OSDRestrictions env_conditions;
env_conditions.time_permit = scrub_time_permit(scrub_clock_now);
env_conditions.load_is_low = m_load_tracker.scrub_load_below_threshold();
env_conditions.only_deadlined =
!env_conditions.time_permit || !env_conditions.load_is_low;
if (is_recovery_active && !conf->osd_scrub_during_recovery) {
if (!conf->osd_repair_during_recovery) {
dout(15) << "not scheduling scrubs due to active recovery" << dendl;
return std::nullopt;
}
dout(10) << "will only schedule explicitly requested repair due to active "
"recovery"
<< dendl;
env_conditions.allow_requested_repair_only = true;
}
return env_conditions;
}
Scrub::schedule_result_t OsdScrub::initiate_a_scrub(
spg_t pgid,
bool allow_requested_repair_only)
{
dout(20) << fmt::format("trying pg[{}]", pgid) << dendl;
// we have a candidate to scrub. We need some PG information to
// know if scrubbing is allowed
auto locked_pg = m_osd_svc.get_locked_pg(pgid);
if (!locked_pg) {
// the PG was dequeued in the short timespan between creating the
// candidates list (ready_to_scrub()) and here
dout(5) << fmt::format("pg[{}] not found", pgid) << dendl;
return Scrub::schedule_result_t::target_specific_failure;
}
// This one is already scrubbing, so go on to the next scrub job
if (locked_pg->pg()->is_scrub_queued_or_active()) {
dout(10) << fmt::format("pg[{}]: scrub already in progress", pgid) << dendl;
return Scrub::schedule_result_t::target_specific_failure;
}
// Skip other kinds of scrubbing if only explicitly requested repairing is allowed
if (allow_requested_repair_only &&
!locked_pg->pg()->get_planned_scrub().must_repair) {
dout(10) << fmt::format(
"skipping pg[{}] as repairing was not explicitly "
"requested for that pg",
pgid)
<< dendl;
return Scrub::schedule_result_t::target_specific_failure;
}
return locked_pg->pg()->sched_scrub();
}
void OsdScrub::on_config_change()
{
auto to_notify = m_queue.list_registered_jobs();
for (const auto& p : to_notify) {
dout(30) << fmt::format("rescheduling pg[{}] scrubs", *p) << dendl;
auto locked_pg = m_osd_svc.get_locked_pg(p->pgid);
if (!locked_pg)
continue;
dout(15) << fmt::format(
"updating scrub schedule on {}",
(locked_pg->pg())->get_pgid())
<< dendl;
locked_pg->pg()->on_scrub_schedule_input_change();
}
}
// ////////////////////////////////////////////////////////////////////////// //
// CPU load tracking and related
OsdScrub::LoadTracker::LoadTracker(
CephContext* cct,
const ceph::common::ConfigProxy& config,
int node_id)
: cct{cct}
, conf{config}
, log_prefix{fmt::format("osd.{} scrub-queue::load-tracker::", node_id)}
{
// initialize the daily loadavg with current 15min loadavg
if (double loadavgs[3]; getloadavg(loadavgs, 3) == 3) {
daily_loadavg = loadavgs[2];
} else {
derr << "OSD::init() : couldn't read loadavgs\n" << dendl;
daily_loadavg = 1.0;
}
}
///\todo replace with Knuth's algo (to reduce the numerical error)
std::optional<double> OsdScrub::LoadTracker::update_load_average()
{
int hb_interval = conf->osd_heartbeat_interval;
int n_samples = std::chrono::duration_cast<seconds>(24h).count();
if (hb_interval > 1) {
n_samples = std::max(n_samples / hb_interval, 1);
}
double loadavg;
if (getloadavg(&loadavg, 1) == 1) {
daily_loadavg = (daily_loadavg * (n_samples - 1) + loadavg) / n_samples;
return 100 * loadavg;
}
return std::nullopt; // getloadavg() failed
}
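The \todo above presumably refers to rewriting the decaying average in incremental form, which avoids the large intermediate product (a sketch, not part of this PR):

// algebraically equivalent: m += (x - m) / n  ==  m = (m * (n - 1) + x) / n,
// but the incremental form accumulates less floating-point rounding error
daily_loadavg += (loadavg - daily_loadavg) / n_samples;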
bool OsdScrub::LoadTracker::scrub_load_below_threshold() const
{
double loadavgs[3];
if (getloadavg(loadavgs, 3) != 3) {
dout(10) << "couldn't read loadavgs" << dendl;
return false;
}
// allow scrub if below configured threshold
long cpus = sysconf(_SC_NPROCESSORS_ONLN);
double loadavg_per_cpu = cpus > 0 ? loadavgs[0] / cpus : loadavgs[0];
if (loadavg_per_cpu < conf->osd_scrub_load_threshold) {
dout(20) << fmt::format(
"loadavg per cpu {:.3f} < max {:.3f} = yes",
loadavg_per_cpu, conf->osd_scrub_load_threshold)
<< dendl;
return true;
}
// allow scrub if below daily avg and currently decreasing
if (loadavgs[0] < daily_loadavg && loadavgs[0] < loadavgs[2]) {
dout(20) << fmt::format(
"loadavg {:.3f} < daily_loadavg {:.3f} and < 15m avg "
"{:.3f} = yes",
loadavgs[0], daily_loadavg, loadavgs[2])
<< dendl;
return true;
}
dout(10) << fmt::format(
"loadavg {:.3f} >= max {:.3f} and ( >= daily_loadavg {:.3f} "
"or >= 15m avg {:.3f} ) = no",
loadavgs[0], conf->osd_scrub_load_threshold, daily_loadavg,
loadavgs[2])
<< dendl;
return false;
}
std::ostream& OsdScrub::LoadTracker::gen_prefix(
std::ostream& out,
std::string_view fn) const
{
return out << log_prefix << fn << ": ";
}
std::optional<double> OsdScrub::update_load_average()
{
return m_load_tracker.update_load_average();
}
// ////////////////////////////////////////////////////////////////////////// //
// checks for half-closed ranges. Modify the (p < till) to '<=' to check for
// closed ranges.
static inline bool isbetween_modulo(int64_t from, int64_t till, int p)
{
// the 1st condition is because we have defined from==till as "always true"
return (till == from) || ((till >= from) ^ (p >= from) ^ (p < till));
}
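The XOR expression handles both plain and wrap-around intervals, with [from, till) half-open; a few worked cases (illustrative):

// from=5, till=2 (wraps, e.g. Fri..Mon -> days {5,6,0,1}):
//   p=6: (2>=5)=0 ^ (6>=5)=1 ^ (6<2)=0  -> true  (inside)
//   p=3: (2>=5)=0 ^ (3>=5)=0 ^ (3<2)=0  -> false (outside)
// from=1, till=4 (no wrap -> {1,2,3}):
//   p=2: (4>=1)=1 ^ (2>=1)=1 ^ (2<4)=1  -> true  (inside)
//   p=4: (4>=1)=1 ^ (4>=1)=1 ^ (4<4)=0  -> false (half-open: 'till' excluded)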
bool OsdScrub::scrub_time_permit(utime_t now) const
{
const time_t tt = now.sec();
tm bdt;
localtime_r(&tt, &bdt);
bool day_permits = isbetween_modulo(
conf->osd_scrub_begin_week_day, conf->osd_scrub_end_week_day,
bdt.tm_wday);
if (!day_permits) {
dout(20) << fmt::format(
"should run between week day {} - {} now {} - no",
conf->osd_scrub_begin_week_day,
conf->osd_scrub_end_week_day, bdt.tm_wday)
<< dendl;
return false;
}
bool time_permits = isbetween_modulo(
conf->osd_scrub_begin_hour, conf->osd_scrub_end_hour, bdt.tm_hour);
dout(20) << fmt::format(
"should run between {} - {} now {} = {}",
conf->osd_scrub_begin_hour, conf->osd_scrub_end_hour,
bdt.tm_hour, (time_permits ? "yes" : "no"))
<< dendl;
return time_permits;
}
std::chrono::milliseconds OsdScrub::scrub_sleep_time(
utime_t t,
bool high_priority_scrub) const
{
const milliseconds regular_sleep_period =
milliseconds{int64_t(std::max(0.0, 1'000 * conf->osd_scrub_sleep))};
if (high_priority_scrub || scrub_time_permit(t)) {
return regular_sleep_period;
}
// relevant if scrubbing started during allowed time, but continued into
// forbidden hours
const milliseconds extended_sleep =
milliseconds{int64_t(1'000 * conf->osd_scrub_extended_sleep)};
dout(20) << fmt::format(
"scrubbing started during allowed time, but continued into "
"forbidden hours. regular_sleep_period {} extended_sleep {}",
regular_sleep_period, extended_sleep)
<< dendl;
return std::max(extended_sleep, regular_sleep_period);
}
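A worked example, with the values set in the qa script above (osd_scrub_sleep=0, osd_scrub_extended_sleep=20): a scrub that overruns into forbidden hours sleeps max(20'000ms, 0ms) = 20s between chunks, while a high-priority scrub keeps the regular 0ms period.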
// ////////////////////////////////////////////////////////////////////////// //
// forwarders to the queue
Scrub::sched_params_t OsdScrub::determine_scrub_time(
const requested_scrub_t& request_flags,
const pg_info_t& pg_info,
const pool_opts_t& pool_conf) const
{
return m_queue.determine_scrub_time(request_flags, pg_info, pool_conf);
}
void OsdScrub::update_job(
Scrub::ScrubJobRef sjob,
const Scrub::sched_params_t& suggested)
{
m_queue.update_job(sjob, suggested);
}
void OsdScrub::register_with_osd(
Scrub::ScrubJobRef sjob,
const Scrub::sched_params_t& suggested)
{
m_queue.register_with_osd(sjob, suggested);
}
void OsdScrub::remove_from_osd_queue(Scrub::ScrubJobRef sjob)
{
m_queue.remove_from_osd_queue(sjob);
}
bool OsdScrub::inc_scrubs_local()
{
return m_resource_bookkeeper.inc_scrubs_local();
}
void OsdScrub::dec_scrubs_local()
{
m_resource_bookkeeper.dec_scrubs_local();
}
bool OsdScrub::inc_scrubs_remote()
{
return m_resource_bookkeeper.inc_scrubs_remote();
}
void OsdScrub::dec_scrubs_remote()
{
m_resource_bookkeeper.dec_scrubs_remote();
}
void OsdScrub::mark_pg_scrub_blocked(spg_t blocked_pg)
{
m_queue.mark_pg_scrub_blocked(blocked_pg);
}
void OsdScrub::clear_pg_scrub_blocked(spg_t blocked_pg)
{
m_queue.clear_pg_scrub_blocked(blocked_pg);
}
int OsdScrub::get_blocked_pgs_count() const
{
return m_queue.get_blocked_pgs_count();
}
bool OsdScrub::set_reserving_now()
{
return m_queue.set_reserving_now();
}
void OsdScrub::clear_reserving_now()
{
m_queue.clear_reserving_now();
}

View File

@ -0,0 +1,243 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#pragma once
#include <string_view>
#include "osd/osd_types_fmt.h"
#include "osd/scrubber/osd_scrub_sched.h"
#include "osd/scrubber/scrub_resources.h"
#include "osd/scrubber_common.h"
/**
* Off-loading scrubbing initiation logic from the OSD.
* Also here: CPU load tracking as it pertains to scrubs (TBD), and the
* scrub resource counters.
*
* Locking:
* (as of this first step in the scheduler refactoring)
* - No protected data is maintained directly by the OsdScrub object
* (as it is not yet protected by any single OSDService lock).
*/
class OsdScrub {
public:
OsdScrub(
CephContext* cct,
Scrub::ScrubSchedListener& osd_svc,
const ceph::common::ConfigProxy& config);
~OsdScrub() = default;
// note: public, as accessed by the dout macros
std::ostream& gen_prefix(std::ostream& out, std::string_view fn) const;
/**
* called periodically by the OSD to select the first scrub-eligible PG
* and scrub it.
*/
void initiate_scrub(bool active_recovery);
/**
* logs a string at log level 20, using OsdScrub's prefix.
* An aux function to be used by sub-objects.
*/
void log_fwd(std::string_view text);
const Scrub::ScrubResources& resource_bookkeeper() const
{
return m_resource_bookkeeper;
}
void dump_scrubs(ceph::Formatter* f) const; ///< fwd to the queue
/**
* on_config_change() (the refactored "OSD::sched_all_scrubs()")
*
* for each PG registered with the OSD (i.e. - for which we are the primary):
* lock that PG, and call its on_scrub_schedule_input_change() method
* to handle a possible change in one of the configuration parameters
* that affect scrub scheduling.
*/
void on_config_change();
// implementing the PGs interface to the scrub scheduling objects
// ---------------------------------------------------------------
// updating the resource counters
bool inc_scrubs_local();
void dec_scrubs_local();
bool inc_scrubs_remote();
void dec_scrubs_remote();
// counting the number of PGs stuck while scrubbing, waiting for objects
void mark_pg_scrub_blocked(spg_t blocked_pg);
void clear_pg_scrub_blocked(spg_t blocked_pg);
// updating scheduling information for a specific PG
Scrub::sched_params_t determine_scrub_time(
const requested_scrub_t& request_flags,
const pg_info_t& pg_info,
const pool_opts_t& pool_conf) const;
/**
* modify a scrub-job's scheduled time and deadline
*
* There are 3 argument combinations to consider:
* - 'must' is asserted, and the suggested time is 'scrub_must_stamp':
* the registration will be with "beginning of time" target, making the
* scrub-job eligible for immediate scrub (given that external conditions
* do not prevent scrubbing)
*
* - 'must' is asserted, and the suggested time is 'now':
* This happens if our stats are unknown. The results are similar to the
* previous scenario.
*
* - not a 'must': we take the suggested time as a basis, and add to it some
* configuration / random delays.
*
* ('must' is Scrub::sched_params_t.is_must)
*
* locking: not using the jobs_lock
*/
void update_job(
Scrub::ScrubJobRef sjob,
const Scrub::sched_params_t& suggested);
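A sketch of the three combinations described above, mirroring ScrubQueue::determine_scrub_time() as it appears later in this diff (illustrative values; 'pg_info' is assumed in scope):

Scrub::sched_params_t p;

// 1. operator-requested ('must') scrub: a "beginning of time" target
p.is_must = Scrub::must_scrub_t::mandatory;
p.proposed_time = PgScrubber::scrub_must_stamp();

// 2. the PG's stats are invalid: also mandatory, but stamped 'now'
p.is_must = Scrub::must_scrub_t::mandatory;
p.proposed_time = ceph_clock_now();

// 3. periodic scrub: based on the last-scrub stamp; configuration and
//    random delays are added later by ScrubQueue::adjust_target_time()
p.is_must = Scrub::must_scrub_t::not_mandatory;
p.proposed_time = pg_info.history.last_scrub_stamp;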
/**
* Add the scrub job to the list of jobs (i.e. list of PGs) to be periodically
* scrubbed by the OSD.
* The registration is active as long as the PG exists and the OSD is its
* primary.
*
* See update_job() for the handling of the 'suggested' parameter.
*
* locking: might lock jobs_lock
*/
void register_with_osd(
Scrub::ScrubJobRef sjob,
const Scrub::sched_params_t& suggested);
/**
* remove the pg from the set of PGs to be scanned for scrubbing.
* To be used if we are no longer the PG's primary, or if the PG is removed.
*/
void remove_from_osd_queue(Scrub::ScrubJobRef sjob);
/**
* \returns std::chrono::milliseconds indicating how long to wait between
* chunks.
*
* Implementation Note: Returned value is either osd_scrub_sleep or
* osd_scrub_extended_sleep, depending on must_scrub_param and time
* of day (see configs osd_scrub_begin*)
*/
std::chrono::milliseconds scrub_sleep_time(
utime_t t,
bool high_priority_scrub) const;
/**
* No new scrub session will start while a scrub is being initiated on a
* PG, i.e. while that PG is trying to acquire replica scrub resources.
* \retval false if the flag was already set (due to a race)
*/
bool set_reserving_now();
void clear_reserving_now();
/**
* \returns true if the current time is within the scrub time window
*/
[[nodiscard]] bool scrub_time_permit(utime_t t) const;
/**
* An external interface into the LoadTracker object. Used by
* the OSD tick to update the load data in the logger.
*
* \returns 100*(the decaying (running) average of the CPU load
* over the last 24 hours) or nullopt if the load is not
* available.
* Note that the multiplication by 100 is required by the logger interface
*/
std::optional<double> update_load_average();
private:
CephContext* cct;
Scrub::ScrubSchedListener& m_osd_svc;
const ceph::common::ConfigProxy& conf;
/**
* check the OSD-wide environment conditions (scrub resources, time, etc.).
* These may restrict the type of scrubs we are allowed to start, or just
* prevent us from starting any scrub at all.
*
* Specifically:
* a nullopt is returned if we are not allowed to scrub at all, for either of
* the following reasons: no local resources (too many scrubs on this OSD);
* a dice roll says we will not scrub in this tick;
* a recovery is in progress, and we are not allowed to scrub while recovery;
* a PG is trying to acquire replica resources.
*
* If we are allowed to scrub, the returned value specifies whether only
* high-priority scrubs or only overdue ones are allowed to go on.
*/
std::optional<Scrub::OSDRestrictions> restrictions_on_scrubbing(
bool is_recovery_active,
utime_t scrub_clock_now) const;
/**
* initiate a scrub on a specific PG
* The PG is locked, enabling us to query its state. Specifically, we
* verify that the PG is not already scrubbing, and that
* a possible 'allow requested repair only' condition is not in conflict.
*
* \returns a schedule_result_t object, indicating whether the scrub was
* initiated, and if not - why.
*/
Scrub::schedule_result_t initiate_a_scrub(
spg_t pgid,
bool allow_requested_repair_only);
/// resource reservation management
Scrub::ScrubResources m_resource_bookkeeper;
/// the queue of PGs waiting to be scrubbed
ScrubQueue m_queue;
const std::string m_log_prefix{};
/// number of PGs stuck while scrubbing, waiting for objects
int get_blocked_pgs_count() const;
/**
* roll the dice to determine whether we should skip this tick and not try
* to schedule a new scrub.
* \returns true with probability osd_scrub_backoff_ratio.
*/
bool scrub_random_backoff() const;
/**
* tracking the average load on the CPU. Used both by the
* OSD logger, and by the scrub queue (as no scrubbing is allowed if
* the load is too high).
*/
class LoadTracker {
CephContext* cct;
const ceph::common::ConfigProxy& conf;
const std::string log_prefix;
double daily_loadavg{0.0};
public:
explicit LoadTracker(
CephContext* cct,
const ceph::common::ConfigProxy& config,
int node_id);
std::optional<double> update_load_average();
[[nodiscard]] bool scrub_load_below_threshold() const;
std::ostream& gen_prefix(std::ostream& out, std::string_view fn) const;
};
LoadTracker m_load_tracker;
};

View File

@ -2,6 +2,7 @@
// vim: ts=8 sw=2 smarttab
#include "./osd_scrub_sched.h"
#include <string_view>
#include "osd/OSD.h"
#include "pg_scrubber.h"
@ -9,112 +10,40 @@
using namespace ::std::chrono;
using namespace ::std::chrono_literals;
using namespace ::std::literals;
using qu_state_t = Scrub::qu_state_t;
using must_scrub_t = Scrub::must_scrub_t;
using ScrubQContainer = Scrub::ScrubQContainer;
using sched_params_t = Scrub::sched_params_t;
using OSDRestrictions = Scrub::OSDRestrictions;
using ScrubJob = Scrub::ScrubJob;
// ////////////////////////////////////////////////////////////////////////// //
// ScrubJob
#define dout_context (cct)
#define dout_subsys ceph_subsys_osd
#undef dout_prefix
#define dout_prefix *_dout << "osd." << whoami << " "
ScrubQueue::ScrubJob::ScrubJob(CephContext* cct, const spg_t& pg, int node_id)
: RefCountedObject{cct}
, pgid{pg}
, whoami{node_id}
, cct{cct}
{}
// debug usage only
ostream& operator<<(ostream& out, const ScrubQueue::ScrubJob& sjob)
{
out << sjob.pgid << ", " << sjob.schedule.scheduled_at
<< " dead: " << sjob.schedule.deadline << " - "
<< sjob.registration_state() << " / failure: " << sjob.resources_failure
<< " / pen. t.o.: " << sjob.penalty_timeout
<< " / queue state: " << ScrubQueue::qu_state_text(sjob.state);
return out;
}
void ScrubQueue::ScrubJob::update_schedule(
const ScrubQueue::scrub_schedule_t& adjusted)
{
schedule = adjusted;
penalty_timeout = utime_t(0, 0); // helps with debugging
// 'updated' is changed here while not holding jobs_lock. That's OK, as
// the (atomic) flag will only be cleared by select_pg_and_scrub() after
// scan_penalized() is called and the job was moved to the to_scrub queue.
updated = true;
dout(10) << fmt::format("{}: pg[{}] adjusted: {:s} ({})", __func__, pgid,
schedule.scheduled_at, registration_state()) << dendl;
}
std::string ScrubQueue::ScrubJob::scheduling_state(utime_t now_is,
bool is_deep_expected) const
{
// if not in the OSD scheduling queues, not a candidate for scrubbing
if (state != qu_state_t::registered) {
return "no scrub is scheduled";
}
// if the time has passed, we are surely in the queue
// (note that for now we do not tell client if 'penalized')
if (now_is > schedule.scheduled_at) {
// we are never sure that the next scrub will indeed be shallow:
return fmt::format("queued for {}scrub", (is_deep_expected ? "deep " : ""));
}
return fmt::format("{}scrub scheduled @ {:s}",
(is_deep_expected ? "deep " : ""),
schedule.scheduled_at);
}
// ////////////////////////////////////////////////////////////////////////// //
// ScrubQueue
#define dout_subsys ceph_subsys_osd
#undef dout_context
#define dout_context (cct)
#undef dout_prefix
#define dout_prefix \
*_dout << "osd." << osd_service.get_nodeid() << " scrub-queue::" << __func__ \
<< " "
#define dout_prefix _prefix_fn(_dout, this, __func__)
template <class T>
static std::ostream& _prefix_fn(std::ostream* _dout, T* t, std::string fn = "")
{
return t->gen_prefix(*_dout, fn);
}
ScrubQueue::ScrubQueue(CephContext* cct, Scrub::ScrubSchedListener& osds)
: cct{cct}
, osd_service{osds}
{}
std::ostream& ScrubQueue::gen_prefix(std::ostream& out, std::string_view fn)
const
{
// initialize the daily loadavg with current 15min loadavg
if (double loadavgs[3]; getloadavg(loadavgs, 3) == 3) {
daily_loadavg = loadavgs[2];
} else {
derr << "OSD::init() : couldn't read loadavgs\n" << dendl;
daily_loadavg = 1.0;
}
}
std::optional<double> ScrubQueue::update_load_average()
{
int hb_interval = conf()->osd_heartbeat_interval;
int n_samples = std::chrono::duration_cast<seconds>(24h).count();
if (hb_interval > 1) {
n_samples /= hb_interval;
if (n_samples < 1)
n_samples = 1;
}
// get CPU load avg
double loadavg;
if (getloadavg(&loadavg, 1) == 1) {
daily_loadavg = (daily_loadavg * (n_samples - 1) + loadavg) / n_samples;
dout(17) << "heartbeat: daily_loadavg " << daily_loadavg << dendl;
return 100 * loadavg;
}
return std::nullopt;
return out << fmt::format(
"osd.{} scrub-queue:{}: ", osd_service.get_nodeid(), fn);
}
/*
@ -127,7 +56,7 @@ std::optional<double> ScrubQueue::update_load_average()
*
* Note: not holding the jobs lock
*/
void ScrubQueue::remove_from_osd_queue(ScrubJobRef scrub_job)
void ScrubQueue::remove_from_osd_queue(Scrub::ScrubJobRef scrub_job)
{
dout(15) << "removing pg[" << scrub_job->pgid << "] from OSD scrub queue"
<< dendl;
@ -140,21 +69,21 @@ void ScrubQueue::remove_from_osd_queue(ScrubJobRef scrub_job)
if (ret) {
dout(10) << "pg[" << scrub_job->pgid << "] sched-state changed from "
<< qu_state_text(expected_state) << " to "
<< qu_state_text(scrub_job->state) << dendl;
<< ScrubJob::qu_state_text(expected_state) << " to "
<< ScrubJob::qu_state_text(scrub_job->state) << dendl;
} else {
// job wasn't in state 'registered' coming in
dout(5) << "removing pg[" << scrub_job->pgid
<< "] failed. State was: " << qu_state_text(expected_state)
<< "] failed. State was: " << ScrubJob::qu_state_text(expected_state)
<< dendl;
}
}
void ScrubQueue::register_with_osd(
ScrubJobRef scrub_job,
const ScrubQueue::sched_params_t& suggested)
Scrub::ScrubJobRef scrub_job,
const sched_params_t& suggested)
{
qu_state_t state_at_entry = scrub_job->state.load();
dout(20) << fmt::format(
@ -214,31 +143,31 @@ void ScrubQueue::register_with_osd(
}
// look mommy - no locks!
void ScrubQueue::update_job(ScrubJobRef scrub_job,
const ScrubQueue::sched_params_t& suggested)
void ScrubQueue::update_job(Scrub::ScrubJobRef scrub_job,
const sched_params_t& suggested)
{
// adjust the suggested scrub time according to OSD-wide status
auto adjusted = adjust_target_time(suggested);
scrub_job->update_schedule(adjusted);
}
ScrubQueue::sched_params_t ScrubQueue::determine_scrub_time(
sched_params_t ScrubQueue::determine_scrub_time(
const requested_scrub_t& request_flags,
const pg_info_t& pg_info,
const pool_opts_t& pool_conf) const
{
ScrubQueue::sched_params_t res;
sched_params_t res;
if (request_flags.must_scrub || request_flags.need_auto) {
// Set the smallest time that isn't utime_t()
res.proposed_time = PgScrubber::scrub_must_stamp();
res.is_must = ScrubQueue::must_scrub_t::mandatory;
res.is_must = Scrub::must_scrub_t::mandatory;
// we do not need the interval data in this case
} else if (pg_info.stats.stats_invalid && conf()->osd_scrub_invalid_stats) {
res.proposed_time = time_now();
res.is_must = ScrubQueue::must_scrub_t::mandatory;
res.is_must = Scrub::must_scrub_t::mandatory;
} else {
res.proposed_time = pg_info.history.last_scrub_stamp;
@ -271,7 +200,7 @@ void ScrubQueue::move_failed_pgs(utime_t now_is)
// remote resources. Move it to the secondary scrub queue.
dout(15) << "moving " << sjob->pgid
<< " state: " << ScrubQueue::qu_state_text(sjob->state) << dendl;
<< " state: " << ScrubJob::qu_state_text(sjob->state) << dendl;
// determine the penalty time, after which the job should be reinstated
utime_t after = now_is;
@ -297,62 +226,14 @@ void ScrubQueue::move_failed_pgs(utime_t now_is)
}
}
// clang-format off
/*
* Implementation note:
* Clang (10 & 11) produces here efficient table-based code, comparable to using
* a direct index into an array of strings.
* Gcc (11, trunk) is almost as efficient.
*/
std::string_view ScrubQueue::attempt_res_text(Scrub::schedule_result_t v)
std::vector<ScrubTargetId> ScrubQueue::ready_to_scrub(
OSDRestrictions restrictions, // note: 4B in size! (copy)
utime_t scrub_tick)
{
switch (v) {
case Scrub::schedule_result_t::scrub_initiated: return "scrubbing"sv;
case Scrub::schedule_result_t::none_ready: return "no ready job"sv;
case Scrub::schedule_result_t::no_local_resources: return "local resources shortage"sv;
case Scrub::schedule_result_t::already_started: return "denied as already started"sv;
case Scrub::schedule_result_t::no_such_pg: return "pg not found"sv;
case Scrub::schedule_result_t::bad_pg_state: return "prevented by pg state"sv;
case Scrub::schedule_result_t::preconditions: return "preconditions not met"sv;
}
// g++ (unlike CLANG), requires an extra 'return' here
return "(unknown)"sv;
}
std::string_view ScrubQueue::qu_state_text(qu_state_t st)
{
switch (st) {
case qu_state_t::not_registered: return "not registered w/ OSD"sv;
case qu_state_t::registered: return "registered"sv;
case qu_state_t::unregistering: return "unregistering"sv;
}
// g++ (unlike CLANG), requires an extra 'return' here
return "(unknown)"sv;
}
// clang-format on
/**
* a note regarding 'to_scrub_copy':
* 'to_scrub_copy' is a sorted set of all the ripe jobs from to_copy.
* As we usually expect to refer to only the first job in this set, we could
* consider an alternative implementation:
* - have collect_ripe_jobs() return the copied set without sorting it;
* - loop, performing:
* - use std::min_element() to find a candidate;
* - try that one. If not suitable, discard from 'to_scrub_copy'
*/
Scrub::schedule_result_t ScrubQueue::select_pg_and_scrub(
Scrub::ScrubPreconds& preconds)
{
dout(10) << " reg./pen. sizes: " << to_scrub.size() << " / "
<< penalized.size() << dendl;
utime_t now_is = time_now();
preconds.time_permit = scrub_time_permit(now_is);
preconds.load_is_low = scrub_load_below_threshold();
preconds.only_deadlined = !preconds.time_permit || !preconds.load_is_low;
dout(10) << fmt::format(
" @{:s}: reg./pen. sizes: {} / {} ({})", scrub_tick,
to_scrub.size(), penalized.size(), restrictions)
<< dendl;
// create a list of candidates (copying, as otherwise creating a deadlock):
// - possibly restore penalized
// - (if we didn't handle directly) remove invalid jobs
@ -363,41 +244,44 @@ Scrub::schedule_result_t ScrubQueue::select_pg_and_scrub(
std::unique_lock lck{jobs_lock};
// pardon all penalized jobs that have deadlined (or were updated)
scan_penalized(restore_penalized, now_is);
scan_penalized(restore_penalized, scrub_tick);
restore_penalized = false;
// remove the 'updated' flag from all entries
std::for_each(to_scrub.begin(),
to_scrub.end(),
[](const auto& jobref) -> void { jobref->updated = false; });
std::for_each(
to_scrub.begin(), to_scrub.end(),
[](const auto& jobref) -> void { jobref->updated = false; });
// add failed scrub attempts to the penalized list
move_failed_pgs(now_is);
move_failed_pgs(scrub_tick);
// collect all valid & ripe jobs from the two lists. Note that we must copy,
// as when we use the lists we will not be holding jobs_lock (see
// explanation above)
// collect all valid & ripe jobs from the two lists. Note that we must copy,
// as when we use the lists we will not be holding jobs_lock (see
// explanation above)
auto to_scrub_copy = collect_ripe_jobs(to_scrub, now_is);
auto penalized_copy = collect_ripe_jobs(penalized, now_is);
// and in this step 1 of the refactoring (Aug 2023): the set returned must be
// transformed into a vector of targets (which, in this phase, are
// the PG id-s).
auto to_scrub_copy = collect_ripe_jobs(to_scrub, restrictions, scrub_tick);
auto penalized_copy = collect_ripe_jobs(penalized, restrictions, scrub_tick);
lck.unlock();
// try the regular queue first
auto res = select_from_group(to_scrub_copy, preconds, now_is);
// in the sole scenario in which we've gone over all ripe jobs without success
// - we will try the penalized
if (res == Scrub::schedule_result_t::none_ready && !penalized_copy.empty()) {
res = select_from_group(penalized_copy, preconds, now_is);
dout(10) << "tried the penalized. Res: "
<< ScrubQueue::attempt_res_text(res) << dendl;
restore_penalized = true;
}
dout(15) << dendl;
return res;
std::vector<ScrubTargetId> all_ready;
std::transform(
to_scrub_copy.cbegin(), to_scrub_copy.cend(),
std::back_inserter(all_ready),
[](const auto& jobref) -> ScrubTargetId { return jobref->pgid; });
// not bothering to handle the "reached the penalized - so all should be
// forgiven" case, as the penalty queue is destined to be removed in a
// followup PR.
std::transform(
penalized_copy.cbegin(), penalized_copy.cend(),
std::back_inserter(all_ready),
[](const auto& jobref) -> ScrubTargetId { return jobref->pgid; });
return all_ready;
}
// must be called under lock
void ScrubQueue::rm_unregistered_jobs(ScrubQContainer& group)
{
@ -416,8 +300,8 @@ void ScrubQueue::rm_unregistered_jobs(ScrubQContainer& group)
namespace {
struct cmp_sched_time_t {
bool operator()(const ScrubQueue::ScrubJobRef& lhs,
const ScrubQueue::ScrubJobRef& rhs) const
bool operator()(const Scrub::ScrubJobRef& lhs,
const Scrub::ScrubJobRef& rhs) const
{
return lhs->schedule.scheduled_at < rhs->schedule.scheduled_at;
}
@ -425,29 +309,33 @@ struct cmp_sched_time_t {
} // namespace
// called under lock
ScrubQueue::ScrubQContainer ScrubQueue::collect_ripe_jobs(
ScrubQContainer& group,
utime_t time_now)
ScrubQContainer ScrubQueue::collect_ripe_jobs(
ScrubQContainer& group,
OSDRestrictions restrictions,
utime_t time_now)
{
rm_unregistered_jobs(group);
auto filtr = [time_now, restrictions](const auto& jobref) -> bool {
return jobref->schedule.scheduled_at <= time_now &&
(!restrictions.only_deadlined ||
(!jobref->schedule.deadline.is_zero() &&
jobref->schedule.deadline <= time_now));
};
// copy ripe jobs
ScrubQueue::ScrubQContainer ripes;
rm_unregistered_jobs(group);
// copy ripe jobs (unless prohibited by 'restrictions')
ScrubQContainer ripes;
ripes.reserve(group.size());
std::copy_if(group.begin(),
group.end(),
std::back_inserter(ripes),
[time_now](const auto& jobref) -> bool {
return jobref->schedule.scheduled_at <= time_now;
});
std::copy_if(group.begin(), group.end(), std::back_inserter(ripes), filtr);
std::sort(ripes.begin(), ripes.end(), cmp_sched_time_t{});
if (g_conf()->subsys.should_gather<ceph_subsys_osd, 20>()) {
for (const auto& jobref : group) {
if (jobref->schedule.scheduled_at > time_now) {
dout(20) << " not ripe: " << jobref->pgid << " @ "
<< jobref->schedule.scheduled_at << dendl;
if (!filtr(jobref)) {
dout(20) << fmt::format(
" not ripe: {} @ {:s}", jobref->pgid,
jobref->schedule.scheduled_at)
<< dendl;
}
}
}
@ -455,77 +343,14 @@ ScrubQueue::ScrubQContainer ScrubQueue::collect_ripe_jobs(
return ripes;
}
// not holding jobs_lock. 'group' is a copy of the actual list.
Scrub::schedule_result_t ScrubQueue::select_from_group(
ScrubQContainer& group,
const Scrub::ScrubPreconds& preconds,
utime_t now_is)
{
dout(15) << "jobs #: " << group.size() << dendl;
for (auto& candidate : group) {
// we expect the first job in the list to be a good candidate (if any)
dout(20) << "try initiating scrub for " << candidate->pgid << dendl;
if (preconds.only_deadlined && (candidate->schedule.deadline.is_zero() ||
candidate->schedule.deadline >= now_is)) {
dout(15) << " not scheduling scrub for " << candidate->pgid << " due to "
<< (preconds.time_permit ? "high load" : "time not permitting")
<< dendl;
continue;
}
// we have a candidate to scrub. We turn to the OSD to verify that the PG
// configuration allows the specified type of scrub, and to initiate the
// scrub.
switch (
osd_service.initiate_a_scrub(candidate->pgid,
preconds.allow_requested_repair_only)) {
case Scrub::schedule_result_t::scrub_initiated:
// the happy path. We are done
dout(20) << " initiated for " << candidate->pgid << dendl;
return Scrub::schedule_result_t::scrub_initiated;
case Scrub::schedule_result_t::already_started:
case Scrub::schedule_result_t::preconditions:
case Scrub::schedule_result_t::bad_pg_state:
// continue with the next job
dout(20) << "failed (state/cond/started) " << candidate->pgid << dendl;
break;
case Scrub::schedule_result_t::no_such_pg:
// The pg is no longer there
dout(20) << "failed (no pg) " << candidate->pgid << dendl;
break;
case Scrub::schedule_result_t::no_local_resources:
// failure to secure local resources. No point in trying the other
// PGs at this time. Note that this is not the same as replica resources
// failure!
dout(20) << "failed (local) " << candidate->pgid << dendl;
return Scrub::schedule_result_t::no_local_resources;
case Scrub::schedule_result_t::none_ready:
// can't happen. Just for the compiler.
dout(5) << "failed !!! " << candidate->pgid << dendl;
return Scrub::schedule_result_t::none_ready;
}
}
dout(20) << " returning 'none ready'" << dendl;
return Scrub::schedule_result_t::none_ready;
}
ScrubQueue::scrub_schedule_t ScrubQueue::adjust_target_time(
Scrub::scrub_schedule_t ScrubQueue::adjust_target_time(
const sched_params_t& times) const
{
ScrubQueue::scrub_schedule_t sched_n_dead{
Scrub::scrub_schedule_t sched_n_dead{
times.proposed_time, times.proposed_time};
if (times.is_must == ScrubQueue::must_scrub_t::not_mandatory) {
if (times.is_must == Scrub::must_scrub_t::not_mandatory) {
// unless explicitly requested, postpone the scrub with a random delay
double scrub_min_interval = times.min_interval > 0
? times.min_interval
@ -560,56 +385,6 @@ ScrubQueue::scrub_schedule_t ScrubQueue::adjust_target_time(
return sched_n_dead;
}
std::chrono::milliseconds ScrubQueue::scrub_sleep_time(bool must_scrub) const
{
std::chrono::milliseconds regular_sleep_period{
uint64_t(std::max(0.0, conf()->osd_scrub_sleep) * 1000)};
if (must_scrub || scrub_time_permit(time_now())) {
return regular_sleep_period;
}
// relevant if scrubbing started during allowed time, but continued into
// forbidden hours
std::chrono::milliseconds extended_sleep{
uint64_t(std::max(0.0, conf()->osd_scrub_extended_sleep) * 1000)};
dout(20) << "w/ extended sleep (" << extended_sleep << ")" << dendl;
return std::max(extended_sleep, regular_sleep_period);
}
bool ScrubQueue::scrub_load_below_threshold() const
{
double loadavgs[3];
if (getloadavg(loadavgs, 3) != 3) {
dout(10) << __func__ << " couldn't read loadavgs\n" << dendl;
return false;
}
// allow scrub if below configured threshold
long cpus = sysconf(_SC_NPROCESSORS_ONLN);
double loadavg_per_cpu = cpus > 0 ? loadavgs[0] / cpus : loadavgs[0];
if (loadavg_per_cpu < conf()->osd_scrub_load_threshold) {
dout(20) << "loadavg per cpu " << loadavg_per_cpu << " < max "
<< conf()->osd_scrub_load_threshold << " = yes" << dendl;
return true;
}
// allow scrub if below daily avg and currently decreasing
if (loadavgs[0] < daily_loadavg && loadavgs[0] < loadavgs[2]) {
dout(20) << "loadavg " << loadavgs[0] << " < daily_loadavg "
<< daily_loadavg << " and < 15m avg " << loadavgs[2] << " = yes"
<< dendl;
return true;
}
dout(20) << "loadavg " << loadavgs[0] << " >= max "
<< conf()->osd_scrub_load_threshold << " and ( >= daily_loadavg "
<< daily_loadavg << " or >= 15m avg " << loadavgs[2] << ") = no"
<< dendl;
return false;
}
// note: called with jobs_lock held
void ScrubQueue::scan_penalized(bool forgive_all, utime_t time_now)
@ -641,51 +416,6 @@ void ScrubQueue::scan_penalized(bool forgive_all, utime_t time_now)
}
}
// checks for half-closed ranges. Modify the (p < till) to '<=' to check for
// closed ranges.
static inline bool isbetween_modulo(int64_t from, int64_t till, int p)
{
// the 1st condition is because we have defined from==till as "always true"
return (till == from) || ((till >= from) ^ (p >= from) ^ (p < till));
}
bool ScrubQueue::scrub_time_permit(utime_t now) const
{
tm bdt;
time_t tt = now.sec();
localtime_r(&tt, &bdt);
bool day_permit = isbetween_modulo(conf()->osd_scrub_begin_week_day,
conf()->osd_scrub_end_week_day,
bdt.tm_wday);
if (!day_permit) {
dout(20) << "should run between week day "
<< conf()->osd_scrub_begin_week_day << " - "
<< conf()->osd_scrub_end_week_day << " now " << bdt.tm_wday
<< " - no" << dendl;
return false;
}
bool time_permit = isbetween_modulo(conf()->osd_scrub_begin_hour,
conf()->osd_scrub_end_hour,
bdt.tm_hour);
dout(20) << "should run between " << conf()->osd_scrub_begin_hour << " - "
<< conf()->osd_scrub_end_hour << " now (" << bdt.tm_hour
<< ") = " << (time_permit ? "yes" : "no") << dendl;
return time_permit;
}
void ScrubQueue::ScrubJob::dump(ceph::Formatter* f) const
{
f->open_object_section("scrub");
f->dump_stream("pgid") << pgid;
f->dump_stream("sched_time") << schedule.scheduled_at;
f->dump_stream("deadline") << schedule.deadline;
f->dump_bool("forced",
schedule.scheduled_at == PgScrubber::scrub_must_stamp());
f->close_section();
}
void ScrubQueue::dump_scrubs(ceph::Formatter* f) const
{
ceph_assert(f != nullptr);
@ -693,20 +423,20 @@ void ScrubQueue::dump_scrubs(ceph::Formatter* f) const
f->open_array_section("scrubs");
std::for_each(to_scrub.cbegin(), to_scrub.cend(), [&f](const ScrubJobRef& j) {
j->dump(f);
});
std::for_each(
to_scrub.cbegin(), to_scrub.cend(),
[&f](const Scrub::ScrubJobRef& j) { j->dump(f); });
std::for_each(penalized.cbegin(),
penalized.cend(),
[&f](const ScrubJobRef& j) { j->dump(f); });
std::for_each(
penalized.cbegin(), penalized.cend(),
[&f](const Scrub::ScrubJobRef& j) { j->dump(f); });
f->close_section();
}
ScrubQueue::ScrubQContainer ScrubQueue::list_registered_jobs() const
ScrubQContainer ScrubQueue::list_registered_jobs() const
{
ScrubQueue::ScrubQContainer all_jobs;
ScrubQContainer all_jobs;
all_jobs.reserve(to_scrub.size() + penalized.size());
dout(20) << " size: " << all_jobs.capacity() << dendl;
@ -725,82 +455,7 @@ ScrubQueue::ScrubQContainer ScrubQueue::list_registered_jobs() const
}
// ////////////////////////////////////////////////////////////////////////// //
// ScrubQueue - scrub resource management
bool ScrubQueue::can_inc_scrubs() const
{
// consider removing the lock here. Caller already handles delayed
// inc_scrubs_local() failures
std::lock_guard lck{resource_lock};
if (scrubs_local + scrubs_remote < conf()->osd_max_scrubs) {
return true;
}
dout(20) << " == false. " << scrubs_local << " local + " << scrubs_remote
<< " remote >= max " << conf()->osd_max_scrubs << dendl;
return false;
}
bool ScrubQueue::inc_scrubs_local()
{
std::lock_guard lck{resource_lock};
if (scrubs_local + scrubs_remote < conf()->osd_max_scrubs) {
++scrubs_local;
return true;
}
dout(20) << ": " << scrubs_local << " local + " << scrubs_remote
<< " remote >= max " << conf()->osd_max_scrubs << dendl;
return false;
}
void ScrubQueue::dec_scrubs_local()
{
std::lock_guard lck{resource_lock};
dout(20) << ": " << scrubs_local << " -> " << (scrubs_local - 1) << " (max "
<< conf()->osd_max_scrubs << ", remote " << scrubs_remote << ")"
<< dendl;
--scrubs_local;
ceph_assert(scrubs_local >= 0);
}
bool ScrubQueue::inc_scrubs_remote()
{
std::lock_guard lck{resource_lock};
if (scrubs_local + scrubs_remote < conf()->osd_max_scrubs) {
dout(20) << ": " << scrubs_remote << " -> " << (scrubs_remote + 1)
<< " (max " << conf()->osd_max_scrubs << ", local "
<< scrubs_local << ")" << dendl;
++scrubs_remote;
return true;
}
dout(20) << ": " << scrubs_local << " local + " << scrubs_remote
<< " remote >= max " << conf()->osd_max_scrubs << dendl;
return false;
}
void ScrubQueue::dec_scrubs_remote()
{
std::lock_guard lck{resource_lock};
dout(20) << ": " << scrubs_remote << " -> " << (scrubs_remote - 1) << " (max "
<< conf()->osd_max_scrubs << ", local " << scrubs_local << ")"
<< dendl;
--scrubs_remote;
ceph_assert(scrubs_remote >= 0);
}
void ScrubQueue::dump_scrub_reservations(ceph::Formatter* f) const
{
std::lock_guard lck{resource_lock};
f->dump_int("scrubs_local", scrubs_local);
f->dump_int("scrubs_remote", scrubs_remote);
f->dump_int("osd_max_scrubs", conf()->osd_max_scrubs);
}
// ScrubQueue - maintaining the 'blocked on a locked object' count
void ScrubQueue::clear_pg_scrub_blocked(spg_t blocked_pg)
{
@ -820,3 +475,22 @@ int ScrubQueue::get_blocked_pgs_count() const
{
return blocked_scrubs_cnt;
}
// ////////////////////////////////////////////////////////////////////////// //
// ScrubQueue - maintaining the 'some PG is reserving' flag
bool ScrubQueue::set_reserving_now()
{
auto was_set = a_pg_is_reserving.exchange(true);
return !was_set;
}
void ScrubQueue::clear_reserving_now()
{
a_pg_is_reserving = false;
}
bool ScrubQueue::is_reserving_now() const
{
return a_pg_is_reserving;
}
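The exchange() in set_reserving_now() makes it a test-and-set: the caller learns whether it won the race. A sketch of the expected usage by a PG about to reserve replica resources (illustrative; the actual call site is in the scrubber state machine, not shown in this diff):

if (!osd_scrub.set_reserving_now()) {
  // some other PG is already mid-reservation; retry on a later tick
  return;
}
// ... send scrub-reserve requests to the replicas ...
osd_scrub.clear_reserving_now();  // once granted (or rejected)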

View File

@ -6,9 +6,13 @@
/*
OSD
OSDService
OSDService
OsdScrub
Ownes & uses the following
ScrubQueue interfaces:
@ -21,9 +25,6 @@
ScrubQueue
@ -107,35 +108,20 @@ ScrubQueue interfaces (main functions):
*/
// clang-format on
#include <atomic>
#include <chrono>
#include <memory>
#include <optional>
#include <vector>
#include "common/RefCountedObj.h"
#include "common/ceph_atomic.h"
#include "osd/osd_types.h"
#include "osd/scrubber_common.h"
#include "include/utime_fmt.h"
#include "osd/osd_types_fmt.h"
#include "utime.h"
class PG;
#include "osd/scrubber/scrub_job.h"
#include "osd/PG.h"
namespace Scrub {
using namespace ::std::literals;
// possible outcome when trying to select a PG and scrub it
/// possible outcome when trying to select a PG and scrub it
enum class schedule_result_t {
scrub_initiated, // successfully started a scrub
none_ready, // no pg to scrub
no_local_resources, // failure to secure local OSD scrub resource
already_started, // failed, as already started scrubbing this pg
no_such_pg, // can't find this pg
bad_pg_state, // pg state (clean, active, etc.)
preconditions // time, configuration, etc.
scrub_initiated, // successfully started a scrub
target_specific_failure, // failed to scrub this specific target
osd_wide_failure // failed to scrub any target
};
// the OSD services provided to the scrub scheduler
@ -144,25 +130,18 @@ class ScrubSchedListener {
virtual int get_nodeid() const = 0; // returns the OSD number ('whoami')
/**
* A callback used by the ScrubQueue object to initiate a scrub on a specific
* PG.
*
* The request might fail for multiple reasons, as ScrubQueue cannot by its
* own check some of the PG-specific preconditions and those are checked here.
* See attempt_t definition.
*
* @return a Scrub::attempt_t detailing either a success, or the failure
* reason.
* locks the named PG, returning an RAII wrapper that unlocks upon
* destruction.
* returns nullopt if failing to lock.
*/
virtual schedule_result_t initiate_a_scrub(
spg_t pgid,
bool allow_requested_repair_only) = 0;
virtual std::optional<PGLockWrapper> get_locked_pg(spg_t pgid) = 0;
virtual ~ScrubSchedListener() {}
};
} // namespace Scrub
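For illustration, a sketch of how a caller might use the new interface; it assumes only what is documented above (PGLockWrapper unlocks on destruction), and the function name is hypothetical:

void examine_pg(Scrub::ScrubSchedListener& osds, spg_t pgid)
{
  std::optional<PGLockWrapper> locked = osds.get_locked_pg(pgid);
  if (!locked) {
    return;  // no such PG, or it could not be locked
  }
  // ... operate on the locked PG through the wrapper ...
}  // 'locked' is destroyed here, unlocking the PG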
/**
* the queue of PGs waiting to be scrubbed.
* Main operations are scheduling/unscheduling a PG to be scrubbed at a certain
@ -175,163 +154,39 @@ class ScrubSchedListener {
*/
class ScrubQueue {
public:
enum class must_scrub_t { not_mandatory, mandatory };
enum class qu_state_t {
not_registered, // not a primary, thus not considered for scrubbing by this
// OSD (also the temporary state when just created)
registered, // in either of the two queues ('to_scrub' or 'penalized')
unregistering // in the process of being unregistered. Will be finalized
// under lock
};
ScrubQueue(CephContext* cct, Scrub::ScrubSchedListener& osds);
virtual ~ScrubQueue() = default;
struct scrub_schedule_t {
utime_t scheduled_at{};
utime_t deadline{0, 0};
};
struct sched_params_t {
utime_t proposed_time{};
double min_interval{0.0};
double max_interval{0.0};
must_scrub_t is_must{ScrubQueue::must_scrub_t::not_mandatory};
};
struct ScrubJob final : public RefCountedObject {
/**
* a time scheduled for scrub, and a deadline: the scrub could be delayed
* if the system load is too high (but not if past the deadline), or if
* trying to scrub outside the configured scrub hours.
*/
scrub_schedule_t schedule;
/// pg to be scrubbed
const spg_t pgid;
/// the OSD id (for the log)
const int whoami;
ceph::atomic<qu_state_t> state{qu_state_t::not_registered};
/**
* the old 'is_registered'. Set whenever the job is registered with the OSD,
* i.e. is in either the 'to_scrub' or the 'penalized' vectors.
*/
std::atomic_bool in_queues{false};
/// last scrub attempt failed to secure replica resources
bool resources_failure{false};
/**
* 'updated' is a temporary flag, used to create a barrier after
* 'sched_time' and 'deadline' (or any other job entry) were modified by
* a different task.
* 'updated' also signals the need to move a job back from the penalized
* queue to the regular one.
*/
std::atomic_bool updated{false};
/**
* the scrubber is waiting for locked objects to be unlocked.
* Set after a grace period has passed.
*/
bool blocked{false};
utime_t blocked_since{};
utime_t penalty_timeout{0, 0};
CephContext* cct;
ScrubJob(CephContext* cct, const spg_t& pg, int node_id);
utime_t get_sched_time() const { return schedule.scheduled_at; }
/**
* relatively low-cost(*) access to the scrub job's state, to be used in
* logging.
* (*) not a low-cost access on x64 architecture
*/
std::string_view state_desc() const
{
return ScrubQueue::qu_state_text(state.load(std::memory_order_relaxed));
}
void update_schedule(const ScrubQueue::scrub_schedule_t& adjusted);
void dump(ceph::Formatter* f) const;
/*
* as the atomic 'in_queues' appears in many log prints, accessing it for
* display only should be made as cheap as possible (relevant on ARM; on
* x86, a '_relaxed' load produces the same code as a sequentially-consistent
* ('_cs') one)
*/
std::string_view registration_state() const
{
return in_queues.load(std::memory_order_relaxed) ? "in-queue"
: "not-queued";
}
/**
* access the 'state' directly, for when a distinction between 'registered'
* and 'unregistering' is needed (both have in_queues() == true)
*/
bool is_state_registered() const { return state == qu_state_t::registered; }
/**
* a text description of the "scheduling intentions" of this PG:
* are we already scheduled for a scrub/deep scrub? when?
*/
std::string scheduling_state(utime_t now_is, bool is_deep_expected) const;
friend std::ostream& operator<<(std::ostream& out, const ScrubJob& pg);
};
friend class TestOSDScrub;
friend class ScrubSchedTestWrapper; ///< unit-tests structure
using ScrubJobRef = ceph::ref_t<ScrubJob>;
using ScrubQContainer = std::vector<ScrubJobRef>;
static std::string_view qu_state_text(qu_state_t st);
using sched_params_t = Scrub::sched_params_t;
/**
* called periodically by the OSD to select the first scrub-eligible PG
* and scrub it.
* returns the list of all scrub targets that are ready to be scrubbed.
* Note that the following changes are expected in the near future (as part
* of the scheduling refactoring):
* - only one target will be requested by the OsdScrub (the OSD's sub-object
* that initiates scrubs);
* - that target will name a specific PG and a scrub type (i.e. a
*   (PG, shallow/deep) pair);
*
* Selection is affected by:
* - time of day: scheduled scrubbing might be configured to only happen
* during certain hours;
* - same for days of the week, and for the system load;
*
* @param preconds: what types of scrub are allowed, given system status &
* config. Some of the preconditions are calculated here.
* @return Scrub::attempt_t::scrubbing if a scrub session was successfully
* initiated. Otherwise - the failure cause.
*
* locking: locks jobs_lock
* @param restrictions: what types of scrub are allowed, given system status
* & config. Some of the preconditions are calculated here.
*/
Scrub::schedule_result_t select_pg_and_scrub(Scrub::ScrubPreconds& preconds);
/**
* Translate attempt_ values into readable text
*/
static std::string_view attempt_res_text(Scrub::schedule_result_t v);
std::vector<ScrubTargetId> ready_to_scrub(
Scrub::OSDRestrictions restrictions, // 4B! copy
utime_t scrub_tick);
/**
* remove the pg from set of PGs to be scanned for scrubbing.
* To be used if we are no longer the PG's primary, or if the PG is removed.
*/
void remove_from_osd_queue(ScrubJobRef sjob);
void remove_from_osd_queue(Scrub::ScrubJobRef sjob);
/**
* @return the list (not std::set!) of all scrub jobs registered
* (apart from PGs in the process of being removed)
*/
ScrubQContainer list_registered_jobs() const;
Scrub::ScrubQContainer list_registered_jobs() const;
/**
* Add the scrub job to the list of jobs (i.e. list of PGs) to be periodically
@ -343,7 +198,7 @@ class ScrubQueue {
*
* locking: might lock jobs_lock
*/
void register_with_osd(ScrubJobRef sjob, const sched_params_t& suggested);
void register_with_osd(Scrub::ScrubJobRef sjob, const sched_params_t& suggested);
/**
* modify a scrub-job's scheduled time and deadline
@ -365,54 +220,41 @@ class ScrubQueue {
*
* locking: not using the jobs_lock
*/
void update_job(ScrubJobRef sjob, const sched_params_t& suggested);
void update_job(Scrub::ScrubJobRef sjob, const sched_params_t& suggested);
sched_params_t determine_scrub_time(const requested_scrub_t& request_flags,
const pg_info_t& pg_info,
const pool_opts_t& pool_conf) const;
std::ostream& gen_prefix(std::ostream& out, std::string_view fn) const;
public:
void dump_scrubs(ceph::Formatter* f) const;
/**
* No new scrub session will start while a scrub is being initiated on a
* PG, i.e. while that PG is trying to acquire replica resources.
*
* \todo replace the atomic bool with a regular bool protected by a
* common OSD-service lock. Or better still - once PR#53263 is merged,
* remove this flag altogether.
*/
void set_reserving_now() { a_pg_is_reserving = true; }
void clear_reserving_now() { a_pg_is_reserving = false; }
bool is_reserving_now() const { return a_pg_is_reserving; }
bool can_inc_scrubs() const;
bool inc_scrubs_local();
void dec_scrubs_local();
bool inc_scrubs_remote();
void dec_scrubs_remote();
void dump_scrub_reservations(ceph::Formatter* f) const;
/**
* set_reserving_now()
* \returns 'false' if the flag was already set
* (which is a possible result of a race between the check in OsdScrub and
* the initiation of a scrub by some other PG)
*/
bool set_reserving_now();
void clear_reserving_now();
bool is_reserving_now() const;
/// counting the number of PGs stuck while scrubbing, waiting for objects
void mark_pg_scrub_blocked(spg_t blocked_pg);
void clear_pg_scrub_blocked(spg_t blocked_pg);
int get_blocked_pgs_count() const;
/**
* scrub_sleep_time
*
* Returns std::chrono::milliseconds indicating how long to wait between
* chunks.
*
* Implementation note: the returned value is either osd_scrub_sleep or
* osd_scrub_extended_sleep, depending on the 'must_scrub' parameter and
* the time of day (see the osd_scrub_begin_* config options)
*/
std::chrono::milliseconds scrub_sleep_time(bool must_scrub) const;
/**
* called every heartbeat to update the "daily" load average
*
* @returns a load value for the logger
*/
[[nodiscard]] std::optional<double> update_load_average();
private:
CephContext* cct;
Scrub::ScrubSchedListener& osd_service;
@ -434,18 +276,16 @@ class ScrubQueue {
*/
mutable ceph::mutex jobs_lock = ceph::make_mutex("ScrubQueue::jobs_lock");
ScrubQContainer to_scrub; ///< scrub jobs (i.e. PGs) to scrub
ScrubQContainer penalized; ///< those that failed to reserve remote resources
Scrub::ScrubQContainer to_scrub; ///< scrub jobs (i.e. PGs) to scrub
Scrub::ScrubQContainer penalized; ///< those that failed to reserve remote resources
bool restore_penalized{false};
double daily_loadavg{0.0};
static inline constexpr auto registered_job = [](const auto& jobref) -> bool {
return jobref->state == qu_state_t::registered;
return jobref->state == Scrub::qu_state_t::registered;
};
static inline constexpr auto invalid_state = [](const auto& jobref) -> bool {
return jobref->state == qu_state_t::not_registered;
return jobref->state == Scrub::qu_state_t::not_registered;
};
/**
@ -457,7 +297,7 @@ class ScrubQueue {
* clear dead entries (unregistered, or belonging to removed PGs) from a
* queue. Job state is changed to match new status.
*/
void rm_unregistered_jobs(ScrubQContainer& group);
void rm_unregistered_jobs(Scrub::ScrubQContainer& group);
/**
* the set of all scrub jobs in 'group' which are ready to be scrubbed
@ -467,17 +307,12 @@ class ScrubQueue {
*
* Note that the returned container holds independent refs to the
* scrub jobs.
* Note also that OSDRestrictions is small (a few bytes), and is thus
* passed by value.
*/
ScrubQContainer collect_ripe_jobs(ScrubQContainer& group, utime_t time_now);
/// scrub resources management lock (guarding scrubs_local & scrubs_remote)
mutable ceph::mutex resource_lock =
ceph::make_mutex("ScrubQueue::resource_lock");
/// the counters used to manage scrub activity parallelism:
int scrubs_local{0};
int scrubs_remote{0};
Scrub::ScrubQContainer collect_ripe_jobs(
Scrub::ScrubQContainer& group,
Scrub::OSDRestrictions restrictions,
utime_t time_now);
/**
* The scrubbing of PGs might be delayed if the scrubbed chunk of objects is
@ -491,11 +326,15 @@ class ScrubQueue {
*/
std::atomic_int_fast16_t blocked_scrubs_cnt{0};
/**
* One of the OSD's primary PGs is in the initial phase of a scrub,
* trying to secure its replicas' resources. We will refrain from initiating
* any other scrub sessions until this one is done.
*
* \todo keep the ID of the reserving PG; possibly also the time it started.
*/
std::atomic_bool a_pg_is_reserving{false};
[[nodiscard]] bool scrub_load_below_threshold() const;
[[nodiscard]] bool scrub_time_permit(utime_t now) const;
/**
* If the scrub job was not explicitly requested, we postpone it by some
* random length of time.
@ -504,8 +343,8 @@ class ScrubQueue {
*
* @return a pair of values: the determined scrub time, and the deadline
*/
scrub_schedule_t adjust_target_time(
const sched_params_t& recomputed_params) const;
Scrub::scrub_schedule_t adjust_target_time(
const Scrub::sched_params_t& recomputed_params) const;
/**
* Look for scrub jobs that have their 'resources_failure' set. These jobs
@ -517,44 +356,9 @@ class ScrubQueue {
*/
void move_failed_pgs(utime_t now_is);
Scrub::schedule_result_t select_from_group(
ScrubQContainer& group,
const Scrub::ScrubPreconds& preconds,
utime_t now_is);
protected: // used by the unit-tests
/**
* unit-tests will override this function to return a mock time
*/
virtual utime_t time_now() const { return ceph_clock_now(); }
};
template <>
struct fmt::formatter<ScrubQueue::qu_state_t>
: fmt::formatter<std::string_view> {
template <typename FormatContext>
auto format(const ScrubQueue::qu_state_t& s, FormatContext& ctx)
{
auto out = ctx.out();
out = fmt::formatter<string_view>::format(
std::string{ScrubQueue::qu_state_text(s)}, ctx);
return out;
}
};
template <>
struct fmt::formatter<ScrubQueue::ScrubJob> {
constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); }
template <typename FormatContext>
auto format(const ScrubQueue::ScrubJob& sjob, FormatContext& ctx)
{
return fmt::format_to(
ctx.out(),
"pg[{}] @ {:s} (dl:{:s}) - <{}> / failure: {} / pen. t.o.: {:s} / queue "
"state: {:.7}",
sjob.pgid, sjob.schedule.scheduled_at, sjob.schedule.deadline,
sjob.registration_state(), sjob.resources_failure, sjob.penalty_timeout,
sjob.state.load(std::memory_order_relaxed));
}
};

View File

@ -1687,9 +1687,9 @@ void PgScrubber::on_replica_reservation_timeout()
}
}
void PgScrubber::set_reserving_now()
bool PgScrubber::set_reserving_now()
{
m_osds->get_scrub_services().set_reserving_now();
return m_osds->get_scrub_services().set_reserving_now();
}
void PgScrubber::clear_reserving_now()
@ -2085,7 +2085,7 @@ pg_scrubbing_status_t PgScrubber::get_schedule() const
false /* is periodic? unknown, actually */};
}
}
if (m_scrub_job->state != ScrubQueue::qu_state_t::registered) {
if (m_scrub_job->state != Scrub::qu_state_t::registered) {
return pg_scrubbing_status_t{utime_t{},
0,
pg_scrub_sched_status_t::not_queued,
@ -2168,9 +2168,8 @@ PgScrubber::PgScrubber(PG* pg)
m_fsm = std::make_unique<ScrubMachine>(m_pg, this);
m_fsm->initiate();
m_scrub_job = ceph::make_ref<ScrubQueue::ScrubJob>(m_osds->cct,
m_pg->pg_id,
m_osds->get_nodeid());
m_scrub_job = ceph::make_ref<Scrub::ScrubJob>(
m_osds->cct, m_pg->pg_id, m_osds->get_nodeid());
}
void PgScrubber::set_scrub_begin_time()
@ -2265,7 +2264,7 @@ void PgScrubber::replica_handling_done()
std::chrono::milliseconds PgScrubber::get_scrub_sleep_time() const
{
return m_osds->get_scrub_services().scrub_sleep_time(
m_flags.required);
ceph_clock_now(), m_flags.required);
}
void PgScrubber::queue_for_scrub_resched(Scrub::scrub_prio_t prio)
@ -2469,7 +2468,7 @@ void ReplicaReservations::release_replica(pg_shard_t peer, epoch_t epoch)
ReplicaReservations::ReplicaReservations(
PG* pg,
pg_shard_t whoami,
ScrubQueue::ScrubJobRef scrubjob,
Scrub::ScrubJobRef scrubjob,
const ConfigProxy& conf)
: m_pg{pg}
, m_acting_set{pg->get_actingset()}

View File

@ -131,7 +131,7 @@ class ReplicaReservations {
bool m_had_rejections{false};
int m_pending{-1};
const pg_info_t& m_pg_info;
ScrubQueue::ScrubJobRef m_scrub_job; ///< a ref to this PG's scrub job
Scrub::ScrubJobRef m_scrub_job; ///< a ref to this PG's scrub job
const ConfigProxy& m_conf;
// detecting slow peers (see 'slow-secondary' above)
@ -161,7 +161,7 @@ class ReplicaReservations {
ReplicaReservations(PG* pg,
pg_shard_t whoami,
ScrubQueue::ScrubJobRef scrubjob,
Scrub::ScrubJobRef scrubjob,
const ConfigProxy& conf);
~ReplicaReservations();
@ -547,7 +547,7 @@ class PgScrubber : public ScrubPgIF,
void reserve_replicas() final;
void set_reserving_now() final;
bool set_reserving_now() final;
void clear_reserving_now() final;
[[nodiscard]] bool was_epoch_changed() const final;
@ -595,7 +595,7 @@ class PgScrubber : public ScrubPgIF,
virtual void _scrub_clear_state() {}
utime_t m_scrub_reg_stamp; ///< stamp we registered for
ScrubQueue::ScrubJobRef m_scrub_job; ///< the scrub-job used by the OSD to
Scrub::ScrubJobRef m_scrub_job; ///< the scrub-job used by the OSD to
///< schedule us
ostream& show(ostream& out) const override;

View File

@ -0,0 +1,108 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#include "./scrub_job.h"
#include "pg_scrubber.h"
using qu_state_t = Scrub::qu_state_t;
using must_scrub_t = Scrub::must_scrub_t;
using ScrubQContainer = Scrub::ScrubQContainer;
using sched_params_t = Scrub::sched_params_t;
using OSDRestrictions = Scrub::OSDRestrictions;
using ScrubJob = Scrub::ScrubJob;
// ////////////////////////////////////////////////////////////////////////// //
// ScrubJob
#define dout_subsys ceph_subsys_osd
#undef dout_context
#define dout_context (cct)
#undef dout_prefix
#define dout_prefix _prefix_fn(_dout, this, __func__)
template <class T>
static std::ostream& _prefix_fn(std::ostream* _dout, T* t, std::string fn = "")
{
return t->gen_prefix(*_dout, fn);
}
ScrubJob::ScrubJob(CephContext* cct, const spg_t& pg, int node_id)
: RefCountedObject{cct}
, pgid{pg}
, whoami{node_id}
, cct{cct}
, log_msg_prefix{fmt::format("osd.{}: scrub-job:pg[{}]:", node_id, pgid)}
{}
// debug usage only
namespace std {
ostream& operator<<(ostream& out, const ScrubJob& sjob)
{
return out << fmt::format("{}", sjob);
}
} // namespace std
void ScrubJob::update_schedule(const Scrub::scrub_schedule_t& adjusted)
{
schedule = adjusted;
penalty_timeout = utime_t(0, 0); // helps with debugging
// 'updated' is changed here while not holding jobs_lock. That's OK, as
// the (atomic) flag will only be cleared by select_pg_and_scrub() after
// scan_penalized() is called and the job was moved to the to_scrub queue.
updated = true;
dout(10) << fmt::format(
"adjusted: {:s} ({})", schedule.scheduled_at,
registration_state())
<< dendl;
}
std::string ScrubJob::scheduling_state(utime_t now_is, bool is_deep_expected)
const
{
// if not in the OSD scheduling queues, not a candidate for scrubbing
if (state != qu_state_t::registered) {
return "no scrub is scheduled";
}
// if the time has passed, we are surely in the queue
// (note that, for now, we do not tell the client if the job is 'penalized')
if (now_is > schedule.scheduled_at) {
// we are never sure that the next scrub will indeed be shallow:
return fmt::format("queued for {}scrub", (is_deep_expected ? "deep " : ""));
}
return fmt::format(
"{}scrub scheduled @ {:s}", (is_deep_expected ? "deep " : ""),
schedule.scheduled_at);
}
std::ostream& ScrubJob::gen_prefix(std::ostream& out, std::string_view fn) const
{
return out << log_msg_prefix << fn << ": ";
}
// clang-format off
std::string_view ScrubJob::qu_state_text(qu_state_t st)
{
switch (st) {
case qu_state_t::not_registered: return "not registered w/ OSD"sv;
case qu_state_t::registered: return "registered"sv;
case qu_state_t::unregistering: return "unregistering"sv;
}
// g++ (unlike clang) requires an extra 'return' here
return "(unknown)"sv;
}
// clang-format on
void ScrubJob::dump(ceph::Formatter* f) const
{
f->open_object_section("scrub");
f->dump_stream("pgid") << pgid;
f->dump_stream("sched_time") << schedule.scheduled_at;
f->dump_stream("deadline") << schedule.deadline;
f->dump_bool("forced",
schedule.scheduled_at == PgScrubber::scrub_must_stamp());
f->close_section();
}
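For illustration, dumped through a JSON formatter the job record would be shaped roughly as follows (values invented; the exact timestamp format is determined by utime_t's stream output):

"scrub": {
    "pgid": "1.0",
    "sched_time": "2023-09-23T10:00:00.000000+0000",
    "deadline": "2023-09-30T10:00:00.000000+0000",
    "forced": false
}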

View File

@ -0,0 +1,181 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#pragma once
#include <atomic>
#include <chrono>
#include <iostream>
#include <memory>
#include <vector>
#include "common/RefCountedObj.h"
#include "common/ceph_atomic.h"
#include "include/utime_fmt.h"
#include "osd/osd_types.h"
#include "osd/osd_types_fmt.h"
#include "osd/scrubber_common.h"
/**
* The ID used to name a candidate to scrub:
* - in this version: a PG is identified by its spg_t
* - in the (near) future: a PG + a scrub type (shallow/deep)
*/
using ScrubTargetId = spg_t;
namespace Scrub {
enum class must_scrub_t { not_mandatory, mandatory };
enum class qu_state_t {
not_registered, // not a primary, thus not considered for scrubbing by this
// OSD (also the temporary state when just created)
registered, // in either of the two queues ('to_scrub' or 'penalized')
unregistering // in the process of being unregistered. Will be finalized
// under lock
};
struct scrub_schedule_t {
utime_t scheduled_at{};
utime_t deadline{0, 0};
};
struct sched_params_t {
utime_t proposed_time{};
double min_interval{0.0};
double max_interval{0.0};
must_scrub_t is_must{must_scrub_t::not_mandatory};
};
class ScrubJob final : public RefCountedObject {
public:
/**
* a time scheduled for scrub, and a deadline: the scrub could be delayed
* if the system load is too high (but not if past the deadline), or if
* trying to scrub outside the configured scrub hours.
*/
scrub_schedule_t schedule;
/// pg to be scrubbed
const spg_t pgid;
/// the OSD id (for the log)
const int whoami;
ceph::atomic<qu_state_t> state{qu_state_t::not_registered};
/**
* the old 'is_registered'. Set whenever the job is registered with the OSD,
* i.e. is in either the 'to_scrub' or the 'penalized' vectors.
*/
std::atomic_bool in_queues{false};
/// last scrub attempt failed to secure replica resources
bool resources_failure{false};
/**
* 'updated' is a temporary flag, used to create a barrier after
* 'sched_time' and 'deadline' (or any other job entry) were modified by
* a different task.
* 'updated' also signals the need to move a job back from the penalized
* queue to the regular one.
*/
std::atomic_bool updated{false};
/**
* the scrubber is waiting for locked objects to be unlocked.
* Set after a grace period has passed.
*/
bool blocked{false};
utime_t blocked_since{};
utime_t penalty_timeout{0, 0};
CephContext* cct;
ScrubJob(CephContext* cct, const spg_t& pg, int node_id);
utime_t get_sched_time() const { return schedule.scheduled_at; }
static std::string_view qu_state_text(qu_state_t st);
/**
* relatively low-cost(*) access to the scrub job's state, to be used in
* logging.
* (*) not a low-cost access on x64 architecture
*/
std::string_view state_desc() const
{
return qu_state_text(state.load(std::memory_order_relaxed));
}
void update_schedule(const scrub_schedule_t& adjusted);
void dump(ceph::Formatter* f) const;
/*
* as the atomic 'in_queues' appears in many log prints, accessing it for
* display only should be made as cheap as possible (relevant on ARM; on
* x86, a '_relaxed' load produces the same code as a sequentially-consistent
* ('_cs') one)
*/
std::string_view registration_state() const
{
return in_queues.load(std::memory_order_relaxed) ? "in-queue"
: "not-queued";
}
/**
* access the 'state' directly, for when a distinction between 'registered'
* and 'unregistering' is needed (both have in_queues() == true)
*/
bool is_state_registered() const { return state == qu_state_t::registered; }
/**
* a text description of the "scheduling intentions" of this PG:
* are we already scheduled for a scrub/deep scrub? when?
*/
std::string scheduling_state(utime_t now_is, bool is_deep_expected) const;
std::ostream& gen_prefix(std::ostream& out, std::string_view fn) const;
const std::string log_msg_prefix;
};
using ScrubJobRef = ceph::ref_t<ScrubJob>;
using ScrubQContainer = std::vector<ScrubJobRef>;
} // namespace Scrub
namespace std {
std::ostream& operator<<(std::ostream& out, const Scrub::ScrubJob& pg);
} // namespace std
namespace fmt {
template <>
struct formatter<Scrub::qu_state_t> : formatter<std::string_view> {
template <typename FormatContext>
auto format(const Scrub::qu_state_t& s, FormatContext& ctx)
{
auto out = ctx.out();
out = fmt::formatter<string_view>::format(
std::string{Scrub::ScrubJob::qu_state_text(s)}, ctx);
return out;
}
};
template <>
struct formatter<Scrub::ScrubJob> {
constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); }
template <typename FormatContext>
auto format(const Scrub::ScrubJob& sjob, FormatContext& ctx)
{
return fmt::format_to(
ctx.out(),
"pg[{}] @ {:s} (dl:{:s}) - <{}> / failure: {} / pen. t.o.: {:s} / "
"queue "
"state: {:.7}",
sjob.pgid, sjob.schedule.scheduled_at, sjob.schedule.deadline,
sjob.registration_state(), sjob.resources_failure, sjob.penalty_timeout,
sjob.state.load(std::memory_order_relaxed));
}
};
} // namespace fmt
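For illustration, a small sketch (the helper name 'describe' is hypothetical) showing how the formatters above let a job and its queue-state enum be embedded directly in formatted messages:

#include <fmt/format.h>
#include "osd/scrubber/scrub_job.h"

std::string describe(const Scrub::ScrubJob& sjob)
{
  // both the job itself and its qu_state_t have formatters defined above
  return fmt::format(
      "job: {} / state: {}", sjob,
      sjob.state.load(std::memory_order_relaxed));
}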

View File

@ -120,7 +120,14 @@ ReservingReplicas::ReservingReplicas(my_context ctx)
// prevent the OSD from starting another scrub while we are trying to secure
// replicas resources
scrbr->set_reserving_now();
if (!scrbr->set_reserving_now()) {
dout(1) << "ReservingReplicas::ReservingReplicas() some other PG is "
"already reserving replicas resources"
<< dendl;
post_event(ReservationFailure{});
return;
}
m_holding_isreserving_flag = true;
scrbr->reserve_replicas();
auto timeout = scrbr->get_cct()->_conf.get_val<
@ -136,7 +143,9 @@ ReservingReplicas::ReservingReplicas(my_context ctx)
ReservingReplicas::~ReservingReplicas()
{
DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases
scrbr->clear_reserving_now();
if (m_holding_isreserving_flag) {
scrbr->clear_reserving_now();
}
}
sc::result ReservingReplicas::react(const ReservationTimeout&)

View File

@ -328,6 +328,9 @@ struct ReservingReplicas : sc::state<ReservingReplicas, ScrubMachine>,
ceph::coarse_real_clock::now();
ScrubMachine::timer_event_token_t m_timeout_token;
/// if true - we must 'clear_reserving_now()' upon exit
bool m_holding_isreserving_flag{false};
sc::result react(const FullReset&);
sc::result react(const ReservationTimeout&);

View File

@ -190,8 +190,11 @@ struct ScrubMachineListener {
* and that PG is trying to acquire replica resources.
* set_reserving_now()/clear_reserving_now() let the OSD scrub-queue know
* we are busy reserving.
*
* set_reserving_now() returns 'false' if there already is a PG in the
* reserving stage of the scrub session.
*/
virtual void set_reserving_now() = 0;
virtual bool set_reserving_now() = 0;
virtual void clear_reserving_now() = 0;
/**

View File

@ -0,0 +1,90 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#include "./scrub_resources.h"
#include <fmt/format.h>
#include "common/debug.h"
#include "include/ceph_assert.h"
using ScrubResources = Scrub::ScrubResources;
ScrubResources::ScrubResources(
log_upwards_t log_access,
const ceph::common::ConfigProxy& config)
: log_upwards{log_access}
, conf{config}
{}
bool ScrubResources::can_inc_scrubs() const
{
std::lock_guard lck{resource_lock};
if (scrubs_local + scrubs_remote < conf->osd_max_scrubs) {
return true;
}
log_upwards(fmt::format(
"{}== false. {} (local) + {} (remote) >= max ({})", __func__,
scrubs_local, scrubs_remote, conf->osd_max_scrubs));
return false;
}
bool ScrubResources::inc_scrubs_local()
{
std::lock_guard lck{resource_lock};
if (scrubs_local + scrubs_remote < conf->osd_max_scrubs) {
++scrubs_local;
return true;
}
log_upwards(fmt::format(
"{}: {} (local) + {} (remote) >= max ({})", __func__, scrubs_local,
scrubs_remote, conf->osd_max_scrubs));
return false;
}
void ScrubResources::dec_scrubs_local()
{
std::lock_guard lck{resource_lock};
log_upwards(fmt::format(
"{}: {} -> {} (max {}, remote {})", __func__, scrubs_local,
(scrubs_local - 1), conf->osd_max_scrubs, scrubs_remote));
--scrubs_local;
ceph_assert(scrubs_local >= 0);
}
bool ScrubResources::inc_scrubs_remote()
{
std::lock_guard lck{resource_lock};
if (scrubs_local + scrubs_remote < conf->osd_max_scrubs) {
log_upwards(fmt::format(
"{}: {} -> {} (max {}, local {})", __func__, scrubs_remote,
(scrubs_remote + 1), conf->osd_max_scrubs, scrubs_local));
++scrubs_remote;
return true;
}
log_upwards(fmt::format(
"{}: {} (local) + {} (remote) >= max ({})", __func__, scrubs_local,
scrubs_remote, conf->osd_max_scrubs));
return false;
}
void ScrubResources::dec_scrubs_remote()
{
std::lock_guard lck{resource_lock};
log_upwards(fmt::format(
"{}: {} -> {} (max {}, local {})", __func__, scrubs_remote,
(scrubs_remote - 1), conf->osd_max_scrubs, scrubs_local));
--scrubs_remote;
ceph_assert(scrubs_remote >= 0);
}
void ScrubResources::dump_scrub_reservations(ceph::Formatter* f) const
{
std::lock_guard lck{resource_lock};
f->dump_int("scrubs_local", scrubs_local);
f->dump_int("scrubs_remote", scrubs_remote);
f->dump_int("osd_max_scrubs", conf->osd_max_scrubs);
}
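For illustration, the expected call pattern around the local counter (the remote counter is used symmetrically when granting replica reservations); the wrapper function is hypothetical:

bool run_primary_scrub(Scrub::ScrubResources& res)
{
  if (!res.inc_scrubs_local()) {
    return false;  // already at osd_max_scrubs; do not start
  }
  // ... perform the scrub session ...
  res.dec_scrubs_local();  // release the slot when the session ends
  return true;
}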

View File

@ -0,0 +1,66 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#pragma once
#include <functional>
#include <string>
#include "common/ceph_mutex.h"
#include "common/config_proxy.h"
#include "common/Formatter.h"
namespace Scrub {
/**
* an interface allowing the ScrubResources to log directly into its
* owner's log. This way, we do not need the full dout() mechanism
* (prefix func, OSD id, etc.)
*/
using log_upwards_t = std::function<void(std::string msg)>;
/**
* The number of concurrent scrub operations performed on an OSD is limited
* by a configuration parameter (osd_max_scrubs). The 'ScrubResources' class
* maintains a count of the scrubs currently performed, both when acting as
* a primary and when acting as a replica, and enforces that limit.
*/
class ScrubResources {
/// the number of concurrent scrubs performed by Primaries on this OSD
int scrubs_local{0};
/// the number of active scrub reservations granted by replicas
int scrubs_remote{0};
mutable ceph::mutex resource_lock =
ceph::make_mutex("ScrubQueue::resource_lock");
log_upwards_t log_upwards; ///< access into the owner's dout()
const ceph::common::ConfigProxy& conf;
public:
explicit ScrubResources(
log_upwards_t log_access,
const ceph::common::ConfigProxy& config);
/**
* \returns true if the number of concurrent scrubs is
* below osd_max_scrubs
*/
bool can_inc_scrubs() const;
/// increments the number of scrubs acting as a Primary
bool inc_scrubs_local();
/// decrements the number of scrubs acting as a Primary
void dec_scrubs_local();
/// increments the number of scrubs acting as a Replica
bool inc_scrubs_remote();
/// decrements the number of scrubs acting as a Replica
void dec_scrubs_remote();
void dump_scrub_reservations(ceph::Formatter* f) const;
};
} // namespace Scrub
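For illustration, a sketch of wiring the logging callback at construction time; the owner would typically forward to its own logging mechanism, but any sink works (std::clog here; the function name is hypothetical):

#include <iostream>

void example(const ceph::common::ConfigProxy& conf)
{
  Scrub::ScrubResources res{
      [](std::string msg) { std::clog << "scrub-resources: " << msg << '\n'; },
      conf};
  if (res.can_inc_scrubs()) {
    // ... safe to try starting another scrub ...
  }
}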

View File

@ -32,6 +32,11 @@ private:
ScrubberPasskey& operator=(const ScrubberPasskey&) = delete;
};
/// randomly returns true with probability equal to the passed parameter
static inline bool random_bool_with_probability(double probability) {
return (ceph::util::generate_random_number<double>(0.0, 1.0) < probability);
}
namespace Scrub {
/// high/low OP priority
@ -42,13 +47,36 @@ enum class scrub_prio_t : bool { low_priority = false, high_priority = true };
using act_token_t = uint32_t;
/// "environment" preconditions affecting which PGs are eligible for scrubbing
struct ScrubPreconds {
struct OSDRestrictions {
bool allow_requested_repair_only{false};
bool load_is_low{true};
bool time_permit{true};
bool only_deadlined{false};
};
} // namespace Scrub
namespace fmt {
template <>
struct formatter<Scrub::OSDRestrictions> {
constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); }
template <typename FormatContext>
auto format(const Scrub::OSDRestrictions& conds, FormatContext& ctx)
{
return fmt::format_to(
ctx.out(),
"overdue-only:{} load:{} time:{} repair-only:{}",
conds.only_deadlined,
conds.load_is_low ? "ok" : "high",
conds.time_permit ? "ok" : "no",
conds.allow_requested_repair_only);
}
};
} // namespace fmt
namespace Scrub {
/// PG services used by the scrubber backend
struct PgScrubBeListener {
virtual ~PgScrubBeListener() = default;

View File

@ -42,8 +42,10 @@ int main(int argc, char** argv)
}
using schedule_result_t = Scrub::schedule_result_t;
using ScrubJobRef = ScrubQueue::ScrubJobRef;
using qu_state_t = ScrubQueue::qu_state_t;
using ScrubJobRef = Scrub::ScrubJobRef;
using qu_state_t = Scrub::qu_state_t;
using scrub_schedule_t = Scrub::scrub_schedule_t;
using ScrubQContainer = Scrub::ScrubQContainer;
/// enabling access into ScrubQueue internals
class ScrubSchedTestWrapper : public ScrubQueue {
@ -60,7 +62,8 @@ class ScrubSchedTestWrapper : public ScrubQueue {
ScrubQContainer collect_ripe_jobs()
{
return ScrubQueue::collect_ripe_jobs(to_scrub, time_now());
return ScrubQueue::collect_ripe_jobs(
to_scrub, Scrub::OSDRestrictions{}, time_now());
}
/**
@ -96,22 +99,17 @@ class FakeOsd : public Scrub::ScrubSchedListener {
int get_nodeid() const final { return m_osd_num; }
schedule_result_t initiate_a_scrub(spg_t pgid,
bool allow_requested_repair_only) final
{
std::ignore = allow_requested_repair_only;
auto res = m_next_response.find(pgid);
if (res == m_next_response.end()) {
return schedule_result_t::no_such_pg;
}
return m_next_response[pgid];
}
void set_initiation_response(spg_t pgid, schedule_result_t result)
{
m_next_response[pgid] = result;
}
std::optional<PGLockWrapper> get_locked_pg(spg_t pgid)
{
std::ignore = pgid;
return std::nullopt;
}
private:
int m_osd_num;
std::map<spg_t, schedule_result_t> m_next_response;
@ -128,7 +126,7 @@ struct sjob_config_t {
std::optional<double> pool_conf_max;
bool is_must;
bool is_need_auto;
ScrubQueue::scrub_schedule_t initial_schedule;
scrub_schedule_t initial_schedule;
};
@ -141,7 +139,7 @@ struct sjob_dynamic_data_t {
pg_info_t mocked_pg_info;
pool_opts_t mocked_pool_opts;
requested_scrub_t request_flags;
ScrubQueue::ScrubJobRef job;
ScrubJobRef job;
};
class TestScrubSched : public ::testing::Test {
@ -197,7 +195,7 @@ class TestScrubSched : public ::testing::Test {
dyn_data.request_flags.need_auto = sjob_data.is_need_auto;
// create the scrub job
dyn_data.job = ceph::make_ref<ScrubQueue::ScrubJob>(g_ceph_context,
dyn_data.job = ceph::make_ref<Scrub::ScrubJob>(g_ceph_context,
sjob_data.spg,
m_osd_num);
m_scrub_jobs.push_back(dyn_data);
@ -252,7 +250,7 @@ class TestScrubSched : public ::testing::Test {
}
void debug_print_jobs(std::string hdr,
const ScrubQueue::ScrubQContainer& jobs)
const ScrubQContainer& jobs)
{
std::cout << fmt::format("{}: time now {}", hdr, m_sched->time_now())
<< std::endl;
@ -287,7 +285,7 @@ std::vector<sjob_config_t> sjob_configs = {
std::nullopt, // max scrub delay in pool config
false, // must-scrub
false, // need-auto
ScrubQueue::scrub_schedule_t{} // initial schedule
scrub_schedule_t{} // initial schedule
},
{spg_t{pg_t{4, 1}},
@ -297,7 +295,7 @@ std::vector<sjob_config_t> sjob_configs = {
std::nullopt,
true,
false,
ScrubQueue::scrub_schedule_t{}},
scrub_schedule_t{}},
{spg_t{pg_t{7, 1}},
true,
@ -306,7 +304,7 @@ std::vector<sjob_config_t> sjob_configs = {
std::nullopt,
false,
false,
ScrubQueue::scrub_schedule_t{}},
scrub_schedule_t{}},
{spg_t{pg_t{5, 1}},
true,
@ -315,7 +313,7 @@ std::vector<sjob_config_t> sjob_configs = {
std::nullopt,
false,
false,
ScrubQueue::scrub_schedule_t{}}};
scrub_schedule_t{}}};
} // anonymous namespace