dispatch: don't reset timer if flush is in-progress (#1301)
When the aggregation group receives an alert that is past the initial group_wait value, it should reset its timer only if the timer has ever expired. Otherwise it means that the flush is already in-progress.
This commit is contained in:
parent
19715022a4
commit
4cba49155d
|
@ -287,9 +287,9 @@ type aggrGroup struct {
|
||||||
next *time.Timer
|
next *time.Timer
|
||||||
timeout func(time.Duration) time.Duration
|
timeout func(time.Duration) time.Duration
|
||||||
|
|
||||||
mtx sync.RWMutex
|
mtx sync.RWMutex
|
||||||
alerts map[model.Fingerprint]*types.Alert
|
alerts map[model.Fingerprint]*types.Alert
|
||||||
hasSent bool
|
hasFlushed bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// newAggrGroup returns a new aggregation group.
|
// newAggrGroup returns a new aggregation group.
|
||||||
|
@ -366,6 +366,7 @@ func (ag *aggrGroup) run(nf notifyFunc) {
|
||||||
// Wait the configured interval before calling flush again.
|
// Wait the configured interval before calling flush again.
|
||||||
ag.mtx.Lock()
|
ag.mtx.Lock()
|
||||||
ag.next.Reset(ag.opts.GroupInterval)
|
ag.next.Reset(ag.opts.GroupInterval)
|
||||||
|
ag.hasFlushed = true
|
||||||
ag.mtx.Unlock()
|
ag.mtx.Unlock()
|
||||||
|
|
||||||
ag.flush(func(alerts ...*types.Alert) bool {
|
ag.flush(func(alerts ...*types.Alert) bool {
|
||||||
|
@ -396,7 +397,7 @@ func (ag *aggrGroup) insert(alert *types.Alert) {
|
||||||
|
|
||||||
// Immediately trigger a flush if the wait duration for this
|
// Immediately trigger a flush if the wait duration for this
|
||||||
// alert is already over.
|
// alert is already over.
|
||||||
if !ag.hasSent && alert.StartsAt.Add(ag.opts.GroupWait).Before(time.Now()) {
|
if !ag.hasFlushed && alert.StartsAt.Add(ag.opts.GroupWait).Before(time.Now()) {
|
||||||
ag.next.Reset(0)
|
ag.next.Reset(0)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -457,8 +458,6 @@ func (ag *aggrGroup) flush(notify func(...*types.Alert) bool) {
|
||||||
delete(ag.alerts, fp)
|
delete(ag.alerts, fp)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ag.hasSent = true
|
|
||||||
ag.mtx.Unlock()
|
ag.mtx.Unlock()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -406,10 +406,9 @@ receivers:
|
||||||
func TestReload(t *testing.T) {
|
func TestReload(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
// We create a notification config that fans out into two different
|
// This integration test ensures that the first alert isn't notified twice
|
||||||
// webhooks.
|
// and repeat_interval applies after the AlertManager process has been
|
||||||
// The succeeding one must still only receive the first successful
|
// reloaded.
|
||||||
// notifications. Sending to the succeeding one must eventually succeed.
|
|
||||||
conf := `
|
conf := `
|
||||||
route:
|
route:
|
||||||
receiver: "default"
|
receiver: "default"
|
||||||
|
|
Loading…
Reference in New Issue