Merge pull request #17761 from cbodley/wip-rgw-bilog-trim

rgw multisite: automated trimming for bucket index logs

Reviewed-by: Yehuda Sadeh <yehuda@redhat.com>

commit c9dbb86dc4
@@ -187,7 +187,8 @@ class Cluster(multisite.Cluster):
         """ radosgw-admin command """
         args = args or []
         args += ['--cluster', self.name]
-        args += ['--debug-rgw', '0']
+        args += ['--debug-rgw', str(kwargs.pop('debug_rgw', 0))]
+        args += ['--debug-ms', str(kwargs.pop('debug_ms', 0))]
         if kwargs.pop('read_only', False):
             args += ['--rgw-cache-enabled', 'false']
         kwargs['decode'] = False
@@ -4,6 +4,7 @@
 #include "include/str_list.h"
 #include "include/rados/librados.hpp"
 #include "cls_rgw_ops.h"
 #include "cls_rgw_const.h"
 #include "common/RefCountedObj.h"
 #include "include/compat.h"
+#include "common/ceph_time.h"
@@ -217,6 +218,15 @@ public:
     }
     return 0;
   }
+
+  // trim the '<shard-id>#' prefix from a single shard marker if present
+  static std::string get_shard_marker(const std::string& marker) {
+    auto p = marker.find(KEY_VALUE_SEPARATOR);
+    if (p == marker.npos) {
+      return marker;
+    }
+    return marker.substr(p + 1);
+  }
 };

 /* bucket index */
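For illustration (not part of the commit): a combined marker carries a shard id prefix, and the caller added later in this diff (RGWRadosBILogTrimCR) uses this helper so it can accept either form. Assuming KEY_VALUE_SEPARATOR is '#':

    BucketIndexShardsManager::get_shard_marker("3#00000001.42.3");  // -> "00000001.42.3"
    BucketIndexShardsManager::get_shard_marker("00000001.42.3");    // no separator, returned as-is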
src/common/bounded_key_counter.h (new file, 187 lines)
@@ -0,0 +1,187 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2017 Red Hat, Inc
 *
 * Author: Casey Bodley <cbodley@redhat.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */

#ifndef BOUNDED_KEY_COUNTER_H
#define BOUNDED_KEY_COUNTER_H

#include <algorithm>
#include <map>
#include <tuple>
#include <vector>

#include "include/assert.h"

/**
 * BoundedKeyCounter
 *
 * A data structure that counts the number of times a given key is inserted,
 * and can return the keys with the highest counters. The number of unique keys
 * is bounded by the given constructor argument, meaning that new keys will be
 * rejected if they would exceed this bound.
 *
 * It is optimized for use where insertion is frequent, but sorted listings are
 * both infrequent and tend to request a small subset of the available keys.
 */
template <typename Key, typename Count>
class BoundedKeyCounter {
  /// map type to associate keys with their counter values
  using map_type = std::map<Key, Count>;
  using value_type = typename map_type::value_type;

  /// view type used for sorting key-value pairs by their counter value
  using view_type = std::vector<const value_type*>;

  /// maximum number of counters to store at once
  const size_t bound;

  /// map of counters, with a maximum size given by 'bound'
  map_type counters;

  /// storage for sorted key-value pairs
  view_type sorted;

  /// remembers how much of the range is actually sorted
  typename view_type::iterator sorted_position;

  /// invalidate view of sorted entries
  void invalidate_sorted()
  {
    sorted_position = sorted.begin();
    sorted.clear();
  }

  /// value_type comparison function for sorting in descending order
  static bool value_greater(const value_type *lhs, const value_type *rhs)
  {
    return lhs->second > rhs->second;
  }

  /// map iterator that adapts value_type to value_type*
  struct const_pointer_iterator : public map_type::const_iterator {
    const_pointer_iterator(typename map_type::const_iterator i)
      : map_type::const_iterator(i) {}
    const value_type* operator*() const {
      return &map_type::const_iterator::operator*();
    }
  };

 protected:
  /// return the number of sorted entries. marked protected for unit testing
  size_t get_num_sorted() const
  {
    using const_iterator = typename view_type::const_iterator;
    return std::distance<const_iterator>(sorted.begin(), sorted_position);
  }

 public:
  BoundedKeyCounter(size_t bound)
    : bound(bound)
  {
    sorted.reserve(bound);
    sorted_position = sorted.begin();
  }

  /// return the number of keys stored
  size_t size() const noexcept { return counters.size(); }

  /// return the maximum number of keys
  size_t capacity() const noexcept { return bound; }

  /// increment a counter for the given key and return its value. if the key was
  /// not present, insert it. if the map is full, return 0
  Count insert(const Key& key, Count n = 1)
  {
    typename map_type::iterator i;

    if (counters.size() < bound) {
      // insert new entries at count=0
      bool inserted;
      std::tie(i, inserted) = counters.emplace(key, 0);
      if (inserted) {
        sorted.push_back(&*i);
      }
    } else {
      // when full, refuse to insert new entries
      i = counters.find(key);
      if (i == counters.end()) {
        return 0;
      }
    }

    i->second += n; // add to the counter

    // update sorted position if necessary. use a binary search for the last
    // element in the sorted range that's greater than this counter
    sorted_position = std::lower_bound(sorted.begin(), sorted_position,
                                       &*i, &value_greater);

    return i->second;
  }

  /// remove the given key from the map of counters
  void erase(const Key& key)
  {
    auto i = counters.find(key);
    if (i == counters.end()) {
      return;
    }
    // removing the sorted entry would require linear search; invalidate instead
    invalidate_sorted();

    counters.erase(i);
  }

  /// query the highest N key-value pairs sorted by counter value, passing each
  /// in order to the given callback with arguments (Key, Count)
  template <typename Callback>
  void get_highest(size_t count, Callback&& cb)
  {
    if (sorted.empty()) {
      // initialize the vector with pointers to all key-value pairs
      sorted.assign(const_pointer_iterator{counters.cbegin()},
                    const_pointer_iterator{counters.cend()});
      // entire range is unsorted
      assert(sorted_position == sorted.begin());
    }

    const size_t sorted_count = get_num_sorted();
    if (sorted_count < count) {
      // move sorted_position to cover the requested number of entries
      sorted_position = sorted.begin() + std::min(count, sorted.size());

      // sort all entries in descending order up to the given position
      std::partial_sort(sorted.begin(), sorted_position, sorted.end(),
                        &value_greater);
    }

    // return the requested range via callback
    for (const auto& pair : sorted) {
      if (count-- == 0) {
        return;
      }
      cb(pair->first, pair->second);
    }
  }

  /// remove all keys and counters and invalidate the sorted range
  void clear()
  {
    invalidate_sorted();
    counters.clear();
  }
};

#endif // BOUNDED_KEY_COUNTER_H
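A minimal usage sketch of the interface above (illustrative only; assumes this header plus <iostream> and <string>; the key names are hypothetical):

    // track at most 128 distinct keys; further new keys are rejected with a 0 return
    BoundedKeyCounter<std::string, int> counter(128);
    counter.insert("bucket-a");     // returns 1
    counter.insert("bucket-b", 5);  // returns 5
    counter.insert("bucket-a");     // returns 2
    // visit the two highest counters in descending order: bucket-b, then bucket-a
    counter.get_highest(2, [] (const std::string& key, int count) {
      std::cout << key << " = " << count << std::endl;
    });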
@@ -5361,6 +5361,29 @@ std::vector<Option> get_rgw_options() {
     .set_long_description(
         "Time in seconds between attempts to trim sync logs."),

+    Option("rgw_sync_log_trim_max_buckets", Option::TYPE_INT, Option::LEVEL_ADVANCED)
+    .set_default(16)
+    .set_description("Maximum number of buckets to trim per interval")
+    .set_long_description("The maximum number of buckets to consider for bucket index log trimming each trim interval, regardless of the number of bucket index shards. Priority is given to buckets with the most sync activity over the last trim interval.")
+    .add_see_also("rgw_sync_log_trim_interval")
+    .add_see_also("rgw_sync_log_trim_min_cold_buckets")
+    .add_see_also("rgw_sync_log_trim_concurrent_buckets"),
+
+    Option("rgw_sync_log_trim_min_cold_buckets", Option::TYPE_INT, Option::LEVEL_ADVANCED)
+    .set_default(4)
+    .set_description("Minimum number of cold buckets to trim per interval")
+    .set_long_description("Of the `rgw_sync_log_trim_max_buckets` selected for bucket index log trimming each trim interval, at least this many of them must be 'cold' buckets. These buckets are selected in order from the list of all bucket instances, to guarantee that all buckets will be visited eventually.")
+    .add_see_also("rgw_sync_log_trim_interval")
+    .add_see_also("rgw_sync_log_trim_max_buckets")
+    .add_see_also("rgw_sync_log_trim_concurrent_buckets"),
+
+    Option("rgw_sync_log_trim_concurrent_buckets", Option::TYPE_INT, Option::LEVEL_ADVANCED)
+    .set_default(4)
+    .set_description("Maximum number of buckets to trim in parallel")
+    .add_see_also("rgw_sync_log_trim_interval")
+    .add_see_also("rgw_sync_log_trim_max_buckets")
+    .add_see_also("rgw_sync_log_trim_min_cold_buckets"),
+
     Option("rgw_sync_data_inject_err_probability", Option::TYPE_FLOAT, Option::LEVEL_DEV)
     .set_default(0)
     .set_description(""),
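For reference, an equivalent ceph.conf fragment with these defaults would look roughly like this (illustrative only; the option names and default values are the ones introduced in this hunk):

    [global]
    rgw_sync_log_trim_max_buckets = 16       # most-active buckets considered per trim interval
    rgw_sync_log_trim_min_cold_buckets = 4   # of those, picked in order from the global bucket list
    rgw_sync_log_trim_concurrent_buckets = 4 # buckets trimmed in parallel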
@@ -76,6 +76,7 @@ set(rgw_a_srcs
   rgw_sync_module_es.cc
   rgw_sync_module_es_rest.cc
   rgw_sync_module_log.cc
+  rgw_sync_log_trim.cc
   rgw_sync_trace.cc
   rgw_period_history.cc
   rgw_period_puller.cc
@@ -38,6 +38,7 @@
 #include "rgw_replica_log.h"
 #include "rgw_orphan.h"
 #include "rgw_sync.h"
+#include "rgw_sync_log_trim.h"
 #include "rgw_data_sync.h"
 #include "rgw_rest_conn.h"
 #include "rgw_realm_watcher.h"
@@ -440,6 +441,7 @@ enum {
   OPT_BILOG_LIST,
   OPT_BILOG_TRIM,
   OPT_BILOG_STATUS,
+  OPT_BILOG_AUTOTRIM,
   OPT_DATA_SYNC_STATUS,
   OPT_DATA_SYNC_INIT,
   OPT_DATA_SYNC_RUN,
@@ -864,6 +866,8 @@ static int get_cmd(const char *cmd, const char *prev_cmd, const char *prev_prev_
       return OPT_BILOG_TRIM;
     if (strcmp(cmd, "status") == 0)
       return OPT_BILOG_STATUS;
+    if (strcmp(cmd, "autotrim") == 0)
+      return OPT_BILOG_AUTOTRIM;
   } else if (strcmp(prev_cmd, "data") == 0) {
     if (strcmp(cmd, "sync") == 0) {
       *need_more = true;
@@ -6737,6 +6741,31 @@ next:
     formatter->flush(cout);
   }

+  if (opt_cmd == OPT_BILOG_AUTOTRIM) {
+    RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
+    RGWHTTPManager http(store->ctx(), crs.get_completion_mgr());
+    int ret = http.set_threaded();
+    if (ret < 0) {
+      cerr << "failed to initialize http client with " << cpp_strerror(ret) << std::endl;
+      return -ret;
+    }
+
+    rgw::BucketTrimConfig config;
+    configure_bucket_trim(store->ctx(), config);
+
+    rgw::BucketTrimManager trim(store, config);
+    ret = trim.init();
+    if (ret < 0) {
+      cerr << "trim manager init failed with " << cpp_strerror(ret) << std::endl;
+      return -ret;
+    }
+    ret = crs.run(trim.create_admin_bucket_trim_cr(&http));
+    if (ret < 0) {
+      cerr << "automated bilog trim failed with " << cpp_strerror(ret) << std::endl;
+      return -ret;
+    }
+  }
+
   if (opt_cmd == OPT_DATALOG_LIST) {
     formatter->open_array_section("entries");
     bool truncated;
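With the get_cmd() wiring above, this code path is exercised from the command line roughly as follows (sketch; the trim options can be overridden per invocation, as the multisite test at the end of this diff does):

    radosgw-admin bilog autotrim --rgw-sync-log-trim-max-buckets=1 --rgw-sync-log-trim-min-cold-buckets=0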
@@ -1728,6 +1728,7 @@ struct rgw_obj_key {
     DECODE_FINISH(bl);
   }
   void dump(Formatter *f) const;
+  void decode_json(JSONObj *obj);
 };
 WRITE_CLASS_ENCODER(rgw_obj_key)

@@ -3,6 +3,7 @@
 #include "rgw_cr_rados.h"

 #include "cls/lock/cls_lock_client.h"
+#include "cls/rgw/cls_rgw_client.h"

 #include <boost/asio/yield.hpp>

@@ -481,16 +482,50 @@ bool RGWOmapAppend::finish() {
 int RGWAsyncGetBucketInstanceInfo::_send_request()
 {
   RGWObjectCtx obj_ctx(store);
-  int r = store->get_bucket_instance_info(obj_ctx, bucket, *bucket_info, NULL, NULL);
+  int r = store->get_bucket_instance_from_oid(obj_ctx, oid, *bucket_info, NULL, NULL);
   if (r < 0) {
     ldout(store->ctx(), 0) << "ERROR: failed to get bucket instance info for "
-        << bucket << dendl;
+        << oid << dendl;
     return r;
   }

   return 0;
 }

+RGWRadosBILogTrimCR::RGWRadosBILogTrimCR(RGWRados *store,
+                                         const RGWBucketInfo& bucket_info,
+                                         int shard_id,
+                                         const std::string& start_marker,
+                                         const std::string& end_marker)
+  : RGWSimpleCoroutine(store->ctx()), bs(store),
+    start_marker(BucketIndexShardsManager::get_shard_marker(start_marker)),
+    end_marker(BucketIndexShardsManager::get_shard_marker(end_marker))
+{
+  bs.init(bucket_info, shard_id);
+}
+
+int RGWRadosBILogTrimCR::send_request()
+{
+  bufferlist in;
+  cls_rgw_bi_log_trim_op call;
+  call.start_marker = std::move(start_marker);
+  call.end_marker = std::move(end_marker);
+  ::encode(call, in);
+
+  librados::ObjectWriteOperation op;
+  op.exec(RGW_CLASS, RGW_BI_LOG_TRIM, in);
+
+  cn = stack->create_completion_notifier();
+  return bs.index_ctx.aio_operate(bs.bucket_obj, cn->completion(), &op);
+}
+
+int RGWRadosBILogTrimCR::request_complete()
+{
+  int r = cn->completion()->get_return_value();
+  set_status() << "request complete; ret=" << r;
+  return r;
+}
+
 int RGWAsyncFetchRemoteObj::_send_request()
 {
   RGWObjectCtx obj_ctx(store);
@@ -794,3 +829,36 @@ int RGWStatObjCR::request_complete()
 {
   return req->get_ret_status();
 }
+
+RGWRadosNotifyCR::RGWRadosNotifyCR(RGWRados *store, const rgw_raw_obj& obj,
+                                   bufferlist& request, uint64_t timeout_ms,
+                                   bufferlist *response)
+  : RGWSimpleCoroutine(store->ctx()), store(store), obj(obj),
+    request(request), timeout_ms(timeout_ms), response(response)
+{
+  set_description() << "notify dest=" << obj;
+}
+
+int RGWRadosNotifyCR::send_request()
+{
+  int r = store->get_raw_obj_ref(obj, &ref);
+  if (r < 0) {
+    lderr(store->ctx()) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
+    return r;
+  }
+
+  set_status() << "sending request";
+
+  cn = stack->create_completion_notifier();
+  return ref.ioctx.aio_notify(ref.oid, cn->completion(), request,
+                              timeout_ms, response);
+}
+
+int RGWRadosNotifyCR::request_complete()
+{
+  int r = cn->completion()->get_return_value();
+
+  set_status() << "request complete; ret=" << r;
+
+  return r;
+}
@@ -670,32 +670,40 @@ public:

 class RGWAsyncGetBucketInstanceInfo : public RGWAsyncRadosRequest {
   RGWRados *store;
-  rgw_bucket bucket;
+  const std::string oid;
   RGWBucketInfo *bucket_info;

 protected:
   int _send_request() override;
 public:
   RGWAsyncGetBucketInstanceInfo(RGWCoroutine *caller, RGWAioCompletionNotifier *cn,
-                                RGWRados *_store, const rgw_bucket& bucket,
+                                RGWRados *_store, const std::string& oid,
                                 RGWBucketInfo *_bucket_info)
     : RGWAsyncRadosRequest(caller, cn), store(_store),
-      bucket(bucket), bucket_info(_bucket_info) {}
+      oid(oid), bucket_info(_bucket_info) {}
 };

 class RGWGetBucketInstanceInfoCR : public RGWSimpleCoroutine {
   RGWAsyncRadosProcessor *async_rados;
   RGWRados *store;
-  rgw_bucket bucket;
+  const std::string oid;
   RGWBucketInfo *bucket_info;

-  RGWAsyncGetBucketInstanceInfo *req;
+  RGWAsyncGetBucketInstanceInfo *req{nullptr};

 public:
+  // metadata key constructor
+  RGWGetBucketInstanceInfoCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+                             const std::string& meta_key, RGWBucketInfo *_bucket_info)
+    : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
+      oid(RGW_BUCKET_INSTANCE_MD_PREFIX + meta_key),
+      bucket_info(_bucket_info) {}
+  // rgw_bucket constructor
   RGWGetBucketInstanceInfoCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
                              const rgw_bucket& bucket, RGWBucketInfo *_bucket_info)
     : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
-      bucket(bucket), bucket_info(_bucket_info), req(NULL) {}
+      oid(RGW_BUCKET_INSTANCE_MD_PREFIX + bucket.get_key(':')),
+      bucket_info(_bucket_info) {}
   ~RGWGetBucketInstanceInfoCR() override {
     request_cleanup();
   }
@@ -707,7 +715,7 @@ public:
   }

   int send_request() override {
-    req = new RGWAsyncGetBucketInstanceInfo(this, stack->create_completion_notifier(), store, bucket, bucket_info);
+    req = new RGWAsyncGetBucketInstanceInfo(this, stack->create_completion_notifier(), store, oid, bucket_info);
     async_rados->queue(req);
     return 0;
   }
@@ -716,6 +724,20 @@ public:
   }
 };

+class RGWRadosBILogTrimCR : public RGWSimpleCoroutine {
+  RGWRados::BucketShard bs;
+  std::string start_marker;
+  std::string end_marker;
+  boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
+ public:
+  RGWRadosBILogTrimCR(RGWRados *store, const RGWBucketInfo& bucket_info,
+                      int shard_id, const std::string& start_marker,
+                      const std::string& end_marker);
+
+  int send_request() override;
+  int request_complete() override;
+};
+
 class RGWAsyncFetchRemoteObj : public RGWAsyncRadosRequest {
   RGWRados *store;
   string source_zone;
@@ -1154,4 +1176,23 @@ class RGWStatObjCR : public RGWSimpleCoroutine {
   int request_complete() override;
 };

+/// coroutine wrapper for IoCtx::aio_notify()
+class RGWRadosNotifyCR : public RGWSimpleCoroutine {
+  RGWRados *const store;
+  const rgw_raw_obj obj;
+  bufferlist request;
+  const uint64_t timeout_ms;
+  bufferlist *response;
+  rgw_rados_ref ref;
+  boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
+
+ public:
+  RGWRadosNotifyCR(RGWRados *store, const rgw_raw_obj& obj,
+                   bufferlist& request, uint64_t timeout_ms,
+                   bufferlist *response);
+
+  int send_request() override;
+  int request_complete() override;
+};
+
 #endif
@@ -18,6 +18,7 @@
 #include "rgw_bucket.h"
 #include "rgw_metadata.h"
 #include "rgw_sync_module.h"
+#include "rgw_sync_log_trim.h"

 #include "cls/lock/cls_lock_client.h"

@@ -626,7 +627,7 @@ int RGWRemoteDataLog::init(const string& _source_zone, RGWRESTConn *_conn, RGWSy
                            RGWSyncTraceManager *_sync_tracer, RGWSyncModuleInstanceRef& _sync_module)
 {
   sync_env.init(store->ctx(), store, _conn, async_rados, &http_manager, _error_logger,
-                _sync_tracer, _source_zone, _sync_module);
+                _sync_tracer, _source_zone, _sync_module, observer);

   if (initialized) {
     return 0;
@@ -1050,6 +1051,9 @@ public:
               << error_repo->get_obj() << " retcode=" << retcode));
           }
         }
+        if (sync_env->observer) {
+          sync_env->observer->on_bucket_changed(bs.bucket.get_key());
+        }
         /* FIXME: what do do in case of error */
         if (marker_tracker && !entry_marker.empty()) {
           /* update marker */
@@ -1836,7 +1840,9 @@ int RGWRemoteBucketLog::init(const string& _source_zone, RGWRESTConn *_conn,
   bs.bucket = bucket;
   bs.shard_id = shard_id;

-  sync_env.init(store->ctx(), store, conn, async_rados, http_manager, _error_logger, _sync_tracer, source_zone, _sync_module);
+  sync_env.init(store->ctx(), store, conn, async_rados, http_manager,
+                _error_logger, _sync_tracer, source_zone, _sync_module,
+                nullptr);

   return 0;
 }
@@ -3107,6 +3113,61 @@ string RGWBucketSyncStatusManager::status_oid(const string& source_zone,
   return bucket_status_oid_prefix + "." + source_zone + ":" + bs.get_key();
 }

+class RGWCollectBucketSyncStatusCR : public RGWShardCollectCR {
+  static constexpr int max_concurrent_shards = 16;
+  RGWRados *const store;
+  RGWDataSyncEnv *const env;
+  const int num_shards;
+  rgw_bucket_shard bs;
+
+  using Vector = std::vector<rgw_bucket_shard_sync_info>;
+  Vector::iterator i, end;
+
+ public:
+  RGWCollectBucketSyncStatusCR(RGWRados *store, RGWDataSyncEnv *env,
+                               int num_shards, const rgw_bucket& bucket,
+                               Vector *status)
+    : RGWShardCollectCR(store->ctx(), max_concurrent_shards),
+      store(store), env(env), num_shards(num_shards),
+      bs(bucket, num_shards > 0 ? 0 : -1), // start at shard 0 or -1
+      i(status->begin()), end(status->end())
+  {}
+
+  bool spawn_next() override {
+    if (i == end) {
+      return false;
+    }
+    spawn(new RGWReadBucketSyncStatusCoroutine(env, bs, &*i), false);
+    ++i;
+    ++bs.shard_id;
+    return true;
+  }
+};
+
+int rgw_bucket_sync_status(RGWRados *store, const std::string& source_zone,
+                           const rgw_bucket& bucket,
+                           std::vector<rgw_bucket_shard_sync_info> *status)
+{
+  // read the bucket instance info for num_shards
+  RGWObjectCtx ctx(store);
+  RGWBucketInfo info;
+  int ret = store->get_bucket_instance_info(ctx, bucket, info, nullptr, nullptr);
+  if (ret < 0) {
+    return ret;
+  }
+  status->clear();
+  status->resize(std::max<size_t>(1, info.num_shards));
+
+  RGWDataSyncEnv env;
+  RGWSyncModuleInstanceRef module; // null sync module
+  env.init(store->ctx(), store, nullptr, store->get_async_rados(),
+           nullptr, nullptr, nullptr, source_zone, module, nullptr);
+
+  RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
+  return crs.run(new RGWCollectBucketSyncStatusCR(store, &env, info.num_shards,
+                                                  bucket, status));
+}
+

 // TODO: move into rgw_data_sync_trim.cc
 #undef dout_prefix
@@ -11,6 +11,9 @@
 #include "common/RWLock.h"
 #include "common/ceph_json.h"

+namespace rgw {
+class BucketChangeObserver;
+}

 struct rgw_datalog_info {
   uint32_t num_shards;
@@ -218,13 +221,15 @@ struct RGWDataSyncEnv {
   RGWSyncTraceManager *sync_tracer{nullptr};
   string source_zone;
   RGWSyncModuleInstanceRef sync_module{nullptr};
+  rgw::BucketChangeObserver *observer{nullptr};

   RGWDataSyncEnv() {}

   void init(CephContext *_cct, RGWRados *_store, RGWRESTConn *_conn,
             RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager,
             RGWSyncErrorLogger *_error_logger, RGWSyncTraceManager *_sync_tracer,
-            const string& _source_zone, RGWSyncModuleInstanceRef& _sync_module) {
+            const string& _source_zone, RGWSyncModuleInstanceRef& _sync_module,
+            rgw::BucketChangeObserver *_observer) {
     cct = _cct;
     store = _store;
     conn = _conn;
@@ -234,6 +239,7 @@ struct RGWDataSyncEnv {
     sync_tracer = _sync_tracer;
     source_zone = _source_zone;
     sync_module = _sync_module;
+    observer = _observer;
   }

   string shard_obj_name(int shard_id);
@@ -243,6 +249,7 @@
 class RGWRemoteDataLog : public RGWCoroutinesManager {
   RGWRados *store;
   RGWAsyncRadosProcessor *async_rados;
+  rgw::BucketChangeObserver *observer;
   RGWHTTPManager http_manager;

   RGWDataSyncEnv sync_env;
@@ -255,9 +262,10 @@ class RGWRemoteDataLog : public RGWCoroutinesManager {
   bool initialized;

 public:
-  RGWRemoteDataLog(RGWRados *_store, RGWAsyncRadosProcessor *async_rados)
+  RGWRemoteDataLog(RGWRados *_store, RGWAsyncRadosProcessor *async_rados,
+                   rgw::BucketChangeObserver *observer)
     : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()),
-      store(_store), async_rados(async_rados),
+      store(_store), async_rados(async_rados), observer(observer),
       http_manager(store->ctx(), completion_mgr),
       lock("RGWRemoteDataLog::lock"), data_sync_cr(NULL),
       initialized(false) {}
@@ -295,10 +303,11 @@ class RGWDataSyncStatusManager {

 public:
   RGWDataSyncStatusManager(RGWRados *_store, RGWAsyncRadosProcessor *async_rados,
-                           const string& _source_zone)
+                           const string& _source_zone,
+                           rgw::BucketChangeObserver *observer = nullptr)
     : store(_store), source_zone(_source_zone), conn(NULL), error_logger(NULL),
       sync_module(nullptr),
-      source_log(store, async_rados), num_shards(0) {}
+      source_log(store, async_rados, observer), num_shards(0) {}
   ~RGWDataSyncStatusManager() {
     finalize();
   }
@@ -356,10 +365,8 @@ struct rgw_bucket_shard_full_sync_marker {
     DECODE_FINISH(bl);
   }

-  void dump(Formatter *f) const {
-    encode_json("position", position, f);
-    encode_json("count", count, f);
-  }
+  void dump(Formatter *f) const;
+  void decode_json(JSONObj *obj);
 };
 WRITE_CLASS_ENCODER(rgw_bucket_shard_full_sync_marker)

@@ -382,9 +389,8 @@ struct rgw_bucket_shard_inc_sync_marker {
     DECODE_FINISH(bl);
   }

-  void dump(Formatter *f) const {
-    encode_json("position", position, f);
-  }
+  void dump(Formatter *f) const;
+  void decode_json(JSONObj *obj);

   bool operator<(const rgw_bucket_shard_inc_sync_marker& m) const {
     return (position < m.position);
@@ -423,26 +429,8 @@ struct rgw_bucket_shard_sync_info {
     DECODE_FINISH(bl);
   }

-  void dump(Formatter *f) const {
-    string s;
-    switch ((SyncState)state) {
-    case StateInit:
-      s = "init";
-      break;
-    case StateFullSync:
-      s = "full-sync";
-      break;
-    case StateIncrementalSync:
-      s = "incremental-sync";
-      break;
-    default:
-      s = "unknown";
-      break;
-    }
-    encode_json("status", s, f);
-    encode_json("full_marker", full_marker, f);
-    encode_json("inc_marker", inc_marker, f);
-  }
+  void dump(Formatter *f) const;
+  void decode_json(JSONObj *obj);

   rgw_bucket_shard_sync_info() : state((int)StateInit) {}

@@ -530,6 +518,11 @@ public:
   int run();
 };

+/// read the sync status of all bucket shards from the given source zone
+int rgw_bucket_sync_status(RGWRados *store, const std::string& source_zone,
+                           const rgw_bucket& bucket,
+                           std::vector<rgw_bucket_shard_sync_info> *status);
+
 class RGWDefaultSyncModule : public RGWSyncModule {
 public:
   RGWDefaultSyncModule() {}
@@ -11,6 +11,7 @@
 #include "rgw_keystone.h"
 #include "rgw_basic_types.h"
 #include "rgw_op.h"
+#include "rgw_data_sync.h"
 #include "rgw_sync.h"
 #include "rgw_orphan.h"

@@ -790,6 +791,13 @@ void rgw_obj_key::dump(Formatter *f) const
   encode_json("ns", ns, f);
 }

+void rgw_obj_key::decode_json(JSONObj *obj)
+{
+  JSONDecoder::decode_json("name", name, obj);
+  JSONDecoder::decode_json("instance", instance, obj);
+  JSONDecoder::decode_json("ns", ns, obj);
+}
+
 void RGWBucketEnt::dump(Formatter *f) const
 {
   encode_json("bucket", bucket, f);
@@ -1344,6 +1352,65 @@ void rgw_sync_error_info::dump(Formatter *f) const {
   encode_json("message", message, f);
 }

+void rgw_bucket_shard_full_sync_marker::decode_json(JSONObj *obj)
+{
+  JSONDecoder::decode_json("position", position, obj);
+  JSONDecoder::decode_json("count", count, obj);
+}
+
+void rgw_bucket_shard_full_sync_marker::dump(Formatter *f) const
+{
+  encode_json("position", position, f);
+  encode_json("count", count, f);
+}
+
+void rgw_bucket_shard_inc_sync_marker::decode_json(JSONObj *obj)
+{
+  JSONDecoder::decode_json("position", position, obj);
+}
+
+void rgw_bucket_shard_inc_sync_marker::dump(Formatter *f) const
+{
+  encode_json("position", position, f);
+}
+
+void rgw_bucket_shard_sync_info::decode_json(JSONObj *obj)
+{
+  std::string s;
+  JSONDecoder::decode_json("status", s, obj);
+  if (s == "full-sync") {
+    state = StateFullSync;
+  } else if (s == "incremental-sync") {
+    state = StateIncrementalSync;
+  } else {
+    state = StateInit;
+  }
+  JSONDecoder::decode_json("full_marker", full_marker, obj);
+  JSONDecoder::decode_json("inc_marker", inc_marker, obj);
+}
+
+void rgw_bucket_shard_sync_info::dump(Formatter *f) const
+{
+  const char *s{nullptr};
+  switch ((SyncState)state) {
+  case StateInit:
+    s = "init";
+    break;
+  case StateFullSync:
+    s = "full-sync";
+    break;
+  case StateIncrementalSync:
+    s = "incremental-sync";
+    break;
+  default:
+    s = "unknown";
+    break;
+  }
+  encode_json("status", s, f);
+  encode_json("full_marker", full_marker, f);
+  encode_json("inc_marker", inc_marker, f);
+}
+
 /* This utility function shouldn't conflict with the overload of std::to_string
  * provided by string_ref since Boost 1.54 as it's defined outside of the std
  * namespace. I hope we'll remove it soon - just after merging the Matt's PR
@@ -844,12 +844,13 @@ struct list_keys_handle {
   RGWMetadataHandler *handler;
 };

-int RGWMetadataManager::list_keys_init(string& section, void **handle)
+int RGWMetadataManager::list_keys_init(const string& section, void **handle)
 {
   return list_keys_init(section, string(), handle);
 }

-int RGWMetadataManager::list_keys_init(string& section, const string& marker, void **handle)
+int RGWMetadataManager::list_keys_init(const string& section,
+                                       const string& marker, void **handle)
 {
   string entry;
   RGWMetadataHandler *handler;
@@ -353,8 +353,8 @@ public:
              obj_version *existing_version = NULL);
   int remove(string& metadata_key);

-  int list_keys_init(string& section, void **phandle);
-  int list_keys_init(string& section, const string& marker, void **phandle);
+  int list_keys_init(const string& section, void **phandle);
+  int list_keys_init(const string& section, const string& marker, void **phandle);
   int list_keys_next(void *handle, int max, list<string>& keys, bool *truncated);
   void list_keys_complete(void *handle);

@@ -3153,8 +3153,10 @@ class RGWDataSyncProcessorThread : public RGWSyncProcessorThread
   }
 public:
   RGWDataSyncProcessorThread(RGWRados *_store, RGWAsyncRadosProcessor *async_rados,
-                             const string& _source_zone)
-    : RGWSyncProcessorThread(_store, "data-sync"), sync(_store, async_rados, _source_zone),
+                             const string& _source_zone,
+                             rgw::BucketChangeObserver *observer)
+    : RGWSyncProcessorThread(_store, "data-sync"),
+      sync(_store, async_rados, _source_zone, observer),
       initialized(false) {}

   void wakeup_sync_shards(map<int, set<string> >& shard_ids) {
@@ -3190,15 +3192,18 @@ class RGWSyncLogTrimThread : public RGWSyncProcessorThread
 {
   RGWCoroutinesManager crs;
   RGWRados *store;
+  rgw::BucketTrimManager *bucket_trim;
   RGWHTTPManager http;
   const utime_t trim_interval;

   uint64_t interval_msec() override { return 0; }
   void stop_process() override { crs.stop(); }
 public:
-  RGWSyncLogTrimThread(RGWRados *store, int interval)
+  RGWSyncLogTrimThread(RGWRados *store, rgw::BucketTrimManager *bucket_trim,
+                       int interval)
     : RGWSyncProcessorThread(store, "sync-log-trim"),
       crs(store->ctx(), store->get_cr_registry()), store(store),
+      bucket_trim(bucket_trim),
       http(store->ctx(), crs.get_completion_mgr()),
       trim_interval(interval, 0)
     {}
@@ -3220,6 +3225,10 @@ public:
                                              trim_interval));
     stacks.push_back(data);

+    auto bucket = new RGWCoroutinesStack(store->ctx(), &crs);
+    bucket->call(bucket_trim->create_bucket_trim_cr(&http));
+    stacks.push_back(bucket);
+
     crs.run(stacks);
     return 0;
   }
@@ -3630,6 +3639,7 @@ void RGWRados::finalize()
     data_sync_processor_threads.clear();
     delete sync_log_trimmer;
     sync_log_trimmer = nullptr;
+    bucket_trim = boost::none;
   }
   if (finisher) {
     finisher->stop();
@@ -4495,10 +4505,22 @@ int RGWRados::init_complete()
     }
     meta_sync_processor_thread->start();

+    // configure the bucket trim manager
+    rgw::BucketTrimConfig config;
+    rgw::configure_bucket_trim(cct, config);
+
+    bucket_trim.emplace(this, config);
+    ret = bucket_trim->init();
+    if (ret < 0) {
+      ldout(cct, 0) << "ERROR: failed to start bucket trim manager" << dendl;
+      return ret;
+    }
+
     Mutex::Locker dl(data_sync_thread_lock);
     for (auto iter : zone_data_sync_from_map) {
       ldout(cct, 5) << "starting data sync thread for zone " << iter.first << dendl;
-      RGWDataSyncProcessorThread *thread = new RGWDataSyncProcessorThread(this, async_rados, iter.first);
+      auto *thread = new RGWDataSyncProcessorThread(this, async_rados, iter.first,
+                                                    &*bucket_trim);
       ret = thread->init();
       if (ret < 0) {
         ldout(cct, 0) << "ERROR: failed to initialize data sync thread" << dendl;
@@ -4509,7 +4531,7 @@ int RGWRados::init_complete()
   }
   auto interval = cct->_conf->rgw_sync_log_trim_interval;
   if (interval > 0) {
-    sync_log_trimmer = new RGWSyncLogTrimThread(this, interval);
+    sync_log_trimmer = new RGWSyncLogTrimThread(this, &*bucket_trim, interval);
     ret = sync_log_trimmer->init();
     if (ret < 0) {
       ldout(cct, 0) << "ERROR: failed to initialize sync log trim thread" << dendl;
@@ -6516,6 +6538,21 @@ int RGWRados::BucketShard::init(const rgw_bucket& _bucket, int sid)
   return 0;
 }

+int RGWRados::BucketShard::init(const RGWBucketInfo& bucket_info, int sid)
+{
+  bucket = bucket_info.bucket;
+  shard_id = sid;
+
+  int ret = store->open_bucket_index_shard(bucket_info, index_ctx, shard_id, &bucket_obj);
+  if (ret < 0) {
+    ldout(store->ctx(), 0) << "ERROR: open_bucket_index_shard() returned ret=" << ret << dendl;
+    return ret;
+  }
+  ldout(store->ctx(), 20) << " bucket index object: " << bucket_obj << dendl;
+
+  return 0;
+}
+

 /* Execute @handler on last item in bucket listing for bucket specified
  * in @bucket_info. @obj_prefix and @obj_delim narrow down the listing
@@ -23,6 +23,7 @@
 #include "rgw_meta_sync_status.h"
 #include "rgw_period_puller.h"
 #include "rgw_sync_module.h"
+#include "rgw_sync_log_trim.h"

 class RGWWatcher;
 class SafeTimer;
@@ -2280,6 +2281,7 @@ class RGWRados
   RGWSyncTraceManager *sync_tracer;
   map<string, RGWDataSyncProcessorThread *> data_sync_processor_threads;

+  boost::optional<rgw::BucketTrimManager> bucket_trim;
   RGWSyncLogTrimThread *sync_log_trimmer{nullptr};

   Mutex meta_sync_thread_lock;
@@ -2708,6 +2710,7 @@ public:
     explicit BucketShard(RGWRados *_store) : store(_store), shard_id(-1) {}
     int init(const rgw_bucket& _bucket, const rgw_obj& obj);
     int init(const rgw_bucket& _bucket, int sid);
+    int init(const RGWBucketInfo& bucket_info, int sid);
   };

   class Object {
@@ -872,6 +872,55 @@ void RGWOp_MDLog_Status::send_response()
   flusher.flush();
 }

+// not in header to avoid pulling in rgw_data_sync.h
+class RGWOp_BILog_Status : public RGWRESTOp {
+  std::vector<rgw_bucket_shard_sync_info> status;
+ public:
+  int check_caps(RGWUserCaps& caps) override {
+    return caps.check_cap("bilog", RGW_CAP_READ);
+  }
+  int verify_permission() override {
+    return check_caps(s->user->caps);
+  }
+  void execute() override;
+  void send_response() override;
+  const string name() override { return "get_bucket_index_log_status"; }
+};
+
+void RGWOp_BILog_Status::execute()
+{
+  const auto source_zone = s->info.args.get("source-zone");
+  const auto key = s->info.args.get("bucket");
+  if (key.empty()) {
+    ldout(s->cct, 4) << "no 'bucket' provided" << dendl;
+    http_ret = -EINVAL;
+    return;
+  }
+
+  rgw_bucket bucket;
+  int shard_id{-1}; // unused
+  http_ret = rgw_bucket_parse_bucket_key(s->cct, key, &bucket, &shard_id);
+  if (http_ret < 0) {
+    ldout(s->cct, 4) << "no 'bucket' provided" << dendl;
+    http_ret = -EINVAL;
+    return;
+  }
+
+  http_ret = rgw_bucket_sync_status(store, source_zone, bucket, &status);
+}
+
+void RGWOp_BILog_Status::send_response()
+{
+  set_req_state_err(s, http_ret);
+  dump_errno(s);
+  end_header(s);
+
+  if (http_ret >= 0) {
+    encode_json("status", status, s->formatter);
+  }
+  flusher.flush();
+}
+
 // not in header to avoid pulling in rgw_data_sync.h
 class RGWOp_DATALog_Status : public RGWRESTOp {
   rgw_data_sync_status status;
@@ -935,6 +984,8 @@ RGWOp *RGWHandler_Log::op_get() {
   } else if (type.compare("bucket-index") == 0) {
     if (s->info.args.exists("info")) {
       return new RGWOp_BILog_Info;
+    } else if (s->info.args.exists("status")) {
+      return new RGWOp_BILog_Status;
     } else {
       return new RGWOp_BILog_List;
     }
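Together with the op_get() dispatch above, the new status op should be reachable with a request along these lines (sketch; 'type', 'status', 'bucket', and 'source-zone' are the parameters the handler reads above):

    GET /admin/log?type=bucket-index&status&bucket=<bucket-key>&source-zone=<zone>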
src/rgw/rgw_sync_log_trim.cc (new file, 1086 lines; diff suppressed because it is too large)
src/rgw/rgw_sync_log_trim.h (new file, 109 lines)
@@ -0,0 +1,109 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2017 Red Hat, Inc
 *
 * Author: Casey Bodley <cbodley@redhat.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 */

#ifndef RGW_SYNC_LOG_TRIM_H
#define RGW_SYNC_LOG_TRIM_H

#include <memory>
#include <boost/utility/string_view.hpp>
#include "include/encoding.h"
#include "common/ceph_time.h"

class CephContext;
class RGWCoroutine;
class RGWHTTPManager;
class RGWRados;

namespace rgw {

/// Interface to inform the trim process about which buckets are most active
struct BucketChangeObserver {
  virtual ~BucketChangeObserver() = default;

  virtual void on_bucket_changed(const boost::string_view& bucket_instance) = 0;
};

/// Configuration for BucketTrimManager
struct BucketTrimConfig {
  /// time interval in seconds between bucket trim attempts
  uint32_t trim_interval_sec{0};
  /// maximum number of buckets to track with BucketChangeObserver
  size_t counter_size{0};
  /// maximum number of buckets to process each trim interval
  uint32_t buckets_per_interval{0};
  /// minimum number of buckets to choose from the global bucket instance list
  uint32_t min_cold_buckets_per_interval{0};
  /// maximum number of buckets to process in parallel
  uint32_t concurrent_buckets{0};
  /// timeout in ms for bucket trim notify replies
  uint64_t notify_timeout_ms{0};
  /// maximum number of recently trimmed buckets to remember (should be small
  /// enough for a linear search)
  size_t recent_size{0};
  /// maximum duration to consider a trim as 'recent' (should be some multiple
  /// of the trim interval, at least)
  ceph::timespan recent_duration{0};
};

/// fill out the BucketTrimConfig from the ceph context
void configure_bucket_trim(CephContext *cct, BucketTrimConfig& config);

/// Determines the buckets on which to focus trim activity, using two sources of
/// input: the frequency of entries read from the data changes log, and a global
/// listing of the bucket.instance metadata. This allows us to trim active
/// buckets quickly, while also ensuring that all buckets will eventually trim
class BucketTrimManager : public BucketChangeObserver {
  class Impl;
  std::unique_ptr<Impl> impl;
 public:
  BucketTrimManager(RGWRados *store, const BucketTrimConfig& config);
  ~BucketTrimManager();

  int init();

  /// increment a counter for the given bucket instance
  void on_bucket_changed(const boost::string_view& bucket_instance) override;

  /// create a coroutine to run the bucket trim process every trim interval
  RGWCoroutine* create_bucket_trim_cr(RGWHTTPManager *http);

  /// create a coroutine to trim buckets directly via radosgw-admin
  RGWCoroutine* create_admin_bucket_trim_cr(RGWHTTPManager *http);
};

/// provides persistent storage for the trim manager's current position in the
/// list of bucket instance metadata
struct BucketTrimStatus {
  std::string marker; //< metadata key of current bucket instance

  void encode(bufferlist& bl) const {
    ENCODE_START(1, 1, bl);
    ::encode(marker, bl);
    ENCODE_FINISH(bl);
  }
  void decode(bufferlist::iterator& p) {
    DECODE_START(1, p);
    ::decode(marker, p);
    DECODE_FINISH(p);
  }

  static const std::string oid;
};

} // namespace rgw

WRITE_CLASS_ENCODER(rgw::BucketTrimStatus);

#endif // RGW_SYNC_LOG_TRIM_H
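The body of configure_bucket_trim() lives in rgw_sync_log_trim.cc, whose diff is suppressed above. A hypothetical sketch of the mapping, using the option names added earlier in this PR (the exact body and the remaining defaults are assumptions):

    // hypothetical sketch -- not the suppressed implementation
    void rgw::configure_bucket_trim(CephContext *cct, BucketTrimConfig& config)
    {
      config.trim_interval_sec = cct->_conf->rgw_sync_log_trim_interval;
      config.buckets_per_interval = cct->_conf->rgw_sync_log_trim_max_buckets;
      config.min_cold_buckets_per_interval = cct->_conf->rgw_sync_log_trim_min_cold_buckets;
      config.concurrent_buckets = cct->_conf->rgw_sync_log_trim_concurrent_buckets;
      // counter_size, notify_timeout_ms, recent_size, and recent_duration
      // presumably get fixed defaults here as well
    }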
@@ -271,3 +271,9 @@ add_ceph_unittest(unittest_backport14)
 add_executable(unittest_convenience test_convenience.cc)
 target_link_libraries(unittest_convenience ceph-common)
 add_ceph_unittest(unittest_convenience)
+
+add_executable(unittest_bounded_key_counter
+  test_bounded_key_counter.cc
+  $<TARGET_OBJECTS:unit-main>)
+target_link_libraries(unittest_bounded_key_counter global)
+add_ceph_unittest(unittest_bounded_key_counter)
src/test/common/test_bounded_key_counter.cc (new file, 200 lines)
@@ -0,0 +1,200 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2015 Red Hat
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */
#include "common/bounded_key_counter.h"
#include <gtest/gtest.h>

namespace {

// call get_highest() and return the number of callbacks
template <typename Key, typename Count>
size_t count_highest(BoundedKeyCounter<Key, Count>& counter, size_t count)
{
  size_t callbacks = 0;
  counter.get_highest(count, [&callbacks] (const Key& key, Count count) {
    ++callbacks;
  });
  return callbacks;
}

// call get_highest() and return the key/value pairs as a vector
template <typename Key, typename Count,
          typename Vector = std::vector<std::pair<Key, Count>>>
Vector get_highest(BoundedKeyCounter<Key, Count>& counter, size_t count)
{
  Vector results;
  counter.get_highest(count, [&results] (const Key& key, Count count) {
    results.emplace_back(key, count);
  });
  return results;
}

} // anonymous namespace

TEST(BoundedKeyCounter, Insert)
{
  BoundedKeyCounter<int, int> counter(2);
  EXPECT_EQ(1, counter.insert(0)); // insert new key
  EXPECT_EQ(2, counter.insert(0)); // increment counter
  EXPECT_EQ(7, counter.insert(0, 5)); // add 5 to counter
  EXPECT_EQ(1, counter.insert(1)); // insert new key
  EXPECT_EQ(0, counter.insert(2)); // reject new key
}

TEST(BoundedKeyCounter, Erase)
{
  BoundedKeyCounter<int, int> counter(10);

  counter.erase(0); // ok to erase nonexistent key
  EXPECT_EQ(1, counter.insert(1, 1));
  EXPECT_EQ(2, counter.insert(2, 2));
  EXPECT_EQ(3, counter.insert(3, 3));
  counter.erase(2);
  counter.erase(1);
  counter.erase(3);
  counter.erase(3);
  EXPECT_EQ(0u, count_highest(counter, 10));
}

TEST(BoundedKeyCounter, Size)
{
  BoundedKeyCounter<int, int> counter(4);
  EXPECT_EQ(0u, counter.size());
  EXPECT_EQ(1, counter.insert(1, 1));
  EXPECT_EQ(1u, counter.size());
  EXPECT_EQ(2, counter.insert(2, 2));
  EXPECT_EQ(2u, counter.size());
  EXPECT_EQ(3, counter.insert(3, 3));
  EXPECT_EQ(3u, counter.size());
  EXPECT_EQ(4, counter.insert(4, 4));
  EXPECT_EQ(4u, counter.size());
  EXPECT_EQ(0, counter.insert(5, 5)); // reject new key
  EXPECT_EQ(4u, counter.size()); // size unchanged
  EXPECT_EQ(5, counter.insert(4, 1)); // update existing key
  EXPECT_EQ(4u, counter.size()); // size unchanged
  counter.erase(2);
  EXPECT_EQ(3u, counter.size());
  counter.erase(2); // erase duplicate
  EXPECT_EQ(3u, counter.size()); // size unchanged
  counter.erase(4);
  EXPECT_EQ(2u, counter.size());
  counter.erase(1);
  EXPECT_EQ(1u, counter.size());
  counter.erase(3);
  EXPECT_EQ(0u, counter.size());
  EXPECT_EQ(6, counter.insert(6, 6));
  EXPECT_EQ(1u, counter.size());
  counter.clear();
  EXPECT_EQ(0u, counter.size());
}

TEST(BoundedKeyCounter, GetHighest)
{
  BoundedKeyCounter<int, int> counter(10);
  using Vector = std::vector<std::pair<int, int>>;

  EXPECT_EQ(0u, count_highest(counter, 0)); // ok to request 0
  EXPECT_EQ(0u, count_highest(counter, 10)); // empty
  EXPECT_EQ(0u, count_highest(counter, 999)); // ok to request count >> 10

  EXPECT_EQ(1, counter.insert(1, 1));
  EXPECT_EQ(Vector({{1,1}}), get_highest(counter, 10));
  EXPECT_EQ(2, counter.insert(2, 2));
  EXPECT_EQ(Vector({{2,2},{1,1}}), get_highest(counter, 10));
  EXPECT_EQ(3, counter.insert(3, 3));
  EXPECT_EQ(Vector({{3,3},{2,2},{1,1}}), get_highest(counter, 10));
  EXPECT_EQ(3, counter.insert(4, 3)); // insert duplicated count=3
  // still returns 4 entries (but order of {3,3} and {4,3} is unspecified)
  EXPECT_EQ(4u, count_highest(counter, 10));
  counter.erase(3);
  EXPECT_EQ(Vector({{4,3},{2,2},{1,1}}), get_highest(counter, 10));
  EXPECT_EQ(0u, count_highest(counter, 0)); // requesting 0 still returns 0
}

TEST(BoundedKeyCounter, Clear)
{
  BoundedKeyCounter<int, int> counter(2);
  EXPECT_EQ(1, counter.insert(0)); // insert new key
  EXPECT_EQ(1, counter.insert(1)); // insert new key
  EXPECT_EQ(2u, count_highest(counter, 2)); // return 2 entries

  counter.clear();

  EXPECT_EQ(0u, count_highest(counter, 2)); // return 0 entries
  EXPECT_EQ(1, counter.insert(1)); // insert new key
  EXPECT_EQ(1, counter.insert(2)); // insert new unique key
  EXPECT_EQ(2u, count_highest(counter, 2)); // return 2 entries
}

// tests for partial sort and invalidation
TEST(BoundedKeyCounter, GetNumSorted)
{
  struct MockCounter : public BoundedKeyCounter<int, int> {
    using BoundedKeyCounter<int, int>::BoundedKeyCounter;
    // expose as public for testing sort invalidations
    using BoundedKeyCounter<int, int>::get_num_sorted;
  };

  MockCounter counter(10);

  EXPECT_EQ(0u, counter.get_num_sorted());
  EXPECT_EQ(0u, count_highest(counter, 10));
  EXPECT_EQ(0u, counter.get_num_sorted());

  EXPECT_EQ(2, counter.insert(2, 2));
  EXPECT_EQ(3, counter.insert(3, 3));
  EXPECT_EQ(4, counter.insert(4, 4));
  EXPECT_EQ(0u, counter.get_num_sorted());

  EXPECT_EQ(0u, count_highest(counter, 0));
  EXPECT_EQ(0u, counter.get_num_sorted());
  EXPECT_EQ(1u, count_highest(counter, 1));
  EXPECT_EQ(1u, counter.get_num_sorted());
  EXPECT_EQ(2u, count_highest(counter, 2));
  EXPECT_EQ(2u, counter.get_num_sorted());
  EXPECT_EQ(3u, count_highest(counter, 10));
  EXPECT_EQ(3u, counter.get_num_sorted());

  EXPECT_EQ(1, counter.insert(1, 1)); // insert at bottom does not invalidate
  EXPECT_EQ(3u, counter.get_num_sorted());

  EXPECT_EQ(4u, count_highest(counter, 10));
  EXPECT_EQ(4u, counter.get_num_sorted());

  EXPECT_EQ(5, counter.insert(5, 5)); // insert at top invalidates sort
  EXPECT_EQ(0u, counter.get_num_sorted());

  EXPECT_EQ(0u, count_highest(counter, 0));
  EXPECT_EQ(0u, counter.get_num_sorted());
  EXPECT_EQ(1u, count_highest(counter, 1));
  EXPECT_EQ(1u, counter.get_num_sorted());
  EXPECT_EQ(2u, count_highest(counter, 2));
  EXPECT_EQ(2u, counter.get_num_sorted());
  EXPECT_EQ(3u, count_highest(counter, 3));
  EXPECT_EQ(3u, counter.get_num_sorted());
  EXPECT_EQ(4u, count_highest(counter, 4));
  EXPECT_EQ(4u, counter.get_num_sorted());
  EXPECT_EQ(5u, count_highest(counter, 10));
  EXPECT_EQ(5u, counter.get_num_sorted());

  // updating an existing counter only invalidates entries <= that counter
  EXPECT_EQ(2, counter.insert(1)); // invalidates {1,2} and {2,2}
  EXPECT_EQ(3u, counter.get_num_sorted());

  EXPECT_EQ(5u, count_highest(counter, 10));
  EXPECT_EQ(5u, counter.get_num_sorted());

  counter.clear(); // invalidates sort
  EXPECT_EQ(0u, counter.get_num_sorted());
}
@@ -95,6 +95,15 @@ def meta_sync_status(zone):
 def mdlog_autotrim(zone):
     zone.cluster.admin(['mdlog', 'autotrim'])

+def bilog_list(zone, bucket, args = None):
+    cmd = ['bilog', 'list', '--bucket', bucket] + (args or [])
+    bilog, _ = zone.cluster.admin(cmd, read_only=True)
+    bilog = bilog.decode('utf-8')
+    return json.loads(bilog)
+
+def bilog_autotrim(zone, args = None):
+    zone.cluster.admin(['bilog', 'autotrim'] + (args or []))
+
 def parse_meta_sync_status(meta_sync_status_json):
     meta_sync_status_json = meta_sync_status_json.decode('utf-8')
     log.debug('current meta sync status=%s', meta_sync_status_json)
@@ -994,3 +1003,57 @@ def test_encrypted_object_sync():

     key = bucket2.get_key('testobj-sse-kms')
     eq(data, key.get_contents_as_string())
+
+def test_bucket_index_log_trim():
+    zonegroup = realm.master_zonegroup()
+    zonegroup_conns = ZonegroupConns(zonegroup)
+
+    zone = zonegroup_conns.rw_zones[0]
+
+    # create a test bucket, upload some objects, and wait for sync
+    def make_test_bucket():
+        name = gen_bucket_name()
+        log.info('create bucket zone=%s name=%s', zone.name, name)
+        bucket = zone.conn.create_bucket(name)
+        for objname in ('a', 'b', 'c', 'd'):
+            k = new_key(zone, name, objname)
+            k.set_contents_from_string('foo')
+        zonegroup_meta_checkpoint(zonegroup)
+        zonegroup_bucket_checkpoint(zonegroup_conns, name)
+        return bucket
+
+    # create a 'cold' bucket
+    cold_bucket = make_test_bucket()
+
+    # trim with max-buckets=0 to clear counters for cold bucket. this should
+    # prevent it from being considered 'active' by the next autotrim
+    bilog_autotrim(zone.zone, [
+        '--rgw-sync-log-trim-max-buckets', '0',
+    ])
+
+    # create an 'active' bucket
+    active_bucket = make_test_bucket()
+
+    # trim with max-buckets=1 min-cold-buckets=0 to trim active bucket only
+    bilog_autotrim(zone.zone, [
+        '--rgw-sync-log-trim-max-buckets', '1',
+        '--rgw-sync-log-trim-min-cold-buckets', '0',
+    ])
+
+    # verify active bucket has empty bilog
+    active_bilog = bilog_list(zone.zone, active_bucket.name)
+    assert(len(active_bilog) == 0)
+
+    # verify cold bucket has nonempty bilog
+    cold_bilog = bilog_list(zone.zone, cold_bucket.name)
+    assert(len(cold_bilog) > 0)
+
+    # trim with min-cold-buckets=999 to trim all buckets
+    bilog_autotrim(zone.zone, [
+        '--rgw-sync-log-trim-max-buckets', '999',
+        '--rgw-sync-log-trim-min-cold-buckets', '999',
+    ])
+
+    # verify cold bucket has empty bilog
+    cold_bilog = bilog_list(zone.zone, cold_bucket.name)
+    assert(len(cold_bilog) == 0)
@@ -52,6 +52,8 @@ class Cluster(multisite.Cluster):
         cmd = [test_path + 'test-rgw-call.sh', 'call_rgw_admin', self.cluster_id]
         if args:
             cmd += args
+        cmd += ['--debug-rgw', str(kwargs.pop('debug_rgw', 0))]
+        cmd += ['--debug-ms', str(kwargs.pop('debug_ms', 0))]
         if kwargs.pop('read_only', False):
             cmd += ['--rgw-cache-enabled', 'false']
         return bash(cmd, **kwargs)