Merge pull request #46435 from liu-chunmei/crimson-error-log

Crimson/osd:  add error log

Reviewed-by: Samuel Just <sjust@redhat.com>
This commit is contained in:
Liu-Chunmei 2022-06-22 15:28:48 -07:00 committed by GitHub
commit 6ae30f61cd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 536 additions and 13 deletions

View File

@ -21,6 +21,8 @@ add_executable(crimson-osd
osd_operations/peering_event.cc
osd_operations/pg_advance_map.cc
osd_operations/replicated_request.cc
osd_operations/logmissing_request.cc
osd_operations/logmissing_request_reply.cc
osd_operations/background_recovery.cc
osd_operations/recovery_subrequest.cc
pg_recovery.cc

View File

@ -22,6 +22,8 @@
#include "messages/MOSDMarkMeDown.h"
#include "messages/MOSDOp.h"
#include "messages/MOSDPeeringOp.h"
#include "messages/MOSDPGUpdateLogMissing.h"
#include "messages/MOSDPGUpdateLogMissingReply.h"
#include "messages/MOSDRepOpReply.h"
#include "messages/MOSDScrub2.h"
#include "messages/MPGStats.h"
@ -781,6 +783,12 @@ OSD::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m)
return handle_rep_op_reply(conn, boost::static_pointer_cast<MOSDRepOpReply>(m));
case MSG_OSD_SCRUB2:
return handle_scrub(conn, boost::static_pointer_cast<MOSDScrub2>(m));
case MSG_OSD_PG_UPDATE_LOG_MISSING:
return handle_update_log_missing(conn, boost::static_pointer_cast<
MOSDPGUpdateLogMissing>(m));
case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
return handle_update_log_missing_reply(conn, boost::static_pointer_cast<
MOSDPGUpdateLogMissingReply>(m));
default:
dispatched = false;
return seastar::now();
@ -1209,6 +1217,28 @@ seastar::future<> OSD::handle_osd_op(crimson::net::ConnectionRef conn,
return seastar::now();
}
seastar::future<> OSD::handle_update_log_missing(
crimson::net::ConnectionRef conn,
Ref<MOSDPGUpdateLogMissing> m)
{
m->decode_payload();
(void) start_pg_operation<LogMissingRequest>(
std::move(conn),
std::move(m));
return seastar::now();
}
seastar::future<> OSD::handle_update_log_missing_reply(
crimson::net::ConnectionRef conn,
Ref<MOSDPGUpdateLogMissingReply> m)
{
m->decode_payload();
(void) start_pg_operation<LogMissingRequestReply>(
std::move(conn),
std::move(m));
return seastar::now();
}
seastar::future<> OSD::send_incremental_map(crimson::net::ConnectionRef conn,
epoch_t first)
{

View File

@ -207,7 +207,11 @@ private:
seastar::future<> handle_command(crimson::net::ConnectionRef conn,
Ref<MCommand> m);
seastar::future<> start_asok_admin();
seastar::future<> handle_update_log_missing(crimson::net::ConnectionRef conn,
Ref<MOSDPGUpdateLogMissing> m);
seastar::future<> handle_update_log_missing_reply(
crimson::net::ConnectionRef conn,
Ref<MOSDPGUpdateLogMissingReply> m);
public:
OSD_OSDMapGate osdmap_gate;

View File

@ -44,6 +44,8 @@ enum class OperationTypeCode {
background_recovery_sub,
internal_client_request,
historic_client_request,
logmissing_request,
logmissing_request_reply,
last_op
};
@ -58,6 +60,8 @@ static constexpr const char* const OP_NAMES[] = {
"background_recovery_sub",
"internal_client_request",
"historic_client_request",
"logmissing_request",
"logmissing_request_reply",
};
// prevent the addition of OperationTypeCode-s with no matching OP_NAMES entry:

View File

@ -279,6 +279,21 @@ struct EventBackendRegistry<osd::RepRequest> {
}
};
template <>
struct EventBackendRegistry<osd::LogMissingRequest> {
static std::tuple<> get_backends() {
return {/* no extenral backends */};
}
};
template <>
struct EventBackendRegistry<osd::LogMissingRequestReply> {
static std::tuple<> get_backends() {
return {/* no extenral backends */};
}
};
template <>
struct EventBackendRegistry<osd::RecoverySubRequest> {
static std::tuple<> get_backends() {

View File

@ -0,0 +1,68 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#include "logmissing_request.h"
#include "common/Formatter.h"
#include "crimson/osd/osd.h"
#include "crimson/osd/osd_connection_priv.h"
#include "crimson/osd/osd_operation_external_tracking.h"
#include "crimson/osd/pg.h"
namespace {
seastar::logger& logger() {
return crimson::get_logger(ceph_subsys_osd);
}
}
namespace crimson::osd {
LogMissingRequest::LogMissingRequest(crimson::net::ConnectionRef&& conn,
Ref<MOSDPGUpdateLogMissing> &&req)
: conn{std::move(conn)},
req{std::move(req)}
{}
void LogMissingRequest::print(std::ostream& os) const
{
os << "LogMissingRequest("
<< "from=" << req->from
<< " req=" << *req
<< ")";
}
void LogMissingRequest::dump_detail(Formatter *f) const
{
f->open_object_section("LogMissingRequest");
f->dump_stream("req_tid") << req->get_tid();
f->dump_stream("pgid") << req->get_spg();
f->dump_unsigned("map_epoch", req->get_map_epoch());
f->dump_unsigned("min_epoch", req->get_min_epoch());
f->dump_stream("entries") << req->entries;
f->dump_stream("from") << req->from;
f->close_section();
}
ConnectionPipeline &LogMissingRequest::get_connection_pipeline()
{
return get_osd_priv(conn.get()).replicated_request_conn_pipeline;
}
RepRequest::PGPipeline &LogMissingRequest::pp(PG &pg)
{
return pg.replicated_request_pg_pipeline;
}
seastar::future<> LogMissingRequest::with_pg(
ShardServices &shard_services, Ref<PG> pg)
{
logger().debug("{}: LogMissingRequest::with_pg", *this);
IRef ref = this;
return interruptor::with_interruption([this, pg] {
return pg->do_update_log_missing(std::move(req));
}, [ref](std::exception_ptr) { return seastar::now(); }, pg);
}
}

View File

@ -0,0 +1,68 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#pragma once
#include "crimson/net/Connection.h"
#include "crimson/osd/osdmap_gate.h"
#include "crimson/osd/osd_operation.h"
#include "crimson/osd/osd_operations/replicated_request.h"
#include "crimson/osd/pg_map.h"
#include "crimson/common/type_helpers.h"
#include "messages/MOSDPGUpdateLogMissing.h"
namespace ceph {
class Formatter;
}
namespace crimson::osd {
class ShardServices;
class OSD;
class PG;
class LogMissingRequest final : public PhasedOperationT<LogMissingRequest> {
public:
class PGPipeline {
struct AwaitMap : OrderedExclusivePhaseT<AwaitMap> {
static constexpr auto type_name = "LogMissingRequest::PGPipeline::await_map";
} await_map;
struct Process : OrderedExclusivePhaseT<Process> {
static constexpr auto type_name = "LogMissingRequest::PGPipeline::process";
} process;
friend LogMissingRequest;
};
static constexpr OperationTypeCode type = OperationTypeCode::logmissing_request;
LogMissingRequest(crimson::net::ConnectionRef&&, Ref<MOSDPGUpdateLogMissing>&&);
void print(std::ostream &) const final;
void dump_detail(ceph::Formatter* f) const final;
static constexpr bool can_create() { return false; }
spg_t get_pgid() const {
return req->get_spg();
}
ConnectionPipeline &get_connection_pipeline();
PipelineHandle &get_handle() { return handle; }
epoch_t get_epoch() const { return req->get_min_epoch(); }
seastar::future<> with_pg(
ShardServices &shard_services, Ref<PG> pg);
std::tuple<
ConnectionPipeline::AwaitActive::BlockingEvent,
ConnectionPipeline::AwaitMap::BlockingEvent,
ConnectionPipeline::GetPG::BlockingEvent,
PGMap::PGCreationBlockingEvent,
OSD_OSDMapGate::OSDMapBlocker::BlockingEvent
> tracking_events;
private:
RepRequest::PGPipeline &pp(PG &pg);
crimson::net::ConnectionRef conn;
Ref<MOSDPGUpdateLogMissing> req;
};
}

View File

@ -0,0 +1,68 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#include "logmissing_request_reply.h"
#include "common/Formatter.h"
#include "crimson/osd/osd.h"
#include "crimson/osd/osd_connection_priv.h"
#include "crimson/osd/osd_operation_external_tracking.h"
#include "crimson/osd/pg.h"
namespace {
seastar::logger& logger() {
return crimson::get_logger(ceph_subsys_osd);
}
}
namespace crimson::osd {
LogMissingRequestReply::LogMissingRequestReply(
crimson::net::ConnectionRef&& conn,
Ref<MOSDPGUpdateLogMissingReply> &&req)
: conn{std::move(conn)},
req{std::move(req)}
{}
void LogMissingRequestReply::print(std::ostream& os) const
{
os << "LogMissingRequestReply("
<< "from=" << req->from
<< " req=" << *req
<< ")";
}
void LogMissingRequestReply::dump_detail(Formatter *f) const
{
f->open_object_section("LogMissingRequestReply");
f->dump_stream("rep_tid") << req->get_tid();
f->dump_stream("pgid") << req->get_spg();
f->dump_unsigned("map_epoch", req->get_map_epoch());
f->dump_unsigned("min_epoch", req->get_min_epoch());
f->dump_stream("from") << req->from;
f->close_section();
}
ConnectionPipeline &LogMissingRequestReply::get_connection_pipeline()
{
return get_osd_priv(conn.get()).replicated_request_conn_pipeline;
}
RepRequest::PGPipeline &LogMissingRequestReply::pp(PG &pg)
{
return pg.replicated_request_pg_pipeline;
}
seastar::future<> LogMissingRequestReply::with_pg(
ShardServices &shard_services, Ref<PG> pg)
{
logger().debug("{}: LogMissingRequestReply::with_pg", *this);
IRef ref = this;
return interruptor::with_interruption([this, pg] {
return pg->do_update_log_missing_reply(std::move(req));
}, [ref](std::exception_ptr) { return seastar::now(); }, pg);
}
}

View File

@ -0,0 +1,68 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#pragma once
#include "crimson/net/Connection.h"
#include "crimson/osd/osdmap_gate.h"
#include "crimson/osd/osd_operation.h"
#include "crimson/osd/osd_operations/replicated_request.h"
#include "crimson/osd/pg_map.h"
#include "crimson/common/type_helpers.h"
#include "messages/MOSDPGUpdateLogMissingReply.h"
namespace ceph {
class Formatter;
}
namespace crimson::osd {
class ShardServices;
class OSD;
class PG;
class LogMissingRequestReply final : public PhasedOperationT<LogMissingRequestReply> {
public:
class PGPipeline {
struct AwaitMap : OrderedExclusivePhaseT<AwaitMap> {
static constexpr auto type_name = "LogMissingRequestReply::PGPipeline::await_map";
} await_map;
struct Process : OrderedExclusivePhaseT<Process> {
static constexpr auto type_name = "LogMissingRequestReply::PGPipeline::process";
} process;
friend LogMissingRequestReply;
};
static constexpr OperationTypeCode type = OperationTypeCode::logmissing_request_reply;
LogMissingRequestReply(crimson::net::ConnectionRef&&, Ref<MOSDPGUpdateLogMissingReply>&&);
void print(std::ostream &) const final;
void dump_detail(ceph::Formatter* f) const final;
static constexpr bool can_create() { return false; }
spg_t get_pgid() const {
return req->get_spg();
}
ConnectionPipeline &get_connection_pipeline();
PipelineHandle &get_handle() { return handle; }
epoch_t get_epoch() const { return req->get_min_epoch(); }
seastar::future<> with_pg(
ShardServices &shard_services, Ref<PG> pg);
std::tuple<
ConnectionPipeline::AwaitActive::BlockingEvent,
ConnectionPipeline::AwaitMap::BlockingEvent,
ConnectionPipeline::GetPG::BlockingEvent,
PGMap::PGCreationBlockingEvent,
OSD_OSDMapGate::OSDMapBlocker::BlockingEvent
> tracking_events;
private:
RepRequest::PGPipeline &pp(PG &pg);
crimson::net::ConnectionRef conn;
Ref<MOSDPGUpdateLogMissingReply> req;
};
}

View File

@ -712,6 +712,63 @@ PG::do_osd_ops_execute(
}));
}));
}
seastar::future<> PG::submit_error_log(
Ref<MOSDOp> m,
const OpInfo &op_info,
ObjectContextRef obc,
const std::error_code e,
ceph_tid_t rep_tid,
eversion_t &version)
{
const osd_reqid_t &reqid = m->get_reqid();
mempool::osd_pglog::list<pg_log_entry_t> log_entries;
log_entries.push_back(pg_log_entry_t(pg_log_entry_t::ERROR,
obc->obs.oi.soid,
next_version(),
eversion_t(), 0,
reqid, utime_t(),
-e.value()));
if (op_info.allows_returnvec()) {
log_entries.back().set_op_returns(m->ops);
}
ceph_assert(is_primary());
if (!log_entries.empty()) {
ceph_assert(log_entries.rbegin()->version >= projected_last_update);
version = projected_last_update = log_entries.rbegin()->version;
}
ceph::os::Transaction t;
peering_state.merge_new_log_entries(
log_entries, t, peering_state.get_pg_trim_to(),
peering_state.get_min_last_complete_ondisk());
set<pg_shard_t> waiting_on;
for (auto &i : get_acting_recovery_backfill()) {
pg_shard_t peer(i);
if (peer == pg_whoami) continue;
ceph_assert(peering_state.get_peer_missing().count(peer));
ceph_assert(peering_state.has_peer_info(peer));
auto log_m = crimson::make_message<MOSDPGUpdateLogMissing>(
log_entries,
spg_t(peering_state.get_info().pgid.pgid, i.shard),
pg_whoami.shard,
get_osdmap_epoch(),
get_last_peering_reset(),
rep_tid,
peering_state.get_pg_trim_to(),
peering_state.get_min_last_complete_ondisk());
send_cluster_message(peer.osd, std::move(log_m), get_osdmap_epoch());
waiting_on.insert(peer);
}
waiting_on.insert(pg_whoami);
log_entry_update_waiting_on.insert(
std::make_pair(rep_tid, log_update_t{std::move(waiting_on)}));
return shard_services.get_store().do_transaction(
get_collection_ref(), std::move(t))
.then([this] {
peering_state.update_trim_to();
return seastar::now();
});
}
PG::do_osd_ops_iertr::future<PG::pg_rep_op_fut_t<MURef<MOSDOpReply>>>
PG::do_osd_ops(
@ -764,18 +821,59 @@ PG::do_osd_ops(
return do_osd_ops_iertr::make_ready_future<MURef<MOSDOpReply>>(
std::move(reply));
},
[m, this] (const std::error_code& e) {
auto reply = crimson::make_message<MOSDOpReply>(
m.get(), -e.value(), get_osdmap_epoch(), 0, false);
if (m->ops.empty() ? 0 :
m->ops.back().op.flags & CEPH_OSD_OP_FLAG_FAILOK) {
reply->set_result(0);
[m, &op_info, obc, this] (const std::error_code& e) {
return seastar::do_with(eversion_t(), [m, &op_info, obc, e, this](auto &version) {
auto fut = seastar::now();
epoch_t epoch = get_osdmap_epoch();
ceph_tid_t rep_tid = shard_services.get_tid();
auto last_complete = peering_state.get_info().last_complete;
if (op_info.may_write()) {
fut = submit_error_log(m, op_info, obc, e, rep_tid, version);
}
reply->set_enoent_reply_versions(
peering_state.get_info().last_update,
peering_state.get_info().last_user_version);
return do_osd_ops_iertr::make_ready_future<MURef<MOSDOpReply>>(std::move(reply));
return fut.then([m, e, epoch, &op_info, rep_tid, &version, last_complete, this] {
auto log_reply = [m, e, this] {
auto reply = crimson::make_message<MOSDOpReply>(
m.get(), -e.value(), get_osdmap_epoch(), 0, false);
if (m->ops.empty() ? 0 :
m->ops.back().op.flags & CEPH_OSD_OP_FLAG_FAILOK) {
reply->set_result(0);
}
reply->set_enoent_reply_versions(
peering_state.get_info().last_update,
peering_state.get_info().last_user_version);
reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
return do_osd_ops_iertr::make_ready_future<MURef<MOSDOpReply>>(
std::move(reply));
};
if (!peering_state.pg_has_reset_since(epoch) && op_info.may_write()) {
auto it = log_entry_update_waiting_on.find(rep_tid);
ceph_assert(it != log_entry_update_waiting_on.end());
auto it2 = it->second.waiting_on.find(pg_whoami);
ceph_assert(it2 != it->second.waiting_on.end());
it->second.waiting_on.erase(it2);
if (it->second.waiting_on.empty()) {
log_entry_update_waiting_on.erase(it);
if (version != eversion_t()) {
peering_state.complete_write(version, last_complete);
}
return log_reply();
} else {
return it->second.all_committed.get_shared_future()
.then([this, &version, last_complete, log_reply = std::move(log_reply)] {
if (version != eversion_t()) {
peering_state.complete_write(version, last_complete);
}
return log_reply();
});
}
} else {
return log_reply();
}
});
});
});
}
PG::do_osd_ops_iertr::future<PG::pg_rep_op_fut_t<>>
@ -1122,6 +1220,78 @@ void PG::handle_rep_op_reply(crimson::net::ConnectionRef conn,
}
}
PG::interruptible_future<> PG::do_update_log_missing(
Ref<MOSDPGUpdateLogMissing> m)
{
if (__builtin_expect(stopping, false)) {
return seastar::make_exception_future<>(
crimson::common::system_shutdown_exception());
}
ceph_assert(m->get_type() == MSG_OSD_PG_UPDATE_LOG_MISSING);
ObjectStore::Transaction t;
std::optional<eversion_t> op_trim_to, op_roll_forward_to;
if (m->pg_trim_to != eversion_t())
op_trim_to = m->pg_trim_to;
if (m->pg_roll_forward_to != eversion_t())
op_roll_forward_to = m->pg_roll_forward_to;
logger().debug("op_trim_to = {}, op_roll_forward_to = {}",
op_trim_to, op_roll_forward_to);
peering_state.append_log_entries_update_missing(
m->entries, t, op_trim_to, op_roll_forward_to);
return interruptor::make_interruptible(shard_services.get_store().do_transaction(
coll_ref, std::move(t))).then_interruptible(
[m, lcod=peering_state.get_info().last_complete, this] {
if (!peering_state.pg_has_reset_since(m->get_epoch())) {
peering_state.update_last_complete_ondisk(lcod);
auto reply =
crimson::make_message<MOSDPGUpdateLogMissingReply>(
spg_t(peering_state.get_info().pgid.pgid, get_primary().shard),
pg_whoami.shard,
m->get_epoch(),
m->min_epoch,
m->get_tid(),
lcod);
reply->set_priority(CEPH_MSG_PRIO_HIGH);
return m->get_connection()->send(std::move(reply));
}
return seastar::now();
});
}
PG::interruptible_future<> PG::do_update_log_missing_reply(
Ref<MOSDPGUpdateLogMissingReply> m)
{
logger().debug("{}: got reply from {}", __func__, m->get_from());
auto it = log_entry_update_waiting_on.find(m->get_tid());
if (it != log_entry_update_waiting_on.end()) {
if (it->second.waiting_on.count(m->get_from())) {
it->second.waiting_on.erase(m->get_from());
if (m->last_complete_ondisk != eversion_t()) {
peering_state.update_peer_last_complete_ondisk(
m->get_from(), m->last_complete_ondisk);
}
} else {
logger().error("{} : {} got reply {} from shard we are not waiting for ",
__func__, peering_state.get_info().pgid, *m, m->get_from());
}
if (it->second.waiting_on.empty()) {
it->second.all_committed.set_value();
it->second.all_committed = {};
log_entry_update_waiting_on.erase(it);
}
} else {
logger().error("{} : {} got reply {} on unknown tid {}",
__func__, peering_state.get_info().pgid, *m, m->get_tid());
}
return seastar::now();
}
bool PG::old_peering_msg(
const epoch_t reply_epoch,
const epoch_t query_epoch) const

