From 9ce54d0781c83db5c3575a52239b8ea00e63629c Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Sun, 1 Sep 2019 04:25:12 -0700 Subject: [PATCH] rgw: pass through bucket sync pairs Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_common.h | 5 +++ src/rgw/rgw_data_sync.cc | 66 +++++++++++++++---------------- src/rgw/rgw_data_sync.h | 26 +++++++----- src/rgw/rgw_sync_module_aws.cc | 16 ++++---- src/rgw/rgw_sync_module_es.cc | 14 +++---- src/rgw/rgw_sync_module_log.cc | 8 ++-- src/rgw/rgw_sync_module_pubsub.cc | 14 +++---- 7 files changed, 80 insertions(+), 69 deletions(-) diff --git a/src/rgw/rgw_common.h b/src/rgw/rgw_common.h index 6ddbe979c75..8c3872cf2ae 100644 --- a/src/rgw/rgw_common.h +++ b/src/rgw/rgw_common.h @@ -1307,6 +1307,11 @@ struct rgw_bucket_shard { } return shard_id < b.shard_id; } + + bool operator==(const rgw_bucket_shard& b) const { + return (bucket == b.bucket && + shard_id == b.shard_id); + } }; inline ostream& operator<<(ostream& out, const rgw_bucket_shard& bs) { diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index 43687023c51..102310cb3e8 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -2051,15 +2051,20 @@ RGWRemoteBucketLog::RGWRemoteBucketLog(const DoutPrefixProvider *_dpp, } int RGWRemoteBucketLog::init(const string& _source_zone, RGWRESTConn *_conn, - const rgw_bucket& bucket, int shard_id, + const rgw_bucket& source_bucket, int shard_id, + const rgw_bucket& dest_bucket, RGWSyncErrorLogger *_error_logger, RGWSyncTraceManager *_sync_tracer, RGWSyncModuleInstanceRef& _sync_module) { conn = _conn; source_zone = _source_zone; - bs.bucket = bucket; - bs.shard_id = shard_id; + sync_pair.source_bs.bucket = source_bucket; + sync_pair.source_bs.shard_id = shard_id; + sync_pair.dest_bs.bucket = dest_bucket; + if (dest_bucket == source_bucket) { + sync_pair.dest_bs.shard_id = shard_id; + } sync_env.init(dpp, store->ctx(), store, store->svc(), async_rados, http_manager, _error_logger, _sync_tracer, _sync_module, nullptr); @@ -2106,7 +2111,7 @@ class RGWInitBucketShardSyncStatusCoroutine : public RGWCoroutine { RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; - const rgw_bucket_sync_pipe& sync_pipe; + const rgw_bucket_sync_pair_info& sync_pair; const string sync_status_oid; rgw_bucket_shard_sync_info& status; @@ -2114,18 +2119,18 @@ class RGWInitBucketShardSyncStatusCoroutine : public RGWCoroutine { rgw_bucket_index_marker_info info; public: RGWInitBucketShardSyncStatusCoroutine(RGWDataSyncCtx *_sc, - const rgw_bucket_sync_pipe& _sync_pipe, + const rgw_bucket_sync_pair_info& _sync_pair, rgw_bucket_shard_sync_info& _status) : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), - sync_pipe(_sync_pipe), - sync_status_oid(RGWBucketPipeSyncStatusManager::status_oid(sc->source_zone, _sync_pipe.info)), + sync_pair(_sync_pair), + sync_status_oid(RGWBucketPipeSyncStatusManager::status_oid(sc->source_zone, _sync_pair)), status(_status) {} int operate() override { reenter(this) { /* fetch current position in logs */ - yield call(new RGWReadRemoteBucketIndexLogInfoCR(sc, sync_pipe.info.source_bs, &info)); + yield call(new RGWReadRemoteBucketIndexLogInfoCR(sc, sync_pair.source_bs, &info)); if (retcode < 0 && retcode != -ENOENT) { ldout(cct, 0) << "ERROR: failed to fetch bucket index status" << dendl; return set_cr_error(retcode); @@ -2164,11 +2169,7 @@ public: RGWCoroutine *RGWRemoteBucketLog::init_sync_status_cr() { -#warning FIXME - rgw_bucket_sync_pipe sync_pipe; - sync_pipe.source_bs = bs; - sync_pipe.dest_bs = bs; - return new RGWInitBucketShardSyncStatusCoroutine(&sc, sync_pipe, init_status); + return new RGWInitBucketShardSyncStatusCoroutine(&sc, sync_pair, init_status); } #define BUCKET_SYNC_ATTR_PREFIX RGW_ATTR_PREFIX "bucket-sync." @@ -2443,10 +2444,6 @@ int RGWRemoteDataLog::read_shard_status(int shard_id, set& pending_bucke RGWCoroutine *RGWRemoteBucketLog::read_sync_status_cr(rgw_bucket_shard_sync_info *sync_status) { - rgw_bucket_sync_pair_info sync_pair; - sync_pair.source_bs = bs; -#warning FIXME - sync_pair.dest_bs = bs; return new RGWReadBucketPipeSyncStatusCoroutine(&sc, sync_pair, sync_status); } @@ -2456,7 +2453,7 @@ RGWBucketPipeSyncStatusManager::RGWBucketPipeSyncStatusManager(rgw::sal::RGWRado http_manager(store->ctx(), cr_mgr.get_completion_mgr()), source_zone(_source_zone), conn(NULL), error_logger(NULL), - bucket(bucket), + dest_bucket(_dest_bucket), num_shards(0) { } @@ -3573,7 +3570,7 @@ int RGWRunBucketSourcesSyncCR::operate() sync_pair.source_bs.bucket = piter->bucket; sync_pair.dest_bs.bucket = bucket_info.bucket; - yield spawn(new RGWRunBucketSyncCoroutine(cur_sc, sync_pair, tn)); + yield spawn(new RGWRunBucketSyncCoroutine(cur_sc, sync_pair, tn), false); while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) { set_status() << "num_spawned() > spawn_window"; yield wait_for_child(); @@ -3646,7 +3643,7 @@ int RGWSyncGetBucketInfoCR::operate() return set_cr_error(retcode); } - yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bucket, &pbucket_info)); + yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bucket, pbucket_info)); } if (retcode < 0) { tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{bucket})); @@ -3655,6 +3652,8 @@ int RGWSyncGetBucketInfoCR::operate() return set_cr_done(); } + + return 0; } int RGWRunBucketSyncCoroutine::operate() @@ -3694,7 +3693,7 @@ int RGWRunBucketSyncCoroutine::operate() yield call(new RGWSyncGetBucketInfoCR(sync_env, sync_pair.source_bs.bucket, &sync_pipe.source_bucket_info, tn)); if (retcode < 0) { - tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{sync_pipe.info.source_bs.bucket})); + tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{sync_pair.source_bs.bucket})); lease_cr->go_down(); drain_all(); return set_cr_error(retcode); @@ -3702,7 +3701,7 @@ int RGWRunBucketSyncCoroutine::operate() yield call(new RGWSyncGetBucketInfoCR(sync_env, sync_pair.dest_bs.bucket, &sync_pipe.dest_bucket_info, tn)); if (retcode < 0) { - tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{sync_pipe.source_bs.bucket})); + tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{sync_pair.source_bs.bucket})); lease_cr->go_down(); drain_all(); return set_cr_error(retcode); @@ -3712,7 +3711,7 @@ int RGWRunBucketSyncCoroutine::operate() do { if (sync_status.state == rgw_bucket_shard_sync_info::StateInit) { - yield call(new RGWInitBucketShardSyncStatusCoroutine(sc, sync_pipe, sync_status)); + yield call(new RGWInitBucketShardSyncStatusCoroutine(sc, sync_pair, sync_status)); if (retcode == -ENOENT) { tn->log(0, "bucket sync disabled"); lease_cr->abort(); // deleted lease object, abort/wakeup instead of unlock @@ -3765,7 +3764,7 @@ int RGWRunBucketSyncCoroutine::operate() RGWCoroutine *RGWRemoteBucketLog::run_sync_cr() { - return new RGWRunBucketSyncCoroutine(&sc, bs, sync_env.sync_tracer->root_node); + return new RGWRunBucketSyncCoroutine(&sc, sync_pair, sync_env.sync_tracer->root_node); } int RGWBucketPipeSyncStatusManager::init() @@ -3783,8 +3782,9 @@ int RGWBucketPipeSyncStatusManager::init() } #warning read specific bucket sources + rgw_bucket source_bucket = dest_bucket; - const string key = bucket.get_key(); + const string key = source_bucket.get_key(); rgw_http_param_pair pairs[] = { { "key", key.c_str() }, { NULL, NULL } }; @@ -3807,11 +3807,11 @@ int RGWBucketPipeSyncStatusManager::init() int effective_num_shards = (num_shards ? num_shards : 1); - auto async_rados = store->svc.rados->get_async_processor(); + auto async_rados = store->svc()->rados->get_async_processor(); for (int i = 0; i < effective_num_shards; i++) { RGWRemoteBucketLog *l = new RGWRemoteBucketLog(this, store, async_rados, &http_manager); - ret = l->init(source_zone, conn, bucket, (num_shards ? i : -1), error_logger, store->getRados()->get_sync_tracer(), sync_module); + ret = l->init(source_zone, conn, source_bucket, (num_shards ? i : -1), dest_bucket, error_logger, store->getRados()->get_sync_tracer(), sync_module); if (ret < 0) { ldpp_dout(this, 0) << "ERROR: failed to initialize RGWRemoteBucketLog object" << dendl; return ret; @@ -3852,7 +3852,7 @@ int RGWBucketPipeSyncStatusManager::read_sync_status() int ret = cr_mgr.run(stacks); if (ret < 0) { ldpp_dout(this, 0) << "ERROR: failed to read sync status for " - << bucket_str{bucket} << dendl; + << bucket_str{dest_bucket} << dendl; return ret; } @@ -3874,7 +3874,7 @@ int RGWBucketPipeSyncStatusManager::run() int ret = cr_mgr.run(stacks); if (ret < 0) { ldpp_dout(this, 0) << "ERROR: failed to read sync status for " - << bucket_str{bucket} << dendl; + << bucket_str{dest_bucket} << dendl; return ret; } @@ -3890,7 +3890,7 @@ std::ostream& RGWBucketPipeSyncStatusManager::gen_prefix(std::ostream& out) cons { auto zone = std::string_view{source_zone}; return out << "bucket sync zone:" << zone.substr(0, 8) - << " bucket:" << bucket.name << ' '; + << " bucket:" << dest_bucket << ' '; } string RGWBucketPipeSyncStatusManager::status_oid(const string& source_zone, @@ -3919,7 +3919,7 @@ class RGWCollectBucketSyncStatusCR : public RGWShardCollectCR { const int num_shards; rgw_bucket_shard bs; #warning change this - rgw_bucket_sync_pipe sync_pipe; + rgw_bucket_sync_pair_info sync_pair; using Vector = std::vector; Vector::iterator i, end; @@ -3938,8 +3938,8 @@ class RGWCollectBucketSyncStatusCR : public RGWShardCollectCR { if (i == end) { return false; } - sync_pipe.source_bs = bs; - spawn(new RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pipe, &*i), false); + sync_pair.source_bs = bs; + spawn(new RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pair, &*i), false); ++i; ++bs.shard_id; return true; diff --git a/src/rgw/rgw_data_sync.h b/src/rgw/rgw_data_sync.h index cf0ca89d0ac..9a3e16b3dab 100644 --- a/src/rgw/rgw_data_sync.h +++ b/src/rgw/rgw_data_sync.h @@ -25,12 +25,6 @@ struct rgw_bucket_sync_pair_info { string dest_prefix; }; -struct rgw_bucket_sync_pipe { - rgw_bucket_sync_pair_info info; - RGWBucketInfo source_bucket_info; - RGWBucketInfo dest_bucket_info; -}; - inline ostream& operator<<(ostream& out, const rgw_bucket_sync_pair_info& p) { if (p.source_bs.bucket == p.dest_bs.bucket && p.source_prefix == p.dest_prefix) { @@ -52,6 +46,16 @@ inline ostream& operator<<(ostream& out, const rgw_bucket_sync_pair_info& p) { return out; } +struct rgw_bucket_sync_pipe { + rgw_bucket_sync_pair_info info; + RGWBucketInfo source_bucket_info; + RGWBucketInfo dest_bucket_info; +}; + +inline ostream& operator<<(ostream& out, const rgw_bucket_sync_pipe& p) { + return out << p.info; +} + struct rgw_datalog_info { uint32_t num_shards; @@ -707,7 +711,8 @@ class RGWRemoteBucketLog : public RGWCoroutinesManager { rgw::sal::RGWRadosStore *store; RGWRESTConn *conn{nullptr}; string source_zone; - rgw_bucket_shard bs; + + rgw_bucket_sync_pair_info sync_pair; RGWAsyncRadosProcessor *async_rados; RGWHTTPManager *http_manager; @@ -724,7 +729,8 @@ public: RGWHTTPManager *_http_manager); int init(const string& _source_zone, RGWRESTConn *_conn, - const rgw_bucket& bucket, int shard_id, + const rgw_bucket& source_bucket, int shard_id, + const rgw_bucket& dest_bucket, RGWSyncErrorLogger *_error_logger, RGWSyncTraceManager *_sync_tracer, RGWSyncModuleInstanceRef& _sync_module); @@ -749,7 +755,7 @@ class RGWBucketPipeSyncStatusManager : public DoutPrefixProvider { RGWSyncErrorLogger *error_logger; RGWSyncModuleInstanceRef sync_module; - rgw_bucket bucket; + rgw_bucket dest_bucket; map source_logs; @@ -764,7 +770,7 @@ class RGWBucketPipeSyncStatusManager : public DoutPrefixProvider { public: RGWBucketPipeSyncStatusManager(rgw::sal::RGWRadosStore *_store, const string& _source_zone, - const rgw_bucket& bucket); + const rgw_bucket& dest_bucket); ~RGWBucketPipeSyncStatusManager(); int init(); diff --git a/src/rgw/rgw_sync_module_aws.cc b/src/rgw/rgw_sync_module_aws.cc index ac45036226f..103b28864cf 100644 --- a/src/rgw/rgw_sync_module_aws.cc +++ b/src/rgw/rgw_sync_module_aws.cc @@ -1584,7 +1584,7 @@ public: rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, AWSSyncInstanceEnv& _instance, - uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sc, _sync_pipe.source_bs.bucket, _key), + uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sc, _sync_pipe.info.source_bs.bucket, _key), sync_pipe(_sync_pipe), instance(_instance), versioned_epoch(_versioned_epoch) {} @@ -1616,7 +1616,7 @@ public: return set_cr_error(-EINVAL); } - instance.get_profile(sync_pipe.source_bs.bucket, &target); + instance.get_profile(sync_pipe.info.source_bs.bucket, &target); instance.conf.get_target(target, sync_pipe.dest_bucket_info, key, &target_bucket_name, &target_obj_name); if (bucket_created.find(target_bucket_name) == bucket_created.end()){ @@ -1707,7 +1707,7 @@ class RGWAWSHandleRemoteObjCR : public RGWCallStatRemoteObjCR { public: RGWAWSHandleRemoteObjCR(RGWDataSyncCtx *_sc, rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, - AWSSyncInstanceEnv& _instance, uint64_t _versioned_epoch) : RGWCallStatRemoteObjCR(_sc, _sync_pipe.source_bs.bucket, _key), + AWSSyncInstanceEnv& _instance, uint64_t _versioned_epoch) : RGWCallStatRemoteObjCR(_sc, _sync_pipe.info.source_bs.bucket, _key), sync_pipe(_sync_pipe), instance(_instance), versioned_epoch(_versioned_epoch) { } @@ -1736,9 +1736,9 @@ public: int operate() override { reenter(this) { ldout(sc->cct, 0) << ": remove remote obj: z=" << sc->source_zone - << " b=" < versioned_epoch, rgw_zone_set *zones_trace) override { - ldout(sc->cct, 0) << instance.id << ": sync_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl; + ldout(sc->cct, 0) << instance.id << ": sync_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl; return new RGWAWSHandleRemoteObjCR(sc, sync_pipe, key, instance, versioned_epoch.value_or(0)); } RGWCoroutine *remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { - ldout(sc->cct, 0) <<"rm_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; + ldout(sc->cct, 0) <<"rm_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; return new RGWAWSRemoveRemoteObjCBCR(sc, sync_pipe, key, mtime, instance); } RGWCoroutine *create_delete_marker(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { - ldout(sc->cct, 0) <<"AWS Not implemented: create_delete_marker: b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime + ldout(sc->cct, 0) <<"AWS Not implemented: create_delete_marker: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; return NULL; } diff --git a/src/rgw/rgw_sync_module_es.cc b/src/rgw/rgw_sync_module_es.cc index eea1d908201..d7e5dca28ff 100644 --- a/src/rgw/rgw_sync_module_es.cc +++ b/src/rgw/rgw_sync_module_es.cc @@ -779,13 +779,13 @@ class RGWElasticHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR { public: RGWElasticHandleRemoteObjCBCR(RGWDataSyncCtx *_sc, rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, - ElasticConfigRef _conf, uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sc, _sync_pipe.source_bs.bucket, _key), + ElasticConfigRef _conf, uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sc, _sync_pipe.info.source_bs.bucket, _key), sync_pipe(_sync_pipe), conf(_conf), versioned_epoch(_versioned_epoch) {} int operate() override { reenter(this) { ldout(sync_env->cct, 10) << ": stat of remote obj: z=" << sc->source_zone - << " b=" << sync_pipe.source_bs.bucket << " k=" << key + << " b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " size=" << size << " mtime=" << mtime << dendl; yield { @@ -815,7 +815,7 @@ class RGWElasticHandleRemoteObjCR : public RGWCallStatRemoteObjCR { public: RGWElasticHandleRemoteObjCR(RGWDataSyncCtx *_sc, rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, - ElasticConfigRef _conf, uint64_t _versioned_epoch) : RGWCallStatRemoteObjCR(_sc, _sync_pipe.source_bs.bucket, _key), + ElasticConfigRef _conf, uint64_t _versioned_epoch) : RGWCallStatRemoteObjCR(_sc, _sync_pipe.info.source_bs.bucket, _key), sync_pipe(_sync_pipe), conf(_conf), versioned_epoch(_versioned_epoch) { } @@ -843,7 +843,7 @@ public: int operate() override { reenter(this) { ldout(sync_env->cct, 10) << ": remove remote obj: z=" << sc->source_zone - << " b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime << dendl; + << " b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime << dendl; yield { string path = conf->get_obj_path(sync_pipe.dest_bucket_info, key); @@ -885,7 +885,7 @@ public: } RGWCoroutine *sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional versioned_epoch, rgw_zone_set *zones_trace) override { - ldout(sc->cct, 10) << conf->id << ": sync_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl; + ldout(sc->cct, 10) << conf->id << ": sync_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl; if (!conf->should_handle_operation(sync_pipe.dest_bucket_info)) { ldout(sc->cct, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl; return nullptr; @@ -894,7 +894,7 @@ public: } RGWCoroutine *remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { /* versioned and versioned epoch params are useless in the elasticsearch backend case */ - ldout(sc->cct, 10) << conf->id << ": rm_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; + ldout(sc->cct, 10) << conf->id << ": rm_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; if (!conf->should_handle_operation(sync_pipe.dest_bucket_info)) { ldout(sc->cct, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl; return nullptr; @@ -903,7 +903,7 @@ public: } RGWCoroutine *create_delete_marker(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { - ldout(sc->cct, 10) << conf->id << ": create_delete_marker: b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime + ldout(sc->cct, 10) << conf->id << ": create_delete_marker: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; ldout(sc->cct, 10) << conf->id << ": skipping operation (not handled)" << dendl; return NULL; diff --git a/src/rgw/rgw_sync_module_log.cc b/src/rgw/rgw_sync_module_log.cc index c712f7c17f3..d0475509dae 100644 --- a/src/rgw/rgw_sync_module_log.cc +++ b/src/rgw/rgw_sync_module_log.cc @@ -42,16 +42,16 @@ public: explicit RGWLogDataSyncModule(const string& _prefix) : prefix(_prefix) {} RGWCoroutine *sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional versioned_epoch, rgw_zone_set *zones_trace) override { - ldout(sc->cct, 0) << prefix << ": SYNC_LOG: sync_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl; - return new RGWLogStatRemoteObjCR(sc, sync_pipe.source_bs.bucket, key); + ldout(sc->cct, 0) << prefix << ": SYNC_LOG: sync_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl; + return new RGWLogStatRemoteObjCR(sc, sync_pipe.info.source_bs.bucket, key); } RGWCoroutine *remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { - ldout(sc->cct, 0) << prefix << ": SYNC_LOG: rm_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; + ldout(sc->cct, 0) << prefix << ": SYNC_LOG: rm_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; return NULL; } RGWCoroutine *create_delete_marker(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { - ldout(sc->cct, 0) << prefix << ": SYNC_LOG: create_delete_marker: b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime + ldout(sc->cct, 0) << prefix << ": SYNC_LOG: create_delete_marker: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; return NULL; } diff --git a/src/rgw/rgw_sync_module_pubsub.cc b/src/rgw/rgw_sync_module_pubsub.cc index 008d35f0584..03e1c38c42e 100644 --- a/src/rgw/rgw_sync_module_pubsub.cc +++ b/src/rgw/rgw_sync_module_pubsub.cc @@ -1328,7 +1328,7 @@ public: RGWPSHandleRemoteObjCBCR(RGWDataSyncCtx *_sc, rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, PSEnvRef _env, std::optional _versioned_epoch, - TopicsRef& _topics) : RGWStatRemoteObjCBCR(_sc, _sync_pipe.source_bs.bucket, _key), + TopicsRef& _topics) : RGWStatRemoteObjCBCR(_sc, _sync_pipe.info.source_bs.bucket, _key), sc(_sc), sync_pipe(_sync_pipe), env(_env), @@ -1338,7 +1338,7 @@ public: int operate() override { reenter(this) { ldout(sc->cct, 20) << ": stat of remote obj: z=" << sc->source_zone - << " b=" << sync_pipe.source_bs.bucket << " k=" << key << " size=" << size << " mtime=" << mtime + << " b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " size=" << size << " mtime=" << mtime << " attrs=" << attrs << dendl; { std::vector > attrs; @@ -1353,11 +1353,11 @@ public: // this is why both are created here, once we have information about the // subscription, we will store/push only the relevant ones make_event_ref(sc->cct, - sync_pipe.source_bs.bucket, key, + sync_pipe.info.source_bs.bucket, key, mtime, &attrs, rgw::notify::ObjectCreated, &event); make_s3_record_ref(sc->cct, - sync_pipe.source_bs.bucket, sync_pipe.dest_bucket_info.owner, key, + sync_pipe.info.source_bs.bucket, sync_pipe.dest_bucket_info.owner, key, mtime, &attrs, rgw::notify::ObjectCreated, &record); } @@ -1382,7 +1382,7 @@ public: RGWPSHandleRemoteObjCR(RGWDataSyncCtx *_sc, rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, PSEnvRef _env, std::optional _versioned_epoch, - TopicsRef& _topics) : RGWCallStatRemoteObjCR(_sc, _sync_pipe.source_bs.bucket, _key), + TopicsRef& _topics) : RGWCallStatRemoteObjCR(_sc, _sync_pipe.info.source_bs.bucket, _key), sync_pipe(_sync_pipe), env(_env), versioned_epoch(_versioned_epoch), topics(_topics) { @@ -1418,7 +1418,7 @@ public: int operate() override { reenter(this) { yield call(new RGWPSFindBucketTopicsCR(sc, env, sync_pipe.dest_bucket_info.owner, - sync_pipe.source_bs.bucket, key, + sync_pipe.info.source_bs.bucket, key, rgw::notify::ObjectCreated, &topics)); if (retcode < 0) { @@ -1426,7 +1426,7 @@ public: return set_cr_error(retcode); } if (topics->empty()) { - ldout(sc->cct, 20) << "no topics found for " << sync_pipe.source_bs.bucket << "/" << key << dendl; + ldout(sc->cct, 20) << "no topics found for " << sync_pipe.info.source_bs.bucket << "/" << key << dendl; return set_cr_done(); } yield call(new RGWPSHandleRemoteObjCR(sc, sync_pipe, key, env, versioned_epoch, topics));