Merge PR #36773 into master

* refs/pull/36773/head:
	mgr/volumes: Prevent subvolume recreate if trash is non-empty
	mgr/volumes: Disallow subvolume group level snapshots
	mgr/volumes: Add test case to ensure subvolume is marked
	mgr/volumes: handle idempotent subvolume marks
	mgr/volumes: Tests amended and added to ensure subvolume trash functionality
	mgr/volumes: Mark subvolume root with the vxattr ceph.dir.subvolume
	mgr/volumes: Move incarnations for v2 subvolumes, to subvolume trash
	mgr/volumes: maintain per subvolume trash directory
	mgr/volumes: make subvolume_v2::_is_retained() object property

Reviewed-by: Venky Shankar <vshankar@redhat.com>
Reviewed-by: Patrick Donnelly <pdonnell@redhat.com>
This commit is contained in:
Patrick Donnelly 2020-08-26 10:46:07 -07:00
commit d2a3fcc728
No known key found for this signature in database
GPG Key ID: 3A2A7E25BEA8AADB
10 changed files with 548 additions and 65 deletions

View File

@ -91,12 +91,8 @@ List subvolume groups using::
$ ceph fs subvolumegroup ls <vol_name>
Create a snapshot (see :doc:`/cephfs/experimental-features`) of a
subvolume group using::
$ ceph fs subvolumegroup snapshot create <vol_name> <group_name> <snap_name>
This implicitly snapshots all the subvolumes under the subvolume group.
.. note:: Subvolume group snapshot feature is no longer supported in mainline CephFS (existing group
snapshots can still be listed and deleted)
Remove a snapshot of a subvolume group using::

View File

@ -15,5 +15,6 @@ overrides:
tasks:
- cephfs_test_runner:
fail_on_skip: false
modules:
- tasks.cephfs.test_volumes

View File

@ -6,7 +6,9 @@ import random
import logging
import collections
import uuid
import unittest
from hashlib import md5
from textwrap import dedent
from tasks.cephfs.cephfs_test_case import CephFSTestCase
from teuthology.exceptions import CommandFailedError
@ -334,6 +336,14 @@ class TestVolumes(CephFSTestCase):
sudo_write_file(self.mount_a.client_remote, meta_filepath1, meta_contents)
return createpath
def _update_fake_trash(self, subvol_name, subvol_group=None, trash_name='fake', create=True):
    """Create (or remove, when create=False) a fake entry in the subvolume's per-subvolume trash.

    Used by tests to simulate a pending asynchronous purge without racing real purge threads.
    """
    if subvol_group is None:
        subvol_group = '_nogroup'
    entry_path = os.path.join("volumes", subvol_group, subvol_name, '.trash', trash_name)
    shell_cmd = ['mkdir', '-p', entry_path] if create else ['rmdir', entry_path]
    self.mount_a.run_shell(shell_cmd)
