dedup-tool: add basic crawling

Create crawling threads which crawl objects in base pool and deduplicate
based on their deduplication efficiency. Crawler samples objects and finds
duplicated chunks within the samples. It regards an object which has
duplicated chunks higher than object_dedup_threshold value as an efficient
object to be deduplicated. In addition, a chunk that is duplicated more than
chunk_dedup_threshold times is also deduplicated.
The commit contains basic crawling which crawls all objects in base pool
instead of sampling among the objects.

[usage]
  ceph_dedup_tool --op sample-dedup --pool POOL --chunk-pool POOL \
    --chunk-algorithm ALGO --fingerprint-algorithm FP \
    --object-dedup-threshold <percentile> --chunk-dedup-threshold <number>

Signed-off-by: JinyongHa <jy200.ha@samsung.com>
This commit is contained in:
JinyongHa 2022-02-25 06:48:36 +00:00 committed by JinyongHa
parent cc63d6ee73
commit 3a5876be6e

View File

@ -155,6 +155,8 @@ po::options_description make_usage() {
": perform a chunk dedup---deduplicate only a chunk, which is a part of object.")
("op object-dedup --pool <POOL> --object <OID> --chunk-pool <POOL> --fingerprint-algorithm <FP> --dedup-cdc-chunk-size <CHUNK_SIZE> [--snap]",
": perform a object dedup---deduplicate the entire object, not a chunk. Related snapshots are also deduplicated if --snap is given")
("op sample-dedup --pool <POOL> --chunk-pool <POOL> --chunk-algorithm <ALGO> --fingerprint-algorithm <FP>",
": perform a sample dedup---make crawling threads which crawl objeccts in base pool and deduplicate them based on their deduplication efficiency")
;
po::options_description op_desc("Opational arguments");
op_desc.add_options()
@ -179,6 +181,7 @@ po::options_description make_usage() {
("snap", ": deduplciate snapshotted object")
("debug", ": enable debug")
("pgid", ": set pgid")
("chunk-dedup-threshold", po::value<uint32_t>(), ": set the threshold for chunk dedup (number of duplication) ")
;
desc.add(op_desc);
return desc;
@ -224,6 +227,8 @@ public:
io_ctx(io_ctx), n(n), m(m), begin(begin), end(end),
report_period(report_period), total_objects(num_objects), max_read_size(max_read_size)
{}
CrawlerThread() { }
void signal(int signum) {
std::lock_guard l{m_lock};
m_stop = true;
@ -525,6 +530,366 @@ void ChunkScrub::chunk_scrub_common()
cout << "--done--" << std::endl;
}
using AioCompRef = unique_ptr<AioCompletion>;
class SampleDedupWorkerThread : public CrawlerThread
{
public:
struct chunk_t {
string oid = "";
size_t start = 0;
size_t size = 0;
string fingerprint = "";
bufferlist data;
};
class FpStore {
public:
using dup_count_t = ssize_t;
bool find(string& fp) {
std::shared_lock lock(fingerprint_lock);
auto found_item = fp_map.find(fp);
if (found_item != fp_map.end()) {
return true;
}
return false;
}
// return true if the chunk is duplicate
bool add(chunk_t& chunk) {
std::unique_lock lock(fingerprint_lock);
auto found_iter = fp_map.find(chunk.fingerprint);
if (found_iter == fp_map.end()) {
auto ret = fp_map.insert({chunk.fingerprint, 1});
found_iter = ret.first;
}
auto &target = found_iter->second;
target++;
if (target >= dedup_threshold && dedup_threshold != -1) {
return true;
}
return false;
}
void init(size_t dedup_threshold_) {
std::unique_lock lock(fingerprint_lock);
fp_map.clear();
dedup_threshold = dedup_threshold_;
}
FpStore(size_t chunk_threshold) : dedup_threshold(chunk_threshold) { }
private:
ssize_t dedup_threshold = -1;
std::unordered_map<std::string, dup_count_t> fp_map;
std::shared_mutex fingerprint_lock;
};
struct SampleDedupGlobal {
FpStore fp_store;
double object_dedup_threshold = -1;
SampleDedupGlobal(
int chunk_threshold) :
fp_store(chunk_threshold) { }
};
SampleDedupWorkerThread(
IoCtx &io_ctx,
IoCtx &chunk_io_ctx,
ObjectCursor begin,
ObjectCursor end,
size_t chunk_size,
std::string &fp_algo,
std::string &chunk_algo,
SampleDedupGlobal &sample_dedup_global) :
io_ctx(io_ctx),
chunk_io_ctx(chunk_io_ctx),
chunk_size(chunk_size),
fp_type(pg_pool_t::get_fingerprint_from_str(fp_algo)),
chunk_algo(chunk_algo),
sample_dedup_global(sample_dedup_global),
begin(begin),
end(end) { }
~SampleDedupWorkerThread() { };
protected:
void* entry() override {
crawl();
return nullptr;
}
private:
void crawl();
std::tuple<std::vector<ObjectItem>, ObjectCursor> get_objects(
ObjectCursor current,
ObjectCursor end,
size_t max_object_count);
std::vector<size_t> sample_object(size_t count);
void try_dedup_and_accumulate_result(ObjectItem &object);
bool ok_to_dedup_all();
void do_object_dedup(ObjectItem &object);
int do_chunk_dedup(chunk_t &chunk);
void mark_non_dedup(ObjectCursor start, ObjectCursor end);
bufferlist read_object(ObjectItem &object);
std::vector<std::tuple<bufferlist, pair<uint64_t, uint64_t>>> do_cdc(
ObjectItem &object,
bufferlist &data);
std::string generate_fingerprint(bufferlist chunk_data);
AioCompRef do_async_evict(string oid);
IoCtx io_ctx;
IoCtx chunk_io_ctx;
std::list<chunk_t> duplicable_chunks;
size_t total_duplicated_size = 0;
size_t total_object_size = 0;
std::set<std::string> oid_for_evict;
size_t chunk_size = 0;
pg_pool_t::fingerprint_t fp_type = pg_pool_t::TYPE_FINGERPRINT_NONE;
std::string chunk_algo;
SampleDedupGlobal &sample_dedup_global;
ObjectCursor begin;
ObjectCursor end;
};
void SampleDedupWorkerThread::crawl()
{
cout << "new iteration" << std::endl;
for (ObjectCursor current_object = begin;
current_object < end;) {
std::vector<ObjectItem> objects;
// Get the list of object IDs to deduplicate
std::tie(objects, current_object) = get_objects(current_object, end, 100);
// Pick few objects to be processed. Crawling mode decides how many
// objects to pick (sampling ratio). Lower sampling ratio makes crawler
// have lower crawling overhead but find less duplication.
std::set<size_t> sampled_indexes = sample_object(objects.size());
for (size_t index : sampled_indexes) {
ObjectItem target = objects[index];
try_dedup_and_accumulate_result(target);
}
}
vector<AioCompRef> evict_completions(oid_for_evict.size());
int i = 0;
for (auto &oid : oid_for_evict) {
auto completion = do_async_evict(oid);
evict_completions[i] = move(completion);
i++;
}
for (auto &completion : evict_completions) {
completion->wait_for_complete();
}
cout << "done iteration" << std::endl;
}
// Issue an asynchronous tier_evict for the given object and return the
// completion so the caller can wait on it.
AioCompRef SampleDedupWorkerThread::do_async_evict(string oid)
{
  ObjectReadOperation op_tier;
  // aio_create_completion() is a static factory in librados; the previous
  // code constructed a throwaway local Rados handle just to call it.
  AioCompRef completion(Rados::aio_create_completion());
  op_tier.tier_evict();
  io_ctx.aio_operate(
    oid,
    completion.get(),
    &op_tier,
    NULL);
  return completion;
}
// List at most max_object_count objects in [current, end) and return them
// together with the cursor to continue from. On error an empty vector is
// returned.
std::tuple<std::vector<ObjectItem>, ObjectCursor> SampleDedupWorkerThread::get_objects(
  ObjectCursor current, ObjectCursor end, size_t max_object_count)
{
  std::vector<ObjectItem> found;
  ObjectCursor next_cursor;
  int r = io_ctx.object_list(
    current,
    end,
    max_object_count,
    {},
    &found,
    &next_cursor);
  if (r < 0) {
    cerr << "error object_list" << std::endl;
    found.clear();
  }
  return std::make_tuple(found, next_cursor);
}
// Choose which of the fetched objects to process. Basic crawling mode
// selects every index in [0, count); real sampling is to be added later.
std::set<size_t> SampleDedupWorkerThread::sample_object(size_t count)
{
  std::set<size_t> picked;
  size_t i = 0;
  while (i < count) {
    picked.insert(i);
    ++i;
  }
  return picked;
}
// Chunk one object with CDC, look each chunk's fingerprint up in the
// shared FpStore, chunk-dedup the chunks that crossed the duplication
// threshold, and accumulate this thread's dedup statistics.
void SampleDedupWorkerThread::try_dedup_and_accumulate_result(ObjectItem &object)
{
  bufferlist data = read_object(object);
  if (data.length() == 0) {
    cerr << __func__ << " skip object " << object.oid
      << " dedup (read failed)\n";
    return;
  }
  auto chunks = do_cdc(object, data);
  size_t chunk_total_amount = 0;
  // First, check total size of created chunks
  for (auto &chunk : chunks) {
    auto &chunk_data = std::get<0>(chunk);
    chunk_total_amount += chunk_data.length();
  }
  // Sanity check: the chunks must exactly tile the object; bail out on
  // mismatch rather than dedup with a corrupted chunking.
  if (chunk_total_amount != data.length()) {
    cerr << __func__ << " sum of chunked length(" << chunk_total_amount
      << ") is different from object data length(" << data.length() << ")\n";
    return;
  }
  size_t duplicated_size = 0;
  list<chunk_t> redundant_chunks;
  for (auto &chunk : chunks) {
    auto &chunk_data = std::get<0>(chunk);
    std::string fingerprint = generate_fingerprint(chunk_data);
    std::pair<uint64_t, uint64_t> chunk_boundary = std::get<1>(chunk);
    chunk_t chunk_info = {
      .oid = object.oid,
      .start = chunk_boundary.first,
      .size = chunk_boundary.second,
      .fingerprint = fingerprint,
      .data = chunk_data
    };
    // NOTE: find() must run before add() — a chunk only counts toward
    // duplicated_size if its fingerprint was already present.
    if (sample_dedup_global.fp_store.find(fingerprint)) {
      duplicated_size += chunk_data.length();
    }
    // add() returns true once this fingerprint's duplication count has
    // reached the chunk-dedup threshold.
    if (sample_dedup_global.fp_store.add(chunk_info)) {
      redundant_chunks.push_back(chunk_info);
    }
  }
  size_t object_size = data.length();
  // perform chunk-dedup
  for (auto &p : redundant_chunks) {
    do_chunk_dedup(p);
  }
  total_duplicated_size += duplicated_size;
  total_object_size += object_size;
}
// Read the whole object in default_op_size pieces until a zero-length
// read signals EOF. Returns an empty bufferlist if any read fails.
bufferlist SampleDedupWorkerThread::read_object(ObjectItem &object)
{
  bufferlist accumulated;
  size_t offset = 0;
  while (true) {
    bufferlist piece;
    int r = io_ctx.read(object.oid, piece, default_op_size, offset);
    if (r < 0) {
      cerr << "read object error " << object.oid << " offset " << offset
        << " size " << default_op_size << " error(" << cpp_strerror(r)
        << std::endl;
      bufferlist empty_buf;
      return empty_buf;
    }
    if (r == 0) {
      break;
    }
    offset += r;
    accumulated.claim_append(piece);
  }
  return accumulated;
}
// Run content-defined chunking over the object's data and return each
// chunk's bytes together with its (offset, length) boundary.
// cbits(chunk_size) - 1 expresses the target chunk size as a bit count.
std::vector<std::tuple<bufferlist, pair<uint64_t, uint64_t>>> SampleDedupWorkerThread::do_cdc(
  ObjectItem &object,
  bufferlist &data)
{
  unique_ptr<CDC> cdc = CDC::create(chunk_algo, cbits(chunk_size) - 1);
  vector<pair<uint64_t, uint64_t>> boundaries;
  cdc->calc_chunks(data, &boundaries);

  std::vector<std::tuple<bufferlist, pair<uint64_t, uint64_t>>> out;
  out.reserve(boundaries.size());
  for (const auto &range : boundaries) {
    bufferlist piece;
    piece.substr_of(data, range.first, range.second);
    out.push_back(make_tuple(piece, range));
  }
  return out;
}
// Digest the chunk with the configured fingerprint algorithm; aborts on
// an unknown fp_type.
std::string SampleDedupWorkerThread::generate_fingerprint(bufferlist chunk_data)
{
  switch (fp_type) {
  case pg_pool_t::TYPE_FINGERPRINT_SHA1:
    return crypto::digest<crypto::SHA1>(chunk_data).to_str();
  case pg_pool_t::TYPE_FINGERPRINT_SHA256:
    return crypto::digest<crypto::SHA256>(chunk_data).to_str();
  case pg_pool_t::TYPE_FINGERPRINT_SHA512:
    return crypto::digest<crypto::SHA512>(chunk_data).to_str();
  default:
    ceph_assert(0 == "Invalid fp type");
    break;
  }
  return "";
}
// Deduplicate the entire object by flushing it through the tier; errors
// are reported but not propagated.
void SampleDedupWorkerThread::do_object_dedup(ObjectItem &object)
{
  ObjectReadOperation flush_op;
  flush_op.tier_flush();
  int r = io_ctx.operate(
    object.oid,
    &flush_op,
    NULL);
  if (r < 0) {
    cerr << " tier_flush fail : " << cpp_strerror(r) << std::endl;
  }
}
// Deduplicate a single chunk: ensure the chunk object (named by its
// fingerprint) exists in the chunk pool, then turn the matching extent of
// the source object into a reference to it. Returns 0 or a negative errno.
int SampleDedupWorkerThread::do_chunk_dedup(chunk_t &chunk)
{
  uint64_t size;
  time_t mtime;
  int ret = chunk_io_ctx.stat(chunk.fingerprint, &size, &mtime);
  if (ret == -ENOENT) {
    // First occurrence in the chunk pool: write the chunk object.
    bufferlist bl;
    bl.append(chunk.data);
    ObjectWriteOperation wop;
    wop.write_full(bl);
    ret = chunk_io_ctx.operate(chunk.fingerprint, &wop);
    if (ret < 0) {
      // Previously this return value was ignored; proceeding after a
      // failed write would make set_chunk reference a missing object.
      cerr << " chunk write fail : " << cpp_strerror(ret) << std::endl;
      return ret;
    }
  } else if (ret < 0) {
    // Previously any non-ENOENT stat error hit a ceph_assert and aborted
    // the whole tool; report and skip this chunk instead.
    cerr << " stat fail : " << cpp_strerror(ret) << std::endl;
    return ret;
  }
  ObjectReadOperation op;
  op.set_chunk(
    chunk.start,
    chunk.size,
    chunk_io_ctx,
    chunk.fingerprint,
    0,
    CEPH_OSD_OP_FLAG_WITH_REFERENCE);
  ret = io_ctx.operate(chunk.oid, &op, nullptr);
  if (ret == 0) {
    // Evict later (in crawl()) so the deduped extent is released.
    oid_for_evict.insert(chunk.oid);
  }
  return ret;
}
void ChunkScrub::print_status(Formatter *f, ostream &out)
{
if (f) {
@ -800,8 +1165,6 @@ int chunk_scrub_common(const po::variables_map &opts)
op_name = get_opts_op_name(opts);
chunk_pool_name = get_opts_chunk_pool(opts);
max_thread = get_opts_max_thread(opts);
report_period = get_opts_report_period(opts);
boost::optional<pg_t> pgid(opts.count("pgid"), pg_t());
ret = rados.init_with_context(g_ceph_context);
@ -944,6 +1307,8 @@ int chunk_scrub_common(const po::variables_map &opts)
return 0;
}
max_thread = get_opts_max_thread(opts);
report_period = get_opts_report_period(opts);
glock.lock();
begin = chunk_io_ctx.object_list_begin();
end = chunk_io_ctx.object_list_end();
@ -1174,6 +1539,146 @@ out:
return (ret < 0) ? 1 : 0;
}
// Entry point for 'ceph_dedup_tool --op sample-dedup'. Configures the
// base pool's dedup parameters and spawns one SampleDedupWorkerThread per
// slice of the base pool's object namespace.
// Returns 0 on success, a negative errno / -EINVAL on failure.
int make_crawling_daemon(const po::variables_map &opts)
{
  string base_pool_name = get_opts_pool_name(opts);
  string chunk_pool_name = get_opts_chunk_pool(opts);
  unsigned max_thread = get_opts_max_thread(opts);

  size_t chunk_size = 8192;
  if (opts.count("chunk-size")) {
    chunk_size = opts["chunk-size"].as<int>();
  } else {
    cout << "8192 is set as chunk size by default" << std::endl;
  }

  // Duplication count at which a chunk gets deduplicated. -1 wraps to
  // UINT32_MAX, effectively disabling chunk dedup unless set.
  uint32_t chunk_dedup_threshold = -1;
  if (opts.count("chunk-dedup-threshold")) {
    // BUGFIX: this assignment used to clobber chunk_size instead of
    // setting the threshold.
    chunk_dedup_threshold = opts["chunk-dedup-threshold"].as<uint32_t>();
  }

  // Duplicated-chunk percentile above which an object is worth
  // object-level dedup; -1 disables it. (Previously this variable was
  // printed below without ever being declared or parsed here.)
  double object_dedup_threshold = -1;
  if (opts.count("object-dedup-threshold")) {
    object_dedup_threshold = opts["object-dedup-threshold"].as<double>();
  }

  std::string chunk_algo = get_opts_chunk_algo(opts);

  Rados rados;
  int ret = rados.init_with_context(g_ceph_context);
  if (ret < 0) {
    cerr << "couldn't initialize rados: " << cpp_strerror(ret) << std::endl;
    return -EINVAL;
  }
  ret = rados.connect();
  if (ret) {
    cerr << "couldn't connect to cluster: " << cpp_strerror(ret) << std::endl;
    return -EINVAL;
  }

  std::string fp_algo = get_opts_fp_algo(opts);

  list<string> pool_names;
  IoCtx io_ctx, chunk_io_ctx;
  pool_names.push_back(base_pool_name);
  ret = rados.ioctx_create(base_pool_name.c_str(), io_ctx);
  if (ret < 0) {
    cerr << "error opening base pool "
      << base_pool_name << ": "
      << cpp_strerror(ret) << std::endl;
    return -EINVAL;
  }
  ret = rados.ioctx_create(chunk_pool_name.c_str(), chunk_io_ctx);
  if (ret < 0) {
    cerr << "error opening chunk pool "
      << chunk_pool_name << ": "
      << cpp_strerror(ret) << std::endl;
    return -EINVAL;
  }

  // Persist the dedup parameters on the base pool so OSD-side tiering
  // agrees with the crawler's chunking and fingerprinting.
  bufferlist inbl;
  ret = rados.mon_command(
    make_pool_str(base_pool_name, "fingerprint_algorithm", fp_algo),
    inbl, NULL, NULL);
  if (ret < 0) {
    cerr << " operate fail : " << cpp_strerror(ret) << std::endl;
    return ret;
  }
  ret = rados.mon_command(
    make_pool_str(base_pool_name, "dedup_chunk_algorithm", "fastcdc"),
    inbl, NULL, NULL);
  if (ret < 0) {
    cerr << " operate fail : " << cpp_strerror(ret) << std::endl;
    return ret;
  }
  ret = rados.mon_command(
    make_pool_str(base_pool_name, "dedup_cdc_chunk_size", chunk_size),
    inbl, NULL, NULL);
  if (ret < 0) {
    cerr << " operate fail : " << cpp_strerror(ret) << std::endl;
    return ret;
  }

  cout << "Object Dedup Threshold : " << object_dedup_threshold << std::endl
    << "Chunk Dedup Threshold : " << chunk_dedup_threshold << std::endl
    << "Chunk Size : " << chunk_size << std::endl
    << std::endl;

  // Single crawling pass for now; the loop shape anticipates periodic
  // re-crawling in a follow-up change.
  while (true) {
    lock_guard lock(glock);
    ObjectCursor begin = io_ctx.object_list_begin();
    ObjectCursor end = io_ctx.object_list_end();
    map<string, librados::pool_stat_t> stats;
    ret = rados.get_pool_stats(pool_names, stats);
    if (ret < 0) {
      cerr << "error fetching pool stats: " << cpp_strerror(ret) << std::endl;
      return -EINVAL;
    }
    if (stats.find(base_pool_name) == stats.end()) {
      cerr << "stats can not find pool name: " << base_pool_name << std::endl;
      return -EINVAL;
    }
    bool debug = false;
    if (opts.count("debug")) {
      debug = true;
    }

    estimate_threads.clear();
    SampleDedupWorkerThread::SampleDedupGlobal sample_dedup_global(
      chunk_dedup_threshold);
    // Split the object namespace into max_thread slices; one worker each.
    for (unsigned i = 0; i < max_thread; i++) {
      cout << " add thread.. " << std::endl;
      ObjectCursor shard_start;
      ObjectCursor shard_end;
      io_ctx.object_list_slice(
        begin,
        end,
        i,
        max_thread,
        &shard_start,
        &shard_end);
      unique_ptr<CrawlerThread> ptr(
        new SampleDedupWorkerThread(
          io_ctx,
          chunk_io_ctx,
          shard_start,
          shard_end,
          chunk_size,
          fp_algo,
          chunk_algo,
          sample_dedup_global));
      ptr->set_debug(debug);
      ptr->create("sample_dedup");
      estimate_threads.push_back(move(ptr));
    }
    for (auto &p : estimate_threads) {
      p->join();
    }
    break;
  }
  return 0;
}
int main(int argc, const char **argv)
{
auto args = argv_to_vec(argc, argv);
@ -1229,6 +1734,8 @@ int main(int argc, const char **argv)
*
*/
return make_dedup_object(opts);
} else if (op_name == "sample-dedup") {
return make_crawling_daemon(opts);
} else {
cerr << "unrecognized op " << op_name << std::endl;
exit(1);