Merge pull request #50347 from cbodley/wip-rgw-sal-aio

rgw/aio: remove RGWSI_RADOS from generic Aio::get()

Reviewed-by: Adam C. Emerson <aemerson@redhat.com>
This commit is contained in:
Casey Bodley 2023-03-14 09:40:45 -04:00 committed by GitHub
commit a22b26eade
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 86 additions and 116 deletions

View File

@ -12,5 +12,4 @@ tasks:
- cls/test_cls_2pc_queue.sh
- rgw/test_rgw_gc_log.sh
- rgw/test_rgw_obj.sh
- rgw/test_rgw_throttle.sh
- rgw/test_librgw_file.sh

View File

@ -1,5 +0,0 @@
#!/bin/sh -e
ceph_test_rgw_throttle
exit 0

View File

@ -207,7 +207,8 @@ int D3nRGWDataCache<T>::get_obj_iterate_cb(const DoutPrefixProvider *dpp, const
const uint64_t cost = len;
const uint64_t id = obj_ofs; // use logical object offset for sorting replies
auto completed = d->aio->get(obj, rgw::Aio::librados_op(std::move(op), d->yield), cost, id);
auto& ref = obj.get_ref();
auto completed = d->aio->get(ref.obj, rgw::Aio::librados_op(ref.pool.ioctx(), std::move(op), d->yield), cost, id);
return d->flush(std::move(completed));
} else {
ldpp_dout(dpp, 20) << "D3nDataCache::" << __func__ << "(): oid=" << read_obj.oid << ", is_head_obj=" << is_head_obj << ", obj-ofs=" << obj_ofs << ", read_ofs=" << read_ofs << ", len=" << len << dendl;
@ -225,13 +226,14 @@ int D3nRGWDataCache<T>::get_obj_iterate_cb(const DoutPrefixProvider *dpp, const
lsubdout(g_ceph_context, rgw, 0) << "D3nDataCache: Error: failed to open rados context for " << read_obj << ", r=" << r << dendl;
return r;
}
auto& ref = obj.get_ref();
const bool is_compressed = (astate->attrset.find(RGW_ATTR_COMPRESSION) != astate->attrset.end());
const bool is_encrypted = (astate->attrset.find(RGW_ATTR_CRYPT_MODE) != astate->attrset.end());
if (read_ofs != 0 || astate->size != astate->accounted_size || is_compressed || is_encrypted) {
d->d3n_bypass_cache_write = true;
lsubdout(g_ceph_context, rgw, 5) << "D3nDataCache: " << __func__ << "(): Note - bypassing datacache: oid=" << read_obj.oid << ", read_ofs!=0 = " << read_ofs << ", size=" << astate->size << " != accounted_size=" << astate->accounted_size << ", is_compressed=" << is_compressed << ", is_encrypted=" << is_encrypted << dendl;
auto completed = d->aio->get(obj, rgw::Aio::librados_op(std::move(op), d->yield), cost, id);
auto completed = d->aio->get(ref.obj, rgw::Aio::librados_op(ref.pool.ioctx(), std::move(op), d->yield), cost, id);
r = d->flush(std::move(completed));
return r;
}
@ -239,7 +241,7 @@ int D3nRGWDataCache<T>::get_obj_iterate_cb(const DoutPrefixProvider *dpp, const
if (d->rgwrados->d3n_data_cache->get(oid, len)) {
// Read From Cache
ldpp_dout(dpp, 20) << "D3nDataCache: " << __func__ << "(): READ FROM CACHE: oid=" << read_obj.oid << ", obj-ofs=" << obj_ofs << ", read_ofs=" << read_ofs << ", len=" << len << dendl;
auto completed = d->aio->get(obj, rgw::Aio::d3n_cache_op(dpp, d->yield, read_ofs, len, d->rgwrados->d3n_data_cache->cache_location), cost, id);
auto completed = d->aio->get(ref.obj, rgw::Aio::d3n_cache_op(dpp, d->yield, read_ofs, len, d->rgwrados->d3n_data_cache->cache_location), cost, id);
r = d->flush(std::move(completed));
if (r < 0) {
lsubdout(g_ceph_context, rgw, 0) << "D3nDataCache: " << __func__ << "(): Error: failed to drain/flush, r= " << r << dendl;
@ -248,7 +250,7 @@ int D3nRGWDataCache<T>::get_obj_iterate_cb(const DoutPrefixProvider *dpp, const
} else {
// Write To Cache
ldpp_dout(dpp, 20) << "D3nDataCache: " << __func__ << "(): WRITE TO CACHE: oid=" << read_obj.oid << ", obj-ofs=" << obj_ofs << ", read_ofs=" << read_ofs << " len=" << len << dendl;
auto completed = d->aio->get(obj, rgw::Aio::librados_op(std::move(op), d->yield), cost, id);
auto completed = d->aio->get(ref.obj, rgw::Aio::librados_op(ref.pool.ioctx(), std::move(op), d->yield), cost, id);
return d->flush(std::move(completed));
}
}

View File

@ -70,7 +70,7 @@ static int process_completed(const AioResultList& completed, RawObjSet *written)
std::optional<int> error;
for (auto& r : completed) {
if (r.result >= 0) {
written->insert(r.obj.get_ref().obj);
written->insert(r.obj);
} else if (!error) { // record first error code
error = r.result;
}
@ -110,7 +110,8 @@ int RadosWriter::process(bufferlist&& bl, uint64_t offset)
op.write(offset, data);
}
constexpr uint64_t id = 0; // unused
auto c = aio->get(stripe_obj, Aio::librados_op(std::move(op), y), cost, id);
auto& ref = stripe_obj.get_ref();
auto c = aio->get(ref.obj, Aio::librados_op(ref.pool.ioctx(), std::move(op), y), cost, id);
return process_completed(c, &written);
}
@ -124,7 +125,8 @@ int RadosWriter::write_exclusive(const bufferlist& data)
op.write_full(data);
constexpr uint64_t id = 0; // unused
auto c = aio->get(stripe_obj, Aio::librados_op(std::move(op), y), cost, id);
auto& ref = stripe_obj.get_ref();
auto c = aio->get(ref.obj, Aio::librados_op(ref.pool.ioctx(), std::move(op), y), cost, id);
auto d = aio->drain();
c.splice(c.end(), d);
return process_completed(c, &written);

View File

@ -4506,7 +4506,8 @@ int RGWRados::copy_obj(RGWObjectCtx& obj_ctx,
static constexpr uint64_t cost = 1; // 1 throttle unit per request
static constexpr uint64_t id = 0; // ids unused
rgw::AioResultList completed = aio->get(obj, rgw::Aio::librados_op(std::move(op), y), cost, id);
auto& ref = obj.get_ref();
rgw::AioResultList completed = aio->get(ref.obj, rgw::Aio::librados_op(ref.pool.ioctx(), std::move(op), y), cost, id);
ret = rgw::check_for_errors(completed);
all_results.splice(all_results.end(), completed);
if (ret < 0) {
@ -4573,12 +4574,19 @@ done_ret:
if (r.result < 0) {
continue; // skip errors
}
auto obj = svc.rados->obj(r.obj);
ret2 = obj.open(dpp);
if (ret2 < 0) {
continue;
}
auto& ref = obj.get_ref();
ObjectWriteOperation op;
cls_refcount_put(op, ref_tag, true);
static constexpr uint64_t cost = 1; // 1 throttle unit per request
static constexpr uint64_t id = 0; // ids unused
rgw::AioResultList completed = aio->get(r.obj, rgw::Aio::librados_op(std::move(op), y), cost, id);
rgw::AioResultList completed = aio->get(ref.obj, rgw::Aio::librados_op(ref.pool.ioctx(), std::move(op), y), cost, id);
ret2 = rgw::check_for_errors(completed);
if (ret2 < 0) {
ldpp_dout(dpp, 0) << "ERROR: cleanup after error failed to drop reference on obj=" << r.obj << dendl;
@ -4611,14 +4619,11 @@ int RGWRados::copy_obj_data(RGWObjectCtx& obj_ctx,
string tag;
append_rand_alpha(cct, tag, tag, 32);
rgw::BlockingAioThrottle aio(cct->_conf->rgw_put_obj_min_window_size);
auto aio = rgw::make_throttle(cct->_conf->rgw_put_obj_min_window_size, y);
using namespace rgw::putobj;
// do not change the null_yield in the initialization of this AtomicObjectProcessor
// it causes crashes in the ragweed tests
AtomicObjectProcessor processor(&aio, this, dest_bucket_info, &dest_placement,
dest_bucket_info.owner, obj_ctx,
dest_obj, olh_epoch, tag,
dpp, null_yield);
AtomicObjectProcessor processor(aio.get(), this, dest_bucket_info,
&dest_placement, dest_bucket_info.owner,
obj_ctx, dest_obj, olh_epoch, tag, dpp, y);
int ret = processor.prepare(y);
if (ret < 0)
return ret;
@ -6527,7 +6532,7 @@ int get_obj_data::flush(rgw::AioResultList&& results) {
if (rgwrados->get_use_datacache()) {
const std::lock_guard l(d3n_get_data.d3n_lock);
auto oid = completed.front().obj.get_ref().obj.oid;
auto oid = completed.front().obj.oid;
if (bl.length() <= g_conf()->rgw_get_obj_max_req_size && !d3n_bypass_cache_write) {
lsubdout(g_ceph_context, rgw_datacache, 10) << "D3nDataCache: " << __func__ << "(): bl.length <= rgw_get_obj_max_req_size (default 4MB) - write to datacache, bl.length=" << bl.length() << dendl;
rgwrados->d3n_data_cache->put(bl, bl.length(), oid);
@ -6595,7 +6600,8 @@ int RGWRados::get_obj_iterate_cb(const DoutPrefixProvider *dpp,
const uint64_t cost = len;
const uint64_t id = obj_ofs; // use logical object offset for sorting replies
auto completed = d->aio->get(obj, rgw::Aio::librados_op(std::move(op), d->yield), cost, id);
auto& ref = obj.get_ref();
auto completed = d->aio->get(ref.obj, rgw::Aio::librados_op(ref.pool.ioctx(), std::move(op), d->yield), cost, id);
return d->flush(std::move(completed));
}

View File

@ -28,42 +28,49 @@ void cb(librados::completion_t, void* arg);
struct state {
Aio* aio;
librados::IoCtx ctx;
librados::AioCompletion* c;
state(Aio* aio, AioResult& r)
: aio(aio),
state(Aio* aio, librados::IoCtx ctx, AioResult& r)
: aio(aio), ctx(std::move(ctx)),
c(librados::Rados::aio_create_completion(&r, &cb)) {}
};
void cb(librados::completion_t, void* arg) {
static_assert(sizeof(AioResult::user_data) >= sizeof(state));
static_assert(std::is_trivially_destructible_v<state>);
auto& r = *(static_cast<AioResult*>(arg));
auto s = reinterpret_cast<state*>(&r.user_data);
r.result = s->c->get_return_value();
s->c->release();
s->aio->put(r);
Aio* aio = s->aio;
// manually destroy the state that was constructed with placement new
s->~state();
aio->put(r);
}
template <typename Op>
Aio::OpFunc aio_abstract(Op&& op) {
return [op = std::move(op)] (Aio* aio, AioResult& r) mutable {
Aio::OpFunc aio_abstract(librados::IoCtx ctx, Op&& op) {
return [ctx = std::move(ctx), op = std::move(op)] (Aio* aio, AioResult& r) mutable {
constexpr bool read = std::is_same_v<std::decay_t<Op>, librados::ObjectReadOperation>;
auto s = new (&r.user_data) state(aio, r);
// use placement new to construct the rados state inside of user_data
auto s = new (&r.user_data) state(aio, ctx, r);
if constexpr (read) {
r.result = r.obj.aio_operate(s->c, &op, &r.data);
r.result = ctx.aio_operate(r.obj.oid, s->c, &op, &r.data);
} else {
r.result = r.obj.aio_operate(s->c, &op);
r.result = ctx.aio_operate(r.obj.oid, s->c, &op);
}
if (r.result < 0) {
// cb() won't be called, so release everything here
s->c->release();
aio->put(r);
s->~state();
}
};
}
struct Handler {
Aio* throttle = nullptr;
librados::IoCtx ctx;
AioResult& r;
// write callback
void operator()(boost::system::error_code ec) const {
@ -79,18 +86,18 @@ struct Handler {
};
template <typename Op>
Aio::OpFunc aio_abstract(Op&& op, boost::asio::io_context& context,
Aio::OpFunc aio_abstract(librados::IoCtx ctx, Op&& op,
boost::asio::io_context& context,
yield_context yield) {
return [op = std::move(op), &context, yield] (Aio* aio, AioResult& r) mutable {
return [ctx = std::move(ctx), op = std::move(op), &context, yield] (Aio* aio, AioResult& r) mutable {
// arrange for the completion Handler to run on the yield_context's strand
// executor so it can safely call back into Aio without locking
using namespace boost::asio;
async_completion<yield_context, void()> init(yield);
auto ex = get_associated_executor(init.completion_handler);
auto& ref = r.obj.get_ref();
librados::async_operate(context, ref.pool.ioctx(), ref.obj.oid, &op, 0,
bind_executor(ex, Handler{aio, r}));
librados::async_operate(context, ctx, r.obj.oid, &op, 0,
bind_executor(ex, Handler{aio, ctx, r}));
};
}
@ -99,35 +106,36 @@ Aio::OpFunc d3n_cache_aio_abstract(const DoutPrefixProvider *dpp, optional_yield
return [dpp, y, read_ofs, read_len, location] (Aio* aio, AioResult& r) mutable {
// d3n data cache requires yield context (rgw_beast_enable_async=true)
ceph_assert(y);
auto& ref = r.obj.get_ref();
auto c = std::make_unique<D3nL1CacheRequest>();
lsubdout(g_ceph_context, rgw_datacache, 20) << "D3nDataCache: d3n_cache_aio_abstract(): libaio Read From Cache, oid=" << ref.obj.oid << dendl;
lsubdout(g_ceph_context, rgw_datacache, 20) << "D3nDataCache: d3n_cache_aio_abstract(): libaio Read From Cache, oid=" << r.obj.oid << dendl;
c->file_aio_read_abstract(dpp, y.get_io_context(), y.get_yield_context(), location, read_ofs, read_len, aio, r);
};
}
template <typename Op>
Aio::OpFunc aio_abstract(Op&& op, optional_yield y) {
Aio::OpFunc aio_abstract(librados::IoCtx ctx, Op&& op, optional_yield y) {
static_assert(std::is_base_of_v<librados::ObjectOperation, std::decay_t<Op>>);
static_assert(!std::is_lvalue_reference_v<Op>);
static_assert(!std::is_const_v<Op>);
if (y) {
return aio_abstract(std::forward<Op>(op), y.get_io_context(),
y.get_yield_context());
return aio_abstract(std::move(ctx), std::forward<Op>(op),
y.get_io_context(), y.get_yield_context());
}
return aio_abstract(std::forward<Op>(op));
return aio_abstract(std::move(ctx), std::forward<Op>(op));
}
} // anonymous namespace
Aio::OpFunc Aio::librados_op(librados::ObjectReadOperation&& op,
Aio::OpFunc Aio::librados_op(librados::IoCtx ctx,
librados::ObjectReadOperation&& op,
optional_yield y) {
return aio_abstract(std::move(op), y);
return aio_abstract(std::move(ctx), std::move(op), y);
}
Aio::OpFunc Aio::librados_op(librados::ObjectWriteOperation&& op,
Aio::OpFunc Aio::librados_op(librados::IoCtx ctx,
librados::ObjectWriteOperation&& op,
optional_yield y) {
return aio_abstract(std::move(op), y);
return aio_abstract(std::move(ctx), std::move(op), y);
}
Aio::OpFunc Aio::d3n_cache_op(const DoutPrefixProvider *dpp, optional_yield y,

View File

@ -23,8 +23,6 @@
#include "include/rados/librados_fwd.hpp"
#include "common/async/yield_context.h"
#include "services/svc_rados.h" // cant forward declare RGWSI_RADOS::Obj
#include "rgw_common.h"
#include "include/function2.hpp"
@ -34,7 +32,7 @@ struct D3nGetObjData;
namespace rgw {
struct AioResult {
RGWSI_RADOS::Obj obj;
rgw_raw_obj obj;
uint64_t id = 0; // id allows caller to associate a result with its request
bufferlist data; // result buffer for reads
int result = 0;
@ -79,7 +77,7 @@ class Aio {
virtual ~Aio() {}
virtual AioResultList get(const RGWSI_RADOS::Obj& obj,
virtual AioResultList get(rgw_raw_obj obj,
OpFunc&& f,
uint64_t cost, uint64_t id) = 0;
virtual void put(AioResult& r) = 0;
@ -93,9 +91,11 @@ class Aio {
// wait for all outstanding completions and return their results
virtual AioResultList drain() = 0;
static OpFunc librados_op(librados::ObjectReadOperation&& op,
static OpFunc librados_op(librados::IoCtx ctx,
librados::ObjectReadOperation&& op,
optional_yield y);
static OpFunc librados_op(librados::ObjectWriteOperation&& op,
static OpFunc librados_op(librados::IoCtx ctx,
librados::ObjectWriteOperation&& op,
optional_yield y);
static OpFunc d3n_cache_op(const DoutPrefixProvider *dpp, optional_yield y,
off_t read_ofs, off_t read_len, std::string& location);

View File

@ -13,8 +13,6 @@
*
*/
#include "include/rados/librados.hpp"
#include "rgw_aio_throttle.h"
namespace rgw {
@ -29,12 +27,12 @@ bool Throttle::waiter_ready() const
}
}
AioResultList BlockingAioThrottle::get(const RGWSI_RADOS::Obj& obj,
AioResultList BlockingAioThrottle::get(rgw_raw_obj obj,
OpFunc&& f,
uint64_t cost, uint64_t id)
{
auto p = std::make_unique<Pending>();
p->obj = obj;
p->obj = std::move(obj);
p->id = id;
p->cost = cost;
@ -120,12 +118,12 @@ auto YieldingAioThrottle::async_wait(CompletionToken&& token)
return init.result.get();
}
AioResultList YieldingAioThrottle::get(const RGWSI_RADOS::Obj& obj,
AioResultList YieldingAioThrottle::get(rgw_raw_obj obj,
OpFunc&& f,
uint64_t cost, uint64_t id)
{
auto p = std::make_unique<Pending>();
p->obj = obj;
p->obj = std::move(obj);
p->id = id;
p->cost = cost;

View File

@ -15,12 +15,10 @@
#pragma once
#include "include/rados/librados_fwd.hpp"
#include <memory>
#include "common/ceph_mutex.h"
#include "common/async/completion.h"
#include "common/async/yield_context.h"
#include "services/svc_rados.h"
#include "rgw_aio.h"
namespace rgw {
@ -61,14 +59,13 @@ class BlockingAioThrottle final : public Aio, private Throttle {
struct Pending : AioResultEntry {
BlockingAioThrottle *parent = nullptr;
uint64_t cost = 0;
librados::AioCompletion *completion = nullptr;
};
public:
BlockingAioThrottle(uint64_t window) : Throttle(window) {}
virtual ~BlockingAioThrottle() override {};
AioResultList get(const RGWSI_RADOS::Obj& obj, OpFunc&& f,
AioResultList get(rgw_raw_obj obj, OpFunc&& f,
uint64_t cost, uint64_t id) override final;
void put(AioResult& r) override final;
@ -104,7 +101,7 @@ class YieldingAioThrottle final : public Aio, private Throttle {
virtual ~YieldingAioThrottle() override {};
AioResultList get(const RGWSI_RADOS::Obj& obj, OpFunc&& f,
AioResultList get(rgw_raw_obj obj, OpFunc&& f,
uint64_t cost, uint64_t id) override final;
void put(AioResult& r) override final;

View File

@ -137,9 +137,8 @@ struct D3nL1CacheRequest {
async_completion<yield_context, void()> init(yield);
auto ex = get_associated_executor(init.completion_handler);
auto& ref = r.obj.get_ref();
ldpp_dout(dpp, 20) << "D3nDataCache: " << __func__ << "(): oid=" << ref.obj.oid << dendl;
async_read(dpp, context, file_path+"/"+ref.obj.oid, read_ofs, read_len, bind_executor(ex, d3n_libaio_handler{aio, r}));
ldpp_dout(dpp, 20) << "D3nDataCache: " << __func__ << "(): oid=" << r.obj.oid << dendl;
async_read(dpp, context, file_path+"/"+r.obj.oid, read_ofs, read_len, bind_executor(ex, d3n_libaio_handler{aio, r}));
}
};

View File

@ -147,12 +147,9 @@ add_executable(unittest_rgw_putobj test_rgw_putobj.cc)
add_ceph_unittest(unittest_rgw_putobj)
target_link_libraries(unittest_rgw_putobj ${rgw_libs} ${UNITTEST_LIBS})
add_executable(ceph_test_rgw_throttle
test_rgw_throttle.cc
$<TARGET_OBJECTS:unit-main>)
target_link_libraries(ceph_test_rgw_throttle ${rgw_libs}
librados global ${UNITTEST_LIBS})
install(TARGETS ceph_test_rgw_throttle DESTINATION ${CMAKE_INSTALL_BINDIR})
add_executable(unittest_rgw_throttle test_rgw_throttle.cc)
add_ceph_unittest(unittest_rgw_throttle)
target_link_libraries(unittest_rgw_throttle ${rgw_libs} ${UNITTEST_LIBS})
add_executable(unittest_rgw_iam_policy test_rgw_iam_policy.cc)
add_ceph_unittest(unittest_rgw_iam_policy)

View File

@ -21,43 +21,10 @@
#include <spawn/spawn.hpp>
#include <gtest/gtest.h>
struct RadosEnv : public ::testing::Environment {
public:
static constexpr auto poolname = "ceph_test_rgw_throttle";
static std::optional<RGWSI_RADOS> rados;
void SetUp() override {
rados.emplace(g_ceph_context);
const NoDoutPrefix no_dpp(g_ceph_context, 1);
ASSERT_EQ(0, rados->start(null_yield, &no_dpp));
int r = rados->pool({poolname}).create(&no_dpp);
if (r == -EEXIST)
r = 0;
ASSERT_EQ(0, r);
}
void TearDown() override {
ASSERT_EQ(0, rados->get_rados_handle()->pool_delete(poolname));
rados->shutdown();
rados.reset();
}
};
std::optional<RGWSI_RADOS> RadosEnv::rados;
auto *const rados_env = ::testing::AddGlobalTestEnvironment(new RadosEnv);
// test fixture for global setup/teardown
class RadosFixture : public ::testing::Test {
protected:
RGWSI_RADOS::Obj make_obj(const std::string& oid) {
auto obj = RadosEnv::rados->obj({{RadosEnv::poolname}, oid});
const NoDoutPrefix no_dpp(g_ceph_context, 1);
ceph_assert_always(0 == obj.open(&no_dpp));
return obj;
}
};
using Aio_Throttle = RadosFixture;
static rgw_raw_obj make_obj(const std::string& oid)
{
return {{"testpool"}, oid};
}
namespace rgw {
@ -90,7 +57,7 @@ auto wait_for(boost::asio::io_context& context, ceph::timespan duration) {
};
}
TEST_F(Aio_Throttle, NoThrottleUpToMax)
TEST(Aio_Throttle, NoThrottleUpToMax)
{
BlockingAioThrottle throttle(4);
auto obj = make_obj(__PRETTY_FUNCTION__);
@ -118,7 +85,7 @@ TEST_F(Aio_Throttle, NoThrottleUpToMax)
}
}
TEST_F(Aio_Throttle, CostOverWindow)
TEST(Aio_Throttle, CostOverWindow)
{
BlockingAioThrottle throttle(4);
auto obj = make_obj(__PRETTY_FUNCTION__);
@ -129,7 +96,7 @@ TEST_F(Aio_Throttle, CostOverWindow)
EXPECT_EQ(-EDEADLK, c.front().result);
}
TEST_F(Aio_Throttle, ThrottleOverMax)
TEST(Aio_Throttle, ThrottleOverMax)
{
constexpr uint64_t window = 4;
BlockingAioThrottle throttle(window);
@ -167,7 +134,7 @@ TEST_F(Aio_Throttle, ThrottleOverMax)
EXPECT_EQ(window, max_outstanding);
}
TEST_F(Aio_Throttle, YieldCostOverWindow)
TEST(Aio_Throttle, YieldCostOverWindow)
{
auto obj = make_obj(__PRETTY_FUNCTION__);
@ -183,7 +150,7 @@ TEST_F(Aio_Throttle, YieldCostOverWindow)
context.run();
}
TEST_F(Aio_Throttle, YieldingThrottleOverMax)
TEST(Aio_Throttle, YieldingThrottleOverMax)
{
constexpr uint64_t window = 4;