def setUp(self):
super(TestVolumes, self).setUp()
self.volname = None
@ -431,6 +441,42 @@ class TestVolumes(CephFSTestCase):
else:
raise RuntimeError("expected the 'fs volume rm' command to fail.")
def test_subvolume_marked(self):
"""
ensure a subvolume is marked with the ceph.dir.subvolume xattr
"""
subvolume = self._generate_random_subvolume_name()
# create subvolume
self._fs_cmd("subvolume", "create", self.volname, subvolume)
# getpath
subvolpath = self._get_subvolume_path(self.volname, subvolume)
# subdirectory of a subvolume cannot be moved outside the subvolume once marked with
# the xattr ceph.dir.subvolume, hence test by attempting to rename subvol path (incarnation)
# outside the subvolume
dstpath = os.path.join(self.mount_a.mountpoint, 'volumes', '_nogroup', 'new_subvol_location')
srcpath = os.path.join(self.mount_a.mountpoint, subvolpath)
# the script below runs on the client mount; the rename must fail with EXDEV,
# which is the error the marked subvolume boundary is expected to produce
rename_script = dedent("""
import os
import errno
try:
os.rename("{src}", "{dst}")
except OSError as e:
if e.errno != errno.EXDEV:
raise RuntimeError("invalid error code on renaming subvolume incarnation out of subvolume directory")
else:
raise RuntimeError("expected renaming subvolume incarnation out of subvolume directory to fail")
""")
self.mount_a.run_python(rename_script.format(src=srcpath, dst=dstpath))
# remove subvolume
self._fs_cmd("subvolume", "rm", self.volname, subvolume)
# verify trash dir is clean
self._wait_for_trash_empty()
def test_volume_rm_arbitrary_pool_removal(self):
"""
That the arbitrary pool added to the volume out of band is removed
@ -526,6 +572,12 @@ class TestVolumes(CephFSTestCase):
size = int(self.mount_a.getfattr(subvolpath, "ceph.quota.max_bytes"))
self.assertEqual(size, nsize)
# remove subvolume
self._fs_cmd("subvolume", "rm", self.volname, subvolname)
# verify trash dir is clean
self._wait_for_trash_empty()
def test_subvolume_shrink(self):
"""
That a subvolume can be shrinked in size and its quota matches the expected size.
@ -548,6 +600,12 @@ class TestVolumes(CephFSTestCase):
size = int(self.mount_a.getfattr(subvolpath, "ceph.quota.max_bytes"))
self.assertEqual(size, nsize)
# remove subvolume
self._fs_cmd("subvolume", "rm", self.volname, subvolname)
# verify trash dir is clean
self._wait_for_trash_empty()
def test_subvolume_resize_fail_invalid_size(self):
"""
That a subvolume cannot be resized to an invalid size and the quota did not change
@ -567,15 +625,20 @@ class TestVolumes(CephFSTestCase):
try:
self._fs_cmd("subvolume", "resize", self.volname, subvolname, str(nsize))
except CommandFailedError as ce:
if ce.exitstatus != errno.EINVAL:
raise
self.assertEqual(ce.exitstatus, errno.EINVAL, "invalid error code on resize of subvolume with invalid size")
else:
raise RuntimeError("expected the 'fs subvolume resize' command to fail")
self.fail("expected the 'fs subvolume resize' command to fail")
# verify the quota did not change
size = int(self.mount_a.getfattr(subvolpath, "ceph.quota.max_bytes"))
self.assertEqual(size, osize)
# remove subvolume
self._fs_cmd("subvolume", "rm", self.volname, subvolname)
# verify trash dir is clean
self._wait_for_trash_empty()
def test_subvolume_resize_fail_zero_size(self):
"""
That a subvolume cannot be resized to a zero size and the quota did not change
@ -595,15 +658,20 @@ class TestVolumes(CephFSTestCase):
try:
self._fs_cmd("subvolume", "resize", self.volname, subvolname, str(nsize))
except CommandFailedError as ce:
if ce.exitstatus != errno.EINVAL:
raise
self.assertEqual(ce.exitstatus, errno.EINVAL, "invalid error code on resize of subvolume with invalid size")
else:
raise RuntimeError("expected the 'fs subvolume resize' command to fail")
self.fail("expected the 'fs subvolume resize' command to fail")
# verify the quota did not change
size = int(self.mount_a.getfattr(subvolpath, "ceph.quota.max_bytes"))
self.assertEqual(size, osize)
# remove subvolume
self._fs_cmd("subvolume", "rm", self.volname, subvolname)
# verify trash dir is clean
self._wait_for_trash_empty()
def test_subvolume_resize_quota_lt_used_size(self):
"""
That a subvolume can be resized to a size smaller than the current used size
@ -637,12 +705,18 @@ class TestVolumes(CephFSTestCase):
try:
self._fs_cmd("subvolume", "resize", self.volname, subvolname, str(nsize))
except CommandFailedError:
raise RuntimeError("expected the 'fs subvolume resize' command to succeed")
self.fail("expected the 'fs subvolume resize' command to succeed")
# verify the quota
size = int(self.mount_a.getfattr(subvolpath, "ceph.quota.max_bytes"))
self.assertEqual(size, nsize)
# remove subvolume
self._fs_cmd("subvolume", "rm", self.volname, subvolname)
# verify trash dir is clean
self._wait_for_trash_empty()
def test_subvolume_resize_fail_quota_lt_used_size_no_shrink(self):
"""
@ -677,15 +751,20 @@ class TestVolumes(CephFSTestCase):
try:
self._fs_cmd("subvolume", "resize", self.volname, subvolname, str(nsize), "--no_shrink")
except CommandFailedError as ce:
if ce.exitstatus != errno.EINVAL:
raise
self.assertEqual(ce.exitstatus, errno.EINVAL, "invalid error code on resize of subvolume with invalid size")
else:
raise RuntimeError("expected the 'fs subvolume resize' command to fail")
self.fail("expected the 'fs subvolume resize' command to fail")
# verify the quota did not change
size = int(self.mount_a.getfattr(subvolpath, "ceph.quota.max_bytes"))
self.assertEqual(size, osize)
# remove subvolume
self._fs_cmd("subvolume", "rm", self.volname, subvolname)
# verify trash dir is clean
self._wait_for_trash_empty()
def test_subvolume_resize_expand_on_full_subvolume(self):
"""
That the subvolume can be expanded from a full subvolume and future writes succeed.
@ -723,12 +802,18 @@ class TestVolumes(CephFSTestCase):
try:
self.mount_a.write_n_mb(os.path.join(subvolpath, filename), file_size)
except CommandFailedError:
raise RuntimeError("expected filling subvolume {0} with {1} file of size {2}MB"
self.fail("expected filling subvolume {0} with {1} file of size {2}MB"
"to succeed".format(subvolname, number_of_files, file_size))
else:
raise RuntimeError("expected filling subvolume {0} with {1} file of size {2}MB"
self.fail("expected filling subvolume {0} with {1} file of size {2}MB"
"to fail".format(subvolname, number_of_files, file_size))
# remove subvolume
self._fs_cmd("subvolume", "rm", self.volname, subvolname)
# verify trash dir is clean
self._wait_for_trash_empty()
def test_subvolume_create_idempotence(self):
# create subvolume
subvolume = self._generate_random_subvolume_name()
@ -774,6 +859,12 @@ class TestVolumes(CephFSTestCase):
self._get_subtrees(status=status, rank=1)
self._wait_subtrees([(path, 1)], status=status)
# remove subvolume
self._fs_cmd("subvolume", "rm", self.volname, subvolume)
# verify trash dir is clean
self._wait_for_trash_empty()
def test_subvolumegroup_pin_distributed(self):
self.fs.set_max_mds(2)
status = self.fs.wait_for_daemons()
@ -789,6 +880,13 @@ class TestVolumes(CephFSTestCase):
self._fs_cmd("subvolume", "create", self.volname, subvolume, "--group_name", group)
self._wait_distributed_subtrees(10, status=status)
# remove subvolumes
for subvolume in subvolumes:
self._fs_cmd("subvolume", "rm", self.volname, subvolume, group)
# verify trash dir is clean
self._wait_for_trash_empty()
def test_subvolume_pin_random(self):
self.fs.set_max_mds(2)
self.fs.wait_for_daemons()
@ -799,6 +897,12 @@ class TestVolumes(CephFSTestCase):
self._fs_cmd("subvolume", "pin", self.volname, subvolume, "random", ".01")
# no verification
# remove subvolume
self._fs_cmd("subvolume", "rm", self.volname, subvolume)
# verify trash dir is clean
self._wait_for_trash_empty()
def test_subvolume_create_isolated_namespace(self):
"""
Create subvolume in separate rados namespace
@ -826,10 +930,12 @@ class TestVolumes(CephFSTestCase):
try:
self._fs_cmd("subvolume", "create", self.volname, subvolume, "--pool_layout", data_pool)
except CommandFailedError as ce:
if ce.exitstatus != errno.EINVAL:
raise
self.assertEqual(ce.exitstatus, errno.EINVAL, "invalid error code on create of subvolume with invalid pool layout")
else:
raise RuntimeError("expected the 'fs subvolume create' command to fail")
self.fail("expected the 'fs subvolume create' command to fail")
# verify trash dir is clean
self._wait_for_trash_empty()
def test_subvolume_rm_force(self):
# test removing non-existing subvolume with --force
@ -837,7 +943,7 @@ class TestVolumes(CephFSTestCase):
try:
self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--force")
except CommandFailedError:
raise RuntimeError("expected the 'fs subvolume rm --force' command to succeed")
self.fail("expected the 'fs subvolume rm --force' command to succeed")
def test_subvolume_create_with_auto_cleanup_on_fail(self):
subvolume = self._generate_random_subvolume_name()
@ -850,10 +956,12 @@ class TestVolumes(CephFSTestCase):
try:
self._fs_cmd("subvolume", "getpath", self.volname, subvolume)
except CommandFailedError as ce:
if ce.exitstatus != errno.ENOENT:
raise
self.assertEqual(ce.exitstatus, errno.ENOENT, "invalid error code on getpath of non-existent subvolume")
else:
raise RuntimeError("expected the 'fs subvolume getpath' command to fail")
self.fail("expected the 'fs subvolume getpath' command to fail")
# verify trash dir is clean
self._wait_for_trash_empty()
def test_subvolume_create_with_invalid_size(self):
# create subvolume with an invalid size -1
@ -861,10 +969,12 @@ class TestVolumes(CephFSTestCase):
try:
self._fs_cmd("subvolume", "create", self.volname, subvolume, "--size", "-1")
except CommandFailedError as ce:
if ce.exitstatus != errno.EINVAL:
raise
self.assertEqual(ce.exitstatus, errno.EINVAL, "invalid error code on create of subvolume with invalid size")
else:
raise RuntimeError("expected the 'fs subvolume create' command to fail")
self.fail("expected the 'fs subvolume create' command to fail")
# verify trash dir is clean
self._wait_for_trash_empty()
def test_nonexistent_subvolume_rm(self):
# remove non-existing subvolume
@ -909,6 +1019,9 @@ class TestVolumes(CephFSTestCase):
# remove subvolume
self._fs_cmd("subvolume", "rm", self.volname, subvolume)
# verify trash dir is clean
self._wait_for_trash_empty()
def test_subvolume_ls(self):
# tests the 'fs subvolume ls' command
@ -922,11 +1035,18 @@ class TestVolumes(CephFSTestCase):
# list subvolumes
subvolumels = json.loads(self._fs_cmd('subvolume', 'ls', self.volname))
if len(subvolumels) == 0:
raise RuntimeError("Expected the 'fs subvolume ls' command to list the created subvolumes.")
self.fail("Expected the 'fs subvolume ls' command to list the created subvolumes.")
else:
subvolnames = [subvolume['name'] for subvolume in subvolumels]
if collections.Counter(subvolnames) != collections.Counter(subvolumes):
raise RuntimeError("Error creating or listing subvolumes")
self.fail("Error creating or listing subvolumes")
# remove subvolume
for subvolume in subvolumes:
self._fs_cmd("subvolume", "rm", self.volname, subvolume)
# verify trash dir is clean
self._wait_for_trash_empty()
def test_subvolume_ls_for_notexistent_default_group(self):
# tests the 'fs subvolume ls' command when the default group '_nogroup' doesn't exist
@ -959,6 +1079,12 @@ class TestVolumes(CephFSTestCase):
size = self.mount_a.getfattr(subvolpath, "ceph.quota.max_bytes")
self.assertEqual(size, None)
# remove subvolume
self._fs_cmd("subvolume", "rm", self.volname, subvolname)
# verify trash dir is clean
self._wait_for_trash_empty()
def test_subvolume_resize_infinite_size_future_writes(self):
"""
That a subvolume can be resized to an infinite size and the future writes succeed.
@ -991,9 +1117,15 @@ class TestVolumes(CephFSTestCase):
try:
self.mount_a.write_n_mb(os.path.join(subvolpath, filename), file_size)
except CommandFailedError:
raise RuntimeError("expected filling subvolume {0} with {1} file of size {2}MB "
self.fail("expected filling subvolume {0} with {1} file of size {2}MB "
"to succeed".format(subvolname, number_of_files, file_size))
# remove subvolume
self._fs_cmd("subvolume", "rm", self.volname, subvolname)
# verify trash dir is clean
self._wait_for_trash_empty()
def test_subvolume_info(self):
# tests the 'fs subvolume info' command
@ -1203,6 +1335,9 @@ class TestVolumes(CephFSTestCase):
self._fs_cmd("subvolume", "rm", self.volname, subvol1, group)
self._fs_cmd("subvolumegroup", "rm", self.volname, group)
# verify trash dir is clean
self._wait_for_trash_empty()
def test_subvolume_group_create_with_desired_mode(self):
group1, group2 = self._generate_random_group_name(2)
# default mode
@ -1286,6 +1421,9 @@ class TestVolumes(CephFSTestCase):
self._fs_cmd("subvolume", "rm", self.volname, subvol3, group)
self._fs_cmd("subvolumegroup", "rm", self.volname, group)
# verify trash dir is clean
self._wait_for_trash_empty()
def test_subvolume_create_with_desired_uid_gid(self):
"""
That the subvolume can be created with the desired uid and gid and its uid and gid matches the
@ -1311,6 +1449,9 @@ class TestVolumes(CephFSTestCase):
# remove subvolume
self._fs_cmd("subvolume", "rm", self.volname, subvolname)
# verify trash dir is clean
self._wait_for_trash_empty()
def test_nonexistent_subvolume_group_rm(self):
group = "non_existent_group"
@ -1533,12 +1674,41 @@ class TestVolumes(CephFSTestCase):
subvolsnapshotls = json.loads(self._fs_cmd('subvolume', 'snapshot', 'ls', self.volname, subvolume))
if len(subvolsnapshotls) == 0:
raise RuntimeError("Expected the 'fs subvolume snapshot ls' command to list the created subvolume snapshots")
self.fail("Expected the 'fs subvolume snapshot ls' command to list the created subvolume snapshots")
else:
snapshotnames = [snapshot['name'] for snapshot in subvolsnapshotls]
if collections.Counter(snapshotnames) != collections.Counter(snapshots):
raise RuntimeError("Error creating or listing subvolume snapshots")
self.fail("Error creating or listing subvolume snapshots")
# remove snapshot
for snapshot in snapshots:
self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
# remove subvolume
self._fs_cmd("subvolume", "rm", self.volname, subvolume)
# verify trash dir is clean
self._wait_for_trash_empty()
def test_subvolume_group_snapshot_unsupported_status(self):
"""
ensure subvolume group snapshot create fails with ENOSYS, since the
group snapshot feature is no longer supported
"""
group = self._generate_random_group_name()
snapshot = self._generate_random_snapshot_name()
# create group
self._fs_cmd("subvolumegroup", "create", self.volname, group)
# snapshot group
try:
self._fs_cmd("subvolumegroup", "snapshot", "create", self.volname, group, snapshot)
except CommandFailedError as ce:
self.assertEqual(ce.exitstatus, errno.ENOSYS, "invalid error code on subvolumegroup snapshot create")
else:
self.fail("expected subvolumegroup snapshot create command to fail")
# remove group
self._fs_cmd("subvolumegroup", "rm", self.volname, group)
@unittest.skip("skipping subvolumegroup snapshot tests")
def test_subvolume_group_snapshot_create_and_rm(self):
subvolume = self._generate_random_subvolume_name()
group = self._generate_random_group_name()
@ -1565,6 +1735,7 @@ class TestVolumes(CephFSTestCase):
# remove group
self._fs_cmd("subvolumegroup", "rm", self.volname, group)
@unittest.skip("skipping subvolumegroup snapshot tests")
def test_subvolume_group_snapshot_idempotence(self):
subvolume = self._generate_random_subvolume_name()
group = self._generate_random_group_name()
@ -1594,6 +1765,7 @@ class TestVolumes(CephFSTestCase):
# remove group
self._fs_cmd("subvolumegroup", "rm", self.volname, group)
@unittest.skip("skipping subvolumegroup snapshot tests")
def test_nonexistent_subvolume_group_snapshot_rm(self):
subvolume = self._generate_random_subvolume_name()
group = self._generate_random_group_name()
@ -1629,6 +1801,7 @@ class TestVolumes(CephFSTestCase):
# remove group
self._fs_cmd("subvolumegroup", "rm", self.volname, group)
@unittest.skip("skipping subvolumegroup snapshot tests")
def test_subvolume_group_snapshot_rm_force(self):
# test removing non-existing subvolume group snapshot with --force
group = self._generate_random_group_name()
@ -1639,6 +1812,7 @@ class TestVolumes(CephFSTestCase):
except CommandFailedError:
raise RuntimeError("expected the 'fs subvolumegroup snapshot rm --force' command to succeed")
@unittest.skip("skipping subvolumegroup snapshot tests")
def test_subvolume_group_snapshot_ls(self):
# tests the 'fs subvolumegroup snapshot ls' command
@ -2091,6 +2265,149 @@ class TestVolumes(CephFSTestCase):
# verify trash dir is clean
self._wait_for_trash_empty()
def test_subvolume_retain_snapshot_invalid_recreate(self):
    """
    ensure retained subvolume recreate does not leave any incarnations in the subvolume and trash
    """
    subvolume = self._generate_random_subvolume_name()
    snapshot = self._generate_random_snapshot_name()

    # create subvolume
    self._fs_cmd("subvolume", "create", self.volname, subvolume)

    # snapshot subvolume
    self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)

    # remove with snapshot retention
    self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--retain-snapshots")

    # recreate subvolume with an invalid pool; the failed create must clean up
    # after itself and leave the subvolume in the snapshot-retained state
    data_pool = "invalid_pool"
    try:
        self._fs_cmd("subvolume", "create", self.volname, subvolume, "--pool_layout", data_pool)
    except CommandFailedError as ce:
        self.assertEqual(ce.exitstatus, errno.EINVAL, "invalid error code on recreate of subvolume with invalid poolname")
    else:
        self.fail("expected recreate of subvolume with invalid poolname to fail")

    # fetch info
    subvol_info = json.loads(self._fs_cmd("subvolume", "info", self.volname, subvolume))
    # fix: the message format string previously had an unbalanced quote ("...found '{0}")
    self.assertEqual(subvol_info["state"], "snapshot-retained",
                     msg="expected state to be 'snapshot-retained', found '{0}'".format(subvol_info["state"]))

    # getpath must fail: a retained subvolume has no incarnation directory
    try:
        self._fs_cmd("subvolume", "getpath", self.volname, subvolume)
    except CommandFailedError as ce:
        self.assertEqual(ce.exitstatus, errno.ENOENT, "invalid error code on getpath of subvolume with retained snapshots")
    else:
        self.fail("expected getpath of subvolume with retained snapshots to fail")

    # remove snapshot (should remove volume)
    self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)

    # verify trash dir is clean
    self._wait_for_trash_empty()
