diff --git a/pkg/textparse/parse.go b/pkg/textparse/parse.go index b3cdd6fcd..c540b627f 100644 --- a/pkg/textparse/parse.go +++ b/pkg/textparse/parse.go @@ -99,7 +99,7 @@ func (p *Parser) Err() error { } // Metric writes the labels of the current sample into the passed labels. -func (p *Parser) Metric(l *labels.Labels) { +func (p *Parser) Metric(l *labels.Labels) string { // Allocate the full immutable string immediately, so we just // have to create references on it below. s := string(p.l.b[p.l.mstart:p.l.mend]) @@ -118,6 +118,8 @@ func (p *Parser) Metric(l *labels.Labels) { } sort.Sort((*l)[1:]) + + return s } func yoloString(b []byte) string { diff --git a/retrieval/scrape.go b/retrieval/scrape.go index b646c5d99..2180307f8 100644 --- a/retrieval/scrape.go +++ b/retrieval/scrape.go @@ -416,25 +416,31 @@ type loop interface { type lsetCacheEntry struct { lset labels.Labels - str string + hash uint64 +} + +type refEntry struct { + ref string + lastIter uint64 } type scrapeLoop struct { scraper scraper l log.Logger + iter uint64 // scrape iteration + appender func() storage.Appender reportAppender func() storage.Appender - // TODO: Keep only the values from the last scrape to avoid a memory leak. - refCache map[string]string // Parsed string to ref. - lsetCache map[string]lsetCacheEntry // Ref to labelset and string + refCache map[string]*refEntry // Parsed string to ref. + lsetCache map[string]*lsetCacheEntry // Ref to labelset and string // seriesCur and seriesPrev store the labels of series that were seen // in the current and previous scrape. // We hold two maps and swap them out to save allocations. - seriesCur map[string]labels.Labels - seriesPrev map[string]labels.Labels + seriesCur map[uint64]labels.Labels + seriesPrev map[uint64]labels.Labels ctx context.Context scrapeCtx context.Context @@ -450,10 +456,10 @@ func newScrapeLoop(ctx context.Context, sc scraper, app, reportApp func() storag scraper: sc, appender: app, reportAppender: reportApp, - refCache: map[string]string{}, - lsetCache: map[string]lsetCacheEntry{}, - seriesCur: map[string]labels.Labels{}, - seriesPrev: map[string]labels.Labels{}, + refCache: map[string]*refEntry{}, + lsetCache: map[string]*lsetCacheEntry{}, + seriesCur: map[uint64]labels.Labels{}, + seriesPrev: map[uint64]labels.Labels{}, stopped: make(chan struct{}), ctx: ctx, l: l, @@ -481,6 +487,8 @@ func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) { mainLoop: for { + sl.iter++ + buf.Reset() select { case <-sl.ctx.Done(): @@ -526,6 +534,16 @@ mainLoop: sl.report(start, time.Since(start), total, added, err) last = start + // refCache and lsetCache may grow over time through series churn + // or multiple string representation of the same metric. Clean up entries + // that haven't appeared in the last scrape. + for s, e := range sl.refCache { + if e.lastIter < sl.iter { + delete(sl.refCache, s) + delete(sl.lsetCache, e.ref) + } + } + select { case <-sl.ctx.Done(): close(sl.stopped) @@ -637,13 +655,17 @@ loop: } mets := yoloString(met) - ref, ok := sl.refCache[mets] + re, ok := sl.refCache[mets] if ok { - switch err = app.AddFast(ref, t, v); err { + re.lastIter = sl.iter + + switch err = app.AddFast(re.ref, t, v); err { case nil: if tp == nil { + e := sl.lsetCache[re.ref] + // Bypass staleness logic if there is an explicit timestamp. - sl.seriesCur[sl.lsetCache[ref].str] = sl.lsetCache[ref].lset + sl.seriesCur[e.hash] = e.lset } case storage.ErrNotFound: ok = false @@ -652,10 +674,10 @@ loop: continue case storage.ErrOutOfOrderSample: sl.l.With("timeseries", string(met)).Debug("Out of order sample") - numOutOfOrder += 1 + numOutOfOrder++ continue case storage.ErrDuplicateSampleForTimestamp: - numDuplicates += 1 + numDuplicates++ sl.l.With("timeseries", string(met)).Debug("Duplicate sample for timestamp") continue default: @@ -664,8 +686,9 @@ loop: } if !ok { var lset labels.Labels - p.Metric(&lset) + mets = p.Metric(&lset) + var ref string ref, err = app.Add(lset, t, v) // TODO(fabxc): also add a dropped-cache? switch err { @@ -676,24 +699,27 @@ loop: case storage.ErrOutOfOrderSample: err = nil sl.l.With("timeseries", string(met)).Debug("Out of order sample") - numOutOfOrder += 1 + numOutOfOrder++ continue case storage.ErrDuplicateSampleForTimestamp: err = nil - numDuplicates += 1 + numDuplicates++ sl.l.With("timeseries", string(met)).Debug("Duplicate sample for timestamp") continue default: break loop } - // Allocate a real string. - mets = string(met) - sl.refCache[mets] = ref - str := lset.String() - sl.lsetCache[ref] = lsetCacheEntry{lset: lset, str: str} + + sl.refCache[mets] = &refEntry{ref: ref, lastIter: sl.iter} + + // mets is the raw string the metric was ingested as and ambigious as it might + // not be sorted. Construct the authoritative string for the label set. + h := lset.Hash() + + sl.lsetCache[ref] = &lsetCacheEntry{lset: lset, hash: h} if tp == nil { // Bypass staleness logic if there is an explicit timestamp. - sl.seriesCur[str] = lset + sl.seriesCur[h] = lset } } added++ @@ -807,10 +833,12 @@ func (sl *scrapeLoop) reportStale(start time.Time) error { } func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v float64) error { - ref, ok := sl.refCache[s] + re, ok := sl.refCache[s] if ok { - err := app.AddFast(ref, t, v) + re.lastIter = sl.iter + + err := app.AddFast(re.ref, t, v) switch err { case nil: return nil @@ -830,7 +858,7 @@ func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v ref, err := app.Add(met, t, v) switch err { case nil: - sl.refCache[s] = ref + sl.refCache[s] = &refEntry{ref: ref, lastIter: sl.iter} return nil case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp: return nil diff --git a/retrieval/scrape_test.go b/retrieval/scrape_test.go index c76a70da3..d9a36e04e 100644 --- a/retrieval/scrape_test.go +++ b/retrieval/scrape_test.go @@ -604,10 +604,10 @@ func TestScrapeLoopAppend(t *testing.T) { sl := &scrapeLoop{ appender: func() storage.Appender { return app }, reportAppender: func() storage.Appender { return nopAppender{} }, - refCache: map[string]string{}, - lsetCache: map[string]lsetCacheEntry{}, - seriesCur: map[string]labels.Labels{}, - seriesPrev: map[string]labels.Labels{}, + refCache: map[string]*refEntry{}, + lsetCache: map[string]*lsetCacheEntry{}, + seriesCur: map[uint64]labels.Labels{}, + seriesPrev: map[uint64]labels.Labels{}, } now := time.Now() @@ -645,10 +645,10 @@ func TestScrapeLoopAppendStaleness(t *testing.T) { sl := &scrapeLoop{ appender: func() storage.Appender { return app }, reportAppender: func() storage.Appender { return nopAppender{} }, - refCache: map[string]string{}, - lsetCache: map[string]lsetCacheEntry{}, - seriesCur: map[string]labels.Labels{}, - seriesPrev: map[string]labels.Labels{}, + refCache: map[string]*refEntry{}, + lsetCache: map[string]*lsetCacheEntry{}, + seriesCur: map[uint64]labels.Labels{}, + seriesPrev: map[uint64]labels.Labels{}, } now := time.Now() @@ -691,8 +691,8 @@ func TestScrapeLoopAppendNoStalenessIfTimestamp(t *testing.T) { sl := &scrapeLoop{ appender: func() storage.Appender { return app }, reportAppender: func() storage.Appender { return nopAppender{} }, - refCache: map[string]string{}, - lsetCache: map[string]lsetCacheEntry{}, + refCache: map[string]*refEntry{}, + lsetCache: map[string]*lsetCacheEntry{}, } now := time.Now() @@ -740,10 +740,10 @@ func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrder(t *testing.T) { sl := &scrapeLoop{ appender: func() storage.Appender { return app }, reportAppender: func() storage.Appender { return nopAppender{} }, - refCache: map[string]string{}, - lsetCache: map[string]lsetCacheEntry{}, - seriesCur: map[string]labels.Labels{}, - seriesPrev: map[string]labels.Labels{}, + refCache: map[string]*refEntry{}, + lsetCache: map[string]*lsetCacheEntry{}, + seriesCur: map[uint64]labels.Labels{}, + seriesPrev: map[uint64]labels.Labels{}, l: log.Base(), }