rgw: beast frontend uses async SharedMutex for pause
rgw: beast frontend uses async SharedMutex for pause

the strategy for pause relied on stopping the io_context and waiting for io_context.run() to return control to all of the worker threads. this relies on the fact that process_request() is completely synchronous (so considered a single unit of work in the io_context). otherwise, pause could complete in the middle of a call to process_request(), and destroy the RGWRados instance while it's still in use.

calling io_context.stop() to pause the worker threads also assumes that no other work will be scheduled on these threads.

to decouple pause from worker threads, handle_connection() now uses an async shared mutex to synchronize with pause/unpause.

Signed-off-by: Casey Bodley <cbodley@redhat.com>
parent 539c675db9
commit 378b01064c
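In other words, each call to process_request() now runs under a shared lock on pause_mutex, while pause() takes the exclusive lock, so a pause cannot complete while any request is still in flight, and new requests wait until unpause(). As a rough synchronous analogue of those semantics (illustrative code only, not part of this commit; the real frontend uses ceph::async::SharedMutex so that coroutines suspend instead of blocking a worker thread):

```cpp
#include <iostream>
#include <shared_mutex>
#include <thread>
#include <vector>

// Synchronous sketch of the pause strategy: each request holds a shared
// lock for its whole lifetime; pause() takes the exclusive lock, which
// only succeeds once every in-flight request has released its lock.
std::shared_mutex pause_mutex;

void process_request(int id) {
  // a request arriving while paused blocks here until unpause()
  std::shared_lock<std::shared_mutex> lock(pause_mutex);
  std::cout << "processing request " << id << "\n";
}

void pause() {
  pause_mutex.lock(); // waits for all outstanding requests to finish
  std::cout << "paused; safe to replace the RGWRados instance\n";
}

void unpause() {
  pause_mutex.unlock(); // blocked requests may now proceed
}

int main() {
  std::vector<std::thread> requests;
  for (int i = 0; i < 4; i++) {
    requests.emplace_back(process_request, i);
  }
  for (auto& t : requests) {
    t.join();
  }
  pause();
  unpause();
}
```

The async version in the diff below has the same shape: async_lock_shared(yield[ec]) suspends the coroutine instead of blocking, and pause_mutex.lock(ec) in AsioFrontend::pause() completes once the last shared holder goes away.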
@@ -2,14 +2,14 @@
 // vim: ts=8 sw=2 smarttab
 
 #include <atomic>
-#include <condition_variable>
-#include <mutex>
 #include <thread>
 #include <vector>
 
 #include <boost/asio.hpp>
 #include <boost/asio/spawn.hpp>
 
+#include "common/async/shared_mutex.h"
+
 #include "rgw_asio_client.h"
 #include "rgw_asio_frontend.h"
 
@@ -21,49 +21,6 @@
 
 namespace {
 
-class Pauser {
-  std::mutex mutex;
-  std::condition_variable cond_ready; // signaled on ready==true
-  std::condition_variable cond_paused; // signaled on waiters==thread_count
-  bool ready{false};
-  int waiters{0};
- public:
-  template <typename Func>
-  void pause(int thread_count, Func&& func);
-  void unpause();
-  void wait();
-};
-
-template <typename Func>
-void Pauser::pause(int thread_count, Func&& func)
-{
-  std::unique_lock<std::mutex> lock(mutex);
-  ready = false;
-  lock.unlock();
-
-  func();
-
-  // wait for all threads to pause
-  lock.lock();
-  cond_paused.wait(lock, [=] { return waiters == thread_count; });
-}
-
-void Pauser::unpause()
-{
-  std::lock_guard<std::mutex> lock(mutex);
-  ready = true;
-  cond_ready.notify_all();
-}
-
-void Pauser::wait()
-{
-  std::unique_lock<std::mutex> lock(mutex);
-  ++waiters;
-  cond_paused.notify_one(); // notify pause() that we're waiting
-  cond_ready.wait(lock, [this] { return ready; }); // wait for unpause()
-  --waiters;
-}
-
 using tcp = boost::asio::ip::tcp;
 namespace beast = boost::beast;
 #ifdef WITH_RADOSGW_BEAST_OPENSSL
@@ -115,9 +72,12 @@ class StreamIO : public rgw::asio::ClientIO {
   }
 };
 
+using SharedMutex = ceph::async::SharedMutex<boost::asio::io_context::executor_type>;
+
 template <typename Stream>
 void handle_connection(RGWProcessEnv& env, Stream& stream,
                        beast::flat_buffer& buffer, bool is_ssl,
+                       SharedMutex& pause_mutex,
                        boost::system::error_code& ec,
                        boost::asio::yield_context yield)
 {
@@ -159,22 +119,32 @@ void handle_connection(RGWProcessEnv& env, Stream& stream,
     return;
   }
 
-  // process the request
-  RGWRequest req{env.store->get_new_req_id()};
-
-  auto& socket = stream.lowest_layer();
-  StreamIO real_client{stream, parser, buffer, is_ssl,
-                       socket.local_endpoint(),
-                       socket.remote_endpoint()};
-
-  auto real_client_io = rgw::io::add_reordering(
-                          rgw::io::add_buffering(cct,
-                            rgw::io::add_chunking(
-                              rgw::io::add_conlen_controlling(
-                                &real_client))));
-  RGWRestfulIO client(cct, &real_client_io);
-  process_request(env.store, env.rest, &req, env.uri_prefix,
-                  *env.auth_registry, &client, env.olog);
+  {
+    auto lock = pause_mutex.async_lock_shared(yield[ec]);
+    if (ec == boost::asio::error::operation_aborted) {
+      return;
+    } else if (ec) {
+      ldout(cct, 1) << "failed to lock: " << ec.message() << dendl;
+      return;
+    }
+
+    // process the request
+    RGWRequest req{env.store->get_new_req_id()};
+
+    auto& socket = stream.lowest_layer();
+    StreamIO real_client{stream, parser, buffer, is_ssl,
+                         socket.local_endpoint(),
+                         socket.remote_endpoint()};
+
+    auto real_client_io = rgw::io::add_reordering(
+                            rgw::io::add_buffering(cct,
+                              rgw::io::add_chunking(
+                                rgw::io::add_conlen_controlling(
+                                  &real_client))));
+    RGWRestfulIO client(cct, &real_client_io);
+    process_request(env.store, env.rest, &req, env.uri_prefix,
+                    *env.auth_registry, &client, env.olog);
+  }
 
   if (!parser.keep_alive()) {
     return;
@@ -205,11 +175,12 @@ void handle_connection(RGWProcessEnv& env, Stream& stream,
 class AsioFrontend {
   RGWProcessEnv env;
   RGWFrontendConfig* conf;
-  boost::asio::io_service service;
+  boost::asio::io_context context;
 #ifdef WITH_RADOSGW_BEAST_OPENSSL
   boost::optional<ssl::context> ssl_context;
   int init_ssl();
 #endif
+  SharedMutex pause_mutex;
 
   struct Listener {
     tcp::endpoint endpoint;
@@ -217,13 +188,16 @@ class AsioFrontend {
     tcp::socket socket;
     bool use_ssl = false;
 
-    explicit Listener(boost::asio::io_service& service)
-      : acceptor(service), socket(service) {}
+    explicit Listener(boost::asio::io_context& context)
+      : acceptor(context), socket(context) {}
   };
   std::vector<Listener> listeners;
 
+  // work guard to keep run() threads busy while listeners are paused
+  using Executor = boost::asio::io_context::executor_type;
+  std::optional<boost::asio::executor_work_guard<Executor>> work;
+
   std::vector<std::thread> threads;
-  Pauser pauser;
   std::atomic<bool> going_down{false};
 
   CephContext* ctx() const { return env.store->ctx(); }
@@ -232,7 +206,9 @@ class AsioFrontend {
 
  public:
   AsioFrontend(const RGWProcessEnv& env, RGWFrontendConfig* conf)
-    : env(env), conf(conf) {}
+    : env(env), conf(conf),
+      pause_mutex(context.get_executor())
+  {}
 
   int init();
   int run();
@@ -293,7 +269,7 @@ int AsioFrontend::init()
       lderr(ctx()) << "failed to parse port=" << i->second << dendl;
       return -ec.value();
     }
-    listeners.emplace_back(service);
+    listeners.emplace_back(context);
     listeners.back().endpoint.port(port);
   }
 
@@ -304,7 +280,7 @@ int AsioFrontend::init()
       lderr(ctx()) << "failed to parse endpoint=" << i->second << dendl;
       return -ec.value();
     }
-    listeners.emplace_back(service);
+    listeners.emplace_back(context);
     listeners.back().endpoint = endpoint;
   }
 
@@ -391,7 +367,7 @@ int AsioFrontend::init_ssl()
       lderr(ctx()) << "failed to parse ssl_port=" << i->second << dendl;
       return -ec.value();
     }
-    listeners.emplace_back(service);
+    listeners.emplace_back(context);
     listeners.back().endpoint.port(port);
     listeners.back().use_ssl = true;
   }
@@ -407,7 +383,7 @@ int AsioFrontend::init_ssl()
       lderr(ctx()) << "failed to parse ssl_endpoint=" << i->second << dendl;
       return -ec.value();
     }
-    listeners.emplace_back(service);
+    listeners.emplace_back(context);
     listeners.back().endpoint = endpoint;
     listeners.back().use_ssl = true;
   }
@@ -433,7 +409,7 @@ void AsioFrontend::accept(Listener& l, boost::system::error_code ec)
   // spawn a coroutine to handle the connection
#ifdef WITH_RADOSGW_BEAST_OPENSSL
   if (l.use_ssl) {
-    boost::asio::spawn(service,
+    boost::asio::spawn(context,
       [this, s=std::move(socket)] (boost::asio::yield_context yield) mutable {
         // wrap the socket in an ssl stream
         ssl::stream<tcp::socket&> stream{s, *ssl_context};
@@ -447,7 +423,7 @@ void AsioFrontend::accept(Listener& l, boost::system::error_code ec)
           return;
         }
         buffer.consume(bytes);
-        handle_connection(env, stream, buffer, true, ec, yield);
+        handle_connection(env, stream, buffer, true, pause_mutex, ec, yield);
         if (!ec) {
           // ssl shutdown (ignoring errors)
           stream.async_shutdown(yield[ec]);
@@ -458,11 +434,11 @@ void AsioFrontend::accept(Listener& l, boost::system::error_code ec)
#else
   {
#endif // WITH_RADOSGW_BEAST_OPENSSL
-    boost::asio::spawn(service,
+    boost::asio::spawn(context,
       [this, s=std::move(socket)] (boost::asio::yield_context yield) mutable {
         beast::flat_buffer buffer;
         boost::system::error_code ec;
-        handle_connection(env, s, buffer, false, ec, yield);
+        handle_connection(env, s, buffer, false, pause_mutex, ec, yield);
         s.shutdown(tcp::socket::shutdown_both, ec);
       });
   }
@@ -476,15 +452,14 @@ int AsioFrontend::run()
 
   ldout(cct, 4) << "frontend spawning " << thread_count << " threads" << dendl;
 
+  // the worker threads call io_context::run(), which will return when there's
+  // no work left. hold a work guard to keep these threads going until join()
+  work.emplace(boost::asio::make_work_guard(context));
+
   for (int i = 0; i < thread_count; i++) {
     threads.emplace_back([=] {
-      for (;;) {
-        service.run();
-        if (going_down) {
-          break;
-        }
-        pauser.wait();
-      }
+      boost::system::error_code ec;
+      context.run(ec);
     });
   }
   return 0;
@@ -503,7 +478,7 @@ void AsioFrontend::stop()
   }
 
   // unblock the run() threads
-  service.stop();
+  context.stop(); // XXX: kill connections instead
 }
 
 void AsioFrontend::join()
@@ -511,6 +486,8 @@ void AsioFrontend::join()
   if (!going_down) {
     stop();
   }
+  work.reset();
+
   ldout(ctx(), 4) << "frontend joining threads..." << dendl;
   for (auto& thread : threads) {
     thread.join();
@@ -520,12 +497,22 @@ void AsioFrontend::join()
 
 void AsioFrontend::pause()
 {
-  ldout(ctx(), 4) << "frontend pausing threads..." << dendl;
-  pauser.pause(threads.size(), [=] {
-    // unblock the run() threads
-    service.stop();
-  });
-  ldout(ctx(), 4) << "frontend paused" << dendl;
+  ldout(ctx(), 4) << "frontend pausing connections..." << dendl;
+
+  // cancel pending calls to accept(), but don't close the sockets
+  boost::system::error_code ec;
+  for (auto& l : listeners) {
+    l.acceptor.cancel(ec);
+  }
+
+  // pause and wait for outstanding requests to complete
+  pause_mutex.lock(ec);
+
+  if (ec) {
+    ldout(ctx(), 1) << "frontend failed to pause: " << ec.message() << dendl;
+  } else {
+    ldout(ctx(), 4) << "frontend paused" << dendl;
+  }
 }
 
 void AsioFrontend::unpause(RGWRados* const store,
@@ -533,9 +520,19 @@ void AsioFrontend::unpause(RGWRados* const store,
 {
   env.store = store;
   env.auth_registry = std::move(auth_registry);
 
+  // unpause to unblock connections
+  pause_mutex.unlock();
+
+  // start accepting connections again
+  for (auto& l : listeners) {
+    l.acceptor.async_accept(l.socket,
+      [this, &l] (boost::system::error_code ec) {
+        accept(l, ec);
+      });
+  }
+
   ldout(ctx(), 4) << "frontend unpaused" << dendl;
-  service.reset();
-  pauser.unpause();
 }
 
 } // anonymous namespace
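A side effect of this change shows up in run(): instead of bouncing worker threads in and out of io_context::run() with stop()/reset(), each thread now makes a single call to context.run(), kept alive by an executor work guard while the listeners are paused and the handler queue is empty. A minimal standalone sketch of that work-guard pattern (assuming only Boost.Asio; the names mirror the diff, but the program itself is illustrative):

```cpp
#include <boost/asio.hpp>
#include <iostream>
#include <optional>
#include <thread>

int main() {
  boost::asio::io_context context;

  // without a guard, run() returns as soon as the handler queue drains,
  // which is exactly the state a paused frontend is in. the guard keeps
  // run() blocked until shutdown resets it.
  using Executor = boost::asio::io_context::executor_type;
  std::optional<boost::asio::executor_work_guard<Executor>> work;
  work.emplace(boost::asio::make_work_guard(context));

  std::thread worker([&] {
    context.run(); // stays blocked while the guard is alive
    std::cout << "run() returned\n";
  });

  boost::asio::post(context, [] { std::cout << "handler ran\n"; });

  // on shutdown (cf. AsioFrontend::join()): drop the guard so run()
  // can return once the context is idle
  work.reset();
  worker.join();
}
```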