"""Tests for CephFS administration, configuration, mirroring and
`fs authorize` commands."""

import json
from io import StringIO
from os.path import join as os_path_join

from teuthology.orchestra.run import CommandFailedError

from tasks.cephfs.cephfs_test_case import CephFSTestCase
from tasks.cephfs.filesystem import FileLayout
from tasks.cephfs.fuse_mount import FuseMount
from tasks.cephfs.caps_helper import CapsHelper


class TestAdminCommands(CephFSTestCase):
    """
    Tests for administration command.
    """

    CLIENTS_REQUIRED = 1
    MDSS_REQUIRED = 1

    def test_fsnames_can_only_by_goodchars(self):
        """
        That `fs new` with a file system name containing '@#' fails with an
        "invalid chars" error.
        """
        n = 'test_fsnames_can_only_by_goodchars'
        metapoolname, datapoolname = n+'-testmetapool', n+'-testdatapool'
        badname = n+'badname@#'
        mon_manager = self.fs.mon_manager

        # Create the pools under exactly the names that are handed to
        # `fs new` and removed below (the names already carry the `n` prefix).
        mon_manager.raw_cluster_cmd('osd', 'pool', 'create', metapoolname)
        mon_manager.raw_cluster_cmd('osd', 'pool', 'create', datapoolname)

        try:
            # test that fsname not with "goodchars" fails
            args = ['fs', 'new', badname, metapoolname, datapoolname]
            proc = mon_manager.run_cluster_cmd(args=args, stderr=StringIO(),
                                               check_status=False)
            self.assertIn('invalid chars', proc.stderr.getvalue().lower())
        finally:
            # Remove the pools even when the assertion above fails.
            mon_manager.raw_cluster_cmd(
                'osd', 'pool', 'rm', metapoolname, metapoolname,
                '--yes-i-really-really-mean-it-not-faking')
            mon_manager.raw_cluster_cmd(
                'osd', 'pool', 'rm', datapoolname, datapoolname,
                '--yes-i-really-really-mean-it-not-faking')

    def test_fs_status(self):
        """
        That `ceph fs status` command functions.
        """
        mon_cmd = self.fs.mon_manager.raw_cluster_cmd

        s = mon_cmd("fs", "status")
        self.assertIn("active", s)

        # Both JSON flavours must report the first mdsmap entry as active.
        for fmt in ("json-pretty", "json"):
            out = mon_cmd("fs", "status", "--format=" + fmt)
            mdsmap = json.loads(out)["mdsmap"]
            self.assertEqual(mdsmap[0]["state"], "active")

    def _setup_ec_pools(self, n, metadata=True, overwrites=True):
        """
        Create an erasure-code profile `<n>-profile` and an erasure-coded
        pool `<n>-data` using it.

        :param n: prefix used for the pool and profile names
        :param metadata: also create a pool named `<n>-meta`
        :param overwrites: set `allow_ec_overwrites` on the data pool
        """
        mon_cmd = self.fs.mon_manager.raw_cluster_cmd
        if metadata:
            mon_cmd('osd', 'pool', 'create', n+"-meta", "8")
        cmd = ['osd', 'erasure-code-profile', 'set', n+"-profile",
               "m=2", "k=2", "crush-failure-domain=osd"]
        mon_cmd(*cmd)
        mon_cmd('osd', 'pool', 'create', n+"-data", "8", "erasure",
                n+"-profile")
        if overwrites:
            mon_cmd('osd', 'pool', 'set', n+"-data", 'allow_ec_overwrites',
                    'true')

    def _check_pool_application_metadata_key_value(self, pool, app, key,
                                                   value):
        """
        Assert that `osd pool application get <pool> <app> <key>` prints
        `value` (surrounding whitespace ignored).
        """
        output = self.fs.mon_manager.raw_cluster_cmd(
            'osd', 'pool', 'application', 'get', pool, app, key)
        self.assertEqual(str(output.strip()), value)

    def _assert_fs_new_rejected(self, *args):
        """
        Run `fs new <args...>` and require it to fail with exit status 22.

        :raises CommandFailedError: the command failed with another status
        :raises RuntimeError: the command unexpectedly succeeded
        """
        try:
            self.fs.mon_manager.raw_cluster_cmd('fs', 'new', *args)
        except CommandFailedError as e:
            if e.exitstatus != 22:
                raise
        else:
            raise RuntimeError("expected failure")

    def test_add_data_pool_root(self):
        """
        That a new data pool can be added and used for the root directory.
        """
        p = self.fs.add_data_pool("foo")
        self.fs.set_dir_layout(self.mount_a, ".", FileLayout(pool=p))

    def test_add_data_pool_application_metadata(self):
        """
        That the application metadata set on a newly added data pool is as
        expected.
        """
        pool_name = "foo"
        mon_cmd = self.fs.mon_manager.raw_cluster_cmd
        mon_cmd('osd', 'pool', 'create', pool_name,
                str(self.fs.pgs_per_fs_pool))
        # Check whether https://tracker.ceph.com/issues/43061 is fixed
        mon_cmd('osd', 'pool', 'application', 'enable', pool_name, 'cephfs')
        self.fs.add_data_pool(pool_name, create=False)
        self._check_pool_application_metadata_key_value(
            pool_name, 'cephfs', 'data', self.fs.name)

    def test_add_data_pool_subdir(self):
        """
        That a new data pool can be added and used for a sub-directory.
        """
        p = self.fs.add_data_pool("foo")
        self.mount_a.run_shell("mkdir subdir")
        self.fs.set_dir_layout(self.mount_a, "subdir", FileLayout(pool=p))

    def test_add_data_pool_non_alphamueric_name_as_subdir(self):
        """
        That a new data pool with non-alphanumeric name can be added and used
        for a sub-directory.
        """
        p = self.fs.add_data_pool("I-am-data_pool00.")
        self.mount_a.run_shell("mkdir subdir")
        self.fs.set_dir_layout(self.mount_a, "subdir", FileLayout(pool=p))

    def test_add_data_pool_ec(self):
        """
        That a new EC data pool can be added.
        """
        n = "test_add_data_pool_ec"
        self._setup_ec_pools(n, metadata=False)
        self.fs.add_data_pool(n+"-data", create=False)

    def test_new_default_ec(self):
        """
        That a new file system warns/fails with an EC default data pool.
        """
        self.mds_cluster.delete_all_filesystems()
        n = "test_new_default_ec"
        self._setup_ec_pools(n)
        self._assert_fs_new_rejected(n, n+"-meta", n+"-data")

    def test_new_default_ec_force(self):
        """
        That a new file system succeeds with an EC default data pool with
        --force.
        """
        self.mds_cluster.delete_all_filesystems()
        n = "test_new_default_ec_force"
        self._setup_ec_pools(n)
        self.fs.mon_manager.raw_cluster_cmd('fs', 'new', n, n+"-meta",
                                            n+"-data", "--force")

    def test_new_default_ec_no_overwrite(self):
        """
        That a new file system fails with an EC default data pool without
        overwrite.
        """
        self.mds_cluster.delete_all_filesystems()
        n = "test_new_default_ec_no_overwrite"
        self._setup_ec_pools(n, overwrites=False)
        self._assert_fs_new_rejected(n, n+"-meta", n+"-data")
        # and even with --force !
        self._assert_fs_new_rejected(n, n+"-meta", n+"-data", "--force")

    def test_fs_new_pool_application_metadata(self):
        """
        That the application metadata set on the pools of a newly created
        filesystem are as expected.
        """
        self.mds_cluster.delete_all_filesystems()
        fs_name = "test_fs_new_pool_application"
        keys = ['metadata', 'data']
        pool_names = [fs_name+'-'+key for key in keys]
        mon_cmd = self.fs.mon_manager.raw_cluster_cmd
        for p in pool_names:
            mon_cmd('osd', 'pool', 'create', p, str(self.fs.pgs_per_fs_pool))
            mon_cmd('osd', 'pool', 'application', 'enable', p, 'cephfs')
        mon_cmd('fs', 'new', fs_name, pool_names[0], pool_names[1])
        for pool_name, key in zip(pool_names, keys):
            self._check_pool_application_metadata_key_value(
                pool_name, 'cephfs', key, fs_name)

    def test_required_client_features(self):
        """
        That `ceph fs required_client_features` command functions.
        """
        mon_cmd = self.fs.mon_manager.raw_cluster_cmd

        def is_required(index):
            # Re-read the mdsmap on every call so the check reflects the
            # command that was just issued.
            out = mon_cmd('fs', 'get', self.fs.name, '--format=json-pretty')
            features = json.loads(out)['mdsmap']['required_client_features']
            return "feature_{0}".format(index) in features

        features = json.loads(mon_cmd('fs', 'feature', 'ls',
                                      '--format=json-pretty'))
        self.assertGreater(len(features), 0)

        # Start from a clean slate: drop every feature from the required set.
        for f in features:
            mon_cmd('fs', 'required_client_features', self.fs.name, 'rm',
                    str(f['index']))

        for f in features:
            index = f['index']
            feature = f['name']
            # 'reserved' entries are addressed by index instead of by name.
            if feature == 'reserved':
                feature = str(index)

            # Indexes divisible by 3 are never added; of the ones added,
            # those with an even index are left in place.
            if index % 3 == 0:
                continue
            mon_cmd('fs', 'required_client_features', self.fs.name, 'add',
                    feature)
            self.assertTrue(is_required(index))

            if index % 2 == 0:
                continue
            mon_cmd('fs', 'required_client_features', self.fs.name, 'rm',
                    feature)
            self.assertFalse(is_required(index))


