mirror of
https://github.com/ceph/ceph
synced 2025-02-24 11:37:37 +00:00
crimson/net: batch messages instead of chaining futures
Instead of chaining writes with send_ready, connection will batch messages in out_q, and will reap them by write_events() in the open state. The performance of pingpong is 3.7 times better from observation. Signed-off-by: Yingxin Cheng <yingxincheng@gmail.com>
This commit is contained in:
parent
55a76b4864
commit
a1ed20810b
@ -68,60 +68,73 @@ seastar::future<bool> SocketConnection::is_connected()
|
||||
});
|
||||
}
|
||||
|
||||
//TODO(performance): batch messages in out_q instead of chaining individual write events
|
||||
//TODO: should discard all the pending messages when reset
|
||||
seastar::future<> SocketConnection::write_event(MessageRef msg)
|
||||
void SocketConnection::write_event()
|
||||
{
|
||||
if (write_dispatching) {
|
||||
// already dispatching
|
||||
return;
|
||||
}
|
||||
write_dispatching = true;
|
||||
switch (write_state) {
|
||||
case write_state_t::open:
|
||||
case write_state_t::delay:
|
||||
return seastar::with_gate(pending_dispatch, [this, msg=std::move(msg)] {
|
||||
seastar::shared_future<> f = send_ready.then([this, msg = std::move(msg)] {
|
||||
return seastar::repeat([this, msg=std::move(msg)] {
|
||||
switch (write_state) {
|
||||
case write_state_t::open:
|
||||
return seastar::futurize_apply([this] {
|
||||
if (m_keepalive) {
|
||||
return do_keepalive()
|
||||
.then([this] { m_keepalive = false; });
|
||||
seastar::with_gate(pending_dispatch, [this] {
|
||||
return seastar::repeat([this] {
|
||||
switch (write_state) {
|
||||
case write_state_t::open:
|
||||
return seastar::futurize_apply([this] {
|
||||
if (m_keepalive) {
|
||||
return do_keepalive()
|
||||
.then([this] { m_keepalive = false; });
|
||||
}
|
||||
return seastar::now();
|
||||
}).then([this] {
|
||||
if (m_keepalive_ack) {
|
||||
return do_keepalive_ack()
|
||||
.then([this] { m_keepalive_ack = false; });
|
||||
}
|
||||
return seastar::now();
|
||||
}).then([this] {
|
||||
if (!out_q.empty()){
|
||||
MessageRef msg = out_q.front();
|
||||
return write_message(msg)
|
||||
.then([this, msg] {
|
||||
if (msg == out_q.front()) {
|
||||
out_q.pop();
|
||||
}
|
||||
return seastar::now();
|
||||
}).then([this] {
|
||||
if (m_keepalive_ack) {
|
||||
return do_keepalive_ack()
|
||||
.then([this] { m_keepalive_ack = false; });
|
||||
}
|
||||
return seastar::now();
|
||||
}).then([this, msg] {
|
||||
if (msg) {
|
||||
return write_message(msg);
|
||||
}
|
||||
return seastar::now();
|
||||
}).then([this] {
|
||||
return socket->flush();
|
||||
}).then([] {
|
||||
return stop_t::yes;
|
||||
}).handle_exception([this] (std::exception_ptr eptr) {
|
||||
logger().warn("{} write_event fault: {}", *this, eptr);
|
||||
close();
|
||||
return stop_t::no;
|
||||
});
|
||||
case write_state_t::delay:
|
||||
// delay all the writes until open
|
||||
return state_changed.get_shared_future()
|
||||
.then([] { return stop_t::no; });
|
||||
case write_state_t::drop:
|
||||
return seastar::make_ready_future<stop_t>(stop_t::yes);
|
||||
default:
|
||||
ceph_assert(false);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
return socket->flush()
|
||||
.then([this] {
|
||||
if (!out_q.empty()) {
|
||||
return stop_t::no;
|
||||
} else {
|
||||
write_dispatching = false;
|
||||
return stop_t::yes;
|
||||
}
|
||||
});
|
||||
}
|
||||
}).handle_exception([this] (std::exception_ptr eptr) {
|
||||
logger().warn("{} write_event fault: {}", *this, eptr);
|
||||
close();
|
||||
return stop_t::no;
|
||||
});
|
||||
case write_state_t::delay:
|
||||
// delay dispatching writes until open
|
||||
return state_changed.get_shared_future()
|
||||
.then([] { return stop_t::no; });
|
||||
case write_state_t::drop:
|
||||
write_dispatching = false;
|
||||
return seastar::make_ready_future<stop_t>(stop_t::yes);
|
||||
default:
|
||||
ceph_assert(false);
|
||||
}
|
||||
});
|
||||
send_ready = f.get_future();
|
||||
return f.get_future();
|
||||
});
|
||||
return;
|
||||
case write_state_t::drop:
|
||||
return seastar::now();
|
||||
write_dispatching = false;
|
||||
default:
|
||||
ceph_assert(false);
|
||||
}
|
||||
@ -131,7 +144,10 @@ seastar::future<> SocketConnection::send(MessageRef msg)
|
||||
{
|
||||
logger().debug("{} --> {} === {}", messenger, get_peer_addr(), *msg);
|
||||
return seastar::smp::submit_to(shard_id(), [this, msg=std::move(msg)] {
|
||||
return write_event(msg);
|
||||
if (write_state != write_state_t::drop) {
|
||||
out_q.push(std::move(msg));
|
||||
write_event();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -123,12 +123,8 @@ class SocketConnection : public Connection {
|
||||
seastar::future<> handle_tags();
|
||||
seastar::future<> handle_ack();
|
||||
|
||||
seastar::future<> write_event(MessageRef msg=nullptr);
|
||||
|
||||
/// becomes available when handshake completes, and when all previous messages
|
||||
/// have been sent to the output stream. send() chains new messages as
|
||||
/// continuations to this future to act as a queue
|
||||
seastar::future<> send_ready = seastar::now();
|
||||
bool write_dispatching = false;
|
||||
void write_event();
|
||||
|
||||
/// encode/write a message
|
||||
seastar::future<> write_message(MessageRef msg);
|
||||
|
Loading…
Reference in New Issue
Block a user