Merge PR #57454 into main

* refs/pull/57454/head:
	mds/quiesce-db: optimize peer updates
	mds/quiesce-db: track db epoch separately from the membership epoch
	mds/quiesce-db: test that a peer on a newer membership epoch can ack a root

Reviewed-by: Patrick Donnelly <pdonnell@redhat.com>
This commit is contained in:
Patrick Donnelly 2024-05-15 23:01:16 -04:00
commit cbb1da1189
No known key found for this signature in database
GPG Key ID: FA47FD0B0367D313
4 changed files with 187 additions and 77 deletions

View File

@ -54,23 +54,23 @@ operator<<(std::basic_ostream<CharT, Traits>& os, const QuiesceState& qs)
{
switch (qs) {
case QS__INVALID:
return os << "QS__INVALID (" << (int)qs << ")";
return os << "QS__INVALID";
case QS_QUIESCING:
return os << "QS_QUIESCING (" << (int)qs << ")";
return os << "QS_QUIESCING";
case QS_QUIESCED:
return os << "QS_QUIESCED (" << (int)qs << ")";
return os << "QS_QUIESCED";
case QS_RELEASING:
return os << "QS_RELEASING (" << (int)qs << ")";
return os << "QS_RELEASING";
case QS_RELEASED:
return os << "QS_RELEASED (" << (int)qs << ")";
return os << "QS_RELEASED";
case QS_FAILED:
return os << "QS_FAILED (" << (int)qs << ")";
return os << "QS_FAILED";
case QS_CANCELED:
return os << "QS_CANCELED (" << (int)qs << ")";
return os << "QS_CANCELED";
case QS_TIMEDOUT:
return os << "QS_TIMEDOUT (" << (int)qs << ")";
return os << "QS_TIMEDOUT";
case QS_EXPIRED:
return os << "QS_EXPIRED (" << (int)qs << ")";
return os << "QS_EXPIRED";
default:
return os << "!Unknown quiesce state! (" << (int)qs << ")";
}

View File

@ -58,7 +58,7 @@ bool QuiesceDbManager::db_thread_has_work() const
|| pending_acks.size() > 0
|| pending_requests.size() > 0
|| pending_db_updates.size() > 0
|| (agent_callback.has_value() && agent_callback->if_newer < db_version())
|| (agent_callback.has_value() && agent_callback->if_newer < db.version())
|| (cluster_membership.has_value() && cluster_membership->epoch != membership.epoch);
}
@ -105,16 +105,21 @@ void* QuiesceDbManager::quiesce_db_thread_main()
ls.unlock();
if (membership.is_leader()) {
if (leader_bootstrap(std::move(db_updates), next_event_at_age)) {
const QuiesceTimeInterval bootstrap_delay = leader_bootstrap(std::move(db_updates));
if (bootstrap_delay == QuiesceTimeInterval::zero()){
// we're good to process things
next_event_at_age = leader_upkeep(std::move(acks), std::move(requests));
} else {
// not yet there. Put the requests back onto the queue
// not yet there. Put the requests back onto the queue and wait for updates
ls.lock();
while (!requests.empty()) {
pending_requests.emplace_front(std::move(requests.back()));
requests.pop_back();
}
if (pending_db_updates.empty()) {
dout(5) << "bootstrap: waiting for peer updates with timeout " << bootstrap_delay << dendl;
submit_condition.wait_for(ls, bootstrap_delay);
}
continue;
}
} else {
@ -129,16 +134,16 @@ void* QuiesceDbManager::quiesce_db_thread_main()
complete_requests();
// by default, only send ack if the version has changed
bool send_ack = last_acked != db_version();
QuiesceMap quiesce_map(db_version());
bool send_ack = last_acked != db.version();
QuiesceMap quiesce_map(db.version());
{
std::lock_guard lc(agent_mutex);
if (agent_callback) {
if (agent_callback->if_newer < db_version()) {
dout(20) << "notifying agent with db version " << db_version() << dendl;
if (agent_callback->if_newer < db.version()) {
dout(20) << "notifying agent with db version " << db.version() << dendl;
calculate_quiesce_map(quiesce_map);
send_ack = agent_callback->notify(quiesce_map);
agent_callback->if_newer = db_version();
agent_callback->if_newer = db.version();
} else {
send_ack = false;
}
@ -251,6 +256,8 @@ QuiesceDbManager::membership_upkeep()
for (auto peer : cluster_membership->members) {
peers.try_emplace(peer);
}
// update the db epoch
db.epoch = cluster_membership->epoch;
} else {
peers.clear();
// abort awaits with EINPROGRESS
@ -291,14 +298,8 @@ QuiesceTimeInterval QuiesceDbManager::replica_upkeep(decltype(pending_db_updates
QuiesceDbListing &update = db_updates.back().db;
if (update.db_version.epoch != membership.epoch) {
dout(10) << "ignoring db update from another epoch: " << update.db_version << " != " << db_version() << dendl;
return QuiesceTimeInterval::max();
}
if (update.db_version.set_version == 0) {
// this is a call from a leader
// to upload our local db version
// this is a call from the leader to upload our local db version
update.sets = db.sets;
update.db_version.set_version = db.set_version;
update.db_age = db.get_age();
@ -311,6 +312,7 @@ QuiesceTimeInterval QuiesceDbManager::replica_upkeep(decltype(pending_db_updates
dout(10) << "significant db_time_zero change to " << time_zero << " from " << db.time_zero << dendl;
}
db.time_zero = time_zero;
db.epoch = update.db_version.epoch;
if (db.set_version > update.db_version.set_version) {
dout(3) << "got an older version of DB from the leader: " << update.db_version.set_version << " < " << db.set_version << dendl;
@ -327,8 +329,11 @@ QuiesceTimeInterval QuiesceDbManager::replica_upkeep(decltype(pending_db_updates
return QuiesceTimeInterval::max();
}
bool QuiesceDbManager::leader_bootstrap(decltype(pending_db_updates)&& db_updates, QuiesceTimeInterval &next_event_at_age)
QuiesceTimeInterval QuiesceDbManager::leader_bootstrap(decltype(pending_db_updates)&& db_updates)
{
const QuiesceTimeInterval PEER_DISCOVERY_INTERVAL = std::chrono::seconds(1);
QuiesceTimeInterval bootstrap_delay = PEER_DISCOVERY_INTERVAL;
// check that we've heard from all peers in this epoch
std::unordered_set<QuiesceInterface::PeerId> unknown_peers;
for (auto&& [peer, info] : peers) {
@ -348,7 +353,7 @@ bool QuiesceDbManager::leader_bootstrap(decltype(pending_db_updates)&& db_update
if (db.set_version < update.db_version.set_version) {
dout(3) << "preferring version from peer "
<< from << " (" << update.db_version
<< ") over mine (" << db_version() << ")"
<< ") over mine (" << db.version() << ")"
<< " and incrementing it to collect acks" << dendl;
db.time_zero = QuiesceClock::now() - update.db_age;
db.set_version = update.db_version.set_version + 1;
@ -361,24 +366,33 @@ bool QuiesceDbManager::leader_bootstrap(decltype(pending_db_updates)&& db_update
db_updates.pop();
}
QuiesceTimePoint const now = QuiesceClock::now();
for (auto & peer: unknown_peers) {
PeerInfo & info = peers[peer];
QuiesceTimePoint next_discovery = info.last_seen + std::chrono::seconds(1);
if (info.last_seen == QuiesceClock::zero() || next_discovery < QuiesceClock::now()) {
QuiesceTimePoint next_discovery = info.last_activity + PEER_DISCOVERY_INTERVAL;
if (next_discovery < now) {
// send a discovery request to unknown peers
dout(5) << " sending a discovery request to " << peer << dendl;
membership.send_listing_to(peer, QuiesceDbListing(membership.epoch));
info.last_seen = QuiesceClock::now();
next_discovery = info.last_seen + std::chrono::seconds(1);
info.last_activity = now;
next_discovery = info.last_activity + PEER_DISCOVERY_INTERVAL;
}
// next_discovery is >= now
if (bootstrap_delay > next_discovery - now) {
bootstrap_delay = (next_discovery - now);
}
QuiesceTimeInterval next_discovery_at_age = next_discovery - db.time_zero;
next_event_at_age = std::min(next_event_at_age, next_discovery_at_age);
}
// true if all peers are known
return unknown_peers.empty();
bool all_peers_known = unknown_peers.empty();
if (!all_peers_known) {
dout(10) << "unknown peers: " << unknown_peers << dendl;
}
// add some margin to hit the discovery interval for the earliest discovery.
const QuiesceTimeInterval a_little_more = std::chrono::milliseconds(100);
return all_peers_known ? QuiesceTimeInterval::zero() : (bootstrap_delay + a_little_more);
}
QuiesceTimeInterval QuiesceDbManager::leader_upkeep(decltype(pending_acks)&& acks, decltype(pending_requests)&& requests)
@ -412,7 +426,7 @@ void QuiesceDbManager::complete_requests() {
r.clear();
if (membership.leader == membership.me) {
r.db_age = db.get_age();
r.db_version = db_version();
r.db_version = db.version();
if (req->request.set_id) {
Db::Sets::const_iterator it = db.sets.find(*req->request.set_id);
@ -436,19 +450,25 @@ void QuiesceDbManager::leader_record_ack(QuiesceInterface::PeerId from, QuiesceM
auto it = peers.find(from);
if (it == peers.end()) {
dout(5) << "unknown peer " << from << dendl;
// ignore updates from unknown peers
return;
}
auto & info = it->second;
if (diff_map.db_version > db_version()) {
dout(3) << "ignoring unknown version ack by rank " << from << " (" << diff_map.db_version << " > " << db_version() << ")" << dendl;
dout(5) << "will send the peer a full DB" << dendl;
info.diff_map.clear();
if (diff_map.db_version > db.version()) {
dout(15) << "future version ack by peer " << from << " (" << diff_map.db_version << " > " << db.version() << ")" << dendl;
if (diff_map.db_version.epoch > db.version().epoch && diff_map.db_version.set_version <= db.version().set_version) {
dout(15) << "my epoch is behind, ignoring this until my membership is updated" << dendl;
} else {
dout(5) << "will send the peer a full DB" << dendl;
info.clear();
}
} else {
dout(20) << "ack " << diff_map << " from peer " << from << dendl;
info.diff_map = std::move(diff_map);
info.last_seen = QuiesceClock::now();
info.last_activity = QuiesceClock::now();
}
}
@ -808,11 +828,33 @@ int QuiesceDbManager::leader_update_set(Db::Sets::value_type& set_it, const Quie
QuiesceTimeInterval QuiesceDbManager::leader_upkeep_db()
{
std::map<QuiesceInterface::PeerId, std::deque<std::reference_wrapper<Db::Sets::value_type>>> peer_updates;
QuiesceTimeInterval next_event_at_age = QuiesceTimeInterval::max();
QuiesceSetVersion max_set_version = db.set_version;
struct PeerUpdate {
QuiesceInterface::PeerId peer;
PeerInfo& info;
std::deque<std::reference_wrapper<Db::Sets::value_type>> set_refs;
PeerUpdate(QuiesceInterface::PeerId peer, PeerInfo& info)
: peer(peer)
, info(info)
{}
QuiesceSetVersion known_set_version() const
{
return info.diff_map.db_version.set_version;
}
};
// populate peer_updates with peers except me
std::vector<PeerUpdate> peer_updates;
for (auto& [peer, info]: peers) {
// no need to replicate to myself
if (peer != membership.me) {
peer_updates.emplace_back(peer, info);
}
}
for(auto & set_it: db.sets) {
auto & [set_id, set] = set_it;
auto next_set_event_at_age = leader_upkeep_set(set_it);
@ -820,14 +862,10 @@ QuiesceTimeInterval QuiesceDbManager::leader_upkeep_db()
max_set_version = std::max(max_set_version, set.version);
next_event_at_age = std::min(next_event_at_age, next_set_event_at_age);
for(auto const & [peer, info]: peers) {
for(auto & peer_update: peer_updates) {
// update remote peers if their version is lower than this set's
// don't update myself
if (peer == membership.me) {
continue;
}
if (info.diff_map.db_version.set_version < set.version) {
peer_updates[peer].emplace_back(set_it);
if (peer_update.known_set_version() < set.version) {
peer_update.set_refs.emplace_back(set_it);
}
}
}
@ -835,21 +873,34 @@ QuiesceTimeInterval QuiesceDbManager::leader_upkeep_db()
db.set_version = max_set_version;
// update the peers
for (auto &[peer, sets]: peer_updates) {
QuiesceDbListing update;
update.db_age = db.get_age();
update.db_version = db_version();
std::ranges::copy(sets, std::inserter(update.sets, update.sets.end()));
const auto now = QuiesceClock::now();
static const QuiesceTimeInterval PEER_REPEATED_UPDATE_INTERVAL = std::chrono::seconds(1);
for (auto const & peer_update: peer_updates) {
if (peer_update.info.last_sent_version == db.version()) {
if (now < (peer_update.info.last_activity + PEER_REPEATED_UPDATE_INTERVAL)) {
// don't spam the peer with the same version
continue;
}
dout(5) << "repeated update of the peer " << peer_update.peer << " with version " << db.version() << dendl;
}
dout(20) << "updating peer " << peer << " with " << sets.size()
QuiesceDbListing listing;
listing.db_age = db.get_age();
listing.db_version = db.version();
std::ranges::copy(peer_update.set_refs, std::inserter(listing.sets, listing.sets.end()));
dout(20) << "updating peer " << peer_update.peer << " with " << peer_update.set_refs.size()
<< " sets modified in db version range ("
<< peers[peer].diff_map.db_version << ".." << db.set_version << "]" << dendl;
<< peer_update.known_set_version() << ".." << db.set_version << "]" << dendl;
auto rc = membership.send_listing_to(peer, std::move(update));
auto rc = membership.send_listing_to(peer_update.peer, std::move(listing));
if (rc != 0) {
dout(1) << "ERROR (" << rc << ") trying to replicate db version "
<< db.set_version << " with " << sets.size()
<< " sets to the peer " << peer << dendl;
<< db.set_version << " with " << peer_update.set_refs.size()
<< " sets to the peer " << peer_update.peer << dendl;
} else {
peer_update.info.last_activity = now;
peer_update.info.last_sent_version = db.version();
}
}
@ -874,13 +925,10 @@ size_t QuiesceDbManager::check_peer_reports(const QuiesceSetId& set_id, const Qu
max_reported_state = QS__INVALID;
size_t up_to_date_peers = 0;
std::multimap<QuiesceState, std::pair<QuiesceInterface::PeerId, QuiesceDbVersion>> reporting_peers;
for (auto& [peer, info] : peers) {
// we consider the last bit of information we had from a given peer
// however, we want to skip peers which haven't been bootstrapped yet
if (info.diff_map.db_version.set_version == 0) {
continue;
}
// we consider the last bit of information we had from the peer
auto dit = info.diff_map.roots.find(root);
QuiesceState reported_state = set.get_requested_member_state();
@ -892,6 +940,7 @@ size_t QuiesceDbManager::check_peer_reports(const QuiesceSetId& set_id, const Qu
continue;
}
reported_state = pr_state.state;
reporting_peers.insert({pr_state.state, {peer, info.diff_map.db_version}});
}
// but we only consider the peer up to date given the version
@ -904,10 +953,18 @@ size_t QuiesceDbManager::check_peer_reports(const QuiesceSetId& set_id, const Qu
}
if (min_reported_state == QS__MAX) {
// this means that we had 0 eligible peer reports
min_reported_state = set.get_requested_member_state();
max_reported_state = set.get_requested_member_state();
}
dout(20) << dsetroot("")
<< "up_to_date_peers: " << up_to_date_peers
<< " min_reported_state: " << min_reported_state
<< " max_reported_state: " << max_reported_state
<< " peer_acks: " << reporting_peers
<< dendl;
return up_to_date_peers;
}
@ -1119,7 +1176,7 @@ static QuiesceTimeInterval get_root_ttl(const QuiesceSet & set, const QuiesceSet
void QuiesceDbManager::calculate_quiesce_map(QuiesceMap &map)
{
map.roots.clear();
map.db_version = db_version();
map.db_version = db.version();
auto db_age = db.get_age();
for(auto & [set_id, set]: db.sets) {

View File

@ -227,6 +227,7 @@ class QuiesceDbManager {
// the database.
struct Db {
QuiesceTimePoint time_zero;
epoch_t epoch;
QuiesceSetVersion set_version = 0;
using Sets = std::unordered_map<QuiesceSetId, QuiesceSet>;
Sets sets;
@ -235,25 +236,34 @@ class QuiesceDbManager {
return QuiesceClock::now() - time_zero;
}
void clear() {
set_version = 0;
set_version = 0;
epoch = 0;
sets.clear();
time_zero = QuiesceClock::now();
}
} db;
QuiesceDbVersion db_version() const { return {membership.epoch, db.set_version}; }
QuiesceDbVersion version() const { return {epoch, set_version}; }
} db;
QuiesceClusterMembership membership;
struct PeerInfo {
QuiesceMap diff_map;
QuiesceTimePoint last_seen;
PeerInfo(QuiesceMap&& diff_map, QuiesceTimePoint last_seen)
QuiesceTimePoint last_activity;
QuiesceDbVersion last_sent_version;
PeerInfo(QuiesceMap&& diff_map, QuiesceTimePoint last_activity)
: diff_map(diff_map)
, last_seen(last_seen)
, last_activity(last_activity)
{
}
PeerInfo() { }
PeerInfo() {
last_activity = QuiesceTimePoint::min();
}
void clear() {
diff_map.clear();
last_activity = QuiesceTimePoint::min();
last_sent_version = {};
}
};
std::unordered_map<QuiesceInterface::PeerId, PeerInfo> peers;
@ -278,7 +288,8 @@ class QuiesceDbManager {
std::pair<IsMemberBool, ShouldExitBool> membership_upkeep();
QuiesceTimeInterval replica_upkeep(decltype(pending_db_updates)&& db_updates);
bool leader_bootstrap(decltype(pending_db_updates)&& db_updates, QuiesceTimeInterval &next_event_at_age);
// returns zero interval if bootstrapped, otherwise the time to sleep while we wait for peer responses
QuiesceTimeInterval leader_bootstrap(decltype(pending_db_updates)&& db_updates);
QuiesceTimeInterval leader_upkeep(decltype(pending_acks)&& acks, decltype(pending_requests)&& requests);

View File

@ -77,9 +77,6 @@ class QuiesceDbTest: public testing::Test {
Db& internal_db() {
return db;
}
QuiesceClusterMembership& internal_membership() {
return membership;
}
decltype(pending_requests)& internal_pending_requests() {
return pending_requests;
}
@ -89,6 +86,11 @@ class QuiesceDbTest: public testing::Test {
decltype(peers)& internal_peers() {
return peers;
}
epoch_t bump_epoch() {
std::lock_guard l(submit_mutex);
submit_condition.notify_all();
return ++cluster_membership->epoch;
}
};
epoch_t epoch = 0;
@ -1617,4 +1619,44 @@ TEST_F(QuiesceDbTest, MultiRankRecovery)
ASSERT_EQ(2, last_request->response.sets.size());
EXPECT_EQ(QS_QUIESCING, last_request->response.sets.at("set1").rstate.state);
EXPECT_EQ(QS_QUIESCING, last_request->response.sets.at("set2").rstate.state);
}
/* ========================================= */
TEST_F(QuiesceDbTest, AckDuringEpochMismatch)
{
ASSERT_NO_FATAL_FAILURE(configure_cluster({ mds_gid_t(1), mds_gid_t(2) }));
managers.at(mds_gid_t(1))->reset_agent_callback(QUIESCING_AGENT_CB);
ASSERT_EQ(OK(), run_request([](auto& r) {
r.set_id = "set1";
r.timeout = sec(60);
r.expiration = sec(60);
r.include_roots({ "root1" });
}));
// we are quiescing because manager 2 hasn't yet acknowledged the new state
EXPECT_EQ(QS_QUIESCING, last_request->response.sets.at("set1").rstate.state);
// imagine that a new epoch has started on the peer before it did for the leader
managers.at(mds_gid_t(2))->bump_epoch();
// do the acking while our epoch is higher
{
// wait for the agent to ack root1 as failed
auto did_ack = add_ack_hook([](auto gid, auto const& ack) {
return gid == mds_gid_t(2) && ack.roots.contains("file:/root1") && ack.roots.at("file:/root1").state == QS_QUIESCED;
});
// allow acks
managers.at(mds_gid_t(2))->reset_agent_callback(QUIESCING_AGENT_CB);
EXPECT_EQ(std::future_status::ready, did_ack.wait_for(std::chrono::milliseconds(100)));
}
// now, bump the epoch on the leader and make sure it quiesces the set
managers.at(mds_gid_t(1))->bump_epoch();
EXPECT_EQ(OK(), run_request([](auto& r) {
r.set_id = "set1";
r.await = sec(10);
}));
}