crimson/osd: add more peering facilities

* add PG::should_restart_peering() to tell whether a new mapping
  requires restarting peering
* add PG::start_peering_interval() to start a new peering interval
* add PG::activate() to activate a PG
* add PG::on_activated() to be called once a PG is activated
* add PG::send_to_osd() to send a message to a given OSD

Signed-off-by: Kefu Chai <kchai@redhat.com>
Kefu Chai 2019-03-04 10:58:12 +08:00
parent 673473208a
commit 34570c9427
2 changed files with 276 additions and 0 deletions
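
The commit message lists five new hooks; handle_advance_map(), declared in
pg.h below, is their natural call site, but its wiring is still a todo in
this commit. A minimal sketch of the expected flow — the body here is an
assumption, not part of the commit, and OSDMap::pg_to_up_acting_osds() is
the existing mapping query:

seastar::future<> PG::handle_advance_map(cached_map_t next_map)
{
  // sketch only: ask the new map for this pg's up/acting sets...
  std::vector<int> new_up, new_acting;
  int new_up_primary, new_acting_primary;
  next_map->pg_to_up_acting_osds(info.pgid.pgid,
                                 &new_up, &new_up_primary,
                                 &new_acting, &new_acting_primary);
  // ...and restart peering if the new mapping opens a new interval
  if (should_restart_peering(new_up_primary, new_acting_primary,
                             new_up, new_acting, osdmap, next_map)) {
    start_peering_interval(new_up_primary, new_acting_primary,
                           new_up, new_acting, osdmap);
  }
  osdmap = std::move(next_map);
  return seastar::now();
}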
@@ -543,6 +543,226 @@ void PG::clear_primary_state()
  peer_activated.clear();
}

bool PG::should_restart_peering(int new_up_primary,
                                int new_acting_primary,
                                const std::vector<int>& new_up,
                                const std::vector<int>& new_acting,
                                cached_map_t last_map,
                                cached_map_t osd_map) const
{
  auto pgid = info.pgid.pgid;
  auto pool = last_map->get_pg_pool(pgid.pool());
  if (!pool) {
    return false;
  }
  auto new_pool = osd_map->get_pg_pool(pgid.pool());
  if (!new_pool) {
    return true;
  }
  if (PastIntervals::is_new_interval(
        primary.osd,
        new_acting_primary,
        acting,
        new_acting,
        up_primary.osd,
        new_up_primary,
        up,
        new_up,
        pool->size,
        new_pool->size,
        pool->min_size,
        new_pool->min_size,
        pool->get_pg_num(),
        new_pool->get_pg_num(),
        pool->get_pg_num_pending(),
        new_pool->get_pg_num_pending(),
        last_map->test_flag(CEPH_OSDMAP_SORTBITWISE),
        osd_map->test_flag(CEPH_OSDMAP_SORTBITWISE),
        last_map->test_flag(CEPH_OSDMAP_RECOVERY_DELETES),
        osd_map->test_flag(CEPH_OSDMAP_RECOVERY_DELETES),
        pgid)) {
    logger().info("new interval new_up {} new_acting {}",
                  new_up, new_acting);
    return true;
  }
  if (!last_map->is_up(whoami.osd) && osd_map->is_up(whoami.osd)) {
    logger().info(" osd transitioned from down -> up");
    return true;
  }
  return false;
}
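
For intuition: PastIntervals::is_new_interval() reports a new interval
whenever any of the paired old/new inputs above differs — membership,
primaries, pool size/min_size, pg counts, or the two osdmap flags (the
pg_num arguments really feed a split/merge check, which this toy ignores).
A self-contained toy analogue of the idea, not Ceph code:

#include <tuple>
#include <vector>

// toy analogue of PastIntervals::is_new_interval(): a new interval begins
// whenever any compared input differs between the old and the new osdmap
struct interval_key {
  int primary, up_primary;
  std::vector<int> up, acting;
  unsigned size, min_size, pg_num;
  bool sortbitwise, recovery_deletes;
  bool operator==(const interval_key& rhs) const {
    return std::tie(primary, up_primary, up, acting,
                    size, min_size, pg_num,
                    sortbitwise, recovery_deletes) ==
           std::tie(rhs.primary, rhs.up_primary, rhs.up, rhs.acting,
                    rhs.size, rhs.min_size, rhs.pg_num,
                    rhs.sortbitwise, rhs.recovery_deletes);
  }
};

bool is_new_interval(const interval_key& a, const interval_key& b) {
  return !(a == b);
}

int main() {
  interval_key a{0, 0, {0, 1}, {0, 1}, 3, 2, 64, true, true};
  interval_key b = a;
  b.pg_num = 128;                      // pool pg_num changed
  return is_new_interval(a, b) ? 0 : 1;
}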
// set v to new_v, reporting whether the stored value actually changed
template<class T>
bool compare_n_set(T& v, const T& new_v)
{
  if (v != new_v) {
    v = new_v;
    return true;
  } else {
    return false;
  }
}
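
Note the call site in start_peering_interval() below: the four
compare_n_set() results are combined with '+' rather than '||', because
'||' would short-circuit after the first change and skip updating the
remaining stat fields. A standalone demonstration:

#include <cassert>

template<class T>
bool compare_n_set(T& v, const T& new_v)
{
  if (v != new_v) {
    v = new_v;
    return true;
  }
  return false;
}

int main()
{
  int a = 1, b = 2;
  // '+' evaluates both operands, so both variables get updated;
  // '||' would stop after the first 'true' and leave b untouched
  bool changed = compare_n_set(a, 10) + compare_n_set(b, 20);
  assert(changed && a == 10 && b == 20);
}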
void PG::start_peering_interval(int new_up_primary,
                                int new_acting_primary,
                                const std::vector<int>& new_up,
                                const std::vector<int>& new_acting,
                                cached_map_t last_map)
{
  // todo
  update_last_peering_reset();
  auto old_acting_primary = primary;
  auto old_acting = std::move(acting);
  auto old_up_primary = up_primary;
  auto old_up = std::move(up);
  update_primary_state(new_up, new_up_primary,
                       new_acting, new_acting_primary);
  if (compare_n_set(info.stats.up, up) +
      compare_n_set(info.stats.up_primary, up_primary.osd) +
      compare_n_set(info.stats.acting, acting) +
      compare_n_set(info.stats.acting_primary, primary.osd)) {
    info.stats.mapping_epoch = osdmap->get_epoch();
  }
  if (old_up_primary != up_primary || old_up != up) {
    info.history.same_up_since = osdmap->get_epoch();
  }
  // this comparison includes primary rank via pg_shard_t
  if (old_acting_primary != get_primary()) {
    info.history.same_primary_since = osdmap->get_epoch();
  }
  // todo: always start a new interval
  info.history.same_interval_since = osdmap->get_epoch();
  // This will now be remapped during a backfill in cases
  // that it would not have been before.
  if (up != acting) {
    set_state(PG_STATE_REMAPPED);
  } else {
    clear_state(PG_STATE_REMAPPED);
  }
  // deactivate.
  clear_state(PG_STATE_ACTIVE);
  clear_state(PG_STATE_PEERED);
  clear_state(PG_STATE_DOWN);
  acting_recovery_backfill.clear();
  // should we tell the primary we are here?
  should_notify_primary = !is_primary();
}

void PG::activate(epoch_t activation_epoch)
{
  clear_state(PG_STATE_DOWN);
  if (is_primary()) {
    // only update primary last_epoch_started if we will go active
    if (acting.size() >= pool.min_size) {
      info.last_epoch_started = activation_epoch;
      info.last_interval_started = info.history.same_interval_since;
    }
  } else if (is_acting(whoami)) {
    // update last_epoch_started on acting replica to whatever the primary sent
    // unless it's smaller (could happen if we are going peered rather than
    // active, see doc/dev/osd_internals/last_epoch_started.rst)
    if (info.last_epoch_started < activation_epoch) {
      info.last_epoch_started = activation_epoch;
      info.last_interval_started = info.history.same_interval_since;
    }
  }
  if (is_primary()) {
    // start up replicas
    seastar::do_for_each(
      acting_recovery_backfill.begin(),
      acting_recovery_backfill.end(),
      [this](pg_shard_t peer) { return activate_peer(peer); });
    set_state(PG_STATE_ACTIVATING);
  } else {
    // todo: write/commit pg log, activate myself, and then tell primary
    on_activated();
    pg_notify_t notify{get_primary().shard,
                       whoami.shard,
                       get_osdmap_epoch(),
                       get_osdmap_epoch(),
                       info};
    auto m = make_message<MOSDPGInfo>(
      get_osdmap_epoch(),
      MOSDPGInfo::pg_list_t{make_pair(std::move(notify), PastIntervals{})});
    send_to_osd(get_primary().osd, std::move(m), get_osdmap_epoch());
  }
  // todo:
  info.last_complete = info.last_update;
  update_need_up_thru();
}
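
seastar::do_for_each() runs one future-returning step per element, waiting
for each returned future before starting the next; note that the primary
branch of activate() above fires this fan-out without chaining the returned
future into anything. A minimal standalone example of the pattern, assuming
a Seastar build environment (header locations vary across Seastar versions):

#include <seastar/core/app-template.hh>
#include <seastar/core/do_with.hh>
#include <seastar/core/future.hh>
#include <seastar/core/loop.hh>   // do_for_each (future-util.hh on older trees)
#include <iostream>
#include <vector>

int main(int argc, char** argv)
{
  seastar::app_template app;
  return app.run(argc, argv, [] {
    // do_with keeps the container alive for as long as the loop runs
    return seastar::do_with(std::vector<int>{1, 2, 3}, [](auto& peers) {
      // one step at a time: the next element is processed only after the
      // future returned for the previous one resolves
      return seastar::do_for_each(peers, [](int peer) {
        std::cout << "activating peer osd." << peer << "\n";
        return seastar::make_ready_future<>();
      });
    });
  });
}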
void PG::on_activated()
{
  if (acting.size() >= pool.min_size) {
    set_state(PG_STATE_ACTIVE);
  } else {
    set_state(PG_STATE_PEERED);
  }
}

seastar::future<> PG::activate_peer(pg_shard_t peer)
{
  if (peer == whoami) {
    // todo: write/commit pg log
    peer_activated.insert(whoami);
    return seastar::now();
  }
  auto& pi = peer_info[peer];
  MOSDPGLog* m = nullptr;
  if (pi.last_update == info.last_update) {
    // empty log
    logger().info("activate peer osd.{} is up to date, "
                  "but sending pg_log anyway", peer);
    m = new MOSDPGLog{peer.shard,
                      whoami.shard,
                      get_osdmap_epoch(),
                      get_info(),
                      get_last_peering_reset()};
  } else if (pi.last_backfill.is_min()) {
    logger().info("starting backfill to osd.{} from ({},{}] {} to {}", peer,
                  pi.log_tail, pi.last_update,
                  pi.last_backfill, info.last_update);
    // backfill
    pi.last_update = info.last_update;
    pi.last_complete = info.last_update;
    pi.last_epoch_started = info.last_epoch_started;
    pi.last_interval_started = info.last_interval_started;
    pi.history = info.history;
    pi.hit_set = info.hit_set;
    pi.stats.stats.clear();
    pi.purged_snaps = info.purged_snaps;
    m = new MOSDPGLog{peer.shard,
                      whoami.shard,
                      get_osdmap_epoch(),
                      pi,
                      get_last_peering_reset()};
  } else {
    // catch up
    logger().info("send missing log to osd.{}", peer);
    m = new MOSDPGLog{peer.shard,
                      whoami.shard,
                      get_osdmap_epoch(),
                      get_info(),
                      get_last_peering_reset()};
    // todo. send pg_log
    pi.last_update = info.last_update;
  }
  return send_to_osd(peer.osd, Ref<Message>{m, false}, get_osdmap_epoch());
}
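
The three branches pivot on the peer's last_update and last_backfill. A toy
restatement of just that decision, with stand-ins for Ceph's eversion_t and
hobject_t types:

// stand-ins for eversion_t and hobject_t, just enough for the sketch
struct version_t {
  unsigned epoch = 0, v = 0;
  bool operator==(const version_t& rhs) const {
    return epoch == rhs.epoch && v == rhs.v;
  }
};
struct object_pos_t {
  bool min = true;
  bool is_min() const { return min; }
};

enum class peer_action { send_empty_log, backfill, send_missing_log };

// restatement of the branch logic in PG::activate_peer() above
peer_action classify_peer(const version_t& peer_last_update,
                          const object_pos_t& peer_last_backfill,
                          const version_t& my_last_update)
{
  if (peer_last_update == my_last_update) {
    return peer_action::send_empty_log;    // peer already has our log head
  }
  if (peer_last_backfill.is_min()) {
    return peer_action::backfill;          // peer holds no objects of this pg yet
  }
  return peer_action::send_missing_log;    // peer only misses recent log entries
}

int main()
{
  version_t head{5, 10};
  return classify_peer(head, object_pos_t{}, head) ==
         peer_action::send_empty_log ? 0 : 1;
}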
void PG::maybe_mark_clean()
{
  if (actingset.size() == osdmap->get_pg_size(pgid.pgid)) {
    set_state(PG_STATE_CLEAN);
    info.history.last_epoch_clean = get_osdmap_epoch();
    info.history.last_interval_clean = info.history.same_interval_since;
  }
}

seastar::future<> PG::do_peering_event(std::unique_ptr<PGPeeringEvent> evt)
{
  // todo

@@ -578,3 +798,41 @@ std::ostream& operator<<(std::ostream& os, const PG& pg)
  pg.print(os);
  return os;
}

seastar::future<> PG::send_to_osd(int peer, Ref<Message> m, epoch_t from_epoch)
{
  if (osdmap->is_down(peer) || osdmap->get_info(peer).up_from > from_epoch) {
    return seastar::now();
  } else {
    return msgr.connect(osdmap->get_cluster_addrs(peer).legacy_addr(),
                        CEPH_ENTITY_TYPE_OSD)
      .then([m, this] (auto xconn) {
        return (*xconn)->send(m);
      });
  }
}
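
The guard drops the message not only when the peer is currently down, but
also when it has restarted since from_epoch: up_from > from_epoch means the
peer bounced after the epoch the message was addressed at, so the message
may no longer apply to it. A toy version of the check:

#include <cassert>
#include <cstdint>

using epoch_t = uint32_t;

struct osd_info {
  bool up;
  epoch_t up_from;  // epoch at which the osd most recently came up
};

// mirrors the guard in PG::send_to_osd(): deliver only if the peer is up
// and has not restarted since the epoch the message was addressed at
bool should_deliver(const osd_info& peer, epoch_t from_epoch)
{
  return peer.up && peer.up_from <= from_epoch;
}

int main()
{
  assert(!should_deliver({false, 10}, 12));  // peer down: drop
  assert(!should_deliver({true, 15}, 12));   // peer restarted after epoch 12: drop
  assert(should_deliver({true, 10}, 12));    // up since before epoch 12: send
}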
seastar::future<> PG::share_pg_info()
{
  return seastar::do_for_each(
    acting_recovery_backfill.begin(),
    acting_recovery_backfill.end(),
    [this](pg_shard_t peer) {
      if (peer == whoami) return seastar::now();
      if (auto pi = peer_info.find(peer); pi != peer_info.end()) {
        pi->second.last_epoch_started = info.last_epoch_started;
        pi->second.last_interval_started = info.last_interval_started;
        pi->second.history.merge(info.history);
      }
      pg_notify_t notify{peer.shard,
                         whoami.shard,
                         get_osdmap_epoch(),
                         get_osdmap_epoch(),
                         info};
      auto m = make_message<MOSDPGInfo>(
        get_osdmap_epoch(),
        MOSDPGInfo::pg_list_t{make_pair(std::move(notify),
                                        past_intervals)});
      return send_to_osd(peer.osd, m, get_osdmap_epoch());
    });
}
@@ -83,13 +83,31 @@ public:
  bool is_last_activated_peer(pg_shard_t peer);
  void clear_primary_state();
  bool should_restart_peering(int new_up_primary,
                              int new_acting_primary,
                              const std::vector<int>& new_up,
                              const std::vector<int>& new_acting,
                              cached_map_t last_map,
                              cached_map_t osd_map) const;
  void start_peering_interval(int new_up_primary,
                              int new_acting_primary,
                              const std::vector<int>& new_up,
                              const std::vector<int>& new_acting,
                              cached_map_t last_map);
  void activate(epoch_t activation_epoch);
  void on_activated();
  void maybe_mark_clean();
  seastar::future<> do_peering_event(std::unique_ptr<PGPeeringEvent> evt);
  seastar::future<> handle_advance_map(cached_map_t next_map);
  seastar::future<> handle_activate_map();
  seastar::future<> share_pg_info();
  void print(ostream& os) const;
private:
  seastar::future<> activate_peer(pg_shard_t peer);
  seastar::future<> send_to_osd(int peer, Ref<Message> m, epoch_t from_epoch);
  void update_primary_state(const std::vector<int>& new_up,
                            int new_up_primary,
                            const std::vector<int>& new_acting,