From 2841499212791d35330aa37b9e076e0762dfd85c Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Tue, 7 Mar 2017 11:29:20 +0100 Subject: [PATCH] Simplify and split up iterators This simplifies some of the iterators by loading chunks from the ChunkReader earlier, filtering of chunks vs filtering or series is split into separate iterators for easier testing --- querier.go | 178 +++++++++++++++++++++++++++++++++-------------------- rw_test.go | 46 +++++++++++++- 2 files changed, 155 insertions(+), 69 deletions(-) diff --git a/querier.go b/querier.go index 9b614e184..a397b9a63 100644 --- a/querier.go +++ b/querier.go @@ -76,6 +76,9 @@ func (s *DB) Querier(mint, maxt int64) Querier { } func (q *querier) LabelValues(n string) ([]string, error) { + if len(q.blocks) == 0 { + return nil, nil + } res, err := q.blocks[0].LabelValues(n) if err != nil { return nil, err @@ -163,12 +166,16 @@ func (q *blockQuerier) Select(ms ...labels.Matcher) SeriesSet { } return &blockSeriesSet{ - index: q.index, - chunks: q.chunks, - it: p, - absent: absent, - mint: q.mint, - maxt: q.maxt, + set: &populatedChunkSeries{ + set: &baseChunkSeries{ + p: p, + index: q.index, + absent: absent, + }, + chunks: q.chunks, + mint: q.mint, + maxt: q.maxt, + }, } } @@ -361,23 +368,31 @@ func (s *partitionSeriesSet) Next() bool { return true } -// blockSeriesSet is a set of series from an inverted index query. -type blockSeriesSet struct { - index IndexReader - chunks ChunkReader - it Postings // postings list referencing series - absent []string // labels that must not be set for result series - mint, maxt int64 // considered time range - - err error - cur Series +type chunkSeriesSet interface { + Next() bool + At() (labels.Labels, []ChunkMeta) + Err() error } -func (s *blockSeriesSet) Next() bool { - // Step through the postings iterator to find potential series. -outer: - for s.it.Next() { - lset, chunks, err := s.index.Series(s.it.At()) +// baseChunkSeries loads the label set and chunk references for a postings +// list from an index. It filters out series that have labels set that should be unset. +type baseChunkSeries struct { + p Postings + index IndexReader + absent []string // labels that must be unset in results. + + lset labels.Labels + chks []ChunkMeta + err error +} + +func (s *baseChunkSeries) At() (labels.Labels, []ChunkMeta) { return s.lset, s.chks } +func (s *baseChunkSeries) Err() error { return s.err } + +func (s *baseChunkSeries) Next() bool { +Outer: + for s.p.Next() { + lset, chunks, err := s.index.Series(s.p.At()) if err != nil { s.err = err return false @@ -386,35 +401,87 @@ outer: // If a series contains a label that must be absent, it is skipped as well. for _, abs := range s.absent { if lset.Get(abs) != "" { - continue outer + continue Outer } } - ser := &chunkSeries{ - labels: lset, - chunks: make([]ChunkMeta, 0, len(chunks)), - chunk: s.chunks.Chunk, - } - // Only use chunks that fit the time range. - for _, c := range chunks { + s.lset = lset + s.chks = chunks + + return true + } + if err := s.p.Err(); err != nil { + s.err = err + } + return false +} + +// populatedChunkSeries loads chunk data from a store for a set of series +// with known chunk references. It filters out chunks that do not fit the +// given time range. +type populatedChunkSeries struct { + set chunkSeriesSet + chunks ChunkReader + mint, maxt int64 + + err error + chks []ChunkMeta + lset labels.Labels +} + +func (s *populatedChunkSeries) At() (labels.Labels, []ChunkMeta) { return s.lset, s.chks } +func (s *populatedChunkSeries) Err() error { return s.err } + +func (s *populatedChunkSeries) Next() bool { + for s.set.Next() { + lset, chks := s.set.At() + + for i := range chks { + c := &chks[i] + if c.MaxTime < s.mint { + chks = chks[1:] continue } if c.MinTime > s.maxt { + chks = chks[:i] break } - ser.chunks = append(ser.chunks, c) + c.Chunk, s.err = s.chunks.Chunk(c.Ref) + if s.err != nil { + return false + } } - // If no chunks of the series apply to the time range, skip it. - if len(ser.chunks) == 0 { + if len(chks) == 0 { continue } - s.cur = ser + s.lset = lset + s.chks = chks + return true } - if s.it.Err() != nil { - s.err = s.it.Err() + if err := s.set.Err(); err != nil { + s.err = err + } + return false +} + +// blockSeriesSet is a set of series from an inverted index query. +type blockSeriesSet struct { + set chunkSeriesSet + err error + cur Series +} + +func (s *blockSeriesSet) Next() bool { + for s.set.Next() { + lset, chunks := s.set.At() + s.cur = &chunkSeries{labels: lset, chunks: chunks} + return true + } + if s.set.Err() != nil { + s.err = s.set.Err() } return false } @@ -427,10 +494,6 @@ func (s *blockSeriesSet) Err() error { return s.err } type chunkSeries struct { labels labels.Labels chunks []ChunkMeta // in-order chunk refs - - // chunk is a function that retrieves chunks based on a reference - // number contained in the chunk meta information. - chunk func(ref uint64) (chunks.Chunk, error) } func (s *chunkSeries) Labels() labels.Labels { @@ -438,21 +501,7 @@ func (s *chunkSeries) Labels() labels.Labels { } func (s *chunkSeries) Iterator() SeriesIterator { - var cs []chunks.Chunk - var mints []int64 - - for _, co := range s.chunks { - c, err := s.chunk(co.Ref) - if err != nil { - panic(err) // TODO(fabxc): add error series iterator. - } - cs = append(cs, c) - mints = append(mints, co.MinTime) - } - - // TODO(fabxc): consider pushing chunk retrieval further down. In practice, we - // probably have to touch all chunks anyway and it doesn't matter. - return newChunkSeriesIterator(mints, cs) + return newChunkSeriesIterator(s.chunks) } // SeriesIterator iterates over the data of a time series. @@ -538,43 +587,38 @@ func (it *chainedSeriesIterator) Err() error { // chunkSeriesIterator implements a series iterator on top // of a list of time-sorted, non-overlapping chunks. type chunkSeriesIterator struct { - mints []int64 // minimum timestamps for each iterator - chunks []chunks.Chunk + chunks []ChunkMeta i int cur chunks.Iterator } -func newChunkSeriesIterator(mints []int64, cs []chunks.Chunk) *chunkSeriesIterator { - if len(mints) != len(cs) { - panic("chunk references and chunks length don't match") - } +func newChunkSeriesIterator(cs []ChunkMeta) *chunkSeriesIterator { return &chunkSeriesIterator{ - mints: mints, chunks: cs, i: 0, - cur: cs[0].Iterator(), + cur: cs[0].Chunk.Iterator(), } } func (it *chunkSeriesIterator) Seek(t int64) (ok bool) { // Only do binary search forward to stay in line with other iterators // that can only move forward. - x := sort.Search(len(it.mints[it.i:]), func(i int) bool { return it.mints[i] >= t }) + x := sort.Search(len(it.chunks[it.i:]), func(i int) bool { return it.chunks[i].MinTime >= t }) x += it.i // If the timestamp was not found, it might be in the last chunk. - if x == len(it.mints) { + if x == len(it.chunks) { x-- } // Go to previous chunk if the chunk doesn't exactly start with t. // If we are already at the first chunk, we use it as it's the best we have. - if x > 0 && it.mints[x] > t { + if x > 0 && it.chunks[x].MinTime > t { x-- } it.i = x - it.cur = it.chunks[x].Iterator() + it.cur = it.chunks[x].Chunk.Iterator() for it.cur.Next() { t0, _ := it.cur.At() @@ -601,7 +645,7 @@ func (it *chunkSeriesIterator) Next() bool { } it.i++ - it.cur = it.chunks[it.i].Iterator() + it.cur = it.chunks[it.i].Chunk.Iterator() return it.Next() } diff --git a/rw_test.go b/rw_test.go index 9ab03306f..4f6dad169 100644 --- a/rw_test.go +++ b/rw_test.go @@ -4,13 +4,55 @@ import ( "io/ioutil" "math/rand" "os" + "sort" "testing" - "sort" - + "github.com/fabxc/tsdb/chunks" + "github.com/fabxc/tsdb/labels" "github.com/stretchr/testify/require" ) +type mockIndexReader struct { + labelValues func(...string) (StringTuples, error) + postings func(string, string) (Postings, error) + series func(uint32) (labels.Labels, []ChunkMeta, error) + labelIndices func() ([][]string, error) + close func() error +} + +func (ir *mockIndexReader) LabelValues(names ...string) (StringTuples, error) { + return ir.labelValues(names...) +} + +func (ir *mockIndexReader) Postings(name, value string) (Postings, error) { + return ir.postings(name, value) +} + +func (ir *mockIndexReader) Series(ref uint32) (labels.Labels, []ChunkMeta, error) { + return ir.series(ref) +} + +func (ir *mockIndexReader) LabelIndices() ([][]string, error) { + return ir.labelIndices() +} + +func (ir *mockIndexReader) Close() error { + return ir.close() +} + +type mockChunkReader struct { + chunk func(ref uint64) (chunks.Chunk, error) + close func() error +} + +func (cr *mockChunkReader) Chunk(ref uint64) (chunks.Chunk, error) { + return cr.chunk(ref) +} + +func (cr *mockChunkReader) Close() error { + return cr.close() +} + func TestPersistence_index_e2e(t *testing.T) { dir, err := ioutil.TempDir("", "test_persistence_e2e") require.NoError(t, err)