ceph/qa/tasks/cephfs/caps_helper.py
Rishabh Dave 352a41d1b3 qa/cephfs: add tests for multi-FS auth tests
Add testsuite for testing authorization on Ceph cluster with multiple
file systems and enable it to be executable with Teuthology framework.

Also add helper methods required to set up the test environment for
multi-FS tests.

Signed-off-by: Rishabh Dave <ridave@redhat.com>
2020-09-11 18:02:48 +05:30

80 lines
3.2 KiB
Python

"""
Helper methods to test that MON and MDS caps are enforced properly.
"""
from tasks.cephfs.cephfs_test_case import CephFSTestCase
from teuthology.orchestra.run import Raw
class CapsHelper(CephFSTestCase):
    """
    Assertion helpers for checking that MON and MDS caps of a client are
    enforced.
    """

    def run_mon_cap_tests(self, moncap, keyring):
        """
        Run "fs ls" with the given keyring and assert that its output lists
        the file systems named in the MON cap and no others.

        :param moncap: MON cap string of the client under test.
        :param keyring: keyring of the client; it is passed to
                        create_keyring_file() along with the admin remote.
        """
        keyring_path = self.create_keyring_file(self.fs.admin_remote, keyring)

        fsls = self.run_cluster_cmd(f'fs ls --id {self.client_id} -k '
                                    f'{keyring_path}')

        # we need to check only for default FS when fsname clause is absent
        # in MON/MDS caps
        if 'fsname' not in moncap:
            self.assertIn(self.fs.name, fsls)
            return

        # test cases that define fs1 are expected to define fs2 as well;
        # otherwise only the default FS is checked.
        if hasattr(self, 'fs1'):
            fss = (self.fs1.name, self.fs2.name)
        else:
            fss = (self.fs.name,)

        for fsname in fss:
            if fsname in moncap:
                self.assertIn('name: ' + fsname, fsls)
            else:
                self.assertNotIn('name: ' + fsname, fsls)

    def run_mds_cap_tests(self, filepaths, filedata, mounts, perm):
        """
        Assert that reads succeed and that writes succeed or fail according
        to the permission granted.

        :param filepaths: paths of the files to be tested.
        :param filedata: data expected in the files, in the same order as
                         filepaths.
        :param mounts: mounts on which the files are accessed.
        :param perm: 'rw' to expect writes to succeed, 'r' to expect writes
                     to fail.
        :raises RuntimeError: if perm is neither 'r' nor 'rw'.
        """
        self.conduct_pos_test_for_read_caps(filepaths, filedata, mounts)

        if perm == 'rw':
            self.conduct_pos_test_for_write_caps(filepaths, mounts)
        elif perm == 'r':
            self.conduct_neg_test_for_write_caps(filepaths, mounts)
        else:
            raise RuntimeError(f'perm = {perm}\nIt should be "r" or "rw".')

    def conduct_pos_test_for_read_caps(self, filepaths, filedata, mounts):
        """
        Read every file through the mount it belongs to and assert that its
        contents equal the corresponding entry of filedata.
        """
        for mount in mounts:
            for path, data in zip(filepaths, filedata):
                # XXX: conduct tests only if path belongs to current mount; in
                # teuth tests client are located on same machines.
                if mount.hostfs_mntpt in path:
                    contents = mount.read_file(path)
                    self.assertEqual(data, contents)

    def conduct_pos_test_for_write_caps(self, filepaths, mounts):
        """
        Write new data to every file through the mount it belongs to and
        assert that reading the file back returns the same data.

        Only as many paths as there are entries in the local filedata tuple
        (two) are exercised, since paths and data are paired with zip().
        """
        filedata = ('some new data on first fs', 'some new data on second fs')

        for mount in mounts:
            for path, data in zip(filepaths, filedata):
                if mount.hostfs_mntpt in path:
                    # test that write was successful
                    mount.write_file(path=path, data=data)
                    # verify that contents written was same as the one that
                    # was intended
                    contents1 = mount.read_file(path=path)
                    self.assertEqual(data, contents1)

    def conduct_neg_test_for_write_caps(self, filepaths, mounts):
        """
        Attempt to write to every file through the mount it belongs to and
        expect the command to fail with return value 1 and the error message
        'permission denied'.
        """
        base_cmdargs = ['echo', 'some random data', Raw('|'), 'sudo', 'tee']

        for mount in mounts:
            for path in filepaths:
                if mount.hostfs_mntpt in path:
                    # build a fresh argument list for every path so that
                    # paths tested earlier (possibly on other mounts) are not
                    # passed to tee again along with the current one.
                    mount.negtestcmd(args=base_cmdargs + [path], retval=1,
                                     errmsg='permission denied')

    def get_mon_cap_from_keyring(self, client_name):
        """
        Return the MON cap of the given client as printed by "auth get".

        :param client_name: entity name passed to "auth get".
        :raises RuntimeError: if no line of the keyring contains 'caps mon'.
        """
        keyring = self.run_cluster_cmd(cmd=f'auth get {client_name}')

        for line in keyring.split('\n'):
            if 'caps mon' in line:
                # take the text after ' = "' and drop the last character of
                # the line; this assumes the line ends with the closing
                # double quote -- TODO confirm against "auth get" output.
                return line[line.find(' = "') + 4 : -1]

        raise RuntimeError('get_mon_cap_from_keyring: mon cap not found in '
                           'keyring. keyring -\n' + keyring)