diff --git a/pkg/textparse/parse.go b/pkg/textparse/parse.go
index c540b627f..cfb0a4ce4 100644
--- a/pkg/textparse/parse.go
+++ b/pkg/textparse/parse.go
@@ -99,6 +99,7 @@ func (p *Parser) Err() error {
 }
 
 // Metric writes the labels of the current sample into the passed labels.
+// It returns the string from which the metric was parsed.
 func (p *Parser) Metric(l *labels.Labels) string {
 	// Allocate the full immutable string immediately, so we just
 	// have to create references on it below.
diff --git a/retrieval/scrape.go b/retrieval/scrape.go
index 2180307f8..b01c8bb69 100644
--- a/retrieval/scrape.go
+++ b/retrieval/scrape.go
@@ -120,6 +120,16 @@ func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app Appendable
 		// Any errors that could occur here should be caught during config validation.
 		log.Errorf("Error creating HTTP client for job %q: %s", cfg.JobName, err)
 	}
+
+	newLoop := func(
+		ctx context.Context,
+		s scraper,
+		app, reportApp func() storage.Appender,
+		l log.Logger,
+	) loop {
+		return newScrapeLoop(ctx, s, app, reportApp, l)
+	}
+
 	return &scrapePool{
 		appendable: app,
 		config:     cfg,
@@ -127,7 +137,7 @@ func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app Appendable
 		client:     client,
 		targets:    map[uint64]*Target{},
 		loops:      map[uint64]loop{},
-		newLoop:    newScrapeLoop,
+		newLoop:    newLoop,
 	}
 }
 
@@ -427,39 +437,110 @@ type refEntry struct {
 type scrapeLoop struct {
 	scraper scraper
 	l       log.Logger
-
-	iter uint64 // scrape iteration
+	cache   *scrapeCache
 
 	appender       func() storage.Appender
 	reportAppender func() storage.Appender
 
-	refCache  map[string]*refEntry       // Parsed string to ref.
-	lsetCache map[string]*lsetCacheEntry // Ref to labelset and string
-
-	// seriesCur and seriesPrev store the labels of series that were seen
-	// in the current and previous scrape.
-	// We hold two maps and swap them out to save allocations.
-	seriesCur  map[uint64]labels.Labels
-	seriesPrev map[uint64]labels.Labels
-
 	ctx       context.Context
 	scrapeCtx context.Context
 	cancel    func()
 	stopped   chan struct{}
 }
 
-func newScrapeLoop(ctx context.Context, sc scraper, app, reportApp func() storage.Appender, l log.Logger) loop {
+// scrapeCache tracks mappings of exposed metric strings to label sets and
+// storage references. Additionally, it tracks staleness of series between
+// scrapes.
+type scrapeCache struct {
+	iter uint64 // Current scrape iteration.
+
+	refs  map[string]*refEntry       // Parsed string to ref.
+	lsets map[string]*lsetCacheEntry // Ref to labelset and string.
+
+	// seriesCur and seriesPrev store the labels of series that were seen
+	// in the current and previous scrape.
+	// We hold two maps and swap them out to save allocations.
+	seriesCur  map[uint64]labels.Labels
+	seriesPrev map[uint64]labels.Labels
+}
+
+func newScrapeCache() *scrapeCache {
+	return &scrapeCache{
+		refs:       map[string]*refEntry{},
+		lsets:      map[string]*lsetCacheEntry{},
+		seriesCur:  map[uint64]labels.Labels{},
+		seriesPrev: map[uint64]labels.Labels{},
+	}
+}
+
+func (c *scrapeCache) iterDone() {
+	// refCache and lsetCache may grow over time through series churn
+	// or multiple string representations of the same metric. Clean up entries
+	// that haven't appeared in the last scrape.
+	for s, e := range c.refs {
+		if e.lastIter < c.iter {
+			delete(c.refs, s)
+			delete(c.lsets, e.ref)
+		}
+	}
+
+	// Swap current and previous series.
+	c.seriesPrev, c.seriesCur = c.seriesCur, c.seriesPrev
+
+	// We have to delete every single key in the map.
+	for k := range c.seriesCur {
+		delete(c.seriesCur, k)
+	}
+
+	c.iter++
+}
+
+func (c *scrapeCache) getRef(met string) (string, bool) {
+	e, ok := c.refs[met]
+	if !ok {
+		return "", false
+	}
+	e.lastIter = c.iter
+	return e.ref, true
+}
+
+func (c *scrapeCache) addRef(met, ref string, lset labels.Labels) {
+	c.refs[met] = &refEntry{ref: ref, lastIter: c.iter}
+	// met is the raw string the metric was ingested as. The label set is not ordered
+	// and thus it's not suitable to uniquely identify cache entries.
+	// We store a hash over the label set instead.
+	c.lsets[ref] = &lsetCacheEntry{lset: lset, hash: lset.Hash()}
+}
+
+func (c *scrapeCache) trackStaleness(ref string) {
+	e := c.lsets[ref]
+	c.seriesCur[e.hash] = e.lset
+}
+
+func (c *scrapeCache) forEachStale(f func(labels.Labels) bool) {
+	for h, lset := range c.seriesPrev {
+		if _, ok := c.seriesCur[h]; !ok {
+			if !f(lset) {
+				break
+			}
+		}
+	}
+}
+
+func newScrapeLoop(
+	ctx context.Context,
+	sc scraper,
+	app, reportApp func() storage.Appender,
+	l log.Logger,
+) *scrapeLoop {
 	if l == nil {
 		l = log.Base()
 	}
 	sl := &scrapeLoop{
 		scraper:        sc,
 		appender:       app,
+		cache:          newScrapeCache(),
 		reportAppender: reportApp,
-		refCache:       map[string]*refEntry{},
-		lsetCache:      map[string]*lsetCacheEntry{},
-		seriesCur:      map[uint64]labels.Labels{},
-		seriesPrev:     map[uint64]labels.Labels{},
 		stopped:        make(chan struct{}),
 		ctx:            ctx,
 		l:              l,
@@ -487,8 +568,6 @@ func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) {
 
 mainLoop:
 	for {
-		sl.iter++
-
 		buf.Reset()
 		select {
 		case <-sl.ctx.Done():
@@ -534,16 +613,6 @@ mainLoop:
 		sl.report(start, time.Since(start), total, added, err)
 		last = start
 
-		// refCache and lsetCache may grow over time through series churn
-		// or multiple string representation of the same metric. Clean up entries
-		// that haven't appeared in the last scrape.
-		for s, e := range sl.refCache {
-			if e.lastIter < sl.iter {
-				delete(sl.refCache, s)
-				delete(sl.lsetCache, e.ref)
-			}
-		}
-
 		select {
 		case <-sl.ctx.Done():
 			close(sl.stopped)
@@ -654,18 +723,12 @@ loop:
 			t = *tp
 		}
 
-		mets := yoloString(met)
-		re, ok := sl.refCache[mets]
+		ref, ok := sl.cache.getRef(yoloString(met))
 		if ok {
-			re.lastIter = sl.iter
-
-			switch err = app.AddFast(re.ref, t, v); err {
+			switch err = app.AddFast(ref, t, v); err {
 			case nil:
 				if tp == nil {
-					e := sl.lsetCache[re.ref]
-
-					// Bypass staleness logic if there is an explicit timestamp.
-					sl.seriesCur[e.hash] = e.lset
+					sl.cache.trackStaleness(ref)
 				}
 			case storage.ErrNotFound:
 				ok = false
@@ -686,7 +749,7 @@ loop:
 		}
 		if !ok {
 			var lset labels.Labels
-			mets = p.Metric(&lset)
+			mets := p.Metric(&lset)
 
 			var ref string
 			ref, err = app.Add(lset, t, v)
@@ -710,16 +773,11 @@ loop:
 				break loop
 			}
 
-			sl.refCache[mets] = &refEntry{ref: ref, lastIter: sl.iter}
+			sl.cache.addRef(mets, ref, lset)
 
-			// mets is the raw string the metric was ingested as and ambigious as it might
-			// not be sorted. Construct the authoritative string for the label set.
-			h := lset.Hash()
-
-			sl.lsetCache[ref] = &lsetCacheEntry{lset: lset, hash: h}
 			if tp == nil {
 				// Bypass staleness logic if there is an explicit timestamp.
-				sl.seriesCur[h] = lset
+				sl.cache.trackStaleness(ref)
 			}
 		}
 		added++
@@ -734,25 +792,19 @@ loop:
 		sl.l.With("numDropped", numDuplicates).Warn("Error on ingesting samples with different value but same timestamp")
 	}
 	if err == nil {
-		for metric, lset := range sl.seriesPrev {
-			if _, ok := sl.seriesCur[metric]; !ok {
-				// Series no longer exposed, mark it stale.
-				_, err = app.Add(lset, defTime, math.Float64frombits(value.StaleNaN))
-				switch err {
-				case nil:
-				case errSeriesDropped:
-					err = nil
-					continue
-				case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
-					// Do not count these in logging, as this is expected if a target
-					// goes away and comes back again with a new scrape loop.
-					err = nil
-					continue
-				default:
-					break
-				}
+		sl.cache.forEachStale(func(lset labels.Labels) bool {
+			// Series no longer exposed, mark it stale.
+			_, err = app.Add(lset, defTime, math.Float64frombits(value.StaleNaN))
+			switch err {
+			case errSeriesDropped:
+				err = nil
+			case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
+				// Do not count these in logging, as this is expected if a target
+				// goes away and comes back again with a new scrape loop.
+				err = nil
 			}
-		}
+			return err == nil
+		})
 	}
 	if err != nil {
 		app.Rollback()
@@ -762,13 +814,7 @@ loop:
 		return total, 0, err
 	}
 
-	// Swap current and previous series.
-	sl.seriesPrev, sl.seriesCur = sl.seriesCur, sl.seriesPrev
-
-	// We have to delete every single key in the map.
-	for k := range sl.seriesCur {
-		delete(sl.seriesCur, k)
-	}
+	sl.cache.iterDone()
 
 	return total, added, nil
 }
@@ -833,12 +879,13 @@ func (sl *scrapeLoop) reportStale(start time.Time) error {
 }
 
 func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v float64) error {
-	re, ok := sl.refCache[s]
-
+	// Suffix s with the invalid \xff unicode rune to avoid collisions
+	// with scraped metrics.
+	s2 := s + "\xff"
+
+	ref, ok := sl.cache.getRef(s2)
 	if ok {
-		re.lastIter = sl.iter
-
-		err := app.AddFast(re.ref, t, v)
+		err := app.AddFast(ref, t, v)
 		switch err {
 		case nil:
 			return nil
@@ -858,7 +905,7 @@ func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v
 	ref, err := app.Add(met, t, v)
 	switch err {
 	case nil:
-		sl.refCache[s] = &refEntry{ref: ref, lastIter: sl.iter}
+		sl.cache.addRef(s2, ref, met)
 		return nil
 	case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
 		return nil
diff --git a/retrieval/scrape_test.go b/retrieval/scrape_test.go
index d9a36e04e..3f5565530 100644
--- a/retrieval/scrape_test.go
+++ b/retrieval/scrape_test.go
@@ -373,11 +373,9 @@ func TestScrapeLoopStop(t *testing.T) {
 
 	// Succeed once, several failures, then stop.
 	scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
-		numScrapes += 1
+		numScrapes++
 		if numScrapes == 2 {
-			go func() {
-				sl.stop()
-			}()
+			go sl.stop()
 		}
 		w.Write([]byte("metric_a 42\n"))
 		return nil
@@ -398,7 +396,7 @@
 		t.Fatalf("Appended samples not as expected. Wanted: at least %d samples Got: %d", 2, len(appender.result))
 	}
 	if !value.IsStaleNaN(appender.result[len(appender.result)-1].v) {
-		t.Fatalf("Appended last sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(appender.result[len(appender.result)].v))
+		t.Fatalf("Appended last sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(appender.result[len(appender.result)-1].v))
 	}
 
 	if len(reportAppender.result) < 8 {
@@ -515,7 +513,8 @@ func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrape(t *testing.T) {
 
 	// Succeed once, several failures, then stop.
 	scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
-		numScrapes += 1
+		numScrapes++
+
 		if numScrapes == 1 {
 			w.Write([]byte("metric_a 42\n"))
 			return nil
@@ -564,7 +563,8 @@ func TestScrapeLoopRunCreatesStaleMarkersOnParseFailure(t *testing.T) {
 
 	// Succeed once, several failures, then stop.
 	scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
-		numScrapes += 1
+		numScrapes++
+
 		if numScrapes == 1 {
 			w.Write([]byte("metric_a 42\n"))
 			return nil
@@ -601,15 +601,12 @@
 func TestScrapeLoopAppend(t *testing.T) {
 	app := &collectResultAppender{}
 
-	sl := &scrapeLoop{
-		appender:       func() storage.Appender { return app },
-		reportAppender: func() storage.Appender { return nopAppender{} },
-		refCache:       map[string]*refEntry{},
-		lsetCache:      map[string]*lsetCacheEntry{},
-		seriesCur:      map[uint64]labels.Labels{},
-		seriesPrev:     map[uint64]labels.Labels{},
-	}
+	sl := newScrapeLoop(context.Background(), nil,
+		func() storage.Appender { return app },
+		func() storage.Appender { return nopAppender{} },
+		nil,
+	)
 
 	now := time.Now()
 	_, _, err := sl.append([]byte("metric_a 1\nmetric_b NaN\n"), now)
 	if err != nil {
@@ -642,14 +639,11 @@ func TestScrapeLoopAppend(t *testing.T) {
 func TestScrapeLoopAppendStaleness(t *testing.T) {
 	app := &collectResultAppender{}
 
-	sl := &scrapeLoop{
-		appender:       func() storage.Appender { return app },
-		reportAppender: func() storage.Appender { return nopAppender{} },
-		refCache:       map[string]*refEntry{},
-		lsetCache:      map[string]*lsetCacheEntry{},
-		seriesCur:      map[uint64]labels.Labels{},
-		seriesPrev:     map[uint64]labels.Labels{},
-	}
+	sl := newScrapeLoop(context.Background(), nil,
+		func() storage.Appender { return app },
+		func() storage.Appender { return nopAppender{} },
+		nil,
+	)
 
 	now := time.Now()
 	_, _, err := sl.append([]byte("metric_a 1\n"), now)
@@ -688,12 +682,11 @@ func TestScrapeLoopAppendStaleness(t *testing.T) {
 func TestScrapeLoopAppendNoStalenessIfTimestamp(t *testing.T) {
 	app := &collectResultAppender{}
 
-	sl := &scrapeLoop{
-		appender:       func() storage.Appender { return app },
-		reportAppender: func() storage.Appender { return nopAppender{} },
-		refCache:       map[string]*refEntry{},
-		lsetCache:      map[string]*lsetCacheEntry{},
-	}
+	sl := newScrapeLoop(context.Background(), nil,
+		func() storage.Appender { return app },
+		func() storage.Appender { return nopAppender{} },
+		nil,
+	)
 
 	now := time.Now()
 	_, _, err := sl.append([]byte("metric_a 1 1000\n"), now)
@@ -737,15 +730,11 @@ func (app *errorAppender) AddFast(ref string, t int64, v float64) error {
 func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrder(t *testing.T) {
 	app := &errorAppender{}
 
-	sl := &scrapeLoop{
-		appender:       func() storage.Appender { return app },
-		reportAppender: func() storage.Appender { return nopAppender{} },
-		refCache:       map[string]*refEntry{},
-		lsetCache:      map[string]*lsetCacheEntry{},
-		seriesCur:      map[uint64]labels.Labels{},
-		seriesPrev:     map[uint64]labels.Labels{},
-		l:              log.Base(),
-	}
+	sl := newScrapeLoop(context.Background(), nil,
+		func() storage.Appender { return app },
+		func() storage.Appender { return nopAppender{} },
+		nil,
+	)
 
 	now := time.Unix(1, 0)
 	_, _, err := sl.append([]byte("out_of_order 1\namend 1\nnormal 1\n"), now)
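
Reviewer note: the heart of the staleness machinery this change moves into scrapeCache is the pair of swapped maps seriesPrev/seriesCur. The standalone sketch below illustrates just that mechanism under simplified assumptions: series are keyed by plain strings rather than label-set hashes and storage refs, and the names (staleTracker, track, and so on) are illustrative only, not part of this patch.

package main

import "fmt"

// staleTracker is a minimal stand-in for the two-map staleness tracking
// in scrapeCache. Series are plain strings instead of hashed label sets.
type staleTracker struct {
	seriesCur  map[string]struct{}
	seriesPrev map[string]struct{}
}

func newStaleTracker() *staleTracker {
	return &staleTracker{
		seriesCur:  map[string]struct{}{},
		seriesPrev: map[string]struct{}{},
	}
}

// track records that a series was seen in the current scrape,
// mirroring scrapeCache.trackStaleness.
func (t *staleTracker) track(series string) {
	t.seriesCur[series] = struct{}{}
}

// forEachStale calls f for every series seen in the previous scrape but
// missing from the current one, mirroring scrapeCache.forEachStale.
func (t *staleTracker) forEachStale(f func(series string) bool) {
	for s := range t.seriesPrev {
		if _, ok := t.seriesCur[s]; !ok {
			if !f(s) {
				break
			}
		}
	}
}

// iterDone swaps the two maps and clears the new "current" map, reusing
// its allocation instead of allocating a fresh map on every scrape.
func (t *staleTracker) iterDone() {
	t.seriesPrev, t.seriesCur = t.seriesCur, t.seriesPrev
	for s := range t.seriesCur {
		delete(t.seriesCur, s)
	}
}

func main() {
	t := newStaleTracker()

	// Scrape 1 exposes two series.
	t.track("metric_a")
	t.track("metric_b")
	t.iterDone()

	// Scrape 2 exposes only metric_a, so metric_b is reported stale.
	t.track("metric_a")
	t.forEachStale(func(s string) bool {
		fmt.Println("stale:", s) // Output: stale: metric_b
		return true
	})
	t.iterDone()
}

As in scrapeLoop.append, forEachStale must run before iterDone: the comparison needs both the previous and the current scrape's series, and the swap in iterDone discards the previous set.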