promql: Removed global and add ability to have better interval for subqueries if not specified (#7628)
* promql: Removed global and add ability to have better interval for subqueries if not specified ## Changes * Refactored tests for better hints testing * Added various TODO in places to enhance. * Moved DefaultEvalInterval global to opts with func(rangeMillis int64) int64 function instead Motivation: At Thanos we would love to have better control over the subqueries step/interval. This is important to choose proper resolution. I think having proper step also does not harm for Prometheus and remote read users. Especially on stateless querier we do not know evaluation interval and in fact putting global can be wrong to assume for Prometheus even. I think ideally we could try to have at least 3 samples within the range, the same way Prometheus UI and Grafana assumes. Anyway this interfaces allows to decide on promQL user basis. Open question: Is taking parent interval a smart move? Motivation for removing global: I spent 1h fighting with: === RUN TestEvaluations TestEvaluations: promql_test.go:31: unexpected error: error evaluating query "absent_over_time(rate(nonexistant[5m])[5m:])" (line 687): unexpected error: runtime error: integer divide by zero --- FAIL: TestEvaluations (0.32s) FAIL At the end I found that this fails on most of the versions including this master if you run this test alone. If run together with many other tests it passes. This is due to SetDefaultEvaluationInterval(1 * time.Minute) in test that is ran before TestEvaluations. Thanks to globals (: Let's fix it by dropping this global. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Added issue links for TODOs. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Removed irrelevant changes. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
This commit is contained in:
parent
ffc925dd21
commit
a0df8a383a
|
@ -30,6 +30,7 @@ import (
|
|||
"runtime"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
|
@ -334,7 +335,8 @@ func main() {
|
|||
}
|
||||
}
|
||||
|
||||
promql.SetDefaultEvaluationInterval(time.Duration(config.DefaultGlobalConfig.EvaluationInterval))
|
||||
noStepSubqueryInterval := &safePromQLNoStepSubqueryInterval{}
|
||||
noStepSubqueryInterval.Set(config.DefaultGlobalConfig.EvaluationInterval)
|
||||
|
||||
// Above level 6, the k8s client would log bearer tokens in clear-text.
|
||||
klog.ClampLevel(6)
|
||||
|
@ -367,12 +369,13 @@ func main() {
|
|||
scrapeManager = scrape.NewManager(log.With(logger, "component", "scrape manager"), fanoutStorage)
|
||||
|
||||
opts = promql.EngineOpts{
|
||||
Logger: log.With(logger, "component", "query engine"),
|
||||
Reg: prometheus.DefaultRegisterer,
|
||||
MaxSamples: cfg.queryMaxSamples,
|
||||
Timeout: time.Duration(cfg.queryTimeout),
|
||||
ActiveQueryTracker: promql.NewActiveQueryTracker(cfg.localStoragePath, cfg.queryConcurrency, log.With(logger, "component", "activeQueryTracker")),
|
||||
LookbackDelta: time.Duration(cfg.lookbackDelta),
|
||||
Logger: log.With(logger, "component", "query engine"),
|
||||
Reg: prometheus.DefaultRegisterer,
|
||||
MaxSamples: cfg.queryMaxSamples,
|
||||
Timeout: time.Duration(cfg.queryTimeout),
|
||||
ActiveQueryTracker: promql.NewActiveQueryTracker(cfg.localStoragePath, cfg.queryConcurrency, log.With(logger, "component", "activeQueryTracker")),
|
||||
LookbackDelta: time.Duration(cfg.lookbackDelta),
|
||||
NoStepSubqueryIntervalFn: noStepSubqueryInterval.Get,
|
||||
}
|
||||
|
||||
queryEngine = promql.NewEngine(opts)
|
||||
|
@ -606,11 +609,11 @@ func main() {
|
|||
for {
|
||||
select {
|
||||
case <-hup:
|
||||
if err := reloadConfig(cfg.configFile, logger, reloaders...); err != nil {
|
||||
if err := reloadConfig(cfg.configFile, logger, noStepSubqueryInterval, reloaders...); err != nil {
|
||||
level.Error(logger).Log("msg", "Error reloading config", "err", err)
|
||||
}
|
||||
case rc := <-webHandler.Reload():
|
||||
if err := reloadConfig(cfg.configFile, logger, reloaders...); err != nil {
|
||||
if err := reloadConfig(cfg.configFile, logger, noStepSubqueryInterval, reloaders...); err != nil {
|
||||
level.Error(logger).Log("msg", "Error reloading config", "err", err)
|
||||
rc <- err
|
||||
} else {
|
||||
|
@ -642,7 +645,7 @@ func main() {
|
|||
return nil
|
||||
}
|
||||
|
||||
if err := reloadConfig(cfg.configFile, logger, reloaders...); err != nil {
|
||||
if err := reloadConfig(cfg.configFile, logger, noStepSubqueryInterval, reloaders...); err != nil {
|
||||
return errors.Wrapf(err, "error loading config from %q", cfg.configFile)
|
||||
}
|
||||
|
||||
|
@ -797,7 +800,22 @@ func openDBWithMetrics(dir string, logger log.Logger, reg prometheus.Registerer,
|
|||
return db, nil
|
||||
}
|
||||
|
||||
func reloadConfig(filename string, logger log.Logger, rls ...func(*config.Config) error) (err error) {
|
||||
type safePromQLNoStepSubqueryInterval struct {
|
||||
value int64
|
||||
}
|
||||
|
||||
func durationToInt64Millis(d time.Duration) int64 {
|
||||
return int64(d / time.Millisecond)
|
||||
}
|
||||
func (i *safePromQLNoStepSubqueryInterval) Set(ev model.Duration) {
|
||||
atomic.StoreInt64(&i.value, durationToInt64Millis(time.Duration(ev)))
|
||||
}
|
||||
|
||||
func (i *safePromQLNoStepSubqueryInterval) Get(int64) int64 {
|
||||
return atomic.LoadInt64(&i.value)
|
||||
}
|
||||
|
||||
func reloadConfig(filename string, logger log.Logger, noStepSuqueryInterval *safePromQLNoStepSubqueryInterval, rls ...func(*config.Config) error) (err error) {
|
||||
level.Info(logger).Log("msg", "Loading configuration file", "filename", filename)
|
||||
|
||||
defer func() {
|
||||
|
@ -825,7 +843,7 @@ func reloadConfig(filename string, logger log.Logger, rls ...func(*config.Config
|
|||
return errors.Errorf("one or more errors occurred while applying the new configuration (--config.file=%q)", filename)
|
||||
}
|
||||
|
||||
promql.SetDefaultEvaluationInterval(time.Duration(conf.GlobalConfig.EvaluationInterval))
|
||||
noStepSuqueryInterval.Set(conf.GlobalConfig.EvaluationInterval)
|
||||
level.Info(logger).Log("msg", "Completed loading of configuration file", "filename", filename)
|
||||
return nil
|
||||
}
|
||||
|
|
122
promql/engine.go
122
promql/engine.go
|
@ -24,7 +24,6 @@ import (
|
|||
"sort"
|
||||
"strconv"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/go-kit/kit/log"
|
||||
|
@ -56,22 +55,6 @@ const (
|
|||
minInt64 = -9223372036854775808
|
||||
)
|
||||
|
||||
var (
|
||||
// DefaultEvaluationInterval is the default evaluation interval of
|
||||
// a subquery in milliseconds.
|
||||
DefaultEvaluationInterval int64
|
||||
)
|
||||
|
||||
// SetDefaultEvaluationInterval sets DefaultEvaluationInterval.
|
||||
func SetDefaultEvaluationInterval(ev time.Duration) {
|
||||
atomic.StoreInt64(&DefaultEvaluationInterval, durationToInt64Millis(ev))
|
||||
}
|
||||
|
||||
// GetDefaultEvaluationInterval returns the DefaultEvaluationInterval as time.Duration.
|
||||
func GetDefaultEvaluationInterval() int64 {
|
||||
return atomic.LoadInt64(&DefaultEvaluationInterval)
|
||||
}
|
||||
|
||||
type engineMetrics struct {
|
||||
currentQueries prometheus.Gauge
|
||||
maxConcurrentQueries prometheus.Gauge
|
||||
|
@ -221,19 +204,24 @@ type EngineOpts struct {
|
|||
// LookbackDelta determines the time since the last sample after which a time
|
||||
// series is considered stale.
|
||||
LookbackDelta time.Duration
|
||||
|
||||
// NoStepSubqueryIntervalFn is the default evaluation interval of
|
||||
// a subquery in milliseconds if no step in range vector was specified `[30m:<step>]`.
|
||||
NoStepSubqueryIntervalFn func(rangeMillis int64) int64
|
||||
}
|
||||
|
||||
// Engine handles the lifetime of queries from beginning to end.
|
||||
// It is connected to a querier.
|
||||
type Engine struct {
|
||||
logger log.Logger
|
||||
metrics *engineMetrics
|
||||
timeout time.Duration
|
||||
maxSamplesPerQuery int
|
||||
activeQueryTracker *ActiveQueryTracker
|
||||
queryLogger QueryLogger
|
||||
queryLoggerLock sync.RWMutex
|
||||
lookbackDelta time.Duration
|
||||
logger log.Logger
|
||||
metrics *engineMetrics
|
||||
timeout time.Duration
|
||||
maxSamplesPerQuery int
|
||||
activeQueryTracker *ActiveQueryTracker
|
||||
queryLogger QueryLogger
|
||||
queryLoggerLock sync.RWMutex
|
||||
lookbackDelta time.Duration
|
||||
noStepSubqueryIntervalFn func(rangeMillis int64) int64
|
||||
}
|
||||
|
||||
// NewEngine returns a new engine.
|
||||
|
@ -328,12 +316,13 @@ func NewEngine(opts EngineOpts) *Engine {
|
|||
}
|
||||
|
||||
return &Engine{
|
||||
timeout: opts.Timeout,
|
||||
logger: opts.Logger,
|
||||
metrics: metrics,
|
||||
maxSamplesPerQuery: opts.MaxSamples,
|
||||
activeQueryTracker: opts.ActiveQueryTracker,
|
||||
lookbackDelta: opts.LookbackDelta,
|
||||
timeout: opts.Timeout,
|
||||
logger: opts.Logger,
|
||||
metrics: metrics,
|
||||
maxSamplesPerQuery: opts.MaxSamples,
|
||||
activeQueryTracker: opts.ActiveQueryTracker,
|
||||
lookbackDelta: opts.LookbackDelta,
|
||||
noStepSubqueryIntervalFn: opts.NoStepSubqueryIntervalFn,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -525,14 +514,14 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval
|
|||
if s.Start == s.End && s.Interval == 0 {
|
||||
start := timeMilliseconds(s.Start)
|
||||
evaluator := &evaluator{
|
||||
startTimestamp: start,
|
||||
endTimestamp: start,
|
||||
interval: 1,
|
||||
ctx: ctxInnerEval,
|
||||
maxSamples: ng.maxSamplesPerQuery,
|
||||
defaultEvalInterval: GetDefaultEvaluationInterval(),
|
||||
logger: ng.logger,
|
||||
lookbackDelta: ng.lookbackDelta,
|
||||
startTimestamp: start,
|
||||
endTimestamp: start,
|
||||
interval: 1,
|
||||
ctx: ctxInnerEval,
|
||||
maxSamples: ng.maxSamplesPerQuery,
|
||||
logger: ng.logger,
|
||||
lookbackDelta: ng.lookbackDelta,
|
||||
noStepSubqueryIntervalFn: ng.noStepSubqueryIntervalFn,
|
||||
}
|
||||
|
||||
val, warnings, err := evaluator.Eval(s.Expr)
|
||||
|
@ -575,14 +564,14 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval
|
|||
|
||||
// Range evaluation.
|
||||
evaluator := &evaluator{
|
||||
startTimestamp: timeMilliseconds(s.Start),
|
||||
endTimestamp: timeMilliseconds(s.End),
|
||||
interval: durationMilliseconds(s.Interval),
|
||||
ctx: ctxInnerEval,
|
||||
maxSamples: ng.maxSamplesPerQuery,
|
||||
defaultEvalInterval: GetDefaultEvaluationInterval(),
|
||||
logger: ng.logger,
|
||||
lookbackDelta: ng.lookbackDelta,
|
||||
startTimestamp: timeMilliseconds(s.Start),
|
||||
endTimestamp: timeMilliseconds(s.End),
|
||||
interval: durationMilliseconds(s.Interval),
|
||||
ctx: ctxInnerEval,
|
||||
maxSamples: ng.maxSamplesPerQuery,
|
||||
logger: ng.logger,
|
||||
lookbackDelta: ng.lookbackDelta,
|
||||
noStepSubqueryIntervalFn: ng.noStepSubqueryIntervalFn,
|
||||
}
|
||||
val, warnings, err := evaluator.Eval(s.Expr)
|
||||
if err != nil {
|
||||
|
@ -657,7 +646,7 @@ func (ng *Engine) populateSeries(querier storage.Querier, s *parser.EvalStmt) {
|
|||
hints := &storage.SelectHints{
|
||||
Start: timestamp.FromTime(s.Start),
|
||||
End: timestamp.FromTime(s.End),
|
||||
Step: durationToInt64Millis(s.Interval),
|
||||
Step: durationMilliseconds(s.Interval),
|
||||
}
|
||||
|
||||
// We need to make sure we select the timerange selected by the subquery.
|
||||
|
@ -769,11 +758,11 @@ type evaluator struct {
|
|||
endTimestamp int64 // End time in milliseconds.
|
||||
interval int64 // Interval in milliseconds.
|
||||
|
||||
maxSamples int
|
||||
currentSamples int
|
||||
defaultEvalInterval int64
|
||||
logger log.Logger
|
||||
lookbackDelta time.Duration
|
||||
maxSamples int
|
||||
currentSamples int
|
||||
logger log.Logger
|
||||
lookbackDelta time.Duration
|
||||
noStepSubqueryIntervalFn func(rangeMillis int64) int64
|
||||
}
|
||||
|
||||
// errorf causes a panic with the input formatted into an error.
|
||||
|
@ -1333,21 +1322,22 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) {
|
|||
return ev.matrixSelector(e)
|
||||
|
||||
case *parser.SubqueryExpr:
|
||||
offsetMillis := durationToInt64Millis(e.Offset)
|
||||
rangeMillis := durationToInt64Millis(e.Range)
|
||||
offsetMillis := durationMilliseconds(e.Offset)
|
||||
rangeMillis := durationMilliseconds(e.Range)
|
||||
newEv := &evaluator{
|
||||
endTimestamp: ev.endTimestamp - offsetMillis,
|
||||
interval: ev.defaultEvalInterval,
|
||||
ctx: ev.ctx,
|
||||
currentSamples: ev.currentSamples,
|
||||
maxSamples: ev.maxSamples,
|
||||
defaultEvalInterval: ev.defaultEvalInterval,
|
||||
logger: ev.logger,
|
||||
lookbackDelta: ev.lookbackDelta,
|
||||
endTimestamp: ev.endTimestamp - offsetMillis,
|
||||
ctx: ev.ctx,
|
||||
currentSamples: ev.currentSamples,
|
||||
maxSamples: ev.maxSamples,
|
||||
logger: ev.logger,
|
||||
lookbackDelta: ev.lookbackDelta,
|
||||
noStepSubqueryIntervalFn: ev.noStepSubqueryIntervalFn,
|
||||
}
|
||||
|
||||
if e.Step != 0 {
|
||||
newEv.interval = durationToInt64Millis(e.Step)
|
||||
newEv.interval = durationMilliseconds(e.Step)
|
||||
} else {
|
||||
newEv.interval = ev.noStepSubqueryIntervalFn(rangeMillis)
|
||||
}
|
||||
|
||||
// Start with the first timestamp after (ev.startTimestamp - offset - range)
|
||||
|
@ -1367,10 +1357,6 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) {
|
|||
panic(errors.Errorf("unhandled expression of type: %T", expr))
|
||||
}
|
||||
|
||||
func durationToInt64Millis(d time.Duration) int64 {
|
||||
return int64(d / time.Millisecond)
|
||||
}
|
||||
|
||||
// vectorSelector evaluates a *parser.VectorSelector expression.
|
||||
func (ev *evaluator) vectorSelector(node *parser.VectorSelector, ts int64) (Vector, storage.Warnings) {
|
||||
ws, err := checkAndExpandSeriesSet(ev.ctx, node)
|
||||
|
|
|
@ -23,7 +23,6 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/go-kit/kit/log"
|
||||
|
||||
"github.com/prometheus/prometheus/pkg/labels"
|
||||
"github.com/prometheus/prometheus/promql/parser"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
|
@ -1114,30 +1113,27 @@ func TestSubquerySelector(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
SetDefaultEvaluationInterval(1 * time.Minute)
|
||||
for _, tst := range tests {
|
||||
test, err := NewTest(t, tst.loadString)
|
||||
testutil.Ok(t, err)
|
||||
|
||||
defer test.Close()
|
||||
|
||||
err = test.Run()
|
||||
testutil.Ok(t, err)
|
||||
|
||||
engine := test.QueryEngine()
|
||||
for _, c := range tst.cases {
|
||||
var err error
|
||||
var qry Query
|
||||
|
||||
qry, err = engine.NewInstantQuery(test.Queryable(), c.Query, c.Start)
|
||||
t.Run("", func(t *testing.T) {
|
||||
test, err := NewTest(t, tst.loadString)
|
||||
testutil.Ok(t, err)
|
||||
defer test.Close()
|
||||
|
||||
res := qry.Exec(test.Context())
|
||||
testutil.Equals(t, c.Result.Err, res.Err)
|
||||
mat := res.Value.(Matrix)
|
||||
sort.Sort(mat)
|
||||
testutil.Equals(t, c.Result.Value, mat)
|
||||
}
|
||||
testutil.Ok(t, test.Run())
|
||||
engine := test.QueryEngine()
|
||||
for _, c := range tst.cases {
|
||||
t.Run(c.Query, func(t *testing.T) {
|
||||
qry, err := engine.NewInstantQuery(test.Queryable(), c.Query, c.Start)
|
||||
testutil.Ok(t, err)
|
||||
|
||||
res := qry.Exec(test.Context())
|
||||
testutil.Equals(t, c.Result.Err, res.Err, "errors do not match for query %s", c.Query)
|
||||
mat := res.Value.(Matrix)
|
||||
sort.Sort(mat)
|
||||
testutil.Equals(t, c.Result.Value, mat)
|
||||
})
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -518,10 +518,11 @@ func (t *Test) clear() {
|
|||
t.storage = teststorage.New(t)
|
||||
|
||||
opts := EngineOpts{
|
||||
Logger: nil,
|
||||
Reg: nil,
|
||||
MaxSamples: 10000,
|
||||
Timeout: 100 * time.Second,
|
||||
Logger: nil,
|
||||
Reg: nil,
|
||||
MaxSamples: 10000,
|
||||
Timeout: 100 * time.Second,
|
||||
NoStepSubqueryIntervalFn: func(int64) int64 { return durationMilliseconds(1 * time.Minute) },
|
||||
}
|
||||
|
||||
t.queryEngine = NewEngine(opts)
|
||||
|
|
Loading…
Reference in New Issue