mirror of
https://github.com/ceph/ceph
synced 2024-12-25 21:03:31 +00:00
252 lines
8.0 KiB
Python
252 lines
8.0 KiB
Python
import argparse
|
|
import fcntl
|
|
import logging
|
|
import os
|
|
import subprocess
|
|
import shutil
|
|
import sys
|
|
import tempfile
|
|
import time
|
|
import yaml
|
|
|
|
import beanstalkc
|
|
|
|
from .config import config as teuth_config
|
|
from . import safepath
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
# simple flock class
|
|
class filelock(object):
|
|
def __init__(self, fn):
|
|
self.fn = fn
|
|
self.fd = None
|
|
|
|
def acquire(self):
|
|
assert not self.fd
|
|
self.fd = file(self.fn, 'w')
|
|
fcntl.lockf(self.fd, fcntl.LOCK_EX)
|
|
|
|
def release(self):
|
|
assert self.fd
|
|
fcntl.lockf(self.fd, fcntl.LOCK_UN)
|
|
self.fd = None
|
|
|
|
|
|
def connect(ctx):
|
|
host = ctx.teuthology_config['queue_host']
|
|
port = ctx.teuthology_config['queue_port']
|
|
return beanstalkc.Connection(host=host, port=port)
|
|
|
|
|
|
def fetch_teuthology_branch(path, branch='master'):
|
|
"""
|
|
Make sure we have the correct teuthology branch checked out and up-to-date
|
|
"""
|
|
# only let one worker create/update the checkout at a time
|
|
lock = filelock('%s.lock' % path)
|
|
lock.acquire()
|
|
try:
|
|
if not os.path.isdir(path):
|
|
log.info("Cloning %s from upstream", branch)
|
|
teuthology_git_upstream = teuth_config.ceph_git_base_url + 'teuthology.git'
|
|
log.info(
|
|
subprocess.check_output(('git', 'clone', '--branch', branch,
|
|
teuthology_git_upstream, path),
|
|
cwd=os.path.dirname(path))
|
|
)
|
|
elif time.time() - os.stat('/etc/passwd').st_mtime > 60:
|
|
# only do this at most once per minute
|
|
log.info("Fetching %s from upstream", branch)
|
|
log.info(
|
|
subprocess.check_output(('git', 'fetch', '-p', 'origin'),
|
|
cwd=path)
|
|
)
|
|
log.info(
|
|
subprocess.check_output(('touch', path))
|
|
)
|
|
else:
|
|
log.info("%s was just updated; assuming it is current", branch)
|
|
|
|
# This try/except block will notice if the requested branch doesn't
|
|
# exist, whether it was cloned or fetched.
|
|
try:
|
|
subprocess.check_output(
|
|
('git', 'reset', '--hard', 'origin/%s' % branch),
|
|
cwd=path,
|
|
)
|
|
except subprocess.CalledProcessError:
|
|
log.exception("teuthology branch not found: %s", branch)
|
|
shutil.rmtree(path)
|
|
raise
|
|
|
|
log.info("Bootstrapping %s", path)
|
|
# This magic makes the bootstrap script not attempt to clobber an
|
|
# existing virtualenv. But the branch's bootstrap needs to actually
|
|
# check for the NO_CLOBBER variable.
|
|
env = os.environ.copy()
|
|
env['NO_CLOBBER'] = '1'
|
|
log.info(
|
|
subprocess.check_output(('./bootstrap'), cwd=path, env=env)
|
|
)
|
|
|
|
finally:
|
|
lock.release()
|
|
|
|
def worker():
|
|
parser = argparse.ArgumentParser(description="""
|
|
Grab jobs from a beanstalk queue and run the teuthology tests they
|
|
describe. One job is run at a time.
|
|
""")
|
|
parser.add_argument(
|
|
'-v', '--verbose',
|
|
action='store_true', default=None,
|
|
help='be more verbose',
|
|
)
|
|
parser.add_argument(
|
|
'--archive-dir',
|
|
metavar='DIR',
|
|
help='path under which to archive results',
|
|
required=True,
|
|
)
|
|
parser.add_argument(
|
|
'-l', '--log-dir',
|
|
help='path in which to store logs',
|
|
required=True,
|
|
)
|
|
parser.add_argument(
|
|
'-t', '--tube',
|
|
help='which beanstalk tube to read jobs from',
|
|
required=True,
|
|
)
|
|
|
|
ctx = parser.parse_args()
|
|
|
|
loglevel = logging.INFO
|
|
if ctx.verbose:
|
|
loglevel = logging.DEBUG
|
|
|
|
logging.basicConfig(
|
|
level=loglevel,
|
|
filename=os.path.join(ctx.log_dir, 'worker.{tube}.{pid}'.format(
|
|
pid=os.getpid(),
|
|
tube=ctx.tube,
|
|
)),
|
|
format='%(asctime)s.%(msecs)03d %(levelname)s:%(name)s:%(message)s',
|
|
datefmt='%Y-%m-%dT%H:%M:%S',
|
|
)
|
|
|
|
if not os.path.isdir(ctx.archive_dir):
|
|
sys.exit("{prog}: archive directory must exist: {path}".format(
|
|
prog=os.path.basename(sys.argv[0]),
|
|
path=ctx.archive_dir,
|
|
))
|
|
|
|
from teuthology.misc import read_config
|
|
read_config(ctx)
|
|
|
|
beanstalk = connect(ctx)
|
|
beanstalk.watch(ctx.tube)
|
|
beanstalk.ignore('default')
|
|
|
|
while True:
|
|
job = beanstalk.reserve(timeout=60)
|
|
if job is None:
|
|
continue
|
|
|
|
# bury the job so it won't be re-run if it fails
|
|
job.bury()
|
|
log.debug('Reserved job %d', job.jid)
|
|
log.debug('Config is: %s', job.body)
|
|
job_config = yaml.safe_load(job.body)
|
|
|
|
job_config['job_id'] = job.jid
|
|
safe_archive = safepath.munge(job_config['name'])
|
|
archive_path_full = os.path.join(ctx.archive_dir, safe_archive, str(job.jid))
|
|
job_config['archive_path'] = archive_path_full
|
|
|
|
# If the teuthology branch was not specified, default to master and
|
|
# store that value.
|
|
teuthology_branch = job_config.get('teuthology_branch', 'master')
|
|
job_config['teuthology_branch'] = teuthology_branch
|
|
|
|
teuth_path = os.path.join(os.getenv("HOME"),
|
|
'teuthology-' + teuthology_branch)
|
|
|
|
fetch_teuthology_branch(path=teuth_path, branch=teuthology_branch)
|
|
|
|
teuth_bin_path = os.path.join(teuth_path, 'virtualenv', 'bin')
|
|
if not os.path.isdir(teuth_bin_path):
|
|
raise RuntimeError("teuthology branch %s at %s not bootstrapped!" %
|
|
(teuthology_branch, teuth_bin_path))
|
|
|
|
if job_config.get('last_in_suite'):
|
|
log.debug('Generating coverage for %s', job_config['name'])
|
|
args = [
|
|
os.path.join(teuth_bin_path, 'teuthology-results'),
|
|
'--timeout',
|
|
str(job_config.get('results_timeout', 21600)),
|
|
'--email',
|
|
job_config['email'],
|
|
'--archive-dir',
|
|
os.path.join(ctx.archive_dir, safe_archive),
|
|
'--name',
|
|
job_config['name'],
|
|
]
|
|
subprocess.Popen(args=args)
|
|
else:
|
|
log.debug('Creating archive dir...')
|
|
safepath.makedirs(ctx.archive_dir, safe_archive)
|
|
log.info('Running job %d', job.jid)
|
|
run_job(job_config, teuth_bin_path)
|
|
job.delete()
|
|
|
|
|
|
def run_job(job_config, teuth_bin_path):
|
|
arg = [
|
|
os.path.join(teuth_bin_path, 'teuthology'),
|
|
]
|
|
# The following is for compatibility with older schedulers, from before we
|
|
# started merging the contents of job_config['config'] into job_config
|
|
# itself.
|
|
if 'config' in job_config:
|
|
inner_config = job_config.pop('config')
|
|
if not isinstance(inner_config, dict):
|
|
log.debug("run_job: job_config['config'] isn't a dict, it's a %s",
|
|
str(type(inner_config)))
|
|
else:
|
|
job_config.update(inner_config)
|
|
|
|
if job_config['verbose']:
|
|
arg.append('-v')
|
|
|
|
arg.extend([
|
|
'--lock',
|
|
'--block',
|
|
'--owner', job_config['owner'],
|
|
'--archive', job_config['archive_path'],
|
|
'--name', job_config['name'],
|
|
])
|
|
if job_config['description'] is not None:
|
|
arg.extend(['--description', job_config['description']])
|
|
arg.append('--')
|
|
|
|
with tempfile.NamedTemporaryFile(prefix='teuthology-worker.',
|
|
suffix='.tmp',) as tmp:
|
|
yaml.safe_dump(data=job_config, stream=tmp)
|
|
tmp.flush()
|
|
arg.append(tmp.name)
|
|
p = subprocess.Popen(
|
|
args=arg,
|
|
close_fds=True,
|
|
stderr=subprocess.PIPE,
|
|
)
|
|
child = logging.getLogger(__name__ + '.child')
|
|
for line in p.stderr:
|
|
child.error(': %s', line.rstrip('\n'))
|
|
p.wait()
|
|
if p.returncode != 0:
|
|
log.error('Child exited with code %d', p.returncode)
|
|
else:
|
|
log.info('Success!')
|