mirror of
https://github.com/ceph/ceph
synced 2025-01-03 01:22:53 +00:00
mgr/volumes: Add config to insert delay at the beginning of the clone
Added the config 'snapshot_clone_delay' to insert a delay at the beginning of the clone to avoid races in tests. The default value is set to 0. Fixes: https://tracker.ceph.com/issues/48231 Signed-off-by: Kotresh HR <khiremat@redhat.com>
This commit is contained in:
parent
f6b24ece91
commit
7588f98505
@ -3226,6 +3226,9 @@ class TestSubvolumeSnapshotClones(TestVolumesHelper):
|
||||
# snapshot subvolume
|
||||
self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
|
||||
|
||||
# Insert delay at the beginning of snapshot clone
|
||||
self.config_set('mgr', 'mgr/volumes/snapshot_clone_delay', 2)
|
||||
|
||||
# schedule a clone
|
||||
self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
|
||||
|
||||
@ -3272,6 +3275,9 @@ class TestSubvolumeSnapshotClones(TestVolumesHelper):
|
||||
# snapshot subvolume
|
||||
self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
|
||||
|
||||
# Insert delay at the beginning of snapshot clone
|
||||
self.config_set('mgr', 'mgr/volumes/snapshot_clone_delay', 2)
|
||||
|
||||
# schedule a clone
|
||||
self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
|
||||
|
||||
@ -3317,6 +3323,9 @@ class TestSubvolumeSnapshotClones(TestVolumesHelper):
|
||||
# snapshot subvolume
|
||||
self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
|
||||
|
||||
# Insert delay at the beginning of snapshot clone
|
||||
self.config_set('mgr', 'mgr/volumes/snapshot_clone_delay', 2)
|
||||
|
||||
# schedule a clone
|
||||
self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
|
||||
|
||||
@ -3801,6 +3810,9 @@ class TestSubvolumeSnapshotClones(TestVolumesHelper):
|
||||
# snapshot subvolume
|
||||
self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
|
||||
|
||||
# Insert delay at the beginning of snapshot clone
|
||||
self.config_set('mgr', 'mgr/volumes/snapshot_clone_delay', 2)
|
||||
|
||||
# schedule a clone
|
||||
self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
|
||||
|
||||
@ -4200,6 +4212,9 @@ class TestSubvolumeSnapshotClones(TestVolumesHelper):
|
||||
# ensure metadata file is in legacy location, with required version v1
|
||||
self._assert_meta_location_and_version(self.volname, subvolume, version=1, legacy=True)
|
||||
|
||||
# Insert delay at the beginning of snapshot clone
|
||||
self.config_set('mgr', 'mgr/volumes/snapshot_clone_delay', 2)
|
||||
|
||||
# schedule a clone
|
||||
self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
|
||||
|
||||
@ -4249,6 +4264,25 @@ class TestSubvolumeSnapshotClones(TestVolumesHelper):
|
||||
max_concurrent_clones = int(self.config_get('mgr', 'mgr/volumes/max_concurrent_clones'))
|
||||
self.assertEqual(max_concurrent_clones, 2)
|
||||
|
||||
def test_subvolume_snapshot_config_snapshot_clone_delay(self):
    """
    Validate the 'snapshot_clone_delay' mgr config option.

    Checks that the option defaults to 0, that setting it to 2 is
    reflected by a subsequent config_get, and that the related
    'max_concurrent_clones' option can be updated independently.
    """

    # get the default delay before starting the clone; shipped default is 0
    default_timeout = int(self.config_get('mgr', 'mgr/volumes/snapshot_clone_delay'))
    self.assertEqual(default_timeout, 0)

    # Insert delay of 2 seconds at the beginning of the snapshot clone
    self.config_set('mgr', 'mgr/volumes/snapshot_clone_delay', 2)
    default_timeout = int(self.config_get('mgr', 'mgr/volumes/snapshot_clone_delay'))
    self.assertEqual(default_timeout, 2)

    # Decrease number of cloner threads (default is 4) and verify the change took
    self.config_set('mgr', 'mgr/volumes/max_concurrent_clones', 2)
    max_concurrent_clones = int(self.config_get('mgr', 'mgr/volumes/max_concurrent_clones'))
    self.assertEqual(max_concurrent_clones, 2)
