2013-08-23 20:43:18 +00:00
|
|
|
import fcntl
|
2011-07-08 18:37:20 +00:00
|
|
|
import logging
|
|
|
|
import os
|
|
|
|
import subprocess
|
|
|
|
import sys
|
|
|
|
import tempfile
|
2013-08-27 22:58:14 +00:00
|
|
|
import time
|
2011-07-08 18:37:20 +00:00
|
|
|
import yaml
|
|
|
|
|
2013-12-09 20:56:49 +00:00
|
|
|
from datetime import datetime
|
|
|
|
|
2014-05-07 16:02:52 +00:00
|
|
|
from teuthology import setup_log_file
|
2014-04-15 00:28:54 +00:00
|
|
|
from . import beanstalk
|
2013-12-05 23:37:25 +00:00
|
|
|
from . import report
|
2013-09-20 20:12:02 +00:00
|
|
|
from . import safepath
|
2013-10-11 16:38:35 +00:00
|
|
|
from .config import config as teuth_config
|
2014-01-16 16:38:39 +00:00
|
|
|
from .kill import kill_job
|
2013-10-11 16:38:35 +00:00
|
|
|
from .misc import read_config
|
2014-06-30 16:10:31 +00:00
|
|
|
from .repo_utils import enforce_repo_state, BranchNotFoundError
|
2011-07-08 18:37:20 +00:00
|
|
|
|
|
|
|
log = logging.getLogger(__name__)
|
2013-12-09 20:56:49 +00:00
|
|
|
start_time = datetime.utcnow()
|
|
|
|
restart_file_path = '/tmp/teuthology-restart-workers'
|
2011-07-08 18:37:20 +00:00
|
|
|
|
2013-10-09 18:11:15 +00:00
|
|
|
|
2013-12-09 20:56:49 +00:00
|
|
|
def need_restart():
|
|
|
|
if not os.path.exists(restart_file_path):
|
|
|
|
return False
|
2013-12-09 22:57:11 +00:00
|
|
|
file_mtime = datetime.utcfromtimestamp(os.path.getmtime(restart_file_path))
|
|
|
|
if file_mtime > start_time:
|
2013-12-09 20:56:49 +00:00
|
|
|
return True
|
|
|
|
else:
|
|
|
|
return False
|
2013-10-09 18:11:15 +00:00
|
|
|
|
|
|
|
|
2013-12-09 20:56:49 +00:00
|
|
|
def restart():
|
|
|
|
log.info('Restarting...')
|
|
|
|
args = sys.argv[:]
|
|
|
|
args.insert(0, sys.executable)
|
|
|
|
os.execv(sys.executable, args)
|
|
|
|
|
|
|
|
|
2014-05-21 16:34:11 +00:00
|
|
|
def install_except_hook():
|
|
|
|
"""
|
|
|
|
Install an exception hook that first logs any uncaught exception, then
|
|
|
|
raises it.
|
|
|
|
"""
|
2014-07-10 22:17:25 +00:00
|
|
|
def log_exception(exc_type, exc_value, exc_traceback):
|
|
|
|
if not issubclass(exc_type, KeyboardInterrupt):
|
|
|
|
log.critical("Uncaught exception", exc_info=(exc_type, exc_value,
|
|
|
|
exc_traceback))
|
|
|
|
sys.__excepthook__(exc_type, exc_value, exc_traceback)
|
2014-05-21 16:34:11 +00:00
|
|
|
sys.excepthook = log_exception
|
|
|
|
|
|
|
|
|
2013-12-09 20:56:49 +00:00
|
|
|
class filelock(object):
|
|
|
|
# simple flock class
|
2013-08-23 20:43:18 +00:00
|
|
|
def __init__(self, fn):
|
|
|
|
self.fn = fn
|
|
|
|
self.fd = None
|
|
|
|
|
|
|
|
def acquire(self):
|
|
|
|
assert not self.fd
|
|
|
|
self.fd = file(self.fn, 'w')
|
|
|
|
fcntl.lockf(self.fd, fcntl.LOCK_EX)
|
|
|
|
|
|
|
|
def release(self):
|
|
|
|
assert self.fd
|
|
|
|
fcntl.lockf(self.fd, fcntl.LOCK_UN)
|
|
|
|
self.fd = None
|
|
|
|
|
2013-08-22 19:47:18 +00:00
|
|
|
|
2014-06-25 19:23:54 +00:00
|
|
|
def fetch_teuthology_branch(dest_path, branch='master'):
|
2013-08-23 15:08:01 +00:00
|
|
|
"""
|
|
|
|
Make sure we have the correct teuthology branch checked out and up-to-date
|
|
|
|
"""
|
2013-08-23 20:43:18 +00:00
|
|
|
# only let one worker create/update the checkout at a time
|
2014-06-25 19:23:54 +00:00
|
|
|
lock = filelock('%s.lock' % dest_path)
|
2013-08-23 20:43:18 +00:00
|
|
|
lock.acquire()
|
2013-08-23 15:08:01 +00:00
|
|
|
try:
|
2014-06-25 20:20:18 +00:00
|
|
|
teuthology_git_upstream = teuth_config.ceph_git_base_url + \
|
|
|
|
'teuthology.git'
|
2014-06-30 16:10:31 +00:00
|
|
|
enforce_repo_state(teuthology_git_upstream, dest_path, branch)
|
2013-08-23 20:43:18 +00:00
|
|
|
|
2014-06-25 19:23:54 +00:00
|
|
|
log.debug("Bootstrapping %s", dest_path)
|
2013-08-23 20:43:18 +00:00
|
|
|
# This magic makes the bootstrap script not attempt to clobber an
|
|
|
|
# existing virtualenv. But the branch's bootstrap needs to actually
|
|
|
|
# check for the NO_CLOBBER variable.
|
|
|
|
env = os.environ.copy()
|
|
|
|
env['NO_CLOBBER'] = '1'
|
2014-01-03 17:55:13 +00:00
|
|
|
cmd = './bootstrap'
|
2014-06-25 19:23:54 +00:00
|
|
|
boot_proc = subprocess.Popen(cmd, shell=True, cwd=dest_path, env=env,
|
2014-01-03 17:55:13 +00:00
|
|
|
stdout=subprocess.PIPE,
|
|
|
|
stderr=subprocess.STDOUT)
|
2014-02-26 23:15:37 +00:00
|
|
|
returncode = boot_proc.wait()
|
|
|
|
if returncode != 0:
|
2014-01-03 17:55:13 +00:00
|
|
|
for line in boot_proc.stdout.readlines():
|
2014-02-26 23:15:37 +00:00
|
|
|
log.warn(line.strip())
|
|
|
|
log.info("Bootstrap exited with status %s", returncode)
|
2013-08-23 15:08:01 +00:00
|
|
|
|
2013-08-23 20:43:18 +00:00
|
|
|
finally:
|
|
|
|
lock.release()
|
2013-08-23 15:08:01 +00:00
|
|
|
|
2013-10-09 18:11:15 +00:00
|
|
|
|
2014-04-15 00:39:13 +00:00
|
|
|
def main(ctx):
|
2011-07-08 18:37:20 +00:00
|
|
|
loglevel = logging.INFO
|
|
|
|
if ctx.verbose:
|
|
|
|
loglevel = logging.DEBUG
|
2013-11-06 22:02:12 +00:00
|
|
|
log.setLevel(loglevel)
|
|
|
|
|
|
|
|
log_file_path = os.path.join(ctx.log_dir, 'worker.{tube}.{pid}'.format(
|
|
|
|
pid=os.getpid(), tube=ctx.tube,))
|
2014-05-07 16:02:52 +00:00
|
|
|
setup_log_file(log, log_file_path)
|
2011-07-08 18:37:20 +00:00
|
|
|
|
2014-05-21 16:34:11 +00:00
|
|
|
install_except_hook()
|
|
|
|
|
2011-07-08 18:37:20 +00:00
|
|
|
if not os.path.isdir(ctx.archive_dir):
|
|
|
|
sys.exit("{prog}: archive directory must exist: {path}".format(
|
2013-10-09 18:11:15 +00:00
|
|
|
prog=os.path.basename(sys.argv[0]),
|
|
|
|
path=ctx.archive_dir,
|
|
|
|
))
|
2014-01-16 16:38:39 +00:00
|
|
|
else:
|
|
|
|
teuth_config.archive_base = ctx.archive_dir
|
2011-07-08 18:37:20 +00:00
|
|
|
|
|
|
|
read_config(ctx)
|
|
|
|
|
2014-04-15 00:28:54 +00:00
|
|
|
connection = beanstalk.connect()
|
2014-04-17 21:04:39 +00:00
|
|
|
beanstalk.watch_tube(connection, ctx.tube)
|
2014-06-02 14:18:31 +00:00
|
|
|
result_proc = None
|
2011-07-08 18:37:20 +00:00
|
|
|
|
|
|
|
while True:
|
2014-06-02 14:18:31 +00:00
|
|
|
# Check to see if we have a teuthology-results process hanging around
|
|
|
|
# and if so, read its return code so that it can exit.
|
|
|
|
if result_proc is not None and result_proc.poll() is not None:
|
|
|
|
log.debug("teuthology-results exited with code: %s",
|
|
|
|
result_proc.returncode)
|
|
|
|
result_proc = None
|
|
|
|
|
2013-12-09 20:56:49 +00:00
|
|
|
if need_restart():
|
|
|
|
restart()
|
|
|
|
|
2014-04-15 00:28:54 +00:00
|
|
|
job = connection.reserve(timeout=60)
|
2011-07-08 18:37:20 +00:00
|
|
|
if job is None:
|
|
|
|
continue
|
|
|
|
|
|
|
|
# bury the job so it won't be re-run if it fails
|
|
|
|
job.bury()
|
2014-02-26 23:15:37 +00:00
|
|
|
log.info('Reserved job %d', job.jid)
|
|
|
|
log.info('Config is: %s', job.body)
|
2011-08-26 00:09:03 +00:00
|
|
|
job_config = yaml.safe_load(job.body)
|
2013-09-11 14:59:45 +00:00
|
|
|
|
2013-10-04 17:35:03 +00:00
|
|
|
job_config['job_id'] = str(job.jid)
|
2011-08-26 00:09:03 +00:00
|
|
|
safe_archive = safepath.munge(job_config['name'])
|
2014-01-24 16:19:43 +00:00
|
|
|
job_config['worker_log'] = log_file_path
|
2013-10-09 18:11:15 +00:00
|
|
|
archive_path_full = os.path.join(
|
|
|
|
ctx.archive_dir, safe_archive, str(job.jid))
|
2013-09-11 14:59:45 +00:00
|
|
|
job_config['archive_path'] = archive_path_full
|
|
|
|
|
2013-09-11 20:40:14 +00:00
|
|
|
# If the teuthology branch was not specified, default to master and
|
|
|
|
# store that value.
|
|
|
|
teuthology_branch = job_config.get('teuthology_branch', 'master')
|
|
|
|
job_config['teuthology_branch'] = teuthology_branch
|
2011-07-08 18:37:20 +00:00
|
|
|
|
2014-07-03 16:12:41 +00:00
|
|
|
teuth_path = os.path.join(os.getenv("HOME"), 'src',
|
|
|
|
'teuthology_' + teuthology_branch)
|
2013-08-23 14:59:48 +00:00
|
|
|
|
2014-05-22 17:26:50 +00:00
|
|
|
try:
|
2014-06-25 19:23:54 +00:00
|
|
|
fetch_teuthology_branch(dest_path=teuth_path,
|
|
|
|
branch=teuthology_branch)
|
2014-05-22 17:26:50 +00:00
|
|
|
except BranchNotFoundError:
|
|
|
|
log.exception(
|
|
|
|
"Branch not found; throwing job away")
|
|
|
|
# Optionally, we could mark the job as dead, but we don't have a
|
|
|
|
# great way to express why it is dead.
|
|
|
|
report.try_delete_jobs(job_config['name'],
|
|
|
|
job_config['job_id'])
|
|
|
|
continue
|
2013-08-23 05:14:41 +00:00
|
|
|
|
2013-08-22 19:47:18 +00:00
|
|
|
teuth_bin_path = os.path.join(teuth_path, 'virtualenv', 'bin')
|
|
|
|
if not os.path.isdir(teuth_bin_path):
|
2013-08-23 14:59:48 +00:00
|
|
|
raise RuntimeError("teuthology branch %s at %s not bootstrapped!" %
|
2013-08-22 19:47:18 +00:00
|
|
|
(teuthology_branch, teuth_bin_path))
|
|
|
|
|
2013-07-19 13:01:28 +00:00
|
|
|
if job_config.get('last_in_suite'):
|
2014-04-25 20:22:30 +00:00
|
|
|
if teuth_config.results_server:
|
|
|
|
report.try_delete_jobs(job_config['name'],
|
|
|
|
job_config['job_id'])
|
2014-05-28 19:56:59 +00:00
|
|
|
log.info('Generating results email for %s', job_config['name'])
|
2011-08-29 19:42:45 +00:00
|
|
|
args = [
|
2013-08-22 19:47:18 +00:00
|
|
|
os.path.join(teuth_bin_path, 'teuthology-results'),
|
2011-08-29 19:42:45 +00:00
|
|
|
'--timeout',
|
2014-07-07 18:20:39 +00:00
|
|
|
str(job_config.get('results_timeout', 32400)),
|
2011-08-29 19:42:45 +00:00
|
|
|
'--email',
|
|
|
|
job_config['email'],
|
|
|
|
'--archive-dir',
|
|
|
|
os.path.join(ctx.archive_dir, safe_archive),
|
|
|
|
'--name',
|
|
|
|
job_config['name'],
|
2013-08-22 19:47:18 +00:00
|
|
|
]
|
2014-05-30 14:56:27 +00:00
|
|
|
# Execute teuthology-results, passing 'preexec_fn=os.setpgrp' to
|
|
|
|
# make sure that it will continue to run if this worker process
|
|
|
|
# dies (e.g. because of a restart)
|
2014-06-02 14:18:31 +00:00
|
|
|
result_proc = subprocess.Popen(args=args, preexec_fn=os.setpgrp)
|
|
|
|
log.info("teuthology-results PID: %s", result_proc.pid)
|
2011-08-26 00:11:33 +00:00
|
|
|
else:
|
2014-02-26 23:15:37 +00:00
|
|
|
log.info('Creating archive dir %s', archive_path_full)
|
2011-08-26 00:11:33 +00:00
|
|
|
safepath.makedirs(ctx.archive_dir, safe_archive)
|
|
|
|
log.info('Running job %d', job.jid)
|
2013-09-11 14:59:45 +00:00
|
|
|
run_job(job_config, teuth_bin_path)
|
2011-10-04 19:32:58 +00:00
|
|
|
job.delete()
|
2011-07-08 18:37:20 +00:00
|
|
|
|
2013-08-22 19:47:18 +00:00
|
|
|
|
2013-12-16 17:43:06 +00:00
|
|
|
def run_with_watchdog(process, job_config):
|
2014-01-16 16:38:39 +00:00
|
|
|
job_start_time = datetime.utcnow()
|
|
|
|
|
2013-12-05 23:37:25 +00:00
|
|
|
# Only push the information that's relevant to the watchdog, to save db
|
|
|
|
# load
|
|
|
|
job_info = dict(
|
|
|
|
name=job_config['name'],
|
|
|
|
job_id=job_config['job_id'],
|
|
|
|
)
|
|
|
|
|
2013-12-31 20:25:05 +00:00
|
|
|
# Sleep once outside of the loop to avoid double-posting jobs
|
|
|
|
time.sleep(teuth_config.watchdog_interval)
|
2014-02-26 22:22:32 +00:00
|
|
|
symlink_worker_log(job_config['worker_log'], job_config['archive_path'])
|
2013-12-05 23:37:25 +00:00
|
|
|
while process.poll() is None:
|
2014-01-16 16:38:39 +00:00
|
|
|
# Kill jobs that have been running longer than the global max
|
2014-03-17 16:19:02 +00:00
|
|
|
run_time = datetime.utcnow() - job_start_time
|
|
|
|
total_seconds = run_time.days * 60 * 60 * 24 + run_time.seconds
|
|
|
|
if total_seconds > teuth_config.max_job_time:
|
2014-01-16 16:53:53 +00:00
|
|
|
log.warning("Job ran longer than {max}s. Killing...".format(
|
|
|
|
max=teuth_config.max_job_time))
|
2014-01-16 16:38:39 +00:00
|
|
|
kill_job(job_info['name'], job_info['job_id'],
|
|
|
|
teuth_config.archive_base)
|
|
|
|
|
2013-12-05 23:37:25 +00:00
|
|
|
report.try_push_job_info(job_info, dict(status='running'))
|
|
|
|
time.sleep(teuth_config.watchdog_interval)
|
|
|
|
|
2013-12-12 23:33:53 +00:00
|
|
|
# The job finished. Let's make sure paddles knows.
|
2014-02-24 17:20:03 +00:00
|
|
|
branches_sans_reporting = ('argonaut', 'bobtail', 'cuttlefish', 'dumpling')
|
|
|
|
if job_config.get('teuthology_branch') in branches_sans_reporting:
|
2013-12-12 23:33:53 +00:00
|
|
|
# The job ran with a teuthology branch that may not have the reporting
|
|
|
|
# feature. Let's call teuthology-report (which will be from the master
|
|
|
|
# branch) to report the job manually.
|
2014-04-17 14:43:07 +00:00
|
|
|
cmd = "teuthology-report -v -D -r {run_name} -j {job_id}".format(
|
2013-12-16 20:22:22 +00:00
|
|
|
run_name=job_info['name'],
|
|
|
|
job_id=job_info['job_id'])
|
2013-12-16 19:34:37 +00:00
|
|
|
try:
|
2014-02-26 23:15:37 +00:00
|
|
|
log.info("Executing %s" % cmd)
|
2013-12-19 16:25:51 +00:00
|
|
|
report_proc = subprocess.Popen(cmd, shell=True,
|
|
|
|
stdout=subprocess.PIPE,
|
|
|
|
stderr=subprocess.STDOUT)
|
|
|
|
while report_proc.poll() is None:
|
|
|
|
for line in report_proc.stdout.readlines():
|
2014-01-03 21:01:31 +00:00
|
|
|
log.info(line.strip())
|
2013-12-19 16:25:51 +00:00
|
|
|
time.sleep(1)
|
|
|
|
log.info("Reported results via the teuthology-report command")
|
2013-12-17 17:02:30 +00:00
|
|
|
except Exception:
|
|
|
|
log.exception("teuthology-report failed")
|
2013-12-13 15:56:23 +00:00
|
|
|
else:
|
|
|
|
# Let's make sure that paddles knows the job is finished. We don't know
|
|
|
|
# the status, but if it was a pass or fail it will have already been
|
|
|
|
# reported to paddles. In that case paddles ignores the 'dead' status.
|
|
|
|
# If the job was killed, paddles will use the 'dead' status.
|
|
|
|
report.try_push_job_info(job_info, dict(status='dead'))
|
2013-12-05 23:37:25 +00:00
|
|
|
|
|
|
|
|
2013-09-11 14:59:45 +00:00
|
|
|
def run_job(job_config, teuth_bin_path):
|
2011-07-08 18:37:20 +00:00
|
|
|
arg = [
|
2013-08-22 19:47:18 +00:00
|
|
|
os.path.join(teuth_bin_path, 'teuthology'),
|
|
|
|
]
|
2013-09-16 18:14:52 +00:00
|
|
|
# The following is for compatibility with older schedulers, from before we
|
|
|
|
# started merging the contents of job_config['config'] into job_config
|
|
|
|
# itself.
|
|
|
|
if 'config' in job_config:
|
|
|
|
inner_config = job_config.pop('config')
|
|
|
|
if not isinstance(inner_config, dict):
|
2014-02-26 23:15:37 +00:00
|
|
|
log.warn("run_job: job_config['config'] isn't a dict, it's a %s",
|
|
|
|
str(type(inner_config)))
|
2013-09-16 18:14:52 +00:00
|
|
|
else:
|
|
|
|
job_config.update(inner_config)
|
2011-07-08 18:37:20 +00:00
|
|
|
|
|
|
|
if job_config['verbose']:
|
|
|
|
arg.append('-v')
|
|
|
|
|
|
|
|
arg.extend([
|
2013-08-29 21:12:36 +00:00
|
|
|
'--lock',
|
|
|
|
'--block',
|
|
|
|
'--owner', job_config['owner'],
|
2013-09-11 14:59:45 +00:00
|
|
|
'--archive', job_config['archive_path'],
|
2013-08-29 21:12:36 +00:00
|
|
|
'--name', job_config['name'],
|
|
|
|
])
|
2011-07-08 18:37:20 +00:00
|
|
|
if job_config['description'] is not None:
|
|
|
|
arg.extend(['--description', job_config['description']])
|
|
|
|
arg.append('--')
|
|
|
|
|
2013-08-29 21:12:36 +00:00
|
|
|
with tempfile.NamedTemporaryFile(prefix='teuthology-worker.',
|
|
|
|
suffix='.tmp',) as tmp:
|
2013-09-11 20:14:58 +00:00
|
|
|
yaml.safe_dump(data=job_config, stream=tmp)
|
2012-08-09 16:42:35 +00:00
|
|
|
tmp.flush()
|
2012-08-08 21:45:49 +00:00
|
|
|
arg.append(tmp.name)
|
2013-12-12 21:45:58 +00:00
|
|
|
p = subprocess.Popen(args=arg)
|
2013-12-10 19:19:56 +00:00
|
|
|
log.info("Job archive: %s", job_config['archive_path'])
|
2014-03-05 20:13:04 +00:00
|
|
|
log.info("Job PID: %s", str(p.pid))
|
2013-12-05 23:37:25 +00:00
|
|
|
|
|
|
|
if teuth_config.results_server:
|
2013-12-10 16:06:16 +00:00
|
|
|
log.info("Running with watchdog")
|
2014-01-03 20:56:46 +00:00
|
|
|
try:
|
|
|
|
run_with_watchdog(p, job_config)
|
|
|
|
except Exception:
|
|
|
|
log.exception("run_with_watchdog had an unhandled exception")
|
2014-01-03 21:45:18 +00:00
|
|
|
raise
|
2013-12-05 23:37:25 +00:00
|
|
|
else:
|
2013-12-10 16:06:16 +00:00
|
|
|
log.info("Running without watchdog")
|
2014-02-26 22:22:32 +00:00
|
|
|
# This sleep() is to give the child time to start up and create the
|
|
|
|
# archive dir.
|
|
|
|
time.sleep(5)
|
|
|
|
symlink_worker_log(job_config['worker_log'],
|
|
|
|
job_config['archive_path'])
|
2013-12-05 23:37:25 +00:00
|
|
|
p.wait()
|
|
|
|
|
2012-08-08 21:48:21 +00:00
|
|
|
if p.returncode != 0:
|
2012-08-14 22:08:21 +00:00
|
|
|
log.error('Child exited with code %d', p.returncode)
|
2012-08-08 21:44:47 +00:00
|
|
|
else:
|
|
|
|
log.info('Success!')
|
2014-02-26 22:22:32 +00:00
|
|
|
|
|
|
|
|
|
|
|
def symlink_worker_log(worker_log_path, archive_dir):
|
|
|
|
try:
|
|
|
|
log.debug("Worker log: %s", worker_log_path)
|
|
|
|
os.symlink(worker_log_path, os.path.join(archive_dir, 'worker.log'))
|
|
|
|
except Exception:
|
|
|
|
log.exception("Failed to symlink worker log")
|