Merge pull request #33909 from cyx1231st/wip-seastar-msgr-fix-reset

crimson: misc fixes for writes to multiple-osd cluster

Reviewed-by: Kefu Chai <kchai@redhat.com>
This commit is contained in:
Kefu Chai 2020-03-23 17:05:57 +08:00 committed by GitHub
commit 5305cfbc43
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 358 additions and 353 deletions

View File

@ -40,9 +40,7 @@ seastar::future<> Client::stop()
{
return gate.close().then([this] {
if (conn) {
return conn->close();
} else {
return seastar::now();
conn->mark_down();
}
});
}
@ -85,8 +83,7 @@ seastar::future<> Client::ms_handle_reset(crimson::net::ConnectionRef c)
seastar::future<> Client::reconnect()
{
if (conn) {
// crimson::net::Protocol::close() is able to close() in background
(void)conn->close();
conn->mark_down();
conn = {};
}
if (!mgrmap.get_available()) {

View File

@ -79,7 +79,7 @@ public:
const std::vector<uint32_t>& allowed_modes);
// v1 and v2
seastar::future<> close();
void close();
bool is_my_peer(const entity_addr_t& addr) const;
AuthAuthorizer* get_authorizer(entity_type_t peer) const;
KeyStore& get_keys();
@ -427,16 +427,14 @@ int Connection::handle_auth_bad_method(uint32_t old_auth_method,
return 0;
}
seastar::future<> Connection::close()
void Connection::close()
{
reply.set_value(Ref<MAuthReply>(nullptr));
reply = {};
auth_done.set_value(AuthResult::canceled);
auth_done = {};
if (conn && !std::exchange(closed, true)) {
return conn->close();
} else {
return seastar::now();
conn->mark_down();
}
}
@ -551,7 +549,8 @@ seastar::future<> Client::ms_handle_reset(crimson::net::ConnectionRef conn)
});
if (found != pending_conns.end()) {
logger().warn("pending conn reset by {}", conn->get_peer_addr());
return (*found)->close();
(*found)->close();
return seastar::now();
} else if (active_con && active_con->is_my_peer(conn->get_peer_addr())) {
logger().warn("active conn reset {}", conn->get_peer_addr());
active_con.reset();
@ -920,9 +919,7 @@ seastar::future<> Client::stop()
return tick_gate.close().then([this] {
timer.cancel();
if (active_con) {
return active_con->close();
} else {
return seastar::now();
active_con->close();
}
});
}
@ -953,9 +950,8 @@ seastar::future<> Client::reopen_session(int rank)
} else {
return mc->authenticate_v1(monmap.get_epoch(), entity_name, want_keys)
.handle_exception([conn](auto ep) {
return conn->close().then([ep=std::move(ep)](){
return seastar::make_exception_future<Connection::AuthResult>(ep);
});
conn->mark_down();
return seastar::make_exception_future<Connection::AuthResult>(ep);
});
}
}).then([peer, this](auto result) {
@ -986,21 +982,13 @@ seastar::future<> Client::reopen_session(int rank)
ceph_assert(!active_con && !pending_conns.empty());
active_con = std::move(*found);
found->reset();
auto ret = seastar::do_with(
std::move(pending_conns),
[](auto &pending_conns) {
return seastar::parallel_for_each(
pending_conns,
[] (auto &conn) {
if (!conn) {
return seastar::now();
} else {
return conn->close();
}
});
});
for (auto& conn : pending_conns) {
if (conn) {
conn->close();
}
}
pending_conns.clear();
return ret;
return seastar::now();
}).then([]() {
logger().debug("reopen_session mon connection attempts complete");
}).handle_exception([](auto ep) {

View File

@ -95,6 +95,8 @@ class Connection : public seastar::enable_shared_from_this<Connection> {
#ifdef UNIT_TESTS_BUILT
virtual bool is_closed() const = 0;
virtual bool is_closed_clean() const = 0;
virtual bool peer_wins() const = 0;
#endif
@ -105,10 +107,9 @@ class Connection : public seastar::enable_shared_from_this<Connection> {
/// handshake
virtual seastar::future<> keepalive() = 0;
// close the connection and cancel any pending futures from read/send
// Note it's OK to discard the returned future because Messenger::shutdown()
// will wait for all connections closed
virtual seastar::future<> close() = 0;
// close the connection and cancel any pending futures from read/send,
// without dispatching any reset event
virtual void mark_down() = 0;
virtual void print(ostream& out) const = 0;

View File

@ -7,6 +7,7 @@
#include "crimson/common/log.h"
#include "crimson/net/Errors.h"
#include "crimson/net/Dispatcher.h"
#include "crimson/net/Socket.h"
#include "crimson/net/SocketConnection.h"
#include "msg/Message.h"
@ -39,37 +40,62 @@ bool Protocol::is_connected() const
return write_state == write_state_t::open;
}
seastar::future<> Protocol::close()
void Protocol::close(bool dispatch_reset,
std::optional<std::function<void()>> f_accept_new)
{
if (closed) {
// already closing
assert(close_ready.valid());
return close_ready.get_future();
return;
}
logger().info("{} closing: reset {}, replace {}", conn,
dispatch_reset ? "yes" : "no",
f_accept_new ? "yes" : "no");
// unregister_conn() drops a reference, so hold another until completion
auto cleanup = [conn_ref = conn.shared_from_this(), this] {
logger().debug("{} closed!", conn);
#ifdef UNIT_TESTS_BUILT
is_closed_clean = true;
if (conn.interceptor) {
conn.interceptor->register_conn_closed(conn);
}
#endif
};
// atomic operations
closed = true;
trigger_close();
// close_ready become valid only after state is state_t::closing
assert(!close_ready.valid());
if (f_accept_new) {
(*f_accept_new)();
}
if (socket) {
socket->shutdown();
close_ready = pending_dispatch.close().finally([this] {
return socket->close();
}).finally(std::move(cleanup));
} else {
close_ready = pending_dispatch.close().finally(std::move(cleanup));
}
closed = true;
set_write_state(write_state_t::drop);
auto gate_closed = pending_dispatch.close();
auto reset_dispatched = seastar::futurize_apply([this, dispatch_reset] {
if (dispatch_reset) {
return dispatcher.ms_handle_reset(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
}
return seastar::now();
}).handle_exception([this] (std::exception_ptr eptr) {
logger().error("{} ms_handle_reset caught exception: {}", conn, eptr);
ceph_abort("unexpected exception from ms_handle_reset()");
});
return close_ready.get_future();
// asynchronous operations
assert(!close_ready.valid());
close_ready = seastar::when_all_succeed(
std::move(gate_closed).finally([this] {
if (socket) {
return socket->close();
}
return seastar::now();
}),
std::move(reset_dispatched)
).finally(std::move(cleanup));
}
seastar::future<> Protocol::send(MessageRef msg)
@ -289,13 +315,8 @@ void Protocol::write_event()
case write_state_t::open:
[[fallthrough]];
case write_state_t::delay:
(void) seastar::with_gate(pending_dispatch, [this] {
return do_write_dispatch_sweep(
).handle_exception([this] (std::exception_ptr eptr) {
logger().error("{} do_write_dispatch_sweep(): unexpected exception {}",
conn, eptr);
ceph_abort();
});
gated_dispatch("do_write_dispatch_sweep", [this] {
return do_write_dispatch_sweep();
});
return;
case write_state_t::drop:

View File

@ -6,6 +6,7 @@
#include <seastar/core/gate.hh>
#include <seastar/core/shared_future.hh>
#include "crimson/common/log.h"
#include "Fwd.h"
#include "SocketConnection.h"
@ -24,10 +25,20 @@ class Protocol {
bool is_connected() const;
#ifdef UNIT_TESTS_BUILT
bool is_closed_clean = false;
bool is_closed() const { return closed; }
#endif
// Reentrant closing
seastar::future<> close();
void close(bool dispatch_reset, std::optional<std::function<void()>> f_accept_new=std::nullopt);
seastar::future<> close_clean(bool dispatch_reset) {
close(dispatch_reset);
// it can happen if close_clean() is called inside Dispatcher::ms_handle_reset()
// which will otherwise result in deadlock
assert(close_ready.valid());
return close_ready.get_future();
}
virtual void start_connect(const entity_addr_t& peer_addr,
const entity_type_t& peer_type) = 0;
@ -53,19 +64,29 @@ class Protocol {
public:
const proto_t proto_type;
SocketRef socket;
protected:
template <typename Func>
void gated_dispatch(const char* what, Func&& func) {
(void) seastar::with_gate(pending_dispatch, std::forward<Func>(func)
).handle_exception([this, what] (std::exception_ptr eptr) {
crimson::get_logger(ceph_subsys_ms).error(
"{} gated_dispatch() {} caught exception: {}", conn, what, eptr);
ceph_abort("unexpected exception from gated_dispatch()");
});
}
Dispatcher &dispatcher;
SocketConnection &conn;
SocketRef socket;
seastar::gate pending_dispatch;
AuthConnectionMetaRef auth_meta;
private:
bool closed = false;
// become valid only after closed == true
seastar::shared_future<> close_ready;
seastar::gate pending_dispatch;
// the write state-machine
public:

View File

@ -309,8 +309,6 @@ void ProtocolV1::start_connect(const entity_addr_t& _peer_addr,
state = state_t::connecting;
set_write_state(write_state_t::delay);
// we don't know my ephemeral port yet
conn.set_ephemeral_port(0, SocketConnection::side_t::none);
ceph_assert(!socket);
conn.peer_addr = _peer_addr;
conn.target_addr = _peer_addr;
@ -318,7 +316,7 @@ void ProtocolV1::start_connect(const entity_addr_t& _peer_addr,
conn.policy = messenger.get_policy(_peer_type);
messenger.register_conn(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
(void) seastar::with_gate(pending_dispatch, [this] {
gated_dispatch("start_connect", [this] {
return Socket::connect(conn.peer_addr)
.then([this](SocketRef sock) {
socket = std::move(sock);
@ -347,8 +345,10 @@ void ProtocolV1::start_connect(const entity_addr_t& _peer_addr,
throw std::system_error(
make_error_code(crimson::net::error::bad_peer_address));
}
conn.set_ephemeral_port(caddr.get_port(),
SocketConnection::side_t::connector);
if (state != state_t::connecting) {
throw std::system_error(make_error_code(error::protocol_aborted));
}
socket->learn_ephemeral_port_as_connector(caddr.get_port());
if (unlikely(caddr.is_msgr2())) {
logger().warn("{} peer sent a v2 address for me: {}",
conn, caddr);
@ -377,7 +377,7 @@ void ProtocolV1::start_connect(const entity_addr_t& _peer_addr,
}).handle_exception([this] (std::exception_ptr eptr) {
// TODO: handle fault in the connecting state
logger().warn("{} connecting fault: {}", conn, eptr);
(void) close();
close(true);
});
});
}
@ -466,7 +466,7 @@ seastar::future<stop_t> ProtocolV1::replace_existing(
// will all be performed using v2 protocol.
ceph_abort("lossless policy not supported for v1");
}
(void) existing->close();
existing->protocol->close(true);
return send_connect_reply_ready(reply_tag, std::move(authorizer_reply));
}
@ -583,7 +583,8 @@ seastar::future<stop_t> ProtocolV1::repeat_handle_connect()
logger().warn("{} existing {} proto version is {} not 1, close existing",
conn, *existing,
static_cast<int>(existing->protocol->proto_type));
(void) existing->close();
// NOTE: this is following async messenger logic, but we may miss the reset event.
existing->mark_down();
} else {
return handle_connect_with_existing(existing, std::move(authorizer_reply));
}
@ -612,12 +613,10 @@ void ProtocolV1::start_accept(SocketRef&& sock,
ceph_assert(!socket);
// until we know better
conn.target_addr = _peer_addr;
conn.set_ephemeral_port(_peer_addr.get_port(),
SocketConnection::side_t::acceptor);
socket = std::move(sock);
messenger.accept_conn(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
(void) seastar::with_gate(pending_dispatch, [this] {
gated_dispatch("start_accept", [this] {
// stop learning my_addr before sending it out, so it won't change
return messenger.learned_addr(messenger.get_myaddr(), conn).then([this] {
// encode/send server's handshake header
@ -663,7 +662,7 @@ void ProtocolV1::start_accept(SocketRef&& sock,
}).handle_exception([this] (std::exception_ptr eptr) {
// TODO: handle fault in the accepting state
logger().warn("{} accepting fault: {}", conn, eptr);
(void) close();
close(false);
});
});
}
@ -847,15 +846,11 @@ seastar::future<> ProtocolV1::read_message()
}
// start dispatch, ignoring exceptions from the application layer
(void) seastar::with_gate(pending_dispatch, [this, msg = std::move(msg_ref)] {
logger().debug("{} <== #{} === {} ({})",
conn, msg->get_seq(), *msg, msg->get_type());
return dispatcher.ms_dispatch(&conn, std::move(msg))
.handle_exception([this] (std::exception_ptr eptr) {
logger().error("{} ms_dispatch caught exception: {}", conn, eptr);
ceph_assert(false);
});
});
gated_dispatch("ms_dispatch", [this, msg = std::move(msg_ref)] {
logger().debug("{} <== #{} === {} ({})",
conn, msg->get_seq(), *msg, msg->get_type());
return dispatcher.ms_dispatch(&conn, std::move(msg));
});
});
}
@ -894,31 +889,23 @@ void ProtocolV1::execute_open()
state = state_t::open;
set_write_state(write_state_t::open);
(void) seastar::with_gate(pending_dispatch, [this] {
gated_dispatch("execute_open", [this] {
// start background processing of tags
return handle_tags()
.handle_exception_type([this] (const std::system_error& e) {
logger().warn("{} open fault: {}", conn, e);
if (e.code() == error::protocol_aborted ||
e.code() == std::errc::connection_reset) {
return dispatcher.ms_handle_reset(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()))
.then([this] {
(void) close();
});
} else if (e.code() == error::read_eof) {
return dispatcher.ms_handle_remote_reset(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()))
.then([this] {
(void) close();
});
e.code() == std::errc::connection_reset ||
e.code() == error::read_eof) {
close(true);
return seastar::now();
} else {
throw e;
}
}).handle_exception([this] (std::exception_ptr eptr) {
// TODO: handle fault in the open state
logger().warn("{} open fault: {}", conn, eptr);
(void) close();
close(true);
});
});
}

View File

@ -62,8 +62,8 @@ void abort_protocol() {
throw std::system_error(make_error_code(crimson::net::error::protocol_aborted));
}
void abort_in_close(crimson::net::ProtocolV2& proto) {
(void) proto.close();
void abort_in_close(crimson::net::ProtocolV2& proto, bool dispatch_reset) {
proto.close(dispatch_reset);
abort_protocol();
}
@ -180,8 +180,6 @@ void ProtocolV2::start_accept(SocketRef&& sock,
ceph_assert(!socket);
// until we know better
conn.target_addr = _peer_addr;
conn.set_ephemeral_port(_peer_addr.get_port(),
SocketConnection::side_t::acceptor);
socket = std::move(sock);
logger().info("{} ProtocolV2::start_accept(): target_addr={}", conn, _peer_addr);
messenger.accept_conn(
@ -413,8 +411,7 @@ void ProtocolV2::fault(bool backoff, const char* func_name, std::exception_ptr e
if (conn.policy.lossy) {
logger().info("{} {}: fault at {} on lossy channel, going to CLOSING -- {}",
conn, func_name, get_state_name(state), eptr);
dispatch_reset();
(void) close();
close(true);
} else if (conn.policy.server ||
(conn.policy.standby &&
(!is_queued() && conn.sent.empty()))) {
@ -432,17 +429,6 @@ void ProtocolV2::fault(bool backoff, const char* func_name, std::exception_ptr e
}
}
void ProtocolV2::dispatch_reset()
{
(void) seastar::with_gate(pending_dispatch, [this] {
return dispatcher.ms_handle_reset(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
}).handle_exception([this] (std::exception_ptr eptr) {
logger().error("{} ms_handle_reset caught exception: {}", conn, eptr);
ceph_abort("unexpected exception from ms_handle_reset()");
});
}
void ProtocolV2::reset_session(bool full)
{
server_cookie = 0;
@ -452,17 +438,14 @@ void ProtocolV2::reset_session(bool full)
client_cookie = generate_client_cookie();
peer_global_seq = 0;
reset_write();
(void) seastar::with_gate(pending_dispatch, [this] {
gated_dispatch("ms_handle_remote_reset", [this] {
return dispatcher.ms_handle_remote_reset(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
}).handle_exception([this] (std::exception_ptr eptr) {
logger().error("{} ms_handle_remote_reset caught exception: {}", conn, eptr);
ceph_abort("unexpected exception from ms_handle_remote_reset()");
});
}
}
seastar::future<entity_type_t, entity_addr_t> ProtocolV2::banner_exchange()
seastar::future<entity_type_t, entity_addr_t> ProtocolV2::banner_exchange(bool is_connect)
{
// 1. prepare and send banner
bufferlist banner_payload;
@ -515,7 +498,7 @@ seastar::future<entity_type_t, entity_addr_t> ProtocolV2::banner_exchange()
logger().debug("{} GOT banner: payload_len={}", conn, payload_len);
INTERCEPT_CUSTOM(custom_bp_t::BANNER_PAYLOAD_READ, bp_type_t::READ);
return read(payload_len);
}).then([this] (bufferlist bl) {
}).then([this, is_connect] (bufferlist bl) {
// 4. process peer banner_payload and send HelloFrame
auto p = bl.cbegin();
uint64_t peer_supported_features;
@ -538,13 +521,13 @@ seastar::future<entity_type_t, entity_addr_t> ProtocolV2::banner_exchange()
logger().error("{} peer does not support all required features"
" required={} peer_supported={}",
conn, required_features, peer_supported_features);
abort_in_close(*this);
abort_in_close(*this, is_connect);
}
if ((supported_features & peer_required_features) != peer_required_features) {
logger().error("{} we do not support all peer required features"
" peer_required={} supported={}",
conn, peer_required_features, supported_features);
abort_in_close(*this);
abort_in_close(*this, is_connect);
}
this->peer_required_features = peer_required_features;
if (this->peer_required_features == 0) {
@ -668,8 +651,7 @@ seastar::future<> ProtocolV2::client_auth(std::vector<uint32_t> &allowed_methods
});
} catch (const crimson::auth::error& e) {
logger().error("{} get_initial_auth_request returned {}", conn, e);
dispatch_reset();
abort_in_close(*this);
abort_in_close(*this, true);
return seastar::now();
}
}
@ -863,9 +845,7 @@ void ProtocolV2::execute_connecting()
if (socket) {
socket->shutdown();
}
execution_done = seastar::with_gate(pending_dispatch, [this] {
// we don't know my socket_port yet
conn.set_ephemeral_port(0, SocketConnection::side_t::none);
gated_execute("execute_connecting", [this] {
return messenger.get_global_seq().then([this] (auto gs) {
global_seq = gs;
assert(client_cookie != 0);
@ -887,7 +867,8 @@ void ProtocolV2::execute_connecting()
abort_protocol();
}
if (socket) {
(void) with_gate(pending_dispatch, [sock = std::move(socket)] () mutable {
gated_dispatch("close_sockect_connecting",
[sock = std::move(socket)] () mutable {
return sock->close().then([sock = std::move(sock)] {});
});
}
@ -908,18 +889,21 @@ void ProtocolV2::execute_connecting()
auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
session_stream_handlers = { nullptr, nullptr };
enable_recording();
return banner_exchange();
return banner_exchange(true);
}).then([this] (entity_type_t _peer_type,
entity_addr_t _my_addr_from_peer) {
if (conn.get_peer_type() != _peer_type) {
logger().warn("{} connection peer type does not match what peer advertises {} != {}",
conn, ceph_entity_type_name(conn.get_peer_type()),
ceph_entity_type_name(_peer_type));
dispatch_reset();
abort_in_close(*this);
abort_in_close(*this, true);
}
conn.set_ephemeral_port(_my_addr_from_peer.get_port(),
SocketConnection::side_t::connector);
if (unlikely(state != state_t::CONNECTING)) {
logger().debug("{} triggered {} during banner_exchange(), abort",
conn, get_state_name(state));
abort_protocol();
}
socket->learn_ephemeral_port_as_connector(_my_addr_from_peer.get_port());
if (unlikely(_my_addr_from_peer.is_legacy())) {
logger().warn("{} peer sent a legacy address for me: {}",
conn, _my_addr_from_peer);
@ -946,12 +930,9 @@ void ProtocolV2::execute_connecting()
}
switch (next) {
case next_step_t::ready: {
(void) seastar::with_gate(pending_dispatch, [this] {
gated_dispatch("ms_handle_connect", [this] {
return dispatcher.ms_handle_connect(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
}).handle_exception([this] (std::exception_ptr eptr) {
logger().error("{} ms_handle_connect caught exception: {}", conn, eptr);
ceph_abort("unexpected exception from ms_handle_connect()");
});
logger().info("{} connected:"
" gs={}, pgs={}, cs={}, client_cookie={},"
@ -1128,7 +1109,7 @@ ProtocolV2::reuse_connection(
// close this connection because all the necessary information is delivered
// to the existing connection, and jump to error handling code to abort the
// current state.
abort_in_close(*this);
abort_in_close(*this, false);
return seastar::make_ready_future<next_step_t>(next_step_t::none);
}
@ -1172,15 +1153,7 @@ ProtocolV2::handle_existing_connection(SocketConnectionRef existing_conn)
logger().warn("{} server_connect:"
" existing connection {} is a lossy channel. Close existing in favor of"
" this connection", conn, *existing_conn);
existing_proto->dispatch_reset();
(void) existing_proto->close();
if (unlikely(state != state_t::ACCEPTING)) {
logger().debug("{} triggered {} in execute_accepting()",
conn, get_state_name(state));
abort_protocol();
}
execute_establishing();
execute_establishing(existing_conn, true);
return seastar::make_ready_future<next_step_t>(next_step_t::ready);
}
@ -1304,24 +1277,21 @@ ProtocolV2::server_connect()
SocketConnectionRef existing_conn = messenger.lookup_conn(conn.peer_addr);
bool dispatch_reset = true;
if (existing_conn) {
if (existing_conn->protocol->proto_type != proto_t::v2) {
logger().warn("{} existing connection {} proto version is {}, close existing",
conn, *existing_conn,
static_cast<int>(existing_conn->protocol->proto_type));
// should unregister the existing from msgr atomically
(void) existing_conn->close();
// NOTE: this is following async messenger logic, but we may miss the reset event.
dispatch_reset = false;
} else {
return handle_existing_connection(existing_conn);
}
}
if (unlikely(state != state_t::ACCEPTING)) {
logger().debug("{} triggered {} in execute_accepting()",
conn, get_state_name(state));
abort_protocol();
}
execute_establishing();
execute_establishing(existing_conn, dispatch_reset);
return seastar::make_ready_future<next_step_t>(next_step_t::ready);
});
}
@ -1419,7 +1389,8 @@ ProtocolV2::server_reconnect()
"close existing and reset client.",
conn, *existing_conn,
static_cast<int>(existing_conn->protocol->proto_type));
(void) existing_conn->close();
// NOTE: this is following async messenger logic, but we may miss the reset event.
existing_conn->mark_down();
return send_reset(true);
}
@ -1512,13 +1483,13 @@ ProtocolV2::server_reconnect()
void ProtocolV2::execute_accepting()
{
trigger_state(state_t::ACCEPTING, write_state_t::none, false);
(void) seastar::with_gate(pending_dispatch, [this] {
gated_dispatch("execute_accepting", [this] {
return seastar::futurize_apply([this] {
INTERCEPT_N_RW(custom_bp_t::SOCKET_ACCEPTED);
auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
session_stream_handlers = { nullptr, nullptr };
enable_recording();
return banner_exchange();
return banner_exchange(false);
}).then([this] (entity_type_t _peer_type,
entity_addr_t _my_addr_from_peer) {
ceph_assert(conn.get_peer_type() == 0);
@ -1573,7 +1544,7 @@ void ProtocolV2::execute_accepting()
}).handle_exception([this] (std::exception_ptr eptr) {
logger().info("{} execute_accepting(): fault at {}, going to CLOSING -- {}",
conn, get_state_name(state), eptr);
(void) close();
close(false);
});
});
}
@ -1615,22 +1586,36 @@ seastar::future<> ProtocolV2::finish_auth()
// ESTABLISHING
void ProtocolV2::execute_establishing() {
void ProtocolV2::execute_establishing(
SocketConnectionRef existing_conn, bool dispatch_reset) {
if (unlikely(state != state_t::ACCEPTING)) {
logger().debug("{} triggered {} before execute_establishing()",
conn, get_state_name(state));
abort_protocol();
}
auto accept_me = [this] {
messenger.register_conn(
seastar::static_pointer_cast<SocketConnection>(
conn.shared_from_this()));
messenger.unaccept_conn(
seastar::static_pointer_cast<SocketConnection>(
conn.shared_from_this()));
};
trigger_state(state_t::ESTABLISHING, write_state_t::delay, false);
(void) seastar::with_gate(pending_dispatch, [this] {
if (existing_conn) {
existing_conn->protocol->close(dispatch_reset, std::move(accept_me));
} else {
accept_me();
}
gated_dispatch("ms_handle_accept_establishing", [this] {
return dispatcher.ms_handle_accept(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
}).handle_exception([this] (std::exception_ptr eptr) {
logger().error("{} ms_handle_accept caught exception: {}", conn, eptr);
ceph_abort("unexpected exception from ms_handle_accept()");
});
messenger.register_conn(
seastar::static_pointer_cast<SocketConnection>(
conn.shared_from_this()));
messenger.unaccept_conn(
seastar::static_pointer_cast<SocketConnection>(
conn.shared_from_this()));
execution_done = seastar::with_gate(pending_dispatch, [this] {
gated_execute("execute_establishing", [this] {
return seastar::futurize_apply([this] {
return send_server_ident();
}).then([this] {
@ -1723,23 +1708,20 @@ void ProtocolV2::trigger_replacing(bool reconnect,
if (socket) {
socket->shutdown();
}
(void) seastar::with_gate(pending_dispatch, [this] {
gated_dispatch("ms_handle_accept_replacing", [this] {
return dispatcher.ms_handle_accept(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
}).handle_exception([this] (std::exception_ptr eptr) {
logger().error("{} ms_handle_accept caught exception: {}", conn, eptr);
ceph_abort("unexpected exception from ms_handle_accept()");
});
(void) seastar::with_gate(pending_dispatch,
[this,
reconnect,
do_reset,
new_socket = std::move(new_socket),
new_auth_meta = std::move(new_auth_meta),
new_rxtx = std::move(new_rxtx),
new_client_cookie, new_peer_name,
new_conn_features, new_peer_global_seq,
new_connect_seq, new_msg_seq] () mutable {
gated_dispatch("trigger_replacing",
[this,
reconnect,
do_reset,
new_socket = std::move(new_socket),
new_auth_meta = std::move(new_auth_meta),
new_rxtx = std::move(new_rxtx),
new_client_cookie, new_peer_name,
new_conn_features, new_peer_global_seq,
new_connect_seq, new_msg_seq] () mutable {
return wait_write_exit().then([this, do_reset] {
if (do_reset) {
reset_session(true);
@ -1761,7 +1743,8 @@ void ProtocolV2::trigger_replacing(bool reconnect,
}
if (socket) {
(void) with_gate(pending_dispatch, [sock = std::move(socket)] () mutable {
gated_dispatch("close_socket_replacing",
[sock = std::move(socket)] () mutable {
return sock->close().then([sock = std::move(sock)] {});
});
}
@ -1953,11 +1936,8 @@ seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp)
// TODO: change MessageRef with seastar::shared_ptr
auto msg_ref = MessageRef{message, false};
(void) seastar::with_gate(pending_dispatch, [this, msg = std::move(msg_ref)] {
gated_dispatch("ms_dispatch", [this, msg = std::move(msg_ref)] {
return dispatcher.ms_dispatch(&conn, std::move(msg));
}).handle_exception([this] (std::exception_ptr eptr) {
logger().error("{} ms_dispatch caught exception: {}", conn, eptr);
ceph_abort("unexpected exception from ms_dispatch()");
});
});
}
@ -1971,7 +1951,7 @@ void ProtocolV2::execute_ready()
conn.interceptor->register_conn_ready(conn);
}
#endif
execution_done = seastar::with_gate(pending_dispatch, [this] {
gated_execute("execute_ready", [this] {
protocol_timer.cancel();
return seastar::keep_doing([this] {
return read_main_preamble()
@ -2077,8 +2057,7 @@ void ProtocolV2::execute_wait(bool max_backoff)
if (socket) {
socket->shutdown();
}
execution_done = seastar::with_gate(pending_dispatch,
[this, max_backoff] {
gated_execute("execute_wait", [this, max_backoff] {
double backoff = protocol_timer.last_dur();
if (max_backoff) {
backoff = conf.ms_max_backoff;
@ -2109,14 +2088,14 @@ void ProtocolV2::execute_wait(bool max_backoff)
void ProtocolV2::execute_server_wait()
{
trigger_state(state_t::SERVER_WAIT, write_state_t::delay, false);
execution_done = seastar::with_gate(pending_dispatch, [this] {
gated_execute("execute_server_wait", [this] {
return read_exactly(1).then([this] (auto bl) {
logger().warn("{} SERVER_WAIT got read, abort", conn);
abort_in_fault();
}).handle_exception([this] (std::exception_ptr eptr) {
logger().info("{} execute_server_wait(): fault at {}, going to CLOSING -- {}",
conn, get_state_name(state), eptr);
(void) close();
close(false);
});
});
}
@ -2141,11 +2120,6 @@ void ProtocolV2::trigger_close()
protocol_timer.cancel();
trigger_state(state_t::CLOSING, write_state_t::drop, false);
#ifdef UNIT_TESTS_BUILT
if (conn.interceptor) {
conn.interceptor->register_conn_closed(conn);
}
#endif
}
} // namespace crimson::net

View File

@ -80,6 +80,14 @@ class ProtocolV2 final : public Protocol {
seastar::shared_future<> execution_done = seastar::now();
template <typename Func>
void gated_execute(const char* what, Func&& func) {
gated_dispatch(what, [this, &func] {
execution_done = seastar::futurize_apply(std::forward<Func>(func));
return execution_done.get_future();
});
}
class Timer {
double last_dur_ = 0.0;
const SocketConnection& conn;
@ -124,9 +132,8 @@ class ProtocolV2 final : public Protocol {
private:
void fault(bool backoff, const char* func_name, std::exception_ptr eptr);
void dispatch_reset();
void reset_session(bool full);
seastar::future<entity_type_t, entity_addr_t> banner_exchange();
seastar::future<entity_type_t, entity_addr_t> banner_exchange(bool is_connect);
enum class next_step_t {
ready,
@ -174,7 +181,7 @@ class ProtocolV2 final : public Protocol {
seastar::future<> finish_auth();
// ESTABLISHING
void execute_establishing();
void execute_establishing(SocketConnectionRef existing_conn, bool dispatch_reset);
// ESTABLISHING/REPLACING (server)
seastar::future<> send_server_ident();

View File

@ -25,31 +25,25 @@ using SocketRef = std::unique_ptr<Socket>;
class Socket
{
const seastar::shard_id sid;
seastar::connected_socket socket;
seastar::input_stream<char> in;
seastar::output_stream<char> out;
#ifndef NDEBUG
bool closed = false;
#endif
/// buffer state for read()
struct {
bufferlist buffer;
size_t remaining;
} r;
struct construct_tag {};
public:
Socket(seastar::connected_socket&& _socket, construct_tag)
// if acceptor side, peer is using a different port (ephemeral_port)
// if connector side, I'm using a different port (ephemeral_port)
enum class side_t {
acceptor,
connector
};
Socket(seastar::connected_socket&& _socket, side_t _side, uint16_t e_port, construct_tag)
: sid{seastar::engine().cpu_id()},
socket(std::move(_socket)),
in(socket.input()),
// the default buffer size 8192 is too small that may impact our write
// performance. see seastar::net::connected_socket::output()
out(socket.output(65536)) {}
out(socket.output(65536)),
side(_side),
ephemeral_port(e_port) {}
~Socket() {
#ifndef NDEBUG
@ -63,7 +57,8 @@ class Socket
connect(const entity_addr_t& peer_addr) {
return seastar::connect(peer_addr.in4_addr()
).then([] (seastar::connected_socket socket) {
return std::make_unique<Socket>(std::move(socket), construct_tag{});
return std::make_unique<Socket>(
std::move(socket), side_t::connector, 0, construct_tag{});
});
}
@ -115,7 +110,45 @@ class Socket
socket.shutdown_output();
}
side_t get_side() const {
return side;
}
uint16_t get_ephemeral_port() const {
return ephemeral_port;
}
// learn my ephemeral_port as connector.
// unfortunately, there's no way to identify which port I'm using as
// connector with current seastar interface.
void learn_ephemeral_port_as_connector(uint16_t port) {
assert(side == side_t::connector &&
(ephemeral_port == 0 || ephemeral_port == port));
ephemeral_port = port;
}
private:
const seastar::shard_id sid;
seastar::connected_socket socket;
seastar::input_stream<char> in;
seastar::output_stream<char> out;
side_t side;
uint16_t ephemeral_port;
#ifndef NDEBUG
bool closed = false;
#endif
/// buffer state for read()
struct {
bufferlist buffer;
size_t remaining;
} r;
#ifdef UNIT_TESTS_BUILT
public:
void set_trap(bp_type_t type, bp_action_t action, socket_blocker* blocker_);
private:
bp_action_t next_trap_read = bp_action_t::CONTINUE;
bp_action_t next_trap_write = bp_action_t::CONTINUE;
@ -123,9 +156,6 @@ class Socket
seastar::future<> try_trap_pre(bp_action_t& trap);
seastar::future<> try_trap_post(bp_action_t& trap);
public:
void set_trap(bp_type_t type, bp_action_t action, socket_blocker* blocker_);
#endif
friend class FixedCPUServerSocket;
};
@ -214,7 +244,8 @@ public:
peer_addr.set_sockaddr(&paddr.as_posix_sockaddr());
peer_addr.set_type(entity_addr_t::TYPE_ANY);
SocketRef _socket = std::make_unique<Socket>(
std::move(socket), Socket::construct_tag{});
std::move(socket), Socket::side_t::acceptor,
peer_addr.get_port(), Socket::construct_tag{});
std::ignore = seastar::with_gate(ss.shutdown_gate,
[socket = std::move(_socket), peer_addr,
&ss, fn_accept = std::move(fn_accept)] () mutable {

View File

@ -63,6 +63,12 @@ bool SocketConnection::is_closed() const
return protocol->is_closed();
}
bool SocketConnection::is_closed_clean() const
{
assert(seastar::engine().cpu_id() == shard_id());
return protocol->is_closed_clean;
}
#endif
bool SocketConnection::peer_wins() const
{
@ -81,10 +87,10 @@ seastar::future<> SocketConnection::keepalive()
return protocol->keepalive();
}
seastar::future<> SocketConnection::close()
void SocketConnection::mark_down()
{
assert(seastar::engine().cpu_id() == shard_id());
return protocol->close();
protocol->close(false);
}
bool SocketConnection::update_rx_seq(seq_num_t seq)
@ -120,19 +126,25 @@ SocketConnection::start_accept(SocketRef&& sock,
protocol->start_accept(std::move(sock), _peer_addr);
}
seastar::future<>
SocketConnection::close_clean(bool dispatch_reset)
{
return protocol->close_clean(dispatch_reset);
}
seastar::shard_id SocketConnection::shard_id() const {
return messenger.shard_id();
}
void SocketConnection::print(ostream& out) const {
messenger.print(out);
if (side == side_t::none) {
if (!protocol->socket) {
out << " >> " << get_peer_name() << " " << peer_addr;
} else if (side == side_t::acceptor) {
} else if (protocol->socket->get_side() == Socket::side_t::acceptor) {
out << " >> " << get_peer_name() << " " << peer_addr
<< "@" << ephemeral_port;
} else { // side == side_t::connector
out << "@" << ephemeral_port
<< "@" << protocol->socket->get_ephemeral_port();
} else { // protocol->socket->get_side() == Socket::side_t::connector
out << "@" << protocol->socket->get_ephemeral_port()
<< " >> " << get_peer_name() << " " << peer_addr;
}
}

View File

@ -33,20 +33,6 @@ class SocketConnection : public Connection {
SocketMessenger& messenger;
std::unique_ptr<Protocol> protocol;
// if acceptor side, ephemeral_port is different from peer_addr.get_port();
// if connector side, ephemeral_port is different from my_addr.get_port().
enum class side_t {
none,
acceptor,
connector
};
side_t side = side_t::none;
uint16_t ephemeral_port = 0;
void set_ephemeral_port(uint16_t port, side_t _side) {
ephemeral_port = port;
side = _side;
}
ceph::net::Policy<crimson::thread::Throttle> policy;
/// the seq num of the last transmitted message
@ -77,6 +63,8 @@ class SocketConnection : public Connection {
bool is_connected() const override;
#ifdef UNIT_TESTS_BUILT
bool is_closed_clean() const override;
bool is_closed() const override;
bool peer_wins() const override;
@ -88,7 +76,7 @@ class SocketConnection : public Connection {
seastar::future<> keepalive() override;
seastar::future<> close() override;
void mark_down() override;
void print(ostream& out) const override;
@ -101,6 +89,8 @@ class SocketConnection : public Connection {
void start_accept(SocketRef&& socket,
const entity_addr_t& peer_addr);
seastar::future<> close_clean(bool dispatch_reset);
bool is_server_side() const {
return policy.server;
}

View File

@ -141,6 +141,7 @@ SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_type_t& pe
ceph_assert(peer_addr.get_port() > 0);
if (auto found = lookup_conn(peer_addr); found) {
logger().info("{} connect to existing", *found);
return found->shared_from_this();
}
SocketConnectionRef conn = seastar::make_shared<SocketConnection>(
@ -163,12 +164,12 @@ seastar::future<> SocketMessenger::shutdown()
// close all connections
}).then([this] {
return seastar::parallel_for_each(accepting_conns, [] (auto conn) {
return conn->close();
return conn->close_clean(false);
});
}).then([this] {
ceph_assert(accepting_conns.empty());
return seastar::parallel_for_each(connections, [] (auto conn) {
return conn.second->close();
return conn.second->close_clean(false);
});
}).then([this] {
ceph_assert(connections.empty());

View File

@ -113,31 +113,19 @@ void Heartbeat::add_peer(osd_id_t peer, epoch_t epoch)
}
}
seastar::future<Heartbeat::osds_t> Heartbeat::remove_down_peers()
Heartbeat::osds_t Heartbeat::remove_down_peers()
{
osds_t osds;
for (auto& peer : peers) {
osds.push_back(peer.first);
auto osd = peer.first;
auto osdmap = service.get_osdmap_service().get_map();
if (!osdmap->is_up(osd)) {
remove_peer(osd);
} else if (peers[osd].epoch < osdmap->get_epoch()) {
osds.push_back(osd);
}
}
return seastar::map_reduce(std::move(osds),
[this](auto& osd) {
auto osdmap = service.get_osdmap_service().get_map();
if (!osdmap->is_up(osd)) {
return remove_peer(osd).then([] {
return seastar::make_ready_future<osd_id_t>(-1);
});
} else if (peers[osd].epoch < osdmap->get_epoch()) {
return seastar::make_ready_future<osd_id_t>(osd);
} else {
return seastar::make_ready_future<osd_id_t>(-1);
}
}, osds_t{},
[](osds_t&& extras, osd_id_t extra) {
if (extra >= 0) {
extras.push_back(extra);
}
return std::move(extras);
});
return osds;
}
void Heartbeat::add_reporter_peers(int whoami)
@ -163,49 +151,37 @@ void Heartbeat::add_reporter_peers(int whoami)
};
}
seastar::future<> Heartbeat::update_peers(int whoami)
void Heartbeat::update_peers(int whoami)
{
const auto min_peers = static_cast<size_t>(
local_conf().get_val<int64_t>("osd_heartbeat_min_peers"));
add_reporter_peers(whoami);
return remove_down_peers().then([=](osds_t&& extra) {
// too many?
struct iteration_state {
osds_t::const_iterator where;
osds_t::const_iterator end;
};
return seastar::do_with(iteration_state{extra.begin(),extra.end()},
[=](iteration_state& s) {
return seastar::do_until(
[min_peers, &s, this] {
return peers.size() <= min_peers || s.where == s.end; },
[&s, this] {
return remove_peer(*s.where); }
);
});
}).then([=] {
// or too few?
auto osdmap = service.get_osdmap_service().get_map();
auto epoch = osdmap->get_epoch();
for (auto next = osdmap->get_next_up_osd_after(whoami);
peers.size() < min_peers && next >= 0 && next != whoami;
next = osdmap->get_next_up_osd_after(next)) {
add_peer(next, epoch);
auto extra = remove_down_peers();
// too many?
for (auto& osd : extra) {
if (peers.size() <= min_peers) {
break;
}
});
remove_peer(osd);
}
// or too few?
auto osdmap = service.get_osdmap_service().get_map();
auto epoch = osdmap->get_epoch();
for (auto next = osdmap->get_next_up_osd_after(whoami);
peers.size() < min_peers && next >= 0 && next != whoami;
next = osdmap->get_next_up_osd_after(next)) {
add_peer(next, epoch);
}
}
seastar::future<> Heartbeat::remove_peer(osd_id_t peer)
void Heartbeat::remove_peer(osd_id_t peer)
{
logger().info("remove_peer({})", peer);
auto found = peers.find(peer);
assert(found != peers.end());
logger().info("remove_peer({})", peer);
return seastar::when_all_succeed(found->second.con_front->close(),
found->second.con_back->close()).then(
[this, peer] {
peers.erase(peer);
return seastar::now();
});
found->second.con_front->mark_down();
found->second.con_back->mark_down();
peers.erase(peer);
}
seastar::future<> Heartbeat::ms_dispatch(crimson::net::Connection* conn,
@ -231,9 +207,9 @@ seastar::future<> Heartbeat::ms_handle_reset(crimson::net::ConnectionRef conn)
}
const auto peer = found->first;
const auto epoch = found->second.epoch;
return remove_peer(peer).then([peer, epoch, this] {
add_peer(peer, epoch);
});
remove_peer(peer);
add_peer(peer, epoch);
return seastar::now();
}
seastar::future<> Heartbeat::handle_osd_ping(crimson::net::Connection* conn,
@ -350,46 +326,41 @@ void Heartbeat::heartbeat_check()
seastar::future<> Heartbeat::send_heartbeats()
{
using peers_item_t = typename peers_map_t::value_type;
return seastar::parallel_for_each(peers,
[this](peers_item_t& item) {
const auto mnow = service.get_mnow();
const auto now = clock::now();
const auto deadline =
now + std::chrono::seconds(local_conf()->osd_heartbeat_grace);
auto& info = item.second;
info.last_tx = now;
if (clock::is_zero(info.first_tx)) {
info.first_tx = now;
const auto mnow = service.get_mnow();
const auto now = clock::now();
const auto deadline =
now + std::chrono::seconds(local_conf()->osd_heartbeat_grace);
const utime_t sent_stamp{now};
std::vector<seastar::future<>> futures;
for (auto& item : peers) {
auto& info = item.second;
info.last_tx = now;
if (clock::is_zero(info.first_tx)) {
info.first_tx = now;
}
[[maybe_unused]] auto [reply, added] =
info.ping_history.emplace(sent_stamp, reply_t{deadline, 0});
crimson::net::ConnectionRef conns[] = {info.con_front, info.con_back};
for (auto& con : conns) {
if (con) {
auto min_message = static_cast<uint32_t>(
local_conf()->osd_heartbeat_min_size);
auto ping = make_message<MOSDPing>(
monc.get_fsid(),
service.get_osdmap_service().get_map()->get_epoch(),
MOSDPing::PING,
sent_stamp,
mnow,
mnow,
service.get_osdmap_service().get_up_epoch(),
min_message);
reply->second.unacknowledged++;
futures.push_back(con->send(std::move(ping)));
}
const utime_t sent_stamp{now};
[[maybe_unused]] auto [reply, added] =
info.ping_history.emplace(sent_stamp, reply_t{deadline, 0});
std::vector<crimson::net::ConnectionRef> conns{info.con_front,
info.con_back};
return seastar::parallel_for_each(std::move(conns),
[sent_stamp, mnow, &reply=reply->second, this] (auto con) {
if (con) {
auto min_message = static_cast<uint32_t>(
local_conf()->osd_heartbeat_min_size);
auto ping = make_message<MOSDPing>(
monc.get_fsid(),
service.get_osdmap_service().get_map()->get_epoch(),
MOSDPing::PING,
sent_stamp,
mnow,
mnow,
service.get_osdmap_service().get_up_epoch(),
min_message);
return con->send(ping).then([&reply] {
reply.unacknowledged++;
return seastar::now();
});
} else {
return seastar::now();
}
});
});
}
}
return seastar::when_all_succeed(futures.begin(), futures.end());
}
seastar::future<> Heartbeat::send_failures(failure_queue_t&& failure_queue)

View File

@ -35,8 +35,8 @@ public:
seastar::future<> stop();
void add_peer(osd_id_t peer, epoch_t epoch);
seastar::future<> update_peers(int whoami);
seastar::future<> remove_peer(osd_id_t peer);
void update_peers(int whoami);
void remove_peer(osd_id_t peer);
const entity_addrvec_t& get_front_addrs() const;
const entity_addrvec_t& get_back_addrs() const;
@ -62,7 +62,7 @@ private:
using osds_t = std::vector<osd_id_t>;
/// remove down OSDs
/// @return peers not needed in this epoch
seastar::future<osds_t> remove_down_peers();
osds_t remove_down_peers();
/// add enough reporters for fast failure detection
void add_reporter_peers(int whoami);

View File

@ -78,7 +78,7 @@ OSD::OSD(int id, uint32_t nonce,
shard_services{*this, *cluster_msgr, *public_msgr, *monc, *mgrc, *store},
heartbeat{new Heartbeat{shard_services, *monc, hb_front_msgr, hb_back_msgr}},
// do this in background
heartbeat_timer{[this] { (void)update_heartbeat_peers(); }},
heartbeat_timer{[this] { update_heartbeat_peers(); }},
asok{seastar::make_lw_shared<crimson::admin::AdminSocket>()},
osdmap_gate("OSD::osdmap_gate", std::make_optional(std::ref(shard_services)))
{
@ -1049,10 +1049,10 @@ seastar::future<> OSD::send_beacon()
return monc->send_message(m);
}
seastar::future<> OSD::update_heartbeat_peers()
void OSD::update_heartbeat_peers()
{
if (!state.is_active()) {
return seastar::now();
return;
}
for (auto& pg : pg_map.get_pgs()) {
vector<int> up, acting;
@ -1067,7 +1067,7 @@ seastar::future<> OSD::update_heartbeat_peers()
}
}
}
return heartbeat->update_peers(whoami);
heartbeat->update_peers(whoami);
}
seastar::future<> OSD::handle_peering_op(

View File

@ -216,7 +216,7 @@ public:
seastar::future<> shutdown();
seastar::future<> send_beacon();
seastar::future<> update_heartbeat_peers();
void update_heartbeat_peers();
friend class PGAdvanceMap;
};

View File

@ -932,7 +932,7 @@ class FailoverSuite : public Dispatcher {
unsigned pending_establish = 0;
unsigned replaced_conns = 0;
for (auto& result : interceptor.results) {
if (result.conn->is_closed()) {
if (result.conn->is_closed_clean()) {
if (result.state == conn_state_t::replaced) {
++replaced_conns;
}
@ -1122,7 +1122,8 @@ class FailoverSuite : public Dispatcher {
seastar::future<> markdown() {
logger().info("[Test] markdown()");
ceph_assert(tracked_conn);
return tracked_conn->close();
tracked_conn->mark_down();
return seastar::now();
}
seastar::future<> wait_blocked() {
@ -1375,6 +1376,7 @@ class FailoverSuitePeer : public Dispatcher {
}
seastar::future<> ms_handle_accept(ConnectionRef conn) override {
logger().info("[TestPeer] got accept from Test");
ceph_assert(!tracked_conn ||
tracked_conn->is_closed() ||
tracked_conn == conn);
@ -1383,6 +1385,7 @@ class FailoverSuitePeer : public Dispatcher {
}
seastar::future<> ms_handle_reset(ConnectionRef conn) override {
logger().info("[TestPeer] got reset from Test");
ceph_assert(tracked_conn == conn);
tracked_conn = nullptr;
return seastar::now();
@ -1468,7 +1471,8 @@ class FailoverSuitePeer : public Dispatcher {
seastar::future<> markdown() {
logger().info("[TestPeer] markdown()");
ceph_assert(tracked_conn);
return tracked_conn->close();
tracked_conn->mark_down();
return seastar::now();
}
static seastar::future<std::unique_ptr<FailoverSuitePeer>>