Merge pull request #40748 from tchaikov/wip-selftest-ann

pybind/mgr/selftest: add selftest to mypy and cleanups

Reviewed-by: Sebastian Wagner <swagner@suse.com>
This commit is contained in:
Kefu Chai 2021-04-16 23:51:12 +08:00 committed by GitHub
commit 11fe444b83
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 200 additions and 217 deletions

View File

@ -67,6 +67,9 @@ disallow_untyped_defs = True
[mypy-rook.*]
disallow_untyped_defs = True
[mypy-selftest.*]
disallow_untyped_defs = True
# external import
[mypy-rook.rook_client.*]
disallow_untyped_defs = False

View File

@ -1,3 +1,2 @@
# flake8: noqa
from .module import Module

View File

@ -1,10 +1,19 @@
from mgr_module import MgrModule, CommandResult
import errno
from mgr_module import MgrModule, CommandResult, CLICommand, Option
import enum
import json
import random
import sys
import threading
from typing import Any, Dict, List, Optional, Tuple
# These workloads are things that can be requested to run inside the
# serve() function
class Workload(enum.Enum):
COMMAND_SPAM = 'command_spam'
THROW_EXCEPTION = 'throw_exception'
SHUTDOWN = 'shutdown'
class Module(MgrModule):
@ -18,174 +27,146 @@ class Module(MgrModule):
activities in its serve() thread.
"""
# These workloads are things that can be requested to run inside the
# serve() function
WORKLOAD_COMMAND_SPAM = "command_spam"
WORKLOAD_THROW_EXCEPTION = "throw_exception"
SHUTDOWN = "shutdown"
WORKLOADS = (WORKLOAD_COMMAND_SPAM, WORKLOAD_THROW_EXCEPTION)
# The test code in qa/ relies on these options existing -- they
# are of course not really used for anything in the module
MODULE_OPTIONS = [
{'name': 'testkey'},
{'name': 'testlkey'},
{'name': 'testnewline'},
{'name': 'roption1'},
{'name': 'roption2', 'type': 'str', 'default': 'xyz'},
{'name': 'rwoption1'},
{'name': 'rwoption2', 'type': 'int'},
{'name': 'rwoption3', 'type': 'float'},
{'name': 'rwoption4', 'type': 'str'},
{'name': 'rwoption5', 'type': 'bool'},
{'name': 'rwoption6', 'type': 'bool', 'default': True},
{'name': 'rwoption7', 'type': 'int', 'min': 1, 'max': 42},
Option(name='testkey'),
Option(name='testlkey'),
Option(name='testnewline'),
Option(name='roption1'),
Option(name='roption2',
type='str',
default='xyz'),
Option(name='rwoption1'),
Option(name='rwoption2',
type='int'),
Option(name='rwoption3',
type='float'),
Option(name='rwoption4',
type='str'),
Option(name='rwoption5',
type='bool'),
Option(name='rwoption6',
type='bool',
default=True),
Option(name='rwoption7',
type='int',
min=1,
max=42),
]
COMMANDS = [
{
"cmd": "mgr self-test run",
"desc": "Run mgr python interface tests",
"perm": "rw"
},
{
"cmd": "mgr self-test background start name=workload,type=CephString",
"desc": "Activate a background workload (one of {0})".format(
", ".join(WORKLOADS)),
"perm": "rw"
},
{
"cmd": "mgr self-test background stop",
"desc": "Stop background workload if any is running",
"perm": "rw"
},
{
"cmd": "mgr self-test config get name=key,type=CephString",
"desc": "Peek at a configuration value",
"perm": "rw"
},
{
"cmd": "mgr self-test config get_localized name=key,type=CephString",
"desc": "Peek at a configuration value (localized variant)",
"perm": "rw"
},
{
"cmd": "mgr self-test remote",
"desc": "Test inter-module calls",
"perm": "rw"
},
{
"cmd": "mgr self-test module name=module,type=CephString",
"desc": "Run another module's self_test() method",
"perm": "rw"
},
{
"cmd": "mgr self-test health set name=checks,type=CephString",
"desc": "Set a health check from a JSON-formatted description.",
"perm": "rw"
},
{
"cmd": "mgr self-test health clear name=checks,type=CephString,n=N,req=False",
"desc": "Clear health checks by name. If no names provided, clear all.",
"perm": "rw"
},
{
"cmd": "mgr self-test insights_set_now_offset name=hours,type=CephString",
"desc": "Set the now time for the insights module.",
"perm": "rw"
},
{
"cmd": "mgr self-test cluster-log name=channel,type=CephString "
"name=priority,type=CephString "
"name=message,type=CephString",
"desc": "Create an audit log record.",
"perm": "rw"
},
{
"cmd": "mgr self-test python-version",
"desc": "Query the version of the embedded Python runtime",
"perm": "r"
},
]
def __init__(self, *args, **kwargs):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super(Module, self).__init__(*args, **kwargs)
self._event = threading.Event()
self._workload = None
self._health = {}
self._workload: Optional[Workload] = None
self._health: Dict[str, Dict[str, Any]] = {}
def handle_command(self, inbuf, command):
if command['prefix'] == 'mgr self-test python-version':
major = sys.version_info.major
minor = sys.version_info.minor
micro = sys.version_info.micro
return 0, f'{major}.{minor}.{micro}', ''
@CLICommand('mgr self-test python-version', perm='r')
def python_version(self) -> Tuple[int, str, str]:
'''
Query the version of the embedded Python runtime
'''
major = sys.version_info.major
minor = sys.version_info.minor
micro = sys.version_info.micro
return 0, f'{major}.{minor}.{micro}', ''
elif command['prefix'] == 'mgr self-test run':
self._self_test()
return 0, '', 'Self-test succeeded'
@CLICommand('mgr self-test run')
def run(self) -> Tuple[int, str, str]:
'''
Run mgr python interface tests
'''
self._self_test()
return 0, '', 'Self-test succeeded'
elif command['prefix'] == 'mgr self-test background start':
if command['workload'] not in self.WORKLOADS:
return (-errno.EINVAL, '',
"Workload not found '{0}'".format(command['workload']))
self._workload = command['workload']
@CLICommand('mgr self-test background start')
def backgroun_start(self, workload: Workload) -> Tuple[int, str, str]:
'''
Activate a background workload (one of command_spam, throw_exception)
'''
self._workload = workload
self._event.set()
return 0, '', 'Running `{0}` in background'.format(self._workload)
@CLICommand('mgr self-test background stop')
def background_stop(self) -> Tuple[int, str, str]:
'''
Stop background workload if any is running
'''
if self._workload:
was_running = self._workload
self._workload = None
self._event.set()
return 0, '', 'Running `{0}` in background'.format(self._workload)
elif command['prefix'] == 'mgr self-test background stop':
if self._workload:
was_running = self._workload
self._workload = None
self._event.set()
return 0, '', 'Stopping background workload `{0}`'.format(
was_running)
else:
return 0, '', 'No background workload was running'
elif command['prefix'] == 'mgr self-test config get':
return 0, str(self.get_module_option(command['key'])), ''
elif command['prefix'] == 'mgr self-test config get_localized':
return 0, str(self.get_localized_module_option(command['key'])), ''
elif command['prefix'] == 'mgr self-test remote':
self._test_remote_calls()
return 0, '', 'Successfully called'
elif command['prefix'] == 'mgr self-test module':
try:
r = self.remote(command['module'], "self_test")
except RuntimeError as e:
return -1, '', "Test failed: {0}".format(e)
else:
return 0, str(r), "Self-test OK"
elif command['prefix'] == 'mgr self-test health set':
return self._health_set(inbuf, command)
elif command['prefix'] == 'mgr self-test health clear':
return self._health_clear(inbuf, command)
elif command['prefix'] == 'mgr self-test insights_set_now_offset':
return self._insights_set_now_offset(inbuf, command)
elif command['prefix'] == 'mgr self-test cluster-log':
priority_map = {
'info': self.ClusterLogPrio.INFO,
'security': self.ClusterLogPrio.SEC,
'warning': self.ClusterLogPrio.WARN,
'error': self.ClusterLogPrio.ERROR
}
self.cluster_log(command['channel'],
priority_map[command['priority']],
command['message'])
return 0, '', 'Successfully called'
return 0, '', 'Stopping background workload `{0}`'.format(
was_running)
else:
return (-errno.EINVAL, '',
"Command not found '{0}'".format(command['prefix']))
return 0, '', 'No background workload was running'
def _health_set(self, inbuf, command):
@CLICommand('mgr self-test config get')
def config_get(self, key: str) -> Tuple[int, str, str]:
'''
Peek at a configuration value
'''
return 0, str(self.get_module_option(key)), ''
@CLICommand('mgr self-test config get_localized')
def config_get_localized(self, key: str) -> Tuple[int, str, str]:
'''
Peek at a configuration value (localized variant)
'''
return 0, str(self.get_localized_module_option(key)), ''
@CLICommand('mgr self-test remote')
def test_remote(self) -> Tuple[int, str, str]:
'''
Test inter-module calls
'''
self._test_remote_calls()
return 0, '', 'Successfully called'
@CLICommand('mgr self-test module')
def module(self, module: str) -> Tuple[int, str, str]:
'''
Run another module's self_test() method
'''
try:
checks = json.loads(command["checks"])
r = self.remote(module, "self_test")
except RuntimeError as e:
return -1, '', "Test failed: {0}".format(e)
else:
return 0, str(r), "Self-test OK"
@CLICommand('mgr self-test cluster-log')
def do_cluster_log(self,
channel: str,
priority: str,
message: str) -> Tuple[int, str, str]:
'''
Create an audit log record.
'''
priority_map = {
'info': self.ClusterLogPrio.INFO,
'security': self.ClusterLogPrio.SEC,
'warning': self.ClusterLogPrio.WARN,
'error': self.ClusterLogPrio.ERROR
}
self.cluster_log(channel,
priority_map[priority],
message)
return 0, '', 'Successfully called'
@CLICommand('mgr self-test health set')
def health_set(self, checks: str) -> Tuple[int, str, str]:
'''
Set a health check from a JSON-formatted description.
'''
try:
health_check = json.loads(checks)
except Exception as e:
return -1, "", "Failed to decode JSON input: {}".format(e)
try:
for check, info in checks.items():
for check, info in health_check.items():
self._health[check] = {
"severity": str(info["severity"]),
"summary": str(info["summary"]),
@ -198,9 +179,13 @@ class Module(MgrModule):
self.set_health_checks(self._health)
return 0, "", ""
def _health_clear(self, inbuf, command):
if "checks" in command:
for check in command["checks"]:
@CLICommand('mgr self-test health clear')
def health_clear(self, checks: Optional[List[str]] = None) -> Tuple[int, str, str]:
'''
Clear health checks by name. If no names provided, clear all.
'''
if checks is not None:
for check in checks:
if check in self._health:
del self._health[check]
else:
@ -209,16 +194,15 @@ class Module(MgrModule):
self.set_health_checks(self._health)
return 0, "", ""
def _insights_set_now_offset(self, inbuf, command):
try:
hours = int(command["hours"])
except Exception as e:
return -1, "", "Timestamp must be numeric: {}".format(e)
@CLICommand('mgr self-test insights_set_now_offset')
def insights_set_now_offset(self, hours: int) -> Tuple[int, str, str]:
'''
Set the now time for the insights module.
'''
self.remote("insights", "testing_set_now_time_offset", hours)
return 0, "", ""
def _self_test(self):
def _self_test(self) -> None:
self.log.info("Running self-test procedure...")
self._self_test_osdmap()
@ -228,7 +212,7 @@ class Module(MgrModule):
self._self_test_misc()
self._self_test_perf_counters()
def _self_test_getters(self):
def _self_test_getters(self) -> None:
self.version
self.get_context()
self.get_mgr_id()
@ -238,26 +222,26 @@ class Module(MgrModule):
# not have gone by the time we call another function referring to it
objects = [
"fs_map",
"osdmap_crush_map_text",
"osd_map",
"config",
"mon_map",
"service_map",
"osd_metadata",
"pg_summary",
"pg_status",
"pg_dump",
"pg_ready",
"df",
"pg_stats",
"pool_stats",
"osd_stats",
"osd_ping_times",
"health",
"mon_status",
"mgr_map"
]
"fs_map",
"osdmap_crush_map_text",
"osd_map",
"config",
"mon_map",
"service_map",
"osd_metadata",
"pg_summary",
"pg_status",
"pg_dump",
"pg_ready",
"df",
"pg_stats",
"pool_stats",
"osd_stats",
"osd_ping_times",
"health",
"mon_status",
"mgr_map"
]
for obj in objects:
assert self.get(obj) is not None
@ -273,9 +257,8 @@ class Module(MgrModule):
self.get_metadata("osd", str(osd_id))
self.get_daemon_status("osd", "0")
#send_command
def _self_test_config(self):
def _self_test_config(self) -> None:
# This is not a strong test (can't tell if values really
# persisted), it's just for the python interface bit.
@ -366,26 +349,25 @@ class Module(MgrModule):
assert isinstance(value, bool)
assert value is True
def _self_test_store(self):
def _self_test_store(self) -> None:
existing_keys = set(self.get_store_prefix("test").keys())
self.set_store("testkey", "testvalue")
assert self.get_store("testkey") == "testvalue"
assert sorted(self.get_store_prefix("test").keys()) == sorted(
list({"testkey"} | existing_keys))
assert (set(self.get_store_prefix("test").keys())
== {"testkey"} | existing_keys)
def _self_test_perf_counters(self):
def _self_test_perf_counters(self) -> None:
self.get_perf_schema("osd", "0")
self.get_counter("osd", "0", "osd.op")
#get_counter
#get_all_perf_coutners
# get_counter
# get_all_perf_coutners
def _self_test_misc(self):
def _self_test_misc(self) -> None:
self.set_uri("http://this.is.a.test.com")
self.set_health_checks({})
def _self_test_osdmap(self):
def _self_test_osdmap(self) -> None:
osdmap = self.get_osdmap()
osdmap.get_epoch()
osdmap.get_crush_version()
@ -403,16 +385,16 @@ class Module(MgrModule):
crush.find_takes()
crush.get_take_weight_osd_map(-1)
#osdmap.get_pools_by_take()
#osdmap.calc_pg_upmaps()
#osdmap.map_pools_pgs_up()
# osdmap.get_pools_by_take()
# osdmap.calc_pg_upmaps()
# osdmap.map_pools_pgs_up()
#inc.set_osd_reweights
#inc.set_crush_compat_weight_set_weights
# inc.set_osd_reweights
# inc.set_crush_compat_weight_set_weights
self.log.info("Finished self-test procedure.")
def _test_remote_calls(self):
def _test_remote_calls(self) -> None:
# Test making valid call
self.remote("influx", "self_test")
@ -449,21 +431,19 @@ class Module(MgrModule):
else:
raise RuntimeError("KeyError not raised")
def remote_from_orchestrator_cli_self_test(self, what):
def remote_from_orchestrator_cli_self_test(self, what: str) -> Any:
import orchestrator
if what == 'OrchestratorError':
c = orchestrator.OrchResult(result=None, exception=orchestrator.OrchestratorError('hello, world'))
return c
return orchestrator.OrchResult(result=None, exception=orchestrator.OrchestratorError('hello, world'))
elif what == "ZeroDivisionError":
c = orchestrator.OrchResult(result=None, exception=ZeroDivisionError('hello, world'))
return c
return orchestrator.OrchResult(result=None, exception=ZeroDivisionError('hello, world'))
assert False, repr(what)
def shutdown(self):
self._workload = self.SHUTDOWN
def shutdown(self) -> None:
self._workload = Workload.SHUTDOWN
self._event.set()
def _command_spam(self):
def _command_spam(self) -> None:
self.log.info("Starting command_spam workload...")
while not self._event.is_set():
osdmap = self.get_osdmap()
@ -476,23 +456,22 @@ class Module(MgrModule):
self.send_command(result, 'mon', '', json.dumps({
'prefix': 'osd reweight',
'id': i,
'weight': w
}), '')
'weight': w}), '')
crush = osdmap.get_crush().dump()
_ = osdmap.get_crush().dump()
r, outb, outs = result.wait()
self._event.clear()
self.log.info("Ended command_spam workload...")
def serve(self):
def serve(self) -> None:
while True:
if self._workload == self.WORKLOAD_COMMAND_SPAM:
if self._workload == Workload.COMMAND_SPAM:
self._command_spam()
elif self._workload == self.SHUTDOWN:
elif self._workload == Workload.SHUTDOWN:
self.log.info("Shutting down...")
break
elif self._workload == self.WORKLOAD_THROW_EXCEPTION:
elif self._workload == Workload.THROW_EXCEPTION:
raise RuntimeError("Synthetic exception in serve")
else:
self.log.info("Waiting for workload request...")

View File

@ -82,6 +82,7 @@ commands =
-m rbd_support \
-m rook \
-m snap_schedule \
-m selftest \
-m stats \
-m status \
-m telegraf \
@ -133,6 +134,7 @@ modules =
localpool
orchestrator
prometheus
selftest
commands =
flake8 --config=tox.ini {posargs} \
{posargs:{[testenv:flake8]modules}}