mirror of
https://github.com/ceph/ceph
synced 2025-01-15 07:23:16 +00:00
77bf36fa5a
These tools are now available in the $PATH so it's no longer necessary to remove them. Signed-off-by: Patrick Donnelly <pdonnell@redhat.com>
3236 lines
121 KiB
Python
3236 lines
121 KiB
Python
"""
|
|
ceph manager -- Thrasher and CephManager objects
|
|
"""
|
|
from functools import wraps
|
|
import contextlib
|
|
import errno
|
|
import random
|
|
import signal
|
|
import time
|
|
import gevent
|
|
import base64
|
|
import json
|
|
import logging
|
|
import threading
|
|
import traceback
|
|
import os
|
|
import shlex
|
|
|
|
from io import BytesIO, StringIO
|
|
from subprocess import DEVNULL
|
|
from teuthology import misc as teuthology
|
|
from tasks.scrub import Scrubber
|
|
from tasks.util.rados import cmd_erasure_code_profile
|
|
from tasks.util import get_remote
|
|
|
|
from teuthology.contextutil import safe_while
|
|
from teuthology.orchestra.remote import Remote
|
|
from teuthology.orchestra import run
|
|
from teuthology.parallel import parallel
|
|
from teuthology.exceptions import CommandFailedError
|
|
from tasks.thrasher import Thrasher
|
|
|
|
|
|
# Path the cluster configuration is written to when no other path is given.
DEFAULT_CONF_PATH = '/etc/ceph/ceph.conf'

# Module-level logger used by the free functions in this file.
log = logging.getLogger(__name__)
|
|
|
|
# this is for cephadm clusters
|
|
def shell(ctx, cluster_name, remote, args, name=None, **kwargs):
    """
    Run a command through ``cephadm shell`` on a remote.

    :param ctx: Context; provides ``ctx.cephadm`` and ``ctx.ceph[cluster_name]``
    :param cluster_name: name of the cluster whose image and fsid are used
    :param remote: Remote on which the command is run
    :param args: list with the command to run after the ``--`` separator
    :param name: optional value passed to ``cephadm shell`` via ``-n``
    :param kwargs: forwarded unchanged to ``remote.run``
    :returns: whatever ``remote.run`` returns
    """
    cluster = ctx.ceph[cluster_name]
    cmd = ['sudo', ctx.cephadm, '--image', cluster.image, 'shell']
    if name:
        cmd += ['-n', name]
    cmd += ['--fsid', cluster.fsid, '--']
    return remote.run(args=cmd + args, **kwargs)
|
|
|
|
# this is for rook clusters
|
|
def toolbox(ctx, cluster_name, args, **kwargs):
    """
    Run a command through ``kubectl exec`` in the rook toolbox.

    :param ctx: Context; provides ``ctx.rook[cluster_name]``
    :param cluster_name: name of the rook cluster
    :param args: list with the command to run after the ``--`` separator
    :param kwargs: forwarded unchanged to the remote's ``run``
    :returns: whatever the remote's ``run`` returns
    """
    rook = ctx.rook[cluster_name]
    kubectl_exec = ['kubectl', '-n', 'rook-ceph', 'exec', rook.toolbox, '--']
    return rook.remote.run(args=kubectl_exec + args, **kwargs)
|
|
|
|
|
|
def write_conf(ctx, conf_path=DEFAULT_CONF_PATH, cluster='ceph'):
    """
    Write the cluster's configuration to ``conf_path`` on every remote.

    The configuration object held in ``ctx.ceph[cluster].conf`` is serialized
    once and fed to the stdin of a ``sudo tee`` started on each remote of
    ``ctx.cluster``; this call returns after all of those commands finished.

    :param ctx: Context
    :param conf_path: destination file on the remotes
    :param cluster: name of ceph cluster
    """
    conf_fp = BytesIO()
    ctx.ceph[cluster].conf.write(conf_fp)
    conf_fp.seek(0)
    writes = ctx.cluster.run(
        args=[
            'sudo', 'mkdir', '-p', '/etc/ceph', run.Raw('&&'),
            'sudo', 'chmod', '0755', '/etc/ceph', run.Raw('&&'),
            # The redirect belongs to tee: it is the only command here that
            # writes to stdout (it echoes the whole conf it receives).
            'sudo', 'tee', conf_path, run.Raw('>'), '/dev/null',
            run.Raw('&&'),
            'sudo', 'chmod', '0644', conf_path,
        ],
        stdin=run.PIPE,
        wait=False)
    teuthology.feed_many_stdins_and_close(conf_fp, writes)
    run.wait(writes)
|
|
|
|
def get_valgrind_args(testdir, name, preamble, v, exit_on_first_error=True, cd=True):
    """
    Build a command line for running valgrind.

    testdir - test results directory
    name - name of daemon (for naming the log file)
    preamble - stuff we should run before valgrind
    v - valgrind arguments (a single argument or a list); when None the
        preamble is returned as is and valgrind is not used
    exit_on_first_error - add the options that make valgrind stop with exit
        code 42 on the first error
    cd - prefix the command with ``cd testdir &&``

    The ``preamble`` list passed in is not modified.
    """
    if v is None:
        return preamble
    if not isinstance(v, list):
        v = [v]

    # https://tracker.ceph.com/issues/44362
    # Build a new list instead of extending the caller's preamble in place.
    prefix = list(preamble) + ['env', 'OPENSSL_ia32cap=~0x1000000000000000']

    val_path = '/var/log/ceph/valgrind'
    suppressions = '--suppressions={tdir}/valgrind.supp'.format(tdir=testdir)
    extra_args = [
        'valgrind',
        '--trace-children=no',
        '--child-silent-after-fork=yes',
        '--soname-synonyms=somalloc=*tcmalloc*',
    ]
    if '--tool=memcheck' in v or '--tool=helgrind' in v:
        # memcheck/helgrind results are written as XML
        extra_args += [
            '--num-callers=50',
            suppressions,
            '--xml=yes',
            '--xml-file={vdir}/{n}.log'.format(vdir=val_path, n=name),
        ]
    else:
        extra_args += [
            suppressions,
            '--log-file={vdir}/{n}.log'.format(vdir=val_path, n=name),
        ]
    extra_args += [
        '--time-stamp=yes',
        '--vgdb=yes',
    ]
    if exit_on_first_error:
        extra_args += [
            # at least Valgrind 3.14 is required
            '--exit-on-first-error=yes',
            '--error-exitcode=42',
        ]

    args = []
    if cd:
        args += ['cd', testdir, run.Raw('&&')]
    args += prefix + extra_args + v
    log.debug('running %s under valgrind with args %s', name, args)
    return args
|
|
|
|
|
|
def mount_osd_data(ctx, remote, cluster, osd):
    """
    Mount a remote OSD

    Nothing is mounted when ``ctx.disk_config`` has no device recorded for
    this remote, or none for this OSD's role on it.

    :param ctx: Context
    :param remote: Remote site
    :param cluster: name of ceph cluster
    :param osd: Osd name
    """
    log.debug('Mounting data for osd.{o} on {r}'.format(o=osd, r=remote))
    role = "{0}.osd.{1}".format(cluster, osd)
    # for the default cluster name the role may be recorded without prefix
    alt_role = role if cluster != 'ceph' else "osd.{0}".format(osd)
    disk_config = ctx.disk_config
    if remote not in disk_config.remote_to_roles_to_dev:
        return
    roles_to_dev = disk_config.remote_to_roles_to_dev[remote]
    if alt_role in roles_to_dev:
        role = alt_role
    if role not in roles_to_dev:
        return

    dev = roles_to_dev[role]
    mount_options = disk_config.remote_to_roles_to_dev_mount_options[remote][role]
    fstype = disk_config.remote_to_roles_to_dev_fstype[remote][role]
    mnt = os.path.join('/var/lib/ceph/osd', '{0}-{1}'.format(cluster, osd))

    log.info('Mounting osd.{o}: remote: {r}, dev: {d}, cluster: {c}, '
             'mountpoint: {p}, type: {t}, options: {v}'.format(
                 o=osd, r=remote.name, d=dev, c=cluster, p=mnt, t=fstype,
                 v=mount_options))

    remote.run(
        args=[
            'sudo',
            'mount',
            '-t', fstype,
            '-o', ','.join(mount_options),
            dev,
            mnt,
        ]
    )
|
|
|
|
|
|
def log_exc(func):
    """
    Decorator for methods taking only ``self``: when the wrapped method
    raises, the formatted traceback is passed to ``self.log`` and the
    exception is re-raised unchanged.
    """
    @wraps(func)
    def wrapper(self):
        try:
            result = func(self)
        except BaseException:
            self.log(traceback.format_exc())
            raise
        return result
    return wrapper
|
|
|
|
|
|
class PoolType:
    """
    Numeric pool type codes, compared against the ``type`` field of the
    pool entries in the OSD dump.
    """
    REPLICATED, ERASURE_CODED = 1, 3
|
|
|
|
|
|
class OSDThrasher(Thrasher):
|
|
"""
|
|
Object used to thrash Ceph
|
|
"""
|
|
def __init__(self, manager, config, name, logger):
    """
    Wait for the cluster to be clean, record the OSD status, read the
    thrashing options from config and spawn the thrashing greenlets.

    :param manager: ceph manager object used to talk to the cluster
    :param config: dict of thrasher options; None is treated as empty
    :param name: name of this thrasher
    :param logger: logger used by self.log
    """
    super(OSDThrasher, self).__init__()

    # normalize before the first config.get() below
    if config is None:
        config = dict()

    self.ceph_manager = manager
    self.cluster = manager.cluster
    self.ceph_manager.wait_for_clean()
    osd_status = self.ceph_manager.get_osd_status()
    self.in_osds = osd_status['in']
    self.live_osds = osd_status['live']
    self.out_osds = osd_status['out']
    self.dead_osds = osd_status['dead']
    self.stopping = False
    self.logger = logger
    self.config = config
    self.name = name
    self.revive_timeout = self.config.get("revive_timeout", 360)
    self.pools_to_fix_pgp_num = set()
    if self.config.get('powercycle'):
        self.revive_timeout += 120
    self.clean_wait = self.config.get('clean_wait', 0)
    self.minin = self.config.get("min_in", 4)
    self.chance_move_pg = self.config.get('chance_move_pg', 1.0)
    self.sighup_delay = self.config.get('sighup_delay')
    self.optrack_toggle_delay = self.config.get('optrack_toggle_delay')
    self.dump_ops_enable = self.config.get('dump_ops_enable')
    self.noscrub_toggle_delay = self.config.get('noscrub_toggle_delay')
    self.chance_thrash_cluster_full = self.config.get('chance_thrash_cluster_full', .05)
    self.chance_thrash_pg_upmap = self.config.get('chance_thrash_pg_upmap', 1.0)
    # Own key for the items variant; falls back to chance_thrash_pg_upmap,
    # which is the value this attribute always had before.
    self.chance_thrash_pg_upmap_items = self.config.get(
        'chance_thrash_pg_upmap_items', self.chance_thrash_pg_upmap)
    self.random_eio = self.config.get('random_eio')
    self.chance_force_recovery = self.config.get('chance_force_recovery', 0.3)

    num_osds = self.in_osds + self.out_osds
    self.max_pgs = self.config.get("max_pgs_per_pool_osd", 1200) * len(num_osds)
    self.min_pgs = self.config.get("min_pgs_per_pool_osd", 1) * len(num_osds)
    # prevent monitor from auto-marking things out while thrasher runs
    # try both old and new tell syntax, in case we are testing old code
    self.saved_options = []
    # assuming that the default settings do not vary from one daemon to
    # another
    first_mon = teuthology.get_first_mon(manager.ctx, self.config).split('.')
    opts = [('mon', 'mon_osd_down_out_interval', 0)]
    # why do we disable marking an OSD out automatically? :/
    for service, opt, new_value in opts:
        old_value = manager.get_config(first_mon[0],
                                       first_mon[1],
                                       opt)
        self.saved_options.append((service, opt, old_value))
        manager.inject_args(service, '*', opt, new_value)
    # initialize ceph_objectstore_tool property - must be done before
    # do_thrash is spawned - http://tracker.ceph.com/issues/18799
    # The checks run in the same order as before, so the remotes are only
    # probed when powercycle is not set.
    if self.config.get('powercycle'):
        self.ceph_objectstore_tool = False
        self.log("Unable to test ceph-objectstore-tool, "
                 "powercycle testing")
    elif not self.cmd_exists_on_osds("ceph-objectstore-tool"):
        self.ceph_objectstore_tool = False
        self.log("Unable to test ceph-objectstore-tool, "
                 "not available on all OSD nodes")
    elif self.config.get('disable_objectstore_tool_tests', False):
        self.ceph_objectstore_tool = False
        self.log("Not testing ceph-objectstore-tool, "
                 "disabled by disable_objectstore_tool_tests")
    else:
        self.ceph_objectstore_tool = \
            self.config.get('ceph_objectstore_tool', True)
    # spawn do_thrash
    self.thread = gevent.spawn(self.do_thrash)
    if self.sighup_delay:
        self.sighup_thread = gevent.spawn(self.do_sighup)
    if self.optrack_toggle_delay:
        self.optrack_toggle_thread = gevent.spawn(self.do_optrack_toggle)
    # dump_ops_enable is compared against the string "true"
    if self.dump_ops_enable == "true":
        self.dump_ops_thread = gevent.spawn(self.do_dump_ops)
    if self.noscrub_toggle_delay:
        self.noscrub_toggle_thread = gevent.spawn(self.do_noscrub_toggle)
|
|
|
|
def log(self, msg, *args, **kwargs):
    """
    Emit msg at info level through the logger given to the constructor.
    """
    emit = self.logger.info
    emit(msg, *args, **kwargs)
|
|
|
|
def cmd_exists_on_osds(self, cmd):
    """
    Check that ``type cmd`` succeeds on every remote with an osd role.

    For cephadm and rook deployments no check is done and True is returned.

    :param cmd: name of the command to look for
    :returns: False as soon as one remote lacks the command, else True
    """
    manager = self.ceph_manager
    if manager.cephadm or manager.rook:
        return True
    osd_cluster = manager.ctx.cluster.only(
        teuthology.is_type('osd', self.cluster))
    for remote in set(osd_cluster.remotes.keys()):
        probe = remote.run(args=['type', cmd], wait=True,
                           check_status=False, stdout=BytesIO(),
                           stderr=BytesIO())
        if probe.exitstatus != 0:
            return False
    return True
|
|
|
|
def run_ceph_objectstore_tool(self, remote, osd, cmd):
    """
    Run ceph-objectstore-tool with ``--err-to-stderr`` and wait for it.

    The exit status is not checked; stdout and stderr are captured as text.

    :param remote: Remote to run the tool on
    :param osd: daemon name, only used for cephadm (passed as shell name)
    :param cmd: list of additional tool arguments
    :returns: the process object returned by the run call
    """
    tool = ['ceph-objectstore-tool', '--err-to-stderr']
    manager = self.ceph_manager
    if manager.cephadm:
        return shell(
            manager.ctx, manager.cluster, remote,
            args=tool + cmd,
            name=osd,
            wait=True, check_status=False,
            stdout=StringIO(),
            stderr=StringIO())
    if manager.rook:
        assert False, 'not implemented'
    else:
        return remote.run(
            args=['sudo', 'adjust-ulimits'] + tool + cmd,
            wait=True, check_status=False,
            stdout=StringIO(),
            stderr=StringIO())
|
|
|
|
def run_ceph_bluestore_tool(self, remote, osd, cmd):
    """
    Run ceph-bluestore-tool with ``--err-to-stderr`` and wait for it.

    The exit status is not checked; stdout and stderr are captured as text.

    :param remote: Remote to run the tool on
    :param osd: daemon name, only used for cephadm (passed as shell name)
    :param cmd: list of additional tool arguments
    :returns: the process object returned by the run call
    """
    tool = ['ceph-bluestore-tool', '--err-to-stderr']
    manager = self.ceph_manager
    if manager.cephadm:
        return shell(
            manager.ctx, manager.cluster, remote,
            args=tool + cmd,
            name=osd,
            wait=True, check_status=False,
            stdout=StringIO(),
            stderr=StringIO())
    if manager.rook:
        assert False, 'not implemented'
    else:
        return remote.run(
            args=['sudo'] + tool + cmd,
            wait=True, check_status=False,
            stdout=StringIO(),
            stderr=StringIO())
