mirror of
https://github.com/ceph/ceph
synced 2025-01-25 12:34:46 +00:00
1eb6511bcc
Fixes: https://tracker.ceph.com/issues/48702 Signed-off-by: Patrick Donnelly <pdonnell@redhat.com>
174 lines
5.6 KiB
Python
174 lines
5.6 KiB
Python
"""
|
|
Thrash mds by simulating failures
|
|
"""
|
|
import logging
|
|
import contextlib
|
|
|
|
from gevent import sleep, GreenletExit
|
|
from gevent.greenlet import Greenlet
|
|
from gevent.event import Event
|
|
from teuthology import misc as teuthology
|
|
from teuthology import contextutil
|
|
from teuthology.orchestra.run import CommandFailedError
|
|
|
|
from tasks import ceph_manager
|
|
from tasks.cephfs.filesystem import MDSCluster, Filesystem
|
|
from tasks.thrasher import Thrasher
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
class ForwardScrubber(Thrasher, Greenlet):
|
|
"""
|
|
ForwardScrubber::
|
|
|
|
The ForwardScrubber does forward scrubbing of file-systems during execution
|
|
of other tasks (workunits, etc).
|
|
"""
|
|
|
|
def __init__(self, fs, scrub_timeout=300, sleep_between_iterations=1):
|
|
super(ForwardScrubber, self).__init__()
|
|
|
|
self.logger = log.getChild('fs.[{f}]'.format(f=fs.name))
|
|
self.fs = fs
|
|
self.name = 'thrasher.fs.[{f}]'.format(f=fs.name)
|
|
self.stopping = Event()
|
|
self.scrub_timeout = scrub_timeout
|
|
self.sleep_between_iterations = sleep_between_iterations
|
|
|
|
def _run(self):
|
|
try:
|
|
self.do_scrub()
|
|
except Exception as e:
|
|
self.set_thrasher_exception(e)
|
|
self.logger.exception("exception:")
|
|
# allow successful completion so gevent doesn't see an exception...
|
|
|
|
def stop(self):
|
|
self.stopping.set()
|
|
|
|
def do_scrub(self):
|
|
"""
|
|
Perform the file-system scrubbing
|
|
"""
|
|
self.logger.info(f'start scrubbing fs: {self.fs.name}')
|
|
|
|
try:
|
|
while not self.stopping.is_set():
|
|
self._scrub()
|
|
sleep(self.sleep_between_iterations)
|
|
except GreenletExit:
|
|
pass
|
|
|
|
self.logger.info(f'end scrubbing fs: {self.fs.name}')
|
|
|
|
def _scrub(self, path="/", recursive=True):
|
|
self.logger.info(f"scrubbing fs: {self.fs.name}")
|
|
recopt = ["recursive", "force"] if recursive else ["force"]
|
|
out_json = self.fs.rank_tell(["scrub", "start", path] + recopt)
|
|
assert out_json is not None
|
|
|
|
tag = out_json['scrub_tag']
|
|
|
|
assert tag is not None
|
|
assert out_json['return_code'] == 0
|
|
assert out_json['mode'] == 'asynchronous'
|
|
|
|
return self._wait_until_scrub_complete(tag)
|
|
|
|
def _wait_until_scrub_complete(self, tag):
|
|
# time out after scrub_timeout seconds and assume as done
|
|
with contextutil.safe_while(sleep=30, tries=self.scrub_timeout//30) as proceed:
|
|
while proceed():
|
|
try:
|
|
out_json = self.fs.rank_tell(["scrub", "status"])
|
|
assert out_json is not None
|
|
if out_json['status'] == "no active scrubs running":
|
|
self.logger.info("all active scrubs completed")
|
|
return
|
|
|
|
status = out_json['scrubs'][tag]
|
|
if status is not None:
|
|
self.logger.info(f"scrub status for tag:{tag} - {status}")
|
|
else:
|
|
self.logger.info(f"scrub has completed for tag:{tag}")
|
|
return
|
|
except CommandFailedError as e:
|
|
self.logger.exception(f"exception while getting scrub status: {e}")
|
|
self.logger.info("retrying scrub status command in a while")
|
|
pass
|
|
|
|
def stop_all_fwd_scrubbers(thrashers):
|
|
for thrasher in thrashers:
|
|
if not isinstance(thrasher, ForwardScrubber):
|
|
continue
|
|
thrasher.stop()
|
|
thrasher.join()
|
|
if thrasher.exception is not None:
|
|
raise RuntimeError(f"error during scrub thrashing: {thrasher.exception}")
|
|
|
|
|
|
@contextlib.contextmanager
|
|
def task(ctx, config):
|
|
"""
|
|
Stress test the mds by running scrub iterations while another task/workunit
|
|
is running.
|
|
Example config:
|
|
|
|
- fwd_scrub:
|
|
scrub_timeout: 300
|
|
sleep_between_iterations: 1
|
|
"""
|
|
|
|
mds_cluster = MDSCluster(ctx)
|
|
|
|
if config is None:
|
|
config = {}
|
|
assert isinstance(config, dict), \
|
|
'fwd_scrub task only accepts a dict for configuration'
|
|
mdslist = list(teuthology.all_roles_of_type(ctx.cluster, 'mds'))
|
|
assert len(mdslist) > 0, \
|
|
'fwd_scrub task requires at least 1 metadata server'
|
|
|
|
(first,) = ctx.cluster.only(f'mds.{mdslist[0]}').remotes.keys()
|
|
manager = ceph_manager.CephManager(
|
|
first, ctx=ctx, logger=log.getChild('ceph_manager'),
|
|
)
|
|
|
|
# make sure everyone is in active, standby, or standby-replay
|
|
log.info('Wait for all MDSs to reach steady state...')
|
|
status = mds_cluster.status()
|
|
while True:
|
|
steady = True
|
|
for info in status.get_all():
|
|
state = info['state']
|
|
if state not in ('up:active', 'up:standby', 'up:standby-replay'):
|
|
steady = False
|
|
break
|
|
if steady:
|
|
break
|
|
sleep(2)
|
|
status = mds_cluster.status()
|
|
|
|
log.info('Ready to start scrub thrashing')
|
|
|
|
manager.wait_for_clean()
|
|
assert manager.is_clean()
|
|
|
|
if 'cluster' not in config:
|
|
config['cluster'] = 'ceph'
|
|
|
|
for fs in status.get_filesystems():
|
|
fwd_scrubber = ForwardScrubber(Filesystem(ctx, fscid=fs['id']),
|
|
config['scrub_timeout'],
|
|
config['sleep_between_iterations'])
|
|
fwd_scrubber.start()
|
|
ctx.ceph[config['cluster']].thrashers.append(fwd_scrubber)
|
|
|
|
try:
|
|
log.debug('Yielding')
|
|
yield
|
|
finally:
|
|
log.info('joining ForwardScrubbers')
|
|
stop_all_fwd_scrubbers(ctx.ceph[config['cluster']].thrashers)
|
|
log.info('done joining')
|