diff --git a/promql/engine_test.go b/promql/engine_test.go index 9dea17315..d1afb7eb3 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -2327,3 +2327,111 @@ func TestEngineOptsValidation(t *testing.T) { } } } + +func TestRangeQuery(t *testing.T) { + cases := []struct { + Name string + Load string + Query string + Result parser.Value + Start time.Time + End time.Time + Interval time.Duration + }{ + { + Name: "sum_over_time with all values", + Load: `load 30s + bar 0 1 10 100 1000`, + Query: "sum_over_time(bar[30s])", + Result: Matrix{Series{ + Points: []Point{{V: 0, T: 0}, {V: 11, T: 60000}, {V: 1100, T: 120000}}, + Metric: labels.Labels{}}, + }, + Start: time.Unix(0, 0), + End: time.Unix(120, 0), + Interval: 60 * time.Second, + }, + { + Name: "sum_over_time with trailing values", + Load: `load 30s + bar 0 1 10 100 1000 0 0 0 0`, + Query: "sum_over_time(bar[30s])", + Result: Matrix{Series{ + Points: []Point{{V: 0, T: 0}, {V: 11, T: 60000}, {V: 1100, T: 120000}}, + Metric: labels.Labels{}}, + }, + Start: time.Unix(0, 0), + End: time.Unix(120, 0), + Interval: 60 * time.Second, + }, + { + Name: "sum_over_time with all values long", + Load: `load 30s + bar 0 1 10 100 1000 10000 100000 1000000 10000000`, + Query: "sum_over_time(bar[30s])", + Result: Matrix{Series{ + Points: []Point{{V: 0, T: 0}, {V: 11, T: 60000}, {V: 1100, T: 120000}, {V: 110000, T: 180000}, {V: 11000000, T: 240000}}, + Metric: labels.Labels{}}, + }, + Start: time.Unix(0, 0), + End: time.Unix(240, 0), + Interval: 60 * time.Second, + }, + { + Name: "sum_over_time with all values random", + Load: `load 30s + bar 5 17 42 2 7 905 51`, + Query: "sum_over_time(bar[30s])", + Result: Matrix{Series{ + Points: []Point{{V: 5, T: 0}, {V: 59, T: 60000}, {V: 9, T: 120000}, {V: 956, T: 180000}}, + Metric: labels.Labels{}}, + }, + Start: time.Unix(0, 0), + End: time.Unix(180, 0), + Interval: 60 * time.Second, + }, + { + Name: "metric query", + Load: `load 30s + metric 1+1x4`, + Query: "metric", + Result: Matrix{Series{ + Points: []Point{{V: 1, T: 0}, {V: 3, T: 60000}, {V: 5, T: 120000}}, + Metric: labels.Labels{labels.Label{Name: "__name__", Value: "metric"}}}, + }, + Start: time.Unix(0, 0), + End: time.Unix(120, 0), + Interval: 1 * time.Minute, + }, + { + Name: "metric query with trailing values", + Load: `load 30s + metric 1+1x8`, + Query: "metric", + Result: Matrix{Series{ + Points: []Point{{V: 1, T: 0}, {V: 3, T: 60000}, {V: 5, T: 120000}}, + Metric: labels.Labels{labels.Label{Name: "__name__", Value: "metric"}}}, + }, + Start: time.Unix(0, 0), + End: time.Unix(120, 0), + Interval: 1 * time.Minute, + }, + } + for _, c := range cases { + t.Run(c.Name, func(t *testing.T) { + test, err := NewTest(t, c.Load) + require.NoError(t, err) + defer test.Close() + + err = test.Run() + require.NoError(t, err) + + qry, err := test.QueryEngine().NewRangeQuery(test.Queryable(), c.Query, c.Start, c.End, c.Interval) + require.NoError(t, err) + + res := qry.Exec(test.Context()) + require.NoError(t, res.Err) + require.Equal(t, c.Result, res.Value) + }) + } +} diff --git a/tsdb/head.go b/tsdb/head.go index 2a300ce2d..595cd48d7 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -2538,6 +2538,23 @@ type memSafeIterator struct { buf [4]sample } +func (it *memSafeIterator) Seek(t int64) bool { + if it.Err() != nil { + return false + } + + ts, _ := it.At() + + for t > ts || it.i == -1 { + if !it.Next() { + return false + } + ts, _ = it.At() + } + + return true +} + func (it *memSafeIterator) Next() bool { if it.i+1 >= it.stopAfter { return false diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 5e1d73c2f..dfd916b76 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -2067,3 +2067,71 @@ func BenchmarkHeadLabelValuesWithMatchers(b *testing.B) { require.Equal(b, 9, len(actualValues)) } } + +func TestMemSafeIteratorSeekIntoBuffer(t *testing.T) { + dir, err := ioutil.TempDir("", "iterator_seek") + require.NoError(t, err) + defer func() { + require.NoError(t, os.RemoveAll(dir)) + }() + // This is usually taken from the Head, but passing manually here. + chunkDiskMapper, err := chunks.NewChunkDiskMapper(dir, chunkenc.NewPool(), chunks.DefaultWriteBufferSize) + require.NoError(t, err) + defer func() { + require.NoError(t, chunkDiskMapper.Close()) + }() + + s := newMemSeries(labels.Labels{}, 1, 500, nil) + + for i := 0; i < 7; i++ { + ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper) + require.True(t, ok, "sample append failed") + } + + it := s.iterator(s.chunkID(len(s.mmappedChunks)), nil, chunkDiskMapper, nil) + _, ok := it.(*memSafeIterator) + require.True(t, ok) + + // First point. + ok = it.Seek(0) + require.True(t, ok) + ts, val := it.At() + require.Equal(t, int64(0), ts) + require.Equal(t, float64(0), val) + + // Advance one point. + ok = it.Next() + require.True(t, ok) + ts, val = it.At() + require.Equal(t, int64(1), ts) + require.Equal(t, float64(1), val) + + // Seeking an older timestamp shouldn't cause the iterator to go backwards. + ok = it.Seek(0) + require.True(t, ok) + ts, val = it.At() + require.Equal(t, int64(1), ts) + require.Equal(t, float64(1), val) + + // Seek into the buffer. + ok = it.Seek(3) + require.True(t, ok) + ts, val = it.At() + require.Equal(t, int64(3), ts) + require.Equal(t, float64(3), val) + + // Iterate through the rest of the buffer. + for i := 4; i < 7; i++ { + ok = it.Next() + require.True(t, ok) + ts, val = it.At() + require.Equal(t, int64(i), ts) + require.Equal(t, float64(i), val) + } + + // Run out of elements in the iterator. + ok = it.Next() + require.False(t, ok) + ok = it.Seek(7) + require.False(t, ok) +}