class TestConfigCommands(CephFSTestCase):
    """
    Test that daemons and clients respond to the otherwise rarely-used
    runtime config modification operations.
    """

    CLIENTS_REQUIRED = 1
    MDSS_REQUIRED = 1

    def test_ceph_config_show(self):
        """
        That I can successfully show MDS configuration.
        """
        names = self.fs.get_rank_names()
        for n in names:
            s = self.fs.mon_manager.raw_cluster_cmd("config", "show",
                                                    "mds."+n)
            self.assertIn("NAME", s)
            self.assertIn("mon_host", s)

    def test_client_config(self):
        """
        That I can successfully issue asok "config set" commands

        :return:
        """
        if not isinstance(self.mount_a, FuseMount):
            self.skipTest("Test only applies to FUSE clients")

        test_key = "client_cache_size"
        test_val = "123"
        self.mount_a.admin_socket(['config', 'set', test_key, test_val])
        out = self.mount_a.admin_socket(['config', 'get', test_key])
        self.assertEqual(out[test_key], test_val)

        self.mount_a.write_n_mb("file.bin", 1)

        # Implicitly asserting that things don't have lockdep error in shutdown
        self.mount_a.umount_wait(require_clean=True)
        self.fs.mds_stop()

    def test_mds_config_asok(self):
        """
        That an MDS option set through the admin socket reads back with the
        same value.
        """
        test_key = "mds_max_purge_ops"
        test_val = "123"
        self.fs.mds_asok(['config', 'set', test_key, test_val])
        out = self.fs.mds_asok(['config', 'get', test_key])
        self.assertEqual(out[test_key], test_val)

        # Implicitly asserting that things don't have lockdep error in shutdown
        self.mount_a.umount_wait(require_clean=True)
        self.fs.mds_stop()

    def test_mds_config_tell(self):
        """
        That an MDS option injected with `tell ... injectargs` reads back with
        the same value through the admin socket.
        """
        test_key = "mds_max_purge_ops"
        test_val = "123"

        mds_id = self.fs.get_lone_mds_id()
        self.fs.mon_manager.raw_cluster_cmd(
            "tell", "mds.{0}".format(mds_id), "injectargs",
            "--{0}={1}".format(test_key, test_val))

        # Read it back with asok because there is no `tell` equivalent
        out = self.fs.mds_asok(['config', 'get', test_key])
        self.assertEqual(out[test_key], test_val)

        # Implicitly asserting that things don't have lockdep error in shutdown
        self.mount_a.umount_wait(require_clean=True)
        self.fs.mds_stop()


