Make a copy of firing alerts with EndsAt=0 when flushing. (#1686)
If the original EndsAt is left in place, then as time moves forwards past the EndsAt then firing alerts will be rendered and treated as resolved alerts which can cause confusion and races. This is most likely to happen on retries for a notification. Mitigate race and fix data races in TestAggrGroup. Signed-off-by: Brian Brazil <brian.brazil@robustperception.io>
This commit is contained in:
parent
da8d6803ae
commit
6787dd1bde
|
@ -329,8 +329,14 @@ func (ag *aggrGroup) flush(notify func(...*types.Alert) bool) {
|
|||
alerts = ag.alerts.List()
|
||||
alertsSlice = make(types.AlertSlice, 0, ag.alerts.Count())
|
||||
)
|
||||
now := time.Now()
|
||||
for alert := range alerts {
|
||||
alertsSlice = append(alertsSlice, alert)
|
||||
a := *alert
|
||||
// Ensure that alerts don't resolve as time move forwards.
|
||||
if !a.ResolvedAt(now) {
|
||||
a.EndsAt = time.Time{}
|
||||
}
|
||||
alertsSlice = append(alertsSlice, &a)
|
||||
}
|
||||
sort.Stable(alertsSlice)
|
||||
|
||||
|
@ -348,7 +354,7 @@ func (ag *aggrGroup) flush(notify func(...*types.Alert) bool) {
|
|||
level.Error(ag.logger).Log("msg", "failed to get alert", "err", err)
|
||||
continue
|
||||
}
|
||||
if a.Resolved() && got == a {
|
||||
if a.Resolved() && got.UpdatedAt == a.UpdatedAt {
|
||||
if err := ag.alerts.Delete(fp); err != nil {
|
||||
level.Error(ag.logger).Log("msg", "error on delete alert", "err", err)
|
||||
}
|
||||
|
|
|
@ -110,7 +110,8 @@ func TestAggrGroup(t *testing.T) {
|
|||
|
||||
lastCurMtx.Lock()
|
||||
last = current
|
||||
current = time.Now()
|
||||
// Subtract a millisecond to allow for races.
|
||||
current = time.Now().Add(-time.Millisecond)
|
||||
lastCurMtx.Unlock()
|
||||
|
||||
alertsCh <- types.AlertSlice(alerts)
|
||||
|
@ -118,6 +119,15 @@ func TestAggrGroup(t *testing.T) {
|
|||
return true
|
||||
}
|
||||
|
||||
removeEndsAt := func(as types.AlertSlice) types.AlertSlice {
|
||||
for i, a := range as {
|
||||
ac := *a
|
||||
ac.EndsAt = time.Time{}
|
||||
as[i] = &ac
|
||||
}
|
||||
return as
|
||||
}
|
||||
|
||||
// Test regular situation where we wait for group_wait to send out alerts.
|
||||
ag := newAggrGroup(context.Background(), lset, route, nil, log.NewNopLogger())
|
||||
go ag.run(ntfy)
|
||||
|
@ -129,10 +139,13 @@ func TestAggrGroup(t *testing.T) {
|
|||
t.Fatalf("expected initial batch after group_wait")
|
||||
|
||||
case batch := <-alertsCh:
|
||||
if s := time.Since(last); s < opts.GroupWait {
|
||||
lastCurMtx.Lock()
|
||||
s := time.Since(last)
|
||||
lastCurMtx.Unlock()
|
||||
if s < opts.GroupWait {
|
||||
t.Fatalf("received batch too early after %v", s)
|
||||
}
|
||||
exp := types.AlertSlice{a1}
|
||||
exp := removeEndsAt(types.AlertSlice{a1})
|
||||
sort.Sort(batch)
|
||||
|
||||
if !reflect.DeepEqual(batch, exp) {
|
||||
|
@ -149,10 +162,13 @@ func TestAggrGroup(t *testing.T) {
|
|||
t.Fatalf("expected new batch after group interval but received none")
|
||||
|
||||
case batch := <-alertsCh:
|
||||
if s := time.Since(last); s < opts.GroupInterval {
|
||||
lastCurMtx.Lock()
|
||||
s := time.Since(last)
|
||||
lastCurMtx.Unlock()
|
||||
if s < opts.GroupInterval {
|
||||
t.Fatalf("received batch too early after %v", s)
|
||||
}
|
||||
exp := types.AlertSlice{a1, a3}
|
||||
exp := removeEndsAt(types.AlertSlice{a1, a3})
|
||||
sort.Sort(batch)
|
||||
|
||||
if !reflect.DeepEqual(batch, exp) {
|
||||
|
@ -179,7 +195,7 @@ func TestAggrGroup(t *testing.T) {
|
|||
t.Fatalf("expected immediate alert but received none")
|
||||
|
||||
case batch := <-alertsCh:
|
||||
exp := types.AlertSlice{a1, a2}
|
||||
exp := removeEndsAt(types.AlertSlice{a1, a2})
|
||||
sort.Sort(batch)
|
||||
|
||||
if !reflect.DeepEqual(batch, exp) {
|
||||
|
@ -202,7 +218,7 @@ func TestAggrGroup(t *testing.T) {
|
|||
if s < opts.GroupInterval {
|
||||
t.Fatalf("received batch too early after %v", s)
|
||||
}
|
||||
exp := types.AlertSlice{a1, a2, a3}
|
||||
exp := removeEndsAt(types.AlertSlice{a1, a2, a3})
|
||||
sort.Sort(batch)
|
||||
|
||||
if !reflect.DeepEqual(batch, exp) {
|
||||
|
@ -224,7 +240,10 @@ func TestAggrGroup(t *testing.T) {
|
|||
t.Fatalf("expected new batch after group interval but received none")
|
||||
|
||||
case batch := <-alertsCh:
|
||||
if s := time.Since(last); s < opts.GroupInterval {
|
||||
lastCurMtx.Lock()
|
||||
s := time.Since(last)
|
||||
lastCurMtx.Unlock()
|
||||
if s < opts.GroupInterval {
|
||||
t.Fatalf("received batch too early after %v", s)
|
||||
}
|
||||
sort.Sort(batch)
|
||||
|
|
Loading…
Reference in New Issue