# ceph/teuthology/task/ceph_manager.py
# 2011-07-11 18:07:37 -07:00
#
# 162 lines
# 4.9 KiB
# Python

import random
import time
import re
import gevent
from orchestra import run
class Thrasher(gevent.Greenlet):
    """Greenlet that randomly marks OSDs out of and back into the cluster.

    Construction waits for the cluster to become clean, records which OSDs
    are currently in and out, and starts the greenlet.  ``do_thrash`` then
    loops until ``do_join`` is called.
    """

    def __init__(self, manager, logger=None):
        """Record the current OSD state and start thrashing.

        manager -- object providing wait_till_clean(), get_osd_status(),
                   is_clean(), mark_out_osd() and mark_in_osd().
        logger  -- optional object with an info() method; when omitted,
                   messages are printed to stdout instead.
        """
        self.ceph_manager = manager
        self.ceph_manager.wait_till_clean()
        osd_status = self.ceph_manager.get_osd_status()
        self.in_osds = osd_status['in']
        self.out_osds = osd_status['out']
        self.stopping = False
        self.logger = logger
        if self.logger is not None:
            self.log = lambda message: self.logger.info(message)
        else:
            def print_message(message):
                # Single-argument form behaves the same as a print statement.
                print(message)
            self.log = print_message
        gevent.Greenlet.__init__(self, self.do_thrash)
        self.start()

    def wait_till_clean(self):
        """Poll the manager every 3 seconds until it reports clean."""
        # NOTE(review): time.sleep only yields to other greenlets if gevent's
        # monkey patching is active in the caller - confirm.
        self.log("Waiting until clean")
        while not self.ceph_manager.is_clean():
            time.sleep(3)
            # Progress marker goes through the configured log sink rather
            # than straight to stdout.
            self.log("...")
        self.log("Clean!")

    def remove_osd(self):
        """Pick a random in OSD and mark it out.

        Raises IndexError (from random.choice) when no OSD is in.
        """
        osd = random.choice(self.in_osds)
        self.log("Removing osd %s" % (osd,))
        self.in_osds.remove(osd)
        self.out_osds.append(osd)
        self.ceph_manager.mark_out_osd(osd)

    def add_osd(self):
        """Pick a random out OSD and mark it in.

        Raises IndexError (from random.choice) when no OSD is out.
        """
        osd = random.choice(self.out_osds)
        self.log("Adding osd %s" % (osd,))
        self.out_osds.remove(osd)
        self.in_osds.append(osd)
        self.ceph_manager.mark_in_osd(osd)

    def all_up(self):
        """Mark every OSD that is currently out back in."""
        while self.out_osds:
            self.add_osd()

    def do_join(self):
        """Ask the thrash loop to stop, then wait for the greenlet via get()."""
        self.stopping = True
        self.get()

    def do_thrash(self):
        """Greenlet body: repeatedly remove or add one OSD until stopped."""
        CLEANINT = 60
        DELAY = 5
        self.log("starting do_thrash")
        while not self.stopping:
            self.log("in_osds:  %s  out_osds:  %s"
                     % (self.in_osds, self.out_osds))
            # Each iteration waits for a clean cluster with probability
            # DELAY/CLEANINT.
            if random.uniform(0, 1) < (float(DELAY) / CLEANINT):
                self.wait_till_clean()
            if not self.out_osds:
                # Nothing is out, so the only possible move is a removal.
                self.remove_osd()
            elif len(self.in_osds) <= 2:
                # Two or fewer OSDs are in: bring one back instead.
                self.add_osd()
            else:
                action = random.choice([self.remove_osd, self.add_osd])
                action()
            time.sleep(DELAY)
