mirror of https://github.com/ceph/ceph
210 lines
6.1 KiB
Python
210 lines
6.1 KiB
Python
import contextlib
|
|
import logging
|
|
import os
|
|
import unittest
|
|
from unittest import suite, loader, case
|
|
from teuthology.task import interactive
|
|
from teuthology import misc
|
|
from tasks.cephfs.filesystem import Filesystem, MDSCluster, CephCluster
|
|
from tasks.mgr.mgr_test_case import MgrCluster
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
class DecoratingLoader(loader.TestLoader):
|
|
"""
|
|
A specialization of TestLoader that tags some extra attributes
|
|
onto test classes as they are loaded.
|
|
"""
|
|
def __init__(self, params):
|
|
self._params = params
|
|
super(DecoratingLoader, self).__init__()
|
|
|
|
def _apply_params(self, obj):
|
|
for k, v in self._params.items():
|
|
setattr(obj, k, v)
|
|
|
|
def loadTestsFromTestCase(self, testCaseClass):
|
|
self._apply_params(testCaseClass)
|
|
return super(DecoratingLoader, self).loadTestsFromTestCase(testCaseClass)
|
|
|
|
def loadTestsFromName(self, name, module=None):
|
|
result = super(DecoratingLoader, self).loadTestsFromName(name, module)
|
|
|
|
# Special case for when we were called with the name of a method, we get
|
|
# a suite with one TestCase
|
|
tests_in_result = list(result)
|
|
if len(tests_in_result) == 1 and isinstance(tests_in_result[0], case.TestCase):
|
|
self._apply_params(tests_in_result[0])
|
|
|
|
return result
|
|
|
|
|
|
class LogStream(object):
|
|
def __init__(self):
|
|
self.buffer = ""
|
|
|
|
def write(self, data):
|
|
self.buffer += data
|
|
if "\n" in self.buffer:
|
|
lines = self.buffer.split("\n")
|
|
for line in lines[:-1]:
|
|
log.info(line)
|
|
self.buffer = lines[-1]
|
|
|
|
def flush(self):
|
|
pass
|
|
|
|
|
|
class InteractiveFailureResult(unittest.TextTestResult):
|
|
"""
|
|
Specialization that implements interactive-on-error style
|
|
behavior.
|
|
"""
|
|
ctx = None
|
|
|
|
def addFailure(self, test, err):
|
|
log.error(self._exc_info_to_string(err, test))
|
|
log.error("Failure in test '{0}', going interactive".format(
|
|
self.getDescription(test)
|
|
))
|
|
interactive.task(ctx=self.ctx, config=None)
|
|
|
|
def addError(self, test, err):
|
|
log.error(self._exc_info_to_string(err, test))
|
|
log.error("Error in test '{0}', going interactive".format(
|
|
self.getDescription(test)
|
|
))
|
|
interactive.task(ctx=self.ctx, config=None)
|
|
|
|
|
|
@contextlib.contextmanager
|
|
def task(ctx, config):
|
|
"""
|
|
Run the CephFS test cases.
|
|
|
|
Run everything in tasks/cephfs/test_*.py:
|
|
|
|
::
|
|
|
|
tasks:
|
|
- install:
|
|
- ceph:
|
|
- ceph-fuse:
|
|
- cephfs_test_runner:
|
|
|
|
`modules` argument allows running only some specific modules:
|
|
|
|
::
|
|
|
|
tasks:
|
|
...
|
|
- cephfs_test_runner:
|
|
modules:
|
|
- tasks.cephfs.test_sessionmap
|
|
- tasks.cephfs.test_auto_repair
|
|
|
|
By default, any cases that can't be run on the current cluster configuration
|
|
will generate a failure. When the optional `fail_on_skip` argument is set
|
|
to false, any tests that can't be run on the current configuration will
|
|
simply be skipped:
|
|
|
|
::
|
|
tasks:
|
|
...
|
|
- cephfs_test_runner:
|
|
fail_on_skip: false
|
|
|
|
"""
|
|
|
|
ceph_cluster = CephCluster(ctx)
|
|
|
|
if len(list(misc.all_roles_of_type(ctx.cluster, 'mds'))):
|
|
mds_cluster = MDSCluster(ctx)
|
|
fs = Filesystem(ctx)
|
|
else:
|
|
mds_cluster = None
|
|
fs = None
|
|
|
|
if len(list(misc.all_roles_of_type(ctx.cluster, 'mgr'))):
|
|
mgr_cluster = MgrCluster(ctx)
|
|
else:
|
|
mgr_cluster = None
|
|
|
|
# Mount objects, sorted by ID
|
|
if hasattr(ctx, 'mounts'):
|
|
mounts = [v for k, v in sorted(ctx.mounts.items(), lambda a, b: cmp(a[0], b[0]))]
|
|
else:
|
|
# The test configuration has a filesystem but no fuse/kclient mounts
|
|
mounts = []
|
|
|
|
decorating_loader = DecoratingLoader({
|
|
"ctx": ctx,
|
|
"mounts": mounts,
|
|
"fs": fs,
|
|
"ceph_cluster": ceph_cluster,
|
|
"mds_cluster": mds_cluster,
|
|
"mgr_cluster": mgr_cluster,
|
|
})
|
|
|
|
fail_on_skip = config.get('fail_on_skip', True)
|
|
|
|
# Put useful things onto ctx for interactive debugging
|
|
ctx.fs = fs
|
|
ctx.mds_cluster = mds_cluster
|
|
ctx.mgr_cluster = mgr_cluster
|
|
|
|
# Depending on config, either load specific modules, or scan for moduless
|
|
if config and 'modules' in config and config['modules']:
|
|
module_suites = []
|
|
for mod_name in config['modules']:
|
|
# Test names like cephfs.test_auto_repair
|
|
module_suites.append(decorating_loader.loadTestsFromName(mod_name))
|
|
overall_suite = suite.TestSuite(module_suites)
|
|
else:
|
|
# Default, run all tests
|
|
overall_suite = decorating_loader.discover(
|
|
os.path.join(
|
|
os.path.dirname(os.path.abspath(__file__)),
|
|
"cephfs/"
|
|
)
|
|
)
|
|
|
|
if ctx.config.get("interactive-on-error", False):
|
|
InteractiveFailureResult.ctx = ctx
|
|
result_class = InteractiveFailureResult
|
|
else:
|
|
result_class = unittest.TextTestResult
|
|
|
|
class LoggingResult(result_class):
|
|
def startTest(self, test):
|
|
log.info("Starting test: {0}".format(self.getDescription(test)))
|
|
return super(LoggingResult, self).startTest(test)
|
|
|
|
def addSkip(self, test, reason):
|
|
if fail_on_skip:
|
|
# Don't just call addFailure because that requires a traceback
|
|
self.failures.append((test, reason))
|
|
else:
|
|
super(LoggingResult, self).addSkip(test, reason)
|
|
|
|
# Execute!
|
|
result = unittest.TextTestRunner(
|
|
stream=LogStream(),
|
|
resultclass=LoggingResult,
|
|
verbosity=2,
|
|
failfast=True).run(overall_suite)
|
|
|
|
if not result.wasSuccessful():
|
|
result.printErrors() # duplicate output at end for convenience
|
|
|
|
bad_tests = []
|
|
for test, error in result.errors:
|
|
bad_tests.append(str(test))
|
|
for test, failure in result.failures:
|
|
bad_tests.append(str(test))
|
|
|
|
raise RuntimeError("Test failure: {0}".format(", ".join(bad_tests)))
|
|
|
|
yield
|