import random import time import re import gevent from orchestra import run class Thrasher(gevent.Greenlet): def __init__(self, manager, logger = None): self.ceph_manager = manager self.ceph_manager.wait_till_clean() osd_status = self.ceph_manager.get_osd_status() self.in_osds = osd_status['in'] self.out_osds = osd_status['out'] self.stopping = False self.logger = logger if self.logger != None: self.log = lambda x: self.logger.info(x) else: def tmp(x): print x self.log = tmp gevent.Greenlet.__init__(self, self.do_thrash) self.start() def wait_till_clean(self): self.log("Waiting until clean") while not self.ceph_manager.is_clean(): time.sleep(3) print "..." self.log("Clean!") def remove_osd(self): osd = random.choice(self.in_osds) self.log("Removing osd %s"%(str(osd),)) self.in_osds.remove(osd) self.out_osds.append(osd) self.ceph_manager.mark_out_osd(osd) def add_osd(self): osd = random.choice(self.out_osds) self.log("Adding osd %s"%(str(osd),)) self.out_osds.remove(osd) self.in_osds.append(osd) self.ceph_manager.mark_in_osd(osd) def all_up(self): while len(self.out_osds) > 0: self.add_osd() def do_join(self): self.stopping = True self.get() def do_thrash(self): CLEANINT=60 DELAY=5 self.log("starting do_thrash") while not self.stopping: self.log(" ".join([str(x) for x in ["in_osds: ", self.in_osds, " out_osds: ", self.out_osds]])) if random.uniform(0,1) < (float(DELAY)/CLEANINT): self.wait_till_clean() if (len(self.out_osds) == 0): self.remove_osd() elif (len(self.in_osds) <= 2): self.add_osd() else: x = random.choice([self.remove_osd, self.add_osd]) x() time.sleep(DELAY) class CephManager: def __init__(self, controller, logger=None): self.controller = controller if (logger): self.log = lambda x: logger.info(x) else: def tmp(x): print x self.log = tmp def raw_cluster_cmd(self, suffix): proc = self.controller.run( args=[ "/bin/sh", "-c", " ".join([ "LD_LIBRARY_PRELOAD=/tmp/cephtest/binary/usr/local/lib", "/tmp/cephtest/binary/usr/local/bin/ceph -k /tmp/cephtest/ceph.keyring -c "+\ "/tmp/cephtest/ceph.conf " + suffix ]) ], stdout=run.PIPE, wait=False ) out = "" tmp = proc.stdout.read(1) while tmp: out += tmp tmp = proc.stdout.read(1) return out def raw_cluster_status(self): return self.raw_cluster_cmd("-s") def raw_osd_status(self): return self.raw_cluster_cmd("osd dump -o -") def raw_pg_status(self): return self.controller.do_ssh("pg dump -o -") def get_osd_status(self): osd_lines = filter( lambda x: x[:3] == 'osd' and (("up" in x) or ("down" in x)), self.raw_osd_status().split('\n')) self.log(osd_lines) in_osds = [int(i[3:].split()[0]) for i in filter( lambda x: " in " in x, osd_lines)] out_osds = [int(i[3:].split()[0]) for i in filter( lambda x: " out " in x, osd_lines)] up_osds = [int(i[3:].split()[0]) for i in filter( lambda x: " up " in x, osd_lines)] down_osds = [int(i[3:].split()[0]) for i in filter( lambda x: " down " in x, osd_lines)] return { 'in' : in_osds, 'out' : out_osds, 'up' : up_osds, 'down' : down_osds, 'raw' : osd_lines } def get_num_pgs(self): status = self.raw_cluster_status() return int(re.search( "\d* pgs:", status).group(0).split()[0]) def get_num_active_clean(self): status = self.raw_cluster_status() self.log(status) match = re.search( "\d* active.clean", status) if match == None: return 0 else: return int(match.group(0).split()[0]) def is_clean(self): return self.get_num_active_clean() == self.get_num_pgs() def wait_till_clean(self): self.log("waiting till clean") while not self.is_clean(): time.sleep(3) self.log("clean!") def mark_out_osd(self, osd): self.raw_cluster_cmd("osd out %s"%(str(osd,))) def mark_in_osd(self, osd): self.raw_cluster_cmd("osd in %s"%(str(osd,)))