crimson/osd: scrub integration with crimson

Signed-off-by: Samuel Just <sjust@redhat.com>
This commit is contained in:
Samuel Just 2023-08-12 00:38:40 +00:00
parent b1455202b1
commit d75ea50d5a
16 changed files with 1302 additions and 31 deletions

View File

@ -11,9 +11,11 @@
#include <seastar/core/future.hh>
#include "crimson/admin/admin_socket.h"
#include "crimson/common/log.h"
#include "crimson/osd/osd.h"
#include "crimson/osd/pg.h"
SET_SUBSYS(osd);
using crimson::osd::OSD;
using crimson::osd::PG;
@ -148,6 +150,43 @@ public:
}
};
template <bool deep>
class ScrubCommand : public PGCommand {
public:
explicit ScrubCommand(crimson::osd::OSD& osd) :
PGCommand{
osd,
deep ? "deep_scrub" : "scrub",
"",
deep ? "deep scrub pg" : "scrub pg"}
{}
seastar::future<tell_result_t>
do_command(Ref<PG> pg,
const cmdmap_t& cmdmap,
std::string_view format,
ceph::bufferlist&&) const final
{
LOG_PREFIX(ScrubCommand::do_command);
DEBUGDPP("deep: {}", *pg, deep);
return PG::interruptor::with_interruption([pg] {
pg->scrubber.handle_scrub_requested(deep);
return PG::interruptor::now();
}, [FNAME, pg](std::exception_ptr ep) {
DEBUGDPP("interrupted with {}", *pg, ep);
}, pg).then([format] {
std::unique_ptr<Formatter> f{
Formatter::create(format, "json-pretty", "json-pretty")
};
f->open_object_section("scrub");
f->dump_bool("deep", deep);
f->dump_stream("stamp") << ceph_clock_now();
f->close_section();
return seastar::make_ready_future<tell_result_t>(std::move(f));
});
}
};
} // namespace crimson::admin::pg
namespace crimson::admin {
@ -164,4 +203,9 @@ make_asok_hook<crimson::admin::pg::QueryCommand>(crimson::osd::OSD& osd);
template std::unique_ptr<AdminSocketHook>
make_asok_hook<crimson::admin::pg::MarkUnfoundLostCommand>(crimson::osd::OSD& osd);
template std::unique_ptr<AdminSocketHook>
make_asok_hook<crimson::admin::pg::ScrubCommand<true>>(crimson::osd::OSD& osd);
template std::unique_ptr<AdminSocketHook>
make_asok_hook<crimson::admin::pg::ScrubCommand<false>>(crimson::osd::OSD& osd);
} // namespace crimson::admin

View File

@ -6,5 +6,7 @@ namespace crimson::admin::pg {
class QueryCommand;
class MarkUnfoundLostCommand;
template <bool deep>
class ScrubCommand;
} // namespace crimson::admin::pg

View File

