promql: aggregations: skip copying input to a Vector

We can work directly from the inputMatrix on each timestep.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
This commit is contained in:
Bryan Boreham 2024-03-01 16:02:54 +00:00
parent c9b6c4c55a
commit b3bda7df4b
1 changed files with 23 additions and 32 deletions

View File

@ -1311,7 +1311,6 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping
// Keep a copy of the original point slice so that it can be returned to the pool. // Keep a copy of the original point slice so that it can be returned to the pool.
origMatrix := inputMatrix origMatrix := inputMatrix
var vector Vector // Input vectors for the function.
biggestLen := len(inputMatrix) biggestLen := len(inputMatrix)
enh := &EvalNodeHelper{} enh := &EvalNodeHelper{}
seriess := make(map[uint64]Series, biggestLen) // Output series by series hash. seriess := make(map[uint64]Series, biggestLen) // Output series by series hash.
@ -1321,7 +1320,6 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping
buf := make([]byte, 0, 1024) buf := make([]byte, 0, 1024)
seriesHelper := make([]EvalSeriesHelper, len(inputMatrix)) seriesHelper := make([]EvalSeriesHelper, len(inputMatrix))
bufHelper := make([]EvalSeriesHelper, len(inputMatrix))
for si, series := range inputMatrix { for si, series := range inputMatrix {
seriesHelper[si].groupingKey, buf = generateGroupingKey(series.Metric, sortedGrouping, aggExpr.Without, buf) seriesHelper[si].groupingKey, buf = generateGroupingKey(series.Metric, sortedGrouping, aggExpr.Without, buf)
@ -1333,36 +1331,10 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping
} }
// Reset number of samples in memory after each timestamp. // Reset number of samples in memory after each timestamp.
ev.currentSamples = tempNumSamples ev.currentSamples = tempNumSamples
// Gather input vectors for this timestamp.
{
vector = vector[:0]
bufHelper = bufHelper[:0]
for si, series := range inputMatrix {
switch {
case len(series.Floats) > 0 && series.Floats[0].T == ts:
vector = append(vector, Sample{Metric: series.Metric, F: series.Floats[0].F, T: ts})
// Move input vectors forward so we don't have to re-scan the same
// past points at the next step.
inputMatrix[si].Floats = series.Floats[1:]
case len(series.Histograms) > 0 && series.Histograms[0].T == ts:
vector = append(vector, Sample{Metric: series.Metric, H: series.Histograms[0].H, T: ts})
inputMatrix[si].Histograms = series.Histograms[1:]
default:
continue
}
bufHelper = append(bufHelper, seriesHelper[si])
ev.currentSamples++
if ev.currentSamples > ev.maxSamples {
ev.error(ErrTooManySamples(env))
}
}
ev.samplesStats.UpdatePeak(ev.currentSamples)
}
// Make the function call. // Make the function call.
enh.Ts = ts enh.Ts = ts
result, ws := ev.aggregation(aggExpr, sortedGrouping, param, vector, bufHelper, enh, seriess) result, ws := ev.aggregation(aggExpr, sortedGrouping, param, inputMatrix, seriesHelper, enh, seriess)
warnings.Merge(ws) warnings.Merge(ws)
@ -2726,7 +2698,7 @@ type groupedAggregation struct {
// aggregation evaluates an aggregation operation on a Vector. The provided grouping labels // aggregation evaluates an aggregation operation on a Vector. The provided grouping labels
// must be sorted. // must be sorted.
func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, param interface{}, vec Vector, seriesHelper []EvalSeriesHelper, enh *EvalNodeHelper, seriess map[uint64]Series) (Matrix, annotations.Annotations) { func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, param interface{}, inputMatrix Matrix, seriesHelper []EvalSeriesHelper, enh *EvalNodeHelper, seriess map[uint64]Series) (Matrix, annotations.Annotations) {
op := e.Op op := e.Op
without := e.Without without := e.Without
var annos annotations.Annotations var annos annotations.Annotations
@ -2748,7 +2720,26 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, par
q = param.(float64) q = param.(float64)
} }
for si, s := range vec { for si, series := range inputMatrix {
var s Sample
switch {
case len(series.Floats) > 0 && series.Floats[0].T == enh.Ts:
s = Sample{Metric: series.Metric, F: series.Floats[0].F, T: enh.Ts}
// Move input vectors forward so we don't have to re-scan the same
// past points at the next step.
inputMatrix[si].Floats = series.Floats[1:]
case len(series.Histograms) > 0 && series.Histograms[0].T == enh.Ts:
s = Sample{Metric: series.Metric, H: series.Histograms[0].H, T: enh.Ts}
inputMatrix[si].Histograms = series.Histograms[1:]
default:
continue
}
ev.currentSamples++
if ev.currentSamples > ev.maxSamples {
ev.error(ErrTooManySamples(env))
}
metric := s.Metric metric := s.Metric
groupingKey := seriesHelper[si].groupingKey groupingKey := seriesHelper[si].groupingKey
@ -2775,7 +2766,7 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, par
newAgg.groupCount = 0 newAgg.groupCount = 0
} }
inputVecLen := int64(len(vec)) inputVecLen := int64(len(inputMatrix))
resultSize := k resultSize := k
switch { switch {
case k > inputVecLen: case k > inputVecLen: