diff --git a/querier.go b/querier.go index bb282d4c1..1759a9212 100644 --- a/querier.go +++ b/querier.go @@ -466,8 +466,8 @@ func (s *chunkSeries) Iterator() SeriesIterator { // SeriesIterator iterates over the data of a time series. type SeriesIterator interface { // Seek advances the iterator forward to the given timestamp. - // If there's no value exactly at ts, it advances to the last value - // before tt. + // If there's no value exactly at t, it advances to the first value + // after t. Seek(t int64) bool // At returns the current timestamp/value pair. At() (t int64, v float64) @@ -488,7 +488,7 @@ func (s *chainedSeries) Labels() labels.Labels { } func (s *chainedSeries) Iterator() SeriesIterator { - return &chainedSeriesIterator{series: s.series} + return newChainedSeriesIterator(s.series...) } // chainedSeriesIterator implements a series iterater over a list @@ -500,6 +500,14 @@ type chainedSeriesIterator struct { cur SeriesIterator } +func newChainedSeriesIterator(s ...Series) *chainedSeriesIterator { + return &chainedSeriesIterator{ + series: s, + i: 0, + cur: s[0].Iterator(), + } +} + func (it *chainedSeriesIterator) Seek(t int64) bool { // We just scan the chained series sequentially as they are already // pre-selected by relevant time and should be accessed sequentially anyway. @@ -516,9 +524,6 @@ func (it *chainedSeriesIterator) Seek(t int64) bool { } func (it *chainedSeriesIterator) Next() bool { - if it.cur == nil { - it.cur = it.series[it.i].Iterator() - } if it.cur.Next() { return true } @@ -569,10 +574,10 @@ func (it *chunkSeriesIterator) Seek(t int64) (ok bool) { // If the timestamp was not found, it might be in the last chunk. if x == len(it.chunks) { x-- - } - // Go to previous chunk if the chunk doesn't exactly start with t. - // If we are already at the first chunk, we use it as it's the best we have. - if x > 0 && it.chunks[x].MinTime > t { + + // Go to previous chunk if the chunk doesn't exactly start with t. + // If we are already at the first chunk, we use it as it's the best we have. + } else if x > 0 && it.chunks[x].MinTime > t { x-- } diff --git a/querier_test.go b/querier_test.go index 87e81c8be..248bfa45b 100644 --- a/querier_test.go +++ b/querier_test.go @@ -4,6 +4,7 @@ import ( "sort" "testing" + "github.com/prometheus/tsdb/chunks" "github.com/prometheus/tsdb/labels" "github.com/stretchr/testify/require" ) @@ -200,3 +201,404 @@ func expandSeriesIterator(it SeriesIterator) (r []sample, err error) { return r, it.Err() } + + +func TestBaseChunkSeries(t *testing.T) { + type refdSeries struct { + lset labels.Labels + chunks []*ChunkMeta + + ref uint32 + } + + cases := []struct { + series []refdSeries + // Postings should be in the sorted order of the the series + postings []uint32 + + expIdxs []int + }{ + { + series: []refdSeries{ + { + lset: labels.New([]labels.Label{{"a", "a"}}...), + chunks: []*ChunkMeta{ + {Ref: 29}, {Ref: 45}, {Ref: 245}, {Ref: 123}, {Ref: 4232}, {Ref: 5344}, + {Ref: 121}, + }, + ref: 12, + }, + { + lset: labels.New([]labels.Label{{"a", "a"}, {"b", "b"}}...), + chunks: []*ChunkMeta{ + {Ref: 82}, {Ref: 23}, {Ref: 234}, {Ref: 65}, {Ref: 26}, + }, + ref: 10, + }, + { + lset: labels.New([]labels.Label{{"b", "c"}}...), + chunks: []*ChunkMeta{{Ref: 8282}}, + ref: 1, + }, + { + lset: labels.New([]labels.Label{{"b", "b"}}...), + chunks: []*ChunkMeta{ + {Ref: 829}, {Ref: 239}, {Ref: 2349}, {Ref: 659}, {Ref: 269}, + }, + ref: 108, + }, + }, + postings: []uint32{12, 10, 108}, + + expIdxs: []int{0, 1, 3}, + }, + { + series: []refdSeries{ + { + lset: labels.New([]labels.Label{{"a", "a"}, {"b", "b"}}...), + chunks: []*ChunkMeta{ + {Ref: 82}, {Ref: 23}, {Ref: 234}, {Ref: 65}, {Ref: 26}, + }, + ref: 10, + }, + { + lset: labels.New([]labels.Label{{"b", "c"}}...), + chunks: []*ChunkMeta{{Ref: 8282}}, + ref: 1, + }, + }, + postings: []uint32{}, + + expIdxs: []int{}, + }, + } + + for _, tc := range cases { + mi := newMockIndex() + for _, s := range tc.series { + mi.AddSeries(s.ref, s.lset, s.chunks...) + } + + bcs := &baseChunkSeries{ + p: newListPostings(tc.postings), + index: mi, + } + + i := 0 + for bcs.Next() { + lset, chks := bcs.At() + + idx := tc.expIdxs[i] + + require.Equal(t, tc.series[idx].lset, lset) + require.Equal(t, tc.series[idx].chunks, chks) + + i++ + } + require.Equal(t, len(tc.expIdxs), i) + require.NoError(t, bcs.Err()) + } + + return +} + +// TODO: Remove after simpleSeries is merged +type itSeries struct { + si SeriesIterator +} + +func (s itSeries) Iterator() SeriesIterator { return s.si } +func (s itSeries) Labels() labels.Labels { return labels.Labels{} } + +func chunkFromSamples(s []sample) *ChunkMeta { + mint, maxt := int64(0), int64(0) + + if len(s) > 0 { + mint, maxt = s[0].t, s[len(s)-1].t + } + + c := chunks.NewXORChunk() + ca, _ := c.Appender() + + for _, s := range s { + ca.Append(s.t, s.v) + } + return &ChunkMeta{ + MinTime: mint, + MaxTime: maxt, + + Chunk: c, + } +} + +func TestSeriesIterator(t *testing.T) { + itcases := []struct { + a, b, c []sample + exp []sample + }{ + { + a: []sample{}, + b: []sample{}, + c: []sample{}, + + exp: []sample{}, + }, + { + a: []sample{ + {1, 2}, + {2, 3}, + {3, 5}, + {6, 1}, + }, + b: []sample{}, + c: []sample{ + {7, 89}, + {9, 8}, + }, + + exp: []sample{ + {1, 2}, + {2, 3}, + {3, 5}, + {6, 1}, + {7, 89}, + {9, 8}, + }, + }, + { + a: []sample{}, + b: []sample{ + {1, 2}, + {2, 3}, + {3, 5}, + {6, 1}, + }, + c: []sample{ + {7, 89}, + {9, 8}, + }, + + exp: []sample{ + {1, 2}, + {2, 3}, + {3, 5}, + {6, 1}, + {7, 89}, + {9, 8}, + }, + }, + { + a: []sample{ + {1, 2}, + {2, 3}, + {3, 5}, + {6, 1}, + }, + b: []sample{ + {7, 89}, + {9, 8}, + }, + c: []sample{ + {10, 22}, + {203, 3493}, + }, + + exp: []sample{ + {1, 2}, + {2, 3}, + {3, 5}, + {6, 1}, + {7, 89}, + {9, 8}, + {10, 22}, + {203, 3493}, + }, + }, + } + + seekcases := []struct { + a, b, c []sample + + seek int64 + success bool + exp []sample + }{ + { + a: []sample{}, + b: []sample{}, + c: []sample{}, + + seek: 0, + success: false, + exp: nil, + }, + { + a: []sample{ + {2, 3}, + }, + b: []sample{}, + c: []sample{ + {7, 89}, + {9, 8}, + }, + + seek: 10, + success: false, + exp: nil, + }, + { + a: []sample{}, + b: []sample{ + {1, 2}, + {3, 5}, + {6, 1}, + }, + c: []sample{ + {7, 89}, + {9, 8}, + }, + + seek: 2, + success: true, + exp: []sample{ + {3, 5}, + {6, 1}, + {7, 89}, + {9, 8}, + }, + }, + { + a: []sample{ + {6, 1}, + }, + b: []sample{ + {9, 8}, + }, + c: []sample{ + {10, 22}, + {203, 3493}, + }, + + seek: 10, + success: true, + exp: []sample{ + {10, 22}, + {203, 3493}, + }, + }, + { + a: []sample{ + {6, 1}, + }, + b: []sample{ + {9, 8}, + }, + c: []sample{ + {10, 22}, + {203, 3493}, + }, + + seek: 203, + success: true, + exp: []sample{ + {203, 3493}, + }, + }, + } + + t.Run("Chunk", func(t *testing.T) { + for _, tc := range itcases { + chkMetas := []*ChunkMeta{ + chunkFromSamples(tc.a), + chunkFromSamples(tc.b), + chunkFromSamples(tc.c), + } + res := newChunkSeriesIterator(chkMetas) + exp := newListSeriesIterator(tc.exp) + + smplExp, errExp := expandSeriesIterator(exp) + smplRes, errRes := expandSeriesIterator(res) + + require.Equal(t, errExp, errRes, "samples error") + require.Equal(t, smplExp, smplRes, "samples") + } + + t.Run("Seek", func(t *testing.T) { + for _, tc := range seekcases { + chkMetas := []*ChunkMeta{ + chunkFromSamples(tc.a), + chunkFromSamples(tc.b), + chunkFromSamples(tc.c), + } + res := newChunkSeriesIterator(chkMetas) + exp := newListSeriesIterator(tc.exp) + + require.Equal(t, tc.success, res.Seek(tc.seek)) + + if tc.success { + // Init the list and then proceed to check. + remaining := exp.Next() + require.True(t, remaining) + + for remaining { + sExp, eExp := exp.At() + sRes, eRes := res.At() + require.Equal(t, eExp, eRes, "samples error") + require.Equal(t, sExp, sRes, "samples") + + remaining = exp.Next() + require.Equal(t, remaining, res.Next()) + } + } + } + }) + }) + + t.Run("Chain", func(t *testing.T) { + for _, tc := range itcases { + a, b, c := itSeries{newListSeriesIterator(tc.a)}, + itSeries{newListSeriesIterator(tc.b)}, + itSeries{newListSeriesIterator(tc.c)} + + res := newChainedSeriesIterator(a, b, c) + exp := newListSeriesIterator(tc.exp) + + smplExp, errExp := expandSeriesIterator(exp) + smplRes, errRes := expandSeriesIterator(res) + + require.Equal(t, errExp, errRes, "samples error") + require.Equal(t, smplExp, smplRes, "samples") + } + + t.Run("Seek", func(t *testing.T) { + for _, tc := range seekcases { + a, b, c := itSeries{newListSeriesIterator(tc.a)}, + itSeries{newListSeriesIterator(tc.b)}, + itSeries{newListSeriesIterator(tc.c)} + + res := newChainedSeriesIterator(a, b, c) + exp := newListSeriesIterator(tc.exp) + + require.Equal(t, tc.success, res.Seek(tc.seek)) + + if tc.success { + // Init the list and then proceed to check. + remaining := exp.Next() + require.True(t, remaining) + + for remaining { + sExp, eExp := exp.At() + sRes, eRes := res.At() + require.Equal(t, eExp, eRes, "samples error") + require.Equal(t, sExp, sRes, "samples") + + remaining = exp.Next() + require.Equal(t, remaining, res.Next()) + } + } + } + }) + }) + + return +}