Snapshot of no more frontier.
Change-Id: Icd52da3f52bfe4529829ea70b4865ed7c9f6c446
This commit is contained in:
parent
e26d3437fd
commit
2b42fd0068
2
Makefile
2
Makefile
|
@ -89,7 +89,7 @@ $(FULL_GOPATH):
|
|||
-[ -d "$(FULL_GOPATH)" ] || { mkdir -vp $(FULL_GOPATH_BASE) ; ln -s "$(PWD)" "$(FULL_GOPATH)" ; }
|
||||
[ -d "$(FULL_GOPATH)" ]
|
||||
|
||||
test: build
|
||||
test: config dependencies model preparation tools web
|
||||
$(GO) test $(GO_TEST_FLAGS) ./...
|
||||
|
||||
tools: dependencies preparation
|
||||
|
|
|
@ -20,6 +20,7 @@ import (
|
|||
"time"
|
||||
|
||||
"code.google.com/p/goprotobuf/proto"
|
||||
"github.com/golang/glog"
|
||||
|
||||
clientmodel "github.com/prometheus/client_golang/model"
|
||||
|
||||
|
@ -70,9 +71,6 @@ type Curator struct {
|
|||
type watermarkScanner struct {
|
||||
// curationState is the data store for curation remarks.
|
||||
curationState CurationRemarker
|
||||
// diskFrontier models the available seekable ranges for the provided
|
||||
// sampleIterator.
|
||||
diskFrontier *diskFrontier
|
||||
// ignoreYoungerThan is passed into the curation remark for the given series.
|
||||
ignoreYoungerThan time.Duration
|
||||
// processor is responsible for executing a given stategy on the
|
||||
|
@ -89,6 +87,8 @@ type watermarkScanner struct {
|
|||
stop chan bool
|
||||
// status is the outbound channel for notifying the status page of its state.
|
||||
status CurationStateUpdater
|
||||
|
||||
firstBlock, lastBlock *SampleKey
|
||||
}
|
||||
|
||||
// run facilitates the curation lifecycle.
|
||||
|
@ -119,14 +119,19 @@ func (c *Curator) Run(ignoreYoungerThan time.Duration, instant time.Time, proces
|
|||
iterator := samples.NewIterator(true)
|
||||
defer iterator.Close()
|
||||
|
||||
diskFrontier, present, err := newDiskFrontier(iterator)
|
||||
if err != nil {
|
||||
if !iterator.SeekToLast() {
|
||||
glog.Info("Empty database; skipping curation.")
|
||||
|
||||
return
|
||||
}
|
||||
if !present {
|
||||
// No sample database exists; no work to do!
|
||||
lastBlock, _ := extractSampleKey(iterator)
|
||||
|
||||
if !iterator.SeekToFirst() {
|
||||
glog.Info("Empty database; skipping curation.")
|
||||
|
||||
return
|
||||
}
|
||||
firstBlock, _ := extractSampleKey(iterator)
|
||||
|
||||
scanner := &watermarkScanner{
|
||||
curationState: curationState,
|
||||
|
@ -136,9 +141,11 @@ func (c *Curator) Run(ignoreYoungerThan time.Duration, instant time.Time, proces
|
|||
stop: c.Stop,
|
||||
stopAt: instant.Add(-1 * ignoreYoungerThan),
|
||||
|
||||
diskFrontier: diskFrontier,
|
||||
sampleIterator: iterator,
|
||||
samples: samples,
|
||||
|
||||
firstBlock: firstBlock,
|
||||
lastBlock: lastBlock,
|
||||
}
|
||||
|
||||
// Right now, the ability to stop a curation is limited to the beginning of
|
||||
|
@ -271,12 +278,11 @@ func (w *watermarkScanner) curationConsistent(f *clientmodel.Fingerprint, waterm
|
|||
func (w *watermarkScanner) Operate(key, _ interface{}) (oErr *storage.OperatorError) {
|
||||
fingerprint := key.(*clientmodel.Fingerprint)
|
||||
|
||||
seriesFrontier, present, err := newSeriesFrontier(fingerprint, w.diskFrontier, w.sampleIterator)
|
||||
if err != nil || !present {
|
||||
// An anomaly with the series frontier is severe in the sense that some sort
|
||||
// of an illegal state condition exists in the storage layer, which would
|
||||
// probably signify an illegal disk frontier.
|
||||
return &storage.OperatorError{error: err, Continuable: false}
|
||||
if fingerprint.Less(w.firstBlock.Fingerprint) {
|
||||
return nil
|
||||
}
|
||||
if w.lastBlock.Fingerprint.Less(fingerprint) {
|
||||
return nil
|
||||
}
|
||||
|
||||
curationState, present, err := w.curationState.Get(&curationKey{
|
||||
|
@ -285,7 +291,6 @@ func (w *watermarkScanner) Operate(key, _ interface{}) (oErr *storage.OperatorEr
|
|||
ProcessorMessageTypeName: w.processor.Name(),
|
||||
IgnoreYoungerThan: w.ignoreYoungerThan,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
// An anomaly with the curation remark is likely not fatal in the sense that
|
||||
// there was a decoding error with the entity and shouldn't be cause to stop
|
||||
|
@ -293,23 +298,21 @@ func (w *watermarkScanner) Operate(key, _ interface{}) (oErr *storage.OperatorEr
|
|||
// work forward. With an idempotent processor, this is safe.
|
||||
return &storage.OperatorError{error: err, Continuable: true}
|
||||
}
|
||||
var firstSeek time.Time
|
||||
switch {
|
||||
case !present, seriesFrontier.After(curationState):
|
||||
firstSeek = seriesFrontier.firstSupertime
|
||||
case !seriesFrontier.InSafeSeekRange(curationState):
|
||||
firstSeek = seriesFrontier.lastSupertime
|
||||
default:
|
||||
firstSeek = curationState
|
||||
|
||||
keySet := &SampleKey{
|
||||
Fingerprint: fingerprint,
|
||||
}
|
||||
|
||||
startKey := &SampleKey{
|
||||
Fingerprint: fingerprint,
|
||||
FirstTimestamp: firstSeek,
|
||||
if !present && fingerprint.Equal(w.firstBlock.Fingerprint) {
|
||||
// If the fingerprint is the same, then we simply need to use the earliest
|
||||
// block found in the database.
|
||||
*keySet = *w.firstBlock
|
||||
} else if present {
|
||||
keySet.FirstTimestamp = curationState
|
||||
}
|
||||
|
||||
dto := new(dto.SampleKey)
|
||||
|
||||
startKey.Dump(dto)
|
||||
keySet.Dump(dto)
|
||||
prospectiveKey := coding.NewPBEncoder(dto).MustEncode()
|
||||
if !w.sampleIterator.Seek(prospectiveKey) {
|
||||
// LevelDB is picky about the seek ranges. If an iterator was invalidated,
|
||||
|
@ -317,25 +320,41 @@ func (w *watermarkScanner) Operate(key, _ interface{}) (oErr *storage.OperatorEr
|
|||
return &storage.OperatorError{error: fmt.Errorf("Illegal Condition: Iterator invalidated due to seek range."), Continuable: false}
|
||||
}
|
||||
|
||||
newestAllowedSample := w.stopAt
|
||||
if !newestAllowedSample.Before(seriesFrontier.lastSupertime) {
|
||||
newestAllowedSample = seriesFrontier.lastSupertime
|
||||
for {
|
||||
sampleKey, err := extractSampleKey(w.sampleIterator)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if !sampleKey.Fingerprint.Equal(fingerprint) {
|
||||
return
|
||||
}
|
||||
|
||||
if !present {
|
||||
break
|
||||
}
|
||||
|
||||
if !(sampleKey.FirstTimestamp.Before(curationState) && sampleKey.LastTimestamp.Before(curationState)) {
|
||||
break
|
||||
}
|
||||
|
||||
if !w.sampleIterator.Next() {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
lastTime, err := w.processor.Apply(w.sampleIterator, w.samples, newestAllowedSample, fingerprint)
|
||||
lastTime, err := w.processor.Apply(w.sampleIterator, w.samples, w.stopAt, fingerprint)
|
||||
if err != nil {
|
||||
// We can't divine the severity of a processor error without refactoring the
|
||||
// interface.
|
||||
return &storage.OperatorError{error: err, Continuable: false}
|
||||
}
|
||||
|
||||
err = w.curationState.Update(&curationKey{
|
||||
if err = w.curationState.Update(&curationKey{
|
||||
Fingerprint: fingerprint,
|
||||
ProcessorMessageRaw: w.processor.Signature(),
|
||||
ProcessorMessageTypeName: w.processor.Name(),
|
||||
IgnoreYoungerThan: w.ignoreYoungerThan,
|
||||
}, lastTime)
|
||||
if err != nil {
|
||||
}, lastTime); err != nil {
|
||||
// Under the assumption that the processors are idempotent, they can be
|
||||
// re-run; thusly, the commitment of the curation remark is no cause
|
||||
// to cease further progress.
|
||||
|
|
|
@ -1,196 +0,0 @@
|
|||
// Copyright 2013 Prometheus Team
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package metric
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
clientmodel "github.com/prometheus/client_golang/model"
|
||||
|
||||
"github.com/prometheus/prometheus/coding"
|
||||
"github.com/prometheus/prometheus/coding/indexable"
|
||||
"github.com/prometheus/prometheus/storage/raw/leveldb"
|
||||
|
||||
dto "github.com/prometheus/prometheus/model/generated"
|
||||
)
|
||||
|
||||
// diskFrontier describes an on-disk store of series to provide a
|
||||
// representation of the known keyspace and time series values available.
|
||||
//
|
||||
// This is used to reduce the burden associated with LevelDB iterator
|
||||
// management.
|
||||
type diskFrontier struct {
|
||||
firstFingerprint *clientmodel.Fingerprint
|
||||
firstSupertime time.Time
|
||||
lastFingerprint *clientmodel.Fingerprint
|
||||
lastSupertime time.Time
|
||||
}
|
||||
|
||||
func (f diskFrontier) String() string {
|
||||
return fmt.Sprintf("diskFrontier from %s at %s to %s at %s", f.firstFingerprint, f.firstSupertime, f.lastFingerprint, f.lastSupertime)
|
||||
}
|
||||
|
||||
func (f *diskFrontier) ContainsFingerprint(fingerprint *clientmodel.Fingerprint) bool {
|
||||
return !(fingerprint.Less(f.firstFingerprint) || f.lastFingerprint.Less(fingerprint))
|
||||
}
|
||||
|
||||
func newDiskFrontier(i leveldb.Iterator) (d *diskFrontier, present bool, err error) {
|
||||
if !i.SeekToLast() || i.Key() == nil {
|
||||
return nil, false, nil
|
||||
}
|
||||
lastKey, err := extractSampleKey(i)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
if !i.SeekToFirst() || i.Key() == nil {
|
||||
return nil, false, nil
|
||||
}
|
||||
firstKey, err := extractSampleKey(i)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
return &diskFrontier{
|
||||
firstFingerprint: firstKey.Fingerprint,
|
||||
firstSupertime: firstKey.FirstTimestamp,
|
||||
lastFingerprint: lastKey.Fingerprint,
|
||||
lastSupertime: lastKey.FirstTimestamp,
|
||||
}, true, nil
|
||||
}
|
||||
|
||||
// seriesFrontier represents the valid seek frontier for a given series.
|
||||
type seriesFrontier struct {
|
||||
firstSupertime time.Time
|
||||
lastSupertime time.Time
|
||||
lastTime time.Time
|
||||
}
|
||||
|
||||
func (f *seriesFrontier) String() string {
|
||||
return fmt.Sprintf("seriesFrontier from %s to %s at %s", f.firstSupertime, f.lastSupertime, f.lastTime)
|
||||
}
|
||||
|
||||
// newSeriesFrontier furnishes a populated diskFrontier for a given
|
||||
// fingerprint. If the series is absent, present will be false.
|
||||
func newSeriesFrontier(f *clientmodel.Fingerprint, d *diskFrontier, i leveldb.Iterator) (s *seriesFrontier, present bool, err error) {
|
||||
lowerSeek := firstSupertime
|
||||
upperSeek := lastSupertime
|
||||
|
||||
// If the diskFrontier for this iterator says that the candidate fingerprint
|
||||
// is outside of its seeking domain, there is no way that a seriesFrontier
|
||||
// could be materialized. Simply bail.
|
||||
if !d.ContainsFingerprint(f) {
|
||||
return nil, false, nil
|
||||
}
|
||||
|
||||
// If we are either the first or the last key in the database, we need to use
|
||||
// pessimistic boundary frontiers.
|
||||
if f.Equal(d.firstFingerprint) {
|
||||
lowerSeek = indexable.EncodeTime(d.firstSupertime)
|
||||
}
|
||||
if f.Equal(d.lastFingerprint) {
|
||||
upperSeek = indexable.EncodeTime(d.lastSupertime)
|
||||
}
|
||||
|
||||
// TODO: Convert this to SampleKey.ToPartialDTO.
|
||||
fp := new(dto.Fingerprint)
|
||||
dumpFingerprint(fp, f)
|
||||
key := &dto.SampleKey{
|
||||
Fingerprint: fp,
|
||||
Timestamp: upperSeek,
|
||||
}
|
||||
|
||||
raw := coding.NewPBEncoder(key).MustEncode()
|
||||
i.Seek(raw)
|
||||
|
||||
if i.Key() == nil {
|
||||
return nil, false, fmt.Errorf("illegal condition: empty key")
|
||||
}
|
||||
|
||||
retrievedKey, err := extractSampleKey(i)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
retrievedFingerprint := retrievedKey.Fingerprint
|
||||
|
||||
// The returned fingerprint may not match if the original seek key lives
|
||||
// outside of a metric's frontier. This is probable, for we are seeking to
|
||||
// to the maximum allowed time, which could advance us to the next
|
||||
// fingerprint.
|
||||
//
|
||||
//
|
||||
if !retrievedFingerprint.Equal(f) {
|
||||
i.Previous()
|
||||
|
||||
retrievedKey, err = extractSampleKey(i)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
retrievedFingerprint := retrievedKey.Fingerprint
|
||||
// If the previous key does not match, we know that the requested
|
||||
// fingerprint does not live in the database.
|
||||
if !retrievedFingerprint.Equal(f) {
|
||||
return nil, false, nil
|
||||
}
|
||||
}
|
||||
|
||||
s = &seriesFrontier{
|
||||
lastSupertime: retrievedKey.FirstTimestamp,
|
||||
lastTime: retrievedKey.LastTimestamp,
|
||||
}
|
||||
|
||||
key.Timestamp = lowerSeek
|
||||
|
||||
raw = coding.NewPBEncoder(key).MustEncode()
|
||||
|
||||
i.Seek(raw)
|
||||
|
||||
retrievedKey, err = extractSampleKey(i)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
retrievedFingerprint = retrievedKey.Fingerprint
|
||||
|
||||
s.firstSupertime = retrievedKey.FirstTimestamp
|
||||
|
||||
return s, true, nil
|
||||
}
|
||||
|
||||
// Contains indicates whether a given time value is within the recorded
|
||||
// interval.
|
||||
func (s *seriesFrontier) Contains(t time.Time) bool {
|
||||
return !(t.Before(s.firstSupertime) || t.After(s.lastTime))
|
||||
}
|
||||
|
||||
// InSafeSeekRange indicates whether the time is within the recorded time range
|
||||
// and is safely seekable such that a seek does not result in an iterator point
|
||||
// after the last value of the series or outside of the entire store.
|
||||
func (s *seriesFrontier) InSafeSeekRange(t time.Time) (safe bool) {
|
||||
if !s.Contains(t) {
|
||||
return
|
||||
}
|
||||
|
||||
if s.lastSupertime.Before(t) {
|
||||
return
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (s *seriesFrontier) After(t time.Time) bool {
|
||||
return s.firstSupertime.After(t)
|
||||
}
|
|
@ -107,12 +107,13 @@ func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPers
|
|||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
unactedSamples, err = extractSampleValues(sampleIterator)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
for lastCurated.Before(stopAt) && lastTouchedTime.Before(stopAt) {
|
||||
for lastCurated.Before(stopAt) && lastTouchedTime.Before(stopAt) && sampleKey.Fingerprint.Equal(fingerprint) {
|
||||
switch {
|
||||
// Furnish a new pending batch operation if none is available.
|
||||
case pendingBatch == nil:
|
||||
|
@ -289,6 +290,7 @@ func (p *DeletionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersis
|
|||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
sampleValues, err := extractSampleValues(sampleIterator)
|
||||
if err != nil {
|
||||
return
|
||||
|
@ -296,7 +298,7 @@ func (p *DeletionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersis
|
|||
|
||||
pendingMutations := 0
|
||||
|
||||
for lastCurated.Before(stopAt) {
|
||||
for lastCurated.Before(stopAt) && sampleKey.Fingerprint.Equal(fingerprint) {
|
||||
switch {
|
||||
// Furnish a new pending batch operation if none is available.
|
||||
case pendingBatch == nil:
|
||||
|
|
|
@ -35,6 +35,24 @@ type SampleKey struct {
|
|||
SampleCount uint32
|
||||
}
|
||||
|
||||
func (s *SampleKey) Equal(o *SampleKey) bool {
|
||||
if s == o {
|
||||
return true
|
||||
}
|
||||
|
||||
if !s.Fingerprint.Equal(o.Fingerprint) {
|
||||
return false
|
||||
}
|
||||
if !s.FirstTimestamp.Equal(o.FirstTimestamp) {
|
||||
return false
|
||||
}
|
||||
if !s.LastTimestamp.Equal(o.LastTimestamp) {
|
||||
return false
|
||||
}
|
||||
|
||||
return s.SampleCount == o.SampleCount
|
||||
}
|
||||
|
||||
// MayContain indicates whether the given SampleKey could potentially contain a
|
||||
// value at the provided time. Even if true is emitted, that does not mean a
|
||||
// satisfactory value, in fact, exists.
|
||||
|
@ -49,6 +67,21 @@ func (s *SampleKey) MayContain(t time.Time) bool {
|
|||
}
|
||||
}
|
||||
|
||||
func (s *SampleKey) Before(fp *clientmodel.Fingerprint, t time.Time) bool {
|
||||
if s.Fingerprint.Less(fp) {
|
||||
return true
|
||||
}
|
||||
if !s.Fingerprint.Equal(fp) {
|
||||
return false
|
||||
}
|
||||
|
||||
if s.FirstTimestamp.Before(t) {
|
||||
return true
|
||||
}
|
||||
|
||||
return s.LastTimestamp.Before(t)
|
||||
}
|
||||
|
||||
// ToDTO converts this SampleKey into a DTO for use in serialization purposes.
|
||||
func (s *SampleKey) Dump(d *dto.SampleKey) {
|
||||
d.Reset()
|
||||
|
@ -61,19 +94,6 @@ func (s *SampleKey) Dump(d *dto.SampleKey) {
|
|||
d.SampleCount = proto.Uint32(s.SampleCount)
|
||||
}
|
||||
|
||||
// ToPartialDTO converts this SampleKey into a DTO that is only suitable for
|
||||
// database exploration purposes for a given (Fingerprint, First Sample Time)
|
||||
// tuple.
|
||||
func (s *SampleKey) FOOdumpPartial(d *dto.SampleKey) {
|
||||
d.Reset()
|
||||
|
||||
f := &dto.Fingerprint{}
|
||||
dumpFingerprint(f, s.Fingerprint)
|
||||
|
||||
d.Fingerprint = f
|
||||
d.Timestamp = indexable.EncodeTime(s.FirstTimestamp)
|
||||
}
|
||||
|
||||
func (s *SampleKey) String() string {
|
||||
return fmt.Sprintf("SampleKey for %s at %s to %s with %d values.", s.Fingerprint, s.FirstTimestamp, s.LastTimestamp, s.SampleCount)
|
||||
}
|
||||
|
|
|
@ -24,7 +24,6 @@ import (
|
|||
clientmodel "github.com/prometheus/client_golang/model"
|
||||
|
||||
"github.com/prometheus/prometheus/coding"
|
||||
"github.com/prometheus/prometheus/coding/indexable"
|
||||
"github.com/prometheus/prometheus/stats"
|
||||
"github.com/prometheus/prometheus/storage/raw/leveldb"
|
||||
"github.com/prometheus/prometheus/utility"
|
||||
|
@ -90,7 +89,6 @@ type TieredStorage struct {
|
|||
state tieredStorageState
|
||||
|
||||
memorySemaphore chan bool
|
||||
diskSemaphore chan bool
|
||||
|
||||
wmCache *watermarkCache
|
||||
|
||||
|
@ -107,7 +105,6 @@ type viewJob struct {
|
|||
}
|
||||
|
||||
const (
|
||||
tieredDiskSemaphores = 1
|
||||
tieredMemorySemaphores = 5
|
||||
)
|
||||
|
||||
|
@ -136,15 +133,11 @@ func NewTieredStorage(appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryIn
|
|||
memoryTTL: memoryTTL,
|
||||
viewQueue: make(chan viewJob, viewQueueDepth),
|
||||
|
||||
diskSemaphore: make(chan bool, tieredDiskSemaphores),
|
||||
memorySemaphore: make(chan bool, tieredMemorySemaphores),
|
||||
|
||||
wmCache: wmCache,
|
||||
}
|
||||
|
||||
for i := 0; i < tieredDiskSemaphores; i++ {
|
||||
s.diskSemaphore <- true
|
||||
}
|
||||
for i := 0; i < tieredMemorySemaphores; i++ {
|
||||
s.memorySemaphore <- true
|
||||
}
|
||||
|
@ -370,9 +363,9 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
|
|||
scanJobsTimer.Stop()
|
||||
view := newView()
|
||||
|
||||
var iterator leveldb.Iterator = nil
|
||||
var diskFrontier *diskFrontier = nil
|
||||
var diskPresent = true
|
||||
var iterator leveldb.Iterator
|
||||
diskPresent := true
|
||||
var firstBlock, lastBlock *SampleKey
|
||||
|
||||
extractionTimer := viewJob.stats.GetTimer(stats.ViewDataExtractionTime).Start()
|
||||
for _, scanJob := range scans {
|
||||
|
@ -385,9 +378,6 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
|
|||
continue
|
||||
}
|
||||
|
||||
var seriesFrontier *seriesFrontier = nil
|
||||
var seriesPresent = true
|
||||
|
||||
standingOps := scanJob.operations
|
||||
memValues := t.memoryArena.CloneSamples(scanJob.fingerprint)
|
||||
|
||||
|
@ -402,50 +392,40 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
|
|||
|
||||
currentChunk := chunk{}
|
||||
// If we aimed before the oldest value in memory, load more data from disk.
|
||||
if (len(memValues) == 0 || memValues.FirstTimeAfter(targetTime)) && diskPresent && seriesPresent {
|
||||
diskPrepareTimer := viewJob.stats.GetTimer(stats.ViewDiskPreparationTime).Start()
|
||||
// Conditionalize disk access.
|
||||
if diskFrontier == nil && diskPresent {
|
||||
if iterator == nil {
|
||||
<-t.diskSemaphore
|
||||
defer func() {
|
||||
t.diskSemaphore <- true
|
||||
}()
|
||||
|
||||
// Get a single iterator that will be used for all data extraction
|
||||
// below.
|
||||
iterator = t.DiskStorage.MetricSamples.NewIterator(true)
|
||||
defer iterator.Close()
|
||||
}
|
||||
|
||||
diskFrontier, diskPresent, err = newDiskFrontier(iterator)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
if !diskPresent {
|
||||
seriesPresent = false
|
||||
if (len(memValues) == 0 || memValues.FirstTimeAfter(targetTime)) && diskPresent {
|
||||
if iterator == nil {
|
||||
// Get a single iterator that will be used for all data extraction
|
||||
// below.
|
||||
iterator = t.DiskStorage.MetricSamples.NewIterator(true)
|
||||
defer iterator.Close()
|
||||
if diskPresent = iterator.SeekToLast(); diskPresent {
|
||||
lastBlock, _ = extractSampleKey(iterator)
|
||||
if !iterator.SeekToFirst() {
|
||||
diskPresent = false
|
||||
} else {
|
||||
firstBlock, _ = extractSampleKey(iterator)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if seriesFrontier == nil && diskPresent {
|
||||
seriesFrontier, seriesPresent, err = newSeriesFrontier(scanJob.fingerprint, diskFrontier, iterator)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
diskPrepareTimer.Stop()
|
||||
|
||||
if diskPresent && seriesPresent {
|
||||
if diskPresent {
|
||||
diskTimer := viewJob.stats.GetTimer(stats.ViewDiskExtractionTime).Start()
|
||||
diskValues := t.loadChunkAroundTime(iterator, seriesFrontier, scanJob.fingerprint, targetTime)
|
||||
diskValues, expired := t.loadChunkAroundTime(iterator, scanJob.fingerprint, targetTime, firstBlock, lastBlock)
|
||||
if expired {
|
||||
diskPresent = false
|
||||
}
|
||||
diskTimer.Stop()
|
||||
|
||||
// If we aimed past the newest value on disk, combine it with the next value from memory.
|
||||
if len(memValues) > 0 && diskValues.LastTimeBefore(targetTime) {
|
||||
latestDiskValue := diskValues[len(diskValues)-1:]
|
||||
currentChunk = append(chunk(latestDiskValue), chunk(memValues)...)
|
||||
if len(diskValues) == 0 {
|
||||
currentChunk = chunk(memValues)
|
||||
} else {
|
||||
currentChunk = chunk(diskValues)
|
||||
if len(memValues) > 0 && diskValues.LastTimeBefore(targetTime) {
|
||||
latestDiskValue := diskValues[len(diskValues)-1:]
|
||||
currentChunk = append(chunk(latestDiskValue), chunk(memValues)...)
|
||||
} else {
|
||||
currentChunk = chunk(diskValues)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
currentChunk = chunk(memValues)
|
||||
|
@ -513,81 +493,93 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
|
|||
return
|
||||
}
|
||||
|
||||
func (t *TieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, frontier *seriesFrontier, fingerprint *clientmodel.Fingerprint, ts time.Time) (chunk Values) {
|
||||
|
||||
fd := &dto.Fingerprint{}
|
||||
dumpFingerprint(fd, fingerprint)
|
||||
targetKey := &dto.SampleKey{
|
||||
Fingerprint: fd,
|
||||
func (t *TieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, fingerprint *clientmodel.Fingerprint, ts time.Time, firstBlock, lastBlock *SampleKey) (chunk Values, expired bool) {
|
||||
if fingerprint.Less(firstBlock.Fingerprint) {
|
||||
return nil, false
|
||||
}
|
||||
if lastBlock.Fingerprint.Less(fingerprint) {
|
||||
return nil, true
|
||||
}
|
||||
|
||||
seekingKey := &SampleKey{
|
||||
Fingerprint: fingerprint,
|
||||
}
|
||||
|
||||
if fingerprint.Equal(firstBlock.Fingerprint) && ts.Before(firstBlock.FirstTimestamp) {
|
||||
seekingKey.FirstTimestamp = firstBlock.FirstTimestamp
|
||||
} else if fingerprint.Equal(lastBlock.Fingerprint) && ts.After(lastBlock.FirstTimestamp) {
|
||||
seekingKey.FirstTimestamp = lastBlock.FirstTimestamp
|
||||
} else {
|
||||
seekingKey.FirstTimestamp = ts
|
||||
}
|
||||
|
||||
fd := new(dto.SampleKey)
|
||||
seekingKey.Dump(fd)
|
||||
|
||||
// Try seeking to target key.
|
||||
rawKey := coding.NewPBEncoder(fd).MustEncode()
|
||||
if !iterator.Seek(rawKey) {
|
||||
return chunk, true
|
||||
}
|
||||
|
||||
var foundKey *SampleKey
|
||||
var foundValues Values
|
||||
|
||||
// Limit the target key to be within the series' keyspace.
|
||||
if ts.After(frontier.lastSupertime) {
|
||||
targetKey.Timestamp = indexable.EncodeTime(frontier.lastSupertime)
|
||||
} else {
|
||||
targetKey.Timestamp = indexable.EncodeTime(ts)
|
||||
}
|
||||
foundKey, _ = extractSampleKey(iterator)
|
||||
|
||||
// Try seeking to target key.
|
||||
rawKey := coding.NewPBEncoder(targetKey).MustEncode()
|
||||
iterator.Seek(rawKey)
|
||||
if foundKey.Fingerprint.Equal(fingerprint) {
|
||||
// Figure out if we need to rewind by one block.
|
||||
// Imagine the following supertime blocks with time ranges:
|
||||
//
|
||||
// Block 1: ft 1000 - lt 1009 <data>
|
||||
// Block 1: ft 1010 - lt 1019 <data>
|
||||
//
|
||||
// If we are aiming to find time 1005, we would first seek to the block with
|
||||
// supertime 1010, then need to rewind by one block by virtue of LevelDB
|
||||
// iterator seek behavior.
|
||||
//
|
||||
// Only do the rewind if there is another chunk before this one.
|
||||
if !foundKey.MayContain(ts) {
|
||||
postValues, _ := extractSampleValues(iterator)
|
||||
if !foundKey.Equal(firstBlock) {
|
||||
if !iterator.Previous() {
|
||||
panic("This should never return false.")
|
||||
}
|
||||
|
||||
foundKey, err := extractSampleKey(iterator)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
foundKey, _ = extractSampleKey(iterator)
|
||||
|
||||
// Figure out if we need to rewind by one block.
|
||||
// Imagine the following supertime blocks with time ranges:
|
||||
//
|
||||
// Block 1: ft 1000 - lt 1009 <data>
|
||||
// Block 1: ft 1010 - lt 1019 <data>
|
||||
//
|
||||
// If we are aiming to find time 1005, we would first seek to the block with
|
||||
// supertime 1010, then need to rewind by one block by virtue of LevelDB
|
||||
// iterator seek behavior.
|
||||
//
|
||||
// Only do the rewind if there is another chunk before this one.
|
||||
rewound := false
|
||||
firstTime := foundKey.FirstTimestamp
|
||||
if ts.Before(firstTime) && !frontier.firstSupertime.After(ts) {
|
||||
iterator.Previous()
|
||||
rewound = true
|
||||
}
|
||||
if !foundKey.Fingerprint.Equal(fingerprint) {
|
||||
return postValues, false
|
||||
}
|
||||
|
||||
foundValues, err = extractSampleValues(iterator)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// If we rewound, but the target time is still past the current block, return
|
||||
// the last value of the current (rewound) block and the entire next block.
|
||||
if rewound {
|
||||
foundKey, err = extractSampleKey(iterator)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
currentChunkLastTime := foundKey.LastTimestamp
|
||||
|
||||
if ts.After(currentChunkLastTime) {
|
||||
sampleCount := len(foundValues)
|
||||
chunk = append(chunk, foundValues[sampleCount-1])
|
||||
// We know there's a next block since we have rewound from it.
|
||||
iterator.Next()
|
||||
|
||||
foundValues, err = extractSampleValues(iterator)
|
||||
if err != nil {
|
||||
return
|
||||
foundValues, _ = extractSampleValues(iterator)
|
||||
foundValues = append(foundValues, postValues...)
|
||||
return foundValues, false
|
||||
}
|
||||
}
|
||||
|
||||
foundValues, _ = extractSampleValues(iterator)
|
||||
return foundValues, false
|
||||
}
|
||||
|
||||
// Now append all the samples of the currently seeked block to the output.
|
||||
chunk = append(chunk, foundValues...)
|
||||
if fingerprint.Less(foundKey.Fingerprint) {
|
||||
if !foundKey.Equal(firstBlock) {
|
||||
if !iterator.Previous() {
|
||||
panic("This should never return false.")
|
||||
}
|
||||
|
||||
return
|
||||
foundKey, _ = extractSampleKey(iterator)
|
||||
|
||||
if !foundKey.Fingerprint.Equal(fingerprint) {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
foundValues, _ = extractSampleValues(iterator)
|
||||
return foundValues, false
|
||||
}
|
||||
}
|
||||
|
||||
panic("illegal state: violated sort invariant")
|
||||
}
|
||||
|
||||
// Get all label values that are associated with the provided label name.
|
||||
|
|
Loading…
Reference in New Issue