crimson/osd: wire up local and remote async reservers

Signed-off-by: Samuel Just <sjust@redhat.com>
This commit is contained in:
Samuel Just 2019-09-20 15:00:14 -07:00 committed by Xuehan Xu
parent ce659e3fd7
commit 3fe103b3e8
3 changed files with 101 additions and 16 deletions

View File

@ -10,6 +10,7 @@
#include <boost/smart_ptr/local_shared_ptr.hpp>
#include <seastar/core/future.hh>
#include <seastar/core/shared_future.hh>
#include <seastar/core/sleep.hh>
#include "common/dout.h"
#include "crimson/net/Fwd.h"
@ -174,51 +175,85 @@ public:
// will be needed for unblocking IO operations/peering
}
template <typename T>
void start_peering_event_operation(T &&evt) {
shard_services.start_operation<LocalPeeringEvent>(
this,
shard_services,
pg_whoami,
pgid,
std::forward<T>(evt));
}
void schedule_event_after(
PGPeeringEventRef event,
float delay) final {
ceph_assert(0 == "Not implemented yet");
// TODO: this is kind of a hack -- once the start_operation call
// happens, the operation will be registered, but during the delay
// it's just a dangling future. It would be nice for the
// operation machinery to have something to take care of this.
(void)seastar::sleep(std::chrono::milliseconds(std::lround(delay*1000))).then(
[this, event=std::move(event)]() {
start_peering_event_operation(std::move(*event));
});
}
void request_local_background_io_reservation(
unsigned priority,
PGPeeringEventRef on_grant,
PGPeeringEventRef on_preempt) final {
ceph_assert(0 == "Not implemented yet");
shard_services.local_reserver.request_reservation(
pgid,
on_grant ? make_lambda_context([this, on_grant=std::move(on_grant)] (int) {
start_peering_event_operation(std::move(*on_grant));
}) : nullptr,
priority,
on_preempt ? make_lambda_context(
[this, on_preempt=std::move(on_preempt)] (int) {
start_peering_event_operation(std::move(*on_preempt));
}) : nullptr);
}
void update_local_background_io_priority(
unsigned priority) final {
ceph_assert(0 == "Not implemented yet");
shard_services.local_reserver.update_priority(
pgid,
priority);
}
void cancel_local_background_io_reservation() final {
// Not implemented yet, but gets called on exit() from some states
shard_services.local_reserver.cancel_reservation(
pgid);
}
void request_remote_recovery_reservation(
unsigned priority,
PGPeeringEventRef on_grant,
PGPeeringEventRef on_preempt) final {
ceph_assert(0 == "Not implemented yet");
shard_services.remote_reserver.request_reservation(
pgid,
on_grant ? make_lambda_context([this, on_grant=std::move(on_grant)] (int) {
start_peering_event_operation(std::move(*on_grant));
}) : nullptr,
priority,
on_preempt ? make_lambda_context(
[this, on_preempt=std::move(on_preempt)] (int) {
start_peering_event_operation(std::move(*on_preempt));
}) : nullptr);
}
void cancel_remote_recovery_reservation() final {
// Not implemented yet, but gets called on exit() from some states
shard_services.remote_reserver.cancel_reservation(
pgid);
}
void schedule_event_on_commit(
ceph::os::Transaction &t,
PGPeeringEventRef on_commit) final {
t.register_on_commit(
new LambdaContext(
[this, on_commit=std::move(on_commit)](int r){
shard_services.start_operation<LocalPeeringEvent>(
this,
shard_services,
pg_whoami,
pgid,
std::move(*on_commit));
make_lambda_context(
[this, on_commit=std::move(on_commit)](int) {
start_peering_event_operation(std::move(*on_commit));
}));
}

View File

@ -5,6 +5,7 @@
#include "osd/osd_perf_counters.h"
#include "osd/PeeringState.h"
#include "crimson/common/config_proxy.h"
#include "crimson/mgr/client.h"
#include "crimson/mon/MonClient.h"
#include "crimson/net/Messenger.h"
@ -38,13 +39,48 @@ ShardServices::ShardServices(
monc(monc),
mgrc(mgrc),
store(store),
obc_registry(crimson::common::local_conf())
obc_registry(crimson::common::local_conf()),
local_reserver(
&cct,
&finisher,
crimson::common::local_conf()->osd_max_backfills,
crimson::common::local_conf()->osd_min_recovery_priority),
remote_reserver(
&cct,
&finisher,
crimson::common::local_conf()->osd_max_backfills,
crimson::common::local_conf()->osd_min_recovery_priority)
{
perf = build_osd_logger(&cct);
cct.get_perfcounters_collection()->add(perf);
recoverystate_perf = build_recoverystate_perf(&cct);
cct.get_perfcounters_collection()->add(recoverystate_perf);
crimson::common::local_conf().add_observer(this);
}
const char** ShardServices::get_tracked_conf_keys() const
{
static const char* KEYS[] = {
"osd_max_backfills",
"osd_min_recovery_priority",
nullptr
};
return KEYS;
}
void ShardServices::handle_conf_change(const ConfigProxy& conf,
const std::set <std::string> &changed)
{
if (changed.count("osd_max_backfills")) {
local_reserver.set_max(conf->osd_max_backfills);
remote_reserver.set_max(conf->osd_max_backfills);
}
if (changed.count("osd_min_recovery_priority")) {
local_reserver.set_min_priority(conf->osd_min_recovery_priority);
remote_reserver.set_min_priority(conf->osd_min_recovery_priority);
}
}
seastar::future<> ShardServices::send_to_osd(

View File

@ -13,6 +13,7 @@
#include "osd/PeeringState.h"
#include "crimson/osd/osdmap_service.h"
#include "crimson/osd/object_context.h"
#include "common/AsyncReserver.h"
namespace crimson::net {
class Messenger;
@ -39,7 +40,7 @@ namespace crimson::osd {
/**
* Represents services available to each PG
*/
class ShardServices {
class ShardServices : public md_config_obs_t {
using cached_map_t = boost::local_shared_ptr<const OSDMap>;
OSDMapService &osdmap_service;
crimson::net::Messenger &cluster_msgr;
@ -53,6 +54,9 @@ class ShardServices {
PerfCounters *perf = nullptr;
PerfCounters *recoverystate_perf = nullptr;
const char** get_tracked_conf_keys() const final;
void handle_conf_change(const ConfigProxy& conf,
const std::set <std::string> &changed) final;
public:
ShardServices(
OSDMapService &osdmap_service,
@ -172,8 +176,18 @@ public:
crimson::osd::ObjectContextRegistry obc_registry;
// Async Reservers
private:
unsigned num_pgs = 0;
struct DirectFinisher {
void queue(Context *c) {
c->complete(0);
}
} finisher;
public:
AsyncReserver<spg_t, DirectFinisher> local_reserver;
AsyncReserver<spg_t, DirectFinisher> remote_reserver;
};
}