mgr/volumes: Use snapshot root directory attrs when creating clone root

If a subvolumes mode or uid/gid values are changed post a snapshot,
and a clone of a snapshot prior to the change is initiated, the clone
inherits the current source subvolumes attributes, rather than the
snapshots attributes.

Fixing this by using the snapshots subvolume root attributes to create
the clone subvolumes root.

Following attributes are picked from the source subvolume snapshot:
- uid, gid, mode, data pool, pool namespace, quota

Fixes: https://tracker.ceph.com/issues/46163

Signed-off-by: Shyamsundar Ranganathan <srangana@redhat.com>
This commit is contained in:
Shyamsundar Ranganathan 2020-06-23 19:57:52 -04:00
parent c625a51635
commit 0eaf9b0bd2
5 changed files with 286 additions and 114 deletions

View File

@ -59,16 +59,22 @@ class TestVolumes(CephFSTestCase):
def _check_clone_canceled(self, clone, clone_group=None):
self.__check_clone_state("canceled", clone, clone_group, timo=1)
def _get_subvolume_snapshot_path(self, snapshot, subvol_path):
(base_path, uuid_str) = os.path.split(subvol_path)
return os.path.join(base_path, ".snap", snapshot, uuid_str)
def _get_subvolume_snapshot_path(self, subvolume, snapshot, source_group, subvol_path, source_version):
if source_version == 2:
# v2
if subvol_path is not None:
(base_path, uuid_str) = os.path.split(subvol_path)
else:
(base_path, uuid_str) = os.path.split(self._get_subvolume_path(self.volname, subvolume, group_name=source_group))
return os.path.join(base_path, ".snap", snapshot, uuid_str)
def _verify_clone_attrs(self, subvolume, clone, source_group=None, clone_group=None, snapshot=None, subvol_path=None):
if snapshot and subvol_path:
path1 = self._get_subvolume_snapshot_path(snapshot, subvol_path)
else:
path1 = self._get_subvolume_path(self.volname, subvolume, group_name=source_group)
path2 = self._get_subvolume_path(self.volname, clone, group_name=clone_group)
# v1
base_path = self._get_subvolume_path(self.volname, subvolume, group_name=source_group)
return os.path.join(base_path, ".snap", snapshot)
def _verify_clone_attrs(self, source_path, clone_path):
path1 = source_path
path2 = clone_path
p = self.mount_a.run_shell(["find", path1])
paths = p.stdout.getvalue().strip().split()
@ -102,17 +108,37 @@ class TestVolumes(CephFSTestCase):
cval = int(self.mount_a.run_shell(['stat', '-c' '%Y', sink_path]).stdout.getvalue().strip())
self.assertEqual(sval, cval)
def _verify_clone(self, subvolume, clone, source_group=None, clone_group=None, snapshot=None, subvol_path=None, timo=120):
# pass in snapshot and subvol_path (subvolume path when snapshot was taken) when subvolume is removed
# but snapshots are retained for clone verification
if snapshot and subvol_path:
path1 = self._get_subvolume_snapshot_path(snapshot, subvol_path)
def _verify_clone_root(self, source_path, clone_path, clone, clone_group, clone_pool):
# verifies following clone root attrs quota, data_pool and pool_namespace
# remaining attributes of clone root are validated in _verify_clone_attrs
clone_info = json.loads(self._get_subvolume_info(self.volname, clone, clone_group))
# verify quota is inherited from source snapshot
src_quota = self.mount_a.getfattr(source_path, "ceph.quota.max_bytes")
self.assertEqual(clone_info["bytes_quota"], "infinite" if src_quota is None else int(src_quota))
if clone_pool:
# verify pool is set as per request
self.assertEqual(clone_info["data_pool"], clone_pool)
else:
path1 = self._get_subvolume_path(self.volname, subvolume, group_name=source_group)
# verify pool and pool namespace are inherited from snapshot
self.assertEqual(clone_info["data_pool"],
self.mount_a.getfattr(source_path, "ceph.dir.layout.pool"))
self.assertEqual(clone_info["pool_namespace"],
self.mount_a.getfattr(source_path, "ceph.dir.layout.pool_namespace"))
def _verify_clone(self, subvolume, snapshot, clone,
source_group=None, clone_group=None, clone_pool=None,
subvol_path=None, source_version=2, timo=120):
# pass in subvol_path (subvolume path when snapshot was taken) when subvolume is removed
# but snapshots are retained for clone verification
path1 = self._get_subvolume_snapshot_path(subvolume, snapshot, source_group, subvol_path, source_version)
path2 = self._get_subvolume_path(self.volname, clone, group_name=clone_group)
check = 0
# TODO: currently rentries are not being returned for snapshots, if source entries are removed
# TODO: currently snapshot rentries are not stable if snapshot source entries
# are removed, https://tracker.ceph.com/issues/46747
while check < timo and subvol_path is None:
val1 = int(self.mount_a.getfattr(path1, "ceph.dir.rentries"))
val2 = int(self.mount_a.getfattr(path2, "ceph.dir.rentries"))
@ -122,8 +148,8 @@ class TestVolumes(CephFSTestCase):
time.sleep(1)
self.assertTrue(check < timo)
self._verify_clone_attrs(subvolume, clone, source_group=source_group, clone_group=clone_group,
snapshot=snapshot, subvol_path=subvol_path)
self._verify_clone_root(path1, path2, clone, clone_group, clone_pool)
self._verify_clone_attrs(path1, path2)
def _generate_random_volume_name(self, count=1):
n = self.volume_start
@ -201,6 +227,25 @@ class TestVolumes(CephFSTestCase):
def _delete_test_volume(self):
self._fs_cmd("volume", "rm", self.volname, "--yes-i-really-mean-it")
def _do_subvolume_pool_and_namespace_update(self, subvolume, pool=None, pool_namespace=None, subvolume_group=None):
subvolpath = self._get_subvolume_path(self.volname, subvolume, group_name=subvolume_group)
if pool is not None:
self.mount_a.setfattr(subvolpath, 'ceph.dir.layout.pool', pool)
if pool_namespace is not None:
self.mount_a.setfattr(subvolpath, 'ceph.dir.layout.pool_namespace', pool_namespace)
def _do_subvolume_attr_update(self, subvolume, uid, gid, mode, subvolume_group=None):
subvolpath = self._get_subvolume_path(self.volname, subvolume, group_name=subvolume_group)
# mode
self.mount_a.run_shell(['chmod', mode, subvolpath])
# ownership
self.mount_a.run_shell(['chown', uid, subvolpath])
self.mount_a.run_shell(['chgrp', gid, subvolpath])
def _do_subvolume_io(self, subvolume, subvolume_group=None, create_dir=None,
number_of_files=DEFAULT_NUMBER_OF_FILES, file_size=DEFAULT_FILE_SIZE):
# get subvolume path for IO
@ -266,7 +311,7 @@ class TestVolumes(CephFSTestCase):
def _create_v1_subvolume(self, subvol_name, subvol_group=None, has_snapshot=True, subvol_type='subvolume', state='complete'):
group = subvol_group if subvol_group is not None else '_nogroup'
basepath = os.path.join(".", "volumes", group, subvol_name)
basepath = os.path.join("volumes", group, subvol_name)
uuid_str = str(uuid.uuid4())
createpath = os.path.join(basepath, uuid_str)
self.mount_a.run_shell(['mkdir', '-p', createpath])
@ -281,7 +326,7 @@ class TestVolumes(CephFSTestCase):
self.mount_a.setfattr(createpath, 'ceph.dir.layout.pool', default_pool)
# create a v1 .meta file
meta_contents = "[GLOBAL]\nversion = 1\ntype = {0}\npath = {1}\nstate = {2}\n".format(subvol_type, createpath, state)
meta_contents = "[GLOBAL]\nversion = 1\ntype = {0}\npath = {1}\nstate = {2}\n".format(subvol_type, "/" + createpath, state)
if state == 'pending':
# add a fake clone source
meta_contents = meta_contents + '[source]\nvolume = fake\nsubvolume = fake\nsnapshot = fake\n'
@ -1713,13 +1758,16 @@ class TestVolumes(CephFSTestCase):
subvolume = self._generate_random_subvolume_name()
snapshot = self._generate_random_snapshot_name()
clone1, clone2 = self._generate_random_clone_name(2)
mode = "777"
uid = "1000"
gid = "1000"
# emulate a v1 subvolume -- in the default group
subvolume_path = self._create_v1_subvolume(subvolume)
# getpath
subvolpath = self._fs_cmd("subvolume", "getpath", self.volname, subvolume)
self.assertEqual(subvolpath.rstrip(), subvolume_path)
subvolpath = self._get_subvolume_path(self.volname, subvolume)
self.assertEqual(subvolpath, subvolume_path)
# ls
subvolumes = json.loads(self._fs_cmd('subvolume', 'ls', self.volname))
@ -1740,18 +1788,18 @@ class TestVolumes(CephFSTestCase):
self.assertIn(feature, subvol_info["features"], msg="expected feature '{0}' in subvolume".format(feature))
# resize
nsize = self.DEFAULT_FILE_SIZE*1024*1024
nsize = self.DEFAULT_FILE_SIZE*1024*1024*10
self._fs_cmd("subvolume", "resize", self.volname, subvolume, str(nsize))
subvol_info = json.loads(self._get_subvolume_info(self.volname, subvolume))
for md in subvol_md:
self.assertIn(md, subvol_info, "'{0}' key not present in metadata of subvolume".format(md))
self.assertEqual(subvol_info["bytes_quota"], nsize, "bytes_quota should be set to '{0}'".format(nsize))
# create (idempotent)
self._fs_cmd("subvolume", "create", self.volname, subvolume)
# create (idempotent) (change some attrs, to ensure attrs are preserved from the snapshot on clone)
self._fs_cmd("subvolume", "create", self.volname, subvolume, "--mode", mode, "--uid", uid, "--gid", gid)
# TODO: do some IO (fails possibly due to permissions)
#self._do_subvolume_io(subvolume, number_of_files=64)
# do some IO
self._do_subvolume_io(subvolume, number_of_files=8)
# snap-create
self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
@ -1765,6 +1813,9 @@ class TestVolumes(CephFSTestCase):
# ensure clone is v2
self._assert_meta_location_and_version(self.volname, clone1, version=2)
# verify clone
self._verify_clone(subvolume, snapshot, clone1, source_version=1)
# clone (older snapshot)
self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, 'fake', clone2)
@ -1774,6 +1825,10 @@ class TestVolumes(CephFSTestCase):
# ensure clone is v2
self._assert_meta_location_and_version(self.volname, clone2, version=2)
# verify clone
# TODO: rentries will mismatch till this is fixed https://tracker.ceph.com/issues/46747
#self._verify_clone(subvolume, 'fake', clone2, source_version=1)
# snap-info
snap_info = json.loads(self._get_subvolume_snapshot_info(self.volname, subvolume, snapshot))
for md in snap_md:
@ -1820,11 +1875,11 @@ class TestVolumes(CephFSTestCase):
self._create_v1_subvolume(subvolume3, subvol_type='clone', has_snapshot=False, state='pending')
# this would attempt auto-upgrade on access, but fail to do so as snapshots exist
subvolpath1 = self._fs_cmd("subvolume", "getpath", self.volname, subvolume1)
self.assertEqual(subvolpath1.rstrip(), subvol1_path)
subvolpath1 = self._get_subvolume_path(self.volname, subvolume1)
self.assertEqual(subvolpath1, subvol1_path)
subvolpath2 = self._fs_cmd("subvolume", "getpath", self.volname, subvolume2, group)
self.assertEqual(subvolpath2.rstrip(), subvol2_path)
subvolpath2 = self._get_subvolume_path(self.volname, subvolume2, group_name=group)
self.assertEqual(subvolpath2, subvol2_path)
# this would attempt auto-upgrade on access, but fail to do so as volume is not complete
# use clone status, as only certain operations are allowed in pending state
@ -1875,11 +1930,11 @@ class TestVolumes(CephFSTestCase):
subvol2_path = self._create_v1_subvolume(subvolume2, subvol_group=group, has_snapshot=False)
# this would attempt auto-upgrade on access
subvolpath1 = self._fs_cmd("subvolume", "getpath", self.volname, subvolume1)
self.assertEqual(subvolpath1.rstrip(), subvol1_path)
subvolpath1 = self._get_subvolume_path(self.volname, subvolume1)
self.assertEqual(subvolpath1, subvol1_path)
subvolpath2 = self._fs_cmd("subvolume", "getpath", self.volname, subvolume2, group)
self.assertEqual(subvolpath2.rstrip(), subvol2_path)
subvolpath2 = self._get_subvolume_path(self.volname, subvolume2, group_name=group)
self.assertEqual(subvolpath2, subvol2_path)
# ensure metadata file is in v2 location, with version retained as v2
self._assert_meta_location_and_version(self.volname, subvolume1, version=2)
@ -2128,7 +2183,7 @@ class TestVolumes(CephFSTestCase):
self._wait_for_clone_to_complete(clone)
# verify clone
self._verify_clone(subvolume, clone, snapshot=snapshot, subvol_path=subvol_path)
self._verify_clone(subvolume, snapshot, clone, subvol_path=subvol_path)
# remove snapshots (removes retained volume)
self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
@ -2172,7 +2227,7 @@ class TestVolumes(CephFSTestCase):
self._wait_for_clone_to_complete(subvolume)
# verify clone
self._verify_clone(subvolume, subvolume, snapshot=snapshot, subvol_path=subvol_path)
self._verify_clone(subvolume, snapshot, subvolume, subvol_path=subvol_path)
# remove snapshot
self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
@ -2217,7 +2272,7 @@ class TestVolumes(CephFSTestCase):
self._wait_for_clone_to_complete(clone)
# verify clone
self._verify_clone(subvolume, clone, snapshot=snapshot1, subvol_path=subvol1_path)
self._verify_clone(subvolume, snapshot1, clone, subvol_path=subvol1_path)
# create a snapshot on the clone
self._fs_cmd("subvolume", "snapshot", "create", self.volname, clone, snapshot2)
@ -2303,7 +2358,7 @@ class TestVolumes(CephFSTestCase):
self._wait_for_clone_to_complete(clone)
# verify clone
self._verify_clone(subvolume, clone, snapshot=snapshot2, subvol_path=subvol2_path)
self._verify_clone(subvolume, snapshot2, clone, subvol_path=subvol2_path)
# remove snapshot
self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot1)
@ -2349,12 +2404,12 @@ class TestVolumes(CephFSTestCase):
# now, unprotect snapshot
self._fs_cmd("subvolume", "snapshot", "unprotect", self.volname, subvolume, snapshot)
# verify clone
self._verify_clone(subvolume, snapshot, clone)
# remove snapshot
self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
# verify clone
self._verify_clone(subvolume, clone)
# remove subvolumes
self._fs_cmd("subvolume", "rm", self.volname, subvolume)
self._fs_cmd("subvolume", "rm", self.volname, clone)
@ -2382,12 +2437,12 @@ class TestVolumes(CephFSTestCase):
# check clone status
self._wait_for_clone_to_complete(clone)
# verify clone
self._verify_clone(subvolume, snapshot, clone)
# remove snapshot
self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
# verify clone
self._verify_clone(subvolume, clone)
# remove subvolumes
self._fs_cmd("subvolume", "rm", self.volname, subvolume)
self._fs_cmd("subvolume", "rm", self.volname, clone)
@ -2419,12 +2474,12 @@ class TestVolumes(CephFSTestCase):
# check clone status
self._wait_for_clone_to_complete(clone)
# verify clone
self._verify_clone(subvolume, snapshot, clone, clone_pool=new_pool)
# remove snapshot
self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
# verify clone
self._verify_clone(subvolume, clone)
subvol_path = self._get_subvolume_path(self.volname, clone)
desired_pool = self.mount_a.getfattr(subvol_path, "ceph.dir.layout.pool")
self.assertEqual(desired_pool, new_pool)
@ -2444,6 +2499,9 @@ class TestVolumes(CephFSTestCase):
mode = "777"
uid = "1000"
gid = "1000"
new_uid = "1001"
new_gid = "1001"
new_mode = "700"
# create subvolume
self._fs_cmd("subvolume", "create", self.volname, subvolume, "--mode", mode, "--uid", uid, "--gid", gid)
@ -2454,17 +2512,64 @@ class TestVolumes(CephFSTestCase):
# snapshot subvolume
self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
# change subvolume attrs (to ensure clone picks up snapshot attrs)
self._do_subvolume_attr_update(subvolume, new_uid, new_gid, new_mode)
# schedule a clone
self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
# check clone status
self._wait_for_clone_to_complete(clone)
# verify clone
self._verify_clone(subvolume, snapshot, clone)
# remove snapshot
self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
# remove subvolumes
self._fs_cmd("subvolume", "rm", self.volname, subvolume)
self._fs_cmd("subvolume", "rm", self.volname, clone)
# verify trash dir is clean
self._wait_for_trash_empty()
def test_subvolume_clone_inherit_snapshot_namespace_and_size(self):
subvolume = self._generate_random_subvolume_name()
snapshot = self._generate_random_snapshot_name()
clone = self._generate_random_clone_name()
osize = self.DEFAULT_FILE_SIZE*1024*1024*12
# create subvolume, in an isolated namespace with a specified size
self._fs_cmd("subvolume", "create", self.volname, subvolume, "--namespace-isolated", "--size", str(osize))
# do some IO
self._do_subvolume_io(subvolume, number_of_files=8)
# snapshot subvolume
self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
# create a pool different from current subvolume pool
subvol_path = self._get_subvolume_path(self.volname, subvolume)
default_pool = self.mount_a.getfattr(subvol_path, "ceph.dir.layout.pool")
new_pool = "new_pool"
self.assertNotEqual(default_pool, new_pool)
self.fs.add_data_pool(new_pool)
# update source subvolume pool
self._do_subvolume_pool_and_namespace_update(subvolume, pool=new_pool, pool_namespace="")
# schedule a clone, with NO --pool specification
self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
# check clone status
self._wait_for_clone_to_complete(clone)
# verify clone
self._verify_clone(subvolume, clone)
self._verify_clone(subvolume, snapshot, clone)
# remove snapshot
self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
# remove subvolumes
self._fs_cmd("subvolume", "rm", self.volname, subvolume)
@ -2493,12 +2598,12 @@ class TestVolumes(CephFSTestCase):
# check clone status
self._wait_for_clone_to_complete(clone1)
# verify clone
self._verify_clone(subvolume, snapshot, clone1)
# remove snapshot
self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
# verify clone
self._verify_clone(subvolume, clone1)
# now the clone is just like a normal subvolume -- snapshot the clone and fork
# another clone. before that do some IO so it's can be differentiated.
self._do_subvolume_io(clone1, create_dir="data", number_of_files=32)
@ -2512,12 +2617,12 @@ class TestVolumes(CephFSTestCase):
# check clone status
self._wait_for_clone_to_complete(clone2)
# verify clone
self._verify_clone(clone1, snapshot, clone2)
# remove snapshot
self._fs_cmd("subvolume", "snapshot", "rm", self.volname, clone1, snapshot)
# verify clone
self._verify_clone(clone1, clone2)
# remove subvolumes
self._fs_cmd("subvolume", "rm", self.volname, subvolume)
self._fs_cmd("subvolume", "rm", self.volname, clone1)
@ -2550,12 +2655,12 @@ class TestVolumes(CephFSTestCase):
# check clone status
self._wait_for_clone_to_complete(clone, clone_group=group)
# verify clone
self._verify_clone(subvolume, snapshot, clone, clone_group=group)
# remove snapshot
self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
# verify clone
self._verify_clone(subvolume, clone, clone_group=group)
# remove subvolumes
self._fs_cmd("subvolume", "rm", self.volname, subvolume)
self._fs_cmd("subvolume", "rm", self.volname, clone, group)
@ -2590,12 +2695,12 @@ class TestVolumes(CephFSTestCase):
# check clone status
self._wait_for_clone_to_complete(clone)
# verify clone
self._verify_clone(subvolume, snapshot, clone, source_group=group)
# remove snapshot
self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot, group)
# verify clone
self._verify_clone(subvolume, clone, source_group=group)
# remove subvolumes
self._fs_cmd("subvolume", "rm", self.volname, subvolume, group)
self._fs_cmd("subvolume", "rm", self.volname, clone)
@ -2632,12 +2737,12 @@ class TestVolumes(CephFSTestCase):
# check clone status
self._wait_for_clone_to_complete(clone, clone_group=c_group)
# verify clone
self._verify_clone(subvolume, snapshot, clone, source_group=s_group, clone_group=c_group)
# remove snapshot
self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot, s_group)
# verify clone
self._verify_clone(subvolume, clone, source_group=s_group, clone_group=c_group)
# remove subvolumes
self._fs_cmd("subvolume", "rm", self.volname, subvolume, s_group)
self._fs_cmd("subvolume", "rm", self.volname, clone, c_group)
@ -2664,6 +2769,10 @@ class TestVolumes(CephFSTestCase):
createpath = os.path.join(".", "volumes", "_nogroup", subvolume)
self.mount_a.run_shell(['mkdir', '-p', createpath])
# add required xattrs to subvolume
default_pool = self.mount_a.getfattr(".", "ceph.dir.layout.pool")
self.mount_a.setfattr(createpath, 'ceph.dir.layout.pool', default_pool)
# do some IO
self._do_subvolume_io(subvolume, number_of_files=64)
@ -2687,12 +2796,12 @@ class TestVolumes(CephFSTestCase):
# check clone status
self._wait_for_clone_to_complete(clone)
# verify clone
self._verify_clone(subvolume, snapshot, clone, source_version=1)
# remove snapshot
self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
# verify clone
self._verify_clone(subvolume, clone)
# ensure metadata file is in v2 location, with required version v2
self._assert_meta_location_and_version(self.volname, clone)
@ -2736,12 +2845,12 @@ class TestVolumes(CephFSTestCase):
subvolpath = self._get_subvolume_path(self.volname, clone)
self.assertNotEqual(subvolpath, None)
# verify clone
self._verify_clone(subvolume, snapshot, clone)
# remove snapshot
self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
# verify clone
self._verify_clone(subvolume, clone)
# remove subvolumes
self._fs_cmd("subvolume", "rm", self.volname, subvolume)
self._fs_cmd("subvolume", "rm", self.volname, clone)
@ -2781,12 +2890,12 @@ class TestVolumes(CephFSTestCase):
subvolpath = self._get_subvolume_path(self.volname, clone)
self.assertNotEqual(subvolpath, None)
# verify clone
self._verify_clone(subvolume, snapshot, clone)
# remove snapshot
self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
# verify clone
self._verify_clone(subvolume, clone)
# remove subvolumes
self._fs_cmd("subvolume", "rm", self.volname, subvolume)
self._fs_cmd("subvolume", "rm", self.volname, clone)
@ -2826,12 +2935,12 @@ class TestVolumes(CephFSTestCase):
subvolpath = self._get_subvolume_path(self.volname, clone)
self.assertNotEqual(subvolpath, None)
# verify clone
self._verify_clone(subvolume, snapshot, clone)
# remove snapshot
self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
# verify clone
self._verify_clone(subvolume, clone)
# remove subvolumes
self._fs_cmd("subvolume", "rm", self.volname, subvolume)
self._fs_cmd("subvolume", "rm", self.volname, clone)
@ -2897,12 +3006,12 @@ class TestVolumes(CephFSTestCase):
# check clone status
self._wait_for_clone_to_complete(clone)
# verify clone
self._verify_clone(subvolume1, snapshot, clone)
# remove snapshot
self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume1, snapshot)
# verify clone
self._verify_clone(subvolume1, clone)
# remove subvolumes
self._fs_cmd("subvolume", "rm", self.volname, subvolume1)
self._fs_cmd("subvolume", "rm", self.volname, subvolume2)
@ -2943,7 +3052,7 @@ class TestVolumes(CephFSTestCase):
self._wait_for_clone_to_complete(clone1)
# verify clone
self._verify_clone(subvolume, clone1)
self._verify_clone(subvolume, snapshot, clone1, clone_pool=new_pool)
# wait a bit so that subsequent I/O will give pool full error
time.sleep(120)
@ -2994,12 +3103,12 @@ class TestVolumes(CephFSTestCase):
# check clone status
self._wait_for_clone_to_complete(clone)
# verify clone
self._verify_clone(subvolume, snapshot, clone)
# remove snapshot
self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
# verify clone
self._verify_clone(subvolume, clone)
# remove subvolumes
self._fs_cmd("subvolume", "rm", self.volname, subvolume)
self._fs_cmd("subvolume", "rm", self.volname, clone)

