Add deduping notifier

This commit is contained in:
Fabian Reinartz 2015-09-27 19:50:41 +02:00
parent 49da433478
commit 95b57b3622
3 changed files with 93 additions and 14 deletions

View File

@ -240,6 +240,9 @@ func (ag *aggrGroup) empty() bool {
// flush sends notifications for all new alerts.
func (ag *aggrGroup) flush(notify func(...*types.Alert) bool) {
if ag.empty() {
return
}
ag.mtx.Lock()
var (

19
main.go
View File

@ -34,10 +34,12 @@ func main() {
log.Fatal(err)
}
memAlerts := provider.NewMemAlerts()
memSilences := provider.NewMemSilences()
data := provider.NewMemData()
inhibitor := &Inhibitor{alerts: memAlerts}
alerts := provider.NewMemAlerts(data)
silences := provider.NewMemSilences()
inhibitor := &Inhibitor{alerts: alerts}
inhibitor.ApplyConfig(conf)
routedNotifier := newRoutedNotifier(func(conf *config.Config) map[string]Notifier {
@ -45,21 +47,22 @@ func main() {
for _, cn := range conf.NotificationConfigs {
res[cn.Name] = &LogNotifier{name: cn.Name}
}
return res
})
routedNotifier.ApplyConfig(conf)
var notifier Notifier
notifier = routedNotifier
notifier = &mutingNotifier{
Notifier: notifier,
notifier: notifier,
silencer: inhibitor,
}
notifier = &mutingNotifier{
Notifier: notifier,
silencer: memSilences,
notifier: notifier,
silencer: silences,
}
disp := NewDispatcher(memAlerts, notifier)
disp := NewDispatcher(alerts, notifier)
disp.ApplyConfig(conf)
go disp.Run()
@ -67,7 +70,7 @@ func main() {
router := route.New()
NewAPI(router.WithPrefix("/api"), memAlerts, memSilences)
NewAPI(router.WithPrefix("/api"), alerts, silences)
http.ListenAndServe(":9091", router)
}

View File

@ -3,11 +3,14 @@ package main
import (
"fmt"
"sync"
"time"
"github.com/prometheus/common/model"
"github.com/prometheus/log"
"golang.org/x/net/context"
"github.com/prometheus/alertmanager/config"
"github.com/prometheus/alertmanager/provider"
"github.com/prometheus/alertmanager/types"
)
@ -15,23 +18,88 @@ type notifyKey int
const (
notifyName notifyKey = iota
notifyRepeatInterval
notifySendResolved
)
type Notifier interface {
Notify(context.Context, ...*types.Alert) error
}
type dedupingNotifier struct {
notifies provider.Notifies
notifier Notifier
}
func (n *dedupingNotifier) Notify(ctx context.Context, alerts ...*types.Alert) error {
name, ok := ctx.Value(notifyName).(string)
if !ok {
return fmt.Errorf("notifier name missing")
}
repeatInterval, ok := ctx.Value(notifyRepeatInterval).(time.Duration)
if !ok {
return fmt.Errorf("repeat interval missing")
}
sendResolved, ok := ctx.Value(notifySendResolved).(bool)
if !ok {
return fmt.Errorf("send resolved missing")
}
var fps []model.Fingerprint
for _, a := range alerts {
fps = append(fps, a.Fingerprint())
}
notifies, err := n.notifies.Get(name, fps...)
if err != nil {
return err
}
now := time.Now()
var filtered []*types.Alert
for i, a := range alerts {
last := notifies[i]
// If the initial alert was not delivered successfully,
// there is no point in sending a resolved notification.
if a.Resolved() && (!last.Delivered || !sendResolved) {
continue
}
// Always send if the alert went from resolved to unresolved.
if last.Resolved && !a.Resolved() {
// Do not send again if last was delivered unless
// the repeat interval has already passed.
if last.Delivered && !now.After(last.Timestamp.Add(repeatInterval)) {
continue
}
}
filtered = append(filtered, a)
}
if err := n.notifier.Notify(ctx, filtered...); err != nil {
return err
}
return nil
}
// routedNotifier dispatches the alerts to one of a set of
// named notifiers based on the name value provided in the context.
type routedNotifier struct {
mtx sync.RWMutex
notifiers map[string]Notifier
mtx sync.RWMutex
notifiers map[string]Notifier
notifierOpts map[string]*config.NotificationConfig
// build creates a new set of named notifiers based on a config.
build func(*config.Config) map[string]Notifier
}
func newRoutedNotifier(build func(*config.Config) map[string]Notifier) {
func newRoutedNotifier(build func(*config.Config) map[string]Notifier) *routedNotifier {
return &routedNotifier{
build: build,
}
@ -50,6 +118,12 @@ func (n *routedNotifier) Notify(ctx context.Context, alerts ...*types.Alert) err
if !ok {
return fmt.Errorf("notifier %q does not exist", name)
}
opts := n.notifierOpts[name]
// Populate the context with the the filtering options
// of the notifier.
ctx = context.WithValue(ctx, notifyRepeatInterval, opts.RepeatInterval)
ctx = context.WithValue(ctx, notifySendResolved, opts.SendResolved)
return notifier.Notify(ctx, alerts...)
}
@ -64,8 +138,7 @@ func (n *routedNotifier) ApplyConfig(conf *config.Config) {
// mutingNotifier wraps a notifier and applies a Silencer
// before sending out an alert.
type mutingNotifier struct {
Notifier
notifier Notifier
silencer types.Silencer
}
@ -80,7 +153,7 @@ func (n *mutingNotifier) Notify(ctx context.Context, alerts ...*types.Alert) err
}
}
return n.Notifier.Notify(ctx, filtered...)
return n.notifier.Notify(ctx, filtered...)
}
type LogNotifier struct {