qa/cephfs: improve caps_helper.CapTester

Improvement #1:

CapTester.write_test_files() not only creates the test file but also
does the following for every mount object it receives in parameters -

* carefully produces the path for the test file as per parameters
  received
* generates the unique data for each test file on a CephFS mount
* creates a data structure -- list of lists -- that holds all this
  information along with mount object itself for each mount object so
  that tests can be conducted at a later point

Untangle this mess of code by splitting this method into 3 separate
methods -

1. To produce the path for test file (as per user's need).
2. To generate the data that will be written into the test file.
3. To actually create the test file on CephFS.

Improvement #2:

Remove the internal data structure used for testing -- self.test_set --
and use separate class attributes to store all the data required for
testing instead of a tuple. This serves two purpose -

One, it makes it easy to manipulate all this data from helper methods
and during debugging session, especially while using a PDB session.

And two, make it impossible to have multiple mounts/multiple "test sets"
within same CapTester instance for the sake of simplicity. Users can
instead create two instances of CapTester instances if needed.

Signed-off-by: Rishabh Dave <ridave@redhat.com>
This commit is contained in:
Rishabh Dave 2023-04-01 00:44:52 +05:30
parent e8bdf94b81
commit 008dbe91e2
3 changed files with 124 additions and 116 deletions

View File

@ -119,7 +119,10 @@ class CapTester:
run_mon_cap_tests() or run_mds_cap_tests() as per the need. run_mon_cap_tests() or run_mds_cap_tests() as per the need.
""" """
def write_test_files(self, mounts, testpath=''): def __init__(self, mount=None, path=''):
self._create_test_files(mount, path)
def _create_test_files(self, mount, path):
""" """
Exercising 'r' and 'w' access levels on a file on CephFS mount is Exercising 'r' and 'w' access levels on a file on CephFS mount is
pretty routine across all tests for caps. Adding to method to write pretty routine across all tests for caps. Adding to method to write
@ -129,39 +132,47 @@ class CapTester:
at the path passed in testpath for the given list of mounts. If at the path passed in testpath for the given list of mounts. If
testpath is empty, the file is created at the root of the CephFS. testpath is empty, the file is created at the root of the CephFS.
""" """
dirname, filename = 'testdir', 'testfile' # CephFS mount where read/write test will be conducted.
self.test_set = [] self.mount = mount
# XXX: The reason behind testpath[1:] below is that the testpath is # Path where out test file located.
self.path = self._gen_test_file_path(path)
# Data that out test file will contain.
self.data = self._gen_test_file_data()
self.mount.write_file(self.path, self.data)
log.info(f'Test file has been created on FS '
f'"{self.mount.cephfs_name}" at path "{self.path}" with the '
f'following data -\n{self.data}')
def _gen_test_file_path(self, path=''):
# XXX: The reason behind path[1:] below is that the path is
# supposed to contain a path inside CephFS (which might be passed as # supposed to contain a path inside CephFS (which might be passed as
# an absolute path). os.path.join() deletes all previous path # an absolute path). os.path.join() deletes all previous path
# components when it encounters a path component starting with '/'. # components when it encounters a path component starting with '/'.
# Deleting the first '/' from the string in testpath ensures that # Deleting the first '/' from the string in path ensures that
# previous path components are not deleted by os.path.join(). # previous path components are not deleted by os.path.join().
if testpath: if path:
testpath = testpath[1:] if testpath[0] == '/' else testpath path = path[1:] if path[0] == '/' else path
# XXX: passing just '/' screw up os.path.join() ahead. # XXX: passing just '/' messes up os.path.join() ahead.
if testpath == '/': if path == '/':
testpath = '' path = ''
for mount_x in mounts: dirname, filename = 'testdir', 'testfile'
log.info(f'creating test file on FS {mount_x.cephfs_name} ' dirpath = os_path_join(self.mount.hostfs_mntpt, path, dirname)
f'mounted at {mount_x.mountpoint}...') self.mount.run_shell(f'mkdir {dirpath}')
dirpath = os_path_join(mount_x.hostfs_mntpt, testpath, dirname) return os_path_join(dirpath, filename)
mount_x.run_shell(f'mkdir {dirpath}')
filepath = os_path_join(dirpath, filename) def _gen_test_file_data(self):
# XXX: the reason behind adding filepathm, cephfs_name and both # XXX: the reason behind adding path, cephfs_name and both
# mntpts is to avoid a test bug where we mount cephfs1 but what # mntpts is to avoid a test bug where we mount cephfs1 but what
# ends up being mounted cephfs2. since filepath and filedata are # ends up being mounted cephfs2. since self.path and self.data are
# identical, how would tests figure otherwise that they are # identical, how would tests figure otherwise that they are
# accessing the right filename but on wrong CephFS. # accessing the right filename but on wrong CephFS.
filedata = (f'filepath = {filepath}\n' return dedent(f'''\
f'cephfs_name = {mount_x.cephfs_name}\n' self.path = {self.path}
f'cephfs_mntpt = {mount_x.cephfs_mntpt}\n' cephfs_name = {self.mount.cephfs_name}
f'hostfs_mntpt = {mount_x.hostfs_mntpt}') cephfs_mntpt = {self.mount.cephfs_mntpt}
mount_x.write_file(filepath, filedata) hostfs_mntpt = {self.mount.hostfs_mntpt}''')
self.test_set.append([mount_x, filepath, filedata])
log.info(f'Test file created at "{filepath}" with the following '
f'data -\n"{filedata}"')
def run_cap_tests(self, perm, mntpt=None): def run_cap_tests(self, perm, mntpt=None):
# TODO # TODO
@ -235,15 +246,17 @@ class CapTester:
Run test for read perm and, for write perm, run positive test if it Run test for read perm and, for write perm, run positive test if it
is present and run negative test if not. is present and run negative test if not.
""" """
if mntpt:
# beacaue we want to value of mntpt from test_set.path along with
# slash that precedes it.
mntpt = '/' + mntpt if mntpt[0] != '/' else mntpt
# XXX: mntpt is path inside cephfs that serves as root for current # XXX: mntpt is path inside cephfs that serves as root for current
# mount. Therefore, this path must me deleted from self.filepaths. # mount. Therefore, this path must me deleted from self.path.
# Example - # Example -
# orignal path: /mnt/cephfs_x/dir1/dir2/testdir # orignal path: /mnt/cephfs_x/dir1/dir2/testdir
# cephfs dir serving as root for current mnt: /dir1/dir2 # cephfs dir serving as root for current mnt: /dir1/dir2
# therefore, final path: /mnt/cephfs_x/testdir # therefore, final path: /mnt/cephfs_x/testdir
if mntpt: self.path = self.path.replace(mntpt, '')
self.test_set = [(x, y.replace(mntpt, ''), z) for x, y, z in \
self.test_set]
self.conduct_pos_test_for_read_caps() self.conduct_pos_test_for_read_caps()
@ -255,23 +268,22 @@ class CapTester:
raise RuntimeError(f'perm = {perm}\nIt should be "r" or "rw".') raise RuntimeError(f'perm = {perm}\nIt should be "r" or "rw".')
def conduct_pos_test_for_read_caps(self): def conduct_pos_test_for_read_caps(self):
for mount, path, data in self.test_set: log.info(f'test read perm: read file {self.path} and expect data '
log.info(f'test read perm: read file {path} and expect data ' f'"{self.data}"')
f'"{data}"') contents = self.mount.read_file(self.path)
contents = mount.read_file(path) assert_equal(self.data, contents)
assert_equal(data, contents) log.info(f'read perm was tested successfully: "{self.data}" was '
log.info(f'read perm was tested successfully: "{data}" was ' f'successfully read from path {self.path}')
f'successfully read from path {path}')
def conduct_pos_test_for_write_caps(self): def conduct_pos_test_for_write_caps(self):
for mount, path, data in self.test_set: log.info(f'test write perm: try writing data "{self.data}" to '
log.info(f'test write perm: try writing data "{data}" to ' f'file {self.path}.')
f'file {path}.') self.mount.write_file(path=self.path, data=self.data)
mount.write_file(path=path, data=data) contents = self.mount.read_file(path=self.path)
contents = mount.read_file(path=path) assert_equal(self.data, contents)
assert_equal(data, contents) log.info(f'write perm was tested was successfully: self.data '
log.info(f'write perm was tested was successfully: data ' f'"{self.data}" was successfully written to file '
f'"{data}" was successfully written to file "{path}".') f'"{self.path}".')
def conduct_neg_test_for_write_caps(self, sudo_write=False): def conduct_neg_test_for_write_caps(self, sudo_write=False):
possible_errmsgs = ('permission denied', 'operation not permitted') possible_errmsgs = ('permission denied', 'operation not permitted')
@ -279,11 +291,10 @@ class CapTester:
cmdargs += ['sudo', 'tee'] if sudo_write else ['tee'] cmdargs += ['sudo', 'tee'] if sudo_write else ['tee']
# don't use data, cmd args to write are set already above. # don't use data, cmd args to write are set already above.
for mount, path, data in self.test_set:
log.info('test absence of write perm: expect failure ' log.info('test absence of write perm: expect failure '
f'writing data to file {path}.') f'writing data to file {self.path}.')
cmdargs.append(path) cmdargs.append(self.path)
mount.negtestcmd(args=cmdargs, retval=1, errmsgs=possible_errmsgs) self.mount.negtestcmd(args=cmdargs, retval=1, errmsgs=possible_errmsgs)
cmdargs.pop(-1) cmdargs.pop(-1)
log.info('absence of write perm was tested successfully: ' log.info('absence of write perm was tested successfully: '
f'failed to be write data to file {path}.') f'failed to be write data to file {self.path}.')

View File

@ -1206,27 +1206,30 @@ class TestFsAuthorize(CephFSTestCase):
def test_single_path_r(self): def test_single_path_r(self):
PERM = 'r' PERM = 'r'
FS_AUTH_CAPS = (('/', PERM),) FS_AUTH_CAPS = (('/', PERM),)
self.captester = CapTester() self.captester = CapTester(self.mount_a, '/')
self.setup_test_env(FS_AUTH_CAPS) keyring = self.fs.authorize(self.client_id, FS_AUTH_CAPS)
self._remount(keyring)
self.captester.run_mon_cap_tests(self.fs, self.client_id) self.captester.run_mon_cap_tests(self.fs, self.client_id)
self.captester.run_mds_cap_tests(PERM) self.captester.run_mds_cap_tests(PERM)
def test_single_path_rw(self): def test_single_path_rw(self):
PERM = 'rw' PERM = 'rw'
FS_AUTH_CAPS = (('/', PERM),) FS_AUTH_CAPS = (('/', PERM),)
self.captester = CapTester() self.captester = CapTester(self.mount_a, '/')
self.setup_test_env(FS_AUTH_CAPS) keyring = self.fs.authorize(self.client_id, FS_AUTH_CAPS)
self._remount(keyring)
self.captester.run_mon_cap_tests(self.fs, self.client_id) self.captester.run_mon_cap_tests(self.fs, self.client_id)
self.captester.run_mds_cap_tests(PERM) self.captester.run_mds_cap_tests(PERM)
def test_single_path_rootsquash(self): def test_single_path_rootsquash(self):
PERM = 'rw' PERM = 'rw'
FS_AUTH_CAPS = (('/', PERM, 'root_squash'),) FS_AUTH_CAPS = (('/', PERM, 'root_squash'),)
self.captester = CapTester() self.captester = CapTester(self.mount_a, '/')
self.setup_test_env(FS_AUTH_CAPS) keyring = self.fs.authorize(self.client_id, FS_AUTH_CAPS)
self._remount(keyring)
# testing MDS caps... # testing MDS caps...
# Since root_squash is set in client caps, client can read but not # Since root_squash is set in client caps, client can read but not
# write even thought access level is set to "rw". # write even thought access level is set to "rw".
@ -1250,8 +1253,10 @@ class TestFsAuthorize(CephFSTestCase):
self.mount_a.remount(cephfs_name=self.fs.name) self.mount_a.remount(cephfs_name=self.fs.name)
PERM = 'rw' PERM = 'rw'
FS_AUTH_CAPS = (('/', PERM),) FS_AUTH_CAPS = (('/', PERM),)
self.captester = CapTester() self.captester = CapTester(self.mount_a, '/')
self.setup_test_env(FS_AUTH_CAPS) keyring = self.fs.authorize(self.client_id, FS_AUTH_CAPS)
self._remount(keyring)
self.captester.run_mds_cap_tests(PERM) self.captester.run_mds_cap_tests(PERM)
def test_multiple_path_r(self): def test_multiple_path_r(self):
@ -1259,23 +1264,24 @@ class TestFsAuthorize(CephFSTestCase):
FS_AUTH_CAPS = (('/dir1/dir12', PERM), ('/dir2/dir22', PERM)) FS_AUTH_CAPS = (('/dir1/dir12', PERM), ('/dir2/dir22', PERM))
for c in FS_AUTH_CAPS: for c in FS_AUTH_CAPS:
self.mount_a.run_shell(f'mkdir -p .{c[0]}') self.mount_a.run_shell(f'mkdir -p .{c[0]}')
self.captesters = (CapTester(), CapTester()) self.captesters = (CapTester(self.mount_a, '/dir1/dir12'),
self.setup_test_env(FS_AUTH_CAPS) CapTester(self.mount_a, '/dir2/dir22'))
keyring = self.fs.authorize(self.client_id, FS_AUTH_CAPS)
self.run_cap_test_one_by_one(FS_AUTH_CAPS) self._remount_and_run_tests(FS_AUTH_CAPS, keyring)
def test_multiple_path_rw(self): def test_multiple_path_rw(self):
PERM = 'rw' PERM = 'rw'
FS_AUTH_CAPS = (('/dir1/dir12', PERM), ('/dir2/dir22', PERM)) FS_AUTH_CAPS = (('/dir1/dir12', PERM), ('/dir2/dir22', PERM))
for c in FS_AUTH_CAPS: for c in FS_AUTH_CAPS:
self.mount_a.run_shell(f'mkdir -p .{c[0]}') self.mount_a.run_shell(f'mkdir -p .{c[0]}')
self.captesters = (CapTester(), CapTester()) self.captesters = (CapTester(self.mount_a, '/dir1/dir12'),
self.setup_test_env(FS_AUTH_CAPS) CapTester(self.mount_a, '/dir2/dir22'))
keyring = self.fs.authorize(self.client_id, FS_AUTH_CAPS)
self.run_cap_test_one_by_one(FS_AUTH_CAPS) self._remount_and_run_tests(FS_AUTH_CAPS, keyring)
def run_cap_test_one_by_one(self, fs_auth_caps): def _remount_and_run_tests(self, fs_auth_caps, keyring):
keyring = self.run_cluster_cmd(f'auth get {self.client_name}')
for i, c in enumerate(fs_auth_caps): for i, c in enumerate(fs_auth_caps):
self.assertIn(i, (0, 1)) self.assertIn(i, (0, 1))
PATH = c[0] PATH = c[0]
@ -1297,24 +1303,6 @@ class TestFsAuthorize(CephFSTestCase):
client_keyring_path=keyring_path, client_keyring_path=keyring_path,
cephfs_mntpt=path) cephfs_mntpt=path)
def setup_for_single_path(self, fs_auth_caps):
self.captester.write_test_files((self.mount_a,), '/')
keyring = self.fs.authorize(self.client_id, fs_auth_caps)
self._remount(keyring)
def setup_for_multiple_paths(self, fs_auth_caps):
for i, c in enumerate(fs_auth_caps):
PATH = c[0]
self.captesters[i].write_test_files((self.mount_a,), PATH)
self.fs.authorize(self.client_id, fs_auth_caps)
def setup_test_env(self, fs_auth_caps):
if len(fs_auth_caps) == 1:
self.setup_for_single_path(fs_auth_caps[0])
else:
self.setup_for_multiple_paths(fs_auth_caps)
class TestAdminCommandIdempotency(CephFSTestCase): class TestAdminCommandIdempotency(CephFSTestCase):
""" """

