dispatch: fix missing receivers in Groups() (#1964)

Signed-off-by: Simon Pasquier <spasquie@redhat.com>
Simon Pasquier 2019-07-24 17:12:37 +02:00 committed by GitHub
parent f450720213
commit ab537b5b2f
3 changed files with 126 additions and 93 deletions
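
Before this change, Groups() keyed its output on the group's label fingerprint, so an alert routed to more than one receiver (for example through a sub-route with 'continue: true') was reported under only one of them. The diff below instead builds one AlertGroup per aggregation group and receiver, and returns both the groups and a per-alert list of receivers. A rough usage sketch, assuming a running *Dispatcher named d; the pass-through filter closures and variable names are illustrative, mirroring the ones in TestGroups further down:

	groups, receiversByAlert := d.Groups(
		func(*Route) bool { return true },                  // keep every route
		func(*types.Alert, time.Time) bool { return true }, // keep every alert
	)
	for _, ag := range groups {
		// A label set now appears once per receiver it routes to.
		fmt.Printf("group %v -> receiver %q (%d alerts)\n", ag.Labels, ag.Receiver, len(ag.Alerts))
	}
	for fp, rcvs := range receiversByAlert {
		// Alerts that matched several routes list all of their receivers.
		fmt.Printf("alert %v -> %v\n", fp, rcvs)
	}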


@@ -135,16 +135,21 @@ func (d *Dispatcher) run(it provider.AlertIterator) {
// AlertGroup represents how alerts exist within an aggrGroup.
type AlertGroup struct {
Alerts []*types.Alert
Alerts types.AlertSlice
Labels model.LabelSet
Receiver string
}
type AlertGroups []*AlertGroup
func (ag AlertGroups) Swap(i, j int) { ag[i], ag[j] = ag[j], ag[i] }
func (ag AlertGroups) Less(i, j int) bool { return ag[i].Labels.Before(ag[j].Labels) }
func (ag AlertGroups) Len() int { return len(ag) }
func (ag AlertGroups) Swap(i, j int) { ag[i], ag[j] = ag[j], ag[i] }
func (ag AlertGroups) Less(i, j int) bool {
if ag[i].Labels.Equal(ag[j].Labels) {
return ag[i].Receiver < ag[j].Receiver
}
return ag[i].Labels.Before(ag[j].Labels)
}
func (ag AlertGroups) Len() int { return len(ag) }
// Groups returns a slice of AlertGroups from the dispatcher's internal state.
func (d *Dispatcher) Groups(routeFilter func(*Route) bool, alertFilter func(*types.Alert, time.Time) bool) (AlertGroups, map[model.Fingerprint][]string) {
@@ -153,13 +158,12 @@ func (d *Dispatcher) Groups(routeFilter func(*Route) bool, alertFilter func(*typ
d.mtx.RLock()
defer d.mtx.RUnlock()
seen := map[model.Fingerprint]*AlertGroup{}
// Keep a list of receivers for an alert to prevent checking each alert
// again against all routes. The alert has already matched against this
// route on ingestion.
receivers := map[model.Fingerprint][]string{}
now := time.Now()
for route, ags := range d.aggrGroups {
if !routeFilter(route) {
continue
@@ -167,18 +171,11 @@ func (d *Dispatcher) Groups(routeFilter func(*Route) bool, alertFilter func(*typ
for _, ag := range ags {
receiver := route.RouteOpts.Receiver
alertGroup, ok := seen[ag.fingerprint()]
if !ok {
alertGroup = &AlertGroup{
Labels: ag.labels,
Receiver: receiver,
}
seen[ag.fingerprint()] = alertGroup
alertGroup := &AlertGroup{
Labels: ag.labels,
Receiver: receiver,
}
now := time.Now()
alerts := ag.alerts.List()
filteredAlerts := make([]*types.Alert, 0, len(alerts))
for _, a := range alerts {
@@ -207,8 +204,13 @@ func (d *Dispatcher) Groups(routeFilter func(*Route) bool, alertFilter func(*typ
groups = append(groups, alertGroup)
}
}
sort.Sort(groups)
for i := range groups {
sort.Sort(groups[i].Alerts)
}
for i := range receivers {
sort.Strings(receivers[i])
}
return groups, receivers
}
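
Note that AlertGroups.Less now breaks ties on the receiver name, so two groups that share the same label set (as happens below with the HighLatency alert, which is delivered to both kafka and prod) come back in a stable order. A minimal sketch of that tie-break, using only the types shown above:

	// Two groups with identical labels but different receivers.
	ags := AlertGroups{
		{Labels: model.LabelSet{"alertname": "HighLatency"}, Receiver: "prod"},
		{Labels: model.LabelSet{"alertname": "HighLatency"}, Receiver: "kafka"},
	}
	sort.Sort(ags)
	// The label sets compare equal, so the receivers decide:
	// the "kafka" group now precedes the "prod" group.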


@@ -329,11 +329,36 @@ func TestGroupByAllLabels(t *testing.T) {
}
func TestGroups(t *testing.T) {
logger := log.NewNopLogger()
conf, _, err := config.LoadFile("testdata/conf.yml")
confData := `receivers:
- name: 'kafka'
- name: 'prod'
- name: 'testing'
route:
  group_by: ['alertname']
  group_wait: 10ms
  group_interval: 10ms
  receiver: 'prod'
  routes:
  - match:
      env: 'testing'
    receiver: 'testing'
    group_by: ['alertname', 'service']
  - match:
      env: 'prod'
    receiver: 'prod'
    group_by: ['alertname', 'service', 'cluster']
    continue: true
  - match:
      kafka: 'yes'
    receiver: 'kafka'
    group_by: ['alertname', 'service', 'cluster']`
conf, err := config.Load(confData)
if err != nil {
t.Fatal(err)
}
logger := log.NewNopLogger()
route := NewRoute(conf.Route, nil)
marker := types.NewMarker(prometheus.NewRegistry())
alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, logger)
@@ -341,25 +366,34 @@ func TestGroups(t *testing.T) {
t.Fatal(err)
}
defer alerts.Close()
timeout := func(d time.Duration) time.Duration { return time.Duration(0) }
recorder := &recordStage{t: t, alerts: make(map[string]map[model.Fingerprint]*types.Alert)}
recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)}
dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, logger)
go dispatcher.Run()
defer dispatcher.Stop()
// Create alerts. The dispatcher will automatically create the groups.
alerts.Put(
newAlert(model.LabelSet{"env": "testing", "alertname": "TestingAlert", "service": "api"}),
newAlert(model.LabelSet{"env": "prod", "alertname": "HighErrorRate", "cluster": "aa", "service": "api"}),
newAlert(model.LabelSet{"env": "prod", "alertname": "HighErrorRate", "cluster": "bb", "service": "api"}),
newAlert(model.LabelSet{"env": "prod", "alertname": "HighLatency", "cluster": "bb", "service": "db", "kafka": "yes"}),
)
inputAlerts := []*types.Alert{
// Matches the parent route.
newAlert(model.LabelSet{"alertname": "OtherAlert", "cluster": "cc", "service": "dd"}),
// Matches the first sub-route.
newAlert(model.LabelSet{"env": "testing", "alertname": "TestingAlert", "service": "api", "instance": "inst1"}),
// Matches the second sub-route.
newAlert(model.LabelSet{"env": "prod", "alertname": "HighErrorRate", "cluster": "aa", "service": "api", "instance": "inst1"}),
newAlert(model.LabelSet{"env": "prod", "alertname": "HighErrorRate", "cluster": "aa", "service": "api", "instance": "inst2"}),
// Matches the second sub-route.
newAlert(model.LabelSet{"env": "prod", "alertname": "HighErrorRate", "cluster": "bb", "service": "api", "instance": "inst1"}),
// Matches the second and third sub-route.
newAlert(model.LabelSet{"env": "prod", "alertname": "HighLatency", "cluster": "bb", "service": "db", "kafka": "yes", "instance": "inst3"}),
}
alerts.Put(inputAlerts...)
// Let alerts get processed.
for i := 0; len(recorder.Alerts()) != 5 && i < 10; i++ {
time.Sleep(100 * time.Millisecond)
for i := 0; len(recorder.Alerts()) != 7 && i < 10; i++ {
time.Sleep(200 * time.Millisecond)
}
require.Equal(t, 5, len(recorder.Alerts()))
require.Equal(t, 7, len(recorder.Alerts()))
alertGroups, receivers := dispatcher.Groups(
func(*Route) bool {
@@ -369,39 +403,70 @@ func TestGroups(t *testing.T) {
},
)
// Verify that grouping works.
require.Equal(t, 5, len(alertGroups))
for _, ag := range alertGroups {
if len(ag.Labels) == 2 {
// testing receiver
require.Equal(t, 2, len(ag.Labels))
require.Equal(t, model.LabelSet{"alertname": "TestingAlert", "service": "api"}, ag.Labels)
for _, alert := range ag.Alerts {
alertsReceivers, ok := receivers[alert.Fingerprint()]
require.True(t, ok)
require.Equal(t, 1, len(alertsReceivers))
require.Equal(t, "testing", alertsReceivers[0])
}
continue
}
require.Equal(t, 3, len(ag.Labels))
for _, alert := range ag.Alerts {
alertsReceivers, ok := receivers[alert.Fingerprint()]
require.True(t, ok)
sort.Strings(alertsReceivers)
if labelValue := ag.Labels["alertname"]; string(labelValue) == "HighLatency" {
// Matches both prod and kafka receivers
require.Equal(t, []string{"kafka", "prod"}, alertsReceivers)
continue
}
require.Equal(t, 1, len(alertsReceivers))
require.Equal(t, "prod", alertsReceivers[0])
}
}
require.Equal(t, AlertGroups{
&AlertGroup{
Alerts: []*types.Alert{inputAlerts[0]},
Labels: model.LabelSet{
model.LabelName("alertname"): model.LabelValue("OtherAlert"),
},
Receiver: "prod",
},
&AlertGroup{
Alerts: []*types.Alert{inputAlerts[1]},
Labels: model.LabelSet{
model.LabelName("alertname"): model.LabelValue("TestingAlert"),
model.LabelName("service"): model.LabelValue("api"),
},
Receiver: "testing",
},
&AlertGroup{
Alerts: []*types.Alert{inputAlerts[2], inputAlerts[3]},
Labels: model.LabelSet{
model.LabelName("alertname"): model.LabelValue("HighErrorRate"),
model.LabelName("service"): model.LabelValue("api"),
model.LabelName("cluster"): model.LabelValue("aa"),
},
Receiver: "prod",
},
&AlertGroup{
Alerts: []*types.Alert{inputAlerts[4]},
Labels: model.LabelSet{
model.LabelName("alertname"): model.LabelValue("HighErrorRate"),
model.LabelName("service"): model.LabelValue("api"),
model.LabelName("cluster"): model.LabelValue("bb"),
},
Receiver: "prod",
},
&AlertGroup{
Alerts: []*types.Alert{inputAlerts[5]},
Labels: model.LabelSet{
model.LabelName("alertname"): model.LabelValue("HighLatency"),
model.LabelName("service"): model.LabelValue("db"),
model.LabelName("cluster"): model.LabelValue("bb"),
},
Receiver: "kafka",
},
&AlertGroup{
Alerts: []*types.Alert{inputAlerts[5]},
Labels: model.LabelSet{
model.LabelName("alertname"): model.LabelValue("HighLatency"),
model.LabelName("service"): model.LabelValue("db"),
model.LabelName("cluster"): model.LabelValue("bb"),
},
Receiver: "prod",
},
}, alertGroups)
require.Equal(t, map[model.Fingerprint][]string{
inputAlerts[0].Fingerprint(): []string{"prod"},
inputAlerts[1].Fingerprint(): []string{"testing"},
inputAlerts[2].Fingerprint(): []string{"prod"},
inputAlerts[3].Fingerprint(): []string{"prod"},
inputAlerts[4].Fingerprint(): []string{"prod"},
inputAlerts[5].Fingerprint(): []string{"kafka", "prod"},
}, receivers)
}
type recordStage struct {
t *testing.T
mtx sync.RWMutex
alerts map[string]map[model.Fingerprint]*types.Alert
}
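
The test waits for seven recorded notifications even though it inserts only six alerts: the HighLatency alert matches both the second sub-route (which sets 'continue: true') and the third one, so it is delivered to the prod and kafka receivers and counted once per receiver. To run just this test locally, assuming the package lives in the dispatch/ directory of the repository (the path is not shown in the diff):

	go test -v -run TestGroups ./dispatch/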


@@ -1,34 +0,0 @@
global:
  resolve_timeout: 5m

receivers:
- name: 'testing'
  webhook_configs:
  - url: 'http://127.0.0.1:5001/'
- name: 'prod'
  webhook_configs:
  - url: 'http://127.0.0.1:5001/'
- name: 'kafka'
  webhook_configs:
  - url: 'http://127.0.0.1:5001/'

route:
  group_by: ['alertname']
  group_wait: 10s
  group_interval: 10s
  repeat_interval: 1h
  receiver: 'prod'
  routes:
  - match:
      env: 'testing'
    receiver: 'testing'
    group_by: ['alertname', 'service']
  - match:
      env: 'prod'
    receiver: 'prod'
    group_by: ['alertname', 'service', 'cluster']
    continue: true
  - match:
      kafka: 'yes'
    receiver: 'kafka'
    group_by: ['alertname', 'service', 'cluster']