mirror of
https://github.com/ceph/ceph
synced 2025-01-19 09:32:00 +00:00
0b451f9475
With long-running jobs like thrashing, ssh connections were timing out.
372 lines
12 KiB
Python
372 lines
12 KiB
Python
from cStringIO import StringIO
|
|
import contextlib
|
|
import gevent
|
|
import logging
|
|
import os
|
|
import tarfile
|
|
import time
|
|
import yaml
|
|
|
|
from teuthology import lock
|
|
from teuthology import misc as teuthology
|
|
from teuthology import safepath
|
|
from ..orchestra import run
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
@contextlib.contextmanager
|
|
def base(ctx, config):
|
|
log.info('Creating base directory...')
|
|
run.wait(
|
|
ctx.cluster.run(
|
|
args=[
|
|
'mkdir', '-m0755', '--',
|
|
'/tmp/cephtest',
|
|
],
|
|
wait=False,
|
|
)
|
|
)
|
|
|
|
try:
|
|
yield
|
|
finally:
|
|
log.info('Tidying up after the test...')
|
|
# if this fails, one of the earlier cleanups is flawed; don't
|
|
# just cram an rm -rf here
|
|
run.wait(
|
|
ctx.cluster.run(
|
|
args=[
|
|
'rmdir',
|
|
'--',
|
|
'/tmp/cephtest',
|
|
],
|
|
wait=False,
|
|
),
|
|
)
|
|
|
|
|
|
@contextlib.contextmanager
|
|
def lock_machines(ctx, config):
|
|
log.info('Locking machines...')
|
|
assert isinstance(config, int), 'config must be an integer'
|
|
|
|
while True:
|
|
# make sure there are enough machines up
|
|
machines = lock.list_locks(ctx)
|
|
if machines is None:
|
|
if ctx.block:
|
|
log.warn('error listing machines, trying again')
|
|
time.sleep(20)
|
|
continue
|
|
else:
|
|
assert 0, 'error listing machines'
|
|
num_up = len(filter(lambda machine: machine['up'], machines))
|
|
assert num_up >= config, 'not enough machines are up'
|
|
|
|
# make sure there are machines for non-automated jobs to run
|
|
num_free = len(filter(
|
|
lambda machine: machine['up'] and machine['locked'] == 0,
|
|
machines
|
|
))
|
|
if num_free < 6 and ctx.owner.startswith('scheduled'):
|
|
if ctx.block:
|
|
log.info('waiting for more machines to be free...')
|
|
time.sleep(10)
|
|
continue
|
|
else:
|
|
assert 0, 'not enough machines free'
|
|
|
|
newly_locked = lock.lock_many(ctx, config, ctx.owner)
|
|
if len(newly_locked) == config:
|
|
ctx.config['targets'] = newly_locked
|
|
log.info('\n '.join(['Locked targets:', ] + yaml.safe_dump(ctx.config['targets'], default_flow_style=False).splitlines()))
|
|
break
|
|
elif not ctx.block:
|
|
assert 0, 'not enough machines are available'
|
|
|
|
log.warn('Could not lock enough machines, waiting...')
|
|
time.sleep(10)
|
|
try:
|
|
yield
|
|
finally:
|
|
if ctx.summary.get('success', False):
|
|
log.info('Unlocking machines...')
|
|
for machine in ctx.config['targets'].iterkeys():
|
|
lock.unlock(ctx, machine, ctx.owner)
|
|
|
|
def check_lock(ctx, config):
|
|
log.info('Checking locks...')
|
|
for machine in ctx.config['targets'].iterkeys():
|
|
status = lock.get_status(ctx, machine)
|
|
log.debug('machine status is %s', repr(status))
|
|
assert status is not None, \
|
|
'could not read lock status for {name}'.format(name=machine)
|
|
assert status['up'], 'machine {name} is marked down'.format(name=machine)
|
|
assert status['locked'], \
|
|
'machine {name} is not locked'.format(name=machine)
|
|
assert status['locked_by'] == ctx.owner, \
|
|
'machine {name} is locked by {user}, not {owner}'.format(
|
|
name=machine,
|
|
user=status['locked_by'],
|
|
owner=ctx.owner,
|
|
)
|
|
|
|
def connect(ctx, config):
|
|
log.info('Opening connections...')
|
|
from ..orchestra import connection, remote
|
|
import orchestra.cluster
|
|
remotes = []
|
|
for t, key in ctx.config['targets'].iteritems():
|
|
log.debug('connecting to %s', t)
|
|
remotes.append(
|
|
remote.Remote(name=t,
|
|
ssh=connection.connect(user_at_host=t,
|
|
host_key=key,
|
|
keep_alive=True)))
|
|
ctx.cluster = orchestra.cluster.Cluster()
|
|
if 'roles' in ctx.config:
|
|
for rem, roles in zip(remotes, ctx.config['roles']):
|
|
assert all(isinstance(role, str) for role in roles), \
|
|
"Roles in config must be strings: %r" % roles
|
|
ctx.cluster.add(rem, roles)
|
|
else:
|
|
for rem in remotes:
|
|
ctx.cluster.add(rem, rem.name)
|
|
|
|
def check_conflict(ctx, config):
|
|
log.info('Checking for old test directory...')
|
|
processes = ctx.cluster.run(
|
|
args=[
|
|
'test', '!', '-e', '/tmp/cephtest',
|
|
],
|
|
wait=False,
|
|
)
|
|
failed = False
|
|
for proc in processes:
|
|
assert isinstance(proc.exitstatus, gevent.event.AsyncResult)
|
|
try:
|
|
proc.exitstatus.get()
|
|
except run.CommandFailedError:
|
|
log.error('Host %s has stale cephtest directory, check your lock and reboot to clean up.', proc.remote.shortname)
|
|
failed = True
|
|
if failed:
|
|
raise RuntimeError('Stale jobs detected, aborting.')
|
|
|
|
@contextlib.contextmanager
|
|
def archive(ctx, config):
|
|
log.info('Creating archive directory...')
|
|
run.wait(
|
|
ctx.cluster.run(
|
|
args=[
|
|
'install', '-d', '-m0755', '--',
|
|
'/tmp/cephtest/archive',
|
|
],
|
|
wait=False,
|
|
)
|
|
)
|
|
|
|
try:
|
|
yield
|
|
finally:
|
|
if ctx.archive is not None:
|
|
|
|
log.info('Transferring archived files...')
|
|
logdir = os.path.join(ctx.archive, 'remote')
|
|
os.mkdir(logdir)
|
|
for remote in ctx.cluster.remotes.iterkeys():
|
|
path = os.path.join(logdir, remote.shortname)
|
|
os.mkdir(path)
|
|
log.debug('Transferring archived files from %s to %s', remote.shortname, path)
|
|
proc = remote.run(
|
|
args=[
|
|
'tar',
|
|
'c',
|
|
'-f', '-',
|
|
'-C', '/tmp/cephtest/archive',
|
|
'--',
|
|
'.',
|
|
],
|
|
stdout=run.PIPE,
|
|
wait=False,
|
|
)
|
|
tar = tarfile.open(mode='r|', fileobj=proc.stdout)
|
|
while True:
|
|
ti = tar.next()
|
|
if ti is None:
|
|
break
|
|
|
|
if ti.isdir():
|
|
# ignore silently; easier to just create leading dirs below
|
|
pass
|
|
elif ti.isfile():
|
|
sub = safepath.munge(ti.name)
|
|
safepath.makedirs(root=path, path=os.path.dirname(sub))
|
|
tar.makefile(ti, targetpath=os.path.join(path, sub))
|
|
else:
|
|
if ti.isdev():
|
|
type_ = 'device'
|
|
elif ti.issym():
|
|
type_ = 'symlink'
|
|
elif ti.islnk():
|
|
type_ = 'hard link'
|
|
else:
|
|
type_ = 'unknown'
|
|
log.info('Ignoring tar entry: %r type %r', ti.name, type_)
|
|
continue
|
|
proc.exitstatus.get()
|
|
|
|
log.info('Removing archive directory...')
|
|
run.wait(
|
|
ctx.cluster.run(
|
|
args=[
|
|
'rm',
|
|
'-rf',
|
|
'--',
|
|
'/tmp/cephtest/archive',
|
|
],
|
|
wait=False,
|
|
),
|
|
)
|
|
|
|
@contextlib.contextmanager
|
|
def coredump(ctx, config):
|
|
log.info('Enabling coredump saving...')
|
|
run.wait(
|
|
ctx.cluster.run(
|
|
args=[
|
|
'install', '-d', '-m0755', '--',
|
|
'/tmp/cephtest/archive/coredump',
|
|
run.Raw('&&'),
|
|
'sudo', 'sysctl', '-w', 'kernel.core_pattern=/tmp/cephtest/archive/coredump/%t.%p.core',
|
|
],
|
|
wait=False,
|
|
)
|
|
)
|
|
|
|
try:
|
|
yield
|
|
finally:
|
|
run.wait(
|
|
ctx.cluster.run(
|
|
args=[
|
|
'sudo', 'sysctl', '-w', 'kernel.core_pattern=core',
|
|
run.Raw('&&'),
|
|
# don't litter the archive dir if there were no cores dumped
|
|
'rmdir',
|
|
'--ignore-fail-on-non-empty',
|
|
'--',
|
|
'/tmp/cephtest/archive/coredump',
|
|
],
|
|
wait=False,
|
|
)
|
|
)
|
|
|
|
# set success=false if the dir is still there = coredumps were
|
|
# seen
|
|
for remote in ctx.cluster.remotes.iterkeys():
|
|
r = remote.run(
|
|
args=[
|
|
'if', 'test', '!', '-e', '/tmp/cephtest/archive/coredump', run.Raw(';'), 'then',
|
|
'echo', 'OK', run.Raw(';'),
|
|
'fi',
|
|
],
|
|
stdout=StringIO(),
|
|
)
|
|
if r.stdout.getvalue() != 'OK\n':
|
|
log.warning('Found coredumps on %s, flagging run as failed', remote)
|
|
ctx.summary['success'] = False
|
|
if 'failure_reason' not in ctx.summary:
|
|
ctx.summary['failure_reason'] = \
|
|
'Found coredumps on {remote}'.format(remote=remote)
|
|
|
|
@contextlib.contextmanager
|
|
def syslog(ctx, config):
|
|
if ctx.archive is None:
|
|
# disable this whole feature if we're not going to archive the data anyway
|
|
yield
|
|
return
|
|
|
|
log.info('Starting syslog monitoring...')
|
|
|
|
run.wait(
|
|
ctx.cluster.run(
|
|
args=[
|
|
'mkdir', '-m0755', '--',
|
|
'/tmp/cephtest/archive/syslog',
|
|
],
|
|
wait=False,
|
|
)
|
|
)
|
|
|
|
CONF = '/etc/rsyslog.d/80-cephtest.conf'
|
|
conf_fp = StringIO("""
|
|
kern.* -/tmp/cephtest/archive/syslog/kern.log;RSYSLOG_FileFormat
|
|
*.*;kern.none -/tmp/cephtest/archive/syslog/misc.log;RSYSLOG_FileFormat
|
|
""")
|
|
try:
|
|
for rem in ctx.cluster.remotes.iterkeys():
|
|
teuthology.sudo_write_file(
|
|
remote=rem,
|
|
path=CONF,
|
|
data=conf_fp,
|
|
)
|
|
conf_fp.seek(0)
|
|
run.wait(
|
|
ctx.cluster.run(
|
|
args=[
|
|
'sudo',
|
|
'initctl',
|
|
# a mere reload (SIGHUP) doesn't seem to make
|
|
# rsyslog open the files
|
|
'restart',
|
|
'rsyslog',
|
|
],
|
|
wait=False,
|
|
),
|
|
)
|
|
|
|
yield
|
|
finally:
|
|
log.info('Shutting down syslog monitoring...')
|
|
|
|
run.wait(
|
|
ctx.cluster.run(
|
|
args=[
|
|
'sudo',
|
|
'rm',
|
|
'-f',
|
|
'--',
|
|
CONF,
|
|
run.Raw('&&'),
|
|
'sudo',
|
|
'initctl',
|
|
'restart',
|
|
'rsyslog',
|
|
],
|
|
wait=False,
|
|
),
|
|
)
|
|
# race condition: nothing actually says rsyslog had time to
|
|
# flush the file fully. oh well.
|
|
|
|
log.info('Compressing syslogs...')
|
|
run.wait(
|
|
ctx.cluster.run(
|
|
args=[
|
|
'find',
|
|
'/tmp/cephtest/archive/syslog',
|
|
'-name',
|
|
'*.log',
|
|
'-print0',
|
|
run.Raw('|'),
|
|
'xargs',
|
|
'-0',
|
|
'--no-run-if-empty',
|
|
'--',
|
|
'gzip',
|
|
'--',
|
|
],
|
|
wait=False,
|
|
),
|
|
)
|