crimson/osd: advance pg in consume_map()

Signed-off-by: Kefu Chai <kchai@redhat.com>
This commit is contained in:
Kefu Chai 2019-03-08 21:01:07 +08:00
parent 169c5e3b3c
commit e2bc6ad734
2 changed files with 16 additions and 11 deletions

View File

@ -622,7 +622,8 @@ seastar::future<> OSD::committed_osd_maps(version_t first,
}
}
// yay!
consume_map(osdmap->get_epoch());
return consume_map(osdmap->get_epoch());
}).then([m, this] {
if (state.is_active()) {
logger().info("osd.{}: now active", whoami);
if (!osdmap->exists(whoami)) {
@ -804,17 +805,20 @@ seastar::future<> OSD::handle_pg_log(ceph::net::ConnectionRef conn,
return do_peering_event(m->get_spg(), std::move(evt));
}
void OSD::consume_map(epoch_t epoch)
seastar::future<> OSD::consume_map(epoch_t epoch)
{
// todo: m-to-n: broadcast this news to all shards
auto first = waiting_peering.lower_bound(epoch);
auto last = waiting_peering.end();
std::for_each(first, last,
[epoch, this](auto& blocked_requests) {
blocked_requests.second.set_value(epoch);
});
waiting_peering.erase(first, last);
return seastar::parallel_for_each(pgs.begin(), pgs.end(), [=](auto& pg) {
return advance_pg_to(pg.second, epoch);
}).then([epoch, this] {
auto first = waiting_peering.lower_bound(epoch);
auto last = waiting_peering.end();
std::for_each(first, last, [epoch, this](auto& blocked_requests) {
blocked_requests.second.set_value(epoch);
});
waiting_peering.erase(first, last);
return seastar::now();
});
}
seastar::future<>
@ -831,6 +835,7 @@ OSD::do_peering_event(spg_t pgid,
return _send_alive(pg->get_need_up_thru());
});
} else {
logger().warn("pg not found: {}", pgid);
// todo: handle_pg_query_nopg()
return seastar::now();
}

View File

@ -151,7 +151,7 @@ private:
waiting_peering_t waiting_peering;
// wait for an osdmap whose epoch is greater or equal to given epoch
seastar::future<epoch_t> wait_for_map(epoch_t epoch);
void consume_map(epoch_t epoch);
seastar::future<> consume_map(epoch_t epoch);
seastar::future<> do_peering_event(spg_t pgid,
std::unique_ptr<PGPeeringEvent> evt);
seastar::future<> advance_pg_to(Ref<PG> pg, epoch_t to);