Simplify and split up iterators

This simplifies some of the iterators by loading chunks from the
ChunkReader earlier, filtering of chunks vs filtering or series is
split into separate iterators for easier testing
This commit is contained in:
Fabian Reinartz 2017-03-07 11:29:20 +01:00
parent ed63636de4
commit 2841499212
2 changed files with 155 additions and 69 deletions

View File

@ -76,6 +76,9 @@ func (s *DB) Querier(mint, maxt int64) Querier {
} }
func (q *querier) LabelValues(n string) ([]string, error) { func (q *querier) LabelValues(n string) ([]string, error) {
if len(q.blocks) == 0 {
return nil, nil
}
res, err := q.blocks[0].LabelValues(n) res, err := q.blocks[0].LabelValues(n)
if err != nil { if err != nil {
return nil, err return nil, err
@ -163,12 +166,16 @@ func (q *blockQuerier) Select(ms ...labels.Matcher) SeriesSet {
} }
return &blockSeriesSet{ return &blockSeriesSet{
index: q.index, set: &populatedChunkSeries{
chunks: q.chunks, set: &baseChunkSeries{
it: p, p: p,
absent: absent, index: q.index,
mint: q.mint, absent: absent,
maxt: q.maxt, },
chunks: q.chunks,
mint: q.mint,
maxt: q.maxt,
},
} }
} }
@ -361,23 +368,31 @@ func (s *partitionSeriesSet) Next() bool {
return true return true
} }
// blockSeriesSet is a set of series from an inverted index query. type chunkSeriesSet interface {
type blockSeriesSet struct { Next() bool
index IndexReader At() (labels.Labels, []ChunkMeta)
chunks ChunkReader Err() error
it Postings // postings list referencing series
absent []string // labels that must not be set for result series
mint, maxt int64 // considered time range
err error
cur Series
} }
func (s *blockSeriesSet) Next() bool { // baseChunkSeries loads the label set and chunk references for a postings
// Step through the postings iterator to find potential series. // list from an index. It filters out series that have labels set that should be unset.
outer: type baseChunkSeries struct {
for s.it.Next() { p Postings
lset, chunks, err := s.index.Series(s.it.At()) index IndexReader
absent []string // labels that must be unset in results.
lset labels.Labels
chks []ChunkMeta
err error
}
func (s *baseChunkSeries) At() (labels.Labels, []ChunkMeta) { return s.lset, s.chks }
func (s *baseChunkSeries) Err() error { return s.err }
func (s *baseChunkSeries) Next() bool {
Outer:
for s.p.Next() {
lset, chunks, err := s.index.Series(s.p.At())
if err != nil { if err != nil {
s.err = err s.err = err
return false return false
@ -386,35 +401,87 @@ outer:
// If a series contains a label that must be absent, it is skipped as well. // If a series contains a label that must be absent, it is skipped as well.
for _, abs := range s.absent { for _, abs := range s.absent {
if lset.Get(abs) != "" { if lset.Get(abs) != "" {
continue outer continue Outer
} }
} }
ser := &chunkSeries{ s.lset = lset
labels: lset, s.chks = chunks
chunks: make([]ChunkMeta, 0, len(chunks)),
chunk: s.chunks.Chunk, return true
} }
// Only use chunks that fit the time range. if err := s.p.Err(); err != nil {
for _, c := range chunks { s.err = err
}
return false
}
// populatedChunkSeries loads chunk data from a store for a set of series
// with known chunk references. It filters out chunks that do not fit the
// given time range.
type populatedChunkSeries struct {
set chunkSeriesSet
chunks ChunkReader
mint, maxt int64
err error
chks []ChunkMeta
lset labels.Labels
}
func (s *populatedChunkSeries) At() (labels.Labels, []ChunkMeta) { return s.lset, s.chks }
func (s *populatedChunkSeries) Err() error { return s.err }
func (s *populatedChunkSeries) Next() bool {
for s.set.Next() {
lset, chks := s.set.At()
for i := range chks {
c := &chks[i]
if c.MaxTime < s.mint { if c.MaxTime < s.mint {
chks = chks[1:]
continue continue
} }
if c.MinTime > s.maxt { if c.MinTime > s.maxt {
chks = chks[:i]
break break
} }
ser.chunks = append(ser.chunks, c) c.Chunk, s.err = s.chunks.Chunk(c.Ref)
if s.err != nil {
return false
}
} }
// If no chunks of the series apply to the time range, skip it. if len(chks) == 0 {
if len(ser.chunks) == 0 {
continue continue
} }
s.cur = ser s.lset = lset
s.chks = chks
return true return true
} }
if s.it.Err() != nil { if err := s.set.Err(); err != nil {
s.err = s.it.Err() s.err = err
}
return false
}
// blockSeriesSet is a set of series from an inverted index query.
type blockSeriesSet struct {
set chunkSeriesSet
err error
cur Series
}
func (s *blockSeriesSet) Next() bool {
for s.set.Next() {
lset, chunks := s.set.At()
s.cur = &chunkSeries{labels: lset, chunks: chunks}
return true
}
if s.set.Err() != nil {
s.err = s.set.Err()
} }
return false return false
} }
@ -427,10 +494,6 @@ func (s *blockSeriesSet) Err() error { return s.err }
type chunkSeries struct { type chunkSeries struct {
labels labels.Labels labels labels.Labels
chunks []ChunkMeta // in-order chunk refs chunks []ChunkMeta // in-order chunk refs
// chunk is a function that retrieves chunks based on a reference
// number contained in the chunk meta information.
chunk func(ref uint64) (chunks.Chunk, error)
} }
func (s *chunkSeries) Labels() labels.Labels { func (s *chunkSeries) Labels() labels.Labels {
@ -438,21 +501,7 @@ func (s *chunkSeries) Labels() labels.Labels {
} }
func (s *chunkSeries) Iterator() SeriesIterator { func (s *chunkSeries) Iterator() SeriesIterator {
var cs []chunks.Chunk return newChunkSeriesIterator(s.chunks)
var mints []int64
for _, co := range s.chunks {
c, err := s.chunk(co.Ref)
if err != nil {
panic(err) // TODO(fabxc): add error series iterator.
}
cs = append(cs, c)
mints = append(mints, co.MinTime)
}
// TODO(fabxc): consider pushing chunk retrieval further down. In practice, we
// probably have to touch all chunks anyway and it doesn't matter.
return newChunkSeriesIterator(mints, cs)
} }
// SeriesIterator iterates over the data of a time series. // SeriesIterator iterates over the data of a time series.
@ -538,43 +587,38 @@ func (it *chainedSeriesIterator) Err() error {
// chunkSeriesIterator implements a series iterator on top // chunkSeriesIterator implements a series iterator on top
// of a list of time-sorted, non-overlapping chunks. // of a list of time-sorted, non-overlapping chunks.
type chunkSeriesIterator struct { type chunkSeriesIterator struct {
mints []int64 // minimum timestamps for each iterator chunks []ChunkMeta
chunks []chunks.Chunk
i int i int
cur chunks.Iterator cur chunks.Iterator
} }
func newChunkSeriesIterator(mints []int64, cs []chunks.Chunk) *chunkSeriesIterator { func newChunkSeriesIterator(cs []ChunkMeta) *chunkSeriesIterator {
if len(mints) != len(cs) {
panic("chunk references and chunks length don't match")
}
return &chunkSeriesIterator{ return &chunkSeriesIterator{
mints: mints,
chunks: cs, chunks: cs,
i: 0, i: 0,
cur: cs[0].Iterator(), cur: cs[0].Chunk.Iterator(),
} }
} }
func (it *chunkSeriesIterator) Seek(t int64) (ok bool) { func (it *chunkSeriesIterator) Seek(t int64) (ok bool) {
// Only do binary search forward to stay in line with other iterators // Only do binary search forward to stay in line with other iterators
// that can only move forward. // that can only move forward.
x := sort.Search(len(it.mints[it.i:]), func(i int) bool { return it.mints[i] >= t }) x := sort.Search(len(it.chunks[it.i:]), func(i int) bool { return it.chunks[i].MinTime >= t })
x += it.i x += it.i
// If the timestamp was not found, it might be in the last chunk. // If the timestamp was not found, it might be in the last chunk.
if x == len(it.mints) { if x == len(it.chunks) {
x-- x--
} }
// Go to previous chunk if the chunk doesn't exactly start with t. // Go to previous chunk if the chunk doesn't exactly start with t.
// If we are already at the first chunk, we use it as it's the best we have. // If we are already at the first chunk, we use it as it's the best we have.
if x > 0 && it.mints[x] > t { if x > 0 && it.chunks[x].MinTime > t {
x-- x--
} }
it.i = x it.i = x
it.cur = it.chunks[x].Iterator() it.cur = it.chunks[x].Chunk.Iterator()
for it.cur.Next() { for it.cur.Next() {
t0, _ := it.cur.At() t0, _ := it.cur.At()
@ -601,7 +645,7 @@ func (it *chunkSeriesIterator) Next() bool {
} }
it.i++ it.i++
it.cur = it.chunks[it.i].Iterator() it.cur = it.chunks[it.i].Chunk.Iterator()
return it.Next() return it.Next()
} }

View File

@ -4,13 +4,55 @@ import (
"io/ioutil" "io/ioutil"
"math/rand" "math/rand"
"os" "os"
"sort"
"testing" "testing"
"sort" "github.com/fabxc/tsdb/chunks"
"github.com/fabxc/tsdb/labels"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
type mockIndexReader struct {
labelValues func(...string) (StringTuples, error)
postings func(string, string) (Postings, error)
series func(uint32) (labels.Labels, []ChunkMeta, error)
labelIndices func() ([][]string, error)
close func() error
}
func (ir *mockIndexReader) LabelValues(names ...string) (StringTuples, error) {
return ir.labelValues(names...)
}
func (ir *mockIndexReader) Postings(name, value string) (Postings, error) {
return ir.postings(name, value)
}
func (ir *mockIndexReader) Series(ref uint32) (labels.Labels, []ChunkMeta, error) {
return ir.series(ref)
}
func (ir *mockIndexReader) LabelIndices() ([][]string, error) {
return ir.labelIndices()
}
func (ir *mockIndexReader) Close() error {
return ir.close()
}
type mockChunkReader struct {
chunk func(ref uint64) (chunks.Chunk, error)
close func() error
}
func (cr *mockChunkReader) Chunk(ref uint64) (chunks.Chunk, error) {
return cr.chunk(ref)
}
func (cr *mockChunkReader) Close() error {
return cr.close()
}
func TestPersistence_index_e2e(t *testing.T) { func TestPersistence_index_e2e(t *testing.T) {
dir, err := ioutil.TempDir("", "test_persistence_e2e") dir, err := ioutil.TempDir("", "test_persistence_e2e")
require.NoError(t, err) require.NoError(t, err)