Move the tolerance logic in the loop function.
Signed-off-by: Julien Pivotto <roidelapluie@inuits.eu>
This commit is contained in:
parent
6544f95403
commit
98e14611a5
|
@ -991,6 +991,7 @@ func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) {
|
|||
var last time.Time
|
||||
|
||||
ticker := time.NewTicker(interval)
|
||||
alignedScrapeTime := time.Now()
|
||||
defer ticker.Stop()
|
||||
|
||||
mainLoop:
|
||||
|
@ -1004,7 +1005,23 @@ mainLoop:
|
|||
default:
|
||||
}
|
||||
|
||||
last = sl.scrapeAndReport(interval, timeout, last, errc)
|
||||
// Temporary workaround for a jitter in go timers that causes disk space
|
||||
// increase in TSDB.
|
||||
// See https://github.com/prometheus/prometheus/issues/7846
|
||||
scrapeTime := time.Now()
|
||||
if interval > 100*scrapeTimestampTolerance {
|
||||
// For some reason, a ticker might have been skipped, in which case we
|
||||
// would call alignedScrapeTime.Add(interval) multiple times.
|
||||
for scrapeTime.Sub(alignedScrapeTime) >= interval {
|
||||
alignedScrapeTime = alignedScrapeTime.Add(interval)
|
||||
}
|
||||
// Align the scrape time if we are in the tolerance boundaries.
|
||||
if t := scrapeTime.Sub(alignedScrapeTime); t >= -scrapeTimestampTolerance && t <= scrapeTimestampTolerance {
|
||||
scrapeTime = alignedScrapeTime
|
||||
}
|
||||
}
|
||||
|
||||
last = sl.scrapeAndReport(interval, timeout, last, scrapeTime, errc)
|
||||
|
||||
select {
|
||||
case <-sl.parentCtx.Done():
|
||||
|
@ -1028,7 +1045,7 @@ mainLoop:
|
|||
// In the happy scenario, a single appender is used.
|
||||
// This function uses sl.parentCtx instead of sl.ctx on purpose. A scrape should
|
||||
// only be cancelled on shutdown, not on reloads.
|
||||
func (sl *scrapeLoop) scrapeAndReport(interval, timeout time.Duration, last time.Time, errc chan<- error) time.Time {
|
||||
func (sl *scrapeLoop) scrapeAndReport(interval, timeout time.Duration, last, appendTime time.Time, errc chan<- error) time.Time {
|
||||
start := time.Now()
|
||||
|
||||
// Only record after the first scrape.
|
||||
|
@ -1038,16 +1055,6 @@ func (sl *scrapeLoop) scrapeAndReport(interval, timeout time.Duration, last time
|
|||
)
|
||||
}
|
||||
|
||||
// Temporary workaround for a jitter in go timers that causes disk space
|
||||
// increase in TSDB.
|
||||
// See https://github.com/prometheus/prometheus/issues/7846
|
||||
if last.IsZero() && interval > 100*scrapeTimestampTolerance {
|
||||
exactStart := last.Add(interval)
|
||||
if t := start.Sub(exactStart); t >= -scrapeTimestampTolerance && t <= scrapeTimestampTolerance {
|
||||
start = exactStart
|
||||
}
|
||||
}
|
||||
|
||||
b := sl.buffers.Get(sl.lastScrapeSize).([]byte)
|
||||
defer sl.buffers.Put(b)
|
||||
buf := bytes.NewBuffer(b)
|
||||
|
@ -1068,7 +1075,7 @@ func (sl *scrapeLoop) scrapeAndReport(interval, timeout time.Duration, last time
|
|||
}()
|
||||
|
||||
defer func() {
|
||||
if err = sl.report(app, start, time.Since(start), total, added, seriesAdded, scrapeErr); err != nil {
|
||||
if err = sl.report(app, appendTime, time.Since(start), total, added, seriesAdded, scrapeErr); err != nil {
|
||||
level.Warn(sl.l).Log("msg", "Appending scrape report failed", "err", err)
|
||||
}
|
||||
}()
|
||||
|
@ -1076,7 +1083,7 @@ func (sl *scrapeLoop) scrapeAndReport(interval, timeout time.Duration, last time
|
|||
if forcedErr := sl.getForcedError(); forcedErr != nil {
|
||||
scrapeErr = forcedErr
|
||||
// Add stale markers.
|
||||
if _, _, _, err := sl.append(app, []byte{}, "", start); err != nil {
|
||||
if _, _, _, err := sl.append(app, []byte{}, "", appendTime); err != nil {
|
||||
app.Rollback()
|
||||
app = sl.appender(sl.parentCtx)
|
||||
level.Warn(sl.l).Log("msg", "Append failed", "err", err)
|
||||
|
@ -1110,14 +1117,14 @@ func (sl *scrapeLoop) scrapeAndReport(interval, timeout time.Duration, last time
|
|||
|
||||
// A failed scrape is the same as an empty scrape,
|
||||
// we still call sl.append to trigger stale markers.
|
||||
total, added, seriesAdded, appErr = sl.append(app, b, contentType, start)
|
||||
total, added, seriesAdded, appErr = sl.append(app, b, contentType, appendTime)
|
||||
if appErr != nil {
|
||||
app.Rollback()
|
||||
app = sl.appender(sl.parentCtx)
|
||||
level.Debug(sl.l).Log("msg", "Append failed", "err", appErr)
|
||||
// The append failed, probably due to a parse error or sample limit.
|
||||
// Call sl.append again with an empty scrape to trigger stale markers.
|
||||
if _, _, _, err := sl.append(app, []byte{}, "", start); err != nil {
|
||||
if _, _, _, err := sl.append(app, []byte{}, "", appendTime); err != nil {
|
||||
app.Rollback()
|
||||
app = sl.appender(sl.parentCtx)
|
||||
level.Warn(sl.l).Log("msg", "Append failed", "err", err)
|
||||
|
|
Loading…
Reference in New Issue