def test_subvolume_retain_snapshot_trash_busy_recreate(self):
"""
ensure retained subvolume recreate fails if its trash is not yet purged
"""
subvolume = self._generate_random_subvolume_name()
snapshot = self._generate_random_snapshot_name()
# create subvolume
self._fs_cmd("subvolume", "create", self.volname, subvolume)
# snapshot subvolume
self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
# remove with snapshot retention
self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--retain-snapshots")
# fake a trash entry, simulating a purge that is still pending
self._update_fake_trash(subvolume)
# recreate subvolume; expected to fail with EAGAIN while the trash is non-empty
try:
self._fs_cmd("subvolume", "create", self.volname, subvolume)
except CommandFailedError as ce:
self.assertEqual(ce.exitstatus, errno.EAGAIN, "invalid error code on recreate of subvolume with purge pending")
else:
self.fail("expected recreate of subvolume with purge pending to fail")
# clear fake trash entry
self._update_fake_trash(subvolume, create=False)
# recreate subvolume; succeeds now that the trash is empty
self._fs_cmd("subvolume", "create", self.volname, subvolume)
# remove snapshot
self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
# remove subvolume
self._fs_cmd("subvolume", "rm", self.volname, subvolume)
# verify trash dir is clean
self._wait_for_trash_empty()
def test_subvolume_retain_snapshot_trash_busy_recreate_clone(self):
"""
ensure retained clone recreate fails if its trash is not yet purged
"""
subvolume = self._generate_random_subvolume_name()
snapshot = self._generate_random_snapshot_name()
clone = self._generate_random_clone_name()
# create subvolume
self._fs_cmd("subvolume", "create", self.volname, subvolume)
# snapshot subvolume
self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
# clone subvolume snapshot
self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
# check clone status
self._wait_for_clone_to_complete(clone)
# snapshot clone
self._fs_cmd("subvolume", "snapshot", "create", self.volname, clone, snapshot)
# remove clone with snapshot retention
self._fs_cmd("subvolume", "rm", self.volname, clone, "--retain-snapshots")
# fake a trash entry, simulating a purge that is still pending on the clone
self._update_fake_trash(clone)
# clone subvolume snapshot (recreate); expected to fail with EAGAIN while trash is non-empty
try:
self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
except CommandFailedError as ce:
self.assertEqual(ce.exitstatus, errno.EAGAIN, "invalid error code on recreate of clone with purge pending")
else:
self.fail("expected recreate of clone with purge pending to fail")
# clear fake trash entry
self._update_fake_trash(clone, create=False)
# recreate subvolume; succeeds now that the trash is empty
self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
# check clone status
self._wait_for_clone_to_complete(clone)
# remove snapshot
self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
self._fs_cmd("subvolume", "snapshot", "rm", self.volname, clone, snapshot)
# remove subvolume
self._fs_cmd("subvolume", "rm", self.volname, subvolume)
self._fs_cmd("subvolume", "rm", self.volname, clone)
# verify trash dir is clean
self._wait_for_trash_empty()
def test_subvolume_retain_snapshot_recreate_subvolume(self):
"""
ensure a retained subvolume can be recreated and further snapshotted

View File

@ -15,3 +15,12 @@ def resolve(vol_spec, path):
groupname = None if parts[2] == Group.NO_GROUP_NAME else parts[2]
subvolname = parts[3]
return (groupname, subvolname)
def resolve_trash(vol_spec, path):
    """Resolve a per-subvolume trash path to (groupname, subvolname).

    Expects a path of the form <prefix>/<group>/<subvol>/.trash/<entry>
    (six components after splitting). Returns None when the path does
    not match that shape; groupname is None for the no-group namespace.
    """
    parts = splitall(path)
    if len(parts) != 6:
        return None
    if os.path.join(parts[0], parts[1]) != vol_spec.subvolume_prefix:
        return None
    if parts[4] != '.trash':
        return None
    groupname = parts[2]
    if groupname == Group.NO_GROUP_NAME:
        groupname = None
    return (groupname, parts[3])

View File

@ -36,7 +36,7 @@ class Trash(GroupTemplate):
with self.fs.opendir(self.path) as d:
entry = self.fs.readdir(d)
while entry:
if entry.d_name not in exclude_list and entry.is_dir():
if entry.d_name not in exclude_list:
return entry.d_name
entry = self.fs.readdir(d)
return None
@ -52,7 +52,7 @@ class Trash(GroupTemplate):
"""
return self._get_single_dir_entry(exclude_list)
def purge(self, trash_entry, should_cancel):
def purge(self, trashpath, should_cancel):
"""
purge a trash entry.
@ -82,7 +82,6 @@ class Trash(GroupTemplate):
if not should_cancel():
self.fs.rmdir(root_path)
trashpath = os.path.join(self.path, trash_entry)
# catch any unlink errors
try:
rmtree(trashpath)
@ -101,6 +100,20 @@ class Trash(GroupTemplate):
except cephfs.Error as e:
raise VolumeException(-e.args[0], e.args[1])
def link(self, path, bname):
    """Create a symlink named *bname* in the trashcan pointing at *path*."""
    linkpath = os.path.join(self.path, bname)
    try:
        self.fs.symlink(path, linkpath)
    except cephfs.Error as e:
        raise VolumeException(-e.args[0], e.args[1])
def delink(self, bname):
    """Remove the trashcan symlink named *bname*."""
    linkpath = os.path.join(self.path, bname)
    try:
        self.fs.unlink(linkpath)
    except cephfs.Error as e:
        raise VolumeException(-e.args[0], e.args[1])
def create_trashcan(fs, vol_spec):
"""
create a trash can.

