from cStringIO import StringIO
import random
import time
import re
import gevent
import json
import threading
from teuthology import misc as teuthology
from ..orchestra import run


class Thrasher:
    """
    Randomly kill/revive and mark in/out OSDs (and optionally grow pools
    or inject pauses) from a background greenlet until do_join() is called.

    Bookkeeping lists (in_osds, out_osds, live_osds, dead_osds) are seeded
    from CephManager.get_osd_status() and updated by the action methods.
    """

    def __init__(self, manager, config, logger=None):
        """
        Wait for the cluster to be clean, snapshot the OSD state, and spawn
        the thrashing greenlet.

        :param manager: CephManager-like object used to drive the cluster
        :param config: dict of thrasher options, or None for all defaults
        :param logger: optional logger; when None messages are printed
        """
        self.ceph_manager = manager
        self.ceph_manager.wait_for_clean()
        osd_status = self.ceph_manager.get_osd_status()
        self.in_osds = osd_status['in']
        self.live_osds = osd_status['live']
        self.out_osds = osd_status['out']
        self.dead_osds = osd_status['dead']
        self.stopping = False
        self.logger = logger
        self.config = config
        # Normalise the config before the first .get() so config=None works.
        if self.config is None:
            self.config = dict()
        # max_pgs is a per-OSD limit scaled by the number of OSDs; it must be
        # an integer because expand_pool() compares a pg count against it.
        num_osds = len(self.in_osds) + len(self.out_osds)
        self.max_pgs = self.config.get("max_pgs_per_pool_osd", 1200) * num_osds
        if self.logger is not None:
            self.log = lambda x: self.logger.info(x)
        else:
            def tmp(x):
                print(x)
            self.log = tmp
        # prevent monitor from auto-marking things out while thrasher runs
        manager.raw_cluster_cmd('mon', 'tell', '*', 'injectargs',
                                '--mon-osd-down-out-interval', '0')
        self.thread = gevent.spawn(self.do_thrash)

    def kill_osd(self, osd=None):
        """Stop an osd (random live one if osd is None) and record it dead."""
        if osd is None:
            osd = random.choice(self.live_osds)
        self.log("Killing osd %s, live_osds are %s"
                 % (str(osd), str(self.live_osds)))
        self.live_osds.remove(osd)
        self.dead_osds.append(osd)
        self.ceph_manager.kill_osd(osd)

    def blackhole_kill_osd(self, osd=None):
        """Blackhole then stop an osd (random live one if osd is None)."""
        if osd is None:
            osd = random.choice(self.live_osds)
        self.log("Blackholing and then killing osd %s, live_osds are %s"
                 % (str(osd), str(self.live_osds)))
        self.live_osds.remove(osd)
        self.dead_osds.append(osd)
        self.ceph_manager.blackhole_kill_osd(osd)

    def revive_osd(self, osd=None):
        """Restart an osd (random dead one if osd is None)."""
        if osd is None:
            osd = random.choice(self.dead_osds)
        self.log("Reviving osd %s" % (str(osd),))
        self.live_osds.append(osd)
        self.dead_osds.remove(osd)
        self.ceph_manager.revive_osd(osd)

    def out_osd(self, osd=None):
        """Mark an osd out (random in one if osd is None)."""
        if osd is None:
            osd = random.choice(self.in_osds)
        self.log("Removing osd %s, in_osds are: %s"
                 % (str(osd), str(self.in_osds)))
        self.ceph_manager.mark_out_osd(osd)
        self.in_osds.remove(osd)
        self.out_osds.append(osd)

    def in_osd(self, osd=None):
        """
        Mark an osd in (random out one if osd is None).

        If the chosen osd is currently dead it is revived instead and is
        NOT marked in by this call.
        """
        if osd is None:
            osd = random.choice(self.out_osds)
        if osd in self.dead_osds:
            return self.revive_osd(osd)
        self.log("Adding osd %s" % (str(osd),))
        self.out_osds.remove(osd)
        self.in_osds.append(osd)
        self.ceph_manager.mark_in_osd(osd)
        self.log("Added osd %s" % (str(osd),))

    def all_up(self):
        """Revive every dead osd, then mark every out osd in."""
        while len(self.dead_osds) > 0:
            self.log("reviving osd")
            self.revive_osd()
        while len(self.out_osds) > 0:
            self.log("inning osd")
            self.in_osd()

    def do_join(self):
        """Ask the thrashing loop to stop and wait for the greenlet."""
        self.stopping = True
        self.thread.get()

    def grow_pool(self):
        """Increase pg_num on a randomly chosen pool (bounded by max_pgs)."""
        pool = self.ceph_manager.get_pool()
        self.log("Growing pool %s" % (pool,))
        self.ceph_manager.expand_pool(
            pool, self.config.get('pool_grow_by', 10), self.max_pgs)

    def fix_pgp_num(self):
        """Set pgp_num to the tracked pg_num on a randomly chosen pool."""
        pool = self.ceph_manager.get_pool()
        self.log("fixing pg num pool %s" % (pool,))
        self.ceph_manager.set_pool_pgpnum(pool)

    def test_pool_min_size(self):
        """
        Bring everything up, then kill and mark out all in osds but one,
        wait, kill/out the last one, and revive/in all the others.
        The last osd killed is left dead and out when this returns.
        """
        self.log("test_pool_min_size")
        self.all_up()
        self.ceph_manager.wait_for_recovery(
            timeout=self.config.get('timeout')
            )
        the_one = random.choice(self.in_osds)
        self.log("Killing everyone but %s" % (the_one,))
        # Snapshot the victims first: kill_osd/out_osd mutate self.in_osds.
        to_kill = [osd for osd in self.in_osds if osd != the_one]
        for osd in to_kill:
            self.kill_osd(osd)
        for osd in to_kill:
            self.out_osd(osd)
        time.sleep(self.config.get("test_pool_min_size_time", 10))
        self.log("Killing %s" % (the_one,))
        self.kill_osd(the_one)
        self.out_osd(the_one)
        self.log("Reviving everyone but %s" % (the_one,))
        for osd in to_kill:
            self.revive_osd(osd)
        for osd in to_kill:
            self.in_osd(osd)
        self.log("Revived everyone but %s" % (the_one,))
        self.log("Waiting for clean")
        self.ceph_manager.wait_for_recovery(
            timeout=self.config.get('timeout')
            )

    def inject_pause(self, conf_key, duration, check_after, should_be_down):
        """
        Inject conf_key=duration into a random live osd.

        When should_be_down is true, additionally assert the osd is reported
        down after check_after seconds and no longer down after the
        remaining duration plus 20 seconds.
        """
        the_one = random.choice(self.live_osds)
        self.log("inject_pause on {osd}".format(osd=the_one))
        self.log(
            "Testing {key} pause injection for duration {duration}".format(
                key=conf_key,
                duration=duration
                ))
        self.log(
            "Checking after {after}, should_be_down={shouldbedown}".format(
                after=check_after,
                shouldbedown=should_be_down
                ))
        self.ceph_manager.set_config(the_one, **{conf_key: duration})
        if not should_be_down:
            return
        time.sleep(check_after)
        status = self.ceph_manager.get_osd_status()
        assert the_one in status['down']
        time.sleep(duration - check_after + 20)
        status = self.ceph_manager.get_osd_status()
        assert the_one not in status['down']

    def choose_action(self):
        """
        Pick one action callable at random, weighted by its configured
        chance.

        :returns: a zero-argument callable, or None when no action was
                  selected (e.g. every weight is zero)
        """
        chance_down = self.config.get('chance_down', 0.4)
        chance_test_min_size = self.config.get('chance_test_min_size', 0)
        # An integer chance_down is interpreted as a percentage.
        if isinstance(chance_down, int):
            chance_down = float(chance_down) / 100
        minin = self.config.get("min_in", 2)
        minout = self.config.get("min_out", 0)
        minlive = self.config.get("min_live", 2)
        mindead = self.config.get("min_dead", 0)

        self.log('choose_action: min_in %d min_out %d min_live %d min_dead %d'
                 % (minin, minout, minlive, mindead))
        actions = []
        if len(self.in_osds) > minin:
            actions.append((self.out_osd, 1.0,))
        if len(self.live_osds) > minlive and chance_down > 0:
            actions.append((self.kill_osd, chance_down,))
        if len(self.out_osds) > minout:
            actions.append((self.in_osd, 1.7,))
        if len(self.dead_osds) > mindead:
            actions.append((self.revive_osd, 1.0,))
        actions.append((self.grow_pool,
                        self.config.get('chance_pgnum_grow', 0),))
        actions.append((self.fix_pgp_num,
                        self.config.get('chance_pgpnum_fix', 0),))
        actions.append((self.test_pool_min_size, chance_test_min_size,))
        for key in ['heartbeat_inject_failure', 'filestore_inject_stall']:
            # key=key binds the current loop value; a plain closure would see
            # only the last key by the time the lambda is called.
            for scenario in [
                (lambda key=key: self.inject_pause(
                    key,
                    self.config.get('pause_short', 3),
                    0,
                    False),
                 self.config.get('chance_inject_pause_short', 1),),
                (lambda key=key: self.inject_pause(
                    key,
                    self.config.get('pause_long', 80),
                    self.config.get('pause_check_after', 70),
                    True),
                 self.config.get('chance_inject_pause_long', 0),)]:
                actions.append(scenario)

        total = sum([prob for (_, prob) in actions])
        val = random.uniform(0, total)
        for (action, prob) in actions:
            if val < prob:
                return action
            val -= prob
        return None

    def do_thrash(self):
        """
        Main loop of the thrashing greenlet: log state, occasionally revive
        osds down to max_dead and wait for recovery, run one random action,
        sleep op_delay seconds; on stop, bring everything back up.
        """
        cleanint = self.config.get("clean_interval", 60)
        maxdead = self.config.get("max_dead", 0)
        delay = self.config.get("op_delay", 5)
        self.log("starting do_thrash")
        while not self.stopping:
            self.log(" ".join([str(x) for x in [
                "in_osds: ", self.in_osds,
                " out_osds: ", self.out_osds,
                "dead_osds: ", self.dead_osds,
                "live_osds: ", self.live_osds]]))
            if random.uniform(0, 1) < (float(delay) / cleanint):
                while len(self.dead_osds) > maxdead:
                    self.revive_osd()
                self.ceph_manager.wait_for_recovery(
                    timeout=self.config.get('timeout')
                    )
            # choose_action() may legitimately return None; skip this round
            # instead of crashing the greenlet with "NoneType not callable".
            action = self.choose_action()
            if action is not None:
                action()
            time.sleep(delay)
        self.all_up()


