mirror of https://github.com/ceph/ceph
166 lines
5.0 KiB
Python
166 lines
5.0 KiB
Python
"""
|
|
Stress the mds by running forward scrubs on the file systems while other tasks run
|
|
"""
|
|
import logging
|
|
import contextlib
|
|
|
|
from gevent import sleep, GreenletExit
|
|
from gevent.greenlet import Greenlet
|
|
from gevent.event import Event
|
|
from teuthology import misc as teuthology
|
|
|
|
from tasks import ceph_manager
|
|
from tasks.cephfs.filesystem import MDSCluster, Filesystem
|
|
from tasks.thrasher import Thrasher
|
|
|
|
# Module-level logger; ForwardScrubber and task() derive child loggers from it.
log = logging.getLogger(__name__)
|
|
|
|
class ForwardScrubber(Thrasher, Greenlet):
    """
    ForwardScrubber::

    The ForwardScrubber does forward scrubbing of file-systems during execution
    of other tasks (workunits, etc).

    It runs as a greenlet: once started it keeps issuing a scrub of the file
    system, waits for that scrub to complete and checks the reported damage,
    until stop() is called or an iteration fails.  A failure is recorded with
    set_thrasher_exception() instead of escaping from the greenlet.
    """

    def __init__(self, fs, scrub_timeout=300, sleep_between_iterations=1):
        """
        :param fs: file system to scrub; this class uses its ``name``,
                   ``run_scrub()``, ``wait_until_scrub_complete()`` and
                   ``get_damage()`` members.
        :param scrub_timeout: forwarded as ``timeout`` to
                   ``fs.wait_until_scrub_complete()`` (presumably seconds --
                   confirm against that helper).
        :param sleep_between_iterations: argument handed to gevent's
                   ``sleep()`` after every scrub iteration.
        """
        super(ForwardScrubber, self).__init__()

        self.logger = log.getChild('fs.[{f}]'.format(f=fs.name))
        self.fs = fs
        self.name = 'thrasher.fs.[{f}]'.format(f=fs.name)
        # Set by stop(); polled by do_scrub() between iterations.
        self.stopping = Event()
        self.scrub_timeout = scrub_timeout
        self.sleep_between_iterations = sleep_between_iterations

    def _run(self):
        """
        Greenlet entry point: run the scrub loop and record any failure.
        """
        try:
            self.do_scrub()
        except Exception as e:
            self.set_thrasher_exception(e)
            self.logger.exception("exception:")
            # allow successful completion so gevent doesn't see an exception...

    def stop(self):
        """
        Ask the scrub loop to finish after the iteration in progress.
        """
        self.stopping.set()

    def do_scrub(self):
        """
        Perform the file-system scrubbing

        Loops until the stopping event is set.  GreenletExit ends the loop
        quietly; any other exception propagates to the caller.
        """
        self.logger.info(f'start scrubbing fs: {self.fs.name}')

        try:
            while not self.stopping.is_set():
                self._scrub()
                sleep(self.sleep_between_iterations)
        except GreenletExit:
            pass

        self.logger.info(f'end scrubbing fs: {self.fs.name}')

    def _scrub(self, path="/", recursive=True):
        """
        Start one forced scrub, wait for it to complete and check for damage.

        :param path: path handed to the scrub ``start`` command.
        :param recursive: when true, add ``recursive`` to the scrub options.
        :raises AssertionError: if the start command returned nothing, a
                   non-zero return code, no scrub tag, or a mode other than
                   ``asynchronous``.
        :raises RuntimeError: if the scrub did not complete within
                   ``scrub_timeout`` or damage was reported afterwards.
        """
        self.logger.info(f"scrubbing fs: {self.fs.name}")
        scrubopts = ["force"]
        if recursive:
            scrubopts.append("recursive")
        out_json = self.fs.run_scrub(["start", path, ",".join(scrubopts)])
        assert out_json is not None, "scrub start returned no output"

        # Look at the return code before the tag, so that a rejected scrub is
        # reported as a failed start rather than as a missing 'scrub_tag' key.
        assert out_json['return_code'] == 0, \
            f"scrub start failed: {out_json}"
        tag = out_json.get('scrub_tag')
        assert tag is not None, f"scrub start returned no tag: {out_json}"
        assert out_json['mode'] == 'asynchronous', \
            f"unexpected scrub mode: {out_json}"

        done = self.fs.wait_until_scrub_complete(tag=tag, sleep=30,
                                                 timeout=self.scrub_timeout)
        if not done:
            raise RuntimeError('scrub timeout')
        self._check_damage()

    def _check_damage(self):
        """
        Raise if any rank of the file system reports damage.

        Every damaged rank is logged; the exception lists the distinct
        ``damage_type`` values found across all ranks.

        :raises RuntimeError: if at least one damage entry exists.
        """
        rdmg = self.fs.get_damage()
        types = set()
        for rank, dmg in rdmg.items():
            if dmg:
                types.update(d['damage_type'] for d in dmg)
                # Use the per-file-system logger like the rest of the class.
                self.logger.error(f"rank {rank} damaged:\n{dmg}")
        if types:
            raise RuntimeError(f"rank damage found: {types}")
