Merge pull request #525 from brancz/fix-resolved-filter
notify: move resolved alert filtering to integration
This commit is contained in:
commit
2433eebf48
|
@ -62,6 +62,23 @@ type Integration struct {
|
|||
|
||||
// Notify implements the Notifier interface.
|
||||
func (i *Integration) Notify(ctx context.Context, alerts ...*types.Alert) (bool, error) {
|
||||
var res []*types.Alert
|
||||
|
||||
// Resolved alerts have to be filtered only at this point, because they need
|
||||
// to end up unfiltered in the SetNotifiesStage.
|
||||
if i.conf.SendResolved() {
|
||||
res = alerts
|
||||
} else {
|
||||
for _, a := range alerts {
|
||||
if a.Status() != model.AlertResolved {
|
||||
res = append(res, a)
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(res) == 0 {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
return i.notifier.Notify(ctx, alerts...)
|
||||
}
|
||||
|
||||
|
|
|
@ -206,7 +206,6 @@ func createStage(rc *config.Receiver, tmpl *template.Template, wait func() time.
|
|||
}
|
||||
var s MultiStage
|
||||
s = append(s, NewWaitStage(wait))
|
||||
s = append(s, NewFilterResolvedStage(i.conf))
|
||||
s = append(s, NewDedupStage(notificationLog, recv))
|
||||
s = append(s, NewRetryStage(i))
|
||||
s = append(s, NewSetNotifiesStage(notificationLog, recv))
|
||||
|
@ -380,36 +379,6 @@ func (ws *WaitStage) Exec(ctx context.Context, alerts ...*types.Alert) (context.
|
|||
return ctx, alerts, nil
|
||||
}
|
||||
|
||||
// FilterResolvedStage filters alerts based on a given notifierConfig. Either
|
||||
// returns all alerts or only those that are not resolved.
|
||||
type FilterResolvedStage struct {
|
||||
conf notifierConfig
|
||||
}
|
||||
|
||||
// NewFilterRecolvedStage returns a new instance of a FilterResolvedStage.
|
||||
func NewFilterResolvedStage(conf notifierConfig) *FilterResolvedStage {
|
||||
return &FilterResolvedStage{
|
||||
conf: conf,
|
||||
}
|
||||
}
|
||||
|
||||
// Exec implements the Stage interface.
|
||||
func (fr *FilterResolvedStage) Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
|
||||
var res []*types.Alert
|
||||
|
||||
if fr.conf.SendResolved() {
|
||||
res = alerts
|
||||
} else {
|
||||
for _, a := range alerts {
|
||||
if a.Status() != model.AlertResolved {
|
||||
res = append(res, a)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return ctx, res, nil
|
||||
}
|
||||
|
||||
// DedupStage filters alerts.
|
||||
// Filtering happens based on a notification log.
|
||||
type DedupStage struct {
|
||||
|
|
|
@ -33,6 +33,18 @@ import (
|
|||
"github.com/prometheus/alertmanager/types"
|
||||
)
|
||||
|
||||
type notifierConfigFunc func() bool
|
||||
|
||||
func (f notifierConfigFunc) SendResolved() bool {
|
||||
return f()
|
||||
}
|
||||
|
||||
type notifierFunc func(ctx context.Context, alerts ...*types.Alert) (bool, error)
|
||||
|
||||
func (f notifierFunc) Notify(ctx context.Context, alerts ...*types.Alert) (bool, error) {
|
||||
return f(ctx, alerts...)
|
||||
}
|
||||
|
||||
type failStage struct{}
|
||||
|
||||
func (s failStage) Exec(ctx context.Context, as ...*types.Alert) (context.Context, []*types.Alert, error) {
|
||||
|
@ -283,6 +295,39 @@ func TestRoutingStage(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestIntegration(t *testing.T) {
|
||||
res := []*types.Alert{}
|
||||
r := notifierFunc(func(ctx context.Context, alerts ...*types.Alert) (bool, error) {
|
||||
res = append(res, alerts...)
|
||||
|
||||
return false, nil
|
||||
})
|
||||
i1 := Integration{
|
||||
notifier: r,
|
||||
conf: notifierConfigFunc(func() bool { return false }),
|
||||
}
|
||||
i2 := Integration{
|
||||
notifier: r,
|
||||
conf: notifierConfigFunc(func() bool { return true }),
|
||||
}
|
||||
|
||||
alerts := []*types.Alert{
|
||||
&types.Alert{
|
||||
Alert: model.Alert{
|
||||
EndsAt: time.Now().Add(-time.Hour),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
i1.Notify(nil, alerts...)
|
||||
i2.Notify(nil, alerts...)
|
||||
|
||||
// Even though the alert is sent to both integrations, which end up being
|
||||
// delivered to the same notifier, only one is actually delivered as the
|
||||
// second integration filters the resolved notifications.
|
||||
require.Equal(t, res, alerts)
|
||||
}
|
||||
|
||||
func TestSetNotifiesStage(t *testing.T) {
|
||||
tnflog := &testNflog{}
|
||||
s := &SetNotifiesStage{
|
||||
|
|
|
@ -325,3 +325,101 @@ receivers:
|
|||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func TestResolvedFilter(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
// This integration test ensures that even though resolved alerts may not be
|
||||
// notified about, they must be set as notified. Resolved alerts, even when
|
||||
// filtered, have to end up in the SetNotifiesStage, otherwise when an alert
|
||||
// fires again it is ambiguous whether it was resolved in between or not.
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(10)
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
go func() {
|
||||
conf := `
|
||||
global:
|
||||
resolve_timeout: 10s
|
||||
|
||||
route:
|
||||
receiver: "default"
|
||||
group_by: [alertname]
|
||||
group_wait: 1s
|
||||
group_interval: 5s
|
||||
|
||||
receivers:
|
||||
- name: "default"
|
||||
webhook_configs:
|
||||
- url: 'http://%s'
|
||||
send_resolved: true
|
||||
- url: 'http://%s'
|
||||
send_resolved: false
|
||||
`
|
||||
|
||||
at := NewAcceptanceTest(t, &AcceptanceOpts{
|
||||
Tolerance: 150 * time.Millisecond,
|
||||
})
|
||||
|
||||
co1 := at.Collector("webhook1")
|
||||
wh1 := NewWebhook(co1)
|
||||
|
||||
co2 := at.Collector("webhook2")
|
||||
wh2 := NewWebhook(co2)
|
||||
|
||||
am := at.Alertmanager(fmt.Sprintf(conf, wh1.Address(), wh2.Address()))
|
||||
|
||||
am.Push(At(1),
|
||||
Alert("alertname", "test", "lbl", "v1"),
|
||||
Alert("alertname", "test", "lbl", "v2"),
|
||||
Alert("alertname", "test", "lbl", "v3"),
|
||||
)
|
||||
|
||||
am.Push(At(16),
|
||||
Alert("alertname", "test", "lbl", "v1"),
|
||||
Alert("alertname", "test", "lbl", "v2"),
|
||||
Alert("alertname", "test", "lbl", "v3"),
|
||||
)
|
||||
|
||||
co1.Want(Between(2, 2.5),
|
||||
Alert("alertname", "test", "lbl", "v1").Active(1),
|
||||
Alert("alertname", "test", "lbl", "v2").Active(1),
|
||||
Alert("alertname", "test", "lbl", "v3").Active(1),
|
||||
)
|
||||
co1.Want(Between(12, 13),
|
||||
Alert("alertname", "test", "lbl", "v1").Active(1, 11),
|
||||
Alert("alertname", "test", "lbl", "v2").Active(1, 11),
|
||||
Alert("alertname", "test", "lbl", "v3").Active(1, 11),
|
||||
)
|
||||
|
||||
co1.Want(Between(17, 17.5),
|
||||
Alert("alertname", "test", "lbl", "v1").Active(16),
|
||||
Alert("alertname", "test", "lbl", "v2").Active(16),
|
||||
Alert("alertname", "test", "lbl", "v3").Active(16),
|
||||
)
|
||||
co1.Want(Between(27, 28),
|
||||
Alert("alertname", "test", "lbl", "v1").Active(16, 26),
|
||||
Alert("alertname", "test", "lbl", "v2").Active(16, 26),
|
||||
Alert("alertname", "test", "lbl", "v3").Active(16, 26),
|
||||
)
|
||||
|
||||
co2.Want(Between(2, 2.5),
|
||||
Alert("alertname", "test", "lbl", "v1").Active(1),
|
||||
Alert("alertname", "test", "lbl", "v2").Active(1),
|
||||
Alert("alertname", "test", "lbl", "v3").Active(1),
|
||||
)
|
||||
|
||||
co2.Want(Between(17, 17.5),
|
||||
Alert("alertname", "test", "lbl", "v1").Active(16),
|
||||
Alert("alertname", "test", "lbl", "v2").Active(16),
|
||||
Alert("alertname", "test", "lbl", "v3").Active(16),
|
||||
)
|
||||
|
||||
at.Run()
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue