From baf5b0f0fccb0f597385f4a48986255d112901b8 Mon Sep 17 00:00:00 2001 From: Goutham Veeramachaneni Date: Tue, 13 Jun 2017 10:52:27 +0530 Subject: [PATCH] Fix error where we look into the future. (#2829) * Fix error where we look into the future. So currently we are adding values that are in the future for an older timestamp. For example, if we have [(1, 1), (150, 2)] we will end up showing [(1, 1), (2,2)]. Further it is not advisable to call .At() after Next() returns false. Signed-off-by: Goutham Veeramachaneni * Retuen early if done Signed-off-by: Goutham Veeramachaneni * Handle Seek() where we reach the end of iterator Signed-off-by: Goutham Veeramachaneni * Simplify code Signed-off-by: Goutham Veeramachaneni --- promql/engine_test.go | 12 +++++++++++- storage/buffer.go | 21 +++++++++++++++------ storage/buffer_test.go | 36 ++++++++++++++++++++++++++++-------- 3 files changed, 54 insertions(+), 15 deletions(-) diff --git a/promql/engine_test.go b/promql/engine_test.go index e6794f455b..ad3bb0513d 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -253,13 +253,23 @@ load 10s { Query: "metric", Result: Matrix{Series{ - Points: []Point{{V: 1, T: 0}, {V: 1, T: 1000}, {V: 2, T: 2000}}, + Points: []Point{{V: 1, T: 0}, {V: 1, T: 1000}, {V: 1, T: 2000}}, Metric: labels.FromStrings("__name__", "metric")}, }, Start: time.Unix(0, 0), End: time.Unix(2, 0), Interval: time.Second, }, + { + Query: "metric", + Result: Matrix{Series{ + Points: []Point{{V: 1, T: 0}, {V: 1, T: 5000}, {V: 2, T: 10000}}, + Metric: labels.FromStrings("__name__", "metric")}, + }, + Start: time.Unix(0, 0), + End: time.Unix(10, 0), + Interval: 5 * time.Second, + }, } for _, c := range cases { diff --git a/storage/buffer.go b/storage/buffer.go index d3687d94fd..0159cfa3fb 100644 --- a/storage/buffer.go +++ b/storage/buffer.go @@ -13,7 +13,9 @@ package storage -import "math" +import ( + "math" +) // BufferedSeriesIterator wraps an iterator with a look-back buffer. type BufferedSeriesIterator struct { @@ -21,6 +23,7 @@ type BufferedSeriesIterator struct { buf *sampleRing lastTime int64 + ok bool } // NewBuffer returns a new iterator that buffers the values within the time range @@ -30,6 +33,7 @@ func NewBuffer(it SeriesIterator, delta int64) *BufferedSeriesIterator { it: it, buf: newSampleRing(delta, 16), lastTime: math.MinInt64, + ok: true, } it.Next() @@ -56,8 +60,8 @@ func (b *BufferedSeriesIterator) Seek(t int64) bool { if t0 > b.lastTime { b.buf.reset() - ok := b.it.Seek(t0) - if !ok { + b.ok = b.it.Seek(t0) + if !b.ok { return false } b.lastTime, _ = b.Values() @@ -77,14 +81,19 @@ func (b *BufferedSeriesIterator) Seek(t int64) bool { // Next advances the iterator to the next element. func (b *BufferedSeriesIterator) Next() bool { + if !b.ok { + return false + } + // Add current element to buffer before advancing. b.buf.add(b.it.At()) - ok := b.it.Next() - if ok { + b.ok = b.it.Next() + if b.ok { b.lastTime, _ = b.Values() } - return ok + + return b.ok } // Values returns the current element of the iterator. diff --git a/storage/buffer_test.go b/storage/buffer_test.go index 6855a1daea..a03081ab6a 100644 --- a/storage/buffer_test.go +++ b/storage/buffer_test.go @@ -137,6 +137,26 @@ func TestBufferedSeriesIterator(t *testing.T) { require.False(t, it.Next(), "next succeeded unexpectedly") } +// At() should not be called once Next() returns false. +func TestBufferedSeriesIteratorNoBadAt(t *testing.T) { + done := false + + m := &mockSeriesIterator{ + seek: func(int64) bool { return false }, + at: func() (int64, float64) { + require.False(t, done) + done = true + return 0, 0 + }, + next: func() bool { return !done }, + err: func() error { return nil }, + } + + it := NewBuffer(m, 60) + it.Next() + it.Next() +} + func BenchmarkBufferedSeriesIterator(b *testing.B) { var ( samples []sample @@ -165,16 +185,16 @@ func BenchmarkBufferedSeriesIterator(b *testing.B) { } type mockSeriesIterator struct { - seek func(int64) bool - values func() (int64, float64) - next func() bool - err func() error + seek func(int64) bool + at func() (int64, float64) + next func() bool + err func() error } -func (m *mockSeriesIterator) Seek(t int64) bool { return m.seek(t) } -func (m *mockSeriesIterator) Values() (int64, float64) { return m.values() } -func (m *mockSeriesIterator) Next() bool { return m.next() } -func (m *mockSeriesIterator) Err() error { return m.err() } +func (m *mockSeriesIterator) Seek(t int64) bool { return m.seek(t) } +func (m *mockSeriesIterator) At() (int64, float64) { return m.at() } +func (m *mockSeriesIterator) Next() bool { return m.next() } +func (m *mockSeriesIterator) Err() error { return m.err() } type mockSeries struct { labels func() labels.Labels