class TestMirroringCommands(CephFSTestCase):
    """
    Tests for the `fs mirror` family of commands.
    """

    CLIENTS_REQUIRED = 1
    MDSS_REQUIRED = 1

    def _enable_mirroring(self, fs_name):
        """Run `fs mirror enable <fs_name>`."""
        self.fs.mon_manager.raw_cluster_cmd("fs", "mirror", "enable", fs_name)

    def _disable_mirroring(self, fs_name):
        """Run `fs mirror disable <fs_name>`."""
        self.fs.mon_manager.raw_cluster_cmd("fs", "mirror", "disable",
                                            fs_name)

    def _add_peer(self, fs_name, peer_spec, remote_fs_name):
        """Run `fs mirror peer_add <fs_name> <peer_spec> <remote_fs_name>`."""
        self.fs.mon_manager.raw_cluster_cmd("fs", "mirror", "peer_add",
                                            fs_name, peer_spec,
                                            remote_fs_name)

    def _remove_peer(self, fs_name, peer_uuid):
        """Run `fs mirror peer_remove <fs_name> <peer_uuid>`."""
        self.fs.mon_manager.raw_cluster_cmd("fs", "mirror", "peer_remove",
                                            fs_name, peer_uuid)

    def _verify_mirroring(self, fs_name, flag_str):
        """
        Assert on the presence of `mirror_info` in the fsmap entry of
        `fs_name`.

        :param flag_str: 'enabled' (key must be present) or 'disabled'
                         (key must be absent)
        :raises RuntimeError: for any other `flag_str`
        """
        status = self.fs.status()
        fs_map = status.get_fsmap_byname(fs_name)
        if flag_str == 'enabled':
            self.assertIn('mirror_info', fs_map)
        elif flag_str == 'disabled':
            self.assertNotIn('mirror_info', fs_map)
        else:
            raise RuntimeError(f'invalid flag_str {flag_str}')

    def _get_peer_uuid(self, fs_name, peer_spec):
        """
        Look up a mirror peer of `fs_name` by its `<client_name>@<cluster_name>`
        spec.

        :return: the uuid key of the matching peer, or None when no peer
                 matches
        """
        status = self.fs.status()
        fs_map = status.get_fsmap_byname(fs_name)
        mirror_info = fs_map.get('mirror_info', None)
        self.assertIsNotNone(mirror_info)
        for uuid, remote in mirror_info['peers'].items():
            client_name = remote['remote']['client_name']
            cluster_name = remote['remote']['cluster_name']
            spec = f'{client_name}@{cluster_name}'
            if spec == peer_spec:
                return uuid
        return None

    def test_mirroring_command(self):
        """basic mirroring command test -- enable, disable mirroring on a
        filesystem"""
        self._enable_mirroring(self.fs.name)
        self._verify_mirroring(self.fs.name, "enabled")
        self._disable_mirroring(self.fs.name)
        self._verify_mirroring(self.fs.name, "disabled")

    def test_mirroring_peer_commands(self):
        """test adding and removing peers to a mirror enabled filesystem"""
        self._enable_mirroring(self.fs.name)
        self._add_peer(self.fs.name, "client.site-b@site-b", "fs_b")
        self._add_peer(self.fs.name, "client.site-c@site-c", "fs_c")
        self._verify_mirroring(self.fs.name, "enabled")
        uuid_peer_b = self._get_peer_uuid(self.fs.name,
                                          "client.site-b@site-b")
        uuid_peer_c = self._get_peer_uuid(self.fs.name,
                                          "client.site-c@site-c")
        self.assertIsNotNone(uuid_peer_b)
        self.assertIsNotNone(uuid_peer_c)
        self._remove_peer(self.fs.name, uuid_peer_b)
        self._remove_peer(self.fs.name, uuid_peer_c)
        self._disable_mirroring(self.fs.name)
        self._verify_mirroring(self.fs.name, "disabled")

    def test_mirroring_command_idempotency(self):
        """test to check idempotency of mirroring family of commands """
        self._enable_mirroring(self.fs.name)
        self._verify_mirroring(self.fs.name, "enabled")
        self._enable_mirroring(self.fs.name)
        # add peer
        self._add_peer(self.fs.name, "client.site-b@site-b", "fs_b")
        uuid_peer_b1 = self._get_peer_uuid(self.fs.name,
                                           "client.site-b@site-b")
        self.assertIsNotNone(uuid_peer_b1)
        # adding the peer again should be idempotent
        self._add_peer(self.fs.name, "client.site-b@site-b", "fs_b")
        uuid_peer_b2 = self._get_peer_uuid(self.fs.name,
                                           "client.site-b@site-b")
        self.assertIsNotNone(uuid_peer_b2)
        self.assertEqual(uuid_peer_b1, uuid_peer_b2)
        # remove peer
        self._remove_peer(self.fs.name, uuid_peer_b1)
        uuid_peer_b3 = self._get_peer_uuid(self.fs.name,
                                           "client.site-b@site-b")
        self.assertIsNone(uuid_peer_b3)
        # removing the peer again should be idempotent
        self._remove_peer(self.fs.name, uuid_peer_b1)
        self._disable_mirroring(self.fs.name)
        self._verify_mirroring(self.fs.name, "disabled")
        self._disable_mirroring(self.fs.name)

    def test_mirroring_disable_with_peers(self):
        """test disabling mirroring for a filesystem with active peers"""
        self._enable_mirroring(self.fs.name)
        self._add_peer(self.fs.name, "client.site-b@site-b", "fs_b")
        self._verify_mirroring(self.fs.name, "enabled")
        uuid_peer_b = self._get_peer_uuid(self.fs.name,
                                          "client.site-b@site-b")
        self.assertIsNotNone(uuid_peer_b)
        self._disable_mirroring(self.fs.name)
        self._verify_mirroring(self.fs.name, "disabled")
        # enable mirroring to check old peers
        self._enable_mirroring(self.fs.name)
        self._verify_mirroring(self.fs.name, "enabled")
        # peer should be gone
        uuid_peer_b = self._get_peer_uuid(self.fs.name,
                                          "client.site-b@site-b")
        self.assertIsNone(uuid_peer_b)
        self._disable_mirroring(self.fs.name)
        self._verify_mirroring(self.fs.name, "disabled")

    def test_mirroring_with_filesystem_reset(self):
        """test to verify mirroring state post filesystem reset"""
        self._enable_mirroring(self.fs.name)
        self._add_peer(self.fs.name, "client.site-b@site-b", "fs_b")
        self._verify_mirroring(self.fs.name, "enabled")
        uuid_peer_b = self._get_peer_uuid(self.fs.name,
                                          "client.site-b@site-b")
        self.assertIsNotNone(uuid_peer_b)
        # reset filesystem
        self.fs.fail()
        self.fs.mon_manager.raw_cluster_cmd("fs", "reset", self.fs.name,
                                            "--yes-i-really-mean-it")
        self.fs.wait_for_daemons()
        self._verify_mirroring(self.fs.name, "disabled")


