diff --git a/scrape/scrape.go b/scrape/scrape.go
index 98cea3292..80b754108 100644
--- a/scrape/scrape.go
+++ b/scrape/scrape.go
@@ -1068,16 +1068,20 @@ func (sl *scrapeLoop) getCache() *scrapeCache {
 	return sl.cache
 }
 
+type appendErrors struct {
+	numOutOfOrder  int
+	numDuplicates  int
+	numOutOfBounds int
+}
+
 func (sl *scrapeLoop) append(b []byte, contentType string, ts time.Time) (total, added, seriesAdded int, err error) {
 	var (
 		app            = sl.appender()
 		p              = textparse.New(b, contentType)
 		defTime        = timestamp.FromTime(ts)
-		numOutOfOrder  = 0
-		numDuplicates  = 0
-		numOutOfBounds = 0
+		appErrs        = appendErrors{}
+		sampleLimitErr error
 	)
-	var sampleLimitErr error
 
 	defer func() {
 		if err != nil {
@@ -1094,7 +1098,10 @@ func (sl *scrapeLoop) append(b []byte, contentType string, ts time.Time) (total,
 
 loop:
 	for {
-		var et textparse.Entry
+		var (
+			et          textparse.Entry
+			sampleAdded bool
+		)
 		if et, err = p.Next(); err != nil {
 			if err == io.EOF {
 				err = nil
@@ -1130,37 +1137,13 @@ loop:
 			continue
 		}
 		ce, ok := sl.cache.get(yoloString(met))
+
 		if ok {
-			switch err = app.AddFast(ce.ref, t, v); errors.Cause(err) {
-			case nil:
-				if tp == nil {
-					sl.cache.trackStaleness(ce.hash, ce.lset)
-				}
-			case storage.ErrNotFound:
+			err = app.AddFast(ce.ref, t, v)
+			sampleAdded, err = sl.checkAddError(ce, met, tp, err, &sampleLimitErr, &appErrs)
+			// In theory this should never happen.
+			if err == storage.ErrNotFound {
 				ok = false
-			case storage.ErrOutOfOrderSample:
-				numOutOfOrder++
-				level.Debug(sl.l).Log("msg", "Out of order sample", "series", string(met))
-				targetScrapeSampleOutOfOrder.Inc()
-				continue
-			case storage.ErrDuplicateSampleForTimestamp:
-				numDuplicates++
-				level.Debug(sl.l).Log("msg", "Duplicate sample for timestamp", "series", string(met))
-				targetScrapeSampleDuplicate.Inc()
-				continue
-			case storage.ErrOutOfBounds:
-				numOutOfBounds++
-				level.Debug(sl.l).Log("msg", "Out of bounds metric", "series", string(met))
-				targetScrapeSampleOutOfBounds.Inc()
-				continue
-			case errSampleLimit:
-				// Keep on parsing output if we hit the limit, so we report the correct
-				// total number of samples scraped.
-				sampleLimitErr = err
-				added++
-				continue
-			default:
-				break loop
 			}
 		}
 		if !ok {
@@ -1186,42 +1169,28 @@ loop:
 
 			var ref uint64
 			ref, err = app.Add(lset, t, v)
-			switch errors.Cause(err) {
-			case nil:
-			case storage.ErrOutOfOrderSample:
-				err = nil
-				numOutOfOrder++
-				level.Debug(sl.l).Log("msg", "Out of order sample", "series", string(met))
-				targetScrapeSampleOutOfOrder.Inc()
-				continue
-			case storage.ErrDuplicateSampleForTimestamp:
-				err = nil
-				numDuplicates++
-				level.Debug(sl.l).Log("msg", "Duplicate sample for timestamp", "series", string(met))
-				targetScrapeSampleDuplicate.Inc()
-				continue
-			case storage.ErrOutOfBounds:
-				err = nil
-				numOutOfBounds++
-				level.Debug(sl.l).Log("msg", "Out of bounds metric", "series", string(met))
-				targetScrapeSampleOutOfBounds.Inc()
-				continue
-			case errSampleLimit:
-				sampleLimitErr = err
-				added++
-				continue
-			default:
-				level.Debug(sl.l).Log("msg", "unexpected error", "series", string(met), "err", err)
+			sampleAdded, err = sl.checkAddError(nil, met, tp, err, &sampleLimitErr, &appErrs)
+			if err != nil {
+				if err != storage.ErrNotFound {
+					level.Debug(sl.l).Log("msg", "unexpected error", "series", string(met), "err", err)
+				}
 				break loop
 			}
+
 			if tp == nil {
 				// Bypass staleness logic if there is an explicit timestamp.
 				sl.cache.trackStaleness(hash, lset)
 			}
 			sl.cache.addRef(mets, ref, lset, hash)
-			seriesAdded++
+			if sampleAdded && sampleLimitErr == nil {
+				seriesAdded++
+			}
+		}
+
+		// Increment added even if there's a sampleLimitErr so we correctly report the number of samples scraped.
+		if sampleAdded || sampleLimitErr != nil {
+			added++
 		}
-		added++
 	}
 	if sampleLimitErr != nil {
 		if err == nil {
@@ -1230,14 +1199,14 @@ loop:
 		// We only want to increment this once per scrape, so this is Inc'd outside the loop.
 		targetScrapeSampleLimit.Inc()
 	}
-	if numOutOfOrder > 0 {
-		level.Warn(sl.l).Log("msg", "Error on ingesting out-of-order samples", "num_dropped", numOutOfOrder)
+	if appErrs.numOutOfOrder > 0 {
+		level.Warn(sl.l).Log("msg", "Error on ingesting out-of-order samples", "num_dropped", appErrs.numOutOfOrder)
 	}
-	if numDuplicates > 0 {
-		level.Warn(sl.l).Log("msg", "Error on ingesting samples with different value but same timestamp", "num_dropped", numDuplicates)
+	if appErrs.numDuplicates > 0 {
+		level.Warn(sl.l).Log("msg", "Error on ingesting samples with different value but same timestamp", "num_dropped", appErrs.numDuplicates)
 	}
-	if numOutOfBounds > 0 {
-		level.Warn(sl.l).Log("msg", "Error on ingesting samples that are too old or are too far into the future", "num_dropped", numOutOfBounds)
+	if appErrs.numOutOfBounds > 0 {
+		level.Warn(sl.l).Log("msg", "Error on ingesting samples that are too old or are too far into the future", "num_dropped", appErrs.numOutOfBounds)
 	}
 	if err == nil {
 		sl.cache.forEachStale(func(lset labels.Labels) bool {
@@ -1259,6 +1228,43 @@ func yoloString(b []byte) string {
 	return *((*string)(unsafe.Pointer(&b)))
 }
 
+// checkAddError inspects the error from an append attempt, counting soft errors
+// and incrementing their metrics. It returns whether the sample was added and an
+// error that, when non-nil, tells the caller to stop processing the scrape.
+func (sl *scrapeLoop) checkAddError(ce *cacheEntry, met []byte, tp *int64, err error, sampleLimitErr *error, appErrs *appendErrors) (bool, error) {
+	switch errors.Cause(err) {
+	case nil:
+		if tp == nil && ce != nil {
+			sl.cache.trackStaleness(ce.hash, ce.lset)
+		}
+		return true, nil
+	case storage.ErrNotFound:
+		return false, storage.ErrNotFound
+	case storage.ErrOutOfOrderSample:
+		appErrs.numOutOfOrder++
+		level.Debug(sl.l).Log("msg", "Out of order sample", "series", string(met))
+		targetScrapeSampleOutOfOrder.Inc()
+		return false, nil
+	case storage.ErrDuplicateSampleForTimestamp:
+		appErrs.numDuplicates++
+		level.Debug(sl.l).Log("msg", "Duplicate sample for timestamp", "series", string(met))
+		targetScrapeSampleDuplicate.Inc()
+		return false, nil
+	case storage.ErrOutOfBounds:
+		appErrs.numOutOfBounds++
+		level.Debug(sl.l).Log("msg", "Out of bounds metric", "series", string(met))
+		targetScrapeSampleOutOfBounds.Inc()
+		return false, nil
+	case errSampleLimit:
+		// Keep on parsing output if we hit the limit, so we report the correct
+		// total number of samples scraped.
+		*sampleLimitErr = err
+		return false, nil
+	default:
+		return false, err
+	}
+}
+
 // The constants are suffixed with the invalid \xff unicode rune to avoid collisions
 // with scraped metrics in the cache.
 const (
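The crux of this refactor is that checkAddError collapses the two duplicated switch statements into one classifier: "soft" errors (out-of-order, duplicate, out-of-bounds, sample limit) are counted and swallowed so the caller keeps parsing, while anything unrecognized is returned and aborts the loop. Below is a minimal standalone sketch of that pattern, not the Prometheus code itself: the sentinel errors and trimmed-down counter struct are stand-ins for the storage package's errors, and it unwraps with the standard library's errors.Is where the patch uses errors.Cause from github.com/pkg/errors.

package main

import (
	"errors"
	"fmt"
)

// Stand-ins for storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp,
// and the scrape loop's errSampleLimit; hypothetical names for this sketch only.
var (
	errOutOfOrder  = errors.New("out of order sample")
	errDuplicate   = errors.New("duplicate sample for timestamp")
	errSampleLimit = errors.New("sample limit exceeded")
)

// appendErrors mirrors the counter struct the patch introduces (trimmed down).
type appendErrors struct {
	numOutOfOrder int
	numDuplicates int
}

// checkAddError classifies an append error: soft errors are counted and
// swallowed (nil is returned so the caller keeps parsing); anything else is
// passed back and should abort the loop. The bool reports whether the sample
// was actually stored.
func checkAddError(err error, sampleLimitErr *error, appErrs *appendErrors) (bool, error) {
	switch {
	case err == nil:
		return true, nil
	case errors.Is(err, errOutOfOrder):
		appErrs.numOutOfOrder++
		return false, nil
	case errors.Is(err, errDuplicate):
		appErrs.numDuplicates++
		return false, nil
	case errors.Is(err, errSampleLimit):
		// Remember the limit error but keep parsing, so the total number of
		// samples reported for the scrape stays correct.
		*sampleLimitErr = err
		return false, nil
	default:
		return false, err
	}
}

func main() {
	var (
		appErrs        appendErrors
		sampleLimitErr error
	)
	for _, err := range []error{nil, errOutOfOrder, errDuplicate, errSampleLimit} {
		added, hard := checkAddError(err, &sampleLimitErr, &appErrs)
		fmt.Printf("added=%v hardErr=%v\n", added, hard)
	}
	fmt.Printf("%+v limitHit=%v\n", appErrs, sampleLimitErr != nil)
}

Note that the counters must be passed by pointer (appErrs *appendErrors above, &appErrs at the call sites): with a by-value struct every increment would be lost when the helper returns.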
diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go
index 1d3864bb9..5b120cead 100644
--- a/scrape/scrape_test.go
+++ b/scrape/scrape_test.go
@@ -1014,6 +1014,48 @@ func TestScrapeLoopAppend(t *testing.T) {
 	}
 }
 
+func TestScrapeLoopAppendCacheEntryButErrNotFound(t *testing.T) {
+	// collectResultAppender's AddFast always returns ErrNotFound if we don't give it a next.
+	app := &collectResultAppender{}
+
+	sl := newScrapeLoop(context.Background(),
+		nil, nil, nil,
+		nopMutator,
+		nopMutator,
+		func() storage.Appender { return app },
+		nil,
+		0,
+		true,
+	)
+
+	fakeRef := uint64(1)
+	expValue := float64(1)
+	metric := `metric{n="1"} 1`
+	p := textparse.New([]byte(metric), "")
+
+	var lset labels.Labels
+	p.Next()
+	mets := p.Metric(&lset)
+	hash := lset.Hash()
+
+	// Create a fake entry in the cache.
+	sl.cache.addRef(mets, fakeRef, lset, hash)
+	now := time.Now()
+
+	_, _, _, err := sl.append([]byte(metric), "", now)
+	testutil.Ok(t, err)
+
+	expected := []sample{
+		{
+			metric: lset,
+			t:      timestamp.FromTime(now),
+			v:      expValue,
+		},
+	}
+
+	testutil.Equals(t, expected, app.result)
+}
+
 func TestScrapeLoopAppendSampleLimit(t *testing.T) {
 	resApp := &collectResultAppender{}
 	app := &limitAppender{Appender: resApp, limit: 1}
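TestScrapeLoopAppendCacheEntryButErrNotFound exercises the "in theory this should never happen" branch: a cached ref the appender has never issued makes AddFast return storage.ErrNotFound, and the loop must fall back to the slow Add path and still record the sample. A minimal standalone sketch of that fallback, with a hypothetical toy appender and a plain map standing in for storage.Appender and scrapeCache:

package main

import (
	"errors"
	"fmt"
)

var errNotFound = errors.New("unknown series reference") // stand-in for storage.ErrNotFound

// appender is a toy stand-in for storage.Appender: AddFast only succeeds for
// refs it has issued itself, so a fabricated cache ref forces the slow path.
type appender struct {
	series  map[uint64]string
	nextRef uint64
}

func (a *appender) AddFast(ref uint64, v float64) error {
	if _, ok := a.series[ref]; !ok {
		return errNotFound
	}
	return nil
}

func (a *appender) Add(met string, v float64) (uint64, error) {
	a.nextRef++
	a.series[a.nextRef] = met
	return a.nextRef, nil
}

func main() {
	app := &appender{series: map[uint64]string{}}

	// Fake cache entry pointing at a ref the appender never issued, like the
	// fakeRef the test seeds via sl.cache.addRef.
	cache := map[string]uint64{`metric{n="1"}`: 1}

	met, v := `metric{n="1"}`, 1.0
	if err := app.AddFast(cache[met], v); errors.Is(err, errNotFound) {
		// Stale ref: recover by taking the slow path and refreshing the
		// cache with the real ref, so the sample is not dropped.
		ref, _ := app.Add(met, v)
		cache[met] = ref
	}
	fmt.Println(app.series, cache) // the sample lands despite the stale ref
}

This mirrors what the test asserts: despite the bogus cache entry, app.result ends up containing exactly one sample with the scraped value and timestamp.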