From dd3073616c51718465e46c69492e49fd9e3d6a81 Mon Sep 17 00:00:00 2001
From: Brian Brazil
Date: Thu, 28 Mar 2019 17:07:14 +0000
Subject: [PATCH] Don't lose the scrape cache on a failed scrape.

This avoids CPU usage increasing when the target comes back.

Signed-off-by: Brian Brazil
---
 scrape/scrape.go      | 45 ++++++++++++------------
 scrape/scrape_test.go | 81 ++++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 103 insertions(+), 23 deletions(-)

diff --git a/scrape/scrape.go b/scrape/scrape.go
index 1f14318fe..2039a7b82 100644
--- a/scrape/scrape.go
+++ b/scrape/scrape.go
@@ -643,28 +643,32 @@ func newScrapeCache() *scrapeCache {
 	}
 }
 
-func (c *scrapeCache) iterDone() {
-	// All caches may grow over time through series churn
-	// or multiple string representations of the same metric. Clean up entries
-	// that haven't appeared in the last scrape.
-	for s, e := range c.series {
-		if c.iter-e.lastIter > 2 {
-			delete(c.series, s)
+func (c *scrapeCache) iterDone(cleanCache bool) {
+	if cleanCache {
+		// All caches may grow over time through series churn
+		// or multiple string representations of the same metric. Clean up entries
+		// that haven't appeared in the last scrape.
+		for s, e := range c.series {
+			if c.iter != e.lastIter {
+				delete(c.series, s)
+			}
 		}
-	}
-	for s, iter := range c.droppedSeries {
-		if c.iter-*iter > 2 {
-			delete(c.droppedSeries, s)
+		for s, iter := range c.droppedSeries {
+			if c.iter != *iter {
+				delete(c.droppedSeries, s)
+			}
 		}
-	}
-	c.metaMtx.Lock()
-	for m, e := range c.metadata {
-		// Keep metadata around for 10 scrapes after its metric disappeared.
-		if c.iter-e.lastIter > 10 {
-			delete(c.metadata, m)
+		c.metaMtx.Lock()
+		for m, e := range c.metadata {
+			// Keep metadata around for 10 scrapes after its metric disappeared.
+			if c.iter-e.lastIter > 10 {
+				delete(c.metadata, m)
+			}
 		}
+		c.metaMtx.Unlock()
+
+		c.iter++
 	}
-	c.metaMtx.Unlock()
 
 	// Swap current and previous series.
 	c.seriesPrev, c.seriesCur = c.seriesCur, c.seriesPrev
@@ -673,8 +677,6 @@ func (c *scrapeCache) iterDone() {
 	for k := range c.seriesCur {
 		delete(c.seriesCur, k)
 	}
-
-	c.iter++
 }
 
 func (c *scrapeCache) get(met string) (*cacheEntry, bool) {
@@ -1110,7 +1112,6 @@ loop:
 
 		var ref uint64
 		ref, err = app.Add(lset, t, v)
-		// TODO(fabxc): also add a dropped-cache?
 		switch err {
 		case nil:
 		case storage.ErrOutOfOrderSample:
@@ -1184,7 +1185,7 @@ loop:
 		return total, added, err
 	}
 
-	sl.cache.iterDone()
+	sl.cache.iterDone(len(b) > 0)
 
 	return total, added, nil
 }
diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go
index ba32912dc..64a211996 100644
--- a/scrape/scrape_test.go
+++ b/scrape/scrape_test.go
@@ -770,7 +770,7 @@ func TestScrapeLoopRunCreatesStaleMarkersOnParseFailure(t *testing.T) {
 	// 1 successfully scraped sample, 1 stale marker after first fail, 4 report samples for
 	// each scrape successful or not.
 	if len(appender.result) != 14 {
-		t.Fatalf("Appended samples not as expected. Wanted: %d samples Got: %d", 22, len(appender.result))
+		t.Fatalf("Appended samples not as expected. Wanted: %d samples Got: %d", 14, len(appender.result))
 	}
 	if appender.result[0].v != 42.0 {
 		t.Fatalf("Appended first sample not as expected. Wanted: %f Got: %f", appender.result[0].v, 42.0)
@@ -780,6 +780,85 @@ func TestScrapeLoopRunCreatesStaleMarkersOnParseFailure(t *testing.T) {
 	}
 }
 
+func TestScrapeLoopCache(t *testing.T) {
+	s := testutil.NewStorage(t)
+	defer s.Close()
+
+	sapp, err := s.Appender()
+	if err != nil {
+		t.Error(err)
+	}
+	appender := &collectResultAppender{next: sapp}
+	var (
+		signal  = make(chan struct{})
+		scraper = &testScraper{}
+		app     = func() storage.Appender { return appender }
+	)
+	defer close(signal)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	sl := newScrapeLoop(ctx,
+		scraper,
+		nil, nil,
+		nopMutator,
+		nopMutator,
+		app,
+		nil,
+		0,
+		true,
+	)
+
+	numScrapes := 0
+
+	scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
+		if numScrapes == 1 || numScrapes == 2 {
+			if _, ok := sl.cache.series["metric_a"]; !ok {
+				t.Errorf("metric_a missing from cache after scrape %d", numScrapes)
+			}
+			if _, ok := sl.cache.series["metric_b"]; !ok {
+				t.Errorf("metric_b missing from cache after scrape %d", numScrapes)
+			}
+		} else if numScrapes == 3 {
+			if _, ok := sl.cache.series["metric_a"]; !ok {
+				t.Errorf("metric_a missing from cache after scrape %d", numScrapes)
+			}
+			if _, ok := sl.cache.series["metric_b"]; ok {
+				t.Errorf("metric_b present in cache after scrape %d", numScrapes)
+			}
+		}
+
+		numScrapes++
+
+		if numScrapes == 1 {
+			w.Write([]byte("metric_a 42\nmetric_b 43\n"))
+			return nil
+		} else if numScrapes == 3 {
+			w.Write([]byte("metric_a 44\n"))
+			return nil
+		} else if numScrapes == 4 {
+			cancel()
+		}
+		return fmt.Errorf("scrape failed")
+	}
+
+	go func() {
+		sl.run(10*time.Millisecond, time.Hour, nil)
+		signal <- struct{}{}
+	}()
+
+	select {
+	case <-signal:
+	case <-time.After(5 * time.Second):
+		t.Fatalf("Scrape wasn't stopped.")
+	}
+
+	// 2 samples from the first scrape, 2 stale markers after the failed second scrape, 1 sample
+	// from the third, 1 stale marker after the failed fourth, and 4 report samples per scrape.
+	if len(appender.result) != 22 {
+		t.Fatalf("Appended samples not as expected. Wanted: %d samples Got: %d", 22, len(appender.result))
+	}
+}
+
 func TestScrapeLoopAppend(t *testing.T) {
 	tests := []struct {
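
The behavioural core of the patch: iterDone now takes a cleanCache flag and only evicts stale cache entries and advances the iteration counter when the scrape actually produced a body (sl.cache.iterDone(len(b) > 0)). A failed scrape therefore no longer empties the series cache, so when the target comes back the scrape loop does not have to rebuild every entry (parsed labels, storage references) from scratch, which is what previously drove the CPU increase. Note that the eviction test for series and dropped-series entries also tightens from c.iter-e.lastIter > 2 to c.iter != e.lastIter: since the counter is now frozen across failures, "not seen in the last successful scrape" becomes the retention rule, while metadata keeps its 10-scrape window.

Below is a minimal, self-contained model of that semantics, a sketch for illustration only: the cache/entry types and the add helper are simplified stand-ins, not the actual scrapeCache API from scrape.go.

package main

import "fmt"

// entry models a cached series; lastIter is the last successful scrape
// iteration in which the series was seen.
type entry struct{ lastIter uint64 }

type cache struct {
	iter   uint64 // counter of successful scrape iterations
	series map[string]*entry
}

// add records a series as seen in the current iteration.
func (c *cache) add(name string) { c.series[name] = &entry{lastIter: c.iter} }

// iterDone models the patched behaviour: entries are evicted and the
// iteration counter advanced only when the scrape produced a body.
func (c *cache) iterDone(cleanCache bool) {
	if cleanCache {
		for s, e := range c.series {
			if c.iter != e.lastIter {
				delete(c.series, s)
			}
		}
		c.iter++
	}
}

func main() {
	c := &cache{series: map[string]*entry{}}

	c.add("metric_a")
	c.add("metric_b")
	c.iterDone(true) // successful scrape: both entries survive, counter advances

	c.iterDone(false) // failed scrape: cache untouched, counter frozen

	c.add("metric_a") // the next successful scrape exposes only metric_a
	c.iterDone(true)  // metric_b is evicted, metric_a survives

	_, a := c.series["metric_a"]
	_, b := c.series["metric_b"]
	fmt.Println(a, b) // true false
}

Run as written, this prints "true false", mirroring the assertions in TestScrapeLoopCache: metric_a stays cached through the failed scrape, and metric_b is dropped only after a successful scrape that no longer exposes it.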