diff --git a/src/crimson/mgr/client.cc b/src/crimson/mgr/client.cc index c00370cf3b3..5aa8a88ba21 100644 --- a/src/crimson/mgr/client.cc +++ b/src/crimson/mgr/client.cc @@ -47,7 +47,7 @@ seastar::future<> Client::stop() return fut; } -std::tuple> +std::optional> Client::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m) { bool dispatched = true; @@ -62,7 +62,7 @@ Client::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m) return seastar::now(); } }); - return {dispatched, seastar::now()}; + return (dispatched ? std::make_optional(seastar::now()) : std::nullopt); } void Client::ms_handle_connect(crimson::net::ConnectionRef c) diff --git a/src/crimson/mgr/client.h b/src/crimson/mgr/client.h index 555e779bfbc..ad7e1fde54e 100644 --- a/src/crimson/mgr/client.h +++ b/src/crimson/mgr/client.h @@ -37,7 +37,7 @@ public: void report(); private: - std::tuple> ms_dispatch( + std::optional> ms_dispatch( crimson::net::ConnectionRef conn, Ref m) override; void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) final; void ms_handle_connect(crimson::net::ConnectionRef conn) final; diff --git a/src/crimson/mon/MonClient.cc b/src/crimson/mon/MonClient.cc index 5df743aaeb0..9dfbb103a38 100644 --- a/src/crimson/mon/MonClient.cc +++ b/src/crimson/mon/MonClient.cc @@ -518,7 +518,7 @@ bool Client::is_hunting() const { return !active_con; } -std::tuple> +std::optional> Client::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m) { bool dispatched = true; @@ -550,7 +550,7 @@ Client::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m) return seastar::now(); } }); - return {dispatched, seastar::now()}; + return (dispatched ? std::make_optional(seastar::now()) : std::nullopt); } void Client::ms_handle_reset(crimson::net::ConnectionRef conn, bool /* is_replace */) diff --git a/src/crimson/mon/MonClient.h b/src/crimson/mon/MonClient.h index bc8593d60a1..e7d2df86393 100644 --- a/src/crimson/mon/MonClient.h +++ b/src/crimson/mon/MonClient.h @@ -140,8 +140,8 @@ private: private: void tick(); - std::tuple> ms_dispatch(crimson::net::ConnectionRef conn, - MessageRef m) override; + std::optional> ms_dispatch(crimson::net::ConnectionRef conn, + MessageRef m) override; void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) override; seastar::future<> handle_monmap(crimson::net::ConnectionRef conn, diff --git a/src/crimson/net/Dispatcher.h b/src/crimson/net/Dispatcher.h index 71be61783b0..cc6fd4574c7 100644 --- a/src/crimson/net/Dispatcher.h +++ b/src/crimson/net/Dispatcher.h @@ -25,11 +25,10 @@ class Dispatcher { virtual ~Dispatcher() {} // Dispatchers are put into a chain as described by chain-of-responsibility - // pattern. If any of the dispatchers claims this message, it returns true - // to prevent other dispatchers from processing it, and returns a future - // to throttle the connection if it's too busy. Else, it returns false and - // the second future is ignored. - virtual std::tuple> ms_dispatch(ConnectionRef, MessageRef) = 0; + // pattern. If any of the dispatchers claims this message, it returns a valid + // future to prevent other dispatchers from processing it, and this is also + // used to throttle the connection if it's too busy. + virtual std::optional> ms_dispatch(ConnectionRef, MessageRef) = 0; virtual void ms_handle_accept(ConnectionRef conn) {} diff --git a/src/crimson/net/chained_dispatchers.cc b/src/crimson/net/chained_dispatchers.cc index 635794e0db3..b13d40c8f73 100644 --- a/src/crimson/net/chained_dispatchers.cc +++ b/src/crimson/net/chained_dispatchers.cc @@ -17,16 +17,15 @@ ChainedDispatchers::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m) { try { for (auto& dispatcher : dispatchers) { - auto [dispatched, throttle_future] = dispatcher->ms_dispatch(conn, m); - if (dispatched) { - return std::move(throttle_future + auto dispatched = dispatcher->ms_dispatch(conn, m); + if (dispatched.has_value()) { + return std::move(*dispatched ).handle_exception([conn] (std::exception_ptr eptr) { logger().error("{} got unexpected exception in ms_dispatch() throttling {}", *conn, eptr); ceph_abort(); }); } - assert(throttle_future.available()); } } catch (...) { logger().error("{} got unexpected exception in ms_dispatch() {}", diff --git a/src/crimson/osd/heartbeat.cc b/src/crimson/osd/heartbeat.cc index 3f4fff3aac5..81ec06ecd5d 100644 --- a/src/crimson/osd/heartbeat.cc +++ b/src/crimson/osd/heartbeat.cc @@ -203,7 +203,7 @@ void Heartbeat::remove_peer(osd_id_t peer) peers.erase(peer); } -std::tuple> +std::optional> Heartbeat::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m) { bool dispatched = true; @@ -216,7 +216,7 @@ Heartbeat::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m) return seastar::now(); } }); - return {dispatched, seastar::now()}; + return (dispatched ? std::make_optional(seastar::now()) : std::nullopt); } void Heartbeat::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) diff --git a/src/crimson/osd/heartbeat.h b/src/crimson/osd/heartbeat.h index 46d12463c3d..4947e871ff5 100644 --- a/src/crimson/osd/heartbeat.h +++ b/src/crimson/osd/heartbeat.h @@ -48,7 +48,7 @@ public: void set_require_authorizer(bool); // Dispatcher methods - std::tuple> ms_dispatch( + std::optional> ms_dispatch( crimson::net::ConnectionRef conn, MessageRef m) override; void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) override; void ms_handle_connect(crimson::net::ConnectionRef conn) override; diff --git a/src/crimson/osd/osd.cc b/src/crimson/osd/osd.cc index 43ccd812c0c..cdd05a64cfe 100644 --- a/src/crimson/osd/osd.cc +++ b/src/crimson/osd/osd.cc @@ -614,11 +614,11 @@ seastar::future> OSD::load_pg(spg_t pgid) }); } -std::tuple> +std::optional> OSD::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m) { if (state.is_stopping()) { - return {false, seastar::now()}; + return {}; } bool dispatched = true; gate.dispatch_in_background(__func__, *this, [this, conn, &m, &dispatched] { @@ -678,7 +678,7 @@ OSD::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m) return seastar::now(); } }); - return {dispatched, seastar::now()}; + return (dispatched ? std::make_optional(seastar::now()) : std::nullopt); } void OSD::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) diff --git a/src/crimson/osd/osd.h b/src/crimson/osd/osd.h index 5b6fcc8448c..889960ced8d 100644 --- a/src/crimson/osd/osd.h +++ b/src/crimson/osd/osd.h @@ -96,7 +96,7 @@ class OSD final : public crimson::net::Dispatcher, OSDSuperblock superblock; // Dispatcher methods - std::tuple> ms_dispatch(crimson::net::ConnectionRef, MessageRef) final; + std::optional> ms_dispatch(crimson::net::ConnectionRef, MessageRef) final; void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) final; void ms_handle_remote_reset(crimson::net::ConnectionRef conn) final; diff --git a/src/test/crimson/test_messenger.cc b/src/test/crimson/test_messenger.cc index aed86c3ab14..9e7adbe3bff 100644 --- a/src/test/crimson/test_messenger.cc +++ b/src/test/crimson/test_messenger.cc @@ -57,14 +57,14 @@ static seastar::future<> test_echo(unsigned rounds, crimson::net::MessengerRef msgr; crimson::auth::DummyAuthClientServer dummy_auth; - std::tuple> ms_dispatch( + std::optional> ms_dispatch( crimson::net::ConnectionRef c, MessageRef m) override { if (verbose) { logger().info("server got {}", *m); } // reply with a pong std::ignore = c->send(make_message()); - return {true, seastar::now()}; + return {seastar::now()}; } seastar::future<> init(const entity_name_t& name, @@ -127,7 +127,7 @@ static seastar::future<> test_echo(unsigned rounds, ceph_assert(added); session->connected_time = mono_clock::now(); } - std::tuple> ms_dispatch( + std::optional> ms_dispatch( crimson::net::ConnectionRef c, MessageRef m) override { auto session = find_session(c); ++(session->count); @@ -142,7 +142,7 @@ static seastar::future<> test_echo(unsigned rounds, ceph_assert(found != pending_conns.end()); found->second.set_value(); } - return {true, seastar::now()}; + return {seastar::now()}; } seastar::future<> init(const entity_name_t& name, @@ -277,7 +277,7 @@ static seastar::future<> test_concurrent_dispatch(bool v2) seastar::promise<> on_done; // satisfied when first dispatch unblocks crimson::auth::DummyAuthClientServer dummy_auth; - std::tuple> ms_dispatch( + std::optional> ms_dispatch( crimson::net::ConnectionRef, MessageRef m) override { switch (++count) { case 1: @@ -290,7 +290,7 @@ static seastar::future<> test_concurrent_dispatch(bool v2) default: throw std::runtime_error("unexpected count"); } - return {true, seastar::now()}; + return {seastar::now()}; } seastar::future<> wait() { return on_done.get_future(); } @@ -319,9 +319,9 @@ static seastar::future<> test_concurrent_dispatch(bool v2) crimson::net::MessengerRef msgr; crimson::auth::DummyAuthClientServer dummy_auth; - std::tuple> ms_dispatch( + std::optional> ms_dispatch( crimson::net::ConnectionRef, MessageRef m) override { - return {true, seastar::now()}; + return {seastar::now()}; } seastar::future<> init(const entity_name_t& name, @@ -378,10 +378,10 @@ seastar::future<> test_preemptive_shutdown(bool v2) { crimson::net::MessengerRef msgr; crimson::auth::DummyAuthClientServer dummy_auth; - std::tuple> ms_dispatch( + std::optional> ms_dispatch( crimson::net::ConnectionRef c, MessageRef m) override { std::ignore = c->send(make_message()); - return {true, seastar::now()}; + return {seastar::now()}; } public: @@ -419,9 +419,9 @@ seastar::future<> test_preemptive_shutdown(bool v2) { bool stop_send = false; seastar::promise<> stopped_send_promise; - std::tuple> ms_dispatch( + std::optional> ms_dispatch( crimson::net::ConnectionRef, MessageRef m) override { - return {true, seastar::now()}; + return {seastar::now()}; } public: @@ -813,7 +813,7 @@ class FailoverSuite : public Dispatcher { unsigned pending_peer_receive = 0; unsigned pending_receive = 0; - std::tuple> ms_dispatch(ConnectionRef c, MessageRef m) override { + std::optional> ms_dispatch(ConnectionRef c, MessageRef m) override { auto result = interceptor.find_result(c); if (result == nullptr) { logger().error("Untracked ms dispatched connection: {}", *c); @@ -835,7 +835,7 @@ class FailoverSuite : public Dispatcher { } logger().info("[Test] got op, left {} ops -- [{}] {}", pending_receive, result->index, *c); - return {true, seastar::now()}; + return {seastar::now()}; } void ms_handle_accept(ConnectionRef conn) override { @@ -1209,7 +1209,7 @@ class FailoverTest : public Dispatcher { std::unique_ptr test_suite; - std::tuple> ms_dispatch(ConnectionRef c, MessageRef m) override { + std::optional> ms_dispatch(ConnectionRef c, MessageRef m) override { switch (m->get_type()) { case CEPH_MSG_PING: ceph_assert(recv_pong); @@ -1232,7 +1232,7 @@ class FailoverTest : public Dispatcher { logger().error("{} got unexpected msg from cmd server: {}", *c, *m); ceph_abort(); } - return {true, seastar::now()}; + return {seastar::now()}; } private: @@ -1407,12 +1407,12 @@ class FailoverSuitePeer : public Dispatcher { ConnectionRef tracked_conn; unsigned pending_send = 0; - std::tuple> ms_dispatch(ConnectionRef c, MessageRef m) override { + std::optional> ms_dispatch(ConnectionRef c, MessageRef m) override { logger().info("[TestPeer] got op from Test"); ceph_assert(m->get_type() == CEPH_MSG_OSD_OP); ceph_assert(tracked_conn == c); std::ignore = op_callback(); - return {true, seastar::now()}; + return {seastar::now()}; } void ms_handle_accept(ConnectionRef conn) override { @@ -1537,7 +1537,7 @@ class FailoverTestPeer : public Dispatcher { const entity_addr_t test_peer_addr; std::unique_ptr test_suite; - std::tuple> ms_dispatch(ConnectionRef c, MessageRef m) override { + std::optional> ms_dispatch(ConnectionRef c, MessageRef m) override { ceph_assert(cmd_conn == c); switch (m->get_type()) { case CEPH_MSG_PING: @@ -1562,7 +1562,7 @@ class FailoverTestPeer : public Dispatcher { logger().error("{} got unexpected msg from cmd client: {}", *c, m); ceph_abort(); } - return {true, seastar::now()}; + return {seastar::now()}; } void ms_handle_accept(ConnectionRef conn) override { diff --git a/src/tools/crimson/perf_crimson_msgr.cc b/src/tools/crimson/perf_crimson_msgr.cc index d53c619e9f7..e76f273a921 100644 --- a/src/tools/crimson/perf_crimson_msgr.cc +++ b/src/tools/crimson/perf_crimson_msgr.cc @@ -152,7 +152,7 @@ static seastar::future<> run( msg_data.append_zero(msg_len); } - std::tuple> ms_dispatch( + std::optional> ms_dispatch( crimson::net::ConnectionRef c, MessageRef m) override { ceph_assert(m->get_type() == CEPH_MSG_OSD_OP); @@ -167,7 +167,7 @@ static seastar::future<> run( rep->write(0, msg_len, data); rep->set_tid(m->get_tid()); std::ignore = c->send(std::move(rep)); - return {true, seastar::now()}; + return {seastar::now()}; } seastar::future<> init(bool v1_crc_enabled, const entity_addr_t& addr) { @@ -307,7 +307,7 @@ static seastar::future<> run( void ms_handle_connect(crimson::net::ConnectionRef conn) override { conn_stats.connected_time = mono_clock::now(); } - std::tuple> ms_dispatch( + std::optional> ms_dispatch( crimson::net::ConnectionRef, MessageRef m) override { // server replies with MOSDOp to generate server-side write workload ceph_assert(m->get_type() == CEPH_MSG_OSD_OP); @@ -327,7 +327,7 @@ static seastar::future<> run( ++(conn_stats.received_count); depth.signal(1); - return {true, seastar::now()}; + return {seastar::now()}; } // should start messenger at this shard?