Mirror of https://github.com/ceph/ceph, synced 2024-12-26 05:25:09 +00:00
84cd4ed6c3
We need to wait for peering to either complete, or block because it is waiting for another PG. _Then_ look at all the PG states and compare the mon values with what we get from querying the OSDs directly.
410 lines
15 KiB
Python
from cStringIO import StringIO

import random
import time
import re
import gevent
import json


class Thrasher(gevent.Greenlet):
    """Randomly kill, revive, and mark out/in the osds of a running cluster."""

    def __init__(self, manager, config, logger=None):
        self.ceph_manager = manager
        self.ceph_manager.wait_for_clean()
        osd_status = self.ceph_manager.get_osd_status()
        self.in_osds = osd_status['in']
        self.live_osds = osd_status['live']
        self.out_osds = osd_status['out']
        self.dead_osds = osd_status['dead']
        self.stopping = False
        self.logger = logger
        self.config = config
        if self.logger is not None:
            self.log = lambda x: self.logger.info(x)
        else:
            def tmp(x):
                print x
            self.log = tmp
        if self.config is None:
            self.config = dict()
        # prevent monitor from auto-marking things out while thrasher runs
        manager.raw_cluster_cmd('mon', 'tell', '*', 'injectargs',
                                '--mon-osd-down-out-interval', '0')
        gevent.Greenlet.__init__(self, self.do_thrash)
        self.start()

    def kill_osd(self, osd=None):
        if osd is None:
            osd = random.choice(self.live_osds)
        self.log("Killing osd %s, live_osds are %s" %
                 (str(osd), str(self.live_osds)))
        self.live_osds.remove(osd)
        self.dead_osds.append(osd)
        self.ceph_manager.kill_osd(osd)

    def blackhole_kill_osd(self, osd=None):
        if osd is None:
            osd = random.choice(self.live_osds)
        self.log("Blackholing and then killing osd %s, live_osds are %s" %
                 (str(osd), str(self.live_osds)))
        self.live_osds.remove(osd)
        self.dead_osds.append(osd)
        self.ceph_manager.blackhole_kill_osd(osd)

    def revive_osd(self, osd=None):
        if osd is None:
            osd = random.choice(self.dead_osds)
        self.log("Reviving osd %s" % (str(osd),))
        self.live_osds.append(osd)
        self.dead_osds.remove(osd)
        self.ceph_manager.revive_osd(osd)

    def out_osd(self, osd=None):
        if osd is None:
            osd = random.choice(self.in_osds)
        self.log("Removing osd %s, in_osds are: %s" %
                 (str(osd), str(self.in_osds)))
        self.ceph_manager.mark_out_osd(osd)
        self.in_osds.remove(osd)
        self.out_osds.append(osd)

    def in_osd(self, osd=None):
        if osd is None:
            osd = random.choice(self.out_osds)
        if osd in self.dead_osds:
            # revive a dead osd instead of marking it in; it stays in
            # out_osds and can be marked in on a later pass
            return self.revive_osd(osd)
        self.log("Adding osd %s" % (str(osd),))
        self.out_osds.remove(osd)
        self.in_osds.append(osd)
        self.ceph_manager.mark_in_osd(osd)

    def all_up(self):
        while len(self.dead_osds) > 0:
            self.revive_osd()
        while len(self.out_osds) > 0:
            self.in_osd()

    def do_join(self):
        self.stopping = True
        # Greenlet.get() blocks until do_thrash finishes and re-raises any
        # exception it raised
        self.get()

    def choose_action(self):
        chance_down = self.config.get("chance_down", 0)
        if isinstance(chance_down, int):
            chance_down = float(chance_down) / 100
        minin = self.config.get("min_in", 2)
        minout = self.config.get("min_out", 0)
        minlive = self.config.get("min_live", 2)
        mindead = self.config.get("min_dead", 0)

        self.log('choose_action: min_in %d min_out %d min_live %d min_dead %d' %
                 (minin, minout, minlive, mindead))
        actions = []
        if len(self.in_osds) > minin:
            actions.append((self.out_osd, 1.0,))
        if len(self.live_osds) > minlive and chance_down > 0:
            actions.append((self.kill_osd, chance_down,))
        if len(self.out_osds) > minout:
            actions.append((self.in_osd, 1.7,))
        if len(self.dead_osds) > mindead:
            actions.append((self.revive_osd, 1.0,))

        total = sum([y for (x, y) in actions])
        val = random.uniform(0, total)
        for (action, prob) in actions:
            if val < prob:
                return action
            val -= prob
        return None

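    # Worked example for choose_action above: with actions
    # [(out_osd, 1.0), (in_osd, 1.7)] the weights sum to 2.7; a draw of
    # val = 1.5 fails the first test (1.5 >= 1.0), is reduced to 0.5, and
    # selects in_osd (0.5 < 1.7). None is returned only when no action is
    # eligible (or val lands exactly on the total).
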
    def do_thrash(self):
        cleanint = self.config.get("clean_interval", 60)
        maxdead = self.config.get("max_dead", 0)
        delay = self.config.get("op_delay", 5)
        self.log("starting do_thrash")
        while not self.stopping:
            self.log(" ".join([str(x) for x in
                               ["in_osds: ", self.in_osds,
                                " out_osds: ", self.out_osds,
                                "dead_osds: ", self.dead_osds,
                                "live_osds: ", self.live_osds]]))
            # with ~delay seconds per iteration, this pauses for a full
            # recovery roughly once per clean_interval seconds
            if random.uniform(0, 1) < (float(delay) / cleanint):
                while len(self.dead_osds) > maxdead:
                    self.revive_osd()
                self.ceph_manager.wait_for_recovery(
                    timeout=self.config.get('timeout')
                    )
            action = self.choose_action()
            if action is not None:
                action()
            time.sleep(delay)
        self.all_up()

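# A minimal usage sketch for Thrasher (hypothetical values; `manager` is
# assumed to be a CephManager wired to a running cluster):
#
#   thrasher = Thrasher(manager, {'chance_down': 30, 'op_delay': 5})
#   ... run the workload under test ...
#   thrasher.do_join()   # stop thrashing; do_thrash ends by calling all_up()
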
class CephManager:
    """Issue cluster commands and control daemons of a running cluster."""

    def __init__(self, controller, ctx=None, logger=None):
        self.ctx = ctx
        self.controller = controller
        if logger is not None:
            self.log = lambda x: logger.info(x)
        else:
            def tmp(x):
                print x
            self.log = tmp

    def raw_cluster_cmd(self, *args):
        ceph_args = [
            'LD_LIBRARY_PATH=/tmp/cephtest/binary/usr/local/lib',
            '/tmp/cephtest/enable-coredump',
            '/tmp/cephtest/binary/usr/local/bin/ceph-coverage',
            '/tmp/cephtest/archive/coverage',
            '/tmp/cephtest/binary/usr/local/bin/ceph',
            '-k', '/tmp/cephtest/ceph.keyring',
            '-c', '/tmp/cephtest/ceph.conf',
            '--concise',
            ]
        ceph_args.extend(args)
        proc = self.controller.run(
            args=ceph_args,
            stdout=StringIO(),
            )
        return proc.stdout.getvalue()

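    # For example, raw_cluster_cmd('osd', 'out', '0') runs
    # "ceph -k /tmp/cephtest/ceph.keyring -c /tmp/cephtest/ceph.conf
    # --concise osd out 0" (wrapped in the coverage/coredump helpers above)
    # on the controller and returns the command's stdout as a string.
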
    def raw_cluster_status(self):
        return self.raw_cluster_cmd('-s')

    def raw_osd_status(self):
        return self.raw_cluster_cmd('osd', 'dump')

    def get_osd_status(self):
        osd_lines = filter(
            lambda x: x.startswith('osd.') and (("up" in x) or ("down" in x)),
            self.raw_osd_status().split('\n'))
        self.log(osd_lines)
        in_osds = [int(i[4:].split()[0]) for i in filter(
                lambda x: " in " in x, osd_lines)]
        out_osds = [int(i[4:].split()[0]) for i in filter(
                lambda x: " out " in x, osd_lines)]
        up_osds = [int(i[4:].split()[0]) for i in filter(
                lambda x: " up " in x, osd_lines)]
        down_osds = [int(i[4:].split()[0]) for i in filter(
                lambda x: " down " in x, osd_lines)]
        dead_osds = [int(x.id_) for x in
                     filter(lambda x: not x.running(),
                            self.ctx.daemons.iter_daemons_of_role('osd'))]
        live_osds = [int(x.id_) for x in
                     filter(lambda x: x.running(),
                            self.ctx.daemons.iter_daemons_of_role('osd'))]
        return {'in': in_osds, 'out': out_osds, 'up': up_osds,
                'down': down_osds, 'dead': dead_osds, 'live': live_osds,
                'raw': osd_lines}

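    # get_osd_status above parses "osd dump" lines of roughly this shape
    # (illustrative; the exact fields vary between Ceph versions):
    #   osd.0 up   in  weight 1 ...
    # Stripping the "osd." prefix with [4:] and taking the first token
    # yields the numeric id, here 0.
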
    def get_num_pgs(self):
        status = self.raw_cluster_status()
        self.log(status)
        return int(re.search(
                r"\d+ pgs:",
                status).group(0).split()[0])

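    # For example, a status containing "24 pgs: 24 active+clean"
    # (illustrative formatting) makes get_num_pgs return 24.
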
    def list_pg_missing(self, pgid):
        r = None
        offset = {}
        while True:
            out = self.raw_cluster_cmd('--', 'pg', pgid, 'list_missing',
                                       json.dumps(offset))
            j = json.loads('\n'.join(out.split('\n')[1:]))
            if r is None:
                r = j
            else:
                r['objects'].extend(j['objects'])
            if 'more' not in j:
                break
            if j['more'] == 0:
                break
            offset = j['objects'][-1]['oid']
        if 'more' in r:
            del r['more']
        return r

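    # list_missing output is paginated: each call returns a batch of
    # 'objects' plus a 'more' flag, and the oid of the last object seen is
    # fed back as the offset for the next call until 'more' is 0.
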
    def get_pg_stats(self):
        out = self.raw_cluster_cmd('--', 'pg', 'dump', '--format=json')
        # drop the first line of output, which is a status message rather
        # than part of the JSON payload
        j = json.loads('\n'.join(out.split('\n')[1:]))
        return j['pg_stats']

    def get_osd_dump(self):
        out = self.raw_cluster_cmd('--', 'osd', 'dump', '--format=json')
        j = json.loads('\n'.join(out.split('\n')[1:]))
        return j['osds']

    def get_stuck_pgs(self, type_, threshold):
        out = self.raw_cluster_cmd('--', 'pg', 'dump_stuck', type_,
                                   '--format=json', '-t', str(threshold))
        return json.loads('\n'.join(out.split('\n')[1:]))

    def get_num_unfound_objects(self):
        status = self.raw_cluster_status()
        self.log(status)
        match = re.search(r"\d+/\d+ unfound", status)
        if match is None:
            return 0
        else:
            return int(match.group(0).split('/')[0])

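    # For example, a status containing "5/182 unfound" makes
    # get_num_unfound_objects return 5.
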
    def get_num_active_clean(self):
        pgs = self.get_pg_stats()
        num = 0
        for pg in pgs:
            if pg['state'].count('active') and pg['state'].count('clean') and \
                    not pg['state'].count('stale'):
                num += 1
        return num

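    # In these counters pg['state'] is a '+'-joined state string such as
    # "active+clean" or "active+recovering+degraded", so the count() calls
    # act as substring tests.
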
    def get_num_active_recovered(self):
        pgs = self.get_pg_stats()
        num = 0
        for pg in pgs:
            if pg['state'].count('active') and \
                    not pg['state'].count('recovering') and \
                    not pg['state'].count('stale'):
                num += 1
        return num

    def get_num_active(self):
        pgs = self.get_pg_stats()
        num = 0
        for pg in pgs:
            if pg['state'].count('active') and not pg['state'].count('stale'):
                num += 1
        return num

    def get_num_active_down(self):
        pgs = self.get_pg_stats()
        num = 0
        for pg in pgs:
            if (pg['state'].count('active') and not pg['state'].count('stale')) or \
                    (pg['state'].count('down') and not pg['state'].count('stale')):
                num += 1
        return num

    def is_clean(self):
        return self.get_num_active_clean() == self.get_num_pgs()

    def is_recovered(self):
        return self.get_num_active_recovered() == self.get_num_pgs()

    def is_active_or_down(self):
        return self.get_num_active_down() == self.get_num_pgs()

    def wait_for_clean(self, timeout=None):
        self.log("waiting for clean")
        start = time.time()
        num_active_clean = self.get_num_active_clean()
        while not self.is_clean():
            if timeout is not None:
                assert time.time() - start < timeout, \
                    'failed to become clean before timeout expired'
            cur_active_clean = self.get_num_active_clean()
            if cur_active_clean != num_active_clean:
                start = time.time()
                num_active_clean = cur_active_clean
            time.sleep(3)
        self.log("clean!")

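    # Note that in wait_for_clean above, and in the similar wait loops
    # below, the timeout clock restarts whenever the PG counts change, so
    # the timeout bounds time without progress, not total wall-clock time.
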
    def wait_for_recovery(self, timeout=None):
        self.log("waiting for recovery to complete")
        start = time.time()
        num_active_recovered = self.get_num_active_recovered()
        while not self.is_recovered():
            if timeout is not None:
                assert time.time() - start < timeout, \
                    'failed to recover before timeout expired'
            cur_active_recovered = self.get_num_active_recovered()
            if cur_active_recovered != num_active_recovered:
                start = time.time()
                num_active_recovered = cur_active_recovered
            time.sleep(3)
        self.log("recovered!")

    def wait_for_active_or_down(self, timeout=None):
        self.log("waiting for peering to complete or become blocked")
        start = time.time()
        num_active_down = self.get_num_active_down()
        while not self.is_active_or_down():
            if timeout is not None:
                assert time.time() - start < timeout, \
                    'failed to become active or down before timeout expired'
            cur_active_down = self.get_num_active_down()
            if cur_active_down != num_active_down:
                start = time.time()
                num_active_down = cur_active_down
            time.sleep(3)
        self.log("active or down!")

    def osd_is_up(self, osd):
        osds = self.get_osd_dump()
        return osds[osd]['up'] > 0

    def wait_till_osd_is_up(self, osd, timeout=None):
        self.log('waiting for osd.%d to be up' % osd)
        start = time.time()
        while not self.osd_is_up(osd):
            if timeout is not None:
                assert time.time() - start < timeout, \
                    'osd.%d failed to come up before timeout expired' % osd
            time.sleep(3)
        self.log('osd.%d is up' % osd)

    def is_active(self):
        return self.get_num_active() == self.get_num_pgs()

    def wait_till_active(self, timeout=None):
        self.log("waiting till active")
        start = time.time()
        while not self.is_active():
            if timeout is not None:
                assert time.time() - start < timeout, \
                    'failed to become active before timeout expired'
            time.sleep(3)
        self.log("active!")

    def mark_out_osd(self, osd):
        self.raw_cluster_cmd('osd', 'out', str(osd))

    def kill_osd(self, osd):
        self.ctx.daemons.get_daemon('osd', osd).stop()

    def blackhole_kill_osd(self, osd):
        # --filestore-blackhole makes the osd drop filestore I/O on the
        # floor; stopping it afterwards simulates a crash that loses
        # in-flight writes
        self.raw_cluster_cmd('--', 'tell', 'osd.%d' % osd,
                             'injectargs', '--filestore-blackhole')
        time.sleep(2)
        self.ctx.daemons.get_daemon('osd', osd).stop()

    def revive_osd(self, osd):
        self.ctx.daemons.get_daemon('osd', osd).restart()

    def mark_down_osd(self, osd):
        self.raw_cluster_cmd('osd', 'down', str(osd))

    def mark_in_osd(self, osd):
        self.raw_cluster_cmd('osd', 'in', str(osd))


    ## monitors

    def kill_mon(self, mon):
        self.ctx.daemons.get_daemon('mon', mon).stop()

    def revive_mon(self, mon):
        self.ctx.daemons.get_daemon('mon', mon).restart()

    def get_mon_status(self, mon):
        addr = self.ctx.ceph.conf['mon.%s' % mon]['mon addr']
        out = self.raw_cluster_cmd('-m', addr, 'mon_status')
        return json.loads(out)

    def get_mon_quorum(self):
        out = self.raw_cluster_cmd('quorum_status')
        j = json.loads(out)
        self.log('quorum_status is %s' % out)
        return j['quorum']

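    # For example, quorum_status output shaped like
    # {"quorum": [0, 1, 2], ...} (shape illustrative) makes
    # get_mon_quorum return [0, 1, 2].
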
    def wait_for_mon_quorum_size(self, size, timeout=300):
        self.log('waiting for quorum size %d' % size)
        start = time.time()
        while len(self.get_mon_quorum()) != size:
            if timeout is not None:
                assert time.time() - start < timeout, \
                    'failed to reach quorum size %d before timeout expired' % size
            time.sleep(3)
        self.log("quorum is size %d" % size)

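# A minimal end-to-end sketch (hypothetical; `remote` is assumed to be a
# handle that can run commands on a cluster node, `ctx` the test context
# carrying the daemon registry):
#
#   manager = CephManager(remote, ctx=ctx, logger=log)
#   manager.wait_for_clean()
#   thrasher = Thrasher(manager, {'chance_down': 30})
#   ... exercise the cluster ...
#   thrasher.do_join()
#   manager.wait_for_clean()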