From a3052969bfcfd9329b2baf7883765642d09ff038 Mon Sep 17 00:00:00 2001
From: chunmei
Date: Fri, 31 Mar 2023 02:49:33 +0000
Subject: [PATCH] crimson/osd: enable PGShardMapping access from all cores

Previously, all accesses (including lookups) had to occur on core 0.
Now that we want to be able to dispatch from all cores, we need
PGShardManager to be accessible from all cores.

To that end, we now proxy updates to core 0, but maintain local copies
of the map so that cores can perform local lookups.

Signed-off-by: chunmei
---
 src/crimson/osd/osd.cc              | 14 +++--
 src/crimson/osd/osd.h               |  7 ++-
 src/crimson/osd/pg_map.h            | 91 +++++++++++++++++++----------
 src/crimson/osd/pg_shard_manager.cc | 13 +++--
 src/crimson/osd/pg_shard_manager.h  | 48 ++++++++-------
 src/crimson/osd/shard_services.h    | 13 ++---
 6 files changed, 112 insertions(+), 74 deletions(-)

diff --git a/src/crimson/osd/osd.cc b/src/crimson/osd/osd.cc
index bf3a9e2d7c4..c7bae8b511b 100644
--- a/src/crimson/osd/osd.cc
+++ b/src/crimson/osd/osd.cc
@@ -357,13 +357,16 @@ seastar::future<> OSD::start()
   startup_time = ceph::mono_clock::now();
   ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
   return store.start().then([this] {
-    return osd_singleton_state.start_single(
-      whoami, std::ref(*cluster_msgr), std::ref(*public_msgr),
-      std::ref(*monc), std::ref(*mgrc)
+    return pg_to_shard_mappings.start(0, seastar::smp::count
     ).then([this] {
+      return osd_singleton_state.start_single(
+        whoami, std::ref(*cluster_msgr), std::ref(*public_msgr),
+        std::ref(*monc), std::ref(*mgrc));
+    }).then([this] {
       ceph::mono_time startup_time = ceph::mono_clock::now();
       return shard_services.start(
         std::ref(osd_singleton_state),
+        std::ref(pg_to_shard_mappings),
         whoami,
         startup_time,
         osd_singleton_state.local().perf,
@@ -373,7 +376,8 @@ seastar::future<> OSD::start()
       return shard_dispatchers.start(
         std::ref(*this),
         whoami,
-        std::ref(store));
+        std::ref(store),
+        std::ref(pg_to_shard_mappings));
     });
   }).then([this] {
     heartbeat.reset(new Heartbeat{
@@ -681,6 +685,8 @@ seastar::future<> OSD::stop()
     return shard_services.stop();
   }).then([this] {
     return osd_singleton_state.stop();
+  }).then([this] {
+    return pg_to_shard_mappings.stop();
   }).then([fut=std::move(gate_close_fut)]() mutable {
     return std::move(fut);
   }).then([this] {
diff --git a/src/crimson/osd/osd.h b/src/crimson/osd/osd.h
index 1093cab3410..4f811c059ad 100644
--- a/src/crimson/osd/osd.h
+++ b/src/crimson/osd/osd.h
@@ -69,8 +69,10 @@ public:
   ShardDispatcher(
     OSD& osd,
     int whoami,
-    crimson::os::FuturizedStore& store)
-    : pg_shard_manager(osd.osd_singleton_state, osd.shard_services),
+    crimson::os::FuturizedStore& store,
+    PGShardMapping& pg_to_shard_mapping)
+    : pg_shard_manager(osd.osd_singleton_state,
+                       osd.shard_services, pg_to_shard_mapping),
       osd(osd),
       whoami(whoami),
       store(store) {}
@@ -185,6 +187,7 @@ public:
   void handle_authentication(const EntityName& name,
                              const AuthCapsInfo& caps) final;

+  seastar::sharded<PGShardMapping> pg_to_shard_mappings;
   seastar::sharded<OSDSingletonState> osd_singleton_state;
   seastar::sharded<ShardServices> shard_services;
   seastar::sharded<ShardDispatcher> shard_dispatchers;
diff --git a/src/crimson/osd/pg_map.h b/src/crimson/osd/pg_map.h
index f4b38ae45f6..3269de43497 100644
--- a/src/crimson/osd/pg_map.h
+++ b/src/crimson/osd/pg_map.h
@@ -21,9 +21,11 @@ class PG;
 /**
  * PGShardMapping
  *
- * Maps pgs to shards.
+ * Maintains a mapping from spg_t to the core containing that PG. Internally, each
+ * core has a local copy of the mapping to enable core-local lookups. Updates
+ * are proxied to core 0, and then back out to all other cores -- see maybe_create_pg.
  */
-class PGShardMapping {
+class PGShardMapping : public seastar::peering_sharded_service<PGShardMapping> {
 public:
   /// Returns mapping if present, NULL_CORE otherwise
   core_id_t get_pg_mapping(spg_t pgid) {
@@ -33,44 +35,69 @@ public:
   }

   /// Returns mapping for pgid, creates new one if it doesn't already exist
-  core_id_t maybe_create_pg(spg_t pgid, core_id_t core = NULL_CORE) {
-    auto [insert_iter, inserted] = pg_to_core.emplace(pgid, core);
-    if (!inserted) {
-      ceph_assert_always(insert_iter->second != NULL_CORE);
+  seastar::future<core_id_t> maybe_create_pg(
+    spg_t pgid,
+    core_id_t core = NULL_CORE) {
+    auto find_iter = pg_to_core.find(pgid);
+    if (find_iter != pg_to_core.end()) {
+      ceph_assert_always(find_iter->second != NULL_CORE);
       if (core != NULL_CORE) {
-        ceph_assert_always(insert_iter->second == core);
+        ceph_assert_always(find_iter->second == core);
       }
-      return insert_iter->second;
+      return seastar::make_ready_future<core_id_t>(find_iter->second);
     } else {
-      ceph_assert_always(core_to_num_pgs.size() > 0);
-      std::map<core_id_t, unsigned>::iterator core_iter;
-      if (core == NULL_CORE) {
-        core_iter = std::min_element(
-          core_to_num_pgs.begin(),
-          core_to_num_pgs.end(),
-          [](const auto &left, const auto &right) {
-            return left.second < right.second;
+      return container().invoke_on(0,[pgid, core]
+        (auto &primary_mapping) {
+        auto [insert_iter, inserted] = primary_mapping.pg_to_core.emplace(pgid, core);
+        ceph_assert_always(inserted);
+        ceph_assert_always(primary_mapping.core_to_num_pgs.size() > 0);
+        std::map<core_id_t, unsigned>::iterator core_iter;
+        if (core == NULL_CORE) {
+          core_iter = std::min_element(
+            primary_mapping.core_to_num_pgs.begin(),
+            primary_mapping.core_to_num_pgs.end(),
+            [](const auto &left, const auto &right) {
+              return left.second < right.second;
+            });
+        } else {
+          core_iter = primary_mapping.core_to_num_pgs.find(core);
+        }
+        ceph_assert_always(primary_mapping.core_to_num_pgs.end() != core_iter);
+        insert_iter->second = core_iter->first;
+        core_iter->second++;
+        return primary_mapping.container().invoke_on_others(
+          [pgid = insert_iter->first, core = insert_iter->second]
+          (auto &other_mapping) {
+            ceph_assert_always(core != NULL_CORE);
+            auto [insert_iter, inserted] = other_mapping.pg_to_core.emplace(pgid, core);
+            ceph_assert_always(inserted);
           });
-      } else {
-        core_iter = core_to_num_pgs.find(core);
-      }
-      ceph_assert_always(core_to_num_pgs.end() != core_iter);
-      insert_iter->second = core_iter->first;
-      core_iter->second++;
-      return insert_iter->second;
+      }).then([this, pgid] {
+        auto find_iter = pg_to_core.find(pgid);
+        return seastar::make_ready_future<core_id_t>(find_iter->second);
+      });
     }
   }

   /// Remove pgid
-  void remove_pg(spg_t pgid) {
-    auto iter = pg_to_core.find(pgid);
-    ceph_assert_always(iter != pg_to_core.end());
-    ceph_assert_always(iter->second != NULL_CORE);
-    auto count_iter = core_to_num_pgs.find(iter->second);
-    ceph_assert_always(count_iter != core_to_num_pgs.end());
-    ceph_assert_always(count_iter->second > 0);
-    --(count_iter->second);
-    pg_to_core.erase(iter);
+  seastar::future<> remove_pg(spg_t pgid) {
+    return container().invoke_on(0, [pgid](auto &primary_mapping) {
+      auto iter = primary_mapping.pg_to_core.find(pgid);
+      ceph_assert_always(iter != primary_mapping.pg_to_core.end());
+      ceph_assert_always(iter->second != NULL_CORE);
+      auto count_iter = primary_mapping.core_to_num_pgs.find(iter->second);
+      ceph_assert_always(count_iter != primary_mapping.core_to_num_pgs.end());
+      ceph_assert_always(count_iter->second > 0);
+      --(count_iter->second);
+      primary_mapping.pg_to_core.erase(iter);
+      return primary_mapping.container().invoke_on_others(
+        [pgid](auto &other_mapping) {
+          auto iter = other_mapping.pg_to_core.find(pgid);
+          ceph_assert_always(iter != other_mapping.pg_to_core.end());
+          ceph_assert_always(iter->second != NULL_CORE);
+          other_mapping.pg_to_core.erase(iter);
+        });
+    });
   }

   size_t get_num_pgs() const { return pg_to_core.size(); }
diff --git a/src/crimson/osd/pg_shard_manager.cc b/src/crimson/osd/pg_shard_manager.cc
index e0ef2374891..e586a40890a 100644
--- a/src/crimson/osd/pg_shard_manager.cc
+++ b/src/crimson/osd/pg_shard_manager.cc
@@ -23,12 +23,12 @@ seastar::future<> PGShardManager::load_pgs(crimson::os::FuturizedStore& store)
       auto [coll, shard_core] = coll_core;
       spg_t pgid;
       if (coll.is_pg(&pgid)) {
-        auto core = get_osd_singleton_state(
-        ).pg_to_shard_mapping.maybe_create_pg(
-          pgid, shard_core);
-        return with_remote_shard_state(
-          core,
-          [pgid](
+        return pg_to_shard_mapping.maybe_create_pg(
+          pgid, shard_core
+        ).then([this, pgid] (auto core) {
+          return this->template with_remote_shard_state(
+            core,
+            [pgid](
             PerShardState &per_shard_state,
             ShardServices &shard_services) {
             return shard_services.load_pg(
@@ -39,6 +39,7 @@ seastar::future<> PGShardManager::load_pgs(crimson::os::FuturizedStore& store)
               return seastar::now();
             });
           });
+        });
       } else if (coll.is_temp(&pgid)) {
         logger().warn(
           "found temp collection on crimson osd, should be impossible: {}",
diff --git a/src/crimson/osd/pg_shard_manager.h b/src/crimson/osd/pg_shard_manager.h
index 9526ddcd05a..8333b1f483b 100644
--- a/src/crimson/osd/pg_shard_manager.h
+++ b/src/crimson/osd/pg_shard_manager.h
@@ -26,6 +26,7 @@ namespace crimson::osd {
 class PGShardManager {
   seastar::sharded<OSDSingletonState> &osd_singleton_state;
   seastar::sharded<ShardServices> &shard_services;
+  PGShardMapping &pg_to_shard_mapping;

 #define FORWARD_CONST(FROM_METHOD, TO_METHOD, TARGET) \
   template <typename... Args> \
@@ -48,9 +49,11 @@ public:
   PGShardManager(
     seastar::sharded<OSDSingletonState> &osd_singleton_state,
-    seastar::sharded<ShardServices> &shard_services)
+    seastar::sharded<ShardServices> &shard_services,
+    PGShardMapping &pg_to_shard_mapping)
     : osd_singleton_state(osd_singleton_state),
-      shard_services(shard_services) {}
+      shard_services(shard_services),
+      pg_to_shard_mapping(pg_to_shard_mapping) {}

   auto &get_osd_singleton_state() {
     ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
@@ -188,15 +191,15 @@ public:
     static_assert(T::can_create());
     logger.debug("{}: can_create", *op);

-    auto core = get_osd_singleton_state().pg_to_shard_mapping.maybe_create_pg(
-      op->get_pgid());
-
     get_local_state().registry.remove_from_registry(*op);
-    return with_remote_shard_state_and_op<T>(
-      core, std::move(op),
-      [](PerShardState &per_shard_state,
-        ShardServices &shard_services,
-        typename T::IRef op) {
+    return pg_to_shard_mapping.maybe_create_pg(
+      op->get_pgid()
+    ).then([this, op = std::move(op)](auto core) mutable {
+      return this->template with_remote_shard_state_and_op<T>(
+        core, std::move(op),
+        [](PerShardState &per_shard_state,
+          ShardServices &shard_services,
+          typename T::IRef op) {
       per_shard_state.registry.add_to_registry(*op);
       auto &logger = crimson::get_logger(ceph_subsys_osd);
       auto &opref = *op;
@@ -219,6 +222,7 @@ public:
           })
         ).then([op=std::move(op)] {});
       });
+      });
     }

   /// Runs opref on the appropriate core, waiting for pg as necessary
@@ -232,15 +236,15 @@ public:
     static_assert(!T::can_create());
     logger.debug("{}: !can_create", *op);

-    auto core = get_osd_singleton_state().pg_to_shard_mapping.maybe_create_pg(
-      op->get_pgid());
-
     get_local_state().registry.remove_from_registry(*op);
-    return with_remote_shard_state_and_op<T>(
-      core, std::move(op),
-      [](PerShardState &per_shard_state,
-        ShardServices &shard_services,
-        typename T::IRef op) {
+    return pg_to_shard_mapping.maybe_create_pg(
+      op->get_pgid()
+    ).then([this, op = std::move(op)](auto core) mutable {
+      return this->template with_remote_shard_state_and_op<T>(
+        core, std::move(op),
+        [](PerShardState &per_shard_state,
+          ShardServices &shard_services,
+          typename T::IRef op) {
       per_shard_state.registry.add_to_registry(*op);
       auto &logger = crimson::get_logger(ceph_subsys_osd);
       auto &opref = *op;
@@ -260,6 +264,7 @@ public:
           })
         ).then([op=std::move(op)] {});
       });
+      });
     }

   seastar::future<> load_pgs(crimson::os::FuturizedStore& store);
@@ -308,20 +313,19 @@ public:
    */
   template <typename F>
   void for_each_pgid(F &&f) const {
-    return get_osd_singleton_state().pg_to_shard_mapping.for_each_pgid(
+    return pg_to_shard_mapping.for_each_pgid(
       std::forward<F>(f));
   }

   auto get_num_pgs() const {
-    return get_osd_singleton_state().pg_to_shard_mapping.get_num_pgs();
+    return pg_to_shard_mapping.get_num_pgs();
   }

   seastar::future<> broadcast_map_to_pgs(epoch_t epoch);

   template <typename F>
   auto with_pg(spg_t pgid, F &&f) {
-    core_id_t core = get_osd_singleton_state(
-    ).pg_to_shard_mapping.get_pg_mapping(pgid);
+    core_id_t core = pg_to_shard_mapping.get_pg_mapping(pgid);
     return with_remote_shard_state(
       core,
       [pgid, f=std::move(f)](auto &local_state, auto &local_service) mutable {
diff --git a/src/crimson/osd/shard_services.h b/src/crimson/osd/shard_services.h
index 34b3a32e34b..bf1b29ee4e0 100644
--- a/src/crimson/osd/shard_services.h
+++ b/src/crimson/osd/shard_services.h
@@ -282,9 +282,6 @@ private:
   void requeue_pg_temp();
   seastar::future<> send_pg_temp();

-  // TODO: add config to control mapping
-  PGShardMapping pg_to_shard_mapping{0, seastar::smp::count};
-
   std::set<pg_t> pg_created;
   seastar::future<> send_pg_created(pg_t pgid);
   seastar::future<> send_pg_created();
@@ -328,6 +325,7 @@ class ShardServices : public OSDMapService {

   PerShardState local_state;
   seastar::sharded<OSDSingletonState> &osd_singleton_state;
+  PGShardMapping& pg_to_shard_mapping;

   template <typename F, typename... Args>
   auto with_singleton(F &&f, Args&&... args) {
@@ -370,9 +368,11 @@ public:
   template <typename... PSSArgs>
   ShardServices(
     seastar::sharded<OSDSingletonState> &osd_singleton_state,
+    PGShardMapping& pg_to_shard_mapping,
     PSSArgs&&... args)
     : local_state(std::forward<PSSArgs>(args)...),
-      osd_singleton_state(osd_singleton_state) {}
+      osd_singleton_state(osd_singleton_state),
+      pg_to_shard_mapping(pg_to_shard_mapping) {}

   FORWARD_TO_OSD_SINGLETON(send_to_osd)

@@ -382,10 +382,7 @@ public:

   auto remove_pg(spg_t pgid) {
     local_state.pg_map.remove_pg(pgid);
-    return with_singleton(
-      [pgid](auto &osstate) {
-      osstate.pg_to_shard_mapping.remove_pg(pgid);
-    });
+    return pg_to_shard_mapping.remove_pg(pgid);
   }

   crimson::common::CephContext *get_cct() {
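
Note (not part of the patch): the change relies on Seastar's peering_sharded_service pattern -- every core holds its own replica of the map, reads use the core-local replica, and writes hop to core 0 via container().invoke_on(0, ...) and are then fanned out with invoke_on_others(). Below is a minimal standalone sketch of that pattern, assuming only the stock Seastar sharded<>/invoke_on/invoke_on_others API; the names Mapping, assign, and lookup are hypothetical and are not the Ceph types used above.

// sketch.cc -- illustrative only, not Ceph code.
#include <seastar/core/app-template.hh>
#include <seastar/core/future.hh>
#include <seastar/core/sharded.hh>
#include <iostream>
#include <memory>
#include <unordered_map>

class Mapping : public seastar::peering_sharded_service<Mapping> {
  std::unordered_map<int, int> kv;  // this core's replica of the map
public:
  // Read path: served entirely from the core-local replica, no cross-core hop.
  int lookup(int key) const {
    auto it = kv.find(key);
    return it == kv.end() ? -1 : it->second;
  }
  // Write path: proxy to shard 0, update its replica, then fan the update
  // out to every other shard's replica before the returned future resolves.
  seastar::future<> assign(int key, int value) {
    return container().invoke_on(0, [key, value](Mapping &primary) {
      primary.kv[key] = value;
      return primary.container().invoke_on_others(
        [key, value](Mapping &replica) {
          replica.kv[key] = value;
        });
    });
  }
};

int main(int argc, char **argv) {
  seastar::app_template app;
  return app.run(argc, argv, [] {
    auto mapping = std::make_unique<seastar::sharded<Mapping>>();
    auto &m = *mapping;
    return m.start().then([&m] {
      return m.local().assign(1, 42);   // hops to shard 0, then broadcasts
    }).then([&m] {
      std::cout << "local lookup: " << m.local().lookup(1) << "\n";  // no hop
      return m.stop();
    }).finally([mapping = std::move(mapping)] {});
  });
}

As in PGShardMapping::maybe_create_pg and remove_pg above, the trade-off is that lookups never leave the calling core, while each insertion or removal pays one hop to core 0 plus a broadcast to the other cores.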