From d1b4323606a7bdf3ab0db6eca444c1c346c11354 Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Wed, 26 Apr 2023 17:33:58 +0800 Subject: [PATCH] crimson/net: adjust the IO path in FrameAssemblerV2 with the foreign socket FrameAssemblerV2 and Socket may be on different cores during handshake. Signed-off-by: Yingxin Cheng --- src/crimson/net/FrameAssemblerV2.cc | 107 ++++++++++++++++++++++------ src/crimson/net/FrameAssemblerV2.h | 19 ++++- src/crimson/net/io_handler.cc | 14 ++-- 3 files changed, 111 insertions(+), 29 deletions(-) diff --git a/src/crimson/net/FrameAssemblerV2.cc b/src/crimson/net/FrameAssemblerV2.cc index 4608f83da53..a043fcc5717 100644 --- a/src/crimson/net/FrameAssemblerV2.cc +++ b/src/crimson/net/FrameAssemblerV2.cc @@ -53,7 +53,12 @@ void FrameAssemblerV2::intercept_frame(Tag tag, bool is_write) auto action = conn.interceptor->intercept( conn.get_local_shared_foreign_from_this(), Breakpoint{tag, type}); - socket->set_trap(type, action, &conn.interceptor->blocker); + // tolerate leaking future in tests + std::ignore = seastar::smp::submit_to( + socket->get_shard_id(), + [this, type, action] { + socket->set_trap(type, action, &conn.interceptor->blocker); + }); } } #endif @@ -165,6 +170,7 @@ void FrameAssemblerV2::learn_socket_ephemeral_port_as_connector(uint16_t port) { assert(seastar::this_shard_id() == sid); assert(has_socket()); + // Note: may not invoke on the socket core socket->learn_ephemeral_port_as_connector(port); } @@ -219,74 +225,130 @@ seastar::future<> FrameAssemblerV2::close_shutdown_socket() }); } +template seastar::future FrameAssemblerV2::read_exactly(std::size_t bytes) { assert(seastar::this_shard_id() == sid); assert(has_socket()); - if (unlikely(record_io)) { - return socket->read_exactly(bytes - ).then([this](auto bptr) { - rxbuf.append(bptr); + if constexpr (may_cross_core) { + assert(conn.get_messenger_shard_id() == sid); + return seastar::smp::submit_to( + socket->get_shard_id(), [this, bytes] { + return 
socket->read_exactly(bytes); + }).then([this](auto bptr) { + if (record_io) { + rxbuf.append(bptr); + } return bptr; }); } else { + assert(socket->get_shard_id() == sid); return socket->read_exactly(bytes); - }; + } } +template seastar::future FrameAssemblerV2::read_exactly(std::size_t); +template seastar::future FrameAssemblerV2::read_exactly(std::size_t); +template seastar::future FrameAssemblerV2::read(std::size_t bytes) { assert(seastar::this_shard_id() == sid); assert(has_socket()); - if (unlikely(record_io)) { - return socket->read(bytes - ).then([this](auto buf) { - rxbuf.append(buf); + if constexpr (may_cross_core) { + assert(conn.get_messenger_shard_id() == sid); + return seastar::smp::submit_to( + socket->get_shard_id(), [this, bytes] { + return socket->read(bytes); + }).then([this](auto buf) { + if (record_io) { + rxbuf.append(buf); + } return buf; }); } else { + assert(socket->get_shard_id() == sid); return socket->read(bytes); } } +template seastar::future FrameAssemblerV2::read(std::size_t); +template seastar::future FrameAssemblerV2::read(std::size_t); +template seastar::future<> FrameAssemblerV2::write(ceph::bufferlist buf) { assert(seastar::this_shard_id() == sid); assert(has_socket()); - if (unlikely(record_io)) { - txbuf.append(buf); + if constexpr (may_cross_core) { + assert(conn.get_messenger_shard_id() == sid); + if (record_io) { + txbuf.append(buf); + } + return seastar::smp::submit_to( + socket->get_shard_id(), [this, buf = std::move(buf)]() mutable { + return socket->write(std::move(buf)); + }); + } else { + assert(socket->get_shard_id() == sid); + return socket->write(std::move(buf)); } - return socket->write(std::move(buf)); } +template seastar::future<> FrameAssemblerV2::write(ceph::bufferlist); +template seastar::future<> FrameAssemblerV2::write(ceph::bufferlist); +template seastar::future<> FrameAssemblerV2::flush() { assert(seastar::this_shard_id() == sid); assert(has_socket()); - return socket->flush(); + if constexpr 
(may_cross_core) { + assert(conn.get_messenger_shard_id() == sid); + return seastar::smp::submit_to( + socket->get_shard_id(), [this] { + return socket->flush(); + }); + } else { + assert(socket->get_shard_id() == sid); + return socket->flush(); + } } +template seastar::future<> FrameAssemblerV2::flush(); +template seastar::future<> FrameAssemblerV2::flush(); +template seastar::future<> FrameAssemblerV2::write_flush(ceph::bufferlist buf) { assert(seastar::this_shard_id() == sid); assert(has_socket()); - if (unlikely(record_io)) { - txbuf.append(buf); + if constexpr (may_cross_core) { + assert(conn.get_messenger_shard_id() == sid); + if (unlikely(record_io)) { + txbuf.append(buf); + } + return seastar::smp::submit_to( + socket->get_shard_id(), [this, buf = std::move(buf)]() mutable { + return socket->write_flush(std::move(buf)); + }); + } else { + assert(socket->get_shard_id() == sid); + return socket->write_flush(std::move(buf)); } - return socket->write_flush(std::move(buf)); } +template seastar::future<> FrameAssemblerV2::write_flush(ceph::bufferlist); +template seastar::future<> FrameAssemblerV2::write_flush(ceph::bufferlist); +template seastar::future FrameAssemblerV2::read_main_preamble() { assert(seastar::this_shard_id() == sid); rx_preamble.clear(); - return read_exactly(rx_frame_asm.get_preamble_onwire_len() + return read_exactly( + rx_frame_asm.get_preamble_onwire_len() ).then([this](auto bptr) { try { rx_preamble.append(std::move(bptr)); @@ -301,7 +363,10 @@ FrameAssemblerV2::read_main_preamble() } }); } +template seastar::future FrameAssemblerV2::read_main_preamble(); +template seastar::future FrameAssemblerV2::read_main_preamble(); +template seastar::future FrameAssemblerV2::read_frame_payload() { @@ -321,7 +386,7 @@ FrameAssemblerV2::read_frame_payload() } uint32_t onwire_len = rx_frame_asm.get_segment_onwire_len(seg_idx); // TODO: create aligned and contiguous buffer from socket - return read_exactly(onwire_len + return read_exactly(onwire_len 
).then([this](auto bptr) { logger().trace("{} RECV({}) frame segment[{}]", conn, bptr.length(), rx_segments_data.size()); @@ -331,7 +396,7 @@ FrameAssemblerV2::read_frame_payload() }); } ).then([this] { - return read_exactly(rx_frame_asm.get_epilogue_onwire_len()); + return read_exactly(rx_frame_asm.get_epilogue_onwire_len()); }).then([this](auto bptr) { logger().trace("{} RECV({}) frame epilogue", conn, bptr.length()); bool ok = false; @@ -355,6 +420,8 @@ FrameAssemblerV2::read_frame_payload() return &rx_segments_data; }); } +template seastar::future FrameAssemblerV2::read_frame_payload(); +template seastar::future FrameAssemblerV2::read_frame_payload(); void FrameAssemblerV2::log_main_preamble(const ceph::bufferlist &bl) { diff --git a/src/crimson/net/FrameAssemblerV2.h b/src/crimson/net/FrameAssemblerV2.h index 0cc495574c2..c2d3318f87d 100644 --- a/src/crimson/net/FrameAssemblerV2.h +++ b/src/crimson/net/FrameAssemblerV2.h @@ -83,14 +83,19 @@ public: * socket read and write interfaces */ + template seastar::future read_exactly(std::size_t bytes); + template seastar::future read(std::size_t bytes); + template seastar::future<> write(ceph::bufferlist); + template seastar::future<> flush(); + template seastar::future<> write_flush(ceph::bufferlist); /* @@ -102,11 +107,13 @@ public: ceph::msgr::v2::Tag tag; const ceph::msgr::v2::FrameAssembler *rx_frame_asm; }; + template seastar::future read_main_preamble(); /// may throw negotiation_failure as fault using read_payload_t = ceph::msgr::v2::segment_bls_t; // FIXME: read_payload_t cannot be no-throw move constructible + template seastar::future read_frame_payload(); template @@ -120,11 +127,11 @@ public: return bl; } - template + template seastar::future<> write_flush_frame(F &tx_frame) { assert(seastar::this_shard_id() == sid); auto bl = get_buffer(tx_frame); - return write_flush(std::move(bl)); + return write_flush(std::move(bl)); } static FrameAssemblerV2Ref create(SocketConnection &conn); @@ -148,10 +155,14 @@ 
private: // different from the socket sid. bool is_socket_shutdown = false; + // the current working shard, can be messenger or socket shard. + // if it is the messenger shard, interfaces should be called with may_cross_core = true. seastar::shard_id sid; /* * auth signature + * + * only in the messenger core */ bool record_io = false; @@ -179,6 +190,10 @@ private: &session_stream_handlers, is_rev1, common::local_conf()->ms_crc_data, &session_comp_handlers}; + // in the messenger core during handshake, + // and in the socket core during open, + // must be cleaned before switching cores. + ceph::bufferlist rx_preamble; read_payload_t rx_segments_data; diff --git a/src/crimson/net/io_handler.cc b/src/crimson/net/io_handler.cc index 0c0cdc76a2c..9fcccde53c4 100644 --- a/src/crimson/net/io_handler.cc +++ b/src/crimson/net/io_handler.cc @@ -372,7 +372,7 @@ void IOHandler::ack_out_sent(seq_num_t seq) seastar::future IOHandler::try_exit_out_dispatch() { assert(!is_out_queued()); - return frame_assembler->flush( + return frame_assembler->flush( ).then([this] { if (!is_out_queued()) { // still nothing pending to send after flush, @@ -405,7 +405,7 @@ seastar::future<> IOHandler::do_out_dispatch() } auto to_ack = ack_left; assert(to_ack == 0 || in_seq > 0); - return frame_assembler->write( + return frame_assembler->write( sweep_out_pending_msgs_to_sent( need_keepalive, next_keepalive_ack, to_ack > 0) ).then([this, prv_keepalive_ack=next_keepalive_ack, to_ack] { @@ -505,7 +505,7 @@ void IOHandler::notify_out_dispatch() seastar::future<> IOHandler::read_message(utime_t throttle_stamp, std::size_t msg_size) { - return frame_assembler->read_frame_payload( + return frame_assembler->read_frame_payload( ).then([this, throttle_stamp, msg_size](auto payload) { if (unlikely(io_state != io_state_t::open)) { logger().debug("{} triggered {} during read_message()", @@ -623,7 +623,7 @@ void IOHandler::do_in_dispatch() in_exit_dispatching = seastar::promise<>(); 
gate.dispatch_in_background("do_in_dispatch", conn, [this] { return seastar::keep_doing([this] { - return frame_assembler->read_main_preamble( + return frame_assembler->read_main_preamble( ).then([this](auto ret) { switch (ret.tag) { case Tag::MESSAGE: { @@ -656,7 +656,7 @@ void IOHandler::do_in_dispatch() }); } case Tag::ACK: - return frame_assembler->read_frame_payload( + return frame_assembler->read_frame_payload( ).then([this](auto payload) { // handle_message_ack() logic auto ack = AckFrame::Decode(payload->back()); @@ -664,7 +664,7 @@ void IOHandler::do_in_dispatch() ack_out_sent(ack.seq()); }); case Tag::KEEPALIVE2: - return frame_assembler->read_frame_payload( + return frame_assembler->read_frame_payload( ).then([this](auto payload) { // handle_keepalive2() logic auto keepalive_frame = KeepAliveFrame::Decode(payload->back()); @@ -677,7 +677,7 @@ void IOHandler::do_in_dispatch() last_keepalive = seastar::lowres_system_clock::now(); }); case Tag::KEEPALIVE2_ACK: - return frame_assembler->read_frame_payload( + return frame_assembler->read_frame_payload( ).then([this](auto payload) { // handle_keepalive2_ack() logic auto keepalive_ack_frame = KeepAliveFrameAck::Decode(payload->back());