2011-05-31 20:51:48 +00:00
|
|
|
from cStringIO import StringIO
|
|
|
|
|
|
|
|
import os
|
|
|
|
import logging
|
|
|
|
import configobj
|
2011-07-02 01:15:52 +00:00
|
|
|
import getpass
|
|
|
|
import socket
|
2011-05-31 20:51:48 +00:00
|
|
|
import time
|
2011-06-02 16:09:08 +00:00
|
|
|
import urllib2
|
|
|
|
import urlparse
|
2011-07-07 18:43:35 +00:00
|
|
|
import yaml
|
2011-05-31 20:51:48 +00:00
|
|
|
|
2011-09-13 21:53:02 +00:00
|
|
|
from .orchestra import run
|
2011-06-16 01:06:57 +00:00
|
|
|
|
2011-05-31 20:51:48 +00:00
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
2011-06-10 00:05:55 +00:00
|
|
|
def get_ceph_binary_url(branch=None, tag=None, sha1=None, flavor=None):
    """
    Work out which Ceph build to use and where its binaries live.

    At most one of ``branch``, ``tag`` and ``sha1`` may be given.  When
    ``sha1`` is not given, the ref (``tag`` if set, else ``branch``,
    else ``'master'``) is translated to a sha1 by fetching
    ``ref/<ref>/sha1`` below the gitbuilder output URL.

    ``flavor``, when given, selects the ``gitbuilder-<flavor>-amd64``
    builder instead of the default one.

    Returns a tuple ``(sha1, bindir_url)``.

    Raises AssertionError if more than one of ``branch``, ``tag`` and
    ``sha1`` is set.  Errors from ``urllib2.urlopen`` propagate.
    """
    if flavor is None:
        flavor = ''
    else:
        # TODO hardcoding amd64 here for simplicity; clients will try
        # to fetch the tarball matching their arch, non-x86_64 just
        # won't find anything and the test will fail. trying to
        # support cross-arch clusters is messy because nothing
        # guarantees the same sha1 of "master" has been built for all
        # of them. hoping for yagni.
        flavor = '-{flavor}-amd64'.format(flavor=flavor)
    BASE = 'http://ceph.newdream.net/gitbuilder{flavor}/output/'.format(flavor=flavor)

    if sha1 is not None:
        assert branch is None, "cannot set both sha1 and branch"
        assert tag is None, "cannot set both sha1 and tag"
    else:
        # gitbuilder uses remote-style ref names for branches, mangled to
        # have underscores instead of slashes; e.g. origin_master
        if tag is not None:
            ref = tag
            assert branch is None, "cannot set both branch and tag"
        else:
            if branch is None:
                branch = 'master'
            ref = branch

        sha1_url = urlparse.urljoin(BASE, 'ref/{ref}/sha1'.format(ref=ref))
        log.debug('Translating ref to sha1 using url %s', sha1_url)
        sha1_fp = urllib2.urlopen(sha1_url)
        try:
            # close the response even if reading it fails
            sha1 = sha1_fp.read().rstrip('\n')
        finally:
            sha1_fp.close()

    log.debug('Using ceph sha1 %s', sha1)
    bindir_url = urlparse.urljoin(BASE, 'sha1/{sha1}/'.format(sha1=sha1))
    return (sha1, bindir_url)
|
2011-05-31 20:51:48 +00:00
|
|
|
|
|
|
|
def feed_many_stdins(fp, processes):
    """
    Copy everything readable from ``fp`` to the stdin of each process.

    Data is read in chunks of up to 8192 bytes; every chunk is written
    to ``proc.stdin`` of each entry in ``processes`` before the next
    chunk is read.  The stdin streams are left open.
    """
    chunk = fp.read(8192)
    while chunk:
        for target in processes:
            target.stdin.write(chunk)
        chunk = fp.read(8192)
|
|
|
|
|
|
|
|
def feed_many_stdins_and_close(fp, processes):
    """
    Copy everything readable from ``fp`` to the stdin of each process,
    then close each process's stdin.

    Data is read in chunks of up to 8192 bytes and every chunk is
    written to all processes before the next one is read.
    """
    # same copy loop as feed_many_stdins
    while True:
        chunk = fp.read(8192)
        if not chunk:
            break
        for target in processes:
            target.stdin.write(chunk)

    for target in processes:
        target.stdin.close()
