diff --git a/CHANGELOG.md b/CHANGELOG.md index e7314d041..850554bf9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ## unreleased +* [FEATURE] OTLP receiver: Add new option `otlp.promote_resource_attributes`, for any OTel resource attributes that should be promoted to metric labels. #14200 * [FEATURE] Remote-Write: Add sender and receiver support for [Remote Write 2.0-rc.2](https://prometheus.io/docs/specs/remote_write_spec_2_0/) specification #14395 #14427 #14444 * [ENHANCEMENT] Remote-Write: 1.x messages against Remote Write 2.x Receivers will have now correct values for `prometheus_storage__failed_total` in case of partial errors #14444 diff --git a/config/config.go b/config/config.go index c924e3098..913983881 100644 --- a/config/config.go +++ b/config/config.go @@ -227,6 +227,9 @@ var ( DefaultExemplarsConfig = ExemplarsConfig{ MaxExemplars: 100000, } + + // DefaultOTLPConfig is the default OTLP configuration. + DefaultOTLPConfig = OTLPConfig{} ) // Config is the top-level configuration for Prometheus's config files. @@ -242,6 +245,7 @@ type Config struct { RemoteWriteConfigs []*RemoteWriteConfig `yaml:"remote_write,omitempty"` RemoteReadConfigs []*RemoteReadConfig `yaml:"remote_read,omitempty"` + OTLPConfig OTLPConfig `yaml:"otlp,omitempty"` } // SetDirectory joins any relative file paths with dir. @@ -1304,3 +1308,35 @@ func getGoGCEnv() int { } return DefaultRuntimeConfig.GoGC } + +// OTLPConfig is the configuration for writing to the OTLP endpoint. +type OTLPConfig struct { + PromoteResourceAttributes []string `yaml:"promote_resource_attributes,omitempty"` +} + +// UnmarshalYAML implements the yaml.Unmarshaler interface. +func (c *OTLPConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { + *c = DefaultOTLPConfig + type plain OTLPConfig + if err := unmarshal((*plain)(c)); err != nil { + return err + } + + seen := map[string]struct{}{} + var err error + for i, attr := range c.PromoteResourceAttributes { + attr = strings.TrimSpace(attr) + if attr == "" { + err = errors.Join(err, fmt.Errorf("empty promoted OTel resource attribute")) + continue + } + if _, exists := seen[attr]; exists { + err = errors.Join(err, fmt.Errorf("duplicated promoted OTel resource attribute %q", attr)) + continue + } + + seen[attr] = struct{}{} + c.PromoteResourceAttributes[i] = attr + } + return err +} diff --git a/config/config_test.go b/config/config_test.go index 3c4907a46..b684fdb50 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -156,6 +156,12 @@ var expectedConf = &Config{ }, }, + OTLPConfig: OTLPConfig{ + PromoteResourceAttributes: []string{ + "k8s.cluster.name", "k8s.job.name", "k8s.namespace.name", + }, + }, + RemoteReadConfigs: []*RemoteReadConfig{ { URL: mustParseURL("http://remote1/read"), @@ -1471,6 +1477,26 @@ func TestRemoteWriteRetryOnRateLimit(t *testing.T) { require.False(t, got.RemoteWriteConfigs[1].QueueConfig.RetryOnRateLimit) } +func TestOTLPSanitizeResourceAttributes(t *testing.T) { + t.Run("good config", func(t *testing.T) { + want, err := LoadFile(filepath.Join("testdata", "otlp_sanitize_resource_attributes.good.yml"), false, false, log.NewNopLogger()) + require.NoError(t, err) + + out, err := yaml.Marshal(want) + require.NoError(t, err) + var got Config + require.NoError(t, yaml.UnmarshalStrict(out, &got)) + + require.Equal(t, []string{"k8s.cluster.name", "k8s.job.name", "k8s.namespace.name"}, got.OTLPConfig.PromoteResourceAttributes) + }) + + t.Run("bad config", func(t *testing.T) { + _, err := LoadFile(filepath.Join("testdata", "otlp_sanitize_resource_attributes.bad.yml"), false, false, log.NewNopLogger()) + require.ErrorContains(t, err, `duplicated promoted OTel resource attribute "k8s.job.name"`) + require.ErrorContains(t, err, `empty promoted OTel resource attribute`) + }) +} + func TestLoadConfig(t *testing.T) { // Parse a valid file that sets a global scrape timeout. This tests whether parsing // an overwritten default field in the global config permanently changes the default. diff --git a/config/testdata/conf.good.yml b/config/testdata/conf.good.yml index 0e0aa2bd5..56741822c 100644 --- a/config/testdata/conf.good.yml +++ b/config/testdata/conf.good.yml @@ -45,6 +45,9 @@ remote_write: headers: name: value +otlp: + promote_resource_attributes: ["k8s.cluster.name", "k8s.job.name", "k8s.namespace.name"] + remote_read: - url: http://remote1/read read_recent: true diff --git a/config/testdata/otlp_sanitize_resource_attributes.bad.yml b/config/testdata/otlp_sanitize_resource_attributes.bad.yml new file mode 100644 index 000000000..37ec5d120 --- /dev/null +++ b/config/testdata/otlp_sanitize_resource_attributes.bad.yml @@ -0,0 +1,2 @@ +otlp: + promote_resource_attributes: ["k8s.cluster.name", " k8s.job.name ", "k8s.namespace.name", "k8s.job.name", ""] diff --git a/config/testdata/otlp_sanitize_resource_attributes.good.yml b/config/testdata/otlp_sanitize_resource_attributes.good.yml new file mode 100644 index 000000000..67247e774 --- /dev/null +++ b/config/testdata/otlp_sanitize_resource_attributes.good.yml @@ -0,0 +1,2 @@ +otlp: + promote_resource_attributes: ["k8s.cluster.name", " k8s.job.name ", "k8s.namespace.name"] diff --git a/docs/configuration/configuration.md b/docs/configuration/configuration.md index ff24082e4..5aa57b3ba 100644 --- a/docs/configuration/configuration.md +++ b/docs/configuration/configuration.md @@ -152,6 +152,10 @@ alerting: remote_write: [ - ... ] +# Settings related to the OTLP receiver feature. +otlp: + [ promote_resource_attributes: [, ...] | default = [ ] ] + # Settings related to the remote read feature. remote_read: [ - ... ] diff --git a/storage/remote/otlptranslator/prometheusremotewrite/helper.go b/storage/remote/otlptranslator/prometheusremotewrite/helper.go index 257133853..f2d7ecd4e 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/helper.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/helper.go @@ -65,14 +65,14 @@ type bucketBoundsData struct { bound float64 } -// byBucketBoundsData enables the usage of sort.Sort() with a slice of bucket bounds +// byBucketBoundsData enables the usage of sort.Sort() with a slice of bucket bounds. type byBucketBoundsData []bucketBoundsData func (m byBucketBoundsData) Len() int { return len(m) } func (m byBucketBoundsData) Less(i, j int) bool { return m[i].bound < m[j].bound } func (m byBucketBoundsData) Swap(i, j int) { m[i], m[j] = m[j], m[i] } -// ByLabelName enables the usage of sort.Sort() with a slice of labels +// ByLabelName enables the usage of sort.Sort() with a slice of labels. type ByLabelName []prompb.Label func (a ByLabelName) Len() int { return len(a) } @@ -115,14 +115,23 @@ var seps = []byte{'\xff'} // createAttributes creates a slice of Prometheus Labels with OTLP attributes and pairs of string values. // Unpaired string values are ignored. String pairs overwrite OTLP labels if collisions happen and // if logOnOverwrite is true, the overwrite is logged. Resulting label names are sanitized. -func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externalLabels map[string]string, +// If settings.PromoteResourceAttributes is not empty, it's a set of resource attributes that should be promoted to labels. +func createAttributes(resource pcommon.Resource, attributes pcommon.Map, settings Settings, ignoreAttrs []string, logOnOverwrite bool, extras ...string) []prompb.Label { resourceAttrs := resource.Attributes() serviceName, haveServiceName := resourceAttrs.Get(conventions.AttributeServiceName) instance, haveInstanceID := resourceAttrs.Get(conventions.AttributeServiceInstanceID) + promotedAttrs := make([]prompb.Label, 0, len(settings.PromoteResourceAttributes)) + for _, name := range settings.PromoteResourceAttributes { + if value, exists := resourceAttrs.Get(name); exists { + promotedAttrs = append(promotedAttrs, prompb.Label{Name: name, Value: value.AsString()}) + } + } + sort.Stable(ByLabelName(promotedAttrs)) + // Calculate the maximum possible number of labels we could return so we can preallocate l - maxLabelCount := attributes.Len() + len(externalLabels) + len(extras)/2 + maxLabelCount := attributes.Len() + len(settings.ExternalLabels) + len(promotedAttrs) + len(extras)/2 if haveServiceName { maxLabelCount++ @@ -132,9 +141,6 @@ func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externa maxLabelCount++ } - // map ensures no duplicate label name - l := make(map[string]string, maxLabelCount) - // Ensure attributes are sorted by key for consistent merging of keys which // collide when sanitized. labels := make([]prompb.Label, 0, maxLabelCount) @@ -148,6 +154,8 @@ func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externa }) sort.Stable(ByLabelName(labels)) + // map ensures no duplicate label names. + l := make(map[string]string, maxLabelCount) for _, label := range labels { var finalKey = prometheustranslator.NormalizeLabel(label.Name) if existingValue, alreadyExists := l[finalKey]; alreadyExists { @@ -157,6 +165,13 @@ func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externa } } + for _, lbl := range promotedAttrs { + normalized := prometheustranslator.NormalizeLabel(lbl.Name) + if _, exists := l[normalized]; !exists { + l[normalized] = lbl.Value + } + } + // Map service.name + service.namespace to job if haveServiceName { val := serviceName.AsString() @@ -169,7 +184,7 @@ func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externa if haveInstanceID { l[model.InstanceLabel] = instance.AsString() } - for key, value := range externalLabels { + for key, value := range settings.ExternalLabels { // External labels have already been sanitized if _, alreadyExists := l[key]; alreadyExists { // Skip external labels if they are overridden by metric attributes @@ -232,7 +247,7 @@ func (c *PrometheusConverter) addHistogramDataPoints(dataPoints pmetric.Histogra for x := 0; x < dataPoints.Len(); x++ { pt := dataPoints.At(x) timestamp := convertTimeStamp(pt.Timestamp()) - baseLabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nil, false) + baseLabels := createAttributes(resource, pt.Attributes(), settings, nil, false) // If the sum is unset, it indicates the _sum metric point should be // omitted @@ -408,7 +423,7 @@ func (c *PrometheusConverter) addSummaryDataPoints(dataPoints pmetric.SummaryDat for x := 0; x < dataPoints.Len(); x++ { pt := dataPoints.At(x) timestamp := convertTimeStamp(pt.Timestamp()) - baseLabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nil, false) + baseLabels := createAttributes(resource, pt.Attributes(), settings, nil, false) // treat sum as a sample in an individual TimeSeries sum := &prompb.Sample{ @@ -554,7 +569,8 @@ func addResourceTargetInfo(resource pcommon.Resource, settings Settings, timesta name = settings.Namespace + "_" + name } - labels := createAttributes(resource, attributes, settings.ExternalLabels, identifyingAttrs, false, model.MetricNameLabel, name) + settings.PromoteResourceAttributes = nil + labels := createAttributes(resource, attributes, settings, identifyingAttrs, false, model.MetricNameLabel, name) haveIdentifier := false for _, l := range labels { if l.Name == model.JobLabel || l.Name == model.InstanceLabel { diff --git a/storage/remote/otlptranslator/prometheusremotewrite/helper_test.go b/storage/remote/otlptranslator/prometheusremotewrite/helper_test.go new file mode 100644 index 000000000..c4dd781ae --- /dev/null +++ b/storage/remote/otlptranslator/prometheusremotewrite/helper_test.go @@ -0,0 +1,161 @@ +// Copyright 2024 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package prometheusremotewrite + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/pdata/pcommon" + + "github.com/prometheus/prometheus/prompb" +) + +func TestCreateAttributes(t *testing.T) { + resourceAttrs := map[string]string{ + "service.name": "service name", + "service.instance.id": "service ID", + "existent-attr": "resource value", + // This one is for testing conflict with metric attribute. + "metric-attr": "resource value", + // This one is for testing conflict with auto-generated job attribute. + "job": "resource value", + // This one is for testing conflict with auto-generated instance attribute. + "instance": "resource value", + } + + resource := pcommon.NewResource() + for k, v := range resourceAttrs { + resource.Attributes().PutStr(k, v) + } + attrs := pcommon.NewMap() + attrs.PutStr("__name__", "test_metric") + attrs.PutStr("metric-attr", "metric value") + + testCases := []struct { + name string + promoteResourceAttributes []string + expectedLabels []prompb.Label + }{ + { + name: "Successful conversion without resource attribute promotion", + promoteResourceAttributes: nil, + expectedLabels: []prompb.Label{ + { + Name: "__name__", + Value: "test_metric", + }, + { + Name: "instance", + Value: "service ID", + }, + { + Name: "job", + Value: "service name", + }, + { + Name: "metric_attr", + Value: "metric value", + }, + }, + }, + { + name: "Successful conversion with resource attribute promotion", + promoteResourceAttributes: []string{"non-existent-attr", "existent-attr"}, + expectedLabels: []prompb.Label{ + { + Name: "__name__", + Value: "test_metric", + }, + { + Name: "instance", + Value: "service ID", + }, + { + Name: "job", + Value: "service name", + }, + { + Name: "metric_attr", + Value: "metric value", + }, + { + Name: "existent_attr", + Value: "resource value", + }, + }, + }, + { + name: "Successful conversion with resource attribute promotion, conflicting resource attributes are ignored", + promoteResourceAttributes: []string{"non-existent-attr", "existent-attr", "metric-attr", "job", "instance"}, + expectedLabels: []prompb.Label{ + { + Name: "__name__", + Value: "test_metric", + }, + { + Name: "instance", + Value: "service ID", + }, + { + Name: "job", + Value: "service name", + }, + { + Name: "existent_attr", + Value: "resource value", + }, + { + Name: "metric_attr", + Value: "metric value", + }, + }, + }, + { + name: "Successful conversion with resource attribute promotion, attributes are only promoted once", + promoteResourceAttributes: []string{"existent-attr", "existent-attr"}, + expectedLabels: []prompb.Label{ + { + Name: "__name__", + Value: "test_metric", + }, + { + Name: "instance", + Value: "service ID", + }, + { + Name: "job", + Value: "service name", + }, + { + Name: "existent_attr", + Value: "resource value", + }, + { + Name: "metric_attr", + Value: "metric value", + }, + }, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + settings := Settings{ + PromoteResourceAttributes: tc.promoteResourceAttributes, + } + lbls := createAttributes(resource, attrs, settings, nil, false) + + assert.ElementsMatch(t, lbls, tc.expectedLabels) + }) + } +} diff --git a/storage/remote/otlptranslator/prometheusremotewrite/histograms.go b/storage/remote/otlptranslator/prometheusremotewrite/histograms.go index 21b3f5dd9..73528019d 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/histograms.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/histograms.go @@ -45,7 +45,7 @@ func (c *PrometheusConverter) addExponentialHistogramDataPoints(dataPoints pmetr lbls := createAttributes( resource, pt.Attributes(), - settings.ExternalLabels, + settings, nil, true, model.MetricNameLabel, diff --git a/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go b/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go index 65dac99c5..a3a789723 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go @@ -30,12 +30,13 @@ import ( ) type Settings struct { - Namespace string - ExternalLabels map[string]string - DisableTargetInfo bool - ExportCreatedMetric bool - AddMetricSuffixes bool - SendMetadata bool + Namespace string + ExternalLabels map[string]string + DisableTargetInfo bool + ExportCreatedMetric bool + AddMetricSuffixes bool + SendMetadata bool + PromoteResourceAttributes []string } // PrometheusConverter converts from OTel write format to Prometheus remote write format. diff --git a/storage/remote/otlptranslator/prometheusremotewrite/number_data_points.go b/storage/remote/otlptranslator/prometheusremotewrite/number_data_points.go index aafebc6c4..80ccb46c7 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/number_data_points.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/number_data_points.go @@ -34,7 +34,7 @@ func (c *PrometheusConverter) addGaugeNumberDataPoints(dataPoints pmetric.Number labels := createAttributes( resource, pt.Attributes(), - settings.ExternalLabels, + settings, nil, true, model.MetricNameLabel, @@ -64,7 +64,7 @@ func (c *PrometheusConverter) addSumNumberDataPoints(dataPoints pmetric.NumberDa lbls := createAttributes( resource, pt.Attributes(), - settings.ExternalLabels, + settings, nil, true, model.MetricNameLabel, diff --git a/storage/remote/write_handler.go b/storage/remote/write_handler.go index 6756bf0ab..aba79a561 100644 --- a/storage/remote/write_handler.go +++ b/storage/remote/write_handler.go @@ -472,21 +472,23 @@ func (h *writeHandler) appendV2(app storage.Appender, req *writev2.Request, rs * // NewOTLPWriteHandler creates a http.Handler that accepts OTLP write requests and // writes them to the provided appendable. -func NewOTLPWriteHandler(logger log.Logger, appendable storage.Appendable) http.Handler { +func NewOTLPWriteHandler(logger log.Logger, appendable storage.Appendable, configFunc func() config.Config) http.Handler { rwHandler := &writeHandler{ logger: logger, appendable: appendable, } return &otlpWriteHandler{ - logger: logger, - rwHandler: rwHandler, + logger: logger, + rwHandler: rwHandler, + configFunc: configFunc, } } type otlpWriteHandler struct { - logger log.Logger - rwHandler *writeHandler + logger log.Logger + rwHandler *writeHandler + configFunc func() config.Config } func (h *otlpWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { @@ -497,9 +499,12 @@ func (h *otlpWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } + otlpCfg := h.configFunc().OTLPConfig + converter := otlptranslator.NewPrometheusConverter() if err := converter.FromMetrics(req.Metrics(), otlptranslator.Settings{ - AddMetricSuffixes: true, + AddMetricSuffixes: true, + PromoteResourceAttributes: otlpCfg.PromoteResourceAttributes, }); err != nil { level.Warn(h.logger).Log("msg", "Error translating OTLP metrics to Prometheus write request", "err", err) } diff --git a/storage/remote/write_test.go b/storage/remote/write_test.go index 6e7422a58..83dfffbae 100644 --- a/storage/remote/write_test.go +++ b/storage/remote/write_test.go @@ -379,7 +379,11 @@ func TestOTLPWriteHandler(t *testing.T) { req.Header.Set("Content-Type", "application/x-protobuf") appendable := &mockAppendable{} - handler := NewOTLPWriteHandler(nil, appendable) + handler := NewOTLPWriteHandler(nil, appendable, func() config.Config { + return config.Config{ + OTLPConfig: config.DefaultOTLPConfig, + } + }) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) diff --git a/web/api/v1/api.go b/web/api/v1/api.go index 03854787f..d58be211f 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -295,7 +295,7 @@ func NewAPI( a.remoteWriteHandler = remote.NewWriteHandler(logger, registerer, ap, acceptRemoteWriteProtoMsgs) } if otlpEnabled { - a.otlpWriteHandler = remote.NewOTLPWriteHandler(logger, ap) + a.otlpWriteHandler = remote.NewOTLPWriteHandler(logger, ap, configFunc) } return a diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go index 86a57ca08..ba38ddc97 100644 --- a/web/api/v1/api_test.go +++ b/web/api/v1/api_test.go @@ -359,6 +359,7 @@ var samplePrometheusCfg = config.Config{ ScrapeConfigs: []*config.ScrapeConfig{}, RemoteWriteConfigs: []*config.RemoteWriteConfig{}, RemoteReadConfigs: []*config.RemoteReadConfig{}, + OTLPConfig: config.OTLPConfig{}, } var sampleFlagMap = map[string]string{