diff --git a/doc/mgr/index.rst b/doc/mgr/index.rst
index f6ad6412be5..8b3487dd807 100644
--- a/doc/mgr/index.rst
+++ b/doc/mgr/index.rst
@@ -42,3 +42,4 @@ sensible.
     Devicehealth plugin
     Orchestrator CLI plugin
     Rook plugin
+    Insights plugin
diff --git a/doc/mgr/insights.rst b/doc/mgr/insights.rst
new file mode 100644
index 00000000000..b66de3de4e6
--- /dev/null
+++ b/doc/mgr/insights.rst
@@ -0,0 +1,52 @@
+Insights plugin
+===============
+
+The insights plugin collects and exposes system information to the Insights
+Core data analysis framework. It is intended to replace explicit interrogation
+of Ceph CLIs and daemon admin sockets, reducing the API surface that Insights
+depends on. The insights report contains the following:
+
+* **Health reports**. In addition to reporting the current health of the
+  cluster, the insights module reports a summary of the last 24 hours of
+  health checks. This feature is important for catching cluster health issues
+  that are transient and may not be present at the moment the report is
+  generated. Health checks are deduplicated to avoid unbounded data growth.
+
+* **Crash reports**. A summary of any daemon crashes in the past 24 hours is
+  included in the insights report. Crashes are reported as the number of
+  crashes per daemon type (e.g. `ceph-osd`) within the time window. Full
+  details of a crash may be obtained using the `crash module`_.
+
+* Software version, storage utilization, cluster maps, placement group
+  summary, monitor status, cluster configuration, and OSD metadata.
+
+Enabling
+--------
+
+The *insights* module is enabled with::
+
+    ceph mgr module enable insights
+
+Commands
+--------
+::
+
+    ceph insights
+
+Generate the full report.
+
+::
+
+    ceph insights prune-health <hours>
+
+Remove historical health data older than <hours>. Passing `0` for <hours> will
+clear all health data.
+
+This command is useful for cleaning the health history before automated nightly
+reports are generated, which may contain spurious health checks accumulated
+while performing system maintenance, or other health checks that have since
+been resolved. There is no need to prune health data to reclaim storage space;
+garbage collection is performed regularly to remove old health data from
+persistent storage.
+
+.. _crash module: ../crash
diff --git a/qa/suites/rados/mgr/tasks/insights.yaml b/qa/suites/rados/mgr/tasks/insights.yaml
new file mode 100644
index 00000000000..2d5ccb54b27
--- /dev/null
+++ b/qa/suites/rados/mgr/tasks/insights.yaml
@@ -0,0 +1,18 @@
+
+tasks:
+  - install:
+  - ceph:
+      # tests may leave mgrs broken, so don't try and call into them
+      # to invoke e.g. pg dump during teardown.
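+      # The log-whitelist entries below cover warnings these tests raise on
+      # purpose: test_insights_health posts an unreadable crash record to
+      # trigger MGR_INSIGHTS_WARNING, and test_health_history injects
+      # synthetic insights_health_check_* checks through the selftest module.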
+ wait-for-scrub: false + log-whitelist: + - overall HEALTH_ + - \(MGR_DOWN\) + - \(MGR_INSIGHTS_WARNING\) + - \(insights_health_check + - \(PG_ + - replacing it with standby + - No standby daemons available + - cephfs_test_runner: + modules: + - tasks.mgr.test_insights diff --git a/qa/tasks/mgr/test_insights.py b/qa/tasks/mgr/test_insights.py new file mode 100644 index 00000000000..37fe0a89c88 --- /dev/null +++ b/qa/tasks/mgr/test_insights.py @@ -0,0 +1,223 @@ +import logging +import json +import datetime +import time +from mgr_test_case import MgrTestCase + +log = logging.getLogger(__name__) +UUID = 'd5775432-0742-44a3-a435-45095e32e6b2' +DATEFMT = '%Y-%m-%d %H:%M:%S.%f' + +class TestInsights(MgrTestCase): + def setUp(self): + self.setup_mgrs() + self._load_module("insights") + self._load_module("selftest") + self.crash_ids = [] + + def tearDown(self): + self._clear_crashes() + + def _insights(self): + retstr = self.mgr_cluster.mon_manager.raw_cluster_cmd("insights") + return json.loads(retstr) + + def _add_crash(self, hours, make_invalid = False): + now = datetime.datetime.utcnow() + timestamp = now - datetime.timedelta(hours = hours) + timestamp = timestamp.strftime(DATEFMT) + 'Z' + crash_id = '_'.join((timestamp, UUID)).replace(' ', '_') + crash = { + 'crash_id': crash_id, + 'timestamp': timestamp, + } + if make_invalid: + crash["timestamp"] = "not a timestamp" + + ret = self.mgr_cluster.mon_manager.raw_cluster_cmd_result( + 'crash', 'post', '-i', '-', + stdin=json.dumps(crash) + ) + self.crash_ids.append(crash_id) + self.assertEqual(0, ret) + + def _clear_crashes(self): + for crash_id in self.crash_ids: + self.mgr_cluster.mon_manager.raw_cluster_cmd_result( + 'crash', 'rm', crash_id + ) + + def _wait_for_health_history_checks(self, *args): + """Wait for a set of health checks to appear in the health history""" + timeout = datetime.datetime.utcnow() + \ + datetime.timedelta(seconds = 15) + while True: + report = self._insights() + missing = False + for check in args: + if check not in report["health"]["history"]["checks"]: + missing = True + break + if not missing: + return + self.assertGreater(timeout, + datetime.datetime.utcnow()) + time.sleep(0.25) + + def _wait_for_curr_health_cleared(self, check): + timeout = datetime.datetime.utcnow() + \ + datetime.timedelta(seconds = 15) + while True: + report = self._insights() + if check not in report["health"]["current"]["checks"]: + return + self.assertGreater(timeout, + datetime.datetime.utcnow()) + time.sleep(0.25) + + def test_health_history(self): + # use empty health history as starting point + self.mgr_cluster.mon_manager.raw_cluster_cmd_result( + "insights", "prune-health", "0") + report = self._insights() + self.assertFalse(report["health"]["history"]["checks"]) + + # generate health check history entries. we want to avoid the edge case + # of running these tests at _exactly_ the top of the hour so we can + # explicitly control when hourly work occurs. for this we use the + # current time offset to a half hour. + now = datetime.datetime.utcnow() + now = datetime.datetime( + year = now.year, + month = now.month, + day = now.day, + hour = now.hour, + minute = 30) + + check_names = set() + for hours in [-18, -11, -5, -1, 0]: + # change the insight module's perception of "now" ... + self.mgr_cluster.mon_manager.raw_cluster_cmd_result( + "mgr", "self-test", "insights_set_now_offset", str(hours)) + + # ... 
to simulate health check arrivals in the past + unique_check_name = "insights_health_check_{}".format(hours) + health_check = { + unique_check_name: { + "severity": "warning", + "summary": "summary", + "detail": ["detail"] + } + } + self.mgr_cluster.mon_manager.raw_cluster_cmd_result( + "mgr", "self-test", "health", "set", + json.dumps(health_check)) + + check_names.add(unique_check_name) + + # and also set the same health check to test deduplication + dupe_check_name = "insights_health_check".format(hours) + health_check = { + dupe_check_name: { + "severity": "warning", + "summary": "summary", + "detail": ["detail"] + } + } + self.mgr_cluster.mon_manager.raw_cluster_cmd_result( + "mgr", "self-test", "health", "set", + json.dumps(health_check)) + + check_names.add(dupe_check_name) + + # wait for the health check to show up in the history report + self._wait_for_health_history_checks(unique_check_name, dupe_check_name) + + # clear out the current health checks before moving on + self.mgr_cluster.mon_manager.raw_cluster_cmd_result( + "mgr", "self-test", "health", "clear") + self._wait_for_curr_health_cleared(unique_check_name) + + report = self._insights() + for check in check_names: + self.assertIn(check, report["health"]["history"]["checks"]) + + # restart the manager + active_id = self.mgr_cluster.get_active_id() + self.mgr_cluster.mgr_restart(active_id) + + # ensure that at least one of the checks is present after the restart. + # we don't for them all to be present because "earlier" checks may not + # have sat in memory long enough to be flushed. + all_missing = True + report = self._insights() + for check in check_names: + if check in report["health"]["history"]["checks"]: + all_missing = False + break + self.assertFalse(all_missing) + + # pruning really removes history + self.mgr_cluster.mon_manager.raw_cluster_cmd_result( + "insights", "prune-health", "0") + report = self._insights() + self.assertFalse(report["health"]["history"]["checks"]) + + def test_insights_health(self): + """The insights module reports health checks""" + self._add_crash(1, True) # add invalid crash data + timeout = 10 + while timeout > 0: + time.sleep(1) + timeout -= 1 + # should observe a health check because it can't read the invalid + # crash data created at the beginning of this test + report = self._insights() + if "MGR_INSIGHTS_WARNING" in report["health"]["current"]["checks"]: + self._clear_crashes() + return + self._clear_crashes() + self.fail("Insights module did not set health check") + pass + + def test_schema(self): + """TODO: assert conformance to a full schema specification?""" + report = self._insights() + for key in ["osd_metadata", + "pg_summary", + "mon_status", + "manager_map", + "service_map", + "mon_map", + "crush_map", + "fs_map", + "osd_tree", + "df", + "osd_dump", + "config", + "health", + "crashes", + "version", + "errors"]: + self.assertIn(key, report) + + def test_crash_history(self): + self._clear_crashes() + report = self._insights() + self.assertFalse(report["crashes"]["summary"]) + self.assertFalse(report["errors"]) + + # crashes show up in the report + self._add_crash(1) + report = self._insights() + self.assertTrue(report["crashes"]["summary"]) + self.assertFalse(report["errors"]) + log.warning("{}".format(json.dumps(report["crashes"], indent=2))) + + # handling of comm. 
error with crash module + self._add_crash(1, True) + report = self._insights() + self.assertFalse(report["crashes"]["summary"]) + self.assertTrue(report["errors"]) + + self._clear_crashes() diff --git a/src/pybind/mgr/CMakeLists.txt b/src/pybind/mgr/CMakeLists.txt index 360a923847e..916c672b9ba 100644 --- a/src/pybind/mgr/CMakeLists.txt +++ b/src/pybind/mgr/CMakeLists.txt @@ -1 +1,2 @@ add_subdirectory(dashboard) +add_subdirectory(insights) diff --git a/src/pybind/mgr/insights/CMakeLists.txt b/src/pybind/mgr/insights/CMakeLists.txt new file mode 100644 index 00000000000..00722a99581 --- /dev/null +++ b/src/pybind/mgr/insights/CMakeLists.txt @@ -0,0 +1,7 @@ +set(MGR_INSIGHTS_VIRTUALENV ${CEPH_BUILD_VIRTUALENV}/mgr-insights-virtualenv) + +add_custom_target(mgr-insights-test-venv + COMMAND ${CMAKE_SOURCE_DIR}/src/tools/setup-virtualenv.sh --python=${MGR_PYTHON_EXECUTABLE} ${MGR_INSIGHTS_VIRTUALENV} + WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}/src/pybind/mgr/insights + COMMENT "insights tests virtualenv is being created") +add_dependencies(tests mgr-insights-test-venv) diff --git a/src/pybind/mgr/insights/__init__.py b/src/pybind/mgr/insights/__init__.py new file mode 100644 index 00000000000..ea61a12fd7e --- /dev/null +++ b/src/pybind/mgr/insights/__init__.py @@ -0,0 +1,9 @@ +from __future__ import absolute_import +import os + +if 'UNITTEST' not in os.environ: + from .module import Module +else: + import sys + import mock + sys.modules['ceph_module'] = mock.Mock() diff --git a/src/pybind/mgr/insights/health.py b/src/pybind/mgr/insights/health.py new file mode 100644 index 00000000000..866c4b69f44 --- /dev/null +++ b/src/pybind/mgr/insights/health.py @@ -0,0 +1,191 @@ +import json +import six +from collections import defaultdict +import datetime + +# freq to write cached state to disk +PERSIST_PERIOD = datetime.timedelta(seconds = 10) +# on disk key prefix +HEALTH_HISTORY_KEY_PREFIX = "health_history/" +# apply on offset to "now": used for testing +NOW_OFFSET = None + +class HealthEncoder(json.JSONEncoder): + def default(self, obj): + if isinstance(obj, set): + return list(obj) + return json.JSONEncoder.default(self, obj) + +class HealthCheckAccumulator(object): + """ + Deuplicated storage of health checks. + """ + def __init__(self, init_checks = None): + # check : severity : { summary, detail } + # summary and detail are deduplicated + self._checks = defaultdict(lambda: + defaultdict(lambda: { + "summary": set(), + "detail": set() + })) + + if init_checks: + self._update(init_checks) + + def __str__(self): + return "check count {}".format(len(self._checks)) + + def add(self, checks): + """ + Add health checks to the current state + + Returns: + bool: True if the state changed, False otherwise. + """ + changed = False + + for check, info in six.iteritems(checks): + + # only keep the icky stuff + severity = info["severity"] + if severity == "HEALTH_OK": + continue + + summary = info["summary"]["message"] + details = map(lambda d: d["message"], info["detail"]) + + if self._add_check(check, severity, [summary], details): + changed = True + + return changed + + def checks(self): + return self._checks + + def merge(self, other): + assert isinstance(other, HealthCheckAccumulator) + self._update(other._checks) + + def _update(self, checks): + """Merge checks with same structure. 
Does not set dirty bit""" + for check in checks: + for severity in checks[check]: + summaries = set(checks[check][severity]["summary"]) + details = set(checks[check][severity]["detail"]) + self._add_check(check, severity, summaries, details) + + def _add_check(self, check, severity, summaries, details): + changed = False + + for summary in summaries: + if summary not in self._checks[check][severity]["summary"]: + changed = True + self._checks[check][severity]["summary"].add(summary) + + for detail in details: + if detail not in self._checks[check][severity]["detail"]: + changed = True + self._checks[check][severity]["detail"].add(detail) + + return changed + +class HealthHistorySlot(object): + """ + Manage the life cycle of a health history time slot. + + A time slot is a fixed slice of wall clock time (e.g. every hours, from :00 + to :59), and all health updates that occur during this time are deduplicated + together. A slot is initially in a clean state, and becomes dirty when a new + health check is observed. The state of a slot should be persisted when + need_flush returns true. Once the state has been flushed, reset the dirty + bit by calling mark_flushed. + """ + def __init__(self, init_health = dict()): + self._checks = HealthCheckAccumulator(init_health.get("checks")) + self._slot = self._curr_slot() + self._next_flush = None + + def __str__(self): + return "key {} next flush {} checks {}".format( + self.key(), self._next_flush, self._checks) + + def health(self): + return dict(checks = self._checks.checks()) + + def key(self): + """Identifer in the persist store""" + return self._key(self._slot) + + def expired(self): + """True if this slot is the current slot, False otherwise""" + return self._slot != self._curr_slot() + + def need_flush(self): + """True if this slot needs to be flushed, False otherwise""" + now = HealthHistorySlot._now() + if self._next_flush is not None: + if self._next_flush <= now or self.expired(): + return True + return False + + def mark_flushed(self): + """Reset the dirty bit. Caller persists state""" + assert self._next_flush + self._next_flush = None + + def add(self, health): + """ + Add health to the underlying health accumulator. When the slot + transitions from clean to dirty a target flush time is computed. + """ + changed = self._checks.add(health["checks"]) + if changed and not self._next_flush: + self._next_flush = HealthHistorySlot._now() + PERSIST_PERIOD + return changed + + def merge(self, other): + assert isinstance(other, HealthHistorySlot) + self._checks.merge(other._checks) + + @staticmethod + def key_range(hours): + """Return the time slot keys for the past N hours""" + def inner(curr, hours): + slot = curr - datetime.timedelta(hours = hours) + return HealthHistorySlot._key(slot) + curr = HealthHistorySlot._curr_slot() + return map(lambda i: inner(curr, i), range(hours)) + + @staticmethod + def curr_key(): + """Key for the current UTC time slot""" + return HealthHistorySlot._key(HealthHistorySlot._curr_slot()) + + @staticmethod + def key_to_time(key): + """Return key converted into datetime""" + timestr = key[len(HEALTH_HISTORY_KEY_PREFIX):] + return datetime.datetime.strptime(timestr, "%Y-%m-%d_%H") + + @staticmethod + def _key(dt): + """Key format. 
Example: health_2018_11_05_00""" + return HEALTH_HISTORY_KEY_PREFIX + dt.strftime("%Y-%m-%d_%H") + + @staticmethod + def _now(): + """Control now time for easier testing""" + now = datetime.datetime.utcnow() + if NOW_OFFSET is not None: + now = now + NOW_OFFSET + return now + + @staticmethod + def _curr_slot(): + """Slot for the current UTC time""" + dt = HealthHistorySlot._now() + return datetime.datetime( + year = dt.year, + month = dt.month, + day = dt.day, + hour = dt.hour) diff --git a/src/pybind/mgr/insights/module.py b/src/pybind/mgr/insights/module.py new file mode 100644 index 00000000000..eb6e7e2f5a5 --- /dev/null +++ b/src/pybind/mgr/insights/module.py @@ -0,0 +1,298 @@ +import datetime +import json +import re +import threading +import six +from mgr_module import MgrModule, CommandResult +import health as health_util + +# hours of crash history to report +CRASH_HISTORY_HOURS = 24 +# hours of health history to report +HEALTH_HISTORY_HOURS = 24 +# how many hours of health history to keep +HEALTH_RETENTION_HOURS = 30 +# health check name for insights health +INSIGHTS_HEALTH_CHECK = "MGR_INSIGHTS_WARNING" +# version tag for persistent data format +ON_DISK_VERSION = 1 + +class Module(MgrModule): + COMMANDS = [ + { + "cmd": "insights", + "desc": "Retreive insights report", + "perm": "r", + "poll": "false", + }, + { + 'cmd': 'insights prune-health name=hours,type=CephString', + 'desc': 'Remove health history older than hours', + 'perm': 'rw', + "poll": "false", + }, + ] + + def __init__(self, *args, **kwargs): + super(Module, self).__init__(*args, **kwargs) + + self._shutdown = False + self._evt = threading.Event() + + # health history tracking + self._pending_health = [] + self._health_slot = None + + def notify(self, ttype, ident): + """Queue updates for processing""" + if ttype == "health": + self.log.info("Received health check update {} pending".format( + len(self._pending_health))) + health = json.loads(self.get("health")["json"]) + self._pending_health.append(health) + self._evt.set() + + def serve(self): + self._health_reset() + while True: + self._evt.wait(health_util.PERSIST_PERIOD.total_seconds()) + self._evt.clear() + if self._shutdown: + break + + # when the current health slot expires, finalize it by flushing it to + # the store, and initializing a new empty slot. + if self._health_slot.expired(): + self.log.info("Health history slot expired {}".format( + self._health_slot)) + self._health_maybe_flush() + self._health_reset() + self._health_prune_history(HEALTH_RETENTION_HOURS) + + # fold in pending health snapshots and flush + self.log.info("Applying {} health updates to slot {}".format( + len(self._pending_health), self._health_slot)) + for health in self._pending_health: + self._health_slot.add(health) + self._pending_health = [] + self._health_maybe_flush() + + def shutdown(self): + self._shutdown = True + self._evt.set() + + def _health_reset(self): + """Initialize the current health slot + + The slot will be initialized with any state found to have already been + persisted, otherwise the slot will start empty. 
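+
+        The slot key is derived from the current UTC hour, so after a manager
+        restart the module resumes accumulating into the same hourly slot that
+        was last persisted rather than dropping that hour's history.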
+ """ + key = health_util.HealthHistorySlot.curr_key() + data = self.get_store(key) + if data: + init_health = json.loads(data) + self._health_slot = health_util.HealthHistorySlot(init_health) + else: + self._health_slot = health_util.HealthHistorySlot() + self.log.info("Reset curr health slot {}".format(self._health_slot)) + + def _health_maybe_flush(self): + """Store the health for the current time slot if needed""" + + self.log.info("Maybe flushing slot {} needed {}".format( + self._health_slot, self._health_slot.need_flush())) + + if self._health_slot.need_flush(): + key = self._health_slot.key() + + # build store data entry + slot = self._health_slot.health() + assert "version" not in slot + slot.update(dict(version = ON_DISK_VERSION)) + data = json.dumps(slot, cls=health_util.HealthEncoder) + + self.log.debug("Storing health key {} data {}".format( + key, json.dumps(slot, indent=2, cls=health_util.HealthEncoder))) + + self.set_store(key, data) + self._health_slot.mark_flushed() + + def _health_filter(self, f): + """Filter hourly health reports timestamp""" + matches = filter( + lambda t: f(health_util.HealthHistorySlot.key_to_time(t[0])), + six.iteritems(self.get_store_prefix(health_util.HEALTH_HISTORY_KEY_PREFIX))) + return map(lambda t: t[0], matches) + + def _health_prune_history(self, hours): + """Prune old health entries""" + cutoff = datetime.datetime.utcnow() - datetime.timedelta(hours = hours) + for key in self._health_filter(lambda ts: ts <= cutoff): + self.log.info("Removing old health slot key {}".format(key)) + self.set_store(key, None) + + def _health_report(self, hours): + """ + Report a consolidated health report for the past N hours. + """ + # roll up the past N hours of health info + collector = health_util.HealthHistorySlot() + keys = health_util.HealthHistorySlot.key_range(hours) + for key in keys: + data = self.get_store(key) + self.log.info("Reporting health key {} found {}".format( + key, bool(data))) + health = json.loads(data) if data else {} + slot = health_util.HealthHistorySlot(health) + collector.merge(slot) + + # include history that hasn't yet been flushed + collector.merge(self._health_slot) + + return dict( + current = json.loads(self.get("health")["json"]), + history = collector.health() + ) + + def _version_parse(self, version): + """ + Return the components of a Ceph version string. + + This returns nothing when the verison string cannot be parsed into its + constituent components, such as when Ceph has been built with + ENABLE_GIT_VERSION=OFF. + """ + r = "ceph version (?P\d+)\.(?P\d+)\.(?P\d+)" + m = re.match(r, version) + ver = {} if not m else { + "release": m.group("release"), + "major": m.group("major"), + "minor": m.group("minor") + } + return { k:int(v) for k,v in six.iteritems(ver) } + + def _crash_history(self, hours): + """ + Load crash history for the past N hours from the crash module. 
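+
+        Failures to reach the crash module, or to parse its response, are
+        collected as error strings; they end up in the report's "errors" list
+        and raise the MGR_INSIGHTS_WARNING health check rather than aborting
+        report generation.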
+ """ + params = dict( + prefix = "crash json_report", + hours = hours + ) + + result = dict( + summary = {}, + hours = params["hours"], + ) + + health_check_details = [] + + try: + _, _, crashes = self.remote("crash", "handle_command", "", params) + result["summary"] = json.loads(crashes) + except Exception as e: + errmsg = "failed to invoke crash module" + self.log.warning("{}: {}".format(errmsg, str(e))) + health_check_details.append(errmsg) + else: + self.log.debug("Crash module invocation succeeded {}".format( + json.dumps(result["summary"], indent=2))) + + return result, health_check_details + + def _config_dump(self): + """Report cluster configuration + + This report is the standard `config dump` report. It does not include + configuration defaults; these can be inferred from the version number. + """ + result = CommandResult("") + args = dict(prefix = "config dump", format = "json") + self.send_command(result, "mon", "", json.dumps(args), "") + ret, outb, outs = result.wait() + if ret == 0: + return json.loads(outb), [] + else: + self.log.warning("send_command 'config dump' failed. \ + ret={}, outs=\"{}\"".format(ret, outs)) + return [], ["Failed to read monitor config dump"] + + def do_report(self, inbuf, command): + health_check_details = [] + report = {} + + report.update({ + "version": dict(full = self.version, + **self._version_parse(self.version)) + }) + + # crash history + crashes, health_details = self._crash_history(CRASH_HISTORY_HOURS) + report["crashes"] = crashes + health_check_details.extend(health_details) + + # health history + report["health"] = self._health_report(HEALTH_HISTORY_HOURS) + + # cluster configuration + config, health_details = self._config_dump() + report["config"] = config + health_check_details.extend(health_details) + + osd_map = self.get("osd_map") + del osd_map['pg_temp'] + report["osd_dump"] = osd_map + + report["df"] = self.get("df") + report["osd_tree"] = self.get("osd_map_tree") + report["fs_map"] = self.get("fs_map") + report["crush_map"] = self.get("osd_map_crush") + report["mon_map"] = self.get("mon_map") + report["service_map"] = self.get("service_map") + report["manager_map"] = self.get("mgr_map") + report["mon_status"] = json.loads(self.get("mon_status")["json"]) + report["pg_summary"] = self.get("pg_summary") + report["osd_metadata"] = self.get("osd_metadata") + + report.update({ + "errors": health_check_details + }) + + if health_check_details: + self.set_health_checks({ + INSIGHTS_HEALTH_CHECK: { + "severity": "warning", + "summary": "Generated incomplete Insights report", + "detail": health_check_details + } + }) + + return 0, json.dumps(report, indent=2, cls=health_util.HealthEncoder), "" + + def do_prune_health(self, inbuf, command): + try: + hours = int(command['hours']) + except ValueError: + return errno.EINVAL, '', 'hours argument must be integer' + + self._health_prune_history(hours) + + return 0, "", "" + + def testing_set_now_time_offset(self, hours): + """ + Control what "now" time it is by applying an offset. This is called from + the selftest module to manage testing scenarios related to tracking + health history. 
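+
+        A negative offset shifts "now" into the past; the QA test passes
+        negative hour values (for example -18) so it can backfill health
+        history slots for earlier hours.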
+ """ + hours = long(hours) + health_util.NOW_OFFSET = datetime.timedelta(hours = hours) + self.log.warning("Setting now time offset {}".format(health_util.NOW_OFFSET)) + + def handle_command(self, inbuf, command): + if command["prefix"] == "insights": + return self.do_report(inbuf, command) + elif command["prefix"] == "insights prune-health": + return self.do_prune_health(inbuf, command) + else: + raise NotImplementedError(cmd["prefix"]) diff --git a/src/pybind/mgr/insights/run-tox.sh b/src/pybind/mgr/insights/run-tox.sh new file mode 100644 index 00000000000..fb7f755142c --- /dev/null +++ b/src/pybind/mgr/insights/run-tox.sh @@ -0,0 +1,29 @@ +#!/usr/bin/env bash + +# run from ./ or from ../ +: ${MGR_INSIGHTS_VIRTUALENV:=/tmp/mgr-insights-virtualenv} +: ${WITH_PYTHON2:=ON} +: ${WITH_PYTHON3:=ON} +: ${CEPH_BUILD_DIR:=$PWD/.tox} +test -d insights && cd insights + +if [ -e tox.ini ]; then + TOX_PATH=`readlink -f tox.ini` +else + TOX_PATH=`readlink -f $(dirname $0)/tox.ini` +fi + +# tox.ini will take care of this. +unset PYTHONPATH +export CEPH_BUILD_DIR=$CEPH_BUILD_DIR + +source ${MGR_INSIGHTS_VIRTUALENV}/bin/activate + +if [ "$WITH_PYTHON2" = "ON" ]; then + ENV_LIST+="py27" +fi +if [ "$WITH_PYTHON3" = "ON" ]; then + ENV_LIST+="py3" +fi + +tox -c ${TOX_PATH} -e ${ENV_LIST} diff --git a/src/pybind/mgr/insights/tests/__init__.py b/src/pybind/mgr/insights/tests/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/src/pybind/mgr/insights/tests/test_health.py b/src/pybind/mgr/insights/tests/test_health.py new file mode 100644 index 00000000000..41c3a5c65c5 --- /dev/null +++ b/src/pybind/mgr/insights/tests/test_health.py @@ -0,0 +1,273 @@ +import unittest +import mock +from ..health import * + +class HealthChecksTest(unittest.TestCase): + def test_check_accum_empty(self): + # health checks accum initially empty reports empty + h = HealthCheckAccumulator() + self.assertEqual(h.checks(), {}) + + h = HealthCheckAccumulator({}) + self.assertEqual(h.checks(), {}) + + def _get_init_checks(self): + return HealthCheckAccumulator({ + "C0": { + "S0": { + "summary": ["s0", "s1"], + "detail": ("d0", "d1") + } + } + }) + + def test_check_init(self): + # initialization with lists and tuples is OK + h = self._get_init_checks() + self.assertEqual(h.checks(), { + "C0": { + "S0": { + "summary": set(["s0", "s1"]), + "detail": set(["d0", "d1"]) + } + } + }) + + def _get_merged_checks(self): + h = self._get_init_checks() + h.merge(HealthCheckAccumulator({ + "C0": { + "S0": { + "summary": ["s0", "s1", "s2"], + "detail": ("d2",) + }, + "S1": { + "summary": ["s0", "s1", "s2"], + "detail": () + } + }, + "C1": { + "S0": { + "summary": [], + "detail": ("d0", "d1", "d2") + } + } + })) + return h + + def test_check_merge(self): + # merging combines and de-duplicates + h = self._get_merged_checks() + self.assertEqual(h.checks(), { + "C0": { + "S0": { + "summary": set(["s0", "s1", "s2"]), + "detail": set(["d0", "d1", "d2"]) + }, + "S1": { + "summary": set(["s0", "s1", "s2"]), + "detail": set([]) + } + }, + "C1": { + "S0": { + "summary": set([]), + "detail": set(["d0", "d1", "d2"]) + } + } + }) + + def test_check_add_no_change(self): + # returns false when nothing changes + h = self._get_merged_checks() + + self.assertFalse(h.add({})) + + self.assertFalse(h.add({ + "C0": { + "severity": "S0", + "summary": { "message": "s0" }, + "detail": [] + } + })) + + self.assertFalse(h.add({ + "C0": { + "severity": "S0", + "summary": { "message": "s1" }, + "detail": [{ "message": "d1" }] + } + })) + + 
self.assertFalse(h.add({ + "C0": { + "severity": "S0", + "summary": { "message": "s0" }, + "detail": [{ "message": "d1" }, { "message": "d2" }] + } + })) + + def test_check_add_changed(self): + # new checks report change + h = self._get_merged_checks() + + self.assertTrue(h.add({ + "C0": { + "severity": "S0", + "summary": { "message": "s3" }, + "detail": [] + } + })) + + self.assertTrue(h.add({ + "C0": { + "severity": "S0", + "summary": { "message": "s1" }, + "detail": [{ "message": "d4" }] + } + })) + + self.assertTrue(h.add({ + "C0": { + "severity": "S2", + "summary": { "message": "s0" }, + "detail": [{ "message": "d0" }] + } + })) + + self.assertTrue(h.add({ + "C2": { + "severity": "S0", + "summary": { "message": "s0" }, + "detail": [{ "message": "d0" }, { "message": "d1" }] + } + })) + + self.assertEqual(h.checks(), { + "C0": { + "S0": { + "summary": set(["s0", "s1", "s2", "s3"]), + "detail": set(["d0", "d1", "d2", "d4"]) + }, + "S1": { + "summary": set(["s0", "s1", "s2"]), + "detail": set([]) + }, + "S2": { + "summary": set(["s0"]), + "detail": set(["d0"]) + } + }, + "C1": { + "S0": { + "summary": set([]), + "detail": set(["d0", "d1", "d2"]) + } + }, + "C2": { + "S0": { + "summary": set(["s0"]), + "detail": set(["d0", "d1"]) + } + } + }) + +class HealthHistoryTest(unittest.TestCase): + def _now(self): + # return some time truncated at 30 minutes past the hour. this lets us + # fiddle with time offsets without worrying about accidentically landing + # on exactly the top of the hour which is the edge of a time slot for + # tracking health history. + dt = datetime.datetime.utcnow() + return datetime.datetime( + year = dt.year, + month = dt.month, + day = dt.day, + hour = dt.hour, + minute = 30) + + def test_empty_slot(self): + now = self._now() + + HealthHistorySlot._now = mock.Mock(return_value=now) + h = HealthHistorySlot() + + # reports no historical checks + self.assertEqual(h.health(), { "checks": {} }) + + # an empty slot doesn't need to be flushed + self.assertFalse(h.need_flush()) + + def test_expires(self): + now = self._now() + + HealthHistorySlot._now = mock.Mock(return_value=now) + h = HealthHistorySlot() + self.assertFalse(h.expired()) + + # an hour from now it would be expired + future = now + datetime.timedelta(hours = 1) + HealthHistorySlot._now = mock.Mock(return_value=future) + self.assertTrue(h.expired()) + + def test_need_flush(self): + now = self._now() + + HealthHistorySlot._now = mock.Mock(return_value=now) + h = HealthHistorySlot() + self.assertFalse(h.need_flush()) + + self.assertTrue(h.add(dict(checks = { + "C0": { + "severity": "S0", + "summary": { "message": "s0" }, + "detail": [{ "message": "d0" }] + } + }))) + # no flush needed, yet... 
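+        # adding the check marked the slot dirty and scheduled a flush for
+        # _now() + PERSIST_PERIOD, so need_flush() stays False until that
+        # deadline passes (or the slot expires)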
+ self.assertFalse(h.need_flush()) + + # after persist period time elapses, a flush is needed + future = now + PERSIST_PERIOD + HealthHistorySlot._now = mock.Mock(return_value=future) + self.assertTrue(h.need_flush()) + + # mark flush resets + h.mark_flushed() + self.assertFalse(h.need_flush()) + + def test_need_flush_edge(self): + # test needs flush is true because it has expired, not because it has + # been dirty for the persistence period + dt = datetime.datetime.utcnow() + now = datetime.datetime( + year = dt.year, + month = dt.month, + day = dt.day, + hour = dt.hour, + minute = 59, + second = 59) + HealthHistorySlot._now = mock.Mock(return_value=now) + h = HealthHistorySlot() + self.assertFalse(h.expired()) + self.assertFalse(h.need_flush()) + + # now it is dirty, but it doesn't need a flush + self.assertTrue(h.add(dict(checks = { + "C0": { + "severity": "S0", + "summary": { "message": "s0" }, + "detail": [{ "message": "d0" }] + } + }))) + self.assertFalse(h.expired()) + self.assertFalse(h.need_flush()) + + # advance time past the hour so it expires, but not past the persistence + # period deadline for the last event that set the dirty bit + self.assertTrue(PERSIST_PERIOD.total_seconds() > 5) + future = now + datetime.timedelta(seconds = 5) + HealthHistorySlot._now = mock.Mock(return_value=future) + + self.assertTrue(h.expired()) + self.assertTrue(h.need_flush()) diff --git a/src/pybind/mgr/insights/tox.ini b/src/pybind/mgr/insights/tox.ini new file mode 100644 index 00000000000..989a8bc3cc5 --- /dev/null +++ b/src/pybind/mgr/insights/tox.ini @@ -0,0 +1,16 @@ +[tox] +envlist = py27,py3 +skipsdist = true +toxworkdir = {env:CEPH_BUILD_DIR} +minversion = 2.8.1 + +[testenv] +deps = + pytest + mock +setenv= + UNITTEST = true + py27: PYTHONPATH = {toxinidir}/../../../../build/lib/cython_modules/lib.2 + py3: PYTHONPATH = {toxinidir}/../../../../build/lib/cython_modules/lib.3 +commands= + {envbindir}/py.test tests/ diff --git a/src/pybind/mgr/selftest/module.py b/src/pybind/mgr/selftest/module.py index 794c0cc61ac..3f48ad180eb 100644 --- a/src/pybind/mgr/selftest/module.py +++ b/src/pybind/mgr/selftest/module.py @@ -4,6 +4,7 @@ import threading import random import json import errno +import six class Module(MgrModule): @@ -70,12 +71,28 @@ class Module(MgrModule): "desc": "Run another module's self_test() method", "perm": "rw" }, + { + "cmd": "mgr self-test health set name=checks,type=CephString", + "desc": "Set a health check from a JSON-formatted description.", + "perm": "rw" + }, + { + "cmd": "mgr self-test health clear name=checks,type=CephString,n=N,req=False", + "desc": "Clear health checks by name. 
If no names provided, clear all.", + "perm": "rw" + }, + { + "cmd": "mgr self-test insights_set_now_offset name=hours,type=CephString", + "desc": "Set the now time for the insights module.", + "perm": "rw" + }, ] def __init__(self, *args, **kwargs): super(Module, self).__init__(*args, **kwargs) self._event = threading.Event() self._workload = None + self._health = {} def handle_command(self, inbuf, command): if command['prefix'] == 'mgr self-test run': @@ -113,10 +130,55 @@ class Module(MgrModule): return -1, '', "Test failed: {0}".format(e.message) else: return 0, str(r), "Self-test OK" + elif command['prefix'] == 'mgr self-test health set': + return self._health_set(inbuf, command) + elif command['prefix'] == 'mgr self-test health clear': + return self._health_clear(inbuf, command) + elif command['prefix'] == 'mgr self-test insights_set_now_offset': + return self._insights_set_now_offset(inbuf, command) else: return (-errno.EINVAL, '', "Command not found '{0}'".format(command['prefix'])) + def _health_set(self, inbuf, command): + try: + checks = json.loads(command["checks"]) + except Exception as e: + return -1, "", "Failed to decode JSON input: {}".format(e.message) + + try: + for check, info in six.iteritems(checks): + self._health[check] = { + "severity": str(info["severity"]), + "summary": str(info["summary"]), + "detail": [str(m) for m in info["detail"]] + } + except Exception as e: + return -1, "", "Invalid health check format: {}".format(e.message) + + self.set_health_checks(self._health) + return 0, "", "" + + def _health_clear(self, inbuf, command): + if "checks" in command: + for check in command["checks"]: + if check in self._health: + del self._health[check] + else: + self._health = dict() + + self.set_health_checks(self._health) + return 0, "", "" + + def _insights_set_now_offset(self, inbuf, command): + try: + hours = long(command["hours"]) + except Exception as e: + return -1, "", "Timestamp must be numeric: {}".format(e.message) + + self.remote("insights", "testing_set_now_time_offset", hours) + return 0, "", "" + def _self_test(self): self.log.info("Running self-test procedure...") diff --git a/src/test/CMakeLists.txt b/src/test/CMakeLists.txt index 9a1cfe2c0a6..89337178940 100644 --- a/src/test/CMakeLists.txt +++ b/src/test/CMakeLists.txt @@ -579,6 +579,11 @@ if(WITH_MGR) list(APPEND tox_tests run-tox-mgr-dashboard) set(MGR_DASHBOARD_VIRTUALENV ${CEPH_BUILD_VIRTUALENV}/mgr-dashboard-virtualenv) list(APPEND env_vars_for_tox_tests MGR_DASHBOARD_VIRTUALENV=${MGR_DASHBOARD_VIRTUALENV}) + + add_test(NAME run-tox-mgr-insights COMMAND bash ${CMAKE_SOURCE_DIR}/src/pybind/mgr/insights/run-tox.sh) + list(APPEND tox_tests run-tox-mgr-insights) + set(MGR_INSIGHTS_VIRTUALENV ${CEPH_BUILD_VIRTUALENV}/mgr-insights-virtualenv) + list(APPEND env_vars_for_tox_tests MGR_INSIGHTS_VIRTUALENV=${MGR_INSIGHTS_VIRTUALENV}) endif() set_property(