Implement topk() and bottomk() functions.

To achieve O(n * log k) runtime, this uses a heap of at most k elements
to track the current bottom-k or top-k elements while iterating over the
full set of available elements.

It would be possible to share more code between topk and bottomk, but I
opted for some duplication for the sake of clarity.

This fixes https://github.com/prometheus/prometheus/issues/399

Change-Id: I7487ddaadbe7acb22ca2cf2283ba6e7915f2b336
This commit is contained in:
Julius Volz 2014-08-05 18:57:47 +02:00
parent 24ece38f7c
commit 200d02effe
3 changed files with 112 additions and 54 deletions

View File

@ -14,6 +14,7 @@
package ast package ast
import ( import (
"container/heap"
"fmt" "fmt"
"math" "math"
"sort" "sort"
@ -22,7 +23,6 @@ import (
clientmodel "github.com/prometheus/client_golang/model" clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/storage/metric" "github.com/prometheus/prometheus/storage/metric"
"github.com/prometheus/prometheus/utility"
) )
// Function represents a function of the expression language and is // Function represents a function of the expression language and is
@ -138,7 +138,7 @@ func deltaImpl(timestamp clientmodel.Timestamp, view *viewAdapter, args []Node)
return resultVector return resultVector
} }
// === rate(node *MatrixNode) Vector === // === rate(node MatrixNode) Vector ===
func rateImpl(timestamp clientmodel.Timestamp, view *viewAdapter, args []Node) interface{} { func rateImpl(timestamp clientmodel.Timestamp, view *viewAdapter, args []Node) interface{} {
args = append(args, &ScalarLiteral{value: 1}) args = append(args, &ScalarLiteral{value: 1})
vector := deltaImpl(timestamp, view, args).(Vector) vector := deltaImpl(timestamp, view, args).(Vector)
@ -153,40 +153,97 @@ func rateImpl(timestamp clientmodel.Timestamp, view *viewAdapter, args []Node) i
return vector return vector
} }
type vectorByValueSorter struct { type vectorByValueHeap Vector
vector Vector
func (s vectorByValueHeap) Len() int {
return len(s)
} }
func (sorter vectorByValueSorter) Len() int { func (s vectorByValueHeap) Less(i, j int) bool {
return len(sorter.vector) return s[i].Value < s[j].Value
} }
func (sorter vectorByValueSorter) Less(i, j int) bool { func (s vectorByValueHeap) Swap(i, j int) {
return sorter.vector[i].Value < sorter.vector[j].Value s[i], s[j] = s[j], s[i]
} }
func (sorter vectorByValueSorter) Swap(i, j int) { func (s *vectorByValueHeap) Push(x interface{}) {
sorter.vector[i], sorter.vector[j] = sorter.vector[j], sorter.vector[i] *s = append(*s, x.(*clientmodel.Sample))
} }
// === sort(node *VectorNode) Vector === func (s *vectorByValueHeap) Pop() interface{} {
old := *s
n := len(old)
el := old[n-1]
*s = old[0 : n-1]
return el
}
// reverseHeap wraps a heap.Interface and inverts its ordering: every method is
// delegated to the embedded value except Less, which is overridden below.
type reverseHeap struct {
	heap.Interface
}

