mirror of
https://github.com/prometheus/alertmanager
synced 2025-03-03 02:07:39 +00:00
This allows for use cases such as kicking off shell scripts, logging notifications, or anything else the alertmanager doesn't directly support.
411 lines
12 KiB
Go
411 lines
12 KiB
Go
// Copyright 2013 Prometheus Team
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package manager
|
|
|
|
import (
|
|
"container/heap"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/prometheus/log"
|
|
)
|
|
|
|
// AlertManager stores Alerts and removes them upon expiry.
|
|
type AlertManager interface {
|
|
// Ingests a new alert entry into the store. If an alert with the same
|
|
// fingerprint already exists, it only updates the existing entry's metadata.
|
|
Receive(Alerts)
|
|
// Retrieves all alerts from the store that match the provided Filters.
|
|
GetAll(Filters) AlertAggregates
|
|
// Sets the AggregationRules to associate with alerts.
|
|
SetAggregationRules(AggregationRules)
|
|
// Runs the AlertManager dispatcher loop.
|
|
Run()
|
|
}
|
|
|
|
type AggregationRules []*AggregationRule
|
|
|
|
// AggregationRule creates and manages the scope for received events.
|
|
type AggregationRule struct {
|
|
Filters Filters
|
|
RepeatRate time.Duration
|
|
NotificationConfigName string
|
|
}
|
|
|
|
// Returns whether a given AggregationRule matches an Alert.
|
|
func (r *AggregationRule) Handles(l *Alert) bool {
|
|
return r.Filters.Handles(l.Labels)
|
|
}
|
|
|
|
// An AlertAggregate tracks the latest alert received for a given alert
|
|
// fingerprint and some metadata about the alert.
|
|
type AlertAggregate struct {
|
|
Alert *Alert
|
|
Rule *AggregationRule
|
|
|
|
// When was this AggregationInstance created?
|
|
Created time.Time
|
|
// When was the last refresh received into this AlertAggregate?
|
|
LastRefreshed time.Time
|
|
// When was the last notification sent out for this AlertAggregate?
|
|
LastNotification time.Time
|
|
// When should the next notification be sent according to the current Rule's
|
|
// RepeatRate?
|
|
NextNotification time.Time
|
|
}
|
|
|
|
// Ingests a received Alert into this AlertAggregate and updates metadata.
|
|
func (agg *AlertAggregate) Ingest(a *Alert) {
|
|
agg.Alert = a
|
|
agg.LastRefreshed = time.Now()
|
|
}
|
|
|
|
type AlertAggregates []*AlertAggregate
|
|
|
|
// Helper type for managing a heap based on LastRefreshed time.
|
|
type aggregatesByLastRefreshed struct {
|
|
AlertAggregates
|
|
}
|
|
|
|
// Helper type for managing a heap based on NextNotification time.
|
|
type aggregatesByNextNotification struct {
|
|
AlertAggregates
|
|
}
|
|
|
|
// Methods implementing heap.Interface.
|
|
func (aggs AlertAggregates) Len() int {
|
|
return len(aggs)
|
|
}
|
|
|
|
func (aggs aggregatesByLastRefreshed) Less(i, j int) bool {
|
|
return aggs.AlertAggregates[i].LastRefreshed.Before(aggs.AlertAggregates[j].LastRefreshed)
|
|
}
|
|
|
|
func (aggs aggregatesByNextNotification) Less(i, j int) bool {
|
|
return aggs.AlertAggregates[i].NextNotification.Before(aggs.AlertAggregates[j].NextNotification)
|
|
}
|
|
|
|
// rebuildFrom rebuilds the aggregatesByNextNotification index from a provided
|
|
// authoritative AlertAggregates slice.
|
|
func (aggs *aggregatesByNextNotification) rebuildFrom(aa AlertAggregates) {
|
|
aggs.AlertAggregates = aggs.AlertAggregates[:0]
|
|
for _, a := range aa {
|
|
aggs.Push(a)
|
|
}
|
|
}
|
|
|
|
func (aggs AlertAggregates) Swap(i, j int) {
|
|
aggs[i], aggs[j] = aggs[j], aggs[i]
|
|
}
|
|
|
|
func (aggs *AlertAggregates) Push(agg interface{}) {
|
|
*aggs = append(*aggs, agg.(*AlertAggregate))
|
|
}
|
|
|
|
func (aggs *AlertAggregates) Pop() interface{} {
|
|
old := *aggs
|
|
n := len(old)
|
|
item := old[n-1]
|
|
*aggs = old[:n-1]
|
|
return item
|
|
}
|
|
|
|
// memoryAlertManager implements the AlertManager interface and only keeps
|
|
// state in memory.
|
|
type memoryAlertManager struct {
|
|
// The minimum interval for alert refreshes before being purged.
|
|
minRefreshInterval time.Duration
|
|
// Inhibitor for filtering out inhibited alerts.
|
|
inhibitor *Inhibitor
|
|
// Silencer for filtering out silenced alerts.
|
|
silencer *Silencer
|
|
// Notifier for dispatching notifications.
|
|
notifier Notifier
|
|
|
|
// Mutex protecting all fields below.
|
|
mu sync.Mutex
|
|
// Currently loaded set of AggregationRules.
|
|
rules AggregationRules
|
|
// Main AlertAggregates index by fingerprint.
|
|
aggregates map[AlertFingerprint]*AlertAggregate
|
|
// Secondary AlertAggregates index by LastRefreshed time.
|
|
aggregatesByLastRefreshed aggregatesByLastRefreshed
|
|
// Secondary AlertAggregates index by NextNotification time.
|
|
aggregatesByNextNotification aggregatesByNextNotification
|
|
// Cache of the last result of computing uninhibited/unsilenced alerts.
|
|
filteredAlerts AlertLabelSets
|
|
// Tracks whether a change has occurred that requires a recomputation of
|
|
// notification outputs.
|
|
needsNotificationRefresh bool
|
|
}
|
|
|
|
// Options for constructing a memoryAlertManager.
|
|
type MemoryAlertManagerOptions struct {
|
|
// Inhibitor for filtering out inhibited alerts.
|
|
Inhibitor *Inhibitor
|
|
// Silencer for filtering out silenced alerts.
|
|
Silencer *Silencer
|
|
// Notifier for dispatching notifications.
|
|
Notifier Notifier
|
|
// The minimum interval for alert refreshes before being purged.
|
|
MinRefreshInterval time.Duration
|
|
}
|
|
|
|
// Constructs a new memoryAlertManager.
|
|
func NewMemoryAlertManager(o *MemoryAlertManagerOptions) AlertManager {
|
|
return &memoryAlertManager{
|
|
aggregates: make(map[AlertFingerprint]*AlertAggregate),
|
|
|
|
minRefreshInterval: o.MinRefreshInterval,
|
|
inhibitor: o.Inhibitor,
|
|
silencer: o.Silencer,
|
|
notifier: o.Notifier,
|
|
}
|
|
}
|
|
|
|
// Receive and ingest a new list of alert messages (e.g. from the web API).
|
|
func (s *memoryAlertManager) Receive(as Alerts) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
for _, a := range as {
|
|
s.ingest(a)
|
|
}
|
|
}
|
|
|
|
// Ingests an alert into the memoryAlertManager and creates a new
|
|
// AggregationInstance for it, if necessary.
|
|
func (s *memoryAlertManager) ingest(a *Alert) {
|
|
fp := a.Fingerprint()
|
|
agg, ok := s.aggregates[fp]
|
|
if !ok {
|
|
agg = &AlertAggregate{
|
|
Created: time.Now(),
|
|
}
|
|
agg.Ingest(a)
|
|
|
|
for _, r := range s.rules {
|
|
if r.Handles(agg.Alert) {
|
|
agg.Rule = r
|
|
break
|
|
}
|
|
}
|
|
|
|
s.aggregates[fp] = agg
|
|
heap.Push(&s.aggregatesByLastRefreshed, agg)
|
|
heap.Push(&s.aggregatesByNextNotification, agg)
|
|
|
|
s.needsNotificationRefresh = true
|
|
} else {
|
|
agg.Ingest(a)
|
|
heap.Init(&s.aggregatesByLastRefreshed)
|
|
}
|
|
}
|
|
|
|
// Get all AlertAggregates that match a given set of Filters.
|
|
func (s memoryAlertManager) GetAll(f Filters) AlertAggregates {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
aggs := make(AlertAggregates, 0, len(s.aggregates))
|
|
for _, agg := range s.aggregates {
|
|
if f.Handles(agg.Alert.Labels) {
|
|
// Make a deep copy of the AggregationRule so we can safely pass it to the
|
|
// outside.
|
|
aggCopy := *agg
|
|
if agg.Rule != nil {
|
|
rule := *agg.Rule
|
|
aggCopy.Rule = &rule
|
|
}
|
|
alert := *agg.Alert
|
|
aggCopy.Alert = &alert
|
|
|
|
aggs = append(aggs, &aggCopy)
|
|
}
|
|
}
|
|
return aggs
|
|
}
|
|
|
|
// Replace the current set of loaded AggregationRules by another.
|
|
func (s *memoryAlertManager) SetAggregationRules(rules AggregationRules) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
log.Infof("Replacing aggregator rules (old: %d, new: %d)...", len(s.rules), len(rules))
|
|
s.rules = rules
|
|
|
|
// Reassign AlertAggregates to the first new matching rule, set the rule to
|
|
// nil if there is no matching rule.
|
|
for _, agg := range s.aggregates {
|
|
agg.Rule = nil
|
|
|
|
for _, r := range s.rules {
|
|
if r.Handles(agg.Alert) {
|
|
agg.Rule = r
|
|
agg.NextNotification = agg.LastNotification.Add(r.RepeatRate)
|
|
break
|
|
}
|
|
}
|
|
}
|
|
heap.Init(&s.aggregatesByNextNotification)
|
|
s.needsNotificationRefresh = true
|
|
}
|
|
|
|
// Check for any expired AlertAggregates and remove them from all indexes.
|
|
func (s *memoryAlertManager) removeExpiredAggregates() {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
// This loop is interrupted if either the heap is empty or only non-expired
|
|
// aggregates remain in the heap.
|
|
for {
|
|
if len(s.aggregatesByLastRefreshed.AlertAggregates) == 0 {
|
|
break
|
|
}
|
|
|
|
agg := heap.Pop(&s.aggregatesByLastRefreshed).(*AlertAggregate)
|
|
|
|
if time.Since(agg.LastRefreshed) > s.minRefreshInterval {
|
|
delete(s.aggregates, agg.Alert.Fingerprint())
|
|
s.notifier.QueueNotification(agg.Alert, notificationOpResolve, agg.Rule.NotificationConfigName)
|
|
s.needsNotificationRefresh = true
|
|
} else {
|
|
heap.Push(&s.aggregatesByLastRefreshed, agg)
|
|
break
|
|
}
|
|
}
|
|
|
|
if s.needsNotificationRefresh {
|
|
s.aggregatesByNextNotification.rebuildFrom(s.aggregatesByLastRefreshed.AlertAggregates)
|
|
}
|
|
}
|
|
|
|
// Check whether one of the filtered (uninhibited, unsilenced) alerts should
|
|
// trigger a new notification.
|
|
func (s *memoryAlertManager) checkNotificationRepeats() {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
now := time.Now()
|
|
f := s.filteredLabelSets(true)
|
|
for _, agg := range s.aggregatesByNextNotification.AlertAggregates {
|
|
for _, fl := range f {
|
|
if agg.Alert.Labels.Equal(fl) && agg.NextNotification.Before(now) {
|
|
s.needsNotificationRefresh = true
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Returns all active AlertLabelSets that are neither inhibited nor silenced.
|
|
func (s *memoryAlertManager) filteredLabelSets(useCache bool) AlertLabelSets {
|
|
if useCache && s.filteredAlerts != nil {
|
|
return s.filteredAlerts
|
|
}
|
|
|
|
l := make(AlertLabelSets, 0, len(s.aggregates))
|
|
for _, agg := range s.aggregates {
|
|
l = append(l, agg.Alert.Labels)
|
|
}
|
|
|
|
l = s.inhibitor.Filter(l)
|
|
s.filteredAlerts = s.silencer.Filter(l)
|
|
return s.filteredAlerts
|
|
}
|
|
|
|
// Recomputes all currently uninhibited/unsilenced alerts and queues
|
|
// notifications for them according to their RepeatRate.
|
|
func (s *memoryAlertManager) refreshNotifications() {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
s.needsNotificationRefresh = false
|
|
|
|
l := s.filteredLabelSets(false)
|
|
|
|
numSent := 0
|
|
for _, lb := range l {
|
|
agg := s.aggregates[lb.Fingerprint()]
|
|
if agg.NextNotification.After(time.Now()) {
|
|
continue
|
|
}
|
|
if agg.Rule != nil {
|
|
s.notifier.QueueNotification(agg.Alert, notificationOpTrigger, agg.Rule.NotificationConfigName)
|
|
agg.LastNotification = time.Now()
|
|
agg.NextNotification = agg.LastNotification.Add(agg.Rule.RepeatRate)
|
|
numSent++
|
|
}
|
|
}
|
|
if numSent > 0 {
|
|
log.Infof("Sent %d notifications", numSent)
|
|
heap.Init(&s.aggregatesByNextNotification)
|
|
}
|
|
}
|
|
|
|
// Reports whether a notification recomputation is required.
|
|
func (s *memoryAlertManager) refreshNeeded() (bool, []string) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
needsRefresh := false
|
|
reasons := []string{}
|
|
if s.needsNotificationRefresh {
|
|
needsRefresh = true
|
|
reasons = append(reasons, "active alerts have changed")
|
|
}
|
|
if s.inhibitor.HasChanged() {
|
|
needsRefresh = true
|
|
reasons = append(reasons, "inhibit rules have changed")
|
|
}
|
|
if s.silencer.HasChanged() {
|
|
needsRefresh = true
|
|
reasons = append(reasons, "silences have changed")
|
|
}
|
|
return needsRefresh, reasons
|
|
}
|
|
|
|
// Perform some cheap state sanity checks.
|
|
func (s *memoryAlertManager) checkSanity() {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
if len(s.aggregates) != len(s.aggregatesByLastRefreshed.AlertAggregates) {
|
|
panic("len(aggregates) != len(aggregatesByLastRefreshed)")
|
|
}
|
|
if len(s.aggregates) != len(s.aggregatesByNextNotification.AlertAggregates) {
|
|
panic("len(aggregates) != len(aggregatesByNextNotification)")
|
|
}
|
|
}
|
|
|
|
// Run a single memoryAlertManager iteration.
|
|
func (s *memoryAlertManager) runIteration() {
|
|
s.removeExpiredAggregates()
|
|
s.checkNotificationRepeats()
|
|
if refresh, reasons := s.refreshNeeded(); refresh {
|
|
log.Infof("Recomputing notification outputs (%s)", strings.Join(reasons, ", "))
|
|
s.refreshNotifications()
|
|
}
|
|
}
|
|
|
|
// Run the memoryAlertManager's main dispatcher loop.
|
|
func (s *memoryAlertManager) Run() {
|
|
iterationTicker := time.NewTicker(time.Second)
|
|
for range iterationTicker.C {
|
|
s.checkSanity()
|
|
s.runIteration()
|
|
}
|
|
}
|