qa/tasks/cephfs: test fs subvolume authorize/deauthorize

Fixes: https://tracker.ceph.com/issues/40401
Signed-off-by: Ramana Raja <rraja@redhat.com>
Signed-off-by: Kotresh HR <khiremat@redhat.com>
This commit is contained in:
Ramana Raja 2019-07-11 16:44:35 +05:30 committed by Kotresh HR
parent 6c3b7547fb
commit 7c98dc1ad3

View File

@ -26,7 +26,7 @@ class TestVolumesHelper(CephFSTestCase):
TEST_FILE_NAME_PREFIX="subvolume_file"
# for filling subvolume with data
CLIENTS_REQUIRED = 1
CLIENTS_REQUIRED = 2
MDSS_REQUIRED = 2
# io defaults
@ -345,6 +345,26 @@ class TestVolumesHelper(CephFSTestCase):
else:
self.mount_a.run_shell(['rmdir', trashpath])
def _configure_guest_auth(self, guest_mount, authid, key):
"""
Set up auth credentials for a guest client.
"""
# Create keyring file for the guest client.
keyring_txt = dedent("""
[client.{authid}]
key = {key}
""".format(authid=authid,key=key))
guest_mount.client_id = authid
guest_mount.client_remote.write_file(guest_mount.get_keyring_path(),
keyring_txt, sudo=True)
# Add a guest client section to the ceph config file.
self.config_set("client.{0}".format(authid), "debug client", 20)
self.config_set("client.{0}".format(authid), "debug objecter", 20)
self.set_conf("client.{0}".format(authid),
"keyring", guest_mount.get_keyring_path())
def setUp(self):
super(TestVolumesHelper, self).setUp()
self.volname = None
@ -1154,6 +1174,122 @@ class TestSubvolumes(TestVolumesHelper):
# verify trash dir is clean
self._wait_for_trash_empty()
### authorize operations
def test_authorize_deauthorize_legacy_subvolume(self):
subvolume = self._generate_random_subvolume_name()
group = self._generate_random_group_name()
authid = "alice"
guest_mount = self.mount_b
guest_mount.umount_wait()
# emulate a old-fashioned subvolume in a custom group
createpath = os.path.join(".", "volumes", group, subvolume)
self.mount_a.run_shell(['mkdir', '-p', createpath])
# add required xattrs to subvolume
default_pool = self.mount_a.getfattr(".", "ceph.dir.layout.pool")
self.mount_a.setfattr(createpath, 'ceph.dir.layout.pool', default_pool)
mount_path = os.path.join("/", "volumes", group, subvolume)
# authorize guest authID read-write access to subvolume
key = self._fs_cmd("subvolume", "authorize", self.volname, subvolume, authid,
"--group_name", group)
# guest authID should exist
existing_ids = [a['entity'] for a in self.auth_list()]
self.assertIn("client.{0}".format(authid), existing_ids)
# configure credentials for guest client
self._configure_guest_auth(guest_mount, authid, key)
# mount the subvolume, and write to it
guest_mount.mount(cephfs_mntpt=mount_path)
guest_mount.write_n_mb("data.bin", 1)
# authorize guest authID read access to subvolume
key = self._fs_cmd("subvolume", "authorize", self.volname, subvolume, authid,
"--group_name", group, "--access_level", "r")
# guest client sees the change in access level to read only after a
# remount of the subvolume.
guest_mount.umount_wait()
guest_mount.mount(cephfs_mntpt=mount_path)
# read existing content of the subvolume
self.assertListEqual(guest_mount.ls(guest_mount.mountpoint), ["data.bin"])
# cannot write into read-only subvolume
with self.assertRaises(CommandFailedError):
guest_mount.write_n_mb("rogue.bin", 1)
# cleanup
guest_mount.umount_wait()
self._fs_cmd("subvolume", "deauthorize", self.volname, subvolume, authid,
"--group_name", group)
# guest authID should no longer exist
existing_ids = [a['entity'] for a in self.auth_list()]
self.assertNotIn("client.{0}".format(authid), existing_ids)
self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--group_name", group)
self._fs_cmd("subvolumegroup", "rm", self.volname, group)
def test_authorize_deauthorize_subvolume(self):
subvolume = self._generate_random_subvolume_name()
group = self._generate_random_group_name()
authid = "alice"
guest_mount = self.mount_b
guest_mount.umount_wait()
# create group
self._fs_cmd("subvolumegroup", "create", self.volname, group)
# create subvolume in group
self._fs_cmd("subvolume", "create", self.volname, subvolume, "--group_name", group)
mount_path = self._fs_cmd("subvolume", "getpath", self.volname, subvolume,
"--group_name", group).rstrip()
# authorize guest authID read-write access to subvolume
key = self._fs_cmd("subvolume", "authorize", self.volname, subvolume, authid,
"--group_name", group)
# guest authID should exist
existing_ids = [a['entity'] for a in self.auth_list()]
self.assertIn("client.{0}".format(authid), existing_ids)
# configure credentials for guest client
self._configure_guest_auth(guest_mount, authid, key)
# mount the subvolume, and write to it
guest_mount.mount(cephfs_mntpt=mount_path)
guest_mount.write_n_mb("data.bin", 1)
# authorize guest authID read access to subvolume
key = self._fs_cmd("subvolume", "authorize", self.volname, subvolume, authid,
"--group_name", group, "--access_level", "r")
# guest client sees the change in access level to read only after a
# remount of the subvolume.
guest_mount.umount_wait()
guest_mount.mount(cephfs_mntpt=mount_path)
# read existing content of the subvolume
self.assertListEqual(guest_mount.ls(guest_mount.mountpoint), ["data.bin"])
# cannot write into read-only subvolume
with self.assertRaises(CommandFailedError):
guest_mount.write_n_mb("rogue.bin", 1)
# cleanup
guest_mount.umount_wait()
self._fs_cmd("subvolume", "deauthorize", self.volname, subvolume, authid,
"--group_name", group)
# guest authID should no longer exist
existing_ids = [a['entity'] for a in self.auth_list()]
self.assertNotIn("client.{0}".format(authid), existing_ids)
self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--group_name", group)
self._fs_cmd("subvolumegroup", "rm", self.volname, group)
def test_subvolume_pin_random(self):
self.fs.set_max_mds(2)
self.fs.wait_for_daemons()
@ -3302,7 +3438,8 @@ class TestMisc(TestVolumesHelper):
"""Miscellaneous tests related to FS volume, subvolume group, and subvolume operations."""
def test_connection_expiration(self):
# unmount any cephfs mounts
self.mount_a.umount_wait()
for i in range(0, self.CLIENTS_REQUIRED):
self.mounts[i].umount_wait()
sessions = self._session_list()
self.assertLessEqual(len(sessions), 1) # maybe mgr is already mounted
@ -3317,7 +3454,8 @@ class TestMisc(TestVolumesHelper):
def test_mgr_eviction(self):
# unmount any cephfs mounts
self.mount_a.umount_wait()
for i in range(0, self.CLIENTS_REQUIRED):
self.mounts[i].umount_wait()
sessions = self._session_list()
self.assertLessEqual(len(sessions), 1) # maybe mgr is already mounted