From b3a5c5a0569f95a5a755ea0dc03ec2c1ea4bdc4d Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Tue, 29 Aug 2017 15:51:56 -0400 Subject: [PATCH 01/35] rgw: MetadataManager interface takes const string refs Signed-off-by: Casey Bodley --- src/rgw/rgw_metadata.cc | 5 +++-- src/rgw/rgw_metadata.h | 4 ++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/rgw/rgw_metadata.cc b/src/rgw/rgw_metadata.cc index 6c554810e98..6a275239c5f 100644 --- a/src/rgw/rgw_metadata.cc +++ b/src/rgw/rgw_metadata.cc @@ -844,12 +844,13 @@ struct list_keys_handle { RGWMetadataHandler *handler; }; -int RGWMetadataManager::list_keys_init(string& section, void **handle) +int RGWMetadataManager::list_keys_init(const string& section, void **handle) { return list_keys_init(section, string(), handle); } -int RGWMetadataManager::list_keys_init(string& section, const string& marker, void **handle) +int RGWMetadataManager::list_keys_init(const string& section, + const string& marker, void **handle) { string entry; RGWMetadataHandler *handler; diff --git a/src/rgw/rgw_metadata.h b/src/rgw/rgw_metadata.h index f6dc2db03bc..0dfc73f112d 100644 --- a/src/rgw/rgw_metadata.h +++ b/src/rgw/rgw_metadata.h @@ -353,8 +353,8 @@ public: obj_version *existing_version = NULL); int remove(string& metadata_key); - int list_keys_init(string& section, void **phandle); - int list_keys_init(string& section, const string& marker, void **phandle); + int list_keys_init(const string& section, void **phandle); + int list_keys_init(const string& section, const string& marker, void **phandle); int list_keys_next(void *handle, int max, list& keys, bool *truncated); void list_keys_complete(void *handle); From 964d966969bf757570bffb650ebb4ef515a6592e Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Wed, 30 Aug 2017 16:19:36 -0400 Subject: [PATCH 02/35] rgw: introduce RGWRadosNotifyCR for aio_notify Signed-off-by: Casey Bodley --- src/rgw/rgw_cr_rados.cc | 33 +++++++++++++++++++++++++++++++++ src/rgw/rgw_cr_rados.h 
| 19 +++++++++++++++++++ 2 files changed, 52 insertions(+) diff --git a/src/rgw/rgw_cr_rados.cc b/src/rgw/rgw_cr_rados.cc index fabf387e70f..002220f2e42 100644 --- a/src/rgw/rgw_cr_rados.cc +++ b/src/rgw/rgw_cr_rados.cc @@ -794,3 +794,36 @@ int RGWStatObjCR::request_complete() { return req->get_ret_status(); } + +RGWRadosNotifyCR::RGWRadosNotifyCR(RGWRados *store, const rgw_raw_obj& obj, + bufferlist& request, uint64_t timeout_ms, + bufferlist *response) + : RGWSimpleCoroutine(store->ctx()), store(store), obj(obj), + request(request), timeout_ms(timeout_ms), response(response) +{ + set_description() << "notify dest=" << obj; +} + +int RGWRadosNotifyCR::send_request() +{ + int r = store->get_raw_obj_ref(obj, &ref); + if (r < 0) { + lderr(store->ctx()) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl; + return r; + } + + set_status() << "sending request"; + + cn = stack->create_completion_notifier(); + return ref.ioctx.aio_notify(ref.oid, cn->completion(), request, + timeout_ms, response); +} + +int RGWRadosNotifyCR::request_complete() +{ + int r = cn->completion()->get_return_value(); + + set_status() << "request complete; ret=" << r; + + return r; +} diff --git a/src/rgw/rgw_cr_rados.h b/src/rgw/rgw_cr_rados.h index a82bb35b5fc..144ffbc0bf2 100644 --- a/src/rgw/rgw_cr_rados.h +++ b/src/rgw/rgw_cr_rados.h @@ -1154,4 +1154,23 @@ class RGWStatObjCR : public RGWSimpleCoroutine { int request_complete() override; }; +/// coroutine wrapper for IoCtx::aio_notify() +class RGWRadosNotifyCR : public RGWSimpleCoroutine { + RGWRados *const store; + const rgw_raw_obj obj; + bufferlist request; + const uint64_t timeout_ms; + bufferlist *response; + rgw_rados_ref ref; + boost::intrusive_ptr cn; + +public: + RGWRadosNotifyCR(RGWRados *store, const rgw_raw_obj& obj, + bufferlist& request, uint64_t timeout_ms, + bufferlist *response); + + int send_request() override; + int request_complete() override; +}; + #endif From 4309adb36be8dff737ab2196f59743c316b12bca Mon 
Sep 17 00:00:00 2001 From: Casey Bodley Date: Fri, 1 Sep 2017 10:51:57 -0400 Subject: [PATCH 03/35] rgw: add skeleton for BucketTrimManager Signed-off-by: Casey Bodley --- src/rgw/CMakeLists.txt | 1 + src/rgw/rgw_sync_log_trim.cc | 44 +++++++++++++++++++++++++++++++++ src/rgw/rgw_sync_log_trim.h | 47 ++++++++++++++++++++++++++++++++++++ 3 files changed, 92 insertions(+) create mode 100644 src/rgw/rgw_sync_log_trim.cc create mode 100644 src/rgw/rgw_sync_log_trim.h diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index 9e56be8b98f..05071c04f16 100644 --- a/src/rgw/CMakeLists.txt +++ b/src/rgw/CMakeLists.txt @@ -76,6 +76,7 @@ set(rgw_a_srcs rgw_sync_module_es.cc rgw_sync_module_es_rest.cc rgw_sync_module_log.cc + rgw_sync_log_trim.cc rgw_sync_trace.cc rgw_period_history.cc rgw_period_puller.cc diff --git a/src/rgw/rgw_sync_log_trim.cc b/src/rgw/rgw_sync_log_trim.cc new file mode 100644 index 00000000000..315288edce1 --- /dev/null +++ b/src/rgw/rgw_sync_log_trim.cc @@ -0,0 +1,44 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 Red Hat, Inc + * + * Author: Casey Bodley + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. 
+ */ + +#include "rgw_sync_log_trim.h" + +#define dout_subsys ceph_subsys_rgw + +#undef dout_prefix +#define dout_prefix (*_dout << "trim: ") + +using rgw::BucketTrimConfig; + +namespace rgw { + +class BucketTrimManager::Impl { + public: + RGWRados *const store; + const BucketTrimConfig config; + + Impl(RGWRados *store, const BucketTrimConfig& config) + : store(store), config(config) + {} +}; + +BucketTrimManager::BucketTrimManager(RGWRados *store, + const BucketTrimConfig& config) + : impl(new Impl(store, config)) +{ +} +BucketTrimManager::~BucketTrimManager() = default; + +} // namespace rgw diff --git a/src/rgw/rgw_sync_log_trim.h b/src/rgw/rgw_sync_log_trim.h new file mode 100644 index 00000000000..fb21ac7d9de --- /dev/null +++ b/src/rgw/rgw_sync_log_trim.h @@ -0,0 +1,47 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 Red Hat, Inc + * + * Author: Casey Bodley + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + +#ifndef RGW_SYNC_LOG_TRIM_H +#define RGW_SYNC_LOG_TRIM_H + +#include + +class CephContext; +class RGWRados; + +namespace rgw { + +/// Configuration for BucketTrimManager +struct BucketTrimConfig { +}; + +/// fill out the BucketTrimConfig from the ceph context +void configure_bucket_trim(CephContext *cct, BucketTrimConfig& config); + +/// Determines the buckets on which to focus trim activity, using two sources of +/// input: the frequency of entries read from the data changes log, and a global +/// listing of the bucket.instance metadata. 
This allows us to trim active +/// buckets quickly, while also ensuring that all buckets will eventually trim +class BucketTrimManager { + class Impl; + std::unique_ptr impl; + public: + BucketTrimManager(RGWRados *store, const BucketTrimConfig& config); + ~BucketTrimManager(); +}; + +} // namespace rgw + +#endif // RGW_SYNC_LOG_TRIM_H From e9a5ec9f64dd6cd163a855335b846181c3ac83d2 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Thu, 24 Aug 2017 10:01:36 -0400 Subject: [PATCH 04/35] common: introduce BoundedKeyCounter and unit test Signed-off-by: Casey Bodley --- src/common/bounded_key_counter.h | 187 ++++++++++++++++++ src/test/common/CMakeLists.txt | 6 + src/test/common/test_bounded_key_counter.cc | 200 ++++++++++++++++++++ 3 files changed, 393 insertions(+) create mode 100644 src/common/bounded_key_counter.h create mode 100644 src/test/common/test_bounded_key_counter.cc diff --git a/src/common/bounded_key_counter.h b/src/common/bounded_key_counter.h new file mode 100644 index 00000000000..e5aa52210df --- /dev/null +++ b/src/common/bounded_key_counter.h @@ -0,0 +1,187 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 Red Hat, Inc + * + * Author: Casey Bodley + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef BOUNDED_KEY_COUNTER_H +#define BOUNDED_KEY_COUNTER_H + +#include +#include +#include +#include + +#include "include/assert.h" + +/** + * BoundedKeyCounter + * + * A data structure that counts the number of times a given key is inserted, + * and can return the keys with the highest counters. The number of unique keys + * is bounded by the given constructor argument, meaning that new keys will be + * rejected if they would exceed this bound. 
+ * + * It is optimized for use where insertion is frequent, but sorted listings are + * both infrequent and tend to request a small subset of the available keys. + */ +template +class BoundedKeyCounter { + /// map type to associate keys with their counter values + using map_type = std::map; + using value_type = typename map_type::value_type; + + /// view type used for sorting key-value pairs by their counter value + using view_type = std::vector; + + /// maximum number of counters to store at once + const size_t bound; + + /// map of counters, with a maximum size given by 'bound' + map_type counters; + + /// storage for sorted key-value pairs + view_type sorted; + + /// remembers how much of the range is actually sorted + typename view_type::iterator sorted_position; + + /// invalidate view of sorted entries + void invalidate_sorted() + { + sorted_position = sorted.begin(); + sorted.clear(); + } + + /// value_type comparison function for sorting in descending order + static bool value_greater(const value_type *lhs, const value_type *rhs) + { + return lhs->second > rhs->second; + } + + /// map iterator that adapts value_type to value_type* + struct const_pointer_iterator : public map_type::const_iterator { + const_pointer_iterator(typename map_type::const_iterator i) + : map_type::const_iterator(i) {} + const value_type* operator*() const { + return &map_type::const_iterator::operator*(); + } + }; + + protected: + /// return the number of sorted entries. 
marked protected for unit testing + size_t get_num_sorted() const + { + using const_iterator = typename view_type::const_iterator; + return std::distance(sorted.begin(), sorted_position); + } + + public: + BoundedKeyCounter(size_t bound) + : bound(bound) + { + sorted.reserve(bound); + sorted_position = sorted.begin(); + } + + /// return the number of keys stored + size_t size() const noexcept { return counters.size(); } + + /// return the maximum number of keys + size_t capacity() const noexcept { return bound; } + + /// increment a counter for the given key and return its value. if the key was + /// not present, insert it. if the map is full, return 0 + Count insert(const Key& key, Count n = 1) + { + typename map_type::iterator i; + + if (counters.size() < bound) { + // insert new entries at count=0 + bool inserted; + std::tie(i, inserted) = counters.emplace(key, 0); + if (inserted) { + sorted.push_back(&*i); + } + } else { + // when full, refuse to insert new entries + i = counters.find(key); + if (i == counters.end()) { + return 0; + } + } + + i->second += n; // add to the counter + + // update sorted position if necessary. 
use a binary search for the last + // element in the sorted range that's greater than this counter + sorted_position = std::lower_bound(sorted.begin(), sorted_position, + &*i, &value_greater); + + return i->second; + } + + /// remove the given key from the map of counters + void erase(const Key& key) + { + auto i = counters.find(key); + if (i == counters.end()) { + return; + } + // removing the sorted entry would require linear search; invalidate instead + invalidate_sorted(); + + counters.erase(i); + } + + /// query the highest N key-value pairs sorted by counter value, passing each + /// in order to the given callback with arguments (Key, Count) + template + void get_highest(size_t count, Callback&& cb) + { + if (sorted.empty()) { + // initialize the vector with pointers to all key-value pairs + sorted.assign(const_pointer_iterator{counters.cbegin()}, + const_pointer_iterator{counters.cend()}); + // entire range is unsorted + assert(sorted_position == sorted.begin()); + } + + const size_t sorted_count = get_num_sorted(); + if (sorted_count < count) { + // move sorted_position to cover the requested number of entries + sorted_position = sorted.begin() + std::min(count, sorted.size()); + + // sort all entries in descending order up to the given position + std::partial_sort(sorted.begin(), sorted_position, sorted.end(), + &value_greater); + } + + // return the requested range via callback + for (const auto& pair : sorted) { + if (count-- == 0) { + return; + } + cb(pair->first, pair->second); + } + } + + /// remove all keys and counters and invalidate the sorted range + void clear() + { + invalidate_sorted(); + counters.clear(); + } +}; + +#endif // BOUNDED_KEY_COUNTER_H diff --git a/src/test/common/CMakeLists.txt b/src/test/common/CMakeLists.txt index 4b123446266..7be766079d1 100644 --- a/src/test/common/CMakeLists.txt +++ b/src/test/common/CMakeLists.txt @@ -271,3 +271,9 @@ add_ceph_unittest(unittest_backport14) add_executable(unittest_convenience 
test_convenience.cc) target_link_libraries(unittest_convenience ceph-common) add_ceph_unittest(unittest_convenience) + +add_executable(unittest_bounded_key_counter + test_bounded_key_counter.cc + $) +target_link_libraries(unittest_bounded_key_counter global) +add_ceph_unittest(unittest_bounded_key_counter) diff --git a/src/test/common/test_bounded_key_counter.cc b/src/test/common/test_bounded_key_counter.cc new file mode 100644 index 00000000000..a8739405147 --- /dev/null +++ b/src/test/common/test_bounded_key_counter.cc @@ -0,0 +1,200 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2015 Red Hat + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ +#include "common/bounded_key_counter.h" +#include + +namespace { + +// call get_highest() and return the number of callbacks +template +size_t count_highest(BoundedKeyCounter& counter, size_t count) +{ + size_t callbacks = 0; + counter.get_highest(count, [&callbacks] (const Key& key, Count count) { + ++callbacks; + }); + return callbacks; +} + +// call get_highest() and return the key/value pairs as a vector +template >> +Vector get_highest(BoundedKeyCounter& counter, size_t count) +{ + Vector results; + counter.get_highest(count, [&results] (const Key& key, Count count) { + results.emplace_back(key, count); + }); + return results; +} + +} // anonymous namespace + +TEST(BoundedKeyCounter, Insert) +{ + BoundedKeyCounter counter(2); + EXPECT_EQ(1, counter.insert(0)); // insert new key + EXPECT_EQ(2, counter.insert(0)); // increment counter + EXPECT_EQ(7, counter.insert(0, 5)); // add 5 to counter + EXPECT_EQ(1, counter.insert(1)); // insert new key + EXPECT_EQ(0, counter.insert(2)); // reject new key +} + +TEST(BoundedKeyCounter, 
Erase) +{ + BoundedKeyCounter counter(10); + + counter.erase(0); // ok to erase nonexistent key + EXPECT_EQ(1, counter.insert(1, 1)); + EXPECT_EQ(2, counter.insert(2, 2)); + EXPECT_EQ(3, counter.insert(3, 3)); + counter.erase(2); + counter.erase(1); + counter.erase(3); + counter.erase(3); + EXPECT_EQ(0u, count_highest(counter, 10)); +} + +TEST(BoundedKeyCounter, Size) +{ + BoundedKeyCounter counter(4); + EXPECT_EQ(0u, counter.size()); + EXPECT_EQ(1, counter.insert(1, 1)); + EXPECT_EQ(1u, counter.size()); + EXPECT_EQ(2, counter.insert(2, 2)); + EXPECT_EQ(2u, counter.size()); + EXPECT_EQ(3, counter.insert(3, 3)); + EXPECT_EQ(3u, counter.size()); + EXPECT_EQ(4, counter.insert(4, 4)); + EXPECT_EQ(4u, counter.size()); + EXPECT_EQ(0, counter.insert(5, 5)); // reject new key + EXPECT_EQ(4u, counter.size()); // size unchanged + EXPECT_EQ(5, counter.insert(4, 1)); // update existing key + EXPECT_EQ(4u, counter.size()); // size unchanged + counter.erase(2); + EXPECT_EQ(3u, counter.size()); + counter.erase(2); // erase duplicate + EXPECT_EQ(3u, counter.size()); // size unchanged + counter.erase(4); + EXPECT_EQ(2u, counter.size()); + counter.erase(1); + EXPECT_EQ(1u, counter.size()); + counter.erase(3); + EXPECT_EQ(0u, counter.size()); + EXPECT_EQ(6, counter.insert(6, 6)); + EXPECT_EQ(1u, counter.size()); + counter.clear(); + EXPECT_EQ(0u, counter.size()); +} + +TEST(BoundedKeyCounter, GetHighest) +{ + BoundedKeyCounter counter(10); + using Vector = std::vector>; + + EXPECT_EQ(0u, count_highest(counter, 0)); // ok to request 0 + EXPECT_EQ(0u, count_highest(counter, 10)); // empty + EXPECT_EQ(0u, count_highest(counter, 999)); // ok to request count >> 10 + + EXPECT_EQ(1, counter.insert(1, 1)); + EXPECT_EQ(Vector({{1,1}}), get_highest(counter, 10)); + EXPECT_EQ(2, counter.insert(2, 2)); + EXPECT_EQ(Vector({{2,2},{1,1}}), get_highest(counter, 10)); + EXPECT_EQ(3, counter.insert(3, 3)); + EXPECT_EQ(Vector({{3,3},{2,2},{1,1}}), get_highest(counter, 10)); + EXPECT_EQ(3, 
counter.insert(4, 3)); // insert duplicated count=3 + // still returns 4 entries (but order of {3,3} and {4,3} is unspecified) + EXPECT_EQ(4u, count_highest(counter, 10)); + counter.erase(3); + EXPECT_EQ(Vector({{4,3},{2,2},{1,1}}), get_highest(counter, 10)); + EXPECT_EQ(0u, count_highest(counter, 0)); // requesting 0 still returns 0 +} + +TEST(BoundedKeyCounter, Clear) +{ + BoundedKeyCounter counter(2); + EXPECT_EQ(1, counter.insert(0)); // insert new key + EXPECT_EQ(1, counter.insert(1)); // insert new key + EXPECT_EQ(2u, count_highest(counter, 2)); // return 2 entries + + counter.clear(); + + EXPECT_EQ(0u, count_highest(counter, 2)); // return 0 entries + EXPECT_EQ(1, counter.insert(1)); // insert new key + EXPECT_EQ(1, counter.insert(2)); // insert new unique key + EXPECT_EQ(2u, count_highest(counter, 2)); // return 2 entries +} + +// tests for partial sort and invalidation +TEST(BoundedKeyCounter, GetNumSorted) +{ + struct MockCounter : public BoundedKeyCounter { + using BoundedKeyCounter::BoundedKeyCounter; + // expose as public for testing sort invalidations + using BoundedKeyCounter::get_num_sorted; + }; + + MockCounter counter(10); + + EXPECT_EQ(0u, counter.get_num_sorted()); + EXPECT_EQ(0u, count_highest(counter, 10)); + EXPECT_EQ(0u, counter.get_num_sorted()); + + EXPECT_EQ(2, counter.insert(2, 2)); + EXPECT_EQ(3, counter.insert(3, 3)); + EXPECT_EQ(4, counter.insert(4, 4)); + EXPECT_EQ(0u, counter.get_num_sorted()); + + EXPECT_EQ(0u, count_highest(counter, 0)); + EXPECT_EQ(0u, counter.get_num_sorted()); + EXPECT_EQ(1u, count_highest(counter, 1)); + EXPECT_EQ(1u, counter.get_num_sorted()); + EXPECT_EQ(2u, count_highest(counter, 2)); + EXPECT_EQ(2u, counter.get_num_sorted()); + EXPECT_EQ(3u, count_highest(counter, 10)); + EXPECT_EQ(3u, counter.get_num_sorted()); + + EXPECT_EQ(1, counter.insert(1, 1)); // insert at bottom does not invalidate + EXPECT_EQ(3u, counter.get_num_sorted()); + + EXPECT_EQ(4u, count_highest(counter, 10)); + EXPECT_EQ(4u, 
counter.get_num_sorted()); + + EXPECT_EQ(5, counter.insert(5, 5)); // insert at top invalidates sort + EXPECT_EQ(0u, counter.get_num_sorted()); + + EXPECT_EQ(0u, count_highest(counter, 0)); + EXPECT_EQ(0u, counter.get_num_sorted()); + EXPECT_EQ(1u, count_highest(counter, 1)); + EXPECT_EQ(1u, counter.get_num_sorted()); + EXPECT_EQ(2u, count_highest(counter, 2)); + EXPECT_EQ(2u, counter.get_num_sorted()); + EXPECT_EQ(3u, count_highest(counter, 3)); + EXPECT_EQ(3u, counter.get_num_sorted()); + EXPECT_EQ(4u, count_highest(counter, 4)); + EXPECT_EQ(4u, counter.get_num_sorted()); + EXPECT_EQ(5u, count_highest(counter, 10)); + EXPECT_EQ(5u, counter.get_num_sorted()); + + // updating an existing counter only invalidates entries <= that counter + EXPECT_EQ(2, counter.insert(1)); // invalidates {1,2} and {2,2} + EXPECT_EQ(3u, counter.get_num_sorted()); + + EXPECT_EQ(5u, count_highest(counter, 10)); + EXPECT_EQ(5u, counter.get_num_sorted()); + + counter.clear(); // invalidates sort + EXPECT_EQ(0u, counter.get_num_sorted()); +} + From b4249cc432b5b74e5c8f545bdc7daddbc9e60d23 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Fri, 1 Sep 2017 10:57:41 -0400 Subject: [PATCH 05/35] rgw: BucketTrimManager implements BucketChangeObserver Signed-off-by: Casey Bodley --- src/rgw/rgw_sync_log_trim.cc | 19 ++++++++++++++++++- src/rgw/rgw_sync_log_trim.h | 15 ++++++++++++++- 2 files changed, 32 insertions(+), 2 deletions(-) diff --git a/src/rgw/rgw_sync_log_trim.cc b/src/rgw/rgw_sync_log_trim.cc index 315288edce1..cd177dc5747 100644 --- a/src/rgw/rgw_sync_log_trim.cc +++ b/src/rgw/rgw_sync_log_trim.cc @@ -13,6 +13,9 @@ * Foundation. See file COPYING. 
*/ +#include + +#include "common/bounded_key_counter.h" #include "rgw_sync_log_trim.h" #define dout_subsys ceph_subsys_rgw @@ -21,6 +24,7 @@ #define dout_prefix (*_dout << "trim: ") using rgw::BucketTrimConfig; +using BucketChangeCounter = BoundedKeyCounter; namespace rgw { @@ -29,8 +33,15 @@ class BucketTrimManager::Impl { RGWRados *const store; const BucketTrimConfig config; + /// count frequency of bucket instance entries in the data changes log + BucketChangeCounter counter; + + /// protect data shared between data sync and trim threads + std::mutex mutex; + Impl(RGWRados *store, const BucketTrimConfig& config) - : store(store), config(config) + : store(store), config(config), + counter(config.counter_size) {} }; @@ -41,4 +52,10 @@ BucketTrimManager::BucketTrimManager(RGWRados *store, } BucketTrimManager::~BucketTrimManager() = default; +void BucketTrimManager::on_bucket_changed(const boost::string_view& bucket) +{ + std::lock_guard lock(impl->mutex); + impl->counter.insert(bucket.to_string()); +} + } // namespace rgw diff --git a/src/rgw/rgw_sync_log_trim.h b/src/rgw/rgw_sync_log_trim.h index fb21ac7d9de..2819eeb0bc4 100644 --- a/src/rgw/rgw_sync_log_trim.h +++ b/src/rgw/rgw_sync_log_trim.h @@ -17,14 +17,24 @@ #define RGW_SYNC_LOG_TRIM_H #include +#include class CephContext; class RGWRados; namespace rgw { +/// Interface to inform the trim process about which buckets are most active +struct BucketChangeObserver { + virtual ~BucketChangeObserver() = default; + + virtual void on_bucket_changed(const boost::string_view& bucket_instance) = 0; +}; + /// Configuration for BucketTrimManager struct BucketTrimConfig { + /// maximum number of buckets to track with BucketChangeObserver + size_t counter_size{0}; }; /// fill out the BucketTrimConfig from the ceph context @@ -34,12 +44,15 @@ void configure_bucket_trim(CephContext *cct, BucketTrimConfig& config); /// input: the frequency of entries read from the data changes log, and a global /// listing of the 
bucket.instance metadata. This allows us to trim active /// buckets quickly, while also ensuring that all buckets will eventually trim -class BucketTrimManager { +class BucketTrimManager : public BucketChangeObserver { class Impl; std::unique_ptr impl; public: BucketTrimManager(RGWRados *store, const BucketTrimConfig& config); ~BucketTrimManager(); + + /// increment a counter for the given bucket instance + void on_bucket_changed(const boost::string_view& bucket_instance) override; }; } // namespace rgw From f96d9a8e22195bfb3347b5add7a4385895d36f9c Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Fri, 1 Sep 2017 11:26:01 -0400 Subject: [PATCH 06/35] rgw: add BucketTrimWatcher to serve watch/notify apis Signed-off-by: Casey Bodley --- src/rgw/rgw_sync_log_trim.cc | 141 ++++++++++++++++++++++++++++++++++- src/rgw/rgw_sync_log_trim.h | 2 + 2 files changed, 141 insertions(+), 2 deletions(-) diff --git a/src/rgw/rgw_sync_log_trim.cc b/src/rgw/rgw_sync_log_trim.cc index cd177dc5747..462b467acea 100644 --- a/src/rgw/rgw_sync_log_trim.cc +++ b/src/rgw/rgw_sync_log_trim.cc @@ -14,9 +14,13 @@ */ #include +#include #include "common/bounded_key_counter.h" +#include "common/errno.h" #include "rgw_sync_log_trim.h" +#include "rgw_rados.h" +#include "include/assert.h" #define dout_subsys ceph_subsys_rgw @@ -26,6 +30,127 @@ using rgw::BucketTrimConfig; using BucketChangeCounter = BoundedKeyCounter; + +// watch/notify api for gateways to coordinate about which buckets to trim +enum TrimNotifyType { +}; +WRITE_RAW_ENCODER(TrimNotifyType); + +struct TrimNotifyHandler { + virtual ~TrimNotifyHandler() = default; + + virtual void handle(bufferlist::iterator& input, bufferlist& output) = 0; +}; + +/// rados watcher for bucket trim notifications +class BucketTrimWatcher : public librados::WatchCtx2 { + RGWRados *const store; + const rgw_raw_obj& obj; + rgw_rados_ref ref; + uint64_t handle{0}; + + using HandlerPtr = std::unique_ptr; + boost::container::flat_map handlers; + + public: + 
BucketTrimWatcher(RGWRados *store, const rgw_raw_obj& obj) + : store(store), obj(obj) + { + } + + ~BucketTrimWatcher() + { + stop(); + } + + int start() + { + int r = store->get_raw_obj_ref(obj, &ref); + if (r < 0) { + return r; + } + + // register a watch on the realm's control object + r = ref.ioctx.watch2(ref.oid, &handle, this); + if (r == -ENOENT) { + constexpr bool exclusive = true; + r = ref.ioctx.create(ref.oid, exclusive); + if (r == -EEXIST || r == 0) { + r = ref.ioctx.watch2(ref.oid, &handle, this); + } + } + if (r < 0) { + lderr(store->ctx()) << "Failed to watch " << ref.oid + << " with " << cpp_strerror(-r) << dendl; + ref.ioctx.close(); + return r; + } + + ldout(store->ctx(), 10) << "Watching " << ref.oid << dendl; + return 0; + } + + int restart() + { + int r = ref.ioctx.unwatch2(handle); + if (r < 0) { + lderr(store->ctx()) << "Failed to unwatch on " << ref.oid + << " with " << cpp_strerror(-r) << dendl; + } + r = ref.ioctx.watch2(ref.oid, &handle, this); + if (r < 0) { + lderr(store->ctx()) << "Failed to restart watch on " << ref.oid + << " with " << cpp_strerror(-r) << dendl; + ref.ioctx.close(); + } + return r; + } + + void stop() + { + ref.ioctx.unwatch2(handle); + ref.ioctx.close(); + } + + /// respond to bucket trim notifications + void handle_notify(uint64_t notify_id, uint64_t cookie, + uint64_t notifier_id, bufferlist& bl) override + { + if (cookie != handle) { + return; + } + bufferlist reply; + try { + auto p = bl.begin(); + TrimNotifyType type; + ::decode(type, p); + + auto handler = handlers.find(type); + if (handler != handlers.end()) { + handler->second->handle(p, reply); + } else { + lderr(store->ctx()) << "no handler for notify type " << type << dendl; + } + } catch (const buffer::error& e) { + lderr(store->ctx()) << "Failed to decode notification: " << e.what() << dendl; + } + ref.ioctx.notify_ack(ref.oid, notify_id, cookie, reply); + } + + /// reestablish the watch if it gets disconnected + void handle_error(uint64_t cookie, int 
err) override + { + if (cookie != handle) { + return; + } + if (err == -ENOTCONN) { + ldout(store->ctx(), 4) << "Disconnected watch on " << ref.oid << dendl; + restart(); + } + } +}; + + namespace rgw { class BucketTrimManager::Impl { @@ -33,15 +158,22 @@ class BucketTrimManager::Impl { RGWRados *const store; const BucketTrimConfig config; + const rgw_raw_obj status_obj; + /// count frequency of bucket instance entries in the data changes log BucketChangeCounter counter; - /// protect data shared between data sync and trim threads + /// serve the bucket trim watch/notify api + BucketTrimWatcher watcher; + + /// protect data shared between data sync, trim, and watch/notify threads std::mutex mutex; Impl(RGWRados *store, const BucketTrimConfig& config) : store(store), config(config), - counter(config.counter_size) + status_obj(store->get_zone_params().log_pool, "bilog.trim"), + counter(config.counter_size), + watcher(store, status_obj) {} }; @@ -52,6 +184,11 @@ BucketTrimManager::BucketTrimManager(RGWRados *store, } BucketTrimManager::~BucketTrimManager() = default; +int BucketTrimManager::init() +{ + return impl->watcher.start(); +} + void BucketTrimManager::on_bucket_changed(const boost::string_view& bucket) { std::lock_guard lock(impl->mutex); diff --git a/src/rgw/rgw_sync_log_trim.h b/src/rgw/rgw_sync_log_trim.h index 2819eeb0bc4..6110a7aa80e 100644 --- a/src/rgw/rgw_sync_log_trim.h +++ b/src/rgw/rgw_sync_log_trim.h @@ -51,6 +51,8 @@ class BucketTrimManager : public BucketChangeObserver { BucketTrimManager(RGWRados *store, const BucketTrimConfig& config); ~BucketTrimManager(); + int init(); + /// increment a counter for the given bucket instance void on_bucket_changed(const boost::string_view& bucket_instance) override; }; From 5bcf109eac30780cfa9ae5d524d2bde638651f40 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Fri, 1 Sep 2017 11:29:55 -0400 Subject: [PATCH 07/35] rgw: add TrimCounters api to BucketTrimWatcher Signed-off-by: Casey Bodley --- 
src/rgw/rgw_sync_log_trim.cc | 131 +++++++++++++++++++++++++++++++++-- 1 file changed, 127 insertions(+), 4 deletions(-) diff --git a/src/rgw/rgw_sync_log_trim.cc b/src/rgw/rgw_sync_log_trim.cc index 462b467acea..09017290dfd 100644 --- a/src/rgw/rgw_sync_log_trim.cc +++ b/src/rgw/rgw_sync_log_trim.cc @@ -18,8 +18,8 @@ #include "common/bounded_key_counter.h" #include "common/errno.h" -#include "rgw_sync_log_trim.h" #include "rgw_rados.h" +#include "rgw_sync_log_trim.h" #include "include/assert.h" #define dout_subsys ceph_subsys_rgw @@ -33,6 +33,7 @@ using BucketChangeCounter = BoundedKeyCounter; // watch/notify api for gateways to coordinate about which buckets to trim enum TrimNotifyType { + NotifyTrimCounters = 0, }; WRITE_RAW_ENCODER(TrimNotifyType); @@ -42,6 +43,115 @@ struct TrimNotifyHandler { virtual void handle(bufferlist::iterator& input, bufferlist& output) = 0; }; +/// api to share the bucket trim counters between gateways in the same zone. +/// each gateway will process different datalog shards, so the gateway that runs +/// the trim process needs to accumulate their counters +struct TrimCounters { + /// counter for a single bucket + struct BucketCounter { + std::string bucket; + int count{0}; + + BucketCounter() = default; + BucketCounter(const std::string& bucket, int count) + : bucket(bucket), count(count) {} + + void encode(bufferlist& bl) const; + void decode(bufferlist::iterator& p); + }; + using Vector = std::vector; + + /// request bucket trim counters from peer gateways + struct Request { + uint16_t max_buckets; //< maximum number of bucket counters to return + + void encode(bufferlist& bl) const; + void decode(bufferlist::iterator& p); + }; + + /// return the current bucket trim counters + struct Response { + Vector bucket_counters; + + void encode(bufferlist& bl) const; + void decode(bufferlist::iterator& p); + }; + + /// server interface to query the hottest buckets + struct Server { + virtual ~Server() = default; + + virtual void 
get_bucket_counters(int count, Vector& counters) = 0; + }; + + /// notify handler + class Handler : public TrimNotifyHandler { + Server *const server; + public: + Handler(Server *server) : server(server) {} + + void handle(bufferlist::iterator& input, bufferlist& output) override; + }; +}; +std::ostream& operator<<(std::ostream& out, const TrimCounters::BucketCounter& rhs) +{ + return out << rhs.bucket << ":" << rhs.count; +} + +void TrimCounters::BucketCounter::encode(bufferlist& bl) const +{ + // no versioning to save space + ::encode(bucket, bl); + ::encode(count, bl); +} +void TrimCounters::BucketCounter::decode(bufferlist::iterator& p) +{ + ::decode(bucket, p); + ::decode(count, p); +} +WRITE_CLASS_ENCODER(TrimCounters::BucketCounter); + +void TrimCounters::Request::encode(bufferlist& bl) const +{ + ENCODE_START(1, 1, bl); + ::encode(max_buckets, bl); + ENCODE_FINISH(bl); +} +void TrimCounters::Request::decode(bufferlist::iterator& p) +{ + DECODE_START(1, p); + ::decode(max_buckets, p); + DECODE_FINISH(p); +} +WRITE_CLASS_ENCODER(TrimCounters::Request); + +void TrimCounters::Response::encode(bufferlist& bl) const +{ + ENCODE_START(1, 1, bl); + ::encode(bucket_counters, bl); + ENCODE_FINISH(bl); +} +void TrimCounters::Response::decode(bufferlist::iterator& p) +{ + DECODE_START(1, p); + ::decode(bucket_counters, p); + DECODE_FINISH(p); +} +WRITE_CLASS_ENCODER(TrimCounters::Response); + +void TrimCounters::Handler::handle(bufferlist::iterator& input, + bufferlist& output) +{ + Request request; + ::decode(request, input); + auto count = std::min(request.max_buckets, 128); + + Response response; + server->get_bucket_counters(count, response.bucket_counters); + ::encode(response, output); +} + + /// rados watcher for bucket trim notifications class BucketTrimWatcher : public librados::WatchCtx2 { RGWRados *const store; @@ -53,9 +163,11 @@ class BucketTrimWatcher : public librados::WatchCtx2 { boost::container::flat_map handlers; public: - BucketTrimWatcher(RGWRados 
*store, const rgw_raw_obj& obj) + BucketTrimWatcher(RGWRados *store, const rgw_raw_obj& obj, + TrimCounters::Server *counters) : store(store), obj(obj) { + handlers.emplace(NotifyTrimCounters, new TrimCounters::Handler(counters)); } ~BucketTrimWatcher() @@ -153,7 +265,7 @@ class BucketTrimWatcher : public librados::WatchCtx2 { namespace rgw { -class BucketTrimManager::Impl { +class BucketTrimManager::Impl : public TrimCounters::Server { public: RGWRados *const store; const BucketTrimConfig config; @@ -173,8 +285,19 @@ class BucketTrimManager::Impl { : store(store), config(config), status_obj(store->get_zone_params().log_pool, "bilog.trim"), counter(config.counter_size), - watcher(store, status_obj) + watcher(store, status_obj, this) {} + + /// TrimCounters::Server interface for watch/notify api + void get_bucket_counters(int count, TrimCounters::Vector& buckets) + { + buckets.reserve(count); + std::lock_guard lock(mutex); + counter.get_highest(count, [&buckets] (const std::string& key, int count) { + buckets.emplace_back(key, count); + }); + ldout(store->ctx(), 20) << "get_bucket_counters: " << buckets << dendl; + } }; BucketTrimManager::BucketTrimManager(RGWRados *store, From 129fc99d5208279029ff1722d21f0ad24c37db62 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Fri, 1 Sep 2017 12:22:47 -0400 Subject: [PATCH 08/35] rgw: add BucketTrimPollCR for interval and lease logic Signed-off-by: Casey Bodley --- src/rgw/rgw_sync_log_trim.cc | 55 ++++++++++++++++++++++++++++++++++++ src/rgw/rgw_sync_log_trim.h | 6 ++++ 2 files changed, 61 insertions(+) diff --git a/src/rgw/rgw_sync_log_trim.cc b/src/rgw/rgw_sync_log_trim.cc index 09017290dfd..05b6e247ed6 100644 --- a/src/rgw/rgw_sync_log_trim.cc +++ b/src/rgw/rgw_sync_log_trim.cc @@ -18,8 +18,11 @@ #include "common/bounded_key_counter.h" #include "common/errno.h" +#include "rgw_cr_rados.h" #include "rgw_rados.h" #include "rgw_sync_log_trim.h" + +#include #include "include/assert.h" #define dout_subsys ceph_subsys_rgw @@ 
-263,6 +266,53 @@ class BucketTrimWatcher : public librados::WatchCtx2 { }; +class BucketTrimPollCR : public RGWCoroutine { + RGWRados *const store; + const BucketTrimConfig& config; + const rgw_raw_obj& obj; + const std::string name{"trim"}; //< lock name + const std::string cookie; + + public: + BucketTrimPollCR(RGWRados *store, const BucketTrimConfig& config, + const rgw_raw_obj& obj) + : RGWCoroutine(store->ctx()), store(store), config(config), obj(obj), + cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct)) + {} + + int operate(); +}; + +int BucketTrimPollCR::operate() +{ + reenter(this) { + for (;;) { + set_status("sleeping"); + wait(utime_t{config.trim_interval_sec, 0}); + + // prevent others from trimming for our entire wait interval + set_status("acquiring trim lock"); + yield call(new RGWSimpleRadosLockCR(store->get_async_rados(), store, + obj, name, cookie, + config.trim_interval_sec)); + if (retcode < 0) { + ldout(cct, 4) << "failed to lock: " << cpp_strerror(retcode) << dendl; + continue; + } + + set_status("trimming"); + // TODO: spawn trim logic + if (retcode < 0) { + // on errors, unlock so other gateways can try + set_status("unlocking"); + yield call(new RGWSimpleRadosUnlockCR(store->get_async_rados(), store, + obj, name, cookie)); + } + } + } + return 0; +} + namespace rgw { class BucketTrimManager::Impl : public TrimCounters::Server { @@ -318,4 +368,9 @@ void BucketTrimManager::on_bucket_changed(const boost::string_view& bucket) impl->counter.insert(bucket.to_string()); } +RGWCoroutine* BucketTrimManager::create_bucket_trim_cr() +{ + return new BucketTrimPollCR(impl->store, impl->config, impl->status_obj); +} + } // namespace rgw diff --git a/src/rgw/rgw_sync_log_trim.h b/src/rgw/rgw_sync_log_trim.h index 6110a7aa80e..fb69512e97b 100644 --- a/src/rgw/rgw_sync_log_trim.h +++ b/src/rgw/rgw_sync_log_trim.h @@ -20,6 +20,7 @@ #include class CephContext; +class RGWCoroutine; class RGWRados; namespace rgw { @@ -33,6 +34,8 @@ struct BucketChangeObserver 
{ /// Configuration for BucketTrimManager struct BucketTrimConfig { + /// time interval in seconds between bucket trim attempts + uint32_t trim_interval_sec{0}; /// maximum number of buckets to track with BucketChangeObserver size_t counter_size{0}; }; @@ -55,6 +58,9 @@ class BucketTrimManager : public BucketChangeObserver { /// increment a counter for the given bucket instance void on_bucket_changed(const boost::string_view& bucket_instance) override; + + /// create a coroutine to run the bucket trim process every trim interval + RGWCoroutine* create_bucket_trim_cr(); }; } // namespace rgw From 82c059530871af054c48de0e6a091b1f74f2eb12 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Fri, 1 Sep 2017 12:31:44 -0400 Subject: [PATCH 09/35] rgw: add BucketTrimCR to spawn trim for active buckets Signed-off-by: Casey Bodley --- src/rgw/rgw_sync_log_trim.cc | 138 ++++++++++++++++++++++++++++++++++- src/rgw/rgw_sync_log_trim.h | 6 ++ 2 files changed, 143 insertions(+), 1 deletion(-) diff --git a/src/rgw/rgw_sync_log_trim.cc b/src/rgw/rgw_sync_log_trim.cc index 05b6e247ed6..1bca8539cfa 100644 --- a/src/rgw/rgw_sync_log_trim.cc +++ b/src/rgw/rgw_sync_log_trim.cc @@ -21,6 +21,7 @@ #include "rgw_cr_rados.h" #include "rgw_rados.h" #include "rgw_sync_log_trim.h" +#include "rgw_sync.h" #include #include "include/assert.h" @@ -266,6 +267,141 @@ class BucketTrimWatcher : public librados::WatchCtx2 { }; +/// trim the bilog of all of the given bucket instance's shards +class BucketTrimInstanceCR : public RGWCoroutine { + RGWRados *const store; + std::string bucket_instance; + + public: + BucketTrimInstanceCR(RGWRados *store, const std::string& bucket_instance) + : RGWCoroutine(store->ctx()), store(store), + bucket_instance(bucket_instance) + {} + int operate() { + return set_cr_done(); + } +}; + +/// trim each bucket instance while limiting the number of concurrent operations +class BucketTrimInstanceCollectCR : public RGWShardCollectCR { + RGWRados *const store; + 
std::vector::const_iterator bucket; + std::vector::const_iterator end; + public: + BucketTrimInstanceCollectCR(RGWRados *store, + const std::vector& buckets, + int max_concurrent) + : RGWShardCollectCR(store->ctx(), max_concurrent), + store(store), + bucket(buckets.begin()), end(buckets.end()) + {} + bool spawn_next() override; +}; + +bool BucketTrimInstanceCollectCR::spawn_next() +{ + if (bucket == end) { + return false; + } + spawn(new BucketTrimInstanceCR(store, *bucket), false); + ++bucket; + return true; +} + +/// correlate the replies from each peer gateway into the given counter +int accumulate_peer_counters(bufferlist& bl, BucketChangeCounter& counter) +{ + counter.clear(); + + try { + // decode notify responses + auto p = bl.begin(); + std::map, bufferlist> replies; + std::set> timeouts; + ::decode(replies, p); + ::decode(timeouts, p); + + for (auto& peer : replies) { + auto q = peer.second.begin(); + TrimCounters::Response response; + ::decode(response, q); + for (const auto& b : response.bucket_counters) { + counter.insert(b.bucket, b.count); + } + } + } catch (const buffer::error& e) { + return -EIO; + } + return 0; +} + +class BucketTrimCR : public RGWCoroutine { + RGWRados *const store; + const BucketTrimConfig& config; + const rgw_raw_obj& obj; + bufferlist notify_replies; + BucketChangeCounter counter; + std::vector buckets; //< buckets selected for trim + + public: + BucketTrimCR(RGWRados *store, const BucketTrimConfig& config, + const rgw_raw_obj& obj) + : RGWCoroutine(store->ctx()), store(store), config(config), + obj(obj), counter(config.counter_size) + {} + + int operate(); +}; + +int BucketTrimCR::operate() +{ + reenter(this) { + if (config.buckets_per_interval) { + // query watch/notify for hot buckets + ldout(cct, 10) << "fetching active bucket counters" << dendl; + set_status("fetching active bucket counters"); + yield { + // request the top bucket counters from each peer gateway + const TrimNotifyType type = NotifyTrimCounters; + 
TrimCounters::Request request{32}; + bufferlist bl; + ::encode(type, bl); + ::encode(request, bl); + call(new RGWRadosNotifyCR(store, obj, bl, config.notify_timeout_ms, + &notify_replies)); + } + if (retcode < 0) { + ldout(cct, 10) << "failed to fetch peer bucket counters" << dendl; + return set_cr_error(retcode); + } + + // select the hottest buckets for trim + retcode = accumulate_peer_counters(notify_replies, counter); + if (retcode < 0) { + ldout(cct, 4) << "failed to correlate peer bucket counters" << dendl; + return set_cr_error(retcode); + } + buckets.reserve(config.buckets_per_interval); + + const int max_count = config.buckets_per_interval; + counter.get_highest(max_count, + [this] (const std::string& bucket, int count) { + buckets.push_back(bucket); + }); + } + + // trim bucket instances with limited concurrency + set_status("trimming buckets"); + ldout(cct, 4) << "collected " << buckets.size() << " buckets for trim" << dendl; + yield call(new BucketTrimInstanceCollectCR(store, buckets, + config.concurrent_buckets)); + // ignore errors from individual buckets + + return set_cr_done(); + } + return 0; +} + class BucketTrimPollCR : public RGWCoroutine { RGWRados *const store; const BucketTrimConfig& config; @@ -301,7 +437,7 @@ int BucketTrimPollCR::operate() } set_status("trimming"); - // TODO: spawn trim logic + yield call(new BucketTrimCR(store, config, obj)); if (retcode < 0) { // on errors, unlock so other gateways can try set_status("unlocking"); diff --git a/src/rgw/rgw_sync_log_trim.h b/src/rgw/rgw_sync_log_trim.h index fb69512e97b..ea47734e238 100644 --- a/src/rgw/rgw_sync_log_trim.h +++ b/src/rgw/rgw_sync_log_trim.h @@ -38,6 +38,12 @@ struct BucketTrimConfig { uint32_t trim_interval_sec{0}; /// maximum number of buckets to track with BucketChangeObserver size_t counter_size{0}; + /// maximum number of buckets to process each trim interval + uint32_t buckets_per_interval{0}; + /// maximum number of buckets to process in parallel + uint32_t
concurrent_buckets{0}; + /// timeout in ms for bucket trim notify replies + uint64_t notify_timeout_ms{0}; }; /// fill out the BucketTrimConfig from the ceph context From c9d50860b735809e343a0831ba3d346b0b37973c Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Thu, 7 Sep 2017 12:12:43 -0400 Subject: [PATCH 10/35] rgw: add MetadataListCR to loop over bucket instances Signed-off-by: Casey Bodley --- src/rgw/rgw_sync_log_trim.cc | 145 +++++++++++++++++++++++++++++++++++ 1 file changed, 145 insertions(+) diff --git a/src/rgw/rgw_sync_log_trim.cc b/src/rgw/rgw_sync_log_trim.cc index 1bca8539cfa..4eaddf71721 100644 --- a/src/rgw/rgw_sync_log_trim.cc +++ b/src/rgw/rgw_sync_log_trim.cc @@ -19,6 +19,7 @@ #include "common/bounded_key_counter.h" #include "common/errno.h" #include "rgw_cr_rados.h" +#include "rgw_metadata.h" #include "rgw_rados.h" #include "rgw_sync_log_trim.h" #include "rgw_sync.h" @@ -335,6 +336,150 @@ int accumulate_peer_counters(bufferlist& bl, BucketChangeCounter& counter) return 0; } +/// metadata callback has the signature bool(string&& key, string&& marker) +using MetadataListCallback = std::function; + +/// lists metadata keys, passing each to a callback until it returns false. 
+/// on reaching the end, it will restart at the beginning and list up to the +/// initial marker +class AsyncMetadataList : public RGWAsyncRadosRequest { + CephContext *const cct; + RGWMetadataManager *const mgr; + const std::string section; + const std::string start_marker; + MetadataListCallback callback; + void *handle{nullptr}; + + int _send_request() override; + public: + AsyncMetadataList(CephContext *cct, RGWCoroutine *caller, + RGWAioCompletionNotifier *cn, RGWMetadataManager *mgr, + const std::string& section, const std::string& start_marker, + const MetadataListCallback& callback) + : RGWAsyncRadosRequest(caller, cn), cct(cct), mgr(mgr), + section(section), start_marker(start_marker), callback(callback) + {} + ~AsyncMetadataList() override { + if (handle) { + mgr->list_keys_complete(handle); + } + } +}; + +int AsyncMetadataList::_send_request() +{ + // start a listing at the given marker + int r = mgr->list_keys_init(section, start_marker, &handle); + if (r < 0) { + ldout(cct, 10) << "failed to init metadata listing: " + << cpp_strerror(r) << dendl; + return r; + } + ldout(cct, 20) << "starting metadata listing at " << start_marker << dendl; + + std::list keys; + bool truncated{false}; + std::string marker; + + do { + // get the next key and marker + r = mgr->list_keys_next(handle, 1, keys, &truncated); + if (r < 0) { + ldout(cct, 10) << "failed to list metadata: " + << cpp_strerror(r) << dendl; + return r; + } + marker = mgr->get_marker(handle); + + if (!keys.empty()) { + assert(keys.size() == 1); + auto& key = keys.front(); + if (!callback(std::move(key), std::move(marker))) { + return 0; + } + } + } while (truncated); + + if (start_marker.empty()) { + // already listed all keys + return 0; + } + + // restart the listing from the beginning (empty marker) + mgr->list_keys_complete(handle); + handle = nullptr; + + r = mgr->list_keys_init(section, "", &handle); + if (r < 0) { + ldout(cct, 10) << "failed to restart metadata listing: " + << cpp_strerror(r) 
<< dendl; + return r; + } + ldout(cct, 20) << "restarting metadata listing" << dendl; + + do { + // get the next key and marker + r = mgr->list_keys_next(handle, 1, keys, &truncated); + if (r < 0) { + ldout(cct, 10) << "failed to list metadata: " + << cpp_strerror(r) << dendl; + return r; + } + marker = mgr->get_marker(handle); + + if (!keys.empty()) { + assert(keys.size() == 1); + auto& key = keys.front(); + // stop at original marker + if (marker >= start_marker) { + return 0; + } + if (!callback(std::move(key), std::move(marker))) { + return 0; + } + } + } while (truncated); + + return 0; +} + +/// coroutine wrapper for AsyncMetadataList +class MetadataListCR : public RGWSimpleCoroutine { + RGWAsyncRadosProcessor *const async_rados; + RGWMetadataManager *const mgr; + const std::string& section; + const std::string& start_marker; + MetadataListCallback callback; + RGWAsyncRadosRequest *req{nullptr}; + public: + MetadataListCR(CephContext *cct, RGWAsyncRadosProcessor *async_rados, + RGWMetadataManager *mgr, const std::string& section, + const std::string& start_marker, + const MetadataListCallback& callback) + : RGWSimpleCoroutine(cct), async_rados(async_rados), mgr(mgr), + section(section), start_marker(start_marker), callback(callback) + {} + ~MetadataListCR() override { + request_cleanup(); + } + + int send_request() override { + req = new AsyncMetadataList(cct, this, stack->create_completion_notifier(), + mgr, section, start_marker, callback); + async_rados->queue(req); + return 0; + } + int request_complete() override { + return req->get_ret_status(); + } + void request_cleanup() override { + if (req) { + req->finish(); + req = nullptr; + } + } +}; + class BucketTrimCR : public RGWCoroutine { RGWRados *const store; const BucketTrimConfig& config; From 2c07d7dd0e6c358bcdba409747ebf13f846a77e1 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Fri, 1 Sep 2017 12:37:56 -0400 Subject: [PATCH 11/35] rgw: add BucketTrimStatus Signed-off-by: Casey Bodley --- 
src/rgw/rgw_sync_log_trim.cc | 7 +++++-- src/rgw/rgw_sync_log_trim.h | 22 ++++++++++++++++++++++ 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/src/rgw/rgw_sync_log_trim.cc b/src/rgw/rgw_sync_log_trim.cc index 4eaddf71721..a83f37cd288 100644 --- a/src/rgw/rgw_sync_log_trim.cc +++ b/src/rgw/rgw_sync_log_trim.cc @@ -18,10 +18,10 @@ #include "common/bounded_key_counter.h" #include "common/errno.h" +#include "rgw_sync_log_trim.h" #include "rgw_cr_rados.h" #include "rgw_metadata.h" #include "rgw_rados.h" -#include "rgw_sync_log_trim.h" #include "rgw_sync.h" #include @@ -35,6 +35,9 @@ using rgw::BucketTrimConfig; using BucketChangeCounter = BoundedKeyCounter; +const std::string rgw::BucketTrimStatus::oid = "bilog.trim"; +using rgw::BucketTrimStatus; + // watch/notify api for gateways to coordinate about which buckets to trim enum TrimNotifyType { @@ -614,7 +617,7 @@ class BucketTrimManager::Impl : public TrimCounters::Server { Impl(RGWRados *store, const BucketTrimConfig& config) : store(store), config(config), - status_obj(store->get_zone_params().log_pool, "bilog.trim"), + status_obj(store->get_zone_params().log_pool, BucketTrimStatus::oid), counter(config.counter_size), watcher(store, status_obj, this) {} diff --git a/src/rgw/rgw_sync_log_trim.h b/src/rgw/rgw_sync_log_trim.h index ea47734e238..e54a74a582d 100644 --- a/src/rgw/rgw_sync_log_trim.h +++ b/src/rgw/rgw_sync_log_trim.h @@ -18,6 +18,7 @@ #include #include +#include "include/encoding.h" class CephContext; class RGWCoroutine; @@ -69,6 +70,27 @@ class BucketTrimManager : public BucketChangeObserver { RGWCoroutine* create_bucket_trim_cr(); }; +/// provides persistent storage for the trim manager's current position in the +/// list of bucket instance metadata +struct BucketTrimStatus { + std::string marker; //< metadata key of current bucket instance + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + ::encode(marker, bl); + ENCODE_FINISH(bl); + } + void decode(bufferlist::iterator& 
p) { + DECODE_START(1, p); + ::decode(marker, p); + DECODE_FINISH(p); + } + + static const std::string oid; +}; + } // namespace rgw +WRITE_CLASS_ENCODER(rgw::BucketTrimStatus); + #endif // RGW_SYNC_LOG_TRIM_H From 06a22a134f9af92753fa206eb64025472ec94f40 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Fri, 1 Sep 2017 12:40:58 -0400 Subject: [PATCH 12/35] rgw: collect cold buckets for trim Signed-off-by: Casey Bodley --- src/rgw/rgw_sync_log_trim.cc | 67 +++++++++++++++++++++++++++++++++++- src/rgw/rgw_sync_log_trim.h | 2 ++ 2 files changed, 68 insertions(+), 1 deletion(-) diff --git a/src/rgw/rgw_sync_log_trim.cc b/src/rgw/rgw_sync_log_trim.cc index a83f37cd288..4e155501a91 100644 --- a/src/rgw/rgw_sync_log_trim.cc +++ b/src/rgw/rgw_sync_log_trim.cc @@ -490,7 +490,11 @@ class BucketTrimCR : public RGWCoroutine { bufferlist notify_replies; BucketChangeCounter counter; std::vector buckets; //< buckets selected for trim + BucketTrimStatus status; + RGWObjVersionTracker objv; //< version tracker for trim status object + std::string last_cold_marker; //< position for next trim marker + static const std::string section; //< metadata section for bucket instances public: BucketTrimCR(RGWRados *store, const BucketTrimConfig& config, const rgw_raw_obj& obj) @@ -501,6 +505,8 @@ class BucketTrimCR : public RGWCoroutine { int operate(); }; +const std::string BucketTrimCR::section{"bucket.instance"}; + int BucketTrimCR::operate() { reenter(this) { @@ -531,13 +537,57 @@ int BucketTrimCR::operate() } buckets.reserve(config.buckets_per_interval); - const int max_count = config.buckets_per_interval; + const int max_count = config.buckets_per_interval - + config.min_cold_buckets_per_interval; counter.get_highest(max_count, [this] (const std::string& bucket, int count) { buckets.push_back(bucket); }); } + if (buckets.size() < config.buckets_per_interval) { + // read BucketTrimStatus for marker position + set_status("reading trim status"); + using ReadStatus = RGWSimpleRadosReadCR; 
+ yield call(new ReadStatus(store->get_async_rados(), store, obj, + &status, true, &objv)); + if (retcode < 0) { + ldout(cct, 10) << "failed to read bilog trim status: " + << cpp_strerror(retcode) << dendl; + return set_cr_error(retcode); + } + if (status.marker == "MAX") { + status.marker.clear(); // restart at the beginning + } + ldout(cct, 10) << "listing cold buckets from marker=" + << status.marker << dendl; + + set_status("listing cold buckets for trim"); + yield { + // list cold buckets to consider for trim + auto cb = [this] (std::string&& bucket, std::string&& marker) { + // filter out active buckets that we've already selected + auto i = std::find(buckets.begin(), buckets.end(), bucket); + if (i != buckets.end()) { + return true; + } + buckets.emplace_back(std::move(bucket)); + // remember the last cold bucket spawned to update the status marker + last_cold_marker = std::move(marker); + // return true if there's room for more + return buckets.size() < config.buckets_per_interval; + }; + + call(new MetadataListCR(cct, store->get_async_rados(), store->meta_mgr, + section, status.marker, cb)); + } + if (retcode < 0) { + ldout(cct, 4) << "failed to list bucket instance metadata: " + << cpp_strerror(retcode) << dendl; + return set_cr_error(retcode); + } + } + // trim bucket instances with limited concurrency set_status("trimming buckets"); ldout(cct, 4) << "collected " << buckets.size() << " buckets for trim" << dendl; @@ -545,6 +595,21 @@ int BucketTrimCR::operate() config.concurrent_buckets)); // ignore errors from individual buckets + // write updated trim status + if (!last_cold_marker.empty() && status.marker != last_cold_marker) { + set_status("writing updated trim status"); + status.marker = std::move(last_cold_marker); + ldout(cct, 20) << "writing bucket trim marker=" << status.marker << dendl; + using WriteStatus = RGWSimpleRadosWriteCR; + yield call(new WriteStatus(store->get_async_rados(), store, obj, + status, &objv)); + if (retcode < 0) { + 
ldout(cct, 4) << "failed to write updated trim status: " + << cpp_strerror(retcode) << dendl; + return set_cr_error(retcode); + } + } + return set_cr_done(); } return 0; diff --git a/src/rgw/rgw_sync_log_trim.h b/src/rgw/rgw_sync_log_trim.h index e54a74a582d..18de544266a 100644 --- a/src/rgw/rgw_sync_log_trim.h +++ b/src/rgw/rgw_sync_log_trim.h @@ -41,6 +41,8 @@ struct BucketTrimConfig { size_t counter_size{0}; /// maximum number of buckets to process each trim interval uint32_t buckets_per_interval{0}; + /// minimum number of buckets to choose from the global bucket instance list + uint32_t min_cold_buckets_per_interval{0}; /// maximum number of buckets to process in parallel uint32_t concurrent_buckets{0}; /// timeout in ms for bucket trim notify replies From 7be4eab8a339e9e083352a44ad09272da717c73e Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Fri, 1 Sep 2017 11:06:30 -0400 Subject: [PATCH 13/35] rgw: BucketTrimManager implements BucketTrimObserver Signed-off-by: Casey Bodley --- src/rgw/rgw_sync_log_trim.cc | 123 +++++++++++++++++++++++++++++++---- src/rgw/rgw_sync_log_trim.h | 7 ++ 2 files changed, 118 insertions(+), 12 deletions(-) diff --git a/src/rgw/rgw_sync_log_trim.cc b/src/rgw/rgw_sync_log_trim.cc index 4e155501a91..a3c21329822 100644 --- a/src/rgw/rgw_sync_log_trim.cc +++ b/src/rgw/rgw_sync_log_trim.cc @@ -14,6 +14,7 @@ */ #include +#include #include #include "common/bounded_key_counter.h" @@ -271,17 +272,29 @@ class BucketTrimWatcher : public librados::WatchCtx2 { }; +/// Interface to communicate with the trim manager about completed operations +struct BucketTrimObserver { + virtual ~BucketTrimObserver() = default; + + virtual void on_bucket_trimmed(std::string&& bucket_instance) = 0; + virtual bool trimmed_recently(const boost::string_view& bucket_instance) = 0; +}; + /// trim the bilog of all of the given bucket instance's shards class BucketTrimInstanceCR : public RGWCoroutine { RGWRados *const store; + BucketTrimObserver *const observer; 
std::string bucket_instance; public: - BucketTrimInstanceCR(RGWRados *store, const std::string& bucket_instance) + BucketTrimInstanceCR(RGWRados *store, BucketTrimObserver *observer, + const std::string& bucket_instance) : RGWCoroutine(store->ctx()), store(store), + observer(observer), bucket_instance(bucket_instance) {} int operate() { + observer->on_bucket_trimmed(std::move(bucket_instance)); return set_cr_done(); } }; @@ -289,14 +302,15 @@ class BucketTrimInstanceCR : public RGWCoroutine { /// trim each bucket instance while limiting the number of concurrent operations class BucketTrimInstanceCollectCR : public RGWShardCollectCR { RGWRados *const store; + BucketTrimObserver *const observer; std::vector::const_iterator bucket; std::vector::const_iterator end; public: - BucketTrimInstanceCollectCR(RGWRados *store, + BucketTrimInstanceCollectCR(RGWRados *store, BucketTrimObserver *observer, const std::vector& buckets, int max_concurrent) : RGWShardCollectCR(store->ctx(), max_concurrent), - store(store), + store(store), observer(observer), bucket(buckets.begin()), end(buckets.end()) {} bool spawn_next() override; @@ -307,7 +321,7 @@ bool BucketTrimInstanceCollectCR::spawn_next() if (bucket == end) { return false; } - spawn(new BucketTrimInstanceCR(store, *bucket), false); + spawn(new BucketTrimInstanceCR(store, observer, *bucket), false); ++bucket; return true; } @@ -486,6 +500,7 @@ class MetadataListCR : public RGWSimpleCoroutine { class BucketTrimCR : public RGWCoroutine { RGWRados *const store; const BucketTrimConfig& config; + BucketTrimObserver *const observer; const rgw_raw_obj& obj; bufferlist notify_replies; BucketChangeCounter counter; @@ -497,9 +512,9 @@ class BucketTrimCR : public RGWCoroutine { static const std::string section; //< metadata section for bucket instances public: BucketTrimCR(RGWRados *store, const BucketTrimConfig& config, - const rgw_raw_obj& obj) + BucketTrimObserver *observer, const rgw_raw_obj& obj) : RGWCoroutine(store->ctx()), 
store(store), config(config), - obj(obj), counter(config.counter_size) + observer(observer), obj(obj), counter(config.counter_size) {} int operate(); @@ -566,6 +581,10 @@ int BucketTrimCR::operate() yield { // list cold buckets to consider for trim auto cb = [this] (std::string&& bucket, std::string&& marker) { + // filter out keys that we trimmed recently + if (observer->trimmed_recently(bucket)) { + return true; + } // filter out active buckets that we've already selected auto i = std::find(buckets.begin(), buckets.end(), bucket); if (i != buckets.end()) { @@ -591,7 +610,7 @@ int BucketTrimCR::operate() // trim bucket instances with limited concurrency set_status("trimming buckets"); ldout(cct, 4) << "collected " << buckets.size() << " buckets for trim" << dendl; - yield call(new BucketTrimInstanceCollectCR(store, buckets, + yield call(new BucketTrimInstanceCollectCR(store, observer, buckets, config.concurrent_buckets)); // ignore errors from individual buckets @@ -618,14 +637,16 @@ int BucketTrimCR::operate() class BucketTrimPollCR : public RGWCoroutine { RGWRados *const store; const BucketTrimConfig& config; + BucketTrimObserver *const observer; const rgw_raw_obj& obj; const std::string name{"trim"}; //< lock name const std::string cookie; public: BucketTrimPollCR(RGWRados *store, const BucketTrimConfig& config, - const rgw_raw_obj& obj) - : RGWCoroutine(store->ctx()), store(store), config(config), obj(obj), + BucketTrimObserver *observer, const rgw_raw_obj& obj) + : RGWCoroutine(store->ctx()), store(store), config(config), + observer(observer), obj(obj), cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct)) {} @@ -650,7 +671,7 @@ int BucketTrimPollCR::operate() } set_status("trimming"); - yield call(new BucketTrimCR(store, config, obj)); + yield call(new BucketTrimCR(store, config, observer, obj)); if (retcode < 0) { // on errors, unlock so other gateways can try set_status("unlocking"); @@ -662,9 +683,62 @@ int BucketTrimPollCR::operate() return 0; } +/// 
tracks a bounded list of events with timestamps. old events can be expired, +/// and recent events can be searched by key. expiration depends on events being +/// inserted in temporal order +template +class RecentEventList { + public: + using clock_type = Clock; + using time_point = typename clock_type::time_point; + + RecentEventList(size_t max_size, const ceph::timespan& max_duration) + : events(max_size), max_duration(max_duration) + {} + + /// insert an event at the given point in time. this time must be at least as + /// recent as the last inserted event + void insert(T&& value, const time_point& now) + { + // assert(events.empty() || now >= events.back().time) + events.push_back(Event{std::move(value), now}); + } + + /// performs a linear search for an event matching the given key, whose type + /// U can be any that provides operator==(U, T) + template + bool lookup(const U& key) const + { + for (const auto& event : events) { + if (key == event.value) { + return true; + } + } + return false; + } + + /// remove events that are no longer recent compared to the given point in time + void expire_old(const time_point& now) + { + const auto expired_before = now - max_duration; + while (!events.empty() && events.front().time < expired_before) { + events.pop_front(); + } + } + + private: + struct Event { + T value; + time_point time; + }; + boost::circular_buffer events; + const ceph::timespan max_duration; +}; + namespace rgw { -class BucketTrimManager::Impl : public TrimCounters::Server { +class BucketTrimManager::Impl : public TrimCounters::Server, + public BucketTrimObserver { public: RGWRados *const store; const BucketTrimConfig config; @@ -674,6 +748,11 @@ class BucketTrimManager::Impl : public TrimCounters::Server { /// count frequency of bucket instance entries in the data changes log BucketChangeCounter counter; + using RecentlyTrimmedBucketList = RecentEventList; + using clock_type = RecentlyTrimmedBucketList::clock_type; + /// track recently trimmed 
buckets to focus trim activity elsewhere + RecentlyTrimmedBucketList trimmed; + /// serve the bucket trim watch/notify api BucketTrimWatcher watcher; @@ -684,6 +763,7 @@ class BucketTrimManager::Impl : public TrimCounters::Server { : store(store), config(config), status_obj(store->get_zone_params().log_pool, BucketTrimStatus::oid), counter(config.counter_size), + trimmed(config.recent_size, config.recent_duration), watcher(store, status_obj, this) {} @@ -697,6 +777,20 @@ class BucketTrimManager::Impl : public TrimCounters::Server { }); ldout(store->ctx(), 20) << "get_bucket_counters: " << buckets << dendl; } + + /// BucketTrimObserver interface to remember successfully-trimmed buckets + void on_bucket_trimmed(std::string&& bucket_instance) override + { + ldout(store->ctx(), 20) << "trimmed bucket instance " << bucket_instance << dendl; + std::lock_guard lock(mutex); + trimmed.insert(std::move(bucket_instance), clock_type::now()); + } + + bool trimmed_recently(const boost::string_view& bucket_instance) override + { + std::lock_guard lock(mutex); + return trimmed.lookup(bucket_instance); + } }; BucketTrimManager::BucketTrimManager(RGWRados *store, @@ -714,12 +808,17 @@ int BucketTrimManager::init() void BucketTrimManager::on_bucket_changed(const boost::string_view& bucket) { std::lock_guard lock(impl->mutex); + // filter recently trimmed bucket instances out of bucket change counter + if (impl->trimmed.lookup(bucket)) { + return; + } impl->counter.insert(bucket.to_string()); } RGWCoroutine* BucketTrimManager::create_bucket_trim_cr() { - return new BucketTrimPollCR(impl->store, impl->config, impl->status_obj); + return new BucketTrimPollCR(impl->store, impl->config, + impl.get(), impl->status_obj); } } // namespace rgw diff --git a/src/rgw/rgw_sync_log_trim.h b/src/rgw/rgw_sync_log_trim.h index 18de544266a..d8a1cacc741 100644 --- a/src/rgw/rgw_sync_log_trim.h +++ b/src/rgw/rgw_sync_log_trim.h @@ -19,6 +19,7 @@ #include #include #include "include/encoding.h" +#include 
"common/ceph_time.h" class CephContext; class RGWCoroutine; @@ -47,6 +48,12 @@ struct BucketTrimConfig { uint32_t concurrent_buckets{0}; /// timeout in ms for bucket trim notify replies uint64_t notify_timeout_ms{0}; + /// maximum number of recently trimmed buckets to remember (should be small + /// enough for a linear search) + size_t recent_size{0}; + /// maximum duration to consider a trim as 'recent' (should be some multiple + /// of the trim interval, at least) + ceph::timespan recent_duration{0}; }; /// fill out the BucketTrimConfig from the ceph context From d29f96ae3e1b7d65b2f513340e8c2c42cf6de9f1 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Thu, 7 Sep 2017 12:48:47 -0400 Subject: [PATCH 14/35] rgw: add configure_bucket_trim() Signed-off-by: Casey Bodley --- src/common/options.cc | 12 ++++++++++++ src/rgw/rgw_sync_log_trim.cc | 19 +++++++++++++++++++ 2 files changed, 31 insertions(+) diff --git a/src/common/options.cc b/src/common/options.cc index 2e70f1c592e..881a18d73d1 100644 --- a/src/common/options.cc +++ b/src/common/options.cc @@ -5010,6 +5010,18 @@ std::vector