From eb8c9759fc9b6cb99e9e8039b0f9a4c74d7f8656 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Tue, 13 Jun 2017 08:25:13 +0200 Subject: [PATCH] Properly balance k-way operations --- postings.go | 18 ++++++++---------- querier.go | 44 ++++++++++++++++++++++++-------------------- 2 files changed, 32 insertions(+), 30 deletions(-) diff --git a/postings.go b/postings.go index f2f452ac1..f2f1eb5b8 100644 --- a/postings.go +++ b/postings.go @@ -78,12 +78,11 @@ func Intersect(its ...Postings) Postings { if len(its) == 0 { return emptyPostings } - a := its[0] - - for _, b := range its[1:] { - a = newIntersectPostings(a, b) + if len(its) == 1 { + return its[0] } - return a + l := len(its) / 2 + return newIntersectPostings(Intersect(its[:l]...), Intersect(its[l:]...)) } type intersectPostings struct { @@ -145,12 +144,11 @@ func Merge(its ...Postings) Postings { if len(its) == 0 { return nil } - a := its[0] - - for _, b := range its[1:] { - a = newMergedPostings(a, b) + if len(its) == 1 { + return its[0] } - return a + l := len(its) / 2 + return newMergedPostings(Merge(its[:l]...), Merge(its[l:]...)) } type mergedPostings struct { diff --git a/querier.go b/querier.go index 601fc7440..751de0cb0 100644 --- a/querier.go +++ b/querier.go @@ -75,22 +75,26 @@ func (s *DB) Querier(mint, maxt int64) Querier { } func (q *querier) LabelValues(n string) ([]string, error) { - if len(q.blocks) == 0 { + return q.lvals(q.blocks, n) +} + +func (q *querier) lvals(qs []Querier, n string) ([]string, error) { + if len(qs) == 0 { return nil, nil } - res, err := q.blocks[0].LabelValues(n) + if len(qs) == 1 { + return qs[0].LabelValues(n) + } + l := len(qs) / 2 + s1, err := q.lvals(qs[:l], n) if err != nil { return nil, err } - for _, bq := range q.blocks[1:] { - pr, err := bq.LabelValues(n) - if err != nil { - return nil, err - } - // Merge new values into deduplicated result. - res = mergeStrings(res, pr) + s2, err := q.lvals(qs[l:], n) + if err != nil { + return nil, err } - return res, nil + return mergeStrings(s1, s2), nil } func (q *querier) LabelValuesFor(string, labels.Label) ([]string, error) { @@ -98,19 +102,19 @@ func (q *querier) LabelValuesFor(string, labels.Label) ([]string, error) { } func (q *querier) Select(ms ...labels.Matcher) SeriesSet { - // Sets from different blocks have no time overlap. The reference numbers - // they emit point to series sorted in lexicographic order. - // We can fully connect partial series by simply comparing with the previous - // label set. - if len(q.blocks) == 0 { + return q.sel(q.blocks, ms) + +} + +func (q *querier) sel(qs []Querier, ms []labels.Matcher) SeriesSet { + if len(qs) == 0 { return nopSeriesSet{} } - r := q.blocks[0].Select(ms...) - - for _, s := range q.blocks[1:] { - r = newMergedSeriesSet(r, s.Select(ms...)) + if len(qs) == 1 { + return qs[0].Select(ms...) } - return r + l := len(qs) / 2 + return newMergedSeriesSet(q.sel(qs[:l], ms), q.sel(qs[l:], ms)) } func (q *querier) Close() error {