Improve comments, handle allPostingsKey properly

This commit is contained in:
Fabian Reinartz 2017-12-22 09:43:34 +01:00
parent da9614fa54
commit 1e55b7987f
5 changed files with 18 additions and 12 deletions

View File

@ -63,7 +63,7 @@ type IndexReader interface {
// and indices. // and indices.
Symbols() (map[string]struct{}, error) Symbols() (map[string]struct{}, error)
// LabelValues returns the possible label values // LabelValues returns the possible label values.
LabelValues(names ...string) (index.StringTuples, error) LabelValues(names ...string) (index.StringTuples, error)
// Postings returns the postings list iterator for the label pair. // Postings returns the postings list iterator for the label pair.
@ -81,10 +81,10 @@ type IndexReader interface {
// Returns ErrNotFound if the ref does not resolve to a known series. // Returns ErrNotFound if the ref does not resolve to a known series.
Series(ref uint64, lset *labels.Labels, chks *[]chunks.Meta) error Series(ref uint64, lset *labels.Labels, chks *[]chunks.Meta) error
// LabelIndices returns the label pairs for which indices exist. // LabelIndices returns a list of string tuples for which a label value index exists.
LabelIndices() ([][]string, error) LabelIndices() ([][]string, error)
// Close released the underlying resources of the reader. // Close releases the underlying resources of the reader.
Close() error Close() error
} }

View File

@ -63,9 +63,6 @@ var (
errInvalidChecksum = fmt.Errorf("invalid checksum") errInvalidChecksum = fmt.Errorf("invalid checksum")
) )
// The table gets initialized with sync.Once but may still cause a race
// with any other use of the crc32 package anywhere. Thus we initialize it
// before.
var castagnoliTable *crc32.Table var castagnoliTable *crc32.Table
func init() { func init() {

View File

@ -516,7 +516,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
allSymbols[s] = struct{}{} allSymbols[s] = struct{}{}
} }
all, err := indexr.Postings("", "") all, err := indexr.Postings(index.AllPostingsKey())
if err != nil { if err != nil {
return err return err
} }

View File

@ -23,6 +23,13 @@ import (
"github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/labels"
) )
var allPostingsKey = labels.Label{}
// AllPostingsKey returns the label key that is used to store the postings list of all existing IDs.
func AllPostingsKey() (name, value string) {
return allPostingsKey.Name, allPostingsKey.Value
}
// MemPostings holds postings list for series ID per label pair. They may be written // MemPostings holds postings list for series ID per label pair. They may be written
// to out of order. // to out of order.
// ensureOrder() must be called once before any reads are done. This allows for quick // ensureOrder() must be called once before any reads are done. This allows for quick
@ -83,11 +90,9 @@ func (p *MemPostings) Get(name, value string) Postings {
// All returns a postings list over all documents ever added. // All returns a postings list over all documents ever added.
func (p *MemPostings) All() Postings { func (p *MemPostings) All() Postings {
return p.Get(allPostingsKey.Name, allPostingsKey.Value) return p.Get(AllPostingsKey())
} }
var allPostingsKey = labels.Label{}
// EnsureOrder ensures that all postings lists are sorted. After it returns all further // EnsureOrder ensures that all postings lists are sorted. After it returns all further
// calls to add and addFor will insert new IDs in a sorted manner. // calls to add and addFor will insert new IDs in a sorted manner.
func (p *MemPostings) EnsureOrder() { func (p *MemPostings) EnsureOrder() {
@ -126,13 +131,18 @@ func (p *MemPostings) EnsureOrder() {
func (p *MemPostings) Delete(deleted map[uint64]struct{}) { func (p *MemPostings) Delete(deleted map[uint64]struct{}) {
var keys []labels.Label var keys []labels.Label
// Collect all keys relevant for deletion once. New keys added afterwards
// can by definition not be affected by any of the given deletes.
p.mtx.RLock() p.mtx.RLock()
for l := range p.m { for l := range p.m {
keys = append(keys, l) keys = append(keys, l)
} }
p.mtx.RUnlock() p.mtx.RUnlock()
// For each key we first analyse whether the postings list is affected by the deletes.
// If yes, we actually reallocate a new postings list.
for _, l := range keys { for _, l := range keys {
// Only lock for processing one postings list so we don't block reads for too long.
p.mtx.Lock() p.mtx.Lock()
found := false found := false

View File

@ -334,11 +334,10 @@ func postingsForUnsetLabelMatcher(ix IndexReader, m labels.Matcher) (index.Posti
rit = append(rit, it) rit = append(rit, it)
} }
allPostings, err := ix.Postings("", "") allPostings, err := ix.Postings(index.AllPostingsKey())
if err != nil { if err != nil {
return nil, err return nil, err
} }
return index.Without(allPostings, index.Merge(rit...)), nil return index.Without(allPostings, index.Merge(rit...)), nil
} }