diff --git a/src/crimson/mgr/client.cc b/src/crimson/mgr/client.cc index bd40d16e709..9134c260213 100644 --- a/src/crimson/mgr/client.cc +++ b/src/crimson/mgr/client.cc @@ -40,9 +40,7 @@ seastar::future<> Client::stop() { return gate.close().then([this] { if (conn) { - return conn->close(); - } else { - return seastar::now(); + conn->mark_down(); } }); } @@ -85,8 +83,7 @@ seastar::future<> Client::ms_handle_reset(crimson::net::ConnectionRef c) seastar::future<> Client::reconnect() { if (conn) { - // crimson::net::Protocol::close() is able to close() in background - (void)conn->close(); + conn->mark_down(); conn = {}; } if (!mgrmap.get_available()) { diff --git a/src/crimson/mon/MonClient.cc b/src/crimson/mon/MonClient.cc index 03d57bb71aa..00180370137 100644 --- a/src/crimson/mon/MonClient.cc +++ b/src/crimson/mon/MonClient.cc @@ -79,7 +79,7 @@ public: const std::vector& allowed_modes); // v1 and v2 - seastar::future<> close(); + void close(); bool is_my_peer(const entity_addr_t& addr) const; AuthAuthorizer* get_authorizer(entity_type_t peer) const; KeyStore& get_keys(); @@ -427,16 +427,14 @@ int Connection::handle_auth_bad_method(uint32_t old_auth_method, return 0; } -seastar::future<> Connection::close() +void Connection::close() { reply.set_value(Ref(nullptr)); reply = {}; auth_done.set_value(AuthResult::canceled); auth_done = {}; if (conn && !std::exchange(closed, true)) { - return conn->close(); - } else { - return seastar::now(); + conn->mark_down(); } } @@ -551,7 +549,8 @@ seastar::future<> Client::ms_handle_reset(crimson::net::ConnectionRef conn) }); if (found != pending_conns.end()) { logger().warn("pending conn reset by {}", conn->get_peer_addr()); - return (*found)->close(); + (*found)->close(); + return seastar::now(); } else if (active_con && active_con->is_my_peer(conn->get_peer_addr())) { logger().warn("active conn reset {}", conn->get_peer_addr()); active_con.reset(); @@ -920,9 +919,7 @@ seastar::future<> Client::stop() return tick_gate.close().then([this] { timer.cancel(); if (active_con) { - return active_con->close(); - } else { - return seastar::now(); + active_con->close(); } }); } @@ -953,9 +950,8 @@ seastar::future<> Client::reopen_session(int rank) } else { return mc->authenticate_v1(monmap.get_epoch(), entity_name, want_keys) .handle_exception([conn](auto ep) { - return conn->close().then([ep=std::move(ep)](){ - return seastar::make_exception_future(ep); - }); + conn->mark_down(); + return seastar::make_exception_future(ep); }); } }).then([peer, this](auto result) { @@ -986,21 +982,13 @@ seastar::future<> Client::reopen_session(int rank) ceph_assert(!active_con && !pending_conns.empty()); active_con = std::move(*found); found->reset(); - auto ret = seastar::do_with( - std::move(pending_conns), - [](auto &pending_conns) { - return seastar::parallel_for_each( - pending_conns, - [] (auto &conn) { - if (!conn) { - return seastar::now(); - } else { - return conn->close(); - } - }); - }); + for (auto& conn : pending_conns) { + if (conn) { + conn->close(); + } + } pending_conns.clear(); - return ret; + return seastar::now(); }).then([]() { logger().debug("reopen_session mon connection attempts complete"); }).handle_exception([](auto ep) { diff --git a/src/crimson/net/Connection.h b/src/crimson/net/Connection.h index d43d61b699e..04c57cb120f 100644 --- a/src/crimson/net/Connection.h +++ b/src/crimson/net/Connection.h @@ -95,6 +95,8 @@ class Connection : public seastar::enable_shared_from_this { #ifdef UNIT_TESTS_BUILT virtual bool is_closed() const = 0; + virtual bool is_closed_clean() const = 0; + virtual bool peer_wins() const = 0; #endif @@ -105,10 +107,9 @@ class Connection : public seastar::enable_shared_from_this { /// handshake virtual seastar::future<> keepalive() = 0; - // close the connection and cancel any any pending futures from read/send - // Note it's OK to discard the returned future because Messenger::shutdown() - // will wait for all connections closed - virtual seastar::future<> close() = 0; + // close the connection and cancel any any pending futures from read/send, + // without dispatching any reset event + virtual void mark_down() = 0; virtual void print(ostream& out) const = 0; diff --git a/src/crimson/net/Protocol.cc b/src/crimson/net/Protocol.cc index bf2633c1c22..3d01f26e50d 100644 --- a/src/crimson/net/Protocol.cc +++ b/src/crimson/net/Protocol.cc @@ -7,6 +7,7 @@ #include "crimson/common/log.h" #include "crimson/net/Errors.h" +#include "crimson/net/Dispatcher.h" #include "crimson/net/Socket.h" #include "crimson/net/SocketConnection.h" #include "msg/Message.h" @@ -39,37 +40,62 @@ bool Protocol::is_connected() const return write_state == write_state_t::open; } -seastar::future<> Protocol::close() +void Protocol::close(bool dispatch_reset, + std::optional> f_accept_new) { if (closed) { // already closing - assert(close_ready.valid()); - return close_ready.get_future(); + return; } + logger().info("{} closing: reset {}, replace {}", conn, + dispatch_reset ? "yes" : "no", + f_accept_new ? "yes" : "no"); + // unregister_conn() drops a reference, so hold another until completion auto cleanup = [conn_ref = conn.shared_from_this(), this] { logger().debug("{} closed!", conn); +#ifdef UNIT_TESTS_BUILT + is_closed_clean = true; + if (conn.interceptor) { + conn.interceptor->register_conn_closed(conn); + } +#endif }; + // atomic operations + closed = true; trigger_close(); - - // close_ready become valid only after state is state_t::closing - assert(!close_ready.valid()); - + if (f_accept_new) { + (*f_accept_new)(); + } if (socket) { socket->shutdown(); - close_ready = pending_dispatch.close().finally([this] { - return socket->close(); - }).finally(std::move(cleanup)); - } else { - close_ready = pending_dispatch.close().finally(std::move(cleanup)); } - - closed = true; set_write_state(write_state_t::drop); + auto gate_closed = pending_dispatch.close(); + auto reset_dispatched = seastar::futurize_apply([this, dispatch_reset] { + if (dispatch_reset) { + return dispatcher.ms_handle_reset( + seastar::static_pointer_cast(conn.shared_from_this())); + } + return seastar::now(); + }).handle_exception([this] (std::exception_ptr eptr) { + logger().error("{} ms_handle_reset caught exception: {}", conn, eptr); + ceph_abort("unexpected exception from ms_handle_reset()"); + }); - return close_ready.get_future(); + // asynchronous operations + assert(!close_ready.valid()); + close_ready = seastar::when_all_succeed( + std::move(gate_closed).finally([this] { + if (socket) { + return socket->close(); + } + return seastar::now(); + }), + std::move(reset_dispatched) + ).finally(std::move(cleanup)); } seastar::future<> Protocol::send(MessageRef msg) @@ -289,13 +315,8 @@ void Protocol::write_event() case write_state_t::open: [[fallthrough]]; case write_state_t::delay: - (void) seastar::with_gate(pending_dispatch, [this] { - return do_write_dispatch_sweep( - ).handle_exception([this] (std::exception_ptr eptr) { - logger().error("{} do_write_dispatch_sweep(): unexpected exception {}", - conn, eptr); - ceph_abort(); - }); + gated_dispatch("do_write_dispatch_sweep", [this] { + return do_write_dispatch_sweep(); }); return; case write_state_t::drop: diff --git a/src/crimson/net/Protocol.h b/src/crimson/net/Protocol.h index 4df2549c37c..b765b23539d 100644 --- a/src/crimson/net/Protocol.h +++ b/src/crimson/net/Protocol.h @@ -6,6 +6,7 @@ #include #include +#include "crimson/common/log.h" #include "Fwd.h" #include "SocketConnection.h" @@ -24,10 +25,20 @@ class Protocol { bool is_connected() const; +#ifdef UNIT_TESTS_BUILT + bool is_closed_clean = false; bool is_closed() const { return closed; } +#endif // Reentrant closing - seastar::future<> close(); + void close(bool dispatch_reset, std::optional> f_accept_new=std::nullopt); + seastar::future<> close_clean(bool dispatch_reset) { + close(dispatch_reset); + // it can happen if close_clean() is called inside Dispatcher::ms_handle_reset() + // which will otherwise result in deadlock + assert(close_ready.valid()); + return close_ready.get_future(); + } virtual void start_connect(const entity_addr_t& peer_addr, const entity_type_t& peer_type) = 0; @@ -53,19 +64,29 @@ class Protocol { public: const proto_t proto_type; + SocketRef socket; protected: + template + void gated_dispatch(const char* what, Func&& func) { + (void) seastar::with_gate(pending_dispatch, std::forward(func) + ).handle_exception([this, what] (std::exception_ptr eptr) { + crimson::get_logger(ceph_subsys_ms).error( + "{} gated_dispatch() {} caught exception: {}", conn, what, eptr); + ceph_abort("unexpected exception from gated_dispatch()"); + }); + } + Dispatcher &dispatcher; SocketConnection &conn; - SocketRef socket; - seastar::gate pending_dispatch; AuthConnectionMetaRef auth_meta; private: bool closed = false; // become valid only after closed == true seastar::shared_future<> close_ready; + seastar::gate pending_dispatch; // the write state-machine public: diff --git a/src/crimson/net/ProtocolV1.cc b/src/crimson/net/ProtocolV1.cc index 3eac0d2c924..b22ce30653a 100644 --- a/src/crimson/net/ProtocolV1.cc +++ b/src/crimson/net/ProtocolV1.cc @@ -309,8 +309,6 @@ void ProtocolV1::start_connect(const entity_addr_t& _peer_addr, state = state_t::connecting; set_write_state(write_state_t::delay); - // we don't know my ephemeral port yet - conn.set_ephemeral_port(0, SocketConnection::side_t::none); ceph_assert(!socket); conn.peer_addr = _peer_addr; conn.target_addr = _peer_addr; @@ -318,7 +316,7 @@ void ProtocolV1::start_connect(const entity_addr_t& _peer_addr, conn.policy = messenger.get_policy(_peer_type); messenger.register_conn( seastar::static_pointer_cast(conn.shared_from_this())); - (void) seastar::with_gate(pending_dispatch, [this] { + gated_dispatch("start_connect", [this] { return Socket::connect(conn.peer_addr) .then([this](SocketRef sock) { socket = std::move(sock); @@ -347,8 +345,10 @@ void ProtocolV1::start_connect(const entity_addr_t& _peer_addr, throw std::system_error( make_error_code(crimson::net::error::bad_peer_address)); } - conn.set_ephemeral_port(caddr.get_port(), - SocketConnection::side_t::connector); + if (state != state_t::connecting) { + throw std::system_error(make_error_code(error::protocol_aborted)); + } + socket->learn_ephemeral_port_as_connector(caddr.get_port()); if (unlikely(caddr.is_msgr2())) { logger().warn("{} peer sent a v2 address for me: {}", conn, caddr); @@ -377,7 +377,7 @@ void ProtocolV1::start_connect(const entity_addr_t& _peer_addr, }).handle_exception([this] (std::exception_ptr eptr) { // TODO: handle fault in the connecting state logger().warn("{} connecting fault: {}", conn, eptr); - (void) close(); + close(true); }); }); } @@ -466,7 +466,7 @@ seastar::future ProtocolV1::replace_existing( // will all be performed using v2 protocol. ceph_abort("lossless policy not supported for v1"); } - (void) existing->close(); + existing->protocol->close(true); return send_connect_reply_ready(reply_tag, std::move(authorizer_reply)); } @@ -583,7 +583,8 @@ seastar::future ProtocolV1::repeat_handle_connect() logger().warn("{} existing {} proto version is {} not 1, close existing", conn, *existing, static_cast(existing->protocol->proto_type)); - (void) existing->close(); + // NOTE: this is following async messenger logic, but we may miss the reset event. + existing->mark_down(); } else { return handle_connect_with_existing(existing, std::move(authorizer_reply)); } @@ -612,12 +613,10 @@ void ProtocolV1::start_accept(SocketRef&& sock, ceph_assert(!socket); // until we know better conn.target_addr = _peer_addr; - conn.set_ephemeral_port(_peer_addr.get_port(), - SocketConnection::side_t::acceptor); socket = std::move(sock); messenger.accept_conn( seastar::static_pointer_cast(conn.shared_from_this())); - (void) seastar::with_gate(pending_dispatch, [this] { + gated_dispatch("start_accept", [this] { // stop learning my_addr before sending it out, so it won't change return messenger.learned_addr(messenger.get_myaddr(), conn).then([this] { // encode/send server's handshake header @@ -663,7 +662,7 @@ void ProtocolV1::start_accept(SocketRef&& sock, }).handle_exception([this] (std::exception_ptr eptr) { // TODO: handle fault in the accepting state logger().warn("{} accepting fault: {}", conn, eptr); - (void) close(); + close(false); }); }); } @@ -847,15 +846,11 @@ seastar::future<> ProtocolV1::read_message() } // start dispatch, ignoring exceptions from the application layer - (void) seastar::with_gate(pending_dispatch, [this, msg = std::move(msg_ref)] { - logger().debug("{} <== #{} === {} ({})", - conn, msg->get_seq(), *msg, msg->get_type()); - return dispatcher.ms_dispatch(&conn, std::move(msg)) - .handle_exception([this] (std::exception_ptr eptr) { - logger().error("{} ms_dispatch caught exception: {}", conn, eptr); - ceph_assert(false); - }); - }); + gated_dispatch("ms_dispatch", [this, msg = std::move(msg_ref)] { + logger().debug("{} <== #{} === {} ({})", + conn, msg->get_seq(), *msg, msg->get_type()); + return dispatcher.ms_dispatch(&conn, std::move(msg)); + }); }); } @@ -894,31 +889,23 @@ void ProtocolV1::execute_open() state = state_t::open; set_write_state(write_state_t::open); - (void) seastar::with_gate(pending_dispatch, [this] { + gated_dispatch("execute_open", [this] { // start background processing of tags return handle_tags() .handle_exception_type([this] (const std::system_error& e) { logger().warn("{} open fault: {}", conn, e); if (e.code() == error::protocol_aborted || - e.code() == std::errc::connection_reset) { - return dispatcher.ms_handle_reset( - seastar::static_pointer_cast(conn.shared_from_this())) - .then([this] { - (void) close(); - }); - } else if (e.code() == error::read_eof) { - return dispatcher.ms_handle_remote_reset( - seastar::static_pointer_cast(conn.shared_from_this())) - .then([this] { - (void) close(); - }); + e.code() == std::errc::connection_reset || + e.code() == error::read_eof) { + close(true); + return seastar::now(); } else { throw e; } }).handle_exception([this] (std::exception_ptr eptr) { // TODO: handle fault in the open state logger().warn("{} open fault: {}", conn, eptr); - (void) close(); + close(true); }); }); } diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index befbaa5862e..b6d35514d59 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -62,8 +62,8 @@ void abort_protocol() { throw std::system_error(make_error_code(crimson::net::error::protocol_aborted)); } -void abort_in_close(crimson::net::ProtocolV2& proto) { - (void) proto.close(); +void abort_in_close(crimson::net::ProtocolV2& proto, bool dispatch_reset) { + proto.close(dispatch_reset); abort_protocol(); } @@ -180,8 +180,6 @@ void ProtocolV2::start_accept(SocketRef&& sock, ceph_assert(!socket); // until we know better conn.target_addr = _peer_addr; - conn.set_ephemeral_port(_peer_addr.get_port(), - SocketConnection::side_t::acceptor); socket = std::move(sock); logger().info("{} ProtocolV2::start_accept(): target_addr={}", conn, _peer_addr); messenger.accept_conn( @@ -413,8 +411,7 @@ void ProtocolV2::fault(bool backoff, const char* func_name, std::exception_ptr e if (conn.policy.lossy) { logger().info("{} {}: fault at {} on lossy channel, going to CLOSING -- {}", conn, func_name, get_state_name(state), eptr); - dispatch_reset(); - (void) close(); + close(true); } else if (conn.policy.server || (conn.policy.standby && (!is_queued() && conn.sent.empty()))) { @@ -432,17 +429,6 @@ void ProtocolV2::fault(bool backoff, const char* func_name, std::exception_ptr e } } -void ProtocolV2::dispatch_reset() -{ - (void) seastar::with_gate(pending_dispatch, [this] { - return dispatcher.ms_handle_reset( - seastar::static_pointer_cast(conn.shared_from_this())); - }).handle_exception([this] (std::exception_ptr eptr) { - logger().error("{} ms_handle_reset caught exception: {}", conn, eptr); - ceph_abort("unexpected exception from ms_handle_reset()"); - }); -} - void ProtocolV2::reset_session(bool full) { server_cookie = 0; @@ -452,17 +438,14 @@ void ProtocolV2::reset_session(bool full) client_cookie = generate_client_cookie(); peer_global_seq = 0; reset_write(); - (void) seastar::with_gate(pending_dispatch, [this] { + gated_dispatch("ms_handle_remote_reset", [this] { return dispatcher.ms_handle_remote_reset( seastar::static_pointer_cast(conn.shared_from_this())); - }).handle_exception([this] (std::exception_ptr eptr) { - logger().error("{} ms_handle_remote_reset caught exception: {}", conn, eptr); - ceph_abort("unexpected exception from ms_handle_remote_reset()"); }); } } -seastar::future ProtocolV2::banner_exchange() +seastar::future ProtocolV2::banner_exchange(bool is_connect) { // 1. prepare and send banner bufferlist banner_payload; @@ -515,7 +498,7 @@ seastar::future ProtocolV2::banner_exchange() logger().debug("{} GOT banner: payload_len={}", conn, payload_len); INTERCEPT_CUSTOM(custom_bp_t::BANNER_PAYLOAD_READ, bp_type_t::READ); return read(payload_len); - }).then([this] (bufferlist bl) { + }).then([this, is_connect] (bufferlist bl) { // 4. process peer banner_payload and send HelloFrame auto p = bl.cbegin(); uint64_t peer_supported_features; @@ -538,13 +521,13 @@ seastar::future ProtocolV2::banner_exchange() logger().error("{} peer does not support all required features" " required={} peer_supported={}", conn, required_features, peer_supported_features); - abort_in_close(*this); + abort_in_close(*this, is_connect); } if ((supported_features & peer_required_features) != peer_required_features) { logger().error("{} we do not support all peer required features" " peer_required={} supported={}", conn, peer_required_features, supported_features); - abort_in_close(*this); + abort_in_close(*this, is_connect); } this->peer_required_features = peer_required_features; if (this->peer_required_features == 0) { @@ -668,8 +651,7 @@ seastar::future<> ProtocolV2::client_auth(std::vector &allowed_methods }); } catch (const crimson::auth::error& e) { logger().error("{} get_initial_auth_request returned {}", conn, e); - dispatch_reset(); - abort_in_close(*this); + abort_in_close(*this, true); return seastar::now(); } } @@ -863,9 +845,7 @@ void ProtocolV2::execute_connecting() if (socket) { socket->shutdown(); } - execution_done = seastar::with_gate(pending_dispatch, [this] { - // we don't know my socket_port yet - conn.set_ephemeral_port(0, SocketConnection::side_t::none); + gated_execute("execute_connecting", [this] { return messenger.get_global_seq().then([this] (auto gs) { global_seq = gs; assert(client_cookie != 0); @@ -887,7 +867,8 @@ void ProtocolV2::execute_connecting() abort_protocol(); } if (socket) { - (void) with_gate(pending_dispatch, [sock = std::move(socket)] () mutable { + gated_dispatch("close_sockect_connecting", + [sock = std::move(socket)] () mutable { return sock->close().then([sock = std::move(sock)] {}); }); } @@ -908,18 +889,21 @@ void ProtocolV2::execute_connecting() auth_meta = seastar::make_lw_shared(); session_stream_handlers = { nullptr, nullptr }; enable_recording(); - return banner_exchange(); + return banner_exchange(true); }).then([this] (entity_type_t _peer_type, entity_addr_t _my_addr_from_peer) { if (conn.get_peer_type() != _peer_type) { logger().warn("{} connection peer type does not match what peer advertises {} != {}", conn, ceph_entity_type_name(conn.get_peer_type()), ceph_entity_type_name(_peer_type)); - dispatch_reset(); - abort_in_close(*this); + abort_in_close(*this, true); } - conn.set_ephemeral_port(_my_addr_from_peer.get_port(), - SocketConnection::side_t::connector); + if (unlikely(state != state_t::CONNECTING)) { + logger().debug("{} triggered {} during banner_exchange(), abort", + conn, get_state_name(state)); + abort_protocol(); + } + socket->learn_ephemeral_port_as_connector(_my_addr_from_peer.get_port()); if (unlikely(_my_addr_from_peer.is_legacy())) { logger().warn("{} peer sent a legacy address for me: {}", conn, _my_addr_from_peer); @@ -946,12 +930,9 @@ void ProtocolV2::execute_connecting() } switch (next) { case next_step_t::ready: { - (void) seastar::with_gate(pending_dispatch, [this] { + gated_dispatch("ms_handle_connect", [this] { return dispatcher.ms_handle_connect( seastar::static_pointer_cast(conn.shared_from_this())); - }).handle_exception([this] (std::exception_ptr eptr) { - logger().error("{} ms_handle_connect caught exception: {}", conn, eptr); - ceph_abort("unexpected exception from ms_handle_connect()"); }); logger().info("{} connected:" " gs={}, pgs={}, cs={}, client_cookie={}," @@ -1128,7 +1109,7 @@ ProtocolV2::reuse_connection( // close this connection because all the necessary information is delivered // to the exisiting connection, and jump to error handling code to abort the // current state. - abort_in_close(*this); + abort_in_close(*this, false); return seastar::make_ready_future(next_step_t::none); } @@ -1172,15 +1153,7 @@ ProtocolV2::handle_existing_connection(SocketConnectionRef existing_conn) logger().warn("{} server_connect:" " existing connection {} is a lossy channel. Close existing in favor of" " this connection", conn, *existing_conn); - existing_proto->dispatch_reset(); - (void) existing_proto->close(); - - if (unlikely(state != state_t::ACCEPTING)) { - logger().debug("{} triggered {} in execute_accepting()", - conn, get_state_name(state)); - abort_protocol(); - } - execute_establishing(); + execute_establishing(existing_conn, true); return seastar::make_ready_future(next_step_t::ready); } @@ -1304,24 +1277,21 @@ ProtocolV2::server_connect() SocketConnectionRef existing_conn = messenger.lookup_conn(conn.peer_addr); + bool dispatch_reset = true; if (existing_conn) { if (existing_conn->protocol->proto_type != proto_t::v2) { logger().warn("{} existing connection {} proto version is {}, close existing", conn, *existing_conn, static_cast(existing_conn->protocol->proto_type)); // should unregister the existing from msgr atomically - (void) existing_conn->close(); + // NOTE: this is following async messenger logic, but we may miss the reset event. + dispatch_reset = false; } else { return handle_existing_connection(existing_conn); } } - if (unlikely(state != state_t::ACCEPTING)) { - logger().debug("{} triggered {} in execute_accepting()", - conn, get_state_name(state)); - abort_protocol(); - } - execute_establishing(); + execute_establishing(existing_conn, dispatch_reset); return seastar::make_ready_future(next_step_t::ready); }); } @@ -1419,7 +1389,8 @@ ProtocolV2::server_reconnect() "close existing and reset client.", conn, *existing_conn, static_cast(existing_conn->protocol->proto_type)); - (void) existing_conn->close(); + // NOTE: this is following async messenger logic, but we may miss the reset event. + existing_conn->mark_down(); return send_reset(true); } @@ -1512,13 +1483,13 @@ ProtocolV2::server_reconnect() void ProtocolV2::execute_accepting() { trigger_state(state_t::ACCEPTING, write_state_t::none, false); - (void) seastar::with_gate(pending_dispatch, [this] { + gated_dispatch("execute_accepting", [this] { return seastar::futurize_apply([this] { INTERCEPT_N_RW(custom_bp_t::SOCKET_ACCEPTED); auth_meta = seastar::make_lw_shared(); session_stream_handlers = { nullptr, nullptr }; enable_recording(); - return banner_exchange(); + return banner_exchange(false); }).then([this] (entity_type_t _peer_type, entity_addr_t _my_addr_from_peer) { ceph_assert(conn.get_peer_type() == 0); @@ -1573,7 +1544,7 @@ void ProtocolV2::execute_accepting() }).handle_exception([this] (std::exception_ptr eptr) { logger().info("{} execute_accepting(): fault at {}, going to CLOSING -- {}", conn, get_state_name(state), eptr); - (void) close(); + close(false); }); }); } @@ -1615,22 +1586,36 @@ seastar::future<> ProtocolV2::finish_auth() // ESTABLISHING -void ProtocolV2::execute_establishing() { +void ProtocolV2::execute_establishing( + SocketConnectionRef existing_conn, bool dispatch_reset) { + if (unlikely(state != state_t::ACCEPTING)) { + logger().debug("{} triggered {} before execute_establishing()", + conn, get_state_name(state)); + abort_protocol(); + } + + auto accept_me = [this] { + messenger.register_conn( + seastar::static_pointer_cast( + conn.shared_from_this())); + messenger.unaccept_conn( + seastar::static_pointer_cast( + conn.shared_from_this())); + }; + trigger_state(state_t::ESTABLISHING, write_state_t::delay, false); - (void) seastar::with_gate(pending_dispatch, [this] { + if (existing_conn) { + existing_conn->protocol->close(dispatch_reset, std::move(accept_me)); + } else { + accept_me(); + } + + gated_dispatch("ms_handle_accept_establishing", [this] { return dispatcher.ms_handle_accept( seastar::static_pointer_cast(conn.shared_from_this())); - }).handle_exception([this] (std::exception_ptr eptr) { - logger().error("{} ms_handle_accept caught exception: {}", conn, eptr); - ceph_abort("unexpected exception from ms_handle_accept()"); }); - messenger.register_conn( - seastar::static_pointer_cast( - conn.shared_from_this())); - messenger.unaccept_conn( - seastar::static_pointer_cast( - conn.shared_from_this())); - execution_done = seastar::with_gate(pending_dispatch, [this] { + + gated_execute("execute_establishing", [this] { return seastar::futurize_apply([this] { return send_server_ident(); }).then([this] { @@ -1723,23 +1708,20 @@ void ProtocolV2::trigger_replacing(bool reconnect, if (socket) { socket->shutdown(); } - (void) seastar::with_gate(pending_dispatch, [this] { + gated_dispatch("ms_handle_accept_replacing", [this] { return dispatcher.ms_handle_accept( seastar::static_pointer_cast(conn.shared_from_this())); - }).handle_exception([this] (std::exception_ptr eptr) { - logger().error("{} ms_handle_accept caught exception: {}", conn, eptr); - ceph_abort("unexpected exception from ms_handle_accept()"); }); - (void) seastar::with_gate(pending_dispatch, - [this, - reconnect, - do_reset, - new_socket = std::move(new_socket), - new_auth_meta = std::move(new_auth_meta), - new_rxtx = std::move(new_rxtx), - new_client_cookie, new_peer_name, - new_conn_features, new_peer_global_seq, - new_connect_seq, new_msg_seq] () mutable { + gated_dispatch("trigger_replacing", + [this, + reconnect, + do_reset, + new_socket = std::move(new_socket), + new_auth_meta = std::move(new_auth_meta), + new_rxtx = std::move(new_rxtx), + new_client_cookie, new_peer_name, + new_conn_features, new_peer_global_seq, + new_connect_seq, new_msg_seq] () mutable { return wait_write_exit().then([this, do_reset] { if (do_reset) { reset_session(true); @@ -1761,7 +1743,8 @@ void ProtocolV2::trigger_replacing(bool reconnect, } if (socket) { - (void) with_gate(pending_dispatch, [sock = std::move(socket)] () mutable { + gated_dispatch("close_socket_replacing", + [sock = std::move(socket)] () mutable { return sock->close().then([sock = std::move(sock)] {}); }); } @@ -1953,11 +1936,8 @@ seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp) // TODO: change MessageRef with seastar::shared_ptr auto msg_ref = MessageRef{message, false}; - (void) seastar::with_gate(pending_dispatch, [this, msg = std::move(msg_ref)] { + gated_dispatch("ms_dispatch", [this, msg = std::move(msg_ref)] { return dispatcher.ms_dispatch(&conn, std::move(msg)); - }).handle_exception([this] (std::exception_ptr eptr) { - logger().error("{} ms_dispatch caught exception: {}", conn, eptr); - ceph_abort("unexpected exception from ms_dispatch()"); }); }); } @@ -1971,7 +1951,7 @@ void ProtocolV2::execute_ready() conn.interceptor->register_conn_ready(conn); } #endif - execution_done = seastar::with_gate(pending_dispatch, [this] { + gated_execute("execute_ready", [this] { protocol_timer.cancel(); return seastar::keep_doing([this] { return read_main_preamble() @@ -2077,8 +2057,7 @@ void ProtocolV2::execute_wait(bool max_backoff) if (socket) { socket->shutdown(); } - execution_done = seastar::with_gate(pending_dispatch, - [this, max_backoff] { + gated_execute("execute_wait", [this, max_backoff] { double backoff = protocol_timer.last_dur(); if (max_backoff) { backoff = conf.ms_max_backoff; @@ -2109,14 +2088,14 @@ void ProtocolV2::execute_wait(bool max_backoff) void ProtocolV2::execute_server_wait() { trigger_state(state_t::SERVER_WAIT, write_state_t::delay, false); - execution_done = seastar::with_gate(pending_dispatch, [this] { + gated_execute("execute_server_wait", [this] { return read_exactly(1).then([this] (auto bl) { logger().warn("{} SERVER_WAIT got read, abort", conn); abort_in_fault(); }).handle_exception([this] (std::exception_ptr eptr) { logger().info("{} execute_server_wait(): fault at {}, going to CLOSING -- {}", conn, get_state_name(state), eptr); - (void) close(); + close(false); }); }); } @@ -2141,11 +2120,6 @@ void ProtocolV2::trigger_close() protocol_timer.cancel(); trigger_state(state_t::CLOSING, write_state_t::drop, false); -#ifdef UNIT_TESTS_BUILT - if (conn.interceptor) { - conn.interceptor->register_conn_closed(conn); - } -#endif } } // namespace crimson::net diff --git a/src/crimson/net/ProtocolV2.h b/src/crimson/net/ProtocolV2.h index 0e8f2ff90e8..f98bf3d4366 100644 --- a/src/crimson/net/ProtocolV2.h +++ b/src/crimson/net/ProtocolV2.h @@ -80,6 +80,14 @@ class ProtocolV2 final : public Protocol { seastar::shared_future<> execution_done = seastar::now(); + template + void gated_execute(const char* what, Func&& func) { + gated_dispatch(what, [this, &func] { + execution_done = seastar::futurize_apply(std::forward(func)); + return execution_done.get_future(); + }); + } + class Timer { double last_dur_ = 0.0; const SocketConnection& conn; @@ -124,9 +132,8 @@ class ProtocolV2 final : public Protocol { private: void fault(bool backoff, const char* func_name, std::exception_ptr eptr); - void dispatch_reset(); void reset_session(bool full); - seastar::future banner_exchange(); + seastar::future banner_exchange(bool is_connect); enum class next_step_t { ready, @@ -174,7 +181,7 @@ class ProtocolV2 final : public Protocol { seastar::future<> finish_auth(); // ESTABLISHING - void execute_establishing(); + void execute_establishing(SocketConnectionRef existing_conn, bool dispatch_reset); // ESTABLISHING/REPLACING (server) seastar::future<> send_server_ident(); diff --git a/src/crimson/net/Socket.h b/src/crimson/net/Socket.h index c97554f50ba..a9989650c9d 100644 --- a/src/crimson/net/Socket.h +++ b/src/crimson/net/Socket.h @@ -25,31 +25,25 @@ using SocketRef = std::unique_ptr; class Socket { - const seastar::shard_id sid; - seastar::connected_socket socket; - seastar::input_stream in; - seastar::output_stream out; - -#ifndef NDEBUG - bool closed = false; -#endif - - /// buffer state for read() - struct { - bufferlist buffer; - size_t remaining; - } r; - struct construct_tag {}; public: - Socket(seastar::connected_socket&& _socket, construct_tag) + // if acceptor side, peer is using a different port (ephemeral_port) + // if connector side, I'm using a different port (ephemeral_port) + enum class side_t { + acceptor, + connector + }; + + Socket(seastar::connected_socket&& _socket, side_t _side, uint16_t e_port, construct_tag) : sid{seastar::engine().cpu_id()}, socket(std::move(_socket)), in(socket.input()), // the default buffer size 8192 is too small that may impact our write // performance. see seastar::net::connected_socket::output() - out(socket.output(65536)) {} + out(socket.output(65536)), + side(_side), + ephemeral_port(e_port) {} ~Socket() { #ifndef NDEBUG @@ -63,7 +57,8 @@ class Socket connect(const entity_addr_t& peer_addr) { return seastar::connect(peer_addr.in4_addr() ).then([] (seastar::connected_socket socket) { - return std::make_unique(std::move(socket), construct_tag{}); + return std::make_unique( + std::move(socket), side_t::connector, 0, construct_tag{}); }); } @@ -115,7 +110,45 @@ class Socket socket.shutdown_output(); } + side_t get_side() const { + return side; + } + + uint16_t get_ephemeral_port() const { + return ephemeral_port; + } + + // learn my ephemeral_port as connector. + // unfortunately, there's no way to identify which port I'm using as + // connector with current seastar interface. + void learn_ephemeral_port_as_connector(uint16_t port) { + assert(side == side_t::connector && + (ephemeral_port == 0 || ephemeral_port == port)); + ephemeral_port = port; + } + + private: + const seastar::shard_id sid; + seastar::connected_socket socket; + seastar::input_stream in; + seastar::output_stream out; + side_t side; + uint16_t ephemeral_port; + +#ifndef NDEBUG + bool closed = false; +#endif + + /// buffer state for read() + struct { + bufferlist buffer; + size_t remaining; + } r; + #ifdef UNIT_TESTS_BUILT + public: + void set_trap(bp_type_t type, bp_action_t action, socket_blocker* blocker_); + private: bp_action_t next_trap_read = bp_action_t::CONTINUE; bp_action_t next_trap_write = bp_action_t::CONTINUE; @@ -123,9 +156,6 @@ class Socket seastar::future<> try_trap_pre(bp_action_t& trap); seastar::future<> try_trap_post(bp_action_t& trap); - public: - void set_trap(bp_type_t type, bp_action_t action, socket_blocker* blocker_); - #endif friend class FixedCPUServerSocket; }; @@ -214,7 +244,8 @@ public: peer_addr.set_sockaddr(&paddr.as_posix_sockaddr()); peer_addr.set_type(entity_addr_t::TYPE_ANY); SocketRef _socket = std::make_unique( - std::move(socket), Socket::construct_tag{}); + std::move(socket), Socket::side_t::acceptor, + peer_addr.get_port(), Socket::construct_tag{}); std::ignore = seastar::with_gate(ss.shutdown_gate, [socket = std::move(_socket), peer_addr, &ss, fn_accept = std::move(fn_accept)] () mutable { diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc index 4a73034e922..6d84acf1469 100644 --- a/src/crimson/net/SocketConnection.cc +++ b/src/crimson/net/SocketConnection.cc @@ -63,6 +63,12 @@ bool SocketConnection::is_closed() const return protocol->is_closed(); } +bool SocketConnection::is_closed_clean() const +{ + assert(seastar::engine().cpu_id() == shard_id()); + return protocol->is_closed_clean; +} + #endif bool SocketConnection::peer_wins() const { @@ -81,10 +87,10 @@ seastar::future<> SocketConnection::keepalive() return protocol->keepalive(); } -seastar::future<> SocketConnection::close() +void SocketConnection::mark_down() { assert(seastar::engine().cpu_id() == shard_id()); - return protocol->close(); + protocol->close(false); } bool SocketConnection::update_rx_seq(seq_num_t seq) @@ -120,19 +126,25 @@ SocketConnection::start_accept(SocketRef&& sock, protocol->start_accept(std::move(sock), _peer_addr); } +seastar::future<> +SocketConnection::close_clean(bool dispatch_reset) +{ + return protocol->close_clean(dispatch_reset); +} + seastar::shard_id SocketConnection::shard_id() const { return messenger.shard_id(); } void SocketConnection::print(ostream& out) const { messenger.print(out); - if (side == side_t::none) { + if (!protocol->socket) { out << " >> " << get_peer_name() << " " << peer_addr; - } else if (side == side_t::acceptor) { + } else if (protocol->socket->get_side() == Socket::side_t::acceptor) { out << " >> " << get_peer_name() << " " << peer_addr - << "@" << ephemeral_port; - } else { // side == side_t::connector - out << "@" << ephemeral_port + << "@" << protocol->socket->get_ephemeral_port(); + } else { // protocol->socket->get_side() == Socket::side_t::connector + out << "@" << protocol->socket->get_ephemeral_port() << " >> " << get_peer_name() << " " << peer_addr; } } diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h index 503d4e55fb0..358e8e00807 100644 --- a/src/crimson/net/SocketConnection.h +++ b/src/crimson/net/SocketConnection.h @@ -33,20 +33,6 @@ class SocketConnection : public Connection { SocketMessenger& messenger; std::unique_ptr protocol; - // if acceptor side, ephemeral_port is different from peer_addr.get_port(); - // if connector side, ephemeral_port is different from my_addr.get_port(). - enum class side_t { - none, - acceptor, - connector - }; - side_t side = side_t::none; - uint16_t ephemeral_port = 0; - void set_ephemeral_port(uint16_t port, side_t _side) { - ephemeral_port = port; - side = _side; - } - ceph::net::Policy policy; /// the seq num of the last transmitted message @@ -77,6 +63,8 @@ class SocketConnection : public Connection { bool is_connected() const override; #ifdef UNIT_TESTS_BUILT + bool is_closed_clean() const override; + bool is_closed() const override; bool peer_wins() const override; @@ -88,7 +76,7 @@ class SocketConnection : public Connection { seastar::future<> keepalive() override; - seastar::future<> close() override; + void mark_down() override; void print(ostream& out) const override; @@ -101,6 +89,8 @@ class SocketConnection : public Connection { void start_accept(SocketRef&& socket, const entity_addr_t& peer_addr); + seastar::future<> close_clean(bool dispatch_reset); + bool is_server_side() const { return policy.server; } diff --git a/src/crimson/net/SocketMessenger.cc b/src/crimson/net/SocketMessenger.cc index 423e7d4edc9..c11e2b32753 100644 --- a/src/crimson/net/SocketMessenger.cc +++ b/src/crimson/net/SocketMessenger.cc @@ -141,6 +141,7 @@ SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_type_t& pe ceph_assert(peer_addr.get_port() > 0); if (auto found = lookup_conn(peer_addr); found) { + logger().info("{} connect to existing", *found); return found->shared_from_this(); } SocketConnectionRef conn = seastar::make_shared( @@ -163,12 +164,12 @@ seastar::future<> SocketMessenger::shutdown() // close all connections }).then([this] { return seastar::parallel_for_each(accepting_conns, [] (auto conn) { - return conn->close(); + return conn->close_clean(false); }); }).then([this] { ceph_assert(accepting_conns.empty()); return seastar::parallel_for_each(connections, [] (auto conn) { - return conn.second->close(); + return conn.second->close_clean(false); }); }).then([this] { ceph_assert(connections.empty()); diff --git a/src/crimson/osd/heartbeat.cc b/src/crimson/osd/heartbeat.cc index 4318d29f880..6f60654854c 100644 --- a/src/crimson/osd/heartbeat.cc +++ b/src/crimson/osd/heartbeat.cc @@ -113,31 +113,19 @@ void Heartbeat::add_peer(osd_id_t peer, epoch_t epoch) } } -seastar::future Heartbeat::remove_down_peers() +Heartbeat::osds_t Heartbeat::remove_down_peers() { osds_t osds; for (auto& peer : peers) { - osds.push_back(peer.first); + auto osd = peer.first; + auto osdmap = service.get_osdmap_service().get_map(); + if (!osdmap->is_up(osd)) { + remove_peer(osd); + } else if (peers[osd].epoch < osdmap->get_epoch()) { + osds.push_back(osd); + } } - return seastar::map_reduce(std::move(osds), - [this](auto& osd) { - auto osdmap = service.get_osdmap_service().get_map(); - if (!osdmap->is_up(osd)) { - return remove_peer(osd).then([] { - return seastar::make_ready_future(-1); - }); - } else if (peers[osd].epoch < osdmap->get_epoch()) { - return seastar::make_ready_future(osd); - } else { - return seastar::make_ready_future(-1); - } - }, osds_t{}, - [](osds_t&& extras, osd_id_t extra) { - if (extra >= 0) { - extras.push_back(extra); - } - return std::move(extras); - }); + return osds; } void Heartbeat::add_reporter_peers(int whoami) @@ -163,49 +151,37 @@ void Heartbeat::add_reporter_peers(int whoami) }; } -seastar::future<> Heartbeat::update_peers(int whoami) +void Heartbeat::update_peers(int whoami) { const auto min_peers = static_cast( local_conf().get_val("osd_heartbeat_min_peers")); add_reporter_peers(whoami); - return remove_down_peers().then([=](osds_t&& extra) { - // too many? - struct iteration_state { - osds_t::const_iterator where; - osds_t::const_iterator end; - }; - return seastar::do_with(iteration_state{extra.begin(),extra.end()}, - [=](iteration_state& s) { - return seastar::do_until( - [min_peers, &s, this] { - return peers.size() <= min_peers || s.where == s.end; }, - [&s, this] { - return remove_peer(*s.where); } - ); - }); - }).then([=] { - // or too few? - auto osdmap = service.get_osdmap_service().get_map(); - auto epoch = osdmap->get_epoch(); - for (auto next = osdmap->get_next_up_osd_after(whoami); - peers.size() < min_peers && next >= 0 && next != whoami; - next = osdmap->get_next_up_osd_after(next)) { - add_peer(next, epoch); + auto extra = remove_down_peers(); + // too many? + for (auto& osd : extra) { + if (peers.size() <= min_peers) { + break; } - }); + remove_peer(osd); + } + // or too few? + auto osdmap = service.get_osdmap_service().get_map(); + auto epoch = osdmap->get_epoch(); + for (auto next = osdmap->get_next_up_osd_after(whoami); + peers.size() < min_peers && next >= 0 && next != whoami; + next = osdmap->get_next_up_osd_after(next)) { + add_peer(next, epoch); + } } -seastar::future<> Heartbeat::remove_peer(osd_id_t peer) +void Heartbeat::remove_peer(osd_id_t peer) { + logger().info("remove_peer({})", peer); auto found = peers.find(peer); assert(found != peers.end()); - logger().info("remove_peer({})", peer); - return seastar::when_all_succeed(found->second.con_front->close(), - found->second.con_back->close()).then( - [this, peer] { - peers.erase(peer); - return seastar::now(); - }); + found->second.con_front->mark_down(); + found->second.con_back->mark_down(); + peers.erase(peer); } seastar::future<> Heartbeat::ms_dispatch(crimson::net::Connection* conn, @@ -231,9 +207,9 @@ seastar::future<> Heartbeat::ms_handle_reset(crimson::net::ConnectionRef conn) } const auto peer = found->first; const auto epoch = found->second.epoch; - return remove_peer(peer).then([peer, epoch, this] { - add_peer(peer, epoch); - }); + remove_peer(peer); + add_peer(peer, epoch); + return seastar::now(); } seastar::future<> Heartbeat::handle_osd_ping(crimson::net::Connection* conn, @@ -350,46 +326,41 @@ void Heartbeat::heartbeat_check() seastar::future<> Heartbeat::send_heartbeats() { - using peers_item_t = typename peers_map_t::value_type; - return seastar::parallel_for_each(peers, - [this](peers_item_t& item) { - const auto mnow = service.get_mnow(); - const auto now = clock::now(); - const auto deadline = - now + std::chrono::seconds(local_conf()->osd_heartbeat_grace); - auto& info = item.second; - info.last_tx = now; - if (clock::is_zero(info.first_tx)) { - info.first_tx = now; + const auto mnow = service.get_mnow(); + const auto now = clock::now(); + const auto deadline = + now + std::chrono::seconds(local_conf()->osd_heartbeat_grace); + const utime_t sent_stamp{now}; + + std::vector> futures; + for (auto& item : peers) { + auto& info = item.second; + info.last_tx = now; + if (clock::is_zero(info.first_tx)) { + info.first_tx = now; + } + [[maybe_unused]] auto [reply, added] = + info.ping_history.emplace(sent_stamp, reply_t{deadline, 0}); + crimson::net::ConnectionRef conns[] = {info.con_front, info.con_back}; + for (auto& con : conns) { + if (con) { + auto min_message = static_cast( + local_conf()->osd_heartbeat_min_size); + auto ping = make_message( + monc.get_fsid(), + service.get_osdmap_service().get_map()->get_epoch(), + MOSDPing::PING, + sent_stamp, + mnow, + mnow, + service.get_osdmap_service().get_up_epoch(), + min_message); + reply->second.unacknowledged++; + futures.push_back(con->send(std::move(ping))); } - const utime_t sent_stamp{now}; - [[maybe_unused]] auto [reply, added] = - info.ping_history.emplace(sent_stamp, reply_t{deadline, 0}); - std::vector conns{info.con_front, - info.con_back}; - return seastar::parallel_for_each(std::move(conns), - [sent_stamp, mnow, &reply=reply->second, this] (auto con) { - if (con) { - auto min_message = static_cast( - local_conf()->osd_heartbeat_min_size); - auto ping = make_message( - monc.get_fsid(), - service.get_osdmap_service().get_map()->get_epoch(), - MOSDPing::PING, - sent_stamp, - mnow, - mnow, - service.get_osdmap_service().get_up_epoch(), - min_message); - return con->send(ping).then([&reply] { - reply.unacknowledged++; - return seastar::now(); - }); - } else { - return seastar::now(); - } - }); - }); + } + } + return seastar::when_all_succeed(futures.begin(), futures.end()); } seastar::future<> Heartbeat::send_failures(failure_queue_t&& failure_queue) diff --git a/src/crimson/osd/heartbeat.h b/src/crimson/osd/heartbeat.h index c51e81de67b..a0e6146cd47 100644 --- a/src/crimson/osd/heartbeat.h +++ b/src/crimson/osd/heartbeat.h @@ -35,8 +35,8 @@ public: seastar::future<> stop(); void add_peer(osd_id_t peer, epoch_t epoch); - seastar::future<> update_peers(int whoami); - seastar::future<> remove_peer(osd_id_t peer); + void update_peers(int whoami); + void remove_peer(osd_id_t peer); const entity_addrvec_t& get_front_addrs() const; const entity_addrvec_t& get_back_addrs() const; @@ -62,7 +62,7 @@ private: using osds_t = std::vector; /// remove down OSDs /// @return peers not needed in this epoch - seastar::future remove_down_peers(); + osds_t remove_down_peers(); /// add enough reporters for fast failure detection void add_reporter_peers(int whoami); diff --git a/src/crimson/osd/osd.cc b/src/crimson/osd/osd.cc index 9ce90100535..01f93853577 100644 --- a/src/crimson/osd/osd.cc +++ b/src/crimson/osd/osd.cc @@ -78,7 +78,7 @@ OSD::OSD(int id, uint32_t nonce, shard_services{*this, *cluster_msgr, *public_msgr, *monc, *mgrc, *store}, heartbeat{new Heartbeat{shard_services, *monc, hb_front_msgr, hb_back_msgr}}, // do this in background - heartbeat_timer{[this] { (void)update_heartbeat_peers(); }}, + heartbeat_timer{[this] { update_heartbeat_peers(); }}, asok{seastar::make_lw_shared()}, osdmap_gate("OSD::osdmap_gate", std::make_optional(std::ref(shard_services))) { @@ -1049,10 +1049,10 @@ seastar::future<> OSD::send_beacon() return monc->send_message(m); } -seastar::future<> OSD::update_heartbeat_peers() +void OSD::update_heartbeat_peers() { if (!state.is_active()) { - return seastar::now(); + return; } for (auto& pg : pg_map.get_pgs()) { vector up, acting; @@ -1067,7 +1067,7 @@ seastar::future<> OSD::update_heartbeat_peers() } } } - return heartbeat->update_peers(whoami); + heartbeat->update_peers(whoami); } seastar::future<> OSD::handle_peering_op( diff --git a/src/crimson/osd/osd.h b/src/crimson/osd/osd.h index baf90c8d37f..be090fb90e1 100644 --- a/src/crimson/osd/osd.h +++ b/src/crimson/osd/osd.h @@ -216,7 +216,7 @@ public: seastar::future<> shutdown(); seastar::future<> send_beacon(); - seastar::future<> update_heartbeat_peers(); + void update_heartbeat_peers(); friend class PGAdvanceMap; }; diff --git a/src/test/crimson/test_messenger.cc b/src/test/crimson/test_messenger.cc index 68d6da744dd..4a5b3f745d8 100644 --- a/src/test/crimson/test_messenger.cc +++ b/src/test/crimson/test_messenger.cc @@ -932,7 +932,7 @@ class FailoverSuite : public Dispatcher { unsigned pending_establish = 0; unsigned replaced_conns = 0; for (auto& result : interceptor.results) { - if (result.conn->is_closed()) { + if (result.conn->is_closed_clean()) { if (result.state == conn_state_t::replaced) { ++replaced_conns; } @@ -1122,7 +1122,8 @@ class FailoverSuite : public Dispatcher { seastar::future<> markdown() { logger().info("[Test] markdown()"); ceph_assert(tracked_conn); - return tracked_conn->close(); + tracked_conn->mark_down(); + return seastar::now(); } seastar::future<> wait_blocked() { @@ -1375,6 +1376,7 @@ class FailoverSuitePeer : public Dispatcher { } seastar::future<> ms_handle_accept(ConnectionRef conn) override { + logger().info("[TestPeer] got accept from Test"); ceph_assert(!tracked_conn || tracked_conn->is_closed() || tracked_conn == conn); @@ -1383,6 +1385,7 @@ class FailoverSuitePeer : public Dispatcher { } seastar::future<> ms_handle_reset(ConnectionRef conn) override { + logger().info("[TestPeer] got reset from Test"); ceph_assert(tracked_conn == conn); tracked_conn = nullptr; return seastar::now(); @@ -1468,7 +1471,8 @@ class FailoverSuitePeer : public Dispatcher { seastar::future<> markdown() { logger().info("[TestPeer] markdown()"); ceph_assert(tracked_conn); - return tracked_conn->close(); + tracked_conn->mark_down(); + return seastar::now(); } static seastar::future>