Merge pull request #25716 from cyx1231st/wip-crimson-msgr-errorleak

crimson/net: fix crimson msgr error leaks to caller

Reviewed-by: Radoslaw Zarzynski <rzarzyns@redhat.com>
Reviewed-by: Kefu Chai <kchai@redhat.com>
This commit is contained in:
Kefu Chai 2019-01-05 18:45:50 +08:00 committed by GitHub
commit 43b6c32964
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 97 additions and 102 deletions

View File

@ -54,9 +54,6 @@ SocketConnection::SocketConnection(SocketMessenger& messenger,
SocketConnection::~SocketConnection()
{
ceph_assert(pending_dispatch.is_closed());
// errors were reported to callers of send()
ceph_assert(send_ready.available());
send_ready.ignore_ready_future();
}
ceph::net::Messenger*
@ -69,53 +66,66 @@ bool SocketConnection::is_connected()
return !send_ready.failed();
}
void SocketConnection::read_tags_until_next_message()
seastar::future<> SocketConnection::send(MessageRef msg)
{
seastar::repeat([this] {
// read the next tag
return socket->read_exactly(1)
.then([this] (auto buf) {
if (buf.empty()) {
throw std::system_error(make_error_code(error::read_eof));
}
switch (buf[0]) {
case CEPH_MSGR_TAG_MSG:
// stop looping and notify read_header()
return seastar::make_ready_future<stop_t>(stop_t::yes);
case CEPH_MSGR_TAG_ACK:
return handle_ack();
case CEPH_MSGR_TAG_KEEPALIVE:
break;
case CEPH_MSGR_TAG_KEEPALIVE2:
return handle_keepalive2()
.then([this] { return stop_t::no; });
case CEPH_MSGR_TAG_KEEPALIVE2_ACK:
return handle_keepalive2_ack()
.then([this] { return stop_t::no; });
case CEPH_MSGR_TAG_CLOSE:
logger().info("{} got tag close", *this);
break;
}
return seastar::make_ready_future<stop_t>(stop_t::no);
if (state == state_t::closing)
return seastar::now();
return seastar::with_gate(pending_dispatch, [this, msg=std::move(msg)] {
return do_send(std::move(msg))
.handle_exception([this] (std::exception_ptr eptr) {
logger().warn("{} send fault: {}", *this, eptr);
close();
});
}).handle_exception_type([this] (const std::system_error& e) {
if (e.code() == error::read_eof) {
close();
}
throw e;
}).then_wrapped([this] (auto fut) {
// satisfy the message promise
fut.forward_to(std::move(on_message));
});
}
seastar::future<seastar::stop_iteration> SocketConnection::handle_ack()
seastar::future<> SocketConnection::keepalive()
{
if (state == state_t::closing)
return seastar::now();
return seastar::with_gate(pending_dispatch, [this] {
return do_keepalive()
.handle_exception([this] (std::exception_ptr eptr) {
logger().warn("{} keepalive fault: {}", *this, eptr);
close();
});
});
}
seastar::future<> SocketConnection::handle_tags()
{
return seastar::keep_doing([this] {
// read the next tag
return socket->read_exactly(1)
.then([this] (auto buf) {
switch (buf[0]) {
case CEPH_MSGR_TAG_MSG:
return read_message();
case CEPH_MSGR_TAG_ACK:
return handle_ack();
case CEPH_MSGR_TAG_KEEPALIVE:
return seastar::now();
case CEPH_MSGR_TAG_KEEPALIVE2:
return handle_keepalive2();
case CEPH_MSGR_TAG_KEEPALIVE2_ACK:
return handle_keepalive2_ack();
case CEPH_MSGR_TAG_CLOSE:
logger().info("{} got tag close", *this);
throw std::system_error(make_error_code(error::connection_aborted));
default:
logger().error("{} got unknown msgr tag {}", *this, static_cast<int>(buf[0]));
throw std::system_error(make_error_code(error::read_eof));
}
});
});
}
seastar::future<> SocketConnection::handle_ack()
{
return socket->read_exactly(sizeof(ceph_le64))
.then([this] (auto buf) {
auto seq = reinterpret_cast<const ceph_le64*>(buf.get());
discard_up_to(&sent, *seq);
return stop_t::no;
});
}
@ -149,14 +159,10 @@ seastar::future<> SocketConnection::maybe_throttle()
return policy.throttler_bytes->get(to_read);
}
seastar::future<MessageRef> SocketConnection::do_read_message()
seastar::future<> SocketConnection::read_message()
{
return on_message.get_future()
.then([this] {
on_message = seastar::promise<>{};
// read header
return socket->read(sizeof(m.header));
}).then([this] (bufferlist bl) {
return socket->read(sizeof(m.header))
.then([this] (bufferlist bl) {
// throttle the traffic, maybe
auto p = bl.cbegin();
::decode(m.header, p);
@ -177,30 +183,27 @@ seastar::future<MessageRef> SocketConnection::do_read_message()
// read footer
return socket->read(sizeof(m.footer));
}).then([this] (bufferlist bl) {
// resume background processing of tags
read_tags_until_next_message();
auto p = bl.cbegin();
::decode(m.footer, p);
auto msg = ::decode_message(nullptr, 0, m.header, m.footer,
m.front, m.middle, m.data, nullptr);
// TODO: set time stamps
msg->set_byte_throttler(policy.throttler_bytes);
constexpr bool add_ref = false; // Message starts with 1 ref
return MessageRef{msg, add_ref};
});
}
seastar::future<MessageRef> SocketConnection::read_message()
{
return seastar::repeat_until_value([this] {
return do_read_message()
.then([this] (MessageRef msg) -> std::optional<MessageRef> {
if (!update_rx_seq(msg->get_seq())) {
// skip this request and read the next
return {};
}
return msg;
if (!update_rx_seq(msg->get_seq())) {
// skip this message
return;
}
constexpr bool add_ref = false; // Message starts with 1 ref
auto msg_ref = MessageRef{msg, add_ref};
// start dispatch, ignoring exceptions from the application layer
seastar::with_gate(pending_dispatch, [this, msg = std::move(msg_ref)] {
return dispatcher.ms_dispatch(this, std::move(msg))
.handle_exception([this] (std::exception_ptr eptr) {
logger().error("{} ms_dispatch caught exception: {}", *this, eptr);
ceph_assert(false);
});
});
});
}
@ -263,11 +266,14 @@ seastar::future<> SocketConnection::write_message(MessageRef msg)
});
}
seastar::future<> SocketConnection::send(MessageRef msg)
seastar::future<> SocketConnection::do_send(MessageRef msg)
{
// chain the message after the last message is sent
// TODO: retry send for lossless connection
seastar::shared_future<> f = send_ready.then(
[this, msg = std::move(msg)] {
if (state == state_t::closing)
return seastar::now();
return write_message(std::move(msg));
});
@ -277,9 +283,12 @@ seastar::future<> SocketConnection::send(MessageRef msg)
return f.get_future();
}
seastar::future<> SocketConnection::keepalive()
seastar::future<> SocketConnection::do_keepalive()
{
// TODO: retry keepalive for lossless connection
seastar::shared_future<> f = send_ready.then([this] {
if (state == state_t::closing)
return seastar::now();
k.req.stamp = ceph::coarse_real_clock::to_ceph_timespec(
ceph::coarse_real_clock::now());
return socket->write_flush(make_static_packet(k.req));
@ -813,11 +822,6 @@ SocketConnection::start_connect(const entity_addr_t& _peer_addr,
return seastar::repeat([this] {
return repeat_connect();
});
}).then_wrapped([this] (auto fut) {
// TODO: do not forward the exception
// and let the reconnect happen transparently inside connection
// satisfy the handshake's promise
fut.forward_to(std::move(h.promise));
}).then([this] {
// notify the dispatcher and allow them to reject the connection
return dispatcher.ms_handle_connect(this);
@ -826,6 +830,7 @@ SocketConnection::start_connect(const entity_addr_t& _peer_addr,
}).handle_exception([this] (std::exception_ptr eptr) {
// TODO: handle fault in the connecting state
logger().warn("{} connecting fault: {}", *this, eptr);
h.promise.set_value();
close();
});
});
@ -864,15 +869,9 @@ SocketConnection::start_accept(seastar::connected_socket&& fd,
peer_addr.set_type(addr.get_type());
peer_addr.set_port(addr.get_port());
peer_addr.set_nonce(addr.get_nonce());
}).then([this] {
return seastar::repeat([this] {
return repeat_handle_connect();
});
}).then_wrapped([this] (auto fut) {
// TODO: do not forward the exception
// and let the reconnect happen transparently inside connection
// satisfy the handshake's promise
fut.forward_to(std::move(h.promise));
}).then([this] {
// notify the dispatcher and allow them to reject the connection
return dispatcher.ms_handle_accept(this);
@ -883,6 +882,7 @@ SocketConnection::start_accept(seastar::connected_socket&& fd,
}).handle_exception([this] (std::exception_ptr eptr) {
// TODO: handle fault in the accepting state
logger().warn("{} accepting fault: {}", *this, eptr);
h.promise.set_value();
close();
});
});
@ -893,32 +893,31 @@ SocketConnection::execute_open()
{
logger().debug("{} trigger open, was {}", *this, static_cast<int>(state));
state = state_t::open;
// satisfy the handshake's promise
h.promise.set_value();
seastar::with_gate(pending_dispatch, [this] {
// start background processing of tags
read_tags_until_next_message();
return seastar::keep_doing([this] {
return read_message()
.then([this] (MessageRef msg) {
// start dispatch, ignoring exceptions from the application layer
seastar::with_gate(pending_dispatch, [this, msg = std::move(msg)] {
return dispatcher.ms_dispatch(this, std::move(msg))
.handle_exception([] (std::exception_ptr eptr) {});
});
// return immediately to start on the next message
return seastar::now();
});
}).handle_exception_type([this] (const std::system_error& e) {
return handle_tags()
.handle_exception_type([this] (const std::system_error& e) {
logger().warn("{} open fault: {}", *this, e);
if (e.code() == error::connection_aborted ||
e.code() == error::connection_reset) {
return dispatcher.ms_handle_reset(this);
return dispatcher.ms_handle_reset(this)
.then([this] {
close();
});
} else if (e.code() == error::read_eof) {
return dispatcher.ms_handle_remote_reset(this);
return dispatcher.ms_handle_remote_reset(this)
.then([this] {
close();
});
} else {
throw e;
}
}).handle_exception([this] (std::exception_ptr eptr) {
// TODO: handle fault in the open state
logger().warn("{} open fault: {}", *this, eptr);
close();
});
});
}

View File

@ -108,13 +108,9 @@ class SocketConnection : public Connection {
bufferlist data;
} m;
/// satisfied when a CEPH_MSGR_TAG_MSG is read, indicating that a message
/// header will follow
seastar::promise<> on_message;
seastar::future<> maybe_throttle();
void read_tags_until_next_message();
seastar::future<stop_t> handle_ack();
seastar::future<> handle_tags();
seastar::future<> handle_ack();
/// becomes available when handshake completes, and when all previous messages
/// have been sent to the output stream. send() chains new messages as
@ -139,7 +135,7 @@ class SocketConnection : public Connection {
/// false otherwise.
bool update_rx_seq(seq_num_t seq);
seastar::future<MessageRef> do_read_message();
seastar::future<> read_message();
std::unique_ptr<AuthSessionHandler> session_security;
@ -165,6 +161,9 @@ class SocketConnection : public Connection {
void execute_open();
seastar::future<> do_send(MessageRef msg);
seastar::future<> do_keepalive();
public:
SocketConnection(SocketMessenger& messenger,
Dispatcher& dispatcher);
@ -196,9 +195,6 @@ class SocketConnection : public Connection {
void start_accept(seastar::connected_socket&& socket,
const entity_addr_t& peer_addr);
/// read a message from a connection that has completed its handshake
seastar::future<MessageRef> read_message();
/// the number of connections initiated in this session, increment when a
/// new connection is established
uint32_t connect_seq() const {