Merge pull request #35079 from liewegas/wip-dedup-tool

ceph-dedup-tool: add new FastCDC chunker, and make estimate test a range of chunk sizes

Reviewed-by: Samuel Just <sjust@redhat.com>
Reviewed-by: Myoungwon Oh <myoungwon.oh@samsung.com>
This commit is contained in:
Kefu Chai 2020-05-27 21:52:58 +08:00 committed by GitHub
commit a4107f2c25
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 865 additions and 624 deletions

View File

@ -153,48 +153,8 @@ function test_dedup_chunk_scrub()
$RADOS_TOOL -p $POOL rm bar
}
function test_dedup_ratio_rabin()
{
# case 1
echo "abcdefghijklmnop" >> dedup_16
for num in `seq 0 63`
do
dd if=./dedup_16 bs=16 count=1 >> dedup_object_1k
done
for num in `seq 0 11`
do
dd if=dedup_object_1k bs=1K count=1 >> test_rabin_object
done
$RADOS_TOOL -p $POOL put $OBJ ./test_rabin_object
RESULT=$($DEDUP_TOOL --op estimate --pool $POOL --min-chunk 1015 --chunk-algorithm rabin --fingerprint-algorithm rabin --debug | grep result -a | awk '{print$4}')
if [ 4096 -ne $RESULT ];
then
die "Estimate failed expecting 4096 result $RESULT"
fi
echo "a" >> test_rabin_object_2
dd if=./test_rabin_object bs=8K count=1 >> test_rabin_object_2
$RADOS_TOOL -p $POOL put $OBJ"_2" ./test_rabin_object_2
RESULT=$($DEDUP_TOOL --op estimate --pool $POOL --min-chunk 1012 --chunk-algorithm rabin --fingerprint-algorithm rabin --debug | grep result -a | awk '{print$4}')
if [ 11259 -ne $RESULT ];
then
die "Estimate failed expecting 11259 result $RESULT"
fi
RESULT=$($DEDUP_TOOL --op estimate --pool $POOL --min-chunk 1024 --chunk-mask-bit 3 --chunk-algorithm rabin --fingerprint-algorithm rabin --debug | grep result -a | awk '{print$4}')
if [ 7170 -ne $RESULT ];
then
die "Estimate failed expecting 7170 result $RESULT"
fi
rm -rf ./dedup_object_1k ./test_rabin_object ./test_rabin_object_2 ./dedup_16
}
test_dedup_ratio_fixed
test_dedup_chunk_scrub
test_dedup_ratio_rabin
$CEPH_TOOL osd pool delete $POOL $POOL --yes-i-really-really-mean-it

22
src/common/CDC.cc Normal file
View File

@ -0,0 +1,22 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#include <random>
#include "CDC.h"
#include "FastCDC.h"
#include "FixedCDC.h"
std::unique_ptr<CDC> CDC::create(
const std::string& type,
int bits,
int windowbits)
{
if (type == "fastcdc") {
return std::unique_ptr<CDC>(new FastCDC(bits, windowbits));
}
if (type == "fixed") {
return std::unique_ptr<CDC>(new FixedCDC(bits, windowbits));
}
return nullptr;
}

27
src/common/CDC.h Normal file
View File

@ -0,0 +1,27 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#pragma once
#include <vector>
#include <string>
#include "include/buffer.h"
class CDC {
public:
virtual ~CDC() = default;
/// calculate chunk boundaries as vector of (offset, length) pairs
virtual void calc_chunks(
const bufferlist& inputdata,
std::vector<std::pair<uint64_t, uint64_t>> *chunks) const = 0;
/// set target chunk size as a power of 2, and number of bits for hard min/max
virtual void set_target_bits(int bits, int windowbits = 2) = 0;
static std::unique_ptr<CDC> create(
const std::string& type,
int bits,
int windowbits = 0);
};

View File

