From 390474ffbe37b6c3d61176757095196e94cf888f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Wed, 5 May 2021 17:26:37 +0200 Subject: [PATCH] Added group limit to dispatcher. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- cmd/alertmanager/main.go | 2 +- dispatch/dispatch.go | 64 +++++++++++++++++++++------- dispatch/dispatch_test.go | 88 ++++++++++++++++++++++++++++----------- 3 files changed, 114 insertions(+), 40 deletions(-) diff --git a/cmd/alertmanager/main.go b/cmd/alertmanager/main.go index 0de35f78..8b2757de 100644 --- a/cmd/alertmanager/main.go +++ b/cmd/alertmanager/main.go @@ -460,7 +460,7 @@ func run() int { silencer.Mutes(labels) }) - disp = dispatch.NewDispatcher(alerts, routes, pipeline, marker, timeoutFunc, logger, dispMetrics) + disp = dispatch.NewDispatcher(alerts, routes, pipeline, marker, timeoutFunc, nil, logger, dispMetrics) routes.Walk(func(r *dispatch.Route) { if r.RouteOpts.RepeatInterval > *retention { level.Warn(configLogger).Log( diff --git a/dispatch/dispatch.go b/dispatch/dispatch.go index a3357857..65e4dda8 100644 --- a/dispatch/dispatch.go +++ b/dispatch/dispatch.go @@ -33,8 +33,9 @@ import ( // DispatcherMetrics represents metrics associated to a dispatcher. type DispatcherMetrics struct { - aggrGroups prometheus.Gauge - processingDuration prometheus.Summary + aggrGroups prometheus.Gauge + processingDuration prometheus.Summary + aggrGroupLimitReached prometheus.Counter } // NewDispatcherMetrics returns a new registered DispatchMetrics. @@ -52,10 +53,16 @@ func NewDispatcherMetrics(r prometheus.Registerer) *DispatcherMetrics { Help: "Summary of latencies for the processing of alerts.", }, ), + aggrGroupLimitReached: prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "alertmanager_dispatcher_aggregation_group_limit_reached_total", + Help: "Number of times when dispatcher failed to create new aggregation group due to limit.", + }, + ), } if r != nil { - r.MustRegister(m.aggrGroups, m.processingDuration) + r.MustRegister(m.aggrGroups, m.processingDuration, m.aggrGroupLimitReached) } return &m @@ -68,12 +75,14 @@ type Dispatcher struct { alerts provider.Alerts stage notify.Stage metrics *DispatcherMetrics + limits Limits marker types.Marker timeout func(time.Duration) time.Duration - aggrGroups map[*Route]map[model.Fingerprint]*aggrGroup - mtx sync.RWMutex + mtx sync.RWMutex + aggrGroupsPerRoute map[*Route]map[model.Fingerprint]*aggrGroup + aggrGroupsNum int done chan struct{} ctx context.Context @@ -82,6 +91,14 @@ type Dispatcher struct { logger log.Logger } +// Limits describes limits used by Dispatcher. +type Limits interface { + // MaxNumberOfAggregationGroups returns max number of aggregation groups that dispatcher can have. + // 0 or negative value = unlimited. + // If dispatcher hits this limit, it will not create additional groups, but will log an error instead. + MaxNumberOfAggregationGroups() int +} + // NewDispatcher returns a new Dispatcher. func NewDispatcher( ap provider.Alerts, @@ -89,9 +106,14 @@ func NewDispatcher( s notify.Stage, mk types.Marker, to func(time.Duration) time.Duration, + lim Limits, l log.Logger, m *DispatcherMetrics, ) *Dispatcher { + if lim == nil { + lim = nilLimits{} + } + disp := &Dispatcher{ alerts: ap, stage: s, @@ -100,6 +122,7 @@ func NewDispatcher( timeout: to, logger: log.With(l, "component", "dispatcher"), metrics: m, + limits: lim, } return disp } @@ -109,7 +132,8 @@ func (d *Dispatcher) Run() { d.done = make(chan struct{}) d.mtx.Lock() - d.aggrGroups = map[*Route]map[model.Fingerprint]*aggrGroup{} + d.aggrGroupsPerRoute = map[*Route]map[model.Fingerprint]*aggrGroup{} + d.aggrGroupsNum = 0 d.metrics.aggrGroups.Set(0) d.ctx, d.cancel = context.WithCancel(context.Background()) d.mtx.Unlock() @@ -152,11 +176,12 @@ func (d *Dispatcher) run(it provider.AlertIterator) { case <-cleanup.C: d.mtx.Lock() - for _, groups := range d.aggrGroups { + for _, groups := range d.aggrGroupsPerRoute { for _, ag := range groups { if ag.empty() { ag.stop() delete(groups, ag.fingerprint()) + d.aggrGroupsNum-- d.metrics.aggrGroups.Dec() } } @@ -201,7 +226,7 @@ func (d *Dispatcher) Groups(routeFilter func(*Route) bool, alertFilter func(*typ receivers := map[model.Fingerprint][]string{} now := time.Now() - for route, ags := range d.aggrGroups { + for route, ags := range d.aggrGroupsPerRoute { if !routeFilter(route) { continue } @@ -284,21 +309,28 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) { d.mtx.Lock() defer d.mtx.Unlock() - group, ok := d.aggrGroups[route] + routeGroups, ok := d.aggrGroupsPerRoute[route] if !ok { - group = map[model.Fingerprint]*aggrGroup{} - d.aggrGroups[route] = group + routeGroups = map[model.Fingerprint]*aggrGroup{} + d.aggrGroupsPerRoute[route] = routeGroups } - ag, ok := group[fp] + ag, ok := routeGroups[fp] if ok { ag.insert(alert) return } - // If the group does not exist, create it. + // If the group does not exist, create it. But check the limit first. + if limit := d.limits.MaxNumberOfAggregationGroups(); limit > 0 && d.aggrGroupsNum >= limit { + d.metrics.aggrGroupLimitReached.Inc() + level.Error(d.logger).Log("msg", "Too many aggregation groups, cannot create new group for alert", "groups", d.aggrGroupsNum, "limit", limit, "alert", alert.Name()) + return + } + ag = newAggrGroup(d.ctx, groupLabels, route, d.timeout, d.logger) - group[fp] = ag + routeGroups[fp] = ag + d.aggrGroupsNum++ d.metrics.aggrGroups.Inc() // Insert the 1st alert in the group before starting the group's run() @@ -499,3 +531,7 @@ func (ag *aggrGroup) flush(notify func(...*types.Alert) bool) { } } } + +type nilLimits struct{} + +func (n nilLimits) MaxNumberOfAggregationGroups() int { return 0 } diff --git a/dispatch/dispatch_test.go b/dispatch/dispatch_test.go index 4d78178a..b2dd5bb5 100644 --- a/dispatch/dispatch_test.go +++ b/dispatch/dispatch_test.go @@ -24,6 +24,7 @@ import ( "github.com/go-kit/kit/log" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" @@ -373,7 +374,9 @@ route: timeout := func(d time.Duration) time.Duration { return time.Duration(0) } recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)} - dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, logger, NewDispatcherMetrics(prometheus.NewRegistry())) + lim := limits{groups: 6} + m := NewDispatcherMetrics(prometheus.NewRegistry()) + dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, lim, logger, m) go dispatcher.Run() defer dispatcher.Stop() @@ -391,7 +394,10 @@ route: // Matches the second and third sub-route. newAlert(model.LabelSet{"env": "prod", "alertname": "HighLatency", "cluster": "bb", "service": "db", "kafka": "yes", "instance": "inst3"}), } - alerts.Put(inputAlerts...) + err = alerts.Put(inputAlerts...) + if err != nil { + t.Fatal(err) + } // Let alerts get processed. for i := 0; len(recorder.Alerts()) != 7 && i < 10; i++ { @@ -411,63 +417,87 @@ route: &AlertGroup{ Alerts: []*types.Alert{inputAlerts[0]}, Labels: model.LabelSet{ - model.LabelName("alertname"): model.LabelValue("OtherAlert"), + "alertname": "OtherAlert", }, Receiver: "prod", }, &AlertGroup{ Alerts: []*types.Alert{inputAlerts[1]}, Labels: model.LabelSet{ - model.LabelName("alertname"): model.LabelValue("TestingAlert"), - model.LabelName("service"): model.LabelValue("api"), + "alertname": "TestingAlert", + "service": "api", }, Receiver: "testing", }, &AlertGroup{ Alerts: []*types.Alert{inputAlerts[2], inputAlerts[3]}, Labels: model.LabelSet{ - model.LabelName("alertname"): model.LabelValue("HighErrorRate"), - model.LabelName("service"): model.LabelValue("api"), - model.LabelName("cluster"): model.LabelValue("aa"), + "alertname": "HighErrorRate", + "service": "api", + "cluster": "aa", }, Receiver: "prod", }, &AlertGroup{ Alerts: []*types.Alert{inputAlerts[4]}, Labels: model.LabelSet{ - model.LabelName("alertname"): model.LabelValue("HighErrorRate"), - model.LabelName("service"): model.LabelValue("api"), - model.LabelName("cluster"): model.LabelValue("bb"), + "alertname": "HighErrorRate", + "service": "api", + "cluster": "bb", }, Receiver: "prod", }, &AlertGroup{ Alerts: []*types.Alert{inputAlerts[5]}, Labels: model.LabelSet{ - model.LabelName("alertname"): model.LabelValue("HighLatency"), - model.LabelName("service"): model.LabelValue("db"), - model.LabelName("cluster"): model.LabelValue("bb"), + "alertname": "HighLatency", + "service": "db", + "cluster": "bb", }, Receiver: "kafka", }, &AlertGroup{ Alerts: []*types.Alert{inputAlerts[5]}, Labels: model.LabelSet{ - model.LabelName("alertname"): model.LabelValue("HighLatency"), - model.LabelName("service"): model.LabelValue("db"), - model.LabelName("cluster"): model.LabelValue("bb"), + "alertname": "HighLatency", + "service": "db", + "cluster": "bb", }, Receiver: "prod", }, }, alertGroups) require.Equal(t, map[model.Fingerprint][]string{ - inputAlerts[0].Fingerprint(): []string{"prod"}, - inputAlerts[1].Fingerprint(): []string{"testing"}, - inputAlerts[2].Fingerprint(): []string{"prod"}, - inputAlerts[3].Fingerprint(): []string{"prod"}, - inputAlerts[4].Fingerprint(): []string{"prod"}, - inputAlerts[5].Fingerprint(): []string{"kafka", "prod"}, + inputAlerts[0].Fingerprint(): {"prod"}, + inputAlerts[1].Fingerprint(): {"testing"}, + inputAlerts[2].Fingerprint(): {"prod"}, + inputAlerts[3].Fingerprint(): {"prod"}, + inputAlerts[4].Fingerprint(): {"prod"}, + inputAlerts[5].Fingerprint(): {"kafka", "prod"}, }, receivers) + + require.Equal(t, 0.0, testutil.ToFloat64(m.aggrGroupLimitReached)) + + // Try to store new alert. This time, we will hit limit for number of groups. + err = alerts.Put(newAlert(model.LabelSet{"env": "prod", "alertname": "NewAlert", "cluster": "new-cluster", "service": "db"})) + if err != nil { + t.Fatal(err) + } + + // Let alert get processed. + for i := 0; testutil.ToFloat64(m.aggrGroupLimitReached) == 0 && i < 10; i++ { + time.Sleep(200 * time.Millisecond) + } + require.Equal(t, 1.0, testutil.ToFloat64(m.aggrGroupLimitReached)) + + // Verify there are still only 6 groups. + alertGroups, _ = dispatcher.Groups( + func(*Route) bool { + return true + }, func(*types.Alert, time.Time) bool { + return true + }, + ) + require.Len(t, alertGroups, 6) } type recordStage struct { @@ -534,7 +564,7 @@ func TestDispatcherRace(t *testing.T) { defer alerts.Close() timeout := func(d time.Duration) time.Duration { return time.Duration(0) } - dispatcher := NewDispatcher(alerts, nil, nil, marker, timeout, logger, NewDispatcherMetrics(prometheus.NewRegistry())) + dispatcher := NewDispatcher(alerts, nil, nil, marker, timeout, nil, logger, NewDispatcherMetrics(prometheus.NewRegistry())) go dispatcher.Run() dispatcher.Stop() } @@ -562,7 +592,7 @@ func TestDispatcherRaceOnFirstAlertNotDeliveredWhenGroupWaitIsZero(t *testing.T) timeout := func(d time.Duration) time.Duration { return d } recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)} - dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, logger, NewDispatcherMetrics(prometheus.NewRegistry())) + dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, nil, logger, NewDispatcherMetrics(prometheus.NewRegistry())) go dispatcher.Run() defer dispatcher.Stop() @@ -585,3 +615,11 @@ func TestDispatcherRaceOnFirstAlertNotDeliveredWhenGroupWaitIsZero(t *testing.T) // We expect all alerts to be notified immediately, since they all belong to different groups. require.Equal(t, numAlerts, len(recorder.Alerts())) } + +type limits struct { + groups int +} + +func (l limits) MaxNumberOfAggregationGroups() int { + return l.groups +}