From 9cf37080fc0964b4d8a44eefb05f06e8229b916b Mon Sep 17 00:00:00 2001
From: John Spray
Date: Wed, 23 Dec 2015 13:57:02 +0000
Subject: [PATCH] tasks/cephfs: test sharded cephfs-data-scan

Signed-off-by: John Spray
---
 tasks/cephfs/filesystem.py     | 29 +++++++++++++++++++++++++++---
 tasks/cephfs/test_data_scan.py | 35 +++++++++++++++++++++++++++++-----
 2 files changed, 57 insertions(+), 7 deletions(-)

diff --git a/tasks/cephfs/filesystem.py b/tasks/cephfs/filesystem.py
index 368f2c476a3..d32d85e6f05 100644
--- a/tasks/cephfs/filesystem.py
+++ b/tasks/cephfs/filesystem.py
@@ -2,6 +2,7 @@
 from StringIO import StringIO
 import json
 import logging
+from gevent import Greenlet
 import os
 import time
 import datetime
@@ -752,8 +753,32 @@ class Filesystem(object):
         """
         return self._run_tool("cephfs-table-tool", args, None, quiet)
 
-    def data_scan(self, args, quiet=False):
+    def data_scan(self, args, quiet=False, worker_count=1):
         """
         Invoke cephfs-data-scan with the passed arguments, and return its stdout
+
+        :param worker_count: if greater than 1, multiple workers will be run
+                             in parallel and the return value will be None
         """
-        return self._run_tool("cephfs-data-scan", args, None, quiet)
+
+        workers = []
+
+        for n in range(0, worker_count):
+            if worker_count > 1:
+                # The first token of the data-scan args is the subcommand; insert
+                # the worker sharding arguments immediately after it.
+                cmd = args[0]
+                worker_args = [cmd] + ["--worker_n", str(n), "--worker_m", str(worker_count)] + args[1:]
+            else:
+                worker_args = args
+
+            workers.append(Greenlet.spawn(lambda wargs=worker_args:
+                           self._run_tool("cephfs-data-scan", wargs, None, quiet)))
+
+        for w in workers:
+            w.get()
+
+        if worker_count == 1:
+            return workers[0].value
+        else:
+            return None
diff --git a/tasks/cephfs/test_data_scan.py b/tasks/cephfs/test_data_scan.py
index 47b9651cec5..e92447434a7 100644
--- a/tasks/cephfs/test_data_scan.py
+++ b/tasks/cephfs/test_data_scan.py
@@ -10,8 +10,7 @@ import traceback
 from collections import namedtuple
 
 from teuthology.orchestra.run import CommandFailedError
-from tasks.cephfs.cephfs_test_case import CephFSTestCase
-
+from tasks.cephfs.cephfs_test_case import CephFSTestCase, long_running
 
 log = logging.getLogger(__name__)
 
@@ -212,6 +211,28 @@ class StripedStashedLayout(Workload):
         return self._errors
 
 
+class ManyFilesWorkload(Workload):
+    def __init__(self, filesystem, mount, file_count):
+        super(ManyFilesWorkload, self).__init__(filesystem, mount)
+        self.file_count = file_count
+
+    def write(self):
+        self._mount.run_shell(["mkdir", "subdir"])
+        for n in range(0, self.file_count):
+            self._mount.write_test_pattern("subdir/{0}".format(n), 6 * 1024 * 1024)
+
+    def validate(self):
+        for n in range(0, self.file_count):
+            try:
+                self._mount.validate_test_pattern("subdir/{0}".format(n), 6 * 1024 * 1024)
+            except CommandFailedError as e:
+                self._errors.append(
+                    ValidationError("File {0}: {1}".format(n, e), traceback.format_exc(3))
+                )
+
+        return self._errors
+
+
 class MovedDir(Workload):
     def write(self):
         # Create a nested dir that we will then move.  Two files with two different
@@ -289,7 +310,7 @@ class TestDataScan(CephFSTestCase):
         mds_map = self.fs.get_mds_map()
         return rank in mds_map['damaged']
 
-    def _rebuild_metadata(self, workload):
+    def _rebuild_metadata(self, workload, workers=1):
         """
         That when all objects in metadata pool are removed, we can rebuild a metadata
         pool based on the contents of a data pool, and a client can see and read our files.
@@ -336,8 +357,8 @@ class TestDataScan(CephFSTestCase):
         self.fs.journal_tool(["journal", "reset"])
         self.fs.journal_tool(["journal", "reset", "--force"])
         self.fs.data_scan(["init"])
-        self.fs.data_scan(["scan_extents", self.fs.get_data_pool_name()])
-        self.fs.data_scan(["scan_inodes", self.fs.get_data_pool_name()])
+        self.fs.data_scan(["scan_extents", self.fs.get_data_pool_name()], worker_count=workers)
+        self.fs.data_scan(["scan_inodes", self.fs.get_data_pool_name()], worker_count=workers)
 
         # Mark the MDS repaired
         self.fs.mon_manager.raw_cluster_cmd('mds', 'repaired', '0')
@@ -470,3 +491,7 @@ class TestDataScan(CephFSTestCase):
         frag_obj_id = "{0:x}.00000000".format(dir_ino)
         keys = self._dirfrag_keys(frag_obj_id)
         self.assertListEqual(sorted(keys), sorted(["%s_head" % f for f in file_names]))
+
+    @long_running
+    def test_parallel_execution(self):
+        self._rebuild_metadata(ManyFilesWorkload(self.fs, self.mount_a, 25), workers=7)
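
Note on the approach: the scan_extents and scan_inodes passes of cephfs-data-scan accept --worker_n and --worker_m so that worker n of m handles only its share of the data pool's objects, which is what lets the patched data_scan() fan the same subcommand out across several greenlets. Below is a minimal, standalone sketch of that dispatch pattern; it is not part of the patch, and run_tool() is a hypothetical local stand-in for Filesystem._run_tool() (the real code dispatches through teuthology's remote machinery rather than a local subprocess).

    # Illustrative sketch only, assuming gevent and a local cephfs-data-scan binary.
    import subprocess

    from gevent import Greenlet


    def run_tool(args):
        # Hypothetical local runner standing in for Filesystem._run_tool().
        return subprocess.check_output(["cephfs-data-scan"] + args)


    def sharded_data_scan(args, worker_count=1):
        workers = []
        for n in range(worker_count):
            if worker_count > 1:
                # args[0] is the subcommand (e.g. "scan_extents"); the sharding
                # flags go immediately after it, ahead of the remaining args.
                worker_args = ([args[0], "--worker_n", str(n),
                                "--worker_m", str(worker_count)] + args[1:])
            else:
                worker_args = args

            # Bind worker_args as a default argument so each greenlet keeps its
            # own copy instead of closing over the loop variable.
            workers.append(Greenlet.spawn(lambda wargs=worker_args: run_tool(wargs)))

        # get() blocks until each greenlet finishes and re-raises any exception,
        # so a failed worker fails the whole scan.
        for w in workers:
            w.get()

        return workers[0].value if worker_count == 1 else None

Called as sharded_data_scan(["scan_extents", "<data pool>"], worker_count=7), this produces the same seven-way split that the new test_parallel_execution case drives through _rebuild_metadata(..., workers=7).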