|
|
|
|
def kill_osd(self, osd=None, mark_down=False, mark_out=False):
    """
    Kill an osd and, when ceph-objectstore-tool testing is enabled,
    exercise the tool on the stopped osd: a random pg is exported and
    removed, then imported again, possibly into the osd just killed when
    the export was taken from an osd that was already dead.

    :param osd: Osd to be killed; a random live osd when None.
    :mark_down: Mark down if true.
    :mark_out: Mark out if true.
    :raises Exception: when a ceph-objectstore-tool step fails with an
        exit status that is not explicitly tolerated
    """
    if osd is None:
        osd = random.choice(self.live_osds)
    self.log("Killing osd %s, live_osds are %s" % (str(osd),
                                                   str(self.live_osds)))
    self.live_osds.remove(osd)
    self.dead_osds.append(osd)
    self.ceph_manager.kill_osd(osd)
    if mark_down:
        self.ceph_manager.mark_down_osd(osd)
    if mark_out and osd in self.in_osds:
        self.out_osd(osd)
    if not self.ceph_objectstore_tool:
        return

    self.log("Testing ceph-objectstore-tool on down osd.%s" % osd)
    remote = self.ceph_manager.find_remote('osd', osd)
    FSPATH = self.ceph_manager.get_filepath()
    JPATH = os.path.join(FSPATH, "journal")
    exp_osd = imp_osd = osd
    self.log('remote for osd %s is %s' % (osd, remote))
    exp_remote = imp_remote = remote
    # If an older osd is available we'll move a pg from there
    if (len(self.dead_osds) > 1 and
            random.random() < self.chance_move_pg):
        exp_osd = random.choice(self.dead_osds[:-1])
        exp_remote = self.ceph_manager.find_remote('osd', exp_osd)
        self.log('remote for exp osd %s is %s' % (exp_osd, exp_remote))
    prefix = [
        '--no-mon-config',
        '--log-file=/var/log/ceph/objectstore_tool.$pid.log',
    ]

    def tool_args(osd_id, *extra):
        # common tool arguments addressing the store of the given osd
        return prefix + [
            '--data-path', FSPATH.format(id=osd_id),
            '--journal-path', JPATH.format(id=osd_id),
        ] + list(extra)

    if self.ceph_manager.rook:
        assert False, 'not implemented'

    if not self.ceph_manager.cephadm:
        # ceph-objectstore-tool might be temporarily absent during an
        # upgrade - see http://tracker.ceph.com/issues/18014
        with safe_while(sleep=15, tries=40, action="type ceph-objectstore-tool") as proceed:
            while proceed():
                proc = exp_remote.run(args=['type', 'ceph-objectstore-tool'],
                                      wait=True, check_status=False, stdout=BytesIO(),
                                      stderr=BytesIO())
                if proc.exitstatus == 0:
                    break
                log.debug("ceph-objectstore-tool binary not present, trying again")

    # ceph-objectstore-tool might bogusly fail with "OSD has the store locked"
    # see http://tracker.ceph.com/issues/19556
    with safe_while(sleep=15, tries=40, action="ceph-objectstore-tool --op list-pgs") as proceed:
        while proceed():
            proc = self.run_ceph_objectstore_tool(
                exp_remote, 'osd.%s' % exp_osd,
                tool_args(exp_osd, '--op', 'list-pgs'))
            if proc.exitstatus == 0:
                break
            elif (proc.exitstatus == 1 and
                  "OSD has the store locked" in proc.stderr.getvalue()):
                # Substring test: captured stderr carries at least a
                # trailing newline, so an equality test never retries.
                continue
            else:
                raise Exception("ceph-objectstore-tool: "
                                "exp list-pgs failure with status {ret}".
                                format(ret=proc.exitstatus))

    pgs = proc.stdout.getvalue().split('\n')[:-1]
    if not pgs:
        self.log("No PGs found for osd.{osd}".format(osd=exp_osd))
        return
    pg = random.choice(pgs)
    exp_name = "exp.{pg}.{id}".format(pg=pg, id=exp_osd)
    # path as passed to the tool
    exp_path = os.path.join('/var/log/ceph',  # available inside 'shell' container
                            exp_name)
    if self.ceph_manager.cephadm:
        # for cephadm the file is addressed below /var/log/ceph/<fsid>
        # when it is accessed directly on the remote
        exp_host_path = os.path.join(
            '/var/log/ceph',
            self.ceph_manager.ctx.ceph[self.ceph_manager.cluster].fsid,
            exp_name)
    else:
        exp_host_path = exp_path

    # export
    # Can't use new export-remove op since this is part of upgrade testing
    proc = self.run_ceph_objectstore_tool(
        exp_remote, 'osd.%s' % exp_osd,
        tool_args(exp_osd, '--op', 'export', '--pgid', pg,
                  '--file', exp_path))
    if proc.exitstatus:
        raise Exception("ceph-objectstore-tool: "
                        "export failure with status {ret}".
                        format(ret=proc.exitstatus))
    # remove
    proc = self.run_ceph_objectstore_tool(
        exp_remote, 'osd.%s' % exp_osd,
        tool_args(exp_osd, '--force', '--op', 'remove', '--pgid', pg))
    if proc.exitstatus:
        raise Exception("ceph-objectstore-tool: "
                        "remove failure with status {ret}".
                        format(ret=proc.exitstatus))
    # If there are at least 2 dead osds we might move the pg
    if exp_osd != imp_osd:
        # If pg isn't already on this osd, then we will move it there
        proc = self.run_ceph_objectstore_tool(
            imp_remote,
            'osd.%s' % imp_osd,
            tool_args(imp_osd, '--op', 'list-pgs'))
        if proc.exitstatus:
            raise Exception("ceph-objectstore-tool: "
                            "imp list-pgs failure with status {ret}".
                            format(ret=proc.exitstatus))
        pgs = proc.stdout.getvalue().split('\n')[:-1]
        if pg not in pgs:
            self.log("Moving pg {pg} from osd.{fosd} to osd.{tosd}".
                     format(pg=pg, fosd=exp_osd, tosd=imp_osd))
            if imp_remote != exp_remote:
                # Copy export file to the other machine
                self.log("Transfer export file from {srem} to {trem}".
                         format(srem=exp_remote, trem=imp_remote))
                # just in case an upgrade make /var/log/ceph unreadable by non-root,
                exp_remote.run(args=['sudo', 'chmod', '777',
                                     '/var/log/ceph'])
                imp_remote.run(args=['sudo', 'chmod', '777',
                                     '/var/log/ceph'])
                tmpexport = Remote.get_file(exp_remote, exp_host_path,
                                            sudo=True)
                if exp_host_path != exp_path:
                    # push to /var/log/ceph, then rename (we can't
                    # chmod 777 the /var/log/ceph/$fsid mountpoint)
                    Remote.put_file(imp_remote, tmpexport, exp_path)
                    imp_remote.run(args=[
                        'sudo', 'mv', exp_path, exp_host_path])
                else:
                    Remote.put_file(imp_remote, tmpexport, exp_host_path)
                os.remove(tmpexport)
        else:
            # Can't move the pg after all
            imp_osd = exp_osd
            imp_remote = exp_remote
    # import (the argument list intentionally stays as it was: it does not
    # use the common prefix)
    proc = self.run_ceph_objectstore_tool(
        imp_remote, 'osd.%s' % imp_osd,
        [
            '--data-path', FSPATH.format(id=imp_osd),
            '--journal-path', JPATH.format(id=imp_osd),
            '--log-file=/var/log/ceph/objectstore_tool.$pid.log',
            '--op', 'import',
            '--file', exp_path,
        ])
    if proc.exitstatus == 1:
        # NOTE(review): any exit status 1 is tolerated here, the message is
        # only logged for the known "older OSD" case - confirm intent.
        bogosity = "The OSD you are using is older than the exported PG"
        if bogosity in proc.stderr.getvalue():
            self.log("OSD older than exported PG"
                     "...ignored")
    elif proc.exitstatus == 10:
        self.log("Pool went away before processing an import"
                 "...ignored")
    elif proc.exitstatus == 11:
        self.log("Attempt to import an incompatible export"
                 "...ignored")
    elif proc.exitstatus == 12:
        # this should be safe to ignore because we only ever move 1
        # copy of the pg at a time, and merge is only initiated when
        # all replicas are peered and happy.  /me crosses fingers
        self.log("PG merged on target"
                 "...ignored")
    elif proc.exitstatus:
        raise Exception("ceph-objectstore-tool: "
                        "import failure with status {ret}".
                        format(ret=proc.exitstatus))
    # clean up the export file wherever it was placed
    cmd = "sudo rm -f {file}".format(file=exp_host_path)
    exp_remote.run(args=cmd)
    if imp_remote != exp_remote:
        imp_remote.run(args=cmd)
|
|
|
|
def blackhole_kill_osd(self, osd=None):
    """
    Blackhole an osd and then kill it.

    :param osd: Osd to be killed; a random live osd when None.
    """
    victim = random.choice(self.live_osds) if osd is None else osd
    message = "Blackholing and then killing osd %s, live_osds are %s" % (
        str(victim), str(self.live_osds))
    self.log(message)
    self.live_osds.remove(victim)
    self.dead_osds.append(victim)
    self.ceph_manager.blackhole_kill_osd(victim)
|
|
|
|
def revive_osd(self, osd=None, skip_admin_check=False):
    """
    Revive the osd.

    When random read error injection is configured (random_eio > 0) and the
    revived osd is the one selected for it (self.rerrosd), the injection
    options are set on it again.

    :param osd: Osd to be revived; a random dead osd when None.
    :param skip_admin_check: passed through to the manager's revive_osd
    """
    if osd is None:
        osd = random.choice(self.dead_osds)
    self.log("Reviving osd %s" % (str(osd),))
    self.ceph_manager.revive_osd(
        osd,
        self.revive_timeout,
        skip_admin_check=skip_admin_check)
    self.dead_osds.remove(osd)
    self.live_osds.append(osd)
    # random_eio is None when the option is not configured; comparing None
    # with 0 would raise TypeError, so test for a value first.
    if self.random_eio and self.random_eio > 0 and osd == self.rerrosd:
        self.ceph_manager.set_config(
            self.rerrosd,
            filestore_debug_random_read_err=self.random_eio)
        self.ceph_manager.set_config(
            self.rerrosd,
            bluestore_debug_random_read_err=self.random_eio)
|
|
|
|
|
|
def out_osd(self, osd=None):
    """
    Mark an osd out and move it from in_osds to out_osds.

    :param osd: Osd to be marked; a random "in" osd when None.
    """
    target = random.choice(self.in_osds) if osd is None else osd
    message = "Removing osd %s, in_osds are: %s" % (str(target),
                                                    str(self.in_osds))
    self.log(message)
    self.ceph_manager.mark_out_osd(target)
    self.in_osds.remove(target)
    self.out_osds.append(target)
|
|
|
|
def in_osd(self, osd=None):
    """
    Mark an osd in and move it from out_osds to in_osds.

    An osd that is currently dead is revived instead; in that case the
    result of revive_osd is returned and the in/out lists are not touched.

    :param osd: Osd to be marked; a random "out" osd when None.
    """
    target = random.choice(self.out_osds) if osd is None else osd
    if target in self.dead_osds:
        return self.revive_osd(target)
    label = str(target)
    self.log("Adding osd %s" % (label,))
    self.out_osds.remove(target)
    self.in_osds.append(target)
    self.ceph_manager.mark_in_osd(target)
    self.log("Added osd %s" % (label,))
|
|
|
|
def reweight_osd_or_by_util(self, osd=None):
    """
    Either reweight a single osd to a random weight, or run a few rounds
    of (test-)reweight-by-utilization with random options.

    :param osd: Osd to be reweighted; when None a coin flip decides between
        reweighting a random "in" osd and reweighting by utilization.
    """
    if osd is None and not random.choice([True, False]):
        # do it several times, the option space is large
        for _ in range(5):
            options = {
                'max_change': random.choice(['0.05', '1.0', '3.0']),
                'overage': random.choice(['110', '1000']),
                'type': random.choice([
                    'reweight-by-utilization',
                    'test-reweight-by-utilization']),
            }
            self.log("Reweighting by: %s" % (str(options),))
            self.ceph_manager.raw_cluster_cmd(
                'osd',
                options['type'],
                options['overage'],
                options['max_change'])
        return

    if osd is None:
        osd = random.choice(self.in_osds)
    weight = random.uniform(.1, 1.0)
    self.log("Reweighting osd %s to %s" % (str(osd), str(weight)))
    self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
                                      str(osd), str(weight))
|
|
|
|
def primary_affinity(self, osd=None):
    """
    Set the primary affinity of an osd to a random value: half of the time
    a random float, otherwise 1 or 0 with equal chance.

    :param osd: Osd to change; a random "in" osd when None.
    """
    self.log("primary_affinity")
    if osd is None:
        osd = random.choice(self.in_osds)
    if random.random() >= .5:
        pa = random.random()
    else:
        pa = 1 if random.random() >= .5 else 0
    osd_name = str(osd)
    self.log('Setting osd %s primary_affinity to %f' % (osd_name, pa))
    self.ceph_manager.raw_cluster_cmd('osd', 'primary-affinity',
                                      osd_name, str(pa))
|
|
|
|
def thrash_cluster_full(self):
    """
    Set the full ratio to .001, wait a second, then set it to .95.
    """
    def set_full_ratio(ratio):
        self.ceph_manager.raw_cluster_cmd('osd', 'set-full-ratio', ratio)

    self.log('Setting full ratio to .001')
    set_full_ratio('.001')
    time.sleep(1)
    self.log('Setting full ratio back to .95')
    set_full_ratio('.95')
|
|
|
|
def thrash_pg_upmap(self):
    """
    Install or remove a random pg_upmap entry in the OSDMap.

    Most of the time a random pg is mapped to a random selection of osds
    (as many as the size of its pool); otherwise one existing pg_upmap
    entry is removed. A failing cluster command is logged and ignored.
    """
    self.log("thrash_pg_upmap")
    manager = self.ceph_manager
    osdmap = json.loads(
        manager.raw_cluster_cmd('osd', 'dump', '-f', 'json-pretty'))
    self.log('j is %s' % osdmap)
    try:
        if random.random() < .3:
            # remove one of the existing entries
            entries = osdmap['pg_upmap']
            if not entries:
                self.log('No pg_upmap entries; doing nothing')
                return
            random.shuffle(entries)
            victim = entries[0]['pgid']
            self.log('Clearing pg_upmap on %s' % victim)
            manager.raw_cluster_cmd(
                'osd',
                'rm-pg-upmap',
                victim)
            return

        # install an entry for a random pg
        pgs = manager.get_pg_stats()
        if not pgs:
            self.log('No pgs; doing nothing')
            return
        pgid = str(random.choice(pgs)['pgid'])
        poolid = int(pgid.split('.')[0])
        sizes = [p['size'] for p in osdmap['pools'] if p['pool'] == poolid]
        if not sizes:
            self.log('No pools; doing nothing')
            return
        candidates = self.in_osds + self.out_osds
        random.shuffle(candidates)
        osds = candidates[0:sizes[0]]
        self.log('Setting %s to %s' % (pgid, osds))
        cmd = ['osd', 'pg-upmap', pgid] + [str(o) for o in osds]
        self.log('cmd %s' % cmd)
        manager.raw_cluster_cmd(*cmd)
    except CommandFailedError:
        self.log('Failed to rm-pg-upmap, ignoring')
|
|
|
|
def thrash_pg_upmap_items(self):
    """
    Install or remove a random pg_upmap_items entry in the OSDMap.

    Most of the time a random pg gets a random list of osds (up to twice
    the size of its pool) passed to ``osd pg-upmap-items``; otherwise one
    existing pg_upmap_items entry is removed. A failing cluster command is
    logged and ignored.
    """
    self.log("thrash_pg_upmap_items")
    manager = self.ceph_manager
    osdmap = json.loads(
        manager.raw_cluster_cmd('osd', 'dump', '-f', 'json-pretty'))
    self.log('j is %s' % osdmap)
    try:
        if random.random() < .3:
            # remove one of the existing entries
            entries = osdmap['pg_upmap_items']
            if not entries:
                self.log('No pg_upmap entries; doing nothing')
                return
            random.shuffle(entries)
            victim = entries[0]['pgid']
            self.log('Clearing pg_upmap on %s' % victim)
            manager.raw_cluster_cmd(
                'osd',
                'rm-pg-upmap-items',
                victim)
            return

        # install an entry for a random pg
        pgs = manager.get_pg_stats()
        if not pgs:
            self.log('No pgs; doing nothing')
            return
        pgid = str(random.choice(pgs)['pgid'])
        poolid = int(pgid.split('.')[0])
        sizes = [p['size'] for p in osdmap['pools'] if p['pool'] == poolid]
        if not sizes:
            self.log('No pools; doing nothing')
            return
        candidates = self.in_osds + self.out_osds
        random.shuffle(candidates)
        osds = candidates[0:sizes[0] * 2]
        self.log('Setting %s to %s' % (pgid, osds))
        cmd = ['osd', 'pg-upmap-items', pgid] + [str(o) for o in osds]
        self.log('cmd %s' % cmd)
        manager.raw_cluster_cmd(*cmd)
    except CommandFailedError:
        self.log('Failed to rm-pg-upmap-items, ignoring')
|
|
|
|
def force_recovery(self):
    """
    Force backfill or recovery (chosen at random) on the pgs the manager
    selects for it. A failing cluster command is logged and ignored.
    """
    use_backfill = random.random() >= 0.5
    pgids = self.ceph_manager.get_pgids_to_force(use_backfill)
    if not pgids:
        return
    operation = 'force-backfill' if use_backfill else 'force-recovery'
    try:
        self.ceph_manager.raw_cluster_cmd('pg', operation, *pgids)
    except CommandFailedError:
        self.log('Failed to force backfill|recovery, ignoring')
