Added group limit to dispatcher.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
Peter Štibraný 2021-05-05 17:26:37 +02:00 committed by Peter Štibraný
parent f686ff3be2
commit 390474ffbe
3 changed files with 114 additions and 40 deletions


@@ -460,7 +460,7 @@ func run() int {
 		silencer.Mutes(labels)
 	})
 
-	disp = dispatch.NewDispatcher(alerts, routes, pipeline, marker, timeoutFunc, logger, dispMetrics)
+	disp = dispatch.NewDispatcher(alerts, routes, pipeline, marker, timeoutFunc, nil, logger, dispMetrics)
 	routes.Walk(func(r *dispatch.Route) {
 		if r.RouteOpts.RepeatInterval > *retention {
 			level.Warn(configLogger).Log(


@@ -33,8 +33,9 @@ import (
 // DispatcherMetrics represents metrics associated to a dispatcher.
 type DispatcherMetrics struct {
 	aggrGroups            prometheus.Gauge
 	processingDuration    prometheus.Summary
+	aggrGroupLimitReached prometheus.Counter
 }
 
 // NewDispatcherMetrics returns a new registered DispatchMetrics.
@@ -52,10 +53,16 @@ func NewDispatcherMetrics(r prometheus.Registerer) *DispatcherMetrics {
 				Help: "Summary of latencies for the processing of alerts.",
 			},
 		),
+		aggrGroupLimitReached: prometheus.NewCounter(
+			prometheus.CounterOpts{
+				Name: "alertmanager_dispatcher_aggregation_group_limit_reached_total",
+				Help: "Number of times when dispatcher failed to create new aggregation group due to limit.",
+			},
+		),
 	}
 
 	if r != nil {
-		r.MustRegister(m.aggrGroups, m.processingDuration)
+		r.MustRegister(m.aggrGroups, m.processingDuration, m.aggrGroupLimitReached)
 	}
 
 	return &m
@@ -68,12 +75,14 @@ type Dispatcher struct {
 	alerts  provider.Alerts
 	stage   notify.Stage
 	metrics *DispatcherMetrics
+	limits  Limits
 	marker  types.Marker
 	timeout func(time.Duration) time.Duration
 
-	aggrGroups map[*Route]map[model.Fingerprint]*aggrGroup
-	mtx        sync.RWMutex
+	mtx                sync.RWMutex
+	aggrGroupsPerRoute map[*Route]map[model.Fingerprint]*aggrGroup
+	aggrGroupsNum      int
 
 	done chan struct{}
 	ctx  context.Context
@@ -82,6 +91,14 @@ type Dispatcher struct {
 	logger log.Logger
 }
 
+// Limits describes limits used by Dispatcher.
+type Limits interface {
+	// MaxNumberOfAggregationGroups returns max number of aggregation groups that dispatcher can have.
+	// 0 or negative value = unlimited.
+	// If dispatcher hits this limit, it will not create additional groups, but will log an error instead.
+	MaxNumberOfAggregationGroups() int
+}
+
 // NewDispatcher returns a new Dispatcher.
 func NewDispatcher(
 	ap provider.Alerts,
@@ -89,9 +106,14 @@ func NewDispatcher(
 	s notify.Stage,
 	mk types.Marker,
 	to func(time.Duration) time.Duration,
+	lim Limits,
 	l log.Logger,
 	m *DispatcherMetrics,
 ) *Dispatcher {
+	if lim == nil {
+		lim = nilLimits{}
+	}
+
 	disp := &Dispatcher{
 		alerts: ap,
 		stage:  s,
@@ -100,6 +122,7 @@
 		timeout: to,
 		logger:  log.With(l, "component", "dispatcher"),
 		metrics: m,
+		limits:  lim,
 	}
 	return disp
 }
@@ -109,7 +132,8 @@ func (d *Dispatcher) Run() {
 	d.done = make(chan struct{})
 
 	d.mtx.Lock()
-	d.aggrGroups = map[*Route]map[model.Fingerprint]*aggrGroup{}
+	d.aggrGroupsPerRoute = map[*Route]map[model.Fingerprint]*aggrGroup{}
+	d.aggrGroupsNum = 0
 	d.metrics.aggrGroups.Set(0)
 	d.ctx, d.cancel = context.WithCancel(context.Background())
 	d.mtx.Unlock()
@@ -152,11 +176,12 @@ func (d *Dispatcher) run(it provider.AlertIterator) {
 		case <-cleanup.C:
 			d.mtx.Lock()
 
-			for _, groups := range d.aggrGroups {
+			for _, groups := range d.aggrGroupsPerRoute {
 				for _, ag := range groups {
 					if ag.empty() {
 						ag.stop()
 						delete(groups, ag.fingerprint())
+						d.aggrGroupsNum--
 						d.metrics.aggrGroups.Dec()
 					}
 				}
@@ -201,7 +226,7 @@ func (d *Dispatcher) Groups(routeFilter func(*Route) bool, alertFilter func(*typ
 	receivers := map[model.Fingerprint][]string{}
 
 	now := time.Now()
-	for route, ags := range d.aggrGroups {
+	for route, ags := range d.aggrGroupsPerRoute {
 		if !routeFilter(route) {
 			continue
 		}
@@ -284,21 +309,28 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
 	d.mtx.Lock()
 	defer d.mtx.Unlock()
 
-	group, ok := d.aggrGroups[route]
+	routeGroups, ok := d.aggrGroupsPerRoute[route]
 	if !ok {
-		group = map[model.Fingerprint]*aggrGroup{}
-		d.aggrGroups[route] = group
+		routeGroups = map[model.Fingerprint]*aggrGroup{}
+		d.aggrGroupsPerRoute[route] = routeGroups
 	}
 
-	ag, ok := group[fp]
+	ag, ok := routeGroups[fp]
 	if ok {
 		ag.insert(alert)
 		return
 	}
 
-	// If the group does not exist, create it.
+	// If the group does not exist, create it. But check the limit first.
+	if limit := d.limits.MaxNumberOfAggregationGroups(); limit > 0 && d.aggrGroupsNum >= limit {
+		d.metrics.aggrGroupLimitReached.Inc()
+		level.Error(d.logger).Log("msg", "Too many aggregation groups, cannot create new group for alert", "groups", d.aggrGroupsNum, "limit", limit, "alert", alert.Name())
+		return
+	}
+
 	ag = newAggrGroup(d.ctx, groupLabels, route, d.timeout, d.logger)
-	group[fp] = ag
+	routeGroups[fp] = ag
+	d.aggrGroupsNum++
 	d.metrics.aggrGroups.Inc()
 
 	// Insert the 1st alert in the group before starting the group's run()
@@ -499,3 +531,7 @@ func (ag *aggrGroup) flush(notify func(...*types.Alert) bool) {
 		}
 	}
 }
+
+type nilLimits struct{}
+
+func (n nilLimits) MaxNumberOfAggregationGroups() int { return 0 }
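
The Limits interface above is the only extension point this commit adds: NewDispatcher now accepts a Limits value and substitutes nilLimits (unlimited) when nil is passed, which keeps existing callers working. Below is a minimal caller-side sketch of plugging in a fixed cap; the fixedLimits type, the newLimitedDispatcher helper and the cap of 100 are illustrative assumptions, not code from this commit.

// Sketch only: fixedLimits, newLimitedDispatcher and the cap of 100 are
// hypothetical; Alertmanager's own main.go (above) simply passes nil.
package dispatchexample

import (
	"time"

	"github.com/go-kit/kit/log"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/prometheus/alertmanager/dispatch"
	"github.com/prometheus/alertmanager/notify"
	"github.com/prometheus/alertmanager/provider"
	"github.com/prometheus/alertmanager/types"
)

// fixedLimits satisfies dispatch.Limits by returning a constant cap.
// Returning 0 or a negative number means "no limit".
type fixedLimits struct {
	maxGroups int
}

func (l fixedLimits) MaxNumberOfAggregationGroups() int { return l.maxGroups }

// newLimitedDispatcher wires the limit into NewDispatcher. Once 100 aggregation
// groups exist, new groups are refused: the alert is not assigned to a group,
// an error is logged, and the counter
// alertmanager_dispatcher_aggregation_group_limit_reached_total is incremented.
func newLimitedDispatcher(
	alerts provider.Alerts,
	route *dispatch.Route,
	stage notify.Stage,
	marker types.Marker,
	logger log.Logger,
	reg *prometheus.Registry,
) *dispatch.Dispatcher {
	timeout := func(d time.Duration) time.Duration { return d }
	metrics := dispatch.NewDispatcherMetrics(reg)
	return dispatch.NewDispatcher(alerts, route, stage, marker, timeout, fixedLimits{maxGroups: 100}, logger, metrics)
}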


@@ -24,6 +24,7 @@ import (
 	"github.com/go-kit/kit/log"
 	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/testutil"
 	"github.com/prometheus/common/model"
 	"github.com/stretchr/testify/require"
@@ -373,7 +374,9 @@ route:
 	timeout := func(d time.Duration) time.Duration { return time.Duration(0) }
 	recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)}
-	dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, logger, NewDispatcherMetrics(prometheus.NewRegistry()))
+	lim := limits{groups: 6}
+	m := NewDispatcherMetrics(prometheus.NewRegistry())
+	dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, lim, logger, m)
 	go dispatcher.Run()
 	defer dispatcher.Stop()
@@ -391,7 +394,10 @@ route:
 		// Matches the second and third sub-route.
 		newAlert(model.LabelSet{"env": "prod", "alertname": "HighLatency", "cluster": "bb", "service": "db", "kafka": "yes", "instance": "inst3"}),
 	}
-	alerts.Put(inputAlerts...)
+	err = alerts.Put(inputAlerts...)
+	if err != nil {
+		t.Fatal(err)
+	}
 
 	// Let alerts get processed.
 	for i := 0; len(recorder.Alerts()) != 7 && i < 10; i++ {
@@ -411,63 +417,87 @@ route:
 		&AlertGroup{
 			Alerts:   []*types.Alert{inputAlerts[0]},
 			Labels: model.LabelSet{
-				model.LabelName("alertname"): model.LabelValue("OtherAlert"),
+				"alertname": "OtherAlert",
 			},
 			Receiver: "prod",
 		},
 		&AlertGroup{
 			Alerts:   []*types.Alert{inputAlerts[1]},
 			Labels: model.LabelSet{
-				model.LabelName("alertname"): model.LabelValue("TestingAlert"),
-				model.LabelName("service"):   model.LabelValue("api"),
+				"alertname": "TestingAlert",
+				"service":   "api",
 			},
 			Receiver: "testing",
 		},
 		&AlertGroup{
 			Alerts:   []*types.Alert{inputAlerts[2], inputAlerts[3]},
 			Labels: model.LabelSet{
-				model.LabelName("alertname"): model.LabelValue("HighErrorRate"),
-				model.LabelName("service"):   model.LabelValue("api"),
-				model.LabelName("cluster"):   model.LabelValue("aa"),
+				"alertname": "HighErrorRate",
+				"service":   "api",
+				"cluster":   "aa",
 			},
 			Receiver: "prod",
 		},
 		&AlertGroup{
 			Alerts:   []*types.Alert{inputAlerts[4]},
 			Labels: model.LabelSet{
-				model.LabelName("alertname"): model.LabelValue("HighErrorRate"),
-				model.LabelName("service"):   model.LabelValue("api"),
-				model.LabelName("cluster"):   model.LabelValue("bb"),
+				"alertname": "HighErrorRate",
+				"service":   "api",
+				"cluster":   "bb",
 			},
 			Receiver: "prod",
 		},
 		&AlertGroup{
 			Alerts:   []*types.Alert{inputAlerts[5]},
 			Labels: model.LabelSet{
-				model.LabelName("alertname"): model.LabelValue("HighLatency"),
-				model.LabelName("service"):   model.LabelValue("db"),
-				model.LabelName("cluster"):   model.LabelValue("bb"),
+				"alertname": "HighLatency",
+				"service":   "db",
+				"cluster":   "bb",
 			},
 			Receiver: "kafka",
 		},
 		&AlertGroup{
 			Alerts:   []*types.Alert{inputAlerts[5]},
 			Labels: model.LabelSet{
-				model.LabelName("alertname"): model.LabelValue("HighLatency"),
-				model.LabelName("service"):   model.LabelValue("db"),
-				model.LabelName("cluster"):   model.LabelValue("bb"),
+				"alertname": "HighLatency",
+				"service":   "db",
+				"cluster":   "bb",
 			},
 			Receiver: "prod",
 		},
 	}, alertGroups)
 	require.Equal(t, map[model.Fingerprint][]string{
-		inputAlerts[0].Fingerprint(): []string{"prod"},
-		inputAlerts[1].Fingerprint(): []string{"testing"},
-		inputAlerts[2].Fingerprint(): []string{"prod"},
-		inputAlerts[3].Fingerprint(): []string{"prod"},
-		inputAlerts[4].Fingerprint(): []string{"prod"},
-		inputAlerts[5].Fingerprint(): []string{"kafka", "prod"},
+		inputAlerts[0].Fingerprint(): {"prod"},
+		inputAlerts[1].Fingerprint(): {"testing"},
+		inputAlerts[2].Fingerprint(): {"prod"},
+		inputAlerts[3].Fingerprint(): {"prod"},
+		inputAlerts[4].Fingerprint(): {"prod"},
+		inputAlerts[5].Fingerprint(): {"kafka", "prod"},
 	}, receivers)
+
+	require.Equal(t, 0.0, testutil.ToFloat64(m.aggrGroupLimitReached))
+
+	// Try to store new alert. This time, we will hit limit for number of groups.
+	err = alerts.Put(newAlert(model.LabelSet{"env": "prod", "alertname": "NewAlert", "cluster": "new-cluster", "service": "db"}))
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Let alert get processed.
+	for i := 0; testutil.ToFloat64(m.aggrGroupLimitReached) == 0 && i < 10; i++ {
+		time.Sleep(200 * time.Millisecond)
+	}
+	require.Equal(t, 1.0, testutil.ToFloat64(m.aggrGroupLimitReached))
+
+	// Verify there are still only 6 groups.
+	alertGroups, _ = dispatcher.Groups(
+		func(*Route) bool {
+			return true
+		}, func(*types.Alert, time.Time) bool {
+			return true
+		},
+	)
+	require.Len(t, alertGroups, 6)
 }
 
 type recordStage struct {
@@ -534,7 +564,7 @@ func TestDispatcherRace(t *testing.T) {
 	defer alerts.Close()
 
 	timeout := func(d time.Duration) time.Duration { return time.Duration(0) }
-	dispatcher := NewDispatcher(alerts, nil, nil, marker, timeout, logger, NewDispatcherMetrics(prometheus.NewRegistry()))
+	dispatcher := NewDispatcher(alerts, nil, nil, marker, timeout, nil, logger, NewDispatcherMetrics(prometheus.NewRegistry()))
 	go dispatcher.Run()
 	dispatcher.Stop()
 }
@@ -562,7 +592,7 @@ func TestDispatcherRaceOnFirstAlertNotDeliveredWhenGroupWaitIsZero(t *testing.T)
 	timeout := func(d time.Duration) time.Duration { return d }
 	recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)}
-	dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, logger, NewDispatcherMetrics(prometheus.NewRegistry()))
+	dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, nil, logger, NewDispatcherMetrics(prometheus.NewRegistry()))
 	go dispatcher.Run()
 	defer dispatcher.Stop()
@@ -585,3 +615,11 @@ func TestDispatcherRaceOnFirstAlertNotDeliveredWhenGroupWaitIsZero(t *testing.T)
 	// We expect all alerts to be notified immediately, since they all belong to different groups.
 	require.Equal(t, numAlerts, len(recorder.Alerts()))
 }
+
+type limits struct {
+	groups int
+}
+
+func (l limits) MaxNumberOfAggregationGroups() int {
+	return l.groups
+}