diff --git a/src/crimson/osd/ec_backend.cc b/src/crimson/osd/ec_backend.cc index 5cc2ffaf5a8..9ea0d1e6564 100644 --- a/src/crimson/osd/ec_backend.cc +++ b/src/crimson/osd/ec_backend.cc @@ -22,7 +22,7 @@ ECBackend::_read(const hobject_t& hoid, return seastar::make_ready_future(); } -ECBackend::interruptible_future +ECBackend::rep_op_fut_t ECBackend::_submit_transaction(std::set&& pg_shards, const hobject_t& hoid, ceph::os::Transaction&& txn, @@ -31,5 +31,6 @@ ECBackend::_submit_transaction(std::set&& pg_shards, std::vector&& log_entries) { // todo - return seastar::make_ready_future(); + return {seastar::now(), + seastar::make_ready_future()}; } diff --git a/src/crimson/osd/ec_backend.h b/src/crimson/osd/ec_backend.h index 44da609cb8e..4b736622b4c 100644 --- a/src/crimson/osd/ec_backend.h +++ b/src/crimson/osd/ec_backend.h @@ -24,7 +24,7 @@ public: private: ll_read_ierrorator::future _read(const hobject_t& hoid, uint64_t off, uint64_t len, uint32_t flags) override; - interruptible_future + rep_op_fut_t _submit_transaction(std::set&& pg_shards, const hobject_t& hoid, ceph::os::Transaction&& txn, diff --git a/src/crimson/osd/ops_executer.h b/src/crimson/osd/ops_executer.h index d9d854b8780..d78354a4e24 100644 --- a/src/crimson/osd/ops_executer.h +++ b/src/crimson/osd/ops_executer.h @@ -261,8 +261,12 @@ public: interruptible_errorated_future execute_op(OSDOp& osd_op); + using rep_op_fut_tuple = + std::tuple, osd_op_ierrorator::future<>>; + using rep_op_fut_t = + interruptible_future; template - osd_op_ierrorator::future<> flush_changes_n_do_ops_effects( + rep_op_fut_t flush_changes_n_do_ops_effects( Ref pg, MutFunc&& mut_func) &&; @@ -325,32 +329,42 @@ auto OpsExecuter::with_effect_on_obc( } template -OpsExecuter::osd_op_ierrorator::future<> +OpsExecuter::rep_op_fut_t OpsExecuter::flush_changes_n_do_ops_effects(Ref pg, MutFunc&& mut_func) && { const bool want_mutate = !txn.empty(); // osd_op_params are instantiated by every wr-like operation. assert(osd_op_params || !want_mutate); assert(obc); - auto maybe_mutated = interruptor::make_interruptible(osd_op_errorator::now()); + rep_op_fut_t maybe_mutated = + interruptor::make_ready_future( + seastar::now(), + interruptor::make_interruptible(osd_op_errorator::now())); if (want_mutate) { osd_op_params->req_id = msg->get_reqid(); osd_op_params->mtime = msg->get_mtime(); - maybe_mutated = std::forward(mut_func)(std::move(txn), + auto [submitted, all_completed] = std::forward(mut_func)(std::move(txn), std::move(obc), std::move(*osd_op_params), user_modify); + maybe_mutated = interruptor::make_ready_future( + std::move(submitted), + osd_op_ierrorator::future<>(std::move(all_completed))); } if (__builtin_expect(op_effects.empty(), true)) { return maybe_mutated; } else { - return maybe_mutated.safe_then_interruptible([pg=std::move(pg), - this] () mutable { - // let's do the cleaning of `op_effects` in destructor - return interruptor::do_for_each(op_effects, - [pg=std::move(pg)] (auto& op_effect) { - return op_effect->execute(pg); - }); + return maybe_mutated.then_unpack_interruptible( + [this, pg=std::move(pg)](auto&& submitted, auto&& all_completed) mutable { + return interruptor::make_ready_future( + std::move(submitted), + all_completed.safe_then_interruptible([this, pg=std::move(pg)] { + // let's do the cleaning of `op_effects` in destructor + return interruptor::do_for_each(op_effects, + [pg=std::move(pg)](auto& op_effect) { + return op_effect->execute(pg); + }); + })); }); } } diff --git a/src/crimson/osd/osd_operations/client_request.cc b/src/crimson/osd/osd_operations/client_request.cc index d178256e4d5..b7ac62044f9 100644 --- a/src/crimson/osd/osd_operations/client_request.cc +++ b/src/crimson/osd/osd_operations/client_request.cc @@ -202,9 +202,25 @@ ClientRequest::do_process(Ref& pg, crimson::osd::ObjectContextRef obc) return conn->send(std::move(reply)); } } - return pg->do_osd_ops(m, obc, op_info).safe_then_interruptible( - [this](Ref reply) -> interruptible_future<> { - return conn->send(std::move(reply)); + return pg->do_osd_ops(m, obc, op_info).safe_then_unpack_interruptible( + [this, pg](auto submitted, auto all_completed) mutable { + return submitted.then_interruptible( + [this, pg] { + return with_blocking_future_interruptible( + handle.enter(pp(*pg).wait_repop)); + }).then_interruptible( + [this, pg, all_completed=std::move(all_completed)]() mutable { + return all_completed.safe_then_interruptible( + [this, pg](Ref reply) { + return with_blocking_future_interruptible( + handle.enter(pp(*pg).send_reply)).then_interruptible( + [this, reply=std::move(reply)] { + return conn->send(std::move(reply)); + }); + }, crimson::ct_error::eagain::handle([this, pg]() mutable { + return process_op(pg); + })); + }); }, crimson::ct_error::eagain::handle([this, pg]() mutable { return process_op(pg); })); diff --git a/src/crimson/osd/osd_operations/internal_client_request.cc b/src/crimson/osd/osd_operations/internal_client_request.cc index 9ea028f2e4d..fcc64c545a4 100644 --- a/src/crimson/osd/osd_operations/internal_client_request.cc +++ b/src/crimson/osd/osd_operations/internal_client_request.cc @@ -81,9 +81,12 @@ seastar::future<> InternalClientRequest::start() [] (const std::error_code& e) { return PG::do_osd_ops_iertr::now(); } - ).safe_then_interruptible( - [] { - return interruptor::now(); + ).safe_then_unpack_interruptible( + [](auto submitted, auto all_completed) { + return all_completed.handle_error_interruptible( + crimson::ct_error::eagain::handle([] { + return seastar::now(); + })); }, crimson::ct_error::eagain::handle([] { return interruptor::now(); }) diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc index 35b41fbc294..3ff19636d78 100644 --- a/src/crimson/osd/pg.cc +++ b/src/crimson/osd/pg.cc @@ -566,7 +566,9 @@ seastar::future<> PG::WaitForActiveBlocker::stop() return seastar::now(); } -PG::interruptible_future<> PG::submit_transaction( +std::tuple, + PG::interruptible_future<>> +PG::submit_transaction( const OpInfo& op_info, const std::vector& ops, ObjectContextRef&& obc, @@ -574,8 +576,9 @@ PG::interruptible_future<> PG::submit_transaction( osd_op_params_t&& osd_op_p) { if (__builtin_expect(stopping, false)) { - return seastar::make_exception_future<>( - crimson::common::system_shutdown_exception()); + return {seastar::make_exception_future<>( + crimson::common::system_shutdown_exception()), + seastar::now()}; } epoch_t map_epoch = get_osdmap_epoch(); @@ -603,13 +606,15 @@ PG::interruptible_future<> PG::submit_transaction( peering_state.append_log_with_trim_to_updated(std::move(log_entries), osd_op_p.at_version, txn, true, false); - return backend->mutate_object(peering_state.get_acting_recovery_backfill(), - std::move(obc), - std::move(txn), - std::move(osd_op_p), - peering_state.get_last_peering_reset(), - map_epoch, - std::move(log_entries)).then_interruptible( + auto [submitted, all_completed] = backend->mutate_object( + peering_state.get_acting_recovery_backfill(), + std::move(obc), + std::move(txn), + std::move(osd_op_p), + peering_state.get_last_peering_reset(), + map_epoch, + std::move(log_entries)); + return std::make_tuple(std::move(submitted), all_completed.then_interruptible( [this, last_complete=peering_state.get_info().last_complete, at_version=osd_op_p.at_version](auto acked) { for (const auto& peer : acked) { @@ -618,7 +623,7 @@ PG::interruptible_future<> PG::submit_transaction( } peering_state.complete_write(at_version, last_complete); return seastar::now(); - }); + })); } void PG::fill_op_params_bump_pg_version( @@ -697,7 +702,8 @@ PG::interruptible_future<> PG::repair_object( } template -PG::do_osd_ops_iertr::future PG::do_osd_ops_execute( +PG::do_osd_ops_iertr::future> +PG::do_osd_ops_execute( OpsExecuter&& ox, std::vector ops, const OpInfo &op_info, @@ -708,6 +714,7 @@ PG::do_osd_ops_iertr::future PG::do_osd_ops_execute( return reload_obc(obc).handle_error_interruptible( load_obc_ertr::assert_all{"can't live with object state messed up"}); }); + auto failure_func_ptr = seastar::make_lw_shared(std::move(failure_func)); return interruptor::do_for_each(ops, [&ox](OSDOp& osd_op) { logger().debug( "do_osd_ops_execute: object {} - handling op {}", @@ -735,31 +742,48 @@ PG::do_osd_ops_iertr::future PG::do_osd_ops_execute( std::move(txn), std::move(osd_op_p)); }); - }).safe_then_interruptible_tuple([success_func=std::move(success_func)] { - return std::move(success_func)(); - }, crimson::ct_error::object_corrupted::handle( - [rollbacker, this] (const std::error_code& e) mutable { - // this is a path for EIO. it's special because we want to fix the obejct - // and try again. that is, the layer above `PG::do_osd_ops` is supposed to - // restart the execution. - return rollbacker.rollback_obc_if_modified(e).then_interruptible( - [obc=rollbacker.get_obc(), this] { - return repair_object(obc->obs.oi.soid, - obc->obs.oi.version).then_interruptible([] { - return do_osd_ops_iertr::future{crimson::ct_error::eagain::make()}; - }); - }); - }), OpsExecuter::osd_op_errorator::all_same_way( - [rollbacker, failure_func=std::move(failure_func)] + }).safe_then_unpack_interruptible( + [success_func=std::move(success_func), rollbacker, this, failure_func_ptr] + (auto submitted_fut, auto all_completed_fut) mutable { + return PG::do_osd_ops_iertr::make_ready_future>( + std::move(submitted_fut), + all_completed_fut.safe_then_interruptible_tuple( + std::move(success_func), + crimson::ct_error::object_corrupted::handle( + [rollbacker, this] (const std::error_code& e) mutable { + // this is a path for EIO. it's special because we want to fix the obejct + // and try again. that is, the layer above `PG::do_osd_ops` is supposed to + // restart the execution. + return rollbacker.rollback_obc_if_modified(e).then_interruptible( + [obc=rollbacker.get_obc(), this] { + return repair_object(obc->obs.oi.soid, + obc->obs.oi.version).then_interruptible([] { + return do_osd_ops_iertr::future{crimson::ct_error::eagain::make()}; + }); + }); + }), OpsExecuter::osd_op_errorator::all_same_way( + [rollbacker, failure_func_ptr] + (const std::error_code& e) mutable { + return rollbacker.rollback_obc_if_modified(e).then_interruptible( + [&e, failure_func_ptr] { + return (*failure_func_ptr)(e); + }); + }) + ) + ); + }, OpsExecuter::osd_op_errorator::all_same_way( + [rollbacker, failure_func_ptr] (const std::error_code& e) mutable { - return rollbacker.rollback_obc_if_modified(e).then_interruptible( - [&e, failure_func=std::move(failure_func)] { - return std::move(failure_func)(e); - }); + return PG::do_osd_ops_iertr::make_ready_future>( + seastar::now(), + rollbacker.rollback_obc_if_modified(e).then_interruptible( + [&e, failure_func_ptr] { + return (*failure_func_ptr)(e); + })); })); } -PG::do_osd_ops_iertr::future> +PG::do_osd_ops_iertr::future>> PG::do_osd_ops( Ref m, ObjectContextRef obc, @@ -803,7 +827,7 @@ PG::do_osd_ops( ).finally([ox_deleter=std::move(ox)] {}); } -PG::do_osd_ops_iertr::future<> +PG::do_osd_ops_iertr::future> PG::do_osd_ops( ObjectContextRef obc, std::vector ops, diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index 45958c4b8e2..45655d25e33 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -573,7 +573,11 @@ private: ::crimson::interruptible::interruptible_errorator< ::crimson::osd::IOInterruptCondition, ::crimson::errorator>; - do_osd_ops_iertr::future> do_osd_ops( + template + using pg_rep_op_fut_t = + std::tuple, + do_osd_ops_iertr::future>; + do_osd_ops_iertr::future>> do_osd_ops( Ref m, ObjectContextRef obc, const OpInfo &op_info); @@ -582,7 +586,7 @@ private: using do_osd_ops_failure_func_t = std::function(const std::error_code&)>; struct do_osd_ops_params_t; - do_osd_ops_iertr::future<> do_osd_ops( + do_osd_ops_iertr::future> do_osd_ops( ObjectContextRef obc, std::vector ops, const OpInfo &op_info, @@ -590,14 +594,15 @@ private: do_osd_ops_success_func_t success_func, do_osd_ops_failure_func_t failure_func); template - do_osd_ops_iertr::future do_osd_ops_execute( + do_osd_ops_iertr::future> do_osd_ops_execute( OpsExecuter&& ox, std::vector ops, const OpInfo &op_info, SuccessFunc&& success_func, FailureFunc&& failure_func); interruptible_future> do_pg_ops(Ref m); - interruptible_future<> submit_transaction( + std::tuple, interruptible_future<>> + submit_transaction( const OpInfo& op_info, const std::vector& ops, ObjectContextRef&& obc, diff --git a/src/crimson/osd/pg_backend.cc b/src/crimson/osd/pg_backend.cc index ab87a6e7e3e..14546b83195 100644 --- a/src/crimson/osd/pg_backend.cc +++ b/src/crimson/osd/pg_backend.cc @@ -119,7 +119,7 @@ PGBackend::load_metadata(const hobject_t& oid) })); } -PGBackend::interruptible_future +PGBackend::rep_op_fut_t PGBackend::mutate_object( std::set pg_shards, crimson::osd::ObjectContextRef &&obc, diff --git a/src/crimson/osd/pg_backend.h b/src/crimson/osd/pg_backend.h index 7f5a5275455..699bad16167 100644 --- a/src/crimson/osd/pg_backend.h +++ b/src/crimson/osd/pg_backend.h @@ -58,6 +58,9 @@ public: using interruptible_future = ::crimson::interruptible::interruptible_future< ::crimson::osd::IOInterruptCondition, T>; + using rep_op_fut_t = + std::tuple, + interruptible_future>; PGBackend(shard_id_t shard, CollectionRef coll, crimson::os::FuturizedStore* store); virtual ~PGBackend() = default; static std::unique_ptr create(pg_t pgid, @@ -158,7 +161,7 @@ public: const OSDOp& osd_op, ceph::os::Transaction& trans, osd_op_params_t& osd_op_params); - interruptible_future mutate_object( + rep_op_fut_t mutate_object( std::set pg_shards, crimson::osd::ObjectContextRef &&obc, ceph::os::Transaction&& txn, @@ -279,7 +282,7 @@ private: uint32_t flags) = 0; bool maybe_create_new_object(ObjectState& os, ceph::os::Transaction& txn); - virtual interruptible_future + virtual rep_op_fut_t _submit_transaction(std::set&& pg_shards, const hobject_t& hoid, ceph::os::Transaction&& txn, diff --git a/src/crimson/osd/recovery_backend.cc b/src/crimson/osd/recovery_backend.cc index d49c03e33dd..5a6e8e0141b 100644 --- a/src/crimson/osd/recovery_backend.cc +++ b/src/crimson/osd/recovery_backend.cc @@ -104,8 +104,7 @@ RecoveryBackend::handle_backfill_progress( m.op == MOSDPGBackfill::OP_BACKFILL_PROGRESS, t); return shard_services.get_store().do_transaction( - pg.get_collection_ref(), std::move(t) - ).or_terminate(); + pg.get_collection_ref(), std::move(t)).or_terminate(); } RecoveryBackend::interruptible_future<> @@ -153,8 +152,7 @@ RecoveryBackend::handle_backfill_remove( ghobject_t(soid, ghobject_t::NO_GEN, pg.get_pg_whoami().shard)); } return shard_services.get_store().do_transaction( - pg.get_collection_ref(), std::move(t) - ).or_terminate(); + pg.get_collection_ref(), std::move(t)).or_terminate(); } RecoveryBackend::interruptible_future diff --git a/src/crimson/osd/replicated_backend.cc b/src/crimson/osd/replicated_backend.cc index db4ac59c645..66ebfd2ff4f 100644 --- a/src/crimson/osd/replicated_backend.cc +++ b/src/crimson/osd/replicated_backend.cc @@ -39,7 +39,7 @@ ReplicatedBackend::_read(const hobject_t& hoid, return store->read(coll, ghobject_t{hoid}, off, len, flags); } -ReplicatedBackend::interruptible_future +ReplicatedBackend::rep_op_fut_t ReplicatedBackend::_submit_transaction(std::set&& pg_shards, const hobject_t& hoid, ceph::os::Transaction&& txn, @@ -60,49 +60,50 @@ ReplicatedBackend::_submit_transaction(std::set&& pg_shards, bufferlist encoded_txn; encode(txn, encoded_txn); - return interruptor::parallel_for_each(std::move(pg_shards), - [=, encoded_txn=std::move(encoded_txn), txn=std::move(txn)] - (auto pg_shard) mutable { - if (pg_shard == whoami) { - return shard_services.get_store().do_transaction(coll,std::move(txn)); - } else { - auto m = crimson::net::make_message( - osd_op_p.req_id, - whoami, - spg_t{pgid, pg_shard.shard}, - hoid, - CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK, - map_epoch, - min_epoch, - tid, - osd_op_p.at_version); - m->set_data(encoded_txn); - pending_txn->second.acked_peers.push_back({pg_shard, eversion_t{}}); - encode(log_entries, m->logbl); - m->pg_trim_to = osd_op_p.pg_trim_to; - m->min_last_complete_ondisk = osd_op_p.min_last_complete_ondisk; - m->set_rollback_to(osd_op_p.at_version); - // TODO: set more stuff. e.g., pg_states - return shard_services.send_to_osd(pg_shard.osd, std::move(m), map_epoch); - } - }).then_interruptible([this, peers=pending_txn->second.weak_from_this()] { - if (!peers) { - // for now, only actingset_changed can cause peers - // to be nullptr - assert(peering); - throw crimson::common::actingset_changed(peering->is_primary); - } - if (--peers->pending == 0) { - peers->all_committed.set_value(); - peers->all_committed = {}; - return seastar::now(); - } - return peers->all_committed.get_shared_future(); - }).then_interruptible([pending_txn, this] { - auto acked_peers = std::move(pending_txn->second.acked_peers); - pending_trans.erase(pending_txn); - return seastar::make_ready_future(std::move(acked_peers)); - }); + auto all_completed = interruptor::make_interruptible( + shard_services.get_store().do_transaction(coll, std::move(txn))) + .then_interruptible([this, peers=pending_txn->second.weak_from_this()] { + if (!peers) { + // for now, only actingset_changed can cause peers + // to be nullptr + assert(peering); + throw crimson::common::actingset_changed(peering->is_primary); + } + if (--peers->pending == 0) { + peers->all_committed.set_value(); + peers->all_committed = {}; + return seastar::now(); + } + return peers->all_committed.get_shared_future(); + }).then_interruptible([pending_txn, this] { + auto acked_peers = std::move(pending_txn->second.acked_peers); + pending_trans.erase(pending_txn); + return seastar::make_ready_future(std::move(acked_peers)); + }); + + for (auto pg_shard : pg_shards) { + if (pg_shard != whoami) { + auto m = crimson::net::make_message( + osd_op_p.req_id, + whoami, + spg_t{pgid, pg_shard.shard}, + hoid, + CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK, + map_epoch, + min_epoch, + tid, + osd_op_p.at_version); + m->set_data(encoded_txn); + pending_txn->second.acked_peers.push_back({pg_shard, eversion_t{}}); + encode(log_entries, m->logbl); + m->pg_trim_to = osd_op_p.pg_trim_to; + m->min_last_complete_ondisk = osd_op_p.min_last_complete_ondisk; + m->set_rollback_to(osd_op_p.at_version); + // TODO: set more stuff. e.g., pg_states + (void) shard_services.send_to_osd(pg_shard.osd, std::move(m), map_epoch); + } + } + return {seastar::now(), std::move(all_completed)}; } void ReplicatedBackend::on_actingset_changed(peering_info_t pi) diff --git a/src/crimson/osd/replicated_backend.h b/src/crimson/osd/replicated_backend.h index 1e2ae752a59..b768fed717b 100644 --- a/src/crimson/osd/replicated_backend.h +++ b/src/crimson/osd/replicated_backend.h @@ -29,13 +29,12 @@ private: ll_read_ierrorator::future _read(const hobject_t& hoid, uint64_t off, uint64_t len, uint32_t flags) override; - interruptible_future - _submit_transaction(std::set&& pg_shards, - const hobject_t& hoid, - ceph::os::Transaction&& txn, - osd_op_params_t&& osd_op_p, - epoch_t min_epoch, epoch_t max_epoch, - std::vector&& log_entries) final; + rep_op_fut_t _submit_transaction(std::set&& pg_shards, + const hobject_t& hoid, + ceph::os::Transaction&& txn, + osd_op_params_t&& osd_op_p, + epoch_t min_epoch, epoch_t max_epoch, + std::vector&& log_entries) final; const pg_t pgid; const pg_shard_t whoami; crimson::osd::ShardServices& shard_services;