@ -28,6 +28,7 @@ add_executable(crimson-osd
osd_operations/background_recovery.cc
osd_operations/recovery_subrequest.cc
osd_operations/snaptrim_event.cc
osd_operations/scrub_events.cc
pg_recovery.cc
recovery_backend.cc
replicated_recovery_backend.cc
@ -35,6 +36,7 @@ add_executable(crimson-osd
scheduler/mclock_scheduler.cc
scrub/scrub_machine.cc
scrub/scrub_validator.cc
scrub/pg_scrubber.cc
osdmap_gate.cc
pg_activation_blocker.cc
pg_map.cc

View File

@ -1040,6 +1040,7 @@ std::pair<object_info_t, ObjectContextRef> OpsExecuter::prepare_clone(
void OpsExecuter::apply_stats()
{
pg->get_peering_state().apply_op_stats(get_target(), delta_stats);
pg->scrubber.handle_op_stats(get_target(), delta_stats);
pg->publish_stats_to_osd();
}

View File

@ -54,6 +54,7 @@
#include "crimson/osd/osd_operations/pg_advance_map.h"
#include "crimson/osd/osd_operations/recovery_subrequest.h"
#include "crimson/osd/osd_operations/replicated_request.h"
#include "crimson/osd/osd_operations/scrub_events.h"
#include "crimson/osd/osd_operation_external_tracking.h"
#include "crimson/crush/CrushLocation.h"
@ -644,6 +645,8 @@ seastar::future<> OSD::start_asok_admin()
// PG commands
asok->register_command(make_asok_hook<pg::QueryCommand>(*this));
asok->register_command(make_asok_hook<pg::MarkUnfoundLostCommand>(*this));
asok->register_command(make_asok_hook<pg::ScrubCommand<true>>(*this));
asok->register_command(make_asok_hook<pg::ScrubCommand<false>>(*this));
// ops commands
asok->register_command(
make_asok_hook<DumpInFlightOpsHook>(
@ -819,7 +822,13 @@ OSD::do_ms_dispatch(
case MSG_OSD_REPOPREPLY:
return handle_rep_op_reply(conn, boost::static_pointer_cast<MOSDRepOpReply>(m));
case MSG_OSD_SCRUB2:
return handle_scrub(conn, boost::static_pointer_cast<MOSDScrub2>(m));
return handle_scrub_command(
conn, boost::static_pointer_cast<MOSDScrub2>(m));
case MSG_OSD_REP_SCRUB:
case MSG_OSD_REP_SCRUBMAP:
return handle_scrub_message(
conn,
boost::static_pointer_cast<MOSDFastDispatchOp>(m));
case MSG_OSD_PG_UPDATE_LOG_MISSING:
return handle_update_log_missing(conn, boost::static_pointer_cast<
MOSDPGUpdateLogMissing>(m));
@ -1220,7 +1229,7 @@ seastar::future<> OSD::handle_rep_op_reply(
});
}
seastar::future<> OSD::handle_scrub(
seastar::future<> OSD::handle_scrub_command(
crimson::net::ConnectionRef conn,
Ref<MOSDScrub2> m)
{
@ -1230,17 +1239,22 @@ seastar::future<> OSD::handle_scrub(
}
return seastar::parallel_for_each(std::move(m->scrub_pgs),
[m, conn, this](spg_t pgid) {
pg_shard_t from_shard{static_cast<int>(m->get_source().num()),
pgid.shard};
PeeringState::RequestScrub scrub_request{m->deep, m->repair};
return pg_shard_manager.start_pg_operation<RemotePeeringEvent>(
conn,
from_shard,
pgid,
PGPeeringEvent{m->epoch, m->epoch, scrub_request}).second;
return pg_shard_manager.start_pg_operation<
crimson::osd::ScrubRequested
>(m->deep, conn, m->epoch, pgid).second;
});
}
seastar::future<> OSD::handle_scrub_message(
crimson::net::ConnectionRef conn,
Ref<MOSDFastDispatchOp> m)
{
ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
return pg_shard_manager.start_pg_operation<
crimson::osd::ScrubMessage
>(m, conn, m->get_min_epoch(), m->get_spg()).second;
}
seastar::future<> OSD::handle_mark_me_down(
crimson::net::ConnectionRef conn,
Ref<MOSDMarkMeDown> m)

View File

@ -206,8 +206,10 @@ private:
Ref<MOSDPeeringOp> m);
seastar::future<> handle_recovery_subreq(crimson::net::ConnectionRef conn,
Ref<MOSDFastDispatchOp> m);
seastar::future<> handle_scrub(crimson::net::ConnectionRef conn,
Ref<MOSDScrub2> m);
seastar::future<> handle_scrub_command(crimson::net::ConnectionRef conn,
Ref<MOSDScrub2> m);
seastar::future<> handle_scrub_message(crimson::net::ConnectionRef conn,
Ref<MOSDFastDispatchOp> m);
seastar::future<> handle_mark_me_down(crimson::net::ConnectionRef conn,
Ref<MOSDMarkMeDown> m);

View File

@ -54,6 +54,11 @@ enum class OperationTypeCode {
logmissing_request_reply,
snaptrim_event,
snaptrimobj_subevent,
scrub_requested,
scrub_message,
scrub_find_range,
scrub_reserve_range,
scrub_scan,
last_op
};
@ -71,6 +76,11 @@ static constexpr const char* const OP_NAMES[] = {
"logmissing_request_reply",
"snaptrim_event",
"snaptrimobj_subevent",
"scrub_requested",
"scrub_message",
"scrub_find_range",
"scrub_reserve_range",
"scrub_scan",
};
// prevent the addition of OperationTypeCode-s with no matching OP_NAMES entry:

View File

@ -14,6 +14,7 @@
#include "crimson/osd/osd_operations/snaptrim_event.h"
#include "crimson/osd/pg_activation_blocker.h"
#include "crimson/osd/pg_map.h"
#include "crimson/osd/scrub/pg_scrubber.h"
namespace crimson::osd {
@ -30,6 +31,7 @@ struct LttngBackend
PG_OSDMapGate::OSDMapBlocker::BlockingEvent::Backend,
ClientRequest::PGPipeline::WaitForActive::BlockingEvent::Backend,
PGActivationBlocker::BlockingEvent::Backend,
scrub::PGScrubber::BlockingEvent::Backend,
ClientRequest::PGPipeline::RecoverMissing::BlockingEvent::Backend,
ClientRequest::PGPipeline::GetOBC::BlockingEvent::Backend,
ClientRequest::PGPipeline::Process::BlockingEvent::Backend,
@ -91,6 +93,11 @@ struct LttngBackend
const PGActivationBlocker& blocker) override {
}
void handle(scrub::PGScrubber::BlockingEvent& ev,
const Operation& op,
const scrub::PGScrubber& blocker) override {
}
void handle(ClientRequest::PGPipeline::RecoverMissing::BlockingEvent& ev,
const Operation& op,
const ClientRequest::PGPipeline::RecoverMissing& blocker) override {
@ -136,6 +143,7 @@ struct HistoricBackend
PG_OSDMapGate::OSDMapBlocker::BlockingEvent::Backend,
ClientRequest::PGPipeline::WaitForActive::BlockingEvent::Backend,
PGActivationBlocker::BlockingEvent::Backend,
scrub::PGScrubber::BlockingEvent::Backend,
ClientRequest::PGPipeline::RecoverMissing::BlockingEvent::Backend,
ClientRequest::PGPipeline::GetOBC::BlockingEvent::Backend,
ClientRequest::PGPipeline::Process::BlockingEvent::Backend,
@ -197,6 +205,11 @@ struct HistoricBackend
const PGActivationBlocker& blocker) override {
}
void handle(scrub::PGScrubber::BlockingEvent& ev,
const Operation& op,
const scrub::PGScrubber& blocker) override {
}
void handle(ClientRequest::PGPipeline::RecoverMissing::BlockingEvent& ev,
const Operation& op,
const ClientRequest::PGPipeline::RecoverMissing& blocker) override {

View File

@ -250,21 +250,31 @@ ClientRequest::process_op(
DEBUGDPP("{}.{}: entered get_obc stage, about to wait_scrub",
*pg, *this, this_instance_id);
op_info.set_from_op(&*m, *pg->get_osdmap());
return pg->with_locked_obc(
m->get_hobj(), op_info,
[FNAME, this, pg, this_instance_id, &ihref](
auto head, auto obc) mutable {
DEBUGDPP("{}.{}: got obc {}, entering process stage",
*pg, *this, this_instance_id, obc->obs);
return ihref.enter_stage<interruptor>(
client_pp(*pg).process, *this
).then_interruptible(
[FNAME, this, pg, this_instance_id, obc, &ihref]() mutable {
DEBUGDPP("{}.{}: in process stage, calling do_process",
*pg, *this, this_instance_id);
return ihref.enter_blocker(
*this,
pg->scrubber,
&decltype(pg->scrubber)::wait_scrub,
m->get_hobj()
).then_interruptible(
[FNAME, this, pg, this_instance_id, &ihref]() mutable {
DEBUGDPP("{}.{}: past scrub blocker, getting obc",
*pg, *this, this_instance_id);
return pg->with_locked_obc(
m->get_hobj(), op_info,
[FNAME, this, pg, this_instance_id, &ihref](
auto head, auto obc) mutable {
DEBUGDPP("{}.{}: got obc {}, entering process stage",
*pg, *this, this_instance_id, obc->obs);
return ihref.enter_stage<interruptor>(
client_pp(*pg).process, *this
).then_interruptible(
[FNAME, this, pg, this_instance_id, obc, &ihref]() mutable {
DEBUGDPP("{}.{}: in process stage, calling do_process",
*pg, *this, this_instance_id);
return do_process(ihref, pg, obc, this_instance_id);
});
});
});
});
});
}
});
@ -354,7 +364,7 @@ ClientRequest::do_process(
[FNAME, this, pg, this_instance_id, &ihref](
auto submitted, auto all_completed) mutable {
return submitted.then_interruptible(
[FNAME, this, pg, this_instance_id, &ihref] {
[this, pg, &ihref] {
return ihref.enter_stage<interruptor>(client_pp(*pg).wait_repop, *this);
}).then_interruptible(
[FNAME, this, pg, this_instance_id,
@ -371,12 +381,12 @@ ClientRequest::do_process(
return conn->send(std::move(reply));
});
}, crimson::ct_error::eagain::handle(
[FNAME, this, pg, this_instance_id, &ihref]() mutable {
[this, pg, this_instance_id, &ihref]() mutable {
return process_op(ihref, pg, this_instance_id);
}));
});
}, crimson::ct_error::eagain::handle(
[FNAME, this, pg, this_instance_id, &ihref]() mutable {
[this, pg, this_instance_id, &ihref]() mutable {
return process_op(ihref, pg, this_instance_id);
}));
}

View File

@ -17,6 +17,7 @@
#include "crimson/osd/osd_operations/common/pg_pipeline.h"
#include "crimson/osd/pg_activation_blocker.h"
#include "crimson/osd/pg_map.h"
#include "crimson/osd/scrub/pg_scrubber.h"
#include "crimson/common/type_helpers.h"
#include "crimson/common/utility.h"
#include "messages/MOSDOp.h"
@ -103,6 +104,7 @@ public:
PGPipeline::WaitForActive::BlockingEvent,
PGActivationBlocker::BlockingEvent,
PGPipeline::RecoverMissing::BlockingEvent,
scrub::PGScrubber::BlockingEvent,
PGPipeline::GetOBC::BlockingEvent,
PGPipeline::Process::BlockingEvent,
PGPipeline::WaitRepop::BlockingEvent,

View File

@ -0,0 +1,396 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#include "crimson/common/log.h"
#include "crimson/osd/pg.h"
#include "crimson/osd/osd_connection_priv.h"
#include "messages/MOSDRepScrubMap.h"
#include "scrub_events.h"
SET_SUBSYS(osd);
namespace crimson::osd {
template <class T>
PGPeeringPipeline &RemoteScrubEventBaseT<T>::get_peering_pipeline(PG &pg)
{
return pg.peering_request_pg_pipeline;
}
template <class T>
ConnectionPipeline &RemoteScrubEventBaseT<T>::get_connection_pipeline()
{
return get_osd_priv(conn.get()).peering_request_conn_pipeline;
}
template <class T>
PerShardPipeline &RemoteScrubEventBaseT<T>::get_pershard_pipeline(
ShardServices &shard_services)
{
return shard_services.get_client_request_pipeline();
}
template <class T>
seastar::future<> RemoteScrubEventBaseT<T>::with_pg(
ShardServices &shard_services, Ref<PG> pg)
{
LOG_PREFIX(RemoteEventBaseT::with_pg);
return interruptor::with_interruption([FNAME, this, pg] {
DEBUGDPP("{} pg present", *pg, *that());
return this->template enter_stage<interruptor>(
get_peering_pipeline(*pg).await_map
).then_interruptible([this, pg] {
return this->template with_blocking_event<
PG_OSDMapGate::OSDMapBlocker::BlockingEvent
>([this, pg](auto &&trigger) {
return pg->osdmap_gate.wait_for_map(
std::move(trigger), get_epoch());
});
}).then_interruptible([this, pg](auto) {
return this->template enter_stage<interruptor>(
get_peering_pipeline(*pg).process);
}).then_interruptible([this, pg] {
return handle_event(*pg);
});
}, [FNAME, pg, this](std::exception_ptr ep) {
DEBUGDPP("{} interrupted with {}", *pg, *that(), ep);
}, pg);
}
ScrubRequested::ifut<> ScrubRequested::handle_event(PG &pg)
{
pg.scrubber.handle_scrub_requested(deep);
return seastar::now();
}
ScrubMessage::ifut<> ScrubMessage::handle_event(PG &pg)
{
pg.scrubber.handle_scrub_message(*m);
return seastar::now();
}
template class RemoteScrubEventBaseT<ScrubRequested>;
template class RemoteScrubEventBaseT<ScrubMessage>;
template <typename T>
ScrubAsyncOpT<T>::ScrubAsyncOpT(Ref<PG> pg) : pg(pg) {}
template <typename T>
typename ScrubAsyncOpT<T>::template ifut<> ScrubAsyncOpT<T>::start()
{
LOG_PREFIX(ScrubAsyncOpT::start);
DEBUGDPP("{} starting", *pg, *this);
return run(*pg);
}
ScrubFindRange::ifut<> ScrubFindRange::run(PG &pg)
{
LOG_PREFIX(ScrubFindRange::run);
using crimson::common::local_conf;
return interruptor::make_interruptible(
pg.shard_services.get_store().list_objects(
pg.get_collection_ref(),
ghobject_t(begin, ghobject_t::NO_GEN, pg.get_pgid().shard),
ghobject_t::get_max(),
local_conf().get_val<int64_t>("osd_scrub_chunk_max")
)
).then_interruptible([FNAME, this, &pg](auto ret) {
auto &[_, next] = ret;
// We rely on seeing an entire set of snapshots in a single chunk
auto end = next.hobj.get_max_object_boundary();
DEBUGDPP("got next.hobj: {}, returning begin, end: {}, {}",
pg, next.hobj, begin, end);
pg.scrubber.machine.process_event(
scrub::ScrubContext::request_range_complete_t{begin, end});
});
}
template class ScrubAsyncOpT<ScrubFindRange>;
ScrubReserveRange::ifut<> ScrubReserveRange::run(PG &pg)
{
LOG_PREFIX(ScrubReserveRange::run);
DEBUGDPP("", pg);
return pg.background_process_lock.lock(
).then_interruptible([FNAME, this, &pg] {
DEBUGDPP("pg_background_io_mutex locked", pg);
auto &scrubber = pg.scrubber;
ceph_assert(!scrubber.blocked);
scrubber.blocked = scrub::blocked_range_t{begin, end};
blocked_set = true;
auto& log = pg.peering_state.get_pg_log().get_log().log;
auto p = find_if(
log.crbegin(), log.crend(),
[this](const auto& e) -> bool {
return e.soid >= begin && e.soid < end;
});
if (p == log.crend()) {
return scrubber.machine.process_event(
scrub::ScrubContext::reserve_range_complete_t{eversion_t{}});
} else {
return scrubber.machine.process_event(
scrub::ScrubContext::reserve_range_complete_t{p->version});
}
}).finally([&pg, this] {
if (!blocked_set) {
pg.background_process_lock.unlock();
}
});
}
template class ScrubAsyncOpT<ScrubReserveRange>;
ScrubScan::ifut<> ScrubScan::run(PG &pg)
{
LOG_PREFIX(ScrubScan::start);
// legacy value, unused
ret.valid_through = pg.get_info().last_update;
DEBUGDPP("begin: {}, end: {}", pg, begin, end);
return interruptor::make_interruptible(
pg.shard_services.get_store().list_objects(
pg.get_collection_ref(),
ghobject_t(begin, ghobject_t::NO_GEN, pg.get_pgid().shard),
ghobject_t(end, ghobject_t::NO_GEN, pg.get_pgid().shard),
std::numeric_limits<uint64_t>::max())
).then_interruptible([FNAME, this, &pg](auto &&result) {
DEBUGDPP("listed {} objects", pg, std::get<0>(result).size());
return seastar::do_with(
std::move(std::get<0>(result)),
[this, &pg](auto &objects) {
return interruptor::do_for_each(
objects,
[this, &pg](auto &obj) {
if (obj.is_pgmeta() || obj.hobj.is_temp()) {
return interruptor::now();
} else {
return scan_object(pg, obj);
}
});
});
}).then_interruptible([FNAME, this, &pg] {
if (local) {
DEBUGDPP("complete, submitting local event", pg);
pg.scrubber.handle_event(
scrub::ScrubContext::scan_range_complete_t(
pg.get_pg_whoami(),
std::move(ret)));
return seastar::now();
} else {
DEBUGDPP("complete, sending response to primary", pg);
auto m = crimson::make_message<MOSDRepScrubMap>(
spg_t(pg.get_pgid().pgid, pg.get_primary().shard),
pg.get_osdmap_epoch(),
pg.get_pg_whoami());
encode(ret, m->get_data());
pg.scrubber.handle_event(
scrub::ScrubContext::generate_and_submit_chunk_result_complete_t{});
return pg.shard_services.send_to_osd(
pg.get_primary().osd,
std::move(m),
pg.get_osdmap_epoch());
}
});
}
ScrubScan::ifut<> ScrubScan::scan_object(
PG &pg,
const ghobject_t &obj)
{
LOG_PREFIX(ScrubScan::scan_object);
DEBUGDPP("obj: {}", pg, obj);
auto &entry = ret.objects[obj.hobj];
return interruptor::make_interruptible(
pg.shard_services.get_store().stat(
pg.get_collection_ref(),
obj)
).then_interruptible([FNAME, &pg, &obj, &entry](struct stat obj_stat) {
DEBUGDPP("obj: {}, stat complete, size {}", pg, obj, obj_stat.st_size);
entry.size = obj_stat.st_size;
return pg.shard_services.get_store().get_attrs(
pg.get_collection_ref(),
obj);
}).safe_then_interruptible([FNAME, &pg, &obj, &entry](auto &&attrs) {
DEBUGDPP("obj: {}, got {} attrs", pg, obj, attrs.size());
for (auto &i : attrs) {
i.second.rebuild();
if (i.second.length() == 0) {
entry.attrs[i.first];
} else {
entry.attrs.emplace(i.first, i.second.front());
}
}
}).handle_error_interruptible(
ct_error::all_same_way([FNAME, &pg, &obj, &entry](auto e) {
DEBUGDPP("obj: {} stat error", pg, obj);
entry.stat_error = true;
})
).then_interruptible([FNAME, this, &pg, &obj] {
if (deep) {
DEBUGDPP("obj: {} doing deep scan", pg, obj);
return deep_scan_object(pg, obj);
} else {
return interruptor::now();
}
});
}
struct obj_scrub_progress_t {
// nullopt once complete
std::optional<uint64_t> offset = 0;
ceph::buffer::hash data_hash{std::numeric_limits<uint32_t>::max()};
bool header_done = false;
std::optional<std::string> next_key;
bool keys_done = false;
ceph::buffer::hash omap_hash{std::numeric_limits<uint32_t>::max()};
};
ScrubScan::ifut<> ScrubScan::deep_scan_object(
PG &pg,
const ghobject_t &obj)
{
LOG_PREFIX(ScrubScan::deep_scan_object);
DEBUGDPP("obj: {}", pg, obj);
using crimson::common::local_conf;
auto &entry = ret.objects[obj.hobj];
return interruptor::repeat(
[FNAME, this, progress = obj_scrub_progress_t(),
&obj, &entry, &pg]() mutable
-> interruptible_future<seastar::stop_iteration> {
if (progress.offset) {
DEBUGDPP("op: {}, obj: {}, progress: {} scanning data",
pg, *this, obj, progress);
const auto stride = local_conf().get_val<Option::size_t>(
"osd_deep_scrub_stride");
return pg.shard_services.get_store().read(
pg.get_collection_ref(),
obj,
*(progress.offset),
stride
).safe_then([stride, &progress, &entry](auto bl) {
progress.data_hash << bl;
if (bl.length() < stride) {
progress.offset = std::nullopt;
entry.digest = progress.data_hash.digest();
entry.digest_present = true;
} else {
ceph_assert(stride == bl.length());
*(progress.offset) += stride;
}
}).handle_error(
ct_error::all_same_way([&progress, &entry](auto e) {
entry.read_error = true;
progress.offset = std::nullopt;
})
).then([] {
return interruptor::make_interruptible(
seastar::make_ready_future<seastar::stop_iteration>(
seastar::stop_iteration::no));
});
} else if (!progress.header_done) {
DEBUGDPP("op: {}, obj: {}, progress: {} scanning omap header",
pg, *this, obj, progress);
return pg.shard_services.get_store().omap_get_header(
pg.get_collection_ref(),
obj
).safe_then([&progress](auto bl) {
progress.omap_hash << bl;
}).handle_error(
ct_error::enodata::handle([] {}),
ct_error::all_same_way([&entry](auto e) {
entry.read_error = true;
})
).then([&progress] {
progress.header_done = true;
return interruptor::make_interruptible(
seastar::make_ready_future<seastar::stop_iteration>(
seastar::stop_iteration::no));
});
} else if (!progress.keys_done) {
DEBUGDPP("op: {}, obj: {}, progress: {} scanning omap keys",
pg, *this, obj, progress);
return pg.shard_services.get_store().omap_get_values(
pg.get_collection_ref(),
obj,
progress.next_key
).safe_then([FNAME, this, &obj, &progress, &entry, &pg](auto result) {
const auto &[done, omap] = result;
DEBUGDPP("op: {}, obj: {}, progress: {} got {} keys",
pg, *this, obj, progress, omap.size());
for (const auto &p : omap) {
bufferlist bl;
encode(p.first, bl);
encode(p.second, bl);
progress.omap_hash << bl;
entry.object_omap_keys++;
entry.object_omap_bytes += p.second.length();
}
if (done) {
DEBUGDPP("op: {}, obj: {}, progress: {} omap done",
pg, *this, obj, progress);
progress.keys_done = true;
entry.omap_digest = progress.omap_hash.digest();
entry.omap_digest_present = true;
if ((entry.object_omap_keys >
local_conf().get_val<uint64_t>(
"osd_deep_scrub_large_omap_object_key_threshold")) ||
(entry.object_omap_bytes >
local_conf().get_val<Option::size_t>(
"osd_deep_scrub_large_omap_object_value_sum_threshold"))) {
entry.large_omap_object_found = true;
entry.large_omap_object_key_count = entry.object_omap_keys;
ret.has_large_omap_object_errors = true;
}
} else {
ceph_assert(!omap.empty()); // omap_get_values invariant
DEBUGDPP("op: {}, obj: {}, progress: {} omap not done, next {}",
pg, *this, obj, progress, omap.crbegin()->first);
progress.next_key = omap.crbegin()->first;
}
}).handle_error(
ct_error::all_same_way([FNAME, this, &obj, &progress, &entry, &pg]
(auto e) {
DEBUGDPP("op: {}, obj: {}, progress: {} error reading omap {}",
pg, *this, obj, progress, e);
progress.keys_done = true;
entry.read_error = true;
})
).then([] {
return interruptor::make_interruptible(
seastar::make_ready_future<seastar::stop_iteration>(
seastar::stop_iteration::no));
});
} else {
DEBUGDPP("op: {}, obj: {}, progress: {} done",
pg, *this, obj, progress);
return interruptor::make_interruptible(
seastar::make_ready_future<seastar::stop_iteration>(
seastar::stop_iteration::yes));
}
});
}
template class ScrubAsyncOpT<ScrubScan>;
}
template <>
struct fmt::formatter<crimson::osd::obj_scrub_progress_t> {
constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); }
template <typename FormatContext>
auto format(const crimson::osd::obj_scrub_progress_t &progress,
FormatContext& ctx)
{
return fmt::format_to(
ctx.out(),
"obj_scrub_progress_t(offset: {}, "
"header_done: {}, next_key: {}, keys_done: {})",
progress.offset, progress.header_done,
progress.next_key, progress.keys_done);
}
};

