mirror of
https://github.com/ceph/ceph
synced 2025-02-21 18:17:42 +00:00
rgw: define bucket sync pipes
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
This commit is contained in:
parent
3bc7ca6d8b
commit
a0a68effe4
@ -27,6 +27,7 @@ class RGWBucketSyncPolicyHandler {
|
||||
|
||||
std::set<string> source_zones;
|
||||
|
||||
public:
|
||||
struct peer_info {
|
||||
std::string type;
|
||||
rgw_bucket bucket;
|
||||
@ -38,10 +39,15 @@ class RGWBucketSyncPolicyHandler {
|
||||
}
|
||||
return (type < si.type);
|
||||
}
|
||||
|
||||
bool is_rgw() const {
|
||||
return (type.empty() || type == "rgw");
|
||||
}
|
||||
};
|
||||
|
||||
std::map<string, std::set<peer_info> > sources;
|
||||
std::map<string, std::set<peer_info> > targets;
|
||||
private:
|
||||
std::map<string, std::set<peer_info> > sources; /* peers by zone */
|
||||
std::map<string, std::set<peer_info> > targets; /* peers by zone */
|
||||
|
||||
public:
|
||||
RGWBucketSyncPolicyHandler(RGWSI_Zone *_zone_svc,
|
||||
@ -49,6 +55,10 @@ public:
|
||||
bucket_info(_bucket_info) {}
|
||||
int init();
|
||||
|
||||
std::map<string, std::set<peer_info> >& get_sources() {
|
||||
return sources;
|
||||
}
|
||||
|
||||
const RGWBucketInfo& get_bucket_info() const {
|
||||
return bucket_info;
|
||||
}
|
||||
|
@ -21,6 +21,7 @@
|
||||
#include "rgw_cr_tools.h"
|
||||
#include "rgw_http_client.h"
|
||||
#include "rgw_bucket.h"
|
||||
#include "rgw_bucket_sync.h"
|
||||
#include "rgw_metadata.h"
|
||||
#include "rgw_sync_counters.h"
|
||||
#include "rgw_sync_module.h"
|
||||
@ -1039,6 +1040,7 @@ std::ostream& operator<<(std::ostream& out, const bucket_shard_str& rhs) {
|
||||
class RGWRunBucketSyncCoroutine : public RGWCoroutine {
|
||||
RGWDataSyncCtx *sc;
|
||||
RGWDataSyncEnv *sync_env;
|
||||
rgw_bucket_sync_pair_info sync_pair;
|
||||
rgw_bucket_sync_pipe sync_pipe;
|
||||
rgw_bucket_shard_sync_info sync_status;
|
||||
RGWMetaSyncEnv meta_sync_env;
|
||||
@ -1051,12 +1053,11 @@ class RGWRunBucketSyncCoroutine : public RGWCoroutine {
|
||||
RGWSyncTraceNodeRef tn;
|
||||
|
||||
public:
|
||||
RGWRunBucketSyncCoroutine(RGWDataSyncCtx *_sc, const rgw_bucket_shard& bs, const RGWSyncTraceNodeRef& _tn_parent)
|
||||
: RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
|
||||
status_oid(RGWBucketPipeSyncStatusManager::status_oid(sc->source_zone, sync_pipe)),
|
||||
RGWRunBucketSyncCoroutine(RGWDataSyncCtx *_sc, const rgw_bucket_sync_pair_info& _sync_pair, const RGWSyncTraceNodeRef& _tn_parent)
|
||||
: RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), sync_pair(_sync_pair),
|
||||
status_oid(RGWBucketPipeSyncStatusManager::status_oid(sc->source_zone, sync_pair)),
|
||||
tn(sync_env->sync_tracer->add_node(_tn_parent, "bucket",
|
||||
SSTR(bucket_shard_str{bs}))) {
|
||||
sync_pipe.source_bs = bs;
|
||||
SSTR(bucket_shard_str{_sync_pair.dest_bs} << "<-" << bucket_shard_str{_sync_pair.source_bs} ))) {
|
||||
}
|
||||
~RGWRunBucketSyncCoroutine() override {
|
||||
if (lease_cr) {
|
||||
@ -1075,6 +1076,7 @@ class RGWDataSyncSingleEntryCR : public RGWCoroutine {
|
||||
string entry_marker;
|
||||
|
||||
rgw_bucket_shard bs;
|
||||
rgw_bucket_sync_pair_info sync_pair;
|
||||
|
||||
int sync_status;
|
||||
|
||||
@ -1114,7 +1116,12 @@ public:
|
||||
marker_tracker->reset_need_retry(raw_key);
|
||||
}
|
||||
tn->log(0, SSTR("triggering sync of bucket/shard " << bucket_shard_str{bs}));
|
||||
call(new RGWRunBucketSyncCoroutine(sc, bs, tn));
|
||||
|
||||
sync_pair = rgw_bucket_sync_pair_info();
|
||||
sync_pair.source_bs = bs;
|
||||
sync_pair.dest_bs = bs;
|
||||
#warning init pipe fields
|
||||
call(new RGWRunBucketSyncCoroutine(sc, sync_pair, tn));
|
||||
}
|
||||
} while (marker_tracker && marker_tracker->need_retry(raw_key));
|
||||
|
||||
@ -1768,7 +1775,7 @@ int RGWDefaultSyncModule::create_instance(CephContext *cct, const JSONFormattabl
|
||||
RGWCoroutine *RGWDefaultDataSyncModule::sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace)
|
||||
{
|
||||
auto sync_env = sc->env;
|
||||
return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sc->source_zone, sync_pipe.source_bs.bucket,
|
||||
return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sc->source_zone, sync_pipe.info.source_bs.bucket,
|
||||
std::nullopt, sync_pipe.dest_bucket_info,
|
||||
key, std::nullopt, versioned_epoch,
|
||||
true, zones_trace, sync_env->counters, sync_env->dpp);
|
||||
@ -1826,7 +1833,7 @@ int RGWArchiveSyncModule::create_instance(CephContext *cct, const JSONFormattabl
|
||||
RGWCoroutine *RGWArchiveDataSyncModule::sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace)
|
||||
{
|
||||
auto sync_env = sc->env;
|
||||
ldout(sc->cct, 5) << "SYNC_ARCHIVE: sync_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
|
||||
ldout(sc->cct, 5) << "SYNC_ARCHIVE: sync_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
|
||||
if (!sync_pipe.dest_bucket_info.versioned() ||
|
||||
(sync_pipe.dest_bucket_info.flags & BUCKET_VERSIONS_SUSPENDED)) {
|
||||
ldout(sc->cct, 0) << "SYNC_ARCHIVE: sync_object: enabling object versioning for archive bucket" << dendl;
|
||||
@ -1849,7 +1856,7 @@ RGWCoroutine *RGWArchiveDataSyncModule::sync_object(RGWDataSyncCtx *sc, rgw_buck
|
||||
}
|
||||
|
||||
return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sc->source_zone,
|
||||
sync_pipe.source_bs.bucket, std::nullopt, sync_pipe.dest_bucket_info,
|
||||
sync_pipe.info.source_bs.bucket, std::nullopt, sync_pipe.dest_bucket_info,
|
||||
key, dest_key, versioned_epoch,
|
||||
true, zones_trace, nullptr, sync_env->dpp);
|
||||
}
|
||||
@ -1857,14 +1864,14 @@ RGWCoroutine *RGWArchiveDataSyncModule::sync_object(RGWDataSyncCtx *sc, rgw_buck
|
||||
RGWCoroutine *RGWArchiveDataSyncModule::remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key,
|
||||
real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
|
||||
{
|
||||
ldout(sc->cct, 0) << "SYNC_ARCHIVE: remove_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch << dendl;
|
||||
ldout(sc->cct, 0) << "SYNC_ARCHIVE: remove_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch << dendl;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
RGWCoroutine *RGWArchiveDataSyncModule::create_delete_marker(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
|
||||
rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
|
||||
{
|
||||
ldout(sc->cct, 0) << "SYNC_ARCHIVE: create_delete_marker: b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime
|
||||
ldout(sc->cct, 0) << "SYNC_ARCHIVE: create_delete_marker: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime
|
||||
<< " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
|
||||
auto sync_env = sc->env;
|
||||
return new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sc->source_zone,
|
||||
@ -2111,14 +2118,14 @@ public:
|
||||
rgw_bucket_shard_sync_info& _status)
|
||||
: RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
|
||||
sync_pipe(_sync_pipe),
|
||||
sync_status_oid(RGWBucketPipeSyncStatusManager::status_oid(sc->source_zone, _sync_pipe)),
|
||||
sync_status_oid(RGWBucketPipeSyncStatusManager::status_oid(sc->source_zone, _sync_pipe.info)),
|
||||
status(_status)
|
||||
{}
|
||||
|
||||
int operate() override {
|
||||
reenter(this) {
|
||||
/* fetch current position in logs */
|
||||
yield call(new RGWReadRemoteBucketIndexLogInfoCR(sc, sync_pipe.source_bs, &info));
|
||||
yield call(new RGWReadRemoteBucketIndexLogInfoCR(sc, sync_pipe.info.source_bs, &info));
|
||||
if (retcode < 0 && retcode != -ENOENT) {
|
||||
ldout(cct, 0) << "ERROR: failed to fetch bucket index status" << dendl;
|
||||
return set_cr_error(retcode);
|
||||
@ -2160,6 +2167,7 @@ RGWCoroutine *RGWRemoteBucketLog::init_sync_status_cr()
|
||||
#warning FIXME
|
||||
rgw_bucket_sync_pipe sync_pipe;
|
||||
sync_pipe.source_bs = bs;
|
||||
sync_pipe.dest_bs = bs;
|
||||
return new RGWInitBucketShardSyncStatusCoroutine(&sc, sync_pipe, init_status);
|
||||
}
|
||||
|
||||
@ -2231,10 +2239,10 @@ class RGWReadBucketPipeSyncStatusCoroutine : public RGWCoroutine {
|
||||
map<string, bufferlist> attrs;
|
||||
public:
|
||||
RGWReadBucketPipeSyncStatusCoroutine(RGWDataSyncCtx *_sc,
|
||||
const rgw_bucket_sync_pipe& sync_pipe,
|
||||
const rgw_bucket_sync_pair_info& sync_pair,
|
||||
rgw_bucket_shard_sync_info *_status)
|
||||
: RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
|
||||
oid(RGWBucketPipeSyncStatusManager::status_oid(sc->source_zone, sync_pipe)),
|
||||
oid(RGWBucketPipeSyncStatusManager::status_oid(sc->source_zone, sync_pair)),
|
||||
status(_status) {}
|
||||
int operate() override;
|
||||
};
|
||||
@ -2435,10 +2443,11 @@ int RGWRemoteDataLog::read_shard_status(int shard_id, set<string>& pending_bucke
|
||||
|
||||
RGWCoroutine *RGWRemoteBucketLog::read_sync_status_cr(rgw_bucket_shard_sync_info *sync_status)
|
||||
{
|
||||
rgw_bucket_sync_pair_info sync_pair;
|
||||
sync_pair.source_bs = bs;
|
||||
#warning FIXME
|
||||
rgw_bucket_sync_pipe sync_pipe;
|
||||
sync_pipe.source_bs = bs;
|
||||
return new RGWReadBucketPipeSyncStatusCoroutine(&sc, sync_pipe, sync_status);
|
||||
sync_pair.dest_bs = bs;
|
||||
return new RGWReadBucketPipeSyncStatusCoroutine(&sc, sync_pair, sync_status);
|
||||
}
|
||||
|
||||
RGWBucketPipeSyncStatusManager::RGWBucketPipeSyncStatusManager(rgw::sal::RGWRadosStore *_store, const string& _source_zone,
|
||||
@ -2802,7 +2811,7 @@ public:
|
||||
const T& _entry_marker, RGWSyncShardMarkerTrack<T, K> *_marker_tracker, rgw_zone_set& _zones_trace,
|
||||
RGWSyncTraceNodeRef& _tn_parent) : RGWCoroutine(_sc->cct),
|
||||
sc(_sc), sync_env(_sc->env),
|
||||
sync_pipe(_sync_pipe), bs(_sync_pipe.source_bs),
|
||||
sync_pipe(_sync_pipe), bs(_sync_pipe.info.source_bs),
|
||||
key(_key), versioned(_versioned), versioned_epoch(_versioned_epoch),
|
||||
owner(_owner),
|
||||
timestamp(_timestamp), op(_op),
|
||||
@ -2937,7 +2946,7 @@ public:
|
||||
rgw_bucket_shard_sync_info& sync_info,
|
||||
RGWSyncTraceNodeRef tn_parent)
|
||||
: RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
|
||||
sync_pipe(_sync_pipe), bs(_sync_pipe.source_bs),
|
||||
sync_pipe(_sync_pipe), bs(_sync_pipe.info.source_bs),
|
||||
lease_cr(lease_cr), sync_info(sync_info),
|
||||
marker_tracker(sc, status_oid, sync_info.full_marker),
|
||||
status_oid(status_oid),
|
||||
@ -3089,7 +3098,7 @@ public:
|
||||
rgw_bucket_shard_sync_info& sync_info,
|
||||
RGWSyncTraceNodeRef& _tn_parent)
|
||||
: RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
|
||||
sync_pipe(_sync_pipe), bs(_sync_pipe.source_bs),
|
||||
sync_pipe(_sync_pipe), bs(_sync_pipe.info.source_bs),
|
||||
lease_cr(lease_cr), sync_info(sync_info),
|
||||
marker_tracker(sc, status_oid, sync_info.inc_marker),
|
||||
status_oid(status_oid), zone_id(sync_env->svc->zone->get_zone().id),
|
||||
@ -3397,11 +3406,12 @@ struct rgw_bucket_sync_sources_local_info {
|
||||
};
|
||||
WRITE_CLASS_ENCODER(rgw_bucket_sync_sources_local_info)
|
||||
|
||||
class RGWReadBucketSourcesInfoCR : public RGWCoroutine {
|
||||
class RGWGetBucketSourcePeersCR : public RGWCoroutine {
|
||||
RGWDataSyncEnv *sync_env;
|
||||
rgw_bucket bucket;
|
||||
|
||||
RGWBucketInfo bucket_info;
|
||||
map<string, set<RGWBucketSyncPolicyHandler::peer_info> > *sources;
|
||||
RGWBucketInfo *pbucket_info;
|
||||
|
||||
rgw_raw_obj sources_obj;
|
||||
|
||||
@ -3409,19 +3419,23 @@ class RGWReadBucketSourcesInfoCR : public RGWCoroutine {
|
||||
rgw_bucket_sync_sources_local_info expected_local_info;
|
||||
|
||||
rgw_bucket_get_sync_policy_params get_policy_params;
|
||||
std::shared_ptr<rgw_bucket_get_sync_policy_result> get_policy_result;
|
||||
std::shared_ptr<rgw_bucket_get_sync_policy_result> policy;
|
||||
|
||||
RGWSyncTraceNodeRef tn;
|
||||
|
||||
public:
|
||||
RGWReadBucketSourcesInfoCR(RGWDataSyncEnv *_sync_env,
|
||||
const rgw_bucket& _bucket,
|
||||
const RGWSyncTraceNodeRef& _tn_parent)
|
||||
RGWGetBucketSourcePeersCR(RGWDataSyncEnv *_sync_env,
|
||||
const rgw_bucket& _bucket,
|
||||
map<string, set<RGWBucketSyncPolicyHandler::peer_info> > *_sources,
|
||||
RGWBucketInfo *_pbucket_info,
|
||||
const RGWSyncTraceNodeRef& _tn_parent)
|
||||
: RGWCoroutine(_sync_env->cct),
|
||||
sync_env(_sync_env),
|
||||
bucket(_bucket),
|
||||
sources(_sources),
|
||||
pbucket_info(_pbucket_info),
|
||||
sources_obj(RGWBucketSyncSourcesManager::sync_sources_obj(sync_env->svc->zone, bucket)),
|
||||
get_policy_result(make_shared<rgw_bucket_get_sync_policy_result>()),
|
||||
policy(make_shared<rgw_bucket_get_sync_policy_result>()),
|
||||
tn(sync_env->sync_tracer->add_node(_tn_parent, "read_bucket_sources",
|
||||
SSTR(bucket))) {
|
||||
}
|
||||
@ -3429,7 +3443,7 @@ public:
|
||||
int operate() override;
|
||||
};
|
||||
|
||||
int RGWReadBucketSourcesInfoCR::operate()
|
||||
int RGWGetBucketSourcePeersCR::operate()
|
||||
{
|
||||
reenter(this) {
|
||||
yield call(new RGWSimpleRadosReadCR<rgw_bucket_sync_sources_local_info>(sync_env->async_rados,
|
||||
@ -3445,11 +3459,19 @@ int RGWReadBucketSourcesInfoCR::operate()
|
||||
yield call(new RGWBucketGetSyncPolicyHandlerCR(sync_env->async_rados,
|
||||
sync_env->store,
|
||||
get_policy_params,
|
||||
get_policy_result));
|
||||
policy));
|
||||
if (retcode < 0 &&
|
||||
retcode != -ENOENT) {
|
||||
return set_cr_error(retcode);
|
||||
}
|
||||
#warning update local copy of sources and act on changes
|
||||
|
||||
{
|
||||
auto& handler = policy->policy_handler;
|
||||
|
||||
*sources = handler->get_sources();
|
||||
*pbucket_info = handler->get_bucket_info();
|
||||
}
|
||||
|
||||
return set_cr_done();
|
||||
}
|
||||
@ -3460,14 +3482,24 @@ int RGWReadBucketSourcesInfoCR::operate()
|
||||
class RGWRunBucketSourcesSyncCR : public RGWCoroutine {
|
||||
RGWDataSyncEnv *sync_env;
|
||||
rgw_bucket bucket;
|
||||
rgw_sync_source source;
|
||||
RGWBucketInfo bucket_info;
|
||||
|
||||
rgw_raw_obj sources_obj;
|
||||
|
||||
map<string, set<RGWBucketSyncPolicyHandler::peer_info> > sources;
|
||||
map<string, set<RGWBucketSyncPolicyHandler::peer_info> >::iterator siter;
|
||||
set<RGWBucketSyncPolicyHandler::peer_info>::iterator piter;
|
||||
|
||||
rgw_bucket_sync_pair_info sync_pair;
|
||||
|
||||
boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
|
||||
boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
|
||||
|
||||
RGWSyncTraceNodeRef tn;
|
||||
std::vector<RGWDataSyncCtx> scs;
|
||||
RGWDataSyncCtx *cur_sc{nullptr};
|
||||
|
||||
int ret{0};
|
||||
|
||||
public:
|
||||
RGWRunBucketSourcesSyncCR(RGWDataSyncEnv *_sync_env,
|
||||
@ -3513,7 +3545,7 @@ int RGWRunBucketSourcesSyncCR::operate()
|
||||
}
|
||||
|
||||
tn->log(10, "took lease");
|
||||
yield call(new RGWReadBucketSourcesInfoCR(sync_env, bucket, tn, &info));
|
||||
yield call(new RGWGetBucketSourcePeersCR(sync_env, bucket, &sources, &bucket_info, tn));
|
||||
if (retcode < 0 && retcode != -ENOENT) {
|
||||
tn->log(0, "ERROR: failed to read sync status for bucket");
|
||||
lease_cr->go_down();
|
||||
@ -3521,15 +3553,40 @@ int RGWRunBucketSourcesSyncCR::operate()
|
||||
return set_cr_error(retcode);
|
||||
}
|
||||
|
||||
if (retcode == -ENOENT) {
|
||||
rgw_bucket_sync_pipe sync_pipe;
|
||||
sync_pipe.init_default(bs);
|
||||
info.pipes.push_back(sync_pipe);
|
||||
}
|
||||
for (siter = sources.begin(); siter != sources.end(); ++siter) {
|
||||
scs.emplace_back();
|
||||
cur_sc = &scs.back();
|
||||
{
|
||||
auto& source_zone = siter->first;
|
||||
auto conn = sync_env->svc->zone->get_zone_conn_by_id(source_zone);
|
||||
if (!conn) {
|
||||
ldpp_dout(sync_env->dpp, 0) << "ERROR: connection object to zone " << source_zone << " does not exist" << dendl;
|
||||
continue;
|
||||
}
|
||||
cur_sc->init(sync_env, conn, siter->first);
|
||||
}
|
||||
for (piter = siter->second.begin(); piter != siter->second.end(); ++piter) {
|
||||
if (!piter->is_rgw()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
yield {
|
||||
for (auto pipe : info.pipes) {
|
||||
spawn(new RGWRunBucketSyncCoroutine(sc, pipe, &tn));
|
||||
sync_pair.source_bs.bucket = piter->bucket;
|
||||
sync_pair.dest_bs.bucket = bucket_info.bucket;
|
||||
|
||||
yield spawn(new RGWRunBucketSyncCoroutine(cur_sc, sync_pair, tn));
|
||||
while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) {
|
||||
set_status() << "num_spawned() > spawn_window";
|
||||
yield wait_for_child();
|
||||
bool again = true;
|
||||
while (again) {
|
||||
again = collect(&ret, nullptr);
|
||||
if (ret < 0) {
|
||||
tn->log(10, "a sync operation returned error");
|
||||
/* we have reported this error */
|
||||
}
|
||||
/* not waiting for child here */
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -3541,6 +3598,65 @@ int RGWRunBucketSourcesSyncCR::operate()
|
||||
return 0;
|
||||
}
|
||||
|
||||
class RGWSyncGetBucketInfoCR : public RGWCoroutine {
|
||||
RGWDataSyncEnv *sync_env;
|
||||
rgw_bucket bucket;
|
||||
RGWBucketInfo *pbucket_info;
|
||||
RGWMetaSyncEnv meta_sync_env;
|
||||
|
||||
RGWSyncTraceNodeRef tn;
|
||||
|
||||
public:
|
||||
RGWSyncGetBucketInfoCR(RGWDataSyncEnv *_sync_env,
|
||||
const rgw_bucket& _bucket,
|
||||
RGWBucketInfo *_pbucket_info,
|
||||
const RGWSyncTraceNodeRef& _tn_parent)
|
||||
: RGWCoroutine(_sync_env->cct),
|
||||
sync_env(_sync_env),
|
||||
bucket(_bucket),
|
||||
pbucket_info(_pbucket_info),
|
||||
tn(sync_env->sync_tracer->add_node(_tn_parent, "get_bucket_info",
|
||||
SSTR(bucket))) {
|
||||
}
|
||||
|
||||
int operate() override;
|
||||
};
|
||||
|
||||
int RGWSyncGetBucketInfoCR::operate()
|
||||
{
|
||||
reenter(this) {
|
||||
yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bucket, pbucket_info));
|
||||
if (retcode == -ENOENT) {
|
||||
/* bucket instance info has not been synced in yet, fetch it now */
|
||||
yield {
|
||||
tn->log(10, SSTR("no local info for bucket:" << ": fetching metadata"));
|
||||
string raw_key = string("bucket.instance:") + bucket.get_key();
|
||||
|
||||
meta_sync_env.init(sync_env->dpp, cct, sync_env->store, sync_env->svc->zone->get_master_conn(), sync_env->async_rados,
|
||||
sync_env->http_manager, sync_env->error_logger, sync_env->sync_tracer);
|
||||
|
||||
call(new RGWMetaSyncSingleEntryCR(&meta_sync_env, raw_key,
|
||||
string() /* no marker */,
|
||||
MDLOG_STATUS_COMPLETE,
|
||||
NULL /* no marker tracker */,
|
||||
tn));
|
||||
}
|
||||
if (retcode < 0) {
|
||||
tn->log(0, SSTR("ERROR: failed to fetch bucket instance info for " << bucket_str{bucket}));
|
||||
return set_cr_error(retcode);
|
||||
}
|
||||
|
||||
yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bucket, &pbucket_info));
|
||||
}
|
||||
if (retcode < 0) {
|
||||
tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{bucket}));
|
||||
return set_cr_error(retcode);
|
||||
}
|
||||
|
||||
return set_cr_done();
|
||||
}
|
||||
}
|
||||
|
||||
int RGWRunBucketSyncCoroutine::operate()
|
||||
{
|
||||
reenter(this) {
|
||||
@ -3566,7 +3682,7 @@ int RGWRunBucketSyncCoroutine::operate()
|
||||
}
|
||||
|
||||
tn->log(10, "took lease");
|
||||
yield call(new RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pipe, &sync_status));
|
||||
yield call(new RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pair, &sync_status));
|
||||
if (retcode < 0 && retcode != -ENOENT) {
|
||||
tn->log(0, "ERROR: failed to read sync status for bucket");
|
||||
lease_cr->go_down();
|
||||
@ -3574,33 +3690,17 @@ int RGWRunBucketSyncCoroutine::operate()
|
||||
return set_cr_error(retcode);
|
||||
}
|
||||
|
||||
tn->log(20, SSTR("sync status for bucket: " << sync_status.state));
|
||||
tn->log(20, SSTR("sync status for source bucket: " << sync_status.state));
|
||||
|
||||
yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, sync_pipe.source_bs.bucket, &sync_pipe.dest_bucket_info));
|
||||
if (retcode == -ENOENT) {
|
||||
/* bucket instance info has not been synced in yet, fetch it now */
|
||||
yield {
|
||||
tn->log(10, SSTR("no local info for bucket:" << ": fetching metadata"));
|
||||
string raw_key = string("bucket.instance:") + sync_pipe.source_bs.bucket.get_key();
|
||||
|
||||
meta_sync_env.init(sync_env->dpp, cct, sync_env->store, sync_env->svc->zone->get_master_conn(), sync_env->async_rados,
|
||||
sync_env->http_manager, sync_env->error_logger, sync_env->sync_tracer);
|
||||
|
||||
call(new RGWMetaSyncSingleEntryCR(&meta_sync_env, raw_key,
|
||||
string() /* no marker */,
|
||||
MDLOG_STATUS_COMPLETE,
|
||||
NULL /* no marker tracker */,
|
||||
tn));
|
||||
}
|
||||
if (retcode < 0) {
|
||||
tn->log(0, SSTR("ERROR: failed to fetch bucket instance info for " << bucket_str{sync_pipe.source_bs.bucket}));
|
||||
lease_cr->go_down();
|
||||
drain_all();
|
||||
return set_cr_error(retcode);
|
||||
}
|
||||
|
||||
yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, sync_pipe.source_bs.bucket, &sync_pipe.dest_bucket_info));
|
||||
yield call(new RGWSyncGetBucketInfoCR(sync_env, sync_pair.source_bs.bucket, &sync_pipe.source_bucket_info, tn));
|
||||
if (retcode < 0) {
|
||||
tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{sync_pipe.info.source_bs.bucket}));
|
||||
lease_cr->go_down();
|
||||
drain_all();
|
||||
return set_cr_error(retcode);
|
||||
}
|
||||
|
||||
yield call(new RGWSyncGetBucketInfoCR(sync_env, sync_pair.dest_bs.bucket, &sync_pipe.dest_bucket_info, tn));
|
||||
if (retcode < 0) {
|
||||
tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{sync_pipe.source_bs.bucket}));
|
||||
lease_cr->go_down();
|
||||
@ -3608,6 +3708,8 @@ int RGWRunBucketSyncCoroutine::operate()
|
||||
return set_cr_error(retcode);
|
||||
}
|
||||
|
||||
sync_pipe.info = sync_pair;
|
||||
|
||||
do {
|
||||
if (sync_status.state == rgw_bucket_shard_sync_info::StateInit) {
|
||||
yield call(new RGWInitBucketShardSyncStatusCoroutine(sc, sync_pipe, sync_status));
|
||||
@ -3680,6 +3782,7 @@ int RGWBucketPipeSyncStatusManager::init()
|
||||
return ret;
|
||||
}
|
||||
|
||||
#warning read specific bucket sources
|
||||
|
||||
const string key = bucket.get_key();
|
||||
|
||||
@ -3791,9 +3894,13 @@ std::ostream& RGWBucketPipeSyncStatusManager::gen_prefix(std::ostream& out) cons
|
||||
}
|
||||
|
||||
string RGWBucketPipeSyncStatusManager::status_oid(const string& source_zone,
|
||||
const rgw_bucket_sync_pipe& sync_pipe)
|
||||
const rgw_bucket_sync_pair_info& sync_pair)
|
||||
{
|
||||
return bucket_status_oid_prefix + "." + source_zone + ":" + sync_pipe.source_bs.get_key();
|
||||
if (sync_pair.source_bs == sync_pair.dest_bs) {
|
||||
return bucket_status_oid_prefix + "." + source_zone + ":" + sync_pair.dest_bs.get_key();
|
||||
} else {
|
||||
return bucket_status_oid_prefix + "." + source_zone + ":" + sync_pair.dest_bs.get_key() + ":" + sync_pair.source_bs.get_key();
|
||||
}
|
||||
}
|
||||
|
||||
string RGWBucketPipeSyncStatusManager::obj_status_oid(const string& source_zone,
|
||||
|
@ -18,15 +18,21 @@
|
||||
|
||||
class JSONObj;
|
||||
|
||||
struct rgw_bucket_sync_pipe {
|
||||
struct rgw_bucket_sync_pair_info {
|
||||
rgw_bucket_shard source_bs;
|
||||
RGWBucketInfo dest_bucket_info;
|
||||
rgw_bucket_shard dest_bs;
|
||||
string source_prefix;
|
||||
string dest_prefix;
|
||||
};
|
||||
|
||||
inline ostream& operator<<(ostream& out, const rgw_bucket_sync_pipe& p) {
|
||||
if (p.source_bs.bucket == p.dest_bucket_info.bucket &&
|
||||
struct rgw_bucket_sync_pipe {
|
||||
rgw_bucket_sync_pair_info info;
|
||||
RGWBucketInfo source_bucket_info;
|
||||
RGWBucketInfo dest_bucket_info;
|
||||
};
|
||||
|
||||
inline ostream& operator<<(ostream& out, const rgw_bucket_sync_pair_info& p) {
|
||||
if (p.source_bs.bucket == p.dest_bs.bucket &&
|
||||
p.source_prefix == p.dest_prefix) {
|
||||
return out << p.source_bs;
|
||||
}
|
||||
@ -37,7 +43,7 @@ inline ostream& operator<<(ostream& out, const rgw_bucket_sync_pipe& p) {
|
||||
out << "/" << p.source_prefix;
|
||||
}
|
||||
|
||||
out << " -> " << p.dest_bucket_info.bucket;
|
||||
out << " -> " << p.dest_bs.bucket;
|
||||
|
||||
if (!p.dest_prefix.empty()) {
|
||||
out << "/" << p.dest_prefix;
|
||||
@ -766,7 +772,7 @@ public:
|
||||
map<int, rgw_bucket_shard_sync_info>& get_sync_status() { return sync_status; }
|
||||
int init_sync_status();
|
||||
|
||||
static string status_oid(const string& source_zone, const rgw_bucket_sync_pipe& bs);
|
||||
static string status_oid(const string& source_zone, const rgw_bucket_sync_pair_info& bs);
|
||||
static string obj_status_oid(const string& source_zone, const rgw_obj& obj); /* can be used by sync modules */
|
||||
|
||||
// implements DoutPrefixProvider
|
||||
|
Loading…
Reference in New Issue
Block a user