From 3fe103b3e879ee1dfd7b338a2b388153f5bad340 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Fri, 20 Sep 2019 15:00:14 -0700 Subject: [PATCH] crimson/osd: wire up local and remote async reservers Signed-off-by: Samuel Just --- src/crimson/osd/pg.h | 63 ++++++++++++++++++++++++------- src/crimson/osd/shard_services.cc | 38 ++++++++++++++++++- src/crimson/osd/shard_services.h | 16 +++++++- 3 files changed, 101 insertions(+), 16 deletions(-) diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index 464d4e2bccb..eeb0d1c6650 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -10,6 +10,7 @@ #include #include #include +#include #include "common/dout.h" #include "crimson/net/Fwd.h" @@ -174,51 +175,85 @@ public: // will be needed for unblocking IO operations/peering } + template + void start_peering_event_operation(T &&evt) { + shard_services.start_operation( + this, + shard_services, + pg_whoami, + pgid, + std::forward(evt)); + } + void schedule_event_after( PGPeeringEventRef event, float delay) final { - ceph_assert(0 == "Not implemented yet"); + // TODO: this is kind of a hack -- once the start_operation call + // happens, the operation will be registered, but during the delay + // it's just a dangling future. It would be nice for the + // operation machinery to have something to take care of this. + (void)seastar::sleep(std::chrono::milliseconds(std::lround(delay*1000))).then( + [this, event=std::move(event)]() { + start_peering_event_operation(std::move(*event)); + }); } void request_local_background_io_reservation( unsigned priority, PGPeeringEventRef on_grant, PGPeeringEventRef on_preempt) final { - ceph_assert(0 == "Not implemented yet"); + shard_services.local_reserver.request_reservation( + pgid, + on_grant ? make_lambda_context([this, on_grant=std::move(on_grant)] (int) { + start_peering_event_operation(std::move(*on_grant)); + }) : nullptr, + priority, + on_preempt ? make_lambda_context( + [this, on_preempt=std::move(on_preempt)] (int) { + start_peering_event_operation(std::move(*on_preempt)); + }) : nullptr); } void update_local_background_io_priority( unsigned priority) final { - ceph_assert(0 == "Not implemented yet"); + shard_services.local_reserver.update_priority( + pgid, + priority); } void cancel_local_background_io_reservation() final { - // Not implemented yet, but gets called on exit() from some states + shard_services.local_reserver.cancel_reservation( + pgid); } void request_remote_recovery_reservation( unsigned priority, PGPeeringEventRef on_grant, PGPeeringEventRef on_preempt) final { - ceph_assert(0 == "Not implemented yet"); + shard_services.remote_reserver.request_reservation( + pgid, + on_grant ? make_lambda_context([this, on_grant=std::move(on_grant)] (int) { + start_peering_event_operation(std::move(*on_grant)); + }) : nullptr, + priority, + on_preempt ? make_lambda_context( + [this, on_preempt=std::move(on_preempt)] (int) { + start_peering_event_operation(std::move(*on_preempt)); + }) : nullptr); } void cancel_remote_recovery_reservation() final { - // Not implemented yet, but gets called on exit() from some states + shard_services.remote_reserver.cancel_reservation( + pgid); } void schedule_event_on_commit( ceph::os::Transaction &t, PGPeeringEventRef on_commit) final { t.register_on_commit( - new LambdaContext( - [this, on_commit=std::move(on_commit)](int r){ - shard_services.start_operation( - this, - shard_services, - pg_whoami, - pgid, - std::move(*on_commit)); + make_lambda_context( + [this, on_commit=std::move(on_commit)](int) { + start_peering_event_operation(std::move(*on_commit)); })); } diff --git a/src/crimson/osd/shard_services.cc b/src/crimson/osd/shard_services.cc index 3804280b712..b89fd0b8310 100644 --- a/src/crimson/osd/shard_services.cc +++ b/src/crimson/osd/shard_services.cc @@ -5,6 +5,7 @@ #include "osd/osd_perf_counters.h" #include "osd/PeeringState.h" +#include "crimson/common/config_proxy.h" #include "crimson/mgr/client.h" #include "crimson/mon/MonClient.h" #include "crimson/net/Messenger.h" @@ -38,13 +39,48 @@ ShardServices::ShardServices( monc(monc), mgrc(mgrc), store(store), - obc_registry(crimson::common::local_conf()) + obc_registry(crimson::common::local_conf()), + local_reserver( + &cct, + &finisher, + crimson::common::local_conf()->osd_max_backfills, + crimson::common::local_conf()->osd_min_recovery_priority), + remote_reserver( + &cct, + &finisher, + crimson::common::local_conf()->osd_max_backfills, + crimson::common::local_conf()->osd_min_recovery_priority) { perf = build_osd_logger(&cct); cct.get_perfcounters_collection()->add(perf); recoverystate_perf = build_recoverystate_perf(&cct); cct.get_perfcounters_collection()->add(recoverystate_perf); + + crimson::common::local_conf().add_observer(this); +} + +const char** ShardServices::get_tracked_conf_keys() const +{ + static const char* KEYS[] = { + "osd_max_backfills", + "osd_min_recovery_priority", + nullptr + }; + return KEYS; +} + +void ShardServices::handle_conf_change(const ConfigProxy& conf, + const std::set &changed) +{ + if (changed.count("osd_max_backfills")) { + local_reserver.set_max(conf->osd_max_backfills); + remote_reserver.set_max(conf->osd_max_backfills); + } + if (changed.count("osd_min_recovery_priority")) { + local_reserver.set_min_priority(conf->osd_min_recovery_priority); + remote_reserver.set_min_priority(conf->osd_min_recovery_priority); + } } seastar::future<> ShardServices::send_to_osd( diff --git a/src/crimson/osd/shard_services.h b/src/crimson/osd/shard_services.h index 66ba0702714..67d03eff09e 100644 --- a/src/crimson/osd/shard_services.h +++ b/src/crimson/osd/shard_services.h @@ -13,6 +13,7 @@ #include "osd/PeeringState.h" #include "crimson/osd/osdmap_service.h" #include "crimson/osd/object_context.h" +#include "common/AsyncReserver.h" namespace crimson::net { class Messenger; @@ -39,7 +40,7 @@ namespace crimson::osd { /** * Represents services available to each PG */ -class ShardServices { +class ShardServices : public md_config_obs_t { using cached_map_t = boost::local_shared_ptr; OSDMapService &osdmap_service; crimson::net::Messenger &cluster_msgr; @@ -53,6 +54,9 @@ class ShardServices { PerfCounters *perf = nullptr; PerfCounters *recoverystate_perf = nullptr; + const char** get_tracked_conf_keys() const final; + void handle_conf_change(const ConfigProxy& conf, + const std::set &changed) final; public: ShardServices( OSDMapService &osdmap_service, @@ -172,8 +176,18 @@ public: crimson::osd::ObjectContextRegistry obc_registry; + // Async Reservers private: unsigned num_pgs = 0; + + struct DirectFinisher { + void queue(Context *c) { + c->complete(0); + } + } finisher; +public: + AsyncReserver local_reserver; + AsyncReserver remote_reserver; }; }