diff --git a/tasks/cephfs/filesystem.py b/tasks/cephfs/filesystem.py
index df0d2b21a2b..2db056d2bbd 100644
--- a/tasks/cephfs/filesystem.py
+++ b/tasks/cephfs/filesystem.py
@@ -50,14 +50,20 @@ class Filesystem(object):
 
         return list(result)
 
+    def get_config(self, key):
+        return self.mds_asok(['config', 'get', key])[key]
+
     def set_ceph_conf(self, subsys, key, value):
-        # Set config so that journal will be created in older format
-        if 'mds' not in self._ctx.ceph.conf:
-            self._ctx.ceph.conf['mds'] = {}
-        self._ctx.ceph.conf['mds'][key] = value
+        if subsys not in self._ctx.ceph.conf:
+            self._ctx.ceph.conf[subsys] = {}
+        self._ctx.ceph.conf[subsys][key] = value
         write_conf(self._ctx)  # XXX because we don't have the ceph task's config object, if they
         # used a different config path this won't work.
 
+    def clear_ceph_conf(self, subsys, key):
+        del self._ctx.ceph.conf[subsys][key]
+        write_conf(self._ctx)
+
     def are_daemons_healthy(self):
         """
         Return true if all daemons are in one of active, standby, standby-replay
@@ -159,6 +165,7 @@ class Filesystem(object):
         """
         temp_bin_path = '/tmp/out.bin'
 
+        # FIXME get the metadata pool name from mdsmap instead of hardcoding
         self.client_remote.run(args=[
             'sudo', 'rados', '-p', 'metadata', 'get', object_id, temp_bin_path
         ])
diff --git a/tasks/cephfs/fuse_mount.py b/tasks/cephfs/fuse_mount.py
index 51ceccc496c..37ef0789dfe 100644
--- a/tasks/cephfs/fuse_mount.py
+++ b/tasks/cephfs/fuse_mount.py
@@ -120,6 +120,7 @@ class FuseMount(CephFSMount):
 
     def umount(self):
         try:
+            log.info('Running fusermount -u on {name}...'.format(name=self.client_remote.name))
            self.client_remote.run(
                args=[
                    'sudo',
diff --git a/tasks/cephfs/mount.py b/tasks/cephfs/mount.py
index 1d34079ed9b..114c3f50dd2 100644
--- a/tasks/cephfs/mount.py
+++ b/tasks/cephfs/mount.py
@@ -99,7 +99,7 @@ class CephFSMount(object):
 
     def _run_python(self, pyscript):
        return self.client_remote.run(args=[
-            'sudo', 'daemon-helper', 'kill', 'python', '-c', pyscript
+            'sudo', 'adjust-ulimits', 'daemon-helper', 'kill', 'python', '-c', pyscript
        ], wait=False, stdin=run.PIPE)
 
     def run_shell(self, args):
@@ -169,6 +169,41 @@ class CephFSMount(object):
        self.background_procs.append(rproc)
        return rproc
 
+    def open_n_background(self, fs_path, count):
+        """
+        Open `count` files for writing, hold them open in a background process
+
+        :param fs_path: Path relative to CephFS root, e.g. "foo/bar"
+        :return: a RemoteProcess
+        """
+        assert(self.is_mounted())
+
+        abs_path = os.path.join(self.mountpoint, fs_path)
+
+        pyscript = dedent("""
+            import sys
+            import time
+            import os
+
+            n = {count}
+            abs_path = "{abs_path}"
+
+            if not os.path.exists(os.path.dirname(abs_path)):
+                os.makedirs(os.path.dirname(abs_path))
+
+            handles = []
+            for i in range(0, n):
+                fname = "{{0}}_{{1}}".format(abs_path, i)
+                handles.append(open(fname, 'w'))
+
+            while True:
+                time.sleep(1)
+            """).format(abs_path=abs_path, count=count)
+
+        rproc = self._run_python(pyscript)
+        self.background_procs.append(rproc)
+        return rproc
+
     def teardown(self):
        for p in self.background_procs:
            log.info("Terminating background process")
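
The changes above are plumbing for the new test module that follows: Filesystem.get_config() reads a setting from the running MDS, set_ceph_conf()/clear_ceph_conf() apply and remove temporary ceph.conf overrides, and CephFSMount.open_n_background() pins caps on a client by holding files open. A minimal sketch of how they are meant to be combined, assuming a Filesystem instance `fs` and a mounted client `mount_a` like the ones constructed by the task further down:

    recall_timeout = int(fs.get_config("mds_recall_state_timeout"))   # ask the live MDS for a setting
    fs.set_ceph_conf('mds', 'mds cache size', 200)                    # write a temporary override to ceph.conf
    fs.mds_restart()                                                  # restart so the override takes effect
    open_proc = mount_a.open_n_background("subdir/files", 250)        # hold 250 files open, pinning their caps
    # ... assertions against the MDS session state go here ...
    fs.clear_ceph_conf('mds', 'mds cache size')                       # drop the override again
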
+""" + +import contextlib +import logging +import time + +from teuthology.orchestra.run import CommandFailedError + +from tasks.cephfs.filesystem import Filesystem +from tasks.cephfs.fuse_mount import FuseMount +from tasks.cephfs.cephfs_test_case import CephFSTestCase, run_tests + + +log = logging.getLogger(__name__) + + +# Arbitrary timeouts for operations involving restarting +# an MDS or waiting for it to come up +MDS_RESTART_GRACE = 60 + +# Hardcoded values from Server::recall_client_state +CAP_RECALL_RATIO = 0.8 +CAP_RECALL_MIN = 100 + + +def wait_until_equal(get_fn, expect_val, timeout, reject_fn=None): + period = 5 + elapsed = 0 + while True: + val = get_fn() + if val == expect_val: + return + elif reject_fn and reject_fn(val): + raise RuntimeError("wait_until_equal: forbidden value {0} seen".format(val)) + else: + if elapsed >= timeout: + raise RuntimeError("Timed out after {0} seconds waiting for {1} (currently {2})".format( + elapsed, expect_val, val + )) + else: + log.debug("wait_until_equal: {0} != {1}, waiting...".format(val, expect_val)) + time.sleep(period) + elapsed += period + + log.debug("wait_until_equal: success") + + +def wait_until_true(condition, timeout): + period = 5 + elapsed = 0 + while True: + if condition(): + return + else: + if elapsed >= timeout: + raise RuntimeError("Timed out after {0} seconds".format(elapsed)) + else: + log.debug("wait_until_equal: waiting...") + time.sleep(period) + elapsed += period + + log.debug("wait_until_equal: success") + + +class TestClientLimits(CephFSTestCase): + # Environment references + mount_a = None + mount_b = None + mds_session_timeout = None + mds_reconnect_timeout = None + ms_max_backoff = None + + def __init__(self, *args, **kwargs): + super(TestClientLimits, self).__init__(*args, **kwargs) + + self.configs_set = set() + + def set_conf(self, subsys, key, value): + self.configs_set.add((subsys, key)) + self.fs.set_ceph_conf(subsys, key, value) + + def setUp(self): + self.fs.mds_restart() + self.mount_a.mount() + self.mount_a.wait_until_mounted() + self.mount_b.mount() + self.mount_b.wait_until_mounted() + + def tearDown(self): + self.fs.clear_firewall() + self.mount_a.teardown() + self.mount_b.teardown() + + for subsys, key in self.configs_set: + self.fs.clear_ceph_conf(subsys, key) + + def wait_for_health(self, pattern, timeout): + """ + Wait until 'ceph health' contains a single message matching the pattern + """ + def seen_health_warning(): + health = self.fs.mon_manager.get_mon_health() + summary_strings = [s['summary'] for s in health['summary']] + if len(summary_strings) == 0: + log.debug("Not expected number of summary strings ({0})".format(summary_strings)) + return False + elif len(summary_strings) == 1 and pattern in summary_strings[0]: + return True + else: + raise RuntimeError("Unexpected health messages: {0}".format(summary_strings)) + + wait_until_true(seen_health_warning, timeout) + + def _test_client_pin(self, use_subdir): + """ + When a client pins an inode in its cache, for example because the file is held open, + it should reject requests from the MDS to trim these caps. The MDS should complain + to the user that it is unable to enforce its cache size limits because of this + objectionable client. 
+class TestClientLimits(CephFSTestCase):
+    # Environment references
+    mount_a = None
+    mount_b = None
+    mds_session_timeout = None
+    mds_reconnect_timeout = None
+    ms_max_backoff = None
+
+    def __init__(self, *args, **kwargs):
+        super(TestClientLimits, self).__init__(*args, **kwargs)
+
+        self.configs_set = set()
+
+    def set_conf(self, subsys, key, value):
+        self.configs_set.add((subsys, key))
+        self.fs.set_ceph_conf(subsys, key, value)
+
+    def setUp(self):
+        self.fs.mds_restart()
+        self.mount_a.mount()
+        self.mount_a.wait_until_mounted()
+        self.mount_b.mount()
+        self.mount_b.wait_until_mounted()
+
+    def tearDown(self):
+        self.fs.clear_firewall()
+        self.mount_a.teardown()
+        self.mount_b.teardown()
+
+        for subsys, key in self.configs_set:
+            self.fs.clear_ceph_conf(subsys, key)
+
+    def wait_for_health(self, pattern, timeout):
+        """
+        Wait until 'ceph health' contains a single message matching the pattern
+        """
+        def seen_health_warning():
+            health = self.fs.mon_manager.get_mon_health()
+            summary_strings = [s['summary'] for s in health['summary']]
+            if len(summary_strings) == 0:
+                log.debug("Unexpected number of summary strings ({0})".format(summary_strings))
+                return False
+            elif len(summary_strings) == 1 and pattern in summary_strings[0]:
+                return True
+            else:
+                raise RuntimeError("Unexpected health messages: {0}".format(summary_strings))
+
+        wait_until_true(seen_health_warning, timeout)
+
+    def _test_client_pin(self, use_subdir):
+        """
+        When a client pins an inode in its cache, for example because the file is held open,
+        it should reject requests from the MDS to trim these caps.  The MDS should complain
+        to the user that it is unable to enforce its cache size limits because of this
+        objectionable client.
+
+        :param use_subdir: whether to put test files in a subdir or use root
+        """
+
+        cache_size = 200
+        open_files = 250
+
+        self.fs.set_ceph_conf('mds', 'mds cache size', cache_size)
+        self.fs.mds_restart()
+
+        mount_a_client_id = self.mount_a.get_client_id()
+        path = "subdir/mount_a" if use_subdir else "mount_a"
+        open_proc = self.mount_a.open_n_background(path, open_files)
+
+        # Client should now hold:
+        # `open_files` caps for the open files
+        # 1 cap for root
+        # 1 cap for the subdir (only when use_subdir is set)
+        wait_until_equal(lambda: self.get_session(mount_a_client_id)['num_caps'],
+                         open_files + (2 if use_subdir else 1),
+                         timeout=600,
+                         reject_fn=lambda x: x > open_files + 2)
+
+        # MDS should not be happy about that, as the client is failing to comply
+        # with the SESSION_RECALL messages it is being sent
+        mds_recall_state_timeout = int(self.fs.get_config("mds_recall_state_timeout"))
+        self.wait_for_health("failing to respond to cache pressure", mds_recall_state_timeout + 10)
+
+        # When the client closes the files, it should retain only as many caps as allowed
+        # under the SESSION_RECALL policy
+        log.info("Terminating process holding files open")
+        open_proc.stdin.close()
+        try:
+            open_proc.wait()
+        except CommandFailedError:
+            # We killed it, so it raises an error
+            pass
+
+        # The remaining caps should comply with the numbers sent from the MDS in the
+        # SESSION_RECALL message, which depend on the cache size and the recall ratio
+        wait_until_equal(
+            lambda: self.get_session(mount_a_client_id)['num_caps'],
+            int(cache_size * 0.8),
+            timeout=600,
+            reject_fn=lambda x: x < int(cache_size*.8))
+
+    def test_client_pin_root(self):
+        self._test_client_pin(False)
+
+    def test_client_pin(self):
+        self._test_client_pin(True)
+
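+    # The next test covers the reverse failure mode: not a client that holds too many
+    # caps, but a client that never answers cap revocation.  The expected flow, as
+    # exercised below: client A takes write caps on file1, the injected bug stops it
+    # from releasing them when client B's write forces a revoke, the MDS raises the
+    # "failing to respond to capability release" warning after mds_revoke_cap_timeout,
+    # and evicting client A's session is what finally lets client B make progress.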
+    def test_client_release_bug(self):
+        """
+        When a client has a bug (which we will simulate) preventing it from releasing caps,
+        the MDS should notice that releases are not being sent promptly, and generate a health
+        metric to that effect.
+        """
+
+        self.set_conf('client.{0}'.format(self.mount_a.client_id), 'client inject release failure', 'true')
+        self.mount_a.teardown()
+        self.mount_a.mount()
+        self.mount_a.wait_until_mounted()
+        mount_a_client_id = self.mount_a.get_client_id()
+
+        # Client A creates a file.  It will hold the write caps on the file, and later
+        # (simulated bug) fail to comply with the MDS's request to release that cap
+        self.mount_a.run_shell(["touch", "file1"])
+
+        # Client B tries to write to the file that client A created
+        rproc = self.mount_b.write_background("file1")
+
+        # After mds_revoke_cap_timeout, we should see a health warning (extra lag from
+        # MDS beacon period)
+        mds_revoke_cap_timeout = int(self.fs.get_config("mds_revoke_cap_timeout"))
+        self.wait_for_health("failing to respond to capability release", mds_revoke_cap_timeout + 10)
+
+        # Client B should still be stuck
+        self.assertFalse(rproc.finished)
+
+        # Kill client A
+        self.mount_a.kill()
+        self.mount_a.kill_cleanup()
+
+        # Client B should complete after the MDS evicts client A's session
+        self.fs.mds_asok(['session', 'evict', "%s" % mount_a_client_id])
+        rproc.wait()
+
+
+@contextlib.contextmanager
+def task(ctx, config):
+    fs = Filesystem(ctx, config)
+
+    # Pick out the clients we will use from the configuration
+    # =======================================================
+    if len(ctx.mounts) < 2:
+        raise RuntimeError("Need at least two clients")
+    mount_a = ctx.mounts.values()[0]
+    mount_b = ctx.mounts.values()[1]
+
+    if not isinstance(mount_a, FuseMount):
+        # TODO: make kclient mount capable of all the same test tricks as ceph_fuse
+        raise RuntimeError("Require FUSE clients")
+
+    # Stash references on ctx so that we can easily debug in interactive mode
+    # =======================================================================
+    ctx.filesystem = fs
+    ctx.mount_a = mount_a
+    ctx.mount_b = mount_b
+
+    run_tests(ctx, config, TestClientLimits, {
+        'fs': fs,
+        'mount_a': mount_a,
+        'mount_b': mount_b
+    })
+
+    # Continue to any downstream tasks
+    # ================================
+    yield
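
Like the other modules under tasks/, this one is presumably driven from a teuthology job YAML (an mds_client_limits entry following the ceph and ceph-fuse tasks). Because task() stashes its objects on ctx, an interactive teuthology session can reuse the same state for debugging; a brief sketch, assuming the attribute names assigned above and the helpers referenced elsewhere in this patch:

    # Poke at the objects task() left on ctx from an interactive session.
    ctx.filesystem.mds_asok(['session', 'ls'])   # list client sessions on the MDS admin socket
    ctx.mount_a.run_shell(['ls'])                # run a command on the mounted client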