From 98e14611a528e7eec323786f8ec67c81e111c51b Mon Sep 17 00:00:00 2001
From: Julien Pivotto
Date: Mon, 5 Oct 2020 18:17:50 +0200
Subject: [PATCH] Move the tolerance logic in the loop function.

Signed-off-by: Julien Pivotto
---
 scrape/scrape.go | 39 +++++++++++++++++++++++----------------
 1 file changed, 23 insertions(+), 16 deletions(-)

diff --git a/scrape/scrape.go b/scrape/scrape.go
index f8de16bed..e3c4f0201 100644
--- a/scrape/scrape.go
+++ b/scrape/scrape.go
@@ -991,6 +991,7 @@ func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) {
 	var last time.Time
 
 	ticker := time.NewTicker(interval)
+	alignedScrapeTime := time.Now()
 	defer ticker.Stop()
 
 mainLoop:
@@ -1004,7 +1005,23 @@ mainLoop:
 		default:
 		}
 
-		last = sl.scrapeAndReport(interval, timeout, last, errc)
+		// Temporary workaround for a jitter in go timers that causes disk space
+		// increase in TSDB.
+		// See https://github.com/prometheus/prometheus/issues/7846
+		scrapeTime := time.Now()
+		if interval > 100*scrapeTimestampTolerance {
+			// For some reason, a tick might have been skipped, in which case we
+			// would call alignedScrapeTime.Add(interval) multiple times.
+			for scrapeTime.Sub(alignedScrapeTime) >= interval {
+				alignedScrapeTime = alignedScrapeTime.Add(interval)
+			}
+			// Align the scrape time if we are in the tolerance boundaries.
+			if t := scrapeTime.Sub(alignedScrapeTime); t >= -scrapeTimestampTolerance && t <= scrapeTimestampTolerance {
+				scrapeTime = alignedScrapeTime
+			}
+		}
+
+		last = sl.scrapeAndReport(interval, timeout, last, scrapeTime, errc)
 
 		select {
 		case <-sl.parentCtx.Done():
@@ -1028,7 +1045,7 @@ mainLoop:
 // In the happy scenario, a single appender is used.
 // This function uses sl.parentCtx instead of sl.ctx on purpose. A scrape should
 // only be cancelled on shutdown, not on reloads.
-func (sl *scrapeLoop) scrapeAndReport(interval, timeout time.Duration, last time.Time, errc chan<- error) time.Time {
+func (sl *scrapeLoop) scrapeAndReport(interval, timeout time.Duration, last, appendTime time.Time, errc chan<- error) time.Time {
 	start := time.Now()
 
 	// Only record after the first scrape.
@@ -1038,16 +1055,6 @@ func (sl *scrapeLoop) scrapeAndReport(interval, timeout time.Duration, last time
 		)
 	}
 
-	// Temporary workaround for a jitter in go timers that causes disk space
-	// increase in TSDB.
-	// See https://github.com/prometheus/prometheus/issues/7846
-	if !last.IsZero() && interval > 100*scrapeTimestampTolerance {
-		exactStart := last.Add(interval)
-		if t := start.Sub(exactStart); t >= -scrapeTimestampTolerance && t <= scrapeTimestampTolerance {
-			start = exactStart
-		}
-	}
-
 	b := sl.buffers.Get(sl.lastScrapeSize).([]byte)
 	defer sl.buffers.Put(b)
 	buf := bytes.NewBuffer(b)
@@ -1068,7 +1075,7 @@ func (sl *scrapeLoop) scrapeAndReport(interval, timeout time.Duration, last time
 	}()
 
 	defer func() {
-		if err = sl.report(app, start, time.Since(start), total, added, seriesAdded, scrapeErr); err != nil {
+		if err = sl.report(app, appendTime, time.Since(start), total, added, seriesAdded, scrapeErr); err != nil {
 			level.Warn(sl.l).Log("msg", "Appending scrape report failed", "err", err)
 		}
 	}()
@@ -1076,7 +1083,7 @@ func (sl *scrapeLoop) scrapeAndReport(interval, timeout time.Duration, last time
 	if forcedErr := sl.getForcedError(); forcedErr != nil {
 		scrapeErr = forcedErr
 		// Add stale markers.
-		if _, _, _, err := sl.append(app, []byte{}, "", start); err != nil {
+		if _, _, _, err := sl.append(app, []byte{}, "", appendTime); err != nil {
 			app.Rollback()
 			app = sl.appender(sl.parentCtx)
 			level.Warn(sl.l).Log("msg", "Append failed", "err", err)
@@ -1110,14 +1117,14 @@ func (sl *scrapeLoop) scrapeAndReport(interval, timeout time.Duration, last time
 
 	// A failed scrape is the same as an empty scrape,
 	// we still call sl.append to trigger stale markers.
-	total, added, seriesAdded, appErr = sl.append(app, b, contentType, start)
+	total, added, seriesAdded, appErr = sl.append(app, b, contentType, appendTime)
 	if appErr != nil {
 		app.Rollback()
 		app = sl.appender(sl.parentCtx)
 		level.Debug(sl.l).Log("msg", "Append failed", "err", appErr)
 		// The append failed, probably due to a parse error or sample limit.
 		// Call sl.append again with an empty scrape to trigger stale markers.
-		if _, _, _, err := sl.append(app, []byte{}, "", start); err != nil {
+		if _, _, _, err := sl.append(app, []byte{}, "", appendTime); err != nil {
 			app.Rollback()
 			app = sl.appender(sl.parentCtx)
 			level.Warn(sl.l).Log("msg", "Append failed", "err", err)
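
For reference, the alignment this patch moves into run() can be read in isolation: each loop iteration takes time.Now(), advances alignedScrapeTime by whole intervals if any ticks were missed, and only snaps the scrape time onto that grid when the observed jitter is within scrapeTimestampTolerance (2ms upstream). Below is a minimal standalone sketch of that logic, not part of the patch: alignScrapeTime and the local tolerance constant are hypothetical stand-ins for the inlined code and scrapeTimestampTolerance.

package main

import (
	"fmt"
	"time"
)

// tolerance stands in for scrapeTimestampTolerance in scrape/scrape.go.
const tolerance = 2 * time.Millisecond

// alignScrapeTime snaps now onto the ideal tick grid (aligned + k*interval)
// when the observed jitter is within the tolerance. It returns the possibly
// adjusted scrape time and the advanced grid point.
func alignScrapeTime(now, aligned time.Time, interval time.Duration) (time.Time, time.Time) {
	if interval <= 100*tolerance {
		return now, aligned
	}
	// Catch up if one or more ticks were skipped; otherwise the grid would
	// fall a full interval behind per missed tick and never match again.
	for now.Sub(aligned) >= interval {
		aligned = aligned.Add(interval)
	}
	// Snap to the grid point only when inside the tolerance window.
	if d := now.Sub(aligned); d >= -tolerance && d <= tolerance {
		return aligned, aligned
	}
	return now, aligned
}

func main() {
	interval := 15 * time.Second
	aligned := time.Date(2020, 10, 5, 18, 0, 0, 0, time.UTC)

	// 1ms of timer jitter: inside the tolerance, so the time is snapped.
	jittered := aligned.Add(interval + time.Millisecond)
	got, aligned := alignScrapeTime(jittered, aligned, interval)
	fmt.Println(got.Equal(aligned)) // true: snapped onto the grid

	// 50ms late: outside the tolerance, so the raw time is kept.
	late := aligned.Add(interval + 50*time.Millisecond)
	got, _ = alignScrapeTime(late, aligned, interval)
	fmt.Println(got.Equal(late)) // true: left unaligned
}

The catch-up loop is what makes the workaround safe across stalls: with a single alignedScrapeTime.Add(interval), a scrape loop that missed several ticks would leave the grid permanently behind, and every subsequent sample would carry a raw, jittered timestamp.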