Merge pull request #14433 from cbodley/wip-rgw-multi-py

test/rgw: refactor test_multi.py for use in qa suite

Reviewed-by: Yehuda Sadeh <yehuda@redhat.com>

commit 205a39c8d9
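The commit message positions rgw_multi as a library to be driven by an outside harness (the qa suite), which supplies concrete Cluster/Gateway implementations and then calls init_multi() from tests.py. A rough, hedged sketch of what such a caller could look like follows; the LocalCluster class, its use of a --cluster flag, and the subprocess wiring are illustrative assumptions, not part of this commit:

    # illustrative harness wiring only -- not part of this change
    import subprocess

    from rgw_multi import multisite, tests

    class LocalCluster(multisite.Cluster):
        """ run radosgw-admin against one local ceph cluster (hypothetical) """
        def __init__(self, cluster_id):
            self.cluster_id = cluster_id  # e.g. 'c1'; naming is an assumption

        def admin(self, args = [], **kwargs):
            # the tests expect (raw output, return code) and pass optional
            # check_retcode/read_only hints
            check_retcode = kwargs.pop('check_retcode', True)
            kwargs.pop('read_only', None)  # hint only; ignored in this sketch
            cmd = ['radosgw-admin', '--cluster', self.cluster_id] + args
            p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
            out, _ = p.communicate()
            if check_retcode:
                assert p.returncode == 0
            return (out, p.returncode)

    # a caller would build Realm/ZoneGroup/Zone/User objects around such
    # clusters, then hand them to the test module before running the tests:
    #   tests.init_multi(realm, user)
    #   tests.realm_meta_checkpoint(realm)

The only firm contract the tests rely on is that admin() returns the raw command output together with its return code and tolerates the check_retcode/read_only keyword hints.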
src/test/rgw/rgw_multi/__init__.py (new file, 0 lines)

src/test/rgw/rgw_multi/multisite.py (new file, 337 lines)
@@ -0,0 +1,337 @@
from abc import ABCMeta, abstractmethod
from io import StringIO  # StringIO is used by SystemObject.Set below
import json

class Cluster:
    """ interface to run commands against a distinct ceph cluster """
    __metaclass__ = ABCMeta

    @abstractmethod
    def admin(self, args = [], **kwargs):
        """ execute a radosgw-admin command """
        pass

class Gateway:
    """ interface to control a single radosgw instance """
    __metaclass__ = ABCMeta

    def __init__(self, host = None, port = None, cluster = None, zone = None, proto = 'http', connection = None):
        self.host = host
        self.port = port
        self.cluster = cluster
        self.zone = zone
        self.proto = proto
        self.connection = connection

    @abstractmethod
    def start(self, args = []):
        """ start the gateway with the given args """
        pass

    @abstractmethod
    def stop(self):
        """ stop the gateway """
        pass

    def endpoint(self):
        return '%s://%s:%d' % (self.proto, self.host, self.port)

class SystemObject:
    """ interface for system objects, represented in json format and
    manipulated with radosgw-admin commands """
    __metaclass__ = ABCMeta

    def __init__(self, data = None, uuid = None):
        self.data = data
        self.id = uuid
        if data:
            self.load_from_json(data)

    @abstractmethod
    def build_command(self, command):
        """ return the command line for the given command, including arguments
        to specify this object """
        pass

    @abstractmethod
    def load_from_json(self, data):
        """ update internal state based on json data """
        pass

    def command(self, cluster, cmd, args = [], **kwargs):
        """ run the given command and return the output and retcode """
        args = self.build_command(cmd) + args
        return cluster.admin(args, **kwargs)

    def json_command(self, cluster, cmd, args = [], **kwargs):
        """ run the given command, parse the output and return the resulting
        data and retcode """
        (s, r) = self.command(cluster, cmd, args, **kwargs)
        if r == 0:
            output = s.decode('utf-8')
            output = output[output.find('{'):] # trim extra output before json
            data = json.loads(output)
            self.load_from_json(data)
            self.data = data
        return (self.data, r)

    # mixins for supported commands
    class Create(object):
        def create(self, cluster, args = [], **kwargs):
            """ create the object with the given arguments """
            return self.json_command(cluster, 'create', args, **kwargs)

    class Delete(object):
        def delete(self, cluster, args = [], **kwargs):
            """ delete the object """
            # not json_command() because delete has no output
            (_, r) = self.command(cluster, 'delete', args, **kwargs)
            if r == 0:
                self.data = None
            return r

    class Get(object):
        def get(self, cluster, args = [], **kwargs):
            """ read the object from storage """
            kwargs['read_only'] = True
            return self.json_command(cluster, 'get', args, **kwargs)

    class Set(object):
        def set(self, cluster, data, args = [], **kwargs):
            """ set the object by json """
            kwargs['stdin'] = StringIO(json.dumps(data))
            return self.json_command(cluster, 'set', args, **kwargs)

    class Modify(object):
        def modify(self, cluster, args = [], **kwargs):
            """ modify the object with the given arguments """
            return self.json_command(cluster, 'modify', args, **kwargs)

    class CreateDelete(Create, Delete): pass
    class GetSet(Get, Set): pass

class Zone(SystemObject, SystemObject.CreateDelete, SystemObject.GetSet, SystemObject.Modify):
    def __init__(self, name, zonegroup = None, cluster = None, data = None, zone_id = None, gateways = []):
        self.name = name
        self.zonegroup = zonegroup
        self.cluster = cluster
        self.gateways = gateways
        super(Zone, self).__init__(data, zone_id)

    def zone_arg(self):
        """ command-line argument to specify this zone """
        return ['--rgw-zone', self.name]

    def zone_args(self):
        """ command-line arguments to specify this zone/zonegroup/realm """
        args = self.zone_arg()
        if self.zonegroup:
            args += self.zonegroup.zonegroup_args()
        return args

    def build_command(self, command):
        """ build a command line for the given command and args """
        return ['zone', command] + self.zone_args()

    def load_from_json(self, data):
        """ load the zone from json """
        self.id = data['id']
        self.name = data['name']

    def start(self, args = []):
        """ start all gateways """
        for g in self.gateways:
            g.start(args)

    def stop(self):
        """ stop all gateways """
        for g in self.gateways:
            g.stop()

    def period(self):
        return self.zonegroup.period if self.zonegroup else None

    def realm(self):
        return self.zonegroup.realm() if self.zonegroup else None

class ZoneGroup(SystemObject, SystemObject.CreateDelete, SystemObject.GetSet, SystemObject.Modify):
    def __init__(self, name, period = None, data = None, zonegroup_id = None, zones = [], master_zone = None):
        self.name = name
        self.period = period
        self.zones = zones
        self.master_zone = master_zone
        super(ZoneGroup, self).__init__(data, zonegroup_id)

    def zonegroup_arg(self):
        """ command-line argument to specify this zonegroup """
        return ['--rgw-zonegroup', self.name]

    def zonegroup_args(self):
        """ command-line arguments to specify this zonegroup/realm """
        args = self.zonegroup_arg()
        realm = self.realm()
        if realm:
            args += realm.realm_arg()
        return args

    def build_command(self, command):
        """ build a command line for the given command and args """
        return ['zonegroup', command] + self.zonegroup_args()

    def zone_by_id(self, zone_id):
        """ return the matching zone by id """
        for zone in self.zones:
            if zone.id == zone_id:
                return zone
        return None

    def load_from_json(self, data):
        """ load the zonegroup from json """
        self.id = data['id']
        self.name = data['name']
        master_id = data['master_zone']
        if not self.master_zone or master_id != self.master_zone.id:
            self.master_zone = self.zone_by_id(master_id)

    def add(self, cluster, zone, args = [], **kwargs):
        """ add an existing zone to the zonegroup """
        args += zone.zone_arg()
        (data, r) = self.json_command(cluster, 'add', args, **kwargs)
        if r == 0:
            zone.zonegroup = self
            self.zones.append(zone)
        return (data, r)

    def remove(self, cluster, zone, args = [], **kwargs):
        """ remove an existing zone from the zonegroup """
        args += zone.zone_arg()
        (data, r) = self.json_command(cluster, 'remove', args, **kwargs)
        if r == 0:
            zone.zonegroup = None
            self.zones.remove(zone)
        return (data, r)

    def realm(self):
        return self.period.realm if self.period else None

class Period(SystemObject, SystemObject.Get):
    def __init__(self, realm = None, data = None, period_id = None, zonegroups = [], master_zonegroup = None):
        self.realm = realm
        self.zonegroups = zonegroups
        self.master_zonegroup = master_zonegroup
        super(Period, self).__init__(data, period_id)

    def zonegroup_by_id(self, zonegroup_id):
        """ return the matching zonegroup by id """
        for zonegroup in self.zonegroups:
            if zonegroup.id == zonegroup_id:
                return zonegroup
        return None

    def build_command(self, command):
        """ build a command line for the given command and args """
        return ['period', command]

    def load_from_json(self, data):
        """ load the period from json """
        self.id = data['id']
        master_id = data['master_zonegroup']
        if not self.master_zonegroup or master_id != self.master_zonegroup.id:
            self.master_zonegroup = self.zonegroup_by_id(master_id)

    def update(self, zone, args = [], **kwargs):
        """ run 'radosgw-admin period update' on the given zone """
        assert(zone.cluster)
        args = zone.zone_args() + args
        if kwargs.pop('commit', False):
            args.append('--commit')
        return self.json_command(zone.cluster, 'update', args, **kwargs)

    def commit(self, zone, args = [], **kwargs):
        """ run 'radosgw-admin period commit' on the given zone """
        assert(zone.cluster)
        args = zone.zone_args() + args
        return self.json_command(zone.cluster, 'commit', args, **kwargs)

class Realm(SystemObject, SystemObject.CreateDelete, SystemObject.GetSet):
    def __init__(self, name, period = None, data = None, realm_id = None):
        self.name = name
        self.current_period = period
        super(Realm, self).__init__(data, realm_id)

    def realm_arg(self):
        """ return the command-line arguments that specify this realm """
        return ['--rgw-realm', self.name]

    def build_command(self, command):
        """ build a command line for the given command and args """
        return ['realm', command] + self.realm_arg()

    def load_from_json(self, data):
        """ load the realm from json """
        self.id = data['id']

    def pull(self, cluster, gateway, credentials, args = [], **kwargs):
        """ pull an existing realm from the given gateway """
        args += ['--url', gateway.endpoint()]
        args += credentials.credential_args()
        return self.json_command(cluster, 'pull', args, **kwargs)

    def master_zonegroup(self):
        """ return the current period's master zonegroup """
        if self.current_period is None:
            return None
        return self.current_period.master_zonegroup

    def meta_master_zone(self):
        """ return the current period's metadata master zone """
        zonegroup = self.master_zonegroup()
        if zonegroup is None:
            return None
        return zonegroup.master_zone

class Credentials:
    def __init__(self, access_key, secret):
        self.access_key = access_key
        self.secret = secret

    def credential_args(self):
        return ['--access-key', self.access_key, '--secret', self.secret]

class User(SystemObject):
    def __init__(self, uid, data = None, name = None, credentials = []):
        self.name = name
        self.credentials = credentials
        super(User, self).__init__(data, uid)

    def user_arg(self):
        """ command-line argument to specify this user """
        return ['--uid', self.id]

    def build_command(self, command):
        """ build a command line for the given command and args """
        return ['user', command] + self.user_arg()

    def load_from_json(self, data):
        """ load the user from json """
        self.id = data['user_id']
        self.name = data['display_name']
        self.credentials = [Credentials(k['access_key'], k['secret_key']) for k in data['keys']]

    def create(self, zone, args = [], **kwargs):
        """ create the user with the given arguments """
        assert(zone.cluster)
        args += zone.zone_args()
        return self.json_command(zone.cluster, 'create', args, **kwargs)

    def info(self, zone, args = [], **kwargs):
        """ read the user from storage """
        assert(zone.cluster)
        args += zone.zone_args()
        kwargs['read_only'] = True
        return self.json_command(zone.cluster, 'info', args, **kwargs)

    def delete(self, zone, args = [], **kwargs):
        """ delete the user """
        assert(zone.cluster)
        args += zone.zone_args()
        return self.command(zone.cluster, 'delete', args, **kwargs)
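The classes above wrap radosgw-admin's realm/zonegroup/zone/period/user subcommands as objects whose create/get/set/modify/delete mixins all funnel through build_command() and Cluster.admin(). As a hedged illustration of how a harness might compose them to bring up a first zone — the names 'earth', 'us', 'us-1', 'zone.user' and the exact flag choices are assumptions for the example, not taken from this change:

    # illustrative composition of the multisite classes -- not part of this change
    from rgw_multi.multisite import Credentials, Realm, Period, ZoneGroup, Zone, User

    def build_site(cluster, gateway):
        creds = Credentials('access', 'secret')           # placeholder keys
        realm = Realm('earth')
        realm.create(cluster)                              # realm create --rgw-realm earth
        period = Period(realm=realm)
        realm.current_period = period
        zonegroup = ZoneGroup('us', period=period)
        zonegroup.create(cluster, ['--master', '--default'])
        zone = Zone('us-1', zonegroup=zonegroup, cluster=cluster, gateways=[gateway])
        zone.create(cluster, ['--master', '--default'] + creds.credential_args())
        zonegroup.zones.append(zone)
        period.zonegroups.append(zonegroup)
        user = User('zone.user', credentials=[creds])
        user.create(zone, ['--display-name', 'Zone User', '--system'])
        period.update(zone, commit=True)                   # period update --commit
        return realm, user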
src/test/rgw/rgw_multi/tests.py (new file, 718 lines)
@@ -0,0 +1,718 @@
import json
import random
import string
import sys
import time
import logging
try:
    from itertools import izip_longest as zip_longest
except ImportError:
    from itertools import zip_longest

import boto
import boto.s3.connection

from nose.tools import eq_ as eq
from nose.plugins.attrib import attr
from nose.plugins.skip import SkipTest

from .multisite import Zone

# rgw multisite tests, written against the interfaces provided in rgw_multi.
# these tests must be initialized and run by another module that provides
# implementations of these interfaces by calling init_multi()
realm = None
user = None
def init_multi(_realm, _user):
    global realm
    realm = _realm
    global user
    user = _user

log = logging.getLogger(__name__)

num_buckets = 0
run_prefix=''.join(random.choice(string.ascii_lowercase) for _ in range(6))

def get_gateway_connection(gateway, credentials):
    """ connect to the given gateway """
    if gateway.connection is None:
        gateway.connection = boto.connect_s3(
            aws_access_key_id = credentials.access_key,
            aws_secret_access_key = credentials.secret,
            host = gateway.host,
            port = gateway.port,
            is_secure = False,
            calling_format = boto.s3.connection.OrdinaryCallingFormat())
    return gateway.connection

def get_zone_connection(zone, credentials):
    """ connect to the zone's first gateway """
    if isinstance(credentials, list):
        credentials = credentials[0]
    return get_gateway_connection(zone.gateways[0], credentials)

def meta_sync_status(zone):
    while True:
        cmd = ['metadata', 'sync', 'status'] + zone.zone_args()
        meta_sync_status_json, retcode = zone.cluster.admin(cmd, check_retcode=False, read_only=True)
        if retcode == 0:
            break
        assert(retcode == 2) # ENOENT
        time.sleep(5)

    meta_sync_status_json = meta_sync_status_json.decode('utf-8')
    log.debug('current meta sync status=%s', meta_sync_status_json)
    sync_status = json.loads(meta_sync_status_json)

    global_sync_status=sync_status['sync_status']['info']['status']
    num_shards=sync_status['sync_status']['info']['num_shards']

    sync_markers=sync_status['sync_status']['markers']
    log.debug('sync_markers=%s', sync_markers)
    assert(num_shards == len(sync_markers))

    markers = {i: m['val']['marker'] for i, m in enumerate(sync_markers)}
    return (num_shards, markers)

def meta_master_log_status(master_zone):
    cmd = ['mdlog', 'status'] + master_zone.zone_args()
    mdlog_status_json, retcode = master_zone.cluster.admin(cmd, read_only=True)
    mdlog_status = json.loads(mdlog_status_json.decode('utf-8'))

    markers = {i: s['marker'] for i, s in enumerate(mdlog_status)}
    log.debug('master meta markers=%s', markers)
    return markers

def compare_meta_status(zone, log_status, sync_status):
    if len(log_status) != len(sync_status):
        log.error('len(log_status)=%d, len(sync_status)=%d', len(log_status), len(sync_status))
        return False

    msg = ''
    for i, l, s in zip(log_status, log_status.values(), sync_status.values()):
        if l > s:
            if len(msg):
                msg += ', '
            msg += 'shard=' + str(i) + ' master=' + l + ' target=' + s

    if len(msg) > 0:
        log.warning('zone %s behind master: %s', zone.name, msg)
        return False

    return True

def zone_meta_checkpoint(zone, meta_master_zone = None, master_status = None):
    if not meta_master_zone:
        meta_master_zone = zone.realm().meta_master_zone()
    if not master_status:
        master_status = meta_master_log_status(meta_master_zone)

    log.info('starting meta checkpoint for zone=%s', zone.name)

    while True:
        num_shards, sync_status = meta_sync_status(zone)
        log.debug('log_status=%s', master_status)
        log.debug('sync_status=%s', sync_status)
        if compare_meta_status(zone, master_status, sync_status):
            break
        time.sleep(5)

    log.info('finish meta checkpoint for zone=%s', zone.name)

def zonegroup_meta_checkpoint(zonegroup, meta_master_zone = None, master_status = None):
    if not meta_master_zone:
        meta_master_zone = zonegroup.realm().meta_master_zone()
    if not master_status:
        master_status = meta_master_log_status(meta_master_zone)

    for zone in zonegroup.zones:
        if zone == meta_master_zone:
            continue
        zone_meta_checkpoint(zone, meta_master_zone, master_status)

def realm_meta_checkpoint(realm):
    log.info('meta checkpoint')

    meta_master_zone = realm.meta_master_zone()
    master_status = meta_master_log_status(meta_master_zone)

    for zonegroup in realm.current_period.zonegroups:
        zonegroup_meta_checkpoint(zonegroup, meta_master_zone, master_status)

def data_sync_status(target_zone, source_zone):
    if target_zone == source_zone:
        return None

    while True:
        cmd = ['data', 'sync', 'status'] + target_zone.zone_args()
        cmd += ['--source-zone', source_zone.name]
        data_sync_status_json, retcode = target_zone.cluster.admin(cmd, check_retcode=False, read_only=True)
        if retcode == 0:
            break

        assert(retcode == 2) # ENOENT

    data_sync_status_json = data_sync_status_json.decode('utf-8')
    log.debug('current data sync status=%s', data_sync_status_json)
    sync_status = json.loads(data_sync_status_json)

    global_sync_status=sync_status['sync_status']['info']['status']
    num_shards=sync_status['sync_status']['info']['num_shards']

    sync_markers=sync_status['sync_status']['markers']
    log.debug('sync_markers=%s', sync_markers)
    assert(num_shards == len(sync_markers))

    markers={}
    for i in range(num_shards):
        markers[i] = sync_markers[i]['val']['marker']

    return (num_shards, markers)

def bucket_sync_status(target_zone, source_zone, bucket_name):
    if target_zone == source_zone:
        return None

    cmd = ['bucket', 'sync', 'status'] + target_zone.zone_args()
    cmd += ['--source-zone', source_zone.name]
    cmd += ['--bucket', bucket_name]
    while True:
        bucket_sync_status_json, retcode = target_zone.cluster.admin(cmd, check_retcode=False, read_only=True)
        if retcode == 0:
            break

        assert(retcode == 2) # ENOENT

    bucket_sync_status_json = bucket_sync_status_json.decode('utf-8')
    log.debug('current bucket sync status=%s', bucket_sync_status_json)
    sync_status = json.loads(bucket_sync_status_json)

    markers={}
    for entry in sync_status:
        val = entry['val']
        if val['status'] == 'incremental-sync':
            pos = val['inc_marker']['position'].split('#')[-1] # get rid of shard id; e.g., 6#00000000002.132.3 -> 00000000002.132.3
        else:
            pos = ''
        markers[entry['key']] = pos

    return markers

def data_source_log_status(source_zone):
    source_cluster = source_zone.cluster
    cmd = ['datalog', 'status'] + source_zone.zone_args()
    datalog_status_json, retcode = source_cluster.rgw_admin(cmd, read_only=True)
    datalog_status = json.loads(datalog_status_json.decode('utf-8'))

    markers = {i: s['marker'] for i, s in enumerate(datalog_status)}
    log.debug('data markers for zone=%s markers=%s', source_zone.name, markers)
    return markers

def bucket_source_log_status(source_zone, bucket_name):
    cmd = ['bilog', 'status'] + source_zone.zone_args()
    cmd += ['--bucket', bucket_name]
    source_cluster = source_zone.cluster
    bilog_status_json, retcode = source_cluster.admin(cmd, read_only=True)
    bilog_status = json.loads(bilog_status_json.decode('utf-8'))

    m={}
    markers={}
    try:
        m = bilog_status['markers']
    except:
        pass

    for s in m:
        key = s['key']
        val = s['val']
        markers[key] = val

    log.debug('bilog markers for zone=%s bucket=%s markers=%s', source_zone.name, bucket_name, markers)
    return markers

def compare_data_status(target_zone, source_zone, log_status, sync_status):
    if len(log_status) != len(sync_status):
        log.error('len(log_status)=%d len(sync_status)=%d', len(log_status), len(sync_status))
        return False

    msg = ''
    for i, l, s in zip(log_status, log_status.values(), sync_status.values()):
        if l > s:
            if len(msg):
                msg += ', '
            msg += 'shard=' + str(i) + ' master=' + l + ' target=' + s

    if len(msg) > 0:
        log.warning('data of zone %s behind zone %s: %s', target_zone.name, source_zone.name, msg)
        return False

    return True

def compare_bucket_status(target_zone, source_zone, bucket_name, log_status, sync_status):
    if len(log_status) != len(sync_status):
        log.error('len(log_status)=%d len(sync_status)=%d', len(log_status), len(sync_status))
        return False

    msg = ''
    for i, l, s in zip(log_status, log_status.values(), sync_status.values()):
        if l > s:
            if len(msg):
                msg += ', '
            msg += 'shard=' + str(i) + ' master=' + l + ' target=' + s

    if len(msg) > 0:
        log.warning('bucket %s zone %s behind zone %s: %s', bucket_name, target_zone.name, source_zone.name, msg)
        return False

    return True

def zone_data_checkpoint(target_zone, source_zone):
    if target_zone == source_zone:
        return

    log.info('starting data checkpoint for target_zone=%s source_zone=%s', target_zone.name, source_zone.name)

    while True:
        log_status = data_source_log_status(source_zone)
        num_shards, sync_status = data_sync_status(target_zone, source_zone)

        log.debug('log_status=%s', log_status)
        log.debug('sync_status=%s', sync_status)

        if compare_data_status(target_zone, source_zone, log_status, sync_status):
            break

        time.sleep(5)

    log.info('finished data checkpoint for target_zone=%s source_zone=%s', target_zone.name, source_zone.name)

def zone_bucket_checkpoint(target_zone, source_zone, bucket_name):
    if target_zone == source_zone:
        return

    log.info('starting bucket checkpoint for target_zone=%s source_zone=%s bucket=%s', target_zone.name, source_zone.name, bucket_name)

    while True:
        log_status = bucket_source_log_status(source_zone, bucket_name)
        sync_status = bucket_sync_status(target_zone, source_zone, bucket_name)

        log.debug('log_status=%s', log_status)
        log.debug('sync_status=%s', sync_status)

        if compare_bucket_status(target_zone, source_zone, bucket_name, log_status, sync_status):
            break

        time.sleep(5)

    log.info('finished bucket checkpoint for target_zone=%s source_zone=%s bucket=%s', target_zone.name, source_zone.name, bucket_name)

def set_master_zone(zone):
    zone.modify(zone.cluster, ['--master'])
    zonegroup = zone.zonegroup
    zonegroup.period.update(zone, commit=True)
    zonegroup.master_zone = zone

def gen_bucket_name():
    global num_buckets

    num_buckets += 1
    return run_prefix + '-' + str(num_buckets)

def check_all_buckets_exist(zone, buckets):
    conn = get_zone_connection(zone, user.credentials)
    for b in buckets:
        try:
            conn.get_bucket(b)
        except:
            log.critical('zone %s does not contain bucket %s', zone.name, b)
            return False

    return True

def check_all_buckets_dont_exist(zone, buckets):
    conn = get_zone_connection(zone, user.credentials)
    for b in buckets:
        try:
            conn.get_bucket(b)
        except:
            continue

        log.critical('zone %s contains bucket %s', zone.zone, b)
        return False

    return True

def create_bucket_per_zone(zonegroup):
    buckets = []
    zone_bucket = {}
    for zone in zonegroup.zones:
        conn = get_zone_connection(zone, user.credentials)
        bucket_name = gen_bucket_name()
        log.info('create bucket zone=%s name=%s', zone.name, bucket_name)
        bucket = conn.create_bucket(bucket_name)
        buckets.append(bucket_name)
        zone_bucket[zone] = bucket

    return buckets, zone_bucket

def create_bucket_per_zone_in_realm():
    buckets = []
    zone_bucket = {}
    for zonegroup in realm.current_period.zonegroups:
        b, z = create_bucket_per_zone(zonegroup)
        buckets.extend(b)
        zone_bucket.update(z)
    return buckets, zone_bucket

def test_bucket_create():
    zonegroup = realm.master_zonegroup()
    buckets, _ = create_bucket_per_zone(zonegroup)
    zonegroup_meta_checkpoint(zonegroup)

    for zone in zonegroup.zones:
        assert check_all_buckets_exist(zone, buckets)

def test_bucket_recreate():
    zonegroup = realm.master_zonegroup()
    buckets, _ = create_bucket_per_zone(zonegroup)
    zonegroup_meta_checkpoint(zonegroup)

    for zone in zonegroup.zones:
        assert check_all_buckets_exist(zone, buckets)

    # recreate buckets on all zones, make sure they weren't removed
    for zone in zonegroup.zones:
        for bucket_name in buckets:
            conn = get_zone_connection(zone, user.credentials)
            bucket = conn.create_bucket(bucket_name)

    for zone in zonegroup.zones:
        assert check_all_buckets_exist(zone, buckets)

    zonegroup_meta_checkpoint(zonegroup)

    for zone in zonegroup.zones:
        assert check_all_buckets_exist(zone, buckets)

def test_bucket_remove():
    zonegroup = realm.master_zonegroup()
    buckets, zone_bucket = create_bucket_per_zone(zonegroup)
    zonegroup_meta_checkpoint(zonegroup)

    for zone in zonegroup.zones:
        assert check_all_buckets_exist(zone, buckets)

    for zone, bucket_name in zone_bucket.items():
        conn = get_zone_connection(zone, user.credentials)
        conn.delete_bucket(bucket_name)

    zonegroup_meta_checkpoint(zonegroup)

    for zone in zonegroup.zones:
        assert check_all_buckets_dont_exist(zone, buckets)

def get_bucket(zone, bucket_name):
    conn = get_zone_connection(zone, user.credentials)
    return conn.get_bucket(bucket_name)

def get_key(zone, bucket_name, obj_name):
    b = get_bucket(zone, bucket_name)
    return b.get_key(obj_name)

def new_key(zone, bucket_name, obj_name):
    b = get_bucket(zone, bucket_name)
    return b.new_key(obj_name)

def check_object_eq(k1, k2, check_extra = True):
    assert k1
    assert k2
    log.debug('comparing key name=%s', k1.name)
    eq(k1.name, k2.name)
    eq(k1.get_contents_as_string(), k2.get_contents_as_string())
    eq(k1.metadata, k2.metadata)
    eq(k1.cache_control, k2.cache_control)
    eq(k1.content_type, k2.content_type)
    eq(k1.content_encoding, k2.content_encoding)
    eq(k1.content_disposition, k2.content_disposition)
    eq(k1.content_language, k2.content_language)
    eq(k1.etag, k2.etag)
    eq(k1.last_modified, k2.last_modified)
    if check_extra:
        eq(k1.owner.id, k2.owner.id)
        eq(k1.owner.display_name, k2.owner.display_name)
    eq(k1.storage_class, k2.storage_class)
    eq(k1.size, k2.size)
    eq(k1.version_id, k2.version_id)
    eq(k1.encrypted, k2.encrypted)

def check_bucket_eq(zone1, zone2, bucket_name):
    log.info('comparing bucket=%s zones={%s, %s}', bucket_name, zone1.name, zone2.name)
    b1 = get_bucket(zone1, bucket_name)
    b2 = get_bucket(zone2, bucket_name)

    log.debug('bucket1 objects:')
    for o in b1.get_all_versions():
        log.debug('o=%s', o.name)
    log.debug('bucket2 objects:')
    for o in b2.get_all_versions():
        log.debug('o=%s', o.name)

    for k1, k2 in zip_longest(b1.get_all_versions(), b2.get_all_versions()):
        if k1 is None:
            log.critical('key=%s is missing from zone=%s', k2.name, zone1.name)
            assert False
        if k2 is None:
            log.critical('key=%s is missing from zone=%s', k1.name, zone2.name)
            assert False

        check_object_eq(k1, k2)

        # now get the keys through a HEAD operation, verify that the available data is the same
        k1_head = b1.get_key(k1.name)
        k2_head = b2.get_key(k2.name)

        check_object_eq(k1_head, k2_head, False)

    log.info('success, bucket identical: bucket=%s zones={%s, %s}', bucket_name, zone1.name, zone2.name)


def test_object_sync():
    zonegroup = realm.master_zonegroup()
    buckets, zone_bucket = create_bucket_per_zone(zonegroup)

    objnames = [ 'myobj', '_myobj', ':', '&' ]
    content = 'asdasd'

    # don't wait for meta sync just yet
    for zone, bucket_name in zone_bucket.items():
        for objname in objnames:
            k = new_key(zone, bucket_name, objname)
            k.set_contents_from_string(content)

    zonegroup_meta_checkpoint(zonegroup)

    for source_zone, bucket in zone_bucket.items():
        for target_zone in zonegroup.zones:
            if source_zone == target_zone:
                continue

            zone_bucket_checkpoint(target_zone, source_zone, bucket.name)
            check_bucket_eq(source_zone, target_zone, bucket)

def test_object_delete():
    zonegroup = realm.master_zonegroup()
    buckets, zone_bucket = create_bucket_per_zone(zonegroup)

    objname = 'myobj'
    content = 'asdasd'

    # don't wait for meta sync just yet
    for zone, bucket in zone_bucket.items():
        k = new_key(zone, bucket, objname)
        k.set_contents_from_string(content)

    zonegroup_meta_checkpoint(zonegroup)

    # check object exists
    for source_zone, bucket in zone_bucket.items():
        for target_zone in zonegroup.zones:
            if source_zone == target_zone:
                continue

            zone_bucket_checkpoint(target_zone, source_zone, bucket.name)
            check_bucket_eq(source_zone, target_zone, bucket)

    # check object removal
    for source_zone, bucket in zone_bucket.items():
        k = get_key(source_zone, bucket, objname)
        k.delete()
        for target_zone in zonegroup.zones:
            if source_zone == target_zone:
                continue

            zone_bucket_checkpoint(target_zone, source_zone, bucket.name)
            check_bucket_eq(source_zone, target_zone, bucket)

def get_latest_object_version(key):
    for k in key.bucket.list_versions(key.name):
        if k.is_latest:
            return k
    return None

def test_versioned_object_incremental_sync():
    zonegroup = realm.master_zonegroup()
    buckets, zone_bucket = create_bucket_per_zone(zonegroup)

    # enable versioning
    for zone, bucket in zone_bucket.items():
        bucket.configure_versioning(True)

    zonegroup_meta_checkpoint(zonegroup)

    # upload a dummy object to each bucket and wait for sync. this forces each
    # bucket to finish a full sync and switch to incremental
    for source_zone, bucket in zone_bucket.items():
        new_key(source_zone, bucket, 'dummy').set_contents_from_string('')
        for target_zone in zonegroup.zones:
            if source_zone == target_zone:
                continue
            zone_bucket_checkpoint(target_zone, source_zone, bucket.name)

    for _, bucket in zone_bucket.items():
        # create and delete multiple versions of an object from each zone
        for zone in zonegroup.zones:
            obj = 'obj-' + zone.name
            k = new_key(zone, bucket, obj)

            k.set_contents_from_string('version1')
            v = get_latest_object_version(k)
            log.debug('version1 id=%s', v.version_id)
            # don't delete version1 - this tests that the initial version
            # doesn't get squashed into later versions

            # create and delete the following object versions to test that
            # the operations don't race with each other during sync
            k.set_contents_from_string('version2')
            v = get_latest_object_version(k)
            log.debug('version2 id=%s', v.version_id)
            k.bucket.delete_key(obj, version_id=v.version_id)

            k.set_contents_from_string('version3')
            v = get_latest_object_version(k)
            log.debug('version3 id=%s', v.version_id)
            k.bucket.delete_key(obj, version_id=v.version_id)

    for source_zone, bucket in zone_bucket.items():
        for target_zone in zonegroup.zones:
            if source_zone == target_zone:
                continue
            zone_bucket_checkpoint(target_zone, source_zone, bucket.name)
            check_bucket_eq(source_zone, target_zone, bucket)

def test_bucket_versioning():
    buckets, zone_bucket = create_bucket_per_zone_in_realm()
    for zone, bucket in zone_bucket.items():
        bucket.configure_versioning(True)
        res = bucket.get_versioning_status()
        key = 'Versioning'
        assert(key in res and res[key] == 'Enabled')

def test_bucket_acl():
    buckets, zone_bucket = create_bucket_per_zone_in_realm()
    for zone, bucket in zone_bucket.items():
        assert(len(bucket.get_acl().acl.grants) == 1) # single grant on owner
        bucket.set_acl('public-read')
        assert(len(bucket.get_acl().acl.grants) == 2) # new grant on AllUsers

def test_bucket_delete_notempty():
    zonegroup = realm.master_zonegroup()
    buckets, zone_bucket = create_bucket_per_zone(zonegroup)
    zonegroup_meta_checkpoint(zonegroup)

    for zone, bucket_name in zone_bucket.items():
        # upload an object to each bucket on its own zone
        conn = get_zone_connection(zone, user.credentials)
        bucket = conn.get_bucket(bucket_name)
        k = bucket.new_key('foo')
        k.set_contents_from_string('bar')
        # attempt to delete the bucket before this object can sync
        try:
            conn.delete_bucket(bucket_name)
        except boto.exception.S3ResponseError as e:
            assert(e.error_code == 'BucketNotEmpty')
            continue
        assert False # expected 409 BucketNotEmpty

    # assert that each bucket still exists on the master
    c1 = get_zone_connection(zonegroup.master_zone, user.credentials)
    for _, bucket_name in zone_bucket.items():
        assert c1.get_bucket(bucket_name)

def test_multi_period_incremental_sync():
    zonegroup = realm.master_zonegroup()
    if len(zonegroup.zones) < 3:
        raise SkipTest("test_multi_period_incremental_sync skipped. Requires 3 or more zones in master zonegroup.")

    buckets, zone_bucket = create_bucket_per_zone(zonegroup)

    for zone, bucket_name in zone_bucket.items():
        for objname in [ 'p1', '_p1' ]:
            k = new_key(zone, bucket_name, objname)
            k.set_contents_from_string('asdasd')
    zonegroup_meta_checkpoint(zonegroup)

    z1, z2, z3 = zonegroup.zones[0:3]
    assert(z1 == zonegroup.master_zone)

    # kill zone 3 gateways to freeze sync status to incremental in first period
    z3.stop()

    # change master to zone 2 -> period 2
    set_master_zone(z2)

    for zone, bucket_name in zone_bucket.items():
        if zone == z3:
            continue
        for objname in [ 'p2', '_p2' ]:
            k = new_key(zone, bucket_name, objname)
            k.set_contents_from_string('qweqwe')

    # wait for zone 1 to sync
    zone_meta_checkpoint(z1)

    # change master back to zone 1 -> period 3
    set_master_zone(z1)

    for zone, bucket_name in zone_bucket.items():
        if zone == z3:
            continue
        for objname in [ 'p3', '_p3' ]:
            k = new_key(zone, bucket_name, objname)
            k.set_contents_from_string('zxczxc')

    # restart zone 3 gateway and wait for sync
    z3.start()
    zonegroup_meta_checkpoint(zonegroup)

    # verify that we end up with the same objects
    for source_zone, bucket in zone_bucket.items():
        for target_zone in zonegroup.zones:
            if source_zone == target_zone:
                continue

            zone_bucket_checkpoint(target_zone, source_zone, bucket.name)
            check_bucket_eq(source_zone, target_zone, bucket)

def test_zonegroup_remove():
    zonegroup = realm.master_zonegroup()
    if len(zonegroup.zones) < 2:
        raise SkipTest("test_zonegroup_remove skipped. Requires 2 or more zones in master zonegroup.")

    zonegroup_meta_checkpoint(zonegroup)
    z1, z2 = zonegroup.zones[0:2]
    c1, c2 = (z1.cluster, z2.cluster)

    # create a new zone in zonegroup on c2 and commit
    zone = Zone('remove', zonegroup, c2)
    zone.create(c2)
    zonegroup.zones.append(zone)
    zonegroup.period.update(zone, commit=True)

    # try to 'zone delete' the new zone from cluster 1
    # must fail with ENOENT because the zone is local to cluster 2
    retcode = zone.delete(c1, check_retcode=False)
    assert(retcode == 2) # ENOENT

    # use 'zonegroup remove', expecting success
    zonegroup.remove(c1, zone)

    # another 'zonegroup remove' should fail with ENOENT
    _, retcode = zonegroup.remove(c1, zone, check_retcode=False)
    assert(retcode == 2) # ENOENT

    # delete the new zone
    zone.delete(c2)

    # validate the resulting period
    zonegroup.period.update(z1, commit=True)
@@ -59,101 +59,98 @@ root_path=`(cd $script_dir/../..; pwd)`
 mstart=$root_path/mstart.sh
 mstop=$root_path/mstop.sh
 mrun=$root_path/mrun
 mrgw=$root_path/mrgw.sh

 function start_ceph_cluster {
-  [ $# -ne 2 ] && echo "start_ceph_cluster() needs 2 param" && exit 1
+  [ $# -ne 1 ] && echo "start_ceph_cluster() needs 1 param" && exit 1

-  echo "$mstart zg$1-c$2"
+  echo "$mstart $1"
 }

 function rgw_admin {
-  [ $# -lt 2 ] && echo "rgw_admin() needs 2 param" && exit 1
+  [ $# -lt 1 ] && echo "rgw_admin() needs 1 param" && exit 1

-  echo "$mrun zg$1-c$2 radosgw-admin"
+  echo "$mrun $1 radosgw-admin"
 }

 function rgw {
-  [ $# -ne 3 ] && echo "rgw() needs 3 params" && exit 1
+  [ $# -ne 2 ] && echo "rgw() needs 2 params" && exit 1

-  echo "$root_path/mrgw.sh zg$1-c$2 $3 $rgw_flags"
+  echo "$mrgw $1 $2 $rgw_flags"
 }

 function init_first_zone {
-  [ $# -ne 8 ] && echo "init_first_zone() needs 8 params" && exit 1
+  [ $# -ne 7 ] && echo "init_first_zone() needs 7 params" && exit 1

-  zgid=$1
-  cid=$2
-  realm=$3
-  zg=$4
-  zone=$5
+  cid=$1
+  realm=$2
+  zg=$3
+  zone=$4
+  endpoints=$5

   access_key=$6
   secret=$7

   # initialize realm
-  x $(rgw_admin $zgid $cid) realm create --rgw-realm=$realm
+  x $(rgw_admin $cid) realm create --rgw-realm=$realm

   # create zonegroup, zone
-  x $(rgw_admin $zgid $cid) zonegroup create --rgw-zonegroup=$zg --master --default
-  x $(rgw_admin $zgid $cid) zone create --rgw-zonegroup=$zg --rgw-zone=$zone --access-key=${access_key} --secret=${secret} --endpoints=$endpoints --default
-  x $(rgw_admin $zgid $cid) user create --uid=zone.user --display-name="Zone User" --access-key=${access_key} --secret=${secret} --system
+  x $(rgw_admin $cid) zonegroup create --rgw-zonegroup=$zg --master --default
+  x $(rgw_admin $cid) zone create --rgw-zonegroup=$zg --rgw-zone=$zone --access-key=${access_key} --secret=${secret} --endpoints=$endpoints --default
+  x $(rgw_admin $cid) user create --uid=zone.user --display-name="Zone User" --access-key=${access_key} --secret=${secret} --system

-  x $(rgw_admin $zgid $cid) period update --commit
+  x $(rgw_admin $cid) period update --commit
 }

 function init_zone_in_existing_zg {
-  [ $# -ne 9 ] && echo "init_zone_in_existing_zg() needs 9 params" && exit 1
+  [ $# -ne 8 ] && echo "init_zone_in_existing_zg() needs 8 params" && exit 1

-  zgid=$1
-  cid=$2
-  realm=$3
-  zg=$4
-  zone=$5
-  master_zg_zone1_port=$6
-  endpoints=$7
+  cid=$1
+  realm=$2
+  zg=$3
+  zone=$4
+  master_zg_zone1_port=$5
+  endpoints=$6

-  access_key=$8
-  secret=$9
+  access_key=$7
+  secret=$8

-  x $(rgw_admin $zgid $cid) realm pull --url=http://localhost:$master_zg_zone1_port --access-key=${access_key} --secret=${secret} --default
-  x $(rgw_admin $zgid $cid) zonegroup default --rgw-zonegroup=$zg
-  x $(rgw_admin $zgid $cid) zone create --rgw-zonegroup=$zg --rgw-zone=$zone --access-key=${access_key} --secret=${secret} --endpoints=$endpoints
-  x $(rgw_admin $zgid $cid) period update --commit --url=http://localhost:$master_zg_zone1_port --access-key=${access_key} --secret=${secret}
+  x $(rgw_admin $cid) realm pull --url=http://localhost:$master_zg_zone1_port --access-key=${access_key} --secret=${secret} --default
+  x $(rgw_admin $cid) zonegroup default --rgw-zonegroup=$zg
+  x $(rgw_admin $cid) zone create --rgw-zonegroup=$zg --rgw-zone=$zone --access-key=${access_key} --secret=${secret} --endpoints=$endpoints
+  x $(rgw_admin $cid) period update --commit --url=http://localhost:$master_zg_zone1_port --access-key=${access_key} --secret=${secret}
 }

 function init_first_zone_in_slave_zg {
-  [ $# -ne 9 ] && echo "init_first_zone_in_slave_zg() needs 9 params" && exit 1
+  [ $# -ne 8 ] && echo "init_first_zone_in_slave_zg() needs 8 params" && exit 1

-  zgid=$1
-  cid=$2
-  realm=$3
-  zg=$4
-  zone=$5
-  master_zg_zone1_port=$6
-  endpoints=$7
+  cid=$1
+  realm=$2
+  zg=$3
+  zone=$4
+  master_zg_zone1_port=$5
+  endpoints=$6

-  access_key=$8
-  secret=$9
+  access_key=$7
+  secret=$8

   # create zonegroup, zone
-  x $(rgw_admin $zgid $cid) realm pull --url=http://localhost:$master_zg_zone1_port --access-key=${access_key} --secret=${secret}
-  x $(rgw_admin $zgid $cid) realm default --rgw-realm=$realm
-  x $(rgw_admin $zgid $cid) zonegroup create --rgw-realm=$realm --rgw-zonegroup=$zg --endpoints=$endpoints --default
-  x $(rgw_admin $zgid $cid) zonegroup default --rgw-zonegroup=$zg
+  x $(rgw_admin $cid) realm pull --url=http://localhost:$master_zg_zone1_port --access-key=${access_key} --secret=${secret}
+  x $(rgw_admin $cid) realm default --rgw-realm=$realm
+  x $(rgw_admin $cid) zonegroup create --rgw-realm=$realm --rgw-zonegroup=$zg --endpoints=$endpoints --default
+  x $(rgw_admin $cid) zonegroup default --rgw-zonegroup=$zg

-  x $(rgw_admin $zgid $cid) zone create --rgw-zonegroup=$zg --rgw-zone=$zone --access-key=${access_key} --secret=${secret} --endpoints=$endpoints
-  x $(rgw_admin $zgid $cid) zone default --rgw-zone=$zone
-  x $(rgw_admin $zgid $cid) zonegroup add --rgw-zonegroup=$zg --rgw-zone=$zone
+  x $(rgw_admin $cid) zone create --rgw-zonegroup=$zg --rgw-zone=$zone --access-key=${access_key} --secret=${secret} --endpoints=$endpoints
+  x $(rgw_admin $cid) zone default --rgw-zone=$zone
+  x $(rgw_admin $cid) zonegroup add --rgw-zonegroup=$zg --rgw-zone=$zone

-  x $(rgw_admin $zgid $cid) user create --uid=zone.user --display-name="Zone User" --access-key=${access_key} --secret=${secret} --system
-  x $(rgw_admin $zgid $cid) period update --commit --url=localhost:$master_zg_zone1_port --access-key=${access_key} --secret=${secret}
+  x $(rgw_admin $cid) user create --uid=zone.user --display-name="Zone User" --access-key=${access_key} --secret=${secret} --system
+  x $(rgw_admin $cid) period update --commit --url=localhost:$master_zg_zone1_port --access-key=${access_key} --secret=${secret}

 }

 function call_rgw_admin {
-  zgid=$1
-  cid=$2
-  shift 2
-  x $(rgw_admin $zgid $cid) "$@"
+  cid=$1
+  shift 1
+  x $(rgw_admin $cid) "$@"
 }
@@ -5,10 +5,10 @@
 set -e

 function get_metadata_sync_status {
-  id=$1
+  cid=$1
   realm=$2

-  meta_sync_status_json=`$(rgw_admin $zgid $cid) --rgw-realm=$realm metadata sync status`
+  meta_sync_status_json=`$(rgw_admin $cid) --rgw-realm=$realm metadata sync status`

   global_sync_status=$(json_extract sync_status.info.status $meta_sync_status_json)
   num_shards=$(json_extract sync_status.info.num_shards $meta_sync_status_json)

@@ -26,27 +26,25 @@ function get_metadata_sync_status {
 }

 function get_metadata_log_status {
-  zgid=$1
-  master_id=$1
+  cid=$1
   realm=$2

-  master_mdlog_status_json=`$(rgw_admin $zgid $master_id) --rgw_realm=$realm mdlog status`
+  master_mdlog_status_json=`$(rgw_admin $cid) --rgw-realm=$realm mdlog status`
   master_meta_status=$(json_extract "" $master_mdlog_status_json)

   eval master_status=$(project_python_array_field marker $master_meta_status)
 }

 function wait_for_meta_sync {
-  zgid=$1
-  master_id=$2
-  cid=$3
-  realm=$4
+  master_id=$1
+  cid=$2
+  realm=$3

-  get_metadata_log_status $zgid $master_id $realm
+  get_metadata_log_status $master_id $realm
   echo "master_status=${master_status[*]}"

   while true; do
-    get_metadata_sync_status $zgid $cid $realm
+    get_metadata_sync_status $cid $realm

     echo "secondary_status=${secondary_status[*]}"
@@ -14,23 +14,17 @@ set -e
 realm_name=earth
 zg=zg1

-i=1
-while [ $i -le $num_clusters ]; do
-  eval zone$i=${zg}-$i
-  eval zone${i}_port=$((8000+$i))
-  i=$((i+1))
-done

 system_access_key="1234567890"
 system_secret="pencil"

 # bring up first cluster
-x $(start_ceph_cluster 1 1) -n
+x $(start_ceph_cluster c1) -n

 # create realm, zonegroup, zone, start rgw
-init_first_zone 1 $realm_name $zg $zone1 $zone1_port $system_access_key $system_secret
+init_first_zone c1 $realm_name $zg ${zg}-1 8001 $system_access_key $system_secret
+x $(rgw c1 8001)

-output=`$(rgw_admin 1 1) realm get`
+output=`$(rgw_admin c1) realm get`

 echo realm_status=$output

@@ -38,18 +32,19 @@ echo realm_status=$output

 i=2
 while [ $i -le $num_clusters ]; do
-  x $(start_ceph_cluster 1 $i) -n
+  x $(start_ceph_cluster c$i) -n

   # create new zone, start rgw
   zone_port=eval echo '$'zone${i}_port
-  init_zone_in_existing_zg $i $realm_name $zg $zone1 $zone1_port $zone_port $system_access_key $system_secret
+  init_zone_in_existing_zg c$i $realm_name $zg ${zg}-${i} 8001 $((8000+$i)) $zone_port $system_access_key $system_secret
+  x $(rgw c$i $((8000+$i)))

   i=$((i+1))
 done

 i=2
 while [ $i -le $num_clusters ]; do
-  wait_for_meta_sync 1 $i $realm_name
+  wait_for_meta_sync c1 c$i $realm_name

   i=$((i+1))
 done
File diff suppressed because it is too large.