|
|
|
|
|
|
|
|
def get_mons(roles, ips):
    """
    Assign an ``ip:port`` address to every ``mon.*`` role.

    ``roles`` is a sequence of per-host role lists and ``ips`` holds the
    address of the host at the same index.  The first monitor found on a
    given IP gets port 6789; each further monitor on that IP gets the
    next higher port.

    Returns a dict mapping role name to ``'<ip>:<port>'``.

    Raises AssertionError if no monitor role is found.
    """
    mons = {}
    # last port handed out for each IP
    mon_ports = {}
    for idx, host_roles in enumerate(roles):
        for role in host_roles:
            if not role.startswith('mon.'):
                continue
            ip = ips[idx]
            if ip in mon_ports:
                mon_ports[ip] += 1
            else:
                mon_ports[ip] = 6789
            mons[role] = '{ip}:{port}'.format(
                ip=ip,
                port=mon_ports[ip],
                )
    assert mons
    return mons
|
|
|
|
|
|
|
|
def generate_caps(type_):
    """
    Yield ``--cap <subsystem> <capability>`` argument triples holding
    the default capabilities for a daemon type.

    ``type_`` must be one of ``'osd'``, ``'mds'`` or ``'client'``; any
    other value raises KeyError once the generator is first advanced.
    """
    defaults = dict(
        osd=dict(
            mon='allow *',
            osd='allow *',
            ),
        mds=dict(
            mon='allow *',
            osd='allow *',
            mds='allow',
            ),
        client=dict(
            mon='allow rw',
            osd='allow rwx pool=data,rbd',
            mds='allow',
            ),
        )
    caps = defaults[type_]
    for subsystem in caps:
        for token in ('--cap', subsystem, caps[subsystem]):
            yield token
|
|
|
|
|
|
|
|
def skeleton_config(roles, ips):
    """
    Build a ConfigObj prefilled from the ``ceph.conf`` template that
    sits next to this module.

    Each monitor role gets a section with its ``mon addr`` (as computed
    by get_mons), and each ``mds.*`` role whose name ends in ``-s`` gets
    ``mds standby for name`` set to its own name minus that suffix.

    Change the result with conf[section][key]=value or conf.merge.

    Write it out with conf.write, overriding .filename first if needed.
    """
    template = os.path.join(os.path.dirname(__file__), 'ceph.conf')
    conf = configobj.ConfigObj(template, file_error=True)

    mon_addrs = get_mons(roles=roles, ips=ips)
    for mon_role, mon_addr in mon_addrs.iteritems():
        conf.setdefault(mon_role, {})
        conf[mon_role]['mon addr'] = mon_addr

    # set up standby mds's
    for host_roles in roles:
        for role in host_roles:
            if not (role.startswith('mds.') and role.endswith('-s')):
                continue
            conf.setdefault(role, {})
            conf[role]['mds standby for name'] = role[:-2]
    return conf
|
|
|
|
|
|
|
|
def roles_of_type(roles_for_host, type_):
    """
    Yield the id part of every role in ``roles_for_host`` whose name
    starts with ``<type_>.``; e.g. ``'osd.3'`` yields ``'3'`` for type
    ``'osd'``.
    """
    prefix = '{type}.'.format(type=type_)
    skip = len(prefix)
    for role_name in roles_for_host:
        if role_name.startswith(prefix):
            yield role_name[skip:]
|
|
|
|
|
2011-06-16 23:07:59 +00:00
|
|
|
def all_roles_of_type(cluster, type_):
    """
    Yield the id part of every role of the given type across all the
    hosts in ``cluster.remotes``.

    ``cluster.remotes`` is read as a mapping whose values are the role
    lists of each host; the keys are not used.
    """
    prefix = '{type}.'.format(type=type_)
    skip = len(prefix)
    for host_roles in cluster.remotes.values():
        for role_name in host_roles:
            if role_name.startswith(prefix):
                yield role_name[skip:]
|
|
|
|
|
2011-06-01 23:04:52 +00:00
|
|
|
def is_type(type_):
    """
    Build a predicate that tells whether a role belongs to a type.

    The returned function takes a role name and returns True when the
    name starts with ``<type_>.``.
    """
    wanted = '{type}.'.format(type=type_)

    def _is_type(role):
        # plain prefix match; the trailing dot is part of the prefix
        matches = role.startswith(wanted)
        return matches

    return _is_type
|
|
|
|
|
2011-06-03 21:47:44 +00:00
|
|
|
def num_instances_of_type(cluster, type_):
    """
    Count the roles of the given type across all hosts in
    ``cluster.remotes``.

    A role counts when its name starts with ``<type_>.``.
    """
    prefix = '{type}.'.format(type=type_)
    count = 0
    for hostroles in cluster.remotes.values():
        for role in hostroles:
            if role.startswith(prefix):
                count += 1
    return count