@ -16,8 +16,11 @@ set(common_srcs
BackTrace.cc
ConfUtils.cc
Cycles.cc
CDC.cc
DecayCounter.cc
FastCDC.cc
Finisher.cc
FixedCDC.cc
Formatter.cc
Graylog.cc
HTMLFormatter.cc
@ -88,7 +91,6 @@ set(common_srcs
perf_counters_collection.cc
perf_histogram.cc
pick_address.cc
rabin.cc
random_string.cc
reverse.c
run_cmd.cc

175
src/common/FastCDC.cc Normal file
View File

@ -0,0 +1,175 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#include <random>
#include "FastCDC.h"
// Unlike FastCDC described in the paper, if we are close to the
// target, use the target mask. If we are very small or very large,
// use an adjusted mask--like the paper. This tries to keep more
// cut points using the same mask, and fewer using the small or large
// masks.
// How many more/fewer bits to set in the small/large masks.
//
// This is the "normalization level" or "NC level" in the FastCDC
// paper.
#define TARGET_WINDOW_MASK_BITS 2
// How big the 'target window' is (in which we use the target mask).
//
// In the FastCDC paper, this is always 0: there is not "target
// window," and either small_mask (maskS) or large_mask (maskL) is
// used--never target_mask (maskA).
#define TARGET_WINDOW_BITS 1
// How many bits larger/smaller than target for hard limits on chunk
// size.
//
// We assume the min and max sizes are always this many bits
// larger/smaller than the target. (Note that the FastCDC paper 8KB
// example has a min of 2KB (2 bits smaller) and max of 64 KB (3 bits
// larger), although it is not clear why they chose those values.)
#define SIZE_WINDOW_BITS 2
void FastCDC::_setup(int target, int size_window_bits)
{
target_bits = target;
if (!size_window_bits) {
size_window_bits = SIZE_WINDOW_BITS;
}
min_bits = target - size_window_bits;
max_bits = target + size_window_bits;
std::mt19937_64 engine;
// prefill table
for (unsigned i = 0; i < 256; ++i) {
table[i] = engine();
}
// set mask
int did = 0;
uint64_t m = 0;
while (did < target_bits + TARGET_WINDOW_MASK_BITS) {
uint64_t bit = 1ull << (engine() & 63);
if (m & bit) {
continue; // this bit is already set
}
m |= bit;
++did;
if (did == target_bits - TARGET_WINDOW_MASK_BITS) {
large_mask = m;
} else if (did == target_bits) {
target_mask = m;
} else if (did == target_bits + TARGET_WINDOW_MASK_BITS) {
small_mask = m;
}
}
}
static inline bool _scan(
// these are our cursor/postion...
bufferlist::buffers_t::const_iterator *p,
const char **pp, const char **pe,
size_t& pos,
size_t max, // how much to read
uint64_t& fp, // fingerprint
uint64_t mask, const uint64_t *table)
{
while (pos < max) {
if (*pp == *pe) {
++(*p);
*pp = (*p)->c_str();
*pe = *pp + (*p)->length();
}
const char *te = std::min(*pe, *pp + max - pos);
for (; *pp < te; ++(*pp), ++pos) {
if ((fp & mask) == mask) {
return false;
}
fp = (fp << 1) ^ table[*(unsigned char*)*pp];
}
if (pos >= max) {
return true;
}
}
return true;
}
void FastCDC::calc_chunks(
const bufferlist& bl,
std::vector<std::pair<uint64_t, uint64_t>> *chunks) const
{
if (bl.length() == 0) {
return;
}
auto p = bl.buffers().begin();
const char *pp = p->c_str();
const char *pe = pp + p->length();
size_t pos = 0;
size_t len = bl.length();
while (pos < len) {
size_t cstart = pos;
uint64_t fp = 0;
// are we left with a min-sized (or smaller) chunk?
if (len - pos <= (1ul << min_bits)) {
chunks->push_back(std::pair<uint64_t,uint64_t>(pos, len - pos));
break;
}
// skip forward to the min chunk size cut point (minus the window, so
// we can initialize the rolling fingerprint).
size_t skip = (1 << min_bits) - window;
pos += skip;
while (skip) {
size_t s = std::min<size_t>(pe - pp, skip);
skip -= s;
pp += s;
if (pp == pe) {
++p;
pp = p->c_str();
pe = pp + p->length();
}
}
// first fill the window
size_t max = pos + window;
while (pos < max) {
if (pp == pe) {
++p;
pp = p->c_str();
pe = pp + p->length();
}
const char *te = std::min(pe, pp + (max - pos));
for (; pp < te; ++pp, ++pos) {
fp = (fp << 1) ^ table[*(unsigned char*)pp];
}
}
ceph_assert(pos < len);
// find an end marker
if (
// for the first "small" region
_scan(&p, &pp, &pe, pos,
std::min(len, cstart + (1 << (target_bits - TARGET_WINDOW_BITS))),
fp, small_mask, table) &&
// for the middle range (close to our target)
(TARGET_WINDOW_BITS == 0 ||
_scan(&p, &pp, &pe, pos,
std::min(len, cstart + (1 << (target_bits + TARGET_WINDOW_BITS))),
fp, target_mask, table)) &&
// we're past target, use large_mask!
_scan(&p, &pp, &pe, pos,
std::min(len,
cstart + (1 << max_bits)),
fp, large_mask, table)) ;
chunks->push_back(std::pair<uint64_t,uint64_t>(cstart, pos - cstart));
}
}

54
src/common/FastCDC.h Normal file
View File

@ -0,0 +1,54 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#pragma once
#include "CDC.h"
// Based on this paper:
// https://www.usenix.org/system/files/conference/atc16/atc16-paper-xia.pdf
//
// Changes:
// - window size fixed at 64 bytes (to match our word size)
// - use XOR instead of +
// - match mask instead of 0
// - use target mask when close to target size (instead of
// small/large mask). The idea here is to try to use a consistent (target)
// mask for most cut points if we can, and only resort to small/large mask
// when we are (very) small or (very) large.
// Note about the target_bits: The goal is an average chunk size of 1
// << target_bits. However, in reality the average is ~1.25x that
// because of the hard mininum chunk size.
class FastCDC : public CDC {
private:
int target_bits; ///< target chunk size bits (1 << target_bits)
int min_bits; ///< hard minimum chunk size bits (1 << min_bits)
int max_bits; ///< hard maximum chunk size bits (1 << max_bits)
uint64_t target_mask; ///< maskA in the paper (target_bits set)
uint64_t small_mask; ///< maskS in the paper (more bits set)
uint64_t large_mask; ///< maskL in the paper (fewer bits set)
/// lookup table with pseudorandom values for each byte
uint64_t table[256];
/// window size in bytes
const size_t window = sizeof(uint64_t)*8; // bits in uint64_t
void _setup(int target, int window_bits);
public:
FastCDC(int target = 18, int window_bits = 0) {
_setup(target, window_bits);
};
void set_target_bits(int target, int window_bits) override {
_setup(target, window_bits);
}
void calc_chunks(
const bufferlist& bl,
std::vector<std::pair<uint64_t, uint64_t>> *chunks) const override;
};

20
src/common/FixedCDC.cc Normal file
View File

@ -0,0 +1,20 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#pragma
#include "FixedCDC.h"
void FixedCDC::calc_chunks(
const bufferlist& bl,
std::vector<std::pair<uint64_t, uint64_t>> *chunks) const
{
size_t len = bl.length();
if (!len) {
return;
}
for (size_t pos = 0; pos < len; pos += chunk_size) {
chunks->push_back(std::pair<uint64_t,uint64_t>(pos, std::min(chunk_size,
len - pos)));
}
}

23
src/common/FixedCDC.h Normal file
View File

@ -0,0 +1,23 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#pragma once
#include "CDC.h"
class FixedCDC : public CDC {
private:
size_t chunk_size;
public:
FixedCDC(int target = 18, int window_bits = 0) {
set_target_bits(target, window_bits);
};
void set_target_bits(int target, int window_bits) override {
chunk_size = 1ul << target;
}
void calc_chunks(
const bufferlist& bl,
std::vector<std::pair<uint64_t, uint64_t>> *chunks) const override;
};

View File

@ -1,135 +0,0 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#include <cstring>
#include "include/types.h"
#include "rabin.h"
uint64_t RabinChunk::gen_rabin_hash(char* chunk_data, uint64_t off, uint64_t len) {
uint64_t roll_sum = 0;
uint64_t data_len = len;
if (data_len == 0) {
data_len = window_size;
}
for (uint64_t i = off; i < data_len; i++) {
char cur_byte = *(chunk_data + i);
roll_sum = (roll_sum * rabin_prime + cur_byte ) % (mod_prime) ;
}
return roll_sum;
}
bool RabinChunk::end_of_chunk(const uint64_t fp , int numbits) {
return ((fp & rabin_mask[numbits]) == 0) ;
}
/*
* Given a bufferlist of inputdata, use Rabin-fingerprint to
* chunk it and return the chunked result
*
* Arguments:
* min: min data chunk size
* max: max data chunk size
*
* Returns:
* output_chunks split by Rabin
*/
int RabinChunk::do_rabin_chunks(ceph::buffer::list& inputdata,
std::vector<std::pair<uint64_t, uint64_t>>& chunks,
uint64_t min_val = 0, uint64_t max_val = 0)
{
char *ptr = inputdata.c_str();
uint64_t data_size = inputdata.length();
uint64_t min, max;
min = min_val;
max = max_val;
if (min == 0 || max == 0) {
min = this->min;
max = this->max;
}
if (min < window_size) {
return -ERANGE;
}
if (data_size < min) {
chunks.push_back(std::make_pair(0, data_size));
return 0;
}
uint64_t c_offset = 0;
uint64_t c_size = 0;
uint64_t c_start = 0;
uint64_t rabin_hash;
bool start_new_chunk = true;
bool store_chunk = false;
while (c_offset + window_size < data_size) { // if it is still possible to calculate rabin hash
if (start_new_chunk) {
rabin_hash = gen_rabin_hash(ptr, c_offset); // start calculating for a new chunk
c_size = window_size; // don't forget set c_size
start_new_chunk = false;
} else {
// use existing rabin to calculate a new rabin hash
// note c_offset already increased by 1
// old byte pointed by ptr + c_offset - 1
// new byte pointed by ptr + c_offset + WINDOW_SIZE -1;
char new_byte = *(ptr + c_offset + window_size - 1);
char old_byte = *(ptr + c_offset - 1);
// TODO modulus POW_47 is too large a constant in c++ even for 64 bit unsinged int
rabin_hash = (rabin_hash * rabin_prime + new_byte - old_byte * pow) % (mod_prime);
}
/*
Case 1 : Fingerprint Found
subcase 1 : if c_size < min -> ignore
subcase 2 : if min <= c_size <= max -> store
subcase 3 : if c_size > max -> won't happen
Case 2 : Fingerprint not Found
subcase 1 : if c_size < min -> ignore
subcase 2 : if min <= c_size < max -> ignore
subcase 3 : if c_size == max -> (force) store
*/
if (end_of_chunk(rabin_hash, num_bits)) {
if((c_size >= min && c_size <= max)) { // a valid chunk with rabin
store_chunk = true;
} else {
store_chunk = false;
}
} else {
if (c_size == max) {
store_chunk = true;
} else {
store_chunk = false;
}
}
if (store_chunk) {
chunks.push_back(std::make_pair(c_start, c_size));
c_start += c_size;
c_offset = c_start;
start_new_chunk = true;
continue;
}
c_size++;
c_offset++;
}
if (c_start < data_size) {
c_size = data_size - c_start;
chunks.push_back(std::make_pair(c_start, c_size));
}
return 0;
}

View File

@ -1,85 +0,0 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Authors : Yuan-Ting Hsieh, Hsuan-Heng Wu, Myoungwon Oh
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#ifndef CEPH_COMMON_RABIN_H_
#define CEPH_COMMON_RABIN_H_
#include <cstdint>
#include <utility>
#include <vector>
#include "include/buffer_fwd.h"
class RabinChunk {
public:
RabinChunk(uint32_t window_size, uint32_t rabin_prime,
uint64_t mod_prime, uint64_t pow, std::vector<uint64_t> rabin_mask,
uint64_t min, uint64_t max, uint32_t num_bits):
window_size(window_size), rabin_prime(rabin_prime),
mod_prime(mod_prime), pow(pow), rabin_mask(rabin_mask), min(min),
max(max), num_bits(num_bits) {}
RabinChunk() {
default_init_rabin_options();
}
void default_init_rabin_options() {
std::vector<uint64_t> _rabin_mask = {0,1,3,7,15,31,63,127,255,511,1023,2047,
4095,8191,16383,32767,65535};
window_size = 48;
rabin_prime = 3;
mod_prime = 6148914691236517051;
pow = 907234050803559263; // pow(prime, window_size)
min = 8000;
max = 16000;
num_bits = 3;
rabin_mask = _rabin_mask;
}
int do_rabin_chunks(ceph::buffer::list& inputdata,
std::vector<std::pair<uint64_t, uint64_t>>& chunks,
uint64_t min, uint64_t max);
uint64_t gen_rabin_hash(char* chunk_data, uint64_t off, uint64_t len = 0);
bool end_of_chunk(const uint64_t fp , int numbits);
void set_window_size(uint32_t size) { window_size = size; }
void set_rabin_prime(uint32_t r_prime) { rabin_prime = r_prime; }
void set_mod_prime(uint64_t m_prime) { mod_prime = m_prime; }
void set_pow(uint64_t p) { pow = p; }
void set_rabin_mask(std::vector<uint64_t> & mask) { rabin_mask = mask; }
void set_min_chunk(uint32_t c_min) { min = c_min; }
void set_max_chunk(uint32_t c_max) { max = c_max; }
int add_rabin_mask(uint64_t mask) {
rabin_mask.push_back(mask);
for (int i = 0; rabin_mask.size(); i++) {
if (rabin_mask[i] == mask) {
return i;
}
}
return -1;
}
void set_numbits(uint32_t bit) { num_bits = bit; }
private:
uint32_t window_size;
uint32_t rabin_prime;
uint64_t mod_prime;
uint64_t pow;
std::vector<uint64_t> rabin_mask;
uint64_t min;
uint64_t max;
uint32_t num_bits;
};
#endif // CEPH_COMMON_RABIN_H_

View File

@ -329,10 +329,10 @@ add_executable(unittest_async_shared_mutex test_async_shared_mutex.cc)
add_ceph_unittest(unittest_async_shared_mutex)
target_link_libraries(unittest_async_shared_mutex ceph-common Boost::system)
add_executable(unittest_rabin_chunk test_rabin_chunk.cc
add_executable(unittest_cdc test_cdc.cc
$<TARGET_OBJECTS:unit-main>)
target_link_libraries(unittest_rabin_chunk global ceph-common)
add_ceph_unittest(unittest_rabin_chunk)
target_link_libraries(unittest_cdc global ceph-common)
add_ceph_unittest(unittest_cdc)
add_executable(unittest_ceph_timer test_ceph_timer.cc)
add_ceph_unittest(unittest_ceph_timer)

183
src/test/common/test_cdc.cc Normal file
View File

@ -0,0 +1,183 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#include <vector>
#include <cstring>
#include <random>
#include "include/types.h"
#include "include/buffer.h"
#include "common/CDC.h"
#include "gtest/gtest.h"
void generate_buffer(int size, bufferlist *outbl, int seed = 0)
{
std::mt19937_64 engine, engine2;
engine.seed(seed);
engine2.seed(seed);
// assemble from randomly-sized segments!
outbl->clear();
auto left = size;
while (left) {
size_t l = std::min<size_t>((engine2() & 0xffff0) + 16, left);
left -= l;
bufferptr p(l);
p.set_length(l);
char *b = p.c_str();
for (size_t i = 0; i < l / sizeof(uint64_t); ++i) {
((uint64_t*)b)[i] = engine();
}
outbl->append(p);
}
}
class CDCTest : public ::testing::Test,
public ::testing::WithParamInterface<const char*> {
public:
std::unique_ptr<CDC> cdc;
CDCTest() {
auto plugin = GetParam();
cdc = std::move(CDC::create(plugin, 18));
}
};
TEST_P(CDCTest, insert_front)
{
if (GetParam() == "fixed"s) return;
for (int frontlen = 1; frontlen < 163840; frontlen *= 3) {
bufferlist bl1, bl2;
generate_buffer(4*1024*1024, &bl1);
generate_buffer(frontlen, &bl2);
bl2.append(bl1);
bl2.rebuild();
vector<pair<uint64_t, uint64_t>> chunks1, chunks2;
cdc->calc_chunks(bl1, &chunks1);
cdc->calc_chunks(bl2, &chunks2);
cout << "1: " << chunks1 << std::endl;
cout << "2: " << chunks2 << std::endl;
ASSERT_GE(chunks2.size(), chunks1.size());
int match = 0;
for (unsigned i = 0; i < chunks1.size(); ++i) {
unsigned j = i + (chunks2.size() - chunks1.size());
if (chunks1[i].first + frontlen == chunks2[j].first &&
chunks1[i].second == chunks2[j].second) {
match++;
}
}
ASSERT_GE(match, chunks1.size() - 1);
}
}
TEST_P(CDCTest, insert_middle)
{
if (GetParam() == "fixed"s) return;
for (int frontlen = 1; frontlen < 163840; frontlen *= 3) {
bufferlist bl1, bl2;
generate_buffer(4*1024*1024, &bl1);
bufferlist f, m, e;
generate_buffer(frontlen, &m);
f.substr_of(bl1, 0, bl1.length() / 2);
e.substr_of(bl1, bl1.length() / 2, bl1.length() / 2);
bl2 = f;
bl2.append(m);
bl2.append(e);
bl2.rebuild();
vector<pair<uint64_t, uint64_t>> chunks1, chunks2;
cdc->calc_chunks(bl1, &chunks1);
cdc->calc_chunks(bl2, &chunks2);
cout << "1: " << chunks1 << std::endl;
cout << "2: " << chunks2 << std::endl;
ASSERT_GE(chunks2.size(), chunks1.size());
int match = 0;
unsigned i;
for (i = 0; i < chunks1.size()/2; ++i) {
unsigned j = i;
if (chunks1[i].first == chunks2[j].first &&
chunks1[i].second == chunks2[j].second) {
match++;
}
}
for (; i < chunks1.size(); ++i) {
unsigned j = i + (chunks2.size() - chunks1.size());
if (chunks1[i].first + frontlen == chunks2[j].first &&
chunks1[i].second == chunks2[j].second) {
match++;
}
}
ASSERT_GE(match, chunks1.size() - 2);
}
}
TEST_P(CDCTest, specific_result)
{
map<string,vector<pair<uint64_t,uint64_t>>> expected = {
{"fixed", { {0, 262144}, {262144, 262144}, {524288, 262144}, {786432, 262144}, {1048576, 262144}, {1310720, 262144}, {1572864, 262144}, {1835008, 262144}, {2097152, 262144}, {2359296, 262144}, {2621440, 262144}, {2883584, 262144}, {3145728, 262144}, {3407872, 262144}, {3670016, 262144}, {3932160, 262144} }},
{"fastcdc", { {0, 151460}, {151460, 441676}, {593136, 407491}, {1000627, 425767}, {1426394, 602875}, {2029269, 327307}, {2356576, 155515}, {2512091, 159392}, {2671483, 829416}, {3500899, 539667}, {4040566, 153738}}},
};
bufferlist bl;
generate_buffer(4*1024*1024, &bl);
vector<pair<uint64_t,uint64_t>> chunks;
cdc->calc_chunks(bl, &chunks);
ASSERT_EQ(chunks, expected[GetParam()]);
}
void do_size_histogram(CDC& cdc, bufferlist& bl,
map<int,int> *h)
{
vector<pair<uint64_t, uint64_t>> chunks;
cdc.calc_chunks(bl, &chunks);
uint64_t total = 0;
uint64_t num = 0;
for (auto& i : chunks) {
//unsigned b = i.second & 0xfffff000;
unsigned b = 1 << (cbits(i.second - 1));
(*h)[b]++;
++num;
total += i.second;
}
(*h)[0] = total / num;
}
void print_histogram(map<int,int>& h)
{
cout << "size\tcount" << std::endl;
for (auto i : h) {
if (i.first) {
cout << i.first << "\t" << i.second << std::endl;
} else {
cout << "avg\t" << i.second << std::endl;
}
}
}
TEST_P(CDCTest, chunk_random)
{
map<int,int> h;
for (int i = 0; i < 32; ++i) {
cout << ".";
cout.flush();
bufferlist r;
generate_buffer(16*1024*1024, &r, i);
do_size_histogram(*cdc, r, &h);
}
cout << std::endl;
print_histogram(h);
}
INSTANTIATE_TEST_SUITE_P(
CDC,
CDCTest,
::testing::Values(
"fixed", // note: we skip most tests bc this is not content-based
"fastcdc"
));

View File

@ -3,6 +3,7 @@
#include <vector>
#include <cstring>
#include <random>
#include "include/types.h"
#include "include/buffer.h"
@ -67,3 +68,84 @@ TEST(Rabin, test_cdc) {
ASSERT_EQ(chunks[4].second, cmp_chunks[4].second);
}
void generate_buffer(int size, bufferlist *outbl)
{
outbl->clear();
outbl->append_zero(size);
char *b = outbl->c_str();
std::mt19937_64 engine;
for (size_t i = 0; i < size / sizeof(uint64_t); ++i) {
((uint64_t*)b)[i] = engine();
}
}
#if 0
this fails!
TEST(Rabin, shifts)
{
RabinChunk rabin;
rabin.set_target_bits(18, 2);
for (int frontlen = 1; frontlen < 5; frontlen++) {
bufferlist bl1, bl2;
generate_buffer(4*1024*1024, &bl1);
generate_buffer(frontlen, &bl2);
bl2.append(bl1);
bl2.rebuild();
vector<pair<uint64_t, uint64_t>> chunks1, chunks2;
rabin.do_rabin_chunks(bl1, chunks1);
rabin.do_rabin_chunks(bl2, chunks2);
cout << "1: " << chunks1 << std::endl;
cout << "2: " << chunks2 << std::endl;
ASSERT_GE(chunks2.size(), chunks1.size());
int match = 0;
for (unsigned i = 0; i < chunks1.size(); ++i) {
unsigned j = i + (chunks2.size() - chunks1.size());
if (chunks1[i].first + frontlen == chunks2[j].first &&
chunks1[i].second == chunks2[j].second) {
match++;
}
}
ASSERT_GE(match, chunks1.size() - 1);
}
}
#endif
void do_size_histogram(RabinChunk& rabin, bufferlist& bl,
map<int,int> *h)
{
vector<pair<uint64_t, uint64_t>> chunks;
rabin.do_rabin_chunks(bl, chunks);
for (auto& i : chunks) {
unsigned b = i.second & 0xfffff000;
//unsigned b = 1 << cbits(i.second);
(*h)[b]++;
}
}
void print_histogram(map<int,int>& h)
{
cout << "size\tcount" << std::endl;
for (auto i : h) {
cout << i.first << "\t" << i.second << std::endl;
}
}
TEST(Rabin, chunk_random)
{
RabinChunk rabin;
rabin.set_target_bits(18, 2);
map<int,int> h;
for (int i = 0; i < 8; ++i) {
cout << ".";
cout.flush();
bufferlist r;
generate_buffer(16*1024*1024, &r);
do_size_histogram(rabin, r, &h);
}
cout << std::endl;
print_histogram(h);
}

View File

@ -40,54 +40,106 @@
#include <climits>
#include <locale>
#include <memory>
#include <math.h>
#include "tools/RadosDump.h"
#include "cls/cas/cls_cas_client.h"
#include "include/stringify.h"
#include "global/signal_handler.h"
#include "common/rabin.h"
#include "common/CDC.h"
struct EstimateResult {
std::unique_ptr<CDC> cdc;
uint64_t chunk_size;
ceph::mutex lock = ceph::make_mutex("EstimateResult::lock");
// < key, <count, chunk_size> >
map< string, pair <uint64_t, uint64_t> > chunk_statistics;
uint64_t total_bytes = 0;
std::atomic<uint64_t> total_objects = {0};
EstimateResult(std::string alg, int chunk_size)
: cdc(CDC::create(alg, chunk_size)),
chunk_size(1ull << chunk_size) {}
void add_chunk(bufferlist& chunk, const std::string& fp_algo) {
string fp;
if (fp_algo == "sha1") {
sha1_digest_t sha1_val = crypto::digest<crypto::SHA1>(chunk);
fp = sha1_val.to_str();
} else if (fp_algo == "sha256") {
sha256_digest_t sha256_val = crypto::digest<crypto::SHA256>(chunk);
fp = sha256_val.to_str();
} else if (fp_algo == "sha512") {
sha512_digest_t sha512_val = crypto::digest<crypto::SHA512>(chunk);
fp = sha512_val.to_str();
} else {
ceph_assert(0 == "no support fingerperint algorithm");
}
std::lock_guard l(lock);
auto p = chunk_statistics.find(fp);
if (p != chunk_statistics.end()) {
p->second.first++;
if (p->second.second != chunk.length()) {
cerr << "warning: hash collision on " << fp
<< ": was " << p->second.second
<< " now " << chunk.length() << std::endl;
}
} else {
chunk_statistics[fp] = make_pair(1, chunk.length());
}
total_bytes += chunk.length();
}
void dump(Formatter *f) const {
f->dump_unsigned("target_chunk_size", chunk_size);
uint64_t dedup_bytes = 0;
uint64_t dedup_objects = chunk_statistics.size();
for (auto& j : chunk_statistics) {
dedup_bytes += j.second.second;
}
//f->dump_unsigned("dedup_bytes", dedup_bytes);
//f->dump_unsigned("original_bytes", total_bytes);
f->dump_float("dedup_bytes_ratio",
(double)dedup_bytes / (double)total_bytes);
f->dump_float("dedup_objects_ratio",
(double)dedup_objects / (double)total_objects);
uint64_t avg = total_bytes / dedup_objects;
uint64_t sqsum = 0;
for (auto& j : chunk_statistics) {
sqsum += (avg - j.second.second) * (avg - j.second.second);
}
uint64_t stddev = sqrt(sqsum / dedup_objects);
f->dump_unsigned("chunk_size_average", avg);
f->dump_unsigned("chunk_size_stddev", stddev);
}
};
map<uint64_t, EstimateResult> dedup_estimates; // chunk size -> result
using namespace librados;
unsigned default_op_size = 1 << 22;
unsigned default_op_size = 1 << 26;
unsigned default_max_thread = 2;
int32_t default_report_period = 2;
map< string, pair <uint64_t, uint64_t> > chunk_statistics; // < key, <count, chunk_size> >
ceph::mutex glock = ceph::make_mutex("chunk_statistics::Locker");
int32_t default_report_period = 10;
ceph::mutex glock = ceph::make_mutex("glock");
void usage()
{
cout << " usage: [--op <estimate|chunk_scrub|add_chunk_ref|get_chunk_ref>] [--pool <pool_name> ] " << std::endl;
cout << " --object <object_name> " << std::endl;
cout << " --chunk-size <size> chunk-size (byte) " << std::endl;
cout << " --chunk-algorithm <fixed|rabin> " << std::endl;
cout << " --chunk-algorithm <fixed|fastcdc> " << std::endl;
cout << " --fingerprint-algorithm <sha1|sha256|sha512> " << std::endl;
cout << " --chunk-pool <pool name> " << std::endl;
cout << " --max-thread <threads> " << std::endl;
cout << " --report-perioid <seconds> " << std::endl;
cout << " --report-period <seconds> " << std::endl;
cout << " --max-seconds <seconds>" << std::endl;
cout << " --max-read-size <bytes> " << std::endl;
cout << std::endl;
cout << " ***these options are for rabin chunk*** " << std::endl;
cout << " **rabin_hash = (rabin_hash * rabin_prime + new_byte - old_byte * pow) % (mod_prime) ** " << std::endl;
cout << " **default_chunk_mask = 7 ** " << std::endl;
cout << " **default_mod_prime = 6148914691236517051 ** " << std::endl;
cout << " **default_rabin_prime = 3 ** " << std::endl;
cout << " **default_pow = 907234050803559263 ** " << std::endl;
cout << " **default_window_size = 48** " << std::endl;
cout << " **default_min_chunk = 16384** " << std::endl;
cout << " **default_max_chunk = 4194304** " << std::endl;
cout << " --mod-prime <uint64_t> " << std::endl;
cout << " --rabin-prime <uint64_t> " << std::endl;
cout << " --pow <uint64_t> " << std::endl;
cout << " --chunk-mask-bit <uint32_t> " << std::endl;
cout << " --window-size <uint32_t> " << std::endl;
cout << " --min-chunk <uint32_t> " << std::endl;
cout << " --max-chunk <uint64_t> " << std::endl;
exit(1);
}
[[noreturn]] static void usage_exit()
{
usage();
exit(1);
}
@ -105,82 +157,83 @@ static int rados_sistrtoll(I &i, T *val) {
class EstimateDedupRatio;
class ChunkScrub;
class EstimateThread : public Thread
class CrawlerThread : public Thread
{
IoCtx io_ctx;
int n;
int m;
ObjectCursor begin;
ObjectCursor end;
ceph::mutex m_lock = ceph::make_mutex("EstimateThread::Locker");
ceph::mutex m_lock = ceph::make_mutex("CrawlerThread::Locker");
ceph::condition_variable m_cond;
int32_t timeout;
int32_t report_period;
bool m_stop = false;
uint64_t total_bytes = 0;
uint64_t total_objects = 0;
uint64_t examined_objects = 0;
uint64_t total_objects;
uint64_t examined_bytes = 0;
uint64_t max_read_size = 0;
bool debug = false;
#define COND_WAIT_INTERVAL 10
public:
EstimateThread(IoCtx& io_ctx, int n, int m, ObjectCursor begin, ObjectCursor end, int32_t timeout,
CrawlerThread(IoCtx& io_ctx, int n, int m,
ObjectCursor begin, ObjectCursor end, int32_t report_period,
uint64_t num_objects, uint64_t max_read_size = default_op_size):
io_ctx(io_ctx), n(n), m(m), begin(begin), end(end),
timeout(timeout), total_objects(num_objects), max_read_size(max_read_size)
report_period(report_period), total_objects(num_objects), max_read_size(max_read_size)
{}
void signal(int signum) {
std::lock_guard l{m_lock};
m_stop = true;
m_cond.notify_all();
}
virtual void print_status(Formatter *f, ostream &out) = 0;
virtual void print_status(Formatter *f, ostream &out) {}
uint64_t get_examined_objects() { return examined_objects; }
uint64_t get_examined_bytes() { return examined_bytes; }
uint64_t get_total_bytes() { return total_bytes; }
uint64_t get_total_objects() { return total_objects; }
friend class EstimateDedupRatio;
friend class ChunkScrub;
};
class EstimateDedupRatio : public EstimateThread
class EstimateDedupRatio : public CrawlerThread
{
string chunk_algo;
string fp_algo;
uint64_t chunk_size;
map< string, pair <uint64_t, uint64_t> > local_chunk_statistics; // < key, <count, chunk_size> >
RabinChunk rabin;
uint64_t max_seconds;
public:
EstimateDedupRatio(IoCtx& io_ctx, int n, int m, ObjectCursor begin, ObjectCursor end,
string chunk_algo, string fp_algo, uint64_t chunk_size, int32_t timeout,
uint64_t num_objects, uint64_t max_read_size):
EstimateThread(io_ctx, n, m, begin, end, timeout, num_objects, max_read_size),
chunk_algo(chunk_algo), fp_algo(fp_algo), chunk_size(chunk_size) { }
EstimateDedupRatio(
IoCtx& io_ctx, int n, int m, ObjectCursor begin, ObjectCursor end,
string chunk_algo, string fp_algo, uint64_t chunk_size, int32_t report_period,
uint64_t num_objects, uint64_t max_read_size,
uint64_t max_seconds):
CrawlerThread(io_ctx, n, m, begin, end, report_period, num_objects,
max_read_size),
chunk_algo(chunk_algo),
fp_algo(fp_algo),
chunk_size(chunk_size),
max_seconds(max_seconds) {
}
void* entry() {
estimate_dedup_ratio();
return NULL;
}
void estimate_dedup_ratio();
void print_status(Formatter *f, ostream &out);
map< string, pair <uint64_t, uint64_t> > &get_chunk_statistics() { return local_chunk_statistics; }
uint64_t fixed_chunk(string oid, uint64_t offset);
uint64_t rabin_chunk(string oid, uint64_t offset);
void add_chunk_fp_to_stat(bufferlist &chunk);
void set_rabin_options(uint64_t mod_prime, uint32_t rabin_prime, uint64_t pow,
uint64_t chunk_mask_bit, uint32_t window_size, uint32_t min_chunk,
uint64_t max_chunk);
};
class ChunkScrub: public EstimateThread
class ChunkScrub: public CrawlerThread
{
IoCtx chunk_io_ctx;
int fixed_objects = 0;
public:
ChunkScrub(IoCtx& io_ctx, int n, int m, ObjectCursor begin, ObjectCursor end,
IoCtx& chunk_io_ctx, int32_t timeout, uint64_t num_objects):
EstimateThread(io_ctx, n, m, begin, end, timeout, num_objects), chunk_io_ctx(chunk_io_ctx)
IoCtx& chunk_io_ctx, int32_t report_period, uint64_t num_objects):
CrawlerThread(io_ctx, n, m, begin, end, report_period, num_objects), chunk_io_ctx(chunk_io_ctx)
{ }
void* entry() {
chunk_scrub_common();
@ -191,57 +244,42 @@ public:
void print_status(Formatter *f, ostream &out);
};
vector<std::unique_ptr<EstimateThread>> estimate_threads;
vector<std::unique_ptr<CrawlerThread>> estimate_threads;
static void print_dedup_estimate(bool debug = false)
static void print_dedup_estimate(std::ostream& out, std::string chunk_algo)
{
uint64_t total_size = 0;
uint64_t dedup_size = 0;
uint64_t examined_objects = 0;
/*
uint64_t total_bytes = 0;
uint64_t total_objects = 0;
EstimateDedupRatio *ratio = NULL;
for (auto &et : estimate_threads) {
std::lock_guard l{glock};
ratio = dynamic_cast<EstimateDedupRatio*>(et.get());
assert(ratio);
for (auto p : ratio->get_chunk_statistics()) {
auto c = chunk_statistics.find(p.first);
if (c != chunk_statistics.end()) {
c->second.first += p.second.first;
} else {
chunk_statistics.insert(p);
}
}
}
if (debug) {
for (auto p : chunk_statistics) {
cout << " -- " << std::endl;
cout << " key: " << p.first << std::endl;
cout << " count: " << p.second.first << std::endl;
cout << " chunk_size: " << p.second.second << std::endl;
dedup_size += p.second.second;
cout << " -- " << std::endl;
}
} else {
for (auto p : chunk_statistics) {
dedup_size += p.second.second;
}
}
*/
uint64_t examined_objects = 0;
uint64_t examined_bytes = 0;
for (auto &et : estimate_threads) {
total_size += et->get_total_bytes();
examined_objects += et->get_examined_objects();
if (!total_objects) {
total_objects = et->get_total_objects();
}
examined_bytes += et->get_examined_bytes();
}
cout << " result: " << total_size << " | " << dedup_size << " (total size | deduped size) " << std::endl;
cout << " Dedup ratio: " << (100 - (double)(dedup_size)/total_size*100) << " % " << std::endl;
cout << " Examined objects: " << examined_objects << std::endl;
cout << " Total objects: " << total_objects << std::endl;
auto f = Formatter::create("json-pretty");
f->open_object_section("results");
f->dump_string("chunk_algo", chunk_algo);
f->open_array_section("chunk_sizes");
for (auto& i : dedup_estimates) {
f->dump_object("chunker", i.second);
}
f->close_section();
f->open_object_section("summary");
f->dump_unsigned("examined_objects", examined_objects);
f->dump_unsigned("examined_bytes", examined_bytes);
/*
f->dump_unsigned("total_objects", total_objects);
f->dump_unsigned("total_bytes", total_bytes);
f->dump_float("examined_ratio", (float)examined_bytes / (float)total_bytes);
*/
f->close_section();
f->close_section();
f->flush(out);
}
static void handle_signal(int signum)
@ -252,33 +290,10 @@ static void handle_signal(int signum)
}
}
void EstimateDedupRatio::print_status(Formatter *f, ostream &out)
{
if (f) {
f->open_array_section("estimate_dedup_ratio");
f->dump_string("PID", stringify(get_pid()));
for (auto p : local_chunk_statistics) {
f->open_object_section("fingerprint object");
f->dump_string("fingerprint", p.first);
f->dump_string("count", stringify(p.second.first));
f->dump_string("chunk_size", stringify(p.second.second));
f->close_section();
}
f->close_section();
f->open_object_section("Status");
f->dump_string("Total bytes", stringify(total_bytes));
f->dump_string("Examined objectes", stringify(examined_objects));
f->close_section();
f->flush(out);
cout << std::endl;
}
}
void EstimateDedupRatio::estimate_dedup_ratio()
{
ObjectCursor shard_start;
ObjectCursor shard_end;
utime_t cur_time = ceph_clock_now();
io_ctx.object_list_slice(
begin,
@ -288,6 +303,19 @@ void EstimateDedupRatio::estimate_dedup_ratio()
&shard_start,
&shard_end);
utime_t start = ceph_clock_now();
utime_t end;
if (max_seconds) {
end = start;
end += max_seconds;
}
utime_t next_report;
if (report_period) {
next_report = start;
next_report += report_period;
}
ObjectCursor c(shard_start);
while (c < shard_end)
{
@ -298,162 +326,62 @@ void EstimateDedupRatio::estimate_dedup_ratio()
return;
}
unsigned op_size = max_read_size;
for (const auto & i : result) {
const auto &oid = i.oid;
utime_t now = ceph_clock_now();
if (max_seconds && now > end) {
m_stop = true;
}
if (m_stop) {
return;
}
if (n == 0 && // first thread only
next_report != utime_t() && now > next_report) {
cerr << (int)(now - start) << "s : read "
<< dedup_estimates.begin()->second.total_bytes << " bytes so far..."
<< std::endl;
print_dedup_estimate(cerr, chunk_algo);
next_report = now;
next_report += report_period;
}
// read entire object
bufferlist bl;
uint64_t offset = 0;
while (true) {
std::unique_lock l{m_lock};
if (m_stop) {
Formatter *formatter = Formatter::create("json-pretty");
print_status(formatter, cout);
delete formatter;
return;
}
uint64_t next_offset;
if (chunk_algo == "fixed") {
next_offset = fixed_chunk(oid, offset);
} else if (chunk_algo == "rabin") {
next_offset = rabin_chunk(oid, offset);
} else {
ceph_assert(0 == "no support chunk algorithm");
}
if (!next_offset) {
bufferlist t;
int ret = io_ctx.read(oid, t, op_size, offset);
if (ret <= 0) {
break;
}
offset += next_offset;
m_cond.wait_for(l, std::chrono::nanoseconds(COND_WAIT_INTERVAL));
if (cur_time + utime_t(timeout, 0) < ceph_clock_now()) {
Formatter *formatter = Formatter::create("json-pretty");
print_status(formatter, cout);
delete formatter;
cur_time = ceph_clock_now();
}
offset += ret;
bl.claim_append(t);
}
examined_objects++;
examined_bytes += bl.length();
// do the chunking
for (auto& i : dedup_estimates) {
vector<pair<uint64_t, uint64_t>> chunks;
i.second.cdc->calc_chunks(bl, &chunks);
for (auto& p : chunks) {
bufferlist chunk;
chunk.substr_of(bl, p.first, p.second);
i.second.add_chunk(chunk, fp_algo);
if (debug) {
cout << " " << oid << " " << p.first << "~" << p.second << std::endl;
}
}
++i.second.total_objects;
}
}
}
}
uint64_t EstimateDedupRatio::fixed_chunk(string oid, uint64_t offset)
{
unsigned op_size = max_read_size;
int ret;
bufferlist outdata;
ret = io_ctx.read(oid, outdata, op_size, offset);
if (ret <= 0) {
return 0;
}
uint64_t c_offset = 0;
while (c_offset < outdata.length()) {
bufferlist chunk;
if (outdata.length() - c_offset > chunk_size) {
bufferptr bptr(chunk_size);
chunk.push_back(std::move(bptr));
chunk.begin().copy_in(chunk_size, outdata.c_str());
} else {
bufferptr bptr(outdata.length() - c_offset);
chunk.push_back(std::move(bptr));
chunk.begin().copy_in(outdata.length() - c_offset, outdata.c_str());
}
add_chunk_fp_to_stat(chunk);
c_offset = c_offset + chunk_size;
}
if (outdata.length() < op_size) {
return 0;
}
return outdata.length();
}
void EstimateDedupRatio::add_chunk_fp_to_stat(bufferlist &chunk)
{
string fp;
if (fp_algo == "sha1") {
sha1_digest_t sha1_val = crypto::digest<crypto::SHA1>(chunk);
fp = sha1_val.to_str();
} else if (fp_algo == "sha256") {
sha256_digest_t sha256_val = crypto::digest<crypto::SHA256>(chunk);
fp = sha256_val.to_str();
} else if (fp_algo == "sha512") {
sha512_digest_t sha512_val = crypto::digest<crypto::SHA512>(chunk);
fp = sha512_val.to_str();
} else if (chunk_algo == "rabin") {
uint64_t hash = rabin.gen_rabin_hash(chunk.c_str(), 0, chunk.length());
fp = to_string(hash);
} else {
ceph_assert(0 == "no support fingerperint algorithm");
}
auto p = local_chunk_statistics.find(fp);
if (p != local_chunk_statistics.end()) {
uint64_t count = p->second.first;
count++;
local_chunk_statistics[fp] = make_pair(count, chunk.length());
} else {
local_chunk_statistics[fp] = make_pair(1, chunk.length());
}
total_bytes += chunk.length();
}
uint64_t EstimateDedupRatio::rabin_chunk(string oid, uint64_t offset)
{
unsigned op_size = max_read_size;
int ret;
bufferlist outdata;
ret = io_ctx.read(oid, outdata, op_size, offset);
if (ret <= 0) {
return 0;
}
vector<pair<uint64_t, uint64_t>> chunks;
rabin.do_rabin_chunks(outdata, chunks, 0, 0); // use default value
for (auto p : chunks) {
bufferlist chunk;
bufferptr c_data = buffer::create(p.second);
c_data.zero();
chunk.append(c_data);
chunk.begin().copy_in(p.second, outdata.c_str() + p.first);
add_chunk_fp_to_stat(chunk);
cout << " oid: " << oid << " offset: " << p.first + offset << " length: " << p.second << std::endl;
}
if (outdata.length() < op_size) {
return 0;
}
return outdata.length();
}
void EstimateDedupRatio::set_rabin_options(uint64_t mod_prime, uint32_t rabin_prime, uint64_t pow,
uint64_t chunk_mask_bit, uint32_t window_size,
uint32_t min_chunk, uint64_t max_chunk)
{
if (mod_prime != 0) {
rabin.set_mod_prime(mod_prime);
}
if (rabin_prime != 0) {
rabin.set_rabin_prime(rabin_prime);
}
if (pow != 0) {
rabin.set_pow(pow);
}
if (chunk_mask_bit != 0) {
int index = rabin.add_rabin_mask(chunk_mask_bit);
rabin.set_numbits(index);
}
if (window_size != 0) {
rabin.set_window_size(window_size);
}
if (min_chunk != 0) {
rabin.set_min_chunk(min_chunk);
}
if (max_chunk != 0) {
rabin.set_max_chunk(max_chunk);
}
}
void ChunkScrub::chunk_scrub_common()
{
ObjectCursor shard_start;
@ -534,7 +462,7 @@ void ChunkScrub::chunk_scrub_common()
}
examined_objects++;
m_cond.wait_for(l, std::chrono::nanoseconds(COND_WAIT_INTERVAL));
if (cur_time + utime_t(timeout, 0) < ceph_clock_now()) {
if (cur_time + utime_t(report_period, 0) < ceph_clock_now()) {
Formatter *formatter = Formatter::create("json-pretty");
print_status(formatter, cout);
delete formatter;
@ -564,15 +492,16 @@ int estimate_dedup_ratio(const std::map < std::string, std::string > &opts,
{
Rados rados;
IoCtx io_ctx;
std::string chunk_algo;
string fp_algo;
std::string chunk_algo = "fastcdc";
string fp_algo = "sha1";
string pool_name;
uint64_t chunk_size = 0;
uint64_t min_chunk_size = 8192;
uint64_t max_chunk_size = 4*1024*1024;
unsigned max_thread = default_max_thread;
uint32_t report_period = default_report_period;
uint64_t max_read_size = default_op_size;
uint64_t mod_prime = 0, pow = 0, max_chunk = default_op_size;
uint32_t rabin_prime = 0, window_size = 0, chunk_mask_bit = 0, min_chunk = 16384;
uint64_t max_seconds = 0;
int ret;
std::map<std::string, std::string>::const_iterator i;
bool debug = false;
@ -589,22 +518,23 @@ int estimate_dedup_ratio(const std::map < std::string, std::string > &opts,
i = opts.find("chunk-algorithm");
if (i != opts.end()) {
chunk_algo = i->second.c_str();
if (chunk_algo != "fixed" && chunk_algo != "rabin") {
usage_exit();
if (!CDC::create(chunk_algo, 12)) {
cerr << "unrecognized chunk-algorithm " << chunk_algo << std::endl;
exit(1);
}
} else {
usage_exit();
cerr << "must specify chunk-algorithm" << std::endl;
exit(1);
}
i = opts.find("fingerprint-algorithm");
if (i != opts.end()) {
fp_algo = i->second.c_str();
if (fp_algo != "sha1" && fp_algo != "rabin"
if (fp_algo != "sha1"
&& fp_algo != "sha256" && fp_algo != "sha512") {
usage_exit();
cerr << "unrecognized fingerprint-algorithm " << fp_algo << std::endl;
exit(1);
}
} else {
usage_exit();
}
i = opts.find("chunk-size");
@ -612,9 +542,18 @@ int estimate_dedup_ratio(const std::map < std::string, std::string > &opts,
if (rados_sistrtoll(i, &chunk_size)) {
return -EINVAL;
}
} else {
if (chunk_algo != "rabin") {
usage_exit();
}
i = opts.find("min-chunk-size");
if (i != opts.end()) {
if (rados_sistrtoll(i, &min_chunk_size)) {
return -EINVAL;
}
}
i = opts.find("max-chunk-size");
if (i != opts.end()) {
if (rados_sistrtoll(i, &max_chunk_size)) {
return -EINVAL;
}
}
@ -630,55 +569,19 @@ int estimate_dedup_ratio(const std::map < std::string, std::string > &opts,
if (rados_sistrtoll(i, &report_period)) {
return -EINVAL;
}
}
}
i = opts.find("max-seconds");
if (i != opts.end()) {
if (rados_sistrtoll(i, &max_seconds)) {
return -EINVAL;
}
}
i = opts.find("max-read-size");
if (i != opts.end()) {
if (rados_sistrtoll(i, &max_read_size)) {
return -EINVAL;
}
}
i = opts.find("mod-prime");
if (i != opts.end()) {
if (rados_sistrtoll(i, &mod_prime)) {
return -EINVAL;
}
}
i = opts.find("rabin-prime");
if (i != opts.end()) {
if (rados_sistrtoll(i, &rabin_prime)) {
return -EINVAL;
}
}
i = opts.find("pow");
if (i != opts.end()) {
if (rados_sistrtoll(i, &pow)) {
return -EINVAL;
}
}
i = opts.find("chunk-mask-bit");
if (i != opts.end()) {
if (rados_sistrtoll(i, &chunk_mask_bit)) {
return -EINVAL;
}
}
i = opts.find("window-size");
if (i != opts.end()) {
if (rados_sistrtoll(i, &window_size)) {
return -EINVAL;
}
}
i = opts.find("min-chunk");
if (i != opts.end()) {
if (rados_sistrtoll(i, &min_chunk)) {
return -EINVAL;
}
}
i = opts.find("max-chunk");
if (i != opts.end()) {
if (rados_sistrtoll(i, &max_chunk)) {
return -EINVAL;
}
}
i = opts.find("debug");
if (i != opts.end()) {
debug = true;
@ -700,7 +603,7 @@ int estimate_dedup_ratio(const std::map < std::string, std::string > &opts,
}
if (pool_name.empty()) {
cerr << "--create-pool requested but pool_name was not specified!" << std::endl;
usage_exit();
exit(1);
}
ret = rados.ioctx_create(pool_name.c_str(), io_ctx);
if (ret < 0) {
@ -710,6 +613,19 @@ int estimate_dedup_ratio(const std::map < std::string, std::string > &opts,
goto out;
}
// set up chunkers
if (chunk_size) {
dedup_estimates.emplace(std::piecewise_construct,
std::forward_as_tuple(chunk_size),
std::forward_as_tuple(chunk_algo, cbits(chunk_size)-1));
} else {
for (size_t cs = min_chunk_size; cs <= max_chunk_size; cs *= 2) {
dedup_estimates.emplace(std::piecewise_construct,
std::forward_as_tuple(cs),
std::forward_as_tuple(chunk_algo, cbits(cs)-1));
}
}
glock.lock();
begin = io_ctx.object_list_begin();
end = io_ctx.object_list_end();
@ -728,15 +644,11 @@ int estimate_dedup_ratio(const std::map < std::string, std::string > &opts,
s = stats[pool_name];
for (unsigned i = 0; i < max_thread; i++) {
std::unique_ptr<EstimateThread> ptr (new EstimateDedupRatio(io_ctx, i, max_thread, begin, end,
chunk_algo, fp_algo, chunk_size,
report_period, s.num_objects, max_read_size));
if (chunk_algo == "rabin") {
EstimateDedupRatio *ratio = NULL;
ratio = dynamic_cast<EstimateDedupRatio*>(ptr.get());
ratio->set_rabin_options(mod_prime, rabin_prime, pow, chunk_mask_bit, window_size,
min_chunk, max_chunk);
}
std::unique_ptr<CrawlerThread> ptr (
new EstimateDedupRatio(io_ctx, i, max_thread, begin, end,
chunk_algo, fp_algo, chunk_size,
report_period, s.num_objects, max_read_size,
max_seconds));
ptr->create("estimate_thread");
estimate_threads.push_back(move(ptr));
}
@ -746,7 +658,7 @@ int estimate_dedup_ratio(const std::map < std::string, std::string > &opts,
p->join();
}
print_dedup_estimate(debug);
print_dedup_estimate(cout, chunk_algo);
out:
return (ret < 0) ? 1 : 0;
@ -793,14 +705,16 @@ int chunk_scrub_common(const std::map < std::string, std::string > &opts,
if (i != opts.end()) {
op_name= i->second.c_str();
} else {
usage_exit();
cerr << "must specify op" << std::endl;
exit(1);
}
i = opts.find("chunk-pool");
if (i != opts.end()) {
chunk_pool_name = i->second.c_str();
} else {
usage_exit();
cerr << "must specify pool" << std::endl;
exit(1);
}
i = opts.find("max-thread");
if (i != opts.end()) {
@ -843,13 +757,15 @@ int chunk_scrub_common(const std::map < std::string, std::string > &opts,
if (i != opts.end()) {
object_name = i->second.c_str();
} else {
usage_exit();
cerr << "must specify object" << std::endl;
exit(1);
}
i = opts.find("target-ref");
if (i != opts.end()) {
target_object_name = i->second.c_str();
} else {
usage_exit();
cerr << "must specify target ref" << std::endl;
exit(1);
}
i = opts.find("target-ref-pool-id");
if (i != opts.end()) {
@ -857,7 +773,8 @@ int chunk_scrub_common(const std::map < std::string, std::string > &opts,
return -EINVAL;
}
} else {
usage_exit();
cerr << "must specify target-ref-pool-id" << std::endl;
exit(1);
}
set<hobject_t> refs;
@ -892,7 +809,8 @@ int chunk_scrub_common(const std::map < std::string, std::string > &opts,
if (i != opts.end()) {
object_name = i->second.c_str();
} else {
usage_exit();
cerr << "must specify object" << std::endl;
exit(1);
}
set<hobject_t> refs;
cout << " refs: " << std::endl;
@ -922,8 +840,9 @@ int chunk_scrub_common(const std::map < std::string, std::string > &opts,
s = stats[chunk_pool_name];
for (unsigned i = 0; i < max_thread; i++) {
std::unique_ptr<EstimateThread> ptr (new ChunkScrub(io_ctx, i, max_thread, begin, end, chunk_io_ctx,
report_period, s.num_objects));
std::unique_ptr<CrawlerThread> ptr (
new ChunkScrub(io_ctx, i, max_thread, begin, end, chunk_io_ctx,
report_period, s.num_objects));
ptr->create("estimate_thread");
estimate_threads.push_back(move(ptr));
}
@ -991,26 +910,20 @@ int main(int argc, const char **argv)
} else if (ceph_argparse_witharg(args, i, &val, "--report-period", (char*)NULL)) {
opts["report-period"] = val;
} else if (ceph_argparse_witharg(args, i, &val, "--max-read-size", (char*)NULL)) {
opts["max-read-size"] = val;
} else if (ceph_argparse_witharg(args, i, &val, "--mod-prime", (char*)NULL)) {
opts["mod-prime"] = val;
} else if (ceph_argparse_witharg(args, i, &val, "--rabin-prime", (char*)NULL)) {
opts["rabin-prime"] = val;
} else if (ceph_argparse_witharg(args, i, &val, "--pow", (char*)NULL)) {
opts["pow"] = val;
} else if (ceph_argparse_witharg(args, i, &val, "--chunk-mask-bit", (char*)NULL)) {
opts["chunk-mask-bit"] = val;
} else if (ceph_argparse_witharg(args, i, &val, "--window-size", (char*)NULL)) {
opts["window-size"] = val;
} else if (ceph_argparse_witharg(args, i, &val, "--min-chunk", (char*)NULL)) {
opts["min-chunk"] = val;
} else if (ceph_argparse_witharg(args, i, &val, "--max-chunk", (char*)NULL)) {
opts["max-chunk"] = val;
opts["max-seconds"] = val;
} else if (ceph_argparse_witharg(args, i, &val, "--max-seconds", (char*)NULL)) {
opts["max-seconds"] = val;
} else if (ceph_argparse_witharg(args, i, &val, "--min-chunk-size", (char*)NULL)) {
opts["min-chunk-size"] = val;
} else if (ceph_argparse_witharg(args, i, &val, "--max-chunk-size", (char*)NULL)) {
opts["max-chunk-size"] = val;
} else if (ceph_argparse_flag(args, i, "--debug", (char*)NULL)) {
opts["debug"] = "true";
} else {
if (val[0] == '-')
usage_exit();
if (val[0] == '-') {
cerr << "unrecognized option " << val << std::endl;
exit(1);
}
++i;
}
}