|
|
|
|
|
|
def cancel_force_recovery(self):
    """
    Cancel forced backfill or recovery (chosen at random) on the pgs the
    manager selects for it. A failing cluster command is logged and
    ignored.
    """
    backfill = random.random() >= 0.5
    j = self.ceph_manager.get_pgids_to_cancel_force(backfill)
    if not j:
        return
    operation = 'cancel-force-backfill' if backfill else 'cancel-force-recovery'
    try:
        self.ceph_manager.raw_cluster_cmd('pg', operation, *j)
    except CommandFailedError:
        # say which operation failed; force_recovery logs the other one
        self.log('Failed to cancel force backfill|recovery, ignoring')
|
|
|
|
def force_cancel_recovery(self):
    """
    Randomly either force recovery or cancel forced recovery.
    """
    if random.random() >= 0.4:
        action = self.force_recovery
    else:
        action = self.cancel_force_recovery
    action()
|
|
|
|
def all_up(self):
    """
    Revive osds until none is dead, then mark osds in until none is out.
    """
    while self.dead_osds:
        self.log("reviving osd")
        self.revive_osd()
    while self.out_osds:
        self.log("inning osd")
        self.in_osd()
|
|
|
|
def all_up_in(self):
    """
    Make sure all osds are up and in, then reset the reweight and the
    primary affinity of every live osd to 1.
    """
    self.all_up()
    for osd in self.live_osds:
        for setting in ('reweight', 'primary-affinity'):
            self.ceph_manager.raw_cluster_cmd('osd', setting,
                                              str(osd), str(1))
|
|
|
|
def do_join(self):
    """
    Ask the thrashing loop to stop and wait for the greenlets that were
    spawned by the constructor.
    """
    self.stopping = True
    self.thread.get()
    if self.sighup_delay:
        self.log("joining the do_sighup greenlet")
        self.sighup_thread.get()
    # the remaining greenlets are joined; each exists only when its option
    # was enabled
    optional_greenlets = (
        (self.optrack_toggle_delay,
         "joining the do_optrack_toggle greenlet",
         'optrack_toggle_thread'),
        (self.dump_ops_enable == "true",
         "joining the do_dump_ops greenlet",
         'dump_ops_thread'),
        (self.noscrub_toggle_delay,
         "joining the do_noscrub_toggle greenlet",
         'noscrub_toggle_thread'),
    )
    for enabled, message, attr in optional_greenlets:
        if enabled:
            self.log(message)
            getattr(self, attr).join()
|
|
|
|
def grow_pool(self):
    """
    Ask the manager to expand a pool by ``pool_grow_by`` (default 10),
    limited by max_pgs; when it reports success the pool is remembered in
    pools_to_fix_pgp_num.
    """
    manager = self.ceph_manager
    pool = manager.get_pool()
    if pool is None:
        return
    self.log("Growing pool %s" % (pool,))
    grow_by = self.config.get('pool_grow_by', 10)
    expanded = manager.expand_pool(pool, grow_by, self.max_pgs)
    if expanded:
        self.pools_to_fix_pgp_num.add(pool)
|
|
|
|
def shrink_pool(self):
    """
    Ask the manager to contract a pool by ``pool_shrink_by`` (default 10),
    limited by min_pgs; when it reports success the pool is remembered in
    pools_to_fix_pgp_num.
    """
    manager = self.ceph_manager
    pool = manager.get_pool()
    if pool is None:
        return
    # the pg_num is queried but its value is not used here
    manager.get_pool_pg_num(pool)
    self.log("Shrinking pool %s" % (pool,))
    shrink_by = self.config.get('pool_shrink_by', 10)
    contracted = manager.contract_pool(pool, shrink_by, self.min_pgs)
    if contracted:
        self.pools_to_fix_pgp_num.add(pool)
|
|
|
|
def fix_pgp_num(self, pool=None):
    """
    Ask the manager to set the pgp_num of a pool; on success the pool is
    dropped from pools_to_fix_pgp_num.

    :param pool: pool to fix; when given the change is forced, when None a
        pool is picked by the manager and the change is not forced.
    """
    force = pool is not None
    if not force:
        pool = self.ceph_manager.get_pool()
        if not pool:
            return
    self.log("fixing pg num pool %s" % (pool,))
    if self.ceph_manager.set_pool_pgpnum(pool, force):
        self.pools_to_fix_pgp_num.discard(pool)
|
|
|
|
def test_pool_min_size(self):
    """
    Loop to selectively push PGs below their min_size and test that recovery
    still occurs.

    On every iteration the erasure-code profiles of all pools are read to
    find the largest k and the smallest m; these bound how many osds may be
    killed at once. The loop runs until self.stopping is set, after which
    all osds are brought back up and in.
    """
    self.log("test_pool_min_size")
    self.all_up()
    time.sleep(60)  # buffer time for recovery to start.
    self.ceph_manager.wait_for_recovery(
        timeout=self.config.get('timeout')
        )
    minout = int(self.config.get("min_out", 1))
    minlive = int(self.config.get("min_live", 2))
    mindead = int(self.config.get("min_dead", 1))
    self.log("doing min_size thrashing")
    self.ceph_manager.wait_for_clean(timeout=180)
    assert self.ceph_manager.is_clean(), \
        'not clean before minsize thrashing starts'
    while not self.stopping:
        # look up k and m from all the pools on each loop, in case it
        # changes as the cluster runs
        k = 0
        m = 99
        has_pools = False
        pools_json = self.ceph_manager.get_osd_dump_json()['pools']

        for pool_json in pools_json:
            pool = pool_json['pool_name']
            has_pools = True
            pool_type = pool_json['type']  # 1 for rep, 3 for ec
            min_size = pool_json['min_size']
            self.log("pool {pool} min_size is {min_size}".format(pool=pool, min_size=min_size))
            try:
                # The returned value is not used; the call is kept because
                # a failure here is handled below as "pool was removed".
                self.ceph_manager.get_pool_property(pool, 'erasure_code_profile')
                if pool_type != PoolType.ERASURE_CODED:
                    continue
                ec_profile = pool_json['erasure_code_profile']
                ec_profile_json = self.ceph_manager.raw_cluster_cmd(
                    'osd',
                    'erasure-code-profile',
                    'get',
                    ec_profile,
                    '--format=json')
                ec_json = json.loads(ec_profile_json)
                local_k = int(ec_json['k'])
                local_m = int(ec_json['m'])
                self.log("pool {pool} local_k={k} local_m={m}".format(pool=pool,
                                                                      k=local_k, m=local_m))
                if local_k > k:
                    self.log("setting k={local_k} from previous {k}".format(local_k=local_k, k=k))
                    k = local_k
                if local_m < m:
                    self.log("setting m={local_m} from previous {m}".format(local_m=local_m, m=m))
                    m = local_m
            except CommandFailedError:
                self.log("failed to read erasure_code_profile. %s was likely removed", pool)
                continue

        if not has_pools:
            self.log("No pools yet, waiting")
            time.sleep(5)
            continue
        self.log("using k={k}, m={m}".format(k=k, m=m))

        if minout > len(self.out_osds):  # kill OSDs and mark out
            self.log("forced to out an osd")
            self.kill_osd(mark_out=True)
            continue
        if mindead > len(self.dead_osds):  # kill OSDs but force timeout
            self.log("forced to kill an osd")
            self.kill_osd()
            continue

        # make mostly-random choice to kill or revive OSDs
        minup = max(minlive, k)
        rand_val = random.uniform(0, 1)
        self.log("choosing based on number of live OSDs and rand val {rand}".
                 format(rand=rand_val))
        if len(self.live_osds) > minup + 1 and rand_val < 0.5:
            # chose to knock out as many OSDs as we can w/out downing PGs
            most_killable = min(len(self.live_osds) - minup, m)
            self.log("chose to kill {n} OSDs".format(n=most_killable))
            for _ in range(1, most_killable):
                self.kill_osd(mark_out=True)
            time.sleep(10)
            # try a few times since there might be a concurrent pool
            # creation or deletion
            with safe_while(
                    sleep=25, tries=5,
                    action='check for active or peered') as proceed:
                while proceed():
                    if self.ceph_manager.all_active_or_peered():
                        break
                    self.log('not all PGs are active or peered')
        else:  # chose to revive OSDs, bring up a random fraction of the dead ones
            self.log("chose to revive osds")
            for _ in range(1, int(rand_val * len(self.dead_osds))):
                # Revive a random dead osd. Passing the loop counter as
                # the osd id would target an osd that may not be dead.
                self.revive_osd()

        # let PGs repair themselves or our next knockout might kill one
        self.ceph_manager.wait_for_clean(timeout=self.config.get('timeout'))

    # / while not self.stopping
    self.all_up_in()

    self.ceph_manager.wait_for_recovery(
        timeout=self.config.get('timeout')
        )
|
|
|
|
def inject_pause(self, conf_key, duration, check_after, should_be_down):
    """
    Inject a pause into one randomly chosen live OSD.

    :param conf_key: name of the OSD config option that triggers the pause
    :param duration: value written to ``conf_key``
    :param check_after: seconds to wait before checking the OSD went down
    :param should_be_down: when true, assert that the OSD shows up as down
                           after ``check_after`` seconds and is no longer
                           down ``duration - check_after + 20`` seconds
                           later; when false, return right after injecting
    """
    victim = random.choice(self.live_osds)
    self.log(f"inject_pause on osd.{victim}")
    self.log(f"Testing {conf_key} pause injection for duration {duration}")
    self.log(
        f"Checking after {check_after}, should_be_down={should_be_down}")
    self.ceph_manager.set_config(victim, **{conf_key: duration})
    if should_be_down:
        time.sleep(check_after)
        assert victim in self.ceph_manager.get_osd_status()['down']
        # wait out the remainder of the pause plus some slack
        time.sleep(duration - check_after + 20)
        assert victim not in self.ceph_manager.get_osd_status()['down']
|
|
|
|
def test_backfill_full(self):
|
|
"""
|
|
Test backfills stopping when the replica fills up.
|
|
|
|
First, use injectfull admin command to simulate a now full
|
|
osd by setting it to 0 on all of the OSDs.
|
|
|
|
Second, on a random subset, set
|
|
osd_debug_skip_full_check_in_backfill_reservation to force
|
|
the more complicated check in do_scan to be exercised.
|
|
|
|
Then, verify that all backfillings stop.
|
|
"""
|
|
self.log("injecting backfill full")
|
|
for i in self.live_osds:
|
|
self.ceph_manager.set_config(
|
|
i,
|
|
osd_debug_skip_full_check_in_backfill_reservation=
|
|
random.choice(['false', 'true']))
|
|
self.ceph_manager.osd_admin_socket(i, command=['injectfull', 'backfillfull'],
|
|
check_status=True, timeout=30, stdout=DEVNULL)
|
|
for i in range(30):
|
|
status = self.ceph_manager.compile_pg_status()
|
|
if 'backfilling' not in status.keys():
|
|
break
|
|
self.log(
|
|
"waiting for {still_going} backfillings".format(
|
|
still_going=status.get('backfilling')))
|
|
time.sleep(1)
|
|
assert('backfilling' not in self.ceph_manager.compile_pg_status().keys())
|
|
for i in self.live_osds:
|
|
self.ceph_manager.set_config(
|
|
i,
|
|
osd_debug_skip_full_check_in_backfill_reservation='false')
|
|
self.ceph_manager.osd_admin_socket(i, command=['injectfull', 'none'],
|
|
check_status=True, timeout=30, stdout=DEVNULL)
|
|
|
|
|
|
def generate_random_sharding(self):
    """
    Build a random sharding definition string.

    Each of the prefixes 'm', 'O', 'P' and 'L' is included with a coin
    flip.  An included prefix gets a random column count (1-5) and,
    optionally, a hash range; the resulting items look like ``m``,
    ``O(3)``, ``P(2,5-)`` or ``L(4,8-24)`` and are joined with single
    spaces.

    :return: the sharding string (empty if no prefix was selected)
    """
    shards = []
    for column_family in ('m', 'O', 'P', 'L'):
        if not random.choice([False, True]):
            continue
        columns = random.randint(1, 5)
        if random.choice([False, True]):
            # hashed columns: "(columns,low-)" or "(columns,low-high)"
            low_hash = random.choice([0, 5, 8])
            if random.choice([False, True]):
                high_hash = random.choice([8, 16, 30]) + low_hash
                spec = f'({columns},{low_hash}-{high_hash})'
            else:
                spec = f'({columns},{low_hash}-)'
        elif columns == 1:
            spec = ''
        else:
            spec = f'({columns})'
        shards.append(column_family + spec)
    return ' '.join(shards)
|
|
|
|
def test_bluestore_reshard_action(self):
|
|
"""
|
|
Test if resharding of bluestore works properly.
|
|
If bluestore is not used, or bluestore is in version that
|
|
does not support sharding, skip.
|
|
"""
|
|
|
|
osd = random.choice(self.dead_osds)
|
|
remote = self.ceph_manager.find_remote('osd', osd)
|
|
FSPATH = self.ceph_manager.get_filepath()
|
|
|
|
prefix = [
|
|
'--no-mon-config',
|
|
'--log-file=/var/log/ceph/bluestore_tool.$pid.log',
|
|
'--log-level=10',
|
|
'--path', FSPATH.format(id=osd)
|
|
]
|
|
|
|
# sanity check if bluestore-tool accessible
|
|
self.log('checking if target objectstore is bluestore on osd.%s' % osd)
|
|
cmd = prefix + [
|
|
'show-label'
|
|
]
|
|
proc = self.run_ceph_bluestore_tool(remote, 'osd.%s' % osd, cmd)
|
|
if proc.exitstatus != 0:
|
|
raise Exception("ceph-bluestore-tool access failed.")
|
|
|
|
# check if sharding is possible
|
|
self.log('checking if target bluestore supports sharding on osd.%s' % osd)
|
|
cmd = prefix + [
|
|
'show-sharding'
|
|
]
|
|
proc = self.run_ceph_bluestore_tool(remote, 'osd.%s' % osd, cmd)
|
|
if proc.exitstatus != 0:
|
|
self.log("Unable to test resharding, "
|
|
"ceph-bluestore-tool does not support it.")
|
|
return
|
|
|
|
# now go for reshard to something else
|
|
self.log('applying new sharding to bluestore on osd.%s' % osd)
|
|
new_sharding = self.config.get('bluestore_new_sharding','random')
|
|
|
|
if new_sharding == 'random':
|
|
self.log('generate random sharding')
|
|
new_sharding = self.generate_random_sharding()
|
|
|
|
self.log("applying new sharding: " + new_sharding)
|
|
cmd = prefix + [
|
|
'--sharding', new_sharding,
|
|
'reshard'
|
|
]
|
|
proc = self.run_ceph_bluestore_tool(remote, 'osd.%s' % osd, cmd)
|
|
if proc.exitstatus != 0:
|
|
raise Exception("ceph-bluestore-tool resharding failed.")
|
|
|
|
# now do fsck to
|
|
self.log('running fsck to verify new sharding on osd.%s' % osd)
|
|
cmd = prefix + [
|
|
'fsck'
|
|
]
|
|
proc = self.run_ceph_bluestore_tool(remote, 'osd.%s' % osd, cmd)
|
|
if proc.exitstatus != 0:
|
|
raise Exception("ceph-bluestore-tool fsck failed.")
|
|
self.log('resharding successfully completed')
|
|
|
|
def test_bluestore_reshard(self):
|
|
"""
|
|
1) kills an osd
|
|
2) reshards bluestore on killed osd
|
|
3) revives the osd
|
|
"""
|
|
self.log('test_bluestore_reshard started')
|
|
self.kill_osd(mark_down=True, mark_out=True)
|
|
self.test_bluestore_reshard_action()
|
|
self.revive_osd()
|
|
self.log('test_bluestore_reshard completed')
|
|
|
|
|
|
def test_map_discontinuity(self):
|
|
"""
|
|
1) Allows the osds to recover
|
|
2) kills an osd
|
|
3) allows the remaining osds to recover
|
|
4) waits for some time
|
|
5) revives the osd
|
|
This sequence should cause the revived osd to have to handle
|
|
a map gap since the mons would have trimmed
|
|
"""
|
|
self.log("test_map_discontinuity")
|
|
while len(self.in_osds) < (self.minin + 1):
|
|
self.in_osd()
|
|
self.log("Waiting for recovery")
|
|
self.ceph_manager.wait_for_all_osds_up(
|
|
timeout=self.config.get('timeout')
|
|
)
|
|
# now we wait 20s for the pg status to change, if it takes longer,
|
|
# the test *should* fail!
|
|
time.sleep(20)
|
|
self.ceph_manager.wait_for_clean(
|
|
timeout=self.config.get('timeout')
|
|
)
|
|
|
|
# now we wait 20s for the backfill replicas to hear about the clean
|
|
time.sleep(20)
|
|
self.log("Recovered, killing an osd")
|
|
self.kill_osd(mark_down=True, mark_out=True)
|
|
self.log("Waiting for clean again")
|
|
self.ceph_manager.wait_for_clean(
|
|
timeout=self.config.get('timeout')
|
|
)
|
|
self.log("Waiting for trim")
|
|
time.sleep(int(self.config.get("map_discontinuity_sleep_time", 40)))
|
|
self.revive_osd()
|
|
|
|
def choose_action(self):
    """
    Random action selector.

    Builds a list of ``(callable, weight)`` pairs from the current OSD
    sets and the thrasher configuration, then picks one callable with a
    probability proportional to its weight.

    :return: the selected zero-argument callable, or None when nothing
             was selected (for instance when every weight is 0)
    """
    chance_down = self.config.get('chance_down', 0.4)
    chance_test_backfill_full = \
        self.config.get('chance_test_backfill_full', 0)
    if isinstance(chance_down, int):
        # integer values are interpreted as a percentage
        chance_down = float(chance_down) / 100
    minin = self.minin
    minout = int(self.config.get("min_out", 0))
    minlive = int(self.config.get("min_live", 2))
    mindead = int(self.config.get("min_dead", 0))

    self.log('choose_action: min_in %d min_out '
             '%d min_live %d min_dead %d '
             'chance_down %.2f' %
             (minin, minout, minlive, mindead, chance_down))
    actions = []
    if len(self.in_osds) > minin:
        actions.append((self.out_osd, 1.0,))
    if len(self.live_osds) > minlive and chance_down > 0:
        actions.append((self.kill_osd, chance_down,))
    if len(self.out_osds) > minout:
        actions.append((self.in_osd, 1.7,))
    if len(self.dead_osds) > mindead:
        actions.append((self.revive_osd, 1.0,))
    if self.config.get('thrash_primary_affinity', True):
        actions.append((self.primary_affinity, 1.0,))
    actions.append((self.reweight_osd_or_by_util,
                    self.config.get('reweight_osd', .5),))
    actions.append((self.grow_pool,
                    self.config.get('chance_pgnum_grow', 0),))
    actions.append((self.shrink_pool,
                    self.config.get('chance_pgnum_shrink', 0),))
    actions.append((self.fix_pgp_num,
                    self.config.get('chance_pgpnum_fix', 0),))
    actions.append((self.test_pool_min_size,
                    self.config.get('chance_test_min_size', 0),))
    actions.append((self.test_backfill_full,
                    chance_test_backfill_full,))
    if self.chance_thrash_cluster_full > 0:
        actions.append((self.thrash_cluster_full, self.chance_thrash_cluster_full,))
    if self.chance_thrash_pg_upmap > 0:
        actions.append((self.thrash_pg_upmap, self.chance_thrash_pg_upmap,))
    if self.chance_thrash_pg_upmap_items > 0:
        actions.append((self.thrash_pg_upmap_items, self.chance_thrash_pg_upmap_items,))
    if self.chance_force_recovery > 0:
        actions.append((self.force_cancel_recovery, self.chance_force_recovery))

    for key in ['heartbeat_inject_failure', 'filestore_inject_stall']:
        # ``key=key`` binds the current loop value into each lambda; a
        # plain closure would look ``key`` up when called and therefore
        # always see the last element of the list.
        for scenario in [
            (lambda key=key:
             self.inject_pause(key,
                               self.config.get('pause_short', 3),
                               0,
                               False),
             self.config.get('chance_inject_pause_short', 1),),
            (lambda key=key:
             self.inject_pause(key,
                               self.config.get('pause_long', 80),
                               self.config.get('pause_check_after', 70),
                               True),
             self.config.get('chance_inject_pause_long', 0),)]:
            actions.append(scenario)

    # only consider resharding if objectstore is bluestore
    cluster_name = self.ceph_manager.cluster
    cluster = self.ceph_manager.ctx.ceph[cluster_name]
    if cluster.conf.get('osd', {}).get('osd objectstore', 'bluestore') == 'bluestore':
        actions.append((self.test_bluestore_reshard,
                        self.config.get('chance_bluestore_reshard', 0),))

    # weighted pick: walk the list subtracting weights until val fits
    total = sum([y for (x, y) in actions])
    val = random.uniform(0, total)
    for (action, prob) in actions:
        if val < prob:
            return action
        val -= prob
    return None
