diff --git a/promql/bench_test.go b/promql/bench_test.go index 8abfcfdd2..88025d932 100644 --- a/promql/bench_test.go +++ b/promql/bench_test.go @@ -157,6 +157,9 @@ func rangeQueryCases() []benchCase { { expr: "topk(1, a_X)", }, + { + expr: "topk(5, a_X)", + }, // Combinations. { expr: "rate(a_X[1m]) + rate(b_X[1m])", diff --git a/promql/engine.go b/promql/engine.go index a93bd4fc9..ad30b1550 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -2507,39 +2507,39 @@ func (ev *evaluator) aggregation(op parser.ItemType, grouping []string, without group.value += delta * (s.V - group.mean) case parser.TOPK: - if int64(len(group.heap)) < k || group.heap[0].V < s.V || math.IsNaN(group.heap[0].V) { - if int64(len(group.heap)) == k { - if k == 1 { // For k==1 we can replace in-situ. - group.heap[0] = Sample{ - Point: Point{V: s.V}, - Metric: s.Metric, - } - break - } - heap.Pop(&group.heap) - } + // We build a heap of up to k elements, with the smallest element at heap[0]. + if int64(len(group.heap)) < k { heap.Push(&group.heap, &Sample{ Point: Point{V: s.V}, Metric: s.Metric, }) + } else if group.heap[0].V < s.V || (math.IsNaN(group.heap[0].V) && !math.IsNaN(s.V)) { + // This new element is bigger than the previous smallest element - overwrite that. + group.heap[0] = Sample{ + Point: Point{V: s.V}, + Metric: s.Metric, + } + if k > 1 { + heap.Fix(&group.heap, 0) // Maintain the heap invariant. + } } case parser.BOTTOMK: - if int64(len(group.reverseHeap)) < k || group.reverseHeap[0].V > s.V || math.IsNaN(group.reverseHeap[0].V) { - if int64(len(group.reverseHeap)) == k { - if k == 1 { // For k==1 we can replace in-situ. - group.reverseHeap[0] = Sample{ - Point: Point{V: s.V}, - Metric: s.Metric, - } - break - } - heap.Pop(&group.reverseHeap) - } + // We build a heap of up to k elements, with the biggest element at heap[0]. + if int64(len(group.reverseHeap)) < k { heap.Push(&group.reverseHeap, &Sample{ Point: Point{V: s.V}, Metric: s.Metric, }) + } else if group.reverseHeap[0].V > s.V || (math.IsNaN(group.reverseHeap[0].V) && !math.IsNaN(s.V)) { + // This new element is smaller than the previous biggest element - overwrite that. + group.reverseHeap[0] = Sample{ + Point: Point{V: s.V}, + Metric: s.Metric, + } + if k > 1 { + heap.Fix(&group.reverseHeap, 0) // Maintain the heap invariant. + } } case parser.QUANTILE: