mirror of https://github.com/ceph/ceph
572 lines
18 KiB
Python
Executable File
572 lines
18 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
#
|
|
# Check the Prometheus rules for format, and integration
|
|
# with the unit tests. This script has the following exit
|
|
# codes:
|
|
# 0 .. Everything worked
|
|
# 4 .. rule problems or missing unit tests
|
|
# 8 .. Missing fields in YAML
|
|
# 12 .. Invalid YAML - unable to load
|
|
# 16 .. Missing input files
|
|
#
|
|
# Externals
|
|
# snmptranslate .. used to determine the oid's in the MIB to verify the rule -> MIB is correct
|
|
#
|
|
|
|
import re
|
|
import os
|
|
import sys
|
|
import yaml
|
|
import shutil
|
|
import string
|
|
from bs4 import BeautifulSoup
|
|
from typing import List, Any, Dict, Set, Optional, Tuple
|
|
import subprocess
|
|
|
|
import urllib.request
|
|
import urllib.error
|
|
from urllib.parse import urlparse
|
|
|
|
from settings import ALERTS_FILE, MIB_FILE, UNIT_TESTS_FILE
|
|
|
|
DOCLINK_NAME = 'documentation'
|
|
|
|
|
|
def isascii(s: str) -> bool:
|
|
try:
|
|
s.encode('ascii')
|
|
except UnicodeEncodeError:
|
|
return False
|
|
return True
|
|
|
|
|
|
def read_file(file_name: str) -> Tuple[str, str]:
|
|
try:
|
|
with open(file_name, 'r') as input_file:
|
|
raw_data = input_file.read()
|
|
except OSError:
|
|
return '', f"Unable to open {file_name}"
|
|
|
|
return raw_data, ''
|
|
|
|
|
|
def load_yaml(file_name: str) -> Tuple[Dict[str, Any], str]:
|
|
data = {}
|
|
errs = ''
|
|
|
|
raw_data, err = read_file(file_name)
|
|
if not err:
|
|
|
|
try:
|
|
data = yaml.safe_load(raw_data)
|
|
except yaml.YAMLError as e:
|
|
errs = f"filename '{file_name} is not a valid YAML file"
|
|
|
|
return data, errs
|
|
|
|
|
|
def run_command(command: str):
|
|
c = command.split()
|
|
completion = subprocess.run(c, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
return (completion.returncode,
|
|
completion.stdout.decode('utf-8').split('\n'),
|
|
completion.stderr.decode('utf-8').split('\n'))
|
|
|
|
|
|
class HTMLCache:
|
|
def __init__(self) -> None:
|
|
self.cache: Dict[str, Tuple[int, str]] = {}
|
|
|
|
def fetch(self, url_str: str) -> None:
|
|
parsed = urlparse(url_str)
|
|
url = f"{parsed.scheme}://{parsed.netloc}{parsed.path}"
|
|
|
|
if url in self.cache:
|
|
return self.cache[url]
|
|
|
|
req = urllib.request.Request(url)
|
|
try:
|
|
r = urllib.request.urlopen(req)
|
|
except urllib.error.HTTPError as e:
|
|
self.cache[url] = e.code, e.reason
|
|
return self.cache[url]
|
|
except urllib.error.URLError as e:
|
|
self.cache[url] = 400, e.reason
|
|
return self.cache[url]
|
|
|
|
if r.status == 200:
|
|
html = r.read().decode('utf-8')
|
|
self.cache[url] = 200, html
|
|
return self.cache[url]
|
|
|
|
self.cache[url] = r.status, r.reason
|
|
return r.status, r.reason
|
|
|
|
@property
|
|
def cached_pages(self) -> List[str]:
|
|
return self.cache.keys()
|
|
|
|
@property
|
|
def cached_pages_total(self) -> int:
|
|
return len(self.cache.keys())
|
|
|
|
class PrometheusRule:
|
|
expected_attrs = [
|
|
'alert',
|
|
'expr',
|
|
'labels',
|
|
'annotations'
|
|
]
|
|
|
|
def __init__(self, rule_group, rule_data: Dict[str, Any]):
|
|
|
|
assert 'alert' in rule_data
|
|
self.group: RuleGroup = rule_group
|
|
self.name = rule_data.get('alert')
|
|
self.rule = rule_data
|
|
self.errors: List[str] = []
|
|
self.warnings: List[str] = []
|
|
self.validate()
|
|
|
|
@property
|
|
def has_oid(self):
|
|
return True if self.rule.get('labels', {}).get('oid', '') else False
|
|
|
|
@property
|
|
def labels(self) -> Dict[str, str]:
|
|
return self.rule.get('labels', {})
|
|
|
|
@property
|
|
def annotations(self) -> Dict[str, str]:
|
|
return self.rule.get('annotations', {})
|
|
|
|
def _check_alert_name(self):
|
|
# this is simplistic, but works in the context of the alert name
|
|
if self.name[0] in string.ascii_uppercase and \
|
|
self.name != self.name.lower() and \
|
|
self.name != self.name.upper() and \
|
|
" " not in self.name and \
|
|
"_" not in self.name:
|
|
return
|
|
|
|
self.warnings.append("Alert name is not in CamelCase format")
|
|
|
|
def _check_structure(self):
|
|
rule_attrs = self.rule.keys()
|
|
missing_attrs = [a for a in PrometheusRule.expected_attrs if a not in rule_attrs]
|
|
|
|
if missing_attrs:
|
|
self.errors.append(
|
|
f"invalid alert structure. Missing field{'s' if len(missing_attrs) > 1 else ''}"
|
|
f": {','.join(missing_attrs)}")
|
|
|
|
def _check_labels(self):
|
|
for rqd in ['severity', 'type']:
|
|
if rqd not in self.labels.keys():
|
|
self.errors.append(f"rule is missing {rqd} label definition")
|
|
|
|
def _check_annotations(self):
|
|
for rqd in ['summary', 'description']:
|
|
if rqd not in self.annotations:
|
|
self.errors.append(f"rule is missing {rqd} annotation definition")
|
|
|
|
def _check_doclink(self):
|
|
doclink = self.annotations.get(DOCLINK_NAME, '')
|
|
|
|
if doclink:
|
|
url = urlparse(doclink)
|
|
status, content = self.group.fetch_html_page(doclink)
|
|
if status == 200:
|
|
if url.fragment:
|
|
soup = BeautifulSoup(content, 'html.parser')
|
|
if not soup.find(id=url.fragment):
|
|
self.errors.append(f"documentation link error: {url.fragment} anchor not found on the page")
|
|
else:
|
|
# catch all
|
|
self.errors.append(f"documentation link error: {status} {content}")
|
|
|
|
def _check_snmp(self):
|
|
oid = self.labels.get('oid', '')
|
|
|
|
if self.labels.get('severity', '') == 'critical' and not oid:
|
|
self.warnings.append("critical level alert is missing an SNMP oid entry")
|
|
if oid and not re.search('^1.3.6.1.4.1.50495.1.2.\\d+.\\d+.\\d+$', oid):
|
|
self.errors.append("invalid OID format provided")
|
|
if self.group.get_oids():
|
|
if oid and oid not in self.group.get_oids():
|
|
self.errors.append(f"rule defines an OID {oid} that is missing from the MIB file({os.path.basename(MIB_FILE)})")
|
|
|
|
def _check_ascii(self):
|
|
if 'oid' not in self.labels:
|
|
return
|
|
|
|
desc = self.annotations.get('description', '')
|
|
summary = self.annotations.get('summary', '')
|
|
if not isascii(desc):
|
|
self.errors.append(f"non-ascii characters found in 'description' field will cause issues in associated snmp trap.")
|
|
if not isascii(summary):
|
|
self.errors.append(f"non-ascii characters found in 'summary' field will cause issues in associated snmp trap.")
|
|
|
|
def validate(self):
|
|
|
|
self._check_alert_name()
|
|
self._check_structure()
|
|
self._check_labels()
|
|
self._check_annotations()
|
|
self._check_doclink()
|
|
self._check_snmp()
|
|
self._check_ascii()
|
|
char = '.'
|
|
|
|
if self.errors:
|
|
char = 'E'
|
|
self.group.update('error', self.name)
|
|
elif self.warnings:
|
|
char = 'W'
|
|
self.group.update('warning', self.name)
|
|
|
|
sys.stdout.write(char)
|
|
|
|
|
|
class RuleGroup:
|
|
|
|
def __init__(self, rule_file, group_name: str, group_name_width: int):
|
|
self.rule_file: RuleFile = rule_file
|
|
self.group_name = group_name
|
|
self.rules: Dict[str, PrometheusRule] = {}
|
|
self.problems = {
|
|
"error": [],
|
|
"warning": [],
|
|
}
|
|
|
|
sys.stdout.write(f"\n\t{group_name:<{group_name_width}} : ")
|
|
|
|
def add_rule(self, rule_data:Dict[str, Any]):
|
|
alert_name = rule_data.get('alert')
|
|
self.rules[alert_name] = PrometheusRule(self, rule_data)
|
|
|
|
def update(self, problem_type:str, alert_name:str):
|
|
assert problem_type in ['error', 'warning']
|
|
|
|
self.problems[problem_type].append(alert_name)
|
|
self.rule_file.update(self.group_name)
|
|
|
|
def fetch_html_page(self, url):
|
|
return self.rule_file.fetch_html_page(url)
|
|
|
|
def get_oids(self):
|
|
return self.rule_file.oid_list
|
|
|
|
@property
|
|
def error_count(self):
|
|
return len(self.problems['error'])
|
|
|
|
def warning_count(self):
|
|
return len(self.problems['warning'])
|
|
|
|
@property
|
|
def count(self):
|
|
return len(self.rules)
|
|
|
|
|
|
class RuleFile:
|
|
|
|
def __init__(self, parent, file_name, rules, oid_list):
|
|
self.parent = parent
|
|
self.file_name = file_name
|
|
self.rules: Dict[str, Any] = rules
|
|
self.oid_list = oid_list
|
|
self.problems: Set[str] = set()
|
|
self.group: Dict[str, RuleGroup] = {}
|
|
self.alert_names_seen: Set[str] = set()
|
|
self.duplicate_alert_names:List[str] = []
|
|
self.html_cache = HTMLCache()
|
|
|
|
assert 'groups' in self.rules
|
|
self.max_group_name_width = self.get_max_group_name()
|
|
self.load_groups()
|
|
|
|
def update(self, group_name):
|
|
self.problems.add(group_name)
|
|
self.parent.mark_invalid()
|
|
|
|
def fetch_html_page(self, url):
|
|
return self.html_cache.fetch(url)
|
|
|
|
@property
|
|
def group_count(self):
|
|
return len(self.rules['groups'])
|
|
|
|
@property
|
|
def rule_count(self):
|
|
rule_count = 0
|
|
for _group_name, rule_group in self.group.items():
|
|
rule_count += rule_group.count
|
|
return rule_count
|
|
|
|
@property
|
|
def oid_count(self):
|
|
oid_count = 0
|
|
for _group_name, rule_group in self.group.items():
|
|
for _rule_name, rule in rule_group.rules.items():
|
|
if rule.has_oid:
|
|
oid_count += 1
|
|
return oid_count
|
|
|
|
@property
|
|
def group_names(self):
|
|
return self.group.keys()
|
|
|
|
@property
|
|
def problem_count(self):
|
|
return len(self.problems)
|
|
|
|
def get_max_group_name(self):
|
|
group_name_list = []
|
|
for group in self.rules.get('groups'):
|
|
group_name_list.append(group['name'])
|
|
return max([len(g) for g in group_name_list])
|
|
|
|
def load_groups(self):
|
|
sys.stdout.write("\nChecking rule groups")
|
|
for group in self.rules.get('groups'):
|
|
group_name = group['name']
|
|
rules = group['rules']
|
|
self.group[group_name] = RuleGroup(self, group_name, self.max_group_name_width)
|
|
for rule_data in rules:
|
|
if 'alert' in rule_data:
|
|
alert_name = rule_data.get('alert')
|
|
if alert_name in self.alert_names_seen:
|
|
self.duplicate_alert_names.append(alert_name)
|
|
else:
|
|
self.alert_names_seen.add(alert_name)
|
|
self.group[group_name].add_rule(rule_data)
|
|
else:
|
|
# skipped recording rule
|
|
pass
|
|
|
|
def report(self):
|
|
def max_width(item_list: Set[str], min_width: int = 0) -> int:
|
|
return max([len(i) for i in item_list] + [min_width])
|
|
|
|
if not self.problems and not self.duplicate_alert_names:
|
|
print("\nNo problems detected in the rule file")
|
|
return
|
|
|
|
print("\nProblem Report\n")
|
|
|
|
group_width = max_width(self.problems, 5)
|
|
alert_names = set()
|
|
for g in self.problems:
|
|
group = self.group[g]
|
|
alert_names.update(group.problems.get('error', []))
|
|
alert_names.update(group.problems.get('warning', []))
|
|
alert_width = max_width(alert_names, 10)
|
|
|
|
template = " {group:<{group_width}} {severity:<8} {alert_name:<{alert_width}} {description}"
|
|
|
|
print(template.format(
|
|
group="Group",
|
|
group_width=group_width,
|
|
severity="Severity",
|
|
alert_name="Alert Name",
|
|
alert_width=alert_width,
|
|
description="Problem Description"))
|
|
|
|
print(template.format(
|
|
group="-----",
|
|
group_width=group_width,
|
|
severity="--------",
|
|
alert_name="----------",
|
|
alert_width=alert_width,
|
|
description="-------------------"))
|
|
|
|
for group_name in sorted(self.problems):
|
|
group = self.group[group_name]
|
|
rules = group.rules
|
|
for alert_name in group.problems.get('error', []):
|
|
for desc in rules[alert_name].errors:
|
|
print(template.format(
|
|
group=group_name,
|
|
group_width=group_width,
|
|
severity="Error",
|
|
alert_name=alert_name,
|
|
alert_width=alert_width,
|
|
description=desc))
|
|
for alert_name in group.problems.get('warning', []):
|
|
for desc in rules[alert_name].warnings:
|
|
print(template.format(
|
|
group=group_name,
|
|
group_width=group_width,
|
|
severity="Warning",
|
|
alert_name=alert_name,
|
|
alert_width=alert_width,
|
|
description=desc))
|
|
if self.duplicate_alert_names:
|
|
print("Duplicate alert names detected:")
|
|
for a in self.duplicate_alert_names:
|
|
print(f" - {a}")
|
|
|
|
|
|
class UnitTests:
|
|
expected_attrs = [
|
|
'rule_files',
|
|
'tests',
|
|
'evaluation_interval'
|
|
]
|
|
def __init__(self, filename):
|
|
self.filename = filename
|
|
self.unit_test_data: Dict[str, Any] = {}
|
|
self.alert_names_seen: Set[str] = set()
|
|
self.problems: List[str] = []
|
|
self.load()
|
|
|
|
def load(self):
|
|
self.unit_test_data, errs = load_yaml(self.filename)
|
|
if errs:
|
|
print(f"\n\nError in unit tests file: {errs}")
|
|
sys.exit(12)
|
|
|
|
missing_attr = [a for a in UnitTests.expected_attrs if a not in self.unit_test_data.keys()]
|
|
if missing_attr:
|
|
print(f"\nMissing attributes in unit tests: {','.join(missing_attr)}")
|
|
sys.exit(8)
|
|
|
|
def _check_alert_names(self, alert_names: List[str]):
|
|
alerts_tested: Set[str] = set()
|
|
for t in self.unit_test_data.get('tests'):
|
|
test_cases = t.get('alert_rule_test', [])
|
|
if not test_cases:
|
|
continue
|
|
for case in test_cases:
|
|
alertname = case.get('alertname', '')
|
|
if alertname:
|
|
alerts_tested.add(alertname)
|
|
|
|
alerts_defined = set(alert_names)
|
|
self.problems = list(alerts_defined.difference(alerts_tested))
|
|
|
|
def process(self, defined_alert_names: List[str]):
|
|
self._check_alert_names(defined_alert_names)
|
|
|
|
def report(self) -> None:
|
|
|
|
if not self.problems:
|
|
print("\nNo problems detected in unit tests file")
|
|
return
|
|
|
|
print("\nUnit tests are incomplete. Tests missing for the following alerts;")
|
|
for p in self.problems:
|
|
print(f" - {p}")
|
|
|
|
class RuleChecker:
|
|
|
|
def __init__(self, rules_filename: str = None, test_filename: str = None):
|
|
self.rules_filename = rules_filename or ALERTS_FILE
|
|
self.test_filename = test_filename or UNIT_TESTS_FILE
|
|
self.rule_file: Optional[RuleFile] = None
|
|
self.unit_tests: Optional[UnitTests] = None
|
|
self.rule_file_problems: bool = False
|
|
self.errors = {}
|
|
self.warnings = {}
|
|
self.error_count = 0
|
|
self.warning_count = 0
|
|
self.oid_count = 0
|
|
|
|
self.oid_list = self.build_oid_list()
|
|
|
|
def build_oid_list(self) -> List[str]:
|
|
|
|
cmd = shutil.which('snmptranslate')
|
|
if not cmd:
|
|
return []
|
|
|
|
rc, stdout, stderr = run_command(f"{cmd} -Pu -Tz -M ../../snmp:/usr/share/snmp/mibs -m CEPH-MIB")
|
|
if rc != 0:
|
|
return []
|
|
|
|
oid_list: List[str] = []
|
|
for line in stdout[:-1]:
|
|
_label, oid = line.replace('"', '').replace('\t', ' ').split()
|
|
oid_list.append(oid)
|
|
|
|
return oid_list
|
|
|
|
@property
|
|
def status(self):
|
|
if self.rule_file_problems or self.unit_tests.problems:
|
|
return 4
|
|
|
|
return 0
|
|
|
|
def mark_invalid(self):
|
|
self.rule_file_problems = True
|
|
|
|
def summarise_rule_file(self):
|
|
for group_name in self.rule_file.problems:
|
|
group = self.rule_file.group[group_name]
|
|
self.error_count += len(group.problems['error'])
|
|
self.warning_count += len(group.problems['warning'])
|
|
|
|
def ready(self):
|
|
errs: List[str] = []
|
|
ready_state = True
|
|
if not os.path.exists(self.rules_filename):
|
|
errs.append(f"rule file '{self.rules_filename}' not found")
|
|
ready_state = False
|
|
|
|
if not os.path.exists(self.test_filename):
|
|
errs.append(f"test file '{self.test_filename}' not found")
|
|
ready_state = False
|
|
|
|
return ready_state, errs
|
|
|
|
def run(self):
|
|
|
|
ready, errs = self.ready()
|
|
if not ready:
|
|
print("Unable to start:")
|
|
for e in errs:
|
|
print(f"- {e}")
|
|
sys.exit(16)
|
|
|
|
rules, errs = load_yaml(self.rules_filename)
|
|
if errs:
|
|
print(errs)
|
|
sys.exit(12)
|
|
|
|
self.rule_file = RuleFile(self, self.rules_filename, rules, self.oid_list)
|
|
self.summarise_rule_file()
|
|
|
|
self.unit_tests = UnitTests(self.test_filename)
|
|
self.unit_tests.process(self.rule_file.alert_names_seen)
|
|
|
|
def report(self):
|
|
print("\n\nSummary\n")
|
|
print(f"Rule file : {self.rules_filename}")
|
|
print(f"Unit Test file : {self.test_filename}")
|
|
print(f"\nRule groups processed : {self.rule_file.group_count:>3}")
|
|
print(f"Rules processed : {self.rule_file.rule_count:>3}")
|
|
print(f"SNMP OIDs declared : {self.rule_file.oid_count:>3} {'(snmptranslate missing, unable to cross check)' if not self.oid_list else ''}")
|
|
print(f"Rule errors : {self.error_count:>3}")
|
|
print(f"Rule warnings : {self.warning_count:>3}")
|
|
print(f"Rule name duplicates : {len(self.rule_file.duplicate_alert_names):>3}")
|
|
print(f"Unit tests missing : {len(self.unit_tests.problems):>3}")
|
|
|
|
self.rule_file.report()
|
|
self.unit_tests.report()
|
|
|
|
|
|
def main():
|
|
checker = RuleChecker()
|
|
|
|
checker.run()
|
|
checker.report()
|
|
print()
|
|
|
|
sys.exit(checker.status)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|