ceph/monitoring/ceph-mixin/tests_alerts/validate_rules.py
Arthur Outhenin-Chalandre 98236e3a1d
mgr/dashboard: monitoring: refactor into ceph-mixin
A mixin is a way to bundle dashboards, Prometheus rules and alerts into a
jsonnet package. Shifting to a mixin will allow easier integration with
monitoring automation that some users may use.

This commit moves `/monitoring/grafana/dashboards` and
`/monitoring/prometheus` to `/monitoring/ceph-mixin`. Prometheus alerts
were also converted to Jsonnet in an automated way (from YAML to JSON
to Jsonnet). This commit minimises changes to the generated files
and should change neither the dashboards nor the Prometheus alerts.

In the future some configuration will also be added to jsonnet to add
more functionalities to the dashboards or alerts (i.e.: multi cluster).

Fixes: https://tracker.ceph.com/issues/53374
Signed-off-by: Arthur Outhenin-Chalandre <arthur.outhenin-chalandre@cern.ch>
2022-02-03 13:08:20 +01:00

572 lines
18 KiB
Python
Executable File

#!/usr/bin/env python3
#
# Check the Prometheus rules for format, and integration
# with the unit tests. This script has the following exit
# codes:
# 0 .. Everything worked
# 4 .. rule problems or missing unit tests
# 8 .. Missing fields in YAML
# 12 .. Invalid YAML - unable to load
# 16 .. Missing input files
#
# Externals
# snmptranslate .. used to determine the oid's in the MIB to verify the rule -> MIB is correct
#
import re
import os
import sys
import yaml
import shutil
import string
from bs4 import BeautifulSoup
from typing import List, Any, Dict, Set, Optional, Tuple
import subprocess
import urllib.request
import urllib.error
from urllib.parse import urlparse
from settings import ALERTS_FILE, MIB_FILE, UNIT_TESTS_FILE
# Annotation key under which an alert rule carries its documentation URL;
# looked up in each rule's annotations by PrometheusRule._check_doclink().
DOCLINK_NAME = 'documentation'
def isascii(s: str) -> bool:
    """Return True when every character of *s* is in the 7-bit ASCII range.

    An empty string counts as ASCII.
    """
    return all(ord(ch) < 128 for ch in s)
def read_file(file_name: str) -> Tuple[str, str]:
    """Read *file_name* as text and return ``(contents, error)``.

    Exactly one element of the pair is non-empty on failure: if opening or
    reading raises ``OSError`` the contents are ``''`` and the error is a
    short message naming the file; on success the error is ``''``.
    """
    contents = ''
    error = ''
    try:
        with open(file_name, 'r') as handle:
            contents = handle.read()
    except OSError:
        error = f"Unable to open {file_name}"
    return contents, error
def load_yaml(file_name: str) -> Tuple[Dict[str, Any], str]:
    """Load *file_name* as a YAML mapping and return ``(data, error)``.

    On success ``error`` is ``''`` and ``data`` is the parsed mapping.
    On any failure ``data`` is ``{}`` and ``error`` describes the problem:

    * the file could not be read (the message from ``read_file`` is passed on),
    * the content is not valid YAML,
    * the document parsed but is not a mapping (for example an empty file,
      which ``yaml.safe_load`` returns as ``None``).
    """
    raw_data, err = read_file(file_name)
    if err:
        # Surface the read failure; returning an empty dict with no error
        # would make an unreadable file look like a successful load.
        return {}, err

    try:
        data = yaml.safe_load(raw_data)
    except yaml.YAMLError:
        return {}, f"filename '{file_name}' is not a valid YAML file"

    if not isinstance(data, dict):
        # Callers index the result by key, so anything else is unusable.
        return {}, f"filename '{file_name}' does not contain a YAML mapping"

    return data, ''
