rgw/multisite: add cls versioning to track data sync status per shard object, storing the version trackers in a vector

Signed-off-by: Shilpa Jagannath <smanjara@redhat.com>
Shilpa Jagannath 2022-08-18 12:55:24 -04:00 committed by Adam C. Emerson
parent 58f33d83bc
commit 9aeb2aa38d
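
The change threads an RGWObjVersionTracker through every read and write of the per-shard data sync status objects, so each RADOS write is guarded by the object's cls version: a writer that raced with another update fails instead of silently clobbering a newer marker. Below is a minimal, standalone sketch of that compare-and-swap idea; VersionTracker and VersionedStore are hypothetical stand-ins for RGWObjVersionTracker and a cls_version-guarded RADOS object, not the Ceph API.

    // Minimal standalone model of cls-version-guarded writes.
    // Hypothetical types; the real code uses RGWObjVersionTracker + cls_version.
    #include <cstdint>
    #include <map>
    #include <stdexcept>
    #include <string>
    #include <utility>

    struct VersionTracker {        // stand-in for RGWObjVersionTracker
      uint64_t read_ver = 0;       // version observed by the last read
    };

    struct VersionedStore {        // stand-in for cls_version-guarded objects
      std::map<std::string, std::pair<uint64_t, std::string>> objs;

      std::string read(const std::string& oid, VersionTracker& objv) {
        auto& [ver, data] = objs[oid];
        objv.read_ver = ver;       // remember the version we saw
        return data;
      }

      // Guarded write: refuses to clobber an object someone else updated.
      void write(const std::string& oid, std::string data, VersionTracker& objv) {
        auto& [ver, stored] = objs[oid];
        if (ver != objv.read_ver)
          throw std::runtime_error("version mismatch: raced with another writer");
        stored = std::move(data);
        objv.read_ver = ++ver;     // bump and track the new version
      }
    };

    int main() {
      VersionedStore store;
      VersionTracker objv;
      store.read("sync-status.shard.0", objv);               // observe ver 0
      store.write("sync-status.shard.0", "marker=A", objv);  // ok, ver -> 1
      VersionTracker stale;  // never read the object, still expects ver 0
      // store.write("sync-status.shard.0", "marker=B", stale);  // would throw
      return 0;
    }

One tracker is kept per shard status object, mirroring the std::vector<RGWObjVersionTracker> objvs that this commit sizes to num_shards and hands to each read and write coroutine.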

@@ -90,6 +90,7 @@ class RGWReadDataSyncStatusMarkersCR : public RGWShardCollectCR {
int shard_id{0};
map<uint32_t, rgw_data_sync_marker>& markers;
std::vector<RGWObjVersionTracker>& objvs;
int handle_result(int r) override {
if (r == -ENOENT) { // ENOENT is not a fatal error
@@ -103,9 +104,10 @@ class RGWReadDataSyncStatusMarkersCR : public RGWShardCollectCR {
}
public:
RGWReadDataSyncStatusMarkersCR(RGWDataSyncCtx *sc, int num_shards,
map<uint32_t, rgw_data_sync_marker>& markers)
map<uint32_t, rgw_data_sync_marker>& markers,
std::vector<RGWObjVersionTracker>& objvs)
: RGWShardCollectCR(sc->cct, MAX_CONCURRENT_SHARDS),
sc(sc), env(sc->env), num_shards(num_shards), markers(markers)
sc(sc), env(sc->env), num_shards(num_shards), markers(markers), objvs(objvs)
{}
bool spawn_next() override;
};
@@ -118,7 +120,7 @@ bool RGWReadDataSyncStatusMarkersCR::spawn_next()
using CR = RGWSimpleRadosReadCR<rgw_data_sync_marker>;
spawn(new CR(env->dpp, env->async_rados, env->svc->sysobj,
rgw_raw_obj(env->svc->zone->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, shard_id)),
&markers[shard_id]),
&markers[shard_id], true, &objvs[shard_id]),
false);
shard_id++;
return true;
@@ -175,11 +177,13 @@ class RGWReadDataSyncStatusCoroutine : public RGWCoroutine {
RGWDataSyncCtx *sc;
RGWDataSyncEnv *sync_env;
rgw_data_sync_status *sync_status;
std::vector<RGWObjVersionTracker>& objvs;
public:
RGWReadDataSyncStatusCoroutine(RGWDataSyncCtx *_sc,
rgw_data_sync_status *_status)
: RGWCoroutine(_sc->cct), sc(_sc), sync_env(sc->env), sync_status(_status)
rgw_data_sync_status *_status, std::vector<RGWObjVersionTracker>& objvs)
: RGWCoroutine(_sc->cct), sc(_sc), sync_env(sc->env),
sync_status(_status), objvs(objvs)
{}
int operate(const DoutPrefixProvider *dpp) override;
};
@@ -201,9 +205,10 @@ int RGWReadDataSyncStatusCoroutine::operate(const DoutPrefixProvider *dpp)
return set_cr_error(retcode);
}
// read shard markers
objvs.resize(sync_status->sync_info.num_shards);
using ReadMarkersCR = RGWReadDataSyncStatusMarkersCR;
yield call(new ReadMarkersCR(sc, sync_status->sync_info.num_shards,
sync_status->sync_markers));
sync_status->sync_markers, objvs));
if (retcode < 0) {
ldpp_dout(dpp, 4) << "failed to read sync status markers with "
<< cpp_strerror(retcode) << dendl;
@@ -528,6 +533,7 @@ class RGWInitDataSyncStatusCoroutine : public RGWCoroutine {
string lock_name;
string cookie;
rgw_data_sync_status *status;
std::vector<RGWObjVersionTracker>& objvs;
map<int, RGWDataChangesLogInfo> shards_info;
RGWSyncTraceNodeRef tn;
@@ -535,10 +541,11 @@ public:
RGWInitDataSyncStatusCoroutine(RGWDataSyncCtx *_sc, uint32_t num_shards,
uint64_t instance_id,
RGWSyncTraceNodeRef& _tn_parent,
rgw_data_sync_status *status)
rgw_data_sync_status *status,
std::vector<RGWObjVersionTracker>& objvs)
: RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), driver(sync_env->driver),
pool(sync_env->svc->zone->get_zone_params().log_pool),
num_shards(num_shards), status(status),
num_shards(num_shards), status(status), objvs(objvs),
tn(sync_env->sync_tracer->add_node(_tn_parent, "init_data_sync_status")) {
lock_name = "sync_lock";
@@ -604,15 +611,18 @@ public:
yield;
}
yield {
objvs.resize(num_shards);
for (uint32_t i = 0; i < num_shards; i++) {
RGWDataChangesLogInfo& info = shards_info[i];
auto& marker = status->sync_markers[i];
marker.next_step_marker = info.marker;
marker.timestamp = info.last_update;
const auto& oid = RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, i);
auto& objv = objvs[i];
objv.generate_new_write_ver(cct);
using WriteMarkerCR = RGWSimpleRadosWriteCR<rgw_data_sync_marker>;
spawn(new WriteMarkerCR(dpp, sync_env->async_rados, sync_env->svc->sysobj,
rgw_raw_obj{pool, oid}, marker), true);
rgw_raw_obj{pool, oid}, marker, &objv), true);
}
}
while (collect(&ret, NULL)) {
@@ -718,6 +728,7 @@ void RGWRemoteDataLog::finish()
int RGWRemoteDataLog::read_sync_status(const DoutPrefixProvider *dpp, rgw_data_sync_status *sync_status)
{
// cannot run concurrently with run_sync(), so run in a separate manager
std::vector<RGWObjVersionTracker> objvs;
RGWCoroutinesManager crs(cct, cr_registry);
RGWHTTPManager http_manager(cct, crs.get_completion_mgr());
int ret = http_manager.start();
@@ -731,7 +742,7 @@ int RGWRemoteDataLog::read_sync_status(const DoutPrefixProvider *dpp, rgw_data_s
RGWDataSyncCtx sc_local = sc;
sc_local.env = &sync_env_local;
ret = crs.run(dpp, new RGWReadDataSyncStatusCoroutine(&sc_local, sync_status));
ret = crs.run(dpp, new RGWReadDataSyncStatusCoroutine(&sc_local, sync_status, objvs));
http_manager.stop();
return ret;
}
@@ -773,6 +784,7 @@ int RGWRemoteDataLog::read_recovering_shards(const DoutPrefixProvider *dpp, cons
int RGWRemoteDataLog::init_sync_status(const DoutPrefixProvider *dpp, int num_shards)
{
rgw_data_sync_status sync_status;
std::vector<RGWObjVersionTracker> objvs;
sync_status.sync_info.num_shards = num_shards;
RGWCoroutinesManager crs(cct, cr_registry);
@@ -787,7 +799,7 @@ int RGWRemoteDataLog::init_sync_status(const DoutPrefixProvider *dpp, int num_sh
auto instance_id = ceph::util::generate_random_number<uint64_t>();
RGWDataSyncCtx sc_local = sc;
sc_local.env = &sync_env_local;
ret = crs.run(dpp, new RGWInitDataSyncStatusCoroutine(&sc_local, num_shards, instance_id, tn, &sync_status));
ret = crs.run(dpp, new RGWInitDataSyncStatusCoroutine(&sc_local, num_shards, instance_id, tn, &sync_status, objvs));
http_manager.stop();
return ret;
}
@@ -874,6 +886,7 @@ class RGWListBucketIndexesCR : public RGWCoroutine {
rgw::sal::RadosStore* driver = sync_env->driver;
rgw_data_sync_status *sync_status;
std::vector<RGWObjVersionTracker>& objvs;
int req_ret = 0;
int ret = 0;
@@ -894,8 +907,8 @@ class RGWListBucketIndexesCR : public RGWCoroutine {
public:
RGWListBucketIndexesCR(RGWDataSyncCtx* sc,
rgw_data_sync_status* sync_status)
: RGWCoroutine(sc->cct), sc(sc), sync_status(sync_status) {}
rgw_data_sync_status* sync_status, std::vector<RGWObjVersionTracker>& objvs)
: RGWCoroutine(sc->cct), sc(sc), sync_status(sync_status), objvs(objvs) {}
~RGWListBucketIndexesCR() override { }
int operate(const DoutPrefixProvider *dpp) override {
@@ -975,7 +988,7 @@ public:
rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool,
RGWDataSyncStatusManager::shard_obj_name(
sc->source_zone, shard_id)),
marker),
marker, &objvs[shard_id]),
true);
}
} else {
@@ -1011,16 +1024,17 @@ class RGWDataSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, strin
string marker_oid;
rgw_data_sync_marker sync_marker;
RGWSyncTraceNodeRef tn;
RGWObjVersionTracker& objv;
public:
RGWDataSyncShardMarkerTrack(RGWDataSyncCtx *_sc,
const string& _marker_oid,
const rgw_data_sync_marker& _marker,
RGWSyncTraceNodeRef& _tn) : RGWSyncShardMarkerTrack(DATA_SYNC_UPDATE_MARKER_WINDOW),
RGWSyncTraceNodeRef& _tn, RGWObjVersionTracker& objv) : RGWSyncShardMarkerTrack(DATA_SYNC_UPDATE_MARKER_WINDOW),
sc(_sc), sync_env(_sc->env),
marker_oid(_marker_oid),
sync_marker(_marker),
tn(_tn) {}
tn(_tn), objv(objv) {}
RGWCoroutine* store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
sync_marker.marker = new_marker;
@@ -1031,7 +1045,7 @@ public:
return new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->dpp, sync_env->async_rados, sync_env->svc->sysobj,
rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, marker_oid),
sync_marker);
sync_marker, &objv);
}
RGWOrderCallCR *allocate_order_control_cr() override {
@@ -1631,6 +1645,7 @@ protected:
const rgw_raw_obj& error_repo;
boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr;
const rgw_data_sync_status& sync_status;
RGWObjVersionTracker& objv;
boost::intrusive_ptr<rgw::bucket_sync::Cache> bucket_shard_cache;
std::optional<RGWDataSyncShardMarkerTrack> marker_tracker;
@@ -1648,11 +1663,13 @@ protected:
const string& status_oid, const rgw_raw_obj& error_repo,
boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
const rgw_data_sync_status& sync_status,
RGWObjVersionTracker& objv,
const boost::intrusive_ptr<rgw::bucket_sync::Cache>& bucket_shard_cache)
: RGWCoroutine(_sc->cct), sc(_sc), pool(pool), shard_id(shard_id),
sync_marker(sync_marker), tn(tn), status_oid(status_oid),
error_repo(error_repo), lease_cr(std::move(lease_cr)),
sync_status(sync_status), bucket_shard_cache(bucket_shard_cache) {}
sync_status(sync_status), objv(objv),
bucket_shard_cache(bucket_shard_cache) {}
};
class RGWDataFullSyncShardCR : public RGWDataBaseSyncShardCR {
@@ -1672,17 +1689,17 @@ public:
rgw_data_sync_marker& sync_marker, RGWSyncTraceNodeRef tn,
const string& status_oid, const rgw_raw_obj& error_repo,
boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
const rgw_data_sync_status& sync_status,
const rgw_data_sync_status& sync_status, RGWObjVersionTracker& objv,
const boost::intrusive_ptr<rgw::bucket_sync::Cache>& bucket_shard_cache)
: RGWDataBaseSyncShardCR(sc, pool, shard_id, sync_marker, tn,
status_oid, error_repo, std::move(lease_cr),
sync_status, bucket_shard_cache) {}
sync_status, objv, bucket_shard_cache) {}
int operate(const DoutPrefixProvider *dpp) override {
reenter(this) {
tn->log(10, "start full sync");
oid = full_data_sync_index_shard_oid(sc->source_zone, shard_id);
marker_tracker.emplace(sc, status_oid, sync_marker, tn);
marker_tracker.emplace(sc, status_oid, sync_marker, tn, objv);
total_entries = sync_marker.pos;
entry_timestamp = sync_marker.timestamp; // time when full sync started
do {
@@ -1744,7 +1761,7 @@ public:
sync_marker.next_step_marker.clear();
yield call(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(
sc->env->dpp, sc->env->async_rados, sc->env->svc->sysobj,
rgw_raw_obj(pool, status_oid), sync_marker));
rgw_raw_obj(pool, status_oid), sync_marker, &objv));
if (retcode < 0) {
tn->log(0, SSTR("ERROR: failed to set sync marker: retcode=" << retcode));
return set_cr_error(retcode);
@@ -1805,19 +1822,19 @@ public:
rgw_data_sync_marker& sync_marker, RGWSyncTraceNodeRef tn,
const string& status_oid, const rgw_raw_obj& error_repo,
boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
const rgw_data_sync_status& sync_status,
const rgw_data_sync_status& sync_status, RGWObjVersionTracker& objv,
const boost::intrusive_ptr<rgw::bucket_sync::Cache>& bucket_shard_cache,
ceph::mutex& inc_lock,
bc::flat_set<rgw_data_notify_entry>& modified_shards)
: RGWDataBaseSyncShardCR(sc, pool, shard_id, sync_marker, tn,
status_oid, error_repo, std::move(lease_cr),
sync_status, bucket_shard_cache),
sync_status, objv, bucket_shard_cache),
inc_lock(inc_lock), modified_shards(modified_shards) {}
int operate(const DoutPrefixProvider *dpp) override {
reenter(this) {
tn->log(10, "start incremental sync");
marker_tracker.emplace(sc, status_oid, sync_marker, tn);
marker_tracker.emplace(sc, status_oid, sync_marker, tn, objv);
do {
if (!lease_cr->is_locked()) {
drain_all();
@@ -1976,6 +1993,7 @@ class RGWDataSyncShardCR : public RGWCoroutine {
rgw_data_sync_marker& sync_marker;
rgw_data_sync_status sync_status;
const RGWSyncTraceNodeRef tn;
RGWObjVersionTracker& objv;
bool *reset_backoff;
ceph::mutex inc_lock = ceph::make_mutex("RGWDataSyncShardCR::inc_lock");
@@ -2001,10 +2019,10 @@ public:
RGWDataSyncShardCR(RGWDataSyncCtx* const _sc, const rgw_pool& pool,
const uint32_t shard_id, rgw_data_sync_marker& marker,
const rgw_data_sync_status& sync_status,
RGWSyncTraceNodeRef& tn, bool *reset_backoff)
RGWSyncTraceNodeRef& tn, RGWObjVersionTracker& objv, bool *reset_backoff)
: RGWCoroutine(_sc->cct), sc(_sc), pool(pool), shard_id(shard_id),
sync_marker(marker), sync_status(sync_status), tn(tn),
reset_backoff(reset_backoff) {
objv(objv), reset_backoff(reset_backoff) {
set_description() << "data sync shard source_zone=" << sc->source_zone
<< " shard_id=" << shard_id;
}
@@ -2042,7 +2060,7 @@ public:
sync_marker, tn,
status_oid, error_repo,
lease_cr, sync_status,
bucket_shard_cache));
objv, bucket_shard_cache));
if (retcode < 0) {
if (retcode != -EBUSY) {
tn->log(10, SSTR("full sync failed (retcode=" << retcode << ")"));
@@ -2056,7 +2074,7 @@ public:
sync_marker, tn,
status_oid, error_repo,
lease_cr, sync_status,
bucket_shard_cache,
objv, bucket_shard_cache,
inc_lock, modified_shards));
if (retcode < 0) {
if (retcode != -EBUSY) {
@@ -2103,25 +2121,29 @@ class RGWDataSyncShardControlCR : public RGWBackoffControlCR {
rgw_data_sync_status sync_status;
RGWSyncTraceNodeRef tn;
RGWObjVersionTracker& objv;
public:
RGWDataSyncShardControlCR(RGWDataSyncCtx *_sc, const rgw_pool& _pool,
uint32_t _shard_id, rgw_data_sync_marker& _marker, const rgw_data_sync_status& sync_status,
RGWSyncTraceNodeRef& _tn_parent) : RGWBackoffControlCR(_sc->cct, false),
sc(_sc), sync_env(_sc->env),
pool(_pool),
shard_id(_shard_id),
sync_marker(_marker) {
uint32_t _shard_id, rgw_data_sync_marker& _marker,
const rgw_data_sync_status& sync_status,
RGWObjVersionTracker& objv,
RGWSyncTraceNodeRef& _tn_parent)
: RGWBackoffControlCR(_sc->cct, false),
sc(_sc), sync_env(_sc->env),
pool(_pool),
shard_id(_shard_id),
sync_marker(_marker), objv(objv) {
tn = sync_env->sync_tracer->add_node(_tn_parent, "shard", std::to_string(shard_id));
}
RGWCoroutine *alloc_cr() override {
return new RGWDataSyncShardCR(sc, pool, shard_id, sync_marker, sync_status, tn, backoff_ptr());
return new RGWDataSyncShardCR(sc, pool, shard_id, sync_marker, sync_status, tn, objv, backoff_ptr());
}
RGWCoroutine *alloc_finisher_cr() override {
return new RGWSimpleRadosReadCR<rgw_data_sync_marker>(sync_env->dpp, sync_env->async_rados, sync_env->svc->sysobj,
rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, shard_id)),
&sync_marker);
&sync_marker, true, &objv);
}
void append_modified_shards(bc::flat_set<rgw_data_notify_entry>& keys) {
@@ -2142,6 +2164,7 @@ class RGWDataSyncCR : public RGWCoroutine {
uint32_t num_shards;
rgw_data_sync_status sync_status;
std::vector<RGWObjVersionTracker> objvs;
ceph::mutex shard_crs_lock =
ceph::make_mutex("RGWDataSyncCR::shard_crs_lock");
@@ -2152,6 +2175,7 @@ class RGWDataSyncCR : public RGWCoroutine {
RGWSyncTraceNodeRef tn;
RGWDataSyncModule *data_sync_module{nullptr};
RGWObjVersionTracker objv;
public:
RGWDataSyncCR(RGWDataSyncCtx *_sc, uint32_t _num_shards, RGWSyncTraceNodeRef& _tn, bool *_reset_backoff) : RGWCoroutine(_sc->cct),
sc(_sc), sync_env(_sc->env),
@@ -2170,7 +2194,7 @@ public:
reenter(this) {
/* read sync status */
yield call(new RGWReadDataSyncStatusCoroutine(sc, &sync_status));
yield call(new RGWReadDataSyncStatusCoroutine(sc, &sync_status, objvs));
data_sync_module = sync_env->sync_module->get_data_handler();
@@ -2185,7 +2209,7 @@ public:
sync_status.sync_info.num_shards = num_shards;
uint64_t instance_id;
instance_id = ceph::util::generate_random_number<uint64_t>();
yield call(new RGWInitDataSyncStatusCoroutine(sc, num_shards, instance_id, tn, &sync_status));
yield call(new RGWInitDataSyncStatusCoroutine(sc, num_shards, instance_id, tn, &sync_status, objvs));
if (retcode < 0) {
tn->log(0, SSTR("ERROR: failed to init sync, retcode=" << retcode));
return set_cr_error(retcode);
@@ -2207,7 +2231,7 @@ public:
return set_cr_error(retcode);
}
/* state: building full sync maps */
yield call(new RGWListBucketIndexesCR(sc, &sync_status));
yield call(new RGWListBucketIndexesCR(sc, &sync_status, objvs));
if (retcode < 0) {
tn->log(0, SSTR("ERROR: failed to build full sync maps, retcode=" << retcode));
return set_cr_error(retcode);
@@ -2236,7 +2260,7 @@ public:
for (map<uint32_t, rgw_data_sync_marker>::iterator iter = sync_status.sync_markers.begin();
iter != sync_status.sync_markers.end(); ++iter) {
RGWDataSyncShardControlCR *cr = new RGWDataSyncShardControlCR(sc, sync_env->svc->zone->get_zone_params().log_pool,
iter->first, iter->second, sync_status, tn);
iter->first, iter->second, sync_status, objvs[iter->first], tn);
cr->get();
shard_crs_lock.lock();
shard_crs[iter->first] = cr;
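
Taken together, the hunks above establish one pattern: every coroutine that reads or writes a shard's rgw_data_sync_marker now carries that shard's RGWObjVersionTracker, drawn from a single std::vector<RGWObjVersionTracker> sized to num_shards. A rough condensation of the init path, using only the calls visible above (the surrounding coroutine wiring is elided):

    // Hypothetical condensation of the init path shown above.
    std::vector<RGWObjVersionTracker> objvs;
    objvs.resize(num_shards);                 // one tracker per shard object
    for (uint32_t i = 0; i < num_shards; i++) {
      objvs[i].generate_new_write_ver(cct);   // fresh write version per shard
      // spawn RGWSimpleRadosWriteCR with &objvs[i]: the RADOS write asserts
      // the tracked cls version and fails rather than overwrite newer data
    }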