From 7ccd4b39b89e936bf0f9c47811e3fe2b20d1fe3b Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Tue, 9 Jan 2018 17:44:23 +0100 Subject: [PATCH] *: implement query params This adds a parameter to the storage selection interface which allows query engine(s) to pass information about the operations surrounding a data selection. This can for example be used by remote storage backends to infer the correct downsampling aggregates that need to be provided. --- cmd/prometheus/main.go | 23 +++---- promql/ast.go | 54 +++++++++-------- promql/engine.go | 116 +++++++++++++++++++----------------- promql/engine_test.go | 21 ++++--- promql/functions_test.go | 5 +- promql/test.go | 9 ++- rules/manager.go | 4 +- rules/manager_test.go | 8 +-- rules/recording_test.go | 4 +- storage/fanout.go | 4 +- storage/interface.go | 8 ++- storage/noop.go | 2 +- storage/remote/read.go | 10 ++-- storage/remote/read_test.go | 6 +- storage/tsdb/tsdb.go | 2 +- web/api/v1/api.go | 12 ++-- web/federate.go | 2 +- web/web.go | 4 +- 18 files changed, 156 insertions(+), 138 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 2ad8cf2ea..a611fe323 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -88,12 +88,12 @@ func main() { localStoragePath string notifier notifier.Options notifierTimeout model.Duration - queryEngine promql.EngineOptions web web.Options tsdb tsdb.Options lookbackDelta model.Duration webTimeout model.Duration queryTimeout model.Duration + queryConcurrency int prometheusURL string @@ -102,9 +102,6 @@ func main() { notifier: notifier.Options{ Registerer: prometheus.DefaultRegisterer, }, - queryEngine: promql.EngineOptions{ - Metrics: prometheus.DefaultRegisterer, - }, } a := kingpin.New(filepath.Base(os.Args[0]), "The Prometheus monitoring server") @@ -178,7 +175,7 @@ func main() { Default("2m").SetValue(&cfg.queryTimeout) a.Flag("query.max-concurrency", "Maximum number of queries executed concurrently."). - Default("20").IntVar(&cfg.queryEngine.MaxConcurrentQueries) + Default("20").IntVar(&cfg.queryConcurrency) promlogflag.AddFlags(a, &cfg.logLevel) @@ -209,8 +206,6 @@ func main() { promql.LookbackDelta = time.Duration(cfg.lookbackDelta) - cfg.queryEngine.Timeout = time.Duration(cfg.queryTimeout) - logger := promlog.New(cfg.logLevel) // XXX(fabxc): Kubernetes does background logging which we can only customize by modifying @@ -233,7 +228,6 @@ func main() { fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage) ) - cfg.queryEngine.Logger = log.With(logger, "component", "query engine") var ( ctxWeb, cancelWeb = context.WithCancel(context.Background()) ctxRule = context.Background() @@ -247,10 +241,17 @@ func main() { discoveryManagerNotify = discovery.NewManager(ctxNotify, log.With(logger, "component", "discovery manager notify")) scrapeManager = scrape.NewManager(log.With(logger, "component", "scrape manager"), fanoutStorage) - queryEngine = promql.NewEngine(fanoutStorage, &cfg.queryEngine) - ruleManager = rules.NewManager(&rules.ManagerOptions{ + + queryEngine = promql.NewEngine( + log.With(logger, "component", "query engine"), + prometheus.DefaultRegisterer, + cfg.queryConcurrency, + time.Duration(cfg.queryTimeout), + ) + + ruleManager = rules.NewManager(&rules.ManagerOptions{ Appendable: fanoutStorage, - QueryFunc: rules.EngineQueryFunc(queryEngine), + QueryFunc: rules.EngineQueryFunc(queryEngine, fanoutStorage), NotifyFunc: sendAlerts(notifier, cfg.web.ExternalURL.String()), Context: ctxRule, ExternalURL: cfg.web.ExternalURL, diff --git a/promql/ast.go b/promql/ast.go index 8871982ed..59c568058 100644 --- a/promql/ast.go +++ b/promql/ast.go @@ -238,56 +238,58 @@ type VectorMatching struct { } // Visitor allows visiting a Node and its child nodes. The Visit method is -// invoked for each node encountered by Walk. If the result visitor w is not -// nil, Walk visits each of the children of node with the visitor w, followed -// by a call of w.Visit(nil). +// invoked for each node with the path leading to the node provided additionally. +// If the result visitor w is not nil, Walk visits each of the children +// of node with the visitor w, followed by a call of w.Visit(nil, nil). type Visitor interface { - Visit(node Node) (w Visitor) + Visit(node Node, path []Node) (w Visitor) } // Walk traverses an AST in depth-first order: It starts by calling -// v.Visit(node); node must not be nil. If the visitor w returned by -// v.Visit(node) is not nil, Walk is invoked recursively with visitor +// v.Visit(node, path); node must not be nil. If the visitor w returned by +// v.Visit(node, path) is not nil, Walk is invoked recursively with visitor // w for each of the non-nil children of node, followed by a call of // w.Visit(nil). -func Walk(v Visitor, node Node) { - if v = v.Visit(node); v == nil { +// As the tree is descended the path of previous nodes is provided. +func Walk(v Visitor, node Node, path []Node) { + if v = v.Visit(node, path); v == nil { return } + path = append(path, node) switch n := node.(type) { case Statements: for _, s := range n { - Walk(v, s) + Walk(v, s, path) } case *AlertStmt: - Walk(v, n.Expr) + Walk(v, n.Expr, path) case *EvalStmt: - Walk(v, n.Expr) + Walk(v, n.Expr, path) case *RecordStmt: - Walk(v, n.Expr) + Walk(v, n.Expr, path) case Expressions: for _, e := range n { - Walk(v, e) + Walk(v, e, path) } case *AggregateExpr: - Walk(v, n.Expr) + Walk(v, n.Expr, path) case *BinaryExpr: - Walk(v, n.LHS) - Walk(v, n.RHS) + Walk(v, n.LHS, path) + Walk(v, n.RHS, path) case *Call: - Walk(v, n.Args) + Walk(v, n.Args, path) case *ParenExpr: - Walk(v, n.Expr) + Walk(v, n.Expr, path) case *UnaryExpr: - Walk(v, n.Expr) + Walk(v, n.Expr, path) case *MatrixSelector, *NumberLiteral, *StringLiteral, *VectorSelector: // nothing to do @@ -296,21 +298,21 @@ func Walk(v Visitor, node Node) { panic(fmt.Errorf("promql.Walk: unhandled node type %T", node)) } - v.Visit(nil) + v.Visit(nil, nil) } -type inspector func(Node) bool +type inspector func(Node, []Node) bool -func (f inspector) Visit(node Node) Visitor { - if f(node) { +func (f inspector) Visit(node Node, path []Node) Visitor { + if f(node, path) { return f } return nil } // Inspect traverses an AST in depth-first order: It starts by calling -// f(node); node must not be nil. If f returns true, Inspect invokes f +// f(node, path); node must not be nil. If f returns true, Inspect invokes f // for all the non-nil children of node, recursively. -func Inspect(node Node, f func(Node) bool) { - Walk(inspector(f), node) +func Inspect(node Node, f func(Node, []Node) bool) { + Walk(inspector(f), node, nil) } diff --git a/promql/engine.go b/promql/engine.go index a4503f45e..397c2688a 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -89,6 +89,8 @@ type Query interface { // query implements the Query interface. type query struct { + // Underlying data provider. + queryable storage.Queryable // The original query string. q string // Statement of the parsed query. @@ -150,26 +152,18 @@ func contextDone(ctx context.Context, env string) error { // Engine handles the lifetime of queries from beginning to end. // It is connected to a querier. type Engine struct { - // A Querier constructor against an underlying storage. - queryable Queryable - metrics *engineMetrics - // The gate limiting the maximum number of concurrent and waiting queries. + logger log.Logger + metrics *engineMetrics + timeout time.Duration gate *queryGate - options *EngineOptions - - logger log.Logger -} - -// Queryable allows opening a storage querier. -type Queryable interface { - Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) } // NewEngine returns a new engine. -func NewEngine(queryable Queryable, o *EngineOptions) *Engine { - if o == nil { - o = DefaultEngineOptions +func NewEngine(logger log.Logger, reg prometheus.Registerer, maxConcurrent int, timeout time.Duration) *Engine { + if logger == nil { + logger = log.NewNopLogger() } + metrics := &engineMetrics{ currentQueries: prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: namespace, @@ -212,10 +206,10 @@ func NewEngine(queryable Queryable, o *EngineOptions) *Engine { ConstLabels: prometheus.Labels{"slice": "result_sort"}, }), } - metrics.maxConcurrentQueries.Set(float64(o.MaxConcurrentQueries)) + metrics.maxConcurrentQueries.Set(float64(maxConcurrent)) - if o.Metrics != nil { - o.Metrics.MustRegister( + if reg != nil { + reg.MustRegister( metrics.currentQueries, metrics.maxConcurrentQueries, metrics.queryInnerEval, @@ -225,36 +219,20 @@ func NewEngine(queryable Queryable, o *EngineOptions) *Engine { ) } return &Engine{ - queryable: queryable, - gate: newQueryGate(o.MaxConcurrentQueries), - options: o, - logger: o.Logger, - metrics: metrics, + gate: newQueryGate(maxConcurrent), + timeout: timeout, + logger: logger, + metrics: metrics, } } -// EngineOptions contains configuration parameters for an Engine. -type EngineOptions struct { - MaxConcurrentQueries int - Timeout time.Duration - Logger log.Logger - Metrics prometheus.Registerer -} - -// DefaultEngineOptions are the default engine options. -var DefaultEngineOptions = &EngineOptions{ - MaxConcurrentQueries: 20, - Timeout: 2 * time.Minute, - Logger: log.NewNopLogger(), -} - // NewInstantQuery returns an evaluation query for the given expression at the given time. -func (ng *Engine) NewInstantQuery(qs string, ts time.Time) (Query, error) { +func (ng *Engine) NewInstantQuery(q storage.Queryable, qs string, ts time.Time) (Query, error) { expr, err := ParseExpr(qs) if err != nil { return nil, err } - qry := ng.newQuery(expr, ts, ts, 0) + qry := ng.newQuery(q, expr, ts, ts, 0) qry.q = qs return qry, nil @@ -262,7 +240,7 @@ func (ng *Engine) NewInstantQuery(qs string, ts time.Time) (Query, error) { // NewRangeQuery returns an evaluation query for the given time range and with // the resolution set by the interval. -func (ng *Engine) NewRangeQuery(qs string, start, end time.Time, interval time.Duration) (Query, error) { +func (ng *Engine) NewRangeQuery(q storage.Queryable, qs string, start, end time.Time, interval time.Duration) (Query, error) { expr, err := ParseExpr(qs) if err != nil { return nil, err @@ -270,13 +248,13 @@ func (ng *Engine) NewRangeQuery(qs string, start, end time.Time, interval time.D if expr.Type() != ValueTypeVector && expr.Type() != ValueTypeScalar { return nil, fmt.Errorf("invalid expression type %q for range query, must be Scalar or instant Vector", documentedType(expr.Type())) } - qry := ng.newQuery(expr, start, end, interval) + qry := ng.newQuery(q, expr, start, end, interval) qry.q = qs return qry, nil } -func (ng *Engine) newQuery(expr Expr, start, end time.Time, interval time.Duration) *query { +func (ng *Engine) newQuery(q storage.Queryable, expr Expr, start, end time.Time, interval time.Duration) *query { es := &EvalStmt{ Expr: expr, Start: start, @@ -284,9 +262,10 @@ func (ng *Engine) newQuery(expr Expr, start, end time.Time, interval time.Durati Interval: interval, } qry := &query{ - stmt: es, - ng: ng, - stats: stats.NewTimerGroup(), + stmt: es, + ng: ng, + stats: stats.NewTimerGroup(), + queryable: q, } return qry } @@ -316,7 +295,7 @@ func (ng *Engine) exec(ctx context.Context, q *query) (Value, error) { ng.metrics.currentQueries.Inc() defer ng.metrics.currentQueries.Dec() - ctx, cancel := context.WithTimeout(ctx, ng.options.Timeout) + ctx, cancel := context.WithTimeout(ctx, ng.timeout) q.cancel = cancel execTimer := q.stats.GetTimer(stats.ExecTotalTime).Start() @@ -363,9 +342,8 @@ func durationMilliseconds(d time.Duration) int64 { // execEvalStmt evaluates the expression of an evaluation statement for the given time range. func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (Value, error) { - prepareTimer := query.stats.GetTimer(stats.QueryPreparationTime).Start() - querier, err := ng.populateIterators(ctx, s) + querier, err := ng.populateIterators(ctx, query.queryable, s) prepareTimer.Stop() ng.metrics.queryPrepareTime.Observe(prepareTimer.ElapsedTime().Seconds()) @@ -489,10 +467,9 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) ( return mat, nil } -func (ng *Engine) populateIterators(ctx context.Context, s *EvalStmt) (storage.Querier, error) { +func (ng *Engine) populateIterators(ctx context.Context, q storage.Queryable, s *EvalStmt) (storage.Querier, error) { var maxOffset time.Duration - - Inspect(s.Expr, func(node Node) bool { + Inspect(s.Expr, func(node Node, _ []Node) bool { switch n := node.(type) { case *VectorSelector: if maxOffset < LookbackDelta { @@ -514,15 +491,21 @@ func (ng *Engine) populateIterators(ctx context.Context, s *EvalStmt) (storage.Q mint := s.Start.Add(-maxOffset) - querier, err := ng.queryable.Querier(ctx, timestamp.FromTime(mint), timestamp.FromTime(s.End)) + querier, err := q.Querier(ctx, timestamp.FromTime(mint), timestamp.FromTime(s.End)) if err != nil { return nil, err } - Inspect(s.Expr, func(node Node) bool { + Inspect(s.Expr, func(node Node, path []Node) bool { + params := &storage.SelectParams{ + Step: int64(s.Interval / time.Millisecond), + } + switch n := node.(type) { case *VectorSelector: - set, err := querier.Select(n.LabelMatchers...) + params.Func = extractFuncFromPath(path) + + set, err := querier.Select(params, n.LabelMatchers...) if err != nil { level.Error(ng.logger).Log("msg", "error selecting series set", "err", err) return false @@ -539,7 +522,9 @@ func (ng *Engine) populateIterators(ctx context.Context, s *EvalStmt) (storage.Q } case *MatrixSelector: - set, err := querier.Select(n.LabelMatchers...) + params.Func = extractFuncFromPath(path) + + set, err := querier.Select(params, n.LabelMatchers...) if err != nil { level.Error(ng.logger).Log("msg", "error selecting series set", "err", err) return false @@ -559,6 +544,25 @@ func (ng *Engine) populateIterators(ctx context.Context, s *EvalStmt) (storage.Q return querier, err } +// extractFuncFromPath walks up the path and searches for the first instance of +// a function or aggregation. +func extractFuncFromPath(p []Node) string { + if len(p) == 0 { + return "" + } + switch n := p[len(p)-1].(type) { + case *AggregateExpr: + return n.Op.String() + case *Call: + return n.Func.Name + case *BinaryExpr: + // If we hit a binary expression we terminate since we only care about functions + // or aggregations over a single metric. + return "" + } + return extractFuncFromPath(p[:len(p)-1]) +} + func expandSeriesSet(it storage.SeriesSet) (res []storage.Series, err error) { for it.Next() { res = append(res, it.At()) diff --git a/promql/engine_test.go b/promql/engine_test.go index c82cc13e5..eeebe9f5a 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -25,7 +25,9 @@ import ( ) func TestQueryConcurrency(t *testing.T) { - engine := NewEngine(nil, nil) + concurrentQueries := 10 + + engine := NewEngine(nil, nil, concurrentQueries, 10*time.Second) ctx, cancelCtx := context.WithCancel(context.Background()) defer cancelCtx() @@ -38,7 +40,7 @@ func TestQueryConcurrency(t *testing.T) { return nil } - for i := 0; i < DefaultEngineOptions.MaxConcurrentQueries; i++ { + for i := 0; i < concurrentQueries; i++ { q := engine.newTestQuery(f) go q.Exec(ctx) select { @@ -70,16 +72,13 @@ func TestQueryConcurrency(t *testing.T) { } // Terminate remaining queries. - for i := 0; i < DefaultEngineOptions.MaxConcurrentQueries; i++ { + for i := 0; i < concurrentQueries; i++ { block <- struct{}{} } } func TestQueryTimeout(t *testing.T) { - engine := NewEngine(nil, &EngineOptions{ - Timeout: 5 * time.Millisecond, - MaxConcurrentQueries: 20, - }) + engine := NewEngine(nil, nil, 20, 5*time.Millisecond) ctx, cancelCtx := context.WithCancel(context.Background()) defer cancelCtx() @@ -98,7 +97,7 @@ func TestQueryTimeout(t *testing.T) { } func TestQueryCancel(t *testing.T) { - engine := NewEngine(nil, nil) + engine := NewEngine(nil, nil, 10, 10*time.Second) ctx, cancelCtx := context.WithCancel(context.Background()) defer cancelCtx() @@ -144,7 +143,7 @@ func TestQueryCancel(t *testing.T) { } func TestEngineShutdown(t *testing.T) { - engine := NewEngine(nil, nil) + engine := NewEngine(nil, nil, 10, 10*time.Second) ctx, cancelCtx := context.WithCancel(context.Background()) block := make(chan struct{}) @@ -276,9 +275,9 @@ load 10s var err error var qry Query if c.Interval == 0 { - qry, err = test.QueryEngine().NewInstantQuery(c.Query, c.Start) + qry, err = test.QueryEngine().NewInstantQuery(test.Queryable(), c.Query, c.Start) } else { - qry, err = test.QueryEngine().NewRangeQuery(c.Query, c.Start, c.End, c.Interval) + qry, err = test.QueryEngine().NewRangeQuery(test.Queryable(), c.Query, c.Start, c.End, c.Interval) } if err != nil { t.Fatalf("unexpected error creating query: %q", err) diff --git a/promql/functions_test.go b/promql/functions_test.go index e2f3f9c01..b9197857d 100644 --- a/promql/functions_test.go +++ b/promql/functions_test.go @@ -16,6 +16,7 @@ package promql import ( "context" "testing" + "time" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/timestamp" @@ -85,7 +86,7 @@ func TestDeriv(t *testing.T) { // so we test it by hand. storage := testutil.NewStorage(t) defer storage.Close() - engine := NewEngine(storage, nil) + engine := NewEngine(nil, nil, 10, 10*time.Second) a, err := storage.Appender() if err != nil { @@ -100,7 +101,7 @@ func TestDeriv(t *testing.T) { t.Fatal(err) } - query, err := engine.NewInstantQuery("deriv(foo[30m])", timestamp.Time(1493712846939)) + query, err := engine.NewInstantQuery(storage, "deriv(foo[30m])", timestamp.Time(1493712846939)) if err != nil { t.Fatalf("Error parsing query: %s", err) } diff --git a/promql/test.go b/promql/test.go index b600b00f4..7a2454438 100644 --- a/promql/test.go +++ b/promql/test.go @@ -83,6 +83,11 @@ func (t *Test) QueryEngine() *Engine { return t.queryEngine } +// Queryable allows querying the test data. +func (t *Test) Queryable() storage.Queryable { + return t.storage +} + // Context returns the test's context. func (t *Test) Context() context.Context { return t.context @@ -460,7 +465,7 @@ func (t *Test) exec(tc testCommand) error { } case *evalCmd: - q := t.queryEngine.newQuery(cmd.expr, cmd.start, cmd.end, cmd.interval) + q := t.queryEngine.newQuery(t.storage, cmd.expr, cmd.start, cmd.end, cmd.interval) res := q.Exec(t.context) if res.Err != nil { if cmd.fail { @@ -495,7 +500,7 @@ func (t *Test) clear() { } t.storage = testutil.NewStorage(t) - t.queryEngine = NewEngine(t.storage, nil) + t.queryEngine = NewEngine(nil, nil, 20, 10*time.Second) t.context, t.cancelCtx = context.WithCancel(context.Background()) } diff --git a/rules/manager.go b/rules/manager.go index 4328e5c15..211d6580d 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -106,9 +106,9 @@ type QueryFunc func(ctx context.Context, q string, t time.Time) (promql.Vector, // EngineQueryFunc returns a new query function that executes instant queries against // the given engine. // It converts scaler into vector results. -func EngineQueryFunc(engine *promql.Engine) QueryFunc { +func EngineQueryFunc(engine *promql.Engine, q storage.Queryable) QueryFunc { return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) { - q, err := engine.NewInstantQuery(qs, t) + q, err := engine.NewInstantQuery(q, qs, t) if err != nil { return nil, err } diff --git a/rules/manager_test.go b/rules/manager_test.go index 859118797..8202b6ba5 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -144,7 +144,7 @@ func TestAlertingRule(t *testing.T) { evalTime := baseTime.Add(test.time) - res, err := rule.Eval(suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine()), nil) + res, err := rule.Eval(suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil) testutil.Ok(t, err) for i := range test.result { @@ -174,9 +174,9 @@ func annotateWithTime(lines []string, ts time.Time) []string { func TestStaleness(t *testing.T) { storage := testutil.NewStorage(t) defer storage.Close() - engine := promql.NewEngine(storage, nil) + engine := promql.NewEngine(nil, nil, 10, 10*time.Second) opts := &ManagerOptions{ - QueryFunc: EngineQueryFunc(engine), + QueryFunc: EngineQueryFunc(engine, storage), Appendable: storage, Context: context.Background(), Logger: log.NewNopLogger(), @@ -210,7 +210,7 @@ func TestStaleness(t *testing.T) { matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a_plus_one") testutil.Ok(t, err) - set, err := querier.Select(matcher) + set, err := querier.Select(nil, matcher) testutil.Ok(t, err) samples, err := readSeriesSet(set) diff --git a/rules/recording_test.go b/rules/recording_test.go index cc997d9b2..484474140 100644 --- a/rules/recording_test.go +++ b/rules/recording_test.go @@ -28,7 +28,7 @@ func TestRuleEval(t *testing.T) { storage := testutil.NewStorage(t) defer storage.Close() - engine := promql.NewEngine(storage, nil) + engine := promql.NewEngine(nil, nil, 10, 10*time.Second) ctx, cancelCtx := context.WithCancel(context.Background()) defer cancelCtx() @@ -62,7 +62,7 @@ func TestRuleEval(t *testing.T) { for _, test := range suite { rule := NewRecordingRule(test.name, test.expr, test.labels) - result, err := rule.Eval(ctx, now, EngineQueryFunc(engine), nil) + result, err := rule.Eval(ctx, now, EngineQueryFunc(engine, storage), nil) testutil.Ok(t, err) testutil.Equals(t, result, test.result) } diff --git a/storage/fanout.go b/storage/fanout.go index 40be2536d..7835d1ef7 100644 --- a/storage/fanout.go +++ b/storage/fanout.go @@ -216,10 +216,10 @@ func NewMergeQuerier(queriers []Querier) Querier { } // Select returns a set of series that matches the given label matchers. -func (q *mergeQuerier) Select(matchers ...*labels.Matcher) (SeriesSet, error) { +func (q *mergeQuerier) Select(params *SelectParams, matchers ...*labels.Matcher) (SeriesSet, error) { seriesSets := make([]SeriesSet, 0, len(q.queriers)) for _, querier := range q.queriers { - set, err := querier.Select(matchers...) + set, err := querier.Select(params, matchers...) if err != nil { return nil, err } diff --git a/storage/interface.go b/storage/interface.go index e04126cc9..9c145c425 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -52,7 +52,7 @@ type Queryable interface { // Querier provides reading access to time series data. type Querier interface { // Select returns a set of series that matches the given label matchers. - Select(...*labels.Matcher) (SeriesSet, error) + Select(*SelectParams, ...*labels.Matcher) (SeriesSet, error) // LabelValues returns all potential values for a label name. LabelValues(name string) ([]string, error) @@ -61,6 +61,12 @@ type Querier interface { Close() error } +// SelectParams specifies parameters passed to data selections. +type SelectParams struct { + Step int64 // Query step size in milliseconds. + Func string // String representation of surrounding function or aggregation. +} + // QueryableFunc is an adapter to allow the use of ordinary functions as // Queryables. It follows the idea of http.HandlerFunc. type QueryableFunc func(ctx context.Context, mint, maxt int64) (Querier, error) diff --git a/storage/noop.go b/storage/noop.go index a5ff1bc9b..fa024430f 100644 --- a/storage/noop.go +++ b/storage/noop.go @@ -22,7 +22,7 @@ func NoopQuerier() Querier { return noopQuerier{} } -func (noopQuerier) Select(...*labels.Matcher) (SeriesSet, error) { +func (noopQuerier) Select(*SelectParams, ...*labels.Matcher) (SeriesSet, error) { return NoopSeriesSet(), nil } diff --git a/storage/remote/read.go b/storage/remote/read.go index e49d3524a..1fedad884 100644 --- a/storage/remote/read.go +++ b/storage/remote/read.go @@ -43,7 +43,7 @@ type querier struct { // Select implements storage.Querier and uses the given matchers to read series // sets from the Client. -func (q *querier) Select(matchers ...*labels.Matcher) (storage.SeriesSet, error) { +func (q *querier) Select(_ *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, error) { query, err := ToQuery(q.mint, q.maxt, matchers) if err != nil { return nil, err @@ -91,9 +91,9 @@ type externalLabelsQuerier struct { // Select adds equality matchers for all external labels to the list of matchers // before calling the wrapped storage.Queryable. The added external labels are // removed from the returned series sets. -func (q externalLabelsQuerier) Select(matchers ...*labels.Matcher) (storage.SeriesSet, error) { +func (q externalLabelsQuerier) Select(p *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, error) { m, added := q.addExternalLabels(matchers) - s, err := q.Querier.Select(m...) + s, err := q.Querier.Select(p, m...) if err != nil { return nil, err } @@ -144,7 +144,7 @@ type requiredMatchersQuerier struct { // Select returns a NoopSeriesSet if the given matchers don't match the label // set of the requiredMatchersQuerier. Otherwise it'll call the wrapped querier. -func (q requiredMatchersQuerier) Select(matchers ...*labels.Matcher) (storage.SeriesSet, error) { +func (q requiredMatchersQuerier) Select(p *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, error) { ms := q.requiredMatchers for _, m := range matchers { for i, r := range ms { @@ -160,7 +160,7 @@ func (q requiredMatchersQuerier) Select(matchers ...*labels.Matcher) (storage.Se if len(ms) > 0 { return storage.NoopSeriesSet(), nil } - return q.Querier.Select(matchers...) + return q.Querier.Select(p, matchers...) } // addExternalLabels adds matchers for each external label. External labels diff --git a/storage/remote/read_test.go b/storage/remote/read_test.go index f61ab3742..537f605a8 100644 --- a/storage/remote/read_test.go +++ b/storage/remote/read_test.go @@ -42,7 +42,7 @@ func TestExternalLabelsQuerierSelect(t *testing.T) { externalLabels: model.LabelSet{"region": "europe"}, } want := newSeriesSetFilter(mockSeriesSet{}, q.externalLabels) - have, err := q.Select(matchers...) + have, err := q.Select(nil, matchers...) if err != nil { t.Error(err) } @@ -157,7 +157,7 @@ type mockSeriesSet struct { storage.SeriesSet } -func (mockQuerier) Select(...*labels.Matcher) (storage.SeriesSet, error) { +func (mockQuerier) Select(*storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, error) { return mockSeriesSet{}, nil } @@ -313,7 +313,7 @@ func TestRequiredLabelsQuerierSelect(t *testing.T) { requiredMatchers: test.requiredMatchers, } - have, err := q.Select(test.matchers...) + have, err := q.Select(nil, test.matchers...) if err != nil { t.Error(err) } diff --git a/storage/tsdb/tsdb.go b/storage/tsdb/tsdb.go index 070488fbf..ab4603ea3 100644 --- a/storage/tsdb/tsdb.go +++ b/storage/tsdb/tsdb.go @@ -188,7 +188,7 @@ type querier struct { q tsdb.Querier } -func (q querier) Select(oms ...*labels.Matcher) (storage.SeriesSet, error) { +func (q querier) Select(_ *storage.SelectParams, oms ...*labels.Matcher) (storage.SeriesSet, error) { ms := make([]tsdbLabels.Matcher, 0, len(oms)) for _, om := range oms { diff --git a/web/api/v1/api.go b/web/api/v1/api.go index 523316c90..c0cb8c6a7 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -108,7 +108,7 @@ type apiFunc func(r *http.Request) (interface{}, *apiError) // API can register a set of endpoints in a router and handle // them using the provided storage and query engine. type API struct { - Queryable promql.Queryable + Queryable storage.Queryable QueryEngine *promql.Engine targetRetriever targetRetriever @@ -125,7 +125,7 @@ type API struct { // NewAPI returns an initialized API type. func NewAPI( qe *promql.Engine, - q promql.Queryable, + q storage.Queryable, tr targetRetriever, ar alertmanagerRetriever, configFunc func() config.Config, @@ -222,7 +222,7 @@ func (api *API) query(r *http.Request) (interface{}, *apiError) { defer cancel() } - qry, err := api.QueryEngine.NewInstantQuery(r.FormValue("query"), ts) + qry, err := api.QueryEngine.NewInstantQuery(api.Queryable, r.FormValue("query"), ts) if err != nil { return nil, &apiError{errorBadData, err} } @@ -296,7 +296,7 @@ func (api *API) queryRange(r *http.Request) (interface{}, *apiError) { defer cancel() } - qry, err := api.QueryEngine.NewRangeQuery(r.FormValue("query"), start, end, step) + qry, err := api.QueryEngine.NewRangeQuery(api.Queryable, r.FormValue("query"), start, end, step) if err != nil { return nil, &apiError{errorBadData, err} } @@ -396,7 +396,7 @@ func (api *API) series(r *http.Request) (interface{}, *apiError) { var sets []storage.SeriesSet for _, mset := range matcherSets { - s, err := q.Select(mset...) + s, err := q.Select(nil, mset...) if err != nil { return nil, &apiError{errorExec, err} } @@ -537,7 +537,7 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) { } } - set, err := querier.Select(filteredMatchers...) + set, err := querier.Select(nil, filteredMatchers...) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return diff --git a/web/federate.go b/web/federate.go index 8640a886d..e5a176517 100644 --- a/web/federate.go +++ b/web/federate.go @@ -74,7 +74,7 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { var sets []storage.SeriesSet for _, mset := range matcherSets { - s, err := q.Select(mset...) + s, err := q.Select(nil, mset...) if err != nil { federationErrors.Inc() http.Error(w, err.Error(), http.StatusInternalServerError) diff --git a/web/web.go b/web/web.go index e162cc1b3..d5f0cbe5a 100644 --- a/web/web.go +++ b/web/web.go @@ -536,7 +536,7 @@ func (h *Handler) consoles(w http.ResponseWriter, r *http.Request) { "__console_"+name, data, h.now(), - template.QueryFunc(rules.EngineQueryFunc(h.queryEngine)), + template.QueryFunc(rules.EngineQueryFunc(h.queryEngine, h.storage)), h.options.ExternalURL, ) filenames, err := filepath.Glob(h.options.ConsoleLibrariesPath + "/*.lib") @@ -766,7 +766,7 @@ func (h *Handler) executeTemplate(w http.ResponseWriter, name string, data inter name, data, h.now(), - template.QueryFunc(rules.EngineQueryFunc(h.queryEngine)), + template.QueryFunc(rules.EngineQueryFunc(h.queryEngine, h.storage)), h.options.ExternalURL, ) tmpl.Funcs(tmplFuncs(h.consolesPath(), h.options))