|
|
|
|
|
2011-06-01 23:04:52 +00:00
|
|
|
def create_simple_monmap(remote, conf):
    """
    Run monmaptool on ``remote`` to create /tmp/cephtest/monmap from the
    monitors listed in ``conf``.

    Every section of ``conf`` named ``mon.<name>`` (with the dot)
    contributes one ``--add <name> <mon addr>`` entry, so ``conf`` must
    be up to date.

    Raises AssertionError if ``conf`` has no monitor sections.
    """
    PREFIX = 'mon.'
    addresses = [
        (section[len(PREFIX):], data['mon addr'])
        for section, data in conf.iteritems()
        if section.startswith(PREFIX)
        ]
    assert addresses, "There are no monitors in config!"
    log.debug('Ceph mon addresses: %s', addresses)

    command = [
        '/tmp/cephtest/enable-coredump',
        '/tmp/cephtest/binary/usr/local/bin/ceph-coverage',
        '/tmp/cephtest/archive/coverage',
        '/tmp/cephtest/binary/usr/local/bin/monmaptool',
        '--create',
        '--clobber',
        ]
    for mon_name, mon_addr in addresses:
        command += ['--add', mon_name, mon_addr]
    command += ['--print', '/tmp/cephtest/monmap']

    remote.run(args=command)
|
|
|
|
|
2011-06-01 23:04:52 +00:00
|
|
|
def write_file(remote, path, data):
    """
    Write ``data`` to ``path`` on the remote host.

    ``data`` is handed to ``remote.run`` as stdin of a small Python
    one-liner that copies its stdin into the file at ``path``.
    """
    copy_script = 'import shutil, sys; shutil.copyfileobj(sys.stdin, file(sys.argv[1], "wb"))'
    command = ['python', '-c', copy_script, path]
    remote.run(args=command, stdin=data)
|
|
|
|
|
2011-06-20 20:19:08 +00:00
|
|
|
def sudo_write_file(remote, path, data):
    """
    Write ``data`` to ``path`` on the remote host, running under sudo.

    ``data`` is handed to ``remote.run`` as stdin of a small Python
    one-liner, run via ``sudo``, that copies its stdin into the file at
    ``path``.
    """
    copy_script = 'import shutil, sys; shutil.copyfileobj(sys.stdin, file(sys.argv[1], "wb"))'
    command = ['sudo', 'python', '-c', copy_script, path]
    remote.run(args=command, stdin=data)
|
|
|
|
|
2011-06-01 23:04:52 +00:00
|
|
|
def get_file(remote, path):
    """
    Fetch the contents of a file on the remote host.

    Runs ``cat -- <path>`` on ``remote`` with stdout captured in memory
    and returns everything that was captured.
    """
    captured = StringIO()
    proc = remote.run(args=['cat', '--', path], stdout=captured)
    return proc.stdout.getvalue()
|
|
|
|
|
2011-10-03 21:03:36 +00:00
|
|
|
def get_scratch_devices(remote):
    """
    Return the scratch devices that exist on the remote host.

    The candidate list is the whitespace-separated content of
    ``/scratch_devs`` on the remote; if that file cannot be read, the
    single candidate ``/dev/sdb`` is used instead.  Only candidates for
    which ``stat <dev>`` succeeds on the remote are returned.
    """
    # Both lookups are best-effort, but only ordinary errors are
    # swallowed; KeyboardInterrupt and SystemExit propagate.
    try:
        file_data = get_file(remote, "/scratch_devs")
    except Exception:
        log.debug('could not read /scratch_devs, falling back to /dev/sdb')
        devs = ['/dev/sdb']
    else:
        devs = file_data.split()

    retval = []
    for dev in devs:
        try:
            remote.run(
                args=[
                    'stat',
                    dev
                    ]
                )
        except Exception:
            log.debug('scratch device %s is not usable, skipping', dev)
        else:
            retval.append(dev)
    return retval
