Merge remote-tracking branch 'prometheus/main' into arve/wlog-histograms
This commit is contained in:
commit f094dd50e0
CHANGELOG.md
@@ -5,6 +5,7 @@
 * [CHANGE] Rules: Execute 1 query instead of N (where N is the number of alerts within alert rule) when restoring alerts. #13980
 * [ENHANCEMENT] Rules: Add `rule_group_last_restore_duration_seconds` to measure the time it takes to restore a rule group. #13974
 * [ENHANCEMENT] OTLP: Improve remote write format translation performance by using label set hashes for metric identifiers instead of string based ones. #14006 #13991
+* [ENHANCEMENT] TSDB: Optimize querying with regexp matchers. #13620
 * [BUGFIX] OTLP: Don't generate target_info unless at least one identifying label is defined. #13991
 * [BUGFIX] OTLP: Don't generate target_info unless there are metrics. #13991
 * [BUGFIX] tsdb/wlog.Watcher.readSegmentForGC: Only count unknown record types against record_decode_failures_total metric. #14042
tsdb/block.go
@@ -77,6 +77,10 @@ type IndexReader interface {
 	// during background garbage collections.
 	Postings(ctx context.Context, name string, values ...string) (index.Postings, error)
 
+	// PostingsForLabelMatching returns a sorted iterator over postings having a label with the given name and a value for which match returns true.
+	// If no postings are found having at least one matching label, an empty iterator is returned.
+	PostingsForLabelMatching(ctx context.Context, name string, match func(value string) bool) index.Postings
+
 	// SortedPostings returns a postings list that is reordered to be sorted
 	// by the label set of the underlying series.
 	SortedPostings(index.Postings) index.Postings
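
Review note: to illustrate the contract of the new IndexReader method, here is a minimal, self-contained sketch (not part of this diff) that exercises PostingsForLabelMatching through the in-memory MemPostings implementation added further down. The series refs and label values are made up for the example.

package main

import (
	"context"
	"fmt"
	"strings"

	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/tsdb/index"
)

func main() {
	mp := index.NewMemPostings()
	mp.Add(1, labels.FromStrings("n", "1"))
	mp.Add(2, labels.FromStrings("n", "1", "i", "a"))
	mp.Add(3, labels.FromStrings("n", "2"))

	// Collect all series whose "n" value starts with "1".
	p := mp.PostingsForLabelMatching(context.Background(), "n", func(v string) bool {
		return strings.HasPrefix(v, "1")
	})
	refs, err := index.ExpandPostings(p)
	if err != nil {
		panic(err)
	}
	fmt.Println(refs) // [1 2]: a sorted, merged iterator over all matching values

	// A name with no matching value yields an empty, non-nil iterator.
	none := mp.PostingsForLabelMatching(context.Background(), "missing", func(string) bool { return true })
	fmt.Println(none.Next()) // false
}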
tsdb/block.go
@@ -518,6 +522,10 @@ func (r blockIndexReader) Postings(ctx context.Context, name string, values ...string) (index.Postings, error) {
 	return p, nil
 }
 
+func (r blockIndexReader) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool) index.Postings {
+	return r.ir.PostingsForLabelMatching(ctx, name, match)
+}
+
 func (r blockIndexReader) SortedPostings(p index.Postings) index.Postings {
 	return r.ir.SortedPostings(p)
 }
tsdb/block_test.go
@@ -36,6 +36,7 @@ import (
 	"github.com/prometheus/prometheus/tsdb/chunkenc"
 	"github.com/prometheus/prometheus/tsdb/chunks"
 	"github.com/prometheus/prometheus/tsdb/fileutil"
+	"github.com/prometheus/prometheus/tsdb/index"
 	"github.com/prometheus/prometheus/tsdb/wlog"
 )
 
tsdb/block_test.go
@@ -509,6 +510,86 @@ func TestLabelNamesWithMatchers(t *testing.T) {
 	}
 }
 
+func TestBlockIndexReader_PostingsForLabelMatching(t *testing.T) {
+	testPostingsForLabelMatching(t, 2, func(t *testing.T, series []labels.Labels) IndexReader {
+		var seriesEntries []storage.Series
+		for _, s := range series {
+			seriesEntries = append(seriesEntries, storage.NewListSeries(s, []chunks.Sample{sample{100, 0, nil, nil}}))
+		}
+
+		blockDir := createBlock(t, t.TempDir(), seriesEntries)
+		files, err := sequenceFiles(chunkDir(blockDir))
+		require.NoError(t, err)
+		require.NotEmpty(t, files, "No chunk created.")
+
+		block, err := OpenBlock(nil, blockDir, nil)
+		require.NoError(t, err)
+		t.Cleanup(func() { require.NoError(t, block.Close()) })
+
+		ir, err := block.Index()
+		require.NoError(t, err)
+		return ir
+	})
+}
+
+func testPostingsForLabelMatching(t *testing.T, offset storage.SeriesRef, setUp func(*testing.T, []labels.Labels) IndexReader) {
+	t.Helper()
+
+	ctx := context.Background()
+	series := []labels.Labels{
+		labels.FromStrings("n", "1"),
+		labels.FromStrings("n", "1", "i", "a"),
+		labels.FromStrings("n", "1", "i", "b"),
+		labels.FromStrings("n", "2"),
+		labels.FromStrings("n", "2.5"),
+	}
+	ir := setUp(t, series)
+	t.Cleanup(func() {
+		require.NoError(t, ir.Close())
+	})
+
+	testCases := []struct {
+		name      string
+		labelName string
+		match     func(string) bool
+		exp       []storage.SeriesRef
+	}{
+		{
+			name:      "n=1",
+			labelName: "n",
+			match: func(val string) bool {
+				return val == "1"
+			},
+			exp: []storage.SeriesRef{offset + 1, offset + 2, offset + 3},
+		},
+		{
+			name:      "n=2",
+			labelName: "n",
+			match: func(val string) bool {
+				return val == "2"
+			},
+			exp: []storage.SeriesRef{offset + 4},
+		},
+		{
+			name:      "missing label",
+			labelName: "missing",
+			match: func(val string) bool {
+				return true
+			},
+			exp: nil,
+		},
+	}
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			p := ir.PostingsForLabelMatching(ctx, tc.labelName, tc.match)
+			require.NotNil(t, p)
+			srs, err := index.ExpandPostings(p)
+			require.NoError(t, err)
+			require.Equal(t, tc.exp, srs)
+		})
+	}
+}
+
 // createBlock creates a block with given set of series and returns its dir.
 func createBlock(tb testing.TB, dir string, series []storage.Series) string {
 	blockDir, err := CreateBlock(series, dir, 0, log.NewNopLogger())
tsdb/head_read.go
@@ -121,6 +121,10 @@ func (h *headIndexReader) Postings(ctx context.Context, name string, values ...string) (index.Postings, error) {
 	}
 }
 
+func (h *headIndexReader) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool) index.Postings {
+	return h.head.postings.PostingsForLabelMatching(ctx, name, match)
+}
+
 func (h *headIndexReader) SortedPostings(p index.Postings) index.Postings {
 	series := make([]*memSeries, 0, 128)
 
tsdb/head_test.go
@@ -14,6 +14,7 @@
 package tsdb
 
 import (
+	"context"
 	"fmt"
 	"sync"
 	"testing"
tsdb/head_test.go
@@ -552,3 +553,25 @@ func TestMemSeries_chunk(t *testing.T) {
 		})
 	}
 }
+
+func TestHeadIndexReader_PostingsForLabelMatching(t *testing.T) {
+	testPostingsForLabelMatching(t, 0, func(t *testing.T, series []labels.Labels) IndexReader {
+		opts := DefaultHeadOptions()
+		opts.ChunkRange = 1000
+		opts.ChunkDirRoot = t.TempDir()
+		h, err := NewHead(nil, nil, nil, nil, opts, nil)
+		require.NoError(t, err)
+		t.Cleanup(func() {
+			require.NoError(t, h.Close())
+		})
+		app := h.Appender(context.Background())
+		for _, s := range series {
+			app.Append(0, s, 0, 0)
+		}
+		require.NoError(t, app.Commit())
+
+		ir, err := h.Index()
+		require.NoError(t, err)
+		return ir
+	})
+}
tsdb/index/index.go
@@ -1536,36 +1536,14 @@ func (r *Reader) LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, error) {
 	if len(e) == 0 {
 		return nil, nil
 	}
 
 	values := make([]string, 0, len(e)*symbolFactor)
 
-	d := encoding.NewDecbufAt(r.b, int(r.toc.PostingsTable), nil)
-	d.Skip(e[0].off)
 	lastVal := e[len(e)-1].value
-	skip := 0
-	for d.Err() == nil && ctx.Err() == nil {
-		if skip == 0 {
-			// These are always the same number of bytes,
-			// and it's faster to skip than parse.
-			skip = d.Len()
-			d.Uvarint()      // Keycount.
-			d.UvarintBytes() // Label name.
-			skip -= d.Len()
-		} else {
-			d.Skip(skip)
-		}
-		s := yoloString(d.UvarintBytes()) // Label value.
-		values = append(values, s)
-		if s == lastVal {
-			break
-		}
-		d.Uvarint64() // Offset.
-	}
-	if d.Err() != nil {
-		return nil, fmt.Errorf("get postings offset entry: %w", d.Err())
-	}
-
-	return values, ctx.Err()
+	err := r.traversePostingOffsets(ctx, e[0].off, func(val string, _ uint64) (bool, error) {
+		values = append(values, val)
+		return val != lastVal, nil
+	})
+	return values, err
 }
 
 // LabelNamesFor returns all the label names for the series referred to by IDs.
tsdb/index/index.go
@@ -1662,6 +1640,44 @@ func (r *Reader) Series(id storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error {
 	return nil
 }
 
+// traversePostingOffsets traverses r's posting offsets table, starting at off, and calls cb with every label value and postings offset.
+// If cb returns false (or an error), the traversing is interrupted.
+func (r *Reader) traversePostingOffsets(ctx context.Context, off int, cb func(string, uint64) (bool, error)) error {
+	// Don't Crc32 the entire postings offset table, this is very slow
+	// so hope any issues were caught at startup.
+	d := encoding.NewDecbufAt(r.b, int(r.toc.PostingsTable), nil)
+	d.Skip(off)
+	skip := 0
+	ctxErr := ctx.Err()
+	for d.Err() == nil && ctxErr == nil {
+		if skip == 0 {
+			// These are always the same number of bytes,
+			// and it's faster to skip than to parse.
+			skip = d.Len()
+			d.Uvarint()      // Keycount.
+			d.UvarintBytes() // Label name.
+			skip -= d.Len()
+		} else {
+			d.Skip(skip)
+		}
+		v := yoloString(d.UvarintBytes()) // Label value.
+		postingsOff := d.Uvarint64()      // Offset.
+		if ok, err := cb(v, postingsOff); err != nil {
+			return err
+		} else if !ok {
+			break
+		}
+		ctxErr = ctx.Err()
+	}
+	if d.Err() != nil {
+		return fmt.Errorf("get postings offset entry: %w", d.Err())
+	}
+	if ctxErr != nil {
+		return fmt.Errorf("get postings offset entry: %w", ctxErr)
+	}
+	return nil
+}
+
 func (r *Reader) Postings(ctx context.Context, name string, values ...string) (Postings, error) {
 	if r.version == FormatV1 {
 		e, ok := r.postingsV1[name]
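
Review note: traversePostingOffsets uses a continue/stop/error callback protocol: the callback returns (true, nil) to keep walking the offset table, (false, nil) to stop early (for example once the last value of interest has been seen), and a non-nil error to abort the walk. Below is a standalone sketch of that control flow, with hypothetical names and data, not part of this diff.

package main

import (
	"errors"
	"fmt"
)

// traverse mirrors the control flow of Reader.traversePostingOffsets,
// walking a plain slice instead of the on-disk offset table.
func traverse(vals []string, cb func(string) (bool, error)) error {
	for _, v := range vals {
		ok, err := cb(v)
		if err != nil {
			return err
		}
		if !ok {
			break // the callback asked to stop early; this is not an error
		}
	}
	return nil
}

func main() {
	vals := []string{"a", "b", "c", "d"}
	lastVal := "c"

	var seen []string
	err := traverse(vals, func(v string) (bool, error) {
		seen = append(seen, v)
		return v != lastVal, nil // stop once the last interesting value is reached
	})
	fmt.Println(seen, err) // [a b c] <nil>

	err = traverse(vals, func(v string) (bool, error) {
		return false, errors.New("decode postings: boom")
	})
	fmt.Println(err) // decode postings: boom
}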
tsdb/index/index.go
@@ -1696,7 +1712,6 @@ func (r *Reader) Postings(ctx context.Context, name string, values ...string) (Postings, error) {
 
 	slices.Sort(values) // Values must be in order so we can step through the table on disk.
 	res := make([]Postings, 0, len(values))
-	skip := 0
 	valueIndex := 0
 	for valueIndex < len(values) && values[valueIndex] < e[0].value {
 		// Discard values before the start.
tsdb/index/index.go
@@ -1714,33 +1729,15 @@ func (r *Reader) Postings(ctx context.Context, name string, values ...string) (Postings, error) {
 			// Need to look from previous entry.
 			i--
 		}
-		// Don't Crc32 the entire postings offset table, this is very slow
-		// so hope any issues were caught at startup.
-		d := encoding.NewDecbufAt(r.b, int(r.toc.PostingsTable), nil)
-		d.Skip(e[i].off)
-
-		// Iterate on the offset table.
-		var postingsOff uint64 // The offset into the postings table.
-		for d.Err() == nil && ctx.Err() == nil {
-			if skip == 0 {
-				// These are always the same number of bytes,
-				// and it's faster to skip than parse.
-				skip = d.Len()
-				d.Uvarint()      // Keycount.
-				d.UvarintBytes() // Label name.
-				skip -= d.Len()
-			} else {
-				d.Skip(skip)
-			}
-			v := d.UvarintBytes()       // Label value.
-			postingsOff = d.Uvarint64() // Offset.
-			for string(v) >= value {
-				if string(v) == value {
+		if err := r.traversePostingOffsets(ctx, e[i].off, func(val string, postingsOff uint64) (bool, error) {
+			for val >= value {
+				if val == value {
 					// Read from the postings table.
 					d2 := encoding.NewDecbufAt(r.b, int(postingsOff), castagnoliTable)
 					_, p, err := r.dec.Postings(d2.Get())
 					if err != nil {
-						return nil, fmt.Errorf("decode postings: %w", err)
+						return false, fmt.Errorf("decode postings: %w", err)
 					}
 					res = append(res, p)
 				}
tsdb/index/index.go
@@ -1752,20 +1749,72 @@ func (r *Reader) Postings(ctx context.Context, name string, values ...string) (Postings, error) {
 			}
 			if i+1 == len(e) || value >= e[i+1].value || valueIndex == len(values) {
 				// Need to go to a later postings offset entry, if there is one.
-				break
+				return false, nil
 			}
-		}
-		if d.Err() != nil {
-			return nil, fmt.Errorf("get postings offset entry: %w", d.Err())
-		}
-		if ctx.Err() != nil {
-			return nil, fmt.Errorf("get postings offset entry: %w", ctx.Err())
+			return true, nil
+		}); err != nil {
+			return nil, err
 		}
 	}
 
 	return Merge(ctx, res...), nil
 }
 
+func (r *Reader) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool) Postings {
+	if r.version == FormatV1 {
+		return r.postingsForLabelMatchingV1(ctx, name, match)
+	}
+
+	e := r.postings[name]
+	if len(e) == 0 {
+		return EmptyPostings()
+	}
+
+	lastVal := e[len(e)-1].value
+	var its []Postings
+	if err := r.traversePostingOffsets(ctx, e[0].off, func(val string, postingsOff uint64) (bool, error) {
+		if match(val) {
+			// We want this postings iterator since the value is a match
+			postingsDec := encoding.NewDecbufAt(r.b, int(postingsOff), castagnoliTable)
+			_, p, err := r.dec.PostingsFromDecbuf(postingsDec)
+			if err != nil {
+				return false, fmt.Errorf("decode postings: %w", err)
+			}
+			its = append(its, p)
+		}
+		return val != lastVal, nil
+	}); err != nil {
+		return ErrPostings(err)
+	}
+
+	return Merge(ctx, its...)
+}
+
+func (r *Reader) postingsForLabelMatchingV1(ctx context.Context, name string, match func(string) bool) Postings {
+	e := r.postingsV1[name]
+	if len(e) == 0 {
+		return EmptyPostings()
+	}
+
+	var its []Postings
+	for val, offset := range e {
+		if !match(val) {
+			continue
+		}
+
+		// Read from the postings table.
+		d := encoding.NewDecbufAt(r.b, int(offset), castagnoliTable)
+		_, p, err := r.dec.PostingsFromDecbuf(d)
+		if err != nil {
+			return ErrPostings(fmt.Errorf("decode postings: %w", err))
+		}
+
+		its = append(its, p)
+	}
+
+	return Merge(ctx, its...)
+}
+
 // SortedPostings returns the given postings list reordered so that the backing series
 // are sorted.
 func (r *Reader) SortedPostings(p Postings) Postings {
tsdb/index/index.go
@@ -1856,6 +1905,11 @@ type Decoder struct {
 // Postings returns a postings list for b and its number of elements.
 func (dec *Decoder) Postings(b []byte) (int, Postings, error) {
 	d := encoding.Decbuf{B: b}
+	return dec.PostingsFromDecbuf(d)
+}
+
+// PostingsFromDecbuf returns a postings list for d and its number of elements.
+func (dec *Decoder) PostingsFromDecbuf(d encoding.Decbuf) (int, Postings, error) {
 	n := d.Be32int()
 	l := d.Get()
 	if d.Err() != nil {
tsdb/index/postings.go
@@ -397,6 +397,35 @@ func (p *MemPostings) addFor(id storage.SeriesRef, l labels.Label) {
 	}
 }
 
+func (p *MemPostings) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool) Postings {
+	p.mtx.RLock()
+
+	e := p.m[name]
+	if len(e) == 0 {
+		p.mtx.RUnlock()
+		return EmptyPostings()
+	}
+
+	// Benchmarking shows that first copying the values into a slice and then matching over that is
+	// faster than matching over the map keys directly, at least on AMD64.
+	vals := make([]string, 0, len(e))
+	for v, srs := range e {
+		if len(srs) > 0 {
+			vals = append(vals, v)
+		}
+	}
+
+	var its []Postings
+	for _, v := range vals {
+		if match(v) {
+			its = append(its, NewListPostings(e[v]))
+		}
+	}
+	p.mtx.RUnlock()
+
+	return Merge(ctx, its...)
+}
+
 // ExpandPostings returns the postings expanded as a slice.
 func ExpandPostings(p Postings) (res []storage.SeriesRef, err error) {
 	for p.Next() {
tsdb/ooo_head_read.go
@@ -446,6 +446,10 @@ func (ir *OOOCompactionHeadIndexReader) Postings(_ context.Context, name string, values ...string) (index.Postings, error) {
 	return index.NewListPostings(ir.ch.postings), nil
 }
 
+func (ir *OOOCompactionHeadIndexReader) PostingsForLabelMatching(context.Context, string, func(string) bool) index.Postings {
+	return index.ErrPostings(errors.New("not supported"))
+}
+
 func (ir *OOOCompactionHeadIndexReader) SortedPostings(p index.Postings) index.Postings {
 	// This will already be sorted from the Postings() call above.
 	return p
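
Review note: PostingsForLabelMatching has no separate error return value, so implementations report failure in-band via an error iterator, as the "not supported" stub above does. A minimal sketch of how a caller observes such a failure (illustrative only):

package main

import (
	"errors"
	"fmt"

	"github.com/prometheus/prometheus/tsdb/index"
)

func main() {
	// An error postings iterator yields no items and surfaces the error via Err.
	p := index.ErrPostings(errors.New("not supported"))
	fmt.Println(p.Next()) // false
	fmt.Println(p.Err())  // not supported
}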
tsdb/querier.go
@@ -326,23 +326,8 @@ func postingsForMatcher(ctx context.Context, ix IndexReader, m *labels.Matcher) (index.Postings, error) {
 		}
 	}
 
-	vals, err := ix.LabelValues(ctx, m.Name)
-	if err != nil {
-		return nil, err
-	}
-
-	res := vals[:0]
-	for _, val := range vals {
-		if m.Matches(val) {
-			res = append(res, val)
-		}
-	}
-
-	if len(res) == 0 {
-		return index.EmptyPostings(), nil
-	}
-
-	return ix.Postings(ctx, m.Name, res...)
+	it := ix.PostingsForLabelMatching(ctx, m.Name, m.Matches)
+	return it, it.Err()
 }
 
 // inversePostingsForMatcher returns the postings for the series with the label name set but not matching the matcher.
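
Review note: the simplification above works because a matcher's Matches method already has the func(string) bool shape the new index method expects, so regexp and other matchers are pushed down to the index instead of materializing every label value first. A hedged sketch of the same flow against the in-memory index (MemPostings stands in for a real block or head index here):

package main

import (
	"context"
	"fmt"

	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/tsdb/index"
)

func main() {
	mp := index.NewMemPostings()
	mp.Add(1, labels.FromStrings("n", "1"))
	mp.Add(2, labels.FromStrings("n", "2"))
	mp.Add(3, labels.FromStrings("n", "2.5"))

	// Prometheus regexp matchers are fully anchored: "2.*" matches "2" and "2.5".
	m := labels.MustNewMatcher(labels.MatchRegexp, "n", "2.*")

	// The essence of the new postingsForMatcher fast path.
	it := mp.PostingsForLabelMatching(context.Background(), m.Name, m.Matches)
	refs, err := index.ExpandPostings(it)
	fmt.Println(refs, err) // [2 3] <nil>
}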
tsdb/querier_test.go
@@ -2326,6 +2326,16 @@ func (m mockIndex) SortedPostings(p index.Postings) index.Postings {
 	return index.NewListPostings(ep)
 }
 
+func (m mockIndex) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool) index.Postings {
+	var res []index.Postings
+	for l, srs := range m.postings {
+		if l.Name == name && match(l.Value) {
+			res = append(res, index.NewListPostings(srs))
+		}
+	}
+	return index.Merge(ctx, res...)
+}
+
 func (m mockIndex) ShardedPostings(p index.Postings, shardIndex, shardCount uint64) index.Postings {
 	out := make([]storage.SeriesRef, 0, 128)
 
tsdb/querier_test.go
@@ -3238,6 +3248,10 @@ func (m mockMatcherIndex) LabelNames(context.Context, ...*labels.Matcher) ([]string, error) {
 	return []string{}, nil
 }
 
+func (m mockMatcherIndex) PostingsForLabelMatching(context.Context, string, func(string) bool) index.Postings {
+	return index.ErrPostings(fmt.Errorf("PostingsForLabelMatching called"))
+}
+
 func TestPostingsForMatcher(t *testing.T) {
 	ctx := context.Background()
 