Compressor: add Zlib compression plugin

Signed-off-by: Kiseleva Alyona <akiselyova@mirantis.com>
This commit is contained in:
Ved-vampir 2015-11-10 18:20:23 +03:00
parent 64fbda8371
commit 7bde5c43cf
8 changed files with 297 additions and 2 deletions

View File

@ -583,6 +583,7 @@ OPTION(osd_pool_default_min_size, OPT_INT, 0) // 0 means no specific default; c
OPTION(osd_pool_default_pg_num, OPT_INT, 8) // number of PGs for new pools. Configure in global or mon section of ceph.conf
OPTION(osd_pool_default_pgp_num, OPT_INT, 8) // number of PGs for placement purposes. Should be equal to pg_num
OPTION(osd_compression_plugins, OPT_STR,
"zlib"
" snappy"
) // list of compression plugins
OPTION(osd_pool_default_erasure_code_profile,

View File

@ -3,8 +3,10 @@
set(compressorlibdir ${LIBRARY_OUTPUT_PATH}/compressor)
add_subdirectory(snappy)
add_subdirectory(zlib)
add_library(compressor_objs OBJECT Compressor.cc)
add_custom_target(compressor_plugins DEPENDS
ceph_snappy)
ceph_snappy
ceph_zlib)

View File

@ -1,6 +1,7 @@
compressorlibdir = $(pkglibdir)/compressor
compressorlib_LTLIBRARIES =
include compressor/zlib/Makefile.am
include compressor/snappy/Makefile.am
libcompressor_la_SOURCES = \

View File

@ -0,0 +1,14 @@
# zlib
set(zlib_sources
CompressionPluginZlib.cc
CompressionZlib.cc
$<TARGET_OBJECTS:compressor_objs>
)
add_library(ceph_zlib SHARED ${zlib_sources})
add_dependencies(ceph_zlib ${CMAKE_SOURCE_DIR}/src/ceph_ver.h)
target_link_libraries(ceph_zlib ${EXTRALIBS})
set_target_properties(ceph_zlib PROPERTIES VERSION 2.14.0 SOVERSION 2)
set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} -lz")
install(TARGETS ceph_zlib DESTINATION lib/compressor)

View File

@ -0,0 +1,59 @@
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2015 Mirantis, Inc.
*
* Author: Alyona Kiseleva <akiselyova@mirantis.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
*/
// -----------------------------------------------------------------------------
#include "ceph_ver.h"
#include "compressor/CompressionPlugin.h"
#include "CompressionZlib.h"
#include "common/debug.h"
#define dout_subsys ceph_subsys_mon
// -----------------------------------------------------------------------------
class CompressionPluginZlib : public CompressionPlugin {
public:
CompressionPluginZlib(CephContext *cct) : CompressionPlugin(cct)
{}
virtual int factory(CompressorRef *cs,
ostream *ss)
{
if (compressor == 0) {
CompressionZlib *interface = new CompressionZlib();
compressor = CompressorRef(interface);
}
*cs = compressor;
return 0;
}
};
// -----------------------------------------------------------------------------
const char *__ceph_plugin_version()
{
return CEPH_GIT_NICE_VER;
}
// -----------------------------------------------------------------------------
int __ceph_plugin_init(CephContext *cct,
const std::string& type,
const std::string& name)
{
PluginRegistry *instance = cct->get_plugin_registry();
return instance->add(type, name, new CompressionPluginZlib(cct));
}

View File

