From ab537b5b2f9dc55cc4a1faed5df5618c7648c088 Mon Sep 17 00:00:00 2001 From: Simon Pasquier Date: Wed, 24 Jul 2019 17:12:37 +0200 Subject: [PATCH] dispatch: fix missing receivers in Groups() (#1964) Signed-off-by: Simon Pasquier --- dispatch/dispatch.go | 36 ++++----- dispatch/dispatch_test.go | 149 ++++++++++++++++++++++++++----------- dispatch/testdata/conf.yml | 34 --------- 3 files changed, 126 insertions(+), 93 deletions(-) delete mode 100644 dispatch/testdata/conf.yml diff --git a/dispatch/dispatch.go b/dispatch/dispatch.go index b7bbad9e..24a390ae 100644 --- a/dispatch/dispatch.go +++ b/dispatch/dispatch.go @@ -135,16 +135,21 @@ func (d *Dispatcher) run(it provider.AlertIterator) { // AlertGroup represents how alerts exist within an aggrGroup. type AlertGroup struct { - Alerts []*types.Alert + Alerts types.AlertSlice Labels model.LabelSet Receiver string } type AlertGroups []*AlertGroup -func (ag AlertGroups) Swap(i, j int) { ag[i], ag[j] = ag[j], ag[i] } -func (ag AlertGroups) Less(i, j int) bool { return ag[i].Labels.Before(ag[j].Labels) } -func (ag AlertGroups) Len() int { return len(ag) } +func (ag AlertGroups) Swap(i, j int) { ag[i], ag[j] = ag[j], ag[i] } +func (ag AlertGroups) Less(i, j int) bool { + if ag[i].Labels.Equal(ag[j].Labels) { + return ag[i].Receiver < ag[j].Receiver + } + return ag[i].Labels.Before(ag[j].Labels) +} +func (ag AlertGroups) Len() int { return len(ag) } // Groups returns a slice of AlertGroups from the dispatcher's internal state. func (d *Dispatcher) Groups(routeFilter func(*Route) bool, alertFilter func(*types.Alert, time.Time) bool) (AlertGroups, map[model.Fingerprint][]string) { @@ -153,13 +158,12 @@ func (d *Dispatcher) Groups(routeFilter func(*Route) bool, alertFilter func(*typ d.mtx.RLock() defer d.mtx.RUnlock() - seen := map[model.Fingerprint]*AlertGroup{} - // Keep a list of receivers for an alert to prevent checking each alert // again against all routes. The alert has already matched against this // route on ingestion. receivers := map[model.Fingerprint][]string{} + now := time.Now() for route, ags := range d.aggrGroups { if !routeFilter(route) { continue @@ -167,18 +171,11 @@ func (d *Dispatcher) Groups(routeFilter func(*Route) bool, alertFilter func(*typ for _, ag := range ags { receiver := route.RouteOpts.Receiver - alertGroup, ok := seen[ag.fingerprint()] - if !ok { - alertGroup = &AlertGroup{ - Labels: ag.labels, - Receiver: receiver, - } - - seen[ag.fingerprint()] = alertGroup + alertGroup := &AlertGroup{ + Labels: ag.labels, + Receiver: receiver, } - now := time.Now() - alerts := ag.alerts.List() filteredAlerts := make([]*types.Alert, 0, len(alerts)) for _, a := range alerts { @@ -207,8 +204,13 @@ func (d *Dispatcher) Groups(routeFilter func(*Route) bool, alertFilter func(*typ groups = append(groups, alertGroup) } } - sort.Sort(groups) + for i := range groups { + sort.Sort(groups[i].Alerts) + } + for i := range receivers { + sort.Strings(receivers[i]) + } return groups, receivers } diff --git a/dispatch/dispatch_test.go b/dispatch/dispatch_test.go index db1f845c..fd8677a8 100644 --- a/dispatch/dispatch_test.go +++ b/dispatch/dispatch_test.go @@ -329,11 +329,36 @@ func TestGroupByAllLabels(t *testing.T) { } func TestGroups(t *testing.T) { - logger := log.NewNopLogger() - conf, _, err := config.LoadFile("testdata/conf.yml") + confData := `receivers: +- name: 'kafka' +- name: 'prod' +- name: 'testing' + +route: + group_by: ['alertname'] + group_wait: 10ms + group_interval: 10ms + receiver: 'prod' + routes: + - match: + env: 'testing' + receiver: 'testing' + group_by: ['alertname', 'service'] + - match: + env: 'prod' + receiver: 'prod' + group_by: ['alertname', 'service', 'cluster'] + continue: true + - match: + kafka: 'yes' + receiver: 'kafka' + group_by: ['alertname', 'service', 'cluster']` + conf, err := config.Load(confData) if err != nil { t.Fatal(err) } + + logger := log.NewNopLogger() route := NewRoute(conf.Route, nil) marker := types.NewMarker(prometheus.NewRegistry()) alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, logger) @@ -341,25 +366,34 @@ func TestGroups(t *testing.T) { t.Fatal(err) } defer alerts.Close() + timeout := func(d time.Duration) time.Duration { return time.Duration(0) } - recorder := &recordStage{t: t, alerts: make(map[string]map[model.Fingerprint]*types.Alert)} + recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)} dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, logger) go dispatcher.Run() defer dispatcher.Stop() // Create alerts. the dispatcher will automatically create the groups. - alerts.Put( - newAlert(model.LabelSet{"env": "testing", "alertname": "TestingAlert", "service": "api"}), - newAlert(model.LabelSet{"env": "prod", "alertname": "HighErrorRate", "cluster": "aa", "service": "api"}), - newAlert(model.LabelSet{"env": "prod", "alertname": "HighErrorRate", "cluster": "bb", "service": "api"}), - newAlert(model.LabelSet{"env": "prod", "alertname": "HighLatency", "cluster": "bb", "service": "db", "kafka": "yes"}), - ) + inputAlerts := []*types.Alert{ + // Matches the parent route. + newAlert(model.LabelSet{"alertname": "OtherAlert", "cluster": "cc", "service": "dd"}), + // Matches the first sub-route. + newAlert(model.LabelSet{"env": "testing", "alertname": "TestingAlert", "service": "api", "instance": "inst1"}), + // Matches the second sub-route. + newAlert(model.LabelSet{"env": "prod", "alertname": "HighErrorRate", "cluster": "aa", "service": "api", "instance": "inst1"}), + newAlert(model.LabelSet{"env": "prod", "alertname": "HighErrorRate", "cluster": "aa", "service": "api", "instance": "inst2"}), + // Matches the second sub-route. + newAlert(model.LabelSet{"env": "prod", "alertname": "HighErrorRate", "cluster": "bb", "service": "api", "instance": "inst1"}), + // Matches the second and third sub-route. + newAlert(model.LabelSet{"env": "prod", "alertname": "HighLatency", "cluster": "bb", "service": "db", "kafka": "yes", "instance": "inst3"}), + } + alerts.Put(inputAlerts...) // Let alerts get processed. - for i := 0; len(recorder.Alerts()) != 5 && i < 10; i++ { - time.Sleep(100 * time.Millisecond) + for i := 0; len(recorder.Alerts()) != 7 && i < 10; i++ { + time.Sleep(200 * time.Millisecond) } - require.Equal(t, 5, len(recorder.Alerts())) + require.Equal(t, 7, len(recorder.Alerts())) alertGroups, receivers := dispatcher.Groups( func(*Route) bool { @@ -369,39 +403,70 @@ func TestGroups(t *testing.T) { }, ) - // Verify that grouping works. - require.Equal(t, 5, len(alertGroups)) - for _, ag := range alertGroups { - if len(ag.Labels) == 2 { - // testing receiver - require.Equal(t, 2, len(ag.Labels)) - require.Equal(t, model.LabelSet{"alertname": "TestingAlert", "service": "api"}, ag.Labels) - for _, alert := range ag.Alerts { - alertsReceivers, ok := receivers[alert.Fingerprint()] - require.True(t, ok) - require.Equal(t, 1, len(alertsReceivers)) - require.Equal(t, "testing", alertsReceivers[0]) - } - continue - } - require.Equal(t, 3, len(ag.Labels)) - for _, alert := range ag.Alerts { - alertsReceivers, ok := receivers[alert.Fingerprint()] - require.True(t, ok) - sort.Strings(alertsReceivers) - if labelValue := ag.Labels["alertname"]; string(labelValue) == "HighLatency" { - // Matches both prod and kafka receivers - require.Equal(t, []string{"kafka", "prod"}, alertsReceivers) - continue - } - require.Equal(t, 1, len(alertsReceivers)) - require.Equal(t, "prod", alertsReceivers[0]) - } - } + require.Equal(t, AlertGroups{ + &AlertGroup{ + Alerts: []*types.Alert{inputAlerts[0]}, + Labels: model.LabelSet{ + model.LabelName("alertname"): model.LabelValue("OtherAlert"), + }, + Receiver: "prod", + }, + &AlertGroup{ + Alerts: []*types.Alert{inputAlerts[1]}, + Labels: model.LabelSet{ + model.LabelName("alertname"): model.LabelValue("TestingAlert"), + model.LabelName("service"): model.LabelValue("api"), + }, + Receiver: "testing", + }, + &AlertGroup{ + Alerts: []*types.Alert{inputAlerts[2], inputAlerts[3]}, + Labels: model.LabelSet{ + model.LabelName("alertname"): model.LabelValue("HighErrorRate"), + model.LabelName("service"): model.LabelValue("api"), + model.LabelName("cluster"): model.LabelValue("aa"), + }, + Receiver: "prod", + }, + &AlertGroup{ + Alerts: []*types.Alert{inputAlerts[4]}, + Labels: model.LabelSet{ + model.LabelName("alertname"): model.LabelValue("HighErrorRate"), + model.LabelName("service"): model.LabelValue("api"), + model.LabelName("cluster"): model.LabelValue("bb"), + }, + Receiver: "prod", + }, + &AlertGroup{ + Alerts: []*types.Alert{inputAlerts[5]}, + Labels: model.LabelSet{ + model.LabelName("alertname"): model.LabelValue("HighLatency"), + model.LabelName("service"): model.LabelValue("db"), + model.LabelName("cluster"): model.LabelValue("bb"), + }, + Receiver: "kafka", + }, + &AlertGroup{ + Alerts: []*types.Alert{inputAlerts[5]}, + Labels: model.LabelSet{ + model.LabelName("alertname"): model.LabelValue("HighLatency"), + model.LabelName("service"): model.LabelValue("db"), + model.LabelName("cluster"): model.LabelValue("bb"), + }, + Receiver: "prod", + }, + }, alertGroups) + require.Equal(t, map[model.Fingerprint][]string{ + inputAlerts[0].Fingerprint(): []string{"prod"}, + inputAlerts[1].Fingerprint(): []string{"testing"}, + inputAlerts[2].Fingerprint(): []string{"prod"}, + inputAlerts[3].Fingerprint(): []string{"prod"}, + inputAlerts[4].Fingerprint(): []string{"prod"}, + inputAlerts[5].Fingerprint(): []string{"kafka", "prod"}, + }, receivers) } type recordStage struct { - t *testing.T mtx sync.RWMutex alerts map[string]map[model.Fingerprint]*types.Alert } diff --git a/dispatch/testdata/conf.yml b/dispatch/testdata/conf.yml deleted file mode 100644 index e0876fa2..00000000 --- a/dispatch/testdata/conf.yml +++ /dev/null @@ -1,34 +0,0 @@ -global: - resolve_timeout: 5m - -receivers: -- name: 'testing' - webhook_configs: - - url: 'http://127.0.0.1:5001/' -- name: 'prod' - webhook_configs: - - url: 'http://127.0.0.1:5001/' -- name: 'kafka' - webhook_configs: - - url: 'http://127.0.0.1:5001/' - -route: - group_by: ['alertname'] - group_wait: 10s - group_interval: 10s - repeat_interval: 1h - receiver: 'prod' - routes: - - match: - env: 'testing' - receiver: 'testing' - group_by: ['alertname', 'service'] - - match: - env: 'prod' - receiver: 'prod' - group_by: ['alertname', 'service', 'cluster'] - continue: true - - match: - kafka: 'yes' - receiver: 'kafka' - group_by: ['alertname', 'service', 'cluster']