diff --git a/src/test/rgw/test_multi.py b/src/test/rgw/test_multi.py index 18a44a07aa5..9f289861ebf 100644 --- a/src/test/rgw/test_multi.py +++ b/src/test/rgw/test_multi.py @@ -1,36 +1,20 @@ import subprocess import os -import json import random import string import argparse import sys -import time -try: - from itertools import izip_longest as zip_longest -except ImportError: - from itertools import zip_longest +import logging try: import configparser except ImportError: import ConfigParser as configparser -import boto -import boto.s3.connection +import nose.core -import inspect - -from nose.tools import eq_ as eq -from nose.plugins.attrib import attr - -# test-suite for rgw multisite, the last test destroys a zone, -# so in order to use this as a dev cluster, do -# $nosetests -a '!destructive' /path/to/test_multi.py - -log_level = 20 - -num_buckets = 0 -run_prefix=''.join(random.SystemRandom().choice(string.ascii_lowercase) for _ in range(6)) +from rgw_multi import multisite +# make tests from rgw_multi.tests available to nose +from rgw_multi.tests import * mstart_path = os.getenv('MSTART_PATH') if mstart_path is None: @@ -38,959 +22,122 @@ if mstart_path is None: test_path = os.path.normpath(os.path.dirname(os.path.realpath(__file__))) + '/' -def lineno(): - return inspect.currentframe().f_back.f_lineno +# configure logging for the tests module +log = logging.getLogger('rgw_multi.tests') -def log(level, *params): - if level > log_level: - return - - s = '>>> ' - for p in params: - if p: - s += str(p) - - print(s) - sys.stdout.flush() - -def build_cmd(*params): - s = '' - for p in params: - if len(s) != 0: - s += ' ' - s += p - - return s - -def mpath(bin, *params): - s = mstart_path + bin - for p in params: - s += ' ' + str(p) - - return s - -def tpath(bin, *params): - s = test_path + bin - for p in params: - s += ' ' + str(p) - - return s - -def bash(cmd, check_retcode = True): - log(5, 'running cmd: ', cmd) - process = subprocess.Popen(cmd.split(), 
stdout=subprocess.PIPE) +def bash(cmd, **kwargs): + log.debug('running cmd: %s', ' '.join(cmd)) + check_retcode = kwargs.pop('check_retcode', True) + kwargs['stdout'] = subprocess.PIPE + process = subprocess.Popen(cmd, **kwargs) s = process.communicate()[0] - log(20, 'command returned status=', process.returncode, ' stdout=', s.decode('utf-8')) + log.debug('command returned status=%d stdout=%s', process.returncode, s.decode('utf-8')) if check_retcode: assert(process.returncode == 0) return (s, process.returncode) -def mstart(cluster_id, is_new): - cmd = mpath('mstart.sh', cluster_id) - if is_new: - cmd += ' -n' - cmd += ' --mds_num 0' - bash(cmd) - -def mstop(cluster_id, entity = None): - cmd = mpath('mstop.sh', cluster_id) - if entity is not None: - cmd += ' ' + entity - bash(cmd) - -def mrgw(cluster_id, port, extra_cmd = None): - cmd = mpath('mrgw.sh', cluster_id, port) - if extra_cmd is not None: - cmd += ' ' + extra_cmd - bash(cmd) - -def init_multi_site(num_clusters): - bash(tpath('test-rgw-multisite.sh', num_clusters)) - - -class RGWRealmCredentials: - def __init__(self, access_key, secret): - self.access_key = access_key - self.secret = secret - -class RGWCluster: - def __init__(self, zg_num, cluster_num, cluster_id, port, num_gateways): - self.zg_num = zg_num - self.cluster_num = cluster_num +class Cluster(multisite.Cluster): + """ cluster implementation based on mstart/mrun scripts """ + def __init__(self, cluster_id): + super(Cluster, self).__init__() self.cluster_id = cluster_id - self.port = port - self.num_gateways = num_gateways self.needs_reset = True + def admin(self, args = [], **kwargs): + """ radosgw-admin command """ + cmd = [test_path + 'test-rgw-call.sh', 'call_rgw_admin', self.cluster_id] + args + if kwargs.pop('read_only', False): + cmd += ['--rgw-cache-enabled', 'false'] + return bash(cmd, **kwargs) + def start(self): - mstart(self.cluster_id, self.needs_reset) + cmd = [mstart_path + 'mstart.sh', self.cluster_id] + if self.needs_reset: + 
cmd += ['-n', '--mds_num', '0'] + bash(cmd) self.needs_reset = False def stop(self): - mstop(self.cluster_id) - - def start_rgw(self): - for i in range(self.num_gateways): - mrgw(self.cluster_id, self.port + i, '--debug-rgw=20 --debug-ms=1') - - def stop_rgw(self): - mstop(self.cluster_id, 'radosgw') - - def rgw_admin(self, cmd, check_retcode = True): - (s, retcode) = bash(tpath('test-rgw-call.sh', 'call_rgw_admin', self.cluster_id, cmd), check_retcode) - return (s, retcode) - - def rgw_admin_ro(self, cmd, check_retcode = True): - (s, retcode) = bash(tpath('test-rgw-call.sh', 'call_rgw_admin', self.cluster_id, '--rgw-cache-enabled=false ' + cmd), check_retcode) - return (s, retcode) - -class RGWZone: - def __init__(self, realm_name, cluster, zg_name, zone_name): - self.realm_name = realm_name - self.cluster = cluster - self.zg_name = zg_name - self.zone_name = zone_name - self.connection = None - - def get_connection(self, user): - if self.connection is None: - self.connection = boto.connect_s3(aws_access_key_id = user.access_key, - aws_secret_access_key = user.secret, - host = 'localhost', - port = self.cluster.port, - is_secure = False, - calling_format = boto.s3.connection.OrdinaryCallingFormat()) - return self.connection - -class RGWZonegroup: - def __init__(self, realm_name, credentials, clusters): - self.realm_name = realm_name - self.credentials = credentials - self.clusters = clusters - self.zones = {} - - def init_zone(self, cluster, zg_name, zone_name, first_zone_port, master_zg_first_zone_port): - self.is_master_zg = (first_zone_port == master_zg_first_zone_port) - is_master = (first_zone_port == cluster.port) - endpoints = ",".join(map(lambda x: "http://localhost:" + str(cluster.port + x), range(cluster.num_gateways))) - if is_master: - if self.is_master_zg: - bash(tpath('test-rgw-call.sh', 'init_first_zone', cluster.cluster_id, - self.realm_name, zg_name, zone_name, endpoints, - self.credentials.access_key, self.credentials.secret)) - else: - 
bash(tpath('test-rgw-call.sh', 'init_first_zone_in_slave_zg', cluster.cluster_id, - self.realm_name, zg_name, zone_name, master_zg_first_zone_port, endpoints, - self.credentials.access_key, self.credentials.secret)) - else: - bash(tpath('test-rgw-call.sh', 'init_zone_in_existing_zg', cluster.cluster_id, - self.realm_name, zg_name, zone_name, master_zg_first_zone_port, endpoints, - self.credentials.access_key, self.credentials.secret)) - - self.add_zone(cluster, zg_name, zone_name, is_master) - cluster.start_rgw() - - def add_zone(self, cluster, zg_name, zone_name, is_master): - zone = RGWZone(self.realm_name, cluster, zg_name, zone_name) - self.zones[zone_name] = zone - - if is_master: - self.master_zone = zone - if self.is_master_zg: - realm.master_zone = zone - - def remove_zone(self, zone_name): - del self.zones[zone_name] - - def get_zone(self, zone_name): - return self.zones[zone_name] - - def get_zones(self): - for (k, zone) in self.zones.items(): - yield zone - - def meta_sync_status(self, zone): - if zone.zone_name == realm.master_zone.zone_name: - return None - - while True: - (meta_sync_status_json, retcode) = zone.cluster.rgw_admin_ro('--rgw-realm=' + self.realm_name + ' metadata sync status', check_retcode = False) - if retcode == 0: - break - - assert(retcode == 2) # ENOENT - - meta_sync_status_json = meta_sync_status_json.decode('utf-8') - log(20, 'current meta sync status=', meta_sync_status_json) - sync_status = json.loads(meta_sync_status_json) - - global_sync_status=sync_status['sync_status']['info']['status'] - num_shards=sync_status['sync_status']['info']['num_shards'] - - sync_markers=sync_status['sync_status']['markers'] - log(20, 'sync_markers=', sync_markers) - assert(num_shards == len(sync_markers)) - - markers={} - for i in range(num_shards): - markers[i] = sync_markers[i]['val']['marker'] - - return (num_shards, markers) - - def meta_master_log_status(self, master_zone): - (mdlog_status_json, retcode) = 
master_zone.cluster.rgw_admin_ro('--rgw-realm=' + self.realm_name + ' mdlog status') - mdlog_status = json.loads(mdlog_status_json.decode('utf-8')) - - markers={} - i = 0 - for s in mdlog_status: - markers[i] = s['marker'] - i += 1 - - log(20, 'master meta markers=', markers) - - return markers - - def compare_meta_status(self, zone, log_status, sync_status): - if len(log_status) != len(sync_status): - log(10, 'len(log_status)=', len(log_status), ' len(sync_status=', len(sync_status)) - return False - - msg = '' - for i, l, s in zip(log_status, log_status.values(), sync_status.values()): - if l > s: - if len(s) != 0: - msg += ', ' - msg += 'shard=' + str(i) + ' master=' + l + ' target=' + s - - if len(msg) > 0: - log(1, 'zone ', zone.zone_name, ' behind master: ', msg) - return False - - return True - - def zone_meta_checkpoint(self, zone): - if zone.zone_name == realm.master_zone.zone_name: - return - - log(10, 'starting meta checkpoint for zone=', zone.zone_name) - - while True: - log_status = self.meta_master_log_status(realm.master_zone) - (num_shards, sync_status) = self.meta_sync_status(zone) - - log(20, 'log_status=', log_status) - log(20, 'sync_status=', sync_status) - - if self.compare_meta_status(zone, log_status, sync_status): - break - - time.sleep(5) - - - log(10, 'finish meta checkpoint for zone=', zone.zone_name) - - def meta_checkpoint(self): - log(5, 'meta checkpoint') - for z in self.get_zones(): - self.zone_meta_checkpoint(z) - - def data_sync_status(self, target_zone, source_zone): - if target_zone.zone_name == source_zone.zone_name: - return None - - while True: - (data_sync_status_json, retcode) = target_zone.cluster.rgw_admin_ro('--rgw-realm=' + self.realm_name + ' data sync status --source-zone=' + source_zone.zone_name, check_retcode = False) - if retcode == 0: - break - - assert(retcode == 2) # ENOENT - - data_sync_status_json = data_sync_status_json.decode('utf-8') - log(20, 'current data sync status=', data_sync_status_json) - 
sync_status = json.loads(data_sync_status_json) - - global_sync_status=sync_status['sync_status']['info']['status'] - num_shards=sync_status['sync_status']['info']['num_shards'] - - sync_markers=sync_status['sync_status']['markers'] - log(20, 'sync_markers=', sync_markers) - assert(num_shards == len(sync_markers)) - - markers={} - for i in range(num_shards): - markers[i] = sync_markers[i]['val']['marker'] - - return (num_shards, markers) - - def bucket_sync_status(self, target_zone, source_zone, bucket_name): - if target_zone.zone_name == source_zone.zone_name: - return None - - cmd = '--rgw-realm=' + self.realm_name + ' bucket sync status --source-zone=' + source_zone.zone_name + ' --bucket=' + bucket_name - global user - if user.tenant is not None: - cmd += ' --tenant=' + user.tenant + ' --uid=' + user.uid - while True: - (bucket_sync_status_json, retcode) = target_zone.cluster.rgw_admin_ro(cmd, check_retcode = False) - if retcode == 0: - break - - assert(retcode == 2) # ENOENT - - bucket_sync_status_json = bucket_sync_status_json.decode('utf-8') - log(20, 'current bucket sync status=', bucket_sync_status_json) - sync_status = json.loads(bucket_sync_status_json) - - markers={} - for entry in sync_status: - val = entry['val'] - if val['status'] == 'incremental-sync': - pos = val['inc_marker']['position'].split('#')[-1] # get rid of shard id; e.g., 6#00000000002.132.3 -> 00000000002.132.3 - else: - pos = '' - markers[entry['key']] = pos - - return markers - - def data_source_log_status(self, source_zone): - source_cluster = source_zone.cluster - (datalog_status_json, retcode) = source_cluster.rgw_admin_ro('--rgw-realm=' + self.realm_name + ' datalog status') - datalog_status = json.loads(datalog_status_json.decode('utf-8')) - - markers={} - i = 0 - for s in datalog_status: - markers[i] = s['marker'] - i += 1 - - log(20, 'data markers for zone=', source_zone.zone_name, ' markers=', markers) - - return markers - - def bucket_source_log_status(self, source_zone, 
bucket_name): - cmd = '--rgw-realm=' + self.realm_name + ' bilog status --bucket=' + bucket_name - global user - if user.tenant is not None: - cmd += ' --tenant=' + user.tenant + ' --uid=' + user.uid - source_cluster = source_zone.cluster - (bilog_status_json, retcode) = source_cluster.rgw_admin_ro(cmd) - bilog_status = json.loads(bilog_status_json.decode('utf-8')) - - m={} - markers={} - try: - m = bilog_status['markers'] - except: - pass - - for s in m: - key = s['key'] - val = s['val'] - markers[key] = val - - log(20, 'bilog markers for zone=', source_zone.zone_name, ' bucket=', bucket_name, ' markers=', markers) - - return markers - - def compare_data_status(self, target_zone, source_zone, log_status, sync_status): - if len(log_status) != len(sync_status): - log(10, 'len(log_status)=', len(log_status), ' len(sync_status)=', len(sync_status)) - return False - - msg = '' - for i, l, s in zip(log_status, log_status.values(), sync_status.values()): - if l > s: - if len(s) != 0: - msg += ', ' - msg += 'shard=' + str(i) + ' master=' + l + ' target=' + s - - if len(msg) > 0: - log(1, 'data of zone ', target_zone.zone_name, ' behind zone ', source_zone.zone_name, ': ', msg) - return False - - return True - - def compare_bucket_status(self, target_zone, source_zone, bucket_name, log_status, sync_status): - if len(log_status) != len(sync_status): - log(10, 'len(log_status)=', len(log_status), ' len(sync_status)=', len(sync_status)) - return False - - msg = '' - for i, l, s in zip(log_status, log_status.values(), sync_status.values()): - if l > s: - if len(s) != 0: - msg += ', ' - msg += 'shard=' + str(i) + ' master=' + l + ' target=' + s - - if len(msg) > 0: - log(1, 'bucket ', bucket_name, ' zone ', target_zone.zone_name, ' behind zone ', source_zone.zone_name, ': ', msg) - return False - - return True - - def zone_data_checkpoint(self, target_zone, source_zone): - if target_zone.zone_name == source_zone.zone_name: - return - - log(10, 'starting data checkpoint for 
target_zone=', target_zone.zone_name, ' source_zone=', source_zone.zone_name) - - while True: - log_status = self.data_source_log_status(source_zone) - (num_shards, sync_status) = self.data_sync_status(target_zone, source_zone) - - log(20, 'log_status=', log_status) - log(20, 'sync_status=', sync_status) - - if self.compare_data_status(target_zone, source_zone, log_status, sync_status): - break - - time.sleep(5) - - log(10, 'finished data checkpoint for target_zone=', target_zone.zone_name, ' source_zone=', source_zone.zone_name) - - def zone_bucket_checkpoint(self, target_zone, source_zone, bucket_name): - if target_zone.zone_name == source_zone.zone_name: - return - - log(10, 'starting bucket checkpoint for target_zone=', target_zone.zone_name, ' source_zone=', source_zone.zone_name, ' bucket_name=', bucket_name) - - while True: - log_status = self.bucket_source_log_status(source_zone, bucket_name) - sync_status = self.bucket_sync_status(target_zone, source_zone, bucket_name) - - log(20, 'log_status=', log_status) - log(20, 'sync_status=', sync_status) - - if self.compare_bucket_status(target_zone, source_zone, bucket_name, log_status, sync_status): - break - - time.sleep(5) - - log(10, 'finished bucket checkpoint for target_zone=', target_zone.zone_name, ' source_zone=', source_zone.zone_name, ' bucket_name=', bucket_name) - - - def create_user(self, user, wait_meta = True): - log(5, 'creating user uid=', user.uid) - cmd = build_cmd('--uid', user.uid, '--display-name', user.display_name, - '--access-key', user.access_key, '--secret', user.secret) - if user.tenant is not None: - cmd += ' --tenant ' + user.tenant - self.master_zone.cluster.rgw_admin('--rgw-realm=' + self.realm_name + ' user create ' + cmd) - - if wait_meta: - self.meta_checkpoint() - - def set_master_zone(self, zone): - (zg_json, retcode) = zone.cluster.rgw_admin('--rgw-realm=' + self.realm_name + ' --rgw-zonegroup=' + zone.zg_name + ' --rgw-zone=' + zone.zone_name + ' zone modify --master=1') - 
(period_json, retcode) = zone.cluster.rgw_admin('--rgw-realm=' + self.realm_name + ' period update --commit') - self.master_zone = zone - if self.is_master_zg: - realm.master_zone = zone - -class RGWRealm: - def __init__(self, realm_name, credentials): - self.realm_name = realm_name - self.credentials = credentials - self.zonegroups = {} - self.zones = {} - - def get_zone(self, zone_name): - return self.zones[zone_name] - - def get_zones(self): - for (k, zone) in self.zones.iteritems(): - yield zone - - def add_zonegroup(self, zg_name, zonegroup, is_master_zg): - self.zonegroups[zg_name] = zonegroup - - for zone in zonegroup.get_zones(): - self.zones[zone.zone_name] = zone - - if is_master_zg: - self.master_zonegroup = zonegroup - - def get_zonegroup(self, zonegroup_name): - return self.zonegroups[zonegroup_name] - - def get_zonegroups(self): - for (k, zonegroup) in self.zonegroups.iteritems(): - yield zonegroup - - def meta_checkpoint(self): - log(5, 'meta checkpoint') - for zg in self.get_zonegroups(): - zg.meta_checkpoint() - -class RGWUser: - def __init__(self, uid, display_name, access_key, secret, tenant): - self.uid = uid - self.display_name = display_name - self.access_key = access_key - self.secret = secret - self.tenant = tenant + cmd = [mstart_path + 'mstop.sh', self.cluster_id] + bash(cmd) + +class Gateway(multisite.Gateway): + """ gateway implementation based on mrgw/mstop scripts """ + def __init__(self, client_id = None, *args, **kwargs): + super(Gateway, self).__init__(*args, **kwargs) + self.id = client_id + + def start(self, args = []): + """ start the gateway """ + assert(self.cluster) + cmd = [mstart_path + 'mrgw.sh', self.cluster.cluster_id, str(self.port)] + if self.id: + cmd += ['-i', self.id] + cmd += ['--debug-rgw=20', '--debug-ms=1'] + cmd += args + bash(cmd) + + def stop(self): + """ stop the gateway """ + assert(self.cluster) + cmd = [mstart_path + 'mstop.sh', self.cluster.cluster_id, 'radosgw', self.id] + bash(cmd) def gen_access_key(): 
- return ''.join(random.SystemRandom().choice(string.ascii_uppercase + string.digits) for _ in range(16)) + return ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(16)) def gen_secret(): - return ''.join(random.SystemRandom().choice(string.ascii_uppercase + string.ascii_lowercase + string.digits) for _ in range(32)) - -def gen_bucket_name(): - global num_buckets - - num_buckets += 1 - return run_prefix + '-' + str(num_buckets) - -class RGWMulti: - def __init__(self, zg_num, num_clusters, gateways_per_cluster, base_port, base_port_master_zg): - self.num_clusters = num_clusters - - self.zg_num = zg_num - self.zg_name = 'zg' + str(zg_num) - - self.base_port = base_port - self.base_port_master_zg = base_port_master_zg - - self.clusters = {} - for i in range(num_clusters): - self.clusters[i] = RGWCluster(self.zg_num, i + 1, 'zg' + str(zg_num) + '-c' + str(i + 1), self.base_port + i * gateways_per_cluster, gateways_per_cluster) - - def setup(self, bootstrap, tenant): - global realm - global master_zg - global realm_credentials - global user - - is_master_zg = (self.base_port == self.base_port_master_zg) - if is_master_zg: - realm_credentials = RGWRealmCredentials(gen_access_key(), gen_secret()) - realm = RGWRealm('earth', realm_credentials) - rgw_zg = RGWZonegroup('earth', realm_credentials, self.clusters) - - if bootstrap: - log(1, 'bootstrapping clusters') - self.clusters[0].start() - rgw_zg.init_zone(self.clusters[0], self.zg_name, self.zg_name + '-1', self.base_port, self.base_port_master_zg) - - for i in range(1, self.num_clusters): - self.clusters[i].start() - rgw_zg.init_zone(self.clusters[i], self.zg_name, self.zg_name + '-' + str(i + 1), self.base_port, self.base_port_master_zg) - else: - for i in range(0, self.num_clusters): - rgw_zg.add_zone(self.clusters[i], self.zg_name, self.zg_name + '-' + str(i + 1), (i == 0)) - - realm.add_zonegroup(self.zg_name, rgw_zg, is_master_zg) - rgw_zg.meta_checkpoint() - - if is_master_zg: - master_zg 
= rgw_zg - user = RGWUser('tester', '"Test User"', gen_access_key(), gen_secret(), tenant) - master_zg.create_user(user) - -def check_all_buckets_exist(zone, buckets): - conn = zone.get_connection(user) - - for b in buckets: - try: - conn.get_bucket(b) - except: - log(0, 'zone ', zone.zone_name, ' does not contain bucket ', b) - return False - - return True - -def check_all_buckets_dont_exist(zone, buckets): - conn = zone.get_connection(user) - - for b in buckets: - try: - conn.get_bucket(b) - except: - continue - - log(0, 'zone ', zone.zone_name, ' contains bucket ', b) - return False - - return True - -def create_bucket_per_zone_in_master_zg(): - buckets = [] - zone_bucket = {} - for zone in master_zg.get_zones(): - conn = zone.get_connection(user) - bucket_name = gen_bucket_name() - log(1, 'create bucket zone=', zone.zone_name, ' name=', bucket_name) - bucket = conn.create_bucket(bucket_name) - buckets.append(bucket_name) - zone_bucket[zone] = bucket - - return buckets, zone_bucket - -def create_bucket_per_zone_in_realm(): - buckets = [] - zone_bucket = {} - for zone in realm.get_zones(): - conn = zone.get_connection(user) - bucket_name = gen_bucket_name() - log(1, 'create bucket zone=', zone.zone_name, ' name=', bucket_name) - bucket = conn.create_bucket(bucket_name, None, zone.zg_name) - buckets.append(bucket_name) - zone_bucket[zone] = bucket - - return buckets, zone_bucket - -def test_bucket_create(): - buckets, _ = create_bucket_per_zone_in_master_zg() - master_zg.meta_checkpoint() - - for zone in master_zg.get_zones(): - assert check_all_buckets_exist(zone, buckets) - -def test_bucket_recreate(): - buckets, _ = create_bucket_per_zone_in_master_zg() - master_zg.meta_checkpoint() - - for zone in master_zg.get_zones(): - assert check_all_buckets_exist(zone, buckets) - - # recreate buckets on all zones, make sure they weren't removed - for zone in master_zg.get_zones(): - for bucket_name in buckets: - conn = zone.get_connection(user) - bucket = 
conn.create_bucket(bucket_name) - - for zone in master_zg.get_zones(): - assert check_all_buckets_exist(zone, buckets) - - master_zg.meta_checkpoint() - - for zone in master_zg.get_zones(): - assert check_all_buckets_exist(zone, buckets) - -def test_bucket_remove(): - buckets, zone_bucket = create_bucket_per_zone_in_master_zg() - master_zg.meta_checkpoint() - - for zone in master_zg.get_zones(): - assert check_all_buckets_exist(zone, buckets) - - for zone, bucket_name in zone_bucket.items(): - conn = zone.get_connection(user) - conn.delete_bucket(bucket_name) - - master_zg.meta_checkpoint() - - for zone in master_zg.get_zones(): - assert check_all_buckets_dont_exist(zone, buckets) - -def get_bucket(zone, bucket_name): - conn = zone.get_connection(user) - return conn.get_bucket(bucket_name) - -def get_key(zone, bucket_name, obj_name): - b = get_bucket(zone, bucket_name) - return b.get_key(obj_name) - -def new_key(zone, bucket_name, obj_name): - b = get_bucket(zone, bucket_name) - return b.new_key(obj_name) - -def check_object_eq(k1, k2, check_extra = True): - assert k1 - assert k2 - log(10, 'comparing key name=', k1.name) - eq(k1.name, k2.name) - eq(k1.get_contents_as_string(), k2.get_contents_as_string()) - eq(k1.metadata, k2.metadata) - eq(k1.cache_control, k2.cache_control) - eq(k1.content_type, k2.content_type) - eq(k1.content_encoding, k2.content_encoding) - eq(k1.content_disposition, k2.content_disposition) - eq(k1.content_language, k2.content_language) - eq(k1.etag, k2.etag) - eq(k1.last_modified, k2.last_modified) - if check_extra: - eq(k1.owner.id, k2.owner.id) - eq(k1.owner.display_name, k2.owner.display_name) - eq(k1.storage_class, k2.storage_class) - eq(k1.size, k2.size) - eq(k1.version_id, k2.version_id) - eq(k1.encrypted, k2.encrypted) - -def check_bucket_eq(zone1, zone2, bucket_name): - log(10, 'comparing bucket=', bucket_name, ' zones={', zone1.zone_name, ', ', zone2.zone_name, '}') - b1 = get_bucket(zone1, bucket_name) - b2 = get_bucket(zone2, 
bucket_name) - - log(20, 'bucket1 objects:') - for o in b1.get_all_versions(): - log(20, 'o=', o.name) - log(20, 'bucket2 objects:') - for o in b2.get_all_versions(): - log(20, 'o=', o.name) - - for k1, k2 in zip_longest(b1.get_all_versions(), b2.get_all_versions()): - if k1 is None: - log(0, 'failure: key=', k2.name, ' is missing from zone=', zone1.zone_name) - assert False - if k2 is None: - log(0, 'failure: key=', k1.name, ' is missing from zone=', zone2.zone_name) - assert False - - check_object_eq(k1, k2) - - # now get the keys through a HEAD operation, verify that the available data is the same - k1_head = b1.get_key(k1.name) - k2_head = b2.get_key(k2.name) - - check_object_eq(k1_head, k2_head, False) - - log(5, 'success, bucket identical: bucket=', bucket_name, ' zones={', zone1.zone_name, ', ', zone2.zone_name, '}') - - -def test_object_sync(): - buckets, zone_bucket = create_bucket_per_zone_in_master_zg() - - all_zones = [] - for z in zone_bucket: - all_zones.append(z) - - objnames = [ 'myobj', '_myobj', ':', '&' ] - content = 'asdasd' - - # don't wait for meta sync just yet - for zone, bucket_name in zone_bucket.items(): - for objname in objnames: - k = new_key(zone, bucket_name, objname) - k.set_contents_from_string(content) - - master_zg.meta_checkpoint() - - for source_zone, bucket in zone_bucket.items(): - for target_zone in all_zones: - if source_zone.zone_name == target_zone.zone_name: - continue - - master_zg.zone_bucket_checkpoint(target_zone, source_zone, bucket.name) - - check_bucket_eq(source_zone, target_zone, bucket) - -def test_object_delete(): - buckets, zone_bucket = create_bucket_per_zone_in_master_zg() - - all_zones = [] - for z in zone_bucket: - all_zones.append(z) - - objname = 'myobj' - content = 'asdasd' - - # don't wait for meta sync just yet - for zone, bucket in zone_bucket.items(): - k = new_key(zone, bucket, objname) - k.set_contents_from_string(content) - - master_zg.meta_checkpoint() - - # check object exists - for 
source_zone, bucket in zone_bucket.items(): - for target_zone in all_zones: - if source_zone.zone_name == target_zone.zone_name: - continue - - master_zg.zone_bucket_checkpoint(target_zone, source_zone, bucket.name) - - check_bucket_eq(source_zone, target_zone, bucket) - - # check object removal - for source_zone, bucket in zone_bucket.items(): - k = get_key(source_zone, bucket, objname) - k.delete() - for target_zone in all_zones: - if source_zone.zone_name == target_zone.zone_name: - continue - - master_zg.zone_bucket_checkpoint(target_zone, source_zone, bucket.name) - - check_bucket_eq(source_zone, target_zone, bucket) - -def get_latest_object_version(key): - for k in key.bucket.list_versions(key.name): - if k.is_latest: - return k - return None - -def test_versioned_object_incremental_sync(): - buckets, zone_bucket = create_bucket_per_zone_in_master_zg() - - # enable versioning - all_zones = [] - for zone, bucket in zone_bucket.items(): - bucket.configure_versioning(True) - all_zones.append(zone) - - master_zg.meta_checkpoint() - - # upload a dummy object to each bucket and wait for sync. 
this forces each - # bucket to finish a full sync and switch to incremental - for source_zone, bucket in zone_bucket.items(): - new_key(source_zone, bucket, 'dummy').set_contents_from_string('') - for target_zone in all_zones: - if source_zone.zone_name == target_zone.zone_name: - continue - master_zg.zone_bucket_checkpoint(target_zone, source_zone, bucket.name) - - for _, bucket in zone_bucket.items(): - # create and delete multiple versions of an object from each zone - for zone in all_zones: - obj = 'obj-' + zone.zone_name - k = new_key(zone, bucket, obj) - - k.set_contents_from_string('version1') - v = get_latest_object_version(k) - log(10, 'version1 id=', v.version_id) - # don't delete version1 - this tests that the initial version - # doesn't get squashed into later versions - - # create and delete the following object versions to test that - # the operations don't race with each other during sync - k.set_contents_from_string('version2') - v = get_latest_object_version(k) - log(10, 'version2 id=', v.version_id) - k.bucket.delete_key(obj, version_id=v.version_id) - - k.set_contents_from_string('version3') - v = get_latest_object_version(k) - log(10, 'version3 id=', v.version_id) - k.bucket.delete_key(obj, version_id=v.version_id) - - for source_zone, bucket in zone_bucket.items(): - for target_zone in all_zones: - if source_zone.zone_name == target_zone.zone_name: - continue - master_zg.zone_bucket_checkpoint(target_zone, source_zone, bucket.name) - check_bucket_eq(source_zone, target_zone, bucket) - -def test_bucket_versioning(): - buckets, zone_bucket = create_bucket_per_zone_in_realm() - - for zone, bucket in zone_bucket.items(): - bucket.configure_versioning(True) - res = bucket.get_versioning_status() - key = 'Versioning' - assert(key in res and res[key] == 'Enabled') - -def test_bucket_acl(): - buckets, zone_bucket = create_bucket_per_zone_in_master_zg() - - for zone, bucket in zone_bucket.items(): - assert(len(bucket.get_acl().acl.grants) == 1) # single 
grant on owner - bucket.set_acl('public-read') - assert(len(bucket.get_acl().acl.grants) == 2) # new grant on AllUsers - -def test_bucket_delete_notempty(): - buckets, zone_bucket = create_bucket_per_zone_in_master_zg() - master_zg.meta_checkpoint() - - for zone, bucket_name in zone_bucket.items(): - # upload an object to each bucket on its own zone - conn = zone.get_connection(user) - bucket = conn.get_bucket(bucket_name) - k = bucket.new_key('foo') - k.set_contents_from_string('bar') - # attempt to delete the bucket before this object can sync - try: - conn.delete_bucket(bucket_name) - except boto.exception.S3ResponseError, e: - assert(e.error_code == 'BucketNotEmpty') - continue - assert False # expected 409 BucketNotEmpty - - # assert that each bucket still exists on the master - z1 = master_zg.get_zone('zg1-1') - c1 = z1.get_connection(user) - for _, bucket_name in zone_bucket.items(): - assert c1.get_bucket(bucket_name) - -def test_multi_period_incremental_sync(): - if len(master_zg.clusters) < 3: - from nose.plugins.skip import SkipTest - raise SkipTest("test_multi_period_incremental_sync skipped. 
Requires 3 or more clusters.") - - buckets, zone_bucket = create_bucket_per_zone_in_master_zg() - - all_zones = [] - for z in zone_bucket: - all_zones.append(z) - - for zone, bucket_name in zone_bucket.items(): - for objname in [ 'p1', '_p1' ]: - k = new_key(zone, bucket_name, objname) - k.set_contents_from_string('asdasd') - master_zg.meta_checkpoint() - - # kill zone 3 gateway to freeze sync status to incremental in first period - z3 = master_zg.get_zone('zg1-3') - z3.cluster.stop_rgw() - - # change master to zone 2 -> period 2 - master_zg.set_master_zone(master_zg.get_zone('zg1-2')) - - for zone, bucket_name in zone_bucket.items(): - if zone == z3: - continue - for objname in [ 'p2', '_p2' ]: - k = new_key(zone, bucket_name, objname) - k.set_contents_from_string('qweqwe') - - # wait for zone 1 to sync - master_zg.zone_meta_checkpoint(master_zg.get_zone('zg1-1')) - - # change master back to zone 1 -> period 3 - master_zg.set_master_zone(master_zg.get_zone('zg1-1')) - - for zone, bucket_name in zone_bucket.items(): - if zone == z3: - continue - for objname in [ 'p3', '_p3' ]: - k = new_key(zone, bucket_name, objname) - k.set_contents_from_string('zxczxc') - - # restart zone 3 gateway and wait for sync - z3.cluster.start_rgw() - master_zg.meta_checkpoint() - - # verify that we end up with the same objects - for source_zone, bucket in zone_bucket.items(): - for target_zone in all_zones: - if source_zone.zone_name == target_zone.zone_name: - continue - - master_zg.zone_bucket_checkpoint(target_zone, source_zone, bucket.name) - - check_bucket_eq(source_zone, target_zone, bucket) - -@attr('destructive') -def test_zonegroup_remove(): - z1 = master_zg.get_zone('zg1-1') - - # try to 'zone delete' zg1-2 from cluster 1 - # must fail with ENOENT because the zone is local to cluster 2 - (_, retcode) = z1.cluster.rgw_admin('zone delete --rgw-zone=zg1-2', False) - assert(retcode == 2) # ENOENT - - # use 'zonegroup remove', expecting success - z1.cluster.rgw_admin('zonegroup 
remove --rgw-zone=zg1-2', True) - - # another 'zonegroup remove' should fail with ENOENT - (_, retcode) = z1.cluster.rgw_admin('zonegroup remove --rgw-zone=zg1-2', False) - assert(retcode == 2) # ENOENT - - # validate the resulting period - z1.cluster.rgw_admin('period update --commit', True) - master_zg.remove_zone('zg1-2') + return ''.join(random.choice(string.ascii_uppercase + string.ascii_lowercase + string.digits) for _ in range(32)) + +def gen_credentials(): + return multisite.Credentials(gen_access_key(), gen_secret()) + +def cluster_name(cluster_num): + return 'c' + str(cluster_num) + +def zonegroup_name(zonegroup_num): + return string.ascii_lowercase[zonegroup_num] + +def zone_name(zonegroup_num, zone_num): + return zonegroup_name(zonegroup_num) + str(zone_num + 1) + +def gateway_port(zonegroup_num, gateway_num): + return 8000 + 100 * zonegroup_num + gateway_num + +def gateway_name(zonegroup_num, zone_num, gateway_num): + return zone_name(zonegroup_num, zone_num) + '-' + str(gateway_num + 1) + +def zone_endpoints(zonegroup_num, zone_num, gateways_per_zone): + endpoints = [] + base = gateway_port(zonegroup_num, zone_num * gateways_per_zone) + for i in range(0, gateways_per_zone): + endpoints.append('http://localhost:' + str(base + i)) + return endpoints + +def get_log_level(log_level): + if log_level >= 20: + return logging.DEBUG + if log_level >= 10: + return logging.INFO + if log_level >= 5: + return logging.WARN + if log_level >= 1: + return logging.ERROR + return logging.CRITICAL + +def setup_logging(log_level_console, log_file, log_level_file): + if log_file: + formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s') + fh = logging.FileHandler(log_file) + fh.setFormatter(formatter) + fh.setLevel(get_log_level(log_level_file)) + log.addHandler(fh) + + formatter = logging.Formatter('%(levelname)s %(message)s') + ch = logging.StreamHandler() + ch.setFormatter(formatter) + ch.setLevel(get_log_level(log_level_console)) + log.addHandler(ch) def 
init(parse_args): cfg = configparser.RawConfigParser({ @@ -999,6 +146,8 @@ def init(parse_args): 'gateways_per_zone': 2, 'no_bootstrap': 'false', 'log_level': 20, + 'log_file': None, + 'file_log_level': 20, 'tenant': None, }) try: @@ -1023,6 +172,8 @@ def init(parse_args): parser.add_argument('--gateways-per-zone', type=int, default=cfg.getint(section, 'gateways_per_zone')) parser.add_argument('--no-bootstrap', action='store_true', default=cfg.getboolean(section, 'no_bootstrap')) parser.add_argument('--log-level', type=int, default=cfg.getint(section, 'log_level')) + parser.add_argument('--log-file', type=str, default=cfg.get(section, 'log_file')) + parser.add_argument('--file-log-level', type=int, default=cfg.getint(section, 'file_log_level')) parser.add_argument('--tenant', type=str, default=cfg.get(section, 'tenant')) argv = [] @@ -1031,16 +182,119 @@ def init(parse_args): argv = sys.argv[1:] args = parser.parse_args(argv) + bootstrap = not args.no_bootstrap - global log_level - log_level = args.log_level + setup_logging(args.log_level, args.log_file, args.file_log_level) - master_zg_base_port = 8000 + # start first cluster + c1 = Cluster(cluster_name(1)) + if bootstrap: + c1.start() + clusters = [] + clusters.append(c1) - for i in range(0, args.num_zonegroups): - rgw_multi = RGWMulti(i + 1, int(args.num_zones), int(args.gateways_per_zone), master_zg_base_port + 100 * i, master_zg_base_port) - rgw_multi.setup(not args.no_bootstrap, args.tenant) + admin_creds = gen_credentials() + admin_user = multisite.User('zone.user') + user_creds = gen_credentials() + user = multisite.User('tester') + + realm = multisite.Realm('r') + if bootstrap: + # create the realm on c1 + realm.create(c1) + else: + realm.get(c1) + period = multisite.Period(realm=realm) + realm.current_period = period + + for zg in range(0, args.num_zonegroups): + zonegroup = multisite.ZoneGroup(zonegroup_name(zg), period) + period.zonegroups.append(zonegroup) + + is_master_zg = zg == 0 + if is_master_zg: 
+ period.master_zonegroup = zonegroup + + for z in range(0, args.num_zones): + is_master = z == 0 + # start a cluster, or use c1 for first zone + cluster = None + if is_master_zg and is_master: + cluster = c1 + else: + cluster = Cluster(cluster_name(len(clusters) + 1)) + clusters.append(cluster) + if bootstrap: + cluster.start() + # pull realm configuration from the master's gateway + gateway = realm.meta_master_zone().gateways[0] + realm.pull(cluster, gateway, admin_creds) + + endpoints = zone_endpoints(zg, z, args.gateways_per_zone) + if is_master: + if bootstrap: + # create the zonegroup on its first zone's cluster + arg = [] + if is_master_zg: + arg += ['--master'] + if len(endpoints): # use master zone's endpoints + arg += ['--endpoints', ','.join(endpoints)] + zonegroup.create(cluster, arg) + else: + zonegroup.get(cluster) + + # create the zone in its zonegroup + zone = multisite.Zone(zone_name(zg, z), zonegroup, cluster) + if bootstrap: + arg = admin_creds.credential_args() + if is_master: + arg += ['--master'] + if len(endpoints): + arg += ['--endpoints', ','.join(endpoints)] + zone.create(cluster, arg) + else: + zone.get(cluster) + zonegroup.zones.append(zone) + if is_master: + zonegroup.master_zone = zone + + # update/commit the period + if bootstrap: + period.update(zone, commit=True) + + # start the gateways + for g in range(0, args.gateways_per_zone): + port = gateway_port(zg, g + z * args.gateways_per_zone) + client_id = gateway_name(zg, z, g) + gateway = Gateway(client_id, 'localhost', port, cluster, zone) + if bootstrap: + gateway.start() + zone.gateways.append(gateway) + + if is_master_zg and is_master: + if bootstrap: + # create admin user + arg = ['--display-name', '"Zone User"', '--system'] + arg += admin_creds.credential_args() + admin_user.create(zone, arg) + # create test user + arg = ['--display-name', '"Test User"'] + arg += user_creds.credential_args() + if args.tenant: + arg += ['--tenant', args.tenant] + user.create(zone, arg) + else: +
# read users and update keys + admin_user.info(zone) + admin_creds = admin_user.credentials[0] + user.info(zone) + user_creds = user.credentials[0] + + if not bootstrap: + period.get(c1) + + init_multi(realm, user) def setup_module(): init(False)