diff --git a/src/crimson/osd/backfill_state.cc b/src/crimson/osd/backfill_state.cc
index 0fdef54acce..837fd2eb2af 100644
--- a/src/crimson/osd/backfill_state.cc
+++ b/src/crimson/osd/backfill_state.cc
@@ -611,4 +611,12 @@ void BackfillState::ProgressTracker::complete_to(
   }
 }
 
+void BackfillState::enqueue_standalone_push(
+  const hobject_t &obj,
+  const eversion_t &v,
+  const std::vector<pg_shard_t> &peers) {
+  progress_tracker->enqueue_push(obj);
+  backfill_machine.backfill_listener.enqueue_push(obj, v, peers);
+}
+
 } // namespace crimson::osd
diff --git a/src/crimson/osd/backfill_state.h b/src/crimson/osd/backfill_state.h
index a49cbeaac06..072c91e079d 100644
--- a/src/crimson/osd/backfill_state.h
+++ b/src/crimson/osd/backfill_state.h
@@ -304,6 +304,15 @@ public:
     backfill_machine.process_event(*std::move(evt));
   }
 
+  void enqueue_standalone_push(
+    const hobject_t &obj,
+    const eversion_t &v,
+    const std::vector<pg_shard_t> &peers);
+
+  bool is_triggered() const {
+    return backfill_machine.triggering_event() != nullptr;
+  }
+
   hobject_t get_last_backfill_started() const {
     return last_backfill_started;
   }
