mirror of
https://github.com/ceph/ceph
synced 2025-02-25 03:52:04 +00:00
rgw/multisite: don't delete per shard status on init
and pass the correct generation and number of shards when deleting per-shard status objects when disabling during incremental sync. Signed-off-by: Yuval Lifshitz <ylifshit@redhat.com>
This commit is contained in:
parent
d7c94b0112
commit
b44b71ab08
@ -2752,13 +2752,13 @@ public:
|
||||
RGWInitBucketShardSyncStatusCoroutine(RGWDataSyncCtx *_sc,
|
||||
const rgw_bucket_sync_pair_info& _sync_pair,
|
||||
rgw_bucket_shard_sync_info& _status,
|
||||
uint64_t latest_gen,
|
||||
uint64_t gen,
|
||||
const BucketIndexShardsManager& _marker_mgr,
|
||||
RGWObjVersionTracker& objv_tracker,
|
||||
bool exclusive)
|
||||
: RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
|
||||
sync_pair(_sync_pair),
|
||||
sync_status_oid(RGWBucketPipeSyncStatusManager::inc_status_oid(sc->source_zone, _sync_pair, latest_gen)),
|
||||
sync_status_oid(RGWBucketPipeSyncStatusManager::inc_status_oid(sc->source_zone, _sync_pair, gen)),
|
||||
status(_status), objv_tracker(objv_tracker), marker_mgr(_marker_mgr), exclusive(exclusive)
|
||||
{}
|
||||
|
||||
@ -2991,23 +2991,22 @@ class InitBucketShardStatusCR : public RGWCoroutine {
|
||||
rgw_bucket_sync_pair_info pair;
|
||||
rgw_bucket_shard_sync_info status;
|
||||
RGWObjVersionTracker objv;
|
||||
const uint64_t latest_gen;
|
||||
const uint64_t gen;
|
||||
const BucketIndexShardsManager& marker_mgr;
|
||||
|
||||
public:
|
||||
InitBucketShardStatusCR(RGWDataSyncCtx* sc,
|
||||
const rgw_bucket_sync_pair_info& pair,
|
||||
uint64_t latest_gen,
|
||||
uint64_t gen,
|
||||
const BucketIndexShardsManager& marker_mgr)
|
||||
: RGWCoroutine(sc->cct), sc(sc), pair(pair), latest_gen(latest_gen), marker_mgr(marker_mgr)
|
||||
: RGWCoroutine(sc->cct), sc(sc), pair(pair), gen(gen), marker_mgr(marker_mgr)
|
||||
{}
|
||||
int operate(const DoutPrefixProvider *dpp) {
|
||||
reenter(this) {
|
||||
// non exclusive create with empty status
|
||||
objv.generate_new_write_ver(cct);
|
||||
yield call(new RGWInitBucketShardSyncStatusCoroutine(sc, pair, status, latest_gen, marker_mgr, objv, false));
|
||||
yield call(new RGWInitBucketShardSyncStatusCoroutine(sc, pair, status, gen, marker_mgr, objv, false));
|
||||
if (retcode < 0) {
|
||||
assert(retcode != -EEXIST && retcode != -ECANCELED);
|
||||
return set_cr_error(retcode);
|
||||
}
|
||||
return set_cr_done();
|
||||
@ -3020,7 +3019,7 @@ class InitBucketShardStatusCollectCR : public RGWShardCollectCR {
|
||||
static constexpr int max_concurrent_shards = 16;
|
||||
RGWDataSyncCtx* sc;
|
||||
rgw_bucket_sync_pair_info sync_pair;
|
||||
const uint64_t latest_gen;
|
||||
const uint64_t gen;
|
||||
const BucketIndexShardsManager& marker_mgr;
|
||||
|
||||
const int num_shards;
|
||||
@ -3036,11 +3035,11 @@ class InitBucketShardStatusCollectCR : public RGWShardCollectCR {
|
||||
public:
|
||||
InitBucketShardStatusCollectCR(RGWDataSyncCtx* sc,
|
||||
const rgw_bucket_sync_pair_info& sync_pair,
|
||||
uint64_t latest_gen,
|
||||
uint64_t gen,
|
||||
const BucketIndexShardsManager& marker_mgr,
|
||||
int num_shards)
|
||||
: RGWShardCollectCR(sc->cct, max_concurrent_shards),
|
||||
sc(sc), sync_pair(sync_pair), latest_gen(latest_gen), marker_mgr(marker_mgr), num_shards(num_shards)
|
||||
sc(sc), sync_pair(sync_pair), gen(gen), marker_mgr(marker_mgr), num_shards(num_shards)
|
||||
{}
|
||||
|
||||
bool spawn_next() override {
|
||||
@ -3048,7 +3047,7 @@ class InitBucketShardStatusCollectCR : public RGWShardCollectCR {
|
||||
return false;
|
||||
}
|
||||
sync_pair.source_bs.shard_id = shard++;
|
||||
spawn(new InitBucketShardStatusCR(sc, sync_pair, latest_gen, marker_mgr), false);
|
||||
spawn(new InitBucketShardStatusCR(sc, sync_pair, gen, marker_mgr), false);
|
||||
return true;
|
||||
}
|
||||
};
|
||||
@ -3063,11 +3062,11 @@ class RemoveBucketShardStatusCR : public RGWCoroutine {
|
||||
|
||||
public:
|
||||
RemoveBucketShardStatusCR(RGWDataSyncCtx* sc,
|
||||
const rgw_bucket_sync_pair_info& sync_pair, uint64_t latest_gen)
|
||||
const rgw_bucket_sync_pair_info& sync_pair, uint64_t gen)
|
||||
: RGWCoroutine(sc->cct), sc(sc), sync_env(sc->env),
|
||||
sync_pair(sync_pair),
|
||||
obj(sync_env->svc->zone->get_zone_params().log_pool,
|
||||
RGWBucketPipeSyncStatusManager::inc_status_oid(sc->source_zone, sync_pair, latest_gen))
|
||||
RGWBucketPipeSyncStatusManager::inc_status_oid(sc->source_zone, sync_pair, gen))
|
||||
{}
|
||||
|
||||
int operate(const DoutPrefixProvider *dpp) override {
|
||||
@ -3090,7 +3089,7 @@ class RemoveBucketShardStatusCollectCR : public RGWShardCollectCR {
|
||||
RGWDataSyncCtx* const sc;
|
||||
RGWDataSyncEnv* const sync_env;
|
||||
rgw_bucket_sync_pair_info sync_pair;
|
||||
const uint64_t latest_gen;
|
||||
const uint64_t gen;
|
||||
|
||||
const int num_shards;
|
||||
int shard = 0;
|
||||
@ -3105,18 +3104,18 @@ class RemoveBucketShardStatusCollectCR : public RGWShardCollectCR {
|
||||
public:
|
||||
RemoveBucketShardStatusCollectCR(RGWDataSyncCtx* sc,
|
||||
const rgw_bucket_sync_pair_info& sync_pair,
|
||||
uint64_t latest_gen,
|
||||
uint64_t gen,
|
||||
int num_shards)
|
||||
: RGWShardCollectCR(sc->cct, max_concurrent_shards),
|
||||
sc(sc), sync_env(sc->env), sync_pair(sync_pair), latest_gen(latest_gen), num_shards(num_shards)
|
||||
sc(sc), sync_env(sc->env), sync_pair(sync_pair), gen(gen), num_shards(num_shards)
|
||||
{}
|
||||
|
||||
bool spawn_next() override {
|
||||
if (shard >= num_shards || status < 0) { // stop spawning on any errors
|
||||
if (shard >= num_shards) {
|
||||
return false;
|
||||
}
|
||||
sync_pair.source_bs.shard_id = shard++;
|
||||
spawn(new RemoveBucketShardStatusCR(sc, sync_pair, latest_gen), false);
|
||||
spawn(new RemoveBucketShardStatusCR(sc, sync_pair, gen), false);
|
||||
return true;
|
||||
}
|
||||
};
|
||||
@ -3165,7 +3164,6 @@ public:
|
||||
if (check_compat) {
|
||||
// try to convert existing per-shard incremental status for backward compatibility
|
||||
yield call(new CheckAllBucketShardStatusIsIncremental(sc, sync_pair, num_shards, &all_incremental));
|
||||
ldout(cct, 20) << "check for 'all incremental' in compatibility mode" << dendl;
|
||||
if (retcode < 0) {
|
||||
return set_cr_error(retcode);
|
||||
}
|
||||
@ -5250,7 +5248,7 @@ int RGWSyncBucketCR::operate(const DoutPrefixProvider *dpp)
|
||||
bucket_status.state == BucketSyncState::Stopped ||
|
||||
bucket_stopped) {
|
||||
// if state is Init or Stopped, we query the remote RGW for their state
|
||||
yield call(new RGWReadRemoteBucketIndexLogInfoCR(sc, sync_pair.dest_bucket, &info));
|
||||
yield call(new RGWReadRemoteBucketIndexLogInfoCR(sc, sync_pair.dest_bucket, &info));
|
||||
if (retcode < 0) {
|
||||
return set_cr_error(retcode);
|
||||
}
|
||||
@ -5273,6 +5271,15 @@ int RGWSyncBucketCR::operate(const DoutPrefixProvider *dpp)
|
||||
}
|
||||
}
|
||||
|
||||
// if state was incremental, remove all per-shard status objects
|
||||
if (bucket_status.state == BucketSyncState::Incremental) {
|
||||
yield {
|
||||
const auto num_shards = bucket_status.shards_done_with_gen.size();
|
||||
const auto gen = bucket_status.incremental_gen;
|
||||
call(new RemoveBucketShardStatusCollectCR(sc, sync_pair, gen, num_shards));
|
||||
}
|
||||
}
|
||||
|
||||
// check if local state is "stopped"
|
||||
yield call(new ReadCR(dpp, env->async_rados, env->svc->sysobj,
|
||||
status_obj, &bucket_status, false, &objv));
|
||||
@ -5292,13 +5299,9 @@ int RGWSyncBucketCR::operate(const DoutPrefixProvider *dpp)
|
||||
return set_cr_error(retcode);
|
||||
}
|
||||
}
|
||||
yield {
|
||||
const int num_shards = sync_pipe.dest_bucket_info.layout.current_index.layout.normal.num_shards;
|
||||
call(new RemoveBucketShardStatusCollectCR(sc, sync_pair, info.latest_gen, num_shards));
|
||||
}
|
||||
RELEASE_LOCK(bucket_lease_cr);
|
||||
return set_cr_done();
|
||||
}
|
||||
}
|
||||
if (bucket_stopped) {
|
||||
tn->log(20, SSTR("ERROR: switched from 'stop' to 'start' sync. while state is: " << bucket_status.state));
|
||||
bucket_stopped = false;
|
||||
|
Loading…
Reference in New Issue
Block a user