View File

@ -1,7 +1,10 @@
import os
import stat
import uuid
import errno
import logging
from hashlib import md5
from typing import Dict, Union
import cephfs
@ -117,33 +120,65 @@ class SubvolumeBase(object):
else:
self.metadata_mgr = MetadataManager(self.fs, self.config_path, 0o640)
def set_attrs(self, path, size, isolate_namespace, pool, uid, gid):
def get_attrs(self, pathname):
# get subvolume attributes
attrs = {} # type: Dict[str, Union[int, str, None]]
stx = self.fs.statx(pathname,
cephfs.CEPH_STATX_UID | cephfs.CEPH_STATX_GID | cephfs.CEPH_STATX_MODE,
cephfs.AT_SYMLINK_NOFOLLOW)
attrs["uid"] = int(stx["uid"])
attrs["gid"] = int(stx["gid"])
attrs["mode"] = int(int(stx["mode"]) & ~stat.S_IFMT(stx["mode"]))
try:
attrs["data_pool"] = self.fs.getxattr(pathname, 'ceph.dir.layout.pool').decode('utf-8')
except cephfs.NoData:
attrs["data_pool"] = None
try:
attrs["pool_namespace"] = self.fs.getxattr(pathname, 'ceph.dir.layout.pool_namespace').decode('utf-8')
except cephfs.NoData:
attrs["pool_namespace"] = None
try:
attrs["quota"] = int(self.fs.getxattr(pathname, 'ceph.quota.max_bytes').decode('utf-8'))
except cephfs.NoData:
attrs["quota"] = None
return attrs
def set_attrs(self, path, attrs):
# set subvolume attributes
# set size
if size is not None:
quota = attrs.get("quota")
if quota is not None:
try:
self.fs.setxattr(path, 'ceph.quota.max_bytes', str(size).encode('utf-8'), 0)
self.fs.setxattr(path, 'ceph.quota.max_bytes', str(quota).encode('utf-8'), 0)
except cephfs.InvalidValue as e:
raise VolumeException(-errno.EINVAL, "invalid size specified: '{0}'".format(size))
raise VolumeException(-errno.EINVAL, "invalid size specified: '{0}'".format(quota))
except cephfs.Error as e:
raise VolumeException(-e.args[0], e.args[1])
# set pool layout
if pool:
data_pool = attrs.get("data_pool")
if data_pool is not None:
try:
self.fs.setxattr(path, 'ceph.dir.layout.pool', pool.encode('utf-8'), 0)
self.fs.setxattr(path, 'ceph.dir.layout.pool', data_pool.encode('utf-8'), 0)
except cephfs.InvalidValue:
raise VolumeException(-errno.EINVAL,
"invalid pool layout '{0}' -- need a valid data pool".format(pool))
"invalid pool layout '{0}' -- need a valid data pool".format(data_pool))
except cephfs.Error as e:
raise VolumeException(-e.args[0], e.args[1])
# isolate namespace
xattr_key = xattr_val = None
if isolate_namespace:
pool_namespace = attrs.get("pool_namespace")
if pool_namespace is not None:
# enforce security isolation, use separate namespace for this subvolume
xattr_key = 'ceph.dir.layout.pool_namespace'
xattr_val = self.namespace
elif not pool:
xattr_val = pool_namespace
elif not data_pool:
# If subvolume's namespace layout is not set, then the subvolume's pool
# layout remains unset and will undesirably change with ancestor's
# pool layout changes.
@ -160,24 +195,26 @@ class SubvolumeBase(object):
raise VolumeException(-e.args[0], e.args[1])
# set uid/gid
uid = attrs.get("uid")
if uid is None:
uid = self.group.uid
else:
try:
uid = int(uid)
if uid < 0:
raise ValueError
except ValueError:
raise VolumeException(-errno.EINVAL, "invalid UID")
gid = attrs.get("gid")
if gid is None:
gid = self.group.gid
else:
try:
gid = int(gid)
if gid < 0:
raise ValueError
except ValueError:
raise VolumeException(-errno.EINVAL, "invalid GID")
if uid is not None and gid is not None:
self.fs.chown(path, uid, gid)