View File

@ -23,8 +23,6 @@ class TestMultiFS(CephFSTestCase):
def setUp(self): def setUp(self):
super(TestMultiFS, self).setUp() super(TestMultiFS, self).setUp()
self.captester = CapTester()
# we might have it - the client - if the same cluster was used for a # we might have it - the client - if the same cluster was used for a
# different vstart_runner.py run. # different vstart_runner.py run.
self.run_cluster_cmd(f'auth rm {self.client_name}') self.run_cluster_cmd(f'auth rm {self.client_name}')
@ -42,18 +40,21 @@ class TestMultiFS(CephFSTestCase):
class TestMONCaps(TestMultiFS): class TestMONCaps(TestMultiFS):
def test_moncap_with_one_fs_names(self): def test_moncap_with_one_fs_names(self):
self.captester = CapTester()
moncap = gen_mon_cap_str((('r', self.fs1.name),)) moncap = gen_mon_cap_str((('r', self.fs1.name),))
self.create_client(self.client_id, moncap) self.create_client(self.client_id, moncap)
self.captester.run_mon_cap_tests(self.fs1, self.client_id) self.captester.run_mon_cap_tests(self.fs1, self.client_id)
def test_moncap_with_multiple_fs_names(self): def test_moncap_with_multiple_fs_names(self):
self.captester = CapTester()
moncap = gen_mon_cap_str((('r', self.fs1.name), ('r', self.fs2.name))) moncap = gen_mon_cap_str((('r', self.fs1.name), ('r', self.fs2.name)))
self.create_client(self.client_id, moncap) self.create_client(self.client_id, moncap)
self.captester.run_mon_cap_tests(self.fs1, self.client_id) self.captester.run_mon_cap_tests(self.fs1, self.client_id)
def test_moncap_with_blanket_allow(self): def test_moncap_with_blanket_allow(self):
self.captester = CapTester()
moncap = 'allow r' moncap = 'allow r'
self.create_client(self.client_id, moncap) self.create_client(self.client_id, moncap)
@ -69,53 +70,55 @@ class TestMDSCaps(TestMultiFS):
3. Remount the current mounts with this new client. 3. Remount the current mounts with this new client.
4. Test read and write on both FSs. 4. Test read and write on both FSs.
""" """
def setUp(self):
super(self.__class__, self).setUp()
self.mounts = (self.mount_a, self.mount_b)
def test_rw_with_fsname_and_no_path_in_cap(self): def test_rw_with_fsname_and_no_path_in_cap(self):
PERM = 'rw' PERM = 'rw'
self.captester.write_test_files(self.mounts) self.captesters = (CapTester(self.mount_a), CapTester(self.mount_b))
moncap, osdcap, mdscap = self._gen_caps(PERM, both_fsnames=True) moncap, osdcap, mdscap = self._gen_caps(PERM, both_fsnames=True)
keyring = self.create_client(self.client_id, moncap, osdcap, mdscap) keyring = self.create_client(self.client_id, moncap, osdcap, mdscap)
self.remount_with_new_client(keyring) self.remount_with_new_client(keyring)
self.captester.run_mds_cap_tests(PERM) self.captesters[0].run_mds_cap_tests(PERM)
self.captesters[1].run_mds_cap_tests(PERM)
def test_r_with_fsname_and_no_path_in_cap(self): def test_r_with_fsname_and_no_path_in_cap(self):
PERM = 'r' PERM = 'r'
self.captester.write_test_files(self.mounts) self.captesters = (CapTester(self.mount_a), CapTester(self.mount_b))
moncap, osdcap, mdscap = self._gen_caps(PERM, both_fsnames=True) moncap, osdcap, mdscap = self._gen_caps(PERM, both_fsnames=True)
keyring = self.create_client(self.client_id, moncap, osdcap, mdscap) keyring = self.create_client(self.client_id, moncap, osdcap, mdscap)
self.remount_with_new_client(keyring) self.remount_with_new_client(keyring)
self.captester.run_mds_cap_tests(PERM) self.captesters[0].run_mds_cap_tests(PERM)
self.captesters[1].run_mds_cap_tests(PERM)
def test_rw_with_fsname_and_path_in_cap(self): def test_rw_with_fsname_and_path_in_cap(self):
PERM, CEPHFS_MNTPT = 'rw', 'dir1' PERM, CEPHFS_MNTPT = 'rw', 'dir1'
self.mount_a.run_shell(f'mkdir {CEPHFS_MNTPT}') self.mount_a.run_shell(f'mkdir {CEPHFS_MNTPT}')
self.mount_b.run_shell(f'mkdir {CEPHFS_MNTPT}') self.mount_b.run_shell(f'mkdir {CEPHFS_MNTPT}')
self.captester.write_test_files(self.mounts, CEPHFS_MNTPT) self.captesters = (CapTester(self.mount_a, CEPHFS_MNTPT),
CapTester(self.mount_b, CEPHFS_MNTPT))
moncap, osdcap, mdscap = self._gen_caps(PERM, True, CEPHFS_MNTPT) moncap, osdcap, mdscap = self._gen_caps(PERM, True, CEPHFS_MNTPT)
keyring = self.create_client(self.client_id, moncap, osdcap, mdscap) keyring = self.create_client(self.client_id, moncap, osdcap, mdscap)
self.remount_with_new_client(keyring, CEPHFS_MNTPT) self.remount_with_new_client(keyring, CEPHFS_MNTPT)
self.captester.run_mds_cap_tests(PERM, CEPHFS_MNTPT) self.captesters[0].run_mds_cap_tests(PERM, CEPHFS_MNTPT)
self.captesters[1].run_mds_cap_tests(PERM, CEPHFS_MNTPT)
def test_r_with_fsname_and_path_in_cap(self): def test_r_with_fsname_and_path_in_cap(self):
PERM, CEPHFS_MNTPT = 'r', 'dir1' PERM, CEPHFS_MNTPT = 'r', 'dir1'
self.mount_a.run_shell(f'mkdir {CEPHFS_MNTPT}') self.mount_a.run_shell(f'mkdir {CEPHFS_MNTPT}')
self.mount_b.run_shell(f'mkdir {CEPHFS_MNTPT}') self.mount_b.run_shell(f'mkdir {CEPHFS_MNTPT}')
self.captester.write_test_files(self.mounts, CEPHFS_MNTPT) self.captesters = (CapTester(self.mount_a, CEPHFS_MNTPT),
CapTester(self.mount_b, CEPHFS_MNTPT))
moncap, osdcap, mdscap = self._gen_caps(PERM, True, CEPHFS_MNTPT) moncap, osdcap, mdscap = self._gen_caps(PERM, True, CEPHFS_MNTPT)
keyring = self.create_client(self.client_id, moncap, osdcap, mdscap) keyring = self.create_client(self.client_id, moncap, osdcap, mdscap)
self.remount_with_new_client(keyring, CEPHFS_MNTPT) self.remount_with_new_client(keyring, CEPHFS_MNTPT)
self.captester.run_mds_cap_tests(PERM, CEPHFS_MNTPT) self.captesters[0].run_mds_cap_tests(PERM, CEPHFS_MNTPT)
self.captesters[1].run_mds_cap_tests(PERM, CEPHFS_MNTPT)
# XXX: this tests the backward compatibility; "allow rw path=<dir1>" is # XXX: this tests the backward compatibility; "allow rw path=<dir1>" is
# treated as "allow rw fsname=* path=<dir1>" # treated as "allow rw fsname=* path=<dir1>"
@ -123,13 +126,15 @@ class TestMDSCaps(TestMultiFS):
PERM, CEPHFS_MNTPT = 'rw', 'dir1' PERM, CEPHFS_MNTPT = 'rw', 'dir1'
self.mount_a.run_shell(f'mkdir {CEPHFS_MNTPT}') self.mount_a.run_shell(f'mkdir {CEPHFS_MNTPT}')
self.mount_b.run_shell(f'mkdir {CEPHFS_MNTPT}') self.mount_b.run_shell(f'mkdir {CEPHFS_MNTPT}')
self.captester.write_test_files(self.mounts, CEPHFS_MNTPT) self.captesters = (CapTester(self.mount_a, CEPHFS_MNTPT),
CapTester(self.mount_b, CEPHFS_MNTPT))
moncap, osdcap, mdscap = self._gen_caps(PERM, False, CEPHFS_MNTPT) moncap, osdcap, mdscap = self._gen_caps(PERM, False, CEPHFS_MNTPT)
keyring = self.create_client(self.client_id, moncap, osdcap, mdscap) keyring = self.create_client(self.client_id, moncap, osdcap, mdscap)
self.remount_with_new_client(keyring, CEPHFS_MNTPT) self.remount_with_new_client(keyring, CEPHFS_MNTPT)
self.captester.run_mds_cap_tests(PERM, CEPHFS_MNTPT) self.captesters[0].run_mds_cap_tests(PERM, CEPHFS_MNTPT)
self.captesters[1].run_mds_cap_tests(PERM, CEPHFS_MNTPT)
# XXX: this tests the backward compatibility; "allow r path=<dir1>" is # XXX: this tests the backward compatibility; "allow r path=<dir1>" is
# treated as "allow r fsname=* path=<dir1>" # treated as "allow r fsname=* path=<dir1>"
@ -137,33 +142,37 @@ class TestMDSCaps(TestMultiFS):
PERM, CEPHFS_MNTPT = 'r', 'dir1' PERM, CEPHFS_MNTPT = 'r', 'dir1'
self.mount_a.run_shell(f'mkdir {CEPHFS_MNTPT}') self.mount_a.run_shell(f'mkdir {CEPHFS_MNTPT}')
self.mount_b.run_shell(f'mkdir {CEPHFS_MNTPT}') self.mount_b.run_shell(f'mkdir {CEPHFS_MNTPT}')
self.captester.write_test_files(self.mounts, CEPHFS_MNTPT) self.captesters = (CapTester(self.mount_a, CEPHFS_MNTPT),
CapTester(self.mount_b, CEPHFS_MNTPT))
moncap, osdcap, mdscap = self._gen_caps(PERM, False, CEPHFS_MNTPT) moncap, osdcap, mdscap = self._gen_caps(PERM, False, CEPHFS_MNTPT)
keyring = self.create_client(self.client_id, moncap, osdcap, mdscap) keyring = self.create_client(self.client_id, moncap, osdcap, mdscap)
self.remount_with_new_client(keyring, CEPHFS_MNTPT) self.remount_with_new_client(keyring, CEPHFS_MNTPT)
self.captester.run_mds_cap_tests(PERM, CEPHFS_MNTPT) self.captesters[0].run_mds_cap_tests(PERM, CEPHFS_MNTPT)
self.captesters[1].run_mds_cap_tests(PERM, CEPHFS_MNTPT)
def test_rw_with_no_fsname_and_no_path(self): def test_rw_with_no_fsname_and_no_path(self):
PERM = 'rw' PERM = 'rw'
self.captester.write_test_files(self.mounts) self.captesters = (CapTester(self.mount_a), CapTester(self.mount_b))
moncap, osdcap, mdscap = self._gen_caps(PERM) moncap, osdcap, mdscap = self._gen_caps(PERM)
keyring = self.create_client(self.client_id, moncap, osdcap, mdscap) keyring = self.create_client(self.client_id, moncap, osdcap, mdscap)
self.remount_with_new_client(keyring) self.remount_with_new_client(keyring)
self.captester.run_mds_cap_tests(PERM) self.captesters[0].run_mds_cap_tests(PERM)
self.captesters[1].run_mds_cap_tests(PERM)
def test_r_with_no_fsname_and_no_path(self): def test_r_with_no_fsname_and_no_path(self):
PERM = 'r' PERM = 'r'
self.captester.write_test_files(self.mounts) self.captesters = (CapTester(self.mount_a), CapTester(self.mount_b))
moncap, osdcap, mdscap = self._gen_caps(PERM) moncap, osdcap, mdscap = self._gen_caps(PERM)
keyring = self.create_client(self.client_id, moncap, osdcap, mdscap) keyring = self.create_client(self.client_id, moncap, osdcap, mdscap)
self.remount_with_new_client(keyring) self.remount_with_new_client(keyring)
self.captester.run_mds_cap_tests(PERM) self.captesters[0].run_mds_cap_tests(PERM)
self.captesters[1].run_mds_cap_tests(PERM)
def tearDown(self): def tearDown(self):
self.mount_a.umount_wait() self.mount_a.umount_wait()