diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go
index b18b47710..ba7f82d09 100644
--- a/cmd/prometheus/main.go
+++ b/cmd/prometheus/main.go
@@ -198,6 +198,7 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error {
 			level.Info(logger).Log("msg", "No default port will be appended to scrape targets' addresses.")
 		case "native-histograms":
 			c.tsdb.EnableNativeHistograms = true
+			c.scrape.EnableProtobufNegotiation = true
 			level.Info(logger).Log("msg", "Experimental native histogram support enabled.")
 		case "":
 			continue
diff --git a/scrape/manager.go b/scrape/manager.go
index d8a0ce72f..3c77dac39 100644
--- a/scrape/manager.go
+++ b/scrape/manager.go
@@ -132,6 +132,9 @@ type Options struct {
 	// Option to enable the experimental in-memory metadata storage and append
 	// metadata to the WAL.
 	EnableMetadataStorage bool
+	// Option to enable protobuf negotiation with the client. Note that the client can already
+	// send protobuf without needing to enable this.
+	EnableProtobufNegotiation bool
 	// Option to increase the interval used by scrape manager to throttle target groups updates.
 	DiscoveryReloadInterval model.Duration
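For context on the new Options field: outside of cmd/prometheus, a caller embedding the scrape manager could flip it directly. A minimal sketch, assuming the three-argument scrape.NewManager constructor at this revision; the logger setup and the nil appendable are placeholders, not part of this change:

package main

import (
	"os"

	"github.com/go-kit/log"
	"github.com/prometheus/prometheus/scrape"
)

func main() {
	logger := log.NewLogfmtLogger(os.Stderr)
	// Hypothetical wiring: Prometheus itself only sets this field through the
	// native-histograms feature flag, as in the main.go hunk above.
	opts := &scrape.Options{
		EnableProtobufNegotiation: true,
	}
	mgr := scrape.NewManager(opts, logger, nil) // nil appendable for brevity; real callers pass storage
	_ = mgr
}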
diff --git a/scrape/scrape.go b/scrape/scrape.go
index 839f753d5..8a664bf73 100644
--- a/scrape/scrape.go
+++ b/scrape/scrape.go
@@ -243,6 +243,8 @@ type scrapePool struct {
 	newLoop func(scrapeLoopOptions) loop
 
 	noDefaultPort bool
+
+	enableProtobufNegotiation bool
 }
 
 type labelLimits struct {
@@ -284,15 +286,16 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, jitterSeed
 	ctx, cancel := context.WithCancel(context.Background())
 	sp := &scrapePool{
-		cancel:        cancel,
-		appendable:    app,
-		config:        cfg,
-		client:        client,
-		activeTargets: map[uint64]*Target{},
-		loops:         map[uint64]loop{},
-		logger:        logger,
-		httpOpts:      options.HTTPClientOptions,
-		noDefaultPort: options.NoDefaultPort,
+		cancel:                    cancel,
+		appendable:                app,
+		config:                    cfg,
+		client:                    client,
+		activeTargets:             map[uint64]*Target{},
+		loops:                     map[uint64]loop{},
+		logger:                    logger,
+		httpOpts:                  options.HTTPClientOptions,
+		noDefaultPort:             options.NoDefaultPort,
+		enableProtobufNegotiation: options.EnableProtobufNegotiation,
 	}
 	sp.newLoop = func(opts scrapeLoopOptions) loop {
 		// Update the targets retrieval function for metadata to a new scrape cache.
@@ -433,8 +436,12 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
 			t := sp.activeTargets[fp]
 			interval, timeout, err := t.intervalAndTimeout(interval, timeout)
+			acceptHeader := scrapeAcceptHeader
+			if sp.enableProtobufNegotiation {
+				acceptHeader = scrapeAcceptHeaderWithProtobuf
+			}
 			var (
-				s       = &targetScraper{Target: t, client: sp.client, timeout: timeout, bodySizeLimit: bodySizeLimit}
+				s       = &targetScraper{Target: t, client: sp.client, timeout: timeout, bodySizeLimit: bodySizeLimit, acceptHeader: acceptHeader}
 				newLoop = sp.newLoop(scrapeLoopOptions{
 					target:          t,
 					scraper:         s,
@@ -537,8 +544,11 @@ func (sp *scrapePool) sync(targets []*Target) {
 			// for every target.
 			var err error
 			interval, timeout, err = t.intervalAndTimeout(interval, timeout)
-
-			s := &targetScraper{Target: t, client: sp.client, timeout: timeout, bodySizeLimit: bodySizeLimit}
+			acceptHeader := scrapeAcceptHeader
+			if sp.enableProtobufNegotiation {
+				acceptHeader = scrapeAcceptHeaderWithProtobuf
+			}
+			s := &targetScraper{Target: t, client: sp.client, timeout: timeout, bodySizeLimit: bodySizeLimit, acceptHeader: acceptHeader}
 			l := sp.newLoop(scrapeLoopOptions{
 				target:          t,
 				scraper:         s,
@@ -757,11 +767,15 @@ type targetScraper struct {
 	buf   *bufio.Reader
 
 	bodySizeLimit int64
+	acceptHeader  string
 }
 
 var errBodySizeLimit = errors.New("body size limit exceeded")
 
-const acceptHeader = `application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited,application/openmetrics-text;version=1.0.0;q=0.75,text/plain;version=0.0.4;q=0.5,*/*;q=0.1`
+const (
+	scrapeAcceptHeader             = `application/openmetrics-text;version=1.0.0;q=0.75,text/plain;version=0.0.4;q=0.5,*/*;q=0.1`
+	scrapeAcceptHeaderWithProtobuf = `application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited,application/openmetrics-text;version=1.0.0;q=0.75,text/plain;version=0.0.4;q=0.5,*/*;q=0.1`
+)
 
 var UserAgent = fmt.Sprintf("Prometheus/%s", version.Version)
 
@@ -771,7 +785,7 @@ func (s *targetScraper) scrape(ctx context.Context, w io.Writer) (string, error)
 	if err != nil {
 		return "", err
 	}
-	req.Header.Add("Accept", acceptHeader)
+	req.Header.Add("Accept", s.acceptHeader)
 	req.Header.Add("Accept-Encoding", "gzip")
 	req.Header.Set("User-Agent", UserAgent)
 	req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", strconv.FormatFloat(s.timeout.Seconds(), 'f', -1, 64))
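On the wire, the only difference between the two constants is whether application/vnd.google.protobuf is offered at all; the q-values still rank OpenMetrics above plain text in both. A target honoring the negotiation would branch on that media type, as in this minimal sketch (hypothetical handler and port, not part of this change):

package main

import (
	"fmt"
	"net/http"
	"strings"
)

func main() {
	http.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
		// With EnableProtobufNegotiation off the scraper sends scrapeAcceptHeader,
		// which no longer lists protobuf, so this branch is skipped entirely.
		if strings.Contains(r.Header.Get("Accept"), "application/vnd.google.protobuf") {
			w.Header().Set("Content-Type", "application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited")
			// ... write length-delimited MetricFamily protobuf messages here ...
			return
		}
		w.Header().Set("Content-Type", "text/plain; version=0.0.4")
		fmt.Fprint(w, "metric_a 1\nmetric_b 2\n")
	})
	_ = http.ListenAndServe(":9100", nil) // hypothetical port
}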
diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go
index 376c9e9cc..27749dbc6 100644
--- a/scrape/scrape_test.go
+++ b/scrape/scrape_test.go
@@ -2147,11 +2147,15 @@ func TestTargetScraperScrapeOK(t *testing.T) {
 		expectedTimeout = "1.5"
 	)
 
+	var protobufParsing bool
+
 	server := httptest.NewServer(
 		http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-			accept := r.Header.Get("Accept")
-			if !strings.HasPrefix(accept, "application/vnd.google.protobuf;") {
-				t.Errorf("Expected Accept header to prefer application/vnd.google.protobuf, got %q", accept)
+			if protobufParsing {
+				accept := r.Header.Get("Accept")
+				if !strings.HasPrefix(accept, "application/vnd.google.protobuf;") {
+					t.Errorf("Expected Accept header to prefer application/vnd.google.protobuf, got %q", accept)
+				}
 			}
 
 			timeout := r.Header.Get("X-Prometheus-Scrape-Timeout-Seconds")
@@ -2170,22 +2174,29 @@ func TestTargetScraperScrapeOK(t *testing.T) {
 		panic(err)
 	}
 
-	ts := &targetScraper{
-		Target: &Target{
-			labels: labels.FromStrings(
-				model.SchemeLabel, serverURL.Scheme,
-				model.AddressLabel, serverURL.Host,
-			),
-		},
-		client:  http.DefaultClient,
-		timeout: configTimeout,
-	}
-	var buf bytes.Buffer
+	runTest := func(acceptHeader string) {
+		ts := &targetScraper{
+			Target: &Target{
+				labels: labels.FromStrings(
+					model.SchemeLabel, serverURL.Scheme,
+					model.AddressLabel, serverURL.Host,
+				),
+			},
+			client:       http.DefaultClient,
+			timeout:      configTimeout,
+			acceptHeader: acceptHeader,
+		}
+		var buf bytes.Buffer
 
-	contentType, err := ts.scrape(context.Background(), &buf)
-	require.NoError(t, err)
-	require.Equal(t, "text/plain; version=0.0.4", contentType)
-	require.Equal(t, "metric_a 1\nmetric_b 2\n", buf.String())
+		contentType, err := ts.scrape(context.Background(), &buf)
+		require.NoError(t, err)
+		require.Equal(t, "text/plain; version=0.0.4", contentType)
+		require.Equal(t, "metric_a 1\nmetric_b 2\n", buf.String())
+	}
+
+	runTest(scrapeAcceptHeader)
+	protobufParsing = true
+	runTest(scrapeAcceptHeaderWithProtobuf)
 }
 
 func TestTargetScrapeScrapeCancel(t *testing.T) {
@@ -2210,7 +2221,8 @@ func TestTargetScrapeScrapeCancel(t *testing.T) {
 				model.AddressLabel, serverURL.Host,
 			),
 		},
-		client: http.DefaultClient,
+		client:       http.DefaultClient,
+		acceptHeader: scrapeAcceptHeader,
 	}
 	ctx, cancel := context.WithCancel(context.Background())
 
@@ -2263,7 +2275,8 @@ func TestTargetScrapeScrapeNotFound(t *testing.T) {
 				model.AddressLabel, serverURL.Host,
 			),
 		},
-		client: http.DefaultClient,
+		client:       http.DefaultClient,
+		acceptHeader: scrapeAcceptHeader,
 	}
 
 	_, err = ts.scrape(context.Background(), io.Discard)
@@ -2305,6 +2318,7 @@ func TestTargetScraperBodySizeLimit(t *testing.T) {
 		},
 		client:        http.DefaultClient,
 		bodySizeLimit: bodySizeLimit,
+		acceptHeader:  scrapeAcceptHeader,
 	}
 
 	var buf bytes.Buffer
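The test server above gates its check with strings.HasPrefix, so the protobuf media type must stay the first entry of scrapeAcceptHeaderWithProtobuf and stay absent from scrapeAcceptHeader. A hypothetical companion test pinning down that ordering (TestAcceptHeaderOrdering is not part of this diff) could look like:

package scrape

import (
	"strings"
	"testing"

	"github.com/stretchr/testify/require"
)

// Pins the property the TestTargetScraperScrapeOK handler relies on:
// protobuf leads the negotiated header and never appears in the plain one.
func TestAcceptHeaderOrdering(t *testing.T) {
	require.True(t, strings.HasPrefix(scrapeAcceptHeaderWithProtobuf, "application/vnd.google.protobuf;"))
	require.False(t, strings.Contains(scrapeAcceptHeader, "application/vnd.google.protobuf"))
}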