qa/cephfs: add more helper methods to caps_helper.py

Add methods that will accept read/write permissions, CephFS names and
CephFS mount point and in return will generate string form of MON, OSD
and MDS caps exactly as it is reported in Ceph keyrings.

Replace similar code in test_multifs_auth.py with calls to these helper
methods.

Signed-off-by: Rishabh Dave <ridave@redhat.com>
This commit is contained in:
Rishabh Dave 2023-03-13 16:16:13 +05:30
parent 3203d6005e
commit 969a93d0dc
2 changed files with 119 additions and 32 deletions

View File

@ -12,6 +12,107 @@ from teuthology.orchestra.run import Raw
log = getLogger(__name__) log = getLogger(__name__)
def gen_mon_cap_str(caps):
"""
Accepts a tuple of tuples, where the inner tuple contains read/write
permissions and CephFS name.
caps = ((perm1, fsname1), (perm2, fsname2))
"""
def _unpack_tuple(c):
if len(c) == 1:
perm, fsname = c[0], None
elif len(c) == 2:
perm, fsname = c
else:
raise RuntimeError('caps tuple received isn\'t right')
return perm, fsname
def _gen_mon_cap_str(c):
perm, fsname = _unpack_tuple(c)
mon_cap = f'allow {perm}'
if fsname:
mon_cap += f' fsname={fsname}'
return mon_cap
if len(caps) == 1:
return _gen_mon_cap_str(caps[0])
mon_cap = ''
for i, c in enumerate(caps):
mon_cap += _gen_mon_cap_str(c)
if i != len(caps) - 1:
mon_cap += ', '
return mon_cap
def gen_osd_cap_str(caps):
"""
Accepts a tuple of tuples, where the inner tuple contains read/write
permissions and CephFS name.
caps = ((perm1, fsname1), (perm2, fsname2))
"""
def _gen_osd_cap_str(c):
perm, fsname = c
osd_cap = f'allow {perm} tag cephfs'
if fsname:
osd_cap += f' data={fsname}'
return osd_cap
if len(caps) == 1:
return _gen_osd_cap_str(caps[0])
osd_cap = ''
for i, c in enumerate(caps):
osd_cap += _gen_osd_cap_str(c)
if i != len(caps) - 1:
osd_cap += ', '
return osd_cap
def gen_mds_cap_str(caps):
"""
Accepts a tuple of tuples where inner the tuple containts read/write
permissions, Ceph FS name and CephFS mountpoint.
caps = ((perm1, fsname1, cephfs_mntpt1), (perm2, fsname2, cephfs_mntpt2))
"""
def _unpack_tuple(c):
if len(c) == 2:
perm, fsname, cephfs_mntpt = c[0], c[1], '/'
elif len(c) == 3:
perm, fsname, cephfs_mntpt = c
elif len(c) < 2:
raise RuntimeError('received items are too less in caps')
else: # len(c) > 3
raise RuntimeError('received items are too many in caps')
return perm, fsname, cephfs_mntpt
def _gen_mds_cap_str(c):
perm, fsname, cephfs_mntpt = _unpack_tuple(c)
mds_cap = f'allow {perm}'
if fsname:
mds_cap += f' fsname={fsname}'
if cephfs_mntpt != '/':
if cephfs_mntpt[0] == '/':
cephfs_mntpt = cephfs_mntpt[1:]
mds_cap += f' path={cephfs_mntpt}'
return mds_cap
if len(caps) == 1:
return _gen_mds_cap_str(caps[0])
mds_cap = ''
for i, c in enumerate(caps):
mds_cap += _gen_mds_cap_str(c)
if i != len(caps) - 1:
mds_cap += ', '
return mds_cap
class CapTester(CephFSTestCase): class CapTester(CephFSTestCase):
""" """
Test that MON and MDS caps are enforced. Test that MON and MDS caps are enforced.

View File

