diff --git a/src/common/ceph_time.cc b/src/common/ceph_time.cc index c44b2eac14d..bb708bb4675 100644 --- a/src/common/ceph_time.cc +++ b/src/common/ceph_time.cc @@ -97,24 +97,19 @@ namespace ceph { const std::chrono::time_point& t) { return m << std::fixed << std::chrono::duration( t.time_since_epoch()).count() - << "s"; + << 's'; } std::ostream& operator<<(std::ostream& m, const timespan& t) { - int64_t ns = t.count(); - if (ns < 0) { - ns = -ns; - m << "-"; - } - m << (ns / 1000000000ll); - ns %= 1000000000ll; - if (ns) { + static_assert(std::is_unsigned_v); + m << std::chrono::duration_cast(t).count(); + if (auto ns = (t % 1s).count(); ns > 0) { char oldfill = m.fill(); m.fill('0'); m << '.' << std::setw(9) << ns; m.fill(oldfill); } - return m << "s"; + return m << 's'; } template(d).count(); + auto ns = std::abs((d % 1s).count()); + fmt::print(out, "{}{}s", s, ns ? fmt::format(".{:0>9}", ns) : ""); +} +} + namespace ceph::osd { using ceph::common::local_conf; @@ -129,6 +136,61 @@ bool PG::try_flush_or_schedule_async() { return false; } +void PG::queue_check_readable(epoch_t last_peering_reset, ceph::timespan delay) +{ + seastar::sleep(delay).then([last_peering_reset, this] { + shard_services.start_operation( + this, + shard_services, + pg_whoami, + pgid, + last_peering_reset, + last_peering_reset, + PeeringState::CheckReadable{}); + }); +} + +void PG::recheck_readable() +{ + bool changed = false; + const auto mnow = shard_services.get_mnow(); + if (peering_state.state_test(PG_STATE_WAIT)) { + auto prior_readable_until_ub = peering_state.get_prior_readable_until_ub(); + if (mnow < prior_readable_until_ub) { + logger().info("{} will wait (mnow {} < prior_readable_until_ub {})", + __func__, mnow, prior_readable_until_ub); + } else { + logger().info("{} no longer wait (mnow {} >= prior_readable_until_ub {})", + __func__, mnow, prior_readable_until_ub); + peering_state.state_clear(PG_STATE_WAIT); + peering_state.clear_prior_readable_until_ub(); + changed = true; + } + } + if (peering_state.state_test(PG_STATE_LAGGY)) { + auto readable_until = peering_state.get_readable_until(); + if (readable_until == readable_until.zero()) { + logger().info("{} still laggy (mnow {}, readable_until zero)", + __func__, mnow); + } else if (mnow >= readable_until) { + logger().info("{} still laggy (mnow {} >= readable_until {})", + __func__, mnow, readable_until); + } else { + logger().info("{} no longer laggy (mnow {} < readable_until {})", + __func__, mnow, readable_until); + peering_state.state_clear(PG_STATE_LAGGY); + changed = true; + } + } + if (changed) { + publish_stats_to_osd(); + if (!peering_state.state_test(PG_STATE_WAIT) && + !peering_state.state_test(PG_STATE_LAGGY)) { + // TODO: requeue ops waiting for readable + } + } +} + void PG::on_activate(interval_set) { projected_last_update = peering_state.get_info().last_update; @@ -194,9 +256,18 @@ HeartbeatStampsRef PG::get_hb_stamps(int peer) return shard_services.get_hb_stamps(peer); } -void PG::schedule_renew_lease(epoch_t plr, ceph::timespan delay) +void PG::schedule_renew_lease(epoch_t last_peering_reset, ceph::timespan delay) { -#warning implement me + seastar::sleep(delay).then([last_peering_reset, this] { + shard_services.start_operation( + this, + shard_services, + pg_whoami, + pgid, + last_peering_reset, + last_peering_reset, + RenewLease{}); + }); } diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index b25c2f911fc..6a87b701c1d 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -258,12 +258,9 @@ public: // Not needed yet } - void queue_check_readable(epoch_t lpr, ceph::timespan delay) final { -#warning write me - } - void recheck_readable() final { -#warning write me - } + void queue_check_readable(epoch_t last_peering_reset, + ceph::timespan delay) final; + void recheck_readable() final; void on_pool_change() final { // Not needed yet diff --git a/src/crimson/osd/shard_services.cc b/src/crimson/osd/shard_services.cc index e41b77ac27a..f727c66ca24 100644 --- a/src/crimson/osd/shard_services.cc +++ b/src/crimson/osd/shard_services.cc @@ -239,8 +239,11 @@ seastar::future<> ShardServices::osdmap_subscribe(version_t epoch, bool force_re HeartbeatStampsRef ShardServices::get_hb_stamps(int peer) { -#warning writeme - return HeartbeatStampsRef(); + auto [stamps, added] = heartbeat_stamps.try_emplace(peer); + if (added) { + stamps->second = ceph::make_ref(peer); + } + return stamps->second; } }; diff --git a/src/crimson/osd/shard_services.h b/src/crimson/osd/shard_services.h index d9a2112b6b2..407fa8a3643 100644 --- a/src/crimson/osd/shard_services.h +++ b/src/crimson/osd/shard_services.h @@ -157,6 +157,7 @@ public: return ceph::mono_clock::now() - startup_time; } HeartbeatStampsRef get_hb_stamps(int peer); + std::map heartbeat_stamps; }; diff --git a/src/osd/PrimaryLogPG.cc b/src/osd/PrimaryLogPG.cc index ab0e55d7417..52386a344ba 100644 --- a/src/osd/PrimaryLogPG.cc +++ b/src/osd/PrimaryLogPG.cc @@ -806,7 +806,7 @@ void PrimaryLogPG::recheck_readable() if (mnow < prior_readable_until_ub) { dout(10) << __func__ << " still wait (mnow " << mnow << " < prior_readable_until_ub " << prior_readable_until_ub - << dendl; + << ")" << dendl; } else { dout(10) << __func__ << " no longer wait (mnow " << mnow << " >= prior_readable_until_ub " << prior_readable_until_ub