View File

@ -114,6 +114,11 @@ class SubvolumeBase(object):
def subvol_type(self):
return SubvolumeTypes.from_value(self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_TYPE))
@property
def purgeable(self):
""" Boolean declaring if subvolume can be purged """
# abstract: versioned subvolume classes (e.g. SubvolumeV2) override this
raise NotImplementedError
def load_config(self):
if self.legacy_mode:
self.metadata_mgr = MetadataManager(self.fs, self.legacy_config_path, 0o640)
@ -281,6 +286,12 @@ class SubvolumeBase(object):
trashcan.dump(path)
log.info("subvolume path '{0}' moved to trashcan".format(path))
def _link_dir(self, path, bname):
"""Create a trashcan symlink *bname* pointing at *path*."""
# ensure the volume-level trashcan exists before linking into it
create_trashcan(self.fs, self.vol_spec)
with open_trashcan(self.fs, self.vol_spec) as trashcan:
trashcan.link(path, bname)
log.info("subvolume path '{0}' linked in trashcan bname {1}".format(path, bname))
def trash_base_dir(self):
if self.legacy_mode:
self.fs.unlink(self.legacy_config_path)

View File

@ -54,6 +54,17 @@ class SubvolumeV1(SubvolumeBase, SubvolumeTemplate):
def features(self):
return [SubvolumeFeatures.FEATURE_SNAPSHOT_CLONE.value, SubvolumeFeatures.FEATURE_SNAPSHOT_AUTOPROTECT.value]
def mark_subvolume(self):
    """
    Mark the subvolume root with the ceph.dir.subvolume vxattr.

    The subvolume root is where snapshots are taken, i.e. the <uuid>
    directory for v1 subvolumes.  The MDS treats setting the vxattr on
    an already marked directory as a noop, so this is idempotent.

    :raises VolumeException: EINVAL for an invalid vxattr value, or the
        mapped cephfs error code otherwise.
    """
    try:
        self.fs.setxattr(self.path, 'ceph.dir.subvolume', b'1', 0)
    except cephfs.InvalidValue:
        # fix: drop the unused exception binding (`as e`) on this branch
        raise VolumeException(-errno.EINVAL, "invalid value specified for ceph.dir.subvolume")
    except cephfs.Error as e:
        raise VolumeException(-e.args[0], e.args[1])
