Merge pull request #2563 from pracucci/fix-issue-2562

Fix race condition causing 1st alert to not be immediately delivered when group_wait is 0s
Merged by Julien Pivotto on 2021-05-11 21:09:23 +02:00 (committed by GitHub)
commit 670fd98cea
2 changed files with 74 additions and 20 deletions
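
Background for the change: with group_wait set to 0s the new aggregation group's run() goroutine performs its first flush immediately, and before this patch it was started before the triggering alert had been inserted, so that first alert could miss the flush and the notification was delayed instead of going out right away. The sketch below illustrates the ordering problem and the fix in isolation; the group type, insert, run, and notify callback are simplified stand-ins for illustration, not Alertmanager's actual implementation.

// Minimal sketch of the ordering bug and the fix (hypothetical types, not Alertmanager's).
package main

import (
	"fmt"
	"sync"
	"time"
)

type group struct {
	mu     sync.Mutex
	alerts []string
}

func (g *group) insert(a string) {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.alerts = append(g.alerts, a)
}

// run performs the first flush after wait, mirroring the initial group_wait flush.
func (g *group) run(wait time.Duration, notify func(int)) {
	time.Sleep(wait)
	g.mu.Lock()
	defer g.mu.Unlock()
	notify(len(g.alerts))
}

func main() {
	notify := func(n int) { fmt.Printf("flushed %d alert(s)\n", n) }

	// Buggy ordering: run() is started before the first alert is inserted.
	// With wait == 0 the flush may observe an empty group and deliver nothing.
	buggy := &group{}
	go buggy.run(0, notify)
	buggy.insert("Alert_1")

	// Fixed ordering (what this change does): insert the first alert before
	// starting run(), so the zero-wait flush is guaranteed to see it.
	fixed := &group{}
	fixed.insert("Alert_1")
	go fixed.run(0, notify)

	time.Sleep(100 * time.Millisecond) // give both goroutines time to flush
}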

dispatch/dispatch.go

@@ -290,30 +290,36 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
 		d.aggrGroups[route] = group
 	}
 
-	// If the group does not exist, create it.
 	ag, ok := group[fp]
-	if !ok {
-		ag = newAggrGroup(d.ctx, groupLabels, route, d.timeout, d.logger)
-		group[fp] = ag
-		d.metrics.aggrGroups.Inc()
-
-		go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool {
-			_, _, err := d.stage.Exec(ctx, d.logger, alerts...)
-			if err != nil {
-				lvl := level.Error(d.logger)
-				if ctx.Err() == context.Canceled {
-					// It is expected for the context to be canceled on
-					// configuration reload or shutdown. In this case, the
-					// message should only be logged at the debug level.
-					lvl = level.Debug(d.logger)
-				}
-				lvl.Log("msg", "Notify for alerts failed", "num_alerts", len(alerts), "err", err)
-			}
-			return err == nil
-		})
+	if ok {
+		ag.insert(alert)
+		return
 	}
 
-	ag.insert(alert)
+	// If the group does not exist, create it.
+	ag = newAggrGroup(d.ctx, groupLabels, route, d.timeout, d.logger)
+	group[fp] = ag
+	d.metrics.aggrGroups.Inc()
+
+	// Insert the 1st alert in the group before starting the group's run()
+	// function, to make sure that when the run() will be executed the 1st
+	// alert is already there.
+	ag.insert(alert)
+
+	go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool {
+		_, _, err := d.stage.Exec(ctx, d.logger, alerts...)
+		if err != nil {
+			lvl := level.Error(d.logger)
+			if ctx.Err() == context.Canceled {
+				// It is expected for the context to be canceled on
+				// configuration reload or shutdown. In this case, the
+				// message should only be logged at the debug level.
+				lvl = level.Debug(d.logger)
+			}
+			lvl.Log("msg", "Notify for alerts failed", "num_alerts", len(alerts), "err", err)
+		}
+		return err == nil
+	})
 }
 
 func getGroupLabels(alert *types.Alert, route *Route) model.LabelSet {

dispatch/dispatch_test.go

@@ -15,6 +15,7 @@ package dispatch
 
 import (
 	"context"
+	"fmt"
 	"reflect"
 	"sort"
 	"sync"
@@ -537,3 +538,50 @@ func TestDispatcherRace(t *testing.T) {
 	go dispatcher.Run()
 	dispatcher.Stop()
 }
+
+func TestDispatcherRaceOnFirstAlertNotDeliveredWhenGroupWaitIsZero(t *testing.T) {
+	const numAlerts = 8000
+
+	logger := log.NewNopLogger()
+	marker := types.NewMarker(prometheus.NewRegistry())
+	alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, logger)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer alerts.Close()
+
+	route := &Route{
+		RouteOpts: RouteOpts{
+			Receiver:       "default",
+			GroupBy:        map[model.LabelName]struct{}{"alertname": {}},
+			GroupWait:      0,
+			GroupInterval:  1 * time.Hour, // Should never hit in this test.
+			RepeatInterval: 1 * time.Hour, // Should never hit in this test.
+		},
+	}
+
+	timeout := func(d time.Duration) time.Duration { return d }
+	recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)}
+	dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, logger, NewDispatcherMetrics(prometheus.NewRegistry()))
+	go dispatcher.Run()
+	defer dispatcher.Stop()
+
+	// Push all alerts.
+	for i := 0; i < numAlerts; i++ {
+		alert := newAlert(model.LabelSet{"alertname": model.LabelValue(fmt.Sprintf("Alert_%d", i))})
+		require.NoError(t, alerts.Put(alert))
+	}
+
+	// Wait until the alerts have been notified or the waiting timeout expires.
+	for deadline := time.Now().Add(5 * time.Second); time.Now().Before(deadline); {
+		if len(recorder.Alerts()) >= numAlerts {
+			break
+		}
+
+		// Throttle.
+		time.Sleep(10 * time.Millisecond)
+	}
+
+	// We expect all alerts to be notified immediately, since they all belong to different groups.
+	require.Equal(t, numAlerts, len(recorder.Alerts()))
+}
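
The new test stresses the fixed path: 8000 alerts, each in its own group with GroupWait of 0, must all be recorded by the stage within 5 seconds, which only holds when every first alert is flushed immediately. To exercise it locally, something along these lines should work from the repository root (the package path is an assumption based on the usual layout, not part of the diff):

	go test -race -run TestDispatcherRaceOnFirstAlertNotDeliveredWhenGroupWaitIsZero ./dispatch/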