diff --git a/src/rgw/driver/rados/rgw_data_sync.cc b/src/rgw/driver/rados/rgw_data_sync.cc index 5e91f3d24b6..4207f4ddcd1 100644 --- a/src/rgw/driver/rados/rgw_data_sync.cc +++ b/src/rgw/driver/rados/rgw_data_sync.cc @@ -1862,6 +1862,7 @@ class RGWDataIncSyncShardCR : public RGWDataBaseSyncShardCR { vector log_entries; decltype(log_entries)::iterator log_iter; bool truncated = false; + int cbret = 0; utime_t get_idle_interval() const { ceph::timespan interval = std::chrono::seconds(cct->_conf->rgw_data_sync_poll_interval); @@ -1920,8 +1921,12 @@ public: modified_iter != current_modified.end(); ++modified_iter) { if (!lease_cr->is_locked()) { - yield call(marker_tracker->flush()); drain_all(); + yield call(marker_tracker->flush()); + if (retcode < 0) { + tn->log(0, SSTR("ERROR: data sync marker_tracker.flush() returned retcode=" << retcode)); + return set_cr_error(retcode); + } return set_cr_error(-ECANCELED); } retcode = parse_bucket_key(modified_iter->key, source_bs); @@ -1950,8 +1955,12 @@ public: iter = error_entries.begin(); for (; iter != error_entries.end(); ++iter) { if (!lease_cr->is_locked()) { - yield call(marker_tracker->flush()); drain_all(); + yield call(marker_tracker->flush()); + if (retcode < 0) { + tn->log(0, SSTR("ERROR: data sync marker_tracker.flush() returned retcode=" << retcode)); + return set_cr_error(retcode); + } return set_cr_error(-ECANCELED); } error_marker = iter->first; @@ -2013,8 +2022,12 @@ public: log_iter != log_entries.end(); ++log_iter) { if (!lease_cr->is_locked()) { - yield call(marker_tracker->flush()); drain_all(); + yield call(marker_tracker->flush()); + if (retcode < 0) { + tn->log(0, SSTR("ERROR: data sync marker_tracker.flush() returned retcode=" << retcode)); + return set_cr_error(retcode); + } return set_cr_error(-ECANCELED); } @@ -2032,17 +2045,25 @@ public: tn->log(0, SSTR("ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?")); } else { - tn->log(1, SSTR("incremental sync on " << log_iter->entry.key - << "shard: " << shard_id << "on gen " - << log_iter->entry.gen)); - yield_spawn_window( - data_sync_single_entry(sc, source_bs,log_iter->entry.gen, - log_iter->log_id, log_iter->log_timestamp, - lease_cr,bucket_shard_cache, - &*marker_tracker, error_repo, tn, false), - sc->lcc.adj_concurrency(cct->_conf->rgw_data_sync_spawn_window), std::nullopt); + tn->log(1, SSTR("incremental sync on " << log_iter->entry.key << "shard: " << shard_id << "on gen " << log_iter->entry.gen)); + yield_spawn_window(data_sync_single_entry(sc, source_bs, log_iter->entry.gen, log_iter->log_id, + log_iter->log_timestamp, lease_cr,bucket_shard_cache, + &*marker_tracker, error_repo, tn, false), + sc->lcc.adj_concurrency(cct->_conf->rgw_data_sync_spawn_window), + [&](uint64_t stack_id, int ret) { + if (ret < 0) { + tn->log(10, SSTR("data_sync_single_entry returned error: " << ret)); + cbret = ret; + } + return 0; + }); } } + if (cbret < 0 ) { + retcode = cbret; + drain_all(); + return set_cr_error(retcode); + } tn->log(20, SSTR("shard_id=" << shard_id << " sync_marker="<< sync_marker.marker @@ -4475,9 +4496,13 @@ int RGWBucketFullSyncCR::operate(const DoutPrefixProvider *dpp) total_entries = sync_status.full.count; do { if (lease_cr && !lease_cr->is_locked()) { - drain_all(); - yield call(marker_tracker.flush()); tn->log(1, "no lease or lease is lost, abort"); + drain_all(); + yield call(marker_tracker.flush()); + if (retcode < 0) { + tn->log(0, SSTR("ERROR: bucket full sync marker_tracker.flush() returned retcode=" << retcode)); + return set_cr_error(retcode); + } return set_cr_error(-ECANCELED); } set_status("listing remote bucket"); @@ -4505,6 +4530,10 @@ int RGWBucketFullSyncCR::operate(const DoutPrefixProvider *dpp) drain_all(); yield call(marker_tracker.flush()); tn->log(1, "no lease or lease is lost, abort"); + if (retcode < 0) { + tn->log(0, SSTR("ERROR: bucket full sync marker_tracker.flush() returned retcode=" << retcode)); + return set_cr_error(retcode); + } return set_cr_error(-ECANCELED); } tn->log(20, SSTR("[full sync] syncing object: " @@ -4552,11 +4581,15 @@ int RGWBucketFullSyncCR::operate(const DoutPrefixProvider *dpp) if (lease_cr && !lease_cr->is_locked()) { tn->log(1, "no lease or lease is lost, abort"); yield call(marker_tracker.flush()); + if (retcode < 0) { + tn->log(0, SSTR("ERROR: bucket full sync marker_tracker.flush() returned retcode=" << retcode)); + return set_cr_error(retcode); + } return set_cr_error(-ECANCELED); } yield call(marker_tracker.flush()); if (retcode < 0) { - tn->log(0, SSTR("ERROR: marker_tracker.flush() returned retcode=" << retcode)); + tn->log(0, SSTR("ERROR: bucket full sync marker_tracker.flush() returned retcode=" << retcode)); return set_cr_error(retcode); } /* update sync state to incremental */ @@ -4741,9 +4774,13 @@ int RGWBucketShardIncrementalSyncCR::operate(const DoutPrefixProvider *dpp) reenter(this) { do { if (lease_cr && !lease_cr->is_locked()) { + tn->log(1, "no lease or lease is lost, abort"); drain_all(); yield call(marker_tracker.flush()); - tn->log(1, "no lease or lease is lost, abort"); + if (retcode < 0) { + tn->log(0, SSTR("ERROR: incremental sync marker_tracker.flush() returned retcode=" << retcode)); + return set_cr_error(retcode); + } return set_cr_error(-ECANCELED); } tn->log(20, SSTR("listing bilog for incremental sync; position=" << sync_info.inc_marker.position)); @@ -4799,9 +4836,13 @@ int RGWBucketShardIncrementalSyncCR::operate(const DoutPrefixProvider *dpp) entries_iter = list_result.begin(); for (; entries_iter != entries_end; ++entries_iter) { if (lease_cr && !lease_cr->is_locked()) { - drain_all(); - yield call(marker_tracker.flush()); tn->log(1, "no lease or lease is lost, abort"); + drain_all(); + yield call(marker_tracker.flush()); + if (retcode < 0) { + tn->log(0, SSTR("ERROR: incremental sync marker_tracker.flush() returned retcode=" << retcode)); + return set_cr_error(retcode); + } return set_cr_error(-ECANCELED); } entry = &(*entries_iter); @@ -4957,7 +4998,7 @@ int RGWBucketShardIncrementalSyncCR::operate(const DoutPrefixProvider *dpp) yield call(marker_tracker.flush()); if (retcode < 0) { - tn->log(0, SSTR("ERROR: marker_tracker.flush() returned retcode=" << retcode)); + tn->log(0, SSTR("ERROR: incremental sync marker_tracker.flush() returned retcode=" << retcode)); return set_cr_error(retcode); } if (sync_status < 0) { diff --git a/src/rgw/driver/rados/rgw_sync.cc b/src/rgw/driver/rados/rgw_sync.cc index 081bc7772e9..d0ec90796d9 100644 --- a/src/rgw/driver/rados/rgw_sync.cc +++ b/src/rgw/driver/rados/rgw_sync.cc @@ -1194,6 +1194,10 @@ int RGWLastCallerWinsCR::operate(const DoutPrefixProvider *dpp) { cr = nullptr; yield call(call_cr); /* cr might have been modified at this point */ + if (retcode < 0) { + ldpp_dout(dpp, 0) << "ERROR: RGWLastCallerWinsCR() failed: retcode=" << retcode << dendl; + return set_cr_error(retcode); + } } return set_cr_done(); }