From 95b57b3622116ca3288cd63361ea1d4e1c8c3120 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Sun, 27 Sep 2015 19:50:41 +0200 Subject: [PATCH] Add deduping notifier --- dispatch.go | 3 ++ main.go | 19 +++++++----- notify.go | 85 +++++++++++++++++++++++++++++++++++++++++++++++++---- 3 files changed, 93 insertions(+), 14 deletions(-) diff --git a/dispatch.go b/dispatch.go index 046c251e..9e60f073 100644 --- a/dispatch.go +++ b/dispatch.go @@ -240,6 +240,9 @@ func (ag *aggrGroup) empty() bool { // flush sends notifications for all new alerts. func (ag *aggrGroup) flush(notify func(...*types.Alert) bool) { + if ag.empty() { + return + } ag.mtx.Lock() var ( diff --git a/main.go b/main.go index 0f2ad22c..23f453ee 100644 --- a/main.go +++ b/main.go @@ -34,10 +34,12 @@ func main() { log.Fatal(err) } - memAlerts := provider.NewMemAlerts() - memSilences := provider.NewMemSilences() + data := provider.NewMemData() - inhibitor := &Inhibitor{alerts: memAlerts} + alerts := provider.NewMemAlerts(data) + silences := provider.NewMemSilences() + + inhibitor := &Inhibitor{alerts: alerts} inhibitor.ApplyConfig(conf) routedNotifier := newRoutedNotifier(func(conf *config.Config) map[string]Notifier { @@ -45,21 +47,22 @@ func main() { for _, cn := range conf.NotificationConfigs { res[cn.Name] = &LogNotifier{name: cn.Name} } + return res }) routedNotifier.ApplyConfig(conf) var notifier Notifier notifier = routedNotifier notifier = &mutingNotifier{ - Notifier: notifier, + notifier: notifier, silencer: inhibitor, } notifier = &mutingNotifier{ - Notifier: notifier, - silencer: memSilences, + notifier: notifier, + silencer: silences, } - disp := NewDispatcher(memAlerts, notifier) + disp := NewDispatcher(alerts, notifier) disp.ApplyConfig(conf) go disp.Run() @@ -67,7 +70,7 @@ func main() { router := route.New() - NewAPI(router.WithPrefix("/api"), memAlerts, memSilences) + NewAPI(router.WithPrefix("/api"), alerts, silences) http.ListenAndServe(":9091", router) } diff --git a/notify.go b/notify.go index ef41d3d3..75cb0994 100644 --- a/notify.go +++ b/notify.go @@ -3,11 +3,14 @@ package main import ( "fmt" "sync" + "time" + "github.com/prometheus/common/model" "github.com/prometheus/log" "golang.org/x/net/context" "github.com/prometheus/alertmanager/config" + "github.com/prometheus/alertmanager/provider" "github.com/prometheus/alertmanager/types" ) @@ -15,23 +18,88 @@ type notifyKey int const ( notifyName notifyKey = iota + notifyRepeatInterval + notifySendResolved ) type Notifier interface { Notify(context.Context, ...*types.Alert) error } +type dedupingNotifier struct { + notifies provider.Notifies + notifier Notifier +} + +func (n *dedupingNotifier) Notify(ctx context.Context, alerts ...*types.Alert) error { + name, ok := ctx.Value(notifyName).(string) + if !ok { + return fmt.Errorf("notifier name missing") + } + + repeatInterval, ok := ctx.Value(notifyRepeatInterval).(time.Duration) + if !ok { + return fmt.Errorf("repeat interval missing") + } + + sendResolved, ok := ctx.Value(notifySendResolved).(bool) + if !ok { + return fmt.Errorf("send resolved missing") + } + + var fps []model.Fingerprint + for _, a := range alerts { + fps = append(fps, a.Fingerprint()) + } + + notifies, err := n.notifies.Get(name, fps...) + if err != nil { + return err + } + + now := time.Now() + + var filtered []*types.Alert + for i, a := range alerts { + last := notifies[i] + + // If the initial alert was not delivered successfully, + // there is no point in sending a resolved notification. + if a.Resolved() && (!last.Delivered || !sendResolved) { + continue + } + + // Always send if the alert went from resolved to unresolved. + if last.Resolved && !a.Resolved() { + // Do not send again if last was delivered unless + // the repeat interval has already passed. + if last.Delivered && !now.After(last.Timestamp.Add(repeatInterval)) { + continue + } + } + + filtered = append(filtered, a) + } + + if err := n.notifier.Notify(ctx, filtered...); err != nil { + return err + } + + return nil +} + // routedNotifier dispatches the alerts to one of a set of // named notifiers based on the name value provided in the context. type routedNotifier struct { - mtx sync.RWMutex - notifiers map[string]Notifier + mtx sync.RWMutex + notifiers map[string]Notifier + notifierOpts map[string]*config.NotificationConfig // build creates a new set of named notifiers based on a config. build func(*config.Config) map[string]Notifier } -func newRoutedNotifier(build func(*config.Config) map[string]Notifier) { +func newRoutedNotifier(build func(*config.Config) map[string]Notifier) *routedNotifier { return &routedNotifier{ build: build, } @@ -50,6 +118,12 @@ func (n *routedNotifier) Notify(ctx context.Context, alerts ...*types.Alert) err if !ok { return fmt.Errorf("notifier %q does not exist", name) } + opts := n.notifierOpts[name] + + // Populate the context with the the filtering options + // of the notifier. + ctx = context.WithValue(ctx, notifyRepeatInterval, opts.RepeatInterval) + ctx = context.WithValue(ctx, notifySendResolved, opts.SendResolved) return notifier.Notify(ctx, alerts...) } @@ -64,8 +138,7 @@ func (n *routedNotifier) ApplyConfig(conf *config.Config) { // mutingNotifier wraps a notifier and applies a Silencer // before sending out an alert. type mutingNotifier struct { - Notifier - + notifier Notifier silencer types.Silencer } @@ -80,7 +153,7 @@ func (n *mutingNotifier) Notify(ctx context.Context, alerts ...*types.Alert) err } } - return n.Notifier.Notify(ctx, filtered...) + return n.notifier.Notify(ctx, filtered...) } type LogNotifier struct {