diff --git a/Makefile b/Makefile index 21777855d..0355958c2 100644 --- a/Makefile +++ b/Makefile @@ -89,7 +89,7 @@ $(FULL_GOPATH): -[ -d "$(FULL_GOPATH)" ] || { mkdir -vp $(FULL_GOPATH_BASE) ; ln -s "$(PWD)" "$(FULL_GOPATH)" ; } [ -d "$(FULL_GOPATH)" ] -test: build +test: config dependencies model preparation tools web $(GO) test $(GO_TEST_FLAGS) ./... tools: dependencies preparation diff --git a/storage/metric/curator.go b/storage/metric/curator.go index 513c64e36..3fdf2a5a3 100644 --- a/storage/metric/curator.go +++ b/storage/metric/curator.go @@ -20,6 +20,7 @@ import ( "time" "code.google.com/p/goprotobuf/proto" + "github.com/golang/glog" clientmodel "github.com/prometheus/client_golang/model" @@ -70,9 +71,6 @@ type Curator struct { type watermarkScanner struct { // curationState is the data store for curation remarks. curationState CurationRemarker - // diskFrontier models the available seekable ranges for the provided - // sampleIterator. - diskFrontier *diskFrontier // ignoreYoungerThan is passed into the curation remark for the given series. ignoreYoungerThan time.Duration // processor is responsible for executing a given stategy on the @@ -89,6 +87,8 @@ type watermarkScanner struct { stop chan bool // status is the outbound channel for notifying the status page of its state. status CurationStateUpdater + + firstBlock, lastBlock *SampleKey } // run facilitates the curation lifecycle. @@ -119,14 +119,19 @@ func (c *Curator) Run(ignoreYoungerThan time.Duration, instant time.Time, proces iterator := samples.NewIterator(true) defer iterator.Close() - diskFrontier, present, err := newDiskFrontier(iterator) - if err != nil { + if !iterator.SeekToLast() { + glog.Info("Empty database; skipping curation.") + return } - if !present { - // No sample database exists; no work to do! + lastBlock, _ := extractSampleKey(iterator) + + if !iterator.SeekToFirst() { + glog.Info("Empty database; skipping curation.") + return } + firstBlock, _ := extractSampleKey(iterator) scanner := &watermarkScanner{ curationState: curationState, @@ -136,9 +141,11 @@ func (c *Curator) Run(ignoreYoungerThan time.Duration, instant time.Time, proces stop: c.Stop, stopAt: instant.Add(-1 * ignoreYoungerThan), - diskFrontier: diskFrontier, sampleIterator: iterator, samples: samples, + + firstBlock: firstBlock, + lastBlock: lastBlock, } // Right now, the ability to stop a curation is limited to the beginning of @@ -271,12 +278,11 @@ func (w *watermarkScanner) curationConsistent(f *clientmodel.Fingerprint, waterm func (w *watermarkScanner) Operate(key, _ interface{}) (oErr *storage.OperatorError) { fingerprint := key.(*clientmodel.Fingerprint) - seriesFrontier, present, err := newSeriesFrontier(fingerprint, w.diskFrontier, w.sampleIterator) - if err != nil || !present { - // An anomaly with the series frontier is severe in the sense that some sort - // of an illegal state condition exists in the storage layer, which would - // probably signify an illegal disk frontier. - return &storage.OperatorError{error: err, Continuable: false} + if fingerprint.Less(w.firstBlock.Fingerprint) { + return nil + } + if w.lastBlock.Fingerprint.Less(fingerprint) { + return nil } curationState, present, err := w.curationState.Get(&curationKey{ @@ -285,7 +291,6 @@ func (w *watermarkScanner) Operate(key, _ interface{}) (oErr *storage.OperatorEr ProcessorMessageTypeName: w.processor.Name(), IgnoreYoungerThan: w.ignoreYoungerThan, }) - if err != nil { // An anomaly with the curation remark is likely not fatal in the sense that // there was a decoding error with the entity and shouldn't be cause to stop @@ -293,23 +298,21 @@ func (w *watermarkScanner) Operate(key, _ interface{}) (oErr *storage.OperatorEr // work forward. With an idempotent processor, this is safe. return &storage.OperatorError{error: err, Continuable: true} } - var firstSeek time.Time - switch { - case !present, seriesFrontier.After(curationState): - firstSeek = seriesFrontier.firstSupertime - case !seriesFrontier.InSafeSeekRange(curationState): - firstSeek = seriesFrontier.lastSupertime - default: - firstSeek = curationState + + keySet := &SampleKey{ + Fingerprint: fingerprint, } - startKey := &SampleKey{ - Fingerprint: fingerprint, - FirstTimestamp: firstSeek, + if !present && fingerprint.Equal(w.firstBlock.Fingerprint) { + // If the fingerprint is the same, then we simply need to use the earliest + // block found in the database. + *keySet = *w.firstBlock + } else if present { + keySet.FirstTimestamp = curationState } + dto := new(dto.SampleKey) - - startKey.Dump(dto) + keySet.Dump(dto) prospectiveKey := coding.NewPBEncoder(dto).MustEncode() if !w.sampleIterator.Seek(prospectiveKey) { // LevelDB is picky about the seek ranges. If an iterator was invalidated, @@ -317,25 +320,41 @@ func (w *watermarkScanner) Operate(key, _ interface{}) (oErr *storage.OperatorEr return &storage.OperatorError{error: fmt.Errorf("Illegal Condition: Iterator invalidated due to seek range."), Continuable: false} } - newestAllowedSample := w.stopAt - if !newestAllowedSample.Before(seriesFrontier.lastSupertime) { - newestAllowedSample = seriesFrontier.lastSupertime + for { + sampleKey, err := extractSampleKey(w.sampleIterator) + if err != nil { + return + } + if !sampleKey.Fingerprint.Equal(fingerprint) { + return + } + + if !present { + break + } + + if !(sampleKey.FirstTimestamp.Before(curationState) && sampleKey.LastTimestamp.Before(curationState)) { + break + } + + if !w.sampleIterator.Next() { + return + } } - lastTime, err := w.processor.Apply(w.sampleIterator, w.samples, newestAllowedSample, fingerprint) + lastTime, err := w.processor.Apply(w.sampleIterator, w.samples, w.stopAt, fingerprint) if err != nil { // We can't divine the severity of a processor error without refactoring the // interface. return &storage.OperatorError{error: err, Continuable: false} } - err = w.curationState.Update(&curationKey{ + if err = w.curationState.Update(&curationKey{ Fingerprint: fingerprint, ProcessorMessageRaw: w.processor.Signature(), ProcessorMessageTypeName: w.processor.Name(), IgnoreYoungerThan: w.ignoreYoungerThan, - }, lastTime) - if err != nil { + }, lastTime); err != nil { // Under the assumption that the processors are idempotent, they can be // re-run; thusly, the commitment of the curation remark is no cause // to cease further progress. diff --git a/storage/metric/frontier.go b/storage/metric/frontier.go deleted file mode 100644 index b61520ab2..000000000 --- a/storage/metric/frontier.go +++ /dev/null @@ -1,196 +0,0 @@ -// Copyright 2013 Prometheus Team -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package metric - -import ( - "fmt" - "time" - - clientmodel "github.com/prometheus/client_golang/model" - - "github.com/prometheus/prometheus/coding" - "github.com/prometheus/prometheus/coding/indexable" - "github.com/prometheus/prometheus/storage/raw/leveldb" - - dto "github.com/prometheus/prometheus/model/generated" -) - -// diskFrontier describes an on-disk store of series to provide a -// representation of the known keyspace and time series values available. -// -// This is used to reduce the burden associated with LevelDB iterator -// management. -type diskFrontier struct { - firstFingerprint *clientmodel.Fingerprint - firstSupertime time.Time - lastFingerprint *clientmodel.Fingerprint - lastSupertime time.Time -} - -func (f diskFrontier) String() string { - return fmt.Sprintf("diskFrontier from %s at %s to %s at %s", f.firstFingerprint, f.firstSupertime, f.lastFingerprint, f.lastSupertime) -} - -func (f *diskFrontier) ContainsFingerprint(fingerprint *clientmodel.Fingerprint) bool { - return !(fingerprint.Less(f.firstFingerprint) || f.lastFingerprint.Less(fingerprint)) -} - -func newDiskFrontier(i leveldb.Iterator) (d *diskFrontier, present bool, err error) { - if !i.SeekToLast() || i.Key() == nil { - return nil, false, nil - } - lastKey, err := extractSampleKey(i) - if err != nil { - return nil, false, err - } - - if !i.SeekToFirst() || i.Key() == nil { - return nil, false, nil - } - firstKey, err := extractSampleKey(i) - if err != nil { - return nil, false, err - } - - return &diskFrontier{ - firstFingerprint: firstKey.Fingerprint, - firstSupertime: firstKey.FirstTimestamp, - lastFingerprint: lastKey.Fingerprint, - lastSupertime: lastKey.FirstTimestamp, - }, true, nil -} - -// seriesFrontier represents the valid seek frontier for a given series. -type seriesFrontier struct { - firstSupertime time.Time - lastSupertime time.Time - lastTime time.Time -} - -func (f *seriesFrontier) String() string { - return fmt.Sprintf("seriesFrontier from %s to %s at %s", f.firstSupertime, f.lastSupertime, f.lastTime) -} - -// newSeriesFrontier furnishes a populated diskFrontier for a given -// fingerprint. If the series is absent, present will be false. -func newSeriesFrontier(f *clientmodel.Fingerprint, d *diskFrontier, i leveldb.Iterator) (s *seriesFrontier, present bool, err error) { - lowerSeek := firstSupertime - upperSeek := lastSupertime - - // If the diskFrontier for this iterator says that the candidate fingerprint - // is outside of its seeking domain, there is no way that a seriesFrontier - // could be materialized. Simply bail. - if !d.ContainsFingerprint(f) { - return nil, false, nil - } - - // If we are either the first or the last key in the database, we need to use - // pessimistic boundary frontiers. - if f.Equal(d.firstFingerprint) { - lowerSeek = indexable.EncodeTime(d.firstSupertime) - } - if f.Equal(d.lastFingerprint) { - upperSeek = indexable.EncodeTime(d.lastSupertime) - } - - // TODO: Convert this to SampleKey.ToPartialDTO. - fp := new(dto.Fingerprint) - dumpFingerprint(fp, f) - key := &dto.SampleKey{ - Fingerprint: fp, - Timestamp: upperSeek, - } - - raw := coding.NewPBEncoder(key).MustEncode() - i.Seek(raw) - - if i.Key() == nil { - return nil, false, fmt.Errorf("illegal condition: empty key") - } - - retrievedKey, err := extractSampleKey(i) - if err != nil { - panic(err) - } - - retrievedFingerprint := retrievedKey.Fingerprint - - // The returned fingerprint may not match if the original seek key lives - // outside of a metric's frontier. This is probable, for we are seeking to - // to the maximum allowed time, which could advance us to the next - // fingerprint. - // - // - if !retrievedFingerprint.Equal(f) { - i.Previous() - - retrievedKey, err = extractSampleKey(i) - if err != nil { - panic(err) - } - retrievedFingerprint := retrievedKey.Fingerprint - // If the previous key does not match, we know that the requested - // fingerprint does not live in the database. - if !retrievedFingerprint.Equal(f) { - return nil, false, nil - } - } - - s = &seriesFrontier{ - lastSupertime: retrievedKey.FirstTimestamp, - lastTime: retrievedKey.LastTimestamp, - } - - key.Timestamp = lowerSeek - - raw = coding.NewPBEncoder(key).MustEncode() - - i.Seek(raw) - - retrievedKey, err = extractSampleKey(i) - if err != nil { - panic(err) - } - - retrievedFingerprint = retrievedKey.Fingerprint - - s.firstSupertime = retrievedKey.FirstTimestamp - - return s, true, nil -} - -// Contains indicates whether a given time value is within the recorded -// interval. -func (s *seriesFrontier) Contains(t time.Time) bool { - return !(t.Before(s.firstSupertime) || t.After(s.lastTime)) -} - -// InSafeSeekRange indicates whether the time is within the recorded time range -// and is safely seekable such that a seek does not result in an iterator point -// after the last value of the series or outside of the entire store. -func (s *seriesFrontier) InSafeSeekRange(t time.Time) (safe bool) { - if !s.Contains(t) { - return - } - - if s.lastSupertime.Before(t) { - return - } - - return true -} - -func (s *seriesFrontier) After(t time.Time) bool { - return s.firstSupertime.After(t) -} diff --git a/storage/metric/processor.go b/storage/metric/processor.go index 001086d9f..831032555 100644 --- a/storage/metric/processor.go +++ b/storage/metric/processor.go @@ -107,12 +107,13 @@ func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPers if err != nil { return } + unactedSamples, err = extractSampleValues(sampleIterator) if err != nil { return } - for lastCurated.Before(stopAt) && lastTouchedTime.Before(stopAt) { + for lastCurated.Before(stopAt) && lastTouchedTime.Before(stopAt) && sampleKey.Fingerprint.Equal(fingerprint) { switch { // Furnish a new pending batch operation if none is available. case pendingBatch == nil: @@ -289,6 +290,7 @@ func (p *DeletionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersis if err != nil { return } + sampleValues, err := extractSampleValues(sampleIterator) if err != nil { return @@ -296,7 +298,7 @@ func (p *DeletionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersis pendingMutations := 0 - for lastCurated.Before(stopAt) { + for lastCurated.Before(stopAt) && sampleKey.Fingerprint.Equal(fingerprint) { switch { // Furnish a new pending batch operation if none is available. case pendingBatch == nil: diff --git a/storage/metric/samplekey.go b/storage/metric/samplekey.go index afcfad493..e76133fba 100644 --- a/storage/metric/samplekey.go +++ b/storage/metric/samplekey.go @@ -35,6 +35,24 @@ type SampleKey struct { SampleCount uint32 } +func (s *SampleKey) Equal(o *SampleKey) bool { + if s == o { + return true + } + + if !s.Fingerprint.Equal(o.Fingerprint) { + return false + } + if !s.FirstTimestamp.Equal(o.FirstTimestamp) { + return false + } + if !s.LastTimestamp.Equal(o.LastTimestamp) { + return false + } + + return s.SampleCount == o.SampleCount +} + // MayContain indicates whether the given SampleKey could potentially contain a // value at the provided time. Even if true is emitted, that does not mean a // satisfactory value, in fact, exists. @@ -49,6 +67,21 @@ func (s *SampleKey) MayContain(t time.Time) bool { } } +func (s *SampleKey) Before(fp *clientmodel.Fingerprint, t time.Time) bool { + if s.Fingerprint.Less(fp) { + return true + } + if !s.Fingerprint.Equal(fp) { + return false + } + + if s.FirstTimestamp.Before(t) { + return true + } + + return s.LastTimestamp.Before(t) +} + // ToDTO converts this SampleKey into a DTO for use in serialization purposes. func (s *SampleKey) Dump(d *dto.SampleKey) { d.Reset() @@ -61,19 +94,6 @@ func (s *SampleKey) Dump(d *dto.SampleKey) { d.SampleCount = proto.Uint32(s.SampleCount) } -// ToPartialDTO converts this SampleKey into a DTO that is only suitable for -// database exploration purposes for a given (Fingerprint, First Sample Time) -// tuple. -func (s *SampleKey) FOOdumpPartial(d *dto.SampleKey) { - d.Reset() - - f := &dto.Fingerprint{} - dumpFingerprint(f, s.Fingerprint) - - d.Fingerprint = f - d.Timestamp = indexable.EncodeTime(s.FirstTimestamp) -} - func (s *SampleKey) String() string { return fmt.Sprintf("SampleKey for %s at %s to %s with %d values.", s.Fingerprint, s.FirstTimestamp, s.LastTimestamp, s.SampleCount) } diff --git a/storage/metric/tiered.go b/storage/metric/tiered.go index 73f67d86e..23de17022 100644 --- a/storage/metric/tiered.go +++ b/storage/metric/tiered.go @@ -24,7 +24,6 @@ import ( clientmodel "github.com/prometheus/client_golang/model" "github.com/prometheus/prometheus/coding" - "github.com/prometheus/prometheus/coding/indexable" "github.com/prometheus/prometheus/stats" "github.com/prometheus/prometheus/storage/raw/leveldb" "github.com/prometheus/prometheus/utility" @@ -90,7 +89,6 @@ type TieredStorage struct { state tieredStorageState memorySemaphore chan bool - diskSemaphore chan bool wmCache *watermarkCache @@ -107,7 +105,6 @@ type viewJob struct { } const ( - tieredDiskSemaphores = 1 tieredMemorySemaphores = 5 ) @@ -136,15 +133,11 @@ func NewTieredStorage(appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryIn memoryTTL: memoryTTL, viewQueue: make(chan viewJob, viewQueueDepth), - diskSemaphore: make(chan bool, tieredDiskSemaphores), memorySemaphore: make(chan bool, tieredMemorySemaphores), wmCache: wmCache, } - for i := 0; i < tieredDiskSemaphores; i++ { - s.diskSemaphore <- true - } for i := 0; i < tieredMemorySemaphores; i++ { s.memorySemaphore <- true } @@ -370,9 +363,9 @@ func (t *TieredStorage) renderView(viewJob viewJob) { scanJobsTimer.Stop() view := newView() - var iterator leveldb.Iterator = nil - var diskFrontier *diskFrontier = nil - var diskPresent = true + var iterator leveldb.Iterator + diskPresent := true + var firstBlock, lastBlock *SampleKey extractionTimer := viewJob.stats.GetTimer(stats.ViewDataExtractionTime).Start() for _, scanJob := range scans { @@ -385,9 +378,6 @@ func (t *TieredStorage) renderView(viewJob viewJob) { continue } - var seriesFrontier *seriesFrontier = nil - var seriesPresent = true - standingOps := scanJob.operations memValues := t.memoryArena.CloneSamples(scanJob.fingerprint) @@ -402,50 +392,40 @@ func (t *TieredStorage) renderView(viewJob viewJob) { currentChunk := chunk{} // If we aimed before the oldest value in memory, load more data from disk. - if (len(memValues) == 0 || memValues.FirstTimeAfter(targetTime)) && diskPresent && seriesPresent { - diskPrepareTimer := viewJob.stats.GetTimer(stats.ViewDiskPreparationTime).Start() - // Conditionalize disk access. - if diskFrontier == nil && diskPresent { - if iterator == nil { - <-t.diskSemaphore - defer func() { - t.diskSemaphore <- true - }() - - // Get a single iterator that will be used for all data extraction - // below. - iterator = t.DiskStorage.MetricSamples.NewIterator(true) - defer iterator.Close() - } - - diskFrontier, diskPresent, err = newDiskFrontier(iterator) - if err != nil { - panic(err) - } - if !diskPresent { - seriesPresent = false + if (len(memValues) == 0 || memValues.FirstTimeAfter(targetTime)) && diskPresent { + if iterator == nil { + // Get a single iterator that will be used for all data extraction + // below. + iterator = t.DiskStorage.MetricSamples.NewIterator(true) + defer iterator.Close() + if diskPresent = iterator.SeekToLast(); diskPresent { + lastBlock, _ = extractSampleKey(iterator) + if !iterator.SeekToFirst() { + diskPresent = false + } else { + firstBlock, _ = extractSampleKey(iterator) + } } } - if seriesFrontier == nil && diskPresent { - seriesFrontier, seriesPresent, err = newSeriesFrontier(scanJob.fingerprint, diskFrontier, iterator) - if err != nil { - panic(err) - } - } - diskPrepareTimer.Stop() - - if diskPresent && seriesPresent { + if diskPresent { diskTimer := viewJob.stats.GetTimer(stats.ViewDiskExtractionTime).Start() - diskValues := t.loadChunkAroundTime(iterator, seriesFrontier, scanJob.fingerprint, targetTime) + diskValues, expired := t.loadChunkAroundTime(iterator, scanJob.fingerprint, targetTime, firstBlock, lastBlock) + if expired { + diskPresent = false + } diskTimer.Stop() // If we aimed past the newest value on disk, combine it with the next value from memory. - if len(memValues) > 0 && diskValues.LastTimeBefore(targetTime) { - latestDiskValue := diskValues[len(diskValues)-1:] - currentChunk = append(chunk(latestDiskValue), chunk(memValues)...) + if len(diskValues) == 0 { + currentChunk = chunk(memValues) } else { - currentChunk = chunk(diskValues) + if len(memValues) > 0 && diskValues.LastTimeBefore(targetTime) { + latestDiskValue := diskValues[len(diskValues)-1:] + currentChunk = append(chunk(latestDiskValue), chunk(memValues)...) + } else { + currentChunk = chunk(diskValues) + } } } else { currentChunk = chunk(memValues) @@ -513,81 +493,93 @@ func (t *TieredStorage) renderView(viewJob viewJob) { return } -func (t *TieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, frontier *seriesFrontier, fingerprint *clientmodel.Fingerprint, ts time.Time) (chunk Values) { - - fd := &dto.Fingerprint{} - dumpFingerprint(fd, fingerprint) - targetKey := &dto.SampleKey{ - Fingerprint: fd, +func (t *TieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, fingerprint *clientmodel.Fingerprint, ts time.Time, firstBlock, lastBlock *SampleKey) (chunk Values, expired bool) { + if fingerprint.Less(firstBlock.Fingerprint) { + return nil, false } + if lastBlock.Fingerprint.Less(fingerprint) { + return nil, true + } + + seekingKey := &SampleKey{ + Fingerprint: fingerprint, + } + + if fingerprint.Equal(firstBlock.Fingerprint) && ts.Before(firstBlock.FirstTimestamp) { + seekingKey.FirstTimestamp = firstBlock.FirstTimestamp + } else if fingerprint.Equal(lastBlock.Fingerprint) && ts.After(lastBlock.FirstTimestamp) { + seekingKey.FirstTimestamp = lastBlock.FirstTimestamp + } else { + seekingKey.FirstTimestamp = ts + } + + fd := new(dto.SampleKey) + seekingKey.Dump(fd) + + // Try seeking to target key. + rawKey := coding.NewPBEncoder(fd).MustEncode() + if !iterator.Seek(rawKey) { + return chunk, true + } + var foundKey *SampleKey var foundValues Values - // Limit the target key to be within the series' keyspace. - if ts.After(frontier.lastSupertime) { - targetKey.Timestamp = indexable.EncodeTime(frontier.lastSupertime) - } else { - targetKey.Timestamp = indexable.EncodeTime(ts) - } + foundKey, _ = extractSampleKey(iterator) - // Try seeking to target key. - rawKey := coding.NewPBEncoder(targetKey).MustEncode() - iterator.Seek(rawKey) + if foundKey.Fingerprint.Equal(fingerprint) { + // Figure out if we need to rewind by one block. + // Imagine the following supertime blocks with time ranges: + // + // Block 1: ft 1000 - lt 1009 + // Block 1: ft 1010 - lt 1019 + // + // If we are aiming to find time 1005, we would first seek to the block with + // supertime 1010, then need to rewind by one block by virtue of LevelDB + // iterator seek behavior. + // + // Only do the rewind if there is another chunk before this one. + if !foundKey.MayContain(ts) { + postValues, _ := extractSampleValues(iterator) + if !foundKey.Equal(firstBlock) { + if !iterator.Previous() { + panic("This should never return false.") + } - foundKey, err := extractSampleKey(iterator) - if err != nil { - panic(err) - } + foundKey, _ = extractSampleKey(iterator) - // Figure out if we need to rewind by one block. - // Imagine the following supertime blocks with time ranges: - // - // Block 1: ft 1000 - lt 1009 - // Block 1: ft 1010 - lt 1019 - // - // If we are aiming to find time 1005, we would first seek to the block with - // supertime 1010, then need to rewind by one block by virtue of LevelDB - // iterator seek behavior. - // - // Only do the rewind if there is another chunk before this one. - rewound := false - firstTime := foundKey.FirstTimestamp - if ts.Before(firstTime) && !frontier.firstSupertime.After(ts) { - iterator.Previous() - rewound = true - } + if !foundKey.Fingerprint.Equal(fingerprint) { + return postValues, false + } - foundValues, err = extractSampleValues(iterator) - if err != nil { - return - } - - // If we rewound, but the target time is still past the current block, return - // the last value of the current (rewound) block and the entire next block. - if rewound { - foundKey, err = extractSampleKey(iterator) - if err != nil { - return - } - currentChunkLastTime := foundKey.LastTimestamp - - if ts.After(currentChunkLastTime) { - sampleCount := len(foundValues) - chunk = append(chunk, foundValues[sampleCount-1]) - // We know there's a next block since we have rewound from it. - iterator.Next() - - foundValues, err = extractSampleValues(iterator) - if err != nil { - return + foundValues, _ = extractSampleValues(iterator) + foundValues = append(foundValues, postValues...) + return foundValues, false } } + + foundValues, _ = extractSampleValues(iterator) + return foundValues, false } - // Now append all the samples of the currently seeked block to the output. - chunk = append(chunk, foundValues...) + if fingerprint.Less(foundKey.Fingerprint) { + if !foundKey.Equal(firstBlock) { + if !iterator.Previous() { + panic("This should never return false.") + } - return + foundKey, _ = extractSampleKey(iterator) + + if !foundKey.Fingerprint.Equal(fingerprint) { + return nil, false + } + + foundValues, _ = extractSampleValues(iterator) + return foundValues, false + } + } + + panic("illegal state: violated sort invariant") } // Get all label values that are associated with the provided label name.