1
0
mirror of https://github.com/ceph/ceph synced 2025-03-30 23:40:09 +00:00

rgw: add rgw::putobj::ChunkProcessor and test

ChunkProcessor turns the input stream into a series of discrete chunks
before forwarding to the wrapped DataProcessor

Signed-off-by: Casey Bodley <cbodley@redhat.com>
This commit is contained in:
Casey Bodley 2018-10-10 15:11:48 -04:00
parent 504b7d9c21
commit 594dc4cceb
5 changed files with 189 additions and 0 deletions

View File

@ -84,6 +84,7 @@ set(librgw_common_srcs
rgw_op.cc
rgw_otp.cc
rgw_policy_s3.cc
rgw_putobj.cc
rgw_quota.cc
rgw_rados.cc
rgw_resolve.cc

50
src/rgw/rgw_putobj.cc Normal file
View File

@ -0,0 +1,50 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2018 Red Hat, Inc.
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#include "rgw_putobj.h"
namespace rgw::putobj {
int ChunkProcessor::process(bufferlist&& data, uint64_t offset)
{
ceph_assert(offset >= chunk.length());
uint64_t position = offset - chunk.length();
const bool flush = (data.length() == 0);
if (flush) {
if (chunk.length() > 0) {
int r = Pipe::process(std::move(chunk), position);
if (r < 0) {
return r;
}
}
return Pipe::process({}, offset);
}
chunk.claim_append(data);
// write each full chunk
while (chunk.length() >= chunk_size) {
bufferlist bl;
chunk.splice(0, chunk_size, &bl);
int r = Pipe::process(std::move(bl), position);
if (r < 0) {
return r;
}
position += chunk_size;
}
return 0;
}
} // namespace rgw::putobj

View File

@ -41,4 +41,16 @@ class Pipe : public DataProcessor {
}
};
// pipe that writes to the next processor in discrete chunks
class ChunkProcessor : public Pipe {
uint64_t chunk_size;
bufferlist chunk; // leftover bytes from the last call to process()
public:
ChunkProcessor(DataProcessor *next, uint64_t chunk_size)
: Pipe(next), chunk_size(chunk_size)
{}
int process(bufferlist&& data, uint64_t offset) override;
};
} // namespace rgw::putobj

View File

@ -92,6 +92,10 @@ target_link_libraries(unittest_rgw_crypto
${CRYPTO_LIBS}
)
add_executable(unittest_rgw_putobj test_rgw_putobj.cc)
add_ceph_unittest(unittest_rgw_putobj)
target_link_libraries(unittest_rgw_putobj rgw_a ${UNITTEST_LIBS})
add_executable(unittest_rgw_iam_policy test_rgw_iam_policy.cc)
add_ceph_unittest(unittest_rgw_iam_policy)
target_link_libraries(unittest_rgw_iam_policy

View File

@ -0,0 +1,122 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2018 Red Hat, Inc.
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#include "rgw/rgw_putobj.h"
#include <gtest/gtest.h>
inline bufferlist string_buf(const char* buf) {
bufferlist bl;
bl.append(buffer::create_static(strlen(buf), (char*)buf));
return bl;
}
struct Op {
std::string data;
uint64_t offset;
};
inline bool operator==(const Op& lhs, const Op& rhs) {
return lhs.data == rhs.data && lhs.offset == rhs.offset;
}
inline std::ostream& operator<<(std::ostream& out, const Op& op) {
return out << "{off=" << op.offset << " data='" << op.data << "'}";
}
struct MockProcessor : rgw::putobj::DataProcessor {
std::vector<Op> ops;
int process(bufferlist&& data, uint64_t offset) override {
ops.push_back({data.to_str(), offset});
return {};
}
};
TEST(PutObj_Chunk, FlushHalf)
{
MockProcessor mock;
rgw::putobj::ChunkProcessor chunk(&mock, 4);
ASSERT_EQ(0, chunk.process(string_buf("22"), 0));
ASSERT_TRUE(mock.ops.empty()); // no writes
ASSERT_EQ(0, chunk.process({}, 2)); // flush
ASSERT_EQ(2u, mock.ops.size());
EXPECT_EQ(Op({"22", 0}), mock.ops[0]);
EXPECT_EQ(Op({"", 2}), mock.ops[1]);
}
TEST(PutObj_Chunk, One)
{
MockProcessor mock;
rgw::putobj::ChunkProcessor chunk(&mock, 4);
ASSERT_EQ(0, chunk.process(string_buf("4444"), 0));
ASSERT_EQ(1u, mock.ops.size());
EXPECT_EQ(Op({"4444", 0}), mock.ops[0]);
ASSERT_EQ(0, chunk.process({}, 4)); // flush
ASSERT_EQ(2u, mock.ops.size());
EXPECT_EQ(Op({"", 4}), mock.ops[1]);
}
TEST(PutObj_Chunk, OneAndFlushHalf)
{
MockProcessor mock;
rgw::putobj::ChunkProcessor chunk(&mock, 4);
ASSERT_EQ(0, chunk.process(string_buf("22"), 0));
ASSERT_TRUE(mock.ops.empty());
ASSERT_EQ(0, chunk.process(string_buf("4444"), 2));
ASSERT_EQ(1u, mock.ops.size());
EXPECT_EQ(Op({"2244", 0}), mock.ops[0]);
ASSERT_EQ(0, chunk.process({}, 6)); // flush
ASSERT_EQ(3u, mock.ops.size());
EXPECT_EQ(Op({"44", 4}), mock.ops[1]);
EXPECT_EQ(Op({"", 6}), mock.ops[2]);
}
TEST(PutObj_Chunk, Two)
{
MockProcessor mock;
rgw::putobj::ChunkProcessor chunk(&mock, 4);
ASSERT_EQ(0, chunk.process(string_buf("88888888"), 0));
ASSERT_EQ(2u, mock.ops.size());
EXPECT_EQ(Op({"8888", 0}), mock.ops[0]);
EXPECT_EQ(Op({"8888", 4}), mock.ops[1]);
ASSERT_EQ(0, chunk.process({}, 8)); // flush
ASSERT_EQ(3u, mock.ops.size());
EXPECT_EQ(Op({"", 8}), mock.ops[2]);
}
TEST(PutObj_Chunk, TwoAndFlushHalf)
{
MockProcessor mock;
rgw::putobj::ChunkProcessor chunk(&mock, 4);
ASSERT_EQ(0, chunk.process(string_buf("22"), 0));
ASSERT_TRUE(mock.ops.empty());
ASSERT_EQ(0, chunk.process(string_buf("88888888"), 2));
ASSERT_EQ(2u, mock.ops.size());
EXPECT_EQ(Op({"2288", 0}), mock.ops[0]);
EXPECT_EQ(Op({"8888", 4}), mock.ops[1]);
ASSERT_EQ(0, chunk.process({}, 10)); // flush
ASSERT_EQ(4u, mock.ops.size());
EXPECT_EQ(Op({"88", 8}), mock.ops[2]);
EXPECT_EQ(Op({"", 10}), mock.ops[3]);
}