Properly balance k-way operations
This commit is contained in:
parent
9963a4c7c3
commit
eb8c9759fc
18
postings.go
18
postings.go
|
@ -78,12 +78,11 @@ func Intersect(its ...Postings) Postings {
|
||||||
if len(its) == 0 {
|
if len(its) == 0 {
|
||||||
return emptyPostings
|
return emptyPostings
|
||||||
}
|
}
|
||||||
a := its[0]
|
if len(its) == 1 {
|
||||||
|
return its[0]
|
||||||
for _, b := range its[1:] {
|
|
||||||
a = newIntersectPostings(a, b)
|
|
||||||
}
|
}
|
||||||
return a
|
l := len(its) / 2
|
||||||
|
return newIntersectPostings(Intersect(its[:l]...), Intersect(its[l:]...))
|
||||||
}
|
}
|
||||||
|
|
||||||
type intersectPostings struct {
|
type intersectPostings struct {
|
||||||
|
@ -145,12 +144,11 @@ func Merge(its ...Postings) Postings {
|
||||||
if len(its) == 0 {
|
if len(its) == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
a := its[0]
|
if len(its) == 1 {
|
||||||
|
return its[0]
|
||||||
for _, b := range its[1:] {
|
|
||||||
a = newMergedPostings(a, b)
|
|
||||||
}
|
}
|
||||||
return a
|
l := len(its) / 2
|
||||||
|
return newMergedPostings(Merge(its[:l]...), Merge(its[l:]...))
|
||||||
}
|
}
|
||||||
|
|
||||||
type mergedPostings struct {
|
type mergedPostings struct {
|
||||||
|
|
44
querier.go
44
querier.go
|
@ -75,22 +75,26 @@ func (s *DB) Querier(mint, maxt int64) Querier {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *querier) LabelValues(n string) ([]string, error) {
|
func (q *querier) LabelValues(n string) ([]string, error) {
|
||||||
if len(q.blocks) == 0 {
|
return q.lvals(q.blocks, n)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *querier) lvals(qs []Querier, n string) ([]string, error) {
|
||||||
|
if len(qs) == 0 {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
res, err := q.blocks[0].LabelValues(n)
|
if len(qs) == 1 {
|
||||||
|
return qs[0].LabelValues(n)
|
||||||
|
}
|
||||||
|
l := len(qs) / 2
|
||||||
|
s1, err := q.lvals(qs[:l], n)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
for _, bq := range q.blocks[1:] {
|
s2, err := q.lvals(qs[l:], n)
|
||||||
pr, err := bq.LabelValues(n)
|
if err != nil {
|
||||||
if err != nil {
|
return nil, err
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
// Merge new values into deduplicated result.
|
|
||||||
res = mergeStrings(res, pr)
|
|
||||||
}
|
}
|
||||||
return res, nil
|
return mergeStrings(s1, s2), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *querier) LabelValuesFor(string, labels.Label) ([]string, error) {
|
func (q *querier) LabelValuesFor(string, labels.Label) ([]string, error) {
|
||||||
|
@ -98,19 +102,19 @@ func (q *querier) LabelValuesFor(string, labels.Label) ([]string, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *querier) Select(ms ...labels.Matcher) SeriesSet {
|
func (q *querier) Select(ms ...labels.Matcher) SeriesSet {
|
||||||
// Sets from different blocks have no time overlap. The reference numbers
|
return q.sel(q.blocks, ms)
|
||||||
// they emit point to series sorted in lexicographic order.
|
|
||||||
// We can fully connect partial series by simply comparing with the previous
|
}
|
||||||
// label set.
|
|
||||||
if len(q.blocks) == 0 {
|
func (q *querier) sel(qs []Querier, ms []labels.Matcher) SeriesSet {
|
||||||
|
if len(qs) == 0 {
|
||||||
return nopSeriesSet{}
|
return nopSeriesSet{}
|
||||||
}
|
}
|
||||||
r := q.blocks[0].Select(ms...)
|
if len(qs) == 1 {
|
||||||
|
return qs[0].Select(ms...)
|
||||||
for _, s := range q.blocks[1:] {
|
|
||||||
r = newMergedSeriesSet(r, s.Select(ms...))
|
|
||||||
}
|
}
|
||||||
return r
|
l := len(qs) / 2
|
||||||
|
return newMergedSeriesSet(q.sel(qs[:l], ms), q.sel(qs[l:], ms))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *querier) Close() error {
|
func (q *querier) Close() error {
|
||||||
|
|
Loading…
Reference in New Issue