crimson/net: concurrent dispatch for SocketMessenger

Signed-off-by: Casey Bodley <cbodley@redhat.com>
This commit is contained in:
Casey Bodley 2018-09-13 14:33:03 -04:00
parent f8d5eba4de
commit 43abe8995a
2 changed files with 16 additions and 6 deletions

View File

@ -48,11 +48,13 @@ seastar::future<> SocketMessenger::dispatch(ConnectionRef conn)
return seastar::keep_doing([=] {
return conn->read_message()
.then([=] (MessageRef msg) {
if (msg) {
return dispatcher->ms_dispatch(conn, std::move(msg));
} else {
return seastar::now();
}
// start dispatch, ignoring exceptions from the application layer
seastar::with_gate(pending_dispatch, [=, msg = std::move(msg)] {
return dispatcher->ms_dispatch(conn, std::move(msg))
.handle_exception([] (std::exception_ptr eptr) {});
});
// return immediately to start on the next message
return seastar::now();
});
}).handle_exception_type([=] (const std::system_error& e) {
if (e.code() == error::connection_aborted ||
@ -147,10 +149,16 @@ seastar::future<> SocketMessenger::shutdown()
if (listener) {
listener->abort_accept();
}
// close all connections
return seastar::parallel_for_each(connections.begin(), connections.end(),
[this] (auto conn) {
return conn.second->close();
}).finally([this] { connections.clear(); });
}).finally([this] {
connections.clear();
// closing connections will unblock any dispatchers that were waiting to
// send(). wait for any pending calls to finish
return pending_dispatch.close();
});
}
void SocketMessenger::set_default_policy(const SocketPolicy& p)

View File

@ -16,6 +16,7 @@
#include <map>
#include <optional>
#include <seastar/core/gate.hh>
#include <seastar/core/reactor.hh>
#include "msg/Policy.h"
@ -32,6 +33,7 @@ class SocketMessenger final : public Messenger {
std::map<entity_addr_t, ConnectionRef> connections;
using Throttle = ceph::thread::Throttle;
ceph::net::PolicySet<Throttle> policy_set;
seastar::gate pending_dispatch;
seastar::future<> dispatch(ConnectionRef conn);