@ -0,0 +1,152 @@
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2015 Mirantis, Inc.
*
* Author: Alyona Kiseleva <akiselyova@mirantis.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
*/
// -----------------------------------------------------------------------------
#include "common/debug.h"
#include "CompressionZlib.h"
#include "osd/osd_types.h"
// -----------------------------------------------------------------------------
#include <zlib.h>
// -----------------------------------------------------------------------------
#define dout_subsys ceph_subsys_compressor
#undef dout_prefix
#define dout_prefix _prefix(_dout)
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
static ostream&
_prefix(std::ostream* _dout)
{
return *_dout << "CompressionZlib: ";
}
// -----------------------------------------------------------------------------
const long unsigned int max_len = 2048;
const char* CompressionZlib::get_method_name()
{
return "zlib";
}
int CompressionZlib::compress(const bufferlist &in, bufferlist &out)
{
int ret, flush;
unsigned have;
z_stream strm;
unsigned char* c_in;
int level = 5;
/* allocate deflate state */
strm.zalloc = Z_NULL;
strm.zfree = Z_NULL;
strm.opaque = Z_NULL;
ret = deflateInit(&strm, level);
if (ret != Z_OK) {
dout(1) << "Compression init error: init return "
<< ret << " instead of Z_OK" << dendl;
return -1;
}
unsigned char c_out [max_len];
for (std::list<buffer::ptr>::const_iterator i = in.buffers().begin();
i != in.buffers().end();) {
c_in = (unsigned char*) (*i).c_str();
long unsigned int len = (*i).length();
++i;
strm.avail_in = len;
flush = i != in.buffers().end() ? Z_NO_FLUSH : Z_FINISH;
strm.next_in = c_in;
do {
strm.avail_out = max_len;
strm.next_out = c_out;
ret = deflate(&strm, flush); /* no bad return value */
if (ret == Z_STREAM_ERROR) {
dout(1) << "Compression error: compress return Z_STREAM_ERROR("
<< ret << ")" << dendl;
deflateEnd(&strm);
return -1;
}
have = max_len - strm.avail_out;
out.append((char*)c_out, have);
} while (strm.avail_out == 0);
if (strm.avail_in != 0) {
dout(10) << "Compression error: unused input" << dendl;
deflateEnd(&strm);
return -1;
}
}
deflateEnd(&strm);
return 0;
}
int CompressionZlib::decompress(const bufferlist &in, bufferlist &out)
{
int ret;
unsigned have;
z_stream strm;
unsigned char* c_in;
/* allocate inflate state */
strm.zalloc = Z_NULL;
strm.zfree = Z_NULL;
strm.opaque = Z_NULL;
strm.avail_in = 0;
strm.next_in = Z_NULL;
ret = inflateInit(&strm);
if (ret != Z_OK) {
dout(1) << "Decompression init error: init return "
<< ret << " instead of Z_OK" << dendl;
return -1;
}
unsigned char c_out[max_len];
for (std::list<buffer::ptr>::const_iterator i = in.buffers().begin();
i != in.buffers().end(); ++i) {
c_in = (unsigned char*) (*i).c_str();
long unsigned int len = (*i).length();
strm.avail_in = len;
strm.next_in = c_in;
do {
strm.avail_out = max_len;
strm.next_out = c_out;
ret = inflate(&strm, Z_NO_FLUSH);
if (ret != Z_OK && ret != Z_STREAM_END) {
dout(1) << "Decompression error: decompress return "
<< ret << dendl;
inflateEnd(&strm);
return -1;
}
have = max_len - strm.avail_out;
out.append((char*)c_out, have);
} while (strm.avail_out == 0);
}
/* clean up and return */
(void)inflateEnd(&strm);
return 0;
}

View File

@ -0,0 +1,45 @@
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2015 Mirantis, Inc.
*
* Author: Alyona Kiseleva <akiselyova@mirantis.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
*/
#ifndef CEPH_COMPRESSION_ZLIB_H
#define CEPH_COMPRESSION_ZLIB_H
// -----------------------------------------------------------------------------
#include "compressor/Compressor.h"
// -----------------------------------------------------------------------------
#include <list>
// -----------------------------------------------------------------------------
class CompressionZlib : public Compressor {
const char version = '1';
public:
CompressionZlib()
{
}
virtual
~CompressionZlib()
{
}
virtual int compress(const bufferlist &in, bufferlist &out);
virtual int decompress(const bufferlist &in, bufferlist &out);
virtual const char* get_method_name();
};
#endif

View File

@ -0,0 +1,21 @@
# zlib plugin
noinst_HEADERS += \
compressor/zlib/CompressionZlib.h
zlib_sources = \
compressor/Compressor.cc \
compressor/zlib/CompressionPluginZlib.cc \
compressor/zlib/CompressionZlib.cc
compressor/zlib/CompressionPluginZlib.cc: ./ceph_ver.h
libceph_zlib_la_SOURCES = ${zlib_sources}
libceph_zlib_la_CFLAGS = ${AM_CFLAGS}
libceph_zlib_la_CXXFLAGS= ${AM_CXXFLAGS}
libceph_zlib_la_LIBADD = $(LIBCRUSH) $(PTHREAD_LIBS) $(EXTRALIBS)
libceph_zlib_la_LDFLAGS = ${AM_LDFLAGS} -lz -version-info 2:0:0
if LINUX
libceph_zlib_la_LDFLAGS += -export-symbols-regex '.*__compressor_.*'
endif
compressorlib_LTLIBRARIES += libceph_zlib.la