diff --git a/rules/group.go b/rules/group.go index 1ce15be9f..ecc96d0a1 100644 --- a/rules/group.go +++ b/rules/group.go @@ -23,6 +23,9 @@ import ( "sync" "time" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" "go.uber.org/atomic" "github.com/prometheus/prometheus/promql/parser" @@ -30,9 +33,6 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/common/promslog" - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/codes" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/timestamp" @@ -511,149 +511,147 @@ func (g *Group) CopyState(from *Group) { // Rules can be evaluated concurrently if the `concurrent-rule-eval` feature flag is enabled. func (g *Group) Eval(ctx context.Context, ts time.Time) { var ( - samplesTotal atomic.Float64 - wg sync.WaitGroup + samplesTotal atomic.Float64 + ruleQueryOffset = g.QueryOffset() ) - - ruleQueryOffset := g.QueryOffset() - - for i, rule := range g.rules { - select { - case <-g.done: - return - default: + eval := func(i int, rule Rule, cleanup func()) { + if cleanup != nil { + defer cleanup() } - eval := func(i int, rule Rule, cleanup func()) { - if cleanup != nil { - defer cleanup() + logger := g.logger.With("name", rule.Name(), "index", i) + ctx, sp := otel.Tracer("").Start(ctx, "rule") + sp.SetAttributes(attribute.String("name", rule.Name())) + defer func(t time.Time) { + sp.End() + + since := time.Since(t) + g.metrics.EvalDuration.Observe(since.Seconds()) + rule.SetEvaluationDuration(since) + rule.SetEvaluationTimestamp(t) + }(time.Now()) + + if sp.SpanContext().IsSampled() && sp.SpanContext().HasTraceID() { + logger = logger.With("trace_id", sp.SpanContext().TraceID()) + } + + g.metrics.EvalTotal.WithLabelValues(GroupKey(g.File(), g.Name())).Inc() + + vector, err := rule.Eval(ctx, ruleQueryOffset, ts, g.opts.QueryFunc, g.opts.ExternalURL, g.Limit()) + if err != nil { + rule.SetHealth(HealthBad) + rule.SetLastError(err) + sp.SetStatus(codes.Error, err.Error()) + g.metrics.EvalFailures.WithLabelValues(GroupKey(g.File(), g.Name())).Inc() + + // Canceled queries are intentional termination of queries. This normally + // happens on shutdown and thus we skip logging of any errors here. + var eqc promql.ErrQueryCanceled + if !errors.As(err, &eqc) { + logger.Warn("Evaluating rule failed", "rule", rule, "err", err) } + return + } + rule.SetHealth(HealthGood) + rule.SetLastError(nil) + samplesTotal.Add(float64(len(vector))) - logger := g.logger.With("name", rule.Name(), "index", i) - ctx, sp := otel.Tracer("").Start(ctx, "rule") - sp.SetAttributes(attribute.String("name", rule.Name())) - defer func(t time.Time) { - sp.End() + if ar, ok := rule.(*AlertingRule); ok { + ar.sendAlerts(ctx, ts, g.opts.ResendDelay, g.interval, g.opts.NotifyFunc) + } + var ( + numOutOfOrder = 0 + numTooOld = 0 + numDuplicates = 0 + ) - since := time.Since(t) - g.metrics.EvalDuration.Observe(since.Seconds()) - rule.SetEvaluationDuration(since) - rule.SetEvaluationTimestamp(t) - }(time.Now()) - - if sp.SpanContext().IsSampled() && sp.SpanContext().HasTraceID() { - logger = logger.With("trace_id", sp.SpanContext().TraceID()) - } - - g.metrics.EvalTotal.WithLabelValues(GroupKey(g.File(), g.Name())).Inc() - - vector, err := rule.Eval(ctx, ruleQueryOffset, ts, g.opts.QueryFunc, g.opts.ExternalURL, g.Limit()) - if err != nil { + app := g.opts.Appendable.Appender(ctx) + seriesReturned := make(map[string]labels.Labels, len(g.seriesInPreviousEval[i])) + defer func() { + if err := app.Commit(); err != nil { rule.SetHealth(HealthBad) rule.SetLastError(err) sp.SetStatus(codes.Error, err.Error()) g.metrics.EvalFailures.WithLabelValues(GroupKey(g.File(), g.Name())).Inc() - // Canceled queries are intentional termination of queries. This normally - // happens on shutdown and thus we skip logging of any errors here. - var eqc promql.ErrQueryCanceled - if !errors.As(err, &eqc) { - logger.Warn("Evaluating rule failed", "rule", rule, "err", err) - } + logger.Warn("Rule sample appending failed", "err", err) return } - rule.SetHealth(HealthGood) - rule.SetLastError(nil) - samplesTotal.Add(float64(len(vector))) + g.seriesInPreviousEval[i] = seriesReturned + }() - if ar, ok := rule.(*AlertingRule); ok { - ar.sendAlerts(ctx, ts, g.opts.ResendDelay, g.interval, g.opts.NotifyFunc) + for _, s := range vector { + if s.H != nil { + _, err = app.AppendHistogram(0, s.Metric, s.T, nil, s.H) + } else { + app.SetOptions(g.appOpts) + _, err = app.Append(0, s.Metric, s.T, s.F) } - var ( - numOutOfOrder = 0 - numTooOld = 0 - numDuplicates = 0 - ) - app := g.opts.Appendable.Appender(ctx) - seriesReturned := make(map[string]labels.Labels, len(g.seriesInPreviousEval[i])) - defer func() { - if err := app.Commit(); err != nil { - rule.SetHealth(HealthBad) - rule.SetLastError(err) - sp.SetStatus(codes.Error, err.Error()) - g.metrics.EvalFailures.WithLabelValues(GroupKey(g.File(), g.Name())).Inc() - - logger.Warn("Rule sample appending failed", "err", err) - return + if err != nil { + rule.SetHealth(HealthBad) + rule.SetLastError(err) + sp.SetStatus(codes.Error, err.Error()) + unwrappedErr := errors.Unwrap(err) + if unwrappedErr == nil { + unwrappedErr = err } - g.seriesInPreviousEval[i] = seriesReturned - }() - - for _, s := range vector { - if s.H != nil { - _, err = app.AppendHistogram(0, s.Metric, s.T, nil, s.H) - } else { - app.SetOptions(g.appOpts) - _, err = app.Append(0, s.Metric, s.T, s.F) + switch { + case errors.Is(unwrappedErr, storage.ErrOutOfOrderSample): + numOutOfOrder++ + logger.Debug("Rule evaluation result discarded", "err", err, "sample", s) + case errors.Is(unwrappedErr, storage.ErrTooOldSample): + numTooOld++ + logger.Debug("Rule evaluation result discarded", "err", err, "sample", s) + case errors.Is(unwrappedErr, storage.ErrDuplicateSampleForTimestamp): + numDuplicates++ + logger.Debug("Rule evaluation result discarded", "err", err, "sample", s) + default: + logger.Warn("Rule evaluation result discarded", "err", err, "sample", s) } + } else { + buf := [1024]byte{} + seriesReturned[string(s.Metric.Bytes(buf[:]))] = s.Metric + } + } + if numOutOfOrder > 0 { + logger.Warn("Error on ingesting out-of-order result from rule evaluation", "num_dropped", numOutOfOrder) + } + if numTooOld > 0 { + logger.Warn("Error on ingesting too old result from rule evaluation", "num_dropped", numTooOld) + } + if numDuplicates > 0 { + logger.Warn("Error on ingesting results from rule evaluation with different value but same timestamp", "num_dropped", numDuplicates) + } - if err != nil { - rule.SetHealth(HealthBad) - rule.SetLastError(err) - sp.SetStatus(codes.Error, err.Error()) - unwrappedErr := errors.Unwrap(err) - if unwrappedErr == nil { - unwrappedErr = err - } - switch { - case errors.Is(unwrappedErr, storage.ErrOutOfOrderSample): - numOutOfOrder++ - logger.Debug("Rule evaluation result discarded", "err", err, "sample", s) - case errors.Is(unwrappedErr, storage.ErrTooOldSample): - numTooOld++ - logger.Debug("Rule evaluation result discarded", "err", err, "sample", s) - case errors.Is(unwrappedErr, storage.ErrDuplicateSampleForTimestamp): - numDuplicates++ - logger.Debug("Rule evaluation result discarded", "err", err, "sample", s) - default: - logger.Warn("Rule evaluation result discarded", "err", err, "sample", s) - } - } else { - buf := [1024]byte{} - seriesReturned[string(s.Metric.Bytes(buf[:]))] = s.Metric + for metric, lset := range g.seriesInPreviousEval[i] { + if _, ok := seriesReturned[metric]; !ok { + // Series no longer exposed, mark it stale. + _, err = app.Append(0, lset, timestamp.FromTime(ts.Add(-ruleQueryOffset)), math.Float64frombits(value.StaleNaN)) + unwrappedErr := errors.Unwrap(err) + if unwrappedErr == nil { + unwrappedErr = err + } + switch { + case unwrappedErr == nil: + case errors.Is(unwrappedErr, storage.ErrOutOfOrderSample), + errors.Is(unwrappedErr, storage.ErrTooOldSample), + errors.Is(unwrappedErr, storage.ErrDuplicateSampleForTimestamp): + // Do not count these in logging, as this is expected if series + // is exposed from a different rule. + default: + logger.Warn("Adding stale sample failed", "sample", lset.String(), "err", err) } } - if numOutOfOrder > 0 { - logger.Warn("Error on ingesting out-of-order result from rule evaluation", "num_dropped", numOutOfOrder) - } - if numTooOld > 0 { - logger.Warn("Error on ingesting too old result from rule evaluation", "num_dropped", numTooOld) - } - if numDuplicates > 0 { - logger.Warn("Error on ingesting results from rule evaluation with different value but same timestamp", "num_dropped", numDuplicates) - } + } + } - for metric, lset := range g.seriesInPreviousEval[i] { - if _, ok := seriesReturned[metric]; !ok { - // Series no longer exposed, mark it stale. - _, err = app.Append(0, lset, timestamp.FromTime(ts.Add(-ruleQueryOffset)), math.Float64frombits(value.StaleNaN)) - unwrappedErr := errors.Unwrap(err) - if unwrappedErr == nil { - unwrappedErr = err - } - switch { - case unwrappedErr == nil: - case errors.Is(unwrappedErr, storage.ErrOutOfOrderSample), - errors.Is(unwrappedErr, storage.ErrTooOldSample), - errors.Is(unwrappedErr, storage.ErrDuplicateSampleForTimestamp): - // Do not count these in logging, as this is expected if series - // is exposed from a different rule. - default: - logger.Warn("Adding stale sample failed", "sample", lset.String(), "err", err) - } - } - } + var wg sync.WaitGroup + for i, rule := range g.rules { + select { + case <-g.done: + return + default: } if ctrl := g.concurrencyController; ctrl.Allow(ctx, g, rule) { @@ -667,7 +665,6 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { eval(i, rule, nil) } } - wg.Wait() g.metrics.GroupSamples.WithLabelValues(GroupKey(g.File(), g.Name())).Set(samplesTotal.Load())