def run_command(command: str):
    """Run *command* and return ``(returncode, stdout_lines, stderr_lines)``.

    The command string is split on whitespace (no shell, no quoting rules)
    and executed directly. Both output streams are captured, decoded as
    UTF-8 and split on newlines, so output that ends with a newline yields a
    trailing empty string in its list.
    """
    argv = command.split()
    proc = subprocess.run(argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out_lines = proc.stdout.decode('utf-8').split('\n')
    err_lines = proc.stderr.decode('utf-8').split('\n')
    return proc.returncode, out_lines, err_lines
class HTMLCache:
    """In-memory cache of pages fetched by URL.

    Each distinct URL is requested at most once; the outcome, success or
    failure, is stored as a ``(status, text)`` pair and returned for every
    later request of the same URL.
    """

    def __init__(self, timeout: float = 30.0) -> None:
        """Create an empty cache.

        :param timeout: seconds to wait on network operations before the
            fetch is recorded as failed.
        """
        # Maps the normalised URL to (status code, page body or failure reason).
        self.cache: Dict[str, Tuple[int, str]] = {}
        self.timeout = timeout

    def fetch(self, url_str: str) -> Tuple[int, str]:
        """Return ``(status, text)`` for *url_str*, fetching it if not cached.

        Only the scheme, host and path are used to build the request and the
        cache key, so query strings and ``#fragment`` parts are dropped and
        links to different anchors on one page share a single download.

        ``text`` is the decoded page body when ``status`` is 200, otherwise
        the reason for the failure. Failures that carry no HTTP status
        (unreachable host, timeout, malformed URL) are reported as 400.
        """
        parsed = urlparse(url_str)
        url = f"{parsed.scheme}://{parsed.netloc}{parsed.path}"
        if url not in self.cache:
            self.cache[url] = self._download(url)
        return self.cache[url]

    def _download(self, url: str) -> Tuple[int, str]:
        """Fetch *url* once and translate every outcome into ``(status, text)``."""
        try:
            # Request() raises ValueError for a URL without a usable scheme.
            req = urllib.request.Request(url)
            # The context manager closes the connection on every exit path.
            with urllib.request.urlopen(req, timeout=self.timeout) as r:
                if r.status == 200:
                    # Undecodable bytes are replaced rather than aborting the run.
                    return 200, r.read().decode('utf-8', errors='replace')
                return r.status, r.reason
        except urllib.error.HTTPError as e:
            return e.code, str(e.reason)
        except urllib.error.URLError as e:
            # e.reason may be an exception object rather than a string.
            return 400, str(e.reason)
        except (OSError, ValueError) as e:
            # Covers timeouts while reading the body and malformed URLs.
            return 400, str(e)

    @property
    def cached_pages(self) -> List[str]:
        """The URLs currently held in the cache."""
        return list(self.cache)

    @property
    def cached_pages_total(self) -> int:
        """Number of URLs currently held in the cache."""
        return len(self.cache)
class PrometheusRule:
    """One alerting rule together with the outcome of validating it.

    Validation runs from the constructor. Problems are collected in
    ``errors`` and ``warnings``, the owning group is told about the rule if
    either list is non-empty, and a one-character progress marker
    (``.``, ``W`` or ``E``) is written to stdout.
    """

    # Top-level keys every alert rule must define.
    expected_attrs = [
        'alert',
        'expr',
        'labels',
        'annotations'
    ]

    # Accepted OID shape: the fixed prefix followed by exactly three numeric
    # components. The dots are escaped so only a literal '.' separates them.
    _OID_RE = re.compile(r'^1\.3\.6\.1\.4\.1\.50495\.1\.2\.\d+\.\d+\.\d+$')

    def __init__(self, rule_group, rule_data: Dict[str, Any]):
        """Store the rule and validate it immediately.

        :param rule_group: owning group; must provide ``update``,
            ``fetch_html_page`` and ``get_oids``.
        :param rule_data: the rule mapping as loaded from YAML; must contain
            an ``alert`` key.
        """
        assert 'alert' in rule_data
        self.group: RuleGroup = rule_group
        self.name = rule_data.get('alert')
        self.rule = rule_data
        self.errors: List[str] = []
        self.warnings: List[str] = []

        self.validate()

    @property
    def has_oid(self):
        """True when the rule's labels carry a non-empty ``oid``."""
        return bool(self.labels.get('oid', ''))

    @property
    def labels(self) -> Dict[str, str]:
        """The rule's labels; empty when the key is absent or has no value."""
        # `labels:` with nothing under it loads as None, hence the `or {}`.
        return self.rule.get('labels') or {}

    @property
    def annotations(self) -> Dict[str, str]:
        """The rule's annotations; empty when the key is absent or has no value."""
        return self.rule.get('annotations') or {}

    def _check_alert_name(self):
        """Warn unless the alert name looks like CamelCase."""
        # this is simplistic, but works in the context of the alert name
        name = self.name
        # A missing or empty name cannot be indexed; treat it as not CamelCase.
        if isinstance(name, str) and name and \
           name[0] in string.ascii_uppercase and \
           name != name.lower() and \
           name != name.upper() and \
           " " not in name and \
           "_" not in name:
            return

        self.warnings.append("Alert name is not in CamelCase format")

    def _check_structure(self):
        """Record an error listing any expected top-level keys that are absent."""
        missing_attrs = [a for a in PrometheusRule.expected_attrs if a not in self.rule]

        if missing_attrs:
            self.errors.append(
                f"invalid alert structure. Missing field{'s' if len(missing_attrs) > 1 else ''}"
                f": {','.join(missing_attrs)}")

    def _check_labels(self):
        """Record an error for each required label that is absent."""
        for rqd in ['severity', 'type']:
            if rqd not in self.labels:
                self.errors.append(f"rule is missing {rqd} label definition")

    def _check_annotations(self):
        """Record an error for each required annotation that is absent."""
        for rqd in ['summary', 'description']:
            if rqd not in self.annotations:
                self.errors.append(f"rule is missing {rqd} annotation definition")

    def _check_doclink(self):
        """Check that the documentation link, if any, resolves.

        The page must be fetched with status 200 and, when the link has a
        ``#fragment``, an element with that id must exist on the page.
        """
        doclink = self.annotations.get(DOCLINK_NAME, '')

        if doclink:
            url = urlparse(doclink)
            status, content = self.group.fetch_html_page(doclink)
            if status == 200:
                if url.fragment:
                    soup = BeautifulSoup(content, 'html.parser')
                    if not soup.find(id=url.fragment):
                        self.errors.append(f"documentation link error: {url.fragment} anchor not found on the page")
            else:
                # catch all
                self.errors.append(f"documentation link error: {status} {content}")

    def _check_snmp(self):
        """Validate the rule's SNMP OID label.

        A critical alert without an OID yields a warning. An OID that does
        not match the accepted shape is an error. When the group supplies a
        non-empty list of known OIDs, an OID missing from it is an error.
        """
        oid = self.labels.get('oid', '')

        if self.labels.get('severity', '') == 'critical' and not oid:
            self.warnings.append("critical level alert is missing an SNMP oid entry")
        if oid and not self._OID_RE.search(oid):
            self.errors.append("invalid OID format provided")
        known_oids = self.group.get_oids()
        # An empty list means no cross-check is possible, so skip it.
        if known_oids and oid and oid not in known_oids:
            self.errors.append(f"rule defines an OID {oid} that is missing from the MIB file({os.path.basename(MIB_FILE)})")

    def _check_ascii(self):
        """For rules with an ``oid`` label, require ASCII-only summary and description."""
        if 'oid' not in self.labels:
            return

        desc = self.annotations.get('description', '')
        summary = self.annotations.get('summary', '')
        if not isascii(desc):
            self.errors.append("non-ascii characters found in 'description' field will cause issues in associated snmp trap.")
        if not isascii(summary):
            self.errors.append("non-ascii characters found in 'summary' field will cause issues in associated snmp trap.")

    def validate(self):
        """Run every check, notify the group, and emit the progress marker.

        A rule with any errors is reported to the group as an error; a rule
        with warnings but no errors is reported as a warning.
        """
        self._check_alert_name()
        self._check_structure()
        self._check_labels()
        self._check_annotations()
        self._check_doclink()
        self._check_snmp()
        self._check_ascii()
        char = '.'

        if self.errors:
            char = 'E'
            self.group.update('error', self.name)
        elif self.warnings:
            char = 'W'
            self.group.update('warning', self.name)

        sys.stdout.write(char)
class RuleGroup:
    """A named group of alert rules and the problems found in them.

    Creating a group writes the start of its progress line to stdout; the
    rules added afterwards append their own markers to that line.
    """

    def __init__(self, rule_file, group_name: str, group_name_width: int):
        """Create an empty group.

        :param rule_file: owning rule file; supplies ``update``,
            ``fetch_html_page`` and ``oid_list``.
        :param group_name: name of the group.
        :param group_name_width: column width the name is left-aligned to.
        """
        self.rule_file: RuleFile = rule_file
        self.group_name = group_name
        # Validated rules keyed by alert name.
        self.rules: Dict[str, PrometheusRule] = {}
        # Alert names with problems, by severity.
        self.problems = {"error": [], "warning": []}

        sys.stdout.write(f"\n\t{group_name:<{group_name_width}} : ")

    def add_rule(self, rule_data:Dict[str, Any]):
        """Validate *rule_data* and store the result under its alert name."""
        name = rule_data.get('alert')
        checked_rule = PrometheusRule(self, rule_data)
        self.rules[name] = checked_rule

    def update(self, problem_type:str, alert_name:str):
        """Record *alert_name* as an 'error' or 'warning' and flag this group upstream."""
        assert problem_type in ('error', 'warning')

        self.problems[problem_type].append(alert_name)
        self.rule_file.update(self.group_name)

    def fetch_html_page(self, url):
        """Fetch *url* through the owning rule file."""
        page = self.rule_file.fetch_html_page(url)
        return page

    def get_oids(self):
        """Return the owning rule file's list of known OIDs."""
        known_oids = self.rule_file.oid_list
        return known_oids

    @property
    def error_count(self):
        """Number of alerts in this group recorded with errors."""
        errors = self.problems['error']
        return len(errors)

    # NOTE(review): unlike error_count and count this is a plain method, not
    # a property, so it has to be called as warning_count().
    def warning_count(self):
        """Number of alerts in this group recorded with warnings."""
        warnings = self.problems['warning']
        return len(warnings)

    @property
    def count(self):
        """Number of rules stored in this group."""
        return len(self.rules)
class RuleFile:
    """The parsed alert rules file, its groups, and the problems found in it.

    Constructing a RuleFile loads and validates every alert rule, printing
    progress to stdout as it goes.
    """

    def __init__(self, parent, file_name, rules, oid_list):
        """Load and validate all groups.

        :param parent: owner notified through ``mark_invalid()`` when a
            problem is recorded.
        :param file_name: name of the rules file.
        :param rules: parsed rules document; must contain a ``groups`` key.
        :param oid_list: known OIDs that rules are cross-checked against.
        """
        self.parent = parent
        self.file_name = file_name
        self.rules: Dict[str, Any] = rules
        self.oid_list = oid_list
        # Names of the groups that contain at least one problem.
        self.problems: Set[str] = set()
        self.group: Dict[str, RuleGroup] = {}
        self.alert_names_seen: Set[str] = set()
        self.duplicate_alert_names:List[str] = []
        self.html_cache = HTMLCache()

        assert 'groups' in self.rules
        self.max_group_name_width = self.get_max_group_name()
        self.load_groups()

    def _group_defs(self):
        """Return the raw group definitions, or ``[]`` when the key has no value."""
        return self.rules.get('groups') or []

    def update(self, group_name):
        """Record that *group_name* has a problem and mark the parent invalid."""
        self.problems.add(group_name)
        self.parent.mark_invalid()

    def fetch_html_page(self, url):
        """Fetch *url* through this file's page cache."""
        return self.html_cache.fetch(url)

    @property
    def group_count(self):
        """Number of groups defined in the rules document."""
        return len(self._group_defs())

    @property
    def rule_count(self):
        """Number of alert rules loaded across all groups."""
        return sum(rule_group.count for rule_group in self.group.values())

    @property
    def oid_count(self):
        """Number of loaded alert rules that declare an OID."""
        return sum(
            1
            for rule_group in self.group.values()
            for rule in rule_group.rules.values()
            if rule.has_oid
        )

    @property
    def group_names(self):
        """Names of the loaded groups."""
        return self.group.keys()

    @property
    def problem_count(self):
        """Number of groups that contain at least one problem."""
        return len(self.problems)

    def get_max_group_name(self):
        """Return the length of the longest group name, or 0 with no groups."""
        # default=0 keeps a rules file with no groups from raising ValueError.
        return max((len(group['name']) for group in self._group_defs()), default=0)

    def load_groups(self):
        """Create a RuleGroup per group and validate each alert rule in it.

        Entries without an ``alert`` key (recording rules) are skipped. An
        alert name already seen anywhere in the file is recorded as a
        duplicate and is not validated again.
        """
        sys.stdout.write("\nChecking rule groups")
        for group in self._group_defs():
            group_name = group['name']
            # A group with a missing or empty `rules` key simply has no rules.
            rules = group.get('rules') or []
            self.group[group_name] = RuleGroup(self, group_name, self.max_group_name_width)
            for rule_data in rules:
                if 'alert' not in rule_data:
                    # skipped recording rule
                    continue
                alert_name = rule_data.get('alert')
                if alert_name in self.alert_names_seen:
                    self.duplicate_alert_names.append(alert_name)
                else:
                    self.alert_names_seen.add(alert_name)
                    self.group[group_name].add_rule(rule_data)

    def report(self):
        """Print the problem table and any duplicate alert names.

        Prints a single "no problems" line when there is nothing to report.
        """
        def max_width(item_list: Set[str], min_width: int = 0) -> int:
            return max([len(i) for i in item_list] + [min_width])

        if not self.problems and not self.duplicate_alert_names:
            print("\nNo problems detected in the rule file")
            return

        print("\nProblem Report\n")

        group_width = max_width(self.problems, 5)
        alert_names = set()
        for g in self.problems:
            group = self.group[g]
            alert_names.update(group.problems.get('error', []))
            alert_names.update(group.problems.get('warning', []))
        alert_width = max_width(alert_names, 10)

        template = " {group:<{group_width}} {severity:<8} {alert_name:<{alert_width}} {description}"

        def print_row(group, severity, alert_name, description):
            # One table line; the column widths are shared by every row.
            print(template.format(
                group=group,
                group_width=group_width,
                severity=severity,
                alert_name=alert_name,
                alert_width=alert_width,
                description=description))

        print_row("Group", "Severity", "Alert Name", "Problem Description")
        print_row("-----", "--------", "----------", "-------------------")

        for group_name in sorted(self.problems):
            group = self.group[group_name]
            rules = group.rules
            for alert_name in group.problems.get('error', []):
                for desc in rules[alert_name].errors:
                    print_row(group_name, "Error", alert_name, desc)
            for alert_name in group.problems.get('warning', []):
                for desc in rules[alert_name].warnings:
                    print_row(group_name, "Warning", alert_name, desc)
        if self.duplicate_alert_names:
            print("Duplicate alert names detected:")
            for a in self.duplicate_alert_names:
                print(f" - {a}")
class UnitTests:
    """The unit tests file, used to find alerts that have no test.

    Constructing an instance loads the file straight away and exits the
    process if it cannot be loaded or lacks required keys.
    """

    # Top-level keys the unit tests document must define.
    expected_attrs = [
        'rule_files',
        'tests',
        'evaluation_interval'
    ]

    def __init__(self, filename):
        """Load *filename*; see ``load`` for the failure behaviour."""
        self.filename = filename
        self.unit_test_data: Dict[str, Any] = {}
        self.alert_names_seen: Set[str] = set()
        # Names of defined alerts that no test case refers to.
        self.problems: List[str] = []
        self.load()

    def load(self):
        """Parse the unit tests file.

        Exits with status 12 when the file cannot be loaded as a YAML
        mapping, and with status 8 when a required top-level key is absent.
        """
        self.unit_test_data, errs = load_yaml(self.filename)
        if errs:
            print(f"\n\nError in unit tests file: {errs}")
            sys.exit(12)

        if not isinstance(self.unit_test_data, dict):
            # e.g. an empty file; the key checks below need a mapping.
            print(f"\n\nError in unit tests file: '{self.filename}' does not contain a YAML mapping")
            sys.exit(12)

        missing_attr = [a for a in UnitTests.expected_attrs if a not in self.unit_test_data]
        if missing_attr:
            print(f"\nMissing attributes in unit tests: {','.join(missing_attr)}")
            sys.exit(8)

    def _check_alert_names(self, alert_names: List[str]):
        """Set ``problems`` to the alerts in *alert_names* that no test covers.

        An alert counts as covered when its name appears as the ``alertname``
        of any ``alert_rule_test`` case. The result is sorted so the report
        is the same from run to run.
        """
        alerts_tested: Set[str] = set()
        # `tests:` or `alert_rule_test:` with no value loads as None.
        for t in self.unit_test_data.get('tests') or []:
            for case in t.get('alert_rule_test') or []:
                alertname = case.get('alertname', '')
                if alertname:
                    alerts_tested.add(alertname)

        untested = set(alert_names).difference(alerts_tested)
        self.problems = sorted(untested, key=str)

    def process(self, defined_alert_names: List[str]):
        """Compare *defined_alert_names* against the alerts the tests cover."""
        self._check_alert_names(defined_alert_names)

    def report(self) -> None:
        """Print the alerts that have no unit test, or a no-problems line."""

        if not self.problems:
            print("\nNo problems detected in unit tests file")
            return

        print("\nUnit tests are incomplete. Tests missing for the following alerts;")
        for p in self.problems:
            print(f" - {p}")
class RuleChecker:
    """Top-level driver: validates the rules file against the unit tests file."""

    def __init__(self, rules_filename: Optional[str] = None, test_filename: Optional[str] = None):
        """Set up the checker and collect the known OIDs.

        :param rules_filename: rules file to check; defaults to ALERTS_FILE.
        :param test_filename: unit tests file; defaults to UNIT_TESTS_FILE.
        """
        self.rules_filename = rules_filename or ALERTS_FILE
        self.test_filename = test_filename or UNIT_TESTS_FILE
        self.rule_file: Optional[RuleFile] = None
        self.unit_tests: Optional[UnitTests] = None
        self.rule_file_problems: bool = False
        self.errors = {}
        self.warnings = {}
        self.error_count = 0
        self.warning_count = 0
        self.oid_count = 0

        self.oid_list = self.build_oid_list()

    @staticmethod
    def _parse_oid_lines(lines: List[str]) -> List[str]:
        """Extract the OID column from ``label oid`` output lines.

        Quotes are removed and tabs treated as spaces. Lines that do not
        consist of exactly two fields (blank lines, stray messages) are
        ignored instead of aborting the run.
        """
        oids: List[str] = []
        for line in lines:
            fields = line.replace('"', '').replace('\t', ' ').split()
            if len(fields) == 2:
                oids.append(fields[1])
        return oids

    def build_oid_list(self) -> List[str]:
        """Return the OIDs reported by ``snmptranslate`` for CEPH-MIB.

        Returns ``[]`` when ``snmptranslate`` is not on PATH or exits with a
        non-zero status. The ``../../snmp`` entry passed with ``-M`` is a
        relative path, so it is resolved against the current working
        directory.
        """
        cmd = shutil.which('snmptranslate')
        if not cmd:
            return []

        rc, stdout, _stderr = run_command(f"{cmd} -Pu -Tz -M ../../snmp:/usr/share/snmp/mibs -m CEPH-MIB")
        if rc != 0:
            return []

        return self._parse_oid_lines(stdout)

    @property
    def status(self):
        """Exit status: 4 when rule problems or untested alerts exist, else 0."""
        # unit_tests is None until run() has been called.
        tests_incomplete = self.unit_tests is not None and bool(self.unit_tests.problems)
        if self.rule_file_problems or tests_incomplete:
            return 4

        return 0

    def mark_invalid(self):
        """Record that the rules file has at least one problem."""
        self.rule_file_problems = True

    def summarise_rule_file(self):
        """Total the error and warning counts over every group with problems."""
        # Start from zero so a repeated call does not double-count.
        self.error_count = 0
        self.warning_count = 0
        for group_name in self.rule_file.problems:
            group = self.rule_file.group[group_name]
            self.error_count += len(group.problems['error'])
            self.warning_count += len(group.problems['warning'])

    def ready(self):
        """Return ``(ok, errors)``; ``ok`` is False if either input file is missing."""
        errs: List[str] = []
        ready_state = True
        if not os.path.exists(self.rules_filename):
            errs.append(f"rule file '{self.rules_filename}' not found")
            ready_state = False

        if not os.path.exists(self.test_filename):
            errs.append(f"test file '{self.test_filename}' not found")
            ready_state = False

        return ready_state, errs

    def run(self):
        """Validate the rules file, then check unit test coverage.

        Exits with status 16 when an input file is missing and with status
        12 when the rules file cannot be loaded.
        """
        ready, errs = self.ready()
        if not ready:
            print("Unable to start:")
            for e in errs:
                print(f"- {e}")
            sys.exit(16)

        rules, errs = load_yaml(self.rules_filename)
        if errs:
            print(errs)
            sys.exit(12)

        self.rule_file = RuleFile(self, self.rules_filename, rules, self.oid_list)
        self.summarise_rule_file()

        self.unit_tests = UnitTests(self.test_filename)
        self.unit_tests.process(self.rule_file.alert_names_seen)

    def report(self):
        """Print the summary, then the rule file and unit test reports."""
        print("\n\nSummary\n")
        print(f"Rule file : {self.rules_filename}")
        print(f"Unit Test file : {self.test_filename}")
        print(f"\nRule groups processed : {self.rule_file.group_count:>3}")
        print(f"Rules processed : {self.rule_file.rule_count:>3}")
        print(f"SNMP OIDs declared : {self.rule_file.oid_count:>3} {'(snmptranslate missing, unable to cross check)' if not self.oid_list else ''}")
        print(f"Rule errors : {self.error_count:>3}")
        print(f"Rule warnings : {self.warning_count:>3}")
        print(f"Rule name duplicates : {len(self.rule_file.duplicate_alert_names):>3}")
        print(f"Unit tests missing : {len(self.unit_tests.problems):>3}")

        self.rule_file.report()
        self.unit_tests.report()
def main():
    """Run the rule checker, print its report, and exit with its status."""
    rule_checker = RuleChecker()
    rule_checker.run()
    rule_checker.report()
    print()
    exit_code = rule_checker.status
    sys.exit(exit_code)


# Only run the checker when this file is executed as a script.
if __name__ == '__main__':
    main()