From 3f853538b172d538aaf0b345a237773e7d1e8c69 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Sat, 30 Oct 2021 19:47:02 -0400 Subject: [PATCH] rgw/beast: replace beast::tcp_stream with manual timeouts remove the beast::tcp_stream wrapper from the socket, and track timeouts manually with a timeout_timer. this timer uses ceph's coarse_mono_clock which is cheaper to sample than std::chrono::steady_clock Signed-off-by: Casey Bodley --- src/rgw/rgw_asio_frontend.cc | 72 ++++++++++++++++++------------- src/rgw/rgw_asio_frontend_timer.h | 53 +++++++++++++++++++++++ 2 files changed, 94 insertions(+), 31 deletions(-) create mode 100644 src/rgw/rgw_asio_frontend_timer.h diff --git a/src/rgw/rgw_asio_frontend.cc b/src/rgw/rgw_asio_frontend.cc index 2d1fab0bdbe..2954e64883d 100644 --- a/src/rgw/rgw_asio_frontend.cc +++ b/src/rgw/rgw_asio_frontend.cc @@ -21,7 +21,6 @@ #ifdef WITH_RADOSGW_BEAST_OPENSSL #include -#include #endif #include "common/split.h" @@ -31,6 +30,7 @@ #include "rgw_zone.h" +#include "rgw_asio_frontend_timer.h" #include "rgw_dmclock_async_scheduler.h" #define dout_subsys ceph_subsys_rgw @@ -49,6 +49,9 @@ using executor_type = boost::asio::io_context::executor_type; using tcp_socket = boost::asio::basic_stream_socket; using tcp_stream = boost::beast::basic_stream; +using timeout_timer = rgw::basic_timeout_timer; + using parse_buffer = boost::beast::flat_static_buffer<65536>; // use mmap/mprotect to allocate 512k coroutine stacks @@ -62,32 +65,35 @@ template class StreamIO : public rgw::asio::ClientIO { CephContext* const cct; Stream& stream; + timeout_timer& timeout; yield_context yield; parse_buffer& buffer; ceph::timespan request_timeout; public: - StreamIO(CephContext *cct, Stream& stream, rgw::asio::parser_type& parser, - yield_context yield, parse_buffer& buffer, bool is_ssl, + StreamIO(CephContext *cct, Stream& stream, timeout_timer& timeout, + rgw::asio::parser_type& parser, yield_context yield, + parse_buffer& buffer, bool is_ssl, const tcp::endpoint& local_endpoint, const tcp::endpoint& remote_endpoint, ceph::timespan request_timeout) : ClientIO(parser, is_ssl, local_endpoint, remote_endpoint), - cct(cct), stream(stream), yield(yield), buffer(buffer), request_timeout(request_timeout) + cct(cct), stream(stream), timeout(timeout), yield(yield), + buffer(buffer), request_timeout(request_timeout) {} size_t write_data(const char* buf, size_t len) override { boost::system::error_code ec; - auto& timeout = get_lowest_layer(stream); if (request_timeout.count()) { - timeout.expires_after(request_timeout); + timeout.expires_after(stream.lowest_layer(), request_timeout); } auto bytes = boost::asio::async_write(stream, boost::asio::buffer(buf, len), yield[ec]); + timeout.cancel(); if (ec) { ldout(cct, 4) << "write_data failed: " << ec.message() << dendl; - if (ec==boost::asio::error::broken_pipe) { + if (ec == boost::asio::error::broken_pipe) { boost::system::error_code ec_ignored; - timeout.socket().shutdown(tcp_socket::shutdown_both, ec_ignored); + stream.lowest_layer().shutdown(tcp_socket::shutdown_both, ec_ignored); } throw rgw::io::Exception(ec.value(), std::system_category()); } @@ -95,7 +101,6 @@ class StreamIO : public rgw::asio::ClientIO { } size_t recv_body(char* buf, size_t max) override { - auto& timeout = get_lowest_layer(stream); auto& message = parser.get(); auto& body_remaining = message.body(); body_remaining.data = buf; @@ -104,9 +109,10 @@ class StreamIO : public rgw::asio::ClientIO { while (body_remaining.size && !parser.is_done()) { boost::system::error_code ec; if (request_timeout.count()) { - timeout.expires_after(request_timeout); + timeout.expires_after(stream.lowest_layer(), request_timeout); } http::async_read_some(stream, buffer, parser, yield[ec]); + timeout.cancel(); if (ec == http::error::need_buffer) { break; } @@ -175,6 +181,7 @@ using SharedMutex = ceph::async::SharedMutex void handle_connection(boost::asio::io_context& context, RGWProcessEnv& env, Stream& stream, + timeout_timer& timeout, parse_buffer& buffer, bool is_ssl, SharedMutex& pause_mutex, rgw::dmclock::Scheduler *scheduler, @@ -195,12 +202,12 @@ void handle_connection(boost::asio::io_context& context, rgw::asio::parser_type parser; parser.header_limit(header_limit); parser.body_limit(body_limit); - auto& timeout = get_lowest_layer(stream); if (request_timeout.count()) { - timeout.expires_after(request_timeout); + timeout.expires_after(stream.lowest_layer(), request_timeout); } // parse the header http::async_read_header(stream, buffer, parser, yield[ec]); + timeout.cancel(); if (ec == boost::asio::error::connection_reset || ec == boost::asio::error::bad_descriptor || ec == boost::asio::error::operation_aborted || @@ -219,9 +226,10 @@ void handle_connection(boost::asio::io_context& context, response.version(message.version() == 10 ? 10 : 11); response.prepare_payload(); if (request_timeout.count()) { - timeout.expires_after(request_timeout); + timeout.expires_after(stream.lowest_layer(), request_timeout); } http::async_write(stream, response, yield[ec]); + timeout.cancel(); if (ec) { ldout(cct, 5) << "failed to write response: " << ec.message() << dendl; } @@ -241,16 +249,16 @@ void handle_connection(boost::asio::io_context& context, // process the request RGWRequest req{env.store->get_new_req_id()}; - auto& socket = get_lowest_layer(stream).socket(); + auto& socket = stream.lowest_layer(); const auto& remote_endpoint = socket.remote_endpoint(ec); if (ec) { ldout(cct, 1) << "failed to connect client: " << ec.message() << dendl; return; } - StreamIO real_client{cct, stream, parser, yield, buffer, is_ssl, - socket.local_endpoint(), - remote_endpoint,request_timeout}; + StreamIO real_client{cct, stream, timeout, parser, yield, buffer, + is_ssl, socket.local_endpoint(), + remote_endpoint, request_timeout}; auto real_client_io = rgw::io::add_reordering( rgw::io::add_buffering(cct, @@ -299,9 +307,10 @@ void handle_connection(boost::asio::io_context& context, body.data = discard_buffer.data(); if (request_timeout.count()) { - timeout.expires_after(request_timeout); + timeout.expires_after(stream.lowest_layer(), request_timeout); } http::async_read_some(stream, buffer, parser, yield[ec]); + timeout.cancel(); if (ec == http::error::need_buffer) { continue; } @@ -946,44 +955,44 @@ void AsioFrontend::accept(Listener& l, boost::system::error_code ec) ldout(ctx(), 1) << "accept failed: " << ec.message() << dendl; return; } - auto socket = std::move(l.socket); - tcp::no_delay options(l.use_nodelay); - socket.set_option(options,ec); + auto stream = std::move(l.socket); + stream.set_option(tcp::no_delay(l.use_nodelay), ec); l.acceptor.async_accept(l.socket, [this, &l] (boost::system::error_code ec) { accept(l, ec); }); - tcp_stream stream(std::move(socket)); // spawn a coroutine to handle the connection #ifdef WITH_RADOSGW_BEAST_OPENSSL if (l.use_ssl) { spawn::spawn(context, [this, s=std::move(stream)] (yield_context yield) mutable { - Connection conn{s.socket()}; + Connection conn{s}; auto c = connections.add(conn); - // wrap the tcp_stream in an ssl stream - boost::beast::ssl_stream stream{s, *ssl_context}; + // wrap the tcp stream in an ssl stream + boost::asio::ssl::stream stream{s, *ssl_context}; + auto timeout = timeout_timer{context.get_executor()}; auto buffer = std::make_unique(); // do ssl handshake boost::system::error_code ec; if (request_timeout.count()) { - get_lowest_layer(stream).expires_after(request_timeout); + timeout.expires_after(s, request_timeout); } auto bytes = stream.async_handshake(ssl::stream_base::server, buffer->data(), yield[ec]); + timeout.cancel(); if (ec) { ldout(ctx(), 1) << "ssl handshake failed: " << ec.message() << dendl; return; } buffer->consume(bytes); - handle_connection(context, env, stream, *buffer, true, pause_mutex, + handle_connection(context, env, stream, timeout, *buffer, true, pause_mutex, scheduler.get(), ec, yield, request_timeout); if (!ec) { // ssl shutdown (ignoring errors) stream.async_shutdown(yield[ec]); } - s.socket().shutdown(tcp_socket::shutdown_both, ec); + s.shutdown(tcp::socket::shutdown_both, ec); }, make_stack_allocator()); } else { #else @@ -991,13 +1000,14 @@ void AsioFrontend::accept(Listener& l, boost::system::error_code ec) #endif // WITH_RADOSGW_BEAST_OPENSSL spawn::spawn(context, [this, s=std::move(stream)] (yield_context yield) mutable { - Connection conn{s.socket()}; + Connection conn{s}; auto c = connections.add(conn); + auto timeout = timeout_timer{context.get_executor()}; auto buffer = std::make_unique(); boost::system::error_code ec; - handle_connection(context, env, s, *buffer, false, pause_mutex, + handle_connection(context, env, s, timeout, *buffer, false, pause_mutex, scheduler.get(), ec, yield, request_timeout); - s.socket().shutdown(tcp_socket::shutdown_both, ec); + s.shutdown(tcp_socket::shutdown_both, ec); }, make_stack_allocator()); } } diff --git a/src/rgw/rgw_asio_frontend_timer.h b/src/rgw/rgw_asio_frontend_timer.h new file mode 100644 index 00000000000..4fc81ee6585 --- /dev/null +++ b/src/rgw/rgw_asio_frontend_timer.h @@ -0,0 +1,53 @@ +#pragma once + +#include + +#include "common/ceph_time.h" + +namespace rgw { + +// a WaitHandler that closes a stream if the timeout expires +template +struct timeout_handler { + Stream* stream; + + explicit timeout_handler(Stream* stream) noexcept : stream(stream) {} + + void operator()(boost::system::error_code ec) { + if (!ec) { // wait was not canceled + boost::system::error_code ec_ignored; + stream->close(ec_ignored); + } + } +}; + +// a timeout timer for stream operations +template +class basic_timeout_timer { + public: + using clock_type = Clock; + using duration = typename clock_type::duration; + using executor_type = Executor; + + explicit basic_timeout_timer(const executor_type& ex) : timer(ex) {} + + basic_timeout_timer(const basic_timeout_timer&) = delete; + basic_timeout_timer& operator=(const basic_timeout_timer&) = delete; + + template + void expires_after(Stream& stream, duration dur) { + timer.expires_after(dur); + timer.async_wait(timeout_handler{&stream}); + } + + void cancel() { + timer.cancel(); + } + + private: + using Timer = boost::asio::basic_waitable_timer, executor_type>; + Timer timer; +}; + +} // namespace rgw