From ed5a0f0abeeb3c313c8fbc4472cf620dfb8331d1 Mon Sep 17 00:00:00 2001 From: Julius Volz Date: Thu, 15 Sep 2016 13:52:50 +0200 Subject: [PATCH 1/2] promql: Allow per-query contexts. For Weaveworks' Frankenstein, we need to support multitenancy. In Frankenstein, we initially solved this without modifying the promql package at all: we constructed a new promql.Engine for every query and injected a storage implementation into that engine which would be primed to only collect data for a given user. This is problematic to upstream, however. Prometheus assumes that there is only one engine: the query concurrency gate is part of the engine, and the engine contains one central cancellable context to shut down all queries. Also, creating a new engine for every query seems like overkill. Thus, we want to be able to pass per-query contexts into a single engine. This change gets rid of the promql.Engine's built-in base context and allows passing in a per-query context instead. Central cancellation of all queries is still possible by deriving all passed-in contexts from one central one, but this is now the responsibility of the caller. The central query context is now created in main() and passed into the relevant components (web handler / API, rule manager). In a next step, the per-query context would have to be passed to the storage implementation, so that the storage can implement multi-tenancy or other features based on the contextual information. --- cmd/prometheus/main.go | 14 +++++++++----- promql/engine.go | 31 +++++++++---------------------- promql/engine_test.go | 30 +++++++++++++++++------------- promql/test.go | 23 ++++++++++++++++------- rules/alerting.go | 7 +++++-- rules/manager.go | 7 +++++-- rules/manager_test.go | 2 +- rules/recording.go | 5 +++-- rules/recording_test.go | 6 +++++- template/template.go | 9 +++++---- template/template_test.go | 3 ++- web/api/v1/api.go | 8 +++++--- web/api/v1/api_test.go | 1 + web/web.go | 10 +++++++--- 14 files changed, 90 insertions(+), 66 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index bbb99e373..7d7bdc77d 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -26,6 +26,8 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/log" "github.com/prometheus/common/version" + "golang.org/x/net/context" + "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/notifier" "github.com/prometheus/prometheus/promql" @@ -102,15 +104,17 @@ func Main() int { } var ( - notifier = notifier.New(&cfg.notifier) - targetManager = retrieval.NewTargetManager(sampleAppender) - queryEngine = promql.NewEngine(localStorage, &cfg.queryEngine) + notifier = notifier.New(&cfg.notifier) + targetManager = retrieval.NewTargetManager(sampleAppender) + queryEngine = promql.NewEngine(localStorage, &cfg.queryEngine) + queryCtx, cancelQueries = context.WithCancel(context.Background()) ) ruleManager := rules.NewManager(&rules.ManagerOptions{ SampleAppender: sampleAppender, Notifier: notifier, QueryEngine: queryEngine, + QueryCtx: queryCtx, ExternalURL: cfg.web.ExternalURL, }) @@ -128,7 +132,7 @@ func Main() int { GoVersion: version.GoVersion, } - webHandler := web.New(localStorage, queryEngine, targetManager, ruleManager, version, flags, &cfg.web) + webHandler := web.New(localStorage, queryEngine, queryCtx, targetManager, ruleManager, version, flags, &cfg.web) reloadables = append(reloadables, targetManager, ruleManager, webHandler, notifier) @@ -201,7 +205,7 @@ func Main() int { // Shutting down the query engine before the rule manager will cause pending queries // to be canceled and ensures a quick shutdown of the rule manager. - defer queryEngine.Stop() + defer cancelQueries() go webHandler.Run() diff --git a/promql/engine.go b/promql/engine.go index d9c33d413..184d41b3d 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -150,7 +150,7 @@ func (e ErrQueryCanceled) Error() string { return fmt.Sprintf("query was cancele // it is associated with. type Query interface { // Exec processes the query and - Exec() *Result + Exec(ctx context.Context) *Result // Statement returns the parsed statement of the query. Statement() Statement // Stats returns statistics about the lifetime of the query. @@ -192,8 +192,8 @@ func (q *query) Cancel() { } // Exec implements the Query interface. -func (q *query) Exec() *Result { - res, err := q.ng.exec(q) +func (q *query) Exec(ctx context.Context) *Result { + res, err := q.ng.exec(ctx, q) return &Result{Err: err, Value: res} } @@ -220,13 +220,8 @@ func contextDone(ctx context.Context, env string) error { type Engine struct { // The querier on which the engine operates. querier local.Querier - - // The base context for all queries and its cancellation function. - baseCtx context.Context - cancelQueries func() // The gate limiting the maximum number of concurrent and waiting queries. - gate *queryGate - + gate *queryGate options *EngineOptions } @@ -235,13 +230,10 @@ func NewEngine(querier local.Querier, o *EngineOptions) *Engine { if o == nil { o = DefaultEngineOptions } - ctx, cancel := context.WithCancel(context.Background()) return &Engine{ - querier: querier, - baseCtx: ctx, - cancelQueries: cancel, - gate: newQueryGate(o.MaxConcurrentQueries), - options: o, + querier: querier, + gate: newQueryGate(o.MaxConcurrentQueries), + options: o, } } @@ -257,11 +249,6 @@ var DefaultEngineOptions = &EngineOptions{ Timeout: 2 * time.Minute, } -// Stop the engine and cancel all running queries. -func (ng *Engine) Stop() { - ng.cancelQueries() -} - // NewInstantQuery returns an evaluation query for the given expression at the given time. func (ng *Engine) NewInstantQuery(qs string, ts model.Time) (Query, error) { expr, err := ParseExpr(qs) @@ -326,8 +313,8 @@ func (ng *Engine) newTestQuery(f func(context.Context) error) Query { // // At this point per query only one EvalStmt is evaluated. Alert and record // statements are not handled by the Engine. -func (ng *Engine) exec(q *query) (model.Value, error) { - ctx, cancel := context.WithTimeout(q.ng.baseCtx, ng.options.Timeout) +func (ng *Engine) exec(ctx context.Context, q *query) (model.Value, error) { + ctx, cancel := context.WithTimeout(ctx, ng.options.Timeout) q.cancel = cancel queueTimer := q.stats.GetTimer(stats.ExecQueueTime).Start() diff --git a/promql/engine_test.go b/promql/engine_test.go index 0064c180f..6aa63d284 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -23,7 +23,8 @@ import ( func TestQueryConcurrency(t *testing.T) { engine := NewEngine(nil, nil) - defer engine.Stop() + ctx, cancelQueries := context.WithCancel(context.Background()) + defer cancelQueries() block := make(chan struct{}) processing := make(chan struct{}) @@ -36,7 +37,7 @@ func TestQueryConcurrency(t *testing.T) { for i := 0; i < DefaultEngineOptions.MaxConcurrentQueries; i++ { q := engine.newTestQuery(f) - go q.Exec() + go q.Exec(ctx) select { case <-processing: // Expected. @@ -46,7 +47,7 @@ func TestQueryConcurrency(t *testing.T) { } q := engine.newTestQuery(f) - go q.Exec() + go q.Exec(ctx) select { case <-processing: @@ -76,14 +77,15 @@ func TestQueryTimeout(t *testing.T) { Timeout: 5 * time.Millisecond, MaxConcurrentQueries: 20, }) - defer engine.Stop() + ctx, cancelQueries := context.WithCancel(context.Background()) + defer cancelQueries() query := engine.newTestQuery(func(ctx context.Context) error { time.Sleep(50 * time.Millisecond) return contextDone(ctx, "test statement execution") }) - res := query.Exec() + res := query.Exec(ctx) if res.Err == nil { t.Fatalf("expected timeout error but got none") } @@ -94,7 +96,8 @@ func TestQueryTimeout(t *testing.T) { func TestQueryCancel(t *testing.T) { engine := NewEngine(nil, nil) - defer engine.Stop() + ctx, cancelQueries := context.WithCancel(context.Background()) + defer cancelQueries() // Cancel a running query before it completes. block := make(chan struct{}) @@ -109,7 +112,7 @@ func TestQueryCancel(t *testing.T) { var res *Result go func() { - res = query1.Exec() + res = query1.Exec(ctx) processing <- struct{}{} }() @@ -131,14 +134,15 @@ func TestQueryCancel(t *testing.T) { }) query2.Cancel() - res = query2.Exec() + res = query2.Exec(ctx) if res.Err != nil { - t.Fatalf("unexpeceted error on executing query2: %s", res.Err) + t.Fatalf("unexpected error on executing query2: %s", res.Err) } } func TestEngineShutdown(t *testing.T) { engine := NewEngine(nil, nil) + ctx, cancelQueries := context.WithCancel(context.Background()) block := make(chan struct{}) processing := make(chan struct{}) @@ -158,12 +162,12 @@ func TestEngineShutdown(t *testing.T) { var res *Result go func() { - res = query1.Exec() + res = query1.Exec(ctx) processing <- struct{}{} }() <-processing - engine.Stop() + cancelQueries() block <- struct{}{} <-processing @@ -181,9 +185,9 @@ func TestEngineShutdown(t *testing.T) { // The second query is started after the engine shut down. It must // be canceled immediately. - res2 := query2.Exec() + res2 := query2.Exec(ctx) if res2.Err == nil { - t.Fatalf("expected error on querying shutdown engine but got none") + t.Fatalf("expected error on querying with canceled context but got none") } if _, ok := res2.Err.(ErrQueryCanceled); !ok { t.Fatalf("expected cancelation error, got %q", res2.Err) diff --git a/promql/test.go b/promql/test.go index 60acb9285..d7866239b 100644 --- a/promql/test.go +++ b/promql/test.go @@ -23,6 +23,7 @@ import ( "time" "github.com/prometheus/common/model" + "golang.org/x/net/context" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage/local" @@ -49,9 +50,11 @@ type Test struct { cmds []testCommand - storage local.Storage - closeStorage func() - queryEngine *Engine + storage local.Storage + closeStorage func() + queryEngine *Engine + queryCtx context.Context + cancelQueries context.CancelFunc } // NewTest returns an initialized empty Test. @@ -79,6 +82,11 @@ func (t *Test) QueryEngine() *Engine { return t.queryEngine } +// Context returns the test's query context. +func (t *Test) Context() context.Context { + return t.queryCtx +} + // Storage returns the test's storage. func (t *Test) Storage() local.Storage { return t.storage @@ -463,7 +471,7 @@ func (t *Test) exec(tc testCommand) error { case *evalCmd: q := t.queryEngine.newQuery(cmd.expr, cmd.start, cmd.end, cmd.interval) - res := q.Exec() + res := q.Exec(t.queryCtx) if res.Err != nil { if cmd.fail { return nil @@ -490,8 +498,8 @@ func (t *Test) clear() { if t.closeStorage != nil { t.closeStorage() } - if t.queryEngine != nil { - t.queryEngine.Stop() + if t.cancelQueries != nil { + t.cancelQueries() } var closer testutil.Closer @@ -499,11 +507,12 @@ func (t *Test) clear() { t.closeStorage = closer.Close t.queryEngine = NewEngine(t.storage, nil) + t.queryCtx, t.cancelQueries = context.WithCancel(context.Background()) } // Close closes resources associated with the Test. func (t *Test) Close() { - t.queryEngine.Stop() + t.cancelQueries() t.closeStorage() } diff --git a/rules/alerting.go b/rules/alerting.go index 39b5906e1..edd09f5f6 100644 --- a/rules/alerting.go +++ b/rules/alerting.go @@ -18,6 +18,8 @@ import ( "sync" "time" + "golang.org/x/net/context" + html_template "html/template" "github.com/prometheus/common/log" @@ -146,12 +148,12 @@ const resolvedRetention = 15 * time.Minute // eval evaluates the rule expression and then creates pending alerts and fires // or removes previously pending alerts accordingly. -func (r *AlertingRule) eval(ts model.Time, engine *promql.Engine, externalURLPath string) (model.Vector, error) { +func (r *AlertingRule) eval(ts model.Time, engine *promql.Engine, queryCtx context.Context, externalURLPath string) (model.Vector, error) { query, err := engine.NewInstantQuery(r.vector.String(), ts) if err != nil { return nil, err } - res, err := query.Exec().Vector() + res, err := query.Exec(queryCtx).Vector() if err != nil { return nil, err } @@ -188,6 +190,7 @@ func (r *AlertingRule) eval(ts model.Time, engine *promql.Engine, externalURLPat tmplData, ts, engine, + queryCtx, externalURLPath, ) result, err := tmpl.Expand() diff --git a/rules/manager.go b/rules/manager.go index 8bd79a7ff..dac4f4b0d 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -21,6 +21,8 @@ import ( "sync" "time" + "golang.org/x/net/context" + html_template "html/template" "github.com/prometheus/client_golang/prometheus" @@ -105,7 +107,7 @@ const ( type Rule interface { Name() string // eval evaluates the rule, including any associated recording or alerting actions. - eval(model.Time, *promql.Engine, string) (model.Vector, error) + eval(model.Time, *promql.Engine, context.Context, string) (model.Vector, error) // String returns a human-readable string representation of the rule. String() string // HTMLSnippet returns a human-readable string representation of the rule, @@ -256,7 +258,7 @@ func (g *Group) eval() { evalTotal.WithLabelValues(rtyp).Inc() - vector, err := rule.eval(now, g.opts.QueryEngine, g.opts.ExternalURL.Path) + vector, err := rule.eval(now, g.opts.QueryEngine, g.opts.QueryCtx, g.opts.ExternalURL.Path) if err != nil { // Canceled queries are intentional termination of queries. This normally // happens on shutdown and thus we skip logging of any errors here. @@ -341,6 +343,7 @@ type Manager struct { type ManagerOptions struct { ExternalURL *url.URL QueryEngine *promql.Engine + QueryCtx context.Context Notifier *notifier.Notifier SampleAppender storage.SampleAppender } diff --git a/rules/manager_test.go b/rules/manager_test.go index dfee078d7..6f8a1c4fb 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -105,7 +105,7 @@ func TestAlertingRule(t *testing.T) { for i, test := range tests { evalTime := model.Time(0).Add(test.time) - res, err := rule.eval(evalTime, suite.QueryEngine(), "") + res, err := rule.eval(evalTime, suite.QueryEngine(), suite.Context(), "") if err != nil { t.Fatalf("Error during alerting rule evaluation: %s", err) } diff --git a/rules/recording.go b/rules/recording.go index e40fbed5e..ffe39c941 100644 --- a/rules/recording.go +++ b/rules/recording.go @@ -18,6 +18,7 @@ import ( "html/template" "github.com/prometheus/common/model" + "golang.org/x/net/context" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/util/strutil" @@ -45,14 +46,14 @@ func (rule RecordingRule) Name() string { } // eval evaluates the rule and then overrides the metric names and labels accordingly. -func (rule RecordingRule) eval(timestamp model.Time, engine *promql.Engine, _ string) (model.Vector, error) { +func (rule RecordingRule) eval(timestamp model.Time, engine *promql.Engine, queryCtx context.Context, _ string) (model.Vector, error) { query, err := engine.NewInstantQuery(rule.vector.String(), timestamp) if err != nil { return nil, err } var ( - result = query.Exec() + result = query.Exec(queryCtx) vector model.Vector ) if result.Err != nil { diff --git a/rules/recording_test.go b/rules/recording_test.go index 36b5ac1dd..6b58f4eaf 100644 --- a/rules/recording_test.go +++ b/rules/recording_test.go @@ -18,6 +18,7 @@ import ( "testing" "github.com/prometheus/common/model" + "golang.org/x/net/context" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/storage/local" @@ -27,6 +28,9 @@ func TestRuleEval(t *testing.T) { storage, closer := local.NewTestStorage(t, 2) defer closer.Close() engine := promql.NewEngine(storage, nil) + queryCtx, cancelQueries := context.WithCancel(context.Background()) + defer cancelQueries() + now := model.Now() suite := []struct { @@ -59,7 +63,7 @@ func TestRuleEval(t *testing.T) { for _, test := range suite { rule := NewRecordingRule(test.name, test.expr, test.labels) - result, err := rule.eval(now, engine, "") + result, err := rule.eval(now, engine, queryCtx, "") if err != nil { t.Fatalf("Error evaluating %s", test.name) } diff --git a/template/template.go b/template/template.go index a1d6cfe06..d07c446e3 100644 --- a/template/template.go +++ b/template/template.go @@ -26,6 +26,7 @@ import ( text_template "text/template" "github.com/prometheus/common/model" + "golang.org/x/net/context" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/util/strutil" @@ -55,12 +56,12 @@ func (q queryResultByLabelSorter) Swap(i, j int) { q.results[i], q.results[j] = q.results[j], q.results[i] } -func query(q string, timestamp model.Time, queryEngine *promql.Engine) (queryResult, error) { +func query(ctx context.Context, q string, timestamp model.Time, queryEngine *promql.Engine) (queryResult, error) { query, err := queryEngine.NewInstantQuery(q, timestamp) if err != nil { return nil, err } - res := query.Exec() + res := query.Exec(ctx) if res.Err != nil { return nil, res.Err } @@ -110,14 +111,14 @@ type Expander struct { } // NewTemplateExpander returns a template expander ready to use. -func NewTemplateExpander(text string, name string, data interface{}, timestamp model.Time, queryEngine *promql.Engine, pathPrefix string) *Expander { +func NewTemplateExpander(text string, name string, data interface{}, timestamp model.Time, queryEngine *promql.Engine, queryCtx context.Context, pathPrefix string) *Expander { return &Expander{ text: text, name: name, data: data, funcMap: text_template.FuncMap{ "query": func(q string) (queryResult, error) { - return query(q, timestamp, queryEngine) + return query(queryCtx, q, timestamp, queryEngine) }, "first": func(v queryResult) (*sample, error) { if len(v) > 0 { diff --git a/template/template_test.go b/template/template_test.go index bab51d651..9d1c787a4 100644 --- a/template/template_test.go +++ b/template/template_test.go @@ -18,6 +18,7 @@ import ( "testing" "github.com/prometheus/common/model" + "golang.org/x/net/context" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/storage/local" @@ -220,7 +221,7 @@ func TestTemplateExpansion(t *testing.T) { for i, s := range scenarios { var result string var err error - expander := NewTemplateExpander(s.text, "test", s.input, time, engine, "") + expander := NewTemplateExpander(s.text, "test", s.input, time, engine, context.Background(), "") if s.html { result, err = expander.ExpandHTML(nil) } else { diff --git a/web/api/v1/api.go b/web/api/v1/api.go index 97085cda6..a79f9f9c7 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -87,15 +87,17 @@ type apiFunc func(r *http.Request) (interface{}, *apiError) type API struct { Storage local.Storage QueryEngine *promql.Engine + QueryCtx context.Context context func(r *http.Request) context.Context now func() model.Time } // NewAPI returns an initialized API type. -func NewAPI(qe *promql.Engine, st local.Storage) *API { +func NewAPI(qe *promql.Engine, qc context.Context, st local.Storage) *API { return &API{ QueryEngine: qe, + QueryCtx: qc, Storage: st, context: route.Context, now: model.Now, @@ -157,7 +159,7 @@ func (api *API) query(r *http.Request) (interface{}, *apiError) { return nil, &apiError{errorBadData, err} } - res := qry.Exec() + res := qry.Exec(api.QueryCtx) if res.Err != nil { switch res.Err.(type) { case promql.ErrQueryCanceled: @@ -204,7 +206,7 @@ func (api *API) queryRange(r *http.Request) (interface{}, *apiError) { return nil, &apiError{errorBadData, err} } - res := qry.Exec() + res := qry.Exec(api.QueryCtx) if res.Err != nil { switch res.Err.(type) { case promql.ErrQueryCanceled: diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go index 38ee39cfb..f0e66eab0 100644 --- a/web/api/v1/api_test.go +++ b/web/api/v1/api_test.go @@ -52,6 +52,7 @@ func TestEndpoints(t *testing.T) { api := &API{ Storage: suite.Storage(), QueryEngine: suite.QueryEngine(), + QueryCtx: suite.Context(), now: func() model.Time { return now }, } diff --git a/web/web.go b/web/web.go index 9af2496ae..0cce11cc4 100644 --- a/web/web.go +++ b/web/web.go @@ -36,6 +36,7 @@ import ( "github.com/prometheus/common/log" "github.com/prometheus/common/model" "github.com/prometheus/common/route" + "golang.org/x/net/context" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/promql" @@ -55,6 +56,7 @@ type Handler struct { targetManager *retrieval.TargetManager ruleManager *rules.Manager queryEngine *promql.Engine + queryCtx context.Context storage local.Storage apiV1 *api_v1.API @@ -112,6 +114,7 @@ type Options struct { func New( st local.Storage, qe *promql.Engine, + qc context.Context, tm *retrieval.TargetManager, rm *rules.Manager, version *PrometheusVersion, @@ -133,9 +136,10 @@ func New( targetManager: tm, ruleManager: rm, queryEngine: qe, + queryCtx: qc, storage: st, - apiV1: api_v1.NewAPI(qe, st), + apiV1: api_v1.NewAPI(qe, qc, st), now: model.Now, } @@ -293,7 +297,7 @@ func (h *Handler) consoles(w http.ResponseWriter, r *http.Request) { Path: strings.TrimLeft(name, "/"), } - tmpl := template.NewTemplateExpander(string(text), "__console_"+name, data, h.now(), h.queryEngine, h.options.ExternalURL.Path) + tmpl := template.NewTemplateExpander(string(text), "__console_"+name, data, h.now(), h.queryEngine, h.queryCtx, h.options.ExternalURL.Path) filenames, err := filepath.Glob(h.options.ConsoleLibrariesPath + "/*.lib") if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) @@ -466,7 +470,7 @@ func (h *Handler) executeTemplate(w http.ResponseWriter, name string, data inter http.Error(w, err.Error(), http.StatusInternalServerError) } - tmpl := template.NewTemplateExpander(text, name, data, h.now(), h.queryEngine, h.options.ExternalURL.Path) + tmpl := template.NewTemplateExpander(text, name, data, h.now(), h.queryEngine, h.queryCtx, h.options.ExternalURL.Path) tmpl.Funcs(tmplFuncs(h.consolesPath(), h.options)) result, err := tmpl.ExpandHTML(nil) From c18730836689652d38999e55acaa332f04f09d6d Mon Sep 17 00:00:00 2001 From: Julius Volz Date: Fri, 16 Sep 2016 00:58:06 +0200 Subject: [PATCH 2/2] storage: Contextify storage interfaces. This is based on https://github.com/prometheus/prometheus/pull/1997. This adds contexts to the relevant Storage methods and already passes PromQL's new per-query context into the storage's query methods. The immediate motivation supporting multi-tenancy in Frankenstein, but this could also be used by Prometheus's normal local storage to support cancellations and timeouts at some point. --- cmd/prometheus/main.go | 30 +++++++++++++++---------- promql/engine.go | 7 ++++-- promql/engine_test.go | 16 +++++++------- promql/test.go | 24 ++++++++++---------- rules/alerting.go | 6 ++--- rules/manager.go | 9 ++++---- rules/manager_test.go | 2 +- rules/recording.go | 4 ++-- rules/recording_test.go | 6 ++--- storage/local/interface.go | 13 ++++++----- storage/local/noop_storage.go | 13 ++++++----- storage/local/storage.go | 14 +++++++----- storage/local/storage_test.go | 12 +++++++--- template/template.go | 4 ++-- template/template_test.go | 2 +- web/api/v1/api.go | 16 +++++++------- web/api/v1/api_test.go | 2 +- web/federate.go | 2 +- web/web.go | 41 +++++++++++++++++------------------ 19 files changed, 121 insertions(+), 102 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 7d7bdc77d..8fbe85884 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -104,26 +104,27 @@ func Main() int { } var ( - notifier = notifier.New(&cfg.notifier) - targetManager = retrieval.NewTargetManager(sampleAppender) - queryEngine = promql.NewEngine(localStorage, &cfg.queryEngine) - queryCtx, cancelQueries = context.WithCancel(context.Background()) + notifier = notifier.New(&cfg.notifier) + targetManager = retrieval.NewTargetManager(sampleAppender) + queryEngine = promql.NewEngine(localStorage, &cfg.queryEngine) + ctx, cancelCtx = context.WithCancel(context.Background()) ) ruleManager := rules.NewManager(&rules.ManagerOptions{ SampleAppender: sampleAppender, Notifier: notifier, QueryEngine: queryEngine, - QueryCtx: queryCtx, + Context: ctx, ExternalURL: cfg.web.ExternalURL, }) - flags := map[string]string{} - cfg.fs.VisitAll(func(f *flag.Flag) { - flags[f.Name] = f.Value.String() - }) + cfg.web.Context = ctx + cfg.web.Storage = localStorage + cfg.web.QueryEngine = queryEngine + cfg.web.TargetManager = targetManager + cfg.web.RuleManager = ruleManager - version := &web.PrometheusVersion{ + cfg.web.Version = &web.PrometheusVersion{ Version: version.Version, Revision: version.Revision, Branch: version.Branch, @@ -132,7 +133,12 @@ func Main() int { GoVersion: version.GoVersion, } - webHandler := web.New(localStorage, queryEngine, queryCtx, targetManager, ruleManager, version, flags, &cfg.web) + cfg.web.Flags = map[string]string{} + cfg.fs.VisitAll(func(f *flag.Flag) { + cfg.web.Flags[f.Name] = f.Value.String() + }) + + webHandler := web.New(&cfg.web) reloadables = append(reloadables, targetManager, ruleManager, webHandler, notifier) @@ -205,7 +211,7 @@ func Main() int { // Shutting down the query engine before the rule manager will cause pending queries // to be canceled and ensures a quick shutdown of the rule manager. - defer cancelQueries() + defer cancelCtx() go webHandler.Run() diff --git a/promql/engine.go b/promql/engine.go index 184d41b3d..64265060a 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -352,7 +352,7 @@ func (ng *Engine) exec(ctx context.Context, q *query) (model.Value, error) { // execEvalStmt evaluates the expression of an evaluation statement for the given time range. func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (model.Value, error) { prepareTimer := query.stats.GetTimer(stats.QueryPreparationTime).Start() - err := ng.populateIterators(s) + err := ng.populateIterators(ctx, s) prepareTimer.Stop() if err != nil { return nil, err @@ -463,19 +463,21 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) ( return resMatrix, nil } -func (ng *Engine) populateIterators(s *EvalStmt) error { +func (ng *Engine) populateIterators(ctx context.Context, s *EvalStmt) error { var queryErr error Inspect(s.Expr, func(node Node) bool { switch n := node.(type) { case *VectorSelector: if s.Start.Equal(s.End) { n.iterators, queryErr = ng.querier.QueryInstant( + ctx, s.Start.Add(-n.Offset), StalenessDelta, n.LabelMatchers..., ) } else { n.iterators, queryErr = ng.querier.QueryRange( + ctx, s.Start.Add(-n.Offset-StalenessDelta), s.End.Add(-n.Offset), n.LabelMatchers..., @@ -486,6 +488,7 @@ func (ng *Engine) populateIterators(s *EvalStmt) error { } case *MatrixSelector: n.iterators, queryErr = ng.querier.QueryRange( + ctx, s.Start.Add(-n.Offset-n.Range), s.End.Add(-n.Offset), n.LabelMatchers..., diff --git a/promql/engine_test.go b/promql/engine_test.go index 6aa63d284..134f417f3 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -23,8 +23,8 @@ import ( func TestQueryConcurrency(t *testing.T) { engine := NewEngine(nil, nil) - ctx, cancelQueries := context.WithCancel(context.Background()) - defer cancelQueries() + ctx, cancelCtx := context.WithCancel(context.Background()) + defer cancelCtx() block := make(chan struct{}) processing := make(chan struct{}) @@ -77,8 +77,8 @@ func TestQueryTimeout(t *testing.T) { Timeout: 5 * time.Millisecond, MaxConcurrentQueries: 20, }) - ctx, cancelQueries := context.WithCancel(context.Background()) - defer cancelQueries() + ctx, cancelCtx := context.WithCancel(context.Background()) + defer cancelCtx() query := engine.newTestQuery(func(ctx context.Context) error { time.Sleep(50 * time.Millisecond) @@ -96,8 +96,8 @@ func TestQueryTimeout(t *testing.T) { func TestQueryCancel(t *testing.T) { engine := NewEngine(nil, nil) - ctx, cancelQueries := context.WithCancel(context.Background()) - defer cancelQueries() + ctx, cancelCtx := context.WithCancel(context.Background()) + defer cancelCtx() // Cancel a running query before it completes. block := make(chan struct{}) @@ -142,7 +142,7 @@ func TestQueryCancel(t *testing.T) { func TestEngineShutdown(t *testing.T) { engine := NewEngine(nil, nil) - ctx, cancelQueries := context.WithCancel(context.Background()) + ctx, cancelCtx := context.WithCancel(context.Background()) block := make(chan struct{}) processing := make(chan struct{}) @@ -167,7 +167,7 @@ func TestEngineShutdown(t *testing.T) { }() <-processing - cancelQueries() + cancelCtx() block <- struct{}{} <-processing diff --git a/promql/test.go b/promql/test.go index d7866239b..8b10d450f 100644 --- a/promql/test.go +++ b/promql/test.go @@ -50,11 +50,11 @@ type Test struct { cmds []testCommand - storage local.Storage - closeStorage func() - queryEngine *Engine - queryCtx context.Context - cancelQueries context.CancelFunc + storage local.Storage + closeStorage func() + queryEngine *Engine + context context.Context + cancelCtx context.CancelFunc } // NewTest returns an initialized empty Test. @@ -82,9 +82,9 @@ func (t *Test) QueryEngine() *Engine { return t.queryEngine } -// Context returns the test's query context. +// Context returns the test's context. func (t *Test) Context() context.Context { - return t.queryCtx + return t.context } // Storage returns the test's storage. @@ -471,7 +471,7 @@ func (t *Test) exec(tc testCommand) error { case *evalCmd: q := t.queryEngine.newQuery(cmd.expr, cmd.start, cmd.end, cmd.interval) - res := q.Exec(t.queryCtx) + res := q.Exec(t.context) if res.Err != nil { if cmd.fail { return nil @@ -498,8 +498,8 @@ func (t *Test) clear() { if t.closeStorage != nil { t.closeStorage() } - if t.cancelQueries != nil { - t.cancelQueries() + if t.cancelCtx != nil { + t.cancelCtx() } var closer testutil.Closer @@ -507,12 +507,12 @@ func (t *Test) clear() { t.closeStorage = closer.Close t.queryEngine = NewEngine(t.storage, nil) - t.queryCtx, t.cancelQueries = context.WithCancel(context.Background()) + t.context, t.cancelCtx = context.WithCancel(context.Background()) } // Close closes resources associated with the Test. func (t *Test) Close() { - t.cancelQueries() + t.cancelCtx() t.closeStorage() } diff --git a/rules/alerting.go b/rules/alerting.go index edd09f5f6..ae1535c34 100644 --- a/rules/alerting.go +++ b/rules/alerting.go @@ -148,12 +148,12 @@ const resolvedRetention = 15 * time.Minute // eval evaluates the rule expression and then creates pending alerts and fires // or removes previously pending alerts accordingly. -func (r *AlertingRule) eval(ts model.Time, engine *promql.Engine, queryCtx context.Context, externalURLPath string) (model.Vector, error) { +func (r *AlertingRule) eval(ctx context.Context, ts model.Time, engine *promql.Engine, externalURLPath string) (model.Vector, error) { query, err := engine.NewInstantQuery(r.vector.String(), ts) if err != nil { return nil, err } - res, err := query.Exec(queryCtx).Vector() + res, err := query.Exec(ctx).Vector() if err != nil { return nil, err } @@ -185,12 +185,12 @@ func (r *AlertingRule) eval(ts model.Time, engine *promql.Engine, queryCtx conte expand := func(text model.LabelValue) model.LabelValue { tmpl := template.NewTemplateExpander( + ctx, defs+string(text), "__alert_"+r.Name(), tmplData, ts, engine, - queryCtx, externalURLPath, ) result, err := tmpl.Expand() diff --git a/rules/manager.go b/rules/manager.go index dac4f4b0d..397055ceb 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -21,13 +21,12 @@ import ( "sync" "time" - "golang.org/x/net/context" - html_template "html/template" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/log" "github.com/prometheus/common/model" + "golang.org/x/net/context" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/notifier" @@ -107,7 +106,7 @@ const ( type Rule interface { Name() string // eval evaluates the rule, including any associated recording or alerting actions. - eval(model.Time, *promql.Engine, context.Context, string) (model.Vector, error) + eval(context.Context, model.Time, *promql.Engine, string) (model.Vector, error) // String returns a human-readable string representation of the rule. String() string // HTMLSnippet returns a human-readable string representation of the rule, @@ -258,7 +257,7 @@ func (g *Group) eval() { evalTotal.WithLabelValues(rtyp).Inc() - vector, err := rule.eval(now, g.opts.QueryEngine, g.opts.QueryCtx, g.opts.ExternalURL.Path) + vector, err := rule.eval(g.opts.Context, now, g.opts.QueryEngine, g.opts.ExternalURL.Path) if err != nil { // Canceled queries are intentional termination of queries. This normally // happens on shutdown and thus we skip logging of any errors here. @@ -343,7 +342,7 @@ type Manager struct { type ManagerOptions struct { ExternalURL *url.URL QueryEngine *promql.Engine - QueryCtx context.Context + Context context.Context Notifier *notifier.Notifier SampleAppender storage.SampleAppender } diff --git a/rules/manager_test.go b/rules/manager_test.go index 6f8a1c4fb..8eb31175e 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -105,7 +105,7 @@ func TestAlertingRule(t *testing.T) { for i, test := range tests { evalTime := model.Time(0).Add(test.time) - res, err := rule.eval(evalTime, suite.QueryEngine(), suite.Context(), "") + res, err := rule.eval(suite.Context(), evalTime, suite.QueryEngine(), "") if err != nil { t.Fatalf("Error during alerting rule evaluation: %s", err) } diff --git a/rules/recording.go b/rules/recording.go index ffe39c941..14cd26a2c 100644 --- a/rules/recording.go +++ b/rules/recording.go @@ -46,14 +46,14 @@ func (rule RecordingRule) Name() string { } // eval evaluates the rule and then overrides the metric names and labels accordingly. -func (rule RecordingRule) eval(timestamp model.Time, engine *promql.Engine, queryCtx context.Context, _ string) (model.Vector, error) { +func (rule RecordingRule) eval(ctx context.Context, timestamp model.Time, engine *promql.Engine, _ string) (model.Vector, error) { query, err := engine.NewInstantQuery(rule.vector.String(), timestamp) if err != nil { return nil, err } var ( - result = query.Exec(queryCtx) + result = query.Exec(ctx) vector model.Vector ) if result.Err != nil { diff --git a/rules/recording_test.go b/rules/recording_test.go index 6b58f4eaf..3ad40f915 100644 --- a/rules/recording_test.go +++ b/rules/recording_test.go @@ -28,8 +28,8 @@ func TestRuleEval(t *testing.T) { storage, closer := local.NewTestStorage(t, 2) defer closer.Close() engine := promql.NewEngine(storage, nil) - queryCtx, cancelQueries := context.WithCancel(context.Background()) - defer cancelQueries() + ctx, cancelCtx := context.WithCancel(context.Background()) + defer cancelCtx() now := model.Now() @@ -63,7 +63,7 @@ func TestRuleEval(t *testing.T) { for _, test := range suite { rule := NewRecordingRule(test.name, test.expr, test.labels) - result, err := rule.eval(now, engine, queryCtx, "") + result, err := rule.eval(ctx, now, engine, "") if err != nil { t.Fatalf("Error evaluating %s", test.name) } diff --git a/storage/local/interface.go b/storage/local/interface.go index 5bbefa309..4b88a7f6b 100644 --- a/storage/local/interface.go +++ b/storage/local/interface.go @@ -17,6 +17,7 @@ import ( "time" "github.com/prometheus/common/model" + "golang.org/x/net/context" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage/metric" @@ -40,7 +41,7 @@ type Storage interface { // Drop all time series associated with the given label matchers. Returns // the number series that were dropped. - DropMetricsForLabelMatchers(...*metric.LabelMatcher) (int, error) + DropMetricsForLabelMatchers(context.Context, ...*metric.LabelMatcher) (int, error) // Run the various maintenance loops in goroutines. Returns when the // storage is ready to use. Keeps everything running in the background // until Stop is called. @@ -59,10 +60,10 @@ type Querier interface { // QueryRange returns a list of series iterators for the selected // time range and label matchers. The iterators need to be closed // after usage. - QueryRange(from, through model.Time, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) + QueryRange(ctx context.Context, from, through model.Time, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) // QueryInstant returns a list of series iterators for the selected // instant and label matchers. The iterators need to be closed after usage. - QueryInstant(ts model.Time, stalenessDelta time.Duration, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) + QueryInstant(ctx context.Context, ts model.Time, stalenessDelta time.Duration, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) // MetricsForLabelMatchers returns the metrics from storage that satisfy // the given sets of label matchers. Each set of matchers must contain at // least one label matcher that does not match the empty string. Otherwise, @@ -72,14 +73,14 @@ type Querier interface { // storage to optimize the search. The storage MAY exclude metrics that // have no samples in the specified interval from the returned map. In // doubt, specify model.Earliest for from and model.Latest for through. - MetricsForLabelMatchers(from, through model.Time, matcherSets ...metric.LabelMatchers) ([]metric.Metric, error) + MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matcherSets ...metric.LabelMatchers) ([]metric.Metric, error) // LastSampleForLabelMatchers returns the last samples that have been // ingested for the time series matching the given set of label matchers. // The label matching behavior is the same as in MetricsForLabelMatchers. // All returned samples are between the specified cutoff time and now. - LastSampleForLabelMatchers(cutoff model.Time, matcherSets ...metric.LabelMatchers) (model.Vector, error) + LastSampleForLabelMatchers(ctx context.Context, cutoff model.Time, matcherSets ...metric.LabelMatchers) (model.Vector, error) // Get all of the label values that are associated with a given label name. - LabelValuesForLabelName(model.LabelName) (model.LabelValues, error) + LabelValuesForLabelName(context.Context, model.LabelName) (model.LabelValues, error) } // SeriesIterator enables efficient access of sample values in a series. Its diff --git a/storage/local/noop_storage.go b/storage/local/noop_storage.go index 4832f8f45..77a48bbb8 100644 --- a/storage/local/noop_storage.go +++ b/storage/local/noop_storage.go @@ -17,6 +17,8 @@ import ( "time" "github.com/prometheus/common/model" + "golang.org/x/net/context" + "github.com/prometheus/prometheus/storage/metric" ) @@ -39,22 +41,23 @@ func (s *NoopStorage) WaitForIndexing() { } // LastSampleForLabelMatchers implements Storage. -func (s *NoopStorage) LastSampleForLabelMatchers(cutoff model.Time, matcherSets ...metric.LabelMatchers) (model.Vector, error) { +func (s *NoopStorage) LastSampleForLabelMatchers(ctx context.Context, cutoff model.Time, matcherSets ...metric.LabelMatchers) (model.Vector, error) { return nil, nil } // QueryRange implements Storage. -func (s *NoopStorage) QueryRange(from, through model.Time, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) { +func (s *NoopStorage) QueryRange(ctx context.Context, from, through model.Time, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) { return nil, nil } // QueryInstant implements Storage. -func (s *NoopStorage) QueryInstant(ts model.Time, stalenessDelta time.Duration, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) { +func (s *NoopStorage) QueryInstant(ctx context.Context, ts model.Time, stalenessDelta time.Duration, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) { return nil, nil } // MetricsForLabelMatchers implements Storage. func (s *NoopStorage) MetricsForLabelMatchers( + ctx context.Context, from, through model.Time, matcherSets ...metric.LabelMatchers, ) ([]metric.Metric, error) { @@ -62,12 +65,12 @@ func (s *NoopStorage) MetricsForLabelMatchers( } // LabelValuesForLabelName implements Storage. -func (s *NoopStorage) LabelValuesForLabelName(labelName model.LabelName) (model.LabelValues, error) { +func (s *NoopStorage) LabelValuesForLabelName(ctx context.Context, labelName model.LabelName) (model.LabelValues, error) { return nil, nil } // DropMetricsForLabelMatchers implements Storage. -func (s *NoopStorage) DropMetricsForLabelMatchers(matchers ...*metric.LabelMatcher) (int, error) { +func (s *NoopStorage) DropMetricsForLabelMatchers(ctx context.Context, matchers ...*metric.LabelMatcher) (int, error) { return 0, nil } diff --git a/storage/local/storage.go b/storage/local/storage.go index ef4ff89ed..a00da8b8a 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -27,6 +27,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/log" "github.com/prometheus/common/model" + "golang.org/x/net/context" "github.com/prometheus/prometheus/storage/metric" ) @@ -413,7 +414,7 @@ func (s *MemorySeriesStorage) WaitForIndexing() { } // LastSampleForLabelMatchers implements Storage. -func (s *MemorySeriesStorage) LastSampleForLabelMatchers(cutoff model.Time, matcherSets ...metric.LabelMatchers) (model.Vector, error) { +func (s *MemorySeriesStorage) LastSampleForLabelMatchers(_ context.Context, cutoff model.Time, matcherSets ...metric.LabelMatchers) (model.Vector, error) { fps := map[model.Fingerprint]struct{}{} for _, matchers := range matcherSets { fpToMetric, err := s.metricsForLabelMatchers(cutoff, model.Latest, matchers...) @@ -483,7 +484,7 @@ func (bit *boundedIterator) Close() { } // QueryRange implements Storage. -func (s *MemorySeriesStorage) QueryRange(from, through model.Time, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) { +func (s *MemorySeriesStorage) QueryRange(_ context.Context, from, through model.Time, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) { fpToMetric, err := s.metricsForLabelMatchers(from, through, matchers...) if err != nil { return nil, err @@ -497,7 +498,7 @@ func (s *MemorySeriesStorage) QueryRange(from, through model.Time, matchers ...* } // QueryInstant implements Storage. -func (s *MemorySeriesStorage) QueryInstant(ts model.Time, stalenessDelta time.Duration, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) { +func (s *MemorySeriesStorage) QueryInstant(_ context.Context, ts model.Time, stalenessDelta time.Duration, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) { from := ts.Add(-stalenessDelta) through := ts @@ -540,6 +541,7 @@ func (s *MemorySeriesStorage) fingerprintsForLabelPair( // MetricsForLabelMatchers implements Storage. func (s *MemorySeriesStorage) MetricsForLabelMatchers( + _ context.Context, from, through model.Time, matcherSets ...metric.LabelMatchers, ) ([]metric.Metric, error) { @@ -603,7 +605,7 @@ func (s *MemorySeriesStorage) metricsForLabelMatchers( break } - lvs, err := s.LabelValuesForLabelName(m.Name) + lvs, err := s.LabelValuesForLabelName(context.TODO(), m.Name) if err != nil { return nil, err } @@ -693,12 +695,12 @@ func (s *MemorySeriesStorage) metricForRange( } // LabelValuesForLabelName implements Storage. -func (s *MemorySeriesStorage) LabelValuesForLabelName(labelName model.LabelName) (model.LabelValues, error) { +func (s *MemorySeriesStorage) LabelValuesForLabelName(_ context.Context, labelName model.LabelName) (model.LabelValues, error) { return s.persistence.labelValuesForLabelName(labelName) } // DropMetricsForLabelMatchers implements Storage. -func (s *MemorySeriesStorage) DropMetricsForLabelMatchers(matchers ...*metric.LabelMatcher) (int, error) { +func (s *MemorySeriesStorage) DropMetricsForLabelMatchers(_ context.Context, matchers ...*metric.LabelMatcher) (int, error) { fpToMetric, err := s.metricsForLabelMatchers(model.Earliest, model.Latest, matchers...) if err != nil { return 0, err diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go index d04c5ac92..cd9e866f5 100644 --- a/storage/local/storage_test.go +++ b/storage/local/storage_test.go @@ -25,6 +25,7 @@ import ( "github.com/prometheus/common/log" "github.com/prometheus/common/model" + "golang.org/x/net/context" "github.com/prometheus/prometheus/storage/metric" "github.com/prometheus/prometheus/util/testutil" @@ -194,6 +195,7 @@ func TestMatches(t *testing.T) { for _, mt := range matcherTests { metrics, err := storage.MetricsForLabelMatchers( + context.Background(), model.Earliest, model.Latest, mt.matchers, ) @@ -218,6 +220,7 @@ func TestMatches(t *testing.T) { } // Smoketest for from/through. metrics, err = storage.MetricsForLabelMatchers( + context.Background(), model.Earliest, -10000, mt.matchers, ) @@ -228,6 +231,7 @@ func TestMatches(t *testing.T) { t.Error("expected no matches with 'through' older than any sample") } metrics, err = storage.MetricsForLabelMatchers( + context.Background(), 10000, model.Latest, mt.matchers, ) @@ -243,6 +247,7 @@ func TestMatches(t *testing.T) { through model.Time = 75 ) metrics, err = storage.MetricsForLabelMatchers( + context.Background(), from, through, mt.matchers, ) @@ -451,6 +456,7 @@ func BenchmarkLabelMatching(b *testing.B) { benchLabelMatchingRes = []metric.Metric{} for _, mt := range matcherTests { benchLabelMatchingRes, err = s.MetricsForLabelMatchers( + context.Background(), model.Earliest, model.Latest, mt, ) @@ -493,7 +499,7 @@ func TestRetentionCutoff(t *testing.T) { if err != nil { t.Fatalf("error creating label matcher: %s", err) } - its, err := s.QueryRange(insertStart, now, lm) + its, err := s.QueryRange(context.Background(), insertStart, now, lm) if err != nil { t.Fatal(err) } @@ -581,7 +587,7 @@ func TestDropMetrics(t *testing.T) { fpList := model.Fingerprints{m1.FastFingerprint(), m2.FastFingerprint(), fpToBeArchived} - n, err := s.DropMetricsForLabelMatchers(lm1) + n, err := s.DropMetricsForLabelMatchers(context.Background(), lm1) if err != nil { t.Fatal(err) } @@ -614,7 +620,7 @@ func TestDropMetrics(t *testing.T) { t.Errorf("chunk file does not exist for fp=%v", fpList[2]) } - n, err = s.DropMetricsForLabelMatchers(lmAll) + n, err = s.DropMetricsForLabelMatchers(context.Background(), lmAll) if err != nil { t.Fatal(err) } diff --git a/template/template.go b/template/template.go index d07c446e3..f468b9b6e 100644 --- a/template/template.go +++ b/template/template.go @@ -111,14 +111,14 @@ type Expander struct { } // NewTemplateExpander returns a template expander ready to use. -func NewTemplateExpander(text string, name string, data interface{}, timestamp model.Time, queryEngine *promql.Engine, queryCtx context.Context, pathPrefix string) *Expander { +func NewTemplateExpander(ctx context.Context, text string, name string, data interface{}, timestamp model.Time, queryEngine *promql.Engine, pathPrefix string) *Expander { return &Expander{ text: text, name: name, data: data, funcMap: text_template.FuncMap{ "query": func(q string) (queryResult, error) { - return query(queryCtx, q, timestamp, queryEngine) + return query(ctx, q, timestamp, queryEngine) }, "first": func(v queryResult) (*sample, error) { if len(v) > 0 { diff --git a/template/template_test.go b/template/template_test.go index 9d1c787a4..3f919ab1b 100644 --- a/template/template_test.go +++ b/template/template_test.go @@ -221,7 +221,7 @@ func TestTemplateExpansion(t *testing.T) { for i, s := range scenarios { var result string var err error - expander := NewTemplateExpander(s.text, "test", s.input, time, engine, context.Background(), "") + expander := NewTemplateExpander(context.Background(), s.text, "test", s.input, time, engine, "") if s.html { result, err = expander.ExpandHTML(nil) } else { diff --git a/web/api/v1/api.go b/web/api/v1/api.go index a79f9f9c7..af9d86e7a 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -85,19 +85,19 @@ type apiFunc func(r *http.Request) (interface{}, *apiError) // API can register a set of endpoints in a router and handle // them using the provided storage and query engine. type API struct { + Context context.Context Storage local.Storage QueryEngine *promql.Engine - QueryCtx context.Context context func(r *http.Request) context.Context now func() model.Time } // NewAPI returns an initialized API type. -func NewAPI(qe *promql.Engine, qc context.Context, st local.Storage) *API { +func NewAPI(ctx context.Context, qe *promql.Engine, st local.Storage) *API { return &API{ + Context: ctx, QueryEngine: qe, - QueryCtx: qc, Storage: st, context: route.Context, now: model.Now, @@ -159,7 +159,7 @@ func (api *API) query(r *http.Request) (interface{}, *apiError) { return nil, &apiError{errorBadData, err} } - res := qry.Exec(api.QueryCtx) + res := qry.Exec(api.Context) if res.Err != nil { switch res.Err.(type) { case promql.ErrQueryCanceled: @@ -206,7 +206,7 @@ func (api *API) queryRange(r *http.Request) (interface{}, *apiError) { return nil, &apiError{errorBadData, err} } - res := qry.Exec(api.QueryCtx) + res := qry.Exec(api.Context) if res.Err != nil { switch res.Err.(type) { case promql.ErrQueryCanceled: @@ -228,7 +228,7 @@ func (api *API) labelValues(r *http.Request) (interface{}, *apiError) { if !model.LabelNameRE.MatchString(name) { return nil, &apiError{errorBadData, fmt.Errorf("invalid label name: %q", name)} } - vals, err := api.Storage.LabelValuesForLabelName(model.LabelName(name)) + vals, err := api.Storage.LabelValuesForLabelName(api.Context, model.LabelName(name)) if err != nil { return nil, &apiError{errorExec, err} } @@ -274,7 +274,7 @@ func (api *API) series(r *http.Request) (interface{}, *apiError) { matcherSets = append(matcherSets, matchers) } - res, err := api.Storage.MetricsForLabelMatchers(start, end, matcherSets...) + res, err := api.Storage.MetricsForLabelMatchers(api.Context, start, end, matcherSets...) if err != nil { return nil, &apiError{errorExec, err} } @@ -298,7 +298,7 @@ func (api *API) dropSeries(r *http.Request) (interface{}, *apiError) { if err != nil { return nil, &apiError{errorBadData, err} } - n, err := api.Storage.DropMetricsForLabelMatchers(matchers...) + n, err := api.Storage.DropMetricsForLabelMatchers(context.TODO(), matchers...) if err != nil { return nil, &apiError{errorExec, err} } diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go index f0e66eab0..5d3bec8ff 100644 --- a/web/api/v1/api_test.go +++ b/web/api/v1/api_test.go @@ -52,7 +52,7 @@ func TestEndpoints(t *testing.T) { api := &API{ Storage: suite.Storage(), QueryEngine: suite.QueryEngine(), - QueryCtx: suite.Context(), + Context: suite.Context(), now: func() model.Time { return now }, } diff --git a/web/federate.go b/web/federate.go index 9004db880..9a7d15193 100644 --- a/web/federate.go +++ b/web/federate.go @@ -50,7 +50,7 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { ) w.Header().Set("Content-Type", string(format)) - vector, err := h.storage.LastSampleForLabelMatchers(minTimestamp, matcherSets...) + vector, err := h.storage.LastSampleForLabelMatchers(h.context, minTimestamp, matcherSets...) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return diff --git a/web/web.go b/web/web.go index 0cce11cc4..e7eef450d 100644 --- a/web/web.go +++ b/web/web.go @@ -56,7 +56,7 @@ type Handler struct { targetManager *retrieval.TargetManager ruleManager *rules.Manager queryEngine *promql.Engine - queryCtx context.Context + context context.Context storage local.Storage apiV1 *api_v1.API @@ -99,6 +99,14 @@ type PrometheusVersion struct { // Options for the web Handler. type Options struct { + Context context.Context + Storage local.Storage + QueryEngine *promql.Engine + TargetManager *retrieval.TargetManager + RuleManager *rules.Manager + Version *PrometheusVersion + Flags map[string]string + ListenAddress string ExternalURL *url.URL RoutePrefix string @@ -111,16 +119,7 @@ type Options struct { } // New initializes a new web Handler. -func New( - st local.Storage, - qe *promql.Engine, - qc context.Context, - tm *retrieval.TargetManager, - rm *rules.Manager, - version *PrometheusVersion, - flags map[string]string, - o *Options, -) *Handler { +func New(o *Options) *Handler { router := route.New() h := &Handler{ @@ -129,17 +128,17 @@ func New( quitCh: make(chan struct{}), reloadCh: make(chan chan error), options: o, - versionInfo: version, + versionInfo: o.Version, birth: time.Now(), - flagsMap: flags, + flagsMap: o.Flags, - targetManager: tm, - ruleManager: rm, - queryEngine: qe, - queryCtx: qc, - storage: st, + context: o.Context, + targetManager: o.TargetManager, + ruleManager: o.RuleManager, + queryEngine: o.QueryEngine, + storage: o.Storage, - apiV1: api_v1.NewAPI(qe, qc, st), + apiV1: api_v1.NewAPI(o.Context, o.QueryEngine, o.Storage), now: model.Now, } @@ -297,7 +296,7 @@ func (h *Handler) consoles(w http.ResponseWriter, r *http.Request) { Path: strings.TrimLeft(name, "/"), } - tmpl := template.NewTemplateExpander(string(text), "__console_"+name, data, h.now(), h.queryEngine, h.queryCtx, h.options.ExternalURL.Path) + tmpl := template.NewTemplateExpander(h.context, string(text), "__console_"+name, data, h.now(), h.queryEngine, h.options.ExternalURL.Path) filenames, err := filepath.Glob(h.options.ConsoleLibrariesPath + "/*.lib") if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) @@ -470,7 +469,7 @@ func (h *Handler) executeTemplate(w http.ResponseWriter, name string, data inter http.Error(w, err.Error(), http.StatusInternalServerError) } - tmpl := template.NewTemplateExpander(text, name, data, h.now(), h.queryEngine, h.queryCtx, h.options.ExternalURL.Path) + tmpl := template.NewTemplateExpander(h.context, text, name, data, h.now(), h.queryEngine, h.options.ExternalURL.Path) tmpl.Funcs(tmplFuncs(h.consolesPath(), h.options)) result, err := tmpl.ExpandHTML(nil)