diff --git a/config/config.go b/config/config.go index 08bb7a60..3005b47c 100644 --- a/config/config.go +++ b/config/config.go @@ -24,6 +24,8 @@ import ( "github.com/prometheus/alertmanager/manager" ) +const minimumRepeatRate = 1 * time.Minute + // Config encapsulates the configuration of an Alert Manager instance. It wraps // the raw configuration protocol buffer to be able to add custom methods to // it. @@ -80,19 +82,42 @@ func (c Config) Validate() error { return nil } -// Rules returns all the AggregationRules in a Config object. +func filtersFromPb(filters []*pb.Filter) manager.Filters { + fs := make(manager.Filters, 0, len(filters)) + for _, f := range filters { + fs = append(fs, manager.NewFilter(f.GetNameRe(), f.GetValueRe())) + } + return fs +} + +// AggregationRules returns all the AggregationRules in a Config object. func (c Config) AggregationRules() manager.AggregationRules { rules := make(manager.AggregationRules, 0, len(c.AggregationRule)) for _, r := range c.AggregationRule { - filters := make(manager.Filters, 0, len(r.Filter)) - for _, filter := range r.Filter { - filters = append(filters, manager.NewFilter(filter.GetNameRe(), filter.GetValueRe())) + rate := time.Duration(r.GetRepeatRateSeconds()) * time.Second + if rate < minimumRepeatRate { + rate = minimumRepeatRate } rules = append(rules, &manager.AggregationRule{ - Filters: filters, - RepeatRate: time.Duration(r.GetRepeatRateSeconds()) * time.Second, + Filters: filtersFromPb(r.Filter), + RepeatRate: rate, NotificationConfigName: r.GetNotificationConfigName(), }) } return rules } + +// InhibitRules returns all the InhibitRules in a Config object. 
+func (c Config) InhibitRules() manager.InhibitRules { + rules := make(manager.InhibitRules, 0, len(c.InhibitRule)) + for _, r := range c.InhibitRule { + sFilters := filtersFromPb(r.SourceFilter) + tFilters := filtersFromPb(r.TargetFilter) + rules = append(rules, &manager.InhibitRule{ + SourceFilters: sFilters, + TargetFilters: tFilters, + MatchOn: r.MatchOn, + }) + } + return rules +} diff --git a/config/config.proto b/config/config.proto index f8c4e024..7eb8e7a5 100644 --- a/config/config.proto +++ b/config/config.proto @@ -55,10 +55,85 @@ message AggregationRule { optional string notification_config_name = 3; } +// An InhibitRule specifies that a class of (source) alerts should inhibit +// notifications for another class of (target) alerts if all specified matching +// labels are equal between the two alerts. This may be used to inhibit alerts +// from sending notifications if their meaning is logically a subset of a +// higher-level alert. +// +// For example, if an entire job is down, there is little sense in sending a +// notification for every single instance of said job being down. This could be +// expressed as the following inhibit rule: +// +// inhibit_rule { +// # Select all source alerts that are candidates for being inhibitors. All +// # supplied source filters have to match in order to select a source alert. +// source_filter: { +// name_re: "alertname" +// value_re: "JobDown" +// } +// source_filter: { +// name_re: "service" +// value_re: "api" +// } +// +// # Select all target alerts that are candidates for being inhibited. All +// # supplied target filters have to match in order to select a target alert. +// target_filter: { +// name_re: "alertname" +// value_re: "InstanceDown" +// } +// target_filter: { +// name_re: "service" +// value_re: "api" +// } +// +// # A source alert only actually inhibits a target alert if they match on +// # these labels. I.e. 
the alerts need to fire for the same job in the same +// # zone for the inhibit to take effect between them. +// match_on: "job" +// match_on: "zone" +// } +// +// In this example, when JobDown is firing for +// +// JobDown{zone="aa",job="test",service="api"} +// +// ...it would inhibit an InstanceDown alert for +// +// InstanceDown{zone="aa",job="test",instance="1",service="api"} +// +// However, an InstanceDown alert for another zone: +// +// InstanceDown{zone="ab",job="test",instance="1",service="api"} +// +// ...would still fire. +message InhibitRule { + // The set of Filters which define the group of source alerts (which inhibit + // the target alerts). + repeated Filter source_filter = 1; + // The set of Filters which define the group of target alerts (which are + // inhibited by the source alerts). + repeated Filter target_filter = 2; + // A set of label names whose label values need to be identical in source and + // target alerts in order for the inhibition to take effect. + repeated string match_on = 3; + // How many seconds to wait for a corresponding inhibit source alert to + // appear before sending any notifications for active target alerts. + // TODO(julius): Not supported yet. Implement this! + // optional int32 before_allowance = 4 [default = 0]; + // How many seconds to wait after a corresponding inhibit source alert + // disappears before sending any notifications for active target alerts. + // TODO(julius): Not supported yet. Implement this! + // optional int32 after_allowance = 5 [default = 0]; +} + // Global alert manager configuration. message AlertManagerConfig { // Aggregation rule definitions. repeated AggregationRule aggregation_rule = 1; // Notification configuration definitions. repeated NotificationConfig notification_config = 2; + // List of alert inhibition rules. 
+ repeated InhibitRule inhibit_rule = 3; } diff --git a/config/generated/config.pb.go b/config/generated/config.pb.go index 1d721341..adcce7b3 100644 --- a/config/generated/config.pb.go +++ b/config/generated/config.pb.go @@ -135,9 +135,42 @@ func (m *AggregationRule) GetNotificationConfigName() string { return "" } +type InhibitRule struct { + SourceFilter []*Filter `protobuf:"bytes,1,rep,name=source_filter" json:"source_filter,omitempty"` + TargetFilter []*Filter `protobuf:"bytes,2,rep,name=target_filter" json:"target_filter,omitempty"` + MatchOn []string `protobuf:"bytes,3,rep,name=match_on" json:"match_on,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *InhibitRule) Reset() { *m = InhibitRule{} } +func (m *InhibitRule) String() string { return proto.CompactTextString(m) } +func (*InhibitRule) ProtoMessage() {} + +func (m *InhibitRule) GetSourceFilter() []*Filter { + if m != nil { + return m.SourceFilter + } + return nil +} + +func (m *InhibitRule) GetTargetFilter() []*Filter { + if m != nil { + return m.TargetFilter + } + return nil +} + +func (m *InhibitRule) GetMatchOn() []string { + if m != nil { + return m.MatchOn + } + return nil +} + type AlertManagerConfig struct { AggregationRule []*AggregationRule `protobuf:"bytes,1,rep,name=aggregation_rule" json:"aggregation_rule,omitempty"` NotificationConfig []*NotificationConfig `protobuf:"bytes,2,rep,name=notification_config" json:"notification_config,omitempty"` + InhibitRule []*InhibitRule `protobuf:"bytes,3,rep,name=inhibit_rule" json:"inhibit_rule,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -159,5 +192,12 @@ func (m *AlertManagerConfig) GetNotificationConfig() []*NotificationConfig { return nil } +func (m *AlertManagerConfig) GetInhibitRule() []*InhibitRule { + if m != nil { + return m.InhibitRule + } + return nil +} + func init() { } diff --git a/config/load.go b/config/load.go index bd554fdb..100bc983 100644 --- a/config/load.go +++ b/config/load.go @@ -15,9 +15,9 @@ package config 
import ( "io/ioutil" - "log" "code.google.com/p/goprotobuf/proto" + "github.com/golang/glog" pb "github.com/prometheus/alertmanager/config/generated" ) @@ -46,7 +46,7 @@ func LoadFromFile(fileName string) (Config, error) { func MustLoadFromFile(fileName string) Config { conf, err := LoadFromFile(fileName) if err != nil { - log.Fatalf("Error loading configuration from %s: %s", fileName, err) + glog.Fatalf("Error loading configuration from %s: %s", fileName, err) } return conf } diff --git a/config/watcher.go b/config/watcher.go index a66f3d49..8cb8290e 100644 --- a/config/watcher.go +++ b/config/watcher.go @@ -14,8 +14,7 @@ package config import ( - "log" - + "github.com/golang/glog" "github.com/howeyc/fsnotify" ) @@ -38,25 +37,25 @@ func NewFileWatcher(fileName string) *fileWatcher { func (w *fileWatcher) Watch(cb ReloadCallback) { watcher, err := fsnotify.NewWatcher() if err != nil { - log.Fatal(err) + glog.Fatal(err) } err = watcher.WatchFlags(w.fileName, fsnotify.FSN_MODIFY) if err != nil { - log.Fatal(err) + glog.Fatal(err) } for { select { case ev := <-watcher.Event: - log.Printf("Config file changed (%s), attempting reload", ev) + glog.Infof("Config file changed (%s), attempting reload", ev) conf, err := LoadFromFile(w.fileName) if err != nil { - log.Println("Error loading new config:", err) + glog.Error("Error loading new config: ", err) configLoads.Increment(map[string]string{"outcome": "failure"}) } else { cb(&conf) - log.Println("Config reloaded successfully") + glog.Info("Config reloaded successfully") configLoads.Increment(map[string]string{"outcome": "success"}) } // Re-add the file watcher since it can get lost on some changes. E.g. @@ -64,7 +63,7 @@ func (w *fileWatcher) Watch(cb ReloadCallback) { // sequence, after which the newly written file is no longer watched. 
err = watcher.WatchFlags(w.fileName, fsnotify.FSN_MODIFY) case err := <-watcher.Error: - log.Println("Error watching config:", err) + glog.Error("Error watching config: ", err) } } } diff --git a/main.go b/main.go index 1daa4a81..fff76e12 100644 --- a/main.go +++ b/main.go @@ -15,10 +15,11 @@ package main import ( "flag" - "log" "os" "time" + "github.com/golang/glog" + "github.com/prometheus/alertmanager/config" "github.com/prometheus/alertmanager/manager" "github.com/prometheus/alertmanager/web" @@ -26,8 +27,9 @@ import ( ) var ( - configFile = flag.String("configFile", "alertmanager.conf", "Alert Manager configuration file name.") - silencesFile = flag.String("silencesFile", "silences.json", "Silence storage file name.") + configFile = flag.String("configFile", "alertmanager.conf", "Alert Manager configuration file name.") + silencesFile = flag.String("silencesFile", "silences.json", "Silence storage file name.") + minRefreshPeriod = flag.Duration("minRefreshPeriod", 5*time.Minute, "Minimum required alert refresh period before an alert is purged.") ) func main() { @@ -42,13 +44,13 @@ func main() { err := silencer.LoadFromFile(*silencesFile) if err != nil { - log.Println("Couldn't load silences, starting up with empty silence list:", err) + glog.Warning("Couldn't load silences, starting up with empty silence list: ", err) } saveSilencesTicker := time.NewTicker(10 * time.Second) go func() { for _ = range saveSilencesTicker.C { if err := silencer.SaveToFile(*silencesFile); err != nil { - log.Println("Error saving silences to file:", err) + glog.Error("Error saving silences to file: ", err) } } }() @@ -57,9 +59,20 @@ func main() { notifier := manager.NewNotifier(conf.NotificationConfig) defer notifier.Close() - aggregator := manager.NewAggregator(notifier) - defer aggregator.Close() + inhibitor := new(manager.Inhibitor) + inhibitor.SetInhibitRules(conf.InhibitRules()) + options := &manager.MemoryAlertManagerOptions{ + Inhibitor: inhibitor, + Silencer: silencer, + 
Notifier: notifier, + MinRefreshInterval: *minRefreshPeriod, + } + alertManager := manager.NewMemoryAlertManager(options) + alertManager.SetAggregationRules(conf.AggregationRules()) + go alertManager.Run() + + // Web initialization. flags := map[string]string{} flag.VisitAll(func(f *flag.Flag) { flags[f.Name] = f.Value.String() @@ -75,14 +88,14 @@ func main() { webService := &web.WebService{ // REST API Service. AlertManagerService: &api.AlertManagerService{ - Aggregator: aggregator, - Silencer: silencer, + Manager: alertManager, + Silencer: silencer, }, // Template-based page handlers. AlertsHandler: &web.AlertsHandler{ - Aggregator: aggregator, - IsInhibitedInterrogator: silencer, + Manager: alertManager, + IsSilencedInterrogator: silencer, }, SilencesHandler: &web.SilencesHandler{ Silencer: silencer, @@ -91,15 +104,15 @@ func main() { } go webService.ServeForever() - aggregator.SetRules(conf.AggregationRules()) - + // React to configuration changes. watcher := config.NewFileWatcher(*configFile) go watcher.Watch(func(conf *config.Config) { + inhibitor.SetInhibitRules(conf.InhibitRules()) notifier.SetNotificationConfigs(conf.NotificationConfig) - aggregator.SetRules(conf.AggregationRules()) + alertManager.SetAggregationRules(conf.AggregationRules()) statusHandler.UpdateConfig(conf.String()) }) - log.Println("Running summary dispatcher...") - notifier.Dispatch(silencer) + glog.Info("Running notification dispatcher...") + notifier.Dispatch() } diff --git a/manager/aggregator.go b/manager/aggregator.go deleted file mode 100644 index 20833e49..00000000 --- a/manager/aggregator.go +++ /dev/null @@ -1,215 +0,0 @@ -// Copyright 2013 Prometheus Team -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package manager - -import ( - "errors" - "log" - "sync" - "time" -) - -const ( - minimumRepeatRate = 5 * time.Minute - minimumRefreshPeriod = 5 * time.Minute - notificationRetryPeriod = 1 * time.Minute -) - -// AggregationRule creates and manages the scope for received events. -type AggregationRule struct { - Filters Filters - RepeatRate time.Duration - NotificationConfigName string -} - -type AggregationInstances []*AggregationInstance - -type AggregationInstance struct { - Rule *AggregationRule - Event *Event - - // When was this AggregationInstance created? - Created time.Time - // When was the last refresh received into this AggregationInstance? - LastRefreshed time.Time - - // When was the last successful notification sent out for this - // AggregationInstance? - lastNotificationSent time.Time - // Timer used to trigger a notification retry/resend. - notificationResendTimer *time.Timer - // Timer used to trigger the deletion of the AggregationInstance after it - // hasn't been refreshed for too long. - expiryTimer *time.Timer -} - -func (r *AggregationRule) Handles(e *Event) bool { - return r.Filters.Handles(e) -} - -func (r *AggregationInstance) Ingest(e *Event) { - r.Event = e - r.LastRefreshed = time.Now() - - r.expiryTimer.Reset(minimumRefreshPeriod) -} - -func (r *AggregationInstance) SendNotification(n Notifier) { - if time.Since(r.lastNotificationSent) < r.Rule.RepeatRate { - return - } - - err := n.QueueNotification(r.Event, r.Rule.NotificationConfigName) - if err != nil { - // BUG: Limit the number of retries. 
- log.Printf("Error while sending notification: %s, retrying in %v", err, notificationRetryPeriod) - r.resendNotificationAfter(notificationRetryPeriod, n) - return - } - - r.resendNotificationAfter(r.Rule.RepeatRate, n) - r.lastNotificationSent = time.Now() -} - -func (r *AggregationInstance) resendNotificationAfter(d time.Duration, n Notifier) { - r.notificationResendTimer = time.AfterFunc(d, func() { - r.SendNotification(n) - }) -} - -func (r *AggregationInstance) Close() { - if r.notificationResendTimer != nil { - r.notificationResendTimer.Stop() - } - if r.expiryTimer != nil { - r.expiryTimer.Stop() - } -} - -type AggregationRules []*AggregationRule - -type Aggregator struct { - Rules AggregationRules - Aggregates map[EventFingerprint]*AggregationInstance - Notifier Notifier - - // Mutex to protect the above. - mu sync.Mutex -} - -func NewAggregator(n Notifier) *Aggregator { - return &Aggregator{ - Aggregates: make(map[EventFingerprint]*AggregationInstance), - Notifier: n, - } -} - -func (a *Aggregator) Close() { - a.mu.Lock() - defer a.mu.Unlock() - - for _, agg := range a.Aggregates { - agg.Close() - } -} - -func (a *Aggregator) Receive(events Events) error { - a.mu.Lock() - defer a.mu.Unlock() - - if len(a.Rules) == 0 { - return errors.New("No aggregation rules") - } - for _, e := range events { - for _, r := range a.Rules { - if r.Handles(e) { - fp := e.Fingerprint() - aggregation, ok := a.Aggregates[fp] - if !ok { - expTimer := time.AfterFunc(minimumRefreshPeriod, func() { - a.mu.Lock() - defer a.mu.Unlock() - a.removeAggregate(fp) - }) - - aggregation = &AggregationInstance{ - Rule: r, - Created: time.Now(), - expiryTimer: expTimer, - } - - a.Aggregates[fp] = aggregation - } - - aggregation.Ingest(e) - aggregation.SendNotification(a.Notifier) - break - } - } - } - return nil -} - -func (a *Aggregator) SetRules(rules AggregationRules) { - a.mu.Lock() - defer a.mu.Unlock() - - log.Println("Replacing", len(rules), "aggregator rules...") - - for _, rule := 
range rules { - if rule.RepeatRate < minimumRepeatRate { - log.Println("Rule repeat rate too low, setting to minimum value") - rule.RepeatRate = minimumRepeatRate - } - } - - a.Rules = rules - - // Reparent AggregationInstances to the first new matching rule, drop orphans - // that are not matched by any rule anymore. Expiry and notification resend - // timers are left untouched for reparented alerts, meaning that the last - // rule's RepeatRate needs to pass once before the new one is used. - for fp, agg := range a.Aggregates { - orphaned := true - - for _, r := range a.Rules { - if r.Handles(agg.Event) { - agg.Rule = r - orphaned = false - break - } - } - - if orphaned { - a.removeAggregate(fp) - } - } -} - -func (a *Aggregator) AlertAggregates() AggregationInstances { - a.mu.Lock() - defer a.mu.Unlock() - - aggs := make(AggregationInstances, 0, len(a.Aggregates)) - for _, agg := range a.Aggregates { - aggs = append(aggs, agg) - } - return aggs -} - -func (a *Aggregator) removeAggregate(fp EventFingerprint) { - log.Println("Deleting expired aggregation instance", a) - a.Aggregates[fp].Close() - delete(a.Aggregates, fp) -} diff --git a/manager/aggregator_test.go b/manager/aggregator_test.go deleted file mode 100644 index fcabee93..00000000 --- a/manager/aggregator_test.go +++ /dev/null @@ -1,129 +0,0 @@ -// Copyright 2013 Prometheus Team -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package manager - -import ( - "testing" - - pb "github.com/prometheus/alertmanager/config/generated" -) - -type dummyNotifier struct{} - -func (d *dummyNotifier) QueueNotification(*Event, string) error { - return nil -} - -func (d *dummyNotifier) SetNotificationConfigs([]*pb.NotificationConfig) {} - -func (d *dummyNotifier) Dispatch(IsInhibitedInterrogator) {} - -func (d *dummyNotifier) Close() {} - -type testAggregatorScenario struct { - rules AggregationRules - inMatch Events - inNoMatch Events -} - -func (s *testAggregatorScenario) test(i int, t *testing.T) { - a := NewAggregator(&dummyNotifier{}) - a.SetRules(s.rules) - - if len(s.inMatch) > 0 { - err := a.Receive(s.inMatch) - if err != nil { - t.Fatalf("%d. Expected input %v to match, got error: %s", i, s.inMatch, err) - } - } - - if len(s.inNoMatch) > 0 { - err := a.Receive(s.inNoMatch) - // BUG: we need to define more clearly what should happen if a subset of - // events doesn't match. Right now we only return an error if no rules - // are configured. - if len(s.rules) == 0 && err == nil { - t.Fatalf("%d. Expected aggregation error when no rules are set", i) - } - } - - aggs := a.AlertAggregates() - if len(aggs) != len(s.inMatch) { - t.Fatalf("%d. Expected %d aggregates, got %d", i, len(s.inMatch), len(aggs)) - } - - for j, agg := range aggs { - ev := s.inMatch[j] - if len(agg.Event.Labels) != len(ev.Labels) { - t.Fatalf("%d.%d. Expected %d labels, got %d", i, j, len(ev.Labels), len(agg.Event.Labels)) - } - - for l, v := range agg.Event.Labels { - if ev.Labels[l] != v { - t.Fatalf("%d.%d. Expected label %s=%s, got %s=%s", l, ev.Labels[l], l, v) - } - } - } - - a.Close() -} - -func TestAggregator(t *testing.T) { - scenarios := []testAggregatorScenario{ - { - // No rules, one event. - inNoMatch: Events{ - &Event{ - Labels: map[string]string{ - "foo": "bar", - }, - }, - }, - }, - { - // One rule, two matching events, one non-matching. 
- rules: AggregationRules{ - &AggregationRule{ - Filters: Filters{NewFilter("service", "test(-)?service")}, - }, - }, - inMatch: Events{ - &Event{ - Labels: map[string]string{ - "service": "testservice", - "foo": "bar", - }, - }, - &Event{ - Labels: map[string]string{ - "service": "test-service", - "bar": "baz", - }, - }, - }, - inNoMatch: Events{ - &Event{ - Labels: map[string]string{ - "service": "testservice2", - "foo": "bar", - }, - }, - }, - }, - } - - for i, scenario := range scenarios { - scenario.test(i, t) - } -} diff --git a/manager/event.go b/manager/alert.go similarity index 50% rename from manager/event.go rename to manager/alert.go index 8dc73e48..73182acd 100644 --- a/manager/event.go +++ b/manager/alert.go @@ -19,34 +19,42 @@ import ( "sort" ) -const EventNameLabel = "alertname" +const AlertNameLabel = "alertname" -type EventFingerprint uint64 +type AlertFingerprint uint64 -type EventLabels map[string]string -type EventPayload map[string]string +type AlertLabelSet map[string]string +type AlertLabelSets []AlertLabelSet -// Event models an action triggered by Prometheus. -type Event struct { - // Short summary of event. +type AlertPayload map[string]string + +type Alerts []*Alert + +// Alert models an action triggered by Prometheus. +type Alert struct { + // Short summary of alert. Summary string - // Long description of event. + // Long description of alert. Description string // Label value pairs for purpose of aggregation, matching, and disposition // dispatching. This must minimally include an "alertname" label. - Labels EventLabels + Labels AlertLabelSet // Extra key/value information which is not used for aggregation. 
- Payload EventPayload + Payload AlertPayload } -func (e Event) Name() string { - return e.Labels[EventNameLabel] +func (a *Alert) Name() string { + return a.Labels[AlertNameLabel] } -func (e Event) Fingerprint() EventFingerprint { +func (a *Alert) Fingerprint() AlertFingerprint { + return a.Labels.Fingerprint() +} + +func (l AlertLabelSet) Fingerprint() AlertFingerprint { keys := []string{} - for k := range e.Labels { + for k := range l { keys = append(keys, k) } @@ -56,10 +64,29 @@ func (e Event) Fingerprint() EventFingerprint { separator := string([]byte{0}) for _, k := range keys { - fmt.Fprintf(summer, "%s%s%s%s", k, separator, e.Labels[k], separator) + fmt.Fprintf(summer, "%s%s%s%s", k, separator, l[k], separator) } - return EventFingerprint(summer.Sum64()) + return AlertFingerprint(summer.Sum64()) } -type Events []*Event +func (l AlertLabelSet) Equal(o AlertLabelSet) bool { + if len(l) != len(o) { + return false + } + for k, v := range l { + if o[k] != v { + return false + } + } + return true +} + +func (l AlertLabelSet) MatchOnLabels(o AlertLabelSet, labels []string) bool { + for _, k := range labels { + if l[k] != o[k] { + return false + } + } + return true +} diff --git a/manager/matcher.go b/manager/filter.go similarity index 82% rename from manager/matcher.go rename to manager/filter.go index de004787..89b3e58e 100644 --- a/manager/matcher.go +++ b/manager/filter.go @@ -43,8 +43,8 @@ func NewFilter(namePattern string, valuePattern string) *Filter { } } -func (f *Filter) Handles(e *Event) bool { - for k, v := range e.Labels { +func (f *Filter) Handles(l AlertLabelSet) bool { + for k, v := range l { if f.Name.MatchString(k) && f.Value.MatchString(v) { return true } @@ -53,12 +53,12 @@ func (f *Filter) Handles(e *Event) bool { return false } -func (f Filters) Handles(e *Event) bool { +func (f Filters) Handles(l AlertLabelSet) bool { fCount := len(f) fMatch := 0 for _, filter := range f { - if filter.Handles(e) { + if filter.Handles(l) { fMatch++ } } @@ 
-66,6 +66,16 @@ func (f Filters) Handles(e *Event) bool { return fCount == fMatch } +func (f Filters) Filter(l AlertLabelSets) AlertLabelSets { + out := AlertLabelSets{} + for _, labels := range l { + if f.Handles(labels) { + out = append(out, labels) + } + } + return out +} + func (f Filters) fingerprint() uint64 { summer := fnv.New64a() diff --git a/manager/helpers_test.go b/manager/helpers_test.go new file mode 100644 index 00000000..a05d3cf5 --- /dev/null +++ b/manager/helpers_test.go @@ -0,0 +1,48 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package manager + +import ( + "sort" + "testing" +) + +type alertLabelSetsByFingerprint AlertLabelSets + +func (a alertLabelSetsByFingerprint) Len() int { + return len(a) +} + +func (a alertLabelSetsByFingerprint) Less(i, j int) bool { + return a[i].Fingerprint() < a[j].Fingerprint() +} + +func (a alertLabelSetsByFingerprint) Swap(i, j int) { + a[i], a[j] = a[j], a[i] +} + +func labelSetsMustBeEqual(i int, t *testing.T, expected, actual AlertLabelSets) { + if len(actual) != len(expected) { + t.Fatalf("%d. Expected %d labelsets, got %d", i, len(expected), len(actual)) + } + + sort.Sort(alertLabelSetsByFingerprint(expected)) + sort.Sort(alertLabelSetsByFingerprint(actual)) + + for j, l := range expected { + if !l.Equal(actual[j]) { + t.Fatalf("%d. 
Expected %v, got %v", i, l, actual[j]) + } + } +} diff --git a/manager/inhibitor.go b/manager/inhibitor.go new file mode 100644 index 00000000..ae749654 --- /dev/null +++ b/manager/inhibitor.go @@ -0,0 +1,104 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package manager + +import ( + "sync" + + _ "github.com/prometheus/alertmanager/config/generated" +) + +type InhibitRules []*InhibitRule + +type InhibitRule struct { + SourceFilters Filters + TargetFilters Filters + MatchOn []string +} + +// Returns those target AlertLabelSets which are not inhibited by any of the +// source AlertLabelSets. +func (i *InhibitRule) Filter(s AlertLabelSets, t AlertLabelSets) AlertLabelSets { + s = i.SourceFilters.Filter(s) + out := AlertLabelSets{} + for _, tl := range t { + inhibited := false + if i.TargetFilters.Handles(tl) { + for _, sl := range s { + if tl.MatchOnLabels(sl, i.MatchOn) { + inhibited = true + break + } + } + } + if !inhibited { + out = append(out, tl) + } + } + return out +} + +// Inhibitor calculates inhibition rules between its labelset inputs and only +// emits uninhibited alert labelsets. +type Inhibitor struct { + mu sync.Mutex + inhibitRules InhibitRules + dirty bool +} + +// Replaces the current InhibitRules with a new set. 
+func (i *Inhibitor) SetInhibitRules(r InhibitRules) { + i.mu.Lock() + defer i.mu.Unlock() + + i.inhibitRules = r + i.dirty = true +} + +// Returns those AlertLabelSets which are not inhibited by any other +// AlertLabelSet in the provided list. +func (i *Inhibitor) Filter(l AlertLabelSets) AlertLabelSets { + i.mu.Lock() + defer i.mu.Unlock() + + out := l + for _, r := range i.inhibitRules { + out = r.Filter(l, out) + } + return out +} + +// Returns whether a given AlertLabelSet is inhibited by a group of other +// AlertLabelSets. +func (i *Inhibitor) IsInhibited(t AlertLabelSet, l AlertLabelSets) bool { + i.mu.Lock() + defer i.mu.Unlock() + + for _, r := range i.inhibitRules { + if len(r.Filter(l, AlertLabelSets{t})) != 1 { + return true + } + } + return false +} + +// Returns whether inhibits have changed since the last call to HasChanged. +func (i *Inhibitor) HasChanged() bool { + i.mu.Lock() + defer i.mu.Unlock() + + dirty := i.dirty + i.dirty = false + return dirty +} diff --git a/manager/inhibitor_test.go b/manager/inhibitor_test.go new file mode 100644 index 00000000..c3360753 --- /dev/null +++ b/manager/inhibitor_test.go @@ -0,0 +1,231 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package manager + +import ( + "testing" +) + +type testInhibitorScenario struct { + rules InhibitRules + inhibited AlertLabelSets + uninhibited AlertLabelSets +} + +func (s *testInhibitorScenario) test(i int, t *testing.T) { + allLabelSets := append(s.inhibited, s.uninhibited...) + + // Set the inhibit rules to an empty list. + inhibitor := new(Inhibitor) + filtered := inhibitor.Filter(allLabelSets) + labelSetsMustBeEqual(i, t, allLabelSets, filtered) + + // Add inhibit rules through SetInhibitRules(). + inhibitor.SetInhibitRules(s.rules) + filtered = inhibitor.Filter(allLabelSets) + labelSetsMustBeEqual(i, t, s.uninhibited, filtered) +} + +func TestInhibitor(t *testing.T) { + scenarios := []testInhibitorScenario{ + // No rules. + { + uninhibited: AlertLabelSets{ + AlertLabelSet{ + "alertname": "InstanceDown", + "instance": "1", + "job": "testjob", + }, + AlertLabelSet{ + "alertname": "InstanceDown", + "instance": "2", + "job": "testjob", + }, + AlertLabelSet{ + "alertname": "JobDown", + "job": "testinstance", + }, + }, + }, + // One rule not matching anything. + { + rules: InhibitRules{ + &InhibitRule{ + SourceFilters: Filters{ + NewFilter("alertname", "OtherAlert"), + }, + TargetFilters: Filters{ + NewFilter("alertname", "OtherAlert2"), + }, + MatchOn: []string{"job"}, + }, + }, + uninhibited: AlertLabelSets{ + AlertLabelSet{ + "alertname": "InstanceDown", + "instance": "1", + "job": "testjob", + }, + AlertLabelSet{ + "alertname": "InstanceDown", + "instance": "2", + "job": "testjob", + }, + AlertLabelSet{ + "alertname": "JobDown", + "job": "testinstance", + }, + }, + }, + // One rule matching source and target alerts, but those not matching on labels. 
+ { + rules: InhibitRules{ + &InhibitRule{ + SourceFilters: Filters{ + NewFilter("alertname", "JobDown"), + }, + TargetFilters: Filters{ + NewFilter("alertname", "InstanceDown"), + }, + MatchOn: []string{"job", "zone"}, + }, + }, + uninhibited: AlertLabelSets{ + AlertLabelSet{ + "alertname": "InstanceDown", + "instance": "1", + "job": "testjob", + "zone": "aa", + }, + AlertLabelSet{ + "alertname": "InstanceDown", + "instance": "2", + "job": "testjob", + "zone": "aa", + }, + AlertLabelSet{ + "alertname": "JobDown", + "job": "testinstance", + "zone": "ab", + }, + }, + }, + // Two rules, various match behaviors. + { + rules: InhibitRules{ + &InhibitRule{ + SourceFilters: Filters{ + NewFilter("alertname", "JobDown"), + }, + TargetFilters: Filters{ + NewFilter("alertname", "InstanceDown"), + }, + MatchOn: []string{"job", "zone"}, + }, + &InhibitRule{ + SourceFilters: Filters{ + NewFilter("alertname", "EverythingDown"), + }, + TargetFilters: Filters{ + NewFilter("alertname", "JobDown"), + }, + MatchOn: []string{"owner"}, + }, + }, + uninhibited: AlertLabelSets{ + AlertLabelSet{ + "alertname": "JobDown", + "job": "testjob", + "zone": "aa", + }, + AlertLabelSet{ + "alertname": "JobDown", + "job": "testjob", + "zone": "ab", + }, + }, + inhibited: AlertLabelSets{ + AlertLabelSet{ + "alertname": "InstanceDown", + "instance": "1", + "job": "testjob", + "zone": "aa", + }, + AlertLabelSet{ + "alertname": "InstanceDown", + "instance": "2", + "job": "testjob", + "zone": "aa", + }, + }, + }, + // Inhibited alert inhibiting another alert (ZoneDown => JobDown => InstanceDown). 
+ { + rules: InhibitRules{ + &InhibitRule{ + SourceFilters: Filters{ + NewFilter("alertname", "JobDown"), + }, + TargetFilters: Filters{ + NewFilter("alertname", "InstanceDown"), + }, + MatchOn: []string{"job", "zone"}, + }, + &InhibitRule{ + SourceFilters: Filters{ + NewFilter("alertname", "ZoneDown"), + }, + TargetFilters: Filters{ + NewFilter("alertname", "JobDown"), + }, + MatchOn: []string{"zone"}, + }, + }, + uninhibited: AlertLabelSets{ + AlertLabelSet{ + "alertname": "ZoneDown", + "zone": "aa", + }, + AlertLabelSet{ + "alertname": "JobDown", + "job": "testjob", + "zone": "ab", + }, + }, + inhibited: AlertLabelSets{ + AlertLabelSet{ + "alertname": "JobDown", + "job": "testjob", + "zone": "aa", + }, + AlertLabelSet{ + "alertname": "InstanceDown", + "instance": "1", + "job": "testjob", + "zone": "aa", + }, + AlertLabelSet{ + "alertname": "InstanceDown", + "instance": "2", + "job": "testjob", + "zone": "aa", + }, + }, + }, + } + + for i, scenario := range scenarios { + scenario.test(i, t) + } +} diff --git a/manager/manager.go b/manager/manager.go new file mode 100644 index 00000000..b3be96a7 --- /dev/null +++ b/manager/manager.go @@ -0,0 +1,414 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package manager + +import ( + "container/heap" + "sort" + "strings" + "sync" + "time" + + "github.com/golang/glog" +) + +// AlertManager stores Alerts and removes them upon expiry. 
+type AlertManager interface { + // Ingests a new alert entry into the store. If an alert with the same + // fingerprint already exists, it only updates the existing entry's metadata. + Receive(Alerts) + // Retrieves all alerts from the store that match the provided Filters. + GetAll(Filters) AlertAggregates + // Sets the AggregationRules to associate with alerts. + SetAggregationRules(AggregationRules) + // Runs the AlertManager dispatcher loop. + Run() +} + +type AggregationRules []*AggregationRule + +// AggregationRule creates and manages the scope for received events. +type AggregationRule struct { + Filters Filters + RepeatRate time.Duration + NotificationConfigName string +} + +// Returns whether a given AggregationRule matches an Alert. +func (r *AggregationRule) Handles(l *Alert) bool { + return r.Filters.Handles(l.Labels) +} + +// An AlertAggregate tracks the latest alert received for a given alert +// fingerprint and some metadata about the alert. +type AlertAggregate struct { + Alert *Alert + Rule *AggregationRule + + // When was this AggregationInstance created? + Created time.Time + // When was the last refresh received into this AlertAggregate? + LastRefreshed time.Time + // When was the last notification sent out for this AlertAggregate? + LastNotification time.Time + // When should the next notification be sent according to the current Rule's + // RepeatRate? + NextNotification time.Time +} + +// Ingests a received Alert into this AlertAggregate and updates metadata. +func (agg *AlertAggregate) Ingest(a *Alert) { + agg.Alert = a + agg.LastRefreshed = time.Now() +} + +type AlertAggregates []*AlertAggregate + +// Helper type for managing a heap based on LastRefreshed time. +type aggregatesByLastRefreshed struct { + AlertAggregates +} + +// Helper type for managing a heap based on NextNotification time. +type aggregatesByNextNotification struct { + AlertAggregates +} + +// Methods implementing heap.Interface. 
+func (aggs AlertAggregates) Len() int { + return len(aggs) +} + +func (aggs aggregatesByLastRefreshed) Less(i, j int) bool { + return aggs.AlertAggregates[i].LastRefreshed.Before(aggs.AlertAggregates[j].LastRefreshed) +} + +func (aggs aggregatesByNextNotification) Less(i, j int) bool { + return aggs.AlertAggregates[i].NextNotification.Before(aggs.AlertAggregates[j].NextNotification) +} + +func (aggs AlertAggregates) Swap(i, j int) { + aggs[i], aggs[j] = aggs[j], aggs[i] +} + +func (aggs *AlertAggregates) Push(agg interface{}) { + *aggs = append(*aggs, agg.(*AlertAggregate)) +} + +func (aggs *AlertAggregates) Pop() interface{} { + old := *aggs + n := len(old) + item := old[n-1] + *aggs = old[:n-1] + return item +} + +// memoryAlertManager implements the AlertManager interface and only keeps +// state in memory. +type memoryAlertManager struct { + // The minimum interval for alert refreshes before being purged. + minRefreshInterval time.Duration + // Inhibitor for filtering out inhibited alerts. + inhibitor *Inhibitor + // Silencer for filtering out silenced alerts. + silencer *Silencer + // Notifier for dispatching notifications. + notifier Notifier + + // Mutex protecting all fields below. + mu sync.Mutex + // Currently loaded set of AggregationRules. + rules AggregationRules + // Main AlertAggregates index by fingerprint. + aggregates map[AlertFingerprint]*AlertAggregate + // Secondary AlertAggregates index by LastRefreshed time. + aggregatesByLastRefreshed aggregatesByLastRefreshed + // Secondary AlertAggregates index by NextNotification time. + aggregatesByNextNotification aggregatesByNextNotification + // Cache of the last result of computing uninhibited/unsilenced alerts. + filteredAlerts AlertLabelSets + // Tracks whether a change has occurred that requires a recomputation of + // notification outputs. + needsNotificationRefresh bool +} + +// Options for constructing a memoryAlertManager. 
+type MemoryAlertManagerOptions struct { + // Inhibitor for filtering out inhibited alerts. + Inhibitor *Inhibitor + // Silencer for filtering out silenced alerts. + Silencer *Silencer + // Notifier for dispatching notifications. + Notifier Notifier + // The minimum interval for alert refreshes before being purged. + MinRefreshInterval time.Duration +} + +// Constructs a new memoryAlertManager. +func NewMemoryAlertManager(o *MemoryAlertManagerOptions) AlertManager { + return &memoryAlertManager{ + aggregates: make(map[AlertFingerprint]*AlertAggregate), + + minRefreshInterval: o.MinRefreshInterval, + inhibitor: o.Inhibitor, + silencer: o.Silencer, + notifier: o.Notifier, + } +} + +// Receive and ingest a new list of alert messages (e.g. from the web API). +func (s *memoryAlertManager) Receive(as Alerts) { + s.mu.Lock() + defer s.mu.Unlock() + + for _, a := range as { + s.ingest(a) + } +} + +// Ingests an alert into the memoryAlertManager and creates a new +// AggregationInstance for it, if necessary. +func (s *memoryAlertManager) ingest(a *Alert) { + fp := a.Fingerprint() + agg, ok := s.aggregates[fp] + if !ok { + agg = &AlertAggregate{ + Created: time.Now(), + } + agg.Ingest(a) + + for _, r := range s.rules { + if r.Handles(agg.Alert) { + agg.Rule = r + break + } + } + + s.aggregates[fp] = agg + heap.Push(&s.aggregatesByLastRefreshed, agg) + heap.Push(&s.aggregatesByNextNotification, agg) + + s.needsNotificationRefresh = true + } else { + agg.Ingest(a) + heap.Init(&s.aggregatesByLastRefreshed) + } +} + +// Get all AlertAggregates that match a given set of Filters. +func (s memoryAlertManager) GetAll(f Filters) AlertAggregates { + s.mu.Lock() + defer s.mu.Unlock() + + aggs := make(AlertAggregates, 0, len(s.aggregates)) + for _, agg := range s.aggregates { + if f.Handles(agg.Alert.Labels) { + // Make a deep copy of the AggregationRule so we can safely pass it to the + // outside. 
+ aggCopy := *agg + if agg.Rule != nil { + rule := *agg.Rule + aggCopy.Rule = &rule + } + alert := *agg.Alert + aggCopy.Alert = &alert + + aggs = append(aggs, &aggCopy) + } + } + return aggs +} + +// Replace the current set of loaded AggregationRules by another. +func (s *memoryAlertManager) SetAggregationRules(rules AggregationRules) { + s.mu.Lock() + defer s.mu.Unlock() + + glog.Infof("Replacing aggregator rules (old: %d, new: %d)...", len(s.rules), len(rules)) + s.rules = rules + + // Reassign AlertAggregates to the first new matching rule, set the rule to + // nil if there is no matching rule. + for _, agg := range s.aggregates { + agg.Rule = nil + + for _, r := range s.rules { + if r.Handles(agg.Alert) { + agg.Rule = r + agg.NextNotification = agg.LastNotification.Add(r.RepeatRate) + break + } + } + } + heap.Init(&s.aggregatesByNextNotification) + s.needsNotificationRefresh = true +} + +// Check for any expired AlertAggregates and remove them from all indexes. +func (s *memoryAlertManager) removeExpiredAggregates() { + s.mu.Lock() + defer s.mu.Unlock() + + // This loop is interrupted if either the heap is empty or only non-expired + // aggregates remain in the heap. + for { + if len(s.aggregatesByLastRefreshed.AlertAggregates) == 0 { + return + } + + agg := heap.Pop(&s.aggregatesByLastRefreshed).(*AlertAggregate) + + if time.Since(agg.LastRefreshed) > s.minRefreshInterval { + delete(s.aggregates, agg.Alert.Fingerprint()) + + // Also remove the aggregate from the last-notification-time index. 
+ n := len(s.aggregatesByNextNotification.AlertAggregates) + i := sort.Search(n, func(i int) bool { + return !agg.NextNotification.After(s.aggregatesByNextNotification.AlertAggregates[i].NextNotification) + }) + if i == n { + panic("Missing alert aggregate in aggregatesByNextNotification index") + } else { + for j := i; j < n; j++ { + if s.aggregatesByNextNotification.AlertAggregates[j] == agg { + heap.Remove(&s.aggregatesByNextNotification, j) + break + } + } + } + + s.needsNotificationRefresh = true + } else { + heap.Push(&s.aggregatesByLastRefreshed, agg) + return + } + } +} + +// Check whether one of the filtered (uninhibited, unsilenced) alerts should +// trigger a new notification. +func (s *memoryAlertManager) checkNotificationRepeats() { + s.mu.Lock() + defer s.mu.Unlock() + + now := time.Now() + f := s.filteredLabelSets(true) + for _, agg := range s.aggregatesByNextNotification.AlertAggregates { + for _, fl := range f { + if agg.Alert.Labels.Equal(fl) && agg.NextNotification.Before(now) { + s.needsNotificationRefresh = true + return + } + } + } +} + +// Returns all active AlertLabelSets that are neither inhibited nor silenced. +func (s *memoryAlertManager) filteredLabelSets(useCache bool) AlertLabelSets { + if useCache && s.filteredAlerts != nil { + return s.filteredAlerts + } + + l := make(AlertLabelSets, 0, len(s.aggregates)) + for _, agg := range s.aggregates { + l = append(l, agg.Alert.Labels) + } + + l = s.inhibitor.Filter(l) + s.filteredAlerts = s.silencer.Filter(l) + return s.filteredAlerts +} + +// Recomputes all currently uninhibited/unsilenced alerts and queues +// notifications for them according to their RepeatRate. 
+func (s *memoryAlertManager) refreshNotifications() { + s.mu.Lock() + defer s.mu.Unlock() + + s.needsNotificationRefresh = false + + l := s.filteredLabelSets(false) + + numSent := 0 + for _, lb := range l { + agg := s.aggregates[lb.Fingerprint()] + if agg.NextNotification.After(time.Now()) { + continue + } + if agg.Rule != nil { + s.notifier.QueueNotification(agg.Alert, agg.Rule.NotificationConfigName) + agg.LastNotification = time.Now() + agg.NextNotification = agg.LastNotification.Add(agg.Rule.RepeatRate) + numSent++ + } + } + if numSent > 0 { + glog.Infof("Sent %d notifications", numSent) + heap.Init(&s.aggregatesByNextNotification) + } +} + +// Reports whether a notification recomputation is required. +func (s *memoryAlertManager) refreshNeeded() (bool, []string) { + s.mu.Lock() + defer s.mu.Unlock() + + needsRefresh := false + reasons := []string{} + if s.needsNotificationRefresh { + needsRefresh = true + reasons = append(reasons, "active alerts have changed") + } + if s.inhibitor.HasChanged() { + needsRefresh = true + reasons = append(reasons, "inhibit rules have changed") + } + if s.silencer.HasChanged() { + needsRefresh = true + reasons = append(reasons, "silences have changed") + } + return needsRefresh, reasons +} + +// Perform some cheap state sanity checks. +func (s *memoryAlertManager) checkSanity() { + s.mu.Lock() + defer s.mu.Unlock() + + if len(s.aggregates) != len(s.aggregatesByLastRefreshed.AlertAggregates) { + panic("len(aggregates) != len(aggregatesByLastRefreshed)") + } + if len(s.aggregates) != len(s.aggregatesByNextNotification.AlertAggregates) { + panic("len(aggregates) != len(aggregatesByNextNotification)") + } +} + +// Run a single memoryAlertManager iteration. 
+func (s *memoryAlertManager) runIteration() { + s.removeExpiredAggregates() + s.checkNotificationRepeats() + if refresh, reasons := s.refreshNeeded(); refresh { + glog.Infof("Recomputing notification outputs (%s)", strings.Join(reasons, ", ")) + s.refreshNotifications() + } +} + +// Run the memoryAlertManager's main dispatcher loop. +func (s *memoryAlertManager) Run() { + iterationTicker := time.NewTicker(time.Second) + for _ = range iterationTicker.C { + s.checkSanity() + s.runIteration() + } +} diff --git a/manager/notifier.go b/manager/notifier.go index 4348e892..1b79ab35 100644 --- a/manager/notifier.go +++ b/manager/notifier.go @@ -20,12 +20,13 @@ import ( "fmt" "io" "io/ioutil" - "log" "net/http" "net/smtp" "sync" "text/template" + "github.com/golang/glog" + pb "github.com/prometheus/alertmanager/config/generated" ) @@ -50,23 +51,23 @@ var ( smtpSender = flag.String("smtpSender", "alertmanager@example.org", "Sender email address to use in email notifications.") ) -// A Notifier is responsible for sending notifications for events according to +// A Notifier is responsible for sending notifications for alerts according to // a provided notification configuration. type Notifier interface { // Queue a notification for asynchronous dispatching. - QueueNotification(e *Event, configName string) error + QueueNotification(a *Alert, configName string) error // Replace current notification configs. Already enqueued messages will remain // unaffected. SetNotificationConfigs([]*pb.NotificationConfig) - // Start event notification dispatch loop. - Dispatch(IsInhibitedInterrogator) - // Stop the event notification dispatch loop. + // Start alert notification dispatch loop. + Dispatch() + // Stop the alert notification dispatch loop. Close() } // Request for sending a notification. 
type notificationReq struct { - event *Event + alert *Alert notificationConfig *pb.NotificationConfig } @@ -100,7 +101,7 @@ func (n *notifier) SetNotificationConfigs(configs []*pb.NotificationConfig) { } } -func (n *notifier) QueueNotification(event *Event, configName string) error { +func (n *notifier) QueueNotification(a *Alert, configName string) error { n.mu.Lock() nc, ok := n.notificationConfigs[configName] n.mu.Unlock() @@ -113,23 +114,23 @@ func (n *notifier) QueueNotification(event *Event, configName string) error { // notificationReq since the config might be replaced or gone at the time the // message gets dispatched. n.pendingNotifications <- ¬ificationReq{ - event: event, + alert: a, notificationConfig: nc, } return nil } -func (n *notifier) sendPagerDutyNotification(serviceKey string, event *Event) error { +func (n *notifier) sendPagerDutyNotification(serviceKey string, a *Alert) error { // http://developer.pagerduty.com/documentation/integration/events/trigger - incidentKey := event.Fingerprint() + incidentKey := a.Fingerprint() buf, err := json.Marshal(map[string]interface{}{ "service_key": serviceKey, "event_type": "trigger", - "description": event.Description, + "description": a.Description, "incident_key": incidentKey, "details": map[string]interface{}{ - "grouping_labels": event.Labels, - "extra_labels": event.Payload, + "grouping_labels": a.Labels, + "extra_labels": a.Payload, }, }) if err != nil { @@ -151,24 +152,23 @@ func (n *notifier) sendPagerDutyNotification(serviceKey string, event *Event) er return err } - log.Printf("Sent PagerDuty notification: %v: HTTP %d: %s", incidentKey, resp.StatusCode, respBuf) + glog.Infof("Sent PagerDuty notification: %v: HTTP %d: %s", incidentKey, resp.StatusCode, respBuf) // BUG: Check response for result of operation. 
return nil } -func writeEmailBody(w io.Writer, event *Event) error { - if err := bodyTmpl.Execute(w, event); err != nil { +func writeEmailBody(w io.Writer, a *Alert) error { + if err := bodyTmpl.Execute(w, a); err != nil { return err } buf := &bytes.Buffer{} - if err := bodyTmpl.Execute(buf, event); err != nil { + if err := bodyTmpl.Execute(buf, a); err != nil { return err } - log.Println(buf.String()) return nil } -func (n *notifier) sendEmailNotification(email string, event *Event) error { +func (n *notifier) sendEmailNotification(email string, a *Alert) error { // Connect to the SMTP smarthost. c, err := smtp.Dial(*smtpSmartHost) if err != nil { @@ -187,33 +187,29 @@ func (n *notifier) sendEmailNotification(email string, event *Event) error { } defer wc.Close() - return writeEmailBody(wc, event) + return writeEmailBody(wc, a) } -func (n *notifier) handleNotification(event *Event, config *pb.NotificationConfig, i IsInhibitedInterrogator) { - if inhibited, _ := i.IsInhibited(event); inhibited { - return - } - +func (n *notifier) handleNotification(a *Alert, config *pb.NotificationConfig) { for _, pdConfig := range config.PagerdutyConfig { - if err := n.sendPagerDutyNotification(pdConfig.GetServiceKey(), event); err != nil { - log.Printf("Error sending PagerDuty notification: %s", err) + if err := n.sendPagerDutyNotification(pdConfig.GetServiceKey(), a); err != nil { + glog.Error("Error sending PagerDuty notification: ", err) } } for _, emailConfig := range config.EmailConfig { if *smtpSmartHost == "" { - log.Printf("No SMTP smarthost configured, not sending email notification.") + glog.Warning("No SMTP smarthost configured, not sending email notification.") continue } - if err := n.sendEmailNotification(emailConfig.GetEmail(), event); err != nil { - log.Printf("Error sending email notification: %s", err) + if err := n.sendEmailNotification(emailConfig.GetEmail(), a); err != nil { + glog.Error("Error sending email notification: ", err) } } } -func (n *notifier) 
Dispatch(i IsInhibitedInterrogator) { +func (n *notifier) Dispatch() { for req := range n.pendingNotifications { - n.handleNotification(req.event, req.notificationConfig, i) + n.handleNotification(req.alert, req.notificationConfig) } } diff --git a/manager/notifier_test.go b/manager/notifier_test.go index 4b6f9fe9..a1990f98 100644 --- a/manager/notifier_test.go +++ b/manager/notifier_test.go @@ -19,15 +19,15 @@ import ( ) func TestWriteEmailBody(t *testing.T) { - event := &Event{ + event := &Alert{ Summary: "Testsummary", Description: "Test alert description, something went wrong here.", - Labels: EventLabels{ + Labels: AlertLabelSet{ "alertname": "TestAlert", "grouping_label1": "grouping_value1", "grouping_label2": "grouping_value2", }, - Payload: EventPayload{ + Payload: AlertPayload{ "payload_label1": "payload_value1", "payload_label2": "payload_value2", }, diff --git a/manager/silencer.go b/manager/silencer.go index a14c14ba..ed03f7ee 100644 --- a/manager/silencer.go +++ b/manager/silencer.go @@ -17,9 +17,10 @@ import ( "encoding/json" "fmt" "io/ioutil" - "log" "sync" "time" + + "github.com/golang/glog" ) type SilenceId uint @@ -36,7 +37,7 @@ type Silence struct { EndsAt time.Time // Additional comment about the silence. Comment string - // Filters that determine which events are silenced. + // Filters that determine which alerts are silenced. Filters Filters // Timer used to trigger the deletion of the Silence after its expiry // time. @@ -97,18 +98,24 @@ func (s *Silence) UnmarshalJSON(data []byte) error { return nil } +func (s Silence) Matches(l AlertLabelSet) bool { + return s.Filters.Handles(l) +} + type Silencer struct { // Silences managed by this Silencer. Silences map[SilenceId]*Silence // Used to track the next Silence Id to allocate. lastId SilenceId + // Tracks whether silences have changed since the last call to HasChanged. + dirty bool // Mutex to protect the above. 
mu sync.Mutex } -type IsInhibitedInterrogator interface { - IsInhibited(*Event) (bool, *Silence) +type IsSilencedInterrogator interface { + IsSilenced(AlertLabelSet) (bool, *Silence) } func NewSilencer() *Silencer { @@ -129,7 +136,7 @@ func (s *Silencer) setupExpiryTimer(sc *Silence) { expDuration := sc.EndsAt.Sub(time.Now()) sc.expiryTimer = time.AfterFunc(expDuration, func() { if err := s.DelSilence(sc.Id); err != nil { - log.Printf("Failed to delete silence %d: %s", sc.Id, err) + glog.Errorf("Failed to delete silence %d: %s", sc.Id, err) } }) } @@ -138,6 +145,8 @@ func (s *Silencer) AddSilence(sc *Silence) SilenceId { s.mu.Lock() defer s.mu.Unlock() + s.dirty = true + if sc.Id == 0 { sc.Id = s.nextSilenceId() } else { @@ -155,6 +164,8 @@ func (s *Silencer) UpdateSilence(sc *Silence) error { s.mu.Lock() defer s.mu.Unlock() + s.dirty = true + origSilence, ok := s.Silences[sc.Id] if !ok { return fmt.Errorf("Silence with ID %d doesn't exist", sc.Id) @@ -182,6 +193,8 @@ func (s *Silencer) DelSilence(id SilenceId) error { s.mu.Lock() defer s.mu.Unlock() + s.dirty = true + if _, ok := s.Silences[id]; !ok { return fmt.Errorf("Silence with ID %d doesn't exist", id) } @@ -200,18 +213,36 @@ func (s *Silencer) SilenceSummary() Silences { return silences } -func (s *Silencer) IsInhibited(e *Event) (bool, *Silence) { +func (s *Silencer) IsSilenced(l AlertLabelSet) (bool, *Silence) { s.mu.Lock() defer s.mu.Unlock() for _, s := range s.Silences { - if s.Filters.Handles(e) { + if s.Matches(l) { return true, s } } return false, nil } +// Returns only those AlertLabelSets which are not matched by any silence. 
+func (s *Silencer) Filter(l AlertLabelSets) AlertLabelSets { + s.mu.Lock() + defer s.mu.Unlock() + + out := l + for _, sc := range s.Silences { + unsilenced := AlertLabelSets{} + for _, labels := range out { + if !sc.Matches(labels) { + unsilenced = append(unsilenced, labels) + } + } + out = unsilenced + } + return out +} + // Loads a JSON representation of silences from a file. func (s *Silencer) LoadFromFile(fileName string) error { silenceJson, err := ioutil.ReadFile(fileName) @@ -239,6 +270,17 @@ func (s *Silencer) SaveToFile(fileName string) error { return ioutil.WriteFile(fileName, resultBytes, 0644) } +// Returns whether silences have been added/updated/removed since the last call +// to HasChanged. +func (s *Silencer) HasChanged() bool { + s.mu.Lock() + defer s.mu.Unlock() + + dirty := s.dirty + s.dirty = false + return dirty +} + func (s *Silencer) Close() { s.mu.Lock() defer s.mu.Unlock() diff --git a/manager/silencer_test.go b/manager/silencer_test.go index 9283a2a6..ebec972b 100644 --- a/manager/silencer_test.go +++ b/manager/silencer_test.go @@ -19,9 +19,9 @@ import ( ) type testSilencerScenario struct { - silences Silences - inhibited Events - uninhibited Events + silences Silences + silenced Alerts + unsilenced Alerts } func (scenario *testSilencerScenario) test(i int, t *testing.T) { @@ -42,23 +42,34 @@ func (scenario *testSilencerScenario) test(i int, t *testing.T) { } } - for j, ev := range scenario.inhibited { - inhibited, sc := s.IsInhibited(ev) - if !inhibited { - t.Fatalf("%d.%d. Expected %v to be inhibited", i, j, ev) + for j, a := range scenario.silenced { + silenced, sc := s.IsSilenced(a.Labels) + if !silenced { + t.Fatalf("%d.%d. Expected %v to be silenced", i, j, a) } if sc == nil { - t.Fatalf("%d.%d. Expected non-nil Silence for inhibited event %v", i, j, ev) + t.Fatalf("%d.%d. 
Expected non-nil Silence for silenced event %v", i, j, a) } } - for j, ev := range scenario.uninhibited { - inhibited, sc := s.IsInhibited(ev) - if inhibited { - t.Fatalf("%d.%d. Expected %v to not be inhibited, was inhibited by %v", i, j, ev, sc) + for j, a := range scenario.unsilenced { + silenced, sc := s.IsSilenced(a.Labels) + if silenced { + t.Fatalf("%d.%d. Expected %v to not be silenced, was silenced by %v", i, j, a, sc) } } + l := AlertLabelSets{} + for _, a := range append(scenario.silenced, scenario.unsilenced...) { + l = append(l, a.Labels) + } + unsilenced := AlertLabelSets{} + for _, a := range scenario.unsilenced { + unsilenced = append(unsilenced, a.Labels) + } + filtered := s.Filter(l) + labelSetsMustBeEqual(i, t, filtered, unsilenced) + silences := s.SilenceSummary() if len(silences) != len(scenario.silences) { t.Fatalf("%d. Expected %d silences, got %d", i, len(scenario.silences), len(silences)) @@ -82,8 +93,8 @@ func TestSilencer(t *testing.T) { scenarios := []testSilencerScenario{ { // No silences, one event. 
- uninhibited: Events{ - &Event{ + unsilenced: Alerts{ + &Alert{ Labels: map[string]string{ "foo": "bar", }, @@ -102,28 +113,28 @@ func TestSilencer(t *testing.T) { EndsAt: time.Now().Add(time.Hour), }, }, - inhibited: Events{ - &Event{ + silenced: Alerts{ + &Alert{ Labels: map[string]string{ "service": "testservice", "foo": "bar", }, }, - &Event{ + &Alert{ Labels: map[string]string{ "service": "test-service", "bar": "baz", }, }, - &Event{ + &Alert{ Labels: map[string]string{ "service": "bar-service", "testlabel": "testvalue", }, }, }, - uninhibited: Events{ - &Event{ + unsilenced: Alerts{ + &Alert{ Labels: map[string]string{ "service": "testservice2", "foo": "bar", diff --git a/web/alerts.go b/web/alerts.go index 947451bb..4990271e 100644 --- a/web/alerts.go +++ b/web/alerts.go @@ -20,24 +20,24 @@ import ( ) type AlertStatus struct { - AlertAggregates []*manager.AggregationInstance - SilenceForEvent func(*manager.Event) *manager.Silence + AlertAggregates manager.AlertAggregates + SilenceForAlert func(*manager.Alert) *manager.Silence } type AlertsHandler struct { - Aggregator *manager.Aggregator - IsInhibitedInterrogator manager.IsInhibitedInterrogator + Manager manager.AlertManager + IsSilencedInterrogator manager.IsSilencedInterrogator } -func (h *AlertsHandler) silenceForEvent(e *manager.Event) *manager.Silence { - _, silence := h.IsInhibitedInterrogator.IsInhibited(e) +func (h *AlertsHandler) silenceForAlert(a *manager.Alert) *manager.Silence { + _, silence := h.IsSilencedInterrogator.IsSilenced(a.Labels) return silence } func (h *AlertsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { alertStatus := &AlertStatus{ - AlertAggregates: h.Aggregator.AlertAggregates(), - SilenceForEvent: h.silenceForEvent, + AlertAggregates: h.Manager.GetAll(nil), + SilenceForAlert: h.silenceForAlert, } executeTemplate(w, "alerts", alertStatus) } diff --git a/web/api/event.go b/web/api/alert.go similarity index 63% rename from web/api/event.go rename to web/api/alert.go 
index d0a7c81f..89c30253 100644 --- a/web/api/event.go +++ b/web/api/alert.go @@ -14,32 +14,28 @@ package api import ( - "log" "net/http" + "github.com/golang/glog" + "github.com/prometheus/alertmanager/manager" ) -func (s AlertManagerService) AddEvents(es manager.Events) { - for i, ev := range es { - if ev.Summary == "" || ev.Description == "" { - log.Printf("Missing field in event %d: %s", i, ev) +func (s AlertManagerService) AddAlerts(as manager.Alerts) { + for i, a := range as { + if a.Summary == "" || a.Description == "" { + glog.Errorf("Missing field in alert %d: %s", i, a) rb := s.ResponseBuilder() rb.SetResponseCode(http.StatusBadRequest) return } - if _, ok := ev.Labels[manager.EventNameLabel]; !ok { - log.Printf("Missing alert name label in event %d: %s", i, ev) + if _, ok := a.Labels[manager.AlertNameLabel]; !ok { + glog.Errorf("Missing alert name label in alert %d: %s", i, a) rb := s.ResponseBuilder() rb.SetResponseCode(http.StatusBadRequest) return } } - err := s.Aggregator.Receive(es) - if err != nil { - log.Println("Error during aggregation:", err) - rb := s.ResponseBuilder() - rb.SetResponseCode(http.StatusServiceUnavailable) - } + s.Manager.Receive(as) } diff --git a/web/api/api.go b/web/api/api.go index ea38a2eb..2ad767e9 100644 --- a/web/api/api.go +++ b/web/api/api.go @@ -22,13 +22,13 @@ import ( type AlertManagerService struct { gorest.RestService `root:"/api/" consumes:"application/json" produces:"application/json"` - addEvents gorest.EndPoint `method:"POST" path:"/events" postdata:"Events"` + addAlerts gorest.EndPoint `method:"POST" path:"/alerts" postdata:"Alerts"` addSilence gorest.EndPoint `method:"POST" path:"/silences" postdata:"Silence"` getSilence gorest.EndPoint `method:"GET" path:"/silences/{id:int}" output:"string"` updateSilence gorest.EndPoint `method:"POST" path:"/silences/{id:int}" postdata:"Silence"` delSilence gorest.EndPoint `method:"DELETE" path:"/silences/{id:int}"` silenceSummary gorest.EndPoint `method:"GET" 
path:"/silences" output:"string"` - Aggregator *manager.Aggregator - Silencer *manager.Silencer + Manager manager.AlertManager + Silencer *manager.Silencer } diff --git a/web/api/silence.go b/web/api/silence.go index 62d0b1cb..b29a58e1 100644 --- a/web/api/silence.go +++ b/web/api/silence.go @@ -16,10 +16,10 @@ package api import ( "encoding/json" "fmt" - "log" "net/http" "code.google.com/p/gorest" + "github.com/golang/glog" "github.com/prometheus/alertmanager/manager" ) @@ -46,14 +46,14 @@ func (s AlertManagerService) GetSilence(id int) string { rb.SetContentType(gorest.Application_Json) silence, err := s.Silencer.GetSilence(manager.SilenceId(id)) if err != nil { - log.Printf("Error getting silence: %s", err) + glog.Error("Error getting silence: ", err) rb.SetResponseCode(http.StatusNotFound) return err.Error() } resultBytes, err := json.Marshal(&silence) if err != nil { - log.Printf("Error marshalling silence: %s", err) + glog.Error("Error marshalling silence: ", err) rb.SetResponseCode(http.StatusInternalServerError) return err.Error() } @@ -64,7 +64,7 @@ func (s AlertManagerService) UpdateSilence(sc manager.Silence, id int) { // BUG: add server-side form validation. 
sc.Id = manager.SilenceId(id) if err := s.Silencer.UpdateSilence(&sc); err != nil { - log.Printf("Error updating silence: %s", err) + glog.Error("Error updating silence: ", err) rb := s.ResponseBuilder() rb.SetResponseCode(http.StatusNotFound) } @@ -72,7 +72,7 @@ func (s AlertManagerService) UpdateSilence(sc manager.Silence, id int) { func (s AlertManagerService) DelSilence(id int) { if err := s.Silencer.DelSilence(manager.SilenceId(id)); err != nil { - log.Printf("Error deleting silence: %s", err) + glog.Error("Error deleting silence: ", err) rb := s.ResponseBuilder() rb.SetResponseCode(http.StatusNotFound) } @@ -85,7 +85,7 @@ func (s AlertManagerService) SilenceSummary() string { resultBytes, err := json.Marshal(silenceSummary) if err != nil { - log.Printf("Error marshalling silences: %s", err) + glog.Error("Error marshalling silences: ", err) rb.SetResponseCode(http.StatusInternalServerError) return err.Error() } diff --git a/web/blob/blob.go b/web/blob/blob.go index 6a6981e9..7034a815 100644 --- a/web/blob/blob.go +++ b/web/blob/blob.go @@ -5,9 +5,10 @@ import ( "compress/gzip" "fmt" "io" - "log" "net/http" "strings" + + "github.com/golang/glog" ) const ( @@ -50,7 +51,7 @@ func (h Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { file, err := GetFile(StaticFiles, name) if err != nil { if err != io.EOF { - log.Printf("Could not get file: %s", err) + glog.Error("Could not get file: ", err) } w.WriteHeader(http.StatusNotFound) return diff --git a/web/templates/alerts.html b/web/templates/alerts.html index f816fb13..0cf05398 100644 --- a/web/templates/alerts.html +++ b/web/templates/alerts.html @@ -19,25 +19,25 @@ - {{$silenceForEvent := .SilenceForEvent}} + {{$silenceForAlert := .SilenceForAlert}} {{range .AlertAggregates}} - {{index .Event.Name}} + {{index .Alert.Name}}
- + Silence Alert
- {{range $label, $value := .Event.Labels}} + {{range $label, $value := .Alert.Labels}} {{if not (eq $label "alertname")}} {{$label}}="{{$value}}" {{end}} {{end}}
- {{range $label, $value := .Event.Labels}} + {{range $label, $value := .Alert.Labels}} {{end}} @@ -46,10 +46,10 @@ {{timeSince .Created}} ago {{timeSince .LastRefreshed}} ago - {{(truncate .Event.Payload.GeneratorUrl 40)}} - {{.Event.Payload.AlertingRule}} + {{(truncate .Alert.Payload.GeneratorUrl 40)}} + {{.Alert.Payload.AlertingRule}} - {{$silence := call $silenceForEvent .Event}} + {{$silence := call $silenceForAlert .Alert}} {{if $silence}} by silence {{$silence.Id}} {{else}} diff --git a/web/web.go b/web/web.go index 0fe67329..174c84c0 100644 --- a/web/web.go +++ b/web/web.go @@ -17,12 +17,11 @@ import ( "flag" "fmt" "html/template" - "log" "net/http" "net/http/pprof" "code.google.com/p/gorest" - + "github.com/golang/glog" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/exp" @@ -70,7 +69,7 @@ func (w WebService) ServeForever() error { exp.Handle("/static/", http.StripPrefix("/static/", new(blob.Handler))) } - log.Printf("listening on %s", *listenAddress) + glog.Info("listening on ", *listenAddress) return http.ListenAndServe(*listenAddress, exp.DefaultCoarseMux) } @@ -90,14 +89,14 @@ func getEmbeddedTemplate(name string) (*template.Template, error) { file, err := blob.GetFile(blob.TemplateFiles, "_base.html") if err != nil { - log.Printf("Could not read base template: %s", err) + glog.Error("Could not read base template: ", err) return nil, err } t.Parse(string(file)) file, err = blob.GetFile(blob.TemplateFiles, name+".html") if err != nil { - log.Printf("Could not read %s template: %s", name, err) + glog.Errorf("Could not read %s template: %s", name, err) return nil, err } t.Parse(string(file)) @@ -122,11 +121,11 @@ func getTemplate(name string) (t *template.Template, err error) { func executeTemplate(w http.ResponseWriter, name string, data interface{}) { tpl, err := getTemplate(name) if err != nil { - log.Printf("Error preparing layout template: %s", err) + glog.Error("Error preparing layout template: ", 
err) return } err = tpl.Execute(w, data) if err != nil { - log.Printf("Error executing template: %s", err) + glog.Error("Error executing template: ", err) } }