View File

@ -77,7 +77,14 @@ class SubvolumeV1(SubvolumeBase, SubvolumeTemplate):
try:
# create directory and set attributes
self.fs.mkdirs(subvol_path, mode)
self.set_attrs(subvol_path, size, isolate_nspace, pool, uid, gid)
attrs = {
'uid': uid,
'gid': gid,
'data_pool': pool,
'pool_namespace': self.namespace if isolate_nspace else None,
'quota': size
}
self.set_attrs(subvol_path, attrs)
# persist subvolume metadata
qpath = subvol_path.decode('utf-8')
@ -120,9 +127,18 @@ class SubvolumeV1(SubvolumeBase, SubvolumeTemplate):
subvol_path = os.path.join(self.base_path, str(uuid.uuid4()).encode('utf-8'))
try:
# source snapshot attrs are used to create clone subvolume.
# attributes of subvolume's content though, are synced during the cloning process.
attrs = source_subvolume.get_attrs(source_subvolume.snapshot_data_path(snapname))
# override snapshot pool setting, if one is provided for the clone
if pool is not None:
attrs["data_pool"] = pool
attrs["pool_namespace"] = None
# create directory and set attributes
self.fs.mkdirs(subvol_path, source_subvolume.mode)
self.set_attrs(subvol_path, None, None, pool, source_subvolume.uid, source_subvolume.gid)
self.fs.mkdirs(subvol_path, attrs.get("mode"))
self.set_attrs(subvol_path, attrs)
# persist subvolume metadata and clone source
qpath = subvol_path.decode('utf-8')

