crimson/net: make ms_dispatch() return optional<future<>>

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
This commit is contained in:
Yingxin Cheng 2020-12-02 11:25:10 +08:00
parent a5a77b5187
commit 66d17ffa9c
12 changed files with 45 additions and 47 deletions

View File

@ -47,7 +47,7 @@ seastar::future<> Client::stop()
return fut;
}
std::tuple<bool, seastar::future<>>
std::optional<seastar::future<>>
Client::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m)
{
bool dispatched = true;
@ -62,7 +62,7 @@ Client::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m)
return seastar::now();
}
});
return {dispatched, seastar::now()};
return (dispatched ? std::make_optional(seastar::now()) : std::nullopt);
}
void Client::ms_handle_connect(crimson::net::ConnectionRef c)

View File

@ -37,7 +37,7 @@ public:
void report();
private:
std::tuple<bool, seastar::future<>> ms_dispatch(
std::optional<seastar::future<>> ms_dispatch(
crimson::net::ConnectionRef conn, Ref<Message> m) override;
void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) final;
void ms_handle_connect(crimson::net::ConnectionRef conn) final;

View File

@ -518,7 +518,7 @@ bool Client::is_hunting() const {
return !active_con;
}
std::tuple<bool, seastar::future<>>
std::optional<seastar::future<>>
Client::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m)
{
bool dispatched = true;
@ -550,7 +550,7 @@ Client::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m)
return seastar::now();
}
});
return {dispatched, seastar::now()};
return (dispatched ? std::make_optional(seastar::now()) : std::nullopt);
}
void Client::ms_handle_reset(crimson::net::ConnectionRef conn, bool /* is_replace */)

View File

@ -140,8 +140,8 @@ private:
private:
void tick();
std::tuple<bool, seastar::future<>> ms_dispatch(crimson::net::ConnectionRef conn,
MessageRef m) override;
std::optional<seastar::future<>> ms_dispatch(crimson::net::ConnectionRef conn,
MessageRef m) override;
void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) override;
seastar::future<> handle_monmap(crimson::net::ConnectionRef conn,

View File

@ -25,11 +25,10 @@ class Dispatcher {
virtual ~Dispatcher() {}
// Dispatchers are put into a chain as described by chain-of-responsibility
// pattern. If any of the dispatchers claims this message, it returns true
// to prevent other dispatchers from processing it, and returns a future
// to throttle the connection if it's too busy. Else, it returns false and
// the second future is ignored.
virtual std::tuple<bool, seastar::future<>> ms_dispatch(ConnectionRef, MessageRef) = 0;
// pattern. If any of the dispatchers claims this message, it returns a valid
// future to prevent other dispatchers from processing it, and this is also
// used to throttle the connection if it's too busy.
virtual std::optional<seastar::future<>> ms_dispatch(ConnectionRef, MessageRef) = 0;
virtual void ms_handle_accept(ConnectionRef conn) {}

View File

@ -17,16 +17,15 @@ ChainedDispatchers::ms_dispatch(crimson::net::ConnectionRef conn,
MessageRef m) {
try {
for (auto& dispatcher : dispatchers) {
auto [dispatched, throttle_future] = dispatcher->ms_dispatch(conn, m);
if (dispatched) {
return std::move(throttle_future
auto dispatched = dispatcher->ms_dispatch(conn, m);
if (dispatched.has_value()) {
return std::move(*dispatched
).handle_exception([conn] (std::exception_ptr eptr) {
logger().error("{} got unexpected exception in ms_dispatch() throttling {}",
*conn, eptr);
ceph_abort();
});
}
assert(throttle_future.available());
}
} catch (...) {
logger().error("{} got unexpected exception in ms_dispatch() {}",

View File

@ -203,7 +203,7 @@ void Heartbeat::remove_peer(osd_id_t peer)
peers.erase(peer);
}
std::tuple<bool, seastar::future<>>
std::optional<seastar::future<>>
Heartbeat::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m)
{
bool dispatched = true;
@ -216,7 +216,7 @@ Heartbeat::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m)
return seastar::now();
}
});
return {dispatched, seastar::now()};
return (dispatched ? std::make_optional(seastar::now()) : std::nullopt);
}
void Heartbeat::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace)

View File

@ -48,7 +48,7 @@ public:
void set_require_authorizer(bool);
// Dispatcher methods
std::tuple<bool, seastar::future<>> ms_dispatch(
std::optional<seastar::future<>> ms_dispatch(
crimson::net::ConnectionRef conn, MessageRef m) override;
void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) override;
void ms_handle_connect(crimson::net::ConnectionRef conn) override;