|
|
|
|
def do_thrash(self):
    """
    Run _do_thrash(), recording instead of propagating any exception.
    """
    try:
        self._do_thrash()
    except Exception as exc:
        # See _run exception comment for MDSThrasher
        self.set_thrasher_exception(exc)
        self.logger.exception("exception:")
        # Returning normally keeps gevent from seeing an exception; the
        # DaemonWatchdog will observe the error and tear down the test.
|
|
|
|
@log_exc
def do_sighup(self):
    """
    Keep sending signal.SIGHUP to a randomly picked live osd until the
    thrasher is told to stop.

    The pause between two signals is the config value sighup_delay.
    """
    pause = float(self.sighup_delay)
    self.log(f"starting do_sighup with a delay of {pause}")
    while not self.stopping:
        self.ceph_manager.signal_osd(random.choice(self.live_osds),
                                     signal.SIGHUP, silent=True)
        time.sleep(pause)
|
|
|
|
@log_exc
def do_optrack_toggle(self):
    """
    Keep flipping osd_enable_op_tracker on all osds until the thrasher
    is told to stop.  The first value injected is "false".

    The pause between two flips is the config value optrack_toggle_delay.
    """
    pause = float(self.optrack_toggle_delay)
    tracking = True
    self.log(f"starting do_optrack_toggle with a delay of {pause}")
    while not self.stopping:
        tracking = not tracking
        try:
            self.ceph_manager.inject_args('osd', '*',
                                          'osd_enable_op_tracker',
                                          "true" if tracking else "false")
        except CommandFailedError:
            self.log('Failed to tell all osds, ignoring')
        gevent.sleep(pause)
|
|
|
|
@log_exc
def do_dump_ops(self):
    """
    Keep dumping in-flight, blocked and historic ops on every live osd
    until the thrasher is told to stop.
    """
    dump_commands = ('dump_ops_in_flight',
                     'dump_blocked_ops',
                     'dump_historic_ops')
    self.log("starting do_dump_ops")
    while not self.stopping:
        for osd in self.live_osds:
            for dump in dump_commands:
                # Ignore errors because live_osds is in flux
                self.ceph_manager.osd_admin_socket(
                    osd, command=[dump],
                    check_status=False, timeout=30, stdout=DEVNULL)
        # yield to other greenlets between rounds
        gevent.sleep(0)
|
|
|
|
@log_exc
def do_noscrub_toggle(self):
    """
    Cycle the noscrub / nodeep-scrub osd flags until the thrasher is
    told to stop, then unset both.

    The pause between two steps is the config value noscrub_toggle_delay.
    """
    # One step is applied per iteration, in this order, repeating:
    # neither flag -> noscrub -> both -> nodeep-scrub only -> neither.
    rotation = (
        ('set', 'noscrub'),
        ('set', 'nodeep-scrub'),
        ('unset', 'noscrub'),
        ('unset', 'nodeep-scrub'),
    )
    pause = float(self.noscrub_toggle_delay)
    step = 0
    self.log(f"starting do_noscrub_toggle with a delay of {pause}")
    while not self.stopping:
        verb, flag = rotation[step]
        self.ceph_manager.raw_cluster_cmd('osd', verb, flag)
        step = (step + 1) % len(rotation)
        gevent.sleep(pause)
    for flag in ('noscrub', 'nodeep-scrub'):
        self.ceph_manager.raw_cluster_cmd('osd', 'unset', flag)
|
|
|
|
@log_exc
def _do_thrash(self):
    """
    Loop to select random actions to thrash ceph manager with.

    Config values read here: ``clean_interval`` (default 60),
    ``scrub_interval`` (default -1), ``max_dead`` (default 0),
    ``op_delay`` (default 5), ``chance_test_map_discontinuity``
    (default 0) and ``timeout``.

    Once the loop ends, the random read error injection and the options
    recorded in ``self.saved_options`` are reset, pending pgp_num fixes
    are applied, and all_up() / all_up_in() are called.
    """
    cleanint = self.config.get("clean_interval", 60)
    scrubint = self.config.get("scrub_interval", -1)
    maxdead = self.config.get("max_dead", 0)
    delay = self.config.get("op_delay", 5)
    # the first live OSD is the one that gets random read errors injected
    self.rerrosd = self.live_osds[0]
    if self.random_eio > 0:
        self.ceph_manager.inject_args('osd', self.rerrosd,
                                      'filestore_debug_random_read_err',
                                      self.random_eio)
        self.ceph_manager.inject_args('osd', self.rerrosd,
                                      'bluestore_debug_random_read_err',
                                      self.random_eio)
    self.log("starting do_thrash")
    while not self.stopping:
        to_log = [str(x) for x in ["in_osds: ", self.in_osds,
                                   "out_osds: ", self.out_osds,
                                   "dead_osds: ", self.dead_osds,
                                   "live_osds: ", self.live_osds]]
        self.log(" ".join(to_log))
        # with probability delay/cleanint, let the cluster settle first
        if random.uniform(0, 1) < (float(delay) / cleanint):
            while len(self.dead_osds) > maxdead:
                self.revive_osd()
            for osd in self.in_osds:
                self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
                                                  str(osd), str(1))
            if random.uniform(0, 1) < float(
                    self.config.get('chance_test_map_discontinuity', 0)) \
                    and len(self.live_osds) > 5: # avoid m=2,k=2 stall, w/ some buffer for crush being picky
                self.test_map_discontinuity()
            else:
                self.ceph_manager.wait_for_recovery(
                    timeout=self.config.get('timeout')
                    )
            time.sleep(self.clean_wait)
            if scrubint > 0:
                # with probability delay/scrubint, construct a Scrubber
                if random.uniform(0, 1) < (float(delay) / scrubint):
                    self.log('Scrubbing while thrashing being performed')
                    Scrubber(self.ceph_manager, self.config)
        # choose_action() returns a callable; invoke it right away
        self.choose_action()()
        time.sleep(delay)
    self.all_up()
    if self.random_eio > 0:
        # switch the read error injection back off
        self.ceph_manager.inject_args('osd', self.rerrosd,
                                      'filestore_debug_random_read_err', '0.0')
        self.ceph_manager.inject_args('osd', self.rerrosd,
                                      'bluestore_debug_random_read_err', '0.0')
    # iterate over a copy of the collection, which is cleared afterwards
    for pool in list(self.pools_to_fix_pgp_num):
        if self.ceph_manager.get_pool_pg_num(pool) > 0:
            self.fix_pgp_num(pool)
    self.pools_to_fix_pgp_num.clear()
    # re-inject every saved option value on all daemons of its service
    for service, opt, saved_value in self.saved_options:
        self.ceph_manager.inject_args(service, '*', opt, saved_value)
    self.saved_options = []
    self.all_up_in()
|
|
|
|
|
|
class ObjectStoreTool:
    """
    Helper that builds and runs ceph-objectstore-tool against one OSD.

    :param manager: object providing kill_osd/revive_osd, log,
                    get_filepath, ctx, ... (see the calls below)
    :param pool: pool name
    :param osd: (keyword) OSD id, or ``"primary"`` to use the primary of
                ``object_name`` in ``pool``
    :param object_name: (keyword) object to operate on, optional
    :param do_revive: (keyword) revive the OSD after run(); default True
    """

    def __init__(self, manager, pool, **kwargs):
        self.manager = manager
        self.pool = pool
        self.osd = kwargs.get('osd', None)
        self.object_name = kwargs.get('object_name', None)
        self.do_revive = kwargs.get('do_revive', True)
        if self.osd and self.pool and self.object_name:
            if self.osd == "primary":
                self.osd = self.manager.get_object_primary(self.pool,
                                                           self.object_name)
        assert self.osd is not None
        if self.object_name:
            self.pgid = self.manager.get_object_pg_with_shard(self.pool,
                                                              self.object_name,
                                                              self.osd)
        self.remote = next(iter(self.manager.ctx.\
            cluster.only('osd.{o}'.format(o=self.osd)).remotes.keys()))
        path = self.manager.get_filepath().format(id=self.osd)
        self.paths = ("--data-path {path} --journal-path {path}/journal".
                      format(path=path))

    def build_cmd(self, options, args, stdin):
        """
        Build the shell script that invokes ceph-objectstore-tool.

        :param options: option string placed before ``args``
        :param args: argument string; when an object name is set it is
                     prefixed with the ``"$object"`` shell variable
        :param stdin: optional payload (str or bytes) that is piped into
                      the tool through ``base64 --decode``
        :return: the script as a newline separated string
        """
        lines = []
        if self.object_name:
            # first line resolves the object's listing entry into $object
            lines.append("object=$(sudo adjust-ulimits ceph-objectstore-tool "
                         "{paths} --pgid {pgid} --op list |"
                         "grep '\"oid\":\"{name}\"')".
                         format(paths=self.paths,
                                pgid=self.pgid,
                                name=self.object_name))
            args = '"$object" ' + args
            options += " --pgid {pgid}".format(pgid=self.pgid)
        cmd = ("sudo adjust-ulimits ceph-objectstore-tool {paths} {options} {args}".
               format(paths=self.paths,
                      args=args,
                      options=options))
        if stdin:
            # base64.encode() is the file-to-file API; b64encode() is the
            # one that turns a bytes payload into its encoded form.
            raw = stdin.encode() if isinstance(stdin, str) else stdin
            cmd = ("echo {payload} | base64 --decode | {cmd}".
                   format(payload=base64.b64encode(raw).decode('ascii'),
                          cmd=cmd))
        lines.append(cmd)
        return "\n".join(lines)

    def run(self, options, args):
        """
        Kill the OSD, run the tool on its host and (optionally) revive it.

        :raises Exception: with the tool's stdout and stderr when it
                           exits non-zero
        """
        self.manager.kill_osd(self.osd)
        cmd = self.build_cmd(options, args, None)
        self.manager.log(cmd)
        try:
            proc = self.remote.run(args=['bash', '-e', '-x', '-c', cmd],
                                   check_status=False,
                                   stdout=BytesIO(),
                                   stderr=BytesIO())
            proc.wait()
            if proc.exitstatus != 0:
                self.manager.log("failed with " + str(proc.exitstatus))
                error = proc.stdout.getvalue().decode() + " " + \
                        proc.stderr.getvalue().decode()
                raise Exception(error)
        finally:
            # bring the OSD back even when the tool failed
            if self.do_revive:
                self.manager.revive_osd(self.osd)
                self.manager.wait_till_osd_is_up(self.osd, 300)
|
|
|
|
|
|
# XXX: this class has nothing to do with the Ceph daemon (ceph-mgr) of
|
|
# the same name.
|
|
class CephManager:
|
|
"""
|
|
Ceph manager object.
|
|
Contains several local functions that form a bulk of this module.
|
|
|
|
:param controller: the remote machine where the Ceph commands should be
|
|
executed
|
|
:param ctx: the cluster context
|
|
:param config: path to Ceph config file
|
|
:param logger: for logging messages
|
|
:param cluster: name of the Ceph cluster
|
|
"""
|
|
|
|
def __init__(self, controller, ctx=None, config=None, logger=None,
             cluster='ceph', cephadm=False, rook=False) -> None:
    """
    Set up command prefixes and record the pg_num of the existing pools.

    :param controller: remote on which Ceph commands are run
    :param ctx: the cluster context
    :param config: configuration mapping; an empty dict when None
    :param logger: logger whose ``info`` receives messages; messages are
                   printed when no logger is given
    :param cluster: name of the Ceph cluster
    :param cephadm: run commands through the cephadm shell
    :param rook: run commands through the rook toolbox
    """
    self.lock = threading.RLock()
    self.ctx = ctx
    self.config = dict() if config is None else config
    self.controller = controller
    self.next_pool_id = 0
    self.cluster = cluster

    if logger:
        def emit(message):
            return logger.info(message)
    else:
        def emit(message):
            """
            implement log behavior.
            """
            print(message)
    self.log = emit

    # NOTE: These variables are meant to be overriden by vstart_runner.py.
    self.rook = rook
    self.cephadm = cephadm
    self.testdir = teuthology.get_testdir(self.ctx)
    # prefix args for ceph cmds to be executed
    coverage_prefix = ['adjust-ulimits', 'ceph-coverage',
                       f'{self.testdir}/archive/coverage']
    self.CEPH_CMD = ['sudo'] + coverage_prefix + [
        'timeout', '120', 'ceph', '--cluster', self.cluster]
    self.RADOS_CMD = coverage_prefix + ['rados', '--cluster', self.cluster]

    existing_pools = self.list_pools()
    self.pools = {}
    for name in existing_pools:
        # we may race with a pool deletion; ignore failures here
        try:
            self.pools[name] = self.get_pool_int_property(name, 'pg_num')
        except CommandFailedError:
            self.log('Failed to get pg_num from pool %s, ignoring' % name)
|
|
|
|
def ceph(self, cmd, **kwargs):
    """
    Run the Ceph admin command given as one string via run_cluster_cmd.

    ``cmd`` is split with shlex and replaces any ``args`` keyword passed
    by the caller.  stdout and stderr default to fresh StringIO objects.

    :return: whatever run_cluster_cmd returns
    """
    kwargs['args'] = shlex.split(cmd)
    kwargs.setdefault('stdout', StringIO())
    kwargs.setdefault('stderr', StringIO())
    return self.run_cluster_cmd(**kwargs)
