Merge pull request #30474 from tchaikov/wip-discard-future

crimson: run in foreground if possible, silence warnings

Reviewed-by: Samuel Just <sjust@redhat.com>
Kefu Chai, 2019-09-20 10:51:54 +08:00, committed by GitHub
commit a363dea882
10 changed files with 86 additions and 61 deletions
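
Most of the diff below applies one of two patterns to seastar futures that were previously dropped on the floor: where the caller can wait, the future is now returned so the work runs in the foreground and its errors propagate; where the work really is fire-and-forget (timer callbacks, one-way sends), the future is explicitly cast to void, which documents the intent and silences the compiler's unused-result warning. A minimal sketch of the two patterns, with do_work() as a hypothetical future-returning helper that is not part of this PR:

    #include <seastar/core/future.hh>

    seastar::future<> do_work();  // hypothetical asynchronous helper

    // Foreground: hand the future back to the caller so it is awaited
    // and any exception it carries propagates.
    seastar::future<> run_in_foreground() {
      return do_work();
    }

    // Background: deliberately discard the future, e.g. from a timer
    // callback that cannot return one; the (void) cast marks the intent
    // and silences the unused-result warning.
    void run_in_background() {
      (void)do_work();
    }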

----------------------------------------

@@ -12,7 +12,7 @@
namespace ceph::net {
- enum class custom_bp_t {
+ enum class custom_bp_t : uint8_t {
BANNER_WRITE = 0,
BANNER_READ,
BANNER_PAYLOAD_READ,
@@ -20,13 +20,14 @@ enum class custom_bp_t {
SOCKET_ACCEPTED
};
inline const char* get_bp_name(custom_bp_t bp) {
+ uint8_t index = static_cast<uint8_t>(bp);
static const char *const bp_names[] = {"BANNER_WRITE",
"BANNER_READ",
"BANNER_PAYLOAD_READ",
"SOCKET_CONNECTING",
"SOCKET_ACCEPTED"};
- assert(static_cast<int>(bp) < std::size(bp_names));
- return bp_names[static_cast<int>(bp)];
+ assert(index < std::size(bp_names));
+ return bp_names[index];
}
enum class bp_type_t {

----------------------------------------

@@ -76,7 +76,7 @@ class Protocol {
protected:
// write_state is changed with state atomically, indicating the write
// behavior of the according state.
- enum class write_state_t {
+ enum class write_state_t : uint8_t {
none,
delay,
open,
@@ -84,12 +84,13 @@
};
static const char* get_state_name(write_state_t state) {
+ uint8_t index = static_cast<uint8_t>(state);
static const char *const state_names[] = {"none",
"delay",
"open",
"drop"};
- assert(static_cast<int>(state) < std::size(state_names));
- return state_names[static_cast<int>(state)];
+ assert(index < std::size(state_names));
+ return state_names[index];
}
void set_write_state(const write_state_t& state) {

----------------------------------------

@@ -29,7 +29,8 @@ Heartbeat::Heartbeat(const ceph::osd::ShardServices& service,
monc{monc},
front_msgr{front_msgr},
back_msgr{back_msgr},
- timer{[this] {send_heartbeats();}}
+ // do this in background
+ timer{[this] { (void)send_heartbeats(); }}
{}
seastar::future<> Heartbeat::start(entity_addrvec_t front_addrs,
@@ -90,8 +91,10 @@ void Heartbeat::set_require_authorizer(bool require_authorizer)
seastar::future<> Heartbeat::add_peer(osd_id_t peer, epoch_t epoch)
{
- auto found = peers.find(peer);
- if (found == peers.end()) {
+ auto [peer_info, added] = peers.try_emplace(peer);
+ auto& info = peer_info->second;
+ info.epoch = epoch;
+ if (added) {
logger().info("add_peer({})", peer);
auto osdmap = service.get_osdmap_service().get_map();
// TODO: use addrs
@@ -100,16 +103,12 @@ seastar::future<> Heartbeat::add_peer(osd_id_t peer, epoch_t epoch)
CEPH_ENTITY_TYPE_OSD),
back_msgr.connect(osdmap->get_hb_back_addrs(peer).front(),
CEPH_ENTITY_TYPE_OSD))
- .then([this, peer, epoch] (auto xcon_front, auto xcon_back) {
- PeerInfo info;
+ .then([this, &info=peer_info->second] (auto xcon_front, auto xcon_back) {
// sharded-messenger compatible mode
info.con_front = xcon_front->release();
info.con_back = xcon_back->release();
- info.epoch = epoch;
- peers.emplace(peer, std::move(info));
});
} else {
- found->second.epoch = epoch;
return seastar::now();
}
}
@@ -141,7 +140,7 @@ seastar::future<Heartbeat::osds_t> Heartbeat::remove_down_peers()
});
}
- void Heartbeat::add_reporter_peers(int whoami)
+ seastar::future<> Heartbeat::add_reporter_peers(int whoami)
{
auto osdmap = service.get_osdmap_service().get_map();
// include next and previous up osds to ensure we have a fully-connected set
@@ -158,17 +157,20 @@ void Heartbeat::add_reporter_peers(int whoami)
auto subtree = local_conf().get_val<string>("mon_osd_reporter_subtree_level");
osdmap->get_random_up_osds_by_subtree(
whoami, subtree, min_down, want, &want);
- for (auto osd : want) {
- add_peer(osd, osdmap->get_epoch());
- }
+ return seastar::parallel_for_each(
+ std::move(want),
+ [epoch=osdmap->get_epoch(), this](int osd) {
+ return add_peer(osd, epoch);
+ });
}
seastar::future<> Heartbeat::update_peers(int whoami)
{
const auto min_peers = static_cast<size_t>(
local_conf().get_val<int64_t>("osd_heartbeat_min_peers"));
- return remove_down_peers().then([=](osds_t&& extra) {
- add_reporter_peers(whoami);
+ return add_reporter_peers(whoami).then([this] {
+ return remove_down_peers();
+ }).then([=](osds_t&& extra) {
// too many?
struct iteration_state {
osds_t::const_iterator where;
@@ -185,13 +187,18 @@ seastar::future<> Heartbeat::update_peers(int whoami)
});
}).then([=] {
// or too few?
+ vector<int> want;
auto osdmap = service.get_osdmap_service().get_map();
for (auto next = osdmap->get_next_up_osd_after(whoami);
peers.size() < min_peers && next >= 0 && next != whoami;
next = osdmap->get_next_up_osd_after(next)) {
- add_peer(next, osdmap->get_epoch());
+ want.push_back(next);
}
- return seastar::now();
+ return seastar::parallel_for_each(
+ std::move(want),
+ [epoch=osdmap->get_epoch(), this](int osd) {
+ return add_peer(osd, epoch);
+ });
});
}
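
With add_peer() now returning a future, the plain for loops above are replaced by seastar::parallel_for_each: it starts the per-peer futures together and the combined future resolves only after every one of them has resolved, so failures are no longer silently dropped. A rough sketch of that shape, with add_one() as a hypothetical stand-in for a call like add_peer():

    #include <seastar/core/future.hh>
    #include <seastar/core/loop.hh>  // parallel_for_each; future-util.hh in older Seastar
    #include <vector>

    seastar::future<> add_one(int id);  // hypothetical, in place of add_peer()

    seastar::future<> add_all(std::vector<int> ids) {
      // Invokes add_one() for every id, keeps all resulting futures, and
      // resolves when the last of them resolves; an exception from any
      // call is propagated to the caller.
      return seastar::parallel_for_each(std::move(ids), [](int id) {
        return add_one(id);
      });
    }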

----------------------------------------

@@ -67,7 +67,7 @@ private:
/// @return peers not needed in this epoch
seastar::future<osds_t> remove_down_peers();
/// add enough reporters for fast failure detection
- void add_reporter_peers(int whoami);
+ seastar::future<> add_reporter_peers(int whoami);
seastar::future<> start_messenger(ceph::net::Messenger& msgr,
const entity_addrvec_t& addrs);

----------------------------------------

@@ -60,7 +60,8 @@ OSD::OSD(int id, uint32_t nonce,
ceph::net::Messenger& hb_back_msgr)
: whoami{id},
nonce{nonce},
- beacon_timer{[this] { send_beacon(); }},
+ // do this in background
+ beacon_timer{[this] { (void)send_beacon(); }},
cluster_msgr{cluster_msgr},
public_msgr{public_msgr},
monc{new ceph::mon::Client{public_msgr, *this}},
@@ -70,7 +71,8 @@ OSD::OSD(int id, uint32_t nonce,
local_conf().get_val<std::string>("osd_data"))},
shard_services{*this, cluster_msgr, public_msgr, *monc, *mgrc, *store},
heartbeat{new Heartbeat{shard_services, *monc, hb_front_msgr, hb_back_msgr}},
- heartbeat_timer{[this] { update_heartbeat_peers(); }},
+ // do this in background
+ heartbeat_timer{[this] { (void)update_heartbeat_peers(); }},
osdmap_gate("OSD::osdmap_gate", std::make_optional(std::ref(shard_services)))
{
osdmaps[0] = boost::make_local_shared<OSDMap>();
@@ -264,8 +266,11 @@ seastar::future<> OSD::start()
if (auto [addrs, changed] =
replace_unknown_addrs(cluster_msgr.get_myaddrs(),
public_msgr.get_myaddrs()); changed) {
- cluster_msgr.set_myaddrs(addrs);
+ return cluster_msgr.set_myaddrs(addrs);
+ } else {
+ return seastar::now();
}
+ }).then([this] {
return heartbeat->start(public_msgr.get_myaddrs(),
cluster_msgr.get_myaddrs());
}).then([this] {
@@ -960,23 +965,30 @@ seastar::future<> OSD::send_beacon()
return monc->send_message(m);
}
- void OSD::update_heartbeat_peers()
+ seastar::future<> OSD::update_heartbeat_peers()
{
if (!state.is_active()) {
- return;
+ return seastar::now();
}
- for (auto& pg : pg_map.get_pgs()) {
- vector<int> up, acting;
- osdmap->pg_to_up_acting_osds(pg.first.pgid,
- &up, nullptr,
- &acting, nullptr);
- for (auto osd : boost::join(up, acting)) {
- if (osd != CRUSH_ITEM_NONE && osd != whoami) {
- heartbeat->add_peer(osd, osdmap->get_epoch());
- }
- }
- }
- heartbeat->update_peers(whoami);
+ return seastar::parallel_for_each(
+ pg_map.get_pgs(),
+ [this](auto& pg) {
+ vector<int> up, acting;
+ osdmap->pg_to_up_acting_osds(pg.first.pgid,
+ &up, nullptr,
+ &acting, nullptr);
+ return seastar::parallel_for_each(
+ boost::join(up, acting),
+ [this](int osd) {
+ if (osd == CRUSH_ITEM_NONE || osd == whoami) {
+ return seastar::now();
+ } else {
+ return heartbeat->add_peer(osd, osdmap->get_epoch());
+ }
+ });
+ }).then([this] {
+ return heartbeat->update_peers(whoami);
+ });
}
seastar::future<> OSD::handle_peering_op(
@@ -1024,7 +1036,7 @@ OSD::get_or_create_pg(
auto [fut, creating] = pg_map.get_pg(pgid, bool(info));
if (!creating && info) {
pg_map.set_creating(pgid);
- handle_pg_create_info(std::move(info));
+ (void)handle_pg_create_info(std::move(info));
}
return std::move(fut);
}

----------------------------------------

@@ -203,7 +203,7 @@ public:
seastar::future<> shutdown();
seastar::future<> send_beacon();
- void update_heartbeat_peers();
+ seastar::future<> update_heartbeat_peers();
friend class PGAdvanceMap;
};

----------------------------------------

@@ -113,7 +113,7 @@ PG::PG(
PG::~PG() {}
bool PG::try_flush_or_schedule_async() {
- shard_services.get_store().do_transaction(
+ (void)shard_services.get_store().do_transaction(
coll_ref,
ObjectStore::Transaction()).then(
[this, epoch=get_osdmap_epoch()]() {
@@ -384,13 +384,13 @@ seastar::future<Ref<MOSDOpReply>> PG::do_osd_ops(Ref<MOSDOp> m)
}).handle_exception_type([=,&oid](const ceph::osd::error& e) {
logger().debug("got ceph::osd::error while handling object {}: {} ({})",
oid, e.code(), e.what());
- backend->evict_object_state(oid);
- auto reply = make_message<MOSDOpReply>(
- m.get(), -e.code().value(), get_osdmap_epoch(), 0, false);
- reply->set_enoent_reply_versions(peering_state.get_info().last_update,
- peering_state.get_info().last_user_version);
- return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
+ return backend->evict_object_state(oid).then([=] {
+ auto reply = make_message<MOSDOpReply>(
+ m.get(), -e.code().value(), get_osdmap_epoch(), 0, false);
+ reply->set_enoent_reply_versions(peering_state.get_info().last_update,
+ peering_state.get_info().last_user_version);
+ return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
+ });
});
}
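
The error path above also stops discarding the eviction future: the reply is now built inside a .then() continuation, so it is only constructed after evict_object_state() completes and a failure there surfaces through the returned future instead of being lost. The general shape, with async_step() and build_reply() as hypothetical stand-ins:

    #include <seastar/core/future.hh>

    seastar::future<> async_step();  // hypothetical, e.g. an eviction or flush
    int build_reply();               // hypothetical follow-up work

    seastar::future<int> handle() {
      // Chain the follow-up with .then() instead of calling async_step()
      // and ignoring its future: build_reply() runs after the step
      // completes and shares its error path.
      return async_step().then([] {
        return build_reply();
      });
    }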

----------------------------------------

@@ -158,11 +158,11 @@ public:
void send_cluster_message(
int osd, Message *m,
epoch_t epoch, bool share_map_update=false) final {
- shard_services.send_to_osd(osd, m, epoch);
+ (void)shard_services.send_to_osd(osd, m, epoch);
}
void send_pg_created(pg_t pgid) final {
- shard_services.send_pg_created(pgid);
+ (void)shard_services.send_pg_created(pgid);
}
bool try_flush_or_schedule_async() final;

----------------------------------------

@@ -153,10 +153,10 @@ std::ostream& operator<<(
return out;
}
- void ShardServices::send_pg_temp()
+ seastar::future<> ShardServices::send_pg_temp()
{
if (pg_temp_wanted.empty())
- return;
+ return seastar::now();
logger().debug("{}: {}", __func__, pg_temp_wanted);
boost::intrusive_ptr<MOSDPGTemp> ms[2] = {nullptr, nullptr};
for (auto& [pgid, pg_temp] : pg_temp_wanted) {
@@ -167,12 +167,16 @@ void ShardServices::send_pg_temp()
}
m->pg_temp.emplace(pgid, pg_temp.acting);
}
- for (auto &m : ms) {
- if (m) {
- monc.send_message(m);
- }
- }
- _sent_pg_temp();
+ return seastar::parallel_for_each(std::begin(ms), std::end(ms),
+ [this](auto m) {
+ if (m) {
+ return monc.send_message(m);
+ } else {
+ return seastar::now();
+ }
+ }).then([this] {
+ _sent_pg_temp();
+ });
}
void ShardServices::update_map(cached_map_t new_osdmap)

----------------------------------------

@@ -132,7 +132,7 @@ public:
bool forced = false);
void remove_want_pg_temp(pg_t pgid);
void requeue_pg_temp();
- void send_pg_temp();
+ seastar::future<> send_pg_temp();
// Shard-local OSDMap
private: