mirror of
https://github.com/ceph/ceph
synced 2025-03-11 02:39:05 +00:00
Merge pull request #55296 from Matan-B/wip-matanb-crimson-pg-map-logs
crimson/osd/pg_map: Refactor Reviewed-by: Samuel Just <sjust@redhat.com> Reviewed-by: Yingxin Cheng <yingxin.cheng@intel.com>
This commit is contained in:
commit
f63c485e60
@ -2,20 +2,86 @@
|
||||
// vim: ts=8 sw=2 smarttab
|
||||
|
||||
#include "crimson/osd/pg_map.h"
|
||||
|
||||
#include "crimson/common/log.h"
|
||||
#include "crimson/osd/pg.h"
|
||||
#include "common/Formatter.h"
|
||||
|
||||
namespace {
|
||||
seastar::logger& logger() {
|
||||
return crimson::get_logger(ceph_subsys_osd);
|
||||
}
|
||||
}
|
||||
SET_SUBSYS(osd);
|
||||
|
||||
using std::make_pair;
|
||||
|
||||
namespace crimson::osd {
|
||||
|
||||
seastar::future<core_id_t> PGShardMapping::get_or_create_pg_mapping(
|
||||
spg_t pgid,
|
||||
core_id_t core)
|
||||
{
|
||||
LOG_PREFIX(PGShardMapping::get_or_create_pg_mapping);
|
||||
auto find_iter = pg_to_core.find(pgid);
|
||||
if (find_iter != pg_to_core.end()) {
|
||||
ceph_assert_always(find_iter->second != NULL_CORE);
|
||||
if (core != NULL_CORE) {
|
||||
ceph_assert_always(find_iter->second == core);
|
||||
}
|
||||
return seastar::make_ready_future<core_id_t>(find_iter->second);
|
||||
} else {
|
||||
return container().invoke_on(0,[pgid, core, FNAME]
|
||||
(auto &primary_mapping) {
|
||||
auto [insert_iter, inserted] = primary_mapping.pg_to_core.emplace(pgid, core);
|
||||
ceph_assert_always(inserted);
|
||||
ceph_assert_always(primary_mapping.core_to_num_pgs.size() > 0);
|
||||
std::map<core_id_t, unsigned>::iterator core_iter;
|
||||
if (core == NULL_CORE) {
|
||||
core_iter = std::min_element(
|
||||
primary_mapping.core_to_num_pgs.begin(),
|
||||
primary_mapping.core_to_num_pgs.end(),
|
||||
[](const auto &left, const auto &right) {
|
||||
return left.second < right.second;
|
||||
});
|
||||
} else {
|
||||
core_iter = primary_mapping.core_to_num_pgs.find(core);
|
||||
}
|
||||
ceph_assert_always(primary_mapping.core_to_num_pgs.end() != core_iter);
|
||||
insert_iter->second = core_iter->first;
|
||||
core_iter->second++;
|
||||
DEBUG("mapping pg {} to core: {} with num_pgs of: {}",
|
||||
pgid, insert_iter->second, core_iter->second);
|
||||
return primary_mapping.container().invoke_on_others(
|
||||
[pgid = insert_iter->first, core = insert_iter->second]
|
||||
(auto &other_mapping) {
|
||||
ceph_assert_always(core != NULL_CORE);
|
||||
auto [insert_iter, inserted] = other_mapping.pg_to_core.emplace(pgid, core);
|
||||
ceph_assert_always(inserted);
|
||||
});
|
||||
}).then([this, pgid] {
|
||||
auto find_iter = pg_to_core.find(pgid);
|
||||
return seastar::make_ready_future<core_id_t>(find_iter->second);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
seastar::future<> PGShardMapping::remove_pg_mapping(spg_t pgid) {
|
||||
LOG_PREFIX(PGShardMapping::remove_pg_mapping);
|
||||
DEBUG("{}", pgid);
|
||||
return container().invoke_on(0, [pgid](auto &primary_mapping) {
|
||||
auto iter = primary_mapping.pg_to_core.find(pgid);
|
||||
ceph_assert_always(iter != primary_mapping.pg_to_core.end());
|
||||
ceph_assert_always(iter->second != NULL_CORE);
|
||||
auto count_iter = primary_mapping.core_to_num_pgs.find(iter->second);
|
||||
ceph_assert_always(count_iter != primary_mapping.core_to_num_pgs.end());
|
||||
ceph_assert_always(count_iter->second > 0);
|
||||
--(count_iter->second);
|
||||
primary_mapping.pg_to_core.erase(iter);
|
||||
return primary_mapping.container().invoke_on_others(
|
||||
[pgid](auto &other_mapping) {
|
||||
auto iter = other_mapping.pg_to_core.find(pgid);
|
||||
ceph_assert_always(iter != other_mapping.pg_to_core.end());
|
||||
ceph_assert_always(iter->second != NULL_CORE);
|
||||
other_mapping.pg_to_core.erase(iter);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
PGMap::PGCreationState::PGCreationState(spg_t pgid) : pgid(pgid) {}
|
||||
PGMap::PGCreationState::~PGCreationState() {}
|
||||
|
||||
@ -56,7 +122,8 @@ Ref<PG> PGMap::get_pg(spg_t pgid)
|
||||
|
||||
void PGMap::set_creating(spg_t pgid)
|
||||
{
|
||||
logger().debug("Creating {}", pgid);
|
||||
LOG_PREFIX(PGMap::set_creating);
|
||||
DEBUG("Creating {}", pgid);
|
||||
ceph_assert(pgs.count(pgid) == 0);
|
||||
auto pg = pgs_creating.find(pgid);
|
||||
ceph_assert(pg != pgs_creating.end());
|
||||
@ -66,7 +133,8 @@ void PGMap::set_creating(spg_t pgid)
|
||||
|
||||
void PGMap::pg_created(spg_t pgid, Ref<PG> pg)
|
||||
{
|
||||
logger().debug("Created {}", pgid);
|
||||
LOG_PREFIX(PGMap::pg_created);
|
||||
DEBUG("Created {}", pgid);
|
||||
ceph_assert(!pgs.count(pgid));
|
||||
pgs.emplace(pgid, pg);
|
||||
|
||||
@ -85,7 +153,8 @@ void PGMap::pg_loaded(spg_t pgid, Ref<PG> pg)
|
||||
|
||||
void PGMap::pg_creation_canceled(spg_t pgid)
|
||||
{
|
||||
logger().debug("PGMap::pg_creation_canceled: {}", pgid);
|
||||
LOG_PREFIX(PGMap::pg_creation_canceled);
|
||||
DEBUG("{}", pgid);
|
||||
ceph_assert(!pgs.count(pgid));
|
||||
|
||||
auto creating_iter = pgs_creating.find(pgid);
|
||||
|
@ -23,7 +23,7 @@ class PG;
|
||||
*
|
||||
* Maintains a mapping from spg_t to the core containing that PG. Internally, each
|
||||
* core has a local copy of the mapping to enable core-local lookups. Updates
|
||||
* are proxied to core 0, and the back out to all other cores -- see maybe_create_pg.
|
||||
* are proxied to core 0, and the back out to all other cores -- see get_or_create_pg_mapping.
|
||||
*/
|
||||
class PGShardMapping : public seastar::peering_sharded_service<PGShardMapping> {
|
||||
public:
|
||||
@ -35,70 +35,12 @@ public:
|
||||
}
|
||||
|
||||
/// Returns mapping for pgid, creates new one if it doesn't already exist
|
||||
seastar::future<core_id_t> maybe_create_pg(
|
||||
seastar::future<core_id_t> get_or_create_pg_mapping(
|
||||
spg_t pgid,
|
||||
core_id_t core = NULL_CORE) {
|
||||
auto find_iter = pg_to_core.find(pgid);
|
||||
if (find_iter != pg_to_core.end()) {
|
||||
ceph_assert_always(find_iter->second != NULL_CORE);
|
||||
if (core != NULL_CORE) {
|
||||
ceph_assert_always(find_iter->second == core);
|
||||
}
|
||||
return seastar::make_ready_future<core_id_t>(find_iter->second);
|
||||
} else {
|
||||
return container().invoke_on(0,[pgid, core]
|
||||
(auto &primary_mapping) {
|
||||
auto [insert_iter, inserted] = primary_mapping.pg_to_core.emplace(pgid, core);
|
||||
ceph_assert_always(inserted);
|
||||
ceph_assert_always(primary_mapping.core_to_num_pgs.size() > 0);
|
||||
std::map<core_id_t, unsigned>::iterator core_iter;
|
||||
if (core == NULL_CORE) {
|
||||
core_iter = std::min_element(
|
||||
primary_mapping.core_to_num_pgs.begin(),
|
||||
primary_mapping.core_to_num_pgs.end(),
|
||||
[](const auto &left, const auto &right) {
|
||||
return left.second < right.second;
|
||||
});
|
||||
} else {
|
||||
core_iter = primary_mapping.core_to_num_pgs.find(core);
|
||||
}
|
||||
ceph_assert_always(primary_mapping.core_to_num_pgs.end() != core_iter);
|
||||
insert_iter->second = core_iter->first;
|
||||
core_iter->second++;
|
||||
return primary_mapping.container().invoke_on_others(
|
||||
[pgid = insert_iter->first, core = insert_iter->second]
|
||||
(auto &other_mapping) {
|
||||
ceph_assert_always(core != NULL_CORE);
|
||||
auto [insert_iter, inserted] = other_mapping.pg_to_core.emplace(pgid, core);
|
||||
ceph_assert_always(inserted);
|
||||
});
|
||||
}).then([this, pgid] {
|
||||
auto find_iter = pg_to_core.find(pgid);
|
||||
return seastar::make_ready_future<core_id_t>(find_iter->second);
|
||||
});
|
||||
}
|
||||
}
|
||||
core_id_t core = NULL_CORE);
|
||||
|
||||
/// Remove pgid
|
||||
seastar::future<> remove_pg(spg_t pgid) {
|
||||
return container().invoke_on(0, [pgid](auto &primary_mapping) {
|
||||
auto iter = primary_mapping.pg_to_core.find(pgid);
|
||||
ceph_assert_always(iter != primary_mapping.pg_to_core.end());
|
||||
ceph_assert_always(iter->second != NULL_CORE);
|
||||
auto count_iter = primary_mapping.core_to_num_pgs.find(iter->second);
|
||||
ceph_assert_always(count_iter != primary_mapping.core_to_num_pgs.end());
|
||||
ceph_assert_always(count_iter->second > 0);
|
||||
--(count_iter->second);
|
||||
primary_mapping.pg_to_core.erase(iter);
|
||||
return primary_mapping.container().invoke_on_others(
|
||||
[pgid](auto &other_mapping) {
|
||||
auto iter = other_mapping.pg_to_core.find(pgid);
|
||||
ceph_assert_always(iter != other_mapping.pg_to_core.end());
|
||||
ceph_assert_always(iter->second != NULL_CORE);
|
||||
other_mapping.pg_to_core.erase(iter);
|
||||
});
|
||||
});
|
||||
}
|
||||
/// Remove pgid mapping
|
||||
seastar::future<> remove_pg_mapping(spg_t pgid);
|
||||
|
||||
size_t get_num_pgs() const { return pg_to_core.size(); }
|
||||
|
||||
|
@ -23,7 +23,7 @@ seastar::future<> PGShardManager::load_pgs(crimson::os::FuturizedStore& store)
|
||||
auto[coll, shard_core] = coll_core;
|
||||
spg_t pgid;
|
||||
if (coll.is_pg(&pgid)) {
|
||||
return get_pg_to_shard_mapping().maybe_create_pg(
|
||||
return get_pg_to_shard_mapping().get_or_create_pg_mapping(
|
||||
pgid, shard_core
|
||||
).then([this, pgid] (auto core) {
|
||||
return this->template with_remote_shard_state(
|
||||
|
@ -376,7 +376,7 @@ public:
|
||||
return opref.template enter_stage<>(
|
||||
opref.get_connection_pipeline().get_pg_mapping);
|
||||
}).then([this, &opref] {
|
||||
return get_pg_to_shard_mapping().maybe_create_pg(opref.get_pgid());
|
||||
return get_pg_to_shard_mapping().get_or_create_pg_mapping(opref.get_pgid());
|
||||
}).then_wrapped([this, &logger, op=std::move(op)](auto fut) mutable {
|
||||
if (unlikely(fut.failed())) {
|
||||
logger.error("{}: failed before with_pg", *op);
|
||||
|
@ -404,7 +404,7 @@ public:
|
||||
|
||||
auto remove_pg(spg_t pgid) {
|
||||
local_state.pg_map.remove_pg(pgid);
|
||||
return pg_to_shard_mapping.remove_pg(pgid);
|
||||
return pg_to_shard_mapping.remove_pg_mapping(pgid);
|
||||
}
|
||||
|
||||
crimson::common::CephContext *get_cct() {
|
||||
|
Loading…
Reference in New Issue
Block a user