diff --git a/storage/local/storage.go b/storage/local/storage.go index 734b97ad3..740dede3c 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -500,9 +500,11 @@ func (s *memorySeriesStorage) MetricsForLabelMatchers( result := map[model.Fingerprint]metric.Metric{} for fp := range resFPs { - if metric, ok := s.metricForFingerprint(fp, from, through); ok { - result[fp] = metric + s.fpLocker.Lock(fp) + if met, _, ok := s.metricForRange(fp, from, through); ok { + result[fp] = metric.Metric{Metric: met} } + s.fpLocker.Unlock(fp) } for _, matcher := range filters { for fp, met := range result { @@ -514,50 +516,46 @@ func (s *memorySeriesStorage) MetricsForLabelMatchers( return result } -// metricForFingerprint returns the metric for the given fingerprint if the -// corresponding time series has samples between 'from' and 'through'. -func (s *memorySeriesStorage) metricForFingerprint( +// metricForRange returns the metric for the given fingerprint if the +// corresponding time series has samples between 'from' and 'through', together +// with a pointer to the series if it is in memory already. For a series that +// does not have samples between 'from' and 'through', the returned bool is +// false. For an archived series that does contain samples between 'from' and +// 'through', it returns (metric, nil, true). +// +// The caller must have locked the fp. +func (s *memorySeriesStorage) metricForRange( fp model.Fingerprint, from, through model.Time, -) (metric.Metric, bool) { - s.fpLocker.Lock(fp) - defer s.fpLocker.Unlock(fp) - +) (model.Metric, *memorySeries, bool) { series, ok := s.fpToSeries.get(fp) if ok { if series.lastTime.Before(from) || series.firstTime().After(through) { - return metric.Metric{}, false + return nil, nil, false } - // Wrap the returned metric in a copy-on-write (COW) metric here because - // the caller might mutate it. - return metric.Metric{ - Metric: series.metric, - }, true + return series.metric, series, true } // From here on, we are only concerned with archived metrics. // If the high watermark of archived series is before 'from', we are done. watermark := model.Time(atomic.LoadInt64((*int64)(&s.archiveHighWatermark))) if watermark < from { - return metric.Metric{}, false + return nil, nil, false } if from.After(model.Earliest) || through.Before(model.Latest) { // The range lookup is relatively cheap, so let's do it first if // we have a chance the archived metric is not in the range. has, first, last := s.persistence.hasArchivedMetric(fp) if !has || first.After(through) || last.Before(from) { - return metric.Metric{}, false + return nil, nil, false } } - met, _ := s.persistence.archivedMetric(fp) // Ignoring error, there is nothing we can do. - if met == nil { - return metric.Metric{}, false + metric, err := s.persistence.archivedMetric(fp) + if err != nil { + // archivedMetric has already flagged the storage as dirty in this case. + return nil, nil, false } - - return metric.Metric{ - Metric: met, - Copied: false, - }, true + return metric, nil, true } // LabelValuesForLabelName implements Storage. @@ -723,43 +721,20 @@ func (s *memorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Me return series, nil } -// getSeriesForRange is a helper method for preloadChunksForRange and preloadChunksForInstant. +// seriesForRange is a helper method for preloadChunksForRange and preloadChunksForInstant. // // The caller must have locked the fp. -func (s *memorySeriesStorage) getSeriesForRange( +func (s *memorySeriesStorage) seriesForRange( fp model.Fingerprint, from model.Time, through model.Time, ) *memorySeries { - series, ok := s.fpToSeries.get(fp) - if ok { - if series.lastTime.Before(from) || series.firstTime().After(through) { - return nil - } - return series - } - - watermark := model.Time(atomic.LoadInt64((*int64)(&s.archiveHighWatermark))) - if watermark < from { + metric, series, ok := s.metricForRange(fp, from, through) + if !ok { return nil } - - has, first, last := s.persistence.hasArchivedMetric(fp) - if !has { - s.invalidPreloadRequestsCount.Inc() - return nil - } - if last.Before(from) || first.After(through) { - return nil - } - metric, err := s.persistence.archivedMetric(fp) - if err != nil { - // Error already logged, storage declared dirty by archivedMetric. - return nil - } - series, err = s.getOrCreateSeries(fp, metric) - if err != nil { - // getOrCreateSeries took care of quarantining already. - return nil + if series == nil { + series, _ = s.getOrCreateSeries(fp, metric) + // getOrCreateSeries took care of quarantining already, so ignore the error. } return series } @@ -771,7 +746,7 @@ func (s *memorySeriesStorage) preloadChunksForRange( s.fpLocker.Lock(fp) defer s.fpLocker.Unlock(fp) - series := s.getSeriesForRange(fp, from, through) + series := s.seriesForRange(fp, from, through) if series == nil { return nil, nopIter } @@ -790,7 +765,7 @@ func (s *memorySeriesStorage) preloadChunksForInstant( s.fpLocker.Lock(fp) defer s.fpLocker.Unlock(fp) - series := s.getSeriesForRange(fp, from, through) + series := s.seriesForRange(fp, from, through) if series == nil { return nil, nopIter } diff --git a/storage/local/test_helpers.go b/storage/local/test_helpers.go index 4e914b724..1dedf518e 100644 --- a/storage/local/test_helpers.go +++ b/storage/local/test_helpers.go @@ -21,6 +21,7 @@ package local import ( "time" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/util/testutil" ) @@ -51,6 +52,7 @@ func NewTestStorage(t testutil.T, encoding chunkEncoding) (*memorySeriesStorage, SyncStrategy: Adaptive, } storage := NewMemorySeriesStorage(o) + storage.(*memorySeriesStorage).archiveHighWatermark = model.Latest if err := storage.Start(); err != nil { directory.Close() t.Fatalf("Error creating storage: %s", err)