promql: aggregations: output directly to matrix for instant queries

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
This commit is contained in:
Bryan Boreham 2024-02-29 23:39:29 +00:00
parent 3851b74db1
commit c9b6c4c55a

View File

@ -1313,7 +1313,7 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping
var vector Vector // Input vectors for the function.
biggestLen := len(inputMatrix)
enh := &EvalNodeHelper{Out: make(Vector, 0, biggestLen)}
enh := &EvalNodeHelper{}
seriess := make(map[uint64]Series, biggestLen) // Output series by series hash.
tempNumSamples := ev.currentSamples
@ -1364,22 +1364,13 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping
enh.Ts = ts
result, ws := ev.aggregation(aggExpr, sortedGrouping, param, vector, bufHelper, enh, seriess)
enh.Out = result[:0] // Reuse result vector.
warnings.Merge(ws)
// If this could be an instant query, shortcut so as not to change sort order.
if ev.endTimestamp == ev.startTimestamp {
mat := make(Matrix, len(result))
for i, s := range result {
if s.H == nil {
mat[i] = Series{Metric: s.Metric, Floats: []FPoint{{T: ts, F: s.F}}}
} else {
mat[i] = Series{Metric: s.Metric, Histograms: []HPoint{{T: ts, H: s.H}}}
}
}
ev.currentSamples = originalNumSamples + mat.TotalSamples()
ev.currentSamples = originalNumSamples + result.TotalSamples()
ev.samplesStats.UpdatePeak(ev.currentSamples)
return mat, warnings
return result, warnings
}
if ev.currentSamples > ev.maxSamples {
ev.error(ErrTooManySamples(env))
@ -2735,7 +2726,7 @@ type groupedAggregation struct {
// aggregation evaluates an aggregation operation on a Vector. The provided grouping labels
// must be sorted.
func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, param interface{}, vec Vector, seriesHelper []EvalSeriesHelper, enh *EvalNodeHelper, seriess map[uint64]Series) (Vector, annotations.Annotations) {
func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, param interface{}, vec Vector, seriesHelper []EvalSeriesHelper, enh *EvalNodeHelper, seriess map[uint64]Series) (Matrix, annotations.Annotations) {
op := e.Op
without := e.Without
var annos annotations.Annotations
@ -2749,7 +2740,7 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, par
}
k = int64(f)
if k < 1 {
return Vector{}, annos
return nil, annos
}
}
var q float64
@ -2939,10 +2930,19 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, par
// Construct the result from the aggregated groups.
numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1
var mat Matrix
if ev.endTimestamp == ev.startTimestamp {
mat = make(Matrix, 0, len(orderedResult))
}
add := func(lbls labels.Labels, f float64, h *histogram.FloatHistogram) {
// If this could be an instant query, build a slice so the result is in consistent order.
// If this could be an instant query, add directly to the matrix so the result is in consistent order.
if ev.endTimestamp == ev.startTimestamp {
enh.Out = append(enh.Out, Sample{Metric: lbls, F: f, H: h})
if h == nil {
mat = append(mat, Series{Metric: lbls, Floats: []FPoint{{T: enh.Ts, F: f}}})
} else {
mat = append(mat, Series{Metric: lbls, Histograms: []HPoint{{T: enh.Ts, H: h}}})
}
} else {
// Otherwise the results are added into seriess elements.
hash := lbls.Hash()
@ -3029,7 +3029,7 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, par
add(aggr.labels, aggr.floatValue, aggr.histogramValue)
}
return enh.Out, annos
return mat, annos
}
// aggregationK evaluates count_values on vec.