From ca4c90eb4edbade3b9787a981a07f7af639582ec Mon Sep 17 00:00:00 2001 From: George Robinson Date: Tue, 7 May 2024 10:34:03 +0100 Subject: [PATCH] Fix race condition in dispatch.go (#3826) * Fix race condition in dispatch.go This commit fixes a race condition in dispatch.go that would cause a firing alert to be deleted from the aggregation group when instead it should have been flushed. The root cause is a race condition that can occur when dispatch.go deletes resolved alerts from the aggregation group following a successful notification. If a firing alert with the same fingerprint is added back to the aggregation group at the same time then the firing alert can be deleted. --------- Signed-off-by: George Robinson --- dispatch/dispatch.go | 32 +++++------ dispatch/dispatch_test.go | 30 +++++++++-- store/store.go | 13 +++-- store/store_test.go | 108 +++++++++++++++++++++++++++++++++----- 4 files changed, 145 insertions(+), 38 deletions(-) diff --git a/dispatch/dispatch.go b/dispatch/dispatch.go index 104471e3..822438e0 100644 --- a/dispatch/dispatch.go +++ b/dispatch/dispatch.go @@ -499,14 +499,17 @@ func (ag *aggrGroup) flush(notify func(...*types.Alert) bool) { } var ( - alerts = ag.alerts.List() - alertsSlice = make(types.AlertSlice, 0, len(alerts)) - now = time.Now() + alerts = ag.alerts.List() + alertsSlice = make(types.AlertSlice, 0, len(alerts)) + resolvedSlice = make(types.AlertSlice, 0, len(alerts)) + now = time.Now() ) for _, alert := range alerts { a := *alert // Ensure that alerts don't resolve as time move forwards. - if !a.ResolvedAt(now) { + if a.ResolvedAt(now) { + resolvedSlice = append(resolvedSlice, &a) + } else { a.EndsAt = time.Time{} } alertsSlice = append(alertsSlice, &a) @@ -516,21 +519,12 @@ func (ag *aggrGroup) flush(notify func(...*types.Alert) bool) { level.Debug(ag.logger).Log("msg", "flushing", "alerts", fmt.Sprintf("%v", alertsSlice)) if notify(alertsSlice...) { - for _, a := range alertsSlice { - // Only delete if the fingerprint has not been inserted - // again since we notified about it. - fp := a.Fingerprint() - got, err := ag.alerts.Get(fp) - if err != nil { - // This should never happen. - level.Error(ag.logger).Log("msg", "failed to get alert", "err", err, "alert", a.String()) - continue - } - if a.Resolved() && got.UpdatedAt == a.UpdatedAt { - if err := ag.alerts.Delete(fp); err != nil { - level.Error(ag.logger).Log("msg", "error on delete alert", "err", err, "alert", a.String()) - } - } + // Delete all resolved alerts as we just sent a notification for them, + // and we don't want to send another one. However, we need to make sure + // that each resolved alert has not fired again during the flush as then + // we would delete an active alert thinking it was resolved. + if err := ag.alerts.DeleteIfNotModified(resolvedSlice); err != nil { + level.Error(ag.logger).Log("msg", "error on delete alerts", "err", err) } } } diff --git a/dispatch/dispatch_test.go b/dispatch/dispatch_test.go index 85bd62dc..0c8cbf78 100644 --- a/dispatch/dispatch_test.go +++ b/dispatch/dispatch_test.go @@ -236,9 +236,33 @@ func TestAggrGroup(t *testing.T) { } } - // Resolve all alerts, they should be removed after the next batch was sent. - a1r, a2r, a3r := *a1, *a2, *a3 - resolved := types.AlertSlice{&a1r, &a2r, &a3r} + // Resolve an alert, and it should be removed after the next batch was sent. + a1r := *a1 + a1r.EndsAt = time.Now() + ag.insert(&a1r) + exp := append(types.AlertSlice{&a1r}, removeEndsAt(types.AlertSlice{a2, a3})...) + + select { + case <-time.After(2 * opts.GroupInterval): + t.Fatalf("expected new batch after group interval but received none") + case batch := <-alertsCh: + lastCurMtx.Lock() + s := time.Since(last) + lastCurMtx.Unlock() + if s < opts.GroupInterval { + t.Fatalf("received batch too early after %v", s) + } + sort.Sort(batch) + + if !reflect.DeepEqual(batch, exp) { + t.Fatalf("expected alerts %v but got %v", exp, batch) + } + } + + // Resolve all remaining alerts, they should be removed after the next batch was sent. + // Do not add a1r as it should have been deleted following the previous batch. + a2r, a3r := *a2, *a3 + resolved := types.AlertSlice{&a2r, &a3r} for _, a := range resolved { a.EndsAt = time.Now() ag.insert(a) diff --git a/store/store.go b/store/store.go index 9b30542f..5de0c3ee 100644 --- a/store/store.go +++ b/store/store.go @@ -114,12 +114,17 @@ func (a *Alerts) Set(alert *types.Alert) error { return nil } -// Delete removes the Alert with the matching fingerprint from the store. -func (a *Alerts) Delete(fp model.Fingerprint) error { +// DeleteIfNotModified deletes the slice of Alerts from the store if not +// modified. +func (a *Alerts) DeleteIfNotModified(alerts types.AlertSlice) error { a.Lock() defer a.Unlock() - - delete(a.c, fp) + for _, alert := range alerts { + fp := alert.Fingerprint() + if other, ok := a.c[fp]; ok && alert.UpdatedAt == other.UpdatedAt { + delete(a.c, fp) + } + } return nil } diff --git a/store/store_test.go b/store/store_test.go index 1578d6ed..fe1cd0a8 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -37,21 +37,105 @@ func TestSetGet(t *testing.T) { require.Equal(t, want, got.Fingerprint()) } -func TestDelete(t *testing.T) { - a := NewAlerts() - alert := &types.Alert{ - UpdatedAt: time.Now(), - } - require.NoError(t, a.Set(alert)) +func TestDeleteIfNotModified(t *testing.T) { + t.Run("unmodified alert should be deleted", func(t *testing.T) { + a := NewAlerts() + a1 := &types.Alert{ + Alert: model.Alert{ + Labels: model.LabelSet{ + "foo": "bar", + }, + }, + UpdatedAt: time.Now().Add(-time.Second), + } + require.NoError(t, a.Set(a1)) - fp := alert.Fingerprint() + // a1 should be deleted as it has not been modified. + a.DeleteIfNotModified(types.AlertSlice{a1}) + got, err := a.Get(a1.Fingerprint()) + require.Equal(t, ErrNotFound, err) + require.Nil(t, got) + }) - err := a.Delete(fp) - require.NoError(t, err) + t.Run("modified alert should not be deleted", func(t *testing.T) { + a := NewAlerts() + a1 := &types.Alert{ + Alert: model.Alert{ + Labels: model.LabelSet{ + "foo": "bar", + }, + }, + UpdatedAt: time.Now(), + } + require.NoError(t, a.Set(a1)) - got, err := a.Get(fp) - require.Nil(t, got) - require.Equal(t, ErrNotFound, err) + // Make a copy of a1 that is older, but do not put it. + // We want to make sure a1 is not deleted. + a2 := &types.Alert{ + Alert: model.Alert{ + Labels: model.LabelSet{ + "foo": "bar", + }, + }, + UpdatedAt: time.Now().Add(-time.Second), + } + require.True(t, a2.UpdatedAt.Before(a1.UpdatedAt)) + a.DeleteIfNotModified(types.AlertSlice{a2}) + // a1 should not be deleted. + got, err := a.Get(a1.Fingerprint()) + require.NoError(t, err) + require.Equal(t, a1, got) + + // Make another copy of a1 that is older, but do not put it. + // We want to make sure a2 is not deleted here either. + a3 := &types.Alert{ + Alert: model.Alert{ + Labels: model.LabelSet{ + "foo": "bar", + }, + }, + UpdatedAt: time.Now().Add(time.Second), + } + require.True(t, a3.UpdatedAt.After(a1.UpdatedAt)) + a.DeleteIfNotModified(types.AlertSlice{a3}) + // a1 should not be deleted. + got, err = a.Get(a1.Fingerprint()) + require.NoError(t, err) + require.Equal(t, a1, got) + }) + + t.Run("should not delete other alerts", func(t *testing.T) { + a := NewAlerts() + a1 := &types.Alert{ + Alert: model.Alert{ + Labels: model.LabelSet{ + "foo": "bar", + }, + }, + UpdatedAt: time.Now(), + } + a2 := &types.Alert{ + Alert: model.Alert{ + Labels: model.LabelSet{ + "bar": "baz", + }, + }, + UpdatedAt: time.Now(), + } + require.NoError(t, a.Set(a1)) + require.NoError(t, a.Set(a2)) + + // Deleting a1 should not delete a2. + require.NoError(t, a.DeleteIfNotModified(types.AlertSlice{a1})) + // a1 should be deleted. + got, err := a.Get(a1.Fingerprint()) + require.Equal(t, ErrNotFound, err) + require.Nil(t, got) + // a2 should not be deleted. + got, err = a.Get(a2.Fingerprint()) + require.NoError(t, err) + require.Equal(t, a2, got) + }) } func TestGC(t *testing.T) {