mirror of
https://github.com/ceph/ceph
synced 2025-01-02 00:52:22 +00:00
Merge pull request #56743 from NitzanMordhai/wip-nitzan-backword-forword-dencoder-tests
suites: adding dencoder test multi versions
This commit is contained in:
commit
48db64c217
0
qa/suites/rados/encoder/%
Normal file
0
qa/suites/rados/encoder/%
Normal file
1
qa/suites/rados/encoder/.qa
Symbolic link
1
qa/suites/rados/encoder/.qa
Symbolic link
@ -0,0 +1 @@
|
||||
../.qa/
|
9
qa/suites/rados/encoder/0-start.yaml
Normal file
9
qa/suites/rados/encoder/0-start.yaml
Normal file
@ -0,0 +1,9 @@
|
||||
# Cluster layout for the dencoder suite: a minimal single-node cluster
# running one monitor, one manager, one OSD and one client.
roles:
- - mon.a
  - mgr.x
  - osd.0
  - client.0
openstack:
- volumes: # attached to each instance
    count: 4
    size: 10 # GB
|
57
qa/suites/rados/encoder/1-tasks.yaml
Normal file
57
qa/suites/rados/encoder/1-tasks.yaml
Normal file
@ -0,0 +1,57 @@
|
||||
# Multi-version dencoder test: install the N-2 (quincy), N-1 (reef) and
# N (squid) releases in turn, and run the dencoder workunit after each
# install to verify encode/decode compatibility across versions.
tasks:
- print: "**** install version -2 (quincy) ****"
- install:
    branch: quincy
    exclude_packages:
      - ceph-volume
- print: "**** done install task..."

- print: "**** start installing quincy cephadm ..."
- cephadm:
    image: quay.ceph.io/ceph-ci/ceph:quincy
    compiled_cephadm_branch: quincy
    conf:
      osd:
        #set config option for which cls modules are allowed to be loaded / used
        osd_class_load_list: "*"
        osd_class_default_list: "*"
- print: "**** done end installing quincy cephadm ..."

- print: "**** done start cephadm.shell ceph config set mgr..."
- cephadm.shell:
    mon.a:
      - ceph config set mgr mgr/cephadm/use_repo_digest true --force
- print: "**** done cephadm.shell ceph config set mgr..."

- print: "**** start dencoder quincy... ****"
- workunit:
    clients:
      client.0:
        - dencoder/test-dencoder.sh
- print: "**** done end dencoder quincy... ****"

- print: "**** installing N-1 version (reef) ****"
- install:
    branch: reef
    exclude_packages:
      - ceph-volume
- print: "**** done end installing task..."

- print: "**** start dencoder reef... ****"
- workunit:
    clients:
      client.0:
        - dencoder/test-dencoder.sh
- print: "**** done end dencoder reef... ****"
- print: "**** installing N version (squid) ****"
- install:
    branch: squid
    exclude_packages:
      - ceph-volume
- print: "**** done end installing task..."
- print: "**** start dencoder squid... ****"
- workunit:
    clients:
      client.0:
        - dencoder/test-dencoder.sh
