Merge pull request #8190 from yehudasa/wip-rgw-sync-fixes-4

Wip rgw sync fixes 4

Reviewed-by: Casey Bodley <cbodley@redhat.com>
This commit is contained in:
Yehuda Sadeh 2016-03-23 15:23:16 -07:00
commit f2f26465d7
8 changed files with 250 additions and 53 deletions

View File

@ -331,6 +331,7 @@ enum {
OPT_SYNC_ERROR_LIST,
OPT_BILOG_LIST,
OPT_BILOG_TRIM,
OPT_BILOG_STATUS,
OPT_DATA_SYNC_STATUS,
OPT_DATA_SYNC_INIT,
OPT_DATA_SYNC_RUN,
@ -681,6 +682,8 @@ static int get_cmd(const char *cmd, const char *prev_cmd, const char *prev_prev_
return OPT_BILOG_LIST;
if (strcmp(cmd, "trim") == 0)
return OPT_BILOG_TRIM;
if (strcmp(cmd, "status") == 0)
return OPT_BILOG_STATUS;
} else if (strcmp(prev_cmd, "data") == 0) {
if (strcmp(cmd, "sync") == 0) {
*need_more = true;
@ -5224,6 +5227,29 @@ next:
}
}
if (opt_cmd == OPT_BILOG_STATUS) {
if (bucket_name.empty()) {
cerr << "ERROR: bucket not specified" << std::endl;
return -EINVAL;
}
RGWBucketInfo bucket_info;
int ret = init_bucket(tenant, bucket_name, bucket_id, bucket_info, bucket);
if (ret < 0) {
cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl;
return -ret;
}
map<int, string> markers;
ret = store->get_bi_log_status(bucket, shard_id, markers);
if (ret < 0) {
cerr << "ERROR: trim_bi_log_entries(): " << cpp_strerror(-ret) << std::endl;
return -ret;
}
formatter->open_object_section("entries");
encode_json("markers", markers, formatter);
formatter->close_section();
formatter->flush(cout);
}
if (opt_cmd == OPT_DATALOG_LIST) {
formatter->open_array_section("entries");
bool truncated;

View File

@ -460,6 +460,7 @@ int RGWAsyncFetchRemoteObj::_send_request()
user_id,
client_id,
op_id,
false, /* don't record op state in ops log */
NULL, /* req_info */
source_zone,
dest_obj,

View File

@ -551,10 +551,10 @@ static string full_data_sync_index_shard_oid(const string& source_zone, int shar
struct bucket_instance_meta_info {
string key;
obj_version ver;
time_t mtime;
utime_t mtime;
RGWBucketInstanceMetadataObject data;
bucket_instance_meta_info() : mtime(0) {}
bucket_instance_meta_info() {}
void decode_json(JSONObj *obj) {
JSONDecoder::decode_json("key", key, obj);
@ -2202,6 +2202,8 @@ class RGWBucketShardIncrementalSyncCR : public RGWCoroutine {
string instance;
string ns;
string cur_id;
public:
@ -2270,32 +2272,44 @@ int RGWBucketShardIncrementalSyncCR::operate()
entries_iter = list_result.begin();
for (; entries_iter != list_result.end(); ++entries_iter) {
entry = &(*entries_iter);
inc_marker.position = entry->id;
{
ssize_t p = entry->id.find('#'); /* entries might have explicit shard info in them, e.g., 6#00000000004.94.3 */
if (p < 0) {
cur_id = entry->id;
} else {
cur_id = entry->id.substr(p + 1);
}
}
inc_marker.position = cur_id;
if (!rgw_obj::parse_raw_oid(entries_iter->object, &name, &instance, &ns)) {
set_status() << "parse_raw_oid() on " << entries_iter->object << " returned false, skipping entry";
ldout(sync_env->cct, 20) << "parse_raw_oid() on " << entries_iter->object << " returned false, skipping entry" << dendl;
marker_tracker->try_update_high_marker(cur_id, 0, entries_iter->timestamp);
continue;
}
ldout(sync_env->cct, 20) << "parsed entry: iter->object=" << entries_iter->object << " iter->instance=" << entries_iter->instance << " name=" << name << " instance=" << instance << " ns=" << ns << dendl;
ldout(sync_env->cct, 20) << "parsed entry: id=" << cur_id << " iter->object=" << entry->object << " iter->instance=" << entry->instance << " name=" << name << " instance=" << instance << " ns=" << ns << dendl;
if (!ns.empty()) {
set_status() << "skipping entry in namespace: " << entries_iter->object;
ldout(sync_env->cct, 20) << "skipping entry in namespace: " << entries_iter->object << dendl;
set_status() << "skipping entry in namespace: " << entry->object;
ldout(sync_env->cct, 20) << "skipping entry in namespace: " << entry->object << dendl;
marker_tracker->try_update_high_marker(cur_id, 0, entry->timestamp);
continue;
}
key = rgw_obj_key(name, entries_iter->instance);
set_status() << "got entry.id=" << entry->id << " key=" << key << " op=" << (int)entry->op;
key = rgw_obj_key(name, entry->instance);
set_status() << "got entry.id=" << cur_id << " key=" << key << " op=" << (int)entry->op;
if (entry->op == CLS_RGW_OP_CANCEL) {
set_status() << "canceled operation, skipping";
ldout(sync_env->cct, 20) << "[inc sync] skipping object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << key << ": canceled operation" << dendl;
marker_tracker->try_update_high_marker(cur_id, 0, entry->timestamp);
continue;
}
if (entry->state != CLS_RGW_STATE_COMPLETE) {
set_status() << "non-complete operation, skipping";
ldout(sync_env->cct, 20) << "[inc sync] skipping object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << key << ": non-complete operation" << dendl;
marker_tracker->try_update_high_marker(cur_id, 0, entry->timestamp);
continue;
}
ldout(sync_env->cct, 20) << "[inc sync] syncing object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << key << dendl;
@ -2309,26 +2323,26 @@ int RGWBucketShardIncrementalSyncCR::operate()
yield wait_for_child();
}
if (!marker_tracker->index_key_to_marker(key, entry->op, entry->id)) {
if (!marker_tracker->index_key_to_marker(key, entry->op, cur_id)) {
set_status() << "can't do op, sync already in progress for object";
ldout(sync_env->cct, 20) << __func__ << ": skipping sync of entry: " << entry->id << ":" << key << " sync already in progress for object" << dendl;
marker_tracker->try_update_high_marker(entry->id, 0, entries_iter->timestamp);
ldout(sync_env->cct, 20) << __func__ << ": skipping sync of entry: " << cur_id << ":" << key << " sync already in progress for object" << dendl;
marker_tracker->try_update_high_marker(cur_id, 0, entry->timestamp);
continue;
}
// yield {
set_status() << "start object sync";
if (!marker_tracker->start(entry->id, 0, entries_iter->timestamp)) {
ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << entry->id << ". Duplicate entry?" << dendl;
if (!marker_tracker->start(cur_id, 0, entry->timestamp)) {
ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << cur_id << ". Duplicate entry?" << dendl;
} else {
uint64_t versioned_epoch = 0;
bucket_entry_owner owner(entry->owner, entry->owner_display_name);
if (entry->ver.pool < 0) {
versioned_epoch = entry->ver.epoch;
}
ldout(sync_env->cct, 0) << __FILE__ << ":" << __LINE__ << " entry->timestamp=" << entry->timestamp << dendl;
ldout(sync_env->cct, 20) << __func__ << "(): entry->timestamp=" << entry->timestamp << dendl;
spawn(new RGWBucketSyncSingleEntryCR<string, rgw_obj_key>(sync_env, bucket_info, shard_id,
key, entry->is_versioned(), versioned_epoch, entry->timestamp, owner, entry->op,
entry->state, entry->id, marker_tracker), false);
entry->state, cur_id, marker_tracker), false);
}
// }
while ((int)num_spawned() > spawn_window) {
@ -2345,9 +2359,20 @@ ldout(sync_env->cct, 0) << __FILE__ << ":" << __LINE__ << " entry->timestamp=" <
}
} while (!list_result.empty());
yield {
call(marker_tracker->flush());
}
if (retcode < 0) {
ldout(sync_env->cct, 0) << "ERROR: marker_tracker->flush() returned retcode=" << retcode << dendl;
lease_cr->go_down();
drain_all();
return set_cr_error(retcode);
}
lease_cr->go_down();
/* wait for all operations to complete */
drain_all();
return set_cr_done();
}
return 0;

View File

@ -6338,6 +6338,7 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
const rgw_user& user_id,
const string& client_id,
const string& op_id,
bool record_op_state,
req_info *info,
const string& source_zone,
rgw_obj& dest_obj,
@ -6411,14 +6412,20 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
string obj_name = dest_obj.bucket.name + "/" + dest_obj.get_object();
RGWOpStateSingleOp opstate(this, client_id, op_id, obj_name);
RGWOpStateSingleOp *opstate = NULL;
ret = opstate.set_state(RGWOpState::OPSTATE_IN_PROGRESS);
if (ret < 0) {
ldout(cct, 0) << "ERROR: failed to set opstate ret=" << ret << dendl;
return ret;
if (record_op_state) {
opstate = new RGWOpStateSingleOp(this, client_id, op_id, obj_name);
ret = opstate->set_state(RGWOpState::OPSTATE_IN_PROGRESS);
if (ret < 0) {
ldout(cct, 0) << "ERROR: failed to set opstate ret=" << ret << dendl;
delete opstate;
return ret;
}
}
RGWRadosPutObj cb(&processor, &opstate, progress_cb, progress_data);
RGWRadosPutObj cb(&processor, opstate, progress_cb, progress_data);
string etag;
map<string, string> req_headers;
real_time set_mtime;
@ -6547,21 +6554,27 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
goto set_err_state;
}
ret = opstate.set_state(RGWOpState::OPSTATE_COMPLETE);
if (ret < 0) {
ldout(cct, 0) << "ERROR: failed to set opstate ret=" << ret << dendl;
if (opstate) {
ret = opstate->set_state(RGWOpState::OPSTATE_COMPLETE);
if (ret < 0) {
ldout(cct, 0) << "ERROR: failed to set opstate ret=" << ret << dendl;
}
delete opstate;
}
return 0;
set_err_state:
RGWOpState::OpState state = RGWOpState::OPSTATE_ERROR;
if (copy_if_newer && ret == -ERR_NOT_MODIFIED) {
state = RGWOpState::OPSTATE_COMPLETE;
ret = 0;
}
int r = opstate.set_state(state);
if (r < 0) {
ldout(cct, 0) << "ERROR: failed to set opstate r=" << ret << dendl;
if (opstate) {
RGWOpState::OpState state = RGWOpState::OPSTATE_ERROR;
if (copy_if_newer && ret == -ERR_NOT_MODIFIED) {
state = RGWOpState::OPSTATE_COMPLETE;
ret = 0;
}
int r = opstate->set_state(state);
if (r < 0) {
ldout(cct, 0) << "ERROR: failed to set opstate r=" << ret << dendl;
}
delete opstate;
}
return ret;
}
@ -6662,7 +6675,7 @@ int RGWRados::copy_obj(RGWObjectCtx& obj_ctx,
ldout(cct, 5) << "Copy object " << src_obj.bucket << ":" << src_obj.get_object() << " => " << dest_obj.bucket << ":" << dest_obj.get_object() << dendl;
if (remote_src || !source_zone.empty()) {
return fetch_remote_obj(obj_ctx, user_id, client_id, op_id, info, source_zone,
return fetch_remote_obj(obj_ctx, user_id, client_id, op_id, true, info, source_zone,
dest_obj, src_obj, dest_bucket_info, src_bucket_info, src_mtime, mtime, mod_ptr,
unmod_ptr, high_precision_time,
if_match, if_nomatch, attrs_mod, copy_if_newer, attrs, category,
@ -8574,7 +8587,20 @@ int RGWRados::Bucket::UpdateIndex::cancel()
ldout(store->ctx(), 5) << "failed to get BucketShard object: ret=" << ret << dendl;
return ret;
}
return store->cls_obj_complete_cancel(*bs, optag, obj, bilog_flags);
ret = store->cls_obj_complete_cancel(*bs, optag, obj, bilog_flags);
/*
* need to update data log anyhow, so that whoever follows needs to update its internal markers
* for following the specific bucket shard log. Otherwise they end up staying behind, and users
* have no way to tell that they're all caught up
*/
int r = store->data_log->add_entry(bs->bucket, bs->shard_id);
if (r < 0) {
lderr(store->ctx()) << "ERROR: failed writing data log" << dendl;
}
return ret;
}
int RGWRados::Object::Read::read(int64_t ofs, int64_t end, bufferlist& bl)
@ -10049,6 +10075,30 @@ int RGWRados::get_bucket_stats(rgw_bucket& bucket, int shard_id, string *bucket_
return 0;
}
int RGWRados::get_bi_log_status(rgw_bucket& bucket, int shard_id,
map<int, string>& markers)
{
map<string, rgw_bucket_dir_header> headers;
map<int, string> bucket_instance_ids;
int r = cls_bucket_head(bucket, shard_id, headers, &bucket_instance_ids);
if (r < 0)
return r;
assert(headers.size() == bucket_instance_ids.size());
map<string, rgw_bucket_dir_header>::iterator iter = headers.begin();
map<int, string>::iterator viter = bucket_instance_ids.begin();
for(; iter != headers.end(); ++iter, ++viter) {
if (shard_id >= 0) {
markers[shard_id] = iter->second.max_marker;
} else {
markers[viter->first] = iter->second.max_marker;
}
}
return 0;
}
class RGWGetBucketStatsContext : public RGWGetDirHeader_CB {
RGWGetBucketStats_CB *cb;
uint32_t pendings;

View File

@ -2430,6 +2430,7 @@ public:
const rgw_user& user_id,
const string& client_id,
const string& op_id,
bool record_op_state,
req_info *info,
const string& source_zone,
rgw_obj& dest_obj,
@ -2749,6 +2750,7 @@ public:
int cls_bucket_head_async(rgw_bucket& bucket, int shard_id, RGWGetDirHeader_CB *ctx, int *num_aio);
int list_bi_log_entries(rgw_bucket& bucket, int shard_id, string& marker, uint32_t max, std::list<rgw_bi_log_entry>& result, bool *truncated);
int trim_bi_log_entries(rgw_bucket& bucket, int shard_id, string& marker, string& end_marker);
int get_bi_log_status(rgw_bucket& bucket, int shard_id, map<int, string>& max_marker);
int bi_get_instance(rgw_obj& obj, rgw_bucket_dir_entry *dirent);
int bi_get(rgw_bucket& bucket, rgw_obj& obj, BIIndexType index_type, rgw_cls_bi_entry *entry);

View File

@ -712,7 +712,7 @@ public:
int RGWReadSyncStatusCoroutine::handle_data(rgw_meta_sync_info& data)
{
if (retcode == -ENOENT) {
return retcode;
return 0;
}
RGWRados *store = sync_env->store;
@ -1878,6 +1878,10 @@ int RGWRemoteMetaLog::run_sync()
}
do {
if (going_down.read()) {
ldout(store->ctx(), 1) << __func__ << "(): going down" << dendl;
return 0;
}
r = run(new RGWReadSyncStatusCoroutine(&sync_env, obj_ctx, &sync_status));
if (r < 0 && r != -ENOENT) {
ldout(store->ctx(), 0) << "ERROR: failed to fetch sync status r=" << r << dendl;

View File

@ -294,6 +294,7 @@ class RGWSyncShardMarkerTrack {
typename std::map<T, marker_entry> pending;
T high_marker;
T last_stored_marker;
marker_entry high_entry;
int window_size;
@ -355,14 +356,19 @@ public:
updates_since_flush++;
if (is_first && (updates_since_flush >= window_size || pending.empty())) {
return update_marker(high_marker, high_entry);
return flush();
}
return NULL;
}
RGWCoroutine *update_marker(const T& new_marker, marker_entry& entry) {
RGWCoroutine *flush() {
if (last_stored_marker == high_marker) {
return NULL;
}
updates_since_flush = 0;
return store_marker(new_marker, entry.pos, entry.timestamp);
last_stored_marker = high_marker;
return store_marker(high_marker, high_entry.pos, high_entry.timestamp);
}
/*

View File

@ -304,6 +304,30 @@ class RGWRealm:
return (num_shards, markers)
def bucket_sync_status(self, target_zone, source_zone, bucket_name):
if target_zone.zone_name == source_zone.zone_name:
return None
while True:
(bucket_sync_status_json, retcode) = target_zone.cluster.rgw_admin_ro('--rgw-realm=' + self.realm +
' bucket sync status --source-zone=' + source_zone.zone_name +
' --bucket=' + bucket_name, check_retcode = False)
if retcode == 0:
break
assert(retcode == 2) # ENOENT
log(20, 'current bucket sync status=', bucket_sync_status_json)
sync_status = json.loads(bucket_sync_status_json)
markers={}
for entry in sync_status:
val = entry['val']
pos = val['inc_marker']['position'].split('#')[-1] # get rid of shard id; e.g., 6#00000000002.132.3 -> 00000000002.132.3
markers[entry['key']] = pos
return markers
def data_source_log_status(self, source_zone):
source_cluster = source_zone.cluster
(datalog_status_json, retcode) = source_cluster.rgw_admin_ro('--rgw-realm=' + self.realm + ' datalog status')
@ -319,6 +343,27 @@ class RGWRealm:
return markers
def bucket_source_log_status(self, source_zone, bucket_name):
source_cluster = source_zone.cluster
(bilog_status_json, retcode) = source_cluster.rgw_admin_ro('--rgw-realm=' + self.realm + ' bilog status --bucket=' + bucket_name)
bilog_status = json.loads(bilog_status_json)
m={}
markers={}
try:
m = bilog_status['markers']
except:
pass
for s in m:
key = s['key']
val = s['val']
markers[key] = val
log(20, 'bilog markers for zone=', source_zone.zone_name, ' bucket=', bucket_name, ' markers=', markers)
return markers
def compare_data_status(self, target_zone, source_zone, log_status, sync_status):
if len(log_status) != len(sync_status):
log(10, 'len(log_status)=', len(log_status), ' len(sync_status)=', len(sync_status))
@ -337,6 +382,24 @@ class RGWRealm:
return True
def compare_bucket_status(self, target_zone, source_zone, bucket_name, log_status, sync_status):
if len(log_status) != len(sync_status):
log(10, 'len(log_status)=', len(log_status), ' len(sync_status)=', len(sync_status))
return False
msg = ''
for i, l, s in zip(log_status, log_status.itervalues(), sync_status.itervalues()):
if l > s:
if len(s) != 0:
msg += ', '
msg += 'shard=' + str(i) + ' master=' + l + ' target=' + s
if len(msg) > 0:
log(1, 'bucket ', bucket_name, ' zone ', target_zone.zone_name, ' behind zone ', source_zone.zone_name, ': ', msg)
return False
return True
def zone_data_checkpoint(self, target_zone, source_zone):
if target_zone.zone_name == source_zone.zone_name:
return
@ -357,6 +420,26 @@ class RGWRealm:
log(10, 'finished data checkpoint for target_zone=', target_zone.zone_name, ' source_zone=', source_zone.zone_name)
def zone_bucket_checkpoint(self, target_zone, source_zone, bucket_name):
if target_zone.zone_name == source_zone.zone_name:
return
log(10, 'starting bucket checkpoint for target_zone=', target_zone.zone_name, ' source_zone=', source_zone.zone_name, ' bucket_name=', bucket_name)
while True:
log_status = self.bucket_source_log_status(source_zone, bucket_name)
sync_status = self.bucket_sync_status(target_zone, source_zone, bucket_name)
log(20, 'log_status=', log_status)
log(20, 'sync_status=', sync_status)
if self.compare_bucket_status(target_zone, source_zone, bucket_name, log_status, sync_status):
break
time.sleep(5)
log(10, 'finished bucket checkpoint for target_zone=', target_zone.zone_name, ' source_zone=', source_zone.zone_name, ' bucket_name=', bucket_name)
def create_user(self, user, wait_meta = True):
log(5, 'creating user uid=', user.uid)
@ -595,14 +678,14 @@ def test_object_sync():
realm.meta_checkpoint()
for source_zone, bucket_name in zone_bucket.iteritems():
for source_zone, bucket in zone_bucket.iteritems():
for target_zone in all_zones:
if source_zone.zone_name == target_zone.zone_name:
continue
realm.zone_data_checkpoint(target_zone, source_zone)
realm.zone_bucket_checkpoint(target_zone, source_zone, bucket.name)
check_bucket_eq(source_zone, target_zone, bucket_name)
check_bucket_eq(source_zone, target_zone, bucket)
def test_object_delete():
buckets, zone_bucket = create_bucket_per_zone()
@ -615,33 +698,33 @@ def test_object_delete():
content = 'asdasd'
# don't wait for meta sync just yet
for zone, bucket_name in zone_bucket.iteritems():
k = new_key(zone, bucket_name, objname)
for zone, bucket in zone_bucket.iteritems():
k = new_key(zone, bucket, objname)
k.set_contents_from_string(content)
realm.meta_checkpoint()
# check object exists
for source_zone, bucket_name in zone_bucket.iteritems():
for source_zone, bucket in zone_bucket.iteritems():
for target_zone in all_zones:
if source_zone.zone_name == target_zone.zone_name:
continue
realm.zone_data_checkpoint(target_zone, source_zone)
realm.zone_bucket_checkpoint(target_zone, source_zone, bucket.name)
check_bucket_eq(source_zone, target_zone, bucket_name)
check_bucket_eq(source_zone, target_zone, bucket)
# check object removal
for source_zone, bucket_name in zone_bucket.iteritems():
k = get_key(source_zone, bucket_name, objname)
for source_zone, bucket in zone_bucket.iteritems():
k = get_key(source_zone, bucket, objname)
k.delete()
for target_zone in all_zones:
if source_zone.zone_name == target_zone.zone_name:
continue
realm.zone_data_checkpoint(target_zone, source_zone)
realm.zone_bucket_checkpoint(target_zone, source_zone, bucket.name)
check_bucket_eq(source_zone, target_zone, bucket_name)
check_bucket_eq(source_zone, target_zone, bucket)
def test_multi_period_incremental_sync():
if len(realm.clusters) < 3:
@ -692,14 +775,14 @@ def test_multi_period_incremental_sync():
realm.meta_checkpoint()
# verify that we end up with the same objects
for source_zone, bucket_name in zone_bucket.iteritems():
for source_zone, bucket in zone_bucket.iteritems():
for target_zone in all_zones:
if source_zone.zone_name == target_zone.zone_name:
continue
realm.zone_data_checkpoint(target_zone, source_zone)
realm.zone_bucket_checkpoint(target_zone, source_zone, bucket.name)
check_bucket_eq(source_zone, target_zone, bucket_name)
check_bucket_eq(source_zone, target_zone, bucket)
def init(parse_args):