Merge remote-tracking branch 'prometheus/main' into arve/wlog-histograms

Arve Knudsen 2024-05-08 15:06:41 +02:00
commit f1e4c33943
14 changed files with 234 additions and 89 deletions

View File

@ -16,6 +16,7 @@ package labels
import (
"slices"
"strings"
"unicode/utf8"
"github.com/grafana/regexp"
"github.com/grafana/regexp/syntax"
@ -827,8 +828,7 @@ type zeroOrOneCharacterStringMatcher struct {
}
func (m *zeroOrOneCharacterStringMatcher) Matches(s string) bool {
// Zero or one.
if len(s) > 1 {
if moreThanOneRune(s) {
return false
}
@ -840,6 +840,27 @@ func (m *zeroOrOneCharacterStringMatcher) Matches(s string) bool {
return s[0] != '\n'
}
// moreThanOneRune returns true if there is more than one rune in the string.
// It doesn't check whether the string is valid UTF-8.
// The return value should always equal utf8.RuneCountInString(s) > 1,
// but the function is optimized for the common case where the string prefix is ASCII.
func moreThanOneRune(s string) bool {
// If len(s) is exactly one or zero, there can't be more than one rune.
// Exit through this path quickly.
if len(s) <= 1 {
return false
}
// There are at least two bytes:
// If the first byte is ASCII, the bytes after it guarantee more than one rune.
if s[0] < utf8.RuneSelf {
return len(s) > 1
}
// Less common case: first is a multibyte rune.
return utf8.RuneCountInString(s) > 1
}
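
The helper keeps the common ASCII case on a fast path: once the length check rules out zero- and one-byte strings, an ASCII first byte plus at least one more byte already implies more than one rune, and only strings with a multibyte prefix pay for a full utf8.RuneCountInString scan. A minimal property check of the documented equivalence, assuming the helper above (the test name and cases are illustrative, not part of this commit):

```go
package labels

import (
	"testing"
	"unicode/utf8"

	"github.com/stretchr/testify/require"
)

// Illustrative sketch: moreThanOneRune must agree with the naive rune count
// on empty, single-byte, ASCII, and multibyte-prefixed inputs.
func TestMoreThanOneRuneEquivalence(t *testing.T) {
	for _, s := range []string{"", "f", "fo", "\n", "😀", "😀foo0", "日本語"} {
		require.Equal(t, utf8.RuneCountInString(s) > 1, moreThanOneRune(s), "input %q", s)
	}
}
```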
// trueMatcher is a stringMatcher which matches any string (always returns true).
type trueMatcher struct{}

View File

@ -84,7 +84,7 @@ var (
"foo", " foo bar", "bar", "buzz\nbar", "bar foo", "bfoo", "\n", "\nfoo", "foo\n", "hello foo world", "hello foo\n world", "",
"FOO", "Foo", "OO", "Oo", "\nfoo\n", strings.Repeat("f", 20), "prometheus", "prometheus_api_v1", "prometheus_api_v1_foo",
"10.0.1.20", "10.0.2.10", "10.0.3.30", "10.0.4.40",
"foofoo0", "foofoo",
"foofoo0", "foofoo", "😀foo0",
// Values matching / not matching the test regexps on long alternations.
"zQPbMkNO", "zQPbMkNo", "jyyfj00j0061", "jyyfj00j006", "jyyfj00j00612", "NNSPdvMi", "NNSPdvMiXXX", "NNSPdvMixxx", "nnSPdvMi", "nnSPdvMiXXX",

View File

@ -2024,25 +2024,21 @@ func (ev *evaluator) rangeEvalTimestampFunctionOverVectorSelector(vs *parser.Vec
vec := make(Vector, 0, len(vs.Series))
for i, s := range vs.Series {
it := seriesIterators[i]
t, f, h, ok := ev.vectorSelectorSingle(it, vs, enh.Ts)
if ok {
vec = append(vec, Sample{
Metric: s.Labels(),
T: t,
F: f,
H: h,
})
histSize := 0
if h != nil {
histSize := h.Size() / 16 // 16 bytes per sample.
ev.currentSamples += histSize
}
ev.currentSamples++
t, _, _, ok := ev.vectorSelectorSingle(it, vs, enh.Ts)
if !ok {
continue
}
ev.samplesStats.IncrementSamplesAtTimestamp(enh.Ts, int64(1+histSize))
if ev.currentSamples > ev.maxSamples {
ev.error(ErrTooManySamples(env))
}
// Note that we ignore the sample values because the call only cares about the timestamp.
vec = append(vec, Sample{
Metric: s.Labels(),
T: t,
})
ev.currentSamples++
ev.samplesStats.IncrementSamplesAtTimestamp(enh.Ts, 1)
if ev.currentSamples > ev.maxSamples {
ev.error(ErrTooManySamples(env))
}
}
ev.samplesStats.UpdatePeak(ev.currentSamples)
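
The rewrite flips the happy path into an early continue and changes the sample-cost accounting: since timestamp() discards the sample value, the output Sample carries neither F nor H, and each input series is billed as exactly one sample instead of one plus the histogram's size. (Note the removed code also shadowed histSize inside the if-block, so IncrementSamplesAtTimestamp always saw 1+0.) Roughly, in illustrative helpers that are not part of the commit:

```go
package promql

import "github.com/prometheus/prometheus/model/histogram"

// Sample-budget cost of one series at one evaluation step of timestamp(),
// before and after this change (illustrative, not part of the diff).
func timestampCostOld(h *histogram.FloatHistogram) int {
	if h == nil {
		return 1 // plain float sample
	}
	return 1 + h.Size()/16 // histogram billed by in-memory size, 16 bytes per sample
}

func timestampCostNew(_ *histogram.FloatHistogram) int {
	return 1 // the value is discarded; only the timestamp is kept
}
```

Hence the reduced PeakSamples expectations in the tests below (13, i.e. a histogram billed as 12 plus 1, drops to 2).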

View File

@ -818,8 +818,8 @@ load 10s
{
Query: "timestamp(metricWith1HistogramEvery10Seconds)",
Start: time.Unix(21, 0),
PeakSamples: 13, // histogram size 12 + 1 extra because of timestamp
TotalSamples: 1, // 1 float sample (because of timestamp) / 10 seconds
PeakSamples: 2,
TotalSamples: 1, // 1 float sample (because of timestamp) / 10 seconds
TotalSamplesPerStep: stats.TotalSamplesPerStep{
21000: 1,
},
@ -1116,7 +1116,7 @@ load 10s
Start: time.Unix(201, 0),
End: time.Unix(220, 0),
Interval: 5 * time.Second,
PeakSamples: 16,
PeakSamples: 5,
TotalSamples: 4, // 1 sample per query * 4 steps
TotalSamplesPerStep: stats.TotalSamplesPerStep{
201000: 1,

View File

@ -38,7 +38,7 @@ type Settings struct {
SendMetadata bool
}
// PrometheusConverter converts from OTel write format to Prometheus write format.
// PrometheusConverter converts from OTel write format to Prometheus remote write format.
type PrometheusConverter struct {
unique map[uint64]*prompb.TimeSeries
conflicts map[uint64][]*prompb.TimeSeries
@ -130,25 +130,6 @@ func (c *PrometheusConverter) FromMetrics(md pmetric.Metrics, settings Settings)
return
}
// TimeSeries returns a slice of the prompb.TimeSeries that were converted from OTel format.
func (c *PrometheusConverter) TimeSeries() []prompb.TimeSeries {
conflicts := 0
for _, ts := range c.conflicts {
conflicts += len(ts)
}
allTS := make([]prompb.TimeSeries, 0, len(c.unique)+conflicts)
for _, ts := range c.unique {
allTS = append(allTS, *ts)
}
for _, cTS := range c.conflicts {
for _, ts := range cTS {
allTS = append(allTS, *ts)
}
}
return allTS
}
func isSameMetric(ts *prompb.TimeSeries, lbls []prompb.Label) bool {
if len(ts.Labels) != len(lbls) {
return false

View File

@ -64,7 +64,7 @@ func BenchmarkPrometheusConverter_FromMetrics(b *testing.B) {
}
}
func createExportRequest(resourceAttributeCount int, histogramCount int, nonHistogramCount int, labelsPerMetric int, exemplarsPerSeries int) pmetricotlp.ExportRequest {
func createExportRequest(resourceAttributeCount, histogramCount, nonHistogramCount, labelsPerMetric, exemplarsPerSeries int) pmetricotlp.ExportRequest {
request := pmetricotlp.NewExportRequest()
rm := request.Metrics().ResourceMetrics().AppendEmpty()

View File

@ -0,0 +1,41 @@
// Copyright 2024 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Provenance-includes-location: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/95e8f8fdc2a9dc87230406c9a3cf02be4fd68bea/pkg/translator/prometheusremotewrite/metrics_to_prw.go
// Provenance-includes-license: Apache-2.0
// Provenance-includes-copyright: Copyright The OpenTelemetry Authors.
package prometheusremotewrite
import (
"github.com/prometheus/prometheus/prompb"
)
// TimeSeries returns a slice of the prompb.TimeSeries that were converted from OTel format.
func (c *PrometheusConverter) TimeSeries() []prompb.TimeSeries {
conflicts := 0
for _, ts := range c.conflicts {
conflicts += len(ts)
}
allTS := make([]prompb.TimeSeries, 0, len(c.unique)+conflicts)
for _, ts := range c.unique {
allTS = append(allTS, *ts)
}
for _, cTS := range c.conflicts {
for _, ts := range cTS {
allTS = append(allTS, *ts)
}
}
return allTS
}
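
TimeSeries itself is unchanged; it only moves out of the converter file into this new one, hence the provenance header. For orientation, a sketch of the full converter round trip, assuming the package's NewPrometheusConverter constructor and an error-returning FromMetrics as suggested by the hunk above (the wrapper function is illustrative):

```go
package prometheusremotewrite

import (
	"github.com/prometheus/prometheus/prompb"
	"go.opentelemetry.io/collector/pdata/pmetric"
)

// convert is an illustrative wrapper, not part of the commit.
func convert(md pmetric.Metrics, settings Settings) ([]prompb.TimeSeries, error) {
	c := NewPrometheusConverter()
	if err := c.FromMetrics(md, settings); err != nil {
		return nil, err
	}
	// Unique series first, then any hash-conflict series, in one flat slice.
	return c.TimeSeries(), nil
}
```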

View File

@ -52,6 +52,12 @@ type bstream struct {
count uint8 // How many right-most bits are available for writing in the current byte (the last byte of the stream).
}
// Reset resets b to wrap the given stream, clearing any previous write state.
func (b *bstream) Reset(stream []byte) {
b.stream = stream
b.count = 0
}
func (b *bstream) bytes() []byte {
return b.stream
}

View File

@ -19,6 +19,19 @@ import (
"github.com/stretchr/testify/require"
)
func TestBstream_Reset(t *testing.T) {
bs := bstream{
stream: []byte("test"),
count: 10,
}
bs.Reset([]byte("was reset"))
require.Equal(t, bstream{
stream: []byte("was reset"),
count: 0,
}, bs)
}
func TestBstreamReader(t *testing.T) {
// Write to the bit stream.
w := bstream{}

View File

@ -87,6 +87,9 @@ type Chunk interface {
// There's no strong guarantee that no samples will be appended once
// Compact() is called. Implementing this function is optional.
Compact()
// Reset resets the chunk given stream.
Reset(stream []byte)
}
type Iterable interface {
@ -303,64 +306,47 @@ func NewPool() Pool {
}
func (p *pool) Get(e Encoding, b []byte) (Chunk, error) {
var c Chunk
switch e {
case EncXOR:
c := p.xor.Get().(*XORChunk)
c.b.stream = b
c.b.count = 0
return c, nil
c = p.xor.Get().(*XORChunk)
case EncHistogram:
c := p.histogram.Get().(*HistogramChunk)
c.b.stream = b
c.b.count = 0
return c, nil
c = p.histogram.Get().(*HistogramChunk)
case EncFloatHistogram:
c := p.floatHistogram.Get().(*FloatHistogramChunk)
c.b.stream = b
c.b.count = 0
return c, nil
c = p.floatHistogram.Get().(*FloatHistogramChunk)
default:
return nil, fmt.Errorf("invalid chunk encoding %q", e)
}
return nil, fmt.Errorf("invalid chunk encoding %q", e)
c.Reset(b)
return c, nil
}
func (p *pool) Put(c Chunk) error {
var sp *sync.Pool
var ok bool
switch c.Encoding() {
case EncXOR:
xc, ok := c.(*XORChunk)
// This may happen often with wrapped chunks. Nothing we can really do about
// it but returning an error would cause a lot of allocations again. Thus,
// we just skip it.
if !ok {
return nil
}
xc.b.stream = nil
xc.b.count = 0
p.xor.Put(c)
_, ok = c.(*XORChunk)
sp = &p.xor
case EncHistogram:
sh, ok := c.(*HistogramChunk)
// This may happen often with wrapped chunks. Nothing we can really do about
// it but returning an error would cause a lot of allocations again. Thus,
// we just skip it.
if !ok {
return nil
}
sh.b.stream = nil
sh.b.count = 0
p.histogram.Put(c)
_, ok = c.(*HistogramChunk)
sp = &p.histogram
case EncFloatHistogram:
sh, ok := c.(*FloatHistogramChunk)
// This may happen often with wrapped chunks. Nothing we can really do about
// it but returning an error would cause a lot of allocations again. Thus,
// we just skip it.
if !ok {
return nil
}
sh.b.stream = nil
sh.b.count = 0
p.floatHistogram.Put(c)
_, ok = c.(*FloatHistogramChunk)
sp = &p.floatHistogram
default:
return fmt.Errorf("invalid chunk encoding %q", c.Encoding())
}
if !ok {
// This may happen often with wrapped chunks. Nothing we can really do about
// it but returning an error would cause a lot of allocations again. Thus,
// we just skip it.
return nil
}
c.Reset(nil)
sp.Put(c)
return nil
}
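
With Reset on the Chunk interface, Get and Put no longer need per-encoding field fiddling: Get rewraps a pooled chunk around the caller's bytes, and Put clears the stream via Reset(nil) before returning the chunk to its sync.Pool. A minimal round trip of the new contract (illustrative, not part of the commit):

```go
package chunkenc

// poolRoundTrip borrows an XOR chunk wrapping data from the pool and returns it.
func poolRoundTrip(p Pool, data []byte) error {
	c, err := p.Get(EncXOR, data) // pooled *XORChunk with Reset(data) applied
	if err != nil {
		return err
	}
	// ... read samples via c.Iterator(nil) ...
	return p.Put(c) // resets the stream to nil, then pools the chunk
}
```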

View File

@ -110,6 +110,96 @@ func testChunk(t *testing.T, c Chunk) {
require.Equal(t, ValNone, it3.Seek(exp[len(exp)-1].t+1))
}
func TestPool(t *testing.T) {
p := NewPool()
for _, tc := range []struct {
name string
encoding Encoding
expErr error
}{
{
name: "xor",
encoding: EncXOR,
},
{
name: "histogram",
encoding: EncHistogram,
},
{
name: "float histogram",
encoding: EncFloatHistogram,
},
{
name: "invalid encoding",
encoding: EncNone,
expErr: fmt.Errorf(`invalid chunk encoding "none"`),
},
} {
t.Run(tc.name, func(t *testing.T) {
c, err := p.Get(tc.encoding, []byte("test"))
if tc.expErr != nil {
require.EqualError(t, err, tc.expErr.Error())
return
}
require.NoError(t, err)
var b *bstream
switch tc.encoding {
case EncHistogram:
b = &c.(*HistogramChunk).b
case EncFloatHistogram:
b = &c.(*FloatHistogramChunk).b
default:
b = &c.(*XORChunk).b
}
require.Equal(t, &bstream{
stream: []byte("test"),
count: 0,
}, b)
b.count = 1
require.NoError(t, p.Put(c))
require.Equal(t, &bstream{
stream: nil,
count: 0,
}, b)
})
}
t.Run("put bad chunk wrapper", func(t *testing.T) {
// When a wrapped chunk reports an encoding its concrete type can't be converted to, Put should skip it rather than error.
c := fakeChunk{
encoding: EncXOR,
t: t,
}
require.NoError(t, p.Put(c))
})
t.Run("put invalid encoding", func(t *testing.T) {
c := fakeChunk{
encoding: EncNone,
t: t,
}
require.EqualError(t, p.Put(c), `invalid chunk encoding "none"`)
})
}
type fakeChunk struct {
Chunk
encoding Encoding
t *testing.T
}
func (c fakeChunk) Encoding() Encoding {
return c.encoding
}
func (c fakeChunk) Reset([]byte) {
c.t.Fatal("Reset should not be called")
}
func benchmarkIterator(b *testing.B, newChunk func() Chunk) {
const samplesPerChunk = 250
var (

View File

@ -44,6 +44,10 @@ func NewFloatHistogramChunk() *FloatHistogramChunk {
return &FloatHistogramChunk{b: bstream{stream: b, count: 0}}
}
func (c *FloatHistogramChunk) Reset(stream []byte) {
c.b.Reset(stream)
}
// xorValue holds all the necessary information to encode
// and decode XOR encoded float64 values.
type xorValue struct {

View File

@ -45,6 +45,10 @@ func NewHistogramChunk() *HistogramChunk {
return &HistogramChunk{b: bstream{stream: b, count: 0}}
}
func (c *HistogramChunk) Reset(stream []byte) {
c.b.Reset(stream)
}
// Encoding returns the encoding type.
func (c *HistogramChunk) Encoding() Encoding {
return EncHistogram

View File

@ -66,6 +66,10 @@ func NewXORChunk() *XORChunk {
return &XORChunk{b: bstream{stream: b, count: 0}}
}
func (c *XORChunk) Reset(stream []byte) {
c.b.Reset(stream)
}
// Encoding returns the encoding type.
func (c *XORChunk) Encoding() Encoding {
return EncXOR
@ -171,7 +175,6 @@ func (a *xorAppender) Append(t int64, v float64) {
}
a.writeVDelta(v)
default:
tDelta = uint64(t - a.t)
dod := int64(tDelta - a.tDelta)