mirror of https://github.com/ceph/ceph (synced 2025-02-23 19:17:37 +00:00)
crimson/osd: enable PGShardMapping access from all cores
Previously, all accesses (including lookups) had to occur on core 0. Now that we want to be able to dispatch from all cores, we need PGShardManager to be accessible from all cores. To that end, we now proxy updates to core 0 but maintain local copies of the map so that each core can perform lookups locally.

Signed-off-by: chunmei <chunmei.liu@intel.com>
This commit is contained in:
parent 5f82fbbc72
commit a3052969bf
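For readers skimming the diff, the pattern this commit adopts (per-core replicas that serve lookups locally, with every mutation funneled through core 0 and then fanned out to the other replicas) can be illustrated with a minimal, standalone sketch. This is plain single-threaded C++ rather than the Seastar-based Crimson code below; the type and function names are illustrative only and are not part of the Ceph source.

// Hypothetical, simplified model of the pattern used by this commit.
// Every "core" keeps its own copy of the pg -> core map; lookups read
// only the caller's copy, and all mutations go through copy 0 (the
// "primary"), which then replicates the change to the other copies.
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <map>
#include <vector>

using core_id_t = unsigned;
using pg_id_t   = std::uint64_t;

struct PgToCoreModel {
  // One map per core; index 0 plays the role of the primary copy.
  std::vector<std::map<pg_id_t, core_id_t>> per_core_copies;
  std::map<core_id_t, unsigned> pgs_per_core;  // load counter, primary only

  explicit PgToCoreModel(unsigned cores) : per_core_copies(cores) {
    for (core_id_t c = 0; c < cores; ++c)
      pgs_per_core[c] = 0;
  }

  // Lookup never leaves the calling core: it only reads that core's copy.
  core_id_t lookup(core_id_t calling_core, pg_id_t pgid) const {
    auto &local = per_core_copies[calling_core];
    auto it = local.find(pgid);
    return it == local.end() ? ~0u : it->second;
  }

  // Update is "proxied to core 0": pick the least-loaded core there,
  // record the mapping, then replicate it into every other core's copy.
  core_id_t map_pg(pg_id_t pgid) {
    auto target = std::min_element(
      pgs_per_core.begin(), pgs_per_core.end(),
      [](auto &l, auto &r) { return l.second < r.second; })->first;
    ++pgs_per_core[target];
    for (auto &copy : per_core_copies)   // primary plus the "other" copies
      copy.emplace(pgid, target);
    return target;
  }
};

int main() {
  PgToCoreModel model(4);
  auto core = model.map_pg(42);
  // After the fan-out, any core can resolve the pg with a local lookup.
  for (core_id_t c = 0; c < 4; ++c)
    assert(model.lookup(c, 42) == core);
  return 0;
}

The actual PGShardMapping below achieves the same effect with Seastar primitives: mutations run on shard 0 via container().invoke_on(0, ...) and are propagated with invoke_on_others(...), while get_pg_mapping stays a plain, core-local read.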
@@ -357,13 +357,16 @@ seastar::future<> OSD::start()
   startup_time = ceph::mono_clock::now();
   ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
   return store.start().then([this] {
-    return osd_singleton_state.start_single(
-      whoami, std::ref(*cluster_msgr), std::ref(*public_msgr),
-      std::ref(*monc), std::ref(*mgrc)
+    return pg_to_shard_mappings.start(0, seastar::smp::count
     ).then([this] {
+      return osd_singleton_state.start_single(
+        whoami, std::ref(*cluster_msgr), std::ref(*public_msgr),
+        std::ref(*monc), std::ref(*mgrc));
+    }).then([this] {
       ceph::mono_time startup_time = ceph::mono_clock::now();
       return shard_services.start(
         std::ref(osd_singleton_state),
+        std::ref(pg_to_shard_mappings),
         whoami,
         startup_time,
         osd_singleton_state.local().perf,
@@ -373,7 +376,8 @@ seastar::future<> OSD::start()
       return shard_dispatchers.start(
         std::ref(*this),
         whoami,
-        std::ref(store));
+        std::ref(store),
+        std::ref(pg_to_shard_mappings));
     });
   }).then([this] {
     heartbeat.reset(new Heartbeat{
@@ -681,6 +685,8 @@ seastar::future<> OSD::stop()
     return shard_services.stop();
   }).then([this] {
     return osd_singleton_state.stop();
+  }).then([this] {
+    return pg_to_shard_mappings.stop();
   }).then([fut=std::move(gate_close_fut)]() mutable {
     return std::move(fut);
   }).then([this] {
@@ -69,8 +69,10 @@ public:
   ShardDispatcher(
     OSD& osd,
     int whoami,
-    crimson::os::FuturizedStore& store)
-    : pg_shard_manager(osd.osd_singleton_state, osd.shard_services),
+    crimson::os::FuturizedStore& store,
+    PGShardMapping& pg_to_shard_mapping)
+    : pg_shard_manager(osd.osd_singleton_state,
+                       osd.shard_services, pg_to_shard_mapping),
       osd(osd),
       whoami(whoami),
       store(store) {}
@@ -185,6 +187,7 @@ public:
   void handle_authentication(const EntityName& name,
                              const AuthCapsInfo& caps) final;
 
+  seastar::sharded<PGShardMapping> pg_to_shard_mappings;
   seastar::sharded<OSDSingletonState> osd_singleton_state;
   seastar::sharded<ShardServices> shard_services;
   seastar::sharded<ShardDispatcher> shard_dispatchers;
@@ -21,9 +21,11 @@ class PG;
 /**
  * PGShardMapping
  *
- * Maps pgs to shards.
+ * Maintains a mapping from spg_t to the core containing that PG. Internally, each
+ * core has a local copy of the mapping to enable core-local lookups. Updates
+ * are proxied to core 0, and then back out to all other cores -- see maybe_create_pg.
  */
-class PGShardMapping {
+class PGShardMapping : public seastar::peering_sharded_service<PGShardMapping> {
 public:
   /// Returns mapping if present, NULL_CORE otherwise
   core_id_t get_pg_mapping(spg_t pgid) {
@@ -33,44 +35,69 @@ public:
   }
 
   /// Returns mapping for pgid, creates new one if it doesn't already exist
-  core_id_t maybe_create_pg(spg_t pgid, core_id_t core = NULL_CORE) {
-    auto [insert_iter, inserted] = pg_to_core.emplace(pgid, core);
-    if (!inserted) {
-      ceph_assert_always(insert_iter->second != NULL_CORE);
+  seastar::future<core_id_t> maybe_create_pg(
+    spg_t pgid,
+    core_id_t core = NULL_CORE) {
+    auto find_iter = pg_to_core.find(pgid);
+    if (find_iter != pg_to_core.end()) {
+      ceph_assert_always(find_iter->second != NULL_CORE);
       if (core != NULL_CORE) {
-        ceph_assert_always(insert_iter->second == core);
+        ceph_assert_always(find_iter->second == core);
       }
-      return insert_iter->second;
+      return seastar::make_ready_future<core_id_t>(find_iter->second);
     } else {
-      ceph_assert_always(core_to_num_pgs.size() > 0);
-      std::map<core_id_t, unsigned>::iterator core_iter;
-      if (core == NULL_CORE) {
-        core_iter = std::min_element(
-          core_to_num_pgs.begin(),
-          core_to_num_pgs.end(),
-          [](const auto &left, const auto &right) {
-            return left.second < right.second;
-          });
-      } else {
-        core_iter = core_to_num_pgs.find(core);
-      }
-      ceph_assert_always(core_to_num_pgs.end() != core_iter);
-      insert_iter->second = core_iter->first;
-      core_iter->second++;
-      return insert_iter->second;
+      return container().invoke_on(0, [pgid, core]
+        (auto &primary_mapping) {
+        auto [insert_iter, inserted] = primary_mapping.pg_to_core.emplace(pgid, core);
+        ceph_assert_always(inserted);
+        ceph_assert_always(primary_mapping.core_to_num_pgs.size() > 0);
+        std::map<core_id_t, unsigned>::iterator core_iter;
+        if (core == NULL_CORE) {
+          core_iter = std::min_element(
+            primary_mapping.core_to_num_pgs.begin(),
+            primary_mapping.core_to_num_pgs.end(),
+            [](const auto &left, const auto &right) {
+              return left.second < right.second;
+            });
+        } else {
+          core_iter = primary_mapping.core_to_num_pgs.find(core);
+        }
+        ceph_assert_always(primary_mapping.core_to_num_pgs.end() != core_iter);
+        insert_iter->second = core_iter->first;
+        core_iter->second++;
+        return primary_mapping.container().invoke_on_others(
+          [pgid = insert_iter->first, core = insert_iter->second]
+          (auto &other_mapping) {
+            ceph_assert_always(core != NULL_CORE);
+            auto [insert_iter, inserted] = other_mapping.pg_to_core.emplace(pgid, core);
+            ceph_assert_always(inserted);
+          });
+      }).then([this, pgid] {
+        auto find_iter = pg_to_core.find(pgid);
+        return seastar::make_ready_future<core_id_t>(find_iter->second);
+      });
     }
   }
 
   /// Remove pgid
-  void remove_pg(spg_t pgid) {
-    auto iter = pg_to_core.find(pgid);
-    ceph_assert_always(iter != pg_to_core.end());
-    ceph_assert_always(iter->second != NULL_CORE);
-    auto count_iter = core_to_num_pgs.find(iter->second);
-    ceph_assert_always(count_iter != core_to_num_pgs.end());
-    ceph_assert_always(count_iter->second > 0);
-    --(count_iter->second);
-    pg_to_core.erase(iter);
+  seastar::future<> remove_pg(spg_t pgid) {
+    return container().invoke_on(0, [pgid](auto &primary_mapping) {
+      auto iter = primary_mapping.pg_to_core.find(pgid);
+      ceph_assert_always(iter != primary_mapping.pg_to_core.end());
+      ceph_assert_always(iter->second != NULL_CORE);
+      auto count_iter = primary_mapping.core_to_num_pgs.find(iter->second);
+      ceph_assert_always(count_iter != primary_mapping.core_to_num_pgs.end());
+      ceph_assert_always(count_iter->second > 0);
+      --(count_iter->second);
+      primary_mapping.pg_to_core.erase(iter);
+      return primary_mapping.container().invoke_on_others(
+        [pgid](auto &other_mapping) {
+          auto iter = other_mapping.pg_to_core.find(pgid);
+          ceph_assert_always(iter != other_mapping.pg_to_core.end());
+          ceph_assert_always(iter->second != NULL_CORE);
+          other_mapping.pg_to_core.erase(iter);
+        });
+    });
   }
 
   size_t get_num_pgs() const { return pg_to_core.size(); }
@@ -23,12 +23,12 @@ seastar::future<> PGShardManager::load_pgs(crimson::os::FuturizedStore& store)
       auto [coll, shard_core] = coll_core;
       spg_t pgid;
       if (coll.is_pg(&pgid)) {
-        auto core = get_osd_singleton_state(
-        ).pg_to_shard_mapping.maybe_create_pg(
-          pgid, shard_core);
-        return with_remote_shard_state(
-          core,
-          [pgid](
+        return pg_to_shard_mapping.maybe_create_pg(
+          pgid, shard_core
+        ).then([this, pgid] (auto core) {
+          return this->template with_remote_shard_state(
+            core,
+            [pgid](
             PerShardState &per_shard_state,
             ShardServices &shard_services) {
             return shard_services.load_pg(
@@ -39,6 +39,7 @@ seastar::future<> PGShardManager::load_pgs(crimson::os::FuturizedStore& store)
           return seastar::now();
         });
+        });
       });
     } else if (coll.is_temp(&pgid)) {
       logger().warn(
         "found temp collection on crimson osd, should be impossible: {}",
@@ -26,6 +26,7 @@ namespace crimson::osd {
 class PGShardManager {
   seastar::sharded<OSDSingletonState> &osd_singleton_state;
   seastar::sharded<ShardServices> &shard_services;
+  PGShardMapping &pg_to_shard_mapping;
 
 #define FORWARD_CONST(FROM_METHOD, TO_METHOD, TARGET) \
   template <typename... Args> \
@@ -48,9 +49,11 @@ public:
 
   PGShardManager(
     seastar::sharded<OSDSingletonState> &osd_singleton_state,
-    seastar::sharded<ShardServices> &shard_services)
+    seastar::sharded<ShardServices> &shard_services,
+    PGShardMapping &pg_to_shard_mapping)
     : osd_singleton_state(osd_singleton_state),
-      shard_services(shard_services) {}
+      shard_services(shard_services),
+      pg_to_shard_mapping(pg_to_shard_mapping) {}
 
   auto &get_osd_singleton_state() {
     ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
@@ -188,15 +191,15 @@ public:
     static_assert(T::can_create());
     logger.debug("{}: can_create", *op);
 
-    auto core = get_osd_singleton_state().pg_to_shard_mapping.maybe_create_pg(
-      op->get_pgid());
-
     get_local_state().registry.remove_from_registry(*op);
-    return with_remote_shard_state_and_op<T>(
-      core, std::move(op),
-      [](PerShardState &per_shard_state,
-         ShardServices &shard_services,
-         typename T::IRef op) {
+    return pg_to_shard_mapping.maybe_create_pg(
+      op->get_pgid()
+    ).then([this, op = std::move(op)](auto core) mutable {
+      return this->template with_remote_shard_state_and_op<T>(
+        core, std::move(op),
+        [](PerShardState &per_shard_state,
+           ShardServices &shard_services,
+           typename T::IRef op) {
         per_shard_state.registry.add_to_registry(*op);
         auto &logger = crimson::get_logger(ceph_subsys_osd);
         auto &opref = *op;
@@ -219,6 +222,7 @@ public:
         })
       ).then([op=std::move(op)] {});
+      });
     });
   }
 
   /// Runs opref on the appropriate core, waiting for pg as necessary
@@ -232,15 +236,15 @@ public:
     static_assert(!T::can_create());
     logger.debug("{}: !can_create", *op);
 
-    auto core = get_osd_singleton_state().pg_to_shard_mapping.maybe_create_pg(
-      op->get_pgid());
-
    get_local_state().registry.remove_from_registry(*op);
-    return with_remote_shard_state_and_op<T>(
-      core, std::move(op),
-      [](PerShardState &per_shard_state,
-         ShardServices &shard_services,
-         typename T::IRef op) {
+    return pg_to_shard_mapping.maybe_create_pg(
+      op->get_pgid()
+    ).then([this, op = std::move(op)](auto core) mutable {
+      return this->template with_remote_shard_state_and_op<T>(
+        core, std::move(op),
+        [](PerShardState &per_shard_state,
+           ShardServices &shard_services,
+           typename T::IRef op) {
         per_shard_state.registry.add_to_registry(*op);
         auto &logger = crimson::get_logger(ceph_subsys_osd);
         auto &opref = *op;
@@ -260,6 +264,7 @@ public:
         })
       ).then([op=std::move(op)] {});
+      });
     });
   }
 
   seastar::future<> load_pgs(crimson::os::FuturizedStore& store);
@@ -308,20 +313,19 @@ public:
    */
   template <typename F>
   void for_each_pgid(F &&f) const {
-    return get_osd_singleton_state().pg_to_shard_mapping.for_each_pgid(
+    return pg_to_shard_mapping.for_each_pgid(
       std::forward<F>(f));
   }
 
   auto get_num_pgs() const {
-    return get_osd_singleton_state().pg_to_shard_mapping.get_num_pgs();
+    return pg_to_shard_mapping.get_num_pgs();
   }
 
   seastar::future<> broadcast_map_to_pgs(epoch_t epoch);
 
   template <typename F>
   auto with_pg(spg_t pgid, F &&f) {
-    core_id_t core = get_osd_singleton_state(
-    ).pg_to_shard_mapping.get_pg_mapping(pgid);
+    core_id_t core = pg_to_shard_mapping.get_pg_mapping(pgid);
     return with_remote_shard_state(
       core,
       [pgid, f=std::move(f)](auto &local_state, auto &local_service) mutable {
@@ -282,9 +282,6 @@ private:
   void requeue_pg_temp();
   seastar::future<> send_pg_temp();
 
-  // TODO: add config to control mapping
-  PGShardMapping pg_to_shard_mapping{0, seastar::smp::count};
-
   std::set<pg_t> pg_created;
   seastar::future<> send_pg_created(pg_t pgid);
   seastar::future<> send_pg_created();
@@ -328,6 +325,7 @@ class ShardServices : public OSDMapService {
 
   PerShardState local_state;
   seastar::sharded<OSDSingletonState> &osd_singleton_state;
+  PGShardMapping& pg_to_shard_mapping;
 
   template <typename F, typename... Args>
   auto with_singleton(F &&f, Args&&... args) {
@@ -370,9 +368,11 @@ public:
   template <typename... PSSArgs>
   ShardServices(
     seastar::sharded<OSDSingletonState> &osd_singleton_state,
+    PGShardMapping& pg_to_shard_mapping,
     PSSArgs&&... args)
     : local_state(std::forward<PSSArgs>(args)...),
-      osd_singleton_state(osd_singleton_state) {}
+      osd_singleton_state(osd_singleton_state),
+      pg_to_shard_mapping(pg_to_shard_mapping) {}
 
   FORWARD_TO_OSD_SINGLETON(send_to_osd)
 
@@ -382,10 +382,7 @@ public:
 
   auto remove_pg(spg_t pgid) {
     local_state.pg_map.remove_pg(pgid);
-    return with_singleton(
-      [pgid](auto &osstate) {
-        osstate.pg_to_shard_mapping.remove_pg(pgid);
-      });
+    return pg_to_shard_mapping.remove_pg(pgid);
   }
 
   crimson::common::CephContext *get_cct() {