From 43abe8995ae66f373269d41059d2a2b4aee709db Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Thu, 13 Sep 2018 14:33:03 -0400 Subject: [PATCH] crimson/net: concurrent dispatch for SocketMessenger Signed-off-by: Casey Bodley --- src/crimson/net/SocketMessenger.cc | 20 ++++++++++++++------ src/crimson/net/SocketMessenger.h | 2 ++ 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/src/crimson/net/SocketMessenger.cc b/src/crimson/net/SocketMessenger.cc index c52d9be2a6d..0a8173d8e20 100644 --- a/src/crimson/net/SocketMessenger.cc +++ b/src/crimson/net/SocketMessenger.cc @@ -48,11 +48,13 @@ seastar::future<> SocketMessenger::dispatch(ConnectionRef conn) return seastar::keep_doing([=] { return conn->read_message() .then([=] (MessageRef msg) { - if (msg) { - return dispatcher->ms_dispatch(conn, std::move(msg)); - } else { - return seastar::now(); - } + // start dispatch, ignoring exceptions from the application layer + seastar::with_gate(pending_dispatch, [=, msg = std::move(msg)] { + return dispatcher->ms_dispatch(conn, std::move(msg)) + .handle_exception([] (std::exception_ptr eptr) {}); + }); + // return immediately to start on the next message + return seastar::now(); }); }).handle_exception_type([=] (const std::system_error& e) { if (e.code() == error::connection_aborted || @@ -147,10 +149,16 @@ seastar::future<> SocketMessenger::shutdown() if (listener) { listener->abort_accept(); } + // close all connections return seastar::parallel_for_each(connections.begin(), connections.end(), [this] (auto conn) { return conn.second->close(); - }).finally([this] { connections.clear(); }); + }).finally([this] { + connections.clear(); + // closing connections will unblock any dispatchers that were waiting to + // send(). wait for any pending calls to finish + return pending_dispatch.close(); + }); } void SocketMessenger::set_default_policy(const SocketPolicy& p) diff --git a/src/crimson/net/SocketMessenger.h b/src/crimson/net/SocketMessenger.h index 84e972a413c..9297b37087f 100644 --- a/src/crimson/net/SocketMessenger.h +++ b/src/crimson/net/SocketMessenger.h @@ -16,6 +16,7 @@ #include #include +#include #include #include "msg/Policy.h" @@ -32,6 +33,7 @@ class SocketMessenger final : public Messenger { std::map connections; using Throttle = ceph::thread::Throttle; ceph::net::PolicySet policy_set; + seastar::gate pending_dispatch; seastar::future<> dispatch(ConnectionRef conn);