Merge pull request #32534 from cbodley/wip-43512

rgw multisite: enforce spawn window for incremental data sync

Reviewed-by: Daniel Gryniewicz <dang@redhat.com>
Reviewed-by: Eric J. Ivancich <ivancich@redhat.com>
Reviewed-by: Shilpa Jagannath <smanjara@redhat.com>
This commit is contained in:
Casey Bodley 2020-01-13 14:44:47 -05:00 committed by GitHub
commit 01d7e32d07
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -1156,8 +1156,6 @@ class RGWDataSyncShardCR : public RGWCoroutine {
bool *reset_backoff; bool *reset_backoff;
set<string> spawned_keys;
boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr; boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
boost::intrusive_ptr<RGWCoroutinesStack> lease_stack; boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
string status_oid; string status_oid;
@ -1426,9 +1424,7 @@ public:
} }
omapkeys.reset(); omapkeys.reset();
#define INCREMENTAL_MAX_ENTRIES 100
tn->log(20, SSTR("shard_id=" << shard_id << " sync_marker=" << sync_marker.marker)); tn->log(20, SSTR("shard_id=" << shard_id << " sync_marker=" << sync_marker.marker));
spawned_keys.clear();
yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, sync_marker.marker, yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, sync_marker.marker,
&next_marker, &log_entries, &truncated)); &next_marker, &log_entries, &truncated));
if (retcode < 0 && retcode != -ENOENT) { if (retcode < 0 && retcode != -ENOENT) {
@ -1452,30 +1448,19 @@ public:
if (!marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp)) { if (!marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp)) {
tn->log(0, SSTR("ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?")); tn->log(0, SSTR("ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?"));
} else { } else {
/* spawn(new RGWDataSyncSingleEntryCR(sync_env, log_iter->entry.key, log_iter->log_id, marker_tracker, error_repo, false, tn), false);
* don't spawn the same key more than once. We can do that as long as we don't yield
*/
if (spawned_keys.find(log_iter->entry.key) == spawned_keys.end()) {
spawned_keys.insert(log_iter->entry.key);
spawn(new RGWDataSyncSingleEntryCR(sync_env, log_iter->entry.key, log_iter->log_id, marker_tracker, error_repo, false, tn), false);
if (retcode < 0) {
stop_spawned_services();
drain_all();
return set_cr_error(retcode);
}
}
} }
} while ((int)num_spawned() > spawn_window) {
while ((int)num_spawned() > spawn_window) { set_status() << "num_spawned() > spawn_window";
set_status() << "num_spawned() > spawn_window"; yield wait_for_child();
yield wait_for_child(); int ret;
int ret; while (collect(&ret, lease_stack.get())) {
while (collect(&ret, lease_stack.get())) { if (ret < 0) {
if (ret < 0) { tn->log(10, "a sync operation returned error");
tn->log(10, "a sync operation returned error"); /* we have reported this error */
/* we have reported this error */ }
/* not waiting for child here */
} }
/* not waiting for child here */
} }
} }