mirror of
https://github.com/ceph/ceph
synced 2025-01-19 17:41:39 +00:00
crimson/net: only do zero-copy writes to out_stream
seastar doesn't support mixed buffered writes and zero-copy writes. Signed-off-by: Yingxin <yingxin.cheng@intel.com>
This commit is contained in:
parent
9b66c5bc11
commit
b9061bb9cb
@ -29,6 +29,11 @@
|
||||
|
||||
using namespace ceph::net;
|
||||
|
||||
template <typename T>
|
||||
seastar::net::packet make_static_packet(const T& value) {
|
||||
return { reinterpret_cast<const char*>(&value), sizeof(value) };
|
||||
}
|
||||
|
||||
SocketConnection::SocketConnection(Messenger *messenger,
|
||||
const entity_addr_t& my_addr,
|
||||
const entity_addr_t& peer_addr,
|
||||
@ -318,9 +323,7 @@ seastar::future<> SocketConnection::keepalive()
|
||||
seastar::shared_future<> f = send_ready.then([this] {
|
||||
k.req.stamp = ceph::coarse_real_clock::to_ceph_timespec(
|
||||
ceph::coarse_real_clock::now());
|
||||
seastar::net::packet msg{reinterpret_cast<const char*>(&k.req),
|
||||
sizeof(k.req)};
|
||||
return out.write(std::move(msg));
|
||||
return out.write(make_static_packet(k.req));
|
||||
}).then([this] {
|
||||
return out.flush();
|
||||
});
|
||||
@ -514,7 +517,7 @@ SocketConnection::send_connect_reply(msgr_tag_t tag,
|
||||
policy.features_supported) |
|
||||
policy.features_required);
|
||||
h.reply.authorizer_len = authorizer_reply.length();
|
||||
return out.write(reinterpret_cast<const char*>(&h.reply), sizeof(h.reply))
|
||||
return out.write(make_static_packet(h.reply))
|
||||
.then([this, reply=std::move(authorizer_reply)]() mutable {
|
||||
return out.write(std::move(reply));
|
||||
}).then([this] {
|
||||
@ -536,7 +539,7 @@ SocketConnection::send_connect_reply_ready(msgr_tag_t tag,
|
||||
h.reply.flags = h.reply.flags | CEPH_MSG_CONNECT_LOSSY;
|
||||
}
|
||||
h.reply.authorizer_len = authorizer_reply.length();
|
||||
return out.write(reinterpret_cast<const char*>(&h.reply), sizeof(h.reply))
|
||||
return out.write(make_static_packet(h.reply))
|
||||
.then([this, reply=std::move(authorizer_reply)]() mutable {
|
||||
if (reply.length()) {
|
||||
return out.write(std::move(reply));
|
||||
@ -545,9 +548,9 @@ SocketConnection::send_connect_reply_ready(msgr_tag_t tag,
|
||||
}
|
||||
}).then([this] {
|
||||
if (h.reply.tag == CEPH_MSGR_TAG_SEQ) {
|
||||
return out.write(reinterpret_cast<const char*>(&in_seq),
|
||||
sizeof(in_seq)).then([this] {
|
||||
return out.flush();
|
||||
return out.write(make_static_packet(in_seq))
|
||||
.then([this] {
|
||||
return out.flush();
|
||||
}).then([this] {
|
||||
return in.read_exactly(sizeof(seq_num_t));
|
||||
}).then([this] (auto buf) {
|
||||
@ -569,9 +572,7 @@ SocketConnection::handle_keepalive2()
|
||||
.then([this] (auto buf) {
|
||||
k.ack.stamp = *reinterpret_cast<const ceph_timespec*>(buf.get());
|
||||
std::cout << "keepalive2 " << k.ack.stamp.tv_sec << std::endl;
|
||||
seastar::net::packet msg{reinterpret_cast<const char*>(&k.ack),
|
||||
sizeof(k.ack)};
|
||||
return out.write(std::move(msg));
|
||||
return out.write(make_static_packet(k.ack));
|
||||
}).then([this] {
|
||||
return out.flush();
|
||||
});
|
||||
@ -697,8 +698,7 @@ seastar::future<> SocketConnection::handle_connect_reply(msgr_tag_t tag)
|
||||
.then([this] (auto buf) {
|
||||
auto acked_seq = reinterpret_cast<const seq_num_t*>(buf.get());
|
||||
discard_up_to(&out_q, *acked_seq);
|
||||
}).then([this] {
|
||||
return out.write(reinterpret_cast<const char*>(&in_seq), sizeof(in_seq));
|
||||
return out.write(make_static_packet(in_seq));
|
||||
}).then([this] {
|
||||
return out.flush();
|
||||
}).then([this] {
|
||||
@ -774,8 +774,6 @@ seastar::future<> SocketConnection::connect(entity_type_t peer_type,
|
||||
h.connect.authorizer_len = 0;
|
||||
bl.append(create_static(h.connect));
|
||||
};
|
||||
return bl;
|
||||
}).then([this](bufferlist&& bl) {
|
||||
return out.write(std::move(bl));
|
||||
}).then([this] {
|
||||
return out.flush();
|
||||
|
Loading…
Reference in New Issue
Block a user