|
||||
|
||||
def test_subvolume_under_group_snapshot_clone(self):
|
||||
subvolume = self._generate_random_subvolume_name()
|
||||
group = self._generate_random_group_name()
|
||||
|
@ -224,12 +224,15 @@ def handle_clone_complete(volume_client, volname, index, groupname, subvolname,
|
||||
log.error("failed to detach clone from snapshot: {0}".format(e))
|
||||
return (None, True)
|
||||
|
||||
def start_clone_sm(volume_client, volname, index, groupname, subvolname, state_table, should_cancel):
|
||||
def start_clone_sm(volume_client, volname, index, groupname, subvolname, state_table, should_cancel, snapshot_clone_delay):
|
||||
finished = False
|
||||
current_state = None
|
||||
try:
|
||||
current_state = get_clone_state(volume_client, volname, groupname, subvolname)
|
||||
log.debug("cloning ({0}, {1}, {2}) -- starting state \"{3}\"".format(volname, groupname, subvolname, current_state))
|
||||
if current_state == SubvolumeStates.STATE_PENDING:
|
||||
time.sleep(snapshot_clone_delay)
|
||||
log.info("Delayed cloning ({0}, {1}, {2}) -- by {3} seconds".format(volname, groupname, subvolname, snapshot_clone_delay))
|
||||
while not finished:
|
||||
handler = state_table.get(current_state, None)
|
||||
if not handler:
|
||||
@ -244,7 +247,7 @@ def start_clone_sm(volume_client, volname, index, groupname, subvolname, state_t
|
||||
log.error("clone failed for ({0}, {1}, {2}) (current_state: {3}, reason: {4})".format(volname, groupname,\
|
||||
subvolname, current_state, ve))
|
||||
|
||||
def clone(volume_client, volname, index, clone_path, state_table, should_cancel):
|
||||
def clone(volume_client, volname, index, clone_path, state_table, should_cancel, snapshot_clone_delay):
|
||||
log.info("cloning to subvolume path: {0}".format(clone_path))
|
||||
resolved = resolve(volume_client.volspec, clone_path)
|
||||
|
||||
@ -254,7 +257,7 @@ def clone(volume_client, volname, index, clone_path, state_table, should_cancel)
|
||||
|
||||
try:
|
||||
log.info("starting clone: ({0}, {1}, {2})".format(volname, groupname, subvolname))
|
||||
start_clone_sm(volume_client, volname, index, groupname, subvolname, state_table, should_cancel)
|
||||
start_clone_sm(volume_client, volname, index, groupname, subvolname, state_table, should_cancel, snapshot_clone_delay)
|
||||
log.info("finished clone: ({0}, {1}, {2})".format(volname, groupname, subvolname))
|
||||
except VolumeException as ve:
|
||||
log.error("clone failed for ({0}, {1}, {2}), reason: {3}".format(volname, groupname, subvolname, ve))
|
||||
@ -265,8 +268,9 @@ class Cloner(AsyncJobs):
|
||||
this relies on a simple state machine (which mimics states from SubvolumeOpSm class) as
|
||||
the driver. file types supported are directories, symbolic links and regular files.
|
||||
"""
|
||||
def __init__(self, volume_client, tp_size):
|
||||
def __init__(self, volume_client, tp_size, snapshot_clone_delay):
|
||||
self.vc = volume_client
|
||||
self.snapshot_clone_delay = snapshot_clone_delay
|
||||
self.state_table = {
|
||||
SubvolumeStates.STATE_PENDING : handle_clone_pending,
|
||||
SubvolumeStates.STATE_INPROGRESS : handle_clone_in_progress,
|
||||
@ -279,6 +283,9 @@ class Cloner(AsyncJobs):
|
||||
def reconfigure_max_concurrent_clones(self, tp_size):
    """Resize the cloner thread pool to tp_size by delegating to the AsyncJobs base class."""
    return super(Cloner, self).reconfigure_max_async_threads(tp_size)
|
||||
|
||||
def reconfigure_snapshot_clone_delay(self, timeout):
    """Update the delay (seconds) slept before a pending clone starts; applies to clones started afterwards."""
    self.snapshot_clone_delay = timeout
|
||||
|
||||
def is_clone_cancelable(self, clone_state):
    """Return True unless the clone has already reached a complete or failed state."""
    return not (SubvolumeOpSm.is_complete_state(clone_state) or SubvolumeOpSm.is_failed_state(clone_state))
|
||||
|
||||
@ -344,4 +351,4 @@ class Cloner(AsyncJobs):
|
||||
return get_next_clone_entry(self.vc, volname, running_jobs)
|
||||
|
||||
def execute_job(self, volname, job, should_cancel):
|
||||
clone(self.vc, volname, job[0].decode('utf-8'), job[1].decode('utf-8'), self.state_table, should_cancel)
|
||||
clone(self.vc, volname, job[0].decode('utf-8'), job[1].decode('utf-8'), self.state_table, should_cancel, self.snapshot_clone_delay)
|
||||
|
@ -51,7 +51,7 @@ class VolumeClient(CephfsClient["Module"]):
|
||||
super().__init__(mgr)
|
||||
# volume specification
|
||||
self.volspec = VolSpec(mgr.rados.conf_get('client_snapdir'))
|
||||
self.cloner = Cloner(self, self.mgr.max_concurrent_clones)
|
||||
self.cloner = Cloner(self, self.mgr.max_concurrent_clones, self.mgr.snapshot_clone_delay)
|
||||
self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4)
|
||||
# on startup, queue purge job for available volumes to kickstart
|
||||
# purge for leftover subvolume entries in trash. note that, if the
|
||||
|
@ -342,14 +342,19 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
|
||||
'max_concurrent_clones',
|
||||
type='int',
|
||||
default=4,
|
||||
desc='Number of asynchronous cloner threads',
|
||||
)
|
||||
desc='Number of asynchronous cloner threads'),
|
||||
Option(
|
||||
'snapshot_clone_delay',
|
||||
type='int',
|
||||
default=0,
|
||||
desc='Delay clone begin operation by snapshot_clone_delay seconds')
|
||||
]
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.inited = False
|
||||
# for mypy
|
||||
self.max_concurrent_clones = None
|
||||
self.snapshot_clone_delay = None
|
||||
self.lock = threading.Lock()
|
||||
super(Module, self).__init__(*args, **kwargs)
|
||||
# Initialize config option members
|
||||
@ -378,6 +383,8 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
|
||||
if self.inited:
|
||||
if opt['name'] == "max_concurrent_clones":
|
||||
self.vc.cloner.reconfigure_max_concurrent_clones(self.max_concurrent_clones)
|
||||
elif opt['name'] == "snapshot_clone_delay":
|
||||
self.vc.cloner.reconfigure_snapshot_clone_delay(self.snapshot_clone_delay)
|
||||
|
||||
def handle_command(self, inbuf, cmd):
|
||||
handler_name = "_cmd_" + cmd['prefix'].replace(" ", "_")
|
||||
|
Loading…
Reference in New Issue
Block a user