compressor: add QAT support

This patch adds new QATzip plugin to support QAT for compression.

QATZip is a user space library which builds on top of the Intel
QAT (QuickAssist Technology) user space library, to provide extended
accelerated compression and decompression services by offloading the
actual compression and decompression request(s) to the hardware
QAT accelerators, which are more efficient in terms of cost and power
than general purpose CPUs for those specific compute-intensive
workloads.

Based on QAT accelerators, QATZip can support several compression
algorithm, including deflate, snappy, lz4, etc..

Signed-off-by: Qiaowei Ren <qiaowei.ren@intel.com>
This commit is contained in:
Qiaowei Ren 2017-12-29 12:50:20 +08:00
parent c643ffd703
commit 9f3965aef3
18 changed files with 367 additions and 7 deletions

View File

@ -258,6 +258,12 @@ endif()
option(WITH_BLUEFS "libbluefs library" OFF)
option(WITH_QATZIP "Enable QATZIP" OFF)
if(WITH_QATZIP)
find_package(qatzip REQUIRED)
set(HAVE_QATZIP ${QATZIP_FOUND})
endif(WITH_QATZIP)
# needs mds and? XXX
option(WITH_LIBCEPHFS "libcephfs client library" ON)

View File

@ -0,0 +1,17 @@
# - Find Qatzip
# Find the qatzip compression library and includes
#
# QATZIP_INCLUDE_DIR - where to find qatzip.h, etc.
# QATZIP_LIBRARIES - List of libraries when using qatzip.
# QATZIP_FOUND - True if qatzip found.
find_path(QATZIP_INCLUDE_DIR NAMES qatzip.h)
find_library(QATZIP_LIBRARIES NAMES qatzip)
include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(qatzip DEFAULT_MSG QATZIP_LIBRARIES QATZIP_INCLUDE_DIR)
mark_as_advanced(
QATZIP_LIBRARIES
QATZIP_INCLUDE_DIR)

View File

