Mirror of https://github.com/ceph/ceph, synced 2025-01-03 01:22:53 +00:00
Merge PR #52670 into main
* refs/pull/52670/head:
  doc: add the reject the clone when threads are not available feature in the document
  qa: add test cases for the support to reject clones feature
  mgr/volumes: support to reject CephFS clones if cloner threads are not available

Reviewed-by: Kotresh Hiremath Ravishankar <khiremat@redhat.com>
Reviewed-by: Venky Shankar <vshankar@redhat.com>
commit 435306d9b2
@ -140,6 +140,15 @@ CephFS: Disallow delegating preallocated inode ranges to clients. Config
  isn't scalable. So we have removed the 'network_ping_times' section from
  the output. Details in the tracker: https://tracker.ceph.com/issues/57460

* CephFS: The `subvolume snapshot clone` command now depends on the config option
  `snapshot_clone_no_wait`, which is used to reject the clone operation when all
  the cloner threads are busy. This config option is enabled by default, which means
  that if no cloner threads are free, the clone request errors out with EAGAIN.
  The value of the config option can be fetched with:
  `ceph config get mgr mgr/volumes/snapshot_clone_no_wait`
  and it can be disabled with:
  `ceph config set mgr mgr/volumes/snapshot_clone_no_wait false`

>=18.0.0

* The RGW policy parser now rejects unknown principals by default. If you are
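As an illustration only (this helper is not part of Ceph or of the release notes), a client script can treat the EAGAIN result described above as a transient condition and retry; the function name, retry count, and delay below are arbitrary assumptions, while the EAGAIN exit code matches the behaviour the note documents.

import errno
import subprocess
import time


def clone_with_retry(vol, subvol, snap, target, attempts=5, delay=10):
    """Schedule a subvolume snapshot clone, retrying while cloner threads are busy."""
    cmd = ["ceph", "fs", "subvolume", "snapshot", "clone", vol, subvol, snap, target]
    for _ in range(attempts):
        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode == 0:
            return True                      # clone scheduled
        if result.returncode == errno.EAGAIN:
            time.sleep(delay)                # all cloner threads busy; back off and retry
            continue
        raise RuntimeError(result.stderr.strip() or "clone request failed")
    return False                             # still busy after all attempts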
@ -579,6 +579,8 @@ To initiate a clone operation use:

   ceph fs subvolume snapshot clone <vol_name> <subvol_name> <snap_name> <target_subvol_name>

.. note:: The ``subvolume snapshot clone`` command depends upon the above-mentioned config option ``snapshot_clone_no_wait``.

If a snapshot (source subvolume) is a part of a non-default group, the group name needs to be specified:

.. prompt:: bash #
@ -597,12 +599,6 @@ Similar to specifying a pool layout when creating a subvolume, pool layout can b

   ceph fs subvolume snapshot clone <vol_name> <subvol_name> <snap_name> <target_subvol_name> --pool_layout <pool_layout>

Configure the maximum number of concurrent clones. The default is 4:

.. prompt:: bash #

   ceph config set mgr mgr/volumes/max_concurrent_clones <value>

To check the status of a clone operation use:

.. prompt:: bash #
@ -728,6 +724,29 @@ On successful cancellation, the cloned subvolume is moved to the ``canceled`` st

.. note:: The canceled clone may be deleted by supplying the ``--force`` option to the `fs subvolume rm` command.

Configurables
~~~~~~~~~~~~~

Configure the maximum number of concurrent clone operations. The default is 4:

.. prompt:: bash #

   ceph config set mgr mgr/volumes/max_concurrent_clones <value>

Configure the ``snapshot_clone_no_wait`` option:

The ``snapshot_clone_no_wait`` config option is used to reject clone-creation requests when the cloner threads
(which can be configured using the above option, i.e. ``max_concurrent_clones``) are not available.
It is enabled by default (i.e. the value is set to ``True``) and can be changed using the following command:

.. prompt:: bash #

   ceph config set mgr mgr/volumes/snapshot_clone_no_wait <bool>

The current value of ``snapshot_clone_no_wait`` can be fetched using the following command:

.. prompt:: bash #

   ceph config get mgr mgr/volumes/snapshot_clone_no_wait


.. _subvol-pinning:
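The rejection behaviour described under Configurables reduces to a single check inside the volumes plugin (see the volumes.py hunk further down). The standalone Python sketch below restates that check for illustration only; ``pending_clones`` is an assumed stand-in for the count the plugin derives from its clone index.

def should_reject_clone(snapshot_clone_no_wait: bool,
                        pending_clones: int,
                        max_concurrent_clones: int) -> bool:
    """Return True when a new clone request should be rejected with EAGAIN.

    Mirrors the gating condition: with snapshot_clone_no_wait enabled, a clone
    request is rejected once the pending-clone count reaches max_concurrent_clones.
    """
    return snapshot_clone_no_wait and pending_clones >= max_concurrent_clones


# With 4 cloner threads and 4 clones already pending, a new request is rejected.
assert should_reject_clone(True, pending_clones=4, max_concurrent_clones=4)
# With the option disabled, the request is queued instead of being rejected.
assert not should_reject_clone(False, pending_clones=4, max_concurrent_clones=4)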
@ -7000,6 +7000,11 @@ class TestSubvolumeSnapshotClones(TestVolumesHelper):
        # snapshot subvolume
        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)

        # Disable the snapshot_clone_no_wait config option
        self.config_set('mgr', 'mgr/volumes/snapshot_clone_no_wait', False)
        threads_available = self.config_get('mgr', 'mgr/volumes/snapshot_clone_no_wait')
        self.assertEqual(threads_available, 'false')

        # schedule clones
        for clone in clones:
            self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
@ -7485,6 +7490,159 @@ class TestSubvolumeSnapshotClones(TestVolumesHelper):
        # verify trash dir is clean
        self._wait_for_trash_empty()

    def test_subvolume_snapshot_clone_with_no_wait_enabled(self):
        subvolume = self._gen_subvol_name()
        snapshot = self._gen_subvol_snap_name()
        clone1, clone2, clone3 = self._gen_subvol_clone_name(3)

        # create subvolume
        self._fs_cmd("subvolume", "create", self.volname, subvolume, "--mode=777")

        # do some IO
        self._do_subvolume_io(subvolume, number_of_files=10)

        # snapshot subvolume
        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)

        # Decrease number of cloner threads
        self.config_set('mgr', 'mgr/volumes/max_concurrent_clones', 2)
        max_concurrent_clones = int(self.config_get('mgr', 'mgr/volumes/max_concurrent_clones'))
        self.assertEqual(max_concurrent_clones, 2)

        # Enable the snapshot_clone_no_wait config option
        self.config_set('mgr', 'mgr/volumes/snapshot_clone_no_wait', True)
        threads_available = self.config_get('mgr', 'mgr/volumes/snapshot_clone_no_wait')
        self.assertEqual(threads_available, 'true')

        # Insert delay of 15 seconds at the beginning of the snapshot clone
        self.config_set('mgr', 'mgr/volumes/snapshot_clone_delay', 15)

        # schedule a clone1
        self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone1)

        # schedule a clone2
        self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone2)

        # schedule a clone3
        cmd_ret = self.run_ceph_cmd(
            args=["fs", "subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone3], check_status=False, stdout=StringIO(),
            stderr=StringIO())
        self.assertEqual(cmd_ret.returncode, errno.EAGAIN, "Expecting EAGAIN error")

        # check clone1 status
        self._wait_for_clone_to_complete(clone1)

        # verify clone1
        self._verify_clone(subvolume, snapshot, clone1)

        # check clone2 status
        self._wait_for_clone_to_complete(clone2)

        # verify clone2
        self._verify_clone(subvolume, snapshot, clone2)

        # schedule clone3, it should be successful this time
        self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone3)

        # check clone3 status
        self._wait_for_clone_to_complete(clone3)

        # verify clone3
        self._verify_clone(subvolume, snapshot, clone3)

        # set number of cloner threads to default
        self.config_set('mgr', 'mgr/volumes/max_concurrent_clones', 4)
        max_concurrent_clones = int(self.config_get('mgr', 'mgr/volumes/max_concurrent_clones'))
        self.assertEqual(max_concurrent_clones, 4)

        # set the snapshot_clone_delay to default
        self.config_set('mgr', 'mgr/volumes/snapshot_clone_delay', 0)

        # remove snapshot
        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)

        # remove subvolumes
        self._fs_cmd("subvolume", "rm", self.volname, subvolume)
        self._fs_cmd("subvolume", "rm", self.volname, clone1)
        self._fs_cmd("subvolume", "rm", self.volname, clone2)
        self._fs_cmd("subvolume", "rm", self.volname, clone3)

        # verify trash dir is clean
        self._wait_for_trash_empty()

    def test_subvolume_snapshot_clone_with_no_wait_not_enabled(self):
        subvolume = self._gen_subvol_name()
        snapshot = self._gen_subvol_snap_name()
        clone1, clone2, clone3 = self._gen_subvol_clone_name(3)

        # create subvolume
        self._fs_cmd("subvolume", "create", self.volname, subvolume, "--mode=777")

        # do some IO
        self._do_subvolume_io(subvolume, number_of_files=10)

        # snapshot subvolume
        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)

        # Disable the snapshot_clone_no_wait config option
        self.config_set('mgr', 'mgr/volumes/snapshot_clone_no_wait', False)
        threads_available = self.config_get('mgr', 'mgr/volumes/snapshot_clone_no_wait')
        self.assertEqual(threads_available, 'false')

        # Decrease number of cloner threads
        self.config_set('mgr', 'mgr/volumes/max_concurrent_clones', 2)
        max_concurrent_clones = int(self.config_get('mgr', 'mgr/volumes/max_concurrent_clones'))
        self.assertEqual(max_concurrent_clones, 2)

        # schedule a clone1
        self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone1)

        # schedule a clone2
        self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone2)

        # schedule a clone3
        self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone3)

        # check clone1 status
        self._wait_for_clone_to_complete(clone1)

        # verify clone1
        self._verify_clone(subvolume, snapshot, clone1)

        # check clone2 status
        self._wait_for_clone_to_complete(clone2)

        # verify clone2
        self._verify_clone(subvolume, snapshot, clone2)

        # check clone3 status
        self._wait_for_clone_to_complete(clone3)

        # verify clone3
        self._verify_clone(subvolume, snapshot, clone3)

        # set the snapshot_clone_no_wait config option to default
        self.config_set('mgr', 'mgr/volumes/snapshot_clone_no_wait', True)
        threads_available = self.config_get('mgr', 'mgr/volumes/snapshot_clone_no_wait')
        self.assertEqual(threads_available, 'true')

        # set number of cloner threads to default
        self.config_set('mgr', 'mgr/volumes/max_concurrent_clones', 4)
        max_concurrent_clones = int(self.config_get('mgr', 'mgr/volumes/max_concurrent_clones'))
        self.assertEqual(max_concurrent_clones, 4)

        # remove snapshot
        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)

        # remove subvolumes
        self._fs_cmd("subvolume", "rm", self.volname, subvolume)
        self._fs_cmd("subvolume", "rm", self.volname, clone1)
        self._fs_cmd("subvolume", "rm", self.volname, clone2)
        self._fs_cmd("subvolume", "rm", self.volname, clone3)

        # verify trash dir is clean
        self._wait_for_trash_empty()


class TestMisc(TestVolumesHelper):
    """Miscellaneous tests related to FS volume, subvolume group, and subvolume operations."""
@ -59,6 +59,9 @@ ceph fs subvolume snapshot create cephfs sub_0 snap_0
# Set clone snapshot delay
ceph config set mgr mgr/volumes/snapshot_clone_delay 15

# Disable the snapshot_clone_no_wait config option
ceph config set mgr mgr/volumes/snapshot_clone_no_wait false

# Schedule few clones, some would fail with no space
for i in $(eval echo {1..$NUM_CLONES});do ceph fs subvolume snapshot clone cephfs sub_0 snap_0 clone_$i;done
@ -337,9 +337,10 @@ class Cloner(AsyncJobs):
    this relies on a simple state machine (which mimics states from SubvolumeOpSm class) as
    the driver. file types supported are directories, symbolic links and regular files.
    """
    def __init__(self, volume_client, tp_size, snapshot_clone_delay):
    def __init__(self, volume_client, tp_size, snapshot_clone_delay, clone_no_wait):
        self.vc = volume_client
        self.snapshot_clone_delay = snapshot_clone_delay
        self.snapshot_clone_no_wait = clone_no_wait
        self.state_table = {
            SubvolumeStates.STATE_PENDING : handle_clone_pending,
            SubvolumeStates.STATE_INPROGRESS : handle_clone_in_progress,
@ -355,6 +356,9 @@ class Cloner(AsyncJobs):
    def reconfigure_snapshot_clone_delay(self, timeout):
        self.snapshot_clone_delay = timeout

    def reconfigure_reject_clones(self, clone_no_wait):
        self.snapshot_clone_no_wait = clone_no_wait

    def is_clone_cancelable(self, clone_state):
        return not (SubvolumeOpSm.is_complete_state(clone_state) or SubvolumeOpSm.is_failed_state(clone_state))
@ -9,11 +9,12 @@ from contextlib import contextmanager
import orchestrator

from .lock import GlobalLock
from ..exception import VolumeException
from ..exception import VolumeException, IndexException
from ..fs_util import create_pool, remove_pool, rename_pool, create_filesystem, \
    remove_filesystem, rename_filesystem, create_mds, volume_exists, listdir
from .trash import Trash
from mgr_util import open_filesystem, CephfsConnectionException
from .clone_index import open_clone_index

log = logging.getLogger(__name__)

@ -260,6 +261,30 @@ def get_pending_subvol_deletions_count(fs, path):
    return {'pending_subvolume_deletions': num_pending_subvol_del}


def get_all_pending_clones_count(self, mgr, vol_spec):
    pending_clones_cnt = 0
    index_path = ""
    fs_map = mgr.get('fs_map')
    for fs in fs_map['filesystems']:
        volname = fs['mdsmap']['fs_name']
        try:
            with open_volume(self, volname) as fs_handle:
                with open_clone_index(fs_handle, vol_spec) as index:
                    index_path = index.path.decode('utf-8')
                    pending_clones_cnt = pending_clones_cnt \
                        + len(listdir(fs_handle, index_path,
                                      filter_entries=None, filter_files=False))
        except IndexException as e:
            if e.errno == -errno.ENOENT:
                continue
            raise VolumeException(-e.args[0], e.args[1])
        except VolumeException as ve:
            log.error("error fetching clone entry for volume '{0}' ({1})".format(volname, ve))
            raise ve

    return pending_clones_cnt


@contextmanager
def open_volume(vc, volname):
    """
@ -13,12 +13,14 @@ from .fs_util import listdir, has_subdir
from .operations.group import open_group, create_group, remove_group, \
    open_group_unique, set_group_attrs
from .operations.volume import create_volume, delete_volume, rename_volume, \
    list_volumes, open_volume, get_pool_names, get_pool_ids, get_pending_subvol_deletions_count
    list_volumes, open_volume, get_pool_names, get_pool_ids, \
    get_pending_subvol_deletions_count, get_all_pending_clones_count
from .operations.subvolume import open_subvol, create_subvol, remove_subvol, \
    create_clone

from .vol_spec import VolSpec
from .exception import VolumeException, ClusterError, ClusterTimeout, EvictionError
from .exception import VolumeException, ClusterError, ClusterTimeout, \
    EvictionError, IndexException
from .async_cloner import Cloner
from .purge_queue import ThreadPoolPurgeQueueMixin
from .operations.template import SubvolumeOpType
@ -53,7 +55,8 @@ class VolumeClient(CephfsClient["Module"]):
        super().__init__(mgr)
        # volume specification
        self.volspec = VolSpec(mgr.rados.conf_get('client_snapdir'))
        self.cloner = Cloner(self, self.mgr.max_concurrent_clones, self.mgr.snapshot_clone_delay)
        self.cloner = Cloner(self, self.mgr.max_concurrent_clones, self.mgr.snapshot_clone_delay,
                             self.mgr.snapshot_clone_no_wait)
        self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4)
        # on startup, queue purge job for available volumes to kickstart
        # purge for leftover subvolume entries in trash. note that, if the
@ -764,6 +767,10 @@ class VolumeClient(CephfsClient["Module"]):
        s_groupname = kwargs['group_name']

        try:
            if self.mgr.snapshot_clone_no_wait and \
                    get_all_pending_clones_count(self, self.mgr, self.volspec) >= self.mgr.max_concurrent_clones:
                raise(VolumeException(-errno.EAGAIN, "all cloner threads are busy, please try again later"))

            with open_volume(self, volname) as fs_handle:
                with open_group(fs_handle, self.volspec, s_groupname) as s_group:
                    with open_subvol(self.mgr, fs_handle, self.volspec, s_group, s_subvolname, SubvolumeOpType.CLONE_SOURCE) as s_subvolume:
@ -489,7 +489,12 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
            'periodic_async_work',
            type='bool',
            default=False,
            desc='Periodically check for async work')
            desc='Periodically check for async work'),
        Option(
            'snapshot_clone_no_wait',
            type='bool',
            default=True,
            desc='Reject subvolume clone request when cloner threads are busy')
    ]

    def __init__(self, *args, **kwargs):
@ -498,6 +503,7 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
        self.max_concurrent_clones = None
        self.snapshot_clone_delay = None
        self.periodic_async_work = False
        self.snapshot_clone_no_wait = None
        self.lock = threading.Lock()
        super(Module, self).__init__(*args, **kwargs)
        # Initialize config option members
@ -532,6 +538,8 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
                        else:
                            self.vc.cloner.unset_wakeup_timeout()
                            self.vc.purge_queue.unset_wakeup_timeout()
                    elif opt['name'] == "snapshot_clone_no_wait":
                        self.vc.cloner.reconfigure_reject_clones(self.snapshot_clone_no_wait)

    def handle_command(self, inbuf, cmd):
        handler_name = "_cmd_" + cmd['prefix'].replace(" ", "_")