diff --git a/promql/analyzer.go b/promql/analyzer.go
index bad5fbd92..7243d31db 100644
--- a/promql/analyzer.go
+++ b/promql/analyzer.go
@@ -79,7 +79,10 @@ func (a *Analyzer) Analyze(ctx context.Context) error {
 	Inspect(a.Expr, func(node Node) bool {
 		switch n := node.(type) {
 		case *VectorSelector:
-			n.metrics = a.Storage.MetricsForLabelMatchers(n.LabelMatchers...)
+			n.metrics = a.Storage.MetricsForLabelMatchers(
+				a.Start.Add(-n.Offset-StalenessDelta), a.End.Add(-n.Offset),
+				n.LabelMatchers...,
+			)
 			n.iterators = make(map[model.Fingerprint]local.SeriesIterator, len(n.metrics))
 
 			pt := getPreloadTimes(n.Offset)
@@ -95,7 +98,10 @@ func (a *Analyzer) Analyze(ctx context.Context) error {
 				}
 			}
 		case *MatrixSelector:
-			n.metrics = a.Storage.MetricsForLabelMatchers(n.LabelMatchers...)
+			n.metrics = a.Storage.MetricsForLabelMatchers(
+				a.Start.Add(-n.Offset-n.Range), a.End.Add(-n.Offset),
+				n.LabelMatchers...,
+			)
 			n.iterators = make(map[model.Fingerprint]local.SeriesIterator, len(n.metrics))
 
 			pt := getPreloadTimes(n.Offset)
diff --git a/storage/local/interface.go b/storage/local/interface.go
index d9dbc4f21..26bc5325a 100644
--- a/storage/local/interface.go
+++ b/storage/local/interface.go
@@ -40,20 +40,22 @@ type Storage interface {
 	// NewPreloader returns a new Preloader which allows preloading and pinning
 	// series data into memory for use within a query.
 	NewPreloader() Preloader
-	// MetricsForLabelMatchers returns the metrics from storage that satisfy the given
-	// label matchers. At least one label matcher must be specified that does not
-	// match the empty string.
-	MetricsForLabelMatchers(...*metric.LabelMatcher) map[model.Fingerprint]metric.Metric
-	// LastSamplePairForFingerprint returns the last sample pair that has
-	// been ingested for the provided fingerprint. If this instance of the
+	// MetricsForLabelMatchers returns the metrics from storage that satisfy
+	// the given label matchers. At least one label matcher must be
+	// specified that does not match the empty string. The times from and
+	// through are hints for the storage to optimize the search. The storage
+	// MAY exclude metrics that have no samples in the specified interval
+	// from the returned map. If in doubt, specify model.Earliest for from and
+	// model.Latest for through.
+	MetricsForLabelMatchers(from, through model.Time, matchers ...*metric.LabelMatcher) map[model.Fingerprint]metric.Metric
+	// LastSampleForFingerprint returns the last sample that has been
+	// ingested for the provided fingerprint. If this instance of the
 	// Storage has never ingested a sample for the provided fingerprint (or
 	// the last ingestion is so long ago that the series has been archived),
-	// ZeroSamplePair is returned.
-	LastSamplePairForFingerprint(model.Fingerprint) model.SamplePair
+	// ZeroSample is returned.
+	LastSampleForFingerprint(model.Fingerprint) model.Sample
 	// Get all of the label values that are associated with a given label name.
 	LabelValuesForLabelName(model.LabelName) model.LabelValues
-	// Get the metric associated with the provided fingerprint.
-	MetricForFingerprint(model.Fingerprint) metric.Metric
 	// Drop all time series associated with the given fingerprints.
 	DropMetricsForFingerprints(...model.Fingerprint)
 	// Run the various maintenance loops in goroutines. Returns when the
@@ -89,7 +91,7 @@ type SeriesIterator interface {
 type Preloader interface {
 	PreloadRange(
 		fp model.Fingerprint,
-		from model.Time, through model.Time,
+		from, through model.Time,
 	) SeriesIterator
 	PreloadInstant(
 		fp model.Fingerprint,
@@ -100,8 +102,15 @@
 }
 
 // ZeroSamplePair is the pseudo zero-value of model.SamplePair used by the local
-// package to signal a non-existing sample. It is a SamplePair with timestamp
-// model.Earliest and value 0.0. Note that the natural zero value of SamplePair
-// has a timestamp of 0, which is possible to appear in a real SamplePair and
-// thus not suitable to signal a non-existing SamplePair.
+// package to signal a non-existing sample pair. It is a SamplePair with
+// timestamp model.Earliest and value 0.0. Note that the natural zero value of
+// SamplePair has a timestamp of 0, which is possible to appear in a real
+// SamplePair and thus not suitable to signal a non-existing SamplePair.
 var ZeroSamplePair = model.SamplePair{Timestamp: model.Earliest}
+
+// ZeroSample is the pseudo zero-value of model.Sample used by the local package
+// to signal a non-existing sample. It is a Sample with timestamp
+// model.Earliest, value 0.0, and metric nil. Note that the natural zero value
+// of Sample has a timestamp of 0, which is possible to appear in a real
+// Sample and thus not suitable to signal a non-existing Sample.
+var ZeroSample = model.Sample{Timestamp: model.Earliest}
diff --git a/storage/local/persistence.go b/storage/local/persistence.go
index 124a41fb6..a89307bc3 100644
--- a/storage/local/persistence.go
+++ b/storage/local/persistence.go
@@ -1068,6 +1068,9 @@ func (p *persistence) fingerprintsModifiedBefore(beforeTime model.Time) ([]model
 // method is goroutine-safe.
 func (p *persistence) archivedMetric(fp model.Fingerprint) (model.Metric, error) {
 	metric, _, err := p.archivedFingerprintToMetrics.Lookup(fp)
+	if err != nil {
+		p.setDirty(true, err)
+	}
 	return metric, err
 }
 
diff --git a/storage/local/storage.go b/storage/local/storage.go
index 201c2ba8b..e6527085b 100644
--- a/storage/local/storage.go
+++ b/storage/local/storage.go
@@ -129,12 +129,13 @@ const (
 type syncStrategy func() bool
 
 type memorySeriesStorage struct {
-	// numChunksToPersist has to be aligned for atomic operations.
-	numChunksToPersist int64         // The number of chunks waiting for persistence.
-	maxChunksToPersist int           // If numChunksToPersist reaches this threshold, ingestion will be throttled.
-	rushed             bool          // Whether the storage is in rushed mode.
-	rushedMtx          sync.Mutex    // Protects entering and exiting rushed mode.
-	throttled          chan struct{} // This chan is sent to whenever NeedsThrottling() returns true (for logging).
+	// archiveHighWatermark and numChunksToPersist have to be aligned for atomic operations.
+	archiveHighWatermark model.Time    // No archived series has samples after this time.
+	numChunksToPersist   int64         // The number of chunks waiting for persistence.
+	maxChunksToPersist   int           // If numChunksToPersist reaches this threshold, ingestion will be throttled.
+	rushed               bool          // Whether the storage is in rushed mode.
+	rushedMtx            sync.Mutex    // Protects entering and exiting rushed mode.
+	throttled            chan struct{} // This chan is sent to whenever NeedsThrottling() returns true (for logging).
 
 	fpLocker   *fingerprintLocker
 	fpToSeries *seriesMap
@@ -201,6 +202,7 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) Storage {
 		dropAfter:                  o.PersistenceRetentionPeriod,
 		checkpointInterval:         o.CheckpointInterval,
 		checkpointDirtySeriesLimit: o.CheckpointDirtySeriesLimit,
+		archiveHighWatermark:       model.Now().Add(-headChunkTimeout),
 
 		maxChunksToPersist: o.MaxChunksToPersist,
 
@@ -368,15 +370,20 @@ func (s *memorySeriesStorage) WaitForIndexing() {
 }
 
 // LastSampleForFingerprint implements Storage.
-func (s *memorySeriesStorage) LastSamplePairForFingerprint(fp model.Fingerprint) model.SamplePair {
+func (s *memorySeriesStorage) LastSampleForFingerprint(fp model.Fingerprint) model.Sample {
 	s.fpLocker.Lock(fp)
 	defer s.fpLocker.Unlock(fp)
 
 	series, ok := s.fpToSeries.get(fp)
 	if !ok {
-		return ZeroSamplePair
+		return ZeroSample
+	}
+	sp := series.lastSamplePair()
+	return model.Sample{
+		Metric:    series.metric,
+		Value:     sp.Value,
+		Timestamp: sp.Timestamp,
 	}
-	return series.lastSamplePair()
 }
 
 // boundedIterator wraps a SeriesIterator and does not allow fetching
@@ -439,7 +446,10 @@ func (s *memorySeriesStorage) fingerprintsForLabelPairs(pairs ...model.LabelPair
 }
 
 // MetricsForLabelMatchers implements Storage.
-func (s *memorySeriesStorage) MetricsForLabelMatchers(matchers ...*metric.LabelMatcher) map[model.Fingerprint]metric.Metric {
+func (s *memorySeriesStorage) MetricsForLabelMatchers(
+	from, through model.Time,
+	matchers ...*metric.LabelMatcher,
+) map[model.Fingerprint]metric.Metric {
 	var (
 		equals  []model.LabelPair
 		filters []*metric.LabelMatcher
@@ -491,9 +501,11 @@ func (s *memorySeriesStorage) MetricsForLabelM
 		filters = remaining
 	}
 
-	result := make(map[model.Fingerprint]metric.Metric, len(resFPs))
+	result := map[model.Fingerprint]metric.Metric{}
 	for fp := range resFPs {
-		result[fp] = s.MetricForFingerprint(fp)
+		if metric, ok := s.metricForFingerprint(fp, from, through); ok {
+			result[fp] = metric
+		}
 	}
 	for _, matcher := range filters {
 		for fp, met := range result {
@@ -505,6 +517,58 @@ func (s *memorySeriesStorage) MetricsForLabelM
 	return result
 }
 
+// metricForFingerprint returns the metric for the given fingerprint if the
+// corresponding time series has samples between 'from' and 'through'.
+func (s *memorySeriesStorage) metricForFingerprint(
+	fp model.Fingerprint,
+	from, through model.Time,
+) (metric.Metric, bool) {
+	// Lock FP so that no (un-)archiving will happen during lookup.
+	s.fpLocker.Lock(fp)
+	defer s.fpLocker.Unlock(fp)
+
+	watermark := model.Time(atomic.LoadInt64((*int64)(&s.archiveHighWatermark)))
+
+	series, ok := s.fpToSeries.get(fp)
+	if ok {
+		if series.lastTime.Before(from) || series.savedFirstTime.After(through) {
+			return metric.Metric{}, false
+		}
+		// Wrap the returned metric in a copy-on-write (COW) metric here because
+		// the caller might mutate it.
+		return metric.Metric{
+			Metric: series.metric,
+		}, true
+	}
+	// From here on, we are only concerned with archived metrics.
+	// If the high watermark of archived series is before 'from', we are done.
+	if watermark < from {
+		return metric.Metric{}, false
+	}
+	if from.After(model.Earliest) || through.Before(model.Latest) {
+		// The range lookup is relatively cheap, so let's do it first.
+		ok, first, last, err := s.persistence.hasArchivedMetric(fp)
+		if err != nil {
+			log.Errorf("Error retrieving archived time range for fingerprint %v: %v", fp, err)
+			return metric.Metric{}, false
+		}
+		if !ok || first.After(through) || last.Before(from) {
+			return metric.Metric{}, false
+		}
+	}
+
+	met, err := s.persistence.archivedMetric(fp)
+	if err != nil {
+		log.Errorf("Error retrieving archived metric for fingerprint %v: %v", fp, err)
+		return metric.Metric{}, false
+	}
+
+	return metric.Metric{
+		Metric: met,
+		Copied: false,
+	}, true
+}
+
 // LabelValuesForLabelName implements Storage.
 func (s *memorySeriesStorage) LabelValuesForLabelName(labelName model.LabelName) model.LabelValues {
 	lvs, err := s.persistence.labelValuesForLabelName(labelName)
@@ -514,30 +578,6 @@ func (s *memorySeriesStorage) LabelValuesForLabelName(labelName model.LabelName)
 	return lvs
 }
 
-// MetricForFingerprint implements Storage.
-func (s *memorySeriesStorage) MetricForFingerprint(fp model.Fingerprint) metric.Metric {
-	s.fpLocker.Lock(fp)
-	defer s.fpLocker.Unlock(fp)
-
-	series, ok := s.fpToSeries.get(fp)
-	if ok {
-		// Wrap the returned metric in a copy-on-write (COW) metric here because
-		// the caller might mutate it.
-		return metric.Metric{
-			Metric: series.metric,
-		}
-	}
-	met, err := s.persistence.archivedMetric(fp)
-	if err != nil {
-		log.Errorf("Error retrieving archived metric for fingerprint %v: %v", fp, err)
-	}
-
-	return metric.Metric{
-		Metric: met,
-		Copied: false,
-	}
-}
-
 // DropMetric implements Storage.
 func (s *memorySeriesStorage) DropMetricsForFingerprints(fps ...model.Fingerprint) {
 	for _, fp := range fps {
@@ -1077,8 +1117,9 @@ func (s *memorySeriesStorage) maintainMemorySeries(
 		}
 	}
 
-	// Archive if all chunks are evicted.
-	if iOldestNotEvicted == -1 {
+	// Archive if all chunks are evicted. Also make sure the last sample has
+	// an age of at least headChunkTimeout (which is very likely anyway).
+	if iOldestNotEvicted == -1 && model.Now().Sub(series.lastTime) > headChunkTimeout {
 		s.fpToSeries.del(fp)
 		s.numSeries.Dec()
 		if err := s.persistence.archiveMetric(
@@ -1088,6 +1129,15 @@
 			return
 		}
 		s.seriesOps.WithLabelValues(archive).Inc()
+		oldWatermark := atomic.LoadInt64((*int64)(&s.archiveHighWatermark))
+		if oldWatermark < int64(series.lastTime) {
+			if !atomic.CompareAndSwapInt64(
+				(*int64)(&s.archiveHighWatermark),
+				oldWatermark, int64(series.lastTime),
+			) {
+				panic("s.archiveHighWatermark modified outside of maintainMemorySeries")
+			}
+		}
 		return
 	}
 	// If we are here, the series is not archived, so check for chunkDesc
diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go
index 5fcb39c43..ab5c2a9ed 100644
--- a/storage/local/storage_test.go
+++ b/storage/local/storage_test.go
@@ -178,7 +178,10 @@ func TestMatches(t *testing.T) {
 	}
 
 	for _, mt := range matcherTests {
-		res := storage.MetricsForLabelMatchers(mt.matchers...)
+		res := storage.MetricsForLabelMatchers(
+			model.Earliest, model.Latest,
+			mt.matchers...,
+		)
 		if len(mt.expected) != len(res) {
 			t.Fatalf("expected %d matches for %q, found %d", len(mt.expected), mt.matchers, len(res))
 		}
@@ -362,7 +365,10 @@ func BenchmarkLabelMatching(b *testing.B) {
 	for i := 0; i < b.N; i++ {
 		benchLabelMatchingRes = map[model.Fingerprint]metric.Metric{}
 		for _, mt := range matcherTests {
-			benchLabelMatchingRes = s.MetricsForLabelMatchers(mt...)
+			benchLabelMatchingRes = s.MetricsForLabelMatchers(
+				model.Earliest, model.Latest,
+				mt...,
+			)
 		}
 	}
 	// Stop timer to not count the storage closing.
diff --git a/web/api/v1/api.go b/web/api/v1/api.go
index 823665f3b..6858938fb 100644
--- a/web/api/v1/api.go
+++ b/web/api/v1/api.go
@@ -226,7 +226,10 @@ func (api *API) series(r *http.Request) (interface{}, *apiError) {
 		if err != nil {
 			return nil, &apiError{errorBadData, err}
 		}
-		for fp, met := range api.Storage.MetricsForLabelMatchers(matchers...) {
+		for fp, met := range api.Storage.MetricsForLabelMatchers(
+			model.Earliest, model.Latest, // Get every series.
+			matchers...,
+		) {
 			res[fp] = met
 		}
 	}
@@ -250,7 +253,10 @@ func (api *API) dropSeries(r *http.Request) (interface{}, *apiError) {
 		if err != nil {
 			return nil, &apiError{errorBadData, err}
 		}
-		for fp := range api.Storage.MetricsForLabelMatchers(matchers...) {
+		for fp := range api.Storage.MetricsForLabelMatchers(
+			model.Earliest, model.Latest, // Get every series.
+			matchers...,
+		) {
 			fps[fp] = struct{}{}
 		}
 	}
diff --git a/web/federate.go b/web/federate.go
index d9baf676b..26f4710eb 100644
--- a/web/federate.go
+++ b/web/federate.go
@@ -19,7 +19,6 @@ import (
 	"github.com/golang/protobuf/proto"
 
 	"github.com/prometheus/prometheus/promql"
-	"github.com/prometheus/prometheus/storage/metric"
 
 	"github.com/prometheus/common/expfmt"
 	"github.com/prometheus/common/model"
@@ -33,7 +32,7 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
 
 	req.ParseForm()
 
-	metrics := map[model.Fingerprint]metric.Metric{}
+	fps := map[model.Fingerprint]struct{}{}
 
 	for _, s := range req.Form["match[]"] {
 		matchers, err := promql.ParseMetricSelector(s)
@@ -41,8 +40,11 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
 			http.Error(w, err.Error(), http.StatusBadRequest)
 			return
 		}
-		for fp, met := range h.storage.MetricsForLabelMatchers(matchers...) {
-			metrics[fp] = met
+		for fp := range h.storage.MetricsForLabelMatchers(
+			model.Now().Add(-promql.StalenessDelta), model.Latest,
+			matchers...,
+		) {
+			fps[fp] = struct{}{}
 		}
 	}
 
@@ -62,19 +64,19 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
 		Type: dto.MetricType_UNTYPED.Enum(),
 	}
 
-	for fp, met := range metrics {
+	for fp := range fps {
 		globalUsed := map[model.LabelName]struct{}{}
 
-		sp := h.storage.LastSamplePairForFingerprint(fp)
+		s := h.storage.LastSampleForFingerprint(fp)
 		// Discard if sample does not exist or lays before the staleness interval.
-		if sp.Timestamp.Before(minTimestamp) {
+		if s.Timestamp.Before(minTimestamp) {
 			continue
 		}
 
 		// Reset label slice.
 		protMetric.Label = protMetric.Label[:0]
 
-		for ln, lv := range met.Metric {
+		for ln, lv := range s.Metric {
 			if ln == model.MetricNameLabel {
 				protMetricFam.Name = proto.String(string(lv))
 				continue
@@ -98,8 +100,8 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
 			}
 		}
 
-		protMetric.TimestampMs = proto.Int64(int64(sp.Timestamp))
-		protMetric.Untyped.Value = proto.Float64(float64(sp.Value))
+		protMetric.TimestampMs = proto.Int64(int64(s.Timestamp))
+		protMetric.Untyped.Value = proto.Float64(float64(s.Value))
 
 		if err := enc.Encode(protMetricFam); err != nil {
 			http.Error(w, err.Error(), http.StatusInternalServerError)
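Caller-side sketch (not part of the patch above; the helper name recentSeries and package name are made up for illustration): with the new signature, callers pass a [from, through] hint to MetricsForLabelMatchers. Passing model.Earliest and model.Latest keeps the old match-everything behaviour, as the /api/v1/series handlers do, while the federation handler narrows from to the staleness window so that long-archived series can be skipped.

// Hypothetical wrapper, assuming the post-patch interfaces shown above.
package example

import (
	"github.com/prometheus/common/model"

	"github.com/prometheus/prometheus/promql"
	"github.com/prometheus/prometheus/storage/local"
	"github.com/prometheus/prometheus/storage/metric"
)

// recentSeries returns the matching metrics that may still have samples within
// the staleness window, mirroring the federation change above. The from/through
// arguments are only hints: storage MAY still return series without samples in
// the interval, so callers needing exact bounds must check the samples themselves.
func recentSeries(s local.Storage, matchers ...*metric.LabelMatcher) map[model.Fingerprint]metric.Metric {
	return s.MetricsForLabelMatchers(
		model.Now().Add(-promql.StalenessDelta), model.Latest,
		matchers...,
	)
}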