"""
|
|
Miscellaneous teuthology functions.
|
|
Used by other modules, but mostly called from tasks.
|
|
"""
|
|
from cStringIO import StringIO
|
|
|
|
import argparse
|
|
import os
|
|
import logging
|
|
import configobj
|
|
import getpass
|
|
import socket
|
|
import sys
|
|
import tarfile
|
|
import time
|
|
import urllib2
|
|
import urlparse
|
|
import yaml
|
|
import json
|
|
import re
|
|
import tempfile
|
|
|
|
from teuthology import safepath
|
|
from .orchestra import run
|
|
from .config import config
|
|
from .contextutil import safe_while
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
import datetime
|
|
stamp = datetime.datetime.now().strftime("%y%m%d%H%M")
|
|
is_vm = lambda x: x.startswith('vpm') or x.startswith('ubuntu@vpm')
|
|
|
|
is_arm = lambda x: x.startswith('tala') or x.startswith(
|
|
'ubuntu@tala') or x.startswith('saya') or x.startswith('ubuntu@saya')
|
|
|
|
hostname_expr = '(?P<user>.*@)?(?P<shortname>.*)\.front\.sepia\.ceph\.com'


def canonicalize_hostname(hostname, user='ubuntu'):
    match = re.match(hostname_expr, hostname)
    if match:
        match_d = match.groupdict()
        shortname = match_d['shortname']
        if user is None:
            user_ = user
        else:
            user_ = match_d.get('user') or user
    else:
        shortname = hostname.split('.')[0]
        user_ = user

    user_at = user_.strip('@') + '@' if user_ else ''

    ret = '{user_at}{short}.front.sepia.ceph.com'.format(
        user_at=user_at,
        short=shortname)
    return ret
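
# Illustrative usage (a sketch; 'plana01' is a hypothetical node name):
#   canonicalize_hostname('plana01')
#   # -> 'ubuntu@plana01.front.sepia.ceph.com'
#   canonicalize_hostname('root@plana01.front.sepia.ceph.com')
#   # -> 'root@plana01.front.sepia.ceph.com' (a user already present in the
#   # hostname wins over the default)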


def decanonicalize_hostname(hostname):
    match = re.match(hostname_expr, hostname)
    if match:
        hostname = match.groupdict()['shortname']
    return hostname


def config_file(string):
    """
    Read a yaml configuration file.

    :param string: name of yaml file used for config.
    :returns: Dictionary of configuration information.
    """
    config_dict = {}
    try:
        with open(string) as f:
            g = yaml.safe_load_all(f)
            for new in g:
                config_dict.update(new)
    except IOError as e:
        raise argparse.ArgumentTypeError(str(e))
    return config_dict


class MergeConfig(argparse.Action):
    """
    Used by scripts to merge configurations. (nuke, run, and
    schedule, for example)
    """
    def __call__(self, parser, namespace, values, option_string=None):
        """
        Perform merges of all the data in the config dictionaries.
        """
        config_dict = getattr(namespace, self.dest)
        for new in values:
            deep_merge(config_dict, new)


def get_testdir(ctx):
    """
    :returns: A test directory
    """
    if 'test_path' in ctx.teuthology_config:
        return ctx.teuthology_config['test_path']
    test_user = get_test_user(ctx)
    # FIXME this ideally should use os.path.expanduser() in the future, in case
    # $HOME isn't /home/$USER - e.g. on a Mac. However, since we're executing
    # this on the server side, it won't work properly.
    return ctx.teuthology_config.get('test_path', '/home/%s/cephtest' %
                                     test_user)


def get_test_user(ctx):
    """
    :returns: str -- the user to run tests as on remote hosts
    """
    return ctx.teuthology_config.get('test_user', 'ubuntu')


def get_archive_dir(ctx):
    """
    :returns: archive directory (a subdirectory of the test directory)
    """
    test_dir = get_testdir(ctx)
    return os.path.normpath(os.path.join(test_dir, 'archive'))


def get_http_log_path(archive_dir, job_id=None):
    """
    :param archive_dir: directory to be searched
    :param job_id: id of job that terminates the name of the log path
    :returns: http log path
    """
    http_base = config.archive_server
    if not http_base:
        return None

    sep = os.path.sep
    archive_dir = archive_dir.rstrip(sep)
    archive_subdir = archive_dir.split(sep)[-1]
    if archive_subdir.endswith(str(job_id)):
        archive_subdir = archive_dir.split(sep)[-2]

    if job_id is None:
        return os.path.join(http_base, archive_subdir, '')
    return os.path.join(http_base, archive_subdir, str(job_id), '')
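
# Illustrative usage (a sketch; the archive server URL is hypothetical).
# When config.archive_server is 'http://archive.example.com/':
#   get_http_log_path('/a/my-run/1', job_id=1)
#   # -> 'http://archive.example.com/my-run/1/'
#   # (the trailing job-id component of archive_dir is stripped before the
#   # job_id is re-appended)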


def get_results_url(run_name, job_id=None):
    """
    :param run_name: The name of the test run
    :param job_id: The job_id of the job. Optional.
    :returns: URL to the run (or job, if job_id is passed) in the results web
              UI. For example, Inktank uses Pulpito.
    """
    if not config.results_ui_server:
        return None
    base_url = config.results_ui_server

    if job_id is None:
        return os.path.join(base_url, run_name, '')
    return os.path.join(base_url, run_name, str(job_id), '')
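
# Illustrative usage (a sketch; the UI server URL is hypothetical).
# When config.results_ui_server is 'http://pulpito.example.com/':
#   get_results_url('my-run')     # -> 'http://pulpito.example.com/my-run/'
#   get_results_url('my-run', 3)  # -> 'http://pulpito.example.com/my-run/3/'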


def get_ceph_binary_url(package=None,
                        branch=None, tag=None, sha1=None, dist=None,
                        flavor=None, format=None, arch=None):
    """
    Return the URL of the ceph binary found on gitbuilder.
    """
    BASE = 'http://gitbuilder.ceph.com/{package}-{format}-{dist}-{arch}-{flavor}/'.format(
        package=package,
        flavor=flavor,
        arch=arch,
        format=format,
        dist=dist
    )

    if sha1 is not None:
        assert branch is None, "cannot set both sha1 and branch"
        assert tag is None, "cannot set both sha1 and tag"
    else:
        # gitbuilder uses remote-style ref names for branches, mangled to
        # have underscores instead of slashes; e.g. origin_master
        if tag is not None:
            ref = tag
            assert branch is None, "cannot set both branch and tag"
        else:
            if branch is None:
                branch = 'master'
            ref = branch

        sha1_url = urlparse.urljoin(BASE, 'ref/{ref}/sha1'.format(ref=ref))
        log.debug('Translating ref to sha1 using url %s', sha1_url)

        try:
            sha1_fp = urllib2.urlopen(sha1_url)
            sha1 = sha1_fp.read().rstrip('\n')
            sha1_fp.close()
        except urllib2.HTTPError as e:
            log.error('Failed to get url %s', sha1_url)
            raise e

    log.debug('Using %s %s sha1 %s', package, format, sha1)
    bindir_url = urlparse.urljoin(BASE, 'sha1/{sha1}/'.format(sha1=sha1))
    return (sha1, bindir_url)
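
# Illustrative URL shapes (a sketch; package/dist/arch/flavor values are
# hypothetical):
#   BASE     -> http://gitbuilder.ceph.com/ceph-deb-precise-x86_64-basic/
#   sha1_url -> .../ceph-deb-precise-x86_64-basic/ref/master/sha1
#   bindir   -> .../ceph-deb-precise-x86_64-basic/sha1/<sha1>/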


def feed_many_stdins(fp, processes):
    """
    :param fp: input file
    :param processes: list of processes to be written to.
    """
    while True:
        data = fp.read(8192)
        if not data:
            break
        for proc in processes:
            proc.stdin.write(data)


def feed_many_stdins_and_close(fp, processes):
    """
    Feed many and then close processes.

    :param fp: input file
    :param processes: list of processes to be written to.
    """
    feed_many_stdins(fp, processes)
    for proc in processes:
        proc.stdin.close()


def get_mons(roles, ips):
    """
    Get monitors and their associated ports
    """
    mons = {}
    mon_ports = {}
    mon_id = 0
    # use a distinct name for the inner loop variable so it doesn't shadow
    # the 'roles' parameter
    for idx, roles_subset in enumerate(roles):
        for role in roles_subset:
            if not role.startswith('mon.'):
                continue
            if ips[idx] not in mon_ports:
                mon_ports[ips[idx]] = 6789
            else:
                mon_ports[ips[idx]] += 1
            addr = '{ip}:{port}'.format(
                ip=ips[idx],
                port=mon_ports[ips[idx]],
            )
            mon_id += 1
            mons[role] = addr
    assert mons
    return mons
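
# Illustrative usage (a sketch; IPs are hypothetical): mons sharing an IP
# get consecutive ports, starting at 6789.
#   get_mons([['mon.a', 'osd.0'], ['mon.b', 'mon.c']],
#            ['10.0.0.1', '10.0.0.2'])
#   # -> {'mon.a': '10.0.0.1:6789',
#   #     'mon.b': '10.0.0.2:6789',
#   #     'mon.c': '10.0.0.2:6790'}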


def generate_caps(type_):
    """
    Each call will return the next capability for each system type
    (essentially a subset of possible role values). Valid types are osd,
    mds and client.
    """
    defaults = dict(
        osd=dict(
            mon='allow *',
            osd='allow *',
        ),
        mds=dict(
            mon='allow *',
            osd='allow *',
            mds='allow',
        ),
        client=dict(
            mon='allow rw',
            osd='allow rwx',
            mds='allow',
        ),
    )
    for subsystem, capability in defaults[type_].items():
        yield '--cap'
        yield subsystem
        yield capability
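
# Illustrative usage (a sketch; the --cap triples come out in dict order,
# which is arbitrary in Python 2):
#   list(generate_caps('osd'))
#   # -> ['--cap', 'mon', 'allow *', '--cap', 'osd', 'allow *']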


def skeleton_config(ctx, roles, ips):
    """
    Returns a ConfigObj that is prefilled with a skeleton config.

    Use conf[section][key]=value or conf.merge to change it.

    Use conf.write to write it out, override .filename first if you want.
    """
    path = os.path.join(os.path.dirname(__file__), 'ceph.conf.template')
    with open(path, 'r') as t:
        skconf = t.read().format(testdir=get_testdir(ctx))
    conf = configobj.ConfigObj(StringIO(skconf), file_error=True)
    mons = get_mons(roles=roles, ips=ips)
    for role, addr in mons.iteritems():
        conf.setdefault(role, {})
        conf[role]['mon addr'] = addr
    # set up standby mds's
    for roles_subset in roles:
        for role in roles_subset:
            if role.startswith('mds.'):
                conf.setdefault(role, {})
                if role.find('-s-') != -1:
                    standby_mds = role[role.find('-s-') + 3:]
                    conf[role]['mds standby for name'] = standby_mds
    return conf


def roles_of_type(roles_for_host, type_):
    """
    Generator of roles.

    Each call returns the next possible role of the type specified.
    :param roles_for_host: list of roles possible
    :param type_: type of role
    """
    prefix = '{type}.'.format(type=type_)
    for name in roles_for_host:
        if not name.startswith(prefix):
            continue
        id_ = name[len(prefix):]
        yield id_
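
# Illustrative usage (a sketch):
#   list(roles_of_type(['mon.a', 'osd.0', 'osd.1'], 'osd'))
#   # -> ['0', '1']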


def all_roles(cluster):
    """
    Generator of role values. Each call returns another role.

    :param cluster: Cluster extracted from the ctx.
    """
    for _, roles_for_host in cluster.remotes.iteritems():
        for name in roles_for_host:
            yield name


def all_roles_of_type(cluster, type_):
    """
    Generator of role values. Each call returns another role of the
    type specified.

    :param cluster: Cluster extracted from the ctx.
    :param type_: role type
    """
    prefix = '{type}.'.format(type=type_)
    for _, roles_for_host in cluster.remotes.iteritems():
        for name in roles_for_host:
            if not name.startswith(prefix):
                continue
            id_ = name[len(prefix):]
            yield id_


def is_type(type_):
    """
    Returns a matcher function for whether role is of type given.
    """
    prefix = '{type}.'.format(type=type_)

    def _is_type(role):
        """
        Return True if the role name starts with the captured type prefix.
        """
        return role.startswith(prefix)
    return _is_type
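
# Illustrative usage (a sketch):
#   is_osd = is_type('osd')
#   is_osd('osd.0')  # -> True
#   is_osd('mon.a')  # -> False
#   filter(is_osd, ['mon.a', 'osd.0', 'osd.1'])  # -> ['osd.0', 'osd.1']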


def num_instances_of_type(cluster, type_):
    """
    Total the number of instances of the role type specified in all remotes.

    :param cluster: Cluster extracted from ctx.
    :param type_: role
    """
    remotes_and_roles = cluster.remotes.items()
    roles = [roles for (remote, roles) in remotes_and_roles]
    prefix = '{type}.'.format(type=type_)
    num = sum(sum(1 for role in hostroles if role.startswith(prefix))
              for hostroles in roles)
    return num


def create_simple_monmap(ctx, remote, conf):
    """
    Writes a simple monmap based on current ceph.conf into <tmpdir>/monmap.

    Assumes ceph_conf is up to date.

    Assumes mon sections are named "mon.*", with the dot.

    :returns: the FSID (as a string) of the newly created monmap
    """
    def gen_addresses():
        """
        Monitor address generator.

        Each invocation returns the next monitor address
        """
        for section, data in conf.iteritems():
            PREFIX = 'mon.'
            if not section.startswith(PREFIX):
                continue
            name = section[len(PREFIX):]
            addr = data['mon addr']
            yield (name, addr)

    addresses = list(gen_addresses())
    assert addresses, "There are no monitors in config!"
    log.debug('Ceph mon addresses: %s', addresses)

    testdir = get_testdir(ctx)
    args = [
        'adjust-ulimits',
        'ceph-coverage',
        '{tdir}/archive/coverage'.format(tdir=testdir),
        'monmaptool',
        '--create',
        '--clobber',
    ]
    for (name, addr) in addresses:
        args.extend(('--add', name, addr))
    args.extend([
        '--print',
        '{tdir}/monmap'.format(tdir=testdir),
    ])

    r = remote.run(
        args=args,
        stdout=StringIO()
    )
    monmap_output = r.stdout.getvalue()
    fsid = re.search("generated fsid (.+)$",
                     monmap_output, re.MULTILINE).group(1)
    return fsid
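
# The command assembled above has this shape (illustrative addresses):
#   adjust-ulimits ceph-coverage {tdir}/archive/coverage monmaptool \
#       --create --clobber --add a 10.0.0.1:6789 --add b 10.0.0.2:6789 \
#       --print {tdir}/monmap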


def write_file(remote, path, data):
    """
    Write data to a remote file

    :param remote: Remote site.
    :param path: Path on the remote being written to.
    :param data: Data to be written.
    """
    remote.run(
        args=[
            'python',
            '-c',
            'import shutil, sys; shutil.copyfileobj(sys.stdin, open(sys.argv[1], "wb"))',
            path,
        ],
        stdin=data,
    )


def sudo_write_file(remote, path, data, perms=None, owner=None):
    """
    Write data to a remote file as super user

    :param remote: Remote site.
    :param path: Path on the remote being written to.
    :param data: Data to be written.
    :param perms: Permissions on the file being written
    :param owner: Owner for the file being written

    perms is passed directly to chmod, and owner to chown.
    """
    permargs = []
    if perms:
        permargs = [run.Raw('&&'), 'sudo', 'chmod', perms, path]
    owner_args = []
    if owner:
        owner_args = [run.Raw('&&'), 'sudo', 'chown', owner, path]
    remote.run(
        args=[
            'sudo',
            'python',
            '-c',
            'import shutil, sys; shutil.copyfileobj(sys.stdin, open(sys.argv[1], "wb"))',
            path,
        ] + owner_args + permargs,
        stdin=data,
    )


def copy_file(from_remote, from_path, to_remote, to_path=None):
    """
    Copies a file from one remote to another.
    """
    if to_path is None:
        to_path = from_path
    from_remote.run(args=[
        'sudo', 'scp', '-v', from_path, "{host}:{file}".format(
            host=to_remote.name, file=to_path)
    ])


def move_file(remote, from_path, to_path, sudo=False):
    """
    Move a file from one path to another on a remote site

    The file needs to be stat'ed first, to make sure we
    maintain the same permissions
    """
    args = []
    if sudo:
        args.append('sudo')
    args.extend([
        'stat',
        '-c',
        '\"%a\"',
        to_path
    ])
    proc = remote.run(
        args=args,
        stdout=StringIO(),
    )
    perms = proc.stdout.getvalue().rstrip().strip('\"')

    args = []
    if sudo:
        args.append('sudo')
    args.extend([
        'mv',
        '--',
        from_path,
        to_path,
    ])
    proc = remote.run(
        args=args,
        stdout=StringIO(),
    )

    # reset the file back to the original permissions
    args = []
    if sudo:
        args.append('sudo')
    args.extend([
        'chmod',
        perms,
        to_path,
    ])
    proc = remote.run(
        args=args,
        stdout=StringIO(),
    )


def delete_file(remote, path, sudo=False, force=False):
    """
    rm a file on a remote site.
    """
    args = []
    if sudo:
        args.append('sudo')
    args.extend(['rm'])
    if force:
        args.extend(['-f'])
    args.extend([
        '--',
        path,
    ])
    remote.run(
        args=args,
        stdout=StringIO(),
    )


def remove_lines_from_file(remote, path, line_is_valid_test,
                           string_to_test_for):
    """
    Remove lines from a file. This involves reading the file in, removing
    the appropriate lines, saving the file, and then replacing the original
    file with the new file. Intermediate files are used to prevent data loss
    in case the connection to the remote host drops while writing.
    """
    # read in the specified file
    in_data = get_file(remote, path, False)
    out_data = ""

    first_line = True
    # use the 'line_is_valid_test' function to remove unwanted lines
    for line in in_data.split('\n'):
        if line_is_valid_test(line, string_to_test_for):
            if not first_line:
                out_data += '\n'
            else:
                first_line = False

            out_data += '{line}'.format(line=line)

        else:
            log.info('removing line: {bad_line}'.format(bad_line=line))

    # get a temp file path on the remote host to write to,
    # we don't want to blow away the remote file and then have the
    # network drop out
    temp_file_path = remote.mktemp()

    # write out the data to a temp file
    write_file(remote, temp_file_path, out_data)

    # then do a 'mv' to the actual file location
    move_file(remote, temp_file_path, path)


def append_lines_to_file(remote, path, lines, sudo=False):
    """
    Append lines to a file.
    An intermediate file is used in the same manner as in
    remove_lines_from_file.
    """

    temp_file_path = remote.mktemp()

    data = get_file(remote, path, sudo)

    # add the additional data and write it back out, using a temp file
    # in case of connectivity loss, and then mv it to the
    # actual desired location
    data += lines
    write_file(remote, temp_file_path, data)

    # then do a 'mv' to the actual file location
    move_file(remote, temp_file_path, path)


def create_file(remote, path, data="", permissions='644', sudo=False):
    """
    Create a file on the remote host.
    """
    args = []
    if sudo:
        args.append('sudo')
    args.extend([
        'touch',
        path,
        run.Raw('&&'),
        'chmod',
        permissions,
        '--',
        path
    ])
    remote.run(
        args=args,
        stdout=StringIO(),
    )
    # now write out the data if any was passed in
    if "" != data:
        append_lines_to_file(remote, path, data, sudo)


def get_file(remote, path, sudo=False, dest_dir='/tmp'):
    """
    Get the contents of a remote file. Do not use for large files; use
    Remote.get_file() instead.
    """
    local_path = remote.get_file(path, sudo=sudo, dest_dir=dest_dir)
    with open(local_path) as file_obj:
        file_data = file_obj.read()
    os.remove(local_path)
    return file_data


def pull_directory(remote, remotedir, localdir):
    """
    Copy a remote directory to a local directory.
    """
    log.debug('Transferring archived files from %s:%s to %s',
              remote.shortname, remotedir, localdir)
    if not os.path.exists(localdir):
        os.mkdir(localdir)
    _, local_tarfile = tempfile.mkstemp(dir=localdir)
    remote.get_tar(remotedir, local_tarfile, sudo=True)
    with open(local_tarfile, 'r+') as fb1:
        tar = tarfile.open(mode='r|gz', fileobj=fb1)
        while True:
            ti = tar.next()
            if ti is None:
                break

            if ti.isdir():
                # ignore silently; easier to just create leading dirs below
                pass
            elif ti.isfile():
                sub = safepath.munge(ti.name)
                safepath.makedirs(root=localdir, path=os.path.dirname(sub))
                tar.makefile(ti, targetpath=os.path.join(localdir, sub))
            else:
                if ti.isdev():
                    type_ = 'device'
                elif ti.issym():
                    type_ = 'symlink'
                elif ti.islnk():
                    type_ = 'hard link'
                else:
                    type_ = 'unknown'
                log.info('Ignoring tar entry: %r type %r', ti.name, type_)
                continue
    os.remove(local_tarfile)


def pull_directory_tarball(remote, remotedir, localfile):
    """
    Copy a remote directory to a local tarball.
    """
    log.debug('Transferring archived files from %s:%s to %s',
              remote.shortname, remotedir, localfile)
    remote.get_tar(remotedir, localfile, sudo=True)


def get_wwn_id_map(remote, devs):
    """
    Extract wwn_id_map information from ls output on the associated devs.

    Sample dev information: /dev/sdb: /dev/disk/by-id/wwn-0xf00bad

    :returns: map of devices to device id links
    """
    stdout = None
    try:
        r = remote.run(
            args=[
                'ls',
                '-l',
                '/dev/disk/by-id/wwn-*',
            ],
            stdout=StringIO(),
        )
        stdout = r.stdout.getvalue()
    except Exception:
        log.error('Failed to get wwn devices! Using /dev/sd* devices...')
        return dict((d, d) for d in devs)

    devmap = {}

    # lines will be:
    # lrwxrwxrwx 1 root root 9 Jan 22 14:58
    # /dev/disk/by-id/wwn-0x50014ee002ddecaf -> ../../sdb
    for line in stdout.splitlines():
        comps = line.split(' ')
        # comps[-1] should be:
        # ../../sdb
        rdev = comps[-1]
        # translate to /dev/sdb
        dev = '/dev/{d}'.format(d=rdev.split('/')[-1])

        # comps[-3] should be:
        # /dev/disk/by-id/wwn-0x50014ee002ddecaf
        iddev = comps[-3]

        if dev in devs:
            devmap[dev] = iddev

    return devmap
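
# Illustrative result (a sketch; the wwn id is hypothetical):
#   get_wwn_id_map(remote, ['/dev/sdb'])
#   # -> {'/dev/sdb': '/dev/disk/by-id/wwn-0x50014ee002ddecaf'}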


def get_scratch_devices(remote):
    """
    Read the scratch disk list from remote host
    """
    devs = []
    try:
        file_data = get_file(remote, "/scratch_devs")
        devs = file_data.split()
    except Exception:
        r = remote.run(
            args=['ls', run.Raw('/dev/[sv]d?')],
            stdout=StringIO()
        )
        devs = r.stdout.getvalue().strip().split('\n')

    # Remove root device (vm guests) from the disk list; iterate over a
    # copy so that removing entries doesn't skip the next element
    for dev in devs[:]:
        if 'vda' in dev:
            devs.remove(dev)
            log.warn("Removing root device: %s from device list" % dev)

    log.debug('devs={d}'.format(d=devs))

    retval = []
    for dev in devs:
        try:
            # FIXME: Split this into multiple calls.
            remote.run(
                args=[
                    # node exists
                    'stat',
                    dev,
                    run.Raw('&&'),
                    # readable
                    'sudo', 'dd', 'if=%s' % dev, 'of=/dev/null', 'count=1',
                    run.Raw('&&'),
                    # not mounted
                    run.Raw('!'),
                    'mount',
                    run.Raw('|'),
                    'grep', '-q', dev,
                ]
            )
            retval.append(dev)
        except run.CommandFailedError:
            log.debug("get_scratch_devices: %s is in use" % dev)
    return retval


def wait_until_healthy(ctx, remote):
    """
    Wait until a Ceph cluster is healthy. Give up after 15min.
    """
    testdir = get_testdir(ctx)
    with safe_while(tries=(900 / 6), action="wait_until_healthy") as proceed:
        while proceed():
            r = remote.run(
                args=[
                    'adjust-ulimits',
                    'ceph-coverage',
                    '{tdir}/archive/coverage'.format(tdir=testdir),
                    'ceph',
                    'health',
                ],
                stdout=StringIO(),
                logger=log.getChild('health'),
            )
            out = r.stdout.getvalue()
            log.debug('Ceph health: %s', out.rstrip('\n'))
            if out.split(None, 1)[0] == 'HEALTH_OK':
                break
            time.sleep(1)


def wait_until_osds_up(ctx, cluster, remote):
    """Wait until all Ceph OSDs are booted."""
    num_osds = num_instances_of_type(cluster, 'osd')
    testdir = get_testdir(ctx)
    while True:
        r = remote.run(
            args=[
                'adjust-ulimits',
                'ceph-coverage',
                '{tdir}/archive/coverage'.format(tdir=testdir),
                'ceph',
                'osd', 'dump', '--format=json'
            ],
            stdout=StringIO(),
            logger=log.getChild('health'),
        )
        out = r.stdout.getvalue()
        j = json.loads('\n'.join(out.split('\n')[1:]))
        up = len(j['osds'])
        log.debug('%d of %d OSDs are up' % (up, num_osds))
        if up == num_osds:
            break
        time.sleep(1)


def reboot(node, timeout=300, interval=30):
    """
    Reboots a given system, then waits for it to come back up and
    re-establishes the ssh connection.

    :param node: The teuthology.orchestra.remote.Remote object of the node
    :param timeout: The amount of time, in seconds, after which to give up
                    waiting for the node to return
    :param interval: The amount of time, in seconds, to wait between attempts
                     to re-establish with the node. This should not be set to
                     less than maybe 10, to make sure the node actually goes
                     down first.
    """
    log.info("Rebooting {host}...".format(host=node.hostname))
    node.run(args=['sudo', 'shutdown', '-r', 'now'])
    reboot_start_time = time.time()
    while time.time() - reboot_start_time < timeout:
        time.sleep(interval)
        if node.is_online or node.reconnect():
            return
    raise RuntimeError(
        "{host} did not come up after reboot within {time}s".format(
            host=node.hostname, time=timeout))


def reconnect(ctx, timeout, remotes=None):
    """
    Connect to all the machines in ctx.cluster.

    Presumably, some of them won't be up. Handle this
    by waiting for them, unless the wait time exceeds
    the specified timeout.

    ctx needs to contain the cluster of machines you
    wish it to try and connect to, as well as a config
    holding the ssh keys for each of them. As long as it
    contains this data, you can construct a context
    that is a subset of your full cluster.
    """
    log.info('Re-opening connections...')
    starttime = time.time()

    if remotes:
        need_reconnect = remotes
    else:
        need_reconnect = ctx.cluster.remotes.keys()

    while need_reconnect:
        # iterate over a copy, since successfully reconnected remotes are
        # removed from need_reconnect as we go
        for remote in list(need_reconnect):
            log.info('trying to connect to %s', remote.name)
            success = remote.reconnect()
            if not success:
                if time.time() - starttime > timeout:
                    raise RuntimeError("Could not reconnect to %s" %
                                       remote.name)
            else:
                need_reconnect.remove(remote)

        log.debug('waited {elapsed}'.format(
            elapsed=str(time.time() - starttime)))
        time.sleep(1)


def get_clients(ctx, roles):
    """
    Yield (id, remote) tuples for all the given roles, which must be clients.
    """
    for role in roles:
        assert isinstance(role, basestring)
        PREFIX = 'client.'
        assert role.startswith(PREFIX)
        id_ = role[len(PREFIX):]
        (remote,) = ctx.cluster.only(role).remotes.iterkeys()
        yield (id_, remote)


def get_user():
    """
    Return the username in the format user@host.
    """
    return getpass.getuser() + '@' + socket.gethostname()


def read_config(ctx):
    """
    Read the default teuthology yaml configuration file.
    """
    ctx.teuthology_config = {}
    filename = os.path.join(os.environ['HOME'], '.teuthology.yaml')

    if not os.path.exists(filename):
        log.debug("%s not found", filename)
        return

    with open(filename) as f:
        g = yaml.safe_load_all(f)
        for new in g:
            ctx.teuthology_config.update(new)


def get_mon_names(ctx):
    """
    :returns: a list of monitor names
    """
    mons = []
    for remote, roles in ctx.cluster.remotes.items():
        for role in roles:
            if not role.startswith('mon.'):
                continue
            mons.append(role)
    return mons


def get_first_mon(ctx, config):
    """
    Return the "first" mon (alphanumerically, for lack of anything better).
    """
    firstmon = sorted(get_mon_names(ctx))[0]
    assert firstmon
    return firstmon


def replace_all_with_clients(cluster, config):
    """
    Converts a dict containing a key 'all' to one
    mapping all clients to the value of config['all']
    """
    assert isinstance(config, dict), 'config must be a dict'
    if 'all' not in config:
        return config
    norm_config = {}
    assert len(config) == 1, \
        "config cannot have 'all' and specific clients listed"
    for client in all_roles_of_type(cluster, 'client'):
        norm_config['client.{id}'.format(id=client)] = config['all']
    return norm_config
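
# Illustrative usage (a sketch, assuming the cluster has client.0 and
# client.1 roles):
#   replace_all_with_clients(ctx.cluster, {'all': {'foo': 1}})
#   # -> {'client.0': {'foo': 1}, 'client.1': {'foo': 1}}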


def deep_merge(a, b):
    """
    Deep Merge. If a and b are both lists, all elements in b are
    added into a. If a and b are both dictionaries, elements in b are
    recursively added to a.
    :param a: object items will be merged into
    :param b: object items will be merged from
    """
    if a is None:
        return b
    if b is None:
        return a
    if isinstance(a, list):
        assert isinstance(b, list)
        a.extend(b)
        return a
    if isinstance(a, dict):
        assert isinstance(b, dict)
        for (k, v) in b.iteritems():
            if k in a:
                a[k] = deep_merge(a[k], v)
            else:
                a[k] = v
        return a
    return b
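
# Illustrative usage (a sketch):
#   deep_merge({'a': [1], 'b': 1}, {'a': [2], 'c': 3})
#   # -> {'a': [1, 2], 'b': 1, 'c': 3}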


def get_valgrind_args(testdir, name, preamble, v):
    """
    Build a command line for running valgrind.

    testdir - test results directory
    name - name of daemon (for naming the log file)
    preamble - stuff we should run before valgrind
    v - valgrind arguments
    """
    if v is None:
        return preamble
    if not isinstance(v, list):
        v = [v]
    val_path = '/var/log/ceph/valgrind'
    if '--tool=memcheck' in v or '--tool=helgrind' in v:
        extra_args = [
            'valgrind',
            '--num-callers=50',
            '--suppressions={tdir}/valgrind.supp'.format(tdir=testdir),
            '--xml=yes',
            '--xml-file={vdir}/{n}.log'.format(vdir=val_path, n=name),
            '--time-stamp=yes',
        ]
    else:
        extra_args = [
            'valgrind',
            '--suppressions={tdir}/valgrind.supp'.format(tdir=testdir),
            '--log-file={vdir}/{n}.log'.format(vdir=val_path, n=name),
            '--time-stamp=yes',
        ]
    args = [
        'cd', testdir,
        run.Raw('&&'),
    ] + preamble + extra_args + v
    log.debug('running %s under valgrind with args %s', name, args)
    return args
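
# Illustrative result (a sketch; paths and daemon name are hypothetical):
#   get_valgrind_args('/home/ubuntu/cephtest', 'osd.0',
#                     ['ceph-osd', '-i', '0'], '--tool=memcheck')
#   # -> ['cd', '/home/ubuntu/cephtest', Raw('&&'), 'ceph-osd', '-i', '0',
#   #     'valgrind', '--num-callers=50', ..., '--tool=memcheck']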


def stop_daemons_of_type(ctx, type_):
    """
    :param type_: type of daemons to be stopped.
    """
    log.info('Shutting down %s daemons...' % type_)
    exc_info = (None, None, None)
    for daemon in ctx.daemons.iter_daemons_of_role(type_):
        try:
            daemon.stop()
        except (run.CommandFailedError,
                run.CommandCrashedError,
                run.ConnectionLostError):
            exc_info = sys.exc_info()
            log.exception('Saw exception from %s.%s', daemon.role, daemon.id_)
    if exc_info != (None, None, None):
        raise exc_info[0], exc_info[1], exc_info[2]


def get_system_type(remote, distro=False, version=False):
    """
    Return the remote system's package type ('deb' or 'rpm'), or its
    distro name and/or version if distro/version are requested.
    """
    r = remote.run(
        args=[
            'sudo', 'lsb_release', '-is',
        ],
        stdout=StringIO(),
    )
    system_value = r.stdout.getvalue().strip()
    log.debug("System to be installed: %s" % system_value)
    if version:
        v = remote.run(args=['sudo', 'lsb_release', '-rs'], stdout=StringIO())
        version = v.stdout.getvalue().strip()
    if distro and version:
        return system_value.lower(), version
    if distro:
        return system_value.lower()
    if system_value in ['Ubuntu', 'Debian']:
        return "deb"
    if system_value in ['CentOS', 'Fedora', 'RedHatEnterpriseServer',
                        'openSUSE project', 'SUSE LINUX']:
        return "rpm"
    if version:
        return version
    return system_value


def get_distro(ctx):
    """
    Get the name of the distro that we are using (usually the os_type).
    """
    try:
        os_type = ctx.config.get('os_type', ctx.os_type)
    except AttributeError:
        os_type = 'ubuntu'
    try:
        return ctx.config['downburst'].get('distro', os_type)
    except KeyError:
        return os_type
    except AttributeError:
        return ctx.os_type


def get_distro_version(ctx):
    """
    Get the version of the distro that we are using (release number).
    """
    default_os_version = dict(
        ubuntu="12.04",
        fedora="18",
        centos="6.4",
        opensuse="12.2",
        sles="11-sp2",
        rhel="6.4",
        debian='7.0'
    )
    distro = get_distro(ctx)
    if ctx.os_version is not None:
        return ctx.os_version
    try:
        os_version = ctx.config.get('os_version', default_os_version[distro])
    except AttributeError:
        os_version = default_os_version[distro]
    try:
        return ctx.config['downburst'].get('distroversion', os_version)
    except (KeyError, AttributeError):
        return os_version


def get_multi_machine_types(machinetype):
    """
    Converts a machine type string to a list based on common delimiters.
    """
    machinetypes = []
    machine_type_delimiters = [',', ' ', '\t']
    for delimiter in machine_type_delimiters:
        if delimiter in machinetype:
            machinetypes = machinetype.split(delimiter)
            break
    if not machinetypes:
        machinetypes.append(machinetype)
    return machinetypes
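
# Illustrative usage (a sketch; machine type names are hypothetical):
#   get_multi_machine_types('plana,mira')  # -> ['plana', 'mira']
#   get_multi_machine_types('burnupi')     # -> ['burnupi']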