rgw/multisite: return error from RGWLastCallerWinsCR() to track marker update failures

Signed-off-by: Shilpa Jagannath <smanjara@redhat.com>
This commit is contained in:
Shilpa Jagannath 2022-12-20 10:37:55 -05:00 committed by Adam C. Emerson
parent dc4685f693
commit fb92311b3e
2 changed files with 64 additions and 19 deletions

View File

@ -1862,6 +1862,7 @@ class RGWDataIncSyncShardCR : public RGWDataBaseSyncShardCR {
vector<rgw_data_change_log_entry> log_entries;
decltype(log_entries)::iterator log_iter;
bool truncated = false;
int cbret = 0;
utime_t get_idle_interval() const {
ceph::timespan interval = std::chrono::seconds(cct->_conf->rgw_data_sync_poll_interval);
@ -1920,8 +1921,12 @@ public:
modified_iter != current_modified.end();
++modified_iter) {
if (!lease_cr->is_locked()) {
yield call(marker_tracker->flush());
drain_all();
yield call(marker_tracker->flush());
if (retcode < 0) {
tn->log(0, SSTR("ERROR: data sync marker_tracker.flush() returned retcode=" << retcode));
return set_cr_error(retcode);
}
return set_cr_error(-ECANCELED);
}
retcode = parse_bucket_key(modified_iter->key, source_bs);
@ -1950,8 +1955,12 @@ public:
iter = error_entries.begin();
for (; iter != error_entries.end(); ++iter) {
if (!lease_cr->is_locked()) {
yield call(marker_tracker->flush());
drain_all();
yield call(marker_tracker->flush());
if (retcode < 0) {
tn->log(0, SSTR("ERROR: data sync marker_tracker.flush() returned retcode=" << retcode));
return set_cr_error(retcode);
}
return set_cr_error(-ECANCELED);
}
error_marker = iter->first;
@ -2013,8 +2022,12 @@ public:
log_iter != log_entries.end();
++log_iter) {
if (!lease_cr->is_locked()) {
yield call(marker_tracker->flush());
drain_all();
yield call(marker_tracker->flush());
if (retcode < 0) {
tn->log(0, SSTR("ERROR: data sync marker_tracker.flush() returned retcode=" << retcode));
return set_cr_error(retcode);
}
return set_cr_error(-ECANCELED);
}
@ -2032,16 +2045,24 @@ public:
tn->log(0, SSTR("ERROR: cannot start syncing " << log_iter->log_id
<< ". Duplicate entry?"));
} else {
tn->log(1, SSTR("incremental sync on " << log_iter->entry.key
<< "shard: " << shard_id << "on gen "
<< log_iter->entry.gen));
yield_spawn_window(
data_sync_single_entry(sc, source_bs,log_iter->entry.gen,
log_iter->log_id, log_iter->log_timestamp,
lease_cr,bucket_shard_cache,
tn->log(1, SSTR("incremental sync on " << log_iter->entry.key << "shard: " << shard_id << "on gen " << log_iter->entry.gen));
yield_spawn_window(data_sync_single_entry(sc, source_bs, log_iter->entry.gen, log_iter->log_id,
log_iter->log_timestamp, lease_cr,bucket_shard_cache,
&*marker_tracker, error_repo, tn, false),
sc->lcc.adj_concurrency(cct->_conf->rgw_data_sync_spawn_window), std::nullopt);
sc->lcc.adj_concurrency(cct->_conf->rgw_data_sync_spawn_window),
[&](uint64_t stack_id, int ret) {
if (ret < 0) {
tn->log(10, SSTR("data_sync_single_entry returned error: " << ret));
cbret = ret;
}
return 0;
});
}
}
if (cbret < 0 ) {
retcode = cbret;
drain_all();
return set_cr_error(retcode);
}
tn->log(20, SSTR("shard_id=" << shard_id <<
@ -4475,9 +4496,13 @@ int RGWBucketFullSyncCR::operate(const DoutPrefixProvider *dpp)
total_entries = sync_status.full.count;
do {
if (lease_cr && !lease_cr->is_locked()) {
tn->log(1, "no lease or lease is lost, abort");
drain_all();
yield call(marker_tracker.flush());
tn->log(1, "no lease or lease is lost, abort");
if (retcode < 0) {
tn->log(0, SSTR("ERROR: bucket full sync marker_tracker.flush() returned retcode=" << retcode));
return set_cr_error(retcode);
}
return set_cr_error(-ECANCELED);
}
set_status("listing remote bucket");
@ -4505,6 +4530,10 @@ int RGWBucketFullSyncCR::operate(const DoutPrefixProvider *dpp)
drain_all();
yield call(marker_tracker.flush());
tn->log(1, "no lease or lease is lost, abort");
if (retcode < 0) {
tn->log(0, SSTR("ERROR: bucket full sync marker_tracker.flush() returned retcode=" << retcode));
return set_cr_error(retcode);
}
return set_cr_error(-ECANCELED);
}
tn->log(20, SSTR("[full sync] syncing object: "
@ -4552,11 +4581,15 @@ int RGWBucketFullSyncCR::operate(const DoutPrefixProvider *dpp)
if (lease_cr && !lease_cr->is_locked()) {
tn->log(1, "no lease or lease is lost, abort");
yield call(marker_tracker.flush());
if (retcode < 0) {
tn->log(0, SSTR("ERROR: bucket full sync marker_tracker.flush() returned retcode=" << retcode));
return set_cr_error(retcode);
}
return set_cr_error(-ECANCELED);
}
yield call(marker_tracker.flush());
if (retcode < 0) {
tn->log(0, SSTR("ERROR: marker_tracker.flush() returned retcode=" << retcode));
tn->log(0, SSTR("ERROR: bucket full sync marker_tracker.flush() returned retcode=" << retcode));
return set_cr_error(retcode);
}
/* update sync state to incremental */
@ -4741,9 +4774,13 @@ int RGWBucketShardIncrementalSyncCR::operate(const DoutPrefixProvider *dpp)
reenter(this) {
do {
if (lease_cr && !lease_cr->is_locked()) {
tn->log(1, "no lease or lease is lost, abort");
drain_all();
yield call(marker_tracker.flush());
tn->log(1, "no lease or lease is lost, abort");
if (retcode < 0) {
tn->log(0, SSTR("ERROR: incremental sync marker_tracker.flush() returned retcode=" << retcode));
return set_cr_error(retcode);
}
return set_cr_error(-ECANCELED);
}
tn->log(20, SSTR("listing bilog for incremental sync; position=" << sync_info.inc_marker.position));
@ -4799,9 +4836,13 @@ int RGWBucketShardIncrementalSyncCR::operate(const DoutPrefixProvider *dpp)
entries_iter = list_result.begin();
for (; entries_iter != entries_end; ++entries_iter) {
if (lease_cr && !lease_cr->is_locked()) {
tn->log(1, "no lease or lease is lost, abort");
drain_all();
yield call(marker_tracker.flush());
tn->log(1, "no lease or lease is lost, abort");
if (retcode < 0) {
tn->log(0, SSTR("ERROR: incremental sync marker_tracker.flush() returned retcode=" << retcode));
return set_cr_error(retcode);
}
return set_cr_error(-ECANCELED);
}
entry = &(*entries_iter);
@ -4957,7 +4998,7 @@ int RGWBucketShardIncrementalSyncCR::operate(const DoutPrefixProvider *dpp)
yield call(marker_tracker.flush());
if (retcode < 0) {
tn->log(0, SSTR("ERROR: marker_tracker.flush() returned retcode=" << retcode));
tn->log(0, SSTR("ERROR: incremental sync marker_tracker.flush() returned retcode=" << retcode));
return set_cr_error(retcode);
}
if (sync_status < 0) {

View File

@ -1194,6 +1194,10 @@ int RGWLastCallerWinsCR::operate(const DoutPrefixProvider *dpp) {
cr = nullptr;
yield call(call_cr);
/* cr might have been modified at this point */
if (retcode < 0) {
ldpp_dout(dpp, 0) << "ERROR: RGWLastCallerWinsCR() failed: retcode=" << retcode << dendl;
return set_cr_error(retcode);
}
}
return set_cr_done();
}