Merge pull request #14688 from cbodley/wip-rgw-multi-suite

qa/rgw: add multisite suite to configure and run multisite tests

Reviewed-by: Orit Wasserman <owasserm@redhat.com>
commit cff53b246f
qa/suites/rgw/multisite/% (new file, empty)
qa/suites/rgw/multisite/clusters.yaml (new file, 3 lines)
@@ -0,0 +1,3 @@
roles:
- [c1.mon.a, c1.mgr.x, c1.osd.0, c1.osd.1, c1.osd.2, c1.client.0, c1.client.1]
- [c2.mon.a, c2.mgr.x, c2.osd.0, c2.osd.1, c2.osd.2, c2.client.0, c2.client.1]
qa/suites/rgw/multisite/frontend/civetweb.yaml (new file, 3 lines)
@@ -0,0 +1,3 @@
overrides:
  rgw:
    frontend: civetweb
qa/suites/rgw/multisite/overrides.yaml (new file, 8 lines)
@@ -0,0 +1,8 @@
overrides:
  ceph:
    wait-for-scrub: false
    conf:
      client:
        debug rgw: 20
  rgw:
    compression type: random
qa/suites/rgw/multisite/realms/three-zone.yaml (new file, 20 lines)
@@ -0,0 +1,20 @@
overrides:
  rgw-multisite:
    realm:
      name: test-realm
      is default: true
    zonegroups:
      - name: test-zonegroup
        is_master: true
        is_default: true
        endpoints: [c1.client.0]
        zones:
          - name: test-zone1
            is_master: true
            is_default: true
            endpoints: [c1.client.0]
          - name: test-zone2
            is_default: true
            endpoints: [c2.client.0]
          - name: test-zone3
            endpoints: [c1.client.1]
qa/suites/rgw/multisite/realms/two-zonegroup.yaml (new file, 27 lines)
@@ -0,0 +1,27 @@
overrides:
  rgw-multisite:
    realm:
      name: test-realm
      is default: true
    zonegroups:
      - name: a
        is_master: true
        is_default: true
        endpoints: [c1.client.0]
        zones:
          - name: a1
            is_master: true
            is_default: true
            endpoints: [c1.client.0]
          - name: a2
            endpoints: [c1.client.1]
      - name: b
        is_default: true
        endpoints: [c2.client.0]
        zones:
          - name: b1
            is_master: true
            is_default: true
            endpoints: [c2.client.0]
          - name: b2
            endpoints: [c2.client.1]
qa/suites/rgw/multisite/tasks/test_multi.yaml (new file, 17 lines)
@@ -0,0 +1,17 @@
tasks:
- install:
- ceph: {cluster: c1}
- ceph: {cluster: c2}
- rgw:
    c1.client.0:
      valgrind: [--tool=memcheck]
    c1.client.1:
      valgrind: [--tool=memcheck]
    c2.client.0:
      valgrind: [--tool=memcheck]
    c2.client.1:
      valgrind: [--tool=memcheck]
- rgw-multisite:
- rgw-multisite-tests:
    config:
      reconfigure_delay: 60
qa/suites/rgw/multisite/valgrind.yaml (new file, 13 lines)
@@ -0,0 +1,13 @@
os_type: centos # xenial valgrind buggy, see http://tracker.ceph.com/issues/18126
overrides:
  install:
    ceph:
      flavor: notcmalloc
  ceph:
    conf:
      global:
        osd heartbeat grace: 40
    valgrind:
      mon: [--tool=memcheck, --leak-check=full, --show-reachable=yes]
      osd: [--tool=memcheck]
      mds: [--tool=memcheck]
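The '%' file at the top of qa/suites/rgw/multisite/ makes teuthology convolve these fragments: each job combines the top-level files with one entry from each subdirectory (frontend, realms, tasks). As a rough hand-abbreviated sketch only (not a generated job file), a job built from clusters.yaml, realms/three-zone.yaml and tasks/test_multi.yaml merges into approximately:

roles:
- [c1.mon.a, c1.mgr.x, c1.osd.0, c1.osd.1, c1.osd.2, c1.client.0, c1.client.1]
- [c2.mon.a, c2.mgr.x, c2.osd.0, c2.osd.1, c2.osd.2, c2.client.0, c2.client.1]
overrides:
  rgw:
    frontend: civetweb          # from frontend/civetweb.yaml
    compression type: random    # from overrides.yaml
  rgw-multisite:                # from realms/three-zone.yaml (remaining zones trimmed here)
    realm:
      name: test-realm
      is default: true
    zonegroups:
    - name: test-zonegroup
      is_master: true
      is_default: true
      endpoints: [c1.client.0]
tasks:                          # from tasks/test_multi.yaml (valgrind args trimmed here)
- install:
- ceph: {cluster: c1}
- ceph: {cluster: c2}
- rgw:
    c1.client.0:
    c2.client.0:
- rgw-multisite:
- rgw-multisite-tests:
    config:
      reconfigure_delay: 60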
@@ -3,8 +3,8 @@ overrides:
    conf:
      client:
        debug rgw: 20
        rgw compression type: random
        rgw crypt s3 kms encryption keys: testkey-1=YmluCmJvb3N0CmJvb3N0LWJ1aWxkCmNlcGguY29uZgo= testkey-2=aWIKTWFrZWZpbGUKbWFuCm91dApzcmMKVGVzdGluZwo=
        rgw crypt require ssl: false
  rgw:
    frontend: civetweb
    compression type: random
@@ -9,16 +9,13 @@ import os
import errno
import util.rgw as rgw_utils

from requests.packages.urllib3 import PoolManager
from requests.packages.urllib3.util import Retry

from cStringIO import StringIO

from teuthology.orchestra import run
from teuthology import misc as teuthology
from teuthology import contextutil
from teuthology.orchestra.run import CommandFailedError
from util.rgw import rgwadmin, get_config_master_client, extract_zone_info, extract_region_info
from util.rgw import rgwadmin, get_config_master_client, extract_zone_info, extract_region_info, wait_for_radosgw
from util.rados import (rados, create_ec_pool,
                        create_replicated_pool,
                        create_cache_pool)
@@ -221,6 +218,8 @@ exec radosgw -f -n {client_with_id} --cluster {cluster_name} -k /etc/ceph/{clien
    finally:
        log.info('Removing apache config...')
        for client in clients_to_create_as:
            cluster_name, daemon_type, client_id = teuthology.split_role(client)
            client_with_cluster = '.'.join((cluster_name, daemon_type, client_id))
            ctx.cluster.only(client).run(
                args=[
                    'rm',
@@ -263,8 +262,6 @@ def start_rgw(ctx, config, on_client = None, except_client = None):
        if client_config is None:
            client_config = {}
        log.info("rgw %s config is %s", client, client_config)
        id_ = client.split('.', 1)[1]
        log.info('client {client} is id {id}'.format(client=client, id=id_))
        cmd_prefix = [
            'sudo',
            'adjust-ulimits',
@@ -327,7 +324,7 @@ def start_rgw(ctx, config, on_client = None, except_client = None):
        if client_config.get('valgrind'):
            cmd_prefix = teuthology.get_valgrind_args(
                testdir,
                client,
                client_with_cluster,
                cmd_prefix,
                client_config.get('valgrind')
                )
@@ -336,7 +333,7 @@ def start_rgw(ctx, config, on_client = None, except_client = None):
        run_cmd.extend(rgw_cmd)

        ctx.daemons.add_daemon(
            remote, 'rgw', client,
            remote, 'rgw', client_with_id,
            cluster=cluster_name,
            args=run_cmd,
            logger=log.getChild(client),
@@ -345,27 +342,28 @@ def start_rgw(ctx, config, on_client = None, except_client = None):
            )

    # XXX: add_daemon() doesn't let us wait until radosgw finishes startup
    # use a connection pool with retry/backoff to poll each gateway until it starts listening
    http = PoolManager(retries=Retry(connect=8, backoff_factor=1))
    for client in clients_to_run:
        if client == except_client:
            continue
        host, port = ctx.rgw.role_endpoints[client]
        endpoint = 'http://{host}:{port}/'.format(host=host, port=port)
        log.info('Polling {client} until it starts accepting connections on {endpoint}'.format(client=client, endpoint=endpoint))
        http.request('GET', endpoint)
        wait_for_radosgw(endpoint)

    try:
        yield
    finally:
        teuthology.stop_daemons_of_type(ctx, 'rgw')
        for client in config.iterkeys():
            cluster_name, daemon_type, client_id = teuthology.split_role(client)
            client_with_id = daemon_type + '.' + client_id
            client_with_cluster = cluster_name + '.' + client_with_id
            ctx.daemons.get_daemon('rgw', client_with_id, cluster_name).stop()
            ctx.cluster.only(client).run(
                args=[
                    'rm',
                    '-f',
                    '{tdir}/rgw.opslog.{client_with_cluster}.sock'.format(tdir=testdir,
                                                                          client_with_cluster=client_with_cluster),
                    '{tdir}/rgw.opslog.{client}.sock'.format(tdir=testdir,
                                                             client=client_with_cluster),
                ],
            )

@@ -727,26 +725,21 @@ def configure_multisite_regions_and_zones(ctx, config, regions, role_endpoints,
    yield

def configure_compression_in_default_zone(ctx, config):
    ceph_config = ctx.ceph['ceph'].conf.get('global', {})
    ceph_config.update(ctx.ceph['ceph'].conf.get('client', {}))
    for client, c_config in config.iteritems():
        ceph_config.update(ctx.ceph['ceph'].conf.get(client, {}))
        key = 'rgw compression type'
        if not key in ceph_config:
            log.debug('No compression setting to enable')
            break
        compression = ceph_config[key]
        log.debug('Configuring compression type = %s', compression)
    compression = ctx.rgw.compression_type
    if not compression:
        return

    log.debug('Configuring compression type = %s', compression)
    for client, c_config in config.iteritems():
        # XXX: the 'default' zone and zonegroup aren't created until we run RGWRados::init_complete().
        # issue a 'radosgw-admin user list' command to trigger this
        rgwadmin(ctx, client, cmd=['user', 'list'], check_status=True)

        rgwadmin(ctx, client,
                 cmd=['zone', 'placement', 'modify', '--rgw-zone', 'default',
                      '--placement-id', 'default-placement', '--compression', compression],
                      '--placement-id', 'default-placement',
                      '--compression', compression],
                 check_status=True)
        break # only the first client

@contextlib.contextmanager
def configure_regions_and_zones(ctx, config, regions, role_endpoints, realm):
@@ -1121,53 +1114,26 @@ def task(ctx, config):
    overrides = ctx.config.get('overrides', {})
    teuthology.deep_merge(config, overrides.get('rgw', {}))

    regions = {}
    if 'regions' in config:
        # separate region info so only clients are keys in config
        regions = config['regions']
        del config['regions']
    regions = config.pop('regions', {})
    realm = config.pop('realm', None)

    role_endpoints = assign_ports(ctx, config)
    ctx.rgw = argparse.Namespace()
    ctx.rgw.role_endpoints = role_endpoints
    # stash the region info for later, since it was deleted from the config
    # structure
    ctx.rgw.regions = regions

    realm = None
    if 'realm' in config:
        # separate region info so only clients are keys in config
        realm = config['realm']
        del config['realm']
    ctx.rgw.realm = realm

    ctx.rgw.ec_data_pool = False
    if 'ec-data-pool' in config:
        ctx.rgw.ec_data_pool = bool(config['ec-data-pool'])
        del config['ec-data-pool']
    ctx.rgw.erasure_code_profile = {}
    if 'erasure_code_profile' in config:
        ctx.rgw.erasure_code_profile = config['erasure_code_profile']
        del config['erasure_code_profile']
    ctx.rgw.default_idle_timeout = 30
    if 'default_idle_timeout' in config:
        ctx.rgw.default_idle_timeout = int(config['default_idle_timeout'])
        del config['default_idle_timeout']
    ctx.rgw.cache_pools = False
    if 'cache-pools' in config:
        ctx.rgw.cache_pools = bool(config['cache-pools'])
        del config['cache-pools']
    ctx.rgw.ec_data_pool = bool(config.pop('ec-data-pool', False))
    ctx.rgw.erasure_code_profile = config.pop('erasure_code_profile', {})
    ctx.rgw.default_idle_timeout = int(config.pop('default_idle_timeout', 30))
    ctx.rgw.cache_pools = bool(config.pop('cache-pools', False))
    ctx.rgw.frontend = config.pop('frontend', 'civetweb')

    ctx.rgw.frontend = 'civetweb'
    if 'frontend' in config:
        ctx.rgw.frontend = config['frontend']
        del config['frontend']

    ctx.rgw.use_fastcgi = True
    if "use_fcgi" in config:
        ctx.rgw.use_fastcgi = False
    ctx.rgw.use_fastcgi = not config.pop('use_fcgi', True)
    if not ctx.rgw.use_fastcgi:
        log.info("Using mod_proxy_fcgi instead of mod_fastcgi...")
        del config['use_fcgi']

    ctx.rgw.compression_type = config.pop('compression type', None)

    subtasks = [
        lambda: create_nonregion_pools(
qa/tasks/rgw_multi (new symbolic link)
@@ -0,0 +1 @@
../../src/test/rgw/rgw_multi
qa/tasks/rgw_multisite.py (new file, 417 lines)
@@ -0,0 +1,417 @@
"""
rgw multisite configuration routines
"""
import argparse
import contextlib
import logging
import random
import string
from copy import deepcopy
from util.rgw import rgwadmin, wait_for_radosgw
from util.rados import create_ec_pool, create_replicated_pool
from rgw_multi import multisite

from teuthology.orchestra import run
from teuthology import misc
from teuthology.exceptions import ConfigError
from teuthology.task import Task

log = logging.getLogger(__name__)

class RGWMultisite(Task):
    """
    Performs rgw multisite configuration to match the given realm definition.

        - rgw-multisite:
            realm:
              name: test-realm
              is_default: true

    List one or more zonegroup definitions. These are provided as json
    input to `radosgw-admin zonegroup set`, with the exception of these keys:

    * 'is_master' is passed on the command line as --master
    * 'is_default' is passed on the command line as --default
    * 'endpoints' given as client names are replaced with actual endpoints

            zonegroups:
              - name: test-zonegroup
                api_name: test-api
                is_master: true
                is_default: true
                endpoints: [c1.client.0]

    List each of the zones to be created in this zonegroup.

                zones:
                  - name: test-zone1
                    is_master: true
                    is_default: true
                    endpoints: [c1.client.0]
                  - name: test-zone2
                    is_default: true
                    endpoints: [c2.client.0]

    A complete example:

        tasks:
        - install:
        - ceph: {cluster: c1}
        - ceph: {cluster: c2}
        - rgw:
            c1.client.0:
            c2.client.0:
        - rgw-multisite:
            realm:
              name: test-realm
              is_default: true
            zonegroups:
              - name: test-zonegroup
                is_master: true
                is_default: true
                zones:
                  - name: test-zone1
                    is_master: true
                    is_default: true
                    endpoints: [c1.client.0]
                  - name: test-zone2
                    is_default: true
                    endpoints: [c2.client.0]

    """
    def __init__(self, ctx, config):
        super(RGWMultisite, self).__init__(ctx, config)

    def setup(self):
        super(RGWMultisite, self).setup()

        overrides = self.ctx.config.get('overrides', {})
        misc.deep_merge(self.config, overrides.get('rgw-multisite', {}))

        if not self.ctx.rgw:
            raise ConfigError('rgw-multisite must run after the rgw task')
        role_endpoints = self.ctx.rgw.role_endpoints

        # construct Clusters and Gateways for each client in the rgw task
        clusters, gateways = extract_clusters_and_gateways(self.ctx,
                                                           role_endpoints)

        # get the master zone and zonegroup configuration
        mz, mzg = extract_master_zone_zonegroup(self.config['zonegroups'])
        cluster1 = cluster_for_zone(clusters, mz)

        # create the realm and period on the master zone's cluster
        log.info('creating realm..')
        realm = create_realm(cluster1, self.config['realm'])
        period = realm.current_period

        creds = gen_credentials()

        # create the master zonegroup and its master zone
        log.info('creating master zonegroup..')
        master_zonegroup = create_zonegroup(cluster1, gateways, period,
                                            deepcopy(mzg))
        period.master_zonegroup = master_zonegroup

        log.info('creating master zone..')
        master_zone = create_zone(self.ctx, cluster1, gateways, creds,
                                  master_zonegroup, deepcopy(mz))
        master_zonegroup.master_zone = master_zone

        period.update(master_zone, commit=True)
        restart_zone_gateways(master_zone) # restart with --rgw-zone

        # create the admin user on the master zone
        log.info('creating admin user..')
        user_args = ['--display-name', 'Realm Admin', '--system']
        user_args += creds.credential_args()
        admin_user = multisite.User('realm-admin')
        admin_user.create(master_zone, user_args)

        # process 'zonegroups'
        for zg_config in self.config['zonegroups']:
            zones_config = zg_config.pop('zones')

            zonegroup = None
            for zone_config in zones_config:
                # get the cluster for this zone
                cluster = cluster_for_zone(clusters, zone_config)

                if cluster != cluster1: # already created on master cluster
                    log.info('pulling realm configuration to %s', cluster.name)
                    realm.pull(cluster, master_zone.gateways[0], creds)

                # use the first zone's cluster to create the zonegroup
                if not zonegroup:
                    if zg_config['name'] == master_zonegroup.name:
                        zonegroup = master_zonegroup
                    else:
                        log.info('creating zonegroup..')
                        zonegroup = create_zonegroup(cluster, gateways,
                                                     period, zg_config)

                if zone_config['name'] == master_zone.name:
                    # master zone was already created
                    zone = master_zone
                else:
                    # create the zone and commit the period
                    log.info('creating zone..')
                    zone = create_zone(self.ctx, cluster, gateways, creds,
                                       zonegroup, zone_config)
                    period.update(zone, commit=True)

                    restart_zone_gateways(zone) # restart with --rgw-zone

        # attach configuration to the ctx for other tasks
        self.ctx.rgw_multisite = argparse.Namespace()
        self.ctx.rgw_multisite.clusters = clusters
        self.ctx.rgw_multisite.gateways = gateways
        self.ctx.rgw_multisite.realm = realm
        self.ctx.rgw_multisite.admin_user = admin_user

        log.info('rgw multisite configuration completed')

    def end(self):
        del self.ctx.rgw_multisite

class Cluster(multisite.Cluster):
    """ Issues 'radosgw-admin' commands with the rgwadmin() helper """
    def __init__(self, ctx, name, client):
        super(Cluster, self).__init__()
        self.ctx = ctx
        self.name = name
        self.client = client

    def admin(self, args = None, **kwargs):
        """ radosgw-admin command """
        args = args or []
        args += ['--cluster', self.name]
        args += ['--debug-rgw', '0']
        if kwargs.pop('read_only', False):
            args += ['--rgw-cache-enabled', 'false']
        kwargs['decode'] = False
        check_retcode = kwargs.pop('check_retcode', True)
        r, s = rgwadmin(self.ctx, self.client, args, **kwargs)
        if check_retcode:
            assert r == 0
        return s, r

class Gateway(multisite.Gateway):
    """ Controls a radosgw instance using its daemon """
    def __init__(self, role, remote, daemon, *args, **kwargs):
        super(Gateway, self).__init__(*args, **kwargs)
        self.role = role
        self.remote = remote
        self.daemon = daemon

    def set_zone(self, zone):
        """ set the zone and add its args to the daemon's command line """
        assert self.zone is None, 'zone can only be set once'
        self.zone = zone
        # daemon.restart_with_args() would be perfect for this, except that
        # radosgw args likely include a pipe and redirect. zone arguments at
        # the end won't actually apply to radosgw
        args = self.daemon.command_kwargs.get('args', [])
        try:
            # insert zone args before the first |
            pipe = args.index(run.Raw('|'))
            args = args[0:pipe] + zone.zone_args() + args[pipe:]
        except ValueError, e:
            args += zone.zone_args()
        self.daemon.command_kwargs['args'] = args

    def start(self, args = None):
        """ (re)start the daemon """
        self.daemon.restart()
        # wait until startup completes
        wait_for_radosgw(self.endpoint())

    def stop(self):
        """ stop the daemon """
        self.daemon.stop()

def extract_clusters_and_gateways(ctx, role_endpoints):
    """ create cluster and gateway instances for all of the radosgw roles """
    clusters = {}
    gateways = {}
    for role, (host, port) in role_endpoints.iteritems():
        cluster_name, daemon_type, client_id = misc.split_role(role)
        # find or create the cluster by name
        cluster = clusters.get(cluster_name)
        if not cluster:
            clusters[cluster_name] = cluster = Cluster(ctx, cluster_name, role)
        # create a gateway for this daemon
        client_with_id = daemon_type + '.' + client_id # match format from rgw.py
        daemon = ctx.daemons.get_daemon('rgw', client_with_id, cluster_name)
        if not daemon:
            raise ConfigError('no daemon for role=%s cluster=%s type=rgw id=%s' % \
                              (role, cluster_name, client_id))
        (remote,) = ctx.cluster.only(role).remotes.keys()
        gateways[role] = Gateway(role, remote, daemon, host, port, cluster)
    return clusters, gateways

def create_realm(cluster, config):
    """ create a realm from configuration and initialize its first period """
    realm = multisite.Realm(config['name'])
    args = []
    if config.get('is_default', False):
        args += ['--default']
    realm.create(cluster, args)
    realm.current_period = multisite.Period(realm)
    return realm

def extract_user_credentials(config):
    """ extract keys from configuration """
    return multisite.Credentials(config['access_key'], config['secret_key'])

def extract_master_zone(zonegroup_config):
    """ find and return the master zone definition """
    master = None
    for zone in zonegroup_config['zones']:
        if not zone.get('is_master', False):
            continue
        if master:
            raise ConfigError('zones %s and %s cannot both set \'is_master\'' % \
                              (master['name'], zone['name']))
        master = zone
        # continue the loop so we can detect duplicates
    if not master:
        raise ConfigError('one zone must set \'is_master\' in zonegroup %s' % \
                          zonegroup_config['name'])
    return master

def extract_master_zone_zonegroup(zonegroups_config):
    """ find and return the master zone and zonegroup definitions """
    master_zone, master_zonegroup = (None, None)
    for zonegroup in zonegroups_config:
        # verify that all zonegroups have a master zone set, even if they
        # aren't in the master zonegroup
        zone = extract_master_zone(zonegroup)
        if not zonegroup.get('is_master', False):
            continue
        if master_zonegroup:
            raise ConfigError('zonegroups %s and %s cannot both set \'is_master\'' % \
                              (master_zonegroup['name'], zonegroup['name']))
        master_zonegroup = zonegroup
        master_zone = zone
        # continue the loop so we can detect duplicates
    if not master_zonegroup:
        raise ConfigError('one zonegroup must set \'is_master\'')
    return master_zone, master_zonegroup

def extract_zone_cluster_name(zone_config):
    """ return the cluster (must be common to all zone endpoints) """
    cluster_name = None
    endpoints = zone_config.get('endpoints')
    if not endpoints:
        raise ConfigError('zone %s missing \'endpoints\' list' % \
                          zone_config['name'])
    for role in endpoints:
        name, _, _ = misc.split_role(role)
        if not cluster_name:
            cluster_name = name
        elif cluster_name != name:
            raise ConfigError('all zone %s endpoints must be in the same cluster' % \
                              zone_config['name'])
    return cluster_name

def cluster_for_zone(clusters, zone_config):
    """ return the cluster entry for the given zone """
    name = extract_zone_cluster_name(zone_config)
    try:
        return clusters[name]
    except KeyError:
        raise ConfigError('no cluster %s found' % name)

def gen_access_key():
    return ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(16))

def gen_secret():
    return ''.join(random.choice(string.ascii_uppercase + string.ascii_lowercase + string.digits) for _ in range(32))

def gen_credentials():
    return multisite.Credentials(gen_access_key(), gen_secret())

def extract_gateway_endpoints(gateways, endpoints_config):
    """ return a list of gateway endpoints associated with the given roles """
    endpoints = []
    for role in endpoints_config:
        try:
            # replace role names with their gateway's endpoint
            endpoints.append(gateways[role].endpoint())
        except KeyError:
            raise ConfigError('no radosgw endpoint found for role %s' % role)
    return endpoints

def is_default_arg(config):
    return ['--default'] if config.pop('is_default', False) else []

def is_master_arg(config):
    return ['--master'] if config.pop('is_master', False) else []

def create_zonegroup(cluster, gateways, period, config):
    """ pass the zonegroup configuration to `zonegroup set` """
    config.pop('zones', None) # remove 'zones' from input to `zonegroup set`
    endpoints = config.get('endpoints')
    if endpoints:
        # replace client names with their gateway endpoints
        config['endpoints'] = extract_gateway_endpoints(gateways, endpoints)
    zonegroup = multisite.ZoneGroup(config['name'], period)
    # `zonegroup set` needs --default on command line, and 'is_master' in json
    args = is_default_arg(config)
    zonegroup.set(cluster, config, args)
    period.zonegroups.append(zonegroup)
    return zonegroup

def create_zone(ctx, cluster, gateways, creds, zonegroup, config):
    """ create a zone with the given configuration """
    zone = multisite.Zone(config['name'], zonegroup, cluster)

    # collect Gateways for the zone's endpoints
    endpoints = config.get('endpoints')
    if not endpoints:
        raise ConfigError('no \'endpoints\' for zone %s' % config['name'])
    zone.gateways = [gateways[role] for role in endpoints]
    for gateway in zone.gateways:
        gateway.set_zone(zone)

    # format the gateway endpoints
    endpoints = [g.endpoint() for g in zone.gateways]

    args = is_default_arg(config)
    args += is_master_arg(config)
    args += creds.credential_args()
    if len(endpoints):
        args += ['--endpoints', ','.join(endpoints)]
    zone.create(cluster, args)
    zonegroup.zones.append(zone)

    create_zone_pools(ctx, zone)
    if ctx.rgw.compression_type:
        configure_zone_compression(zone, ctx.rgw.compression_type)
    return zone

def create_zone_pools(ctx, zone):
    """ Create the data_pool for each placement type """
    gateway = zone.gateways[0]
    cluster = zone.cluster
    for pool_config in zone.data.get('placement_pools', []):
        pool_name = pool_config['val']['data_pool']
        if ctx.rgw.ec_data_pool:
            create_ec_pool(gateway.remote, pool_name, zone.name, 64,
                           ctx.rgw.erasure_code_profile, cluster.name)
        else:
            create_replicated_pool(gateway.remote, pool_name, 64, cluster.name)

def configure_zone_compression(zone, compression):
    """ Set compression type in the zone's default-placement """
    zone.json_command(zone.cluster, 'placement', ['modify',
                          '--placement-id', 'default-placement',
                          '--compression', compression
                      ])

def restart_zone_gateways(zone):
    zone.stop()
    zone.start()

task = RGWMultisite
qa/tasks/rgw_multisite_tests.py (new file, 91 lines)
@@ -0,0 +1,91 @@
"""
rgw multisite testing
"""
import logging
import sys
import nose.core
import nose.config

from teuthology.exceptions import ConfigError
from teuthology.task import Task
from teuthology import misc

from rgw_multi import multisite, tests

log = logging.getLogger(__name__)

class RGWMultisiteTests(Task):
    """
    Runs the rgw_multi tests against a multisite configuration created by the
    rgw-multisite task. Tests are run with nose, using any additional 'args'
    provided. Overrides for tests.Config can be set in 'config'.

    - rgw-multisite-tests:
        args:
        - tasks.rgw_multi.tests:test_object_sync
        config:
          reconfigure_delay: 60

    """
    def __init__(self, ctx, config):
        super(RGWMultisiteTests, self).__init__(ctx, config)

    def setup(self):
        super(RGWMultisiteTests, self).setup()

        overrides = self.ctx.config.get('overrides', {})
        misc.deep_merge(self.config, overrides.get('rgw-multisite-tests', {}))

        if not self.ctx.rgw_multisite:
            raise ConfigError('rgw-multisite-tests must run after the rgw-multisite task')
        realm = self.ctx.rgw_multisite.realm
        master_zone = realm.meta_master_zone()

        # create the test user
        log.info('creating test user..')
        user = multisite.User('rgw-multisite-test-user')
        user.create(master_zone, ['--display-name', 'Multisite Test User',
                                  '--gen-access-key', '--gen-secret'])

        config = self.config.get('config', {})
        tests.init_multi(realm, user, tests.Config(**config))
        tests.realm_meta_checkpoint(realm)

    def begin(self):
        # extra arguments for nose can be passed as a string or list
        extra_args = self.config.get('args', [])
        if not isinstance(extra_args, list):
            extra_args = [extra_args]
        argv = [__name__] + extra_args

        log.info("running rgw multisite tests on '%s' with args=%r",
                 tests.__name__, extra_args)

        # run nose tests in the rgw_multi.tests module
        conf = nose.config.Config(stream=get_log_stream(), verbosity=2)
        result = nose.run(defaultTest=tests.__name__, argv=argv, config=conf)
        if not result:
            raise RuntimeError('rgw multisite test failures')

def get_log_stream():
    """ return a log stream for nose output """
    # XXX: this is a workaround for IOErrors when nose writes to stderr,
    # copied from vstart_runner.py
    class LogStream(object):
        def __init__(self):
            self.buffer = ""

        def write(self, data):
            self.buffer += data
            if "\n" in self.buffer:
                lines = self.buffer.split("\n")
                for line in lines[:-1]:
                    log.info(line)
                self.buffer = lines[-1]

        def flush(self):
            pass

    return LogStream()

task = RGWMultisiteTests
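Since RGWMultisiteTests.setup() deep-merges ctx.config['overrides']['rgw-multisite-tests'] into the task config, a suite fragment can narrow the nose test selection or tune the checkpoint timing without editing tasks/test_multi.yaml. A hypothetical override fragment, reusing the test name from the docstring above, might look like:

overrides:
  rgw-multisite-tests:
    args:
    - tasks.rgw_multi.tests:test_object_sync
    config:
      checkpoint_delay: 10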
@@ -2,6 +2,8 @@ from cStringIO import StringIO
import logging
import json
import requests

from requests.packages.urllib3 import PoolManager
from requests.packages.urllib3.util import Retry
from urlparse import urlparse

@@ -16,7 +18,7 @@ def multi_region_enabled(ctx):
    return 'radosgw_agent' in ctx

def rgwadmin(ctx, client, cmd, stdin=StringIO(), check_status=False,
             format='json'):
             format='json', decode=True, log_level=logging.DEBUG):
    log.info('rgwadmin: {client} : {cmd}'.format(client=client,cmd=cmd))
    testdir = teuthology.get_testdir(ctx)
    cluster_name, daemon_type, client_id = teuthology.split_role(client)
@@ -32,7 +34,7 @@ def rgwadmin(ctx, client, cmd, stdin=StringIO(), check_status=False,
        '--cluster', cluster_name,
        ]
    pre.extend(cmd)
    log.info('rgwadmin: cmd=%s' % pre)
    log.log(log_level, 'rgwadmin: cmd=%s' % pre)
    (remote,) = ctx.cluster.only(client).remotes.iterkeys()
    proc = remote.run(
        args=pre,
@@ -43,14 +45,16 @@ def rgwadmin(ctx, client, cmd, stdin=StringIO(), check_status=False,
        )
    r = proc.exitstatus
    out = proc.stdout.getvalue()
    if not decode:
        return (r, out)
    j = None
    if not r and out != '':
        try:
            j = json.loads(out)
            log.info(' json result: %s' % j)
            log.log(log_level, ' json result: %s' % j)
        except ValueError:
            j = out
            log.info(' raw result: %s' % j)
            log.log(log_level, ' raw result: %s' % j)
    return (r, j)

def get_user_summary(out, user):
@@ -302,3 +306,12 @@ def get_config_master_client(ctx, config, regions):

    return None

def wait_for_radosgw(url):
    """ poll the given url until it starts accepting connections

    add_daemon() doesn't wait until radosgw finishes startup, so this is used
    to avoid racing with later tasks that expect radosgw to be up and listening
    """
    # use a connection pool with retry/backoff to poll until it starts listening
    http = PoolManager(retries=Retry(connect=8, backoff_factor=1))
    http.request('GET', url)
@@ -1,4 +1,5 @@
from abc import ABCMeta, abstractmethod
from cStringIO import StringIO
import json

class Cluster:
@@ -65,14 +66,14 @@ class SystemObject:
    def json_command(self, cluster, cmd, args = None, **kwargs):
        """ run the given command, parse the output and return the resulting
        data and retcode """
        (s, r) = self.command(cluster, cmd, args or [], **kwargs)
        s, r = self.command(cluster, cmd, args or [], **kwargs)
        if r == 0:
            output = s.decode('utf-8')
            output = output[output.find('{'):] # trim extra output before json
            data = json.loads(output)
            self.load_from_json(data)
            self.data = data
        return (self.data, r)
        return self.data, r

    # mixins for supported commands
    class Create(object):
@@ -84,7 +85,7 @@ class SystemObject:
        def delete(self, cluster, args = None, **kwargs):
            """ delete the object """
            # not json_command() because delete has no output
            (_, r) = self.command(cluster, 'delete', args, **kwargs)
            _, r = self.command(cluster, 'delete', args, **kwargs)
            if r == 0:
                self.data = None
            return r
@@ -195,20 +196,20 @@ class ZoneGroup(SystemObject, SystemObject.CreateDelete, SystemObject.GetSet, Sy
    def add(self, cluster, zone, args = None, **kwargs):
        """ add an existing zone to the zonegroup """
        args = zone.zone_arg() + (args or [])
        (data, r) = self.json_command(cluster, 'add', args, **kwargs)
        data, r = self.json_command(cluster, 'add', args, **kwargs)
        if r == 0:
            zone.zonegroup = self
            self.zones.append(zone)
        return (data, r)
        return data, r

    def remove(self, cluster, zone, args = None, **kwargs):
        """ remove an existing zone from the zonegroup """
        args = zone.zone_arg() + (args or [])
        (data, r) = self.json_command(cluster, 'remove', args, **kwargs)
        data, r = self.json_command(cluster, 'remove', args, **kwargs)
        if r == 0:
            zone.zonegroup = None
            self.zones.remove(zone)
        return (data, r)
        return data, r

    def realm(self):
        return self.period.realm if self.period else None
@@ -19,16 +19,28 @@ from nose.plugins.skip import SkipTest

from .multisite import Zone

class Config:
    """ test configuration """
    def __init__(self, **kwargs):
        # by default, wait up to 5 minutes before giving up on a sync checkpoint
        self.checkpoint_retries = kwargs.get('checkpoint_retries', 60)
        self.checkpoint_delay = kwargs.get('checkpoint_delay', 5)
        # allow some time for realm reconfiguration after changing master zone
        self.reconfigure_delay = kwargs.get('reconfigure_delay', 5)

# rgw multisite tests, written against the interfaces provided in rgw_multi.
# these tests must be initialized and run by another module that provides
# implementations of these interfaces by calling init_multi()
realm = None
user = None
def init_multi(_realm, _user):
config = None
def init_multi(_realm, _user, _config=None):
    global realm
    realm = _realm
    global user
    user = _user
    global config
    config = _config or Config()

log = logging.getLogger(__name__)

@@ -64,15 +76,7 @@ def mdlog_list(zone, period = None):
def mdlog_autotrim(zone):
    zone.cluster.admin(['mdlog', 'autotrim'])

def meta_sync_status(zone):
    while True:
        cmd = ['metadata', 'sync', 'status'] + zone.zone_args()
        meta_sync_status_json, retcode = zone.cluster.admin(cmd, check_retcode=False, read_only=True)
        if retcode == 0:
            break
        assert(retcode == 2) # ENOENT
        time.sleep(5)

def parse_meta_sync_status(meta_sync_status_json):
    meta_sync_status_json = meta_sync_status_json.decode('utf-8')
    log.debug('current meta sync status=%s', meta_sync_status_json)
    sync_status = json.loads(meta_sync_status_json)
@@ -97,6 +101,17 @@ def meta_sync_status(zone):

    return period, realm_epoch, num_shards, markers

def meta_sync_status(zone):
    for _ in range(config.checkpoint_retries):
        cmd = ['metadata', 'sync', 'status'] + zone.zone_args()
        meta_sync_status_json, retcode = zone.cluster.admin(cmd, check_retcode=False, read_only=True)
        if retcode == 0:
            return parse_meta_sync_status(meta_sync_status_json)
        assert(retcode == 2) # ENOENT
        time.sleep(config.checkpoint_delay)

    assert False, 'failed to read metadata sync status for zone=%s' % zone.name

def meta_master_log_status(master_zone):
    cmd = ['mdlog', 'status'] + master_zone.zone_args()
    mdlog_status_json, retcode = master_zone.cluster.admin(cmd, read_only=True)
@@ -134,7 +149,7 @@ def zone_meta_checkpoint(zone, meta_master_zone = None, master_status = None):

    log.info('starting meta checkpoint for zone=%s', zone.name)

    while True:
    for _ in range(config.checkpoint_retries):
        period, realm_epoch, num_shards, sync_status = meta_sync_status(zone)
        if realm_epoch < current_realm_epoch:
            log.warning('zone %s is syncing realm epoch=%d, behind current realm epoch=%d',
@@ -143,11 +158,11 @@ def zone_meta_checkpoint(zone, meta_master_zone = None, master_status = None):
        log.debug('log_status=%s', master_status)
        log.debug('sync_status=%s', sync_status)
        if compare_meta_status(zone, master_status, sync_status):
            break

        time.sleep(5)

            log.info('finish meta checkpoint for zone=%s', zone.name)
            return

        time.sleep(config.checkpoint_delay)
    assert False, 'failed meta checkpoint for zone=%s' % zone.name

def zonegroup_meta_checkpoint(zonegroup, meta_master_zone = None, master_status = None):
    if not meta_master_zone:
@@ -169,19 +184,7 @@ def realm_meta_checkpoint(realm):
    for zonegroup in realm.current_period.zonegroups:
        zonegroup_meta_checkpoint(zonegroup, meta_master_zone, master_status)

def data_sync_status(target_zone, source_zone):
    if target_zone == source_zone:
        return None

    while True:
        cmd = ['data', 'sync', 'status'] + target_zone.zone_args()
        cmd += ['--source-zone', source_zone.name]
        data_sync_status_json, retcode = target_zone.cluster.admin(cmd, check_retcode=False, read_only=True)
        if retcode == 0:
            break

        assert(retcode == 2) # ENOENT

def parse_data_sync_status(data_sync_status_json):
    data_sync_status_json = data_sync_status_json.decode('utf-8')
    log.debug('current data sync status=%s', data_sync_status_json)
    sync_status = json.loads(data_sync_status_json)
@@ -199,6 +202,23 @@ def data_sync_status(target_zone, source_zone):

    return (num_shards, markers)

def data_sync_status(target_zone, source_zone):
    if target_zone == source_zone:
        return None

    for _ in range(config.checkpoint_retries):
        cmd = ['data', 'sync', 'status'] + target_zone.zone_args()
        cmd += ['--source-zone', source_zone.name]
        data_sync_status_json, retcode = target_zone.cluster.admin(cmd, check_retcode=False, read_only=True)
        if retcode == 0:
            return parse_data_sync_status(data_sync_status_json)

        assert(retcode == 2) # ENOENT
        time.sleep(config.checkpoint_delay)

    assert False, 'failed to read data sync status for target_zone=%s source_zone=%s' % \
            (target_zone.name, source_zone.name)

def bucket_sync_status(target_zone, source_zone, bucket_name):
    if target_zone == source_zone:
        return None
@@ -300,49 +320,54 @@ def zone_data_checkpoint(target_zone, source_zone):
    if target_zone == source_zone:
        return

    log_status = data_source_log_status(source_zone)
    log.info('starting data checkpoint for target_zone=%s source_zone=%s', target_zone.name, source_zone.name)

    while True:
        log_status = data_source_log_status(source_zone)
    for _ in range(config.checkpoint_retries):
        num_shards, sync_status = data_sync_status(target_zone, source_zone)

        log.debug('log_status=%s', log_status)
        log.debug('sync_status=%s', sync_status)

        if compare_data_status(target_zone, source_zone, log_status, sync_status):
            break
            log.info('finished data checkpoint for target_zone=%s source_zone=%s',
                     target_zone.name, source_zone.name)
            return
        time.sleep(config.checkpoint_delay)

        time.sleep(5)
    assert False, 'failed data checkpoint for target_zone=%s source_zone=%s' % \
        (target_zone.name, source_zone.name)

    log.info('finished data checkpoint for target_zone=%s source_zone=%s', target_zone.name, source_zone.name)

def zone_bucket_checkpoint(target_zone, source_zone, bucket_name):
    if target_zone == source_zone:
        return

    log_status = bucket_source_log_status(source_zone, bucket_name)
    log.info('starting bucket checkpoint for target_zone=%s source_zone=%s bucket=%s', target_zone.name, source_zone.name, bucket_name)

    while True:
        log_status = bucket_source_log_status(source_zone, bucket_name)
    for _ in range(config.checkpoint_retries):
        sync_status = bucket_sync_status(target_zone, source_zone, bucket_name)

        log.debug('log_status=%s', log_status)
        log.debug('sync_status=%s', sync_status)

        if compare_bucket_status(target_zone, source_zone, bucket_name, log_status, sync_status):
            break

        time.sleep(5)

            log.info('finished bucket checkpoint for target_zone=%s source_zone=%s bucket=%s', target_zone.name, source_zone.name, bucket_name)
            return

        time.sleep(config.checkpoint_delay)

    assert False, 'finished bucket checkpoint for target_zone=%s source_zone=%s bucket=%s' % \
        (target_zone.name, source_zone.name, bucket_name)

def set_master_zone(zone):
    zone.modify(zone.cluster, ['--master'])
    zonegroup = zone.zonegroup
    zonegroup.period.update(zone, commit=True)
    zonegroup.master_zone = zone
    # wait for reconfiguration, so that later metadata requests go to the new master
    time.sleep(5)
    log.info('Set master zone=%s, waiting %ds for reconfiguration..', zone.name, config.reconfigure_delay)
    time.sleep(config.reconfigure_delay)

def gen_bucket_name():
    global num_buckets
@@ -765,12 +790,6 @@ def test_zonegroup_remove():
    zonegroup.zones.append(zone)
    zonegroup.period.update(zone, commit=True)

    # try to 'zone delete' the new zone from cluster 1
    # must fail with ENOENT because the zone is local to cluster 2
    retcode = zone.delete(c1, check_retcode=False)
    assert(retcode == 2) # ENOENT

    # use 'zonegroup remove', expecting success
    zonegroup.remove(c1, zone)

    # another 'zonegroup remove' should fail with ENOENT

@@ -152,6 +152,9 @@ def init(parse_args):
        'log_file': None,
        'file_log_level': 20,
        'tenant': None,
        'checkpoint_retries': 60,
        'checkpoint_delay': 5,
        'reconfigure_delay': 5,
        })
    try:
        path = os.environ['RGW_MULTI_TEST_CONF']
@@ -178,6 +181,9 @@ def init(parse_args):
    parser.add_argument('--log-file', type=str, default=cfg.get(section, 'log_file'))
    parser.add_argument('--file-log-level', type=int, default=cfg.getint(section, 'file_log_level'))
    parser.add_argument('--tenant', type=str, default=cfg.get(section, 'tenant'))
    parser.add_argument('--checkpoint-retries', type=int, default=cfg.getint(section, 'checkpoint_retries'))
    parser.add_argument('--checkpoint-delay', type=int, default=cfg.getint(section, 'checkpoint_delay'))
    parser.add_argument('--reconfigure-delay', type=int, default=cfg.getint(section, 'reconfigure_delay'))

    argv = []

@@ -297,7 +303,10 @@ def init(parse_args):
    if not bootstrap:
        period.get(c1)

    init_multi(realm, user)
    config = Config(checkpoint_retries=args.checkpoint_retries,
                    checkpoint_delay=args.checkpoint_delay,
                    reconfigure_delay=args.reconfigure_delay)
    init_multi(realm, user, config)

def setup_module():
    init(False)