From b7d891cf39cd2d53b85fe331eda9a96b0acede3e Mon Sep 17 00:00:00 2001
From: Simon Pasquier
Date: Fri, 8 Jun 2018 11:37:38 +0200
Subject: [PATCH] notify: notify resolved alerts properly (#1408)

* notify: notify resolved alerts properly

PR #1205, while fixing an existing issue, introduced another bug that
occurs when the send_resolved flag of the integration is set to true.

With send_resolved set to false, the semantics remain the same:
AlertManager generates a notification when new firing alerts are added
to the alert group. The notification carries only firing alerts.

With send_resolved set to true, AlertManager generates a notification
when new firing or resolved alerts are added to the alert group. The
notification carries both the firing and resolved alerts.

Signed-off-by: Simon Pasquier

* Fix comments

Signed-off-by: Simon Pasquier
---
 nflog/nflogpb/set.go         |  15 +++
 nflog/nflogpb/set_test.go    |  28 +++++
 notify/impl.go               |  23 +---
 notify/notify.go             |  55 ++++++----
 notify/notify_test.go        | 204 +++++++++++++++++++++++++++--------
 test/acceptance/send_test.go |  10 +-
 6 files changed, 248 insertions(+), 87 deletions(-)

diff --git a/nflog/nflogpb/set.go b/nflog/nflogpb/set.go
index 1c5eb3fc..698f7f2d 100644
--- a/nflog/nflogpb/set.go
+++ b/nflog/nflogpb/set.go
@@ -21,6 +21,21 @@ func (m *Entry) IsFiringSubset(subset map[uint64]struct{}) bool {
 		set[m.FiringAlerts[i]] = struct{}{}
 	}
 
+	return isSubset(set, subset)
+}
+
+// IsResolvedSubset returns whether the given subset is a subset of the alerts
+// that were resolved at the time of the last notification.
+func (m *Entry) IsResolvedSubset(subset map[uint64]struct{}) bool {
+	set := map[uint64]struct{}{}
+	for i := range m.ResolvedAlerts {
+		set[m.ResolvedAlerts[i]] = struct{}{}
+	}
+
+	return isSubset(set, subset)
+}
+
+func isSubset(set, subset map[uint64]struct{}) bool {
 	for k := range subset {
 		_, exists := set[k]
 		if !exists {
diff --git a/nflog/nflogpb/set_test.go b/nflog/nflogpb/set_test.go
index 3f895574..49fe1d33 100644
--- a/nflog/nflogpb/set_test.go
+++ b/nflog/nflogpb/set_test.go
@@ -45,6 +45,34 @@ func TestIsFiringSubset(t *testing.T) {
 	}
 }
 
+func TestIsResolvedSubset(t *testing.T) {
+	e := &Entry{
+		ResolvedAlerts: []uint64{1, 2, 3},
+	}
+
+	tests := []struct {
+		subset   map[uint64]struct{}
+		expected bool
+	}{
+		{newSubset(), true}, // empty subset
+		{newSubset(1), true},
+		{newSubset(2), true},
+		{newSubset(3), true},
+		{newSubset(1, 2), true},
+		{newSubset(2, 3), true},
+		{newSubset(1, 2, 3), true},
+		{newSubset(4), false},
+		{newSubset(1, 5), false},
+		{newSubset(1, 2, 3, 6), false},
+	}
+
+	for _, test := range tests {
+		if result := e.IsResolvedSubset(test.subset); result != test.expected {
+			t.Errorf("Expected %t, got %t for subset %v", test.expected, result, elements(test.subset))
+		}
+	}
+}
+
 func newSubset(elements ...uint64) map[uint64]struct{} {
 	subset := make(map[uint64]struct{})
 	for _, el := range elements {
diff --git a/notify/impl.go b/notify/impl.go
index 0a29814b..ed12f391 100644
--- a/notify/impl.go
+++ b/notify/impl.go
@@ -45,10 +45,6 @@ import (
 	"github.com/prometheus/alertmanager/types"
 )
 
-type notifierConfig interface {
-	SendResolved() bool
-}
-
 // A Notifier notifies about alerts under constraints of the given context.
 // It returns an error if unsuccessful and a flag whether the error is
 // recoverable. This information is useful for a retry logic.
@@ -67,24 +63,7 @@ type Integration struct {
 
 // Notify implements the Notifier interface.
 func (i *Integration) Notify(ctx context.Context, alerts ...*types.Alert) (bool, error) {
-	var res []*types.Alert
-
-	// Resolved alerts have to be filtered only at this point, because they need
-	// to end up unfiltered in the SetNotifiesStage.
-	if i.conf.SendResolved() {
-		res = alerts
-	} else {
-		for _, a := range alerts {
-			if a.Status() != model.AlertResolved {
-				res = append(res, a)
-			}
-		}
-	}
-	if len(res) == 0 {
-		return false, nil
-	}
-
-	return i.notifier.Notify(ctx, res...)
+	return i.notifier.Notify(ctx, alerts...)
 }
 
 // BuildReceiverIntegrations builds a list of integration notifiers off of a
diff --git a/notify/notify.go b/notify/notify.go
index 6d333cf6..75b791e6 100644
--- a/notify/notify.go
+++ b/notify/notify.go
@@ -91,6 +91,10 @@ func init() {
 	prometheus.Register(notificationLatencySeconds)
 }
 
+type notifierConfig interface {
+	SendResolved() bool
+}
+
 // MinTimeout is the minimum timeout that is set for the context of a call
 // to a notification pipeline.
 const MinTimeout = 10 * time.Second
@@ -262,7 +266,7 @@ func createStage(rc *config.Receiver, tmpl *template.Template, wait func() time.
 	}
 	var s MultiStage
 	s = append(s, NewWaitStage(wait))
-	s = append(s, NewDedupStage(notificationLog, recv))
+	s = append(s, NewDedupStage(i, notificationLog, recv))
 	s = append(s, NewRetryStage(i, rc.Name))
 	s = append(s, NewSetNotifiesStage(notificationLog, recv))
 
@@ -452,16 +456,18 @@ func (ws *WaitStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Al
 type DedupStage struct {
 	nflog NotificationLog
 	recv  *nflogpb.Receiver
+	conf  notifierConfig
 
 	now  func() time.Time
 	hash func(*types.Alert) uint64
 }
 
 // NewDedupStage wraps a DedupStage that runs against the given notification log.
-func NewDedupStage(l NotificationLog, recv *nflogpb.Receiver) *DedupStage {
+func NewDedupStage(i Integration, l NotificationLog, recv *nflogpb.Receiver) *DedupStage {
 	return &DedupStage{
 		nflog: l,
 		recv:  recv,
+		conf:  i.conf,
 		now:   utcNow,
 		hash:  hashAlert,
 	}
@@ -511,28 +517,34 @@ func hashAlert(a *types.Alert) uint64 {
 	return hash
 }
 
-func (n *DedupStage) needsUpdate(entry *nflogpb.Entry, firing, resolved map[uint64]struct{}, repeat time.Duration) (bool, error) {
+func (n *DedupStage) needsUpdate(entry *nflogpb.Entry, firing, resolved map[uint64]struct{}, repeat time.Duration) bool {
 	// If we haven't notified about the alert group before, notify right away
 	// unless we only have resolved alerts.
 	if entry == nil {
-		return len(firing) > 0, nil
+		return len(firing) > 0
 	}
 
 	if !entry.IsFiringSubset(firing) {
-		return true, nil
+		return true
 	}
 
-	// Notify about all alerts being resolved. If the current alert group and
-	// last notification contain no firing alert, it means that some alerts
-	// have been fired and resolved during the last group_wait interval. In
-	// this case, there is no need to notify the receiver since it doesn't know
-	// about them.
+	// Notify about all alerts being resolved.
+	// This is done irrespective of the send_resolved flag to make sure that
+	// the firing alerts are cleared from the notification log.
 	if len(firing) == 0 {
-		return len(entry.FiringAlerts) > 0, nil
+		// If the current alert group and last notification contain no firing
+		// alert, it means that some alerts have been fired and resolved during the
+		// last interval. In this case, there is no need to notify the receiver
+		// since it doesn't know about them.
+		return len(entry.FiringAlerts) > 0
+	}
+
+	if n.conf.SendResolved() && !entry.IsResolvedSubset(resolved) {
+		return true
 	}
 
 	// Nothing changed, only notify if the repeat interval has passed.
-	return entry.Timestamp.Before(n.now().Add(-repeat)), nil
+	return entry.Timestamp.Before(n.now().Add(-repeat))
 }
 
 // Exec implements the Stage interface.
@@ -580,9 +592,7 @@ func (n *DedupStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Al
 	case 2:
 		return ctx, nil, fmt.Errorf("unexpected entry result size %d", len(entries))
 	}
-	if ok, err := n.needsUpdate(entry, firingSet, resolvedSet, repeatInterval); err != nil {
-		return ctx, nil, err
-	} else if ok {
+	if n.needsUpdate(entry, firingSet, resolvedSet, repeatInterval) {
 		return ctx, alerts, nil
 	}
 	return ctx, nil, nil
@@ -605,17 +615,26 @@ func NewRetryStage(i Integration, groupName string) *RetryStage {
 
 // Exec implements the Stage interface.
 func (r RetryStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
+	var sent []*types.Alert
+
 	// If we shouldn't send notifications for resolved alerts, but there are only
 	// resolved alerts, report them all as successfully notified (we still want the
-	// notification log to log them).
+	// notification log to log them for the next run of DedupStage).
 	if !r.integration.conf.SendResolved() {
 		firing, ok := FiringAlerts(ctx)
 		if !ok {
-			return ctx, alerts, fmt.Errorf("firing alerts missing")
+			return ctx, nil, fmt.Errorf("firing alerts missing")
 		}
 		if len(firing) == 0 {
 			return ctx, alerts, nil
 		}
+		for _, a := range alerts {
+			if a.Status() != model.AlertResolved {
+				sent = append(sent, a)
+			}
+		}
+	} else {
+		sent = alerts
 	}
 
 	var (
@@ -642,7 +661,7 @@ func (r RetryStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Ale
 		select {
 		case <-tick.C:
 			now := time.Now()
-			retry, err := r.integration.Notify(ctx, alerts...)
+			retry, err := r.integration.Notify(ctx, sent...)
 			notificationLatencySeconds.WithLabelValues(r.integration.name).Observe(time.Since(now).Seconds())
 			if err != nil {
 				numFailedNotifications.WithLabelValues(r.integration.name).Inc()
diff --git a/notify/notify_test.go b/notify/notify_test.go
index 09b2f4ca..276a2449 100644
--- a/notify/notify_test.go
+++ b/notify/notify_test.go
@@ -91,26 +91,35 @@ func TestDedupStageNeedsUpdate(t *testing.T) {
 		firingAlerts   map[uint64]struct{}
 		resolvedAlerts map[uint64]struct{}
 		repeat         time.Duration
+		resolve        bool
 
-		res    bool
-		resErr bool
+		res bool
 	}{
 		{
+			// No matching nflog entry should update.
 			entry:        nil,
 			firingAlerts: alertHashSet(2, 3, 4),
 			res:          true,
 		}, {
+			// No matching nflog entry shouldn't update if no alert fires.
+			entry:          nil,
+			resolvedAlerts: alertHashSet(2, 3, 4),
+			res:            false,
+		}, {
+			// Different sets of firing alerts should update.
 			entry:        &nflogpb.Entry{FiringAlerts: []uint64{1, 2, 3}},
 			firingAlerts: alertHashSet(2, 3, 4),
 			res:          true,
 		}, {
+			// Zero timestamp in the nflog entry should always update.
 			entry: &nflogpb.Entry{
 				FiringAlerts: []uint64{1, 2, 3},
-				Timestamp:    time.Time{}, // zero timestamp should always update
+				Timestamp:    time.Time{},
 			},
 			firingAlerts: alertHashSet(1, 2, 3),
 			res:          true,
 		}, {
+			// Identical sets of alerts shouldn't update before repeat_interval.
 			entry: &nflogpb.Entry{
 				FiringAlerts: []uint64{1, 2, 3},
 				Timestamp:    now.Add(-9 * time.Minute),
@@ -119,6 +128,7 @@ func TestDedupStageNeedsUpdate(t *testing.T) {
 			firingAlerts: alertHashSet(1, 2, 3),
 			res:          false,
 		}, {
+			// Identical sets of alerts should update after repeat_interval.
 			entry: &nflogpb.Entry{
 				FiringAlerts: []uint64{1, 2, 3},
 				Timestamp:    now.Add(-11 * time.Minute),
@@ -127,24 +137,17 @@ func TestDedupStageNeedsUpdate(t *testing.T) {
 			firingAlerts: alertHashSet(1, 2, 3),
 			res:          true,
 		}, {
+			// Different sets of resolved alerts without firing alerts shouldn't update after repeat_interval.
 			entry: &nflogpb.Entry{
 				ResolvedAlerts: []uint64{1, 2, 3},
 				Timestamp:      now.Add(-11 * time.Minute),
 			},
 			repeat:         10 * time.Minute,
 			resolvedAlerts: alertHashSet(3, 4, 5),
+			resolve:        true,
 			res:            false,
 		}, {
-			entry: &nflogpb.Entry{
-				FiringAlerts:   []uint64{1, 2},
-				ResolvedAlerts: []uint64{3},
-				Timestamp:      now.Add(-11 * time.Minute),
-			},
-			repeat:         10 * time.Minute,
-			firingAlerts:   alertHashSet(1),
-			resolvedAlerts: alertHashSet(2, 3),
-			res:            true,
-		}, {
+			// Different sets of resolved alerts shouldn't update when resolve is false.
 			entry: &nflogpb.Entry{
 				FiringAlerts:   []uint64{1, 2},
 				ResolvedAlerts: []uint64{3},
@@ -153,8 +156,22 @@ func TestDedupStageNeedsUpdate(t *testing.T) {
 			repeat:         10 * time.Minute,
 			firingAlerts:   alertHashSet(1),
 			resolvedAlerts: alertHashSet(2, 3),
+			resolve:        false,
 			res:            false,
 		}, {
+			// Different sets of resolved alerts should update when resolve is true.
+			entry: &nflogpb.Entry{
+				FiringAlerts:   []uint64{1, 2},
+				ResolvedAlerts: []uint64{3},
+				Timestamp:      now.Add(-9 * time.Minute),
+			},
+			repeat:         10 * time.Minute,
+			firingAlerts:   alertHashSet(1),
+			resolvedAlerts: alertHashSet(2, 3),
+			resolve:        true,
+			res:            true,
+		}, {
+			// Empty set of firing alerts should update when resolve is false.
 			entry: &nflogpb.Entry{
 				FiringAlerts:   []uint64{1, 2},
 				ResolvedAlerts: []uint64{3},
@@ -163,6 +180,19 @@ func TestDedupStageNeedsUpdate(t *testing.T) {
 			repeat:         10 * time.Minute,
 			firingAlerts:   alertHashSet(),
 			resolvedAlerts: alertHashSet(1, 2, 3),
+			resolve:        false,
+			res:            true,
+		}, {
+			// Empty set of firing alerts should update when resolve is true.
+			entry: &nflogpb.Entry{
+				FiringAlerts:   []uint64{1, 2},
+				ResolvedAlerts: []uint64{3},
+				Timestamp:      now.Add(-9 * time.Minute),
+			},
+			repeat:         10 * time.Minute,
+			firingAlerts:   alertHashSet(),
+			resolvedAlerts: alertHashSet(1, 2, 3),
+			resolve:        true,
 			res:            true,
 		},
 	}
@@ -170,15 +200,11 @@ func TestDedupStageNeedsUpdate(t *testing.T) {
 		t.Log("case", i)
 
 		s := &DedupStage{
-			now: func() time.Time { return now },
+			now:  func() time.Time { return now },
+			conf: notifierConfigFunc(func() bool { return c.resolve }),
 		}
-		ok, err := s.needsUpdate(c.entry, c.firingAlerts, c.resolvedAlerts, c.repeat)
-		if c.resErr {
-			require.Error(t, err)
-		} else {
-			require.NoError(t, err)
-		}
-		require.Equal(t, c.res, ok)
+		res := s.needsUpdate(c.entry, c.firingAlerts, c.resolvedAlerts, c.repeat)
+		require.Equal(t, c.res, res)
 	}
 }
 
@@ -194,6 +220,7 @@ func TestDedupStage(t *testing.T) {
 		now: func() time.Time { return now },
+		conf: notifierConfigFunc(func() bool { return false }),
 	}
 
 	ctx := context.Background()
@@ -344,16 +371,62 @@ func TestRoutingStage(t *testing.T) {
 	}
 }
 
-func TestIntegrationNoResolved(t *testing.T) {
-	res := []*types.Alert{}
-	r := notifierFunc(func(ctx context.Context, alerts ...*types.Alert) (bool, error) {
-		res = append(res, alerts...)
-
-		return false, nil
-	})
+func TestRetryStageWithError(t *testing.T) {
+	fail, retry := true, true
+	sent := []*types.Alert{}
 	i := Integration{
-		notifier: r,
-		conf:     notifierConfigFunc(func() bool { return false }),
+		notifier: notifierFunc(func(ctx context.Context, alerts ...*types.Alert) (bool, error) {
+			if fail {
+				fail = false
+				return retry, errors.New("fail to deliver notification")
+			}
+			sent = append(sent, alerts...)
+			return false, nil
+		}),
+		conf: notifierConfigFunc(func() bool { return false }),
+	}
+	r := RetryStage{
+		integration: i,
+	}
+
+	alerts := []*types.Alert{
+		&types.Alert{
+			Alert: model.Alert{
+				EndsAt: time.Now().Add(time.Hour),
+			},
+		},
+	}
+
+	ctx := context.Background()
+	ctx = WithFiringAlerts(ctx, []uint64{0})
+
+	// Notify with a recoverable error should retry and succeed.
+	resctx, res, err := r.Exec(ctx, log.NewNopLogger(), alerts...)
+	require.Nil(t, err)
+	require.Equal(t, alerts, res)
+	require.Equal(t, alerts, sent)
+	require.NotNil(t, resctx)
+
+	// Notify with an unrecoverable error should fail.
+	sent = sent[:0]
+	fail = true
+	retry = false
+	resctx, _, err = r.Exec(ctx, log.NewNopLogger(), alerts...)
+	require.NotNil(t, err)
+	require.NotNil(t, resctx)
+}
+
+func TestRetryStageNoResolved(t *testing.T) {
+	sent := []*types.Alert{}
+	i := Integration{
+		notifier: notifierFunc(func(ctx context.Context, alerts ...*types.Alert) (bool, error) {
+			sent = append(sent, alerts...)
+			return false, nil
+		}),
+		conf: notifierConfigFunc(func() bool { return false }),
+	}
+	r := RetryStage{
+		integration: i,
 	}
 
 	alerts := []*types.Alert{
@@ -369,21 +442,44 @@ func TestIntegrationNoResolved(t *testing.T) {
 		},
 	}
 
-	i.Notify(nil, alerts...)
+	ctx := context.Background()
 
-	require.Equal(t, len(res), 1)
+	resctx, res, err := r.Exec(ctx, log.NewNopLogger(), alerts...)
+	require.EqualError(t, err, "firing alerts missing")
+	require.Nil(t, res)
+	require.NotNil(t, resctx)
+
+	ctx = WithFiringAlerts(ctx, []uint64{0})
+
+	resctx, res, err = r.Exec(ctx, log.NewNopLogger(), alerts...)
+	require.Nil(t, err)
+	require.Equal(t, alerts, res)
+	require.Equal(t, []*types.Alert{alerts[1]}, sent)
+	require.NotNil(t, resctx)
+
+	// All alerts are resolved.
+	sent = sent[:0]
+	ctx = WithFiringAlerts(ctx, []uint64{})
+	alerts[1].Alert.EndsAt = time.Now().Add(-time.Hour)
+
+	resctx, res, err = r.Exec(ctx, log.NewNopLogger(), alerts...)
+	require.Nil(t, err)
+	require.Equal(t, alerts, res)
+	require.Equal(t, []*types.Alert{}, sent)
+	require.NotNil(t, resctx)
 }
 
-func TestIntegrationSendResolved(t *testing.T) {
-	res := []*types.Alert{}
-	r := notifierFunc(func(ctx context.Context, alerts ...*types.Alert) (bool, error) {
-		res = append(res, alerts...)
-
-		return false, nil
-	})
+func TestRetryStageSendResolved(t *testing.T) {
+	sent := []*types.Alert{}
 	i := Integration{
-		notifier: r,
-		conf:     notifierConfigFunc(func() bool { return true }),
+		notifier: notifierFunc(func(ctx context.Context, alerts ...*types.Alert) (bool, error) {
+			sent = append(sent, alerts...)
+			return false, nil
+		}),
+		conf: notifierConfigFunc(func() bool { return true }),
+	}
+	r := RetryStage{
+		integration: i,
 	}
 
 	alerts := []*types.Alert{
@@ -392,12 +488,32 @@ func TestIntegrationSendResolved(t *testing.T) {
 			EndsAt: time.Now().Add(-time.Hour),
 		},
 	},
+		&types.Alert{
+			Alert: model.Alert{
+				EndsAt: time.Now().Add(time.Hour),
+			},
+		},
 	}
 
-	i.Notify(nil, alerts...)
+	ctx := context.Background()
+	ctx = WithFiringAlerts(ctx, []uint64{0})
 
-	require.Equal(t, len(res), 1)
-	require.Equal(t, res, alerts)
+	resctx, res, err := r.Exec(ctx, log.NewNopLogger(), alerts...)
+	require.Nil(t, err)
+	require.Equal(t, alerts, res)
+	require.Equal(t, alerts, sent)
+	require.NotNil(t, resctx)
+
+	// All alerts are resolved.
+	sent = sent[:0]
+	ctx = WithFiringAlerts(ctx, []uint64{})
+	alerts[1].Alert.EndsAt = time.Now().Add(-time.Hour)
+
+	resctx, res, err = r.Exec(ctx, log.NewNopLogger(), alerts...)
+	require.Nil(t, err)
+	require.Equal(t, alerts, res)
+	require.Equal(t, alerts, sent)
+	require.NotNil(t, resctx)
 }
 
 func TestSetNotifiesStage(t *testing.T) {
diff --git a/test/acceptance/send_test.go b/test/acceptance/send_test.go
index 53142373..03d59190 100644
--- a/test/acceptance/send_test.go
+++ b/test/acceptance/send_test.go
@@ -387,8 +387,11 @@ receivers:
 		Alert("alertname", "test", "lbl", "v2").Active(1),
 		Alert("alertname", "test", "lbl", "v3").Active(3),
 	)
-	// no notification should be sent after group_interval because no new alert has been fired
-	co1.Want(Between(12, 12.5))
+	// Notification should be sent because the v2 alert is resolved due to the time-out.
+	co1.Want(Between(12, 12.5),
+		Alert("alertname", "test", "lbl", "v2").Active(1, 11),
+		Alert("alertname", "test", "lbl", "v3").Active(3),
+	)
 
 	co2.Want(Between(2, 2.5),
 		Alert("alertname", "test", "lbl", "v1").Active(1),
@@ -398,7 +401,8 @@ receivers:
 		Alert("alertname", "test", "lbl", "v2").Active(1),
 		Alert("alertname", "test", "lbl", "v3").Active(3),
 	)
-	co1.Want(Between(12, 12.5))
+	// No notification should be sent after group_interval because no new alert has been fired.
+	co2.Want(Between(12, 12.5))
 
 	at.Run()
 }
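
Editor's note: a few illustrative sketches follow for readers of this patch; none of them is part of the change itself. First, the new dedup logic rests on the small set-inclusion helper factored out in nflog/nflogpb/set.go above. This standalone sketch (the newSet helper is an illustrative stand-in for the test file's newSubset) shows the semantics that IsFiringSubset and IsResolvedSubset share:

package main

import "fmt"

// isSubset reports whether every element of subset is present in set,
// mirroring the helper factored out in nflog/nflogpb/set.go.
func isSubset(set, subset map[uint64]struct{}) bool {
	for k := range subset {
		if _, ok := set[k]; !ok {
			return false
		}
	}
	return true
}

// newSet builds a set from alert hashes; equivalent to newSubset in the tests.
func newSet(elements ...uint64) map[uint64]struct{} {
	s := make(map[uint64]struct{}, len(elements))
	for _, el := range elements {
		s[el] = struct{}{}
	}
	return s
}

func main() {
	logged := newSet(1, 2, 3) // alert hashes recorded in the last nflog entry
	fmt.Println(isSubset(logged, newSet(1, 2))) // true: nothing new since the last notification
	fmt.Println(isSubset(logged, newSet(2, 4))) // false: hash 4 was never notified
}

Note that an empty set is a subset of everything, which is why the tests expect newSubset() to pass.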
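
Second, the heart of the fix is the reordered decision chain in DedupStage.needsUpdate. The sketch below condenses that chain using simplified stand-ins for nflogpb.Entry and the hash sets built in Exec (the entry type and field names here are illustrative, not the real API):

package main

import (
	"fmt"
	"time"
)

// entry is a simplified stand-in for nflogpb.Entry.
type entry struct {
	firingAlerts   map[uint64]struct{}
	resolvedAlerts map[uint64]struct{}
	timestamp      time.Time
}

func isSubset(set, subset map[uint64]struct{}) bool {
	for k := range subset {
		if _, ok := set[k]; !ok {
			return false
		}
	}
	return true
}

// needsUpdate follows the decision order of DedupStage.needsUpdate in the patch.
func needsUpdate(e *entry, firing, resolved map[uint64]struct{}, repeat time.Duration, sendResolved bool, now time.Time) bool {
	// Never notified before: notify only if something is firing.
	if e == nil {
		return len(firing) > 0
	}
	// New firing alerts since the last notification.
	if !isSubset(e.firingAlerts, firing) {
		return true
	}
	// Everything resolved: notify only if the last notification reported
	// firing alerts, which also clears them from the notification log.
	if len(firing) == 0 {
		return len(e.firingAlerts) > 0
	}
	// New resolved alerts matter only when send_resolved is true.
	if sendResolved && !isSubset(e.resolvedAlerts, resolved) {
		return true
	}
	// Nothing changed: re-notify once repeat_interval has elapsed.
	return e.timestamp.Before(now.Add(-repeat))
}

func main() {
	now := time.Now()
	last := &entry{
		firingAlerts:   map[uint64]struct{}{1: {}},
		resolvedAlerts: map[uint64]struct{}{},
		timestamp:      now.Add(-5 * time.Minute),
	}
	firing := map[uint64]struct{}{1: {}}
	resolved := map[uint64]struct{}{2: {}}
	// A newly resolved alert triggers a notification only with send_resolved=true.
	fmt.Println(needsUpdate(last, firing, resolved, 10*time.Minute, false, now)) // false
	fmt.Println(needsUpdate(last, firing, resolved, 10*time.Minute, true, now))  // true
}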
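
Finally, the filtering that used to live in Integration.Notify now happens in RetryStage.Exec, so the unfiltered alert slice still flows on to SetNotifiesStage and the notification log. A hedged sketch of that payload split, with a minimal alert type standing in for types.Alert (an alert whose EndsAt has passed counts as resolved, as in Alertmanager):

package main

import (
	"fmt"
	"time"
)

// alert is a minimal stand-in for types.Alert.
type alert struct {
	endsAt time.Time
}

func (a alert) resolved(now time.Time) bool { return !a.endsAt.After(now) }

// payload mimics the filtering RetryStage.Exec performs before calling the
// integration: with send_resolved disabled, resolved alerts are dropped from
// what is delivered, while the caller keeps the full slice for the nflog.
func payload(alerts []alert, sendResolved bool, now time.Time) []alert {
	if sendResolved {
		return alerts
	}
	var sent []alert
	for _, a := range alerts {
		if !a.resolved(now) {
			sent = append(sent, a)
		}
	}
	return sent
}

func main() {
	now := time.Now()
	alerts := []alert{
		{endsAt: now.Add(-time.Hour)}, // resolved
		{endsAt: now.Add(time.Hour)},  // still firing
	}
	fmt.Println(len(payload(alerts, false, now))) // 1: the resolved alert is filtered out
	fmt.Println(len(payload(alerts, true, now)))  // 2: firing and resolved both delivered
}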