View File

@ -26,6 +26,8 @@
#include "crimson/osd/pg_interval_interrupt_condition.h"
#include "crimson/osd/ops_executer.h"
#include "crimson/osd/osd_operations/client_request.h"
#include "crimson/osd/osd_operations/logmissing_request.h"
#include "crimson/osd/osd_operations/logmissing_request_reply.h"
#include "crimson/osd/osd_operations/peering_event.h"
#include "crimson/osd/osd_operations/replicated_request.h"
#include "crimson/osd/osd_operations/background_recovery.h"
@ -542,9 +544,20 @@ public:
interruptible_future<> handle_rep_op(Ref<MOSDRepOp> m);
void handle_rep_op_reply(crimson::net::ConnectionRef conn,
const MOSDRepOpReply& m);
interruptible_future<> do_update_log_missing(Ref<MOSDPGUpdateLogMissing> m);
interruptible_future<> do_update_log_missing_reply(
Ref<MOSDPGUpdateLogMissingReply> m);
void print(std::ostream& os) const;
void dump_primary(Formatter*);
seastar::future<> submit_error_log(
Ref<MOSDOp> m,
const OpInfo &op_info,
ObjectContextRef obc,
const std::error_code e,
ceph_tid_t rep_tid,
eversion_t &version);
private:
template<RWState::State State>
@ -732,6 +745,8 @@ private:
template <class T>
friend class PeeringEvent;
friend class RepRequest;
friend class LogMissingRequest;
friend class LogMissingRequestReply;
friend class BackfillRecovery;
friend struct PGFacade;
friend class InternalClientRequest;
@ -761,6 +776,11 @@ private:
BackfillRecovery::BackfillRecoveryPipeline backfill_pipeline;
friend class IOInterruptCondition;
struct log_update_t {
std::set<pg_shard_t> waiting_on;
seastar::shared_promise<> all_committed;
};
std::map<ceph_tid_t, log_update_t> log_entry_update_waiting_on;
};
struct PG::do_osd_ops_params_t {

View File

@ -53,7 +53,7 @@ ReplicatedBackend::_submit_transaction(std::set<pg_shard_t>&& pg_shards,
throw crimson::common::actingset_changed(peering->is_primary);
}
const ceph_tid_t tid = next_txn_id++;
const ceph_tid_t tid = shard_services.get_tid();
auto pending_txn =
pending_trans.try_emplace(tid, pg_shards.size(), osd_op_p.at_version).first;
bufferlist encoded_txn;

View File

@ -37,7 +37,6 @@ private:
std::vector<pg_log_entry_t>&& log_entries) final;
const pg_t pgid;
const pg_shard_t whoami;
ceph_tid_t next_txn_id = 0;
class pending_on_t : public seastar::weakly_referencable<pending_on_t> {
public:
pending_on_t(size_t pending, const eversion_t& at_version)

View File

@ -139,6 +139,13 @@ public:
return dispatch_context({}, std::move(ctx));
}
// -- tids --
// for ops i issue
unsigned int next_tid{0};
ceph_tid_t get_tid() {
return (ceph_tid_t)next_tid++;
}
// PG Temp State
private:
// TODO: hook into map processing and some kind of heartbeat/peering