// Less reports whether element i sorts before element j under the reversed
// ordering, i.e. whether the wrapped implementation orders j before i.
func (r reverseHeap) Less(i, j int) bool {
	wrapped := r.Interface
	return wrapped.Less(j, i)
}
// === sort(node VectorNode) Vector ===
func sortImpl(timestamp clientmodel.Timestamp, view *viewAdapter, args []Node) interface{} { func sortImpl(timestamp clientmodel.Timestamp, view *viewAdapter, args []Node) interface{} {
byValueSorter := vectorByValueSorter{ byValueSorter := vectorByValueHeap(args[0].(VectorNode).Eval(timestamp, view))
vector: args[0].(VectorNode).Eval(timestamp, view),
}
sort.Sort(byValueSorter) sort.Sort(byValueSorter)
return byValueSorter.vector return Vector(byValueSorter)
} }
// === sortDesc(node *VectorNode) Vector === // === sortDesc(node VectorNode) Vector ===
func sortDescImpl(timestamp clientmodel.Timestamp, view *viewAdapter, args []Node) interface{} { func sortDescImpl(timestamp clientmodel.Timestamp, view *viewAdapter, args []Node) interface{} {
descByValueSorter := utility.ReverseSorter{ byValueSorter := vectorByValueHeap(args[0].(VectorNode).Eval(timestamp, view))
Interface: vectorByValueSorter{ sort.Sort(sort.Reverse(byValueSorter))
vector: args[0].(VectorNode).Eval(timestamp, view), return Vector(byValueSorter)
}, }
// === topk(k ScalarNode, node VectorNode) Vector ===
func topkImpl(timestamp clientmodel.Timestamp, view *viewAdapter, args []Node) interface{} {
k := int(args[0].(ScalarNode).Eval(timestamp, view))
if k < 1 {
return Vector{}
} }
sort.Sort(descByValueSorter)
return descByValueSorter.Interface.(vectorByValueSorter).vector topk := make(vectorByValueHeap, 0, k)
vector := args[1].(VectorNode).Eval(timestamp, view)
for _, el := range vector {
if len(topk) < k || topk[0].Value < el.Value {
if len(topk) == k {
heap.Pop(&topk)
}
heap.Push(&topk, el)
}
}
sort.Sort(sort.Reverse(topk))
return Vector(topk)
}
// === bottomk(k ScalarNode, node VectorNode) Vector ===
func bottomkImpl(timestamp clientmodel.Timestamp, view *viewAdapter, args []Node) interface{} {
k := int(args[0].(ScalarNode).Eval(timestamp, view))
if k < 1 {
return Vector{}
}
bottomk := make(vectorByValueHeap, 0, k)
bkHeap := reverseHeap{Interface: &bottomk}
vector := args[1].(VectorNode).Eval(timestamp, view)
for _, el := range vector {
if len(bottomk) < k || bottomk[0].Value > el.Value {
if len(bottomk) == k {
heap.Pop(&bkHeap)
}
heap.Push(&bkHeap, el)
}
}
sort.Sort(bottomk)
return Vector(bottomk)
} }
// === sampleVectorImpl() Vector === // === sampleVectorImpl() Vector ===
@ -262,7 +319,7 @@ func sampleVectorImpl(timestamp clientmodel.Timestamp, view *viewAdapter, args [
} }
} }
// === scalar(node *VectorNode) Scalar === // === scalar(node VectorNode) Scalar ===
func scalarImpl(timestamp clientmodel.Timestamp, view *viewAdapter, args []Node) interface{} { func scalarImpl(timestamp clientmodel.Timestamp, view *viewAdapter, args []Node) interface{} {
v := args[0].(VectorNode).Eval(timestamp, view) v := args[0].(VectorNode).Eval(timestamp, view)
if len(v) != 1 { if len(v) != 1 {
@ -369,6 +426,12 @@ var functions = map[string]*Function{
returnType: VECTOR, returnType: VECTOR,
callFn: avgOverTimeImpl, callFn: avgOverTimeImpl,
}, },
"bottomk": {
name: "bottomk",
argTypes: []ExprType{SCALAR, VECTOR},
returnType: VECTOR,
callFn: bottomkImpl,
},
"count_over_time": { "count_over_time": {
name: "count_over_time", name: "count_over_time",
argTypes: []ExprType{MATRIX}, argTypes: []ExprType{MATRIX},
@ -441,6 +504,12 @@ var functions = map[string]*Function{
returnType: SCALAR, returnType: SCALAR,
callFn: timeImpl, callFn: timeImpl,
}, },
"topk": {
name: "topk",
argTypes: []ExprType{SCALAR, VECTOR},
returnType: VECTOR,
callFn: topkImpl,
},
} }
// GetFunction returns a predefined Function object for the given // GetFunction returns a predefined Function object for the given

View File

@ -323,6 +323,26 @@ func TestExpressions(t *testing.T) {
checkOrder: true, checkOrder: true,
fullRanges: 0, fullRanges: 0,
intervalRanges: 8, intervalRanges: 8,
}, {
expr: `topk(3, http_requests)`,
output: []string{
`http_requests{group="canary", instance="1", job="app-server"} => 800 @[%v]`,
`http_requests{group="canary", instance="0", job="app-server"} => 700 @[%v]`,
`http_requests{group="production", instance="1", job="app-server"} => 600 @[%v]`,
},
checkOrder: true,
fullRanges: 0,
intervalRanges: 8,
}, {
expr: `bottomk(3, http_requests)`,
output: []string{
`http_requests{group="production", instance="0", job="api-server"} => 100 @[%v]`,
`http_requests{group="production", instance="1", job="api-server"} => 200 @[%v]`,
`http_requests{group="canary", instance="0", job="api-server"} => 300 @[%v]`,
},
checkOrder: true,
fullRanges: 0,
intervalRanges: 8,
}, { }, {
// Single-letter label names and values. // Single-letter label names and values.
expr: `x{y="testvalue"}`, expr: `x{y="testvalue"}`,

View File

@ -1,31 +0,0 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package utility
import (
"sort"
)
// ReverseSorter wraps a sort.Interface value and sorts it in the opposite
// order.
type ReverseSorter struct {
	// Embedding the interface promotes the wrapped value's methods onto
	// ReverseSorter; only Less is overridden below.
	sort.Interface
}

// Less inverts the wrapped ordering by swapping the two indices before
// delegating to the embedded implementation's Less method.
func (rs ReverseSorter) Less(i, j int) bool {
	inner := rs.Interface
	return inner.Less(j, i)
}