def snapshot_base_path(self):
""" Base path for all snapshots """
return os.path.join(self.path, self.vol_spec.snapshot_dir_prefix.encode('utf-8'))
@ -77,6 +88,7 @@ class SubvolumeV1(SubvolumeBase, SubvolumeTemplate):
try:
# create directory and set attributes
self.fs.mkdirs(subvol_path, mode)
self.mark_subvolume()
attrs = {
'uid': uid,
'gid': gid,
@ -138,6 +150,7 @@ class SubvolumeV1(SubvolumeBase, SubvolumeTemplate):
# create directory and set attributes
self.fs.mkdirs(subvol_path, attrs.get("mode"))
self.mark_subvolume()
self.set_attrs(subvol_path, attrs)
# persist subvolume metadata and clone source
@ -200,6 +213,8 @@ class SubvolumeV1(SubvolumeBase, SubvolumeTemplate):
subvol_path = self.path
log.debug("refreshed metadata, checking subvolume path '{0}'".format(subvol_path))
st = self.fs.stat(subvol_path)
# unconditionally mark as subvolume, to handle pre-existing subvolumes without the mark
self.mark_subvolume()
self.uid = int(st.st_uid)
self.gid = int(st.st_gid)

View File

@ -12,6 +12,7 @@ from .op_sm import SubvolumeOpSm
from .subvolume_v1 import SubvolumeV1
from ..template import SubvolumeTemplate
from ...exception import OpSmException, VolumeException, MetadataMgrException
from ...fs_util import listdir
from ..template import SubvolumeOpType
log = logging.getLogger(__name__)
@ -48,6 +49,61 @@ class SubvolumeV2(SubvolumeV1):
SubvolumeFeatures.FEATURE_SNAPSHOT_AUTOPROTECT.value,
SubvolumeFeatures.FEATURE_SNAPSHOT_RETENTION.value]
@property
def retained(self):
    """True when the subvolume exists only to retain snapshots (state RETAINED)."""
    try:
        self.metadata_mgr.refresh()
        return self.state == SubvolumeStates.STATE_RETAINED
    except MetadataMgrException as me:
        # missing metadata means the subvolume does not exist yet -- not retained
        if me.errno != -errno.ENOENT:
            raise VolumeException(me.errno, "internal error while processing subvolume '{0}'".format(self.subvolname))
        return False
