diff --git a/storage/local/interface.go b/storage/local/interface.go index 54885b0e2..0cb7e48b7 100644 --- a/storage/local/interface.go +++ b/storage/local/interface.go @@ -44,6 +44,8 @@ type Storage interface { // Get the metric associated with the provided fingerprint. MetricForFingerprint(clientmodel.Fingerprint) clientmodel.COWMetric // Construct an iterator for a given fingerprint. + // The iterator will never return samples older than retention time, + // relative to the time NewIterator was called. NewIterator(clientmodel.Fingerprint) SeriesIterator // Run the various maintenance loops in goroutines. Returns when the // storage is ready to use. Keeps everything running in the background diff --git a/storage/local/storage.go b/storage/local/storage.go index 699076006..12324e081 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -286,7 +286,47 @@ func (s *memorySeriesStorage) NewIterator(fp clientmodel.Fingerprint) SeriesIter // return any values. return nopSeriesIterator{} } - return series.newIterator() + return &boundedIterator{ + it: series.newIterator(), + start: clientmodel.Now().Add(-s.dropAfter), + } +} + +// boundedIterator wraps a SeriesIterator and does not allow fetching +// data from earlier than the configured start time. +type boundedIterator struct { + it SeriesIterator + start clientmodel.Timestamp +} + +// ValueAtTime implements the SeriesIterator interface. +func (bit *boundedIterator) ValueAtTime(ts clientmodel.Timestamp) metric.Values { + if ts < bit.start { + return metric.Values{} + } + return bit.it.ValueAtTime(ts) +} + +// BoundaryValues implements the SeriesIterator interface. +func (bit *boundedIterator) BoundaryValues(interval metric.Interval) metric.Values { + if interval.NewestInclusive < bit.start { + return metric.Values{} + } + if interval.OldestInclusive < bit.start { + interval.OldestInclusive = bit.start + } + return bit.it.BoundaryValues(interval) +} + +// RangeValues implements the SeriesIterator interface. +func (bit *boundedIterator) RangeValues(interval metric.Interval) metric.Values { + if interval.NewestInclusive < bit.start { + return metric.Values{} + } + if interval.OldestInclusive < bit.start { + interval.OldestInclusive = bit.start + } + return bit.it.RangeValues(interval) } // NewPreloader implements Storage. diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go index 8b43ae2a1..0b43aa1eb 100644 --- a/storage/local/storage_test.go +++ b/storage/local/storage_test.go @@ -140,6 +140,70 @@ func TestFingerprintsForLabelMatchers(t *testing.T) { } } +func TestRetentionCutoff(t *testing.T) { + now := clientmodel.Now() + insertStart := now.Add(-2 * time.Hour) + + s, closer := NewTestStorage(t, 1) + defer closer.Close() + + // Stop maintenance loop to prevent actual purging. + s.loopStopping <- struct{}{} + + s.dropAfter = 1 * time.Hour + + samples := make(clientmodel.Samples, 120) + for i := range samples { + smpl := &clientmodel.Sample{ + Metric: clientmodel.Metric{"job": "test"}, + Timestamp: insertStart.Add(time.Duration(i) * time.Minute), // 1 minute intervals. + Value: 1, + } + s.Append(smpl) + } + s.WaitForIndexing() + + lm, err := metric.NewLabelMatcher(metric.Equal, "job", "test") + if err != nil { + t.Fatalf("error creating label matcher: %s", err) + } + fp := s.FingerprintsForLabelMatchers(metric.LabelMatchers{lm})[0] + + pl := s.NewPreloader() + defer pl.Close() + + // Preload everything. + err = pl.PreloadRange(fp, insertStart, now, 5*time.Minute) + if err != nil { + t.Fatalf("Error preloading outdated chunks: %s", err) + } + + it := s.NewIterator(fp) + + vals := it.ValueAtTime(now.Add(-61 * time.Minute)) + if len(vals) != 0 { + t.Errorf("unexpected result for timestamp before retention period") + } + + vals = it.RangeValues(metric.Interval{insertStart, now}) + // We get 59 values here because the clientmodel.Now() is slightly later + // than our now. + if len(vals) != 59 { + t.Errorf("expected 59 values but got %d", len(vals)) + } + if expt := now.Add(-1 * time.Hour).Add(time.Minute); vals[0].Timestamp != expt { + t.Errorf("unexpected timestamp for first sample: %v, expected %v", vals[0].Timestamp.Time(), expt.Time()) + } + + vals = it.BoundaryValues(metric.Interval{insertStart, now}) + if len(vals) != 2 { + t.Errorf("expected 2 values but got %d", len(vals)) + } + if expt := now.Add(-1 * time.Hour).Add(time.Minute); vals[0].Timestamp != expt { + t.Errorf("unexpected timestamp for first sample: %v, expected %v", vals[0].Timestamp.Time(), expt.Time()) + } +} + // TestLoop is just a smoke test for the loop method, if we can switch it on and // off without disaster. func TestLoop(t *testing.T) {