- print: "**** done end dencoder squid... ****"
|
1
qa/suites/rados/encoder/supported-random-distro$
Symbolic link
1
qa/suites/rados/encoder/supported-random-distro$
Symbolic link
@ -0,0 +1 @@
|
||||
.qa/distros/supported-random-distro$
|
94
qa/tasks/dencoder.py
Normal file
94
qa/tasks/dencoder.py
Normal file
@ -0,0 +1,94 @@
|
||||
import logging
|
||||
|
||||
|
||||
from teuthology import misc
|
||||
from teuthology.orchestra import run
|
||||
from teuthology.task import Task
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
class DENcoder(Task):
    """
    Teuthology task that exercises ceph-dencoder against the
    ceph-object-corpus on a remote host.

    The task clones the ceph repository (branch ``branch_N``) and its
    submodules (which include ceph-object-corpus) on the first monitor's
    host, then runs ``src/test/encoding/readable.sh`` there.
    """

    def __init__(self, ctx, config):
        super(DENcoder, self).__init__(ctx, config)
        self.ctx = ctx
        self.config = config
        # Per-run scratch directory on the remote host.
        self.testdir = misc.get_testdir(ctx)
        # Branch whose tree (and readable.sh) is cloned in setup().
        self.branch_N = config.get('branch_N', 'main')
        # NOTE(review): this config key uses a hyphen ('branch_N-2') while
        # 'branch_N' uses an underscore -- confirm the intended key spelling.
        self.branch_N_2 = config.get('branch_N-2', 'quincy')
        self.log = log
        self.log.info('Starting DENcoder task...')

    def setup(self):
        """
        cloning the ceph repository on the remote host
        and submodules including the ceph-object-corpus
        that way we will have the readable.sh script available
        """
        super(DENcoder, self).setup()
        # Remote of the first monitor; all commands below run there.
        self.first_mon = next(iter(self.ctx.cluster.only(misc.get_first_mon(self.ctx, self.config)).remotes.keys()))
        self.first_mon.run(
            args=[
                'git', 'clone', '-b', self.branch_N,
                'https://github.com/ceph/ceph.git',
                '{tdir}/ceph'.format(tdir=self.testdir)
            ]
        )
        self.ceph_dir = '{tdir}/ceph'.format(tdir=self.testdir)

        # Pull in submodules -- ceph-object-corpus among them.
        self.first_mon.run(
            args=[
                'cd', '{tdir}/ceph'.format(tdir=self.testdir),
                run.Raw('&&'),
                'git', 'submodule', 'update', '--init', '--recursive'
            ]
        )
        self.corpus_dir = '{ceph_dir}/ceph-object-corpus'.format(ceph_dir=self.ceph_dir)

    def begin(self):
        """
        Run the dencoder readable.sh script on the remote host
        find any errors in the output
        """
        super(DENcoder, self).begin()
        self.log.info('Running DENcoder task...')
        self.log.info('Running DENcoder on the remote host...')
        # print ceph-dencoder version
        self.first_mon.run(
            args=[
                'cd', self.ceph_dir,
                run.Raw('&&'),
                'ceph-dencoder', 'version'
            ]
        )
        # run first check for type ceph-dencoder type MonMap
        self.first_mon.run(
            args=[
                'ceph-dencoder', 'type', 'MonMap'
            ]
        )

        # run the readable.sh script
        # NOTE(review): the script path is relative ('src/test/encoding/...')
        # but there is no 'cd' into ceph_dir in this invocation -- confirm
        # the remote's default working directory makes it resolve correctly.
        self.first_mon.run(
            args=[
                'CEPH_ROOT={ceph_dir}'.format(ceph_dir=self.ceph_dir),
                'CEPH_BUILD_DIR={ceph_dir}'.format(ceph_dir=self.ceph_dir),
                'CEPH_BIN=/usr/bin',
                'CEPH_LIB=/usr/lib',
                'src/test/encoding/readable.sh','ceph-dencoder'
            ]
        )
        # check for errors in the output

        self.log.info('DENcoder task completed...')

    def end(self):
        super(DENcoder, self).end()
        self.log.info('DENcoder task ended...')


# Module-level 'task' symbol -- the name teuthology loads for this module
# (standard teuthology task convention).
task = DENcoder
|
12
qa/workunits/dencoder/test-dencoder.sh
Executable file
12
qa/workunits/dencoder/test-dencoder.sh
Executable file
@ -0,0 +1,12 @@
|
||||
#!/usr/bin/env bash
#
# Workunit driver for the multi-version dencoder test: prints the installed
# ceph-dencoder version, fetches the master ceph-object-corpus into the
# client's scratch area, and runs test_readable.py against it.
# NOTE(review): relies on $CEPH_MNT being exported by the workunit
# environment -- confirm it is always set when this runs.
set -ex

CEPH_ARGS=""
mydir=`dirname $0`

ceph-dencoder version

