crimson/net: prevent racing in protocol to switch core and to call io-handler interfaces

Otherwise, calling io-handler interfaces may result in wrong core/order.

This needs to take special care to handle preemptive cases such as
closing and replacing.

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
This commit is contained in:
Yingxin Cheng 2023-06-05 10:40:23 +08:00
parent b10f0bd3cc
commit 5b8003386d
2 changed files with 92 additions and 34 deletions

View File

@ -187,7 +187,7 @@ void ProtocolV2::start_accept(SocketFRef&& new_socket,
execute_accepting();
}
void ProtocolV2::trigger_state(state_t new_state, io_state_t new_io_state)
void ProtocolV2::trigger_state_phase1(state_t new_state)
{
ceph_assert_always(!gate.is_closed());
if (new_state == state) {
@ -205,19 +205,27 @@ void ProtocolV2::trigger_state(state_t new_state, io_state_t new_io_state)
if (state == state_t::READY) {
// from READY
ceph_assert_always(!need_exit_io);
ceph_assert_always(!pr_exit_io.has_value());
need_exit_io = true;
pr_exit_io = seastar::shared_promise<>();
}
bool need_notify_out;
if (new_state == state_t::STANDBY && !conn.policy.server) {
need_notify_out = true;
} else {
need_notify_out = false;
}
auto pre_state = state;
state = new_state;
}
void ProtocolV2::trigger_state_phase2(
state_t new_state, io_state_t new_io_state)
{
ceph_assert_always(new_state == state);
ceph_assert_always(!gate.is_closed());
ceph_assert_always(!pr_switch_io_shard.has_value());
FrameAssemblerV2Ref fa;
if (new_state == state_t::READY) {
@ -231,9 +239,12 @@ void ProtocolV2::trigger_state(state_t new_state, io_state_t new_io_state)
}
io_handler.set_io_state(new_io_state, std::move(fa), need_notify_out);
if (pre_state == state_t::READY) {
if (need_exit_io) {
// from READY
logger().debug("{} IOHandler::wait_io_exit_dispatching() start ...", conn);
assert(pr_exit_io.has_value());
assert(new_io_state != io_state_t::open);
need_exit_io = false;
gate.dispatch_in_background("exit_io", conn, [this] {
return seastar::smp::submit_to(
io_handler.get_shard_id(), [this] {
@ -246,6 +257,7 @@ void ProtocolV2::trigger_state(state_t new_state, io_state_t new_io_state)
ceph_assert_always(
seastar::this_shard_id() == frame_assembler->get_shard_id());
ceph_assert_always(!frame_assembler->is_socket_valid());
assert(!need_exit_io);
io_states = ret.io_states;
pr_exit_io->set_value();
pr_exit_io = std::nullopt;
@ -1810,11 +1822,11 @@ void ProtocolV2::trigger_replacing(bool reconnect,
conn, reconnect ? "reconnected" : "connected",
peer_global_seq, connect_seq, client_cookie,
io_states, mover.socket->get_shard_id());
trigger_state(state_t::REPLACING, io_state_t::delay);
if (is_socket_valid) {
frame_assembler->shutdown_socket<true>(&gate);
is_socket_valid = false;
}
trigger_state_phase1(state_t::REPLACING);
gate.dispatch_in_background(
"trigger_replacing",
conn,
@ -1832,13 +1844,22 @@ void ProtocolV2::trigger_replacing(bool reconnect,
// state may become CLOSING below, but we cannot abort the chain until
// mover.socket is correctly handled (closed or replaced).
return wait_exit_io(
// this is preemptive
return wait_switch_io_shard(
).then([this] {
if (unlikely(state != state_t::REPLACING)) {
ceph_assert_always(state == state_t::CLOSING);
return seastar::now();
}
trigger_state_phase2(state_t::REPLACING, io_state_t::delay);
return wait_exit_io();
}).then([this] {
if (unlikely(state != state_t::REPLACING)) {
ceph_assert_always(state == state_t::CLOSING);
return seastar::now();
}
ceph_assert_always(frame_assembler);
protocol_timer.cancel();
auto done = std::move(execution_done);
@ -2115,38 +2136,54 @@ void ProtocolV2::do_close(
is_socket_valid = false;
}
trigger_state(state_t::CLOSING, io_state_t::drop);
trigger_state_phase1(state_t::CLOSING);
gate.dispatch_in_background(
"close_io", conn, [this, is_dispatch_reset, is_replace] {
return io_handler.close_io(is_dispatch_reset, is_replace);
});
// this is preemptive
return wait_switch_io_shard(
).then([this, is_dispatch_reset, is_replace] {
trigger_state_phase2(state_t::CLOSING, io_state_t::drop);
logger().debug("{} IOHandler::close_io(reset={}, replace={})",
conn, is_dispatch_reset, is_replace);
std::ignore = gate.close(
).then([this] {
ceph_assert_always(!pr_exit_io.has_value());
if (has_socket) {
ceph_assert_always(frame_assembler);
return frame_assembler->close_shutdown_socket();
} else {
return seastar::now();
}
}).then([this] {
logger().debug("{} closed!", conn);
messenger.closed_conn(
seastar::static_pointer_cast<SocketConnection>(
conn.shared_from_this()));
pr_closed_clean.set_value();
std::ignore = gate.close(
).then([this] {
ceph_assert_always(!need_exit_io);
ceph_assert_always(!pr_exit_io.has_value());
if (has_socket) {
ceph_assert_always(frame_assembler);
return frame_assembler->close_shutdown_socket();
} else {
return seastar::now();
}
}).then([this] {
logger().debug("{} closed!", conn);
messenger.closed_conn(
seastar::static_pointer_cast<SocketConnection>(
conn.shared_from_this()));
pr_closed_clean.set_value();
#ifdef UNIT_TESTS_BUILT
closed_clean = true;
if (conn.interceptor) {
conn.interceptor->register_conn_closed(
conn.get_local_shared_foreign_from_this());
}
closed_clean = true;
if (conn.interceptor) {
conn.interceptor->register_conn_closed(
conn.get_local_shared_foreign_from_this());
}
#endif
}).handle_exception([conn_ref = conn.shared_from_this(), this] (auto eptr) {
logger().error("{} closing got unexpected exception {}",
conn, eptr);
ceph_abort();
// connection is unreferenced from the messenger,
// so need to hold the additional reference.
}).handle_exception([conn_ref = conn.shared_from_this(), this] (auto eptr) {
logger().error("{} closing got unexpected exception {}",
conn, eptr);
ceph_abort();
});
return seastar::smp::submit_to(
io_handler.get_shard_id(),
[this, is_dispatch_reset, is_replace] {
return io_handler.close_io(is_dispatch_reset, is_replace);
});
// user can make changes
});
});
}

View File

@ -59,10 +59,19 @@ public:
private:
using io_state_t = IOHandler::io_state_t;
seastar::future<> wait_switch_io_shard() {
if (pr_switch_io_shard.has_value()) {
return pr_switch_io_shard->get_shared_future();
} else {
return seastar::now();
}
}
seastar::future<> wait_exit_io() {
if (pr_exit_io.has_value()) {
return pr_exit_io->get_shared_future();
} else {
assert(!need_exit_io);
return seastar::now();
}
}
@ -94,7 +103,15 @@ private:
return statenames[static_cast<int>(state)];
}
void trigger_state(state_t new_state, io_state_t new_io_state);
void trigger_state_phase1(state_t new_state);
void trigger_state_phase2(state_t new_state, io_state_t new_io_state);
void trigger_state(state_t new_state, io_state_t new_io_state) {
ceph_assert_always(!pr_switch_io_shard.has_value());
trigger_state_phase1(new_state);
trigger_state_phase2(new_state, new_io_state);
}
template <typename Func, typename T>
void gated_execute(const char *what, T &who, Func &&func) {
@ -227,8 +244,12 @@ private:
FrameAssemblerV2Ref frame_assembler;
bool need_notify_out = false;
std::optional<seastar::shared_promise<>> pr_switch_io_shard;
bool need_exit_io = false;
std::optional<seastar::shared_promise<>> pr_exit_io;
AuthConnectionMetaRef auth_meta;