From fafb7940b1cdb24cada28a713fbcc01ed1e2d585 Mon Sep 17 00:00:00 2001
From: Julien Pivotto
Date: Wed, 22 Jan 2020 13:13:47 +0100
Subject: [PATCH] Pass over scrape cache to the next scrape (#6670)

* Pass over scrape cache to the next scrape

Signed-off-by: Julien Pivotto
---
 scrape/scrape.go      |  42 +++++++++-
 scrape/scrape_test.go | 191 ++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 232 insertions(+), 1 deletion(-)

diff --git a/scrape/scrape.go b/scrape/scrape.go
index 303d7974d..60e3a3015 100644
--- a/scrape/scrape.go
+++ b/scrape/scrape.go
@@ -23,6 +23,7 @@ import (
 	"io/ioutil"
 	"math"
 	"net/http"
+	"reflect"
 	"sync"
 	"time"
 	"unsafe"
@@ -176,6 +177,7 @@ type scrapeLoopOptions struct {
 	honorLabels     bool
 	honorTimestamps bool
 	mrc             []*relabel.Config
+	cache           *scrapeCache
 }
 
 const maxAheadTime = 10 * time.Minute
@@ -208,7 +210,10 @@ func newScrapePool(cfg *config.ScrapeConfig, app Appendable, jitterSeed uint64,
 	}
 	sp.newLoop = func(opts scrapeLoopOptions) loop {
 		// Update the targets retrieval function for metadata to a new scrape cache.
-		cache := newScrapeCache()
+		cache := opts.cache
+		if cache == nil {
+			cache = newScrapeCache()
+		}
 		opts.target.SetMetadataStore(cache)
 
 		return newScrapeLoop(
@@ -291,6 +296,8 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
 		targetScrapePoolReloadsFailed.Inc()
 		return errors.Wrap(err, "error creating HTTP client")
 	}
+
+	reuseCache := reusableCache(sp.config, cfg)
 	sp.config = cfg
 	oldClient := sp.client
 	sp.client = client
@@ -306,6 +313,12 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
 	)
 
 	for fp, oldLoop := range sp.loops {
+		var cache *scrapeCache
+		if oc := oldLoop.getCache(); reuseCache && oc != nil {
+			cache = oc
+		} else {
+			cache = newScrapeCache()
+		}
 		var (
 			t = sp.activeTargets[fp]
 			s = &targetScraper{Target: t, client: sp.client, timeout: timeout}
@@ -316,6 +329,7 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
 				honorLabels:     honorLabels,
 				honorTimestamps: honorTimestamps,
 				mrc:             mrc,
+				cache:           cache,
 			})
 		)
 		wg.Add(1)
@@ -579,6 +593,7 @@ func (s *targetScraper) scrape(ctx context.Context, w io.Writer) (string, error)
 type loop interface {
 	run(interval, timeout time.Duration, errc chan<- error)
 	stop()
+	getCache() *scrapeCache
 }
 
 type cacheEntry struct {
@@ -1016,6 +1031,10 @@ func (sl *scrapeLoop) stop() {
 	<-sl.stopped
 }
 
+func (sl *scrapeLoop) getCache() *scrapeCache {
+	return sl.cache
+}
+
 func (sl *scrapeLoop) append(b []byte, contentType string, ts time.Time) (total, added, seriesAdded int, err error) {
 	var (
 		app = sl.appender()
@@ -1312,3 +1331,24 @@ func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v
 		return err
 	}
 }
+
+// zeroConfig returns a new scrape config that only contains configuration items
+// that alter metrics.
+func zeroConfig(c *config.ScrapeConfig) *config.ScrapeConfig {
+	z := *c
+	// We zero out the fields that for sure don't affect scrape.
+	z.ScrapeInterval = 0
+	z.ScrapeTimeout = 0
+	z.SampleLimit = 0
+	z.HTTPClientConfig = config_util.HTTPClientConfig{}
+	return &z
+}
+
+// reusableCache compares two scrape configs and tells whether the cache is still
+// valid.
+func reusableCache(r, l *config.ScrapeConfig) bool {
+	if r == nil || l == nil {
+		return false
+	}
+	return reflect.DeepEqual(zeroConfig(r), zeroConfig(l))
+}
diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go
index dfde91ddf..5641d5de6 100644
--- a/scrape/scrape_test.go
+++ b/scrape/scrape_test.go
@@ -30,6 +30,7 @@ import (
 	"github.com/pkg/errors"
 
 	dto "github.com/prometheus/client_model/go"
+	config_util "github.com/prometheus/common/config"
 	"github.com/prometheus/common/model"
 
 	"github.com/prometheus/prometheus/config"
@@ -144,6 +145,10 @@ func (l *testLoop) stop() {
 	l.stopFunc()
 }
 
+func (l *testLoop) getCache() *scrapeCache {
+	return nil
+}
+
 func TestScrapePoolStop(t *testing.T) {
 	sp := &scrapePool{
 		activeTargets: map[uint64]*Target{},
@@ -1576,3 +1581,189 @@ func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) {
 	testutil.Equals(t, true, series.Next(), "series not found in tsdb")
 	testutil.Equals(t, false, series.Next(), "more than one series found in tsdb")
 }
+
+func TestReusableConfig(t *testing.T) {
+	variants := []*config.ScrapeConfig{
+		&config.ScrapeConfig{
+			JobName:       "prometheus",
+			ScrapeTimeout: model.Duration(15 * time.Second),
+		},
+		&config.ScrapeConfig{
+			JobName:       "httpd",
+			ScrapeTimeout: model.Duration(15 * time.Second),
+		},
+		&config.ScrapeConfig{
+			JobName:       "prometheus",
+			ScrapeTimeout: model.Duration(5 * time.Second),
+		},
+		&config.ScrapeConfig{
+			JobName:     "prometheus",
+			MetricsPath: "/metrics",
+		},
+		&config.ScrapeConfig{
+			JobName:     "prometheus",
+			MetricsPath: "/metrics2",
+		},
+		&config.ScrapeConfig{
+			JobName:       "prometheus",
+			ScrapeTimeout: model.Duration(5 * time.Second),
+			MetricsPath:   "/metrics2",
+		},
+		&config.ScrapeConfig{
+			JobName:        "prometheus",
+			ScrapeInterval: model.Duration(5 * time.Second),
+			MetricsPath:    "/metrics2",
+		},
+		&config.ScrapeConfig{
+			JobName:        "prometheus",
+			ScrapeInterval: model.Duration(5 * time.Second),
+			SampleLimit:    1000,
+			MetricsPath:    "/metrics2",
+		},
+	}
+
+	match := [][]int{
+		[]int{0, 2},
+		[]int{4, 5},
+		[]int{4, 6},
+		[]int{4, 7},
+		[]int{5, 6},
+		[]int{5, 7},
+		[]int{6, 7},
+	}
+	noMatch := [][]int{
+		[]int{1, 2},
+		[]int{0, 4},
+		[]int{3, 4},
+	}
+
+	for i, m := range match {
+		testutil.Equals(t, true, reusableCache(variants[m[0]], variants[m[1]]), "match test %d", i)
+		testutil.Equals(t, true, reusableCache(variants[m[1]], variants[m[0]]), "match test %d", i)
+		testutil.Equals(t, true, reusableCache(variants[m[1]], variants[m[1]]), "match test %d", i)
+		testutil.Equals(t, true, reusableCache(variants[m[0]], variants[m[0]]), "match test %d", i)
+	}
+	for i, m := range noMatch {
+		testutil.Equals(t, false, reusableCache(variants[m[0]], variants[m[1]]), "not match test %d", i)
+		testutil.Equals(t, false, reusableCache(variants[m[1]], variants[m[0]]), "not match test %d", i)
+	}
+}
+
+func TestReuseScrapeCache(t *testing.T) {
+	var (
+		app = &nopAppendable{}
+		cfg = &config.ScrapeConfig{
+			JobName:        "Prometheus",
+			ScrapeTimeout:  model.Duration(5 * time.Second),
+			ScrapeInterval: model.Duration(5 * time.Second),
+			MetricsPath:    "/metrics",
+		}
+		sp, _ = newScrapePool(cfg, app, 0, nil)
+		t1    = &Target{
+			discoveredLabels: labels.Labels{
+				labels.Label{
+					Name:  "labelNew",
+					Value: "nameNew",
+				},
+			},
+		}
+		proxyURL, _ = url.Parse("http://localhost:2128")
+	)
+	sp.sync([]*Target{t1})
+
+	steps := []struct {
+		keep      bool
+		newConfig *config.ScrapeConfig
+	}{
+		{
+			keep: true,
+			newConfig: &config.ScrapeConfig{
+				JobName:        "Prometheus",
+				ScrapeInterval: model.Duration(5 * time.Second),
+				ScrapeTimeout:  model.Duration(5 * time.Second),
+				MetricsPath:    "/metrics",
+			},
+		},
+		{
+			keep: false,
+			newConfig: &config.ScrapeConfig{
+				JobName:        "Prometheus",
+				ScrapeInterval: model.Duration(5 * time.Second),
+				ScrapeTimeout:  model.Duration(15 * time.Second),
+				MetricsPath:    "/metrics2",
+			},
+		},
+		{
+			keep: true,
+			newConfig: &config.ScrapeConfig{
+				JobName:        "Prometheus",
+				SampleLimit:    400,
+				ScrapeInterval: model.Duration(5 * time.Second),
+				ScrapeTimeout:  model.Duration(15 * time.Second),
+				MetricsPath:    "/metrics2",
+			},
+		},
+		{
+			keep: false,
+			newConfig: &config.ScrapeConfig{
+				JobName:         "Prometheus",
+				HonorTimestamps: true,
+				SampleLimit:     400,
+				ScrapeInterval:  model.Duration(5 * time.Second),
+				ScrapeTimeout:   model.Duration(15 * time.Second),
+				MetricsPath:     "/metrics2",
+			},
+		},
+		{
+			keep: true,
+			newConfig: &config.ScrapeConfig{
+				JobName:         "Prometheus",
+				HonorTimestamps: true,
+				SampleLimit:     400,
+				HTTPClientConfig: config_util.HTTPClientConfig{
+					ProxyURL: config_util.URL{URL: proxyURL},
+				},
+				ScrapeInterval: model.Duration(5 * time.Second),
+				ScrapeTimeout:  model.Duration(15 * time.Second),
+				MetricsPath:    "/metrics2",
+			},
+		},
+		{
+			keep: false,
+			newConfig: &config.ScrapeConfig{
+				JobName:         "Prometheus",
+				HonorTimestamps: true,
+				HonorLabels:     true,
+				SampleLimit:     400,
+				ScrapeInterval:  model.Duration(5 * time.Second),
+				ScrapeTimeout:   model.Duration(15 * time.Second),
+				MetricsPath:     "/metrics2",
+			},
+		},
+	}
+
+	cacheAddr := func(sp *scrapePool) map[uint64]string {
+		r := make(map[uint64]string)
+		for fp, l := range sp.loops {
+			r[fp] = fmt.Sprintf("%p", l.getCache())
+		}
+		return r
+	}
+
+	for i, s := range steps {
+		initCacheAddr := cacheAddr(sp)
+		sp.reload(s.newConfig)
+		for fp, newCacheAddr := range cacheAddr(sp) {
+			if s.keep {
+				testutil.Assert(t, initCacheAddr[fp] == newCacheAddr, "step %d: old cache and new cache are not the same", i)
+			} else {
+				testutil.Assert(t, initCacheAddr[fp] != newCacheAddr, "step %d: old cache and new cache are the same", i)
+			}
+		}
+		initCacheAddr = cacheAddr(sp)
+		sp.reload(s.newConfig)
+		for fp, newCacheAddr := range cacheAddr(sp) {
+			testutil.Assert(t, initCacheAddr[fp] == newCacheAddr, "step %d: reloading the exact config invalidates the cache", i)
+		}
+	}
+}
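
A note on the technique for reviewers: reusableCache boils down to "copy the config, zero every field that cannot change what a scrape produces, and let reflect.DeepEqual decide". The standalone sketch below restates that logic outside the patch; trimmedConfig and its fields are illustrative stand-ins invented for this note, not Prometheus's config.ScrapeConfig.

// cache_reuse_sketch.go: a minimal, self-contained sketch of the comparison
// technique used by reusableCache above. trimmedConfig is a hypothetical
// scrape configuration, not the real config.ScrapeConfig.
package main

import (
	"fmt"
	"reflect"
	"time"
)

type trimmedConfig struct {
	JobName        string
	MetricsPath    string
	ScrapeInterval time.Duration // timing only; does not change what is scraped
	ScrapeTimeout  time.Duration // timing only; does not change what is scraped
	SampleLimit    int           // limits ingestion; does not change series identity
}

// zeroed returns a copy of c with every field that cannot invalidate a
// scrape cache reset to its zero value, mirroring zeroConfig in the patch.
func zeroed(c trimmedConfig) trimmedConfig {
	c.ScrapeInterval = 0
	c.ScrapeTimeout = 0
	c.SampleLimit = 0
	return c
}

// cacheReusable reports whether a cache built under oldCfg remains valid
// under newCfg: the configs must agree on everything except the zeroed fields.
func cacheReusable(oldCfg, newCfg trimmedConfig) bool {
	return reflect.DeepEqual(zeroed(oldCfg), zeroed(newCfg))
}

func main() {
	a := trimmedConfig{JobName: "prometheus", MetricsPath: "/metrics", ScrapeInterval: 5 * time.Second}

	b := a
	b.ScrapeTimeout = 15 * time.Second // timing-only change
	fmt.Println(cacheReusable(a, b))   // true: the cache survives the reload

	c := a
	c.MetricsPath = "/metrics2"      // changes what is scraped
	fmt.Println(cacheReusable(a, c)) // false: a fresh cache is needed
}

Because zeroConfig in the patch also blanks HTTPClientConfig, proxy or TLS changes likewise keep the cache, which the fifth step of TestReuseScrapeCache exercises.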