librados: expose new checksum osd operation

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
This commit is contained in:
Jason Dillaman 2017-03-30 13:53:28 -04:00
parent 2f4b8c0cf9
commit fcb42c7076
13 changed files with 609 additions and 2 deletions

View File

@ -151,6 +151,12 @@ enum {
};
/** @} */
typedef enum {
LIBRADOS_CHECKSUM_TYPE_XXHASH32 = 0,
LIBRADOS_CHECKSUM_TYPE_XXHASH64 = 1,
LIBRADOS_CHECKSUM_TYPE_CRC32C = 2
} rados_checksum_type_t;
/*
* snap id contants
*/
@ -330,7 +336,7 @@ typedef void *rados_write_op_t;
* rados_read_op_omap_cmp()
* - Object properties: rados_read_op_stat(), rados_read_op_assert_exists(),
* rados_read_op_assert_version()
* - IO on objects: rados_read_op_read()
* - IO on objects: rados_read_op_read(), rados_read_op_checksum()
* - Custom operations: rados_read_op_exec(), rados_read_op_exec_user_buf()
* - Request properties: rados_read_op_set_flags()
* - Performing the operation: rados_read_op_operate(),
@ -1452,6 +1458,42 @@ CEPH_RADOS_API int rados_append(rados_ioctx_t io, const char *oid,
CEPH_RADOS_API int rados_read(rados_ioctx_t io, const char *oid, char *buf,
size_t len, uint64_t off);
/**
* Compute checksum from object data
*
* The io context determines the snapshot to checksum, if any was set
* by rados_ioctx_snap_set_read(). The length of the init_value and
* resulting checksum are dependent upon the checksum type:
*
* XXHASH64: le64
* XXHASH32: le32
* CRC32C: le32
*
* The checksum result is encoded the following manner:
*
* le32 num_checksum_chunks
* {
* leXX checksum for chunk (where XX = appropriate size for the checksum type)
* } * num_checksum_chunks
*
* @param io the context in which to perform the checksum
* @param oid the name of the object to checksum
* @param type the checksum algorithm to utilize
* @param init_value the init value for the algorithm
* @param init_value_len the length of the init value
* @param len the number of bytes to checksum
* @param off the offset to start checksuming in the object
* @param chunk_size optional length-aligned chunk size for checksums
* @param pchecksum where to store the checksum result
* @param checksum_len the number of bytes available for the result
* @return negative error code on failure
*/
CEPH_RADOS_API int rados_checksum(rados_ioctx_t io, const char *oid,
rados_checksum_type_t type,
const char *init_value, size_t init_value_len,
size_t len, uint64_t off, size_t chunk_size,
char *pchecksum, size_t checksum_len);
/**
* Delete an object
*
@ -3062,6 +3104,29 @@ CEPH_RADOS_API void rados_read_op_read(rados_read_op_t read_op,
size_t *bytes_read,
int *prval);
/**
* Compute checksum from object data
*
* @param read_op operation to add this action to
* @param oid the name of the object to checksum
* @param type the checksum algorithm to utilize
* @param init_value the init value for the algorithm
* @param init_value_len the length of the init value
* @param len the number of bytes to checksum
* @param off the offset to start checksuming in the object
* @param chunk_size optional length-aligned chunk size for checksums
* @param pchecksum where to store the checksum result for this action
* @param checksum_len the number of bytes available for the result
* @param prval where to store the return value for this action
*/
CEPH_RADOS_API void rados_read_op_checksum(rados_read_op_t read_op,
rados_checksum_type_t type,
const char *init_value,
size_t init_value_len,
uint64_t offset, size_t len,
size_t chunk_size, char *pchecksum,
size_t checksum_len, int *prval);
/**
* Execute an OSD class method on an object
* See rados_exec() for general description.

View File

@ -481,6 +481,10 @@ namespace librados
void getxattr(const char *name, bufferlist *pbl, int *prval);
void getxattrs(std::map<std::string, bufferlist> *pattrs, int *prval);
void read(size_t off, uint64_t len, bufferlist *pbl, int *prval);
void checksum(rados_checksum_type_t type, const bufferlist &init_value_bl,
uint64_t off, size_t len, size_t chunk_size, bufferlist *pbl,
int *prval);
/**
* see aio_sparse_read()
*/
@ -744,6 +748,9 @@ namespace librados
int writesame(const std::string& oid, bufferlist& bl,
size_t write_len, uint64_t off);
int read(const std::string& oid, bufferlist& bl, size_t len, uint64_t off);
int checksum(const std::string& o, rados_checksum_type_t type,
const bufferlist &init_value_bl, size_t len, uint64_t off,
size_t chunk_size, bufferlist *pbl);
int remove(const std::string& oid);
int remove(const std::string& oid, int flags);
int trunc(const std::string& oid, uint64_t size);

View File

@ -1426,6 +1426,27 @@ int librados::IoCtxImpl::sparse_read(const object_t& oid,
return m.size();
}
int librados::IoCtxImpl::checksum(const object_t& oid, uint8_t type,
const bufferlist &init_value, size_t len,
uint64_t off, size_t chunk_size,
bufferlist *pbl)
{
if (len > (size_t) INT_MAX) {
return -EDOM;
}
::ObjectOperation rd;
prepare_assert_ops(&rd);
rd.checksum(type, init_value, off, len, chunk_size, pbl, nullptr, nullptr);
int r = operate_read(oid, &rd, nullptr);
if (r < 0) {
return r;
}
return 0;
}
int librados::IoCtxImpl::stat(const object_t& oid, uint64_t *psize, time_t *pmtime)
{
uint64_t size;

View File

@ -131,6 +131,8 @@ struct librados::IoCtxImpl {
std::map<uint64_t,uint64_t>& m);
int sparse_read(const object_t& oid, std::map<uint64_t,uint64_t>& m,
bufferlist& bl, size_t len, uint64_t off);
int checksum(const object_t& oid, uint8_t type, const bufferlist &init_value,
size_t len, uint64_t off, size_t chunk_size, bufferlist *pbl);
int remove(const object_t& oid);
int remove(const object_t& oid, int flags);
int stat(const object_t& oid, uint64_t *psize, time_t *pmtime);

View File

@ -68,6 +68,19 @@ namespace {
TracepointProvider::Traits tracepoint_traits("librados_tp.so", "rados_tracing");
uint8_t get_checksum_op_type(rados_checksum_type_t type) {
switch (type) {
case LIBRADOS_CHECKSUM_TYPE_XXHASH32:
return CEPH_OSD_CHECKSUM_OP_TYPE_XXHASH32;
case LIBRADOS_CHECKSUM_TYPE_XXHASH64:
return CEPH_OSD_CHECKSUM_OP_TYPE_XXHASH64;
case LIBRADOS_CHECKSUM_TYPE_CRC32C:
return CEPH_OSD_CHECKSUM_OP_TYPE_CRC32C;
default:
return -1;
}
}
} // anonymous namespace
/*
@ -228,6 +241,17 @@ void librados::ObjectReadOperation::sparse_read(uint64_t off, uint64_t len,
o->sparse_read(off, len, m, data_bl, prval);
}
void librados::ObjectReadOperation::checksum(rados_checksum_type_t type,
const bufferlist &init_value_bl,
uint64_t off, size_t len,
size_t chunk_size, bufferlist *pbl,
int *prval)
{
::ObjectOperation *o = &impl->o;
o->checksum(get_checksum_op_type(type), init_value_bl, off, len, chunk_size,
pbl, prval, nullptr);
}
void librados::ObjectReadOperation::tmap_get(bufferlist *pbl, int *prval)
{
::ObjectOperation *o = &impl->o;
@ -1163,6 +1187,16 @@ int librados::IoCtx::read(const std::string& oid, bufferlist& bl, size_t len, ui
return io_ctx_impl->read(obj, bl, len, off);
}
int librados::IoCtx::checksum(const std::string& oid,
rados_checksum_type_t type,
const bufferlist &init_value_bl, size_t len,
uint64_t off, size_t chunk_size, bufferlist *pbl)
{
object_t obj(oid);
return io_ctx_impl->checksum(obj, get_checksum_op_type(type), init_value_bl,
len, off, chunk_size, pbl);
}
int librados::IoCtx::remove(const std::string& oid)
{
object_t obj(oid);
@ -3506,6 +3540,36 @@ extern "C" int rados_read(rados_ioctx_t io, const char *o, char *buf, size_t len
return ret;
}
extern "C" int rados_checksum(rados_ioctx_t io, const char *o,
rados_checksum_type_t type,
const char *init_value, size_t init_value_len,
size_t len, uint64_t off, size_t chunk_size,
char *pchecksum, size_t checksum_len)
{
tracepoint(librados, rados_checksum_enter, io, o, type, init_value,
init_value_len, len, off, chunk_size);
librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
object_t oid(o);
bufferlist init_value_bl;
init_value_bl.append(init_value, init_value_len);
bufferlist checksum_bl;
int retval = ctx->checksum(oid, get_checksum_op_type(type), init_value_bl,
len, off, chunk_size, &checksum_bl);
if (retval >= 0) {
if (checksum_bl.length() > checksum_len) {
tracepoint(librados, rados_checksum_exit, -ERANGE, NULL, 0);
return -ERANGE;
}
checksum_bl.copy(0, checksum_bl.length(), pchecksum);
}
tracepoint(librados, rados_checksum_exit, retval, pchecksum, checksum_len);
return retval;
}
extern "C" uint64_t rados_get_last_version(rados_ioctx_t io)
{
tracepoint(librados, rados_get_last_version_enter, io);
@ -5531,6 +5595,31 @@ extern "C" void rados_read_op_read(rados_read_op_t read_op,
tracepoint(librados, rados_read_op_read_exit);
}
extern "C" void rados_read_op_checksum(rados_read_op_t read_op,
rados_checksum_type_t type,
const char *init_value,
size_t init_value_len,
uint64_t offset, size_t len,
size_t chunk_size, char *pchecksum,
size_t checksum_len, int *prval)
{
tracepoint(librados, rados_read_op_checksum_enter, read_op, type, init_value,
init_value_len, offset, len, chunk_size);
bufferlist init_value_bl;
init_value_bl.append(init_value, init_value_len);
C_bl_to_buf *ctx = nullptr;
if (pchecksum != nullptr) {
ctx = new C_bl_to_buf(pchecksum, checksum_len, nullptr, prval);
}
((::ObjectOperation *)read_op)->checksum(get_checksum_op_type(type),
init_value_bl, offset, len,
chunk_size,
(ctx ? &ctx->out_bl : nullptr),
prval, ctx);
tracepoint(librados, rados_read_op_checksum_exit);
}
class C_out_buffer : public Context {
char **out_buf;
size_t *out_len;

View File

@ -367,6 +367,22 @@ struct ObjectOperation {
add_data(CEPH_OSD_OP_SPARSE_READ, off, len, bl);
}
void checksum(uint8_t type, const bufferlist &init_value_bl,
uint64_t off, uint64_t len, size_t chunk_size,
bufferlist *pbl, int *prval, Context *ctx) {
OSDOp& osd_op = add_op(CEPH_OSD_OP_CHECKSUM);
osd_op.op.checksum.offset = off;
osd_op.op.checksum.length = len;
osd_op.op.checksum.type = type;
osd_op.op.checksum.chunk_size = chunk_size;
osd_op.indata.append(init_value_bl);
unsigned p = ops.size() - 1;
out_bl[p] = pbl;
out_rval[p] = prval;
out_handler[p] = ctx;
}
// object attrs
void getxattr(const char *name, bufferlist *pbl, int *prval) {
bufferlist bl;

View File

@ -11,6 +11,20 @@ using namespace librados;
std::string RadosTestNS::pool_name;
rados_t RadosTestNS::s_cluster = NULL;
namespace {
void init_rand() {
static bool seeded = false;
if (!seeded) {
seeded = true;
int seed = getpid();
std::cout << "seed " << seed << std::endl;
srand(seed);
}
}
} // anonymous namespace
void RadosTestNS::SetUpTestCase()
{
pool_name = get_temp_pool_name();
@ -306,6 +320,8 @@ Rados RadosTestPP::s_cluster;
void RadosTestPP::SetUpTestCase()
{
init_rand();
pool_name = get_temp_pool_name();
ASSERT_EQ("", create_one_pool_pp(pool_name, s_cluster));
}

View File

@ -424,6 +424,66 @@ TEST(LibRadosAio, RoundTrip2) {
rados_aio_release(my_completion2);
}
TEST(LibRadosAio, RoundTrip3) {
AioTestData test_data;
rados_completion_t my_completion;
ASSERT_EQ("", test_data.init());
ASSERT_EQ(0, rados_aio_create_completion((void*)&test_data,
set_completion_complete, set_completion_safe, &my_completion));
char buf[128];
memset(buf, 0xcc, sizeof(buf));
rados_write_op_t op1 = rados_create_write_op();
rados_write_op_write(op1, buf, sizeof(buf), 0);
rados_write_op_set_alloc_hint2(op1, 0, 0, LIBRADOS_OP_FLAG_FADVISE_DONTNEED);
ASSERT_EQ(0, rados_aio_write_op_operate(op1, test_data.m_ioctx, my_completion,
"foo", NULL, 0));
rados_release_write_op(op1);
{
TestAlarm alarm;
sem_wait(test_data.m_sem);
sem_wait(test_data.m_sem);
}
ASSERT_EQ(0, rados_aio_get_return_value(my_completion));
rados_aio_release(my_completion);
char buf2[128];
memset(buf2, 0, sizeof(buf2));
rados_completion_t my_completion2;
ASSERT_EQ(0, rados_aio_create_completion((void*)&test_data,
set_completion_complete, set_completion_safe, &my_completion2));
rados_read_op_t op2 = rados_create_read_op();
rados_read_op_read(op2, 0, sizeof(buf2), buf2, NULL, NULL);
rados_read_op_set_flags(op2, LIBRADOS_OP_FLAG_FADVISE_NOCACHE |
LIBRADOS_OP_FLAG_FADVISE_RANDOM);
__le32 init_value = -1;
__le32 checksum[2];
rados_read_op_checksum(op2, LIBRADOS_CHECKSUM_TYPE_CRC32C,
reinterpret_cast<char *>(&init_value),
sizeof(init_value), 0, 0, 0,
reinterpret_cast<char *>(&checksum),
sizeof(checksum), NULL);
ASSERT_EQ(0, rados_aio_read_op_operate(op2, test_data.m_ioctx, my_completion2,
"foo", 0));
rados_release_read_op(op2);
{
TestAlarm alarm;
ASSERT_EQ(0, rados_aio_wait_for_complete(my_completion2));
}
ASSERT_EQ(0, rados_aio_get_return_value(my_completion2));
ASSERT_EQ(0, memcmp(buf, buf2, sizeof(buf)));
rados_aio_release(my_completion2);
bufferlist bl;
bl.append(buf, sizeof(buf));
ASSERT_EQ(1U, checksum[0]);
ASSERT_EQ(bl.crc32c(-1), checksum[1]);
}
TEST(LibRadosAio, RoundTripPP) {
AioTestDataPP test_data;
ASSERT_EQ("", test_data.init());
@ -511,7 +571,7 @@ TEST(LibRadosAio, RoundTripPP3)
char buf[128];
memset(buf, 0xcc, sizeof(buf));
bufferlist bl;
bl.append(buf);
bl.append(buf, sizeof(buf));
op.write(0, bl);
op.set_op_flags2(LIBRADOS_OP_FLAG_FADVISE_DONTNEED);
@ -527,6 +587,11 @@ TEST(LibRadosAio, RoundTripPP3)
ObjectReadOperation op1;
op1.read(0, sizeof(buf), &bl, NULL);
op1.set_op_flags2(LIBRADOS_OP_FLAG_FADVISE_DONTNEED|LIBRADOS_OP_FLAG_FADVISE_RANDOM);
bufferlist init_value_bl;
::encode(static_cast<int32_t>(-1), init_value_bl);
bufferlist csum_bl;
op1.checksum(LIBRADOS_CHECKSUM_TYPE_CRC32C, init_value_bl,
0, 0, 0, &csum_bl, nullptr);
ioctx.aio_operate("test_obj", my_completion2.get(), &op1, 0);
{
TestAlarm alarm;
@ -534,6 +599,15 @@ TEST(LibRadosAio, RoundTripPP3)
}
EXPECT_EQ(0, my_completion2->get_return_value());
ASSERT_EQ(0, memcmp(buf, bl.c_str(), sizeof(buf)));
ASSERT_EQ(8U, csum_bl.length());
auto csum_bl_it = csum_bl.begin();
uint32_t csum_count;
uint32_t csum;
::decode(csum_count, csum_bl_it);
ASSERT_EQ(1U, csum_count);
::decode(csum, csum_bl_it);
ASSERT_EQ(bl.crc32c(-1), csum);
ioctx.remove("test_obj");
destroy_one_pool_pp(pool_name, cluster);
}

View File

@ -315,6 +315,72 @@ TEST_F(CReadOpsTest, Read) {
remove_object();
}
TEST_F(CReadOpsTest, Checksum) {
write_object();
{
rados_read_op_t op = rados_create_read_op();
uint64_t init_value = -1;
rados_read_op_checksum(op, LIBRADOS_CHECKSUM_TYPE_XXHASH64,
reinterpret_cast<char *>(&init_value),
sizeof(init_value), 0, len, 0, NULL, 0, NULL);
ASSERT_EQ(0, rados_read_op_operate(op, ioctx, obj, 0));
rados_release_read_op(op);
}
{
uint32_t init_value = -1;
uint32_t crc[2];
rados_read_op_t op = rados_create_read_op();
rados_read_op_checksum(op, LIBRADOS_CHECKSUM_TYPE_CRC32C,
reinterpret_cast<char *>(&init_value),
sizeof(init_value), 0, len, 0,
reinterpret_cast<char *>(&crc), sizeof(crc),
nullptr);
ASSERT_EQ(0, rados_read_op_operate(op, ioctx, obj, 0));
ASSERT_EQ(1U, crc[0]);
uint32_t expected_crc = ceph_crc32c(
-1, reinterpret_cast<const uint8_t*>(data), static_cast<uint32_t>(len));
ASSERT_EQ(expected_crc, crc[1]);
rados_release_read_op(op);
}
{
uint32_t init_value = -1;
int rval;
rados_read_op_t op = rados_create_read_op();
rados_read_op_checksum(op, LIBRADOS_CHECKSUM_TYPE_XXHASH32,
reinterpret_cast<char *>(&init_value),
sizeof(init_value), 0, len, 0, nullptr, 0, &rval);
ASSERT_EQ(0, rados_read_op_operate(op, ioctx, obj, 0));
ASSERT_EQ(0, rval);
rados_release_read_op(op);
}
{
uint32_t init_value = -1;
uint32_t crc[3];
int rval;
rados_read_op_t op = rados_create_read_op();
rados_read_op_checksum(op, LIBRADOS_CHECKSUM_TYPE_CRC32C,
reinterpret_cast<char *>(&init_value),
sizeof(init_value), 0, len, 4,
reinterpret_cast<char *>(&crc), sizeof(crc), &rval);
ASSERT_EQ(0, rados_read_op_operate(op, ioctx, obj, 0));
ASSERT_EQ(2U, crc[0]);
uint32_t expected_crc[2];
expected_crc[0] = ceph_crc32c(
-1, reinterpret_cast<const uint8_t*>(data), 4U);
expected_crc[1] = ceph_crc32c(
-1, reinterpret_cast<const uint8_t*>(data + 4), 4U);
ASSERT_EQ(expected_crc[0], crc[1]);
ASSERT_EQ(expected_crc[1], crc[2]);
ASSERT_EQ(0, rval);
rados_release_read_op(op);
}
remove_object();
}
TEST_F(CReadOpsTest, RWOrderedRead) {
write_object();

View File

@ -5,6 +5,7 @@
#include "include/rados/librados.h"
#include "include/rados/librados.hpp"
#include "include/encoding.h"
#include "include/scope_guard.h"
#include "test/librados/test.h"
#include "test/librados/TestCase.h"
@ -299,6 +300,44 @@ TEST_F(LibRadosIoPP, RoundTripPP2)
ASSERT_EQ(0, memcmp(bl.c_str(), "ceph", 4));
}
TEST_F(LibRadosIo, Checksum) {
char buf[128];
memset(buf, 0xcc, sizeof(buf));
ASSERT_EQ(0, rados_write(ioctx, "foo", buf, sizeof(buf), 0));
uint32_t expected_crc = ceph_crc32c(-1, reinterpret_cast<const uint8_t*>(buf),
sizeof(buf));
uint32_t init_value = -1;
uint32_t crc[2];
ASSERT_EQ(0, rados_checksum(ioctx, "foo", LIBRADOS_CHECKSUM_TYPE_CRC32C,
reinterpret_cast<char*>(&init_value),
sizeof(init_value), sizeof(buf), 0, 0,
reinterpret_cast<char*>(&crc), sizeof(crc)));
ASSERT_EQ(1U, crc[0]);
ASSERT_EQ(expected_crc, crc[1]);
}
TEST_F(LibRadosIoPP, Checksum) {
char buf[128];
Rados cluster;
memset(buf, 0xcc, sizeof(buf));
bufferlist bl;
bl.append(buf, sizeof(buf));
ASSERT_EQ(0, ioctx.write("foo", bl, sizeof(buf), 0));
bufferlist init_value_bl;
::encode(static_cast<uint32_t>(-1), init_value_bl);
bufferlist csum_bl;
ASSERT_EQ(0, ioctx.checksum("foo", LIBRADOS_CHECKSUM_TYPE_CRC32C,
init_value_bl, sizeof(buf), 0, 0, &csum_bl));
auto csum_bl_it = csum_bl.begin();
uint32_t csum_count;
::decode(csum_count, csum_bl_it);
ASSERT_EQ(1U, csum_count);
uint32_t csum;
::decode(csum, csum_bl_it);
ASSERT_EQ(bl.crc32c(-1), csum);
}
TEST_F(LibRadosIo, OverlappingWriteRoundTrip) {
char buf[128];
char buf2[64];

View File

@ -8,6 +8,7 @@
#include "include/rados/librados.h"
#include "include/rados/librados.hpp"
#include "include/stringify.h"
#include "common/Checksummer.h"
#include "global/global_context.h"
#include "test/librados/test.h"
#include "test/librados/TestCase.h"
@ -1070,3 +1071,130 @@ TEST_F(LibRadosMisc, WriteSame) {
/* write_len = data_len, i.e. same as rados_write() */
ASSERT_EQ(0, rados_writesame(ioctx, "ws", buf, sizeof(buf), sizeof(buf), 0));
}
template <typename T>
class LibRadosChecksum : public LibRadosMiscPP {
public:
typedef typename T::alg_t alg_t;
typedef typename T::value_t value_t;
typedef typename alg_t::init_value_t init_value_t;
static const rados_checksum_type_t type = T::type;
bufferlist content_bl;
using LibRadosMiscPP::SetUpTestCase;
using LibRadosMiscPP::TearDownTestCase;
void SetUp() override {
LibRadosMiscPP::SetUp();
std::string content(4096, '\0');
for (size_t i = 0; i < content.length(); ++i) {
content[i] = static_cast<char>(rand() % (126 - 33) + 33);
}
content_bl.append(content);
ASSERT_EQ(0, ioctx.write("foo", content_bl, content_bl.length(), 0));
}
};
template <rados_checksum_type_t _type, typename AlgT, typename ValueT>
class LibRadosChecksumParams {
public:
typedef AlgT alg_t;
typedef ValueT value_t;
static const rados_checksum_type_t type = _type;
};
typedef ::testing::Types<
LibRadosChecksumParams<LIBRADOS_CHECKSUM_TYPE_XXHASH32,
Checksummer::xxhash32, uint32_t>,
LibRadosChecksumParams<LIBRADOS_CHECKSUM_TYPE_XXHASH64,
Checksummer::xxhash64, uint64_t>,
LibRadosChecksumParams<LIBRADOS_CHECKSUM_TYPE_CRC32C,
Checksummer::crc32c, uint32_t>
> LibRadosChecksumTypes;
TYPED_TEST_CASE(LibRadosChecksum, LibRadosChecksumTypes);
TYPED_TEST(LibRadosChecksum, Subset) {
uint32_t chunk_size = 1024;
uint32_t csum_count = this->content_bl.length() / chunk_size;
typename TestFixture::init_value_t init_value = -1;
bufferlist init_value_bl;
::encode(init_value, init_value_bl);
std::vector<bufferlist> checksum_bls(csum_count);
std::vector<int> checksum_rvals(csum_count);
// individual checksum ops for each chunk
ObjectReadOperation op;
for (uint32_t i = 0; i < csum_count; ++i) {
op.checksum(TestFixture::type, init_value_bl, i * chunk_size, chunk_size,
0, &checksum_bls[i], &checksum_rvals[i]);
}
ASSERT_EQ(0, this->ioctx.operate("foo", &op, NULL));
for (uint32_t i = 0; i < csum_count; ++i) {
ASSERT_EQ(0, checksum_rvals[i]);
auto bl_it = checksum_bls[i].begin();
uint32_t count;
::decode(count, bl_it);
ASSERT_EQ(1U, count);
typename TestFixture::value_t value;
::decode(value, bl_it);
bufferlist content_sub_bl;
content_sub_bl.substr_of(this->content_bl, i * chunk_size, chunk_size);
typename TestFixture::value_t expected_value;
bufferptr expected_value_bp = buffer::create_static(
sizeof(expected_value), reinterpret_cast<char*>(&expected_value));
Checksummer::template calculate<typename TestFixture::alg_t>(
init_value, chunk_size, 0, chunk_size, content_sub_bl,
&expected_value_bp);
ASSERT_EQ(expected_value, value);
}
}
TYPED_TEST(LibRadosChecksum, Chunked) {
uint32_t chunk_size = 1024;
uint32_t csum_count = this->content_bl.length() / chunk_size;
typename TestFixture::init_value_t init_value = -1;
bufferlist init_value_bl;
::encode(init_value, init_value_bl);
bufferlist checksum_bl;
int checksum_rval;
// single op with chunked checksum results
ObjectReadOperation op;
op.checksum(TestFixture::type, init_value_bl, 0, this->content_bl.length(),
chunk_size, &checksum_bl, &checksum_rval);
ASSERT_EQ(0, this->ioctx.operate("foo", &op, NULL));
ASSERT_EQ(0, checksum_rval);
auto bl_it = checksum_bl.begin();
uint32_t count;
::decode(count, bl_it);
ASSERT_EQ(csum_count, count);
std::vector<typename TestFixture::value_t> expected_values(csum_count);
bufferptr expected_values_bp = buffer::create_static(
csum_count * sizeof(typename TestFixture::value_t),
reinterpret_cast<char*>(&expected_values[0]));
Checksummer::template calculate<typename TestFixture::alg_t>(
init_value, chunk_size, 0, this->content_bl.length(), this->content_bl,
&expected_values_bp);
for (uint32_t i = 0; i < csum_count; ++i) {
typename TestFixture::value_t value;
::decode(value, bl_it);
ASSERT_EQ(expected_values[i], value);
}
}

View File

@ -1180,6 +1180,9 @@ public:
vector<bool> is_sparse_read;
uint64_t waiting_on;
vector<bufferlist> checksums;
vector<int> checksum_retvals;
map<string, bufferlist> attrs;
int attrretval;
@ -1205,6 +1208,8 @@ public:
extent_results(3),
is_sparse_read(3, false),
waiting_on(0),
checksums(3),
checksum_retvals(3),
attrretval(0)
{}
@ -1218,6 +1223,10 @@ public:
len,
&results[index],
&retvals[index]);
bufferlist init_value_bl;
::encode(static_cast<uint32_t>(-1), init_value_bl);
read_op.checksum(LIBRADOS_CHECKSUM_TYPE_CRC32C, init_value_bl, 0, len,
0, &checksums[index], &checksum_retvals[index]);
} else {
is_sparse_read[index] = true;
read_op.sparse_read(0,
@ -1391,6 +1400,24 @@ public:
cerr << num << ": oid " << oid << " contents " << to_check << " corrupt" << std::endl;
context->errors++;
}
uint32_t checksum = 0;
if (checksum_retvals[i] == 0) {
try {
auto bl_it = checksums[i].begin();
uint32_t csum_count;
::decode(csum_count, bl_it);
::decode(checksum, bl_it);
} catch (const buffer::error &err) {
checksum_retvals[i] = -EBADMSG;
}
}
if (checksum_retvals[i] != 0 || checksum != results[i].crc32c(-1)) {
cerr << num << ": oid " << oid << " checksum " << checksums[i]
<< " incorrect, expecting " << results[i].crc32c(-1)
<< std::endl;
context->errors++;
}
}
}
if (context->errors) ceph_abort();