@property
def purgeable(self):
    """Whether the subvolume can be purged: retained, with no snapshots and no pending purges."""
    return self.retained and not self.list_snapshots() and not self.has_pending_purges
@property
def has_pending_purges(self):
    """True when the per-subvolume trash directory contains entries awaiting purge.

    A missing trash directory (ENOENT) means nothing is pending.
    """
    try:
        # fix: replace the unidiomatic `not listdir(...) == []` comparison
        return len(listdir(self.fs, self.trash_dir)) > 0
    except VolumeException as ve:
        if ve.errno == -errno.ENOENT:
            return False
        raise
@property
def trash_dir(self):
# per-subvolume trash lives directly under the subvolume base path
return os.path.join(self.base_path, b".trash")
def create_trashcan(self):
    """Ensure the per-subvolume trash directory exists (created with mode 0o700)."""
    try:
        self.fs.stat(self.trash_dir)
    except cephfs.Error as e:
        # anything other than "not found" is a real error
        if e.args[0] != errno.ENOENT:
            raise VolumeException(-e.args[0], e.args[1])
        try:
            self.fs.mkdir(self.trash_dir, 0o700)
        except cephfs.Error as ce:
            raise VolumeException(-ce.args[0], ce.args[1])
def mark_subvolume(self):
    """
    Mark the subvolume root with the ceph.dir.subvolume vxattr.

    The subvolume root is where snapshots are taken, i.e. the base_path
    for v2 subvolumes.  The MDS treats setting the vxattr on an already
    marked directory as a noop, so this is idempotent.

    :raises VolumeException: EINVAL for an invalid vxattr value, or the
        mapped cephfs error code otherwise.
    """
    try:
        self.fs.setxattr(self.base_path, 'ceph.dir.subvolume', b'1', 0)
    except cephfs.InvalidValue:
        # fix: drop the unused exception binding (`as e`), consistent with v1
        raise VolumeException(-errno.EINVAL, "invalid value specified for ceph.dir.subvolume")
    except cephfs.Error as e:
        raise VolumeException(-e.args[0], e.args[1])
