Merge pull request #25207 from cyx1231st/wip-crimson-msgr-dispatch-events

crimson/net: encapsulate protocol implementations with states (remaining part)

Reviewed-by: Kefu Chai <kchai@redhat.com>
This commit is contained in:
Kefu Chai 2018-12-20 23:51:20 +08:00 committed by GitHub
commit 539a16f376
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 158 additions and 206 deletions

View File

@ -62,15 +62,6 @@ class Messenger {
return ++global_seq;
}
// @returns a tuple of <is_valid, auth_reply, session_key>
virtual seastar::future<msgr_tag_t, /// tag for error, 0 if authorized
bufferlist> /// auth_reply
verify_authorizer(peer_type_t peer_type,
auth_proto_t protocol,
bufferlist& auth) = 0;
virtual seastar::future<std::unique_ptr<AuthAuthorizer>>
get_authorizer(peer_type_t peer_type,
bool force_new) = 0;
uint32_t get_crc_flags() const {
return crc_flags;
}

View File

@ -26,6 +26,7 @@
#include "crimson/common/log.h"
#include "Config.h"
#include "Dispatcher.h"
#include "Errors.h"
#include "SocketMessenger.h"
@ -43,15 +44,18 @@ namespace {
}
SocketConnection::SocketConnection(SocketMessenger& messenger,
const entity_addr_t& my_addr)
const entity_addr_t& my_addr,
Dispatcher& dispatcher)
: Connection(my_addr),
messenger(messenger),
dispatcher(dispatcher),
send_ready(h.promise.get_future())
{
}
SocketConnection::~SocketConnection()
{
ceph_assert(pending_dispatch.is_closed());
// errors were reported to callers of send()
ceph_assert(send_ready.available());
send_ready.ignore_ready_future();
@ -310,10 +314,13 @@ seastar::future<> SocketConnection::close()
assert(!close_ready.valid());
if (socket) {
close_ready = socket->close().finally(std::move(cleanup));
close_ready = socket->close()
.then([this] {
return pending_dispatch.close();
}).finally(std::move(cleanup));
} else {
ceph_assert(state == state_t::connecting);
close_ready = seastar::now();
close_ready = pending_dispatch.close().finally(std::move(cleanup));
}
state = state_t::closing;
return close_ready.get_future();
@ -453,9 +460,9 @@ SocketConnection::repeat_handle_connect()
return seastar::make_ready_future<msgr_tag_t, bufferlist>(
CEPH_MSGR_TAG_FEATURES, bufferlist{});
}
return messenger.verify_authorizer(peer_type,
h.connect.authorizer_protocol,
authorizer);
return dispatcher.ms_verify_authorizer(peer_type,
h.connect.authorizer_protocol,
authorizer);
}).then([this] (ceph::net::msgr_tag_t tag, bufferlist&& authorizer_reply) {
memset(&h.reply, 0, sizeof(h.reply));
if (tag) {
@ -526,8 +533,6 @@ SocketConnection::send_connect_reply_ready(msgr_tag_t tag,
return socket->flush();
}
}).then([this] {
messenger.register_conn(this);
messenger.unaccept_conn(this);
return stop_t::yes;
});
}
@ -637,7 +642,7 @@ SocketConnection::handle_connect_reply(msgr_tag_t tag)
}
h.got_bad_auth = true;
// try harder
return messenger.get_authorizer(peer_type, true)
return dispatcher.ms_get_authorizer(peer_type, true)
.then([this](auto&& auth) {
h.authorizer = std::move(auth);
return stop_t::no;
@ -729,7 +734,7 @@ SocketConnection::repeat_connect()
// this is fyi, actually, server decides!
h.connect.flags = policy.lossy ? CEPH_MSG_CONNECT_LOSSY : 0;
return messenger.get_authorizer(peer_type, false)
return dispatcher.ms_get_authorizer(peer_type, false)
.then([this](auto&& auth) {
h.authorizer = std::move(auth);
bufferlist bl;
@ -764,7 +769,7 @@ SocketConnection::repeat_connect()
});
}
seastar::future<>
void
SocketConnection::start_connect(const entity_addr_t& _peer_addr,
const entity_type_t& _peer_type)
{
@ -774,52 +779,59 @@ SocketConnection::start_connect(const entity_addr_t& _peer_addr,
peer_type = _peer_type;
messenger.register_conn(this);
state = state_t::connecting;
return seastar::connect(peer_addr.in4_addr())
.then([this](seastar::connected_socket fd) {
if (state == state_t::closing) {
fd.shutdown_input();
fd.shutdown_output();
throw std::system_error(make_error_code(error::connection_aborted));
}
socket.emplace(std::move(fd));
// read server's handshake header
return socket->read(server_header_size);
}).then([this] (bufferlist headerbl) {
auto p = headerbl.cbegin();
validate_banner(p);
entity_addr_t saddr, caddr;
::decode(saddr, p);
::decode(caddr, p);
ceph_assert(p.end());
validate_peer_addr(saddr, peer_addr);
seastar::with_gate(pending_dispatch, [this] {
return seastar::connect(peer_addr.in4_addr())
.then([this](seastar::connected_socket fd) {
if (state == state_t::closing) {
fd.shutdown_input();
fd.shutdown_output();
throw std::system_error(make_error_code(error::connection_aborted));
}
socket.emplace(std::move(fd));
// read server's handshake header
return socket->read(server_header_size);
}).then([this] (bufferlist headerbl) {
auto p = headerbl.cbegin();
validate_banner(p);
entity_addr_t saddr, caddr;
::decode(saddr, p);
::decode(caddr, p);
ceph_assert(p.end());
validate_peer_addr(saddr, peer_addr);
if (my_addr != caddr) {
// take peer's address for me, but preserve my nonce
caddr.nonce = my_addr.nonce;
my_addr = caddr;
}
// encode/send client's handshake header
bufferlist bl;
bl.append(buffer::create_static(banner_size, banner));
::encode(my_addr, bl, 0);
h.global_seq = messenger.get_global_seq();
return socket->write_flush(std::move(bl));
}).then([=] {
return seastar::repeat([this] {
return repeat_connect();
});
// TODO: handle errors for state_t::connecting
}).then([this] {
state = state_t::open;
// start background processing of tags
read_tags_until_next_message();
}).then_wrapped([this] (auto fut) {
// satisfy the handshake's promise
fut.forward_to(std::move(h.promise));
if (my_addr != caddr) {
// take peer's address for me, but preserve my nonce
caddr.nonce = my_addr.nonce;
my_addr = caddr;
}
// encode/send client's handshake header
bufferlist bl;
bl.append(buffer::create_static(banner_size, banner));
::encode(my_addr, bl, 0);
h.global_seq = messenger.get_global_seq();
return socket->write_flush(std::move(bl));
}).then([=] {
return seastar::repeat([this] {
return repeat_connect();
});
}).then_wrapped([this] (auto fut) {
// TODO: do not forward the exception
// and let the reconnect happen transparently inside connection
// satisfy the handshake's promise
fut.forward_to(std::move(h.promise));
}).then([this] {
// notify the dispatcher and allow them to reject the connection
return dispatcher.ms_handle_connect(this);
}).then([this] {
execute_open();
}).handle_exception([this] (std::exception_ptr eptr) {
// TODO: handle fault in the connecting state
close();
});
});
}
seastar::future<>
void
SocketConnection::start_accept(seastar::connected_socket&& fd,
const entity_addr_t& _peer_addr)
{
@ -829,36 +841,78 @@ SocketConnection::start_accept(seastar::connected_socket&& fd,
socket.emplace(std::move(fd));
messenger.accept_conn(this);
state = state_t::accepting;
// encode/send server's handshake header
bufferlist bl;
bl.append(buffer::create_static(banner_size, banner));
::encode(my_addr, bl, 0);
::encode(peer_addr, bl, 0);
return socket->write_flush(std::move(bl))
.then([this] {
// read client's handshake header and connect request
return socket->read(client_header_size);
}).then([this] (bufferlist bl) {
auto p = bl.cbegin();
validate_banner(p);
entity_addr_t addr;
::decode(addr, p);
ceph_assert(p.end());
if (!addr.is_blank_ip()) {
peer_addr = addr;
}
}).then([this] {
return seastar::repeat([this] {
return repeat_handle_connect();
});
// TODO: handle errors for state_t::accepting
}).then([this] {
state = state_t::open;
seastar::with_gate(pending_dispatch, [this] {
// encode/send server's handshake header
bufferlist bl;
bl.append(buffer::create_static(banner_size, banner));
::encode(my_addr, bl, 0);
::encode(peer_addr, bl, 0);
return socket->write_flush(std::move(bl))
.then([this] {
// read client's handshake header and connect request
return socket->read(client_header_size);
}).then([this] (bufferlist bl) {
auto p = bl.cbegin();
validate_banner(p);
entity_addr_t addr;
::decode(addr, p);
ceph_assert(p.end());
if (!addr.is_blank_ip()) {
peer_addr = addr;
}
}).then([this] {
return seastar::repeat([this] {
return repeat_handle_connect();
});
}).then_wrapped([this] (auto fut) {
// TODO: do not forward the exception
// and let the reconnect happen transparently inside connection
// satisfy the handshake's promise
fut.forward_to(std::move(h.promise));
}).then([this] {
// notify the dispatcher and allow them to reject the connection
return dispatcher.ms_handle_accept(this);
}).then([this] {
messenger.register_conn(this);
messenger.unaccept_conn(this);
execute_open();
}).handle_exception([this] (std::exception_ptr eptr) {
// TODO: handle fault in the accepting state
close();
});
});
}
void
SocketConnection::execute_open()
{
state = state_t::open;
seastar::with_gate(pending_dispatch, [this] {
// start background processing of tags
read_tags_until_next_message();
}).then_wrapped([this] (auto fut) {
// satisfy the handshake's promise
fut.forward_to(std::move(h.promise));
return seastar::keep_doing([this] {
return read_message()
.then([this] (MessageRef msg) {
// start dispatch, ignoring exceptions from the application layer
seastar::with_gate(pending_dispatch, [this, msg = std::move(msg)] {
return dispatcher.ms_dispatch(this, std::move(msg))
.handle_exception([] (std::exception_ptr eptr) {});
});
// return immediately to start on the next message
return seastar::now();
});
}).handle_exception_type([this] (const std::system_error& e) {
if (e.code() == error::connection_aborted ||
e.code() == error::connection_reset) {
return dispatcher.ms_handle_reset(this);
} else if (e.code() == error::read_eof) {
return dispatcher.ms_handle_remote_reset(this);
} else {
throw e;
}
}).handle_exception([] (std::exception_ptr eptr) {
// TODO: handle fault in the open state
});
});
}

View File

@ -14,6 +14,7 @@
#pragma once
#include <seastar/core/gate.hh>
#include <seastar/core/reactor.hh>
#include <seastar/core/shared_future.hh>
@ -36,6 +37,8 @@ using SocketConnectionRef = boost::intrusive_ptr<SocketConnection>;
class SocketConnection : public Connection {
SocketMessenger& messenger;
std::optional<Socket> socket;
Dispatcher& dispatcher;
seastar::gate pending_dispatch;
enum class state_t {
none,
@ -150,9 +153,12 @@ class SocketConnection : public Connection {
seastar::future<> fault();
void execute_open();
public:
SocketConnection(SocketMessenger& messenger,
const entity_addr_t& my_addr);
const entity_addr_t& my_addr,
Dispatcher& dispatcher);
~SocketConnection();
Messenger* get_messenger() const override;
@ -170,13 +176,14 @@ class SocketConnection : public Connection {
seastar::future<> close() override;
public:
/// complete a handshake from the client's perspective
seastar::future<> start_connect(const entity_addr_t& peer_addr,
const entity_type_t& peer_type);
/// complete a handshake from the server's perspective
seastar::future<> start_accept(seastar::connected_socket&& socket,
const entity_addr_t& peer_addr);
/// start a handshake from the client's perspective,
/// only call when SocketConnection first construct
void start_connect(const entity_addr_t& peer_addr,
const entity_type_t& peer_type);
/// start a handshake from the server's perspective,
/// only call when SocketConnection first construct
void start_accept(seastar::connected_socket&& socket,
const entity_addr_t& peer_addr);
/// read a message from a connection that has completed its handshake
seastar::future<MessageRef> read_message();

View File

@ -40,61 +40,6 @@ void SocketMessenger::bind(const entity_addr_t& addr)
listener = seastar::listen(address, lo);
}
seastar::future<> SocketMessenger::dispatch(SocketConnectionRef conn)
{
return seastar::keep_doing([=] {
return conn->read_message()
.then([=] (MessageRef msg) {
// start dispatch, ignoring exceptions from the application layer
seastar::with_gate(pending_dispatch, [=, msg = std::move(msg)] {
return dispatcher->ms_dispatch(conn, std::move(msg))
.handle_exception([] (std::exception_ptr eptr) {});
});
// return immediately to start on the next message
return seastar::now();
});
}).handle_exception_type([=] (const std::system_error& e) {
if (e.code() == error::connection_aborted ||
e.code() == error::connection_reset) {
return seastar::with_gate(pending_dispatch, [=] {
return dispatcher->ms_handle_reset(conn);
});
} else if (e.code() == error::read_eof) {
return seastar::with_gate(pending_dispatch, [=] {
return dispatcher->ms_handle_remote_reset(conn);
});
} else {
throw e;
}
});
}
seastar::future<> SocketMessenger::accept(seastar::connected_socket socket,
seastar::socket_address paddr)
{
// allocate the connection
entity_addr_t peer_addr;
peer_addr.set_type(entity_addr_t::TYPE_DEFAULT);
peer_addr.set_sockaddr(&paddr.as_posix_sockaddr());
SocketConnectionRef conn = new SocketConnection(*this, get_myaddr());
// initiate the handshake
return conn->start_accept(std::move(socket), peer_addr)
.then([this, conn] {
// notify the dispatcher and allow them to reject the connection
return seastar::with_gate(pending_dispatch, [=] {
return dispatcher->ms_handle_accept(conn);
});
}).handle_exception([conn] (std::exception_ptr eptr) {
// close the connection before returning errors
return seastar::make_exception_future<>(eptr)
.finally([conn] { return conn->close(); });
}).then([this, conn] {
// dispatch messages until the connection closes or the dispatch
// queue shuts down
return dispatch(std::move(conn));
});
}
seastar::future<> SocketMessenger::start(Dispatcher *disp)
{
dispatcher = disp;
@ -105,10 +50,13 @@ seastar::future<> SocketMessenger::start(Dispatcher *disp)
return listener->accept()
.then([this] (seastar::connected_socket socket,
seastar::socket_address paddr) {
// start processing the connection
accept(std::move(socket), paddr)
.handle_exception([] (std::exception_ptr eptr) {});
// allocate the connection
entity_addr_t peer_addr;
peer_addr.set_type(entity_addr_t::TYPE_DEFAULT);
peer_addr.set_sockaddr(&paddr.as_posix_sockaddr());
SocketConnectionRef conn = new SocketConnection(*this, get_myaddr(), *dispatcher);
// don't wait before accepting another
conn->start_accept(std::move(socket), peer_addr);
});
}).handle_exception_type([this] (const std::system_error& e) {
// stop gracefully on connection_aborted
@ -127,23 +75,8 @@ SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_type_t& pe
if (auto found = lookup_conn(peer_addr); found) {
return found;
}
SocketConnectionRef conn = new SocketConnection(*this, get_myaddr());
conn->start_connect(peer_addr, peer_type)
.then([this, conn] {
// notify the dispatcher and allow them to reject the connection
return seastar::with_gate(pending_dispatch, [this, conn] {
return dispatcher->ms_handle_connect(conn);
});
}).handle_exception([conn] (std::exception_ptr eptr) {
// close the connection before returning errors
return seastar::make_exception_future<>(eptr)
.finally([conn] { return conn->close(); });
// TODO: retry on fault
}).then([this, conn] {
// dispatch replies on this connection
dispatch(conn)
.handle_exception([] (std::exception_ptr eptr) {});
});
SocketConnectionRef conn = new SocketConnection(*this, get_myaddr(), *dispatcher);
conn->start_connect(peer_addr, peer_type);
return conn;
}
@ -162,9 +95,6 @@ seastar::future<> SocketMessenger::shutdown()
});
}).finally([this] {
ceph_assert(connections.empty());
// closing connections will unblock any dispatchers that were waiting to
// send(). wait for any pending calls to finish
return pending_dispatch.close();
});
}
@ -221,21 +151,3 @@ void SocketMessenger::unregister_conn(SocketConnectionRef conn)
ceph_assert(found->second == conn);
connections.erase(found);
}
seastar::future<msgr_tag_t, bufferlist>
SocketMessenger::verify_authorizer(peer_type_t peer_type,
auth_proto_t protocol,
bufferlist& auth)
{
return seastar::with_gate(pending_dispatch, [=, &auth] {
return dispatcher->ms_verify_authorizer(peer_type, protocol, auth);
});
}
seastar::future<std::unique_ptr<AuthAuthorizer>>
SocketMessenger::get_authorizer(peer_type_t peer_type, bool force_new)
{
return seastar::with_gate(pending_dispatch, [=] {
return dispatcher->ms_get_authorizer(peer_type, force_new);
});
}

View File

@ -36,9 +36,6 @@ class SocketMessenger final : public Messenger {
std::set<SocketConnectionRef> accepting_conns;
using Throttle = ceph::thread::Throttle;
ceph::net::PolicySet<Throttle> policy_set;
seastar::gate pending_dispatch;
seastar::future<> dispatch(SocketConnectionRef conn);
seastar::future<> accept(seastar::connected_socket socket,
seastar::socket_address paddr);
@ -55,15 +52,6 @@ class SocketMessenger final : public Messenger {
seastar::future<> shutdown() override;
seastar::future<msgr_tag_t, bufferlist>
verify_authorizer(peer_type_t peer_type,
auth_proto_t protocol,
bufferlist& auth) override;
seastar::future<std::unique_ptr<AuthAuthorizer>>
get_authorizer(peer_type_t peer_type,
bool force_new) override;
public:
void set_default_policy(const SocketPolicy& p);
void set_policy(entity_type_t peer_type, const SocketPolicy& p);