rgw: pass through bucket sync pairs

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
Yehuda Sadeh 2019-09-01 04:25:12 -07:00
parent a0a68effe4
commit 9ce54d0781
7 changed files with 80 additions and 69 deletions
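
The commit replaces the single rgw_bucket_shard that the bucket-sync code used to carry with an explicit source/destination pairing. Below is a rough sketch of the relationship between the two structs this commit touches, rgw_bucket_sync_pair_info and rgw_bucket_sync_pipe, based only on the field layout shown in this diff; source_bucket, dest_bucket, and shard_id are illustrative placeholders:

// Sketch only, mirroring the new RGWRemoteBucketLog::init() logic below; not a drop-in snippet.
rgw_bucket_sync_pair_info pair;
pair.source_bs.bucket   = source_bucket;  // bucket whose bilog shard we read from
pair.source_bs.shard_id = shard_id;
pair.dest_bs.bucket     = dest_bucket;    // bucket the entries are applied to
if (dest_bucket == source_bucket) {
  pair.dest_bs.shard_id = shard_id;       // same-bucket sync keeps the shard mapping
}

rgw_bucket_sync_pipe pipe;
pipe.info = pair;  // a pipe is the pair plus resolved metadata; source_bucket_info and
                   // dest_bucket_info are filled in later via RGWSyncGetBucketInfoCR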

@@ -1307,6 +1307,11 @@ struct rgw_bucket_shard {
}
return shard_id < b.shard_id;
}
bool operator==(const rgw_bucket_shard& b) const {
return (bucket == b.bucket &&
shard_id == b.shard_id);
}
};
inline ostream& operator<<(ostream& out, const rgw_bucket_shard& bs) {

@@ -2051,15 +2051,20 @@ RGWRemoteBucketLog::RGWRemoteBucketLog(const DoutPrefixProvider *_dpp,
}
int RGWRemoteBucketLog::init(const string& _source_zone, RGWRESTConn *_conn,
const rgw_bucket& bucket, int shard_id,
const rgw_bucket& source_bucket, int shard_id,
const rgw_bucket& dest_bucket,
RGWSyncErrorLogger *_error_logger,
RGWSyncTraceManager *_sync_tracer,
RGWSyncModuleInstanceRef& _sync_module)
{
conn = _conn;
source_zone = _source_zone;
bs.bucket = bucket;
bs.shard_id = shard_id;
sync_pair.source_bs.bucket = source_bucket;
sync_pair.source_bs.shard_id = shard_id;
sync_pair.dest_bs.bucket = dest_bucket;
if (dest_bucket == source_bucket) {
sync_pair.dest_bs.shard_id = shard_id;
}
sync_env.init(dpp, store->ctx(), store, store->svc(), async_rados, http_manager,
_error_logger, _sync_tracer, _sync_module, nullptr);
@@ -2106,7 +2111,7 @@ class RGWInitBucketShardSyncStatusCoroutine : public RGWCoroutine {
RGWDataSyncCtx *sc;
RGWDataSyncEnv *sync_env;
const rgw_bucket_sync_pipe& sync_pipe;
const rgw_bucket_sync_pair_info& sync_pair;
const string sync_status_oid;
rgw_bucket_shard_sync_info& status;
@@ -2114,18 +2119,18 @@ class RGWInitBucketShardSyncStatusCoroutine : public RGWCoroutine {
rgw_bucket_index_marker_info info;
public:
RGWInitBucketShardSyncStatusCoroutine(RGWDataSyncCtx *_sc,
const rgw_bucket_sync_pipe& _sync_pipe,
const rgw_bucket_sync_pair_info& _sync_pair,
rgw_bucket_shard_sync_info& _status)
: RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
sync_pipe(_sync_pipe),
sync_status_oid(RGWBucketPipeSyncStatusManager::status_oid(sc->source_zone, _sync_pipe.info)),
sync_pair(_sync_pair),
sync_status_oid(RGWBucketPipeSyncStatusManager::status_oid(sc->source_zone, _sync_pair)),
status(_status)
{}
int operate() override {
reenter(this) {
/* fetch current position in logs */
yield call(new RGWReadRemoteBucketIndexLogInfoCR(sc, sync_pipe.info.source_bs, &info));
yield call(new RGWReadRemoteBucketIndexLogInfoCR(sc, sync_pair.source_bs, &info));
if (retcode < 0 && retcode != -ENOENT) {
ldout(cct, 0) << "ERROR: failed to fetch bucket index status" << dendl;
return set_cr_error(retcode);
@@ -2164,11 +2169,7 @@ public:
RGWCoroutine *RGWRemoteBucketLog::init_sync_status_cr()
{
#warning FIXME
rgw_bucket_sync_pipe sync_pipe;
sync_pipe.source_bs = bs;
sync_pipe.dest_bs = bs;
return new RGWInitBucketShardSyncStatusCoroutine(&sc, sync_pipe, init_status);
return new RGWInitBucketShardSyncStatusCoroutine(&sc, sync_pair, init_status);
}
#define BUCKET_SYNC_ATTR_PREFIX RGW_ATTR_PREFIX "bucket-sync."
@@ -2443,10 +2444,6 @@ int RGWRemoteDataLog::read_shard_status(int shard_id, set<string>& pending_bucke
RGWCoroutine *RGWRemoteBucketLog::read_sync_status_cr(rgw_bucket_shard_sync_info *sync_status)
{
rgw_bucket_sync_pair_info sync_pair;
sync_pair.source_bs = bs;
#warning FIXME
sync_pair.dest_bs = bs;
return new RGWReadBucketPipeSyncStatusCoroutine(&sc, sync_pair, sync_status);
}
@@ -2456,7 +2453,7 @@ RGWBucketPipeSyncStatusManager::RGWBucketPipeSyncStatusManager(rgw::sal::RGWRado
http_manager(store->ctx(), cr_mgr.get_completion_mgr()),
source_zone(_source_zone),
conn(NULL), error_logger(NULL),
bucket(bucket),
dest_bucket(_dest_bucket),
num_shards(0)
{
}
@@ -3573,7 +3570,7 @@ int RGWRunBucketSourcesSyncCR::operate()
sync_pair.source_bs.bucket = piter->bucket;
sync_pair.dest_bs.bucket = bucket_info.bucket;
yield spawn(new RGWRunBucketSyncCoroutine(cur_sc, sync_pair, tn));
yield spawn(new RGWRunBucketSyncCoroutine(cur_sc, sync_pair, tn), false);
while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) {
set_status() << "num_spawned() > spawn_window";
yield wait_for_child();
@@ -3646,7 +3643,7 @@ int RGWSyncGetBucketInfoCR::operate()
return set_cr_error(retcode);
}
yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bucket, &pbucket_info));
yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bucket, pbucket_info));
}
if (retcode < 0) {
tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{bucket}));
@@ -3655,6 +3652,8 @@ int RGWSyncGetBucketInfoCR::operate()
return set_cr_done();
}
return 0;
}
int RGWRunBucketSyncCoroutine::operate()
@@ -3694,7 +3693,7 @@ int RGWRunBucketSyncCoroutine::operate()
yield call(new RGWSyncGetBucketInfoCR(sync_env, sync_pair.source_bs.bucket, &sync_pipe.source_bucket_info, tn));
if (retcode < 0) {
tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{sync_pipe.info.source_bs.bucket}));
tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{sync_pair.source_bs.bucket}));
lease_cr->go_down();
drain_all();
return set_cr_error(retcode);
@@ -3702,7 +3701,7 @@ int RGWRunBucketSyncCoroutine::operate()
yield call(new RGWSyncGetBucketInfoCR(sync_env, sync_pair.dest_bs.bucket, &sync_pipe.dest_bucket_info, tn));
if (retcode < 0) {
tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{sync_pipe.source_bs.bucket}));
tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{sync_pair.source_bs.bucket}));
lease_cr->go_down();
drain_all();
return set_cr_error(retcode);
@@ -3712,7 +3711,7 @@ int RGWRunBucketSyncCoroutine::operate()
do {
if (sync_status.state == rgw_bucket_shard_sync_info::StateInit) {
yield call(new RGWInitBucketShardSyncStatusCoroutine(sc, sync_pipe, sync_status));
yield call(new RGWInitBucketShardSyncStatusCoroutine(sc, sync_pair, sync_status));
if (retcode == -ENOENT) {
tn->log(0, "bucket sync disabled");
lease_cr->abort(); // deleted lease object, abort/wakeup instead of unlock
@@ -3765,7 +3764,7 @@ int RGWRunBucketSyncCoroutine::operate()
RGWCoroutine *RGWRemoteBucketLog::run_sync_cr()
{
return new RGWRunBucketSyncCoroutine(&sc, bs, sync_env.sync_tracer->root_node);
return new RGWRunBucketSyncCoroutine(&sc, sync_pair, sync_env.sync_tracer->root_node);
}
int RGWBucketPipeSyncStatusManager::init()
@@ -3783,8 +3782,9 @@ int RGWBucketPipeSyncStatusManager::init()
}
#warning read specific bucket sources
rgw_bucket source_bucket = dest_bucket;
const string key = bucket.get_key();
const string key = source_bucket.get_key();
rgw_http_param_pair pairs[] = { { "key", key.c_str() },
{ NULL, NULL } };
@@ -3807,11 +3807,11 @@ int RGWBucketPipeSyncStatusManager::init()
int effective_num_shards = (num_shards ? num_shards : 1);
auto async_rados = store->svc.rados->get_async_processor();
auto async_rados = store->svc()->rados->get_async_processor();
for (int i = 0; i < effective_num_shards; i++) {
RGWRemoteBucketLog *l = new RGWRemoteBucketLog(this, store, async_rados, &http_manager);
ret = l->init(source_zone, conn, bucket, (num_shards ? i : -1), error_logger, store->getRados()->get_sync_tracer(), sync_module);
ret = l->init(source_zone, conn, source_bucket, (num_shards ? i : -1), dest_bucket, error_logger, store->getRados()->get_sync_tracer(), sync_module);
if (ret < 0) {
ldpp_dout(this, 0) << "ERROR: failed to initialize RGWRemoteBucketLog object" << dendl;
return ret;
@@ -3852,7 +3852,7 @@ int RGWBucketPipeSyncStatusManager::read_sync_status()
int ret = cr_mgr.run(stacks);
if (ret < 0) {
ldpp_dout(this, 0) << "ERROR: failed to read sync status for "
<< bucket_str{bucket} << dendl;
<< bucket_str{dest_bucket} << dendl;
return ret;
}
@@ -3874,7 +3874,7 @@ int RGWBucketPipeSyncStatusManager::run()
int ret = cr_mgr.run(stacks);
if (ret < 0) {
ldpp_dout(this, 0) << "ERROR: failed to read sync status for "
<< bucket_str{bucket} << dendl;
<< bucket_str{dest_bucket} << dendl;
return ret;
}
@@ -3890,7 +3890,7 @@ std::ostream& RGWBucketPipeSyncStatusManager::gen_prefix(std::ostream& out) cons
{
auto zone = std::string_view{source_zone};
return out << "bucket sync zone:" << zone.substr(0, 8)
<< " bucket:" << bucket.name << ' ';
<< " bucket:" << dest_bucket << ' ';
}
string RGWBucketPipeSyncStatusManager::status_oid(const string& source_zone,
@@ -3919,7 +3919,7 @@ class RGWCollectBucketSyncStatusCR : public RGWShardCollectCR {
const int num_shards;
rgw_bucket_shard bs;
#warning change this
rgw_bucket_sync_pipe sync_pipe;
rgw_bucket_sync_pair_info sync_pair;
using Vector = std::vector<rgw_bucket_shard_sync_info>;
Vector::iterator i, end;
@@ -3938,8 +3938,8 @@ class RGWCollectBucketSyncStatusCR : public RGWShardCollectCR {
if (i == end) {
return false;
}
sync_pipe.source_bs = bs;
spawn(new RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pipe, &*i), false);
sync_pair.source_bs = bs;
spawn(new RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pair, &*i), false);
++i;
++bs.shard_id;
return true;

@@ -25,12 +25,6 @@ struct rgw_bucket_sync_pair_info {
string dest_prefix;
};
struct rgw_bucket_sync_pipe {
rgw_bucket_sync_pair_info info;
RGWBucketInfo source_bucket_info;
RGWBucketInfo dest_bucket_info;
};
inline ostream& operator<<(ostream& out, const rgw_bucket_sync_pair_info& p) {
if (p.source_bs.bucket == p.dest_bs.bucket &&
p.source_prefix == p.dest_prefix) {
@@ -52,6 +46,16 @@ inline ostream& operator<<(ostream& out, const rgw_bucket_sync_pair_info& p) {
return out;
}
struct rgw_bucket_sync_pipe {
rgw_bucket_sync_pair_info info;
RGWBucketInfo source_bucket_info;
RGWBucketInfo dest_bucket_info;
};
inline ostream& operator<<(ostream& out, const rgw_bucket_sync_pipe& p) {
return out << p.info;
}
struct rgw_datalog_info {
uint32_t num_shards;
@@ -707,7 +711,8 @@ class RGWRemoteBucketLog : public RGWCoroutinesManager {
rgw::sal::RGWRadosStore *store;
RGWRESTConn *conn{nullptr};
string source_zone;
rgw_bucket_shard bs;
rgw_bucket_sync_pair_info sync_pair;
RGWAsyncRadosProcessor *async_rados;
RGWHTTPManager *http_manager;
@@ -724,7 +729,8 @@ public:
RGWHTTPManager *_http_manager);
int init(const string& _source_zone, RGWRESTConn *_conn,
const rgw_bucket& bucket, int shard_id,
const rgw_bucket& source_bucket, int shard_id,
const rgw_bucket& dest_bucket,
RGWSyncErrorLogger *_error_logger,
RGWSyncTraceManager *_sync_tracer,
RGWSyncModuleInstanceRef& _sync_module);
@@ -749,7 +755,7 @@ class RGWBucketPipeSyncStatusManager : public DoutPrefixProvider {
RGWSyncErrorLogger *error_logger;
RGWSyncModuleInstanceRef sync_module;
rgw_bucket bucket;
rgw_bucket dest_bucket;
map<int, RGWRemoteBucketLog *> source_logs;
@@ -764,7 +770,7 @@ class RGWBucketPipeSyncStatusManager : public DoutPrefixProvider {
public:
RGWBucketPipeSyncStatusManager(rgw::sal::RGWRadosStore *_store,
const string& _source_zone,
const rgw_bucket& bucket);
const rgw_bucket& dest_bucket);
~RGWBucketPipeSyncStatusManager();
int init();
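
Given the declarations above, a minimal usage sketch of the renamed manager; store, source_zone, and dest_bucket are placeholders and error handling is elided:

// Illustrative only: the manager is now explicit that the bucket it is handed is the
// *destination* bucket; init() currently derives the source bucket from it (see the
// "#warning read specific bucket sources" hunk earlier in this diff).
RGWBucketPipeSyncStatusManager mgr(store, source_zone, dest_bucket);
int r = mgr.init();
if (r >= 0) {
  r = mgr.read_sync_status();  // collects per-shard sync status for the pair(s)
}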

@@ -1584,7 +1584,7 @@ public:
rgw_bucket_sync_pipe& _sync_pipe,
rgw_obj_key& _key,
AWSSyncInstanceEnv& _instance,
uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sc, _sync_pipe.source_bs.bucket, _key),
uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sc, _sync_pipe.info.source_bs.bucket, _key),
sync_pipe(_sync_pipe),
instance(_instance), versioned_epoch(_versioned_epoch)
{}
@@ -1616,7 +1616,7 @@ public:
return set_cr_error(-EINVAL);
}
instance.get_profile(sync_pipe.source_bs.bucket, &target);
instance.get_profile(sync_pipe.info.source_bs.bucket, &target);
instance.conf.get_target(target, sync_pipe.dest_bucket_info, key, &target_bucket_name, &target_obj_name);
if (bucket_created.find(target_bucket_name) == bucket_created.end()){
@@ -1707,7 +1707,7 @@ class RGWAWSHandleRemoteObjCR : public RGWCallStatRemoteObjCR {
public:
RGWAWSHandleRemoteObjCR(RGWDataSyncCtx *_sc,
rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key,
AWSSyncInstanceEnv& _instance, uint64_t _versioned_epoch) : RGWCallStatRemoteObjCR(_sc, _sync_pipe.source_bs.bucket, _key),
AWSSyncInstanceEnv& _instance, uint64_t _versioned_epoch) : RGWCallStatRemoteObjCR(_sc, _sync_pipe.info.source_bs.bucket, _key),
sync_pipe(_sync_pipe),
instance(_instance), versioned_epoch(_versioned_epoch) {
}
@@ -1736,9 +1736,9 @@ public:
int operate() override {
reenter(this) {
ldout(sc->cct, 0) << ": remove remote obj: z=" << sc->source_zone
<< " b=" <<sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime << dendl;
<< " b=" <<sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime << dendl;
yield {
instance.get_profile(sync_pipe.source_bs.bucket, &target);
instance.get_profile(sync_pipe.info.source_bs.bucket, &target);
string path = instance.conf.get_path(target, sync_pipe.dest_bucket_info, key);
ldout(sc->cct, 0) << "AWS: removing aws object at" << path << dendl;
@@ -1775,18 +1775,18 @@ public:
RGWCoroutine *sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key,
std::optional<uint64_t> versioned_epoch,
rgw_zone_set *zones_trace) override {
ldout(sc->cct, 0) << instance.id << ": sync_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
ldout(sc->cct, 0) << instance.id << ": sync_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
return new RGWAWSHandleRemoteObjCR(sc, sync_pipe, key, instance, versioned_epoch.value_or(0));
}
RGWCoroutine *remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch,
rgw_zone_set *zones_trace) override {
ldout(sc->cct, 0) <<"rm_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
ldout(sc->cct, 0) <<"rm_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
return new RGWAWSRemoveRemoteObjCBCR(sc, sync_pipe, key, mtime, instance);
}
RGWCoroutine *create_delete_marker(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch,
rgw_zone_set *zones_trace) override {
ldout(sc->cct, 0) <<"AWS Not implemented: create_delete_marker: b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime
ldout(sc->cct, 0) <<"AWS Not implemented: create_delete_marker: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime
<< " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
return NULL;
}

@@ -779,13 +779,13 @@ class RGWElasticHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR {
public:
RGWElasticHandleRemoteObjCBCR(RGWDataSyncCtx *_sc,
rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key,
ElasticConfigRef _conf, uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sc, _sync_pipe.source_bs.bucket, _key),
ElasticConfigRef _conf, uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sc, _sync_pipe.info.source_bs.bucket, _key),
sync_pipe(_sync_pipe), conf(_conf),
versioned_epoch(_versioned_epoch) {}
int operate() override {
reenter(this) {
ldout(sync_env->cct, 10) << ": stat of remote obj: z=" << sc->source_zone
<< " b=" << sync_pipe.source_bs.bucket << " k=" << key
<< " b=" << sync_pipe.info.source_bs.bucket << " k=" << key
<< " size=" << size << " mtime=" << mtime << dendl;
yield {
@@ -815,7 +815,7 @@ class RGWElasticHandleRemoteObjCR : public RGWCallStatRemoteObjCR {
public:
RGWElasticHandleRemoteObjCR(RGWDataSyncCtx *_sc,
rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key,
ElasticConfigRef _conf, uint64_t _versioned_epoch) : RGWCallStatRemoteObjCR(_sc, _sync_pipe.source_bs.bucket, _key),
ElasticConfigRef _conf, uint64_t _versioned_epoch) : RGWCallStatRemoteObjCR(_sc, _sync_pipe.info.source_bs.bucket, _key),
sync_pipe(_sync_pipe),
conf(_conf), versioned_epoch(_versioned_epoch) {
}
@@ -843,7 +843,7 @@ public:
int operate() override {
reenter(this) {
ldout(sync_env->cct, 10) << ": remove remote obj: z=" << sc->source_zone
<< " b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime << dendl;
<< " b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime << dendl;
yield {
string path = conf->get_obj_path(sync_pipe.dest_bucket_info, key);
@@ -885,7 +885,7 @@ public:
}
RGWCoroutine *sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override {
ldout(sc->cct, 10) << conf->id << ": sync_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
ldout(sc->cct, 10) << conf->id << ": sync_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
if (!conf->should_handle_operation(sync_pipe.dest_bucket_info)) {
ldout(sc->cct, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl;
return nullptr;
@@ -894,7 +894,7 @@ public:
}
RGWCoroutine *remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
/* versioned and versioned epoch params are useless in the elasticsearch backend case */
ldout(sc->cct, 10) << conf->id << ": rm_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
ldout(sc->cct, 10) << conf->id << ": rm_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
if (!conf->should_handle_operation(sync_pipe.dest_bucket_info)) {
ldout(sc->cct, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl;
return nullptr;
@@ -903,7 +903,7 @@ public:
}
RGWCoroutine *create_delete_marker(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
ldout(sc->cct, 10) << conf->id << ": create_delete_marker: b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime
ldout(sc->cct, 10) << conf->id << ": create_delete_marker: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime
<< " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
ldout(sc->cct, 10) << conf->id << ": skipping operation (not handled)" << dendl;
return NULL;

@@ -42,16 +42,16 @@ public:
explicit RGWLogDataSyncModule(const string& _prefix) : prefix(_prefix) {}
RGWCoroutine *sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override {
ldout(sc->cct, 0) << prefix << ": SYNC_LOG: sync_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
return new RGWLogStatRemoteObjCR(sc, sync_pipe.source_bs.bucket, key);
ldout(sc->cct, 0) << prefix << ": SYNC_LOG: sync_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
return new RGWLogStatRemoteObjCR(sc, sync_pipe.info.source_bs.bucket, key);
}
RGWCoroutine *remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
ldout(sc->cct, 0) << prefix << ": SYNC_LOG: rm_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
ldout(sc->cct, 0) << prefix << ": SYNC_LOG: rm_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
return NULL;
}
RGWCoroutine *create_delete_marker(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
ldout(sc->cct, 0) << prefix << ": SYNC_LOG: create_delete_marker: b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime
ldout(sc->cct, 0) << prefix << ": SYNC_LOG: create_delete_marker: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime
<< " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
return NULL;
}

@@ -1328,7 +1328,7 @@ public:
RGWPSHandleRemoteObjCBCR(RGWDataSyncCtx *_sc,
rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key,
PSEnvRef _env, std::optional<uint64_t> _versioned_epoch,
TopicsRef& _topics) : RGWStatRemoteObjCBCR(_sc, _sync_pipe.source_bs.bucket, _key),
TopicsRef& _topics) : RGWStatRemoteObjCBCR(_sc, _sync_pipe.info.source_bs.bucket, _key),
sc(_sc),
sync_pipe(_sync_pipe),
env(_env),
@@ -1338,7 +1338,7 @@ public:
int operate() override {
reenter(this) {
ldout(sc->cct, 20) << ": stat of remote obj: z=" << sc->source_zone
<< " b=" << sync_pipe.source_bs.bucket << " k=" << key << " size=" << size << " mtime=" << mtime
<< " b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " size=" << size << " mtime=" << mtime
<< " attrs=" << attrs << dendl;
{
std::vector<std::pair<std::string, std::string> > attrs;
@@ -1353,11 +1353,11 @@ public:
// this is why both are created here, once we have information about the
// subscription, we will store/push only the relevant ones
make_event_ref(sc->cct,
sync_pipe.source_bs.bucket, key,
sync_pipe.info.source_bs.bucket, key,
mtime, &attrs,
rgw::notify::ObjectCreated, &event);
make_s3_record_ref(sc->cct,
sync_pipe.source_bs.bucket, sync_pipe.dest_bucket_info.owner, key,
sync_pipe.info.source_bs.bucket, sync_pipe.dest_bucket_info.owner, key,
mtime, &attrs,
rgw::notify::ObjectCreated, &record);
}
@@ -1382,7 +1382,7 @@ public:
RGWPSHandleRemoteObjCR(RGWDataSyncCtx *_sc,
rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key,
PSEnvRef _env, std::optional<uint64_t> _versioned_epoch,
TopicsRef& _topics) : RGWCallStatRemoteObjCR(_sc, _sync_pipe.source_bs.bucket, _key),
TopicsRef& _topics) : RGWCallStatRemoteObjCR(_sc, _sync_pipe.info.source_bs.bucket, _key),
sync_pipe(_sync_pipe),
env(_env), versioned_epoch(_versioned_epoch),
topics(_topics) {
@@ -1418,7 +1418,7 @@ public:
int operate() override {
reenter(this) {
yield call(new RGWPSFindBucketTopicsCR(sc, env, sync_pipe.dest_bucket_info.owner,
sync_pipe.source_bs.bucket, key,
sync_pipe.info.source_bs.bucket, key,
rgw::notify::ObjectCreated,
&topics));
if (retcode < 0) {
@@ -1426,7 +1426,7 @@ public:
return set_cr_error(retcode);
}
if (topics->empty()) {
ldout(sc->cct, 20) << "no topics found for " << sync_pipe.source_bs.bucket << "/" << key << dendl;
ldout(sc->cct, 20) << "no topics found for " << sync_pipe.info.source_bs.bucket << "/" << key << dendl;
return set_cr_done();
}
yield call(new RGWPSHandleRemoteObjCR(sc, sync_pipe, key, env, versioned_epoch, topics));
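
All four sync modules in this commit (log, aws, elasticsearch, pubsub) receive the same mechanical change: the source bucket shard now sits one level deeper, under the pipe's info member. A minimal callback sketch, assuming a class derived from the data sync module callback interface; the body is a no-op placeholder:

// Sketch only: shows the updated member access, not a real sync module.
RGWCoroutine *sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe,
                          rgw_obj_key& key, std::optional<uint64_t> versioned_epoch,
                          rgw_zone_set *zones_trace) override {
  const rgw_bucket& src = sync_pipe.info.source_bs.bucket;  // was sync_pipe.source_bs.bucket
  ldout(sc->cct, 20) << "sync_object: b=" << src << " k=" << key
                     << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
  return nullptr;  // no object sync scheduled in this sketch
}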