compressor: Refactor to allow bufferlist::iterator as an input

Signed-off-by: Igor Fedotov <ifedotov@mirantis.com>
This commit is contained in:
Igor Fedotov 2016-05-13 18:16:47 +03:00 committed by Sage Weil
parent 9cfb096300
commit a305d6b5a3
8 changed files with 45 additions and 30 deletions

View File

@ -27,6 +27,7 @@ class Compressor {
virtual ~Compressor() {}
virtual int compress(const bufferlist &in, bufferlist &out) = 0;
virtual int decompress(const bufferlist &in, bufferlist &out) = 0;
virtual int decompress(bufferlist::iterator &p, bufferlist &out) = 0; //that's a bit weird but we need non-const iterator to be in alignment with decode methods
static CompressorRef create(CephContext *cct, const string &type);
};

View File

@ -3,6 +3,7 @@ noinst_HEADERS += \
compressor/snappy/SnappyCompressor.h
snappy_sources = \
common/buffer.cc \
compressor/Compressor.cc \
compressor/snappy/CompressionPluginSnappy.cc

View File

@ -20,41 +20,37 @@
#include "include/buffer.h"
#include "compressor/Compressor.h"
class BufferlistSource : public snappy::Source {
list<bufferptr>::const_iterator pb;
size_t pb_off;
size_t left;
class CEPH_BUFFER_API BufferlistSource : public snappy::Source {
bufferlist::iterator pb;
public:
explicit BufferlistSource(const bufferlist &data): pb(data.buffers().begin()), pb_off(0), left(data.length()) {}
explicit BufferlistSource(bufferlist::iterator _pb): pb(_pb) {}
virtual ~BufferlistSource() {}
virtual size_t Available() const { return left; }
virtual size_t Available() const { return pb.get_remaining(); }
virtual const char* Peek(size_t* len) {
if (left) {
*len = pb->length() - pb_off;
return pb->c_str() + pb_off;
} else {
*len = 0;
return NULL;
const char* data = NULL;
*len = 0;
if(pb.get_remaining()) {
auto ptmp = pb;
*len = ptmp.get_ptr_and_advance(pb.get_remaining(), &data);
}
return data;
}
virtual void Skip(size_t n) {
if (n + pb_off == pb->length()) {
++pb;
pb_off = 0;
} else {
pb_off += n;
}
left -= n;
pb.advance(n);
}
bufferlist::iterator get_pos() const { return pb; }
};
class SnappyCompressor : public Compressor {
public:
virtual ~SnappyCompressor() {}
virtual const char* get_method_name() { return "snappy"; }
virtual int compress(const bufferlist &src, bufferlist &dst) {
BufferlistSource source(src);
BufferlistSource source(const_cast<bufferlist&>(src).begin());
bufferptr ptr(snappy::MaxCompressedLength(src.length()));
snappy::UncheckedByteArraySink sink(ptr.c_str());
snappy::Compress(&source, &sink);
@ -62,15 +58,21 @@ class SnappyCompressor : public Compressor {
return 0;
}
virtual int decompress(const bufferlist &src, bufferlist &dst) {
bufferlist::iterator i = const_cast<bufferlist&>(src).begin();
return decompress(i, dst);
}
virtual int decompress(bufferlist::iterator &p, bufferlist &dst) {
size_t res_len = 0;
// Trick, decompress only need first 32bits buffer
bufferlist::const_iterator ptmp = p;
bufferlist tmp;
tmp.substr_of( src, 0, 4 );
ptmp.copy(4, tmp);
if (!snappy::GetUncompressedLength(tmp.c_str(), tmp.length(), &res_len))
return -1;
BufferlistSource source(src);
BufferlistSource source(p);
bufferptr ptr(res_len);
if (snappy::RawUncompress(&source, ptr.c_str())) {
p = source.get_pos();
dst.append(ptr);
return 0;
}

View File

@ -99,12 +99,12 @@ int CompressionZlib::compress(const bufferlist &in, bufferlist &out)
return 0;
}
int CompressionZlib::decompress(const bufferlist &in, bufferlist &out)
int CompressionZlib::decompress(bufferlist::iterator &p, bufferlist &out)
{
int ret;
unsigned have;
z_stream strm;
unsigned char* c_in;
const char* c_in;
/* allocate inflate state */
strm.zalloc = Z_NULL;
@ -121,14 +121,12 @@ int CompressionZlib::decompress(const bufferlist &in, bufferlist &out)
unsigned char c_out[max_len];
for (std::list<buffer::ptr>::const_iterator i = in.buffers().begin();
i != in.buffers().end(); ++i) {
while(!p.end()) {
c_in = (unsigned char*) (*i).c_str();
long unsigned int len = (*i).length();
long unsigned int len = p.get_ptr_and_advance(p.get_remaining(), &c_in);
strm.avail_in = len;
strm.next_in = c_in;
strm.next_in = (unsigned char*)c_in;
do {
strm.avail_out = max_len;
@ -150,3 +148,9 @@ int CompressionZlib::decompress(const bufferlist &in, bufferlist &out)
(void)inflateEnd(&strm);
return 0;
}
int CompressionZlib::decompress(const bufferlist &in, bufferlist &out)
{
bufferlist::iterator i = const_cast<bufferlist&>(in).begin();
return decompress(i, out);
}

View File

@ -37,6 +37,7 @@ public:
virtual int compress(const bufferlist &in, bufferlist &out);
virtual int decompress(const bufferlist &in, bufferlist &out);
virtual int decompress(bufferlist::iterator &p, bufferlist &out);
virtual const char* get_method_name();
};

View File

@ -3,6 +3,7 @@ noinst_HEADERS += \
compressor/zlib/CompressionZlib.h
zlib_sources = \
common/buffer.cc \
compressor/Compressor.cc \
compressor/zlib/CompressionPluginZlib.cc \
compressor/zlib/CompressionZlib.cc

View File

@ -3,6 +3,7 @@ if WITH_OSD
libceph_example_la_SOURCES = \
common/buffer.cc \
compressor/Compressor.cc \
test/compressor/compressor_plugin_example.cc
noinst_HEADERS += test/compressor/compressor_example.h

View File

@ -42,7 +42,11 @@ public:
out = in;
return 0;
}
virtual int decompress(bufferlist::iterator &p, bufferlist &out)
{
p.copy(p.get_remaining(), out);
return 0;
}
virtual const char* get_method_name()
{
return "example";