tasks: generalise cephfs test runner

...to avoid having boilerplate in each test module,
and gain the ability to run them all in one go
with a nice test-by-test pass/fail report.

Signed-off-by: John Spray <john.spray@redhat.com>
John Spray <john.spray@redhat.com> committed 2015-03-26 17:52:10 +00:00
commit 2b5137bf06 (parent f54e5414f9)
8 changed files with 271 additions and 341 deletions
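
For context, a sketch of what a test module reduces to after this change: the per-module task()/run_tests() wrappers deleted in the hunks below are no longer needed, and the new cephfs_test_runner task (added at the end of this commit) discovers and runs the test cases directly. Module, class and test names here are hypothetical.

from tasks.cephfs.cephfs_test_case import CephFSTestCase


class TestExample(CephFSTestCase):
    # Declarative requirements, checked by CephFSTestCase.setUp()
    CLIENTS_REQUIRED = 1

    def test_smoke(self):
        # self.fs and self.mount_a are injected by the runner and setUp()
        self.assertTrue(self.mount_a.is_mounted())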


@ -1,7 +1,8 @@
import logging
import unittest
from unittest import case
import time
from teuthology.task import interactive
from tasks.cephfs.fuse_mount import FuseMount
log = logging.getLogger(__name__)
@ -15,11 +16,55 @@ class CephFSTestCase(unittest.TestCase):
Handles resetting the cluster under test between tests.
"""
# Environment references
mounts = None
fs = None
ctx = None
# FIXME weird explicit naming
mount_a = None
mount_b = None
fs = None
# Declarative test requirements: subclasses should override these to indicate
# their special needs. If not met, tests will be skipped.
CLIENTS_REQUIRED = 1
MDSS_REQUIRED = 1
REQUIRE_KCLIENT_REMOTE = False
REQUIRE_ONE_CLIENT_REMOTE = False
LOAD_SETTINGS = []
def setUp(self):
if len(self.fs.mds_ids) < self.MDSS_REQUIRED:
raise case.SkipTest("Only have {0} MDSs, require {1}".format(
len(self.fs.mds_ids), self.MDSS_REQUIRED
))
if len(self.mounts) < self.CLIENTS_REQUIRED:
raise case.SkipTest("Only have {0} clients, require {1}".format(
len(self.mounts), self.CLIENTS_REQUIRED
))
if self.REQUIRE_KCLIENT_REMOTE:
if not isinstance(self.mounts[0], FuseMount) or not isinstance(self.mounts[1], FuseMount):
# kclient kill() power cycles nodes, so requires clients to each be on
# their own node
if self.mounts[0].client_remote.hostname == self.mounts[1].client_remote.hostname:
raise case.SkipTest("kclient clients must be on separate nodes")
if self.REQUIRE_ONE_CLIENT_REMOTE:
if self.mounts[0].client_remote.hostname in self.fs.get_mds_hostnames():
raise case.SkipTest("Require first client to be on separate server from MDSs")
# Unmount all surplus clients
for i in range(self.CLIENTS_REQUIRED, len(self.mounts)):
mount = self.mounts[i]
log.info("Unmounting unneeded client {0}".format(mount.client_id))
mount.umount_wait()
# Create friendly mount_a, mount_b attrs
for i in range(0, self.CLIENTS_REQUIRED):
setattr(self, "mount_{0}".format(chr(ord('a') + i)), self.mounts[i])
self.fs.clear_firewall()
# Unmount in order to start each test on a fresh mount, such
@ -56,6 +101,12 @@ class CephFSTestCase(unittest.TestCase):
self.mount_b.mount()
self.mount_b.wait_until_mounted()
# Load any config settings of interest
for setting in self.LOAD_SETTINGS:
setattr(self, setting, int(self.fs.mds_asok(
['config', 'get', setting], self.fs.mds_ids[0]
)[setting]))
self.configs_set = set()
def tearDown(self):
@ -138,86 +189,3 @@ class CephFSTestCase(unittest.TestCase):
elapsed += period
log.debug("wait_until_true: success")
class LogStream(object):
def __init__(self):
self.buffer = ""
def write(self, data):
self.buffer += data
if "\n" in self.buffer:
lines = self.buffer.split("\n")
for line in lines[:-1]:
log.info(line)
self.buffer = lines[-1]
def flush(self):
pass
class InteractiveFailureResult(unittest.TextTestResult):
"""
Specialization that implements interactive-on-error style
behavior.
"""
ctx = None
def addFailure(self, test, err):
log.error(self._exc_info_to_string(err, test))
log.error("Failure in test '{0}', going interactive".format(
self.getDescription(test)
))
interactive.task(ctx=self.ctx, config=None)
def addError(self, test, err):
log.error(self._exc_info_to_string(err, test))
log.error("Error in test '{0}', going interactive".format(
self.getDescription(test)
))
interactive.task(ctx=self.ctx, config=None)
def run_tests(ctx, config, test_klass, params):
for k, v in params.items():
setattr(test_klass, k, v)
# Execute test suite
# ==================
if config and 'test_name' in config:
# Test names like TestCase.this_test
suite = unittest.TestLoader().loadTestsFromName(
"{0}.{1}".format(test_klass.__module__, config['test_name']))
else:
suite = unittest.TestLoader().loadTestsFromTestCase(test_klass)
if ctx.config.get("interactive-on-error", False):
InteractiveFailureResult.ctx = ctx
result_class = InteractiveFailureResult
else:
result_class = unittest.TextTestResult
# Unmount all clients not involved
for mount in ctx.mounts.values():
if mount is not params.get('mount_a') and mount is not params.get('mount_b'):
if mount.is_mounted():
log.info("Unmounting unneeded client {0}".format(mount.client_id))
mount.umount_wait()
# Execute!
result = unittest.TextTestRunner(
stream=LogStream(),
resultclass=result_class,
verbosity=2,
failfast=True).run(suite)
if not result.wasSuccessful():
result.printErrors() # duplicate output at end for convenience
bad_tests = []
for test, error in result.errors:
bad_tests.append(str(test))
for test, failure in result.failures:
bad_tests.append(str(test))
raise RuntimeError("Test failure: {0}".format(", ".join(bad_tests)))

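The REQUIRE_* flags and CLIENTS_REQUIRED/MDSS_REQUIRED attributes added above turn per-module environment checks into declarations: setUp() raises SkipTest when the running cluster cannot satisfy them, and creates the mount_a/mount_b attributes for the clients that are kept. A hedged sketch of a subclass opting in (class and test names are invented):

from tasks.cephfs.cephfs_test_case import CephFSTestCase


class TestNeedsTwoClients(CephFSTestCase):
    CLIENTS_REQUIRED = 2           # both mount_a and mount_b will be populated
    MDSS_REQUIRED = 2              # skipped on single-MDS clusters
    REQUIRE_KCLIENT_REMOTE = True  # kernel clients must live on separate nodes

    def test_requirements_met(self):
        # Reaching this point means none of the checks above raised SkipTest
        self.assertGreaterEqual(len(self.fs.mds_ids), 2)
        self.assertTrue(self.mount_b.is_mounted())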

@ -3,14 +3,11 @@
Exercise the MDS's auto repair functions
"""
import contextlib
import logging
import time
from teuthology.orchestra.run import CommandFailedError
from tasks.cephfs.filesystem import Filesystem
from tasks.cephfs.cephfs_test_case import CephFSTestCase, run_tests
from tasks.cephfs.cephfs_test_case import CephFSTestCase
log = logging.getLogger(__name__)
@ -100,23 +97,3 @@ class TestMDSAutoRepair(CephFSTestCase):
# restart mds to make it writable
self.fs.mds_fail_restart()
self.fs.wait_for_daemons()
@contextlib.contextmanager
def task(ctx, config):
fs = Filesystem(ctx)
mount_a = ctx.mounts.values()[0]
# Stash references on ctx so that we can easily debug in interactive mode
# =======================================================================
ctx.filesystem = fs
ctx.mount_a = mount_a
run_tests(ctx, config, TestMDSAutoRepair, {
'fs': fs,
'mount_a': mount_a,
})
# Continue to any downstream tasks
# ================================
yield


@ -4,14 +4,12 @@ Exercise the MDS's behaviour when clients and the MDCache reach or
exceed the limits of how many caps/inodes they should hold.
"""
import contextlib
import logging
from unittest import SkipTest
from teuthology.orchestra.run import CommandFailedError
from tasks.cephfs.cephfs_test_case import CephFSTestCase
from tasks.cephfs.filesystem import Filesystem
from tasks.cephfs.fuse_mount import FuseMount
from tasks.cephfs.cephfs_test_case import CephFSTestCase, run_tests
log = logging.getLogger(__name__)
@ -27,10 +25,8 @@ CAP_RECALL_MIN = 100
class TestClientLimits(CephFSTestCase):
# Environment references
mds_session_timeout = None
mds_reconnect_timeout = None
ms_max_backoff = None
REQUIRE_KCLIENT_REMOTE = True
CLIENTS_REQUIRED = 2
def wait_for_health(self, pattern, timeout):
"""
@ -147,37 +143,3 @@ class TestClientLimits(CephFSTestCase):
# Client B should complete
self.fs.mds_asok(['session', 'evict', "%s" % mount_a_client_id])
rproc.wait()
@contextlib.contextmanager
def task(ctx, config):
fs = Filesystem(ctx)
# Pick out the clients we will use from the configuration
# =======================================================
if len(ctx.mounts) < 2:
raise RuntimeError("Need at least two clients")
mount_a = ctx.mounts.values()[0]
mount_b = ctx.mounts.values()[1]
if not isinstance(mount_a, FuseMount) or not isinstance(mount_b, FuseMount):
# kclient kill() power cycles nodes, so requires clients to each be on
# their own node
if mount_a.client_remote.hostname == mount_b.client_remote.hostname:
raise RuntimeError("kclient clients must be on separate nodes")
# Stash references on ctx so that we can easily debug in interactive mode
# =======================================================================
ctx.filesystem = fs
ctx.mount_a = mount_a
ctx.mount_b = mount_b
run_tests(ctx, config, TestClientLimits, {
'fs': fs,
'mount_a': mount_a,
'mount_b': mount_b
})
# Continue to any downstream tasks
# ================================
yield


@ -3,17 +3,11 @@
Teuthology task for exercising CephFS client recovery
"""
import contextlib
import logging
import time
import unittest
from teuthology.orchestra.run import CommandFailedError, ConnectionLostError
from teuthology.task import interactive
from tasks.cephfs.cephfs_test_case import CephFSTestCase, run_tests
from tasks.cephfs.filesystem import Filesystem
from tasks.cephfs.fuse_mount import FuseMount
from tasks.cephfs.cephfs_test_case import CephFSTestCase
log = logging.getLogger(__name__)
@ -25,6 +19,12 @@ MDS_RESTART_GRACE = 60
class TestClientRecovery(CephFSTestCase):
REQUIRE_KCLIENT_REMOTE = True
REQUIRE_ONE_CLIENT_REMOTE = True
CLIENTS_REQUIRED = 2
LOAD_SETTINGS = ["mds_session_timeout", "mds_reconnect_timeout", "ms_max_backoff"]
# Environment references
mds_session_timeout = None
mds_reconnect_timeout = None
@ -328,96 +328,3 @@ class TestClientRecovery(CephFSTestCase):
except (CommandFailedError, ConnectionLostError):
# We killed it, so it raises an error
pass
class LogStream(object):
def __init__(self):
self.buffer = ""
def write(self, data):
self.buffer += data
if "\n" in self.buffer:
lines = self.buffer.split("\n")
for line in lines[:-1]:
log.info(line)
self.buffer = lines[-1]
def flush(self):
pass
class InteractiveFailureResult(unittest.TextTestResult):
"""
Specialization that implements interactive-on-error style
behavior.
"""
ctx = None
def addFailure(self, test, err):
log.error(self._exc_info_to_string(err, test))
log.error("Failure in test '{0}', going interactive".format(
self.getDescription(test)
))
interactive.task(ctx=self.ctx, config=None)
def addError(self, test, err):
log.error(self._exc_info_to_string(err, test))
log.error("Error in test '{0}', going interactive".format(
self.getDescription(test)
))
interactive.task(ctx=self.ctx, config=None)
@contextlib.contextmanager
def task(ctx, config):
"""
Execute CephFS client recovery test suite.
Requires:
- An outer ceph_fuse task with at least two clients
- That the clients are on a separate host to the MDS
"""
fs = Filesystem(ctx)
# Pick out the clients we will use from the configuration
# =======================================================
if len(ctx.mounts) < 2:
raise RuntimeError("Need at least two clients")
mount_a = ctx.mounts.values()[0]
mount_b = ctx.mounts.values()[1]
if not isinstance(mount_a, FuseMount) or not isinstance(mount_b, FuseMount):
# kclient kill() power cycles nodes, so requires clients to each be on
# their own node
if mount_a.client_remote.hostname == mount_b.client_remote.hostname:
raise RuntimeError("kclient clients must be on separate nodes")
# Check we have at least one remote client for use with network-dependent tests
# =============================================================================
if mount_a.client_remote.hostname in fs.get_mds_hostnames():
raise RuntimeError("Require first client to on separate server from MDSs")
# Stash references on ctx so that we can easily debug in interactive mode
# =======================================================================
ctx.filesystem = fs
ctx.mount_a = mount_a
ctx.mount_b = mount_b
run_tests(ctx, config, TestClientRecovery, {
"mds_reconnect_timeout": int(fs.mds_asok(
['config', 'get', 'mds_reconnect_timeout'], fs.mds_ids[0]
)['mds_reconnect_timeout']),
"mds_session_timeout": int(fs.mds_asok(
['config', 'get', 'mds_session_timeout'], fs.mds_ids[0]
)['mds_session_timeout']),
"ms_max_backoff": int(fs.mds_asok(
['config', 'get', 'ms_max_backoff'], fs.mds_ids[0]
)['ms_max_backoff']),
"fs": fs,
"mount_a": mount_a,
"mount_b": mount_b
})
# Continue to any downstream tasks
# ================================
yield
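
The hunk above shows what LOAD_SETTINGS buys: the explicit mds_asok(['config', 'get', ...]) calls removed from the old task() wrapper become a one-line declaration, and CephFSTestCase.setUp() stores each value as an integer attribute before the test runs. A minimal sketch (class and test names are hypothetical; the settings are the ones used by TestClientRecovery above):

from tasks.cephfs.cephfs_test_case import CephFSTestCase


class TestUsesTimeouts(CephFSTestCase):
    LOAD_SETTINGS = ["mds_session_timeout", "mds_reconnect_timeout"]

    # Populated by setUp() from the first MDS's admin socket
    mds_session_timeout = None
    mds_reconnect_timeout = None

    def test_settings_loaded(self):
        self.assertGreater(self.mds_session_timeout, 0)
        self.assertGreater(self.mds_reconnect_timeout, 0)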


@ -1,7 +1,7 @@
import contextlib
from textwrap import dedent
from tasks.cephfs.cephfs_test_case import run_tests, CephFSTestCase
from tasks.cephfs.filesystem import Filesystem, ObjectNotFound, ROOT_INO
from tasks.cephfs.cephfs_test_case import CephFSTestCase
from tasks.cephfs.filesystem import ObjectNotFound, ROOT_INO
class TestFlush(CephFSTestCase):
@ -111,28 +111,3 @@ class TestFlush(CephFSTestCase):
with self.assertRaises(ObjectNotFound):
self.fs.read_backtrace(file_ino)
self.assertEqual(self.fs.list_dirfrag(ROOT_INO), [])
@contextlib.contextmanager
def task(ctx, config):
fs = Filesystem(ctx)
# Pick out the clients we will use from the configuration
# =======================================================
if len(ctx.mounts) < 1:
raise RuntimeError("Need at least one client")
mount = ctx.mounts.values()[0]
# Stash references on ctx so that we can easily debug in interactive mode
# =======================================================================
ctx.filesystem = fs
ctx.mount = mount
run_tests(ctx, config, TestFlush, {
'fs': fs,
'mount_a': mount,
})
# Continue to any downstream tasks
# ================================
yield


@ -1,27 +1,27 @@
"""
Exercise the MDS and Client behaviour when the cluster fills up.
"""
import contextlib
import json
import logging
import os
from textwrap import dedent
import time
from teuthology.orchestra.run import CommandFailedError
from tasks.cephfs.filesystem import Filesystem
from tasks.cephfs.cephfs_test_case import CephFSTestCase, run_tests
from unittest import case
from tasks.cephfs.cephfs_test_case import CephFSTestCase
log = logging.getLogger(__name__)
class TestClusterFull(CephFSTestCase):
"""
Exercise the MDS and Client behaviour when the cluster fills up.
"""
# Persist-between-tests constants
pool_capacity = None
CLIENTS_REQUIRED = 2
def setUp(self):
super(TestClusterFull, self).setUp()
@ -31,6 +31,12 @@ class TestClusterFull(CephFSTestCase):
# tests (reason as yet unclear, but this dodges the issue)
TestClusterFull.pool_capacity = self.fs.get_pool_df(self._data_pool_name())['max_avail']
objectstore = self.fs.get_config("osd_objectstore", "osd")
if objectstore != "memstore":
# You certainly *could* run this on a real OSD, but you don't want to sit
# here for hours waiting for the test to fill up a 1TB drive!
raise case.SkipTest("Require `memstore` OSD backend to simulate full drives")
def test_barrier(self):
"""
That when an OSD epoch barrier is set on an MDS, subsequently
@ -333,31 +339,3 @@ class TestClusterFull(CephFSTestCase):
""")
self._remote_write_test(remote_script)
@contextlib.contextmanager
def task(ctx, config):
fs = Filesystem(ctx)
# Pick out the clients we will use from the configuration
# =======================================================
if len(ctx.mounts) < 2:
raise RuntimeError("Need at least two clients")
mount_a = ctx.mounts.values()[0]
mount_b = ctx.mounts.values()[1]
# Stash references on ctx so that we can easily debug in interactive mode
# =======================================================================
ctx.filesystem = fs
ctx.mount_a = mount_a
ctx.mount_b = mount_b
run_tests(ctx, config, TestClusterFull, {
'fs': fs,
'mount_a': mount_a,
'mount_b': mount_b
})
# Continue to any downstream tasks
# ================================
yield


@ -3,7 +3,6 @@
Test our tools for recovering the content of damaged journals
"""
import contextlib
import json
import logging
import os
@ -11,9 +10,10 @@ from textwrap import dedent
import time
from StringIO import StringIO
import re
from teuthology.orchestra.run import CommandFailedError
from tasks.cephfs.filesystem import Filesystem, ObjectNotFound, ROOT_INO
from tasks.cephfs.cephfs_test_case import CephFSTestCase, run_tests
from tasks.cephfs.filesystem import ObjectNotFound, ROOT_INO
from tasks.cephfs.cephfs_test_case import CephFSTestCase
from teuthology.orchestra import run
@ -21,6 +21,8 @@ log = logging.getLogger(__name__)
class TestJournalRepair(CephFSTestCase):
MDSS_REQUIRED = 2
def test_inject_to_empty(self):
"""
That when some dentries in the journal but nothing is in
@ -375,28 +377,3 @@ class TestJournalRepair(CephFSTestCase):
"pending_destroy": []},
"result": 0}
)
@contextlib.contextmanager
def task(ctx, config):
fs = Filesystem(ctx)
# Pick out the clients we will use from the configuration
# =======================================================
if len(ctx.mounts) < 1:
raise RuntimeError("Need at least one clients")
mount_a = ctx.mounts.values()[0]
# Stash references on ctx so that we can easily debug in interactive mode
# =======================================================================
ctx.filesystem = fs
ctx.mount_a = mount_a
run_tests(ctx, config, TestJournalRepair, {
'fs': fs,
'mount_a': mount_a
})
# Continue to any downstream tasks
# ================================
yield

tasks/cephfs_test_runner.py (new file, 186 lines)

@ -0,0 +1,186 @@
import contextlib
import logging
import os
import unittest
from unittest import suite, loader, case
from teuthology.task import interactive
from tasks.cephfs.filesystem import Filesystem
log = logging.getLogger(__name__)
class DecoratingLoader(loader.TestLoader):
"""
A specialization of TestLoader that tags some extra attributes
onto test classes as they are loaded.
"""
def __init__(self, params):
self._params = params
super(DecoratingLoader, self).__init__()
def _apply_params(self, obj):
for k, v in self._params.items():
setattr(obj, k, v)
def loadTestsFromTestCase(self, testCaseClass):
self._apply_params(testCaseClass)
return super(DecoratingLoader, self).loadTestsFromTestCase(testCaseClass)
def loadTestsFromName(self, name, module=None):
result = super(DecoratingLoader, self).loadTestsFromName(name, module)
# Special case: when called with the name of a single test method, we get
# a suite containing one TestCase instance
tests_in_result = list(result)
if len(tests_in_result) == 1 and isinstance(tests_in_result[0], case.TestCase):
self._apply_params(tests_in_result[0])
return result
class LogStream(object):
def __init__(self):
self.buffer = ""
def write(self, data):
self.buffer += data
if "\n" in self.buffer:
lines = self.buffer.split("\n")
for line in lines[:-1]:
log.info(line)
self.buffer = lines[-1]
def flush(self):
pass
class InteractiveFailureResult(unittest.TextTestResult):
"""
Specialization that implements interactive-on-error style
behavior.
"""
ctx = None
def addFailure(self, test, err):
log.error(self._exc_info_to_string(err, test))
log.error("Failure in test '{0}', going interactive".format(
self.getDescription(test)
))
interactive.task(ctx=self.ctx, config=None)
def addError(self, test, err):
log.error(self._exc_info_to_string(err, test))
log.error("Error in test '{0}', going interactive".format(
self.getDescription(test)
))
interactive.task(ctx=self.ctx, config=None)
@contextlib.contextmanager
def task(ctx, config):
"""
Run the CephFS test cases.
Run everything in tasks/cephfs/test_*.py:
::
tasks:
- install:
- ceph:
- ceph-fuse:
- cephfs_test_runner:
`modules` argument allows running only some specific modules:
::
tasks:
...
- cephfs_test_runner:
modules:
- tasks.cephfs.test_sessionmap
- tasks.cephfs.test_auto_repair
By default, any cases that can't be run on the current cluster configuration
will generate a failure. When the optional `fail_on_skip` argument is set
to false, any tests that can't be run on the current configuration will
simply be skipped:
::
tasks:
...
- cephfs_test_runner:
fail_on_skip: false
"""
fs = Filesystem(ctx)
# Mount objects, sorted by ID
mounts = [v for k, v in sorted(ctx.mounts.items(), lambda a, b: cmp(a[0], b[0]))]
decorating_loader = DecoratingLoader({
"ctx": ctx,
"mounts": mounts,
"fs": fs
})
fail_on_skip = config.get('fail_on_skip', True)
# Put useful things onto ctx for interactive debugging
ctx.fs = fs
# Depending on config, either load specific modules, or scan for modules
if config and 'modules' in config and config['modules']:
module_suites = []
for mod_name in config['modules']:
# Module names like tasks.cephfs.test_auto_repair
module_suites.append(decorating_loader.loadTestsFromName(mod_name))
overall_suite = suite.TestSuite(module_suites)
else:
# Default, run all tests
overall_suite = decorating_loader.discover(
os.path.join(
os.path.dirname(os.path.abspath(__file__)),
"cephfs/"
)
)
if ctx.config.get("interactive-on-error", False):
InteractiveFailureResult.ctx = ctx
result_class = InteractiveFailureResult
else:
result_class = unittest.TextTestResult
class LoggingResult(result_class):
def startTest(self, test):
log.info("Starting test: {0}".format(self.getDescription(test)))
return super(LoggingResult, self).startTest(test)
def addSkip(self, test, reason):
if fail_on_skip:
# Don't just call addFailure because that requires a traceback
self.failures.append((test, reason))
else:
super(LoggingResult, self).addSkip(test, reason)
# Execute!
result = unittest.TextTestRunner(
stream=LogStream(),
resultclass=LoggingResult,
verbosity=2,
failfast=True).run(overall_suite)
if not result.wasSuccessful():
result.printErrors() # duplicate output at end for convenience
bad_tests = []
for test, error in result.errors:
bad_tests.append(str(test))
for test, failure in result.failures:
bad_tests.append(str(test))
raise RuntimeError("Test failure: {0}".format(", ".join(bad_tests)))
yield