From 577e738986a783ea7c3d9561e10c445fe3d16e47 Mon Sep 17 00:00:00 2001 From: Julien Pivotto Date: Mon, 6 Jan 2020 11:33:36 +0100 Subject: [PATCH] Cleanup PromQL functions (#6551) * Cleanup PromQL functions The engine ensures, for Matrix functions, that functions are called with exactly one series at the time. Therefore a lot of code can be inlined and we can directly assume the first element of the arguments exists and contains all the samples needed. Signed-off-by: Julien Pivotto --- promql/functions.go | 396 ++++++++++++++++++++------------------------ 1 file changed, 180 insertions(+), 216 deletions(-) diff --git a/promql/functions.go b/promql/functions.go index 9fdc5f298..1475bb738 100644 --- a/promql/functions.go +++ b/promql/functions.go @@ -66,77 +66,74 @@ func extrapolatedRate(vals []Value, args Expressions, enh *EvalNodeHelper, isCou ms := args[0].(*MatrixSelector) var ( - matrix = vals[0].(Matrix) + samples = vals[0].(Matrix)[0] rangeStart = enh.ts - durationMilliseconds(ms.Range+ms.Offset) rangeEnd = enh.ts - durationMilliseconds(ms.Offset) ) - for _, samples := range matrix { - // No sense in trying to compute a rate without at least two points. Drop - // this Vector element. - if len(samples.Points) < 2 { - continue - } - var ( - counterCorrection float64 - lastValue float64 - ) - for _, sample := range samples.Points { - if isCounter && sample.V < lastValue { - counterCorrection += lastValue - } - lastValue = sample.V - } - resultValue := lastValue - samples.Points[0].V + counterCorrection - - // Duration between first/last samples and boundary of range. - durationToStart := float64(samples.Points[0].T-rangeStart) / 1000 - durationToEnd := float64(rangeEnd-samples.Points[len(samples.Points)-1].T) / 1000 - - sampledInterval := float64(samples.Points[len(samples.Points)-1].T-samples.Points[0].T) / 1000 - averageDurationBetweenSamples := sampledInterval / float64(len(samples.Points)-1) - - if isCounter && resultValue > 0 && samples.Points[0].V >= 0 { - // Counters cannot be negative. If we have any slope at - // all (i.e. resultValue went up), we can extrapolate - // the zero point of the counter. If the duration to the - // zero point is shorter than the durationToStart, we - // take the zero point as the start of the series, - // thereby avoiding extrapolation to negative counter - // values. - durationToZero := sampledInterval * (samples.Points[0].V / resultValue) - if durationToZero < durationToStart { - durationToStart = durationToZero - } - } - - // If the first/last samples are close to the boundaries of the range, - // extrapolate the result. This is as we expect that another sample - // will exist given the spacing between samples we've seen thus far, - // with an allowance for noise. - extrapolationThreshold := averageDurationBetweenSamples * 1.1 - extrapolateToInterval := sampledInterval - - if durationToStart < extrapolationThreshold { - extrapolateToInterval += durationToStart - } else { - extrapolateToInterval += averageDurationBetweenSamples / 2 - } - if durationToEnd < extrapolationThreshold { - extrapolateToInterval += durationToEnd - } else { - extrapolateToInterval += averageDurationBetweenSamples / 2 - } - resultValue = resultValue * (extrapolateToInterval / sampledInterval) - if isRate { - resultValue = resultValue / ms.Range.Seconds() - } - - enh.out = append(enh.out, Sample{ - Point: Point{V: resultValue}, - }) + // No sense in trying to compute a rate without at least two points. Drop + // this Vector element. + if len(samples.Points) < 2 { + return enh.out } - return enh.out + var ( + counterCorrection float64 + lastValue float64 + ) + for _, sample := range samples.Points { + if isCounter && sample.V < lastValue { + counterCorrection += lastValue + } + lastValue = sample.V + } + resultValue := lastValue - samples.Points[0].V + counterCorrection + + // Duration between first/last samples and boundary of range. + durationToStart := float64(samples.Points[0].T-rangeStart) / 1000 + durationToEnd := float64(rangeEnd-samples.Points[len(samples.Points)-1].T) / 1000 + + sampledInterval := float64(samples.Points[len(samples.Points)-1].T-samples.Points[0].T) / 1000 + averageDurationBetweenSamples := sampledInterval / float64(len(samples.Points)-1) + + if isCounter && resultValue > 0 && samples.Points[0].V >= 0 { + // Counters cannot be negative. If we have any slope at + // all (i.e. resultValue went up), we can extrapolate + // the zero point of the counter. If the duration to the + // zero point is shorter than the durationToStart, we + // take the zero point as the start of the series, + // thereby avoiding extrapolation to negative counter + // values. + durationToZero := sampledInterval * (samples.Points[0].V / resultValue) + if durationToZero < durationToStart { + durationToStart = durationToZero + } + } + + // If the first/last samples are close to the boundaries of the range, + // extrapolate the result. This is as we expect that another sample + // will exist given the spacing between samples we've seen thus far, + // with an allowance for noise. + extrapolationThreshold := averageDurationBetweenSamples * 1.1 + extrapolateToInterval := sampledInterval + + if durationToStart < extrapolationThreshold { + extrapolateToInterval += durationToStart + } else { + extrapolateToInterval += averageDurationBetweenSamples / 2 + } + if durationToEnd < extrapolationThreshold { + extrapolateToInterval += durationToEnd + } else { + extrapolateToInterval += averageDurationBetweenSamples / 2 + } + resultValue = resultValue * (extrapolateToInterval / sampledInterval) + if isRate { + resultValue = resultValue / ms.Range.Seconds() + } + + return append(enh.out, Sample{ + Point: Point{V: resultValue}, + }) } // === delta(Matrix ValueTypeMatrix) Vector === @@ -165,40 +162,38 @@ func funcIdelta(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { } func instantValue(vals []Value, out Vector, isRate bool) Vector { - for _, samples := range vals[0].(Matrix) { - // No sense in trying to compute a rate without at least two points. Drop - // this Vector element. - if len(samples.Points) < 2 { - continue - } - - lastSample := samples.Points[len(samples.Points)-1] - previousSample := samples.Points[len(samples.Points)-2] - - var resultValue float64 - if isRate && lastSample.V < previousSample.V { - // Counter reset. - resultValue = lastSample.V - } else { - resultValue = lastSample.V - previousSample.V - } - - sampledInterval := lastSample.T - previousSample.T - if sampledInterval == 0 { - // Avoid dividing by 0. - continue - } - - if isRate { - // Convert to per-second. - resultValue /= float64(sampledInterval) / 1000 - } - - out = append(out, Sample{ - Point: Point{V: resultValue}, - }) + samples := vals[0].(Matrix)[0] + // No sense in trying to compute a rate without at least two points. Drop + // this Vector element. + if len(samples.Points) < 2 { + return out } - return out + + lastSample := samples.Points[len(samples.Points)-1] + previousSample := samples.Points[len(samples.Points)-2] + + var resultValue float64 + if isRate && lastSample.V < previousSample.V { + // Counter reset. + resultValue = lastSample.V + } else { + resultValue = lastSample.V - previousSample.V + } + + sampledInterval := lastSample.T - previousSample.T + if sampledInterval == 0 { + // Avoid dividing by 0. + return out + } + + if isRate { + // Convert to per-second. + resultValue /= float64(sampledInterval) / 1000 + } + + return append(out, Sample{ + Point: Point{V: resultValue}, + }) } // Calculate the trend value at the given index i in raw data d. @@ -223,7 +218,7 @@ func calcTrendValue(i int, sf, tf, s0, s1, b float64) float64 { // how trends in historical data will affect the current data. A higher trend factor increases the influence. // of trends. Algorithm taken from https://en.wikipedia.org/wiki/Exponential_smoothing titled: "Double exponential smoothing". func funcHoltWinters(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { - mat := vals[0].(Matrix) + samples := vals[0].(Matrix)[0] // The smoothing factor argument. sf := vals[1].(Vector)[0].V @@ -239,40 +234,35 @@ func funcHoltWinters(vals []Value, args Expressions, enh *EvalNodeHelper) Vector panic(errors.Errorf("invalid trend factor. Expected: 0 < tf < 1, got: %f", tf)) } - var l int - for _, samples := range mat { - l = len(samples.Points) + l := len(samples.Points) - // Can't do the smoothing operation with less than two points. - if l < 2 { - continue - } - - var s0, s1, b float64 - // Set initial values. - s1 = samples.Points[0].V - b = samples.Points[1].V - samples.Points[0].V - - // Run the smoothing operation. - var x, y float64 - for i := 1; i < l; i++ { - - // Scale the raw value against the smoothing factor. - x = sf * samples.Points[i].V - - // Scale the last smoothed value with the trend at this point. - b = calcTrendValue(i-1, sf, tf, s0, s1, b) - y = (1 - sf) * (s1 + b) - - s0, s1 = s1, x+y - } - - enh.out = append(enh.out, Sample{ - Point: Point{V: s1}, - }) + // Can't do the smoothing operation with less than two points. + if l < 2 { + return enh.out } - return enh.out + var s0, s1, b float64 + // Set initial values. + s1 = samples.Points[0].V + b = samples.Points[1].V - samples.Points[0].V + + // Run the smoothing operation. + var x, y float64 + for i := 1; i < l; i++ { + + // Scale the raw value against the smoothing factor. + x = sf * samples.Points[i].V + + // Scale the last smoothed value with the trend at this point. + b = calcTrendValue(i-1, sf, tf, s0, s1, b) + y = (1 - sf) * (s1 + b) + + s0, s1 = s1, x+y + } + + return append(enh.out, Sample{ + Point: Point{V: s1}, + }) } // === sort(node ValueTypeVector) Vector === @@ -355,18 +345,11 @@ func funcScalar(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { } func aggrOverTime(vals []Value, enh *EvalNodeHelper, aggrFn func([]Point) float64) Vector { - mat := vals[0].(Matrix) + el := vals[0].(Matrix)[0] - for _, el := range mat { - if len(el.Points) == 0 { - continue - } - - enh.out = append(enh.out, Sample{ - Point: Point{V: aggrFn(el.Points)}, - }) - } - return enh.out + return append(enh.out, Sample{ + Point: Point{V: aggrFn(el.Points)}, + }) } // === avg_over_time(Matrix ValueTypeMatrix) Vector === @@ -429,22 +412,15 @@ func funcSumOverTime(vals []Value, args Expressions, enh *EvalNodeHelper) Vector // === quantile_over_time(Matrix ValueTypeMatrix) Vector === func funcQuantileOverTime(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { q := vals[0].(Vector)[0].V - mat := vals[1].(Matrix) + el := vals[1].(Matrix)[0] - for _, el := range mat { - if len(el.Points) == 0 { - continue - } - - values := make(vectorByValueHeap, 0, len(el.Points)) - for _, v := range el.Points { - values = append(values, Sample{Point: Point{V: v.V}}) - } - enh.out = append(enh.out, Sample{ - Point: Point{V: quantile(q, values)}, - }) + values := make(vectorByValueHeap, 0, len(el.Points)) + for _, v := range el.Points { + values = append(values, Sample{Point: Point{V: v.V}}) } - return enh.out + return append(enh.out, Sample{ + Point: Point{V: quantile(q, values)}, + }) } // === stddev_over_time(Matrix ValueTypeMatrix) Vector === @@ -588,44 +564,38 @@ func linearRegression(samples []Point, interceptTime int64) (slope, intercept fl // === deriv(node ValueTypeMatrix) Vector === func funcDeriv(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { - mat := vals[0].(Matrix) + samples := vals[0].(Matrix)[0] - for _, samples := range mat { - // No sense in trying to compute a derivative without at least two points. - // Drop this Vector element. - if len(samples.Points) < 2 { - continue - } - - // We pass in an arbitrary timestamp that is near the values in use - // to avoid floating point accuracy issues, see - // https://github.com/prometheus/prometheus/issues/2674 - slope, _ := linearRegression(samples.Points, samples.Points[0].T) - enh.out = append(enh.out, Sample{ - Point: Point{V: slope}, - }) + // No sense in trying to compute a derivative without at least two points. + // Drop this Vector element. + if len(samples.Points) < 2 { + return enh.out } - return enh.out + + // We pass in an arbitrary timestamp that is near the values in use + // to avoid floating point accuracy issues, see + // https://github.com/prometheus/prometheus/issues/2674 + slope, _ := linearRegression(samples.Points, samples.Points[0].T) + return append(enh.out, Sample{ + Point: Point{V: slope}, + }) } // === predict_linear(node ValueTypeMatrix, k ValueTypeScalar) Vector === func funcPredictLinear(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { - mat := vals[0].(Matrix) + samples := vals[0].(Matrix)[0] duration := vals[1].(Vector)[0].V - for _, samples := range mat { - // No sense in trying to predict anything without at least two points. - // Drop this Vector element. - if len(samples.Points) < 2 { - continue - } - slope, intercept := linearRegression(samples.Points, enh.ts) - - enh.out = append(enh.out, Sample{ - Point: Point{V: slope*duration + intercept}, - }) + // No sense in trying to predict anything without at least two points. + // Drop this Vector element. + if len(samples.Points) < 2 { + return enh.out } - return enh.out + slope, intercept := linearRegression(samples.Points, enh.ts) + + return append(enh.out, Sample{ + Point: Point{V: slope*duration + intercept}, + }) } // === histogram_quantile(k ValueTypeScalar, Vector ValueTypeVector) Vector === @@ -678,46 +648,40 @@ func funcHistogramQuantile(vals []Value, args Expressions, enh *EvalNodeHelper) // === resets(Matrix ValueTypeMatrix) Vector === func funcResets(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { - in := vals[0].(Matrix) + samples := vals[0].(Matrix)[0] - for _, samples := range in { - resets := 0 - prev := samples.Points[0].V - for _, sample := range samples.Points[1:] { - current := sample.V - if current < prev { - resets++ - } - prev = current + resets := 0 + prev := samples.Points[0].V + for _, sample := range samples.Points[1:] { + current := sample.V + if current < prev { + resets++ } - - enh.out = append(enh.out, Sample{ - Point: Point{V: float64(resets)}, - }) + prev = current } - return enh.out + + return append(enh.out, Sample{ + Point: Point{V: float64(resets)}, + }) } // === changes(Matrix ValueTypeMatrix) Vector === func funcChanges(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { - in := vals[0].(Matrix) + samples := vals[0].(Matrix)[0] - for _, samples := range in { - changes := 0 - prev := samples.Points[0].V - for _, sample := range samples.Points[1:] { - current := sample.V - if current != prev && !(math.IsNaN(current) && math.IsNaN(prev)) { - changes++ - } - prev = current + changes := 0 + prev := samples.Points[0].V + for _, sample := range samples.Points[1:] { + current := sample.V + if current != prev && !(math.IsNaN(current) && math.IsNaN(prev)) { + changes++ } - - enh.out = append(enh.out, Sample{ - Point: Point{V: float64(changes)}, - }) + prev = current } - return enh.out + + return append(enh.out, Sample{ + Point: Point{V: float64(changes)}, + }) } // === label_replace(Vector ValueTypeVector, dst_label, replacement, src_labelname, regex ValueTypeString) Vector === @@ -1297,7 +1261,7 @@ func createLabelsForAbsentFunction(expr Expr) labels.Labels { } for _, v := range empty { - m = labels.NewBuilder(m).Set(v, "").Labels() + m = labels.NewBuilder(m).Del(v).Labels() } return m }