@staticmethod
def is_valid_uuid(uuid_str):
try:
@ -83,22 +139,13 @@ class SubvolumeV2(SubvolumeV1):
return os.path.join(snap_base_path, uuid_str)
def _is_retained(self):
try:
self.metadata_mgr.refresh()
if self.state == SubvolumeStates.STATE_RETAINED:
return True
else:
raise VolumeException(-errno.EINVAL, "invalid state for subvolume '{0}' during create".format(self.subvolname))
except MetadataMgrException as me:
if me.errno != -errno.ENOENT:
raise VolumeException(me.errno, "internal error while processing subvolume '{0}'".format(self.subvolname))
return False
def _remove_on_failure(self, subvol_path, retained):
if retained:
log.info("cleaning up subvolume incarnation with path: {0}".format(subvol_path))
self._trash_dir(subvol_path)
try:
self.fs.rmdir(subvol_path)
except cephfs.Error as e:
raise VolumeException(-e.args[0], e.args[1])
else:
log.info("cleaning up subvolume with path: {0}".format(self.subvolname))
self.remove()
@ -115,10 +162,13 @@ class SubvolumeV2(SubvolumeV1):
except OpSmException as oe:
raise VolumeException(-errno.EINVAL, "subvolume creation failed: internal error")
retained = self._is_retained()
retained = self.retained
if retained and self.has_pending_purges:
raise VolumeException(-errno.EAGAIN, "asynchronous purge of subvolume in progress")
subvol_path = os.path.join(self.base_path, str(uuid.uuid4()).encode('utf-8'))
try:
self.fs.mkdirs(subvol_path, mode)
self.mark_subvolume()
attrs = {
'uid': uid,
'gid': gid,
@ -155,7 +205,9 @@ class SubvolumeV2(SubvolumeV1):
except OpSmException as oe:
raise VolumeException(-errno.EINVAL, "clone failed: internal error")
retained = self._is_retained()
retained = self.retained
if retained and self.has_pending_purges:
raise VolumeException(-errno.EAGAIN, "asynchronous purge of subvolume in progress")
subvol_path = os.path.join(self.base_path, str(uuid.uuid4()).encode('utf-8'))
try:
# source snapshot attrs are used to create clone subvolume
@ -169,6 +221,7 @@ class SubvolumeV2(SubvolumeV1):
# create directory and set attributes
self.fs.mkdirs(subvol_path, attrs.get("mode"))
self.mark_subvolume()
self.set_attrs(subvol_path, attrs)
# persist subvolume metadata and clone source
@ -234,6 +287,8 @@ class SubvolumeV2(SubvolumeV1):
op_type.value, self.subvolname))
try:
self.metadata_mgr.refresh()
# unconditionally mark as subvolume, to handle pre-existing subvolumes without the mark
self.mark_subvolume()
etype = self.subvol_type
if op_type not in self.allowed_ops_by_type(etype):
@ -268,19 +323,30 @@ class SubvolumeV2(SubvolumeV1):
raise VolumeException(-e.args[0], e.args[1])
def trash_incarnation_dir(self):
    """rename subvolume (uuid component) to trash

    Moves the current incarnation directory into the per-subvolume trash
    (creating the trashcan on demand) and links it from the volume-level
    trash so the async purge job can find and garbage-collect it.
    Raises VolumeException on any libcephfs failure.
    """
    # NOTE: the stale leading call `self._trash_dir(self.path)` left over
    # from the superseded implementation has been dropped -- trashing the
    # path before renaming it would make the rename below fail, and it also
    # demoted the docstring to a dead expression.
    self.create_trashcan()
    try:
        bname = os.path.basename(self.path)
        tpath = os.path.join(self.trash_dir, bname)
        log.debug("trash: {0} -> {1}".format(self.path, tpath))
        self.fs.rename(self.path, tpath)
        # expose the trashed incarnation to the volume purge machinery
        self._link_dir(tpath, bname)
    except cephfs.Error as e:
        # map libcephfs errors to the module's VolumeException convention
        raise VolumeException(-e.args[0], e.args[1])
def remove(self, retainsnaps=False):
    """Remove the subvolume, optionally retaining its snapshots.

    - snapshots exist, retainsnaps=False: refuse with ENOTEMPTY.
    - snapshots exist, retainsnaps=True: move the current incarnation to
      the subvolume trash and mark the subvolume state as RETAINED.
    - no snapshots: drop the whole base directory, unless an asynchronous
      purge is still pending (the purge job then owns the cleanup), in
      which case fall through and mark the subvolume retained so the purge
      job can garbage-collect it.

    NOTE: the duplicated pre-merge variant of this logic (which trashed the
    incarnation before consulting has_pending_purges) has been removed --
    only the purge-aware version is kept.
    """
    if self.list_snapshots():
        if not retainsnaps:
            raise VolumeException(-errno.ENOTEMPTY, "subvolume '{0}' has snapshots".format(self.subvolname))
    else:
        if not self.has_pending_purges:
            self.trash_base_dir()
            return
    if self.state != SubvolumeStates.STATE_RETAINED:
        # retire the live incarnation and persist the retained state;
        # the path is cleared since there is no active incarnation anymore
        self.trash_incarnation_dir()
        self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_PATH, "")
        self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_STATE, SubvolumeStates.STATE_RETAINED.value)
        self.metadata_mgr.flush()
def info(self):
if self.state != SubvolumeStates.STATE_RETAINED:
@ -290,7 +356,8 @@ class SubvolumeV2(SubvolumeV1):
def remove_snapshot(self, snapname):
    """Remove a snapshot; garbage-collect a retained subvolume when its
    last snapshot goes away.

    After delegating the actual snapshot removal to the v1 implementation,
    check whether the subvolume has become purgeable (retained, no
    remaining snapshots, no pending purge jobs -- as encoded by the
    `purgeable` property). If so, trash the base directory and raise
    ESTALE to tickle the volume purge job into collecting it.

    NOTE: the stale pre-merge guard
    `if self.state == SubvolumeStates.STATE_RETAINED and not self.list_snapshots():`
    that was stacked above the new `purgeable` check has been removed --
    keeping both double-nested the condition and bypassed the pending-purge
    test that `purgeable` performs.
    """
    super(SubvolumeV2, self).remove_snapshot(snapname)
    if self.purgeable:
        self.trash_base_dir()
        # tickle the volume purge job to purge this entry, using ESTALE
        raise VolumeException(-errno.ESTALE, "subvolume '{0}' has been removed as the last retained snapshot is removed".format(self.subvolname))
    # if not purgeable, subvol is not retained, or has snapshots, or already has purge jobs that will garbage collect this subvol

