diff --git a/dispatch/dispatch.go b/dispatch/dispatch.go index 89080994..ee911152 100644 --- a/dispatch/dispatch.go +++ b/dispatch/dispatch.go @@ -329,8 +329,14 @@ func (ag *aggrGroup) flush(notify func(...*types.Alert) bool) { alerts = ag.alerts.List() alertsSlice = make(types.AlertSlice, 0, ag.alerts.Count()) ) + now := time.Now() for alert := range alerts { - alertsSlice = append(alertsSlice, alert) + a := *alert + // Ensure that alerts don't resolve as time moves forward. + if !a.ResolvedAt(now) { + a.EndsAt = time.Time{} + } + alertsSlice = append(alertsSlice, &a) } sort.Stable(alertsSlice) @@ -348,7 +354,7 @@ func (ag *aggrGroup) flush(notify func(...*types.Alert) bool) { level.Error(ag.logger).Log("msg", "failed to get alert", "err", err) continue } - if a.Resolved() && got == a { + if a.Resolved() && got.UpdatedAt == a.UpdatedAt { if err := ag.alerts.Delete(fp); err != nil { level.Error(ag.logger).Log("msg", "error on delete alert", "err", err) } diff --git a/dispatch/dispatch_test.go b/dispatch/dispatch_test.go index 4ec2ab84..8ce94679 100644 --- a/dispatch/dispatch_test.go +++ b/dispatch/dispatch_test.go @@ -110,7 +110,8 @@ func TestAggrGroup(t *testing.T) { lastCurMtx.Lock() last = current - current = time.Now() + // Subtract a millisecond to allow for races. + current = time.Now().Add(-time.Millisecond) lastCurMtx.Unlock() alertsCh <- types.AlertSlice(alerts) @@ -118,6 +119,15 @@ func TestAggrGroup(t *testing.T) { return true } + removeEndsAt := func(as types.AlertSlice) types.AlertSlice { + for i, a := range as { + ac := *a + ac.EndsAt = time.Time{} + as[i] = &ac + } + return as + } + // Test regular situation where we wait for group_wait to send out alerts. 
ag := newAggrGroup(context.Background(), lset, route, nil, log.NewNopLogger()) go ag.run(ntfy) @@ -129,10 +139,13 @@ func TestAggrGroup(t *testing.T) { t.Fatalf("expected initial batch after group_wait") case batch := <-alertsCh: - if s := time.Since(last); s < opts.GroupWait { + lastCurMtx.Lock() + s := time.Since(last) + lastCurMtx.Unlock() + if s < opts.GroupWait { t.Fatalf("received batch too early after %v", s) } - exp := types.AlertSlice{a1} + exp := removeEndsAt(types.AlertSlice{a1}) sort.Sort(batch) if !reflect.DeepEqual(batch, exp) { @@ -149,10 +162,13 @@ func TestAggrGroup(t *testing.T) { t.Fatalf("expected new batch after group interval but received none") case batch := <-alertsCh: - if s := time.Since(last); s < opts.GroupInterval { + lastCurMtx.Lock() + s := time.Since(last) + lastCurMtx.Unlock() + if s < opts.GroupInterval { t.Fatalf("received batch too early after %v", s) } - exp := types.AlertSlice{a1, a3} + exp := removeEndsAt(types.AlertSlice{a1, a3}) sort.Sort(batch) if !reflect.DeepEqual(batch, exp) { @@ -179,7 +195,7 @@ func TestAggrGroup(t *testing.T) { t.Fatalf("expected immediate alert but received none") case batch := <-alertsCh: - exp := types.AlertSlice{a1, a2} + exp := removeEndsAt(types.AlertSlice{a1, a2}) sort.Sort(batch) if !reflect.DeepEqual(batch, exp) { @@ -202,7 +218,7 @@ func TestAggrGroup(t *testing.T) { if s < opts.GroupInterval { t.Fatalf("received batch too early after %v", s) } - exp := types.AlertSlice{a1, a2, a3} + exp := removeEndsAt(types.AlertSlice{a1, a2, a3}) sort.Sort(batch) if !reflect.DeepEqual(batch, exp) { @@ -224,7 +240,10 @@ func TestAggrGroup(t *testing.T) { t.Fatalf("expected new batch after group interval but received none") case batch := <-alertsCh: - if s := time.Since(last); s < opts.GroupInterval { + lastCurMtx.Lock() + s := time.Since(last) + lastCurMtx.Unlock() + if s < opts.GroupInterval { t.Fatalf("received batch too early after %v", s) } sort.Sort(batch)