From 7078333202f37577d2549aa7e223f6eb98a2d9d1 Mon Sep 17 00:00:00 2001 From: Brian Brazil Date: Fri, 4 Jan 2019 15:52:20 +0000 Subject: [PATCH] Make a copy of firing alerts with EndsAt=0 when flushing. (#1686) If the original EndsAt is left in place, then as time moves forwards past the EndsAt then firing alerts will be rendered and treated as resolved alerts which can cause confusion and races. This is most likely to happen on retries for a notification. Mitigate race and fix data races in TestAggrGroup. Signed-off-by: Brian Brazil --- dispatch/dispatch.go | 10 ++++++++-- dispatch/dispatch_test.go | 35 +++++++++++++++++++++++++++-------- 2 files changed, 35 insertions(+), 10 deletions(-) diff --git a/dispatch/dispatch.go b/dispatch/dispatch.go index 89080994..ee911152 100644 --- a/dispatch/dispatch.go +++ b/dispatch/dispatch.go @@ -329,8 +329,14 @@ func (ag *aggrGroup) flush(notify func(...*types.Alert) bool) { alerts = ag.alerts.List() alertsSlice = make(types.AlertSlice, 0, ag.alerts.Count()) ) + now := time.Now() for alert := range alerts { - alertsSlice = append(alertsSlice, alert) + a := *alert + // Ensure that alerts don't resolve as time move forwards. + if !a.ResolvedAt(now) { + a.EndsAt = time.Time{} + } + alertsSlice = append(alertsSlice, &a) } sort.Stable(alertsSlice) @@ -348,7 +354,7 @@ func (ag *aggrGroup) flush(notify func(...*types.Alert) bool) { level.Error(ag.logger).Log("msg", "failed to get alert", "err", err) continue } - if a.Resolved() && got == a { + if a.Resolved() && got.UpdatedAt == a.UpdatedAt { if err := ag.alerts.Delete(fp); err != nil { level.Error(ag.logger).Log("msg", "error on delete alert", "err", err) } diff --git a/dispatch/dispatch_test.go b/dispatch/dispatch_test.go index 4ec2ab84..8ce94679 100644 --- a/dispatch/dispatch_test.go +++ b/dispatch/dispatch_test.go @@ -110,7 +110,8 @@ func TestAggrGroup(t *testing.T) { lastCurMtx.Lock() last = current - current = time.Now() + // Subtract a millisecond to allow for races. + current = time.Now().Add(-time.Millisecond) lastCurMtx.Unlock() alertsCh <- types.AlertSlice(alerts) @@ -118,6 +119,15 @@ func TestAggrGroup(t *testing.T) { return true } + removeEndsAt := func(as types.AlertSlice) types.AlertSlice { + for i, a := range as { + ac := *a + ac.EndsAt = time.Time{} + as[i] = &ac + } + return as + } + // Test regular situation where we wait for group_wait to send out alerts. ag := newAggrGroup(context.Background(), lset, route, nil, log.NewNopLogger()) go ag.run(ntfy) @@ -129,10 +139,13 @@ func TestAggrGroup(t *testing.T) { t.Fatalf("expected initial batch after group_wait") case batch := <-alertsCh: - if s := time.Since(last); s < opts.GroupWait { + lastCurMtx.Lock() + s := time.Since(last) + lastCurMtx.Unlock() + if s < opts.GroupWait { t.Fatalf("received batch too early after %v", s) } - exp := types.AlertSlice{a1} + exp := removeEndsAt(types.AlertSlice{a1}) sort.Sort(batch) if !reflect.DeepEqual(batch, exp) { @@ -149,10 +162,13 @@ func TestAggrGroup(t *testing.T) { t.Fatalf("expected new batch after group interval but received none") case batch := <-alertsCh: - if s := time.Since(last); s < opts.GroupInterval { + lastCurMtx.Lock() + s := time.Since(last) + lastCurMtx.Unlock() + if s < opts.GroupInterval { t.Fatalf("received batch too early after %v", s) } - exp := types.AlertSlice{a1, a3} + exp := removeEndsAt(types.AlertSlice{a1, a3}) sort.Sort(batch) if !reflect.DeepEqual(batch, exp) { @@ -179,7 +195,7 @@ func TestAggrGroup(t *testing.T) { t.Fatalf("expected immediate alert but received none") case batch := <-alertsCh: - exp := types.AlertSlice{a1, a2} + exp := removeEndsAt(types.AlertSlice{a1, a2}) sort.Sort(batch) if !reflect.DeepEqual(batch, exp) { @@ -202,7 +218,7 @@ func TestAggrGroup(t *testing.T) { if s < opts.GroupInterval { t.Fatalf("received batch too early after %v", s) } - exp := types.AlertSlice{a1, a2, a3} + exp := removeEndsAt(types.AlertSlice{a1, a2, a3}) sort.Sort(batch) if !reflect.DeepEqual(batch, exp) { @@ -224,7 +240,10 @@ func TestAggrGroup(t *testing.T) { t.Fatalf("expected new batch after group interval but received none") case batch := <-alertsCh: - if s := time.Since(last); s < opts.GroupInterval { + lastCurMtx.Lock() + s := time.Since(last) + lastCurMtx.Unlock() + if s < opts.GroupInterval { t.Fatalf("received batch too early after %v", s) } sort.Sort(batch)