diff --git a/head_bench_test.go b/head_bench_test.go index b511667f5..ebae304d7 100644 --- a/head_bench_test.go +++ b/head_bench_test.go @@ -54,19 +54,61 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) { // Put a series, select it. GC it and then access it. h, err := NewHead(nil, nil, nil, 1000) testutil.Ok(b, err) - defer h.Close() + defer func() { + testutil.Ok(b, h.Close()) + }() - // TODO: vary number of series - for i := 0; i < 1000000; i++ { - h.getOrCreate(uint64(i), labels.FromStrings("a", strconv.Itoa(i))) + var hash uint64 + for n := 0; n < 10; n++ { + for i := 0; i < 100000; i++ { + h.getOrCreate(hash, labels.FromStrings("i", strconv.Itoa(i), "n", strconv.Itoa(i), "j", "foo")) + hash++ + // Have some series that won't be matched, to properly test inverted matches. + h.getOrCreate(hash, labels.FromStrings("i", strconv.Itoa(i), "n", strconv.Itoa(i), "j", "bar")) + hash++ + } } - b.ResetTimer() + n1 := labels.NewEqualMatcher("n", "1") - all, _ := labels.NewRegexpMatcher("a", ".*") + jFoo := labels.NewEqualMatcher("j", "foo") + jNotFoo := labels.Not(jFoo) - for i := 0; i < b.N; i++ { - _, err := PostingsForMatchers(h.indexRange(0, 1000), all) - testutil.Ok(b, err) + iStar := labels.NewMustRegexpMatcher("i", "^.*$") + iPlus := labels.NewMustRegexpMatcher("i", "^.+$") + i1Plus := labels.NewMustRegexpMatcher("i", "^1.+$") + iEmptyRe := labels.NewMustRegexpMatcher("i", "^$") + iNotEmpty := labels.Not(labels.NewEqualMatcher("i", "")) + iNot2 := labels.Not(labels.NewEqualMatcher("n", "2")) + iNot2Star := labels.Not(labels.NewMustRegexpMatcher("i", "^2.*$")) + + cases := []struct { + name string + matchers []labels.Matcher + }{ + {`n="1"`, []labels.Matcher{n1}}, + {`n="1",j="foo"`, []labels.Matcher{n1, jFoo}}, + {`j="foo",n="1"`, []labels.Matcher{jFoo, n1}}, + {`n="1",j!="foo"`, []labels.Matcher{n1, jNotFoo}}, + {`i=~".*"`, []labels.Matcher{iStar}}, + {`i=~".+"`, []labels.Matcher{iPlus}}, + {`i=~""`, []labels.Matcher{iEmptyRe}}, + {`i!=""`, []labels.Matcher{iNotEmpty}}, + {`n="1",i=~".*",j="foo"`, []labels.Matcher{n1, iStar, jFoo}}, + {`n="1",i=~".*",i!="2",j="foo"`, []labels.Matcher{n1, iStar, iNot2, jFoo}}, + {`n="1",i!="",j="foo"`, []labels.Matcher{n1, iNotEmpty, jFoo}}, + {`n="1",i=~".+",j="foo"`, []labels.Matcher{n1, iPlus, jFoo}}, + {`n="1",i=~"1.+",j="foo"`, []labels.Matcher{n1, i1Plus, jFoo}}, + {`n="1",i=~".+",i!="2",j="foo"`, []labels.Matcher{n1, iPlus, iNot2, jFoo}}, + {`n="1",i=~".+",i!~"2.*",j="foo"`, []labels.Matcher{n1, iPlus, iNot2Star, jFoo}}, + } + + for _, c := range cases { + b.Run(c.name, func(b *testing.B) { + for i := 0; i < b.N; i++ { + _, err := PostingsForMatchers(h.indexRange(0, 1000), c.matchers...) + testutil.Ok(b, err) + } + }) } } diff --git a/index/postings.go b/index/postings.go index 6212d07b4..cbad5b74d 100644 --- a/index/postings.go +++ b/index/postings.go @@ -14,6 +14,7 @@ package index import ( + "container/heap" "encoding/binary" "runtime" "sort" @@ -365,25 +366,132 @@ func Merge(its ...Postings) Postings { if len(its) == 1 { return its[0] } - // All the uses of this function immediately expand it, so - // collect everything in a map. This is more efficient - // when there's 100ks of postings, compared to - // having a tree of merge objects. - pm := make(map[uint64]struct{}, len(its)) - for _, it := range its { - for it.Next() { - pm[it.At()] = struct{}{} - } - if it.Err() != nil { - return ErrPostings(it.Err()) + return newMergedPostings(its) +} + +type postingsHeap []Postings + +func (h postingsHeap) Len() int { return len(h) } +func (h postingsHeap) Less(i, j int) bool { return h[i].At() < h[j].At() } +func (h *postingsHeap) Swap(i, j int) { (*h)[i], (*h)[j] = (*h)[j], (*h)[i] } + +func (h *postingsHeap) Push(x interface{}) { + *h = append(*h, x.(Postings)) +} + +func (h *postingsHeap) Pop() interface{} { + old := *h + n := len(old) + x := old[n-1] + *h = old[0 : n-1] + return x +} + +type mergedPostings struct { + h postingsHeap + initilized bool + heaped bool + cur uint64 + err error +} + +func newMergedPostings(p []Postings) *mergedPostings { + ph := make(postingsHeap, 0, len(p)) + for _, it := range p { + if it.Next() { + ph = append(ph, it) + } else { + if it.Err() != nil { + return &mergedPostings{err: it.Err()} + } } } - pl := make([]uint64, 0, len(pm)) - for p := range pm { - pl = append(pl, p) + return &mergedPostings{h: ph} +} + +func (it *mergedPostings) Next() bool { + if it.h.Len() == 0 || it.err != nil { + return false } - sort.Slice(pl, func(i, j int) bool { return pl[i] < pl[j] }) - return newListPostings(pl) + + if !it.heaped { + heap.Init(&it.h) + it.heaped = true + } + // The user must issue an initial Next. + if !it.initilized { + it.cur = it.h[0].At() + it.initilized = true + return true + } + + for { + cur := it.h[0] + if !cur.Next() { + heap.Pop(&it.h) + if cur.Err() != nil { + it.err = cur.Err() + return false + } + if it.h.Len() == 0 { + return false + } + } else { + // Value of top of heap has changed, re-heapify. + heap.Fix(&it.h, 0) + } + + if it.h[0].At() != it.cur { + it.cur = it.h[0].At() + return true + } + } +} + +func (it *mergedPostings) Seek(id uint64) bool { + if it.h.Len() == 0 || it.err != nil { + return false + } + if !it.initilized { + if !it.Next() { + return false + } + } + if it.cur >= id { + return true + } + // Heapifying when there is lots of Seeks is inefficient, + // mark to be re-heapified on the Next() call. + it.heaped = false + newH := make(postingsHeap, 0, len(it.h)) + lowest := ^uint64(0) + for _, i := range it.h { + if i.Seek(id) { + newH = append(newH, i) + if i.At() < lowest { + lowest = i.At() + } + } else { + if i.Err() != nil { + it.err = i.Err() + return false + } + } + } + it.h = newH + if len(it.h) == 0 { + return false + } + it.cur = lowest + return true +} + +func (it mergedPostings) At() uint64 { + return it.cur +} + +func (it mergedPostings) Err() error { + return it.err } // Without returns a new postings list that contains all elements from the full list that @@ -498,6 +606,9 @@ func (it *listPostings) Seek(x uint64) bool { if it.cur >= x { return true } + if len(it.list) == 0 { + return false + } // Do binary search between current position and end. i := sort.Search(len(it.list), func(i int) bool { diff --git a/querier.go b/querier.go index 61503d672..3e8cd77ca 100644 --- a/querier.go +++ b/querier.go @@ -354,11 +354,23 @@ func postingsForUnsetLabelMatcher(ix IndexReader, m labels.Matcher) (index.Posti rit = append(rit, it) } + merged := index.Merge(rit...) + // With many many postings, it's best to pre-calculate + // the merged list via next rather than have a ton of seeks + // in Without/Intersection. + if len(rit) > 100 { + pl, err := index.ExpandPostings(merged) + if err != nil { + return nil, err + } + merged = index.NewListPostings(pl) + } + allPostings, err := ix.Postings(index.AllPostingsKey()) if err != nil { return nil, err } - return index.Without(allPostings, index.Merge(rit...)), nil + return index.Without(allPostings, merged), nil } func mergeStrings(a, b []string) []string {