View File

@ -119,7 +119,14 @@ class SubvolumeV2(SubvolumeV1):
subvol_path = os.path.join(self.base_path, str(uuid.uuid4()).encode('utf-8'))
try:
self.fs.mkdirs(subvol_path, mode)
self.set_attrs(subvol_path, size, isolate_nspace, pool, uid, gid)
attrs = {
'uid': uid,
'gid': gid,
'data_pool': pool,
'pool_namespace': self.namespace if isolate_nspace else None,
'quota': size
}
self.set_attrs(subvol_path, attrs)
# persist subvolume metadata
qpath = subvol_path.decode('utf-8')
@ -151,20 +158,18 @@ class SubvolumeV2(SubvolumeV1):
retained = self._is_retained()
subvol_path = os.path.join(self.base_path, str(uuid.uuid4()).encode('utf-8'))
try:
stx = self.fs.statx(source_subvolume.snapshot_data_path(snapname),
cephfs.CEPH_STATX_MODE | cephfs.CEPH_STATX_UID | cephfs.CEPH_STATX_GID,
cephfs.AT_SYMLINK_NOFOLLOW)
uid = stx.get('uid')
gid = stx.get('gid')
stx_mode = stx.get('mode')
if stx_mode is not None:
mode = stx_mode & ~stat.S_IFMT(stx_mode)
else:
mode = None
# source snapshot attrs are used to create clone subvolume
# attributes of subvolume's content though, are synced during the cloning process.
attrs = source_subvolume.get_attrs(source_subvolume.snapshot_data_path(snapname))
# override snapshot pool setting, if one is provided for the clone
if pool is not None:
attrs["data_pool"] = pool
attrs["pool_namespace"] = None
# create directory and set attributes
self.fs.mkdirs(subvol_path, mode)
self.set_attrs(subvol_path, None, None, pool, uid, gid)
self.fs.mkdirs(subvol_path, attrs.get("mode"))
self.set_attrs(subvol_path, attrs)
# persist subvolume metadata and clone source
qpath = subvol_path.decode('utf-8')

View File

@ -161,9 +161,14 @@ class VolumeClient(CephfsClient):
try:
with open_subvol(fs_handle, self.volspec, group, subvolname, SubvolumeOpType.CREATE) as subvolume:
# idempotent creation -- valid. Attributes set is supported.
uid = uid if uid else subvolume.uid
gid = gid if gid else subvolume.gid
subvolume.set_attrs(subvolume.path, size, isolate_nspace, pool, uid, gid)
attrs = {
'uid': uid if uid else subvolume.uid,
'gid': gid if gid else subvolume.gid,
'data_pool': pool,
'pool_namespace': subvolume.namespace if isolate_nspace else None,
'quota': size
}
subvolume.set_attrs(subvolume.path, attrs)
except VolumeException as ve:
if ve.errno == -errno.ENOENT:
self._create_subvolume(fs_handle, volname, group, subvolname, **kwargs)