|
|
|
|
def stop_all_fwd_scrubbers(thrashers):
    """
    Stop and join every ForwardScrubber found in ``thrashers``.

    Entries that are not ForwardScrubber instances are ignored.  All
    scrubbers are signalled to stop and joined before any failure is
    reported, so an error in one scrubber does not leave the others running.

    :param thrashers: iterable of thrasher objects.
    :raises RuntimeError: if a scrubber recorded an exception; the first
               such scrubber (in iteration order) is the one reported.
    """
    scrubbers = [t for t in thrashers if isinstance(t, ForwardScrubber)]

    # Signal everything up front so the scrubbers wind down concurrently
    # instead of each join() waiting on a scrubber that was not yet told
    # to stop.
    for scrubber in scrubbers:
        scrubber.stop()
    for scrubber in scrubbers:
        scrubber.join()

    for scrubber in scrubbers:
        if scrubber.exception is not None:
            raise RuntimeError(f"error during scrub thrashing: {scrubber.exception}")
|
|
|
|
|
|
@contextlib.contextmanager
def task(ctx, config):
    """
    Stress test the mds by running scrub iterations while another task/workunit
    is running.
    Example config:

    - fwd_scrub:
      scrub_timeout: 300
      sleep_between_iterations: 1

    Both options may be omitted, in which case the ForwardScrubber defaults
    are used.  An optional ``cluster`` key selects the cluster whose thrasher
    list the scrubbers are registered in (default ``ceph``).

    One ForwardScrubber is started per file system before yielding; on exit
    all of them are stopped and joined, and a failure recorded by any of them
    is raised.
    """

    mds_cluster = MDSCluster(ctx)

    if config is None:
        config = {}
    assert isinstance(config, dict), \
        'fwd_scrub task only accepts a dict for configuration'
    mdslist = list(teuthology.all_roles_of_type(ctx.cluster, 'mds'))
    assert len(mdslist) > 0, \
        'fwd_scrub task requires at least 1 metadata server'

    (first,) = ctx.cluster.only(f'mds.{mdslist[0]}').remotes.keys()
    manager = ceph_manager.CephManager(
        first, ctx=ctx, logger=log.getChild('ceph_manager'),
    )

    # make sure everyone is in active, standby, or standby-replay
    log.info('Wait for all MDSs to reach steady state...')
    steady_states = ('up:active', 'up:standby', 'up:standby-replay')
    status = mds_cluster.status()
    while not all(info['state'] in steady_states
                  for info in status.get_all()):
        sleep(2)
        status = mds_cluster.status()

    log.info('Ready to start scrub thrashing')

    manager.wait_for_clean()
    assert manager.is_clean()

    config.setdefault('cluster', 'ceph')
    thrashers = ctx.ceph[config['cluster']].thrashers

    # Forward only the options that were actually configured, so a missing
    # key falls back to the ForwardScrubber default instead of a KeyError.
    scrubber_kwargs = {
        key: config[key]
        for key in ('scrub_timeout', 'sleep_between_iterations')
        if key in config
    }

    for fs in status.get_filesystems():
        fwd_scrubber = ForwardScrubber(Filesystem(ctx, fscid=fs['id']),
                                       **scrubber_kwargs)
        fwd_scrubber.start()
        thrashers.append(fwd_scrubber)

    try:
        log.debug('Yielding')
        yield
    finally:
        log.info('joining ForwardScrubbers')
        stop_all_fwd_scrubbers(ctx.ceph[config['cluster']].thrashers)
        log.info('done joining')
|