mirror of
https://github.com/ceph/ceph
synced 2025-02-23 02:57:21 +00:00
compressor/zlib:make zlib windowBits configurable for compression
Signed-off-by: WangPengfei <wpf_1253@qq.com> modified: src/common/legacy_config_opts.h modified: src/common/options.cc modified: src/compressor/Compressor.h modified: src/compressor/QatAccel.cc modified: src/compressor/QatAccel.h modified: src/compressor/brotli/BrotliCompressor.cc modified: src/compressor/brotli/BrotliCompressor.h modified: src/compressor/lz4/LZ4Compressor.h modified: src/compressor/snappy/SnappyCompressor.h modified: src/compressor/zlib/ZlibCompressor.cc modified: src/compressor/zlib/ZlibCompressor.h modified: src/compressor/zstd/ZstdCompressor.h modified: src/os/bluestore/BlueStore.cc modified: src/os/bluestore/bluestore_types.cc modified: src/os/bluestore/bluestore_types.h modified: src/rgw/rgw_compression.cc modified: src/rgw/rgw_compression.h modified: src/rgw/rgw_compression_types.h modified: src/rgw/rgw_json_enc.cc modified: src/rgw/rgw_op.cc modified: src/rgw/rgw_rados.cc modified: src/test/compressor/compressor_example.h modified: src/test/compressor/test_compression.cc modified: src/test/rgw/test_rgw_compression.cc
This commit is contained in:
parent
84fbf1c5cd
commit
81e9fe6ffe
@ -82,6 +82,7 @@ SAFE_OPTION(plugin_dir, OPT_STR)
|
||||
|
||||
OPTION(compressor_zlib_isal, OPT_BOOL)
|
||||
OPTION(compressor_zlib_level, OPT_INT) //regular zlib compression level, not applicable to isa-l optimized version
|
||||
OPTION(compressor_zlib_winsize, OPT_INT) //regular zlib compression winsize, not applicable to isa-l optimized version
|
||||
OPTION(compressor_zstd_level, OPT_INT) //regular zstd compression level
|
||||
|
||||
OPTION(qat_compressor_enabled, OPT_BOOL)
|
||||
|
@ -791,6 +791,11 @@ std::vector<Option> get_global_options() {
|
||||
.set_default(5)
|
||||
.set_description("Zlib compression level to use"),
|
||||
|
||||
Option("compressor_zlib_winsize", Option::TYPE_INT, Option::LEVEL_ADVANCED)
|
||||
.set_default(-15)
|
||||
.set_min_max(-15,32)
|
||||
.set_description("Zlib compression winsize to use"),
|
||||
|
||||
Option("compressor_zstd_level", Option::TYPE_INT, Option::LEVEL_ADVANCED)
|
||||
.set_default(1)
|
||||
.set_description("Zstd compression level to use"),
|
||||
|
@ -88,11 +88,11 @@ public:
|
||||
CompressionAlgorithm get_type() const {
|
||||
return alg;
|
||||
}
|
||||
virtual int compress(const ceph::bufferlist &in, ceph::bufferlist &out) = 0;
|
||||
virtual int decompress(const ceph::bufferlist &in, ceph::bufferlist &out) = 0;
|
||||
virtual int compress(const ceph::bufferlist &in, ceph::bufferlist &out, boost::optional<int32_t> &compressor_message) = 0;
|
||||
virtual int decompress(const ceph::bufferlist &in, ceph::bufferlist &out, boost::optional<int32_t> compressor_message) = 0;
|
||||
// this is a bit weird but we need non-const iterator to be in
|
||||
// alignment with decode methods
|
||||
virtual int decompress(ceph::bufferlist::const_iterator &p, size_t compressed_len, ceph::bufferlist &out) = 0;
|
||||
virtual int decompress(ceph::bufferlist::const_iterator &p, size_t compressed_len, ceph::bufferlist &out, boost::optional<int32_t> compressor_message) = 0;
|
||||
|
||||
static CompressorRef create(CephContext *cct, const std::string &type);
|
||||
static CompressorRef create(CephContext *cct, int alg);
|
||||
|
@ -59,7 +59,7 @@ bool QatAccel::init(const std::string &alg) {
|
||||
return true;
|
||||
}
|
||||
|
||||
int QatAccel::compress(const bufferlist &in, bufferlist &out) {
|
||||
int QatAccel::compress(const bufferlist &in, bufferlist &out, boost::optional<int32_t> &compressor_message) {
|
||||
for (auto &i : in.buffers()) {
|
||||
const unsigned char* c_in = (unsigned char*) i.c_str();
|
||||
unsigned int len = i.length();
|
||||
@ -75,14 +75,15 @@ int QatAccel::compress(const bufferlist &in, bufferlist &out) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
int QatAccel::decompress(const bufferlist &in, bufferlist &out) {
|
||||
int QatAccel::decompress(const bufferlist &in, bufferlist &out, boost::optional<int32_t> compressor_message) {
|
||||
auto i = in.begin();
|
||||
return decompress(i, in.length(), out);
|
||||
return decompress(i, in.length(), out, compressor_message);
|
||||
}
|
||||
|
||||
int QatAccel::decompress(bufferlist::const_iterator &p,
|
||||
size_t compressed_len,
|
||||
bufferlist &dst) {
|
||||
bufferlist &dst,
|
||||
boost::optional<int32_t> compressor_message) {
|
||||
unsigned int ratio_idx = 0;
|
||||
bool read_more = false;
|
||||
bool joint = false;
|
||||
|
@ -27,9 +27,9 @@ class QatAccel {
|
||||
|
||||
bool init(const std::string &alg);
|
||||
|
||||
int compress(const bufferlist &in, bufferlist &out);
|
||||
int decompress(const bufferlist &in, bufferlist &out);
|
||||
int decompress(bufferlist::const_iterator &p, size_t compressed_len, bufferlist &dst);
|
||||
int compress(const bufferlist &in, bufferlist &out, boost::optional<int32_t> &compressor_message);
|
||||
int decompress(const bufferlist &in, bufferlist &out, boost::optional<int32_t> compressor_message);
|
||||
int decompress(bufferlist::const_iterator &p, size_t compressed_len, bufferlist &dst, boost::optional<int32_t> compressor_message);
|
||||
};
|
||||
|
||||
#endif
|
||||
|
@ -5,7 +5,7 @@
|
||||
|
||||
#define MAX_LEN (CEPH_PAGE_SIZE)
|
||||
|
||||
int BrotliCompressor::compress(const bufferlist &in, bufferlist &out)
|
||||
int BrotliCompressor::compress(const bufferlist &in, bufferlist &out, boost::optional<int32_t> &compressor_message)
|
||||
{
|
||||
BrotliEncoderState* s = BrotliEncoderCreateInstance(nullptr,
|
||||
nullptr,
|
||||
@ -49,7 +49,8 @@ int BrotliCompressor::compress(const bufferlist &in, bufferlist &out)
|
||||
|
||||
int BrotliCompressor::decompress(bufferlist::const_iterator &p,
|
||||
size_t compressed_size,
|
||||
bufferlist &out)
|
||||
bufferlist &out,
|
||||
boost::optional<int32_t> compressor_message)
|
||||
{
|
||||
BrotliDecoderState* s = BrotliDecoderCreateInstance(nullptr,
|
||||
nullptr,
|
||||
@ -88,8 +89,8 @@ int BrotliCompressor::decompress(bufferlist::const_iterator &p,
|
||||
return 0;
|
||||
}
|
||||
|
||||
int BrotliCompressor::decompress(const bufferlist &in, bufferlist &out)
|
||||
int BrotliCompressor::decompress(const bufferlist &in, bufferlist &out, boost::optional<int32_t> compressor_message)
|
||||
{
|
||||
auto i = std::cbegin(in);
|
||||
return decompress(i, in.length(), out);
|
||||
return decompress(i, in.length(), out, compressor_message);
|
||||
}
|
||||
|
@ -22,9 +22,9 @@ class BrotliCompressor : public Compressor
|
||||
public:
|
||||
BrotliCompressor() : Compressor(COMP_ALG_BROTLI, "brotli") {}
|
||||
|
||||
int compress(const bufferlist &in, bufferlist &out) override;
|
||||
int decompress(const bufferlist &in, bufferlist &out) override;
|
||||
int decompress(bufferlist::const_iterator &p, size_t compressed_len, bufferlist &out) override;
|
||||
int compress(const bufferlist &in, bufferlist &out, boost::optional<int32_t> &compressor_message) override;
|
||||
int decompress(const bufferlist &in, bufferlist &out, boost::optional<int32_t> compressor_message) override;
|
||||
int decompress(bufferlist::const_iterator &p, size_t compressed_len, bufferlist &out, boost::optional<int32_t> compressor_message) override;
|
||||
};
|
||||
|
||||
#endif //CEPH_BROTLICOMPRESSOR_H
|
||||
|
@ -35,7 +35,7 @@ class LZ4Compressor : public Compressor {
|
||||
#endif
|
||||
}
|
||||
|
||||
int compress(const ceph::buffer::list &src, ceph::buffer::list &dst) override {
|
||||
int compress(const ceph::buffer::list &src, ceph::buffer::list &dst, boost::optional<int32_t> &compressor_message) override {
|
||||
// older versions of liblz4 introduce bit errors when compressing
|
||||
// fragmented buffers. this was fixed in lz4 commit
|
||||
// af127334670a5e7b710bbd6adb71aa7c3ef0cd72, which first
|
||||
@ -45,12 +45,12 @@ class LZ4Compressor : public Compressor {
|
||||
if (!src.is_contiguous()) {
|
||||
ceph::buffer::list new_src = src;
|
||||
new_src.rebuild();
|
||||
return compress(new_src, dst);
|
||||
return compress(new_src, dst, compressor_message);
|
||||
}
|
||||
|
||||
#ifdef HAVE_QATZIP
|
||||
if (qat_enabled)
|
||||
return qat_accel.compress(src, dst);
|
||||
return qat_accel.compress(src, dst, compressor_message);
|
||||
#endif
|
||||
ceph::buffer::ptr outptr = ceph::buffer::create_small_page_aligned(
|
||||
LZ4_compressBound(src.length()));
|
||||
@ -83,21 +83,22 @@ class LZ4Compressor : public Compressor {
|
||||
return 0;
|
||||
}
|
||||
|
||||
int decompress(const ceph::buffer::list &src, ceph::buffer::list &dst) override {
|
||||
int decompress(const ceph::buffer::list &src, ceph::buffer::list &dst, boost::optional<int32_t> compressor_message) override {
|
||||
#ifdef HAVE_QATZIP
|
||||
if (qat_enabled)
|
||||
return qat_accel.decompress(src, dst);
|
||||
return qat_accel.decompress(src, dst, compressor_message);
|
||||
#endif
|
||||
auto i = std::cbegin(src);
|
||||
return decompress(i, src.length(), dst);
|
||||
return decompress(i, src.length(), dst, compressor_message);
|
||||
}
|
||||
|
||||
int decompress(ceph::buffer::list::const_iterator &p,
|
||||
size_t compressed_len,
|
||||
ceph::buffer::list &dst) override {
|
||||
ceph::buffer::list &dst,
|
||||
boost::optional<int32_t> compressor_message) override {
|
||||
#ifdef HAVE_QATZIP
|
||||
if (qat_enabled)
|
||||
return qat_accel.decompress(p, compressed_len, dst);
|
||||
return qat_accel.decompress(p, compressed_len, dst, compressor_message);
|
||||
#endif
|
||||
using ceph::decode;
|
||||
uint32_t count;
|
||||
|
@ -66,10 +66,10 @@ class SnappyCompressor : public Compressor {
|
||||
#endif
|
||||
}
|
||||
|
||||
int compress(const ceph::bufferlist &src, ceph::bufferlist &dst) override {
|
||||
int compress(const ceph::bufferlist &src, ceph::bufferlist &dst, boost::optional<int32_t> &compressor_message) override {
|
||||
#ifdef HAVE_QATZIP
|
||||
if (qat_enabled)
|
||||
return qat_accel.compress(src, dst);
|
||||
return qat_accel.compress(src, dst, compressor_message);
|
||||
#endif
|
||||
BufferlistSource source(const_cast<ceph::bufferlist&>(src).begin(), src.length());
|
||||
ceph::bufferptr ptr = ceph::buffer::create_small_page_aligned(
|
||||
@ -80,21 +80,22 @@ class SnappyCompressor : public Compressor {
|
||||
return 0;
|
||||
}
|
||||
|
||||
int decompress(const ceph::bufferlist &src, ceph::bufferlist &dst) override {
|
||||
int decompress(const ceph::bufferlist &src, ceph::bufferlist &dst, boost::optional<int32_t> compressor_message) override {
|
||||
#ifdef HAVE_QATZIP
|
||||
if (qat_enabled)
|
||||
return qat_accel.decompress(src, dst);
|
||||
return qat_accel.decompress(src, dst, compressor_message);
|
||||
#endif
|
||||
auto i = src.begin();
|
||||
return decompress(i, src.length(), dst);
|
||||
return decompress(i, src.length(), dst, compressor_message);
|
||||
}
|
||||
|
||||
int decompress(ceph::bufferlist::const_iterator &p,
|
||||
size_t compressed_len,
|
||||
ceph::bufferlist &dst) override {
|
||||
ceph::bufferlist &dst,
|
||||
boost::optional<int32_t> compressor_message) override {
|
||||
#ifdef HAVE_QATZIP
|
||||
if (qat_enabled)
|
||||
return qat_accel.decompress(p, compressed_len, dst);
|
||||
return qat_accel.decompress(p, compressed_len, dst, compressor_message);
|
||||
#endif
|
||||
snappy::uint32 res_len = 0;
|
||||
BufferlistSource source_1(p, compressed_len);
|
||||
|
@ -52,7 +52,7 @@ _prefix(std::ostream* _dout)
|
||||
// compression ratio.
|
||||
#define ZLIB_MEMORY_LEVEL 8
|
||||
|
||||
int ZlibCompressor::zlib_compress(const bufferlist &in, bufferlist &out)
|
||||
int ZlibCompressor::zlib_compress(const bufferlist &in, bufferlist &out, boost::optional<int32_t> &compressor_message)
|
||||
{
|
||||
int ret;
|
||||
unsigned have;
|
||||
@ -64,12 +64,13 @@ int ZlibCompressor::zlib_compress(const bufferlist &in, bufferlist &out)
|
||||
strm.zalloc = Z_NULL;
|
||||
strm.zfree = Z_NULL;
|
||||
strm.opaque = Z_NULL;
|
||||
ret = deflateInit2(&strm, cct->_conf->compressor_zlib_level, Z_DEFLATED, ZLIB_DEFAULT_WIN_SIZE, ZLIB_MEMORY_LEVEL, Z_DEFAULT_STRATEGY);
|
||||
ret = deflateInit2(&strm, cct->_conf->compressor_zlib_level, Z_DEFLATED, cct->_conf->compressor_zlib_winsize, ZLIB_MEMORY_LEVEL, Z_DEFAULT_STRATEGY);
|
||||
if (ret != Z_OK) {
|
||||
dout(1) << "Compression init error: init return "
|
||||
<< ret << " instead of Z_OK" << dendl;
|
||||
return -1;
|
||||
}
|
||||
compressor_message = cct->_conf->compressor_zlib_winsize;
|
||||
|
||||
for (ceph::bufferlist::buffers_t::const_iterator i = in.buffers().begin();
|
||||
i != in.buffers().end();) {
|
||||
@ -113,7 +114,7 @@ int ZlibCompressor::zlib_compress(const bufferlist &in, bufferlist &out)
|
||||
}
|
||||
|
||||
#if __x86_64__ && defined(HAVE_BETTER_YASM_ELF64)
|
||||
int ZlibCompressor::isal_compress(const bufferlist &in, bufferlist &out)
|
||||
int ZlibCompressor::isal_compress(const bufferlist &in, bufferlist &out, boost::optional<int32_t> &compressor_message)
|
||||
{
|
||||
int ret;
|
||||
unsigned have;
|
||||
@ -124,6 +125,7 @@ int ZlibCompressor::isal_compress(const bufferlist &in, bufferlist &out)
|
||||
/* allocate deflate state */
|
||||
isal_deflate_init(&strm);
|
||||
strm.end_of_stream = 0;
|
||||
compressor_message = ZLIB_DEFAULT_WIN_SIZE;
|
||||
|
||||
for (ceph::bufferlist::buffers_t::const_iterator i = in.buffers().begin();
|
||||
i != in.buffers().end();) {
|
||||
@ -166,27 +168,27 @@ int ZlibCompressor::isal_compress(const bufferlist &in, bufferlist &out)
|
||||
}
|
||||
#endif
|
||||
|
||||
int ZlibCompressor::compress(const bufferlist &in, bufferlist &out)
|
||||
int ZlibCompressor::compress(const bufferlist &in, bufferlist &out, boost::optional<int32_t> &compressor_message)
|
||||
{
|
||||
#ifdef HAVE_QATZIP
|
||||
if (qat_enabled)
|
||||
return qat_accel.compress(in, out);
|
||||
return qat_accel.compress(in, out, compressor_message);
|
||||
#endif
|
||||
#if __x86_64__ && defined(HAVE_BETTER_YASM_ELF64)
|
||||
if (isal_enabled)
|
||||
return isal_compress(in, out);
|
||||
return isal_compress(in, out, compressor_message);
|
||||
else
|
||||
return zlib_compress(in, out);
|
||||
return zlib_compress(in, out, compressor_message);
|
||||
#else
|
||||
return zlib_compress(in, out);
|
||||
return zlib_compress(in, out, compressor_message);
|
||||
#endif
|
||||
}
|
||||
|
||||
int ZlibCompressor::decompress(bufferlist::const_iterator &p, size_t compressed_size, bufferlist &out)
|
||||
int ZlibCompressor::decompress(bufferlist::const_iterator &p, size_t compressed_size, bufferlist &out, boost::optional<int32_t> compressor_message)
|
||||
{
|
||||
#ifdef HAVE_QATZIP
|
||||
if (qat_enabled)
|
||||
return qat_accel.decompress(p, compressed_size, out);
|
||||
return qat_accel.decompress(p, compressed_size, out, compressor_message);
|
||||
#endif
|
||||
|
||||
int ret;
|
||||
@ -203,7 +205,9 @@ int ZlibCompressor::decompress(bufferlist::const_iterator &p, size_t compressed_
|
||||
strm.next_in = Z_NULL;
|
||||
|
||||
// choose the variation of compressor
|
||||
ret = inflateInit2(&strm, ZLIB_DEFAULT_WIN_SIZE);
|
||||
if (!compressor_message)
|
||||
compressor_message = ZLIB_DEFAULT_WIN_SIZE;
|
||||
ret = inflateInit2(&strm, *compressor_message);
|
||||
if (ret != Z_OK) {
|
||||
dout(1) << "Decompression init error: init return "
|
||||
<< ret << " instead of Z_OK" << dendl;
|
||||
@ -240,12 +244,12 @@ int ZlibCompressor::decompress(bufferlist::const_iterator &p, size_t compressed_
|
||||
return 0;
|
||||
}
|
||||
|
||||
int ZlibCompressor::decompress(const bufferlist &in, bufferlist &out)
|
||||
int ZlibCompressor::decompress(const bufferlist &in, bufferlist &out, boost::optional<int32_t> compressor_message)
|
||||
{
|
||||
#ifdef HAVE_QATZIP
|
||||
if (qat_enabled)
|
||||
return qat_accel.decompress(in, out);
|
||||
return qat_accel.decompress(in, out, compressor_message);
|
||||
#endif
|
||||
auto i = std::cbegin(in);
|
||||
return decompress(i, in.length(), out);
|
||||
return decompress(i, in.length(), out, compressor_message);
|
||||
}
|
||||
|
@ -34,12 +34,12 @@ public:
|
||||
#endif
|
||||
}
|
||||
|
||||
int compress(const ceph::buffer::list &in, ceph::buffer::list &out) override;
|
||||
int decompress(const ceph::buffer::list &in, ceph::buffer::list &out) override;
|
||||
int decompress(ceph::buffer::list::const_iterator &p, size_t compressed_len, ceph::buffer::list &out) override;
|
||||
int compress(const ceph::buffer::list &in, ceph::buffer::list &out, boost::optional<int32_t> &compressor_message) override;
|
||||
int decompress(const ceph::buffer::list &in, ceph::buffer::list &out, boost::optional<int32_t> compressor_message) override;
|
||||
int decompress(ceph::buffer::list::const_iterator &p, size_t compressed_len, ceph::buffer::list &out, boost::optional<int32_t> compressor_message) override;
|
||||
private:
|
||||
int zlib_compress(const ceph::buffer::list &in, ceph::buffer::list &out);
|
||||
int isal_compress(const ceph::buffer::list &in, ceph::buffer::list &out);
|
||||
int zlib_compress(const ceph::buffer::list &in, ceph::buffer::list &out, boost::optional<int32_t> &compressor_message);
|
||||
int isal_compress(const ceph::buffer::list &in, ceph::buffer::list &out, boost::optional<int32_t> &compressor_message);
|
||||
};
|
||||
|
||||
|
||||
|
@ -26,7 +26,7 @@ class ZstdCompressor : public Compressor {
|
||||
public:
|
||||
ZstdCompressor(CephContext *cct) : Compressor(COMP_ALG_ZSTD, "zstd"), cct(cct) {}
|
||||
|
||||
int compress(const ceph::buffer::list &src, ceph::buffer::list &dst) override {
|
||||
int compress(const ceph::buffer::list &src, ceph::buffer::list &dst, boost::optional<int32_t> &compressor_message) override {
|
||||
ZSTD_CStream *s = ZSTD_createCStream();
|
||||
ZSTD_initCStream_srcSize(s, cct->_conf->compressor_zstd_level, src.length());
|
||||
auto p = src.begin();
|
||||
@ -61,14 +61,15 @@ class ZstdCompressor : public Compressor {
|
||||
return 0;
|
||||
}
|
||||
|
||||
int decompress(const ceph::buffer::list &src, ceph::buffer::list &dst) override {
|
||||
int decompress(const ceph::buffer::list &src, ceph::buffer::list &dst, boost::optional<int32_t> compressor_message) override {
|
||||
auto i = std::cbegin(src);
|
||||
return decompress(i, src.length(), dst);
|
||||
return decompress(i, src.length(), dst, compressor_message);
|
||||
}
|
||||
|
||||
int decompress(ceph::buffer::list::const_iterator &p,
|
||||
size_t compressed_len,
|
||||
ceph::buffer::list &dst) override {
|
||||
ceph::buffer::list &dst,
|
||||
boost::optional<int32_t> compressor_message) override {
|
||||
if (compressed_len < 4) {
|
||||
return -1;
|
||||
}
|
||||
|
@ -9990,7 +9990,7 @@ int BlueStore::_decompress(bufferlist& source, bufferlist* result)
|
||||
_set_compression_alert(false, alg_name);
|
||||
r = -EIO;
|
||||
} else {
|
||||
r = cp->decompress(i, chdr.length, *result);
|
||||
r = cp->decompress(i, chdr.length, *result, chdr.compressor_message);
|
||||
if (r < 0) {
|
||||
derr << __func__ << " decompression failed with exit code " << r << dendl;
|
||||
r = -EIO;
|
||||
@ -13672,7 +13672,8 @@ int BlueStore::_do_alloc_write(
|
||||
|
||||
// FIXME: memory alignment here is bad
|
||||
bufferlist t;
|
||||
int r = c->compress(wi.bl, t);
|
||||
boost::optional<int32_t> compressor_message;
|
||||
int r = c->compress(wi.bl, t, compressor_message);
|
||||
uint64_t want_len_raw = wi.blob_length * crr;
|
||||
uint64_t want_len = p2roundup(want_len_raw, min_alloc_size);
|
||||
bool rejected = false;
|
||||
@ -13684,6 +13685,7 @@ int BlueStore::_do_alloc_write(
|
||||
bluestore_compression_header_t chdr;
|
||||
chdr.type = c->get_type();
|
||||
chdr.length = t.length();
|
||||
chdr.compressor_message = compressor_message;
|
||||
encode(chdr, wi.compressed_bl);
|
||||
wi.compressed_bl.claim_append(t);
|
||||
|
||||
|
@ -1163,6 +1163,9 @@ void bluestore_compression_header_t::dump(Formatter *f) const
|
||||
{
|
||||
f->dump_unsigned("type", type);
|
||||
f->dump_unsigned("length", length);
|
||||
if (compressor_message) {
|
||||
f->dump_int("compressor_message", *compressor_message);
|
||||
}
|
||||
}
|
||||
|
||||
void bluestore_compression_header_t::generate_test_instances(
|
||||
|
@ -1074,15 +1074,19 @@ WRITE_CLASS_DENC(bluestore_deferred_transaction_t)
|
||||
struct bluestore_compression_header_t {
|
||||
uint8_t type = Compressor::COMP_ALG_NONE;
|
||||
uint32_t length = 0;
|
||||
boost::optional<int32_t> compressor_message;
|
||||
|
||||
bluestore_compression_header_t() {}
|
||||
bluestore_compression_header_t(uint8_t _type)
|
||||
: type(_type) {}
|
||||
|
||||
DENC(bluestore_compression_header_t, v, p) {
|
||||
DENC_START(1, 1, p);
|
||||
DENC_START(2, 1, p);
|
||||
denc(v.type, p);
|
||||
denc(v.length, p);
|
||||
if (struct_v >= 2) {
|
||||
denc(v.compressor_message, p);
|
||||
}
|
||||
DENC_FINISH(p);
|
||||
}
|
||||
void dump(ceph::Formatter *f) const;
|
||||
|
@ -40,7 +40,7 @@ int RGWPutObj_Compress::process(bufferlist&& in, uint64_t logical_offset)
|
||||
if ((logical_offset > 0 && compressed) || // if previous part was compressed
|
||||
(logical_offset == 0)) { // or it's the first part
|
||||
ldout(cct, 10) << "Compression for rgw is enabled, compress part " << in.length() << dendl;
|
||||
int cr = compressor->compress(in, out);
|
||||
int cr = compressor->compress(in, out, compressor_message);
|
||||
if (cr < 0) {
|
||||
if (logical_offset > 0) {
|
||||
lderr(cct) << "Compression failed with exit code " << cr
|
||||
@ -128,7 +128,7 @@ int RGWGetObj_Decompress::handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len
|
||||
iter_in_bl.seek(ofs_in_bl);
|
||||
}
|
||||
iter_in_bl.copy(first_block->len, tmp);
|
||||
int cr = compressor->decompress(tmp, out_bl);
|
||||
int cr = compressor->decompress(tmp, out_bl, cs_info->compressor_message);
|
||||
if (cr < 0) {
|
||||
lderr(cct) << "Decompression failed with exit code " << cr << dendl;
|
||||
return cr;
|
||||
|
@ -40,6 +40,7 @@ class RGWPutObj_Compress : public rgw::putobj::Pipe
|
||||
CephContext* cct;
|
||||
bool compressed{false};
|
||||
CompressorRef compressor;
|
||||
boost::optional<int32_t> compressor_message;
|
||||
std::vector<compression_block> blocks;
|
||||
public:
|
||||
RGWPutObj_Compress(CephContext* cct_, CompressorRef compressor,
|
||||
@ -50,6 +51,7 @@ public:
|
||||
|
||||
bool is_compressed() { return compressed; }
|
||||
vector<compression_block>& get_compression_blocks() { return blocks; }
|
||||
boost::optional<int32_t> get_compressor_message() { return compressor_message; }
|
||||
|
||||
}; /* RGWPutObj_Compress */
|
||||
|
||||
|
@ -44,25 +44,31 @@ WRITE_CLASS_ENCODER(compression_block)
|
||||
struct RGWCompressionInfo {
|
||||
string compression_type;
|
||||
uint64_t orig_size;
|
||||
boost::optional<int32_t> compressor_message;
|
||||
vector<compression_block> blocks;
|
||||
|
||||
RGWCompressionInfo() : compression_type("none"), orig_size(0) {}
|
||||
RGWCompressionInfo(const RGWCompressionInfo& cs_info) : compression_type(cs_info.compression_type),
|
||||
orig_size(cs_info.orig_size),
|
||||
compressor_message(cs_info.compressor_message),
|
||||
blocks(cs_info.blocks) {}
|
||||
|
||||
void encode(bufferlist& bl) const {
|
||||
ENCODE_START(1, 1, bl);
|
||||
ENCODE_START(2, 1, bl);
|
||||
encode(compression_type, bl);
|
||||
encode(orig_size, bl);
|
||||
encode(compressor_message, bl);
|
||||
encode(blocks, bl);
|
||||
ENCODE_FINISH(bl);
|
||||
}
|
||||
|
||||
void decode(bufferlist::const_iterator& bl) {
|
||||
DECODE_START(1, bl);
|
||||
DECODE_START(2, bl);
|
||||
decode(compression_type, bl);
|
||||
decode(orig_size, bl);
|
||||
if (struct_v >= 2) {
|
||||
decode(compressor_message, bl);
|
||||
}
|
||||
decode(blocks, bl);
|
||||
DECODE_FINISH(bl);
|
||||
}
|
||||
|
@ -2051,6 +2051,9 @@ void RGWCompressionInfo::dump(Formatter *f) const
|
||||
{
|
||||
f->dump_string("compression_type", compression_type);
|
||||
f->dump_unsigned("orig_size", orig_size);
|
||||
if (compressor_message) {
|
||||
f->dump_int("compressor_message", *compressor_message);
|
||||
}
|
||||
::encode_json("blocks", blocks, f);
|
||||
}
|
||||
|
||||
|
@ -4089,6 +4089,7 @@ void RGWPutObj::execute()
|
||||
RGWCompressionInfo cs_info;
|
||||
cs_info.compression_type = plugin->get_type_name();
|
||||
cs_info.orig_size = s->obj_size;
|
||||
cs_info.compressor_message = compressor->get_compressor_message();
|
||||
cs_info.blocks = move(compressor->get_compression_blocks());
|
||||
encode(cs_info, tmp);
|
||||
attrs[RGW_ATTR_COMPRESSION] = tmp;
|
||||
@ -4394,6 +4395,7 @@ void RGWPostObj::execute()
|
||||
RGWCompressionInfo cs_info;
|
||||
cs_info.compression_type = plugin->get_type_name();
|
||||
cs_info.orig_size = s->obj_size;
|
||||
cs_info.compressor_message = compressor->get_compressor_message();
|
||||
cs_info.blocks = move(compressor->get_compression_blocks());
|
||||
encode(cs_info, tmp);
|
||||
emplace_attr(RGW_ATTR_COMPRESSION, std::move(tmp));
|
||||
@ -7294,6 +7296,7 @@ int RGWBulkUploadOp::handle_file(const boost::string_ref path,
|
||||
RGWCompressionInfo cs_info;
|
||||
cs_info.compression_type = plugin->get_type_name();
|
||||
cs_info.orig_size = s->obj_size;
|
||||
cs_info.compressor_message = compressor->get_compressor_message();
|
||||
cs_info.blocks = std::move(compressor->get_compression_blocks());
|
||||
encode(cs_info, tmp);
|
||||
attrs.emplace(RGW_ATTR_COMPRESSION, std::move(tmp));
|
||||
|
@ -3848,6 +3848,7 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
|
||||
RGWCompressionInfo cs_info;
|
||||
cs_info.compression_type = plugin->get_type_name();
|
||||
cs_info.orig_size = cb.get_data_len();
|
||||
cs_info.compressor_message = compressor->get_compressor_message();
|
||||
cs_info.blocks = move(compressor->get_compression_blocks());
|
||||
encode(cs_info, tmp);
|
||||
cb.get_attrs()[RGW_ATTR_COMPRESSION] = tmp;
|
||||
|
@ -32,18 +32,18 @@ public:
|
||||
CompressorExample() : Compressor(COMP_ALG_NONE, "example") {}
|
||||
~CompressorExample() override {}
|
||||
|
||||
int compress(const bufferlist &in, bufferlist &out) override
|
||||
int compress(const bufferlist &in, bufferlist &out, boost::optional<int32_t> &compressor_message) override
|
||||
{
|
||||
out = in;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int decompress(const bufferlist &in, bufferlist &out) override
|
||||
int decompress(const bufferlist &in, bufferlist &out, boost::optional<int32_t> compressor_message) override
|
||||
{
|
||||
out = in;
|
||||
return 0;
|
||||
}
|
||||
int decompress(bufferlist::const_iterator &p, size_t compressed_len, bufferlist &out) override
|
||||
int decompress(bufferlist::const_iterator &p, size_t compressed_len, bufferlist &out, boost::optional<int32_t> compressor_message) override
|
||||
{
|
||||
p.copy(std::min<size_t>(p.get_remaining(), compressed_len), out);
|
||||
return 0;
|
||||
|
@ -76,10 +76,11 @@ TEST_P(CompressorTest, small_round_trip)
|
||||
bufferlist orig;
|
||||
orig.append("This is a short string. There are many strings like it but this one is mine.");
|
||||
bufferlist compressed;
|
||||
int r = compressor->compress(orig, compressed);
|
||||
boost::optional<int32_t> compressor_message;
|
||||
int r = compressor->compress(orig, compressed, compressor_message);
|
||||
ASSERT_EQ(0, r);
|
||||
bufferlist decompressed;
|
||||
r = compressor->decompress(compressed, decompressed);
|
||||
r = compressor->decompress(compressed, decompressed, compressor_message);
|
||||
ASSERT_EQ(0, r);
|
||||
ASSERT_EQ(decompressed.length(), orig.length());
|
||||
ASSERT_TRUE(decompressed.contents_equal(orig));
|
||||
@ -95,10 +96,11 @@ TEST_P(CompressorTest, big_round_trip_repeated)
|
||||
orig.append("This is a short string. There are many strings like it but this one is mine.");
|
||||
}
|
||||
bufferlist compressed;
|
||||
int r = compressor->compress(orig, compressed);
|
||||
boost::optional<int32_t> compressor_message;
|
||||
int r = compressor->compress(orig, compressed, compressor_message);
|
||||
ASSERT_EQ(0, r);
|
||||
bufferlist decompressed;
|
||||
r = compressor->decompress(compressed, decompressed);
|
||||
r = compressor->decompress(compressed, decompressed, compressor_message);
|
||||
ASSERT_EQ(0, r);
|
||||
ASSERT_EQ(decompressed.length(), orig.length());
|
||||
ASSERT_TRUE(decompressed.contents_equal(orig));
|
||||
@ -124,10 +126,11 @@ TEST_P(CompressorTest, big_round_trip_randomish)
|
||||
orig.append(bp);
|
||||
}
|
||||
bufferlist compressed;
|
||||
int r = compressor->compress(orig, compressed);
|
||||
boost::optional<int32_t> compressor_message;
|
||||
int r = compressor->compress(orig, compressed, compressor_message);
|
||||
ASSERT_EQ(0, r);
|
||||
bufferlist decompressed;
|
||||
r = compressor->decompress(compressed, decompressed);
|
||||
r = compressor->decompress(compressed, decompressed, compressor_message);
|
||||
ASSERT_EQ(0, r);
|
||||
ASSERT_EQ(decompressed.length(), orig.length());
|
||||
ASSERT_TRUE(decompressed.contents_equal(orig));
|
||||
@ -178,10 +181,11 @@ TEST_P(CompressorTest, round_trip_osdmap)
|
||||
chunk.substr_of(fbl, j*size, l);
|
||||
//fbl.rebuild();
|
||||
bufferlist compressed;
|
||||
int r = compressor->compress(chunk, compressed);
|
||||
boost::optional<int32_t> compressor_message;
|
||||
int r = compressor->compress(chunk, compressed, compressor_message);
|
||||
ASSERT_EQ(0, r);
|
||||
bufferlist decompressed;
|
||||
r = compressor->decompress(compressed, decompressed);
|
||||
r = compressor->decompress(compressed, decompressed, compressor_message);
|
||||
ASSERT_EQ(0, r);
|
||||
ASSERT_EQ(decompressed.length(), chunk.length());
|
||||
if (!decompressed.contents_equal(chunk)) {
|
||||
@ -205,9 +209,10 @@ TEST_P(CompressorTest, compress_decompress)
|
||||
bufferlist after;
|
||||
bufferlist exp;
|
||||
in.append(test, len);
|
||||
res = compressor->compress(in, out);
|
||||
boost::optional<int32_t> compressor_message;
|
||||
res = compressor->compress(in, out, compressor_message);
|
||||
EXPECT_EQ(res, 0);
|
||||
res = compressor->decompress(out, after);
|
||||
res = compressor->decompress(out, after, compressor_message);
|
||||
EXPECT_EQ(res, 0);
|
||||
exp.append(test);
|
||||
EXPECT_TRUE(exp.contents_equal(after));
|
||||
@ -215,7 +220,7 @@ TEST_P(CompressorTest, compress_decompress)
|
||||
size_t compressed_len = out.length();
|
||||
out.append_zero(12);
|
||||
auto it = out.cbegin();
|
||||
res = compressor->decompress(it, compressed_len, after);
|
||||
res = compressor->decompress(it, compressed_len, after, compressor_message);
|
||||
EXPECT_EQ(res, 0);
|
||||
EXPECT_TRUE(exp.contents_equal(after));
|
||||
|
||||
@ -228,7 +233,7 @@ TEST_P(CompressorTest, compress_decompress)
|
||||
out.clear();
|
||||
in.append(data);
|
||||
exp = in;
|
||||
res = compressor->compress(in, out);
|
||||
res = compressor->compress(in, out, compressor_message);
|
||||
EXPECT_EQ(res, 0);
|
||||
compressed_len = out.length();
|
||||
out.append_zero(0x10000 - out.length());
|
||||
@ -241,7 +246,7 @@ TEST_P(CompressorTest, compress_decompress)
|
||||
out.swap(prefix);
|
||||
it = out.cbegin();
|
||||
it += prefix_len;
|
||||
res = compressor->decompress(it, compressed_len, after);
|
||||
res = compressor->decompress(it, compressed_len, after, compressor_message);
|
||||
EXPECT_EQ(res, 0);
|
||||
EXPECT_TRUE(exp.contents_equal(after));
|
||||
}
|
||||
@ -254,7 +259,8 @@ TEST_P(CompressorTest, sharded_input_decompress)
|
||||
int len = test.size();
|
||||
bufferlist in, out;
|
||||
in.append(test.c_str(), len);
|
||||
int res = compressor->compress(in, out);
|
||||
boost::optional<int32_t> compressor_message;
|
||||
int res = compressor->compress(in, out, compressor_message);
|
||||
EXPECT_EQ(res, 0);
|
||||
EXPECT_GT(out.length(), small_prefix_size);
|
||||
|
||||
@ -272,7 +278,7 @@ TEST_P(CompressorTest, sharded_input_decompress)
|
||||
}
|
||||
|
||||
bufferlist after;
|
||||
res = compressor->decompress(out2, after);
|
||||
res = compressor->decompress(out2, after, compressor_message);
|
||||
EXPECT_EQ(res, 0);
|
||||
}
|
||||
|
||||
@ -286,7 +292,8 @@ void test_compress(CompressorRef compressor, size_t size)
|
||||
in.append(data, size);
|
||||
for (size_t t = 0; t < 10000; t++) {
|
||||
bufferlist out;
|
||||
int res = compressor->compress(in, out);
|
||||
boost::optional<int32_t> compressor_message;
|
||||
int res = compressor->compress(in, out, compressor_message);
|
||||
EXPECT_EQ(res, 0);
|
||||
}
|
||||
free(data);
|
||||
@ -300,11 +307,12 @@ void test_decompress(CompressorRef compressor, size_t size)
|
||||
}
|
||||
bufferlist in, out;
|
||||
in.append(data, size);
|
||||
int res = compressor->compress(in, out);
|
||||
boost::optional<int32_t> compressor_message;
|
||||
int res = compressor->compress(in, out, compressor_message);
|
||||
EXPECT_EQ(res, 0);
|
||||
for (size_t t = 0; t < 10000; t++) {
|
||||
bufferlist out_dec;
|
||||
int res = compressor->decompress(out, out_dec);
|
||||
int res = compressor->decompress(out, out_dec, compressor_message);
|
||||
EXPECT_EQ(res, 0);
|
||||
}
|
||||
free(data);
|
||||
@ -401,10 +409,11 @@ TEST(ZlibCompressor, zlib_isal_compatibility)
|
||||
bufferlist in, out;
|
||||
in.append(test, len);
|
||||
// isal -> zlib
|
||||
int res = isal->compress(in, out);
|
||||
boost::optional<int32_t> compressor_message;
|
||||
int res = isal->compress(in, out, compressor_message);
|
||||
EXPECT_EQ(res, 0);
|
||||
bufferlist after;
|
||||
res = zlib->decompress(out, after);
|
||||
res = zlib->decompress(out, after, compressor_message);
|
||||
EXPECT_EQ(res, 0);
|
||||
bufferlist exp;
|
||||
exp.append(static_cast<char*>(test));
|
||||
@ -413,9 +422,9 @@ TEST(ZlibCompressor, zlib_isal_compatibility)
|
||||
out.clear();
|
||||
exp.clear();
|
||||
// zlib -> isal
|
||||
res = zlib->compress(in, out);
|
||||
res = zlib->compress(in, out, compressor_message);
|
||||
EXPECT_EQ(res, 0);
|
||||
res = isal->decompress(out, after);
|
||||
res = isal->decompress(out, after, compressor_message);
|
||||
EXPECT_EQ(res, 0);
|
||||
exp.append(static_cast<char*>(test));
|
||||
EXPECT_TRUE(exp.contents_equal(after));
|
||||
@ -469,10 +478,11 @@ TEST(ZlibCompressor, isal_compress_zlib_decompress_random)
|
||||
bufferlist in, out;
|
||||
in.append(test, size);
|
||||
|
||||
int res = isal->compress(in, out);
|
||||
boost::optional<int32_t> compressor_message;
|
||||
int res = isal->compress(in, out, compressor_message);
|
||||
EXPECT_EQ(res, 0);
|
||||
bufferlist after;
|
||||
res = zlib->decompress(out, after);
|
||||
res = zlib->decompress(out, after, compressor_message);
|
||||
EXPECT_EQ(res, 0);
|
||||
bufferlist exp;
|
||||
exp.append(test, size);
|
||||
@ -508,10 +518,11 @@ TEST(ZlibCompressor, isal_compress_zlib_decompress_walk)
|
||||
bufferlist in, out;
|
||||
in.append(test, size);
|
||||
|
||||
int res = isal->compress(in, out);
|
||||
boost::optional<int32_t> compressor_message;
|
||||
int res = isal->compress(in, out, compressor_message);
|
||||
EXPECT_EQ(res, 0);
|
||||
bufferlist after;
|
||||
res = zlib->decompress(out, after);
|
||||
res = zlib->decompress(out, after, compressor_message);
|
||||
EXPECT_EQ(res, 0);
|
||||
bufferlist exp;
|
||||
exp.append(test, size);
|
||||
@ -546,10 +557,11 @@ TEST(QAT, enc_qat_dec_noqat) {
|
||||
bufferlist in, out;
|
||||
in.append(test, size);
|
||||
|
||||
int res = q->compress(in, out);
|
||||
boost::optional<int32_t> compressor_message;
|
||||
int res = q->compress(in, out, compressor_message);
|
||||
EXPECT_EQ(res, 0);
|
||||
bufferlist after;
|
||||
res = noq->decompress(out, after);
|
||||
res = noq->decompress(out, after, compressor_message);
|
||||
EXPECT_EQ(res, 0);
|
||||
bufferlist exp;
|
||||
exp.append(test, size);
|
||||
@ -582,10 +594,11 @@ TEST(QAT, enc_noqat_dec_qat) {
|
||||
bufferlist in, out;
|
||||
in.append(test, size);
|
||||
|
||||
int res = noq->compress(in, out);
|
||||
boost::optional<int32_t> compressor_message;
|
||||
int res = noq->compress(in, out, compressor_message);
|
||||
EXPECT_EQ(res, 0);
|
||||
bufferlist after;
|
||||
res = q->decompress(out, after);
|
||||
res = q->decompress(out, after, compressor_message);
|
||||
EXPECT_EQ(res, 0);
|
||||
bufferlist exp;
|
||||
exp.append(test, size);
|
||||
|
@ -129,6 +129,7 @@ TEST(Compress, LimitedChunkSize)
|
||||
RGWCompressionInfo cs_info;
|
||||
cs_info.compression_type = plugin->get_type_name();
|
||||
cs_info.orig_size = s;
|
||||
cs_info.compressor_message = compressor.get_compressor_message();
|
||||
cs_info.blocks = move(compressor.get_compression_blocks());
|
||||
|
||||
ut_get_sink_size d_sink;
|
||||
@ -167,6 +168,7 @@ TEST(Compress, BillionZeros)
|
||||
RGWCompressionInfo cs_info;
|
||||
cs_info.compression_type = plugin->get_type_name();
|
||||
cs_info.orig_size = size*1000;
|
||||
cs_info.compressor_message = compressor.get_compressor_message();
|
||||
cs_info.blocks = move(compressor.get_compression_blocks());
|
||||
|
||||
ut_get_sink d_sink;
|
||||
|
Loading…
Reference in New Issue
Block a user