From 72ef6e04e1ebe1ef54ff18fabd99e360531597f2 Mon Sep 17 00:00:00 2001
From: Marco Pracucci
Date: Fri, 30 Apr 2021 10:11:10 +0200
Subject: [PATCH 1/3] Fix race condition causing 1st alert to not be immediately delivered when group_wait is 0s

Signed-off-by: Marco Pracucci
---
 dispatch/dispatch.go | 46 +++++++++++++++++++++++++-------------------
 1 file changed, 26 insertions(+), 20 deletions(-)

diff --git a/dispatch/dispatch.go b/dispatch/dispatch.go
index d62e0b8d..a3357857 100644
--- a/dispatch/dispatch.go
+++ b/dispatch/dispatch.go
@@ -290,30 +290,36 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
 		d.aggrGroups[route] = group
 	}
 
-	// If the group does not exist, create it.
 	ag, ok := group[fp]
-	if !ok {
-		ag = newAggrGroup(d.ctx, groupLabels, route, d.timeout, d.logger)
-		group[fp] = ag
-		d.metrics.aggrGroups.Inc()
-
-		go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool {
-			_, _, err := d.stage.Exec(ctx, d.logger, alerts...)
-			if err != nil {
-				lvl := level.Error(d.logger)
-				if ctx.Err() == context.Canceled {
-					// It is expected for the context to be canceled on
-					// configuration reload or shutdown. In this case, the
-					// message should only be logged at the debug level.
-					lvl = level.Debug(d.logger)
-				}
-				lvl.Log("msg", "Notify for alerts failed", "num_alerts", len(alerts), "err", err)
-			}
-			return err == nil
-		})
+	if ok {
+		ag.insert(alert)
+		return
 	}
 
+	// If the group does not exist, create it.
+	ag = newAggrGroup(d.ctx, groupLabels, route, d.timeout, d.logger)
+	group[fp] = ag
+	d.metrics.aggrGroups.Inc()
+
+	// Insert the 1st alert in the group before starting the group's run()
+	// function, to make sure that when the run() will be executed the 1st
+	// alert is already there.
 	ag.insert(alert)
+
+	go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool {
+		_, _, err := d.stage.Exec(ctx, d.logger, alerts...)
+		if err != nil {
+			lvl := level.Error(d.logger)
+			if ctx.Err() == context.Canceled {
+				// It is expected for the context to be canceled on
+				// configuration reload or shutdown. In this case, the
+				// message should only be logged at the debug level.
+				lvl = level.Debug(d.logger)
+			}
+			lvl.Log("msg", "Notify for alerts failed", "num_alerts", len(alerts), "err", err)
+		}
+		return err == nil
+	})
 }
 
 func getGroupLabels(alert *types.Alert, route *Route) model.LabelSet {

From 1ad22c808fbe9822b3063ffdb3a840392a498085 Mon Sep 17 00:00:00 2001
From: Marco Pracucci
Date: Tue, 11 May 2021 15:48:02 +0200
Subject: [PATCH 2/3] Added unit test

Signed-off-by: Marco Pracucci
---
 dispatch/dispatch_test.go | 48 +++++++++++++++++++++++++++++++++++++++
 1 file changed, 48 insertions(+)

diff --git a/dispatch/dispatch_test.go b/dispatch/dispatch_test.go
index a3dcf08f..7b0e94ef 100644
--- a/dispatch/dispatch_test.go
+++ b/dispatch/dispatch_test.go
@@ -15,6 +15,7 @@ package dispatch
 
 import (
 	"context"
+	"fmt"
 	"reflect"
 	"sort"
 	"sync"
@@ -537,3 +538,50 @@ func TestDispatcherRace(t *testing.T) {
 	go dispatcher.Run()
 	dispatcher.Stop()
 }
+
+func TestDispatcherRaceOnFirstAlertNotDeliveredWhenGroupWaitIsZero(t *testing.T) {
+	const numAlerts = 100000
+
+	logger := log.NewNopLogger()
+	marker := types.NewMarker(prometheus.NewRegistry())
+	alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, logger)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer alerts.Close()
+
+	route := &Route{
+		RouteOpts: RouteOpts{
+			Receiver:       "default",
+			GroupBy:        map[model.LabelName]struct{}{"alertname": {}},
+			GroupWait:      0,
+			GroupInterval:  1 * time.Hour, // Should never hit in this test.
+			RepeatInterval: 1 * time.Hour, // Should never hit in this test.
+		},
+	}
+
+	timeout := func(d time.Duration) time.Duration { return d }
+	recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)}
+	dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, logger, NewDispatcherMetrics(prometheus.NewRegistry()))
+	go dispatcher.Run()
+	defer dispatcher.Stop()
+
+	// Push all alerts.
+	for i := 0; i < numAlerts; i++ {
+		alert := newAlert(model.LabelSet{"alertname": model.LabelValue(fmt.Sprintf("Alert_%d", i))})
+		require.NoError(t, alerts.Put(alert))
+	}
+
+	// Wait until the alerts have been notified or the waiting timeout expires.
+	for deadline := time.Now().Add(5 * time.Second); time.Now().Before(deadline); {
+		if len(recorder.Alerts()) >= numAlerts {
+			break
+		}
+
+		// Throttle.
+		time.Sleep(10 * time.Millisecond)
+	}
+
+	// We expect all alerts to be notified immediately, since they all belong to different groups.
+	require.Equal(t, numAlerts, len(recorder.Alerts()))
+}

From f84af78693343f08bf475664c7d282dd6d55b441 Mon Sep 17 00:00:00 2001
From: Marco Pracucci
Date: Tue, 11 May 2021 16:15:46 +0200
Subject: [PATCH 3/3] Lowered number of alert groups

Signed-off-by: Marco Pracucci
---
 dispatch/dispatch_test.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dispatch/dispatch_test.go b/dispatch/dispatch_test.go
index 7b0e94ef..45662970 100644
--- a/dispatch/dispatch_test.go
+++ b/dispatch/dispatch_test.go
@@ -540,7 +540,7 @@ func TestDispatcherRace(t *testing.T) {
 }
 
 func TestDispatcherRaceOnFirstAlertNotDeliveredWhenGroupWaitIsZero(t *testing.T) {
-	const numAlerts = 100000
+	const numAlerts = 8000
 
 	logger := log.NewNopLogger()
 	marker := types.NewMarker(prometheus.NewRegistry())
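
For readers following the change outside of the raw diff, below is a minimal, self-contained Go sketch of the ordering the first patch enforces. The group type, its insert/run methods, and the flush callback are simplified stand-ins invented for this illustration, not Alertmanager's actual aggrGroup API; the point is only that the first alert must be inserted before the flushing goroutine starts, so that an immediate first flush (the group_wait: 0 case) cannot observe an empty group and silently defer delivery to the next interval.

package main

import (
	"fmt"
	"sync"
	"time"
)

// group is a simplified stand-in for an aggregation group: a mutex-guarded
// list of alerts plus a run loop that flushes it.
type group struct {
	mu     sync.Mutex
	alerts []string
}

func (g *group) insert(a string) {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.alerts = append(g.alerts, a)
}

// run flushes once immediately (mimicking a zero group_wait) and then again
// on every tick of the interval, until stop is closed.
func (g *group) run(flush func([]string), interval time.Duration, stop <-chan struct{}) {
	tick := time.NewTicker(interval)
	defer tick.Stop()
	for {
		g.mu.Lock()
		pending := append([]string(nil), g.alerts...)
		g.mu.Unlock()
		if len(pending) > 0 {
			flush(pending)
		}
		select {
		case <-stop:
			return
		case <-tick.C:
		}
	}
}

func main() {
	g := &group{}
	stop := make(chan struct{})

	// Patched ordering: insert the first alert, then start run(), so the
	// immediate flush already sees it. Starting run() before insert() is the
	// race the patch removes: with group_wait=0 the first flush could find an
	// empty group, and the alert would wait a full interval to be delivered.
	g.insert("alert-1")
	go g.run(func(alerts []string) { fmt.Println("flushed:", alerts) }, time.Hour, stop)

	time.Sleep(100 * time.Millisecond) // give the goroutine time to flush once
	close(stop)
}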