Merge pull request #2005 from redbaron/microoptimise-matching

Microoptimise matching
This commit is contained in:
Björn Rabenstein 2016-10-05 17:26:56 +02:00 committed by GitHub
commit 1e2f03f668
3 changed files with 183 additions and 69 deletions

View File

@ -410,19 +410,19 @@ func (s *MemorySeriesStorage) WaitForIndexing() {
// LastSampleForLabelMatchers implements Storage. // LastSampleForLabelMatchers implements Storage.
func (s *MemorySeriesStorage) LastSampleForLabelMatchers(_ context.Context, cutoff model.Time, matcherSets ...metric.LabelMatchers) (model.Vector, error) { func (s *MemorySeriesStorage) LastSampleForLabelMatchers(_ context.Context, cutoff model.Time, matcherSets ...metric.LabelMatchers) (model.Vector, error) {
fps := map[model.Fingerprint]struct{}{} mergedFPs := map[model.Fingerprint]struct{}{}
for _, matchers := range matcherSets { for _, matchers := range matcherSets {
fpToMetric, err := s.metricsForLabelMatchers(cutoff, model.Latest, matchers...) fps, err := s.fpsForLabelMatchers(cutoff, model.Latest, matchers...)
if err != nil { if err != nil {
return nil, err return nil, err
} }
for fp := range fpToMetric { for fp := range fps {
fps[fp] = struct{}{} mergedFPs[fp] = struct{}{}
} }
} }
res := make(model.Vector, 0, len(fps)) res := make(model.Vector, 0, len(mergedFPs))
for fp := range fps { for fp := range mergedFPs {
s.fpLocker.Lock(fp) s.fpLocker.Lock(fp)
series, ok := s.fpToSeries.get(fp) series, ok := s.fpToSeries.get(fp)
@ -480,13 +480,13 @@ func (bit *boundedIterator) Close() {
// QueryRange implements Storage. // QueryRange implements Storage.
func (s *MemorySeriesStorage) QueryRange(_ context.Context, from, through model.Time, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) { func (s *MemorySeriesStorage) QueryRange(_ context.Context, from, through model.Time, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) {
fpToMetric, err := s.metricsForLabelMatchers(from, through, matchers...) fpSeriesPairs, err := s.seriesForLabelMatchers(from, through, matchers...)
if err != nil { if err != nil {
return nil, err return nil, err
} }
iterators := make([]SeriesIterator, 0, len(fpToMetric)) iterators := make([]SeriesIterator, 0, len(fpSeriesPairs))
for fp := range fpToMetric { for _, pair := range fpSeriesPairs {
it := s.preloadChunksForRange(fp, from, through) it := s.preloadChunksForRange(pair, from, through)
iterators = append(iterators, it) iterators = append(iterators, it)
} }
return iterators, nil return iterators, nil
@ -497,13 +497,13 @@ func (s *MemorySeriesStorage) QueryInstant(_ context.Context, ts model.Time, sta
from := ts.Add(-stalenessDelta) from := ts.Add(-stalenessDelta)
through := ts through := ts
fpToMetric, err := s.metricsForLabelMatchers(from, through, matchers...) fpSeriesPairs, err := s.seriesForLabelMatchers(from, through, matchers...)
if err != nil { if err != nil {
return nil, err return nil, err
} }
iterators := make([]SeriesIterator, 0, len(fpToMetric)) iterators := make([]SeriesIterator, 0, len(fpSeriesPairs))
for fp := range fpToMetric { for _, pair := range fpSeriesPairs {
it := s.preloadChunksForInstant(fp, from, through) it := s.preloadChunksForInstant(pair, from, through)
iterators = append(iterators, it) iterators = append(iterators, it)
} }
return iterators, nil return iterators, nil
@ -558,43 +558,43 @@ func (s *MemorySeriesStorage) MetricsForLabelMatchers(
return metrics, nil return metrics, nil
} }
func (s *MemorySeriesStorage) metricsForLabelMatchers( // candidateFPsForLabelMatchers returns candidate FPs for given matchers and remaining matchers to be checked.
from, through model.Time, func (s *MemorySeriesStorage) candidateFPsForLabelMatchers(
matchers ...*metric.LabelMatcher, matchers ...*metric.LabelMatcher,
) (map[model.Fingerprint]metric.Metric, error) { ) (map[model.Fingerprint]struct{}, []*metric.LabelMatcher, error) {
sort.Sort(metric.LabelMatchers(matchers)) sort.Sort(metric.LabelMatchers(matchers))
if len(matchers) == 0 || matchers[0].MatchesEmptyString() { if len(matchers) == 0 || matchers[0].MatchesEmptyString() {
// No matchers at all or even the best matcher matches the empty string. // No matchers at all or even the best matcher matches the empty string.
return nil, nil return nil, nil, nil
} }
var ( var (
matcherIdx int matcherIdx int
remainingFPs map[model.Fingerprint]struct{} candidateFPs map[model.Fingerprint]struct{}
) )
// Equal matchers. // Equal matchers.
for ; matcherIdx < len(matchers) && (remainingFPs == nil || len(remainingFPs) > fpEqualMatchThreshold); matcherIdx++ { for ; matcherIdx < len(matchers) && (candidateFPs == nil || len(candidateFPs) > fpEqualMatchThreshold); matcherIdx++ {
m := matchers[matcherIdx] m := matchers[matcherIdx]
if m.Type != metric.Equal || m.MatchesEmptyString() { if m.Type != metric.Equal || m.MatchesEmptyString() {
break break
} }
remainingFPs = s.fingerprintsForLabelPair( candidateFPs = s.fingerprintsForLabelPair(
model.LabelPair{ model.LabelPair{
Name: m.Name, Name: m.Name,
Value: m.Value, Value: m.Value,
}, },
nil, nil,
remainingFPs, candidateFPs,
) )
if len(remainingFPs) == 0 { if len(candidateFPs) == 0 {
return nil, nil return nil, nil, nil
} }
} }
// Other matchers. // Other matchers.
for ; matcherIdx < len(matchers) && (remainingFPs == nil || len(remainingFPs) > fpOtherMatchThreshold); matcherIdx++ { for ; matcherIdx < len(matchers) && (candidateFPs == nil || len(candidateFPs) > fpOtherMatchThreshold); matcherIdx++ {
m := matchers[matcherIdx] m := matchers[matcherIdx]
if m.MatchesEmptyString() { if m.MatchesEmptyString() {
break break
@ -602,11 +602,11 @@ func (s *MemorySeriesStorage) metricsForLabelMatchers(
lvs, err := s.LabelValuesForLabelName(context.TODO(), m.Name) lvs, err := s.LabelValuesForLabelName(context.TODO(), m.Name)
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
lvs = m.Filter(lvs) lvs = m.Filter(lvs)
if len(lvs) == 0 { if len(lvs) == 0 {
return nil, nil return nil, nil, nil
} }
fps := map[model.Fingerprint]struct{}{} fps := map[model.Fingerprint]struct{}{}
for _, lv := range lvs { for _, lv := range lvs {
@ -616,29 +616,104 @@ func (s *MemorySeriesStorage) metricsForLabelMatchers(
Value: lv, Value: lv,
}, },
fps, fps,
remainingFPs, candidateFPs,
) )
} }
remainingFPs = fps candidateFPs = fps
if len(remainingFPs) == 0 { if len(candidateFPs) == 0 {
return nil, nil return nil, nil, nil
} }
} }
return candidateFPs, matchers[matcherIdx:], nil
}
func (s *MemorySeriesStorage) seriesForLabelMatchers(
from, through model.Time,
matchers ...*metric.LabelMatcher,
) ([]fingerprintSeriesPair, error) {
candidateFPs, matchersToCheck, err := s.candidateFPsForLabelMatchers(matchers...)
if err != nil {
return nil, err
}
result := []fingerprintSeriesPair{}
FPLoop:
for fp := range candidateFPs {
s.fpLocker.Lock(fp)
series := s.seriesForRange(fp, from, through)
s.fpLocker.Unlock(fp)
if series == nil {
continue FPLoop
}
for _, m := range matchersToCheck {
if !m.Match(series.metric[m.Name]) {
continue FPLoop
}
}
result = append(result, fingerprintSeriesPair{fp, series})
}
return result, nil
}
func (s *MemorySeriesStorage) fpsForLabelMatchers(
from, through model.Time,
matchers ...*metric.LabelMatcher,
) (map[model.Fingerprint]struct{}, error) {
candidateFPs, matchersToCheck, err := s.candidateFPsForLabelMatchers(matchers...)
if err != nil {
return nil, err
}
FPLoop:
for fp := range candidateFPs {
s.fpLocker.Lock(fp)
met, _, ok := s.metricForRange(fp, from, through)
s.fpLocker.Unlock(fp)
if !ok {
delete(candidateFPs, fp)
continue FPLoop
}
for _, m := range matchersToCheck {
if !m.Match(met[m.Name]) {
delete(candidateFPs, fp)
continue FPLoop
}
}
}
return candidateFPs, nil
}
func (s *MemorySeriesStorage) metricsForLabelMatchers(
from, through model.Time,
matchers ...*metric.LabelMatcher,
) (map[model.Fingerprint]metric.Metric, error) {
candidateFPs, matchersToCheck, err := s.candidateFPsForLabelMatchers(matchers...)
if err != nil {
return nil, err
}
result := map[model.Fingerprint]metric.Metric{} result := map[model.Fingerprint]metric.Metric{}
for fp := range remainingFPs { FPLoop:
for fp := range candidateFPs {
s.fpLocker.Lock(fp) s.fpLocker.Lock(fp)
if met, _, ok := s.metricForRange(fp, from, through); ok { met, _, ok := s.metricForRange(fp, from, through)
result[fp] = metric.Metric{Metric: met}
}
s.fpLocker.Unlock(fp) s.fpLocker.Unlock(fp)
}
for _, m := range matchers[matcherIdx:] { if !ok {
for fp, met := range result { continue FPLoop
if !m.Match(met.Metric[m.Name]) { }
delete(result, fp)
for _, m := range matchersToCheck {
if !m.Match(met[m.Name]) {
continue FPLoop
} }
} }
result[fp] = metric.Metric{Metric: met}
} }
return result, nil return result, nil
} }
@ -696,14 +771,14 @@ func (s *MemorySeriesStorage) LabelValuesForLabelName(_ context.Context, labelNa
// DropMetricsForLabelMatchers implements Storage. // DropMetricsForLabelMatchers implements Storage.
func (s *MemorySeriesStorage) DropMetricsForLabelMatchers(_ context.Context, matchers ...*metric.LabelMatcher) (int, error) { func (s *MemorySeriesStorage) DropMetricsForLabelMatchers(_ context.Context, matchers ...*metric.LabelMatcher) (int, error) {
fpToMetric, err := s.metricsForLabelMatchers(model.Earliest, model.Latest, matchers...) fps, err := s.fpsForLabelMatchers(model.Earliest, model.Latest, matchers...)
if err != nil { if err != nil {
return 0, err return 0, err
} }
for fp := range fpToMetric { for fp := range fps {
s.purgeSeries(fp, nil, nil) s.purgeSeries(fp, nil, nil)
} }
return len(fpToMetric), nil return len(fps), nil
} }
var ( var (
@ -864,7 +939,7 @@ func (s *MemorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Me
return series, nil return series, nil
} }
// seriesForRange is a helper method for preloadChunksForRange and preloadChunksForInstant. // seriesForRange is a helper method for seriesForLabelMatchers.
// //
// The caller must have locked the fp. // The caller must have locked the fp.
func (s *MemorySeriesStorage) seriesForRange( func (s *MemorySeriesStorage) seriesForRange(
@ -883,16 +958,17 @@ func (s *MemorySeriesStorage) seriesForRange(
} }
func (s *MemorySeriesStorage) preloadChunksForRange( func (s *MemorySeriesStorage) preloadChunksForRange(
fp model.Fingerprint, pair fingerprintSeriesPair,
from model.Time, through model.Time, from model.Time, through model.Time,
) SeriesIterator { ) SeriesIterator {
s.fpLocker.Lock(fp) fp, series := pair.fp, pair.series
defer s.fpLocker.Unlock(fp)
series := s.seriesForRange(fp, from, through)
if series == nil { if series == nil {
return nopIter return nopIter
} }
s.fpLocker.Lock(fp)
defer s.fpLocker.Unlock(fp)
iter, err := series.preloadChunksForRange(fp, from, through, s) iter, err := series.preloadChunksForRange(fp, from, through, s)
if err != nil { if err != nil {
s.quarantineSeries(fp, series.metric, err) s.quarantineSeries(fp, series.metric, err)
@ -902,16 +978,17 @@ func (s *MemorySeriesStorage) preloadChunksForRange(
} }
func (s *MemorySeriesStorage) preloadChunksForInstant( func (s *MemorySeriesStorage) preloadChunksForInstant(
fp model.Fingerprint, pair fingerprintSeriesPair,
from model.Time, through model.Time, from model.Time, through model.Time,
) SeriesIterator { ) SeriesIterator {
s.fpLocker.Lock(fp) fp, series := pair.fp, pair.series
defer s.fpLocker.Unlock(fp)
series := s.seriesForRange(fp, from, through)
if series == nil { if series == nil {
return nopIter return nopIter
} }
s.fpLocker.Lock(fp)
defer s.fpLocker.Unlock(fp)
iter, err := series.preloadChunksForInstant(fp, from, through, s) iter, err := series.preloadChunksForInstant(fp, from, through, s)
if err != nil { if err != nil {
s.quarantineSeries(fp, series.metric, err) s.quarantineSeries(fp, series.metric, err)

View File

@ -19,6 +19,7 @@ import (
"math" "math"
"math/rand" "math/rand"
"os" "os"
"strconv"
"testing" "testing"
"testing/quick" "testing/quick"
"time" "time"
@ -470,6 +471,38 @@ func BenchmarkLabelMatching(b *testing.B) {
b.StopTimer() b.StopTimer()
} }
func BenchmarkQueryRange(b *testing.B) {
now := model.Now()
insertStart := now.Add(-2 * time.Hour)
s, closer := NewTestStorage(b, 2)
defer closer.Close()
// Stop maintenance loop to prevent actual purging.
close(s.loopStopping)
<-s.loopStopped
<-s.logThrottlingStopped
// Recreate channel to avoid panic when we really shut down.
s.loopStopping = make(chan struct{})
for i := 0; i < 8192; i++ {
s.Append(&model.Sample{
Metric: model.Metric{"__name__": model.LabelValue(strconv.Itoa(i)), "job": "test"},
Timestamp: insertStart,
Value: 1,
})
}
s.WaitForIndexing()
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
lm, _ := metric.NewLabelMatcher(metric.Equal, "job", "test")
for pb.Next() {
s.QueryRange(context.Background(), insertStart, now, lm)
}
})
}
func TestRetentionCutoff(t *testing.T) { func TestRetentionCutoff(t *testing.T) {
now := model.Now() now := model.Now()
insertStart := now.Add(-2 * time.Hour) insertStart := now.Add(-2 * time.Hour)
@ -604,12 +637,12 @@ func TestDropMetrics(t *testing.T) {
t.Errorf("unexpected number of fingerprints: %d", len(fps2)) t.Errorf("unexpected number of fingerprints: %d", len(fps2))
} }
it := s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest) it := s.preloadChunksForRange(makeFingerprintSeriesPair(s, fpList[0]), model.Earliest, model.Latest)
if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 {
t.Errorf("unexpected number of samples: %d", len(vals)) t.Errorf("unexpected number of samples: %d", len(vals))
} }
it = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest) it = s.preloadChunksForRange(makeFingerprintSeriesPair(s, fpList[1]), model.Earliest, model.Latest)
if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != N { if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != N {
t.Errorf("unexpected number of samples: %d", len(vals)) t.Errorf("unexpected number of samples: %d", len(vals))
} }
@ -637,12 +670,12 @@ func TestDropMetrics(t *testing.T) {
t.Errorf("unexpected number of fingerprints: %d", len(fps3)) t.Errorf("unexpected number of fingerprints: %d", len(fps3))
} }
it = s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest) it = s.preloadChunksForRange(makeFingerprintSeriesPair(s, fpList[0]), model.Earliest, model.Latest)
if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 {
t.Errorf("unexpected number of samples: %d", len(vals)) t.Errorf("unexpected number of samples: %d", len(vals))
} }
it = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest) it = s.preloadChunksForRange(makeFingerprintSeriesPair(s, fpList[1]), model.Earliest, model.Latest)
if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 {
t.Errorf("unexpected number of samples: %d", len(vals)) t.Errorf("unexpected number of samples: %d", len(vals))
} }
@ -720,7 +753,7 @@ func TestQuarantineMetric(t *testing.T) {
} }
// This will access the corrupt file and lead to quarantining. // This will access the corrupt file and lead to quarantining.
iter := s.preloadChunksForInstant(fpToBeArchived, now.Add(-2*time.Hour-1*time.Minute), now.Add(-2*time.Hour)) iter := s.preloadChunksForInstant(makeFingerprintSeriesPair(s, fpToBeArchived), now.Add(-2*time.Hour-1*time.Minute), now.Add(-2*time.Hour))
iter.Close() iter.Close()
time.Sleep(time.Second) // Give time to quarantine. TODO(beorn7): Find a better way to wait. time.Sleep(time.Second) // Give time to quarantine. TODO(beorn7): Find a better way to wait.
s.WaitForIndexing() s.WaitForIndexing()
@ -862,7 +895,7 @@ func testValueAtOrBeforeTime(t *testing.T, encoding chunk.Encoding) {
fp := model.Metric{}.FastFingerprint() fp := model.Metric{}.FastFingerprint()
it := s.preloadChunksForRange(fp, model.Earliest, model.Latest) it := s.preloadChunksForRange(makeFingerprintSeriesPair(s, fp), model.Earliest, model.Latest)
// #1 Exactly on a sample. // #1 Exactly on a sample.
for i, expected := range samples { for i, expected := range samples {
@ -940,7 +973,7 @@ func benchmarkValueAtOrBeforeTime(b *testing.B, encoding chunk.Encoding) {
fp := model.Metric{}.FastFingerprint() fp := model.Metric{}.FastFingerprint()
it := s.preloadChunksForRange(fp, model.Earliest, model.Latest) it := s.preloadChunksForRange(makeFingerprintSeriesPair(s, fp), model.Earliest, model.Latest)
b.ResetTimer() b.ResetTimer()
@ -1022,7 +1055,7 @@ func testRangeValues(t *testing.T, encoding chunk.Encoding) {
fp := model.Metric{}.FastFingerprint() fp := model.Metric{}.FastFingerprint()
it := s.preloadChunksForRange(fp, model.Earliest, model.Latest) it := s.preloadChunksForRange(makeFingerprintSeriesPair(s, fp), model.Earliest, model.Latest)
// #1 Zero length interval at sample. // #1 Zero length interval at sample.
for i, expected := range samples { for i, expected := range samples {
@ -1178,7 +1211,7 @@ func benchmarkRangeValues(b *testing.B, encoding chunk.Encoding) {
fp := model.Metric{}.FastFingerprint() fp := model.Metric{}.FastFingerprint()
it := s.preloadChunksForRange(fp, model.Earliest, model.Latest) it := s.preloadChunksForRange(makeFingerprintSeriesPair(s, fp), model.Earliest, model.Latest)
b.ResetTimer() b.ResetTimer()
@ -1228,7 +1261,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunk.Encoding) {
// Drop ~half of the chunks. // Drop ~half of the chunks.
s.maintainMemorySeries(fp, 10000) s.maintainMemorySeries(fp, 10000)
it := s.preloadChunksForRange(fp, model.Earliest, model.Latest) it := s.preloadChunksForRange(makeFingerprintSeriesPair(s, fp), model.Earliest, model.Latest)
actual := it.RangeValues(metric.Interval{ actual := it.RangeValues(metric.Interval{
OldestInclusive: 0, OldestInclusive: 0,
NewestInclusive: 100000, NewestInclusive: 100000,
@ -1246,7 +1279,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunk.Encoding) {
// Drop everything. // Drop everything.
s.maintainMemorySeries(fp, 100000) s.maintainMemorySeries(fp, 100000)
it = s.preloadChunksForRange(fp, model.Earliest, model.Latest) it = s.preloadChunksForRange(makeFingerprintSeriesPair(s, fp), model.Earliest, model.Latest)
actual = it.RangeValues(metric.Interval{ actual = it.RangeValues(metric.Interval{
OldestInclusive: 0, OldestInclusive: 0,
NewestInclusive: 100000, NewestInclusive: 100000,
@ -1410,7 +1443,7 @@ func testEvictAndLoadChunkDescs(t *testing.T, encoding chunk.Encoding) {
} }
// Load everything back. // Load everything back.
it := s.preloadChunksForRange(fp, 0, 100000) it := s.preloadChunksForRange(makeFingerprintSeriesPair(s, fp), 0, 100000)
if oldLen != len(series.chunkDescs) { if oldLen != len(series.chunkDescs) {
t.Errorf("Expected number of chunkDescs to have reached old value again, old number %d, current number %d.", oldLen, len(series.chunkDescs)) t.Errorf("Expected number of chunkDescs to have reached old value again, old number %d, current number %d.", oldLen, len(series.chunkDescs))
@ -1738,7 +1771,7 @@ func verifyStorageRandom(t testing.TB, s *MemorySeriesStorage, samples model.Sam
for _, i := range rand.Perm(len(samples)) { for _, i := range rand.Perm(len(samples)) {
sample := samples[i] sample := samples[i]
fp := s.mapper.mapFP(sample.Metric.FastFingerprint(), sample.Metric) fp := s.mapper.mapFP(sample.Metric.FastFingerprint(), sample.Metric)
it := s.preloadChunksForInstant(fp, sample.Timestamp, sample.Timestamp) it := s.preloadChunksForInstant(makeFingerprintSeriesPair(s, fp), sample.Timestamp, sample.Timestamp)
found := it.ValueAtOrBeforeTime(sample.Timestamp) found := it.ValueAtOrBeforeTime(sample.Timestamp)
startTime := it.(*boundedIterator).start startTime := it.(*boundedIterator).start
switch { switch {
@ -1781,7 +1814,7 @@ func verifyStorageSequential(t testing.TB, s *MemorySeriesStorage, samples model
if it != nil { if it != nil {
it.Close() it.Close()
} }
it = s.preloadChunksForRange(fp, sample.Timestamp, model.Latest) it = s.preloadChunksForRange(makeFingerprintSeriesPair(s, fp), sample.Timestamp, model.Latest)
r = it.RangeValues(metric.Interval{ r = it.RangeValues(metric.Interval{
OldestInclusive: sample.Timestamp, OldestInclusive: sample.Timestamp,
NewestInclusive: model.Latest, NewestInclusive: model.Latest,
@ -1902,7 +1935,7 @@ func TestAppendOutOfOrder(t *testing.T) {
fp := s.mapper.mapFP(m.FastFingerprint(), m) fp := s.mapper.mapFP(m.FastFingerprint(), m)
it := s.preloadChunksForRange(fp, 0, 2) it := s.preloadChunksForRange(makeFingerprintSeriesPair(s, fp), 0, 2)
defer it.Close() defer it.Close()
want := []model.SamplePair{ want := []model.SamplePair{

View File

@ -66,3 +66,7 @@ func NewTestStorage(t testutil.T, encoding chunk.Encoding) (*MemorySeriesStorage
return storage, closer return storage, closer
} }
func makeFingerprintSeriesPair(s *MemorySeriesStorage, fp model.Fingerprint) fingerprintSeriesPair {
return fingerprintSeriesPair{fp, s.seriesForRange(fp, model.Earliest, model.Latest)}
}