Mirror of https://github.com/prometheus/prometheus (synced 2024-12-24 23:42:32 +00:00)

Merge pull request #3671 from prometheus/queryparams

*: implement query params

Commit 309c666426
@@ -88,12 +88,12 @@ func main() {
 		localStoragePath string
 		notifier         notifier.Options
 		notifierTimeout  model.Duration
-		queryEngine      promql.EngineOptions
 		web              web.Options
 		tsdb             tsdb.Options
 		lookbackDelta    model.Duration
 		webTimeout       model.Duration
 		queryTimeout     model.Duration
+		queryConcurrency int

 		prometheusURL string

@@ -102,9 +102,6 @@ func main() {
 		notifier: notifier.Options{
 			Registerer: prometheus.DefaultRegisterer,
 		},
-		queryEngine: promql.EngineOptions{
-			Metrics: prometheus.DefaultRegisterer,
-		},
 	}

 	a := kingpin.New(filepath.Base(os.Args[0]), "The Prometheus monitoring server")
@@ -178,7 +175,7 @@ func main() {
 		Default("2m").SetValue(&cfg.queryTimeout)

 	a.Flag("query.max-concurrency", "Maximum number of queries executed concurrently.").
-		Default("20").IntVar(&cfg.queryEngine.MaxConcurrentQueries)
+		Default("20").IntVar(&cfg.queryConcurrency)

 	promlogflag.AddFlags(a, &cfg.logLevel)
@@ -209,8 +206,6 @@ func main() {
 	promql.LookbackDelta = time.Duration(cfg.lookbackDelta)

-	cfg.queryEngine.Timeout = time.Duration(cfg.queryTimeout)
-
 	logger := promlog.New(cfg.logLevel)

 	// XXX(fabxc): Kubernetes does background logging which we can only customize by modifying
@@ -233,7 +228,6 @@ func main() {
 		fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage)
 	)

-	cfg.queryEngine.Logger = log.With(logger, "component", "query engine")
 	var (
 		ctxWeb, cancelWeb = context.WithCancel(context.Background())
 		ctxRule           = context.Background()
@@ -247,10 +241,17 @@ func main() {
 		discoveryManagerNotify = discovery.NewManager(ctxNotify, log.With(logger, "component", "discovery manager notify"))

 		scrapeManager = scrape.NewManager(log.With(logger, "component", "scrape manager"), fanoutStorage)
-		queryEngine   = promql.NewEngine(fanoutStorage, &cfg.queryEngine)
-		ruleManager   = rules.NewManager(&rules.ManagerOptions{
+
+		queryEngine = promql.NewEngine(
+			log.With(logger, "component", "query engine"),
+			prometheus.DefaultRegisterer,
+			cfg.queryConcurrency,
+			time.Duration(cfg.queryTimeout),
+		)
+
+		ruleManager = rules.NewManager(&rules.ManagerOptions{
 			Appendable:  fanoutStorage,
-			QueryFunc:   rules.EngineQueryFunc(queryEngine),
+			QueryFunc:   rules.EngineQueryFunc(queryEngine, fanoutStorage),
 			NotifyFunc:  sendAlerts(notifier, cfg.web.ExternalURL.String()),
 			Context:     ctxRule,
 			ExternalURL: cfg.web.ExternalURL,

@@ -238,56 +238,58 @@ type VectorMatching struct {
 }

 // Visitor allows visiting a Node and its child nodes. The Visit method is
-// invoked for each node encountered by Walk. If the result visitor w is not
-// nil, Walk visits each of the children of node with the visitor w, followed
-// by a call of w.Visit(nil).
+// invoked for each node with the path leading to the node provided additionally.
+// If the result visitor w is not nil, Walk visits each of the children
+// of node with the visitor w, followed by a call of w.Visit(nil, nil).
 type Visitor interface {
-	Visit(node Node) (w Visitor)
+	Visit(node Node, path []Node) (w Visitor)
 }

 // Walk traverses an AST in depth-first order: It starts by calling
-// v.Visit(node); node must not be nil. If the visitor w returned by
-// v.Visit(node) is not nil, Walk is invoked recursively with visitor
+// v.Visit(node, path); node must not be nil. If the visitor w returned by
+// v.Visit(node, path) is not nil, Walk is invoked recursively with visitor
 // w for each of the non-nil children of node, followed by a call of
 // w.Visit(nil).
-func Walk(v Visitor, node Node) {
-	if v = v.Visit(node); v == nil {
+// As the tree is descended the path of previous nodes is provided.
+func Walk(v Visitor, node Node, path []Node) {
+	if v = v.Visit(node, path); v == nil {
 		return
 	}
+	path = append(path, node)

 	switch n := node.(type) {
 	case Statements:
 		for _, s := range n {
-			Walk(v, s)
+			Walk(v, s, path)
 		}
 	case *AlertStmt:
-		Walk(v, n.Expr)
+		Walk(v, n.Expr, path)

 	case *EvalStmt:
-		Walk(v, n.Expr)
+		Walk(v, n.Expr, path)

 	case *RecordStmt:
-		Walk(v, n.Expr)
+		Walk(v, n.Expr, path)

 	case Expressions:
 		for _, e := range n {
-			Walk(v, e)
+			Walk(v, e, path)
 		}
 	case *AggregateExpr:
-		Walk(v, n.Expr)
+		Walk(v, n.Expr, path)

 	case *BinaryExpr:
-		Walk(v, n.LHS)
-		Walk(v, n.RHS)
+		Walk(v, n.LHS, path)
+		Walk(v, n.RHS, path)

 	case *Call:
-		Walk(v, n.Args)
+		Walk(v, n.Args, path)

 	case *ParenExpr:
-		Walk(v, n.Expr)
+		Walk(v, n.Expr, path)

 	case *UnaryExpr:
-		Walk(v, n.Expr)
+		Walk(v, n.Expr, path)

 	case *MatrixSelector, *NumberLiteral, *StringLiteral, *VectorSelector:
 		// nothing to do
@@ -296,21 +298,21 @@ func Walk(v Visitor, node Node) {
 		panic(fmt.Errorf("promql.Walk: unhandled node type %T", node))
 	}

-	v.Visit(nil)
+	v.Visit(nil, nil)
 }

-type inspector func(Node) bool
+type inspector func(Node, []Node) bool

-func (f inspector) Visit(node Node) Visitor {
-	if f(node) {
+func (f inspector) Visit(node Node, path []Node) Visitor {
+	if f(node, path) {
 		return f
 	}
 	return nil
 }

 // Inspect traverses an AST in depth-first order: It starts by calling
-// f(node); node must not be nil. If f returns true, Inspect invokes f
+// f(node, path); node must not be nil. If f returns true, Inspect invokes f
 // for all the non-nil children of node, recursively.
-func Inspect(node Node, f func(Node) bool) {
-	Walk(inspector(f), node)
+func Inspect(node Node, f func(Node, []Node) bool) {
+	Walk(inspector(f), node, nil)
 }

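The extra path argument hands every callback the chain of ancestors of the current node. A minimal sketch of the new callback shape (written as if inside package promql, with the fmt import elided; printTree is a made-up helper, not part of the PR):

// printTree prints each node type indented by its depth, using the
// ancestor path supplied by the new Inspect signature.
func printTree(input string) error {
	expr, err := ParseExpr(input)
	if err != nil {
		return err
	}
	Inspect(expr, func(node Node, path []Node) bool {
		if node == nil { // Walk finishes each subtree with a Visit(nil, nil) call.
			return false
		}
		fmt.Printf("%*s%T\n", 2*len(path), "", node)
		return true
	})
	return nil
}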
promql/engine.go (116 changed lines)
@@ -89,6 +89,8 @@ type Query interface {

 // query implements the Query interface.
 type query struct {
+	// Underlying data provider.
+	queryable storage.Queryable
 	// The original query string.
 	q string
 	// Statement of the parsed query.
@@ -150,26 +152,18 @@ func contextDone(ctx context.Context, env string) error {
 // Engine handles the lifetime of queries from beginning to end.
 // It is connected to a querier.
 type Engine struct {
-	// A Querier constructor against an underlying storage.
-	queryable Queryable
-	metrics   *engineMetrics
-	// The gate limiting the maximum number of concurrent and waiting queries.
+	logger  log.Logger
+	metrics *engineMetrics
+	timeout time.Duration
 	gate    *queryGate
-	options *EngineOptions
-
-	logger log.Logger
 }

-// Queryable allows opening a storage querier.
-type Queryable interface {
-	Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error)
-}
-
 // NewEngine returns a new engine.
-func NewEngine(queryable Queryable, o *EngineOptions) *Engine {
-	if o == nil {
-		o = DefaultEngineOptions
+func NewEngine(logger log.Logger, reg prometheus.Registerer, maxConcurrent int, timeout time.Duration) *Engine {
+	if logger == nil {
+		logger = log.NewNopLogger()
 	}

 	metrics := &engineMetrics{
 		currentQueries: prometheus.NewGauge(prometheus.GaugeOpts{
 			Namespace: namespace,
@@ -212,10 +206,10 @@ func NewEngine(queryable Queryable, o *EngineOptions) *Engine {
 			ConstLabels: prometheus.Labels{"slice": "result_sort"},
 		}),
 	}
-	metrics.maxConcurrentQueries.Set(float64(o.MaxConcurrentQueries))
+	metrics.maxConcurrentQueries.Set(float64(maxConcurrent))

-	if o.Metrics != nil {
-		o.Metrics.MustRegister(
+	if reg != nil {
+		reg.MustRegister(
 			metrics.currentQueries,
 			metrics.maxConcurrentQueries,
 			metrics.queryInnerEval,
@@ -225,36 +219,20 @@ func NewEngine(queryable Queryable, o *EngineOptions) *Engine {
 		)
 	}
 	return &Engine{
-		queryable: queryable,
-		gate:      newQueryGate(o.MaxConcurrentQueries),
-		options:   o,
-		logger:    o.Logger,
-		metrics:   metrics,
+		gate:    newQueryGate(maxConcurrent),
+		timeout: timeout,
+		logger:  logger,
+		metrics: metrics,
 	}
 }

-// EngineOptions contains configuration parameters for an Engine.
-type EngineOptions struct {
-	MaxConcurrentQueries int
-	Timeout              time.Duration
-	Logger               log.Logger
-	Metrics              prometheus.Registerer
-}
-
-// DefaultEngineOptions are the default engine options.
-var DefaultEngineOptions = &EngineOptions{
-	MaxConcurrentQueries: 20,
-	Timeout:              2 * time.Minute,
-	Logger:               log.NewNopLogger(),
-}
-
 // NewInstantQuery returns an evaluation query for the given expression at the given time.
-func (ng *Engine) NewInstantQuery(qs string, ts time.Time) (Query, error) {
+func (ng *Engine) NewInstantQuery(q storage.Queryable, qs string, ts time.Time) (Query, error) {
 	expr, err := ParseExpr(qs)
 	if err != nil {
 		return nil, err
 	}
-	qry := ng.newQuery(expr, ts, ts, 0)
+	qry := ng.newQuery(q, expr, ts, ts, 0)
 	qry.q = qs

 	return qry, nil
@@ -262,7 +240,7 @@ func (ng *Engine) NewInstantQuery(qs string, ts time.Time) (Query, error) {

 // NewRangeQuery returns an evaluation query for the given time range and with
 // the resolution set by the interval.
-func (ng *Engine) NewRangeQuery(qs string, start, end time.Time, interval time.Duration) (Query, error) {
+func (ng *Engine) NewRangeQuery(q storage.Queryable, qs string, start, end time.Time, interval time.Duration) (Query, error) {
 	expr, err := ParseExpr(qs)
 	if err != nil {
 		return nil, err
@@ -270,13 +248,13 @@ func (ng *Engine) NewRangeQuery(qs string, start, end time.Time, interval time.D
 	if expr.Type() != ValueTypeVector && expr.Type() != ValueTypeScalar {
 		return nil, fmt.Errorf("invalid expression type %q for range query, must be Scalar or instant Vector", documentedType(expr.Type()))
 	}
-	qry := ng.newQuery(expr, start, end, interval)
+	qry := ng.newQuery(q, expr, start, end, interval)
 	qry.q = qs

 	return qry, nil
 }

-func (ng *Engine) newQuery(expr Expr, start, end time.Time, interval time.Duration) *query {
+func (ng *Engine) newQuery(q storage.Queryable, expr Expr, start, end time.Time, interval time.Duration) *query {
 	es := &EvalStmt{
 		Expr:  expr,
 		Start: start,
@@ -284,9 +262,10 @@ func (ng *Engine) newQuery(expr Expr, start, end time.Time, interval time.Durati
 		Interval: interval,
 	}
 	qry := &query{
-		stmt:  es,
-		ng:    ng,
-		stats: stats.NewTimerGroup(),
+		stmt:      es,
+		ng:        ng,
+		stats:     stats.NewTimerGroup(),
+		queryable: q,
 	}
 	return qry
 }

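Taken together, the constructor and query hunks above change the calling convention: the engine no longer holds a Queryable, so every query is handed one explicitly. A minimal sketch of the new call shape under these signatures (runInstantQuery is a hypothetical helper, not part of the PR):

package main

import (
	"context"
	"time"

	"github.com/go-kit/kit/log"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/prometheus/promql"
	"github.com/prometheus/prometheus/storage"
)

// runInstantQuery shows the new shape: the engine is storage-agnostic and the
// Queryable travels with each query instead of being fixed at construction.
func runInstantQuery(q storage.Queryable, expr string) (promql.Value, error) {
	engine := promql.NewEngine(
		log.NewNopLogger(),           // logger
		prometheus.DefaultRegisterer, // registerer for engine metrics (may be nil)
		20,                           // max concurrent queries
		2*time.Minute,                // per-query timeout
	)

	qry, err := engine.NewInstantQuery(q, expr, time.Now())
	if err != nil {
		return nil, err
	}
	res := qry.Exec(context.Background())
	return res.Value, res.Err
}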
@@ -316,7 +295,7 @@ func (ng *Engine) exec(ctx context.Context, q *query) (Value, error) {
 	ng.metrics.currentQueries.Inc()
 	defer ng.metrics.currentQueries.Dec()

-	ctx, cancel := context.WithTimeout(ctx, ng.options.Timeout)
+	ctx, cancel := context.WithTimeout(ctx, ng.timeout)
 	q.cancel = cancel

 	execTimer := q.stats.GetTimer(stats.ExecTotalTime).Start()
@@ -363,9 +342,8 @@ func durationMilliseconds(d time.Duration) int64 {

 // execEvalStmt evaluates the expression of an evaluation statement for the given time range.
 func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (Value, error) {
-
 	prepareTimer := query.stats.GetTimer(stats.QueryPreparationTime).Start()
-	querier, err := ng.populateIterators(ctx, s)
+	querier, err := ng.populateIterators(ctx, query.queryable, s)
 	prepareTimer.Stop()
 	ng.metrics.queryPrepareTime.Observe(prepareTimer.ElapsedTime().Seconds())

@@ -489,10 +467,9 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (
 	return mat, nil
 }

-func (ng *Engine) populateIterators(ctx context.Context, s *EvalStmt) (storage.Querier, error) {
+func (ng *Engine) populateIterators(ctx context.Context, q storage.Queryable, s *EvalStmt) (storage.Querier, error) {
 	var maxOffset time.Duration
-
-	Inspect(s.Expr, func(node Node) bool {
+	Inspect(s.Expr, func(node Node, _ []Node) bool {
 		switch n := node.(type) {
 		case *VectorSelector:
 			if maxOffset < LookbackDelta {
@@ -514,15 +491,21 @@ func (ng *Engine) populateIterators(ctx context.Context, s *EvalStmt) (storage.Q

 	mint := s.Start.Add(-maxOffset)

-	querier, err := ng.queryable.Querier(ctx, timestamp.FromTime(mint), timestamp.FromTime(s.End))
+	querier, err := q.Querier(ctx, timestamp.FromTime(mint), timestamp.FromTime(s.End))
 	if err != nil {
 		return nil, err
 	}

-	Inspect(s.Expr, func(node Node) bool {
+	Inspect(s.Expr, func(node Node, path []Node) bool {
+		params := &storage.SelectParams{
+			Step: int64(s.Interval / time.Millisecond),
+		}
+
 		switch n := node.(type) {
 		case *VectorSelector:
-			set, err := querier.Select(n.LabelMatchers...)
+			params.Func = extractFuncFromPath(path)
+
+			set, err := querier.Select(params, n.LabelMatchers...)
 			if err != nil {
 				level.Error(ng.logger).Log("msg", "error selecting series set", "err", err)
 				return false
@@ -539,7 +522,9 @@ func (ng *Engine) populateIterators(ctx context.Context, s *EvalStmt) (storage.Q
 			}

 		case *MatrixSelector:
-			set, err := querier.Select(n.LabelMatchers...)
+			params.Func = extractFuncFromPath(path)
+
+			set, err := querier.Select(params, n.LabelMatchers...)
 			if err != nil {
 				level.Error(ng.logger).Log("msg", "error selecting series set", "err", err)
 				return false
@@ -559,6 +544,25 @@ func (ng *Engine) populateIterators(ctx context.Context, s *EvalStmt) (storage.Q
 	return querier, err
 }

+// extractFuncFromPath walks up the path and searches for the first instance of
+// a function or aggregation.
+func extractFuncFromPath(p []Node) string {
+	if len(p) == 0 {
+		return ""
+	}
+	switch n := p[len(p)-1].(type) {
+	case *AggregateExpr:
+		return n.Op.String()
+	case *Call:
+		return n.Func.Name
+	case *BinaryExpr:
+		// If we hit a binary expression we terminate since we only care about functions
+		// or aggregations over a single metric.
+		return ""
+	}
+	return extractFuncFromPath(p[:len(p)-1])
+}
+
 func expandSeriesSet(it storage.SeriesSet) (res []storage.Series, err error) {
 	for it.Next() {
 		res = append(res, it.At())

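The effect of extractFuncFromPath is that each selector carries the name of its innermost enclosing call or aggregation down to the storage layer: for sum(rate(http_requests_total[5m])) the matrix selector's hint is "rate", for sum(foo) it is "sum", and a bare selector or one directly under a binary expression gets an empty hint. A small example-style sketch of that behaviour (written as if inside package promql, fmt import elided; the example name is made up):

func ExampleExtractFuncFromPath() {
	expr, err := ParseExpr(`sum(rate(http_requests_total[5m]))`)
	if err != nil {
		panic(err)
	}
	Inspect(expr, func(node Node, path []Node) bool {
		if _, ok := node.(*MatrixSelector); ok {
			fmt.Println(extractFuncFromPath(path)) // the innermost enclosing call wins
		}
		return true
	})
	// Output: rate
}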
@@ -25,7 +25,9 @@ import (
 )

 func TestQueryConcurrency(t *testing.T) {
-	engine := NewEngine(nil, nil)
+	concurrentQueries := 10
+
+	engine := NewEngine(nil, nil, concurrentQueries, 10*time.Second)
 	ctx, cancelCtx := context.WithCancel(context.Background())
 	defer cancelCtx()

@@ -38,7 +40,7 @@ func TestQueryConcurrency(t *testing.T) {
 		return nil
 	}

-	for i := 0; i < DefaultEngineOptions.MaxConcurrentQueries; i++ {
+	for i := 0; i < concurrentQueries; i++ {
 		q := engine.newTestQuery(f)
 		go q.Exec(ctx)
 		select {
@@ -70,16 +72,13 @@ func TestQueryConcurrency(t *testing.T) {
 	}

 	// Terminate remaining queries.
-	for i := 0; i < DefaultEngineOptions.MaxConcurrentQueries; i++ {
+	for i := 0; i < concurrentQueries; i++ {
 		block <- struct{}{}
 	}
 }

 func TestQueryTimeout(t *testing.T) {
-	engine := NewEngine(nil, &EngineOptions{
-		Timeout:              5 * time.Millisecond,
-		MaxConcurrentQueries: 20,
-	})
+	engine := NewEngine(nil, nil, 20, 5*time.Millisecond)
 	ctx, cancelCtx := context.WithCancel(context.Background())
 	defer cancelCtx()

@@ -98,7 +97,7 @@ func TestQueryTimeout(t *testing.T) {
 }

 func TestQueryCancel(t *testing.T) {
-	engine := NewEngine(nil, nil)
+	engine := NewEngine(nil, nil, 10, 10*time.Second)
 	ctx, cancelCtx := context.WithCancel(context.Background())
 	defer cancelCtx()

@@ -144,7 +143,7 @@ func TestQueryCancel(t *testing.T) {
 }

 func TestEngineShutdown(t *testing.T) {
-	engine := NewEngine(nil, nil)
+	engine := NewEngine(nil, nil, 10, 10*time.Second)
 	ctx, cancelCtx := context.WithCancel(context.Background())

 	block := make(chan struct{})
@@ -276,9 +275,9 @@ load 10s
 			var err error
 			var qry Query
 			if c.Interval == 0 {
-				qry, err = test.QueryEngine().NewInstantQuery(c.Query, c.Start)
+				qry, err = test.QueryEngine().NewInstantQuery(test.Queryable(), c.Query, c.Start)
 			} else {
-				qry, err = test.QueryEngine().NewRangeQuery(c.Query, c.Start, c.End, c.Interval)
+				qry, err = test.QueryEngine().NewRangeQuery(test.Queryable(), c.Query, c.Start, c.End, c.Interval)
 			}
 			if err != nil {
 				t.Fatalf("unexpected error creating query: %q", err)

@@ -16,6 +16,7 @@ package promql
 import (
 	"context"
 	"testing"
+	"time"

 	"github.com/prometheus/prometheus/pkg/labels"
 	"github.com/prometheus/prometheus/pkg/timestamp"
@@ -85,7 +86,7 @@ func TestDeriv(t *testing.T) {
 	// so we test it by hand.
 	storage := testutil.NewStorage(t)
 	defer storage.Close()
-	engine := NewEngine(storage, nil)
+	engine := NewEngine(nil, nil, 10, 10*time.Second)

 	a, err := storage.Appender()
 	if err != nil {
@@ -100,7 +101,7 @@ func TestDeriv(t *testing.T) {
 		t.Fatal(err)
 	}

-	query, err := engine.NewInstantQuery("deriv(foo[30m])", timestamp.Time(1493712846939))
+	query, err := engine.NewInstantQuery(storage, "deriv(foo[30m])", timestamp.Time(1493712846939))
 	if err != nil {
 		t.Fatalf("Error parsing query: %s", err)
 	}

@@ -83,6 +83,11 @@ func (t *Test) QueryEngine() *Engine {
 	return t.queryEngine
 }

+// Queryable allows querying the test data.
+func (t *Test) Queryable() storage.Queryable {
+	return t.storage
+}
+
 // Context returns the test's context.
 func (t *Test) Context() context.Context {
 	return t.context
@@ -460,7 +465,7 @@ func (t *Test) exec(tc testCommand) error {
 		}

 	case *evalCmd:
-		q := t.queryEngine.newQuery(cmd.expr, cmd.start, cmd.end, cmd.interval)
+		q := t.queryEngine.newQuery(t.storage, cmd.expr, cmd.start, cmd.end, cmd.interval)
 		res := q.Exec(t.context)
 		if res.Err != nil {
 			if cmd.fail {
@@ -495,7 +500,7 @@ func (t *Test) clear() {
 	}
 	t.storage = testutil.NewStorage(t)

-	t.queryEngine = NewEngine(t.storage, nil)
+	t.queryEngine = NewEngine(nil, nil, 20, 10*time.Second)
 	t.context, t.cancelCtx = context.WithCancel(context.Background())
 }

@@ -106,9 +106,9 @@ type QueryFunc func(ctx context.Context, q string, t time.Time) (promql.Vector,
 // EngineQueryFunc returns a new query function that executes instant queries against
 // the given engine.
 // It converts scaler into vector results.
-func EngineQueryFunc(engine *promql.Engine) QueryFunc {
+func EngineQueryFunc(engine *promql.Engine, q storage.Queryable) QueryFunc {
 	return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {
-		q, err := engine.NewInstantQuery(qs, t)
+		q, err := engine.NewInstantQuery(q, qs, t)
 		if err != nil {
 			return nil, err
 		}

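A query function produced this way is bound to both an engine and a concrete Queryable, so callers such as the rule manager or console templates no longer need an engine that knows about storage. A minimal sketch of reusing it outside the manager (alertValue is a hypothetical helper; imports elided):

func alertValue(engine *promql.Engine, q storage.Queryable, expr string) (promql.Vector, error) {
	queryFn := rules.EngineQueryFunc(engine, q)
	return queryFn(context.Background(), expr, time.Now())
}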
@@ -144,7 +144,7 @@ func TestAlertingRule(t *testing.T) {

 		evalTime := baseTime.Add(test.time)

-		res, err := rule.Eval(suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine()), nil)
+		res, err := rule.Eval(suite.Context(), evalTime, EngineQueryFunc(suite.QueryEngine(), suite.Storage()), nil)
 		testutil.Ok(t, err)

 		for i := range test.result {
@@ -174,9 +174,9 @@ func annotateWithTime(lines []string, ts time.Time) []string {
 func TestStaleness(t *testing.T) {
 	storage := testutil.NewStorage(t)
 	defer storage.Close()
-	engine := promql.NewEngine(storage, nil)
+	engine := promql.NewEngine(nil, nil, 10, 10*time.Second)
 	opts := &ManagerOptions{
-		QueryFunc:  EngineQueryFunc(engine),
+		QueryFunc:  EngineQueryFunc(engine, storage),
 		Appendable: storage,
 		Context:    context.Background(),
 		Logger:     log.NewNopLogger(),
@@ -210,7 +210,7 @@ func TestStaleness(t *testing.T) {
 	matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a_plus_one")
 	testutil.Ok(t, err)

-	set, err := querier.Select(matcher)
+	set, err := querier.Select(nil, matcher)
 	testutil.Ok(t, err)

 	samples, err := readSeriesSet(set)

@@ -28,7 +28,7 @@ func TestRuleEval(t *testing.T) {
 	storage := testutil.NewStorage(t)
 	defer storage.Close()

-	engine := promql.NewEngine(storage, nil)
+	engine := promql.NewEngine(nil, nil, 10, 10*time.Second)
 	ctx, cancelCtx := context.WithCancel(context.Background())
 	defer cancelCtx()

@@ -62,7 +62,7 @@ func TestRuleEval(t *testing.T) {

 	for _, test := range suite {
 		rule := NewRecordingRule(test.name, test.expr, test.labels)
-		result, err := rule.Eval(ctx, now, EngineQueryFunc(engine), nil)
+		result, err := rule.Eval(ctx, now, EngineQueryFunc(engine, storage), nil)
 		testutil.Ok(t, err)
 		testutil.Equals(t, result, test.result)
 	}

@@ -216,10 +216,10 @@ func NewMergeQuerier(queriers []Querier) Querier {
 }

 // Select returns a set of series that matches the given label matchers.
-func (q *mergeQuerier) Select(matchers ...*labels.Matcher) (SeriesSet, error) {
+func (q *mergeQuerier) Select(params *SelectParams, matchers ...*labels.Matcher) (SeriesSet, error) {
 	seriesSets := make([]SeriesSet, 0, len(q.queriers))
 	for _, querier := range q.queriers {
-		set, err := querier.Select(matchers...)
+		set, err := querier.Select(params, matchers...)
 		if err != nil {
 			return nil, err
 		}

@@ -52,7 +52,7 @@ type Queryable interface {
 // Querier provides reading access to time series data.
 type Querier interface {
 	// Select returns a set of series that matches the given label matchers.
-	Select(...*labels.Matcher) (SeriesSet, error)
+	Select(*SelectParams, ...*labels.Matcher) (SeriesSet, error)

 	// LabelValues returns all potential values for a label name.
 	LabelValues(name string) ([]string, error)
@@ -61,6 +61,12 @@ type Querier interface {
 	Close() error
 }

+// SelectParams specifies parameters passed to data selections.
+type SelectParams struct {
+	Step int64  // Query step size in milliseconds.
+	Func string // String representation of surrounding function or aggregation.
+}
+
 // QueryableFunc is an adapter to allow the use of ordinary functions as
 // Queryables. It follows the idea of http.HandlerFunc.
 type QueryableFunc func(ctx context.Context, mint, maxt int64) (Querier, error)

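SelectParams is advisory: several call sites in this PR pass nil, so implementations should tolerate a nil pointer. A sketch of a wrapping Querier that inspects the hints before delegating (written as if inside the storage package, fmt and labels imports elided; loggingQuerier is an illustrative name, not part of the PR):

type loggingQuerier struct {
	Querier // the wrapped storage.Querier
}

func (q loggingQuerier) Select(p *SelectParams, matchers ...*labels.Matcher) (SeriesSet, error) {
	if p != nil {
		// Func is the surrounding function or aggregation ("rate", "sum", ...),
		// Step the query resolution in milliseconds; both can guide read-path decisions.
		fmt.Printf("select hint: func=%q step=%dms\n", p.Func, p.Step)
	}
	return q.Querier.Select(p, matchers...)
}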
@@ -22,7 +22,7 @@ func NoopQuerier() Querier {
 	return noopQuerier{}
 }

-func (noopQuerier) Select(...*labels.Matcher) (SeriesSet, error) {
+func (noopQuerier) Select(*SelectParams, ...*labels.Matcher) (SeriesSet, error) {
 	return NoopSeriesSet(), nil
 }

@@ -43,7 +43,7 @@ type querier struct {

 // Select implements storage.Querier and uses the given matchers to read series
 // sets from the Client.
-func (q *querier) Select(matchers ...*labels.Matcher) (storage.SeriesSet, error) {
+func (q *querier) Select(_ *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, error) {
 	query, err := ToQuery(q.mint, q.maxt, matchers)
 	if err != nil {
 		return nil, err
@@ -91,9 +91,9 @@ type externalLabelsQuerier struct {
 // Select adds equality matchers for all external labels to the list of matchers
 // before calling the wrapped storage.Queryable. The added external labels are
 // removed from the returned series sets.
-func (q externalLabelsQuerier) Select(matchers ...*labels.Matcher) (storage.SeriesSet, error) {
+func (q externalLabelsQuerier) Select(p *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, error) {
 	m, added := q.addExternalLabels(matchers)
-	s, err := q.Querier.Select(m...)
+	s, err := q.Querier.Select(p, m...)
 	if err != nil {
 		return nil, err
 	}
@@ -144,7 +144,7 @@ type requiredMatchersQuerier struct {

 // Select returns a NoopSeriesSet if the given matchers don't match the label
 // set of the requiredMatchersQuerier. Otherwise it'll call the wrapped querier.
-func (q requiredMatchersQuerier) Select(matchers ...*labels.Matcher) (storage.SeriesSet, error) {
+func (q requiredMatchersQuerier) Select(p *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, error) {
 	ms := q.requiredMatchers
 	for _, m := range matchers {
 		for i, r := range ms {
@@ -160,7 +160,7 @@ func (q requiredMatchersQuerier) Select(matchers ...*labels.Matcher) (storage.Se
 	if len(ms) > 0 {
 		return storage.NoopSeriesSet(), nil
 	}
-	return q.Querier.Select(matchers...)
+	return q.Querier.Select(p, matchers...)
 }

 // addExternalLabels adds matchers for each external label. External labels

@@ -42,7 +42,7 @@ func TestExternalLabelsQuerierSelect(t *testing.T) {
 		externalLabels: model.LabelSet{"region": "europe"},
 	}
 	want := newSeriesSetFilter(mockSeriesSet{}, q.externalLabels)
-	have, err := q.Select(matchers...)
+	have, err := q.Select(nil, matchers...)
 	if err != nil {
 		t.Error(err)
 	}
@@ -157,7 +157,7 @@ type mockSeriesSet struct {
 	storage.SeriesSet
 }

-func (mockQuerier) Select(...*labels.Matcher) (storage.SeriesSet, error) {
+func (mockQuerier) Select(*storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, error) {
 	return mockSeriesSet{}, nil
 }

@@ -313,7 +313,7 @@ func TestRequiredLabelsQuerierSelect(t *testing.T) {
 			requiredMatchers: test.requiredMatchers,
 		}

-		have, err := q.Select(test.matchers...)
+		have, err := q.Select(nil, test.matchers...)
 		if err != nil {
 			t.Error(err)
 		}

@@ -188,7 +188,7 @@ type querier struct {
 	q tsdb.Querier
 }

-func (q querier) Select(oms ...*labels.Matcher) (storage.SeriesSet, error) {
+func (q querier) Select(_ *storage.SelectParams, oms ...*labels.Matcher) (storage.SeriesSet, error) {
 	ms := make([]tsdbLabels.Matcher, 0, len(oms))

 	for _, om := range oms {

@@ -108,7 +108,7 @@ type apiFunc func(r *http.Request) (interface{}, *apiError)
 // API can register a set of endpoints in a router and handle
 // them using the provided storage and query engine.
 type API struct {
-	Queryable   promql.Queryable
+	Queryable   storage.Queryable
 	QueryEngine *promql.Engine

 	targetRetriever targetRetriever
@@ -125,7 +125,7 @@ type API struct {
 // NewAPI returns an initialized API type.
 func NewAPI(
 	qe *promql.Engine,
-	q promql.Queryable,
+	q storage.Queryable,
 	tr targetRetriever,
 	ar alertmanagerRetriever,
 	configFunc func() config.Config,
@@ -222,7 +222,7 @@ func (api *API) query(r *http.Request) (interface{}, *apiError) {
 		defer cancel()
 	}

-	qry, err := api.QueryEngine.NewInstantQuery(r.FormValue("query"), ts)
+	qry, err := api.QueryEngine.NewInstantQuery(api.Queryable, r.FormValue("query"), ts)
 	if err != nil {
 		return nil, &apiError{errorBadData, err}
 	}
@@ -296,7 +296,7 @@ func (api *API) queryRange(r *http.Request) (interface{}, *apiError) {
 		defer cancel()
 	}

-	qry, err := api.QueryEngine.NewRangeQuery(r.FormValue("query"), start, end, step)
+	qry, err := api.QueryEngine.NewRangeQuery(api.Queryable, r.FormValue("query"), start, end, step)
 	if err != nil {
 		return nil, &apiError{errorBadData, err}
 	}
@@ -396,7 +396,7 @@ func (api *API) series(r *http.Request) (interface{}, *apiError) {

 	var sets []storage.SeriesSet
 	for _, mset := range matcherSets {
-		s, err := q.Select(mset...)
+		s, err := q.Select(nil, mset...)
 		if err != nil {
 			return nil, &apiError{errorExec, err}
 		}
@@ -537,7 +537,7 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) {
 			}
 		}

-		set, err := querier.Select(filteredMatchers...)
+		set, err := querier.Select(nil, filteredMatchers...)
 		if err != nil {
 			http.Error(w, err.Error(), http.StatusInternalServerError)
 			return

@@ -74,7 +74,7 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {

 	var sets []storage.SeriesSet
 	for _, mset := range matcherSets {
-		s, err := q.Select(mset...)
+		s, err := q.Select(nil, mset...)
 		if err != nil {
 			federationErrors.Inc()
 			http.Error(w, err.Error(), http.StatusInternalServerError)

@@ -536,7 +536,7 @@ func (h *Handler) consoles(w http.ResponseWriter, r *http.Request) {
 		"__console_"+name,
 		data,
 		h.now(),
-		template.QueryFunc(rules.EngineQueryFunc(h.queryEngine)),
+		template.QueryFunc(rules.EngineQueryFunc(h.queryEngine, h.storage)),
 		h.options.ExternalURL,
 	)
 	filenames, err := filepath.Glob(h.options.ConsoleLibrariesPath + "/*.lib")
@@ -766,7 +766,7 @@ func (h *Handler) executeTemplate(w http.ResponseWriter, name string, data inter
 		name,
 		data,
 		h.now(),
-		template.QueryFunc(rules.EngineQueryFunc(h.queryEngine)),
+		template.QueryFunc(rules.EngineQueryFunc(h.queryEngine, h.storage)),
 		h.options.ExternalURL,
 	)
 	tmpl.Funcs(tmplFuncs(h.consolesPath(), h.options))