diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index 6c64db82fd5..f7f2bccfe41 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -187,7 +187,7 @@ void ProtocolV2::start_accept(SocketFRef&& new_socket, execute_accepting(); } -void ProtocolV2::trigger_state(state_t new_state, io_state_t new_io_state) +void ProtocolV2::trigger_state_phase1(state_t new_state) { ceph_assert_always(!gate.is_closed()); if (new_state == state) { @@ -205,19 +205,27 @@ void ProtocolV2::trigger_state(state_t new_state, io_state_t new_io_state) if (state == state_t::READY) { // from READY + ceph_assert_always(!need_exit_io); ceph_assert_always(!pr_exit_io.has_value()); + need_exit_io = true; pr_exit_io = seastar::shared_promise<>(); } - bool need_notify_out; if (new_state == state_t::STANDBY && !conn.policy.server) { need_notify_out = true; } else { need_notify_out = false; } - auto pre_state = state; state = new_state; +} + +void ProtocolV2::trigger_state_phase2( + state_t new_state, io_state_t new_io_state) +{ + ceph_assert_always(new_state == state); + ceph_assert_always(!gate.is_closed()); + ceph_assert_always(!pr_switch_io_shard.has_value()); FrameAssemblerV2Ref fa; if (new_state == state_t::READY) { @@ -231,9 +239,12 @@ void ProtocolV2::trigger_state(state_t new_state, io_state_t new_io_state) } io_handler.set_io_state(new_io_state, std::move(fa), need_notify_out); - if (pre_state == state_t::READY) { + if (need_exit_io) { + // from READY logger().debug("{} IOHandler::wait_io_exit_dispatching() start ...", conn); + assert(pr_exit_io.has_value()); assert(new_io_state != io_state_t::open); + need_exit_io = false; gate.dispatch_in_background("exit_io", conn, [this] { return seastar::smp::submit_to( io_handler.get_shard_id(), [this] { @@ -246,6 +257,7 @@ void ProtocolV2::trigger_state(state_t new_state, io_state_t new_io_state) ceph_assert_always( seastar::this_shard_id() == frame_assembler->get_shard_id()); ceph_assert_always(!frame_assembler->is_socket_valid()); + assert(!need_exit_io); io_states = ret.io_states; pr_exit_io->set_value(); pr_exit_io = std::nullopt; @@ -1810,11 +1822,11 @@ void ProtocolV2::trigger_replacing(bool reconnect, conn, reconnect ? "reconnected" : "connected", peer_global_seq, connect_seq, client_cookie, io_states, mover.socket->get_shard_id()); - trigger_state(state_t::REPLACING, io_state_t::delay); if (is_socket_valid) { frame_assembler->shutdown_socket(&gate); is_socket_valid = false; } + trigger_state_phase1(state_t::REPLACING); gate.dispatch_in_background( "trigger_replacing", conn, @@ -1832,13 +1844,22 @@ void ProtocolV2::trigger_replacing(bool reconnect, // state may become CLOSING below, but we cannot abort the chain until // mover.socket is correctly handled (closed or replaced). - return wait_exit_io( + // this is preemptive + return wait_switch_io_shard( ).then([this] { if (unlikely(state != state_t::REPLACING)) { ceph_assert_always(state == state_t::CLOSING); return seastar::now(); } + trigger_state_phase2(state_t::REPLACING, io_state_t::delay); + return wait_exit_io(); + }).then([this] { + if (unlikely(state != state_t::REPLACING)) { + ceph_assert_always(state == state_t::CLOSING); + return seastar::now(); + } + ceph_assert_always(frame_assembler); protocol_timer.cancel(); auto done = std::move(execution_done); @@ -2115,38 +2136,54 @@ void ProtocolV2::do_close( is_socket_valid = false; } - trigger_state(state_t::CLOSING, io_state_t::drop); + trigger_state_phase1(state_t::CLOSING); gate.dispatch_in_background( "close_io", conn, [this, is_dispatch_reset, is_replace] { - return io_handler.close_io(is_dispatch_reset, is_replace); - }); + // this is preemptive + return wait_switch_io_shard( + ).then([this, is_dispatch_reset, is_replace] { + trigger_state_phase2(state_t::CLOSING, io_state_t::drop); + logger().debug("{} IOHandler::close_io(reset={}, replace={})", + conn, is_dispatch_reset, is_replace); - std::ignore = gate.close( - ).then([this] { - ceph_assert_always(!pr_exit_io.has_value()); - if (has_socket) { - ceph_assert_always(frame_assembler); - return frame_assembler->close_shutdown_socket(); - } else { - return seastar::now(); - } - }).then([this] { - logger().debug("{} closed!", conn); - messenger.closed_conn( - seastar::static_pointer_cast( - conn.shared_from_this())); - pr_closed_clean.set_value(); + std::ignore = gate.close( + ).then([this] { + ceph_assert_always(!need_exit_io); + ceph_assert_always(!pr_exit_io.has_value()); + if (has_socket) { + ceph_assert_always(frame_assembler); + return frame_assembler->close_shutdown_socket(); + } else { + return seastar::now(); + } + }).then([this] { + logger().debug("{} closed!", conn); + messenger.closed_conn( + seastar::static_pointer_cast( + conn.shared_from_this())); + pr_closed_clean.set_value(); #ifdef UNIT_TESTS_BUILT - closed_clean = true; - if (conn.interceptor) { - conn.interceptor->register_conn_closed( - conn.get_local_shared_foreign_from_this()); - } + closed_clean = true; + if (conn.interceptor) { + conn.interceptor->register_conn_closed( + conn.get_local_shared_foreign_from_this()); + } #endif - }).handle_exception([conn_ref = conn.shared_from_this(), this] (auto eptr) { - logger().error("{} closing got unexpected exception {}", - conn, eptr); - ceph_abort(); + // connection is unreferenced from the messenger, + // so need to hold the additional reference. + }).handle_exception([conn_ref = conn.shared_from_this(), this] (auto eptr) { + logger().error("{} closing got unexpected exception {}", + conn, eptr); + ceph_abort(); + }); + + return seastar::smp::submit_to( + io_handler.get_shard_id(), + [this, is_dispatch_reset, is_replace] { + return io_handler.close_io(is_dispatch_reset, is_replace); + }); + // user can make changes + }); }); } diff --git a/src/crimson/net/ProtocolV2.h b/src/crimson/net/ProtocolV2.h index d74425344b4..f083d1721ac 100644 --- a/src/crimson/net/ProtocolV2.h +++ b/src/crimson/net/ProtocolV2.h @@ -59,10 +59,19 @@ public: private: using io_state_t = IOHandler::io_state_t; + seastar::future<> wait_switch_io_shard() { + if (pr_switch_io_shard.has_value()) { + return pr_switch_io_shard->get_shared_future(); + } else { + return seastar::now(); + } + } + seastar::future<> wait_exit_io() { if (pr_exit_io.has_value()) { return pr_exit_io->get_shared_future(); } else { + assert(!need_exit_io); return seastar::now(); } } @@ -94,7 +103,15 @@ private: return statenames[static_cast(state)]; } - void trigger_state(state_t new_state, io_state_t new_io_state); + void trigger_state_phase1(state_t new_state); + + void trigger_state_phase2(state_t new_state, io_state_t new_io_state); + + void trigger_state(state_t new_state, io_state_t new_io_state) { + ceph_assert_always(!pr_switch_io_shard.has_value()); + trigger_state_phase1(new_state); + trigger_state_phase2(new_state, new_io_state); + } template void gated_execute(const char *what, T &who, Func &&func) { @@ -227,8 +244,12 @@ private: FrameAssemblerV2Ref frame_assembler; + bool need_notify_out = false; + std::optional> pr_switch_io_shard; + bool need_exit_io = false; + std::optional> pr_exit_io; AuthConnectionMetaRef auth_meta;