OTLP: Support context cancellation/timeout during translation (#14612)

* OTLP: Support context cancellation/timeout during translation

---------

Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com>
Author: Arve Knudsen, 2024-09-08 17:13:40 +02:00 (committed by GitHub)
Parent: db5e48dc33
Commit: 4fc562f9e7
12 changed files with 239 additions and 30 deletions


@ -4,6 +4,7 @@
* [FEATURE] OTLP receiver: Add new option `otlp.promote_resource_attributes`, for any OTel resource attributes that should be promoted to metric labels. #14200
* [ENHANCEMENT] OTLP receiver: Warn when encountering exponential histograms with zero count and non-zero sum. #14706
* [ENHANCEMENT] OTLP receiver: Interrupt translation on context cancellation/timeout. #14612
* [BUGFIX] tsdb/wlog.Watcher.readSegmentForGC: Only count unknown record types against record_decode_failures_total metric. #14042
## 2.54.1 / 2024-08-27


@ -0,0 +1,37 @@
// Copyright 2024 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package prometheusremotewrite
import "context"
// everyNTimes supports checking the context for an error only every nth call, keeping the check cheap in hot loops.
type everyNTimes struct {
n int
i int
err error
}
// checkContext calls ctx.Err() on every e.n-th invocation; once an error has been observed, it is returned by every subsequent call.
func (e *everyNTimes) checkContext(ctx context.Context) error {
if e.err != nil {
return e.err
}
e.i++
if e.i >= e.n {
e.i = 0
e.err = ctx.Err()
}
return e.err
}
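
For illustration, here is a minimal, self-contained sketch (not part of the diff; the loop, counts, and printed output are hypothetical) of how a hot translation loop would use this helper so that ctx.Err() is only consulted once every n iterations:

package main

import (
	"context"
	"fmt"
)

// everyNTimes mirrors the helper introduced above.
type everyNTimes struct {
	n   int
	i   int
	err error
}

func (e *everyNTimes) checkContext(ctx context.Context) error {
	if e.err != nil {
		return e.err
	}
	e.i++
	if e.i >= e.n {
		e.i = 0
		e.err = ctx.Err()
	}
	return e.err
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // simulate the caller giving up before translation finishes

	e := &everyNTimes{n: 128}
	for i := 0; i < 1_000_000; i++ {
		// Cheap on most iterations; ctx.Err() runs at most once per 128 calls.
		if err := e.checkContext(ctx); err != nil {
			fmt.Printf("stopped after %d iterations: %v\n", i, err)
			return
		}
		// ... translate one data point ...
	}
}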


@ -0,0 +1,40 @@
// Copyright 2024 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package prometheusremotewrite
import (
"context"
"testing"
"github.com/stretchr/testify/require"
)
func TestEveryNTimes(t *testing.T) {
const n = 128
ctx, cancel := context.WithCancel(context.Background())
e := &everyNTimes{
n: n,
}
for i := 0; i < n; i++ {
require.NoError(t, e.checkContext(ctx))
}
cancel()
for i := 0; i < n-1; i++ {
require.NoError(t, e.checkContext(ctx))
}
require.EqualError(t, e.checkContext(ctx), context.Canceled.Error())
// e should remember the error.
require.EqualError(t, e.checkContext(ctx), context.Canceled.Error())
}


@ -17,6 +17,7 @@
package prometheusremotewrite
import (
"context"
"encoding/hex"
"fmt"
"log"
@ -241,9 +242,13 @@ func isValidAggregationTemporality(metric pmetric.Metric) bool {
// with the user defined bucket boundaries of non-exponential OTel histograms.
// However, work is under way to resolve this shortcoming through a feature called native histograms custom buckets:
// https://github.com/prometheus/prometheus/issues/13485.
func (c *PrometheusConverter) addHistogramDataPoints(dataPoints pmetric.HistogramDataPointSlice,
resource pcommon.Resource, settings Settings, baseName string) {
func (c *PrometheusConverter) addHistogramDataPoints(ctx context.Context, dataPoints pmetric.HistogramDataPointSlice,
resource pcommon.Resource, settings Settings, baseName string) error {
for x := 0; x < dataPoints.Len(); x++ {
if err := c.everyN.checkContext(ctx); err != nil {
return err
}
pt := dataPoints.At(x)
timestamp := convertTimeStamp(pt.Timestamp())
baseLabels := createAttributes(resource, pt.Attributes(), settings, nil, false)
@ -284,6 +289,10 @@ func (c *PrometheusConverter) addHistogramDataPoints(dataPoints pmetric.Histogra
// process each bound, based on histograms proto definition, # of buckets = # of explicit bounds + 1
for i := 0; i < pt.ExplicitBounds().Len() && i < pt.BucketCounts().Len(); i++ {
if err := c.everyN.checkContext(ctx); err != nil {
return err
}
bound := pt.ExplicitBounds().At(i)
cumulativeCount += pt.BucketCounts().At(i)
bucket := &prompb.Sample{
@ -312,7 +321,9 @@ func (c *PrometheusConverter) addHistogramDataPoints(dataPoints pmetric.Histogra
ts := c.addSample(infBucket, infLabels)
bucketBounds = append(bucketBounds, bucketBoundsData{ts: ts, bound: math.Inf(1)})
c.addExemplars(pt, bucketBounds)
if err := c.addExemplars(ctx, pt, bucketBounds); err != nil {
return err
}
startTimestamp := pt.StartTimestamp()
if settings.ExportCreatedMetric && startTimestamp != 0 {
@ -320,6 +331,8 @@ func (c *PrometheusConverter) addHistogramDataPoints(dataPoints pmetric.Histogra
c.addTimeSeriesIfNeeded(labels, startTimestamp, pt.Timestamp())
}
}
return nil
}
type exemplarType interface {
@ -327,9 +340,13 @@ type exemplarType interface {
Exemplars() pmetric.ExemplarSlice
}
func getPromExemplars[T exemplarType](pt T) []prompb.Exemplar {
func getPromExemplars[T exemplarType](ctx context.Context, everyN *everyNTimes, pt T) ([]prompb.Exemplar, error) {
promExemplars := make([]prompb.Exemplar, 0, pt.Exemplars().Len())
for i := 0; i < pt.Exemplars().Len(); i++ {
if err := everyN.checkContext(ctx); err != nil {
return nil, err
}
exemplar := pt.Exemplars().At(i)
exemplarRunes := 0
@ -379,7 +396,7 @@ func getPromExemplars[T exemplarType](pt T) []prompb.Exemplar {
promExemplars = append(promExemplars, promExemplar)
}
return promExemplars
return promExemplars, nil
}
// mostRecentTimestampInMetric returns the latest timestamp in a batch of metrics
@ -417,9 +434,13 @@ func mostRecentTimestampInMetric(metric pmetric.Metric) pcommon.Timestamp {
return ts
}
func (c *PrometheusConverter) addSummaryDataPoints(dataPoints pmetric.SummaryDataPointSlice, resource pcommon.Resource,
settings Settings, baseName string) {
func (c *PrometheusConverter) addSummaryDataPoints(ctx context.Context, dataPoints pmetric.SummaryDataPointSlice, resource pcommon.Resource,
settings Settings, baseName string) error {
for x := 0; x < dataPoints.Len(); x++ {
if err := c.everyN.checkContext(ctx); err != nil {
return err
}
pt := dataPoints.At(x)
timestamp := convertTimeStamp(pt.Timestamp())
baseLabels := createAttributes(resource, pt.Attributes(), settings, nil, false)
@ -468,6 +489,8 @@ func (c *PrometheusConverter) addSummaryDataPoints(dataPoints pmetric.SummaryDat
c.addTimeSeriesIfNeeded(createdLabels, startTimestamp, pt.Timestamp())
}
}
return nil
}
// createLabels returns a copy of baseLabels, adding to it the pair model.MetricNameLabel=name.


@ -17,6 +17,7 @@
package prometheusremotewrite
import (
"context"
"testing"
"time"
@ -280,6 +281,7 @@ func TestPrometheusConverter_AddSummaryDataPoints(t *testing.T) {
converter := NewPrometheusConverter()
converter.addSummaryDataPoints(
context.Background(),
metric.Summary().DataPoints(),
pcommon.NewResource(),
Settings{
@ -390,6 +392,7 @@ func TestPrometheusConverter_AddHistogramDataPoints(t *testing.T) {
converter := NewPrometheusConverter()
converter.addHistogramDataPoints(
context.Background(),
metric.Histogram().DataPoints(),
pcommon.NewResource(),
Settings{


@ -17,6 +17,7 @@
package prometheusremotewrite
import (
"context"
"fmt"
"math"
@ -33,10 +34,14 @@ const defaultZeroThreshold = 1e-128
// addExponentialHistogramDataPoints adds OTel exponential histogram data points to the corresponding time series
// as native histogram samples.
func (c *PrometheusConverter) addExponentialHistogramDataPoints(dataPoints pmetric.ExponentialHistogramDataPointSlice,
func (c *PrometheusConverter) addExponentialHistogramDataPoints(ctx context.Context, dataPoints pmetric.ExponentialHistogramDataPointSlice,
resource pcommon.Resource, settings Settings, promName string) (annotations.Annotations, error) {
var annots annotations.Annotations
for x := 0; x < dataPoints.Len(); x++ {
if err := c.everyN.checkContext(ctx); err != nil {
return annots, err
}
pt := dataPoints.At(x)
histogram, ws, err := exponentialToNativeHistogram(pt)
@ -57,15 +62,18 @@ func (c *PrometheusConverter) addExponentialHistogramDataPoints(dataPoints pmetr
ts, _ := c.getOrCreateTimeSeries(lbls)
ts.Histograms = append(ts.Histograms, histogram)
exemplars := getPromExemplars[pmetric.ExponentialHistogramDataPoint](pt)
exemplars, err := getPromExemplars[pmetric.ExponentialHistogramDataPoint](ctx, &c.everyN, pt)
if err != nil {
return annots, err
}
ts.Exemplars = append(ts.Exemplars, exemplars...)
}
return annots, nil
}
// exponentialToNativeHistogram translates OTel Exponential Histogram data point
// to Prometheus Native Histogram.
// exponentialToNativeHistogram translates an OTel Exponential Histogram data point
// to a Prometheus Native Histogram.
func exponentialToNativeHistogram(p pmetric.ExponentialHistogramDataPoint) (prompb.Histogram, annotations.Annotations, error) {
var annots annotations.Annotations
scale := p.Scale()


@ -17,6 +17,7 @@
package prometheusremotewrite
import (
"context"
"fmt"
"testing"
"time"
@ -754,6 +755,7 @@ func TestPrometheusConverter_addExponentialHistogramDataPoints(t *testing.T) {
converter := NewPrometheusConverter()
annots, err := converter.addExponentialHistogramDataPoints(
context.Background(),
metric.ExponentialHistogram().DataPoints(),
pcommon.NewResource(),
Settings{


@ -17,6 +17,7 @@
package prometheusremotewrite
import (
"context"
"errors"
"fmt"
"sort"
@ -44,6 +45,7 @@ type Settings struct {
type PrometheusConverter struct {
unique map[uint64]*prompb.TimeSeries
conflicts map[uint64][]*prompb.TimeSeries
everyN everyNTimes
}
func NewPrometheusConverter() *PrometheusConverter {
@ -54,7 +56,8 @@ func NewPrometheusConverter() *PrometheusConverter {
}
// FromMetrics converts pmetric.Metrics to Prometheus remote write format.
func (c *PrometheusConverter) FromMetrics(md pmetric.Metrics, settings Settings) (annots annotations.Annotations, errs error) {
func (c *PrometheusConverter) FromMetrics(ctx context.Context, md pmetric.Metrics, settings Settings) (annots annotations.Annotations, errs error) {
c.everyN = everyNTimes{n: 128}
resourceMetricsSlice := md.ResourceMetrics()
for i := 0; i < resourceMetricsSlice.Len(); i++ {
resourceMetrics := resourceMetricsSlice.At(i)
@ -68,6 +71,11 @@ func (c *PrometheusConverter) FromMetrics(md pmetric.Metrics, settings Settings)
// TODO: decide if instrumentation library information should be exported as labels
for k := 0; k < metricSlice.Len(); k++ {
if err := c.everyN.checkContext(ctx); err != nil {
errs = multierr.Append(errs, err)
return
}
metric := metricSlice.At(k)
mostRecentTimestamp = max(mostRecentTimestamp, mostRecentTimestampInMetric(metric))
@ -87,21 +95,36 @@ func (c *PrometheusConverter) FromMetrics(md pmetric.Metrics, settings Settings)
errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name()))
break
}
c.addGaugeNumberDataPoints(dataPoints, resource, settings, promName)
if err := c.addGaugeNumberDataPoints(ctx, dataPoints, resource, settings, promName); err != nil {
errs = multierr.Append(errs, err)
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return
}
}
case pmetric.MetricTypeSum:
dataPoints := metric.Sum().DataPoints()
if dataPoints.Len() == 0 {
errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name()))
break
}
c.addSumNumberDataPoints(dataPoints, resource, metric, settings, promName)
if err := c.addSumNumberDataPoints(ctx, dataPoints, resource, metric, settings, promName); err != nil {
errs = multierr.Append(errs, err)
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return
}
}
case pmetric.MetricTypeHistogram:
dataPoints := metric.Histogram().DataPoints()
if dataPoints.Len() == 0 {
errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name()))
break
}
c.addHistogramDataPoints(dataPoints, resource, settings, promName)
if err := c.addHistogramDataPoints(ctx, dataPoints, resource, settings, promName); err != nil {
errs = multierr.Append(errs, err)
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return
}
}
case pmetric.MetricTypeExponentialHistogram:
dataPoints := metric.ExponentialHistogram().DataPoints()
if dataPoints.Len() == 0 {
@ -109,20 +132,31 @@ func (c *PrometheusConverter) FromMetrics(md pmetric.Metrics, settings Settings)
break
}
ws, err := c.addExponentialHistogramDataPoints(
ctx,
dataPoints,
resource,
settings,
promName,
)
annots.Merge(ws)
errs = multierr.Append(errs, err)
if err != nil {
errs = multierr.Append(errs, err)
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return
}
}
case pmetric.MetricTypeSummary:
dataPoints := metric.Summary().DataPoints()
if dataPoints.Len() == 0 {
errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name()))
break
}
c.addSummaryDataPoints(dataPoints, resource, settings, promName)
if err := c.addSummaryDataPoints(ctx, dataPoints, resource, settings, promName); err != nil {
errs = multierr.Append(errs, err)
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return
}
}
default:
errs = multierr.Append(errs, errors.New("unsupported metric type"))
}
@ -148,25 +182,33 @@ func isSameMetric(ts *prompb.TimeSeries, lbls []prompb.Label) bool {
// addExemplars adds exemplars for the dataPoint. For each exemplar, if it can find a bucket bound corresponding to its value,
// the exemplar is added to the bucket bound's time series, provided that the time series has samples.
func (c *PrometheusConverter) addExemplars(dataPoint pmetric.HistogramDataPoint, bucketBounds []bucketBoundsData) {
func (c *PrometheusConverter) addExemplars(ctx context.Context, dataPoint pmetric.HistogramDataPoint, bucketBounds []bucketBoundsData) error {
if len(bucketBounds) == 0 {
return
return nil
}
exemplars := getPromExemplars(dataPoint)
exemplars, err := getPromExemplars(ctx, &c.everyN, dataPoint)
if err != nil {
return err
}
if len(exemplars) == 0 {
return
return nil
}
sort.Sort(byBucketBoundsData(bucketBounds))
for _, exemplar := range exemplars {
for _, bound := range bucketBounds {
if err := c.everyN.checkContext(ctx); err != nil {
return err
}
if len(bound.ts.Samples) > 0 && exemplar.Value <= bound.bound {
bound.ts.Exemplars = append(bound.ts.Exemplars, exemplar)
break
}
}
}
return nil
}
// addSample finds a TimeSeries that corresponds to lbls, and adds sample to it.
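
To round out the API change above, a hedged sketch of calling the updated FromMetrics with a deadline follows. The import path, alias, and metric construction are assumptions for illustration and are not taken from this diff; NewPrometheusConverter, FromMetrics, Settings, TimeSeries, and AsStrings are used as they appear elsewhere in this commit:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"go.opentelemetry.io/collector/pdata/pmetric"

	otlptranslator "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite"
)

func main() {
	// Bound translation time; the converter checks the context roughly every 128 data points.
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	md := pmetric.NewMetrics()
	// ... append resource metrics and data points to md ...

	converter := otlptranslator.NewPrometheusConverter()
	annots, err := converter.FromMetrics(ctx, md, otlptranslator.Settings{})
	switch {
	case errors.Is(err, context.DeadlineExceeded), errors.Is(err, context.Canceled):
		fmt.Println("translation interrupted:", err)
	case err != nil:
		fmt.Println("translation errors:", err)
	default:
		warns, infos := annots.AsStrings("", 0, 0)
		fmt.Println("series:", len(converter.TimeSeries()), "warnings:", len(warns), "infos:", len(infos))
	}
}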


@ -17,6 +17,7 @@
package prometheusremotewrite
import (
"context"
"fmt"
"testing"
"time"
@ -28,6 +29,39 @@ import (
)
func TestFromMetrics(t *testing.T) {
t.Run("successful", func(t *testing.T) {
converter := NewPrometheusConverter()
payload := createExportRequest(5, 128, 128, 2, 0)
annots, err := converter.FromMetrics(context.Background(), payload.Metrics(), Settings{})
require.NoError(t, err)
require.Empty(t, annots)
})
t.Run("context cancellation", func(t *testing.T) {
converter := NewPrometheusConverter()
ctx, cancel := context.WithCancel(context.Background())
// Verify that converter.FromMetrics respects cancellation.
cancel()
payload := createExportRequest(5, 128, 128, 2, 0)
annots, err := converter.FromMetrics(ctx, payload.Metrics(), Settings{})
require.ErrorIs(t, err, context.Canceled)
require.Empty(t, annots)
})
t.Run("context timeout", func(t *testing.T) {
converter := NewPrometheusConverter()
// Verify that converter.FromMetrics respects timeout.
ctx, cancel := context.WithTimeout(context.Background(), 0)
t.Cleanup(cancel)
payload := createExportRequest(5, 128, 128, 2, 0)
annots, err := converter.FromMetrics(ctx, payload.Metrics(), Settings{})
require.ErrorIs(t, err, context.DeadlineExceeded)
require.Empty(t, annots)
})
t.Run("exponential histogram warnings for zero count and non-zero sum", func(t *testing.T) {
request := pmetricotlp.NewExportRequest()
rm := request.Metrics().ResourceMetrics().AppendEmpty()
@ -51,7 +85,7 @@ func TestFromMetrics(t *testing.T) {
}
converter := NewPrometheusConverter()
annots, err := converter.FromMetrics(request.Metrics(), Settings{})
annots, err := converter.FromMetrics(context.Background(), request.Metrics(), Settings{})
require.NoError(t, err)
require.NotEmpty(t, annots)
ws, infos := annots.AsStrings("", 0, 0)
@ -84,7 +118,7 @@ func BenchmarkPrometheusConverter_FromMetrics(b *testing.B) {
for i := 0; i < b.N; i++ {
converter := NewPrometheusConverter()
annots, err := converter.FromMetrics(payload.Metrics(), Settings{})
annots, err := converter.FromMetrics(context.Background(), payload.Metrics(), Settings{})
require.NoError(b, err)
require.Empty(b, annots)
require.NotNil(b, converter.TimeSeries())


@ -17,6 +17,7 @@
package prometheusremotewrite
import (
"context"
"math"
"github.com/prometheus/common/model"
@ -27,9 +28,13 @@ import (
"github.com/prometheus/prometheus/prompb"
)
func (c *PrometheusConverter) addGaugeNumberDataPoints(dataPoints pmetric.NumberDataPointSlice,
resource pcommon.Resource, settings Settings, name string) {
func (c *PrometheusConverter) addGaugeNumberDataPoints(ctx context.Context, dataPoints pmetric.NumberDataPointSlice,
resource pcommon.Resource, settings Settings, name string) error {
for x := 0; x < dataPoints.Len(); x++ {
if err := c.everyN.checkContext(ctx); err != nil {
return err
}
pt := dataPoints.At(x)
labels := createAttributes(
resource,
@ -55,11 +60,17 @@ func (c *PrometheusConverter) addGaugeNumberDataPoints(dataPoints pmetric.Number
}
c.addSample(sample, labels)
}
return nil
}
func (c *PrometheusConverter) addSumNumberDataPoints(dataPoints pmetric.NumberDataPointSlice,
resource pcommon.Resource, metric pmetric.Metric, settings Settings, name string) {
func (c *PrometheusConverter) addSumNumberDataPoints(ctx context.Context, dataPoints pmetric.NumberDataPointSlice,
resource pcommon.Resource, metric pmetric.Metric, settings Settings, name string) error {
for x := 0; x < dataPoints.Len(); x++ {
if err := c.everyN.checkContext(ctx); err != nil {
return err
}
pt := dataPoints.At(x)
lbls := createAttributes(
resource,
@ -85,7 +96,10 @@ func (c *PrometheusConverter) addSumNumberDataPoints(dataPoints pmetric.NumberDa
}
ts := c.addSample(sample, lbls)
if ts != nil {
exemplars := getPromExemplars[pmetric.NumberDataPoint](pt)
exemplars, err := getPromExemplars[pmetric.NumberDataPoint](ctx, &c.everyN, pt)
if err != nil {
return err
}
ts.Exemplars = append(ts.Exemplars, exemplars...)
}
@ -93,7 +107,7 @@ func (c *PrometheusConverter) addSumNumberDataPoints(dataPoints pmetric.NumberDa
if settings.ExportCreatedMetric && metric.Sum().IsMonotonic() {
startTimestamp := pt.StartTimestamp()
if startTimestamp == 0 {
return
return nil
}
createdLabels := make([]prompb.Label, len(lbls))
@ -107,4 +121,6 @@ func (c *PrometheusConverter) addSumNumberDataPoints(dataPoints pmetric.NumberDa
c.addTimeSeriesIfNeeded(createdLabels, startTimestamp, pt.Timestamp())
}
}
return nil
}


@ -17,6 +17,7 @@
package prometheusremotewrite
import (
"context"
"testing"
"time"
@ -66,6 +67,7 @@ func TestPrometheusConverter_addGaugeNumberDataPoints(t *testing.T) {
converter := NewPrometheusConverter()
converter.addGaugeNumberDataPoints(
context.Background(),
metric.Gauge().DataPoints(),
pcommon.NewResource(),
Settings{
@ -242,6 +244,7 @@ func TestPrometheusConverter_addSumNumberDataPoints(t *testing.T) {
converter := NewPrometheusConverter()
converter.addSumNumberDataPoints(
context.Background(),
metric.Sum().DataPoints(),
pcommon.NewResource(),
metric,


@ -512,7 +512,7 @@ func (h *otlpWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
otlpCfg := h.configFunc().OTLPConfig
converter := otlptranslator.NewPrometheusConverter()
annots, err := converter.FromMetrics(req.Metrics(), otlptranslator.Settings{
annots, err := converter.FromMetrics(r.Context(), req.Metrics(), otlptranslator.Settings{
AddMetricSuffixes: true,
PromoteResourceAttributes: otlpCfg.PromoteResourceAttributes,
})
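
Finally, since the handler now forwards r.Context(), any per-request deadline or client disconnect interrupts translation. Below is a hedged sketch of wiring such a deadline in front of the OTLP endpoint; the middleware, the placeholder handler, and the endpoint path are hypothetical and not part of this change:

package main

import (
	"context"
	"net/http"
	"time"
)

// withTimeout attaches a deadline to each request's context. Because the OTLP
// write handler now passes r.Context() to converter.FromMetrics, translation of
// an oversized payload is aborted once this deadline (or a client disconnect)
// cancels the context.
func withTimeout(d time.Duration, next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		ctx, cancel := context.WithTimeout(r.Context(), d)
		defer cancel()
		next.ServeHTTP(w, r.WithContext(ctx))
	})
}

// otlpHandler stands in for Prometheus' otlpWriteHandler; it is a placeholder here.
func otlpHandler() http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// ... decode the OTLP request, then converter.FromMetrics(r.Context(), ...) ...
		w.WriteHeader(http.StatusOK)
	})
}

func main() {
	mux := http.NewServeMux()
	// Endpoint path assumed for illustration.
	mux.Handle("/api/v1/otlp/v1/metrics", withTimeout(10*time.Second, otlpHandler()))
	_ = http.ListenAndServe(":9090", mux)
}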