crimson/osd: bring SnapTrimObjSubEvent

Signed-off-by: Radoslaw Zarzynski <rzarzyns@redhat.com>
This commit is contained in:
Radosław Zarzyński 2022-12-06 22:20:15 +01:00 committed by Radoslaw Zarzynski
parent 116c04595b
commit 972af821bb
6 changed files with 417 additions and 0 deletions

View File

@ -46,6 +46,7 @@ enum class OperationTypeCode {
logmissing_request,
logmissing_request_reply,
snaptrim_event,
snaptrimobj_subevent,
last_op
};
@ -62,6 +63,7 @@ static constexpr const char* const OP_NAMES[] = {
"logmissing_request",
"logmissing_request_reply",
"snaptrim_event",
"snaptrimobj_subevent",
};
// prevent the addition of OperationTypeCode-s with no matching OP_NAMES entry:

View File

@ -297,4 +297,11 @@ struct EventBackendRegistry<osd::PGAdvanceMap> {
}
};
template <>
struct EventBackendRegistry<osd::SnapTrimObjSubEvent> {
static std::tuple<> get_backends() {
return {};
}
};
} // namespace crimson

View File

@ -12,6 +12,7 @@ class CommonPGPipeline {
protected:
friend class InternalClientRequest;
friend class SnapTrimEvent;
friend class SnapTrimObjSubEvent;
struct WaitForActive : OrderedExclusivePhaseT<WaitForActive> {
static constexpr auto type_name = "CommonPGPipeline:::wait_for_active";

View File

@ -5,6 +5,7 @@
#include "crimson/osd/osd_operations/snaptrim_event.h"
#include "crimson/osd/pg.h"
#include "include/expected.hpp"
namespace {
seastar::logger& logger() {
@ -19,6 +20,13 @@ namespace crimson {
return {};
}
};
template <>
struct EventBackendRegistry<osd::SnapTrimObjSubEvent> {
static std::tuple<> get_backends() {
return {};
}
};
}
namespace crimson::osd {
@ -136,6 +144,11 @@ seastar::future<seastar::stop_iteration> SnapTrimEvent::with_pg(
}
for (const auto& object : to_trim) {
logger().debug("{}: trimming {}", *this, object);
auto [op, fut] = shard_services.start_operation<SnapTrimObjSubEvent>(
pg,
object,
snapid);
subop_blocker.emplace_back(op->get_id(), std::move(fut));
}
return subop_blocker.wait_completion().then([] {
return seastar::make_ready_future<seastar::stop_iteration>(
@ -151,4 +164,336 @@ seastar::future<seastar::stop_iteration> SnapTrimEvent::with_pg(
}, pg);
}
CommonPGPipeline& SnapTrimObjSubEvent::pp()
{
return pg->request_pg_pipeline;
}
seastar::future<> SnapTrimObjSubEvent::start()
{
logger().debug("{}: start", *this);
IRef ref = this;
auto maybe_delay = seastar::now();
if (auto delay = 0; delay) {
maybe_delay = seastar::sleep(
std::chrono::milliseconds(std::lround(delay * 1000)));
}
return maybe_delay.then([this] {
return with_pg(pg->get_shard_services(), pg);
}).finally([ref=std::move(ref), this] {
logger().debug("{}: complete", *ref);
return handle.complete();
});
}
tl::expected<SnapTrimObjSubEvent::remove_or_update_ret_t, int>
SnapTrimObjSubEvent::remove_or_update(
ObjectContextRef obc,
ObjectContextRef head_obc)
{
ceph::os::Transaction txn{};
std::vector<pg_log_entry_t> log_entries{};
SnapSet& snapset = obc->ssc->snapset;
auto citer = snapset.clone_snaps.find(coid.snap);
if (citer == snapset.clone_snaps.end()) {
logger().error("{}: No clone_snaps in snapset {} for object {}",
*this, snapset, coid);
return tl::unexpected{-ENOENT};
}
const auto& old_snaps = citer->second;
if (old_snaps.empty()) {
logger().error("{}: no object info snaps for object {}",
*this, coid);
return tl::unexpected{-ENOENT};
}
if (snapset.seq == 0) {
logger().error("{}: no snapset.seq for object {}",
*this, coid);
return tl::unexpected{-ENOENT};
}
const OSDMapRef& osdmap = pg->get_osdmap();
std::set<snapid_t> new_snaps;
for (const auto& old_snap : old_snaps) {
if (!osdmap->in_removed_snaps_queue(pg->get_info().pgid.pgid.pool(),
old_snap)
&& old_snap != snap_to_trim) {
new_snaps.insert(old_snap);
}
}
std::vector<snapid_t>::iterator p = snapset.clones.end();
if (new_snaps.empty()) {
p = std::find(snapset.clones.begin(), snapset.clones.end(), coid.snap);
if (p == snapset.clones.end()) {
logger().error("{}: Snap {} not in clones",
*this, coid.snap);
return tl::unexpected{-ENOENT};
}
}
int64_t num_objects_before_trim = delta_stats.num_objects;
osd_op_p.at_version = pg->next_version();
object_info_t &coi = obc->obs.oi;
if (new_snaps.empty()) {
// remove clone
logger().info("{}: {} snaps {} -> {} ... deleting",
*this, coid, old_snaps, new_snaps);
// ...from snapset
assert(p != snapset.clones.end());
snapid_t last = coid.snap;
delta_stats.num_bytes -= snapset.get_clone_bytes(last);
if (p != snapset.clones.begin()) {
// not the oldest... merge overlap into next older clone
std::vector<snapid_t>::iterator n = p - 1;
hobject_t prev_coid = coid;
prev_coid.snap = *n;
// does the classical OSD really need is_present_clone(prev_coid)?
delta_stats.num_bytes -= snapset.get_clone_bytes(*n);
snapset.clone_overlap[*n].intersection_of(
snapset.clone_overlap[*p]);
delta_stats.num_bytes += snapset.get_clone_bytes(*n);
}
delta_stats.num_objects--;
if (coi.is_dirty()) {
delta_stats.num_objects_dirty--;
}
if (coi.is_omap()) {
delta_stats.num_objects_omap--;
}
if (coi.is_whiteout()) {
logger().debug("{}: trimming whiteout on {}",
*this, coid);
delta_stats.num_whiteouts--;
}
delta_stats.num_object_clones--;
obc->obs.exists = false;
snapset.clones.erase(p);
snapset.clone_overlap.erase(last);
snapset.clone_size.erase(last);
snapset.clone_snaps.erase(last);
log_entries.emplace_back(
pg_log_entry_t{
pg_log_entry_t::DELETE,
coid,
osd_op_p.at_version,
coi.version,
0,
osd_reqid_t(),
coi.mtime, // will be replaced in `apply_to()`
0}
);
txn.remove(
pg->get_collection_ref()->get_cid(),
ghobject_t{coid, ghobject_t::NO_GEN, shard_id_t::NO_SHARD});
coi = object_info_t(coid);
} else {
// save adjusted snaps for this object
logger().info("{}: {} snaps {} -> {}",
*this, coid, old_snaps, new_snaps);
snapset.clone_snaps[coid.snap] =
std::vector<snapid_t>(new_snaps.rbegin(), new_snaps.rend());
// we still do a 'modify' event on this object just to trigger a
// snapmapper.update ... :(
coi.prior_version = coi.version;
coi.version = osd_op_p.at_version;
ceph::bufferlist bl;
encode(coi, bl, pg->get_osdmap()->get_features(CEPH_ENTITY_TYPE_OSD, nullptr));
txn.setattr(
pg->get_collection_ref()->get_cid(),
ghobject_t{coid, ghobject_t::NO_GEN, shard_id_t::NO_SHARD},
OI_ATTR,
bl);
log_entries.emplace_back(
pg_log_entry_t{
pg_log_entry_t::MODIFY,
coid,
coi.version,
coi.prior_version,
0,
osd_reqid_t(),
coi.mtime,
0}
);
}
osd_op_p.at_version = pg->next_version();
// save head snapset
logger().debug("{}: {} new snapset {} on {}",
*this, coid, snapset, head_obc->obs.oi);
const auto head_oid = coid.get_head();
if (snapset.clones.empty() && head_obc->obs.oi.is_whiteout()) {
// NOTE: this arguably constitutes minor interference with the
// tiering agent if this is a cache tier since a snap trim event
// is effectively evicting a whiteout we might otherwise want to
// keep around.
logger().info("{}: {} removing {}",
*this, coid, head_oid);
log_entries.emplace_back(
pg_log_entry_t{
pg_log_entry_t::DELETE,
head_oid,
osd_op_p.at_version,
head_obc->obs.oi.version,
0,
osd_reqid_t(),
coi.mtime, // will be replaced in `apply_to()`
0}
);
logger().info("{}: remove snap head", *this);
object_info_t& oi = head_obc->obs.oi;
delta_stats.num_objects--;
if (oi.is_dirty()) {
delta_stats.num_objects_dirty--;
}
if (oi.is_omap()) {
delta_stats.num_objects_omap--;
}
if (oi.is_whiteout()) {
logger().debug("{}: trimming whiteout on {}",
*this, oi.soid);
delta_stats.num_whiteouts--;
}
head_obc->obs.exists = false;
head_obc->obs.oi = object_info_t(head_oid);
txn.remove(pg->get_collection_ref()->get_cid(),
ghobject_t{head_oid, ghobject_t::NO_GEN, shard_id_t::NO_SHARD});
} else {
snapset.snaps.clear();
logger().info("{}: writing updated snapset on {}, snapset is {}",
*this, head_oid, snapset);
log_entries.emplace_back(
pg_log_entry_t{
pg_log_entry_t::MODIFY,
head_oid,
osd_op_p.at_version,
head_obc->obs.oi.version,
0,
osd_reqid_t(),
coi.mtime,
0}
);
head_obc->obs.oi.prior_version = head_obc->obs.oi.version;
head_obc->obs.oi.version = osd_op_p.at_version;
std::map<std::string, ceph::bufferlist, std::less<>> attrs;
ceph::bufferlist bl;
encode(snapset, bl);
attrs[SS_ATTR] = std::move(bl);
bl.clear();
encode(head_obc->obs.oi, bl,
pg->get_osdmap()->get_features(CEPH_ENTITY_TYPE_OSD, nullptr));
attrs[OI_ATTR] = std::move(bl);
txn.setattrs(
pg->get_collection_ref()->get_cid(),
ghobject_t{head_oid, ghobject_t::NO_GEN, shard_id_t::NO_SHARD},
attrs);
}
// Stats reporting - Set number of objects trimmed
if (num_objects_before_trim > delta_stats.num_objects) {
int64_t num_objects_trimmed =
num_objects_before_trim - delta_stats.num_objects;
//add_objects_trimmed_count(num_objects_trimmed);
}
return std::make_pair(std::move(txn), std::move(log_entries));
}
seastar::future<> SnapTrimObjSubEvent::with_pg(
ShardServices &shard_services, Ref<PG> _pg)
{
return interruptor::with_interruption([this] {
return enter_stage<interruptor>(
pp().wait_for_active
).then_interruptible([this] {
return with_blocking_event<PGActivationBlocker::BlockingEvent,
interruptor>([this] (auto&& trigger) {
return pg->wait_for_active_blocker.wait(std::move(trigger));
});
}).then_interruptible([this] {
return enter_stage<interruptor>(
pp().recover_missing);
}).then_interruptible([this] {
//return do_recover_missing(pg, get_target_oid());
return seastar::now();
}).then_interruptible([this] {
return enter_stage<interruptor>(
pp().get_obc);
}).then_interruptible([this] {
logger().debug("{}: getting obc for {}", *this, coid);
// end of commonality
// with_cone_obc lock both clone's and head's obcs
return pg->obc_loader.with_clone_obc<RWState::RWWRITE>(coid, [this](auto clone_obc) {
logger().debug("{}: got clone_obc={}", *this, clone_obc);
return enter_stage<interruptor>(
pp().process
).then_interruptible([this, clone_obc=std::move(clone_obc)]() mutable {
logger().debug("{}: processing clone_obc={}", *this, clone_obc);
auto head_obc = clone_obc->head;
return interruptor::async([=, this]() mutable {
if (auto ret = remove_or_update(clone_obc, head_obc);
!ret.has_value()) {
logger().error("{}: trimmig error {}",
*this, ret.error());
//pg->state_set(PG_STATE_SNAPTRIM_ERROR);
} else {
auto [txn, log_entries] = std::move(ret).value();
auto [submitted, all_completed] = pg->submit_transaction(
std::move(clone_obc),
std::move(txn),
std::move(osd_op_p),
std::move(log_entries));
submitted.get();
all_completed.get();
}
}).then_interruptible([this] {
return enter_stage<interruptor>(
wait_repop
);
}).then_interruptible([this, clone_obc=std::move(clone_obc)] {
return PG::load_obc_iertr::now();
});
});
}).handle_error_interruptible(PG::load_obc_ertr::all_same_way([] {
return seastar::now();
}));
}).then_interruptible([] {
// end of commonality
return seastar::now();
});
}, [this](std::exception_ptr eptr) {
// TODO: better debug output
logger().debug("{}: interrupted {}", *this, eptr);
}, pg);
}
void SnapTrimObjSubEvent::print(std::ostream &lhs) const
{
lhs << "SnapTrimObjSubEvent("
<< "coid=" << coid
<< " snapid=" << snap_to_trim
<< ")";
}
void SnapTrimObjSubEvent::dump_detail(Formatter *f) const
{
f->open_object_section("SnapTrimObjSubEvent");
f->dump_stream("coid") << coid;
f->close_section();
}
} // namespace crimson::osd

View File

@ -80,8 +80,69 @@ public:
> tracking_events;
};
// remove single object. a SnapTrimEvent can create multiple subrequests.
// the division of labour is needed because of the restriction that an Op
// cannot revisite a pipeline's stage it already saw.
class SnapTrimObjSubEvent : public PhasedOperationT<SnapTrimObjSubEvent> {
public:
static constexpr OperationTypeCode type =
OperationTypeCode::snaptrimobj_subevent;
SnapTrimObjSubEvent(
Ref<PG> pg,
const hobject_t& coid,
snapid_t snap_to_trim)
: pg(std::move(pg)),
coid(coid),
snap_to_trim(snap_to_trim) {
}
void print(std::ostream &) const final;
void dump_detail(ceph::Formatter* f) const final;
seastar::future<> start();
seastar::future<> with_pg(
ShardServices &shard_services, Ref<PG> pg);
CommonPGPipeline& pp();
private:
object_stat_sum_t delta_stats;
using remove_or_update_ret_t =
std::pair<ceph::os::Transaction, std::vector<pg_log_entry_t>>;
tl::expected<remove_or_update_ret_t, int>
remove_or_update(ObjectContextRef obc, ObjectContextRef head_obc);
// we don't need to synchronize with other instances started by
// SnapTrimEvent; it's here for the sake of op tracking.
struct WaitRepop : OrderedConcurrentPhaseT<WaitRepop> {
static constexpr auto type_name = "SnapTrimObjSubEvent::wait_repop";
} wait_repop;
Ref<PG> pg;
PipelineHandle handle;
osd_op_params_t osd_op_p;
const hobject_t coid;
const snapid_t snap_to_trim;
public:
PipelineHandle& get_handle() { return handle; }
std::tuple<
StartEvent,
CommonPGPipeline::WaitForActive::BlockingEvent,
PGActivationBlocker::BlockingEvent,
CommonPGPipeline::RecoverMissing::BlockingEvent,
CommonPGPipeline::GetOBC::BlockingEvent,
CommonPGPipeline::Process::BlockingEvent,
WaitRepop::BlockingEvent,
CompletionEvent
> tracking_events;
};
} // namespace crimson::osd
#if FMT_VERSION >= 90000
template <> struct fmt::formatter<crimson::osd::SnapTrimEvent> : fmt::ostream_formatter {};
template <> struct fmt::formatter<crimson::osd::SnapTrimObjSubEvent> : fmt::ostream_formatter {};
#endif

View File

@ -721,6 +721,7 @@ private:
friend class InternalClientRequest;
friend class WatchTimeoutRequest;
friend class SnapTrimEvent;
friend class SnapTrimObjSubEvent;
private:
seastar::future<bool> find_unfound() {
return seastar::make_ready_future<bool>(true);