ceph/tasks/cephfs_test_runner.py

189 lines
5.4 KiB
Python

import contextlib
import logging
import os
import unittest
from unittest import suite, loader, case
from teuthology.task import interactive
from tasks.cephfs.filesystem import Filesystem, MDSCluster
# Module-level logger, named after this module; used by LogStream and the
# result classes below to report test progress and failures.
log = logging.getLogger(__name__)
class DecoratingLoader(loader.TestLoader):
    """
    TestLoader variant that stamps a fixed set of attributes onto the
    tests it loads.

    Each key/value pair of ``params`` is set as an attribute on every test
    class passed through loadTestsFromTestCase, and on the lone TestCase
    instance produced when loadTestsFromName yields exactly one test.
    """

    def __init__(self, params):
        """Store ``params`` (attribute name -> value) for later tagging."""
        self._params = params
        super(DecoratingLoader, self).__init__()

    def _apply_params(self, obj):
        """Set every stored parameter as an attribute on ``obj``."""
        for attr_name, attr_value in self._params.items():
            setattr(obj, attr_name, attr_value)

    def loadTestsFromTestCase(self, testCaseClass):
        """Tag ``testCaseClass`` with the parameters, then load it as usual."""
        self._apply_params(testCaseClass)
        parent_loader = super(DecoratingLoader, self)
        return parent_loader.loadTestsFromTestCase(testCaseClass)

    def loadTestsFromName(self, name, module=None):
        """
        Load tests by dotted name, tagging the single-test case.

        When the loaded suite contains exactly one item and that item is a
        TestCase instance (e.g. the name pointed at one test method), the
        parameters are set on that instance.  The suite returned by the
        base loader is passed back unchanged.
        """
        loaded = super(DecoratingLoader, self).loadTestsFromName(name, module)

        members = list(loaded)
        if len(members) != 1:
            return loaded

        only_member = members[0]
        if isinstance(only_member, case.TestCase):
            self._apply_params(only_member)
        return loaded
class LogStream(object):
    """
    Minimal write-only stream that forwards complete lines to ``log.info``.

    Text is accumulated in ``self.buffer``; whenever the buffer contains a
    newline, every finished line is logged and only the trailing partial
    line is kept.
    """

    def __init__(self):
        # Text received so far that has not yet been terminated by "\n".
        self.buffer = ""

    def write(self, data):
        """Append ``data`` and log each line completed by a newline."""
        self.buffer += data
        finished, newline, remainder = self.buffer.rpartition("\n")
        if not newline:
            # No newline anywhere yet: keep accumulating.
            return
        for text in finished.split("\n"):
            log.info(text)
        self.buffer = remainder

    def flush(self):
        """No-op: a partial line stays buffered until its newline arrives."""
        pass
class InteractiveFailureResult(unittest.TextTestResult):
    """
    TextTestResult variant giving interactive-on-error behaviour: each
    failure or error is logged and then an interactive session is started.

    NOTE(review): the base-class addFailure/addError are not invoked by
    these overrides, so such outcomes are presumably not recorded in
    ``failures``/``errors`` -- confirm this is intended.
    """

    # Context handed to interactive.task(); a class attribute, None until
    # something assigns it on the class.
    ctx = None

    def _log_and_go_interactive(self, template, test, err):
        """Log the traceback and a summary line, then go interactive."""
        log.error(self._exc_info_to_string(err, test))
        log.error(template.format(self.getDescription(test)))
        interactive.task(ctx=self.ctx, config=None)

    def addFailure(self, test, err):
        """Handle a failed assertion in ``test``."""
        self._log_and_go_interactive(
            "Failure in test '{0}', going interactive", test, err)

    def addError(self, test, err):
        """Handle an unexpected exception raised by ``test``."""
        self._log_and_go_interactive(
            "Error in test '{0}', going interactive", test, err)
@contextlib.contextmanager
def task(ctx, config):
    """
    Run the CephFS test cases.

    Run everything in tasks/cephfs/test_*.py:

    ::

        tasks:
          - install:
          - ceph:
          - ceph-fuse:
          - cephfs_test_runner:

    `modules` argument allows running only some specific modules:

    ::

        tasks:
          ...
          - cephfs_test_runner:
              modules:
                - tasks.cephfs.test_sessionmap
                - tasks.cephfs.test_auto_repair

    By default, any cases that can't be run on the current cluster
    configuration will generate a failure.  When the optional `fail_on_skip`
    argument is set to false, any tests that can't be run on the current
    configuration will simply be skipped:

    ::

        tasks:
          ...
          - cephfs_test_runner:
              fail_on_skip: false

    :param ctx: run context; ``ctx.mounts`` and ``ctx.config`` are read, and
                ``ctx.fs`` / ``ctx.mds_cluster`` are set for debugging.
    :param config: mapping of task options (``modules``, ``fail_on_skip``),
                   or a falsy value such as None when no options are given.
    :raises RuntimeError: if any test errored or failed; the message lists
                          the offending tests.
    """
    # config may be falsy (presumably None when the task is listed with no
    # options, as in the first example above); normalise it once so every
    # option lookup below is safe.
    config = config or {}

    fs = Filesystem(ctx)
    mds_cluster = MDSCluster(ctx)

    # Mount objects, sorted by ID.  A key function is used instead of a
    # cmp-style comparator so the sort works on both Python 2 and Python 3.
    mounts = [
        mount
        for _, mount in sorted(ctx.mounts.items(), key=lambda item: item[0])
    ]

    decorating_loader = DecoratingLoader({
        "ctx": ctx,
        "mounts": mounts,
        "fs": fs,
        "mds_cluster": mds_cluster
    })

    fail_on_skip = config.get('fail_on_skip', True)

    # Put useful things onto ctx for interactive debugging
    ctx.fs = fs
    ctx.mds_cluster = mds_cluster

    # Depending on config, either load specific modules, or scan for modules
    if config.get('modules'):
        # Test names like cephfs.test_auto_repair
        module_suites = [
            decorating_loader.loadTestsFromName(mod_name)
            for mod_name in config['modules']
        ]
        overall_suite = suite.TestSuite(module_suites)
    else:
        # Default, run all tests
        overall_suite = decorating_loader.discover(
            os.path.join(
                os.path.dirname(os.path.abspath(__file__)),
                "cephfs/"
            )
        )

    if ctx.config.get("interactive-on-error", False):
        InteractiveFailureResult.ctx = ctx
        result_class = InteractiveFailureResult
    else:
        result_class = unittest.TextTestResult

    class LoggingResult(result_class):
        """Result class that logs each test start and applies fail_on_skip."""

        def startTest(self, test):
            log.info("Starting test: {0}".format(self.getDescription(test)))
            return super(LoggingResult, self).startTest(test)

        def addSkip(self, test, reason):
            if fail_on_skip:
                # Don't just call addFailure because that requires a traceback
                self.failures.append((test, reason))
            else:
                super(LoggingResult, self).addSkip(test, reason)

    # Execute!
    result = unittest.TextTestRunner(
        stream=LogStream(),
        resultclass=LoggingResult,
        verbosity=2,
        failfast=True).run(overall_suite)

    if not result.wasSuccessful():
        result.printErrors()  # duplicate output at end for convenience

        # Errors first, then failures, matching the order they are reported.
        bad_tests = [str(test) for test, _ in result.errors]
        bad_tests.extend(str(test) for test, _ in result.failures)

        raise RuntimeError("Test failure: {0}".format(", ".join(bad_tests)))

    yield