Fix race condition in dispatch.go (#3826)

* Fix race condition in dispatch.go

This commit fixes a race condition in dispatch.go that would cause
a firing alert to be deleted from the aggregation group when instead
it should have been flushed.

The root cause is a race condition that can occur when dispatch.go
deletes resolved alerts from the aggregation group following a
successful notification. If a firing alert with the same
fingerprint is added back to the aggregation group at the same time
then the firing alert can be deleted.

---------

Signed-off-by: George Robinson <george.robinson@grafana.com>
This commit is contained in:
George Robinson 2024-05-07 10:34:03 +01:00 committed by GitHub
parent d7ad5e12f8
commit ca4c90eb4e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 145 additions and 38 deletions

View File

@ -501,12 +501,15 @@ func (ag *aggrGroup) flush(notify func(...*types.Alert) bool) {
var ( var (
alerts = ag.alerts.List() alerts = ag.alerts.List()
alertsSlice = make(types.AlertSlice, 0, len(alerts)) alertsSlice = make(types.AlertSlice, 0, len(alerts))
resolvedSlice = make(types.AlertSlice, 0, len(alerts))
now = time.Now() now = time.Now()
) )
for _, alert := range alerts { for _, alert := range alerts {
a := *alert a := *alert
// Ensure that alerts don't resolve as time move forwards. // Ensure that alerts don't resolve as time move forwards.
if !a.ResolvedAt(now) { if a.ResolvedAt(now) {
resolvedSlice = append(resolvedSlice, &a)
} else {
a.EndsAt = time.Time{} a.EndsAt = time.Time{}
} }
alertsSlice = append(alertsSlice, &a) alertsSlice = append(alertsSlice, &a)
@ -516,21 +519,12 @@ func (ag *aggrGroup) flush(notify func(...*types.Alert) bool) {
level.Debug(ag.logger).Log("msg", "flushing", "alerts", fmt.Sprintf("%v", alertsSlice)) level.Debug(ag.logger).Log("msg", "flushing", "alerts", fmt.Sprintf("%v", alertsSlice))
if notify(alertsSlice...) { if notify(alertsSlice...) {
for _, a := range alertsSlice { // Delete all resolved alerts as we just sent a notification for them,
// Only delete if the fingerprint has not been inserted // and we don't want to send another one. However, we need to make sure
// again since we notified about it. // that each resolved alert has not fired again during the flush as then
fp := a.Fingerprint() // we would delete an active alert thinking it was resolved.
got, err := ag.alerts.Get(fp) if err := ag.alerts.DeleteIfNotModified(resolvedSlice); err != nil {
if err != nil { level.Error(ag.logger).Log("msg", "error on delete alerts", "err", err)
// This should never happen.
level.Error(ag.logger).Log("msg", "failed to get alert", "err", err, "alert", a.String())
continue
}
if a.Resolved() && got.UpdatedAt == a.UpdatedAt {
if err := ag.alerts.Delete(fp); err != nil {
level.Error(ag.logger).Log("msg", "error on delete alert", "err", err, "alert", a.String())
}
}
} }
} }
} }

View File

@ -236,9 +236,33 @@ func TestAggrGroup(t *testing.T) {
} }
} }
// Resolve all alerts, they should be removed after the next batch was sent. // Resolve an alert, and it should be removed after the next batch was sent.
a1r, a2r, a3r := *a1, *a2, *a3 a1r := *a1
resolved := types.AlertSlice{&a1r, &a2r, &a3r} a1r.EndsAt = time.Now()
ag.insert(&a1r)
exp := append(types.AlertSlice{&a1r}, removeEndsAt(types.AlertSlice{a2, a3})...)
select {
case <-time.After(2 * opts.GroupInterval):
t.Fatalf("expected new batch after group interval but received none")
case batch := <-alertsCh:
lastCurMtx.Lock()
s := time.Since(last)
lastCurMtx.Unlock()
if s < opts.GroupInterval {
t.Fatalf("received batch too early after %v", s)
}
sort.Sort(batch)
if !reflect.DeepEqual(batch, exp) {
t.Fatalf("expected alerts %v but got %v", exp, batch)
}
}
// Resolve all remaining alerts, they should be removed after the next batch was sent.
// Do not add a1r as it should have been deleted following the previous batch.
a2r, a3r := *a2, *a3
resolved := types.AlertSlice{&a2r, &a3r}
for _, a := range resolved { for _, a := range resolved {
a.EndsAt = time.Now() a.EndsAt = time.Now()
ag.insert(a) ag.insert(a)

View File

@ -114,12 +114,17 @@ func (a *Alerts) Set(alert *types.Alert) error {
return nil return nil
} }
// Delete removes the Alert with the matching fingerprint from the store. // DeleteIfNotModified deletes the slice of Alerts from the store if not
func (a *Alerts) Delete(fp model.Fingerprint) error { // modified.
func (a *Alerts) DeleteIfNotModified(alerts types.AlertSlice) error {
a.Lock() a.Lock()
defer a.Unlock() defer a.Unlock()
for _, alert := range alerts {
fp := alert.Fingerprint()
if other, ok := a.c[fp]; ok && alert.UpdatedAt == other.UpdatedAt {
delete(a.c, fp) delete(a.c, fp)
}
}
return nil return nil
} }

View File

@ -37,21 +37,105 @@ func TestSetGet(t *testing.T) {
require.Equal(t, want, got.Fingerprint()) require.Equal(t, want, got.Fingerprint())
} }
func TestDelete(t *testing.T) { func TestDeleteIfNotModified(t *testing.T) {
t.Run("unmodified alert should be deleted", func(t *testing.T) {
a := NewAlerts() a := NewAlerts()
alert := &types.Alert{ a1 := &types.Alert{
Alert: model.Alert{
Labels: model.LabelSet{
"foo": "bar",
},
},
UpdatedAt: time.Now().Add(-time.Second),
}
require.NoError(t, a.Set(a1))
// a1 should be deleted as it has not been modified.
a.DeleteIfNotModified(types.AlertSlice{a1})
got, err := a.Get(a1.Fingerprint())
require.Equal(t, ErrNotFound, err)
require.Nil(t, got)
})
t.Run("modified alert should not be deleted", func(t *testing.T) {
a := NewAlerts()
a1 := &types.Alert{
Alert: model.Alert{
Labels: model.LabelSet{
"foo": "bar",
},
},
UpdatedAt: time.Now(), UpdatedAt: time.Now(),
} }
require.NoError(t, a.Set(alert)) require.NoError(t, a.Set(a1))
fp := alert.Fingerprint() // Make a copy of a1 that is older, but do not put it.
// We want to make sure a1 is not deleted.
err := a.Delete(fp) a2 := &types.Alert{
Alert: model.Alert{
Labels: model.LabelSet{
"foo": "bar",
},
},
UpdatedAt: time.Now().Add(-time.Second),
}
require.True(t, a2.UpdatedAt.Before(a1.UpdatedAt))
a.DeleteIfNotModified(types.AlertSlice{a2})
// a1 should not be deleted.
got, err := a.Get(a1.Fingerprint())
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, a1, got)
got, err := a.Get(fp) // Make another copy of a1 that is older, but do not put it.
require.Nil(t, got) // We want to make sure a2 is not deleted here either.
a3 := &types.Alert{
Alert: model.Alert{
Labels: model.LabelSet{
"foo": "bar",
},
},
UpdatedAt: time.Now().Add(time.Second),
}
require.True(t, a3.UpdatedAt.After(a1.UpdatedAt))
a.DeleteIfNotModified(types.AlertSlice{a3})
// a1 should not be deleted.
got, err = a.Get(a1.Fingerprint())
require.NoError(t, err)
require.Equal(t, a1, got)
})
t.Run("should not delete other alerts", func(t *testing.T) {
a := NewAlerts()
a1 := &types.Alert{
Alert: model.Alert{
Labels: model.LabelSet{
"foo": "bar",
},
},
UpdatedAt: time.Now(),
}
a2 := &types.Alert{
Alert: model.Alert{
Labels: model.LabelSet{
"bar": "baz",
},
},
UpdatedAt: time.Now(),
}
require.NoError(t, a.Set(a1))
require.NoError(t, a.Set(a2))
// Deleting a1 should not delete a2.
require.NoError(t, a.DeleteIfNotModified(types.AlertSlice{a1}))
// a1 should be deleted.
got, err := a.Get(a1.Fingerprint())
require.Equal(t, ErrNotFound, err) require.Equal(t, ErrNotFound, err)
require.Nil(t, got)
// a2 should not be deleted.
got, err = a.Get(a2.Fingerprint())
require.NoError(t, err)
require.Equal(t, a2, got)
})
} }
func TestGC(t *testing.T) { func TestGC(t *testing.T) {