Merge branch 'beorn7/storage3' into beorn7/storage4
Conflicts: storage/local/preload.go storage/local/storage.go storage/local/storage_test.go
This commit is contained in:
commit
d0a4477446
|
@ -548,7 +548,7 @@ func linearRegression(samples []model.SamplePair, interceptTime model.Time) (slo
|
|||
|
||||
slope = covXY / varX
|
||||
intercept = sumY/n - slope*sumX/n
|
||||
return
|
||||
return slope, intercept
|
||||
}
|
||||
|
||||
// === deriv(node model.ValMatrix) Vector ===
|
||||
|
|
|
@ -30,7 +30,7 @@ func (p *memorySeriesPreloader) PreloadRange(
|
|||
fp model.Fingerprint,
|
||||
from model.Time, through model.Time,
|
||||
) SeriesIterator {
|
||||
cds, iter := p.storage.preloadChunksForRange(fp, from, through, false)
|
||||
cds, iter := p.storage.preloadChunksForRange(fp, from, through)
|
||||
p.pinnedChunkDescs = append(p.pinnedChunkDescs, cds...)
|
||||
return iter
|
||||
}
|
||||
|
@ -40,7 +40,7 @@ func (p *memorySeriesPreloader) PreloadInstant(
|
|||
fp model.Fingerprint,
|
||||
timestamp model.Time, stalenessDelta time.Duration,
|
||||
) SeriesIterator {
|
||||
cds, iter := p.storage.preloadChunksForRange(fp, timestamp.Add(-stalenessDelta), timestamp, true)
|
||||
cds, iter := p.storage.preloadChunksForInstant(fp, timestamp.Add(-stalenessDelta), timestamp)
|
||||
p.pinnedChunkDescs = append(p.pinnedChunkDescs, cds...)
|
||||
return iter
|
||||
}
|
||||
|
|
|
@ -399,30 +399,40 @@ func (s *memorySeries) newIterator(pinnedChunkDescs []*chunkDesc, quarantine fun
|
|||
}
|
||||
}
|
||||
|
||||
// preloadChunksForInstant preloads chunks for the latest value in the given
|
||||
// range. If the last sample saved in the memorySeries itself is the latest
|
||||
// value in the given range, it will in fact preload zero chunks and just take
|
||||
// that value.
|
||||
func (s *memorySeries) preloadChunksForInstant(
|
||||
fp model.Fingerprint,
|
||||
from model.Time, through model.Time,
|
||||
mss *memorySeriesStorage,
|
||||
) ([]*chunkDesc, SeriesIterator, error) {
|
||||
// If we have a lastSamplePair in the series, and thas last samplePair
|
||||
// is in the interval, just take it in a singleSampleSeriesIterator. No
|
||||
// need to pin or load anything.
|
||||
lastSample := s.lastSamplePair()
|
||||
if !through.Before(lastSample.Timestamp) &&
|
||||
!from.After(lastSample.Timestamp) &&
|
||||
lastSample != ZeroSamplePair {
|
||||
iter := &boundedIterator{
|
||||
it: &singleSampleSeriesIterator{samplePair: lastSample},
|
||||
start: model.Now().Add(-mss.dropAfter),
|
||||
}
|
||||
return nil, iter, nil
|
||||
}
|
||||
// If we are here, we are out of luck and have to delegate to the more
|
||||
// expensive method.
|
||||
return s.preloadChunksForRange(fp, from, through, mss)
|
||||
}
|
||||
|
||||
// preloadChunksForRange loads chunks for the given range from the persistence.
|
||||
// The caller must have locked the fingerprint of the series.
|
||||
func (s *memorySeries) preloadChunksForRange(
|
||||
fp model.Fingerprint,
|
||||
from model.Time, through model.Time,
|
||||
lastSampleOnly bool,
|
||||
mss *memorySeriesStorage,
|
||||
) ([]*chunkDesc, SeriesIterator, error) {
|
||||
// If we have to preload for only one sample, and we have a
|
||||
// lastSamplePair in the series, and thas last samplePair is in the
|
||||
// interval, just take it in a singleSampleSeriesIterator. No need to
|
||||
// pin or load anything.
|
||||
if lastSampleOnly {
|
||||
lastSample := s.lastSamplePair()
|
||||
if !through.Before(lastSample.Timestamp) &&
|
||||
!from.After(lastSample.Timestamp) &&
|
||||
lastSample != ZeroSamplePair {
|
||||
iter := &boundedIterator{
|
||||
it: &singleSampleSeriesIterator{samplePair: lastSample},
|
||||
start: model.Now().Add(-mss.dropAfter),
|
||||
}
|
||||
return nil, iter, nil
|
||||
}
|
||||
}
|
||||
firstChunkDescTime := model.Latest
|
||||
if len(s.chunkDescs) > 0 {
|
||||
firstChunkDescTime = s.chunkDescs[0].firstTime()
|
||||
|
|
|
@ -695,46 +695,76 @@ func (s *memorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Me
|
|||
return series, nil
|
||||
}
|
||||
|
||||
// getSeriesForRange is a helper method for preloadChunksForRange and preloadChunksForInstant.
|
||||
func (s *memorySeriesStorage) getSeriesForRange(
|
||||
fp model.Fingerprint,
|
||||
from model.Time, through model.Time,
|
||||
) *memorySeries {
|
||||
series, ok := s.fpToSeries.get(fp)
|
||||
if ok {
|
||||
return series
|
||||
}
|
||||
has, first, last, err := s.persistence.hasArchivedMetric(fp)
|
||||
if err != nil {
|
||||
log.With("fingerprint", fp).With("error", err).Error("Archive index error while preloading chunks.")
|
||||
return nil
|
||||
}
|
||||
if !has {
|
||||
s.invalidPreloadRequestsCount.Inc()
|
||||
return nil
|
||||
}
|
||||
if last.Before(from) || first.After(through) {
|
||||
return nil
|
||||
}
|
||||
metric, err := s.persistence.archivedMetric(fp)
|
||||
if err != nil {
|
||||
log.With("fingerprint", fp).With("error", err).Error("Archive index error while preloading chunks.")
|
||||
return nil
|
||||
}
|
||||
series, err = s.getOrCreateSeries(fp, metric)
|
||||
if err != nil {
|
||||
// getOrCreateSeries took care of quarantining already.
|
||||
return nil
|
||||
}
|
||||
return series
|
||||
}
|
||||
|
||||
func (s *memorySeriesStorage) preloadChunksForRange(
|
||||
fp model.Fingerprint,
|
||||
from model.Time, through model.Time,
|
||||
lastSampleOnly bool,
|
||||
) ([]*chunkDesc, SeriesIterator) {
|
||||
s.fpLocker.Lock(fp)
|
||||
defer s.fpLocker.Unlock(fp)
|
||||
|
||||
series, ok := s.fpToSeries.get(fp)
|
||||
if !ok {
|
||||
has, first, last, err := s.persistence.hasArchivedMetric(fp)
|
||||
if err != nil {
|
||||
log.With("fingerprint", fp).With("error", err).Error("Archive index error while preloading chunks.")
|
||||
return nil, nopIter
|
||||
}
|
||||
if !has {
|
||||
s.invalidPreloadRequestsCount.Inc()
|
||||
return nil, nopIter
|
||||
}
|
||||
if from.Before(last) && through.After(first) {
|
||||
metric, err := s.persistence.archivedMetric(fp)
|
||||
if err != nil {
|
||||
log.With("fingerprint", fp).With("error", err).Error("Archive index error while preloading chunks.")
|
||||
return nil, nopIter
|
||||
}
|
||||
series, err = s.getOrCreateSeries(fp, metric)
|
||||
if err != nil {
|
||||
log.With("fingerprint", fp).With("error", err).Error("Error while retrieving series.")
|
||||
return nil, nopIter
|
||||
}
|
||||
} else {
|
||||
return nil, nopIter
|
||||
}
|
||||
series := s.getSeriesForRange(fp, from, through)
|
||||
if series == nil {
|
||||
return nil, nopIter
|
||||
}
|
||||
cds, it, err := series.preloadChunksForRange(fp, from, through, lastSampleOnly, s)
|
||||
cds, iter, err := series.preloadChunksForRange(fp, from, through, s)
|
||||
if err != nil {
|
||||
s.quarantineSeries(fp, series.metric, err)
|
||||
return nil, nopIter
|
||||
}
|
||||
return cds, it
|
||||
return cds, iter
|
||||
}
|
||||
|
||||
func (s *memorySeriesStorage) preloadChunksForInstant(
|
||||
fp model.Fingerprint,
|
||||
from model.Time, through model.Time,
|
||||
) ([]*chunkDesc, SeriesIterator) {
|
||||
s.fpLocker.Lock(fp)
|
||||
defer s.fpLocker.Unlock(fp)
|
||||
|
||||
series := s.getSeriesForRange(fp, from, through)
|
||||
if series == nil {
|
||||
return nil, nopIter
|
||||
}
|
||||
cds, iter, err := series.preloadChunksForInstant(fp, from, through, s)
|
||||
if err != nil {
|
||||
s.quarantineSeries(fp, series.metric, err)
|
||||
return nil, nopIter
|
||||
}
|
||||
return cds, iter
|
||||
}
|
||||
|
||||
func (s *memorySeriesStorage) handleEvictList() {
|
||||
|
|
|
@ -489,12 +489,12 @@ func TestDropMetrics(t *testing.T) {
|
|||
t.Errorf("unexpected number of fingerprints: %d", len(fps2))
|
||||
}
|
||||
|
||||
_, it := s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest, false)
|
||||
_, it := s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest)
|
||||
if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 {
|
||||
t.Errorf("unexpected number of samples: %d", len(vals))
|
||||
}
|
||||
|
||||
_, it = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest, false)
|
||||
_, it = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest)
|
||||
if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != N {
|
||||
t.Errorf("unexpected number of samples: %d", len(vals))
|
||||
}
|
||||
|
@ -516,12 +516,12 @@ func TestDropMetrics(t *testing.T) {
|
|||
t.Errorf("unexpected number of fingerprints: %d", len(fps3))
|
||||
}
|
||||
|
||||
_, it = s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest, false)
|
||||
_, it = s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest)
|
||||
if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 {
|
||||
t.Errorf("unexpected number of samples: %d", len(vals))
|
||||
}
|
||||
|
||||
_, it = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest, false)
|
||||
_, it = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest)
|
||||
if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 {
|
||||
t.Errorf("unexpected number of samples: %d", len(vals))
|
||||
}
|
||||
|
@ -739,7 +739,7 @@ func testValueAtOrBeforeTime(t *testing.T, encoding chunkEncoding) {
|
|||
|
||||
fp := model.Metric{}.FastFingerprint()
|
||||
|
||||
_, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false)
|
||||
_, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest)
|
||||
|
||||
// #1 Exactly on a sample.
|
||||
for i, expected := range samples {
|
||||
|
@ -813,7 +813,7 @@ func benchmarkValueAtOrBeforeTime(b *testing.B, encoding chunkEncoding) {
|
|||
|
||||
fp := model.Metric{}.FastFingerprint()
|
||||
|
||||
_, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false)
|
||||
_, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest)
|
||||
|
||||
b.ResetTimer()
|
||||
|
||||
|
@ -891,7 +891,7 @@ func testRangeValues(t *testing.T, encoding chunkEncoding) {
|
|||
|
||||
fp := model.Metric{}.FastFingerprint()
|
||||
|
||||
_, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false)
|
||||
_, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest)
|
||||
|
||||
// #1 Zero length interval at sample.
|
||||
for i, expected := range samples {
|
||||
|
@ -1043,7 +1043,7 @@ func benchmarkRangeValues(b *testing.B, encoding chunkEncoding) {
|
|||
|
||||
fp := model.Metric{}.FastFingerprint()
|
||||
|
||||
_, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false)
|
||||
_, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest)
|
||||
|
||||
b.ResetTimer()
|
||||
|
||||
|
@ -1089,7 +1089,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
|
|||
|
||||
// Drop ~half of the chunks.
|
||||
s.maintainMemorySeries(fp, 10000)
|
||||
_, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false)
|
||||
_, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest)
|
||||
actual := it.RangeValues(metric.Interval{
|
||||
OldestInclusive: 0,
|
||||
NewestInclusive: 100000,
|
||||
|
@ -1107,7 +1107,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
|
|||
|
||||
// Drop everything.
|
||||
s.maintainMemorySeries(fp, 100000)
|
||||
_, it = s.preloadChunksForRange(fp, model.Earliest, model.Latest, false)
|
||||
_, it = s.preloadChunksForRange(fp, model.Earliest, model.Latest)
|
||||
actual = it.RangeValues(metric.Interval{
|
||||
OldestInclusive: 0,
|
||||
NewestInclusive: 100000,
|
||||
|
|
Loading…
Reference in New Issue