Merge pull request #776 from ceph/wip-sharded-scan

tasks/cephfs: test sharded cephfs-data-scan
This commit is contained in:
Gregory Farnum 2016-02-18 06:46:13 -08:00
commit 73e02e8e87
2 changed files with 57 additions and 7 deletions

View File

@ -2,6 +2,7 @@
from StringIO import StringIO
import json
import logging
from gevent import Greenlet
import os
import time
import datetime
@ -752,8 +753,32 @@ class Filesystem(object):
"""
return self._run_tool("cephfs-table-tool", args, None, quiet)
def data_scan(self, args, quiet=False):
def data_scan(self, args, quiet=False, worker_count=1):
"""
Invoke cephfs-data-scan with the passed arguments, and return its stdout
:param worker_count: if greater than 1, multiple workers will be run
in parallel and the return value will be None
"""
return self._run_tool("cephfs-data-scan", args, None, quiet)
workers = []
for n in range(0, worker_count):
if worker_count > 1:
# data-scan args first token is a command, followed by args to it.
# insert worker arguments after the command.
cmd = args[0]
worker_args = [cmd] + ["--worker_n", n.__str__(), "--worker_m", worker_count.__str__()] + args[1:]
else:
worker_args = args
workers.append(Greenlet.spawn(lambda wargs=worker_args:
self._run_tool("cephfs-data-scan", wargs, None, quiet)))
for w in workers:
w.get()
if worker_count == 1:
return workers[0].value
else:
return None

View File

@ -10,8 +10,7 @@ import traceback
from collections import namedtuple
from teuthology.orchestra.run import CommandFailedError
from tasks.cephfs.cephfs_test_case import CephFSTestCase
from tasks.cephfs.cephfs_test_case import CephFSTestCase, long_running
log = logging.getLogger(__name__)
@ -212,6 +211,28 @@ class StripedStashedLayout(Workload):
return self._errors
class ManyFilesWorkload(Workload):
def __init__(self, filesystem, mount, file_count):
super(ManyFilesWorkload, self).__init__(filesystem, mount)
self.file_count = file_count
def write(self):
self._mount.run_shell(["mkdir", "subdir"])
for n in range(0, self.file_count):
self._mount.write_test_pattern("subdir/{0}".format(n), 6 * 1024 * 1024)
def validate(self):
for n in range(0, self.file_count):
try:
self._mount.validate_test_pattern("subdir/{0}".format(n), 6 * 1024 * 1024)
except CommandFailedError as e:
self._errors.append(
ValidationError("File {0}: {1}".format(n, e), traceback.format_exc(3))
)
return self._errors
class MovedDir(Workload):
def write(self):
# Create a nested dir that we will then move. Two files with two different
@ -289,7 +310,7 @@ class TestDataScan(CephFSTestCase):
mds_map = self.fs.get_mds_map()
return rank in mds_map['damaged']
def _rebuild_metadata(self, workload):
def _rebuild_metadata(self, workload, workers=1):
"""
That when all objects in metadata pool are removed, we can rebuild a metadata pool
based on the contents of a data pool, and a client can see and read our files.
@ -336,8 +357,8 @@ class TestDataScan(CephFSTestCase):
self.fs.journal_tool(["journal", "reset"])
self.fs.journal_tool(["journal", "reset", "--force"])
self.fs.data_scan(["init"])
self.fs.data_scan(["scan_extents", self.fs.get_data_pool_name()])
self.fs.data_scan(["scan_inodes", self.fs.get_data_pool_name()])
self.fs.data_scan(["scan_extents", self.fs.get_data_pool_name()], worker_count=workers)
self.fs.data_scan(["scan_inodes", self.fs.get_data_pool_name()], worker_count=workers)
# Mark the MDS repaired
self.fs.mon_manager.raw_cluster_cmd('mds', 'repaired', '0')
@ -470,3 +491,7 @@ class TestDataScan(CephFSTestCase):
frag_obj_id = "{0:x}.00000000".format(dir_ino)
keys = self._dirfrag_keys(frag_obj_id)
self.assertListEqual(sorted(keys), sorted(["%s_head" % f for f in file_names]))
@long_running
def test_parallel_execution(self):
self._rebuild_metadata(ManyFilesWorkload(self.fs, self.mount_a, 25), workers=7)