Merge pull request #259 from ceph/wip-journal-tool

mds_journal_migration task
John Spray 2014-05-28 14:43:34 +01:00
commit 3bf5338cc0
11 changed files with 216 additions and 20 deletions

View File

@@ -13,6 +13,10 @@ def run_one_task(taskname, **kwargs):
    subtask = 'task'
    if '.' in taskname:
        (submod, subtask) = taskname.rsplit('.', 1)

    # Teuthology configs may refer to modules like ceph_deploy as ceph-deploy
    submod = submod.replace('-', '_')

    parent = __import__('teuthology.task', globals(), locals(), [submod], 0)
    try:
        mod = getattr(parent, submod)

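For illustration only (not part of the diff): with the added replace() call, a teuthology config can name a task with a dash and still resolve to the underscore-named module. A minimal sketch of the normalization:

# How a dashed task name such as 'ceph-deploy' is resolved.
taskname = 'ceph-deploy'
submod, subtask = taskname, 'task'
if '.' in taskname:
    (submod, subtask) = taskname.rsplit('.', 1)
submod = submod.replace('-', '_')   # 'ceph-deploy' -> 'ceph_deploy'
# __import__('teuthology.task', globals(), locals(), [submod], 0) then imports
# teuthology.task.ceph_deploy, and getattr(parent, submod) returns that module.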
View File

@@ -18,6 +18,8 @@ from teuthology import contextutil
from ..orchestra import run
import ceph_client as cclient

DEFAULT_CONF_PATH = '/etc/ceph/ceph.conf'

log = logging.getLogger(__name__)

class DaemonState(object):
@@ -409,6 +411,29 @@ def make_admin_daemon_dir(ctx, remote):
            ],
        )

def write_conf(ctx, conf_path=DEFAULT_CONF_PATH):
    conf_fp = StringIO()
    ctx.ceph.conf.write(conf_fp)
    conf_fp.seek(0)
    writes = ctx.cluster.run(
        args=[
            'sudo', 'mkdir', '-p', '/etc/ceph', run.Raw('&&'),
            'sudo', 'chmod', '0755', '/etc/ceph', run.Raw('&&'),
            'sudo', 'python',
            '-c',
            'import shutil, sys; shutil.copyfileobj(sys.stdin, file(sys.argv[1], "wb"))',
            conf_path,
            run.Raw('&&'),
            'sudo', 'chmod', '0644', conf_path,
        ],
        stdin=run.PIPE,
        wait=False)
    log.warn("writes: ")
    teuthology.feed_many_stdins_and_close(conf_fp, writes)
    run.wait(writes)

@contextlib.contextmanager
def cluster(ctx, config):
    """
@@ -576,26 +601,8 @@
    conf['global']['fsid'] = fsid

    log.info('Writing ceph.conf for FSID %s...' % fsid)
    conf_path = config.get('conf_path', '/etc/ceph/ceph.conf')
    conf_fp = StringIO()
    conf.write(conf_fp)
    conf_fp.seek(0)
    writes = ctx.cluster.run(
        args=[
            'sudo', 'mkdir', '-p', '/etc/ceph', run.Raw('&&'),
            'sudo', 'chmod', '0755', '/etc/ceph', run.Raw('&&'),
            'sudo', 'python',
            '-c',
            'import shutil, sys; shutil.copyfileobj(sys.stdin, file(sys.argv[1], "wb"))',
            conf_path,
            run.Raw('&&'),
            'sudo', 'chmod', '0644', conf_path,
        ],
        stdin=run.PIPE,
        wait=False,
    )
    teuthology.feed_many_stdins_and_close(conf_fp, writes)
    run.wait(writes)
    conf_path = config.get('conf_path', DEFAULT_CONF_PATH)
    write_conf(ctx, conf_path)

    log.info('Creating admin key on %s...' % firstmon)
    ctx.cluster.only(firstmon).run(

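The inline config write in cluster() is now factored out into write_conf(), so other tasks can rewrite /etc/ceph/ceph.conf after changing ctx.ceph.conf in memory. A minimal usage sketch, assuming ctx is the teuthology run context already populated by the ceph task (the helper name below is hypothetical):

from teuthology.task.ceph import write_conf

def set_mds_journal_format(ctx, fmt):
    # Hypothetical helper: mutate the in-memory config, then push it to every remote.
    if 'mds' not in ctx.ceph.conf:
        ctx.ceph.conf['mds'] = {}
    ctx.ceph.conf['mds']['mds journal format'] = fmt
    write_conf(ctx)   # serializes ctx.ceph.conf and copies it to conf_path on all nodes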
View File

@@ -0,0 +1,185 @@
import contextlib
import json
import logging
from StringIO import StringIO
import os

import ceph_manager
from teuthology import misc
from teuthology.task.ceph import write_conf
from teuthology.task.ceph_fuse import task as ceph_fuse_ctx

log = logging.getLogger(__name__)

JOURNAL_FORMAT_LEGACY = 0
JOURNAL_FORMAT_RESILIENT = 1
class Filesystem(object):
    """
    This object is for driving a CephFS filesystem.

    Limitations:
     * Assume a single filesystem+cluster
     * Assume a single MDS
    """

    def __init__(self, ctx, config):
        self._ctx = ctx
        self._config = config

        mds_list = list(misc.all_roles_of_type(ctx.cluster, 'mds'))
        if len(mds_list) != 1:
            # Require exactly one MDS: the code path for creation failure when
            # a standby is available is different.
            raise RuntimeError("This task requires exactly one MDS")
        self.mds_id = mds_list[0]

        (mds_remote,) = ctx.cluster.only('mds.{_id}'.format(_id=self.mds_id)).remotes.iterkeys()
        manager = ceph_manager.CephManager(
            mds_remote, ctx=ctx, logger=log.getChild('ceph_manager'),
        )
        self.mds_manager = manager

        client_list = list(misc.all_roles_of_type(self._ctx.cluster, 'client'))
        self.client_id = client_list[0]
        self.client_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(self.client_id)]))[0][1]

        self.test_files = ['a', 'b', 'c']
    def mds_stop(self):
        mds = self._ctx.daemons.get_daemon('mds', self.mds_id)
        mds.stop()

    def mds_restart(self):
        mds = self._ctx.daemons.get_daemon('mds', self.mds_id)
        mds.restart()
    def newfs(self):
        log.info("Creating new filesystem")

        self.mds_stop()

        data_pool_id = self.mds_manager.get_pool_num("data")
        md_pool_id = self.mds_manager.get_pool_num("metadata")
        self.mds_manager.raw_cluster_cmd_result('mds', 'newfs',
                                                str(md_pool_id), str(data_pool_id),
                                                '--yes-i-really-mean-it')
    @property
    def _mount_path(self):
        return os.path.join(misc.get_testdir(self._ctx), 'mnt.{0}'.format(self.client_id))

    def create_files(self):
        for suffix in self.test_files:
            log.info("Creating file {0}".format(suffix))
            self.client_remote.run(args=[
                'sudo', 'touch', os.path.join(self._mount_path, suffix)
            ])
    def check_files(self):
        """
        Check that the expected test files are present, raising RuntimeError
        if any of them is missing.
        """
        for suffix in self.test_files:
            log.info("Checking file {0}".format(suffix))
            r = self.client_remote.run(args=[
                'sudo', 'ls', os.path.join(self._mount_path, suffix)
            ], check_status=False)
            if r.exitstatus != 0:
                raise RuntimeError("Expected file {0} not found".format(suffix))
    def get_metadata_object(self, object_type, object_id):
        """
        Retrieve an object from the metadata pool, pass it through
        ceph-dencoder to dump it to JSON, and return the decoded object.
        """
        temp_bin_path = '/tmp/out.bin'

        self.client_remote.run(args=[
            'sudo', 'rados', '-p', 'metadata', 'get', object_id, temp_bin_path
        ])

        stdout = StringIO()
        self.client_remote.run(args=[
            'sudo', 'ceph-dencoder', 'type', object_type, 'import', temp_bin_path, 'decode', 'dump_json'
        ], stdout=stdout)
        dump_json = stdout.getvalue().strip()
        try:
            dump = json.loads(dump_json)
        except (TypeError, ValueError):
            log.error("Failed to decode JSON: '{0}'".format(dump_json))
            raise

        return dump
    def get_journal_version(self):
        """
        Read the JournalPointer and Journal::Header objects to learn the version of
        encoding in use.
        """
        journal_pointer_object = '400.00000000'
        journal_pointer_dump = self.get_metadata_object("JournalPointer", journal_pointer_object)
        journal_ino = journal_pointer_dump['journal_pointer']['front']

        journal_header_object = "{0:x}.00000000".format(journal_ino)
        journal_header_dump = self.get_metadata_object('Journaler::Header', journal_header_object)

        version = journal_header_dump['journal_header']['stream_format']
        log.info("Read journal version {0}".format(version))

        return version
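
To make the lookup concrete, a short worked sketch of the chain this method follows (the inode value is an example only; fs is a Filesystem instance):

# Illustrative walk-through of get_journal_version.
pointer = fs.get_metadata_object('JournalPointer', '400.00000000')
journal_ino = pointer['journal_pointer']['front']       # e.g. 0x200 for MDS rank 0
header_object = '{0:x}.00000000'.format(journal_ino)    # e.g. '200.00000000'
header = fs.get_metadata_object('Journaler::Header', header_object)
stream_format = header['journal_header']['stream_format']
assert stream_format in (JOURNAL_FORMAT_LEGACY, JOURNAL_FORMAT_RESILIENT)
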
@contextlib.contextmanager
def task(ctx, config):
    """
    Given that a Ceph cluster has already been set up, exercise the migration
    of the CephFS journal from an older format to the latest format.  On
    successful completion the filesystem will be running with a journal
    in the new format.
    """
    fs = Filesystem(ctx, config)

    old_journal_version = JOURNAL_FORMAT_LEGACY
    new_journal_version = JOURNAL_FORMAT_RESILIENT

    # Set config so that journal will be created in older format
    if not hasattr(ctx, 'ceph'):
        raise RuntimeError("This task must be nested in 'ceph' task")

    if 'mds' not in ctx.ceph.conf:
        ctx.ceph.conf['mds'] = {}
    ctx.ceph.conf['mds']['mds journal format'] = old_journal_version
    write_conf(ctx)  # XXX because we don't have the ceph task's config object, if they
                     # used a different config path this won't work.

    # Create a filesystem using the older journal format.
    fs.mds_stop()
    fs.newfs()
    fs.mds_restart()

    # Do some client work so that the log is populated with something.
    with ceph_fuse_ctx(ctx, None):
        fs.create_files()
        fs.check_files()  # sanity check; this should always pass

    # Modify the ceph.conf to ask the MDS to use the new journal format.
    ctx.ceph.conf['mds']['mds journal format'] = new_journal_version
    write_conf(ctx)

    # Restart the MDS.
    fs.mds_restart()

    # Check that files created in the initial client workload are still visible
    # in a client mount.
    with ceph_fuse_ctx(ctx, None):
        fs.check_files()

    # Verify that the journal really has been rewritten.
    journal_version = fs.get_journal_version()
    if journal_version != new_journal_version:
        raise RuntimeError("Journal was not upgraded: version should be {0} but is {1}".format(
            new_journal_version, journal_version
        ))

    yield
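
As used in a teuthology run, this task must be listed after the ceph task (it raises RuntimeError otherwise), needs exactly one MDS role and at least one client role, and mounts ceph-fuse itself via ceph_fuse_ctx. A minimal job fragment, shown as the Python structure teuthology parses its YAML tasks list into (an assumption about the surrounding job, not part of this diff):

# Hypothetical job fragment: the 'tasks' list a teuthology job would carry.
tasks = [
    {'ceph': None},
    {'mds_journal_migration': None},
]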