Merge pull request #13980 from prometheus/gotjosh/restore-only-with-rule-query

Rule Manager: Only query once per alert rule when restoring alert state
gotjosh 2024-04-30 15:29:21 +01:00 committed by GitHub
commit 1dd0bff4f1
5 changed files with 165 additions and 135 deletions
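
In short, restoring the 'for' state no longer issues one ALERTS_FOR_STATE query per active alert. QueryForStateSeries now returns a SeriesSet covering the whole rule, and RestoreForState indexes that set by label set and resolves each alert with a map lookup. A rough sketch of that flow, assuming hypothetical plumbing around the real calls (restoreRule and its arguments are illustrative, not the actual manager code):

```go
package restoresketch

import (
	"context"

	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/storage"
)

// restoreRule mirrors the shape of Group.RestoreForState after this change:
// one Select per rule, the returned series indexed by label set, then a map
// lookup per active alert instead of a query per alert.
func restoreRule(ctx context.Context, q storage.Querier, matchers []*labels.Matcher, activeAlertLabels []labels.Labels) error {
	// Before this PR: one Select per active alert (N queries per rule).
	// Now: a single Select covering every ALERTS_FOR_STATE series of the rule.
	sset := q.Select(ctx, false, nil, matchers...)

	// Index series by their label sets minus __name__, so they can be matched
	// against alert labels (which carry no metric name).
	seriesByLabels := map[string]storage.Series{}
	for sset.Next() {
		seriesByLabels[sset.At().Labels().DropMetricName().String()] = sset.At()
	}
	if err := sset.Err(); err != nil {
		return err
	}

	// Each active alert restores its 'for' state with a cheap map lookup.
	for _, lbls := range activeAlertLabels {
		if s, ok := seriesByLabels[lbls.String()]; ok {
			_ = s // read the latest sample and restore ActiveAt, as RestoreForState does
		}
	}
	return nil
}
```

In the real code the map is pre-sized with ActiveAlertsCount() purely as a capacity hint; as the comment in the diff below notes, the number of active alerts is only an approximation of the number of series returned.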

View File

@@ -3,6 +3,7 @@
## unreleased
* [CHANGE] TSDB: Fix the predicate checking for blocks which are beyond the retention period to include the ones right at the retention boundary. #9633
* [CHANGE] Rules: Execute 1 query instead of N (where N is the number of alerts within alert rule) when restoring alerts. #13980
* [ENHANCEMENT] Rules: Add `rule_group_last_restore_duration_seconds` to measure the time it takes to restore a rule group. #13974
* [ENHANCEMENT] OTLP: Improve remote write format translation performance by using label set hashes for metric identifiers instead of string based ones. #14006 #13991

View File

@@ -246,13 +246,16 @@ func (r *AlertingRule) sample(alert *Alert, ts time.Time) promql.Sample {
return s
}
// forStateSample returns the sample for ALERTS_FOR_STATE.
// forStateSample returns a promql.Sample with the rule labels, `ALERTS_FOR_STATE` as the metric name and the rule name as the `alertname` label.
// Optionally, if an alert is provided it'll copy the labels of the alert into the sample labels.
func (r *AlertingRule) forStateSample(alert *Alert, ts time.Time, v float64) promql.Sample {
lb := labels.NewBuilder(r.labels)
if alert != nil {
alert.Labels.Range(func(l labels.Label) {
lb.Set(l.Name, l.Value)
})
}
lb.Set(labels.MetricName, alertForStateMetricName)
lb.Set(labels.AlertName, r.name)
@@ -265,9 +268,11 @@ func (r *AlertingRule) forStateSample(alert *Alert, ts time.Time, v float64) pro
return s
}
// QueryforStateSeries returns the series for ALERTS_FOR_STATE.
func (r *AlertingRule) QueryforStateSeries(ctx context.Context, alert *Alert, q storage.Querier) (storage.Series, error) {
smpl := r.forStateSample(alert, time.Now(), 0)
// QueryForStateSeries returns the series for ALERTS_FOR_STATE of the alert rule.
func (r *AlertingRule) QueryForStateSeries(ctx context.Context, q storage.Querier) (storage.SeriesSet, error) {
// We use a sample to ease the building of matchers.
// Don't provide an alert as we want matchers that match all series for the alert rule.
smpl := r.forStateSample(nil, time.Now(), 0)
var matchers []*labels.Matcher
smpl.Metric.Range(func(l labels.Label) {
mt, err := labels.NewMatcher(labels.MatchEqual, l.Name, l.Value)
@@ -276,20 +281,9 @@ func (r *AlertingRule) QueryforStateSeries(ctx context.Context, alert *Alert, q
}
matchers = append(matchers, mt)
})
sset := q.Select(ctx, false, nil, matchers...)
return sset, sset.Err()
var s storage.Series
for sset.Next() {
// Query assures that smpl.Metric is included in sset.At().Labels(),
// hence just checking the length would act like equality.
// (This is faster than calling labels.Compare again as we already have some info).
if sset.At().Labels().Len() == len(matchers) {
s = sset.At()
break
}
}
return s, sset.Err()
}
// SetEvaluationDuration updates evaluationDuration to the duration it took to evaluate the rule on its last evaluation. // SetEvaluationDuration updates evaluationDuration to the duration it took to evaluate the rule on its last evaluation.
@@ -557,6 +551,13 @@ func (r *AlertingRule) ForEachActiveAlert(f func(*Alert)) {
}
}
func (r *AlertingRule) ActiveAlertsCount() int {
r.activeMtx.Lock()
defer r.activeMtx.Unlock()
return len(r.active)
}
func (r *AlertingRule) sendAlerts(ctx context.Context, ts time.Time, resendDelay, interval time.Duration, notifyFunc NotifyFunc) {
alerts := []*Alert{}
r.ForEachActiveAlert(func(alert *Alert) {
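
For context on the signature change above: with no alert passed in, the matchers derived from forStateSample(nil, ...) carry only the rule-level identity (the ALERTS_FOR_STATE metric name, the alertname label and the rule's own labels), so a single Select returns the series for every alert of the rule. A hand-written equivalent of that matcher set, with an illustrative rule name and rule label:

```go
package restoresketch

import (
	"context"

	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/storage"
)

// selectForStateSeries spells out the matchers the updated QueryForStateSeries
// derives from forStateSample(nil, ...): the ALERTS_FOR_STATE metric name, the
// alertname label and the rule's own labels, with no per-alert labels. The rule
// name "HTTPRequestRateLow" and the severity="critical" label are illustrative.
func selectForStateSeries(ctx context.Context, q storage.Querier) storage.SeriesSet {
	matchers := []*labels.Matcher{
		labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "ALERTS_FOR_STATE"),
		labels.MustNewMatcher(labels.MatchEqual, labels.AlertName, "HTTPRequestRateLow"),
		labels.MustNewMatcher(labels.MatchEqual, "severity", "critical"),
	}
	return q.Select(ctx, false, nil, matchers...)
}
```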

View File

@@ -710,19 +710,17 @@ func TestQueryForStateSeries(t *testing.T) {
labels.EmptyLabels(), labels.EmptyLabels(), "", true, nil,
)
alert := &Alert{
State: 0,
Labels: labels.EmptyLabels(),
Annotations: labels.EmptyLabels(),
Value: 0,
ActiveAt: time.Time{},
FiredAt: time.Time{},
ResolvedAt: time.Time{},
LastSentAt: time.Time{},
ValidUntil: time.Time{},
}
series, err := rule.QueryforStateSeries(context.Background(), alert, querier)
sample := rule.forStateSample(nil, time.Time{}, 0)
seriesSet, err := rule.QueryForStateSeries(context.Background(), querier)
var series storage.Series
for seriesSet.Next() {
if seriesSet.At().Labels().Len() == sample.Metric.Len() {
series = seriesSet.At()
break
}
}
require.Equal(t, tst.expectedSeries, series)
require.Equal(t, tst.expectedError, err)
@@ -1025,3 +1023,24 @@ func TestAlertingRule_SetNoDependencyRules(t *testing.T) {
rule.SetNoDependencyRules(true)
require.True(t, rule.NoDependencyRules())
}
func TestAlertingRule_ActiveAlertsCount(t *testing.T) {
rule := NewAlertingRule(
"TestRule",
nil,
time.Minute,
0,
labels.FromStrings("severity", "critical"),
labels.EmptyLabels(), labels.EmptyLabels(), "", true, nil,
)
require.Equal(t, 0, rule.ActiveAlertsCount())
// Set an active alert.
lbls := labels.FromStrings("a1", "1")
h := lbls.Hash()
al := &Alert{State: StateFiring, Labels: lbls, ActiveAt: time.Now()}
rule.active[h] = al
require.Equal(t, 1, rule.ActiveAlertsCount())
}

View File

@@ -664,25 +664,36 @@ func (g *Group) RestoreForState(ts time.Time) {
continue
}
alertRule.ForEachActiveAlert(func(a *Alert) {
var s storage.Series
s, err := alertRule.QueryforStateSeries(g.opts.Context, a, q)
sset, err := alertRule.QueryForStateSeries(g.opts.Context, q)
if err != nil {
// Querier Warnings are ignored. We do not care unless we have an error.
level.Error(g.logger).Log(
"msg", "Failed to restore 'for' state",
labels.AlertName, alertRule.Name(),
"stage", "Select",
"err", err,
)
return
continue
}
if s == nil {
return
// While not technically the same number of series we expect, it's as good of an approximation as any.
seriesByLabels := make(map[string]storage.Series, alertRule.ActiveAlertsCount())
for sset.Next() {
seriesByLabels[sset.At().Labels().DropMetricName().String()] = sset.At()
}
// No results for this alert rule.
if len(seriesByLabels) == 0 {
level.Debug(g.logger).Log("msg", "Failed to find a series to restore the 'for' state", labels.AlertName, alertRule.Name())
continue
}
alertRule.ForEachActiveAlert(func(a *Alert) {
var s storage.Series
s, ok := seriesByLabels[a.Labels.String()]
if !ok {
return
}
// Series found for the 'for' state.
var t int64
var v float64
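
One detail worth noting in the restore loop above: stored ALERTS_FOR_STATE series carry a __name__ label while an active alert's label set does not, which is why the series labels go through DropMetricName() before being used as map keys. A tiny check of that keying assumption, with illustrative label values:

```go
package restoresketch

import "github.com/prometheus/prometheus/model/labels"

// keysMatch illustrates the map-key convention the restore loop relies on: a
// stored ALERTS_FOR_STATE series differs from the alert's label set only by
// the __name__ label, so dropping it yields identical String() keys.
func keysMatch() bool {
	series := labels.FromStrings(
		"__name__", "ALERTS_FOR_STATE",
		"alertname", "HTTPRequestRateLow",
		"instance", "localhost:9090",
		"severity", "critical",
	)
	alert := labels.FromStrings(
		"alertname", "HTTPRequestRateLow",
		"instance", "localhost:9090",
		"severity", "critical",
	)
	return series.DropMetricName().String() == alert.String()
}
```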

View File

@@ -397,46 +397,58 @@ func TestForStateRestore(t *testing.T) {
group.Eval(context.TODO(), evalTime)
}
exp := rule.ActiveAlerts()
for _, aa := range exp {
require.Zero(t, aa.Labels.Get(model.MetricNameLabel), "%s label set on active alert: %s", model.MetricNameLabel, aa.Labels)
}
sort.Slice(exp, func(i, j int) bool {
return labels.Compare(exp[i].Labels, exp[j].Labels) < 0
})
// Prometheus goes down here. We create new rules and groups.
type testInput struct {
name string
restoreDuration time.Duration
alerts []*Alert
expectedAlerts []*Alert
num int
noRestore bool
gracePeriod bool
downDuration time.Duration
before func()
}
tests := []testInput{
{
// Normal restore (alerts were not firing).
name: "normal restore (alerts were not firing)",
restoreDuration: 15 * time.Minute,
alerts: rule.ActiveAlerts(),
expectedAlerts: rule.ActiveAlerts(),
downDuration: 10 * time.Minute,
},
{
// Testing Outage Tolerance.
name: "outage tolerance",
restoreDuration: 40 * time.Minute,
noRestore: true,
num: 2,
},
{
// No active alerts.
name: "no active alerts",
restoreDuration: 50 * time.Minute,
alerts: []*Alert{},
expectedAlerts: []*Alert{},
},
{
name: "test the grace period",
restoreDuration: 25 * time.Minute,
expectedAlerts: []*Alert{},
gracePeriod: true,
before: func() {
for _, duration := range []time.Duration{10 * time.Minute, 15 * time.Minute, 20 * time.Minute} {
evalTime := baseTime.Add(duration)
group.Eval(context.TODO(), evalTime)
}
},
num: 2,
}, },
}
testFunc := func(tst testInput) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.before != nil {
tt.before()
}
newRule := NewAlertingRule(
"HTTPRequestRateLow",
expr,
@@ -456,7 +468,7 @@ func TestForStateRestore(t *testing.T) {
newGroups := make(map[string]*Group)
newGroups["default;"] = newGroup
restoreTime := baseTime.Add(tst.restoreDuration)
restoreTime := baseTime.Add(tt.restoreDuration)
// First eval before restoration.
newGroup.Eval(context.TODO(), restoreTime)
// Restore happens here.
@@ -472,18 +484,19 @@ func TestForStateRestore(t *testing.T) {
// Checking if we have restored it correctly.
switch {
case tst.noRestore:
require.Len(t, got, tst.num)
case tt.noRestore:
require.Len(t, got, tt.num)
for _, e := range got {
require.Equal(t, e.ActiveAt, restoreTime)
}
case tst.gracePeriod:
case tt.gracePeriod:
require.Len(t, got, tst.num)
require.Len(t, got, tt.num)
for _, e := range got {
require.Equal(t, opts.ForGracePeriod, e.ActiveAt.Add(alertForDuration).Sub(restoreTime))
}
default:
exp := tst.alerts
exp := tt.expectedAlerts
require.Equal(t, len(exp), len(got))
sortAlerts(exp)
sortAlerts(got)
@@ -492,28 +505,13 @@ func TestForStateRestore(t *testing.T) {
// Difference in time should be within 1e6 ns, i.e. 1ms
// (due to conversion between ns & ms, float64 & int64).
activeAtDiff := float64(e.ActiveAt.Unix() + int64(tst.downDuration/time.Second) - got[i].ActiveAt.Unix())
activeAtDiff := float64(e.ActiveAt.Unix() + int64(tt.downDuration/time.Second) - got[i].ActiveAt.Unix())
require.Equal(t, 0.0, math.Abs(activeAtDiff), "'for' state restored time is wrong")
}
}
}
for _, tst := range tests {
testFunc(tst)
}
// Testing the grace period.
for _, duration := range []time.Duration{10 * time.Minute, 15 * time.Minute, 20 * time.Minute} {
evalTime := baseTime.Add(duration)
group.Eval(context.TODO(), evalTime)
}
testFunc(testInput{
restoreDuration: 25 * time.Minute,
alerts: []*Alert{},
gracePeriod: true,
num: 2,
})
}
}
func TestStaleness(t *testing.T) {
st := teststorage.New(t)