Optimise topk where k==1 (#9365)

* Add benchmark for query_range with topk

Modify sample data so values within a metric differ

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>

* Optimise topk where k==1

In this case we don't need a heap to keep track of values; just a single
slot is fine.

Simplify the initialization of the heap: since all cases start off as a
single-item heap we can just assign the value directly.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>

* Allow at least one slot in results for topk, quantile

k isn't set for quantile, but we need space to start collecting values

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
This commit is contained in:
Bryan Boreham 2021-09-21 11:27:28 +01:00 committed by GitHub
parent 03d084f862
commit 7d105277fe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 32 additions and 9 deletions

View File

@ -71,7 +71,7 @@ func BenchmarkRangeQuery(b *testing.B) {
a := storage.Appender(context.Background()) a := storage.Appender(context.Background())
ts := int64(s * 10000) // 10s interval. ts := int64(s * 10000) // 10s interval.
for i, metric := range metrics { for i, metric := range metrics {
ref, _ := a.Append(refs[i], metric, ts, float64(s)) ref, _ := a.Append(refs[i], metric, ts, float64(s)+float64(i)/float64(len(metrics)))
refs[i] = ref refs[i] = ref
} }
if err := a.Commit(); err != nil { if err := a.Commit(); err != nil {
@ -159,6 +159,9 @@ func BenchmarkRangeQuery(b *testing.B) {
{ {
expr: "count_values('value', h_X)", expr: "count_values('value', h_X)",
}, },
{
expr: "topk(1, a_X)",
},
// Combinations. // Combinations.
{ {
expr: "rate(a_X[1m]) + rate(b_X[1m])", expr: "rate(a_X[1m]) + rate(b_X[1m])",

View File

@ -2210,22 +2210,24 @@ func (ev *evaluator) aggregation(op parser.ItemType, grouping []string, without
resultSize := k resultSize := k
if k > inputVecLen { if k > inputVecLen {
resultSize = inputVecLen resultSize = inputVecLen
} else if k == 0 {
resultSize = 1
} }
switch op { switch op {
case parser.STDVAR, parser.STDDEV: case parser.STDVAR, parser.STDDEV:
result[groupingKey].value = 0 result[groupingKey].value = 0
case parser.TOPK, parser.QUANTILE: case parser.TOPK, parser.QUANTILE:
result[groupingKey].heap = make(vectorByValueHeap, 0, resultSize) result[groupingKey].heap = make(vectorByValueHeap, 1, resultSize)
heap.Push(&result[groupingKey].heap, &Sample{ result[groupingKey].heap[0] = Sample{
Point: Point{V: s.V}, Point: Point{V: s.V},
Metric: s.Metric, Metric: s.Metric,
}) }
case parser.BOTTOMK: case parser.BOTTOMK:
result[groupingKey].reverseHeap = make(vectorByReverseValueHeap, 0, resultSize) result[groupingKey].reverseHeap = make(vectorByReverseValueHeap, 1, resultSize)
heap.Push(&result[groupingKey].reverseHeap, &Sample{ result[groupingKey].reverseHeap[0] = Sample{
Point: Point{V: s.V}, Point: Point{V: s.V},
Metric: s.Metric, Metric: s.Metric,
}) }
case parser.GROUP: case parser.GROUP:
result[groupingKey].value = 1 result[groupingKey].value = 1
} }
@ -2283,6 +2285,13 @@ func (ev *evaluator) aggregation(op parser.ItemType, grouping []string, without
case parser.TOPK: case parser.TOPK:
if int64(len(group.heap)) < k || group.heap[0].V < s.V || math.IsNaN(group.heap[0].V) { if int64(len(group.heap)) < k || group.heap[0].V < s.V || math.IsNaN(group.heap[0].V) {
if int64(len(group.heap)) == k { if int64(len(group.heap)) == k {
if k == 1 { // For k==1 we can replace in-situ.
group.heap[0] = Sample{
Point: Point{V: s.V},
Metric: s.Metric,
}
break
}
heap.Pop(&group.heap) heap.Pop(&group.heap)
} }
heap.Push(&group.heap, &Sample{ heap.Push(&group.heap, &Sample{
@ -2294,6 +2303,13 @@ func (ev *evaluator) aggregation(op parser.ItemType, grouping []string, without
case parser.BOTTOMK: case parser.BOTTOMK:
if int64(len(group.reverseHeap)) < k || group.reverseHeap[0].V > s.V || math.IsNaN(group.reverseHeap[0].V) { if int64(len(group.reverseHeap)) < k || group.reverseHeap[0].V > s.V || math.IsNaN(group.reverseHeap[0].V) {
if int64(len(group.reverseHeap)) == k { if int64(len(group.reverseHeap)) == k {
if k == 1 { // For k==1 we can replace in-situ.
group.reverseHeap[0] = Sample{
Point: Point{V: s.V},
Metric: s.Metric,
}
break
}
heap.Pop(&group.reverseHeap) heap.Pop(&group.reverseHeap)
} }
heap.Push(&group.reverseHeap, &Sample{ heap.Push(&group.reverseHeap, &Sample{
@ -2327,7 +2343,9 @@ func (ev *evaluator) aggregation(op parser.ItemType, grouping []string, without
case parser.TOPK: case parser.TOPK:
// The heap keeps the lowest value on top, so reverse it. // The heap keeps the lowest value on top, so reverse it.
if len(aggr.heap) > 1 {
sort.Sort(sort.Reverse(aggr.heap)) sort.Sort(sort.Reverse(aggr.heap))
}
for _, v := range aggr.heap { for _, v := range aggr.heap {
enh.Out = append(enh.Out, Sample{ enh.Out = append(enh.Out, Sample{
Metric: v.Metric, Metric: v.Metric,
@ -2338,7 +2356,9 @@ func (ev *evaluator) aggregation(op parser.ItemType, grouping []string, without
case parser.BOTTOMK: case parser.BOTTOMK:
// The heap keeps the highest value on top, so reverse it. // The heap keeps the highest value on top, so reverse it.
if len(aggr.reverseHeap) > 1 {
sort.Sort(sort.Reverse(aggr.reverseHeap)) sort.Sort(sort.Reverse(aggr.reverseHeap))
}
for _, v := range aggr.reverseHeap { for _, v := range aggr.reverseHeap {
enh.Out = append(enh.Out, Sample{ enh.Out = append(enh.Out, Sample{
Metric: v.Metric, Metric: v.Metric,