ceph/teuthology/run.py
Zack Cerza 972ab1cf22 Tweak logic of try_push_job_info()
Signed-off-by: Zack Cerza <zack.cerza@inktank.com>
2013-10-04 15:01:16 -05:00

416 lines
12 KiB
Python

import argparse
import os
import yaml
import StringIO
import contextlib
import sys
from traceback import format_tb
def config_file(string):
config = {}
try:
with file(string) as f:
g = yaml.safe_load_all(f)
for new in g:
config.update(new)
except IOError, e:
raise argparse.ArgumentTypeError(str(e))
return config
class MergeConfig(argparse.Action):
def __call__(self, parser, namespace, values, option_string=None):
config = getattr(namespace, self.dest)
from teuthology.misc import deep_merge
for new in values:
deep_merge(config, new)
def parse_args():
parser = argparse.ArgumentParser(description='Run ceph integration tests')
parser.add_argument(
'-v', '--verbose',
action='store_true', default=None,
help='be more verbose',
)
parser.add_argument(
'config',
metavar='CONFFILE',
nargs='+',
type=config_file,
action=MergeConfig,
default={},
help='config file to read',
)
parser.add_argument(
'-a', '--archive',
metavar='DIR',
help='path to archive results in',
)
parser.add_argument(
'--description',
help='job description',
)
parser.add_argument(
'--owner',
help='job owner',
)
parser.add_argument(
'--lock',
action='store_true',
default=False,
help='lock machines for the duration of the run',
)
parser.add_argument(
'--machine-type',
default=None,
help='Type of machine to lock/run tests on.',
)
parser.add_argument(
'--os-type',
default='ubuntu',
help='Distro/OS of machine to run test on.',
)
parser.add_argument(
'--block',
action='store_true',
default=False,
help='block until locking machines succeeds (use with --lock)',
)
parser.add_argument(
'--name',
metavar='NAME',
help='name for this teuthology run',
)
args = parser.parse_args()
return args
def set_up_logging(ctx):
import logging
loglevel = logging.INFO
if ctx.verbose:
loglevel = logging.DEBUG
logging.basicConfig(level=loglevel)
if ctx.archive is not None:
os.mkdir(ctx.archive)
handler = logging.FileHandler(
filename=os.path.join(ctx.archive, 'teuthology.log'),
)
formatter = logging.Formatter(
fmt='%(asctime)s.%(msecs)03d %(levelname)s:%(name)s:%(message)s',
datefmt='%Y-%m-%dT%H:%M:%S',
)
handler.setFormatter(formatter)
logging.getLogger().addHandler(handler)
install_except_hook()
def install_except_hook():
def log_exception(exception_class, exception, traceback):
import logging
logging.critical(''.join(format_tb(traceback)))
if not exception.message:
logging.critical(exception_class.__name__)
return
logging.critical('{0}: {1}'.format(exception_class.__name__, exception))
sys.excepthook = log_exception
def write_initial_metadata(ctx):
if ctx.archive is not None:
with file(os.path.join(ctx.archive, 'pid'), 'w') as f:
f.write('%d' % os.getpid())
with file(os.path.join(ctx.archive, 'owner'), 'w') as f:
f.write(ctx.owner + '\n')
with file(os.path.join(ctx.archive, 'orig.config.yaml'), 'w') as f:
yaml.safe_dump(ctx.config, f, default_flow_style=False)
info = {
'name': ctx.name,
'description': ctx.description,
'owner': ctx.owner,
'pid': os.getpid(),
}
if 'job_id' in ctx.config:
info['job_id'] = ctx.config['job_id']
with file(os.path.join(ctx.archive, 'info.yaml'), 'w') as f:
yaml.safe_dump(info, f, default_flow_style=False)
def main():
from gevent import monkey
monkey.patch_all(dns=False)
from .orchestra import monkey
monkey.patch_all()
# WARNING: Do not import any modules that import logging before this next
# block. That would cause connections to hang because the monkey patching
# hadn't been done.
import logging
ctx = parse_args()
set_up_logging(ctx)
log = logging.getLogger(__name__)
# Now it is safe to import other teuthology modules.
from . import report
if ctx.owner is None:
from teuthology.misc import get_user
ctx.owner = get_user()
# Older versions of teuthology stored job_id as an int. Convert it to a str
# if necessary.
job_id = ctx.config.get('job_id')
if job_id is not None:
job_id = str(job_id)
ctx.config['job_id'] = job_id
write_initial_metadata(ctx)
report.try_push_job_info(ctx.config)
if 'targets' in ctx.config and 'roles' in ctx.config:
targets = len(ctx.config['targets'])
roles = len(ctx.config['roles'])
assert targets >= roles, \
'%d targets are needed for all roles but found %d listed.' % (roles, targets)
machine_type = ctx.machine_type
if machine_type is None:
fallback_default = ctx.config.get('machine_type', 'plana')
machine_type = ctx.config.get('machine-type', fallback_default)
if ctx.block:
assert ctx.lock, \
'the --block option is only supported with the --lock option'
from teuthology.misc import read_config
read_config(ctx)
log.debug('\n '.join(['Config:', ] + yaml.safe_dump(ctx.config, default_flow_style=False).splitlines()))
ctx.summary = dict(success=True)
ctx.summary['owner'] = ctx.owner
if ctx.description is not None:
ctx.summary['description'] = ctx.description
for task in ctx.config['tasks']:
assert 'kernel' not in task, \
'kernel installation shouldn be a base-level item, not part of the tasks list'
init_tasks = []
if ctx.lock:
assert 'targets' not in ctx.config, \
'You cannot specify targets in a config file when using the --lock option'
init_tasks.append({'internal.lock_machines': (len(ctx.config['roles']), machine_type)})
init_tasks.extend([
{'internal.save_config': None},
{'internal.check_lock': None},
{'internal.connect': None},
{'internal.check_conflict': None},
{'internal.check_ceph_data': None},
{'internal.vm_setup': None},
])
if 'kernel' in ctx.config:
from teuthology.misc import get_distro
distro = get_distro(ctx)
if distro == 'ubuntu':
init_tasks.append({'kernel': ctx.config['kernel']})
init_tasks.extend([
{'internal.base': None},
{'internal.archive': None},
{'internal.coredump': None},
{'internal.sudo': None},
{'internal.syslog': None},
{'internal.timer': None},
])
ctx.config['tasks'][:0] = init_tasks
from teuthology.run_tasks import run_tasks
try:
run_tasks(tasks=ctx.config['tasks'], ctx=ctx)
finally:
if not ctx.summary.get('success') and ctx.config.get('nuke-on-error'):
from teuthology.nuke import nuke
# only unlock if we locked them in the first place
nuke(ctx, log, ctx.lock)
if ctx.archive is not None:
with file(os.path.join(ctx.archive, 'summary.yaml'), 'w') as f:
yaml.safe_dump(ctx.summary, f, default_flow_style=False)
with contextlib.closing(StringIO.StringIO()) as f:
yaml.safe_dump(ctx.summary, f)
log.info('Summary data:\n%s' % f.getvalue())
with contextlib.closing(StringIO.StringIO()) as f:
if 'email-on-error' in ctx.config and not ctx.summary.get('success', False):
yaml.safe_dump(ctx.summary, f)
yaml.safe_dump(ctx.config, f)
emsg = f.getvalue()
subject = "Teuthology error -- %s" % ctx.summary['failure_reason']
from teuthology.suite import email_results
email_results(subject,"Teuthology",ctx.config['email-on-error'],emsg)
report.try_push_job_info(ctx.config, ctx.summary)
if ctx.summary.get('success', True):
log.info('pass')
else:
log.info('FAIL')
import sys
sys.exit(1)
def schedule():
parser = argparse.ArgumentParser(description='Schedule ceph integration tests')
parser.add_argument(
'config',
metavar='CONFFILE',
nargs='*',
type=config_file,
action=MergeConfig,
default={},
help='config file to read',
)
parser.add_argument(
'--name',
help='name of suite run the job is part of',
)
parser.add_argument(
'--last-in-suite',
action='store_true',
default=False,
help='mark the last job in a suite so suite post-processing can be run',
)
parser.add_argument(
'--email',
help='where to send the results of a suite (only applies to the last job in a suite)',
)
parser.add_argument(
'--timeout',
help='how many seconds to wait for jobs to finish before emailing results (only applies to the last job in a suite',
type=int,
)
parser.add_argument(
'--description',
help='job description',
)
parser.add_argument(
'--owner',
help='job owner',
)
parser.add_argument(
'--delete',
metavar='JOBID',
type=int,
nargs='*',
help='list of jobs to remove from the queue',
)
parser.add_argument(
'-n', '--num',
default=1,
type=int,
help='number of times to run/queue the job'
)
parser.add_argument(
'-p', '--priority',
default=1000,
type=int,
help='beanstalk priority (lower is sooner)'
)
parser.add_argument(
'-v', '--verbose',
action='store_true',
default=False,
help='be more verbose',
)
parser.add_argument(
'-w', '--worker',
default='plana',
help='which worker to use (type of machine)',
)
parser.add_argument(
'-s', '--show',
metavar='JOBID',
type=int,
nargs='*',
help='output the contents of specified jobs in the queue',
)
ctx = parser.parse_args()
if not ctx.last_in_suite:
assert not ctx.email, '--email is only applicable to the last job in a suite'
assert not ctx.timeout, '--timeout is only applicable to the last job in a suite'
from teuthology.misc import read_config, get_user
if ctx.owner is None:
ctx.owner = 'scheduled_{user}'.format(user=get_user())
read_config(ctx)
import teuthology.queue
beanstalk = teuthology.queue.connect(ctx)
tube=ctx.worker
beanstalk.use(tube)
if ctx.show:
for job_id in ctx.show:
job = beanstalk.peek(job_id)
if job is None and ctx.verbose:
print 'job {jid} is not in the queue'.format(jid=job_id)
else:
print '--- job {jid} priority {prio} ---\n'.format(
jid=job_id,
prio=job.stats()['pri']), job.body
return
if ctx.delete:
for job_id in ctx.delete:
job = beanstalk.peek(job_id)
if job is None:
print 'job {jid} is not in the queue'.format(jid=job_id)
else:
job.delete()
return
# strip out targets; the worker will allocate new ones when we run
# the job with --lock.
if ctx.config.get('targets'):
del ctx.config['targets']
job_config = dict(
name=ctx.name,
last_in_suite=ctx.last_in_suite,
email=ctx.email,
description=ctx.description,
owner=ctx.owner,
verbose=ctx.verbose,
)
# Merge job_config and ctx.config
job_config.update(ctx.config)
if ctx.timeout is not None:
job_config['results_timeout'] = ctx.timeout
job = yaml.safe_dump(job_config)
num = ctx.num
while num > 0:
jid = beanstalk.put(
job,
ttr=60*60*24,
priority=ctx.priority,
)
print 'Job scheduled with name {name} and ID {jid}'.format(
name=ctx.name, jid=jid)
num -= 1