Rename paritionSeriesSet to mergedSeriesSet

This commit is contained in:
Fabian Reinartz 2017-03-14 15:24:08 +01:00
parent e825a0b40c
commit d6fb6aaaa8
3 changed files with 89 additions and 112 deletions

View File

@ -2,8 +2,10 @@ package tsdb
import ( import (
"io/ioutil" "io/ioutil"
"math/rand"
"os" "os"
"path/filepath" "path/filepath"
"sort"
"testing" "testing"
"github.com/fabxc/tsdb/labels" "github.com/fabxc/tsdb/labels"
@ -107,103 +109,103 @@ func TestIndexRW_Postings(t *testing.T) {
require.NoError(t, ir.Close()) require.NoError(t, ir.Close())
} }
// func TestPersistence_index_e2e(t *testing.T) { func TestPersistence_index_e2e(t *testing.T) {
// dir, err := ioutil.TempDir("", "test_persistence_e2e") dir, err := ioutil.TempDir("", "test_persistence_e2e")
// require.NoError(t, err) require.NoError(t, err)
// defer os.RemoveAll(dir) defer os.RemoveAll(dir)
// lbls, err := readPrometheusLabels("testdata/20k.series", 20000) lbls, err := readPrometheusLabels("testdata/20k.series", 20000)
// require.NoError(t, err) require.NoError(t, err)
// var input indexWriterSeriesSlice var input indexWriterSeriesSlice
// // Generate ChunkMetas for every label set. // Generate ChunkMetas for every label set.
// for i, lset := range lbls { for i, lset := range lbls {
// var metas []ChunkMeta var metas []ChunkMeta
// for j := 0; j <= (i % 20); j++ { for j := 0; j <= (i % 20); j++ {
// metas = append(metas, ChunkMeta{ metas = append(metas, ChunkMeta{
// MinTime: int64(j * 10000), MinTime: int64(j * 10000),
// MaxTime: int64((j + 1) * 10000), MaxTime: int64((j + 1) * 10000),
// Ref: rand.Uint64(), Ref: rand.Uint64(),
// }) })
// } }
// input = append(input, &indexWriterSeries{ input = append(input, &indexWriterSeries{
// labels: lset, labels: lset,
// chunks: metas, chunks: metas,
// }) })
// } }
// iw, err := newIndexWriter(dir) iw, err := newIndexWriter(dir)
// require.NoError(t, err) require.NoError(t, err)
// // Population procedure as done by compaction. // Population procedure as done by compaction.
// var ( var (
// postings = &memPostings{m: make(map[term][]uint32, 512)} postings = &memPostings{m: make(map[term][]uint32, 512)}
// values = map[string]stringset{} values = map[string]stringset{}
// ) )
// for i, s := range input { for i, s := range input {
// iw.AddSeries(uint32(i), s.labels, s.chunks...) iw.AddSeries(uint32(i), s.labels, s.chunks...)
// for _, l := range s.labels { for _, l := range s.labels {
// valset, ok := values[l.Name] valset, ok := values[l.Name]
// if !ok { if !ok {
// valset = stringset{} valset = stringset{}
// values[l.Name] = valset values[l.Name] = valset
// } }
// valset.set(l.Value) valset.set(l.Value)
// postings.add(uint32(i), term{name: l.Name, value: l.Value}) postings.add(uint32(i), term{name: l.Name, value: l.Value})
// } }
// i++ i++
// } }
// all := make([]uint32, len(lbls)) all := make([]uint32, len(lbls))
// for i := range all { for i := range all {
// all[i] = uint32(i) all[i] = uint32(i)
// } }
// err = iw.WritePostings("", "", newListPostings(all)) err = iw.WritePostings("", "", newListPostings(all))
// require.NoError(t, err) require.NoError(t, err)
// err = iw.Close() err = iw.Close()
// require.NoError(t, err) require.NoError(t, err)
// ir, err := newIndexReader(dir) ir, err := newIndexReader(dir)
// require.NoError(t, err) require.NoError(t, err)
// allp, err := ir.Postings("", "") allp, err := ir.Postings("", "")
// require.NoError(t, err) require.NoError(t, err)
// var result indexWriterSeriesSlice var result indexWriterSeriesSlice
// for allp.Next() { for allp.Next() {
// ref := allp.At() ref := allp.At()
// lset, chks, err := ir.Series(ref) lset, chks, err := ir.Series(ref)
// require.NoError(t, err) require.NoError(t, err)
// result = append(result, &indexWriterSeries{ result = append(result, &indexWriterSeries{
// offset: ref, offset: ref,
// labels: lset, labels: lset,
// chunks: chks, chunks: chks,
// }) })
// } }
// require.NoError(t, allp.Err()) require.NoError(t, allp.Err())
// // Persisted data must be sorted. // Persisted data must be sorted.
// sort.IsSorted(result) sort.IsSorted(result)
// // Validate result contents. // Validate result contents.
// sort.Sort(input) sort.Sort(input)
// require.Equal(t, len(input), len(result)) require.Equal(t, len(input), len(result))
// for i, re := range result { for i, re := range result {
// exp := input[i] exp := input[i]
// require.Equal(t, exp.labels, re.labels) require.Equal(t, exp.labels, re.labels)
// require.Equal(t, exp.chunks, re.chunks) require.Equal(t, exp.chunks, re.chunks)
// } }
// require.NoError(t, ir.Close()) require.NoError(t, ir.Close())
// } }

View File

@ -109,7 +109,7 @@ func (q *querier) Select(ms ...labels.Matcher) SeriesSet {
r := q.blocks[0].Select(ms...) r := q.blocks[0].Select(ms...)
for _, s := range q.blocks[1:] { for _, s := range q.blocks[1:] {
r = newPartitionSeriesSet(r, s.Select(ms...)) r = newMergedSeriesSet(r, s.Select(ms...))
} }
return r return r
} }
@ -282,39 +282,14 @@ func (nopSeriesSet) At() Series { return nil }
func (nopSeriesSet) Err() error { return nil } func (nopSeriesSet) Err() error { return nil }
type mergedSeriesSet struct { type mergedSeriesSet struct {
sets []SeriesSet
cur int
err error
}
func (s *mergedSeriesSet) At() Series { return s.sets[s.cur].At() }
func (s *mergedSeriesSet) Err() error { return s.sets[s.cur].Err() }
func (s *mergedSeriesSet) Next() bool {
// TODO(fabxc): We just emit the sets one after one. They are each
// lexicographically sorted. Should we emit their union sorted too?
if s.sets[s.cur].Next() {
return true
}
if s.cur == len(s.sets)-1 {
return false
}
s.cur++
return s.Next()
}
type partitionSeriesSet struct {
a, b SeriesSet a, b SeriesSet
cur Series cur Series
adone, bdone bool adone, bdone bool
} }
func newPartitionSeriesSet(a, b SeriesSet) *partitionSeriesSet { func newMergedSeriesSet(a, b SeriesSet) *mergedSeriesSet {
s := &partitionSeriesSet{a: a, b: b} s := &mergedSeriesSet{a: a, b: b}
// Initialize first elements of both sets as Next() needs // Initialize first elements of both sets as Next() needs
// one element look-ahead. // one element look-ahead.
s.adone = !s.a.Next() s.adone = !s.a.Next()
@ -323,18 +298,18 @@ func newPartitionSeriesSet(a, b SeriesSet) *partitionSeriesSet {
return s return s
} }
func (s *partitionSeriesSet) At() Series { func (s *mergedSeriesSet) At() Series {
return s.cur return s.cur
} }
func (s *partitionSeriesSet) Err() error { func (s *mergedSeriesSet) Err() error {
if s.a.Err() != nil { if s.a.Err() != nil {
return s.a.Err() return s.a.Err()
} }
return s.b.Err() return s.b.Err()
} }
func (s *partitionSeriesSet) compare() int { func (s *mergedSeriesSet) compare() int {
if s.adone { if s.adone {
return 1 return 1
} }
@ -344,7 +319,7 @@ func (s *partitionSeriesSet) compare() int {
return labels.Compare(s.a.At().Labels(), s.b.At().Labels()) return labels.Compare(s.a.At().Labels(), s.b.At().Labels())
} }
func (s *partitionSeriesSet) Next() bool { func (s *mergedSeriesSet) Next() bool {
if s.adone && s.bdone || s.Err() != nil { if s.adone && s.bdone || s.Err() != nil {
return false return false
} }

View File

@ -65,7 +65,7 @@ func (it *listSeriesIterator) Err() error {
return nil return nil
} }
func TestPartitionSeriesSet(t *testing.T) { func TestMergedSeriesSet(t *testing.T) {
newSeries := func(l map[string]string, s []sample) Series { newSeries := func(l map[string]string, s []sample) Series {
return &mockSeries{ return &mockSeries{
labels: func() labels.Labels { return labels.FromMap(l) }, labels: func() labels.Labels { return labels.FromMap(l) },
@ -170,7 +170,7 @@ func TestPartitionSeriesSet(t *testing.T) {
Outer: Outer:
for _, c := range cases { for _, c := range cases {
res := newPartitionSeriesSet(c.a, c.b) res := newMergedSeriesSet(c.a, c.b)
for { for {
eok, rok := c.exp.Next(), res.Next() eok, rok := c.exp.Next(), res.Next()