class CephManager:
    """
    Thin wrapper around the ceph/rados command line tools and the daemon
    handles in ctx, plus a locally tracked {pool name: pg_num} map.
    """

    def __init__(self, controller, ctx=None, logger=None):
        """
        :param controller: remote on which cluster commands are run
        :param ctx: test context (provides cluster, daemons, ceph.conf)
        :param logger: optional logger; when falsy messages are printed
        """
        self.lock = threading.RLock()
        self.ctx = ctx
        self.controller = controller
        if (logger):
            self.log = lambda x: logger.info(x)
        else:
            def tmp(x):
                print(x)
            self.log = tmp
        self.pools = {}
        self.pools['data'] = self.get_pool_property('data', 'pg_num')

    def raw_cluster_cmd(self, *args):
        """Run the ceph tool with args on the controller; return stdout."""
        # NOTE(review): do_rados() uses LD_LIBRARY_PATH here; the
        # LD_LIBRARY_PRELOAD spelling looks like a typo - confirm before
        # changing, since it alters which libraries the binary loads.
        ceph_args = [
            'LD_LIBRARY_PRELOAD=/tmp/cephtest/binary/usr/local/lib',
            '/tmp/cephtest/enable-coredump',
            '/tmp/cephtest/binary/usr/local/bin/ceph-coverage',
            '/tmp/cephtest/archive/coverage',
            '/tmp/cephtest/binary/usr/local/bin/ceph',
            '-k', '/tmp/cephtest/ceph.keyring',
            '-c', '/tmp/cephtest/ceph.conf',
            '--concise',
            ]
        ceph_args.extend(args)
        proc = self.controller.run(
            args=ceph_args,
            stdout=StringIO(),
            )
        return proc.stdout.getvalue()

    def do_rados(self, remote, cmd):
        """Run the rados tool with cmd on remote; return the process."""
        pre = [
            'LD_LIBRARY_PATH=/tmp/cephtest/binary/usr/local/lib',
            '/tmp/cephtest/enable-coredump',
            '/tmp/cephtest/binary/usr/local/bin/ceph-coverage',
            '/tmp/cephtest/archive/coverage',
            '/tmp/cephtest/binary/usr/local/bin/rados',
            '-c', '/tmp/cephtest/ceph.conf',
            ]
        pre.extend(cmd)
        proc = remote.run(
            args=pre,
            wait=True,
            )
        return proc

    def osd_admin_socket(self, osdnum, command):
        """
        Run command against osd.<osdnum>'s admin socket on the remote that
        hosts that osd; return the finished process (stdout captured).
        """
        remote = None
        for _remote, roles_for_host in self.ctx.cluster.remotes.iteritems():
            for id_ in teuthology.roles_of_type(roles_for_host, 'osd'):
                if int(id_) == osdnum:
                    remote = _remote
        assert remote is not None
        # NOTE(review): see raw_cluster_cmd() regarding LD_LIBRARY_PRELOAD.
        args = [
            'LD_LIBRARY_PRELOAD=/tmp/cephtest/binary/usr/local/lib',
            '/tmp/cephtest/enable-coredump',
            '/tmp/cephtest/binary/usr/local/bin/ceph-coverage',
            '/tmp/cephtest/archive/coverage',
            '/tmp/cephtest/binary/usr/local/bin/ceph',
            '-k', '/tmp/cephtest/ceph.keyring',
            '-c', '/tmp/cephtest/ceph.conf',
            '--admin-daemon',
            "/tmp/cephtest/asok.osd.%s" % (str(osdnum),)]
        args.extend(command)
        return remote.run(
            args=args,
            stdout=StringIO(),
            wait=True,
            )

    def get_pg_primary(self, pool, pgnum):
        """
        get primary for pool, pgnum (e.g. (data, 0) -> 0)
        """
        poolnum = self.get_pool_num(pool)
        output = self.raw_cluster_cmd("pg", "dump", '--format=json')
        # The first line of the output is dropped before JSON parsing.
        j = json.loads('\n'.join(output.split('\n')[1:]))
        pg_str = "%d.%d" % (poolnum, pgnum)
        for pg in j['pg_stats']:
            if pg['pgid'] == pg_str:
                return int(pg['acting'][0])
        assert False

    def get_pool_num(self, pool):
        """
        get number for pool (e.g., data -> 2)
        """
        out = self.raw_cluster_cmd('--', 'osd', 'dump', '--format=json')
        j = json.loads('\n'.join(out.split('\n')[1:]))
        for i in j['pools']:
            if i['pool_name'] == pool:
                return int(i['pool'])
        assert False

    def set_config(self, osdnum, **argdict):
        """
        Inject config values into osd.<osdnum>; each keyword foo_bar=v is
        sent as "--foo-bar v".
        """
        return self.raw_cluster_cmd(
            'tell', "osd.%d" % (int(osdnum),),
            'injectargs',
            " ".join(
                [("--" + conf.replace("_", "-") + " " + str(val))
                 for (conf, val) in argdict.iteritems()]))

    def raw_cluster_status(self):
        """Return the raw output of 'ceph -s'."""
        return self.raw_cluster_cmd('-s')

    def raw_osd_status(self):
        """Return the raw output of 'ceph osd dump'."""
        return self.raw_cluster_cmd('osd', 'dump')

    def get_osd_status(self):
        """
        Parse 'osd dump' and the daemon handles into a dict with keys
        'in', 'out', 'up', 'down', 'dead', 'live' (lists of int osd ids)
        and 'raw' (the matching osd dump lines).
        """
        osd_lines = [
            line for line in self.raw_osd_status().split('\n')
            if line.startswith('osd.') and (("up" in line) or ("down" in line))
            ]
        self.log(osd_lines)

        def ids_with(marker):
            # "osd.N ..." -> N for every line containing marker
            return [int(line[4:].split()[0])
                    for line in osd_lines if marker in line]

        in_osds = ids_with(" in ")
        out_osds = ids_with(" out ")
        up_osds = ids_with(" up ")
        down_osds = ids_with(" down ")
        dead_osds = [int(x.id_)
                     for x in self.ctx.daemons.iter_daemons_of_role('osd')
                     if not x.running()]
        live_osds = [int(x.id_)
                     for x in self.ctx.daemons.iter_daemons_of_role('osd')
                     if x.running()]
        return {'in': in_osds, 'out': out_osds, 'up': up_osds,
                'down': down_osds, 'dead': dead_osds, 'live': live_osds,
                'raw': osd_lines}

    def get_num_pgs(self):
        """Return the pg count parsed from the 'N pgs:' part of 'ceph -s'."""
        status = self.raw_cluster_status()
        self.log(status)
        return int(re.search(
                r"\d* pgs:",
                status).group(0).split()[0])

    def create_pool(self, pool_name, pg_num=1):
        """Create pool_name with pg_num pgs and start tracking it."""
        with self.lock:
            assert isinstance(pool_name, str)
            assert isinstance(pg_num, int)
            assert pool_name not in self.pools
            self.log("creating pool_name %s" % (pool_name,))
            self.raw_cluster_cmd('osd', 'pool', 'create',
                                 pool_name, str(pg_num))
            self.pools[pool_name] = pg_num

    def remove_pool(self, pool_name):
        """Stop tracking pool_name and delete it with 'rados rmpool'."""
        with self.lock:
            assert isinstance(pool_name, str)
            assert pool_name in self.pools
            self.log("removing pool_name %s" % (pool_name,))
            del self.pools[pool_name]
            self.do_rados(
                self.controller,
                ['rmpool', pool_name, pool_name,
                 "--yes-i-really-really-mean-it"]
                )

    def get_pool(self):
        """Return the name of a randomly chosen tracked pool."""
        with self.lock:
            return random.choice(list(self.pools.keys()))

    def get_pool_pg_num(self, pool_name):
        """Return the tracked pg_num of pool_name, or 0 if untracked."""
        with self.lock:
            assert isinstance(pool_name, str)
            if pool_name in self.pools:
                return self.pools[pool_name]
            return 0

    def get_pool_property(self, pool_name, prop):
        """Return the integer value of 'osd pool get <pool_name> <prop>'."""
        with self.lock:
            assert isinstance(pool_name, str)
            assert isinstance(prop, str)
            output = self.raw_cluster_cmd(
                'osd',
                'pool',
                'get',
                pool_name,
                prop)
            # The value is taken from the second whitespace-separated token.
            return int(output.split()[1])

    def set_pool_property(self, pool_name, prop, val):
        """Run 'osd pool set <pool_name> <prop> <val>' (val must be int)."""
        with self.lock:
            assert isinstance(pool_name, str)
            assert isinstance(prop, str)
            assert isinstance(val, int)
            self.raw_cluster_cmd(
                'osd',
                'pool',
                'set',
                pool_name,
                prop,
                str(val),
                '--allow-experimental-feature')

    def expand_pool(self, pool_name, by, max_pgs):
        """
        Raise pg_num of pool_name by `by`, unless pgs are still being
        created or the new total would exceed max_pgs.
        """
        with self.lock:
            assert isinstance(pool_name, str)
            assert isinstance(by, int)
            assert pool_name in self.pools
            if self.get_num_creating() > 0:
                return
            if (self.pools[pool_name] + by) > max_pgs:
                return
            self.log("increase pool size by %d" % (by,))
            new_pg_num = self.pools[pool_name] + by
            self.set_pool_property(pool_name, "pg_num", new_pg_num)
            self.pools[pool_name] = new_pg_num

    def set_pool_pgpnum(self, pool_name):
        """
        Set pgp_num of pool_name to its tracked pg_num, unless pgs are
        still being created.
        """
        with self.lock:
            assert isinstance(pool_name, str)
            assert pool_name in self.pools
            if self.get_num_creating() > 0:
                return
            self.set_pool_property(pool_name, 'pgp_num',
                                   self.pools[pool_name])

    def list_pg_missing(self, pgid):
        """
        Page through 'pg <pgid> list_missing' and return the first reply
        with the 'objects' of all later pages appended and 'more' removed.
        """
        r = None
        offset = {}
        while True:
            out = self.raw_cluster_cmd('--', 'pg', pgid, 'list_missing',
                                       json.dumps(offset))
            j = json.loads('\n'.join(out.split('\n')[1:]))
            if r is None:
                r = j
            else:
                r['objects'].extend(j['objects'])
            if 'more' not in j:
                break
            if j['more'] == 0:
                break
            # Resume the next page after the last object seen.
            offset = j['objects'][-1]['oid']
        if 'more' in r:
            del r['more']
        return r

    def get_pg_stats(self):
        """Return the 'pg_stats' list from 'pg dump --format=json'."""
        out = self.raw_cluster_cmd('--', 'pg', 'dump', '--format=json')
        j = json.loads('\n'.join(out.split('\n')[1:]))
        return j['pg_stats']

    def get_single_pg_stats(self, pgid):
        """Return the pg_stats entry for pgid, or None if absent."""
        all_stats = self.get_pg_stats()

        for pg in all_stats:
            if pg['pgid'] == pgid:
                return pg

        return None

    def get_osd_dump(self):
        """Return the 'osds' list from 'osd dump --format=json'."""
        out = self.raw_cluster_cmd('--', 'osd', 'dump', '--format=json')
        j = json.loads('\n'.join(out.split('\n')[1:]))
        return j['osds']

    def get_stuck_pgs(self, type_, threshold):
        """Return parsed 'pg dump_stuck <type_> -t <threshold>' output."""
        out = self.raw_cluster_cmd('--', 'pg', 'dump_stuck', type_,
                                   '--format=json', '-t', str(threshold))
        return json.loads('\n'.join(out.split('\n')[1:]))

    def get_num_unfound_objects(self):
        """Return N from 'N/M unfound' in 'ceph -s', or 0 if not present."""
        status = self.raw_cluster_status()
        self.log(status)
        match = re.search(
            r"\d+/\d+ unfound",
            status)
        if match is None:
            return 0
        else:
            return int(match.group(0).split('/')[0])

    def get_num_creating(self):
        """Count pgs whose state contains 'creating'."""
        pgs = self.get_pg_stats()
        num = 0
        for pg in pgs:
            if 'creating' in pg['state']:
                num += 1
        return num

    def get_num_active_clean(self):
        """Count pgs that are active and clean and not stale."""
        pgs = self.get_pg_stats()
        num = 0
        for pg in pgs:
            if (pg['state'].count('active') and
                    pg['state'].count('clean') and
                    not pg['state'].count('stale')):
                num += 1
        return num

    def get_num_active_recovered(self):
        """Count pgs that are active and not recovering/backfilling/stale."""
        pgs = self.get_pg_stats()
        num = 0
        for pg in pgs:
            if (pg['state'].count('active') and
                    not pg['state'].count('recover') and
                    not pg['state'].count('backfill') and
                    not pg['state'].count('stale')):
                num += 1
        return num

    def get_num_active(self):
        """Count pgs that are active and not stale."""
        pgs = self.get_pg_stats()
        num = 0
        for pg in pgs:
            if pg['state'].count('active') and not pg['state'].count('stale'):
                num += 1
        return num

    def get_num_down(self):
        """Count non-stale pgs that are down or incomplete."""
        pgs = self.get_pg_stats()
        num = 0
        for pg in pgs:
            if (pg['state'].count('down') and not pg['state'].count('stale')) or \
                    (pg['state'].count('incomplete') and not pg['state'].count('stale')):
                num += 1
        return num

    def get_num_active_down(self):
        """Count non-stale pgs that are active, down or incomplete."""
        pgs = self.get_pg_stats()
        num = 0
        for pg in pgs:
            if (pg['state'].count('active') and not pg['state'].count('stale')) or \
                    (pg['state'].count('down') and not pg['state'].count('stale')) or \
                    (pg['state'].count('incomplete') and not pg['state'].count('stale')):
                num += 1
        return num

    def is_clean(self):
        """True when every pg is active+clean."""
        return self.get_num_active_clean() == self.get_num_pgs()

    def is_recovered(self):
        """True when every pg is active and not recovering."""
        return self.get_num_active_recovered() == self.get_num_pgs()

    def is_active_or_down(self):
        """True when every pg is active, down or incomplete."""
        return self.get_num_active_down() == self.get_num_pgs()

    def wait_for_clean(self, timeout=None):
        """
        Poll every 3s until is_clean(). With a timeout, assert if no
        progress (change in the active+clean count) is seen for that long.
        """
        self.log("waiting for clean")
        start = time.time()
        num_active_clean = self.get_num_active_clean()
        while not self.is_clean():
            if timeout is not None:
                assert time.time() - start < timeout, \
                    'failed to become clean before timeout expired'
            cur_active_clean = self.get_num_active_clean()
            if cur_active_clean != num_active_clean:
                # progress was made: restart the timeout window
                start = time.time()
                num_active_clean = cur_active_clean
            time.sleep(3)
        self.log("clean!")

    def wait_for_recovery(self, timeout=None):
        """
        Poll every 3s until is_recovered(). With a timeout, assert if the
        recovered count does not change for that long.
        """
        self.log("waiting for recovery to complete")
        start = time.time()
        num_active_recovered = self.get_num_active_recovered()
        while not self.is_recovered():
            if timeout is not None:
                assert time.time() - start < timeout, \
                    'failed to recover before timeout expired'
            cur_active_recovered = self.get_num_active_recovered()
            if cur_active_recovered != num_active_recovered:
                start = time.time()
                num_active_recovered = cur_active_recovered
            time.sleep(3)
        self.log("recovered!")

    def wait_for_active(self, timeout=None):
        """
        Poll every 3s until is_active(). With a timeout, assert if the
        active count does not change for that long.
        """
        self.log("waiting for peering to complete")
        start = time.time()
        num_active = self.get_num_active()
        while not self.is_active():
            if timeout is not None:
                # NOTE(review): message says "recover"; kept as-is.
                assert time.time() - start < timeout, \
                    'failed to recover before timeout expired'
            cur_active = self.get_num_active()
            if cur_active != num_active:
                start = time.time()
                num_active = cur_active
            time.sleep(3)
        self.log("active!")

    def wait_for_active_or_down(self, timeout=None):
        """
        Poll every 3s until is_active_or_down(). With a timeout, assert if
        the active/down count does not change for that long.
        """
        self.log("waiting for peering to complete or become blocked")
        start = time.time()
        num_active_down = self.get_num_active_down()
        while not self.is_active_or_down():
            if timeout is not None:
                # NOTE(review): message says "recover"; kept as-is.
                assert time.time() - start < timeout, \
                    'failed to recover before timeout expired'
            cur_active_down = self.get_num_active_down()
            if cur_active_down != num_active_down:
                start = time.time()
                num_active_down = cur_active_down
            time.sleep(3)
        self.log("active or down!")

    def osd_is_up(self, osd):
        """True when entry `osd` of the osd dump has a positive 'up'."""
        osds = self.get_osd_dump()
        return osds[osd]['up'] > 0

    def wait_till_osd_is_up(self, osd, timeout=None):
        """Poll every 3s until osd is up; assert once timeout elapses."""
        self.log('waiting for osd.%d to be up' % osd)
        start = time.time()
        while not self.osd_is_up(osd):
            if timeout is not None:
                assert time.time() - start < timeout, \
                    'osd.%d failed to come up before timeout expired' % osd
            time.sleep(3)
        self.log('osd.%d is up' % osd)

    def is_active(self):
        """True when every pg is active and not stale."""
        return self.get_num_active() == self.get_num_pgs()

    def wait_till_active(self, timeout=None):
        """Poll every 3s until is_active(); assert once timeout elapses."""
        self.log("waiting till active")
        start = time.time()
        while not self.is_active():
            if timeout is not None:
                assert time.time() - start < timeout, \
                    'failed to become active before timeout expired'
            time.sleep(3)
        self.log("active!")

    def mark_out_osd(self, osd):
        """Run 'ceph osd out <osd>'."""
        self.raw_cluster_cmd('osd', 'out', str(osd))

    def kill_osd(self, osd):
        """Stop the osd daemon."""
        self.ctx.daemons.get_daemon('osd', osd).stop()

    def blackhole_kill_osd(self, osd):
        """Inject --filestore-blackhole, wait 2s, then stop the daemon."""
        self.raw_cluster_cmd('--', 'tell', 'osd.%d' % osd,
                             'injectargs', '--filestore-blackhole')
        time.sleep(2)
        self.ctx.daemons.get_daemon('osd', osd).stop()

    def revive_osd(self, osd):
        """Restart the osd daemon."""
        self.ctx.daemons.get_daemon('osd', osd).restart()

    def mark_down_osd(self, osd):
        """Run 'ceph osd down <osd>'."""
        self.raw_cluster_cmd('osd', 'down', str(osd))

    def mark_in_osd(self, osd):
        """Run 'ceph osd in <osd>'."""
        self.raw_cluster_cmd('osd', 'in', str(osd))

    ## monitors

    def kill_mon(self, mon):
        """Stop the mon daemon."""
        self.ctx.daemons.get_daemon('mon', mon).stop()

    def revive_mon(self, mon):
        """Restart the mon daemon."""
        self.ctx.daemons.get_daemon('mon', mon).restart()

    def get_mon_status(self, mon):
        """Return parsed 'mon_status' queried at that mon's address."""
        addr = self.ctx.ceph.conf['mon.%s' % mon]['mon addr']
        out = self.raw_cluster_cmd('-m', addr, 'mon_status')
        return json.loads(out)

    def get_mon_quorum(self):
        """Return the 'quorum' field of 'quorum_status'."""
        out = self.raw_cluster_cmd('quorum_status')
        j = json.loads(out)
        self.log('quorum_status is %s' % out)
        return j['quorum']

    def wait_for_mon_quorum_size(self, size, timeout=300):
        """Poll every 3s until the quorum has exactly `size` members."""
        self.log('waiting for quorum size %d' % size)
        start = time.time()
        while not len(self.get_mon_quorum()) == size:
            if timeout is not None:
                assert time.time() - start < timeout, \
                    'failed to reach quorum size %d before timeout expired' % size
            time.sleep(3)
        self.log("quorum is size %d" % size)

    def get_mon_health(self, debug=False):
        """Return parsed 'health --format=json'; log it when debug."""
        out = self.raw_cluster_cmd('health', '--format=json')
        if debug:
            self.log('health:\n{h}'.format(h=out))
        return json.loads(out)

    ## metadata servers

    def kill_mds(self, mds):
        """Stop the named mds daemon."""
        self.ctx.daemons.get_daemon('mds', mds).stop()

    def kill_mds_by_rank(self, rank):
        """Stop the mds daemon currently holding `rank`."""
        status = self.get_mds_status_by_rank(rank)
        self.ctx.daemons.get_daemon('mds', status['name']).stop()

    def revive_mds(self, mds, standby_for_rank=None):
        """Restart the named mds, optionally as hot standby for a rank."""
        args = []
        # NOTE(review): a standby_for_rank of 0 is falsy and is ignored
        # here - confirm whether rank 0 should be supported.
        if standby_for_rank:
            args.extend(['--hot-standby', standby_for_rank])
        self.ctx.daemons.get_daemon('mds', mds).restart(*args)

    def revive_mds_by_rank(self, rank, standby_for_rank=None):
        """Restart the mds holding `rank`, optionally as hot standby."""
        args = []
        # NOTE(review): same rank-0 caveat as revive_mds().
        if standby_for_rank:
            args.extend(['--hot-standby', standby_for_rank])
        status = self.get_mds_status_by_rank(rank)
        self.ctx.daemons.get_daemon('mds', status['name']).restart(*args)

    def get_mds_status(self, mds):
        """Return the 'mds dump' info entry named `mds`, or None."""
        out = self.raw_cluster_cmd('mds', 'dump', '--format=json')
        j = json.loads(' '.join(out.splitlines()[1:]))
        # Returns the first entry whose name matches; duplicate names are
        # not collated.
        for info in j['info'].itervalues():
            if info['name'] == mds:
                return info
        return None

    def get_mds_status_by_rank(self, rank):
        """Return the 'mds dump' info entry with the given rank, or None."""
        out = self.raw_cluster_cmd('mds', 'dump', '--format=json')
        j = json.loads(' '.join(out.splitlines()[1:]))
        # Returns the first entry whose rank matches; duplicates are not
        # collated.
        for info in j['info'].itervalues():
            if info['rank'] == rank:
                return info
        return None

    def get_mds_status_all(self):
        """Return the whole parsed 'mds dump --format=json' output."""
        out = self.raw_cluster_cmd('mds', 'dump', '--format=json')
        j = json.loads(' '.join(out.splitlines()[1:]))
        return j