ceph/teuthology/task/ceph.py

Commit 14ddb419d6 by Tommi Virtanen, 2011-06-16 10:39:26 -07:00:
Start splitting the ceph task into smaller units.

This lets the units have independent error handling; one case where
that is desirable is that the old code failed to archive the logs if
one of the daemons exited with an error.

This also means a config file might specify just some of the smaller
tasks, e.g. for a test that explicitly wants to create operational
error situations.

The diff looks big because of an indentation change; use "git diff -b"
for a smaller diff.

from cStringIO import StringIO

import contextlib
import logging
import os
import gevent
import tarfile

from teuthology import misc as teuthology
from teuthology import safepath
from teuthology import contextutil
from orchestra import run

log = logging.getLogger(__name__)

@contextlib.contextmanager
def ship_utilities(ctx, config):
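    """
    Ship the helper utilities (currently just daemon-helper) to every
    remote, and remove them again on teardown.
    """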
    assert config is None
    FILES = ['daemon-helper']
    for filename in FILES:
        log.info('Shipping %r...', filename)
        src = os.path.join(os.path.dirname(__file__), filename)
        dst = os.path.join('/tmp/cephtest', filename)
        with file(src, 'rb') as f:
            for rem in ctx.cluster.remotes.iterkeys():
                teuthology.write_file(
                    remote=rem,
                    path=dst,
                    data=f,
                )
                f.seek(0)
                rem.run(
                    args=[
                        'chmod',
                        'a=rx',
                        '--',
                        dst,
                    ],
                )

    try:
        yield
    finally:
        log.info('Removing shipped files: %s...', ' '.join(FILES))
        filenames = (
            os.path.join('/tmp/cephtest', filename)
            for filename in FILES
        )
        run.wait(
            ctx.cluster.run(
                args=[
                    'rm',
                    '-rf',
                    '--',
                ] + list(filenames),
                wait=False,
            ),
        )

@contextlib.contextmanager
def binaries(ctx, config):
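    """
    Fetch the ceph binary tarball for the requested branch/tag/sha1 and
    flavor, unpack it into /tmp/cephtest/binary on every remote, and
    remove it again on teardown.
    """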
    log.info('Unpacking ceph binaries...')
    sha1, ceph_bindir_url = teuthology.get_ceph_binary_url(
        branch=config.get('branch'),
        tag=config.get('tag'),
        sha1=config.get('sha1'),
        flavor=config.get('flavor'),
    )
    if ctx.archive is not None:
        with file(os.path.join(ctx.archive, 'ceph-sha1'), 'w') as f:
            f.write(sha1 + '\n')
    ctx.cluster.run(
        args=[
            'install', '-d', '-m0755', '--', '/tmp/cephtest/binary',
            run.Raw('&&'),
            'uname', '-m',
            run.Raw('|'),
            'sed', '-e', 's/^/ceph./; s/$/.tgz/',
            run.Raw('|'),
            'wget',
            '-nv',
            '-O-',
            '--base={url}'.format(url=ceph_bindir_url),
            # need to use --input-file to make wget respect --base
            '--input-file=-',
            run.Raw('|'),
            'tar', '-xzf', '-', '-C', '/tmp/cephtest/binary',
        ],
    )
    try:
        yield
    finally:
        log.info('Removing ceph binaries...')
        run.wait(
            ctx.cluster.run(
                args=[
                    'rm',
                    '-rf',
                    '--',
                    '/tmp/cephtest/binary',
                ],
                wait=False,
            ),
        )

@contextlib.contextmanager
def task(ctx, config):
    """
    Set up and tear down a Ceph cluster.

    For example::

        tasks:
        - ceph:
        - interactive:

    You can also specify what branch to run::

        tasks:
        - ceph:
            branch: foo

    Or a tag::

        tasks:
        - ceph:
            tag: v0.42.13

    Or a sha1::

        tasks:
        - ceph:
            sha1: 1376a5ab0c89780eab39ffbbe436f6a6092314ed

    To capture code coverage data, use::

        tasks:
        - ceph:
            coverage: true
    """
    if config is None:
        config = {}
    assert isinstance(config, dict), \
        "task ceph only supports a dictionary for configuration"

    flavor = None
    daemon_signal = 'kill'
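    # gcov only writes out its coverage data when a process exits cleanly,
    # so coverage runs must stop the daemons with SIGTERM instead of the
    # default SIGKILL.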
    if config.get('coverage'):
        log.info('Recording coverage for this run.')
        flavor = 'gcov'
        daemon_signal = 'term'

    log.info('Checking for old test directory...')
    processes = ctx.cluster.run(
        args=[
            'test', '!', '-e', '/tmp/cephtest',
        ],
        wait=False,
    )
    failed = False
    for proc in processes:
        assert isinstance(proc.exitstatus, gevent.event.AsyncResult)
        try:
            proc.exitstatus.get()
        except run.CommandFailedError:
            log.error(
                'Host %s has a stale cephtest directory; check your lock '
                'and reboot to clean up.',
                proc.remote.shortname,
            )
            failed = True
    if failed:
        raise RuntimeError('Stale jobs detected, aborting.')
    coverage_dir = '/tmp/cephtest/archive/coverage'

    log.info('Creating directories...')
    run.wait(
        ctx.cluster.run(
            args=[
                'install', '-d', '-m0755', '--',
                '/tmp/cephtest/archive',
                '/tmp/cephtest/archive/log',
                '/tmp/cephtest/archive/profiling-logger',
                '/tmp/cephtest/data',
                coverage_dir,
            ],
            wait=False,
        )
    )
    with contextutil.nested(
        lambda: ship_utilities(ctx=ctx, config=None),
        lambda: binaries(ctx=ctx, config=dict(
            branch=config.get('branch'),
            tag=config.get('tag'),
            sha1=config.get('sha1'),
            flavor=flavor,
        )),
    ):
        log.info('Writing configs...')
        remotes_and_roles = ctx.cluster.remotes.items()
        roles = [roles for (remote, roles) in remotes_and_roles]
        ips = [
            host
            for (host, port) in (
                remote.ssh.get_transport().getpeername()
                for (remote, roles) in remotes_and_roles
            )
        ]
        conf = teuthology.skeleton_config(roles=roles, ips=ips)
        conf_fp = StringIO()
        conf.write(conf_fp)
        conf_fp.seek(0)
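        # Fan the same generated ceph.conf out to every remote by piping
        # it into a small python one-liner that copies stdin to the
        # target path.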
        writes = ctx.cluster.run(
            args=[
                'python',
                '-c',
                'import shutil, sys; shutil.copyfileobj(sys.stdin, file(sys.argv[1], "wb"))',
                '/tmp/cephtest/ceph.conf',
            ],
            stdin=run.PIPE,
            wait=False,
        )
        teuthology.feed_many_stdins_and_close(conf_fp, writes)
        run.wait(writes)

        log.info('Setting up mon.0...')
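        # cauthtool (later renamed ceph-authtool) creates the shared
        # cluster keyring and generates the mon. key inside it.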
        ctx.cluster.only('mon.0').run(
            args=[
                '/tmp/cephtest/binary/usr/local/bin/ceph-coverage',
                coverage_dir,
                '/tmp/cephtest/binary/usr/local/bin/cauthtool',
                '--create-keyring',
                '/tmp/cephtest/ceph.keyring',
            ],
        )
        ctx.cluster.only('mon.0').run(
            args=[
                '/tmp/cephtest/binary/usr/local/bin/ceph-coverage',
                coverage_dir,
                '/tmp/cephtest/binary/usr/local/bin/cauthtool',
                '--gen-key',
                '--name=mon.',
                '/tmp/cephtest/ceph.keyring',
            ],
        )

        (mon0_remote,) = ctx.cluster.only('mon.0').remotes.keys()
        teuthology.create_simple_monmap(
            remote=mon0_remote,
            conf=conf,
        )

        log.info('Creating admin key on mon.0...')
        ctx.cluster.only('mon.0').run(
            args=[
                '/tmp/cephtest/binary/usr/local/bin/ceph-coverage',
                coverage_dir,
                '/tmp/cephtest/binary/usr/local/bin/cauthtool',
                '--gen-key',
                '--name=client.admin',
                '--set-uid=0',
                '--cap', 'mon', 'allow *',
                '--cap', 'osd', 'allow *',
                '--cap', 'mds', 'allow',
                '/tmp/cephtest/ceph.keyring',
            ],
        )

        log.info('Copying mon.0 info to all monitors...')
        keyring = teuthology.get_file(
            remote=mon0_remote,
            path='/tmp/cephtest/ceph.keyring',
        )
        monmap = teuthology.get_file(
            remote=mon0_remote,
            path='/tmp/cephtest/monmap',
        )

        mons = ctx.cluster.only(teuthology.is_type('mon'))
        mons_no_0 = mons.exclude('mon.0')
        for rem in mons_no_0.remotes.iterkeys():
            # copy mon key and initial monmap
            log.info('Sending mon0 info to node {remote}'.format(remote=rem))
            teuthology.write_file(
                remote=rem,
                path='/tmp/cephtest/ceph.keyring',
                data=keyring,
            )
            teuthology.write_file(
                remote=rem,
                path='/tmp/cephtest/monmap',
                data=monmap,
            )

        log.info('Setting up mon nodes...')
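        # Every mon gets an identical initial osdmap, sized to the number
        # of osd roles in this run.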
        run.wait(
            mons.run(
                args=[
                    '/tmp/cephtest/binary/usr/local/bin/ceph-coverage',
                    coverage_dir,
                    '/tmp/cephtest/binary/usr/local/bin/osdmaptool',
                    '--clobber',
                    '--createsimple', '{num:d}'.format(
                        num=teuthology.num_instances_of_type(ctx.cluster, 'osd'),
                    ),
                    '/tmp/cephtest/osdmap',
                    '--pg_bits', '2',
                    '--pgp_bits', '4',
                ],
                wait=False,
            ),
        )

        for remote, roles_for_host in mons.remotes.iteritems():
            for id_ in teuthology.roles_of_type(roles_for_host, 'mon'):
                remote.run(
                    args=[
                        '/tmp/cephtest/binary/usr/local/bin/ceph-coverage',
                        coverage_dir,
                        '/tmp/cephtest/binary/usr/local/bin/cmon',
                        '--mkfs',
                        '-i', id_,
                        '-c', '/tmp/cephtest/ceph.conf',
                        '--monmap=/tmp/cephtest/monmap',
                        '--osdmap=/tmp/cephtest/osdmap',
                        '--keyring=/tmp/cephtest/ceph.keyring',
                    ],
                )

        run.wait(
            mons.run(
                args=[
                    'rm',
                    '--',
                    '/tmp/cephtest/monmap',
                    '/tmp/cephtest/osdmap',
                ],
                wait=False,
            ),
        )

        mon_daemons = {}
        log.info('Starting mon daemons...')
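        # daemon-helper keeps each daemon attached to its ssh connection
        # and delivers daemon_signal to the child when we close the
        # daemon's stdin during teardown.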
        for remote, roles_for_host in mons.remotes.iteritems():
            for id_ in teuthology.roles_of_type(roles_for_host, 'mon'):
                proc = remote.run(
                    args=[
                        '/tmp/cephtest/binary/usr/local/bin/ceph-coverage',
                        coverage_dir,
                        '/tmp/cephtest/daemon-helper',
                        daemon_signal,
                        '/tmp/cephtest/binary/usr/local/bin/cmon',
                        '-f',
                        '-i', id_,
                        '-c', '/tmp/cephtest/ceph.conf',
                    ],
                    logger=log.getChild('mon.{id}'.format(id=id_)),
                    stdin=run.PIPE,
                    wait=False,
                )
                mon_daemons[id_] = proc

        log.info('Setting up osd nodes...')
        osds = ctx.cluster.only(teuthology.is_type('osd'))
        for remote, roles_for_host in osds.remotes.iteritems():
            for id_ in teuthology.roles_of_type(roles_for_host, 'osd'):
                remote.run(
                    args=[
                        '/tmp/cephtest/binary/usr/local/bin/ceph-coverage',
                        coverage_dir,
                        '/tmp/cephtest/binary/usr/local/bin/cauthtool',
                        '--create-keyring',
                        '--gen-key',
                        '--name=osd.{id}'.format(id=id_),
                        '/tmp/cephtest/data/osd.{id}.keyring'.format(id=id_),
                    ],
                )

        log.info('Setting up mds nodes...')
        mdss = ctx.cluster.only(teuthology.is_type('mds'))
        for remote, roles_for_host in mdss.remotes.iteritems():
            for id_ in teuthology.roles_of_type(roles_for_host, 'mds'):
                remote.run(
                    args=[
                        '/tmp/cephtest/binary/usr/local/bin/ceph-coverage',
                        coverage_dir,
                        '/tmp/cephtest/binary/usr/local/bin/cauthtool',
                        '--create-keyring',
                        '--gen-key',
                        '--name=mds.{id}'.format(id=id_),
                        '/tmp/cephtest/data/mds.{id}.keyring'.format(id=id_),
                    ],
                )

        log.info('Setting up client nodes...')
        clients = ctx.cluster.only(teuthology.is_type('client'))
        for remote, roles_for_host in clients.remotes.iteritems():
            for id_ in teuthology.roles_of_type(roles_for_host, 'client'):
                remote.run(
                    args=[
                        '/tmp/cephtest/binary/usr/local/bin/ceph-coverage',
                        coverage_dir,
                        '/tmp/cephtest/binary/usr/local/bin/cauthtool',
                        '--create-keyring',
                        '--gen-key',
                        # TODO this --name= is not really obeyed; all
                        # unknown "types" are munged to "client"
                        '--name=client.{id}'.format(id=id_),
                        '/tmp/cephtest/data/client.{id}.keyring'.format(id=id_),
                    ],
                )

        log.info('Reading keys from all nodes...')
        keys = []
        for remote, roles_for_host in ctx.cluster.remotes.iteritems():
            for type_ in ['osd', 'mds', 'client']:
                for id_ in teuthology.roles_of_type(roles_for_host, type_):
                    data = teuthology.get_file(
                        remote=remote,
                        path='/tmp/cephtest/data/{type}.{id}.keyring'.format(
                            type=type_,
                            id=id_,
                        ),
                    )
                    keys.append((type_, id_, data))

        log.info('Adding keys to mon.0...')
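        # Stage each daemon's key on mon.0, stamp it with the standard
        # caps for its type, then import it into the cluster with
        # 'ceph auth add'.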
        for type_, id_, data in keys:
            teuthology.write_file(
                remote=mon0_remote,
                path='/tmp/cephtest/temp.keyring',
                data=data,
            )
            mon0_remote.run(
                args=[
                    '/tmp/cephtest/binary/usr/local/bin/ceph-coverage',
                    coverage_dir,
                    '/tmp/cephtest/binary/usr/local/bin/cauthtool',
                    '/tmp/cephtest/temp.keyring',
                    '--name={type}.{id}'.format(
                        type=type_,
                        id=id_,
                    ),
                ] + list(teuthology.generate_caps(type_)),
            )
            mon0_remote.run(
                args=[
                    '/tmp/cephtest/binary/usr/local/bin/ceph-coverage',
                    coverage_dir,
                    '/tmp/cephtest/binary/usr/local/bin/ceph',
                    '-c', '/tmp/cephtest/ceph.conf',
                    '-k', '/tmp/cephtest/ceph.keyring',
                    '-i', '/tmp/cephtest/temp.keyring',
                    'auth',
                    'add',
                    '{type}.{id}'.format(
                        type=type_,
                        id=id_,
                    ),
                ],
            )

        log.info('Setting max_mds...')
        # TODO where does this belong?
        mon0_remote.run(
            args=[
                '/tmp/cephtest/binary/usr/local/bin/ceph-coverage',
                coverage_dir,
                '/tmp/cephtest/binary/usr/local/bin/ceph',
                '-c', '/tmp/cephtest/ceph.conf',
                '-k', '/tmp/cephtest/ceph.keyring',
                'mds',
                'set_max_mds',
                '{num_mds:d}'.format(
                    num_mds=teuthology.num_instances_of_type(ctx.cluster, 'mds'),
                ),
            ],
        )
        log.info('Running mkfs on osd nodes...')
        for remote, roles_for_host in osds.remotes.iteritems():
            for id_ in teuthology.roles_of_type(roles_for_host, 'osd'):
                remote.run(
                    args=[
                        'mkdir',
                        os.path.join('/tmp/cephtest/data', 'osd.{id}.data'.format(id=id_)),
                    ],
                )
                remote.run(
                    args=[
                        '/tmp/cephtest/binary/usr/local/bin/ceph-coverage',
                        coverage_dir,
                        '/tmp/cephtest/binary/usr/local/bin/cosd',
                        '--mkfs',
                        '-i', id_,
                        '-c', '/tmp/cephtest/ceph.conf',
                    ],
                )

        osd_daemons = {}
        log.info('Starting osd daemons...')
        for remote, roles_for_host in osds.remotes.iteritems():
            for id_ in teuthology.roles_of_type(roles_for_host, 'osd'):
                proc = remote.run(
                    args=[
                        '/tmp/cephtest/binary/usr/local/bin/ceph-coverage',
                        coverage_dir,
                        '/tmp/cephtest/daemon-helper',
                        daemon_signal,
                        '/tmp/cephtest/binary/usr/local/bin/cosd',
                        '-f',
                        '-i', id_,
                        '-c', '/tmp/cephtest/ceph.conf',
                    ],
                    logger=log.getChild('osd.{id}'.format(id=id_)),
                    stdin=run.PIPE,
                    wait=False,
                )
                osd_daemons[id_] = proc

        mds_daemons = {}
        log.info('Starting mds daemons...')
        for remote, roles_for_host in mdss.remotes.iteritems():
            for id_ in teuthology.roles_of_type(roles_for_host, 'mds'):
                proc = remote.run(
                    args=[
                        '/tmp/cephtest/binary/usr/local/bin/ceph-coverage',
                        coverage_dir,
                        '/tmp/cephtest/daemon-helper',
                        daemon_signal,
                        '/tmp/cephtest/binary/usr/local/bin/cmds',
                        '-f',
                        '-i', id_,
                        '-c', '/tmp/cephtest/ceph.conf',
                    ],
                    logger=log.getChild('mds.{id}'.format(id=id_)),
                    stdin=run.PIPE,
                    wait=False,
                )
                mds_daemons[id_] = proc

        log.info('Waiting until ceph is healthy...')
        teuthology.wait_until_healthy(
            remote=mon0_remote,
        )
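        # The cluster is up; hand control to the tasks nested inside this
        # one, and tear everything down once they finish.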
        try:
            yield
        finally:
            log.info('Shutting down mds daemons...')
            for id_, proc in mds_daemons.iteritems():
                proc.stdin.close()

            log.info('Shutting down osd daemons...')
            for id_, proc in osd_daemons.iteritems():
                proc.stdin.close()

            log.info('Shutting down mon daemons...')
            for id_, proc in mon_daemons.iteritems():
                proc.stdin.close()

            run.wait(mds_daemons.itervalues())
            run.wait(osd_daemons.itervalues())
            run.wait(mon_daemons.itervalues())

            log.info('Removing uninteresting files...')
            run.wait(
                ctx.cluster.run(
                    args=[
                        'rm',
                        '-rf',
                        '--',
                        '/tmp/cephtest/ceph.conf',
                        '/tmp/cephtest/ceph.keyring',
                        '/tmp/cephtest/temp.keyring',
                        '/tmp/cephtest/data',
                    ],
                    wait=False,
                ),
            )
            if ctx.archive is not None:
                log.info('Compressing logs...')
                run.wait(
                    ctx.cluster.run(
                        args=[
                            'find',
                            '/tmp/cephtest/archive/log',
                            '-name',
                            '*.log',
                            '-print0',
                            run.Raw('|'),
                            'xargs',
                            '-0',
                            '--no-run-if-empty',
                            '--',
                            'bzip2',
                            '-9',
                            '--',
                        ],
                        wait=False,
                    ),
                )
                log.info('Transferring archived files...')
                logdir = os.path.join(ctx.archive, 'remote')
                os.mkdir(logdir)
                for remote in ctx.cluster.remotes.iterkeys():
                    path = os.path.join(logdir, remote.shortname)
                    os.mkdir(path)
                    log.debug('Transferring archived files from %s to %s',
                              remote.shortname, path)
                    proc = remote.run(
                        args=[
                            'tar',
                            'c',
                            '-f', '-',
                            '-C', '/tmp/cephtest/archive',
                            '--',
                            '.',
                        ],
                        stdout=run.PIPE,
                        wait=False,
                    )
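                    # Stream the tar over the ssh connection and unpack it
                    # entry by entry; safepath keeps mangled or hostile
                    # names from escaping the destination directory.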
                    tar = tarfile.open(mode='r|', fileobj=proc.stdout)
                    while True:
                        ti = tar.next()
                        if ti is None:
                            break

                        if ti.isdir():
                            # ignore silently; easier to just create
                            # leading dirs below
                            pass
                        elif ti.isfile():
                            sub = safepath.munge(ti.name)
                            safepath.makedirs(root=path, path=os.path.dirname(sub))
                            tar.makefile(ti, targetpath=os.path.join(path, sub))
                        else:
                            if ti.isdev():
                                type_ = 'device'
                            elif ti.issym():
                                type_ = 'symlink'
                            elif ti.islnk():
                                type_ = 'hard link'
                            else:
                                type_ = 'unknown'
                            log.info('Ignoring tar entry: %r type %r',
                                     ti.name, type_)
                            continue
                    proc.exitstatus.get()
            log.info('Removing archived files...')
            run.wait(
                ctx.cluster.run(
                    args=[
                        'rm',
                        '-rf',
                        '--',
                        '/tmp/cephtest/archive',
                    ],
                    wait=False,
                ),
            )
    log.info('Tidying up after the test...')
    # if this fails, one of the above cleanups is flawed; don't
    # just cram an rm -rf here
    run.wait(
        ctx.cluster.run(
            args=[
                'rmdir',
                '--',
                '/tmp/cephtest',
            ],
            wait=False,
        ),
    )