Merge pull request #296 from ceph/wip-standby-mds

Wip standby mds
Zack Cerza 2014-07-25 16:44:34 -06:00
commit 2999a4d492
5 changed files with 235 additions and 123 deletions

View File

@@ -5,12 +5,16 @@ import logging
import time
from teuthology import misc
from teuthology.parallel import parallel
from teuthology.task import ceph_manager
log = logging.getLogger(__name__)
DAEMON_WAIT_TIMEOUT = 120
class Filesystem(object):
"""
This object is for driving a CephFS filesystem.
@@ -23,51 +27,112 @@ class Filesystem(object):
self._ctx = ctx
self._config = config
mds_list = list(misc.all_roles_of_type(ctx.cluster, 'mds'))
if len(mds_list) != 1:
# Require exactly one MDS, the code path for creation failure when
# a standby is available is different
raise RuntimeError("This task requires exactly one MDS")
self.mds_ids = list(misc.all_roles_of_type(ctx.cluster, 'mds'))
if len(self.mds_ids) == 0:
raise RuntimeError("This task requires at least one MDS")
self.mds_id = mds_list[0]
(mds_remote,) = ctx.cluster.only('mds.{_id}'.format(_id=self.mds_id)).remotes.iterkeys()
manager = ceph_manager.CephManager(
mds_remote, ctx=ctx, logger=log.getChild('ceph_manager'),
)
self.mds_manager = manager
first_mon = misc.get_first_mon(ctx, config)
(mon_remote,) = ctx.cluster.only(first_mon).remotes.iterkeys()
self.mon_manager = ceph_manager.CephManager(mon_remote, ctx=ctx, logger=log.getChild('ceph_manager'))
self.mds_daemons = dict([(mds_id, self._ctx.daemons.get_daemon('mds', mds_id)) for mds_id in self.mds_ids])
client_list = list(misc.all_roles_of_type(self._ctx.cluster, 'client'))
self.client_id = client_list[0]
self.client_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(self.client_id)]))[0][1]
def mds_stop(self):
def are_daemons_healthy(self):
"""
Stop the MDS daemon process. If it held a rank, that rank
Return true if all daemons are in one of the healthy states: up:active, up:standby, up:standby-replay
:return:
"""
status = self.mon_manager.get_mds_status_all()
for mds_id, mds_status in status['info'].items():
if mds_status['state'] not in ["up:active", "up:standby", "up:standby-replay"]:
log.warning("Unhealthy mds state {0}:{1}".format(mds_id, mds_status['state']))
return False
return True
def wait_for_daemons(self, timeout=None):
"""
Wait until all daemons are healthy
:return:
"""
if timeout is None:
timeout = DAEMON_WAIT_TIMEOUT
elapsed = 0
while True:
if self.are_daemons_healthy():
return
else:
time.sleep(1)
elapsed += 1
if elapsed > timeout:
raise RuntimeError("Timed out waiting for MDS daemons to become healthy")
def get_lone_mds_id(self):
if len(self.mds_ids) != 1:
raise ValueError("Explicit MDS argument required when multiple MDSs in use")
else:
return self.mds_ids[0]
def _one_or_all(self, mds_id, cb):
"""
Call a callback for a single named MDS, or for all
:param mds_id: MDS daemon name, or None
:param cb: Callback taking single argument of MDS daemon name
"""
if mds_id is None:
with parallel() as p:
for mds_id in self.mds_ids:
p.spawn(cb, mds_id)
else:
cb(mds_id)
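The _one_or_all helper dispatches a callback either to one named MDS or, in parallel, to all of them. A rough standalone sketch of the same dispatch pattern, using a plain thread pool instead of teuthology's parallel() helper, is:

from multiprocessing.pool import ThreadPool


def one_or_all(mds_id, all_ids, cb):
    # Run cb for the one named daemon, or fan it out across every daemon.
    if mds_id is None:
        pool = ThreadPool(len(all_ids) or 1)
        try:
            pool.map(cb, all_ids)
        finally:
            pool.close()
            pool.join()
    else:
        cb(mds_id)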
def mds_stop(self, mds_id=None):
"""
Stop the MDS daemon process(es). If it held a rank, that rank
will eventually go laggy.
"""
mds = self._ctx.daemons.get_daemon('mds', self.mds_id)
mds.stop()
self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].stop())
def mds_fail(self):
def mds_fail(self, mds_id=None):
"""
Inform MDSMonitor that the daemon process is dead. If it held
Inform MDSMonitor of the death of the daemon process(es). If it held
a rank, that rank will be relinquished.
"""
self.mds_manager.raw_cluster_cmd("mds", "fail", "0")
self._one_or_all(mds_id, lambda id_: self.mon_manager.raw_cluster_cmd("mds", "fail", id_))
def mds_restart(self):
mds = self._ctx.daemons.get_daemon('mds', self.mds_id)
mds.restart()
def mds_restart(self, mds_id=None):
self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].restart())
def mds_fail_restart(self, mds_id=None):
"""
Variation on restart that also marks the MDSs as failed, so that performing this
operation and then waiting for healthy daemon states guarantees that the daemons
have actually gone down and come back up, rather than merely re-observing the
healthy states that existed before the restart.
"""
def _fail_restart(id_):
self.mds_daemons[id_].stop()
self.mon_manager.raw_cluster_cmd("mds", "fail", id_)
self.mds_daemons[id_].restart()
self._one_or_all(mds_id, _fail_restart)
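Taken together, the new methods let a test bounce daemons and then block until the cluster settles; a hedged usage sketch (bounce_mds is a hypothetical helper, and fs is assumed to be a Filesystem instance as constructed by this module) would be:

def bounce_mds(fs, mds_id=None):
    # Stop, mark failed and restart one MDS (or all of them) ...
    fs.mds_fail_restart(mds_id)
    # ... then block until every daemon is back in a healthy state.
    fs.wait_for_daemons()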
def reset(self):
log.info("Creating new filesystem")
assert not self._ctx.daemons.get_daemon('mds', self.mds_id).running()
self.mds_manager.raw_cluster_cmd_result('mds', 'set', "max_mds", "0")
self.mds_manager.raw_cluster_cmd_result('mds', 'fail', self.mds_id)
self.mds_manager.raw_cluster_cmd_result('fs', 'rm', "default", "--yes-i-really-mean-it")
self.mds_manager.raw_cluster_cmd_result('fs', 'new', "default", "metadata", "data")
self.mon_manager.raw_cluster_cmd_result('mds', 'set', "max_mds", "0")
for mds_id in self.mds_ids:
assert not self._ctx.daemons.get_daemon('mds', mds_id).running()
self.mon_manager.raw_cluster_cmd_result('mds', 'fail', mds_id)
self.mon_manager.raw_cluster_cmd_result('fs', 'rm', "default", "--yes-i-really-mean-it")
self.mon_manager.raw_cluster_cmd_result('fs', 'new', "default", "metadata", "data")
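In the migration task further down, reset() is bracketed by unmounting clients and stopping/restarting the daemons; a sketch of that sequence (recreate_filesystem is a hypothetical helper, with fs and mounts assumed to come from the surrounding teuthology context) is:

def recreate_filesystem(fs, mounts):
    # Unmount all clients first so nothing holds the old filesystem open.
    for mount in mounts.values():
        mount.umount_wait()
    fs.mds_stop()         # stop every MDS daemon
    fs.reset()            # tear down and recreate the 'default' filesystem
    fs.mds_restart()      # bring the daemons back up
    fs.wait_for_daemons()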
def get_metadata_object(self, object_type, object_id):
"""
@@ -110,8 +175,10 @@ class Filesystem(object):
return version
def mds_asok(self, command):
proc = self.mds_manager.admin_socket('mds', self.mds_id, command)
def mds_asok(self, command, mds_id=None):
if mds_id is None:
mds_id = self.get_lone_mds_id()
proc = self.mon_manager.admin_socket('mds', mds_id, command)
response_data = proc.stdout.getvalue()
log.info("mds_asok output: {0}".format(response_data))
if response_data.strip():
@@ -119,7 +186,7 @@
else:
return None
def wait_for_state(self, goal_state, reject=None, timeout=None):
def wait_for_state(self, goal_state, reject=None, timeout=None, mds_id=None):
"""
Block until the MDS reaches a particular state, or a failure condition
is met.
@@ -130,10 +197,13 @@
:return: number of seconds waited, rounded down to integer
"""
if mds_id is None:
mds_id = self.get_lone_mds_id()
elapsed = 0
while True:
# mds_info is None if no daemon currently claims this rank
mds_info = self.mds_manager.get_mds_status(self.mds_id)
mds_info = self.mon_manager.get_mds_status(mds_id)
current_state = mds_info['state'] if mds_info else None
if current_state == goal_state:
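wait_for_state now takes an optional mds_id, which matters once standbys are in play; a hedged example of how a test might use it (wait_until_standby is a hypothetical helper; the reject/timeout handling lives in the part of the method not shown in this hunk) is:

def wait_until_standby(fs, mds_id):
    # Block until the monitors report the named daemon as a standby,
    # relying on the method's reject/timeout handling if it goes active
    # instead or never settles.
    fs.wait_for_state("up:standby", reject="up:active", timeout=60, mds_id=mds_id)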

View File

@@ -80,3 +80,19 @@ class KernelMount(CephFSMount):
mnt,
],
)
def cleanup(self):
pass
def umount_wait(self):
pass
def is_mounted(self):
return True
def wait_until_mounted(self):
pass
def teardown(self):
super(KernelMount, self).teardown()
self.umount()

View File

@@ -1,4 +1,4 @@
from contextlib import contextmanager
import logging
import datetime
from textwrap import dedent
@@ -47,6 +47,22 @@ class CephFSMount(object):
def cleanup(self):
raise NotImplementedError()
def wait_until_mounted(self):
raise NotImplementedError()
@contextmanager
def mounted(self):
"""
A context manager: starting from an unmounted state, mount this
filesystem, yield, then unmount and clean up.
"""
self.mount()
self.wait_until_mounted()
try:
yield
finally:
self.umount_wait()
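The new mounted() context manager packages the mount/wait/unmount cycle; a usage sketch (run_client_workload is a hypothetical helper; mount is a CephFSMount, as taken from ctx.mounts in the migration task below) is:

def run_client_workload(mount):
    # Mount, wait for the mount to become usable, do some client I/O,
    # and unmount/clean up even if the body raises.
    with mount.mounted():
        mount.create_files()
        mount.check_files()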
def create_files(self):
assert(self.is_mounted())

View File

@@ -4,7 +4,6 @@ import logging
from teuthology import misc
from teuthology.task.ceph import write_conf
from teuthology.task.ceph_fuse import task as ceph_fuse_ctx
from teuthology.task.cephfs.filesystem import Filesystem
log = logging.getLogger(__name__)
@@ -28,6 +27,13 @@ def task(ctx, config):
client: client.0
"""
if not hasattr(ctx, 'ceph'):
raise RuntimeError("This task must be nested in 'ceph' task")
if not hasattr(ctx, 'mounts'):
raise RuntimeError("This task must be nested inside 'kclient' or 'ceph_fuse' task")
# Determine which client we will use
if config and 'client' in config:
# Use client specified in config
client_role = config['client']
@@ -43,17 +49,13 @@ def task(ctx, config):
client_id = client_list[0]
except IndexError:
raise RuntimeError("This task requires at least one client")
else:
client_role = "client.{0}".format(client_id)
fs = Filesystem(ctx, config)
ctx.fs = fs
old_journal_version = JOURNAL_FORMAT_LEGACY
new_journal_version = JOURNAL_FORMAT_RESILIENT
# Set config so that journal will be created in older format
if not hasattr(ctx, 'ceph'):
raise RuntimeError("This task must be nested in 'ceph' task")
if 'mds' not in ctx.ceph.conf:
ctx.ceph.conf['mds'] = {}
ctx.ceph.conf['mds']['mds journal format'] = old_journal_version
@@ -61,13 +63,15 @@ def task(ctx, config):
# used a different config path this won't work.
# Create a filesystem using the older journal format.
for mount in ctx.mounts.values():
mount.umount_wait()
fs.mds_stop()
fs.reset()
fs.mds_restart()
# Do some client work so that the log is populated with something.
with ceph_fuse_ctx(ctx, [client_role]) as client_mounts:
mount = client_mounts[client_id]
mount = ctx.mounts[client_id]
with mount.mounted():
mount.create_files()
mount.check_files() # sanity, this should always pass
@@ -76,12 +80,15 @@ def task(ctx, config):
write_conf(ctx)
# Restart the MDS.
fs.mds_restart()
fs.mds_fail_restart()
fs.wait_for_daemons()
# This ensures that all daemons come up into a valid state
fs.wait_for_daemons()
# Check that files created in the initial client workload are still visible
# in a client mount.
with ceph_fuse_ctx(ctx, [client_role]) as client_mounts:
mount = client_mounts[client_id]
with mount.mounted():
mount.check_files()
# Verify that the journal really has been rewritten.
@@ -91,4 +98,8 @@ def task(ctx, config):
new_journal_version, journal_version()
))
# Leave all MDSs and clients running for any child tasks
for mount in ctx.mounts.values():
mount.mount()
yield
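The restart-under-a-new-journal-format step in this task boils down to rewriting the conf and fail-restarting the daemons; a condensed sketch of that step (switch_journal_format is a hypothetical helper; write_conf is the import used by this task, and fs is the Filesystem instance stored on ctx) is:

from teuthology.task.ceph import write_conf


def switch_journal_format(ctx, fs, version):
    # Point every MDS at the new journal format via ceph.conf ...
    ctx.ceph.conf.setdefault('mds', {})['mds journal format'] = version
    write_conf(ctx)
    # ... then fail-restart so all daemons are guaranteed to pick it up.
    fs.mds_fail_restart()
    fs.wait_for_daemons()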

View File

@@ -5,12 +5,15 @@ import logging
import pipes
import os
from teuthology import misc as teuthology
from teuthology import misc
from teuthology.orchestra.run import CommandFailedError
from teuthology.parallel import parallel
from ..orchestra import run
log = logging.getLogger(__name__)
CLIENT_PREFIX = 'client.'
def task(ctx, config):
"""
@@ -63,7 +66,7 @@ def task(ctx, config):
'configuration must contain a dictionary of clients'
overrides = ctx.config.get('overrides', {})
teuthology.deep_merge(config, overrides.get('workunit', {}))
misc.deep_merge(config, overrides.get('workunit', {}))
refspec = config.get('branch')
if refspec is None:
@@ -77,46 +80,42 @@ def task(ctx, config):
log.info('Pulling workunits from ref %s', refspec)
created_dir_dict = {}
created_mountpoint = {}
if config.get('env') is not None:
assert isinstance(config['env'], dict), 'env must be a dictionary'
clients = config['clients']
# Create scratch dirs for any non-all workunits
log.info('Making a separate scratch dir for every client...')
for role in clients.iterkeys():
assert isinstance(role, basestring)
if role == "all":
continue
PREFIX = 'client.'
assert role.startswith(PREFIX)
created_mnt_dir = _make_scratch_dir(ctx, role, config.get('subdir'))
created_dir_dict[role] = created_mnt_dir
all_spec = False #is there an all grouping?
assert role.startswith(CLIENT_PREFIX)
created_mnt_dir = _make_scratch_dir(ctx, role, config.get('subdir'))
created_mountpoint[role] = created_mnt_dir
# Execute any non-all workunits
with parallel() as p:
for role, tests in clients.iteritems():
if role != "all":
p.spawn(_run_tests, ctx, refspec, role, tests,
config.get('env'), timeout=timeout)
else:
all_spec = True
if all_spec:
# Clean up dirs from any non-all workunits
for role, created in created_mountpoint.items():
_delete_dir(ctx, role, created)
# Execute any 'all' workunits
if 'all' in clients:
all_tasks = clients["all"]
_spawn_on_all_clients(ctx, refspec, all_tasks, config.get('env'),
config.get('subdir'), timeout=timeout)
for role in clients.iterkeys():
assert isinstance(role, basestring)
if role == "all":
continue
PREFIX = 'client.'
assert role.startswith(PREFIX)
if created_dir_dict[role]:
_delete_dir(ctx, role)
def _delete_dir(ctx, role):
def _delete_dir(ctx, role, created_mountpoint):
"""
Delete file used by this role, and delete the directory that this
role appeared in.
@@ -124,37 +123,35 @@ def _delete_dir(ctx, role):
:param ctx: Context
:param role: "role.#" where # is used for the role id.
"""
PREFIX = 'client.'
testdir = teuthology.get_testdir(ctx)
id_ = role[len(PREFIX):]
testdir = misc.get_testdir(ctx)
id_ = role[len(CLIENT_PREFIX):]
(remote,) = ctx.cluster.only(role).remotes.iterkeys()
mnt = os.path.join(testdir, 'mnt.{id}'.format(id=id_))
# Is there any reason why this is not: join(mnt, role) ?
client = os.path.join(mnt, 'client.{id}'.format(id=id_))
try:
remote.run(
args=[
'rm',
'-rf',
'--',
client,
],
)
log.info("Deleted dir {dir}".format(dir=client))
except Exception:
log.exception("Caught an exception deleting dir {dir}".format(dir=client))
try:
# Remove the directory inside the mount where the workunit ran
remote.run(
args=[
'rm',
'-rf',
'--',
client,
],
)
log.info("Deleted dir {dir}".format(dir=mnt))
# If the mount was an artificially created dir, delete that too
if created_mountpoint:
remote.run(
args=[
'rmdir',
'--',
mnt,
],
)
log.info("Deleted dir {dir}".format(dir=mnt))
except Exception:
log.exception("Caught an exception deleting dir {dir}".format(dir=mnt))
],
)
log.info("Deleted artificial mount point {dir}".format(dir=client))
def _make_scratch_dir(ctx, role, subdir):
"""
@@ -165,13 +162,12 @@ def _make_scratch_dir(ctx, role, subdir):
:param role: "role.#" where # is used for the role id.
:param subdir: use this subdir (False if not used)
"""
retVal = False
PREFIX = 'client.'
id_ = role[len(PREFIX):]
created_mountpoint = False
id_ = role[len(CLIENT_PREFIX):]
log.debug("getting remote for {id} role {role_}".format(id=id_, role_=role))
(remote,) = ctx.cluster.only(role).remotes.iterkeys()
dir_owner = remote.user
mnt = os.path.join(teuthology.get_testdir(ctx), 'mnt.{id}'.format(id=id_))
mnt = os.path.join(misc.get_testdir(ctx), 'mnt.{id}'.format(id=id_))
# if neither kclient nor ceph-fuse are required for a workunit,
# mnt may not exist. Stat and create the directory if it doesn't.
try:
@@ -180,22 +176,24 @@ def _make_scratch_dir(ctx, role, subdir):
'stat',
'--',
mnt,
],
)
],
)
log.info('Did not need to create dir {dir}'.format(dir=mnt))
except Exception:
except CommandFailedError:
remote.run(
args=[
'mkdir',
'--',
mnt,
],
)
],
)
log.info('Created dir {dir}'.format(dir=mnt))
retVal = True
created_mountpoint = True
if not subdir: subdir = 'client.{id}'.format(id=id_)
if retVal:
if not subdir:
subdir = 'client.{id}'.format(id=id_)
if created_mountpoint:
remote.run(
args=[
'cd',
@@ -205,8 +203,8 @@ def _make_scratch_dir(ctx, role, subdir):
'mkdir',
'--',
subdir,
],
)
],
)
else:
remote.run(
args=[
@@ -224,10 +222,10 @@ def _make_scratch_dir(ctx, role, subdir):
'--owner={user}'.format(user=dir_owner),
'--',
subdir,
],
)
],
)
return retVal
return created_mountpoint
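The created_mountpoint bookkeeping replaces the old retVal/created_dir_dict pair: setup reports whether it had to create the mountpoint, and cleanup removes the directory only in that case. A standalone sketch of the pattern (local os calls instead of remote mkdir/rmdir; make_scratch_dir and delete_dir here are illustrative stand-ins, not the task's functions) is:

import os
import shutil


def make_scratch_dir(mnt, subdir):
    # Create the mountpoint only if it is missing, and remember whether we did.
    created_mountpoint = not os.path.isdir(mnt)
    if created_mountpoint:
        os.mkdir(mnt)
    os.mkdir(os.path.join(mnt, subdir))
    return created_mountpoint


def delete_dir(mnt, subdir, created_mountpoint):
    # Always remove the per-role subdir; remove the mountpoint itself only
    # if it was artificially created during setup.
    shutil.rmtree(os.path.join(mnt, subdir), ignore_errors=True)
    if created_mountpoint:
        os.rmdir(mnt)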
def _spawn_on_all_clients(ctx, refspec, tests, env, subdir, timeout=None):
@@ -237,12 +235,14 @@ def _spawn_on_all_clients(ctx, refspec, tests, env, subdir, timeout=None):
See run_tests() for parameter documentation.
"""
client_generator = teuthology.all_roles_of_type(ctx.cluster, 'client')
client_generator = misc.all_roles_of_type(ctx.cluster, 'client')
client_remotes = list()
created_mountpoint = {}
for client in client_generator:
(client_remote,) = ctx.cluster.only('client.{id}'.format(id=client)).remotes.iterkeys()
client_remotes.append((client_remote, 'client.{id}'.format(id=client)))
_make_scratch_dir(ctx, "client.{id}".format(id=client), subdir)
created_mountpoint[client] = _make_scratch_dir(ctx, "client.{id}".format(id=client), subdir)
for unit in tests:
with parallel() as p:
@@ -251,9 +251,9 @@ def _spawn_on_all_clients(ctx, refspec, tests, env, subdir, timeout=None):
timeout=timeout)
# cleanup the generated client directories
client_generator = teuthology.all_roles_of_type(ctx.cluster, 'client')
client_generator = misc.all_roles_of_type(ctx.cluster, 'client')
for client in client_generator:
_delete_dir(ctx, 'client.{id}'.format(id=client))
_delete_dir(ctx, 'client.{id}'.format(id=client), created_mountpoint[client])
def _run_tests(ctx, refspec, role, tests, env, subdir=None, timeout=None):
@@ -274,11 +274,10 @@ def _run_tests(ctx, refspec, role, tests, env, subdir=None, timeout=None):
hours, or 'd' for days. If '0' or anything that evaluates
to False is passed, the 'timeout' command is not used.
"""
testdir = teuthology.get_testdir(ctx)
testdir = misc.get_testdir(ctx)
assert isinstance(role, basestring)
PREFIX = 'client.'
assert role.startswith(PREFIX)
id_ = role[len(PREFIX):]
assert role.startswith(CLIENT_PREFIX)
id_ = role[len(CLIENT_PREFIX):]
(remote,) = ctx.cluster.only(role).remotes.iterkeys()
mnt = os.path.join(testdir, 'mnt.{id}'.format(id=id_))
# subdir so we can remove and recreate this a lot without sudo
@@ -309,12 +308,12 @@ def _run_tests(ctx, refspec, role, tests, env, subdir=None, timeout=None):
run.Raw('&&'),
'find', '-executable', '-type', 'f', '-printf', r'%P\0'.format(srcdir=srcdir),
run.Raw('>{tdir}/workunits.list'.format(tdir=testdir)),
],
)
],
)
workunits = sorted(teuthology.get_file(
remote,
'{tdir}/workunits.list'.format(tdir=testdir)).split('\0'))
workunits = sorted(misc.get_file(
remote,
'{tdir}/workunits.list'.format(tdir=testdir)).split('\0'))
assert workunits
try:
@@ -336,7 +335,7 @@ def _run_tests(ctx, refspec, role, tests, env, subdir=None, timeout=None):
run.Raw('CEPH_REF={ref}'.format(ref=refspec)),
run.Raw('TESTDIR="{tdir}"'.format(tdir=testdir)),
run.Raw('CEPH_ID="{id}"'.format(id=id_)),
]
]
if env is not None:
for var, val in env.iteritems():
quoted_val = pipes.quote(val)
@@ -352,21 +351,21 @@ def _run_tests(ctx, refspec, role, tests, env, subdir=None, timeout=None):
'{srcdir}/{workunit}'.format(
srcdir=srcdir,
workunit=workunit,
),
])
),
])
remote.run(
logger=log.getChild(role),
args=args,
)
)
remote.run(
logger=log.getChild(role),
args=['sudo', 'rm', '-rf', '--', scratch_tmp],
)
)
finally:
log.info('Stopping %s on %s...', spec, role)
log.info('Stopping %s on %s...', tests, role)
remote.run(
logger=log.getChild(role),
args=[
'rm', '-rf', '--', '{tdir}/workunits.list'.format(tdir=testdir), srcdir,
],
)
],
)