diff --git a/promql/bench_test.go b/promql/bench_test.go
index bd6728029..33523b2db 100644
--- a/promql/bench_test.go
+++ b/promql/bench_test.go
@@ -165,6 +165,9 @@ func rangeQueryCases() []benchCase {
 		{
 			expr: "sum(a_X)",
 		},
+		{
+			expr: "avg(a_X)",
+		},
 		{
 			expr: "sum without (l)(h_X)",
 		},
diff --git a/promql/engine.go b/promql/engine.go
index 25e67db63..14c370606 100644
--- a/promql/engine.go
+++ b/promql/engine.go
@@ -2773,15 +2773,19 @@ func vectorElemBinop(op parser.ItemType, lhs, rhs float64, hlhs, hrhs *histogram
 }

 type groupedAggregation struct {
+	floatValue     float64
+	histogramValue *histogram.FloatHistogram
+	floatMean      float64
+	floatKahanC    float64 // "Compensating value" for Kahan summation.
+	groupCount     float64
+	heap           vectorByValueHeap
+
+	// All bools together for better packing within the struct.
 	seen              bool // Was this output groups seen in the input at this timestamp.
 	hasFloat          bool // Has at least 1 float64 sample aggregated.
 	hasHistogram      bool // Has at least 1 histogram sample aggregated.
-	floatValue        float64
-	histogramValue    *histogram.FloatHistogram
-	floatMean         float64 // Mean, or "compensating value" for Kahan summation.
-	groupCount        int
-	groupAggrComplete bool // Used by LIMITK to short-cut series loop when we've reached K elem on every group
-	heap              vectorByValueHeap
+	groupAggrComplete bool // Used by LIMITK to short-cut series loop when we've reached K elem on every group.
+	incrementalMean   bool // True after reverting to incremental calculation of the mean value.
 }

 // aggregation evaluates sum, avg, count, stdvar, stddev or quantile at one timestep on inputMatrix.
@@ -2807,13 +2811,11 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix
 			*group = groupedAggregation{
 				seen:       true,
 				floatValue: f,
+				floatMean:  f,
 				groupCount: 1,
 			}
 			switch op {
-			case parser.AVG:
-				group.floatMean = f
-				fallthrough
-			case parser.SUM:
+			case parser.AVG, parser.SUM:
 				if h == nil {
 					group.hasFloat = true
 				} else {
@@ -2821,7 +2823,6 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix
 					group.hasHistogram = true
 				}
 			case parser.STDVAR, parser.STDDEV:
-				group.floatMean = f
 				group.floatValue = 0
 			case parser.QUANTILE:
 				group.heap = make(vectorByValueHeap, 1)
@@ -2847,7 +2848,7 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix
 				// point in copying the histogram in that case.
 			} else {
 				group.hasFloat = true
-				group.floatValue, group.floatMean = kahanSumInc(f, group.floatValue, group.floatMean)
+				group.floatValue, group.floatKahanC = kahanSumInc(f, group.floatValue, group.floatKahanC)
 			}

 		case parser.AVG:
@@ -2855,8 +2856,8 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix
 			if h != nil {
 				group.hasHistogram = true
 				if group.histogramValue != nil {
-					left := h.Copy().Div(float64(group.groupCount))
-					right := group.histogramValue.Copy().Div(float64(group.groupCount))
+					left := h.Copy().Div(group.groupCount)
+					right := group.histogramValue.Copy().Div(group.groupCount)
 					toAdd, err := left.Sub(right)
 					if err != nil {
 						handleAggregationError(err, e, inputMatrix[si].Metric.Get(model.MetricNameLabel), &annos)
@@ -2871,6 +2872,22 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix
 				// point in copying the histogram in that case.
 			} else {
 				group.hasFloat = true
+				if !group.incrementalMean {
+					newV, newC := kahanSumInc(f, group.floatValue, group.floatKahanC)
+					if !math.IsInf(newV, 0) {
+						// The sum doesn't overflow, so we propagate it to the
+						// group struct and continue with the regular
+						// calculation of the mean value.
+						group.floatValue, group.floatKahanC = newV, newC
+						break
+					}
+					// If we are here, we know that the sum _would_ overflow. So
+					// instead of continuing to sum up, we revert to incremental
+					// calculation of the mean value from here on.
+					group.incrementalMean = true
+					group.floatMean = group.floatValue / (group.groupCount - 1)
+					group.floatKahanC /= group.groupCount - 1
+				}
 				if math.IsInf(group.floatMean, 0) {
 					if math.IsInf(f, 0) && (group.floatMean > 0) == (f > 0) {
 						// The `floatMean` and `s.F` values are `Inf` of the same sign. They
@@ -2888,8 +2905,13 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix
 						break
 					}
 				}
-				// Divide each side of the `-` by `group.groupCount` to avoid float64 overflows.
-				group.floatMean += f/float64(group.groupCount) - group.floatMean/float64(group.groupCount)
+				currentMean := group.floatMean + group.floatKahanC
+				group.floatMean, group.floatKahanC = kahanSumInc(
+					// Divide each side of the `-` by `group.groupCount` to avoid float64 overflows.
+					f/group.groupCount-currentMean/group.groupCount,
+					group.floatMean,
+					group.floatKahanC,
+				)
 			}

 		case parser.GROUP:
@@ -2912,7 +2934,7 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix
 			if h == nil { // Ignore native histograms.
 				group.groupCount++
 				delta := f - group.floatMean
-				group.floatMean += delta / float64(group.groupCount)
+				group.floatMean += delta / group.groupCount
 				group.floatValue += delta * (f - group.floatMean)
 			}

@@ -2938,20 +2960,23 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix
 				annos.Add(annotations.NewMixedFloatsHistogramsAggWarning(e.Expr.PositionRange()))
 				continue
 			}
-			if aggr.hasHistogram {
+			switch {
+			case aggr.hasHistogram:
 				aggr.histogramValue = aggr.histogramValue.Compact(0)
-			} else {
-				aggr.floatValue = aggr.floatMean
+			case aggr.incrementalMean:
+				aggr.floatValue = aggr.floatMean + aggr.floatKahanC
+			default:
+				aggr.floatValue = (aggr.floatValue + aggr.floatKahanC) / aggr.groupCount
 			}

 		case parser.COUNT:
-			aggr.floatValue = float64(aggr.groupCount)
+			aggr.floatValue = aggr.groupCount

 		case parser.STDVAR:
-			aggr.floatValue /= float64(aggr.groupCount)
+			aggr.floatValue /= aggr.groupCount

 		case parser.STDDEV:
-			aggr.floatValue = math.Sqrt(aggr.floatValue / float64(aggr.groupCount))
+			aggr.floatValue = math.Sqrt(aggr.floatValue / aggr.groupCount)

 		case parser.QUANTILE:
 			aggr.floatValue = quantile(q, aggr.heap)
@@ -2965,7 +2990,7 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix
 			if aggr.hasHistogram {
 				aggr.histogramValue.Compact(0)
 			} else {
-				aggr.floatValue += aggr.floatMean // Add Kahan summation compensating term.
+				aggr.floatValue += aggr.floatKahanC
 			}
 		default:
 			// For other aggregations, we already have the right value.
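Side note on the engine.go hunks above (commentary, not part of the patch): the AVG path now keeps a Kahan-compensated running sum for as long as that sum stays finite, and only falls back to an incremental mean, seeded with sum/(n-1), once adding the next sample would overflow; the final value is then either (sum + compensation)/count or mean + compensation. The standalone Go sketch below condenses that strategy into a single function. The name overflowSafeMean and the body of the local kahanSumInc are illustrative assumptions rather than code from the patch, and the patch's ±Inf/NaN special-casing and native-histogram handling are omitted for brevity.

package kahanavg

import "math"

// kahanSumInc is a local stand-in for the compensated-summation helper the
// patch relies on (Neumaier variant): it returns the updated running sum and
// the updated compensation term.
func kahanSumInc(inc, sum, c float64) (newSum, newC float64) {
	t := sum + inc
	switch {
	case math.IsInf(t, 0):
		c = 0 // Compensation is meaningless once the sum is infinite.
	case math.Abs(sum) >= math.Abs(inc):
		c += (sum - t) + inc
	default:
		c += (inc - t) + sum
	}
	return t, c
}

// overflowSafeMean mirrors the strategy of the patched avg aggregation:
// Kahan-compensated summation while the sum stays finite, switching to an
// incremental mean as soon as the next addition would overflow.
func overflowSafeMean(values []float64) float64 {
	var (
		sum, kahanC, mean, count float64
		incremental              bool
	)
	for _, f := range values {
		count++
		if !incremental {
			newSum, newC := kahanSumInc(f, sum, kahanC)
			// Keep the plain sum on the first iteration (to avoid a division
			// by zero below) and for as long as it does not overflow.
			if count == 1 || !math.IsInf(newSum, 0) {
				sum, kahanC = newSum, newC
				continue
			}
			// The sum would overflow: seed the incremental mean from the
			// samples seen so far and stop maintaining a running sum.
			incremental = true
			mean = sum / (count - 1)
			kahanC /= count - 1
		}
		// Incremental update; dividing both terms by count keeps the
		// intermediate values finite.
		correctedMean := mean + kahanC
		mean, kahanC = kahanSumInc(f/count-correctedMean/count, mean, kahanC)
	}
	if incremental {
		return mean + kahanC
	}
	return (sum + kahanC) / count
}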
diff --git a/promql/functions.go b/promql/functions.go
index b9e93b85a..35dbd2970 100644
--- a/promql/functions.go
+++ b/promql/functions.go
@@ -580,9 +580,28 @@ func funcAvgOverTime(vals []parser.Value, args parser.Expressions, enh *EvalNode
 		return vec, nil
 	}
 	return aggrOverTime(vals, enh, func(s Series) float64 {
-		var mean, count, c float64
+		var (
+			sum, mean, count, kahanC float64
+			incrementalMean          bool
+		)
 		for _, f := range s.Floats {
 			count++
+			if !incrementalMean {
+				newSum, newC := kahanSumInc(f.F, sum, kahanC)
+				// Perform regular mean calculation as long as
+				// the sum doesn't overflow and (in any case)
+				// for the first iteration (even if we start
+				// with ±Inf) to not run into division-by-zero
+				// problems below.
+				if count == 1 || !math.IsInf(newSum, 0) {
+					sum, kahanC = newSum, newC
+					continue
+				}
+				// Handle overflow by reverting to incremental calculation of the mean value.
+				incrementalMean = true
+				mean = sum / (count - 1)
+				kahanC /= count - 1
+			}
 			if math.IsInf(mean, 0) {
 				if math.IsInf(f.F, 0) && (mean > 0) == (f.F > 0) {
 					// The `mean` and `f.F` values are `Inf` of the same sign. They
@@ -600,13 +619,13 @@ func funcAvgOverTime(vals []parser.Value, args parser.Expressions, enh *EvalNode
 					continue
 				}
 			}
-			mean, c = kahanSumInc(f.F/count-mean/count, mean, c)
+			correctedMean := mean + kahanC
+			mean, kahanC = kahanSumInc(f.F/count-correctedMean/count, mean, kahanC)
 		}
-
-		if math.IsInf(mean, 0) {
-			return mean
+		if incrementalMean {
+			return mean + kahanC
 		}
-		return mean + c
+		return (sum + kahanC) / count
 	}), nil
 }

diff --git a/promql/promqltest/testdata/aggregators.test b/promql/promqltest/testdata/aggregators.test
index cbb255a12..68d2e735b 100644
--- a/promql/promqltest/testdata/aggregators.test
+++ b/promql/promqltest/testdata/aggregators.test
@@ -503,7 +503,7 @@ eval instant at 1m avg(data{test="-big"})
 eval instant at 1m avg(data{test="bigzero"})
   {} 0

-# Test summing extreme values.
+# Test summing and averaging extreme values.
 clear

 load 10s
@@ -529,21 +529,39 @@ load 10s
 eval instant at 1m sum(data{test="ten"})
   {} 10

+eval instant at 1m avg(data{test="ten"})
+  {} 2.5
+
 eval instant at 1m sum by (group) (data{test="pos_inf"})
   {group="1"} Inf
   {group="2"} Inf

+eval instant at 1m avg by (group) (data{test="pos_inf"})
+  {group="1"} Inf
+  {group="2"} Inf
+
 eval instant at 1m sum by (group) (data{test="neg_inf"})
   {group="1"} -Inf
   {group="2"} -Inf

+eval instant at 1m avg by (group) (data{test="neg_inf"})
+  {group="1"} -Inf
+  {group="2"} -Inf
+
 eval instant at 1m sum(data{test="inf_inf"})
   {} NaN

+eval instant at 1m avg(data{test="inf_inf"})
+  {} NaN
+
 eval instant at 1m sum by (group) (data{test="nan"})
   {group="1"} NaN
   {group="2"} NaN

+eval instant at 1m avg by (group) (data{test="nan"})
+  {group="1"} NaN
+  {group="2"} NaN
+
 clear

 # Test that aggregations are deterministic.
diff --git a/promql/promqltest/testdata/functions.test b/promql/promqltest/testdata/functions.test
index b8b36d91e..b89d44fce 100644
--- a/promql/promqltest/testdata/functions.test
+++ b/promql/promqltest/testdata/functions.test
@@ -748,7 +748,6 @@ eval instant at 1m avg_over_time(metric6c[1m])
 eval instant at 1m sum_over_time(metric6c[1m])/count_over_time(metric6c[1m])
   {} NaN

-
 eval instant at 1m avg_over_time(metric7[1m])
   {} NaN

@@ -783,6 +782,9 @@ load 10s
 eval instant at 1m sum_over_time(metric[1m])
   {} 2

+eval instant at 1m avg_over_time(metric[1m])
+  {} 0.5
+
 # Tests for stddev_over_time and stdvar_over_time.
 clear
 load 10s
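The new avg and avg_over_time test cases above target exactly the failure mode the patch fixes: a sum-then-divide mean overflows to +Inf even when the true mean is representable. Here is a minimal, self-contained demonstration of that effect (illustrative only; naiveMean and incrementalMean are made-up names, and the sample values are arbitrary large floats, not taken from the test data):

package main

import (
	"fmt"
	"math"
)

// naiveMean sums first and divides at the end; the intermediate sum can
// overflow to +Inf even though the true mean fits into a float64.
func naiveMean(values []float64) float64 {
	var sum float64
	for _, v := range values {
		sum += v
	}
	return sum / float64(len(values))
}

// incrementalMean updates the mean one sample at a time. Dividing both terms
// by the running count keeps every intermediate value finite.
func incrementalMean(values []float64) float64 {
	var mean float64
	for i, v := range values {
		count := float64(i + 1)
		mean += v/count - mean/count
	}
	return mean
}

func main() {
	// Two samples large enough that their sum overflows, while their mean is
	// simply the value itself.
	v := 0.9 * math.MaxFloat64
	values := []float64{v, v}
	fmt.Println(naiveMean(values))       // prints +Inf
	fmt.Println(incrementalMean(values)) // prints the finite value of v
}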