|
|
|
|
|
2011-06-01 23:04:52 +00:00
|
|
|
def wait_until_healthy(remote):
    """
    Wait until a Ceph cluster is healthy.

    Runs ``ceph health --concise`` on ``remote`` once per second and
    returns as soon as the first whitespace-separated word of its output
    is ``HEALTH_OK``.  There is no timeout.
    """
    while True:
        r = remote.run(
            args=[
                '/tmp/cephtest/enable-coredump',
                '/tmp/cephtest/binary/usr/local/bin/ceph-coverage',
                '/tmp/cephtest/archive/coverage',
                '/tmp/cephtest/binary/usr/local/bin/ceph',
                '-c', '/tmp/cephtest/ceph.conf',
                'health',
                '--concise',
                ],
            stdout=StringIO(),
            logger=log.getChild('health'),
            )
        out = r.stdout.getvalue()
        log.debug('Ceph health: %s', out.rstrip('\n'))
        # Empty or whitespace-only output has no first word; treat it
        # as "not healthy yet" instead of failing with IndexError.
        words = out.split(None, 1)
        if words and words[0] == 'HEALTH_OK':
            break
        time.sleep(1)
|
|
|
|
|
2011-06-01 23:04:52 +00:00
|
|
|
def wait_until_fuse_mounted(remote, fuse, mountpoint):
    """
    Wait until ``mountpoint`` on ``remote`` is a fuse mount.

    Every 5 seconds the filesystem type of ``mountpoint`` is queried
    with ``stat --file-system``; the wait ends once it reports
    ``fuseblk``.  While waiting, ``fuse.exitstatus.ready()`` must stay
    false, otherwise AssertionError is raised (presumably ``fuse`` is
    the handle of the running cfuse process -- confirm with callers).
    """
    mounted = False
    while not mounted:
        proc = remote.run(
            args=['stat', '--file-system', '--printf=%T\n', '--', mountpoint],
            stdout=StringIO(),
            )
        fstype = proc.stdout.getvalue().rstrip('\n')
        mounted = (fstype == 'fuseblk')
        if not mounted:
            log.debug('cfuse not yet mounted, got fs type {fstype!r}'.format(fstype=fstype))

            # it shouldn't have exited yet; exposes some trivial problems
            assert not fuse.exitstatus.ready()

            time.sleep(5)
    log.info('cfuse is mounted on %s', mountpoint)
|
2011-06-16 01:06:57 +00:00
|
|
|
|
2011-08-10 17:37:04 +00:00
|
|
|
def reconnect(ctx, timeout):
    """
    Connect to all the machines in ctx.cluster.

    Presumably, some of them won't be up. Handle this
    by waiting for them, unless the wait time exceeds
    the specified timeout.

    ctx needs to contain the cluster of machines you
    wish it to try and connect to, as well as a config
    holding the ssh keys for each of them. As long as it
    contains this data, you can construct a context
    that is a subset of your full cluster.

    A failed attempt is retried on the next pass (one pass per second);
    once more than ``timeout`` seconds have elapsed since the start,
    the next connection failure is re-raised.
    """
    log.info('Re-opening connections...')
    starttime = time.time()
    need_reconnect = list(ctx.cluster.remotes.keys())
    while need_reconnect:
        # Iterate over a snapshot: removing entries from the list that
        # is being iterated would make the loop skip the next remote.
        for remote in list(need_reconnect):
            try:
                log.info('trying to connect to %s', remote.name)
                from .orchestra import connection
                remote.ssh = connection.connect(
                    user_at_host=remote.name,
                    host_key=ctx.config['targets'][remote.name],
                    keep_alive=True,
                    )
            except Exception:
                if time.time() - starttime > timeout:
                    raise
            else:
                need_reconnect.remove(remote)

        log.debug('waited {elapsed}'.format(elapsed=str(time.time() - starttime)))
        # only pause when another pass is actually needed
        if need_reconnect:
            time.sleep(1)
|
|
|
|
|
2011-06-16 01:06:57 +00:00
|
|
|
def write_secret_file(remote, role, filename):
    """
    Extract the secret key of ``role`` into ``filename`` on the remote.

    Runs ``ceph-authtool --name=<role> --print-key`` against
    ``/tmp/cephtest/data/<role>.keyring`` on ``remote`` and redirects
    its output to ``filename``.
    """
    name_option = '--name={role}'.format(role=role)
    keyring = '/tmp/cephtest/data/{role}.keyring'.format(role=role)
    command = [
        '/tmp/cephtest/enable-coredump',
        '/tmp/cephtest/binary/usr/local/bin/ceph-coverage',
        '/tmp/cephtest/archive/coverage',
        '/tmp/cephtest/binary/usr/local/bin/ceph-authtool',
        name_option,
        '--print-key',
        keyring,
        # unquoted so that it acts as a shell redirection
        run.Raw('>'),
        filename,
        ]
    remote.run(args=command)
|
2011-06-20 19:12:11 +00:00
|
|
|
|
2011-08-09 20:23:58 +00:00
|
|
|
def get_clients(ctx, roles):
    """
    Yield ``(id, remote)`` for each client role in ``roles``.

    Every role must be a string of the form ``client.<id>``, otherwise
    AssertionError is raised.  ``remote`` is the single key of
    ``ctx.cluster.only(role).remotes``; unpacking fails with ValueError
    if that mapping does not hold exactly one entry.
    """
    PREFIX = 'client.'
    for role in roles:
        assert isinstance(role, basestring)
        assert role.startswith(PREFIX)
        client_id = role[len(PREFIX):]
        matching = ctx.cluster.only(role).remotes
        (remote,) = matching.iterkeys()
        yield (client_id, remote)
|
2011-07-02 01:15:52 +00:00
|
|
|
|
|
|
|
def get_user():
    """
    Return ``<login name>@<hostname>`` for the current user on the
    local machine.
    """
    login = getpass.getuser()
    host = socket.gethostname()
    return login + '@' + host
|
2011-07-07 18:43:35 +00:00
|
|
|
|
|
|
|
def read_config(ctx):
    """
    Load ``$HOME/.teuthology.yaml`` into ``ctx.teuthology_config``.

    ``ctx.teuthology_config`` is reset to an empty dict, then updated
    with every YAML document in the file in order, so later documents
    override earlier ones.  Empty documents are skipped.

    Raises KeyError if ``HOME`` is not set in the environment; errors
    from opening or parsing the file propagate.
    """
    filename = os.path.join(os.environ['HOME'], '.teuthology.yaml')
    ctx.teuthology_config = {}
    with open(filename) as f:
        for new in yaml.safe_load_all(f):
            # an empty document loads as None, which dict.update rejects
            if new is None:
                continue
            ctx.teuthology_config.update(new)
|
2011-08-31 20:56:42 +00:00
|
|
|
|
2011-11-09 06:06:43 +00:00
|
|
|
def get_mon_names(ctx):
    """
    Return the full names of all ``mon.*`` roles found on any host in
    ``ctx.cluster.remotes``, in iteration order.
    """
    return [
        role
        for host_roles in ctx.cluster.remotes.values()
        for role in host_roles
        if role.startswith('mon.')
        ]
|
|
|
|
|
|
|
|
# return the "first" mon (alphanumerically, for lack of anything better)
def get_first_mon(ctx, config):
    """
    Return the monitor role name that sorts first among all monitors
    in the cluster.

    ``config`` is accepted but not used.  With no monitors at all, the
    lookup of the first element raises IndexError.
    """
    ordered = sorted(get_mon_names(ctx))
    firstmon = ordered[0]
    assert firstmon
    return firstmon
|
2011-09-08 00:50:12 +00:00
|
|
|
|
|
|
|
def replace_all_with_clients(cluster, config):
    """
    Expand an ``all`` entry of ``config`` to one entry per client.

    Without an ``'all'`` key, ``config`` is returned unchanged.
    Otherwise ``'all'`` must be the only key, and a new dict is
    returned that maps ``client.<id>`` to config['all'] for every
    client role in ``cluster``.

    Raises AssertionError if ``config`` is not a dict, or if it mixes
    ``'all'`` with other keys.
    """
    assert isinstance(config, dict), 'config must be a dict'
    if 'all' in config:
        assert len(config) == 1, \
            "config cannot have 'all' and specific clients listed"
        expanded = {}
        for client_id in all_roles_of_type(cluster, 'client'):
            role = 'client.{id}'.format(id=client_id)
            expanded[role] = config['all']
        return expanded
    return config
|
2011-11-17 21:06:36 +00:00
|
|
|
|
|
|
|
def deep_merge(a, b):
    """
    Recursively merge ``b`` into ``a`` and return the result.

    - if either side is None, the other side is returned as is;
    - two lists: ``a`` is extended in place with ``b``;
    - two dicts: ``a`` is updated in place, merging values of keys
      present on both sides recursively;
    - anything else: ``b`` wins.

    Raises AssertionError when ``a`` is a list or a dict and ``b`` is
    not of the same kind.
    """
    if a is None:
        merged = b
    elif b is None:
        merged = a
    elif isinstance(a, list):
        assert isinstance(b, list)
        a.extend(b)
        merged = a
    elif isinstance(a, dict):
        assert isinstance(b, dict)
        for key, value in b.items():
            a[key] = deep_merge(a[key], value) if key in a else value
        merged = a
    else:
        merged = b
    return merged
|