Merge pull request #36482 from cbodley/wip-rgw-bucket-sync-checkpoint

rgw: add 'bucket sync checkpoint' command to radosgw-admin

Reviewed-by: Adam C. Emerson <aemerson@redhat.com>
Casey Bodley 2020-09-25 10:42:29 -04:00 committed by GitHub
commit 4e680c9d32
16 changed files with 409 additions and 84 deletions
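
The new subcommand polls a bucket's per-shard sync status against each source zone's remote bilog markers until sync catches up or the timeout expires. A rough usage sketch (bucket and zone names are placeholders; 2000 ms and 60 s are the defaults given to opt_retry_delay_ms and opt_timeout_sec in rgw_admin.cc; --source-zone can be omitted to checkpoint against every sync source):

  radosgw-admin bucket sync checkpoint --bucket=mybucket --source-zone=us-east --retry-delay-ms=2000 --timeout-sec=60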


@@ -146,16 +146,19 @@ public:
value_by_shards[shard] = value;
}
const std::string& get(int shard, const std::string& default_value) {
const std::string& get(int shard, const std::string& default_value) const {
auto iter = value_by_shards.find(shard);
return (iter == value_by_shards.end() ? default_value : iter->second);
}
const std::map<int, std::string>& get() const {
return value_by_shards;
}
std::map<int, std::string>& get() {
return value_by_shards;
}
bool empty() {
bool empty() const {
return value_by_shards.empty();
}


@@ -328,6 +328,7 @@ install(TARGETS radosgwd DESTINATION bin)
set(radosgw_admin_srcs
rgw_admin.cc
rgw_sync_checkpoint.cc
rgw_orphan.cc)
add_executable(radosgw-admin ${radosgw_admin_srcs})
target_link_libraries(radosgw-admin ${rgw_libs} librados


@ -59,6 +59,7 @@ extern "C" {
#include "rgw_pubsub.h"
#include "rgw_sync_module_pubsub.h"
#include "rgw_bucket_sync.h"
#include "rgw_sync_checkpoint.h"
#include "services/svc_sync_modules.h"
#include "services/svc_cls.h"
@@ -134,6 +135,7 @@ void usage()
cout << " bucket chown link bucket to specified user and update its object ACLs\n";
cout << " bucket reshard reshard bucket\n";
cout << " bucket rewrite rewrite all objects in the specified bucket\n";
cout << " bucket sync checkpoint poll a bucket's sync status until it catches up to its remote\n";
cout << " bucket sync disable disable bucket sync\n";
cout << " bucket sync enable enable bucket sync\n";
cout << " bucket radoslist list rados objects backing bucket's objects\n";
@@ -588,6 +590,7 @@ enum class OPT {
BUCKET_UNLINK,
BUCKET_STATS,
BUCKET_CHECK,
BUCKET_SYNC_CHECKPOINT,
BUCKET_SYNC_INFO,
BUCKET_SYNC_STATUS,
BUCKET_SYNC_MARKERS,
@@ -785,6 +788,7 @@ static SimpleCmd::Commands all_cmds = {
{ "bucket unlink", OPT::BUCKET_UNLINK },
{ "bucket stats", OPT::BUCKET_STATS },
{ "bucket check", OPT::BUCKET_CHECK },
{ "bucket sync checkpoint", OPT::BUCKET_SYNC_CHECKPOINT },
{ "bucket sync info", OPT::BUCKET_SYNC_INFO },
{ "bucket sync status", OPT::BUCKET_SYNC_STATUS },
{ "bucket sync markers", OPT::BUCKET_SYNC_MARKERS },
@@ -2215,31 +2219,6 @@ std::ostream& operator<<(std::ostream& out, const indented& h) {
return out << std::setw(h.w) << h.header << std::setw(1) << ' ';
}
static int remote_bilog_markers(rgw::sal::RGWRadosStore *store, const RGWZone& source,
RGWRESTConn *conn, const RGWBucketInfo& info,
BucketIndexShardsManager *markers)
{
const auto instance_key = info.bucket.get_key();
const rgw_http_param_pair params[] = {
{ "type" , "bucket-index" },
{ "bucket-instance", instance_key.c_str() },
{ "info" , nullptr },
{ nullptr, nullptr }
};
rgw_bucket_index_marker_info result;
int r = conn->get_json_resource("/admin/log/", params, result);
if (r < 0) {
lderr(store->ctx()) << "failed to fetch remote log markers: " << cpp_strerror(r) << dendl;
return r;
}
r = markers->from_string(result.max_marker, -1);
if (r < 0) {
lderr(store->ctx()) << "failed to decode remote log markers" << dendl;
return r;
}
return 0;
}
static int bucket_source_sync_status(rgw::sal::RGWRadosStore *store, const RGWZone& zone,
const RGWZone& source, RGWRESTConn *conn,
const RGWBucketInfo& bucket_info,
@@ -2302,7 +2281,7 @@ static int bucket_source_sync_status(rgw::sal::RGWRadosStore *store, const RGWZo
out << indented{width} << "incremental sync: " << num_inc << "/" << total_shards << " shards\n";
BucketIndexShardsManager remote_markers;
r = remote_bilog_markers(store, source, conn, source_bucket_info, &remote_markers);
r = rgw_read_remote_bilog_info(conn, source_bucket, remote_markers, null_yield);
if (r < 0) {
lderr(store->ctx()) << "failed to read remote log: " << cpp_strerror(r) << dendl;
return r;
@@ -3215,6 +3194,8 @@ int main(int argc, const char **argv)
std::optional<int> opt_priority;
std::optional<string> opt_mode;
std::optional<rgw_user> opt_dest_owner;
ceph::timespan opt_retry_delay_ms = std::chrono::milliseconds(2000);
ceph::timespan opt_timeout_sec = std::chrono::seconds(60);
SimpleCmd cmd(all_cmds, cmd_aliases);
@@ -3629,6 +3610,10 @@ int main(int argc, const char **argv)
} else if (ceph_argparse_witharg(args, i, &val, "--dest-owner", (char*)NULL)) {
opt_dest_owner.emplace(val);
opt_dest_owner = val;
} else if (ceph_argparse_witharg(args, i, &val, "--retry-delay-ms", (char*)NULL)) {
opt_retry_delay_ms = std::chrono::milliseconds(atoi(val.c_str()));
} else if (ceph_argparse_witharg(args, i, &val, "--timeout-sec", (char*)NULL)) {
opt_timeout_sec = std::chrono::seconds(atoi(val.c_str()));
} else if (ceph_argparse_binary_flag(args, i, &detail, NULL, "--detail", (char*)NULL)) {
// do nothing
} else if (strncmp(*i, "-", 1) == 0) {
@@ -3793,6 +3778,7 @@ int main(int argc, const char **argv)
OPT::BUCKETS_LIST,
OPT::BUCKET_LIMIT_CHECK,
OPT::BUCKET_STATS,
OPT::BUCKET_SYNC_CHECKPOINT,
OPT::BUCKET_SYNC_INFO,
OPT::BUCKET_SYNC_STATUS,
OPT::BUCKET_SYNC_MARKERS,
@@ -7949,6 +7935,45 @@ next:
}
}
if (opt_cmd == OPT::BUCKET_SYNC_CHECKPOINT) {
std::optional<rgw_zone_id> opt_source_zone;
if (!source_zone.empty()) {
opt_source_zone = source_zone;
}
if (bucket_name.empty()) {
cerr << "ERROR: bucket not specified" << std::endl;
return EINVAL;
}
RGWBucketInfo bucket_info;
rgw_bucket bucket;
int ret = init_bucket(tenant, bucket_name, bucket_id, bucket_info, bucket);
if (ret < 0) {
return -ret;
}
if (!store->ctl()->bucket->bucket_imports_data(bucket_info.bucket, null_yield)) {
std::cout << "Sync is disabled for bucket " << bucket_name << std::endl;
return 0;
}
RGWBucketSyncPolicyHandlerRef handler;
ret = store->ctl()->bucket->get_sync_policy_handler(std::nullopt, bucket, &handler, null_yield);
if (ret < 0) {
std::cerr << "ERROR: failed to get policy handler for bucket ("
<< bucket_info.bucket << "): r=" << ret << ": " << cpp_strerror(-ret) << std::endl;
return -ret;
}
auto timeout_at = ceph::coarse_mono_clock::now() + opt_timeout_sec;
ret = rgw_bucket_sync_checkpoint(dpp(), store, *handler, bucket_info,
opt_source_zone, opt_source_bucket,
opt_retry_delay_ms, timeout_at);
if (ret < 0) {
lderr(store->ctx()) << "bucket sync checkpoint failed: " << cpp_strerror(ret) << dendl;
return -ret;
}
}
if ((opt_cmd == OPT::BUCKET_SYNC_DISABLE) || (opt_cmd == OPT::BUCKET_SYNC_ENABLE)) {
if (bucket_name.empty()) {
cerr << "ERROR: bucket not specified" << std::endl;


@@ -28,6 +28,7 @@
#include "rgw_sal.h"
#include "cls/lock/cls_lock_client.h"
#include "cls/rgw/cls_rgw_client.h"
#include "services/svc_zone.h"
#include "services/svc_sync_modules.h"
@@ -606,7 +607,7 @@ int RGWRemoteDataLog::read_log_info(rgw_datalog_info *log_info)
rgw_http_param_pair pairs[] = { { "type", "data" },
{ NULL, NULL } };
int ret = sc.conn->get_json_resource("/admin/log", pairs, *log_info);
int ret = sc.conn->get_json_resource("/admin/log", pairs, null_yield, *log_info);
if (ret < 0) {
ldpp_dout(dpp, 0) << "ERROR: failed to fetch datalog info" << dendl;
return ret;
@@ -4945,6 +4946,32 @@ string RGWBucketPipeSyncStatusManager::obj_status_oid(const rgw_bucket_sync_pipe
return prefix + ":" + obj->get_name() + ":" + obj->get_instance();
}
int rgw_read_remote_bilog_info(RGWRESTConn* conn,
const rgw_bucket& bucket,
BucketIndexShardsManager& markers,
optional_yield y)
{
const auto instance_key = bucket.get_key();
const rgw_http_param_pair params[] = {
{ "type" , "bucket-index" },
{ "bucket-instance", instance_key.c_str() },
{ "info" , nullptr },
{ nullptr, nullptr }
};
rgw_bucket_index_marker_info result;
int r = conn->get_json_resource("/admin/log/", params, y, result);
if (r < 0) {
lderr(conn->get_ctx()) << "failed to fetch remote log markers: " << cpp_strerror(r) << dendl;
return r;
}
r = markers.from_string(result.max_marker, -1);
if (r < 0) {
lderr(conn->get_ctx()) << "failed to decode remote log markers" << dendl;
return r;
}
return 0;
}
class RGWCollectBucketSyncStatusCR : public RGWShardCollectCR {
static constexpr int max_concurrent_shards = 16;
rgw::sal::RGWRadosStore *const store;


@@ -613,6 +613,13 @@ public:
void wakeup();
};
class BucketIndexShardsManager;
int rgw_read_remote_bilog_info(RGWRESTConn* conn,
const rgw_bucket& bucket,
BucketIndexShardsManager& markers,
optional_yield y);
class RGWBucketPipeSyncStatusManager : public DoutPrefixProvider {
rgw::sal::RGWRadosStore *store;


@@ -3703,7 +3703,8 @@ int RGWRados::stat_remote_obj(RGWObjectCtx& obj_ctx,
return ret;
}
ret = conn->complete_request(in_stream_req, nullptr, &set_mtime, psize, nullptr, pheaders);
ret = conn->complete_request(in_stream_req, nullptr, &set_mtime, psize,
nullptr, pheaders, null_yield);
if (ret < 0) {
return ret;
}
@@ -3917,7 +3918,7 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
}
ret = conn->complete_request(in_stream_req, &etag, &set_mtime,
&expected_size, nullptr, nullptr);
&expected_size, nullptr, nullptr, null_yield);
if (ret < 0) {
goto set_err_state;
}
@@ -4126,7 +4127,7 @@ int RGWRados::copy_obj_to_remote_dest(RGWObjState *astate,
return ret;
}
ret = rest_master_conn->complete_request(out_stream_req, etag, mtime);
ret = rest_master_conn->complete_request(out_stream_req, etag, mtime, null_yield);
if (ret < 0)
return ret;


@@ -803,13 +803,14 @@ int RGWRESTStreamRWRequest::send(RGWHTTPManager *mgr)
return 0;
}
int RGWRESTStreamRWRequest::complete_request(string *etag,
int RGWRESTStreamRWRequest::complete_request(optional_yield y,
string *etag,
real_time *mtime,
uint64_t *psize,
map<string, string> *pattrs,
map<string, string> *pheaders)
{
int ret = wait(null_yield);
int ret = wait(y);
if (ret < 0) {
return ret;
}


@@ -176,7 +176,8 @@ public:
int send_request(RGWAccessKey& key, map<string, string>& extra_headers, const rgw_obj& obj, RGWHTTPManager *mgr);
int send_request(RGWAccessKey *key, map<string, string>& extra_headers, const string& resource, RGWHTTPManager *mgr, bufferlist *send_data = nullptr /* optional input data */);
int complete_request(string *etag = nullptr,
int complete_request(optional_yield y,
string *etag = nullptr,
real_time *mtime = nullptr,
uint64_t *psize = nullptr,
map<string, string> *pattrs = nullptr,


@@ -76,14 +76,7 @@ int RGWRESTConn::get_url(string& endpoint)
string RGWRESTConn::get_url()
{
string endpoint;
if (endpoints.empty()) {
ldout(cct, 0) << "WARNING: endpoints not configured for upstream zone" << dendl; /* we'll catch this later */
return endpoint;
}
int i = ++counter;
endpoint = endpoints[i % endpoints.size()];
get_url(endpoint);
return endpoint;
}
@@ -159,9 +152,10 @@ int RGWRESTConn::put_obj_async(const rgw_user& uid, rgw::sal::RGWObject* obj, ui
return 0;
}
int RGWRESTConn::complete_request(RGWRESTStreamS3PutObj *req, string& etag, real_time *mtime)
int RGWRESTConn::complete_request(RGWRESTStreamS3PutObj *req, string& etag,
real_time *mtime, optional_yield y)
{
int ret = req->complete_request(&etag, mtime);
int ret = req->complete_request(null_yield, &etag, mtime);
delete req;
return ret;
@@ -300,9 +294,10 @@ int RGWRESTConn::complete_request(RGWRESTStreamRWRequest *req,
real_time *mtime,
uint64_t *psize,
map<string, string> *pattrs,
map<string, string> *pheaders)
map<string, string> *pheaders,
optional_yield y)
{
int ret = req->complete_request(etag, mtime, psize, pattrs, pheaders);
int ret = req->complete_request(y, etag, mtime, psize, pattrs, pheaders);
delete req;
return ret;
@@ -313,7 +308,8 @@ int RGWRESTConn::get_resource(const string& resource,
map<string, string> *extra_headers,
bufferlist& bl,
bufferlist *send_data,
RGWHTTPManager *mgr)
RGWHTTPManager *mgr,
optional_yield y)
{
string url;
int ret = get_url(url);
@@ -343,7 +339,7 @@ int RGWRESTConn::get_resource(const string& resource,
return ret;
}
return req.complete_request();
return req.complete_request(y);
}
RGWRESTReadResource::RGWRESTReadResource(RGWRESTConn *_conn,
@@ -388,7 +384,7 @@ int RGWRESTReadResource::read()
return ret;
}
return req.complete_request();
return req.complete_request(null_yield);
}
int RGWRESTReadResource::aio_read()
@@ -449,7 +445,7 @@ int RGWRESTSendResource::send(bufferlist& outbl)
return ret;
}
return req.complete_request();
return req.complete_request(null_yield);
}
int RGWRESTSendResource::aio_send(bufferlist& outbl)


@@ -115,7 +115,8 @@ public:
int put_obj_send_init(rgw::sal::RGWObject* obj, const rgw_http_param_pair *extra_params, RGWRESTStreamS3PutObj **req);
int put_obj_async(const rgw_user& uid, rgw::sal::RGWObject* obj, uint64_t obj_size,
map<string, bufferlist>& attrs, bool send, RGWRESTStreamS3PutObj **req);
int complete_request(RGWRESTStreamS3PutObj *req, string& etag, ceph::real_time *mtime);
int complete_request(RGWRESTStreamS3PutObj *req, string& etag,
ceph::real_time *mtime, optional_yield y);
struct get_obj_params {
rgw_user uid;
@@ -154,21 +155,26 @@ public:
ceph::real_time *mtime,
uint64_t *psize,
map<string, string> *pattrs,
map<string, string> *pheaders);
map<string, string> *pheaders,
optional_yield y);
int get_resource(const string& resource,
param_vec_t *extra_params,
map<string, string>* extra_headers,
bufferlist& bl,
bufferlist *send_data = nullptr,
RGWHTTPManager *mgr = nullptr);
bufferlist *send_data,
RGWHTTPManager *mgr,
optional_yield y);
template <class T>
int get_json_resource(const string& resource, param_vec_t *params, bufferlist *in_data, T& t);
int get_json_resource(const string& resource, param_vec_t *params,
bufferlist *in_data, optional_yield y, T& t);
template <class T>
int get_json_resource(const string& resource, param_vec_t *params, T& t);
int get_json_resource(const string& resource, param_vec_t *params,
optional_yield y, T& t);
template <class T>
int get_json_resource(const string& resource, const rgw_http_param_pair *pp, T& t);
int get_json_resource(const string& resource, const rgw_http_param_pair *pp,
optional_yield y, T& t);
private:
void populate_zonegroup(param_vec_t& params, const string& zonegroup) {
@@ -205,10 +211,11 @@ public:
template<class T>
int RGWRESTConn::get_json_resource(const string& resource, param_vec_t *params, bufferlist *in_data, T& t)
int RGWRESTConn::get_json_resource(const string& resource, param_vec_t *params,
bufferlist *in_data, optional_yield y, T& t)
{
bufferlist bl;
int ret = get_resource(resource, params, nullptr, bl, in_data);
int ret = get_resource(resource, params, nullptr, bl, in_data, nullptr, y);
if (ret < 0) {
return ret;
}
@@ -222,16 +229,18 @@ int RGWRESTConn::get_json_resource(const string& resource, param_vec_t *params,
}
template<class T>
int RGWRESTConn::get_json_resource(const string& resource, param_vec_t *params, T& t)
int RGWRESTConn::get_json_resource(const string& resource, param_vec_t *params,
optional_yield y, T& t)
{
return get_json_resource(resource, params, nullptr, t);
return get_json_resource(resource, params, nullptr, y, t);
}
template<class T>
int RGWRESTConn::get_json_resource(const string& resource, const rgw_http_param_pair *pp, T& t)
int RGWRESTConn::get_json_resource(const string& resource, const rgw_http_param_pair *pp,
optional_yield y, T& t)
{
param_vec_t params = make_param_list(pp);
return get_json_resource(resource, &params, t);
return get_json_resource(resource, &params, y, t);
}
class RGWStreamIntoBufferlist : public RGWHTTPStreamRWRequest::ReceiveCB {


@@ -244,7 +244,7 @@ int RGWRemoteMetaLog::read_log_info(rgw_mdlog_info *log_info)
rgw_http_param_pair pairs[] = { { "type", "metadata" },
{ NULL, NULL } };
int ret = conn->get_json_resource("/admin/log", pairs, *log_info);
int ret = conn->get_json_resource("/admin/log", pairs, null_yield, *log_info);
if (ret < 0) {
ldpp_dout(dpp, 0) << "ERROR: failed to fetch mdlog info" << dendl;
return ret;


@@ -0,0 +1,223 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2020 Red Hat, Inc.
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#include <fmt/format.h>
#include "common/errno.h"
#include "rgw_sync_checkpoint.h"
#include "rgw_sal_rados.h"
#include "rgw_bucket_sync.h"
#include "rgw_data_sync.h"
#include "rgw_http_errors.h"
#include "cls/rgw/cls_rgw_client.h"
#include "services/svc_sys_obj.h"
#include "services/svc_zone.h"
#include "rgw_zone.h"
#define dout_subsys ceph_subsys_rgw
namespace {
std::string incremental_marker(const rgw_bucket_shard_sync_info& info)
{
if (info.state != rgw_bucket_shard_sync_info::StateIncrementalSync) {
return "";
}
return BucketIndexShardsManager::get_shard_marker(info.inc_marker.position);
}
bool operator<(const std::vector<rgw_bucket_shard_sync_info>& lhs,
const BucketIndexShardsManager& rhs)
{
for (size_t i = 0; i < lhs.size(); ++i) {
const auto& l = incremental_marker(lhs[i]);
const auto& r = rhs.get(i, "");
if (l < r) {
return true;
}
}
return false;
}
bool empty(const BucketIndexShardsManager& markers, int size)
{
for (int i = 0; i < size; ++i) {
const auto& m = markers.get(i, "");
if (!m.empty()) {
return false;
}
}
return true;
}
std::ostream& operator<<(std::ostream& out, const std::vector<rgw_bucket_shard_sync_info>& rhs)
{
const char* separator = ""; // first entry has no comma
out << '[';
for (auto& i : rhs) {
out << std::exchange(separator, ", ") << incremental_marker(i);
}
return out << ']';
}
std::ostream& operator<<(std::ostream& out, const BucketIndexShardsManager& rhs)
{
out << '[';
const char* separator = ""; // first entry has no comma
for (auto& [i, marker] : rhs.get()) {
out << std::exchange(separator, ", ") << marker;
}
return out << ']';
}
int bucket_source_sync_checkpoint(const DoutPrefixProvider* dpp,
rgw::sal::RGWRadosStore *store,
const RGWBucketInfo& bucket_info,
const RGWBucketInfo& source_bucket_info,
const rgw_sync_bucket_pipe& pipe,
const BucketIndexShardsManager& remote_markers,
ceph::timespan retry_delay,
ceph::coarse_mono_time timeout_at)
{
const auto num_shards = source_bucket_info.layout.current_index.layout.normal.num_shards;
if (empty(remote_markers, num_shards)) {
ldpp_dout(dpp, 1) << "bucket sync caught up with empty source" << dendl;
return 0;
}
std::vector<rgw_bucket_shard_sync_info> status;
status.resize(std::max<size_t>(1, num_shards));
int r = rgw_bucket_sync_status(dpp, store, pipe, bucket_info,
&source_bucket_info, &status);
if (r < 0) {
return r;
}
while (status < remote_markers) {
auto delay_until = ceph::coarse_mono_clock::now() + retry_delay;
if (delay_until > timeout_at) {
ldpp_dout(dpp, 0) << "bucket checkpoint timed out waiting for incremental sync to catch up" << dendl;
return -ETIMEDOUT;
}
ldpp_dout(dpp, 1) << "waiting for incremental sync to catch up:\n"
<< " local status: " << status << '\n'
<< " remote markers: " << remote_markers << dendl;
std::this_thread::sleep_until(delay_until);
r = rgw_bucket_sync_status(dpp, store, pipe, bucket_info, &source_bucket_info, &status);
if (r < 0) {
return r;
}
}
ldpp_dout(dpp, 1) << "bucket sync caught up with source:\n"
<< " local status: " << status << '\n'
<< " remote markers: " << remote_markers << dendl;
return 0;
}
int source_bilog_markers(RGWSI_Zone* zone_svc,
const rgw_sync_bucket_pipe& pipe,
BucketIndexShardsManager& remote_markers,
optional_yield y)
{
ceph_assert(pipe.source.zone);
auto& zone_conn_map = zone_svc->get_zone_conn_map();
auto conn = zone_conn_map.find(pipe.source.zone->id);
if (conn == zone_conn_map.end()) {
return -EINVAL;
}
return rgw_read_remote_bilog_info(conn->second, *pipe.source.bucket,
remote_markers, y);
}
} // anonymous namespace
int rgw_bucket_sync_checkpoint(const DoutPrefixProvider* dpp,
rgw::sal::RGWRadosStore *store,
const RGWBucketSyncPolicyHandler& policy,
const RGWBucketInfo& info,
std::optional<rgw_zone_id> opt_source_zone,
std::optional<rgw_bucket> opt_source_bucket,
ceph::timespan retry_delay,
ceph::coarse_mono_time timeout_at)
{
struct sync_source_entry {
rgw_sync_bucket_pipe pipe;
BucketIndexShardsManager remote_markers;
RGWBucketInfo source_bucket_info;
};
std::list<sync_source_entry> sources;
// fetch remote markers and bucket info in parallel
boost::asio::io_context ioctx;
for (const auto& [source_zone_id, pipe] : policy.get_all_sources()) {
// filter by source zone/bucket
if (opt_source_zone && *opt_source_zone != *pipe.source.zone) {
continue;
}
if (opt_source_bucket && !opt_source_bucket->match(*pipe.source.bucket)) {
continue;
}
auto& entry = sources.emplace_back();
entry.pipe = pipe;
// fetch remote markers
spawn::spawn(ioctx, [&] (spawn::yield_context yield) {
auto y = optional_yield{ioctx, yield};
int r = source_bilog_markers(store->svc()->zone, entry.pipe,
entry.remote_markers, y);
if (r < 0) {
ldpp_dout(dpp, 0) << "failed to fetch remote bilog markers: "
<< cpp_strerror(r) << dendl;
throw std::system_error(-r, std::system_category());
}
});
// fetch source bucket info
spawn::spawn(ioctx, [&] (spawn::yield_context yield) {
auto y = optional_yield{ioctx, yield};
auto obj_ctx = store->svc()->sysobj->init_obj_ctx();
int r = store->getRados()->get_bucket_instance_info(
obj_ctx, *entry.pipe.source.bucket, entry.source_bucket_info,
nullptr, nullptr, y);
if (r < 0) {
ldpp_dout(dpp, 0) << "failed to read source bucket info: "
<< cpp_strerror(r) << dendl;
throw std::system_error(-r, std::system_category());
}
});
}
try {
ioctx.run();
} catch (const std::system_error& e) {
return -e.code().value();
}
// checkpoint each source sequentially
for (const auto& [pipe, remote_markers, source_bucket_info] : sources) {
int r = bucket_source_sync_checkpoint(dpp, store, info, source_bucket_info,
pipe, remote_markers,
retry_delay, timeout_at);
if (r < 0) {
ldpp_dout(dpp, 0) << "bucket sync checkpoint failed: " << cpp_strerror(r) << dendl;
return r;
}
}
ldpp_dout(dpp, 0) << "bucket checkpoint complete" << dendl;
return 0;
}


@@ -0,0 +1,35 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2020 Red Hat, Inc.
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#pragma once
#include <optional>
#include "common/ceph_time.h"
#include "rgw_basic_types.h"
class DoutPrefixProvider;
namespace rgw::sal { class RGWRadosStore; }
class RGWBucketInfo;
class RGWBucketSyncPolicyHandler;
// poll the bucket's sync status until it's caught up against all sync sources
int rgw_bucket_sync_checkpoint(const DoutPrefixProvider* dpp,
rgw::sal::RGWRadosStore *store,
const RGWBucketSyncPolicyHandler& policy,
const RGWBucketInfo& info,
std::optional<rgw_zone_id> opt_source_zone,
std::optional<rgw_bucket> opt_source_bucket,
ceph::timespan retry_delay,
ceph::coarse_mono_time timeout_at);


@@ -247,7 +247,8 @@ void RGWMetadataSearchOp::execute()
}
ldout(s->cct, 20) << "sending request to elasticsearch, payload=" << string(in.c_str(), in.length()) << dendl;
auto& extra_headers = es_module->get_request_headers();
op_ret = conn->get_resource(resource, &params, &extra_headers, out, &in);
op_ret = conn->get_resource(resource, &params, &extra_headers,
out, &in, nullptr, null_yield);
if (op_ret < 0) {
ldout(s->cct, 0) << "ERROR: failed to fetch resource (r=" << resource << ", ret=" << op_ret << ")" << dendl;
return;


@@ -29,6 +29,7 @@
bucket chown link bucket to specified user and update its object ACLs
bucket reshard reshard bucket
bucket rewrite rewrite all objects in the specified bucket
bucket sync checkpoint poll a bucket's sync status until it catches up to its remote
bucket sync disable disable bucket sync
bucket sync enable enable bucket sync
bucket radoslist list rados objects backing bucket's objects


@@ -370,23 +370,13 @@ def zone_bucket_checkpoint(target_zone, source_zone, bucket_name):
if not target_zone.syncs_from(source_zone.name):
return
log_status = bucket_source_log_status(source_zone, bucket_name)
log.info('starting bucket checkpoint for target_zone=%s source_zone=%s bucket=%s', target_zone.name, source_zone.name, bucket_name)
for _ in range(config.checkpoint_retries):
sync_status = bucket_sync_status(target_zone, source_zone, bucket_name)
log.debug('log_status=%s', log_status)
log.debug('sync_status=%s', sync_status)
if compare_bucket_status(target_zone, source_zone, bucket_name, log_status, sync_status):
log.info('finished bucket checkpoint for target_zone=%s source_zone=%s bucket=%s', target_zone.name, source_zone.name, bucket_name)
return
time.sleep(config.checkpoint_delay)
assert False, 'failed bucket checkpoint for target_zone=%s source_zone=%s bucket=%s' % \
(target_zone.name, source_zone.name, bucket_name)
cmd = ['bucket', 'sync', 'checkpoint']
cmd += ['--bucket', bucket_name, '--source-zone', source_zone.name]
retry_delay_ms = config.checkpoint_delay * 1000
timeout_sec = config.checkpoint_retries * config.checkpoint_delay
cmd += ['--retry-delay-ms', str(retry_delay_ms), '--timeout-sec', str(timeout_sec)]
cmd += target_zone.zone_args()
target_zone.cluster.admin(cmd, debug_rgw=1)
def zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name):
for source_conn in zonegroup_conns.rw_zones:
@@ -958,7 +948,8 @@ def test_datalog_autotrim():
k = new_key(zone, bucket.name, 'key')
k.set_contents_from_string('body')
# wait for data sync to catch up
# wait for metadata and data sync to catch up
zonegroup_meta_checkpoint(zonegroup)
zonegroup_data_checkpoint(zonegroup_conns)
# trim each datalog
@@ -1112,6 +1103,7 @@ def test_bucket_sync_disable():
zonegroup = realm.master_zonegroup()
zonegroup_conns = ZonegroupConns(zonegroup)
buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
zonegroup_meta_checkpoint(zonegroup)
for bucket_name in buckets:
disable_bucket_sync(realm.meta_master_zone(), bucket_name)
@@ -1134,6 +1126,8 @@ def test_bucket_sync_enable_right_after_disable():
k = new_key(zone, bucket.name, objname)
k.set_contents_from_string(content)
zonegroup_meta_checkpoint(zonegroup)
for bucket_name in buckets:
zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)