crimson/net: adjust the IO path in FrameAssemblerV2 with the foreign socket

FrameAssemblerV2 and Socket may be on different cores during handshake.

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
This commit is contained in:
Yingxin Cheng 2023-04-26 17:33:58 +08:00
parent 50bc62a9da
commit d1b4323606
3 changed files with 111 additions and 29 deletions

View File

@ -53,7 +53,12 @@ void FrameAssemblerV2::intercept_frame(Tag tag, bool is_write)
auto action = conn.interceptor->intercept(
conn.get_local_shared_foreign_from_this(),
Breakpoint{tag, type});
socket->set_trap(type, action, &conn.interceptor->blocker);
// tolerate leaking future in tests
std::ignore = seastar::smp::submit_to(
socket->get_shard_id(),
[this, type, action] {
socket->set_trap(type, action, &conn.interceptor->blocker);
});
}
}
#endif
@ -165,6 +170,7 @@ void FrameAssemblerV2::learn_socket_ephemeral_port_as_connector(uint16_t port)
{
assert(seastar::this_shard_id() == sid);
assert(has_socket());
// Note: this may be invoked from a core other than the socket core
socket->learn_ephemeral_port_as_connector(port);
}
@ -219,74 +225,130 @@ seastar::future<> FrameAssemblerV2::close_shutdown_socket()
});
}
// Reads `bytes` from the socket via socket->read_exactly() and yields the
// resulting ceph::bufferptr.
//
// Preconditions asserted here: the caller runs on shard `sid` and a socket
// is attached (has_socket()).
//
// may_cross_core == true:  asserts `sid` is the connection's messenger shard,
//   submits the read to the socket's shard with seastar::smp::submit_to(),
//   and, in the continuation, appends the result to rxbuf when record_io is
//   set before returning it.
// may_cross_core == false: asserts the socket's shard equals `sid` and calls
//   the socket directly; this branch does not touch rxbuf.
//
// NOTE(review): this listing interleaves pre-change and post-change lines of
// the commit; the description above follows the `if constexpr` branches.
template <bool may_cross_core>
seastar::future<ceph::bufferptr>
FrameAssemblerV2::read_exactly(std::size_t bytes)
{
assert(seastar::this_shard_id() == sid);
assert(has_socket());
if (unlikely(record_io)) {
return socket->read_exactly(bytes
).then([this](auto bptr) {
rxbuf.append(bptr);
if constexpr (may_cross_core) {
assert(conn.get_messenger_shard_id() == sid);
// hop to the shard that owns the socket to perform the read
return seastar::smp::submit_to(
socket->get_shard_id(), [this, bytes] {
return socket->read_exactly(bytes);
}).then([this](auto bptr) {
// recording happens in this continuation, after the submit_to() completes
if (record_io) {
rxbuf.append(bptr);
}
return bptr;
});
} else {
assert(socket->get_shard_id() == sid);
return socket->read_exactly(bytes);
};
}
}
template seastar::future<ceph::bufferptr> FrameAssemblerV2::read_exactly<true>(std::size_t);
template seastar::future<ceph::bufferptr> FrameAssemblerV2::read_exactly<false>(std::size_t);
// Forwards a read of `bytes` to socket->read() and yields the resulting
// ceph::bufferlist.
//
// Preconditions asserted here: the caller runs on shard `sid` and a socket
// is attached (has_socket()).
//
// may_cross_core == true:  asserts `sid` is the connection's messenger shard,
//   submits the read to the socket's shard with seastar::smp::submit_to(),
//   and, in the continuation, appends the result to rxbuf when record_io is
//   set before returning it.
// may_cross_core == false: asserts the socket's shard equals `sid` and calls
//   the socket directly; this branch does not touch rxbuf.
//
// NOTE(review): this listing interleaves pre-change and post-change lines of
// the commit; the description above follows the `if constexpr` branches.
template <bool may_cross_core>
seastar::future<ceph::bufferlist>
FrameAssemblerV2::read(std::size_t bytes)
{
assert(seastar::this_shard_id() == sid);
assert(has_socket());
if (unlikely(record_io)) {
return socket->read(bytes
).then([this](auto buf) {
rxbuf.append(buf);
if constexpr (may_cross_core) {
assert(conn.get_messenger_shard_id() == sid);
// hop to the shard that owns the socket to perform the read
return seastar::smp::submit_to(
socket->get_shard_id(), [this, bytes] {
return socket->read(bytes);
}).then([this](auto buf) {
// recording happens in this continuation, after the submit_to() completes
if (record_io) {
rxbuf.append(buf);
}
return buf;
});
} else {
assert(socket->get_shard_id() == sid);
return socket->read(bytes);
}
}
template seastar::future<ceph::bufferlist> FrameAssemblerV2::read<true>(std::size_t);
template seastar::future<ceph::bufferlist> FrameAssemblerV2::read<false>(std::size_t);
// Hands `buf` to socket->write().
//
// Preconditions asserted here: the caller runs on shard `sid` and a socket
// is attached (has_socket()).
//
// may_cross_core == true:  asserts `sid` is the connection's messenger shard,
//   appends `buf` to txbuf when record_io is set (before `buf` is moved),
//   then moves `buf` into a task submitted to the socket's shard with
//   seastar::smp::submit_to().
// may_cross_core == false: asserts the socket's shard equals `sid` and moves
//   `buf` straight into socket->write(); this branch does not touch txbuf.
//
// NOTE(review): this listing interleaves pre-change and post-change lines of
// the commit; the description above follows the `if constexpr` branches.
template <bool may_cross_core>
seastar::future<>
FrameAssemblerV2::write(ceph::bufferlist buf)
{
assert(seastar::this_shard_id() == sid);
assert(has_socket());
if (unlikely(record_io)) {
txbuf.append(buf);
if constexpr (may_cross_core) {
assert(conn.get_messenger_shard_id() == sid);
// record first: `buf` is moved into the lambda below
if (record_io) {
txbuf.append(buf);
}
return seastar::smp::submit_to(
socket->get_shard_id(), [this, buf = std::move(buf)]() mutable {
return socket->write(std::move(buf));
});
} else {
assert(socket->get_shard_id() == sid);
return socket->write(std::move(buf));
}
return socket->write(std::move(buf));
}
template seastar::future<> FrameAssemblerV2::write<true>(ceph::bufferlist);
template seastar::future<> FrameAssemblerV2::write<false>(ceph::bufferlist);
// Calls socket->flush().
//
// Preconditions asserted here: the caller runs on shard `sid` and a socket
// is attached (has_socket()).
//
// may_cross_core == true:  asserts `sid` is the connection's messenger shard
//   and submits the flush to the socket's shard with
//   seastar::smp::submit_to().
// may_cross_core == false: asserts the socket's shard equals `sid` and
//   flushes the socket directly.
//
// NOTE(review): this listing interleaves pre-change and post-change lines of
// the commit; the description above follows the `if constexpr` branches.
template <bool may_cross_core>
seastar::future<>
FrameAssemblerV2::flush()
{
assert(seastar::this_shard_id() == sid);
assert(has_socket());
return socket->flush();
if constexpr (may_cross_core) {
assert(conn.get_messenger_shard_id() == sid);
// hop to the shard that owns the socket to perform the flush
return seastar::smp::submit_to(
socket->get_shard_id(), [this] {
return socket->flush();
});
} else {
assert(socket->get_shard_id() == sid);
return socket->flush();
}
}
template seastar::future<> FrameAssemblerV2::flush<true>();
template seastar::future<> FrameAssemblerV2::flush<false>();
// Hands `buf` to socket->write_flush().
//
// Preconditions asserted here: the caller runs on shard `sid` and a socket
// is attached (has_socket()).
//
// may_cross_core == true:  asserts `sid` is the connection's messenger shard,
//   appends `buf` to txbuf when record_io is set (before `buf` is moved),
//   then moves `buf` into a task submitted to the socket's shard with
//   seastar::smp::submit_to().
// may_cross_core == false: asserts the socket's shard equals `sid` and moves
//   `buf` straight into socket->write_flush(); this branch does not touch
//   txbuf.
//
// NOTE(review): this listing interleaves pre-change and post-change lines of
// the commit; the description above follows the `if constexpr` branches.
template <bool may_cross_core>
seastar::future<>
FrameAssemblerV2::write_flush(ceph::bufferlist buf)
{
assert(seastar::this_shard_id() == sid);
assert(has_socket());
if (unlikely(record_io)) {
txbuf.append(buf);
if constexpr (may_cross_core) {
assert(conn.get_messenger_shard_id() == sid);
// record first: `buf` is moved into the lambda below
if (unlikely(record_io)) {
txbuf.append(buf);
}
return seastar::smp::submit_to(
socket->get_shard_id(), [this, buf = std::move(buf)]() mutable {
return socket->write_flush(std::move(buf));
});
} else {
assert(socket->get_shard_id() == sid);
return socket->write_flush(std::move(buf));
}
return socket->write_flush(std::move(buf));
}
template seastar::future<> FrameAssemblerV2::write_flush<true>(ceph::bufferlist);
template seastar::future<> FrameAssemblerV2::write_flush<false>(ceph::bufferlist);
template <bool may_cross_core>
seastar::future<FrameAssemblerV2::read_main_t>
FrameAssemblerV2::read_main_preamble()
{
assert(seastar::this_shard_id() == sid);
rx_preamble.clear();
return read_exactly(rx_frame_asm.get_preamble_onwire_len()
return read_exactly<may_cross_core>(
rx_frame_asm.get_preamble_onwire_len()
).then([this](auto bptr) {
try {
rx_preamble.append(std::move(bptr));
@ -301,7 +363,10 @@ FrameAssemblerV2::read_main_preamble()
}
});
}
template seastar::future<FrameAssemblerV2::read_main_t> FrameAssemblerV2::read_main_preamble<true>();
template seastar::future<FrameAssemblerV2::read_main_t> FrameAssemblerV2::read_main_preamble<false>();
template <bool may_cross_core>
seastar::future<FrameAssemblerV2::read_payload_t*>
FrameAssemblerV2::read_frame_payload()
{
@ -321,7 +386,7 @@ FrameAssemblerV2::read_frame_payload()
}
uint32_t onwire_len = rx_frame_asm.get_segment_onwire_len(seg_idx);
// TODO: create aligned and contiguous buffer from socket
return read_exactly(onwire_len
return read_exactly<may_cross_core>(onwire_len
).then([this](auto bptr) {
logger().trace("{} RECV({}) frame segment[{}]",
conn, bptr.length(), rx_segments_data.size());
@ -331,7 +396,7 @@ FrameAssemblerV2::read_frame_payload()
});
}
).then([this] {
return read_exactly(rx_frame_asm.get_epilogue_onwire_len());
return read_exactly<may_cross_core>(rx_frame_asm.get_epilogue_onwire_len());
}).then([this](auto bptr) {
logger().trace("{} RECV({}) frame epilogue", conn, bptr.length());
bool ok = false;
@ -355,6 +420,8 @@ FrameAssemblerV2::read_frame_payload()
return &rx_segments_data;
});
}
template seastar::future<FrameAssemblerV2::read_payload_t*> FrameAssemblerV2::read_frame_payload<true>();
template seastar::future<FrameAssemblerV2::read_payload_t*> FrameAssemblerV2::read_frame_payload<false>();
void FrameAssemblerV2::log_main_preamble(const ceph::bufferlist &bl)
{

View File

@ -83,14 +83,19 @@ public:
* socket read and write interfaces
*/
template <bool may_cross_core = true>
seastar::future<ceph::bufferptr> read_exactly(std::size_t bytes);
template <bool may_cross_core = true>
seastar::future<ceph::bufferlist> read(std::size_t bytes);
template <bool may_cross_core = true>
seastar::future<> write(ceph::bufferlist);
template <bool may_cross_core = true>
seastar::future<> flush();
template <bool may_cross_core = true>
seastar::future<> write_flush(ceph::bufferlist);
/*
@ -102,11 +107,13 @@ public:
ceph::msgr::v2::Tag tag;
const ceph::msgr::v2::FrameAssembler *rx_frame_asm;
};
template <bool may_cross_core = true>
seastar::future<read_main_t> read_main_preamble();
/// may throw negotiation_failure as fault
using read_payload_t = ceph::msgr::v2::segment_bls_t;
// FIXME: read_payload_t cannot be no-throw move constructible
template <bool may_cross_core = true>
seastar::future<read_payload_t*> read_frame_payload();
template <class F>
@ -120,11 +127,11 @@ public:
return bl;
}
// Builds the outgoing buffer for `tx_frame` with get_buffer() and passes it
// to write_flush(), forwarding `may_cross_core` (default: true).
// Asserts that the caller runs on shard `sid`.
//
// NOTE(review): this listing interleaves pre-change and post-change lines of
// the commit (two template headers, two return statements).
template <class F>
template <class F, bool may_cross_core = true>
seastar::future<> write_flush_frame(F &tx_frame) {
assert(seastar::this_shard_id() == sid);
auto bl = get_buffer(tx_frame);
return write_flush(std::move(bl));
return write_flush<may_cross_core>(std::move(bl));
}
static FrameAssemblerV2Ref create(SocketConnection &conn);
@ -148,10 +155,14 @@ private:
// different from the socket sid.
bool is_socket_shutdown = false;
// The current working shard; it can be the messenger shard or the socket shard.
// If it is the messenger shard, the interfaces should be called with may_cross_core = true.
seastar::shard_id sid;
/*
* auth signature
*
* only in the messenger core
*/
bool record_io = false;
@ -179,6 +190,10 @@ private:
&session_stream_handlers, is_rev1, common::local_conf()->ms_crc_data,
&session_comp_handlers};
// Used in the messenger core during handshake
// and in the socket core during open;
// must be cleared before switching cores.
ceph::bufferlist rx_preamble;
read_payload_t rx_segments_data;

View File

@ -372,7 +372,7 @@ void IOHandler::ack_out_sent(seq_num_t seq)
seastar::future<stop_t> IOHandler::try_exit_out_dispatch() {
assert(!is_out_queued());
return frame_assembler->flush(
return frame_assembler->flush<false>(
).then([this] {
if (!is_out_queued()) {
// still nothing pending to send after flush,
@ -405,7 +405,7 @@ seastar::future<> IOHandler::do_out_dispatch()
}
auto to_ack = ack_left;
assert(to_ack == 0 || in_seq > 0);
return frame_assembler->write(
return frame_assembler->write<false>(
sweep_out_pending_msgs_to_sent(
need_keepalive, next_keepalive_ack, to_ack > 0)
).then([this, prv_keepalive_ack=next_keepalive_ack, to_ack] {
@ -505,7 +505,7 @@ void IOHandler::notify_out_dispatch()
seastar::future<>
IOHandler::read_message(utime_t throttle_stamp, std::size_t msg_size)
{
return frame_assembler->read_frame_payload(
return frame_assembler->read_frame_payload<false>(
).then([this, throttle_stamp, msg_size](auto payload) {
if (unlikely(io_state != io_state_t::open)) {
logger().debug("{} triggered {} during read_message()",
@ -623,7 +623,7 @@ void IOHandler::do_in_dispatch()
in_exit_dispatching = seastar::promise<>();
gate.dispatch_in_background("do_in_dispatch", conn, [this] {
return seastar::keep_doing([this] {
return frame_assembler->read_main_preamble(
return frame_assembler->read_main_preamble<false>(
).then([this](auto ret) {
switch (ret.tag) {
case Tag::MESSAGE: {
@ -656,7 +656,7 @@ void IOHandler::do_in_dispatch()
});
}
case Tag::ACK:
return frame_assembler->read_frame_payload(
return frame_assembler->read_frame_payload<false>(
).then([this](auto payload) {
// handle_message_ack() logic
auto ack = AckFrame::Decode(payload->back());
@ -664,7 +664,7 @@ void IOHandler::do_in_dispatch()
ack_out_sent(ack.seq());
});
case Tag::KEEPALIVE2:
return frame_assembler->read_frame_payload(
return frame_assembler->read_frame_payload<false>(
).then([this](auto payload) {
// handle_keepalive2() logic
auto keepalive_frame = KeepAliveFrame::Decode(payload->back());
@ -677,7 +677,7 @@ void IOHandler::do_in_dispatch()
last_keepalive = seastar::lowres_system_clock::now();
});
case Tag::KEEPALIVE2_ACK:
return frame_assembler->read_frame_payload(
return frame_assembler->read_frame_payload<false>(
).then([this](auto payload) {
// handle_keepalive2_ack() logic
auto keepalive_ack_frame = KeepAliveFrameAck::Decode(payload->back());