diff --git a/storage/local/storage.go b/storage/local/storage.go index 93367fe6d..78eb557c7 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -563,43 +563,43 @@ func (s *MemorySeriesStorage) MetricsForLabelMatchers( return metrics, nil } -func (s *MemorySeriesStorage) metricsForLabelMatchers( - from, through model.Time, +// returns candidate FPs for given matchers and remaining matchers to be checked +func (s *MemorySeriesStorage) candidateFPsForLabelMatchers( matchers ...*metric.LabelMatcher, -) (map[model.Fingerprint]metric.Metric, error) { +) (map[model.Fingerprint]struct{}, []*metric.LabelMatcher, error) { sort.Sort(metric.LabelMatchers(matchers)) if len(matchers) == 0 || matchers[0].MatchesEmptyString() { // No matchers at all or even the best matcher matches the empty string. - return nil, nil + return nil, nil, nil } var ( matcherIdx int - remainingFPs map[model.Fingerprint]struct{} + candidateFPs map[model.Fingerprint]struct{} ) // Equal matchers. - for ; matcherIdx < len(matchers) && (remainingFPs == nil || len(remainingFPs) > fpEqualMatchThreshold); matcherIdx++ { + for ; matcherIdx < len(matchers) && (candidateFPs == nil || len(candidateFPs) > fpEqualMatchThreshold); matcherIdx++ { m := matchers[matcherIdx] if m.Type != metric.Equal || m.MatchesEmptyString() { break } - remainingFPs = s.fingerprintsForLabelPair( + candidateFPs = s.fingerprintsForLabelPair( model.LabelPair{ Name: m.Name, Value: m.Value, }, nil, - remainingFPs, + candidateFPs, ) - if len(remainingFPs) == 0 { - return nil, nil + if len(candidateFPs) == 0 { + return nil, nil, nil } } // Other matchers. - for ; matcherIdx < len(matchers) && (remainingFPs == nil || len(remainingFPs) > fpOtherMatchThreshold); matcherIdx++ { + for ; matcherIdx < len(matchers) && (candidateFPs == nil || len(candidateFPs) > fpOtherMatchThreshold); matcherIdx++ { m := matchers[matcherIdx] if m.MatchesEmptyString() { break @@ -607,11 +607,11 @@ func (s *MemorySeriesStorage) metricsForLabelMatchers( lvs, err := s.LabelValuesForLabelName(context.TODO(), m.Name) if err != nil { - return nil, err + return nil, nil, err } lvs = m.Filter(lvs) if len(lvs) == 0 { - return nil, nil + return nil, nil, nil } fps := map[model.Fingerprint]struct{}{} for _, lv := range lvs { @@ -621,18 +621,30 @@ func (s *MemorySeriesStorage) metricsForLabelMatchers( Value: lv, }, fps, - remainingFPs, + candidateFPs, ) } - remainingFPs = fps - if len(remainingFPs) == 0 { - return nil, nil + candidateFPs = fps + if len(candidateFPs) == 0 { + return nil, nil, nil } } + return candidateFPs, matchers[matcherIdx:], nil +} + +func (s *MemorySeriesStorage) metricsForLabelMatchers( + from, through model.Time, + matchers ...*metric.LabelMatcher, +) (map[model.Fingerprint]metric.Metric, error) { + + candidateFPs, matchersToCheck, err := s.candidateFPsForLabelMatchers(matchers...) + if err != nil { + return nil, err + } result := map[model.Fingerprint]metric.Metric{} FP_LOOP: - for fp := range remainingFPs { + for fp := range candidateFPs { s.fpLocker.Lock(fp) met, _, ok := s.metricForRange(fp, from, through) s.fpLocker.Unlock(fp) @@ -641,7 +653,7 @@ FP_LOOP: continue FP_LOOP } - for _, m := range matchers[matcherIdx:] { + for _, m := range matchersToCheck { if !m.Match(met[m.Name]) { continue FP_LOOP }