From b3bda7df4b7d1e5e47edb0bb3b7ea9d3057fefdc Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Fri, 1 Mar 2024 16:02:54 +0000 Subject: [PATCH] promql: aggregations: skip copying input to a Vector We can work directly from the inputMatrix on each timestep. Signed-off-by: Bryan Boreham --- promql/engine.go | 55 ++++++++++++++++++++---------------------------- 1 file changed, 23 insertions(+), 32 deletions(-) diff --git a/promql/engine.go b/promql/engine.go index 2e5f620e3..5fb1d849d 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -1311,7 +1311,6 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping // Keep a copy of the original point slice so that it can be returned to the pool. origMatrix := inputMatrix - var vector Vector // Input vectors for the function. biggestLen := len(inputMatrix) enh := &EvalNodeHelper{} seriess := make(map[uint64]Series, biggestLen) // Output series by series hash. @@ -1321,7 +1320,6 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping buf := make([]byte, 0, 1024) seriesHelper := make([]EvalSeriesHelper, len(inputMatrix)) - bufHelper := make([]EvalSeriesHelper, len(inputMatrix)) for si, series := range inputMatrix { seriesHelper[si].groupingKey, buf = generateGroupingKey(series.Metric, sortedGrouping, aggExpr.Without, buf) @@ -1333,36 +1331,10 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping } // Reset number of samples in memory after each timestamp. ev.currentSamples = tempNumSamples - // Gather input vectors for this timestamp. - { - vector = vector[:0] - bufHelper = bufHelper[:0] - - for si, series := range inputMatrix { - switch { - case len(series.Floats) > 0 && series.Floats[0].T == ts: - vector = append(vector, Sample{Metric: series.Metric, F: series.Floats[0].F, T: ts}) - // Move input vectors forward so we don't have to re-scan the same - // past points at the next step. - inputMatrix[si].Floats = series.Floats[1:] - case len(series.Histograms) > 0 && series.Histograms[0].T == ts: - vector = append(vector, Sample{Metric: series.Metric, H: series.Histograms[0].H, T: ts}) - inputMatrix[si].Histograms = series.Histograms[1:] - default: - continue - } - bufHelper = append(bufHelper, seriesHelper[si]) - ev.currentSamples++ - if ev.currentSamples > ev.maxSamples { - ev.error(ErrTooManySamples(env)) - } - } - ev.samplesStats.UpdatePeak(ev.currentSamples) - } // Make the function call. enh.Ts = ts - result, ws := ev.aggregation(aggExpr, sortedGrouping, param, vector, bufHelper, enh, seriess) + result, ws := ev.aggregation(aggExpr, sortedGrouping, param, inputMatrix, seriesHelper, enh, seriess) warnings.Merge(ws) @@ -2726,7 +2698,7 @@ type groupedAggregation struct { // aggregation evaluates an aggregation operation on a Vector. The provided grouping labels // must be sorted. -func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, param interface{}, vec Vector, seriesHelper []EvalSeriesHelper, enh *EvalNodeHelper, seriess map[uint64]Series) (Matrix, annotations.Annotations) { +func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, param interface{}, inputMatrix Matrix, seriesHelper []EvalSeriesHelper, enh *EvalNodeHelper, seriess map[uint64]Series) (Matrix, annotations.Annotations) { op := e.Op without := e.Without var annos annotations.Annotations @@ -2748,7 +2720,26 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, par q = param.(float64) } - for si, s := range vec { + for si, series := range inputMatrix { + var s Sample + + switch { + case len(series.Floats) > 0 && series.Floats[0].T == enh.Ts: + s = Sample{Metric: series.Metric, F: series.Floats[0].F, T: enh.Ts} + // Move input vectors forward so we don't have to re-scan the same + // past points at the next step. + inputMatrix[si].Floats = series.Floats[1:] + case len(series.Histograms) > 0 && series.Histograms[0].T == enh.Ts: + s = Sample{Metric: series.Metric, H: series.Histograms[0].H, T: enh.Ts} + inputMatrix[si].Histograms = series.Histograms[1:] + default: + continue + } + ev.currentSamples++ + if ev.currentSamples > ev.maxSamples { + ev.error(ErrTooManySamples(env)) + } + metric := s.Metric groupingKey := seriesHelper[si].groupingKey @@ -2775,7 +2766,7 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, par newAgg.groupCount = 0 } - inputVecLen := int64(len(vec)) + inputVecLen := int64(len(inputMatrix)) resultSize := k switch { case k > inputVecLen: