rgw: add BlockingAioThrottle

Signed-off-by: Casey Bodley <cbodley@redhat.com>
This commit is contained in:
Casey Bodley 2018-10-10 08:03:55 -04:00
parent 3d2d0892cc
commit 346b924cb2
7 changed files with 47 additions and 46 deletions

View File

@ -20,7 +20,7 @@
namespace rgw {
bool AioThrottle::waiter_ready() const
bool Throttle::waiter_ready() const
{
switch (waiter) {
case Wait::Available: return is_available();
@ -30,9 +30,9 @@ bool AioThrottle::waiter_ready() const
}
}
AioResultList AioThrottle::get(const RGWSI_RADOS::Obj& obj,
OpFunc&& f,
uint64_t cost, uint64_t id)
AioResultList BlockingAioThrottle::get(const RGWSI_RADOS::Obj& obj,
OpFunc&& f,
uint64_t cost, uint64_t id)
{
auto p = std::make_unique<Pending>();
p->obj = obj;
@ -64,7 +64,7 @@ AioResultList AioThrottle::get(const RGWSI_RADOS::Obj& obj,
return std::move(completed);
}
void AioThrottle::put(AioResult& r)
void BlockingAioThrottle::put(AioResult& r)
{
auto& p = static_cast<Pending&>(r);
std::scoped_lock lock{mutex};
@ -80,13 +80,13 @@ void AioThrottle::put(AioResult& r)
}
}
AioResultList AioThrottle::poll()
AioResultList BlockingAioThrottle::poll()
{
std::unique_lock lock{mutex};
return std::move(completed);
}
AioResultList AioThrottle::wait()
AioResultList BlockingAioThrottle::wait()
{
std::unique_lock lock{mutex};
if (completed.empty() && !pending.empty()) {
@ -98,7 +98,7 @@ AioResultList AioThrottle::wait()
return std::move(completed);
}
AioResultList AioThrottle::drain()
AioResultList BlockingAioThrottle::drain()
{
std::unique_lock lock{mutex};
if (!pending.empty()) {

View File

@ -23,53 +23,57 @@
namespace rgw {
// a throttle for aio operations that enforces a maximum window on outstanding
// bytes. only supports a single waiter, so all public functions must be called
// from the same thread
class AioThrottle : public Aio {
class Throttle {
protected:
const uint64_t window;
uint64_t pending_size = 0;
AioResultList pending;
AioResultList completed;
bool is_available() const { return pending_size <= window; }
bool has_completion() const { return !completed.empty(); }
bool is_drained() const { return pending.empty(); }
struct Pending : AioResultEntry {
AioThrottle *parent = nullptr;
uint64_t cost = 0;
};
OwningList<Pending> pending;
AioResultList completed;
enum class Wait { None, Available, Completion, Drained };
Wait waiter = Wait::None;
bool waiter_ready() const;
ceph::mutex mutex = ceph::make_mutex("AioThrottle");
ceph::condition_variable cond;
public:
AioThrottle(uint64_t window) : window(window) {}
Throttle(uint64_t window) : window(window) {}
virtual ~AioThrottle() {
~Throttle() {
// must drain before destructing
ceph_assert(pending.empty());
ceph_assert(completed.empty());
}
};
AioResultList get(const RGWSI_RADOS::Obj& obj,
OpFunc&& f,
uint64_t cost, uint64_t id) override;
void put(AioResult& r) override;
// a throttle for aio operations. all public functions must be called from
// the same thread
class BlockingAioThrottle final : public Aio, private Throttle {
ceph::mutex mutex = ceph::make_mutex("AioThrottle");
ceph::condition_variable cond;
struct Pending : AioResultEntry {
BlockingAioThrottle *parent = nullptr;
uint64_t cost = 0;
librados::AioCompletion *completion = nullptr;
};
public:
BlockingAioThrottle(uint64_t window) : Throttle(window) {}
AioResultList poll() override;
AioResultList get(const RGWSI_RADOS::Obj& obj, OpFunc&& f,
uint64_t cost, uint64_t id) override final;
AioResultList wait() override;
void put(AioResult& r) override final;
AioResultList drain() override;
AioResultList poll() override final;
AioResultList wait() override final;
AioResultList drain() override final;
};
} // namespace rgw

View File

@ -2351,7 +2351,7 @@ public:
const std::string& bucket_name;
const std::string& obj_name;
RGWFileHandle* rgw_fh;
std::optional<rgw::AioThrottle> aio;
std::optional<rgw::BlockingAioThrottle> aio;
std::optional<rgw::putobj::AtomicObjectProcessor> processor;
rgw::putobj::DataProcessor* filter;
boost::optional<RGWPutObj_Compress> compressor;

View File

@ -3636,7 +3636,7 @@ void RGWPutObj::execute()
}
// create the object processor
rgw::AioThrottle aio(store->ctx()->_conf->rgw_put_obj_min_window_size);
rgw::BlockingAioThrottle aio(store->ctx()->_conf->rgw_put_obj_min_window_size);
using namespace rgw::putobj;
constexpr auto max_processor_size = std::max({sizeof(MultipartObjectProcessor),
sizeof(AtomicObjectProcessor),
@ -3999,7 +3999,7 @@ void RGWPostObj::execute()
store->gen_rand_obj_instance_name(&obj);
}
rgw::AioThrottle aio(s->cct->_conf->rgw_put_obj_min_window_size);
rgw::BlockingAioThrottle aio(s->cct->_conf->rgw_put_obj_min_window_size);
using namespace rgw::putobj;
AtomicObjectProcessor processor(&aio, store, s->bucket_info,
@ -6745,10 +6745,9 @@ int RGWBulkUploadOp::handle_file(const boost::string_ref path,
rgw_placement_rule dest_placement = s->dest_placement;
dest_placement.inherit_from(binfo.placement_rule);
rgw::AioThrottle aio(store->ctx()->_conf->rgw_put_obj_min_window_size);
rgw::BlockingAioThrottle aio(store->ctx()->_conf->rgw_put_obj_min_window_size);
using namespace rgw::putobj;
AtomicObjectProcessor processor(&aio, store, binfo, &s->dest_placement, bowner.get_id(),
obj_ctx, obj, 0, s->req_id, this);

View File

@ -4278,7 +4278,7 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
set_mtime_weight.high_precision = high_precision_time;
int ret;
rgw::AioThrottle aio(cct->_conf->rgw_put_obj_min_window_size);
rgw::BlockingAioThrottle aio(cct->_conf->rgw_put_obj_min_window_size);
using namespace rgw::putobj;
const rgw_placement_rule *ptail_rule = (dest_placement_rule ? &(*dest_placement_rule) : nullptr);
AtomicObjectProcessor processor(&aio, this, dest_bucket_info, ptail_rule, user_id,
@ -4856,7 +4856,7 @@ int RGWRados::copy_obj_data(RGWObjectCtx& obj_ctx,
string tag;
append_rand_alpha(cct, tag, tag, 32);
rgw::AioThrottle aio(cct->_conf->rgw_put_obj_min_window_size);
rgw::BlockingAioThrottle aio(cct->_conf->rgw_put_obj_min_window_size);
using namespace rgw::putobj;
AtomicObjectProcessor processor(&aio, this, dest_bucket_info, &dest_placement,
dest_bucket_info.owner, obj_ctx,
@ -6826,7 +6826,7 @@ int RGWRados::Object::Read::iterate(int64_t ofs, int64_t end, RGWGetDataCB *cb)
const uint64_t chunk_size = cct->_conf->rgw_get_obj_max_req_size;
const uint64_t window_size = cct->_conf->rgw_get_obj_window_size;
rgw::AioThrottle aio(window_size);
rgw::BlockingAioThrottle aio(window_size);
get_obj_data data(store, cb, &aio, ofs);
int r = store->iterate_obj(obj_ctx, source->get_bucket_info(), state.obj,

View File

@ -429,8 +429,7 @@ int RGWDataAccess::Object::put(bufferlist& data,
RGWBucketInfo& bucket_info = bucket->bucket_info;
using namespace rgw::putobj;
rgw::AioThrottle aio(store->ctx()->_conf->rgw_put_obj_min_window_size);
rgw::BlockingAioThrottle aio(store->ctx()->_conf->rgw_put_obj_min_window_size);
RGWObjectCtx obj_ctx(store);
rgw_obj obj(bucket_info.bucket, key);
@ -439,6 +438,7 @@ int RGWDataAccess::Object::put(bufferlist& data,
string req_id = store->svc.zone_utils->unique_id(store->get_new_req_id());
using namespace rgw::putobj;
AtomicObjectProcessor processor(&aio, store, bucket_info,
nullptr,
owner.get_id(),
@ -448,8 +448,6 @@ int RGWDataAccess::Object::put(bufferlist& data,
if (ret < 0)
return ret;
using namespace rgw::putobj;
DataProcessor *filter = &processor;
CompressorRef plugin;

View File

@ -57,7 +57,7 @@ namespace rgw {
TEST_F(Aio_Throttle, NoThrottleUpToMax)
{
AioThrottle throttle(4);
BlockingAioThrottle throttle(4);
auto obj = make_obj(__PRETTY_FUNCTION__);
{
librados::ObjectWriteOperation op1;
@ -84,7 +84,7 @@ TEST_F(Aio_Throttle, NoThrottleUpToMax)
TEST_F(Aio_Throttle, CostOverWindow)
{
AioThrottle throttle(4);
BlockingAioThrottle throttle(4);
auto obj = make_obj(__PRETTY_FUNCTION__);
librados::ObjectWriteOperation op;
@ -96,7 +96,7 @@ TEST_F(Aio_Throttle, CostOverWindow)
TEST_F(Aio_Throttle, ThrottleOverMax)
{
constexpr uint64_t window = 4;
AioThrottle throttle(window);
BlockingAioThrottle throttle(window);
auto obj = make_obj(__PRETTY_FUNCTION__);