@ -648,6 +648,9 @@ endif()
if(NOT WITH_SYSTEM_BOOST)
list(APPEND ceph_common_deps ${ZLIB_LIBRARIES})
endif()
if(HAVE_QATZIP)
list(APPEND ceph_common_deps ${QATZIP_LIBRARIES})
endif()
set_source_files_properties(${CMAKE_SOURCE_DIR}/src/ceph_ver.c
${CMAKE_SOURCE_DIR}/src/common/version.cc

View File

@ -94,6 +94,8 @@ OPTION(xio_max_send_inline, OPT_INT) // xio maximum threshold to send inline
OPTION(compressor_zlib_isal, OPT_BOOL)
OPTION(compressor_zlib_level, OPT_INT) //regular zlib compression level, not applicable to isa-l optimized version
OPTION(qat_compressor_enabled, OPT_BOOL)
OPTION(plugin_crypto_accelerator, OPT_STR)
OPTION(mempool_debug, OPT_BOOL)

View File

@ -776,6 +776,10 @@ std::vector<Option> get_global_options() {
.set_default(5)
.set_description(""),
Option("qat_compressor_enabled", Option::TYPE_BOOL, Option::LEVEL_ADVANCED)
.set_default(false)
.set_description("enable qat acceleration support for compression"),
Option("plugin_crypto_accelerator", Option::TYPE_STR, Option::LEVEL_ADVANCED)
.set_default("crypto_isal")
.set_description(""),

View File

@ -1,6 +1,9 @@
set(compressor_srcs
Compressor.cc)
if (HAVE_QATZIP)
list(APPEND compressor_srcs QatAccel.cc)
endif()
add_library(compressor_objs OBJECT ${compressor_srcs})
## compressor plugins
@ -38,6 +41,9 @@ add_custom_target(compressor_plugins DEPENDS
if(WITH_EMBEDDED)
include(MergeStaticLibraries)
add_library(cephd_compressor_base STATIC ${compressor_srcs})
if(HAVE_QATZIP)
target_link_libraries(cephd_compressor_base ${QATZIP_LIBRARIES})
endif()
set_target_properties(cephd_compressor_base PROPERTIES COMPILE_DEFINITIONS BUILDING_FOR_EMBEDDED)
set(cephd_compressor_libs
cephd_compressor_base

View File

@ -23,6 +23,9 @@
#include "include/assert.h" // boost clobbers this
#include "include/buffer.h"
#include "include/int_types.h"
#ifdef HAVE_QATZIP
#include "QatAccel.h"
#endif
class Compressor;
typedef std::shared_ptr<Compressor> CompressorRef;
@ -65,6 +68,11 @@ public:
COMP_FORCE ///< compress always
};
#ifdef HAVE_QATZIP
bool qat_enabled;
QatAccel qat_accel;
#endif
static std::string get_comp_alg_name(int a);
static boost::optional<CompressionAlgorithm> get_comp_alg_type(const std::string &s);

141
src/compressor/QatAccel.cc Normal file
View File

@ -0,0 +1,141 @@
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2018 Intel Corporation
*
* Author: Qiaowei Ren <qiaowei.ren@intel.com>
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#include "QatAccel.h"
/* Estimate data expansion after decompression */
static const unsigned int expansion_ratio[] = {5, 20, 50, 100, 200};
QatAccel::~QatAccel() {
if (NULL != session.internal) {
qzTeardownSession(&session);
qzClose(&session);
}
}
bool QatAccel::init(const std::string &alg) {
QzSessionParams_T params = {(QzHuffmanHdr_T)0,};
int rc;
rc = qzGetDefaults(&params);
if (rc != QZ_OK)
return false;
params.direction = QZ_DIR_BOTH;
if (alg == "snappy")
params.comp_algorithm = QZ_SNAPPY;
else if (alg == "zlib")
params.comp_algorithm = QZ_DEFLATE;
else if (alg == "lz4")
params.comp_algorithm = QZ_LZ4;
else
return false;
rc = qzSetDefaults(&params);
if (rc != QZ_OK)
return false;
rc = qzInit(&session, QZ_SW_BACKUP_DEFAULT);
if (rc != QZ_OK && rc != QZ_DUPLICATE && rc != QZ_NO_HW)
return false;
rc = qzSetupSession(&session, &params);
if (rc != QZ_OK && rc != QZ_DUPLICATE && rc != QZ_NO_HW ) {
qzTeardownSession(&session);
qzClose(&session);
return false;
}
return true;
}
int QatAccel::compress(const bufferlist &in, bufferlist &out) {
for (auto &i : in.buffers()) {
const unsigned char* c_in = (unsigned char*) i.c_str();
unsigned int len = i.length();
unsigned int out_len = qzMaxCompressedLength(len);
bufferptr ptr = buffer::create_page_aligned(out_len);
int rc = qzCompress(&session, c_in, &len, (unsigned char *)ptr.c_str(), &out_len, 1);
if (rc != QZ_OK)
return -1;
out.append(ptr, 0, out_len);
}
return 0;
}
int QatAccel::decompress(const bufferlist &in, bufferlist &out) {
bufferlist::iterator i = const_cast<bufferlist&>(in).begin();
return decompress(i, in.length(), out);
}
int QatAccel::decompress(bufferlist::iterator &p,
size_t compressed_len,
bufferlist &dst) {
unsigned int ratio_idx = 0;
bool read_more = false;
bool joint = false;
int rc = 0;
bufferlist tmp;
size_t remaining = MIN(p.get_remaining(), compressed_len);
while (remaining) {
if (p.end()) {
return -1;
}
bufferptr cur_ptr = p.get_current_ptr();
unsigned int len = cur_ptr.length();
if (joint) {
if (read_more)
tmp.append(cur_ptr.c_str(), len);
len = tmp.length();
}
unsigned int out_len = len * expansion_ratio[ratio_idx];
bufferptr ptr = buffer::create_page_aligned(out_len);
if (joint)
rc = qzDecompress(&session, (const unsigned char*)tmp.c_str(), &len, (unsigned char*)ptr.c_str(), &out_len);
else
rc = qzDecompress(&session, (const unsigned char*)cur_ptr.c_str(), &len, (unsigned char*)ptr.c_str(), &out_len);
if (rc == QZ_DATA_ERROR) {
if (!joint) {
tmp.append(cur_ptr.c_str(), cur_ptr.length());
p.advance(remaining);
joint = true;
}
read_more = true;
continue;
} else if (rc == QZ_BUF_ERROR) {
if (ratio_idx == std::size(expansion_ratio))
return -1;
if (joint)
read_more = false;
ratio_idx++;
continue;
} else if (rc != QZ_OK) {
return -1;
} else {
ratio_idx = 0;
joint = false;
read_more = false;
}
p.advance(remaining);
remaining -= len;
dst.append(ptr, 0, out_len);
}
return 0;
}

35
src/compressor/QatAccel.h Normal file
View File

@ -0,0 +1,35 @@
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2018 Intel Corporation
*
* Author: Qiaowei Ren <qiaowei.ren@intel.com>
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#ifndef CEPH_QATACCEL_H
#define CEPH_QATACCEL_H
#include <qatzip.h>
#include "include/buffer.h"
class QatAccel {
QzSession_T session;
public:
QatAccel() : session({0}) {}
~QatAccel();
bool init(const std::string &alg);
int compress(const bufferlist &in, bufferlist &out);
int decompress(const bufferlist &in, bufferlist &out);
int decompress(bufferlist::iterator &p, size_t compressed_len, bufferlist &dst);
};
#endif

View File

@ -30,7 +30,7 @@ public:
int factory(CompressorRef *cs, std::ostream *ss) override {
if (compressor == 0) {
LZ4Compressor *interface = new LZ4Compressor();
LZ4Compressor *interface = new LZ4Compressor(cct);
compressor = CompressorRef(interface);
}
*cs = compressor;

View File

@ -20,14 +20,26 @@
#include "compressor/Compressor.h"
#include "include/buffer.h"
#include "include/encoding.h"
#include "common/config.h"
#include "common/Tub.h"
class LZ4Compressor : public Compressor {
public:
LZ4Compressor() : Compressor(COMP_ALG_LZ4, "lz4") {}
LZ4Compressor(CephContext* cct) : Compressor(COMP_ALG_LZ4, "lz4") {
#ifdef HAVE_QATZIP
if (cct->_conf->qat_compressor_enabled && qat_accel.init("lz4"))
qat_enabled = true;
else
qat_enabled = false;
#endif
}
int compress(const bufferlist &src, bufferlist &dst) override {
#ifdef HAVE_QATZIP
if (qat_enabled)
return qat_accel.compress(src, dst);
#endif
bufferptr outptr = buffer::create_page_aligned(
LZ4_compressBound(src.length()));
LZ4_stream_t lz4_stream;
@ -60,6 +72,10 @@ class LZ4Compressor : public Compressor {
}
int decompress(const bufferlist &src, bufferlist &dst) override {
#ifdef HAVE_QATZIP
if (qat_enabled)
return qat_accel.decompress(src, dst);
#endif
bufferlist::iterator i = const_cast<bufferlist&>(src).begin();
return decompress(i, src.length(), dst);
}
@ -67,6 +83,10 @@ class LZ4Compressor : public Compressor {
int decompress(bufferlist::iterator &p,
size_t compressed_len,
bufferlist &dst) override {
#ifdef HAVE_QATZIP
if (qat_enabled)
return qat_accel.decompress(p, compressed_len, dst);
#endif
uint32_t count;
std::vector<std::pair<uint32_t, uint32_t> > compressed_pairs;
decode(count, p);

View File

@ -31,7 +31,7 @@ public:
std::ostream *ss) override
{
if (compressor == 0) {
SnappyCompressor *interface = new SnappyCompressor();
SnappyCompressor *interface = new SnappyCompressor(cct);
compressor = CompressorRef(interface);
}
*cs = compressor;

View File

@ -17,6 +17,7 @@
#include <snappy.h>
#include <snappy-sinksource.h>
#include "common/config.h"
#include "compressor/Compressor.h"
#include "include/buffer.h"
@ -56,9 +57,20 @@ class CEPH_BUFFER_API BufferlistSource : public snappy::Source {
class SnappyCompressor : public Compressor {
public:
SnappyCompressor() : Compressor(COMP_ALG_SNAPPY, "snappy") {}
SnappyCompressor(CephContext* cct) : Compressor(COMP_ALG_SNAPPY, "snappy") {
#ifdef HAVE_QATZIP
if (cct->_conf->qat_compressor_enabled && qat_accel.init("snappy"))
qat_enabled = true;
else
qat_enabled = false;
#endif
}
int compress(const bufferlist &src, bufferlist &dst) override {
#ifdef HAVE_QATZIP
if (qat_enabled)
return qat_accel.compress(src, dst);
#endif
BufferlistSource source(const_cast<bufferlist&>(src).begin(), src.length());
bufferptr ptr = buffer::create_page_aligned(
snappy::MaxCompressedLength(src.length()));
@ -69,6 +81,10 @@ class SnappyCompressor : public Compressor {
}
int decompress(const bufferlist &src, bufferlist &dst) override {
#ifdef HAVE_QATZIP
if (qat_enabled)
return qat_accel.decompress(src, dst);
#endif
bufferlist::iterator i = const_cast<bufferlist&>(src).begin();
return decompress(i, src.length(), dst);
}
@ -76,6 +92,10 @@ class SnappyCompressor : public Compressor {
int decompress(bufferlist::iterator &p,
size_t compressed_len,
bufferlist &dst) override {
#ifdef HAVE_QATZIP
if (qat_enabled)
return qat_accel.decompress(p, compressed_len, dst);
#endif
snappy::uint32 res_len = 0;
BufferlistSource source_1(p, compressed_len);
if (!snappy::GetUncompressedLength(&source_1, &res_len)) {

View File

@ -19,7 +19,6 @@
#include "arch/probe.h"
#include "arch/intel.h"
#include "arch/arm.h"
#include "common/config.h"
#include "compressor/CompressionPlugin.h"
#include "ZlibCompressor.h"

View File

@ -163,6 +163,10 @@ int ZlibCompressor::isal_compress(const bufferlist &in, bufferlist &out)
int ZlibCompressor::compress(const bufferlist &in, bufferlist &out)
{
#ifdef HAVE_QATZIP
if (qat_enabled)
return qat_accel.compress(in, out);
#endif
#if __x86_64__ && defined(HAVE_BETTER_YASM_ELF64)
if (isal_enabled)
return isal_compress(in, out);
@ -175,6 +179,11 @@ int ZlibCompressor::compress(const bufferlist &in, bufferlist &out)
int ZlibCompressor::decompress(bufferlist::iterator &p, size_t compressed_size, bufferlist &out)
{
#ifdef HAVE_QATZIP
if (qat_enabled)
return qat_accel.decompress(p, compressed_size, out);
#endif
int ret;
unsigned have;
z_stream strm;
@ -228,6 +237,10 @@ int ZlibCompressor::decompress(bufferlist::iterator &p, size_t compressed_size,
int ZlibCompressor::decompress(const bufferlist &in, bufferlist &out)
{
#ifdef HAVE_QATZIP
if (qat_enabled)
return qat_accel.decompress(in, out);
#endif
bufferlist::iterator i = const_cast<bufferlist&>(in).begin();
return decompress(i, in.length(), out);
}

View File

@ -17,6 +17,7 @@
#ifndef CEPH_COMPRESSION_ZLIB_H
#define CEPH_COMPRESSION_ZLIB_H
#include "common/config.h"
#include "compressor/Compressor.h"
class ZlibCompressor : public Compressor {
@ -24,7 +25,14 @@ class ZlibCompressor : public Compressor {
CephContext *const cct;
public:
ZlibCompressor(CephContext *cct, bool isal)
: Compressor(COMP_ALG_ZLIB, "zlib"), isal_enabled(isal), cct(cct) {}
: Compressor(COMP_ALG_ZLIB, "zlib"), isal_enabled(isal), cct(cct) {
#ifdef HAVE_QATZIP
if (cct->_conf->qat_compressor_enabled && qat_accel.init("zlib"))
qat_enabled = true;
else
qat_enabled = false;
#endif
}
int compress(const bufferlist &in, bufferlist &out) override;
int decompress(const bufferlist &in, bufferlist &out) override;

View File

@ -342,4 +342,7 @@
/* Defined if std::map::merge() is supported */
#cmakedefine HAVE_STDLIB_MAP_SPLICING
/* Defined if Intel QAT compress/decompress is supported */
#cmakedefine HAVE_QATZIP
#endif /* CONFIG_H */

View File

@ -480,3 +480,78 @@ TEST(ZlibCompressor, isal_compress_zlib_decompress_walk)
}
#endif // __x86_64__
#ifdef HAVE_QATZIP
TEST(QAT, enc_qat_dec_noqat) {
#ifdef HAVE_LZ4
const char* alg_collection[] = {"zlib", "lz4", "snappy"};
#else
const char* alg_collection[] = {"zlib", "snappy"};
#endif
for (auto alg : alg_collection) {
g_conf->set_val("qat_compressor_enabled", "true");
CompressorRef q = Compressor::create(g_ceph_context, alg);
g_conf->set_val("qat_compressor_enabled", "false");
CompressorRef noq = Compressor::create(g_ceph_context, alg);
// generate random buffer
for (int cnt=0; cnt<100; cnt++) {
srand(cnt + 1000);
int log2 = (rand()%18) + 1;
int size = (rand() % (1 << log2)) + 1;
char test[size];
for (int i=0; i<size; ++i)
test[i] = rand()%256;
bufferlist in, out;
in.append(test, size);
int res = q->compress(in, out);
EXPECT_EQ(res, 0);
bufferlist after;
res = noq->decompress(out, after);
EXPECT_EQ(res, 0);
bufferlist exp;
exp.append(test, size);
EXPECT_TRUE(exp.contents_equal(after));
}
}
}
TEST(QAT, enc_noqat_dec_qat) {
#ifdef HAVE_LZ4
const char* alg_collection[] = {"zlib", "lz4", "snappy"};
#else
const char* alg_collection[] = {"zlib", "snappy"};
#endif
for (auto alg : alg_collection) {
g_conf->set_val("qat_compressor_enabled", "true");
CompressorRef q = Compressor::create(g_ceph_context, alg);
g_conf->set_val("qat_compressor_enabled", "false");
CompressorRef noq = Compressor::create(g_ceph_context, alg);
// generate random buffer
for (int cnt=0; cnt<100; cnt++) {
srand(cnt + 1000);
int log2 = (rand()%18) + 1;
int size = (rand() % (1 << log2)) + 1;
char test[size];
for (int i=0; i<size; ++i)
test[i] = rand()%256;
bufferlist in, out;
in.append(test, size);
int res = noq->compress(in, out);
EXPECT_EQ(res, 0);
bufferlist after;
res = q->decompress(out, after);
EXPECT_EQ(res, 0);
bufferlist exp;
exp.append(test, size);
EXPECT_TRUE(exp.contents_equal(after));
}
}
}
#endif // HAVE_QATZIP