diff --git a/promql/functions.go b/promql/functions.go index 50594503d..49ff09678 100644 --- a/promql/functions.go +++ b/promql/functions.go @@ -367,7 +367,7 @@ func aggrOverTime(vals []parser.Value, enh *EvalNodeHelper, aggrFn func([]Point) // === avg_over_time(Matrix parser.ValueTypeMatrix) Vector === func funcAvgOverTime(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) Vector { return aggrOverTime(vals, enh, func(values []Point) float64 { - var mean, count float64 + var mean, count, c float64 for _, v := range values { count++ if math.IsInf(mean, 0) { @@ -387,9 +387,13 @@ func funcAvgOverTime(vals []parser.Value, args parser.Expressions, enh *EvalNode continue } } - mean += v.V/count - mean/count + mean, c = kahanSumInc(v.V/count-mean/count, mean, c) } - return mean + + if math.IsInf(mean, 0) { + return mean + } + return mean + c }) } @@ -439,11 +443,14 @@ func funcMinOverTime(vals []parser.Value, args parser.Expressions, enh *EvalNode // === sum_over_time(Matrix parser.ValueTypeMatrix) Vector === func funcSumOverTime(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) Vector { return aggrOverTime(vals, enh, func(values []Point) float64 { - var sum float64 + var sum, c float64 for _, v := range values { - sum += v.V + sum, c = kahanSumInc(v.V, sum, c) } - return sum + if math.IsInf(sum, 0) { + return sum + } + return sum + c }) } @@ -464,28 +471,32 @@ func funcQuantileOverTime(vals []parser.Value, args parser.Expressions, enh *Eva // === stddev_over_time(Matrix parser.ValueTypeMatrix) Vector === func funcStddevOverTime(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) Vector { return aggrOverTime(vals, enh, func(values []Point) float64 { - var aux, count, mean float64 + var count float64 + var mean, cMean float64 + var aux, cAux float64 for _, v := range values { count++ - delta := v.V - mean - mean += delta / count - aux += delta * (v.V - mean) + delta := v.V - (mean + cMean) + mean, cMean = kahanSumInc(delta/count, mean, cMean) + aux, cAux = kahanSumInc(delta*(v.V-(mean+cMean)), aux, cAux) } - return math.Sqrt(aux / count) + return math.Sqrt((aux + cAux) / count) }) } // === stdvar_over_time(Matrix parser.ValueTypeMatrix) Vector === func funcStdvarOverTime(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) Vector { return aggrOverTime(vals, enh, func(values []Point) float64 { - var aux, count, mean float64 + var count float64 + var mean, cMean float64 + var aux, cAux float64 for _, v := range values { count++ - delta := v.V - mean - mean += delta / count - aux += delta * (v.V - mean) + delta := v.V - (mean + cMean) + mean, cMean = kahanSumInc(delta/count, mean, cMean) + aux, cAux = kahanSumInc(delta*(v.V-(mean+cMean)), aux, cAux) } - return aux / count + return (aux + cAux) / count }) } @@ -675,23 +686,51 @@ func funcTimestamp(vals []parser.Value, args parser.Expressions, enh *EvalNodeHe return enh.Out } +func kahanSum(samples []float64) float64 { + var sum, c float64 + + for _, v := range samples { + sum, c = kahanSumInc(v, sum, c) + } + return sum + c +} + +func kahanSumInc(inc, sum, c float64) (newSum, newC float64) { + t := sum + inc + // Using Neumaier improvement, swap if next term larger than sum. + if math.Abs(sum) >= math.Abs(inc) { + c += (sum - t) + inc + } else { + c += (inc - t) + sum + } + return t, c +} + // linearRegression performs a least-square linear regression analysis on the // provided SamplePairs. It returns the slope, and the intercept value at the // provided time. func linearRegression(samples []Point, interceptTime int64) (slope, intercept float64) { var ( - n float64 - sumX, sumY float64 - sumXY, sumX2 float64 + n float64 + sumX, cX float64 + sumY, cY float64 + sumXY, cXY float64 + sumX2, cX2 float64 ) for _, sample := range samples { - x := float64(sample.T-interceptTime) / 1e3 n += 1.0 - sumY += sample.V - sumX += x - sumXY += x * sample.V - sumX2 += x * x + x := float64(sample.T-interceptTime) / 1e3 + sumX, cX = kahanSumInc(x, sumX, cX) + sumY, cY = kahanSumInc(sample.V, sumY, cY) + sumXY, cXY = kahanSumInc(x*sample.V, sumXY, cXY) + sumX2, cX2 = kahanSumInc(x*x, sumX2, cX2) } + + sumX = sumX + cX + sumY = sumY + cY + sumXY = sumXY + cXY + sumX2 = sumX2 + cX2 + covXY := sumXY - sumX*sumY/n varX := sumX2 - sumX*sumX/n diff --git a/promql/functions_test.go b/promql/functions_test.go index 5707cbed3..19ee105da 100644 --- a/promql/functions_test.go +++ b/promql/functions_test.go @@ -15,6 +15,7 @@ package promql import ( "context" + "math" "testing" "time" @@ -71,3 +72,9 @@ func TestFunctionList(t *testing.T) { require.True(t, ok, "function %s exists in parser package, but not in promql package", i) } } + +func TestKahanSum(t *testing.T) { + vals := []float64{1.0, math.Pow(10, 100), 1.0, -1 * math.Pow(10, 100)} + expected := 2.0 + require.Equal(t, expected, kahanSum(vals)) +}