# clone the corpus repository on the host
git clone -b master https://github.com/ceph/ceph-object-corpus.git $CEPH_MNT/client.0/tmp/ceph-object-corpus-master

$mydir/test_readable.py $CEPH_MNT/client.0/tmp/ceph-object-corpus-master

echo $0 OK
|
338
qa/workunits/dencoder/test_readable.py
Executable file
338
qa/workunits/dencoder/test_readable.py
Executable file
@ -0,0 +1,338 @@
|
||||
#!/usr/bin/env python3
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import subprocess
|
||||
import tempfile
|
||||
import difflib
|
||||
from typing import Dict, Any
|
||||
from pathlib import Path
|
||||
import concurrent.futures
|
||||
from collections import OrderedDict
|
||||
|
||||
# Scratch files for reporting: unrecognized types and dencoder failures.
# tempfile.mktemp() is deprecated and race-prone (another process can claim
# the returned name before we open it); mkstemp() atomically creates the
# file and hands back an open fd, which we close immediately because the
# file is only ever re-opened in append mode later.
_unrec_fd, temp_unrec = tempfile.mkstemp(prefix="unrecognized_")
os.close(_unrec_fd)
_err_fd, err_file_rc = tempfile.mkstemp(prefix="dencoder_err_")
os.close(_err_fd)

# Memoized types known to appear in no backward-compat entry.
fast_shouldnt_skip = []
# version -> {type -> [incompatible corpus object names]}
backward_compat: Dict[str, Any] = {}
# type -> [incompatible corpus object names], merged across versions
incompat_paths: Dict[str, Any] = {}
|
||||
|
||||
def sort_values(obj):
    """Recursively normalize *obj* for order-insensitive comparison.

    Dicts keep their key order (as an OrderedDict) while their values are
    normalized recursively; lists are sorted using sort_list_values as the
    key so nested dicts/lists compare consistently. Scalars pass through.
    """
    if isinstance(obj, list):
        return sorted(obj, key=sort_list_values)
    if not isinstance(obj, dict):
        return obj
    normalized = OrderedDict()
    for key, value in obj.items():
        normalized[key] = sort_values(value)
    return normalized


def sort_list_values(obj):
    """Sort-key helper: map dicts/lists to a canonical sortable form."""
    if isinstance(obj, list):
        return sorted(obj, key=sort_list_values)
    return sorted(obj.items()) if isinstance(obj, dict) else obj
