diff --git a/index/index_test.go b/index/index_test.go index d5122402e..f7a815622 100644 --- a/index/index_test.go +++ b/index/index_test.go @@ -320,10 +320,12 @@ func TestPersistence_index_e2e(t *testing.T) { testutil.Ok(t, err) mi.WritePostings("", "", newListPostings(all)) - for l := range postings.m { - err = iw.WritePostings(l.Name, l.Value, postings.Get(l.Name, l.Value)) - testutil.Ok(t, err) - mi.WritePostings(l.Name, l.Value, postings.Get(l.Name, l.Value)) + for n, e := range postings.m { + for v := range e { + err = iw.WritePostings(n, v, postings.Get(n, v)) + testutil.Ok(t, err) + mi.WritePostings(n, v, postings.Get(n, v)) + } } err = iw.Close() diff --git a/index/postings.go b/index/postings.go index 53449f341..1f2ee0d6f 100644 --- a/index/postings.go +++ b/index/postings.go @@ -36,14 +36,14 @@ func AllPostingsKey() (name, value string) { // unordered batch fills on startup. type MemPostings struct { mtx sync.RWMutex - m map[labels.Label][]uint64 + m map[string]map[string][]uint64 ordered bool } // NewMemPostings returns a memPostings that's ready for reads and writes. func NewMemPostings() *MemPostings { return &MemPostings{ - m: make(map[labels.Label][]uint64, 512), + m: make(map[string]map[string][]uint64, 512), ordered: true, } } @@ -52,7 +52,7 @@ func NewMemPostings() *MemPostings { // until ensureOrder was called once. func NewUnorderedMemPostings() *MemPostings { return &MemPostings{ - m: make(map[labels.Label][]uint64, 512), + m: make(map[string]map[string][]uint64, 512), ordered: false, } } @@ -62,8 +62,10 @@ func (p *MemPostings) SortedKeys() []labels.Label { p.mtx.RLock() keys := make([]labels.Label, 0, len(p.m)) - for l := range p.m { - keys = append(keys, l) + for n, e := range p.m { + for v := range e { + keys = append(keys, labels.Label{Name: n, Value: v}) + } } p.mtx.RUnlock() @@ -78,14 +80,18 @@ func (p *MemPostings) SortedKeys() []labels.Label { // Get returns a postings list for the given label pair. func (p *MemPostings) Get(name, value string) Postings { + var lp []uint64 p.mtx.RLock() - l := p.m[labels.Label{Name: name, Value: value}] + l := p.m[name] + if l != nil { + lp = l[value] + } p.mtx.RUnlock() - if l == nil { + if lp == nil { return EmptyPostings() } - return newListPostings(l) + return newListPostings(lp) } // All returns a postings list over all documents ever added. @@ -118,8 +124,10 @@ func (p *MemPostings) EnsureOrder() { }() } - for _, l := range p.m { - workc <- l + for _, e := range p.m { + for _, l := range e { + workc <- l + } } close(workc) wg.Wait() @@ -129,44 +137,58 @@ func (p *MemPostings) EnsureOrder() { // Delete removes all ids in the given map from the postings lists. func (p *MemPostings) Delete(deleted map[uint64]struct{}) { - var keys []labels.Label + var keys, vals []string // Collect all keys relevant for deletion once. New keys added afterwards // can by definition not be affected by any of the given deletes. p.mtx.RLock() - for l := range p.m { - keys = append(keys, l) + for n := range p.m { + keys = append(keys, n) } p.mtx.RUnlock() - // For each key we first analyse whether the postings list is affected by the deletes. - // If yes, we actually reallocate a new postings list. - for _, l := range keys { - // Only lock for processing one postings list so we don't block reads for too long. - p.mtx.Lock() - - found := false - for _, id := range p.m[l] { - if _, ok := deleted[id]; ok { - found = true - break - } + for _, n := range keys { + p.mtx.RLock() + vals = vals[:0] + for v := range p.m[n] { + vals = append(vals, v) } - if !found { + p.mtx.RUnlock() + + // For each posting we first analyse whether the postings list is affected by the deletes. + // If yes, we actually reallocate a new postings list. + for _, l := range vals { + // Only lock for processing one postings list so we don't block reads for too long. + p.mtx.Lock() + + found := false + for _, id := range p.m[n][l] { + if _, ok := deleted[id]; ok { + found = true + break + } + } + if !found { + p.mtx.Unlock() + continue + } + repl := make([]uint64, 0, len(p.m[n][l])) + + for _, id := range p.m[n][l] { + if _, ok := deleted[id]; !ok { + repl = append(repl, id) + } + } + if len(repl) > 0 { + p.m[n][l] = repl + } else { + delete(p.m[n], l) + } p.mtx.Unlock() - continue } - repl := make([]uint64, 0, len(p.m[l])) - - for _, id := range p.m[l] { - if _, ok := deleted[id]; !ok { - repl = append(repl, id) - } - } - if len(repl) > 0 { - p.m[l] = repl - } else { - delete(p.m, l) + p.mtx.Lock() + if len(p.m[n]) == 0 { + delete(p.m, n) } p.mtx.Unlock() } @@ -177,9 +199,11 @@ func (p *MemPostings) Iter(f func(labels.Label, Postings) error) error { p.mtx.RLock() defer p.mtx.RUnlock() - for l, p := range p.m { - if err := f(l, newListPostings(p)); err != nil { - return err + for n, e := range p.m { + for v, p := range e { + if err := f(labels.Label{Name: n, Value: v}, newListPostings(p)); err != nil { + return err + } } } return nil @@ -198,8 +222,13 @@ func (p *MemPostings) Add(id uint64, lset labels.Labels) { } func (p *MemPostings) addFor(id uint64, l labels.Label) { - list := append(p.m[l], id) - p.m[l] = list + nm, ok := p.m[l.Name] + if !ok { + nm = map[string][]uint64{} + p.m[l.Name] = nm + } + list := append(nm[l.Value], id) + nm[l.Value] = list if !p.ordered { return diff --git a/index/postings_test.go b/index/postings_test.go index e7e2e61ca..53a9d95fc 100644 --- a/index/postings_test.go +++ b/index/postings_test.go @@ -20,21 +20,22 @@ import ( "sort" "testing" - "github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/testutil" ) func TestMemPostings_addFor(t *testing.T) { p := NewMemPostings() - p.m[allPostingsKey] = []uint64{1, 2, 3, 4, 6, 7, 8} + p.m[allPostingsKey.Name] = map[string][]uint64{} + p.m[allPostingsKey.Name][allPostingsKey.Value] = []uint64{1, 2, 3, 4, 6, 7, 8} p.addFor(5, allPostingsKey) - testutil.Equals(t, []uint64{1, 2, 3, 4, 5, 6, 7, 8}, p.m[allPostingsKey]) + testutil.Equals(t, []uint64{1, 2, 3, 4, 5, 6, 7, 8}, p.m[allPostingsKey.Name][allPostingsKey.Value]) } func TestMemPostings_ensureOrder(t *testing.T) { p := NewUnorderedMemPostings() + p.m["a"] = map[string][]uint64{} for i := 0; i < 100; i++ { l := make([]uint64, 100) @@ -43,17 +44,19 @@ func TestMemPostings_ensureOrder(t *testing.T) { } v := fmt.Sprintf("%d", i) - p.m[labels.Label{"a", v}] = l + p.m["a"][v] = l } p.EnsureOrder() - for _, l := range p.m { - ok := sort.SliceIsSorted(l, func(i, j int) bool { - return l[i] < l[j] - }) - if !ok { - t.Fatalf("postings list %v is not sorted", l) + for _, e := range p.m { + for _, l := range e { + ok := sort.SliceIsSorted(l, func(i, j int) bool { + return l[i] < l[j] + }) + if !ok { + t.Fatalf("postings list %v is not sorted", l) + } } } }