diff --git a/docs/configuration/recording_rules.md b/docs/configuration/recording_rules.md index ab1f5d32c..8b12632dc 100644 --- a/docs/configuration/recording_rules.md +++ b/docs/configuration/recording_rules.md @@ -78,6 +78,10 @@ name: # How often rules in the group are evaluated. [ interval: | default = global.evaluation_interval ] +# Limit the number of alerts and series individual rules can produce. +# 0 is no limit. +[ limit: | default = 0 ] + rules: [ - ... ] ``` diff --git a/pkg/rulefmt/rulefmt.go b/pkg/rulefmt/rulefmt.go index b670032f0..62cd6de09 100644 --- a/pkg/rulefmt/rulefmt.go +++ b/pkg/rulefmt/rulefmt.go @@ -107,6 +107,7 @@ func (g *RuleGroups) Validate(node ruleGroups) (errs []error) { type RuleGroup struct { Name string `yaml:"name"` Interval model.Duration `yaml:"interval,omitempty"` + Limit int `yaml:"limit,omitempty"` Rules []RuleNode `yaml:"rules"` } diff --git a/rules/alerting.go b/rules/alerting.go index d1fb1cc75..5e7b8975c 100644 --- a/rules/alerting.go +++ b/rules/alerting.go @@ -297,7 +297,7 @@ const resolvedRetention = 15 * time.Minute // Eval evaluates the rule expression and then creates pending alerts and fires // or removes previously pending alerts accordingly. -func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, externalURL *url.URL) (promql.Vector, error) { +func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, externalURL *url.URL, limit int) (promql.Vector, error) { res, err := query(ctx, r.vector.String(), ts) if err != nil { return nil, err @@ -415,6 +415,12 @@ func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, } } + numActive := len(r.active) + if limit != 0 && numActive > limit { + r.active = map[uint64]*Alert{} + return nil, errors.Errorf("exceeded limit of %d with %d alerts", limit, numActive) + } + return vec, nil } diff --git a/rules/alerting_test.go b/rules/alerting_test.go index 1e4617551..94d505445 100644 --- a/rules/alerting_test.go +++ b/rules/alerting_test.go @@ -170,7 +170,7 @@ func TestAlertingRuleLabelsUpdate(t *testing.T) { t.Logf("case %d", i) evalTime := baseTime.Add(time.Duration(i) * time.Minute) result[0].Point.T = timestamp.FromTime(evalTime) - res, err := rule.Eval(suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil) + res, err := rule.Eval(suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, 0) require.NoError(t, err) var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples. @@ -252,7 +252,7 @@ func TestAlertingRuleExternalLabelsInTemplate(t *testing.T) { var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples. res, err := ruleWithoutExternalLabels.Eval( - suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, + suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, 0, ) require.NoError(t, err) for _, smpl := range res { @@ -266,7 +266,7 @@ func TestAlertingRuleExternalLabelsInTemplate(t *testing.T) { } res, err = ruleWithExternalLabels.Eval( - suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, + suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, 0, ) require.NoError(t, err) for _, smpl := range res { @@ -346,7 +346,7 @@ func TestAlertingRuleExternalURLInTemplate(t *testing.T) { var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples. res, err := ruleWithoutExternalURL.Eval( - suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, + suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, 0, ) require.NoError(t, err) for _, smpl := range res { @@ -360,7 +360,7 @@ func TestAlertingRuleExternalURLInTemplate(t *testing.T) { } res, err = ruleWithExternalURL.Eval( - suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, + suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, 0, ) require.NoError(t, err) for _, smpl := range res { @@ -417,7 +417,7 @@ func TestAlertingRuleEmptyLabelFromTemplate(t *testing.T) { var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples. res, err := rule.Eval( - suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, + suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, 0, ) require.NoError(t, err) for _, smpl := range res { @@ -460,7 +460,61 @@ func TestAlertingRuleDuplicate(t *testing.T) { "", true, log.NewNopLogger(), ) - _, err := rule.Eval(ctx, now, EngineQueryFunc(engine, storage), nil) + _, err := rule.Eval(ctx, now, EngineQueryFunc(engine, storage), nil, 0) require.Error(t, err) require.EqualError(t, err, "vector contains metrics with the same labelset after applying alert labels") } + +func TestAlertingRuleLimit(t *testing.T) { + storage := teststorage.New(t) + defer storage.Close() + + opts := promql.EngineOpts{ + Logger: nil, + Reg: nil, + MaxSamples: 10, + Timeout: 10 * time.Second, + } + + engine := promql.NewEngine(opts) + ctx, cancelCtx := context.WithCancel(context.Background()) + defer cancelCtx() + + now := time.Now() + + suite := []struct { + limit int + err string + }{ + { + limit: 0, + }, + { + limit: 1, + }, + { + limit: -1, + err: "exceeded limit of -1 with 1 alerts", + }, + } + + for _, test := range suite { + expr, _ := parser.ParseExpr(`1`) + rule := NewAlertingRule( + "foo", + expr, + time.Minute, + labels.FromStrings("test", "test"), + nil, + nil, + "", + true, log.NewNopLogger(), + ) + _, err := rule.Eval(ctx, now, EngineQueryFunc(engine, storage), nil, test.limit) + if test.err == "" { + require.NoError(t, err) + } else { + require.Equal(t, test.err, err.Error()) + } + } +} diff --git a/rules/manager.go b/rules/manager.go index 63186d1d8..fa8cd6763 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -213,7 +213,7 @@ type Rule interface { // Labels of the rule. Labels() labels.Labels // eval evaluates the rule, including any associated recording or alerting actions. - Eval(context.Context, time.Time, QueryFunc, *url.URL) (promql.Vector, error) + Eval(context.Context, time.Time, QueryFunc, *url.URL, int) (promql.Vector, error) // String returns a human-readable string representation of the rule. String() string // Query returns the rule query expression. @@ -244,6 +244,7 @@ type Group struct { name string file string interval time.Duration + limit int rules []Rule seriesInPreviousEval []map[string]labels.Labels // One per Rule. staleSeries []labels.Labels @@ -267,6 +268,7 @@ type Group struct { type GroupOptions struct { Name, File string Interval time.Duration + Limit int Rules []Rule ShouldRestore bool Opts *ManagerOptions @@ -295,6 +297,7 @@ func NewGroup(o GroupOptions) *Group { name: o.Name, file: o.File, interval: o.Interval, + limit: o.Limit, rules: o.Rules, shouldRestore: o.ShouldRestore, opts: o.Opts, @@ -319,6 +322,9 @@ func (g *Group) Rules() []Rule { return g.rules } // Interval returns the group's interval. func (g *Group) Interval() time.Duration { return g.interval } +// Limit returns the group's limit. +func (g *Group) Limit() int { return g.limit } + func (g *Group) run(ctx context.Context) { defer close(g.terminated) @@ -591,7 +597,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { g.metrics.EvalTotal.WithLabelValues(GroupKey(g.File(), g.Name())).Inc() - vector, err := rule.Eval(ctx, ts, g.opts.QueryFunc, g.opts.ExternalURL) + vector, err := rule.Eval(ctx, ts, g.opts.QueryFunc, g.opts.ExternalURL, g.Limit()) if err != nil { rule.SetHealth(HealthBad) rule.SetLastError(err) @@ -850,6 +856,10 @@ func (g *Group) Equals(ng *Group) bool { return false } + if g.limit != ng.limit { + return false + } + if len(g.rules) != len(ng.rules) { return false } @@ -1086,6 +1096,7 @@ func (m *Manager) LoadGroups( Name: rg.Name, File: fn, Interval: itv, + Limit: rg.Limit, Rules: rules, ShouldRestore: shouldRestore, Opts: m.opts, diff --git a/rules/manager_test.go b/rules/manager_test.go index 8f42228df..f3b335070 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -156,7 +156,7 @@ func TestAlertingRule(t *testing.T) { evalTime := baseTime.Add(test.time) - res, err := rule.Eval(suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil) + res, err := rule.Eval(suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, 0) require.NoError(t, err) var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples. @@ -305,7 +305,7 @@ func TestForStateAddSamples(t *testing.T) { forState = float64(value.StaleNaN) } - res, err := rule.Eval(suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil) + res, err := rule.Eval(suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil, 0) require.NoError(t, err) var filteredRes promql.Vector // After removing 'ALERTS' samples. @@ -773,6 +773,12 @@ func TestUpdate(t *testing.T) { } reloadAndValidate(rgs, t, tmpFile, ruleManager, expected, ogs) + // Update limit and reload. + for i := range rgs.Groups { + rgs.Groups[i].Limit = 1 + } + reloadAndValidate(rgs, t, tmpFile, ruleManager, expected, ogs) + // Change group rules and reload. for i, g := range rgs.Groups { for j, r := range g.Rules { @@ -791,6 +797,7 @@ type ruleGroupsTest struct { type ruleGroupTest struct { Name string `yaml:"name"` Interval model.Duration `yaml:"interval,omitempty"` + Limit int `yaml:"limit,omitempty"` Rules []rulefmt.Rule `yaml:"rules"` } @@ -812,6 +819,7 @@ func formatRules(r *rulefmt.RuleGroups) ruleGroupsTest { tmp = append(tmp, ruleGroupTest{ Name: g.Name, Interval: g.Interval, + Limit: g.Limit, Rules: rtmp, }) } diff --git a/rules/recording.go b/rules/recording.go index 1ffe98cbd..08a7e37ca 100644 --- a/rules/recording.go +++ b/rules/recording.go @@ -73,7 +73,7 @@ func (rule *RecordingRule) Labels() labels.Labels { } // Eval evaluates the rule and then overrides the metric names and labels accordingly. -func (rule *RecordingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, _ *url.URL) (promql.Vector, error) { +func (rule *RecordingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, _ *url.URL, limit int) (promql.Vector, error) { vector, err := query(ctx, rule.vector.String(), ts) if err != nil { return nil, err @@ -99,6 +99,13 @@ func (rule *RecordingRule) Eval(ctx context.Context, ts time.Time, query QueryFu return nil, fmt.Errorf("vector contains metrics with the same labelset after applying rule labels") } + numSamples := len(vector) + if limit != 0 && numSamples > limit { + return nil, fmt.Errorf("exceeded limit %d with %d samples", limit, numSamples) + } + + rule.SetHealth(HealthGood) + rule.SetLastError(err) return vector, nil } diff --git a/rules/recording_test.go b/rules/recording_test.go index 5ace5ba77..211b6cda6 100644 --- a/rules/recording_test.go +++ b/rules/recording_test.go @@ -49,7 +49,9 @@ func TestRuleEval(t *testing.T) { name string expr parser.Expr labels labels.Labels + limit int result promql.Vector + err string }{ { name: "nolabels", @@ -69,12 +71,43 @@ func TestRuleEval(t *testing.T) { Point: promql.Point{V: 1, T: timestamp.FromTime(now)}, }}, }, + { + name: "underlimit", + expr: &parser.NumberLiteral{Val: 1}, + labels: labels.FromStrings("foo", "bar"), + limit: 2, + result: promql.Vector{promql.Sample{ + Metric: labels.FromStrings("__name__", "underlimit", "foo", "bar"), + Point: promql.Point{V: 1, T: timestamp.FromTime(now)}, + }}, + }, + { + name: "atlimit", + expr: &parser.NumberLiteral{Val: 1}, + labels: labels.FromStrings("foo", "bar"), + limit: 1, + result: promql.Vector{promql.Sample{ + Metric: labels.FromStrings("__name__", "atlimit", "foo", "bar"), + Point: promql.Point{V: 1, T: timestamp.FromTime(now)}, + }}, + }, + { + name: "overlimit", + expr: &parser.NumberLiteral{Val: 1}, + labels: labels.FromStrings("foo", "bar"), + limit: -1, + err: "exceeded limit -1 with 1 samples", + }, } for _, test := range suite { rule := NewRecordingRule(test.name, test.expr, test.labels) - result, err := rule.Eval(ctx, now, EngineQueryFunc(engine, storage), nil) - require.NoError(t, err) + result, err := rule.Eval(ctx, now, EngineQueryFunc(engine, storage), nil, test.limit) + if test.err == "" { + require.NoError(t, err) + } else { + require.Equal(t, test.err, err.Error()) + } require.Equal(t, test.result, result) } } @@ -114,7 +147,7 @@ func TestRuleEvalDuplicate(t *testing.T) { expr, _ := parser.ParseExpr(`vector(0) or label_replace(vector(0),"test","x","","")`) rule := NewRecordingRule("foo", expr, labels.FromStrings("test", "test")) - _, err := rule.Eval(ctx, now, EngineQueryFunc(engine, storage), nil) + _, err := rule.Eval(ctx, now, EngineQueryFunc(engine, storage), nil, 0) require.Error(t, err) require.EqualError(t, err, "vector contains metrics with the same labelset after applying rule labels") }