Implement topk() and bottomk() functions.

To achieve O(log n * k) runtime, this uses a heap to track the current
bottom-k or top-k elements while iterating over the full set of
available elements.

It would be possible to reuse more code between topk and bottomk, but I
decided for some more duplication for the sake of clarity.

This fixes https://github.com/prometheus/prometheus/issues/399

Change-Id: I7487ddaadbe7acb22ca2cf2283ba6e7915f2b336
This commit is contained in:
Julius Volz 2014-08-05 18:57:47 +02:00 committed by Bjoern Rabenstein
parent 1909686789
commit c582ae73c2
3 changed files with 112 additions and 54 deletions

View File

@ -14,6 +14,7 @@
package ast
import (
"container/heap"
"fmt"
"math"
"sort"
@ -22,7 +23,6 @@ import (
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/storage/metric"
"github.com/prometheus/prometheus/utility"
)
// Function represents a function of the expression language and is
@ -138,7 +138,7 @@ func deltaImpl(timestamp clientmodel.Timestamp, view *viewAdapter, args []Node)
return resultVector
}
// === rate(node *MatrixNode) Vector ===
// === rate(node MatrixNode) Vector ===
func rateImpl(timestamp clientmodel.Timestamp, view *viewAdapter, args []Node) interface{} {
args = append(args, &ScalarLiteral{value: 1})
vector := deltaImpl(timestamp, view, args).(Vector)
@ -153,40 +153,97 @@ func rateImpl(timestamp clientmodel.Timestamp, view *viewAdapter, args []Node) i
return vector
}
type vectorByValueSorter struct {
vector Vector
type vectorByValueHeap Vector
func (s vectorByValueHeap) Len() int {
return len(s)
}
func (sorter vectorByValueSorter) Len() int {
return len(sorter.vector)
func (s vectorByValueHeap) Less(i, j int) bool {
return s[i].Value < s[j].Value
}
func (sorter vectorByValueSorter) Less(i, j int) bool {
return sorter.vector[i].Value < sorter.vector[j].Value
func (s vectorByValueHeap) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
func (sorter vectorByValueSorter) Swap(i, j int) {
sorter.vector[i], sorter.vector[j] = sorter.vector[j], sorter.vector[i]
func (s *vectorByValueHeap) Push(x interface{}) {
*s = append(*s, x.(*clientmodel.Sample))
}
// === sort(node *VectorNode) Vector ===
func (s *vectorByValueHeap) Pop() interface{} {
old := *s
n := len(old)
el := old[n-1]
*s = old[0 : n-1]
return el
}
type reverseHeap struct {
heap.Interface
}
func (s reverseHeap) Less(i, j int) bool {
return s.Interface.Less(j, i)
}
// === sort(node VectorNode) Vector ===
func sortImpl(timestamp clientmodel.Timestamp, view *viewAdapter, args []Node) interface{} {
byValueSorter := vectorByValueSorter{
vector: args[0].(VectorNode).Eval(timestamp, view),
}
byValueSorter := vectorByValueHeap(args[0].(VectorNode).Eval(timestamp, view))
sort.Sort(byValueSorter)
return byValueSorter.vector
return Vector(byValueSorter)
}
// === sortDesc(node *VectorNode) Vector ===
// === sortDesc(node VectorNode) Vector ===
func sortDescImpl(timestamp clientmodel.Timestamp, view *viewAdapter, args []Node) interface{} {
descByValueSorter := utility.ReverseSorter{
Interface: vectorByValueSorter{
vector: args[0].(VectorNode).Eval(timestamp, view),
},
byValueSorter := vectorByValueHeap(args[0].(VectorNode).Eval(timestamp, view))
sort.Sort(sort.Reverse(byValueSorter))
return Vector(byValueSorter)
}
// === topk(k ScalarNode, node VectorNode) Vector ===
func topkImpl(timestamp clientmodel.Timestamp, view *viewAdapter, args []Node) interface{} {
k := int(args[0].(ScalarNode).Eval(timestamp, view))
if k < 1 {
return Vector{}
}
sort.Sort(descByValueSorter)
return descByValueSorter.Interface.(vectorByValueSorter).vector
topk := make(vectorByValueHeap, 0, k)
vector := args[1].(VectorNode).Eval(timestamp, view)
for _, el := range vector {
if len(topk) < k || topk[0].Value < el.Value {
if len(topk) == k {
heap.Pop(&topk)
}
heap.Push(&topk, el)
}
}
sort.Sort(sort.Reverse(topk))
return Vector(topk)
}
// === bottomk(k ScalarNode, node VectorNode) Vector ===
func bottomkImpl(timestamp clientmodel.Timestamp, view *viewAdapter, args []Node) interface{} {
k := int(args[0].(ScalarNode).Eval(timestamp, view))
if k < 1 {
return Vector{}
}
bottomk := make(vectorByValueHeap, 0, k)
bkHeap := reverseHeap{Interface: &bottomk}
vector := args[1].(VectorNode).Eval(timestamp, view)
for _, el := range vector {
if len(bottomk) < k || bottomk[0].Value > el.Value {
if len(bottomk) == k {
heap.Pop(&bkHeap)
}
heap.Push(&bkHeap, el)
}
}
sort.Sort(bottomk)
return Vector(bottomk)
}
// === sampleVectorImpl() Vector ===
@ -262,7 +319,7 @@ func sampleVectorImpl(timestamp clientmodel.Timestamp, view *viewAdapter, args [
}
}
// === scalar(node *VectorNode) Scalar ===
// === scalar(node VectorNode) Scalar ===
func scalarImpl(timestamp clientmodel.Timestamp, view *viewAdapter, args []Node) interface{} {
v := args[0].(VectorNode).Eval(timestamp, view)
if len(v) != 1 {
@ -369,6 +426,12 @@ var functions = map[string]*Function{
returnType: VECTOR,
callFn: avgOverTimeImpl,
},
"bottomk": {
name: "bottomk",
argTypes: []ExprType{SCALAR, VECTOR},
returnType: VECTOR,
callFn: bottomkImpl,
},
"count_over_time": {
name: "count_over_time",
argTypes: []ExprType{MATRIX},
@ -441,6 +504,12 @@ var functions = map[string]*Function{
returnType: SCALAR,
callFn: timeImpl,
},
"topk": {
name: "topk",
argTypes: []ExprType{SCALAR, VECTOR},
returnType: VECTOR,
callFn: topkImpl,
},
}
// GetFunction returns a predefined Function object for the given

View File

@ -323,6 +323,26 @@ func TestExpressions(t *testing.T) {
checkOrder: true,
fullRanges: 0,
intervalRanges: 8,
}, {
expr: `topk(3, http_requests)`,
output: []string{
`http_requests{group="canary", instance="1", job="app-server"} => 800 @[%v]`,
`http_requests{group="canary", instance="0", job="app-server"} => 700 @[%v]`,
`http_requests{group="production", instance="1", job="app-server"} => 600 @[%v]`,
},
checkOrder: true,
fullRanges: 0,
intervalRanges: 8,
}, {
expr: `bottomk(3, http_requests)`,
output: []string{
`http_requests{group="production", instance="0", job="api-server"} => 100 @[%v]`,
`http_requests{group="production", instance="1", job="api-server"} => 200 @[%v]`,
`http_requests{group="canary", instance="0", job="api-server"} => 300 @[%v]`,
},
checkOrder: true,
fullRanges: 0,
intervalRanges: 8,
}, {
// Single-letter label names and values.
expr: `x{y="testvalue"}`,

View File

@ -1,31 +0,0 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package utility
import (
"sort"
)
// ReverseSorter embeds a sort.Interface value and implements a reverse sort
// over that value.
type ReverseSorter struct {
// This embedded interface permits ReverseSorter to use the methods of
// another Interface implementation.
sort.Interface
}
// Less returns the opposite of the embedded implementation's Less method.
func (r ReverseSorter) Less(i, j int) bool {
return r.Interface.Less(j, i)
}