Merge pull request #30639 from tchaikov/wip-crimson-readable

crimson/osd: implement readable/lease related methods

Reviewed-by: Sage Weil <sage@redhat.com>
commit a06e7e094e
Kefu Chai, 2019-09-30 17:51:04 +08:00, committed by GitHub
6 changed files with 91 additions and 24 deletions


@@ -97,24 +97,19 @@ namespace ceph {
     const std::chrono::time_point<Clock>& t) {
     return m << std::fixed << std::chrono::duration<double>(
 	t.time_since_epoch()).count()
-	     << "s";
+	     << 's';
   }
 
   std::ostream& operator<<(std::ostream& m, const timespan& t) {
-    int64_t ns = t.count();
-    if (ns < 0) {
-      ns = -ns;
-      m << "-";
-    }
-    m << (ns / 1000000000ll);
-    ns %= 1000000000ll;
-    if (ns) {
+    static_assert(std::is_unsigned_v<timespan::rep>);
+    m << std::chrono::duration_cast<std::chrono::seconds>(t).count();
+    if (auto ns = (t % 1s).count(); ns > 0) {
       char oldfill = m.fill();
       m.fill('0');
       m << '.' << std::setw(9) << ns;
       m.fill(oldfill);
     }
-    return m << "s";
+    return m << 's';
   }
 
   template<typename Clock,
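
Note on the hunk above: the rewritten operator<< relies on timespan having an unsigned representation (hence the static_assert), prints the whole seconds, then zero-pads the nanosecond remainder to exactly nine digits so a fraction like 5ns comes out as ".000000005". A minimal self-contained sketch of the same technique, with a hypothetical `print_span` helper and a local `timespan` alias standing in for ceph's (both assumptions, not part of the patch):

```cpp
#include <chrono>
#include <cstdint>
#include <iomanip>
#include <iostream>

// Local stand-in for ceph::timespan (unsigned rep, as the static_assert requires).
using timespan = std::chrono::duration<uint64_t, std::nano>;

// Hypothetical helper mirroring the patched operator<<.
std::ostream& print_span(std::ostream& m, const timespan& t) {
  using namespace std::chrono_literals;
  m << std::chrono::duration_cast<std::chrono::seconds>(t).count();
  if (auto ns = (t % 1s).count(); ns > 0) {
    char oldfill = m.fill();
    m.fill('0');                     // pad the fraction with zeros
    m << '.' << std::setw(9) << ns;  // always nine digits of nanoseconds
    m.fill(oldfill);                 // restore the caller's fill character
  }
  return m << 's';
}

int main() {
  print_span(std::cout, timespan(1500000000)) << '\n';  // prints "1.500000000s"
  print_span(std::cout, timespan(5)) << '\n';           // prints "0.000000005s"
}
```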


@@ -30,10 +30,8 @@
 #include "crimson/net/Connection.h"
 #include "crimson/net/Messenger.h"
 #include "crimson/os/futurized_collection.h"
-#include "os/Transaction.h"
 #include "crimson/os/cyan_store.h"
-#include "crimson/os/futurized_collection.h"
 #include "crimson/osd/exceptions.h"
 #include "crimson/osd/pg_meta.h"
 #include "crimson/osd/pg_backend.h"
@@ -46,6 +44,15 @@ namespace {
   }
 }
 
+namespace std::chrono {
+std::ostream& operator<<(std::ostream& out, const signedspan& d)
+{
+  auto s = std::chrono::duration_cast<std::chrono::seconds>(d).count();
+  auto ns = std::abs((d % 1s).count());
+  fmt::print(out, "{}{}s", s, ns ? fmt::format(".{:0>9}", ns) : "");
+  return out;
+}
+}
 
 namespace ceph::osd {
 
 using ceph::common::local_conf;
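
Note: the added signedspan printer keeps the sign on the whole-seconds part and formats the absolute sub-second remainder with fmt; "{:0>9}" right-aligns the digits in a nine-character field filled with zeros. One quirk worth knowing: durations in (-1s, 0) print without a minus sign, because the seconds part truncates to zero. A sketch of the same formatting, with a local `signedspan` alias and a free `to_string` function (both assumptions):

```cpp
#include <chrono>
#include <cstdint>
#include <cstdlib>
#include <string>
#include <fmt/format.h>

// Local stand-in for ceph's signedspan (signed nanosecond duration).
using signedspan = std::chrono::duration<int64_t, std::nano>;

// Mirrors the patched operator<<: "<sec>[.<9-digit ns>]s".
std::string to_string(const signedspan& d) {
  using namespace std::chrono_literals;
  auto s = std::chrono::duration_cast<std::chrono::seconds>(d).count();
  auto ns = std::abs((d % 1s).count());
  return fmt::format("{}{}s", s, ns ? fmt::format(".{:0>9}", ns) : "");
}

int main() {
  using namespace std::chrono_literals;
  fmt::print("{}\n", to_string(signedspan(-1500000000)));  // "-1.500000000s"
  fmt::print("{}\n", to_string(2s + 42ns));                // "2.000000042s"
}
```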
@@ -129,6 +136,61 @@ bool PG::try_flush_or_schedule_async() {
   return false;
 }
 
+void PG::queue_check_readable(epoch_t last_peering_reset, ceph::timespan delay)
+{
+  seastar::sleep(delay).then([last_peering_reset, this] {
+    shard_services.start_operation<LocalPeeringEvent>(
+      this,
+      shard_services,
+      pg_whoami,
+      pgid,
+      last_peering_reset,
+      last_peering_reset,
+      PeeringState::CheckReadable{});
+  });
+}
+
+void PG::recheck_readable()
+{
+  bool changed = false;
+  const auto mnow = shard_services.get_mnow();
+  if (peering_state.state_test(PG_STATE_WAIT)) {
+    auto prior_readable_until_ub = peering_state.get_prior_readable_until_ub();
+    if (mnow < prior_readable_until_ub) {
+      logger().info("{} will wait (mnow {} < prior_readable_until_ub {})",
+		    __func__, mnow, prior_readable_until_ub);
+    } else {
+      logger().info("{} no longer wait (mnow {} >= prior_readable_until_ub {})",
+		    __func__, mnow, prior_readable_until_ub);
+      peering_state.state_clear(PG_STATE_WAIT);
+      peering_state.clear_prior_readable_until_ub();
+      changed = true;
+    }
+  }
+  if (peering_state.state_test(PG_STATE_LAGGY)) {
+    auto readable_until = peering_state.get_readable_until();
+    if (readable_until == readable_until.zero()) {
+      logger().info("{} still laggy (mnow {}, readable_until zero)",
+		    __func__, mnow);
+    } else if (mnow >= readable_until) {
+      logger().info("{} still laggy (mnow {} >= readable_until {})",
+		    __func__, mnow, readable_until);
+    } else {
+      logger().info("{} no longer laggy (mnow {} < readable_until {})",
+		    __func__, mnow, readable_until);
+      peering_state.state_clear(PG_STATE_LAGGY);
+      changed = true;
+    }
+  }
+  if (changed) {
+    publish_stats_to_osd();
+    if (!peering_state.state_test(PG_STATE_WAIT) &&
+	!peering_state.state_test(PG_STATE_LAGGY)) {
+      // TODO: requeue ops waiting for readable
+    }
+  }
+}
+
 void PG::on_activate(interval_set<snapid_t>)
 {
   projected_last_update = peering_state.get_info().last_update;
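
Review note on queue_check_readable: crimson has no OSD timer thread, so the delayed re-check is expressed as seastar::sleep(delay) chained into a LocalPeeringEvent resubmitted on the same shard; the repeated last_peering_reset argument presumably fills both epoch slots of the event so a stale event can be discarded after a peering reset, matching the classic OSD's behavior. Note the returned future is dropped (fire and forget), so a failure inside the continuation would go unnoticed. A minimal runnable sketch of the sleep-then-resubmit pattern, with a generic payload instead of the real operation types (an assumption):

```cpp
#include <chrono>
#include <iostream>
#include <seastar/core/app-template.hh>
#include <seastar/core/sleep.hh>

// Sketch of "resubmit an event to yourself after a delay": sleep()
// yields a future that becomes ready after `delay`, and the
// continuation runs on the same shard, like the patch's lambda.
seastar::future<> check_after(std::chrono::milliseconds delay) {
  return seastar::sleep(delay).then([] {
    // In the patch this is where a LocalPeeringEvent is started;
    // here we just log.
    std::cout << "re-evaluating readable state\n";
  });
}

int main(int argc, char** argv) {
  seastar::app_template app;
  return app.run(argc, argv, [] {
    return check_after(std::chrono::milliseconds(100));
  });
}
```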
@@ -194,9 +256,18 @@ HeartbeatStampsRef PG::get_hb_stamps(int peer)
   return shard_services.get_hb_stamps(peer);
 }
 
-void PG::schedule_renew_lease(epoch_t plr, ceph::timespan delay)
+void PG::schedule_renew_lease(epoch_t last_peering_reset, ceph::timespan delay)
 {
-#warning implement me
+  seastar::sleep(delay).then([last_peering_reset, this] {
+    shard_services.start_operation<LocalPeeringEvent>(
+      this,
+      shard_services,
+      pg_whoami,
+      pgid,
+      last_peering_reset,
+      last_peering_reset,
+      RenewLease{});
+  });
 }
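
schedule_renew_lease replaces its former #warning stub with the same sleep-then-resubmit shape. Stepping back to recheck_readable above: it compares the monotonic now against two bounds, clearing PG_STATE_WAIT once the prior interval's readable_until upper bound has passed, and clearing PG_STATE_LAGGY only when a nonzero readable_until still lies in the future. A condensed restatement of that decision logic as a pure function, using simplified time_point types instead of the patch's signedspan offsets (an assumption, for illustration only):

```cpp
#include <chrono>

using mono_time = std::chrono::steady_clock::time_point;

// Simplified view of the state recheck_readable consults
// (the patch reads these through PeeringState).
struct LeaseView {
  bool wait = false;                  // PG_STATE_WAIT
  bool laggy = false;                 // PG_STATE_LAGGY
  mono_time prior_readable_until_ub;  // bound from the previous interval
  mono_time readable_until;           // current lease; default == "zero"
};

// Returns true if either flag cleared -- the patch then publishes
// stats and (TODO there) requeues ops waiting for readable.
bool recheck(LeaseView& v, mono_time mnow) {
  bool changed = false;
  if (v.wait && mnow >= v.prior_readable_until_ub) {
    v.wait = false;   // prior interval can no longer serve reads
    changed = true;
  }
  if (v.laggy && v.readable_until != mono_time{} && mnow < v.readable_until) {
    v.laggy = false;  // lease extends past "now" again
    changed = true;
  }
  return changed;
}

int main() {
  LeaseView v;
  v.wait = true;
  v.prior_readable_until_ub = std::chrono::steady_clock::now();
  // By the time we recheck, the bound has passed, so WAIT clears:
  return recheck(v, std::chrono::steady_clock::now()) ? 0 : 1;
}
```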


@@ -258,12 +258,9 @@ public:
     // Not needed yet
   }
 
-  void queue_check_readable(epoch_t lpr, ceph::timespan delay) final {
-#warning write me
-  }
-  void recheck_readable() final {
-#warning write me
-  }
+  void queue_check_readable(epoch_t last_peering_reset,
+			    ceph::timespan delay) final;
+  void recheck_readable() final;
 
   void on_pool_change() final {
     // Not needed yet


@@ -239,8 +239,11 @@ seastar::future<> ShardServices::osdmap_subscribe(version_t epoch, bool force_re
 
 HeartbeatStampsRef ShardServices::get_hb_stamps(int peer)
 {
-#warning writeme
-  return HeartbeatStampsRef();
+  auto [stamps, added] = heartbeat_stamps.try_emplace(peer);
+  if (added) {
+    stamps->second = ceph::make_ref<HeartbeatStamps>(peer);
+  }
+  return stamps->second;
 }
 };
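
Note: try_emplace default-constructs the mapped HeartbeatStampsRef (as null) only when `peer` is absent, and the returned bool reports whether it did, so the ref is allocated exactly once per peer and handed back by reference thereafter. The same lazy-insert pattern in a self-contained sketch, with std::shared_ptr standing in for ceph::make_ref/ceph::ref_t (an assumption):

```cpp
#include <iostream>
#include <map>
#include <memory>

// Stand-ins for HeartbeatStamps/HeartbeatStampsRef (assumptions).
struct Stamps {
  explicit Stamps(int peer) : peer(peer) {}
  int peer;
};
using StampsRef = std::shared_ptr<Stamps>;

std::map<int, StampsRef> heartbeat_stamps;

StampsRef get_hb_stamps(int peer) {
  // try_emplace inserts a default (null) ref only if `peer` is missing.
  auto [it, added] = heartbeat_stamps.try_emplace(peer);
  if (added) {
    it->second = std::make_shared<Stamps>(peer);  // construct exactly once
  }
  return it->second;
}

int main() {
  auto a = get_hb_stamps(3);
  auto b = get_hb_stamps(3);
  std::cout << (a == b) << '\n';  // 1: same object on repeat lookups
}
```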


@@ -157,6 +157,7 @@ public:
     return ceph::mono_clock::now() - startup_time;
   }
   HeartbeatStampsRef get_hb_stamps(int peer);
+  std::map<int, HeartbeatStampsRef> heartbeat_stamps;
 };


@@ -806,7 +806,7 @@ void PrimaryLogPG::recheck_readable()
   if (mnow < prior_readable_until_ub) {
     dout(10) << __func__ << " still wait (mnow " << mnow
 	     << " < prior_readable_until_ub " << prior_readable_until_ub
-	     << dendl;
+	     << ")" << dendl;
   } else {
     dout(10) << __func__ << " no longer wait (mnow " << mnow
 	     << " >= prior_readable_until_ub " << prior_readable_until_ub