ceph/qa/tasks/cephfs/caps_helper.py
Rishabh Dave f01188d72a qa/cephfs: fix minor bug in caps_helper.py's run_mon_cap_tests()
When an entity's MON cap contains no FS name in it, the entity has the
access to all the FSs on cluster and not just to the default FS of the
cluster.

Fixes: https://tracker.ceph.com/issues/55558
Signed-off-by: Rishabh Dave <ridave@redhat.com>
2022-05-27 15:19:38 +05:30

79 lines
3.2 KiB
Python

"""
Helper methods to test that MON and MDS caps are enforced properly.
"""
from tasks.cephfs.cephfs_test_case import CephFSTestCase
from teuthology.orchestra.run import Raw
class CapsHelper(CephFSTestCase):
def run_mon_cap_tests(self, moncap, keyring):
keyring_path = self.fs.admin_remote.mktemp(data=keyring)
fsls = self.run_cluster_cmd(f'fs ls --id {self.client_id} -k '
f'{keyring_path}')
if 'fsname=' not in moncap:
fsls_admin = self.run_cluster_cmd('fs ls')
self.assertEqual(fsls, fsls_admin)
return
fss = (self.fs1.name, self.fs2.name) if hasattr(self, 'fs1') else \
(self.fs.name,)
for fsname in fss:
if fsname in moncap:
self.assertIn('name: ' + fsname, fsls)
else:
self.assertNotIn('name: ' + fsname, fsls)
def run_mds_cap_tests(self, filepaths, filedata, mounts, perm):
self.conduct_pos_test_for_read_caps(filepaths, filedata, mounts)
if perm == 'rw':
self.conduct_pos_test_for_write_caps(filepaths, mounts)
elif perm == 'r':
self.conduct_neg_test_for_write_caps(filepaths, mounts)
else:
raise RuntimeError(f'perm = {perm}\nIt should be "r" or "rw".')
def conduct_pos_test_for_read_caps(self, filepaths, filedata, mounts):
for mount in mounts:
for path, data in zip(filepaths, filedata):
# XXX: conduct tests only if path belongs to current mount; in
# teuth tests client are located on same machines.
if path.find(mount.hostfs_mntpt) != -1:
contents = mount.read_file(path)
self.assertEqual(data, contents)
def conduct_pos_test_for_write_caps(self, filepaths, mounts):
filedata = ('some new data on first fs', 'some new data on second fs')
for mount in mounts:
for path, data in zip(filepaths, filedata):
if path.find(mount.hostfs_mntpt) != -1:
# test that write was successful
mount.write_file(path=path, data=data)
# verify that contents written was same as the one that was
# intended
contents1 = mount.read_file(path=path)
self.assertEqual(data, contents1)
def conduct_neg_test_for_write_caps(self, filepaths, mounts):
cmdargs = ['echo', 'some random data', Raw('|'), 'tee']
for mount in mounts:
for path in filepaths:
if path.find(mount.hostfs_mntpt) != -1:
cmdargs.append(path)
mount.negtestcmd(args=cmdargs, retval=1,
errmsg='permission denied')
def get_mon_cap_from_keyring(self, client_name):
keyring = self.run_cluster_cmd(cmd=f'auth get {client_name}')
for line in keyring.split('\n'):
if 'caps mon' in line:
return line[line.find(' = "') + 4 : -1]
raise RuntimeError('get_save_mon_cap: mon cap not found in keyring. '
'keyring -\n' + keyring)