|
||||
|
||||
|
||||
def process_type(file_path, type):
    """Round-trip one corpus object through ceph-dencoder and compare dumps.

    Decodes *file_path* as dencoder *type* and dumps it to JSON, then runs a
    decode -> encode -> decode -> dump round trip and compares the two dumps.
    If they differ textually and the dencoder reports the type as
    non-deterministic, the dumps are compared again after recursively sorting
    all values (sort_values).

    Returns 0 when the file passes, 1 on any failure (non-zero dencoder exit,
    differing dumps, or a decode error). Failures are appended to err_file_rc
    and textual diffs are written to a temp file for inspection.
    """
    print(f"dencoder test for {file_path}")
    cmd1 = [CEPH_DENCODER, "type", type, "import", file_path, "decode", "dump_json"]
    cmd2 = [CEPH_DENCODER, "type", type, "import", file_path, "decode", "encode", "decode", "dump_json"]

    output1 = ""
    output2 = ""
    try:
        result1 = subprocess.run(cmd1, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        output1 = result1.stdout.decode('unicode_escape')
        result2 = subprocess.run(cmd2, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        output2 = result2.stdout.decode('unicode_escape')

        if result1.returncode != 0 or result2.returncode != 0:
            debug_print(f"**** reencode of {file_path} resulted in wrong return code ****")
            print(f"Error encountered in subprocess. Command: {cmd1}")
            print(f"Return code: {result1.returncode} Command:{result1.args} Output: {result1.stdout.decode('unicode_escape')}")
            print(f"Error encountered in subprocess. Command: {cmd2}")
            print(f"Return code: {result2.returncode} Command:{result2.args} Output: {result2.stdout.decode('unicode_escape')}")

            # Record the failing type/file for the summary report.
            with open(err_file_rc, "a") as f:
                f.write(f"{type} -- {file_path}")
                f.write("\n")
            return 1

        if output1 != output2:
            # Dumps differ textually -- only acceptable when the type
            # declares itself non-deterministic.
            cmd_determ = [CEPH_DENCODER, "type", type, "is_deterministic"]
            determ_res = subprocess.run(cmd_determ, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            # Check if the command failed
            if determ_res.returncode != 0:
                error_message = determ_res.stderr.decode().strip()
                debug_print(f"Error running command: {error_message}")
                return 1

            # Retry the comparison with all values recursively sorted, so
            # ordering differences alone don't fail non-deterministic types.
            json_output1 = json.loads(output1)
            sorted_json_output1 = json.dumps(sort_values(json_output1), indent=4)
            json_output2 = json.loads(output2)
            sorted_json_output2 = json.dumps(sort_values(json_output2), indent=4)
            if sorted_json_output1 == sorted_json_output2:
                debug_print(f"non-deterministic type {type} passed the test")
                return 0

            debug_print(f"**** reencode of {file_path} resulted in a different dump ****")
            diff_output = "\n".join(difflib.ndiff(output1.splitlines(), output2.splitlines()))
            diff_file = tempfile.mktemp(prefix=f"diff_{type}_{file_path.name}_")
            with open(diff_file, "w") as f:
                f.write(diff_output)
            print(f"Different output for {file_path}:\n{diff_output}")
            return 1  # File failed the test

    except subprocess.CalledProcessError as e:
        print(f"Error encountered in subprocess. Command: {cmd1}")
        print(f"Return code: {e.returncode} Command:{e.cmd} Output: {e.output}")
        return 1

    except UnicodeDecodeError as e:
        # BUG FIX: UnicodeDecodeError has no returncode/cmd/output
        # attributes; the old handler raised AttributeError here, masking
        # the real failure. Report the decode error itself instead.
        print(f"Unicode Error encountered in subprocess. Command: {cmd1}")
        print(f"Decode error: {e}")
        return 1

    return 0  # File passed the test
|
||||
|
||||
def test_object_wrapper(type, vdir, arversion, current_ver):
    """Test every corpus file of one dencoder *type* from archive *arversion*.

    Runs process_type over each object file in vdir/objects/<type> via a
    thread pool, skipping types (or individual files) flagged as backward
    incompatible, and types this ceph-dencoder build does not recognize.

    Returns a (numtests, failed, unrecognized) tuple: counts of files tested
    and failed, plus the type name when unrecognized (empty string otherwise).
    """
    global incompat_paths
    _numtests = 0
    _failed = 0
    unrecognized = ""

    # Only test types this ceph-dencoder build knows about.
    if subprocess.call([CEPH_DENCODER, "type", type], stderr=subprocess.DEVNULL) == 0:

        # Skip the whole type only when no specific incompatible files are
        # listed for it; otherwise just filter those files out below.
        if should_skip_object(type, arversion, current_ver) and (type not in incompat_paths or len(incompat_paths[type]) == 0):
            debug_print(f"skipping object of type {type} due to backward incompatibility")
            return (_numtests, _failed, unrecognized)

        debug_print(f" {vdir}/objects/{type}")
        files = list(vdir.joinpath("objects", type).glob('*'))
        files_without_incompat = []

        # Check symbolic links
        # Drop individual corpus files marked incompatible for this type.
        if type in incompat_paths:
            incompatible_files = set(incompat_paths[type])
            files_without_incompat = [f for f in files if f.name not in incompatible_files]
        else:
            files_without_incompat = files

        # Fan the per-file dencoder round trips out across threads; each
        # process_type call returns 0 (pass) or 1 (fail).
        with concurrent.futures.ThreadPoolExecutor() as executor:
            results = [executor.submit(process_type, f, type) for f in files_without_incompat]

            for result in concurrent.futures.as_completed(results):
                _numtests += 1
                _failed += result.result()
    else:
        unrecognized = type
        debug_print("skipping unrecognized type {} return {}".format(type, (_numtests, _failed, unrecognized)))

    return (_numtests, _failed, unrecognized)
|
||||
|
||||
def should_skip_object(type, arversion, current_ver):
    """Return True when a corpus object *type* should be skipped.

    A type is skipped when some archive version other than *current_ver*,
    at or above *arversion* (string comparison of version names), lists the
    type as backward incompatible in the module-level 'backward_compat'
    mapping. Types that appear in no entry at all are memoized in the
    module-level 'fast_shouldnt_skip' list and never skipped.
    """
    global backward_compat
    global fast_shouldnt_skip

    # Memoized fast path: this type has no compat entries anywhere.
    if type in fast_shouldnt_skip:
        debug_print(f"fast Type {type} does not exist in the backward compatibility structure.")
        return False

    # First sighting: if no version mentions the type, remember that fact.
    if not any(type in entry for entry in backward_compat.values()):
        fast_shouldnt_skip.append(type)
        return False

    # Skip when any other qualifying version declares the type incompatible.
    return any(
        ver != current_ver and ver >= arversion and type in types
        for ver, types in backward_compat.items()
    )
|
||||
|
||||
def check_backward_compat():
    """
    Scan DIR/archive for forward-incompatibility markers.

    For every version directory under 'archive', looks for a
    'forward_incompat' subdirectory and records which object types (and
    which specific corpus entries under them) are marked incompatible.

    Output:
    - _backward_compat: dict
        {version_name: {type_name: [incompat_entry_name, ...]}}
        (types whose forward_incompat entry is not a directory map to []).
    - _incompat_paths_all: dict
        {type_name: [incompat_entry_name, ...]} merged across all versions;
        a later version's list overwrites an earlier one's for the same type.

    Note: relies on the global variable 'DIR', defined in the __main__ block.
    """
    _backward_compat = {}
    _incompat_paths_all = {}
    archive_dir = Path(os.path.join(DIR, 'archive'))

    if archive_dir.exists() and archive_dir.is_dir():
        for version in archive_dir.iterdir():
            if version.is_dir():
                version_name = version.name
                _backward_compat[version_name] = {}
                type_dir = archive_dir / version_name / "forward_incompat"
                if type_dir.exists() and type_dir.is_dir():
                    for type_entry in type_dir.iterdir():
                        if type_entry.is_dir():
                            type_name = type_entry.name
                            type_path = type_dir / type_name
                            # NOTE(review): type_path == type_entry here, so
                            # this exists()/is_dir() re-check always holds.
                            if type_path.exists() and type_path.is_dir():
                                # Collect every entry (dir, file or symlink)
                                # under the incompatible-type directory.
                                _incompat_paths = [incompat_entry.name for incompat_entry in type_path.iterdir() if incompat_entry.is_dir() or
                                                   incompat_entry.is_file() or
                                                   incompat_entry.is_symlink()]
                                _backward_compat[version_name][type_name] = _incompat_paths
                                _incompat_paths_all[type_name] = _incompat_paths
                                _incompat_paths = []
                        else:
                            # Non-directory marker: whole type flagged,
                            # no individual entries listed.
                            _backward_compat[version_name][type_entry.name] = []
    debug_print(f"backward_compat: {_backward_compat}")
    debug_print(f"incompat_paths: {_incompat_paths_all}")

    return _backward_compat, _incompat_paths_all
|
||||
|
||||
def process_batch(batch):
    """Run test_object_wrapper for each work item in *batch* via threads.

    Each element of *batch* is a (type, vdir, arversion, current_ver) tuple;
    returns the list of (numtests, failed, unrecognized) result tuples in
    completion order.
    """
    with concurrent.futures.ThreadPoolExecutor() as pool:
        pending = [
            pool.submit(test_object_wrapper, batch_type, vdir, arversion, current_ver)
            for batch_type, vdir, arversion, current_ver in batch
        ]
        return [done.result() for done in concurrent.futures.as_completed(pending)]
|
||||
|
||||
# Create a generator that processes batches asynchronously
def async_process_batches(task_batches):
    """Fan *task_batches* out to worker processes.

    Yields each batch's result list (as produced by process_batch) in
    completion order, not submission order.
    """
    with concurrent.futures.ProcessPoolExecutor() as pool:
        in_flight = [pool.submit(process_batch, one_batch) for one_batch in task_batches]
        for finished in concurrent.futures.as_completed(in_flight):
            yield finished.result()
|
||||
|
||||
def debug_print(msg):
    """Print *msg* with a DEBUG prefix when the global 'debug' flag is set."""
    if not debug:
        return
    print("DEBUG: {}".format(msg))
|
||||
|
||||
|
||||
def main():
    """Drive the dencoder corpus test.

    Walks DIR/archive/<version>/objects/<type>, batches the work items in
    groups of 100, fans them out via async_process_batches, and aggregates
    the (numtests, failed, unrecognized) results. Unrecognized type names
    are appended to temp_unrec.

    Returns a process exit code: 0 on success, 1 when any test failed or
    when no tests were found at all.
    """
    global backward_compat
    global incompat_paths

    failed = 0
    numtests = 0
    task_batches = []
    current_batch = []
    batch_size = 100

    backward_compat, incompat_paths = check_backward_compat()
    debug_print(f'found {len(backward_compat)} backward incompatibilities')

    for arversion_entry in sorted(DIR.joinpath("archive").iterdir(), key=lambda entry: entry.name):
        arversion = arversion_entry.name
        vdir = Path(DIR.joinpath("archive", arversion))

        if not arversion_entry.is_dir() or not vdir.joinpath("objects").is_dir():
            debug_print("skipping non-directory {}".format(arversion))
            continue

        for type_entry in vdir.joinpath("objects").iterdir():
            type = type_entry.name
            current_batch.append((type, vdir, arversion, current_ver))
            if len(current_batch) >= batch_size:
                task_batches.append(current_batch)
                current_batch = []

    # Flush the final partial batch.
    if current_batch:
        task_batches.append(current_batch)

    full_unrecognized = []
    for results in async_process_batches(task_batches):
        for result in results:
            _numtests, _failed, unrecognized = result
            debug_print("numtests: {}, failed: {}".format(_numtests, _failed))
            numtests += _numtests
            failed += _failed
            if unrecognized.strip() != '':
                full_unrecognized.append(unrecognized)

    if full_unrecognized:
        with open(temp_unrec, "a") as file_unrec:
            file_unrec.writelines(line + "\n" for line in full_unrecognized)

    if failed > 0:
        print("FAILED {}/{} tests.".format(failed, numtests))
        return 1

    if numtests == 0:
        print("FAILED: no tests found to run!")
        # BUG FIX: previously fell through, printed "Passed 0 tests." and
        # returned 0 (success) despite announcing failure; an empty or
        # missing corpus now fails the run.
        return 1

    print("Passed {} tests.".format(numtests))
    return 0
|
||||
|
||||
if __name__ == "__main__":
    # BUG FIX: the guard used to be `len(sys.argv) < 1`, which can never be
    # true (argv always contains at least the program name), so a missing
    # corpus-dir argument crashed with IndexError at sys.argv[1] instead of
    # printing the usage message.
    if len(sys.argv) < 2:
        print(f"usage: {sys.argv[0]} <corpus-dir>")
        sys.exit(1)

    # Corpus root directory (must contain the 'archive' tree).
    DIR = Path(sys.argv[1])
    CEPH_DENCODER = "ceph-dencoder"
    # Fail fast if ceph-dencoder is not runnable, and record its version so
    # should_skip_object() can exclude the current version's own entries.
    subprocess.run([CEPH_DENCODER, 'version'], check=True)
    current_ver = subprocess.check_output([CEPH_DENCODER, "version"]).decode().strip()
    debug = False
    ret = main()
    sys.exit(ret)
|
Loading…
Reference in New Issue
Block a user