class TestSubCmdFsAuthorize(CapsHelper):
    """
    Tests for caps granted through `self.fs.authorize()` on one or several
    paths.
    """

    client_id = 'testuser'
    client_name = 'client.' + client_id

    def test_single_path_r(self):
        """Run the MON and MDS cap tests for 'r' granted on '/'."""
        perm = 'r'
        filepaths, filedata, mounts, keyring = self.setup_test_env(perm)
        moncap = self.get_mon_cap_from_keyring(self.client_name)

        self.run_mon_cap_tests(moncap, keyring)
        self.run_mds_cap_tests(filepaths, filedata, mounts, perm)

    def test_single_path_rw(self):
        """Run the MON and MDS cap tests for 'rw' granted on '/'."""
        perm = 'rw'
        filepaths, filedata, mounts, keyring = self.setup_test_env(perm)
        moncap = self.get_mon_cap_from_keyring(self.client_name)

        self.run_mon_cap_tests(moncap, keyring)
        self.run_mds_cap_tests(filepaths, filedata, mounts, perm)

    def test_multiple_path_r(self):
        """Run the cap tests for 'r' granted on two paths, remounting at each
        path in turn."""
        perm, paths = 'r', ('/dir1', '/dir2/dir22')
        filepaths, filedata, mounts, keyring = self.setup_test_env(perm,
                                                                   paths)
        moncap = self.get_mon_cap_from_keyring(self.client_name)

        keyring_path = self.create_keyring_file(self.mount_a.client_remote,
                                                keyring)
        for path in paths:
            self.mount_a.remount(client_id=self.client_id,
                                 client_keyring_path=keyring_path,
                                 cephfs_mntpt=path)

            # actual tests...
            self.run_mon_cap_tests(moncap, keyring)
            self.run_mds_cap_tests(filepaths, filedata, mounts, perm)

    def test_multiple_path_rw(self):
        """Run the cap tests for 'rw' granted on two paths, remounting at each
        path in turn."""
        perm, paths = 'rw', ('/dir1', '/dir2/dir22')
        filepaths, filedata, mounts, keyring = self.setup_test_env(perm,
                                                                   paths)
        moncap = self.get_mon_cap_from_keyring(self.client_name)

        keyring_path = self.create_keyring_file(self.mount_a.client_remote,
                                                keyring)
        for path in paths:
            self.mount_a.remount(client_id=self.client_id,
                                 client_keyring_path=keyring_path,
                                 cephfs_mntpt=path)

            # actual tests...
            self.run_mon_cap_tests(moncap, keyring)
            self.run_mds_cap_tests(filepaths, filedata, mounts, perm)

    def tearDown(self):
        """Unmount the client, remove the test user and chain to the parent
        tearDown."""
        self.mount_a.umount_wait()
        self.run_cluster_cmd(f'auth rm {self.client_name}')

        # Zero-argument super() is bound to this class. The previous
        # super(type(self), self) resolved against the runtime class, so a
        # subclass instance would re-enter this method forever.
        super().tearDown()

    def setup_for_single_path(self, perm):
        """
        Write a test file at the mount root, authorize the test client for
        `perm` on '/' and remount with the resulting keyring.

        :return: (filepath, filedata, keyring)
        """
        filedata, filename = 'some data on fs 1', 'file_on_fs1'

        filepath = os_path_join(self.mount_a.hostfs_mntpt, filename)
        self.mount_a.write_file(filepath, filedata)

        keyring = self.fs.authorize(self.client_id, ('/', perm))
        keyring_path = self.create_keyring_file(self.mount_a.client_remote,
                                                keyring)

        self.mount_a.remount(client_id=self.client_id,
                             client_keyring_path=keyring_path,
                             cephfs_mntpt='/')

        return filepath, filedata, keyring

    def setup_for_multiple_paths(self, perm, paths):
        """
        Write a test file under every path in `paths` and authorize the test
        client for `perm` on each of them.

        :param paths: absolute CephFS paths; the leading '/' is stripped
                      before joining each one onto the host mount point
        :return: (filepaths, filedata, keyring) where `filepaths` is a tuple
                 holding, per path, the written file's location with `path`
                 removed from it
        """
        filedata, filename = 'some data on fs 1', 'file_on_fs1'

        self.mount_a.run_shell('mkdir -p dir1/dir12/dir13 dir2/dir22/dir23')

        filepaths = []
        for path in paths:
            filepath = os_path_join(self.mount_a.hostfs_mntpt, path[1:],
                                    filename)
            self.mount_a.write_file(filepath, filedata)
            filepaths.append(filepath.replace(path, ''))
        filepaths = tuple(filepaths)

        # Flat (path, perm, path, perm, ...) sequence built from `paths`.
        # Indexing the leftover loop variable `path` here would pass single
        # characters ('/', 'd') instead of the paths.
        caps = tuple(item for path in paths for item in (path, perm))
        keyring = self.fs.authorize(self.client_id, caps)

        return filepaths, filedata, keyring

    def setup_test_env(self, perm, paths=()):
        """
        Prepare files and a keyring for the single-path case (empty `paths`)
        or the multiple-path case.

        :return: (filepaths, filedata, mounts, keyring); the first three are
                 always tuples
        """
        if paths:
            filepaths, filedata, keyring = \
                self.setup_for_multiple_paths(perm, paths)
        else:
            filepaths, filedata, keyring = self.setup_for_single_path(perm)

        # Normalise to tuples so callers can treat both cases uniformly.
        if not isinstance(filepaths, tuple):
            filepaths = (filepaths, )
        if not isinstance(filedata, tuple):
            filedata = (filedata, )
        mounts = (self.mount_a, )

        return filepaths, filedata, mounts, keyring