diff --git a/CHANGELOG.md b/CHANGELOG.md index e17124abe..603a75294 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ * [CHANGE] Rules: Execute 1 query instead of N (where N is the number of alerts within alert rule) when restoring alerts. #13980 * [ENHANCEMENT] Rules: Add `rule_group_last_restore_duration_seconds` to measure the time it takes to restore a rule group. #13974 * [ENHANCEMENT] OTLP: Improve remote write format translation performance by using label set hashes for metric identifiers instead of string based ones. #14006 #13991 +* [ENHANCEMENT] TSDB: Optimize querying with regexp matchers. #13620 * [BUGFIX] OTLP: Don't generate target_info unless at least one identifying label is defined. #13991 * [BUGFIX] OTLP: Don't generate target_info unless there are metrics. #13991 * [BUGFIX] tsdb/wlog.Watcher.readSegmentForGC: Only count unknown record types against record_decode_failures_total metric. #14042 diff --git a/tsdb/block.go b/tsdb/block.go index abd223e4a..83b86a58d 100644 --- a/tsdb/block.go +++ b/tsdb/block.go @@ -77,6 +77,10 @@ type IndexReader interface { // during background garbage collections. Postings(ctx context.Context, name string, values ...string) (index.Postings, error) + // PostingsForLabelMatching returns a sorted iterator over postings having a label with the given name and a value for which match returns true. + // If no postings are found having at least one matching label, an empty iterator is returned. + PostingsForLabelMatching(ctx context.Context, name string, match func(value string) bool) index.Postings + // SortedPostings returns a postings list that is reordered to be sorted // by the label set of the underlying series. SortedPostings(index.Postings) index.Postings @@ -518,6 +522,10 @@ func (r blockIndexReader) Postings(ctx context.Context, name string, values ...s return p, nil } +func (r blockIndexReader) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool) index.Postings { + return r.ir.PostingsForLabelMatching(ctx, name, match) +} + func (r blockIndexReader) SortedPostings(p index.Postings) index.Postings { return r.ir.SortedPostings(p) } diff --git a/tsdb/block_test.go b/tsdb/block_test.go index 6d15d1838..42acc3c69 100644 --- a/tsdb/block_test.go +++ b/tsdb/block_test.go @@ -36,6 +36,7 @@ import ( "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/fileutil" + "github.com/prometheus/prometheus/tsdb/index" "github.com/prometheus/prometheus/tsdb/wlog" ) @@ -509,6 +510,86 @@ func TestLabelNamesWithMatchers(t *testing.T) { } } +func TestBlockIndexReader_PostingsForLabelMatching(t *testing.T) { + testPostingsForLabelMatching(t, 2, func(t *testing.T, series []labels.Labels) IndexReader { + var seriesEntries []storage.Series + for _, s := range series { + seriesEntries = append(seriesEntries, storage.NewListSeries(s, []chunks.Sample{sample{100, 0, nil, nil}})) + } + + blockDir := createBlock(t, t.TempDir(), seriesEntries) + files, err := sequenceFiles(chunkDir(blockDir)) + require.NoError(t, err) + require.NotEmpty(t, files, "No chunk created.") + + block, err := OpenBlock(nil, blockDir, nil) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, block.Close()) }) + + ir, err := block.Index() + require.NoError(t, err) + return ir + }) +} + +func testPostingsForLabelMatching(t *testing.T, offset storage.SeriesRef, setUp func(*testing.T, []labels.Labels) IndexReader) { + t.Helper() + + ctx := context.Background() + series := []labels.Labels{ + labels.FromStrings("n", "1"), + labels.FromStrings("n", "1", "i", "a"), + labels.FromStrings("n", "1", "i", "b"), + labels.FromStrings("n", "2"), + labels.FromStrings("n", "2.5"), + } + ir := setUp(t, series) + t.Cleanup(func() { + require.NoError(t, ir.Close()) + }) + + testCases := []struct { + name string + labelName string + match func(string) bool + exp []storage.SeriesRef + }{ + { + name: "n=1", + labelName: "n", + match: func(val string) bool { + return val == "1" + }, + exp: []storage.SeriesRef{offset + 1, offset + 2, offset + 3}, + }, + { + name: "n=2", + labelName: "n", + match: func(val string) bool { + return val == "2" + }, + exp: []storage.SeriesRef{offset + 4}, + }, + { + name: "missing label", + labelName: "missing", + match: func(val string) bool { + return true + }, + exp: nil, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + p := ir.PostingsForLabelMatching(ctx, tc.labelName, tc.match) + require.NotNil(t, p) + srs, err := index.ExpandPostings(p) + require.NoError(t, err) + require.Equal(t, tc.exp, srs) + }) + } +} + // createBlock creates a block with given set of series and returns its dir. func createBlock(tb testing.TB, dir string, series []storage.Series) string { blockDir, err := CreateBlock(series, dir, 0, log.NewNopLogger()) diff --git a/tsdb/head_read.go b/tsdb/head_read.go index 45bbc81f1..df15abcd5 100644 --- a/tsdb/head_read.go +++ b/tsdb/head_read.go @@ -121,6 +121,10 @@ func (h *headIndexReader) Postings(ctx context.Context, name string, values ...s } } +func (h *headIndexReader) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool) index.Postings { + return h.head.postings.PostingsForLabelMatching(ctx, name, match) +} + func (h *headIndexReader) SortedPostings(p index.Postings) index.Postings { series := make([]*memSeries, 0, 128) diff --git a/tsdb/head_read_test.go b/tsdb/head_read_test.go index de97d70a5..8d835e943 100644 --- a/tsdb/head_read_test.go +++ b/tsdb/head_read_test.go @@ -14,6 +14,7 @@ package tsdb import ( + "context" "fmt" "sync" "testing" @@ -552,3 +553,25 @@ func TestMemSeries_chunk(t *testing.T) { }) } } + +func TestHeadIndexReader_PostingsForLabelMatching(t *testing.T) { + testPostingsForLabelMatching(t, 0, func(t *testing.T, series []labels.Labels) IndexReader { + opts := DefaultHeadOptions() + opts.ChunkRange = 1000 + opts.ChunkDirRoot = t.TempDir() + h, err := NewHead(nil, nil, nil, nil, opts, nil) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, h.Close()) + }) + app := h.Appender(context.Background()) + for _, s := range series { + app.Append(0, s, 0, 0) + } + require.NoError(t, app.Commit()) + + ir, err := h.Index() + require.NoError(t, err) + return ir + }) +} diff --git a/tsdb/index/index.go b/tsdb/index/index.go index 69e258125..a36c33c4f 100644 --- a/tsdb/index/index.go +++ b/tsdb/index/index.go @@ -1536,36 +1536,14 @@ func (r *Reader) LabelValues(ctx context.Context, name string, matchers ...*labe if len(e) == 0 { return nil, nil } + values := make([]string, 0, len(e)*symbolFactor) - - d := encoding.NewDecbufAt(r.b, int(r.toc.PostingsTable), nil) - d.Skip(e[0].off) lastVal := e[len(e)-1].value - - skip := 0 - for d.Err() == nil && ctx.Err() == nil { - if skip == 0 { - // These are always the same number of bytes, - // and it's faster to skip than parse. - skip = d.Len() - d.Uvarint() // Keycount. - d.UvarintBytes() // Label name. - skip -= d.Len() - } else { - d.Skip(skip) - } - s := yoloString(d.UvarintBytes()) // Label value. - values = append(values, s) - if s == lastVal { - break - } - d.Uvarint64() // Offset. - } - if d.Err() != nil { - return nil, fmt.Errorf("get postings offset entry: %w", d.Err()) - } - - return values, ctx.Err() + err := r.traversePostingOffsets(ctx, e[0].off, func(val string, _ uint64) (bool, error) { + values = append(values, val) + return val != lastVal, nil + }) + return values, err } // LabelNamesFor returns all the label names for the series referred to by IDs. @@ -1662,6 +1640,44 @@ func (r *Reader) Series(id storage.SeriesRef, builder *labels.ScratchBuilder, ch return nil } +// traversePostingOffsets traverses r's posting offsets table, starting at off, and calls cb with every label value and postings offset. +// If cb returns false (or an error), the traversing is interrupted. +func (r *Reader) traversePostingOffsets(ctx context.Context, off int, cb func(string, uint64) (bool, error)) error { + // Don't Crc32 the entire postings offset table, this is very slow + // so hope any issues were caught at startup. + d := encoding.NewDecbufAt(r.b, int(r.toc.PostingsTable), nil) + d.Skip(off) + skip := 0 + ctxErr := ctx.Err() + for d.Err() == nil && ctxErr == nil { + if skip == 0 { + // These are always the same number of bytes, + // and it's faster to skip than to parse. + skip = d.Len() + d.Uvarint() // Keycount. + d.UvarintBytes() // Label name. + skip -= d.Len() + } else { + d.Skip(skip) + } + v := yoloString(d.UvarintBytes()) // Label value. + postingsOff := d.Uvarint64() // Offset. + if ok, err := cb(v, postingsOff); err != nil { + return err + } else if !ok { + break + } + ctxErr = ctx.Err() + } + if d.Err() != nil { + return fmt.Errorf("get postings offset entry: %w", d.Err()) + } + if ctxErr != nil { + return fmt.Errorf("get postings offset entry: %w", ctxErr) + } + return nil +} + func (r *Reader) Postings(ctx context.Context, name string, values ...string) (Postings, error) { if r.version == FormatV1 { e, ok := r.postingsV1[name] @@ -1696,7 +1712,6 @@ func (r *Reader) Postings(ctx context.Context, name string, values ...string) (P slices.Sort(values) // Values must be in order so we can step through the table on disk. res := make([]Postings, 0, len(values)) - skip := 0 valueIndex := 0 for valueIndex < len(values) && values[valueIndex] < e[0].value { // Discard values before the start. @@ -1714,33 +1729,15 @@ func (r *Reader) Postings(ctx context.Context, name string, values ...string) (P // Need to look from previous entry. i-- } - // Don't Crc32 the entire postings offset table, this is very slow - // so hope any issues were caught at startup. - d := encoding.NewDecbufAt(r.b, int(r.toc.PostingsTable), nil) - d.Skip(e[i].off) - // Iterate on the offset table. - var postingsOff uint64 // The offset into the postings table. - for d.Err() == nil && ctx.Err() == nil { - if skip == 0 { - // These are always the same number of bytes, - // and it's faster to skip than parse. - skip = d.Len() - d.Uvarint() // Keycount. - d.UvarintBytes() // Label name. - skip -= d.Len() - } else { - d.Skip(skip) - } - v := d.UvarintBytes() // Label value. - postingsOff = d.Uvarint64() // Offset. - for string(v) >= value { - if string(v) == value { + if err := r.traversePostingOffsets(ctx, e[i].off, func(val string, postingsOff uint64) (bool, error) { + for val >= value { + if val == value { // Read from the postings table. d2 := encoding.NewDecbufAt(r.b, int(postingsOff), castagnoliTable) _, p, err := r.dec.Postings(d2.Get()) if err != nil { - return nil, fmt.Errorf("decode postings: %w", err) + return false, fmt.Errorf("decode postings: %w", err) } res = append(res, p) } @@ -1752,20 +1749,72 @@ func (r *Reader) Postings(ctx context.Context, name string, values ...string) (P } if i+1 == len(e) || value >= e[i+1].value || valueIndex == len(values) { // Need to go to a later postings offset entry, if there is one. - break + return false, nil } - } - if d.Err() != nil { - return nil, fmt.Errorf("get postings offset entry: %w", d.Err()) - } - if ctx.Err() != nil { - return nil, fmt.Errorf("get postings offset entry: %w", ctx.Err()) + return true, nil + }); err != nil { + return nil, err } } return Merge(ctx, res...), nil } +func (r *Reader) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool) Postings { + if r.version == FormatV1 { + return r.postingsForLabelMatchingV1(ctx, name, match) + } + + e := r.postings[name] + if len(e) == 0 { + return EmptyPostings() + } + + lastVal := e[len(e)-1].value + var its []Postings + if err := r.traversePostingOffsets(ctx, e[0].off, func(val string, postingsOff uint64) (bool, error) { + if match(val) { + // We want this postings iterator since the value is a match + postingsDec := encoding.NewDecbufAt(r.b, int(postingsOff), castagnoliTable) + _, p, err := r.dec.PostingsFromDecbuf(postingsDec) + if err != nil { + return false, fmt.Errorf("decode postings: %w", err) + } + its = append(its, p) + } + return val != lastVal, nil + }); err != nil { + return ErrPostings(err) + } + + return Merge(ctx, its...) +} + +func (r *Reader) postingsForLabelMatchingV1(ctx context.Context, name string, match func(string) bool) Postings { + e := r.postingsV1[name] + if len(e) == 0 { + return EmptyPostings() + } + + var its []Postings + for val, offset := range e { + if !match(val) { + continue + } + + // Read from the postings table. + d := encoding.NewDecbufAt(r.b, int(offset), castagnoliTable) + _, p, err := r.dec.PostingsFromDecbuf(d) + if err != nil { + return ErrPostings(fmt.Errorf("decode postings: %w", err)) + } + + its = append(its, p) + } + + return Merge(ctx, its...) +} + // SortedPostings returns the given postings list reordered so that the backing series // are sorted. func (r *Reader) SortedPostings(p Postings) Postings { @@ -1856,6 +1905,11 @@ type Decoder struct { // Postings returns a postings list for b and its number of elements. func (dec *Decoder) Postings(b []byte) (int, Postings, error) { d := encoding.Decbuf{B: b} + return dec.PostingsFromDecbuf(d) +} + +// PostingsFromDecbuf returns a postings list for d and its number of elements. +func (dec *Decoder) PostingsFromDecbuf(d encoding.Decbuf) (int, Postings, error) { n := d.Be32int() l := d.Get() if d.Err() != nil { diff --git a/tsdb/index/postings.go b/tsdb/index/postings.go index 61a5560ee..136b3441e 100644 --- a/tsdb/index/postings.go +++ b/tsdb/index/postings.go @@ -397,6 +397,35 @@ func (p *MemPostings) addFor(id storage.SeriesRef, l labels.Label) { } } +func (p *MemPostings) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool) Postings { + p.mtx.RLock() + + e := p.m[name] + if len(e) == 0 { + p.mtx.RUnlock() + return EmptyPostings() + } + + // Benchmarking shows that first copying the values into a slice and then matching over that is + // faster than matching over the map keys directly, at least on AMD64. + vals := make([]string, 0, len(e)) + for v, srs := range e { + if len(srs) > 0 { + vals = append(vals, v) + } + } + + var its []Postings + for _, v := range vals { + if match(v) { + its = append(its, NewListPostings(e[v])) + } + } + p.mtx.RUnlock() + + return Merge(ctx, its...) +} + // ExpandPostings returns the postings expanded as a slice. func ExpandPostings(p Postings) (res []storage.SeriesRef, err error) { for p.Next() { diff --git a/tsdb/ooo_head_read.go b/tsdb/ooo_head_read.go index ed0b3fd22..af431d678 100644 --- a/tsdb/ooo_head_read.go +++ b/tsdb/ooo_head_read.go @@ -446,6 +446,10 @@ func (ir *OOOCompactionHeadIndexReader) Postings(_ context.Context, name string, return index.NewListPostings(ir.ch.postings), nil } +func (ir *OOOCompactionHeadIndexReader) PostingsForLabelMatching(context.Context, string, func(string) bool) index.Postings { + return index.ErrPostings(errors.New("not supported")) +} + func (ir *OOOCompactionHeadIndexReader) SortedPostings(p index.Postings) index.Postings { // This will already be sorted from the Postings() call above. return p diff --git a/tsdb/querier.go b/tsdb/querier.go index a6763e996..1170493be 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -326,23 +326,8 @@ func postingsForMatcher(ctx context.Context, ix IndexReader, m *labels.Matcher) } } - vals, err := ix.LabelValues(ctx, m.Name) - if err != nil { - return nil, err - } - - res := vals[:0] - for _, val := range vals { - if m.Matches(val) { - res = append(res, val) - } - } - - if len(res) == 0 { - return index.EmptyPostings(), nil - } - - return ix.Postings(ctx, m.Name, res...) + it := ix.PostingsForLabelMatching(ctx, m.Name, m.Matches) + return it, it.Err() } // inversePostingsForMatcher returns the postings for the series with the label name set but not matching the matcher. diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index a293a983d..16de6373d 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -2326,6 +2326,16 @@ func (m mockIndex) SortedPostings(p index.Postings) index.Postings { return index.NewListPostings(ep) } +func (m mockIndex) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool) index.Postings { + var res []index.Postings + for l, srs := range m.postings { + if l.Name == name && match(l.Value) { + res = append(res, index.NewListPostings(srs)) + } + } + return index.Merge(ctx, res...) +} + func (m mockIndex) ShardedPostings(p index.Postings, shardIndex, shardCount uint64) index.Postings { out := make([]storage.SeriesRef, 0, 128) @@ -3238,6 +3248,10 @@ func (m mockMatcherIndex) LabelNames(context.Context, ...*labels.Matcher) ([]str return []string{}, nil } +func (m mockMatcherIndex) PostingsForLabelMatching(context.Context, string, func(string) bool) index.Postings { + return index.ErrPostings(fmt.Errorf("PostingsForLabelMatching called")) +} + func TestPostingsForMatcher(t *testing.T) { ctx := context.Background()