""" Ceph cluster task, deployed via ceph-daemon and ssh orchestrator """ from cStringIO import StringIO import argparse import configobj import contextlib import errno import logging import os import json import time import gevent import re import socket import uuid from paramiko import SSHException from ceph_manager import CephManager, write_conf from tarfile import ReadError from tasks.cephfs.filesystem import Filesystem from teuthology import misc as teuthology from teuthology import contextutil from teuthology import exceptions from teuthology.orchestra import run import ceph_client as cclient from teuthology.orchestra.daemon import DaemonGroup from tasks.daemonwatchdog import DaemonWatchdog # these items we use from ceph.py should probably eventually move elsewhere from tasks.ceph import get_mons CEPH_ROLE_TYPES = ['mon', 'mgr', 'osd', 'mds', 'rgw'] log = logging.getLogger(__name__) def shell(ctx, cluster_name, remote, args, **kwargs): testdir = teuthology.get_testdir(ctx) return remote.run( args=[ 'sudo', '{}/ceph-daemon'.format(testdir), '--image', ctx.image, 'shell', '-c', '{}/{}.conf'.format(testdir, cluster_name), '-k', '{}/{}.keyring'.format(testdir, cluster_name), '--fsid', ctx.ceph[cluster_name].fsid, '--', ] + args, **kwargs ) def build_initial_config(ctx, config): cluster_name = config['cluster'] #path = os.path.join(os.path.dirname(__file__), 'ceph.conf.template') conf = configobj.ConfigObj() #path, file_error=True) conf.setdefault('global', {}) conf['global']['fsid'] = ctx.ceph[cluster_name].fsid # overrides for section, keys in config['conf'].items(): for key, value in keys.items(): log.info("[%s] %s = %s" % (section, key, value)) if section not in conf: conf[section] = {} conf[section][key] = value return conf @contextlib.contextmanager def normalize_hostnames(ctx): """ Ensure we have short hostnames throughout, for consistency between remote.shortname and socket.gethostname() in ceph-daemon. 
""" log.info('Normalizing hostnames...') ctx.cluster.run(args=[ 'sudo', 'hostname', run.Raw('$(hostname -s)'), ]) try: yield finally: pass @contextlib.contextmanager def download_ceph_daemon(ctx, config): cluster_name = config['cluster'] testdir = teuthology.get_testdir(ctx) branch = config.get('ceph-daemon-branch', 'master') log.info('Downloading ceph-daemon...') ctx.cluster.run( args=[ 'curl', '--silent', 'https://raw.githubusercontent.com/ceph/ceph/%s/src/ceph-daemon/ceph-daemon' % branch, run.Raw('>'), '{tdir}/ceph-daemon'.format(tdir=testdir), run.Raw('&&'), 'test', '-s', '{tdir}/ceph-daemon'.format(tdir=testdir), run.Raw('&&'), 'chmod', '+x', '{tdir}/ceph-daemon'.format(tdir=testdir), ], ) try: yield finally: log.info('Removing cluster...') ctx.cluster.run(args=[ 'sudo', '{}/ceph-daemon'.format(testdir), 'rm-cluster', '--fsid', ctx.ceph[cluster_name].fsid, '--force', ]) log.info('Removing ceph-daemon ...') ctx.cluster.run( args=[ 'rm', '-rf', '{tdir}/ceph-daemon'.format(tdir=testdir), ], ) @contextlib.contextmanager def ceph_log(ctx, config): cluster_name = config['cluster'] fsid = ctx.ceph[cluster_name].fsid try: yield finally: if ctx.archive is not None and \ not (ctx.config.get('archive-on-error') and ctx.summary['success']): # and logs log.info('Compressing logs...') run.wait( ctx.cluster.run( args=[ 'sudo', 'find', '/var/log/ceph/' + fsid, '-name', '*.log', '-print0', run.Raw('|'), 'sudo', 'xargs', '-0', '--no-run-if-empty', '--', 'gzip', '--', ], wait=False, ), ) log.info('Archiving logs...') path = os.path.join(ctx.archive, 'remote') try: os.makedirs(path) except OSError as e: pass for remote in ctx.cluster.remotes.keys(): sub = os.path.join(path, remote.name) try: os.makedirs(sub) except OSError as e: pass teuthology.pull_directory(remote, '/var/log/ceph/' + fsid, os.path.join(sub, 'log')) @contextlib.contextmanager def ceph_crash(ctx, config): """ Gather crash dumps from /var/lib/ceph/$fsid/crash """ cluster_name = config['cluster'] fsid = ctx.ceph[cluster_name].fsid try: yield finally: if ctx.archive is not None: log.info('Archiving crash dumps...') path = os.path.join(ctx.archive, 'remote') try: os.makedirs(path) except OSError as e: pass for remote in ctx.cluster.remotes.keys(): sub = os.path.join(path, remote.name) try: os.makedirs(sub) except OSError as e: pass try: teuthology.pull_directory(remote, '/var/lib/ceph/%s/crash' % fsid, os.path.join(sub, 'crash')) except ReadError as e: pass @contextlib.contextmanager def ceph_bootstrap(ctx, config): cluster_name = config['cluster'] testdir = teuthology.get_testdir(ctx) fsid = ctx.ceph[cluster_name].fsid mons = ctx.ceph[cluster_name].mons first_mon_role = sorted(mons.keys())[0] _, _, first_mon = teuthology.split_role(first_mon_role) (bootstrap_remote,) = ctx.cluster.only(first_mon_role).remotes.keys() log.info('First mon is mon.%s on %s' % (first_mon, bootstrap_remote.shortname)) ctx.ceph[cluster_name].bootstrap_remote = bootstrap_remote ctx.ceph[cluster_name].first_mon = first_mon others = ctx.cluster.remotes[bootstrap_remote] log.info('others %s' % others) mgrs = sorted([r for r in others if teuthology.is_type('mgr', cluster_name)(r)]) if not mgrs: raise RuntimeError('no mgrs on the same host as first mon %s' % first_mon) _, _, first_mgr = teuthology.split_role(mgrs[0]) log.info('First mgr is %s' % (first_mgr)) ctx.ceph[cluster_name].first_mgr = first_mgr try: # write seed config log.info('Writing seed config...') conf_fp = StringIO() seed_config = build_initial_config(ctx, config) seed_config.write(conf_fp) 
        teuthology.write_file(
            remote=bootstrap_remote,
            path='{}/seed.{}.conf'.format(testdir, cluster_name),
            data=conf_fp.getvalue())

        # bootstrap
        log.info('Bootstrapping...')
        cmd = [
            'sudo',
            '{}/ceph-daemon'.format(testdir),
            '--image', ctx.image,
            'bootstrap',
            '--fsid', fsid,
            '--mon-id', first_mon,
            '--mgr-id', first_mgr,
            '--config', '{}/seed.{}.conf'.format(testdir, cluster_name),
            '--output-config', '{}/{}.conf'.format(testdir, cluster_name),
            '--output-keyring', '{}/{}.keyring'.format(testdir, cluster_name),
            '--output-pub-ssh-key', '{}/{}.pub'.format(testdir, cluster_name),
        ]
        if mons[first_mon_role].startswith('['):
            cmd += ['--mon-addrv', mons[first_mon_role]]
        else:
            cmd += ['--mon-ip', mons[first_mon_role]]
        if config.get('skip_dashboard'):
            cmd += ['--skip-dashboard']
        # bootstrap makes the keyring root 0600, so +r it for our purposes
        cmd += [
            run.Raw('&&'),
            'sudo', 'chmod', '+r',
            '{}/{}.keyring'.format(testdir, cluster_name),
        ]
        bootstrap_remote.run(args=cmd)

        # register initial daemons
        ctx.daemons.register_daemon(
            bootstrap_remote, 'mon', first_mon,
            cluster=cluster_name,
            fsid=fsid,
            logger=log.getChild('mon.' + first_mon),
            wait=False,
            started=True,
        )
        ctx.daemons.register_daemon(
            bootstrap_remote, 'mgr', first_mgr,
            cluster=cluster_name,
            fsid=fsid,
            logger=log.getChild('mgr.' + first_mgr),
            wait=False,
            started=True,
        )

        # fetch keys and configs
        log.info('Fetching config...')
        ctx.ceph[cluster_name].config_file = teuthology.get_file(
            remote=bootstrap_remote,
            path='{}/{}.conf'.format(testdir, cluster_name))
        log.info('Fetching client.admin keyring...')
        ctx.ceph[cluster_name].admin_keyring = teuthology.get_file(
            remote=bootstrap_remote,
            path='{}/{}.keyring'.format(testdir, cluster_name))
        log.info('Fetching mon keyring...')
        ctx.ceph[cluster_name].mon_keyring = teuthology.get_file(
            remote=bootstrap_remote,
            path='/var/lib/ceph/%s/mon.%s/keyring' % (fsid, first_mon),
            sudo=True)

        # fetch ssh key, distribute to additional nodes
        log.info('Fetching pub ssh key...')
        ssh_pub_key = teuthology.get_file(
            remote=bootstrap_remote,
            path='{}/{}.pub'.format(testdir, cluster_name)
        ).strip()

        log.info('Installing pub ssh key for root users...')
        ctx.cluster.run(args=[
            'sudo', 'install', '-d', '-m', '0700', '/root/.ssh',
            run.Raw('&&'),
            'echo', ssh_pub_key,
            run.Raw('|'),
            'sudo', 'tee', '-a', '/root/.ssh/authorized_keys',
            run.Raw('&&'),
            'sudo', 'chmod', '0600', '/root/.ssh/authorized_keys',
        ])

        # add other hosts
        for remote in ctx.cluster.remotes.keys():
            if remote == bootstrap_remote:
                continue
            log.info('Writing conf and keyring to %s' % remote.shortname)
            teuthology.write_file(
                remote=remote,
                path='{}/{}.conf'.format(testdir, cluster_name),
                data=ctx.ceph[cluster_name].config_file)
            teuthology.write_file(
                remote=remote,
                path='{}/{}.keyring'.format(testdir, cluster_name),
                data=ctx.ceph[cluster_name].admin_keyring)

            log.info('Adding host %s to orchestrator...' % remote.shortname)
            shell(ctx, cluster_name, remote, [
                'ceph', 'orchestrator', 'host', 'add',
                remote.shortname
            ])

        yield

    finally:
        log.info('Cleaning up testdir ceph.* files...')
        ctx.cluster.run(args=[
            'rm', '-f',
            '{}/seed.{}.conf'.format(testdir, cluster_name),
            '{}/{}.pub'.format(testdir, cluster_name),
            '{}/{}.conf'.format(testdir, cluster_name),
            '{}/{}.keyring'.format(testdir, cluster_name),
        ])

        log.info('Stopping all daemons...')

        # this doesn't block until they are all stopped...
        #ctx.cluster.run(args=['sudo', 'systemctl', 'stop', 'ceph.target'])

        # so, stop them individually
        for role in ctx.daemons.resolve_role_list(None, CEPH_ROLE_TYPES):
            cluster, type_, id_ = teuthology.split_role(role)
            ctx.daemons.get_daemon(type_, id_, cluster).stop()
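

# Placement specs passed to 'ceph orchestrator mon update' (in ceph_mons()
# below) take the form '<host>:<addr-or-addrvec>=<daemon-id>'; with msgr2
# enabled the address part is an addrvec, e.g. (hypothetical values):
#
#   smithi001:[v2:172.21.0.1:3300,v1:172.21.0.1:6789]=b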


@contextlib.contextmanager
def ceph_mons(ctx, config):
    """
    Deploy any additional mons
    """
    cluster_name = config['cluster']
    fsid = ctx.ceph[cluster_name].fsid
    testdir = teuthology.get_testdir(ctx)
    num_mons = 1

    try:
        for remote, roles in ctx.cluster.remotes.items():
            for mon in [r for r in roles
                        if teuthology.is_type('mon', cluster_name)(r)]:
                c_, _, id_ = teuthology.split_role(mon)
                if c_ == cluster_name and id_ == ctx.ceph[cluster_name].first_mon:
                    continue
                log.info('Adding %s on %s' % (mon, remote.shortname))
                num_mons += 1
                shell(ctx, cluster_name, remote, [
                    'ceph', 'orchestrator', 'mon', 'update',
                    str(num_mons),
                    remote.shortname + ':' + ctx.ceph[cluster_name].mons[mon] + '=' + id_,
                ])
                ctx.daemons.register_daemon(
                    remote, 'mon', id_,
                    cluster=cluster_name,
                    fsid=fsid,
                    logger=log.getChild(mon),
                    wait=False,
                    started=True,
                )

                while True:
                    log.info('Waiting for %d mons in monmap...' % num_mons)
                    r = shell(
                        ctx=ctx,
                        cluster_name=cluster_name,
                        remote=remote,
                        args=[
                            'ceph', 'mon', 'dump', '-f', 'json',
                        ],
                        stdout=StringIO(),
                    )
                    j = json.loads(r.stdout.getvalue())
                    if len(j['mons']) == num_mons:
                        break
                    time.sleep(1)

        ## FIXME: refresh ceph.conf files for all mons + first mgr ##

        yield

    finally:
        pass


@contextlib.contextmanager
def ceph_mgrs(ctx, config):
    """
    Deploy any additional mgrs
    """
    cluster_name = config['cluster']
    fsid = ctx.ceph[cluster_name].fsid
    testdir = teuthology.get_testdir(ctx)

    try:
        nodes = []
        for remote, roles in ctx.cluster.remotes.items():
            for mgr in [r for r in roles
                        if teuthology.is_type('mgr', cluster_name)(r)]:
                c_, _, id_ = teuthology.split_role(mgr)
                if c_ == cluster_name and id_ == ctx.ceph[cluster_name].first_mgr:
                    continue
                log.info('Adding %s on %s' % (mgr, remote.shortname))
                nodes.append(remote.shortname + '=' + id_)
                ctx.daemons.register_daemon(
                    remote, 'mgr', id_,
                    cluster=cluster_name,
                    fsid=fsid,
                    logger=log.getChild(mgr),
                    wait=False,
                    started=True,
                )
        # runs on the last remote we iterated; any host with the admin
        # conf and keyring will do
        shell(ctx, cluster_name, remote, [
            'ceph', 'orchestrator', 'mgr', 'update',
            str(len(nodes) + 1)] + nodes
        )

        yield

    finally:
        pass
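

# Note on ceph_osds() below: osd role ids must be numbered contiguously
# from 0 across the whole job (enforced by the assert on 'cur'), and each
# osd consumes one scratch device on its host, zapped with 'ceph-volume
# lvm zap' before being handed to 'ceph orchestrator osd create'.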


@contextlib.contextmanager
def ceph_osds(ctx, config):
    """
    Deploy OSDs
    """
    cluster_name = config['cluster']
    fsid = ctx.ceph[cluster_name].fsid

    try:
        log.info('Deploying OSDs...')

        # provision OSDs in numeric order
        id_to_remote = {}
        devs_by_remote = {}
        for remote, roles in ctx.cluster.remotes.items():
            devs_by_remote[remote] = teuthology.get_scratch_devices(remote)
            for osd in [r for r in roles
                        if teuthology.is_type('osd', cluster_name)(r)]:
                _, _, id_ = teuthology.split_role(osd)
                id_to_remote[int(id_)] = (osd, remote)

        cur = 0
        for osd_id in sorted(id_to_remote.keys()):
            osd, remote = id_to_remote[osd_id]
            _, _, id_ = teuthology.split_role(osd)
            assert int(id_) == cur
            devs = devs_by_remote[remote]
            assert devs   ## FIXME ##
            dev = devs.pop()
            log.info('Deploying %s on %s with %s...' % (
                osd, remote.shortname, dev))
            shell(ctx, cluster_name, remote, [
                'ceph-volume', 'lvm', 'zap', dev])
            shell(ctx, cluster_name, remote, [
                'ceph', 'orchestrator', 'osd', 'create',
                remote.shortname + ':' + dev
            ])
            ctx.daemons.register_daemon(
                remote, 'osd', id_,
                cluster=cluster_name,
                fsid=fsid,
                logger=log.getChild(osd),
                wait=False,
                started=True,
            )
            cur += 1

        yield
    finally:
        pass


@contextlib.contextmanager
def ceph_mdss(ctx, config):
    """
    Deploy MDSs
    """
    cluster_name = config['cluster']
    fsid = ctx.ceph[cluster_name].fsid
    testdir = teuthology.get_testdir(ctx)

    nodes = []
    for remote, roles in ctx.cluster.remotes.items():
        for role in [r for r in roles
                     if teuthology.is_type('mds', cluster_name)(r)]:
            c_, _, id_ = teuthology.split_role(role)
            log.info('Adding %s on %s' % (role, remote.shortname))
            nodes.append(remote.shortname + '=' + id_)
            ctx.daemons.register_daemon(
                remote, 'mds', id_,
                cluster=cluster_name,
                fsid=fsid,
                logger=log.getChild(role),
                wait=False,
                started=True,
            )
    # runs on the last remote we iterated; any host with the admin
    # conf and keyring will do
    shell(ctx, cluster_name, remote, [
        'ceph', 'orchestrator', 'mds', 'update',
        'all',
        str(len(nodes))] + nodes
    )

    yield


@contextlib.contextmanager
def ceph_initial():
    try:
        yield
    finally:
        log.info('Teardown complete')


## public methods

@contextlib.contextmanager
def stop(ctx, config):
    """
    Stop ceph daemons

    For example::

      tasks:
      - ceph.stop: [mds.*]

      tasks:
      - ceph.stop: [osd.0, osd.2]

      tasks:
      - ceph.stop:
          daemons: [osd.0, osd.2]
    """
    if config is None:
        config = {}
    elif isinstance(config, list):
        config = {'daemons': config}

    daemons = ctx.daemons.resolve_role_list(
        config.get('daemons', None), CEPH_ROLE_TYPES, True)
    clusters = set()

    for role in daemons:
        cluster, type_, id_ = teuthology.split_role(role)
        ctx.daemons.get_daemon(type_, id_, cluster).stop()
        clusters.add(cluster)

#    for cluster in clusters:
#        ctx.ceph[cluster].watchdog.stop()
#        ctx.ceph[cluster].watchdog.join()

    yield


@contextlib.contextmanager
def distribute_config_and_admin_keyring(ctx, config):
    """
    Distribute a sufficient config and keyring for clients
    """
    cluster_name = config['cluster']
    log.info('Distributing config and client.admin keyring...')
    for remote, roles in ctx.cluster.remotes.items():
        remote.run(args=['sudo', 'mkdir', '-p', '/etc/ceph'])
        teuthology.sudo_write_file(
            remote=remote,
            path='/etc/ceph/{}.conf'.format(cluster_name),
            data=ctx.ceph[cluster_name].config_file)
        teuthology.sudo_write_file(
            remote=remote,
            path='/etc/ceph/{}.client.admin.keyring'.format(cluster_name),
            data=ctx.ceph[cluster_name].admin_keyring)
    try:
        yield
    finally:
        ctx.cluster.run(args=[
            'sudo', 'rm', '-f',
            '/etc/ceph/{}.conf'.format(cluster_name),
            '/etc/ceph/{}.client.admin.keyring'.format(cluster_name),
        ])


@contextlib.contextmanager
def task(ctx, config):
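    """
    Deploy a ceph cluster via ceph-daemon and the ssh orchestrator.

    A minimal sketch of the config this task reads ('ceph2' is a placeholder
    task name; all keys are optional and the values shown are illustrative)::

        tasks:
        - ceph2:
            cluster: ceph               # cluster name, default 'ceph'
            branch: master              # selects the container image
            ceph-daemon-branch: master  # branch to fetch ceph-daemon from
            skip_dashboard: false
            mon_bind_msgr2: true
            mon_bind_addrvec: true
            conf:
              osd:
                debug osd: 20
    """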
    if config is None:
        config = {}

    assert isinstance(config, dict), \
        "task only supports a dictionary for configuration"

    overrides = ctx.config.get('overrides', {})
    teuthology.deep_merge(config, overrides.get('ceph', {}))
    log.info('Config: ' + str(config))

    testdir = teuthology.get_testdir(ctx)

    # set up cluster context
    first_ceph_cluster = False
    if not hasattr(ctx, 'daemons'):
        first_ceph_cluster = True
        ctx.daemons = DaemonGroup(use_ceph_daemon=True)
    if not hasattr(ctx, 'ceph'):
        ctx.ceph = {}
    if 'cluster' not in config:
        config['cluster'] = 'ceph'
    cluster_name = config['cluster']
    ctx.ceph[cluster_name] = argparse.Namespace()

    #validate_config(ctx, config)

    # image
    branch = config.get('branch', 'master')   ### FIXME ###
    if branch in ['master', 'nautilus']:
        ctx.image = 'ceph/daemon-base:latest-%s-devel' % branch
    else:
#        ctx.image = 'ceph-ci/ceph:%s' % branch
        ctx.image = 'cephci/daemon-base:%s' % branch
    log.info('Cluster image is %s' % ctx.image)

    # uuid
    fsid = str(uuid.uuid1())
    log.info('Cluster fsid is %s' % fsid)
    ctx.ceph[cluster_name].fsid = fsid

    # mon ips
    log.info('Choosing monitor IPs and ports...')
    remotes_and_roles = ctx.cluster.remotes.items()
    roles = [role_list for (remote, role_list) in remotes_and_roles]
    ips = [host for (host, port) in
           (remote.ssh.get_transport().getpeername()
            for (remote, role_list) in remotes_and_roles)]
    ctx.ceph[cluster_name].mons = get_mons(
        roles, ips, cluster_name,
        mon_bind_msgr2=config.get('mon_bind_msgr2', True),
        mon_bind_addrvec=config.get('mon_bind_addrvec', True),
    )
    log.info('Monitor IPs: %s' % ctx.ceph[cluster_name].mons)

    with contextutil.nested(
            lambda: ceph_initial(),
            lambda: normalize_hostnames(ctx=ctx),
            lambda: download_ceph_daemon(ctx=ctx, config=config),
            lambda: ceph_log(ctx=ctx, config=config),
            lambda: ceph_crash(ctx=ctx, config=config),
            lambda: ceph_bootstrap(ctx=ctx, config=config),
            lambda: ceph_mons(ctx=ctx, config=config),
            lambda: ceph_mgrs(ctx=ctx, config=config),
            lambda: ceph_osds(ctx=ctx, config=config),
            lambda: ceph_mdss(ctx=ctx, config=config),
            lambda: distribute_config_and_admin_keyring(ctx=ctx, config=config),
    ):
        try:
            log.info('Setup complete, yielding')
            yield
        finally:
            log.info('Teardown begin')