test/rgw: use bucket for data checkpoint instead of data

The data checkpoint is not accurate, as the data log might not show repeated changes.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
This commit is contained in:
Yehuda Sadeh 2016-03-21 16:52:14 -07:00
parent d615eafbe2
commit cbc9d42706

View File

@ -304,6 +304,30 @@ class RGWRealm:
return (num_shards, markers)
def bucket_sync_status(self, target_zone, source_zone, bucket_name):
    """Return per-shard incremental bucket-sync markers for bucket_name as
    seen by target_zone when syncing from source_zone.

    Returns None when target and source are the same zone (nothing to sync),
    otherwise a dict mapping shard key -> incremental marker position.
    Retries the admin command until it succeeds: retcode 2 (ENOENT) means the
    bucket isn't known to the target yet, so wait and try again.
    """
    if target_zone.zone_name == source_zone.zone_name:
        return None
    while True:
        (bucket_sync_status_json, retcode) = target_zone.cluster.rgw_admin_ro('--rgw-realm=' + self.realm +
                                                 ' bucket sync status --source-zone=' + source_zone.zone_name +
                                                 ' --bucket=' + bucket_name, check_retcode = False)
        if retcode == 0:
            break
        assert(retcode == 2) # ENOENT
        # back off between retries instead of busy-looping on the admin command
        time.sleep(5)
    log(20, 'current bucket sync status=', bucket_sync_status_json)
    sync_status = json.loads(bucket_sync_status_json)
    markers = {}
    for entry in sync_status:
        val = entry['val']
        # get rid of shard id; e.g., 6#00000000002.132.3 -> 00000000002.132.3
        pos = val['inc_marker']['position'].split('#')[-1]
        markers[entry['key']] = pos
    return markers
def data_source_log_status(self, source_zone):
source_cluster = source_zone.cluster
(datalog_status_json, retcode) = source_cluster.rgw_admin_ro('--rgw-realm=' + self.realm + ' datalog status')
@ -319,6 +343,27 @@ class RGWRealm:
return markers
def bucket_source_log_status(self, source_zone, bucket_name):
    """Return per-shard bilog (bucket index log) markers for bucket_name on
    source_zone, keyed by shard key. Returns an empty dict when the bucket
    has no bilog markers yet.
    """
    source_cluster = source_zone.cluster
    (bilog_status_json, retcode) = source_cluster.rgw_admin_ro('--rgw-realm=' + self.realm + ' bilog status --bucket=' + bucket_name)
    bilog_status = json.loads(bilog_status_json)
    markers = {}
    # 'markers' may be absent for a fresh bucket; default to empty rather
    # than swallowing every exception with a bare except
    for s in bilog_status.get('markers', []):
        markers[s['key']] = s['val']
    log(20, 'bilog markers for zone=', source_zone.zone_name, ' bucket=', bucket_name, ' markers=', markers)
    return markers
def compare_data_status(self, target_zone, source_zone, log_status, sync_status):
if len(log_status) != len(sync_status):
log(10, 'len(log_status)=', len(log_status), ' len(sync_status)=', len(sync_status))
@ -337,6 +382,24 @@ class RGWRealm:
return True
def compare_bucket_status(self, target_zone, source_zone, bucket_name, log_status, sync_status):
    """Return True if target_zone's bucket sync position has caught up with
    the source bilog for every shard; otherwise log the lagging shards and
    return False.

    log_status and sync_status are dicts of shard key -> marker string.
    """
    if len(log_status) != len(sync_status):
        log(10, 'len(log_status)=', len(log_status), ' len(sync_status)=', len(sync_status))
        return False
    msg = ''
    # compare shard-by-shard using the shard key: zipping the two dicts'
    # values would pair them positionally, which is not guaranteed to line
    # up shard-for-shard
    for shard, l in log_status.items():
        s = sync_status.get(shard, '')
        if l > s:
            # bugfix: the separator depends on msg already having content,
            # not on the sync marker string s
            if len(msg) != 0:
                msg += ', '
            msg += 'shard=' + str(shard) + ' master=' + l + ' target=' + s
    if len(msg) > 0:
        log(1, 'bucket ', bucket_name, ' zone ', target_zone.zone_name, ' behind zone ', source_zone.zone_name, ': ', msg)
        return False
    return True
