Make topk/bottomk aggregators.

This commit is contained in:
Brian Brazil 2016-07-04 13:10:42 +01:00
parent f26823afa7
commit 3e5136e36d
8 changed files with 117 additions and 85 deletions

View File

@ -103,6 +103,7 @@ type Expressions []Expr
type AggregateExpr struct {
Op itemType // The used aggregation operation.
Expr Expr // The vector expression over which is aggregated.
Param Expr // Parameter used by some aggregators.
Grouping model.LabelNames // The labels by which to group the vector.
Without bool // Whether to drop the given labels rather than keep them.
KeepCommonLabels bool // Whether to keep common labels among result elements.

View File

@ -14,6 +14,7 @@
package promql
import (
"container/heap"
"fmt"
"math"
"runtime"
@ -610,7 +611,7 @@ func (ev *evaluator) eval(expr Expr) model.Value {
switch e := expr.(type) {
case *AggregateExpr:
vector := ev.evalVector(e.Expr)
return ev.aggregation(e.Op, e.Grouping, e.Without, e.KeepCommonLabels, vector)
return ev.aggregation(e.Op, e.Grouping, e.Without, e.KeepCommonLabels, e.Param, vector)
case *BinaryExpr:
lhs := ev.evalOneOf(e.LHS, model.ValScalar, model.ValVector)
@ -1060,15 +1061,24 @@ type groupedAggregation struct {
value model.SampleValue
valuesSquaredSum model.SampleValue
groupCount int
heap vectorByValueHeap
reverseHeap vectorByReverseValueHeap
}
// aggregation evaluates an aggregation operation on a vector.
func (ev *evaluator) aggregation(op itemType, grouping model.LabelNames, without bool, keepCommon bool, vec vector) vector {
func (ev *evaluator) aggregation(op itemType, grouping model.LabelNames, without bool, keepCommon bool, param Expr, vec vector) vector {
result := map[uint64]*groupedAggregation{}
var k int
if op == itemTopK || op == itemBottomK {
k = ev.evalInt(param)
if k < 1 {
return vector{}
}
}
for _, sample := range vec {
withoutMetric := sample.Metric
for _, s := range vec {
withoutMetric := s.Metric
if without {
for _, l := range grouping {
withoutMetric.Del(l)
@ -1080,7 +1090,7 @@ func (ev *evaluator) aggregation(op itemType, grouping model.LabelNames, without
if without {
groupingKey = uint64(withoutMetric.Metric.Fingerprint())
} else {
groupingKey = model.SignatureForLabels(sample.Metric.Metric, grouping...)
groupingKey = model.SignatureForLabels(s.Metric.Metric, grouping...)
}
groupedResult, ok := result[groupingKey]
@ -1088,7 +1098,7 @@ func (ev *evaluator) aggregation(op itemType, grouping model.LabelNames, without
if !ok {
var m metric.Metric
if keepCommon {
m = sample.Metric
m = s.Metric
m.Del(model.MetricNameLabel)
} else if without {
m = withoutMetric
@ -1098,44 +1108,65 @@ func (ev *evaluator) aggregation(op itemType, grouping model.LabelNames, without
Copied: true,
}
for _, l := range grouping {
if v, ok := sample.Metric.Metric[l]; ok {
if v, ok := s.Metric.Metric[l]; ok {
m.Set(l, v)
}
}
}
result[groupingKey] = &groupedAggregation{
labels: m,
value: sample.Value,
valuesSquaredSum: sample.Value * sample.Value,
value: s.Value,
valuesSquaredSum: s.Value * s.Value,
groupCount: 1,
}
if op == itemTopK {
result[groupingKey].heap = make(vectorByValueHeap, 0, k)
heap.Push(&result[groupingKey].heap, &sample{Value: s.Value, Metric: s.Metric})
} else if op == itemBottomK {
result[groupingKey].reverseHeap = make(vectorByReverseValueHeap, 0, k)
heap.Push(&result[groupingKey].reverseHeap, &sample{Value: s.Value, Metric: s.Metric})
}
continue
}
// Add the sample to the existing group.
if keepCommon {
groupedResult.labels = labelIntersection(groupedResult.labels, sample.Metric)
groupedResult.labels = labelIntersection(groupedResult.labels, s.Metric)
}
switch op {
case itemSum:
groupedResult.value += sample.Value
groupedResult.value += s.Value
case itemAvg:
groupedResult.value += sample.Value
groupedResult.value += s.Value
groupedResult.groupCount++
case itemMax:
if groupedResult.value < sample.Value || math.IsNaN(float64(groupedResult.value)) {
groupedResult.value = sample.Value
if groupedResult.value < s.Value || math.IsNaN(float64(groupedResult.value)) {
groupedResult.value = s.Value
}
case itemMin:
if groupedResult.value > sample.Value || math.IsNaN(float64(groupedResult.value)) {
groupedResult.value = sample.Value
if groupedResult.value > s.Value || math.IsNaN(float64(groupedResult.value)) {
groupedResult.value = s.Value
}
case itemCount:
groupedResult.groupCount++
case itemStdvar, itemStddev:
groupedResult.value += sample.Value
groupedResult.valuesSquaredSum += sample.Value * sample.Value
groupedResult.value += s.Value
groupedResult.valuesSquaredSum += s.Value * s.Value
groupedResult.groupCount++
case itemTopK:
if len(groupedResult.heap) < k || groupedResult.heap[0].Value < s.Value || math.IsNaN(float64(groupedResult.heap[0].Value)) {
if len(groupedResult.heap) == k {
heap.Pop(&groupedResult.heap)
}
heap.Push(&groupedResult.heap, &sample{Value: s.Value, Metric: s.Metric})
}
case itemBottomK:
if len(groupedResult.reverseHeap) < k || groupedResult.reverseHeap[0].Value > s.Value || math.IsNaN(float64(groupedResult.reverseHeap[0].Value)) {
if len(groupedResult.reverseHeap) == k {
heap.Pop(&groupedResult.reverseHeap)
}
heap.Push(&groupedResult.reverseHeap, &sample{Value: s.Value, Metric: s.Metric})
}
default:
panic(fmt.Errorf("expected aggregation operator but got %q", op))
}
@ -1156,6 +1187,28 @@ func (ev *evaluator) aggregation(op itemType, grouping model.LabelNames, without
case itemStddev:
avg := float64(aggr.value) / float64(aggr.groupCount)
aggr.value = model.SampleValue(math.Sqrt(float64(aggr.valuesSquaredSum)/float64(aggr.groupCount) - avg*avg))
case itemTopK:
// The heap keeps the lowest value on top, so reverse it.
sort.Sort(sort.Reverse(aggr.heap))
for _, v := range aggr.heap {
resultVector = append(resultVector, &sample{
Metric: v.Metric,
Value: v.Value,
Timestamp: ev.Timestamp,
})
}
continue // Bypass default append.
case itemBottomK:
// The heap keeps the lowest value on top, so reverse it.
sort.Sort(sort.Reverse(aggr.reverseHeap))
for _, v := range aggr.reverseHeap {
resultVector = append(resultVector, &sample{
Metric: v.Metric,
Value: v.Value,
Timestamp: ev.Timestamp,
})
}
continue // Bypass default append.
default:
// For other aggregations, we already have the right value.
}

View File

@ -14,7 +14,6 @@
package promql
import (
"container/heap"
"math"
"regexp"
"sort"
@ -298,52 +297,6 @@ func funcSortDesc(ev *evaluator, args Expressions) model.Value {
return vector(byValueSorter)
}
// === topk(k model.ValScalar, node model.ValVector) Vector ===
func funcTopk(ev *evaluator, args Expressions) model.Value {
k := ev.evalInt(args[0])
if k < 1 {
return vector{}
}
vec := ev.evalVector(args[1])
topk := make(vectorByValueHeap, 0, k)
for _, el := range vec {
if len(topk) < k || topk[0].Value < el.Value || math.IsNaN(float64(topk[0].Value)) {
if len(topk) == k {
heap.Pop(&topk)
}
heap.Push(&topk, el)
}
}
// The heap keeps the lowest value on top, so reverse it.
sort.Sort(sort.Reverse(topk))
return vector(topk)
}
// === bottomk(k model.ValScalar, node model.ValVector) Vector ===
func funcBottomk(ev *evaluator, args Expressions) model.Value {
k := ev.evalInt(args[0])
if k < 1 {
return vector{}
}
vec := ev.evalVector(args[1])
bottomk := make(vectorByReverseValueHeap, 0, k)
for _, el := range vec {
if len(bottomk) < k || bottomk[0].Value > el.Value || math.IsNaN(float64(bottomk[0].Value)) {
if len(bottomk) == k {
heap.Pop(&bottomk)
}
heap.Push(&bottomk, el)
}
}
// The heap keeps the highest value on top, so reverse it.
sort.Sort(sort.Reverse(bottomk))
return vector(bottomk)
}
// === clamp_max(vector model.ValVector, max Scalar) Vector ===
func funcClampMax(ev *evaluator, args Expressions) model.Value {
vec := ev.evalVector(args[0])
@ -866,12 +819,6 @@ var functions = map[string]*Function{
ReturnType: model.ValVector,
Call: funcAvgOverTime,
},
"bottomk": {
Name: "bottomk",
ArgTypes: []model.ValueType{model.ValScalar, model.ValVector},
ReturnType: model.ValVector,
Call: funcBottomk,
},
"ceil": {
Name: "ceil",
ArgTypes: []model.ValueType{model.ValVector},
@ -1053,12 +1000,6 @@ var functions = map[string]*Function{
ReturnType: model.ValScalar,
Call: funcTime,
},
"topk": {
Name: "topk",
ArgTypes: []model.ValueType{model.ValScalar, model.ValVector},
ReturnType: model.ValVector,
Call: funcTopk,
},
"vector": {
Name: "vector",
ArgTypes: []model.ValueType{model.ValScalar},

View File

@ -58,6 +58,10 @@ func (i itemType) isOperator() bool { return i > operatorsStart && i < operators
// Returns false otherwise
func (i itemType) isAggregator() bool { return i > aggregatorsStart && i < aggregatorsEnd }
// isAggregator returns true if the item is an aggregator that takes a parameter.
// Returns false otherwise
func (i itemType) isAggregatorWithParam() bool { return i == itemTopK || i == itemBottomK }
// isKeyword returns true if the item corresponds to a keyword.
// Returns false otherwise.
func (i itemType) isKeyword() bool { return i > keywordsStart && i < keywordsEnd }
@ -170,6 +174,8 @@ const (
itemMax
itemStddev
itemStdvar
itemTopK
itemBottomK
aggregatorsEnd
keywordsStart
@ -203,13 +209,15 @@ var key = map[string]itemType{
"unless": itemLUnless,
// Aggregators.
"sum": itemSum,
"avg": itemAvg,
"count": itemCount,
"min": itemMin,
"max": itemMax,
"stddev": itemStddev,
"stdvar": itemStdvar,
"sum": itemSum,
"avg": itemAvg,
"count": itemCount,
"min": itemMin,
"max": itemMax,
"stddev": itemStddev,
"stdvar": itemStdvar,
"topk": itemTopK,
"bottomk": itemBottomK,
// Keywords.
"alert": itemAlert,

View File

@ -719,6 +719,11 @@ func (p *parser) aggrExpr() *AggregateExpr {
}
p.expect(itemLeftParen, ctx)
var param Expr
if agop.typ.isAggregatorWithParam() {
param = p.expr()
p.expect(itemComma, ctx)
}
e := p.expr()
p.expect(itemRightParen, ctx)
@ -746,6 +751,7 @@ func (p *parser) aggrExpr() *AggregateExpr {
return &AggregateExpr{
Op: agop.typ,
Expr: e,
Param: param,
Grouping: grouping,
Without: without,
KeepCommonLabels: keepCommon,

View File

@ -1201,6 +1201,18 @@ var testExpr = []struct {
},
Grouping: model.LabelNames{},
},
}, {
input: "topk(5, some_metric)",
expected: &AggregateExpr{
Op: itemTopK,
Expr: &VectorSelector{
Name: "some_metric",
LabelMatchers: metric.LabelMatchers{
{Type: metric.Equal, Name: model.MetricNameLabel, Value: "some_metric"},
},
},
Param: &NumberLiteral{5},
},
}, {
input: `sum some_metric by (test)`,
fail: true,
@ -1237,6 +1249,10 @@ var testExpr = []struct {
input: `sum without (test) (some_metric) by (test)`,
fail: true,
errMsg: "could not parse remaining input \"by (test)\"...",
}, {
input: `topk(some_metric)`,
fail: true,
errMsg: "parse error at char 17: unexpected \")\" in aggregation, expected \",\"",
},
// Test function calls.
{

View File

@ -135,7 +135,11 @@ func (es Expressions) String() (s string) {
}
func (node *AggregateExpr) String() string {
aggrString := fmt.Sprintf("%s(%s)", node.Op, node.Expr)
aggrString := fmt.Sprintf("%s(", node.Op)
if node.Op.isAggregatorWithParam() {
aggrString += fmt.Sprintf("%s, ", node.Param)
}
aggrString += fmt.Sprintf("%s)", node.Expr)
if len(node.Grouping) > 0 {
var format string
if node.Without {

View File

@ -71,6 +71,9 @@ func TestExprString(t *testing.T) {
{
in: `sum(task:errors:rate10s{job="s"}) WITHOUT (instance)`,
},
{
in: `topk(5, task:errors:rate10s{job="s"})`,
},
{
in: `a - ON(b) c`,
},