|
|
|
|
def run_cluster_cmd(self, **kwargs):
    """
    Run a Ceph command and return the object representing the process
    for the command.

    Accepts arguments same as that of teuthology.orchestra.run.run().
    ``args`` may be a string (split with shlex), a tuple or a list.  The
    extra keyword ``timeoutcmd`` prefixes the command with
    ``timeout <value>``.

    On cephadm and rook clusters only ``args`` and ``check_status`` are
    forwarded and stdout is always a fresh StringIO.
    """
    cmd_args = kwargs['args']
    if isinstance(cmd_args, str):
        cmd_args = shlex.split(cmd_args)
    elif isinstance(cmd_args, tuple):
        cmd_args = list(cmd_args)
    kwargs['args'] = cmd_args

    timeoutcmd = kwargs.pop('timeoutcmd', None)
    prefix = [] if timeoutcmd is None else ['timeout', str(timeoutcmd)]

    if self.cephadm:
        return shell(self.ctx, self.cluster, self.controller,
                     args=prefix + ['ceph'] + list(cmd_args),
                     stdout=StringIO(),
                     check_status=kwargs.get('check_status', True))
    if self.rook:
        return toolbox(self.ctx, self.cluster,
                       args=prefix + ['ceph'] + list(cmd_args),
                       stdout=StringIO(),
                       check_status=kwargs.get('check_status', True))

    kwargs['args'] = prefix + self.CEPH_CMD + cmd_args
    return self.controller.run(**kwargs)
|
|
|
|
def raw_cluster_cmd(self, *args, **kwargs) -> str:
    """
    Run a ceph command through run_cluster_cmd and return its stdout.

    The command is given either as positional arguments or through the
    ``args`` keyword (which wins when it is not None).  stdout defaults
    to a fresh StringIO.
    """
    if args and kwargs.get('args') is None:
        kwargs['args'] = args
    kwargs.setdefault('stdout', StringIO())
    proc = self.run_cluster_cmd(**kwargs)
    return proc.stdout.getvalue()
|
|
|
|
def raw_cluster_cmd_result(self, *args, **kwargs):
    """
    Run a ceph command through run_cluster_cmd and return its exit status.

    ``check_status`` is always forced to False so a failing command does
    not raise here.
    """
    if args and kwargs.get('args') is None:
        kwargs['args'] = args
    kwargs['check_status'] = False
    proc = self.run_cluster_cmd(**kwargs)
    return proc.exitstatus
|
|
|
|
def get_keyring(self, client_id):
    """
    Return keyring for the given client.

    :param client_id: str, with or without the ``client.`` prefix
    :return keyring: str, output of ``auth get client.<id>``
    """
    # str.replace is a no-op when the prefix is absent
    bare_id = client_id.replace('client.', '')
    proc = self.run_cluster_cmd(args=f'auth get client.{bare_id}',
                                stdout=StringIO())
    keyring = proc.stdout.getvalue()

    assert isinstance(keyring, str) and keyring != ''
    return keyring
|
|
|
|
def run_ceph_w(self, watch_channel=None):
    """
    Execute "ceph -w" in the background with stdout connected to a
    StringIO, and return the RemoteProcess.

    :param watch_channel: Specifies the channel to be watched. This can be
                          'cluster', 'audit', ...
    :type watch_channel: str
    """
    channel_args = []
    if watch_channel is not None:
        channel_args = ["--watch-channel", watch_channel]
    watch_cmd = ['sudo', 'daemon-helper', 'kill',
                 'ceph', '--cluster', self.cluster, '-w'] + channel_args
    return self.controller.run(args=watch_cmd, wait=False,
                               stdout=StringIO(), stdin=run.PIPE)
|
|
|
|
def get_mon_socks(self):
    """
    Get monitor sockets.

    Reads ``mon dump`` in JSON format and collects the ``addr`` of every
    entry in each monitor's ``public_addrs.addrvec``.

    :return socks: tuple of strings; strings are individual sockets.
    """
    from json import loads

    # Pass the command words as separate arguments, like the msgrv1/v2
    # variants do; a single list argument would end up nested inside the
    # argument list that is built for the command.
    output = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
    socks = []
    for mon in output['mons']:
        for addrvec_mem in mon['public_addrs']['addrvec']:
            socks.append(addrvec_mem['addr'])
    return tuple(socks)
|
|
|
|
def get_msgrv1_mon_socks(self):
    """
    Get monitor sockets that use msgrv1 to operate.

    Only ``addrvec`` entries of type ``v1`` from ``mon dump`` are kept.

    :return socks: tuple of strings; strings are individual sockets.
    """
    from json import loads

    mon_dump = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
    return tuple(
        entry['addr']
        for mon in mon_dump['mons']
        for entry in mon['public_addrs']['addrvec']
        if entry['type'] == 'v1'
    )
|
|
|
|
def get_msgrv2_mon_socks(self):
    """
    Get monitor sockets that use msgrv2 to operate.

    Only ``addrvec`` entries of type ``v2`` from ``mon dump`` are kept.

    :return socks: tuple of strings; strings are individual sockets.
    """
    from json import loads

    mon_dump = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
    return tuple(
        entry['addr']
        for mon in mon_dump['mons']
        for entry in mon['public_addrs']['addrvec']
        if entry['type'] == 'v2'
    )
|
|
|
|
def flush_pg_stats(self, osds, no_wait=None, wait_for_mon=300):
    """
    Flush pg stats from a list of OSD ids, ensuring they are reflected
    all the way to the monitor. Luminous and later only.

    :param osds: list of OSDs to flush
    :param no_wait: list of OSDs not to wait for seq id. by default, we
                    wait for all specified osds, but some of them could be
                    moved out of osdmap, so we cannot get their updated
                    stat seq from monitor anymore. in that case, you need
                    to pass a blocklist.
    :param wait_for_mon: wait for mon to be synced with mgr. 0 to disable
                         it. (5 min by default)
    :raises Exception: if the sequence number seen through
                       ``osd last-stat-seq`` has not reached the one
                       returned by ``flush_pg_stats`` before
                       ``wait_for_mon`` runs out
    """
    if no_wait is None:
        no_wait = []

    def flush_one_osd(osd: int, wait_for_mon: int):
        # "need" is the sequence number printed by the flush command
        need = int(self.raw_cluster_cmd('tell', 'osd.%d' % osd, 'flush_pg_stats'))
        if not wait_for_mon:
            return
        if osd in no_wait:
            return
        got = 0
        # poll once per A_WHILE seconds; wait_for_mon is the remaining budget
        while wait_for_mon > 0:
            got = int(self.raw_cluster_cmd('osd', 'last-stat-seq', 'osd.%d' % osd))
            self.log('need seq {need} got {got} for osd.{osd}'.format(
                need=need, got=got, osd=osd))
            if got >= need:
                break
            A_WHILE = 1
            time.sleep(A_WHILE)
            wait_for_mon -= A_WHILE
        else:
            # only reached when the loop ended without hitting "break"
            raise Exception('timed out waiting for mon to be updated with '
                            'osd.{osd}: {got} < {need}'.
                            format(osd=osd, got=got, need=need))

    # one spawned task per OSD
    with parallel() as p:
        for osd in osds:
            p.spawn(flush_one_osd, osd, wait_for_mon)
|
|
|
|
def flush_all_pg_stats(self):
    """
    Flush pg stats for OSD ids 0..N-1, N being the number of entries
    returned by get_osd_dump().
    """
    osd_count = len(self.get_osd_dump())
    self.flush_pg_stats(range(osd_count))
|
|
|
|
def do_rados(self, cmd, pool=None, namespace=None, remote=None, **kwargs):
    """
    Execute a remote rados command.

    :param cmd: iterable of rados arguments
    :param pool: adds ``--pool <pool>`` when not None
    :param namespace: adds ``--namespace <namespace>`` when not None
    :param remote: where to run; defaults to the controller
    :param kwargs: forwarded to ``remote.run``
    :return: the process object returned by ``remote.run``
    """
    target = self.controller if remote is None else remote

    # work on a copy so that RADOS_CMD itself is never modified
    full_cmd = list(self.RADOS_CMD)
    if pool is not None:
        full_cmd.extend(['--pool', pool])
    if namespace is not None:
        full_cmd.extend(['--namespace', namespace])
    full_cmd.extend(cmd)
    return target.run(args=full_cmd, wait=True, **kwargs)
|
|
|
|
def rados_write_objects(self, pool, num_objects, size,
                        timelimit, threads, cleanup=False):
    """
    Write rados objects with ``rados bench ... write``.

    ``--no-cleanup`` is passed unless ``cleanup`` is true.
    Threads not used yet.

    :return: the process object returned by do_rados
    """
    bench_args = [
        '--num-objects', num_objects,
        '-b', size,
        'bench', timelimit,
        'write',
    ] + ([] if cleanup else ['--no-cleanup'])
    return self.do_rados(map(str, bench_args), pool=pool)
|
|
|
|
def do_put(self, pool, obj, fname, namespace=None):
    """
    Run ``rados put <obj> <fname>`` and return its exit status.
    """
    proc = self.do_rados(['put', obj, fname],
                         check_status=False,
                         pool=pool,
                         namespace=namespace)
    return proc.exitstatus
|
|
|
|
def do_get(self, pool, obj, fname='/dev/null', namespace=None):
    """
    Run ``rados get <obj> <fname>`` and return its exit status.
    """
    proc = self.do_rados(['get', obj, fname],
                         check_status=False,
                         pool=pool,
                         namespace=namespace)
    return proc.exitstatus
|
|
|
|
def do_rm(self, pool, obj, namespace=None):
    """
    Run ``rados rm <obj>`` and return its exit status.
    """
    proc = self.do_rados(['rm', obj],
                         check_status=False,
                         pool=pool,
                         namespace=namespace)
    return proc.exitstatus
|
|
|
|
def osd_admin_socket(self, osd_id, command, check_status=True, timeout=0, stdout=None):
    """
    Send ``command`` to the admin socket of osd ``osd_id``.

    Thin wrapper around admin_socket() with the service type fixed to
    'osd'; stdout defaults to a fresh StringIO.
    """
    sink = StringIO() if stdout is None else stdout
    return self.admin_socket('osd', osd_id, command, check_status, timeout, sink)
|
|
|
|
def find_remote(self, service_type, service_id):
    """
    Look up the Remote of the host on which a given service runs.

    :param service_type: 'mds', 'osd', 'client'
    :param service_id:   The second part of a role, e.g. '0' for
                         the role 'client.0'
    :return: a Remote instance for the host where the
             requested role is placed
    """
    remote = get_remote(self.ctx, self.cluster, service_type, service_id)
    return remote
|
|
|
|
def admin_socket(self, service_type, service_id,
                 command, check_status=True, timeout=0, stdout=None):
    """
    Run a command against a daemon's admin socket on the daemon's host.

    :param service_type: daemon type, e.g. 'osd'
    :param service_id: daemon id
    :param command: a list of words to use as the command
                    to the admin socket
    :param check_status: forwarded to the remote run
    :param timeout: value handed to ``timeout``; not used on cephadm
    :param stdout: output sink, a fresh StringIO when None
    :return: the process object of the remote run
    """
    sink = StringIO() if stdout is None else stdout
    host = self.find_remote(service_type, service_id)

    if self.cephadm:
        daemon = '%s.%s' % (service_type, service_id)
        return shell(
            self.ctx, self.cluster, host,
            args=['ceph', 'daemon', daemon] + command,
            stdout=sink,
            wait=True,
            check_status=check_status,
        )
    if self.rook:
        assert False, 'not implemented'

    asok = f'/var/run/ceph/{self.cluster}-{service_type}.{service_id}.asok'
    full_cmd = [
        'sudo', 'adjust-ulimits',
        'ceph-coverage', f'{self.testdir}/archive/coverage',
        'timeout', str(timeout),
        'ceph', '--cluster', self.cluster,
        '--admin-daemon', asok,
    ]
    full_cmd.extend(command)
    return host.run(
        args=full_cmd,
        stdout=sink,
        wait=True,
        check_status=check_status
    )
|
|
|
|
def objectstore_tool(self, pool, options, args, **kwargs):
    """
    Create an ObjectStoreTool for ``pool`` (``kwargs`` are passed to its
    constructor) and run it with ``options`` and ``args``.
    """
    tool = ObjectStoreTool(self, pool, **kwargs)
    return tool.run(options, args)
|
|
|
|
def get_pgid(self, pool, pgnum):
    """
    :param pool: pool name
    :param pgnum: pg number
    :returns: a string representing this pg, ``<pool number>.<pgnum>``.
    """
    return f"{self.get_pool_num(pool)}.{pgnum}"
|
|
|
|
def get_pg_replica(self, pool, pgnum):
    """
    get replica for pool, pgnum (e.g. (data, 0)->0

    Runs ``pg map <pgid> --format=json`` and returns the last member of
    the ``acting`` set as an int.
    """
    pg_str = self.get_pgid(pool, pgnum)
    output = self.raw_cluster_cmd("pg", "map", pg_str, '--format=json')
    # the first line of the output is skipped; the rest is parsed as JSON
    j = json.loads('\n'.join(output.split('\n')[1:]))
    return int(j['acting'][-1])
|
|
|
|
def wait_for_pg_stats(func):
    """
    Decorator that retries ``func`` while it raises AssertionError.

    ``func`` is attempted once per entry of the back-off list, sleeping
    for that entry's number of seconds after each failed attempt; the
    last AssertionError is re-raised when every attempt failed.
    """
    # both osd_mon_report_interval and mgr_stats_period are 5 seconds
    # by default, and take the faulty injection in ms into consideration,
    # 12 seconds are more than enough
    backoff = (1, 1, 2, 3, 5, 8, 13, 0)

    @wraps(func)
    def retrying(self, *args, **kwargs):
        last_failure = None
        for pause in backoff:
            try:
                return func(self, *args, **kwargs)
            except AssertionError as failure:
                time.sleep(pause)
                last_failure = failure
        raise last_failure
    return retrying
|
|
|
|
def get_pg_primary(self, pool, pgnum):
    """
    get primary for pool, pgnum (e.g. (data, 0)->0

    Runs ``pg map <pgid> --format=json`` and returns the first member of
    the ``acting`` set as an int.
    """
    pg_str = self.get_pgid(pool, pgnum)
    output = self.raw_cluster_cmd("pg", "map", pg_str, '--format=json')
    # the first line of the output is skipped; the rest is parsed as JSON
    j = json.loads('\n'.join(output.split('\n')[1:]))
    return int(j['acting'][0])
|
|
|
|
def get_pool_num(self, pool):
    """
    Map a pool name to its number (e.g., data -> 2), taken from the
    ``pool`` field of the pool's osd dump entry.
    """
    pool_entry = self.get_pool_dump(pool)
    return int(pool_entry['pool'])
|
|
|
|
def list_pools(self):
    """
    Return the names of all pools found in the osd dump, logging the
    dump's pool entries on the way.
    """
    pool_entries = self.get_osd_dump_json()['pools']
    self.log(pool_entries)
    names = []
    for entry in pool_entries:
        names.append(str(entry['pool_name']))
    return names
|
|
|
|
def clear_pools(self):
    """
    Remove every pool returned by list_pools().
    """
    for name in self.list_pools():
        self.remove_pool(name)
|
|
|
|
def kick_recovery_wq(self, osdnum):
    """
    Tell the given osd to run ``debug kick_recovery_wq 0``.

    :return: the command's output as returned by raw_cluster_cmd
    """
    target = f"osd.{int(osdnum)}"
    return self.raw_cluster_cmd('tell', target,
                                'debug', 'kick_recovery_wq', '0')
|
|
|
|
def wait_run_admin_socket(self, service_type,
                          service_id, args=['version'], timeout=75, stdout=None):
    """
    Keep running an admin socket command until it succeeds.

    The command is retried every five seconds; once the number of failed
    attempts times five exceeds ``timeout`` an Exception is raised.

    :param args: the admin socket command (never modified here)
    :param timeout: retry budget in seconds
    :param stdout: output sink, a fresh StringIO when None
    :return: the process object of the successful run
    """
    sink = StringIO() if stdout is None else stdout
    failures = 0
    while True:
        proc = self.admin_socket(service_type, service_id,
                                 args, check_status=False, stdout=sink)
        if proc.exitstatus == 0:
            return proc
        failures += 1
        if failures * 5 > timeout:
            raise Exception('timed out waiting for admin_socket '
                            'to appear after {type}.{id} restart'.
                            format(type=service_type,
                                   id=service_id))
        self.log("waiting on admin_socket for {type}-{id}, "
                 "{command}".format(type=service_type,
                                    id=service_id,
                                    command=args))
        time.sleep(5)
|
|
|
|
def get_pool_dump(self, pool):
    """
    get the osd dump part of a pool

    :param pool: pool name
    :returns: the entry of the osd dump's 'pools' list for that pool
    :raises AssertionError: if no pool with that name is in the dump
    """
    osd_dump = self.get_osd_dump_json()
    for i in osd_dump['pools']:
        if i['pool_name'] == pool:
            return i
    # raise explicitly: a bare "assert False" disappears under python -O
    # and the method would then silently return None
    raise AssertionError('pool %s not found in osd dump' % (pool,))
