mirror of
https://github.com/ceph/ceph
synced 2025-02-23 19:17:37 +00:00
rgw/beast: replace beast::tcp_stream with manual timeouts
remove the beast::tcp_stream wrapper from the socket, and track timeouts manually with a timeout_timer. this timer uses ceph's coarse_mono_clock which is cheaper to sample than std::chrono::steady_clock Signed-off-by: Casey Bodley <cbodley@redhat.com>
This commit is contained in:
parent
0bee64d875
commit
3f853538b1
@ -21,7 +21,6 @@
|
||||
|
||||
#ifdef WITH_RADOSGW_BEAST_OPENSSL
|
||||
#include <boost/asio/ssl.hpp>
|
||||
#include <boost/beast/ssl/ssl_stream.hpp>
|
||||
#endif
|
||||
|
||||
#include "common/split.h"
|
||||
@ -31,6 +30,7 @@
|
||||
|
||||
#include "rgw_zone.h"
|
||||
|
||||
#include "rgw_asio_frontend_timer.h"
|
||||
#include "rgw_dmclock_async_scheduler.h"
|
||||
|
||||
#define dout_subsys ceph_subsys_rgw
|
||||
@ -49,6 +49,9 @@ using executor_type = boost::asio::io_context::executor_type;
|
||||
using tcp_socket = boost::asio::basic_stream_socket<tcp, executor_type>;
|
||||
using tcp_stream = boost::beast::basic_stream<tcp, executor_type>;
|
||||
|
||||
using timeout_timer = rgw::basic_timeout_timer<ceph::coarse_mono_clock,
|
||||
executor_type>;
|
||||
|
||||
using parse_buffer = boost::beast::flat_static_buffer<65536>;
|
||||
|
||||
// use mmap/mprotect to allocate 512k coroutine stacks
|
||||
@ -62,32 +65,35 @@ template <typename Stream>
|
||||
class StreamIO : public rgw::asio::ClientIO {
|
||||
CephContext* const cct;
|
||||
Stream& stream;
|
||||
timeout_timer& timeout;
|
||||
yield_context yield;
|
||||
parse_buffer& buffer;
|
||||
ceph::timespan request_timeout;
|
||||
public:
|
||||
StreamIO(CephContext *cct, Stream& stream, rgw::asio::parser_type& parser,
|
||||
yield_context yield, parse_buffer& buffer, bool is_ssl,
|
||||
StreamIO(CephContext *cct, Stream& stream, timeout_timer& timeout,
|
||||
rgw::asio::parser_type& parser, yield_context yield,
|
||||
parse_buffer& buffer, bool is_ssl,
|
||||
const tcp::endpoint& local_endpoint,
|
||||
const tcp::endpoint& remote_endpoint,
|
||||
ceph::timespan request_timeout)
|
||||
: ClientIO(parser, is_ssl, local_endpoint, remote_endpoint),
|
||||
cct(cct), stream(stream), yield(yield), buffer(buffer), request_timeout(request_timeout)
|
||||
cct(cct), stream(stream), timeout(timeout), yield(yield),
|
||||
buffer(buffer), request_timeout(request_timeout)
|
||||
{}
|
||||
|
||||
size_t write_data(const char* buf, size_t len) override {
|
||||
boost::system::error_code ec;
|
||||
auto& timeout = get_lowest_layer(stream);
|
||||
if (request_timeout.count()) {
|
||||
timeout.expires_after(request_timeout);
|
||||
timeout.expires_after(stream.lowest_layer(), request_timeout);
|
||||
}
|
||||
auto bytes = boost::asio::async_write(stream, boost::asio::buffer(buf, len),
|
||||
yield[ec]);
|
||||
timeout.cancel();
|
||||
if (ec) {
|
||||
ldout(cct, 4) << "write_data failed: " << ec.message() << dendl;
|
||||
if (ec==boost::asio::error::broken_pipe) {
|
||||
if (ec == boost::asio::error::broken_pipe) {
|
||||
boost::system::error_code ec_ignored;
|
||||
timeout.socket().shutdown(tcp_socket::shutdown_both, ec_ignored);
|
||||
stream.lowest_layer().shutdown(tcp_socket::shutdown_both, ec_ignored);
|
||||
}
|
||||
throw rgw::io::Exception(ec.value(), std::system_category());
|
||||
}
|
||||
@ -95,7 +101,6 @@ class StreamIO : public rgw::asio::ClientIO {
|
||||
}
|
||||
|
||||
size_t recv_body(char* buf, size_t max) override {
|
||||
auto& timeout = get_lowest_layer(stream);
|
||||
auto& message = parser.get();
|
||||
auto& body_remaining = message.body();
|
||||
body_remaining.data = buf;
|
||||
@ -104,9 +109,10 @@ class StreamIO : public rgw::asio::ClientIO {
|
||||
while (body_remaining.size && !parser.is_done()) {
|
||||
boost::system::error_code ec;
|
||||
if (request_timeout.count()) {
|
||||
timeout.expires_after(request_timeout);
|
||||
timeout.expires_after(stream.lowest_layer(), request_timeout);
|
||||
}
|
||||
http::async_read_some(stream, buffer, parser, yield[ec]);
|
||||
timeout.cancel();
|
||||
if (ec == http::error::need_buffer) {
|
||||
break;
|
||||
}
|
||||
@ -175,6 +181,7 @@ using SharedMutex = ceph::async::SharedMutex<boost::asio::io_context::executor_t
|
||||
template <typename Stream>
|
||||
void handle_connection(boost::asio::io_context& context,
|
||||
RGWProcessEnv& env, Stream& stream,
|
||||
timeout_timer& timeout,
|
||||
parse_buffer& buffer, bool is_ssl,
|
||||
SharedMutex& pause_mutex,
|
||||
rgw::dmclock::Scheduler *scheduler,
|
||||
@ -195,12 +202,12 @@ void handle_connection(boost::asio::io_context& context,
|
||||
rgw::asio::parser_type parser;
|
||||
parser.header_limit(header_limit);
|
||||
parser.body_limit(body_limit);
|
||||
auto& timeout = get_lowest_layer(stream);
|
||||
if (request_timeout.count()) {
|
||||
timeout.expires_after(request_timeout);
|
||||
timeout.expires_after(stream.lowest_layer(), request_timeout);
|
||||
}
|
||||
// parse the header
|
||||
http::async_read_header(stream, buffer, parser, yield[ec]);
|
||||
timeout.cancel();
|
||||
if (ec == boost::asio::error::connection_reset ||
|
||||
ec == boost::asio::error::bad_descriptor ||
|
||||
ec == boost::asio::error::operation_aborted ||
|
||||
@ -219,9 +226,10 @@ void handle_connection(boost::asio::io_context& context,
|
||||
response.version(message.version() == 10 ? 10 : 11);
|
||||
response.prepare_payload();
|
||||
if (request_timeout.count()) {
|
||||
timeout.expires_after(request_timeout);
|
||||
timeout.expires_after(stream.lowest_layer(), request_timeout);
|
||||
}
|
||||
http::async_write(stream, response, yield[ec]);
|
||||
timeout.cancel();
|
||||
if (ec) {
|
||||
ldout(cct, 5) << "failed to write response: " << ec.message() << dendl;
|
||||
}
|
||||
@ -241,16 +249,16 @@ void handle_connection(boost::asio::io_context& context,
|
||||
// process the request
|
||||
RGWRequest req{env.store->get_new_req_id()};
|
||||
|
||||
auto& socket = get_lowest_layer(stream).socket();
|
||||
auto& socket = stream.lowest_layer();
|
||||
const auto& remote_endpoint = socket.remote_endpoint(ec);
|
||||
if (ec) {
|
||||
ldout(cct, 1) << "failed to connect client: " << ec.message() << dendl;
|
||||
return;
|
||||
}
|
||||
|
||||
StreamIO real_client{cct, stream, parser, yield, buffer, is_ssl,
|
||||
socket.local_endpoint(),
|
||||
remote_endpoint,request_timeout};
|
||||
StreamIO real_client{cct, stream, timeout, parser, yield, buffer,
|
||||
is_ssl, socket.local_endpoint(),
|
||||
remote_endpoint, request_timeout};
|
||||
|
||||
auto real_client_io = rgw::io::add_reordering(
|
||||
rgw::io::add_buffering(cct,
|
||||
@ -299,9 +307,10 @@ void handle_connection(boost::asio::io_context& context,
|
||||
body.data = discard_buffer.data();
|
||||
|
||||
if (request_timeout.count()) {
|
||||
timeout.expires_after(request_timeout);
|
||||
timeout.expires_after(stream.lowest_layer(), request_timeout);
|
||||
}
|
||||
http::async_read_some(stream, buffer, parser, yield[ec]);
|
||||
timeout.cancel();
|
||||
if (ec == http::error::need_buffer) {
|
||||
continue;
|
||||
}
|
||||
@ -946,44 +955,44 @@ void AsioFrontend::accept(Listener& l, boost::system::error_code ec)
|
||||
ldout(ctx(), 1) << "accept failed: " << ec.message() << dendl;
|
||||
return;
|
||||
}
|
||||
auto socket = std::move(l.socket);
|
||||
tcp::no_delay options(l.use_nodelay);
|
||||
socket.set_option(options,ec);
|
||||
auto stream = std::move(l.socket);
|
||||
stream.set_option(tcp::no_delay(l.use_nodelay), ec);
|
||||
l.acceptor.async_accept(l.socket,
|
||||
[this, &l] (boost::system::error_code ec) {
|
||||
accept(l, ec);
|
||||
});
|
||||
|
||||
tcp_stream stream(std::move(socket));
|
||||
// spawn a coroutine to handle the connection
|
||||
#ifdef WITH_RADOSGW_BEAST_OPENSSL
|
||||
if (l.use_ssl) {
|
||||
spawn::spawn(context,
|
||||
[this, s=std::move(stream)] (yield_context yield) mutable {
|
||||
Connection conn{s.socket()};
|
||||
Connection conn{s};
|
||||
auto c = connections.add(conn);
|
||||
// wrap the tcp_stream in an ssl stream
|
||||
boost::beast::ssl_stream<tcp_stream&> stream{s, *ssl_context};
|
||||
// wrap the tcp stream in an ssl stream
|
||||
boost::asio::ssl::stream<tcp_socket&> stream{s, *ssl_context};
|
||||
auto timeout = timeout_timer{context.get_executor()};
|
||||
auto buffer = std::make_unique<parse_buffer>();
|
||||
// do ssl handshake
|
||||
boost::system::error_code ec;
|
||||
if (request_timeout.count()) {
|
||||
get_lowest_layer(stream).expires_after(request_timeout);
|
||||
timeout.expires_after(s, request_timeout);
|
||||
}
|
||||
auto bytes = stream.async_handshake(ssl::stream_base::server,
|
||||
buffer->data(), yield[ec]);
|
||||
timeout.cancel();
|
||||
if (ec) {
|
||||
ldout(ctx(), 1) << "ssl handshake failed: " << ec.message() << dendl;
|
||||
return;
|
||||
}
|
||||
buffer->consume(bytes);
|
||||
handle_connection(context, env, stream, *buffer, true, pause_mutex,
|
||||
handle_connection(context, env, stream, timeout, *buffer, true, pause_mutex,
|
||||
scheduler.get(), ec, yield, request_timeout);
|
||||
if (!ec) {
|
||||
// ssl shutdown (ignoring errors)
|
||||
stream.async_shutdown(yield[ec]);
|
||||
}
|
||||
s.socket().shutdown(tcp_socket::shutdown_both, ec);
|
||||
s.shutdown(tcp::socket::shutdown_both, ec);
|
||||
}, make_stack_allocator());
|
||||
} else {
|
||||
#else
|
||||
@ -991,13 +1000,14 @@ void AsioFrontend::accept(Listener& l, boost::system::error_code ec)
|
||||
#endif // WITH_RADOSGW_BEAST_OPENSSL
|
||||
spawn::spawn(context,
|
||||
[this, s=std::move(stream)] (yield_context yield) mutable {
|
||||
Connection conn{s.socket()};
|
||||
Connection conn{s};
|
||||
auto c = connections.add(conn);
|
||||
auto timeout = timeout_timer{context.get_executor()};
|
||||
auto buffer = std::make_unique<parse_buffer>();
|
||||
boost::system::error_code ec;
|
||||
handle_connection(context, env, s, *buffer, false, pause_mutex,
|
||||
handle_connection(context, env, s, timeout, *buffer, false, pause_mutex,
|
||||
scheduler.get(), ec, yield, request_timeout);
|
||||
s.socket().shutdown(tcp_socket::shutdown_both, ec);
|
||||
s.shutdown(tcp_socket::shutdown_both, ec);
|
||||
}, make_stack_allocator());
|
||||
}
|
||||
}
|
||||
|
53
src/rgw/rgw_asio_frontend_timer.h
Normal file
53
src/rgw/rgw_asio_frontend_timer.h
Normal file
@ -0,0 +1,53 @@
|
||||
#pragma once
|
||||
|
||||
#include <boost/asio/basic_waitable_timer.hpp>
|
||||
|
||||
#include "common/ceph_time.h"
|
||||
|
||||
namespace rgw {
|
||||
|
||||
// a WaitHandler that closes a stream if the timeout expires
|
||||
template <typename Stream>
|
||||
struct timeout_handler {
|
||||
Stream* stream;
|
||||
|
||||
explicit timeout_handler(Stream* stream) noexcept : stream(stream) {}
|
||||
|
||||
void operator()(boost::system::error_code ec) {
|
||||
if (!ec) { // wait was not canceled
|
||||
boost::system::error_code ec_ignored;
|
||||
stream->close(ec_ignored);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// a timeout timer for stream operations
|
||||
template <typename Clock, typename Executor>
|
||||
class basic_timeout_timer {
|
||||
public:
|
||||
using clock_type = Clock;
|
||||
using duration = typename clock_type::duration;
|
||||
using executor_type = Executor;
|
||||
|
||||
explicit basic_timeout_timer(const executor_type& ex) : timer(ex) {}
|
||||
|
||||
basic_timeout_timer(const basic_timeout_timer&) = delete;
|
||||
basic_timeout_timer& operator=(const basic_timeout_timer&) = delete;
|
||||
|
||||
template <typename Stream>
|
||||
void expires_after(Stream& stream, duration dur) {
|
||||
timer.expires_after(dur);
|
||||
timer.async_wait(timeout_handler{&stream});
|
||||
}
|
||||
|
||||
void cancel() {
|
||||
timer.cancel();
|
||||
}
|
||||
|
||||
private:
|
||||
using Timer = boost::asio::basic_waitable_timer<clock_type,
|
||||
boost::asio::wait_traits<clock_type>, executor_type>;
|
||||
Timer timer;
|
||||
};
|
||||
|
||||
} // namespace rgw
|
Loading…
Reference in New Issue
Block a user