diff --git a/src/crimson/net/Protocol.cc b/src/crimson/net/Protocol.cc index b8dd99d688e..2a70b9f4b3e 100644 --- a/src/crimson/net/Protocol.cc +++ b/src/crimson/net/Protocol.cc @@ -96,6 +96,14 @@ void Protocol::notify_keepalive_ack(utime_t _keepalive_ack) write_event(); } +void Protocol::notify_ack() +{ + if (!conn.policy.lossy) { + ++ack_left; + write_event(); + } +} + void Protocol::requeue_sent() { assert(write_state != write_state_t::open); @@ -146,6 +154,7 @@ void Protocol::reset_write() conn.sent.clear(); need_keepalive = false; keepalive_ack = std::nullopt; + ack_left = 0; } void Protocol::ack_writes(seq_num_t seq) @@ -177,14 +186,18 @@ seastar::future Protocol::do_write_dispatch_sweep() conn.pending_q.begin(), conn.pending_q.end()); } + auto acked = ack_left; + assert(acked == 0 || conn.in_seq > 0); // sweep all pending writes with the concrete Protocol return socket->write(do_sweep_messages( - conn.pending_q, num_msgs, need_keepalive, keepalive_ack)) - .then([this, prv_keepalive_ack=keepalive_ack] { + conn.pending_q, num_msgs, need_keepalive, keepalive_ack, acked > 0) + ).then([this, prv_keepalive_ack=keepalive_ack, acked] { need_keepalive = false; if (keepalive_ack == prv_keepalive_ack) { keepalive_ack = std::nullopt; } + assert(ack_left >= acked); + ack_left -= acked; if (!is_queued()) { // good, we have nothing pending to send now. return socket->flush().then([this] { diff --git a/src/crimson/net/Protocol.h b/src/crimson/net/Protocol.h index 5f8a8d4b89b..c3d127083d8 100644 --- a/src/crimson/net/Protocol.h +++ b/src/crimson/net/Protocol.h @@ -44,7 +44,8 @@ class Protocol { const std::deque& msgs, size_t num_msgs, bool require_keepalive, - std::optional keepalive_ack) = 0; + std::optional keepalive_ack, + bool require_ack) = 0; public: const proto_t proto_type; @@ -102,6 +103,8 @@ class Protocol { void notify_keepalive_ack(utime_t keepalive_ack); + void notify_ack(); + void requeue_up_to(seq_num_t seq); void requeue_sent(); @@ -110,6 +113,7 @@ class Protocol { bool is_queued() const { return (!conn.out_q.empty() || + ack_left > 0 || need_keepalive || keepalive_ack.has_value()); } @@ -123,6 +127,7 @@ class Protocol { bool need_keepalive = false; std::optional keepalive_ack = std::nullopt; + uint64_t ack_left = 0; bool write_dispatching = false; // Indicate if we are in the middle of writing. bool open_write = false; diff --git a/src/crimson/net/ProtocolV1.cc b/src/crimson/net/ProtocolV1.cc index 2de312faee5..9e2ba07c9b5 100644 --- a/src/crimson/net/ProtocolV1.cc +++ b/src/crimson/net/ProtocolV1.cc @@ -680,7 +680,8 @@ ceph::bufferlist ProtocolV1::do_sweep_messages( const std::deque& msgs, size_t num_msgs, bool require_keepalive, - std::optional _keepalive_ack) + std::optional _keepalive_ack, + bool require_ack) { static const size_t RESERVE_MSG_SIZE = sizeof(CEPH_MSGR_TAG_MSG) + sizeof(ceph_msg_header) + @@ -711,6 +712,15 @@ ceph::bufferlist ProtocolV1::do_sweep_messages( bl.append(create_static(k.ack)); } + if (require_ack) { + // XXX: we decided not to support lossless connection in v1. as the + // client's default policy is + // Messenger::Policy::lossy_client(CEPH_FEATURE_OSDREPLYMUX) which is + // lossy. And by the time of crimson-osd's GA, the in-cluster communication + // will all be performed using v2 protocol. + ceph_abort("lossless policy not supported for v1"); + } + std::for_each(msgs.begin(), msgs.begin()+num_msgs, [this, &bl](const MessageRef& msg) { ceph_assert(!msg->get_seq() && "message already has seq"); msg->set_seq(++conn.out_seq); diff --git a/src/crimson/net/ProtocolV1.h b/src/crimson/net/ProtocolV1.h index 53539ca0f7f..834016e5ec7 100644 --- a/src/crimson/net/ProtocolV1.h +++ b/src/crimson/net/ProtocolV1.h @@ -30,7 +30,8 @@ class ProtocolV1 final : public Protocol { const std::deque& msgs, size_t num_msgs, bool require_keepalive, - std::optional keepalive_ack) override; + std::optional keepalive_ack, + bool require_ack) override; private: SocketMessenger &messenger; diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index 7dc055a3dfd..76e0f150889 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -1540,7 +1540,8 @@ ceph::bufferlist ProtocolV2::do_sweep_messages( const std::deque& msgs, size_t num_msgs, bool require_keepalive, - std::optional _keepalive_ack) + std::optional _keepalive_ack, + bool require_ack) { ceph::bufferlist bl; @@ -1554,6 +1555,11 @@ ceph::bufferlist ProtocolV2::do_sweep_messages( bl.append(keepalive_ack_frame.get_buffer(session_stream_handlers)); } + if (require_ack && !num_msgs) { + auto ack_frame = AckFrame::Encode(conn.in_seq); + bl.append(ack_frame.get_buffer(session_stream_handlers)); + } + std::for_each(msgs.begin(), msgs.begin()+num_msgs, [this, &bl](const MessageRef& msg) { // TODO: move to common code // set priority @@ -1563,8 +1569,6 @@ ceph::bufferlist ProtocolV2::do_sweep_messages( ceph_assert(!msg->get_seq() && "message already has seq"); msg->set_seq(++conn.out_seq); - uint64_t ack_seq = conn.in_seq; - // ack_left = 0; ceph_msg_header &header = msg->get_header(); ceph_msg_footer &footer = msg->get_footer(); @@ -1573,7 +1577,7 @@ ceph::bufferlist ProtocolV2::do_sweep_messages( header.type, header.priority, header.version, 0, header.data_off, - ack_seq, + conn.in_seq, footer.flags, header.compat_version, header.reserved}; @@ -1661,9 +1665,7 @@ seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp) conn.in_seq = message->get_seq(); logger().debug("{} <== #{} === {} ({})", conn, message->get_seq(), *message, message->get_type()); - if (!conn.policy.lossy) { - // ++ack_left; - } + notify_ack(); ack_writes(current_header.ack_seq); // TODO: change MessageRef with seastar::shared_ptr diff --git a/src/crimson/net/ProtocolV2.h b/src/crimson/net/ProtocolV2.h index 5c48fc71927..3bd31af7248 100644 --- a/src/crimson/net/ProtocolV2.h +++ b/src/crimson/net/ProtocolV2.h @@ -29,7 +29,8 @@ class ProtocolV2 final : public Protocol { const std::deque& msgs, size_t num_msgs, bool require_keepalive, - std::optional keepalive_ack) override; + std::optional keepalive_ack, + bool require_ack) override; private: SocketMessenger &messenger;