notify: notify resolved alerts properly (#1408)

* notify: notify resolved alerts properly

PR #1205, while fixing an existing issue, introduced another bug that
shows up when the send_resolved flag of the integration is set to true.

With send_resolved set to false, the semantics remain the same:
AlertManager generates a notification when new firing alerts are added
to the alert group. The notification carries only firing alerts.

With send_resolved set to true, AlertManager generates a notification
when new firing or resolved alerts are added to the alert group. The
notification carries both the firing and resolved alerts (both modes
are sketched in code right after the commit metadata below).

Signed-off-by: Simon Pasquier <spasquie@redhat.com>

* Fix comments

Signed-off-by: Simon Pasquier <spasquie@redhat.com>
Simon Pasquier, 2018-06-08 11:37:38 +02:00, committed by stuart nelson
parent 9f87f9d6e7
commit b7d891cf39
6 changed files with 248 additions and 87 deletions
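Not part of the commit: as a quick orientation before the diff, here is a minimal, self-contained sketch (illustrative names only) of the send_resolved semantics described above. With the flag off, resolved alerts are filtered out before the notifier is invoked; with it on, they are passed through alongside the firing alerts.

package main

import "fmt"

// alert is a stand-in for types.Alert; status "resolved" mirrors
// model.AlertResolved.
type alert struct {
	name   string
	status string
}

// alertsToNotify mirrors the filtering that this commit moves into
// RetryStage.Exec: with send_resolved=false only firing alerts are
// forwarded, with send_resolved=true everything is forwarded.
func alertsToNotify(alerts []alert, sendResolved bool) []alert {
	if sendResolved {
		return alerts
	}
	var firing []alert
	for _, a := range alerts {
		if a.status != "resolved" {
			firing = append(firing, a)
		}
	}
	return firing
}

func main() {
	alerts := []alert{{"a", "firing"}, {"b", "resolved"}}
	fmt.Println(alertsToNotify(alerts, false)) // [{a firing}]
	fmt.Println(alertsToNotify(alerts, true))  // [{a firing} {b resolved}]
}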


@@ -21,6 +21,21 @@ func (m *Entry) IsFiringSubset(subset map[uint64]struct{}) bool {
 		set[m.FiringAlerts[i]] = struct{}{}
 	}

 	return isSubset(set, subset)
 }

+// IsResolvedSubset returns whether the given subset is a subset of the alerts
+// that were resolved at the time of the last notification.
+func (m *Entry) IsResolvedSubset(subset map[uint64]struct{}) bool {
+	set := map[uint64]struct{}{}
+	for i := range m.ResolvedAlerts {
+		set[m.ResolvedAlerts[i]] = struct{}{}
+	}
+
+	return isSubset(set, subset)
+}
+
 func isSubset(set, subset map[uint64]struct{}) bool {
 	for k := range subset {
 		_, exists := set[k]
 		if !exists {
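Not part of the commit: the new IsResolvedSubset reuses isSubset, which drives the dedup decisions later in this diff. A small runnable sketch of the subset semantics (isSubset is copied from the hunk above in condensed form; main is illustrative):

package main

import "fmt"

func isSubset(set, subset map[uint64]struct{}) bool {
	for k := range subset {
		if _, exists := set[k]; !exists {
			return false
		}
	}
	return true
}

func main() {
	resolved := map[uint64]struct{}{1: {}, 2: {}, 3: {}}
	fmt.Println(isSubset(resolved, map[uint64]struct{}{2: {}, 3: {}})) // true
	fmt.Println(isSubset(resolved, map[uint64]struct{}{4: {}}))        // false
	fmt.Println(isSubset(resolved, map[uint64]struct{}{}))             // true: the empty set is a subset of anything
}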


@@ -45,6 +45,34 @@ func TestIsFiringSubset(t *testing.T) {
 	}
 }

+func TestIsResolvedSubset(t *testing.T) {
+	e := &Entry{
+		ResolvedAlerts: []uint64{1, 2, 3},
+	}
+
+	tests := []struct {
+		subset   map[uint64]struct{}
+		expected bool
+	}{
+		{newSubset(), true}, // empty subset
+		{newSubset(1), true},
+		{newSubset(2), true},
+		{newSubset(3), true},
+		{newSubset(1, 2), true},
+		{newSubset(2, 3), true},
+		{newSubset(1, 2, 3), true},
+		{newSubset(4), false},
+		{newSubset(1, 5), false},
+		{newSubset(1, 2, 3, 6), false},
+	}
+
+	for _, test := range tests {
+		if result := e.IsResolvedSubset(test.subset); result != test.expected {
+			t.Errorf("Expected %t, got %t for subset %v", test.expected, result, elements(test.subset))
+		}
+	}
+}
+
 func newSubset(elements ...uint64) map[uint64]struct{} {
 	subset := make(map[uint64]struct{})
 	for _, el := range elements {

@@ -45,10 +45,6 @@ import (
 	"github.com/prometheus/alertmanager/types"
 )

-type notifierConfig interface {
-	SendResolved() bool
-}
-
 // A Notifier notifies about alerts under constraints of the given context.
 // It returns an error if unsuccessful and a flag whether the error is
 // recoverable. This information is useful for a retry logic.
@@ -67,24 +63,7 @@ type Integration struct {
 // Notify implements the Notifier interface.
 func (i *Integration) Notify(ctx context.Context, alerts ...*types.Alert) (bool, error) {
-	var res []*types.Alert
-
-	// Resolved alerts have to be filtered only at this point, because they need
-	// to end up unfiltered in the SetNotifiesStage.
-	if i.conf.SendResolved() {
-		res = alerts
-	} else {
-		for _, a := range alerts {
-			if a.Status() != model.AlertResolved {
-				res = append(res, a)
-			}
-		}
-	}
-	if len(res) == 0 {
-		return false, nil
-	}
-
-	return i.notifier.Notify(ctx, res...)
+	return i.notifier.Notify(ctx, alerts...)
 }

 // BuildReceiverIntegrations builds a list of integration notifiers off of a

@@ -91,6 +91,10 @@ func init() {
 	prometheus.Register(notificationLatencySeconds)
 }

+type notifierConfig interface {
+	SendResolved() bool
+}
+
 // MinTimeout is the minimum timeout that is set for the context of a call
 // to a notification pipeline.
 const MinTimeout = 10 * time.Second
@@ -262,7 +266,7 @@ func createStage(rc *config.Receiver, tmpl *template.Template, wait func() time.
 		}

 		var s MultiStage
 		s = append(s, NewWaitStage(wait))
-		s = append(s, NewDedupStage(notificationLog, recv))
+		s = append(s, NewDedupStage(i, notificationLog, recv))
 		s = append(s, NewRetryStage(i, rc.Name))
 		s = append(s, NewSetNotifiesStage(notificationLog, recv))
@@ -452,16 +456,18 @@ func (ws *WaitStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Al
 type DedupStage struct {
 	nflog NotificationLog
 	recv  *nflogpb.Receiver
+	conf  notifierConfig

 	now  func() time.Time
 	hash func(*types.Alert) uint64
 }

 // NewDedupStage wraps a DedupStage that runs against the given notification log.
-func NewDedupStage(l NotificationLog, recv *nflogpb.Receiver) *DedupStage {
+func NewDedupStage(i Integration, l NotificationLog, recv *nflogpb.Receiver) *DedupStage {
 	return &DedupStage{
 		nflog: l,
 		recv:  recv,
+		conf:  i.conf,
 		now:   utcNow,
 		hash:  hashAlert,
 	}
@@ -511,28 +517,34 @@ func hashAlert(a *types.Alert) uint64 {
 	return hash
 }

-func (n *DedupStage) needsUpdate(entry *nflogpb.Entry, firing, resolved map[uint64]struct{}, repeat time.Duration) (bool, error) {
+func (n *DedupStage) needsUpdate(entry *nflogpb.Entry, firing, resolved map[uint64]struct{}, repeat time.Duration) bool {
 	// If we haven't notified about the alert group before, notify right away
 	// unless we only have resolved alerts.
 	if entry == nil {
-		return len(firing) > 0, nil
+		return len(firing) > 0
 	}

 	if !entry.IsFiringSubset(firing) {
-		return true, nil
+		return true
 	}

-	// Notify about all alerts being resolved. If the current alert group and
-	// last notification contain no firing alert, it means that some alerts
-	// have been fired and resolved during the last group_wait interval. In
-	// this case, there is no need to notify the receiver since it doesn't know
-	// about them.
+	// Notify about all alerts being resolved.
+	// This is done irrespective of the send_resolved flag to make sure that
+	// the firing alerts are cleared from the notification log.
 	if len(firing) == 0 {
-		return len(entry.FiringAlerts) > 0, nil
+		// If the current alert group and last notification contain no firing
+		// alert, it means that some alerts have been fired and resolved during the
+		// last interval. In this case, there is no need to notify the receiver
+		// since it doesn't know about them.
+		return len(entry.FiringAlerts) > 0
 	}

+	if n.conf.SendResolved() && !entry.IsResolvedSubset(resolved) {
+		return true
+	}
+
 	// Nothing changed, only notify if the repeat interval has passed.
-	return entry.Timestamp.Before(n.now().Add(-repeat)), nil
+	return entry.Timestamp.Before(n.now().Add(-repeat))
 }

 // Exec implements the Stage interface.
@@ -580,9 +592,7 @@ func (n *DedupStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Al
 	case 2:
 		return ctx, nil, fmt.Errorf("unexpected entry result size %d", len(entries))
 	}
-	if ok, err := n.needsUpdate(entry, firingSet, resolvedSet, repeatInterval); err != nil {
-		return ctx, nil, err
-	} else if ok {
+	if n.needsUpdate(entry, firingSet, resolvedSet, repeatInterval) {
 		return ctx, alerts, nil
 	}
 	return ctx, nil, nil
@@ -605,17 +615,26 @@ func NewRetryStage(i Integration, groupName string) *RetryStage {
 // Exec implements the Stage interface.
 func (r RetryStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
+	var sent []*types.Alert
+
 	// If we shouldn't send notifications for resolved alerts, but there are only
 	// resolved alerts, report them all as successfully notified (we still want the
-	// notification log to log them).
+	// notification log to log them for the next run of DedupStage).
 	if !r.integration.conf.SendResolved() {
 		firing, ok := FiringAlerts(ctx)
 		if !ok {
-			return ctx, alerts, fmt.Errorf("firing alerts missing")
+			return ctx, nil, fmt.Errorf("firing alerts missing")
 		}
 		if len(firing) == 0 {
 			return ctx, alerts, nil
 		}
+		for _, a := range alerts {
+			if a.Status() != model.AlertResolved {
+				sent = append(sent, a)
+			}
+		}
-	}
+	} else {
+		sent = alerts
+	}

 	var (
@@ -642,7 +661,7 @@ func (r RetryStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Ale
 		select {
 		case <-tick.C:
 			now := time.Now()
-			retry, err := r.integration.Notify(ctx, alerts...)
+			retry, err := r.integration.Notify(ctx, sent...)
 			notificationLatencySeconds.WithLabelValues(r.integration.name).Observe(time.Since(now).Seconds())
 			if err != nil {
 				numFailedNotifications.WithLabelValues(r.integration.name).Inc()
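Not part of the commit: to make the new dedup rules concrete, here is a hedged, self-contained re-implementation of the needsUpdate decision logic (illustrative code, not the package's; the repeat-interval check is reduced to a boolean) walking through the three interesting scenarios from the tests below.

package main

import "fmt"

// entry stands in for the nflog entry of the last notification.
type entry struct {
	firing   map[uint64]struct{}
	resolved map[uint64]struct{}
}

func subset(set, sub map[uint64]struct{}) bool {
	for k := range sub {
		if _, ok := set[k]; !ok {
			return false
		}
	}
	return true
}

// needsUpdate mirrors the decision rules of DedupStage.needsUpdate above.
func needsUpdate(e *entry, firing, resolved map[uint64]struct{}, sendResolved, repeatPassed bool) bool {
	if e == nil {
		return len(firing) > 0 // never notified before: notify unless all resolved
	}
	if !subset(e.firing, firing) {
		return true // new firing alerts
	}
	if len(firing) == 0 {
		return len(e.firing) > 0 // everything resolved: clear the nflog entry
	}
	if sendResolved && !subset(e.resolved, resolved) {
		return true // new resolved alerts, and the receiver wants them
	}
	return repeatPassed // nothing changed: only re-notify after repeat_interval
}

func set(ids ...uint64) map[uint64]struct{} {
	s := map[uint64]struct{}{}
	for _, id := range ids {
		s[id] = struct{}{}
	}
	return s
}

func main() {
	e := &entry{firing: set(1, 2), resolved: set(3)}
	// New resolved alerts, send_resolved=false: suppressed until repeat_interval.
	fmt.Println(needsUpdate(e, set(1), set(2, 3), false, false)) // false
	// Same state, send_resolved=true: notify, resolved={2,3} is not a subset of {3}.
	fmt.Println(needsUpdate(e, set(1), set(2, 3), true, false)) // true
	// Everything resolved: always notify so the firing alerts are cleared.
	fmt.Println(needsUpdate(e, set(), set(1, 2, 3), false, false)) // true
}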


@@ -91,26 +91,35 @@ func TestDedupStageNeedsUpdate(t *testing.T) {
 		firingAlerts   map[uint64]struct{}
 		resolvedAlerts map[uint64]struct{}
 		repeat         time.Duration
+		resolve        bool

-		res    bool
-		resErr bool
+		res bool
 	}{
 		{
+			// No matching nflog entry should update.
 			entry:        nil,
 			firingAlerts: alertHashSet(2, 3, 4),
 			res:          true,
 		}, {
+			// No matching nflog entry shouldn't update if no alert fires.
 			entry:          nil,
 			resolvedAlerts: alertHashSet(2, 3, 4),
 			res:            false,
 		}, {
+			// Different sets of firing alerts should update.
 			entry:        &nflogpb.Entry{FiringAlerts: []uint64{1, 2, 3}},
 			firingAlerts: alertHashSet(2, 3, 4),
 			res:          true,
 		}, {
+			// Zero timestamp in the nflog entry should always update.
 			entry: &nflogpb.Entry{
 				FiringAlerts: []uint64{1, 2, 3},
-				Timestamp:    time.Time{}, // zero timestamp should always update
+				Timestamp:    time.Time{},
 			},
 			firingAlerts: alertHashSet(1, 2, 3),
 			res:          true,
 		}, {
+			// Identical sets of alerts shouldn't update before repeat_interval.
 			entry: &nflogpb.Entry{
 				FiringAlerts: []uint64{1, 2, 3},
 				Timestamp:    now.Add(-9 * time.Minute),
@@ -119,6 +128,7 @@ func TestDedupStageNeedsUpdate(t *testing.T) {
 			firingAlerts: alertHashSet(1, 2, 3),
 			res:          false,
 		}, {
+			// Identical sets of alerts should update after repeat_interval.
 			entry: &nflogpb.Entry{
 				FiringAlerts: []uint64{1, 2, 3},
 				Timestamp:    now.Add(-11 * time.Minute),
@@ -127,24 +137,17 @@ func TestDedupStageNeedsUpdate(t *testing.T) {
 			firingAlerts: alertHashSet(1, 2, 3),
 			res:          true,
 		}, {
+			// Different sets of resolved alerts without firing alerts shouldn't update after repeat_interval.
 			entry: &nflogpb.Entry{
 				ResolvedAlerts: []uint64{1, 2, 3},
 				Timestamp:      now.Add(-11 * time.Minute),
 			},
 			repeat:         10 * time.Minute,
 			resolvedAlerts: alertHashSet(3, 4, 5),
+			resolve:        true,
 			res:            false,
 		}, {
-			entry: &nflogpb.Entry{
-				FiringAlerts:   []uint64{1, 2},
-				ResolvedAlerts: []uint64{3},
-				Timestamp:      now.Add(-11 * time.Minute),
-			},
-			repeat:         10 * time.Minute,
-			firingAlerts:   alertHashSet(1),
-			resolvedAlerts: alertHashSet(2, 3),
-			res:            true,
-		}, {
+			// Different sets of resolved alerts shouldn't update when resolve is false.
 			entry: &nflogpb.Entry{
 				FiringAlerts:   []uint64{1, 2},
 				ResolvedAlerts: []uint64{3},
@@ -153,8 +156,22 @@ func TestDedupStageNeedsUpdate(t *testing.T) {
 			repeat:         10 * time.Minute,
 			firingAlerts:   alertHashSet(1),
 			resolvedAlerts: alertHashSet(2, 3),
+			resolve:        false,
 			res:            false,
 		}, {
+			// Different sets of resolved alerts should update when resolve is true.
+			entry: &nflogpb.Entry{
+				FiringAlerts:   []uint64{1, 2},
+				ResolvedAlerts: []uint64{3},
+				Timestamp:      now.Add(-9 * time.Minute),
+			},
+			repeat:         10 * time.Minute,
+			firingAlerts:   alertHashSet(1),
+			resolvedAlerts: alertHashSet(2, 3),
+			resolve:        true,
+			res:            true,
+		}, {
+			// Empty set of firing alerts should update when resolve is false.
 			entry: &nflogpb.Entry{
 				FiringAlerts:   []uint64{1, 2},
 				ResolvedAlerts: []uint64{3},
@@ -163,6 +180,19 @@ func TestDedupStageNeedsUpdate(t *testing.T) {
 			repeat:         10 * time.Minute,
 			firingAlerts:   alertHashSet(),
 			resolvedAlerts: alertHashSet(1, 2, 3),
+			resolve:        false,
 			res:            true,
-		},
+		}, {
+			// Empty set of firing alerts should update when resolve is true.
+			entry: &nflogpb.Entry{
+				FiringAlerts:   []uint64{1, 2},
+				ResolvedAlerts: []uint64{3},
+				Timestamp:      now.Add(-9 * time.Minute),
+			},
+			repeat:         10 * time.Minute,
+			firingAlerts:   alertHashSet(),
+			resolvedAlerts: alertHashSet(1, 2, 3),
+			resolve:        true,
+			res:            true,
+		},
 	}
@@ -170,15 +200,11 @@ func TestDedupStageNeedsUpdate(t *testing.T) {
 		t.Log("case", i)
 		s := &DedupStage{
-			now: func() time.Time { return now },
+			now:  func() time.Time { return now },
+			conf: notifierConfigFunc(func() bool { return c.resolve }),
 		}
-		ok, err := s.needsUpdate(c.entry, c.firingAlerts, c.resolvedAlerts, c.repeat)
-		if c.resErr {
-			require.Error(t, err)
-		} else {
-			require.NoError(t, err)
-		}
-		require.Equal(t, c.res, ok)
+		res := s.needsUpdate(c.entry, c.firingAlerts, c.resolvedAlerts, c.repeat)
+		require.Equal(t, c.res, res)
 	}
 }
@@ -194,6 +220,7 @@ func TestDedupStage(t *testing.T) {
 		now: func() time.Time {
 			return now
 		},
+		conf: notifierConfigFunc(func() bool { return false }),
 	}

 	ctx := context.Background()
@@ -344,16 +371,62 @@ func TestRoutingStage(t *testing.T) {
 	}
 }

-func TestIntegrationNoResolved(t *testing.T) {
-	res := []*types.Alert{}
-	r := notifierFunc(func(ctx context.Context, alerts ...*types.Alert) (bool, error) {
-		res = append(res, alerts...)
-		return false, nil
-	})
+func TestRetryStageWithError(t *testing.T) {
+	fail, retry := true, true
+	sent := []*types.Alert{}
 	i := Integration{
-		notifier: r,
-		conf:     notifierConfigFunc(func() bool { return false }),
+		notifier: notifierFunc(func(ctx context.Context, alerts ...*types.Alert) (bool, error) {
+			if fail {
+				fail = false
+				return retry, errors.New("fail to deliver notification")
+			}
+			sent = append(sent, alerts...)
+			return false, nil
+		}),
+		conf: notifierConfigFunc(func() bool { return false }),
 	}
+	r := RetryStage{
+		integration: i,
+	}

 	alerts := []*types.Alert{
 		&types.Alert{
 			Alert: model.Alert{
 				EndsAt: time.Now().Add(time.Hour),
 			},
 		},
 	}

+	ctx := context.Background()
+	ctx = WithFiringAlerts(ctx, []uint64{0})
+
+	// Notify with a recoverable error should retry and succeed.
+	resctx, res, err := r.Exec(ctx, log.NewNopLogger(), alerts...)
+	require.Nil(t, err)
+	require.Equal(t, alerts, res)
+	require.Equal(t, alerts, sent)
+	require.NotNil(t, resctx)
+
+	// Notify with an unrecoverable error should fail.
+	sent = sent[:0]
+	fail = true
+	retry = false
+	resctx, _, err = r.Exec(ctx, log.NewNopLogger(), alerts...)
+	require.NotNil(t, err)
+	require.NotNil(t, resctx)
+}
+
+func TestRetryStageNoResolved(t *testing.T) {
+	sent := []*types.Alert{}
+	i := Integration{
+		notifier: notifierFunc(func(ctx context.Context, alerts ...*types.Alert) (bool, error) {
+			sent = append(sent, alerts...)
+			return false, nil
+		}),
+		conf: notifierConfigFunc(func() bool { return false }),
+	}
+	r := RetryStage{
+		integration: i,
+	}
+
 	alerts := []*types.Alert{
@@ -369,21 +442,44 @@ func TestIntegrationNoResolved(t *testing.T) {
 		},
 	}

-	i.Notify(nil, alerts...)
+	ctx := context.Background()

-	require.Equal(t, len(res), 1)
+	resctx, res, err := r.Exec(ctx, log.NewNopLogger(), alerts...)
+	require.EqualError(t, err, "firing alerts missing")
+	require.Nil(t, res)
+	require.NotNil(t, resctx)
+
+	ctx = WithFiringAlerts(ctx, []uint64{0})
+	resctx, res, err = r.Exec(ctx, log.NewNopLogger(), alerts...)
+	require.Nil(t, err)
+	require.Equal(t, alerts, res)
+	require.Equal(t, []*types.Alert{alerts[1]}, sent)
+	require.NotNil(t, resctx)
+
+	// All alerts are resolved.
+	sent = sent[:0]
+	ctx = WithFiringAlerts(ctx, []uint64{})
+	alerts[1].Alert.EndsAt = time.Now().Add(-time.Hour)
+
+	resctx, res, err = r.Exec(ctx, log.NewNopLogger(), alerts...)
+	require.Nil(t, err)
+	require.Equal(t, alerts, res)
+	require.Equal(t, []*types.Alert{}, sent)
+	require.NotNil(t, resctx)
 }

-func TestIntegrationSendResolved(t *testing.T) {
-	res := []*types.Alert{}
-	r := notifierFunc(func(ctx context.Context, alerts ...*types.Alert) (bool, error) {
-		res = append(res, alerts...)
-		return false, nil
-	})
+func TestRetryStageSendResolved(t *testing.T) {
+	sent := []*types.Alert{}
 	i := Integration{
-		notifier: r,
-		conf:     notifierConfigFunc(func() bool { return true }),
+		notifier: notifierFunc(func(ctx context.Context, alerts ...*types.Alert) (bool, error) {
+			sent = append(sent, alerts...)
+			return false, nil
+		}),
+		conf: notifierConfigFunc(func() bool { return true }),
 	}
+	r := RetryStage{
+		integration: i,
+	}

 	alerts := []*types.Alert{
@@ -392,12 +488,32 @@ func TestIntegrationSendResolved(t *testing.T) {
 				EndsAt: time.Now().Add(-time.Hour),
 			},
 		},
+		&types.Alert{
+			Alert: model.Alert{
+				EndsAt: time.Now().Add(time.Hour),
+			},
+		},
 	}

-	i.Notify(nil, alerts...)
+	ctx := context.Background()
+	ctx = WithFiringAlerts(ctx, []uint64{0})

-	require.Equal(t, len(res), 1)
-	require.Equal(t, res, alerts)
+	resctx, res, err := r.Exec(ctx, log.NewNopLogger(), alerts...)
+	require.Nil(t, err)
+	require.Equal(t, alerts, res)
+	require.Equal(t, alerts, sent)
+	require.NotNil(t, resctx)
+
+	// All alerts are resolved.
+	sent = sent[:0]
+	ctx = WithFiringAlerts(ctx, []uint64{})
+	alerts[1].Alert.EndsAt = time.Now().Add(-time.Hour)
+
+	resctx, res, err = r.Exec(ctx, log.NewNopLogger(), alerts...)
+	require.Nil(t, err)
+	require.Equal(t, alerts, res)
+	require.Equal(t, alerts, sent)
+	require.NotNil(t, resctx)
 }

 func TestSetNotifiesStage(t *testing.T) {


@@ -387,8 +387,11 @@ receivers:
 		Alert("alertname", "test", "lbl", "v2").Active(1),
 		Alert("alertname", "test", "lbl", "v3").Active(3),
 	)
-	// no notification should be sent after group_interval because no new alert has been fired
-	co1.Want(Between(12, 12.5))
+	// Notification should be sent because the v2 alert is resolved due to the time-out.
+	co1.Want(Between(12, 12.5),
+		Alert("alertname", "test", "lbl", "v2").Active(1, 11),
+		Alert("alertname", "test", "lbl", "v3").Active(3),
+	)

 	co2.Want(Between(2, 2.5),
 		Alert("alertname", "test", "lbl", "v1").Active(1),
@@ -398,7 +401,8 @@ receivers:
 		Alert("alertname", "test", "lbl", "v2").Active(1),
 		Alert("alertname", "test", "lbl", "v3").Active(3),
 	)
-	co1.Want(Between(12, 12.5))
+	// No notification should be sent after group_interval because no new alert has been fired.
+	co2.Want(Between(12, 12.5))

 	at.Run()
 }