From d532272520610c0e8fcb6ae83c90411f9873b08b Mon Sep 17 00:00:00 2001 From: Brian Brazil Date: Thu, 11 May 2017 14:43:43 +0100 Subject: [PATCH] Add stalemarkers to synthetic series too when target stops. --- retrieval/helpers_test.go | 2 +- retrieval/scrape.go | 50 ++++++++++++++++++++++++++++++++++----- retrieval/scrape_test.go | 17 ++++++++++++- 3 files changed, 61 insertions(+), 8 deletions(-) diff --git a/retrieval/helpers_test.go b/retrieval/helpers_test.go index 32f5cb68f..5ce761c8c 100644 --- a/retrieval/helpers_test.go +++ b/retrieval/helpers_test.go @@ -38,7 +38,7 @@ type collectResultAppender struct { func (a *collectResultAppender) AddFast(ref uint64, t int64, v float64) error { // Not implemented. - return nil + return storage.ErrNotFound } func (a *collectResultAppender) Add(m labels.Labels, t int64, v float64) (uint64, error) { diff --git a/retrieval/scrape.go b/retrieval/scrape.go index 6d097d55c..3e5e41db3 100644 --- a/retrieval/scrape.go +++ b/retrieval/scrape.go @@ -566,6 +566,9 @@ mainLoop: if _, _, err := sl.append([]byte{}, staleTime); err != nil { log.With("err", err).Error("stale append failed") } + if err := sl.reportStale(staleTime); err != nil { + log.With("err", err).Error("stale report failed") + } } // Stop the scraping. May still write data and stale markers after it has @@ -738,13 +741,45 @@ func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scraped, a return app.Commit() } +func (sl *scrapeLoop) reportStale(start time.Time) error { + ts := timestamp.FromTime(start) + app := sl.reportAppender() + stale := math.Float64frombits(value.StaleNaN) + + if err := sl.addReportSample(app, scrapeHealthMetricName, ts, stale); err != nil { + app.Rollback() + return err + } + if err := sl.addReportSample(app, scrapeDurationMetricName, ts, stale); err != nil { + app.Rollback() + return err + } + if err := sl.addReportSample(app, scrapeSamplesMetricName, ts, stale); err != nil { + app.Rollback() + return err + } + if err := sl.addReportSample(app, samplesPostRelabelMetricName, ts, stale); err != nil { + app.Rollback() + return err + } + return app.Commit() +} + func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v float64) error { ref, ok := sl.refCache[s] if ok { - if err := app.AddFast(ref, t, v); err == nil { + err := app.AddFast(ref, t, v) + switch err { + case nil: return nil - } else if err != storage.ErrNotFound { + case storage.ErrNotFound: + // Try an Add. + case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp: + // Do not log here, as this is expected if a target goes away and comes back + // again with a new scrape loop. + return nil + default: return err } } @@ -752,10 +787,13 @@ func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v labels.Label{Name: labels.MetricName, Value: s}, } ref, err := app.Add(met, t, v) - if err != nil { + switch err { + case nil: + sl.refCache[s] = ref + return nil + case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp: + return nil + default: return err } - sl.refCache[s] = ref - - return nil } diff --git a/retrieval/scrape_test.go b/retrieval/scrape_test.go index 2477eaf25..878995f57 100644 --- a/retrieval/scrape_test.go +++ b/retrieval/scrape_test.go @@ -357,12 +357,13 @@ func TestScrapeLoopStopBeforeRun(t *testing.T) { func TestScrapeLoopStop(t *testing.T) { appender := &collectResultAppender{} + reportAppender := &collectResultAppender{} var ( signal = make(chan struct{}) scraper = &testScraper{} app = func() storage.Appender { return appender } - reportApp = func() storage.Appender { return &nopAppender{} } + reportApp = func() storage.Appender { return reportAppender } numScrapes = 0 ) defer close(signal) @@ -398,6 +399,20 @@ func TestScrapeLoopStop(t *testing.T) { if !value.IsStaleNaN(appender.result[len(appender.result)-1].v) { t.Fatalf("Appended last sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(appender.result[len(appender.result)].v)) } + + if len(reportAppender.result) < 8 { + t.Fatalf("Appended samples not as expected. Wanted: at least %d samples Got: %d", 8, len(reportAppender.result)) + } + if len(reportAppender.result)%4 != 0 { + t.Fatalf("Appended samples not as expected. Wanted: samples mod 4 == 0 Got: %d samples", len(reportAppender.result)) + } + if !value.IsStaleNaN(reportAppender.result[len(reportAppender.result)-1].v) { + t.Fatalf("Appended last sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(reportAppender.result[len(reportAppender.result)].v)) + } + + if reportAppender.result[len(reportAppender.result)-1].t != appender.result[len(appender.result)-1].t { + t.Fatalf("Expected last append and report sample to have same timestamp. Append: stale NaN Report: %x", appender.result[len(appender.result)-1].t, reportAppender.result[len(reportAppender.result)-1].t) + } } func TestScrapeLoopRun(t *testing.T) {