Major rewrite of alertmanager, adding inhibit support.

Change-Id: If11f3aec70ba2ac816b9b824a387ffdd2e51790f
Julius Volz 2013-08-27 15:32:08 +02:00
parent 7c21fb1f5a
commit be0e958d8f
25 changed files with 1194 additions and 507 deletions


@ -24,6 +24,8 @@ import (
"github.com/prometheus/alertmanager/manager"
)
const minimumRepeatRate = 1 * time.Minute
// Config encapsulates the configuration of an Alert Manager instance. It wraps
// the raw configuration protocol buffer to be able to add custom methods to
// it.
@ -80,19 +82,42 @@ func (c Config) Validate() error {
return nil
}
// Rules returns all the AggregationRules in a Config object.
func filtersFromPb(filters []*pb.Filter) manager.Filters {
fs := make(manager.Filters, 0, len(filters))
for _, f := range filters {
fs = append(fs, manager.NewFilter(f.GetNameRe(), f.GetValueRe()))
}
return fs
}
// AggregationRules returns all the AggregationRules in a Config object.
func (c Config) AggregationRules() manager.AggregationRules {
rules := make(manager.AggregationRules, 0, len(c.AggregationRule))
for _, r := range c.AggregationRule {
filters := make(manager.Filters, 0, len(r.Filter))
for _, filter := range r.Filter {
filters = append(filters, manager.NewFilter(filter.GetNameRe(), filter.GetValueRe()))
rate := time.Duration(r.GetRepeatRateSeconds()) * time.Second
if rate < minimumRepeatRate {
rate = minimumRepeatRate
}
rules = append(rules, &manager.AggregationRule{
Filters: filters,
RepeatRate: time.Duration(r.GetRepeatRateSeconds()) * time.Second,
Filters: filtersFromPb(r.Filter),
RepeatRate: rate,
NotificationConfigName: r.GetNotificationConfigName(),
})
}
return rules
}
// InhibitRules returns all the InhibitRules in a Config object.
func (c Config) InhibitRules() manager.InhibitRules {
rules := make(manager.InhibitRules, 0, len(c.InhibitRule))
for _, r := range c.InhibitRule {
sFilters := filtersFromPb(r.SourceFilter)
tFilters := filtersFromPb(r.TargetFilter)
rules = append(rules, &manager.InhibitRule{
SourceFilters: sFilters,
TargetFilters: tFilters,
MatchOn: r.MatchOn,
})
}
return rules
}


@ -55,10 +55,85 @@ message AggregationRule {
optional string notification_config_name = 3;
}
// An InhibitRule specifies that a class of (source) alerts should inhibit
// notifications for another class of (target) alerts if all specified matching
// labels are equal between the two alerts. This may be used to inhibit alerts
// from sending notifications if their meaning is logically a subset of a
// higher-level alert.
//
// For example, if an entire job is down, there is little sense in sending a
// notification for every single instance of said job being down. This could be
// expressed as the following inhibit rule:
//
// inhibit_rule {
// # Select all source alerts that are candidates for being inhibitors. All
// # supplied source filters have to match in order to select a source alert.
// source_filter: {
// name_re: "alertname"
// value_re: "JobDown"
// }
// source_filter: {
// name_re: "service"
// value_re: "api"
// }
//
// # Select all target alerts that are candidates for being inhibited. All
// # supplied target filters have to match in order to select a target alert.
// target_filter: {
// name_re: "alertname"
// value_re: "InstanceDown"
// }
// target_filter: {
// name_re: "service"
// value_re: "api"
// }
//
// # A source alert only actually inhibits a target alert if they match on
// # these labels. I.e. the alerts need to fire for the same job in the same
// # zone for the inhibition to take effect between them.
// match_on: "job"
// match_on: "zone"
// }
//
// In this example, when JobDown is firing for
//
// JobDown{zone="aa",job="test",service="api"}
//
// ...it would inhibit an InstanceDown alert for
//
// InstanceDown{zone="aa",job="test",instance="1",service="api"}
//
// However, an InstanceDown alert for another zone:
//
// {zone="ab",job="test",instance="1",service="api"}
//
// ...would still fire.
message InhibitRule {
// The set of Filters which define the group of source alerts (which inhibit
// the target alerts).
repeated Filter source_filter = 1;
// The set of Filters which define the group of target alerts (which are
// inhibited by the source alerts).
repeated Filter target_filter = 2;
// A set of label names whose label values need to be identical in source and
// target alerts in order for the inhibition to take effect.
repeated string match_on = 3;
// How many seconds to wait for a corresponding inhibit source alert to
// appear before sending any notifications for active target alerts.
// TODO(julius): Not supported yet. Implement this!
// optional int32 before_allowance = 4 [default = 0];
// How many seconds to wait after a corresponding inhibit source alert
// disappears before sending any notifications for active target alerts.
// TODO(julius): Not supported yet. Implement this!
// optional int32 after_allowance = 5 [default = 0];
}
// Global alert manager configuration.
message AlertManagerConfig {
// Aggregation rule definitions.
repeated AggregationRule aggregation_rule = 1;
// Notification configuration definitions.
repeated NotificationConfig notification_config = 2;
// List of alert inhibition rules.
repeated InhibitRule inhibit_rule = 3;
}
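
For orientation, here is a minimal illustrative configuration in the protocol buffer text format used by the inhibit_rule example above. It is a sketch, not a verified sample config: the inhibit_rule shape follows the message in this file, while the filter, repeat_rate_seconds, name and email_config field names are inferred from the generated Go accessors used elsewhere in this commit.

aggregation_rule {
  filter {
    name_re: "service"
    value_re: "api"
  }
  repeat_rate_seconds: 3600
  notification_config_name: "api_team"
}

notification_config {
  name: "api_team"
  email_config {
    email: "api-team@example.org"
  }
}

inhibit_rule {
  source_filter {
    name_re: "alertname"
    value_re: "JobDown"
  }
  target_filter {
    name_re: "alertname"
    value_re: "InstanceDown"
  }
  match_on: "job"
  match_on: "zone"
}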


@ -135,9 +135,42 @@ func (m *AggregationRule) GetNotificationConfigName() string {
return ""
}
type InhibitRule struct {
SourceFilter []*Filter `protobuf:"bytes,1,rep,name=source_filter" json:"source_filter,omitempty"`
TargetFilter []*Filter `protobuf:"bytes,2,rep,name=target_filter" json:"target_filter,omitempty"`
MatchOn []string `protobuf:"bytes,3,rep,name=match_on" json:"match_on,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *InhibitRule) Reset() { *m = InhibitRule{} }
func (m *InhibitRule) String() string { return proto.CompactTextString(m) }
func (*InhibitRule) ProtoMessage() {}
func (m *InhibitRule) GetSourceFilter() []*Filter {
if m != nil {
return m.SourceFilter
}
return nil
}
func (m *InhibitRule) GetTargetFilter() []*Filter {
if m != nil {
return m.TargetFilter
}
return nil
}
func (m *InhibitRule) GetMatchOn() []string {
if m != nil {
return m.MatchOn
}
return nil
}
type AlertManagerConfig struct {
AggregationRule []*AggregationRule `protobuf:"bytes,1,rep,name=aggregation_rule" json:"aggregation_rule,omitempty"`
NotificationConfig []*NotificationConfig `protobuf:"bytes,2,rep,name=notification_config" json:"notification_config,omitempty"`
InhibitRule []*InhibitRule `protobuf:"bytes,3,rep,name=inhibit_rule" json:"inhibit_rule,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -159,5 +192,12 @@ func (m *AlertManagerConfig) GetNotificationConfig() []*NotificationConfig {
return nil
}
func (m *AlertManagerConfig) GetInhibitRule() []*InhibitRule {
if m != nil {
return m.InhibitRule
}
return nil
}
func init() {
}


@ -15,9 +15,9 @@ package config
import (
"io/ioutil"
"log"
"code.google.com/p/goprotobuf/proto"
"github.com/golang/glog"
pb "github.com/prometheus/alertmanager/config/generated"
)
@ -46,7 +46,7 @@ func LoadFromFile(fileName string) (Config, error) {
func MustLoadFromFile(fileName string) Config {
conf, err := LoadFromFile(fileName)
if err != nil {
log.Fatalf("Error loading configuration from %s: %s", fileName, err)
glog.Fatalf("Error loading configuration from %s: %s", fileName, err)
}
return conf
}


@ -14,8 +14,7 @@
package config
import (
"log"
"github.com/golang/glog"
"github.com/howeyc/fsnotify"
)
@ -38,25 +37,25 @@ func NewFileWatcher(fileName string) *fileWatcher {
func (w *fileWatcher) Watch(cb ReloadCallback) {
watcher, err := fsnotify.NewWatcher()
if err != nil {
log.Fatal(err)
glog.Fatal(err)
}
err = watcher.WatchFlags(w.fileName, fsnotify.FSN_MODIFY)
if err != nil {
log.Fatal(err)
glog.Fatal(err)
}
for {
select {
case ev := <-watcher.Event:
log.Printf("Config file changed (%s), attempting reload", ev)
glog.Infof("Config file changed (%s), attempting reload", ev)
conf, err := LoadFromFile(w.fileName)
if err != nil {
log.Println("Error loading new config:", err)
glog.Error("Error loading new config: ", err)
configLoads.Increment(map[string]string{"outcome": "failure"})
} else {
cb(&conf)
log.Println("Config reloaded successfully")
glog.Info("Config reloaded successfully")
configLoads.Increment(map[string]string{"outcome": "success"})
}
// Re-add the file watcher since it can get lost on some changes. E.g.
@ -64,7 +63,7 @@ func (w *fileWatcher) Watch(cb ReloadCallback) {
// sequence, after which the newly written file is no longer watched.
err = watcher.WatchFlags(w.fileName, fsnotify.FSN_MODIFY)
case err := <-watcher.Error:
log.Println("Error watching config:", err)
glog.Error("Error watching config: ", err)
}
}
}

main.go

@ -15,10 +15,11 @@ package main
import (
"flag"
"log"
"os"
"time"
"github.com/golang/glog"
"github.com/prometheus/alertmanager/config"
"github.com/prometheus/alertmanager/manager"
"github.com/prometheus/alertmanager/web"
@ -26,8 +27,9 @@ import (
)
var (
configFile = flag.String("configFile", "alertmanager.conf", "Alert Manager configuration file name.")
silencesFile = flag.String("silencesFile", "silences.json", "Silence storage file name.")
configFile = flag.String("configFile", "alertmanager.conf", "Alert Manager configuration file name.")
silencesFile = flag.String("silencesFile", "silences.json", "Silence storage file name.")
minRefreshPeriod = flag.Duration("minRefreshPeriod", 5*time.Minute, "Minimum required alert refresh period before an alert is purged.")
)
func main() {
@ -42,13 +44,13 @@ func main() {
err := silencer.LoadFromFile(*silencesFile)
if err != nil {
log.Println("Couldn't load silences, starting up with empty silence list:", err)
glog.Warning("Couldn't load silences, starting up with empty silence list: ", err)
}
saveSilencesTicker := time.NewTicker(10 * time.Second)
go func() {
for _ = range saveSilencesTicker.C {
if err := silencer.SaveToFile(*silencesFile); err != nil {
log.Println("Error saving silences to file:", err)
glog.Error("Error saving silences to file: ", err)
}
}
}()
@ -57,9 +59,20 @@ func main() {
notifier := manager.NewNotifier(conf.NotificationConfig)
defer notifier.Close()
aggregator := manager.NewAggregator(notifier)
defer aggregator.Close()
inhibitor := new(manager.Inhibitor)
inhibitor.SetInhibitRules(conf.InhibitRules())
options := &manager.MemoryAlertManagerOptions{
Inhibitor: inhibitor,
Silencer: silencer,
Notifier: notifier,
MinRefreshInterval: *minRefreshPeriod,
}
alertManager := manager.NewMemoryAlertManager(options)
alertManager.SetAggregationRules(conf.AggregationRules())
go alertManager.Run()
// Web initialization.
flags := map[string]string{}
flag.VisitAll(func(f *flag.Flag) {
flags[f.Name] = f.Value.String()
@ -75,14 +88,14 @@ func main() {
webService := &web.WebService{
// REST API Service.
AlertManagerService: &api.AlertManagerService{
Aggregator: aggregator,
Silencer: silencer,
Manager: alertManager,
Silencer: silencer,
},
// Template-based page handlers.
AlertsHandler: &web.AlertsHandler{
Aggregator: aggregator,
IsInhibitedInterrogator: silencer,
Manager: alertManager,
IsSilencedInterrogator: silencer,
},
SilencesHandler: &web.SilencesHandler{
Silencer: silencer,
@ -91,15 +104,15 @@ func main() {
}
go webService.ServeForever()
aggregator.SetRules(conf.AggregationRules())
// React to configuration changes.
watcher := config.NewFileWatcher(*configFile)
go watcher.Watch(func(conf *config.Config) {
inhibitor.SetInhibitRules(conf.InhibitRules())
notifier.SetNotificationConfigs(conf.NotificationConfig)
aggregator.SetRules(conf.AggregationRules())
alertManager.SetAggregationRules(conf.AggregationRules())
statusHandler.UpdateConfig(conf.String())
})
log.Println("Running summary dispatcher...")
notifier.Dispatch(silencer)
glog.Info("Running notification dispatcher...")
notifier.Dispatch()
}


@ -1,215 +0,0 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package manager
import (
"errors"
"log"
"sync"
"time"
)
const (
minimumRepeatRate = 5 * time.Minute
minimumRefreshPeriod = 5 * time.Minute
notificationRetryPeriod = 1 * time.Minute
)
// AggregationRule creates and manages the scope for received events.
type AggregationRule struct {
Filters Filters
RepeatRate time.Duration
NotificationConfigName string
}
type AggregationInstances []*AggregationInstance
type AggregationInstance struct {
Rule *AggregationRule
Event *Event
// When was this AggregationInstance created?
Created time.Time
// When was the last refresh received into this AggregationInstance?
LastRefreshed time.Time
// When was the last successful notification sent out for this
// AggregationInstance?
lastNotificationSent time.Time
// Timer used to trigger a notification retry/resend.
notificationResendTimer *time.Timer
// Timer used to trigger the deletion of the AggregationInstance after it
// hasn't been refreshed for too long.
expiryTimer *time.Timer
}
func (r *AggregationRule) Handles(e *Event) bool {
return r.Filters.Handles(e)
}
func (r *AggregationInstance) Ingest(e *Event) {
r.Event = e
r.LastRefreshed = time.Now()
r.expiryTimer.Reset(minimumRefreshPeriod)
}
func (r *AggregationInstance) SendNotification(n Notifier) {
if time.Since(r.lastNotificationSent) < r.Rule.RepeatRate {
return
}
err := n.QueueNotification(r.Event, r.Rule.NotificationConfigName)
if err != nil {
// BUG: Limit the number of retries.
log.Printf("Error while sending notification: %s, retrying in %v", err, notificationRetryPeriod)
r.resendNotificationAfter(notificationRetryPeriod, n)
return
}
r.resendNotificationAfter(r.Rule.RepeatRate, n)
r.lastNotificationSent = time.Now()
}
func (r *AggregationInstance) resendNotificationAfter(d time.Duration, n Notifier) {
r.notificationResendTimer = time.AfterFunc(d, func() {
r.SendNotification(n)
})
}
func (r *AggregationInstance) Close() {
if r.notificationResendTimer != nil {
r.notificationResendTimer.Stop()
}
if r.expiryTimer != nil {
r.expiryTimer.Stop()
}
}
type AggregationRules []*AggregationRule
type Aggregator struct {
Rules AggregationRules
Aggregates map[EventFingerprint]*AggregationInstance
Notifier Notifier
// Mutex to protect the above.
mu sync.Mutex
}
func NewAggregator(n Notifier) *Aggregator {
return &Aggregator{
Aggregates: make(map[EventFingerprint]*AggregationInstance),
Notifier: n,
}
}
func (a *Aggregator) Close() {
a.mu.Lock()
defer a.mu.Unlock()
for _, agg := range a.Aggregates {
agg.Close()
}
}
func (a *Aggregator) Receive(events Events) error {
a.mu.Lock()
defer a.mu.Unlock()
if len(a.Rules) == 0 {
return errors.New("No aggregation rules")
}
for _, e := range events {
for _, r := range a.Rules {
if r.Handles(e) {
fp := e.Fingerprint()
aggregation, ok := a.Aggregates[fp]
if !ok {
expTimer := time.AfterFunc(minimumRefreshPeriod, func() {
a.mu.Lock()
defer a.mu.Unlock()
a.removeAggregate(fp)
})
aggregation = &AggregationInstance{
Rule: r,
Created: time.Now(),
expiryTimer: expTimer,
}
a.Aggregates[fp] = aggregation
}
aggregation.Ingest(e)
aggregation.SendNotification(a.Notifier)
break
}
}
}
return nil
}
func (a *Aggregator) SetRules(rules AggregationRules) {
a.mu.Lock()
defer a.mu.Unlock()
log.Println("Replacing", len(rules), "aggregator rules...")
for _, rule := range rules {
if rule.RepeatRate < minimumRepeatRate {
log.Println("Rule repeat rate too low, setting to minimum value")
rule.RepeatRate = minimumRepeatRate
}
}
a.Rules = rules
// Reparent AggregationInstances to the first new matching rule, drop orphans
// that are not matched by any rule anymore. Expiry and notification resend
// timers are left untouched for reparented alerts, meaning that the last
// rule's RepeatRate needs to pass once before the new one is used.
for fp, agg := range a.Aggregates {
orphaned := true
for _, r := range a.Rules {
if r.Handles(agg.Event) {
agg.Rule = r
orphaned = false
break
}
}
if orphaned {
a.removeAggregate(fp)
}
}
}
func (a *Aggregator) AlertAggregates() AggregationInstances {
a.mu.Lock()
defer a.mu.Unlock()
aggs := make(AggregationInstances, 0, len(a.Aggregates))
for _, agg := range a.Aggregates {
aggs = append(aggs, agg)
}
return aggs
}
func (a *Aggregator) removeAggregate(fp EventFingerprint) {
log.Println("Deleting expired aggregation instance", a)
a.Aggregates[fp].Close()
delete(a.Aggregates, fp)
}


@ -1,129 +0,0 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package manager
import (
"testing"
pb "github.com/prometheus/alertmanager/config/generated"
)
type dummyNotifier struct{}
func (d *dummyNotifier) QueueNotification(*Event, string) error {
return nil
}
func (d *dummyNotifier) SetNotificationConfigs([]*pb.NotificationConfig) {}
func (d *dummyNotifier) Dispatch(IsInhibitedInterrogator) {}
func (d *dummyNotifier) Close() {}
type testAggregatorScenario struct {
rules AggregationRules
inMatch Events
inNoMatch Events
}
func (s *testAggregatorScenario) test(i int, t *testing.T) {
a := NewAggregator(&dummyNotifier{})
a.SetRules(s.rules)
if len(s.inMatch) > 0 {
err := a.Receive(s.inMatch)
if err != nil {
t.Fatalf("%d. Expected input %v to match, got error: %s", i, s.inMatch, err)
}
}
if len(s.inNoMatch) > 0 {
err := a.Receive(s.inNoMatch)
// BUG: we need to define more clearly what should happen if a subset of
// events doesn't match. Right now we only return an error if no rules
// are configured.
if len(s.rules) == 0 && err == nil {
t.Fatalf("%d. Expected aggregation error when no rules are set", i)
}
}
aggs := a.AlertAggregates()
if len(aggs) != len(s.inMatch) {
t.Fatalf("%d. Expected %d aggregates, got %d", i, len(s.inMatch), len(aggs))
}
for j, agg := range aggs {
ev := s.inMatch[j]
if len(agg.Event.Labels) != len(ev.Labels) {
t.Fatalf("%d.%d. Expected %d labels, got %d", i, j, len(ev.Labels), len(agg.Event.Labels))
}
for l, v := range agg.Event.Labels {
if ev.Labels[l] != v {
t.Fatalf("%d.%d. Expected label %s=%s, got %s=%s", l, ev.Labels[l], l, v)
}
}
}
a.Close()
}
func TestAggregator(t *testing.T) {
scenarios := []testAggregatorScenario{
{
// No rules, one event.
inNoMatch: Events{
&Event{
Labels: map[string]string{
"foo": "bar",
},
},
},
},
{
// One rule, two matching events, one non-matching.
rules: AggregationRules{
&AggregationRule{
Filters: Filters{NewFilter("service", "test(-)?service")},
},
},
inMatch: Events{
&Event{
Labels: map[string]string{
"service": "testservice",
"foo": "bar",
},
},
&Event{
Labels: map[string]string{
"service": "test-service",
"bar": "baz",
},
},
},
inNoMatch: Events{
&Event{
Labels: map[string]string{
"service": "testservice2",
"foo": "bar",
},
},
},
},
}
for i, scenario := range scenarios {
scenario.test(i, t)
}
}


@ -19,34 +19,42 @@ import (
"sort"
)
const EventNameLabel = "alertname"
const AlertNameLabel = "alertname"
type EventFingerprint uint64
type AlertFingerprint uint64
type EventLabels map[string]string
type EventPayload map[string]string
type AlertLabelSet map[string]string
type AlertLabelSets []AlertLabelSet
// Event models an action triggered by Prometheus.
type Event struct {
// Short summary of event.
type AlertPayload map[string]string
type Alerts []*Alert
// Alert models an action triggered by Prometheus.
type Alert struct {
// Short summary of alert.
Summary string
// Long description of event.
// Long description of alert.
Description string
// Label value pairs for purpose of aggregation, matching, and disposition
// dispatching. This must minimally include an "alertname" label.
Labels EventLabels
Labels AlertLabelSet
// Extra key/value information which is not used for aggregation.
Payload EventPayload
Payload AlertPayload
}
func (e Event) Name() string {
return e.Labels[EventNameLabel]
func (a *Alert) Name() string {
return a.Labels[AlertNameLabel]
}
func (e Event) Fingerprint() EventFingerprint {
func (a *Alert) Fingerprint() AlertFingerprint {
return a.Labels.Fingerprint()
}
func (l AlertLabelSet) Fingerprint() AlertFingerprint {
keys := []string{}
for k := range e.Labels {
for k := range l {
keys = append(keys, k)
}
@ -56,10 +64,29 @@ func (e Event) Fingerprint() EventFingerprint {
separator := string([]byte{0})
for _, k := range keys {
fmt.Fprintf(summer, "%s%s%s%s", k, separator, e.Labels[k], separator)
fmt.Fprintf(summer, "%s%s%s%s", k, separator, l[k], separator)
}
return EventFingerprint(summer.Sum64())
return AlertFingerprint(summer.Sum64())
}
type Events []*Event
func (l AlertLabelSet) Equal(o AlertLabelSet) bool {
if len(l) != len(o) {
return false
}
for k, v := range l {
if o[k] != v {
return false
}
}
return true
}
func (l AlertLabelSet) MatchOnLabels(o AlertLabelSet, labels []string) bool {
for _, k := range labels {
if l[k] != o[k] {
return false
}
}
return true
}
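
To make the new label-set helpers concrete, here is a small standalone sketch (not part of this commit) showing how Fingerprint, Equal and MatchOnLabels behave; it only uses identifiers introduced above, and the label values are invented.

package main

import (
	"fmt"

	"github.com/prometheus/alertmanager/manager"
)

func main() {
	a := manager.AlertLabelSet{"alertname": "InstanceDown", "job": "testjob", "zone": "aa"}
	b := manager.AlertLabelSet{"zone": "aa", "job": "testjob", "alertname": "InstanceDown"}

	// Fingerprints are computed over the sorted label names, so equal label
	// sets always produce the same AlertFingerprint.
	fmt.Println(a.Fingerprint() == b.Fingerprint()) // true
	fmt.Println(a.Equal(b))                         // true

	// MatchOnLabels compares only the listed label names; all other labels
	// (here: alertname) may differ.
	c := manager.AlertLabelSet{"alertname": "JobDown", "job": "testjob", "zone": "aa"}
	fmt.Println(a.MatchOnLabels(c, []string{"job", "zone"})) // true
}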


@ -43,8 +43,8 @@ func NewFilter(namePattern string, valuePattern string) *Filter {
}
}
func (f *Filter) Handles(e *Event) bool {
for k, v := range e.Labels {
func (f *Filter) Handles(l AlertLabelSet) bool {
for k, v := range l {
if f.Name.MatchString(k) && f.Value.MatchString(v) {
return true
}
@ -53,12 +53,12 @@ func (f *Filter) Handles(e *Event) bool {
return false
}
func (f Filters) Handles(e *Event) bool {
func (f Filters) Handles(l AlertLabelSet) bool {
fCount := len(f)
fMatch := 0
for _, filter := range f {
if filter.Handles(e) {
if filter.Handles(l) {
fMatch++
}
}
@ -66,6 +66,16 @@ func (f Filters) Handles(e *Event) bool {
return fCount == fMatch
}
func (f Filters) Filter(l AlertLabelSets) AlertLabelSets {
out := AlertLabelSets{}
for _, labels := range l {
if f.Handles(labels) {
out = append(out, labels)
}
}
return out
}
func (f Filters) fingerprint() uint64 {
summer := fnv.New64a()

manager/helpers_test.go (new file)

@ -0,0 +1,48 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package manager
import (
"sort"
"testing"
)
type alertLabelSetsByFingerprint AlertLabelSets
func (a alertLabelSetsByFingerprint) Len() int {
return len(a)
}
func (a alertLabelSetsByFingerprint) Less(i, j int) bool {
return a[i].Fingerprint() < a[j].Fingerprint()
}
func (a alertLabelSetsByFingerprint) Swap(i, j int) {
a[i], a[j] = a[j], a[i]
}
func labelSetsMustBeEqual(i int, t *testing.T, expected, actual AlertLabelSets) {
if len(actual) != len(expected) {
t.Fatalf("%d. Expected %d labelsets, got %d", i, len(expected), len(actual))
}
sort.Sort(alertLabelSetsByFingerprint(expected))
sort.Sort(alertLabelSetsByFingerprint(actual))
for j, l := range expected {
if !l.Equal(actual[j]) {
t.Fatalf("%d. Expected %v, got %v", i, l, actual[j])
}
}
}

manager/inhibitor.go (new file)

@ -0,0 +1,104 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package manager
import (
"sync"
_ "github.com/prometheus/alertmanager/config/generated"
)
type InhibitRules []*InhibitRule
type InhibitRule struct {
SourceFilters Filters
TargetFilters Filters
MatchOn []string
}
// Returns those target AlertLabelSets which are not inhibited by any of the
// source AlertLabelSets.
func (i *InhibitRule) Filter(s AlertLabelSets, t AlertLabelSets) AlertLabelSets {
s = i.SourceFilters.Filter(s)
out := AlertLabelSets{}
for _, tl := range t {
inhibited := false
if i.TargetFilters.Handles(tl) {
for _, sl := range s {
if tl.MatchOnLabels(sl, i.MatchOn) {
inhibited = true
break
}
}
}
if !inhibited {
out = append(out, tl)
}
}
return out
}
// Inhibitor applies inhibition rules to its labelset inputs and only
// emits uninhibited alert labelsets.
type Inhibitor struct {
mu sync.Mutex
inhibitRules InhibitRules
dirty bool
}
// Replaces the current InhibitRules with a new set.
func (i *Inhibitor) SetInhibitRules(r InhibitRules) {
i.mu.Lock()
defer i.mu.Unlock()
i.inhibitRules = r
i.dirty = true
}
// Returns those AlertLabelSets which are not inhibited by any other
// AlertLabelSet in the provided list.
func (i *Inhibitor) Filter(l AlertLabelSets) AlertLabelSets {
i.mu.Lock()
defer i.mu.Unlock()
out := l
for _, r := range i.inhibitRules {
out = r.Filter(l, out)
}
return out
}
// Returns whether a given AlertLabelSet is inhibited by a group of other
// AlertLabelSets.
func (i *Inhibitor) IsInhibited(t AlertLabelSet, l AlertLabelSets) bool {
i.mu.Lock()
defer i.mu.Unlock()
for _, r := range i.inhibitRules {
if len(r.Filter(l, AlertLabelSets{t})) != 1 {
return true
}
}
return false
}
// Returns whether the inhibit rules have changed since the last call to HasChanged.
func (i *Inhibitor) HasChanged() bool {
i.mu.Lock()
defer i.mu.Unlock()
dirty := i.dirty
i.dirty = false
return dirty
}
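
As a usage illustration (again not part of the commit), a minimal program that wires up the new Inhibitor with the JobDown/InstanceDown rule from the config comment; only types and functions introduced in this diff are used, and the label values are invented.

package main

import (
	"fmt"

	"github.com/prometheus/alertmanager/manager"
)

func main() {
	rule := &manager.InhibitRule{
		SourceFilters: manager.Filters{manager.NewFilter("alertname", "JobDown")},
		TargetFilters: manager.Filters{manager.NewFilter("alertname", "InstanceDown")},
		MatchOn:       []string{"job", "zone"},
	}

	inhibitor := new(manager.Inhibitor)
	inhibitor.SetInhibitRules(manager.InhibitRules{rule})

	active := manager.AlertLabelSets{
		manager.AlertLabelSet{"alertname": "JobDown", "job": "testjob", "zone": "aa"},
		manager.AlertLabelSet{"alertname": "InstanceDown", "job": "testjob", "zone": "aa", "instance": "1"},
		manager.AlertLabelSet{"alertname": "InstanceDown", "job": "testjob", "zone": "ab", "instance": "1"},
	}

	// The zone="aa" InstanceDown is dropped because the JobDown alert matches
	// it on both job and zone; the JobDown itself and the zone="ab"
	// InstanceDown pass through.
	fmt.Println(inhibitor.Filter(active))
}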

manager/inhibitor_test.go (new file)

@ -0,0 +1,231 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package manager
import (
"testing"
)
type testInhibitorScenario struct {
rules InhibitRules
inhibited AlertLabelSets
uninhibited AlertLabelSets
}
func (s *testInhibitorScenario) test(i int, t *testing.T) {
allLabelSets := append(s.inhibited, s.uninhibited...)
// Set the inhibit rules to an empty list.
inhibitor := new(Inhibitor)
filtered := inhibitor.Filter(allLabelSets)
labelSetsMustBeEqual(i, t, allLabelSets, filtered)
// Add inhibit rules through SetInhibitRules().
inhibitor.SetInhibitRules(s.rules)
filtered = inhibitor.Filter(allLabelSets)
labelSetsMustBeEqual(i, t, s.uninhibited, filtered)
}
func TestInhibitor(t *testing.T) {
scenarios := []testInhibitorScenario{
// No rules.
{
uninhibited: AlertLabelSets{
AlertLabelSet{
"alertname": "InstanceDown",
"instance": "1",
"job": "testjob",
},
AlertLabelSet{
"alertname": "InstanceDown",
"instance": "2",
"job": "testjob",
},
AlertLabelSet{
"alertname": "JobDown",
"job": "testinstance",
},
},
},
// One rule not matching anything.
{
rules: InhibitRules{
&InhibitRule{
SourceFilters: Filters{
NewFilter("alertname", "OtherAlert"),
},
TargetFilters: Filters{
NewFilter("alertname", "OtherAlert2"),
},
MatchOn: []string{"job"},
},
},
uninhibited: AlertLabelSets{
AlertLabelSet{
"alertname": "InstanceDown",
"instance": "1",
"job": "testjob",
},
AlertLabelSet{
"alertname": "InstanceDown",
"instance": "2",
"job": "testjob",
},
AlertLabelSet{
"alertname": "JobDown",
"job": "testinstance",
},
},
},
// One rule matching source and target alerts, but those not matching on labels.
{
rules: InhibitRules{
&InhibitRule{
SourceFilters: Filters{
NewFilter("alertname", "JobDown"),
},
TargetFilters: Filters{
NewFilter("alertname", "InstanceDown"),
},
MatchOn: []string{"job", "zone"},
},
},
uninhibited: AlertLabelSets{
AlertLabelSet{
"alertname": "InstanceDown",
"instance": "1",
"job": "testjob",
"zone": "aa",
},
AlertLabelSet{
"alertname": "InstanceDown",
"instance": "2",
"job": "testjob",
"zone": "aa",
},
AlertLabelSet{
"alertname": "JobDown",
"job": "testinstance",
"zone": "ab",
},
},
},
// Two rules, various match behaviors.
{
rules: InhibitRules{
&InhibitRule{
SourceFilters: Filters{
NewFilter("alertname", "JobDown"),
},
TargetFilters: Filters{
NewFilter("alertname", "InstanceDown"),
},
MatchOn: []string{"job", "zone"},
},
&InhibitRule{
SourceFilters: Filters{
NewFilter("alertname", "EverythingDown"),
},
TargetFilters: Filters{
NewFilter("alertname", "JobDown"),
},
MatchOn: []string{"owner"},
},
},
uninhibited: AlertLabelSets{
AlertLabelSet{
"alertname": "JobDown",
"job": "testjob",
"zone": "aa",
},
AlertLabelSet{
"alertname": "JobDown",
"job": "testjob",
"zone": "ab",
},
},
inhibited: AlertLabelSets{
AlertLabelSet{
"alertname": "InstanceDown",
"instance": "1",
"job": "testjob",
"zone": "aa",
},
AlertLabelSet{
"alertname": "InstanceDown",
"instance": "2",
"job": "testjob",
"zone": "aa",
},
},
},
// Inhibited alert inhibiting another alert (ZoneDown => JobDown => InstanceDown).
{
rules: InhibitRules{
&InhibitRule{
SourceFilters: Filters{
NewFilter("alertname", "JobDown"),
},
TargetFilters: Filters{
NewFilter("alertname", "InstanceDown"),
},
MatchOn: []string{"job", "zone"},
},
&InhibitRule{
SourceFilters: Filters{
NewFilter("alertname", "ZoneDown"),
},
TargetFilters: Filters{
NewFilter("alertname", "JobDown"),
},
MatchOn: []string{"zone"},
},
},
uninhibited: AlertLabelSets{
AlertLabelSet{
"alertname": "ZoneDown",
"zone": "aa",
},
AlertLabelSet{
"alertname": "JobDown",
"job": "testjob",
"zone": "ab",
},
},
inhibited: AlertLabelSets{
AlertLabelSet{
"alertname": "JobDown",
"job": "testjob",
"zone": "aa",
},
AlertLabelSet{
"alertname": "InstanceDown",
"instance": "1",
"job": "testjob",
"zone": "aa",
},
AlertLabelSet{
"alertname": "InstanceDown",
"instance": "2",
"job": "testjob",
"zone": "aa",
},
},
},
}
for i, scenario := range scenarios {
scenario.test(i, t)
}
}

manager/manager.go (new file)

@ -0,0 +1,414 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package manager
import (
"container/heap"
"sort"
"strings"
"sync"
"time"
"github.com/golang/glog"
)
// AlertManager stores Alerts and removes them upon expiry.
type AlertManager interface {
// Ingests a new alert entry into the store. If an alert with the same
// fingerprint already exists, it only updates the existing entry's metadata.
Receive(Alerts)
// Retrieves all alerts from the store that match the provided Filters.
GetAll(Filters) AlertAggregates
// Sets the AggregationRules to associate with alerts.
SetAggregationRules(AggregationRules)
// Runs the AlertManager dispatcher loop.
Run()
}
type AggregationRules []*AggregationRule
// AggregationRule creates and manages the scope for received events.
type AggregationRule struct {
Filters Filters
RepeatRate time.Duration
NotificationConfigName string
}
// Returns whether a given AggregationRule matches an Alert.
func (r *AggregationRule) Handles(l *Alert) bool {
return r.Filters.Handles(l.Labels)
}
// An AlertAggregate tracks the latest alert received for a given alert
// fingerprint and some metadata about the alert.
type AlertAggregate struct {
Alert *Alert
Rule *AggregationRule
// When was this AlertAggregate created?
Created time.Time
// When was the last refresh received into this AlertAggregate?
LastRefreshed time.Time
// When was the last notification sent out for this AlertAggregate?
LastNotification time.Time
// When should the next notification be sent according to the current Rule's
// RepeatRate?
NextNotification time.Time
}
// Ingests a received Alert into this AlertAggregate and updates metadata.
func (agg *AlertAggregate) Ingest(a *Alert) {
agg.Alert = a
agg.LastRefreshed = time.Now()
}
type AlertAggregates []*AlertAggregate
// Helper type for managing a heap based on LastRefreshed time.
type aggregatesByLastRefreshed struct {
AlertAggregates
}
// Helper type for managing a heap based on NextNotification time.
type aggregatesByNextNotification struct {
AlertAggregates
}
// Methods implementing heap.Interface.
func (aggs AlertAggregates) Len() int {
return len(aggs)
}
func (aggs aggregatesByLastRefreshed) Less(i, j int) bool {
return aggs.AlertAggregates[i].LastRefreshed.Before(aggs.AlertAggregates[j].LastRefreshed)
}
func (aggs aggregatesByNextNotification) Less(i, j int) bool {
return aggs.AlertAggregates[i].NextNotification.Before(aggs.AlertAggregates[j].NextNotification)
}
func (aggs AlertAggregates) Swap(i, j int) {
aggs[i], aggs[j] = aggs[j], aggs[i]
}
func (aggs *AlertAggregates) Push(agg interface{}) {
*aggs = append(*aggs, agg.(*AlertAggregate))
}
func (aggs *AlertAggregates) Pop() interface{} {
old := *aggs
n := len(old)
item := old[n-1]
*aggs = old[:n-1]
return item
}
// memoryAlertManager implements the AlertManager interface and only keeps
// state in memory.
type memoryAlertManager struct {
// The minimum interval for alert refreshes before being purged.
minRefreshInterval time.Duration
// Inhibitor for filtering out inhibited alerts.
inhibitor *Inhibitor
// Silencer for filtering out silenced alerts.
silencer *Silencer
// Notifier for dispatching notifications.
notifier Notifier
// Mutex protecting all fields below.
mu sync.Mutex
// Currently loaded set of AggregationRules.
rules AggregationRules
// Main AlertAggregates index by fingerprint.
aggregates map[AlertFingerprint]*AlertAggregate
// Secondary AlertAggregates index by LastRefreshed time.
aggregatesByLastRefreshed aggregatesByLastRefreshed
// Secondary AlertAggregates index by NextNotification time.
aggregatesByNextNotification aggregatesByNextNotification
// Cache of the last result of computing uninhibited/unsilenced alerts.
filteredAlerts AlertLabelSets
// Tracks whether a change has occurred that requires a recomputation of
// notification outputs.
needsNotificationRefresh bool
}
// Options for constructing a memoryAlertManager.
type MemoryAlertManagerOptions struct {
// Inhibitor for filtering out inhibited alerts.
Inhibitor *Inhibitor
// Silencer for filtering out silenced alerts.
Silencer *Silencer
// Notifier for dispatching notifications.
Notifier Notifier
// The minimum interval for alert refreshes before being purged.
MinRefreshInterval time.Duration
}
// Constructs a new memoryAlertManager.
func NewMemoryAlertManager(o *MemoryAlertManagerOptions) AlertManager {
return &memoryAlertManager{
aggregates: make(map[AlertFingerprint]*AlertAggregate),
minRefreshInterval: o.MinRefreshInterval,
inhibitor: o.Inhibitor,
silencer: o.Silencer,
notifier: o.Notifier,
}
}
// Receive and ingest a new list of alert messages (e.g. from the web API).
func (s *memoryAlertManager) Receive(as Alerts) {
s.mu.Lock()
defer s.mu.Unlock()
for _, a := range as {
s.ingest(a)
}
}
// Ingests an alert into the memoryAlertManager and creates a new
// AlertAggregate for it, if necessary.
func (s *memoryAlertManager) ingest(a *Alert) {
fp := a.Fingerprint()
agg, ok := s.aggregates[fp]
if !ok {
agg = &AlertAggregate{
Created: time.Now(),
}
agg.Ingest(a)
for _, r := range s.rules {
if r.Handles(agg.Alert) {
agg.Rule = r
break
}
}
s.aggregates[fp] = agg
heap.Push(&s.aggregatesByLastRefreshed, agg)
heap.Push(&s.aggregatesByNextNotification, agg)
s.needsNotificationRefresh = true
} else {
agg.Ingest(a)
heap.Init(&s.aggregatesByLastRefreshed)
}
}
// Get all AlertAggregates that match a given set of Filters.
func (s *memoryAlertManager) GetAll(f Filters) AlertAggregates {
s.mu.Lock()
defer s.mu.Unlock()
aggs := make(AlertAggregates, 0, len(s.aggregates))
for _, agg := range s.aggregates {
if f.Handles(agg.Alert.Labels) {
// Make a deep copy of the AlertAggregate (including its Rule and Alert) so we
// can safely pass it to the outside.
aggCopy := *agg
if agg.Rule != nil {
rule := *agg.Rule
aggCopy.Rule = &rule
}
alert := *agg.Alert
aggCopy.Alert = &alert
aggs = append(aggs, &aggCopy)
}
}
return aggs
}
// Replace the current set of loaded AggregationRules by another.
func (s *memoryAlertManager) SetAggregationRules(rules AggregationRules) {
s.mu.Lock()
defer s.mu.Unlock()
glog.Infof("Replacing aggregator rules (old: %d, new: %d)...", len(s.rules), len(rules))
s.rules = rules
// Reassign AlertAggregates to the first new matching rule, set the rule to
// nil if there is no matching rule.
for _, agg := range s.aggregates {
agg.Rule = nil
for _, r := range s.rules {
if r.Handles(agg.Alert) {
agg.Rule = r
agg.NextNotification = agg.LastNotification.Add(r.RepeatRate)
break
}
}
}
heap.Init(&s.aggregatesByNextNotification)
s.needsNotificationRefresh = true
}
// Check for any expired AlertAggregates and remove them from all indexes.
func (s *memoryAlertManager) removeExpiredAggregates() {
s.mu.Lock()
defer s.mu.Unlock()
// This loop is interrupted if either the heap is empty or only non-expired
// aggregates remain in the heap.
for {
if len(s.aggregatesByLastRefreshed.AlertAggregates) == 0 {
return
}
agg := heap.Pop(&s.aggregatesByLastRefreshed).(*AlertAggregate)
if time.Since(agg.LastRefreshed) > s.minRefreshInterval {
delete(s.aggregates, agg.Alert.Fingerprint())
// Also remove the aggregate from the next-notification-time index.
n := len(s.aggregatesByNextNotification.AlertAggregates)
i := sort.Search(n, func(i int) bool {
return !agg.NextNotification.After(s.aggregatesByNextNotification.AlertAggregates[i].NextNotification)
})
if i == n {
panic("Missing alert aggregate in aggregatesByNextNotification index")
} else {
for j := i; j < n; j++ {
if s.aggregatesByNextNotification.AlertAggregates[j] == agg {
heap.Remove(&s.aggregatesByNextNotification, j)
break
}
}
}
s.needsNotificationRefresh = true
} else {
heap.Push(&s.aggregatesByLastRefreshed, agg)
return
}
}
}
// Check whether one of the filtered (uninhibited, unsilenced) alerts should
// trigger a new notification.
func (s *memoryAlertManager) checkNotificationRepeats() {
s.mu.Lock()
defer s.mu.Unlock()
now := time.Now()
f := s.filteredLabelSets(true)
for _, agg := range s.aggregatesByNextNotification.AlertAggregates {
for _, fl := range f {
if agg.Alert.Labels.Equal(fl) && agg.NextNotification.Before(now) {
s.needsNotificationRefresh = true
return
}
}
}
}
// Returns all active AlertLabelSets that are neither inhibited nor silenced.
func (s *memoryAlertManager) filteredLabelSets(useCache bool) AlertLabelSets {
if useCache && s.filteredAlerts != nil {
return s.filteredAlerts
}
l := make(AlertLabelSets, 0, len(s.aggregates))
for _, agg := range s.aggregates {
l = append(l, agg.Alert.Labels)
}
l = s.inhibitor.Filter(l)
s.filteredAlerts = s.silencer.Filter(l)
return s.filteredAlerts
}
// Recomputes all currently uninhibited/unsilenced alerts and queues
// notifications for them according to their RepeatRate.
func (s *memoryAlertManager) refreshNotifications() {
s.mu.Lock()
defer s.mu.Unlock()
s.needsNotificationRefresh = false
l := s.filteredLabelSets(false)
numSent := 0
for _, lb := range l {
agg := s.aggregates[lb.Fingerprint()]
if agg.NextNotification.After(time.Now()) {
continue
}
if agg.Rule != nil {
s.notifier.QueueNotification(agg.Alert, agg.Rule.NotificationConfigName)
agg.LastNotification = time.Now()
agg.NextNotification = agg.LastNotification.Add(agg.Rule.RepeatRate)
numSent++
}
}
if numSent > 0 {
glog.Infof("Sent %d notifications", numSent)
heap.Init(&s.aggregatesByNextNotification)
}
}
// Reports whether a notification recomputation is required.
func (s *memoryAlertManager) refreshNeeded() (bool, []string) {
s.mu.Lock()
defer s.mu.Unlock()
needsRefresh := false
reasons := []string{}
if s.needsNotificationRefresh {
needsRefresh = true
reasons = append(reasons, "active alerts have changed")
}
if s.inhibitor.HasChanged() {
needsRefresh = true
reasons = append(reasons, "inhibit rules have changed")
}
if s.silencer.HasChanged() {
needsRefresh = true
reasons = append(reasons, "silences have changed")
}
return needsRefresh, reasons
}
// Perform some cheap state sanity checks.
func (s *memoryAlertManager) checkSanity() {
s.mu.Lock()
defer s.mu.Unlock()
if len(s.aggregates) != len(s.aggregatesByLastRefreshed.AlertAggregates) {
panic("len(aggregates) != len(aggregatesByLastRefreshed)")
}
if len(s.aggregates) != len(s.aggregatesByNextNotification.AlertAggregates) {
panic("len(aggregates) != len(aggregatesByNextNotification)")
}
}
// Run a single memoryAlertManager iteration.
func (s *memoryAlertManager) runIteration() {
s.removeExpiredAggregates()
s.checkNotificationRepeats()
if refresh, reasons := s.refreshNeeded(); refresh {
glog.Infof("Recomputing notification outputs (%s)", strings.Join(reasons, ", "))
s.refreshNotifications()
}
}
// Run the memoryAlertManager's main dispatcher loop.
func (s *memoryAlertManager) Run() {
iterationTicker := time.NewTicker(time.Second)
for _ = range iterationTicker.C {
s.checkSanity()
s.runIteration()
}
}


@ -20,12 +20,13 @@ import (
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"net/smtp"
"sync"
"text/template"
"github.com/golang/glog"
pb "github.com/prometheus/alertmanager/config/generated"
)
@ -50,23 +51,23 @@ var (
smtpSender = flag.String("smtpSender", "alertmanager@example.org", "Sender email address to use in email notifications.")
)
// A Notifier is responsible for sending notifications for events according to
// A Notifier is responsible for sending notifications for alerts according to
// a provided notification configuration.
type Notifier interface {
// Queue a notification for asynchronous dispatching.
QueueNotification(e *Event, configName string) error
QueueNotification(a *Alert, configName string) error
// Replace current notification configs. Already enqueued messages will remain
// unaffected.
SetNotificationConfigs([]*pb.NotificationConfig)
// Start event notification dispatch loop.
Dispatch(IsInhibitedInterrogator)
// Stop the event notification dispatch loop.
// Start alert notification dispatch loop.
Dispatch()
// Stop the alert notification dispatch loop.
Close()
}
// Request for sending a notification.
type notificationReq struct {
event *Event
alert *Alert
notificationConfig *pb.NotificationConfig
}
@ -100,7 +101,7 @@ func (n *notifier) SetNotificationConfigs(configs []*pb.NotificationConfig) {
}
}
func (n *notifier) QueueNotification(event *Event, configName string) error {
func (n *notifier) QueueNotification(a *Alert, configName string) error {
n.mu.Lock()
nc, ok := n.notificationConfigs[configName]
n.mu.Unlock()
@ -113,23 +114,23 @@ func (n *notifier) QueueNotification(event *Event, configName string) error {
// notificationReq since the config might be replaced or gone at the time the
// message gets dispatched.
n.pendingNotifications <- &notificationReq{
event: event,
alert: a,
notificationConfig: nc,
}
return nil
}
func (n *notifier) sendPagerDutyNotification(serviceKey string, event *Event) error {
func (n *notifier) sendPagerDutyNotification(serviceKey string, a *Alert) error {
// http://developer.pagerduty.com/documentation/integration/events/trigger
incidentKey := event.Fingerprint()
incidentKey := a.Fingerprint()
buf, err := json.Marshal(map[string]interface{}{
"service_key": serviceKey,
"event_type": "trigger",
"description": event.Description,
"description": a.Description,
"incident_key": incidentKey,
"details": map[string]interface{}{
"grouping_labels": event.Labels,
"extra_labels": event.Payload,
"grouping_labels": a.Labels,
"extra_labels": a.Payload,
},
})
if err != nil {
@ -151,24 +152,23 @@ func (n *notifier) sendPagerDutyNotification(serviceKey string, event *Event) er
return err
}
log.Printf("Sent PagerDuty notification: %v: HTTP %d: %s", incidentKey, resp.StatusCode, respBuf)
glog.Infof("Sent PagerDuty notification: %v: HTTP %d: %s", incidentKey, resp.StatusCode, respBuf)
// BUG: Check response for result of operation.
return nil
}
func writeEmailBody(w io.Writer, event *Event) error {
if err := bodyTmpl.Execute(w, event); err != nil {
func writeEmailBody(w io.Writer, a *Alert) error {
if err := bodyTmpl.Execute(w, a); err != nil {
return err
}
buf := &bytes.Buffer{}
if err := bodyTmpl.Execute(buf, event); err != nil {
if err := bodyTmpl.Execute(buf, a); err != nil {
return err
}
log.Println(buf.String())
return nil
}
func (n *notifier) sendEmailNotification(email string, event *Event) error {
func (n *notifier) sendEmailNotification(email string, a *Alert) error {
// Connect to the SMTP smarthost.
c, err := smtp.Dial(*smtpSmartHost)
if err != nil {
@ -187,33 +187,29 @@ func (n *notifier) sendEmailNotification(email string, event *Event) error {
}
defer wc.Close()
return writeEmailBody(wc, event)
return writeEmailBody(wc, a)
}
func (n *notifier) handleNotification(event *Event, config *pb.NotificationConfig, i IsInhibitedInterrogator) {
if inhibited, _ := i.IsInhibited(event); inhibited {
return
}
func (n *notifier) handleNotification(a *Alert, config *pb.NotificationConfig) {
for _, pdConfig := range config.PagerdutyConfig {
if err := n.sendPagerDutyNotification(pdConfig.GetServiceKey(), event); err != nil {
log.Printf("Error sending PagerDuty notification: %s", err)
if err := n.sendPagerDutyNotification(pdConfig.GetServiceKey(), a); err != nil {
glog.Error("Error sending PagerDuty notification: ", err)
}
}
for _, emailConfig := range config.EmailConfig {
if *smtpSmartHost == "" {
log.Printf("No SMTP smarthost configured, not sending email notification.")
glog.Warning("No SMTP smarthost configured, not sending email notification.")
continue
}
if err := n.sendEmailNotification(emailConfig.GetEmail(), event); err != nil {
log.Printf("Error sending email notification: %s", err)
if err := n.sendEmailNotification(emailConfig.GetEmail(), a); err != nil {
glog.Error("Error sending email notification: ", err)
}
}
}
func (n *notifier) Dispatch(i IsInhibitedInterrogator) {
func (n *notifier) Dispatch() {
for req := range n.pendingNotifications {
n.handleNotification(req.event, req.notificationConfig, i)
n.handleNotification(req.alert, req.notificationConfig)
}
}


@ -19,15 +19,15 @@ import (
)
func TestWriteEmailBody(t *testing.T) {
event := &Event{
event := &Alert{
Summary: "Testsummary",
Description: "Test alert description, something went wrong here.",
Labels: EventLabels{
Labels: AlertLabelSet{
"alertname": "TestAlert",
"grouping_label1": "grouping_value1",
"grouping_label2": "grouping_value2",
},
Payload: EventPayload{
Payload: AlertPayload{
"payload_label1": "payload_value1",
"payload_label2": "payload_value2",
},


@ -17,9 +17,10 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"sync"
"time"
"github.com/golang/glog"
)
type SilenceId uint
@ -36,7 +37,7 @@ type Silence struct {
EndsAt time.Time
// Additional comment about the silence.
Comment string
// Filters that determine which events are silenced.
// Filters that determine which alerts are silenced.
Filters Filters
// Timer used to trigger the deletion of the Silence after its expiry
// time.
@ -97,18 +98,24 @@ func (s *Silence) UnmarshalJSON(data []byte) error {
return nil
}
func (s Silence) Matches(l AlertLabelSet) bool {
return s.Filters.Handles(l)
}
type Silencer struct {
// Silences managed by this Silencer.
Silences map[SilenceId]*Silence
// Used to track the next Silence Id to allocate.
lastId SilenceId
// Tracks whether silences have changed since the last call to HasChanged.
dirty bool
// Mutex to protect the above.
mu sync.Mutex
}
type IsInhibitedInterrogator interface {
IsInhibited(*Event) (bool, *Silence)
type IsSilencedInterrogator interface {
IsSilenced(AlertLabelSet) (bool, *Silence)
}
func NewSilencer() *Silencer {
@ -129,7 +136,7 @@ func (s *Silencer) setupExpiryTimer(sc *Silence) {
expDuration := sc.EndsAt.Sub(time.Now())
sc.expiryTimer = time.AfterFunc(expDuration, func() {
if err := s.DelSilence(sc.Id); err != nil {
log.Printf("Failed to delete silence %d: %s", sc.Id, err)
glog.Errorf("Failed to delete silence %d: %s", sc.Id, err)
}
})
}
@ -138,6 +145,8 @@ func (s *Silencer) AddSilence(sc *Silence) SilenceId {
s.mu.Lock()
defer s.mu.Unlock()
s.dirty = true
if sc.Id == 0 {
sc.Id = s.nextSilenceId()
} else {
@ -155,6 +164,8 @@ func (s *Silencer) UpdateSilence(sc *Silence) error {
s.mu.Lock()
defer s.mu.Unlock()
s.dirty = true
origSilence, ok := s.Silences[sc.Id]
if !ok {
return fmt.Errorf("Silence with ID %d doesn't exist", sc.Id)
@ -182,6 +193,8 @@ func (s *Silencer) DelSilence(id SilenceId) error {
s.mu.Lock()
defer s.mu.Unlock()
s.dirty = true
if _, ok := s.Silences[id]; !ok {
return fmt.Errorf("Silence with ID %d doesn't exist", id)
}
@ -200,18 +213,36 @@ func (s *Silencer) SilenceSummary() Silences {
return silences
}
func (s *Silencer) IsInhibited(e *Event) (bool, *Silence) {
func (s *Silencer) IsSilenced(l AlertLabelSet) (bool, *Silence) {
s.mu.Lock()
defer s.mu.Unlock()
for _, s := range s.Silences {
if s.Filters.Handles(e) {
if s.Matches(l) {
return true, s
}
}
return false, nil
}
// Returns only those AlertLabelSets which are not matched by any silence.
func (s *Silencer) Filter(l AlertLabelSets) AlertLabelSets {
s.mu.Lock()
defer s.mu.Unlock()
out := l
for _, sc := range s.Silences {
unsilenced := AlertLabelSets{}
for _, labels := range out {
if !sc.Matches(labels) {
unsilenced = append(unsilenced, labels)
}
}
out = unsilenced
}
return out
}
// Loads a JSON representation of silences from a file.
func (s *Silencer) LoadFromFile(fileName string) error {
silenceJson, err := ioutil.ReadFile(fileName)
@ -239,6 +270,17 @@ func (s *Silencer) SaveToFile(fileName string) error {
return ioutil.WriteFile(fileName, resultBytes, 0644)
}
// Returns whether silences have been added/updated/removed since the last call
// to HasChanged.
func (s *Silencer) HasChanged() bool {
s.mu.Lock()
defer s.mu.Unlock()
dirty := s.dirty
s.dirty = false
return dirty
}
func (s *Silencer) Close() {
s.mu.Lock()
defer s.mu.Unlock()


@ -19,9 +19,9 @@ import (
)
type testSilencerScenario struct {
silences Silences
inhibited Events
uninhibited Events
silences Silences
silenced Alerts
unsilenced Alerts
}
func (scenario *testSilencerScenario) test(i int, t *testing.T) {
@ -42,23 +42,34 @@ func (scenario *testSilencerScenario) test(i int, t *testing.T) {
}
}
for j, ev := range scenario.inhibited {
inhibited, sc := s.IsInhibited(ev)
if !inhibited {
t.Fatalf("%d.%d. Expected %v to be inhibited", i, j, ev)
for j, a := range scenario.silenced {
silenced, sc := s.IsSilenced(a.Labels)
if !silenced {
t.Fatalf("%d.%d. Expected %v to be silenced", i, j, a)
}
if sc == nil {
t.Fatalf("%d.%d. Expected non-nil Silence for inhibited event %v", i, j, ev)
t.Fatalf("%d.%d. Expected non-nil Silence for silenced event %v", i, j, a)
}
}
for j, ev := range scenario.uninhibited {
inhibited, sc := s.IsInhibited(ev)
if inhibited {
t.Fatalf("%d.%d. Expected %v to not be inhibited, was inhibited by %v", i, j, ev, sc)
for j, a := range scenario.unsilenced {
silenced, sc := s.IsSilenced(a.Labels)
if silenced {
t.Fatalf("%d.%d. Expected %v to not be silenced, was silenced by %v", i, j, a, sc)
}
}
l := AlertLabelSets{}
for _, a := range append(scenario.silenced, scenario.unsilenced...) {
l = append(l, a.Labels)
}
unsilenced := AlertLabelSets{}
for _, a := range scenario.unsilenced {
unsilenced = append(unsilenced, a.Labels)
}
filtered := s.Filter(l)
labelSetsMustBeEqual(i, t, filtered, unsilenced)
silences := s.SilenceSummary()
if len(silences) != len(scenario.silences) {
t.Fatalf("%d. Expected %d silences, got %d", i, len(scenario.silences), len(silences))
@ -82,8 +93,8 @@ func TestSilencer(t *testing.T) {
scenarios := []testSilencerScenario{
{
// No silences, one event.
uninhibited: Events{
&Event{
unsilenced: Alerts{
&Alert{
Labels: map[string]string{
"foo": "bar",
},
@ -102,28 +113,28 @@ func TestSilencer(t *testing.T) {
EndsAt: time.Now().Add(time.Hour),
},
},
inhibited: Events{
&Event{
silenced: Alerts{
&Alert{
Labels: map[string]string{
"service": "testservice",
"foo": "bar",
},
},
&Event{
&Alert{
Labels: map[string]string{
"service": "test-service",
"bar": "baz",
},
},
&Event{
&Alert{
Labels: map[string]string{
"service": "bar-service",
"testlabel": "testvalue",
},
},
},
uninhibited: Events{
&Event{
unsilenced: Alerts{
&Alert{
Labels: map[string]string{
"service": "testservice2",
"foo": "bar",


@ -20,24 +20,24 @@ import (
)
type AlertStatus struct {
AlertAggregates []*manager.AggregationInstance
SilenceForEvent func(*manager.Event) *manager.Silence
AlertAggregates manager.AlertAggregates
SilenceForAlert func(*manager.Alert) *manager.Silence
}
type AlertsHandler struct {
Aggregator *manager.Aggregator
IsInhibitedInterrogator manager.IsInhibitedInterrogator
Manager manager.AlertManager
IsSilencedInterrogator manager.IsSilencedInterrogator
}
func (h *AlertsHandler) silenceForEvent(e *manager.Event) *manager.Silence {
_, silence := h.IsInhibitedInterrogator.IsInhibited(e)
func (h *AlertsHandler) silenceForAlert(a *manager.Alert) *manager.Silence {
_, silence := h.IsSilencedInterrogator.IsSilenced(a.Labels)
return silence
}
func (h *AlertsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
alertStatus := &AlertStatus{
AlertAggregates: h.Aggregator.AlertAggregates(),
SilenceForEvent: h.silenceForEvent,
AlertAggregates: h.Manager.GetAll(nil),
SilenceForAlert: h.silenceForAlert,
}
executeTemplate(w, "alerts", alertStatus)
}


@ -14,32 +14,28 @@
package api
import (
"log"
"net/http"
"github.com/golang/glog"
"github.com/prometheus/alertmanager/manager"
)
func (s AlertManagerService) AddEvents(es manager.Events) {
for i, ev := range es {
if ev.Summary == "" || ev.Description == "" {
log.Printf("Missing field in event %d: %s", i, ev)
func (s AlertManagerService) AddAlerts(as manager.Alerts) {
for i, a := range as {
if a.Summary == "" || a.Description == "" {
glog.Errorf("Missing field in alert %d: %s", i, a)
rb := s.ResponseBuilder()
rb.SetResponseCode(http.StatusBadRequest)
return
}
if _, ok := ev.Labels[manager.EventNameLabel]; !ok {
log.Printf("Missing alert name label in event %d: %s", i, ev)
if _, ok := a.Labels[manager.AlertNameLabel]; !ok {
glog.Errorf("Missing alert name label in alert %d: %s", i, a)
rb := s.ResponseBuilder()
rb.SetResponseCode(http.StatusBadRequest)
return
}
}
err := s.Aggregator.Receive(es)
if err != nil {
log.Println("Error during aggregation:", err)
rb := s.ResponseBuilder()
rb.SetResponseCode(http.StatusServiceUnavailable)
}
s.Manager.Receive(as)
}


@ -22,13 +22,13 @@ import (
type AlertManagerService struct {
gorest.RestService `root:"/api/" consumes:"application/json" produces:"application/json"`
addEvents gorest.EndPoint `method:"POST" path:"/events" postdata:"Events"`
addAlerts gorest.EndPoint `method:"POST" path:"/alerts" postdata:"Alerts"`
addSilence gorest.EndPoint `method:"POST" path:"/silences" postdata:"Silence"`
getSilence gorest.EndPoint `method:"GET" path:"/silences/{id:int}" output:"string"`
updateSilence gorest.EndPoint `method:"POST" path:"/silences/{id:int}" postdata:"Silence"`
delSilence gorest.EndPoint `method:"DELETE" path:"/silences/{id:int}"`
silenceSummary gorest.EndPoint `method:"GET" path:"/silences" output:"string"`
Aggregator *manager.Aggregator
Silencer *manager.Silencer
Manager manager.AlertManager
Silencer *manager.Silencer
}
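
For completeness, a hedged sketch of what a client push to the renamed REST endpoint might look like. The /api/alerts path (root "/api/" plus the addAlerts endpoint above) and the Alert fields come from this diff; the listen address and the concrete label and payload values are assumptions.

package main

import (
	"bytes"
	"encoding/json"
	"log"
	"net/http"

	"github.com/prometheus/alertmanager/manager"
)

func main() {
	alerts := manager.Alerts{
		&manager.Alert{
			Summary:     "Instance 1 of testjob is down",
			Description: "Instance 1 of testjob has failed its health check.",
			Labels: manager.AlertLabelSet{
				"alertname": "InstanceDown",
				"job":       "testjob",
				"instance":  "1",
				"zone":      "aa",
			},
			Payload: manager.AlertPayload{
				// GeneratorUrl is the payload key read by the alerts template in this commit.
				"GeneratorUrl": "http://prometheus.example.org/",
			},
		},
	}

	buf, err := json.Marshal(alerts)
	if err != nil {
		log.Fatal(err)
	}

	// The listen address is an assumption; the path corresponds to the addAlerts endpoint.
	resp, err := http.Post("http://localhost:9093/api/alerts", "application/json", bytes.NewReader(buf))
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()
}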


@ -16,10 +16,10 @@ package api
import (
"encoding/json"
"fmt"
"log"
"net/http"
"code.google.com/p/gorest"
"github.com/golang/glog"
"github.com/prometheus/alertmanager/manager"
)
@ -46,14 +46,14 @@ func (s AlertManagerService) GetSilence(id int) string {
rb.SetContentType(gorest.Application_Json)
silence, err := s.Silencer.GetSilence(manager.SilenceId(id))
if err != nil {
log.Printf("Error getting silence: %s", err)
glog.Error("Error getting silence: ", err)
rb.SetResponseCode(http.StatusNotFound)
return err.Error()
}
resultBytes, err := json.Marshal(&silence)
if err != nil {
log.Printf("Error marshalling silence: %s", err)
glog.Error("Error marshalling silence: ", err)
rb.SetResponseCode(http.StatusInternalServerError)
return err.Error()
}
@ -64,7 +64,7 @@ func (s AlertManagerService) UpdateSilence(sc manager.Silence, id int) {
// BUG: add server-side form validation.
sc.Id = manager.SilenceId(id)
if err := s.Silencer.UpdateSilence(&sc); err != nil {
log.Printf("Error updating silence: %s", err)
glog.Error("Error updating silence: ", err)
rb := s.ResponseBuilder()
rb.SetResponseCode(http.StatusNotFound)
}
@ -72,7 +72,7 @@ func (s AlertManagerService) UpdateSilence(sc manager.Silence, id int) {
func (s AlertManagerService) DelSilence(id int) {
if err := s.Silencer.DelSilence(manager.SilenceId(id)); err != nil {
log.Printf("Error deleting silence: %s", err)
glog.Error("Error deleting silence: ", err)
rb := s.ResponseBuilder()
rb.SetResponseCode(http.StatusNotFound)
}
@ -85,7 +85,7 @@ func (s AlertManagerService) SilenceSummary() string {
resultBytes, err := json.Marshal(silenceSummary)
if err != nil {
log.Printf("Error marshalling silences: %s", err)
glog.Error("Error marshalling silences: ", err)
rb.SetResponseCode(http.StatusInternalServerError)
return err.Error()
}


@ -5,9 +5,10 @@ import (
"compress/gzip"
"fmt"
"io"
"log"
"net/http"
"strings"
"github.com/golang/glog"
)
const (
@ -50,7 +51,7 @@ func (h Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
file, err := GetFile(StaticFiles, name)
if err != nil {
if err != io.EOF {
log.Printf("Could not get file: %s", err)
glog.Error("Could not get file: ", err)
}
w.WriteHeader(http.StatusNotFound)
return


@ -19,25 +19,25 @@
</tr>
</thead>
<tbody>
{{$silenceForEvent := .SilenceForEvent}}
{{$silenceForAlert := .SilenceForAlert}}
{{range .AlertAggregates}}
<tr>
<td>
<span class="label label-important">{{index .Event.Name}}</span>
<span class="label label-important">{{index .Alert.Name}}</span>
<form class="add_silence_form">
<input type="hidden" name="label[]" value="alertname">
<input type="hidden" name="value[]" value="{{.Event.Name}}">
<input type="hidden" name="value[]" value="{{.Alert.Name}}">
<a href="#edit_silence_modal" role="button" class="btn btn-mini add_silence_btn" data-toggle="modal">Silence Alert</a>
</form>
</td>
<td>
{{range $label, $value := .Event.Labels}}
{{range $label, $value := .Alert.Labels}}
{{if not (eq $label "alertname")}}
<span class="label label-info">{{$label}}="{{$value}}"</span>
{{end}}
{{end}}
<form class="add_silence_form">
{{range $label, $value := .Event.Labels}}
{{range $label, $value := .Alert.Labels}}
<input type="hidden" name="label[]" value="{{$label}}">
<input type="hidden" name="value[]" value="{{$value}}">
{{end}}
@ -46,10 +46,10 @@
</td>
<td>{{timeSince .Created}} ago</td>
<td>{{timeSince .LastRefreshed}} ago</td>
<td><a href="{{.Event.Payload.GeneratorUrl}}">{{(truncate .Event.Payload.GeneratorUrl 40)}}</a></td>
<td>{{.Event.Payload.AlertingRule}}</td>
<td><a href="{{.Alert.Payload.GeneratorUrl}}">{{(truncate .Alert.Payload.GeneratorUrl 40)}}</a></td>
<td>{{.Alert.Payload.AlertingRule}}</td>
<td>
{{$silence := call $silenceForEvent .Event}}
{{$silence := call $silenceForAlert .Alert}}
{{if $silence}}
by <a href="#" class="silence_link">silence {{$silence.Id}}</a>
{{else}}


@ -17,12 +17,11 @@ import (
"flag"
"fmt"
"html/template"
"log"
"net/http"
"net/http/pprof"
"code.google.com/p/gorest"
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/exp"
@ -70,7 +69,7 @@ func (w WebService) ServeForever() error {
exp.Handle("/static/", http.StripPrefix("/static/", new(blob.Handler)))
}
log.Printf("listening on %s", *listenAddress)
glog.Info("listening on ", *listenAddress)
return http.ListenAndServe(*listenAddress, exp.DefaultCoarseMux)
}
@ -90,14 +89,14 @@ func getEmbeddedTemplate(name string) (*template.Template, error) {
file, err := blob.GetFile(blob.TemplateFiles, "_base.html")
if err != nil {
log.Printf("Could not read base template: %s", err)
glog.Error("Could not read base template: ", err)
return nil, err
}
t.Parse(string(file))
file, err = blob.GetFile(blob.TemplateFiles, name+".html")
if err != nil {
log.Printf("Could not read %s template: %s", name, err)
glog.Errorf("Could not read %s template: %s", name, err)
return nil, err
}
t.Parse(string(file))
@ -122,11 +121,11 @@ func getTemplate(name string) (t *template.Template, err error) {
func executeTemplate(w http.ResponseWriter, name string, data interface{}) {
tpl, err := getTemplate(name)
if err != nil {
log.Printf("Error preparing layout template: %s", err)
glog.Error("Error preparing layout template: ", err)
return
}
err = tpl.Execute(w, data)
if err != nil {
log.Printf("Error executing template: %s", err)
glog.Error("Error executing template: ", err)
}
}