From 0142e5440dc1e3e6370f61747881f04edee730b8 Mon Sep 17 00:00:00 2001 From: Yingxin Date: Thu, 22 Nov 2018 04:39:37 +0800 Subject: [PATCH 1/8] crimson/net: dispatch events in SocketConnection * move dispatch(), and exception handling logics in accept() and connect() from SocketMessenger into SocketConnection, so we can manage the state transition in the same class and at the same abstraction level. * gate the dangling futures in SocketConnection, because the connection's smart_ptr won't be hold by messenger any more during exception handling. * don't return close() inside SocketConnection to prevent recursive gating -- dead lock. Signed-off-by: Yingxin --- src/crimson/net/SocketConnection.cc | 118 ++++++++++++++++++++++++---- src/crimson/net/SocketConnection.h | 26 ++++-- src/crimson/net/SocketMessenger.cc | 66 +--------------- src/crimson/net/SocketMessenger.h | 6 +- 4 files changed, 126 insertions(+), 90 deletions(-) diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc index a78bd5864bb..480e416525a 100644 --- a/src/crimson/net/SocketConnection.cc +++ b/src/crimson/net/SocketConnection.cc @@ -26,6 +26,7 @@ #include "crimson/common/log.h" #include "Config.h" +#include "Dispatcher.h" #include "Errors.h" #include "SocketMessenger.h" @@ -43,15 +44,18 @@ namespace { } SocketConnection::SocketConnection(SocketMessenger& messenger, - const entity_addr_t& my_addr) + const entity_addr_t& my_addr, + Dispatcher& dispatcher) : Connection(my_addr), messenger(messenger), + dispatcher(dispatcher), send_ready(h.promise.get_future()) { } SocketConnection::~SocketConnection() { + ceph_assert(pending_dispatch.is_closed()); // errors were reported to callers of send() ceph_assert(send_ready.available()); send_ready.ignore_ready_future(); @@ -310,10 +314,13 @@ seastar::future<> SocketConnection::close() assert(!close_ready.valid()); if (socket) { - close_ready = socket->close().finally(std::move(cleanup)); + close_ready = socket->close() + .then([this] { + return pending_dispatch.close(); + }).finally(std::move(cleanup)); } else { ceph_assert(state == state_t::connecting); - close_ready = seastar::now(); + close_ready = pending_dispatch.close().finally(std::move(cleanup)); } state = state_t::closing; return close_ready.get_future(); @@ -765,15 +772,8 @@ SocketConnection::repeat_connect() } seastar::future<> -SocketConnection::start_connect(const entity_addr_t& _peer_addr, - const entity_type_t& _peer_type) +SocketConnection::start_connect() { - ceph_assert(state == state_t::none); - ceph_assert(!socket); - peer_addr = _peer_addr; - peer_type = _peer_type; - messenger.register_conn(this); - state = state_t::connecting; return seastar::connect(peer_addr.in4_addr()) .then([this](seastar::connected_socket fd) { if (state == state_t::closing) { @@ -819,16 +819,39 @@ SocketConnection::start_connect(const entity_addr_t& _peer_addr, }); } -seastar::future<> -SocketConnection::start_accept(seastar::connected_socket&& fd, - const entity_addr_t& _peer_addr) +void +SocketConnection::connect(const entity_addr_t& _peer_addr, + const entity_type_t& _peer_type) { ceph_assert(state == state_t::none); ceph_assert(!socket); peer_addr = _peer_addr; - socket.emplace(std::move(fd)); - messenger.accept_conn(this); - state = state_t::accepting; + peer_type = _peer_type; + messenger.register_conn(this); + state = state_t::connecting; + seastar::with_gate(pending_dispatch, [this] { + return start_connect() + .then([this] { + // notify the dispatcher and allow them to reject the connection + return seastar::with_gate(messenger.pending_dispatch, [this] { + return dispatcher.ms_handle_connect(this); + }); + }).handle_exception([this] (std::exception_ptr eptr) { + // close the connection before returning errors + return seastar::make_exception_future<>(eptr) + .finally([this] { close(); }); + // TODO: retry on fault + }).then([this] { + // dispatch replies on this connection + dispatch() + .handle_exception([] (std::exception_ptr eptr) {}); + }); + }); +} + +seastar::future<> +SocketConnection::start_accept() +{ // encode/send server's handshake header bufferlist bl; bl.append(buffer::create_static(banner_size, banner)); @@ -862,6 +885,67 @@ SocketConnection::start_accept(seastar::connected_socket&& fd, }); } +seastar::future<> +SocketConnection::accept(seastar::connected_socket&& fd, + const entity_addr_t& _peer_addr) +{ + ceph_assert(state == state_t::none); + ceph_assert(!socket); + peer_addr = _peer_addr; + socket.emplace(std::move(fd)); + messenger.accept_conn(this); + state = state_t::accepting; + return seastar::with_gate(pending_dispatch, [this] { + return start_accept() + .then([this] { + // notify the dispatcher and allow them to reject the connection + return seastar::with_gate(messenger.pending_dispatch, [=] { + return dispatcher.ms_handle_accept(this); + }); + }).handle_exception([this] (std::exception_ptr eptr) { + // close the connection before returning errors + return seastar::make_exception_future<>(eptr) + .finally([this] { close(); }); + }).then([this] { + // dispatch messages until the connection closes or the dispatch + // queue shuts down + return dispatch(); + }); + }); +} + +seastar::future<> +SocketConnection::dispatch() +{ + return seastar::with_gate(pending_dispatch, [this] { + return seastar::keep_doing([=] { + return read_message() + .then([=] (MessageRef msg) { + // start dispatch, ignoring exceptions from the application layer + seastar::with_gate(messenger.pending_dispatch, [=, msg = std::move(msg)] { + return dispatcher.ms_dispatch(this, std::move(msg)) + .handle_exception([] (std::exception_ptr eptr) {}); + }); + // return immediately to start on the next message + return seastar::now(); + }); + }).handle_exception_type([=] (const std::system_error& e) { + if (e.code() == error::connection_aborted || + e.code() == error::connection_reset) { + return seastar::with_gate(messenger.pending_dispatch, [=] { + return dispatcher.ms_handle_reset(this); + }); + } else if (e.code() == error::read_eof) { + return seastar::with_gate(messenger.pending_dispatch, [=] { + return dispatcher.ms_handle_remote_reset(this); + }); + } else { + throw e; + } + }); + }); +} + seastar::future<> SocketConnection::fault() { if (policy.lossy) { diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h index 3054744450d..3413b4d054e 100644 --- a/src/crimson/net/SocketConnection.h +++ b/src/crimson/net/SocketConnection.h @@ -14,6 +14,7 @@ #pragma once +#include #include #include @@ -36,6 +37,8 @@ using SocketConnectionRef = boost::intrusive_ptr; class SocketConnection : public Connection { SocketMessenger& messenger; std::optional socket; + Dispatcher& dispatcher; + seastar::gate pending_dispatch; enum class state_t { none, @@ -150,9 +153,19 @@ class SocketConnection : public Connection { seastar::future<> fault(); + seastar::future<> dispatch(); + + /// start a handshake from the client's perspective, + /// only call when SocketConnection first construct + seastar::future<> start_connect(); + /// start a handshake from the server's perspective, + /// only call when SocketConnection first construct + seastar::future<> start_accept(); + public: SocketConnection(SocketMessenger& messenger, - const entity_addr_t& my_addr); + const entity_addr_t& my_addr, + Dispatcher& dispatcher); ~SocketConnection(); Messenger* get_messenger() const override; @@ -170,13 +183,10 @@ class SocketConnection : public Connection { seastar::future<> close() override; public: - /// complete a handshake from the client's perspective - seastar::future<> start_connect(const entity_addr_t& peer_addr, - const entity_type_t& peer_type); - - /// complete a handshake from the server's perspective - seastar::future<> start_accept(seastar::connected_socket&& socket, - const entity_addr_t& peer_addr); + void connect(const entity_addr_t& peer_addr, + const entity_type_t& peer_type); + seastar::future<> accept(seastar::connected_socket&& socket, + const entity_addr_t& peer_addr); /// read a message from a connection that has completed its handshake seastar::future read_message(); diff --git a/src/crimson/net/SocketMessenger.cc b/src/crimson/net/SocketMessenger.cc index 827267f238d..779f5fe2e5f 100644 --- a/src/crimson/net/SocketMessenger.cc +++ b/src/crimson/net/SocketMessenger.cc @@ -40,35 +40,6 @@ void SocketMessenger::bind(const entity_addr_t& addr) listener = seastar::listen(address, lo); } -seastar::future<> SocketMessenger::dispatch(SocketConnectionRef conn) -{ - return seastar::keep_doing([=] { - return conn->read_message() - .then([=] (MessageRef msg) { - // start dispatch, ignoring exceptions from the application layer - seastar::with_gate(pending_dispatch, [=, msg = std::move(msg)] { - return dispatcher->ms_dispatch(conn, std::move(msg)) - .handle_exception([] (std::exception_ptr eptr) {}); - }); - // return immediately to start on the next message - return seastar::now(); - }); - }).handle_exception_type([=] (const std::system_error& e) { - if (e.code() == error::connection_aborted || - e.code() == error::connection_reset) { - return seastar::with_gate(pending_dispatch, [=] { - return dispatcher->ms_handle_reset(conn); - }); - } else if (e.code() == error::read_eof) { - return seastar::with_gate(pending_dispatch, [=] { - return dispatcher->ms_handle_remote_reset(conn); - }); - } else { - throw e; - } - }); -} - seastar::future<> SocketMessenger::accept(seastar::connected_socket socket, seastar::socket_address paddr) { @@ -76,23 +47,9 @@ seastar::future<> SocketMessenger::accept(seastar::connected_socket socket, entity_addr_t peer_addr; peer_addr.set_type(entity_addr_t::TYPE_DEFAULT); peer_addr.set_sockaddr(&paddr.as_posix_sockaddr()); - SocketConnectionRef conn = new SocketConnection(*this, get_myaddr()); + SocketConnectionRef conn = new SocketConnection(*this, get_myaddr(), *dispatcher); // initiate the handshake - return conn->start_accept(std::move(socket), peer_addr) - .then([this, conn] { - // notify the dispatcher and allow them to reject the connection - return seastar::with_gate(pending_dispatch, [=] { - return dispatcher->ms_handle_accept(conn); - }); - }).handle_exception([conn] (std::exception_ptr eptr) { - // close the connection before returning errors - return seastar::make_exception_future<>(eptr) - .finally([conn] { return conn->close(); }); - }).then([this, conn] { - // dispatch messages until the connection closes or the dispatch - // queue shuts down - return dispatch(std::move(conn)); - }); + return conn->accept(std::move(socket), peer_addr); } seastar::future<> SocketMessenger::start(Dispatcher *disp) @@ -127,23 +84,8 @@ SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_type_t& pe if (auto found = lookup_conn(peer_addr); found) { return found; } - SocketConnectionRef conn = new SocketConnection(*this, get_myaddr()); - conn->start_connect(peer_addr, peer_type) - .then([this, conn] { - // notify the dispatcher and allow them to reject the connection - return seastar::with_gate(pending_dispatch, [this, conn] { - return dispatcher->ms_handle_connect(conn); - }); - }).handle_exception([conn] (std::exception_ptr eptr) { - // close the connection before returning errors - return seastar::make_exception_future<>(eptr) - .finally([conn] { return conn->close(); }); - // TODO: retry on fault - }).then([this, conn] { - // dispatch replies on this connection - dispatch(conn) - .handle_exception([] (std::exception_ptr eptr) {}); - }); + SocketConnectionRef conn = new SocketConnection(*this, get_myaddr(), *dispatcher); + conn->connect(peer_addr, peer_type); return conn; } diff --git a/src/crimson/net/SocketMessenger.h b/src/crimson/net/SocketMessenger.h index d2ef0b6456d..077f8a2716e 100644 --- a/src/crimson/net/SocketMessenger.h +++ b/src/crimson/net/SocketMessenger.h @@ -36,9 +36,6 @@ class SocketMessenger final : public Messenger { std::set accepting_conns; using Throttle = ceph::thread::Throttle; ceph::net::PolicySet policy_set; - seastar::gate pending_dispatch; - - seastar::future<> dispatch(SocketConnectionRef conn); seastar::future<> accept(seastar::connected_socket socket, seastar::socket_address paddr); @@ -65,6 +62,9 @@ class SocketMessenger final : public Messenger { bool force_new) override; public: + // TODO: change to per-connection messenger gate + seastar::gate pending_dispatch; + void set_default_policy(const SocketPolicy& p); void set_policy(entity_type_t peer_type, const SocketPolicy& p); void set_policy_throttler(entity_type_t peer_type, Throttle* throttle); From 60d5dd0c2491c8803ccd6eee925848c2eacf3b67 Mon Sep 17 00:00:00 2001 From: Yingxin Date: Thu, 22 Nov 2018 05:09:20 +0800 Subject: [PATCH 2/8] crimson/net: remove unecessary future dependencies for accept/dispatch Signed-off-by: Yingxin --- src/crimson/net/SocketConnection.cc | 15 ++++++++------- src/crimson/net/SocketConnection.h | 6 +++--- src/crimson/net/SocketMessenger.cc | 21 ++++++--------------- 3 files changed, 17 insertions(+), 25 deletions(-) diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc index 480e416525a..1359dc89008 100644 --- a/src/crimson/net/SocketConnection.cc +++ b/src/crimson/net/SocketConnection.cc @@ -843,8 +843,7 @@ SocketConnection::connect(const entity_addr_t& _peer_addr, // TODO: retry on fault }).then([this] { // dispatch replies on this connection - dispatch() - .handle_exception([] (std::exception_ptr eptr) {}); + dispatch(); }); }); } @@ -885,7 +884,7 @@ SocketConnection::start_accept() }); } -seastar::future<> +void SocketConnection::accept(seastar::connected_socket&& fd, const entity_addr_t& _peer_addr) { @@ -895,7 +894,7 @@ SocketConnection::accept(seastar::connected_socket&& fd, socket.emplace(std::move(fd)); messenger.accept_conn(this); state = state_t::accepting; - return seastar::with_gate(pending_dispatch, [this] { + seastar::with_gate(pending_dispatch, [this] { return start_accept() .then([this] { // notify the dispatcher and allow them to reject the connection @@ -909,15 +908,15 @@ SocketConnection::accept(seastar::connected_socket&& fd, }).then([this] { // dispatch messages until the connection closes or the dispatch // queue shuts down - return dispatch(); + dispatch(); }); }); } -seastar::future<> +void SocketConnection::dispatch() { - return seastar::with_gate(pending_dispatch, [this] { + seastar::with_gate(pending_dispatch, [this] { return seastar::keep_doing([=] { return read_message() .then([=] (MessageRef msg) { @@ -942,6 +941,8 @@ SocketConnection::dispatch() } else { throw e; } + }).handle_exception([] (std::exception_ptr eptr) { + // TODO: handle fault in the open state }); }); } diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h index 3413b4d054e..bf249d8e46a 100644 --- a/src/crimson/net/SocketConnection.h +++ b/src/crimson/net/SocketConnection.h @@ -153,7 +153,7 @@ class SocketConnection : public Connection { seastar::future<> fault(); - seastar::future<> dispatch(); + void dispatch(); /// start a handshake from the client's perspective, /// only call when SocketConnection first construct @@ -185,8 +185,8 @@ class SocketConnection : public Connection { public: void connect(const entity_addr_t& peer_addr, const entity_type_t& peer_type); - seastar::future<> accept(seastar::connected_socket&& socket, - const entity_addr_t& peer_addr); + void accept(seastar::connected_socket&& socket, + const entity_addr_t& peer_addr); /// read a message from a connection that has completed its handshake seastar::future read_message(); diff --git a/src/crimson/net/SocketMessenger.cc b/src/crimson/net/SocketMessenger.cc index 779f5fe2e5f..6ecf9d3ddef 100644 --- a/src/crimson/net/SocketMessenger.cc +++ b/src/crimson/net/SocketMessenger.cc @@ -40,18 +40,6 @@ void SocketMessenger::bind(const entity_addr_t& addr) listener = seastar::listen(address, lo); } -seastar::future<> SocketMessenger::accept(seastar::connected_socket socket, - seastar::socket_address paddr) -{ - // allocate the connection - entity_addr_t peer_addr; - peer_addr.set_type(entity_addr_t::TYPE_DEFAULT); - peer_addr.set_sockaddr(&paddr.as_posix_sockaddr()); - SocketConnectionRef conn = new SocketConnection(*this, get_myaddr(), *dispatcher); - // initiate the handshake - return conn->accept(std::move(socket), peer_addr); -} - seastar::future<> SocketMessenger::start(Dispatcher *disp) { dispatcher = disp; @@ -62,10 +50,13 @@ seastar::future<> SocketMessenger::start(Dispatcher *disp) return listener->accept() .then([this] (seastar::connected_socket socket, seastar::socket_address paddr) { - // start processing the connection - accept(std::move(socket), paddr) - .handle_exception([] (std::exception_ptr eptr) {}); + // allocate the connection + entity_addr_t peer_addr; + peer_addr.set_type(entity_addr_t::TYPE_DEFAULT); + peer_addr.set_sockaddr(&paddr.as_posix_sockaddr()); + SocketConnectionRef conn = new SocketConnection(*this, get_myaddr(), *dispatcher); // don't wait before accepting another + conn->accept(std::move(socket), peer_addr); }); }).handle_exception_type([this] (const std::system_error& e) { // stop gracefully on connection_aborted From b22cf59c835b0e836f45186d1f3a69342d272376 Mon Sep 17 00:00:00 2001 From: Yingxin Date: Thu, 22 Nov 2018 05:24:31 +0800 Subject: [PATCH 3/8] crimson/net: encapsulate protocol implementations with open state Signed-off-by: Yingxin --- src/crimson/net/SocketConnection.cc | 32 +++++++++++------------------ src/crimson/net/SocketConnection.h | 2 +- 2 files changed, 13 insertions(+), 21 deletions(-) diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc index 1359dc89008..23a9e3de251 100644 --- a/src/crimson/net/SocketConnection.cc +++ b/src/crimson/net/SocketConnection.cc @@ -809,10 +809,6 @@ SocketConnection::start_connect() return repeat_connect(); }); // TODO: handle errors for state_t::connecting - }).then([this] { - state = state_t::open; - // start background processing of tags - read_tags_until_next_message(); }).then_wrapped([this] (auto fut) { // satisfy the handshake's promise fut.forward_to(std::move(h.promise)); @@ -842,8 +838,7 @@ SocketConnection::connect(const entity_addr_t& _peer_addr, .finally([this] { close(); }); // TODO: retry on fault }).then([this] { - // dispatch replies on this connection - dispatch(); + execute_open(); }); }); } @@ -874,10 +869,6 @@ SocketConnection::start_accept() return repeat_handle_connect(); }); // TODO: handle errors for state_t::accepting - }).then([this] { - state = state_t::open; - // start background processing of tags - read_tags_until_next_message(); }).then_wrapped([this] (auto fut) { // satisfy the handshake's promise fut.forward_to(std::move(h.promise)); @@ -906,36 +897,37 @@ SocketConnection::accept(seastar::connected_socket&& fd, return seastar::make_exception_future<>(eptr) .finally([this] { close(); }); }).then([this] { - // dispatch messages until the connection closes or the dispatch - // queue shuts down - dispatch(); + execute_open(); }); }); } void -SocketConnection::dispatch() +SocketConnection::execute_open() { + state = state_t::open; seastar::with_gate(pending_dispatch, [this] { - return seastar::keep_doing([=] { + // start background processing of tags + read_tags_until_next_message(); + return seastar::keep_doing([this] { return read_message() - .then([=] (MessageRef msg) { + .then([this] (MessageRef msg) { // start dispatch, ignoring exceptions from the application layer - seastar::with_gate(messenger.pending_dispatch, [=, msg = std::move(msg)] { + seastar::with_gate(messenger.pending_dispatch, [this, msg = std::move(msg)] { return dispatcher.ms_dispatch(this, std::move(msg)) .handle_exception([] (std::exception_ptr eptr) {}); }); // return immediately to start on the next message return seastar::now(); }); - }).handle_exception_type([=] (const std::system_error& e) { + }).handle_exception_type([this] (const std::system_error& e) { if (e.code() == error::connection_aborted || e.code() == error::connection_reset) { - return seastar::with_gate(messenger.pending_dispatch, [=] { + return seastar::with_gate(messenger.pending_dispatch, [this] { return dispatcher.ms_handle_reset(this); }); } else if (e.code() == error::read_eof) { - return seastar::with_gate(messenger.pending_dispatch, [=] { + return seastar::with_gate(messenger.pending_dispatch, [this] { return dispatcher.ms_handle_remote_reset(this); }); } else { diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h index bf249d8e46a..b69d78d3c63 100644 --- a/src/crimson/net/SocketConnection.h +++ b/src/crimson/net/SocketConnection.h @@ -153,7 +153,7 @@ class SocketConnection : public Connection { seastar::future<> fault(); - void dispatch(); + void execute_open(); /// start a handshake from the client's perspective, /// only call when SocketConnection first construct From 1643f957915e7be424320b8e7a41486553655b10 Mon Sep 17 00:00:00 2001 From: Yingxin Date: Thu, 22 Nov 2018 05:35:10 +0800 Subject: [PATCH 4/8] crimson/net: don't execute_open() if exception is thrown Signed-off-by: Yingxin --- src/crimson/net/SocketConnection.cc | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc index 23a9e3de251..306ec60119a 100644 --- a/src/crimson/net/SocketConnection.cc +++ b/src/crimson/net/SocketConnection.cc @@ -808,7 +808,6 @@ SocketConnection::start_connect() return seastar::repeat([this] { return repeat_connect(); }); - // TODO: handle errors for state_t::connecting }).then_wrapped([this] (auto fut) { // satisfy the handshake's promise fut.forward_to(std::move(h.promise)); @@ -832,13 +831,11 @@ SocketConnection::connect(const entity_addr_t& _peer_addr, return seastar::with_gate(messenger.pending_dispatch, [this] { return dispatcher.ms_handle_connect(this); }); - }).handle_exception([this] (std::exception_ptr eptr) { - // close the connection before returning errors - return seastar::make_exception_future<>(eptr) - .finally([this] { close(); }); - // TODO: retry on fault }).then([this] { execute_open(); + }).handle_exception([this] (std::exception_ptr eptr) { + // TODO: handle fault in the connecting state + close(); }); }); } @@ -868,7 +865,6 @@ SocketConnection::start_accept() return seastar::repeat([this] { return repeat_handle_connect(); }); - // TODO: handle errors for state_t::accepting }).then_wrapped([this] (auto fut) { // satisfy the handshake's promise fut.forward_to(std::move(h.promise)); @@ -892,12 +888,11 @@ SocketConnection::accept(seastar::connected_socket&& fd, return seastar::with_gate(messenger.pending_dispatch, [=] { return dispatcher.ms_handle_accept(this); }); - }).handle_exception([this] (std::exception_ptr eptr) { - // close the connection before returning errors - return seastar::make_exception_future<>(eptr) - .finally([this] { close(); }); }).then([this] { execute_open(); + }).handle_exception([this] (std::exception_ptr eptr) { + // TODO: handle fault in the accepting state + close(); }); }); } From 2dbe7499ea153352545ac5d84ea7f0e018c09082 Mon Sep 17 00:00:00 2001 From: Yingxin Date: Thu, 22 Nov 2018 05:40:58 +0800 Subject: [PATCH 5/8] crimson/net: no open connections in accepting_conns It should an atomic operation to move the connection from accepting_conns to connections when switch to the open state. Signed-off-by: Yingxin --- src/crimson/net/SocketConnection.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc index 306ec60119a..e0d4c36b915 100644 --- a/src/crimson/net/SocketConnection.cc +++ b/src/crimson/net/SocketConnection.cc @@ -533,8 +533,6 @@ SocketConnection::send_connect_reply_ready(msgr_tag_t tag, return socket->flush(); } }).then([this] { - messenger.register_conn(this); - messenger.unaccept_conn(this); return stop_t::yes; }); } @@ -889,6 +887,8 @@ SocketConnection::accept(seastar::connected_socket&& fd, return dispatcher.ms_handle_accept(this); }); }).then([this] { + messenger.register_conn(this); + messenger.unaccept_conn(this); execute_open(); }).handle_exception([this] (std::exception_ptr eptr) { // TODO: handle fault in the accepting state From f56fcd095a68653e2eaa899cd64515beadd5be6e Mon Sep 17 00:00:00 2001 From: Yingxin Date: Thu, 22 Nov 2018 06:00:08 +0800 Subject: [PATCH 6/8] crimson/net: encapsulate protocol implementations with accept/connect Signed-off-by: Yingxin --- src/crimson/net/SocketConnection.cc | 152 +++++++++++++--------------- src/crimson/net/SocketConnection.h | 19 ++-- src/crimson/net/SocketMessenger.cc | 4 +- 3 files changed, 81 insertions(+), 94 deletions(-) diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc index e0d4c36b915..e09d2b408e5 100644 --- a/src/crimson/net/SocketConnection.cc +++ b/src/crimson/net/SocketConnection.cc @@ -769,52 +769,9 @@ SocketConnection::repeat_connect() }); } -seastar::future<> -SocketConnection::start_connect() -{ - return seastar::connect(peer_addr.in4_addr()) - .then([this](seastar::connected_socket fd) { - if (state == state_t::closing) { - fd.shutdown_input(); - fd.shutdown_output(); - throw std::system_error(make_error_code(error::connection_aborted)); - } - socket.emplace(std::move(fd)); - // read server's handshake header - return socket->read(server_header_size); - }).then([this] (bufferlist headerbl) { - auto p = headerbl.cbegin(); - validate_banner(p); - entity_addr_t saddr, caddr; - ::decode(saddr, p); - ::decode(caddr, p); - ceph_assert(p.end()); - validate_peer_addr(saddr, peer_addr); - - if (my_addr != caddr) { - // take peer's address for me, but preserve my nonce - caddr.nonce = my_addr.nonce; - my_addr = caddr; - } - // encode/send client's handshake header - bufferlist bl; - bl.append(buffer::create_static(banner_size, banner)); - ::encode(my_addr, bl, 0); - h.global_seq = messenger.get_global_seq(); - return socket->write_flush(std::move(bl)); - }).then([=] { - return seastar::repeat([this] { - return repeat_connect(); - }); - }).then_wrapped([this] (auto fut) { - // satisfy the handshake's promise - fut.forward_to(std::move(h.promise)); - }); -} - void -SocketConnection::connect(const entity_addr_t& _peer_addr, - const entity_type_t& _peer_type) +SocketConnection::start_connect(const entity_addr_t& _peer_addr, + const entity_type_t& _peer_type) { ceph_assert(state == state_t::none); ceph_assert(!socket); @@ -823,8 +780,46 @@ SocketConnection::connect(const entity_addr_t& _peer_addr, messenger.register_conn(this); state = state_t::connecting; seastar::with_gate(pending_dispatch, [this] { - return start_connect() - .then([this] { + return seastar::connect(peer_addr.in4_addr()) + .then([this](seastar::connected_socket fd) { + if (state == state_t::closing) { + fd.shutdown_input(); + fd.shutdown_output(); + throw std::system_error(make_error_code(error::connection_aborted)); + } + socket.emplace(std::move(fd)); + // read server's handshake header + return socket->read(server_header_size); + }).then([this] (bufferlist headerbl) { + auto p = headerbl.cbegin(); + validate_banner(p); + entity_addr_t saddr, caddr; + ::decode(saddr, p); + ::decode(caddr, p); + ceph_assert(p.end()); + validate_peer_addr(saddr, peer_addr); + + if (my_addr != caddr) { + // take peer's address for me, but preserve my nonce + caddr.nonce = my_addr.nonce; + my_addr = caddr; + } + // encode/send client's handshake header + bufferlist bl; + bl.append(buffer::create_static(banner_size, banner)); + ::encode(my_addr, bl, 0); + h.global_seq = messenger.get_global_seq(); + return socket->write_flush(std::move(bl)); + }).then([=] { + return seastar::repeat([this] { + return repeat_connect(); + }); + }).then_wrapped([this] (auto fut) { + // TODO: do not forward the exception + // and let the reconnect happen transparently inside connection + // satisfy the handshake's promise + fut.forward_to(std::move(h.promise)); + }).then([this] { // notify the dispatcher and allow them to reject the connection return seastar::with_gate(messenger.pending_dispatch, [this] { return dispatcher.ms_handle_connect(this); @@ -838,40 +833,9 @@ SocketConnection::connect(const entity_addr_t& _peer_addr, }); } -seastar::future<> -SocketConnection::start_accept() -{ - // encode/send server's handshake header - bufferlist bl; - bl.append(buffer::create_static(banner_size, banner)); - ::encode(my_addr, bl, 0); - ::encode(peer_addr, bl, 0); - return socket->write_flush(std::move(bl)) - .then([this] { - // read client's handshake header and connect request - return socket->read(client_header_size); - }).then([this] (bufferlist bl) { - auto p = bl.cbegin(); - validate_banner(p); - entity_addr_t addr; - ::decode(addr, p); - ceph_assert(p.end()); - if (!addr.is_blank_ip()) { - peer_addr = addr; - } - }).then([this] { - return seastar::repeat([this] { - return repeat_handle_connect(); - }); - }).then_wrapped([this] (auto fut) { - // satisfy the handshake's promise - fut.forward_to(std::move(h.promise)); - }); -} - void -SocketConnection::accept(seastar::connected_socket&& fd, - const entity_addr_t& _peer_addr) +SocketConnection::start_accept(seastar::connected_socket&& fd, + const entity_addr_t& _peer_addr) { ceph_assert(state == state_t::none); ceph_assert(!socket); @@ -880,8 +844,34 @@ SocketConnection::accept(seastar::connected_socket&& fd, messenger.accept_conn(this); state = state_t::accepting; seastar::with_gate(pending_dispatch, [this] { - return start_accept() + // encode/send server's handshake header + bufferlist bl; + bl.append(buffer::create_static(banner_size, banner)); + ::encode(my_addr, bl, 0); + ::encode(peer_addr, bl, 0); + return socket->write_flush(std::move(bl)) .then([this] { + // read client's handshake header and connect request + return socket->read(client_header_size); + }).then([this] (bufferlist bl) { + auto p = bl.cbegin(); + validate_banner(p); + entity_addr_t addr; + ::decode(addr, p); + ceph_assert(p.end()); + if (!addr.is_blank_ip()) { + peer_addr = addr; + } + }).then([this] { + return seastar::repeat([this] { + return repeat_handle_connect(); + }); + }).then_wrapped([this] (auto fut) { + // TODO: do not forward the exception + // and let the reconnect happen transparently inside connection + // satisfy the handshake's promise + fut.forward_to(std::move(h.promise)); + }).then([this] { // notify the dispatcher and allow them to reject the connection return seastar::with_gate(messenger.pending_dispatch, [=] { return dispatcher.ms_handle_accept(this); diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h index b69d78d3c63..effb594c14f 100644 --- a/src/crimson/net/SocketConnection.h +++ b/src/crimson/net/SocketConnection.h @@ -155,13 +155,6 @@ class SocketConnection : public Connection { void execute_open(); - /// start a handshake from the client's perspective, - /// only call when SocketConnection first construct - seastar::future<> start_connect(); - /// start a handshake from the server's perspective, - /// only call when SocketConnection first construct - seastar::future<> start_accept(); - public: SocketConnection(SocketMessenger& messenger, const entity_addr_t& my_addr, @@ -183,10 +176,14 @@ class SocketConnection : public Connection { seastar::future<> close() override; public: - void connect(const entity_addr_t& peer_addr, - const entity_type_t& peer_type); - void accept(seastar::connected_socket&& socket, - const entity_addr_t& peer_addr); + /// start a handshake from the client's perspective, + /// only call when SocketConnection first construct + void start_connect(const entity_addr_t& peer_addr, + const entity_type_t& peer_type); + /// start a handshake from the server's perspective, + /// only call when SocketConnection first construct + void start_accept(seastar::connected_socket&& socket, + const entity_addr_t& peer_addr); /// read a message from a connection that has completed its handshake seastar::future read_message(); diff --git a/src/crimson/net/SocketMessenger.cc b/src/crimson/net/SocketMessenger.cc index 6ecf9d3ddef..8ec2db7b293 100644 --- a/src/crimson/net/SocketMessenger.cc +++ b/src/crimson/net/SocketMessenger.cc @@ -56,7 +56,7 @@ seastar::future<> SocketMessenger::start(Dispatcher *disp) peer_addr.set_sockaddr(&paddr.as_posix_sockaddr()); SocketConnectionRef conn = new SocketConnection(*this, get_myaddr(), *dispatcher); // don't wait before accepting another - conn->accept(std::move(socket), peer_addr); + conn->start_accept(std::move(socket), peer_addr); }); }).handle_exception_type([this] (const std::system_error& e) { // stop gracefully on connection_aborted @@ -76,7 +76,7 @@ SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_type_t& pe return found; } SocketConnectionRef conn = new SocketConnection(*this, get_myaddr(), *dispatcher); - conn->connect(peer_addr, peer_type); + conn->start_connect(peer_addr, peer_type); return conn; } From 03d435f79d9a556905361b4e3c813c72ad379ef3 Mon Sep 17 00:00:00 2001 From: Yingxin Date: Thu, 22 Nov 2018 06:09:49 +0800 Subject: [PATCH 7/8] crimson/net: remove the messenger gate There is no need to use messenger gate if SocketConnection themselves are properly gated. Signed-off-by: Yingxin --- src/crimson/net/SocketConnection.cc | 18 +++++------------- src/crimson/net/SocketMessenger.cc | 11 ++--------- src/crimson/net/SocketMessenger.h | 3 --- 3 files changed, 7 insertions(+), 25 deletions(-) diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc index e09d2b408e5..6afd6b2d6b1 100644 --- a/src/crimson/net/SocketConnection.cc +++ b/src/crimson/net/SocketConnection.cc @@ -821,9 +821,7 @@ SocketConnection::start_connect(const entity_addr_t& _peer_addr, fut.forward_to(std::move(h.promise)); }).then([this] { // notify the dispatcher and allow them to reject the connection - return seastar::with_gate(messenger.pending_dispatch, [this] { - return dispatcher.ms_handle_connect(this); - }); + return dispatcher.ms_handle_connect(this); }).then([this] { execute_open(); }).handle_exception([this] (std::exception_ptr eptr) { @@ -873,9 +871,7 @@ SocketConnection::start_accept(seastar::connected_socket&& fd, fut.forward_to(std::move(h.promise)); }).then([this] { // notify the dispatcher and allow them to reject the connection - return seastar::with_gate(messenger.pending_dispatch, [=] { - return dispatcher.ms_handle_accept(this); - }); + return dispatcher.ms_handle_accept(this); }).then([this] { messenger.register_conn(this); messenger.unaccept_conn(this); @@ -898,7 +894,7 @@ SocketConnection::execute_open() return read_message() .then([this] (MessageRef msg) { // start dispatch, ignoring exceptions from the application layer - seastar::with_gate(messenger.pending_dispatch, [this, msg = std::move(msg)] { + seastar::with_gate(pending_dispatch, [this, msg = std::move(msg)] { return dispatcher.ms_dispatch(this, std::move(msg)) .handle_exception([] (std::exception_ptr eptr) {}); }); @@ -908,13 +904,9 @@ SocketConnection::execute_open() }).handle_exception_type([this] (const std::system_error& e) { if (e.code() == error::connection_aborted || e.code() == error::connection_reset) { - return seastar::with_gate(messenger.pending_dispatch, [this] { - return dispatcher.ms_handle_reset(this); - }); + return dispatcher.ms_handle_reset(this); } else if (e.code() == error::read_eof) { - return seastar::with_gate(messenger.pending_dispatch, [this] { - return dispatcher.ms_handle_remote_reset(this); - }); + return dispatcher.ms_handle_remote_reset(this); } else { throw e; } diff --git a/src/crimson/net/SocketMessenger.cc b/src/crimson/net/SocketMessenger.cc index 8ec2db7b293..3791463455d 100644 --- a/src/crimson/net/SocketMessenger.cc +++ b/src/crimson/net/SocketMessenger.cc @@ -95,9 +95,6 @@ seastar::future<> SocketMessenger::shutdown() }); }).finally([this] { ceph_assert(connections.empty()); - // closing connections will unblock any dispatchers that were waiting to - // send(). wait for any pending calls to finish - return pending_dispatch.close(); }); } @@ -160,15 +157,11 @@ SocketMessenger::verify_authorizer(peer_type_t peer_type, auth_proto_t protocol, bufferlist& auth) { - return seastar::with_gate(pending_dispatch, [=, &auth] { - return dispatcher->ms_verify_authorizer(peer_type, protocol, auth); - }); + return dispatcher->ms_verify_authorizer(peer_type, protocol, auth); } seastar::future> SocketMessenger::get_authorizer(peer_type_t peer_type, bool force_new) { - return seastar::with_gate(pending_dispatch, [=] { - return dispatcher->ms_get_authorizer(peer_type, force_new); - }); + return dispatcher->ms_get_authorizer(peer_type, force_new); } diff --git a/src/crimson/net/SocketMessenger.h b/src/crimson/net/SocketMessenger.h index 077f8a2716e..5977ba3a892 100644 --- a/src/crimson/net/SocketMessenger.h +++ b/src/crimson/net/SocketMessenger.h @@ -62,9 +62,6 @@ class SocketMessenger final : public Messenger { bool force_new) override; public: - // TODO: change to per-connection messenger gate - seastar::gate pending_dispatch; - void set_default_policy(const SocketPolicy& p); void set_policy(entity_type_t peer_type, const SocketPolicy& p); void set_policy_throttler(entity_type_t peer_type, Throttle* throttle); From 1e6a0a9a423371efd4d201d279e1e4165e9a6077 Mon Sep 17 00:00:00 2001 From: Yingxin Date: Fri, 21 Dec 2018 03:30:02 +0800 Subject: [PATCH 8/8] crimson/net: let conn directly verify the authorizer with dispatcher Signed-off-by: Yingxin --- src/crimson/net/Messenger.h | 9 --------- src/crimson/net/SocketConnection.cc | 10 +++++----- src/crimson/net/SocketMessenger.cc | 14 -------------- src/crimson/net/SocketMessenger.h | 9 --------- 4 files changed, 5 insertions(+), 37 deletions(-) diff --git a/src/crimson/net/Messenger.h b/src/crimson/net/Messenger.h index fcdd5ad32a6..0d8484fd2c2 100644 --- a/src/crimson/net/Messenger.h +++ b/src/crimson/net/Messenger.h @@ -62,15 +62,6 @@ class Messenger { return ++global_seq; } - // @returns a tuple of - virtual seastar::future /// auth_reply - verify_authorizer(peer_type_t peer_type, - auth_proto_t protocol, - bufferlist& auth) = 0; - virtual seastar::future> - get_authorizer(peer_type_t peer_type, - bool force_new) = 0; uint32_t get_crc_flags() const { return crc_flags; } diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc index 6afd6b2d6b1..ef8281d5a79 100644 --- a/src/crimson/net/SocketConnection.cc +++ b/src/crimson/net/SocketConnection.cc @@ -460,9 +460,9 @@ SocketConnection::repeat_handle_connect() return seastar::make_ready_future( CEPH_MSGR_TAG_FEATURES, bufferlist{}); } - return messenger.verify_authorizer(peer_type, - h.connect.authorizer_protocol, - authorizer); + return dispatcher.ms_verify_authorizer(peer_type, + h.connect.authorizer_protocol, + authorizer); }).then([this] (ceph::net::msgr_tag_t tag, bufferlist&& authorizer_reply) { memset(&h.reply, 0, sizeof(h.reply)); if (tag) { @@ -642,7 +642,7 @@ SocketConnection::handle_connect_reply(msgr_tag_t tag) } h.got_bad_auth = true; // try harder - return messenger.get_authorizer(peer_type, true) + return dispatcher.ms_get_authorizer(peer_type, true) .then([this](auto&& auth) { h.authorizer = std::move(auth); return stop_t::no; @@ -734,7 +734,7 @@ SocketConnection::repeat_connect() // this is fyi, actually, server decides! h.connect.flags = policy.lossy ? CEPH_MSG_CONNECT_LOSSY : 0; - return messenger.get_authorizer(peer_type, false) + return dispatcher.ms_get_authorizer(peer_type, false) .then([this](auto&& auth) { h.authorizer = std::move(auth); bufferlist bl; diff --git a/src/crimson/net/SocketMessenger.cc b/src/crimson/net/SocketMessenger.cc index 3791463455d..75c2870f3f8 100644 --- a/src/crimson/net/SocketMessenger.cc +++ b/src/crimson/net/SocketMessenger.cc @@ -151,17 +151,3 @@ void SocketMessenger::unregister_conn(SocketConnectionRef conn) ceph_assert(found->second == conn); connections.erase(found); } - -seastar::future -SocketMessenger::verify_authorizer(peer_type_t peer_type, - auth_proto_t protocol, - bufferlist& auth) -{ - return dispatcher->ms_verify_authorizer(peer_type, protocol, auth); -} - -seastar::future> -SocketMessenger::get_authorizer(peer_type_t peer_type, bool force_new) -{ - return dispatcher->ms_get_authorizer(peer_type, force_new); -} diff --git a/src/crimson/net/SocketMessenger.h b/src/crimson/net/SocketMessenger.h index 5977ba3a892..c348f5920b3 100644 --- a/src/crimson/net/SocketMessenger.h +++ b/src/crimson/net/SocketMessenger.h @@ -52,15 +52,6 @@ class SocketMessenger final : public Messenger { seastar::future<> shutdown() override; - seastar::future - verify_authorizer(peer_type_t peer_type, - auth_proto_t protocol, - bufferlist& auth) override; - - seastar::future> - get_authorizer(peer_type_t peer_type, - bool force_new) override; - public: void set_default_policy(const SocketPolicy& p); void set_policy(entity_type_t peer_type, const SocketPolicy& p);