From 9a837b7f3c1b6d5ef75cb36808bea9189eced911 Mon Sep 17 00:00:00 2001 From: beorn7 Date: Thu, 4 Jul 2024 15:51:41 +0200 Subject: [PATCH 1/6] promql: Make groupedAggregation.groupCount a float64 It's always used as such. Let's avoid the countless conversions. Signed-off-by: beorn7 --- promql/engine.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/promql/engine.go b/promql/engine.go index bf19aac8b..7c84a0a27 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -2779,7 +2779,7 @@ type groupedAggregation struct { floatValue float64 histogramValue *histogram.FloatHistogram floatMean float64 // Mean, or "compensating value" for Kahan summation. - groupCount int + groupCount float64 groupAggrComplete bool // Used by LIMITK to short-cut series loop when we've reached K elem on every group heap vectorByValueHeap } @@ -2855,8 +2855,8 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix if h != nil { group.hasHistogram = true if group.histogramValue != nil { - left := h.Copy().Div(float64(group.groupCount)) - right := group.histogramValue.Copy().Div(float64(group.groupCount)) + left := h.Copy().Div(group.groupCount) + right := group.histogramValue.Copy().Div(group.groupCount) toAdd, err := left.Sub(right) if err != nil { handleAggregationError(err, e, inputMatrix[si].Metric.Get(model.MetricNameLabel), &annos) @@ -2889,7 +2889,7 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix } } // Divide each side of the `-` by `group.groupCount` to avoid float64 overflows. - group.floatMean += f/float64(group.groupCount) - group.floatMean/float64(group.groupCount) + group.floatMean += f/group.groupCount - group.floatMean/group.groupCount } case parser.GROUP: @@ -2912,7 +2912,7 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix if h == nil { // Ignore native histograms. group.groupCount++ delta := f - group.floatMean - group.floatMean += delta / float64(group.groupCount) + group.floatMean += delta / group.groupCount group.floatValue += delta * (f - group.floatMean) } @@ -2945,13 +2945,13 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix } case parser.COUNT: - aggr.floatValue = float64(aggr.groupCount) + aggr.floatValue = aggr.groupCount case parser.STDVAR: - aggr.floatValue /= float64(aggr.groupCount) + aggr.floatValue /= aggr.groupCount case parser.STDDEV: - aggr.floatValue = math.Sqrt(aggr.floatValue / float64(aggr.groupCount)) + aggr.floatValue = math.Sqrt(aggr.floatValue / aggr.groupCount) case parser.QUANTILE: aggr.floatValue = quantile(q, aggr.heap) From 44d8c1d1828e063a8d695681826f1f6e13f0f88c Mon Sep 17 00:00:00 2001 From: beorn7 Date: Thu, 4 Jul 2024 15:52:48 +0200 Subject: [PATCH 2/6] nit: add period at end of sentence Signed-off-by: beorn7 --- promql/engine.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/promql/engine.go b/promql/engine.go index 7c84a0a27..06a26e377 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -2780,7 +2780,7 @@ type groupedAggregation struct { histogramValue *histogram.FloatHistogram floatMean float64 // Mean, or "compensating value" for Kahan summation. groupCount float64 - groupAggrComplete bool // Used by LIMITK to short-cut series loop when we've reached K elem on every group + groupAggrComplete bool // Used by LIMITK to short-cut series loop when we've reached K elem on every group. heap vectorByValueHeap } From b5b04ddbe3733a296a531f2a54462ab5529062a7 Mon Sep 17 00:00:00 2001 From: beorn7 Date: Thu, 4 Jul 2024 18:58:41 +0200 Subject: [PATCH 3/6] promql: add avg aggregation benchmark Signed-off-by: beorn7 --- promql/bench_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/promql/bench_test.go b/promql/bench_test.go index bd6728029..33523b2db 100644 --- a/promql/bench_test.go +++ b/promql/bench_test.go @@ -165,6 +165,9 @@ func rangeQueryCases() []benchCase { { expr: "sum(a_X)", }, + { + expr: "avg(a_X)", + }, { expr: "sum without (l)(h_X)", }, From c46074f4dda17dda1e93d79ed8e6e015bb222bdb Mon Sep 17 00:00:00 2001 From: beorn7 Date: Thu, 4 Jul 2024 15:13:35 +0200 Subject: [PATCH 4/6] promql: make avg aggregation more precise and less expensive MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The basic idea here is that the previous code was always doing incremental calculation of the mean value, which is more costly and can be less precise. It protects against overflows, but in most cases, an overflow doesn't happen anyway. The other idea applied here is to expand on #14074, where Kahan summation was applied to sum(). With this commit, the average is calculated in a conventional way (adding everything up and divide in the end) as long as the sum isn't overflowing float64. This is combined with Kahan summation so that the avg aggregation, in most cases, is really equivalent to the sum aggregation with a following division (which is the user's expectation as avg is supposed to be syntactic sugar for sum with a following divison). If the sum hits ±Inf, the calculation reverts to incremental calculation of the mean value. Kahan summation is also applied here, although it cannot fully compensate for the numerical errors introduced by the incremental mean calculation. (The tests added in this commit would fail if incremental mean calculation was always used.) Signed-off-by: beorn7 --- promql/engine.go | 59 +++++++++++++++------ promql/promqltest/testdata/aggregators.test | 20 ++++++- 2 files changed, 61 insertions(+), 18 deletions(-) diff --git a/promql/engine.go b/promql/engine.go index 06a26e377..8a0aa23f6 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -2773,15 +2773,19 @@ func vectorElemBinop(op parser.ItemType, lhs, rhs float64, hlhs, hrhs *histogram } type groupedAggregation struct { + floatValue float64 + histogramValue *histogram.FloatHistogram + floatMean float64 + floatKahanC float64 // "Compensating value" for Kahan summation. + groupCount float64 + heap vectorByValueHeap + + // All bools together for better packing within the struct. seen bool // Was this output groups seen in the input at this timestamp. hasFloat bool // Has at least 1 float64 sample aggregated. hasHistogram bool // Has at least 1 histogram sample aggregated. - floatValue float64 - histogramValue *histogram.FloatHistogram - floatMean float64 // Mean, or "compensating value" for Kahan summation. - groupCount float64 groupAggrComplete bool // Used by LIMITK to short-cut series loop when we've reached K elem on every group. - heap vectorByValueHeap + incrementalMean bool // True after reverting to incremental calculation of the mean value. } // aggregation evaluates sum, avg, count, stdvar, stddev or quantile at one timestep on inputMatrix. @@ -2807,13 +2811,11 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix *group = groupedAggregation{ seen: true, floatValue: f, + floatMean: f, groupCount: 1, } switch op { - case parser.AVG: - group.floatMean = f - fallthrough - case parser.SUM: + case parser.AVG, parser.SUM: if h == nil { group.hasFloat = true } else { @@ -2821,7 +2823,6 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix group.hasHistogram = true } case parser.STDVAR, parser.STDDEV: - group.floatMean = f group.floatValue = 0 case parser.QUANTILE: group.heap = make(vectorByValueHeap, 1) @@ -2847,7 +2848,7 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix // point in copying the histogram in that case. } else { group.hasFloat = true - group.floatValue, group.floatMean = kahanSumInc(f, group.floatValue, group.floatMean) + group.floatValue, group.floatKahanC = kahanSumInc(f, group.floatValue, group.floatKahanC) } case parser.AVG: @@ -2871,6 +2872,22 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix // point in copying the histogram in that case. } else { group.hasFloat = true + if !group.incrementalMean { + newV, newC := kahanSumInc(f, group.floatValue, group.floatKahanC) + if !math.IsInf(newV, 0) { + // The sum doesn't overflow, so we propagate it to the + // group struct and continue with the regular + // calculation of the mean value. + group.floatValue, group.floatKahanC = newV, newC + break + } + // If we are here, we know that the sum _would_ overflow. So + // instead of continue to sum up, we revert to incremental + // calculation of the mean value from here on. + group.incrementalMean = true + group.floatMean = group.floatValue / (group.groupCount - 1) + group.floatKahanC /= group.groupCount - 1 + } if math.IsInf(group.floatMean, 0) { if math.IsInf(f, 0) && (group.floatMean > 0) == (f > 0) { // The `floatMean` and `s.F` values are `Inf` of the same sign. They @@ -2888,8 +2905,13 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix break } } - // Divide each side of the `-` by `group.groupCount` to avoid float64 overflows. - group.floatMean += f/group.groupCount - group.floatMean/group.groupCount + currentMean := group.floatMean + group.floatKahanC + group.floatMean, group.floatKahanC = kahanSumInc( + // Divide each side of the `-` by `group.groupCount` to avoid float64 overflows. + f/group.groupCount-currentMean/group.groupCount, + group.floatMean, + group.floatKahanC, + ) } case parser.GROUP: @@ -2938,10 +2960,13 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix annos.Add(annotations.NewMixedFloatsHistogramsAggWarning(e.Expr.PositionRange())) continue } - if aggr.hasHistogram { + switch { + case aggr.hasHistogram: aggr.histogramValue = aggr.histogramValue.Compact(0) - } else { - aggr.floatValue = aggr.floatMean + case aggr.incrementalMean: + aggr.floatValue = aggr.floatMean + aggr.floatKahanC + default: + aggr.floatValue = (aggr.floatValue + aggr.floatKahanC) / aggr.groupCount } case parser.COUNT: @@ -2965,7 +2990,7 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix if aggr.hasHistogram { aggr.histogramValue.Compact(0) } else { - aggr.floatValue += aggr.floatMean // Add Kahan summation compensating term. + aggr.floatValue += aggr.floatKahanC } default: // For other aggregations, we already have the right value. diff --git a/promql/promqltest/testdata/aggregators.test b/promql/promqltest/testdata/aggregators.test index cbb255a12..68d2e735b 100644 --- a/promql/promqltest/testdata/aggregators.test +++ b/promql/promqltest/testdata/aggregators.test @@ -503,7 +503,7 @@ eval instant at 1m avg(data{test="-big"}) eval instant at 1m avg(data{test="bigzero"}) {} 0 -# Test summing extreme values. +# Test summing and averaging extreme values. clear load 10s @@ -529,21 +529,39 @@ load 10s eval instant at 1m sum(data{test="ten"}) {} 10 +eval instant at 1m avg(data{test="ten"}) + {} 2.5 + eval instant at 1m sum by (group) (data{test="pos_inf"}) {group="1"} Inf {group="2"} Inf +eval instant at 1m avg by (group) (data{test="pos_inf"}) + {group="1"} Inf + {group="2"} Inf + eval instant at 1m sum by (group) (data{test="neg_inf"}) {group="1"} -Inf {group="2"} -Inf +eval instant at 1m avg by (group) (data{test="neg_inf"}) + {group="1"} -Inf + {group="2"} -Inf + eval instant at 1m sum(data{test="inf_inf"}) {} NaN +eval instant at 1m avg(data{test="inf_inf"}) + {} NaN + eval instant at 1m sum by (group) (data{test="nan"}) {group="1"} NaN {group="2"} NaN +eval instant at 1m avg by (group) (data{test="nan"}) + {group="1"} NaN + {group="2"} NaN + clear # Test that aggregations are deterministic. From 3a908d8e088763e6df7f7a5f74345ada30c9b0bb Mon Sep 17 00:00:00 2001 From: beorn7 Date: Thu, 4 Jul 2024 18:36:32 +0200 Subject: [PATCH 5/6] promql: Improve Kahan usage in avg_over_time The calculation of the mean value in avg_over_time is performed in an incremental fashion. This introduces additional numerical errors that even Kahan summation cannot compensate, but at least we can use the Kahan-corrected mean value when we use the intermediate mean value in the calculation. Signed-off-by: beorn7 --- promql/functions.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/promql/functions.go b/promql/functions.go index dcc2cd759..575f8302d 100644 --- a/promql/functions.go +++ b/promql/functions.go @@ -593,7 +593,8 @@ func funcAvgOverTime(vals []parser.Value, args parser.Expressions, enh *EvalNode continue } } - mean, c = kahanSumInc(f.F/count-mean/count, mean, c) + correctedMean := mean + c + mean, c = kahanSumInc(f.F/count-correctedMean/count, mean, c) } if math.IsInf(mean, 0) { From cff0429b1ada726887f1fa717f44b63f72d45877 Mon Sep 17 00:00:00 2001 From: beorn7 Date: Thu, 4 Jul 2024 18:47:52 +0200 Subject: [PATCH 6/6] promql: make avg_over_time faster and more precise Same idea as for the avg aggregator before: Most of the time, there is no overflow, so we don't have to revert to the more expensive and less precise incremental calculation of the mean value. Signed-off-by: beorn7 --- promql/functions.go | 32 ++++++++++++++++++----- promql/promqltest/testdata/functions.test | 4 ++- 2 files changed, 28 insertions(+), 8 deletions(-) diff --git a/promql/functions.go b/promql/functions.go index 575f8302d..ca987545d 100644 --- a/promql/functions.go +++ b/promql/functions.go @@ -573,9 +573,28 @@ func funcAvgOverTime(vals []parser.Value, args parser.Expressions, enh *EvalNode return vec, nil } return aggrOverTime(vals, enh, func(s Series) float64 { - var mean, count, c float64 + var ( + sum, mean, count, kahanC float64 + incrementalMean bool + ) for _, f := range s.Floats { count++ + if !incrementalMean { + newSum, newC := kahanSumInc(f.F, sum, kahanC) + // Perform regular mean calculation as long as + // the sum doesn't overflow and (in any case) + // for the first iteration (even if we start + // with ±Inf) to not run into division-by-zero + // problems below. + if count == 1 || !math.IsInf(newSum, 0) { + sum, kahanC = newSum, newC + continue + } + // Handle overflow by reverting to incremental calculation of the mean value. + incrementalMean = true + mean = sum / (count - 1) + kahanC /= count - 1 + } if math.IsInf(mean, 0) { if math.IsInf(f.F, 0) && (mean > 0) == (f.F > 0) { // The `mean` and `f.F` values are `Inf` of the same sign. They @@ -593,14 +612,13 @@ func funcAvgOverTime(vals []parser.Value, args parser.Expressions, enh *EvalNode continue } } - correctedMean := mean + c - mean, c = kahanSumInc(f.F/count-correctedMean/count, mean, c) + correctedMean := mean + kahanC + mean, kahanC = kahanSumInc(f.F/count-correctedMean/count, mean, kahanC) } - - if math.IsInf(mean, 0) { - return mean + if incrementalMean { + return mean + kahanC } - return mean + c + return (sum + kahanC) / count }), nil } diff --git a/promql/promqltest/testdata/functions.test b/promql/promqltest/testdata/functions.test index 718e001c3..290beb5b9 100644 --- a/promql/promqltest/testdata/functions.test +++ b/promql/promqltest/testdata/functions.test @@ -737,7 +737,6 @@ eval instant at 1m avg_over_time(metric6c[1m]) eval instant at 1m sum_over_time(metric6c[1m])/count_over_time(metric6c[1m]) {} NaN - eval instant at 1m avg_over_time(metric7[1m]) {} NaN @@ -772,6 +771,9 @@ load 10s eval instant at 1m sum_over_time(metric[1m]) {} 2 +eval instant at 1m avg_over_time(metric[1m]) + {} 0.5 + # Tests for stddev_over_time and stdvar_over_time. clear load 10s