def zone_data_checkpoint(self, target_zone, source_zone):
if target_zone.zone_name == source_zone.zone_name:
return
@ -357,6 +420,26 @@ class RGWRealm:
log(10, 'finished data checkpoint for target_zone=', target_zone.zone_name, ' source_zone=', source_zone.zone_name)
def zone_bucket_checkpoint(self, target_zone, source_zone, bucket_name):
    """Block until target_zone has synced bucket_name's index log from
    source_zone, polling every 5 seconds. No-op when the zones are the same.
    """
    if target_zone.zone_name == source_zone.zone_name:
        return
    log(10, 'starting bucket checkpoint for target_zone=', target_zone.zone_name, ' source_zone=', source_zone.zone_name, ' bucket_name=', bucket_name)
    caught_up = False
    while not caught_up:
        log_status = self.bucket_source_log_status(source_zone, bucket_name)
        sync_status = self.bucket_sync_status(target_zone, source_zone, bucket_name)
        log(20, 'log_status=', log_status)
        log(20, 'sync_status=', sync_status)
        caught_up = self.compare_bucket_status(target_zone, source_zone, bucket_name, log_status, sync_status)
        if not caught_up:
            time.sleep(5)
    log(10, 'finished bucket checkpoint for target_zone=', target_zone.zone_name, ' source_zone=', source_zone.zone_name, ' bucket_name=', bucket_name)
def create_user(self, user, wait_meta = True):
log(5, 'creating user uid=', user.uid)
@ -595,14 +678,14 @@ def test_object_sync():
realm.meta_checkpoint()
for source_zone, bucket_name in zone_bucket.iteritems():
for source_zone, bucket in zone_bucket.iteritems():
for target_zone in all_zones:
if source_zone.zone_name == target_zone.zone_name:
continue
realm.zone_data_checkpoint(target_zone, source_zone)
realm.zone_bucket_checkpoint(target_zone, source_zone, bucket.name)
check_bucket_eq(source_zone, target_zone, bucket_name)
check_bucket_eq(source_zone, target_zone, bucket)
def test_object_delete():
buckets, zone_bucket = create_bucket_per_zone()
@ -615,33 +698,33 @@ def test_object_delete():
content = 'asdasd'
# don't wait for meta sync just yet
for zone, bucket_name in zone_bucket.iteritems():
k = new_key(zone, bucket_name, objname)
for zone, bucket in zone_bucket.iteritems():
k = new_key(zone, bucket, objname)
k.set_contents_from_string(content)
realm.meta_checkpoint()
# check object exists
for source_zone, bucket_name in zone_bucket.iteritems():
for source_zone, bucket in zone_bucket.iteritems():
for target_zone in all_zones:
if source_zone.zone_name == target_zone.zone_name:
continue
realm.zone_data_checkpoint(target_zone, source_zone)
realm.zone_bucket_checkpoint(target_zone, source_zone, bucket.name)
check_bucket_eq(source_zone, target_zone, bucket_name)
check_bucket_eq(source_zone, target_zone, bucket)
# check object removal
for source_zone, bucket_name in zone_bucket.iteritems():
k = get_key(source_zone, bucket_name, objname)
for source_zone, bucket in zone_bucket.iteritems():
k = get_key(source_zone, bucket, objname)
k.delete()
for target_zone in all_zones:
if source_zone.zone_name == target_zone.zone_name:
continue
realm.zone_data_checkpoint(target_zone, source_zone)
realm.zone_bucket_checkpoint(target_zone, source_zone, bucket.name)
check_bucket_eq(source_zone, target_zone, bucket_name)
check_bucket_eq(source_zone, target_zone, bucket)
def test_multi_period_incremental_sync():
if len(realm.clusters) < 3:
@ -692,14 +775,14 @@ def test_multi_period_incremental_sync():
realm.meta_checkpoint()
# verify that we end up with the same objects
for source_zone, bucket_name in zone_bucket.iteritems():
for source_zone, bucket in zone_bucket.iteritems():
for target_zone in all_zones:
if source_zone.zone_name == target_zone.zone_name:
continue
realm.zone_data_checkpoint(target_zone, source_zone)
realm.zone_bucket_checkpoint(target_zone, source_zone, bucket.name)
check_bucket_eq(source_zone, target_zone, bucket_name)
check_bucket_eq(source_zone, target_zone, bucket)
def init(parse_args):