from cStringIO import StringIO import logging import json import requests from requests.packages.urllib3.util import Retry from urlparse import urlparse from teuthology.orchestra.connection import split_user from teuthology import misc as teuthology log = logging.getLogger(__name__) # simple test to indicate if multi-region testing should occur def multi_region_enabled(ctx): # this is populated by the radosgw-agent task, seems reasonable to # use that as an indicator that we're testing multi-region sync return 'radosgw_agent' in ctx def rgwadmin(ctx, client, cmd, stdin=StringIO(), check_status=False, format='json'):'rgwadmin: {client} : {cmd}'.format(client=client,cmd=cmd)) testdir = teuthology.get_testdir(ctx) pre = [ 'adjust-ulimits', 'ceph-coverage'.format(tdir=testdir), '{tdir}/archive/coverage'.format(tdir=testdir), 'radosgw-admin'.format(tdir=testdir), '--log-to-stderr', '--format', format, '-n', client, ] pre.extend(cmd)'rgwadmin: cmd=%s' % pre) (remote,) = ctx.cluster.only(client).remotes.iterkeys() proc = args=pre, check_status=check_status, stdout=StringIO(), stderr=StringIO(), stdin=stdin, ) r = proc.exitstatus out = proc.stdout.getvalue() j = None if not r and out != '': try: j = json.loads(out)' json result: %s' % j) except ValueError: j = out' raw result: %s' % j) return (r, j) def get_user_summary(out, user): """Extract the summary for a given user""" user_summary = None for summary in out['summary']: if summary.get('user') == user: user_summary = summary if not user_summary: raise AssertionError('No summary info found for user: %s' % user) return user_summary def get_user_successful_ops(out, user): summary = out['summary'] if len(summary) == 0: return 0 return get_user_summary(out, user)['total']['successful_ops'] def get_zone_host_and_port(ctx, client, zone): _, region_map = rgwadmin(ctx, client, check_status=True, cmd=['-n', client, 'region-map', 'get']) regions = region_map['zonegroups'] for region in regions: for zone_info in region['val']['zones']: if zone_info['name'] == zone: endpoint = urlparse(zone_info['endpoints'][0]) host, port = endpoint.hostname, endpoint.port if port is None: port = 80 return host, port assert False, 'no endpoint for zone {zone} found'.format(zone=zone) def get_master_zone(ctx, client): _, region_map = rgwadmin(ctx, client, check_status=True, cmd=['-n', client, 'region-map', 'get']) regions = region_map['zonegroups'] for region in regions: is_master = (region['val']['is_master'] == "true")'region={r} is_master={ism}'.format(r=region, ism=is_master)) if not is_master: continue master_zone = region['val']['master_zone']'master_zone=%s' % master_zone) for zone_info in region['val']['zones']: if zone_info['name'] == master_zone: return master_zone'couldn\'t find master zone') return None def get_master_client(ctx, clients): master_zone = get_master_zone(ctx, clients[0]) # can use any client for this as long as system configured correctly if not master_zone: return None for client in clients: zone = zone_for_client(ctx, client) if zone == master_zone: return client return None def get_zone_system_keys(ctx, client, zone): _, zone_info = rgwadmin(ctx, client, check_status=True, cmd=['-n', client, 'zone', 'get', '--rgw-zone', zone]) system_key = zone_info['system_key'] return system_key['access_key'], system_key['secret_key'] def zone_for_client(ctx, client): ceph_config = ctx.ceph['ceph'].conf.get('global', {}) ceph_config.update(ctx.ceph['ceph'].conf.get('client', {})) ceph_config.update(ctx.ceph['ceph'].conf.get(client, {})) return ceph_config.get('rgw zone') def region_for_client(ctx, client): ceph_config = ctx.ceph['ceph'].conf.get('global', {}) ceph_config.update(ctx.ceph['ceph'].conf.get('client', {})) ceph_config.update(ctx.ceph['ceph'].conf.get(client, {})) return ceph_config.get('rgw region') def radosgw_data_log_window(ctx, client): ceph_config = ctx.ceph['ceph'].conf.get('global', {}) ceph_config.update(ctx.ceph['ceph'].conf.get('client', {})) ceph_config.update(ctx.ceph['ceph'].conf.get(client, {})) return ceph_config.get('rgw data log window', 30) def radosgw_agent_sync_data(ctx, agent_host, agent_port, full=False):'sync agent {h}:{p}'.format(h=agent_host, p=agent_port)) # use retry with backoff to tolerate slow startup of radosgw-agent s = requests.Session() s.mount('http://{addr}:{port}/'.format(addr = agent_host, port = agent_port), requests.adapters.HTTPAdapter(max_retries=Retry(total=5, backoff_factor=1))) method = "full" if full else "incremental" return'http://{addr}:{port}/data/{method}'.format(addr = agent_host, port = agent_port, method = method)) def radosgw_agent_sync_metadata(ctx, agent_host, agent_port, full=False):'sync agent {h}:{p}'.format(h=agent_host, p=agent_port)) # use retry with backoff to tolerate slow startup of radosgw-agent s = requests.Session() s.mount('http://{addr}:{port}/'.format(addr = agent_host, port = agent_port), requests.adapters.HTTPAdapter(max_retries=Retry(total=5, backoff_factor=1))) method = "full" if full else "incremental" return'http://{addr}:{port}/metadata/{method}'.format(addr = agent_host, port = agent_port, method = method)) def radosgw_agent_sync_all(ctx, full=False, data=False): if ctx.radosgw_agent.procs: for agent_client, c_config in ctx.radosgw_agent.config.iteritems(): zone_for_client(ctx, agent_client) sync_host, sync_port = get_sync_agent(ctx, agent_client) log.debug('doing a sync via {host1}'.format(host1=sync_host)) radosgw_agent_sync_metadata(ctx, sync_host, sync_port, full) if (data): radosgw_agent_sync_data(ctx, sync_host, sync_port, full) def host_for_role(ctx, role): for target, roles in zip(ctx.config['targets'].iterkeys(), ctx.config['roles']): if role in roles: _, host = split_user(target) return host def get_sync_agent(ctx, source): for task in ctx.config['tasks']: if 'radosgw-agent' not in task: continue for client, conf in task['radosgw-agent'].iteritems(): if conf['src'] == source: return host_for_role(ctx, source), conf.get('port', 8000) return None, None