ceph/teuthology/worker.py

339 lines
12 KiB
Python
Raw Normal View History

import fcntl
import logging
import os
import subprocess
import sys
import tempfile
import time
import yaml
from datetime import datetime
from traceback import format_tb
from teuthology import setup_log_file
from . import beanstalk
from . import report
from . import safepath
from .config import config as teuth_config
from .kill import kill_job
from .misc import read_config
from .repo_utils import enforce_repo_state, BranchNotFoundError
log = logging.getLogger(__name__)
start_time = datetime.utcnow()
restart_file_path = '/tmp/teuthology-restart-workers'
def need_restart():
if not os.path.exists(restart_file_path):
return False
file_mtime = datetime.utcfromtimestamp(os.path.getmtime(restart_file_path))
if file_mtime > start_time:
return True
else:
return False
def restart():
log.info('Restarting...')
args = sys.argv[:]
args.insert(0, sys.executable)
os.execv(sys.executable, args)
def install_except_hook():
"""
Install an exception hook that first logs any uncaught exception, then
raises it.
"""
def log_exception(exception_class, exception, traceback):
logging.critical(''.join(format_tb(traceback)))
if not exception.message:
logging.critical(exception_class.__name__)
else:
logging.critical('{0}: {1}'.format(
exception_class.__name__, exception))
# Now raise the exception like normal
sys.__excepthook__(exception_class, exception, traceback)
sys.excepthook = log_exception
class filelock(object):
# simple flock class
def __init__(self, fn):
self.fn = fn
self.fd = None
def acquire(self):
assert not self.fd
self.fd = file(self.fn, 'w')
fcntl.lockf(self.fd, fcntl.LOCK_EX)
def release(self):
assert self.fd
fcntl.lockf(self.fd, fcntl.LOCK_UN)
self.fd = None
def fetch_teuthology_branch(dest_path, branch='master'):
"""
Make sure we have the correct teuthology branch checked out and up-to-date
"""
# only let one worker create/update the checkout at a time
lock = filelock('%s.lock' % dest_path)
lock.acquire()
try:
teuthology_git_upstream = teuth_config.ceph_git_base_url + \
'teuthology.git'
enforce_repo_state(teuthology_git_upstream, dest_path, branch)
log.debug("Bootstrapping %s", dest_path)
# This magic makes the bootstrap script not attempt to clobber an
# existing virtualenv. But the branch's bootstrap needs to actually
# check for the NO_CLOBBER variable.
env = os.environ.copy()
env['NO_CLOBBER'] = '1'
cmd = './bootstrap'
boot_proc = subprocess.Popen(cmd, shell=True, cwd=dest_path, env=env,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
returncode = boot_proc.wait()
if returncode != 0:
for line in boot_proc.stdout.readlines():
log.warn(line.strip())
log.info("Bootstrap exited with status %s", returncode)
finally:
lock.release()
def main(ctx):
loglevel = logging.INFO
if ctx.verbose:
loglevel = logging.DEBUG
log.setLevel(loglevel)
log_file_path = os.path.join(ctx.log_dir, 'worker.{tube}.{pid}'.format(
pid=os.getpid(), tube=ctx.tube,))
setup_log_file(log, log_file_path)
install_except_hook()
if not os.path.isdir(ctx.archive_dir):
sys.exit("{prog}: archive directory must exist: {path}".format(
prog=os.path.basename(sys.argv[0]),
path=ctx.archive_dir,
))
else:
teuth_config.archive_base = ctx.archive_dir
read_config(ctx)
connection = beanstalk.connect()
beanstalk.watch_tube(connection, ctx.tube)
result_proc = None
while True:
# Check to see if we have a teuthology-results process hanging around
# and if so, read its return code so that it can exit.
if result_proc is not None and result_proc.poll() is not None:
log.debug("teuthology-results exited with code: %s",
result_proc.returncode)
result_proc = None
if need_restart():
restart()
job = connection.reserve(timeout=60)
if job is None:
continue
# bury the job so it won't be re-run if it fails
job.bury()
log.info('Reserved job %d', job.jid)
log.info('Config is: %s', job.body)
job_config = yaml.safe_load(job.body)
job_config['job_id'] = str(job.jid)
safe_archive = safepath.munge(job_config['name'])
job_config['worker_log'] = log_file_path
archive_path_full = os.path.join(
ctx.archive_dir, safe_archive, str(job.jid))
job_config['archive_path'] = archive_path_full
# If the teuthology branch was not specified, default to master and
# store that value.
teuthology_branch = job_config.get('teuthology_branch', 'master')
job_config['teuthology_branch'] = teuthology_branch
teuth_path = os.path.join(os.getenv("HOME"),
'teuthology-' + teuthology_branch)
2013-08-23 14:59:48 +00:00
try:
fetch_teuthology_branch(dest_path=teuth_path,
branch=teuthology_branch)
except BranchNotFoundError:
log.exception(
"Branch not found; throwing job away")
# Optionally, we could mark the job as dead, but we don't have a
# great way to express why it is dead.
report.try_delete_jobs(job_config['name'],
job_config['job_id'])
continue
queue: only bootstrap new checkouts Until we figure out why bootstrap is getting stuck like this: 9851 pts/7 S 0:03 /home/teuthworker/teuthology-master/virtualenv/bin/python ./teuthology-master/virtualenv/bin/teuthology-worker -v --archive-dir /var/lib/teuthworker/archive --tube plana --log-dir /var/lib/teuthworker/archive/worker_logs 2075 pts/7 Z 0:00 \_ [git] <defunct> 2112 pts/7 Z 0:00 \_ [git] <defunct> 2138 pts/7 Z 0:00 \_ [bootstrap] <defunct> 9852 pts/7 S 0:03 /home/teuthworker/teuthology-master/virtualenv/bin/python ./teuthology-master/virtualenv/bin/teuthology-worker -v --archive-dir /var/lib/teuthworker/archive --tube plana --log-dir /var/lib/teuthworker/archive/worker_logs 2153 pts/7 Z 0:00 \_ [git] <defunct> 2177 pts/7 Z 0:00 \_ [git] <defunct> 2264 pts/7 Z 0:00 \_ [bootstrap] <defunct> 9853 pts/7 S 0:03 /home/teuthworker/teuthology-master/virtualenv/bin/python ./teuthology-master/virtualenv/bin/teuthology-worker -v --archive-dir /var/lib/teuthworker/archive --tube plana --log-dir /var/lib/teuthworker/archive/worker_logs 2141 pts/7 Z 0:00 \_ [git] <defunct> 2276 pts/7 Z 0:00 \_ [git] <defunct> 2305 pts/7 Z 0:00 \_ [bootstrap] <defunct> 9854 pts/7 S 0:03 /home/teuthworker/teuthology-master/virtualenv/bin/python ./teuthology-master/virtualenv/bin/teuthology-worker -v --archive-dir /var/lib/teuthworker/archive --tube plana --log-dir /var/lib/teuthworker/archive/worker_logs 7448 pts/7 Z 0:00 \_ [git] <defunct> 7449 pts/7 Z 0:00 \_ [git] <defunct> 7450 pts/7 Z 0:00 \_ [bootstrap] <defunct> 7452 pts/7 Z 0:00 \_ [teuthology-resu] <defunct> 9855 pts/7 S 0:01 /home/teuthworker/teuthology-master/virtualenv/bin/python ./teuthology-master/virtualenv/bin/teuthology-worker -v --archive-dir /var/lib/teuthworker/archive --tube plana --log-dir /var/lib/teuthworker/archive/worker_logs 7712 pts/7 Z 0:00 \_ [git] <defunct> 7713 pts/7 Z 0:00 \_ [git] <defunct> 7714 pts/7 Z 0:00 \_ [bootstrap] <defunct> 7716 pts/7 Z 0:00 \_ [teuthology-resu] <defunct> 9856 pts/7 S 0:03 /home/teuthworker/teuthology-master/virtualenv/bin/python ./teuthology-master/virtualenv/bin/teuthology-worker -v --archive-dir /var/lib/teuthworker/archive --tube plana --log-dir /var/lib/teuthworker/archive/worker_logs 2316 pts/7 Z 0:00 \_ [bootstrap] <defunct> 9857 pts/7 S 0:03 /home/teuthworker/teuthology-master/virtualenv/bin/python ./teuthology-master/virtualenv/bin/teuthology-worker -v --archive-dir /var/lib/teuthworker/archive --tube plana --log-dir /var/lib/teuthworker/archive/worker_logs 2340 pts/7 Z 0:00 \_ [bootstrap] <defunct> 9858 pts/7 S 0:01 /home/teuthworker/teuthology-master/virtualenv/bin/python ./teuthology-master/virtualenv/bin/teuthology-worker -v --archive-dir /var/lib/teuthworker/archive --tube plana --log-dir /var/lib/teuthworker/archive/worker_logs 23188 pts/7 Z 0:00 \_ [bootstrap] <defunct> 9859 pts/7 S 0:03 /home/teuthworker/teuthology-master/virtualenv/bin/python ./teuthology-master/virtualenv/bin/teuthology-worker -v --archive-dir /var/lib/teuthworker/archive --tube plana --log-dir /var/lib/teuthworker/archive/worker_logs Signed-off-by: Sage Weil <sage@inktank.com>
2013-08-23 05:14:41 +00:00
teuth_bin_path = os.path.join(teuth_path, 'virtualenv', 'bin')
if not os.path.isdir(teuth_bin_path):
2013-08-23 14:59:48 +00:00
raise RuntimeError("teuthology branch %s at %s not bootstrapped!" %
(teuthology_branch, teuth_bin_path))
2013-07-19 13:01:28 +00:00
if job_config.get('last_in_suite'):
if teuth_config.results_server:
report.try_delete_jobs(job_config['name'],
job_config['job_id'])
log.info('Generating results email for %s', job_config['name'])
args = [
os.path.join(teuth_bin_path, 'teuthology-results'),
'--timeout',
str(job_config.get('results_timeout', 21600)),
'--email',
job_config['email'],
'--archive-dir',
os.path.join(ctx.archive_dir, safe_archive),
'--name',
job_config['name'],
]
# Execute teuthology-results, passing 'preexec_fn=os.setpgrp' to
# make sure that it will continue to run if this worker process
# dies (e.g. because of a restart)
result_proc = subprocess.Popen(args=args, preexec_fn=os.setpgrp)
log.info("teuthology-results PID: %s", result_proc.pid)
else:
log.info('Creating archive dir %s', archive_path_full)
safepath.makedirs(ctx.archive_dir, safe_archive)
log.info('Running job %d', job.jid)
run_job(job_config, teuth_bin_path)
job.delete()
def run_with_watchdog(process, job_config):
job_start_time = datetime.utcnow()
# Only push the information that's relevant to the watchdog, to save db
# load
job_info = dict(
name=job_config['name'],
job_id=job_config['job_id'],
)
# Sleep once outside of the loop to avoid double-posting jobs
time.sleep(teuth_config.watchdog_interval)
symlink_worker_log(job_config['worker_log'], job_config['archive_path'])
while process.poll() is None:
# Kill jobs that have been running longer than the global max
run_time = datetime.utcnow() - job_start_time
total_seconds = run_time.days * 60 * 60 * 24 + run_time.seconds
if total_seconds > teuth_config.max_job_time:
log.warning("Job ran longer than {max}s. Killing...".format(
max=teuth_config.max_job_time))
kill_job(job_info['name'], job_info['job_id'],
teuth_config.archive_base)
report.try_push_job_info(job_info, dict(status='running'))
time.sleep(teuth_config.watchdog_interval)
# The job finished. Let's make sure paddles knows.
branches_sans_reporting = ('argonaut', 'bobtail', 'cuttlefish', 'dumpling')
if job_config.get('teuthology_branch') in branches_sans_reporting:
# The job ran with a teuthology branch that may not have the reporting
# feature. Let's call teuthology-report (which will be from the master
# branch) to report the job manually.
cmd = "teuthology-report -v -D -r {run_name} -j {job_id}".format(
run_name=job_info['name'],
job_id=job_info['job_id'])
try:
log.info("Executing %s" % cmd)
report_proc = subprocess.Popen(cmd, shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
while report_proc.poll() is None:
for line in report_proc.stdout.readlines():
log.info(line.strip())
time.sleep(1)
log.info("Reported results via the teuthology-report command")
except Exception:
log.exception("teuthology-report failed")
else:
# Let's make sure that paddles knows the job is finished. We don't know
# the status, but if it was a pass or fail it will have already been
# reported to paddles. In that case paddles ignores the 'dead' status.
# If the job was killed, paddles will use the 'dead' status.
report.try_push_job_info(job_info, dict(status='dead'))
def run_job(job_config, teuth_bin_path):
arg = [
os.path.join(teuth_bin_path, 'teuthology'),
]
# The following is for compatibility with older schedulers, from before we
# started merging the contents of job_config['config'] into job_config
# itself.
if 'config' in job_config:
inner_config = job_config.pop('config')
if not isinstance(inner_config, dict):
log.warn("run_job: job_config['config'] isn't a dict, it's a %s",
str(type(inner_config)))
else:
job_config.update(inner_config)
if job_config['verbose']:
arg.append('-v')
arg.extend([
'--lock',
'--block',
'--owner', job_config['owner'],
'--archive', job_config['archive_path'],
'--name', job_config['name'],
])
if job_config['description'] is not None:
arg.extend(['--description', job_config['description']])
arg.append('--')
with tempfile.NamedTemporaryFile(prefix='teuthology-worker.',
suffix='.tmp',) as tmp:
2013-09-11 20:14:58 +00:00
yaml.safe_dump(data=job_config, stream=tmp)
tmp.flush()
arg.append(tmp.name)
p = subprocess.Popen(args=arg)
log.info("Job archive: %s", job_config['archive_path'])
log.info("Job PID: %s", str(p.pid))
if teuth_config.results_server:
log.info("Running with watchdog")
try:
run_with_watchdog(p, job_config)
except Exception:
log.exception("run_with_watchdog had an unhandled exception")
raise
else:
log.info("Running without watchdog")
# This sleep() is to give the child time to start up and create the
# archive dir.
time.sleep(5)
symlink_worker_log(job_config['worker_log'],
job_config['archive_path'])
p.wait()
if p.returncode != 0:
log.error('Child exited with code %d', p.returncode)
else:
log.info('Success!')
def symlink_worker_log(worker_log_path, archive_dir):
try:
log.debug("Worker log: %s", worker_log_path)
os.symlink(worker_log_path, os.path.join(archive_dir, 'worker.log'))
except Exception:
log.exception("Failed to symlink worker log")