From dd72b520987c5557d312e4cf1bb1bee9c09b94cc Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Tue, 27 Dec 2016 11:32:10 +0100 Subject: [PATCH] Add postings intersection --- postings.go | 51 +++++++++++++++-- postings_test.go | 140 +++++++++++++++++++++++++++++++++++++++++++++++ querier.go | 7 --- 3 files changed, 185 insertions(+), 13 deletions(-) create mode 100644 postings_test.go diff --git a/postings.go b/postings.go index fc4d24dfa..db0f91077 100644 --- a/postings.go +++ b/postings.go @@ -52,6 +52,13 @@ func (e errPostings) Seek(uint32) bool { return false } func (e errPostings) Value() uint32 { return 0 } func (e errPostings) Err() error { return e.err } +func expandPostings(p Postings) (res []uint32, err error) { + for p.Next() { + res = append(res, p.Value()) + } + return res, p.Err() +} + // Intersect returns a new postings list over the intersection of the // input postings. func Intersect(its ...Postings) Postings { @@ -61,29 +68,61 @@ func Intersect(its ...Postings) Postings { a := its[0] for _, b := range its[1:] { - a = &intersectPostings{a: a, b: b} + a = newIntersectPostings(a, b) } return a } type intersectPostings struct { - a, b Postings + a, b Postings + av, bc uint32 + aok, bok bool + cur uint32 +} + +func newIntersectPostings(a, b Postings) *intersectPostings { + it := &intersectPostings{a: a, b: b} + it.aok = it.a.Next() + it.bok = it.b.Next() + + return it } func (it *intersectPostings) Value() uint32 { - return 0 + return it.cur } func (it *intersectPostings) Next() bool { - return false + for { + if !it.aok || !it.bok { + return false + } + av, bv := it.a.Value(), it.b.Value() + + if av < bv { + it.aok = it.a.Seek(bv) + } else if bv < av { + it.bok = it.b.Seek(av) + } else { + it.cur = av + it.aok = it.a.Next() + it.bok = it.b.Next() + return true + } + } } func (it *intersectPostings) Seek(id uint32) bool { - return false + it.aok = it.a.Seek(id) + it.bok = it.b.Seek(id) + return it.Next() } func (it *intersectPostings) Err() error { - return nil + if it.a.Err() != nil { + return it.a.Err() + } + return it.b.Err() } // Merge returns a new iterator over the union of the input iterators. diff --git a/postings_test.go b/postings_test.go new file mode 100644 index 000000000..f348e4d2a --- /dev/null +++ b/postings_test.go @@ -0,0 +1,140 @@ +package tsdb + +import ( + "reflect" + "testing" +) + +type mockPostings struct { + next func() bool + seek func(uint32) bool + value func() uint32 + err func() error +} + +func (m *mockPostings) Next() bool { return m.next() } +func (m *mockPostings) Seek(v uint32) bool { return m.seek(v) } +func (m *mockPostings) Value() uint32 { return m.value() } +func (m *mockPostings) Err() error { return m.err() } + +func newListPostings(list []uint32) *mockPostings { + i := -1 + return &mockPostings{ + next: func() bool { + i++ + return i < len(list) + }, + seek: func(v uint32) bool { + for ; i < len(list); i++ { + if list[i] >= v { + return true + } + } + return false + }, + value: func() uint32 { + return list[i] + }, + err: func() error { return nil }, + } +} +func TestIntersectIterator(t *testing.T) { + var cases = []struct { + a, b []uint32 + res []uint32 + }{ + { + a: []uint32{1, 2, 3, 4, 5}, + b: []uint32{6, 7, 8, 9, 10}, + res: nil, + }, + { + a: []uint32{1, 2, 3, 4, 5}, + b: []uint32{4, 5, 6, 7, 8}, + res: []uint32{4, 5}, + }, + { + a: []uint32{1, 2, 3, 4, 9, 10}, + b: []uint32{1, 4, 5, 6, 7, 8, 10, 11}, + res: []uint32{1, 4, 10}, + }, { + a: []uint32{1}, + b: []uint32{0, 1}, + res: []uint32{1}, + }, + } + + for i, c := range cases { + a := newListPostings(c.a) + b := newListPostings(c.b) + + res, err := expandPostings(Intersect(a, b)) + if err != nil { + t.Fatalf("%d: Unexpected error: %s", i, err) + } + if !reflect.DeepEqual(res, c.res) { + t.Fatalf("%d: Expected %v but got %v", i, c.res, res) + } + } +} + +func TestMultiIntersect(t *testing.T) { + var cases = []struct { + a, b, c []uint32 + res []uint32 + }{ + { + a: []uint32{1, 2, 3, 4, 5, 6, 1000, 1001}, + b: []uint32{2, 4, 5, 6, 7, 8, 999, 1001}, + c: []uint32{1, 2, 5, 6, 7, 8, 1001, 1200}, + res: []uint32{2, 5, 6, 1001}, + }, + } + + for _, c := range cases { + pa := newListPostings(c.a) + pb := newListPostings(c.b) + pc := newListPostings(c.c) + + res, err := expandPostings(Intersect(pa, pb, pc)) + if err != nil { + t.Fatalf("Unexpected error: %s", err) + } + if !reflect.DeepEqual(res, c.res) { + t.Fatalf("Expected %v but got %v", c.res, res) + } + } +} + +func BenchmarkIntersect(t *testing.B) { + var a, b, c, d []uint32 + + for i := 0; i < 10000000; i += 2 { + a = append(a, uint32(i)) + } + for i := 5000000; i < 5000100; i += 4 { + b = append(b, uint32(i)) + } + for i := 5090000; i < 5090600; i += 4 { + b = append(b, uint32(i)) + } + for i := 4990000; i < 5100000; i++ { + c = append(c, uint32(i)) + } + for i := 4000000; i < 6000000; i++ { + d = append(d, uint32(i)) + } + + i1 := newListPostings(a) + i2 := newListPostings(b) + i3 := newListPostings(c) + i4 := newListPostings(d) + + t.ResetTimer() + + for i := 0; i < t.N; i++ { + if _, err := expandPostings(Intersect(i1, i2, i3, i4)); err != nil { + t.Fatal(err) + } + } +} diff --git a/querier.go b/querier.go index f92e3d1ae..c758935f9 100644 --- a/querier.go +++ b/querier.go @@ -252,13 +252,6 @@ func (q *blockQuerier) selectSingle(m labels.Matcher) Postings { return Intersect(rit...) } -func expandPostings(p Postings) (res []uint32, err error) { - for p.Next() { - res = append(res, p.Value()) - } - return res, p.Err() -} - func (q *blockQuerier) LabelValues(name string) ([]string, error) { tpls, err := q.index.LabelValues(name) if err != nil {