diff --git a/promql/engine.go b/promql/engine.go index 046b97c21..283538ccd 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -552,6 +552,14 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *Ev Step: durationToInt64Millis(s.Interval), } + // We need to make sure we select the timerange selected by the subquery. + // TODO(gouthamve): cumulativeSubqueryOffset gives the sum of range and the offset + // we can optimise it by separating out the range and offsets, and substracting the offsets + // from end also. + subqOffset := ng.cumulativeSubqueryOffset(path) + offsetMilliseconds := durationMilliseconds(subqOffset) + params.Start = params.Start - offsetMilliseconds + switch n := node.(type) { case *VectorSelector: params.Start = params.Start - durationMilliseconds(LookbackDelta) diff --git a/promql/engine_test.go b/promql/engine_test.go index 08ad7c241..68481ed85 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -227,6 +227,166 @@ func TestQueryError(t *testing.T) { } } +// paramCheckerQuerier implements storage.Querier which checks the start and end times +// in params. +type paramCheckerQuerier struct { + start int64 + end int64 + + t *testing.T +} + +func (q *paramCheckerQuerier) Select(sp *storage.SelectParams, _ ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { + testutil.Equals(q.t, q.start, sp.Start) + testutil.Equals(q.t, q.end, sp.End) + + return errSeriesSet{err: nil}, nil, nil +} +func (*paramCheckerQuerier) LabelValues(name string) ([]string, error) { return nil, nil } +func (*paramCheckerQuerier) LabelNames() ([]string, error) { return nil, nil } +func (*paramCheckerQuerier) Close() error { return nil } + +func TestParamsSetCorrectly(t *testing.T) { + opts := EngineOpts{ + Logger: nil, + Reg: nil, + MaxConcurrent: 10, + MaxSamples: 10, + Timeout: 10 * time.Second, + } + + // Set the lookback to be smaller and reset at the end. + currLookback := LookbackDelta + LookbackDelta = 5 * time.Second + defer func() { + LookbackDelta = currLookback + }() + + cases := []struct { + query string + + // All times are in seconds. + start int64 + end int64 + + paramStart int64 + paramEnd int64 + }{{ + query: "foo", + start: 10, + + paramStart: 5, + paramEnd: 10, + }, { + query: "foo[2m]", + start: 200, + + paramStart: 80, // 200 - 120 + paramEnd: 200, + }, { + query: "foo[2m] offset 2m", + start: 300, + + paramStart: 60, + paramEnd: 180, + }, { + query: "foo[2m:1s]", + start: 300, + + paramStart: 175, // 300 - 120 - 5 + paramEnd: 300, + }, { + query: "count_over_time(foo[2m:1s])", + start: 300, + + paramStart: 175, // 300 - 120 - 5 + paramEnd: 300, + }, { + query: "count_over_time(foo[2m:1s] offset 10s)", + start: 300, + + paramStart: 165, // 300 - 120 - 5 - 10 + paramEnd: 300, + }, { + query: "count_over_time((foo offset 10s)[2m:1s] offset 10s)", + start: 300, + + paramStart: 155, // 300 - 120 - 5 - 10 - 10 + paramEnd: 290, + }, { + // Range queries now. + query: "foo", + start: 10, + end: 20, + + paramStart: 5, + paramEnd: 20, + }, { + query: "rate(foo[2m])", + start: 200, + end: 500, + + paramStart: 80, // 200 - 120 + paramEnd: 500, + }, { + query: "rate(foo[2m] offset 2m)", + start: 300, + end: 500, + + paramStart: 60, + paramEnd: 380, + }, { + query: "rate(foo[2m:1s])", + start: 300, + end: 500, + + paramStart: 175, // 300 - 120 - 5 + paramEnd: 500, + }, { + query: "count_over_time(foo[2m:1s])", + start: 300, + end: 500, + + paramStart: 175, // 300 - 120 - 5 + paramEnd: 500, + }, { + query: "count_over_time(foo[2m:1s] offset 10s)", + start: 300, + end: 500, + + paramStart: 165, // 300 - 120 - 5 - 10 + paramEnd: 500, + }, { + query: "count_over_time((foo offset 10s)[2m:1s] offset 10s)", + start: 300, + end: 500, + + paramStart: 155, // 300 - 120 - 5 - 10 - 10 + paramEnd: 490, + }} + + for _, tc := range cases { + engine := NewEngine(opts) + queryable := storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { + return ¶mCheckerQuerier{start: tc.paramStart * 1000, end: tc.paramEnd * 1000, t: t}, nil + }) + + var ( + query Query + err error + ) + if tc.end == 0 { + query, err = engine.NewInstantQuery(queryable, tc.query, time.Unix(tc.start, 0)) + } else { + query, err = engine.NewRangeQuery(queryable, tc.query, time.Unix(tc.start, 0), time.Unix(tc.end, 0), time.Second) + } + testutil.Ok(t, err) + + res := query.Exec(context.Background()) + testutil.Ok(t, res.Err) + } +} + func TestEngineShutdown(t *testing.T) { opts := EngineOpts{ Logger: nil,