Merge pull request #45806 from hualongfeng/qat_error_fix1

common/compressor: fix the issue that read more data

Reviewed-by: Casey Bodley <cbodley@redhat.com>
This commit is contained in:
Casey Bodley 2022-06-09 10:50:24 -04:00 committed by GitHub
commit 16b73f58c6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 51 additions and 55 deletions

View File

@ -31,15 +31,17 @@ static std::ostream& _prefix(std::ostream* _dout)
return *_dout << "QatAccel: ";
}
// -----------------------------------------------------------------------------
// default window size for Zlib 1.2.8, negated for raw deflate
#define ZLIB_DEFAULT_WIN_SIZE -15
/* Estimate data expansion after decompression */
static const unsigned int expansion_ratio[] = {5, 20, 50, 100, 200, 1000, 10000};
void QzSessionDeleter::operator() (struct QzSession_S *session) {
qzTeardownSession(session);
delete session;
}
/* Estimate data expansion after decompression */
static const unsigned int expansion_ratio[] = {5, 20, 50, 100, 200};
static bool get_qz_params(const std::string &alg, QzSessionParams_T &params) {
int rc;
rc = qzGetDefaults(&params);
@ -49,7 +51,8 @@ static bool get_qz_params(const std::string &alg, QzSessionParams_T &params) {
params.is_busy_polling = true;
if (alg == "zlib") {
params.comp_algorithm = QZ_DEFLATE;
params.data_fmt = QZ_DEFLATE_GZIP_EXT;
params.data_fmt = QZ_DEFLATE_RAW;
params.comp_lvl = g_ceph_context->_conf->compressor_zlib_level;
}
else {
// later, there also has lz4.
@ -153,17 +156,26 @@ int QatAccel::compress(const bufferlist &in, bufferlist &out, std::optional<int3
return -1; // session initialization failed
}
auto session = cached_session_t{this, std::move(s)}; // returns to the session pool on destruction
compressor_message = ZLIB_DEFAULT_WIN_SIZE;
int begin = 1;
for (auto &i : in.buffers()) {
const unsigned char* c_in = (unsigned char*) i.c_str();
unsigned int len = i.length();
unsigned int out_len = qzMaxCompressedLength(len, session.get());
unsigned int out_len = qzMaxCompressedLength(len, session.get()) + begin;
bufferptr ptr = buffer::create_small_page_aligned(out_len);
int rc = qzCompress(session.get(), c_in, &len, (unsigned char *)ptr.c_str(), &out_len, 1);
unsigned char* c_out = (unsigned char*)ptr.c_str() + begin;
int rc = qzCompress(session.get(), c_in, &len, c_out, &out_len, 1);
if (rc != QZ_OK)
return -1;
if (begin) {
// put a compressor variation mark in front of compressed stream, not used at the moment
ptr.c_str()[0] = 0;
out_len += begin;
begin = 0;
}
out.append(ptr, 0, out_len);
}
return 0;
@ -183,62 +195,45 @@ int QatAccel::decompress(bufferlist::const_iterator &p,
return -1; // session initialization failed
}
auto session = cached_session_t{this, std::move(s)}; // returns to the session pool on destruction
int begin = 1;
unsigned int ratio_idx = 0;
bool read_more = false;
bool joint = false;
int rc = 0;
bufferlist tmp;
size_t remaining = std::min<size_t>(p.get_remaining(), compressed_len);
while (remaining) {
if (p.end()) {
return -1;
}
unsigned int ratio_idx = 0;
const char* c_in = nullptr;
unsigned int len = p.get_ptr_and_advance(remaining, &c_in);
remaining -= len;
len -= begin;
c_in += begin;
begin = 0;
unsigned int out_len = QZ_HW_BUFF_SZ;
bufferptr cur_ptr = p.get_current_ptr();
unsigned int len = cur_ptr.length();
if (joint) {
if (read_more)
tmp.append(cur_ptr.c_str(), len);
len = tmp.length();
tmp.rebuild_page_aligned();
}
unsigned int out_len = len * expansion_ratio[ratio_idx];
bufferptr ptr = buffer::create_small_page_aligned(out_len);
if (joint)
rc = qzDecompress(session.get(), (const unsigned char*)tmp.c_str(), &len, (unsigned char*)ptr.c_str(), &out_len);
else
rc = qzDecompress(session.get(), (const unsigned char*)cur_ptr.c_str(), &len, (unsigned char*)ptr.c_str(), &out_len);
if (rc == QZ_DATA_ERROR) {
if (!joint) {
tmp.append(cur_ptr.c_str(), cur_ptr.length());
p += cur_ptr.length();
remaining -= cur_ptr.length();
joint = true;
bufferptr ptr;
do {
while (out_len <= len * expansion_ratio[ratio_idx]) {
out_len *= 2;
}
read_more = true;
continue;
} else if (rc == QZ_BUF_ERROR) {
if (ratio_idx == std::size(expansion_ratio))
return -1;
if (joint)
read_more = false;
ratio_idx++;
continue;
} else if (rc != QZ_OK) {
return -1;
} else {
ratio_idx = 0;
joint = false;
read_more = false;
}
p += cur_ptr.length();
remaining -= cur_ptr.length();
dst.append(ptr, 0, out_len);
ptr = buffer::create_small_page_aligned(out_len);
rc = qzDecompress(session.get(), (const unsigned char*)c_in, &len, (unsigned char*)ptr.c_str(), &out_len);
ratio_idx++;
} while (rc == QZ_BUF_ERROR && ratio_idx < std::size(expansion_ratio));
if (rc == QZ_OK) {
dst.append(ptr, 0, out_len);
} else if (rc == QZ_DATA_ERROR) {
dout(1) << "QAT compressor DATA ERROR" << dendl;
return -1;
} else if (rc == QZ_BUF_ERROR) {
dout(1) << "QAT compressor BUF ERROR" << dendl;
return -1;
} else if (rc != QZ_OK) {
dout(1) << "QAT compressor NOT OK" << dendl;
return -1;
}
}
return 0;

View File

@ -187,7 +187,8 @@ int ZlibCompressor::compress(const bufferlist &in, bufferlist &out, std::optiona
int ZlibCompressor::decompress(bufferlist::const_iterator &p, size_t compressed_size, bufferlist &out, std::optional<int32_t> compressor_message)
{
#ifdef HAVE_QATZIP
if (qat_enabled)
// QAT can only decompress with the default window size
if (qat_enabled && (!compressor_message || *compressor_message == ZLIB_DEFAULT_WIN_SIZE))
return qat_accel.decompress(p, compressed_size, out, compressor_message);
#endif