Re-add contexts to storage.Storage.Querier() (#3230)
* Re-add contexts to storage.Storage.Querier() These are needed when replacing the storage by a multi-tenant implementation where the tenant is stored in the context. The 1.x query interfaces already had contexts, but they got lost in 2.x. * Convert promql.Engine to use native contexts
This commit is contained in:
parent
c4270fdbf4
commit
f7e8348a88
|
@ -15,6 +15,7 @@ package promql
|
|||
|
||||
import (
|
||||
"container/heap"
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
"runtime"
|
||||
|
@ -31,7 +32,6 @@ import (
|
|||
"github.com/prometheus/prometheus/pkg/timestamp"
|
||||
"github.com/prometheus/prometheus/pkg/value"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
"golang.org/x/net/context"
|
||||
|
||||
"github.com/prometheus/prometheus/util/stats"
|
||||
)
|
||||
|
@ -212,7 +212,7 @@ type Engine struct {
|
|||
|
||||
// Queryable allows opening a storage querier.
|
||||
type Queryable interface {
|
||||
Querier(mint, maxt int64) (storage.Querier, error)
|
||||
Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error)
|
||||
}
|
||||
|
||||
// NewEngine returns a new engine.
|
||||
|
@ -507,7 +507,7 @@ func (ng *Engine) populateIterators(ctx context.Context, s *EvalStmt) (storage.Q
|
|||
|
||||
mint := s.Start.Add(-maxOffset)
|
||||
|
||||
querier, err := ng.queryable.Querier(timestamp.FromTime(mint), timestamp.FromTime(s.End))
|
||||
querier, err := ng.queryable.Querier(ctx, timestamp.FromTime(mint), timestamp.FromTime(s.End))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -14,13 +14,12 @@
|
|||
package promql
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
|
||||
"github.com/go-kit/kit/log"
|
||||
"github.com/prometheus/prometheus/pkg/labels"
|
||||
)
|
||||
|
|
|
@ -190,7 +190,7 @@ func TestStaleness(t *testing.T) {
|
|||
group.Eval(time.Unix(1, 0))
|
||||
group.Eval(time.Unix(2, 0))
|
||||
|
||||
querier, err := storage.Querier(0, 2000)
|
||||
querier, err := storage.Querier(context.Background(), 0, 2000)
|
||||
defer querier.Close()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
|
|
|
@ -15,6 +15,7 @@ package storage
|
|||
|
||||
import (
|
||||
"container/heap"
|
||||
"context"
|
||||
"strings"
|
||||
|
||||
"github.com/go-kit/kit/log"
|
||||
|
@ -39,13 +40,13 @@ func NewFanout(logger log.Logger, primary Storage, secondaries ...Storage) Stora
|
|||
}
|
||||
}
|
||||
|
||||
func (f *fanout) Querier(mint, maxt int64) (Querier, error) {
|
||||
func (f *fanout) Querier(ctx context.Context, mint, maxt int64) (Querier, error) {
|
||||
queriers := mergeQuerier{
|
||||
queriers: make([]Querier, 0, 1+len(f.secondaries)),
|
||||
}
|
||||
|
||||
// Add primary querier
|
||||
querier, err := f.primary.Querier(mint, maxt)
|
||||
querier, err := f.primary.Querier(ctx, mint, maxt)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -53,7 +54,7 @@ func (f *fanout) Querier(mint, maxt int64) (Querier, error) {
|
|||
|
||||
// Add secondary queriers
|
||||
for _, storage := range f.secondaries {
|
||||
querier, err := storage.Querier(mint, maxt)
|
||||
querier, err := storage.Querier(ctx, mint, maxt)
|
||||
if err != nil {
|
||||
queriers.Close()
|
||||
return nil, err
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
"github.com/prometheus/prometheus/pkg/labels"
|
||||
|
@ -31,7 +32,7 @@ var (
|
|||
// are goroutine-safe. Storage implements storage.SampleAppender.
|
||||
type Storage interface {
|
||||
// Querier returns a new Querier on the storage.
|
||||
Querier(mint, maxt int64) (Querier, error)
|
||||
Querier(ctx context.Context, mint, maxt int64) (Querier, error)
|
||||
|
||||
// Appender returns a new appender against the storage.
|
||||
Appender() (Appender, error)
|
||||
|
|
|
@ -25,7 +25,7 @@ import (
|
|||
)
|
||||
|
||||
// Querier returns a new Querier on the storage.
|
||||
func (r *Storage) Querier(mint, maxt int64) (storage.Querier, error) {
|
||||
func (r *Storage) Querier(_ context.Context, mint, maxt int64) (storage.Querier, error) {
|
||||
r.mtx.Lock()
|
||||
defer r.mtx.Unlock()
|
||||
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
package tsdb
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
"unsafe"
|
||||
|
@ -62,9 +63,9 @@ func (s *ReadyStorage) get() *adapter {
|
|||
}
|
||||
|
||||
// Querier implements the Storage interface.
|
||||
func (s *ReadyStorage) Querier(mint, maxt int64) (storage.Querier, error) {
|
||||
func (s *ReadyStorage) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
|
||||
if x := s.get(); x != nil {
|
||||
return x.Querier(mint, maxt)
|
||||
return x.Querier(ctx, mint, maxt)
|
||||
}
|
||||
return nil, ErrNotReady
|
||||
}
|
||||
|
@ -138,7 +139,7 @@ func Open(path string, l log.Logger, r prometheus.Registerer, opts *Options) (*t
|
|||
return db, nil
|
||||
}
|
||||
|
||||
func (a adapter) Querier(mint, maxt int64) (storage.Querier, error) {
|
||||
func (a adapter) Querier(_ context.Context, mint, maxt int64) (storage.Querier, error) {
|
||||
return querier{q: a.db.Querier(mint, maxt)}, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -276,12 +276,13 @@ func (api *API) queryRange(r *http.Request) (interface{}, *apiError) {
|
|||
}
|
||||
|
||||
func (api *API) labelValues(r *http.Request) (interface{}, *apiError) {
|
||||
name := route.Param(r.Context(), "name")
|
||||
ctx := r.Context()
|
||||
name := route.Param(ctx, "name")
|
||||
|
||||
if !model.LabelNameRE.MatchString(name) {
|
||||
return nil, &apiError{errorBadData, fmt.Errorf("invalid label name: %q", name)}
|
||||
}
|
||||
q, err := api.Queryable.Querier(math.MinInt64, math.MaxInt64)
|
||||
q, err := api.Queryable.Querier(ctx, math.MinInt64, math.MaxInt64)
|
||||
if err != nil {
|
||||
return nil, &apiError{errorExec, err}
|
||||
}
|
||||
|
@ -338,7 +339,7 @@ func (api *API) series(r *http.Request) (interface{}, *apiError) {
|
|||
matcherSets = append(matcherSets, matchers)
|
||||
}
|
||||
|
||||
q, err := api.Queryable.Querier(timestamp.FromTime(start), timestamp.FromTime(end))
|
||||
q, err := api.Queryable.Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end))
|
||||
if err != nil {
|
||||
return nil, &apiError{errorExec, err}
|
||||
}
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
package api_v2
|
||||
|
||||
import (
|
||||
native_context "context"
|
||||
"fmt"
|
||||
"math"
|
||||
"math/rand"
|
||||
|
@ -47,7 +48,7 @@ type API struct {
|
|||
enableAdmin bool
|
||||
now func() time.Time
|
||||
db func() *tsdb.DB
|
||||
q func(mint, maxt int64) (storage.Querier, error)
|
||||
q func(ctx native_context.Context, mint, maxt int64) (storage.Querier, error)
|
||||
targets func() []*retrieval.Target
|
||||
alertmanagers func() []*url.URL
|
||||
}
|
||||
|
@ -57,7 +58,7 @@ func New(
|
|||
now func() time.Time,
|
||||
db func() *tsdb.DB,
|
||||
qe *promql.Engine,
|
||||
q func(mint, maxt int64) (storage.Querier, error),
|
||||
q func(ctx native_context.Context, mint, maxt int64) (storage.Querier, error),
|
||||
targets func() []*retrieval.Target,
|
||||
alertmanagers func() []*url.URL,
|
||||
enableAdmin bool,
|
||||
|
|
|
@ -62,7 +62,7 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
|
|||
)
|
||||
w.Header().Set("Content-Type", string(format))
|
||||
|
||||
q, err := h.storage.Querier(mint, maxt)
|
||||
q, err := h.storage.Querier(req.Context(), mint, maxt)
|
||||
if err != nil {
|
||||
federationErrors.Inc()
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
|
|
Loading…
Reference in New Issue