mirror of
https://github.com/ceph/ceph
synced 2025-01-19 17:41:39 +00:00
2d465e15fd
Add tests for the case where "fs authorize" subcommand upgrades the caps for the client. Signed-off-by: Rishabh Dave <ridave@redhat.com>
316 lines
13 KiB
Python
316 lines
13 KiB
Python
"""
|
|
Test for Ceph clusters with multiple FSs.
|
|
"""
|
|
import logging
|
|
|
|
from tasks.cephfs.cephfs_test_case import CephFSTestCase
|
|
from tasks.cephfs.caps_helper import (MonCapTester, MdsCapTester,
|
|
gen_mon_cap_str, gen_osd_cap_str,
|
|
gen_mds_cap_str)
|
|
|
|
from teuthology.exceptions import CommandFailedError
|
|
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
class TestMultiFS(CephFSTestCase):
|
|
client_id = 'testuser'
|
|
client_name = 'client.' + client_id
|
|
# one dedicated for each FS
|
|
MDSS_REQUIRED = 2
|
|
CLIENTS_REQUIRED = 2
|
|
|
|
def setUp(self):
|
|
super(TestMultiFS, self).setUp()
|
|
|
|
# we might have it - the client - if the same cluster was used for a
|
|
# different vstart_runner.py run.
|
|
self.run_ceph_cmd(f'auth rm {self.client_name}')
|
|
|
|
self.fs1 = self.fs
|
|
self.fs2 = self.mds_cluster.newfs(name='cephfs2', create=True)
|
|
|
|
# we'll reassign caps to client.1 so that it can operate with cephfs2
|
|
self.run_ceph_cmd(f'auth caps client.{self.mount_b.client_id} mon '
|
|
f'"allow r" osd "allow rw '
|
|
f'pool={self.fs2.get_data_pool_name()}" mds allow')
|
|
self.mount_b.remount(cephfs_name=self.fs2.name)
|
|
|
|
|
|
class TestMONCaps(TestMultiFS):
|
|
|
|
def test_moncap_with_one_fs_names(self):
|
|
self.captester = MonCapTester()
|
|
moncap = gen_mon_cap_str((('r', self.fs1.name),))
|
|
self.create_client(self.client_id, moncap)
|
|
|
|
self.captester.run_mon_cap_tests(self.fs1, self.client_id)
|
|
|
|
def test_moncap_with_multiple_fs_names(self):
|
|
self.captester = MonCapTester()
|
|
moncap = gen_mon_cap_str((('r', self.fs1.name), ('r', self.fs2.name)))
|
|
self.create_client(self.client_id, moncap)
|
|
|
|
self.captester.run_mon_cap_tests(self.fs1, self.client_id)
|
|
|
|
def test_moncap_with_blanket_allow(self):
|
|
self.captester = MonCapTester()
|
|
moncap = 'allow r'
|
|
self.create_client(self.client_id, moncap)
|
|
|
|
self.captester.run_mon_cap_tests(self.fs1, self.client_id)
|
|
|
|
|
|
#TODO: add tests for capsecs 'p' and 's'.
|
|
class TestMDSCaps(TestMultiFS):
|
|
"""
|
|
0. Have 2 FSs on Ceph cluster.
|
|
1. Create new files on both FSs.
|
|
2. Create a new client that has authorization for both FSs.
|
|
3. Remount the current mounts with this new client.
|
|
4. Test read and write on both FSs.
|
|
"""
|
|
def test_rw_with_fsname_and_no_path_in_cap(self):
|
|
PERM = 'rw'
|
|
self.captesters = (MdsCapTester(self.mount_a),
|
|
MdsCapTester(self.mount_b))
|
|
|
|
moncap, osdcap, mdscap = self._gen_caps(PERM, both_fsnames=True)
|
|
keyring = self.create_client(self.client_id, moncap, osdcap, mdscap)
|
|
self.remount_with_new_client(keyring)
|
|
|
|
self.captesters[0].run_mds_cap_tests(PERM)
|
|
self.captesters[1].run_mds_cap_tests(PERM)
|
|
|
|
def test_r_with_fsname_and_no_path_in_cap(self):
|
|
PERM = 'r'
|
|
self.captesters = (MdsCapTester(self.mount_a),
|
|
MdsCapTester(self.mount_b))
|
|
|
|
moncap, osdcap, mdscap = self._gen_caps(PERM, both_fsnames=True)
|
|
keyring = self.create_client(self.client_id, moncap, osdcap, mdscap)
|
|
self.remount_with_new_client(keyring)
|
|
|
|
self.captesters[0].run_mds_cap_tests(PERM)
|
|
self.captesters[1].run_mds_cap_tests(PERM)
|
|
|
|
def test_rw_with_fsname_and_path_in_cap(self):
|
|
PERM, CEPHFS_MNTPT = 'rw', '/dir1'
|
|
self.mount_a.run_shell(f'mkdir .{CEPHFS_MNTPT}')
|
|
self.mount_b.run_shell(f'mkdir .{CEPHFS_MNTPT}')
|
|
self.captesters = (MdsCapTester(self.mount_a, CEPHFS_MNTPT),
|
|
MdsCapTester(self.mount_b, CEPHFS_MNTPT))
|
|
|
|
moncap, osdcap, mdscap = self._gen_caps(PERM, True, CEPHFS_MNTPT)
|
|
keyring = self.create_client(self.client_id, moncap, osdcap, mdscap)
|
|
self.remount_with_new_client(keyring, CEPHFS_MNTPT)
|
|
|
|
self.captesters[0].run_mds_cap_tests(PERM, CEPHFS_MNTPT)
|
|
self.captesters[1].run_mds_cap_tests(PERM, CEPHFS_MNTPT)
|
|
|
|
def test_r_with_fsname_and_path_in_cap(self):
|
|
PERM, CEPHFS_MNTPT = 'r', '/dir1'
|
|
self.mount_a.run_shell(f'mkdir .{CEPHFS_MNTPT}')
|
|
self.mount_b.run_shell(f'mkdir .{CEPHFS_MNTPT}')
|
|
self.captesters = (MdsCapTester(self.mount_a, CEPHFS_MNTPT),
|
|
MdsCapTester(self.mount_b, CEPHFS_MNTPT))
|
|
|
|
moncap, osdcap, mdscap = self._gen_caps(PERM, True, CEPHFS_MNTPT)
|
|
keyring = self.create_client(self.client_id, moncap, osdcap, mdscap)
|
|
self.remount_with_new_client(keyring, CEPHFS_MNTPT)
|
|
|
|
self.captesters[0].run_mds_cap_tests(PERM, CEPHFS_MNTPT)
|
|
self.captesters[1].run_mds_cap_tests(PERM, CEPHFS_MNTPT)
|
|
|
|
# XXX: this tests the backward compatibility; "allow rw path=<dir1>" is
|
|
# treated as "allow rw fsname=* path=<dir1>"
|
|
def test_rw_with_no_fsname_and_path_in_cap(self):
|
|
PERM, CEPHFS_MNTPT = 'rw', '/dir1'
|
|
self.mount_a.run_shell(f'mkdir .{CEPHFS_MNTPT}')
|
|
self.mount_b.run_shell(f'mkdir .{CEPHFS_MNTPT}')
|
|
self.captesters = (MdsCapTester(self.mount_a, CEPHFS_MNTPT),
|
|
MdsCapTester(self.mount_b, CEPHFS_MNTPT))
|
|
|
|
moncap, osdcap, mdscap = self._gen_caps(PERM, False, CEPHFS_MNTPT)
|
|
keyring = self.create_client(self.client_id, moncap, osdcap, mdscap)
|
|
self.remount_with_new_client(keyring, CEPHFS_MNTPT)
|
|
|
|
self.captesters[0].run_mds_cap_tests(PERM, CEPHFS_MNTPT)
|
|
self.captesters[1].run_mds_cap_tests(PERM, CEPHFS_MNTPT)
|
|
|
|
# XXX: this tests the backward compatibility; "allow r path=<dir1>" is
|
|
# treated as "allow r fsname=* path=<dir1>"
|
|
def test_r_with_no_fsname_and_path_in_cap(self):
|
|
PERM, CEPHFS_MNTPT = 'r', '/dir1'
|
|
self.mount_a.run_shell(f'mkdir .{CEPHFS_MNTPT}')
|
|
self.mount_b.run_shell(f'mkdir .{CEPHFS_MNTPT}')
|
|
self.captesters = (MdsCapTester(self.mount_a, CEPHFS_MNTPT),
|
|
MdsCapTester(self.mount_b, CEPHFS_MNTPT))
|
|
|
|
moncap, osdcap, mdscap = self._gen_caps(PERM, False, CEPHFS_MNTPT)
|
|
keyring = self.create_client(self.client_id, moncap, osdcap, mdscap)
|
|
self.remount_with_new_client(keyring, CEPHFS_MNTPT)
|
|
|
|
self.captesters[0].run_mds_cap_tests(PERM, CEPHFS_MNTPT)
|
|
self.captesters[1].run_mds_cap_tests(PERM, CEPHFS_MNTPT)
|
|
|
|
def test_rw_with_no_fsname_and_no_path(self):
|
|
PERM = 'rw'
|
|
self.captesters = (MdsCapTester(self.mount_a),
|
|
MdsCapTester(self.mount_b))
|
|
|
|
moncap, osdcap, mdscap = self._gen_caps(PERM)
|
|
keyring = self.create_client(self.client_id, moncap, osdcap, mdscap)
|
|
self.remount_with_new_client(keyring)
|
|
|
|
self.captesters[0].run_mds_cap_tests(PERM)
|
|
self.captesters[1].run_mds_cap_tests(PERM)
|
|
|
|
def test_r_with_no_fsname_and_no_path(self):
|
|
PERM = 'r'
|
|
self.captesters = (MdsCapTester(self.mount_a),
|
|
MdsCapTester(self.mount_b))
|
|
|
|
moncap, osdcap, mdscap = self._gen_caps(PERM)
|
|
keyring = self.create_client(self.client_id, moncap, osdcap, mdscap)
|
|
self.remount_with_new_client(keyring)
|
|
|
|
self.captesters[0].run_mds_cap_tests(PERM)
|
|
self.captesters[1].run_mds_cap_tests(PERM)
|
|
|
|
def tearDown(self):
|
|
self.mount_a.umount_wait()
|
|
self.mount_b.umount_wait()
|
|
|
|
super(type(self), self).tearDown()
|
|
|
|
def _gen_caps(self, perm, both_fsnames=False, cephfs_mntpt='/'):
|
|
moncap = 'allow r'
|
|
osdcap = gen_osd_cap_str(((perm, self.fs1.name),
|
|
(perm, self.fs2.name)))
|
|
|
|
if both_fsnames:
|
|
mdscap = gen_mds_cap_str(((perm, self.fs1.name, cephfs_mntpt),
|
|
(perm, self.fs2.name, cephfs_mntpt)))
|
|
else:
|
|
mdscap = gen_mds_cap_str(((perm, None, cephfs_mntpt),
|
|
(perm, None, cephfs_mntpt)))
|
|
|
|
return moncap, osdcap, mdscap
|
|
|
|
def remount_with_new_client(self, keyring, cephfs_mntpt='/'):
|
|
log.info(f'keyring generated for testing is -\n{keyring}')
|
|
|
|
if (isinstance(cephfs_mntpt, str) and cephfs_mntpt != '/' and
|
|
cephfs_mntpt[0] != '/'):
|
|
cephfs_mntpt = '/' + cephfs_mntpt
|
|
|
|
keyring_path = self.mount_a.client_remote.mktemp(data=keyring)
|
|
self.mount_a.remount(client_id=self.client_id,
|
|
client_keyring_path=keyring_path,
|
|
client_remote=self.mount_a.client_remote,
|
|
cephfs_name=self.fs1.name,
|
|
cephfs_mntpt=cephfs_mntpt,
|
|
hostfs_mntpt=self.mount_a.hostfs_mntpt,
|
|
wait=True)
|
|
|
|
keyring_path = self.mount_b.client_remote.mktemp(data=keyring)
|
|
self.mount_b.remount(client_id=self.client_id,
|
|
client_keyring_path=keyring_path,
|
|
client_remote=self.mount_b.client_remote,
|
|
cephfs_name=self.fs2.name,
|
|
cephfs_mntpt=cephfs_mntpt,
|
|
hostfs_mntpt=self.mount_b.hostfs_mntpt,
|
|
wait=True)
|
|
|
|
|
|
class TestClientsWithoutAuth(TestMultiFS):
|
|
|
|
def setUp(self):
|
|
super(TestClientsWithoutAuth, self).setUp()
|
|
|
|
# TODO: When MON and OSD caps for a Ceph FS are assigned to a
|
|
# client but MDS caps are not, mount.ceph prints "permission
|
|
# denied". But when MON caps are not assigned and MDS and OSD
|
|
# caps are, mount.ceph prints "no mds server or cluster laggy"
|
|
# instead of "permission denied".
|
|
#
|
|
# Before uncommenting the following line a fix would be required
|
|
# for latter case to change "no mds server is up or the cluster is
|
|
# laggy" to "permission denied".
|
|
self.kernel_errmsgs = ('permission denied', 'no mds server is up or '
|
|
'the cluster is laggy', 'no such file or '
|
|
'directory',
|
|
'input/output error')
|
|
|
|
# TODO: When MON and OSD caps are assigned for a Ceph FS to a
|
|
# client but MDS caps are not, ceph-fuse prints "operation not
|
|
# permitted". But when MON caps are not assigned and MDS and OSD
|
|
# caps are, ceph-fuse prints "no such file or directory" instead
|
|
# of "operation not permitted".
|
|
#
|
|
# Before uncommenting the following line a fix would be required
|
|
# for the latter case to change "no such file or directory" to
|
|
# "operation not permitted".
|
|
#self.assertIn('operation not permitted', retval[2].lower())
|
|
self.fuse_errmsgs = ('operation not permitted', 'no such file or '
|
|
'directory')
|
|
|
|
if 'kernel' in str(type(self.mount_a)).lower():
|
|
self.errmsgs = self.kernel_errmsgs
|
|
elif 'fuse' in str(type(self.mount_a)).lower():
|
|
self.errmsgs = self.fuse_errmsgs
|
|
else:
|
|
raise RuntimeError('strange, the client was neither based on '
|
|
'kernel nor FUSE.')
|
|
|
|
def check_that_mount_failed_for_right_reason(self, stderr):
|
|
stderr = stderr.lower()
|
|
for errmsg in self.errmsgs:
|
|
if errmsg in stderr:
|
|
break
|
|
else:
|
|
raise AssertionError('can\'t find expected set of words in the '
|
|
f'stderr\nself.errmsgs - {self.errmsgs}\n'
|
|
f'stderr - {stderr}')
|
|
|
|
def test_mount_all_caps_absent(self):
|
|
# setup part...
|
|
keyring = self.fs1.authorize(self.client_id, ('/', 'rw'))
|
|
keyring_path = self.mount_a.client_remote.mktemp(data=keyring)
|
|
|
|
# mount the FS for which client has no auth...
|
|
retval = self.mount_a.remount(client_id=self.client_id,
|
|
client_keyring_path=keyring_path,
|
|
cephfs_name=self.fs2.name,
|
|
check_status=False)
|
|
|
|
# tests...
|
|
self.assertIsInstance(retval, tuple)
|
|
self.assertEqual(len(retval), 3)
|
|
self.assertIsInstance(retval[0], CommandFailedError)
|
|
self.check_that_mount_failed_for_right_reason(retval[2])
|
|
|
|
def test_mount_mon_and_osd_caps_present_mds_caps_absent(self):
|
|
# setup part...
|
|
moncap = gen_mon_cap_str((('rw', self.fs1.name),
|
|
('rw', self.fs2.name)))
|
|
mdscap = gen_mds_cap_str((('rw', self.fs1.name),))
|
|
osdcap = gen_osd_cap_str((('rw', self.fs1.name,),
|
|
('rw', self.fs2.name)))
|
|
keyring = self.create_client(self.client_id, moncap, osdcap, mdscap)
|
|
keyring_path = self.mount_a.client_remote.mktemp(data=keyring)
|
|
|
|
# mount the FS for which client has no auth...
|
|
retval = self.mount_a.remount(client_id=self.client_id,
|
|
client_keyring_path=keyring_path,
|
|
cephfs_name=self.fs2.name,
|
|
check_status=False)
|
|
|
|
# tests...
|
|
self.assertIsInstance(retval, tuple)
|
|
self.assertEqual(len(retval), 3)
|
|
self.assertIsInstance(retval[0], CommandFailedError)
|
|
self.check_that_mount_failed_for_right_reason(retval[2])
|