crimson/osd: create and send error log when an osd op fails

Signed-off-by: chunmei-liu <chunmei.liu@intel.com>
chunmei-liu 2022-05-24 22:55:42 -07:00
parent 76cf4c7b2f
commit a48a8fb81f
3 changed files with 117 additions and 12 deletions

src/crimson/osd/pg.cc

@@ -711,6 +711,63 @@ PG::do_osd_ops_execute(
       }));
     }));
 }
+
+seastar::future<> PG::submit_error_log(
+  Ref<MOSDOp> m,
+  const OpInfo &op_info,
+  ObjectContextRef obc,
+  const std::error_code e,
+  ceph_tid_t rep_tid,
+  eversion_t &version)
+{
+  const osd_reqid_t &reqid = m->get_reqid();
+  mempool::osd_pglog::list<pg_log_entry_t> log_entries;
+  log_entries.push_back(pg_log_entry_t(pg_log_entry_t::ERROR,
+                                       obc->obs.oi.soid,
+                                       next_version(),
+                                       eversion_t(), 0,
+                                       reqid, utime_t(),
+                                       -e.value()));
+  if (op_info.allows_returnvec()) {
+    log_entries.back().set_op_returns(m->ops);
+  }
+  ceph_assert(is_primary());
+  if (!log_entries.empty()) {
+    ceph_assert(log_entries.rbegin()->version >= projected_last_update);
+    version = projected_last_update = log_entries.rbegin()->version;
+  }
+  ceph::os::Transaction t;
+  peering_state.merge_new_log_entries(
+    log_entries, t, peering_state.get_pg_trim_to(),
+    peering_state.get_min_last_complete_ondisk());
+  set<pg_shard_t> waiting_on;
+  for (auto &i : get_acting_recovery_backfill()) {
+    pg_shard_t peer(i);
+    if (peer == pg_whoami) continue;
+    ceph_assert(peering_state.get_peer_missing().count(peer));
+    ceph_assert(peering_state.has_peer_info(peer));
+    auto log_m = crimson::make_message<MOSDPGUpdateLogMissing>(
+      log_entries,
+      spg_t(peering_state.get_info().pgid.pgid, i.shard),
+      pg_whoami.shard,
+      get_osdmap_epoch(),
+      get_last_peering_reset(),
+      rep_tid,
+      peering_state.get_pg_trim_to(),
+      peering_state.get_min_last_complete_ondisk());
+    send_cluster_message(peer.osd, std::move(log_m), get_osdmap_epoch());
+    waiting_on.insert(peer);
+  }
+  waiting_on.insert(pg_whoami);
+  log_entry_update_waiting_on.insert(
+    std::make_pair(rep_tid, log_update_t{std::move(waiting_on)}));
+  return shard_services.get_store().do_transaction(
+    get_collection_ref(), std::move(t))
+  .then([this] {
+    peering_state.update_trim_to();
+    return seastar::now();
+  });
+}
+
 PG::do_osd_ops_iertr::future<PG::pg_rep_op_fut_t<MURef<MOSDOpReply>>>
 PG::do_osd_ops(
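
The new submit_error_log() keys all of its bookkeeping on rep_tid: it appends a pg_log_entry_t::ERROR entry to the PG log, replicates it to every other shard in the acting/recovery/backfill set via MOSDPGUpdateLogMissing, and records which shards still have to commit it. A minimal sketch of the tracking structure it populates (an assumed shape for illustration; the actual definition lives in crimson's PG header and may differ):

// Sketch only -- assumed shape of the per-rep_tid bookkeeping used above.
struct log_update_t {
  // every shard (remote peers plus the primary itself) that still has to
  // commit the error-log entry
  std::set<pg_shard_t> waiting_on;
  // resolved once waiting_on drains, waking any op parked on this entry
  seastar::shared_promise<> all_committed;
};
std::map<ceph_tid_t, log_update_t> log_entry_update_waiting_on;
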
@@ -763,18 +820,59 @@ PG::do_osd_ops(
       return do_osd_ops_iertr::make_ready_future<MURef<MOSDOpReply>>(
         std::move(reply));
     },
-    [m, this] (const std::error_code& e) {
-      auto reply = crimson::make_message<MOSDOpReply>(
-        m.get(), -e.value(), get_osdmap_epoch(), 0, false);
-      if (m->ops.empty() ? 0 :
-          m->ops.back().op.flags & CEPH_OSD_OP_FLAG_FAILOK) {
-        reply->set_result(0);
-      }
-      reply->set_enoent_reply_versions(
-        peering_state.get_info().last_update,
-        peering_state.get_info().last_user_version);
-      return do_osd_ops_iertr::make_ready_future<MURef<MOSDOpReply>>(std::move(reply));
+    [m, &op_info, obc, this] (const std::error_code& e) {
+      return seastar::do_with(eversion_t(), [m, &op_info, obc, e, this](auto &version) {
+        auto fut = seastar::now();
+        epoch_t epoch = get_osdmap_epoch();
+        ceph_tid_t rep_tid = shard_services.get_tid();
+        auto last_complete = peering_state.get_info().last_complete;
+        if (op_info.may_write()) {
+          fut = submit_error_log(m, op_info, obc, e, rep_tid, version);
+        }
+        return fut.then([m, e, epoch, &op_info, rep_tid, &version, last_complete, this] {
+          auto log_reply = [m, e, this] {
+            auto reply = crimson::make_message<MOSDOpReply>(
+              m.get(), -e.value(), get_osdmap_epoch(), 0, false);
+            if (m->ops.empty() ? 0 :
+                m->ops.back().op.flags & CEPH_OSD_OP_FLAG_FAILOK) {
+              reply->set_result(0);
+            }
+            reply->set_enoent_reply_versions(
+              peering_state.get_info().last_update,
+              peering_state.get_info().last_user_version);
+            reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
+            return do_osd_ops_iertr::make_ready_future<MURef<MOSDOpReply>>(
+              std::move(reply));
+          };
+          if (!peering_state.pg_has_reset_since(epoch) && op_info.may_write()) {
+            auto it = log_entry_update_waiting_on.find(rep_tid);
+            ceph_assert(it != log_entry_update_waiting_on.end());
+            auto it2 = it->second.waiting_on.find(pg_whoami);
+            ceph_assert(it2 != it->second.waiting_on.end());
+            it->second.waiting_on.erase(it2);
+            if (it->second.waiting_on.empty()) {
+              log_entry_update_waiting_on.erase(it);
+              if (version != eversion_t()) {
+                peering_state.complete_write(version, last_complete);
+              }
+              return log_reply();
+            } else {
+              return it->second.all_committed.get_shared_future()
+              .then([this, &version, last_complete, log_reply = std::move(log_reply)] {
+                if (version != eversion_t()) {
+                  peering_state.complete_write(version, last_complete);
+                }
+                return log_reply();
+              });
+            }
+          } else {
+            return log_reply();
+          }
+        });
+      });
     });
 }

 PG::do_osd_ops_iertr::future<PG::pg_rep_op_fut_t<>>
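
On the reply side, a failed write op now waits until every shard has committed the error-log entry before answering with CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK; the primary's own commit is accounted for by erasing pg_whoami from waiting_on once the local transaction future resolves. The remote acknowledgements that drain the rest of the set arrive as MOSDPGUpdateLogMissingReply messages and are handled outside this diff; a hedged sketch of what that handler has to do, reusing the assumed log_update_t shape above:

// Sketch only -- not a handler from this commit. On each replica ack, drop
// that shard from the pending set; when the set empties, wake every waiter
// parked on all_committed and retire the bookkeeping entry.
void on_log_update_ack(ceph_tid_t rep_tid, pg_shard_t from) {
  auto it = log_entry_update_waiting_on.find(rep_tid);
  if (it == log_entry_update_waiting_on.end()) {
    return;  // a PG reset may have raced with the ack
  }
  it->second.waiting_on.erase(from);
  if (it->second.waiting_on.empty()) {
    it->second.all_committed.set_value();
    log_entry_update_waiting_on.erase(it);
  }
}
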

src/crimson/osd/pg.h

@@ -551,6 +551,13 @@ public:
   void print(std::ostream& os) const;
   void dump_primary(Formatter*);

+  seastar::future<> submit_error_log(
+    Ref<MOSDOp> m,
+    const OpInfo &op_info,
+    ObjectContextRef obc,
+    const std::error_code e,
+    ceph_tid_t rep_tid,
+    eversion_t &version);
 private:
   template<RWState::State State>

src/crimson/osd/shard_services.h

@@ -141,9 +141,9 @@ public:
   // -- tids --
   // for ops i issue
-  unsigned int last_tid{0};
+  unsigned int next_tid{0};
   ceph_tid_t get_tid() {
-    return (ceph_tid_t)last_tid++;
+    return (ceph_tid_t)next_tid++;
   }

   // PG Temp State
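
The shard_services.h change is a rename for accuracy rather than behavior: the counter is post-incremented, so it always holds the tid that will be handed out next, which the old name last_tid misstated. A trivial illustration of the semantics (svc stands in for a hypothetical ShardServices instance):

ceph_tid_t a = svc.get_tid();  // returns 0; next_tid is now 1
ceph_tid_t b = svc.get_tid();  // returns 1; next_tid is now 2 -- the name
                               // now matches what the member actually holds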