diff --git a/model/labels/regexp.go b/model/labels/regexp.go index f35dc76f6..79e340984 100644 --- a/model/labels/regexp.go +++ b/model/labels/regexp.go @@ -16,6 +16,7 @@ package labels import ( "slices" "strings" + "unicode/utf8" "github.com/grafana/regexp" "github.com/grafana/regexp/syntax" @@ -827,8 +828,7 @@ type zeroOrOneCharacterStringMatcher struct { } func (m *zeroOrOneCharacterStringMatcher) Matches(s string) bool { - // Zero or one. - if len(s) > 1 { + if moreThanOneRune(s) { return false } @@ -840,6 +840,27 @@ func (m *zeroOrOneCharacterStringMatcher) Matches(s string) bool { return s[0] != '\n' } +// moreThanOneRune returns true if there are more than one runes in the string. +// It doesn't check whether the string is valid UTF-8. +// The return value should be always equal to utf8.RuneCountInString(s) > 1, +// but the function is optimized for the common case where the string prefix is ASCII. +func moreThanOneRune(s string) bool { + // If len(s) is exactly one or zero, there can't be more than one rune. + // Exit through this path quickly. + if len(s) <= 1 { + return false + } + + // There's one or more bytes: + // If first byte is ASCII then there are multiple runes if there are more bytes after that. + if s[0] < utf8.RuneSelf { + return len(s) > 1 + } + + // Less common case: first is a multibyte rune. + return utf8.RuneCountInString(s) > 1 +} + // trueMatcher is a stringMatcher which matches any string (always returns true). type trueMatcher struct{} diff --git a/model/labels/regexp_test.go b/model/labels/regexp_test.go index 3a15b52b4..47d3eeb4a 100644 --- a/model/labels/regexp_test.go +++ b/model/labels/regexp_test.go @@ -84,7 +84,7 @@ var ( "foo", " foo bar", "bar", "buzz\nbar", "bar foo", "bfoo", "\n", "\nfoo", "foo\n", "hello foo world", "hello foo\n world", "", "FOO", "Foo", "OO", "Oo", "\nfoo\n", strings.Repeat("f", 20), "prometheus", "prometheus_api_v1", "prometheus_api_v1_foo", "10.0.1.20", "10.0.2.10", "10.0.3.30", "10.0.4.40", - "foofoo0", "foofoo", + "foofoo0", "foofoo", "😀foo0", // Values matching / not matching the test regexps on long alternations. "zQPbMkNO", "zQPbMkNo", "jyyfj00j0061", "jyyfj00j006", "jyyfj00j00612", "NNSPdvMi", "NNSPdvMiXXX", "NNSPdvMixxx", "nnSPdvMi", "nnSPdvMiXXX", diff --git a/promql/engine.go b/promql/engine.go index b8a8ea095..4bd9d25d7 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -2024,25 +2024,21 @@ func (ev *evaluator) rangeEvalTimestampFunctionOverVectorSelector(vs *parser.Vec vec := make(Vector, 0, len(vs.Series)) for i, s := range vs.Series { it := seriesIterators[i] - t, f, h, ok := ev.vectorSelectorSingle(it, vs, enh.Ts) - if ok { - vec = append(vec, Sample{ - Metric: s.Labels(), - T: t, - F: f, - H: h, - }) - histSize := 0 - if h != nil { - histSize := h.Size() / 16 // 16 bytes per sample. - ev.currentSamples += histSize - } - ev.currentSamples++ + t, _, _, ok := ev.vectorSelectorSingle(it, vs, enh.Ts) + if !ok { + continue + } - ev.samplesStats.IncrementSamplesAtTimestamp(enh.Ts, int64(1+histSize)) - if ev.currentSamples > ev.maxSamples { - ev.error(ErrTooManySamples(env)) - } + // Note that we ignore the sample values because call only cares about the timestamp. + vec = append(vec, Sample{ + Metric: s.Labels(), + T: t, + }) + + ev.currentSamples++ + ev.samplesStats.IncrementSamplesAtTimestamp(enh.Ts, 1) + if ev.currentSamples > ev.maxSamples { + ev.error(ErrTooManySamples(env)) } } ev.samplesStats.UpdatePeak(ev.currentSamples) diff --git a/promql/engine_test.go b/promql/engine_test.go index 0202c15ae..09992c672 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -818,8 +818,8 @@ load 10s { Query: "timestamp(metricWith1HistogramEvery10Seconds)", Start: time.Unix(21, 0), - PeakSamples: 13, // histogram size 12 + 1 extra because of timestamp - TotalSamples: 1, // 1 float sample (because of timestamp) / 10 seconds + PeakSamples: 2, + TotalSamples: 1, // 1 float sample (because of timestamp) / 10 seconds TotalSamplesPerStep: stats.TotalSamplesPerStep{ 21000: 1, }, @@ -1116,7 +1116,7 @@ load 10s Start: time.Unix(201, 0), End: time.Unix(220, 0), Interval: 5 * time.Second, - PeakSamples: 16, + PeakSamples: 5, TotalSamples: 4, // 1 sample per query * 4 steps TotalSamplesPerStep: stats.TotalSamplesPerStep{ 201000: 1, diff --git a/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go b/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go index 2d6aa30a1..65dac99c5 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go @@ -38,7 +38,7 @@ type Settings struct { SendMetadata bool } -// PrometheusConverter converts from OTel write format to Prometheus write format. +// PrometheusConverter converts from OTel write format to Prometheus remote write format. type PrometheusConverter struct { unique map[uint64]*prompb.TimeSeries conflicts map[uint64][]*prompb.TimeSeries @@ -130,25 +130,6 @@ func (c *PrometheusConverter) FromMetrics(md pmetric.Metrics, settings Settings) return } -// TimeSeries returns a slice of the prompb.TimeSeries that were converted from OTel format. -func (c *PrometheusConverter) TimeSeries() []prompb.TimeSeries { - conflicts := 0 - for _, ts := range c.conflicts { - conflicts += len(ts) - } - allTS := make([]prompb.TimeSeries, 0, len(c.unique)+conflicts) - for _, ts := range c.unique { - allTS = append(allTS, *ts) - } - for _, cTS := range c.conflicts { - for _, ts := range cTS { - allTS = append(allTS, *ts) - } - } - - return allTS -} - func isSameMetric(ts *prompb.TimeSeries, lbls []prompb.Label) bool { if len(ts.Labels) != len(lbls) { return false diff --git a/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw_test.go b/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw_test.go index 4797daeec..37ac67774 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw_test.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw_test.go @@ -64,7 +64,7 @@ func BenchmarkPrometheusConverter_FromMetrics(b *testing.B) { } } -func createExportRequest(resourceAttributeCount int, histogramCount int, nonHistogramCount int, labelsPerMetric int, exemplarsPerSeries int) pmetricotlp.ExportRequest { +func createExportRequest(resourceAttributeCount, histogramCount, nonHistogramCount, labelsPerMetric, exemplarsPerSeries int) pmetricotlp.ExportRequest { request := pmetricotlp.NewExportRequest() rm := request.Metrics().ResourceMetrics().AppendEmpty() diff --git a/storage/remote/otlptranslator/prometheusremotewrite/timeseries.go b/storage/remote/otlptranslator/prometheusremotewrite/timeseries.go new file mode 100644 index 000000000..fe973761a --- /dev/null +++ b/storage/remote/otlptranslator/prometheusremotewrite/timeseries.go @@ -0,0 +1,41 @@ +// Copyright 2024 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// Provenance-includes-location: +// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/95e8f8fdc2a9dc87230406c9a3cf02be4fd68bea/pkg/translator/prometheusremotewrite/metrics_to_prw.go +// Provenance-includes-license: Apache-2.0 +// Provenance-includes-copyright: Copyright The OpenTelemetry Authors. + +package prometheusremotewrite + +import ( + "github.com/prometheus/prometheus/prompb" +) + +// TimeSeries returns a slice of the prompb.TimeSeries that were converted from OTel format. +func (c *PrometheusConverter) TimeSeries() []prompb.TimeSeries { + conflicts := 0 + for _, ts := range c.conflicts { + conflicts += len(ts) + } + allTS := make([]prompb.TimeSeries, 0, len(c.unique)+conflicts) + for _, ts := range c.unique { + allTS = append(allTS, *ts) + } + for _, cTS := range c.conflicts { + for _, ts := range cTS { + allTS = append(allTS, *ts) + } + } + + return allTS +} diff --git a/tsdb/chunkenc/bstream.go b/tsdb/chunkenc/bstream.go index 7b17f4686..8cc59f3ea 100644 --- a/tsdb/chunkenc/bstream.go +++ b/tsdb/chunkenc/bstream.go @@ -52,6 +52,12 @@ type bstream struct { count uint8 // How many right-most bits are available for writing in the current byte (the last byte of the stream). } +// Reset resets b around stream. +func (b *bstream) Reset(stream []byte) { + b.stream = stream + b.count = 0 +} + func (b *bstream) bytes() []byte { return b.stream } diff --git a/tsdb/chunkenc/bstream_test.go b/tsdb/chunkenc/bstream_test.go index 66a54bc8e..8ac45ef0b 100644 --- a/tsdb/chunkenc/bstream_test.go +++ b/tsdb/chunkenc/bstream_test.go @@ -19,6 +19,19 @@ import ( "github.com/stretchr/testify/require" ) +func TestBstream_Reset(t *testing.T) { + bs := bstream{ + stream: []byte("test"), + count: 10, + } + bs.Reset([]byte("was reset")) + + require.Equal(t, bstream{ + stream: []byte("was reset"), + count: 0, + }, bs) +} + func TestBstreamReader(t *testing.T) { // Write to the bit stream. w := bstream{} diff --git a/tsdb/chunkenc/chunk.go b/tsdb/chunkenc/chunk.go index 21c41257b..1421f3b39 100644 --- a/tsdb/chunkenc/chunk.go +++ b/tsdb/chunkenc/chunk.go @@ -87,6 +87,9 @@ type Chunk interface { // There's no strong guarantee that no samples will be appended once // Compact() is called. Implementing this function is optional. Compact() + + // Reset resets the chunk given stream. + Reset(stream []byte) } type Iterable interface { @@ -303,64 +306,47 @@ func NewPool() Pool { } func (p *pool) Get(e Encoding, b []byte) (Chunk, error) { + var c Chunk switch e { case EncXOR: - c := p.xor.Get().(*XORChunk) - c.b.stream = b - c.b.count = 0 - return c, nil + c = p.xor.Get().(*XORChunk) case EncHistogram: - c := p.histogram.Get().(*HistogramChunk) - c.b.stream = b - c.b.count = 0 - return c, nil + c = p.histogram.Get().(*HistogramChunk) case EncFloatHistogram: - c := p.floatHistogram.Get().(*FloatHistogramChunk) - c.b.stream = b - c.b.count = 0 - return c, nil + c = p.floatHistogram.Get().(*FloatHistogramChunk) + default: + return nil, fmt.Errorf("invalid chunk encoding %q", e) } - return nil, fmt.Errorf("invalid chunk encoding %q", e) + + c.Reset(b) + return c, nil } func (p *pool) Put(c Chunk) error { + var sp *sync.Pool + var ok bool switch c.Encoding() { case EncXOR: - xc, ok := c.(*XORChunk) - // This may happen often with wrapped chunks. Nothing we can really do about - // it but returning an error would cause a lot of allocations again. Thus, - // we just skip it. - if !ok { - return nil - } - xc.b.stream = nil - xc.b.count = 0 - p.xor.Put(c) + _, ok = c.(*XORChunk) + sp = &p.xor case EncHistogram: - sh, ok := c.(*HistogramChunk) - // This may happen often with wrapped chunks. Nothing we can really do about - // it but returning an error would cause a lot of allocations again. Thus, - // we just skip it. - if !ok { - return nil - } - sh.b.stream = nil - sh.b.count = 0 - p.histogram.Put(c) + _, ok = c.(*HistogramChunk) + sp = &p.histogram case EncFloatHistogram: - sh, ok := c.(*FloatHistogramChunk) - // This may happen often with wrapped chunks. Nothing we can really do about - // it but returning an error would cause a lot of allocations again. Thus, - // we just skip it. - if !ok { - return nil - } - sh.b.stream = nil - sh.b.count = 0 - p.floatHistogram.Put(c) + _, ok = c.(*FloatHistogramChunk) + sp = &p.floatHistogram default: return fmt.Errorf("invalid chunk encoding %q", c.Encoding()) } + if !ok { + // This may happen often with wrapped chunks. Nothing we can really do about + // it but returning an error would cause a lot of allocations again. Thus, + // we just skip it. + return nil + } + + c.Reset(nil) + sp.Put(c) return nil } diff --git a/tsdb/chunkenc/chunk_test.go b/tsdb/chunkenc/chunk_test.go index 9db1bf364..b72492a08 100644 --- a/tsdb/chunkenc/chunk_test.go +++ b/tsdb/chunkenc/chunk_test.go @@ -110,6 +110,96 @@ func testChunk(t *testing.T, c Chunk) { require.Equal(t, ValNone, it3.Seek(exp[len(exp)-1].t+1)) } +func TestPool(t *testing.T) { + p := NewPool() + for _, tc := range []struct { + name string + encoding Encoding + expErr error + }{ + { + name: "xor", + encoding: EncXOR, + }, + { + name: "histogram", + encoding: EncHistogram, + }, + { + name: "float histogram", + encoding: EncFloatHistogram, + }, + { + name: "invalid encoding", + encoding: EncNone, + expErr: fmt.Errorf(`invalid chunk encoding "none"`), + }, + } { + t.Run(tc.name, func(t *testing.T) { + c, err := p.Get(tc.encoding, []byte("test")) + if tc.expErr != nil { + require.EqualError(t, err, tc.expErr.Error()) + return + } + + require.NoError(t, err) + + var b *bstream + switch tc.encoding { + case EncHistogram: + b = &c.(*HistogramChunk).b + case EncFloatHistogram: + b = &c.(*FloatHistogramChunk).b + default: + b = &c.(*XORChunk).b + } + + require.Equal(t, &bstream{ + stream: []byte("test"), + count: 0, + }, b) + + b.count = 1 + require.NoError(t, p.Put(c)) + require.Equal(t, &bstream{ + stream: nil, + count: 0, + }, b) + }) + } + + t.Run("put bad chunk wrapper", func(t *testing.T) { + // When a wrapping chunk poses as an encoding it can't be converted to, Put should skip it. + c := fakeChunk{ + encoding: EncXOR, + t: t, + } + require.NoError(t, p.Put(c)) + }) + t.Run("put invalid encoding", func(t *testing.T) { + c := fakeChunk{ + encoding: EncNone, + t: t, + } + require.EqualError(t, p.Put(c), `invalid chunk encoding "none"`) + }) +} + +type fakeChunk struct { + Chunk + + encoding Encoding + t *testing.T +} + +func (c fakeChunk) Encoding() Encoding { + return c.encoding +} + +func (c fakeChunk) Reset([]byte) { + c.t.Fatal("Reset should not be called") +} + func benchmarkIterator(b *testing.B, newChunk func() Chunk) { const samplesPerChunk = 250 var ( diff --git a/tsdb/chunkenc/float_histogram.go b/tsdb/chunkenc/float_histogram.go index 88d189254..1eed46ca8 100644 --- a/tsdb/chunkenc/float_histogram.go +++ b/tsdb/chunkenc/float_histogram.go @@ -44,6 +44,10 @@ func NewFloatHistogramChunk() *FloatHistogramChunk { return &FloatHistogramChunk{b: bstream{stream: b, count: 0}} } +func (c *FloatHistogramChunk) Reset(stream []byte) { + c.b.Reset(stream) +} + // xorValue holds all the necessary information to encode // and decode XOR encoded float64 values. type xorValue struct { diff --git a/tsdb/chunkenc/histogram.go b/tsdb/chunkenc/histogram.go index cb09eda26..e12aec4dc 100644 --- a/tsdb/chunkenc/histogram.go +++ b/tsdb/chunkenc/histogram.go @@ -45,6 +45,10 @@ func NewHistogramChunk() *HistogramChunk { return &HistogramChunk{b: bstream{stream: b, count: 0}} } +func (c *HistogramChunk) Reset(stream []byte) { + c.b.Reset(stream) +} + // Encoding returns the encoding type. func (c *HistogramChunk) Encoding() Encoding { return EncHistogram diff --git a/tsdb/chunkenc/xor.go b/tsdb/chunkenc/xor.go index 07b923831..9430de396 100644 --- a/tsdb/chunkenc/xor.go +++ b/tsdb/chunkenc/xor.go @@ -66,6 +66,10 @@ func NewXORChunk() *XORChunk { return &XORChunk{b: bstream{stream: b, count: 0}} } +func (c *XORChunk) Reset(stream []byte) { + c.b.Reset(stream) +} + // Encoding returns the encoding type. func (c *XORChunk) Encoding() Encoding { return EncXOR @@ -171,7 +175,6 @@ func (a *xorAppender) Append(t int64, v float64) { } a.writeVDelta(v) - default: tDelta = uint64(t - a.t) dod := int64(tDelta - a.tDelta)