diff --git a/src/crimson/osd/ec_backend.cc b/src/crimson/osd/ec_backend.cc
index 32eaaf02b3f..007d0bf35f3 100644
--- a/src/crimson/osd/ec_backend.cc
+++ b/src/crimson/osd/ec_backend.cc
@@ -26,6 +26,7 @@ ECBackend::_read(const hobject_t& hoid,
 ECBackend::rep_op_fut_t
 ECBackend::submit_transaction(const std::set<pg_shard_t> &pg_shards,
                               const hobject_t& hoid,
+                              crimson::osd::ObjectContextRef&& new_clone,
                               ceph::os::Transaction&& txn,
                               osd_op_params_t&& osd_op_p,
                               epoch_t min_epoch, epoch_t max_epoch,
diff --git a/src/crimson/osd/ec_backend.h b/src/crimson/osd/ec_backend.h
index 90a7e2b1f4d..b14c78c9fc4 100644
--- a/src/crimson/osd/ec_backend.h
+++ b/src/crimson/osd/ec_backend.h
@@ -28,6 +28,7 @@ private:
   rep_op_fut_t
   submit_transaction(const std::set<pg_shard_t> &pg_shards,
                      const hobject_t& hoid,
+                     crimson::osd::ObjectContextRef&& new_clone,
                      ceph::os::Transaction&& txn,
                      osd_op_params_t&& req,
                      epoch_t min_epoch, epoch_t max_epoch,
diff --git a/src/crimson/osd/ops_executer.cc b/src/crimson/osd/ops_executer.cc
index 4e735c3b4cb..97b241fdce4 100644
--- a/src/crimson/osd/ops_executer.cc
+++ b/src/crimson/osd/ops_executer.cc
@@ -940,6 +940,7 @@ std::unique_ptr<OpsExecuter::CloningContext> OpsExecuter::execute_clone(
   };
   encode(cloned_snaps, cloning_ctx->log_entry.snaps);
   cloning_ctx->log_entry.clean_regions.mark_data_region_dirty(0, initial_obs.oi.size);
+  cloning_ctx->clone_obc = clone_obc;
 
   return cloning_ctx;
 }
@@ -966,7 +967,7 @@ void OpsExecuter::update_clone_overlap() {
 
 void OpsExecuter::CloningContext::apply_to(
   std::vector<pg_log_entry_t>& log_entries,
-  ObjectContext& processed_obc) &&
+  ObjectContext& processed_obc)
 {
   log_entry.mtime = processed_obc.obs.oi.mtime;
   log_entries.insert(log_entries.begin(), std::move(log_entry));
@@ -983,7 +984,7 @@ OpsExecuter::flush_clone_metadata(
   assert(!txn.empty());
   update_clone_overlap();
   if (cloning_ctx) {
-    std::move(*cloning_ctx).apply_to(log_entries, *obc);
+    cloning_ctx->apply_to(log_entries, *obc);
   }
   if (snapc.seq > obc->ssc->snapset.seq) {
     // update snapset with latest snap context
diff --git a/src/crimson/osd/ops_executer.h b/src/crimson/osd/ops_executer.h
index e770e825b32..94b64ccebb1 100644
--- a/src/crimson/osd/ops_executer.h
+++ b/src/crimson/osd/ops_executer.h
@@ -197,10 +197,11 @@ private:
   struct CloningContext {
     SnapSet new_snapset;
     pg_log_entry_t log_entry;
+    ObjectContextRef clone_obc;
 
     void apply_to(
      std::vector<pg_log_entry_t>& log_entries,
-     ObjectContext& processed_obc) &&;
+     ObjectContext& processed_obc);
   };
   std::unique_ptr<CloningContext> cloning_ctx;
 
@@ -504,6 +505,7 @@ OpsExecuter::flush_changes_n_do_ops_effects(
     ceph_assert(want_mutate);
   }
 
+  apply_stats();
   if (want_mutate) {
     auto log_entries = flush_clone_metadata(
       prepare_transaction(ops),
@@ -519,14 +521,15 @@ OpsExecuter::flush_changes_n_do_ops_effects(
       std::move(txn),
       std::move(obc),
      std::move(*osd_op_params),
-      std::move(log_entries));
+      std::move(log_entries),
+      cloning_ctx
+        ? std::move(cloning_ctx->clone_obc)
+        : nullptr);
     submitted = std::move(_submitted);
     all_completed = std::move(_all_completed);
   }
 
-  apply_stats();
-
   if (op_effects.size()) [[unlikely]] {
     // need extra ref pg due to apply_stats() which can be executed after
     // informing snap mapper
diff --git a/src/crimson/osd/osd_operations/snaptrim_event.cc b/src/crimson/osd/osd_operations/snaptrim_event.cc
index c5bdcae47f2..8cab6125682 100644
--- a/src/crimson/osd/osd_operations/snaptrim_event.cc
+++ b/src/crimson/osd/osd_operations/snaptrim_event.cc
@@ -435,6 +435,7 @@ SnapTrimObjSubEvent::process_and_submit(ObjectContextRef head_obc,
 
   auto [submitted, all_completed] = co_await pg->submit_transaction(
     std::move(clone_obc),
+    nullptr,
     std::move(txn),
     std::move(osd_op_p),
     std::move(log_entries)
diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc
index de39fb45716..1e2988efbbe 100644
--- a/src/crimson/osd/pg.cc
+++ b/src/crimson/osd/pg.cc
@@ -907,11 +907,23 @@ void PG::mutate_object(
   }
 }
 
+void PG::enqueue_push_for_backfill(
+  const hobject_t &obj,
+  const eversion_t &v,
+  const std::vector<pg_shard_t> &peers)
+{
+  assert(recovery_handler);
+  assert(recovery_handler->backfill_state);
+  auto backfill_state = recovery_handler->backfill_state.get();
+  backfill_state->enqueue_standalone_push(obj, v, peers);
+}
+
 PG::interruptible_future<
   std::tuple<PG::interruptible_future<>,
              PG::interruptible_future<>>>
 PG::submit_transaction(
   ObjectContextRef&& obc,
+  ObjectContextRef&& new_clone,
   ceph::os::Transaction&& txn,
   osd_op_params_t&& osd_op_p,
   std::vector<pg_log_entry_t>&& log_entries)
@@ -924,8 +936,9 @@ PG::submit_transaction(
   }
 
   epoch_t map_epoch = get_osdmap_epoch();
+  auto at_version = osd_op_p.at_version;
 
-  peering_state.pre_submit_op(obc->obs.oi.soid, log_entries, osd_op_p.at_version);
+  peering_state.pre_submit_op(obc->obs.oi.soid, log_entries, at_version);
   peering_state.update_trim_to();
 
   ceph_assert(!log_entries.empty());
@@ -939,6 +952,7 @@ PG::submit_transaction(
   auto [submitted, all_completed] = co_await backend->submit_transaction(
       peering_state.get_acting_recovery_backfill(),
       obc->obs.oi.soid,
+      std::move(new_clone),
       std::move(txn),
       std::move(osd_op_p),
       peering_state.get_last_peering_reset(),
@@ -947,8 +961,8 @@ PG::submit_transaction(
   co_return std::make_tuple(
     std::move(submitted),
     all_completed.then_interruptible(
-      [this, last_complete=peering_state.get_info().last_complete,
-       at_version=osd_op_p.at_version](auto acked) {
+      [this, last_complete=peering_state.get_info().last_complete, at_version]
+      (auto acked) {
       for (const auto& peer : acked) {
         peering_state.update_peer_last_complete_ondisk(
           peer.shard, peer.last_complete_ondisk);
@@ -1153,11 +1167,13 @@ PG::submit_executer_fut PG::submit_executer(
     [FNAME, this](auto&& txn,
                   auto&& obc,
                   auto&& osd_op_p,
-                  auto&& log_entries) {
+                  auto&& log_entries,
+                  auto&& new_clone) {
       DEBUGDPP("object {} submitting txn", *this, obc->get_oid());
      mutate_object(obc, txn, osd_op_p);
      return submit_transaction(
        std::move(obc),
+        std::move(new_clone),
        std::move(txn),
        std::move(osd_op_p),
        std::move(log_entries));
@@ -1604,7 +1620,7 @@ bool PG::should_send_op(
      // missing set
      hoid <= peering_state.get_peer_info(peer).last_backfill ||
      (has_backfill_state() && hoid <= get_last_backfill_started() &&
-      !peering_state.get_peer_missing(peer).is_missing(hoid)));
+      !is_missing_on_peer(peer, hoid)));
   if (!should_send) {
     ceph_assert(is_backfill_target(peer));
     logger().debug("{} issue_repop shipping empty opt to osd."
diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h
index b0429c8fb4f..15aeec0e4f3 100644
--- a/src/crimson/osd/pg.h
+++ b/src/crimson/osd/pg.h
@@ -45,6 +45,7 @@ class MQuery;
 
 class OSDMap;
 class PGBackend;
+class ReplicatedBackend;
 class PGPeeringEvent;
 class osd_op_params_t;
 
@@ -678,6 +679,7 @@ private:
     std::tuple<interruptible_future<>, interruptible_future<>>>
   submit_transaction(
     ObjectContextRef&& obc,
+    ObjectContextRef&& new_clone,
     ceph::os::Transaction&& txn,
     osd_op_params_t&& oop,
     std::vector<pg_log_entry_t>&& log_entries);
@@ -885,6 +887,10 @@ private:
   friend class SnapTrimObjSubEvent;
 
 private:
+  void enqueue_push_for_backfill(
+    const hobject_t &obj,
+    const eversion_t &v,
+    const std::vector<pg_shard_t> &peers);
   void mutate_object(
     ObjectContextRef& obc,
     ceph::os::Transaction& txn,
@@ -893,7 +899,7 @@ private:
   bool can_discard_op(const MOSDOp& m) const;
   void context_registry_on_change();
   bool is_missing_object(const hobject_t& soid) const {
-    return peering_state.get_pg_log().get_missing().get_items().count(soid);
+    return get_local_missing().is_missing(soid);
   }
   bool is_unreadable_object(const hobject_t &oid,
                             eversion_t* v = 0) const final {
@@ -901,6 +907,11 @@ private:
       !peering_state.get_missing_loc().readable_with_acting(
         oid, get_actingset(), v);
   }
+  bool is_missing_on_peer(
+    const pg_shard_t &peer,
+    const hobject_t &soid) const {
+    return peering_state.get_peer_missing(peer).is_missing(soid);
+  }
   bool is_degraded_or_backfilling_object(const hobject_t& soid) const;
   const std::set<pg_shard_t> &get_actingset() const {
     return peering_state.get_actingset();
@@ -908,6 +919,7 @@ private:
 
 private:
   friend class IOInterruptCondition;
+  friend class ::ReplicatedBackend;
   struct log_update_t {
     std::set<pg_shard_t> waiting_on;
     seastar::shared_promise<> all_committed;
diff --git a/src/crimson/osd/pg_backend.h b/src/crimson/osd/pg_backend.h
index fa1f1405ffe..813218983fd 100644
--- a/src/crimson/osd/pg_backend.h
+++ b/src/crimson/osd/pg_backend.h
@@ -414,6 +414,7 @@ public:
   virtual rep_op_fut_t
   submit_transaction(const std::set<pg_shard_t> &pg_shards,
                      const hobject_t& hoid,
+                     crimson::osd::ObjectContextRef&& new_clone,
                      ceph::os::Transaction&& txn,
                      osd_op_params_t&& osd_op_p,
                      epoch_t min_epoch, epoch_t max_epoch,
diff --git a/src/crimson/osd/pg_recovery.h b/src/crimson/osd/pg_recovery.h
index 705b3176b97..657e6d3e888 100644
--- a/src/crimson/osd/pg_recovery.h
+++ b/src/crimson/osd/pg_recovery.h
@@ -45,6 +45,10 @@ public:
 
   seastar::future<> stop() { return seastar::now(); }
   void on_pg_clean();
+  void enqueue_push(
+    const hobject_t& obj,
+    const eversion_t& v,
+    const std::vector<pg_shard_t> &peers) final;
 private:
   PGRecoveryListener* pg;
   size_t start_primary_recovery_ops(
@@ -108,10 +112,6 @@ private:
     const hobject_t& end) final;
   void request_primary_scan(
     const hobject_t& begin) final;
-  void enqueue_push(
-    const hobject_t& obj,
-    const eversion_t& v,
-    const std::vector<pg_shard_t> &peers) final;
   void enqueue_drop(
     const pg_shard_t& target,
     const hobject_t& obj,
diff --git a/src/crimson/osd/replicated_backend.cc b/src/crimson/osd/replicated_backend.cc
index 12ee38b4370..f09cd147ea9 100644
--- a/src/crimson/osd/replicated_backend.cc
+++ b/src/crimson/osd/replicated_backend.cc
@@ -36,19 +36,59 @@ ReplicatedBackend::_read(const hobject_t& hoid,
   return store->read(coll, ghobject_t{hoid}, off, len, flags);
 }
 
+MURef<MOSDRepOp> ReplicatedBackend::new_repop_msg(
+  const pg_shard_t &pg_shard,
+  const hobject_t &hoid,
+  const bufferlist &encoded_txn,
+  const osd_op_params_t &osd_op_p,
+  epoch_t min_epoch,
+  epoch_t map_epoch,
+  const std::vector<pg_log_entry_t> &log_entries,
+  bool send_op,
+  ceph_tid_t tid)
+{
+  ceph_assert(pg_shard != whoami);
+  auto m = crimson::make_message<MOSDRepOp>(
+    osd_op_p.req_id,
+    whoami,
+    spg_t{pgid, pg_shard.shard},
+    hoid,
+    CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK,
+    map_epoch,
+    min_epoch,
+    tid,
+    osd_op_p.at_version);
+  if (send_op) {
+    m->set_data(encoded_txn);
+  } else {
+    ceph::os::Transaction t;
+    bufferlist bl;
+    encode(t, bl);
+    m->set_data(bl);
+  }
+  encode(log_entries, m->logbl);
+  m->pg_trim_to = osd_op_p.pg_trim_to;
+  m->pg_committed_to = osd_op_p.pg_committed_to;
+  m->pg_stats = pg.get_info().stats;
+  return m;
+}
+
 ReplicatedBackend::rep_op_fut_t
-ReplicatedBackend::submit_transaction(const std::set<pg_shard_t>& pg_shards,
-                                      const hobject_t& hoid,
-                                      ceph::os::Transaction&& t,
-                                      osd_op_params_t&& opp,
-                                      epoch_t min_epoch, epoch_t map_epoch,
-                                      std::vector<pg_log_entry_t>&& logv)
+ReplicatedBackend::submit_transaction(
+  const std::set<pg_shard_t> &pg_shards,
+  const hobject_t& hoid,
+  crimson::osd::ObjectContextRef &&new_clone,
+  ceph::os::Transaction&& t,
+  osd_op_params_t&& opp,
+  epoch_t min_epoch, epoch_t map_epoch,
+  std::vector<pg_log_entry_t>&& logv)
 {
   LOG_PREFIX(ReplicatedBackend::submit_transaction);
   DEBUGDPP("object {}", dpp, hoid);
   auto log_entries = std::move(logv);
   auto txn = std::move(t);
   auto osd_op_p = std::move(opp);
+  auto _new_clone = std::move(new_clone);
 
   const ceph_tid_t tid = shard_services.get_tid();
   auto pending_txn =
@@ -60,37 +100,34 @@ ReplicatedBackend::submit_transaction(const std::set<pg_shard_t>& pg_shards,
     le.mark_unrollbackable();
   }
 
+  std::vector<pg_shard_t> to_push_clone;
   auto sends = std::make_unique<std::vector<interruptible_future<>>>();
-  for (auto pg_shard : pg_shards) {
-    if (pg_shard != whoami) {
-      auto m = crimson::make_message<MOSDRepOp>(
-        osd_op_p.req_id,
-        whoami,
-        spg_t{pgid, pg_shard.shard},
-        hoid,
-        CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK,
-        map_epoch,
-        min_epoch,
-        tid,
-        osd_op_p.at_version);
-      if (pg.should_send_op(pg_shard, hoid)) {
-        m->set_data(encoded_txn);
-      } else {
-        ceph::os::Transaction t;
-        bufferlist bl;
-        encode(t, bl);
-        m->set_data(bl);
-      }
-      pending_txn->second.acked_peers.push_back({pg_shard, eversion_t{}});
-      encode(log_entries, m->logbl);
-      m->pg_trim_to = osd_op_p.pg_trim_to;
-      m->pg_committed_to = osd_op_p.pg_committed_to;
-      m->pg_stats = pg.get_info().stats;
-      // TODO: set more stuff. e.g., pg_states
-      sends->emplace_back(
-        shard_services.send_to_osd(
-          pg_shard.osd, std::move(m), map_epoch));
+  for (auto &pg_shard : pg_shards) {
+    if (pg_shard == whoami) {
+      continue;
     }
+    MURef<MOSDRepOp> m;
+    if (pg.should_send_op(pg_shard, hoid)) {
+      m = new_repop_msg(
+        pg_shard, hoid, encoded_txn, osd_op_p,
+        min_epoch, map_epoch, log_entries, true, tid);
+    } else {
+      m = new_repop_msg(
+        pg_shard, hoid, encoded_txn, osd_op_p,
+        min_epoch, map_epoch, log_entries, false, tid);
+      if (_new_clone && pg.is_missing_on_peer(pg_shard, hoid)) {
+        // The head is in the push queue but hasn't been pushed yet.
+        // We need to ensure that the newly created clone will be
+        // pushed as well, otherwise we might skip it.
+        // See: https://tracker.ceph.com/issues/68808
+        to_push_clone.push_back(pg_shard);
+      }
+    }
+    pending_txn->second.acked_peers.push_back({pg_shard, eversion_t{}});
+    // TODO: set more stuff. e.g., pg_states
+    sends->emplace_back(
+      shard_services.send_to_osd(
+        pg_shard.osd, std::move(m), map_epoch));
   }
 
   co_await pg.update_snap_map(log_entries, txn);
@@ -120,9 +157,16 @@ ReplicatedBackend::submit_transaction(const std::set<pg_shard_t>& pg_shards,
       return seastar::now();
     }
     return peers->all_committed.get_shared_future();
-  }).then_interruptible([pending_txn, this] {
+  }).then_interruptible([pending_txn, this, _new_clone,
+                         to_push_clone=std::move(to_push_clone)] {
     auto acked_peers = std::move(pending_txn->second.acked_peers);
     pending_trans.erase(pending_txn);
+    if (_new_clone && !to_push_clone.empty()) {
+      pg.enqueue_push_for_backfill(
+        _new_clone->obs.oi.soid,
+        _new_clone->obs.oi.version,
+        to_push_clone);
+    }
     return seastar::make_ready_future<
       crimson::osd::acked_peers_t>(std::move(acked_peers));
   });
diff --git a/src/crimson/osd/replicated_backend.h b/src/crimson/osd/replicated_backend.h
index fb8704d8742..d5844b23a0c 100644
--- a/src/crimson/osd/replicated_backend.h
+++ b/src/crimson/osd/replicated_backend.h
@@ -35,6 +35,7 @@ private:
   rep_op_fut_t submit_transaction(
     const std::set<pg_shard_t> &pg_shards,
     const hobject_t& hoid,
+    crimson::osd::ObjectContextRef&& new_clone,
     ceph::os::Transaction&& txn,
     osd_op_params_t&& osd_op_p,
     epoch_t min_epoch, epoch_t max_epoch,
@@ -60,6 +61,17 @@ private:
   pending_transactions_t pending_trans;
   crimson::osd::PG& pg;
 
+  MURef<MOSDRepOp> new_repop_msg(
+    const pg_shard_t &pg_shard,
+    const hobject_t &hoid,
+    const bufferlist &encoded_txn,
+    const osd_op_params_t &osd_op_p,
+    epoch_t min_epoch,
+    epoch_t map_epoch,
+    const std::vector<pg_log_entry_t> &log_entries,
+    bool send_op,
+    ceph_tid_t tid);
+
   seastar::future<> request_committed(
     const osd_reqid_t& reqid, const eversion_t& at_version) final;
 };