diff --git a/storage/local/storage.go b/storage/local/storage.go index 38467a805..1288389e0 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -410,19 +410,19 @@ func (s *MemorySeriesStorage) WaitForIndexing() { // LastSampleForLabelMatchers implements Storage. func (s *MemorySeriesStorage) LastSampleForLabelMatchers(_ context.Context, cutoff model.Time, matcherSets ...metric.LabelMatchers) (model.Vector, error) { - fps := map[model.Fingerprint]struct{}{} + mergedFPs := map[model.Fingerprint]struct{}{} for _, matchers := range matcherSets { - fpToMetric, err := s.metricsForLabelMatchers(cutoff, model.Latest, matchers...) + fps, err := s.fpsForLabelMatchers(cutoff, model.Latest, matchers...) if err != nil { return nil, err } - for fp := range fpToMetric { - fps[fp] = struct{}{} + for fp := range fps { + mergedFPs[fp] = struct{}{} } } - res := make(model.Vector, 0, len(fps)) - for fp := range fps { + res := make(model.Vector, 0, len(mergedFPs)) + for fp := range mergedFPs { s.fpLocker.Lock(fp) series, ok := s.fpToSeries.get(fp) @@ -480,13 +480,13 @@ func (bit *boundedIterator) Close() { // QueryRange implements Storage. func (s *MemorySeriesStorage) QueryRange(_ context.Context, from, through model.Time, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) { - fpToMetric, err := s.metricsForLabelMatchers(from, through, matchers...) + fpSeriesPairs, err := s.seriesForLabelMatchers(from, through, matchers...) if err != nil { return nil, err } - iterators := make([]SeriesIterator, 0, len(fpToMetric)) - for fp := range fpToMetric { - it := s.preloadChunksForRange(fp, from, through) + iterators := make([]SeriesIterator, 0, len(fpSeriesPairs)) + for _, pair := range fpSeriesPairs { + it := s.preloadChunksForRange(pair, from, through) iterators = append(iterators, it) } return iterators, nil @@ -497,13 +497,13 @@ func (s *MemorySeriesStorage) QueryInstant(_ context.Context, ts model.Time, sta from := ts.Add(-stalenessDelta) through := ts - fpToMetric, err := s.metricsForLabelMatchers(from, through, matchers...) + fpSeriesPairs, err := s.seriesForLabelMatchers(from, through, matchers...) if err != nil { return nil, err } - iterators := make([]SeriesIterator, 0, len(fpToMetric)) - for fp := range fpToMetric { - it := s.preloadChunksForInstant(fp, from, through) + iterators := make([]SeriesIterator, 0, len(fpSeriesPairs)) + for _, pair := range fpSeriesPairs { + it := s.preloadChunksForInstant(pair, from, through) iterators = append(iterators, it) } return iterators, nil @@ -558,43 +558,43 @@ func (s *MemorySeriesStorage) MetricsForLabelMatchers( return metrics, nil } -func (s *MemorySeriesStorage) metricsForLabelMatchers( - from, through model.Time, +// candidateFPsForLabelMatchers returns candidate FPs for given matchers and remaining matchers to be checked. +func (s *MemorySeriesStorage) candidateFPsForLabelMatchers( matchers ...*metric.LabelMatcher, -) (map[model.Fingerprint]metric.Metric, error) { +) (map[model.Fingerprint]struct{}, []*metric.LabelMatcher, error) { sort.Sort(metric.LabelMatchers(matchers)) if len(matchers) == 0 || matchers[0].MatchesEmptyString() { // No matchers at all or even the best matcher matches the empty string. - return nil, nil + return nil, nil, nil } var ( matcherIdx int - remainingFPs map[model.Fingerprint]struct{} + candidateFPs map[model.Fingerprint]struct{} ) // Equal matchers. - for ; matcherIdx < len(matchers) && (remainingFPs == nil || len(remainingFPs) > fpEqualMatchThreshold); matcherIdx++ { + for ; matcherIdx < len(matchers) && (candidateFPs == nil || len(candidateFPs) > fpEqualMatchThreshold); matcherIdx++ { m := matchers[matcherIdx] if m.Type != metric.Equal || m.MatchesEmptyString() { break } - remainingFPs = s.fingerprintsForLabelPair( + candidateFPs = s.fingerprintsForLabelPair( model.LabelPair{ Name: m.Name, Value: m.Value, }, nil, - remainingFPs, + candidateFPs, ) - if len(remainingFPs) == 0 { - return nil, nil + if len(candidateFPs) == 0 { + return nil, nil, nil } } // Other matchers. - for ; matcherIdx < len(matchers) && (remainingFPs == nil || len(remainingFPs) > fpOtherMatchThreshold); matcherIdx++ { + for ; matcherIdx < len(matchers) && (candidateFPs == nil || len(candidateFPs) > fpOtherMatchThreshold); matcherIdx++ { m := matchers[matcherIdx] if m.MatchesEmptyString() { break @@ -602,11 +602,11 @@ func (s *MemorySeriesStorage) metricsForLabelMatchers( lvs, err := s.LabelValuesForLabelName(context.TODO(), m.Name) if err != nil { - return nil, err + return nil, nil, err } lvs = m.Filter(lvs) if len(lvs) == 0 { - return nil, nil + return nil, nil, nil } fps := map[model.Fingerprint]struct{}{} for _, lv := range lvs { @@ -616,29 +616,104 @@ func (s *MemorySeriesStorage) metricsForLabelMatchers( Value: lv, }, fps, - remainingFPs, + candidateFPs, ) } - remainingFPs = fps - if len(remainingFPs) == 0 { - return nil, nil + candidateFPs = fps + if len(candidateFPs) == 0 { + return nil, nil, nil } } + return candidateFPs, matchers[matcherIdx:], nil +} + +func (s *MemorySeriesStorage) seriesForLabelMatchers( + from, through model.Time, + matchers ...*metric.LabelMatcher, +) ([]fingerprintSeriesPair, error) { + candidateFPs, matchersToCheck, err := s.candidateFPsForLabelMatchers(matchers...) + if err != nil { + return nil, err + } + + result := []fingerprintSeriesPair{} +FPLoop: + for fp := range candidateFPs { + s.fpLocker.Lock(fp) + series := s.seriesForRange(fp, from, through) + s.fpLocker.Unlock(fp) + + if series == nil { + continue FPLoop + } + + for _, m := range matchersToCheck { + if !m.Match(series.metric[m.Name]) { + continue FPLoop + } + } + result = append(result, fingerprintSeriesPair{fp, series}) + } + return result, nil +} + +func (s *MemorySeriesStorage) fpsForLabelMatchers( + from, through model.Time, + matchers ...*metric.LabelMatcher, +) (map[model.Fingerprint]struct{}, error) { + candidateFPs, matchersToCheck, err := s.candidateFPsForLabelMatchers(matchers...) + if err != nil { + return nil, err + } + +FPLoop: + for fp := range candidateFPs { + s.fpLocker.Lock(fp) + met, _, ok := s.metricForRange(fp, from, through) + s.fpLocker.Unlock(fp) + + if !ok { + delete(candidateFPs, fp) + continue FPLoop + } + + for _, m := range matchersToCheck { + if !m.Match(met[m.Name]) { + delete(candidateFPs, fp) + continue FPLoop + } + } + } + return candidateFPs, nil +} + +func (s *MemorySeriesStorage) metricsForLabelMatchers( + from, through model.Time, + matchers ...*metric.LabelMatcher, +) (map[model.Fingerprint]metric.Metric, error) { + + candidateFPs, matchersToCheck, err := s.candidateFPsForLabelMatchers(matchers...) + if err != nil { + return nil, err + } result := map[model.Fingerprint]metric.Metric{} - for fp := range remainingFPs { +FPLoop: + for fp := range candidateFPs { s.fpLocker.Lock(fp) - if met, _, ok := s.metricForRange(fp, from, through); ok { - result[fp] = metric.Metric{Metric: met} - } + met, _, ok := s.metricForRange(fp, from, through) s.fpLocker.Unlock(fp) - } - for _, m := range matchers[matcherIdx:] { - for fp, met := range result { - if !m.Match(met.Metric[m.Name]) { - delete(result, fp) + + if !ok { + continue FPLoop + } + + for _, m := range matchersToCheck { + if !m.Match(met[m.Name]) { + continue FPLoop } } + result[fp] = metric.Metric{Metric: met} } return result, nil } @@ -696,14 +771,14 @@ func (s *MemorySeriesStorage) LabelValuesForLabelName(_ context.Context, labelNa // DropMetricsForLabelMatchers implements Storage. func (s *MemorySeriesStorage) DropMetricsForLabelMatchers(_ context.Context, matchers ...*metric.LabelMatcher) (int, error) { - fpToMetric, err := s.metricsForLabelMatchers(model.Earliest, model.Latest, matchers...) + fps, err := s.fpsForLabelMatchers(model.Earliest, model.Latest, matchers...) if err != nil { return 0, err } - for fp := range fpToMetric { + for fp := range fps { s.purgeSeries(fp, nil, nil) } - return len(fpToMetric), nil + return len(fps), nil } var ( @@ -864,7 +939,7 @@ func (s *MemorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Me return series, nil } -// seriesForRange is a helper method for preloadChunksForRange and preloadChunksForInstant. +// seriesForRange is a helper method for seriesForLabelMatchers. // // The caller must have locked the fp. func (s *MemorySeriesStorage) seriesForRange( @@ -883,16 +958,17 @@ func (s *MemorySeriesStorage) seriesForRange( } func (s *MemorySeriesStorage) preloadChunksForRange( - fp model.Fingerprint, + pair fingerprintSeriesPair, from model.Time, through model.Time, ) SeriesIterator { - s.fpLocker.Lock(fp) - defer s.fpLocker.Unlock(fp) - - series := s.seriesForRange(fp, from, through) + fp, series := pair.fp, pair.series if series == nil { return nopIter } + + s.fpLocker.Lock(fp) + defer s.fpLocker.Unlock(fp) + iter, err := series.preloadChunksForRange(fp, from, through, s) if err != nil { s.quarantineSeries(fp, series.metric, err) @@ -902,16 +978,17 @@ func (s *MemorySeriesStorage) preloadChunksForRange( } func (s *MemorySeriesStorage) preloadChunksForInstant( - fp model.Fingerprint, + pair fingerprintSeriesPair, from model.Time, through model.Time, ) SeriesIterator { - s.fpLocker.Lock(fp) - defer s.fpLocker.Unlock(fp) - - series := s.seriesForRange(fp, from, through) + fp, series := pair.fp, pair.series if series == nil { return nopIter } + + s.fpLocker.Lock(fp) + defer s.fpLocker.Unlock(fp) + iter, err := series.preloadChunksForInstant(fp, from, through, s) if err != nil { s.quarantineSeries(fp, series.metric, err) diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go index 361144f70..ce7ed2bd6 100644 --- a/storage/local/storage_test.go +++ b/storage/local/storage_test.go @@ -19,6 +19,7 @@ import ( "math" "math/rand" "os" + "strconv" "testing" "testing/quick" "time" @@ -470,6 +471,38 @@ func BenchmarkLabelMatching(b *testing.B) { b.StopTimer() } +func BenchmarkQueryRange(b *testing.B) { + now := model.Now() + insertStart := now.Add(-2 * time.Hour) + + s, closer := NewTestStorage(b, 2) + defer closer.Close() + + // Stop maintenance loop to prevent actual purging. + close(s.loopStopping) + <-s.loopStopped + <-s.logThrottlingStopped + // Recreate channel to avoid panic when we really shut down. + s.loopStopping = make(chan struct{}) + + for i := 0; i < 8192; i++ { + s.Append(&model.Sample{ + Metric: model.Metric{"__name__": model.LabelValue(strconv.Itoa(i)), "job": "test"}, + Timestamp: insertStart, + Value: 1, + }) + } + s.WaitForIndexing() + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + lm, _ := metric.NewLabelMatcher(metric.Equal, "job", "test") + for pb.Next() { + s.QueryRange(context.Background(), insertStart, now, lm) + } + }) +} + func TestRetentionCutoff(t *testing.T) { now := model.Now() insertStart := now.Add(-2 * time.Hour) @@ -604,12 +637,12 @@ func TestDropMetrics(t *testing.T) { t.Errorf("unexpected number of fingerprints: %d", len(fps2)) } - it := s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest) + it := s.preloadChunksForRange(makeFingerprintSeriesPair(s, fpList[0]), model.Earliest, model.Latest) if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { t.Errorf("unexpected number of samples: %d", len(vals)) } - it = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest) + it = s.preloadChunksForRange(makeFingerprintSeriesPair(s, fpList[1]), model.Earliest, model.Latest) if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != N { t.Errorf("unexpected number of samples: %d", len(vals)) } @@ -637,12 +670,12 @@ func TestDropMetrics(t *testing.T) { t.Errorf("unexpected number of fingerprints: %d", len(fps3)) } - it = s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest) + it = s.preloadChunksForRange(makeFingerprintSeriesPair(s, fpList[0]), model.Earliest, model.Latest) if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { t.Errorf("unexpected number of samples: %d", len(vals)) } - it = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest) + it = s.preloadChunksForRange(makeFingerprintSeriesPair(s, fpList[1]), model.Earliest, model.Latest) if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { t.Errorf("unexpected number of samples: %d", len(vals)) } @@ -720,7 +753,7 @@ func TestQuarantineMetric(t *testing.T) { } // This will access the corrupt file and lead to quarantining. - iter := s.preloadChunksForInstant(fpToBeArchived, now.Add(-2*time.Hour-1*time.Minute), now.Add(-2*time.Hour)) + iter := s.preloadChunksForInstant(makeFingerprintSeriesPair(s, fpToBeArchived), now.Add(-2*time.Hour-1*time.Minute), now.Add(-2*time.Hour)) iter.Close() time.Sleep(time.Second) // Give time to quarantine. TODO(beorn7): Find a better way to wait. s.WaitForIndexing() @@ -862,7 +895,7 @@ func testValueAtOrBeforeTime(t *testing.T, encoding chunk.Encoding) { fp := model.Metric{}.FastFingerprint() - it := s.preloadChunksForRange(fp, model.Earliest, model.Latest) + it := s.preloadChunksForRange(makeFingerprintSeriesPair(s, fp), model.Earliest, model.Latest) // #1 Exactly on a sample. for i, expected := range samples { @@ -940,7 +973,7 @@ func benchmarkValueAtOrBeforeTime(b *testing.B, encoding chunk.Encoding) { fp := model.Metric{}.FastFingerprint() - it := s.preloadChunksForRange(fp, model.Earliest, model.Latest) + it := s.preloadChunksForRange(makeFingerprintSeriesPair(s, fp), model.Earliest, model.Latest) b.ResetTimer() @@ -1022,7 +1055,7 @@ func testRangeValues(t *testing.T, encoding chunk.Encoding) { fp := model.Metric{}.FastFingerprint() - it := s.preloadChunksForRange(fp, model.Earliest, model.Latest) + it := s.preloadChunksForRange(makeFingerprintSeriesPair(s, fp), model.Earliest, model.Latest) // #1 Zero length interval at sample. for i, expected := range samples { @@ -1178,7 +1211,7 @@ func benchmarkRangeValues(b *testing.B, encoding chunk.Encoding) { fp := model.Metric{}.FastFingerprint() - it := s.preloadChunksForRange(fp, model.Earliest, model.Latest) + it := s.preloadChunksForRange(makeFingerprintSeriesPair(s, fp), model.Earliest, model.Latest) b.ResetTimer() @@ -1228,7 +1261,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunk.Encoding) { // Drop ~half of the chunks. s.maintainMemorySeries(fp, 10000) - it := s.preloadChunksForRange(fp, model.Earliest, model.Latest) + it := s.preloadChunksForRange(makeFingerprintSeriesPair(s, fp), model.Earliest, model.Latest) actual := it.RangeValues(metric.Interval{ OldestInclusive: 0, NewestInclusive: 100000, @@ -1246,7 +1279,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunk.Encoding) { // Drop everything. s.maintainMemorySeries(fp, 100000) - it = s.preloadChunksForRange(fp, model.Earliest, model.Latest) + it = s.preloadChunksForRange(makeFingerprintSeriesPair(s, fp), model.Earliest, model.Latest) actual = it.RangeValues(metric.Interval{ OldestInclusive: 0, NewestInclusive: 100000, @@ -1410,7 +1443,7 @@ func testEvictAndLoadChunkDescs(t *testing.T, encoding chunk.Encoding) { } // Load everything back. - it := s.preloadChunksForRange(fp, 0, 100000) + it := s.preloadChunksForRange(makeFingerprintSeriesPair(s, fp), 0, 100000) if oldLen != len(series.chunkDescs) { t.Errorf("Expected number of chunkDescs to have reached old value again, old number %d, current number %d.", oldLen, len(series.chunkDescs)) @@ -1738,7 +1771,7 @@ func verifyStorageRandom(t testing.TB, s *MemorySeriesStorage, samples model.Sam for _, i := range rand.Perm(len(samples)) { sample := samples[i] fp := s.mapper.mapFP(sample.Metric.FastFingerprint(), sample.Metric) - it := s.preloadChunksForInstant(fp, sample.Timestamp, sample.Timestamp) + it := s.preloadChunksForInstant(makeFingerprintSeriesPair(s, fp), sample.Timestamp, sample.Timestamp) found := it.ValueAtOrBeforeTime(sample.Timestamp) startTime := it.(*boundedIterator).start switch { @@ -1781,7 +1814,7 @@ func verifyStorageSequential(t testing.TB, s *MemorySeriesStorage, samples model if it != nil { it.Close() } - it = s.preloadChunksForRange(fp, sample.Timestamp, model.Latest) + it = s.preloadChunksForRange(makeFingerprintSeriesPair(s, fp), sample.Timestamp, model.Latest) r = it.RangeValues(metric.Interval{ OldestInclusive: sample.Timestamp, NewestInclusive: model.Latest, @@ -1902,7 +1935,7 @@ func TestAppendOutOfOrder(t *testing.T) { fp := s.mapper.mapFP(m.FastFingerprint(), m) - it := s.preloadChunksForRange(fp, 0, 2) + it := s.preloadChunksForRange(makeFingerprintSeriesPair(s, fp), 0, 2) defer it.Close() want := []model.SamplePair{ diff --git a/storage/local/test_helpers.go b/storage/local/test_helpers.go index e28cf19ff..bdc26c555 100644 --- a/storage/local/test_helpers.go +++ b/storage/local/test_helpers.go @@ -66,3 +66,7 @@ func NewTestStorage(t testutil.T, encoding chunk.Encoding) (*MemorySeriesStorage return storage, closer } + +func makeFingerprintSeriesPair(s *MemorySeriesStorage, fp model.Fingerprint) fingerprintSeriesPair { + return fingerprintSeriesPair{fp, s.seriesForRange(fp, model.Earliest, model.Latest)} +}