from io import StringIO from tasks.cephfs.fuse_mount import FuseMount from tasks.cephfs.cephfs_test_case import CephFSTestCase, classhook from teuthology.exceptions import CommandFailedError from textwrap import dedent from threading import Thread import errno import platform import time import json import logging import os import re log = logging.getLogger(__name__) class TestMisc(CephFSTestCase): CLIENTS_REQUIRED = 2 def test_statfs_on_deleted_fs(self): """ That statfs does not cause monitors to SIGSEGV after fs deletion. """ self.mount_b.umount_wait() self.mount_a.run_shell_payload("stat -f .") self.fs.delete_all_filesystems() # This will hang either way, run in background. p = self.mount_a.run_shell_payload("stat -f .", wait=False, timeout=60, check_status=False) time.sleep(30) self.assertFalse(p.finished) # the process is stuck in uninterruptible sleep, just kill the mount self.mount_a.umount_wait(force=True) p.wait() def test_fuse_mount_on_already_mounted_path(self): if platform.system() != "Linux": self.skipTest("Require Linux platform") if not isinstance(self.mount_a, FuseMount): self.skipTest("Require FUSE client") # Try to mount already mounted path # expecting EBUSY error try: mount_cmd = ['sudo'] + self.mount_a._mount_bin + [self.mount_a.hostfs_mntpt] self.mount_a.client_remote.run(args=mount_cmd, stderr=StringIO(), stdout=StringIO(), timeout=60, omit_sudo=False) except CommandFailedError as e: self.assertEqual(e.exitstatus, errno.EBUSY) else: self.fail("Expected EBUSY") def test_getattr_caps(self): """ Check if MDS recognizes the 'mask' parameter of open request. The parameter allows client to request caps when opening file """ if not isinstance(self.mount_a, FuseMount): self.skipTest("Require FUSE client") # Enable debug. Client will requests CEPH_CAP_XATTR_SHARED # on lookup/open self.mount_b.umount_wait() self.set_conf('client', 'client debug getattr caps', 'true') self.mount_b.mount_wait() # create a file and hold it open. MDS will issue CEPH_CAP_EXCL_* # to mount_a self.mount_a.open_background("testfile") self.mount_b.wait_for_visible("testfile") # this triggers a lookup request and an open request. The debug # code will check if lookup/open reply contains xattrs self.mount_b.run_shell(["cat", "testfile"]) def test_root_rctime(self): """ Check that the root inode has a non-default rctime on startup. """ t = time.time() rctime = self.mount_a.getfattr(".", "ceph.dir.rctime") log.info("rctime = {}".format(rctime)) self.assertGreaterEqual(float(rctime), t - 10) def test_fs_new(self): self.mount_a.umount_wait() self.mount_b.umount_wait() data_pool_name = self.fs.get_data_pool_name() self.fs.fail() self.run_ceph_cmd('fs', 'rm', self.fs.name, '--yes-i-really-mean-it') self.run_ceph_cmd('osd', 'pool', 'delete', self.fs.metadata_pool_name, self.fs.metadata_pool_name, '--yes-i-really-really-mean-it') self.run_ceph_cmd('osd', 'pool', 'create', self.fs.metadata_pool_name, '--pg_num_min', str(self.fs.pg_num_min)) # insert a garbage object self.fs.radosm(["put", "foo", "-"], stdin=StringIO("bar")) def get_pool_df(fs, name): try: return fs.get_pool_df(name)['objects'] > 0 except RuntimeError: return False self.wait_until_true(lambda: get_pool_df(self.fs, self.fs.metadata_pool_name), timeout=30) try: self.run_ceph_cmd('fs', 'new', self.fs.name, self.fs.metadata_pool_name, data_pool_name) except CommandFailedError as e: self.assertEqual(e.exitstatus, errno.EINVAL) else: raise AssertionError("Expected EINVAL") self.run_ceph_cmd('fs', 'new', self.fs.name, self.fs.metadata_pool_name, data_pool_name, "--force") self.run_ceph_cmd('fs', 'fail', self.fs.name) self.run_ceph_cmd('fs', 'rm', self.fs.name, '--yes-i-really-mean-it') self.run_ceph_cmd('osd', 'pool', 'delete', self.fs.metadata_pool_name, self.fs.metadata_pool_name, '--yes-i-really-really-mean-it') self.run_ceph_cmd('osd', 'pool', 'create', self.fs.metadata_pool_name, '--pg_num_min', str(self.fs.pg_num_min)) self.run_ceph_cmd('fs', 'new', self.fs.name, self.fs.metadata_pool_name, data_pool_name, '--allow_dangerous_metadata_overlay') def test_cap_revoke_nonresponder(self): """ Check that a client is evicted if it has not responded to cap revoke request for configured number of seconds. """ session_timeout = self.fs.get_var("session_timeout") eviction_timeout = session_timeout / 2.0 self.fs.mds_asok(['config', 'set', 'mds_cap_revoke_eviction_timeout', str(eviction_timeout)]) cap_holder = self.mount_a.open_background() # Wait for the file to be visible from another client, indicating # that mount_a has completed its network ops self.mount_b.wait_for_visible() # Simulate client death self.mount_a.suspend_netns() try: # The waiter should get stuck waiting for the capability # held on the MDS by the now-dead client A cap_waiter = self.mount_b.write_background() a = time.time() time.sleep(eviction_timeout) cap_waiter.wait() b = time.time() cap_waited = b - a log.info("cap_waiter waited {0}s".format(cap_waited)) # check if the cap is transferred before session timeout kicked in. # this is a good enough check to ensure that the client got evicted # by the cap auto evicter rather than transitioning to stale state # and then getting evicted. self.assertLess(cap_waited, session_timeout, "Capability handover took {0}, expected less than {1}".format( cap_waited, session_timeout )) self.assertTrue(self.mds_cluster.is_addr_blocklisted( self.mount_a.get_global_addr())) self.mount_a._kill_background(cap_holder) finally: self.mount_a.resume_netns() def test_filtered_df(self): pool_name = self.fs.get_data_pool_name() raw_df = self.fs.get_pool_df(pool_name) raw_avail = float(raw_df["max_avail"]) out = self.get_ceph_cmd_stdout('osd', 'pool', 'get', pool_name, 'size', '-f', 'json-pretty') _ = json.loads(out) proc = self.mount_a.run_shell(['df', '.']) output = proc.stdout.getvalue() fs_avail = output.split('\n')[1].split()[3] fs_avail = float(fs_avail) * 1024 ratio = raw_avail / fs_avail self.assertTrue(0.9 < ratio < 1.1) def test_dump_inode(self): info = self.fs.mds_asok(['dump', 'inode', '1']) self.assertEqual(info['path'], "/") def test_dump_inode_hexademical(self): self.mount_a.run_shell(["mkdir", "-p", "foo"]) ino = self.mount_a.path_to_ino("foo") self.assertTrue(type(ino) is int) info = self.fs.mds_asok(['dump', 'inode', hex(ino)]) self.assertEqual(info['path'], "/foo") def test_dump_dir(self): self.mount_a.run_shell(["mkdir", "-p", "foo/bar"]) dirs = self.fs.mds_asok(['dump', 'dir', '/foo']) self.assertTrue(type(dirs) is list) for dir in dirs: self.assertEqual(dir['path'], "/foo") self.assertFalse("dentries" in dir) dirs = self.fs.mds_asok(['dump', 'dir', '/foo', '--dentry_dump']) self.assertTrue(type(dirs) is list) found_dentry = False for dir in dirs: self.assertEqual(dir['path'], "/foo") self.assertTrue(type(dir['dentries']) is list) if found_dentry: continue for dentry in dir['dentries']: if dentry['path'] == "foo/bar": found_dentry = True break self.assertTrue(found_dentry) def test_fs_lsflags(self): """ Check that the lsflags displays the default state and the new state of flags """ # Set some flags self.fs.set_joinable(False) self.fs.set_allow_new_snaps(False) self.fs.set_allow_standby_replay(True) lsflags = json.loads(self.get_ceph_cmd_stdout( 'fs', 'lsflags', self.fs.name, "--format=json-pretty")) self.assertEqual(lsflags["joinable"], False) self.assertEqual(lsflags["allow_snaps"], False) self.assertEqual(lsflags["allow_multimds_snaps"], True) self.assertEqual(lsflags["allow_standby_replay"], True) def _test_sync_stuck_for_around_5s(self, dir_path, file_sync=False): self.mount_a.run_shell(["mkdir", dir_path]) sync_dir_pyscript = dedent(""" import os path = "{path}" dfd = os.open(path, os.O_DIRECTORY) os.fsync(dfd) os.close(dfd) """.format(path=dir_path)) # run create/delete directories and test the sync time duration for i in range(300): for j in range(5): self.mount_a.run_shell(["mkdir", os.path.join(dir_path, f"{i}_{j}")]) start = time.time() if file_sync: self.mount_a.run_shell(['python3', '-c', sync_dir_pyscript], timeout=4) else: self.mount_a.run_shell(["sync"], timeout=4) # the real duration should be less than the rough one duration = time.time() - start log.info(f"sync mkdir i = {i}, rough duration = {duration}") for j in range(5): self.mount_a.run_shell(["rm", "-rf", os.path.join(dir_path, f"{i}_{j}")]) start = time.time() if file_sync: self.mount_a.run_shell(['python3', '-c', sync_dir_pyscript], timeout=4) else: self.mount_a.run_shell(["sync"], timeout=4) # the real duration should be less than the rough one duration = time.time() - start log.info(f"sync rmdir i = {i}, rough duration = {duration}") self.mount_a.run_shell(["rm", "-rf", dir_path]) def test_filesystem_sync_stuck_for_around_5s(self): """ To check whether the filesystem sync will be stuck to wait for the mdlog to be flushed for at most 5 seconds. """ dir_path = "filesystem_sync_do_not_wait_mdlog_testdir" self._test_sync_stuck_for_around_5s(dir_path) def test_file_sync_stuck_for_around_5s(self): """ To check whether the fsync will be stuck to wait for the mdlog to be flushed for at most 5 seconds. """ dir_path = "file_sync_do_not_wait_mdlog_testdir" self._test_sync_stuck_for_around_5s(dir_path, True) def test_file_filesystem_sync_crash(self): """ To check whether the kernel crashes when doing the file/filesystem sync. """ stop_thread = False dir_path = "file_filesystem_sync_crash_testdir" self.mount_a.run_shell(["mkdir", dir_path]) def mkdir_rmdir_thread(mount, path): #global stop_thread log.info(" mkdir_rmdir_thread starting...") num = 0 while not stop_thread: n = num m = num for __ in range(10): mount.run_shell(["mkdir", os.path.join(path, f"{n}")]) n += 1 for __ in range(10): mount.run_shell(["rm", "-rf", os.path.join(path, f"{m}")]) m += 1 num += 10 log.info(" mkdir_rmdir_thread stopped") def filesystem_sync_thread(mount, path): #global stop_thread log.info(" filesystem_sync_thread starting...") while not stop_thread: mount.run_shell(["sync"]) log.info(" filesystem_sync_thread stopped") def file_sync_thread(mount, path): #global stop_thread log.info(" file_sync_thread starting...") pyscript = dedent(""" import os path = "{path}" dfd = os.open(path, os.O_DIRECTORY) os.fsync(dfd) os.close(dfd) """.format(path=path)) while not stop_thread: mount.run_shell(['python3', '-c', pyscript]) log.info(" file_sync_thread stopped") td1 = Thread(target=mkdir_rmdir_thread, args=(self.mount_a, dir_path,)) td2 = Thread(target=filesystem_sync_thread, args=(self.mount_a, dir_path,)) td3 = Thread(target=file_sync_thread, args=(self.mount_a, dir_path,)) td1.start() td2.start() td3.start() time.sleep(1200) # run 20 minutes stop_thread = True td1.join() td2.join() td3.join() self.mount_a.run_shell(["rm", "-rf", dir_path]) def test_dump_inmemory_log_on_client_eviction(self): """ That the in-memory logs are dumped during a client eviction event. """ self.fs.mds_asok(['config', 'set', 'debug_mds', '1/10']) self.fs.mds_asok(['config', 'set', 'mds_extraordinary_events_dump_interval', '1']) mount_a_client_id = self.mount_a.get_global_id() infos = self.fs.status().get_ranks(self.fs.id) #evict the client self.fs.mds_asok(['session', 'evict', "%s" % mount_a_client_id]) time.sleep(10) #wait for 10 seconds for the logs dumping to complete. #The client is evicted, so unmount it. try: self.mount_a.umount_wait(require_clean=True, timeout=30) except: pass #continue with grepping the log eviction_log = f"Evicting (\(and blocklisting\) )?client session {mount_a_client_id} \(.+:.+/.+\)" search_range = "/^--- begin dump of recent events ---$/,/^--- end dump of recent events ---$/p" for info in infos: mds_id = info['name'] try: remote = self.fs.mon_manager.find_remote('mds', mds_id) out = remote.run(args=["sed", "-n", "{0}".format(search_range), f"/var/log/ceph/{self.mount_a.cluster_name}-mds.{mds_id}.log"], stdout=StringIO(), timeout=30) except: continue #continue with the next info if out.stdout and re.search(eviction_log, out.stdout.getvalue().strip()): return self.assertTrue(False, "Failed to dump in-memory logs during client eviction") def test_dump_inmemory_log_on_missed_beacon_ack_from_monitors(self): """ That the in-memory logs are dumped when the mds misses beacon ACKs from monitors. """ self.fs.mds_asok(['config', 'set', 'debug_mds', '1/10']) self.fs.mds_asok(['config', 'set', 'mds_extraordinary_events_dump_interval', '1']) try: mons = json.loads(self.get_ceph_cmd_stdout('mon', 'dump', '-f', 'json'))['mons'] except: self.assertTrue(False, "Error fetching monitors") #Freeze all monitors for mon in mons: mon_name = mon['name'] log.info(f'Sending STOP to mon {mon_name}') self.fs.mon_manager.signal_mon(mon_name, 19) time.sleep(10) #wait for 10 seconds to get the in-memory logs dumped #Unfreeze all monitors for mon in mons: mon_name = mon['name'] log.info(f'Sending CONT to mon {mon_name}') self.fs.mon_manager.signal_mon(mon_name, 18) missed_beacon_ack_log = "missed beacon ack from the monitors" search_range = "/^--- begin dump of recent events ---$/,/^--- end dump of recent events ---$/p" for info in self.fs.status().get_ranks(self.fs.id): mds_id = info['name'] try: remote = self.fs.mon_manager.find_remote('mds', mds_id) out = remote.run(args=["sed", "-n", "{0}".format(search_range), f"/var/log/ceph/{self.mount_a.cluster_name}-mds.{mds_id}.log"], stdout=StringIO(), timeout=30) except: continue #continue with the next info if out.stdout and (missed_beacon_ack_log in out.stdout.getvalue().strip()): return self.assertTrue(False, "Failed to dump in-memory logs during missed beacon ack") def test_dump_inmemory_log_on_missed_internal_heartbeats(self): """ That the in-memory logs are dumped when the mds misses internal heartbeats. """ self.fs.mds_asok(['config', 'set', 'debug_mds', '1/10']) self.fs.mds_asok(['config', 'set', 'mds_heartbeat_grace', '1']) self.fs.mds_asok(['config', 'set', 'mds_extraordinary_events_dump_interval', '1']) try: mons = json.loads(self.get_ceph_cmd_stdout('mon', 'dump', '-f', 'json'))['mons'] except: self.assertTrue(False, "Error fetching monitors") #Freeze all monitors for mon in mons: mon_name = mon['name'] log.info(f'Sending STOP to mon {mon_name}') self.fs.mon_manager.signal_mon(mon_name, 19) time.sleep(10) #wait for 10 seconds to get the in-memory logs dumped #Unfreeze all monitors for mon in mons: mon_name = mon['name'] log.info(f'Sending CONT to mon {mon_name}') self.fs.mon_manager.signal_mon(mon_name, 18) missed_internal_heartbeat_log = \ "Skipping beacon heartbeat to monitors \(last acked .+s ago\); MDS internal heartbeat is not healthy!" search_range = "/^--- begin dump of recent events ---$/,/^--- end dump of recent events ---$/p" for info in self.fs.status().get_ranks(self.fs.id): mds_id = info['name'] try: remote = self.fs.mon_manager.find_remote('mds', mds_id) out = remote.run(args=["sed", "-n", "{0}".format(search_range), f"/var/log/ceph/{self.mount_a.cluster_name}-mds.{mds_id}.log"], stdout=StringIO(), timeout=30) except: continue #continue with the next info if out.stdout and re.search(missed_internal_heartbeat_log, out.stdout.getvalue().strip()): return self.assertTrue(False, "Failed to dump in-memory logs during missed internal heartbeat") def _session_client_ls(self, cmd): mount_a_client_id = self.mount_a.get_global_id() info = self.fs.rank_asok(cmd) mount_a_mountpoint = self.mount_a.mountpoint mount_b_mountpoint = self.mount_b.mountpoint self.assertIsNotNone(info) for i in range(0, len(info)): self.assertIn(info[i]["client_metadata"]["mount_point"], [mount_a_mountpoint, mount_b_mountpoint]) info = self.fs.rank_asok(cmd + [f"id={mount_a_client_id}"]) self.assertEqual(len(info), 1) self.assertEqual(info[0]["id"], mount_a_client_id) self.assertEqual(info[0]["client_metadata"]["mount_point"], mount_a_mountpoint) info = self.fs.rank_asok(cmd + ['--cap_dump']) for i in range(0, len(info)): self.assertIn("caps", info[i]) def test_session_ls(self): self._session_client_ls(['session', 'ls']) def test_client_ls(self): self._session_client_ls(['client', 'ls']) def test_ceph_tell_for_unknown_cephname_type(self): with self.assertRaises(CommandFailedError) as ce: self.run_ceph_cmd('tell', 'cephfs.c', 'something') self.assertEqual(ce.exception.exitstatus, 1) @classhook('_add_session_client_evictions') class TestSessionClientEvict(CephFSTestCase): CLIENTS_REQUIRED = 3 def _evict_without_filter(self, cmd): info_initial = self.fs.rank_asok(cmd + ['ls']) # without any filter or flags with self.assertRaises(CommandFailedError) as ce: self.fs.rank_asok(cmd + ['evict']) self.assertEqual(ce.exception.exitstatus, errno.EINVAL) # without any filter but with existing flag with self.assertRaises(CommandFailedError) as ce: self.fs.rank_asok(cmd + ['evict', '--help']) self.assertEqual(ce.exception.exitstatus, errno.EINVAL) info = self.fs.rank_asok(cmd + ['ls']) self.assertEqual(len(info), len(info_initial)) # without any filter but with non-existing flag with self.assertRaises(CommandFailedError) as ce: self.fs.rank_asok(cmd + ['evict', '--foo']) self.assertEqual(ce.exception.exitstatus, errno.EINVAL) info = self.fs.rank_asok(cmd + ['ls']) self.assertEqual(len(info), len(info_initial)) def _evict_with_id_zero(self, cmd): # with id=0 with self.assertRaises(CommandFailedError) as ce: self.fs.rank_tell(cmd + ['evict', 'id=0']) self.assertEqual(ce.exception.exitstatus, errno.EINVAL) def _evict_with_invalid_id(self, cmd): info_initial = self.fs.rank_asok(cmd + ['ls']) # with invalid id self.fs.rank_tell(cmd + ['evict', 'id=1']) info = self.fs.rank_asok(cmd + ['ls']) self.assertEqual(len(info), len(info_initial)) # session list is status-quo def _evict_with_negative_id(self, cmd): info_initial = self.fs.rank_asok(cmd + ['ls']) # with negative id self.fs.rank_tell(cmd + ['evict', 'id=-9']) info = self.fs.rank_asok(cmd + ['ls']) self.assertEqual(len(info), len(info_initial)) # session list is status-quo def _evict_with_valid_id(self, cmd): info_initial = self.fs.rank_asok(cmd + ['ls']) mount_a_client_id = self.mount_a.get_global_id() # with a valid id self.fs.rank_asok(cmd + ['evict', f'id={mount_a_client_id}']) info = self.fs.rank_asok(cmd + ['ls']) self.assertEqual(len(info), len(info_initial) - 1) # client with id provided is evicted self.assertNotIn(mount_a_client_id, [val['id'] for val in info]) def _evict_all_clients(self, cmd): # with id=* to evict all clients info = self.fs.rank_asok(cmd + ['ls']) self.assertGreater(len(info), 0) self.fs.rank_asok(cmd + ['evict', 'id=*']) info = self.fs.rank_asok(cmd + ['ls']) self.assertEqual(len(info), 0) # multiple clients are evicted @classmethod def _add_session_client_evictions(cls): tests = [ "_evict_without_filter", "_evict_with_id_zero", "_evict_with_invalid_id", "_evict_with_negative_id", "_evict_with_valid_id", "_evict_all_clients", ] def create_test(t, cmd): def test(self): getattr(self, t)(cmd) return test for t in tests: setattr(cls, 'test_session' + t, create_test(t, ['session'])) setattr(cls, 'test_client' + t, create_test(t, ['client'])) class TestCacheDrop(CephFSTestCase): CLIENTS_REQUIRED = 1 def _run_drop_cache_cmd(self, timeout=None): result = None args = ["cache", "drop"] if timeout is not None: args.append(str(timeout)) result = self.fs.rank_tell(args) return result def _setup(self, max_caps=20, threshold=400): # create some files self.mount_a.create_n_files("dc-dir/dc-file", 1000, sync=True) # Reduce this so the MDS doesn't rkcall the maximum for simple tests self.fs.rank_asok(['config', 'set', 'mds_recall_max_caps', str(max_caps)]) self.fs.rank_asok(['config', 'set', 'mds_recall_max_decay_threshold', str(threshold)]) def test_drop_cache_command(self): """ Basic test for checking drop cache command. Confirm it halts without a timeout. Note that the cache size post trimming is not checked here. """ mds_min_caps_per_client = int(self.fs.get_config("mds_min_caps_per_client")) self._setup() result = self._run_drop_cache_cmd() self.assertEqual(result['client_recall']['return_code'], 0) self.assertEqual(result['flush_journal']['return_code'], 0) # It should take at least 1 second self.assertGreater(result['duration'], 1) self.assertGreaterEqual(result['trim_cache']['trimmed'], 1000-2*mds_min_caps_per_client) def test_drop_cache_command_timeout(self): """ Basic test for checking drop cache command. Confirm recall halts early via a timeout. Note that the cache size post trimming is not checked here. """ self._setup() result = self._run_drop_cache_cmd(timeout=10) self.assertEqual(result['client_recall']['return_code'], -errno.ETIMEDOUT) self.assertEqual(result['flush_journal']['return_code'], 0) self.assertGreater(result['duration'], 10) self.assertGreaterEqual(result['trim_cache']['trimmed'], 100) # we did something, right? def test_drop_cache_command_dead_timeout(self): """ Check drop cache command with non-responding client using tell interface. Note that the cache size post trimming is not checked here. """ self._setup() self.mount_a.suspend_netns() # Note: recall is subject to the timeout. The journal flush will # be delayed due to the client being dead. result = self._run_drop_cache_cmd(timeout=5) self.assertEqual(result['client_recall']['return_code'], -errno.ETIMEDOUT) self.assertEqual(result['flush_journal']['return_code'], 0) self.assertGreater(result['duration'], 5) self.assertLess(result['duration'], 120) # Note: result['trim_cache']['trimmed'] may be >0 because dropping the # cache now causes the Locker to drive eviction of stale clients (a # stale session will be autoclosed at mdsmap['session_timeout']). The # particular operation causing this is journal flush which causes the # MDS to wait wait for cap revoke. #self.assertEqual(0, result['trim_cache']['trimmed']) self.mount_a.resume_netns() def test_drop_cache_command_dead(self): """ Check drop cache command with non-responding client using tell interface. Note that the cache size post trimming is not checked here. """ self._setup() self.mount_a.suspend_netns() result = self._run_drop_cache_cmd() self.assertEqual(result['client_recall']['return_code'], 0) self.assertEqual(result['flush_journal']['return_code'], 0) self.assertGreater(result['duration'], 5) self.assertLess(result['duration'], 120) # Note: result['trim_cache']['trimmed'] may be >0 because dropping the # cache now causes the Locker to drive eviction of stale clients (a # stale session will be autoclosed at mdsmap['session_timeout']). The # particular operation causing this is journal flush which causes the # MDS to wait wait for cap revoke. self.mount_a.resume_netns() class TestSkipReplayInoTable(CephFSTestCase): MDSS_REQUIRED = 1 CLIENTS_REQUIRED = 1 def test_alloc_cinode_assert(self): """ Test alloc CInode assert. See: https://tracker.ceph.com/issues/52280 """ # Create a directory and the mds will journal this and then crash self.mount_a.run_shell(["rm", "-rf", "test_alloc_ino"]) self.mount_a.run_shell(["mkdir", "test_alloc_ino"]) status = self.fs.status() rank0 = self.fs.get_rank(rank=0, status=status) self.fs.mds_asok(['config', 'set', 'mds_kill_after_journal_logs_flushed', "true"]) # This will make the MDS crash, since we only have one MDS in the # cluster and without the "wait=False" it will stuck here forever. self.mount_a.run_shell(["mkdir", "test_alloc_ino/dir1"], wait=False) # sleep 10 seconds to make sure the journal logs are flushed and # the mds crashes time.sleep(10) # Now set the mds config to skip replaying the inotable self.fs.set_ceph_conf('mds', 'mds_inject_skip_replaying_inotable', True) self.fs.set_ceph_conf('mds', 'mds_wipe_sessions', True) self.fs.mds_restart() # sleep 5 seconds to make sure the mds tell command won't stuck time.sleep(5) self.fs.wait_for_daemons() self.delete_mds_coredump(rank0['name']); self.mount_a.run_shell(["mkdir", "test_alloc_ino/dir2"]) ls_out = set(self.mount_a.ls("test_alloc_ino/")) self.assertEqual(ls_out, set({"dir1", "dir2"})) class TestNewFSCreation(CephFSTestCase): MDSS_REQUIRED = 1 TEST_FS = "test_fs" TEST_FS1 = "test_fs1" def test_fs_creation_valid_ops(self): """ Test setting fs ops with CLI command `ceph fs new`. """ fs_ops = [["max_mds", "3"], ["refuse_client_session", "true"], ["allow_new_snaps", "true", "max_file_size", "65536"], ["session_timeout", "234", "session_autoclose", "100", "max_xattr_size", "150"]] for fs_ops_list in fs_ops: test_fs = None try: test_fs = self.mds_cluster.newfs(name=self.TEST_FS, create=True, fs_ops=fs_ops_list) for i in range(0, len(fs_ops_list), 2): # edge case: for option `allow_new_snaps`, the flag name # is `allow_snaps` in mdsmap if fs_ops_list[i] == "allow_new_snaps": fs_ops_list[i] = "allow_snaps" fs_op_val = str(test_fs.get_var_from_fs( self.TEST_FS, fs_ops_list[i])).lower() self.assertEqual(fs_op_val, fs_ops_list[i+1]) finally: if test_fs is not None: test_fs.destroy() def test_fs_creation_invalid_ops(self): """ Test setting invalid fs ops with CLI command `ceph fs new`. """ invalid_fs_ops = {("inline_data", "true"): errno.EPERM, ("session_timeout", "3"): errno.ERANGE, ("session_autoclose", "foo"): errno.EINVAL, ("max_mds", "-1"): errno.EINVAL, ("bal_rank_mask", ""): errno.EINVAL, ("foo", "2"): errno.EINVAL, ("", ""): errno.EINVAL, ("session_timeout", "180", "", "3"): errno.EINVAL, ("allow_new_snaps", "true", "max_mddds", "3"): errno.EINVAL, ("allow_new_snapsss", "true", "max_mds", "3"): errno.EINVAL, ("session_timeout", "20", "max_mddds", "3"): errno.ERANGE} for invalid_op_list, expected_errno in invalid_fs_ops.items(): test_fs = None try: test_fs = self.mds_cluster.newfs(name=self.TEST_FS, create=True, fs_ops=invalid_op_list) except CommandFailedError as e: self.assertEqual(e.exitstatus, expected_errno) else: self.fail(f"Expected {expected_errno}") finally: if test_fs is not None: test_fs.destroy() def test_fs_creation_incomplete_args(self): """ Test sending incomplete key-val pair of fs ops. """ invalid_args_fs_ops = [["max_mds"], ["max_mds", "2", "3"], [""]] for incomplete_args in invalid_args_fs_ops: test_fs = None try: test_fs = self.mds_cluster.newfs(name=self.TEST_FS, create=True, fs_ops=incomplete_args) except CommandFailedError as e: self.assertEqual(e.exitstatus, errno.EINVAL) else: self.fail("Expected EINVAL") finally: if test_fs is not None: test_fs.destroy() def test_endure_fs_fields_post_failure(self): """ Test fields like epoch and legacy_client_fscid should not change after fs creation failure. """ initial_epoch_ = self.mds_cluster.status()["epoch"] initial_default_fscid = self.mds_cluster.status()["default_fscid"] test_fs = None try: test_fs = self.mds_cluster.newfs(name=self.TEST_FS, create=True, fs_ops=["foo"]) except CommandFailedError as e: self.assertEqual(e.exitstatus, errno.EINVAL) self.assertEqual(initial_epoch_, self.mds_cluster.status()["epoch"]) self.assertEqual(initial_default_fscid, self.mds_cluster.status()["default_fscid"]) else: self.fail("Expected EINVAL") finally: if test_fs is not None: test_fs.destroy() def test_yes_i_really_really_mean_it(self): """ --yes-i-really-really-mean-it can be used while creating fs with CLI command `ceph fs new`, test fs creation succeeds. """ test_fs = None try: test_fs = self.mds_cluster.newfs(name=self.TEST_FS, create=True, yes_i_really_really_mean_it=True) self.assertTrue(test_fs.exists()) finally: if test_fs is not None: test_fs.destroy() def test_inline_data(self): """ inline_data needs --yes-i-really-really-mean-it to get it enabled. Test fs creation by with/without providing it. NOTE: inline_data is deprecated, this test case would be removed in the future. """ test_fs = None try: test_fs = self.mds_cluster.newfs(name=self.TEST_FS, create=True, fs_ops=["inline_data", "true"]) except CommandFailedError as e: self.assertEqual(e.exitstatus, errno.EPERM) test_fs = self.mds_cluster.newfs(name=self.TEST_FS, create=True, fs_ops=["inline_data", "true"], yes_i_really_really_mean_it=True) self.assertIn("mds uses inline data", str(test_fs.status())) else: self.fail("Expected EPERM") finally: if test_fs is not None: test_fs.destroy() def test_no_fs_id_incr_on_fs_creation_fail(self): """ Failure while creating fs due to error in setting fs ops will keep on incrementing `next_filesystem_id`, test its value is preserved and rolled back in case fs creation fails. """ test_fs, test_fs1 = None, None try: test_fs = self.mds_cluster.newfs(name=self.TEST_FS, create=True) for _ in range(5): try: self.mds_cluster.newfs(name=self.TEST_FS1, create=True, fs_ops=["max_mdss", "2"]) except CommandFailedError as e: self.assertEqual(e.exitstatus, errno.EINVAL) test_fs1 = self.mds_cluster.newfs(name=self.TEST_FS1, create=True, fs_ops=["max_mds", "2"]) test_fs_id, test_fs1_id = None, None for fs in self.mds_cluster.status().get_filesystems(): if fs["mdsmap"]["fs_name"] == self.TEST_FS: test_fs_id = fs["id"] if fs["mdsmap"]["fs_name"] == self.TEST_FS1: test_fs1_id = fs["id"] self.assertEqual(test_fs_id, test_fs1_id - 1) finally: if test_fs is not None: test_fs.destroy() if test_fs1 is not None: test_fs1.destroy()