diff --git a/retrieval/scrape.go b/retrieval/scrape.go index 3ef064c23..f6850d2ee 100644 --- a/retrieval/scrape.go +++ b/retrieval/scrape.go @@ -113,7 +113,8 @@ type scrapePool struct { // Constructor for new scrape loops. This is settable for testing convenience. newLoop func(context.Context, scraper, func() storage.Appender, func() storage.Appender, log.Logger) loop - logger log.Logger + logger log.Logger + maxAheadTime time.Duration } func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app Appendable, logger log.Logger) *scrapePool { @@ -133,14 +134,15 @@ func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app Appendable } return &scrapePool{ - appendable: app, - config: cfg, - ctx: ctx, - client: client, - targets: map[uint64]*Target{}, - loops: map[uint64]loop{}, - newLoop: newLoop, - logger: logger, + appendable: app, + config: cfg, + ctx: ctx, + client: client, + targets: map[uint64]*Target{}, + loops: map[uint64]loop{}, + newLoop: newLoop, + logger: logger, + maxAheadTime: 10 * time.Minute, } } @@ -310,6 +312,13 @@ func (sp *scrapePool) sampleAppender(target *Target) storage.Appender { panic(err) } + if sp.maxAheadTime > 0 { + app = &timeLimitAppender{ + Appender: app, + maxTime: timestamp.FromTime(time.Now().Add(sp.maxAheadTime)), + } + } + // The limit is applied after metrics are potentially dropped via relabeling. if sp.config.SampleLimit > 0 { app = &limitAppender{ @@ -810,6 +819,7 @@ loop: sl.l.With("timeseries", string(met)).Debug("Duplicate sample for timestamp") continue case storage.ErrOutOfBounds: + err = nil numOutOfBounds++ sl.l.With("timeseries", string(met)).Debug("Out of bounds metric") continue diff --git a/retrieval/scrape_test.go b/retrieval/scrape_test.go index adcd37fe8..7ef8f6a81 100644 --- a/retrieval/scrape_test.go +++ b/retrieval/scrape_test.go @@ -273,6 +273,7 @@ func TestScrapePoolSampleAppender(t *testing.T) { app := &nopAppendable{} sp := newScrapePool(context.Background(), cfg, app, log.Base()) + sp.maxAheadTime = 0 cfg.HonorLabels = false wrapped := sp.sampleAppender(target) @@ -872,19 +873,23 @@ type errorAppender struct { } func (app *errorAppender) Add(lset labels.Labels, t int64, v float64) (string, error) { - if lset.Get(model.MetricNameLabel) == "out_of_order" { + switch lset.Get(model.MetricNameLabel) { + case "out_of_order": return "", storage.ErrOutOfOrderSample - } else if lset.Get(model.MetricNameLabel) == "amend" { + case "amend": return "", storage.ErrDuplicateSampleForTimestamp + case "out_of_bounds": + return "", storage.ErrOutOfBounds + default: + return app.collectResultAppender.Add(lset, t, v) } - return app.collectResultAppender.Add(lset, t, v) } func (app *errorAppender) AddFast(ref string, t int64, v float64) error { return app.collectResultAppender.AddFast(ref, t, v) } -func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrder(t *testing.T) { +func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T) { app := &errorAppender{} sl := newScrapeLoop(context.Background(), nil, func() storage.Appender { return app }, @@ -893,7 +898,7 @@ func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrder(t *testing.T) { ) now := time.Unix(1, 0) - _, _, err := sl.append([]byte("out_of_order 1\namend 1\nnormal 1\n"), now) + _, _, err := sl.append([]byte("out_of_order 1\namend 1\nnormal 1\nout_of_bounds 1\n"), now) if err != nil { t.Fatalf("Unexpected append error: %s", err) } @@ -907,7 +912,35 @@ func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrder(t *testing.T) { if !reflect.DeepEqual(want, app.result) { t.Fatalf("Appended samples not as expected. Wanted: %+v Got: %+v", want, app.result) } +} +func TestScrapeLoopOutOfBoundsTimeError(t *testing.T) { + app := &collectResultAppender{} + sl := newScrapeLoop(context.Background(), nil, + func() storage.Appender { + return &timeLimitAppender{ + Appender: app, + maxTime: timestamp.FromTime(time.Now().Add(10 * time.Minute)), + } + }, + func() storage.Appender { return nopAppender{} }, + nil, + ) + + now := time.Now().Add(20 * time.Minute) + total, added, err := sl.append([]byte("normal 1\n"), now) + if total != 1 { + t.Error("expected 1 metric") + return + } + + if added != 0 { + t.Error("no metric should be added") + } + + if err != nil { + t.Errorf("expect no error, got %s", err.Error()) + } } func TestTargetScraperScrapeOK(t *testing.T) { diff --git a/retrieval/target.go b/retrieval/target.go index 8875a5c85..2949074a6 100644 --- a/retrieval/target.go +++ b/retrieval/target.go @@ -225,6 +225,34 @@ func (app *limitAppender) AddFast(ref string, t int64, v float64) error { return nil } +type timeLimitAppender struct { + storage.Appender + + maxTime int64 +} + +func (app *timeLimitAppender) Add(lset labels.Labels, t int64, v float64) (string, error) { + if t > app.maxTime { + return "", storage.ErrOutOfBounds + } + + ref, err := app.Appender.Add(lset, t, v) + if err != nil { + return "", err + } + return ref, nil +} + +func (app *timeLimitAppender) AddFast(ref string, t int64, v float64) error { + if t > app.maxTime { + return storage.ErrOutOfBounds + } + if err := app.Appender.AddFast(ref, t, v); err != nil { + return err + } + return nil +} + // Merges the ingested sample's metric with the label set. On a collision the // value of the ingested label is stored in a label prefixed with 'exported_'. type ruleLabelsAppender struct {