mirror of
https://github.com/ceph/ceph
synced 2025-01-02 00:52:22 +00:00
Merge pull request #52151 from myoungwon/wip-ceph-dedup-tool-memory
tools/ceph-dedup-tool: set memory threshold not to cause excessive memory usage Reviewed-by: Samuel Just <sjust@redhat.com>
This commit is contained in:
commit
6adc03ac37
@ -515,6 +515,76 @@ function test_sample_dedup_snap()
|
||||
$CEPH_TOOL osd pool delete $CHUNK_POOL $CHUNK_POOL --yes-i-really-really-mean-it
|
||||
}
|
||||
|
||||
function test_dedup_memory_limit()
|
||||
{
|
||||
CHUNK_POOL=dedup_chunk_pool
|
||||
$CEPH_TOOL osd pool delete $POOL $POOL --yes-i-really-really-mean-it
|
||||
$CEPH_TOOL osd pool delete $CHUNK_POOL $CHUNK_POOL --yes-i-really-really-mean-it
|
||||
|
||||
sleep 2
|
||||
|
||||
run_expect_succ "$CEPH_TOOL" osd pool create "$POOL" 8
|
||||
run_expect_succ "$CEPH_TOOL" osd pool create "$CHUNK_POOL" 8
|
||||
|
||||
# 6 dedupable objects
|
||||
CONTENT_1="There hiHI"
|
||||
echo $CONTENT_1 > foo
|
||||
for num in `seq 1 6`
|
||||
do
|
||||
$RADOS_TOOL -p $POOL put foo_$num ./foo
|
||||
done
|
||||
|
||||
# 3 Unique objects
|
||||
for num in `seq 7 9`
|
||||
do
|
||||
CONTENT_="There hiHI"$num
|
||||
echo $CONTENT_ > foo
|
||||
$RADOS_TOOL -p $POOL put foo_$num ./foo
|
||||
done
|
||||
|
||||
# 6 dedupable objects
|
||||
CONTENT_2="There hiHIhi"
|
||||
echo $CONTENT_2 > foo
|
||||
for num in `seq 10 15`
|
||||
do
|
||||
$RADOS_TOOL -p $POOL put foo_$num ./foo
|
||||
done
|
||||
|
||||
#Since the memory limit is 100 bytes, adding 3 unique objects causes a memory drop, leaving
|
||||
#the chunk of the 6 dupable objects. If we then add 6 dedupable objects to the pool,
|
||||
#the crawler should find dedupable chunks because it free memory space through the memory drop before.
|
||||
# 1 entry == 46 bytes
|
||||
|
||||
sleep 2
|
||||
|
||||
# Execute dedup crawler
|
||||
RESULT=$($DEDUP_TOOL --pool $POOL --chunk-pool $CHUNK_POOL --op sample-dedup --chunk-algorithm fastcdc --fingerprint-algorithm sha1 --chunk-dedup-threshold 2 --sampling-ratio 100 --fpstore-threshold 100)
|
||||
|
||||
CHUNK_OID_1=$(echo $CONTENT_1 | sha1sum | awk '{print $1}')
|
||||
CHUNK_OID_2=$(echo $CONTENT_2 | sha1sum | awk '{print $1}')
|
||||
|
||||
RESULT=$($DEDUP_TOOL --op dump-chunk-refs --chunk-pool $CHUNK_POOL --object $CHUNK_OID_1 | grep foo)
|
||||
if [ -z "$RESULT" ] ; then
|
||||
$CEPH_TOOL osd pool delete $POOL $POOL --yes-i-really-really-mean-it
|
||||
$CEPH_TOOL osd pool delete $CHUNK_POOL $CHUNK_POOL --yes-i-really-really-mean-it
|
||||
die "There is no expected chunk object"
|
||||
fi
|
||||
|
||||
RESULT=$($DEDUP_TOOL --op dump-chunk-refs --chunk-pool $CHUNK_POOL --object $CHUNK_OID_2 | grep foo)
|
||||
if [ -z "$RESULT" ] ; then
|
||||
$CEPH_TOOL osd pool delete $POOL $POOL --yes-i-really-really-mean-it
|
||||
$CEPH_TOOL osd pool delete $CHUNK_POOL $CHUNK_POOL --yes-i-really-really-mean-it
|
||||
die "There is no expected chunk object"
|
||||
fi
|
||||
|
||||
rm -rf ./foo
|
||||
for num in `seq 1 15`
|
||||
do
|
||||
$RADOS_TOOL -p $POOL rm foo_$num
|
||||
done
|
||||
|
||||
$CEPH_TOOL osd pool delete $CHUNK_POOL $CHUNK_POOL --yes-i-really-really-mean-it
|
||||
}
|
||||
|
||||
test_dedup_ratio_fixed
|
||||
test_dedup_chunk_scrub
|
||||
@ -522,6 +592,7 @@ test_dedup_chunk_repair
|
||||
test_dedup_object
|
||||
test_sample_dedup
|
||||
test_sample_dedup_snap
|
||||
test_dedup_memory_limit
|
||||
|
||||
$CEPH_TOOL osd pool delete $POOL $POOL --yes-i-really-really-mean-it
|
||||
|
||||
|
@ -180,11 +180,12 @@ po::options_description make_usage() {
|
||||
("snap", ": deduplciate snapshotted object")
|
||||
("debug", ": enable debug")
|
||||
("pgid", ": set pgid")
|
||||
("chunk-dedup-threshold", po::value<uint32_t>(), ": set the threshold for chunk dedup (number of duplication) ")
|
||||
("chunk-dedup-threshold", po::value<size_t>(), ": set the threshold for chunk dedup (number of duplication) ")
|
||||
("sampling-ratio", po::value<int>(), ": set the sampling ratio (percentile)")
|
||||
("daemon", ": execute sample dedup in daemon mode")
|
||||
("loop", ": execute sample dedup in a loop until terminated. Sleeps 'wakeup-period' seconds between iterations")
|
||||
("wakeup-period", po::value<int>(), ": set the wakeup period of crawler thread (sec)")
|
||||
("fpstore-threshold", po::value<size_t>()->default_value(100_M), ": set max size of in-memory fingerprint store (bytes)")
|
||||
;
|
||||
desc.add(op_desc);
|
||||
return desc;
|
||||
@ -566,10 +567,135 @@ public:
|
||||
bufferlist data;
|
||||
};
|
||||
|
||||
using dup_count_t = size_t;
|
||||
|
||||
template <typename K, typename V>
|
||||
class FpMap {
|
||||
using map_t = std::unordered_map<K, V>;
|
||||
public:
|
||||
/// Represents a nullable reference into logical container
|
||||
class entry_t {
|
||||
/// Entry may be into one of two maps or NONE, indicates which
|
||||
enum entry_into_t {
|
||||
UNDER, OVER, NONE
|
||||
} entry_into = NONE;
|
||||
|
||||
/// Valid iterator into map for UNDER|OVER, default for NONE
|
||||
map_t::iterator iter;
|
||||
|
||||
entry_t(entry_into_t entry_into, map_t::iterator iter) :
|
||||
entry_into(entry_into), iter(iter) {
|
||||
ceph_assert(entry_into != NONE);
|
||||
}
|
||||
|
||||
public:
|
||||
entry_t() = default;
|
||||
|
||||
auto &operator*() {
|
||||
ceph_assert(entry_into != NONE);
|
||||
return *iter;
|
||||
}
|
||||
auto operator->() {
|
||||
ceph_assert(entry_into != NONE);
|
||||
return iter.operator->();
|
||||
}
|
||||
bool is_valid() const {
|
||||
return entry_into != NONE;
|
||||
}
|
||||
bool is_above_threshold() const {
|
||||
return entry_into == entry_t::OVER;
|
||||
}
|
||||
friend class FpMap;
|
||||
};
|
||||
|
||||
/// inserts str, count into container, must not already be present
|
||||
entry_t insert(const K &str, V count) {
|
||||
std::pair<typename map_t::iterator, bool> r;
|
||||
typename entry_t::entry_into_t s;
|
||||
if (count < dedup_threshold) {
|
||||
r = under_threshold_fp_map.insert({str, count});
|
||||
s = entry_t::UNDER;
|
||||
} else {
|
||||
r = over_threshold_fp_map.insert({str, count});
|
||||
s = entry_t::OVER;
|
||||
}
|
||||
ceph_assert(r.second);
|
||||
return entry_t{s, r.first};
|
||||
}
|
||||
|
||||
/// increments refcount for entry, promotes as necessary, entry must be valid
|
||||
entry_t increment_reference(entry_t entry) {
|
||||
ceph_assert(entry.is_valid());
|
||||
entry.iter->second++;
|
||||
if (entry.entry_into == entry_t::OVER ||
|
||||
entry.iter->second < dedup_threshold) {
|
||||
return entry;
|
||||
} else {
|
||||
auto [over_iter, inserted] = over_threshold_fp_map.insert(
|
||||
*entry);
|
||||
ceph_assert(inserted);
|
||||
under_threshold_fp_map.erase(entry.iter);
|
||||
return entry_t{entry_t::OVER, over_iter};
|
||||
}
|
||||
}
|
||||
|
||||
/// returns entry for fp, return will be !is_valid() if not present
|
||||
auto find(const K &fp) {
|
||||
if (auto iter = under_threshold_fp_map.find(fp);
|
||||
iter != under_threshold_fp_map.end()) {
|
||||
return entry_t{entry_t::UNDER, iter};
|
||||
} else if (auto iter = over_threshold_fp_map.find(fp);
|
||||
iter != over_threshold_fp_map.end()) {
|
||||
return entry_t{entry_t::OVER, iter};
|
||||
} else {
|
||||
return entry_t{};
|
||||
}
|
||||
}
|
||||
|
||||
/// true if container contains fp
|
||||
bool contains(const K &fp) {
|
||||
return find(fp).is_valid();
|
||||
}
|
||||
|
||||
/// returns number of items
|
||||
size_t get_num_items() const {
|
||||
return under_threshold_fp_map.size() + over_threshold_fp_map.size();
|
||||
}
|
||||
|
||||
/// returns estimate of total in-memory size (bytes)
|
||||
size_t estimate_total_size() const {
|
||||
size_t total = 0;
|
||||
if (!under_threshold_fp_map.empty()) {
|
||||
total += under_threshold_fp_map.size() *
|
||||
(under_threshold_fp_map.begin()->first.size() + sizeof(V));
|
||||
}
|
||||
if (!over_threshold_fp_map.empty()) {
|
||||
total += over_threshold_fp_map.size() *
|
||||
(over_threshold_fp_map.begin()->first.size() + sizeof(V));
|
||||
}
|
||||
return total;
|
||||
}
|
||||
|
||||
/// true if empty
|
||||
bool empty() const {
|
||||
return under_threshold_fp_map.empty() && over_threshold_fp_map.empty();
|
||||
}
|
||||
|
||||
/// instructs container to drop entries with refcounts below threshold
|
||||
void drop_entries_below_threshold() {
|
||||
under_threshold_fp_map.clear();
|
||||
}
|
||||
|
||||
FpMap(size_t dedup_threshold) : dedup_threshold(dedup_threshold) {}
|
||||
FpMap() = delete;
|
||||
private:
|
||||
map_t under_threshold_fp_map;
|
||||
map_t over_threshold_fp_map;
|
||||
const size_t dedup_threshold;
|
||||
};
|
||||
|
||||
class FpStore {
|
||||
public:
|
||||
using dup_count_t = ssize_t;
|
||||
|
||||
void maybe_print_status() {
|
||||
utime_t now = ceph_clock_now();
|
||||
if (next_report != utime_t() && now > next_report) {
|
||||
@ -581,51 +707,61 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
bool find(string& fp) {
|
||||
bool contains(string& fp) {
|
||||
std::shared_lock lock(fingerprint_lock);
|
||||
auto found_item = fp_map.find(fp);
|
||||
return found_item != fp_map.end();
|
||||
return fp_map.contains(fp);
|
||||
}
|
||||
|
||||
// return true if the chunk is duplicate
|
||||
bool add(chunk_t& chunk) {
|
||||
std::unique_lock lock(fingerprint_lock);
|
||||
auto found_iter = fp_map.find(chunk.fingerprint);
|
||||
ssize_t cur_reference = 1;
|
||||
auto entry = fp_map.find(chunk.fingerprint);
|
||||
total_bytes += chunk.size;
|
||||
maybe_print_status();
|
||||
if (found_iter == fp_map.end()) {
|
||||
fp_map.insert({chunk.fingerprint, 1});
|
||||
if (!entry.is_valid()) {
|
||||
if (is_fpmap_full()) {
|
||||
fp_map.drop_entries_below_threshold();
|
||||
if (is_fpmap_full()) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
entry = fp_map.insert(chunk.fingerprint, 1);
|
||||
} else {
|
||||
cur_reference = ++found_iter->second;
|
||||
entry = fp_map.increment_reference(entry);
|
||||
}
|
||||
return cur_reference >= dedup_threshold && dedup_threshold != -1;
|
||||
return entry.is_above_threshold();
|
||||
}
|
||||
|
||||
FpStore(size_t chunk_threshold, uint32_t report_period) :
|
||||
dedup_threshold(chunk_threshold), report_period(report_period) {
|
||||
next_report = start;
|
||||
next_report += report_period;
|
||||
bool is_fpmap_full() const {
|
||||
return fp_map.estimate_total_size() >= memory_threshold;
|
||||
}
|
||||
|
||||
FpStore(size_t chunk_threshold,
|
||||
uint32_t report_period,
|
||||
size_t memory_threshold) :
|
||||
report_period(report_period),
|
||||
memory_threshold(memory_threshold),
|
||||
fp_map(chunk_threshold) { }
|
||||
FpStore() = delete;
|
||||
|
||||
private:
|
||||
ssize_t dedup_threshold = -1;
|
||||
std::unordered_map<std::string, dup_count_t> fp_map;
|
||||
std::shared_mutex fingerprint_lock;
|
||||
const utime_t start = ceph_clock_now();
|
||||
utime_t next_report;
|
||||
const uint32_t report_period;
|
||||
size_t total_bytes = 0;
|
||||
const size_t memory_threshold;
|
||||
FpMap<std::string, dup_count_t> fp_map;
|
||||
};
|
||||
|
||||
struct SampleDedupGlobal {
|
||||
FpStore fp_store;
|
||||
const double sampling_ratio = -1;
|
||||
SampleDedupGlobal(
|
||||
int chunk_threshold,
|
||||
size_t chunk_threshold,
|
||||
int sampling_ratio,
|
||||
uint32_t report_period) :
|
||||
fp_store(chunk_threshold, report_period),
|
||||
uint32_t report_period,
|
||||
size_t fpstore_threshold) :
|
||||
fp_store(chunk_threshold, report_period, fpstore_threshold),
|
||||
sampling_ratio(static_cast<double>(sampling_ratio) / 100) { }
|
||||
};
|
||||
|
||||
@ -836,7 +972,7 @@ void SampleDedupWorkerThread::try_dedup_and_accumulate_result(
|
||||
.data = chunk_data
|
||||
};
|
||||
|
||||
if (sample_dedup_global.fp_store.find(fingerprint)) {
|
||||
if (sample_dedup_global.fp_store.contains(fingerprint)) {
|
||||
duplicated_size += chunk_data.length();
|
||||
}
|
||||
if (sample_dedup_global.fp_store.add(chunk_info)) {
|
||||
@ -1620,7 +1756,7 @@ int make_crawling_daemon(const po::variables_map &opts)
|
||||
|
||||
uint32_t chunk_dedup_threshold = -1;
|
||||
if (opts.count("chunk-dedup-threshold")) {
|
||||
chunk_dedup_threshold = opts["chunk-dedup-threshold"].as<uint32_t>();
|
||||
chunk_dedup_threshold = opts["chunk-dedup-threshold"].as<size_t>();
|
||||
}
|
||||
|
||||
std::string chunk_algo = get_opts_chunk_algo(opts);
|
||||
@ -1643,6 +1779,8 @@ int make_crawling_daemon(const po::variables_map &opts)
|
||||
cout << "100 second is set as wakeup period by default" << std::endl;
|
||||
}
|
||||
|
||||
const size_t fp_threshold = opts["fpstore-threshold"].as<size_t>();
|
||||
|
||||
std::string fp_algo = get_opts_fp_algo(opts);
|
||||
|
||||
list<string> pool_names;
|
||||
@ -1714,7 +1852,7 @@ int make_crawling_daemon(const po::variables_map &opts)
|
||||
}
|
||||
|
||||
SampleDedupWorkerThread::SampleDedupGlobal sample_dedup_global(
|
||||
chunk_dedup_threshold, sampling_ratio, report_period);
|
||||
chunk_dedup_threshold, sampling_ratio, report_period, fp_threshold);
|
||||
|
||||
std::list<SampleDedupWorkerThread> threads;
|
||||
size_t total_size = 0;
|
||||
|
Loading…
Reference in New Issue
Block a user