Make sure subquery range is taken into account for selection (#5467)
* Make sure subquery range is taken into account for selection Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>
This commit is contained in:
parent
ad71f2785f
commit
3da718c21a
|
@ -552,6 +552,14 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *Ev
|
||||||
Step: durationToInt64Millis(s.Interval),
|
Step: durationToInt64Millis(s.Interval),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// We need to make sure we select the timerange selected by the subquery.
|
||||||
|
// TODO(gouthamve): cumulativeSubqueryOffset gives the sum of range and the offset
|
||||||
|
// we can optimise it by separating out the range and offsets, and substracting the offsets
|
||||||
|
// from end also.
|
||||||
|
subqOffset := ng.cumulativeSubqueryOffset(path)
|
||||||
|
offsetMilliseconds := durationMilliseconds(subqOffset)
|
||||||
|
params.Start = params.Start - offsetMilliseconds
|
||||||
|
|
||||||
switch n := node.(type) {
|
switch n := node.(type) {
|
||||||
case *VectorSelector:
|
case *VectorSelector:
|
||||||
params.Start = params.Start - durationMilliseconds(LookbackDelta)
|
params.Start = params.Start - durationMilliseconds(LookbackDelta)
|
||||||
|
|
|
@ -227,6 +227,166 @@ func TestQueryError(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// paramCheckerQuerier implements storage.Querier which checks the start and end times
|
||||||
|
// in params.
|
||||||
|
type paramCheckerQuerier struct {
|
||||||
|
start int64
|
||||||
|
end int64
|
||||||
|
|
||||||
|
t *testing.T
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *paramCheckerQuerier) Select(sp *storage.SelectParams, _ ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
|
||||||
|
testutil.Equals(q.t, q.start, sp.Start)
|
||||||
|
testutil.Equals(q.t, q.end, sp.End)
|
||||||
|
|
||||||
|
return errSeriesSet{err: nil}, nil, nil
|
||||||
|
}
|
||||||
|
func (*paramCheckerQuerier) LabelValues(name string) ([]string, error) { return nil, nil }
|
||||||
|
func (*paramCheckerQuerier) LabelNames() ([]string, error) { return nil, nil }
|
||||||
|
func (*paramCheckerQuerier) Close() error { return nil }
|
||||||
|
|
||||||
|
func TestParamsSetCorrectly(t *testing.T) {
|
||||||
|
opts := EngineOpts{
|
||||||
|
Logger: nil,
|
||||||
|
Reg: nil,
|
||||||
|
MaxConcurrent: 10,
|
||||||
|
MaxSamples: 10,
|
||||||
|
Timeout: 10 * time.Second,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set the lookback to be smaller and reset at the end.
|
||||||
|
currLookback := LookbackDelta
|
||||||
|
LookbackDelta = 5 * time.Second
|
||||||
|
defer func() {
|
||||||
|
LookbackDelta = currLookback
|
||||||
|
}()
|
||||||
|
|
||||||
|
cases := []struct {
|
||||||
|
query string
|
||||||
|
|
||||||
|
// All times are in seconds.
|
||||||
|
start int64
|
||||||
|
end int64
|
||||||
|
|
||||||
|
paramStart int64
|
||||||
|
paramEnd int64
|
||||||
|
}{{
|
||||||
|
query: "foo",
|
||||||
|
start: 10,
|
||||||
|
|
||||||
|
paramStart: 5,
|
||||||
|
paramEnd: 10,
|
||||||
|
}, {
|
||||||
|
query: "foo[2m]",
|
||||||
|
start: 200,
|
||||||
|
|
||||||
|
paramStart: 80, // 200 - 120
|
||||||
|
paramEnd: 200,
|
||||||
|
}, {
|
||||||
|
query: "foo[2m] offset 2m",
|
||||||
|
start: 300,
|
||||||
|
|
||||||
|
paramStart: 60,
|
||||||
|
paramEnd: 180,
|
||||||
|
}, {
|
||||||
|
query: "foo[2m:1s]",
|
||||||
|
start: 300,
|
||||||
|
|
||||||
|
paramStart: 175, // 300 - 120 - 5
|
||||||
|
paramEnd: 300,
|
||||||
|
}, {
|
||||||
|
query: "count_over_time(foo[2m:1s])",
|
||||||
|
start: 300,
|
||||||
|
|
||||||
|
paramStart: 175, // 300 - 120 - 5
|
||||||
|
paramEnd: 300,
|
||||||
|
}, {
|
||||||
|
query: "count_over_time(foo[2m:1s] offset 10s)",
|
||||||
|
start: 300,
|
||||||
|
|
||||||
|
paramStart: 165, // 300 - 120 - 5 - 10
|
||||||
|
paramEnd: 300,
|
||||||
|
}, {
|
||||||
|
query: "count_over_time((foo offset 10s)[2m:1s] offset 10s)",
|
||||||
|
start: 300,
|
||||||
|
|
||||||
|
paramStart: 155, // 300 - 120 - 5 - 10 - 10
|
||||||
|
paramEnd: 290,
|
||||||
|
}, {
|
||||||
|
// Range queries now.
|
||||||
|
query: "foo",
|
||||||
|
start: 10,
|
||||||
|
end: 20,
|
||||||
|
|
||||||
|
paramStart: 5,
|
||||||
|
paramEnd: 20,
|
||||||
|
}, {
|
||||||
|
query: "rate(foo[2m])",
|
||||||
|
start: 200,
|
||||||
|
end: 500,
|
||||||
|
|
||||||
|
paramStart: 80, // 200 - 120
|
||||||
|
paramEnd: 500,
|
||||||
|
}, {
|
||||||
|
query: "rate(foo[2m] offset 2m)",
|
||||||
|
start: 300,
|
||||||
|
end: 500,
|
||||||
|
|
||||||
|
paramStart: 60,
|
||||||
|
paramEnd: 380,
|
||||||
|
}, {
|
||||||
|
query: "rate(foo[2m:1s])",
|
||||||
|
start: 300,
|
||||||
|
end: 500,
|
||||||
|
|
||||||
|
paramStart: 175, // 300 - 120 - 5
|
||||||
|
paramEnd: 500,
|
||||||
|
}, {
|
||||||
|
query: "count_over_time(foo[2m:1s])",
|
||||||
|
start: 300,
|
||||||
|
end: 500,
|
||||||
|
|
||||||
|
paramStart: 175, // 300 - 120 - 5
|
||||||
|
paramEnd: 500,
|
||||||
|
}, {
|
||||||
|
query: "count_over_time(foo[2m:1s] offset 10s)",
|
||||||
|
start: 300,
|
||||||
|
end: 500,
|
||||||
|
|
||||||
|
paramStart: 165, // 300 - 120 - 5 - 10
|
||||||
|
paramEnd: 500,
|
||||||
|
}, {
|
||||||
|
query: "count_over_time((foo offset 10s)[2m:1s] offset 10s)",
|
||||||
|
start: 300,
|
||||||
|
end: 500,
|
||||||
|
|
||||||
|
paramStart: 155, // 300 - 120 - 5 - 10 - 10
|
||||||
|
paramEnd: 490,
|
||||||
|
}}
|
||||||
|
|
||||||
|
for _, tc := range cases {
|
||||||
|
engine := NewEngine(opts)
|
||||||
|
queryable := storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
|
||||||
|
return ¶mCheckerQuerier{start: tc.paramStart * 1000, end: tc.paramEnd * 1000, t: t}, nil
|
||||||
|
})
|
||||||
|
|
||||||
|
var (
|
||||||
|
query Query
|
||||||
|
err error
|
||||||
|
)
|
||||||
|
if tc.end == 0 {
|
||||||
|
query, err = engine.NewInstantQuery(queryable, tc.query, time.Unix(tc.start, 0))
|
||||||
|
} else {
|
||||||
|
query, err = engine.NewRangeQuery(queryable, tc.query, time.Unix(tc.start, 0), time.Unix(tc.end, 0), time.Second)
|
||||||
|
}
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
|
res := query.Exec(context.Background())
|
||||||
|
testutil.Ok(t, res.Err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestEngineShutdown(t *testing.T) {
|
func TestEngineShutdown(t *testing.T) {
|
||||||
opts := EngineOpts{
|
opts := EngineOpts{
|
||||||
Logger: nil,
|
Logger: nil,
|
||||||
|
|
Loading…
Reference in New Issue