View File

@ -614,11 +614,11 @@ seastar::future<Ref<PG>> OSD::load_pg(spg_t pgid)
});
}
std::tuple<bool, seastar::future<>>
std::optional<seastar::future<>>
OSD::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m)
{
if (state.is_stopping()) {
return {false, seastar::now()};
return {};
}
bool dispatched = true;
gate.dispatch_in_background(__func__, *this, [this, conn, &m, &dispatched] {
@ -678,7 +678,7 @@ OSD::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m)
return seastar::now();
}
});
return {dispatched, seastar::now()};
return (dispatched ? std::make_optional(seastar::now()) : std::nullopt);
}
void OSD::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace)

View File

@ -96,7 +96,7 @@ class OSD final : public crimson::net::Dispatcher,
OSDSuperblock superblock;
// Dispatcher methods
std::tuple<bool, seastar::future<>> ms_dispatch(crimson::net::ConnectionRef, MessageRef) final;
std::optional<seastar::future<>> ms_dispatch(crimson::net::ConnectionRef, MessageRef) final;
void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) final;
void ms_handle_remote_reset(crimson::net::ConnectionRef conn) final;

View File

@ -57,14 +57,14 @@ static seastar::future<> test_echo(unsigned rounds,
crimson::net::MessengerRef msgr;
crimson::auth::DummyAuthClientServer dummy_auth;
std::tuple<bool, seastar::future<>> ms_dispatch(
std::optional<seastar::future<>> ms_dispatch(
crimson::net::ConnectionRef c, MessageRef m) override {
if (verbose) {
logger().info("server got {}", *m);
}
// reply with a pong
std::ignore = c->send(make_message<MPing>());
return {true, seastar::now()};
return {seastar::now()};
}
seastar::future<> init(const entity_name_t& name,
@ -127,7 +127,7 @@ static seastar::future<> test_echo(unsigned rounds,
ceph_assert(added);
session->connected_time = mono_clock::now();
}
std::tuple<bool, seastar::future<>> ms_dispatch(
std::optional<seastar::future<>> ms_dispatch(
crimson::net::ConnectionRef c, MessageRef m) override {
auto session = find_session(c);
++(session->count);
@ -142,7 +142,7 @@ static seastar::future<> test_echo(unsigned rounds,
ceph_assert(found != pending_conns.end());
found->second.set_value();
}
return {true, seastar::now()};
return {seastar::now()};
}
seastar::future<> init(const entity_name_t& name,
@ -277,7 +277,7 @@ static seastar::future<> test_concurrent_dispatch(bool v2)
seastar::promise<> on_done; // satisfied when first dispatch unblocks
crimson::auth::DummyAuthClientServer dummy_auth;
std::tuple<bool, seastar::future<>> ms_dispatch(
std::optional<seastar::future<>> ms_dispatch(
crimson::net::ConnectionRef, MessageRef m) override {
switch (++count) {
case 1:
@ -290,7 +290,7 @@ static seastar::future<> test_concurrent_dispatch(bool v2)
default:
throw std::runtime_error("unexpected count");
}
return {true, seastar::now()};
return {seastar::now()};
}
seastar::future<> wait() { return on_done.get_future(); }
@ -319,9 +319,9 @@ static seastar::future<> test_concurrent_dispatch(bool v2)
crimson::net::MessengerRef msgr;
crimson::auth::DummyAuthClientServer dummy_auth;
std::tuple<bool, seastar::future<>> ms_dispatch(
std::optional<seastar::future<>> ms_dispatch(
crimson::net::ConnectionRef, MessageRef m) override {
return {true, seastar::now()};
return {seastar::now()};
}
seastar::future<> init(const entity_name_t& name,
@ -378,10 +378,10 @@ seastar::future<> test_preemptive_shutdown(bool v2) {
crimson::net::MessengerRef msgr;
crimson::auth::DummyAuthClientServer dummy_auth;
std::tuple<bool, seastar::future<>> ms_dispatch(
std::optional<seastar::future<>> ms_dispatch(
crimson::net::ConnectionRef c, MessageRef m) override {
std::ignore = c->send(make_message<MPing>());
return {true, seastar::now()};
return {seastar::now()};
}
public:
@ -419,9 +419,9 @@ seastar::future<> test_preemptive_shutdown(bool v2) {
bool stop_send = false;
seastar::promise<> stopped_send_promise;
std::tuple<bool, seastar::future<>> ms_dispatch(
std::optional<seastar::future<>> ms_dispatch(
crimson::net::ConnectionRef, MessageRef m) override {
return {true, seastar::now()};
return {seastar::now()};
}
public:
@ -813,7 +813,7 @@ class FailoverSuite : public Dispatcher {
unsigned pending_peer_receive = 0;
unsigned pending_receive = 0;
std::tuple<bool, seastar::future<>> ms_dispatch(ConnectionRef c, MessageRef m) override {
std::optional<seastar::future<>> ms_dispatch(ConnectionRef c, MessageRef m) override {
auto result = interceptor.find_result(c);
if (result == nullptr) {
logger().error("Untracked ms dispatched connection: {}", *c);
@ -835,7 +835,7 @@ class FailoverSuite : public Dispatcher {
}
logger().info("[Test] got op, left {} ops -- [{}] {}",
pending_receive, result->index, *c);
return {true, seastar::now()};
return {seastar::now()};
}
void ms_handle_accept(ConnectionRef conn) override {
@ -1209,7 +1209,7 @@ class FailoverTest : public Dispatcher {
std::unique_ptr<FailoverSuite> test_suite;
std::tuple<bool, seastar::future<>> ms_dispatch(ConnectionRef c, MessageRef m) override {
std::optional<seastar::future<>> ms_dispatch(ConnectionRef c, MessageRef m) override {
switch (m->get_type()) {
case CEPH_MSG_PING:
ceph_assert(recv_pong);
@ -1232,7 +1232,7 @@ class FailoverTest : public Dispatcher {
logger().error("{} got unexpected msg from cmd server: {}", *c, *m);
ceph_abort();
}
return {true, seastar::now()};
return {seastar::now()};
}
private:
@ -1407,12 +1407,12 @@ class FailoverSuitePeer : public Dispatcher {
ConnectionRef tracked_conn;
unsigned pending_send = 0;
std::tuple<bool, seastar::future<>> ms_dispatch(ConnectionRef c, MessageRef m) override {
std::optional<seastar::future<>> ms_dispatch(ConnectionRef c, MessageRef m) override {
logger().info("[TestPeer] got op from Test");
ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
ceph_assert(tracked_conn == c);
std::ignore = op_callback();
return {true, seastar::now()};
return {seastar::now()};
}
void ms_handle_accept(ConnectionRef conn) override {
@ -1537,7 +1537,7 @@ class FailoverTestPeer : public Dispatcher {
const entity_addr_t test_peer_addr;
std::unique_ptr<FailoverSuitePeer> test_suite;
std::tuple<bool, seastar::future<>> ms_dispatch(ConnectionRef c, MessageRef m) override {
std::optional<seastar::future<>> ms_dispatch(ConnectionRef c, MessageRef m) override {
ceph_assert(cmd_conn == c);
switch (m->get_type()) {
case CEPH_MSG_PING:
@ -1562,7 +1562,7 @@ class FailoverTestPeer : public Dispatcher {
logger().error("{} got unexpected msg from cmd client: {}", *c, m);
ceph_abort();
}
return {true, seastar::now()};
return {seastar::now()};
}
void ms_handle_accept(ConnectionRef conn) override {

View File

@ -152,7 +152,7 @@ static seastar::future<> run(
msg_data.append_zero(msg_len);
}
std::tuple<bool, seastar::future<>> ms_dispatch(
std::optional<seastar::future<>> ms_dispatch(
crimson::net::ConnectionRef c, MessageRef m) override {
ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
@ -167,7 +167,7 @@ static seastar::future<> run(
rep->write(0, msg_len, data);
rep->set_tid(m->get_tid());
std::ignore = c->send(std::move(rep));
return {true, seastar::now()};
return {seastar::now()};
}
seastar::future<> init(bool v1_crc_enabled, const entity_addr_t& addr) {
@ -307,7 +307,7 @@ static seastar::future<> run(
void ms_handle_connect(crimson::net::ConnectionRef conn) override {
conn_stats.connected_time = mono_clock::now();
}
std::tuple<bool, seastar::future<>> ms_dispatch(
std::optional<seastar::future<>> ms_dispatch(
crimson::net::ConnectionRef, MessageRef m) override {
// server replies with MOSDOp to generate server-side write workload
ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
@ -327,7 +327,7 @@ static seastar::future<> run(
++(conn_stats.received_count);
depth.signal(1);
return {true, seastar::now()};
return {seastar::now()};
}
// should start messenger at this shard?