From a1ed20810bbe05f957e73f70cbe55e5416a3d2d9 Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Tue, 29 Jan 2019 20:49:29 +0800 Subject: [PATCH] crimson/net: batch messages instead of chaining futures Instead of chaining writes with send_ready, connection will batch messages in out_q, and will reap them by write_events() in the open state. The performance of pingpong is 3.7 times better from observation. Signed-off-by: Yingxin Cheng --- src/crimson/net/SocketConnection.cc | 106 ++++++++++++++++------------ src/crimson/net/SocketConnection.h | 8 +-- 2 files changed, 63 insertions(+), 51 deletions(-) diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc index a3045b1dedc..ff0c9c07b37 100644 --- a/src/crimson/net/SocketConnection.cc +++ b/src/crimson/net/SocketConnection.cc @@ -68,60 +68,73 @@ seastar::future SocketConnection::is_connected() }); } -//TODO(performance): batch messages in out_q instead of chaining individual write events -//TODO: should discard all the pending messages when reset -seastar::future<> SocketConnection::write_event(MessageRef msg) +void SocketConnection::write_event() { + if (write_dispatching) { + // already dispatching + return; + } + write_dispatching = true; switch (write_state) { case write_state_t::open: case write_state_t::delay: - return seastar::with_gate(pending_dispatch, [this, msg=std::move(msg)] { - seastar::shared_future<> f = send_ready.then([this, msg = std::move(msg)] { - return seastar::repeat([this, msg=std::move(msg)] { - switch (write_state) { - case write_state_t::open: - return seastar::futurize_apply([this] { - if (m_keepalive) { - return do_keepalive() - .then([this] { m_keepalive = false; }); + seastar::with_gate(pending_dispatch, [this] { + return seastar::repeat([this] { + switch (write_state) { + case write_state_t::open: + return seastar::futurize_apply([this] { + if (m_keepalive) { + return do_keepalive() + .then([this] { m_keepalive = false; }); + } + return seastar::now(); + }).then([this] { + if (m_keepalive_ack) { + return do_keepalive_ack() + .then([this] { m_keepalive_ack = false; }); + } + return seastar::now(); + }).then([this] { + if (!out_q.empty()){ + MessageRef msg = out_q.front(); + return write_message(msg) + .then([this, msg] { + if (msg == out_q.front()) { + out_q.pop(); } - return seastar::now(); - }).then([this] { - if (m_keepalive_ack) { - return do_keepalive_ack() - .then([this] { m_keepalive_ack = false; }); - } - return seastar::now(); - }).then([this, msg] { - if (msg) { - return write_message(msg); - } - return seastar::now(); - }).then([this] { - return socket->flush(); - }).then([] { - return stop_t::yes; - }).handle_exception([this] (std::exception_ptr eptr) { - logger().warn("{} write_event fault: {}", *this, eptr); - close(); return stop_t::no; }); - case write_state_t::delay: - // delay all the writes until open - return state_changed.get_shared_future() - .then([] { return stop_t::no; }); - case write_state_t::drop: - return seastar::make_ready_future(stop_t::yes); - default: - ceph_assert(false); - } - }); + } else { + return socket->flush() + .then([this] { + if (!out_q.empty()) { + return stop_t::no; + } else { + write_dispatching = false; + return stop_t::yes; + } + }); + } + }).handle_exception([this] (std::exception_ptr eptr) { + logger().warn("{} write_event fault: {}", *this, eptr); + close(); + return stop_t::no; + }); + case write_state_t::delay: + // delay dispatching writes until open + return state_changed.get_shared_future() + .then([] { return stop_t::no; }); + case write_state_t::drop: + write_dispatching = false; + return seastar::make_ready_future(stop_t::yes); + default: + ceph_assert(false); + } }); - send_ready = f.get_future(); - return f.get_future(); }); + return; case write_state_t::drop: - return seastar::now(); + write_dispatching = false; default: ceph_assert(false); } @@ -131,7 +144,10 @@ seastar::future<> SocketConnection::send(MessageRef msg) { logger().debug("{} --> {} === {}", messenger, get_peer_addr(), *msg); return seastar::smp::submit_to(shard_id(), [this, msg=std::move(msg)] { - return write_event(msg); + if (write_state != write_state_t::drop) { + out_q.push(std::move(msg)); + write_event(); + } }); } diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h index cc8ab2db49c..93794310c0c 100644 --- a/src/crimson/net/SocketConnection.h +++ b/src/crimson/net/SocketConnection.h @@ -123,12 +123,8 @@ class SocketConnection : public Connection { seastar::future<> handle_tags(); seastar::future<> handle_ack(); - seastar::future<> write_event(MessageRef msg=nullptr); - - /// becomes available when handshake completes, and when all previous messages - /// have been sent to the output stream. send() chains new messages as - /// continuations to this future to act as a queue - seastar::future<> send_ready = seastar::now(); + bool write_dispatching = false; + void write_event(); /// encode/write a message seastar::future<> write_message(MessageRef msg);