From cbc9d427068dd5baf17c918127832adb08db64be Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Mon, 21 Mar 2016 16:52:14 -0700 Subject: [PATCH] test/rgw: use bucket for data checkpoint instead of data data checkpoint is not accurate, as data log might not show repeating changes. Signed-off-by: Yehuda Sadeh --- src/test/rgw/test_multi.py | 113 ++++++++++++++++++++++++++++++++----- 1 file changed, 98 insertions(+), 15 deletions(-) diff --git a/src/test/rgw/test_multi.py b/src/test/rgw/test_multi.py index dce6c0ebf68..92460d74f05 100644 --- a/src/test/rgw/test_multi.py +++ b/src/test/rgw/test_multi.py @@ -304,6 +304,30 @@ class RGWRealm: return (num_shards, markers) + def bucket_sync_status(self, target_zone, source_zone, bucket_name): + if target_zone.zone_name == source_zone.zone_name: + return None + + while True: + (bucket_sync_status_json, retcode) = target_zone.cluster.rgw_admin_ro('--rgw-realm=' + self.realm + + ' bucket sync status --source-zone=' + source_zone.zone_name + + ' --bucket=' + bucket_name, check_retcode = False) + if retcode == 0: + break + + assert(retcode == 2) # ENOENT + + log(20, 'current bucket sync status=', bucket_sync_status_json) + sync_status = json.loads(bucket_sync_status_json) + + markers={} + for entry in sync_status: + val = entry['val'] + pos = val['inc_marker']['position'].split('#')[-1] # get rid of shard id; e.g., 6#00000000002.132.3 -> 00000000002.132.3 + markers[entry['key']] = pos + + return markers + def data_source_log_status(self, source_zone): source_cluster = source_zone.cluster (datalog_status_json, retcode) = source_cluster.rgw_admin_ro('--rgw-realm=' + self.realm + ' datalog status') @@ -319,6 +343,27 @@ class RGWRealm: return markers + def bucket_source_log_status(self, source_zone, bucket_name): + source_cluster = source_zone.cluster + (bilog_status_json, retcode) = source_cluster.rgw_admin_ro('--rgw-realm=' + self.realm + ' bilog status --bucket=' + bucket_name) + bilog_status = json.loads(bilog_status_json) + + m={} + markers={} + try: + m = bilog_status['markers'] + except: + pass + + for s in m: + key = s['key'] + val = s['val'] + markers[key] = val + + log(20, 'bilog markers for zone=', source_zone.zone_name, ' bucket=', bucket_name, ' markers=', markers) + + return markers + def compare_data_status(self, target_zone, source_zone, log_status, sync_status): if len(log_status) != len(sync_status): log(10, 'len(log_status)=', len(log_status), ' len(sync_status)=', len(sync_status)) @@ -337,6 +382,24 @@ class RGWRealm: return True + def compare_bucket_status(self, target_zone, source_zone, bucket_name, log_status, sync_status): + if len(log_status) != len(sync_status): + log(10, 'len(log_status)=', len(log_status), ' len(sync_status)=', len(sync_status)) + return False + + msg = '' + for i, l, s in zip(log_status, log_status.itervalues(), sync_status.itervalues()): + if l > s: + if len(s) != 0: + msg += ', ' + msg += 'shard=' + str(i) + ' master=' + l + ' target=' + s + + if len(msg) > 0: + log(1, 'bucket ', bucket_name, ' zone ', target_zone.zone_name, ' behind zone ', source_zone.zone_name, ': ', msg) + return False + + return True + def zone_data_checkpoint(self, target_zone, source_zone): if target_zone.zone_name == source_zone.zone_name: return @@ -357,6 +420,26 @@ class RGWRealm: log(10, 'finished data checkpoint for target_zone=', target_zone.zone_name, ' source_zone=', source_zone.zone_name) + def zone_bucket_checkpoint(self, target_zone, source_zone, bucket_name): + if target_zone.zone_name == source_zone.zone_name: + return + + log(10, 'starting bucket checkpoint for target_zone=', target_zone.zone_name, ' source_zone=', source_zone.zone_name, ' bucket_name=', bucket_name) + + while True: + log_status = self.bucket_source_log_status(source_zone, bucket_name) + sync_status = self.bucket_sync_status(target_zone, source_zone, bucket_name) + + log(20, 'log_status=', log_status) + log(20, 'sync_status=', sync_status) + + if self.compare_bucket_status(target_zone, source_zone, bucket_name, log_status, sync_status): + break + + time.sleep(5) + + log(10, 'finished bucket checkpoint for target_zone=', target_zone.zone_name, ' source_zone=', source_zone.zone_name, ' bucket_name=', bucket_name) + def create_user(self, user, wait_meta = True): log(5, 'creating user uid=', user.uid) @@ -595,14 +678,14 @@ def test_object_sync(): realm.meta_checkpoint() - for source_zone, bucket_name in zone_bucket.iteritems(): + for source_zone, bucket in zone_bucket.iteritems(): for target_zone in all_zones: if source_zone.zone_name == target_zone.zone_name: continue - realm.zone_data_checkpoint(target_zone, source_zone) + realm.zone_bucket_checkpoint(target_zone, source_zone, bucket.name) - check_bucket_eq(source_zone, target_zone, bucket_name) + check_bucket_eq(source_zone, target_zone, bucket) def test_object_delete(): buckets, zone_bucket = create_bucket_per_zone() @@ -615,33 +698,33 @@ def test_object_delete(): content = 'asdasd' # don't wait for meta sync just yet - for zone, bucket_name in zone_bucket.iteritems(): - k = new_key(zone, bucket_name, objname) + for zone, bucket in zone_bucket.iteritems(): + k = new_key(zone, bucket, objname) k.set_contents_from_string(content) realm.meta_checkpoint() # check object exists - for source_zone, bucket_name in zone_bucket.iteritems(): + for source_zone, bucket in zone_bucket.iteritems(): for target_zone in all_zones: if source_zone.zone_name == target_zone.zone_name: continue - realm.zone_data_checkpoint(target_zone, source_zone) + realm.zone_bucket_checkpoint(target_zone, source_zone, bucket.name) - check_bucket_eq(source_zone, target_zone, bucket_name) + check_bucket_eq(source_zone, target_zone, bucket) # check object removal - for source_zone, bucket_name in zone_bucket.iteritems(): - k = get_key(source_zone, bucket_name, objname) + for source_zone, bucket in zone_bucket.iteritems(): + k = get_key(source_zone, bucket, objname) k.delete() for target_zone in all_zones: if source_zone.zone_name == target_zone.zone_name: continue - realm.zone_data_checkpoint(target_zone, source_zone) + realm.zone_bucket_checkpoint(target_zone, source_zone, bucket.name) - check_bucket_eq(source_zone, target_zone, bucket_name) + check_bucket_eq(source_zone, target_zone, bucket) def test_multi_period_incremental_sync(): if len(realm.clusters) < 3: @@ -692,14 +775,14 @@ def test_multi_period_incremental_sync(): realm.meta_checkpoint() # verify that we end up with the same objects - for source_zone, bucket_name in zone_bucket.iteritems(): + for source_zone, bucket in zone_bucket.iteritems(): for target_zone in all_zones: if source_zone.zone_name == target_zone.zone_name: continue - realm.zone_data_checkpoint(target_zone, source_zone) + realm.zone_bucket_checkpoint(target_zone, source_zone, bucket.name) - check_bucket_eq(source_zone, target_zone, bucket_name) + check_bucket_eq(source_zone, target_zone, bucket) def init(parse_args):