|
|
|
|
def get_config(self, service_type, service_id, name):
    """
    Read one option from a daemon's 'config show' admin socket output.

    :param service_type: daemon type, passed to wait_run_admin_socket
    :param service_id: daemon id, passed to wait_run_admin_socket
    :param name: the option name
    """
    command = ['config', 'show']
    proc = self.wait_run_admin_socket(service_type, service_id, command)
    options = json.loads(proc.stdout.getvalue())
    return options[name]
|
|
|
|
def inject_args(self, service_type, service_id, name, value):
    """
    Inject a single option into a daemon via 'tell <daemon> injectargs'.

    Boolean values are sent as the strings 'true' / 'false'.
    """
    if isinstance(value, bool):
        if value:
            value = 'true'
        else:
            value = 'false'
    target = '{0}.{1}'.format(service_type, service_id)
    option = '--{name}={value}'.format(name=name, value=value)
    self.raw_cluster_cmd('--', 'tell', target, 'injectargs', option)
|
|
|
|
def set_config(self, osdnum, **argdict):
    """
    Apply config options to an osd through its admin socket.

    :param osdnum: osd number
    :param argdict: option name -> value pairs; both are stringified
    """
    for option, value in argdict.items():
        command = ['config', 'set', str(option), str(value)]
        self.wait_run_admin_socket('osd', osdnum, command)
|
|
|
|
def raw_cluster_status(self):
    """
    Run 'status --format=json' and return the decoded JSON.
    """
    return json.loads(self.raw_cluster_cmd('status', '--format=json'))
|
|
|
|
def raw_osd_status(self):
    """
    Return the raw output of 'osd dump'.
    """
    dump_output = self.raw_cluster_cmd('osd', 'dump')
    return dump_output
|
|
|
|
def get_osd_status(self):
    """
    Group osd ids by the states the osds are in.

    :returns: dict with the keys 'in', 'out', 'up', 'down' (parsed from
              the raw osd dump lines), 'dead' and 'live' (from the
              running state of the local osd daemons) and 'raw' (the
              osd lines themselves)
    """
    osd_lines = [
        line for line in self.raw_osd_status().split('\n')
        if line.startswith('osd.') and (("up" in line) or ("down" in line))
    ]
    self.log(osd_lines)

    def ids_with(marker):
        # "osd.<id> ..." -> <id> for every line that carries the marker
        return [int(line[4:].split()[0])
                for line in osd_lines if marker in line]

    in_osds = ids_with(" in ")
    out_osds = ids_with(" out ")
    up_osds = ids_with(" up ")
    down_osds = ids_with(" down ")

    daemons = self.ctx.daemons
    dead_osds = [int(d.id_)
                 for d in daemons.iter_daemons_of_role('osd', self.cluster)
                 if not d.running()]
    live_osds = [int(d.id_)
                 for d in daemons.iter_daemons_of_role('osd', self.cluster)
                 if d.running()]
    return {
        'in': in_osds,
        'out': out_osds,
        'up': up_osds,
        'down': down_osds,
        'dead': dead_osds,
        'live': live_osds,
        'raw': osd_lines,
    }
|
|
|
|
def get_num_pgs(self):
    """
    Return pgmap.num_pgs from the cluster status (which is also logged).
    """
    cluster_status = self.raw_cluster_status()
    self.log(cluster_status)
    pgmap = cluster_status['pgmap']
    return pgmap['num_pgs']
|
|
|
|
def create_erasure_code_profile(self, profile_name, profile):
    """
    Create an erasure code profile that can later be named when
    creating an erasure coded pool.

    :param profile_name: name of the profile
    :param profile: profile description handed to cmd_erasure_code_profile
    """
    with self.lock:
        command = cmd_erasure_code_profile(profile_name, profile)
        self.raw_cluster_cmd(*command)
|
|
|
|
def create_pool_with_unique_name(self, pg_num=16,
                                 erasure_code_profile_name=None,
                                 min_size=None,
                                 erasure_code_use_overwrites=False):
    """
    Create a pool named unique_pool_X, X being the next unused pool id.

    The remaining parameters are forwarded to create_pool().

    :returns: the name of the new pool
    """
    with self.lock:
        pool_id = self.next_pool_id
        name = "unique_pool_%s" % (str(pool_id),)
        self.next_pool_id = pool_id + 1
        self.create_pool(
            name,
            pg_num,
            erasure_code_profile_name=erasure_code_profile_name,
            min_size=min_size,
            erasure_code_use_overwrites=erasure_code_use_overwrites,
        )
    return name
|
|
|
|
@contextlib.contextmanager
def pool(self, pool_name, pg_num=16, erasure_code_profile_name=None):
    """
    Context manager that creates a pool on entry and removes it on exit.

    :param pool_name: name of the pool
    :param pg_num: initial number of pgs
    :param erasure_code_profile_name: passed through to create_pool()
    """
    self.create_pool(pool_name, pg_num, erasure_code_profile_name)
    try:
        yield
    finally:
        # remove the pool even when the managed block raises, so a
        # failing body does not leave the pool behind
        self.remove_pool(pool_name)
|
|
|
|
def create_pool(self, pool_name, pg_num=16,
                erasure_code_profile_name=None,
                min_size=None,
                erasure_code_use_overwrites=False):
    """
    Create a pool and record it in self.pools.

    :param pool_name: name of the pool being created.
    :param pg_num: initial number of pgs.
    :param erasure_code_profile_name: if set and !None create an
                                      erasure coded pool using the profile
    :param min_size: if not None, set the pool's min_size to this value
    :param erasure_code_use_overwrites: if true, allow overwrites
    """
    with self.lock:
        assert isinstance(pool_name, str)
        assert isinstance(pg_num, int)
        assert pool_name not in self.pools
        self.log("creating pool_name %s" % (pool_name,))

        create_cmd = ['osd', 'pool', 'create', pool_name, str(pg_num)]
        if erasure_code_profile_name:
            create_cmd += [str(pg_num), 'erasure',
                           erasure_code_profile_name]
        self.raw_cluster_cmd(*create_cmd)

        if min_size is not None:
            self.raw_cluster_cmd('osd', 'pool', 'set', pool_name,
                                 'min_size', str(min_size))
        if erasure_code_use_overwrites:
            self.raw_cluster_cmd('osd', 'pool', 'set', pool_name,
                                 'allow_ec_overwrites', 'true')
        # the trailing "|| true" means a failure to enable the
        # application does not fail the command
        self.raw_cluster_cmd('osd', 'pool', 'application', 'enable',
                             pool_name, 'rados', '--yes-i-really-mean-it',
                             run.Raw('||'), 'true')
        self.pools[pool_name] = pg_num
    time.sleep(1)
|
|
|
|
def add_pool_snap(self, pool_name, snap_name):
    """
    Take a pool snapshot ('osd pool mksnap').

    :param pool_name: name of pool to snapshot
    :param snap_name: name of snapshot to take
    """
    pool_arg = str(pool_name)
    snap_arg = str(snap_name)
    self.raw_cluster_cmd('osd', 'pool', 'mksnap', pool_arg, snap_arg)
|
|
|
|
def remove_pool_snap(self, pool_name, snap_name):
    """
    Delete a pool snapshot ('osd pool rmsnap').

    :param pool_name: name of the pool owning the snapshot
    :param snap_name: name of snapshot to remove
    """
    pool_arg = str(pool_name)
    snap_arg = str(snap_name)
    self.raw_cluster_cmd('osd', 'pool', 'rmsnap', pool_arg, snap_arg)
|
|
|
|
def remove_pool(self, pool_name):
    """
    Forget the pool in self.pools, then delete it from the cluster.

    :param pool_name: Pool to be removed
    """
    with self.lock:
        assert isinstance(pool_name, str)
        assert pool_name in self.pools
        self.log("removing pool_name %s" % (pool_name,))
        self.pools.pop(pool_name)
        rm_cmd = ('osd', 'pool', 'rm', pool_name, pool_name,
                  "--yes-i-really-really-mean-it")
        self.raw_cluster_cmd(*rm_cmd)
|
|
|
|
def get_pool(self):
    """
    Pick a random pool

    :returns: name of a randomly chosen pool from self.pools, or None
              when no pools are tracked
    """
    with self.lock:
        if self.pools:
            # random.sample() no longer accepts non-sequence populations
            # such as a dict keys view on recent Python versions, so
            # choose from an explicit list of the pool names
            return random.choice(list(self.pools))
        return None
|
|
|
|
def get_pool_pg_num(self, pool_name):
    """
    Return the tracked number of pgs of the pool, or 0 if the pool is
    not in self.pools.
    """
    with self.lock:
        assert isinstance(pool_name, str)
        return self.pools.get(pool_name, 0)
|
|
|
|
def get_pool_property(self, pool_name, prop):
    """
    Query a pool property with 'osd pool get'.

    :param pool_name: pool
    :param prop: property to be checked.
    :returns: second whitespace-separated token of the command output,
              as a string
    """
    with self.lock:
        assert isinstance(pool_name, str)
        assert isinstance(prop, str)
        output = self.raw_cluster_cmd('osd', 'pool', 'get', pool_name, prop)
        tokens = output.split()
        return tokens[1]
|
|
|
|
def get_pool_int_property(self, pool_name, prop):
    """
    Same as get_pool_property(), with the value converted to int.
    """
    raw_value = self.get_pool_property(pool_name, prop)
    return int(raw_value)
|
|
|
|
def set_pool_property(self, pool_name, prop, val):
    """
    Set an integer pool property with 'osd pool set'.

    :param pool_name: pool
    :param prop: property to be set.
    :param val: value to set.

    The command is retried, two seconds apart, for as long as its result
    is 11 (EAGAIN); any other result ends the loop.

    :raises Exception: after more than 50 EAGAIN results
    """
    with self.lock:
        assert isinstance(pool_name, str)
        assert isinstance(prop, str)
        assert isinstance(val, int)
        eagain_count = 0
        while True:
            result = self.raw_cluster_cmd_result(
                'osd', 'pool', 'set', pool_name, prop, str(val))
            if r_is_done(result):
                return
            eagain_count += 1
            if eagain_count > 50:
                raise Exception('timed out getting EAGAIN '
                                'when setting pool property %s %s = %s' %
                                (pool_name, prop, val))
            self.log('got EAGAIN setting pool property, '
                     'waiting a few seconds...')
            time.sleep(2)


def r_is_done(result):
    # anything but 11 (EAGAIN) means the command does not need a retry
    return result != 11
|
|
|
|
def expand_pool(self, pool_name, by, max_pgs):
    """
    Grow a pool's pg_num by `by`.

    :returns: False (and nothing is changed) if any pg is still being
              created or the new pg_num would exceed max_pgs; True once
              pg_num has been raised
    """
    with self.lock:
        assert isinstance(pool_name, str)
        assert isinstance(by, int)
        assert pool_name in self.pools
        if self.get_num_creating() > 0:
            return False
        target_pg_num = self.pools[pool_name] + by
        if target_pg_num > max_pgs:
            return False
        self.log("increase pool size by %d" % (by,))
        self.set_pool_property(pool_name, "pg_num", target_pg_num)
        self.pools[pool_name] = target_pg_num
        return True
|
|
|
|
def contract_pool(self, pool_name, by, min_pgs):
    """
    Shrink a pool's pg_num by `by`.

    :returns: False (and nothing is changed) if any pg is still being
              created or the new pg_num would fall below min_pgs; True
              once pg_num has been lowered
    """
    with self.lock:
        self.log('contract_pool %s by %s min %s' % (
            pool_name, str(by), str(min_pgs)))
        assert isinstance(pool_name, str)
        assert isinstance(by, int)
        assert pool_name in self.pools
        if self.get_num_creating() > 0:
            self.log('too many creating')
            return False
        current = self.pools[pool_name]
        projected = current - by
        if projected < min_pgs:
            self.log('would drop below min_pgs, proj %d, currently %d' % (projected, current,))
            return False
        self.log("decrease pool size by %d" % (by,))
        self.set_pool_property(pool_name, "pg_num", projected)
        self.pools[pool_name] = projected
        return True
|
|
|
|
def stop_pg_num_changes(self):
    """
    Cancel pending splits and merges by setting pg_num of every pool
    whose pg_num_target differs back to its current pg_num.
    """
    self.log('Canceling any pending splits or merges...')
    osd_dump = self.get_osd_dump_json()
    try:
        for pool_info in osd_dump['pools']:
            current = pool_info['pg_num']
            if current == pool_info['pg_num_target']:
                continue
            self.log('Setting pool %s (%d) pg_num %d -> %d' %
                     (pool_info['pool_name'], pool_info['pool'],
                      pool_info['pg_num_target'],
                      pool_info['pg_num']))
            self.raw_cluster_cmd('osd', 'pool', 'set',
                                 pool_info['pool_name'],
                                 'pg_num', str(pool_info['pg_num']))
    except KeyError:
        # we don't support pg_num_target before nautilus
        pass
|
|
|
|
def set_pool_pgpnum(self, pool_name, force):
    """
    Set pgp_num of the pool to its tracked pg_num.

    :param force: when true, do it even while pgs are being created
    :returns: False if skipped because pgs are being created, else True
    """
    with self.lock:
        assert isinstance(pool_name, str)
        assert pool_name in self.pools
        if not force:
            if self.get_num_creating() > 0:
                return False
        tracked_pg_num = self.pools[pool_name]
        self.set_pool_property(pool_name, 'pgp_num', tracked_pg_num)
        return True
|
|
|
|
def list_pg_unfound(self, pgid):
    """
    Collect the complete 'list_unfound' result of a pg.

    The command is repeated, each time passing the oid of the last
    object received as the offset, until a reply has no 'more' key or
    has 'more' equal to 0.

    :returns: the first reply with the 'objects' of all later replies
              appended and the 'more' key removed
    """
    merged = None
    cursor = {}
    while True:
        raw = self.raw_cluster_cmd('--', 'pg', pgid, 'list_unfound',
                                   json.dumps(cursor))
        page = json.loads(raw)
        if merged is None:
            merged = page
        else:
            merged['objects'].extend(page['objects'])
        if 'more' not in page or page['more'] == 0:
            break
        cursor = page['objects'][-1]['oid']
    merged.pop('more', None)
    return merged
|
|
|
|
def get_pg_stats(self):
    """
    Run 'pg dump --format=json' and return its pg_stats list.

    The list is taken from pg_map.pg_stats when present, otherwise
    from the top-level pg_stats key.
    """
    raw = self.raw_cluster_cmd('pg', 'dump', '--format=json')
    # the first output line is dropped before parsing
    payload = '\n'.join(raw.split('\n')[1:])
    dump = json.loads(payload)
    try:
        stats = dump['pg_map']['pg_stats']
    except KeyError:
        stats = dump['pg_stats']
    return stats
|
|
|
|
def get_osd_df(self, osdid):
    """
    Return the first 'nodes' entry of 'osd df name osd.<osdid>'.
    """
    osd_name = 'osd.{}'.format(osdid)
    raw = self.raw_cluster_cmd('osd', 'df', 'name', osd_name,
                               '--format=json')
    # the first output line is dropped before parsing
    report = json.loads('\n'.join(raw.split('\n')[1:]))
    return report['nodes'][0]
|
|
|
|
def get_pool_df(self, name):
    """
    Return the 'stats' of the named pool from 'df detail', or None if
    no pool has that name.
    """
    raw = self.raw_cluster_cmd('df', 'detail', '--format=json')
    # the first output line is dropped before parsing
    report = json.loads('\n'.join(raw.split('\n')[1:]))
    for pool_entry in report['pools']:
        if pool_entry['name'] == name:
            return pool_entry['stats']
    return None
|
|
|
|
def get_pgids_to_force(self, backfill):
    """
    Return a randomized list of PGs whose recovery/backfill can be forced.

    :param backfill: pick backfill candidates if true, else recovery
                     candidates
    """
    pg_stats = self.get_pg_stats()
    if backfill:
        wanted = ['degraded', 'backfilling', 'backfill_wait']
    else:
        wanted = ['recovering', 'degraded', 'recovery_wait']
    chosen = []
    for pg in pg_stats:
        states = pg['state'].split('+')
        for candidate in wanted:
            # one coin flip per candidate state, taken first
            if not random.random() > 0.5:
                continue
            if 'forced_backfill' in states or 'forced_recovery' in states:
                continue
            if candidate in states:
                chosen.append(pg['pgid'])
                break
    return chosen
|
|
|
|
def get_pgids_to_cancel_force(self, backfill):
    """
    Return a randomized list of PGs whose recovery/backfill priority
    is currently forced.

    :param backfill: look for forced_backfill if true, else
                     forced_recovery
    """
    pg_stats = self.get_pg_stats()
    wanted = 'forced_backfill' if backfill else 'forced_recovery'
    return [pg['pgid'] for pg in pg_stats
            if wanted in pg['state'].split('+') and random.random() > 0.5]
|
|
|
|
def compile_pg_status(self):
    """
    Build a histogram: pg state name -> number of pgs in that state.
    """
    histogram = {}
    for pg in self.get_pg_stats():
        for state in pg['state'].split('+'):
            histogram[state] = histogram.get(state, 0) + 1
    return histogram
