From d6fb6aaaa8838dd444b60571e540df3f1089d6c8 Mon Sep 17 00:00:00 2001
From: Fabian Reinartz
Date: Tue, 14 Mar 2017 15:24:08 +0100
Subject: [PATCH] Rename partitionSeriesSet to mergedSeriesSet

---
 index_test.go   | 158 ++++++++++++++++++++++++------------------------
 querier.go      |  39 +++---------
 querier_test.go |   4 +-
 3 files changed, 89 insertions(+), 112 deletions(-)

diff --git a/index_test.go b/index_test.go
index 48026b5c8..a9659c95c 100644
--- a/index_test.go
+++ b/index_test.go
@@ -2,8 +2,10 @@ package tsdb
 
 import (
 	"io/ioutil"
+	"math/rand"
 	"os"
 	"path/filepath"
+	"sort"
 	"testing"
 
 	"github.com/fabxc/tsdb/labels"
@@ -107,103 +109,103 @@ func TestIndexRW_Postings(t *testing.T) {
 	require.NoError(t, ir.Close())
 }
 
-// func TestPersistence_index_e2e(t *testing.T) {
-// 	dir, err := ioutil.TempDir("", "test_persistence_e2e")
-// 	require.NoError(t, err)
-// 	defer os.RemoveAll(dir)
+func TestPersistence_index_e2e(t *testing.T) {
+	dir, err := ioutil.TempDir("", "test_persistence_e2e")
+	require.NoError(t, err)
+	defer os.RemoveAll(dir)
 
-// 	lbls, err := readPrometheusLabels("testdata/20k.series", 20000)
-// 	require.NoError(t, err)
+	lbls, err := readPrometheusLabels("testdata/20k.series", 20000)
+	require.NoError(t, err)
 
-// 	var input indexWriterSeriesSlice
+	var input indexWriterSeriesSlice
 
-// 	// Generate ChunkMetas for every label set.
-// 	for i, lset := range lbls {
-// 		var metas []ChunkMeta
+	// Generate ChunkMetas for every label set.
+	for i, lset := range lbls {
+		var metas []ChunkMeta
 
-// 		for j := 0; j <= (i % 20); j++ {
-// 			metas = append(metas, ChunkMeta{
-// 				MinTime: int64(j * 10000),
-// 				MaxTime: int64((j + 1) * 10000),
-// 				Ref:     rand.Uint64(),
-// 			})
-// 		}
-// 		input = append(input, &indexWriterSeries{
-// 			labels: lset,
-// 			chunks: metas,
-// 		})
-// 	}
+		for j := 0; j <= (i % 20); j++ {
+			metas = append(metas, ChunkMeta{
+				MinTime: int64(j * 10000),
+				MaxTime: int64((j + 1) * 10000),
+				Ref:     rand.Uint64(),
+			})
+		}
+		input = append(input, &indexWriterSeries{
+			labels: lset,
+			chunks: metas,
+		})
+	}
 
-// 	iw, err := newIndexWriter(dir)
-// 	require.NoError(t, err)
+	iw, err := newIndexWriter(dir)
+	require.NoError(t, err)
 
-// 	// Population procedure as done by compaction.
-// 	var (
-// 		postings = &memPostings{m: make(map[term][]uint32, 512)}
-// 		values   = map[string]stringset{}
-// 	)
+	// Population procedure as done by compaction.
+	var (
+		postings = &memPostings{m: make(map[term][]uint32, 512)}
+		values   = map[string]stringset{}
+	)
 
-// 	for i, s := range input {
-// 		iw.AddSeries(uint32(i), s.labels, s.chunks...)
+	for i, s := range input {
+		iw.AddSeries(uint32(i), s.labels, s.chunks...)
 
-// 		for _, l := range s.labels {
-// 			valset, ok := values[l.Name]
-// 			if !ok {
-// 				valset = stringset{}
-// 				values[l.Name] = valset
-// 			}
-// 			valset.set(l.Value)
+		for _, l := range s.labels {
+			valset, ok := values[l.Name]
+			if !ok {
+				valset = stringset{}
+				values[l.Name] = valset
+			}
+			valset.set(l.Value)
 
-// 			postings.add(uint32(i), term{name: l.Name, value: l.Value})
-// 		}
-// 		i++
-// 	}
-// 	all := make([]uint32, len(lbls))
-// 	for i := range all {
-// 		all[i] = uint32(i)
-// 	}
-// 	err = iw.WritePostings("", "", newListPostings(all))
-// 	require.NoError(t, err)
+			postings.add(uint32(i), term{name: l.Name, value: l.Value})
+		}
+		i++
+	}
+	all := make([]uint32, len(lbls))
+	for i := range all {
+		all[i] = uint32(i)
+	}
+	err = iw.WritePostings("", "", newListPostings(all))
+	require.NoError(t, err)
 
-// 	err = iw.Close()
-// 	require.NoError(t, err)
+	err = iw.Close()
+	require.NoError(t, err)
 
-// 	ir, err := newIndexReader(dir)
-// 	require.NoError(t, err)
+	ir, err := newIndexReader(dir)
+	require.NoError(t, err)
 
-// 	allp, err := ir.Postings("", "")
-// 	require.NoError(t, err)
+	allp, err := ir.Postings("", "")
+	require.NoError(t, err)
 
-// 	var result indexWriterSeriesSlice
+	var result indexWriterSeriesSlice
 
-// 	for allp.Next() {
-// 		ref := allp.At()
+	for allp.Next() {
+		ref := allp.At()
 
-// 		lset, chks, err := ir.Series(ref)
-// 		require.NoError(t, err)
+		lset, chks, err := ir.Series(ref)
+		require.NoError(t, err)
 
-// 		result = append(result, &indexWriterSeries{
-// 			offset: ref,
-// 			labels: lset,
-// 			chunks: chks,
-// 		})
-// 	}
-// 	require.NoError(t, allp.Err())
+		result = append(result, &indexWriterSeries{
+			offset: ref,
+			labels: lset,
+			chunks: chks,
+		})
+	}
+	require.NoError(t, allp.Err())
 
-// 	// Persisted data must be sorted.
-// 	sort.IsSorted(result)
+	// Persisted data must be sorted.
+	require.True(t, sort.IsSorted(result))
 
-// 	// Validate result contents.
-// 	sort.Sort(input)
-// 	require.Equal(t, len(input), len(result))
+	// Validate result contents.
+	sort.Sort(input)
+	require.Equal(t, len(input), len(result))
 
-// 	for i, re := range result {
-// 		exp := input[i]
+	for i, re := range result {
+		exp := input[i]
 
-// 		require.Equal(t, exp.labels, re.labels)
-// 		require.Equal(t, exp.chunks, re.chunks)
-// 	}
+		require.Equal(t, exp.labels, re.labels)
+		require.Equal(t, exp.chunks, re.chunks)
+	}
 
-// 	require.NoError(t, ir.Close())
+	require.NoError(t, ir.Close())
 
-// }
+}
diff --git a/querier.go b/querier.go
index a397b9a63..31b5d9f1e 100644
--- a/querier.go
+++ b/querier.go
@@ -109,7 +109,7 @@ func (q *querier) Select(ms ...labels.Matcher) SeriesSet {
 
 	r := q.blocks[0].Select(ms...)
 	for _, s := range q.blocks[1:] {
-		r = newPartitionSeriesSet(r, s.Select(ms...))
+		r = newMergedSeriesSet(r, s.Select(ms...))
 	}
 	return r
 }
@@ -282,39 +282,14 @@ func (nopSeriesSet) At() Series { return nil }
 func (nopSeriesSet) Err() error { return nil }
 
 type mergedSeriesSet struct {
-	sets []SeriesSet
-
-	cur int
-	err error
-}
-
-func (s *mergedSeriesSet) At() Series { return s.sets[s.cur].At() }
-func (s *mergedSeriesSet) Err() error { return s.sets[s.cur].Err() }
-
-func (s *mergedSeriesSet) Next() bool {
-	// TODO(fabxc): We just emit the sets one after one. They are each
-	// lexicographically sorted. Should we emit their union sorted too?
-	if s.sets[s.cur].Next() {
-		return true
-	}
-
-	if s.cur == len(s.sets)-1 {
-		return false
-	}
-	s.cur++
-
-	return s.Next()
-}
-
-type partitionSeriesSet struct {
 	a, b SeriesSet
 
 	cur          Series
 	adone, bdone bool
 }
 
-func newPartitionSeriesSet(a, b SeriesSet) *partitionSeriesSet {
-	s := &partitionSeriesSet{a: a, b: b}
+func newMergedSeriesSet(a, b SeriesSet) *mergedSeriesSet {
+	s := &mergedSeriesSet{a: a, b: b}
 	// Initialize first elements of both sets as Next() needs
 	// one element look-ahead.
 	s.adone = !s.a.Next()
@@ -323,18 +298,18 @@ func newPartitionSeriesSet(a, b SeriesSet) *partitionSeriesSet {
 	return s
 }
 
-func (s *partitionSeriesSet) At() Series {
+func (s *mergedSeriesSet) At() Series {
 	return s.cur
 }
 
-func (s *partitionSeriesSet) Err() error {
+func (s *mergedSeriesSet) Err() error {
 	if s.a.Err() != nil {
 		return s.a.Err()
 	}
 	return s.b.Err()
 }
 
-func (s *partitionSeriesSet) compare() int {
+func (s *mergedSeriesSet) compare() int {
 	if s.adone {
 		return 1
 	}
@@ -344,7 +319,7 @@ func (s *partitionSeriesSet) compare() int {
 	return labels.Compare(s.a.At().Labels(), s.b.At().Labels())
 }
 
-func (s *partitionSeriesSet) Next() bool {
+func (s *mergedSeriesSet) Next() bool {
 	if s.adone && s.bdone || s.Err() != nil {
 		return false
 	}
diff --git a/querier_test.go b/querier_test.go
index 8d8609406..41fafd5ae 100644
--- a/querier_test.go
+++ b/querier_test.go
@@ -65,7 +65,7 @@ func (it *listSeriesIterator) Err() error {
 	return nil
 }
 
-func TestPartitionSeriesSet(t *testing.T) {
+func TestMergedSeriesSet(t *testing.T) {
 	newSeries := func(l map[string]string, s []sample) Series {
 		return &mockSeries{
 			labels: func() labels.Labels { return labels.FromMap(l) },
@@ -170,7 +170,7 @@
 
 Outer:
 	for _, c := range cases {
-		res := newPartitionSeriesSet(c.a, c.b)
+		res := newMergedSeriesSet(c.a, c.b)
 
 		for {
 			eok, rok := c.exp.Next(), res.Next()
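
Note on the renamed type: the old mergedSeriesSet that this patch deletes emitted its input
sets one after another, so the union was not sorted (see the removed TODO). The surviving
implementation, formerly partitionSeriesSet, does a proper two-way sorted merge: both inputs
are primed with one element of look-ahead, compare() picks the lexicographically smaller head
(an exhausted side always loses), and Next() advances only the side(s) that produced the
current element. The tail of Next() is outside the hunks shown; in tsdb, equal label sets
from both inputs are combined into a single chained series rather than one being dropped.
Below is a minimal standalone sketch of that look-ahead merge, using plain sorted strings in
place of the package's SeriesSet and labels types; the names set, merged, newMerged are
illustrative only, not part of the tsdb API, and the equal case here simply emits once.

package main

import "fmt"

// set is a stand-in for SeriesSet: an iterator over a sorted sequence.
type set struct {
	items []string
	i     int
}

func newSet(items []string) *set { return &set{items: items, i: -1} }

func (s *set) Next() bool { s.i++; return s.i < len(s.items) }
func (s *set) At() string { return s.items[s.i] }

// merged mirrors the shape of mergedSeriesSet: two inputs, the current
// element, and done flags maintained by one-element look-ahead.
type merged struct {
	a, b         *set
	cur          string
	adone, bdone bool
}

func newMerged(a, b *set) *merged {
	m := &merged{a: a, b: b}
	// Prime both inputs; compare() and Next() rely on this look-ahead.
	m.adone = !a.Next()
	m.bdone = !b.Next()
	return m
}

// compare decides which input supplies the next element. An exhausted
// input always loses, as in mergedSeriesSet.compare.
func (m *merged) compare() int {
	if m.adone {
		return 1
	}
	if m.bdone {
		return -1
	}
	switch {
	case m.a.At() < m.b.At():
		return -1
	case m.a.At() > m.b.At():
		return 1
	default:
		return 0
	}
}

func (m *merged) Next() bool {
	if m.adone && m.bdone {
		return false
	}
	switch d := m.compare(); {
	case d > 0:
		m.cur = m.b.At()
		m.bdone = !m.b.Next()
	case d < 0:
		m.cur = m.a.At()
		m.adone = !m.a.Next()
	default:
		// Both heads are equal: emit once and advance both sides.
		// (The real mergedSeriesSet instead chains the two series so
		// the data from both inputs is preserved.)
		m.cur = m.a.At()
		m.adone = !m.a.Next()
		m.bdone = !m.b.Next()
	}
	return true
}

func main() {
	m := newMerged(newSet([]string{"a", "c", "d"}), newSet([]string{"b", "c", "e"}))
	for m.Next() {
		fmt.Println(m.cur) // prints a, b, c, d, e — each element exactly once
	}
}

This is also why querier.Select can simply fold newMergedSeriesSet over its blocks: each
pairwise merge yields another lexicographically sorted SeriesSet, so chaining the
constructor left-to-right keeps the overall result sorted and de-duplicated.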