Merge pull request #60598 from xxhdx1985126/wip-68808

crimson/osd/replicated_backend: add the skipped newly created clone object to the push queue after the clone request completes

Reviewed-by: Matan Breizman <mbreizma@redhat.com>
Reviewed-by: Samuel Just <sjust@redhat.com>
Matan Breizman 2024-11-19 18:23:00 +02:00 committed by GitHub
commit db3db9c950
13 changed files with 161 additions and 52 deletions
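In short: while a PG is backfilling, a replica that has not yet received the head object is sent only an empty repop for writes to that head (see should_send_op and the "shipping empty opt" path below), so a snapshot clone created by such a write was never replicated to that peer (https://tracker.ceph.com/issues/68808). This merge threads the freshly created clone's ObjectContext down to the replicated backend, which queues the clone for a standalone backfill push once the write has committed on all peers. A minimal, illustrative restatement of the condition being added follows; the helper name and bool parameters are stand-ins, not the crimson API.

// Illustrative only: a boiled-down version of the check added to
// ReplicatedBackend::submit_transaction() further down in this diff.
#include <iostream>

bool clone_needs_standalone_push(bool sending_full_op_to_peer,
                                 bool head_missing_on_peer,
                                 bool created_new_clone) {
  // The peer receives an empty transaction because the head is still in the
  // backfill push queue, so a clone created by this op has to be pushed
  // separately or the peer will never get it.
  return !sending_full_op_to_peer && head_missing_on_peer && created_new_clone;
}

int main() {
  std::cout << std::boolalpha
            << clone_needs_standalone_push(false, true, true) << "\n";  // true: enqueue the clone
}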

View File

@ -611,4 +611,12 @@ void BackfillState::ProgressTracker::complete_to(
}
}
void BackfillState::enqueue_standalone_push(
const hobject_t &obj,
const eversion_t &v,
const std::vector<pg_shard_t> &peers) {
progress_tracker->enqueue_push(obj);
backfill_machine.backfill_listener.enqueue_push(obj, v, peers);
}
} // namespace crimson::osd
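The new BackfillState::enqueue_standalone_push() composes the two steps a regular backfill push already goes through: record the object in the ProgressTracker, then hand it to the BackfillListener (PGRecovery::enqueue_push, whose override this series moves from private to public). The entry point is PG::enqueue_push_for_backfill(), added later in this diff. A toy, self-contained sketch of that composition (stand-in types, not the real crimson classes):

#include <iostream>
#include <string>
#include <vector>

struct Listener {                              // stands in for BackfillListener / PGRecovery
  void enqueue_push(const std::string& obj, const std::vector<int>& peers) {
    for (int osd : peers)
      std::cout << "push " << obj << " -> osd." << osd << "\n";
  }
};

struct ProgressTracker {                       // stands in for BackfillState::ProgressTracker
  std::vector<std::string> enqueued;
  void enqueue_push(const std::string& obj) { enqueued.push_back(obj); }
};

struct BackfillStateSketch {
  ProgressTracker tracker;
  Listener listener;
  void enqueue_standalone_push(const std::string& obj, const std::vector<int>& peers) {
    tracker.enqueue_push(obj);                 // account for it like any other backfill push
    listener.enqueue_push(obj, peers);         // schedule the actual push to the peers
  }
};

int main() {
  BackfillStateSketch bs;
  bs.enqueue_standalone_push("clone_1", {1, 2});
}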

View File

@ -304,6 +304,15 @@ public:
backfill_machine.process_event(*std::move(evt));
}
void enqueue_standalone_push(
const hobject_t &obj,
const eversion_t &v,
const std::vector<pg_shard_t> &peers);
bool is_triggered() const {
return backfill_machine.triggering_event() != nullptr;
}
hobject_t get_last_backfill_started() const {
return last_backfill_started;
}

View File

@ -26,6 +26,7 @@ ECBackend::_read(const hobject_t& hoid,
ECBackend::rep_op_fut_t
ECBackend::submit_transaction(const std::set<pg_shard_t> &pg_shards,
const hobject_t& hoid,
crimson::osd::ObjectContextRef&& new_clone,
ceph::os::Transaction&& txn,
osd_op_params_t&& osd_op_p,
epoch_t min_epoch, epoch_t max_epoch,

View File

@ -28,6 +28,7 @@ private:
rep_op_fut_t
submit_transaction(const std::set<pg_shard_t> &pg_shards,
const hobject_t& hoid,
crimson::osd::ObjectContextRef&& new_clone,
ceph::os::Transaction&& txn,
osd_op_params_t&& req,
epoch_t min_epoch, epoch_t max_epoch,

View File

@ -940,6 +940,7 @@ std::unique_ptr<OpsExecuter::CloningContext> OpsExecuter::execute_clone(
};
encode(cloned_snaps, cloning_ctx->log_entry.snaps);
cloning_ctx->log_entry.clean_regions.mark_data_region_dirty(0, initial_obs.oi.size);
cloning_ctx->clone_obc = clone_obc;
return cloning_ctx;
}
@ -966,7 +967,7 @@ void OpsExecuter::update_clone_overlap() {
void OpsExecuter::CloningContext::apply_to(
std::vector<pg_log_entry_t>& log_entries,
ObjectContext& processed_obc) &&
ObjectContext& processed_obc)
{
log_entry.mtime = processed_obc.obs.oi.mtime;
log_entries.insert(log_entries.begin(), std::move(log_entry));
@ -983,7 +984,7 @@ OpsExecuter::flush_clone_metadata(
assert(!txn.empty());
update_clone_overlap();
if (cloning_ctx) {
std::move(*cloning_ctx).apply_to(log_entries, *obc);
cloning_ctx->apply_to(log_entries, *obc);
}
if (snapc.seq > obc->ssc->snapset.seq) {
// update snapset with latest snap context
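Two related changes in OpsExecuter: execute_clone() now stashes the clone's ObjectContext in the CloningContext, and CloningContext::apply_to() drops its '&&' ref-qualifier, so the context no longer has to be written as if it were consumed when the log entry is flushed and its clone_obc can still be handed to the backend afterwards (see the header hunk below). The self-contained snippet below, with toy types, only illustrates what dropping the ref-qualifier means for callers; it is not crimson code.

#include <string>
#include <utility>
#include <vector>

struct Entry { std::string payload; };

struct CloningCtxSketch {
  Entry log_entry;
  std::string clone_obc;                       // stands in for the ObjectContextRef
  void apply_to(std::vector<Entry>& log) {     // previously: void apply_to(...) &&;
    log.insert(log.begin(), std::move(log_entry));
  }
};

int main() {
  CloningCtxSketch ctx{{"clone log entry"}, "clone_obc"};
  std::vector<Entry> log;
  ctx.apply_to(log);                           // old form had to be spelled std::move(ctx).apply_to(log)
  auto obc = std::move(ctx.clone_obc);         // still fine: ctx itself was not consumed
  (void)obc;
}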

View File

@ -197,10 +197,11 @@ private:
struct CloningContext {
SnapSet new_snapset;
pg_log_entry_t log_entry;
ObjectContextRef clone_obc;
void apply_to(
std::vector<pg_log_entry_t>& log_entries,
ObjectContext& processed_obc) &&;
ObjectContext& processed_obc);
};
std::unique_ptr<CloningContext> cloning_ctx;
@ -504,6 +505,7 @@ OpsExecuter::flush_changes_n_do_ops_effects(
ceph_assert(want_mutate);
}
apply_stats();
if (want_mutate) {
auto log_entries = flush_clone_metadata(
prepare_transaction(ops),
@ -519,14 +521,15 @@ OpsExecuter::flush_changes_n_do_ops_effects(
std::move(txn),
std::move(obc),
std::move(*osd_op_params),
std::move(log_entries));
std::move(log_entries),
cloning_ctx
? std::move(cloning_ctx->clone_obc)
: nullptr);
submitted = std::move(_submitted);
all_completed = std::move(_all_completed);
}
apply_stats();
if (op_effects.size()) [[unlikely]] {
// need extra ref pg due to apply_stats() which can be executed after
// informing snap mapper

View File

@ -435,6 +435,7 @@ SnapTrimObjSubEvent::process_and_submit(ObjectContextRef head_obc,
auto [submitted, all_completed] = co_await pg->submit_transaction(
std::move(clone_obc),
nullptr,
std::move(txn),
std::move(osd_op_p),
std::move(log_entries)

View File

@ -907,11 +907,23 @@ void PG::mutate_object(
}
}
void PG::enqueue_push_for_backfill(
const hobject_t &obj,
const eversion_t &v,
const std::vector<pg_shard_t> &peers)
{
assert(recovery_handler);
assert(recovery_handler->backfill_state);
auto backfill_state = recovery_handler->backfill_state.get();
backfill_state->enqueue_standalone_push(obj, v, peers);
}
PG::interruptible_future<
std::tuple<PG::interruptible_future<>,
PG::interruptible_future<>>>
PG::submit_transaction(
ObjectContextRef&& obc,
ObjectContextRef&& new_clone,
ceph::os::Transaction&& txn,
osd_op_params_t&& osd_op_p,
std::vector<pg_log_entry_t>&& log_entries)
@ -924,8 +936,9 @@ PG::submit_transaction(
}
epoch_t map_epoch = get_osdmap_epoch();
auto at_version = osd_op_p.at_version;
peering_state.pre_submit_op(obc->obs.oi.soid, log_entries, osd_op_p.at_version);
peering_state.pre_submit_op(obc->obs.oi.soid, log_entries, at_version);
peering_state.update_trim_to();
ceph_assert(!log_entries.empty());
@ -939,6 +952,7 @@ PG::submit_transaction(
auto [submitted, all_completed] = co_await backend->submit_transaction(
peering_state.get_acting_recovery_backfill(),
obc->obs.oi.soid,
std::move(new_clone),
std::move(txn),
std::move(osd_op_p),
peering_state.get_last_peering_reset(),
@ -947,8 +961,8 @@ PG::submit_transaction(
co_return std::make_tuple(
std::move(submitted),
all_completed.then_interruptible(
[this, last_complete=peering_state.get_info().last_complete,
at_version=osd_op_p.at_version](auto acked) {
[this, last_complete=peering_state.get_info().last_complete, at_version]
(auto acked) {
for (const auto& peer : acked) {
peering_state.update_peer_last_complete_ondisk(
peer.shard, peer.last_complete_ondisk);
@ -1153,11 +1167,13 @@ PG::submit_executer_fut PG::submit_executer(
[FNAME, this](auto&& txn,
auto&& obc,
auto&& osd_op_p,
auto&& log_entries) {
auto&& log_entries,
auto&& new_clone) {
DEBUGDPP("object {} submitting txn", *this, obc->get_oid());
mutate_object(obc, txn, osd_op_p);
return submit_transaction(
std::move(obc),
std::move(new_clone),
std::move(txn),
std::move(osd_op_p),
std::move(log_entries));
@ -1604,7 +1620,7 @@ bool PG::should_send_op(
// missing set
hoid <= peering_state.get_peer_info(peer).last_backfill ||
(has_backfill_state() && hoid <= get_last_backfill_started() &&
!peering_state.get_peer_missing(peer).is_missing(hoid)));
!is_missing_on_peer(peer, hoid)));
if (!should_send) {
ceph_assert(is_backfill_target(peer));
logger().debug("{} issue_repop shipping empty opt to osd."
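On the PG side, submit_transaction() gains the new_clone parameter and forwards it to the backend, enqueue_push_for_backfill() forwards to BackfillState::enqueue_standalone_push(), and the completion continuation now captures an at_version copy taken before osd_op_p is moved into the backend call instead of reading the parameter afterwards. The toy snippet below (not crimson code) shows that capture-before-move pattern in isolation.

#include <iostream>
#include <string>
#include <utility>

struct Params { std::string at_version; };

void hand_off(Params&& p) { Params sink = std::move(p); (void)sink; }

int main() {
  Params osd_op_p{"42'17"};
  auto at_version = osd_op_p.at_version;   // copy saved before the move below
  hand_off(std::move(osd_op_p));           // osd_op_p is moved-from past this point
  auto on_commit = [at_version] {          // capture the saved copy, not the moved-from field
    std::cout << "committed at " << at_version << "\n";
  };
  on_commit();
}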

View File

@ -45,6 +45,7 @@
class MQuery;
class OSDMap;
class PGBackend;
class ReplicatedBackend;
class PGPeeringEvent;
class osd_op_params_t;
@ -678,6 +679,7 @@ private:
std::tuple<interruptible_future<>, interruptible_future<>>>
submit_transaction(
ObjectContextRef&& obc,
ObjectContextRef&& new_clone,
ceph::os::Transaction&& txn,
osd_op_params_t&& oop,
std::vector<pg_log_entry_t>&& log_entries);
@ -885,6 +887,10 @@ private:
friend class SnapTrimObjSubEvent;
private:
void enqueue_push_for_backfill(
const hobject_t &obj,
const eversion_t &v,
const std::vector<pg_shard_t> &peers);
void mutate_object(
ObjectContextRef& obc,
ceph::os::Transaction& txn,
@ -893,7 +899,7 @@ private:
bool can_discard_op(const MOSDOp& m) const;
void context_registry_on_change();
bool is_missing_object(const hobject_t& soid) const {
return peering_state.get_pg_log().get_missing().get_items().count(soid);
return get_local_missing().is_missing(soid);
}
bool is_unreadable_object(const hobject_t &oid,
eversion_t* v = 0) const final {
@ -901,6 +907,11 @@ private:
!peering_state.get_missing_loc().readable_with_acting(
oid, get_actingset(), v);
}
bool is_missing_on_peer(
const pg_shard_t &peer,
const hobject_t &soid) const {
return peering_state.get_peer_missing(peer).is_missing(soid);
}
bool is_degraded_or_backfilling_object(const hobject_t& soid) const;
const std::set<pg_shard_t> &get_actingset() const {
return peering_state.get_actingset();
@ -908,6 +919,7 @@ private:
private:
friend class IOInterruptCondition;
friend class ::ReplicatedBackend;
struct log_update_t {
std::set<pg_shard_t> waiting_on;
seastar::shared_promise<> all_committed;

View File

@ -414,6 +414,7 @@ public:
virtual rep_op_fut_t
submit_transaction(const std::set<pg_shard_t> &pg_shards,
const hobject_t& hoid,
crimson::osd::ObjectContextRef&& new_clone,
ceph::os::Transaction&& txn,
osd_op_params_t&& osd_op_p,
epoch_t min_epoch, epoch_t max_epoch,

View File

@ -45,6 +45,10 @@ public:
seastar::future<> stop() { return seastar::now(); }
void on_pg_clean();
void enqueue_push(
const hobject_t& obj,
const eversion_t& v,
const std::vector<pg_shard_t> &peers) final;
private:
PGRecoveryListener* pg;
size_t start_primary_recovery_ops(
@ -108,10 +112,6 @@ private:
const hobject_t& end) final;
void request_primary_scan(
const hobject_t& begin) final;
void enqueue_push(
const hobject_t& obj,
const eversion_t& v,
const std::vector<pg_shard_t> &peers) final;
void enqueue_drop(
const pg_shard_t& target,
const hobject_t& obj,

View File

@ -36,19 +36,59 @@ ReplicatedBackend::_read(const hobject_t& hoid,
return store->read(coll, ghobject_t{hoid}, off, len, flags);
}
MURef<MOSDRepOp> ReplicatedBackend::new_repop_msg(
const pg_shard_t &pg_shard,
const hobject_t &hoid,
const bufferlist &encoded_txn,
const osd_op_params_t &osd_op_p,
epoch_t min_epoch,
epoch_t map_epoch,
const std::vector<pg_log_entry_t> &log_entries,
bool send_op,
ceph_tid_t tid)
{
ceph_assert(pg_shard != whoami);
auto m = crimson::make_message<MOSDRepOp>(
osd_op_p.req_id,
whoami,
spg_t{pgid, pg_shard.shard},
hoid,
CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK,
map_epoch,
min_epoch,
tid,
osd_op_p.at_version);
if (send_op) {
m->set_data(encoded_txn);
} else {
ceph::os::Transaction t;
bufferlist bl;
encode(t, bl);
m->set_data(bl);
}
encode(log_entries, m->logbl);
m->pg_trim_to = osd_op_p.pg_trim_to;
m->pg_committed_to = osd_op_p.pg_committed_to;
m->pg_stats = pg.get_info().stats;
return m;
}
ReplicatedBackend::rep_op_fut_t
ReplicatedBackend::submit_transaction(const std::set<pg_shard_t>& pg_shards,
const hobject_t& hoid,
ceph::os::Transaction&& t,
osd_op_params_t&& opp,
epoch_t min_epoch, epoch_t map_epoch,
std::vector<pg_log_entry_t>&& logv)
ReplicatedBackend::submit_transaction(
const std::set<pg_shard_t> &pg_shards,
const hobject_t& hoid,
crimson::osd::ObjectContextRef &&new_clone,
ceph::os::Transaction&& t,
osd_op_params_t&& opp,
epoch_t min_epoch, epoch_t map_epoch,
std::vector<pg_log_entry_t>&& logv)
{
LOG_PREFIX(ReplicatedBackend::submit_transaction);
DEBUGDPP("object {}", dpp, hoid);
auto log_entries = std::move(logv);
auto txn = std::move(t);
auto osd_op_p = std::move(opp);
auto _new_clone = std::move(new_clone);
const ceph_tid_t tid = shard_services.get_tid();
auto pending_txn =
@ -60,37 +100,34 @@ ReplicatedBackend::submit_transaction(const std::set<pg_shard_t>& pg_shards,
le.mark_unrollbackable();
}
std::vector<pg_shard_t> to_push_clone;
auto sends = std::make_unique<std::vector<seastar::future<>>>();
for (auto pg_shard : pg_shards) {
if (pg_shard != whoami) {
auto m = crimson::make_message<MOSDRepOp>(
osd_op_p.req_id,
whoami,
spg_t{pgid, pg_shard.shard},
hoid,
CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK,
map_epoch,
min_epoch,
tid,
osd_op_p.at_version);
if (pg.should_send_op(pg_shard, hoid)) {
m->set_data(encoded_txn);
} else {
ceph::os::Transaction t;
bufferlist bl;
encode(t, bl);
m->set_data(bl);
}
pending_txn->second.acked_peers.push_back({pg_shard, eversion_t{}});
encode(log_entries, m->logbl);
m->pg_trim_to = osd_op_p.pg_trim_to;
m->pg_committed_to = osd_op_p.pg_committed_to;
m->pg_stats = pg.get_info().stats;
// TODO: set more stuff. e.g., pg_states
sends->emplace_back(
shard_services.send_to_osd(
pg_shard.osd, std::move(m), map_epoch));
for (auto &pg_shard : pg_shards) {
if (pg_shard == whoami) {
continue;
}
MURef<MOSDRepOp> m;
if (pg.should_send_op(pg_shard, hoid)) {
m = new_repop_msg(
pg_shard, hoid, encoded_txn, osd_op_p,
min_epoch, map_epoch, log_entries, true, tid);
} else {
m = new_repop_msg(
pg_shard, hoid, encoded_txn, osd_op_p,
min_epoch, map_epoch, log_entries, false, tid);
if (_new_clone && pg.is_missing_on_peer(pg_shard, hoid)) {
// The head is in the push queue but hasn't been pushed yet.
// We need to ensure that the newly created clone will be
// pushed as well, otherwise we might skip it.
// See: https://tracker.ceph.com/issues/68808
to_push_clone.push_back(pg_shard);
}
}
pending_txn->second.acked_peers.push_back({pg_shard, eversion_t{}});
// TODO: set more stuff. e.g., pg_states
sends->emplace_back(
shard_services.send_to_osd(
pg_shard.osd, std::move(m), map_epoch));
}
co_await pg.update_snap_map(log_entries, txn);
@ -120,9 +157,16 @@ ReplicatedBackend::submit_transaction(const std::set<pg_shard_t>& pg_shards,
return seastar::now();
}
return peers->all_committed.get_shared_future();
}).then_interruptible([pending_txn, this] {
}).then_interruptible([pending_txn, this, _new_clone,
to_push_clone=std::move(to_push_clone)] {
auto acked_peers = std::move(pending_txn->second.acked_peers);
pending_trans.erase(pending_txn);
if (_new_clone && !to_push_clone.empty()) {
pg.enqueue_push_for_backfill(
_new_clone->obs.oi.soid,
_new_clone->obs.oi.version,
to_push_clone);
}
return seastar::make_ready_future<
crimson::osd::acked_peers_t>(std::move(acked_peers));
});
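These two hunks carry the core of the fix: new_repop_msg() factors the MOSDRepOp construction out of the per-shard loop, and submit_transaction() now remembers, for every replica that receives the empty-transaction message while still missing the head, that the new clone has to be pushed separately; only after all peers have committed does it call pg.enqueue_push_for_backfill() with those peers. A stand-in sketch of that "remember now, enqueue after commit" bookkeeping follows; apart from to_push_clone the names are toy types, not crimson classes.

#include <cstdio>
#include <vector>

struct Peer  { int osd; };
struct Clone { const char* name; };

int main() {
  std::vector<Peer> peers{{1}, {2}, {3}};
  Clone new_clone{"clone-of-head"};
  auto head_still_queued_for_backfill = [](const Peer& p) { return p.osd == 2; };

  std::vector<Peer> to_push_clone;
  for (const auto& p : peers) {
    if (head_still_queued_for_backfill(p)) {
      to_push_clone.push_back(p);              // this peer only got an empty repop
    }
  }

  // ...later, once every peer has committed the transaction...
  for (const auto& p : to_push_clone) {
    std::printf("enqueue standalone push of %s to osd.%d\n", new_clone.name, p.osd);
  }
}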

View File

@ -35,6 +35,7 @@ private:
rep_op_fut_t submit_transaction(
const std::set<pg_shard_t> &pg_shards,
const hobject_t& hoid,
crimson::osd::ObjectContextRef&& new_clone,
ceph::os::Transaction&& txn,
osd_op_params_t&& osd_op_p,
epoch_t min_epoch, epoch_t max_epoch,
@ -60,6 +61,17 @@ private:
pending_transactions_t pending_trans;
crimson::osd::PG& pg;
MURef<MOSDRepOp> new_repop_msg(
const pg_shard_t &pg_shard,
const hobject_t &hoid,
const bufferlist &encoded_txn,
const osd_op_params_t &osd_op_p,
epoch_t min_epoch,
epoch_t map_epoch,
const std::vector<pg_log_entry_t> &log_entries,
bool send_op,
ceph_tid_t tid);
seastar::future<> request_committed(
const osd_reqid_t& reqid, const eversion_t& at_version) final;
};