Merge pull request #23596 from cbodley/wip-rgw-no-datalog-marker

rgw multisite: incremental data sync uses truncated flag to detect end of listing

Reviewed-by: Yehuda Sadeh <yehuda@redhat.com>
commit c79aa2468c
Casey Bodley, 2018-08-28 11:30:31 -04:00 (committed by GitHub)

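For context: the change drops the separate RGWReadRemoteDataLogShardInfoCR fetch and the RemoteDatalogStatus guesswork, and instead ends an incremental pass when the datalog listing itself reports that it is not truncated. A minimal, self-contained C++ sketch of that control flow follows; the types and the list_shard() helper are illustrative stand-ins, not the RGW coroutine API:

#include <iostream>
#include <string>
#include <vector>

// stand-in for one datalog entry (not the RGW type)
struct LogEntry { std::string id; std::string key; };

// stand-in for a listing result; 'truncated' means more entries remain
struct LogListing {
  std::vector<LogEntry> entries;
  bool truncated = false;
};

// stand-in for RGWReadRemoteDataLogShardCR: list entries after 'marker'
LogListing list_shard(const std::vector<LogEntry>& shard, const std::string& marker,
                      size_t max_entries) {
  LogListing out;
  for (const auto& e : shard) {
    if (!marker.empty() && e.id <= marker) continue;  // skip entries at or before the marker
    if (out.entries.size() == max_entries) { out.truncated = true; break; }
    out.entries.push_back(e);
  }
  return out;
}

int main() {
  const std::vector<LogEntry> shard = {{"1_a", "bucket:a"}, {"2_b", "bucket:b"}, {"3_c", "bucket:c"}};
  std::string sync_marker;  // position already synced for this shard

  bool done = false;
  while (!done) {
    // one incremental pass: list, process, advance the marker
    LogListing l = list_shard(shard, sync_marker, 2 /* stand-in for INCREMENTAL_MAX_ENTRIES */);
    for (const auto& e : l.entries) {
      std::cout << "sync " << e.key << "\n";  // RGW would spawn RGWDataSyncSingleEntryCR here
      sync_marker = e.id;                     // RGW advances this via the marker tracker
    }
    done = !l.truncated;  // end of listing reached; RGW would idle before polling again
  }
  return 0;
}

The point, per the commit title, is that the end of the listing is detected from the listing response itself rather than inferred by comparing a separately fetched shard marker against sync_marker.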

@@ -1149,12 +1149,6 @@ public:
 #define BUCKET_SHARD_SYNC_SPAWN_WINDOW 20
 #define DATA_SYNC_MAX_ERR_ENTRIES 10
-enum RemoteDatalogStatus {
-  RemoteNotTrimmed = 0,
-  RemoteTrimmed = 1,
-  RemoteMightTrimmed = 2
-};
 class RGWDataSyncShardCR : public RGWCoroutine {
   RGWDataSyncEnv *sync_env;
@@ -1175,10 +1169,6 @@ class RGWDataSyncShardCR : public RGWCoroutine {
   list<rgw_data_change_log_entry>::iterator log_iter;
   bool truncated;
-  RGWDataChangesLogInfo shard_info;
-  string datalog_marker;
-  RemoteDatalogStatus remote_trimmed;
   Mutex inc_lock;
   Cond inc_cond;
@@ -1230,7 +1220,7 @@ public:
       pool(_pool),
       shard_id(_shard_id),
       sync_marker(_marker),
-      marker_tracker(NULL), truncated(false), remote_trimmed(RemoteNotTrimmed), inc_lock("RGWDataSyncShardCR::inc_lock"),
+      marker_tracker(NULL), truncated(false), inc_lock("RGWDataSyncShardCR::inc_lock"),
       total_entries(0), spawn_window(BUCKET_SHARD_SYNC_SPAWN_WINDOW), reset_backoff(NULL),
       lease_cr(nullptr), lease_stack(nullptr), error_repo(nullptr), max_error_entries(DATA_SYNC_MAX_ERR_ENTRIES),
       retry_backoff_secs(RETRY_BACKOFF_SECS_DEFAULT), tn(_tn) {
@@ -1445,81 +1435,65 @@ public:
         }
-        yield call(new RGWReadRemoteDataLogShardInfoCR(sync_env, shard_id, &shard_info));
+#define INCREMENTAL_MAX_ENTRIES 100
+        tn->log(20, SSTR("shard_id=" << shard_id << " sync_marker=" << sync_marker.marker));
+        spawned_keys.clear();
+        yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, &sync_marker.marker, &log_entries, &truncated));
         if (retcode < 0) {
-          tn->log(0, SSTR("ERROR: failed to fetch remote data log info: ret=" << retcode));
+          tn->log(0, SSTR("ERROR: failed to read remote data log info: ret=" << retcode));
           stop_spawned_services();
           drain_all();
           return set_cr_error(retcode);
         }
-        datalog_marker = shard_info.marker;
-        remote_trimmed = RemoteNotTrimmed;
-#define INCREMENTAL_MAX_ENTRIES 100
-        tn->log(20, SSTR("shard_id=" << shard_id << " datalog_marker=" << datalog_marker << " sync_marker.marker=" << sync_marker.marker));
-        if (datalog_marker > sync_marker.marker) {
-          spawned_keys.clear();
-          if (sync_marker.marker.empty())
-            remote_trimmed = RemoteMightTrimmed; //remote data log shard might be trimmed;
-          yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, &sync_marker.marker, &log_entries, &truncated));
-          if (retcode < 0) {
-            tn->log(0, SSTR("ERROR: failed to read remote data log info: ret=" << retcode));
-            stop_spawned_services();
-            drain_all();
-            return set_cr_error(retcode);
-          }
-          if ((remote_trimmed == RemoteMightTrimmed) && sync_marker.marker.empty() && log_entries.empty())
-            remote_trimmed = RemoteTrimmed;
-          else
-            remote_trimmed = RemoteNotTrimmed;
         if (log_entries.size() > 0) {
           tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
         }
         for (log_iter = log_entries.begin(); log_iter != log_entries.end(); ++log_iter) {
           tn->log(20, SSTR("shard_id=" << shard_id << " log_entry: " << log_iter->log_id << ":" << log_iter->log_timestamp << ":" << log_iter->entry.key));
           if (!marker_tracker->index_key_to_marker(log_iter->entry.key, log_iter->log_id)) {
             tn->log(20, SSTR("skipping sync of entry: " << log_iter->log_id << ":" << log_iter->entry.key << " sync already in progress for bucket shard"));
             marker_tracker->try_update_high_marker(log_iter->log_id, 0, log_iter->log_timestamp);
             continue;
           }
           if (!marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp)) {
             tn->log(0, SSTR("ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?"));
           } else {
             /*
              * don't spawn the same key more than once. We can do that as long as we don't yield
              */
             if (spawned_keys.find(log_iter->entry.key) == spawned_keys.end()) {
               spawned_keys.insert(log_iter->entry.key);
               spawn(new RGWDataSyncSingleEntryCR(sync_env, log_iter->entry.key, log_iter->log_id, marker_tracker, error_repo, false, tn), false);
               if (retcode < 0) {
                 stop_spawned_services();
                 drain_all();
                 return set_cr_error(retcode);
               }
             }
           }
           while ((int)num_spawned() > spawn_window) {
             set_status() << "num_spawned() > spawn_window";
             yield wait_for_child();
             int ret;
             while (collect(&ret, lease_stack.get())) {
               if (ret < 0) {
                 ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
                 /* we have reported this error */
               }
               /* not waiting for child here */
             }
           }
         }
-        }
-        tn->log(20, SSTR("shard_id=" << shard_id << " datalog_marker=" << datalog_marker << " sync_marker.marker=" << sync_marker.marker));
-        if (datalog_marker == sync_marker.marker || remote_trimmed == RemoteTrimmed) {
+        tn->log(20, SSTR("shard_id=" << shard_id << " sync_marker=" << sync_marker.marker << " truncated=" << truncated));
+        if (!truncated) {
+          // we reached the end, wait a while before checking for more
           tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
 #define INCREMENTAL_INTERVAL 20
           yield wait(utime_t(INCREMENTAL_INTERVAL, 0));
         }
       } while (true);
     }
     return 0;