View File

@ -0,0 +1,290 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#pragma once
#include "common/Formatter.h"
#include "crimson/osd/osd_operation.h"
#include "crimson/osd/scrub/pg_scrubber.h"
#include "osd/osd_types.h"
#include "peering_event.h"
namespace crimson::osd {
class PG;
template <typename T>
class RemoteScrubEventBaseT : public PhasedOperationT<T> {
T* that() {
return static_cast<T*>(this);
}
const T* that() const {
return static_cast<const T*>(this);
}
PipelineHandle handle;
crimson::net::ConnectionRef conn;
epoch_t epoch;
spg_t pgid;
protected:
using interruptor = InterruptibleOperation::interruptor;
template <typename U=void>
using ifut = InterruptibleOperation::interruptible_future<U>;
virtual ifut<> handle_event(PG &pg) = 0;
public:
RemoteScrubEventBaseT(
crimson::net::ConnectionRef conn, epoch_t epoch, spg_t pgid)
: conn(conn), epoch(epoch), pgid(pgid) {}
PGPeeringPipeline &get_peering_pipeline(PG &pg);
ConnectionPipeline &get_connection_pipeline();
PerShardPipeline &get_pershard_pipeline(ShardServices &);
crimson::net::Connection &get_connection() {
assert(conn);
return *conn;
};
static constexpr bool can_create() { return false; }
spg_t get_pgid() const {
return pgid;
}
PipelineHandle &get_handle() { return handle; }
epoch_t get_epoch() const { return epoch; }
seastar::future<crimson::net::ConnectionFRef> prepare_remote_submission() {
assert(conn);
return conn.get_foreign(
).then([this](auto f_conn) {
conn.reset();
return f_conn;
});
}
void finish_remote_submission(crimson::net::ConnectionFRef _conn) {
assert(!conn);
conn = make_local_shared_foreign(std::move(_conn));
}
seastar::future<> with_pg(
ShardServices &shard_services, Ref<PG> pg);
std::tuple<
class TrackableOperationT<T>::StartEvent,
ConnectionPipeline::AwaitActive::BlockingEvent,
ConnectionPipeline::AwaitMap::BlockingEvent,
OSD_OSDMapGate::OSDMapBlocker::BlockingEvent,
ConnectionPipeline::GetPGMapping::BlockingEvent,
PerShardPipeline::CreateOrWaitPG::BlockingEvent,
PGMap::PGCreationBlockingEvent,
PGPeeringPipeline::AwaitMap::BlockingEvent,
PG_OSDMapGate::OSDMapBlocker::BlockingEvent,
PGPeeringPipeline::Process::BlockingEvent,
class TrackableOperationT<T>::CompletionEvent
> tracking_events;
virtual ~RemoteScrubEventBaseT() = default;
};
class ScrubRequested final : public RemoteScrubEventBaseT<ScrubRequested> {
bool deep = false;
protected:
ifut<> handle_event(PG &pg) final;
public:
static constexpr OperationTypeCode type = OperationTypeCode::scrub_requested;
template <typename... Args>
ScrubRequested(bool deep, Args&&... base_args)
: RemoteScrubEventBaseT<ScrubRequested>(std::forward<Args>(base_args)...),
deep(deep) {}
void print(std::ostream &out) const final {
out << "(deep=" << deep << ")";
}
void dump_detail(ceph::Formatter *f) const final {
f->dump_bool("deep", deep);
}
};
class ScrubMessage final : public RemoteScrubEventBaseT<ScrubMessage> {
MessageRef m;
protected:
ifut<> handle_event(PG &pg) final;
public:
static constexpr OperationTypeCode type = OperationTypeCode::scrub_message;
template <typename... Args>
ScrubMessage(MessageRef m, Args&&... base_args)
: RemoteScrubEventBaseT<ScrubMessage>(std::forward<Args>(base_args)...),
m(m) {
ceph_assert(scrub::PGScrubber::is_scrub_message(*m));
}
void print(std::ostream &out) const final {
out << "(m=" << *m << ")";
}
void dump_detail(ceph::Formatter *f) const final {
f->dump_stream("m") << *m;
}
};
template <typename T>
class ScrubAsyncOpT : public TrackableOperationT<T> {
Ref<PG> pg;
public:
using interruptor = InterruptibleOperation::interruptor;
template <typename U=void>
using ifut = InterruptibleOperation::interruptible_future<U>;
ScrubAsyncOpT(Ref<PG> pg);
ifut<> start();
virtual ~ScrubAsyncOpT() = default;
protected:
virtual ifut<> run(PG &pg) = 0;
};
class ScrubFindRange : public ScrubAsyncOpT<ScrubFindRange> {
hobject_t begin;
public:
static constexpr OperationTypeCode type = OperationTypeCode::scrub_find_range;
template <typename... Args>
ScrubFindRange(const hobject_t &begin, Args&&... args)
: ScrubAsyncOpT(std::forward<Args>(args)...), begin(begin) {}
void print(std::ostream &out) const final {
out << "(begin=" << begin << ")";
}
void dump_detail(ceph::Formatter *f) const final {
f->dump_stream("begin") << begin;
}
protected:
ifut<> run(PG &pg) final;
};
class ScrubReserveRange : public ScrubAsyncOpT<ScrubReserveRange> {
hobject_t begin;
hobject_t end;
/// see run(), used to unlock background_io_mutex on interval change
bool blocked_set = false;
public:
static constexpr OperationTypeCode type =
OperationTypeCode::scrub_reserve_range;
template <typename... Args>
ScrubReserveRange(const hobject_t &begin, const hobject_t &end, Args&&... args)
: ScrubAsyncOpT(std::forward<Args>(args)...), begin(begin), end(end) {}
void print(std::ostream &out) const final {
out << "(begin=" << begin << ", end=" << end << ")";
}
void dump_detail(ceph::Formatter *f) const final {
f->dump_stream("begin") << begin;
f->dump_stream("end") << end;
}
protected:
ifut<> run(PG &pg) final;
};
class ScrubScan : public ScrubAsyncOpT<ScrubScan> {
/// deep or shallow scrub
const bool deep;
/// true: send event locally, false: send result to primary
const bool local;
/// object range to scan: [begin, end)
const hobject_t begin;
const hobject_t end;
/// result, see local
ScrubMap ret;
ifut<> scan_object(PG &pg, const ghobject_t &obj);
ifut<> deep_scan_object(PG &pg, const ghobject_t &obj);
public:
static constexpr OperationTypeCode type = OperationTypeCode::scrub_scan;
void print(std::ostream &out) const final {
out << "(deep=" << deep
<< ", local=" << local
<< ", begin=" << begin
<< ", end=" << end
<< ")";
}
void dump_detail(ceph::Formatter *f) const final {
f->dump_bool("deep", deep);
f->dump_bool("local", local);
f->dump_stream("begin") << begin;
f->dump_stream("end") << end;
}
ScrubScan(
Ref<PG> pg, bool deep, bool local,
const hobject_t &begin, const hobject_t &end)
: ScrubAsyncOpT(pg), deep(deep), local(local), begin(begin), end(end) {}
protected:
ifut<> run(PG &pg) final;
};
}
namespace crimson {
template <>
struct EventBackendRegistry<osd::ScrubRequested> {
static std::tuple<> get_backends() {
return {};
}
};
template <>
struct EventBackendRegistry<osd::ScrubMessage> {
static std::tuple<> get_backends() {
return {};
}
};
}
#if FMT_VERSION >= 90000
template <> struct fmt::formatter<crimson::osd::ScrubRequested>
: fmt::ostream_formatter {};
template <> struct fmt::formatter<crimson::osd::ScrubMessage>
: fmt::ostream_formatter {};
template <typename T>
struct fmt::formatter<crimson::osd::ScrubAsyncOpT<T>>
: fmt::ostream_formatter {};
template <> struct fmt::formatter<crimson::osd::ScrubFindRange>
: fmt::ostream_formatter {};
template <> struct fmt::formatter<crimson::osd::ScrubReserveRange>
: fmt::ostream_formatter {};
template <> struct fmt::formatter<crimson::osd::ScrubScan>
: fmt::ostream_formatter {};
#endif

