diff --git a/index.go b/index.go index 11b3b3711..e16ab3c25 100644 --- a/index.go +++ b/index.go @@ -65,7 +65,7 @@ type memPostings struct { // Postings returns an iterator over the postings list for s. func (p *memPostings) get(t term) Postings { - return &listIterator{list: p.m[t], idx: -1} + return &listPostings{list: p.m[t], idx: -1} } // add adds a document to the index. The caller has to ensure that no @@ -169,22 +169,22 @@ func (it *mergePostings) Err() error { return nil } -// listIterator implements the Iterator interface over a plain list. -type listIterator struct { +// listPostings implements the Postings interface over a plain list. +type listPostings struct { list []uint32 idx int } -func (it *listIterator) Value() uint32 { +func (it *listPostings) Value() uint32 { return it.list[it.idx] } -func (it *listIterator) Next() bool { +func (it *listPostings) Next() bool { it.idx++ return it.idx < len(it.list) } -func (it *listIterator) Seek(x uint32) bool { +func (it *listPostings) Seek(x uint32) bool { // Do binary search between current position and end. it.idx = sort.Search(len(it.list)-it.idx, func(i int) bool { return it.list[i+it.idx] >= x @@ -192,7 +192,7 @@ func (it *listIterator) Seek(x uint32) bool { return it.idx < len(it.list) } -func (it *listIterator) Err() error { +func (it *listPostings) Err() error { return nil } diff --git a/querier.go b/querier.go index 33141c32a..b9ee62a81 100644 --- a/querier.go +++ b/querier.go @@ -491,6 +491,9 @@ func (it *chainedSeriesIterator) Seek(t int64) bool { } func (it *chainedSeriesIterator) Next() bool { + if it.cur == nil { + it.cur = it.series[it.i].Iterator() + } if it.cur.Next() { return true } diff --git a/querier_test.go b/querier_test.go index 0da4d0608..8e1a6f736 100644 --- a/querier_test.go +++ b/querier_test.go @@ -2,11 +2,185 @@ package tsdb import ( "math/rand" + "sort" "testing" "github.com/stretchr/testify/require" ) +type mockSeriesIterator struct { + seek func(int64) bool + values func() (int64, float64) + next func() bool + err func() error +} + +func (m *mockSeriesIterator) Seek(t int64) bool { return m.seek(t) } +func (m *mockSeriesIterator) Values() (int64, float64) { return m.values() } +func (m *mockSeriesIterator) Next() bool { return m.next() } +func (m *mockSeriesIterator) Err() error { return m.err() } + +type mockSeries struct { + labels func() Labels + iterator func() SeriesIterator +} + +func (m *mockSeries) Labels() Labels { return m.labels() } +func (m *mockSeries) Iterator() SeriesIterator { return m.iterator() } + +type listSeriesIterator struct { + list []sample + idx int +} + +func newListSeriesIterator(list []sample) *listSeriesIterator { + return &listSeriesIterator{list: list, idx: -1} +} + +func (it *listSeriesIterator) Values() (int64, float64) { + s := it.list[it.idx] + return s.t, s.v +} + +func (it *listSeriesIterator) Next() bool { + it.idx++ + return it.idx < len(it.list) +} + +func (it *listSeriesIterator) Seek(t int64) bool { + // Do binary search between current position and end. + it.idx = sort.Search(len(it.list)-it.idx, func(i int) bool { + s := it.list[i+it.idx] + return s.t >= t + }) + return it.idx < len(it.list) +} + +func (it *listSeriesIterator) Err() error { + return nil +} + +// func TestChainedSeriesIterator(t *testing.T) { + +// cases := []struct { +// series []Series +// }{} + +// for _, c := range cases { + +// } +// } + +type mockSeriesSet struct { + next func() bool + series func() Series + err func() error +} + +func (m *mockSeriesSet) Next() bool { return m.next() } +func (m *mockSeriesSet) Series() Series { return m.series() } +func (m *mockSeriesSet) Err() error { return m.err() } + +func newListSeriesSet(list []Series) *mockSeriesSet { + i := -1 + return &mockSeriesSet{ + next: func() bool { + i++ + return i < len(list) + }, + series: func() Series { + return list[i] + }, + err: func() error { return nil }, + } +} + +func TestShardSeriesSet(t *testing.T) { + newSeries := func(l map[string]string, s []sample) Series { + return &mockSeries{ + labels: func() Labels { return LabelsFromMap(l) }, + iterator: func() SeriesIterator { return newListSeriesIterator(s) }, + } + } + + cases := []struct { + // The input sets in order (samples in series in b are strictly + // after those in a). + a, b SeriesSet + // The composition of a and b in the shard series set must yield + // results equivalent to the result series set. + exp SeriesSet + }{ + { + a: newListSeriesSet([]Series{ + newSeries(map[string]string{ + "a": "a", + }, []sample{ + {t: 1, v: 1}, + }), + }), + b: newListSeriesSet([]Series{ + newSeries(map[string]string{ + "a": "a", + }, []sample{ + {t: 2, v: 2}, + }), + newSeries(map[string]string{ + "b": "b", + }, []sample{ + {t: 1, v: 1}, + }), + }), + exp: newListSeriesSet([]Series{ + newSeries(map[string]string{ + "a": "a", + }, []sample{ + {t: 1, v: 1}, + {t: 2, v: 2}, + }), + newSeries(map[string]string{ + "b": "b", + }, []sample{ + {t: 1, v: 1}, + }), + }), + }, + } + +Outer: + for _, c := range cases { + res := newShardSeriesSet(c.a, c.b) + + for { + eok, rok := c.exp.Next(), res.Next() + require.Equal(t, eok, rok, "next") + + if !eok { + continue Outer + } + sexp := c.exp.Series() + sres := res.Series() + + require.Equal(t, sexp.Labels(), sres.Labels(), "labels") + + smplExp, errExp := expandSeriesIterator(sexp.Iterator()) + smplRes, errRes := expandSeriesIterator(sres.Iterator()) + + require.Equal(t, errExp, errRes, "samples error") + require.Equal(t, smplExp, smplRes, "samples") + } + } +} + +func expandSeriesIterator(it SeriesIterator) (r []sample, err error) { + for it.Next() { + t, v := it.Values() + r = append(r, sample{t: t, v: v}) + } + + return r, it.Err() +} + func TestCompareLabels(t *testing.T) { cases := []struct { a, b []Label diff --git a/reader.go b/reader.go index ff5beb8a1..4ef550f42 100644 --- a/reader.go +++ b/reader.go @@ -364,7 +364,7 @@ func (r *indexReader) Postings(name, value string) (Postings, error) { b = b[4:] } - return &listIterator{list: l, idx: -1}, nil + return &listPostings{list: l, idx: -1}, nil } type stringTuples struct {