Merge pull request #13262 from batrick/multimds-thrasher

Add multimds:thrash sub-suite and fix bugs in thrasher for multimds

Reviewed-by: John Spray <john.spray@redhat.com>
commit 73100305e5

qa/suites/multimds/thrash/% (new empty file)

qa/suites/multimds/thrash/begin.yaml (new symlink)
@@ -0,0 +1 @@
+../../fs/thrash/begin.yaml

qa/suites/multimds/thrash/ceph-thrash (new symlink)
@@ -0,0 +1 @@
+../../fs/thrash/ceph-thrash/

qa/suites/multimds/thrash/clusters/3-mds-2-standby.yaml (new file)
@@ -0,0 +1,4 @@
+roles:
+- [mon.a, mon.c, mds.a, osd.0, osd.1, osd.2, mds.d-s]
+- [mon.b, mds.b, mds.c, osd.3, osd.4, osd.5, mds.e-s]
+- [client.0]

qa/suites/multimds/thrash/clusters/9-mds-3-standby.yaml (new file)
@@ -0,0 +1,4 @@
+roles:
+- [mon.a, mon.c, mds.a, mds.b, mds.c, mds.d, osd.0, osd.1, osd.2, mds.j-s, mds.k-s]
+- [mon.b, mds.e, mds.f, mds.g, mds.h, mds.i, osd.3, osd.4, osd.5, mds.l-s]
+- [client.0]

qa/suites/multimds/thrash/fs (new symlink)
@@ -0,0 +1 @@
+../../fs/thrash/fs/

qa/suites/multimds/thrash/mount/fuse.yaml (new symlink)
@@ -0,0 +1 @@
+../../../../cephfs/mount/fuse.yaml

qa/suites/multimds/thrash/mount/kclient.yaml (new symlink)
@@ -0,0 +1 @@
+../../../../cephfs/mount/kclient.yaml

qa/suites/multimds/thrash/msgr-failures (new symlink)
@@ -0,0 +1 @@
+../../fs/thrash/msgr-failures/

qa/suites/multimds/thrash/overrides/% (new empty file)

qa/suites/multimds/thrash/overrides/fuse-default-perm-no.yaml (new symlink)
@@ -0,0 +1 @@
+../../../../cephfs/overrides/fuse/default-perm/no.yaml

qa/suites/multimds/thrash/overrides/thrash (new symlink)
@@ -0,0 +1 @@
+../../../fs/thrash/overrides/

qa/suites/multimds/thrash/overrides/thrash_debug.yaml (new file)
@@ -0,0 +1,7 @@
+overrides:
+  ceph:
+    conf:
+      mds:
+        debug ms: 10
+      client:
+        debug ms: 10

qa/suites/multimds/thrash/tasks/cfuse_workunit_suites_fsstress.yaml (new symlink)
@@ -0,0 +1 @@
+../../../fs/thrash/tasks/cfuse_workunit_suites_fsstress.yaml

qa/suites/multimds/thrash/tasks/cfuse_workunit_suites_pjd.yaml (new symlink)
@@ -0,0 +1 @@
+../../../fs/thrash/tasks/cfuse_workunit_suites_pjd.yaml

qa/suites/multimds/thrash/tasks/cfuse_workunit_trivial_sync.yaml (new symlink)
@@ -0,0 +1 @@
+../../../fs/thrash/tasks/cfuse_workunit_trivial_sync.yaml

@@ -337,39 +337,15 @@ def cephfs_setup(ctx, config):
     if mdss.remotes:
         log.info('Setting up CephFS filesystem...')
 
-        Filesystem(ctx, create='cephfs') # TODO: make Filesystem cluster-aware
+        fs = Filesystem(ctx, create='cephfs')
 
         is_active_mds = lambda role: 'mds.' in role and not role.endswith('-s') and '-s-' not in role
         all_roles = [item for remote_roles in mdss.remotes.values() for item in remote_roles]
         num_active = len([r for r in all_roles if is_active_mds(r)])
-        mon_remote.run(
-            args=[
-                'sudo',
-                'adjust-ulimits',
-                'ceph-coverage',
-                coverage_dir,
-                'ceph', 'mds', 'set', 'allow_multimds', 'true',
-                '--yes-i-really-mean-it'],
-            check_status=False, # probably old version, upgrade test
-        )
-        mon_remote.run(args=[
-            'sudo',
-            'adjust-ulimits',
-            'ceph-coverage',
-            coverage_dir,
-            'ceph',
-            '--cluster', cluster_name,
-            'mds', 'set_max_mds', str(num_active)])
-        mon_remote.run(
-            args=[
-                'sudo',
-                'adjust-ulimits',
-                'ceph-coverage',
-                coverage_dir,
-                'ceph', 'mds', 'set', 'allow_dirfrags', 'true',
-                '--yes-i-really-mean-it'],
-            check_status=False, # probably old version, upgrade test
-        )
+
+        fs.set_allow_multimds(True)
+        fs.set_max_mds(num_active)
+        fs.set_allow_dirfrags(True)
 
     yield

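The is_active_mds filter above encodes the suites' role-naming convention: a trailing "-s" (as in mds.d-s from 3-mds-2-standby.yaml) marks a standby daemon, so only plain mds roles count toward max_mds. A minimal sketch of the counting logic, using hypothetical role names:

    # Illustration only: how cephfs_setup derives num_active from role names.
    is_active_mds = lambda role: 'mds.' in role and not role.endswith('-s') and '-s-' not in role

    roles = ['mon.a', 'mds.a', 'mds.b', 'mds.d-s', 'osd.0']
    num_active = len([r for r in roles if is_active_mds(r)])
    assert num_active == 2  # mds.a and mds.b; mds.d-s is a standby
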
@@ -37,7 +37,7 @@ class FSStatus(object):
     """
     def __init__(self, mon_manager):
         self.mon = mon_manager
-        self.map = json.loads(self.mon.raw_cluster_cmd("fs", "dump", "--format=json-pretty"))
+        self.map = json.loads(self.mon.raw_cluster_cmd("fs", "dump", "--format=json"))
 
     def __str__(self):
         return json.dumps(self.map, indent = 2, sort_keys = True)

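For orientation, a hedged sketch of how an FSStatus snapshot is consumed elsewhere in this diff (accessor names taken from the thrasher code below; fs stands in for a Filesystem instance):

    status = fs.status()                                    # FSStatus wrapping one 'fs dump'
    max_mds = status.get_fsmap(fs.id)['mdsmap']['max_mds']  # look up one filesystem by FSCID
    ranks = list(status.get_ranks(fs.id))                   # per-rank MDS info dicts
    info = status.get_mds('a')                              # one MDS by name, falsy if absent
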
@@ -416,6 +416,12 @@ class Filesystem(MDSCluster):
     def set_max_mds(self, max_mds):
         self.mon_manager.raw_cluster_cmd("fs", "set", self.name, "max_mds", "%d" % max_mds)
 
+    def set_allow_dirfrags(self, yes):
+        self.mon_manager.raw_cluster_cmd("fs", "set", self.name, "allow_dirfrags", str(yes).lower(), '--yes-i-really-mean-it')
+
+    def set_allow_multimds(self, yes):
+        self.mon_manager.raw_cluster_cmd("fs", "set", self.name, "allow_multimds", str(yes).lower(), '--yes-i-really-mean-it')
+
     def get_pgs_per_fs_pool(self):
         """
         Calculate how many PGs to use when creating a pool, in order to avoid raising any

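These helpers replace the hand-rolled monitor invocations deleted throughout the rest of this diff; each wraps a single 'ceph fs set' command (CLI equivalents shown as comments, derived from the code above):

    fs.set_allow_multimds(True)  # ceph fs set <name> allow_multimds true --yes-i-really-mean-it
    fs.set_max_mds(2)            # ceph fs set <name> max_mds 2
    fs.set_allow_dirfrags(True)  # ceph fs set <name> allow_dirfrags true --yes-i-really-mean-it
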
@@ -426,8 +426,7 @@ class TestDataScan(CephFSTestCase):
         That when injecting a dentry into a fragmented directory, we put it in the right fragment.
         """
 
-        self.fs.mon_manager.raw_cluster_cmd("mds", "set", "allow_dirfrags", "true",
-                                            "--yes-i-really-mean-it")
+        self.fs.set_allow_dirfrags(True)
 
         file_count = 100
         file_names = ["%s" % n for n in range(0, file_count)]

@@ -217,10 +217,8 @@ class TestStandbyReplay(CephFSTestCase):
 
         # Create FS alpha and get mds_a to come up as active
         fs_a = self.mds_cluster.newfs("alpha")
-        fs_a.mon_manager.raw_cluster_cmd('fs', 'set', fs_a.name,
-                                         'allow_multimds', "true",
-                                         "--yes-i-really-mean-it")
-        fs_a.mon_manager.raw_cluster_cmd('fs', 'set', fs_a.name, 'max_mds', "2")
+        fs_a.set_allow_multimds(True)
+        fs_a.set_max_mds(2)
 
         self.mds_cluster.mds_restart(mds_a)
         self.wait_until_equal(lambda: fs_a.get_active_names(), [mds_a], 30)
@@ -239,7 +237,7 @@
             self.assertEqual(info_a_s['state'], "up:standby-replay")
 
         # Shrink the cluster
-        fs_a.mon_manager.raw_cluster_cmd('fs', 'set', fs_a.name, 'max_mds', "1")
+        fs_a.set_max_mds(1)
         fs_a.mon_manager.raw_cluster_cmd("mds", "stop", "{0}:1".format(fs_a.name))
         self.wait_until_equal(
             lambda: fs_a.get_active_names(), [mds_a],

@@ -374,32 +372,27 @@ class TestMultiFilesystems(CephFSTestCase):
     def test_grow_shrink(self):
         # Usual setup...
         fs_a, fs_b = self._setup_two()
-        fs_a.mon_manager.raw_cluster_cmd("fs", "set", fs_a.name,
-                                         "allow_multimds", "true",
-                                         "--yes-i-really-mean-it")
-
-        fs_b.mon_manager.raw_cluster_cmd("fs", "set", fs_b.name,
-                                         "allow_multimds", "true",
-                                         "--yes-i-really-mean-it")
+        fs_a.set_allow_multimds(True)
+        fs_b.set_allow_multimds(True)
 
         # Increase max_mds on fs_b, see a standby take up the role
-        fs_b.mon_manager.raw_cluster_cmd('fs', 'set', fs_b.name, 'max_mds', "2")
+        fs_b.set_max_mds(2)
         self.wait_until_equal(lambda: len(fs_b.get_active_names()), 2, 30,
                               reject_fn=lambda v: v > 2 or v < 1)
 
         # Increase max_mds on fs_a, see a standby take up the role
-        fs_a.mon_manager.raw_cluster_cmd('fs', 'set', fs_a.name, 'max_mds', "2")
+        fs_a.set_max_mds(2)
         self.wait_until_equal(lambda: len(fs_a.get_active_names()), 2, 30,
                               reject_fn=lambda v: v > 2 or v < 1)
 
         # Shrink fs_b back to 1, see a daemon go back to standby
-        fs_b.mon_manager.raw_cluster_cmd('fs', 'set', fs_b.name, 'max_mds', "1")
-        fs_b.mon_manager.raw_cluster_cmd('mds', 'deactivate', "{0}:1".format(fs_b.name))
+        fs_b.set_max_mds(1)
+        fs_b.deactivate(1)
         self.wait_until_equal(lambda: len(fs_b.get_active_names()), 1, 30,
                               reject_fn=lambda v: v > 2 or v < 1)
 
         # Grow fs_a up to 3, see the former fs_b daemon join it.
-        fs_a.mon_manager.raw_cluster_cmd('fs', 'set', fs_a.name, 'max_mds', "3")
+        fs_a.set_max_mds(3)
         self.wait_until_equal(lambda: len(fs_a.get_active_names()), 3, 60,
                               reject_fn=lambda v: v > 3 or v < 2)

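Note the asymmetry the test encodes: growing the cluster only requires raising max_mds, while shrinking it (in this era of CephFS) also requires explicitly deactivating the surplus rank. A hedged sketch:

    fs_b.set_max_mds(2)  # grow: a standby is promoted into rank 1
    fs_b.set_max_mds(1)  # shrink: lowers the target...
    fs_b.deactivate(1)   # ...but rank 1 must still be deactivated explicitly
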
@@ -537,19 +530,13 @@
 
         # Create two filesystems which should have two ranks each
         fs_a = self.mds_cluster.newfs("alpha")
-        fs_a.mon_manager.raw_cluster_cmd("fs", "set", fs_a.name,
-                                         "allow_multimds", "true",
-                                         "--yes-i-really-mean-it")
+        fs_a.set_allow_multimds(True)
 
         fs_b = self.mds_cluster.newfs("bravo")
-        fs_b.mon_manager.raw_cluster_cmd("fs", "set", fs_b.name,
-                                         "allow_multimds", "true",
-                                         "--yes-i-really-mean-it")
+        fs_b.set_allow_multimds(True)
 
-        fs_a.mon_manager.raw_cluster_cmd('fs', 'set', fs_a.name,
-                                         'max_mds', "2")
-        fs_b.mon_manager.raw_cluster_cmd('fs', 'set', fs_b.name,
-                                         'max_mds', "2")
+        fs_a.set_max_mds(2)
+        fs_b.set_max_mds(2)
 
         # Set all the daemons to have a FSCID assignment but no other
         # standby preferences.

@@ -38,9 +38,7 @@ class TestFragmentation(CephFSTestCase):
             for k, v in kwargs.items():
                 self.ceph_cluster.set_ceph_conf("mds", k, v.__str__())
 
-            self.fs.mon_manager.raw_cluster_cmd("fs", "set", self.fs.name,
-                                                "allow_dirfrags", "true",
-                                                "--yes-i-really-mean-it")
+            self.fs.set_allow_dirfrags(True)
 
             self.mds_cluster.mds_fail_restart()
             self.fs.wait_for_daemons()

@@ -160,9 +160,8 @@ class TestJournalRepair(CephFSTestCase):
         """
 
         # Set max_mds to 2
-        self.fs.mon_manager.raw_cluster_cmd_result('mds', 'set', "allow_multimds",
-                                                   "true", "--yes-i-really-mean-it")
-        self.fs.mon_manager.raw_cluster_cmd_result('mds', 'set', "max_mds", "2")
+        self.fs.set_allow_multimds(True)
+        self.fs.set_max_mds(2)
 
         # See that we have two active MDSs
         self.wait_until_equal(lambda: len(self.fs.get_active_names()), 2, 30,

@@ -9,9 +9,8 @@ success = "mantle balancer version changed: "
 class TestMantle(CephFSTestCase):
     def start_mantle(self):
         self.wait_for_health_clear(timeout=30)
-        self.fs.mon_manager.raw_cluster_cmd_result('mds', 'set', "allow_multimds",
-                                                   "true", "--yes-i-really-mean-it")
-        self.fs.mon_manager.raw_cluster_cmd_result('mds', 'set', "max_mds", "2")
+        self.fs.set_allow_multimds(True)
+        self.fs.set_max_mds(2)
         self.wait_until_equal(lambda: len(self.fs.get_active_names()), 2, 30,
                               reject_fn=lambda v: v > 2 or v < 1)
 

@@ -99,9 +99,8 @@ class TestSessionMap(CephFSTestCase):
         self.fs.wait_for_daemons()
 
         # I would like two MDSs, so that I can do an export dir later
-        self.fs.mon_manager.raw_cluster_cmd_result('mds', 'set', "allow_multimds",
-                                                   "true", "--yes-i-really-mean-it")
-        self.fs.mon_manager.raw_cluster_cmd_result('mds', 'set', "max_mds", "2")
+        self.fs.set_allow_multimds(True)
+        self.fs.set_max_mds(2)
         self.fs.wait_for_daemons()
 
         active_mds_names = self.fs.get_active_names()

@@ -415,9 +415,8 @@ class TestStrays(CephFSTestCase):
         """
 
         # Set up two MDSs
-        self.fs.mon_manager.raw_cluster_cmd_result('mds', 'set', "allow_multimds",
-                                                   "true", "--yes-i-really-mean-it")
-        self.fs.mon_manager.raw_cluster_cmd_result('mds', 'set', "max_mds", "2")
+        self.fs.set_allow_multimds(True)
+        self.fs.set_max_mds(2)
 
         # See that we have two active MDSs
         self.wait_until_equal(lambda: len(self.fs.get_active_names()), 2, 30,
@@ -486,8 +485,8 @@
         self.assertTrue(self.fs.data_objects_present(ino, size_mb * 1024 * 1024))
 
         # Shut down rank 1
-        self.fs.mon_manager.raw_cluster_cmd_result('mds', 'set', "max_mds", "1")
-        self.fs.mon_manager.raw_cluster_cmd_result('mds', 'deactivate', "1")
+        self.fs.set_max_mds(1)
+        self.fs.deactivate(1)
 
         # Wait til we get to a single active MDS mdsmap state
         def is_stopped():
@@ -693,7 +692,7 @@
         That unlinking fails when the stray directory fragment becomes too large and that unlinking may continue once those strays are purged.
         """
 
-        self.fs.mon_manager.raw_cluster_cmd("mds", "set", "allow_dirfrags", "true", "--yes-i-really-mean-it")
+        self.fs.set_allow_dirfrags(True)
 
         LOW_LIMIT = 50
         for mds in self.fs.get_daemon_names():

@@ -4,9 +4,12 @@ Thrash mds by simulating failures
 import logging
 import contextlib
 import ceph_manager
+import itertools
 import random
+import signal
 import time
 
+from gevent import sleep
 from gevent.greenlet import Greenlet
 from gevent.event import Event
 from teuthology import misc as teuthology

@@ -15,6 +18,109 @@ from tasks.cephfs.filesystem import MDSCluster, Filesystem
 
 log = logging.getLogger(__name__)
 
+class DaemonWatchdog(Greenlet):
+    """
+    DaemonWatchdog::
+
+    Watch Ceph daemons for failures. If an extended failure is detected (i.e.
+    not intentional), then the watchdog will unmount file systems and send
+    SIGTERM to all daemons. The duration of an extended failure is configurable
+    with watchdog_daemon_timeout.
+
+    watchdog_daemon_timeout [default: 300]: number of seconds a daemon
+        is allowed to be failed before the watchdog will bark.
+    """
+
+    def __init__(self, ctx, manager, config, thrashers):
+        Greenlet.__init__(self)
+        self.ctx = ctx
+        self.config = config
+        self.e = None
+        self.logger = log.getChild('daemon_watchdog')
+        self.manager = manager
+        self.name = 'watchdog'
+        self.stopping = Event()
+        self.thrashers = thrashers
+
+    def _run(self):
+        try:
+            self.watch()
+        except Exception as e:
+            # See _run exception comment for MDSThrasher
+            self.e = e
+            self.logger.exception("exception:")
+            # allow successful completion so gevent doesn't see an exception...
+
+    def log(self, x):
+        """Write data to logger"""
+        self.logger.info(x)
+
+    def stop(self):
+        self.stopping.set()
+
+    def bark(self):
+        self.log("BARK! unmounting mounts and killing all daemons")
+        for mount in self.ctx.mounts.values():
+            try:
+                mount.umount_wait(force=True)
+            except:
+                self.logger.exception("ignoring exception:")
+        daemons = []
+        daemons.extend(filter(lambda daemon: daemon.running() and not daemon.proc.finished, self.ctx.daemons.iter_daemons_of_role('mds', cluster=self.manager.cluster)))
+        daemons.extend(filter(lambda daemon: daemon.running() and not daemon.proc.finished, self.ctx.daemons.iter_daemons_of_role('mon', cluster=self.manager.cluster)))
+        for daemon in daemons:
+            try:
+                daemon.signal(signal.SIGTERM)
+            except:
+                self.logger.exception("ignoring exception:")
+
+    def watch(self):
+        self.log("watchdog starting")
+        daemon_timeout = int(self.config.get('watchdog_daemon_timeout', 300))
+        daemon_failure_time = {}
+        while not self.stopping.is_set():
+            bark = False
+            now = time.time()
+
+            mons = self.ctx.daemons.iter_daemons_of_role('mon', cluster=self.manager.cluster)
+            mdss = self.ctx.daemons.iter_daemons_of_role('mds', cluster=self.manager.cluster)
+            clients = self.ctx.daemons.iter_daemons_of_role('client', cluster=self.manager.cluster)
+
+            #for daemon in mons:
+            #    self.log("mon daemon {role}.{id}: running={r}".format(role=daemon.role, id=daemon.id_, r=daemon.running() and not daemon.proc.finished))
+            #for daemon in mdss:
+            #    self.log("mds daemon {role}.{id}: running={r}".format(role=daemon.role, id=daemon.id_, r=daemon.running() and not daemon.proc.finished))
+
+            daemon_failures = []
+            daemon_failures.extend(filter(lambda daemon: daemon.running() and daemon.proc.finished, mons))
+            daemon_failures.extend(filter(lambda daemon: daemon.running() and daemon.proc.finished, mdss))
+            for daemon in daemon_failures:
+                name = daemon.role + '.' + daemon.id_
+                dt = daemon_failure_time.setdefault(name, (daemon, now))
+                assert dt[0] is daemon
+                delta = now-dt[1]
+                self.log("daemon {name} is failed for ~{t:.0f}s".format(name=name, t=delta))
+                if delta > daemon_timeout:
+                    bark = True
+
+            # If a daemon is no longer failed, remove it from tracking:
+            for name in daemon_failure_time.keys():
+                if name not in [d.role + '.' + d.id_ for d in daemon_failures]:
+                    self.log("daemon {name} has been restored".format(name=name))
+                    del daemon_failure_time[name]
+
+            for thrasher in self.thrashers:
+                if thrasher.e is not None:
+                    self.log("thrasher on fs.{name} failed".format(name=thrasher.fs.name))
+                    bark = True
+
+            if bark:
+                self.bark()
+                return
+
+            sleep(5)
+
+        self.log("watchdog finished")
+
 class MDSThrasher(Greenlet):
     """

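The watchdog's only tunable is its bark threshold. A hedged example of supplying it, expressed as the Python config dict the constructor receives (key and default taken from watch() above; ctx, manager, and thrashers as wired up in task() at the end of this diff):

    config = {'watchdog_daemon_timeout': 120}  # bark after ~120s instead of the 300s default
    watchdog = DaemonWatchdog(ctx, manager, config, thrashers)
    watchdog.start()
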
@@ -45,11 +151,15 @@ class MDSThrasher(Greenlet):
     thrash_in_replay: [default: 0.0] likelihood that the MDS will be thrashed
       during replay. Value should be between 0.0 and 1.0.
 
-    thrash_max_mds: [default: 0.25] likelihood that the max_mds of the mds
+    thrash_max_mds: [default: 0.0] likelihood that the max_mds of the mds
       cluster will be modified to a value [1, current) or (current, starting
       max_mds]. When reduced, randomly selected MDSs other than rank 0 will be
       deactivated to reach the new max_mds. Value should be between 0.0 and 1.0.
 
+    thrash_while_stopping: [default: false] thrash an MDS while there
+      are MDS in up:stopping (because max_mds was changed and some
+      MDS were deactivated).
+
     thrash_weights: allows specific MDSs to be thrashed more/less frequently.
       This option overrides anything specified by max_thrash. This option is a
       dict containing mds.x: weight pairs. For example, [mds.a: 0.7, mds.b:

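Taken together, a hedged example of a thrasher configuration, expressed as the Python dict MDSThrasher receives (keys from the docstring and __init__ below; the values are purely illustrative):

    config = {
        'thrash_max_mds': 0.1,           # opt back in; this now defaults to 0.0
        'thrash_while_stopping': False,  # wait out up:stopping ranks before thrashing
        'thrash_in_replay': 0.0,
    }
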
@@ -92,21 +202,21 @@ class MDSThrasher(Greenlet):
 
     """
 
-    def __init__(self, ctx, manager, config, logger, fs, max_mds):
-        super(MDSThrasher, self).__init__()
+    def __init__(self, ctx, manager, config, fs, max_mds):
+        Greenlet.__init__(self)
 
-        self.ctx = ctx
-        self.manager = manager
-        assert self.manager.is_clean()
         self.config = config
-        self.logger = logger
+        self.ctx = ctx
+        self.e = None
+        self.logger = log.getChild('fs.[{f}]'.format(f = fs.name))
         self.fs = fs
+        self.manager = manager
         self.max_mds = max_mds
 
+        self.name = 'thrasher.fs.[{f}]'.format(f = fs.name)
         self.stopping = Event()
 
         self.randomize = bool(self.config.get('randomize', True))
-        self.thrash_max_mds = float(self.config.get('thrash_max_mds', 0.25))
+        self.thrash_max_mds = float(self.config.get('thrash_max_mds', 0.0))
         self.max_thrash = int(self.config.get('max_thrash', 1))
         self.max_thrash_delay = float(self.config.get('thrash_delay', 120.0))
         self.thrash_in_replay = float(self.config.get('thrash_in_replay', False))

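Callers change accordingly: the logger argument is gone (the thrasher now derives a child logger from the filesystem name), and the clean-cluster assertion moves out to task(). A sketch of the new call, mirroring the task() hunk at the end of this diff:

    thrasher = MDSThrasher(ctx, manager, config, Filesystem(ctx, fs['id']), fs['mdsmap']['max_mds'])
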
@@ -118,11 +228,23 @@ class MDSThrasher(Greenlet):
     def _run(self):
         try:
             self.do_thrash()
-        except:
-            # Log exceptions here so we get the full backtrace (it's lost
-            # by the time someone does a .get() on this greenlet)
-            self.logger.exception("Exception in do_thrash:")
-            raise
+        except Exception as e:
+            # Log exceptions here so we get the full backtrace (gevent loses them).
+            # Also allow successful completion as gevent exception handling is a broken mess:
+            #
+            # 2017-02-03T14:34:01.259 CRITICAL:root:  File "gevent.libev.corecext.pyx", line 367, in gevent.libev.corecext.loop.handle_error (src/gevent/libev/gevent.corecext.c:5051)
+            #   File "/home/teuthworker/src/git.ceph.com_git_teuthology_master/virtualenv/local/lib/python2.7/site-packages/gevent/hub.py", line 558, in handle_error
+            #     self.print_exception(context, type, value, tb)
+            #   File "/home/teuthworker/src/git.ceph.com_git_teuthology_master/virtualenv/local/lib/python2.7/site-packages/gevent/hub.py", line 605, in print_exception
+            #     traceback.print_exception(type, value, tb, file=errstream)
+            #   File "/usr/lib/python2.7/traceback.py", line 124, in print_exception
+            #     _print(file, 'Traceback (most recent call last):')
+            #   File "/usr/lib/python2.7/traceback.py", line 13, in _print
+            #     file.write(str+terminator)
+            # 2017-02-03T14:34:01.261 CRITICAL:root:IOError
+            self.e = e
+            self.logger.exception("exception:")
+            # allow successful completion so gevent doesn't see an exception...
 
     def log(self, x):
         """Write data to logger assigned to this MDSThrasher"""

@@ -168,29 +290,40 @@ class MDSThrasher(Greenlet):
 
     def wait_for_stable(self, rank = None, gid = None):
         self.log('waiting for mds cluster to stabilize...')
-        status = self.fs.status()
-        itercount = 0
-        while True:
-            max_mds = status.get_fsmap(self.fs.id)['mdsmap']['max_mds']
-            if rank is not None:
-                try:
-                    info = status.get_rank(self.fs.id, rank)
-                    if info['gid'] != gid:
-                        self.log('mds.{name} has gained rank={rank}, replacing gid={gid}'.format(name = info['name'], rank = rank, gid = gid))
-                        return status, info['name']
-                except:
-                    pass # no rank present
-            else:
-                ranks = filter(lambda info: "up:active" == info['state'] and "laggy_since" not in info, list(status.get_ranks(self.fs.id)))
-                count = len(ranks)
-                if count >= max_mds:
-                    self.log('mds cluster has {count} alive and active, now stable!'.format(count = count))
-                    return status, None
-            itercount = itercount + 1
-            if itercount > 10:
-                self.log('mds map: {status}'.format(status=self.fs.status()))
-            time.sleep(2)
+        for itercount in itertools.count():
+            status = self.fs.status()
+            max_mds = status.get_fsmap(self.fs.id)['mdsmap']['max_mds']
+            ranks = list(status.get_ranks(self.fs.id))
+            stopping = filter(lambda info: "up:stopping" == info['state'], ranks)
+            actives = filter(lambda info: "up:active" == info['state'] and "laggy_since" not in info, ranks)
+
+            if not bool(self.config.get('thrash_while_stopping', False)) and len(stopping) > 0:
+                if itercount % 5 == 0:
+                    self.log('cluster is considered unstable while MDS are in up:stopping (!thrash_while_stopping)')
+            else:
+                if rank is not None:
+                    try:
+                        info = status.get_rank(self.fs.id, rank)
+                        if info['gid'] != gid and "up:active" == info['state']:
+                            self.log('mds.{name} has gained rank={rank}, replacing gid={gid}'.format(name = info['name'], rank = rank, gid = gid))
+                            return status
+                    except:
+                        pass # no rank present
+                    if len(actives) >= max_mds:
+                        # no replacement can occur!
+                        self.log("cluster has {c} actives (max_mds is {m}), no MDS can replace rank {r}".format(c = len(actives), m = max_mds, r = rank))
+                        return status
+                else:
+                    if len(actives) >= max_mds:
+                        self.log('mds cluster has {count} alive and active, now stable!'.format(count = len(actives)))
+                        return status, None
+            if itercount > 300/2: # 5 minutes
+                raise RuntimeError('timeout waiting for cluster to stabilize')
+            elif itercount % 5 == 0:
+                self.log('mds map: {status}'.format(status=status))
+            else:
+                self.log('no change')
+            sleep(2)
 
     def do_thrash(self):
         """

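Two mechanical changes recur through this function and the rest of the file: hand-rolled itercount loops become itertools.count() with an explicit timeout, and time.sleep becomes gevent's sleep, which yields to other greenlets rather than blocking the event loop. A generic, hedged sketch of the resulting polling pattern (not code from the patch):

    import itertools
    from gevent import sleep

    def poll(check, timeout_iters=150, delay=2):
        # ~5 minutes at 2s per iteration, mirroring the 300/2 bound used above.
        for itercount in itertools.count():
            if check():
                return
            if itercount > timeout_iters:
                raise RuntimeError('timeout waiting for condition')
            sleep(delay)
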
@@ -217,7 +350,7 @@
 
             status = self.fs.status()
 
-            if random.randrange(0.0, 1.0) <= self.thrash_max_mds:
+            if random.random() <= self.thrash_max_mds:
                 max_mds = status.get_fsmap(self.fs.id)['mdsmap']['max_mds']
                 options = range(1, max_mds)+range(max_mds+1, self.max_mds+1)
                 if len(options) > 0:

@@ -228,8 +361,11 @@
                     stats['max_mds'] += 1
 
                     # Now randomly deactivate mds if we shrank
-                    for rank in random.sample(range(1, max_mds), max(0, max_mds-new_max_mds)):
-                        self.fs.deactivate(rank)
+                    # TODO: it's desirable to deactivate in order. Make config to do random.
+                    targets = filter(lambda r: r['rank'] > 0, status.get_ranks(self.fs.id)) # can't deactivate 0
+                    for target in random.sample(targets, max(0, max_mds-new_max_mds)):
+                        self.log("deactivating rank %d" % target['rank'])
+                        self.fs.deactivate(target['rank'])
                     stats['deactivate'] += 1
 
                     status = self.wait_for_stable()[0]

@@ -278,7 +414,7 @@
             itercount = itercount + 1
             if itercount > 10:
                 self.log('mds map: {status}'.format(status=status))
-            time.sleep(2)
+            sleep(2)
 
         if last_laggy_since:
             self.log(

@@ -287,8 +423,7 @@
             self.log('{label} down, removed from mdsmap'.format(label=label, since=last_laggy_since))
 
         # wait for a standby mds to takeover and become active
-        status, takeover_mds = self.wait_for_stable(rank, gid)
-        self.log('New active mds is mds.{_id}'.format(_id=takeover_mds))
+        status = self.wait_for_stable(rank, gid)
 
         # wait for a while before restarting old active to become new
         # standby

@@ -298,20 +433,22 @@
 
             self.log('waiting for {delay} secs before reviving {label}'.format(
                 delay=delay, label=label))
-            time.sleep(delay)
+            sleep(delay)
 
             self.log('reviving {label}'.format(label=label))
             self.revive_mds(name)
 
-            while True:
+            for itercount in itertools.count():
+                if itercount > 300/2: # 5 minutes
+                    raise RuntimeError('timeout waiting for MDS to revive')
                 status = self.fs.status()
                 info = status.get_mds(name)
-                if info and info['state'] in ('up:standby', 'up:standby-replay'):
+                if info and info['state'] in ('up:standby', 'up:standby-replay', 'up:active'):
                     self.log('{label} reported in {state} state'.format(label=label, state=info['state']))
                     break
                 self.log(
-                    'waiting till mds map indicates {label} is in standby or standby-replay'.format(label=label))
-                time.sleep(2)
+                    'waiting till mds map indicates {label} is in active, standby or standby-replay'.format(label=label))
+                sleep(2)
 
         for stat in stats:
             self.log("stat['{key}'] = {value}".format(key = stat, value = stats[stat]))

@@ -323,7 +460,7 @@
         # delay = self.max_replay_thrash_delay
         # if self.randomize:
         #     delay = random.randrange(0.0, self.max_replay_thrash_delay)
-        # time.sleep(delay)
+        # sleep(delay)
         # self.log('kill replaying mds.{id}'.format(id=self.to_kill))
         # self.kill_mds(self.to_kill)
         #
@@ -333,7 +470,7 @@
         #
         # self.log('waiting for {delay} secs before reviving mds.{id}'.format(
         #     delay=delay, id=self.to_kill))
-        # time.sleep(delay)
+        # sleep(delay)
         #
         # self.log('revive mds.{id}'.format(id=self.to_kill))
         # self.revive_mds(self.to_kill)

@@ -384,31 +521,33 @@ def task(ctx, config):
             break
         if steady:
             break
-        time.sleep(2)
+        sleep(2)
     status = mds_cluster.status()
     log.info('Ready to start thrashing')
 
+    thrashers = []
+
+    watchdog = DaemonWatchdog(ctx, manager, config, thrashers)
+    watchdog.start()
+
     manager.wait_for_clean()
-    thrashers = {}
+    assert manager.is_clean()
     for fs in status.get_filesystems():
         name = fs['mdsmap']['fs_name']
         log.info('Running thrasher against FS {f}'.format(f = name))
-        thrasher = MDSThrasher(
-            ctx, manager, config,
-            log.getChild('fs.[{f}]'.format(f = name)),
-            Filesystem(ctx, fs['id']), fs['mdsmap']['max_mds']
-            )
+        thrasher = MDSThrasher(ctx, manager, config, Filesystem(ctx, fs['id']), fs['mdsmap']['max_mds'])
         thrasher.start()
-        thrashers[name] = thrasher
+        thrashers.append(thrasher)
 
     try:
         log.debug('Yielding')
        yield
     finally:
         log.info('joining mds_thrashers')
-        for name in thrashers:
-            log.info('join thrasher mds_thrasher.fs.[{f}]'.format(f=name))
-            thrashers[name].stop()
-            thrashers[name].get() # Raise any exception from _run()
-            thrashers[name].join()
+        for thrasher in thrashers:
+            thrasher.stop()
+            if thrasher.e:
+                raise RuntimeError('error during thrashing')
+            thrasher.join()
         log.info('done joining')
+
+        watchdog.stop()
+        watchdog.join()