diff --git a/main.go b/main.go index 71b53e5ba..ec18a7aa5 100644 --- a/main.go +++ b/main.go @@ -131,9 +131,7 @@ func main() { go ts.Serve() go prometheus.interruptHandler() - ast.SetStorage(*ts) - - ruleManager := rules.NewRuleManager(ruleResults, conf.EvaluationInterval()) + ruleManager := rules.NewRuleManager(ruleResults, conf.EvaluationInterval(), ts) err = ruleManager.AddRulesFromConfig(conf) if err != nil { log.Fatalf("Error loading rule files: %v", err) diff --git a/rules/alerting.go b/rules/alerting.go index 0063ded4e..7b259ebeb 100644 --- a/rules/alerting.go +++ b/rules/alerting.go @@ -17,6 +17,7 @@ import ( "fmt" "github.com/prometheus/prometheus/model" "github.com/prometheus/prometheus/rules/ast" + "github.com/prometheus/prometheus/storage/metric" "github.com/prometheus/prometheus/utility" "time" ) @@ -87,13 +88,13 @@ type AlertingRule struct { func (rule AlertingRule) Name() string { return rule.name } -func (rule AlertingRule) EvalRaw(timestamp time.Time) (vector ast.Vector, err error) { - return ast.EvalVectorInstant(rule.vector, timestamp) +func (rule AlertingRule) EvalRaw(timestamp time.Time, storage *metric.TieredStorage) (vector ast.Vector, err error) { + return ast.EvalVectorInstant(rule.vector, timestamp, storage) } -func (rule AlertingRule) Eval(timestamp time.Time) (vector ast.Vector, err error) { +func (rule AlertingRule) Eval(timestamp time.Time, storage *metric.TieredStorage) (vector ast.Vector, err error) { // Get the raw value of the rule expression. - exprResult, err := rule.EvalRaw(timestamp) + exprResult, err := rule.EvalRaw(timestamp, storage) if err != nil { return } diff --git a/rules/ast/ast.go b/rules/ast/ast.go index 1ba59cc2e..ad148b97a 100644 --- a/rules/ast/ast.go +++ b/rules/ast/ast.go @@ -17,6 +17,7 @@ import ( "errors" "fmt" "github.com/prometheus/prometheus/model" + "github.com/prometheus/prometheus/storage/metric" "log" "math" "sort" @@ -258,8 +259,8 @@ func labelsToKey(labels model.Metric) string { return strings.Join(keyParts, ",") // TODO not safe when label value contains comma. } -func EvalVectorInstant(node VectorNode, timestamp time.Time) (vector Vector, err error) { - viewAdapter, err := viewAdapterForInstantQuery(node, timestamp) +func EvalVectorInstant(node VectorNode, timestamp time.Time, storage *metric.TieredStorage) (vector Vector, err error) { + viewAdapter, err := viewAdapterForInstantQuery(node, timestamp, storage) if err != nil { return } @@ -267,12 +268,12 @@ func EvalVectorInstant(node VectorNode, timestamp time.Time) (vector Vector, err return } -func EvalVectorRange(node VectorNode, start time.Time, end time.Time, interval time.Duration) (matrix Matrix, err error) { +func EvalVectorRange(node VectorNode, start time.Time, end time.Time, interval time.Duration, storage *metric.TieredStorage) (matrix Matrix, err error) { // Explicitly initialize to an empty matrix since a nil Matrix encodes to // null in JSON. matrix = Matrix{} - viewAdapter, err := viewAdapterForRangeQuery(node, start, end, interval) + viewAdapter, err := viewAdapterForRangeQuery(node, start, end, interval, storage) if err != nil { return } diff --git a/rules/ast/persistence_adapter.go b/rules/ast/persistence_adapter.go index 64a6e60cd..4a59833a6 100644 --- a/rules/ast/persistence_adapter.go +++ b/rules/ast/persistence_adapter.go @@ -22,12 +22,6 @@ import ( var defaultStalenessDelta = flag.Int("defaultStalenessDelta", 300, "Default staleness delta allowance in seconds during expression evaluations.") -// AST-global storage to use for operations that are not supported by views -// (i.e. metric->fingerprint lookups). -// -// BUG(julius): Wrap this into non-global state. -var queryStorage *metric.TieredStorage - // Describes the lenience limits to apply to values from the materialized view. type StalenessPolicy struct { // Describes the inclusive limit at which individual points if requested will @@ -36,8 +30,15 @@ type StalenessPolicy struct { } type viewAdapter struct { - view metric.View + // Policy that dictates when sample values around an evaluation time are to + // be interpreted as stale. stalenessPolicy StalenessPolicy + // AST-global storage to use for operations that are not supported by views + // (i.e. fingerprint->metric lookups). + storage *metric.TieredStorage + // The materialized view which contains all timeseries data required for + // executing a query. + view metric.View } // interpolateSamples interpolates a value at a target time between two @@ -109,7 +110,7 @@ func (v *viewAdapter) GetValueAtTime(fingerprints model.Fingerprints, timestamp for _, fingerprint := range fingerprints { sampleCandidates := v.view.GetValueAtTime(fingerprint, timestamp) samplePair := v.chooseClosestSample(sampleCandidates, timestamp) - m, err := queryStorage.GetMetricForFingerprint(fingerprint) + m, err := v.storage.GetMetricForFingerprint(fingerprint) if err != nil { continue } @@ -133,7 +134,7 @@ func (v *viewAdapter) GetBoundaryValues(fingerprints model.Fingerprints, interva } // TODO: memoize/cache this. - m, err := queryStorage.GetMetricForFingerprint(fingerprint) + m, err := v.storage.GetMetricForFingerprint(fingerprint) if err != nil { continue } @@ -155,7 +156,7 @@ func (v *viewAdapter) GetRangeValues(fingerprints model.Fingerprints, interval * } // TODO: memoize/cache this. - m, err := queryStorage.GetMetricForFingerprint(fingerprint) + m, err := v.storage.GetMetricForFingerprint(fingerprint) if err != nil { continue } @@ -169,17 +170,14 @@ func (v *viewAdapter) GetRangeValues(fingerprints model.Fingerprints, interval * return sampleSets, nil } -func SetStorage(storage metric.TieredStorage) { - queryStorage = &storage -} - -func NewViewAdapter(view metric.View) *viewAdapter { +func NewViewAdapter(view metric.View, storage *metric.TieredStorage) *viewAdapter { stalenessPolicy := StalenessPolicy{ DeltaAllowance: time.Duration(*defaultStalenessDelta) * time.Second, } return &viewAdapter{ - view: view, stalenessPolicy: stalenessPolicy, + storage: storage, + view: view, } } diff --git a/rules/ast/printer.go b/rules/ast/printer.go index de19df551..7f322e2d2 100644 --- a/rules/ast/printer.go +++ b/rules/ast/printer.go @@ -17,6 +17,7 @@ import ( "encoding/json" "fmt" "github.com/prometheus/prometheus/model" + "github.com/prometheus/prometheus/storage/metric" "github.com/prometheus/prometheus/utility" "sort" "strings" @@ -151,8 +152,8 @@ func TypedValueToJSON(data interface{}, typeStr string) string { return string(dataJSON) } -func EvalToString(node Node, timestamp time.Time, format OutputFormat) string { - viewAdapter, err := viewAdapterForInstantQuery(node, timestamp) +func EvalToString(node Node, timestamp time.Time, format OutputFormat, storage *metric.TieredStorage) string { + viewAdapter, err := viewAdapterForInstantQuery(node, timestamp, storage) if err != nil { panic(err) } diff --git a/rules/ast/query_analyzer.go b/rules/ast/query_analyzer.go index 2c6ec92f8..549ea9790 100644 --- a/rules/ast/query_analyzer.go +++ b/rules/ast/query_analyzer.go @@ -35,33 +35,23 @@ type QueryAnalyzer struct { FullRanges FullRangeMap // Interval ranges always implicitly span the whole query interval. IntervalRanges IntervalRangeMap + // The underlying storage to which the query will be applied. Needed for + // extracting timeseries fingerprint information during query analysis. + storage *metric.TieredStorage } -func NewQueryAnalyzer() *QueryAnalyzer { +func NewQueryAnalyzer(storage *metric.TieredStorage) *QueryAnalyzer { return &QueryAnalyzer{ FullRanges: FullRangeMap{}, IntervalRanges: IntervalRangeMap{}, + storage: storage, } } -func minTime(t1, t2 time.Time) time.Time { - if t1.Before(t2) { - return t1 - } - return t2 -} - -func maxTime(t1, t2 time.Time) time.Time { - if t1.After(t2) { - return t1 - } - return t2 -} - func (analyzer *QueryAnalyzer) Visit(node Node) { switch n := node.(type) { case *VectorLiteral: - fingerprints, err := queryStorage.GetFingerprintsForLabelSet(n.labels) + fingerprints, err := analyzer.storage.GetFingerprintsForLabelSet(n.labels) if err != nil { log.Printf("Error getting fingerprints for labelset %v: %v", n.labels, err) return @@ -73,7 +63,7 @@ func (analyzer *QueryAnalyzer) Visit(node Node) { } } case *MatrixLiteral: - fingerprints, err := queryStorage.GetFingerprintsForLabelSet(n.labels) + fingerprints, err := analyzer.storage.GetFingerprintsForLabelSet(n.labels) if err != nil { log.Printf("Error getting fingerprints for labelset %v: %v", n.labels, err) return @@ -105,8 +95,8 @@ func (analyzer *QueryAnalyzer) AnalyzeQueries(node Node) { } } -func viewAdapterForInstantQuery(node Node, timestamp time.Time) (viewAdapter *viewAdapter, err error) { - analyzer := NewQueryAnalyzer() +func viewAdapterForInstantQuery(node Node, timestamp time.Time, storage *metric.TieredStorage) (viewAdapter *viewAdapter, err error) { + analyzer := NewQueryAnalyzer(storage) analyzer.AnalyzeQueries(node) viewBuilder := metric.NewViewRequestBuilder() for fingerprint, rangeDuration := range analyzer.FullRanges { @@ -115,15 +105,15 @@ func viewAdapterForInstantQuery(node Node, timestamp time.Time) (viewAdapter *vi for fingerprint := range analyzer.IntervalRanges { viewBuilder.GetMetricAtTime(fingerprint, timestamp) } - view, err := queryStorage.MakeView(viewBuilder, time.Duration(60)*time.Second) + view, err := analyzer.storage.MakeView(viewBuilder, time.Duration(60)*time.Second) if err == nil { - viewAdapter = NewViewAdapter(view) + viewAdapter = NewViewAdapter(view, storage) } return } -func viewAdapterForRangeQuery(node Node, start time.Time, end time.Time, interval time.Duration) (viewAdapter *viewAdapter, err error) { - analyzer := NewQueryAnalyzer() +func viewAdapterForRangeQuery(node Node, start time.Time, end time.Time, interval time.Duration, storage *metric.TieredStorage) (viewAdapter *viewAdapter, err error) { + analyzer := NewQueryAnalyzer(storage) analyzer.AnalyzeQueries(node) viewBuilder := metric.NewViewRequestBuilder() for fingerprint, rangeDuration := range analyzer.FullRanges { @@ -135,9 +125,9 @@ func viewAdapterForRangeQuery(node Node, start time.Time, end time.Time, interva for fingerprint := range analyzer.IntervalRanges { viewBuilder.GetMetricAtInterval(fingerprint, start, end, interval) } - view, err := queryStorage.MakeView(viewBuilder, time.Duration(60)*time.Second) + view, err := analyzer.storage.MakeView(viewBuilder, time.Duration(60)*time.Second) if err == nil { - viewAdapter = NewViewAdapter(view) + viewAdapter = NewViewAdapter(view, storage) } return } diff --git a/rules/manager.go b/rules/manager.go index e74533fab..82618c4b0 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -16,6 +16,7 @@ package rules import ( "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/model" + "github.com/prometheus/prometheus/storage/metric" "log" "sync" "time" @@ -35,14 +36,16 @@ type ruleManager struct { results chan *Result done chan bool interval time.Duration + storage *metric.TieredStorage } -func NewRuleManager(results chan *Result, interval time.Duration) RuleManager { +func NewRuleManager(results chan *Result, interval time.Duration, storage *metric.TieredStorage) RuleManager { manager := &ruleManager{ results: results, rules: []Rule{}, done: make(chan bool), interval: interval, + storage: storage, } go manager.run(results) return manager @@ -72,7 +75,7 @@ func (m *ruleManager) runIteration(results chan *Result) { for _, rule := range m.rules { wg.Add(1) go func(rule Rule) { - vector, err := rule.Eval(now) + vector, err := rule.Eval(now, m.storage) samples := model.Samples{} for _, sample := range vector { samples = append(samples, sample) diff --git a/rules/recording.go b/rules/recording.go index 31eb0898d..62984332e 100644 --- a/rules/recording.go +++ b/rules/recording.go @@ -17,6 +17,7 @@ import ( "fmt" "github.com/prometheus/prometheus/model" "github.com/prometheus/prometheus/rules/ast" + "github.com/prometheus/prometheus/storage/metric" "time" ) @@ -30,13 +31,13 @@ type RecordingRule struct { func (rule RecordingRule) Name() string { return rule.name } -func (rule RecordingRule) EvalRaw(timestamp time.Time) (vector ast.Vector, err error) { - return ast.EvalVectorInstant(rule.vector, timestamp) +func (rule RecordingRule) EvalRaw(timestamp time.Time, storage *metric.TieredStorage) (vector ast.Vector, err error) { + return ast.EvalVectorInstant(rule.vector, timestamp, storage) } -func (rule RecordingRule) Eval(timestamp time.Time) (vector ast.Vector, err error) { +func (rule RecordingRule) Eval(timestamp time.Time, storage *metric.TieredStorage) (vector ast.Vector, err error) { // Get the raw value of the rule expression. - vector, err = rule.EvalRaw(timestamp) + vector, err = rule.EvalRaw(timestamp, storage) if err != nil { return } diff --git a/rules/rules.go b/rules/rules.go index a9a1917d3..553fb30ac 100644 --- a/rules/rules.go +++ b/rules/rules.go @@ -15,6 +15,7 @@ package rules import ( "github.com/prometheus/prometheus/rules/ast" + "github.com/prometheus/prometheus/storage/metric" "time" ) @@ -25,9 +26,9 @@ type Rule interface { Name() string // EvalRaw evaluates the rule's vector expression without triggering any // other actions, like recording or alerting. - EvalRaw(timestamp time.Time) (vector ast.Vector, err error) + EvalRaw(timestamp time.Time, storage *metric.TieredStorage) (vector ast.Vector, err error) // Eval evaluates the rule, including any associated recording or alerting actions. - Eval(timestamp time.Time) (vector ast.Vector, err error) + Eval(timestamp time.Time, storage *metric.TieredStorage) (vector ast.Vector, err error) // ToDotGraph returns a Graphviz dot graph of the rule. ToDotGraph() string } diff --git a/rules/rules_test.go b/rules/rules_test.go index cdb1560dc..331a4c5ec 100644 --- a/rules/rules_test.go +++ b/rules/rules_test.go @@ -54,7 +54,6 @@ func newTestStorage(t test.Tester) (storage *metric.TieredStorage, closer test.C if storage == nil { t.Fatal("storage == nil") } - ast.SetStorage(*storage) storeMatrix(*storage, testMatrix) return } @@ -308,7 +307,7 @@ func ExpressionTests(t *testing.T) { t.Errorf("%d. Test should fail, but didn't", i) } failed := false - resultStr := ast.EvalToString(testExpr, testEvalTime, ast.TEXT) + resultStr := ast.EvalToString(testExpr, testEvalTime, ast.TEXT, tieredStorage) resultLines := strings.Split(resultStr, "\n") if len(exprTest.output) != len(resultLines) { @@ -338,7 +337,7 @@ func ExpressionTests(t *testing.T) { } } - analyzer := ast.NewQueryAnalyzer() + analyzer := ast.NewQueryAnalyzer(tieredStorage) analyzer.AnalyzeQueries(testExpr) if exprTest.fullRanges != len(analyzer.FullRanges) { t.Errorf("%d. Count of full ranges didn't match: %v vs %v", i, exprTest.fullRanges, len(analyzer.FullRanges)) @@ -464,7 +463,7 @@ func TestAlertingRule(t *testing.T) { for i, expected := range evalOutputs { evalTime := testStartTime.Add(testSampleInterval * time.Duration(i)) - actual, err := rule.Eval(evalTime) + actual, err := rule.Eval(evalTime, tieredStorage) if err != nil { t.Fatalf("Error during alerting rule evaluation: %s", err) } diff --git a/rules/testdata.go b/rules/testdata.go index 6a4f4780b..c1c43719b 100644 --- a/rules/testdata.go +++ b/rules/testdata.go @@ -143,7 +143,7 @@ var testMatrix = ast.Matrix{ { Metric: model.Metric{ model.MetricNameLabel: "x", - "y": "testvalue", + "y": "testvalue", }, Values: getTestValueStream(0, 100, 10), }, diff --git a/web/api/query.go b/web/api/query.go index 91bb503b7..9c8f40fda 100644 --- a/web/api/query.go +++ b/web/api/query.go @@ -52,7 +52,7 @@ func (serv MetricsService) Query(expr string, formatJson string) (result string) rb.SetContentType(gorest.Text_Plain) } - return ast.EvalToString(exprNode, timestamp, format) + return ast.EvalToString(exprNode, timestamp, format, serv.Storage) } func (serv MetricsService) QueryRange(expr string, end int64, duration int64, step int64) string { @@ -86,7 +86,8 @@ func (serv MetricsService) QueryRange(expr string, end int64, duration int64, st exprNode.(ast.VectorNode), time.Unix(end-duration, 0).UTC(), time.Unix(end, 0).UTC(), - time.Duration(step)*time.Second) + time.Duration(step)*time.Second, + serv.Storage) if err != nil { return ast.ErrorToJSON(err) }