Extract initial FP candidate build logic into candidateFPsForLabelMatchers method

No functional changes otherwise
This commit is contained in:
Maxim Ivanov 2016-09-18 11:37:17 +01:00
parent c048a0cde8
commit 4978a65495
1 changed files with 31 additions and 19 deletions

View File

@ -563,43 +563,43 @@ func (s *MemorySeriesStorage) MetricsForLabelMatchers(
return metrics, nil return metrics, nil
} }
func (s *MemorySeriesStorage) metricsForLabelMatchers( // returns candidate FPs for given matchers and remaining matchers to be checked
from, through model.Time, func (s *MemorySeriesStorage) candidateFPsForLabelMatchers(
matchers ...*metric.LabelMatcher, matchers ...*metric.LabelMatcher,
) (map[model.Fingerprint]metric.Metric, error) { ) (map[model.Fingerprint]struct{}, []*metric.LabelMatcher, error) {
sort.Sort(metric.LabelMatchers(matchers)) sort.Sort(metric.LabelMatchers(matchers))
if len(matchers) == 0 || matchers[0].MatchesEmptyString() { if len(matchers) == 0 || matchers[0].MatchesEmptyString() {
// No matchers at all or even the best matcher matches the empty string. // No matchers at all or even the best matcher matches the empty string.
return nil, nil return nil, nil, nil
} }
var ( var (
matcherIdx int matcherIdx int
remainingFPs map[model.Fingerprint]struct{} candidateFPs map[model.Fingerprint]struct{}
) )
// Equal matchers. // Equal matchers.
for ; matcherIdx < len(matchers) && (remainingFPs == nil || len(remainingFPs) > fpEqualMatchThreshold); matcherIdx++ { for ; matcherIdx < len(matchers) && (candidateFPs == nil || len(candidateFPs) > fpEqualMatchThreshold); matcherIdx++ {
m := matchers[matcherIdx] m := matchers[matcherIdx]
if m.Type != metric.Equal || m.MatchesEmptyString() { if m.Type != metric.Equal || m.MatchesEmptyString() {
break break
} }
remainingFPs = s.fingerprintsForLabelPair( candidateFPs = s.fingerprintsForLabelPair(
model.LabelPair{ model.LabelPair{
Name: m.Name, Name: m.Name,
Value: m.Value, Value: m.Value,
}, },
nil, nil,
remainingFPs, candidateFPs,
) )
if len(remainingFPs) == 0 { if len(candidateFPs) == 0 {
return nil, nil return nil, nil, nil
} }
} }
// Other matchers. // Other matchers.
for ; matcherIdx < len(matchers) && (remainingFPs == nil || len(remainingFPs) > fpOtherMatchThreshold); matcherIdx++ { for ; matcherIdx < len(matchers) && (candidateFPs == nil || len(candidateFPs) > fpOtherMatchThreshold); matcherIdx++ {
m := matchers[matcherIdx] m := matchers[matcherIdx]
if m.MatchesEmptyString() { if m.MatchesEmptyString() {
break break
@ -607,11 +607,11 @@ func (s *MemorySeriesStorage) metricsForLabelMatchers(
lvs, err := s.LabelValuesForLabelName(context.TODO(), m.Name) lvs, err := s.LabelValuesForLabelName(context.TODO(), m.Name)
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
lvs = m.Filter(lvs) lvs = m.Filter(lvs)
if len(lvs) == 0 { if len(lvs) == 0 {
return nil, nil return nil, nil, nil
} }
fps := map[model.Fingerprint]struct{}{} fps := map[model.Fingerprint]struct{}{}
for _, lv := range lvs { for _, lv := range lvs {
@ -621,18 +621,30 @@ func (s *MemorySeriesStorage) metricsForLabelMatchers(
Value: lv, Value: lv,
}, },
fps, fps,
remainingFPs, candidateFPs,
) )
} }
remainingFPs = fps candidateFPs = fps
if len(remainingFPs) == 0 { if len(candidateFPs) == 0 {
return nil, nil return nil, nil, nil
} }
} }
return candidateFPs, matchers[matcherIdx:], nil
}
func (s *MemorySeriesStorage) metricsForLabelMatchers(
from, through model.Time,
matchers ...*metric.LabelMatcher,
) (map[model.Fingerprint]metric.Metric, error) {
candidateFPs, matchersToCheck, err := s.candidateFPsForLabelMatchers(matchers...)
if err != nil {
return nil, err
}
result := map[model.Fingerprint]metric.Metric{} result := map[model.Fingerprint]metric.Metric{}
FP_LOOP: FP_LOOP:
for fp := range remainingFPs { for fp := range candidateFPs {
s.fpLocker.Lock(fp) s.fpLocker.Lock(fp)
met, _, ok := s.metricForRange(fp, from, through) met, _, ok := s.metricForRange(fp, from, through)
s.fpLocker.Unlock(fp) s.fpLocker.Unlock(fp)
@ -641,7 +653,7 @@ FP_LOOP:
continue FP_LOOP continue FP_LOOP
} }
for _, m := range matchers[matcherIdx:] { for _, m := range matchersToCheck {
if !m.Match(met[m.Name]) { if !m.Match(met[m.Name]) {
continue FP_LOOP continue FP_LOOP
} }