diff --git a/scrape/helpers_test.go b/scrape/helpers_test.go index 0dae21e77..da78cb5dd 100644 --- a/scrape/helpers_test.go +++ b/scrape/helpers_test.go @@ -40,8 +40,10 @@ type sample struct { // collectResultAppender records all samples that were added through the appender. // It can be used as its zero value or be backed by another appender it writes samples through. type collectResultAppender struct { - next storage.Appender - result []sample + next storage.Appender + result []sample + pendingResult []sample + rolledbackResult []sample mapper map[uint64]labels.Labels } @@ -55,7 +57,7 @@ func (a *collectResultAppender) AddFast(ref uint64, t int64, v float64) error { if err != nil { return err } - a.result = append(a.result, sample{ + a.pendingResult = append(a.pendingResult, sample{ metric: a.mapper[ref], t: t, v: v, @@ -64,7 +66,7 @@ func (a *collectResultAppender) AddFast(ref uint64, t int64, v float64) error { } func (a *collectResultAppender) Add(m labels.Labels, t int64, v float64) (uint64, error) { - a.result = append(a.result, sample{ + a.pendingResult = append(a.pendingResult, sample{ metric: m, t: t, v: v, @@ -85,5 +87,20 @@ func (a *collectResultAppender) Add(m labels.Labels, t int64, v float64) (uint64 return ref, nil } -func (a *collectResultAppender) Commit() error { return nil } -func (a *collectResultAppender) Rollback() error { return nil } +func (a *collectResultAppender) Commit() error { + a.result = append(a.result, a.pendingResult...) 
+ a.pendingResult = nil + if a.next == nil { + return nil + } + return a.next.Commit() +} + +func (a *collectResultAppender) Rollback() error { + a.rolledbackResult = a.pendingResult + a.pendingResult = nil + if a.next == nil { + return nil + } + return a.next.Rollback() +} diff --git a/scrape/scrape.go b/scrape/scrape.go index 3b792d469..9883156ad 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -932,61 +932,7 @@ mainLoop: default: } - var ( - start = time.Now() - scrapeCtx, cancel = context.WithTimeout(sl.ctx, timeout) - ) - - // Only record after the first scrape. - if !last.IsZero() { - targetIntervalLength.WithLabelValues(interval.String()).Observe( - time.Since(last).Seconds(), - ) - } - - b := sl.buffers.Get(sl.lastScrapeSize).([]byte) - buf := bytes.NewBuffer(b) - - contentType, scrapeErr := sl.scraper.scrape(scrapeCtx, buf) - cancel() - - if scrapeErr == nil { - b = buf.Bytes() - // NOTE: There were issues with misbehaving clients in the past - // that occasionally returned empty results. We don't want those - // to falsely reset our buffer size. - if len(b) > 0 { - sl.lastScrapeSize = len(b) - } - } else { - level.Debug(sl.l).Log("msg", "Scrape failed", "err", scrapeErr.Error()) - if errc != nil { - errc <- scrapeErr - } - } - - // A failed scrape is the same as an empty scrape, - // we still call sl.append to trigger stale markers. - total, added, seriesAdded, appErr := sl.append(b, contentType, start) - if appErr != nil { - level.Debug(sl.l).Log("msg", "Append failed", "err", appErr) - // The append failed, probably due to a parse error or sample limit. - // Call sl.append again with an empty scrape to trigger stale markers. 
- if _, _, _, err := sl.append([]byte{}, "", start); err != nil { - level.Warn(sl.l).Log("msg", "Append failed", "err", err) - } - } - - sl.buffers.Put(b) - - if scrapeErr == nil { - scrapeErr = appErr - } - - if err := sl.report(start, time.Since(start), total, added, seriesAdded, scrapeErr); err != nil { - level.Warn(sl.l).Log("msg", "Appending scrape report failed", "err", err) - } - last = start + last = sl.scrapeAndReport(interval, timeout, last, errc) select { case <-sl.parentCtx.Done(): @@ -1005,6 +951,83 @@ mainLoop: } } +// scrapeAndReport performs a scrape and then appends the result to the storage +// together with reporting metrics, by using as few appenders as possible. +// In the happy scenario, a single appender is used. +func (sl *scrapeLoop) scrapeAndReport(interval, timeout time.Duration, last time.Time, errc chan<- error) time.Time { + var ( + start = time.Now() + scrapeCtx, cancel = context.WithTimeout(sl.ctx, timeout) + ) + + // Only record after the first scrape. + if !last.IsZero() { + targetIntervalLength.WithLabelValues(interval.String()).Observe( + time.Since(last).Seconds(), + ) + } + + b := sl.buffers.Get(sl.lastScrapeSize).([]byte) + buf := bytes.NewBuffer(b) + + contentType, scrapeErr := sl.scraper.scrape(scrapeCtx, buf) + cancel() + + if scrapeErr == nil { + b = buf.Bytes() + // NOTE: There were issues with misbehaving clients in the past + // that occasionally returned empty results. We don't want those + // to falsely reset our buffer size. 
+ if len(b) > 0 { + sl.lastScrapeSize = len(b) + } + } else { + level.Debug(sl.l).Log("msg", "Scrape failed", "err", scrapeErr.Error()) + if errc != nil { + errc <- scrapeErr + } + } + + app := sl.appender() + var err error + defer func() { + if err != nil { + app.Rollback() + return + } + err = app.Commit() + if err != nil { + level.Error(sl.l).Log("msg", "Scrape commit failed", "err", err) + } + }() + // A failed scrape is the same as an empty scrape, + // we still call sl.append to trigger stale markers. + total, added, seriesAdded, appErr := sl.append(app, b, contentType, start) + if appErr != nil { + app.Rollback() + app = sl.appender() + level.Debug(sl.l).Log("msg", "Append failed", "err", appErr) + // The append failed, probably due to a parse error or sample limit. + // Call sl.append again with an empty scrape to trigger stale markers. + if _, _, _, err := sl.append(app, []byte{}, "", start); err != nil { + app.Rollback() + app = sl.appender() + level.Warn(sl.l).Log("msg", "Append failed", "err", err) + } + } + + sl.buffers.Put(b) + + if scrapeErr == nil { + scrapeErr = appErr + } + + if err = sl.report(app, start, time.Since(start), total, added, seriesAdded, scrapeErr); err != nil { + level.Warn(sl.l).Log("msg", "Appending scrape report failed", "err", err) + } + return start +} + func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, interval time.Duration) { // Scraping has stopped. We want to write stale markers but // the target may be recreated, so we wait just over 2 scrape intervals @@ -1045,11 +1068,25 @@ func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, int // Call sl.append again with an empty scrape to trigger stale markers. // If the target has since been recreated and scraped, the // stale markers will be out of order and ignored. 
- if _, _, _, err := sl.append([]byte{}, "", staleTime); err != nil { - level.Error(sl.l).Log("msg", "stale append failed", "err", err) + app := sl.appender() + var err error + defer func() { + if err != nil { + app.Rollback() + return + } + err = app.Commit() + if err != nil { + level.Warn(sl.l).Log("msg", "Stale commit failed", "err", err) + } + }() + if _, _, _, err = sl.append(app, []byte{}, "", staleTime); err != nil { + app.Rollback() + app = sl.appender() + level.Warn(sl.l).Log("msg", "Stale append failed", "err", err) } - if err := sl.reportStale(staleTime); err != nil { - level.Error(sl.l).Log("msg", "stale report failed", "err", err) + if err = sl.reportStale(app, staleTime); err != nil { + level.Warn(sl.l).Log("msg", "Stale report failed", "err", err) } } @@ -1074,9 +1111,8 @@ type appendErrors struct { numOutOfBounds int } -func (sl *scrapeLoop) append(b []byte, contentType string, ts time.Time) (total, added, seriesAdded int, err error) { +func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string, ts time.Time) (total, added, seriesAdded int, err error) { var ( - app = sl.appender() p = textparse.New(b, contentType) defTime = timestamp.FromTime(ts) appErrs = appendErrors{} @@ -1085,10 +1121,6 @@ func (sl *scrapeLoop) append(b []byte, contentType string, ts time.Time) (total, defer func() { if err != nil { - app.Rollback() - return - } - if err = app.Commit(); err != nil { return } // Only perform cache cleaning if the scrape was not empty. 
@@ -1275,7 +1307,7 @@ const ( scrapeSeriesAddedMetricName = "scrape_series_added" + "\xff" ) -func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scraped, added, seriesAdded int, scrapeErr error) (err error) { +func (sl *scrapeLoop) report(app storage.Appender, start time.Time, duration time.Duration, scraped, added, seriesAdded int, scrapeErr error) (err error) { sl.scraper.Report(start, duration, scrapeErr) ts := timestamp.FromTime(start) @@ -1284,14 +1316,6 @@ func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scraped, a if scrapeErr == nil { health = 1 } - app := sl.appender() - defer func() { - if err != nil { - app.Rollback() - return - } - err = app.Commit() - }() if err = sl.addReportSample(app, scrapeHealthMetricName, ts, health); err != nil { return @@ -1311,16 +1335,8 @@ func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scraped, a return } -func (sl *scrapeLoop) reportStale(start time.Time) (err error) { +func (sl *scrapeLoop) reportStale(app storage.Appender, start time.Time) (err error) { ts := timestamp.FromTime(start) - app := sl.appender() - defer func() { - if err != nil { - app.Rollback() - return - } - err = app.Commit() - }() stale := math.Float64frombits(value.StaleNaN) diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index 1651b437d..7d0e730a7 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -612,7 +612,8 @@ func TestScrapeLoopMetadata(t *testing.T) { ) defer cancel() - total, _, _, err := sl.append([]byte(`# TYPE test_metric counter + slApp := sl.appender() + total, _, _, err := sl.append(slApp, []byte(`# TYPE test_metric counter # HELP test_metric some help text # UNIT test_metric metric test_metric 1 @@ -620,6 +621,7 @@ test_metric 1 # HELP test_metric_no_type other help text # EOF`), "application/openmetrics-text", time.Now()) testutil.Ok(t, err) + testutil.Ok(t, slApp.Commit()) testutil.Equals(t, 1, total) md, ok := cache.GetMetadata("test_metric") @@ 
-661,13 +663,17 @@ func TestScrapeLoopSeriesAdded(t *testing.T) { ) defer cancel() - total, added, seriesAdded, err := sl.append([]byte("test_metric 1\n"), "", time.Time{}) + slApp := sl.appender() + total, added, seriesAdded, err := sl.append(slApp, []byte("test_metric 1\n"), "", time.Time{}) testutil.Ok(t, err) + testutil.Ok(t, slApp.Commit()) testutil.Equals(t, 1, total) testutil.Equals(t, 1, added) testutil.Equals(t, 1, seriesAdded) - total, added, seriesAdded, err = sl.append([]byte("test_metric 1\n"), "", time.Time{}) + slApp = sl.appender() + total, added, seriesAdded, err = sl.append(slApp, []byte("test_metric 1\n"), "", time.Time{}) + testutil.Ok(t, slApp.Commit()) testutil.Ok(t, err) testutil.Equals(t, 1, total) testutil.Equals(t, 1, added) @@ -854,9 +860,7 @@ func TestScrapeLoopCache(t *testing.T) { // 1 successfully scraped sample, 1 stale marker after first fail, 5 report samples for // each scrape successful or not. - if len(appender.result) != 26 { - t.Fatalf("Appended samples not as expected. 
Wanted: %d samples Got: %d", 26, len(appender.result)) - } + testutil.Equals(t, 26, len(appender.result), "Appended samples not as expected") } func TestScrapeLoopCacheMemoryExhaustionProtection(t *testing.T) { @@ -992,8 +996,10 @@ func TestScrapeLoopAppend(t *testing.T) { now := time.Now() - _, _, _, err := sl.append([]byte(test.scrapeLabels), "", now) + slApp := sl.appender() + _, _, _, err := sl.append(slApp, []byte(test.scrapeLabels), "", now) testutil.Ok(t, err) + testutil.Ok(t, slApp.Commit()) expected := []sample{ { @@ -1043,8 +1049,10 @@ func TestScrapeLoopAppendCacheEntryButErrNotFound(t *testing.T) { sl.cache.addRef(mets, fakeRef, lset, hash) now := time.Now() - _, _, _, err := sl.append([]byte(metric), "", now) + slApp := sl.appender() + _, _, _, err := sl.append(slApp, []byte(metric), "", now) testutil.Ok(t, err) + testutil.Ok(t, slApp.Commit()) expected := []sample{ { @@ -1084,10 +1092,12 @@ func TestScrapeLoopAppendSampleLimit(t *testing.T) { beforeMetricValue := beforeMetric.GetCounter().GetValue() now := time.Now() - total, added, seriesAdded, err := sl.append([]byte("metric_a 1\nmetric_b 1\nmetric_c 1\n"), "", now) + slApp := sl.appender() + total, added, seriesAdded, err := sl.append(slApp, []byte("metric_a 1\nmetric_b 1\nmetric_c 1\n"), "", now) if err != errSampleLimit { t.Fatalf("Did not see expected sample limit error: %s", err) } + testutil.Ok(t, slApp.Rollback()) testutil.Equals(t, 3, total) testutil.Equals(t, 3, added) testutil.Equals(t, 1, seriesAdded) @@ -1110,13 +1120,15 @@ func TestScrapeLoopAppendSampleLimit(t *testing.T) { v: 1, }, } - testutil.Equals(t, want, resApp.result, "Appended samples not as expected") + testutil.Equals(t, want, resApp.rolledbackResult, "Appended samples not as expected") now = time.Now() - total, added, seriesAdded, err = sl.append([]byte("metric_a 1\nmetric_b 1\nmetric_c{deleteme=\"yes\"} 1\nmetric_d 1\nmetric_e 1\nmetric_f 1\nmetric_g 1\nmetric_h{deleteme=\"yes\"} 1\nmetric_i{deleteme=\"yes\"} 1\n"), "", now) 
+ slApp = sl.appender() + total, added, seriesAdded, err = sl.append(slApp, []byte("metric_a 1\nmetric_b 1\nmetric_c{deleteme=\"yes\"} 1\nmetric_d 1\nmetric_e 1\nmetric_f 1\nmetric_g 1\nmetric_h{deleteme=\"yes\"} 1\nmetric_i{deleteme=\"yes\"} 1\n"), "", now) if err != errSampleLimit { t.Fatalf("Did not see expected sample limit error: %s", err) } + testutil.Ok(t, slApp.Rollback()) testutil.Equals(t, 9, total) testutil.Equals(t, 6, added) testutil.Equals(t, 0, seriesAdded) @@ -1144,11 +1156,15 @@ func TestScrapeLoop_ChangingMetricString(t *testing.T) { ) now := time.Now() - _, _, _, err := sl.append([]byte(`metric_a{a="1",b="1"} 1`), "", now) + slApp := sl.appender() + _, _, _, err := sl.append(slApp, []byte(`metric_a{a="1",b="1"} 1`), "", now) testutil.Ok(t, err) + testutil.Ok(t, slApp.Commit()) - _, _, _, err = sl.append([]byte(`metric_a{b="1",a="1"} 2`), "", now.Add(time.Minute)) + slApp = sl.appender() + _, _, _, err = sl.append(slApp, []byte(`metric_a{b="1",a="1"} 2`), "", now.Add(time.Minute)) testutil.Ok(t, err) + testutil.Ok(t, slApp.Commit()) // DeepEqual will report NaNs as being different, so replace with a different value. 
want := []sample{ @@ -1180,11 +1196,15 @@ func TestScrapeLoopAppendStaleness(t *testing.T) { ) now := time.Now() - _, _, _, err := sl.append([]byte("metric_a 1\n"), "", now) + slApp := sl.appender() + _, _, _, err := sl.append(slApp, []byte("metric_a 1\n"), "", now) testutil.Ok(t, err) + testutil.Ok(t, slApp.Commit()) - _, _, _, err = sl.append([]byte(""), "", now.Add(time.Second)) + slApp = sl.appender() + _, _, _, err = sl.append(slApp, []byte(""), "", now.Add(time.Second)) testutil.Ok(t, err) + testutil.Ok(t, slApp.Commit()) ingestedNaN := math.Float64bits(app.result[1].v) testutil.Equals(t, value.StaleNaN, ingestedNaN, "Appended stale sample wasn't as expected") @@ -1219,11 +1239,15 @@ func TestScrapeLoopAppendNoStalenessIfTimestamp(t *testing.T) { ) now := time.Now() - _, _, _, err := sl.append([]byte("metric_a 1 1000\n"), "", now) + slApp := sl.appender() + _, _, _, err := sl.append(slApp, []byte("metric_a 1 1000\n"), "", now) testutil.Ok(t, err) + testutil.Ok(t, slApp.Commit()) - _, _, _, err = sl.append([]byte(""), "", now.Add(time.Second)) + slApp = sl.appender() + _, _, _, err = sl.append(slApp, []byte(""), "", now.Add(time.Second)) testutil.Ok(t, err) + testutil.Ok(t, slApp.Commit()) want := []sample{ { @@ -1328,8 +1352,10 @@ func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T ) now := time.Unix(1, 0) - total, added, seriesAdded, err := sl.append([]byte("out_of_order 1\namend 1\nnormal 1\nout_of_bounds 1\n"), "", now) + slApp := sl.appender() + total, added, seriesAdded, err := sl.append(slApp, []byte("out_of_order 1\namend 1\nnormal 1\nout_of_bounds 1\n"), "", now) testutil.Ok(t, err) + testutil.Ok(t, slApp.Commit()) want := []sample{ { @@ -1363,12 +1389,14 @@ func TestScrapeLoopOutOfBoundsTimeError(t *testing.T) { ) now := time.Now().Add(20 * time.Minute) - total, added, seriesAdded, err := sl.append([]byte("normal 1\n"), "", now) + slApp := sl.appender() + total, added, seriesAdded, err := sl.append(slApp, []byte("normal 
1\n"), "", now) + testutil.Ok(t, err) + testutil.Ok(t, slApp.Commit()) testutil.Equals(t, 1, total) testutil.Equals(t, 1, added) testutil.Equals(t, 0, seriesAdded) - testutil.Ok(t, err) } func TestTargetScraperScrapeOK(t *testing.T) { @@ -1548,8 +1576,10 @@ func TestScrapeLoop_RespectTimestamps(t *testing.T) { ) now := time.Now() - _, _, _, err := sl.append([]byte(`metric_a{a="1",b="1"} 1 0`), "", now) + slApp := sl.appender() + _, _, _, err := sl.append(slApp, []byte(`metric_a{a="1",b="1"} 1 0`), "", now) testutil.Ok(t, err) + testutil.Ok(t, slApp.Commit()) want := []sample{ { @@ -1579,8 +1609,10 @@ func TestScrapeLoop_DiscardTimestamps(t *testing.T) { ) now := time.Now() - _, _, _, err := sl.append([]byte(`metric_a{a="1",b="1"} 1 0`), "", now) + slApp := sl.appender() + _, _, _, err := sl.append(slApp, []byte(`metric_a{a="1",b="1"} 1 0`), "", now) testutil.Ok(t, err) + testutil.Ok(t, slApp.Commit()) want := []sample{ { @@ -1612,8 +1644,10 @@ func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) { defer cancel() // We add a good and a bad metric to check that both are discarded. - _, _, _, err := sl.append([]byte("test_metric{le=\"500\"} 1\ntest_metric{le=\"600\",le=\"700\"} 1\n"), "", time.Time{}) + slApp := sl.appender() + _, _, _, err := sl.append(slApp, []byte("test_metric{le=\"500\"} 1\ntest_metric{le=\"600\",le=\"700\"} 1\n"), "", time.Time{}) testutil.NotOk(t, err) + testutil.Ok(t, slApp.Rollback()) q, err := s.Querier(ctx, time.Time{}.UnixNano(), 0) testutil.Ok(t, err) @@ -1622,8 +1656,10 @@ func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) { testutil.Ok(t, series.Err()) // We add a good metric to check that it is recorded. 
- _, _, _, err = sl.append([]byte("test_metric{le=\"500\"} 1\n"), "", time.Time{}) + slApp = sl.appender() + _, _, _, err = sl.append(slApp, []byte("test_metric{le=\"500\"} 1\n"), "", time.Time{}) testutil.Ok(t, err) + testutil.Ok(t, slApp.Commit()) q, err = s.Querier(ctx, time.Time{}.UnixNano(), 0) testutil.Ok(t, err) @@ -1657,8 +1693,10 @@ func TestScrapeLoopDiscardUnnamedMetrics(t *testing.T) { ) defer cancel() - _, _, _, err := sl.append([]byte("nok 1\nnok2{drop=\"drop\"} 1\n"), "", time.Time{}) + slApp := sl.appender() + _, _, _, err := sl.append(slApp, []byte("nok 1\nnok2{drop=\"drop\"} 1\n"), "", time.Time{}) testutil.NotOk(t, err) + testutil.Ok(t, slApp.Rollback()) testutil.Equals(t, errNameLabelMandatory, err) q, err := s.Querier(ctx, time.Time{}.UnixNano(), 0) @@ -1873,8 +1911,10 @@ func TestScrapeAddFast(t *testing.T) { ) defer cancel() - _, _, _, err := sl.append([]byte("up 1\n"), "", time.Time{}) + slApp := sl.appender() + _, _, _, err := sl.append(slApp, []byte("up 1\n"), "", time.Time{}) testutil.Ok(t, err) + testutil.Ok(t, slApp.Commit()) // Poison the cache. There is just one entry, and one series in the // storage. Changing the ref will create a 'not found' error. 
@@ -1882,8 +1922,10 @@ func TestScrapeAddFast(t *testing.T) { v.ref++ } - _, _, _, err = sl.append([]byte("up 1\n"), "", time.Time{}.Add(time.Second)) + slApp = sl.appender() + _, _, _, err = sl.append(slApp, []byte("up 1\n"), "", time.Time{}.Add(time.Second)) testutil.Ok(t, err) + testutil.Ok(t, slApp.Commit()) } func TestReuseCacheRace(t *testing.T) { @@ -1928,3 +1970,66 @@ func TestCheckAddError(t *testing.T) { sl.checkAddError(nil, nil, nil, storage.ErrOutOfOrderSample, nil, &appErrs) testutil.Equals(t, 1, appErrs.numOutOfOrder) } + +func TestScrapeReportSingleAppender(t *testing.T) { + s := teststorage.New(t) + defer s.Close() + + var ( + signal = make(chan struct{}, 1) + scraper = &testScraper{} + ) + + ctx, cancel := context.WithCancel(context.Background()) + sl := newScrapeLoop(ctx, + scraper, + nil, nil, + nopMutator, + nopMutator, + s.Appender, + nil, + 0, + true, + ) + + numScrapes := 0 + + scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error { + numScrapes++ + if numScrapes%4 == 0 { + return fmt.Errorf("scrape failed") + } + w.Write([]byte("metric_a 44\nmetric_b 44\nmetric_c 44\nmetric_d 44\n")) + return nil + } + + go func() { + sl.run(10*time.Millisecond, time.Hour, nil) + signal <- struct{}{} + }() + + start := time.Now() + for time.Since(start) < 3*time.Second { + q, err := s.Querier(ctx, time.Time{}.UnixNano(), time.Now().UnixNano()) + testutil.Ok(t, err) + series := q.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".+")) + + c := 0 + for series.Next() { + i := series.At().Iterator() + for i.Next() { + c++ + } + } + + testutil.Equals(t, 0, c%9, "Appended samples not as expected: %d", c) + q.Close() + } + cancel() + + select { + case <-signal: + case <-time.After(5 * time.Second): + t.Fatalf("Scrape wasn't stopped.") + } +}