|
|
|
|
@wait_for_pg_stats  # type: ignore
def with_pg_state(self, pool, pgnum, check):
    """
    Assert that check() accepts the current state of the given pg.

    :param check: callable taking the pg's state string
    """
    pg_stats = self.get_single_pg_stats(self.get_pgid(pool, pgnum))
    assert check(pg_stats['state'])
|
|
|
|
@wait_for_pg_stats  # type: ignore
def with_pg(self, pool, pgnum, check):
    """
    Call check() with the stats of the given pg and return its result.
    """
    pg_stats = self.get_single_pg_stats(self.get_pgid(pool, pgnum))
    return check(pg_stats)
|
|
|
|
def get_last_scrub_stamp(self, pool, pgnum):
    """
    Return the last_scrub_stamp field of the given pg's stats.
    """
    pgid = self.get_pgid(pool, pgnum)
    pg_stats = self.get_single_pg_stats(pgid)
    return pg_stats["last_scrub_stamp"]
|
|
|
|
def do_pg_scrub(self, pool, pgnum, stype):
    """
    Request a scrub of type stype on a pg and wait until the pg's
    last scrub stamp changes.

    The request is (re)sent every 120 waited seconds; once 360 seconds
    have been waited the assertion fails.
    """
    resend_every = 120  # Must be a multiple of poll_interval
    give_up_after = resend_every * 3
    poll_interval = 10
    initial_stamp = self.get_last_scrub_stamp(pool, pgnum)
    waited = 0
    while initial_stamp == self.get_last_scrub_stamp(pool, pgnum):
        assert waited < give_up_after, "fatal timeout trying to " + stype
        self.log("waiting for scrub type %s" % (stype,))
        if (waited % resend_every) == 0:
            self.raw_cluster_cmd('pg', stype, self.get_pgid(pool, pgnum))
            # The first time in this loop is the actual request
            if waited != 0 and stype == "repair":
                self.log("WARNING: Resubmitted a non-idempotent repair")
        time.sleep(poll_interval)
        waited += poll_interval
|
|
|
|
def wait_snap_trimming_complete(self, pool):
    """
    Wait for snap trimming on pool to end

    Polls the pg stats every 10 seconds until no pg of the pool has
    'snaptrim' in its state.

    :param pool: pool name
    :raises AssertionError: if trimming is still going on after 600
                            seconds
    """
    POLL_PERIOD = 10
    FATAL_TIMEOUT = 600
    start = time.time()
    poolnum = self.get_pool_num(pool)
    # pgids have the form "<poolnum>.<rest>"; compare the prefix so that
    # e.g. pool 1 does not also pick up the pgs of pool 11
    pg_prefix = "%s." % (poolnum,)
    while True:
        if (time.time() - start) > FATAL_TIMEOUT:
            raise AssertionError(
                'failed to complete snap trimming before timeout')
        trimming = False
        for pg in self.get_pg_stats():
            if pg['pgid'].startswith(pg_prefix) and \
               'snaptrim' in pg['state']:
                self.log("pg {pg} in trimming, state: {state}".format(
                    pg=pg['pgid'],
                    state=pg['state']))
                trimming = True
        if not trimming:
            break
        self.log("{pool} still trimming, waiting".format(pool=pool))
        time.sleep(POLL_PERIOD)
|
|
|
|
def get_single_pg_stats(self, pgid):
    """
    Return the stats entry whose 'pgid' equals pgid, or None if there
    is none.
    """
    matches = (pg for pg in self.get_pg_stats() if pg['pgid'] == pgid)
    return next(matches, None)
|
|
|
|
def get_object_pg_with_shard(self, pool, name, osdid):
    """
    Return the pgid an object maps to.

    For a pool of type PoolType.ERASURE_CODED the result is
    "<pgid>s<shard>", the shard being the position of osdid in the
    object's acting set.
    """
    pool_dump = self.get_pool_dump(pool)
    object_map = self.get_object_map(pool, name)
    if pool_dump["type"] != PoolType.ERASURE_CODED:
        return object_map['pgid']
    shard = object_map['acting'].index(osdid)
    return "{pgid}s{shard}".format(pgid=object_map['pgid'],
                                   shard=shard)
|
|
|
|
def get_object_primary(self, pool, name):
    """
    Return the acting_primary field of the object's 'osd map' output.
    """
    return self.get_object_map(pool, name)['acting_primary']
|
|
|
|
def get_object_map(self, pool, name):
    """
    osd map --format=json for an object, converted to a python object

    :returns: the python object
    """
    raw = self.raw_cluster_cmd('--format=json', 'osd', 'map', pool, name)
    # the first output line is dropped before parsing
    payload = '\n'.join(raw.split('\n')[1:])
    return json.loads(payload)
|
|
|
|
def get_osd_dump_json(self):
    """
    osd dump --format=json converted to a python object

    :returns: the python object
    """
    raw = self.raw_cluster_cmd('osd', 'dump', '--format=json')
    # the first output line is dropped before parsing
    payload = '\n'.join(raw.split('\n')[1:])
    return json.loads(payload)
|
|
|
|
def get_osd_dump(self):
    """
    Return the 'osds' section of the osd dump.

    :returns: all osds
    """
    full_dump = self.get_osd_dump_json()
    return full_dump['osds']
|
|
|
|
def get_osd_metadata(self):
    """
    osd metadata --format=json converted to a python object

    :returns: the python object containing osd metadata information
    """
    raw = self.raw_cluster_cmd('osd', 'metadata', '--format=json')
    # the first output line is dropped before parsing
    payload = '\n'.join(raw.split('\n')[1:])
    return json.loads(payload)
|
|
|
|
def get_mgr_dump(self):
    """
    mgr dump --format=json converted to a python object
    """
    return json.loads(self.raw_cluster_cmd('mgr', 'dump', '--format=json'))
|
|
|
|
def get_stuck_pgs(self, type_, threshold):
    """
    Run 'pg dump_stuck <type_> <threshold>'.

    :returns: the stuck_pg_stats list of the reply, [] if it has none
    """
    raw = self.raw_cluster_cmd('pg', 'dump_stuck', type_, str(threshold),
                               '--format=json')
    reply = json.loads(raw)
    return reply.get('stuck_pg_stats', [])
|
|
|
|
def get_num_unfound_objects(self):
    """
    Return pgmap.unfound_objects from the cluster status (0 if absent).
    The status is also logged.
    """
    cluster_status = self.raw_cluster_status()
    self.log(cluster_status)
    pgmap = cluster_status['pgmap']
    return pgmap.get('unfound_objects', 0)
|
|
|
|
def get_num_creating(self):
    """
    Count the pgs whose state contains 'creating'.
    """
    return sum(1 for pg in self.get_pg_stats() if 'creating' in pg['state'])
|
|
|
|
def get_num_active_clean(self):
    """
    Fetch the pg stats and count the active and clean pgs.
    """
    return self._get_num_active_clean(self.get_pg_stats())
|
|
|
|
def _get_num_active_clean(self, pgs):
|
|
num = 0
|
|
for pg in pgs:
|
|
if (pg['state'].count('active') and
|
|
pg['state'].count('clean') and
|
|
not pg['state'].count('stale')):
|
|
num += 1
|
|
return num
|
|
|
|
def get_num_active_recovered(self):
    """
    Fetch the pg stats and count the active and recovered pgs.
    """
    return self._get_num_active_recovered(self.get_pg_stats())
|
|
|
|
def _get_num_active_recovered(self, pgs):
|
|
num = 0
|
|
for pg in pgs:
|
|
if (pg['state'].count('active') and
|
|
not pg['state'].count('recover') and
|
|
not pg['state'].count('backfilling') and
|
|
not pg['state'].count('stale')):
|
|
num += 1
|
|
return num
|
|
|
|
def get_is_making_recovery_progress(self):
    """
    Return whether the cluster status shows recovery progress, i.e.
    whether any of the pgmap recovering_*_per_sec rates is above zero.
    """
    pgmap = self.raw_cluster_status()['pgmap']
    keys_rate = pgmap.get('recovering_keys_per_sec', 0)
    bytes_rate = pgmap.get('recovering_bytes_per_sec', 0)
    objects_rate = pgmap.get('recovering_objects_per_sec', 0)
    if keys_rate > 0:
        return True
    if bytes_rate > 0:
        return True
    return objects_rate > 0
|
|
|
|
def get_num_active(self):
    """
    Fetch the pg stats and count the active pgs.
    """
    return self._get_num_active(self.get_pg_stats())
|
|
|
|
def _get_num_active(self, pgs):
|
|
num = 0
|
|
for pg in pgs:
|
|
if pg['state'].count('active') and not pg['state'].count('stale'):
|
|
num += 1
|
|
return num
|
|
|
|
def get_num_down(self):
    """
    Count the non-stale pgs whose state contains 'down' or 'incomplete'.
    """
    count = 0
    for pg in self.get_pg_stats():
        state = pg['state']
        if 'stale' in state:
            continue
        if 'down' in state or 'incomplete' in state:
            count += 1
    return count
|
|
|
|
def get_num_active_down(self):
    """
    Fetch the pg stats and count the pgs that are either active or down.
    """
    return self._get_num_active_down(self.get_pg_stats())
|
|
|
|
def _get_num_active_down(self, pgs):
|
|
num = 0
|
|
for pg in pgs:
|
|
if ((pg['state'].count('active') and not
|
|
pg['state'].count('stale')) or
|
|
(pg['state'].count('down') and not
|
|
pg['state'].count('stale')) or
|
|
(pg['state'].count('incomplete') and not
|
|
pg['state'].count('stale'))):
|
|
num += 1
|
|
return num
|
|
|
|
def get_num_peered(self):
    """
    Fetch the pg stats and count the peered PGs.
    """
    return self._get_num_peered(self.get_pg_stats())
|
|
|
|
def _get_num_peered(self, pgs):
|
|
num = 0
|
|
for pg in pgs:
|
|
if pg['state'].count('peered') and not pg['state'].count('stale'):
|
|
num += 1
|
|
return num
|
|
|
|
def is_clean(self):
    """
    True if all pgs are active and clean. Otherwise the pgs that are
    not active+clean are dumped to the log and False is returned.
    """
    pgs = self.get_pg_stats()
    if self._get_num_active_clean(pgs) != len(pgs):
        self.dump_pgs_not_active_clean()
        return False
    return True
|
|
|
|
def is_recovered(self):
    """
    True if every pg counts as active and recovered.
    """
    pgs = self.get_pg_stats()
    recovered = self._get_num_active_recovered(pgs)
    return recovered == len(pgs)
|
|
|
|
def is_active_or_down(self):
    """
    True if every pg counts as active or down.
    """
    pgs = self.get_pg_stats()
    active_or_down = self._get_num_active_down(pgs)
    return active_or_down == len(pgs)
|
|
|
|
def dump_pgs_not_active_clean(self):
    """
    Log every pg whose state is not exactly 'active+clean'.
    """
    for pg in self.get_pg_stats():
        if pg['state'] == 'active+clean':
            continue
        self.log('PG %s is not active+clean' % pg['pgid'])
        self.log(pg)
|
|
|
|
def dump_pgs_not_active_down(self):
    """
    Log every pg whose state contains neither 'active' nor 'down'.
    """
    for pg in self.get_pg_stats():
        state = pg['state']
        if 'active' in state or 'down' in state:
            continue
        self.log('PG %s is not active or down' % pg['pgid'])
        self.log(pg)
|
|
|
|
def dump_pgs_not_active(self):
    """
    Log every pg whose state does not contain 'active'.
    """
    for pg in self.get_pg_stats():
        if 'active' in pg['state']:
            continue
        self.log('PG %s is not active' % pg['pgid'])
        self.log(pg)
|
|
|
|
def dump_pgs_not_active_peered(self, pgs):
    """
    Log every pg of pgs whose state contains neither 'active' nor
    'peered'.
    """
    for pg in pgs:
        state = pg['state']
        if 'active' in state or 'peered' in state:
            continue
        self.log('PG %s is not active or peered' % pg['pgid'])
        self.log(pg)
|
|
|
|
def wait_for_clean(self, timeout=1200):
    """
    Block until all pgs are clean (nothing is returned).

    :param timeout: seconds without progress after which the assertion
                    fails; None disables the check. The clock restarts
                    whenever recovery progress is reported or the number
                    of active+clean pgs changes.
    """
    self.log("waiting for clean")
    start = time.time()
    num_active_clean = self.get_num_active_clean()
    while not self.is_clean():
        if timeout is not None:
            if self.get_is_making_recovery_progress():
                self.log("making progress, resetting timeout")
                start = time.time()
            else:
                self.log("no progress seen, keeping timeout for now")
                if time.time() - start >= timeout:
                    # log the offending pgs before failing
                    self.log('dumping pgs not clean')
                    self.dump_pgs_not_active_clean()
                    assert time.time() - start < timeout, \
                        'wait_for_clean: failed before timeout expired'
        # a change in the active+clean count also restarts the clock
        cur_active_clean = self.get_num_active_clean()
        if cur_active_clean != num_active_clean:
            start = time.time()
            num_active_clean = cur_active_clean
        time.sleep(3)
    self.log("clean!")
|
|
|
|
def are_all_osds_up(self):
    """
    Returns true if every osd in the osd dump has 'up' > 0.
    """
    osds = self.get_osd_dump()
    up_count = sum(1 for osd in osds if osd['up'] > 0)
    return up_count == len(osds)
|
|
|
|
def wait_for_all_osds_up(self, timeout=None):
    """
    Poll every three seconds until all osds are up.

    :param timeout: if not None, the assertion fails once this many
                    seconds have passed without all osds being up
    """
    self.log("waiting for all up")
    began = time.time()
    while True:
        if self.are_all_osds_up():
            break
        if timeout is not None:
            assert time.time() - began < timeout, \
                'timeout expired in wait_for_all_osds_up'
        time.sleep(3)
    self.log("all up!")
|
|
|
|
def pool_exists(self, pool):
    """
    True if pool is among the names returned by list_pools().
    """
    return pool in self.list_pools()
|
|
|
|
def wait_for_pool(self, pool, timeout=300):
    """
    Poll every three seconds until the pool exists.

    :param timeout: if not None, the assertion fails once this many
                    seconds have passed without the pool existing
    """
    self.log('waiting for pool %s to exist' % pool)
    began = time.time()
    while True:
        if self.pool_exists(pool):
            return
        if timeout is not None:
            assert time.time() - began < timeout, \
                'timeout expired in wait_for_pool'
        time.sleep(3)
|
|
|
|
def wait_for_pools(self, pools):
    """
    Wait, one after the other, for each of the given pools to exist.
    """
    for pool_name in pools:
        self.wait_for_pool(pool_name)
|
|
|
|
def is_mgr_available(self):
    """
    Return the 'available' field of the mgr dump (False if absent).
    """
    mgr_dump = self.get_mgr_dump()
    return mgr_dump.get('available', False)
|
|
|
|
def wait_for_mgr_available(self, timeout=None):
    """
    Poll every three seconds until the mgr reports itself available.

    :param timeout: if not None, the assertion fails once this many
                    seconds have passed without the mgr being available
    """
    self.log("waiting for mgr available")
    began = time.time()
    while True:
        if self.is_mgr_available():
            break
        if timeout is not None:
            assert time.time() - began < timeout, \
                'timeout expired in wait_for_mgr_available'
        time.sleep(3)
    self.log("mgr available!")
|
|
|
|
def wait_for_recovery(self, timeout=None):
    """
    Block until is_recovered() reports true. When this exits, we have
    recovered.

    :param timeout: seconds without progress after which the assertion
                    fails; None disables the check. The clock restarts
                    whenever recovery progress is reported or the number
                    of active+recovered pgs changes.
    """
    self.log("waiting for recovery to complete")
    start = time.time()
    num_active_recovered = self.get_num_active_recovered()
    while not self.is_recovered():
        # sampled once per iteration and used for all timeout checks below
        now = time.time()
        if timeout is not None:
            if self.get_is_making_recovery_progress():
                self.log("making progress, resetting timeout")
                start = time.time()
            else:
                self.log("no progress seen, keeping timeout for now")
                if now - start >= timeout:
                    # re-check before failing: recovery may have
                    # completed since the loop condition was evaluated
                    if self.is_recovered():
                        break
                    self.log('dumping pgs not recovered yet')
                    self.dump_pgs_not_active_clean()
                    assert now - start < timeout, \
                        'wait_for_recovery: failed before timeout expired'
        # a change in the recovered count also restarts the clock
        cur_active_recovered = self.get_num_active_recovered()
        if cur_active_recovered != num_active_recovered:
            start = time.time()
            num_active_recovered = cur_active_recovered
        time.sleep(3)
    self.log("recovered!")
|
|
|
|
def wait_for_active(self, timeout=None):
    """
    Block until is_active() reports true. When this exits, we are
    definitely active.

    :param timeout: seconds without a change in the number of active
                    pgs after which the assertion fails; None disables
                    the check
    """
    self.log("waiting for peering to complete")
    start = time.time()
    num_active = self.get_num_active()
    while not self.is_active():
        if timeout is not None:
            if time.time() - start >= timeout:
                # log the offending pgs before failing
                self.log('dumping pgs not active')
                self.dump_pgs_not_active()
                assert time.time() - start < timeout, \
                    'wait_for_active: failed before timeout expired'
        # a change in the active count restarts the clock
        cur_active = self.get_num_active()
        if cur_active != num_active:
            start = time.time()
            num_active = cur_active
        time.sleep(3)
    self.log("active!")
