rgw: update bucket sync status after bucket shards finish current gen

Signed-off-by: Shilpa Jagannath <smanjara@redhat.com>
Shilpa Jagannath 2021-04-06 01:45:45 +05:30 committed by Casey Bodley
parent 94fdadd356
commit e73eac1bbd


@@ -3174,6 +3174,7 @@ public:
}
}
status.shards_done_with_gen.resize(num_shards);
status.incremental_gen = info.latest_gen;
ldout(cct, 20) << "writing bucket sync state=" << status.state << dendl;
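The hunk above sizes shards_done_with_gen and records incremental_gen when the full-status object is written. As rough orientation for the rest of the diff, here is a minimal sketch of the per-bucket status fields the patch relies on; it is inferred from how they are used in this commit, not copied from the real rgw_bucket_sync_status declaration, so the enum values and exact types are assumptions:

#include <cstdint>
#include <vector>

// illustrative sketch only; the real struct lives in the RGW headers and
// carries more state than shown here
enum class BucketSyncStateSketch { Init, Full, Incremental, Stopped };

struct rgw_bucket_sync_status_sketch {
  BucketSyncStateSketch state = BucketSyncStateSketch::Init;
  uint64_t incremental_gen = 0;            // bilog generation currently being synced
  std::vector<bool> shards_done_with_gen;  // one flag per source shard, sized to num_shards
};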
@@ -4128,12 +4129,88 @@ static bool has_olh_epoch(RGWModifyOp op) {
  return op == CLS_RGW_OP_LINK_OLH || op == CLS_RGW_OP_UNLINK_INSTANCE;
}

class RGWBucketShardIsDoneCR : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;
  rgw_bucket_sync_status bucket_status;
  const rgw_raw_obj& bucket_status_obj;
  const int shard_id;
  RGWObjVersionTracker objv_tracker;
  const next_bilog_result& next_log;
  const uint64_t generation;
public:
  RGWBucketShardIsDoneCR(RGWDataSyncCtx *_sc, const rgw_raw_obj& _bucket_status_obj,
                         int _shard_id, const next_bilog_result& _next_log, const uint64_t _gen)
    : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
      bucket_status_obj(_bucket_status_obj),
      shard_id(_shard_id), next_log(_next_log), generation(_gen) {}

  int operate(const DoutPrefixProvider* dpp) override
  {
    reenter(this) {
      do {
        // read bucket sync status
        objv_tracker.clear();
        using ReadCR = RGWSimpleRadosReadCR<rgw_bucket_sync_status>;
        yield call(new ReadCR(dpp, sync_env->async_rados, sync_env->svc->sysobj,
                              bucket_status_obj, &bucket_status, false, &objv_tracker));
        if (retcode < 0) {
          ldpp_dout(dpp, 20) << "failed to read bucket shard status: "
              << cpp_strerror(retcode) << dendl;
          return set_cr_error(retcode);
        }
        if (bucket_status.state != BucketSyncState::Incremental) {
          // exit with success to avoid stale shard being
          // retried in error repo if we lost a race
          ldpp_dout(dpp, 20) << "RGWBucketShardIsDoneCR found sync state = " << bucket_status.state << dendl;
          return set_cr_done();
        }
        if (bucket_status.incremental_gen != generation) {
          // exit with success to avoid stale shard being
          // retried in error repo if we lost a race
          ldpp_dout(dpp, 20) << "RGWBucketShardIsDoneCR expected gen: " << generation
              << ", got: " << bucket_status.incremental_gen << dendl;
          return set_cr_done();
        }
        yield {
          // update bucket_status after a shard is done with current gen
          auto& done = bucket_status.shards_done_with_gen;
          done[shard_id] = true;
          // increment gen if all shards are already done with current gen
          if (std::all_of(done.begin(), done.end(),
                          [] (const bool done){return done; } )) {
            bucket_status.incremental_gen = next_log.generation;
            done.clear();
            done.resize(next_log.num_shards, false);
          }
          using WriteCR = RGWSimpleRadosWriteCR<rgw_bucket_sync_status>;
          call(new WriteCR(dpp, sync_env->async_rados, sync_env->svc->sysobj,
                           bucket_status_obj, bucket_status, &objv_tracker, false));
        }
        if (retcode < 0 && retcode != -ECANCELED) {
          ldpp_dout(dpp, 20) << "failed to write bucket sync status: " << cpp_strerror(retcode) << dendl;
          return set_cr_error(retcode);
        } else if (retcode >= 0) {
          return set_cr_done();
        }
      } while (retcode == -ECANCELED);
    }
    return 0;
  }
};
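Stripped of the coroutine and RADOS plumbing, the update RGWBucketShardIsDoneCR performs is a small read-modify-write against that status object. A standalone sketch of just the in-memory step, under the assumption that the flags live in a std::vector<bool> as above (the helper name is illustrative, not part of the patch):

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <vector>

// Mark one shard done for the current generation; if that was the last
// outstanding shard, advance to the next generation and reset the flags
// for however many shards the next generation has.
inline void mark_shard_done_sketch(std::vector<bool>& shards_done_with_gen,
                                   uint64_t& incremental_gen,
                                   std::size_t shard_id,
                                   uint64_t next_gen,
                                   std::size_t next_num_shards)
{
  shards_done_with_gen[shard_id] = true;
  if (std::all_of(shards_done_with_gen.begin(), shards_done_with_gen.end(),
                  [](bool done) { return done; })) {
    incremental_gen = next_gen;
    shards_done_with_gen.assign(next_num_shards, false);
  }
}

In the coroutine this step runs between a RGWSimpleRadosReadCR and a RGWSimpleRadosWriteCR sharing one RGWObjVersionTracker, and the whole sequence retries on -ECANCELED so concurrent updates from other shards cannot clobber each other.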

class RGWBucketShardIncrementalSyncCR : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;
  rgw_bucket_sync_pipe& sync_pipe;
  RGWBucketSyncFlowManager::pipe_rules_ref rules;
  rgw_bucket_shard& bs;
  const rgw_raw_obj& bucket_status_obj;
  boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr;
  bilog_list_result extended_result;
  list<rgw_bi_log_entry> list_result;
@@ -4162,7 +4239,8 @@ class RGWBucketShardIncrementalSyncCR : public RGWCoroutine {
public:
RGWBucketShardIncrementalSyncCR(RGWDataSyncCtx *_sc,
rgw_bucket_sync_pipe& _sync_pipe,
const std::string& status_oid,
const std::string& shard_status_oid,
const rgw_raw_obj& _bucket_status_obj,
boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
rgw_bucket_shard_sync_info& sync_info,
RGWSyncTraceNodeRef& _tn_parent,
@@ -4170,11 +4248,11 @@ public:
ceph::real_time* stable_timestamp)
: RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
sync_pipe(_sync_pipe), bs(_sync_pipe.info.source_bs),
lease_cr(std::move(lease_cr)), sync_info(sync_info),
zone_id(sync_env->svc->zone->get_zone().id),
bucket_status_obj(_bucket_status_obj), lease_cr(std::move(lease_cr)),
sync_info(sync_info), zone_id(sync_env->svc->zone->get_zone().id),
tn(sync_env->sync_tracer->add_node(_tn_parent, "inc_sync",
SSTR(bucket_shard_str{bs}))),
marker_tracker(sc, status_oid, sync_info.inc_marker, tn,
marker_tracker(sc, shard_status_oid, sync_info.inc_marker, tn,
objv_tracker, stable_timestamp)
{
set_description() << "bucket shard incremental sync bucket="
@@ -4393,6 +4471,7 @@ int RGWBucketShardIncrementalSyncCR::operate(const DoutPrefixProvider *dpp)
return 0;
});
}
} while (!list_result.empty() && sync_status == 0 && !syncstopped);
drain_all_cb([&](uint64_t stack_id, int ret) {
@@ -4422,6 +4501,17 @@ int RGWBucketShardIncrementalSyncCR::operate(const DoutPrefixProvider *dpp)
      tn->log(10, SSTR("backing out with sync_status=" << sync_status));
      return set_cr_error(sync_status);
    }

    if (!truncated && extended_result.next_log) {
      yield call(new RGWBucketShardIsDoneCR(sc, bucket_status_obj, bs.shard_id, *extended_result.next_log, generation));
      if (retcode < 0) {
        ldout(cct, 20) << "failed to update bucket sync status: "
            << cpp_strerror(retcode) << dendl;
        drain_all();
        return set_cr_error(retcode);
      }
    }
    return set_cr_done();
  }
  return 0;
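The new block above only fires when the bilog listing is exhausted (!truncated) and the source reported a follow-up generation. The shapes of those listing results are not part of this hunk; the sketch below is an assumption based on the fields the patch reads (extended_result.next_log, next_log.generation, next_log.num_shards):

#include <cstdint>
#include <list>
#include <optional>
#include <string>

// assumed shape: describes the generation that follows the one just drained
struct next_bilog_result_sketch {
  uint64_t generation = 0;  // becomes the new incremental_gen
  uint64_t num_shards = 0;  // used to resize shards_done_with_gen
};

// assumed shape: a bilog listing plus, once the current gen is fully consumed,
// a description of the next one
struct bilog_list_result_sketch {
  std::list<std::string> entries;  // stand-in for the real rgw_bi_log_entry list
  std::optional<next_bilog_result_sketch> next_log;  // unset while the current gen still has entries
};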
@@ -4970,7 +5060,8 @@ class RGWSyncBucketShardCR : public RGWCoroutine {
BucketSyncState& bucket_state;
ceph::real_time* progress;
const std::string status_oid;
const std::string shard_status_oid;
const rgw_raw_obj bucket_status_obj;
rgw_bucket_shard_sync_info sync_status;
RGWObjVersionTracker objv_tracker;
@@ -4987,7 +5078,11 @@ public:
: RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
lease_cr(std::move(lease_cr)), sync_pair(_sync_pair),
sync_pipe(sync_pipe), bucket_state(bucket_state), progress(progress),
status_oid(RGWBucketPipeSyncStatusManager::inc_status_oid(sc->source_zone, sync_pair)),
shard_status_oid(RGWBucketPipeSyncStatusManager::inc_status_oid(sc->source_zone, sync_pair)),
bucket_status_obj(sc->env->svc->zone->get_zone_params().log_pool,
RGWBucketPipeSyncStatusManager::full_status_oid(sc->source_zone,
sync_pair.source_bs.bucket,
sync_pair.dest_bs.bucket)),
tn(tn) {
}
@@ -5010,7 +5105,7 @@ int RGWSyncBucketShardCR::operate(const DoutPrefixProvider *dpp)
}
yield call(new RGWBucketShardIncrementalSyncCR(sc, sync_pipe,
status_oid, lease_cr,
shard_status_oid, bucket_status_obj, lease_cr,
sync_status, tn,
objv_tracker, progress));
if (retcode < 0) {
@@ -5204,6 +5299,16 @@ int RGWSyncBucketCR::operate(const DoutPrefixProvider *dpp)
      }
    }

    if (sync_pair.source_bs.shard_id >= bucket_status.shards_done_with_gen.size()) {
      tn->log(1, SSTR("bucket shard " << sync_pair.source_bs << " index out of bounds"));
      return set_cr_done(); // return success so we don't retry
    }
    if (bucket_status.shards_done_with_gen[sync_pair.source_bs.shard_id]) {
      tn->log(10, SSTR("bucket shard " << sync_pair.source_bs << " of gen " <<
                       gen << " already synced."));
      return set_cr_done();
    }

    yield call(new RGWSyncBucketShardCR(sc, data_lease_cr, sync_pair,
                                        sync_pipe, bucket_status.state,
                                        tn, progress));
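Together with the bounds and done-flag checks above, the per-shard decision in RGWSyncBucketCR reduces to the following. This is an illustrative condensation, not code from the patch; the real coroutine returns set_cr_done() directly for the skip cases so stale datalog entries are not requeued in the error repo:

#include <cstddef>
#include <vector>

enum class ShardActionSketch { sync, skip };

// Decide whether a datalog entry for this shard still needs incremental sync
// against the bucket's full sync status. Out-of-range shard ids and shards
// already flagged done for the current generation are skipped.
inline ShardActionSketch classify_shard_sketch(const std::vector<bool>& shards_done_with_gen,
                                               std::size_t shard_id)
{
  if (shard_id >= shards_done_with_gen.size()) {
    return ShardActionSketch::skip;  // index out of bounds; report success, don't retry
  }
  if (shards_done_with_gen[shard_id]) {
    return ShardActionSketch::skip;  // this shard already finished the current gen
  }
  return ShardActionSketch::sync;    // hand off to RGWSyncBucketShardCR
}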