diff --git a/retrieval/scrape.go b/retrieval/scrape.go index e9492acbb..5391eb3fe 100644 --- a/retrieval/scrape.go +++ b/retrieval/scrape.go @@ -14,12 +14,16 @@ package retrieval import ( + "bufio" "bytes" + "compress/gzip" "fmt" "io" "net/http" + "reflect" "sync" "time" + "unsafe" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/log" @@ -347,35 +351,30 @@ type scraper interface { // targetScraper implements the scraper interface for a target. type targetScraper struct { *Target + client *http.Client + req *http.Request + + gzipr *gzip.Reader + buf *bufio.Reader } const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3,*/*;q=0.1` -var scrapeBufPool = sync.Pool{} - -func getScrapeBuf() []byte { - b := scrapeBufPool.Get() - if b == nil { - return make([]byte, 0, 8192) - } - return b.([]byte) -} - -func putScrapeBuf(b []byte) { - b = b[:0] - scrapeBufPool.Put(b) -} - func (s *targetScraper) scrape(ctx context.Context, w io.Writer) error { - req, err := http.NewRequest("GET", s.URL().String(), nil) - if err != nil { - return err - } - // Disable accept header to always negotiate for text format. - // req.Header.Add("Accept", acceptHeader) + if s.req == nil { + req, err := http.NewRequest("GET", s.URL().String(), nil) + if err != nil { + return err + } + // Disable accept header to always negotiate for text format. + // req.Header.Add("Accept", acceptHeader) + req.Header.Add("Accept-Encoding", "gzip") - resp, err := ctxhttp.Do(ctx, s.client, req) + s.req = req + } + + resp, err := ctxhttp.Do(ctx, s.client, s.req) if err != nil { return err } @@ -385,7 +384,24 @@ func (s *targetScraper) scrape(ctx context.Context, w io.Writer) error { return fmt.Errorf("server returned HTTP status %s", resp.Status) } - _, err = io.Copy(w, resp.Body) + if resp.Header.Get("Content-Encoding") != "gzip" { + _, err = io.Copy(w, resp.Body) + return err + } + + if s.gzipr == nil { + s.buf = bufio.NewReader(resp.Body) + s.gzipr, err = gzip.NewReader(s.buf) + if err != nil { + return err + } + } else { + s.buf.Reset(resp.Body) + s.gzipr.Reset(s.buf) + } + + _, err = io.Copy(w, s.gzipr) + s.gzipr.Close() return err } @@ -436,7 +452,10 @@ func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) { ticker := time.NewTicker(interval) defer ticker.Stop() + buf := bytes.NewBuffer(make([]byte, 0, 16000)) + for { + buf.Reset() select { case <-sl.ctx.Done(): return @@ -456,8 +475,6 @@ func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) { ) } - buf := bytes.NewBuffer(getScrapeBuf()) - err := sl.scraper.scrape(scrapeCtx, buf) if err == nil { b := buf.Bytes() @@ -465,7 +482,6 @@ func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) { if total, added, err = sl.append(b, start); err != nil { log.With("err", err).Error("append failed") } - putScrapeBuf(b) } else if errc != nil { errc <- err } @@ -524,7 +540,7 @@ loop: t = *tp } - mets := string(met) + mets := yoloString(met) ref, ok := sl.cache[mets] if ok { switch err = app.AddFast(ref, t, v); err { @@ -550,6 +566,8 @@ loop: default: break loop } + // Allocate a real string. + mets = string(met) sl.cache[mets] = ref } added++ @@ -567,6 +585,16 @@ loop: return total, added, nil } +func yoloString(b []byte) string { + sh := (*reflect.SliceHeader)(unsafe.Pointer(&b)) + + h := reflect.StringHeader{ + Data: sh.Data, + Len: sh.Len, + } + return *((*string)(unsafe.Pointer(&h))) +} + func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scraped, appended int, err error) error { sl.scraper.report(start, duration, err) diff --git a/retrieval/target.go b/retrieval/target.go index 4c206ad76..2cb30f888 100644 --- a/retrieval/target.go +++ b/retrieval/target.go @@ -78,9 +78,10 @@ func NewHTTPClient(cfg config.HTTPClientConfig) (*http.Client, error) { // The only timeout we care about is the configured scrape timeout. // It is applied on request. So we leave out any timings here. var rt http.RoundTripper = &http.Transport{ - Proxy: http.ProxyURL(cfg.ProxyURL.URL), - MaxIdleConns: 10000, - TLSClientConfig: tlsConfig, + Proxy: http.ProxyURL(cfg.ProxyURL.URL), + MaxIdleConns: 10000, + TLSClientConfig: tlsConfig, + DisableCompression: true, } // If a bearer token is provided, create a round tripper that will set the