View File

@ -125,6 +125,7 @@ PG::PG(
osdmap,
this,
this),
scrubber(*this),
obc_registry{
local_conf()},
obc_loader{
@ -144,6 +145,7 @@ PG::PG(
pgid.shard),
wait_for_active_blocker(this)
{
scrubber.initiate();
peering_state.set_backend_predicates(
new ReadablePredicate(pg_whoami),
new RecoverablePredicate());
@ -331,6 +333,12 @@ void PG::on_activate(interval_set<snapid_t> snaps)
projected_last_update = peering_state.get_info().last_update;
}
void PG::on_replica_activate()
{
logger().debug("{}: {}", *this, __func__);
scrubber.on_replica_activate();
}
void PG::on_activate_complete()
{
wait_for_active_blocker.unblock();
@ -458,7 +466,7 @@ PG::do_delete_work(ceph::os::Transaction &t, ghobject_t _next)
Context *PG::on_clean()
{
// Not needed yet (will be needed for IO unblocking)
scrubber.on_primary_active_clean();
return nullptr;
}
@ -1347,6 +1355,9 @@ void PG::log_operation(
if (!is_primary()) { // && !is_ec_pg()
replica_clear_repop_obc(logv);
}
if (!logv.empty()) {
scrubber.on_log_update(logv.rbegin()->version);
}
peering_state.append_log(std::move(logv),
trim_to,
roll_forward_to,
@ -1527,6 +1538,7 @@ void PG::on_change(ceph::os::Transaction &t) {
logger().debug("{} {}: dropping requests", *this, __func__);
client_request_orderer.clear_and_cancel(*this);
}
scrubber.on_interval_change();
}
void PG::context_registry_on_change() {

View File

@ -39,6 +39,7 @@
#include "crimson/osd/pg_recovery_listener.h"
#include "crimson/osd/recovery_backend.h"
#include "crimson/osd/object_context_loader.h"
#include "crimson/osd/scrub/pg_scrubber.h"
class MQuery;
class OSDMap;
@ -160,8 +161,6 @@ public:
bool need_write_epoch,
ceph::os::Transaction &t) final;
void scrub_requested(scrub_level_t scrub_level, scrub_type_t scrub_type) final;
uint64_t get_snap_trimq_size() const final {
return std::size(snap_trimq);
}
@ -318,6 +317,7 @@ public:
}
void on_change(ceph::os::Transaction &t) final;
void on_activate(interval_set<snapid_t> to_trim) final;
void on_replica_activate() final;
void on_activate_complete() final;
void on_new_interval() final {
// Not needed yet
@ -627,6 +627,18 @@ private:
eversion_t projected_last_update;
public:
// scrub state
friend class ScrubScan;
friend class ScrubFindRange;
friend class ScrubReserveRange;
friend class scrub::PGScrubber;
template <typename T> friend class RemoteScrubEventBaseT;
scrub::PGScrubber scrubber;
void scrub_requested(scrub_level_t scrub_level, scrub_type_t scrub_type) final;
ObjectContextRegistry obc_registry;
ObjectContextLoader obc_loader;

View File

@ -0,0 +1,309 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
// vim: ts=8 sw=2 smarttab expandtab
#include <fmt/ranges.h>
#include "crimson/common/log.h"
#include "crimson/osd/pg.h"
#include "crimson/osd/osd_operations/scrub_events.h"
#include "messages/MOSDRepScrub.h"
#include "messages/MOSDRepScrubMap.h"
#include "pg_scrubber.h"
SET_SUBSYS(osd);
namespace crimson::osd::scrub {
void PGScrubber::dump_detail(Formatter *f) const
{
f->dump_stream("pgid") << pg.get_pgid();
}
PGScrubber::PGScrubber(PG &pg) : pg(pg), dpp(pg), machine(*this) {}
void PGScrubber::on_primary_active_clean()
{
LOG_PREFIX(PGScrubber::on_primary_active_clean);
DEBUGDPP("", pg);
handle_event(events::primary_activate_t{});
}
void PGScrubber::on_replica_activate()
{
LOG_PREFIX(PGScrubber::on_replica_activate);
DEBUGDPP("", pg);
handle_event(events::replica_activate_t{});
}
void PGScrubber::on_interval_change()
{
LOG_PREFIX(PGScrubber::on_interval_change);
DEBUGDPP("", pg);
/* Once reservations and scheduling are introduced, we'll need an
* IntervalChange event to drop remote resources (they'll be automatically
* released on the other side) */
handle_event(events::reset_t{});
waiting_for_update = std::nullopt;
ceph_assert(!blocked);
}
void PGScrubber::on_log_update(eversion_t v)
{
LOG_PREFIX(PGScrubber::on_interval_change);
if (waiting_for_update && v >= *waiting_for_update) {
DEBUGDPP("waiting_for_update: {}, v: {}", pg, *waiting_for_update, v);
handle_event(await_update_complete_t{});
waiting_for_update = std::nullopt;
}
}
void PGScrubber::handle_scrub_requested(bool deep)
{
LOG_PREFIX(PGScrubber::handle_scrub_requested);
DEBUGDPP("deep: {}", pg, deep);
handle_event(events::start_scrub_t{deep});
}
void PGScrubber::handle_scrub_message(Message &_m)
{
LOG_PREFIX(PGScrubber::handle_scrub_requested);
switch (_m.get_type()) {
case MSG_OSD_REP_SCRUB: {
MOSDRepScrub &m = *static_cast<MOSDRepScrub*>(&_m);
DEBUGDPP("MOSDRepScrub: {}", pg, m);
handle_event(events::replica_scan_t{
m.start, m.end, m.scrub_from, m.deep
});
break;
}
case MSG_OSD_REP_SCRUBMAP: {
MOSDRepScrubMap &m = *static_cast<MOSDRepScrubMap*>(&_m);
DEBUGDPP("MOSDRepScrubMap: {}", pg, m);
ScrubMap map;
auto iter = m.get_data().cbegin();
::decode(map, iter);
handle_event(scan_range_complete_t{
m.from, std::move(map)
});
break;
}
default:
DEBUGDPP("invalid message: {}", pg, _m);
ceph_assert(is_scrub_message(_m));
}
}
void PGScrubber::handle_op_stats(
const hobject_t &on_object,
object_stat_sum_t delta_stats) {
handle_event(events::op_stats_t{on_object, delta_stats});
}
PGScrubber::ifut<> PGScrubber::wait_scrub(
PGScrubber::BlockingEvent::TriggerI&& trigger,
const hobject_t &hoid)
{
LOG_PREFIX(PGScrubber::wait_scrub);
if (blocked && (hoid >= blocked->begin) && (hoid < blocked->end)) {
DEBUGDPP("blocked: {}, hoid: {}", pg, *blocked, hoid);
return trigger.maybe_record_blocking(
blocked->p.get_shared_future(),
*this);
} else {
return seastar::now();
}
}
void PGScrubber::notify_scrub_start(bool deep)
{
LOG_PREFIX(PGScrubber::notify_scrub_start);
DEBUGDPP("deep: {}", pg, deep);
pg.peering_state.state_set(PG_STATE_SCRUBBING);
if (deep) {
pg.peering_state.state_set(PG_STATE_DEEP_SCRUB);
}
pg.publish_stats_to_osd();
}
void PGScrubber::notify_scrub_end(bool deep)
{
LOG_PREFIX(PGScrubber::notify_scrub_end);
DEBUGDPP("deep: {}", pg, deep);
pg.peering_state.state_clear(PG_STATE_SCRUBBING);
if (deep) {
pg.peering_state.state_clear(PG_STATE_DEEP_SCRUB);
}
pg.publish_stats_to_osd();
}
const std::set<pg_shard_t> &PGScrubber::get_ids_to_scrub() const
{
return pg.peering_state.get_actingset();
}
chunk_validation_policy_t PGScrubber::get_policy() const
{
return chunk_validation_policy_t{
pg.get_primary(),
std::nullopt /* stripe_info, populate when EC is implemented */,
crimson::common::local_conf().get_val<Option::size_t>(
"osd_max_object_size"),
crimson::common::local_conf().get_val<std::string>(
"osd_hit_set_namespace"),
crimson::common::local_conf().get_val<Option::size_t>(
"osd_deep_scrub_large_omap_object_value_sum_threshold"),
crimson::common::local_conf().get_val<uint64_t>(
"osd_deep_scrub_large_omap_object_key_threshold")
};
}
void PGScrubber::request_range(const hobject_t &start)
{
LOG_PREFIX(PGScrubber::request_range);
DEBUGDPP("start: {}", pg, start);
std::ignore = pg.shard_services.start_operation_may_interrupt<
interruptor, ScrubFindRange
>(start, &pg);
}
/* TODO: This isn't actually enough. Here, classic would
* hold the pg lock from the wait_scrub through to IO submission.
* ClientRequest, however, isn't in the processing ExclusivePhase
* bit yet, and so this check may miss ops between the wait_scrub
* check and adding the IO to the log. */
void PGScrubber::reserve_range(const hobject_t &start, const hobject_t &end)
{
LOG_PREFIX(PGScrubber::reserve_range);
DEBUGDPP("start: {}, end: {}", pg, start, end);
std::ignore = pg.shard_services.start_operation_may_interrupt<
interruptor, ScrubReserveRange
>(start, end, &pg);
}
void PGScrubber::release_range()
{
LOG_PREFIX(PGScrubber::release_range);
ceph_assert(blocked);
DEBUGDPP("blocked: {}", pg, *blocked);
pg.background_process_lock.unlock();
blocked->p.set_value();
blocked = std::nullopt;
}
void PGScrubber::scan_range(
pg_shard_t target,
eversion_t version,
bool deep,
const hobject_t &start,
const hobject_t &end)
{
LOG_PREFIX(PGScrubber::scan_range);
DEBUGDPP("target: {}, version: {}, deep: {}, start: {}, end: {}",
pg, target, version, deep, start, end);
if (target == pg.get_pg_whoami()) {
std::ignore = pg.shard_services.start_operation_may_interrupt<
interruptor, ScrubScan
>(&pg, deep, true /* local */, start, end);
} else {
std::ignore = pg.shard_services.send_to_osd(
target.osd,
crimson::make_message<MOSDRepScrub>(
spg_t(pg.get_pgid().pgid, target.shard),
version,
pg.get_osdmap_epoch(),
pg.get_osdmap_epoch(),
start,
end,
deep,
false /* allow preemption -- irrelevant for replicas TODO */,
64 /* priority, TODO */,
false /* high_priority TODO */),
pg.get_osdmap_epoch());
}
}
bool PGScrubber::await_update(const eversion_t &version)
{
LOG_PREFIX(PGScrubber::await_update);
DEBUGDPP("version: {}", pg, version);
ceph_assert(!waiting_for_update);
auto& log = pg.peering_state.get_pg_log().get_log().log;
eversion_t current = log.empty() ? eversion_t() : log.rbegin()->version;
if (version <= current) {
return true;
} else {
waiting_for_update = version;
return false;
}
}
void PGScrubber::generate_and_submit_chunk_result(
const hobject_t &begin,
const hobject_t &end,
bool deep)
{
LOG_PREFIX(PGScrubber::generate_and_submit_chunk_result);
DEBUGDPP("begin: {}, end: {}, deep: {}", pg, begin, end, deep);
std::ignore = pg.shard_services.start_operation_may_interrupt<
interruptor, ScrubScan
>(&pg, deep, false /* local */, begin, end);
}
#define LOG_SCRUB_ERROR(MSG, ...) { \
auto errorstr = fmt::format(MSG, __VA_ARGS__); \
ERRORDPP("{}", pg, errorstr); \
pg.get_clog_error() << "pg " << pg.get_pgid() << ": " << errorstr; \
}
void PGScrubber::emit_chunk_result(
const request_range_result_t &range,
chunk_result_t &&result)
{
LOG_PREFIX(PGScrubber::emit_chunk_result);
if (result.has_errors()) {
LOG_SCRUB_ERROR(
"Scrub errors found. range: {}, result: {}",
range, result);
} else {
DEBUGDPP("Chunk complete. range: {}", pg, range);
}
}
void PGScrubber::emit_scrub_result(
bool deep,
object_stat_sum_t in_stats)
{
LOG_PREFIX(PGScrubber::emit_scrub_result);
DEBUGDPP("", pg);
pg.peering_state.update_stats(
[this, FNAME, deep, &in_stats](auto &history, auto &pg_stats) {
foreach_scrub_maintained_stat(
[deep, &pg_stats, &in_stats](
const auto &name, auto statptr, bool skip_for_shallow) {
if (deep && !skip_for_shallow) {
pg_stats.stats.sum.*statptr = in_stats.*statptr;
}
});
foreach_scrub_checked_stat(
[this, FNAME, &pg_stats, &in_stats](
const auto &name, auto statptr, const auto &invalid_predicate) {
if (!invalid_predicate(pg_stats) &&
(in_stats.*statptr != pg_stats.stats.sum.*statptr)) {
LOG_SCRUB_ERROR(
"stat mismatch for {}: scrubbed value: {}, stored pg value: {}",
name, in_stats.*statptr, pg_stats.stats.sum.*statptr);
++pg_stats.stats.sum.num_shallow_scrub_errors;
}
});
history.last_scrub = pg.peering_state.get_info().last_update;
auto now = ceph_clock_now();
history.last_scrub_stamp = now;
if (deep) {
history.last_deep_scrub_stamp = now;
}
return false; // notify_scrub_end will flush stats to osd
});
}
}

View File

@ -0,0 +1,152 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
// vim: ts=8 sw=2 smarttab expandtab
#pragma once
#include "crimson/osd/pg_interval_interrupt_condition.h"
#include "scrub_machine.h"
namespace crimson::osd {
class PG;
class ScrubScan;
class ScrubFindRange;
class ScrubReserveRange;
}
namespace crimson::osd::scrub {
struct blocked_range_t {
hobject_t begin;
hobject_t end;
seastar::shared_promise<> p;
};
class PGScrubber : public crimson::BlockerT<PGScrubber>, ScrubContext {
friend class ::crimson::osd::ScrubScan;
friend class ::crimson::osd::ScrubFindRange;
friend class ::crimson::osd::ScrubReserveRange;
using interruptor = ::crimson::interruptible::interruptor<
::crimson::osd::IOInterruptCondition>;
template <typename T = void>
using ifut =
::crimson::interruptible::interruptible_future<
::crimson::osd::IOInterruptCondition, T>;
PG &pg;
/// PG alias for logging in header functions
DoutPrefixProvider &dpp;
ScrubMachine machine;
std::optional<blocked_range_t> blocked;
std::optional<eversion_t> waiting_for_update;
template <typename E>
void handle_event(E &&e)
{
LOG_PREFIX(PGScrubber::handle_event);
SUBDEBUGDPP(osd, "handle_event: {}", dpp, e);
machine.process_event(std::forward<E>(e));
}
public:
static constexpr const char *type_name = "PGScrubber";
using Blocker = PGScrubber;
void dump_detail(Formatter *f) const;
static inline bool is_scrub_message(Message &m) {
switch (m.get_type()) {
case MSG_OSD_REP_SCRUB:
case MSG_OSD_REP_SCRUBMAP:
return true;
default:
return false;
}
return false;
}
PGScrubber(PG &pg);
/// setup scrub machine state
void initiate() { machine.initiate(); }
/// notify machine on primary that PG is active+clean
void on_primary_active_clean();
/// notify machine on replica that PG is active
void on_replica_activate();
/// notify machine of interval change
void on_interval_change();
/// notify machine that PG has committed up to versino v
void on_log_update(eversion_t v);
/// handle scrub request
void handle_scrub_requested(bool deep);
/// handle other scrub message
void handle_scrub_message(Message &m);
/// notify machine of a mutation of on_object resulting in delta_stats
void handle_op_stats(
const hobject_t &on_object,
object_stat_sum_t delta_stats);
/// maybe block an op trying to mutate hoid until chunk is complete
ifut<> wait_scrub(
PGScrubber::BlockingEvent::TriggerI&& trigger,
const hobject_t &hoid);
private:
DoutPrefixProvider &get_dpp() final { return dpp; }
void notify_scrub_start(bool deep) final;
void notify_scrub_end(bool deep) final;
const std::set<pg_shard_t> &get_ids_to_scrub() const final;
chunk_validation_policy_t get_policy() const final;
void request_range(const hobject_t &start) final;
void reserve_range(const hobject_t &start, const hobject_t &end) final;
void release_range() final;
void scan_range(
pg_shard_t target,
eversion_t version,
bool deep,
const hobject_t &start,
const hobject_t &end) final;
bool await_update(const eversion_t &version) final;
void generate_and_submit_chunk_result(
const hobject_t &begin,
const hobject_t &end,
bool deep) final;
void emit_chunk_result(
const request_range_result_t &range,
chunk_result_t &&result) final;
void emit_scrub_result(
bool deep,
object_stat_sum_t scrub_stats) final;
};
};
template <>
struct fmt::formatter<crimson::osd::scrub::blocked_range_t> {
constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); }
template <typename FormatContext>
auto format(const auto &range, FormatContext& ctx)
{
return fmt::format_to(
ctx.out(),
"{}~{}",
range.begin,
range.end);
}
};