From 8e4c5b0ceacbbc396d62dfc65333c6362467c355 Mon Sep 17 00:00:00 2001 From: Julius Volz Date: Thu, 21 Mar 2013 18:06:15 +0100 Subject: [PATCH] Use AST query analyzer and views with tiered storage. --- appstate/appstate.go | 2 +- main.go | 53 ++---------- rules/ast/ast.go | 93 ++++++++++++-------- rules/ast/functions.go | 22 ++--- rules/ast/persistence_adapter.go | 130 +++++++++++++++++++--------- rules/ast/printer.go | 13 ++- rules/ast/query_analyzer.go | 141 +++++++++++++++++++++++++++++++ rules/ast/walk.go | 27 ++++++ rules/rules.go | 4 +- rules/rules_test.go | 99 +++++++++++++++++----- rules/testdata.go | 4 +- web/api/query.go | 7 +- 12 files changed, 431 insertions(+), 164 deletions(-) create mode 100644 rules/ast/query_analyzer.go create mode 100644 rules/ast/walk.go diff --git a/appstate/appstate.go b/appstate/appstate.go index 543f5c4b7..7938227b3 100644 --- a/appstate/appstate.go +++ b/appstate/appstate.go @@ -25,7 +25,7 @@ import ( // require it. type ApplicationState struct { Config *config.Config - Persistence metric.MetricPersistence RuleManager rules.RuleManager + Storage metric.Storage TargetManager retrieval.TargetManager } diff --git a/main.go b/main.go index 6b2024dc3..a74c5e28a 100644 --- a/main.go +++ b/main.go @@ -18,7 +18,6 @@ import ( "fmt" "github.com/prometheus/prometheus/appstate" "github.com/prometheus/prometheus/config" - // "github.com/prometheus/prometheus/model" "github.com/prometheus/prometheus/retrieval" "github.com/prometheus/prometheus/retrieval/format" "github.com/prometheus/prometheus/rules" @@ -40,7 +39,6 @@ var ( scrapeResultsQueueCapacity = flag.Int("scrapeResultsQueueCapacity", 4096, "The size of the scrape results queue.") ruleResultsQueueCapacity = flag.Int("ruleResultsQueueCapacity", 4096, "The size of the rule results queue.") concurrentRetrievalAllowance = flag.Int("concurrentRetrievalAllowance", 15, "The number of concurrent metrics retrieval requests allowed.") - memoryArena = flag.Bool("experimental.useMemoryArena", 
false, "Use in-memory timeseries arena.") ) func main() { @@ -50,33 +48,16 @@ func main() { log.Fatalf("Error loading configuration from %s: %v", *configFile, err) } - var ( - persistence metric.MetricPersistence - ts metric.Storage - ) - - if *memoryArena { - persistence = metric.NewMemorySeriesStorage() - } else { - ts = metric.NewTieredStorage(5000, 5000, 100, time.Second*30, time.Second*1, time.Second*20, *metricsStoragePath) - go ts.Serve() - - // persistence, err = metric.NewLevelDBMetricPersistence(*metricsStoragePath) - // if err != nil { - // log.Fatalf("Error opening storage: %v", err) - // } - } - + ts := metric.NewTieredStorage(5000, 5000, 100, time.Second*30, time.Second*1, time.Second*20, *metricsStoragePath) + go ts.Serve() go func() { notifier := make(chan os.Signal) signal.Notify(notifier, os.Interrupt) <-notifier - // persistence.Close() + ts.Close() os.Exit(0) }() - // defer persistence.Close() - // Queue depth will need to be exposed scrapeResults := make(chan format.Result, *scrapeResultsQueueCapacity) @@ -85,7 +66,8 @@ func main() { ruleResults := make(chan *rules.Result, *ruleResultsQueueCapacity) - ast.SetPersistence(persistence, nil) + ast.SetStorage(ts) + ruleManager := rules.NewRuleManager(ruleResults, conf.Global.EvaluationInterval) err = ruleManager.AddRulesFromConfig(conf) if err != nil { @@ -94,45 +76,22 @@ func main() { appState := &appstate.ApplicationState{ Config: conf, - Persistence: persistence, RuleManager: ruleManager, + Storage: ts, TargetManager: targetManager, } web.StartServing(appState) - // go func() { - // ticker := time.Tick(time.Second) - // for i := 0; i < 120; i++ { - // <-ticker - // if i%10 == 0 { - // fmt.Printf(".") - // } - // } - // fmt.Println() - // //f := model.NewFingerprintFromRowKey("9776005627788788740-g-131-0") - // f := model.NewFingerprintFromRowKey("09923616460706181007-g-131-0") - // v := metric.NewViewRequestBuilder() - // v.GetMetricAtTime(f, time.Now().Add(-120*time.Second)) - - // view, err := 
ts.MakeView(v, time.Minute) - // fmt.Println(view, err) - // }() - for { select { case scrapeResult := <-scrapeResults: if scrapeResult.Err == nil { - // f := model.NewFingerprintFromMetric(scrapeResult.Sample.Metric) - // fmt.Println(f) - // persistence.AppendSample(scrapeResult.Sample) ts.AppendSample(scrapeResult.Sample) } case ruleResult := <-ruleResults: for _, sample := range ruleResult.Samples { - // XXX: Wart - // persistence.AppendSample(*sample) ts.AppendSample(*sample) } } diff --git a/rules/ast/ast.go b/rules/ast/ast.go index 44dc7ea21..30b40755d 100644 --- a/rules/ast/ast.go +++ b/rules/ast/ast.go @@ -90,29 +90,30 @@ const ( type Node interface { Type() ExprType NodeTreeToDotGraph() string + Children() []Node } // All node types implement one of the following interfaces. The name of the // interface represents the type returned to the parent node. type ScalarNode interface { Node - Eval(timestamp *time.Time) model.SampleValue + Eval(timestamp *time.Time, view *viewAdapter) model.SampleValue } type VectorNode interface { Node - Eval(timestamp *time.Time) Vector + Eval(timestamp *time.Time, view *viewAdapter) Vector } type MatrixNode interface { Node - Eval(timestamp *time.Time) Matrix - EvalBoundaries(timestamp *time.Time) Matrix + Eval(timestamp *time.Time, view *viewAdapter) Matrix + EvalBoundaries(timestamp *time.Time, view *viewAdapter) Matrix } type StringNode interface { Node - Eval(timestamp *time.Time) string + Eval(timestamp *time.Time, view *viewAdapter) string } // ---------------------------------------------------------------------------- @@ -198,6 +199,7 @@ type ( // ---------------------------------------------------------------------------- // Implementations. +// Node.Type() methods. 
func (node ScalarLiteral) Type() ExprType { return SCALAR } func (node ScalarFunctionCall) Type() ExprType { return SCALAR } func (node ScalarArithExpr) Type() ExprType { return SCALAR } @@ -209,18 +211,30 @@ func (node MatrixLiteral) Type() ExprType { return MATRIX } func (node StringLiteral) Type() ExprType { return STRING } func (node StringFunctionCall) Type() ExprType { return STRING } -func (node *ScalarLiteral) Eval(timestamp *time.Time) model.SampleValue { +// Node.Children() methods. +func (node ScalarLiteral) Children() []Node { return []Node{} } +func (node ScalarFunctionCall) Children() []Node { return node.args } +func (node ScalarArithExpr) Children() []Node { return []Node{node.lhs, node.rhs} } +func (node VectorLiteral) Children() []Node { return []Node{} } +func (node VectorFunctionCall) Children() []Node { return node.args } +func (node VectorAggregation) Children() []Node { return []Node{node.vector} } +func (node VectorArithExpr) Children() []Node { return []Node{node.lhs, node.rhs} } +func (node MatrixLiteral) Children() []Node { return []Node{} } +func (node StringLiteral) Children() []Node { return []Node{} } +func (node StringFunctionCall) Children() []Node { return node.args } + +func (node *ScalarLiteral) Eval(timestamp *time.Time, view *viewAdapter) model.SampleValue { return node.value } -func (node *ScalarArithExpr) Eval(timestamp *time.Time) model.SampleValue { - lhs := node.lhs.Eval(timestamp) - rhs := node.rhs.Eval(timestamp) +func (node *ScalarArithExpr) Eval(timestamp *time.Time, view *viewAdapter) model.SampleValue { + lhs := node.lhs.Eval(timestamp, view) + rhs := node.rhs.Eval(timestamp, view) return evalScalarBinop(node.opType, lhs, rhs) } -func (node *ScalarFunctionCall) Eval(timestamp *time.Time) model.SampleValue { - return node.function.callFn(timestamp, node.args).(model.SampleValue) +func (node *ScalarFunctionCall) Eval(timestamp *time.Time, view *viewAdapter) model.SampleValue { + return node.function.callFn(timestamp, 
view, node.args).(model.SampleValue) } func (node *VectorAggregation) labelsToGroupingKey(labels model.Metric) string { @@ -240,11 +254,25 @@ func labelsToKey(labels model.Metric) string { return strings.Join(keyParts, ",") // TODO not safe when label value contains comma. } -func EvalVectorRange(node VectorNode, start time.Time, end time.Time, step time.Duration) Matrix { +func EvalVectorInstant(node VectorNode, timestamp time.Time) (vector Vector) { + viewAdapter, err := viewAdapterForInstantQuery(node, timestamp) + if err != nil { + // TODO: propagate errors. + return + } + return node.Eval(&timestamp, viewAdapter) +} + +func EvalVectorRange(node VectorNode, start time.Time, end time.Time, interval time.Duration) (matrix Matrix, err error) { + viewAdapter, err := viewAdapterForRangeQuery(node, start, end, interval) + if err != nil { + // TODO: propagate errors. + return + } // TODO implement watchdog timer for long-running queries. sampleSets := map[string]*model.SampleSet{} - for t := start; t.Before(end); t = t.Add(step) { - vector := node.Eval(&t) + for t := start; t.Before(end); t = t.Add(interval) { + vector := node.Eval(&t, viewAdapter) for _, sample := range vector { samplePair := model.SamplePair{ Value: sample.Value, @@ -262,11 +290,10 @@ func EvalVectorRange(node VectorNode, start time.Time, end time.Time, step time. 
} } - matrix := Matrix{} for _, sampleSet := range sampleSets { matrix = append(matrix, sampleSet) } - return matrix + return } func labelIntersection(metric1, metric2 model.Metric) model.Metric { @@ -295,8 +322,8 @@ func (node *VectorAggregation) groupedAggregationsToVector(aggregations map[stri return vector } -func (node *VectorAggregation) Eval(timestamp *time.Time) Vector { - vector := node.vector.Eval(timestamp) +func (node *VectorAggregation) Eval(timestamp *time.Time, view *viewAdapter) Vector { + vector := node.vector.Eval(timestamp, view) result := map[string]*groupedAggregation{} for _, sample := range vector { groupingKey := node.labelsToGroupingKey(sample.Metric) @@ -328,8 +355,8 @@ func (node *VectorAggregation) Eval(timestamp *time.Time) Vector { return node.groupedAggregationsToVector(result, timestamp) } -func (node *VectorLiteral) Eval(timestamp *time.Time) Vector { - values, err := persistenceAdapter.GetValueAtTime(node.labels, timestamp) +func (node *VectorLiteral) Eval(timestamp *time.Time, view *viewAdapter) Vector { + values, err := view.GetValueAtTime(node.labels, timestamp) if err != nil { log.Printf("Unable to get vector values") return Vector{} @@ -337,8 +364,8 @@ func (node *VectorLiteral) Eval(timestamp *time.Time) Vector { return values } -func (node *VectorFunctionCall) Eval(timestamp *time.Time) Vector { - return node.function.callFn(timestamp, node.args).(Vector) +func (node *VectorFunctionCall) Eval(timestamp *time.Time, view *viewAdapter) Vector { + return node.function.callFn(timestamp, view, node.args).(Vector) } func evalScalarBinop(opType BinOpType, @@ -481,11 +508,11 @@ func labelsEqual(labels1, labels2 model.Metric) bool { return true } -func (node *VectorArithExpr) Eval(timestamp *time.Time) Vector { - lhs := node.lhs.Eval(timestamp) +func (node *VectorArithExpr) Eval(timestamp *time.Time, view *viewAdapter) Vector { + lhs := node.lhs.Eval(timestamp, view) result := Vector{} if node.rhs.Type() == SCALAR { - rhs := 
node.rhs.(ScalarNode).Eval(timestamp) + rhs := node.rhs.(ScalarNode).Eval(timestamp, view) for _, lhsSample := range lhs { value, keep := evalVectorBinop(node.opType, lhsSample.Value, rhs) if keep { @@ -495,7 +522,7 @@ func (node *VectorArithExpr) Eval(timestamp *time.Time) Vector { } return result } else if node.rhs.Type() == VECTOR { - rhs := node.rhs.(VectorNode).Eval(timestamp) + rhs := node.rhs.(VectorNode).Eval(timestamp, view) for _, lhsSample := range lhs { for _, rhsSample := range rhs { if labelsEqual(lhsSample.Metric, rhsSample.Metric) { @@ -512,12 +539,12 @@ func (node *VectorArithExpr) Eval(timestamp *time.Time) Vector { panic("Invalid vector arithmetic expression operands") } -func (node *MatrixLiteral) Eval(timestamp *time.Time) Matrix { +func (node *MatrixLiteral) Eval(timestamp *time.Time, view *viewAdapter) Matrix { interval := &model.Interval{ OldestInclusive: timestamp.Add(-node.interval), NewestInclusive: *timestamp, } - values, err := persistenceAdapter.GetRangeValues(node.labels, interval) + values, err := view.GetRangeValues(node.labels, interval) if err != nil { log.Printf("Unable to get values for vector interval") return Matrix{} @@ -525,12 +552,12 @@ func (node *MatrixLiteral) Eval(timestamp *time.Time) Matrix { return values } -func (node *MatrixLiteral) EvalBoundaries(timestamp *time.Time) Matrix { +func (node *MatrixLiteral) EvalBoundaries(timestamp *time.Time, view *viewAdapter) Matrix { interval := &model.Interval{ OldestInclusive: timestamp.Add(-node.interval), NewestInclusive: *timestamp, } - values, err := persistenceAdapter.GetBoundaryValues(node.labels, interval) + values, err := view.GetBoundaryValues(node.labels, interval) if err != nil { log.Printf("Unable to get boundary values for vector interval") return Matrix{} @@ -550,12 +577,12 @@ func (matrix Matrix) Swap(i, j int) { matrix[i], matrix[j] = matrix[j], matrix[i] } -func (node *StringLiteral) Eval(timestamp *time.Time) string { +func (node *StringLiteral) Eval(timestamp 
*time.Time, view *viewAdapter) string { return node.str } -func (node *StringFunctionCall) Eval(timestamp *time.Time) string { - return node.function.callFn(timestamp, node.args).(string) +func (node *StringFunctionCall) Eval(timestamp *time.Time, view *viewAdapter) string { + return node.function.callFn(timestamp, view, node.args).(string) } // ---------------------------------------------------------------------------- diff --git a/rules/ast/functions.go b/rules/ast/functions.go index d51f8ee1d..dba1c023f 100644 --- a/rules/ast/functions.go +++ b/rules/ast/functions.go @@ -24,7 +24,7 @@ type Function struct { name string argTypes []ExprType returnType ExprType - callFn func(timestamp *time.Time, args []Node) interface{} + callFn func(timestamp *time.Time, view *viewAdapter, args []Node) interface{} } func (function *Function) CheckArgTypes(args []Node) error { @@ -63,19 +63,19 @@ func (function *Function) CheckArgTypes(args []Node) error { } // === time() === -func timeImpl(timestamp *time.Time, args []Node) interface{} { +func timeImpl(timestamp *time.Time, view *viewAdapter, args []Node) interface{} { return model.SampleValue(time.Now().Unix()) } // === count(vector VectorNode) === -func countImpl(timestamp *time.Time, args []Node) interface{} { - return model.SampleValue(len(args[0].(VectorNode).Eval(timestamp))) +func countImpl(timestamp *time.Time, view *viewAdapter, args []Node) interface{} { + return model.SampleValue(len(args[0].(VectorNode).Eval(timestamp, view))) } // === delta(matrix MatrixNode, isCounter ScalarNode) === -func deltaImpl(timestamp *time.Time, args []Node) interface{} { +func deltaImpl(timestamp *time.Time, view *viewAdapter, args []Node) interface{} { matrixNode := args[0].(MatrixNode) - isCounter := int(args[1].(ScalarNode).Eval(timestamp)) + isCounter := int(args[1].(ScalarNode).Eval(timestamp, view)) resultVector := Vector{} // If we treat these metrics as counters, we need to fetch all values @@ -83,9 +83,9 @@ func 
deltaImpl(timestamp *time.Time, args []Node) interface{} { // I.e. if a counter resets, we want to ignore that reset. var matrixValue Matrix if isCounter > 0 { - matrixValue = matrixNode.Eval(timestamp) + matrixValue = matrixNode.Eval(timestamp, view) } else { - matrixValue = matrixNode.EvalBoundaries(timestamp) + matrixValue = matrixNode.EvalBoundaries(timestamp, view) } for _, samples := range matrixValue { counterCorrection := model.SampleValue(0) @@ -109,9 +109,9 @@ func deltaImpl(timestamp *time.Time, args []Node) interface{} { } // === rate(node *MatrixNode) === -func rateImpl(timestamp *time.Time, args []Node) interface{} { +func rateImpl(timestamp *time.Time, view *viewAdapter, args []Node) interface{} { args = append(args, &ScalarLiteral{value: 1}) - vector := deltaImpl(timestamp, args).(Vector) + vector := deltaImpl(timestamp, view, args).(Vector) // TODO: could be other type of MatrixNode in the future (right now, only // MatrixLiteral exists). Find a better way of getting the duration of a @@ -124,7 +124,7 @@ func rateImpl(timestamp *time.Time, args []Node) interface{} { } // === sampleVectorImpl() === -func sampleVectorImpl(timestamp *time.Time, args []Node) interface{} { +func sampleVectorImpl(timestamp *time.Time, view *viewAdapter, args []Node) interface{} { return Vector{ &model.Sample{ Metric: model.Metric{ diff --git a/rules/ast/persistence_adapter.go b/rules/ast/persistence_adapter.go index a57c2b416..9fa82c675 100644 --- a/rules/ast/persistence_adapter.go +++ b/rules/ast/persistence_adapter.go @@ -22,85 +22,131 @@ import ( var defaultStalenessDelta = flag.Int("defaultStalenessDelta", 300, "Default staleness delta allowance in seconds during expression evaluations.") -type PersistenceAdapter struct { - persistence metric.MetricPersistence +// AST-global storage to use for operations that are not supported by views +// (i.e. metric->fingerprint lookups). 
+var queryStorage metric.Storage = nil + +type viewAdapter struct { + view metric.View + // TODO: use this. stalenessPolicy *metric.StalenessPolicy } -// AST-global persistence to use. -var persistenceAdapter *PersistenceAdapter = nil - -func (p *PersistenceAdapter) GetValueAtTime(labels model.LabelSet, timestamp *time.Time) (samples []*model.Sample, err error) { - fingerprints, err := p.persistence.GetFingerprintsForLabelSet(labels) - if err != nil { - return - } - - for _, fingerprint := range fingerprints { - var sample *model.Sample // Don't shadow err. - sample, err = p.persistence.GetValueAtTime(fingerprint, *timestamp, *p.stalenessPolicy) - if err != nil { - return +func (v *viewAdapter) chooseClosestSample(samples []model.SamplePair, timestamp *time.Time) (sample *model.SamplePair) { + var minDelta time.Duration + for _, candidate := range samples { + // Ignore samples outside of staleness policy window. + delta := candidate.Timestamp.Sub(*timestamp) + if delta < 0 { + delta = -delta } - if sample == nil { + if delta > v.stalenessPolicy.DeltaAllowance { continue } - samples = append(samples, sample) + + // Skip sample if we've seen a closer one before this. 
+ if sample != nil { + if delta > minDelta { + continue + } + } + + sample = &candidate + minDelta = delta } return } -func (p *PersistenceAdapter) GetBoundaryValues(labels model.LabelSet, interval *model.Interval) (sampleSets []*model.SampleSet, err error) { - fingerprints, err := p.persistence.GetFingerprintsForLabelSet(labels) +func (v *viewAdapter) GetValueAtTime(labels model.LabelSet, timestamp *time.Time) (samples []*model.Sample, err error) { + fingerprints, err := queryStorage.GetFingerprintsForLabelSet(labels) + if err != nil { + return + } + + for _, fingerprint := range fingerprints { + sampleCandidates := v.view.GetValueAtTime(fingerprint, *timestamp) + samplePair := v.chooseClosestSample(sampleCandidates, timestamp) + m, err := queryStorage.GetMetricForFingerprint(fingerprint) + if err != nil { + continue + } + if samplePair != nil { + samples = append(samples, &model.Sample{ + Metric: *m, + Value: samplePair.Value, + Timestamp: *timestamp, + }) + } + } + return +} + +func (v *viewAdapter) GetBoundaryValues(labels model.LabelSet, interval *model.Interval) (sampleSets []*model.SampleSet, err error) { + fingerprints, err := queryStorage.GetFingerprintsForLabelSet(labels) if err != nil { return } for _, fingerprint := range fingerprints { - var sampleSet *model.SampleSet // Don't shadow err. // TODO: change to GetBoundaryValues() once it has the right return type. - sampleSet, err = p.persistence.GetRangeValues(fingerprint, *interval) - if err != nil { - return nil, err - } - if sampleSet == nil { + samplePairs := v.view.GetRangeValues(fingerprint, *interval) + if samplePairs == nil { continue } + // TODO: memoize/cache this. 
+ m, err := queryStorage.GetMetricForFingerprint(fingerprint) + if err != nil { + continue + } + + sampleSet := &model.SampleSet{ + Metric: *m, + Values: samplePairs, + } sampleSets = append(sampleSets, sampleSet) } return sampleSets, nil } -func (p *PersistenceAdapter) GetRangeValues(labels model.LabelSet, interval *model.Interval) (sampleSets []*model.SampleSet, err error) { - fingerprints, err := p.persistence.GetFingerprintsForLabelSet(labels) +func (v *viewAdapter) GetRangeValues(labels model.LabelSet, interval *model.Interval) (sampleSets []*model.SampleSet, err error) { + fingerprints, err := queryStorage.GetFingerprintsForLabelSet(labels) if err != nil { return } for _, fingerprint := range fingerprints { - var sampleSet *model.SampleSet // Don't shadow err. - sampleSet, err = p.persistence.GetRangeValues(fingerprint, *interval) - if err != nil { - return nil, err - } - if sampleSet == nil { + samplePairs := v.view.GetRangeValues(fingerprint, *interval) + if samplePairs == nil { continue } + // TODO: memoize/cache this. 
+ m, err := queryStorage.GetMetricForFingerprint(fingerprint) + if err != nil { + continue + } + + sampleSet := &model.SampleSet{ + Metric: *m, + Values: samplePairs, + } sampleSets = append(sampleSets, sampleSet) } return sampleSets, nil } -func SetPersistence(persistence metric.MetricPersistence, policy *metric.StalenessPolicy) { - if policy == nil { - policy = &metric.StalenessPolicy{ - DeltaAllowance: time.Duration(*defaultStalenessDelta) * time.Second, - } +func SetStorage(storage metric.Storage) { + queryStorage = storage +} + +func NewViewAdapter(view metric.View) *viewAdapter { + stalenessPolicy := metric.StalenessPolicy{ + DeltaAllowance: time.Duration(*defaultStalenessDelta) * time.Second, } - persistenceAdapter = &PersistenceAdapter{ - persistence: persistence, - stalenessPolicy: policy, + + return &viewAdapter{ + view: view, + stalenessPolicy: &stalenessPolicy, } } diff --git a/rules/ast/printer.go b/rules/ast/printer.go index 7e8a08d49..a2f00bffb 100644 --- a/rules/ast/printer.go +++ b/rules/ast/printer.go @@ -152,9 +152,14 @@ func TypedValueToJSON(data interface{}, typeStr string) string { } func EvalToString(node Node, timestamp *time.Time, format OutputFormat) string { + viewAdapter, err := viewAdapterForInstantQuery(node, *timestamp) + if err != nil { + panic(err) + } + switch node.Type() { case SCALAR: - scalar := node.(ScalarNode).Eval(timestamp) + scalar := node.(ScalarNode).Eval(timestamp, viewAdapter) switch format { case TEXT: return fmt.Sprintf("scalar: %v", scalar) @@ -162,7 +167,7 @@ func EvalToString(node Node, timestamp *time.Time, format OutputFormat) string { return TypedValueToJSON(scalar, "scalar") } case VECTOR: - vector := node.(VectorNode).Eval(timestamp) + vector := node.(VectorNode).Eval(timestamp, viewAdapter) switch format { case TEXT: return vector.String() @@ -170,7 +175,7 @@ func EvalToString(node Node, timestamp *time.Time, format OutputFormat) string { return TypedValueToJSON(vector, "vector") } case MATRIX: - matrix := 
node.(MatrixNode).Eval(timestamp) + matrix := node.(MatrixNode).Eval(timestamp, viewAdapter) switch format { case TEXT: return matrix.String() @@ -178,7 +183,7 @@ func EvalToString(node Node, timestamp *time.Time, format OutputFormat) string { return TypedValueToJSON(matrix, "matrix") } case STRING: - str := node.(StringNode).Eval(timestamp) + str := node.(StringNode).Eval(timestamp, viewAdapter) switch format { case TEXT: return str diff --git a/rules/ast/query_analyzer.go b/rules/ast/query_analyzer.go new file mode 100644 index 000000000..17578509d --- /dev/null +++ b/rules/ast/query_analyzer.go @@ -0,0 +1,141 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ast + +import ( + "github.com/prometheus/prometheus/model" + "github.com/prometheus/prometheus/storage/metric" + "log" + "time" +) + +type FullRangeMap map[model.Fingerprint]time.Duration +type IntervalRangeMap map[model.Fingerprint]bool + +type QueryAnalyzer struct { + // Values collected by query analysis. + // + // Full ranges always implicitly span a time range of: + // - start: query interval start - duration + // - end: query interval end + // + // This is because full ranges can only result from matrix literals (like + // "foo[5m]"), which have said time-spanning behavior during a ranged query. + FullRanges FullRangeMap + // Interval ranges always implicitly span the whole query interval. 
+ IntervalRanges IntervalRangeMap +} + +func NewQueryAnalyzer() *QueryAnalyzer { + return &QueryAnalyzer{ + FullRanges: FullRangeMap{}, + IntervalRanges: IntervalRangeMap{}, + } +} + +func minTime(t1, t2 time.Time) time.Time { + if t1.Before(t2) { + return t1 + } + return t2 +} + +func maxTime(t1, t2 time.Time) time.Time { + if t1.After(t2) { + return t1 + } + return t2 +} + +func (analyzer *QueryAnalyzer) Visit(node Node) { + switch n := node.(type) { + case *VectorLiteral: + fingerprints, err := queryStorage.GetFingerprintsForLabelSet(n.labels) + if err != nil { + log.Printf("Error getting fingerprints for labelset %v: %v", n.labels, err) + return + } + for _, fingerprint := range fingerprints { + if !analyzer.IntervalRanges[fingerprint] { + analyzer.IntervalRanges[fingerprint] = true + } + } + case *MatrixLiteral: + fingerprints, err := queryStorage.GetFingerprintsForLabelSet(n.labels) + if err != nil { + log.Printf("Error getting fingerprints for labelset %v: %v", n.labels, err) + return + } + for _, fingerprint := range fingerprints { + interval := n.interval + // If an interval has already been recorded for this fingerprint, merge + // it with the current interval. + if oldInterval, ok := analyzer.FullRanges[fingerprint]; ok { + if oldInterval > interval { + interval = oldInterval + } + } + analyzer.FullRanges[fingerprint] = interval + } + } +} + +func (analyzer *QueryAnalyzer) AnalyzeQueries(node Node) { + Walk(analyzer, node) + // Find and dedupe overlaps between full and stepped ranges. Full ranges + // always contain more points *and* span more time than stepped ranges, so + // throw away stepped ranges for fingerprints which have full ranges. 
+ for fingerprint := range analyzer.FullRanges { + if analyzer.IntervalRanges[fingerprint] { + delete(analyzer.IntervalRanges, fingerprint) + } + } +} + +func viewAdapterForInstantQuery(node Node, timestamp time.Time) (viewAdapter *viewAdapter, err error) { + analyzer := NewQueryAnalyzer() + analyzer.AnalyzeQueries(node) + viewBuilder := metric.NewViewRequestBuilder() + for fingerprint, rangeDuration := range analyzer.FullRanges { + viewBuilder.GetMetricRange(fingerprint, timestamp.Add(-rangeDuration), timestamp) + } + for fingerprint := range analyzer.IntervalRanges { + viewBuilder.GetMetricAtTime(fingerprint, timestamp) + } + view, err := queryStorage.MakeView(viewBuilder, time.Duration(60)*time.Second) + if err == nil { + viewAdapter = NewViewAdapter(view) + } + return +} + +func viewAdapterForRangeQuery(node Node, start time.Time, end time.Time, interval time.Duration) (viewAdapter *viewAdapter, err error) { + analyzer := NewQueryAnalyzer() + analyzer.AnalyzeQueries(node) + viewBuilder := metric.NewViewRequestBuilder() + for fingerprint, rangeDuration := range analyzer.FullRanges { + // TODO: we should support GetMetricRangeAtInterval() or similar ops in the view builder. + for t := start; t.Before(end); t = t.Add(interval) { + viewBuilder.GetMetricRange(fingerprint, t.Add(-rangeDuration), t) + } + } + for fingerprint := range analyzer.IntervalRanges { + viewBuilder.GetMetricAtInterval(fingerprint, start, end, interval) + } + view, err := queryStorage.MakeView(viewBuilder, time.Duration(60)*time.Second) + if err == nil { + viewAdapter = NewViewAdapter(view) + } + return +} diff --git a/rules/ast/walk.go b/rules/ast/walk.go new file mode 100644 index 000000000..c74c0fd9d --- /dev/null +++ b/rules/ast/walk.go @@ -0,0 +1,27 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ast + +type Visitor interface { + Visit(node Node) +} + +// Walk() does a depth-first traversal of the AST, calling visitor.Visit() for +// each encountered node in the tree. +func Walk(visitor Visitor, node Node) { + visitor.Visit(node) + for _, childNode := range node.Children() { + Walk(visitor, childNode) + } +} diff --git a/rules/rules.go b/rules/rules.go index cc6b36daf..adefaea7c 100644 --- a/rules/rules.go +++ b/rules/rules.go @@ -30,8 +30,8 @@ type Rule struct { func (rule *Rule) Name() string { return rule.name } -func (rule *Rule) EvalRaw(timestamp *time.Time) ast.Vector { - return rule.vector.Eval(timestamp) +func (rule *Rule) EvalRaw(timestamp *time.Time) (vector ast.Vector) { + return ast.EvalVectorInstant(rule.vector, *timestamp) } func (rule *Rule) Eval(timestamp *time.Time) ast.Vector { diff --git a/rules/rules_test.go b/rules/rules_test.go index bd763da3e..b0f1d400d 100644 --- a/rules/rules_test.go +++ b/rules/rules_test.go @@ -21,6 +21,7 @@ import ( "os" "strings" "testing" + "time" ) var testEvalTime = testStartTime.Add(testDuration5m * 10) @@ -28,19 +29,25 @@ var testEvalTime = testStartTime.Add(testDuration5m * 10) // Expected output needs to be alphabetically sorted (labels within one line // must be sorted and lines between each other must be sorted too). 
var expressionTests = []struct { - expr string - output []string - shouldFail bool + expr string + output []string + shouldFail bool + fullRanges int + intervalRanges int }{ { - expr: "SUM(http_requests)", - output: []string{"http_requests{} => 3600 @[%v]"}, + expr: "SUM(http_requests)", + output: []string{"http_requests{} => 3600 @[%v]"}, + fullRanges: 0, + intervalRanges: 8, }, { expr: "SUM(http_requests) BY (job)", output: []string{ "http_requests{job='api-server'} => 1000 @[%v]", "http_requests{job='app-server'} => 2600 @[%v]", }, + fullRanges: 0, + intervalRanges: 8, }, { expr: "SUM(http_requests) BY (job, group)", output: []string{ @@ -49,74 +56,116 @@ var expressionTests = []struct { "http_requests{group='production',job='api-server'} => 300 @[%v]", "http_requests{group='production',job='app-server'} => 1100 @[%v]", }, + fullRanges: 0, + intervalRanges: 8, }, { expr: "AVG(http_requests) BY (job)", output: []string{ "http_requests{job='api-server'} => 250 @[%v]", "http_requests{job='app-server'} => 650 @[%v]", }, + fullRanges: 0, + intervalRanges: 8, }, { expr: "MIN(http_requests) BY (job)", output: []string{ "http_requests{job='api-server'} => 100 @[%v]", "http_requests{job='app-server'} => 500 @[%v]", }, + fullRanges: 0, + intervalRanges: 8, }, { expr: "MAX(http_requests) BY (job)", output: []string{ "http_requests{job='api-server'} => 400 @[%v]", "http_requests{job='app-server'} => 800 @[%v]", }, + fullRanges: 0, + intervalRanges: 8, }, { expr: "SUM(http_requests) BY (job) - count(http_requests)", output: []string{ "http_requests{job='api-server'} => 992 @[%v]", "http_requests{job='app-server'} => 2592 @[%v]", }, + fullRanges: 0, + intervalRanges: 8, }, { expr: "SUM(http_requests) BY (job) - 2", output: []string{ "http_requests{job='api-server'} => 998 @[%v]", "http_requests{job='app-server'} => 2598 @[%v]", }, + fullRanges: 0, + intervalRanges: 8, }, { expr: "SUM(http_requests) BY (job) % 3", output: []string{ "http_requests{job='api-server'} => 1 @[%v]", 
"http_requests{job='app-server'} => 2 @[%v]", }, + fullRanges: 0, + intervalRanges: 8, }, { expr: "SUM(http_requests) BY (job) / 0", output: []string{ "http_requests{job='api-server'} => +Inf @[%v]", "http_requests{job='app-server'} => +Inf @[%v]", }, + fullRanges: 0, + intervalRanges: 8, }, { expr: "SUM(http_requests) BY (job) > 1000", output: []string{ "http_requests{job='app-server'} => 2600 @[%v]", }, + fullRanges: 0, + intervalRanges: 8, }, { expr: "SUM(http_requests) BY (job) <= 1000", output: []string{ "http_requests{job='api-server'} => 1000 @[%v]", }, + fullRanges: 0, + intervalRanges: 8, }, { expr: "SUM(http_requests) BY (job) != 1000", output: []string{ "http_requests{job='app-server'} => 2600 @[%v]", }, + fullRanges: 0, + intervalRanges: 8, }, { expr: "SUM(http_requests) BY (job) == 1000", output: []string{ "http_requests{job='api-server'} => 1000 @[%v]", }, + fullRanges: 0, + intervalRanges: 8, }, { expr: "SUM(http_requests) BY (job) + SUM(http_requests) BY (job)", output: []string{ "http_requests{job='api-server'} => 2000 @[%v]", "http_requests{job='app-server'} => 5200 @[%v]", }, + fullRanges: 0, + intervalRanges: 8, + }, { + expr: "http_requests{job='api-server', group='canary'}", + output: []string{ + "http_requests{group='canary',instance='0',job='api-server'} => 300 @[%v]", + "http_requests{group='canary',instance='1',job='api-server'} => 400 @[%v]", + }, + fullRanges: 0, + intervalRanges: 2, + }, { + expr: "http_requests{job='api-server', group='canary'} + delta(http_requests{job='api-server'}[5m], 1)", + output: []string{ + "http_requests{group='canary',instance='0',job='api-server'} => 330 @[%v]", + "http_requests{group='canary',instance='1',job='api-server'} => 440 @[%v]", + }, + fullRanges: 4, + intervalRanges: 0, }, { expr: "delta(http_requests[25m], 1)", output: []string{ @@ -129,6 +178,8 @@ var expressionTests = []struct { "http_requests{group='production',instance='1',job='api-server'} => 100 @[%v]", 
"http_requests{group='production',instance='1',job='app-server'} => 300 @[%v]", }, + fullRanges: 8, + intervalRanges: 0, // Invalid expressions that should fail to parse. }, { expr: "", @@ -162,31 +213,24 @@ func vectorComparisonString(expected []string, actual []string) string { } func TestExpressions(t *testing.T) { - temporaryDirectory, err := ioutil.TempDir("", "leveldb_metric_persistence_test") + temporaryDirectory, err := ioutil.TempDir("", "rule_expression_tests") if err != nil { t.Errorf("Could not create temporary directory: %q\n", err) return } + tieredStorage := metric.NewTieredStorage(5000, 5000, 100, time.Second*30, time.Second*1, time.Second*20, temporaryDirectory) + go tieredStorage.Serve() defer func() { + tieredStorage.Close() if err = os.RemoveAll(temporaryDirectory); err != nil { t.Errorf("Could not remove temporary directory: %q\n", err) } }() - persistence, err := metric.NewLevelDBMetricPersistence(temporaryDirectory) - if err != nil { - t.Errorf("Could not create LevelDB Metric Persistence: %q\n", err) - return - } - if persistence == nil { - t.Errorf("Received nil LevelDB Metric Persistence.\n") - return - } - defer func() { - persistence.Close() - }() - storeMatrix(persistence, testMatrix) - ast.SetPersistence(persistence, nil) + ast.SetStorage(tieredStorage) + + storeMatrix(tieredStorage, testMatrix) + tieredStorage.Flush() for _, exprTest := range expressionTests { expectedLines := annotateWithTime(exprTest.output) @@ -200,6 +244,9 @@ func TestExpressions(t *testing.T) { t.Errorf("Error during parsing: %v", err) t.Errorf("Expression: %v", exprTest.expr) } else { + if exprTest.shouldFail { + t.Errorf("Test should fail, but didn't") + } failed := false resultStr := ast.EvalToString(testExpr, &testEvalTime, ast.TEXT) resultLines := strings.Split(resultStr, "\n") @@ -221,6 +268,18 @@ func TestExpressions(t *testing.T) { failed = true } } + + analyzer := ast.NewQueryAnalyzer() + analyzer.AnalyzeQueries(testExpr) + if exprTest.fullRanges != 
len(analyzer.FullRanges) { + t.Errorf("Count of full ranges didn't match: %v vs %v", exprTest.fullRanges, len(analyzer.FullRanges)) + failed = true + } + if exprTest.intervalRanges != len(analyzer.IntervalRanges) { + t.Errorf("Count of stepped ranges didn't match: %v vs %v", exprTest.intervalRanges, len(analyzer.IntervalRanges)) + failed = true + } + if failed { t.Errorf("Expression: %v\n%v", exprTest.expr, vectorComparisonString(expectedLines, resultLines)) } diff --git a/rules/testdata.go b/rules/testdata.go index 958846abf..49f7c54ba 100644 --- a/rules/testdata.go +++ b/rules/testdata.go @@ -51,10 +51,10 @@ func getTestVectorFromTestMatrix(matrix ast.Matrix) ast.Vector { return vector } -func storeMatrix(persistence metric.MetricPersistence, matrix ast.Matrix) error { +func storeMatrix(storage metric.Storage, matrix ast.Matrix) error { for _, sampleSet := range matrix { for _, sample := range sampleSet.Values { - err := persistence.AppendSample(model.Sample{ + err := storage.AppendSample(model.Sample{ Metric: sampleSet.Metric, Value: sample.Value, Timestamp: sample.Timestamp, diff --git a/web/api/query.go b/web/api/query.go index 9448acc3b..eb9e27dcc 100644 --- a/web/api/query.go +++ b/web/api/query.go @@ -72,18 +72,21 @@ func (serv MetricsService) QueryRange(expr string, end int64, duration int64, st // Align the start to step "tick" boundary. end -= end % step - matrix := ast.EvalVectorRange( + matrix, err := ast.EvalVectorRange( exprNode.(ast.VectorNode), time.Unix(end-duration, 0), time.Unix(end, 0), time.Duration(step)*time.Second) + if err != nil { + return ast.ErrorToJSON(err) + } sort.Sort(matrix) return ast.TypedValueToJSON(matrix, "matrix") } func (serv MetricsService) Metrics() string { - metricNames, err := serv.appState.Persistence.GetAllMetricNames() + metricNames, err := serv.appState.Storage.GetAllMetricNames() rb := serv.ResponseBuilder() rb.SetContentType(gorest.Application_Json) if err != nil {