Make a copy of firing alerts with EndsAt=0 when flushing. (#1686)
If the original EndsAt is left in place, then as time moves forwards past the EndsAt then firing alerts will be rendered and treated as resolved alerts which can cause confusion and races. This is most likely to happen on retries for a notification. Mitigate race and fix data races in TestAggrGroup. Signed-off-by: Brian Brazil <brian.brazil@robustperception.io>
This commit is contained in:
parent
b676fa79c0
commit
7078333202
|
@ -329,8 +329,14 @@ func (ag *aggrGroup) flush(notify func(...*types.Alert) bool) {
|
|||
alerts = ag.alerts.List()
|
||||
alertsSlice = make(types.AlertSlice, 0, ag.alerts.Count())
|
||||
)
|
||||
now := time.Now()
|
||||
for alert := range alerts {
|
||||
alertsSlice = append(alertsSlice, alert)
|
||||
a := *alert
|
||||
// Ensure that alerts don't resolve as time move forwards.
|
||||
if !a.ResolvedAt(now) {
|
||||
a.EndsAt = time.Time{}
|
||||
}
|
||||
alertsSlice = append(alertsSlice, &a)
|
||||
}
|
||||
sort.Stable(alertsSlice)
|
||||
|
||||
|
@ -348,7 +354,7 @@ func (ag *aggrGroup) flush(notify func(...*types.Alert) bool) {
|
|||
level.Error(ag.logger).Log("msg", "failed to get alert", "err", err)
|
||||
continue
|
||||
}
|
||||
if a.Resolved() && got == a {
|
||||
if a.Resolved() && got.UpdatedAt == a.UpdatedAt {
|
||||
if err := ag.alerts.Delete(fp); err != nil {
|
||||
level.Error(ag.logger).Log("msg", "error on delete alert", "err", err)
|
||||
}
|
||||
|
|
|
@ -110,7 +110,8 @@ func TestAggrGroup(t *testing.T) {
|
|||
|
||||
lastCurMtx.Lock()
|
||||
last = current
|
||||
current = time.Now()
|
||||
// Subtract a millisecond to allow for races.
|
||||
current = time.Now().Add(-time.Millisecond)
|
||||
lastCurMtx.Unlock()
|
||||
|
||||
alertsCh <- types.AlertSlice(alerts)
|
||||
|
@ -118,6 +119,15 @@ func TestAggrGroup(t *testing.T) {
|
|||
return true
|
||||
}
|
||||
|
||||
removeEndsAt := func(as types.AlertSlice) types.AlertSlice {
|
||||
for i, a := range as {
|
||||
ac := *a
|
||||
ac.EndsAt = time.Time{}
|
||||
as[i] = &ac
|
||||
}
|
||||
return as
|
||||
}
|
||||
|
||||
// Test regular situation where we wait for group_wait to send out alerts.
|
||||
ag := newAggrGroup(context.Background(), lset, route, nil, log.NewNopLogger())
|
||||
go ag.run(ntfy)
|
||||
|
@ -129,10 +139,13 @@ func TestAggrGroup(t *testing.T) {
|
|||
t.Fatalf("expected initial batch after group_wait")
|
||||
|
||||
case batch := <-alertsCh:
|
||||
if s := time.Since(last); s < opts.GroupWait {
|
||||
lastCurMtx.Lock()
|
||||
s := time.Since(last)
|
||||
lastCurMtx.Unlock()
|
||||
if s < opts.GroupWait {
|
||||
t.Fatalf("received batch too early after %v", s)
|
||||
}
|
||||
exp := types.AlertSlice{a1}
|
||||
exp := removeEndsAt(types.AlertSlice{a1})
|
||||
sort.Sort(batch)
|
||||
|
||||
if !reflect.DeepEqual(batch, exp) {
|
||||
|
@ -149,10 +162,13 @@ func TestAggrGroup(t *testing.T) {
|
|||
t.Fatalf("expected new batch after group interval but received none")
|
||||
|
||||
case batch := <-alertsCh:
|
||||
if s := time.Since(last); s < opts.GroupInterval {
|
||||
lastCurMtx.Lock()
|
||||
s := time.Since(last)
|
||||
lastCurMtx.Unlock()
|
||||
if s < opts.GroupInterval {
|
||||
t.Fatalf("received batch too early after %v", s)
|
||||
}
|
||||
exp := types.AlertSlice{a1, a3}
|
||||
exp := removeEndsAt(types.AlertSlice{a1, a3})
|
||||
sort.Sort(batch)
|
||||
|
||||
if !reflect.DeepEqual(batch, exp) {
|
||||
|
@ -179,7 +195,7 @@ func TestAggrGroup(t *testing.T) {
|
|||
t.Fatalf("expected immediate alert but received none")
|
||||
|
||||
case batch := <-alertsCh:
|
||||
exp := types.AlertSlice{a1, a2}
|
||||
exp := removeEndsAt(types.AlertSlice{a1, a2})
|
||||
sort.Sort(batch)
|
||||
|
||||
if !reflect.DeepEqual(batch, exp) {
|
||||
|
@ -202,7 +218,7 @@ func TestAggrGroup(t *testing.T) {
|
|||
if s < opts.GroupInterval {
|
||||
t.Fatalf("received batch too early after %v", s)
|
||||
}
|
||||
exp := types.AlertSlice{a1, a2, a3}
|
||||
exp := removeEndsAt(types.AlertSlice{a1, a2, a3})
|
||||
sort.Sort(batch)
|
||||
|
||||
if !reflect.DeepEqual(batch, exp) {
|
||||
|
@ -224,7 +240,10 @@ func TestAggrGroup(t *testing.T) {
|
|||
t.Fatalf("expected new batch after group interval but received none")
|
||||
|
||||
case batch := <-alertsCh:
|
||||
if s := time.Since(last); s < opts.GroupInterval {
|
||||
lastCurMtx.Lock()
|
||||
s := time.Since(last)
|
||||
lastCurMtx.Unlock()
|
||||
if s < opts.GroupInterval {
|
||||
t.Fatalf("received batch too early after %v", s)
|
||||
}
|
||||
sort.Sort(batch)
|
||||
|
|
Loading…
Reference in New Issue