Merge pull request #40652 from ronen-fr/wip-ronenf-cscrub-class

osd/scrub: modify "classic" OSD scrub state-machine to support Crimson

Reviewed-by: Josh Durgin <jdurgin@redhat.com>
commit 89951a1d20 (Kefu Chai, 2021-06-06 00:06:32 +08:00, committed via GitHub)
13 changed files with 648 additions and 290 deletions
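All of the events added in this PR travel the same path from the scrubber to the state machine. Tracing one of them, SelectedChunkFree, end to end (abridged from the hunks below):

// 1. PgScrubber asks the OSD service to queue the event:
//      m_osds->queue_scrub_chunk_free(m_pg, Scrub::scrub_prio_t::low_priority);
// 2. OSDService wraps it in a scheduler item:
//      queue_scrub_event_msg<PGScrubChunkIsFree>(pg, with_priority);
// 3. when dequeued, PGScrubChunkIsFree::run() calls back into the PG:
//      pg->scrub_send_chunk_free(epoch_queued, handle);
// 4. the PG forwards to the scrubber, which feeds the FSM:
//      forward_scrub_event(&ScrubPgIF::send_chunk_free, queued, "SelectedChunkFree"sv);
//      ...
//      m_fsm->process_event(Scrub::SelectedChunkFree{});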

File: src/osd/OSD.cc

@ -1819,6 +1819,18 @@ void OSDService::queue_scrub_pushes_update(PG* pg, Scrub::scrub_prio_t with_prio
queue_scrub_event_msg<PGScrubPushesUpdate>(pg, with_priority);
}
void OSDService::queue_scrub_chunk_free(PG* pg, Scrub::scrub_prio_t with_priority)
{
// Resulting scrub event: 'SelectedChunkFree'
queue_scrub_event_msg<PGScrubChunkIsFree>(pg, with_priority);
}
void OSDService::queue_scrub_chunk_busy(PG* pg, Scrub::scrub_prio_t with_priority)
{
// Resulting scrub event: 'ChunkIsBusy'
queue_scrub_event_msg<PGScrubChunkIsBusy>(pg, with_priority);
}
void OSDService::queue_scrub_applied_update(PG* pg, Scrub::scrub_prio_t with_priority)
{
queue_scrub_event_msg<PGScrubAppliedUpdate>(pg, with_priority);
@ -1836,18 +1848,42 @@ void OSDService::queue_scrub_digest_update(PG* pg, Scrub::scrub_prio_t with_prio
queue_scrub_event_msg<PGScrubDigestUpdate>(pg, with_priority);
}
void OSDService::queue_scrub_got_local_map(PG* pg, Scrub::scrub_prio_t with_priority)
{
// Resulting scrub event: 'IntLocalMapDone'
queue_scrub_event_msg<PGScrubGotLocalMap>(pg, with_priority);
}
void OSDService::queue_scrub_got_repl_maps(PG* pg, Scrub::scrub_prio_t with_priority)
{
// Resulting scrub event: 'GotReplicas'
queue_scrub_event_msg<PGScrubGotReplMaps>(pg, with_priority);
}
void OSDService::queue_scrub_maps_compared(PG* pg, Scrub::scrub_prio_t with_priority)
{
// Resulting scrub event: 'MapsCompared'
queue_scrub_event_msg<PGScrubMapsCompared>(pg, with_priority);
}
void OSDService::queue_scrub_replica_pushes(PG *pg, Scrub::scrub_prio_t with_priority)
{
// Resulting scrub event: 'ReplicaPushesUpd'
queue_scrub_event_msg<PGScrubReplicaPushes>(pg, with_priority);
}
void OSDService::queue_scrub_is_finished(PG *pg)
{
// Resulting scrub event: 'ScrubFinished'
queue_scrub_event_msg<PGScrubScrubFinished>(pg, Scrub::scrub_prio_t::high_priority);
}
void OSDService::queue_scrub_next_chunk(PG *pg, Scrub::scrub_prio_t with_priority)
{
// Resulting scrub event: 'NextChunk'
queue_scrub_event_msg<PGScrubGetNextChunk>(pg, with_priority);
}
void OSDService::queue_for_pg_delete(spg_t pgid, epoch_t e)
{
dout(10) << __func__ << " on " << pgid << " e " << e << dendl;
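All of the queue_scrub_*() wrappers above funnel through the queue_scrub_event_msg<>() helper, which is not part of this hunk. A sketch of what such a helper plausibly looks like, inferred from the call sites; the exact cost and priority arguments are assumptions:

// sketch only - inferred from the call sites above; not part of this diff
template <class MSG_TYPE>
void OSDService::queue_scrub_event_msg(PG* pg, Scrub::scrub_prio_t with_priority)
{
  const auto epoch = pg->get_osdmap_epoch();
  auto msg = new MSG_TYPE(pg->get_pgid(), epoch);  // e.g. PGScrubChunkIsFree

  // translate the scrub_prio_t hint into an op-queue priority, then enqueue
  enqueue_back(OpSchedulerItem(
    std::unique_ptr<OpSchedulerItem::OpQueueable>(msg), cct->_conf->osd_scrub_cost,
    pg->scrub_requeue_priority(with_priority), ceph_clock_now(), 0, epoch));
}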

File: src/osd/OSD.h

@ -586,9 +586,10 @@ public:
void queue_recovery_context(PG *pg, GenContext<ThreadPool::TPHandle&> *c);
void queue_for_snap_trim(PG *pg);
void queue_for_scrub(PG* pg, Scrub::scrub_prio_t with_priority);
void queue_scrub_after_repair(PG* pg, Scrub::scrub_prio_t with_priority);
/// queue the message (-> event) that all replicas have reserved scrub resources for us
void queue_for_scrub_granted(PG* pg, Scrub::scrub_prio_t with_priority);
/// queue the message (-> event) that some replicas denied our scrub resources request
@ -604,15 +605,37 @@ public:
/// Signals that all pending updates were applied
void queue_scrub_applied_update(PG* pg, Scrub::scrub_prio_t with_priority);
/// Signals that the selected chunk (objects range) is available for scrubbing
void queue_scrub_chunk_free(PG* pg, Scrub::scrub_prio_t with_priority);
/// The selected chunk is blocked by user operations, and cannot be scrubbed now
void queue_scrub_chunk_busy(PG* pg, Scrub::scrub_prio_t with_priority);
/// The blocked object range that was holding up the scrub has been freed
void queue_scrub_unblocking(PG* pg, Scrub::scrub_prio_t with_priority);
/// Signals that all write OPs are done
void queue_scrub_digest_update(PG* pg, Scrub::scrub_prio_t with_priority);
/// Signals that the local (Primary's) scrub map is ready
void queue_scrub_got_local_map(PG* pg, Scrub::scrub_prio_t with_priority);
/// Signals that we (the Primary) got all waited-for scrub-maps from our replicas
void queue_scrub_got_repl_maps(PG* pg, Scrub::scrub_prio_t with_priority);
/// Signals that all chunks were handled
/// Note: always queued with high priority, as it must be acted upon before the
/// next scrub request arrives from the Primary (and the Primary is free
/// to send that request once the replica's map is received).
void queue_scrub_is_finished(PG* pg);
/// Signals that there are more chunks to handle
void queue_scrub_next_chunk(PG* pg, Scrub::scrub_prio_t with_priority);
/// Signals that we have finished comparing the maps for this chunk
/// Note: required, as in Crimson this operation is 'futurized'.
void queue_scrub_maps_compared(PG* pg, Scrub::scrub_prio_t with_priority);
void queue_for_rep_scrub(PG* pg,
Scrub::scrub_prio_t with_high_priority,
unsigned int qu_priority);
@ -620,6 +643,8 @@ public:
/// Signals a change in the number of in-flight recovery writes
void queue_scrub_replica_pushes(PG *pg, Scrub::scrub_prio_t with_priority);
/// (not in Crimson) Queue a SchedReplica event to be sent to the replica, to trigger
/// a re-check of the availability of the scrub map prepared by the backend.
void queue_for_rep_scrub_resched(PG* pg,
Scrub::scrub_prio_t with_high_priority,
unsigned int qu_priority);

File: src/osd/PG.cc

@ -1393,7 +1393,8 @@ bool PG::is_time_for_deep(bool allow_deep_scrub,
bool has_deep_errors,
const requested_scrub_t& planned) const
{
dout(10) << __func__ << ": need_auto?" << planned.need_auto << " allow_deep_scrub? " << allow_deep_scrub << dendl;
dout(10) << __func__ << ": need_auto?" << planned.need_auto << " allow_deep_scrub? "
<< allow_deep_scrub << dendl;
if (!allow_deep_scrub)
return false;
@ -1403,8 +1404,11 @@ bool PG::is_time_for_deep(bool allow_deep_scrub,
return true;
}
if (ceph_clock_now() >= next_deepscrub_interval()) {
dout(20) << __func__ << ": now (" << ceph_clock_now() << ") >= time for deep ("
<< next_deepscrub_interval() << ")" << dendl;
return true;
}
if (has_deep_errors) {
osd->clog->info() << "osd." << osd->whoami << " pg " << info.pgid
@ -1532,6 +1536,7 @@ void PG::reg_next_scrub()
void PG::on_info_history_change()
{
dout(20) << __func__ << dendl;
if (m_scrubber) {
m_scrubber->unreg_next_scrub();
m_scrubber->reg_next_scrub(m_planned_scrub);
@ -1540,7 +1545,9 @@ void PG::on_info_history_change()
void PG::scrub_requested(scrub_level_t scrub_level, scrub_type_t scrub_type)
{
if (m_scrubber) {
m_scrubber->scrub_requested(scrub_level, scrub_type, m_planned_scrub);
}
}
void PG::clear_ready_to_merge() {
@ -2057,15 +2064,15 @@ void PG::repair_object(
recovery_state.force_object_missing(bad_peers, soid, oi.version);
}
void PG::forward_scrub_event(ScrubAPI fn, epoch_t epoch_queued, std::string_view desc)
{
dout(20) << __func__ << " queued at: " << epoch_queued << dendl;
dout(20) << __func__ << ": " << desc << " queued at: " << epoch_queued << dendl;
if (is_active() && m_scrubber) {
((*m_scrubber).*fn)(epoch_queued);
} else {
// pg might be in the process of being deleted
dout(5) << __func__ << " refusing to forward. " << (is_clean() ? "(clean) " : "(not clean) ") <<
(is_active() ? "(active) " : "(not active) ") << dendl;
}
}
@ -2076,103 +2083,13 @@ void PG::replica_scrub(OpRequestRef op, ThreadPool::TPHandle& handle)
m_scrubber->replica_scrub_op(op);
}
void PG::replica_scrub(epoch_t epoch_queued,
                       [[maybe_unused]] ThreadPool::TPHandle& handle)
{
dout(10) << __func__ << " queued at: " << epoch_queued
<< (is_primary() ? " (primary)" : " (replica)") << dendl;
scrub_queued = false;
forward_scrub_event(&ScrubPgIF::send_start_replica, epoch_queued, "StartReplica/nw"sv);
}
}
bool PG::ops_blocked_by_scrub() const

File: src/osd/PG.h

@ -374,26 +374,113 @@ public:
void finish_split_stats(const object_stat_sum_t& stats,
ObjectStore::Transaction &t);
void scrub(epoch_t queued, ThreadPool::TPHandle& handle)
{
// a new scrub
scrub_queued = false;
forward_scrub_event(&ScrubPgIF::initiate_regular_scrub, queued, "StartScrub"sv);
}
/**
* a special version of PG::scrub(), which:
* - is initiated after repair, and
* - is not required to allocate local/remote OSD scrub resources
*/
void recovery_scrub(epoch_t queued, ThreadPool::TPHandle& handle)
{
// a new scrub
scrub_queued = false;
forward_scrub_event(&ScrubPgIF::initiate_scrub_after_repair, queued,
"AfterRepairScrub"sv);
}
void replica_scrub(epoch_t queued, ThreadPool::TPHandle &handle);

void replica_scrub_resched(epoch_t queued, ThreadPool::TPHandle& handle)
{
scrub_queued = false;
forward_scrub_event(&ScrubPgIF::send_sched_replica, queued, "SchedReplica"sv);
}

/// Queues a PGScrubResourcesOK message. Will translate into 'RemotesReserved' FSM event
void scrub_send_resources_granted(epoch_t queued, ThreadPool::TPHandle& handle)
{
forward_scrub_event(&ScrubPgIF::send_remotes_reserved, queued, "RemotesReserved"sv);
}
void scrub_send_resources_denied(epoch_t queued, ThreadPool::TPHandle& handle)
{
forward_scrub_event(&ScrubPgIF::send_reservation_failure, queued,
"ReservationFailure"sv);
}
void scrub_send_scrub_resched(epoch_t queued, ThreadPool::TPHandle& handle)
{
scrub_queued = false;
forward_scrub_event(&ScrubPgIF::send_scrub_resched, queued, "InternalSchedScrub"sv);
}
void scrub_send_pushes_update(epoch_t queued, ThreadPool::TPHandle& handle)
{
forward_scrub_event(&ScrubPgIF::active_pushes_notification, queued,
"ActivePushesUpd"sv);
}
void scrub_send_applied_update(epoch_t queued, ThreadPool::TPHandle& handle)
{
forward_scrub_event(&ScrubPgIF::update_applied_notification, queued,
"UpdatesApplied"sv);
}
void scrub_send_unblocking(epoch_t queued, ThreadPool::TPHandle& handle)
{
forward_scrub_event(&ScrubPgIF::send_scrub_unblock, queued, "Unblocked"sv);
}
void scrub_send_digest_update(epoch_t queued, ThreadPool::TPHandle& handle)
{
forward_scrub_event(&ScrubPgIF::digest_update_notification, queued, "DigestUpdate"sv);
}
void scrub_send_local_map_ready(epoch_t queued, ThreadPool::TPHandle& handle)
{
forward_scrub_event(&ScrubPgIF::send_local_map_done, queued, "IntLocalMapDone"sv);
}
void scrub_send_replmaps_ready(epoch_t queued, ThreadPool::TPHandle& handle)
{
forward_scrub_event(&ScrubPgIF::send_replica_maps_ready, queued, "GotReplicas"sv);
}
void scrub_send_replica_pushes(epoch_t queued, ThreadPool::TPHandle& handle)
{
forward_scrub_event(&ScrubPgIF::send_replica_pushes_upd, queued,
"ReplicaPushesUpd"sv);
}
void scrub_send_maps_compared(epoch_t queued, ThreadPool::TPHandle& handle)
{
forward_scrub_event(&ScrubPgIF::send_maps_compared, queued, "MapsCompared"sv);
}
void scrub_send_get_next_chunk(epoch_t queued, ThreadPool::TPHandle& handle)
{
forward_scrub_event(&ScrubPgIF::send_get_next_chunk, queued, "NextChunk"sv);
}
void scrub_send_scrub_is_finished(epoch_t queued, ThreadPool::TPHandle& handle)
{
forward_scrub_event(&ScrubPgIF::send_scrub_is_finished, queued, "ScrubFinished"sv);
}
void scrub_send_chunk_free(epoch_t queued, ThreadPool::TPHandle& handle)
{
forward_scrub_event(&ScrubPgIF::send_chunk_free, queued, "SelectedChunkFree"sv);
}
void scrub_send_chunk_busy(epoch_t queued, ThreadPool::TPHandle& handle)
{
forward_scrub_event(&ScrubPgIF::send_chunk_busy, queued, "ChunkIsBusy"sv);
}
void reg_next_scrub();
@ -564,7 +651,8 @@ private:
requested_scrub_t& planned) const;
using ScrubAPI = void (ScrubPgIF::*)(epoch_t epoch_queued);
void forward_scrub_event(ScrubAPI fn, epoch_t epoch_queued, std::string_view desc);
public:
virtual void do_request(
@ -711,7 +799,7 @@ protected:
public:
bool dne() { return info.dne(); }
void send_cluster_message(
int osd, MessageRef m, epoch_t epoch, bool share_map_update) override;
protected:
@ -1137,7 +1225,7 @@ protected:
void do_pending_flush();
public:
void prepare_write(
pg_info_t &info,
pg_info_t &last_written_info,
PastIntervals &past_intervals,

File: src/osd/PeeringState.cc

@ -3780,6 +3780,8 @@ std::optional<pg_stat_t> PeeringState::prepare_stats_for_publish(
const object_stat_collection_t &unstable_stats)
{
if (info.stats.stats.sum.num_scrub_errors) {
psdout(10) << __func__ << " inconsistent due to " <<
info.stats.stats.sum.num_scrub_errors << " scrub errors" << dendl;
state_set(PG_STATE_INCONSISTENT);
} else {
state_clear(PG_STATE_INCONSISTENT);

File: src/osd/pg_scrubber.cc

@ -1,7 +1,7 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=2 sw=2 smarttab
#include "pg_scrubber.h"
#include "./pg_scrubber.h" // the '.' notation used to affect clang-format order
#include <iostream>
#include <vector>
@ -279,6 +279,16 @@ void PgScrubber::digest_update_notification(epoch_t epoch_queued)
dout(10) << "scrubber event --<< " << __func__ << dendl;
}
void PgScrubber::send_local_map_done(epoch_t epoch_queued)
{
dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued << dendl;
if (is_message_relevant(epoch_queued)) {
m_fsm->my_states();
m_fsm->process_event(Scrub::IntLocalMapDone{});
}
dout(10) << "scrubber event --<< " << __func__ << dendl;
}
void PgScrubber::send_replica_maps_ready(epoch_t epoch_queued)
{
dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued << dendl;
@ -320,6 +330,70 @@ void PgScrubber::send_reservation_failure(epoch_t epoch_queued)
dout(10) << "scrubber event --<< " << __func__ << dendl;
}
void PgScrubber::send_full_reset(epoch_t epoch_queued)
{
dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued << dendl;
m_fsm->my_states();
m_fsm->process_event(Scrub::FullReset{});
dout(10) << "scrubber event --<< " << __func__ << dendl;
}
void PgScrubber::send_chunk_free(epoch_t epoch_queued)
{
dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued << dendl;
if (check_interval(epoch_queued)) {
m_fsm->my_states();
m_fsm->process_event(Scrub::SelectedChunkFree{});
}
dout(10) << "scrubber event --<< " << __func__ << dendl;
}
void PgScrubber::send_chunk_busy(epoch_t epoch_queued)
{
dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued << dendl;
if (check_interval(epoch_queued)) {
m_fsm->my_states();
m_fsm->process_event(Scrub::ChunkIsBusy{});
}
dout(10) << "scrubber event --<< " << __func__ << dendl;
}
void PgScrubber::send_get_next_chunk(epoch_t epoch_queued)
{
dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued << dendl;
if (is_message_relevant(epoch_queued)) {
m_fsm->my_states();
m_fsm->process_event(Scrub::NextChunk{});
}
dout(10) << "scrubber event --<< " << __func__ << dendl;
}
void PgScrubber::send_scrub_is_finished(epoch_t epoch_queued)
{
dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued << dendl;
// can't check for "active"
m_fsm->my_states();
m_fsm->process_event(Scrub::ScrubFinished{});
dout(10) << "scrubber event --<< " << __func__ << dendl;
}
void PgScrubber::send_maps_compared(epoch_t epoch_queued)
{
dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued << dendl;
m_fsm->my_states();
m_fsm->process_event(Scrub::MapsCompared{});
dout(10) << "scrubber event --<< " << __func__ << dendl;
}
// -----------------
bool PgScrubber::is_reserving() const
{
return m_fsm->is_reserving();
@ -342,7 +416,7 @@ unsigned int PgScrubber::scrub_requeue_priority(Scrub::scrub_prio_t with_priorit
if (with_priority == Scrub::scrub_prio_t::high_priority) {
qu_priority =
std::max(qu_priority, (unsigned int)m_pg->get_cct()->_conf->osd_client_op_priority);
}
return qu_priority;
}
@ -485,17 +559,12 @@ void PgScrubber::set_subset_last_update(eversion_t e)
}
/*
* By:
* - setting tentative range based on conf and divisor
* - requesting a partial list of elements from the backend;
* - handling some head/clones issues
*
* The selected range is set directly into 'm_start' and 'm_end'
*/
bool PgScrubber::select_range()
{
@ -576,6 +645,20 @@ bool PgScrubber::select_range()
return true;
}
void PgScrubber::select_range_n_notify()
{
if (select_range()) {
// the next chunk to handle is not blocked
dout(20) << __func__ << ": selection OK" << dendl;
m_osds->queue_scrub_chunk_free(m_pg, Scrub::scrub_prio_t::low_priority);
} else {
// we will wait for the objects range to become available for scrubbing
dout(10) << __func__ << ": selected chunk is busy" << dendl;
m_osds->queue_scrub_chunk_busy(m_pg, Scrub::scrub_prio_t::low_priority);
}
}
bool PgScrubber::write_blocked_by_scrub(const hobject_t& soid)
{
if (soid < m_start || soid >= m_end) {
@ -621,7 +704,7 @@ bool PgScrubber::range_intersects_scrub(const hobject_t& start, const hobject_t&
*/
void PgScrubber::add_delayed_scheduling()
{
m_end = m_start; // not blocking any range now
milliseconds sleep_time{0ms};
if (m_needs_sleep) {
@ -694,15 +777,13 @@ eversion_t PgScrubber::search_log_for_updates() const
return p->version;
}
void PgScrubber::get_replicas_maps(bool replica_can_preempt)
{
dout(10) << __func__ << " started in epoch/interval: " << m_epoch_start << "/"
<< m_interval_start
<< " pg same_interval_since: " << m_pg->info.history.same_interval_since
<< dendl;
m_primary_scrubmap_pos.reset();
// ask replicas to scan and send maps
@ -711,14 +792,12 @@ bool PgScrubber::get_replicas_maps(bool replica_can_preempt)
if (i == m_pg_whoami)
continue;
m_maps_status.mark_replica_map_request(i);
_request_scrub_map(i, m_subset_last_update, m_start, m_end, m_is_deep,
replica_can_preempt);
}
dout(10) << __func__ << " awaiting" << m_maps_status << dendl;
}
bool PgScrubber::was_epoch_changed() const
@ -922,12 +1001,16 @@ void PgScrubber::_scan_snaps(ScrubMap& smap)
int PgScrubber::build_primary_map_chunk()
{
epoch_t map_building_since = m_pg->get_osdmap_epoch();
dout(20) << __func__ << ": initiated at epoch " << map_building_since << dendl;
auto ret = build_scrub_map_chunk(m_primary_scrubmap, m_primary_scrubmap_pos, m_start,
m_end, m_is_deep);
if (ret == -EINPROGRESS) {
// reschedule another round of asking the backend to collect the scrub data
m_osds->queue_for_scrub_resched(m_pg, Scrub::scrub_prio_t::low_priority);
}
return ret;
}
@ -939,21 +1022,46 @@ int PgScrubber::build_replica_map_chunk()
auto ret = build_scrub_map_chunk(replica_scrubmap, replica_scrubmap_pos, m_start, m_end,
m_is_deep);
switch (ret) {

case -EINPROGRESS:
// must wait for the backend to finish. No external event source.
// (note: previous version used low priority here. Now switched to using the
// priority of the original message)
m_osds->queue_for_rep_scrub_resched(m_pg, m_replica_request_priority,
m_flags.priority);
break;

case 0: {
// finished!
m_cleaned_meta_map.clear_from(m_start);
m_cleaned_meta_map.insert(replica_scrubmap);
auto for_meta_scrub = clean_meta_map();
_scan_snaps(for_meta_scrub);

// the local map has been created. Send it to the primary.
// Note: once the message reaches the Primary, it may ask us for another
// chunk - and we better be done with the current scrub. Thus - the preparation of
// the reply message is separate, and we clear the scrub state before actually
// sending it.
auto reply = prep_replica_map_msg(PreemptionNoted::no_preemption);
replica_handling_done();
dout(15) << __func__ << " chunk map sent " << dendl;
send_replica_map(reply);
} break;

default:
// negative retval: build_scrub_map_chunk() signalled an error
// Pre-Pacific code ignored this option, treating it as a success.
// \todo Add an error flag in the returning message.
dout(1) << "Error! Aborting. ActiveReplica::react(SchedReplica) Ret: " << ret
<< dendl;
replica_handling_done();
// only in debug mode for now:
assert(false && "backend error");
break;
};
return ret;
}
@ -990,8 +1098,8 @@ int PgScrubber::build_scrub_map_chunk(
// scan objects
while (!pos.done()) {
int r = m_pg->get_pgbackend()->be_scan_list(map, pos);
dout(10) << __func__ << " be r " << r << dendl;
if (r == -EINPROGRESS) {
dout(20) << __func__ << " in progress" << dendl;
return r;
@ -1059,6 +1167,7 @@ void PgScrubber::maps_compare_n_cleanup()
m_start = m_end;
run_callbacks();
requeue_waiting();
m_osds->queue_scrub_maps_compared(m_pg, Scrub::scrub_prio_t::low_priority);
}
Scrub::preemption_t& PgScrubber::get_preemptor()
@ -1066,12 +1175,6 @@ Scrub::preemption_t& PgScrubber::get_preemptor()
return preemption_data;
}
/*
* Process note: called for the arriving "give me your map, replica!" request. Unlike
* the original implementation, we do not requeue the Op waiting for
@ -1079,6 +1182,7 @@ void PgScrubber::requeue_replica(Scrub::scrub_prio_t is_high_priority)
*/
void PgScrubber::replica_scrub_op(OpRequestRef op)
{
op->mark_started();
auto msg = op->get_req<MOSDRepScrub>();
dout(10) << __func__ << " pg:" << m_pg->pg_id << " Msg: map_epoch:" << msg->map_epoch
<< " min_epoch:" << msg->min_epoch << " deep?" << msg->deep << dendl;
@ -1202,7 +1306,7 @@ void PgScrubber::scrub_compare_maps()
// Map from object with errors to good peer
map<hobject_t, list<pg_shard_t>> authoritative;
dout(2) << __func__ << m_pg->get_primary() << " has "
dout(2) << __func__ << ": primary (" << m_pg->get_primary() << ") has "
<< m_primary_scrubmap.objects.size() << " items" << dendl;
ss.str("");
@ -1212,7 +1316,6 @@ void PgScrubber::scrub_compare_maps()
maps, master_set, m_is_repair, m_missing, m_inconsistent,
authoritative, missing_digest, m_shallow_errors, m_deep_errors, m_store.get(),
m_pg->info.pgid, m_pg->recovery_state.get_acting(), ss);
if (!ss.str().empty()) {
m_osds->clog->error(ss);
@ -1258,21 +1361,36 @@ void PgScrubber::scrub_compare_maps()
}
}
ScrubMachineListener::MsgAndEpoch PgScrubber::prep_replica_map_msg(
PreemptionNoted was_preempted)
{
dout(10) << __func__ << " min epoch:" << m_replica_min_epoch << dendl;
auto reply =
make_message<MOSDRepScrubMap>(spg_t(m_pg->info.pgid.pgid, m_pg->get_primary().shard),
m_replica_min_epoch, m_pg_whoami);
reply->preempted = (was_preempted == PreemptionNoted::preempted);
::encode(replica_scrubmap, reply->get_data());
return ScrubMachineListener::MsgAndEpoch{reply, m_replica_min_epoch};
}
void PgScrubber::send_replica_map(const MsgAndEpoch& preprepared)
{
m_pg->send_cluster_message(m_pg->get_primary().osd, preprepared.m_msg,
preprepared.m_epoch, false);
}
void PgScrubber::send_preempted_replica()
{
auto reply =
make_message<MOSDRepScrubMap>(spg_t{m_pg->info.pgid.pgid, m_pg->get_primary().shard},
m_replica_min_epoch, m_pg_whoami);
reply->preempted = true;
::encode(replica_scrubmap, reply->get_data()); // must not skip this
m_pg->send_cluster_message(m_pg->get_primary().osd, reply, m_replica_min_epoch, false);
}
/*
@ -1502,7 +1620,8 @@ void PgScrubber::unreserve_replicas()
void PgScrubber::scrub_finish()
{
dout(10) << __func__ << " before flags: " << m_flags
<< " deep_scrub_on_error: " << m_flags.deep_scrub_on_error << dendl;
<< ". repair state: " << (state_test(PG_STATE_REPAIR) ? "repair" : "no-repair")
<< ". deep_scrub_on_error: " << m_flags.deep_scrub_on_error << dendl;
ceph_assert(m_pg->is_locked());
@ -1522,7 +1641,7 @@ void PgScrubber::scrub_finish()
bool do_auto_scrub = false;
// if a regular scrub had errors within the limit, do a deep scrub to auto repair
if (m_flags.deep_scrub_on_error && !m_authoritative.empty() &&
m_authoritative.size() <= m_pg->cct->_conf->osd_scrub_auto_repair_num_errors) {
ceph_assert(!m_is_deep);
do_auto_scrub = true;
@ -1658,28 +1777,31 @@ void PgScrubber::scrub_finish()
}
}
void PgScrubber::on_digest_updates()
{
dout(10) << __func__ << " #pending: " << num_digest_updates_pending << " pending? "
<< num_digest_updates_pending
<< (m_end.is_max() ? " <last chunk> " : " <mid chunk> ") << dendl;
if (num_digest_updates_pending > 0) {
// do nothing for now. We will be called again when new updates arrive
return;
}

// got all updates, and finished with this chunk. Any more?
if (m_end.is_max()) {
scrub_finish();
m_osds->queue_scrub_is_finished(m_pg);
} else {
// go get a new chunk (via "requeue")
preemption_data.reset();
m_osds->queue_scrub_next_chunk(m_pg, m_pg->is_scrub_blocking_ops());
}
}
/*
* note that the flags-set fetched from the PG (m_pg->m_planned_scrub)
* is cleared once scrubbing starts; Some of the values dumped here are
@ -1756,7 +1878,6 @@ PgScrubber::PgScrubber(PG* pg)
, m_pg_whoami{pg->pg_whoami}
, preemption_data{pg}
{
dout(20) << " creating PgScrubber for " << pg->pg_id << " / " << m_pg_whoami << dendl;
m_fsm = std::make_unique<ScrubMachine>(m_pg, this);
m_fsm->initiate();
}
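The recurring theme in this file is visible in on_digest_updates() above: handlers that used to return a "what next" verdict to the FSM now queue the follow-up event through the OSD instead, so the same code also works when the operation completes asynchronously under Crimson. In short:

// classic, pre-PR: the scrubber told the FSM synchronously how to proceed
//   Scrub::FsmNext on_digest_updates();  // goto_notactive / next_chunk / do_discard
//
// after this PR: the scrubber queues the follow-up event itself
//   void on_digest_updates();            // queues ScrubFinished or NextChunk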

File: src/osd/pg_scrubber.h

@ -141,6 +141,7 @@ class MapsCollectionStatus {
} // namespace Scrub
/**
* the scrub operation flags. Primary only.
* Set at scrub start. Checked in multiple locations - mostly
@ -213,6 +214,20 @@ class PgScrubber : public ScrubPgIF, public ScrubMachineListener {
void send_replica_pushes_upd(epoch_t epoch_queued) final;
void send_full_reset(epoch_t epoch_queued) final;
void send_chunk_free(epoch_t epoch_queued) final;
void send_chunk_busy(epoch_t epoch_queued) final;
void send_local_map_done(epoch_t epoch_queued) final;
void send_maps_compared(epoch_t epoch_queued) final;
void send_get_next_chunk(epoch_t epoch_queued) final;
void send_scrub_is_finished(epoch_t epoch_queued) final;
/**
* we allow some number of preemptions of the scrub, which means we do
* not block. Then we start to block. Once we start blocking, we do
@ -318,12 +333,12 @@ class PgScrubber : public ScrubPgIF, public ScrubMachineListener {
scrub_ls_result_t& res_inout) const override
{
return false;
}
// -------------------------------------------------------------------------------------------
// the I/F used by the state-machine (i.e. the implementation of ScrubMachineListener)
void select_range_n_notify() final;
/// walk the log to find the latest update that affects our chunk
eversion_t search_log_for_updates() const final;
@ -335,8 +350,6 @@ class PgScrubber : public ScrubPgIF, public ScrubMachineListener {
int pending_active_pushes() const final { return m_pg->active_pushes; }
void on_init() final;
void on_replica_init() final;
void replica_handling_done() final;
@ -345,18 +358,22 @@ class PgScrubber : public ScrubPgIF, public ScrubMachineListener {
/// (thus can be called from FSM reactions)
void clear_pgscrub_state() final;
/*
* Send an 'InternalSchedScrub' FSM event either immediately, or - if 'm_need_sleep'
* is asserted - after a configuration-dependent timeout.
*/
void add_delayed_scheduling() final;
void get_replicas_maps(bool replica_can_preempt) final;
void on_digest_updates() final;
ScrubMachineListener::MsgAndEpoch
prep_replica_map_msg(Scrub::PreemptionNoted was_preempted) final;
void send_replica_map(const ScrubMachineListener::MsgAndEpoch& preprepared) final;
void send_preempted_replica() final;
void send_remotes_reserved(epoch_t epoch_queued) final;
void send_reservation_failure(epoch_t epoch_queued) final;
@ -479,6 +496,8 @@ class PgScrubber : public ScrubPgIF, public ScrubMachineListener {
*/
[[nodiscard]] bool scrub_process_inconsistent();
void scrub_compare_maps();
bool m_needs_sleep{true}; ///< should we sleep before being rescheduled? always
///< 'true', unless we just got out of a sleep period
@ -602,6 +621,18 @@ private:
*/
void request_rescrubbing(requested_scrub_t& req_flags);
/*
* Select a range of objects to scrub.
*
* By:
* - setting tentative range based on conf and divisor
* - requesting a partial list of elements from the backend;
* - handling some head/clones issues
*
* The selected range is set directly into 'm_start' and 'm_end'
*/
bool select_range();
std::list<Context*> m_callbacks;
/**
@ -645,18 +676,13 @@ private:
ScrubMapBuilder replica_scrubmap_pos;
ScrubMap replica_scrubmap;
/**
* we mark the request priority as it arrived. It influences the queuing priority
* when we wait for local updates
*/
Scrub::scrub_prio_t m_replica_request_priority;
/**
* the 'preemption' "state-machine".
* Note: I was considering an orthogonal sub-machine implementation, but as

File: src/osd/scheduler/OpSchedulerItem.cc

@ -130,6 +130,15 @@ void PGScrubDigestUpdate::run(OSD* osd,
pg->unlock();
}
void PGScrubGotLocalMap::run(OSD* osd,
OSDShard* sdata,
PGRef& pg,
ThreadPool::TPHandle& handle)
{
pg->scrub_send_local_map_ready(epoch_queued, handle);
pg->unlock();
}
void PGScrubGotReplMaps::run(OSD* osd,
OSDShard* sdata,
PGRef& pg,
@ -139,6 +148,15 @@ void PGScrubGotReplMaps::run(OSD* osd,
pg->unlock();
}
void PGScrubMapsCompared::run(OSD* osd,
OSDShard* sdata,
PGRef& pg,
ThreadPool::TPHandle& handle)
{
pg->scrub_send_maps_compared(epoch_queued, handle);
pg->unlock();
}
void PGRepScrub::run(OSD* osd, OSDShard* sdata, PGRef& pg, ThreadPool::TPHandle& handle)
{
pg->replica_scrub(epoch_queued, handle);
@ -163,6 +181,42 @@ void PGScrubReplicaPushes::run([[maybe_unused]] OSD* osd,
pg->unlock();
}
void PGScrubScrubFinished::run([[maybe_unused]] OSD* osd,
OSDShard* sdata,
PGRef& pg,
ThreadPool::TPHandle& handle)
{
pg->scrub_send_scrub_is_finished(epoch_queued, handle);
pg->unlock();
}
void PGScrubGetNextChunk::run([[maybe_unused]] OSD* osd,
OSDShard* sdata,
PGRef& pg,
ThreadPool::TPHandle& handle)
{
pg->scrub_send_get_next_chunk(epoch_queued, handle);
pg->unlock();
}
void PGScrubChunkIsBusy::run([[maybe_unused]] OSD* osd,
OSDShard* sdata,
PGRef& pg,
ThreadPool::TPHandle& handle)
{
pg->scrub_send_chunk_busy(epoch_queued, handle);
pg->unlock();
}
void PGScrubChunkIsFree::run([[maybe_unused]] OSD* osd,
OSDShard* sdata,
PGRef& pg,
ThreadPool::TPHandle& handle)
{
pg->scrub_send_chunk_free(epoch_queued, handle);
pg->unlock();
}
void PGRecovery::run(
OSD *osd,
OSDShard *sdata,

File: src/osd/scheduler/OpSchedulerItem.h

@ -428,6 +428,14 @@ class PGScrubDigestUpdate : public PGScrubItem {
void run(OSD* osd, OSDShard* sdata, PGRef& pg, ThreadPool::TPHandle& handle) final;
};
class PGScrubGotLocalMap : public PGScrubItem {
public:
PGScrubGotLocalMap(spg_t pg, epoch_t epoch_queued)
: PGScrubItem{pg, epoch_queued, "PGScrubGotLocalMap"}
{}
void run(OSD* osd, OSDShard* sdata, PGRef& pg, ThreadPool::TPHandle& handle) final;
};
class PGScrubGotReplMaps : public PGScrubItem {
public:
PGScrubGotReplMaps(spg_t pg, epoch_t epoch_queued)
@ -436,6 +444,14 @@ class PGScrubGotReplMaps : public PGScrubItem {
void run(OSD* osd, OSDShard* sdata, PGRef& pg, ThreadPool::TPHandle& handle) final;
};
class PGScrubMapsCompared : public PGScrubItem {
public:
PGScrubMapsCompared(spg_t pg, epoch_t epoch_queued)
: PGScrubItem{pg, epoch_queued, "PGScrubMapsCompared"}
{}
void run(OSD* osd, OSDShard* sdata, PGRef& pg, ThreadPool::TPHandle& handle) final;
};
class PGRepScrub : public PGScrubItem {
public:
PGRepScrub(spg_t pg, epoch_t epoch_queued) : PGScrubItem{pg, epoch_queued, "PGRepScrub"}
@ -459,6 +475,38 @@ class PGScrubReplicaPushes : public PGScrubItem {
void run(OSD* osd, OSDShard* sdata, PGRef& pg, ThreadPool::TPHandle& handle) final;
};
class PGScrubScrubFinished : public PGScrubItem {
public:
PGScrubScrubFinished(spg_t pg, epoch_t epoch_queued)
: PGScrubItem{pg, epoch_queued, "PGScrubScrubFinished"}
{}
void run(OSD* osd, OSDShard* sdata, PGRef& pg, ThreadPool::TPHandle& handle) final;
};
class PGScrubGetNextChunk : public PGScrubItem {
public:
PGScrubGetNextChunk(spg_t pg, epoch_t epoch_queued)
: PGScrubItem{pg, epoch_queued, "PGScrubGetNextChunk"}
{}
void run(OSD* osd, OSDShard* sdata, PGRef& pg, ThreadPool::TPHandle& handle) final;
};
class PGScrubChunkIsBusy : public PGScrubItem {
public:
PGScrubChunkIsBusy(spg_t pg, epoch_t epoch_queued)
: PGScrubItem{pg, epoch_queued, "PGScrubChunkIsBusy"}
{}
void run(OSD* osd, OSDShard* sdata, PGRef& pg, ThreadPool::TPHandle& handle) final;
};
class PGScrubChunkIsFree : public PGScrubItem {
public:
PGScrubChunkIsFree(spg_t pg, epoch_t epoch_queued)
: PGScrubItem{pg, epoch_queued, "PGScrubChunkIsFree"}
{}
void run(OSD* osd, OSDShard* sdata, PGRef& pg, ThreadPool::TPHandle& handle) final;
};
class PGRecovery : public PGOpQueueable {
epoch_t epoch_queued;
uint64_t reserved_pushes;
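All of the PGScrub* event classes above derive from PGScrubItem, which predates this PR and is not shown here. Its shape can be inferred from the constructor calls; roughly (a sketch, details are assumptions):

// sketch of the pre-existing common base; member names inferred from the
// subclass constructors above, not part of this diff
class PGScrubItem : public PGOpQueueable {
 protected:
  epoch_t epoch_queued;
  std::string_view message_name;

  PGScrubItem(spg_t pg, epoch_t epoch, std::string_view derivative_name)
      : PGOpQueueable{pg}, epoch_queued{epoch}, message_name{derivative_name}
  {}
  // each subclass's run() locks the PG, invokes the matching
  // PG::scrub_send_*() forwarder, and unlocks
};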

File: src/osd/scrub_machine.cc

@ -183,15 +183,10 @@ NewChunk::NewChunk(my_context ctx) : my_base(ctx)
scrbr->get_preemptor().adjust_parameters();
// choose range to work on
// select_range_n_notify() will signal either SelectedChunkFree or
// ChunkIsBusy. If 'busy', we transition to Blocked, and wait for the
// range to become available.
scrbr->select_range_n_notify();
}
sc::result NewChunk::react(const SelectedChunkFree&)
@ -290,7 +285,6 @@ BuildMap::BuildMap(my_context ctx) : my_base(ctx)
} else if (ret < 0) {
dout(10) << "BuildMap::BuildMap() Error! Aborting. Ret: " << ret << dendl;
post_event(InternalError{});
} else {
@ -342,14 +336,29 @@ WaitReplicas::WaitReplicas(my_context ctx) : my_base(ctx)
post_event(GotReplicas{});
}
/**
* note: now that maps_compare_n_cleanup() is "futurized"(*), and we remain in this state
* for a while even after we got all our maps, we must prevent are_all_maps_available()
* (actually - the code after the if()) from being called more than once.
* This is basically a separate state, but it's too transitory and artificial to justify
* the cost of a separate state.
* (*) "futurized" - in Crimson, the call to maps_compare_n_cleanup() returns immediately
* after initiating the process. The actual termination of the maps comparing etc' is
* signalled via an event. As we share the code with "classic" OSD, here too
* maps_compare_n_cleanup() is responsible for signalling the completion of the
* processing.
*/
sc::result WaitReplicas::react(const GotReplicas&)
{
DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases
dout(10) << "WaitReplicas::react(const GotReplicas&)" << dendl;
if (!all_maps_already_called && scrbr->are_all_maps_available()) {
dout(10) << "WaitReplicas::react(const GotReplicas&) got all" << dendl;
all_maps_already_called = true;
// were we preempted?
if (scrbr->get_preemptor().disable_and_test()) { // a test&set
@ -359,8 +368,9 @@ sc::result WaitReplicas::react(const GotReplicas&)
} else {
// maps_compare_n_cleanup() will arrange for MapsCompared event to be sent:
scrbr->maps_compare_n_cleanup();
return discard_event();
}
} else {
return discard_event();
@ -383,24 +393,14 @@ sc::result WaitDigestUpdate::react(const DigestUpdate&)
DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases
dout(10) << "WaitDigestUpdate::react(const DigestUpdate&)" << dendl;
// on_digest_updates() will either:
// - do nothing - if we are still waiting for updates, or
// - finish the scrubbing of the current chunk, and:
//   - send NextChunk, or
//   - send ScrubFinished
scrbr->on_digest_updates();
return discard_event();
}
ScrubMachine::ScrubMachine(PG* pg, ScrubMachineListener* pg_scrub)
@ -468,39 +468,18 @@ sc::result ActiveReplica::react(const SchedReplica&)
if (scrbr->get_preemptor().was_preempted()) {
dout(10) << "replica scrub job preempted" << dendl;
scrbr->send_preempted_replica();
scrbr->replica_handling_done();
return transit<NotActive>();
}
// start or check progress of build_replica_map_chunk()
auto ret_init = scrbr->build_replica_map_chunk();
if (ret_init != -EINPROGRESS) {
return transit<NotActive>();
}
return discard_event();
}
/**

File: src/osd/scrub_machine.h

@ -36,58 +36,79 @@ namespace mpl = ::boost::mpl;
void on_event_creation(std::string_view nm);
void on_event_discard(std::string_view nm);
#define MEV(E) \
struct E : sc::event<E> { \
inline static int actv{0}; \
E() \
{ \
if (!actv++) \
on_event_creation(#E); \
} \
~E() \
{ \
if (!--actv) \
on_event_discard(#E); \
} \
void print(std::ostream* out) const { *out << #E; } \
std::string_view print() const { return #E; } \
};
MEV(RemotesReserved) ///< all replicas have granted our reserve request
MEV(ReservationFailure) ///< a reservation request has failed
MEV(StartScrub) ///< initiate a new scrubbing session (relevant if we are a Primary)
MEV(AfterRepairScrub) ///< initiate a new scrubbing session. Only triggered at Recovery
///< completion.
MEV(Unblocked) ///< triggered when the PG unblocked an object that was marked for
///< scrubbing. Via the PGScrubUnblocked op
MEV(InternalSchedScrub)
MEV(SelectedChunkFree)
MEV(ChunkIsBusy)
MEV(ActivePushesUpd) ///< Update to active_pushes. 'active_pushes' represents recovery
///< that is in-flight to the local ObjectStore
MEV(UpdatesApplied) // external
MEV(InternalAllUpdates) ///< the internal counterpart of UpdatesApplied
MEV(GotReplicas) ///< got a map from a replica
MEV(IntBmPreempted) ///< internal - BuildMap preempted. Required, as detected within the
///< ctor
MEV(InternalError)
MEV(IntLocalMapDone)
MEV(DigestUpdate) ///< external. called upon success of a MODIFY op. See
///< scrub_snapshot_metadata()
MEV(MapsCompared) ///< (Crimson) maps_compare_n_cleanup() transactions are done
MEV(StartReplica) ///< initiating replica scrub.
MEV(StartReplicaNoWait) ///< 'start replica' when there are no pending updates
MEV(SchedReplica)
MEV(ReplicaPushesUpd) ///< Update to active_pushes. 'active_pushes' represents recovery
///< that is in-flight to the local ObjectStore
MEV(FullReset) ///< guarantee that the FSM is in the quiescent state (i.e. NotActive)
MEV(NextChunk) ///< finished handling this chunk. Go get the next one
MEV(ScrubFinished) ///< all chunks handled
struct NotActive; ///< the quiescent state. No active scrubbing.
struct ReservingReplicas; ///< securing scrub resources from replicas' OSDs
@ -163,6 +184,7 @@ struct BuildMap;
struct DrainReplMaps; ///< a problem during BuildMap. Wait for all replicas to report,
///< then restart.
struct WaitReplicas; ///< wait for all replicas to report
struct WaitDigestUpdate;
struct ActiveScrubbing : sc::state<ActiveScrubbing, ScrubMachine, PendingTimer> {
@ -271,15 +293,22 @@ struct WaitReplicas : sc::state<WaitReplicas, ActiveScrubbing> {
explicit WaitReplicas(my_context ctx);
using reactions =
mpl::list<sc::custom_reaction<GotReplicas>, // all replicas are accounted for
sc::transition<MapsCompared, WaitDigestUpdate>,
sc::deferral<DigestUpdate> // might arrive before we've reached WDU
>;
sc::result react(const GotReplicas&);
bool all_maps_already_called{false}; // see comment in react code
};
struct WaitDigestUpdate : sc::state<WaitDigestUpdate, ActiveScrubbing> {
explicit WaitDigestUpdate(my_context ctx);
using reactions = mpl::list<sc::custom_reaction<DigestUpdate>,
sc::transition<NextChunk, PendingTimer>,
sc::transition<ScrubFinished, NotActive>>;
sc::result react(const DigestUpdate&);
};
@ -304,8 +333,9 @@ struct ReplicaWaitUpdates : sc::state<ReplicaWaitUpdates, ScrubMachine> {
struct ActiveReplica : sc::state<ActiveReplica, ScrubMachine> {
explicit ActiveReplica(my_context ctx);
using reactions = mpl::list<sc::custom_reaction<SchedReplica>,
sc::custom_reaction<FullReset>,
sc::transition<ScrubFinished, NotActive>>;
sc::result react(const SchedReplica&);
sc::result react(const FullReset&);
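For readers less familiar with boost::statechart: the reactions lists above mix two kinds of entries. sc::transition<Ev, Next> switches state unconditionally, while sc::custom_reaction<Ev> defers the decision to a react() overload, which may discard the event or transit explicitly. A minimal, self-contained illustration (generic example code, not Ceph's):

#include <boost/statechart/state_machine.hpp>
#include <boost/statechart/simple_state.hpp>
#include <boost/statechart/transition.hpp>
#include <boost/statechart/custom_reaction.hpp>
#include <boost/statechart/event.hpp>
#include <boost/mpl/list.hpp>
#include <iostream>

namespace sc = boost::statechart;
namespace mpl = boost::mpl;

struct GotAll : sc::event<GotAll> {};
struct Done : sc::event<Done> {};

struct Waiting;
struct Machine : sc::state_machine<Machine, Waiting> {};

struct Finished : sc::simple_state<Finished, Machine> {};

struct Waiting : sc::simple_state<Waiting, Machine> {
  using reactions = mpl::list<sc::custom_reaction<GotAll>,     // decided in react()
                              sc::transition<Done, Finished>>; // unconditional
  sc::result react(const GotAll&)
  {
    std::cout << "got all maps; staying put until Done arrives\n";
    return discard_event();  // like WaitReplicas: no transition yet
  }
};

int main()
{
  Machine m;
  m.initiate();
  m.process_event(GotAll{});  // handled by react(); no state change
  m.process_event(Done{});    // Waiting -> Finished via sc::transition
}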

File: src/osd/scrub_machine_lstnr.h

@ -12,16 +12,13 @@
namespace Scrub {
enum class PreemptionNoted { no_preemption, preempted };
/// the interface exposed by the PgScrubber into its internal
/// preemption_data object
struct preemption_t {
virtual ~preemption_t() = default;
[[nodiscard]] virtual bool is_preemptable() const = 0;
@ -47,9 +44,14 @@ struct preemption_t {
struct ScrubMachineListener {
struct MsgAndEpoch {
MessageRef m_msg;
epoch_t m_epoch;
};
virtual ~ScrubMachineListener() = default;
virtual void select_range_n_notify() = 0;
/// walk the log to find the latest update that affects our chunk
virtual eversion_t search_log_for_updates() const = 0;
@ -62,32 +64,48 @@ struct ScrubMachineListener {
virtual int build_replica_map_chunk() = 0;
virtual void scrub_compare_maps() = 0;
virtual void on_init() = 0;
virtual void on_replica_init() = 0;
virtual void replica_handling_done() = 0;
// no virtual void discard_reservation_by_primary() = 0;
/// the version of 'scrub_clear_state()' that does not try to invoke FSM services
/// (thus can be called from FSM reactions)
virtual void clear_pgscrub_state() = 0;
/*
* Send an 'InternalSchedScrub' FSM event either immediately, or - if 'm_need_sleep'
* is asserted - after a configuration-dependent timeout.
*/
virtual void add_delayed_scheduling() = 0;
/**
* Ask all replicas for their scrub maps for the current chunk.
*/
virtual void get_replicas_maps(bool replica_can_preempt) = 0;
virtual void on_digest_updates() = 0;
/**
* Prepare a MOSDRepScrubMap message carrying the requested scrub map
* @param was_preempted - were we preempted?
* @return the message, and the current value of 'm_replica_min_epoch' (which is
* used when sending the message, but will be overwritten before that).
*/
[[nodiscard]] virtual MsgAndEpoch prep_replica_map_msg(
Scrub::PreemptionNoted was_preempted) = 0;
/**
* Send to the primary the pre-prepared message containing the requested map
*/
virtual void send_replica_map(const MsgAndEpoch& preprepared) = 0;
/**
* Let the primary know that we were preempted while trying to build the
* requested map.
*/
virtual void send_preempted_replica() = 0;
[[nodiscard]] virtual bool has_pg_marked_new_updates() const = 0;
@ -102,7 +120,7 @@ struct ScrubMachineListener {
* rep maps are available:
* - the maps are compared
* - the scrub region markers (start_ & end_) are advanced
* - callbacks and ops that were pending are allowed to run
*/
virtual void maps_compare_n_cleanup() = 0;
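The calling sequence intended for the prep/send pair, as exercised by PgScrubber::build_replica_map_chunk() earlier in this commit:

// abridged from build_replica_map_chunk() above:
auto reply = prep_replica_map_msg(PreemptionNoted::no_preemption);
replica_handling_done();  // clear the replica's scrub state first...
send_replica_map(reply);  // ...and only then let the Primary see the map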

File: src/osd/scrubber_common.h

@ -108,7 +108,7 @@ ostream& operator<<(ostream& out, const requested_scrub_t& sf);
*/
struct ScrubPgIF {
virtual ~ScrubPgIF() = default;
friend ostream& operator<<(ostream& out, const ScrubPgIF& s) { return s.show(out); }
@ -138,6 +138,20 @@ struct ScrubPgIF {
virtual void send_sched_replica(epoch_t epoch_queued) = 0;
virtual void send_full_reset(epoch_t epoch_queued) = 0;
virtual void send_chunk_free(epoch_t epoch_queued) = 0;
virtual void send_chunk_busy(epoch_t epoch_queued) = 0;
virtual void send_local_map_done(epoch_t epoch_queued) = 0;
virtual void send_get_next_chunk(epoch_t epoch_queued) = 0;
virtual void send_scrub_is_finished(epoch_t epoch_queued) = 0;
virtual void send_maps_compared(epoch_t epoch_queued) = 0;
// --------------------------------------------------
[[nodiscard]] virtual bool are_callbacks_pending()