diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc index 66680e28248..3930575f2d6 100644 --- a/src/crimson/osd/pg.cc +++ b/src/crimson/osd/pg.cc @@ -711,6 +711,63 @@ PG::do_osd_ops_execute( })); })); } +seastar::future<> PG::submit_error_log( + Ref m, + const OpInfo &op_info, + ObjectContextRef obc, + const std::error_code e, + ceph_tid_t rep_tid, + eversion_t &version) +{ + const osd_reqid_t &reqid = m->get_reqid(); + mempool::osd_pglog::list log_entries; + log_entries.push_back(pg_log_entry_t(pg_log_entry_t::ERROR, + obc->obs.oi.soid, + next_version(), + eversion_t(), 0, + reqid, utime_t(), + -e.value())); + if (op_info.allows_returnvec()) { + log_entries.back().set_op_returns(m->ops); + } + ceph_assert(is_primary()); + if (!log_entries.empty()) { + ceph_assert(log_entries.rbegin()->version >= projected_last_update); + version = projected_last_update = log_entries.rbegin()->version; + } + ceph::os::Transaction t; + peering_state.merge_new_log_entries( + log_entries, t, peering_state.get_pg_trim_to(), + peering_state.get_min_last_complete_ondisk()); + + set waiting_on; + for (auto &i : get_acting_recovery_backfill()) { + pg_shard_t peer(i); + if (peer == pg_whoami) continue; + ceph_assert(peering_state.get_peer_missing().count(peer)); + ceph_assert(peering_state.has_peer_info(peer)); + auto log_m = crimson::make_message( + log_entries, + spg_t(peering_state.get_info().pgid.pgid, i.shard), + pg_whoami.shard, + get_osdmap_epoch(), + get_last_peering_reset(), + rep_tid, + peering_state.get_pg_trim_to(), + peering_state.get_min_last_complete_ondisk()); + send_cluster_message(peer.osd, std::move(log_m), get_osdmap_epoch()); + waiting_on.insert(peer); + } + waiting_on.insert(pg_whoami); + log_entry_update_waiting_on.insert( + std::make_pair(rep_tid, log_update_t{std::move(waiting_on)})); + return shard_services.get_store().do_transaction( + get_collection_ref(), std::move(t)) + .then([this] { + peering_state.update_trim_to(); + return seastar::now(); + }); +} PG::do_osd_ops_iertr::future>> PG::do_osd_ops( @@ -763,18 +820,59 @@ PG::do_osd_ops( return do_osd_ops_iertr::make_ready_future>( std::move(reply)); }, - [m, this] (const std::error_code& e) { - auto reply = crimson::make_message( - m.get(), -e.value(), get_osdmap_epoch(), 0, false); - if (m->ops.empty() ? 0 : - m->ops.back().op.flags & CEPH_OSD_OP_FLAG_FAILOK) { - reply->set_result(0); + [m, &op_info, obc, this] (const std::error_code& e) { + return seastar::do_with(eversion_t(), [m, &op_info, obc, e, this](auto &version) { + auto fut = seastar::now(); + epoch_t epoch = get_osdmap_epoch(); + ceph_tid_t rep_tid = shard_services.get_tid(); + auto last_complete = peering_state.get_info().last_complete; + if (op_info.may_write()) { + fut = submit_error_log(m, op_info, obc, e, rep_tid, version); } - reply->set_enoent_reply_versions( - peering_state.get_info().last_update, - peering_state.get_info().last_user_version); - return do_osd_ops_iertr::make_ready_future>(std::move(reply)); + return fut.then([m, e, epoch, &op_info, rep_tid, &version, last_complete, this] { + auto log_reply = [m, e, this] { + auto reply = crimson::make_message( + m.get(), -e.value(), get_osdmap_epoch(), 0, false); + if (m->ops.empty() ? 0 : + m->ops.back().op.flags & CEPH_OSD_OP_FLAG_FAILOK) { + reply->set_result(0); + } + reply->set_enoent_reply_versions( + peering_state.get_info().last_update, + peering_state.get_info().last_user_version); + reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK); + return do_osd_ops_iertr::make_ready_future>( + std::move(reply)); + }; + + if (!peering_state.pg_has_reset_since(epoch) && op_info.may_write()) { + auto it = log_entry_update_waiting_on.find(rep_tid); + ceph_assert(it != log_entry_update_waiting_on.end()); + auto it2 = it->second.waiting_on.find(pg_whoami); + ceph_assert(it2 != it->second.waiting_on.end()); + it->second.waiting_on.erase(it2); + + if (it->second.waiting_on.empty()) { + log_entry_update_waiting_on.erase(it); + if (version != eversion_t()) { + peering_state.complete_write(version, last_complete); + } + return log_reply(); + } else { + return it->second.all_committed.get_shared_future() + .then([this, &version, last_complete, log_reply = std::move(log_reply)] { + if (version != eversion_t()) { + peering_state.complete_write(version, last_complete); + } + return log_reply(); + }); + } + } else { + return log_reply(); + } + }); }); + }); } PG::do_osd_ops_iertr::future> diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index 0a23e55d131..5e764e738fa 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -551,6 +551,13 @@ public: void print(std::ostream& os) const; void dump_primary(Formatter*); + seastar::future<> submit_error_log( + Ref m, + const OpInfo &op_info, + ObjectContextRef obc, + const std::error_code e, + ceph_tid_t rep_tid, + eversion_t &version); private: template diff --git a/src/crimson/osd/shard_services.h b/src/crimson/osd/shard_services.h index 6913a52cbb5..39a635a070b 100644 --- a/src/crimson/osd/shard_services.h +++ b/src/crimson/osd/shard_services.h @@ -141,9 +141,9 @@ public: // -- tids -- // for ops i issue - unsigned int last_tid{0}; + unsigned int next_tid{0}; ceph_tid_t get_tid() { - return (ceph_tid_t)last_tid++; + return (ceph_tid_t)next_tid++; } // PG Temp State