mirror of https://github.com/ceph/ceph
mgr/vol: show clone progress in "ceph status" output
Print a progress bar for an ongoing clone job in the output of "ceph status". When multiple clones are ongoing, show one progress bar whose value is the average of the progress made by each clone. When the number of clone jobs exceeds the number of cloner threads, print two progress bars in the output of the "ceph status" command: one for ongoing clone jobs and another for ongoing+pending clone jobs.

Fixes: https://tracker.ceph.com/issues/61904
Signed-off-by: Rishabh Dave <ridave@redhat.com>
This commit is contained in:
parent d7bc8282c8
commit 65b789edfb
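To make the reporting rule above concrete, here is a minimal, self-contained sketch (editor's illustration, not mgr code; the clone percentages and the max_concurrent_clones value are hypothetical stand-ins for what the module reads at runtime):

def progress_bars(clone_percents, max_concurrent_clones=4):
    # one bar: average progress of the clones currently being executed
    ongoing = clone_percents[:max_concurrent_clones]
    bars = {'ongoing clones': sum(ongoing) / len(ongoing)}
    # second bar: once jobs queue up beyond the cloner threads, also
    # report the average over every job, ongoing and pending ("onpen")
    if len(clone_percents) > max_concurrent_clones:
        bars['ongoing+pending clones'] = sum(clone_percents) / len(clone_percents)
    return bars

print(progress_bars([50.0, 30.0]))                    # one bar: 40.0%
print(progress_bars([50.0, 30.0, 0.0, 0.0, 0.0], 2))  # two bars: 40.0% and 16.0%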
src/pybind/mgr/volumes/fs/operations/clone_index.py

@@ -8,14 +8,16 @@ import cephfs
 from .index import Index
 from ..exception import IndexException, VolumeException
-from ..fs_util import list_one_entry_at_a_time
+from ..fs_util import list_one_entry_at_a_time, listdir

 log = logging.getLogger(__name__)


+PATH_MAX = 4096
+
+
 class CloneIndex(Index):
     SUB_GROUP_NAME = "clone"
-    PATH_MAX = 4096

     @property
     def path(self):
@@ -47,6 +49,26 @@ class CloneIndex(Index):
         except cephfs.Error as e:
             raise IndexException(-e.args[0], e.args[1])

+    def list_entries_by_ctime_order(self):
+        entry_names = listdir(self.fs, self.path, filter_files=False)
+        if not entry_names:
+            return []
+
+        # pair each clone entry with the ctime obtained by statting it;
+        # i.e. build a list of tuples where each tuple has 2 members.
+        ens_with_ctime = []
+        for en in entry_names:
+            d_path = os.path.join(self.path, en)
+            stb = self.fs.lstat(d_path)
+
+            # add ctime next to clone entry
+            ens_with_ctime.append((en, stb.st_ctime))
+
+        ens_with_ctime.sort(key=lambda en_ctime: en_ctime[1])
+
+        # remove ctime and return list of clone entries sorted by ctime.
+        return [i[0] for i in ens_with_ctime]
+
     def get_oldest_clone_entry(self, exclude=[]):
         min_ctime_entry = None
         exclude_tracking_ids = [v[0] for v in exclude]
@@ -61,7 +83,7 @@ class CloneIndex(Index):
         if min_ctime_entry:
             try:
                 linklen = min_ctime_entry[1].st_size
-                sink_path = self.fs.readlink(os.path.join(self.path, min_ctime_entry[0]), CloneIndex.PATH_MAX)
+                sink_path = self.fs.readlink(os.path.join(self.path, min_ctime_entry[0]), PATH_MAX)
                 return (min_ctime_entry[0], sink_path[:linklen])
             except cephfs.Error as e:
                 raise IndexException(-e.args[0], e.args[1])
@@ -74,7 +96,7 @@ class CloneIndex(Index):
             dpath = os.path.join(self.path, dname)
             st = self.fs.lstat(dpath)
             if stat.S_ISLNK(st.st_mode):
-                target_path = self.fs.readlink(dpath, CloneIndex.PATH_MAX)
+                target_path = self.fs.readlink(dpath, PATH_MAX)
                 if sink_path == target_path[:st.st_size]:
                     return dname
         return None
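The new list_entries_by_ctime_order() boils down to sorting directory entries by their ctime. A minimal local-filesystem sketch of the same idea, with os.scandir() standing in for the libcephfs handle used above (index_dir is a hypothetical local path):

import os

def entries_by_ctime(index_dir):
    # pair each entry name with its ctime, sort on ctime, then drop it
    ens_with_ctime = [(e.name, e.stat(follow_symlinks=False).st_ctime)
                      for e in os.scandir(index_dir)]
    ens_with_ctime.sort(key=lambda en_ctime: en_ctime[1])
    return [name for name, _ in ens_with_ctime]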
src/pybind/mgr/volumes/fs/stats_util.py

@@ -5,9 +5,22 @@ and destination directory for the copy operation that is performed for snapshot
 cloning) and pass, print, log and convert them to human readable format
 conveniently.
 '''
 from os.path import join as os_path_join
+from typing import Optional
+from logging import getLogger

-from mgr_util import format_bytes, format_dimless
+from mgr_util import RTimer, format_bytes, format_dimless
+from cephfs import ObjectNotFound

 from .operations.volume import open_volume_lockless, list_volumes
+from .operations.subvolume import open_clone_subvol_pair_in_vol, open_subvol_in_vol
+from .operations.template import SubvolumeOpType
+from .operations.clone_index import open_clone_index, PATH_MAX
+from .operations.resolver import resolve_group_and_subvolume_name
+from .exception import VolumeException
+
+
+log = getLogger(__name__)


 def get_size_ratio_str(size1, size2):
@@ -42,6 +55,11 @@ def get_amount_copied(src_path, dst_path, fs_handle):
     return size_t, size_c, percent


+def get_percent_copied(src_path, dst_path, fs_handle):
+    _, _, percent = get_amount_copied(src_path, dst_path, fs_handle)
+    return percent
+
+
 def get_stats(src_path, dst_path, fs_handle):
     rentries = 'ceph.dir.rentries'
     rentries_t = int(fs_handle.getxattr(src_path, rentries))
@@ -54,3 +72,240 @@ def get_stats(src_path, dst_path, fs_handle):
         'amount cloned': get_size_ratio_str(size_c, size_t),
         'files cloned': get_num_ratio_str(rentries_c, rentries_t),
     }
+
+
+class CloneInfo:
+
+    def __init__(self, volname):
+        self.volname = volname
+
+        self.src_group_name = None
+        self.src_subvol_name = None
+        self.src_path = None
+
+        self.dst_group_name = None
+        self.dst_subvol_name = None
+        self.dst_path = None
+
+
+class CloneProgressReporter:
+
+    def __init__(self, volclient, vol_spec):
+        self.vol_spec = vol_spec
+
+        # an instance of VolumeClient is needed here so that calls to
+        # LibCephFS.getxattr() can be made.
+        self.volclient = volclient
+
+        # used to figure out how many progress bars should be printed:
+        # print 1 progress bar if the number of ongoing clones is less
+        # than this value, else print 2.
+        self.max_concurrent_clones = self.volclient.mgr.max_concurrent_clones
+
+        # create an RTimer instance in advance so that we can check
+        # whether clone reporting has already been initiated by calling
+        # RTimer.is_alive().
+        self.update_task = RTimer(1, self._update_progress_bars)
+
+    def initiate_reporting(self):
+        if self.update_task.is_alive():
+            log.info('progress reporting thread is already alive, not '
+                     'initiating it again')
+            return
+
+        log.info('initiating progress reporting for clones...')
+        # progress event ID for ongoing clone jobs
+        self.on_pev_id: Optional[str] = 'mgr-vol-ongoing-clones'
+        # progress event ID for ongoing+pending clone jobs
+        self.onpen_pev_id: Optional[str] = 'mgr-vol-total-clones'
+
+        self.update_task = RTimer(1, self._update_progress_bars)
+        self.update_task.start()
+        log.info('progress reporting for clones has been initiated')
+
+    def _get_clone_dst_info(self, fs_handle, ci, clone_entry,
+                            clone_index_path):
+        log.debug('collecting info for cloning destination')
+
+        ce_path = os_path_join(clone_index_path, clone_entry)
+        # XXX: this may raise the ObjectNotFound exception: as soon as
+        # cloning is finished, the clone entry is deleted by the cloner
+        # thread. The exception is handled in _get_info_for_all_clones().
+        dst_subvol_base_path = fs_handle.readlink(ce_path, PATH_MAX).\
+            decode('utf-8')
+
+        ci.dst_group_name, ci.dst_subvol_name = \
+            resolve_group_and_subvolume_name(self.vol_spec, dst_subvol_base_path)
+        with open_subvol_in_vol(self.volclient, self.vol_spec, ci.volname,
+                                ci.dst_group_name, ci.dst_subvol_name,
+                                SubvolumeOpType.CLONE_INTERNAL) \
+                as (_, _, dst_subvol):
+            ci.dst_path = dst_subvol.path
+            log.debug(f'destination subvolume path for clone - {ci.dst_path}')
+
+        log.debug('finished collecting info for cloning destination')
+
+    def _get_clone_src_info(self, fs_handle, ci):
+        log.debug('collecting info for cloning source')
+
+        with open_clone_subvol_pair_in_vol(self.volclient, self.vol_spec,
+                                           ci.volname, ci.dst_group_name,
+                                           ci.dst_subvol_name) as \
+                (dst_subvol, src_subvol, snap_name):
+            ci.src_group_name = src_subvol.group_name
+            ci.src_subvol_name = src_subvol.subvolname
+            ci.src_path = src_subvol.snapshot_data_path(snap_name)
+            log.debug(f'source subvolume path for clone - {ci.src_path}')
+
+        log.debug('finished collecting info for cloning source')
+
+    def _get_info_for_all_clones(self):
+        clones: list[CloneInfo] = []
+
+        log.debug('collecting all entries in clone index...')
+        volnames = list_volumes(self.volclient.mgr)
+        for volname in volnames:
+            with open_volume_lockless(self.volclient, volname) as fs_handle:
+                with open_clone_index(fs_handle, self.vol_spec) as clone_index:
+                    clone_index_path = clone_index.path
+                    # get clones in the order in which they were launched,
+                    # which should be the same as the ctime order of the
+                    # clone entries.
+                    clone_index_entries = clone_index.list_entries_by_ctime_order()
+                log.debug('finished collecting all clone index entries, '
+                          f'found {len(clone_index_entries)} clone index entries')
+
+                log.debug('collecting info for clones found through clone '
+                          'index entries...')
+                for ce in clone_index_entries:
+                    ci = CloneInfo(volname)
+
+                    try:
+                        self._get_clone_dst_info(fs_handle, ci, ce,
+                                                 clone_index_path)
+                        self._get_clone_src_info(fs_handle, ci)
+                    except ObjectNotFound as e:
+                        log.info('Exception ObjectNotFound was raised. '
+                                 'Apparently an entry in the clone index was '
+                                 'removed because one of the clone jobs '
+                                 'completed or was cancelled, therefore '
+                                 'ignoring and proceeding. Printing the '
+                                 f'exception: {e}')
+                        continue
+                    except VolumeException as e:
+                        if e.error_str != 'error fetching subvolume metadata':
+                            raise
+                        log.info('Exception VolumeException was raised. '
+                                 'Apparently an entry from the metadata file '
+                                 'of the clone source was removed because one '
+                                 'of the clone jobs completed or was '
+                                 'cancelled, therefore ignoring and '
+                                 f'proceeding. Printing the exception: {e}')
+                        continue
+
+                    if not ci.src_path or not ci.dst_path:
+                        continue
+
+                    clones.append(ci)
+
+        log.debug('finished collecting info on all clones, found '
+                  f'{len(clones)} clones')
+        return clones
+
+    def _update_progress_bar_event(self, ev_id, ev_msg, ev_progress_fraction):
+        log.debug(f'ev_id = {ev_id} ev_progress_fraction = {ev_progress_fraction}')
+        log.debug(f'ev_msg = {ev_msg}')
+        log.debug('calling update() of the mgr/progress module')
+
+        self.volclient.mgr.remote('progress', 'update', ev_id=ev_id,
+                                  ev_msg=ev_msg,
+                                  ev_progress=ev_progress_fraction,
+                                  refs=['mds', 'clone'], add_to_ceph_s=True)
+
+        log.debug('call to update() of the mgr/progress module was successful')
+
+    def _update_progress_bars(self):
+        '''
+        Look up the amount of progress made by all cloning operations and
+        print progress bars for the average progress, accordingly, in the
+        "ceph -s" output.
+
+        This method is supposed to be run only by the RTimer instance
+        held by this class.
+        '''
+        clones = self._get_info_for_all_clones()
+        if not clones:
+            self.finish()
+            return
+
+        # the onpen bar (that is, the progress bar for clone jobs in
+        # ongoing and pending state) is printed when some clones are in
+        # pending state, and it keeps being printed until all clone jobs
+        # finish.
+        show_onpen_bar = len(clones) > self.max_concurrent_clones
+
+        percent = 0.0
+
+        assert self.on_pev_id is not None
+        sum_percent_ongoing = 0.0
+        avg_percent_ongoing = 0.0
+        total_ongoing_clones = min(len(clones), self.max_concurrent_clones)
+
+        if show_onpen_bar:
+            assert self.onpen_pev_id is not None
+            sum_percent_onpen = 0.0
+            avg_percent_onpen = 0.0
+            total_onpen_clones = len(clones)
+
+        for clone in clones:
+            with open_volume_lockless(self.volclient, clone.volname) as \
+                    fs_handle:
+                percent = get_percent_copied(clone.src_path, clone.dst_path,
+                                             fs_handle)
+                if clone in clones[:total_ongoing_clones]:
+                    sum_percent_ongoing += percent
+                if show_onpen_bar:
+                    sum_percent_onpen += percent
+
+        avg_percent_ongoing = round(sum_percent_ongoing / total_ongoing_clones, 3)
+        # the progress module takes progress as a fraction between 0.0 and 1.0.
+        avg_progress_fraction = avg_percent_ongoing / 100
+        msg = (f'{total_ongoing_clones} ongoing clones - average progress is '
+               f'{avg_percent_ongoing}%')
+        self._update_progress_bar_event(ev_id=self.on_pev_id, ev_msg=msg,
+                                        ev_progress_fraction=avg_progress_fraction)
+        log.debug('finished updating progress bar for ongoing clones with '
+                  f'the following message - {msg}')
+
+        if show_onpen_bar:
+            avg_percent_onpen = round(sum_percent_onpen / total_onpen_clones, 3)
+            # the progress module takes progress as a fraction between 0.0 and 1.0.
+            avg_progress_fraction = avg_percent_onpen / 100
+            msg = (f'Total {total_onpen_clones} clones - average progress is '
+                   f'{avg_percent_onpen}%')
+            self._update_progress_bar_event(ev_id=self.onpen_pev_id, ev_msg=msg,
+                                            ev_progress_fraction=avg_progress_fraction)
+            log.debug('finished updating progress bar for ongoing+pending '
+                      f'clones with the following message - {msg}')
+
+    def _finish_progress_events(self):
+        '''
+        Remove progress bars from the "ceph status" output.
+        '''
+        log.info('removing progress bars from "ceph status" output')
+
+        assert self.on_pev_id is not None
+        assert self.onpen_pev_id is not None
+
+        self.volclient.mgr.remote('progress', 'complete', self.on_pev_id)
+        self.on_pev_id = None
+
+        self.volclient.mgr.remote('progress', 'complete', self.onpen_pev_id)
+        self.onpen_pev_id = None
+
+        log.info('finished removing progress bars from "ceph status" output')
+
+    def finish(self):
+        '''
+        All cloning jobs have been completed. Terminate this RTimer thread.
+        '''
+        self._finish_progress_events()
+
+        log.info(f'marking this RTimer thread as finished; thread object ID - {self}')
+        self.update_task.finished.set()
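get_amount_copied() is not shown in full in these hunks; judging from the rentries handling visible in get_stats(), it presumably compares the recursive size that CephFS exposes as the virtual xattr ceph.dir.rbytes on the source and destination directories. A minimal sketch under that assumption (editor's illustration; fs_handle mimics the libcephfs getxattr() usage above):

def get_amount_copied_sketch(src_path, dst_path, fs_handle):
    # ceph.dir.rbytes is the recursive byte count CephFS maintains for a
    # directory tree; comparing destination to source yields the percent
    rbytes = 'ceph.dir.rbytes'
    size_t = int(fs_handle.getxattr(src_path, rbytes))   # total bytes
    size_c = int(fs_handle.getxattr(dst_path, rbytes))   # copied so far
    percent = round((size_c / size_t) * 100, 3) if size_t else 0.0
    return size_t, size_c, percent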
src/pybind/mgr/volumes/fs/volume.py

@@ -28,6 +28,7 @@ from .exception import VolumeException, ClusterError, ClusterTimeout, \
 from .async_cloner import Cloner
 from .purge_queue import ThreadPoolPurgeQueueMixin
 from .operations.template import SubvolumeOpType
+from .stats_util import CloneProgressReporter

 if TYPE_CHECKING:
     from volumes import Module
@@ -61,6 +62,8 @@ class VolumeClient(CephfsClient["Module"]):
         self.volspec = VolSpec(mgr.rados.conf_get('client_snapdir'))
         self.cloner = Cloner(self, self.mgr.max_concurrent_clones, self.mgr.snapshot_clone_delay,
                              self.mgr.snapshot_clone_no_wait)
+        self.clone_progress_reporter = CloneProgressReporter(self,
+                                                             self.volspec)
         self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4)
         # on startup, queue purge job for available volumes to kickstart
         # purge for leftover subvolume entries in trash. note that, if the
@@ -800,6 +803,7 @@ class VolumeClient(CephfsClient["Module"]):
             else:
                 s_subvolume.attach_snapshot(s_snapname, t_subvolume)
                 self.cloner.queue_job(volname)
+                self.clone_progress_reporter.initiate_reporting()
         except VolumeException as ve:
             try:
                 t_subvolume.remove()