|
|
|
|
def wait_for_active_or_down(self, timeout=None):
    """
    Block until is_active_or_down() reports true. When this exits, we
    are definitely either active or down.

    :param timeout: seconds without a change in the number of
                    active-or-down pgs after which the assertion fails;
                    None disables the check
    """
    self.log("waiting for peering to complete or become blocked")
    start = time.time()
    num_active_down = self.get_num_active_down()
    while not self.is_active_or_down():
        if timeout is not None:
            if time.time() - start >= timeout:
                # log the offending pgs before failing
                self.log('dumping pgs not active or down')
                self.dump_pgs_not_active_down()
                assert time.time() - start < timeout, \
                    'wait_for_active_or_down: failed before timeout expired'
        # a change in the active-or-down count restarts the clock
        cur_active_down = self.get_num_active_down()
        if cur_active_down != num_active_down:
            start = time.time()
            num_active_down = cur_active_down
        time.sleep(3)
    self.log("active or down!")
|
|
|
|
def osd_is_up(self, osd):
    """
    True if the osd dump entry at index osd has 'up' > 0.
    """
    entry = self.get_osd_dump()[osd]
    return entry['up'] > 0
|
|
|
|
def wait_till_osd_is_up(self, osd, timeout=None):
    """
    Poll every three seconds until the given osd is up.

    :param osd: osd number
    :param timeout: if not None, the assertion fails once this many
                    seconds have passed without the osd being up
    """
    self.log('waiting for osd.%d to be up' % osd)
    began = time.time()
    while True:
        if self.osd_is_up(osd):
            break
        if timeout is not None:
            assert time.time() - began < timeout, \
                'osd.%d failed to come up before timeout expired' % osd
        time.sleep(3)
    self.log('osd.%d is up' % osd)
|
|
|
|
def is_active(self):
    """
    True if the number of active pgs equals the total number of pgs.
    """
    active = self.get_num_active()
    total = self.get_num_pgs()
    return active == total
|
|
|
|
def all_active_or_peered(self):
    """
    True if the active and the peered pg counts add up to the number
    of pgs. Otherwise the pgs that are neither are dumped to the log
    and False is returned.
    """
    pgs = self.get_pg_stats()
    accounted = self._get_num_active(pgs) + self._get_num_peered(pgs)
    if accounted != len(pgs):
        self.dump_pgs_not_active_peered(pgs)
        return False
    return True
|
|
|
|
def wait_till_active(self, timeout=None):
    """
    Wait until all pgs are active.

    :param timeout: seconds after which the assertion fails; None
                    disables the check. Unlike wait_for_active(), the
                    clock is never restarted.
    """
    self.log("waiting till active")
    start = time.time()
    while not self.is_active():
        if timeout is not None:
            if time.time() - start >= timeout:
                # log the offending pgs before failing
                self.log('dumping pgs not active')
                self.dump_pgs_not_active()
                assert time.time() - start < timeout, \
                    'wait_till_active: failed before timeout expired'
        time.sleep(3)
    self.log("active!")
|
|
|
|
def wait_till_pg_convergence(self, timeout=None):
    """
    Wait until two consecutive samples of the pg states are identical.

    :param timeout: if not None, the assertion fails once this many
                    seconds have passed without convergence
    :returns: the converged mapping pgid -> state
    """
    began = time.time()
    active_osds = []
    for osd in self.get_osd_dump():
        if osd['in'] and osd['up']:
            active_osds.append(osd['osd'])
    previous = None
    while True:
        # strictly speaking, no need to wait for mon. but due to the
        # "ms inject socket failures" setting, the osdmap could be delayed,
        # so mgr is likely to ignore the pg-stat messages with pgs serving
        # newly created pools which is not yet known by mgr. so, to make sure
        # the mgr is updated with the latest pg-stats, waiting for mon/mgr is
        # necessary.
        self.flush_pg_stats(active_osds)
        current = {stat['pgid']: stat['state']
                   for stat in self.get_pg_stats()}
        if previous == current:
            return previous
        if timeout is not None:
            assert time.time() - began < timeout, \
                'failed to reach convergence before %d secs' % timeout
        previous = current
        # longer than mgr_stats_period
        time.sleep(5 + 1)
|
|
|
|
def mark_out_osd(self, osd):
    """
    Mark the osd out ('osd out <osd>').
    """
    osd_arg = str(osd)
    self.raw_cluster_cmd('osd', 'out', osd_arg)
|
|
|
|
def kill_osd(self, osd):
    """
    Kill osds by either power cycling (if indicated by the config)
    or by stopping.

    When both 'bdev_inject_crash' and 'bdev_inject_crash_probability'
    are configured, the osd is, with that probability, crashed by
    injecting 'bdev-inject-crash' instead of being stopped.

    :param osd: osd id
    :raises RuntimeError: if the osd daemon exits cleanly after a crash
                          was injected
    """
    if self.config.get('powercycle'):
        remote = self.find_remote('osd', osd)
        self.log('kill_osd on osd.{o} '
                 'doing powercycle of {s}'.format(o=osd, s=remote.name))
        self._assert_ipmi(remote)
        remote.console.power_off()
    elif self.config.get('bdev_inject_crash') and self.config.get('bdev_inject_crash_probability'):
        if random.uniform(0, 1) < self.config.get('bdev_inject_crash_probability', .5):
            self.inject_args(
                'osd', osd,
                'bdev-inject-crash', self.config.get('bdev_inject_crash'))
            try:
                self.ctx.daemons.get_daemon('osd', osd, self.cluster).wait()
            except Exception:
                # a failing wait() is the expected outcome of the
                # injected crash; only ordinary exceptions are swallowed
                # so that interrupts and exit requests still propagate
                pass
            else:
                raise RuntimeError('osd.%s did not fail' % osd)
        else:
            self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
    else:
        self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
|
|
|
|
@staticmethod
|
|
def _assert_ipmi(remote):
|
|
assert remote.console.has_ipmi_credentials, (
|
|
"powercycling requested but RemoteConsole is not "
|
|
"initialized. Check ipmi config.")
|
|
|
|
def blackhole_kill_osd(self, osd):
    """
    Stop osd if nothing else works: inject objectstore-blackhole,
    wait two seconds, then stop the daemon.
    """
    self.inject_args('osd', osd, 'objectstore-blackhole', True)
    time.sleep(2)
    daemon = self.ctx.daemons.get_daemon('osd', osd, self.cluster)
    daemon.stop()
|
|
|
|
def revive_osd(self, osd, timeout=360, skip_admin_check=False):
    """
    Revive osds by either power cycling (if indicated by the config)
    or by restarting.

    :param osd: osd id
    :param timeout: timeout handed to wait_run_admin_socket()
    :param skip_admin_check: if true, do not wait for the osd's admin
                             socket after the restart
    :raises Exception: if the powercycled host does not come back
    """
    if self.config.get('powercycle'):
        remote = self.find_remote('osd', osd)
        # name this method in the message; it used to claim kill_osd
        self.log('revive_osd on osd.{o} doing powercycle of {s}'.
                 format(o=osd, s=remote.name))
        self._assert_ipmi(remote)
        remote.console.power_on()
        if not remote.console.check_status(300):
            raise Exception('Failed to revive osd.{o} via ipmi'.
                            format(o=osd))
        teuthology.reconnect(self.ctx, 60, [remote])
        mount_osd_data(self.ctx, remote, self.cluster, str(osd))
        self.make_admin_daemon_dir(remote)
        self.ctx.daemons.get_daemon('osd', osd, self.cluster).reset()
    self.ctx.daemons.get_daemon('osd', osd, self.cluster).restart()

    if not skip_admin_check:
        # wait for dump_ops_in_flight; this command doesn't appear
        # until after the signal handler is installed and it is safe
        # to stop the osd again without making valgrind leak checks
        # unhappy.  see #5924.
        self.wait_run_admin_socket('osd', osd,
                                   args=['dump_ops_in_flight'],
                                   timeout=timeout, stdout=DEVNULL)
|
|
|
|
def mark_down_osd(self, osd):
    """
    Run the 'osd down' cluster command for the given osd.

    :param osd: osd id; converted with str() before being passed on
    """
    osd_id = str(osd)
    self.raw_cluster_cmd('osd', 'down', osd_id)
|
|
|
|
def mark_in_osd(self, osd):
    """
    Run the 'osd in' cluster command for the given osd.

    :param osd: osd id; converted with str() before being passed on
    """
    osd_id = str(osd)
    self.raw_cluster_cmd('osd', 'in', osd_id)
|
|
|
|
def signal_osd(self, osd, sig, silent=False):
    """
    Look up the osd daemon through ctx.daemons.get_daemon and send it
    the given signal.

    :param osd: id of the osd to signal
    :param sig: signal forwarded to the daemon's signal() method
    :param silent: forwarded to signal() as its 'silent' keyword
    """
    daemon = self.ctx.daemons.get_daemon('osd', osd, self.cluster)
    daemon.signal(sig, silent=silent)
|
|
|
|
## monitors
|
|
def signal_mon(self, mon, sig, silent=False):
    """
    Look up the mon daemon through ctx.daemons.get_daemon and send it
    the given signal.

    :param mon: id of the monitor to signal
    :param sig: signal forwarded to the daemon's signal() method
    :param silent: forwarded to signal() as its 'silent' keyword
    """
    daemon = self.ctx.daemons.get_daemon('mon', mon, self.cluster)
    daemon.signal(sig, silent=silent)
|
|
|
|
def kill_mon(self, mon):
    """
    Take a monitor down.

    Without the 'powercycle' config option the mon daemon is simply
    stopped; with it, the monitor's remote is powered off through its
    console instead.

    :param mon: id of the monitor to kill
    """
    if not self.config.get('powercycle'):
        self.ctx.daemons.get_daemon('mon', mon, self.cluster).stop()
        return

    remote = self.find_remote('mon', mon)
    message = 'kill_mon on mon.{m} doing powercycle of {s}'.format(
        m=mon, s=remote.name)
    self.log(message)
    self._assert_ipmi(remote)
    remote.console.power_off()
|
|
|
|
def revive_mon(self, mon):
    """
    Bring a monitor back.

    With the 'powercycle' config option set, the monitor's remote is
    first powered on through its console and the admin daemon directory
    is recreated.  In both cases the mon daemon is then restarted.

    :param mon: id of the monitor to revive
    """
    powercycle = self.config.get('powercycle')
    if powercycle:
        remote = self.find_remote('mon', mon)
        message = 'revive_mon on mon.{m} doing powercycle of {s}'.format(
            m=mon, s=remote.name)
        self.log(message)
        self._assert_ipmi(remote)
        remote.console.power_on()
        self.make_admin_daemon_dir(remote)
    daemon = self.ctx.daemons.get_daemon('mon', mon, self.cluster)
    daemon.restart()
|
|
|
|
def revive_mgr(self, mgr):
    """
    Bring a manager daemon back.

    With the 'powercycle' config option set, the mgr's remote is first
    powered on through its console and the admin daemon directory is
    recreated.  In both cases the mgr daemon is then restarted.

    :param mgr: id of the mgr to revive
    """
    powercycle = self.config.get('powercycle')
    if powercycle:
        remote = self.find_remote('mgr', mgr)
        message = 'revive_mgr on mgr.{m} doing powercycle of {s}'.format(
            m=mgr, s=remote.name)
        self.log(message)
        self._assert_ipmi(remote)
        remote.console.power_on()
        self.make_admin_daemon_dir(remote)
    daemon = self.ctx.daemons.get_daemon('mgr', mgr, self.cluster)
    daemon.restart()
|
|
|
|
def get_mon_status(self, mon):
    """
    Run 'tell mon.<mon> mon_status' and return its output parsed
    as JSON.

    :param mon: id of the monitor to query
    """
    target = 'mon.%s' % mon
    raw = self.raw_cluster_cmd('tell', target, 'mon_status')
    return json.loads(raw)
|
|
|
|
def get_mon_quorum(self):
    """
    Run 'quorum_status' and return the 'quorum' entry of its
    JSON output.
    """
    status = json.loads(self.raw_cluster_cmd('quorum_status'))
    return status['quorum']
|
|
|
|
def wait_for_mon_quorum_size(self, size, timeout=300):
    """
    Poll get_mon_quorum() until the quorum has exactly `size` members.

    Polling is driven by safe_while with a 3 second sleep and
    timeout // 3 tries.

    :param size: quorum size to wait for
    :param timeout: overall budget, used to derive the number of tries
    """
    self.log('waiting for quorum size %d' % size)
    interval = 3
    attempts = timeout // interval
    with safe_while(sleep=interval,
                    tries=attempts,
                    action=f'wait for quorum size {size}') as proceed:
        while proceed():
            try:
                quorum = self.get_mon_quorum()
            except CommandFailedError as err:
                # The command could fail instead of blocking if the
                # rotating key of the connected monitor is not updated
                # yet after the monitors form the quorum; retry on
                # EACCES and propagate anything else.
                if err.exitstatus != errno.EACCES:
                    raise
                continue
            if len(quorum) == size:
                break
    self.log("quorum is size %d" % size)
|
|
|
|
def get_mon_health(self, debug=False, detail=False):
    """
    Run 'health' (or 'health detail') with --format=json and return
    the parsed JSON.

    :param debug: when true, log the raw command output
    :param detail: when true, run 'health detail' instead of 'health'
    """
    cmd = ['health', 'detail'] if detail else ['health']
    cmd.append('--format=json')
    raw = self.raw_cluster_cmd(*cmd)
    if debug:
        self.log('health:\n{h}'.format(h=raw))
    return json.loads(raw)
|
|
|
|
def wait_until_healthy(self, timeout=None):
    """
    Poll get_mon_health() every 3 seconds until its 'status' is
    HEALTH_OK.

    :param timeout: optional limit in seconds; when given and exceeded
                    while the cluster is still unhealthy, the assert
                    below fails
    """
    self.log("wait_until_healthy")
    began = time.time()
    while True:
        if self.get_mon_health()['status'] == 'HEALTH_OK':
            break
        if timeout is not None:
            elapsed = time.time() - began
            assert elapsed < timeout, \
                'timeout expired in wait_until_healthy'
        time.sleep(3)
    self.log("wait_until_healthy done")
|
|
|
|
def get_filepath(self):
    """
    Build the osd data path template for this cluster.  The literal
    '{id}' placeholder is left in the result for the caller to fill in.
    """
    prefix = '/var/lib/ceph/osd/'
    suffix = '-{id}'
    return prefix + self.cluster + suffix
|
|
|
|
def make_admin_daemon_dir(self, remote):
    """
    Create the /var/run/ceph directory (mode 0777) on the remote by
    running 'sudo install -d' there.

    :param remote: Remote site
    """
    command = ['sudo', 'install', '-d', '-m0777', '--', '/var/run/ceph']
    remote.run(args=command)
|
|
|
|
def get_service_task_status(self, service, status_key):
    """
    Collect, per daemon of a ceph service, the value stored under
    `status_key` in that daemon's task_status.

    Daemons without a (non-empty) task_status are left out.  If the
    service is absent from the service map, or a task_status lacks
    `status_key`, an empty dict is returned.

    :param service: ceph service (mds, osd, etc...)
    :param status_key: matching task status key
    """
    status = self.raw_cluster_status()
    collected = {}
    try:
        daemons = status['servicemap']['services'][service]['daemons']
        for name, info in daemons.items():
            daemon_tasks = dict(info).get('task_status', None)
            if daemon_tasks:
                collected[name] = daemon_tasks[status_key]
    except KeyError:  # catches missing service and status key
        return {}
    self.log(collected)
    return collected
|
|
|
|
def utility_task(name):
    """
    Build a ceph_manager subtask that forwards to the manager method
    called `name`.

    The returned task(ctx, config) reads 'args', 'kwargs' and 'cluster'
    (default 'ceph') from config, looks the method up on
    ctx.managers[cluster] and calls it.  The method's return value is
    not propagated.
    """
    def task(ctx, config):
        settings = {} if config is None else config
        positional = settings.get('args', [])
        keyword = settings.get('kwargs', {})
        manager = ctx.managers[settings.get('cluster', 'ceph')]
        method = getattr(manager, name)
        method(*positional, **keyword)
    return task
|
|
|
|
# Subtasks generated by utility_task; each one forwards to the manager
# method of the same name.

# daemon kill / revive
kill_osd = utility_task("kill_osd")
revive_osd = utility_task("revive_osd")
kill_mon = utility_task("kill_mon")
revive_mon = utility_task("revive_mon")

# pools
create_pool = utility_task("create_pool")
remove_pool = utility_task("remove_pool")
set_pool_property = utility_task("set_pool_property")
wait_for_pool = utility_task("wait_for_pool")
wait_for_pools = utility_task("wait_for_pools")

# placement groups
wait_for_clean = utility_task("wait_for_clean")
flush_all_pg_stats = utility_task("flush_all_pg_stats")
do_pg_scrub = utility_task("do_pg_scrub")
|