crimson/net: add ESTABLISHING state

With the new ESTABLISHING state, connection lookup and acceptance can be
atomic, solving the issues related to racing connect.

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
This commit is contained in:
Yingxin Cheng 2019-09-06 11:31:06 +08:00
parent 3677af941a
commit 4660615235
2 changed files with 89 additions and 46 deletions

View File

@ -1214,7 +1214,14 @@ ProtocolV2::handle_existing_connection(SocketConnectionRef existing_conn)
" this connection", conn, *existing_conn);
existing_proto->dispatch_reset();
existing_proto->close();
return send_server_ident();
if (unlikely(state != state_t::ACCEPTING)) {
logger().debug("{} triggered {} in execute_accepting()",
conn, get_state_name(state));
abort_protocol();
}
execute_establishing();
return seastar::make_ready_future<next_step_t>(next_step_t::ready);
}
if (existing_proto->server_cookie != 0) {
@ -1348,10 +1355,13 @@ ProtocolV2::server_connect()
}
}
// TODO: atomically register & unaccept the connecton with lookup_conn()
// if everything is OK reply with server identification
return send_server_ident();
if (unlikely(state != state_t::ACCEPTING)) {
logger().debug("{} triggered {} in execute_accepting()",
conn, get_state_name(state));
abort_protocol();
}
execute_establishing();
return seastar::make_ready_future<next_step_t>(next_step_t::ready);
});
}
@ -1540,8 +1550,7 @@ ProtocolV2::server_reconnect()
void ProtocolV2::execute_accepting()
{
// TODO: change to write_state_t::none
trigger_state(state_t::ACCEPTING, write_state_t::delay, false);
trigger_state(state_t::ACCEPTING, write_state_t::none, false);
seastar::with_gate(pending_dispatch, [this] {
return seastar::futurize_apply([this] {
INTERCEPT_N_RW(custom_bp_t::SOCKET_ACCEPTED);
@ -1584,41 +1593,21 @@ void ProtocolV2::execute_accepting()
}
}
}).then([this] (next_step_t next) {
if (unlikely(state != state_t::ACCEPTING)) {
logger().debug("{} triggered {} at the end of execute_accepting()",
conn, get_state_name(state));
abort_protocol();
}
switch (next) {
case next_step_t::ready: {
seastar::with_gate(pending_dispatch, [this] {
return dispatcher.ms_handle_accept(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
}).handle_exception([this] (std::exception_ptr eptr) {
logger().error("{} ms_handle_accept caught exception: {}", conn, eptr);
ceph_abort("unexpected exception from ms_handle_accept()");
});
messenger.register_conn(
seastar::static_pointer_cast<SocketConnection>(
conn.shared_from_this()));
messenger.unaccept_conn(
seastar::static_pointer_cast<SocketConnection>(
conn.shared_from_this()));
logger().info("{} accepted: gs={}, pgs={}, cs={},"
" client_cookie={}, server_cookie={}, in_seq={}, out_seq={}",
conn, global_seq, peer_global_seq, connect_seq,
client_cookie, server_cookie, conn.in_seq, conn.out_seq);
execute_ready();
case next_step_t::ready:
assert(state != state_t::ACCEPTING);
break;
}
case next_step_t::wait: {
case next_step_t::wait:
if (unlikely(state != state_t::ACCEPTING)) {
logger().debug("{} triggered {} at the end of execute_accepting()",
conn, get_state_name(state));
abort_protocol();
}
logger().info("{} execute_accepting(): going to SERVER_WAIT", conn);
execute_server_wait();
break;
}
default: {
default:
ceph_abort("impossible next step");
}
}
}).handle_exception([this] (std::exception_ptr eptr) {
logger().info("{} execute_accepting(): fault at {}, going to CLOSING -- {}",
@ -1663,9 +1652,53 @@ seastar::future<> ProtocolV2::finish_auth()
});
}
// ACCEPTING or REPLACING state
// ESTABLISHING
seastar::future<ProtocolV2::next_step_t>
void ProtocolV2::execute_establishing() {
trigger_state(state_t::ESTABLISHING, write_state_t::delay, false);
seastar::with_gate(pending_dispatch, [this] {
return dispatcher.ms_handle_accept(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
}).handle_exception([this] (std::exception_ptr eptr) {
logger().error("{} ms_handle_accept caught exception: {}", conn, eptr);
ceph_abort("unexpected exception from ms_handle_accept()");
});
messenger.register_conn(
seastar::static_pointer_cast<SocketConnection>(
conn.shared_from_this()));
messenger.unaccept_conn(
seastar::static_pointer_cast<SocketConnection>(
conn.shared_from_this()));
logger().info("{} accepted: gs={}, pgs={}, cs={},"
" client_cookie={}, server_cookie={}, in_seq={}, out_seq={}",
conn, global_seq, peer_global_seq, connect_seq,
client_cookie, server_cookie, conn.in_seq, conn.out_seq);
execution_done = seastar::with_gate(pending_dispatch, [this] {
return seastar::futurize_apply([this] {
return send_server_ident();
}).then([this] {
if (unlikely(state != state_t::ESTABLISHING)) {
logger().debug("{} triggered {} at the end of execute_establishing()",
conn, get_state_name(state));
abort_protocol();
}
execute_ready();
}).handle_exception([this] (std::exception_ptr eptr) {
if (state != state_t::ESTABLISHING) {
logger().info("{} execute_establishing() protocol aborted at {} -- {}",
conn, get_state_name(state), eptr);
assert(state == state_t::CLOSING ||
state == state_t::REPLACING);
return;
}
fault(false, "execute_establishing()", eptr);
});
});
}
// ESTABLISHING or REPLACING state
seastar::future<>
ProtocolV2::send_server_ident()
{
// send_server_ident() logic
@ -1707,8 +1740,6 @@ ProtocolV2::send_server_ident()
conn.set_features(connection_features);
return write_frame(server_ident);
}).then([] {
return next_step_t::ready;
});
}
@ -1730,6 +1761,15 @@ void ProtocolV2::trigger_replacing(bool reconnect,
if (socket) {
socket->shutdown();
}
if (!reconnect && new_client_cookie != client_cookie) {
seastar::with_gate(pending_dispatch, [this] {
return dispatcher.ms_handle_accept(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
}).handle_exception([this] (std::exception_ptr eptr) {
logger().error("{} ms_handle_accept caught exception: {}", conn, eptr);
ceph_abort("unexpected exception from ms_handle_accept()");
});
}
seastar::with_gate(pending_dispatch,
[this,
reconnect,
@ -1782,9 +1822,7 @@ void ProtocolV2::trigger_replacing(bool reconnect,
client_cookie = new_client_cookie;
conn.set_peer_name(new_peer_name);
connection_features = new_conn_features;
return send_server_ident().then([] (next_step_t next) {
assert(next == next_step_t::ready);
});
return send_server_ident();
}
}).then([this] {
if (unlikely(state != state_t::REPLACING)) {
@ -2126,7 +2164,7 @@ void ProtocolV2::trigger_close()
messenger.unaccept_conn(
seastar::static_pointer_cast<SocketConnection>(
conn.shared_from_this()));
} else if (state >= state_t::CONNECTING && state < state_t::CLOSING) {
} else if (state >= state_t::ESTABLISHING && state < state_t::CLOSING) {
messenger.unregister_conn(
seastar::static_pointer_cast<SocketConnection>(
conn.shared_from_this()));

View File

@ -43,6 +43,7 @@ class ProtocolV2 final : public Protocol {
NONE = 0,
ACCEPTING,
SERVER_WAIT,
ESTABLISHING,
CONNECTING,
READY,
STANDBY,
@ -56,6 +57,7 @@ class ProtocolV2 final : public Protocol {
const char *const statenames[] = {"NONE",
"ACCEPTING",
"SERVER_WAIT",
"ESTABLISHING",
"CONNECTING",
"READY",
"STANDBY",
@ -171,8 +173,11 @@ class ProtocolV2 final : public Protocol {
// CONNECTING/ACCEPTING
seastar::future<> finish_auth();
// ACCEPTING/REPLACING (server)
seastar::future<next_step_t> send_server_ident();
// ESTABLISHING
void execute_establishing();
// ESTABLISHING/REPLACING (server)
seastar::future<> send_server_ident();
// REPLACING (server)
void trigger_replacing(bool reconnect,