From 4d79dc3602afa6aaa54d540abf3bdded82735b80 Mon Sep 17 00:00:00 2001 From: Julius Volz Date: Sat, 16 Mar 2013 01:30:31 -0700 Subject: [PATCH] Replace renderView() by cleaner and more correct reimplementation. --- storage/metric/tiered.go | 288 +++++++++++++++++----------------- storage/metric/tiered_test.go | 134 ++++++++++++---- 2 files changed, 250 insertions(+), 172 deletions(-) diff --git a/storage/metric/tiered.go b/storage/metric/tiered.go index df60e840c..3c4a97156 100644 --- a/storage/metric/tiered.go +++ b/storage/metric/tiered.go @@ -15,6 +15,7 @@ package metric import ( "fmt" + "github.com/jmhodges/levigo" "github.com/prometheus/prometheus/coding" "github.com/prometheus/prometheus/coding/indexable" "github.com/prometheus/prometheus/model" @@ -325,12 +326,14 @@ func (t *tieredStorage) flushMemory() { return } -func (t *tieredStorage) renderView(viewJob viewJob) (err error) { +func (t *tieredStorage) renderView(viewJob viewJob) { + // Telemetry. + var err error begin := time.Now() defer func() { duration := time.Since(begin) - recordOutcome(duration, err, map[string]string{operation: appendSample, result: success}, map[string]string{operation: renderView, result: failure}) + recordOutcome(duration, err, map[string]string{operation: renderView, result: success}, map[string]string{operation: renderView, result: failure}) }() t.mutex.Lock() @@ -338,9 +341,7 @@ func (t *tieredStorage) renderView(viewJob viewJob) (err error) { var ( scans = viewJob.builder.ScanJobs() - // standingOperations = ops{} - // lastTime = time.Time{} - view = newView() + view = newView() ) // Rebuilding of the frontier should happen on a conditional basis if a @@ -349,7 +350,13 @@ func (t *tieredStorage) renderView(viewJob viewJob) (err error) { if err != nil { panic(err) } + if t.diskFrontier == nil { + // Storage still empty, return an empty view. + viewJob.output <- view + return + } + // Get a single iterator that will be used for all data extraction below. iterator, closer, err := t.diskStorage.metricSamples.GetIterator() if closer != nil { defer closer.Close() @@ -359,148 +366,149 @@ func (t *tieredStorage) renderView(viewJob viewJob) (err error) { } for _, scanJob := range scans { - // XXX: Memoize the last retrieval for forward scans. - var ( - // standingOperations ops - ) + seriesFrontier, err := newSeriesFrontier(scanJob.fingerprint, *t.diskFrontier, iterator) + if err != nil { + panic(err) + } + if seriesFrontier == nil { + continue + } - // fmt.Printf("Starting scan of %s...\n", scanJob) - if t.diskFrontier != nil || t.diskFrontier.ContainsFingerprint(scanJob.fingerprint) { - // fmt.Printf("Using diskFrontier %s\n", t.diskFrontier) - seriesFrontier, err := newSeriesFrontier(scanJob.fingerprint, *t.diskFrontier, iterator) - // fmt.Printf("Using seriesFrontier %s\n", seriesFrontier) + standingOps := scanJob.operations + for len(standingOps) > 0 { + // Load data value chunk(s) around the first standing op's current time. + highWatermark := *standingOps[0].CurrentTime() + chunk := t.loadChunkAroundTime(iterator, seriesFrontier, scanJob.fingerprint, highWatermark) + lastChunkTime := chunk[len(chunk)-1].Timestamp + if lastChunkTime.After(highWatermark) { + highWatermark = lastChunkTime + } + + // For each op, extract all needed data from the current chunk. + out := []model.SamplePair{} + for _, op := range standingOps { + if op.CurrentTime().After(highWatermark) { + break + } + for op.CurrentTime() != nil && !op.CurrentTime().After(highWatermark) { + out = op.ExtractSamples(chunk) + } + } + + // Append the extracted samples to the materialized view. + for _, sample := range out { + view.appendSample(scanJob.fingerprint, sample.Timestamp, sample.Value) + } + + // Throw away standing ops which are finished. + filteredOps := ops{} + for _, op := range standingOps { + if op.CurrentTime() != nil { + filteredOps = append(filteredOps, op) + } + } + standingOps = filteredOps + + // Sort ops by start time again, since they might be slightly off now. + // For example, consider a current chunk of values and two interval ops + // with different interval lengthsr. Their states after the cycle above + // could be: + // + // (C = current op time) + // + // Chunk: [ X X X X X ] + // Op 1: [ X X C . . . ] + // Op 2: [ X X C . . .] + // + // Op 2 now has an earlier current time than Op 1. + sort.Sort(standingOps) + } + } + + viewJob.output <- view + return +} + +func (t *tieredStorage) loadChunkAroundTime(iterator *levigo.Iterator, frontier *seriesFrontier, fingerprint model.Fingerprint, ts time.Time) (chunk []model.SamplePair) { + var ( + targetKey = &dto.SampleKey{ + Fingerprint: fingerprint.ToDTO(), + } + foundKey = &dto.SampleKey{} + foundValue *dto.SampleValueSeries + ) + + // Limit the target key to be within the series' keyspace. + if ts.After(frontier.lastSupertime) { + targetKey.Timestamp = indexable.EncodeTime(frontier.lastSupertime) + } else { + targetKey.Timestamp = indexable.EncodeTime(ts) + } + + // Try seeking to target key. + rawKey, _ := coding.NewProtocolBufferEncoder(targetKey).Encode() + iterator.Seek(rawKey) + + foundKey, err := extractSampleKey(iterator) + if err != nil { + panic(err) + } + + // Figure out if we need to rewind by one block. + // Imagine the following supertime blocks with time ranges: + // + // Block 1: ft 1000 - lt 1009 + // Block 1: ft 1010 - lt 1019 + // + // If we are aiming to find time 1005, we would first seek to the block with + // supertime 1010, then need to rewind by one block by virtue of LevelDB + // iterator seek behavior. + // + // Only do the rewind if there is another chunk before this one. + rewound := false + firstTime := indexable.DecodeTime(foundKey.Timestamp) + if ts.Before(firstTime) && !frontier.firstSupertime.After(ts) { + iterator.Prev() + rewound = true + } + + foundValue, err = extractSampleValues(iterator) + if err != nil { + panic(err) + } + + // If we rewound, but the target time is still past the current block, return + // the last value of the current (rewound) block and the entire next block. + if rewound { + foundKey, err = extractSampleKey(iterator) + if err != nil { + panic(err) + } + currentChunkLastTime := time.Unix(*foundKey.LastTimestamp, 0) + + if ts.After(currentChunkLastTime) { + sampleCount := len(foundValue.Value) + chunk = append(chunk, model.SamplePair{ + Timestamp: time.Unix(*foundValue.Value[sampleCount-1].Timestamp, 0), + Value: model.SampleValue(*foundValue.Value[sampleCount-1].Value), + }) + // We know there's a next block since we have rewound from it. + iterator.Next() + + foundValue, err = extractSampleValues(iterator) if err != nil { panic(err) } - - if seriesFrontier != nil { - var ( - targetKey = &dto.SampleKey{} - foundKey = &dto.SampleKey{} - foundValue *dto.SampleValueSeries - ) - - for _, operation := range scanJob.operations { - if seriesFrontier.lastTime.Before(operation.StartsAt()) { - // fmt.Printf("operation %s occurs after %s; aborting...\n", operation, seriesFrontier.lastTime) - // XXXXXX - break - } - - scanJob.operations = scanJob.operations[1:len(scanJob.operations)] - - if operation.StartsAt().Before(seriesFrontier.firstSupertime) { - // fmt.Printf("operation %s occurs before %s; discarding...\n", operation, seriesFrontier.firstSupertime) - // XXXXXX - continue - } - - // If the operation starts in the last supertime block, but before - // the end of a series, set the seek time to be within the key space - // so as not to invalidate the iterator. - if seriesFrontier.lastSupertime.Before(operation.StartsAt()) && !seriesFrontier.lastTime.Before(operation.StartsAt()) { - targetKey.Timestamp = indexable.EncodeTime(seriesFrontier.lastSupertime) - } else { - targetKey.Timestamp = indexable.EncodeTime(operation.StartsAt()) - } - - targetKey.Fingerprint = scanJob.fingerprint.ToDTO() - - rawKey, _ := coding.NewProtocolBufferEncoder(targetKey).Encode() - - iterator.Seek(rawKey) - - foundKey, err = extractSampleKey(iterator) - if err != nil { - panic(err) - } - - firstTime := indexable.DecodeTime(foundKey.Timestamp) - - if operation.StartsAt().Before(firstTime) { - // Imagine the following supertime blocks with last time ranges: - // - // Block 1: ft 1000 - lt 1009 - // Block 1: ft 1010 - lt 1019 - // - // If an operation started at time 1005, we would first seek to the - // block with supertime 1010, then need to rewind by one block by - // virtue of LevelDB iterator seek behavior. - fmt.Printf("operation %s may occur in next entity; rewinding...\n", operation) - // XXXXXX - //iterator.Previous() - panic("oops") - } - // fmt.Printf("operation %s occurs inside of %s...\n", operation, foundKey) - foundValue, err = extractSampleValue(iterator) - if err != nil { - panic(err) - } - - var ( - elementCount = len(foundValue.Value) - searcher = func(i int) bool { - return time.Unix(*foundValue.Value[i].Timestamp, 0).After(operation.StartsAt()) - } - index = sort.Search(elementCount, searcher) - ) - - if index != elementCount { - if index > 0 { - index-- - } - - foundValue.Value = foundValue.Value[index:elementCount] - } - switch operation.(type) { - case *getValuesAtTimeOp: - if len(foundValue.Value) > 0 { - view.appendSample(scanJob.fingerprint, time.Unix(*foundValue.Value[0].Timestamp, 0), model.SampleValue(*foundValue.Value[0].Value)) - } - if len(foundValue.Value) > 1 { - view.appendSample(scanJob.fingerprint, time.Unix(*foundValue.Value[1].Timestamp, 0), model.SampleValue(*foundValue.Value[1].Value)) - } - default: - panic("unhandled") - } - } - } } - } - // for { - // if len(s.operations) == 0 { - // if len(standingOperations) > 0 { - // var ( - // intervals = collectIntervals(standingOperations) - // ranges = collectRanges(standingOperations) - // ) - - // if len(intervals) > 0 { - // } - - // if len(ranges) > 0 { - // if len(ranges) > 0 { - - // } - // } - // break - // } - // } - - // operation := s.operations[0] - // if operation.StartsAt().Equal(lastTime) { - // standingOperations = append(standingOperations, operation) - // } else { - // standingOperations = ops{operation} - // lastTime = operation.StartsAt() - // } - - // s.operations = s.operations[1:len(s.operations)] - // } - - viewJob.output <- view + // Now append all the samples of the currently seeked block to the output. + for _, sample := range foundValue.Value { + chunk = append(chunk, model.SamplePair{ + Timestamp: time.Unix(*sample.Timestamp, 0), + Value: model.SampleValue(*sample.Value), + }) + } return } diff --git a/storage/metric/tiered_test.go b/storage/metric/tiered_test.go index 91f7dd7f9..5e6c7572f 100644 --- a/storage/metric/tiered_test.go +++ b/storage/metric/tiered_test.go @@ -61,6 +61,20 @@ func testMakeView(t test.Tester) { in in out out }{ + // No sample, but query asks for one. + { + in: in{ + atTime: []getValuesAtTimeOp{ + { + time: instant, + }, + }, + }, + out: out{ + atTime: [][]model.SamplePair{{}}, + }, + }, + // Single sample, query asks for exact sample time. { data: []model.Sample{ { @@ -87,6 +101,66 @@ func testMakeView(t test.Tester) { }, }, }, + // Single sample, query time before the sample. + { + data: []model.Sample{ + { + Metric: metric, + Value: 0, + Timestamp: instant.Add(time.Second), + }, + { + Metric: metric, + Value: 1, + Timestamp: instant.Add(time.Second * 2), + }, + }, + in: in{ + atTime: []getValuesAtTimeOp{ + { + time: instant, + }, + }, + }, + out: out{ + atTime: [][]model.SamplePair{ + { + { + Timestamp: instant.Add(time.Second), + Value: 0, + }, + }, + }, + }, + }, + // Single sample, query time after the sample. + { + data: []model.Sample{ + { + Metric: metric, + Value: 0, + Timestamp: instant, + }, + }, + in: in{ + atTime: []getValuesAtTimeOp{ + { + time: instant.Add(time.Second), + }, + }, + }, + out: out{ + atTime: [][]model.SamplePair{ + { + { + Timestamp: instant, + Value: 0, + }, + }, + }, + }, + }, + // Two samples, query asks for first sample time. { data: []model.Sample{ { @@ -114,14 +188,11 @@ func testMakeView(t test.Tester) { Timestamp: instant, Value: 0, }, - { - Timestamp: instant.Add(time.Second), - Value: 1, - }, }, }, }, }, + // Three samples, query asks for second sample time. { data: []model.Sample{ { @@ -154,14 +225,11 @@ func testMakeView(t test.Tester) { Timestamp: instant.Add(time.Second), Value: 1, }, - { - Timestamp: instant.Add(time.Second * 2), - Value: 2, - }, }, }, }, }, + // Three samples, query asks for time between first and second samples. { data: []model.Sample{ { @@ -202,6 +270,7 @@ func testMakeView(t test.Tester) { }, }, }, + // Three samples, query asks for time between second and third samples. { data: []model.Sample{ { @@ -242,30 +311,31 @@ func testMakeView(t test.Tester) { }, }, }, - //{ - // data: buildSamples(instant, instant.Add(400*time.Second), time.Second, metric), - // in: in{ - // atTime: []getValuesAtTimeOp{ - // { - // time: instant.Add(time.Second * 100), - // }, - // }, - // }, - // out: out{ - // atTime: [][]model.SamplePair{ - // { - // { - // Timestamp: instant.Add(time.Second * 100), - // Value: 100, - // }, - // { - // Timestamp: instant.Add(time.Second * 100), - // Value: 101, - // }, - // }, - // }, - // }, - //}, + // Two chunks of samples, query asks for values from first chunk. + { + data: buildSamples(instant, instant.Add(time.Duration(*leveldbChunkSize*2)*time.Second), time.Second, metric), + in: in{ + atTime: []getValuesAtTimeOp{ + { + time: instant.Add(time.Second*time.Duration(*leveldbChunkSize/2) + 1), + }, + }, + }, + out: out{ + atTime: [][]model.SamplePair{ + { + { + Timestamp: instant.Add(time.Second * time.Duration(*leveldbChunkSize/2)), + Value: 100, + }, + { + Timestamp: instant.Add(time.Second * (time.Duration(*leveldbChunkSize/2) + 1)), + Value: 101, + }, + }, + }, + }, + }, } )