Merge pull request #41679 from AmnonHanuhov/wip-get_rid_of_pending_q

crimson/net: Use out_q instead of pending_q

Reviewed-by: Yingxin Cheng <yingxin.cheng@intel.com>
This commit is contained in:
Kefu Chai 2021-06-04 20:13:54 +08:00 committed by GitHub
commit 523fcb711f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 29 additions and 10 deletions

View File

@ -90,6 +90,26 @@ void Protocol::close(bool dispatch_reset,
}); });
} }
ceph::bufferlist Protocol::sweep_messages_and_move_to_sent(
size_t num_msgs,
bool require_keepalive,
std::optional<utime_t> keepalive_ack,
bool require_ack)
{
ceph::bufferlist bl = do_sweep_messages(conn.out_q,
num_msgs,
require_keepalive,
keepalive_ack,
require_ack);
if (!conn.policy.lossy) {
conn.sent.insert(conn.sent.end(),
conn.out_q.begin(),
conn.out_q.end());
}
conn.out_q.clear();
return bl;
}
seastar::future<> Protocol::send(MessageRef msg) seastar::future<> Protocol::send(MessageRef msg)
{ {
if (write_state != write_state_t::drop) { if (write_state != write_state_t::drop) {
@ -222,18 +242,11 @@ seastar::future<> Protocol::do_write_dispatch_sweep()
if (unlikely(!still_queued)) { if (unlikely(!still_queued)) {
return try_exit_sweep(); return try_exit_sweep();
} }
conn.pending_q.clear();
conn.pending_q.swap(conn.out_q);
if (!conn.policy.lossy) {
conn.sent.insert(conn.sent.end(),
conn.pending_q.begin(),
conn.pending_q.end());
}
auto acked = ack_left; auto acked = ack_left;
assert(acked == 0 || conn.in_seq > 0); assert(acked == 0 || conn.in_seq > 0);
// sweep all pending writes with the concrete Protocol // sweep all pending writes with the concrete Protocol
return socket->write(do_sweep_messages( return socket->write(sweep_messages_and_move_to_sent(
conn.pending_q, num_msgs, need_keepalive, keepalive_ack, acked > 0) num_msgs, need_keepalive, keepalive_ack, acked > 0)
).then([this, prv_keepalive_ack=keepalive_ack, acked] { ).then([this, prv_keepalive_ack=keepalive_ack, acked] {
need_keepalive = false; need_keepalive = false;
if (keepalive_ack == prv_keepalive_ack) { if (keepalive_ack == prv_keepalive_ack) {

View File

@ -65,6 +65,13 @@ class Protocol {
virtual void notify_write() {}; virtual void notify_write() {};
virtual void on_closed() {} virtual void on_closed() {}
private:
ceph::bufferlist sweep_messages_and_move_to_sent(
size_t num_msgs,
bool require_keepalive,
std::optional<utime_t> keepalive_ack,
bool require_ack);
public: public:
const proto_t proto_type; const proto_t proto_type;

View File

@ -45,7 +45,6 @@ class SocketConnection : public Connection {
// messages to be resent after connection gets reset // messages to be resent after connection gets reset
std::deque<MessageRef> out_q; std::deque<MessageRef> out_q;
std::deque<MessageRef> pending_q;
// messages sent, but not yet acked by peer // messages sent, but not yet acked by peer
std::deque<MessageRef> sent; std::deque<MessageRef> sent;