ceph/teuthology/task/ceph_manager.py

744 lines
27 KiB
Python
Raw Normal View History

from cStringIO import StringIO
import random
import time
import re
import gevent
import json
import threading
from teuthology import misc as teuthology
class Thrasher:
def __init__(self, manager, config, logger=None):
self.ceph_manager = manager
self.ceph_manager.wait_for_clean()
osd_status = self.ceph_manager.get_osd_status()
self.in_osds = osd_status['in']
self.live_osds = osd_status['live']
self.out_osds = osd_status['out']
self.dead_osds = osd_status['dead']
self.stopping = False
self.logger = logger
self.config = config
num_osds = self.in_osds + self.out_osds
self.max_pgs = self.config.get("max_pgs_per_pool_osd", 1200) * num_osds
2011-09-08 19:54:23 +00:00
if self.logger is not None:
self.log = lambda x: self.logger.info(x)
else:
def tmp(x):
print x
self.log = tmp
if self.config is None:
self.config = dict()
# prevent monitor from auto-marking things out while thrasher runs
manager.raw_cluster_cmd('mon', 'tell', '*', 'injectargs',
'--mon-osd-down-out-interval', '0')
self.thread = gevent.spawn(self.do_thrash)
def kill_osd(self, osd=None):
if osd is None:
osd = random.choice(self.live_osds)
self.log("Killing osd %s, live_osds are %s" % (str(osd),str(self.live_osds)))
self.live_osds.remove(osd)
self.dead_osds.append(osd)
self.ceph_manager.kill_osd(osd)
def blackhole_kill_osd(self, osd=None):
if osd is None:
osd = random.choice(self.live_osds)
self.log("Blackholing and then killing osd %s, live_osds are %s" % (str(osd),str(self.live_osds)))
self.live_osds.remove(osd)
self.dead_osds.append(osd)
self.ceph_manager.blackhole_kill_osd(osd)
def revive_osd(self, osd=None):
if osd is None:
osd = random.choice(self.dead_osds)
self.log("Reviving osd %s" % (str(osd),))
self.live_osds.append(osd)
self.dead_osds.remove(osd)
self.ceph_manager.revive_osd(osd)
def out_osd(self, osd=None):
if osd is None:
osd = random.choice(self.in_osds)
self.log("Removing osd %s, in_osds are: %s" % (str(osd),str(self.in_osds)))
self.ceph_manager.mark_out_osd(osd)
self.in_osds.remove(osd)
self.out_osds.append(osd)
def in_osd(self, osd=None):
if osd is None:
osd = random.choice(self.out_osds)
if osd in self.dead_osds:
return self.revive_osd(osd)
self.log("Adding osd %s" % (str(osd),))
self.out_osds.remove(osd)
self.in_osds.append(osd)
self.ceph_manager.mark_in_osd(osd)
self.log("Added osd %s"%(str(osd),))
def all_up(self):
while len(self.dead_osds) > 0:
self.log("reviving osd")
self.revive_osd()
while len(self.out_osds) > 0:
self.log("inning osd")
self.in_osd()
def do_join(self):
self.stopping = True
self.thread.get()
def grow_pool(self):
pool = self.ceph_manager.get_pool()
self.log("Growing pool %s"%(pool,))
self.ceph_manager.expand_pool(pool, self.config.get('pool_grow_by', 10), self.max_pgs)
def fix_pgp_num(self):
pool = self.ceph_manager.get_pool()
self.log("fixing pg num pool %s"%(pool,))
self.ceph_manager.set_pool_pgpnum(pool)
def test_pool_min_size(self):
self.log("test_pool_min_size")
self.all_up()
self.ceph_manager.wait_for_recovery(
timeout=self.config.get('timeout')
)
the_one = random.choice(self.in_osds)
self.log("Killing everyone but %s", the_one)
to_kill = filter(lambda x: x != the_one, self.in_osds)
[self.kill_osd(i) for i in to_kill]
[self.out_osd(i) for i in to_kill]
time.sleep(self.config.get("test_pool_min_size_time", 10))
self.log("Killing %s" % (the_one,))
self.kill_osd(the_one)
self.out_osd(the_one)
self.log("Reviving everyone but %s" % (the_one,))
[self.revive_osd(i) for i in to_kill]
[self.in_osd(i) for i in to_kill]
self.log("Revived everyone but %s" % (the_one,))
self.log("Waiting for clean")
self.ceph_manager.wait_for_recovery(
timeout=self.config.get('timeout')
)
def inject_pause(self, conf_key, duration, check_after, should_be_down):
the_one = random.choice(self.live_osds)
self.log("inject_pause on {osd}".format(osd = the_one))
self.log(
"Testing {key} pause injection for duration {duration}".format(
key = conf_key,
duration = duration
))
self.log(
"Checking after {after}, should_be_down={shouldbedown}".format(
after = check_after,
shouldbedown = should_be_down
))
self.ceph_manager.set_config(the_one, **{conf_key:duration})
if not should_be_down:
return
time.sleep(check_after)
status = self.ceph_manager.get_osd_status()
assert the_one in status['down']
time.sleep(duration - check_after + 20)
status = self.ceph_manager.get_osd_status()
assert not the_one in status['down']
def choose_action(self):
chance_down = self.config.get('chance_down', 0.4)
chance_test_min_size = self.config.get('chance_test_min_size', 0)
if isinstance(chance_down, int):
chance_down = float(chance_down) / 100
minin = self.config.get("min_in", 2)
minout = self.config.get("min_out", 0)
minlive = self.config.get("min_live", 2)
mindead = self.config.get("min_dead", 0)
self.log('choose_action: min_in %d min_out %d min_live %d min_dead %d' %
(minin, minout, minlive, mindead))
actions = []
if len(self.in_osds) > minin:
actions.append((self.out_osd, 1.0,))
if len(self.live_osds) > minlive and chance_down > 0:
actions.append((self.kill_osd, chance_down,))
if len(self.out_osds) > minout:
actions.append((self.in_osd, 1.7,))
if len(self.dead_osds) > mindead:
actions.append((self.revive_osd, 1.0,))
actions.append((self.grow_pool, self.config.get('chance_pgnum_grow', 0),))
actions.append((self.fix_pgp_num, self.config.get('chance_pgpnum_fix', 0),))
actions.append((self.test_pool_min_size, chance_test_min_size,))
for key in ['heartbeat_inject_failure', 'filestore_inject_stall']:
for scenario in [
(lambda: self.inject_pause(key,
self.config.get('pause_short', 3),
0,
False),
self.config.get('chance_inject_pause_short', 1),),
(lambda: self.inject_pause(key,
self.config.get('pause_long', 80),
self.config.get('pause_check_after', 70),
True),
self.config.get('chance_inject_pause_long', 0),)]:
actions.append(scenario)
total = sum([y for (x,y) in actions])
val = random.uniform(0, total)
for (action, prob) in actions:
if val < prob:
return action
val -= prob
return None
def do_thrash(self):
2011-09-08 19:54:23 +00:00
cleanint = self.config.get("clean_interval", 60)
maxdead = self.config.get("max_dead", 0);
2011-09-08 19:54:23 +00:00
delay = self.config.get("op_delay", 5)
self.log("starting do_thrash")
while not self.stopping:
self.log(" ".join([str(x) for x in ["in_osds: ", self.in_osds, " out_osds: ", self.out_osds,
"dead_osds: ", self.dead_osds, "live_osds: ",
self.live_osds]]))
2011-09-08 19:54:23 +00:00
if random.uniform(0,1) < (float(delay) / cleanint):
while len(self.dead_osds) > maxdead:
self.revive_osd()
self.ceph_manager.wait_for_recovery(
timeout=self.config.get('timeout')
)
self.choose_action()()
time.sleep(delay)
self.all_up()
class CephManager:
def __init__(self, controller, ctx=None, logger=None):
self.lock = threading.RLock()
self.ctx = ctx
self.controller = controller
if (logger):
self.log = lambda x: logger.info(x)
else:
def tmp(x):
print x
self.log = tmp
self.pools = {}
self.pools['data'] = self.get_pool_property('data', 'pg_num')
def raw_cluster_cmd(self, *args):
ceph_args = [
'LD_LIBRARY_PRELOAD=/tmp/cephtest/binary/usr/local/lib',
'/tmp/cephtest/enable-coredump',
'/tmp/cephtest/binary/usr/local/bin/ceph-coverage',
'/tmp/cephtest/archive/coverage',
'/tmp/cephtest/binary/usr/local/bin/ceph',
'-k', '/tmp/cephtest/ceph.keyring',
'-c', '/tmp/cephtest/ceph.conf',
'--concise',
]
ceph_args.extend(args)
proc = self.controller.run(
args=ceph_args,
stdout=StringIO(),
)
return proc.stdout.getvalue()
def do_rados(self, remote, cmd):
pre = [
'LD_LIBRARY_PATH=/tmp/cephtest/binary/usr/local/lib',
'/tmp/cephtest/enable-coredump',
'/tmp/cephtest/binary/usr/local/bin/ceph-coverage',
'/tmp/cephtest/archive/coverage',
'/tmp/cephtest/binary/usr/local/bin/rados',
'-c', '/tmp/cephtest/ceph.conf',
];
pre.extend(cmd)
proc = remote.run(
args=pre,
wait=True,
)
return proc
def osd_admin_socket(self, osdnum, command):
remote = None
for _remote, roles_for_host in self.ctx.cluster.remotes.iteritems():
for id_ in teuthology.roles_of_type(roles_for_host, 'osd'):
if int(id_) == osdnum:
remote = _remote
assert remote is not None
args=[
'LD_LIBRARY_PRELOAD=/tmp/cephtest/binary/usr/local/lib',
'/tmp/cephtest/enable-coredump',
'/tmp/cephtest/binary/usr/local/bin/ceph-coverage',
'/tmp/cephtest/archive/coverage',
'/tmp/cephtest/binary/usr/local/bin/ceph',
'-k', '/tmp/cephtest/ceph.keyring',
'-c', '/tmp/cephtest/ceph.conf',
'--admin-daemon',
"/tmp/cephtest/asok.osd.%s"%(str(osdnum),)]
args.extend(command)
return remote.run(
args=args,
stdout=StringIO(),
wait=True,
)
def get_pg_primary(self, pool, pgnum):
"""
get primary for pool, pgnum (e.g. (data, 0)->0
"""
poolnum = self.get_pool_num(pool)
output = self.raw_cluster_cmd("pg", "dump", '--format=json')
j = json.loads('\n'.join(output.split('\n')[1:]))
pg_str = "%d.%d" % (poolnum, pgnum)
for pg in j['pg_stats']:
if pg['pgid'] == pg_str:
return int(pg['acting'][0])
assert False
def get_pool_num(self, pool):
"""
get number for pool (e.g., data -> 2)
"""
out = self.raw_cluster_cmd('--', 'osd','dump','--format=json')
j = json.loads('\n'.join(out.split('\n')[1:]))
for i in j['pools']:
if i['pool_name'] == pool:
return int(i['pool'])
assert False
def kick_recovery_wq(self, osdnum):
return self.raw_cluster_cmd(
'tell', "osd.%d" % (int(osdnum),),
'debug',
'kick_recovery_wq',
'0')
def set_config(self, osdnum, **argdict):
return self.raw_cluster_cmd(
'tell', "osd.%d" % (int(osdnum),),
'injectargs',
" ".join(
[("--" + conf.replace("_", "-") + " " + str(val)) for (conf,val) in
argdict.iteritems()]))
def raw_cluster_status(self):
return self.raw_cluster_cmd('-s')
2011-07-12 01:00:03 +00:00
def raw_osd_status(self):
return self.raw_cluster_cmd('osd', 'dump')
def get_osd_status(self):
osd_lines = filter(
lambda x: x.startswith('osd.') and (("up" in x) or ("down" in x)),
self.raw_osd_status().split('\n'))
self.log(osd_lines)
in_osds = [int(i[4:].split()[0]) for i in filter(
lambda x: " in " in x,
osd_lines)]
out_osds = [int(i[4:].split()[0]) for i in filter(
lambda x: " out " in x,
osd_lines)]
up_osds = [int(i[4:].split()[0]) for i in filter(
lambda x: " up " in x,
osd_lines)]
down_osds = [int(i[4:].split()[0]) for i in filter(
lambda x: " down " in x,
osd_lines)]
dead_osds = [int(x.id_) for x in
filter(lambda x: not x.running(), self.ctx.daemons.iter_daemons_of_role('osd'))]
live_osds = [int(x.id_) for x in
filter(lambda x: x.running(), self.ctx.daemons.iter_daemons_of_role('osd'))]
2011-07-12 01:00:03 +00:00
return { 'in' : in_osds, 'out' : out_osds, 'up' : up_osds,
'down' : down_osds, 'dead' : dead_osds, 'live' : live_osds, 'raw' : osd_lines }
def get_num_pgs(self):
status = self.raw_cluster_status()
self.log(status)
return int(re.search(
"\d* pgs:",
status).group(0).split()[0])
def create_pool(self, pool_name, pg_num=1):
with self.lock:
assert isinstance(pool_name, str)
assert isinstance(pg_num, int)
assert pool_name not in self.pools
self.log("creating pool_name %s"%(pool_name,))
self.raw_cluster_cmd('osd', 'pool', 'create', pool_name, str(pg_num))
self.pools[pool_name] = pg_num
def remove_pool(self, pool_name):
with self.lock:
assert isinstance(pool_name, str)
assert pool_name in self.pools
self.log("creating pool_name %s"%(pool_name,))
del self.pools[pool_name]
self.do_rados(
self.controller,
['rmpool', pool_name, pool_name, "--yes-i-really-really-mean-it"]
)
def get_pool(self):
with self.lock:
return random.choice(self.pools.keys());
def get_pool_pg_num(self, pool_name):
with self.lock:
assert isinstance(pool_name, str)
if pool_name in self.pools:
return self.pools[pool_name]
return 0;
def get_pool_property(self, pool_name, prop):
with self.lock:
assert isinstance(pool_name, str)
assert isinstance(prop, str)
output = self.raw_cluster_cmd(
'osd',
'pool',
'get',
pool_name,
prop)
return int(output.split()[1])
def set_pool_property(self, pool_name, prop, val):
with self.lock:
assert isinstance(pool_name, str)
assert isinstance(prop, str)
assert isinstance(val, int)
self.raw_cluster_cmd(
'osd',
'pool',
'set',
pool_name,
prop,
str(val),
'--allow-experimental-feature')
def expand_pool(self, pool_name, by, max_pgs):
with self.lock:
assert isinstance(pool_name, str)
assert isinstance(by, int)
assert pool_name in self.pools
if self.get_num_creating() > 0:
return
if (self.pools[pool_name] + by) > max_pgs:
return
self.log("increase pool size by %d"%(by,))
new_pg_num = self.pools[pool_name] + by
self.set_pool_property(pool_name, "pg_num", new_pg_num)
self.pools[pool_name] = new_pg_num
def set_pool_pgpnum(self, pool_name):
with self.lock:
assert isinstance(pool_name, str)
assert pool_name in self.pools
if self.get_num_creating() > 0:
return
self.set_pool_property(pool_name, 'pgp_num', self.pools[pool_name])
def list_pg_missing(self, pgid):
r = None
offset = {}
while True:
out = self.raw_cluster_cmd('--', 'pg',pgid,'list_missing',
json.dumps(offset))
j = json.loads('\n'.join(out.split('\n')[1:]))
if r is None:
r = j
else:
r['objects'].extend(j['objects'])
if not 'more' in j:
break
if j['more'] == 0:
break
offset = j['objects'][-1]['oid']
if 'more' in r:
del r['more']
return r
def get_pg_stats(self):
out = self.raw_cluster_cmd('--', 'pg','dump','--format=json')
j = json.loads('\n'.join(out.split('\n')[1:]))
return j['pg_stats']
def get_single_pg_stats(self, pgid):
all_stats = self.get_pg_stats()
for pg in all_stats:
if pg['pgid'] == pgid:
return pg
return None
def get_osd_dump(self):
out = self.raw_cluster_cmd('--', 'osd','dump','--format=json')
j = json.loads('\n'.join(out.split('\n')[1:]))
return j['osds']
def get_stuck_pgs(self, type_, threshold):
out = self.raw_cluster_cmd('--', 'pg','dump_stuck', type_,
'--format=json', '-t', str(threshold))
return json.loads('\n'.join(out.split('\n')[1:]))
def get_num_unfound_objects(self):
status = self.raw_cluster_status()
self.log(status)
match = re.search(
"\d+/\d+ unfound",
status)
if match == None:
return 0
else:
return int(match.group(0).split('/')[0])
def get_num_creating(self):
pgs = self.get_pg_stats()
num = 0
for pg in pgs:
if 'creating' in pg['state']:
num += 1
return num
def get_num_active_clean(self):
pgs = self.get_pg_stats()
num = 0
for pg in pgs:
if pg['state'].count('active') and pg['state'].count('clean') and not pg['state'].count('stale'):
num += 1
return num
def get_num_active_recovered(self):
pgs = self.get_pg_stats()
num = 0
for pg in pgs:
if pg['state'].count('active') and not pg['state'].count('recover') and not pg['state'].count('backfill') and not pg['state'].count('stale'):
num += 1
return num
def get_num_active(self):
pgs = self.get_pg_stats()
num = 0
for pg in pgs:
if pg['state'].count('active') and not pg['state'].count('stale'):
num += 1
return num
def get_num_down(self):
pgs = self.get_pg_stats()
num = 0
for pg in pgs:
if (pg['state'].count('down') and not pg['state'].count('stale')) or \
(pg['state'].count('incomplete') and not pg['state'].count('stale')):
num += 1
return num
def get_num_active_down(self):
pgs = self.get_pg_stats()
num = 0
for pg in pgs:
if (pg['state'].count('active') and not pg['state'].count('stale')) or \
(pg['state'].count('down') and not pg['state'].count('stale')) or \
(pg['state'].count('incomplete') and not pg['state'].count('stale')):
num += 1
return num
def is_clean(self):
return self.get_num_active_clean() == self.get_num_pgs()
def is_recovered(self):
return self.get_num_active_recovered() == self.get_num_pgs()
def is_active_or_down(self):
return self.get_num_active_down() == self.get_num_pgs()
def wait_for_clean(self, timeout=None):
self.log("waiting for clean")
start = time.time()
num_active_clean = self.get_num_active_clean()
while not self.is_clean():
if timeout is not None:
assert time.time() - start < timeout, \
'failed to become clean before timeout expired'
cur_active_clean = self.get_num_active_clean()
if cur_active_clean != num_active_clean:
start = time.time()
num_active_clean = cur_active_clean
time.sleep(3)
self.log("clean!")
def wait_for_recovery(self, timeout=None):
self.log("waiting for recovery to complete")
start = time.time()
num_active_recovered = self.get_num_active_recovered()
while not self.is_recovered():
if timeout is not None:
assert time.time() - start < timeout, \
'failed to recover before timeout expired'
cur_active_recovered = self.get_num_active_recovered()
if cur_active_recovered != num_active_recovered:
start = time.time()
num_active_recovered = cur_active_recovered
time.sleep(3)
self.log("recovered!")
2012-07-28 17:22:13 +00:00
def wait_for_active(self, timeout=None):
self.log("waiting for peering to complete")
start = time.time()
num_active = self.get_num_active()
while not self.is_active():
if timeout is not None:
assert time.time() - start < timeout, \
'failed to recover before timeout expired'
cur_active = self.get_num_active()
if cur_active != num_active:
start = time.time()
num_active = cur_active
time.sleep(3)
self.log("active!")
def wait_for_active_or_down(self, timeout=None):
self.log("waiting for peering to complete or become blocked")
start = time.time()
num_active_down = self.get_num_active_down()
while not self.is_active_or_down():
if timeout is not None:
assert time.time() - start < timeout, \
'failed to recover before timeout expired'
cur_active_down = self.get_num_active_down()
if cur_active_down != num_active_down:
start = time.time()
num_active_down = cur_active_down
time.sleep(3)
self.log("active or down!")
def osd_is_up(self, osd):
osds = self.get_osd_dump()
return osds[osd]['up'] > 0
def wait_till_osd_is_up(self, osd, timeout=None):
self.log('waiting for osd.%d to be up' % osd);
start = time.time()
while not self.osd_is_up(osd):
if timeout is not None:
assert time.time() - start < timeout, \
'osd.%d failed to come up before timeout expired' % osd
time.sleep(3)
self.log('osd.%d is up' % osd)
def is_active(self):
return self.get_num_active() == self.get_num_pgs()
def wait_till_active(self, timeout=None):
self.log("waiting till active")
start = time.time()
while not self.is_active():
if timeout is not None:
assert time.time() - start < timeout, \
'failed to become active before timeout expired'
time.sleep(3)
self.log("active!")
def mark_out_osd(self, osd):
self.raw_cluster_cmd('osd', 'out', str(osd))
def kill_osd(self, osd):
self.ctx.daemons.get_daemon('osd', osd).stop()
def blackhole_kill_osd(self, osd):
self.raw_cluster_cmd('--', 'tell', 'osd.%d' % osd,
'injectargs', '--filestore-blackhole')
time.sleep(2)
self.ctx.daemons.get_daemon('osd', osd).stop()
def revive_osd(self, osd):
self.ctx.daemons.get_daemon('osd', osd).restart()
def mark_down_osd(self, osd):
self.raw_cluster_cmd('osd', 'down', str(osd))
def mark_in_osd(self, osd):
self.raw_cluster_cmd('osd', 'in', str(osd))
2011-11-09 06:02:58 +00:00
## monitors
def kill_mon(self, mon):
self.ctx.daemons.get_daemon('mon', mon).stop()
def revive_mon(self, mon):
self.ctx.daemons.get_daemon('mon', mon).restart()
def get_mon_status(self, mon):
addr = self.ctx.ceph.conf['mon.%s' % mon]['mon addr']
out = self.raw_cluster_cmd('-m', addr, 'mon_status')
return json.loads(out)
def get_mon_quorum(self):
out = self.raw_cluster_cmd('quorum_status')
j = json.loads(out)
2011-11-17 21:52:17 +00:00
self.log('quorum_status is %s' % out)
2011-11-09 06:02:58 +00:00
return j['quorum']
def wait_for_mon_quorum_size(self, size, timeout=300):
2011-11-09 06:02:58 +00:00
self.log('waiting for quorum size %d' % size)
start = time.time()
while not len(self.get_mon_quorum()) == size:
if timeout is not None:
assert time.time() - start < timeout, \
'failed to reach quorum size %d before timeout expired' % size
time.sleep(3)
self.log("quorum is size %d" % size)
def get_mon_health(self, debug=False):
out = self.raw_cluster_cmd('health', '--format=json')
if debug:
self.log('health:\n{h}'.format(h=out))
return json.loads(out)
## metadata servers
def kill_mds(self, mds):
self.ctx.daemons.get_daemon('mds', mds).stop()
def kill_mds_by_rank(self, rank):
status = self.get_mds_status_by_rank(rank)
self.ctx.daemons.get_daemon('mds', status['name']).stop()
def revive_mds(self, mds, standby_for_rank=None):
args = []
if standby_for_rank:
args.extend(['--hot-standby', standby_for_rank])
self.ctx.daemons.get_daemon('mds', mds).restart(*args)
def revive_mds_by_rank(self, rank, standby_for_rank=None):
args = []
if standby_for_rank:
args.extend(['--hot-standby', standby_for_rank])
status = self.get_mds_status_by_rank(rank)
self.ctx.daemons.get_daemon('mds', status['name']).restart(*args)
def get_mds_status(self, mds):
out = self.raw_cluster_cmd('mds', 'dump', '--format=json')
j = json.loads(' '.join(out.splitlines()[1:]))
# collate; for dup ids, larger gid wins.
for info in j['info'].itervalues():
if info['name'] == mds:
return info
return None
def get_mds_status_by_rank(self, rank):
out = self.raw_cluster_cmd('mds', 'dump', '--format=json')
j = json.loads(' '.join(out.splitlines()[1:]))
# collate; for dup ids, larger gid wins.
for info in j['info'].itervalues():
if info['rank'] == rank:
return info
return None
def get_mds_status_all(self):
out = self.raw_cluster_cmd('mds', 'dump', '--format=json')
j = json.loads(' '.join(out.splitlines()[1:]))
return j