View File

@ -1,8 +1,16 @@
import errno
import logging
import os
import stat
import cephfs
from .async_job import AsyncJobs
from .exception import VolumeException
from .operations.resolver import resolve_trash
from .operations.template import SubvolumeOpType
from .operations.group import open_group
from .operations.subvolume import open_subvol
from .operations.volume import open_volume, open_volume_lockless
from .operations.trash import open_trashcan
@ -26,15 +34,58 @@ def get_trash_entry_for_volume(volume_client, volname, running_jobs):
log.error("error fetching trash entry for volume '{0}' ({1})".format(volname, ve))
return ve.errno, None
def subvolume_purge(volume_client, volname, trashcan, subvolume_trash_entry, should_cancel):
    """Purge a retained subvolume's base directory once its trash is drained.

    Resolves the trash entry back to its (group, subvolume) pair, opens the
    subvolume, and -- only if it reports itself purgeable -- purges its base
    path via the trashcan. A vanished subvolume (ENOENT) is silently
    ignored; any other VolumeException propagates to the caller.
    """
    groupname, subvolname = resolve_trash(volume_client.volspec, subvolume_trash_entry.decode('utf-8'))
    log.debug("subvolume resolved to {0}/{1}".format(groupname, subvolname))

    try:
        with open_volume(volume_client, volname) as fs_handle, \
             open_group(fs_handle, volume_client.volspec, groupname) as group, \
             open_subvol(fs_handle, volume_client.volspec, group, subvolname, SubvolumeOpType.REMOVE) as subvolume:
            log.debug("subvolume.path={0}, purgeable={1}".format(subvolume.path, subvolume.purgeable))
            if not subvolume.purgeable:
                return
            # this is fine under the global lock -- there are just a handful
            # of entries in the subvolume to purge. moreover, the purge needs
            # to be guarded since a create request might sneak in.
            trashcan.purge(subvolume.base_path, should_cancel)
    except VolumeException as ve:
        # someone else already removed the subvolume -- nothing left to do
        if ve.errno != -errno.ENOENT:
            raise
# helper for starting a purge operation on a trash entry
def purge_trash_entry_for_volume(volume_client, volname, purge_entry, should_cancel):
    """Purge one trash entry of a volume; return 0 or a negative errno.

    A trash entry is either a plain directory (a trashed subvolume tree) or
    a symlink pointing at a retained subvolume's private trash directory.
    For the symlink case the link target is purged first, then the owning
    subvolume is given a chance to garbage-collect itself, and finally the
    link itself is removed. libcephfs errors are logged and swallowed (the
    purge job retries later); VolumeExceptions map to the return code.

    NOTE: the duplicated pre-merge variant (parameter `purge_dir` and the
    unconditional `trashcan.purge(purge_dir, ...)`) has been removed --
    only the symlink-aware implementation is kept.
    """
    log.debug("purging trash entry '{0}' for volume '{1}'".format(purge_entry, volname))

    ret = 0
    try:
        with open_volume_lockless(volume_client, volname) as fs_handle:
            with open_trashcan(fs_handle, volume_client.volspec) as trashcan:
                try:
                    pth = os.path.join(trashcan.path, purge_entry)
                    stx = fs_handle.statx(pth, cephfs.CEPH_STATX_MODE | cephfs.CEPH_STATX_SIZE,
                                          cephfs.AT_SYMLINK_NOFOLLOW)
                    if stat.S_ISLNK(stx['mode']):
                        # entry links to a subvolume's internal trash dir;
                        # readlink returns a fixed-size buffer, so trim to
                        # the actual target length reported by statx
                        tgt = fs_handle.readlink(pth, 4096)
                        tgt = tgt[:stx['size']]
                        log.debug("purging entry pointing to subvolume trash: {0}".format(tgt))
                        delink = True
                        try:
                            trashcan.purge(tgt, should_cancel)
                        except VolumeException as ve:
                            if not ve.errno == -errno.ENOENT:
                                # purge failed for a real reason: keep the
                                # link so a later pass can retry
                                delink = False
                                return ve.errno
                        finally:
                            if delink:
                                # let the subvolume GC its base dir, then
                                # drop the (now stale) trash link
                                subvolume_purge(volume_client, volname, trashcan, tgt, should_cancel)
                                log.debug("purging trash link: {0}".format(purge_entry))
                                trashcan.delink(purge_entry)
                    else:
                        log.debug("purging entry pointing to trash: {0}".format(pth))
                        trashcan.purge(pth, should_cancel)
                except cephfs.Error as e:
                    log.warn("failed to remove trash entry: {0}".format(e))
    except VolumeException as ve:
        ret = ve.errno
    return ret

View File

@ -569,15 +569,18 @@ class VolumeClient(CephfsClient):
### group snapshot
def create_subvolume_group_snapshot(self, **kwargs):
    """Handle 'fs subvolumegroup snapshot create' -- deliberately disabled.

    Group-level snapshots are no longer supported: subvolumes are marked
    with the vxattr ceph.dir.subvolume, which denies snapshots above them
    (see: https://tracker.ceph.com/issues/46074). The group is still opened
    so callers get accurate errors for a missing volume/group; otherwise
    ENOSYS is returned.

    NOTE: the pre-merge working variant (`ret = 0, "", ""`, reading
    `snap_name`, and calling `group.create_snapshot`) has been removed --
    only the disabled version is kept, as both were left in by the merge.
    """
    ret = -errno.ENOSYS, "", "subvolume group snapshots are not supported"
    volname = kwargs['vol_name']
    groupname = kwargs['group_name']
    # snapname = kwargs['snap_name']
    try:
        with open_volume(self, volname) as fs_handle:
            with open_group(fs_handle, self.volspec, groupname) as group:
                # as subvolumes are marked with the vxattr ceph.dir.subvolume deny snapshots
                # at the subvolume group (see: https://tracker.ceph.com/issues/46074)
                # group.create_snapshot(snapname)
                pass
    except VolumeException as ve:
        ret = self.volume_exception_to_retval(ve)
    return ret