mirror of
https://github.com/ceph/ceph
synced 2025-01-01 16:42:29 +00:00
qa: reduce dependence on teuthology role list for mds
It's not yet possible to completely remove the dependency on mds_ids/mds_daemons in the CephFS tests but this commit reduces it enough for most code paths to work with cephadm. The main change here is use of CephManager.do_rados, with some improvements. Signed-off-by: Patrick Donnelly <pdonnell@redhat.com>
This commit is contained in:
parent
24bb1aa31b
commit
5292e88201
@ -10,8 +10,7 @@ import errno
|
||||
import random
|
||||
import traceback
|
||||
|
||||
from io import BytesIO
|
||||
from io import StringIO
|
||||
from io import BytesIO, StringIO
|
||||
from errno import EBUSY
|
||||
|
||||
from teuthology.exceptions import CommandFailedError
|
||||
@ -279,17 +278,18 @@ class MDSCluster(CephCluster):
|
||||
a parent of Filesystem. The correct way to use MDSCluster going forward is
|
||||
as a separate instance outside of your (multiple) Filesystem instances.
|
||||
"""
|
||||
|
||||
def __init__(self, ctx):
|
||||
super(MDSCluster, self).__init__(ctx)
|
||||
|
||||
self.mds_ids = list(misc.all_roles_of_type(ctx.cluster, 'mds'))
|
||||
@property
|
||||
def mds_ids(self):
|
||||
# do this dynamically because the list of ids may change periodically with cephadm
|
||||
return list(misc.all_roles_of_type(self._ctx.cluster, 'mds'))
|
||||
|
||||
if len(self.mds_ids) == 0:
|
||||
raise RuntimeError("This task requires at least one MDS")
|
||||
|
||||
if hasattr(self._ctx, "daemons"):
|
||||
# Presence of 'daemons' attribute implies ceph task rather than ceph_deploy task
|
||||
self.mds_daemons = dict([(mds_id, self._ctx.daemons.get_daemon('mds', mds_id)) for mds_id in self.mds_ids])
|
||||
@property
|
||||
def mds_daemons(self):
|
||||
return dict([(mds_id, self._ctx.daemons.get_daemon('mds', mds_id)) for mds_id in self.mds_ids])
|
||||
|
||||
def _one_or_all(self, mds_id, cb, in_parallel=True):
|
||||
"""
|
||||
@ -304,6 +304,7 @@ class MDSCluster(CephCluster):
|
||||
:param cb: Callback taking single argument of MDS daemon name
|
||||
:param in_parallel: whether to invoke callbacks concurrently (else one after the other)
|
||||
"""
|
||||
|
||||
if mds_id is None:
|
||||
if in_parallel:
|
||||
with parallel() as p:
|
||||
@ -1050,49 +1051,46 @@ class Filesystem(MDSCluster):
|
||||
|
||||
status = self.status()
|
||||
|
||||
def put_metadata_object_raw(self, object_id, infile):
|
||||
"""
|
||||
Save an object to the metadata pool
|
||||
"""
|
||||
temp_bin_path = infile
|
||||
self.client_remote.run(args=[
|
||||
'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'put', object_id, temp_bin_path
|
||||
])
|
||||
def dencoder(self, obj_type, obj_blob):
|
||||
args = [os.path.join(self._prefix, "ceph-dencoder"), 'type', obj_type, 'import', '-', 'decode', 'dump_json']
|
||||
p = self.mon_manager.controller.run(args=args, stdin=BytesIO(obj_blob), stdout=BytesIO())
|
||||
return p.stdout.getvalue()
|
||||
|
||||
def get_metadata_object_raw(self, object_id):
|
||||
def rados(self, *args, **kwargs):
|
||||
"""
|
||||
Retrieve an object from the metadata pool and store it in a file.
|
||||
Callout to rados CLI.
|
||||
"""
|
||||
temp_bin_path = '/tmp/' + object_id + '.bin'
|
||||
|
||||
self.client_remote.run(args=[
|
||||
'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'get', object_id, temp_bin_path
|
||||
])
|
||||
return self.mon_manager.do_rados(*args, **kwargs)
|
||||
|
||||
return temp_bin_path
|
||||
def radosm(self, *args, **kwargs):
|
||||
"""
|
||||
Interact with the metadata pool via rados CLI.
|
||||
"""
|
||||
|
||||
return self.rados(*args, **kwargs, pool=self.get_metadata_pool_name())
|
||||
|
||||
def radosmo(self, *args, stdout=BytesIO(), **kwargs):
|
||||
"""
|
||||
Interact with the metadata pool via rados CLI. Get the stdout.
|
||||
"""
|
||||
|
||||
return self.radosm(*args, **kwargs, stdout=stdout).stdout.getvalue()
|
||||
|
||||
def get_metadata_object(self, object_type, object_id):
|
||||
"""
|
||||
Retrieve an object from the metadata pool, pass it through
|
||||
ceph-dencoder to dump it to JSON, and return the decoded object.
|
||||
"""
|
||||
temp_bin_path = '/tmp/out.bin'
|
||||
|
||||
self.client_remote.run(args=[
|
||||
'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'get', object_id, temp_bin_path
|
||||
])
|
||||
|
||||
dump_json = self.client_remote.sh([
|
||||
'sudo', os.path.join(self._prefix, 'ceph-dencoder'), 'type', object_type, 'import', temp_bin_path, 'decode', 'dump_json'
|
||||
]).strip()
|
||||
o = self.radosmo(['get', object_id, '-'])
|
||||
j = self.dencoder(object_type, o)
|
||||
try:
|
||||
dump = json.loads(dump_json)
|
||||
return json.loads(j)
|
||||
except (TypeError, ValueError):
|
||||
log.error("Failed to decode JSON: '{0}'".format(dump_json))
|
||||
log.error("Failed to decode JSON: '{0}'".format(j))
|
||||
raise
|
||||
|
||||
return dump
|
||||
|
||||
def get_journal_version(self):
|
||||
"""
|
||||
Read the JournalPointer and Journal::Header objects to learn the version of
|
||||
@ -1215,34 +1213,21 @@ class Filesystem(MDSCluster):
|
||||
else:
|
||||
time.sleep(1)
|
||||
|
||||
def _read_data_xattr(self, ino_no, xattr_name, type, pool):
|
||||
mds_id = self.mds_ids[0]
|
||||
remote = self.mds_daemons[mds_id].remote
|
||||
def _read_data_xattr(self, ino_no, xattr_name, obj_type, pool):
|
||||
if pool is None:
|
||||
pool = self.get_data_pool_name()
|
||||
|
||||
obj_name = "{0:x}.00000000".format(ino_no)
|
||||
|
||||
args = [
|
||||
os.path.join(self._prefix, "rados"), "-p", pool, "getxattr", obj_name, xattr_name
|
||||
]
|
||||
args = ["getxattr", obj_name, xattr_name]
|
||||
try:
|
||||
proc = remote.run(args=args, stdout=BytesIO())
|
||||
proc = self.rados(args, pool=pool, stdout=BytesIO())
|
||||
except CommandFailedError as e:
|
||||
log.error(e.__str__())
|
||||
raise ObjectNotFound(obj_name)
|
||||
|
||||
data = proc.stdout.getvalue()
|
||||
dump = remote.sh(
|
||||
[os.path.join(self._prefix, "ceph-dencoder"),
|
||||
"type", type,
|
||||
"import", "-",
|
||||
"decode", "dump_json"],
|
||||
stdin=data,
|
||||
stdout=StringIO()
|
||||
)
|
||||
|
||||
return json.loads(dump.strip())
|
||||
obj_blob = proc.stdout.getvalue()
|
||||
return json.loads(self.dencoder(obj_type, obj_blob).strip())
|
||||
|
||||
def _write_data_xattr(self, ino_no, xattr_name, data, pool=None):
|
||||
"""
|
||||
@ -1255,16 +1240,12 @@ class Filesystem(MDSCluster):
|
||||
:param pool: name of data pool or None to use primary data pool
|
||||
:return: None
|
||||
"""
|
||||
remote = self.mds_daemons[self.mds_ids[0]].remote
|
||||
if pool is None:
|
||||
pool = self.get_data_pool_name()
|
||||
|
||||
obj_name = "{0:x}.00000000".format(ino_no)
|
||||
args = [
|
||||
os.path.join(self._prefix, "rados"), "-p", pool, "setxattr",
|
||||
obj_name, xattr_name, data
|
||||
]
|
||||
remote.sh(args)
|
||||
args = ["setxattr", obj_name, xattr_name, data]
|
||||
self.rados(args, pool=pool)
|
||||
|
||||
def read_backtrace(self, ino_no, pool=None):
|
||||
"""
|
||||
@ -1322,7 +1303,7 @@ class Filesystem(MDSCluster):
|
||||
for n in range(0, ((size - 1) // stripe_size) + 1)
|
||||
]
|
||||
|
||||
exist_objects = self.rados(["ls"], pool=self.get_data_pool_name()).split("\n")
|
||||
exist_objects = self.rados(["ls"], pool=self.get_data_pool_name(), stdout=StringIO()).stdout.getvalue().split("\n")
|
||||
|
||||
return want_objects, exist_objects
|
||||
|
||||
@ -1358,43 +1339,12 @@ class Filesystem(MDSCluster):
|
||||
|
||||
def dirfrag_exists(self, ino, frag):
|
||||
try:
|
||||
self.rados(["stat", "{0:x}.{1:08x}".format(ino, frag)])
|
||||
self.radosm(["stat", "{0:x}.{1:08x}".format(ino, frag)])
|
||||
except CommandFailedError:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
def rados(self, args, pool=None, namespace=None, stdin_data=None,
|
||||
stdin_file=None,
|
||||
stdout_data=None):
|
||||
"""
|
||||
Call into the `rados` CLI from an MDS
|
||||
"""
|
||||
|
||||
if pool is None:
|
||||
pool = self.get_metadata_pool_name()
|
||||
|
||||
# Doesn't matter which MDS we use to run rados commands, they all
|
||||
# have access to the pools
|
||||
mds_id = self.mds_ids[0]
|
||||
remote = self.mds_daemons[mds_id].remote
|
||||
|
||||
# NB we could alternatively use librados pybindings for this, but it's a one-liner
|
||||
# using the `rados` CLI
|
||||
args = ([os.path.join(self._prefix, "rados"), "-p", pool] +
|
||||
(["--namespace", namespace] if namespace else []) +
|
||||
args)
|
||||
|
||||
if stdin_file is not None:
|
||||
args = ["bash", "-c", "cat " + stdin_file + " | " + " ".join(args)]
|
||||
if stdout_data is None:
|
||||
stdout_data = StringIO()
|
||||
|
||||
p = remote.run(args=args,
|
||||
stdin=stdin_data,
|
||||
stdout=stdout_data)
|
||||
return p.stdout.getvalue().strip()
|
||||
|
||||
def list_dirfrag(self, dir_ino):
|
||||
"""
|
||||
Read the named object and return the list of omap keys
|
||||
@ -1405,21 +1355,22 @@ class Filesystem(MDSCluster):
|
||||
dirfrag_obj_name = "{0:x}.00000000".format(dir_ino)
|
||||
|
||||
try:
|
||||
key_list_str = self.rados(["listomapkeys", dirfrag_obj_name])
|
||||
key_list_str = self.radosmo(["listomapkeys", dirfrag_obj_name], stdout=StringIO())
|
||||
except CommandFailedError as e:
|
||||
log.error(e.__str__())
|
||||
raise ObjectNotFound(dirfrag_obj_name)
|
||||
|
||||
return key_list_str.split("\n") if key_list_str else []
|
||||
return key_list_str.strip().split("\n") if key_list_str else []
|
||||
|
||||
def get_meta_of_fs_file(self, dir_ino, obj_name, out):
|
||||
"""
|
||||
get metadata from parent to verify the correctness of the data format encoded by the tool, cephfs-meta-injection.
|
||||
warning : The splitting of directory is not considered here.
|
||||
"""
|
||||
|
||||
dirfrag_obj_name = "{0:x}.00000000".format(dir_ino)
|
||||
try:
|
||||
self.rados(["getomapval", dirfrag_obj_name, obj_name+"_head", out])
|
||||
self.radosm(["getomapval", dirfrag_obj_name, obj_name+"_head", out])
|
||||
except CommandFailedError as e:
|
||||
log.error(e.__str__())
|
||||
raise ObjectNotFound(dir_ino)
|
||||
@ -1432,10 +1383,10 @@ class Filesystem(MDSCluster):
|
||||
This O(N) with the number of objects in the pool, so only suitable
|
||||
for use on toy test filesystems.
|
||||
"""
|
||||
all_objects = self.rados(["ls"]).split("\n")
|
||||
all_objects = self.radosmo(["ls"], stdout=StringIO()).strip().split("\n")
|
||||
matching_objects = [o for o in all_objects if o.startswith(prefix)]
|
||||
for o in matching_objects:
|
||||
self.rados(["rm", o])
|
||||
self.radosm(["rm", o])
|
||||
|
||||
def erase_mds_objects(self, rank):
|
||||
"""
|
||||
@ -1496,8 +1447,7 @@ class Filesystem(MDSCluster):
|
||||
it'll definitely have keys with perms to access cephfs metadata pool. This is public
|
||||
so that tests can use this remote to go get locally written output files from the tools.
|
||||
"""
|
||||
mds_id = self.mds_ids[0]
|
||||
return self.mds_daemons[mds_id].remote
|
||||
return self.mon_manager.controller
|
||||
|
||||
def journal_tool(self, args, rank, quiet=False):
|
||||
"""
|
||||
|
@ -40,7 +40,7 @@ class TestMDSAutoRepair(CephFSTestCase):
|
||||
self.fs.wait_for_daemons()
|
||||
|
||||
# remove testdir1's backtrace
|
||||
self.fs.rados(["rmxattr", dir_objname, "parent"])
|
||||
self.fs.radosm(["rmxattr", dir_objname, "parent"])
|
||||
|
||||
# readdir (fetch dirfrag) should fix testdir1's backtrace
|
||||
self.mount_a.mount_wait()
|
||||
@ -50,7 +50,7 @@ class TestMDSAutoRepair(CephFSTestCase):
|
||||
self.fs.mds_asok(['flush', 'journal'])
|
||||
|
||||
# check if backtrace exists
|
||||
self.fs.rados(["getxattr", dir_objname, "parent"])
|
||||
self.fs.radosm(["getxattr", dir_objname, "parent"])
|
||||
|
||||
def test_mds_readonly(self):
|
||||
"""
|
||||
|
@ -1,3 +1,4 @@
|
||||
from io import BytesIO, StringIO
|
||||
import json
|
||||
import logging
|
||||
import errno
|
||||
@ -53,7 +54,7 @@ class TestDamage(CephFSTestCase):
|
||||
|
||||
self.fs.fail()
|
||||
|
||||
self.fs.rados(['export', '/tmp/metadata.bin'])
|
||||
serialized = self.fs.radosmo(['export', '-'])
|
||||
|
||||
def is_ignored(obj_id, dentry=None):
|
||||
"""
|
||||
@ -83,13 +84,13 @@ class TestDamage(CephFSTestCase):
|
||||
# None means ls will do an "ls -R" in hope of seeing some errors
|
||||
return None
|
||||
|
||||
objects = self.fs.rados(["ls"]).split("\n")
|
||||
objects = self.fs.radosmo(["ls"], stdout=StringIO()).strip().split("\n")
|
||||
objects = [o for o in objects if not is_ignored(o)]
|
||||
|
||||
# Find all objects with an OMAP header
|
||||
omap_header_objs = []
|
||||
for o in objects:
|
||||
header = self.fs.rados(["getomapheader", o])
|
||||
header = self.fs.radosmo(["getomapheader", o], stdout=StringIO())
|
||||
# The rados CLI wraps the header output in a hex-printed style
|
||||
header_bytes = int(re.match("header \((.+) bytes\)", header).group(1))
|
||||
if header_bytes > 0:
|
||||
@ -98,16 +99,16 @@ class TestDamage(CephFSTestCase):
|
||||
# Find all OMAP key/vals
|
||||
omap_keys = []
|
||||
for o in objects:
|
||||
keys_str = self.fs.rados(["listomapkeys", o])
|
||||
keys_str = self.fs.radosmo(["listomapkeys", o], stdout=StringIO())
|
||||
if keys_str:
|
||||
for key in keys_str.split("\n"):
|
||||
for key in keys_str.strip().split("\n"):
|
||||
if not is_ignored(o, key):
|
||||
omap_keys.append((o, key))
|
||||
|
||||
# Find objects that have data in their bodies
|
||||
data_objects = []
|
||||
for obj_id in objects:
|
||||
stat_out = self.fs.rados(["stat", obj_id])
|
||||
stat_out = self.fs.radosmo(["stat", obj_id], stdout=StringIO())
|
||||
size = int(re.match(".+, size (.+)$", stat_out).group(1))
|
||||
if size > 0:
|
||||
data_objects.append(obj_id)
|
||||
@ -156,7 +157,7 @@ class TestDamage(CephFSTestCase):
|
||||
mutations.append(MetadataMutation(
|
||||
o,
|
||||
"Delete {0}".format(o),
|
||||
lambda o=o: self.fs.rados(["rm", o]),
|
||||
lambda o=o: self.fs.radosm(["rm", o]),
|
||||
expectation
|
||||
))
|
||||
|
||||
@ -167,14 +168,14 @@ class TestDamage(CephFSTestCase):
|
||||
mutations.append(MetadataMutation(
|
||||
obj_id,
|
||||
"Corrupt {0}".format(obj_id),
|
||||
lambda o=obj_id: self.fs.rados(["put", o, "-"], stdin_data=junk),
|
||||
lambda o=obj_id: self.fs.radosm(["put", o, "-"], stdin=StringIO(junk)),
|
||||
READONLY
|
||||
))
|
||||
else:
|
||||
mutations.append(MetadataMutation(
|
||||
obj_id,
|
||||
"Corrupt {0}".format(obj_id),
|
||||
lambda o=obj_id: self.fs.rados(["put", o, "-"], stdin_data=junk),
|
||||
lambda o=obj_id: self.fs.radosm(["put", o, "-"], stdin=StringIO(junk)),
|
||||
DAMAGED_ON_START
|
||||
))
|
||||
|
||||
@ -191,7 +192,7 @@ class TestDamage(CephFSTestCase):
|
||||
MetadataMutation(
|
||||
o,
|
||||
"Truncate {0}".format(o),
|
||||
lambda o=o: self.fs.rados(["truncate", o, "0"]),
|
||||
lambda o=o: self.fs.radosm(["truncate", o, "0"]),
|
||||
expectation
|
||||
))
|
||||
|
||||
@ -207,7 +208,7 @@ class TestDamage(CephFSTestCase):
|
||||
MetadataMutation(
|
||||
o,
|
||||
"Corrupt omap key {0}:{1}".format(o, k),
|
||||
lambda o=o,k=k: self.fs.rados(["setomapval", o, k, junk]),
|
||||
lambda o=o,k=k: self.fs.radosm(["setomapval", o, k, junk]),
|
||||
expectation,
|
||||
get_path(o, k)
|
||||
)
|
||||
@ -229,7 +230,7 @@ class TestDamage(CephFSTestCase):
|
||||
MetadataMutation(
|
||||
o,
|
||||
"Corrupt omap header on {0}".format(o),
|
||||
lambda o=o: self.fs.rados(["setomapheader", o, junk]),
|
||||
lambda o=o: self.fs.radosm(["setomapheader", o, junk]),
|
||||
expectation
|
||||
)
|
||||
)
|
||||
@ -245,7 +246,7 @@ class TestDamage(CephFSTestCase):
|
||||
self.fs.mon_manager.raw_cluster_cmd('mds', 'repaired', '0')
|
||||
|
||||
# Reset RADOS pool state
|
||||
self.fs.rados(['import', '/tmp/metadata.bin'])
|
||||
self.fs.radosm(['import', '-'], stdin=BytesIO(serialized))
|
||||
|
||||
# Inject the mutation
|
||||
mutation.mutate_fn()
|
||||
@ -391,7 +392,7 @@ class TestDamage(CephFSTestCase):
|
||||
# Corrupt a dentry
|
||||
junk = "deadbeef" * 10
|
||||
dirfrag_obj = "{0:x}.00000000".format(subdir_ino)
|
||||
self.fs.rados(["setomapval", dirfrag_obj, "file_to_be_damaged_head", junk])
|
||||
self.fs.radosm(["setomapval", dirfrag_obj, "file_to_be_damaged_head", junk])
|
||||
|
||||
# Start up and try to list it
|
||||
self.fs.set_joinable()
|
||||
@ -461,7 +462,7 @@ class TestDamage(CephFSTestCase):
|
||||
self.assertEqual(nfiles, "1")
|
||||
|
||||
# Clean up the omap object
|
||||
self.fs.rados(["setomapval", dirfrag_obj, "file_to_be_damaged_head", junk])
|
||||
self.fs.radosm(["setomapval", dirfrag_obj, "file_to_be_damaged_head", junk])
|
||||
|
||||
# Clean up the damagetable entry
|
||||
self.fs.mon_manager.raw_cluster_cmd(
|
||||
@ -532,7 +533,7 @@ class TestDamage(CephFSTestCase):
|
||||
|
||||
# Case 2: missing dirfrag for the target inode
|
||||
|
||||
self.fs.rados(["rm", "{0:x}.00000000".format(dir2_ino)])
|
||||
self.fs.radosm(["rm", "{0:x}.00000000".format(dir2_ino)])
|
||||
|
||||
# Check that touching the hardlink gives EIO
|
||||
ran = self.mount_a.run_shell(["stat", "testdir/hardlink2"], wait=False)
|
||||
|
@ -9,7 +9,7 @@ import os
|
||||
import time
|
||||
import traceback
|
||||
|
||||
from io import BytesIO
|
||||
from io import BytesIO, StringIO
|
||||
from collections import namedtuple, defaultdict
|
||||
from textwrap import dedent
|
||||
|
||||
@ -62,9 +62,8 @@ class Workload(object):
|
||||
default just wipe everything in the metadata pool
|
||||
"""
|
||||
# Delete every object in the metadata pool
|
||||
objects = self._filesystem.rados(["ls"]).split("\n")
|
||||
for o in objects:
|
||||
self._filesystem.rados(["rm", o])
|
||||
pool = self._filesystem.get_metadata_pool_name()
|
||||
self._filesystem.rados(["purge", pool, '--yes-i-really-really-mean-it'])
|
||||
|
||||
def flush(self):
|
||||
"""
|
||||
@ -342,7 +341,7 @@ class TestDataScan(CephFSTestCase):
|
||||
# only understands how to rebuild metadata under rank 0
|
||||
self.fs.reset()
|
||||
|
||||
self.fs.set_joinable()
|
||||
self.fs.set_joinable() # redundant with reset
|
||||
|
||||
def get_state(mds_id):
|
||||
info = self.mds_cluster.get_mds_info(mds_id)
|
||||
@ -414,9 +413,9 @@ class TestDataScan(CephFSTestCase):
|
||||
self._rebuild_metadata(StripedStashedLayout(self.fs, self.mount_a))
|
||||
|
||||
def _dirfrag_keys(self, object_id):
|
||||
keys_str = self.fs.rados(["listomapkeys", object_id])
|
||||
keys_str = self.fs.radosmo(["listomapkeys", object_id], stdout=StringIO())
|
||||
if keys_str:
|
||||
return keys_str.split("\n")
|
||||
return keys_str.strip().split("\n")
|
||||
else:
|
||||
return []
|
||||
|
||||
@ -466,7 +465,7 @@ class TestDataScan(CephFSTestCase):
|
||||
victim_key = keys[7] # arbitrary choice
|
||||
log.info("victim_key={0}".format(victim_key))
|
||||
victim_dentry = victim_key.split("_head")[0]
|
||||
self.fs.rados(["rmomapkey", frag_obj_id, victim_key])
|
||||
self.fs.radosm(["rmomapkey", frag_obj_id, victim_key])
|
||||
|
||||
# Start filesystem back up, observe that the file appears to be gone in an `ls`
|
||||
self.fs.set_joinable()
|
||||
@ -575,15 +574,14 @@ class TestDataScan(CephFSTestCase):
|
||||
# introduce duplicated primary link
|
||||
file1_key = "file1_head"
|
||||
self.assertIn(file1_key, dirfrag1_keys)
|
||||
file1_omap_data = self.fs.rados(["getomapval", dirfrag1_oid, file1_key, '-'],
|
||||
stdout_data=BytesIO())
|
||||
self.fs.rados(["setomapval", dirfrag2_oid, file1_key], stdin_data=file1_omap_data)
|
||||
file1_omap_data = self.fs.radosmo(["getomapval", dirfrag1_oid, file1_key, '-'])
|
||||
self.fs.radosm(["setomapval", dirfrag2_oid, file1_key], stdin=BytesIO(file1_omap_data))
|
||||
self.assertIn(file1_key, self._dirfrag_keys(dirfrag2_oid))
|
||||
|
||||
# remove a remote link, make inode link count incorrect
|
||||
link1_key = 'link1_head'
|
||||
self.assertIn(link1_key, dirfrag1_keys)
|
||||
self.fs.rados(["rmomapkey", dirfrag1_oid, link1_key])
|
||||
self.fs.radosm(["rmomapkey", dirfrag1_oid, link1_key])
|
||||
|
||||
# increase good primary link's version
|
||||
self.mount_a.run_shell(["touch", "testdir1/file1"])
|
||||
@ -639,8 +637,8 @@ class TestDataScan(CephFSTestCase):
|
||||
self.fs.mds_asok(["flush", "journal"], mds1_id)
|
||||
self.fs.fail()
|
||||
|
||||
self.fs.rados(["rm", "mds0_inotable"])
|
||||
self.fs.rados(["rm", "mds1_inotable"])
|
||||
self.fs.radosm(["rm", "mds0_inotable"])
|
||||
self.fs.radosm(["rm", "mds1_inotable"])
|
||||
|
||||
self.fs.data_scan(["scan_links", "--filesystem", self.fs.name])
|
||||
|
||||
@ -676,7 +674,7 @@ class TestDataScan(CephFSTestCase):
|
||||
for item in old_snaptable['snapserver']['snaps']:
|
||||
del item['stamp']
|
||||
|
||||
self.fs.rados(["rm", "mds_snaptable"])
|
||||
self.fs.radosm(["rm", "mds_snaptable"])
|
||||
self.fs.data_scan(["scan_links", "--filesystem", self.fs.name])
|
||||
|
||||
new_snaptable = json.loads(self.fs.table_tool([self.fs.name + ":0", "show", "snap"]))
|
||||
|
@ -32,8 +32,8 @@ class TestForwardScrub(CephFSTestCase):
|
||||
"""
|
||||
Read a ceph-encoded string from a rados xattr
|
||||
"""
|
||||
output = self.fs.rados(["getxattr", obj, attr], pool=pool,
|
||||
stdout_data=BytesIO())
|
||||
output = self.fs.mon_manager.do_rados(["getxattr", obj, attr], pool=pool,
|
||||
stdout=BytesIO()).stdout.getvalue()
|
||||
strlen = struct.unpack('i', output[0:4])[0]
|
||||
return output[4:(4 + strlen)].decode(encoding='ascii')
|
||||
|
||||
@ -150,7 +150,7 @@ class TestForwardScrub(CephFSTestCase):
|
||||
self.fs.set_ceph_conf('mds', 'mds verify scatter', False)
|
||||
self.fs.set_ceph_conf('mds', 'mds debug scatterstat', False)
|
||||
frag_obj_id = "{0:x}.00000000".format(inos["./parent/flushed"])
|
||||
self.fs.rados(["rmomapkey", frag_obj_id, "bravo_head"])
|
||||
self.fs.radosm(["rmomapkey", frag_obj_id, "bravo_head"])
|
||||
|
||||
self.fs.set_joinable()
|
||||
self.fs.wait_for_daemons()
|
||||
@ -205,7 +205,7 @@ class TestForwardScrub(CephFSTestCase):
|
||||
print("Trying to fetch inotable object: " + inotable_oid)
|
||||
|
||||
#self.fs.get_metadata_object("InoTable", "mds0_inotable")
|
||||
inotable_raw = self.fs.get_metadata_object_raw(inotable_oid)
|
||||
inotable_raw = self.fs.radosmo(['get', inotable_oid, '-'])
|
||||
inotable_dict[inotable_oid] = inotable_raw
|
||||
return inotable_dict
|
||||
|
||||
@ -250,7 +250,7 @@ class TestForwardScrub(CephFSTestCase):
|
||||
|
||||
# Revert to old inotable.
|
||||
for key, value in inotable_copy.items():
|
||||
self.fs.put_metadata_object_raw(key, value)
|
||||
self.fs.radosm(["put", key, "-"], stdin=BytesIO(value))
|
||||
|
||||
self.fs.set_joinable()
|
||||
self.fs.wait_for_daemons()
|
||||
|
@ -1,4 +1,4 @@
|
||||
|
||||
from io import StringIO
|
||||
|
||||
from tasks.cephfs.cephfs_test_case import CephFSTestCase
|
||||
from teuthology.orchestra import run
|
||||
@ -223,9 +223,9 @@ class TestFragmentation(CephFSTestCase):
|
||||
)
|
||||
# Check that the metadata pool objects for all the myriad
|
||||
# child fragments are gone
|
||||
metadata_objs = self.fs.rados(["ls"])
|
||||
metadata_objs = self.fs.radosmo(["ls"], stdout=StringIO()).strip()
|
||||
frag_objs = []
|
||||
for o in metadata_objs:
|
||||
for o in metadata_objs.split("\n"):
|
||||
if o.startswith("{0:x}.".format(dir_inode_no)):
|
||||
frag_objs.append(o)
|
||||
self.assertListEqual(frag_objs, [])
|
||||
|
@ -1,3 +1,5 @@
|
||||
from io import StringIO
|
||||
|
||||
from tasks.cephfs.cephfs_test_case import CephFSTestCase
|
||||
import json
|
||||
import logging
|
||||
@ -21,7 +23,7 @@ class TestMantle(CephFSTestCase):
|
||||
|
||||
def push_balancer(self, obj, lua_code, expect):
|
||||
self.fs.mon_manager.raw_cluster_cmd_result('fs', 'set', self.fs.name, 'balancer', obj)
|
||||
self.fs.rados(["put", obj, "-"], stdin_data=lua_code)
|
||||
self.fs.radosm(["put", obj, "-"], stdin=StringIO(lua_code))
|
||||
with self.assert_cluster_log(failure + obj + " " + expect):
|
||||
log.info("run a " + obj + " balancer that expects=" + expect)
|
||||
|
||||
@ -58,7 +60,7 @@ class TestMantle(CephFSTestCase):
|
||||
self.start_mantle()
|
||||
lua_code = "BAL_LOG(0, \"test\")\nreturn {3, 4}"
|
||||
self.fs.mon_manager.raw_cluster_cmd_result('fs', 'set', self.fs.name, 'balancer', "valid.lua")
|
||||
self.fs.rados(["put", "valid.lua", "-"], stdin_data=lua_code)
|
||||
self.fs.radosm(["put", "valid.lua", "-"], stdin=StringIO(lua_code))
|
||||
with self.assert_cluster_log(success + "valid.lua"):
|
||||
log.info("run a valid.lua balancer")
|
||||
|
||||
|
@ -1,3 +1,4 @@
|
||||
from io import StringIO
|
||||
|
||||
from tasks.cephfs.fuse_mount import FuseMount
|
||||
from tasks.cephfs.cephfs_test_case import CephFSTestCase
|
||||
@ -83,9 +84,8 @@ class TestMisc(CephFSTestCase):
|
||||
self.fs.metadata_pool_name,
|
||||
self.fs.pgs_per_fs_pool.__str__())
|
||||
|
||||
dummyfile = '/etc/fstab'
|
||||
|
||||
self.fs.put_metadata_object_raw("key", dummyfile)
|
||||
# insert a garbage object
|
||||
self.fs.radosm(["put", "foo", "-"], stdin=StringIO("bar"))
|
||||
|
||||
def get_pool_df(fs, name):
|
||||
try:
|
||||
|
@ -84,7 +84,7 @@ class TestScrub2(CephFSTestCase):
|
||||
|
||||
def assertTagged(ino):
|
||||
file_obj_name = "{0:x}.00000000".format(ino)
|
||||
self.fs.rados(["getxattr", file_obj_name, "scrub_tag"])
|
||||
self.fs.radosm(["getxattr", file_obj_name, "scrub_tag"])
|
||||
|
||||
for ino in inos:
|
||||
assertTagged(ino)
|
||||
@ -95,7 +95,7 @@ class TestScrub2(CephFSTestCase):
|
||||
|
||||
for ino in inos:
|
||||
file_obj_name = "{0:x}.00000000".format(ino)
|
||||
self.fs.rados(["rmxattr", file_obj_name, "parent"])
|
||||
self.fs.radosm(["rmxattr", file_obj_name, "parent"])
|
||||
|
||||
out_json = self.fs.run_scrub(["start", "/d1/d2/d3", "recursive", "force"], 0)
|
||||
self.assertNotEqual(out_json, None)
|
||||
@ -138,7 +138,7 @@ class TestScrub2(CephFSTestCase):
|
||||
|
||||
for ino in inos:
|
||||
file_obj_name = "{0:x}.00000000".format(ino)
|
||||
self.fs.rados(["rmxattr", file_obj_name, "parent"])
|
||||
self.fs.radosm(["rmxattr", file_obj_name, "parent"])
|
||||
|
||||
out_json = self.fs.run_scrub(["start", "/d1/d2/d3", "recursive", "force"], 0)
|
||||
self.assertNotEqual(out_json, None)
|
||||
@ -161,7 +161,7 @@ class TestScrub2(CephFSTestCase):
|
||||
|
||||
for ino in inos:
|
||||
file_obj_name = "{0:x}.00000000".format(ino)
|
||||
self.fs.rados(["rmxattr", file_obj_name, "parent"])
|
||||
self.fs.radosm(["rmxattr", file_obj_name, "parent"])
|
||||
|
||||
out_json = self.fs.run_scrub(["start", "/d1/d2/d3", "recursive", "force"], 0)
|
||||
self.assertNotEqual(out_json, None)
|
||||
@ -190,7 +190,7 @@ class TestScrub2(CephFSTestCase):
|
||||
|
||||
for ino in inos:
|
||||
file_obj_name = "{0:x}.00000000".format(ino)
|
||||
self.fs.rados(["rmxattr", file_obj_name, "parent"])
|
||||
self.fs.radosm(["rmxattr", file_obj_name, "parent"])
|
||||
|
||||
out_json = self.fs.run_scrub(["start", "/d1/d2/d3", "recursive", "force"], 0)
|
||||
self.assertNotEqual(out_json, None)
|
||||
|
@ -42,7 +42,7 @@ class OpenFileTable(CephFSTestCase):
|
||||
mds0_openfiles.1 to hold the extra keys.
|
||||
"""
|
||||
|
||||
self.fs.rados(["stat", "mds0_openfiles.1"])
|
||||
self.fs.radosm(["stat", "mds0_openfiles.1"])
|
||||
|
||||
# Now close the file
|
||||
self.mount_a.kill_background(p)
|
||||
|
@ -56,10 +56,9 @@ class OverlayWorkload(object):
|
||||
Damage the filesystem pools in ways that will be interesting to recover from. By
|
||||
default just wipe everything in the metadata pool
|
||||
"""
|
||||
# Delete every object in the metadata pool
|
||||
objects = self._orig_fs.rados(["ls"]).split("\n")
|
||||
for o in objects:
|
||||
self._orig_fs.rados(["rm", o])
|
||||
|
||||
pool = self._orig_fs.get_metadata_pool_name()
|
||||
self._orig_fs.rados(["purge", pool, '--yes-i-really-really-mean-it'])
|
||||
|
||||
def flush(self):
|
||||
"""
|
||||
|
@ -1,6 +1,8 @@
|
||||
"""
|
||||
Test CephFS scrub (distinct from OSD scrub) functionality
|
||||
"""
|
||||
|
||||
from io import BytesIO
|
||||
import logging
|
||||
from collections import namedtuple
|
||||
|
||||
@ -44,9 +46,8 @@ class Workload(CephFSTestCase):
|
||||
default just wipe everything in the metadata pool
|
||||
"""
|
||||
# Delete every object in the metadata pool
|
||||
objects = self._filesystem.rados(["ls"]).split("\n")
|
||||
for o in objects:
|
||||
self._filesystem.rados(["rm", o])
|
||||
pool = self._filesystem.get_metadata_pool_name()
|
||||
self._filesystem.rados(["purge", pool, '--yes-i-really-really-mean-it'])
|
||||
|
||||
def flush(self):
|
||||
"""
|
||||
@ -92,14 +93,11 @@ class DupInodeWorkload(Workload):
|
||||
self._mount.write_n_mb("parent/child/childfile", 6)
|
||||
|
||||
def damage(self):
|
||||
temp_bin_path = "/tmp/10000000000.00000000_omap.bin"
|
||||
self._mount.umount_wait()
|
||||
self._filesystem.mds_asok(["flush", "journal"])
|
||||
self._filesystem.fail()
|
||||
self._filesystem.rados(["getomapval", "10000000000.00000000",
|
||||
"parentfile_head", temp_bin_path])
|
||||
self._filesystem.rados(["setomapval", "10000000000.00000000",
|
||||
"shadow_head"], stdin_file=temp_bin_path)
|
||||
d = self._filesystem.radosmo(["getomapval", "10000000000.00000000", "parentfile_head", "-"])
|
||||
self._filesystem.radosm(["setomapval", "10000000000.00000000", "shadow_head"], stdin=BytesIO(d))
|
||||
self._test.config_set('mds', 'mds_hack_allow_loading_invalid_metadata', True)
|
||||
self._filesystem.set_joinable()
|
||||
self._filesystem.wait_for_daemons()
|
||||
|
@ -314,8 +314,7 @@ class TestScrubChecks(CephFSTestCase):
|
||||
self.fs.mds_stop()
|
||||
|
||||
# remove the dentry from dirfrag, cause incorrect fragstat/rstat
|
||||
self.fs.rados(["rmomapkey", dir_objname, "file_head"],
|
||||
pool=self.fs.get_metadata_pool_name())
|
||||
self.fs.radosm(["rmomapkey", dir_objname, "file_head"])
|
||||
|
||||
self.fs.mds_fail_restart()
|
||||
self.fs.wait_for_daemons()
|
||||
|
@ -913,7 +913,7 @@ ln dir_1/original dir_2/linkto
|
||||
|
||||
self.mds_cluster.mds_stop()
|
||||
self.mds_cluster.mds_fail()
|
||||
self.fs.rados(["rm", "500.00000000"])
|
||||
self.fs.radosm(["rm", "500.00000000"])
|
||||
self.mds_cluster.mds_restart()
|
||||
self.fs.wait_for_daemons()
|
||||
|
||||
|
@ -836,7 +836,7 @@ class LocalCephManager(CephManager):
|
||||
|
||||
class LocalCephCluster(CephCluster):
|
||||
def __init__(self, ctx):
|
||||
# Deliberately skip calling parent constructor
|
||||
# Deliberately skip calling CephCluster constructor
|
||||
self._ctx = ctx
|
||||
self.mon_manager = LocalCephManager()
|
||||
self._conf = defaultdict(dict)
|
||||
@ -907,9 +907,19 @@ class LocalCephCluster(CephCluster):
|
||||
|
||||
class LocalMDSCluster(LocalCephCluster, MDSCluster):
|
||||
def __init__(self, ctx):
|
||||
super(LocalMDSCluster, self).__init__(ctx)
|
||||
self.mds_ids = ctx.daemons.daemons['ceph.mds'].keys()
|
||||
self.mds_daemons = dict([(id_, LocalDaemon("mds", id_)) for id_ in self.mds_ids])
|
||||
LocalCephCluster.__init__(self, ctx)
|
||||
# Deliberately skip calling MDSCluster constructor
|
||||
self._mds_ids = ctx.daemons.daemons['ceph.mds'].keys()
|
||||
log.debug("Discovered MDS IDs: {0}".format(self._mds_ids))
|
||||
self._mds_daemons = dict([(id_, LocalDaemon("mds", id_)) for id_ in self.mds_ids])
|
||||
|
||||
@property
|
||||
def mds_ids(self):
|
||||
return self._mds_ids
|
||||
|
||||
@property
|
||||
def mds_daemons(self):
|
||||
return self._mds_daemons
|
||||
|
||||
def clear_firewall(self):
|
||||
# FIXME: unimplemented
|
||||
@ -934,10 +944,10 @@ class LocalMgrCluster(LocalCephCluster, MgrCluster):
|
||||
self.mgr_daemons = dict([(id_, LocalDaemon("mgr", id_)) for id_ in self.mgr_ids])
|
||||
|
||||
|
||||
class LocalFilesystem(Filesystem, LocalMDSCluster):
|
||||
class LocalFilesystem(LocalMDSCluster, Filesystem):
|
||||
def __init__(self, ctx, fs_config={}, fscid=None, name=None, create=False):
|
||||
# Deliberately skip calling parent constructor
|
||||
self._ctx = ctx
|
||||
# Deliberately skip calling Filesystem constructor
|
||||
LocalMDSCluster.__init__(self, ctx)
|
||||
|
||||
self.id = None
|
||||
self.name = name
|
||||
@ -948,24 +958,8 @@ class LocalFilesystem(Filesystem, LocalMDSCluster):
|
||||
self.fs_config = fs_config
|
||||
self.ec_profile = fs_config.get('ec_profile')
|
||||
|
||||
# Hack: cheeky inspection of ceph.conf to see what MDSs exist
|
||||
self.mds_ids = set()
|
||||
for line in open("ceph.conf").readlines():
|
||||
match = re.match("^\[mds\.(.+)\]$", line)
|
||||
if match:
|
||||
self.mds_ids.add(match.group(1))
|
||||
|
||||
if not self.mds_ids:
|
||||
raise RuntimeError("No MDSs found in ceph.conf!")
|
||||
|
||||
self.mds_ids = list(self.mds_ids)
|
||||
|
||||
log.debug("Discovered MDS IDs: {0}".format(self.mds_ids))
|
||||
|
||||
self.mon_manager = LocalCephManager()
|
||||
|
||||
self.mds_daemons = dict([(id_, LocalDaemon("mds", id_)) for id_ in self.mds_ids])
|
||||
|
||||
self.client_remote = LocalRemote()
|
||||
|
||||
self._conf = defaultdict(dict)
|
||||
|
Loading…
Reference in New Issue
Block a user