Merge pull request #23497 from noahdesu/insights

mgr/insights: insights reporting module

Reviewed-by: John Spray <john.spray@redhat.com>
This commit is contained in:
commit 0551d0e14b by John Spray, 2018-09-11 15:56:10 +01:00 (committed via GitHub)
15 changed files with 1185 additions and 0 deletions


@@ -42,3 +42,4 @@ sensible.
Devicehealth plugin <devicehealth>
Orchestrator CLI plugin <orchestrator_cli>
Rook plugin <rook>
Insights plugin <insights>

doc/mgr/insights.rst (new file, 52 lines)

@@ -0,0 +1,52 @@
Insights plugin
===============

The insights plugin collects and exposes system information to the Insights Core
data analysis framework. It is intended to replace explicit interrogation of
Ceph CLIs and daemon admin sockets, reducing the API surface that Insights
depends on. The insights report contains the following:

* **Health reports**. In addition to reporting the current health of the
  cluster, the insights module reports a summary of the last 24 hours of health
  checks. This feature is important for catching cluster health issues that are
  transient and may not be present at the moment the report is generated. Health
  checks are deduplicated to avoid unbounded data growth.

* **Crash reports**. A summary of any daemon crashes in the past 24 hours is
  included in the insights report. Crashes are reported as the number of crashes
  per daemon type (e.g. `ceph-osd`) within the time window. Full details of a
  crash may be obtained using the `crash module`_.

* Software version, storage utilization, cluster maps, placement group summary,
  monitor status, cluster configuration, and OSD metadata.

Enabling
--------

The *insights* module is enabled with::

  ceph mgr module enable insights

Commands
--------

::

  ceph insights

Generate the full report.

::

  ceph insights prune-health <hours>

Remove historical health data older than <hours>. Passing `0` for <hours> will
clear all health data.

This command is useful for cleaning the health history before automated nightly
reports are generated, which may contain spurious health checks accumulated
while performing system maintenance, or other health checks that have been
resolved. There is no need to prune health data to reclaim storage space;
garbage collection is performed regularly to remove old health data from
persistent storage.

.. _crash module: ../crash
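
As an illustration of consuming the report (not part of this change), here is a
minimal Python sketch that fetches it via the CLI and inspects the health
history. It assumes a running cluster with the module enabled and the `ceph`
binary on PATH; the top-level keys mirror those asserted in the qa test below.

# Illustration only: fetch and inspect an insights report.
import json
import subprocess

raw = subprocess.check_output(["ceph", "insights"]).decode("utf-8")
report = json.loads(raw)

# Top-level sections of the report (health, crashes, config, maps, ...).
print(sorted(report.keys()))

# Health history is keyed by check name, then severity, with deduplicated
# summary/detail messages accumulated over the last 24 hours.
for check, by_severity in report["health"]["history"]["checks"].items():
    print(check, list(by_severity))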


@@ -0,0 +1,18 @@
tasks:
- install:
- ceph:
# tests may leave mgrs broken, so don't try and call into them
# to invoke e.g. pg dump during teardown.
wait-for-scrub: false
log-whitelist:
- overall HEALTH_
- \(MGR_DOWN\)
- \(MGR_INSIGHTS_WARNING\)
- \(insights_health_check
- \(PG_
- replacing it with standby
- No standby daemons available
- cephfs_test_runner:
modules:
- tasks.mgr.test_insights


@@ -0,0 +1,223 @@
import logging
import json
import datetime
import time
from mgr_test_case import MgrTestCase
log = logging.getLogger(__name__)
UUID = 'd5775432-0742-44a3-a435-45095e32e6b2'
DATEFMT = '%Y-%m-%d %H:%M:%S.%f'
class TestInsights(MgrTestCase):
def setUp(self):
self.setup_mgrs()
self._load_module("insights")
self._load_module("selftest")
self.crash_ids = []
def tearDown(self):
self._clear_crashes()
def _insights(self):
retstr = self.mgr_cluster.mon_manager.raw_cluster_cmd("insights")
return json.loads(retstr)
def _add_crash(self, hours, make_invalid = False):
now = datetime.datetime.utcnow()
timestamp = now - datetime.timedelta(hours = hours)
timestamp = timestamp.strftime(DATEFMT) + 'Z'
crash_id = '_'.join((timestamp, UUID)).replace(' ', '_')
crash = {
'crash_id': crash_id,
'timestamp': timestamp,
}
if make_invalid:
crash["timestamp"] = "not a timestamp"
ret = self.mgr_cluster.mon_manager.raw_cluster_cmd_result(
'crash', 'post', '-i', '-',
stdin=json.dumps(crash)
)
self.crash_ids.append(crash_id)
self.assertEqual(0, ret)
def _clear_crashes(self):
for crash_id in self.crash_ids:
self.mgr_cluster.mon_manager.raw_cluster_cmd_result(
'crash', 'rm', crash_id
)
def _wait_for_health_history_checks(self, *args):
"""Wait for a set of health checks to appear in the health history"""
timeout = datetime.datetime.utcnow() + \
datetime.timedelta(seconds = 15)
while True:
report = self._insights()
missing = False
for check in args:
if check not in report["health"]["history"]["checks"]:
missing = True
break
if not missing:
return
self.assertGreater(timeout,
datetime.datetime.utcnow())
time.sleep(0.25)
def _wait_for_curr_health_cleared(self, check):
timeout = datetime.datetime.utcnow() + \
datetime.timedelta(seconds = 15)
while True:
report = self._insights()
if check not in report["health"]["current"]["checks"]:
return
self.assertGreater(timeout,
datetime.datetime.utcnow())
time.sleep(0.25)
def test_health_history(self):
# use empty health history as starting point
self.mgr_cluster.mon_manager.raw_cluster_cmd_result(
"insights", "prune-health", "0")
report = self._insights()
self.assertFalse(report["health"]["history"]["checks"])
# generate health check history entries. we want to avoid the edge case
# of running these tests at _exactly_ the top of the hour so we can
# explicitly control when hourly work occurs. for this we use the
# current time offset to a half hour.
now = datetime.datetime.utcnow()
now = datetime.datetime(
year = now.year,
month = now.month,
day = now.day,
hour = now.hour,
minute = 30)
check_names = set()
for hours in [-18, -11, -5, -1, 0]:
# change the insight module's perception of "now" ...
self.mgr_cluster.mon_manager.raw_cluster_cmd_result(
"mgr", "self-test", "insights_set_now_offset", str(hours))
# ... to simulate health check arrivals in the past
unique_check_name = "insights_health_check_{}".format(hours)
health_check = {
unique_check_name: {
"severity": "warning",
"summary": "summary",
"detail": ["detail"]
}
}
self.mgr_cluster.mon_manager.raw_cluster_cmd_result(
"mgr", "self-test", "health", "set",
json.dumps(health_check))
check_names.add(unique_check_name)
# and also set the same health check to test deduplication
            dupe_check_name = "insights_health_check"
health_check = {
dupe_check_name: {
"severity": "warning",
"summary": "summary",
"detail": ["detail"]
}
}
self.mgr_cluster.mon_manager.raw_cluster_cmd_result(
"mgr", "self-test", "health", "set",
json.dumps(health_check))
check_names.add(dupe_check_name)
# wait for the health check to show up in the history report
self._wait_for_health_history_checks(unique_check_name, dupe_check_name)
# clear out the current health checks before moving on
self.mgr_cluster.mon_manager.raw_cluster_cmd_result(
"mgr", "self-test", "health", "clear")
self._wait_for_curr_health_cleared(unique_check_name)
report = self._insights()
for check in check_names:
self.assertIn(check, report["health"]["history"]["checks"])
# restart the manager
active_id = self.mgr_cluster.get_active_id()
self.mgr_cluster.mgr_restart(active_id)
# ensure that at least one of the checks is present after the restart.
        # we don't wait for them all to be present because "earlier" checks may not
# have sat in memory long enough to be flushed.
all_missing = True
report = self._insights()
for check in check_names:
if check in report["health"]["history"]["checks"]:
all_missing = False
break
self.assertFalse(all_missing)
# pruning really removes history
self.mgr_cluster.mon_manager.raw_cluster_cmd_result(
"insights", "prune-health", "0")
report = self._insights()
self.assertFalse(report["health"]["history"]["checks"])
def test_insights_health(self):
"""The insights module reports health checks"""
self._add_crash(1, True) # add invalid crash data
timeout = 10
while timeout > 0:
time.sleep(1)
timeout -= 1
# should observe a health check because it can't read the invalid
# crash data created at the beginning of this test
report = self._insights()
if "MGR_INSIGHTS_WARNING" in report["health"]["current"]["checks"]:
self._clear_crashes()
return
self._clear_crashes()
self.fail("Insights module did not set health check")
pass
def test_schema(self):
"""TODO: assert conformance to a full schema specification?"""
report = self._insights()
for key in ["osd_metadata",
"pg_summary",
"mon_status",
"manager_map",
"service_map",
"mon_map",
"crush_map",
"fs_map",
"osd_tree",
"df",
"osd_dump",
"config",
"health",
"crashes",
"version",
"errors"]:
self.assertIn(key, report)
def test_crash_history(self):
self._clear_crashes()
report = self._insights()
self.assertFalse(report["crashes"]["summary"])
self.assertFalse(report["errors"])
# crashes show up in the report
self._add_crash(1)
report = self._insights()
self.assertTrue(report["crashes"]["summary"])
self.assertFalse(report["errors"])
log.warning("{}".format(json.dumps(report["crashes"], indent=2)))
# handling of comm. error with crash module
self._add_crash(1, True)
report = self._insights()
self.assertFalse(report["crashes"]["summary"])
self.assertTrue(report["errors"])
self._clear_crashes()


@@ -1 +1,2 @@
add_subdirectory(dashboard)
add_subdirectory(insights)


@@ -0,0 +1,7 @@
set(MGR_INSIGHTS_VIRTUALENV ${CEPH_BUILD_VIRTUALENV}/mgr-insights-virtualenv)
add_custom_target(mgr-insights-test-venv
COMMAND ${CMAKE_SOURCE_DIR}/src/tools/setup-virtualenv.sh --python=${MGR_PYTHON_EXECUTABLE} ${MGR_INSIGHTS_VIRTUALENV}
WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}/src/pybind/mgr/insights
COMMENT "insights tests virtualenv is being created")
add_dependencies(tests mgr-insights-test-venv)


@@ -0,0 +1,9 @@
from __future__ import absolute_import
import os
if 'UNITTEST' not in os.environ:
from .module import Module
else:
import sys
import mock
sys.modules['ceph_module'] = mock.Mock()


@@ -0,0 +1,191 @@
import json
import six
from collections import defaultdict
import datetime
# freq to write cached state to disk
PERSIST_PERIOD = datetime.timedelta(seconds = 10)
# on disk key prefix
HEALTH_HISTORY_KEY_PREFIX = "health_history/"
# apply an offset to "now": used for testing
NOW_OFFSET = None
class HealthEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, set):
return list(obj)
return json.JSONEncoder.default(self, obj)
class HealthCheckAccumulator(object):
"""
    Deduplicated storage of health checks.
"""
def __init__(self, init_checks = None):
# check : severity : { summary, detail }
# summary and detail are deduplicated
self._checks = defaultdict(lambda:
defaultdict(lambda: {
"summary": set(),
"detail": set()
}))
if init_checks:
self._update(init_checks)
def __str__(self):
return "check count {}".format(len(self._checks))
def add(self, checks):
"""
Add health checks to the current state
Returns:
bool: True if the state changed, False otherwise.
"""
changed = False
for check, info in six.iteritems(checks):
# only keep the icky stuff
severity = info["severity"]
if severity == "HEALTH_OK":
continue
summary = info["summary"]["message"]
details = map(lambda d: d["message"], info["detail"])
if self._add_check(check, severity, [summary], details):
changed = True
return changed
def checks(self):
return self._checks
def merge(self, other):
assert isinstance(other, HealthCheckAccumulator)
self._update(other._checks)
def _update(self, checks):
"""Merge checks with same structure. Does not set dirty bit"""
for check in checks:
for severity in checks[check]:
summaries = set(checks[check][severity]["summary"])
details = set(checks[check][severity]["detail"])
self._add_check(check, severity, summaries, details)
def _add_check(self, check, severity, summaries, details):
changed = False
for summary in summaries:
if summary not in self._checks[check][severity]["summary"]:
changed = True
self._checks[check][severity]["summary"].add(summary)
for detail in details:
if detail not in self._checks[check][severity]["detail"]:
changed = True
self._checks[check][severity]["detail"].add(detail)
return changed
class HealthHistorySlot(object):
"""
Manage the life cycle of a health history time slot.
    A time slot is a fixed slice of wall clock time (e.g. every hour, from :00
to :59), and all health updates that occur during this time are deduplicated
together. A slot is initially in a clean state, and becomes dirty when a new
health check is observed. The state of a slot should be persisted when
need_flush returns true. Once the state has been flushed, reset the dirty
bit by calling mark_flushed.
"""
def __init__(self, init_health = dict()):
self._checks = HealthCheckAccumulator(init_health.get("checks"))
self._slot = self._curr_slot()
self._next_flush = None
def __str__(self):
return "key {} next flush {} checks {}".format(
self.key(), self._next_flush, self._checks)
def health(self):
return dict(checks = self._checks.checks())
def key(self):
"""Identifer in the persist store"""
return self._key(self._slot)
def expired(self):
"""True if this slot is the current slot, False otherwise"""
return self._slot != self._curr_slot()
def need_flush(self):
"""True if this slot needs to be flushed, False otherwise"""
now = HealthHistorySlot._now()
if self._next_flush is not None:
if self._next_flush <= now or self.expired():
return True
return False
def mark_flushed(self):
"""Reset the dirty bit. Caller persists state"""
assert self._next_flush
self._next_flush = None
def add(self, health):
"""
Add health to the underlying health accumulator. When the slot
transitions from clean to dirty a target flush time is computed.
"""
changed = self._checks.add(health["checks"])
if changed and not self._next_flush:
self._next_flush = HealthHistorySlot._now() + PERSIST_PERIOD
return changed
def merge(self, other):
assert isinstance(other, HealthHistorySlot)
self._checks.merge(other._checks)
@staticmethod
def key_range(hours):
"""Return the time slot keys for the past N hours"""
def inner(curr, hours):
slot = curr - datetime.timedelta(hours = hours)
return HealthHistorySlot._key(slot)
curr = HealthHistorySlot._curr_slot()
return map(lambda i: inner(curr, i), range(hours))
@staticmethod
def curr_key():
"""Key for the current UTC time slot"""
return HealthHistorySlot._key(HealthHistorySlot._curr_slot())
@staticmethod
def key_to_time(key):
"""Return key converted into datetime"""
timestr = key[len(HEALTH_HISTORY_KEY_PREFIX):]
return datetime.datetime.strptime(timestr, "%Y-%m-%d_%H")
@staticmethod
def _key(dt):
"""Key format. Example: health_2018_11_05_00"""
return HEALTH_HISTORY_KEY_PREFIX + dt.strftime("%Y-%m-%d_%H")
@staticmethod
def _now():
"""Control now time for easier testing"""
now = datetime.datetime.utcnow()
if NOW_OFFSET is not None:
now = now + NOW_OFFSET
return now
@staticmethod
def _curr_slot():
"""Slot for the current UTC time"""
dt = HealthHistorySlot._now()
return datetime.datetime(
year = dt.year,
month = dt.month,
day = dt.day,
hour = dt.hour)
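
To make the slot lifecycle described in the HealthHistorySlot docstring concrete,
a small usage sketch follows (illustration only; it assumes this file is
importable as the module `health`): a slot starts clean, becomes dirty when a
non-OK check is added, deduplicates repeats, and asks to be flushed once
PERSIST_PERIOD has elapsed or its hour has passed.

# Illustration only: exercising the classes defined above.
import health

slot = health.HealthHistorySlot()
assert not slot.need_flush()      # clean slot, nothing to persist

# Feed it an update in the same shape as the mgr "health" JSON blob.
changed = slot.add({"checks": {
    "OSD_DOWN": {
        "severity": "HEALTH_WARN",
        "summary": {"message": "1 osd down"},
        "detail": [{"message": "osd.3 is down"}],
    }
}})
assert changed                    # slot is now dirty

# Adding the identical check again is deduplicated and reports no change.
assert not slot.add({"checks": {
    "OSD_DOWN": {
        "severity": "HEALTH_WARN",
        "summary": {"message": "1 osd down"},
        "detail": [{"message": "osd.3 is down"}],
    }
}})

# Once PERSIST_PERIOD elapses (or the slot's hour expires) need_flush() turns
# True; the caller stores slot.health() under slot.key() and calls
# slot.mark_flushed() to clear the dirty bit.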


@@ -0,0 +1,298 @@
import datetime
import errno
import json
import re
import threading
import six
from mgr_module import MgrModule, CommandResult
import health as health_util
# hours of crash history to report
CRASH_HISTORY_HOURS = 24
# hours of health history to report
HEALTH_HISTORY_HOURS = 24
# how many hours of health history to keep
HEALTH_RETENTION_HOURS = 30
# health check name for insights health
INSIGHTS_HEALTH_CHECK = "MGR_INSIGHTS_WARNING"
# version tag for persistent data format
ON_DISK_VERSION = 1
class Module(MgrModule):
COMMANDS = [
{
"cmd": "insights",
"desc": "Retreive insights report",
"perm": "r",
"poll": "false",
},
{
'cmd': 'insights prune-health name=hours,type=CephString',
'desc': 'Remove health history older than <hours> hours',
'perm': 'rw',
"poll": "false",
},
]
def __init__(self, *args, **kwargs):
super(Module, self).__init__(*args, **kwargs)
self._shutdown = False
self._evt = threading.Event()
# health history tracking
self._pending_health = []
self._health_slot = None
def notify(self, ttype, ident):
"""Queue updates for processing"""
if ttype == "health":
self.log.info("Received health check update {} pending".format(
len(self._pending_health)))
health = json.loads(self.get("health")["json"])
self._pending_health.append(health)
self._evt.set()
def serve(self):
self._health_reset()
while True:
self._evt.wait(health_util.PERSIST_PERIOD.total_seconds())
self._evt.clear()
if self._shutdown:
break
# when the current health slot expires, finalize it by flushing it to
# the store, and initializing a new empty slot.
if self._health_slot.expired():
self.log.info("Health history slot expired {}".format(
self._health_slot))
self._health_maybe_flush()
self._health_reset()
self._health_prune_history(HEALTH_RETENTION_HOURS)
# fold in pending health snapshots and flush
self.log.info("Applying {} health updates to slot {}".format(
len(self._pending_health), self._health_slot))
for health in self._pending_health:
self._health_slot.add(health)
self._pending_health = []
self._health_maybe_flush()
def shutdown(self):
self._shutdown = True
self._evt.set()
def _health_reset(self):
"""Initialize the current health slot
The slot will be initialized with any state found to have already been
persisted, otherwise the slot will start empty.
"""
key = health_util.HealthHistorySlot.curr_key()
data = self.get_store(key)
if data:
init_health = json.loads(data)
self._health_slot = health_util.HealthHistorySlot(init_health)
else:
self._health_slot = health_util.HealthHistorySlot()
self.log.info("Reset curr health slot {}".format(self._health_slot))
def _health_maybe_flush(self):
"""Store the health for the current time slot if needed"""
self.log.info("Maybe flushing slot {} needed {}".format(
self._health_slot, self._health_slot.need_flush()))
if self._health_slot.need_flush():
key = self._health_slot.key()
# build store data entry
slot = self._health_slot.health()
assert "version" not in slot
slot.update(dict(version = ON_DISK_VERSION))
data = json.dumps(slot, cls=health_util.HealthEncoder)
self.log.debug("Storing health key {} data {}".format(
key, json.dumps(slot, indent=2, cls=health_util.HealthEncoder)))
self.set_store(key, data)
self._health_slot.mark_flushed()
def _health_filter(self, f):
"""Filter hourly health reports timestamp"""
matches = filter(
lambda t: f(health_util.HealthHistorySlot.key_to_time(t[0])),
six.iteritems(self.get_store_prefix(health_util.HEALTH_HISTORY_KEY_PREFIX)))
return map(lambda t: t[0], matches)
def _health_prune_history(self, hours):
"""Prune old health entries"""
cutoff = datetime.datetime.utcnow() - datetime.timedelta(hours = hours)
for key in self._health_filter(lambda ts: ts <= cutoff):
self.log.info("Removing old health slot key {}".format(key))
self.set_store(key, None)
def _health_report(self, hours):
"""
Report a consolidated health report for the past N hours.
"""
# roll up the past N hours of health info
collector = health_util.HealthHistorySlot()
keys = health_util.HealthHistorySlot.key_range(hours)
for key in keys:
data = self.get_store(key)
self.log.info("Reporting health key {} found {}".format(
key, bool(data)))
health = json.loads(data) if data else {}
slot = health_util.HealthHistorySlot(health)
collector.merge(slot)
# include history that hasn't yet been flushed
collector.merge(self._health_slot)
return dict(
current = json.loads(self.get("health")["json"]),
history = collector.health()
)
def _version_parse(self, version):
"""
Return the components of a Ceph version string.
        This returns nothing when the version string cannot be parsed into its
constituent components, such as when Ceph has been built with
ENABLE_GIT_VERSION=OFF.
"""
r = "ceph version (?P<release>\d+)\.(?P<major>\d+)\.(?P<minor>\d+)"
m = re.match(r, version)
ver = {} if not m else {
"release": m.group("release"),
"major": m.group("major"),
"minor": m.group("minor")
}
return { k:int(v) for k,v in six.iteritems(ver) }
def _crash_history(self, hours):
"""
Load crash history for the past N hours from the crash module.
"""
params = dict(
prefix = "crash json_report",
hours = hours
)
result = dict(
summary = {},
hours = params["hours"],
)
health_check_details = []
try:
_, _, crashes = self.remote("crash", "handle_command", "", params)
result["summary"] = json.loads(crashes)
except Exception as e:
errmsg = "failed to invoke crash module"
self.log.warning("{}: {}".format(errmsg, str(e)))
health_check_details.append(errmsg)
else:
self.log.debug("Crash module invocation succeeded {}".format(
json.dumps(result["summary"], indent=2)))
return result, health_check_details
def _config_dump(self):
"""Report cluster configuration
This report is the standard `config dump` report. It does not include
configuration defaults; these can be inferred from the version number.
"""
result = CommandResult("")
args = dict(prefix = "config dump", format = "json")
self.send_command(result, "mon", "", json.dumps(args), "")
ret, outb, outs = result.wait()
if ret == 0:
return json.loads(outb), []
else:
self.log.warning("send_command 'config dump' failed. \
ret={}, outs=\"{}\"".format(ret, outs))
return [], ["Failed to read monitor config dump"]
def do_report(self, inbuf, command):
health_check_details = []
report = {}
report.update({
"version": dict(full = self.version,
**self._version_parse(self.version))
})
# crash history
crashes, health_details = self._crash_history(CRASH_HISTORY_HOURS)
report["crashes"] = crashes
health_check_details.extend(health_details)
# health history
report["health"] = self._health_report(HEALTH_HISTORY_HOURS)
# cluster configuration
config, health_details = self._config_dump()
report["config"] = config
health_check_details.extend(health_details)
osd_map = self.get("osd_map")
del osd_map['pg_temp']
report["osd_dump"] = osd_map
report["df"] = self.get("df")
report["osd_tree"] = self.get("osd_map_tree")
report["fs_map"] = self.get("fs_map")
report["crush_map"] = self.get("osd_map_crush")
report["mon_map"] = self.get("mon_map")
report["service_map"] = self.get("service_map")
report["manager_map"] = self.get("mgr_map")
report["mon_status"] = json.loads(self.get("mon_status")["json"])
report["pg_summary"] = self.get("pg_summary")
report["osd_metadata"] = self.get("osd_metadata")
report.update({
"errors": health_check_details
})
if health_check_details:
self.set_health_checks({
INSIGHTS_HEALTH_CHECK: {
"severity": "warning",
"summary": "Generated incomplete Insights report",
"detail": health_check_details
}
})
return 0, json.dumps(report, indent=2, cls=health_util.HealthEncoder), ""
def do_prune_health(self, inbuf, command):
try:
hours = int(command['hours'])
except ValueError:
return errno.EINVAL, '', 'hours argument must be integer'
self._health_prune_history(hours)
return 0, "", ""
def testing_set_now_time_offset(self, hours):
"""
Control what "now" time it is by applying an offset. This is called from
the selftest module to manage testing scenarios related to tracking
health history.
"""
        hours = int(hours)
health_util.NOW_OFFSET = datetime.timedelta(hours = hours)
self.log.warning("Setting now time offset {}".format(health_util.NOW_OFFSET))
def handle_command(self, inbuf, command):
if command["prefix"] == "insights":
return self.do_report(inbuf, command)
elif command["prefix"] == "insights prune-health":
return self.do_prune_health(inbuf, command)
else:
            raise NotImplementedError(command["prefix"])
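
The health history that _health_report and _health_prune_history operate on is
stored in the mgr key/value store under hourly keys. A small sketch of that
layout (illustration only, mirroring HealthHistorySlot._key and key_range in
health.py above):

# Illustration only: the hourly key layout used by the health history.
import datetime

HEALTH_HISTORY_KEY_PREFIX = "health_history/"

def key_for(dt):
    # e.g. "health_history/2018-09-11_14"
    return HEALTH_HISTORY_KEY_PREFIX + dt.strftime("%Y-%m-%d_%H")

now = datetime.datetime.utcnow()

# _health_report(24) reads the 24 hourly keys ending at the current slot and
# merges them into one deduplicated history.
report_keys = [key_for(now - datetime.timedelta(hours=h)) for h in range(24)]

# _health_prune_history(30) removes any stored key whose embedded timestamp is
# older than now - 30 hours (set_store(key, None) deletes the key).
cutoff = now - datetime.timedelta(hours=30)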


@@ -0,0 +1,29 @@
#!/usr/bin/env bash
# run from ./ or from ../
: ${MGR_INSIGHTS_VIRTUALENV:=/tmp/mgr-insights-virtualenv}
: ${WITH_PYTHON2:=ON}
: ${WITH_PYTHON3:=ON}
: ${CEPH_BUILD_DIR:=$PWD/.tox}
test -d insights && cd insights
if [ -e tox.ini ]; then
TOX_PATH=`readlink -f tox.ini`
else
TOX_PATH=`readlink -f $(dirname $0)/tox.ini`
fi
# tox.ini will take care of this.
unset PYTHONPATH
export CEPH_BUILD_DIR=$CEPH_BUILD_DIR
source ${MGR_INSIGHTS_VIRTUALENV}/bin/activate
if [ "$WITH_PYTHON2" = "ON" ]; then
ENV_LIST+="py27"
fi
if [ "$WITH_PYTHON3" = "ON" ]; then
ENV_LIST+="py3"
fi
tox -c ${TOX_PATH} -e ${ENV_LIST}


@@ -0,0 +1,273 @@
import unittest
import mock
from ..health import *
class HealthChecksTest(unittest.TestCase):
def test_check_accum_empty(self):
# health checks accum initially empty reports empty
h = HealthCheckAccumulator()
self.assertEqual(h.checks(), {})
h = HealthCheckAccumulator({})
self.assertEqual(h.checks(), {})
def _get_init_checks(self):
return HealthCheckAccumulator({
"C0": {
"S0": {
"summary": ["s0", "s1"],
"detail": ("d0", "d1")
}
}
})
def test_check_init(self):
# initialization with lists and tuples is OK
h = self._get_init_checks()
self.assertEqual(h.checks(), {
"C0": {
"S0": {
"summary": set(["s0", "s1"]),
"detail": set(["d0", "d1"])
}
}
})
def _get_merged_checks(self):
h = self._get_init_checks()
h.merge(HealthCheckAccumulator({
"C0": {
"S0": {
"summary": ["s0", "s1", "s2"],
"detail": ("d2",)
},
"S1": {
"summary": ["s0", "s1", "s2"],
"detail": ()
}
},
"C1": {
"S0": {
"summary": [],
"detail": ("d0", "d1", "d2")
}
}
}))
return h
def test_check_merge(self):
# merging combines and de-duplicates
h = self._get_merged_checks()
self.assertEqual(h.checks(), {
"C0": {
"S0": {
"summary": set(["s0", "s1", "s2"]),
"detail": set(["d0", "d1", "d2"])
},
"S1": {
"summary": set(["s0", "s1", "s2"]),
"detail": set([])
}
},
"C1": {
"S0": {
"summary": set([]),
"detail": set(["d0", "d1", "d2"])
}
}
})
def test_check_add_no_change(self):
# returns false when nothing changes
h = self._get_merged_checks()
self.assertFalse(h.add({}))
self.assertFalse(h.add({
"C0": {
"severity": "S0",
"summary": { "message": "s0" },
"detail": []
}
}))
self.assertFalse(h.add({
"C0": {
"severity": "S0",
"summary": { "message": "s1" },
"detail": [{ "message": "d1" }]
}
}))
self.assertFalse(h.add({
"C0": {
"severity": "S0",
"summary": { "message": "s0" },
"detail": [{ "message": "d1" }, { "message": "d2" }]
}
}))
def test_check_add_changed(self):
# new checks report change
h = self._get_merged_checks()
self.assertTrue(h.add({
"C0": {
"severity": "S0",
"summary": { "message": "s3" },
"detail": []
}
}))
self.assertTrue(h.add({
"C0": {
"severity": "S0",
"summary": { "message": "s1" },
"detail": [{ "message": "d4" }]
}
}))
self.assertTrue(h.add({
"C0": {
"severity": "S2",
"summary": { "message": "s0" },
"detail": [{ "message": "d0" }]
}
}))
self.assertTrue(h.add({
"C2": {
"severity": "S0",
"summary": { "message": "s0" },
"detail": [{ "message": "d0" }, { "message": "d1" }]
}
}))
self.assertEqual(h.checks(), {
"C0": {
"S0": {
"summary": set(["s0", "s1", "s2", "s3"]),
"detail": set(["d0", "d1", "d2", "d4"])
},
"S1": {
"summary": set(["s0", "s1", "s2"]),
"detail": set([])
},
"S2": {
"summary": set(["s0"]),
"detail": set(["d0"])
}
},
"C1": {
"S0": {
"summary": set([]),
"detail": set(["d0", "d1", "d2"])
}
},
"C2": {
"S0": {
"summary": set(["s0"]),
"detail": set(["d0", "d1"])
}
}
})
class HealthHistoryTest(unittest.TestCase):
def _now(self):
# return some time truncated at 30 minutes past the hour. this lets us
        # fiddle with time offsets without worrying about accidentally landing
# on exactly the top of the hour which is the edge of a time slot for
# tracking health history.
dt = datetime.datetime.utcnow()
return datetime.datetime(
year = dt.year,
month = dt.month,
day = dt.day,
hour = dt.hour,
minute = 30)
def test_empty_slot(self):
now = self._now()
HealthHistorySlot._now = mock.Mock(return_value=now)
h = HealthHistorySlot()
# reports no historical checks
self.assertEqual(h.health(), { "checks": {} })
# an empty slot doesn't need to be flushed
self.assertFalse(h.need_flush())
def test_expires(self):
now = self._now()
HealthHistorySlot._now = mock.Mock(return_value=now)
h = HealthHistorySlot()
self.assertFalse(h.expired())
# an hour from now it would be expired
future = now + datetime.timedelta(hours = 1)
HealthHistorySlot._now = mock.Mock(return_value=future)
self.assertTrue(h.expired())
def test_need_flush(self):
now = self._now()
HealthHistorySlot._now = mock.Mock(return_value=now)
h = HealthHistorySlot()
self.assertFalse(h.need_flush())
self.assertTrue(h.add(dict(checks = {
"C0": {
"severity": "S0",
"summary": { "message": "s0" },
"detail": [{ "message": "d0" }]
}
})))
# no flush needed, yet...
self.assertFalse(h.need_flush())
# after persist period time elapses, a flush is needed
future = now + PERSIST_PERIOD
HealthHistorySlot._now = mock.Mock(return_value=future)
self.assertTrue(h.need_flush())
# mark flush resets
h.mark_flushed()
self.assertFalse(h.need_flush())
def test_need_flush_edge(self):
# test needs flush is true because it has expired, not because it has
# been dirty for the persistence period
dt = datetime.datetime.utcnow()
now = datetime.datetime(
year = dt.year,
month = dt.month,
day = dt.day,
hour = dt.hour,
minute = 59,
second = 59)
HealthHistorySlot._now = mock.Mock(return_value=now)
h = HealthHistorySlot()
self.assertFalse(h.expired())
self.assertFalse(h.need_flush())
# now it is dirty, but it doesn't need a flush
self.assertTrue(h.add(dict(checks = {
"C0": {
"severity": "S0",
"summary": { "message": "s0" },
"detail": [{ "message": "d0" }]
}
})))
self.assertFalse(h.expired())
self.assertFalse(h.need_flush())
# advance time past the hour so it expires, but not past the persistence
# period deadline for the last event that set the dirty bit
self.assertTrue(PERSIST_PERIOD.total_seconds() > 5)
future = now + datetime.timedelta(seconds = 5)
HealthHistorySlot._now = mock.Mock(return_value=future)
self.assertTrue(h.expired())
self.assertTrue(h.need_flush())


@@ -0,0 +1,16 @@
[tox]
envlist = py27,py3
skipsdist = true
toxworkdir = {env:CEPH_BUILD_DIR}
minversion = 2.8.1
[testenv]
deps =
pytest
mock
setenv=
UNITTEST = true
py27: PYTHONPATH = {toxinidir}/../../../../build/lib/cython_modules/lib.2
py3: PYTHONPATH = {toxinidir}/../../../../build/lib/cython_modules/lib.3
commands=
{envbindir}/py.test tests/


@@ -4,6 +4,7 @@ import threading
import random
import json
import errno
import six
class Module(MgrModule):
@@ -70,12 +71,28 @@ class Module(MgrModule):
"desc": "Run another module's self_test() method",
"perm": "rw"
},
{
"cmd": "mgr self-test health set name=checks,type=CephString",
"desc": "Set a health check from a JSON-formatted description.",
"perm": "rw"
},
{
"cmd": "mgr self-test health clear name=checks,type=CephString,n=N,req=False",
"desc": "Clear health checks by name. If no names provided, clear all.",
"perm": "rw"
},
{
"cmd": "mgr self-test insights_set_now_offset name=hours,type=CephString",
"desc": "Set the now time for the insights module.",
"perm": "rw"
},
]
def __init__(self, *args, **kwargs):
super(Module, self).__init__(*args, **kwargs)
self._event = threading.Event()
self._workload = None
self._health = {}
def handle_command(self, inbuf, command):
if command['prefix'] == 'mgr self-test run':
@@ -113,10 +130,55 @@ class Module(MgrModule):
return -1, '', "Test failed: {0}".format(e.message)
else:
return 0, str(r), "Self-test OK"
elif command['prefix'] == 'mgr self-test health set':
return self._health_set(inbuf, command)
elif command['prefix'] == 'mgr self-test health clear':
return self._health_clear(inbuf, command)
elif command['prefix'] == 'mgr self-test insights_set_now_offset':
return self._insights_set_now_offset(inbuf, command)
else:
return (-errno.EINVAL, '',
"Command not found '{0}'".format(command['prefix']))
def _health_set(self, inbuf, command):
try:
checks = json.loads(command["checks"])
except Exception as e:
return -1, "", "Failed to decode JSON input: {}".format(e.message)
try:
for check, info in six.iteritems(checks):
self._health[check] = {
"severity": str(info["severity"]),
"summary": str(info["summary"]),
"detail": [str(m) for m in info["detail"]]
}
except Exception as e:
return -1, "", "Invalid health check format: {}".format(e.message)
self.set_health_checks(self._health)
return 0, "", ""
def _health_clear(self, inbuf, command):
if "checks" in command:
for check in command["checks"]:
if check in self._health:
del self._health[check]
else:
self._health = dict()
self.set_health_checks(self._health)
return 0, "", ""
def _insights_set_now_offset(self, inbuf, command):
try:
            hours = int(command["hours"])
except Exception as e:
return -1, "", "Timestamp must be numeric: {}".format(e.message)
self.remote("insights", "testing_set_now_time_offset", hours)
return 0, "", ""
def _self_test(self):
self.log.info("Running self-test procedure...")


@@ -579,6 +579,11 @@ if(WITH_MGR)
list(APPEND tox_tests run-tox-mgr-dashboard)
set(MGR_DASHBOARD_VIRTUALENV ${CEPH_BUILD_VIRTUALENV}/mgr-dashboard-virtualenv)
list(APPEND env_vars_for_tox_tests MGR_DASHBOARD_VIRTUALENV=${MGR_DASHBOARD_VIRTUALENV})
add_test(NAME run-tox-mgr-insights COMMAND bash ${CMAKE_SOURCE_DIR}/src/pybind/mgr/insights/run-tox.sh)
list(APPEND tox_tests run-tox-mgr-insights)
set(MGR_INSIGHTS_VIRTUALENV ${CEPH_BUILD_VIRTUALENV}/mgr-insights-virtualenv)
list(APPEND env_vars_for_tox_tests MGR_INSIGHTS_VIRTUALENV=${MGR_INSIGHTS_VIRTUALENV})
endif()
set_property(