class CephManager(object):
    """Runs ceph command-line queries through a controller and parses them."""

    def __init__(self, controller, logger=None):
        """Store the controller and choose where log messages go.

        controller -- object whose run() method executes a command and
                      returns a process object exposing ``stdout``.
        logger     -- optional object with an info() method; when omitted,
                      messages are printed to stdout instead.
        """
        self.controller = controller
        if logger is not None:
            self.log = lambda message: logger.info(message)
        else:
            def print_message(message):
                # Single-argument form behaves the same as a print statement.
                print(message)
            self.log = print_message

    def raw_cluster_cmd(self, suffix):
        """Run the ceph tool with *suffix* appended and return its stdout.

        suffix is pasted verbatim into a ``/bin/sh -c`` command line, so it
        must already be shell-safe.
        """
        # NOTE(review): LD_LIBRARY_PRELOAD is not a variable the dynamic
        # linker documents (LD_LIBRARY_PATH and LD_PRELOAD are).  Left
        # unchanged here - confirm what was intended.
        command = " ".join([
            "LD_LIBRARY_PRELOAD=/tmp/cephtest/binary/usr/local/lib",
            "/tmp/cephtest/binary/usr/local/bin/ceph"
            " -k /tmp/cephtest/ceph.keyring -c "
            "/tmp/cephtest/ceph.conf " + suffix,
        ])
        # NOTE(review): the command is started with wait=False and its exit
        # status is never inspected in this method.
        proc = self.controller.run(
            args=["/bin/sh", "-c", command],
            stdout=run.PIPE,
            wait=False
            )
        # Read in chunks until EOF instead of one byte at a time.
        chunks = []
        chunk = proc.stdout.read(4096)
        while chunk:
            chunks.append(chunk)
            chunk = proc.stdout.read(4096)
        return "".join(chunks)

    def raw_cluster_status(self):
        """Return the raw output of ``ceph -s``."""
        return self.raw_cluster_cmd("-s")

    def raw_osd_status(self):
        """Return the raw output of ``ceph osd dump -o -``."""
        return self.raw_cluster_cmd("osd dump -o -")

    def raw_pg_status(self):
        """Return the raw output of ``ceph pg dump -o -``."""
        # Goes through raw_cluster_cmd like the other raw_* queries.
        return self.raw_cluster_cmd("pg dump -o -")

    def get_osd_status(self):
        """Parse the OSD dump into lists of OSD ids.

        Returns a dict with keys 'in', 'out', 'up' and 'down' (lists of int
        ids) and 'raw' (the dump lines that were considered).  Only lines
        starting with 'osd' that mention "up" or "down" are considered; the
        id is the first whitespace-separated token after the 'osd' prefix.
        """
        osd_lines = [
            line for line in self.raw_osd_status().split('\n')
            if line.startswith('osd') and ("up" in line or "down" in line)
        ]
        self.log(osd_lines)

        def ids_with(marker):
            # Ids of the considered lines that contain *marker*.
            return [int(line[3:].split()[0])
                    for line in osd_lines if marker in line]

        return {
            'in': ids_with(" in "),
            'out': ids_with(" out "),
            'up': ids_with(" up "),
            'down': ids_with(" down "),
            'raw': osd_lines,
        }

    def get_num_pgs(self):
        """Return the number in front of "pgs:" in the cluster status.

        Raises AttributeError when the status contains no such count.
        """
        status = self.raw_cluster_status()
        # Require at least one digit so a bare " pgs:" earlier in the text
        # cannot be picked up as the match.
        return int(re.search(r"(\d+) pgs:", status).group(1))

    def get_num_active_clean(self):
        """Return the count in front of "active+clean", or 0 if absent."""
        status = self.raw_cluster_status()
        self.log(status)
        match = re.search(r"(\d+) active.clean", status)
        if match is None:
            return 0
        return int(match.group(1))

    def is_clean(self):
        """Return True when the active+clean count equals the pg count."""
        return self.get_num_active_clean() == self.get_num_pgs()

    def wait_till_clean(self):
        """Poll is_clean() every 3 seconds until it returns True."""
        self.log("waiting till clean")
        while not self.is_clean():
            time.sleep(3)
        self.log("clean!")

    def mark_out_osd(self, osd):
        """Issue ``ceph osd out <osd>``."""
        self.raw_cluster_cmd("osd out %s" % (osd,))

    def mark_in_osd(self, osd):
        """Issue ``ceph osd in <osd>``."""
        self.raw_cluster_cmd("osd in %s" % (osd,))