Cleanup PromQL functions (#6551)

* Cleanup PromQL functions

The engine ensures that Matrix functions are called with exactly one series at a time.
A lot of code can therefore be inlined, and we can directly assume that the first element of the argument Matrix exists and contains all the samples needed.

Signed-off-by: Julien Pivotto <roidelapluie@inuits.eu>
Julien Pivotto 2020-01-06 11:33:36 +01:00 committed by Brian Brazil
parent 536d416299
commit 577e738986
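
The shape of the change is the same in every hunk below: the per-series loop disappears, and each `continue` that dropped a series becomes an early return. A minimal runnable sketch of the before/after pattern, using simplified stand-in types and a hypothetical lastOverTime helper (none of these names appear in the diff):

package main

import "fmt"

// Simplified stand-ins for promql's Point, Series and Matrix types.
type Point struct {
	T int64
	V float64
}

type Series struct{ Points []Point }

type Matrix []Series

// Before: loop over every series in the Matrix, appending one output
// sample per series and skipping unusable ones with "continue".
func lastOverTimeOld(m Matrix, out []float64) []float64 {
	for _, s := range m {
		if len(s.Points) == 0 {
			continue // drop this element
		}
		out = append(out, s.Points[len(s.Points)-1].V)
	}
	return out
}

// After: the engine guarantees exactly one series per call, so the first
// element can be taken directly and "continue" becomes an early return.
func lastOverTimeNew(m Matrix, out []float64) []float64 {
	s := m[0]
	if len(s.Points) == 0 {
		return out
	}
	return append(out, s.Points[len(s.Points)-1].V)
}

func main() {
	m := Matrix{{Points: []Point{{T: 0, V: 1}, {T: 15000, V: 4}}}}
	fmt.Println(lastOverTimeOld(m, nil)) // [4]
	fmt.Println(lastOverTimeNew(m, nil)) // [4]
}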


@@ -66,77 +66,74 @@ func extrapolatedRate(vals []Value, args Expressions, enh *EvalNodeHelper, isCou
 	ms := args[0].(*MatrixSelector)
 	var (
-		matrix     = vals[0].(Matrix)
+		samples    = vals[0].(Matrix)[0]
 		rangeStart = enh.ts - durationMilliseconds(ms.Range+ms.Offset)
 		rangeEnd   = enh.ts - durationMilliseconds(ms.Offset)
 	)
 
-	for _, samples := range matrix {
-		// No sense in trying to compute a rate without at least two points. Drop
-		// this Vector element.
-		if len(samples.Points) < 2 {
-			continue
-		}
-		var (
-			counterCorrection float64
-			lastValue         float64
-		)
-		for _, sample := range samples.Points {
-			if isCounter && sample.V < lastValue {
-				counterCorrection += lastValue
-			}
-			lastValue = sample.V
-		}
-		resultValue := lastValue - samples.Points[0].V + counterCorrection
-		// Duration between first/last samples and boundary of range.
-		durationToStart := float64(samples.Points[0].T-rangeStart) / 1000
-		durationToEnd := float64(rangeEnd-samples.Points[len(samples.Points)-1].T) / 1000
-		sampledInterval := float64(samples.Points[len(samples.Points)-1].T-samples.Points[0].T) / 1000
-		averageDurationBetweenSamples := sampledInterval / float64(len(samples.Points)-1)
-		if isCounter && resultValue > 0 && samples.Points[0].V >= 0 {
-			// Counters cannot be negative. If we have any slope at
-			// all (i.e. resultValue went up), we can extrapolate
-			// the zero point of the counter. If the duration to the
-			// zero point is shorter than the durationToStart, we
-			// take the zero point as the start of the series,
-			// thereby avoiding extrapolation to negative counter
-			// values.
-			durationToZero := sampledInterval * (samples.Points[0].V / resultValue)
-			if durationToZero < durationToStart {
-				durationToStart = durationToZero
-			}
-		}
-		// If the first/last samples are close to the boundaries of the range,
-		// extrapolate the result. This is as we expect that another sample
-		// will exist given the spacing between samples we've seen thus far,
-		// with an allowance for noise.
-		extrapolationThreshold := averageDurationBetweenSamples * 1.1
-		extrapolateToInterval := sampledInterval
-		if durationToStart < extrapolationThreshold {
-			extrapolateToInterval += durationToStart
-		} else {
-			extrapolateToInterval += averageDurationBetweenSamples / 2
-		}
-		if durationToEnd < extrapolationThreshold {
-			extrapolateToInterval += durationToEnd
-		} else {
-			extrapolateToInterval += averageDurationBetweenSamples / 2
-		}
-		resultValue = resultValue * (extrapolateToInterval / sampledInterval)
-		if isRate {
-			resultValue = resultValue / ms.Range.Seconds()
-		}
-		enh.out = append(enh.out, Sample{
-			Point: Point{V: resultValue},
-		})
+	// No sense in trying to compute a rate without at least two points. Drop
+	// this Vector element.
+	if len(samples.Points) < 2 {
+		return enh.out
 	}
-	return enh.out
+	var (
+		counterCorrection float64
+		lastValue         float64
+	)
+	for _, sample := range samples.Points {
+		if isCounter && sample.V < lastValue {
+			counterCorrection += lastValue
+		}
+		lastValue = sample.V
+	}
+	resultValue := lastValue - samples.Points[0].V + counterCorrection
+	// Duration between first/last samples and boundary of range.
+	durationToStart := float64(samples.Points[0].T-rangeStart) / 1000
+	durationToEnd := float64(rangeEnd-samples.Points[len(samples.Points)-1].T) / 1000
+	sampledInterval := float64(samples.Points[len(samples.Points)-1].T-samples.Points[0].T) / 1000
+	averageDurationBetweenSamples := sampledInterval / float64(len(samples.Points)-1)
+	if isCounter && resultValue > 0 && samples.Points[0].V >= 0 {
+		// Counters cannot be negative. If we have any slope at
+		// all (i.e. resultValue went up), we can extrapolate
+		// the zero point of the counter. If the duration to the
+		// zero point is shorter than the durationToStart, we
+		// take the zero point as the start of the series,
+		// thereby avoiding extrapolation to negative counter
+		// values.
+		durationToZero := sampledInterval * (samples.Points[0].V / resultValue)
+		if durationToZero < durationToStart {
+			durationToStart = durationToZero
+		}
+	}
+	// If the first/last samples are close to the boundaries of the range,
+	// extrapolate the result. This is as we expect that another sample
+	// will exist given the spacing between samples we've seen thus far,
+	// with an allowance for noise.
+	extrapolationThreshold := averageDurationBetweenSamples * 1.1
+	extrapolateToInterval := sampledInterval
+	if durationToStart < extrapolationThreshold {
+		extrapolateToInterval += durationToStart
+	} else {
+		extrapolateToInterval += averageDurationBetweenSamples / 2
+	}
+	if durationToEnd < extrapolationThreshold {
+		extrapolateToInterval += durationToEnd
+	} else {
+		extrapolateToInterval += averageDurationBetweenSamples / 2
+	}
+	resultValue = resultValue * (extrapolateToInterval / sampledInterval)
+	if isRate {
+		resultValue = resultValue / ms.Range.Seconds()
+	}
+	return append(enh.out, Sample{
+		Point: Point{V: resultValue},
+	})
 }
 // === delta(Matrix ValueTypeMatrix) Vector ===
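
The extrapolation arithmetic itself is untouched by this commit; to make the hunk easier to review, here is a worked pass with invented numbers (three counter samples over a 60s range):

package main

import "fmt"

func main() {
	// Invented input: counter samples (10s, 5), (30s, 10), (50s, 15)
	// evaluated over the 60s range [0s, 60s].
	resultValue := 15.0 - 5.0      // last - first, no counter resets
	durationToStart := 10.0 - 0.0  // range start to first sample
	durationToEnd := 60.0 - 50.0   // last sample to range end
	sampledInterval := 50.0 - 10.0 // first to last sample: 40s
	averageDurationBetweenSamples := sampledInterval / 2          // 20s
	extrapolationThreshold := averageDurationBetweenSamples * 1.1 // 22s

	// durationToZero = 40 * (5 / 10) = 20s, which is not below
	// durationToStart, so the zero-point correction changes nothing here.

	extrapolateToInterval := sampledInterval
	if durationToStart < extrapolationThreshold {
		extrapolateToInterval += durationToStart // 10s gap: extrapolate to the boundary
	}
	if durationToEnd < extrapolationThreshold {
		extrapolateToInterval += durationToEnd // likewise at the end: total 60s
	}
	resultValue *= extrapolateToInterval / sampledInterval // 10 * 60/40 = 15
	fmt.Println(resultValue / 60)                          // rate: 0.25 per second
}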
@@ -165,40 +162,38 @@ func funcIdelta(vals []Value, args Expressions, enh *EvalNodeHelper) Vector {
 }
 func instantValue(vals []Value, out Vector, isRate bool) Vector {
-	for _, samples := range vals[0].(Matrix) {
-		// No sense in trying to compute a rate without at least two points. Drop
-		// this Vector element.
-		if len(samples.Points) < 2 {
-			continue
-		}
-		lastSample := samples.Points[len(samples.Points)-1]
-		previousSample := samples.Points[len(samples.Points)-2]
-		var resultValue float64
-		if isRate && lastSample.V < previousSample.V {
-			// Counter reset.
-			resultValue = lastSample.V
-		} else {
-			resultValue = lastSample.V - previousSample.V
-		}
-		sampledInterval := lastSample.T - previousSample.T
-		if sampledInterval == 0 {
-			// Avoid dividing by 0.
-			continue
-		}
-		if isRate {
-			// Convert to per-second.
-			resultValue /= float64(sampledInterval) / 1000
-		}
-		out = append(out, Sample{
-			Point: Point{V: resultValue},
-		})
+	samples := vals[0].(Matrix)[0]
+	// No sense in trying to compute a rate without at least two points. Drop
+	// this Vector element.
+	if len(samples.Points) < 2 {
+		return out
 	}
-	return out
+	lastSample := samples.Points[len(samples.Points)-1]
+	previousSample := samples.Points[len(samples.Points)-2]
+	var resultValue float64
+	if isRate && lastSample.V < previousSample.V {
+		// Counter reset.
+		resultValue = lastSample.V
+	} else {
+		resultValue = lastSample.V - previousSample.V
+	}
+	sampledInterval := lastSample.T - previousSample.T
+	if sampledInterval == 0 {
+		// Avoid dividing by 0.
+		return out
+	}
+	if isRate {
+		// Convert to per-second.
+		resultValue /= float64(sampledInterval) / 1000
+	}
+	return append(out, Sample{
+		Point: Point{V: resultValue},
+	})
 }
 // Calculate the trend value at the given index i in raw data d.
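
Unlike extrapolatedRate, instantValue (the helper behind idelta and irate) only ever reads the last two points of the range. With final samples (t = 58000ms, v = 100) and (t = 59000ms, v = 102), numbers invented for illustration:

    resultValue = 102 - 100 = 2
    irate       = 2 / ((59000 - 58000) / 1000) = 2 per second

If the last value is below the previous one, irate treats it as a counter reset and uses the last value itself as the delta.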
@@ -223,7 +218,7 @@ func calcTrendValue(i int, sf, tf, s0, s1, b float64) float64 {
 // how trends in historical data will affect the current data. A higher trend factor increases the influence.
 // of trends. Algorithm taken from https://en.wikipedia.org/wiki/Exponential_smoothing titled: "Double exponential smoothing".
 func funcHoltWinters(vals []Value, args Expressions, enh *EvalNodeHelper) Vector {
-	mat := vals[0].(Matrix)
+	samples := vals[0].(Matrix)[0]
 	// The smoothing factor argument.
 	sf := vals[1].(Vector)[0].V
@@ -239,40 +234,35 @@ func funcHoltWinters(vals []Value, args Expressions, enh *EvalNodeHelper) Vector
 		panic(errors.Errorf("invalid trend factor. Expected: 0 < tf < 1, got: %f", tf))
 	}
-	var l int
-	for _, samples := range mat {
-		l = len(samples.Points)
+	l := len(samples.Points)
 
-		// Can't do the smoothing operation with less than two points.
-		if l < 2 {
-			continue
-		}
-		var s0, s1, b float64
-		// Set initial values.
-		s1 = samples.Points[0].V
-		b = samples.Points[1].V - samples.Points[0].V
-		// Run the smoothing operation.
-		var x, y float64
-		for i := 1; i < l; i++ {
-			// Scale the raw value against the smoothing factor.
-			x = sf * samples.Points[i].V
-			// Scale the last smoothed value with the trend at this point.
-			b = calcTrendValue(i-1, sf, tf, s0, s1, b)
-			y = (1 - sf) * (s1 + b)
-			s0, s1 = s1, x+y
-		}
-		enh.out = append(enh.out, Sample{
-			Point: Point{V: s1},
-		})
+	// Can't do the smoothing operation with less than two points.
+	if l < 2 {
+		return enh.out
 	}
-	return enh.out
+	var s0, s1, b float64
+	// Set initial values.
+	s1 = samples.Points[0].V
+	b = samples.Points[1].V - samples.Points[0].V
+	// Run the smoothing operation.
+	var x, y float64
+	for i := 1; i < l; i++ {
+		// Scale the raw value against the smoothing factor.
+		x = sf * samples.Points[i].V
+		// Scale the last smoothed value with the trend at this point.
+		b = calcTrendValue(i-1, sf, tf, s0, s1, b)
+		y = (1 - sf) * (s1 + b)
+		s0, s1 = s1, x+y
+	}
+	return append(enh.out, Sample{
+		Point: Point{V: s1},
+	})
 }
 // === sort(node ValueTypeVector) Vector ===
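
The smoothing loop is a direct transcription of the double exponential smoothing recurrences from the Wikipedia article cited in the comment; x and y in the code are the two terms of the first equation, and calcTrendValue supplies the second (with a special case at i = 0):

    s_i = sf * x_i + (1 - sf) * (s_{i-1} + b_{i-1})
    b_i = tf * (s_i - s_{i-1}) + (1 - tf) * b_{i-1}

where x_i is the i-th sample value, sf the smoothing factor and tf the trend factor.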
@@ -355,18 +345,11 @@ func funcScalar(vals []Value, args Expressions, enh *EvalNodeHelper) Vector {
 }
 func aggrOverTime(vals []Value, enh *EvalNodeHelper, aggrFn func([]Point) float64) Vector {
-	mat := vals[0].(Matrix)
+	el := vals[0].(Matrix)[0]
 
-	for _, el := range mat {
-		if len(el.Points) == 0 {
-			continue
-		}
-		enh.out = append(enh.out, Sample{
-			Point: Point{V: aggrFn(el.Points)},
-		})
-	}
-	return enh.out
+	return append(enh.out, Sample{
+		Point: Point{V: aggrFn(el.Points)},
+	})
 }
 // === avg_over_time(Matrix ValueTypeMatrix) Vector ===
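
aggrOverTime is the shared scaffold behind the *_over_time functions; each of them passes a closure over the series' points. A sketch of a caller in the style of this file, paraphrasing funcSumOverTime (the exact body in functions.go may differ slightly):

func funcSumOverTime(vals []Value, args Expressions, enh *EvalNodeHelper) Vector {
	return aggrOverTime(vals, enh, func(values []Point) float64 {
		var sum float64
		for _, v := range values {
			sum += v.V
		}
		return sum
	})
}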
@@ -429,22 +412,15 @@ func funcSumOverTime(vals []Value, args Expressions, enh *EvalNodeHelper) Vector
 // === quantile_over_time(Matrix ValueTypeMatrix) Vector ===
 func funcQuantileOverTime(vals []Value, args Expressions, enh *EvalNodeHelper) Vector {
 	q := vals[0].(Vector)[0].V
-	mat := vals[1].(Matrix)
+	el := vals[1].(Matrix)[0]
 
-	for _, el := range mat {
-		if len(el.Points) == 0 {
-			continue
-		}
-		values := make(vectorByValueHeap, 0, len(el.Points))
-		for _, v := range el.Points {
-			values = append(values, Sample{Point: Point{V: v.V}})
-		}
-		enh.out = append(enh.out, Sample{
-			Point: Point{V: quantile(q, values)},
-		})
+	values := make(vectorByValueHeap, 0, len(el.Points))
+	for _, v := range el.Points {
+		values = append(values, Sample{Point: Point{V: v.V}})
 	}
-	return enh.out
+	return append(enh.out, Sample{
+		Point: Point{V: quantile(q, values)},
+	})
 }
 // === stddev_over_time(Matrix ValueTypeMatrix) Vector ===
@@ -588,44 +564,38 @@ func linearRegression(samples []Point, interceptTime int64) (slope, intercept fl
 // === deriv(node ValueTypeMatrix) Vector ===
 func funcDeriv(vals []Value, args Expressions, enh *EvalNodeHelper) Vector {
-	mat := vals[0].(Matrix)
+	samples := vals[0].(Matrix)[0]
 
-	for _, samples := range mat {
-		// No sense in trying to compute a derivative without at least two points.
-		// Drop this Vector element.
-		if len(samples.Points) < 2 {
-			continue
-		}
-		// We pass in an arbitrary timestamp that is near the values in use
-		// to avoid floating point accuracy issues, see
-		// https://github.com/prometheus/prometheus/issues/2674
-		slope, _ := linearRegression(samples.Points, samples.Points[0].T)
-		enh.out = append(enh.out, Sample{
-			Point: Point{V: slope},
-		})
+	// No sense in trying to compute a derivative without at least two points.
+	// Drop this Vector element.
+	if len(samples.Points) < 2 {
+		return enh.out
 	}
-	return enh.out
+	// We pass in an arbitrary timestamp that is near the values in use
+	// to avoid floating point accuracy issues, see
+	// https://github.com/prometheus/prometheus/issues/2674
+	slope, _ := linearRegression(samples.Points, samples.Points[0].T)
+	return append(enh.out, Sample{
+		Point: Point{V: slope},
+	})
 }
 // === predict_linear(node ValueTypeMatrix, k ValueTypeScalar) Vector ===
 func funcPredictLinear(vals []Value, args Expressions, enh *EvalNodeHelper) Vector {
-	mat := vals[0].(Matrix)
+	samples := vals[0].(Matrix)[0]
 	duration := vals[1].(Vector)[0].V
-	for _, samples := range mat {
-		// No sense in trying to predict anything without at least two points.
-		// Drop this Vector element.
-		if len(samples.Points) < 2 {
-			continue
-		}
-		slope, intercept := linearRegression(samples.Points, enh.ts)
-		enh.out = append(enh.out, Sample{
-			Point: Point{V: slope*duration + intercept},
-		})
+	// No sense in trying to predict anything without at least two points.
+	// Drop this Vector element.
+	if len(samples.Points) < 2 {
+		return enh.out
 	}
-	return enh.out
+	slope, intercept := linearRegression(samples.Points, enh.ts)
+	return append(enh.out, Sample{
+		Point: Point{V: slope*duration + intercept},
+	})
 }
 // === histogram_quantile(k ValueTypeScalar, Vector ValueTypeVector) Vector ===
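
linearRegression returns the least-squares slope and the intercept at the supplied reference time; funcPredictLinear anchors the intercept at enh.ts (the evaluation time), so the prediction is simply

    predicted = slope * duration + intercept

For a series growing steadily at 0.5/s whose fitted value at evaluation time is 10, predict_linear(v[5m], 600) yields 0.5 * 600 + 10 = 310 (numbers invented for illustration).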
@@ -678,46 +648,40 @@ func funcHistogramQuantile(vals []Value, args Expressions, enh *EvalNodeHelper)
 // === resets(Matrix ValueTypeMatrix) Vector ===
 func funcResets(vals []Value, args Expressions, enh *EvalNodeHelper) Vector {
-	in := vals[0].(Matrix)
+	samples := vals[0].(Matrix)[0]
 
-	for _, samples := range in {
-		resets := 0
-		prev := samples.Points[0].V
-		for _, sample := range samples.Points[1:] {
-			current := sample.V
-			if current < prev {
-				resets++
-			}
-			prev = current
+	resets := 0
+	prev := samples.Points[0].V
+	for _, sample := range samples.Points[1:] {
+		current := sample.V
+		if current < prev {
+			resets++
 		}
-		enh.out = append(enh.out, Sample{
-			Point: Point{V: float64(resets)},
-		})
+		prev = current
 	}
-	return enh.out
+	return append(enh.out, Sample{
+		Point: Point{V: float64(resets)},
+	})
 }
 // === changes(Matrix ValueTypeMatrix) Vector ===
 func funcChanges(vals []Value, args Expressions, enh *EvalNodeHelper) Vector {
-	in := vals[0].(Matrix)
+	samples := vals[0].(Matrix)[0]
 
-	for _, samples := range in {
-		changes := 0
-		prev := samples.Points[0].V
-		for _, sample := range samples.Points[1:] {
-			current := sample.V
-			if current != prev && !(math.IsNaN(current) && math.IsNaN(prev)) {
-				changes++
-			}
-			prev = current
+	changes := 0
+	prev := samples.Points[0].V
+	for _, sample := range samples.Points[1:] {
+		current := sample.V
+		if current != prev && !(math.IsNaN(current) && math.IsNaN(prev)) {
+			changes++
 		}
-		enh.out = append(enh.out, Sample{
-			Point: Point{V: float64(changes)},
-		})
+		prev = current
 	}
-	return enh.out
+	return append(enh.out, Sample{
+		Point: Point{V: float64(changes)},
	})
 }
 // === label_replace(Vector ValueTypeVector, dst_label, replacement, src_labelname, regex ValueTypeString) Vector ===
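
The two functions differ only in their comparison. For the invented sample values 5, 3, 3, 7, 2:

    resets:  2  (only the decreases 5→3 and 7→2 count)
    changes: 3  (5→3, 3→7 and 7→2; the 3→3 step is not a change)

The math.IsNaN guard in funcChanges keeps a run of consecutive NaN points from being counted as a change at every step.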
@@ -1297,7 +1261,7 @@ func createLabelsForAbsentFunction(expr Expr) labels.Labels {
 	}
 	for _, v := range empty {
-		m = labels.NewBuilder(m).Set(v, "").Labels()
+		m = labels.NewBuilder(m).Del(v).Labels()
 	}
 	return m
 }