From f6d9c84fde6b0fc89019590d4081876c7d93d609 Mon Sep 17 00:00:00 2001
From: Bryan Boreham
Date: Mon, 9 Oct 2023 17:23:53 +0100
Subject: [PATCH] scraping: delay creating buffer, to save memory (#12953)

We don't need the buffer to read the response until the scrape http call
returns; creating it earlier makes the buffer pool larger.

I split `scrape()` into `scrape()` which returns with the http response, and
`readResponse()` which decompresses and copies the data into the supplied
buffer. This design was chosen to minimize impact on the logic.

Signed-off-by: Bryan Boreham
---
 scrape/scrape.go      | 41 ++++++++++++++++++++++++-----------------
 scrape/scrape_test.go | 32 ++++++++++++++++++++++++--------
 2 files changed, 48 insertions(+), 25 deletions(-)

diff --git a/scrape/scrape.go b/scrape/scrape.go
index e40ae81d2..299ffdc98 100644
--- a/scrape/scrape.go
+++ b/scrape/scrape.go
@@ -785,7 +785,8 @@ func appender(app storage.Appender, sampleLimit, bucketLimit int) storage.Append
 
 // A scraper retrieves samples and accepts a status report at the end.
 type scraper interface {
-	scrape(ctx context.Context, w io.Writer) (string, error)
+	scrape(ctx context.Context) (*http.Response, error)
+	readResponse(ctx context.Context, resp *http.Response, w io.Writer) (string, error)
 	Report(start time.Time, dur time.Duration, err error)
 	offset(interval time.Duration, offsetSeed uint64) time.Duration
 }
@@ -814,11 +815,11 @@ const (
 
 var UserAgent = fmt.Sprintf("Prometheus/%s", version.Version)
 
-func (s *targetScraper) scrape(ctx context.Context, w io.Writer) (string, error) {
+func (s *targetScraper) scrape(ctx context.Context) (*http.Response, error) {
 	if s.req == nil {
 		req, err := http.NewRequest("GET", s.URL().String(), nil)
 		if err != nil {
-			return "", err
+			return nil, err
 		}
 		req.Header.Add("Accept", s.acceptHeader)
 		req.Header.Add("Accept-Encoding", "gzip")
@@ -828,10 +829,10 @@ func (s *targetScraper) scrape(ctx context.Context, w io.Writer) (string, error)
 		s.req = req
 	}
 
-	resp, err := s.client.Do(s.req.WithContext(ctx))
-	if err != nil {
-		return "", err
-	}
+	return s.client.Do(s.req.WithContext(ctx))
+}
+
+func (s *targetScraper) readResponse(ctx context.Context, resp *http.Response, w io.Writer) (string, error) {
 	defer func() {
 		io.Copy(io.Discard, resp.Body)
 		resp.Body.Close()
@@ -858,13 +859,14 @@
 
 	if s.gzipr == nil {
 		s.buf = bufio.NewReader(resp.Body)
+		var err error
 		s.gzipr, err = gzip.NewReader(s.buf)
 		if err != nil {
 			return "", err
 		}
 	} else {
 		s.buf.Reset(resp.Body)
-		if err = s.gzipr.Reset(s.buf); err != nil {
+		if err := s.gzipr.Reset(s.buf); err != nil {
 			return "", err
 		}
 	}
@@ -1326,11 +1328,7 @@ func (sl *scrapeLoop) scrapeAndReport(last, appendTime time.Time, errc chan<- er
 		)
 	}
 
-	b := sl.buffers.Get(sl.lastScrapeSize).([]byte)
-	defer sl.buffers.Put(b)
-	buf := bytes.NewBuffer(b)
-
-	var total, added, seriesAdded, bytes int
+	var total, added, seriesAdded, bytesRead int
 	var err, appErr, scrapeErr error
 
 	app := sl.appender(sl.appenderCtx)
@@ -1346,7 +1344,7 @@ func (sl *scrapeLoop) scrapeAndReport(last, appendTime time.Time, errc chan<- er
 	}()
 
 	defer func() {
-		if err = sl.report(app, appendTime, time.Since(start), total, added, seriesAdded, bytes, scrapeErr); err != nil {
+		if err = sl.report(app, appendTime, time.Since(start), total, added, seriesAdded, bytesRead, scrapeErr); err != nil {
 			level.Warn(sl.l).Log("msg", "Appending scrape report failed", "err", err)
 		}
 	}()
@@ -1367,8 +1365,17 @@ func (sl *scrapeLoop) scrapeAndReport(last, appendTime time.Time, errc chan<- er
 	}
 
 	var contentType string
+	var resp *http.Response
+	var b []byte
+	var buf *bytes.Buffer
 	scrapeCtx, cancel := context.WithTimeout(sl.parentCtx, sl.timeout)
-	contentType, scrapeErr = sl.scraper.scrape(scrapeCtx, buf)
+	resp, scrapeErr = sl.scraper.scrape(scrapeCtx)
+	if scrapeErr == nil {
+		b = sl.buffers.Get(sl.lastScrapeSize).([]byte)
+		defer sl.buffers.Put(b)
+		buf = bytes.NewBuffer(b)
+		contentType, scrapeErr = sl.scraper.readResponse(scrapeCtx, resp, buf)
+	}
 	cancel()
 
 	if scrapeErr == nil {
@@ -1379,14 +1386,14 @@ func (sl *scrapeLoop) scrapeAndReport(last, appendTime time.Time, errc chan<- er
 		if len(b) > 0 {
 			sl.lastScrapeSize = len(b)
 		}
-		bytes = len(b)
+		bytesRead = len(b)
 	} else {
 		level.Debug(sl.l).Log("msg", "Scrape failed", "err", scrapeErr)
 		if errc != nil {
 			errc <- scrapeErr
 		}
 		if errors.Is(scrapeErr, errBodySizeLimit) {
-			bytes = -1
+			bytesRead = -1
 		}
 	}
 
diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go
index 3b7d6a7ab..6c09b95e5 100644
--- a/scrape/scrape_test.go
+++ b/scrape/scrape_test.go
@@ -2619,7 +2619,9 @@ func TestTargetScraperScrapeOK(t *testing.T) {
 	}
 
 	var buf bytes.Buffer
-	contentType, err := ts.scrape(context.Background(), &buf)
+	resp, err := ts.scrape(context.Background())
+	require.NoError(t, err)
+	contentType, err := ts.readResponse(context.Background(), resp, &buf)
 	require.NoError(t, err)
 	require.Equal(t, "text/plain; version=0.0.4", contentType)
 	require.Equal(t, "metric_a 1\nmetric_b 2\n", buf.String())
@@ -2665,7 +2667,7 @@ func TestTargetScrapeScrapeCancel(t *testing.T) {
 	}()
 
 	go func() {
-		_, err := ts.scrape(ctx, io.Discard)
+		_, err := ts.scrape(ctx)
 		switch {
 		case err == nil:
 			errc <- errors.New("Expected error but got nil")
@@ -2711,7 +2713,9 @@ func TestTargetScrapeScrapeNotFound(t *testing.T) {
 		acceptHeader: scrapeAcceptHeader,
 	}
 
-	_, err = ts.scrape(context.Background(), io.Discard)
+	resp, err := ts.scrape(context.Background())
+	require.NoError(t, err)
+	_, err = ts.readResponse(context.Background(), resp, io.Discard)
 	require.Contains(t, err.Error(), "404", "Expected \"404 NotFound\" error but got: %s", err)
 }
 
@@ -2755,26 +2759,34 @@ func TestTargetScraperBodySizeLimit(t *testing.T) {
 	var buf bytes.Buffer
 
 	// Target response uncompressed body, scrape with body size limit.
-	_, err = ts.scrape(context.Background(), &buf)
+	resp, err := ts.scrape(context.Background())
+	require.NoError(t, err)
+	_, err = ts.readResponse(context.Background(), resp, &buf)
 	require.ErrorIs(t, err, errBodySizeLimit)
 	require.Equal(t, bodySizeLimit, buf.Len())
 	// Target response gzip compressed body, scrape with body size limit.
 	gzipResponse = true
 	buf.Reset()
-	_, err = ts.scrape(context.Background(), &buf)
+	resp, err = ts.scrape(context.Background())
+	require.NoError(t, err)
+	_, err = ts.readResponse(context.Background(), resp, &buf)
 	require.ErrorIs(t, err, errBodySizeLimit)
 	require.Equal(t, bodySizeLimit, buf.Len())
 	// Target response uncompressed body, scrape without body size limit.
 	gzipResponse = false
 	buf.Reset()
 	ts.bodySizeLimit = 0
-	_, err = ts.scrape(context.Background(), &buf)
+	resp, err = ts.scrape(context.Background())
+	require.NoError(t, err)
+	_, err = ts.readResponse(context.Background(), resp, &buf)
 	require.NoError(t, err)
 	require.Equal(t, len(responseBody), buf.Len())
 	// Target response gzip compressed body, scrape without body size limit.
 	gzipResponse = true
 	buf.Reset()
-	_, err = ts.scrape(context.Background(), &buf)
+	resp, err = ts.scrape(context.Background())
+	require.NoError(t, err)
+	_, err = ts.readResponse(context.Background(), resp, &buf)
 	require.NoError(t, err)
 	require.Equal(t, len(responseBody), buf.Len())
 }
@@ -2802,7 +2814,11 @@ func (ts *testScraper) Report(start time.Time, duration time.Duration, err error
 	ts.lastError = err
 }
 
-func (ts *testScraper) scrape(ctx context.Context, w io.Writer) (string, error) {
+func (ts *testScraper) scrape(ctx context.Context) (*http.Response, error) {
+	return nil, ts.scrapeErr
+}
+
+func (ts *testScraper) readResponse(ctx context.Context, resp *http.Response, w io.Writer) (string, error) {
 	if ts.scrapeFunc != nil {
 		return "", ts.scrapeFunc(ctx, w)
 	}
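For readers who want the idea outside the patch context, a minimal, self-contained Go sketch of the same calling pattern follows: perform the HTTP call first and acquire a pooled buffer only after it succeeds, so failed scrapes never grow the pool. The names `buffers` and `scrapeOnce` and the target URL are illustrative stand-ins, not Prometheus code.

// Illustrative sketch only (not Prometheus code): the HTTP call happens
// before any buffer is taken from the pool, mirroring the split into
// scrape() and readResponse() in the patch above.
package main

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"net/http"
	"sync"
)

// buffers is a hypothetical stand-in for the scrape loop's buffer pool.
var buffers = sync.Pool{New: func() any { return make([]byte, 0, 16*1024) }}

func scrapeOnce(ctx context.Context, client *http.Client, url string) (int, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return 0, err
	}
	// Step 1: issue the request; no pooled memory is held if this fails.
	resp, err := client.Do(req)
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()

	// Step 2: only after a successful call, borrow a buffer and read the body.
	b := buffers.Get().([]byte)
	defer buffers.Put(b)
	buf := bytes.NewBuffer(b[:0])
	if _, err := io.Copy(buf, resp.Body); err != nil {
		return 0, err
	}
	return buf.Len(), nil
}

func main() {
	// The URL is a placeholder; any metrics endpoint works.
	n, err := scrapeOnce(context.Background(), http.DefaultClient, "http://localhost:9090/metrics")
	fmt.Println(n, err)
}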