
Merge pull request from xxhdx1985126/wip-crimson-recovery-pr

crimson: pglog based recovery ---- part 1

Reviewed-by: Samuel Just <sjust@redhat.com>
Reviewed-by: Radoslaw Zarzynski <rzarzyns@redhat.com>
Reviewed-by: Kefu Chai <kchai@redhat.com>
Kefu Chai 2020-04-28 17:22:06 +08:00 committed by GitHub
commit 9486206f5a
46 changed files with 3375 additions and 114 deletions

View File

@@ -15,7 +15,6 @@
#ifndef ASYNC_RESERVER_H
#define ASYNC_RESERVER_H
#include "common/Finisher.h"
#include "common/Formatter.h"
#define rdout(x) lgeneric_subdout(cct,reserver,x)
@@ -27,10 +26,10 @@
* linear with respect to the total number of priorities used
* over all time.
*/
template <typename T>
template <typename T, typename F>
class AsyncReserver {
CephContext *cct;
Finisher *f;
F *f;
unsigned max_allowed;
unsigned min_priority;
ceph::mutex lock = ceph::make_mutex("AsyncReserver::lock");
@@ -122,7 +121,7 @@ class AsyncReserver {
public:
AsyncReserver(
CephContext *cct,
Finisher *f,
F *f,
unsigned max_allowed,
unsigned min_priority = 0)
: cct(cct),
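
Context for the hunks above: AsyncReserver is generalized over its finisher type, so the reserver no longer hard-codes common/Finisher.h and crimson can supply its own completion machinery as F. A minimal, self-contained toy of the idea (all names below are illustrative stand-ins, not the real ceph types):

#include <functional>
#include <iostream>
#include <queue>

struct Context {                       // stand-in for ceph's Context callback
  std::function<void(int)> fn;
  void complete(int r) { fn(r); }
};

struct InlineFinisher {                // any type with queue(Context*) can be F
  std::queue<Context*> pending;
  void queue(Context* c) { pending.push(c); }
};

template <typename T, typename F>
struct ToyReserver {                   // mirrors the new AsyncReserver<T, F> shape
  F* f;
  void grant(Context* on_grant) { f->queue(on_grant); }
};

int main() {
  InlineFinisher fin;
  ToyReserver<int, InlineFinisher> reserver{&fin};
  reserver.grant(new Context{[](int) { std::cout << "reservation granted\n"; }});
  while (!fin.pending.empty()) {
    Context* c = fin.pending.front();
    fin.pending.pop();
    c->complete(0);
    delete c;
  }
}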

View File

@@ -5348,7 +5348,11 @@ std::vector<Option> get_global_options() {
Option("crimson_osd_obc_lru_size", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
.set_default(10)
.set_description("Number of obcs to cache")
.set_description("Number of obcs to cache"),
Option("crimson_osd_scheduler_concurrency", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
.set_default(0)
.set_description("The maximum number concurrent IO operations, 0 for unlimited")
});
}

View File

@@ -221,6 +221,30 @@ AlienStore::read(CollectionRef ch,
});
}
AlienStore::read_errorator::future<ceph::bufferlist>
AlienStore::readv(CollectionRef ch,
const ghobject_t& oid,
interval_set<uint64_t>& m,
uint32_t op_flags)
{
logger().debug("{}", __func__);
return seastar::do_with(ceph::bufferlist{},
[this, ch, oid, &m, op_flags](auto& bl) {
return tp->submit([this, ch, oid, &m, op_flags, &bl] {
auto c = static_cast<AlienCollection*>(ch.get());
return store->readv(c->collection, oid, m, bl, op_flags);
}).then([&bl](int r) -> read_errorator::future<ceph::bufferlist> {
if (r == -ENOENT) {
return crimson::ct_error::enoent::make();
} else if (r == -EIO) {
return crimson::ct_error::input_output_error::make();
} else {
return read_errorator::make_ready_future<ceph::bufferlist>(std::move(bl));
}
});
});
}
AlienStore::get_attr_errorator::future<ceph::bufferptr>
AlienStore::get_attr(CollectionRef ch,
const ghobject_t& oid,

View File

@@ -53,6 +53,11 @@ public:
uint64_t offset,
size_t len,
uint32_t op_flags = 0) final;
read_errorator::future<ceph::bufferlist> readv(CollectionRef c,
const ghobject_t& oid,
interval_set<uint64_t>& m,
uint32_t op_flags = 0) final;
get_attr_errorator::future<ceph::bufferptr> get_attr(CollectionRef c,
const ghobject_t& oid,

View File

@@ -198,6 +198,27 @@ CyanStore::read_errorator::future<ceph::bufferlist> CyanStore::read(
return read_errorator::make_ready_future<ceph::bufferlist>(o->read(offset, l));
}
CyanStore::read_errorator::future<ceph::bufferlist> CyanStore::readv(
CollectionRef ch,
const ghobject_t& oid,
interval_set<uint64_t>& m,
uint32_t op_flags)
{
return seastar::do_with(ceph::bufferlist{},
[this, ch, oid, &m, op_flags](auto& bl) {
return crimson::do_for_each(m,
[this, ch, oid, op_flags, &bl](auto& p) {
return read(ch, oid, p.first, p.second, op_flags)
.safe_then([this, &bl](auto ret) {
bl.claim_append(ret);
});
}).safe_then([&bl] {
return read_errorator::make_ready_future<ceph::bufferlist>(std::move(bl));
});
});
}
CyanStore::get_attr_errorator::future<ceph::bufferptr> CyanStore::get_attr(
CollectionRef ch,
const ghobject_t& oid,
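
Both readv implementations above (AlienStore and CyanStore) walk an interval_set of (offset, length) extents and append each extent's bytes to one accumulated bufferlist. A self-contained toy of that shape, with std::map standing in for interval_set and std::string for bufferlist:

#include <cstdint>
#include <iostream>
#include <map>
#include <string>

std::string read(const std::string& obj, uint64_t off, uint64_t len) {
  return obj.substr(off, len);         // one extent's worth of data
}

int main() {
  const std::string object = "0123456789";
  const std::map<uint64_t, uint64_t> extents{{1, 3}, {6, 2}};  // offset -> length
  std::string bl;                      // accumulated result, like claim_append()
  for (const auto& [off, len] : extents) {
    bl += read(object, off, len);
  }
  std::cout << bl << '\n';             // prints "12367"
}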

View File

@@ -84,6 +84,12 @@ public:
uint64_t offset,
size_t len,
uint32_t op_flags = 0) final;
read_errorator::future<ceph::bufferlist> readv(
CollectionRef c,
const ghobject_t& oid,
interval_set<uint64_t>& m,
uint32_t op_flags = 0) final;
get_attr_errorator::future<ceph::bufferptr> get_attr(
CollectionRef c,
const ghobject_t& oid,

View File

@@ -92,6 +92,11 @@ public:
uint64_t offset,
size_t len,
uint32_t op_flags = 0) = 0;
virtual read_errorator::future<ceph::bufferlist> readv(
CollectionRef c,
const ghobject_t& oid,
interval_set<uint64_t>& m,
uint32_t op_flags = 0) = 0;
using get_attr_errorator = crimson::errorator<
crimson::ct_error::enoent,

View File

@@ -18,6 +18,13 @@ add_executable(crimson-osd
osd_operations/peering_event.cc
osd_operations/pg_advance_map.cc
osd_operations/replicated_request.cc
osd_operations/background_recovery.cc
osd_operations/recovery_subrequest.cc
pg_recovery.cc
recovery_backend.cc
replicated_recovery_backend.cc
scheduler/scheduler.cc
scheduler/mclock_scheduler.cc
osdmap_gate.cc
pg_map.cc
objclass.cc
@@ -33,7 +40,11 @@ add_executable(crimson-osd
watch.cc
)
target_link_libraries(crimson-osd
crimson-common crimson-os crimson fmt::fmt)
crimson-common
crimson-os
crimson
fmt::fmt
dmclock::dmclock)
set_target_properties(crimson-osd PROPERTIES
POSITION_INDEPENDENT_CODE ${EXE_LINKER_USE_PIE})
install(TARGETS crimson-osd DESTINATION bin)

View File

@@ -697,12 +697,14 @@ OpsExecuter::execute_osd_op(OSDOp& osd_op)
return backend.create(os, osd_op, txn);
}, true);
case CEPH_OSD_OP_WRITE:
return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) {
return backend.write(os, osd_op, txn);
return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
osd_op_params = osd_op_params_t();
return backend.write(os, osd_op, txn, *osd_op_params);
}, true);
case CEPH_OSD_OP_WRITEFULL:
return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) {
return backend.writefull(os, osd_op, txn);
return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
osd_op_params = osd_op_params_t();
return backend.writefull(os, osd_op, txn, *osd_op_params);
}, true);
case CEPH_OSD_OP_SETALLOCHINT:
return osd_op_errorator::now();
@@ -746,8 +748,9 @@ OpsExecuter::execute_osd_op(OSDOp& osd_op)
return crimson::ct_error::operation_not_supported::make();
}
#endif
return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) {
return backend.omap_set_vals(os, osd_op, txn);
return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
osd_op_params = osd_op_params_t();
return backend.omap_set_vals(os, osd_op, txn, *osd_op_params);
}, true);
// watch/notify

View File

@@ -84,6 +84,7 @@ private:
PG& pg;
PGBackend& backend;
Ref<MOSDOp> msg;
std::optional<osd_op_params_t> osd_op_params;
bool user_modify = false;
ceph::os::Transaction txn;
@@ -225,20 +226,22 @@ auto OpsExecuter::with_effect_on_obc(
template <typename Func>
OpsExecuter::osd_op_errorator::future<> OpsExecuter::submit_changes(Func&& f) && {
assert(obc);
osd_op_params_t osd_op_params(std::move(msg));
if (!osd_op_params) {
osd_op_params = osd_op_params_t();
}
osd_op_params->req = std::move(msg);
eversion_t at_version = pg.next_version();
osd_op_params.at_version = at_version;
osd_op_params.pg_trim_to = pg.get_pg_trim_to();
osd_op_params.min_last_complete_ondisk = pg.get_min_last_complete_ondisk();
osd_op_params.last_complete = pg.get_info().last_complete;
osd_op_params->at_version = at_version;
osd_op_params->pg_trim_to = pg.get_pg_trim_to();
osd_op_params->min_last_complete_ondisk = pg.get_min_last_complete_ondisk();
osd_op_params->last_complete = pg.get_info().last_complete;
if (user_modify)
osd_op_params.user_at_version = at_version.version;
osd_op_params->user_at_version = at_version.version;
if (__builtin_expect(op_effects.empty(), true)) {
return std::forward<Func>(f)(std::move(txn), std::move(obc), std::move(osd_op_params));
return std::forward<Func>(f)(std::move(txn), std::move(obc), std::move(*osd_op_params));
}
return std::forward<Func>(f)(std::move(txn), std::move(obc), std::move(osd_op_params)).safe_then([this] {
return std::forward<Func>(f)(std::move(txn), std::move(obc), std::move(*osd_op_params)).safe_then([this] {
// let's do the cleaning of `op_effects` in destructor
return crimson::do_for_each(op_effects, [] (auto& op_effect) {
return op_effect->execute();

View File

@@ -21,6 +21,11 @@
#include "messages/MOSDMap.h"
#include "messages/MOSDOp.h"
#include "messages/MOSDPGLog.h"
#include "messages/MOSDPGPull.h"
#include "messages/MOSDPGPush.h"
#include "messages/MOSDPGPushReply.h"
#include "messages/MOSDPGRecoveryDelete.h"
#include "messages/MOSDPGRecoveryDeleteReply.h"
#include "messages/MOSDRepOpReply.h"
#include "messages/MPGStats.h"
@@ -32,7 +37,6 @@
#include "crimson/mon/MonClient.h"
#include "crimson/net/Connection.h"
#include "crimson/net/Messenger.h"
#include "crimson/os/cyanstore/cyan_object.h"
#include "crimson/os/futurized_collection.h"
#include "crimson/os/futurized_store.h"
#include "crimson/osd/heartbeat.h"
@@ -44,6 +48,7 @@
#include "crimson/osd/osd_operations/compound_peering_request.h"
#include "crimson/osd/osd_operations/peering_event.h"
#include "crimson/osd/osd_operations/pg_advance_map.h"
#include "crimson/osd/osd_operations/recovery_subrequest.h"
#include "crimson/osd/osd_operations/replicated_request.h"
namespace {
@@ -585,6 +590,16 @@ seastar::future<> OSD::ms_dispatch(crimson::net::Connection* conn, MessageRef m)
return seastar::now();
case MSG_COMMAND:
return handle_command(conn, boost::static_pointer_cast<MCommand>(m));
case MSG_OSD_PG_PULL:
[[fallthrough]];
case MSG_OSD_PG_PUSH:
[[fallthrough]];
case MSG_OSD_PG_PUSH_REPLY:
[[fallthrough]];
case MSG_OSD_PG_RECOVERY_DELETE:
[[fallthrough]];
case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
return handle_recovery_subreq(conn, boost::static_pointer_cast<MOSDFastDispatchOp>(m));
case MSG_OSD_PG_LEASE:
[[fallthrough]];
case MSG_OSD_PG_LEASE_ACK:
@@ -595,6 +610,10 @@ seastar::future<> OSD::ms_dispatch(crimson::net::Connection* conn, MessageRef m)
[[fallthrough]];
case MSG_OSD_PG_QUERY2:
[[fallthrough]];
case MSG_OSD_BACKFILL_RESERVE:
[[fallthrough]];
case MSG_OSD_RECOVERY_RESERVE:
[[fallthrough]];
case MSG_OSD_PG_LOG:
return handle_peering_op(conn, boost::static_pointer_cast<MOSDPeeringOp>(m));
case MSG_OSD_REPOP:
@@ -995,6 +1014,16 @@ seastar::future<> OSD::handle_rep_op_reply(crimson::net::Connection* conn,
return seastar::now();
}
seastar::future<> OSD::handle_recovery_subreq(crimson::net::Connection* conn,
Ref<MOSDFastDispatchOp> m)
{
shard_services.start_operation<RecoverySubRequest>(
*this,
conn->get_shared(),
std::move(m));
return seastar::now();
}
bool OSD::should_restart() const
{
if (!osdmap->is_up(whoami)) {

View File

@@ -180,6 +180,9 @@ private:
Ref<MOSDRepOpReply> m);
seastar::future<> handle_peering_op(crimson::net::Connection* conn,
Ref<MOSDPeeringOp> m);
seastar::future<> handle_recovery_subreq(crimson::net::Connection* conn,
Ref<MOSDFastDispatchOp> m);
seastar::future<> committed_osd_maps(version_t first,
version_t last,

View File

@@ -51,6 +51,85 @@ void Blocker::dump(ceph::Formatter* f) const
f->close_section();
}
void AggregateBlocker::dump_detail(ceph::Formatter *f) const
{
f->open_array_section("parent_blockers");
for (auto b : parent_blockers) {
f->open_object_section("parent_blocker");
b->dump(f);
f->close_section();
}
f->close_section();
}
OperationThrottler::OperationThrottler(ConfigProxy &conf)
: scheduler(crimson::osd::scheduler::make_scheduler(conf))
{
conf.add_observer(this);
update_from_config(conf);
}
void OperationThrottler::wake()
{
while ((!max_in_progress || in_progress < max_in_progress) &&
!scheduler->empty()) {
auto item = scheduler->dequeue();
item.wake.set_value();
++in_progress;
--pending;
}
}
void OperationThrottler::release_throttle()
{
ceph_assert(in_progress > 0);
--in_progress;
wake();
}
blocking_future<> OperationThrottler::acquire_throttle(
crimson::osd::scheduler::params_t params)
{
crimson::osd::scheduler::item_t item{params, seastar::promise<>()};
auto fut = item.wake.get_future();
scheduler->enqueue(std::move(item));
return make_blocking_future(std::move(fut));
}
void OperationThrottler::dump_detail(Formatter *f) const
{
f->dump_unsigned("max_in_progress", max_in_progress);
f->dump_unsigned("in_progress", in_progress);
f->open_object_section("scheduler");
{
scheduler->dump(*f);
}
f->close_section();
}
void OperationThrottler::update_from_config(const ConfigProxy &conf)
{
max_in_progress = conf.get_val<uint64_t>("crimson_osd_scheduler_concurrency");
wake();
}
const char** OperationThrottler::get_tracked_conf_keys() const
{
static const char* KEYS[] = {
"crimson_osd_scheduler_concurrency",
NULL
};
return KEYS;
}
void OperationThrottler::handle_conf_change(
const ConfigProxy& conf,
const std::set<std::string> &changed)
{
update_from_config(conf);
}
void OrderedPipelinePhase::Handle::exit()
{
if (phase) {

View File

@@ -14,6 +14,7 @@
#include <seastar/core/future.hh>
#include "include/ceph_assert.h"
#include "crimson/osd/scheduler/scheduler.h"
namespace ceph {
class Formatter;
@@ -23,12 +24,14 @@ namespace crimson::osd {
enum class OperationTypeCode {
client_request = 0,
peering_event = 1,
compound_peering_request = 2,
pg_advance_map = 3,
pg_creation = 4,
replicated_request = 5,
last_op = 6
peering_event,
compound_peering_request,
pg_advance_map,
pg_creation,
replicated_request,
background_recovery,
background_recovery_sub,
last_op
};
static constexpr const char* const OP_NAMES[] = {
@@ -38,6 +41,8 @@ static constexpr const char* const OP_NAMES[] = {
"pg_advance_map",
"pg_creation",
"replicated_request",
"background_recovery",
"background_recovery_sub",
};
// prevent the addition of OperationTypeCode-s with no matching OP_NAMES entry:
@@ -57,21 +62,39 @@ class Blocker;
* Provides an abstraction for registering and unregistering a blocker
* for the duration of a future becoming available.
*/
template <typename... T>
class blocking_future {
template <typename Fut>
class blocking_future_detail {
friend class Operation;
friend class Blocker;
Blocker *blocker;
seastar::future<T...> fut;
blocking_future(Blocker *b, seastar::future<T...> &&f)
Fut fut;
blocking_future_detail(Blocker *b, Fut &&f)
: blocker(b), fut(std::move(f)) {}
template <typename... V, typename... U>
friend blocking_future<V...> make_ready_blocking_future(U&&... args);
friend blocking_future_detail<seastar::future<V...>> make_ready_blocking_future(U&&... args);
template <typename U>
friend blocking_future_detail<seastar::future<>> join_blocking_futures(U &&u);
template <typename U>
friend class blocking_future_detail;
public:
template <typename F>
auto then(F &&f) && {
using result = decltype(std::declval<Fut>().then(f));
return blocking_future_detail<seastar::futurize_t<result>>(
blocker,
std::move(fut).then(std::forward<F>(f)));
}
};
template <typename... T>
using blocking_future = blocking_future_detail<seastar::future<T...>>;
template <typename... V, typename... U>
blocking_future<V...> make_ready_blocking_future(U&&... args) {
blocking_future_detail<seastar::future<V...>> make_ready_blocking_future(U&&... args) {
return blocking_future<V...>(
nullptr,
seastar::make_ready_future<V...>(std::forward<U>(args)...));
@@ -88,7 +111,7 @@ protected:
public:
template <typename... T>
blocking_future<T...> make_blocking_future(seastar::future<T...> &&f) {
return blocking_future(this, std::move(f));
return blocking_future<T...>(this, std::move(f));
}
void dump(ceph::Formatter *f) const;
@@ -108,6 +131,36 @@ public:
virtual ~BlockerT() = default;
};
class AggregateBlocker : public BlockerT<AggregateBlocker> {
vector<Blocker*> parent_blockers;
protected:
void dump_detail(ceph::Formatter *f) const final;
public:
AggregateBlocker(vector<Blocker*> &&parent_blockers)
: parent_blockers(std::move(parent_blockers)) {}
static constexpr const char *type_name = "AggregateBlocker";
};
template <typename T>
blocking_future<> join_blocking_futures(T &&t) {
vector<Blocker*> blockers;
blockers.reserve(t.size());
for (auto &&bf: t) {
blockers.push_back(bf.blocker);
bf.blocker = nullptr;
}
auto agg = std::make_unique<AggregateBlocker>(std::move(blockers));
return agg->make_blocking_future(
seastar::parallel_for_each(
std::forward<T>(t),
[](auto &&bf) {
return std::move(bf.fut);
}).then([agg=std::move(agg)] {
return seastar::now();
}));
}
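
The intended consumption of join_blocking_futures, roughly as PGRecovery::start_recovery_ops does later in this diff: gather one blocking_future<> per started recovery op, then join them behind a single AggregateBlocker so every parent blocker stays dumpable. A sketch of the shape, not a standalone program:

// cf. PGRecovery::start_recovery_ops later in this diff
std::vector<crimson::osd::blocking_future<>> started;
// ... push one blocking_future<> per recovery op that was started ...
return crimson::osd::join_blocking_futures(std::move(started)).then([] {
  return seastar::now();
});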
/**
* Common base for all crimson-osd operations. Mainly provides
* an interface for registering ops in flight and dumping
@@ -226,6 +279,70 @@ public:
}
};
/**
* Throttles set of currently running operations
*
* Very primitive currently, assumes all ops are equally
* expensive and simply limits the number that can be
* concurrently active.
*/
class OperationThrottler : public Blocker, md_config_obs_t {
public:
OperationThrottler(ConfigProxy &conf);
const char** get_tracked_conf_keys() const final;
void handle_conf_change(const ConfigProxy& conf,
const std::set<std::string> &changed) final;
void update_from_config(const ConfigProxy &conf);
template <typename F>
auto with_throttle(
OperationRef op,
crimson::osd::scheduler::params_t params,
F &&f) {
if (!max_in_progress) return f();
auto fut = acquire_throttle(params);
return op->with_blocking_future(std::move(fut))
.then(std::forward<F>(f))
.then([this](auto x) {
release_throttle();
return x;
});
}
template <typename F>
seastar::future<> with_throttle_while(
OperationRef op,
crimson::osd::scheduler::params_t params,
F &&f) {
return with_throttle(op, params, f).then([this, params, op, f](bool cont) {
if (cont)
return with_throttle_while(op, params, f);
else
return seastar::now();
});
}
protected:
void dump_detail(Formatter *f) const final;
const char *get_type_name() const final {
return "OperationThrottler";
}
private:
crimson::osd::scheduler::SchedulerRef scheduler;
uint64_t max_in_progress = 0;
uint64_t in_progress = 0;
uint64_t pending = 0;
void wake();
blocking_future<> acquire_throttle(
crimson::osd::scheduler::params_t params);
void release_throttle();
};
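
A compact single-threaded toy of the acquire/wake/release cycle behind OperationThrottler, with plain callbacks in place of scheduler items and promises; as in wake(), a cap of 0 means unlimited:

#include <cstdint>
#include <deque>
#include <functional>
#include <iostream>

struct ToyThrottler {
  uint64_t max_in_progress;            // 0 means unlimited
  uint64_t in_progress = 0;
  std::deque<std::function<void()>> pending;

  void wake() {
    while ((!max_in_progress || in_progress < max_in_progress) &&
           !pending.empty()) {
      auto op = std::move(pending.front());
      pending.pop_front();
      ++in_progress;                   // analogue of item.wake.set_value()
      op();
    }
  }
  void acquire(std::function<void()> op) {
    pending.push_back(std::move(op));  // analogue of scheduler->enqueue()
    wake();
  }
  void release() {                     // analogue of release_throttle()
    --in_progress;
    wake();
  }
};

int main() {
  ToyThrottler t{1};
  t.acquire([] { std::cout << "op1 running\n"; });
  t.acquire([] { std::cout << "op2 running\n"; });  // queued until release()
  t.release();                          // op1 done, op2 starts
  t.release();
}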
/**
* Ensures that at most one op may consider itself in the phase at a time.
* Ops will see enter() unblock in the order in which they tried to enter

View File

@@ -0,0 +1,71 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#include <seastar/core/future.hh>
#include "messages/MOSDOp.h"
#include "crimson/osd/pg.h"
#include "crimson/osd/shard_services.h"
#include "common/Formatter.h"
#include "crimson/osd/osd_operations/background_recovery.h"
namespace {
seastar::logger& logger() {
return crimson::get_logger(ceph_subsys_osd);
}
}
namespace crimson::osd {
BackgroundRecovery::BackgroundRecovery(
Ref<PG> pg,
ShardServices &ss,
epoch_t epoch_started,
crimson::osd::scheduler::scheduler_class_t scheduler_class)
: pg(pg), ss(ss), epoch_started(epoch_started),
scheduler_class(scheduler_class)
{}
seastar::future<bool> BackgroundRecovery::do_recovery()
{
if (pg->has_reset_since(epoch_started))
return seastar::make_ready_future<bool>(false);
return with_blocking_future(
pg->get_recovery_handler()->start_recovery_ops(
crimson::common::local_conf()->osd_recovery_max_single_start));
}
void BackgroundRecovery::print(std::ostream &lhs) const
{
lhs << "BackgroundRecovery(" << pg->get_pgid() << ")";
}
void BackgroundRecovery::dump_detail(Formatter *f) const
{
f->dump_stream("pgid") << pg->get_pgid();
f->open_object_section("recovery_detail");
{
// TODO pg->dump_recovery_state(f);
}
f->close_section();
}
seastar::future<> BackgroundRecovery::start()
{
logger().debug("{}: start", *this);
IRef ref = this;
return ss.throttler.with_throttle_while(
this, get_scheduler_params(), [this] {
return do_recovery();
}).handle_exception_type([ref, this](const std::system_error& err) {
if (err.code() == std::make_error_code(std::errc::interrupted)) {
logger().debug("{} recovery interruped: {}", *pg, err.what());
return seastar::now();
}
return seastar::make_exception_future<>(err);
});
}
}
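
The handle_exception_type branch above swallows only interruptions (raised by WaitForObjectRecovery::interrupt later in this diff) and rethrows everything else. A self-contained illustration of the error_code comparison it relies on:

#include <iostream>
#include <system_error>

int main() {
  const std::system_error err(
    std::make_error_code(std::errc::interrupted), "recovery");
  if (err.code() == std::make_error_code(std::errc::interrupted)) {
    std::cout << "swallowed: " << err.what() << '\n';  // logged, not rethrown
  }
}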

View File

@@ -0,0 +1,46 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#pragma once
#include "crimson/net/Connection.h"
#include "crimson/osd/osd_operation.h"
#include "crimson/common/type_helpers.h"
class MOSDOp;
namespace crimson::osd {
class PG;
class ShardServices;
class BackgroundRecovery final : public OperationT<BackgroundRecovery> {
public:
static constexpr OperationTypeCode type = OperationTypeCode::background_recovery;
BackgroundRecovery(
Ref<PG> pg,
ShardServices &ss,
epoch_t epoch_started,
crimson::osd::scheduler::scheduler_class_t scheduler_class);
void print(std::ostream &) const final;
void dump_detail(Formatter *f) const final;
seastar::future<> start();
private:
Ref<PG> pg;
ShardServices &ss;
epoch_t epoch_started;
crimson::osd::scheduler::scheduler_class_t scheduler_class;
auto get_scheduler_params() const {
return crimson::osd::scheduler::params_t{
1, // cost
0, // owner
scheduler_class
};
}
seastar::future<bool> do_recovery();
};
}

View File

@@ -13,9 +13,11 @@ struct osd_op_params_t {
eversion_t pg_trim_to;
eversion_t min_last_complete_ondisk;
eversion_t last_complete;
version_t user_at_version;
version_t user_at_version = 0;
bool user_modify = false;
ObjectCleanRegions clean_regions;
osd_op_params_t() = default;
osd_op_params_t(Ref<MOSDOp>&& req) : req(req) {}
osd_op_params_t(Ref<MOSDOp>&& req, eversion_t at_version, eversion_t pg_trim_to,
eversion_t mlcod, eversion_t lc, version_t user_at_version) :

View File

@@ -53,7 +53,16 @@ seastar::future<> PeeringEvent::start()
logger().debug("{}: start", *this);
IRef ref = this;
return get_pg().then([this](Ref<PG> pg) {
return [this] {
if (delay) {
return seastar::sleep(std::chrono::milliseconds(
std::lround(delay*1000)));
} else {
return seastar::now();
}
}().then([this] {
return get_pg();
}).then([this](Ref<PG> pg) {
if (!pg) {
logger().warn("{}: pg absent, did not create", *this);
on_pg_absent();
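
The immediately-invoked lambda above normalizes both branches to a seastar::future<> so the rest of the chain is uniform. A standalone sketch of the same shape (assumes a seastar build environment; the app_template harness is illustrative only):

#include <seastar/core/app-template.hh>
#include <seastar/core/future.hh>
#include <seastar/core/sleep.hh>
#include <chrono>
#include <cmath>
#include <iostream>

// both branches produce seastar::future<>, so callers chain uniformly
seastar::future<> maybe_delay(float delay) {
  if (delay) {
    return seastar::sleep(
      std::chrono::milliseconds(std::lround(delay * 1000)));
  } else {
    return seastar::now();
  }
}

int main(int argc, char** argv) {
  seastar::app_template app;
  return app.run(argc, argv, [] {
    return maybe_delay(0.01f).then([] {
      std::cout << "event delivered after optional delay\n";
    });
  });
}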

View File

@@ -44,6 +44,7 @@ protected:
PeeringCtx ctx;
pg_shard_t from;
spg_t pgid;
float delay = 0;
PGPeeringEvent evt;
const pg_shard_t get_from() const {
@@ -73,7 +74,17 @@ public:
pgid(pgid),
evt(std::forward<Args>(args)...)
{}
template <typename... Args>
PeeringEvent(
ShardServices &shard_services, const pg_shard_t &from, const spg_t &pgid,
float delay, Args&&... args) :
shard_services(shard_services),
ctx{ceph_release_t::octopus},
from(from),
pgid(pgid),
delay(delay),
evt(std::forward<Args>(args)...)
{}
void print(std::ostream &) const final;
void dump_detail(ceph::Formatter* f) const final;

View File

@@ -0,0 +1,29 @@
#include <fmt/format.h>
#include <fmt/ostream.h>
#include "crimson/osd/osd_operations/recovery_subrequest.h"
namespace {
seastar::logger& logger() {
return crimson::get_logger(ceph_subsys_osd);
}
}
namespace crimson::osd {
seastar::future<> RecoverySubRequest::start() {
logger().debug("{}: start", *this);
IRef opref = this;
return with_blocking_future(osd.osdmap_gate.wait_for_map(m->get_min_epoch()))
.then([this] (epoch_t epoch) {
return with_blocking_future(osd.wait_for_pg(m->get_spg()));
}).then([this, opref=std::move(opref)] (Ref<PG> pgref) {
return seastar::do_with(std::move(pgref), std::move(opref),
[this](auto& pgref, auto& opref) {
return pgref->get_recovery_backend()->handle_recovery_op(m);
});
});
}
}

View File

@@ -0,0 +1,45 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#pragma once
#include "osd/osd_op_util.h"
#include "crimson/net/Connection.h"
#include "crimson/osd/osd_operation.h"
#include "crimson/osd/osd.h"
#include "crimson/common/type_helpers.h"
#include "messages/MOSDPGPull.h"
#include "messages/MOSDPGPush.h"
#include "messages/MOSDPGPushReply.h"
#include "messages/MOSDPGRecoveryDelete.h"
#include "messages/MOSDPGRecoveryDeleteReply.h"
namespace crimson::osd {
class OSD;
class PG;
class RecoverySubRequest final : public OperationT<RecoverySubRequest> {
public:
static constexpr OperationTypeCode type = OperationTypeCode::background_recovery_sub;
RecoverySubRequest(OSD &osd, crimson::net::ConnectionRef conn, Ref<MOSDFastDispatchOp>&& m)
: osd(osd), conn(conn), m(m) {}
void print(std::ostream& out) const final
{
out << *m;
}
void dump_detail(Formatter *f) const final
{
}
seastar::future<> start();
private:
OSD& osd;
crimson::net::ConnectionRef conn;
Ref<MOSDFastDispatchOp> m;
};
}

View File

@@ -42,12 +42,11 @@ class OSDMapGate {
}
};
// order the promises in descending order of the waited osdmap epoch,
// order the promises in ascending order of the waited osdmap epoch,
// so we can access all the waiters expecting a map whose epoch is less
// than a given epoch
// than or equal to a given epoch
using waiting_peering_t = std::map<epoch_t,
OSDMapBlocker,
std::greater<epoch_t>>;
OSDMapBlocker>;
const char *blocker_type;
waiting_peering_t waiting_peering;
epoch_t current = 0;
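
Why ascending order is the right fit here: when a map of a given epoch commits, every waiter keyed at or below that epoch can be drained from the front of the std::map. A self-contained illustration (strings stand in for the queued peering waiters):

#include <iostream>
#include <map>
#include <string>

int main() {
  std::map<int, std::string> waiting_peering{
    {3, "pg a"}, {5, "pg b"}, {9, "pg c"}};
  const int committed_epoch = 5;
  for (auto it = waiting_peering.begin();
       it != waiting_peering.end() && it->first <= committed_epoch;
       it = waiting_peering.erase(it)) {
    std::cout << "wake " << it->second << " (epoch " << it->first << ")\n";
  }
  // only "pg c" (epoch 9) is still waiting
}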

View File

@@ -37,6 +37,8 @@
#include "crimson/osd/ops_executer.h"
#include "crimson/osd/osd_operations/osdop_params.h"
#include "crimson/osd/osd_operations/peering_event.h"
#include "crimson/osd/pg_recovery.h"
#include "crimson/osd/replicated_recovery_backend.h"
namespace {
seastar::logger& logger() {
@@ -98,12 +100,16 @@ PG::PG(
coll_ref,
shard_services,
profile)),
recovery_backend(
std::make_unique<ReplicatedRecoveryBackend>(
*this, shard_services, coll_ref, backend.get())),
recovery_handler(
std::make_unique<PGRecovery>(this)),
peering_state(
shard_services.get_cct(),
pg_shard,
pgid,
PGPool(
shard_services.get_cct(),
osdmap,
pgid.pool(),
pool,
@@ -249,6 +255,8 @@ void PG::on_activate_complete()
get_osdmap_epoch(),
PeeringState::RequestBackfill{});
} else {
logger().debug("{}: no need to recover or backfill, AllReplicasRecovered",
" for pg: {}", __func__, pgid);
shard_services.start_operation<LocalPeeringEvent>(
this,
shard_services,
@@ -418,7 +426,7 @@ void PG::do_peering_event(
PGPeeringEvent& evt, PeeringCtx &rctx)
{
if (!peering_state.pg_has_reset_since(evt.get_epoch_requested())) {
logger().debug("{} handling {}", __func__, evt.get_desc());
logger().debug("{} handling {} for pg: {}", __func__, evt.get_desc(), pgid);
do_peering_event(evt.get_event(), rctx);
} else {
logger().debug("{} ignoring {} -- pg has reset", __func__, evt.get_desc());
@@ -512,6 +520,7 @@ seastar::future<> PG::submit_transaction(const OpInfo& op_info,
logger().debug("{} op_returns: {}",
__func__, log_entries.back().op_returns);
}
log_entries.back().clean_regions = std::move(osd_op_p.clean_regions);
peering_state.append_log_with_trim_to_updated(std::move(log_entries), osd_op_p.at_version,
txn, true, false);

View File

@@ -10,6 +10,7 @@
#include <boost/smart_ptr/local_shared_ptr.hpp>
#include <seastar/core/future.hh>
#include <seastar/core/shared_future.hh>
#include <seastar/core/sleep.hh>
#include "common/dout.h"
#include "crimson/net/Fwd.h"
@@ -23,8 +24,12 @@
#include "crimson/osd/osd_operations/client_request.h"
#include "crimson/osd/osd_operations/peering_event.h"
#include "crimson/osd/osd_operations/replicated_request.h"
#include "crimson/osd/osd_operations/background_recovery.h"
#include "crimson/osd/shard_services.h"
#include "crimson/osd/osdmap_gate.h"
#include "crimson/osd/pg_recovery.h"
#include "crimson/osd/pg_recovery_listener.h"
#include "crimson/osd/recovery_backend.h"
class OSDMap;
class MQuery;
@@ -50,6 +55,7 @@ class ClientRequest;
class PG : public boost::intrusive_ref_counter<
PG,
boost::thread_unsafe_counter>,
public PGRecoveryListener,
PeeringState::PeeringListener,
DoutPrefixProvider
{
@@ -80,11 +86,11 @@ public:
~PG();
const pg_shard_t& get_pg_whoami() const {
const pg_shard_t& get_pg_whoami() const final {
return pg_whoami;
}
const spg_t& get_pgid() const {
const spg_t& get_pgid() const final {
return pgid;
}
@@ -94,7 +100,6 @@ public:
const PGBackend& get_backend() const {
return *backend;
}
// EpochSource
epoch_t get_osdmap_epoch() const final {
return peering_state.get_osdmap_epoch();
@@ -174,51 +179,81 @@ public:
// will be needed for unblocking IO operations/peering
}
template <typename T>
void start_peering_event_operation(T &&evt, float delay = 0) {
shard_services.start_operation<LocalPeeringEvent>(
this,
shard_services,
pg_whoami,
pgid,
delay,
std::forward<T>(evt));
}
void schedule_event_after(
PGPeeringEventRef event,
float delay) final {
ceph_assert(0 == "Not implemented yet");
start_peering_event_operation(std::move(*event), delay);
}
std::vector<pg_shard_t> get_replica_recovery_order() const final {
return peering_state.get_replica_recovery_order();
}
void request_local_background_io_reservation(
unsigned priority,
PGPeeringEventRef on_grant,
PGPeeringEventRef on_preempt) final {
ceph_assert(0 == "Not implemented yet");
shard_services.local_reserver.request_reservation(
pgid,
on_grant ? make_lambda_context([this, on_grant=std::move(on_grant)] (int) {
start_peering_event_operation(std::move(*on_grant));
}) : nullptr,
priority,
on_preempt ? make_lambda_context(
[this, on_preempt=std::move(on_preempt)] (int) {
start_peering_event_operation(std::move(*on_preempt));
}) : nullptr);
}
void update_local_background_io_priority(
unsigned priority) final {
ceph_assert(0 == "Not implemented yet");
shard_services.local_reserver.update_priority(
pgid,
priority);
}
void cancel_local_background_io_reservation() final {
// Not implemented yet, but gets called on exit() from some states
shard_services.local_reserver.cancel_reservation(
pgid);
}
void request_remote_recovery_reservation(
unsigned priority,
PGPeeringEventRef on_grant,
PGPeeringEventRef on_preempt) final {
ceph_assert(0 == "Not implemented yet");
shard_services.remote_reserver.request_reservation(
pgid,
on_grant ? make_lambda_context([this, on_grant=std::move(on_grant)] (int) {
start_peering_event_operation(std::move(*on_grant));
}) : nullptr,
priority,
on_preempt ? make_lambda_context(
[this, on_preempt=std::move(on_preempt)] (int) {
start_peering_event_operation(std::move(*on_preempt));
}) : nullptr);
}
void cancel_remote_recovery_reservation() final {
// Not implemented yet, but gets called on exit() from some states
shard_services.remote_reserver.cancel_reservation(
pgid);
}
void schedule_event_on_commit(
ceph::os::Transaction &t,
PGPeeringEventRef on_commit) final {
t.register_on_commit(
new LambdaContext(
[this, on_commit=std::move(on_commit)](int r){
shard_services.start_operation<LocalPeeringEvent>(
this,
shard_services,
pg_whoami,
pgid,
std::move(*on_commit));
make_lambda_context(
[this, on_commit=std::move(on_commit)](int) {
start_peering_event_operation(std::move(*on_commit));
}));
}
@@ -266,7 +301,7 @@ public:
// Not needed yet
}
void on_change(ceph::os::Transaction &t) final {
// Not needed yet
recovery_backend->on_peering_interval_change(t);
}
void on_activate(interval_set<snapid_t> to_trim) final;
void on_activate_complete() final;
@@ -307,15 +342,16 @@ public:
return 0;
}
void on_backfill_reserved() final {
ceph_assert(0 == "Not implemented");
recovery_handler->start_background_recovery(
crimson::osd::scheduler::scheduler_class_t::background_best_effort);
}
void on_backfill_canceled() final {
ceph_assert(0 == "Not implemented");
}
void on_recovery_reserved() final {
ceph_assert(0 == "Not implemented");
recovery_handler->start_background_recovery(
crimson::osd::scheduler::scheduler_class_t::background_recovery);
}
@@ -390,9 +426,18 @@ public:
// Utility
bool is_primary() const {
bool is_primary() const final {
return peering_state.is_primary();
}
bool is_peered() const final {
return peering_state.is_peered();
}
bool is_recovering() const final {
return peering_state.is_recovering();
}
bool is_backfilling() const final {
return peering_state.is_backfilling();
}
pg_stat_t get_stats() {
auto stats = peering_state.prepare_stats_for_publish(
false,
@@ -509,13 +554,70 @@ public:
return eversion_t(projected_last_update.epoch,
++projected_last_update.version);
}
ShardServices& get_shard_services() final {
return shard_services;
}
private:
std::unique_ptr<PGBackend> backend;
std::unique_ptr<RecoveryBackend> recovery_backend;
std::unique_ptr<PGRecovery> recovery_handler;
PeeringState peering_state;
eversion_t projected_last_update;
public:
RecoveryBackend* get_recovery_backend() final {
return recovery_backend.get();
}
PGRecovery* get_recovery_handler() final {
return recovery_handler.get();
}
PeeringState& get_peering_state() final {
return peering_state;
}
bool has_reset_since(epoch_t epoch) const final {
return peering_state.pg_has_reset_since(epoch);
}
const pg_missing_tracker_t& get_local_missing() const {
return peering_state.get_pg_log().get_missing();
}
epoch_t get_last_peering_reset() const {
return peering_state.get_last_peering_reset();
}
const set<pg_shard_t> &get_acting_recovery_backfill() const {
return peering_state.get_acting_recovery_backfill();
}
void begin_peer_recover(pg_shard_t peer, const hobject_t oid) {
peering_state.begin_peer_recover(peer, oid);
}
uint64_t min_peer_features() const {
return peering_state.get_min_peer_features();
}
const map<hobject_t, set<pg_shard_t>>&
get_missing_loc_shards() const {
return peering_state.get_missing_loc().get_missing_locs();
}
const map<pg_shard_t, pg_missing_t> &get_shard_missing() const {
return peering_state.get_peer_missing();
}
const pg_missing_const_i* get_shard_missing(pg_shard_t shard) const {
if (shard == pg_whoami)
return &get_local_missing();
else {
auto it = peering_state.get_peer_missing().find(shard);
if (it == peering_state.get_peer_missing().end())
return nullptr;
else
return &it->second;
}
}
int get_recovery_op_priority() const {
int64_t pri = 0;
get_pool().info.opts.get(pool_opts_t::RECOVERY_OP_PRIORITY, &pri);
return pri > 0 ? pri : crimson::common::local_conf()->osd_recovery_op_priority;
}
private:
class WaitForActiveBlocker : public BlockerT<WaitForActiveBlocker> {
PG *pg;
@@ -538,6 +640,22 @@ private:
friend class PGAdvanceMap;
friend class PeeringEvent;
friend class RepRequest;
private:
seastar::future<bool> find_unfound() {
return seastar::make_ready_future<bool>(true);
}
bool is_missing_object(const hobject_t& soid) const {
return peering_state.get_pg_log().get_missing().get_items().count(soid);
}
bool is_unreadable_object(const hobject_t &oid) const final {
return is_missing_object(oid) ||
!peering_state.get_missing_loc().readable_with_acting(
oid, get_actingset());
}
const set<pg_shard_t> &get_actingset() const {
return peering_state.get_actingset();
}
};
std::ostream& operator<<(std::ostream&, const PG& pg);

View File

@@ -15,11 +15,11 @@
#include "os/Transaction.h"
#include "common/Clock.h"
#include "crimson/os/cyanstore/cyan_object.h"
#include "crimson/os/futurized_collection.h"
#include "crimson/os/futurized_store.h"
#include "crimson/osd/osd_operation.h"
#include "replicated_backend.h"
#include "replicated_recovery_backend.h"
#include "ec_backend.h"
#include "exceptions.h"
@@ -199,6 +199,7 @@ PGBackend::read(const object_info_t& oi,
return _read(oi.soid, offset, length, flags).safe_then(
[&oi](auto&& bl) -> read_errorator::future<ceph::bufferlist> {
if (const bool is_fine = _read_verify_data(oi, bl); is_fine) {
logger().debug("read: data length: {}", bl.length());
return read_errorator::make_ready_future<bufferlist>(std::move(bl));
} else {
return crimson::ct_error::object_corrupted::make();
@@ -244,7 +245,8 @@ bool PGBackend::maybe_create_new_object(
seastar::future<> PGBackend::write(
ObjectState& os,
const OSDOp& osd_op,
ceph::os::Transaction& txn)
ceph::os::Transaction& txn,
osd_op_params_t& osd_op_params)
{
const ceph_osd_op& op = osd_op.op;
uint64_t offset = op.extent.offset;
@@ -273,6 +275,13 @@ seastar::future<> PGBackend::write(
if (op.extent.truncate_size != os.oi.size) {
os.oi.size = length;
// TODO: truncate_update_size_and_usage()
if (op.extent.truncate_size > os.oi.size) {
osd_op_params.clean_regions.mark_data_region_dirty(os.oi.size,
op.extent.truncate_size - os.oi.size);
} else {
osd_op_params.clean_regions.mark_data_region_dirty(op.extent.truncate_size,
os.oi.size - op.extent.truncate_size);
}
}
}
os.oi.truncate_seq = op.extent.truncate_seq;
@@ -290,13 +299,17 @@ seastar::future<> PGBackend::write(
offset, length, std::move(buf), op.flags);
os.oi.size = std::max(offset + length, os.oi.size);
}
osd_op_params.clean_regions.mark_data_region_dirty(op.extent.offset,
op.extent.length);
return seastar::now();
}
seastar::future<> PGBackend::writefull(
ObjectState& os,
const OSDOp& osd_op,
ceph::os::Transaction& txn)
ceph::os::Transaction& txn,
osd_op_params_t& osd_op_params)
{
const ceph_osd_op& op = osd_op.op;
if (op.extent.length != osd_op.indata.length()) {
@@ -306,11 +319,15 @@
const bool existing = maybe_create_new_object(os, txn);
if (existing && op.extent.length < os.oi.size) {
txn.truncate(coll->get_cid(), ghobject_t{os.oi.soid}, op.extent.length);
osd_op_params.clean_regions.mark_data_region_dirty(op.extent.length,
os.oi.size - op.extent.length);
}
if (op.extent.length) {
txn.write(coll->get_cid(), ghobject_t{os.oi.soid}, 0, op.extent.length,
osd_op.indata, op.flags);
os.oi.size = op.extent.length;
osd_op_params.clean_regions.mark_data_region_dirty(0,
std::max((uint64_t) op.extent.length, os.oi.size));
}
return seastar::now();
}
@@ -480,6 +497,13 @@ maybe_get_omap_vals(
}
}
seastar::future<ceph::bufferlist> PGBackend::omap_get_header(
crimson::os::CollectionRef& c,
const ghobject_t& oid)
{
return store->omap_get_header(c, oid);
}
seastar::future<> PGBackend::omap_get_keys(
const ObjectState& os,
OSDOp& osd_op) const
@@ -598,7 +622,8 @@ seastar::future<> PGBackend::omap_get_vals_by_keys(
seastar::future<> PGBackend::omap_set_vals(
ObjectState& os,
const OSDOp& osd_op,
ceph::os::Transaction& txn)
ceph::os::Transaction& txn,
osd_op_params_t& osd_op_params)
{
maybe_create_new_object(os, txn);
@@ -620,5 +645,24 @@
//ctx->delta_stats.num_wr_kb += shift_round_up(to_set_bl.length(), 10);
os.oi.set_flag(object_info_t::FLAG_OMAP);
os.oi.clear_omap_digest();
osd_op_params.clean_regions.mark_omap_dirty();
return seastar::now();
}
seastar::future<struct stat> PGBackend::stat(
CollectionRef c,
const ghobject_t& oid) const
{
return store->stat(c, oid);
}
seastar::future<std::map<uint64_t, uint64_t>>
PGBackend::fiemap(
CollectionRef c,
const ghobject_t& oid,
uint64_t off,
uint64_t len)
{
return store->fiemap(c, oid, off, len);
}
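
The mark_data_region_dirty/mark_omap_dirty calls threaded through write, writefull, and omap_set_vals above feed the ObjectCleanRegions carried in osd_op_params_t, so recovery can later copy only the extents a write actually touched. A toy of the bookkeeping being performed (a stand-in type only; the real ObjectCleanRegions also merges and clamps intervals rather than keeping a raw list):

#include <cstdint>
#include <iostream>
#include <utility>
#include <vector>

struct ToyCleanRegions {
  std::vector<std::pair<uint64_t, uint64_t>> dirty_data;  // (offset, length)
  bool omap_dirty = false;
  void mark_data_region_dirty(uint64_t off, uint64_t len) {
    dirty_data.emplace_back(off, len);
  }
  void mark_omap_dirty() { omap_dirty = true; }
};

int main() {
  ToyCleanRegions cr;
  cr.mark_data_region_dirty(0, 4096);   // as PGBackend::write records per write
  cr.mark_omap_dirty();                 // as omap_set_vals records
  for (const auto& [off, len] : cr.dirty_data) {
    std::cout << "dirty data extent: " << off << "+" << len << '\n';
  }
  std::cout << "omap dirty: " << cr.omap_dirty << '\n';
}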

View File

@@ -7,13 +7,16 @@
#include <memory>
#include <string>
#include <boost/smart_ptr/local_shared_ptr.hpp>
#include <boost/container/flat_set.hpp>
#include "crimson/os/futurized_store.h"
#include "crimson/os/futurized_collection.h"
#include "crimson/osd/acked_peers.h"
#include "crimson/osd/pg.h"
#include "crimson/common/shared_lru.h"
#include "osd/osd_types.h"
#include "crimson/osd/object_context.h"
#include "crimson/osd/osd_operation.h"
#include "crimson/osd/osd_operations/osdop_params.h"
struct hobject_t;
@@ -36,6 +39,8 @@ protected:
using ll_read_errorator = crimson::os::FuturizedStore::read_errorator;
public:
using load_metadata_ertr = crimson::errorator<
crimson::ct_error::object_corrupted>;
PGBackend(shard_id_t shard, CollectionRef coll, crimson::os::FuturizedStore* store);
virtual ~PGBackend() = default;
static std::unique_ptr<PGBackend> create(pg_t pgid,
@@ -44,7 +49,8 @@ public:
crimson::os::CollectionRef coll,
crimson::osd::ShardServices& shard_services,
const ec_profile_t& ec_profile);
using attrs_t =
std::map<std::string, ceph::bufferptr, std::less<>>;
using read_errorator = ll_read_errorator::extend<
crimson::ct_error::object_corrupted>;
read_errorator::future<ceph::bufferlist> read(
@@ -70,11 +76,13 @@ public:
seastar::future<> write(
ObjectState& os,
const OSDOp& osd_op,
ceph::os::Transaction& trans);
ceph::os::Transaction& trans,
osd_op_params_t& osd_op_params);
seastar::future<> writefull(
ObjectState& os,
const OSDOp& osd_op,
ceph::os::Transaction& trans);
ceph::os::Transaction& trans,
osd_op_params_t& osd_op_params);
seastar::future<crimson::osd::acked_peers_t> mutate_object(
std::set<pg_shard_t> pg_shards,
crimson::osd::ObjectContextRef &&obc,
@@ -97,6 +105,14 @@ public:
get_attr_errorator::future<ceph::bufferptr> getxattr(
const hobject_t& soid,
std::string_view key) const;
seastar::future<struct stat> stat(
CollectionRef c,
const ghobject_t& oid) const;
seastar::future<std::map<uint64_t, uint64_t>> fiemap(
CollectionRef c,
const ghobject_t& oid,
uint64_t off,
uint64_t len);
// OMAP
seastar::future<> omap_get_keys(
@@ -111,18 +127,18 @@ public:
seastar::future<> omap_set_vals(
ObjectState& os,
const OSDOp& osd_op,
ceph::os::Transaction& trans);
ceph::os::Transaction& trans,
osd_op_params_t& osd_op_params);
seastar::future<ceph::bufferlist> omap_get_header(
crimson::os::CollectionRef& c,
const ghobject_t& oid);
virtual void got_rep_op_reply(const MOSDRepOpReply&) {}
protected:
const shard_id_t shard;
CollectionRef coll;
crimson::os::FuturizedStore* store;
public:
using load_metadata_ertr = crimson::errorator<
crimson::ct_error::object_corrupted>;
struct loaded_object_md_t {
ObjectState os;
std::optional<SnapSet> ss;
@@ -146,4 +162,5 @@ private:
const osd_op_params_t& osd_op_p,
epoch_t min_epoch, epoch_t max_epoch,
std::vector<pg_log_entry_t>&& log_entries) = 0;
friend class ReplicatedRecoveryBackend;
};

View File

@@ -0,0 +1,389 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#include <fmt/format.h>
#include <fmt/ostream.h>
#include "crimson/common/type_helpers.h"
#include "crimson/osd/osd_operations/background_recovery.h"
#include "crimson/osd/osd_operations/peering_event.h"
#include "crimson/osd/pg.h"
#include "crimson/osd/pg_backend.h"
#include "crimson/osd/pg_recovery.h"
#include "messages/MOSDPGPull.h"
#include "messages/MOSDPGPush.h"
#include "messages/MOSDPGPushReply.h"
#include "messages/MOSDPGRecoveryDelete.h"
#include "messages/MOSDPGRecoveryDeleteReply.h"
#include "osd/osd_types.h"
#include "osd/PeeringState.h"
void PGRecovery::start_background_recovery(
crimson::osd::scheduler::scheduler_class_t klass)
{
using BackgroundRecovery = crimson::osd::BackgroundRecovery;
pg->get_shard_services().start_operation<BackgroundRecovery>(
static_cast<crimson::osd::PG*>(pg),
pg->get_shard_services(),
pg->get_osdmap_epoch(),
klass);
}
crimson::osd::blocking_future<bool>
PGRecovery::start_recovery_ops(size_t max_to_start)
{
assert(pg->is_primary());
assert(pg->is_peered());
assert(!pg->get_peering_state().is_deleting());
if (!pg->is_recovering() && !pg->is_backfilling()) {
return crimson::osd::make_ready_blocking_future<bool>(false);
}
std::vector<crimson::osd::blocking_future<>> started;
started.reserve(max_to_start);
max_to_start -= start_primary_recovery_ops(max_to_start, &started);
if (max_to_start > 0) {
max_to_start -= start_replica_recovery_ops(max_to_start, &started);
}
if (max_to_start > 0) {
max_to_start -= start_backfill_ops(max_to_start, &started);
}
return crimson::osd::join_blocking_futures(std::move(started)).then(
[this] {
bool done = !pg->get_peering_state().needs_recovery();
if (done) {
crimson::get_logger(ceph_subsys_osd).debug("start_recovery_ops: AllReplicasRecovered for pg: {}",
pg->get_pgid());
using LocalPeeringEvent = crimson::osd::LocalPeeringEvent;
pg->get_shard_services().start_operation<LocalPeeringEvent>(
static_cast<crimson::osd::PG*>(pg),
pg->get_shard_services(),
pg->get_pg_whoami(),
pg->get_pgid(),
pg->get_osdmap_epoch(),
pg->get_osdmap_epoch(),
PeeringState::AllReplicasRecovered{});
}
return seastar::make_ready_future<bool>(!done);
});
}
size_t PGRecovery::start_primary_recovery_ops(
size_t max_to_start,
std::vector<crimson::osd::blocking_future<>> *out)
{
if (!pg->is_recovering()) {
return 0;
}
if (!pg->get_peering_state().have_missing()) {
pg->get_peering_state().local_recovery_complete();
return 0;
}
const auto &missing = pg->get_peering_state().get_pg_log().get_missing();
crimson::get_logger(ceph_subsys_osd).info(
"{} recovering {} in pg {}, missing {}",
__func__,
pg->get_recovery_backend()->total_recovering(),
*static_cast<crimson::osd::PG*>(pg),
missing);
unsigned started = 0;
int skipped = 0;
map<version_t, hobject_t>::const_iterator p =
missing.get_rmissing().lower_bound(pg->get_peering_state().get_pg_log().get_log().last_requested);
while (started < max_to_start && p != missing.get_rmissing().end()) {
// TODO: chain futures here to enable yielding to scheduler?
hobject_t soid;
version_t v = p->first;
auto it_objects = pg->get_peering_state().get_pg_log().get_log().objects.find(p->second);
if (it_objects != pg->get_peering_state().get_pg_log().get_log().objects.end()) {
// look at log!
pg_log_entry_t *latest = it_objects->second;
assert(latest->is_update() || latest->is_delete());
soid = latest->soid;
} else {
soid = p->second;
}
const pg_missing_item& item = missing.get_items().find(p->second)->second;
++p;
hobject_t head = soid.get_head();
crimson::get_logger(ceph_subsys_osd).info(
"{} {} item.need {} {} {} {} {}",
__func__,
soid,
item.need,
missing.is_missing(soid) ? " (missing)":"",
missing.is_missing(head) ? " (missing head)":"",
pg->get_recovery_backend()->is_recovering(soid) ? " (recovering)":"",
pg->get_recovery_backend()->is_recovering(head) ? " (recovering head)":"");
// TODO: handle lost/unfound
if (!pg->get_recovery_backend()->is_recovering(soid)) {
if (pg->get_recovery_backend()->is_recovering(head)) {
++skipped;
} else {
auto futopt = recover_missing(soid, item.need);
if (futopt) {
out->push_back(std::move(*futopt));
++started;
} else {
++skipped;
}
}
}
if (!skipped)
pg->get_peering_state().set_last_requested(v);
}
crimson::get_logger(ceph_subsys_osd).info(
"{} started {} skipped {}",
__func__,
started,
skipped);
return started;
}
size_t PGRecovery::start_replica_recovery_ops(
size_t max_to_start,
std::vector<crimson::osd::blocking_future<>> *out)
{
if (!pg->is_recovering()) {
return 0;
}
uint64_t started = 0;
assert(!pg->get_peering_state().get_acting_recovery_backfill().empty());
auto recovery_order = get_replica_recovery_order();
for (auto &peer : recovery_order) {
assert(peer != pg->get_peering_state().get_primary());
auto pm = pg->get_peering_state().get_peer_missing().find(peer);
assert(pm != pg->get_peering_state().get_peer_missing().end());
size_t m_sz = pm->second.num_missing();
crimson::get_logger(ceph_subsys_osd).debug(
"{}: peer osd.{} missing {} objects",
__func__,
peer,
m_sz);
crimson::get_logger(ceph_subsys_osd).trace(
"{}: peer osd.{} missing {}", __func__,
peer, pm->second.get_items());
// recover oldest first
const pg_missing_t &m(pm->second);
for (auto p = m.get_rmissing().begin();
p != m.get_rmissing().end() && started < max_to_start;
++p) {
const auto &soid = p->second;
if (pg->get_peering_state().get_missing_loc().is_unfound(soid)) {
crimson::get_logger(ceph_subsys_osd).debug(
"{}: object {} still unfound", __func__, soid);
continue;
}
const pg_info_t &pi = pg->get_peering_state().get_peer_info(peer);
if (soid > pi.last_backfill) {
if (!pg->get_recovery_backend()->is_recovering(soid)) {
crimson::get_logger(ceph_subsys_osd).error(
"{}: object {} in missing set for backfill (last_backfill {})"
" but not in recovering",
__func__,
soid,
pi.last_backfill);
ceph_abort();
}
continue;
}
if (pg->get_recovery_backend()->is_recovering(soid)) {
crimson::get_logger(ceph_subsys_osd).debug(
"{}: already recovering object {}", __func__, soid);
continue;
}
if (pg->get_peering_state().get_missing_loc().is_deleted(soid)) {
crimson::get_logger(ceph_subsys_osd).debug(
"{}: soid {} is a delete, removing", __func__, soid);
map<hobject_t,pg_missing_item>::const_iterator r =
m.get_items().find(soid);
started += prep_object_replica_deletes(
soid, r->second.need, out);
continue;
}
if (soid.is_snap() &&
pg->get_peering_state().get_pg_log().get_missing().is_missing(
soid.get_head())) {
crimson::get_logger(ceph_subsys_osd).debug(
"{}: head {} still missing on primary",
__func__, soid.get_head());
continue;
}
if (pg->get_peering_state().get_pg_log().get_missing().is_missing(soid)) {
crimson::get_logger(ceph_subsys_osd).debug(
"{}: soid {} still missing on primary", __func__, soid);
continue;
}
crimson::get_logger(ceph_subsys_osd).debug(
"{}: recover_object_replicas({})",
__func__,
soid);
map<hobject_t,pg_missing_item>::const_iterator r = m.get_items().find(
soid);
started += prep_object_replica_pushes(
soid, r->second.need, out);
}
}
return started;
}
size_t PGRecovery::start_backfill_ops(
size_t max_to_start,
std::vector<crimson::osd::blocking_future<>> *out)
{
assert(!pg->get_peering_state().get_backfill_targets().empty());
ceph_abort("not implemented!");
}
std::optional<crimson::osd::blocking_future<>> PGRecovery::recover_missing(
const hobject_t &soid, eversion_t need)
{
if (pg->get_peering_state().get_missing_loc().is_deleted(soid)) {
return pg->get_recovery_backend()->get_recovering(soid).make_blocking_future(
pg->get_recovery_backend()->recover_delete(soid, need));
} else {
return pg->get_recovery_backend()->get_recovering(soid).make_blocking_future(
pg->get_recovery_backend()->recover_object(soid, need).handle_exception(
[=, soid = std::move(soid)] (auto e) {
on_failed_recover({ pg->get_pg_whoami() }, soid, need);
return seastar::make_ready_future<>();
})
);
}
}
size_t PGRecovery::prep_object_replica_deletes(
const hobject_t& soid,
eversion_t need,
std::vector<crimson::osd::blocking_future<>> *in_progress)
{
in_progress->push_back(
pg->get_recovery_backend()->get_recovering(soid).make_blocking_future(
pg->get_recovery_backend()->push_delete(soid, need).then([=] {
object_stat_sum_t stat_diff;
stat_diff.num_objects_recovered = 1;
on_global_recover(soid, stat_diff, true);
return seastar::make_ready_future<>();
})
)
);
return 1;
}
size_t PGRecovery::prep_object_replica_pushes(
const hobject_t& soid,
eversion_t need,
std::vector<crimson::osd::blocking_future<>> *in_progress)
{
in_progress->push_back(
pg->get_recovery_backend()->get_recovering(soid).make_blocking_future(
pg->get_recovery_backend()->recover_object(soid, need).handle_exception(
[=, soid = std::move(soid)] (auto e) {
on_failed_recover({ pg->get_pg_whoami() }, soid, need);
return seastar::make_ready_future<>();
})
)
);
return 1;
}
void PGRecovery::on_local_recover(
const hobject_t& soid,
const ObjectRecoveryInfo& recovery_info,
const bool is_delete,
ceph::os::Transaction& t)
{
pg->get_peering_state().recover_got(soid,
recovery_info.version, is_delete, t);
if (pg->is_primary()) {
if (!is_delete) {
auto& obc = pg->get_recovery_backend()->get_recovering(soid).obc; //TODO: move to pg backend?
obc->obs.exists = true;
obc->obs.oi = recovery_info.oi;
// obc is loaded with the excl lock
obc->put_lock_type(RWState::RWEXCL);
assert(obc->get_recovery_read());
}
if (!pg->is_unreadable_object(soid)) {
pg->get_recovery_backend()->get_recovering(soid).set_readable();
}
}
}
void PGRecovery::on_global_recover(
const hobject_t& soid,
const object_stat_sum_t& stat_diff,
const bool is_delete)
{
pg->get_peering_state().object_recovered(soid, stat_diff);
auto& recovery_waiter = pg->get_recovery_backend()->get_recovering(soid);
if (!is_delete)
recovery_waiter.obc->drop_recovery_read();
recovery_waiter.set_recovered();
pg->get_recovery_backend()->remove_recovering(soid);
}
void PGRecovery::on_failed_recover(
const set<pg_shard_t>& from,
const hobject_t& soid,
const eversion_t& v)
{
for (auto pg_shard : from) {
if (pg_shard != pg->get_pg_whoami()) {
pg->get_peering_state().force_object_missing(pg_shard, soid, v);
}
}
}
void PGRecovery::on_peer_recover(
pg_shard_t peer,
const hobject_t &oid,
const ObjectRecoveryInfo &recovery_info)
{
crimson::get_logger(ceph_subsys_osd).debug(
"{}: {}, {} on {}", __func__, oid,
recovery_info.version, peer);
pg->get_peering_state().on_peer_recover(peer, oid, recovery_info.version);
}
void PGRecovery::_committed_pushed_object(epoch_t epoch,
eversion_t last_complete)
{
if (!pg->has_reset_since(epoch)) {
pg->get_peering_state().recovery_committed_to(last_complete);
} else {
crimson::get_logger(ceph_subsys_osd).debug(
"{} pg has changed, not touching last_complete_ondisk",
__func__);
}
}

View File

@@ -0,0 +1,79 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#pragma once
#include <seastar/core/future.hh>
#include "crimson/osd/osd_operation.h"
#include "crimson/osd/pg_recovery_listener.h"
#include "crimson/osd/scheduler/scheduler.h"
#include "crimson/osd/shard_services.h"
#include "osd/object_state.h"
class PGBackend;
class PGRecovery {
public:
PGRecovery(PGRecoveryListener* pg) : pg(pg) {}
virtual ~PGRecovery() {}
void start_background_recovery(
crimson::osd::scheduler::scheduler_class_t klass);
crimson::osd::blocking_future<bool> start_recovery_ops(size_t max_to_start);
private:
PGRecoveryListener* pg;
size_t start_primary_recovery_ops(
size_t max_to_start,
std::vector<crimson::osd::blocking_future<>> *out);
size_t start_replica_recovery_ops(
size_t max_to_start,
std::vector<crimson::osd::blocking_future<>> *out);
size_t start_backfill_ops(
size_t max_to_start,
std::vector<crimson::osd::blocking_future<>> *out);
std::vector<pg_shard_t> get_replica_recovery_order() const {
return pg->get_replica_recovery_order();
}
std::optional<crimson::osd::blocking_future<>> recover_missing(
const hobject_t &soid, eversion_t need);
size_t prep_object_replica_deletes(
const hobject_t& soid,
eversion_t need,
std::vector<crimson::osd::blocking_future<>> *in_progress);
size_t prep_object_replica_pushes(
const hobject_t& soid,
eversion_t need,
std::vector<crimson::osd::blocking_future<>> *in_progress);
void on_local_recover(
const hobject_t& soid,
const ObjectRecoveryInfo& recovery_info,
bool is_delete,
ceph::os::Transaction& t);
void on_global_recover(
const hobject_t& soid,
const object_stat_sum_t& stat_diff,
bool is_delete);
void on_failed_recover(
const set<pg_shard_t>& from,
const hobject_t& soid,
const eversion_t& v);
void on_peer_recover(
pg_shard_t peer,
const hobject_t &oid,
const ObjectRecoveryInfo &recovery_info);
void _committed_pushed_object(epoch_t epoch,
eversion_t last_complete);
friend class ReplicatedRecoveryBackend;
seastar::future<> handle_pull(Ref<MOSDPGPull> m);
seastar::future<> handle_push(Ref<MOSDPGPush> m);
seastar::future<> handle_push_reply(Ref<MOSDPGPushReply> m);
seastar::future<> handle_recovery_delete(Ref<MOSDPGRecoveryDelete> m);
seastar::future<> handle_recovery_delete_reply(
Ref<MOSDPGRecoveryDeleteReply> m);
seastar::future<> handle_pull_response(Ref<MOSDPGPush> m);
};

View File

@@ -0,0 +1,33 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#pragma once
#include "common/hobject.h"
#include "include/types.h"
#include "osd/osd_types.h"
namespace crimson::osd {
class ShardServices;
};
class RecoveryBackend;
class PGRecovery;
class PGRecoveryListener {
public:
virtual crimson::osd::ShardServices& get_shard_services() = 0;
virtual PGRecovery* get_recovery_handler() = 0;
virtual epoch_t get_osdmap_epoch() const = 0;
virtual bool is_primary() const = 0;
virtual bool is_peered() const = 0;
virtual bool is_recovering() const = 0;
virtual bool is_backfilling() const = 0;
virtual PeeringState& get_peering_state() = 0;
virtual const pg_shard_t& get_pg_whoami() const = 0;
virtual const spg_t& get_pgid() const = 0;
virtual RecoveryBackend* get_recovery_backend() = 0;
virtual bool is_unreadable_object(const hobject_t&) const = 0;
virtual bool has_reset_since(epoch_t) const = 0;
virtual std::vector<pg_shard_t> get_replica_recovery_order() const = 0;
};

View File

@@ -0,0 +1,44 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#include "crimson/osd/recovery_backend.h"
#include "crimson/osd/pg.h"
#include "osd/osd_types.h"
namespace {
seastar::logger& logger() {
return crimson::get_logger(ceph_subsys_osd);
}
}
hobject_t RecoveryBackend::get_temp_recovery_object(
const hobject_t& target,
eversion_t version)
{
ostringstream ss;
ss << "temp_recovering_" << pg.get_info().pgid << "_" << version
<< "_" << pg.get_info().history.same_interval_since << "_" << target.snap;
hobject_t hoid = target.make_temp_hobject(ss.str());
logger().debug("{} {}", __func__, hoid);
return hoid;
}
void RecoveryBackend::clean_up(ceph::os::Transaction& t,
const std::string& why)
{
for (auto& soid : temp_contents) {
t.remove(pg.get_collection_ref()->get_cid(),
ghobject_t(soid, ghobject_t::NO_GEN, pg.get_pg_whoami().shard));
}
temp_contents.clear();
for (auto& [soid, recovery_waiter] : recovering) {
if (recovery_waiter.obc) {
recovery_waiter.obc->drop_recovery_read();
recovery_waiter.interrupt(why);
}
}
recovering.clear();
}

View File

@@ -0,0 +1,155 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#pragma once
#include <seastar/core/future.hh>
#include "crimson/common/type_helpers.h"
#include "crimson/os/futurized_store.h"
#include "crimson/os/futurized_collection.h"
#include "crimson/osd/object_context.h"
#include "crimson/osd/shard_services.h"
#include "osd/osd_types.h"
namespace crimson::osd {
class PG;
}
class PGBackend;
class RecoveryBackend {
protected:
class WaitForObjectRecovery;
public:
RecoveryBackend(crimson::osd::PG& pg,
crimson::osd::ShardServices& shard_services,
crimson::os::CollectionRef coll,
PGBackend* backend)
: pg{pg},
shard_services{shard_services},
store{&shard_services.get_store()},
coll{coll},
backend{backend} {}
virtual ~RecoveryBackend() {}
WaitForObjectRecovery& get_recovering(const hobject_t& soid) {
return recovering[soid];
}
void remove_recovering(const hobject_t& soid) {
recovering.erase(soid);
}
bool is_recovering(const hobject_t& soid) {
return recovering.count(soid) != 0;
}
uint64_t total_recovering() {
return recovering.size();
}
virtual seastar::future<> handle_recovery_op(
Ref<MOSDFastDispatchOp> m) = 0;
virtual seastar::future<> recover_object(
const hobject_t& soid,
eversion_t need) = 0;
virtual seastar::future<> recover_delete(
const hobject_t& soid,
eversion_t need) = 0;
virtual seastar::future<> push_delete(
const hobject_t& soid,
eversion_t need) = 0;
void on_peering_interval_change(ceph::os::Transaction& t) {
clean_up(t, "new peering interval");
}
protected:
crimson::osd::PG& pg;
crimson::osd::ShardServices& shard_services;
crimson::os::FuturizedStore* store;
crimson::os::CollectionRef coll;
PGBackend* backend;
struct PullInfo {
pg_shard_t from;
hobject_t soid;
ObjectRecoveryProgress recovery_progress;
ObjectRecoveryInfo recovery_info;
crimson::osd::ObjectContextRef head_ctx;
crimson::osd::ObjectContextRef obc;
object_stat_sum_t stat;
bool is_complete() const {
return recovery_progress.is_complete(recovery_info);
}
};
struct PushInfo {
ObjectRecoveryProgress recovery_progress;
ObjectRecoveryInfo recovery_info;
crimson::osd::ObjectContextRef obc;
object_stat_sum_t stat;
};
class WaitForObjectRecovery : public crimson::osd::BlockerT<WaitForObjectRecovery> {
seastar::shared_promise<> readable, recovered, pulled;
std::map<pg_shard_t, seastar::shared_promise<>> pushes;
public:
static constexpr const char* type_name = "WaitForObjectRecovery";
crimson::osd::ObjectContextRef obc;
PullInfo pi;
std::map<pg_shard_t, PushInfo> pushing;
seastar::future<> wait_for_readable() {
return readable.get_shared_future();
}
seastar::future<> wait_for_pushes(pg_shard_t shard) {
return pushes[shard].get_shared_future();
}
seastar::future<> wait_for_recovered() {
return recovered.get_shared_future();
}
seastar::future<> wait_for_pull() {
return pulled.get_shared_future();
}
void set_readable() {
readable.set_value();
}
void set_recovered() {
recovered.set_value();
}
void set_pushed(pg_shard_t shard) {
pushes[shard].set_value();
}
void set_pulled() {
pulled.set_value();
}
void interrupt(const std::string& why) {
readable.set_exception(std::system_error(
std::make_error_code(std::errc::interrupted), why));
recovered.set_exception(std::system_error(
std::make_error_code(std::errc::interrupted), why));
pulled.set_exception(std::system_error(
std::make_error_code(std::errc::interrupted), why));
for (auto& [pg_shard, pr] : pushes) {
pr.set_exception(std::system_error(
std::make_error_code(std::errc::interrupted), why));
}
}
void dump_detail(Formatter* f) const {
}
};
std::map<hobject_t, WaitForObjectRecovery> recovering;
hobject_t get_temp_recovery_object(
const hobject_t& target,
eversion_t version);
boost::container::flat_set<hobject_t> temp_contents;
void add_temp_obj(const hobject_t &oid) {
temp_contents.insert(oid);
}
void clear_temp_obj(const hobject_t &oid) {
temp_contents.erase(oid);
}
void clean_up(ceph::os::Transaction& t, const std::string& why);
};
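The WaitForObjectRecovery blocker above is built on seastar::shared_promise:
every blocked operation takes its own future off the same promise, a single
set_readable()/set_recovered()/set_pushed()/set_pulled() call wakes all of
them, and interrupt() fails them all with one exception. A minimal,
self-contained sketch of that pattern (hypothetical demo code, not part of
this patch):

#include <seastar/core/do_with.hh>
#include <seastar/core/shared_future.hh>

seastar::future<> demo() {
  return seastar::do_with(seastar::shared_promise<>{}, [](auto& readable) {
    // two independent waiters off one promise, just as wait_for_readable()
    // hands out a fresh shared future per blocked operation
    auto op1 = readable.get_shared_future();
    auto op2 = readable.get_shared_future();
    readable.set_value();  // cf. set_readable() once the object is recovered
    return op1.then([op2 = std::move(op2)]() mutable { return std::move(op2); });
  });
}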

View File

@ -6,10 +6,8 @@
#include "messages/MOSDRepOpReply.h"
#include "crimson/common/log.h"
#include "crimson/os/cyanstore/cyan_object.h"
#include "crimson/os/futurized_store.h"
#include "crimson/osd/shard_services.h"
#include "crimson/osd/pg.h"
#include "osd/PeeringState.h"
namespace {

File diff suppressed because it is too large

View File

@ -0,0 +1,108 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#pragma once
#include "crimson/osd/recovery_backend.h"
#include "messages/MOSDPGPull.h"
#include "messages/MOSDPGPush.h"
#include "messages/MOSDPGPushReply.h"
#include "messages/MOSDPGRecoveryDelete.h"
#include "messages/MOSDPGRecoveryDeleteReply.h"
#include "os/ObjectStore.h"
class ReplicatedRecoveryBackend : public RecoveryBackend {
public:
ReplicatedRecoveryBackend(crimson::osd::PG& pg,
crimson::osd::ShardServices& shard_services,
crimson::os::CollectionRef coll,
PGBackend* backend)
: RecoveryBackend(pg, shard_services, coll, backend) {}
seastar::future<> handle_recovery_op(
Ref<MOSDFastDispatchOp> m) final;
seastar::future<> recover_object(
const hobject_t& soid,
eversion_t need) final;
seastar::future<> recover_delete(
const hobject_t& soid,
eversion_t need) final;
seastar::future<> push_delete(
const hobject_t& soid,
eversion_t need) final;
protected:
seastar::future<> handle_pull(
Ref<MOSDPGPull> m);
seastar::future<> handle_pull_response(
Ref<MOSDPGPush> m);
seastar::future<> handle_push(
Ref<MOSDPGPush> m);
seastar::future<> handle_push_reply(
Ref<MOSDPGPushReply> m);
seastar::future<> handle_recovery_delete(
Ref<MOSDPGRecoveryDelete> m);
seastar::future<> handle_recovery_delete_reply(
Ref<MOSDPGRecoveryDeleteReply> m);
seastar::future<> prep_push(
const hobject_t& soid,
eversion_t need,
std::map<pg_shard_t, PushOp>* pops,
const std::list<std::map<pg_shard_t, pg_missing_t>::const_iterator>& shards);
void prepare_pull(
PullOp& po,
PullInfo& pi,
const hobject_t& soid,
eversion_t need);
std::list<std::map<pg_shard_t, pg_missing_t>::const_iterator> get_shards_to_push(
const hobject_t& soid);
seastar::future<ObjectRecoveryProgress> build_push_op(
const ObjectRecoveryInfo& recovery_info,
const ObjectRecoveryProgress& progress,
object_stat_sum_t* stat,
PushOp* pop);
seastar::future<bool> _handle_pull_response(
pg_shard_t from,
PushOp& pop,
PullOp* response,
ceph::os::Transaction* t);
void trim_pushed_data(
const interval_set<uint64_t> &copy_subset,
const interval_set<uint64_t> &intervals_received,
ceph::bufferlist data_received,
interval_set<uint64_t> *intervals_usable,
bufferlist *data_usable);
seastar::future<> submit_push_data(
const ObjectRecoveryInfo &recovery_info,
bool first,
bool complete,
bool clear_omap,
interval_set<uint64_t> &data_zeros,
const interval_set<uint64_t> &intervals_included,
ceph::bufferlist data_included,
ceph::bufferlist omap_header,
const std::map<string, bufferlist> &attrs,
const std::map<string, bufferlist> &omap_entries,
ceph::os::Transaction *t);
void submit_push_complete(
const ObjectRecoveryInfo &recovery_info,
ObjectStore::Transaction *t);
seastar::future<> _handle_push(
pg_shard_t from,
const PushOp &pop,
PushReplyOp *response,
ceph::os::Transaction *t);
seastar::future<bool> _handle_push_reply(
pg_shard_t peer,
const PushReplyOp &op,
PushOp *reply);
seastar::future<> on_local_recover_persist(
const hobject_t& soid,
const ObjectRecoveryInfo& _recovery_info,
bool is_delete,
epoch_t epoch_to_freeze);
seastar::future<> local_recover_delete(
const hobject_t& soid,
eversion_t need,
epoch_t epoch_frozen);
};
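The matching .cc diff is suppressed above as too large, so the following is a
hedged sketch of how handle_recovery_op plausibly fans out to the protected
handlers declared here. The message-type switch and the is_primary() split for
MSG_OSD_PG_PUSH (a push received by the puller is a pull response) are
inferred from the declarations, not copied from the real implementation:

seastar::future<> ReplicatedRecoveryBackend::handle_recovery_op(
  Ref<MOSDFastDispatchOp> m)
{
  switch (m->get_header().type) {
  case MSG_OSD_PG_PULL:
    return handle_pull(boost::static_pointer_cast<MOSDPGPull>(m));
  case MSG_OSD_PG_PUSH:
    return pg.is_primary()
      ? handle_pull_response(boost::static_pointer_cast<MOSDPGPush>(m))
      : handle_push(boost::static_pointer_cast<MOSDPGPush>(m));
  case MSG_OSD_PG_PUSH_REPLY:
    return handle_push_reply(boost::static_pointer_cast<MOSDPGPushReply>(m));
  case MSG_OSD_PG_RECOVERY_DELETE:
    return handle_recovery_delete(
      boost::static_pointer_cast<MOSDPGRecoveryDelete>(m));
  case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
    return handle_recovery_delete_reply(
      boost::static_pointer_cast<MOSDPGRecoveryDeleteReply>(m));
  default:
    ceph_assert(0 == "unhandled recovery op type");
    return seastar::now();
  }
}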

View File

@ -0,0 +1,165 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2016 Red Hat Inc.
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#include <memory>
#include <functional>
#include "crimson/osd/scheduler/mclock_scheduler.h"
#include "common/dout.h"
namespace dmc = crimson::dmclock;
using namespace std::placeholders;
#define dout_context cct
#define dout_subsys ceph_subsys_osd
#undef dout_prefix
#define dout_prefix *_dout
namespace crimson::osd::scheduler {
mClockScheduler::mClockScheduler(ConfigProxy &conf) :
scheduler(
std::bind(&mClockScheduler::ClientRegistry::get_info,
&client_registry,
_1),
dmc::AtLimit::Allow,
conf.get_val<double>("osd_mclock_scheduler_anticipation_timeout"))
{
conf.add_observer(this);
client_registry.update_from_config(conf);
}
void mClockScheduler::ClientRegistry::update_from_config(const ConfigProxy &conf)
{
default_external_client_info.update(
conf.get_val<uint64_t>("osd_mclock_scheduler_client_res"),
conf.get_val<uint64_t>("osd_mclock_scheduler_client_wgt"),
conf.get_val<uint64_t>("osd_mclock_scheduler_client_lim"));
internal_client_infos[
static_cast<size_t>(scheduler_class_t::background_recovery)].update(
conf.get_val<uint64_t>("osd_mclock_scheduler_background_recovery_res"),
conf.get_val<uint64_t>("osd_mclock_scheduler_background_recovery_wgt"),
conf.get_val<uint64_t>("osd_mclock_scheduler_background_recovery_lim"));
internal_client_infos[
static_cast<size_t>(scheduler_class_t::background_best_effort)].update(
conf.get_val<uint64_t>("osd_mclock_scheduler_background_best_effort_res"),
conf.get_val<uint64_t>("osd_mclock_scheduler_background_best_effort_wgt"),
conf.get_val<uint64_t>("osd_mclock_scheduler_background_best_effort_lim"));
}
const dmc::ClientInfo *mClockScheduler::ClientRegistry::get_external_client(
const client_profile_id_t &client) const
{
auto ret = external_client_infos.find(client);
if (ret == external_client_infos.end())
return &default_external_client_info;
else
return &(ret->second);
}
const dmc::ClientInfo *mClockScheduler::ClientRegistry::get_info(
const scheduler_id_t &id) const {
switch (id.class_id) {
case scheduler_class_t::immediate:
ceph_assert(0 == "Cannot schedule immediate");
return (dmc::ClientInfo*)nullptr;
case scheduler_class_t::repop:
case scheduler_class_t::client:
return get_external_client(id.client_profile_id);
default:
ceph_assert(static_cast<size_t>(id.class_id) < internal_client_infos.size());
return &internal_client_infos[static_cast<size_t>(id.class_id)];
}
}
void mClockScheduler::dump(ceph::Formatter &f) const
{
}
void mClockScheduler::enqueue(item_t&& item)
{
auto id = get_scheduler_id(item);
auto cost = item.params.cost;
if (scheduler_class_t::immediate == item.params.klass) {
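// dequeue() consumes from the back of this list, so push_front here keeps
// immediate items in FIFO order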
immediate.push_front(std::move(item));
} else {
scheduler.add_request(
std::move(item),
id,
cost);
}
}
void mClockScheduler::enqueue_front(item_t&& item)
{
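// push_back lands at the consumption end (dequeue() pops the back), so this
// item is handed out ahead of anything already queued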
immediate.push_back(std::move(item));
// TODO: item may not be immediate, update mclock machinery to permit
// putting the item back in the queue
}
item_t mClockScheduler::dequeue()
{
if (!immediate.empty()) {
auto ret = std::move(immediate.back());
immediate.pop_back();
return ret;
} else {
mclock_queue_t::PullReq result = scheduler.pull_request();
if (result.is_future()) {
ceph_assert(
0 == "Not implemented, user would have to be able to be woken up");
return std::move(*(item_t*)nullptr);
} else if (result.is_none()) {
ceph_assert(
0 == "Impossible, must have checked empty() first");
return std::move(*(item_t*)nullptr);
} else {
ceph_assert(result.is_retn());
auto &retn = result.get_retn();
return std::move(*retn.request);
}
}
}
const char** mClockScheduler::get_tracked_conf_keys() const
{
static const char* KEYS[] = {
"osd_mclock_scheduler_client_res",
"osd_mclock_scheduler_client_wgt",
"osd_mclock_scheduler_client_lim",
"osd_mclock_scheduler_background_recovery_res",
"osd_mclock_scheduler_background_recovery_wgt",
"osd_mclock_scheduler_background_recovery_lim",
"osd_mclock_scheduler_background_best_effort_res",
"osd_mclock_scheduler_background_best_effort_wgt",
"osd_mclock_scheduler_background_best_effort_lim",
NULL
};
return KEYS;
}
void mClockScheduler::handle_conf_change(
const ConfigProxy& conf,
const std::set<std::string> &changed)
{
client_registry.update_from_config(conf);
}
}
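For orientation, a minimal hypothetical driver for the scheduler above; conf
is assumed to be a crimson ConfigProxy carrying the osd_mclock_scheduler_*
options consumed by update_from_config(). None of this is part of the patch:

#include "crimson/osd/scheduler/mclock_scheduler.h"

void demo(ConfigProxy& conf) {
  using namespace crimson::osd::scheduler;
  mClockScheduler sched{conf};
  item_t op;
  op.params = params_t{1 /* cost */, 42 /* owner */, scheduler_class_t::client};
  sched.enqueue(std::move(op));    // client class, so it goes through dmclock
  while (!sched.empty()) {
    item_t next = sched.dequeue(); // immediate-class items would come out first
  }
}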

View File

@ -0,0 +1,130 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2016 Red Hat Inc.
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#pragma once
#include <ostream>
#include <map>
#include <vector>
#include "boost/variant.hpp"
#include "dmclock/src/dmclock_server.h"
#include "crimson/osd/scheduler/scheduler.h"
#include "common/config.h"
#include "include/cmp.h"
#include "common/ceph_context.h"
namespace crimson::osd::scheduler {
using client_id_t = uint64_t;
using profile_id_t = uint64_t;
struct client_profile_id_t {
client_id_t client_id;
profile_id_t profile_id;
};
WRITE_EQ_OPERATORS_2(client_profile_id_t, client_id, profile_id)
WRITE_CMP_OPERATORS_2(client_profile_id_t, client_id, profile_id)
struct scheduler_id_t {
scheduler_class_t class_id;
client_profile_id_t client_profile_id;
};
WRITE_EQ_OPERATORS_2(scheduler_id_t, class_id, client_profile_id)
WRITE_CMP_OPERATORS_2(scheduler_id_t, class_id, client_profile_id)
/**
* Scheduler implementation based on mclock.
*
* TODO: explain configs
*/
class mClockScheduler : public Scheduler, md_config_obs_t {
class ClientRegistry {
std::array<
crimson::dmclock::ClientInfo,
static_cast<size_t>(scheduler_class_t::client)
> internal_client_infos = {
// Placeholder, gets replaced with configured values
crimson::dmclock::ClientInfo(1, 1, 1),
crimson::dmclock::ClientInfo(1, 1, 1)
};
crimson::dmclock::ClientInfo default_external_client_info = {1, 1, 1};
std::map<client_profile_id_t,
crimson::dmclock::ClientInfo> external_client_infos;
const crimson::dmclock::ClientInfo *get_external_client(
const client_profile_id_t &client) const;
public:
void update_from_config(const ConfigProxy &conf);
const crimson::dmclock::ClientInfo *get_info(
const scheduler_id_t &id) const;
} client_registry;
using mclock_queue_t = crimson::dmclock::PullPriorityQueue<
scheduler_id_t,
item_t,
true,
true,
2>;
mclock_queue_t scheduler;
std::list<item_t> immediate;
static scheduler_id_t get_scheduler_id(const item_t &item) {
return scheduler_id_t{
item.params.klass,
client_profile_id_t{
item.params.owner,
0
}
};
}
public:
mClockScheduler(ConfigProxy &conf);
// Enqueue op in the back of the regular queue
void enqueue(item_t &&item) final;
// Enqueue the op in the front of the regular queue
void enqueue_front(item_t &&item) final;
// Return an op to be dispatched
item_t dequeue() final;
// Returns true if the queue is empty
bool empty() const final {
return immediate.empty() && scheduler.empty();
}
// Formatted output of the queue
void dump(ceph::Formatter &f) const final;
void print(std::ostream &ostream) const final {
ostream << "mClockScheduler";
}
const char** get_tracked_conf_keys() const final;
void handle_conf_change(const ConfigProxy& conf,
const std::set<std::string> &changed) final;
};
}

View File

@ -0,0 +1,181 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2019 Red Hat Inc.
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#include <ostream>
#include <seastar/core/print.hh>
#include "crimson/osd/scheduler/scheduler.h"
#include "crimson/osd/scheduler/mclock_scheduler.h"
#include "common/WeightedPriorityQueue.h"
namespace crimson::osd::scheduler {
std::ostream &operator<<(std::ostream &lhs, const scheduler_class_t &c)
{
switch (c) {
case scheduler_class_t::background_best_effort:
return lhs << "background_best_effort";
case scheduler_class_t::background_recovery:
return lhs << "background_recovery";
case scheduler_class_t::client:
return lhs << "client";
case scheduler_class_t::repop:
return lhs << "repop";
case scheduler_class_t::immediate:
return lhs << "immediate";
default:
return lhs;
}
}
/**
* Implements Scheduler in terms of OpQueue
*
* Templated on queue type to avoid dynamic dispatch, T should implement
* OpQueue<item_t, client_t>. This adapter is mainly responsible for
* the boilerplate priority cutoff/strict concept which is needed for
* OpQueue based implementations.
*/
template <typename T>
class ClassedOpQueueScheduler : public Scheduler {
const scheduler_class_t cutoff;
T queue;
using priority_t = uint64_t;
std::array<
priority_t,
static_cast<size_t>(scheduler_class_t::immediate)
> priority_map = {
// Placeholder, gets replaced with configured values
0, 0, 0
};
static scheduler_class_t get_io_prio_cut(ConfigProxy &conf) {
if (conf.get_val<std::string>("osd_op_queue_cut_off") == "debug_random") {
srand(time(NULL));
return (rand() % 2 < 1) ?
scheduler_class_t::repop : scheduler_class_t::immediate;
} else if (conf.get_val<std::string>("osd_op_queue_cut_off") == "high") {
return scheduler_class_t::immediate;
} else {
return scheduler_class_t::repop;
}
}
bool use_strict(scheduler_class_t kl) const {
return static_cast<uint8_t>(kl) >= static_cast<uint8_t>(cutoff);
}
priority_t get_priority(scheduler_class_t kl) const {
ceph_assert(static_cast<size_t>(kl) <
static_cast<size_t>(scheduler_class_t::immediate));
return priority_map[static_cast<size_t>(kl)];
}
public:
template <typename... Args>
ClassedOpQueueScheduler(ConfigProxy &conf, Args&&... args) :
cutoff(get_io_prio_cut(conf)),
queue(std::forward<Args>(args)...)
{
priority_map[
static_cast<size_t>(scheduler_class_t::background_best_effort)
] = conf.get_val<uint64_t>("osd_scrub_priority");
priority_map[
static_cast<size_t>(scheduler_class_t::background_recovery)
] = conf.get_val<uint64_t>("osd_recovery_op_priority");
priority_map[
static_cast<size_t>(scheduler_class_t::client)
] = conf.get_val<uint64_t>("osd_client_op_priority");
priority_map[
static_cast<size_t>(scheduler_class_t::repop)
] = conf.get_val<uint64_t>("osd_client_op_priority");
}
void enqueue(item_t &&item) final {
if (use_strict(item.params.klass))
queue.enqueue_strict(
item.params.owner, get_priority(item.params.klass), std::move(item));
else
queue.enqueue(
item.params.owner, get_priority(item.params.klass),
item.params.cost, std::move(item));
}
void enqueue_front(item_t &&item) final {
if (use_strict(item.params.klass))
queue.enqueue_strict_front(
item.params.owner, get_priority(item.params.klass), std::move(item));
else
queue.enqueue_front(
item.params.owner, get_priority(item.params.klass),
item.params.cost, std::move(item));
}
bool empty() const final {
return queue.empty();
}
item_t dequeue() final {
return queue.dequeue();
}
void dump(ceph::Formatter &f) const final {
return queue.dump(&f);
}
void print(std::ostream &out) const final {
out << "ClassedOpQueueScheduler(queue=";
queue.print(out);
out << ", cutoff=" << cutoff << ")";
}
~ClassedOpQueueScheduler() final {};
};
SchedulerRef make_scheduler(ConfigProxy &conf)
{
const std::string _type = conf.get_val<std::string>("osd_op_queue");
const std::string *type = &_type;
if (*type == "debug_random") {
static const std::string index_lookup[] = { "mclock_scheduler",
"wpq" };
srand(time(NULL));
unsigned which = rand() % (sizeof(index_lookup) / sizeof(index_lookup[0]));
type = &index_lookup[which];
}
if (*type == "wpq" ) {
// default is 'wpq'
return std::make_unique<
ClassedOpQueueScheduler<WeightedPriorityQueue<item_t, client_t>>>(
conf,
conf.get_val<uint64_t>("osd_op_pq_max_tokens_per_priority"),
conf->osd_op_pq_min_cost
);
} else if (*type == "mclock_scheduler") {
return std::make_unique<mClockScheduler>(conf);
} else {
ceph_assert("Invalid choice of wq" == 0);
return std::unique_ptr<mClockScheduler>();
}
}
std::ostream &operator<<(std::ostream &lhs, const Scheduler &rhs) {
rhs.print(lhs);
return lhs;
}
}
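A hedged sketch of a call site; crimson::common::local_conf() is assumed to be
available as elsewhere in this series, and osd_op_queue picks the backend:

auto sched = crimson::osd::scheduler::make_scheduler(
  crimson::common::local_conf());
// "wpq" (the default) yields ClassedOpQueueScheduler wrapping
// WeightedPriorityQueue; "mclock_scheduler" yields mClockScheduler;
// "debug_random" picks one of the two at random at startup.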

View File

@ -0,0 +1,82 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2019 Red Hat Inc.
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#pragma once
#include <seastar/core/future.hh>
#include <ostream>
#include "crimson/common/config_proxy.h"
namespace crimson::osd::scheduler {
enum class scheduler_class_t : uint8_t {
background_best_effort = 0,
background_recovery,
client,
repop,
immediate,
};
std::ostream &operator<<(std::ostream &, const scheduler_class_t &);
using client_t = uint64_t;
using cost_t = uint64_t;
struct params_t {
cost_t cost = 1;
client_t owner;
scheduler_class_t klass;
};
struct item_t {
params_t params;
seastar::promise<> wake;
};
/**
* Base interface for classes responsible for choosing
* op processing order in the OSD.
*/
class Scheduler {
public:
// Enqueue op for scheduling
virtual void enqueue(item_t &&item) = 0;
// Enqueue op for processing as though it were enqueued prior
// to other items already scheduled.
virtual void enqueue_front(item_t &&item) = 0;
// Returns true iff there are no ops scheduled
virtual bool empty() const = 0;
// Return next op to be processed
virtual item_t dequeue() = 0;
// Dump formatted representation for the queue
virtual void dump(ceph::Formatter &f) const = 0;
// Print human readable brief description with relevant parameters
virtual void print(std::ostream &out) const = 0;
// Destructor
virtual ~Scheduler() {};
};
std::ostream &operator<<(std::ostream &lhs, const Scheduler &);
using SchedulerRef = std::unique_ptr<Scheduler>;
SchedulerRef make_scheduler(ConfigProxy &);
}

View File

@ -5,6 +5,7 @@
#include "osd/osd_perf_counters.h"
#include "osd/PeeringState.h"
#include "crimson/common/config_proxy.h"
#include "crimson/mgr/client.h"
#include "crimson/mon/MonClient.h"
#include "crimson/net/Messenger.h"
@ -38,13 +39,49 @@ ShardServices::ShardServices(
monc(monc),
mgrc(mgrc),
store(store),
obc_registry(crimson::common::local_conf())
throttler(crimson::common::local_conf()),
obc_registry(crimson::common::local_conf()),
local_reserver(
&cct,
&finisher,
crimson::common::local_conf()->osd_max_backfills,
crimson::common::local_conf()->osd_min_recovery_priority),
remote_reserver(
&cct,
&finisher,
crimson::common::local_conf()->osd_max_backfills,
crimson::common::local_conf()->osd_min_recovery_priority)
{
perf = build_osd_logger(&cct);
cct.get_perfcounters_collection()->add(perf);
recoverystate_perf = build_recoverystate_perf(&cct);
cct.get_perfcounters_collection()->add(recoverystate_perf);
crimson::common::local_conf().add_observer(this);
}
const char** ShardServices::get_tracked_conf_keys() const
{
static const char* KEYS[] = {
"osd_max_backfills",
"osd_min_recovery_priority",
nullptr
};
return KEYS;
}
void ShardServices::handle_conf_change(const ConfigProxy& conf,
const std::set <std::string> &changed)
{
if (changed.count("osd_max_backfills")) {
local_reserver.set_max(conf->osd_max_backfills);
remote_reserver.set_max(conf->osd_max_backfills);
}
if (changed.count("osd_min_recovery_priority")) {
local_reserver.set_min_priority(conf->osd_min_recovery_priority);
remote_reserver.set_min_priority(conf->osd_min_recovery_priority);
}
}
seastar::future<> ShardServices::send_to_osd(

View File

@ -13,6 +13,7 @@
#include "osd/PeeringState.h"
#include "crimson/osd/osdmap_service.h"
#include "crimson/osd/object_context.h"
#include "common/AsyncReserver.h"
namespace crimson::net {
class Messenger;
@ -39,7 +40,7 @@ namespace crimson::osd {
/**
* Represents services available to each PG
*/
class ShardServices {
class ShardServices : public md_config_obs_t {
using cached_map_t = boost::local_shared_ptr<const OSDMap>;
OSDMapService &osdmap_service;
crimson::net::Messenger &cluster_msgr;
@ -53,6 +54,9 @@ class ShardServices {
PerfCounters *perf = nullptr;
PerfCounters *recoverystate_perf = nullptr;
const char** get_tracked_conf_keys() const final;
void handle_conf_change(const ConfigProxy& conf,
const std::set <std::string> &changed) final;
public:
ShardServices(
OSDMapService &osdmap_service,
@ -80,8 +84,9 @@ public:
return osdmap_service;
}
// Op Tracking
// Op Management
OperationRegistry registry;
OperationThrottler throttler;
template <typename T, typename... Args>
auto start_operation(Args&&... args) {
@ -172,8 +177,18 @@ public:
crimson::osd::ObjectContextRegistry obc_registry;
// Async Reservers
private:
unsigned num_pgs = 0;
struct DirectFinisher {
void queue(Context *c) {
c->complete(0);
}
} finisher;
public:
AsyncReserver<spg_t, DirectFinisher> local_reserver;
AsyncReserver<spg_t, DirectFinisher> remote_reserver;
};
}
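DirectFinisher above is what lets crimson reuse AsyncReserver without a
Finisher thread: each shard is a single reactor, so a granted reservation
Context can run inline in queue(). A hypothetical grant, where LambdaContext
is the stock Ceph Context wrapper and the surrounding names are illustrative:

shard_services.local_reserver.request_reservation(
  pgid,
  new LambdaContext([](int) {
    // executed synchronously on this shard as soon as the reservation is
    // granted, with no cross-thread hop as in the classic Finisher path
  }),
  recovery_priority);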

View File

@ -4527,7 +4527,7 @@ PG* OSD::_make_pg(
}
decode(ec_profile, p);
}
PGPool pool(cct, createmap, pgid.pool(), pi, name);
PGPool pool(createmap, pgid.pool(), pi, name);
PG *pg;
if (pi.type == pg_pool_t::TYPE_REPLICATED ||
pi.type == pg_pool_t::TYPE_ERASURE)

View File

@ -59,6 +59,7 @@
#include "messages/MOSDOp.h"
#include "common/EventTrace.h"
#include "osd/osd_perf_counters.h"
#include "common/Finisher.h"
#define CEPH_OSD_PROTOCOL 10 /* cluster internal */
@ -556,8 +557,8 @@ public:
// -- backfill_reservation --
Finisher reserver_finisher;
AsyncReserver<spg_t> local_reserver;
AsyncReserver<spg_t> remote_reserver;
AsyncReserver<spg_t, Finisher> local_reserver;
AsyncReserver<spg_t, Finisher> remote_reserver;
// -- pg merge --
ceph::mutex merge_lock = ceph::make_mutex("OSD::merge_lock");
@ -605,7 +606,7 @@ public:
void prune_pg_created();
void send_pg_created();
AsyncReserver<spg_t> snap_reserver;
AsyncReserver<spg_t, Finisher> snap_reserver;
void queue_recovery_context(PG *pg, GenContext<ThreadPool::TPHandle&> *c);
void queue_for_snap_trim(PG *pg);
void queue_for_scrub(PG *pg, bool with_high_priority);

View File

@ -104,7 +104,7 @@ void BufferedRecoveryMessages::send_info(
}
}
void PGPool::update(CephContext *cct, OSDMapRef map)
void PGPool::update(OSDMapRef map)
{
const pg_pool_t *pi = map->get_pg_pool(id);
if (!pi) {
@ -258,6 +258,16 @@ void PeeringState::update_history(const pg_history_t& new_history)
pl->on_info_history_change();
}
hobject_t PeeringState::earliest_backfill() const
{
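// keep the minimum last_backfill across all backfill targets; e.g. if one
// target has copied nothing yet (last_backfill == hobject_t()), that floor
// wins and backfill restarts from the very beginning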
hobject_t e = hobject_t::get_max();
for (const pg_shard_t& bt : get_backfill_targets()) {
const pg_info_t &pi = get_peer_info(bt);
e = std::min(pi.last_backfill, e);
}
return e;
}
void PeeringState::purge_strays()
{
if (is_premerge()) {
@ -421,7 +431,7 @@ void PeeringState::advance_map(
<< dendl;
update_osdmap_ref(osdmap);
pool.update(cct, osdmap);
pool.update(osdmap);
AdvMap evt(
osdmap, lastmap, newup, up_primary,
@ -430,7 +440,7 @@ void PeeringState::advance_map(
if (pool.info.last_change == osdmap_ref->get_epoch()) {
pl->on_pool_change();
}
readable_interval = pool.get_readable_interval();
readable_interval = pool.get_readable_interval(cct->_conf);
last_require_osd_release = osdmap->require_osd_release;
}
@ -7045,3 +7055,45 @@ ostream &operator<<(ostream &out, const PeeringState &ps) {
}
return out;
}
std::vector<pg_shard_t> PeeringState::get_replica_recovery_order() const
{
std::vector<std::pair<unsigned int, pg_shard_t>> replicas_by_num_missing,
async_by_num_missing;
replicas_by_num_missing.reserve(get_acting_recovery_backfill().size() - 1);
for (auto &p : get_acting_recovery_backfill()) {
if (p == get_primary()) {
continue;
}
auto pm = get_peer_missing().find(p);
assert(pm != get_peer_missing().end());
auto nm = pm->second.num_missing();
if (nm != 0) {
if (is_async_recovery_target(p)) {
async_by_num_missing.push_back(make_pair(nm, p));
} else {
replicas_by_num_missing.push_back(make_pair(nm, p));
}
}
}
// sort by number of missing objects, in ascending order.
auto func = [](const std::pair<unsigned int, pg_shard_t> &lhs,
const std::pair<unsigned int, pg_shard_t> &rhs) {
return lhs.first < rhs.first;
};
// acting goes first
std::sort(replicas_by_num_missing.begin(), replicas_by_num_missing.end(), func);
// then async_recovery_targets
std::sort(async_by_num_missing.begin(), async_by_num_missing.end(), func);
replicas_by_num_missing.insert(replicas_by_num_missing.end(),
async_by_num_missing.begin(), async_by_num_missing.end());
std::vector<pg_shard_t> ret;
ret.reserve(replicas_by_num_missing.size());
for (auto p : replicas_by_num_missing) {
ret.push_back(p.second);
}
return ret;
}
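Worked example with hypothetical numbers: given regular replicas osd.1 missing
3 objects and osd.2 missing 1, plus async recovery target osd.3 missing 2, the
returned order is [osd.2, osd.1, osd.3]: regular replicas first in ascending
order of missing objects, async targets appended afterwards, and peers that
are missing nothing skipped entirely.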

View File

@ -27,7 +27,6 @@
#include "common/ostream_temp.h"
struct PGPool {
CephContext* cct;
epoch_t cached_epoch;
int64_t id;
std::string name;
@ -35,25 +34,24 @@ struct PGPool {
pg_pool_t info;
SnapContext snapc; // the default pool snapc, ready to go.
PGPool(CephContext* cct, OSDMapRef map, int64_t i, const pg_pool_t& info,
PGPool(OSDMapRef map, int64_t i, const pg_pool_t& info,
const std::string& name)
: cct(cct),
cached_epoch(map->get_epoch()),
: cached_epoch(map->get_epoch()),
id(i),
name(name),
info(info) {
snapc = info.get_snap_context();
}
void update(CephContext *cct, OSDMapRef map);
void update(OSDMapRef map);
ceph::timespan get_readable_interval() const {
ceph::timespan get_readable_interval(ConfigProxy &conf) const {
double v = 0;
if (info.opts.get(pool_opts_t::READ_LEASE_INTERVAL, &v)) {
return ceph::make_timespan(v);
} else {
auto hbi = cct->_conf->osd_heartbeat_grace;
auto fac = cct->_conf->osd_pool_default_read_lease_ratio;
auto hbi = conf->osd_heartbeat_grace;
auto fac = conf->osd_pool_default_read_lease_ratio;
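// e.g. a 20s heartbeat grace with a 0.8 ratio gives a 16 second read lease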
return ceph::make_timespan(hbi * fac);
}
}
@ -1741,6 +1739,9 @@ public:
/// Updates info.hit_set to hset_history, does not dirty
void update_hset(const pg_hit_set_history_t &hset_history);
/// Get all pg_shards that need recovery
std::vector<pg_shard_t> get_replica_recovery_order() const;
/**
* update_history
*
@ -1795,6 +1796,12 @@ public:
bool transaction_applied,
bool async);
/**
* retrieve the min last_backfill among backfill targets
*/
hobject_t earliest_backfill() const;
/**
* Updates local log/missing to reflect new oob log update from primary
*/
@ -2155,6 +2162,9 @@ public:
bool is_recovery_unfound() const {
return state_test(PG_STATE_RECOVERY_UNFOUND);
}
bool is_backfilling() const {
return state_test(PG_STATE_BACKFILLING);
}
bool is_backfill_unfound() const {
return state_test(PG_STATE_BACKFILL_UNFOUND);
}

View File

@ -1818,16 +1818,6 @@ void PrimaryLogPG::do_request(
}
}
hobject_t PrimaryLogPG::earliest_backfill() const
{
hobject_t e = hobject_t::get_max();
for (const pg_shard_t& bt : get_backfill_targets()) {
const pg_info_t &pi = recovery_state.get_peer_info(bt);
e = std::min(pi.last_backfill, e);
}
return e;
}
/** do_op - do an op
* pg lock will be held (if multithreaded)
* osd_lock NOT held.
@ -12164,7 +12154,7 @@ void PrimaryLogPG::on_activate_complete()
publish_stats_to_osd();
if (get_backfill_targets().size()) {
last_backfill_started = earliest_backfill();
last_backfill_started = recovery_state.earliest_backfill();
new_backfill = true;
ceph_assert(!last_backfill_started.is_max());
dout(5) << __func__ << ": bft=" << get_backfill_targets()
@ -12447,6 +12437,7 @@ bool PrimaryLogPG::start_recovery_ops(
dout(10) << "deferring backfill due to NOREBALANCE" << dendl;
deferred_backfill = true;
} else if (!recovery_state.is_backfill_reserved()) {
/* DNMNOTE I think this branch is dead */
dout(10) << "deferring backfill due to !backfill_reserved" << dendl;
if (!backfill_reserving) {
dout(10) << "queueing RequestBackfill" << dendl;
@ -12997,7 +12988,7 @@ uint64_t PrimaryLogPG::recover_backfill(
// Initialize from prior backfill state
if (new_backfill) {
// on_activate() was called prior to getting here
ceph_assert(last_backfill_started == earliest_backfill());
ceph_assert(last_backfill_started == recovery_state.earliest_backfill());
new_backfill = false;
// initialize BackfillIntervals
@ -13263,7 +13254,7 @@ uint64_t PrimaryLogPG::recover_backfill(
hobject_t next_backfill_to_complete = backfills_in_flight.empty() ?
backfill_pos : *(backfills_in_flight.begin());
hobject_t new_last_backfill = earliest_backfill();
hobject_t new_last_backfill = recovery_state.earliest_backfill();
dout(10) << "starting new_last_backfill at " << new_last_backfill << dendl;
for (map<hobject_t, pg_stat_t>::iterator i =
pending_backfill_updates.begin();

View File

@ -1557,7 +1557,6 @@ public:
void do_osd_op_effects(OpContext *ctx, const ConnectionRef& conn);
private:
int do_scrub_ls(const MOSDOp *op, OSDOp *osd_op);
hobject_t earliest_backfill() const;
bool check_src_targ(const hobject_t& soid, const hobject_t& toid) const;
uint64_t temp_seq; ///< last id for naming temp objects