View File

@ -940,6 +940,39 @@ TRACEPOINT_EVENT(librados, rados_read_exit,
)
)
TRACEPOINT_EVENT(librados, rados_checksum_enter,
TP_ARGS(
rados_ioctx_t, ioctx,
const char*, oid,
int, type,
const char*, init_value,
size_t, init_value_len,
size_t, len,
uint64_t, off,
size_t, chunk_size),
TP_FIELDS(
ctf_integer_hex(rados_ioctx_t, ioctx, ioctx)
ctf_string(oid, oid)
ctf_integer(int, type, type)
ceph_ctf_sequence(unsigned char, init_value, init_value, size_t, init_value_len)
ctf_integer(size_t, len, len)
ctf_integer(uint64_t, off, off)
ctf_integer(size_t, chunk_size, chunk_size)
)
)
TRACEPOINT_EVENT(librados, rados_checksum_exit,
TP_ARGS(
int, retval,
const char*, checksum,
size_t, checksum_len
),
TP_FIELDS(
ctf_integer(int, retval, retval)
ceph_ctf_sequence(unsigned char, checksum, checksum, size_t, checksum_len)
)
)
TRACEPOINT_EVENT(librados, rados_get_last_version_enter,
TP_ARGS(
rados_ioctx_t, ioctx),
@ -3562,6 +3595,30 @@ TRACEPOINT_EVENT(librados, rados_read_op_read_exit,
TP_FIELDS()
)
TRACEPOINT_EVENT(librados, rados_read_op_checksum_enter,
TP_ARGS(
rados_read_op_t, read_op,
int, type,
const char*, init_value,
size_t, init_value_len,
uint64_t, offset,
size_t, len,
size_t, chunk_size),
TP_FIELDS(
ctf_integer_hex(rados_read_op_t, read_op, read_op)
ctf_integer(int, type, type)
ceph_ctf_sequence(unsigned char, init_value, init_value, size_t, init_value_len)
ctf_integer(uint64_t, offset, offset)
ctf_integer(size_t, len, len)
ctf_integer(size_t, chunk_size, chunk_size)
)
)
TRACEPOINT_EVENT(librados, rados_read_op_checksum_exit,
TP_ARGS(),
TP_FIELDS()
)
TRACEPOINT_EVENT(librados, rados_read_op_exec_enter,
TP_ARGS(
rados_read_op_t, read_op,