@ -4,7 +4,8 @@ Test for Ceph clusters with multiple FSs.
import logging import logging
from tasks.cephfs.cephfs_test_case import CephFSTestCase from tasks.cephfs.cephfs_test_case import CephFSTestCase
from tasks.cephfs.caps_helper import CapTester from tasks.cephfs.caps_helper import (CapTester, gen_mon_cap_str,
gen_osd_cap_str, gen_mds_cap_str)
from teuthology.exceptions import CommandFailedError from teuthology.exceptions import CommandFailedError
@ -41,14 +42,13 @@ class TestMultiFS(CephFSTestCase):
class TestMONCaps(TestMultiFS): class TestMONCaps(TestMultiFS):
def test_moncap_with_one_fs_names(self): def test_moncap_with_one_fs_names(self):
moncap = f'allow r fsname={self.fs1.name}' moncap = gen_mon_cap_str((('r', self.fs1.name),))
self.create_client(self.client_id, moncap) self.create_client(self.client_id, moncap)
self.captester.run_mon_cap_tests(self.fs1, self.client_id) self.captester.run_mon_cap_tests(self.fs1, self.client_id)
def test_moncap_with_multiple_fs_names(self): def test_moncap_with_multiple_fs_names(self):
moncap = (f'allow r fsname={self.fs1.name}, ' moncap = gen_mon_cap_str((('r', self.fs1.name), ('r', self.fs2.name)))
f'allow r fsname={self.fs2.name}')
self.create_client(self.client_id, moncap) self.create_client(self.client_id, moncap)
self.captester.run_mon_cap_tests(self.fs1, self.client_id) self.captester.run_mon_cap_tests(self.fs1, self.client_id)
@ -155,31 +155,16 @@ class TestMDSCaps(TestMultiFS):
super(type(self), self).tearDown() super(type(self), self).tearDown()
def generate_caps(self, perm, fsname, cephfs_mntpt):
moncap = 'allow r'
osdcap = (f'allow {perm} tag cephfs data={self.fs1.name}, '
f'allow {perm} tag cephfs data={self.fs2.name}')
if fsname:
if cephfs_mntpt == '/':
mdscap = (f'allow {perm} fsname={self.fs1.name}, '
f'allow {perm} fsname={self.fs2.name}')
else:
mdscap = (f'allow {perm} fsname={self.fs1.name} '
f'path=/{cephfs_mntpt}, '
f'allow {perm} fsname={self.fs2.name} '
f'path=/{cephfs_mntpt}')
else:
if cephfs_mntpt == '/':
mdscap = f'allow {perm}'
else:
mdscap = f'allow {perm} path=/{cephfs_mntpt}'
return moncap, osdcap, mdscap
def _create_client(self, perm, fsname=False, cephfs_mntpt='/'): def _create_client(self, perm, fsname=False, cephfs_mntpt='/'):
moncap, osdcap, mdscap = self.generate_caps(perm, fsname, moncap = 'allow r'
cephfs_mntpt) osdcap = gen_osd_cap_str(((perm, self.fs1.name),
(perm, self.fs2.name)))
if fsname:
mdscap = gen_mds_cap_str(((perm, self.fs1.name, cephfs_mntpt),
(perm, self.fs2.name, cephfs_mntpt)))
else:
mdscap = gen_mds_cap_str(((perm, None, cephfs_mntpt),
(perm, None, cephfs_mntpt)))
keyring = self.create_client(self.client_id, moncap, osdcap, mdscap) keyring = self.create_client(self.client_id, moncap, osdcap, mdscap)
keyring_paths = [] keyring_paths = []
@ -277,10 +262,11 @@ class TestClientsWithoutAuth(TestMultiFS):
def test_mount_mon_and_osd_caps_present_mds_caps_absent(self): def test_mount_mon_and_osd_caps_present_mds_caps_absent(self):
# setup part... # setup part...
moncap = f'allow rw fsname={self.fs1.name}, allow rw fsname={self.fs2.name}' moncap = gen_mon_cap_str((('rw', self.fs1.name),
mdscap = f'allow rw fsname={self.fs1.name}' ('rw', self.fs2.name)))
osdcap = (f'allow rw tag cephfs data={self.fs1.name}, allow rw tag ' mdscap = gen_mds_cap_str((('rw', self.fs1.name),))
f'cephfs data={self.fs2.name}') osdcap = gen_osd_cap_str((('rw', self.fs1.name,),
('rw', self.fs2.name)))
keyring = self.create_client(self.client_id, moncap, osdcap, mdscap) keyring = self.create_client(self.client_id, moncap, osdcap, mdscap)
keyring_path = self.mount_a.client_remote.mktemp(data=keyring) keyring_path = self.mount_a.client_remote.mktemp(data=keyring)