diff --git a/dispatch/dispatch_test.go b/dispatch/dispatch_test.go index 46518686..db1f845c 100644 --- a/dispatch/dispatch_test.go +++ b/dispatch/dispatch_test.go @@ -335,40 +335,41 @@ func TestGroups(t *testing.T) { t.Fatal(err) } route := NewRoute(conf.Route, nil) - marker := types.NewMarker(prometheus.DefaultRegisterer) + marker := types.NewMarker(prometheus.NewRegistry()) alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, logger) if err != nil { t.Fatal(err) } defer alerts.Close() timeout := func(d time.Duration) time.Duration { return time.Duration(0) } - dispatcher := NewDispatcher(alerts, route, &noopStage{}, marker, timeout, logger) + recorder := &recordStage{t: t, alerts: make(map[string]map[model.Fingerprint]*types.Alert)} + dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, logger) go dispatcher.Run() + defer dispatcher.Stop() - // create alerts. the dispatcher will automatically create the groups + // Create alerts. the dispatcher will automatically create the groups. alerts.Put( newAlert(model.LabelSet{"env": "testing", "alertname": "TestingAlert", "service": "api"}), newAlert(model.LabelSet{"env": "prod", "alertname": "HighErrorRate", "cluster": "aa", "service": "api"}), newAlert(model.LabelSet{"env": "prod", "alertname": "HighErrorRate", "cluster": "bb", "service": "api"}), newAlert(model.LabelSet{"env": "prod", "alertname": "HighLatency", "cluster": "bb", "service": "db", "kafka": "yes"}), ) - // Let alerts get processed - time.Sleep(time.Second) - var routeFilter, alertFilter bool - alertGroups, receivers := dispatcher.Groups(func(_ *Route) bool { - routeFilter = true - return true - }, func(_ *types.Alert, _ time.Time) bool { - alertFilter = true - return true - }) + // Let alerts get processed. + for i := 0; len(recorder.Alerts()) != 5 && i < 10; i++ { + time.Sleep(100 * time.Millisecond) + } + require.Equal(t, 5, len(recorder.Alerts())) - // Verify filter functions were called - require.True(t, routeFilter) - require.True(t, alertFilter) + alertGroups, receivers := dispatcher.Groups( + func(*Route) bool { + return true + }, func(*types.Alert, time.Time) bool { + return true + }, + ) - // Verify grouping works + // Verify that grouping works. require.Equal(t, 5, len(alertGroups)) for _, ag := range alertGroups { if len(ag.Labels) == 2 { @@ -387,9 +388,10 @@ func TestGroups(t *testing.T) { for _, alert := range ag.Alerts { alertsReceivers, ok := receivers[alert.Fingerprint()] require.True(t, ok) + sort.Strings(alertsReceivers) if labelValue := ag.Labels["alertname"]; string(labelValue) == "HighLatency" { // Matches both prod and kafka receivers - require.Equal(t, []string{"prod", "kafka"}, alertsReceivers) + require.Equal(t, []string{"kafka", "prod"}, alertsReceivers) continue } require.Equal(t, 1, len(alertsReceivers)) @@ -398,15 +400,45 @@ func TestGroups(t *testing.T) { } } -type noopStage struct{} +type recordStage struct { + t *testing.T + mtx sync.RWMutex + alerts map[string]map[model.Fingerprint]*types.Alert +} -func (n *noopStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { +func (r *recordStage) Alerts() []*types.Alert { + r.mtx.RLock() + defer r.mtx.RUnlock() + alerts := make([]*types.Alert, 0) + for k := range r.alerts { + for _, a := range r.alerts[k] { + alerts = append(alerts, a) + } + } + return alerts +} + +func (r *recordStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { + r.mtx.Lock() + defer r.mtx.Unlock() + gk, ok := notify.GroupKey(ctx) + if !ok { + panic("GroupKey not present!") + } + if _, ok := r.alerts[gk]; !ok { + r.alerts[gk] = make(map[model.Fingerprint]*types.Alert) + } + for _, a := range alerts { + r.alerts[gk][a.Fingerprint()] = a + } return ctx, nil, nil } var ( - t0 = time.Now() - t1 = t0.Add(100 * time.Millisecond) + // Set the start time in the past to trigger a flush immediately. + t0 = time.Now().Add(-time.Minute) + // Set the end time in the future to avoid deleting the alert. + t1 = t0.Add(2 * time.Minute) ) func newAlert(labels model.LabelSet) *types.Alert {