From d2da21121c917229d81dd3f2d490ac326ad6557a Mon Sep 17 00:00:00 2001 From: Julius Volz Date: Tue, 7 May 2013 14:25:01 +0200 Subject: [PATCH] Implement getValueRangeAtIntervalOp for faster range queries. This also short-circuits optimize() for now, since it is complex to implement for the new operator, and ops generated by the query layer already fulfill the needed invariants. We should still investigate later whether to completely delete operator optimization code or extend it to support getValueRangeAtIntervalOp operators. --- rules/ast/query_analyzer.go | 7 +- storage/metric/operation.go | 114 ++++++++++++++++++--- storage/metric/operation_test.go | 168 +++++++++++++++++++++++++++++-- storage/metric/tiered.go | 4 +- storage/metric/view.go | 39 +++++-- 5 files changed, 297 insertions(+), 35 deletions(-) diff --git a/rules/ast/query_analyzer.go b/rules/ast/query_analyzer.go index e1830e101..977dac240 100644 --- a/rules/ast/query_analyzer.go +++ b/rules/ast/query_analyzer.go @@ -132,9 +132,10 @@ func viewAdapterForRangeQuery(node Node, start time.Time, end time.Time, interva requestBuildTimer := queryStats.GetTimer(stats.ViewRequestBuildTime).Start() viewBuilder := metric.NewViewRequestBuilder() for fingerprint, rangeDuration := range analyzer.FullRanges { - // TODO: we should support GetMetricRangeAtInterval() or similar ops in the view builder. 
- for t := start; t.Before(end); t = t.Add(interval) { - viewBuilder.GetMetricRange(&fingerprint, t.Add(-rangeDuration), t) + if interval < rangeDuration { + viewBuilder.GetMetricRange(&fingerprint, start.Add(-rangeDuration), end) + } else { + viewBuilder.GetMetricRangeAtInterval(&fingerprint, start.Add(-rangeDuration), end, interval, rangeDuration) } } for fingerprint := range analyzer.IntervalRanges { diff --git a/storage/metric/operation.go b/storage/metric/operation.go index 2fef66a5c..0ce9b9ecf 100644 --- a/storage/metric/operation.go +++ b/storage/metric/operation.go @@ -25,7 +25,9 @@ type op interface { // The time at which this operation starts. StartsAt() time.Time // Extract samples from stream of values and advance operation time. - ExtractSamples(in Values) (out Values) + ExtractSamples(Values) Values + // Return whether the operator has consumed all data it needs. + Consumed() bool // Get current operation time or nil if no subsequent work associated with // this operator remains. CurrentTime() *time.Time @@ -124,11 +126,12 @@ func extractValuesAroundTime(t time.Time, in Values) (out Values) { return } -func (g getValuesAtTimeOp) CurrentTime() (currentTime *time.Time) { - if !g.consumed { - currentTime = &g.time - } - return +func (g getValuesAtTimeOp) CurrentTime() *time.Time { + return &g.time +} + +func (g getValuesAtTimeOp) Consumed() bool { + return g.consumed } // Encapsulates getting values at a given interval over a duration. 
@@ -173,13 +176,14 @@ func (g *getValuesAtIntervalOp) ExtractSamples(in Values) (out Values) { return } -func (g *getValuesAtIntervalOp) CurrentTime() (currentTime *time.Time) { - if g.from.After(g.through) { - return - } +func (g *getValuesAtIntervalOp) CurrentTime() *time.Time { return &g.from } +func (g *getValuesAtIntervalOp) Consumed() bool { + return g.from.After(g.through) +} + func (g *getValuesAtIntervalOp) GreedierThan(op op) (superior bool) { switch o := op.(type) { case *getValuesAtTimeOp: @@ -193,6 +197,7 @@ func (g *getValuesAtIntervalOp) GreedierThan(op op) (superior bool) { return } +// Encapsulates getting all values in a given range. type getValuesAlongRangeOp struct { from time.Time through time.Time @@ -243,13 +248,14 @@ func (g *getValuesAlongRangeOp) ExtractSamples(in Values) (out Values) { return in[firstIdx:lastIdx] } -func (g *getValuesAlongRangeOp) CurrentTime() (currentTime *time.Time) { - if g.from.After(g.through) { - return - } +func (g *getValuesAlongRangeOp) CurrentTime() *time.Time { return &g.from } +func (g *getValuesAlongRangeOp) Consumed() bool { + return g.from.After(g.through) +} + func (g *getValuesAlongRangeOp) GreedierThan(op op) (superior bool) { switch o := op.(type) { case *getValuesAtTimeOp: @@ -263,6 +269,86 @@ func (g *getValuesAlongRangeOp) GreedierThan(op op) (superior bool) { return } +// Encapsulates getting all values from ranges along intervals. +// +// Works just like getValuesAlongRangeOp, but when rangeFrom > rangeThrough, +// rangeThrough is incremented by interval and rangeFrom is reset to +// rangeThrough-rangeDuration. Consumed() returns true once rangeFrom > through. 
+type getValueRangeAtIntervalOp struct { + rangeFrom time.Time + rangeThrough time.Time + rangeDuration time.Duration + interval time.Duration + through time.Time +} + +func (o *getValueRangeAtIntervalOp) String() string { + return fmt.Sprintf("getValueRangeAtIntervalOp range %s from %s each %s through %s", o.rangeDuration, o.rangeFrom, o.interval, o.through) +} + +func (g *getValueRangeAtIntervalOp) StartsAt() time.Time { + return g.rangeFrom +} + +func (g *getValueRangeAtIntervalOp) Through() time.Time { + panic("not implemented") +} + +func (g *getValueRangeAtIntervalOp) advanceToNextInterval() { + g.rangeThrough = g.rangeThrough.Add(g.interval) + g.rangeFrom = g.rangeThrough.Add(-g.rangeDuration) +} + +func (g *getValueRangeAtIntervalOp) ExtractSamples(in Values) (out Values) { + if len(in) == 0 { + return + } + // Find the first sample where time >= g.rangeFrom. + firstIdx := sort.Search(len(in), func(i int) bool { + return !in[i].Timestamp.Before(g.rangeFrom) + }) + if firstIdx == len(in) { + // No samples at or after operator start time. This can only happen if we + // try applying the operator to a time after the last recorded sample. In + // this case, we're finished. + g.rangeFrom = g.through.Add(1) + return + } + + // Find the first sample where time > g.rangeThrough. + lastIdx := sort.Search(len(in), func(i int) bool { + return in[i].Timestamp.After(g.rangeThrough) + }) + // No sample lies within [g.rangeFrom, g.rangeThrough]: the first sample at or + // after g.rangeFrom is already after g.rangeThrough, so skip to the next range. + if lastIdx == firstIdx { + g.advanceToNextInterval() + return + } + + lastSampleTime := in[lastIdx-1].Timestamp + // Sample times are stored with a maximum time resolution of one second, so + // we have to add exactly that to target the next chunk on the next op + // iteration. 
+ g.rangeFrom = lastSampleTime.Add(time.Second) + if g.rangeFrom.After(g.rangeThrough) { + g.advanceToNextInterval() + } + return in[firstIdx:lastIdx] +} + +func (g *getValueRangeAtIntervalOp) CurrentTime() *time.Time { + return &g.rangeFrom +} + +func (g *getValueRangeAtIntervalOp) Consumed() bool { + return g.rangeFrom.After(g.through) +} + +func (g *getValueRangeAtIntervalOp) GreedierThan(op op) bool { + panic("not implemented") +} + // Provides a collection of getMetricRangeOperation. type getMetricRangeOperations []*getValuesAlongRangeOp diff --git a/storage/metric/operation_test.go b/storage/metric/operation_test.go index ca0ee774f..e39cbdeb0 100644 --- a/storage/metric/operation_test.go +++ b/storage/metric/operation_test.go @@ -1633,17 +1633,15 @@ func TestGetValuesAtIntervalOp(t *testing.T) { for i, scenario := range scenarios { actual := scenario.op.ExtractSamples(scenario.in) if len(actual) != len(scenario.out) { - t.Fatalf("%d. expected length %d, got %d: %v", i, len(scenario.out), len(actual), scenario.op) - t.Fatalf("%d. expected length %d, got %d", i, len(scenario.out), len(actual)) + t.Fatalf("%d. expected length %d, got %d: %v", i, len(scenario.out), len(actual), actual) } if len(scenario.in) < 1 { continue } - opTime := scenario.op.CurrentTime() lastExtractedTime := scenario.out[len(scenario.out)-1].Timestamp - if opTime != nil && opTime.Before(lastExtractedTime) { - t.Fatalf("%d. expected op.CurrentTime() to be nil or after current chunk, %v, %v", i, scenario.op.CurrentTime(), scenario.out) + if !scenario.op.Consumed() && scenario.op.CurrentTime().Before(lastExtractedTime) { + t.Fatalf("%d. 
expected op to be consumed or with CurrentTime() after current chunk, %v, %v", i, scenario.op.CurrentTime(), scenario.out) } for j, out := range scenario.out { @@ -1819,8 +1817,164 @@ func TestGetValuesAlongRangeOp(t *testing.T) { for i, scenario := range scenarios { actual := scenario.op.ExtractSamples(scenario.in) if len(actual) != len(scenario.out) { - t.Fatalf("%d. expected length %d, got %d: %v", i, len(scenario.out), len(actual), scenario.op) - t.Fatalf("%d. expected length %d, got %d", i, len(scenario.out), len(actual)) + t.Fatalf("%d. expected length %d, got %d: %v", i, len(scenario.out), len(actual), actual) + } + for j, out := range scenario.out { + if out != actual[j] { + t.Fatalf("%d. expected output %v, got %v", i, scenario.out, actual) + } + } + } +} + +func TestGetValueRangeAtIntervalOp(t *testing.T) { + testOp := getValueRangeAtIntervalOp{ + rangeFrom: testInstant.Add(-2 * time.Minute), + rangeThrough: testInstant, + rangeDuration: 2 * time.Minute, + interval: 10 * time.Minute, + through: testInstant.Add(20 * time.Minute), + } + + var scenarios = []struct { + op getValueRangeAtIntervalOp + in model.Values + out model.Values + }{ + // All values before the first range. + { + op: testOp, + in: model.Values{ + { + Timestamp: testInstant.Add(-4 * time.Minute), + Value: 1, + }, + { + Timestamp: testInstant.Add(-3 * time.Minute), + Value: 2, + }, + }, + out: model.Values{}, + }, + // Values starting before first range, ending after last. 
+ { + op: testOp, + in: model.Values{ + { + Timestamp: testInstant.Add(-4 * time.Minute), + Value: 1, + }, + { + Timestamp: testInstant.Add(-3 * time.Minute), + Value: 2, + }, + { + Timestamp: testInstant.Add(-2 * time.Minute), + Value: 3, + }, + { + Timestamp: testInstant.Add(-1 * time.Minute), + Value: 4, + }, + { + Timestamp: testInstant.Add(0 * time.Minute), + Value: 5, + }, + { + Timestamp: testInstant.Add(5 * time.Minute), + Value: 6, + }, + { + Timestamp: testInstant.Add(8 * time.Minute), + Value: 7, + }, + { + Timestamp: testInstant.Add(9 * time.Minute), + Value: 8, + }, + { + Timestamp: testInstant.Add(10 * time.Minute), + Value: 9, + }, + { + Timestamp: testInstant.Add(15 * time.Minute), + Value: 10, + }, + { + Timestamp: testInstant.Add(18 * time.Minute), + Value: 11, + }, + { + Timestamp: testInstant.Add(19 * time.Minute), + Value: 12, + }, + { + Timestamp: testInstant.Add(20 * time.Minute), + Value: 13, + }, + { + Timestamp: testInstant.Add(21 * time.Minute), + Value: 14, + }, + }, + out: model.Values{ + { + Timestamp: testInstant.Add(-2 * time.Minute), + Value: 3, + }, + { + Timestamp: testInstant.Add(-1 * time.Minute), + Value: 4, + }, + { + Timestamp: testInstant.Add(0 * time.Minute), + Value: 5, + }, + { + Timestamp: testInstant.Add(8 * time.Minute), + Value: 7, + }, + { + Timestamp: testInstant.Add(9 * time.Minute), + Value: 8, + }, + { + Timestamp: testInstant.Add(10 * time.Minute), + Value: 9, + }, + { + Timestamp: testInstant.Add(18 * time.Minute), + Value: 11, + }, + { + Timestamp: testInstant.Add(19 * time.Minute), + Value: 12, + }, + { + Timestamp: testInstant.Add(20 * time.Minute), + Value: 13, + }, + }, + }, + // Values starting after last range. 
+ { + op: testOp, + in: model.Values{ + { + Timestamp: testInstant.Add(21 * time.Minute), + Value: 14, + }, + }, + out: model.Values{}, + }, + } + for i, scenario := range scenarios { + actual := model.Values{} + for !scenario.op.Consumed() { + actual = append(actual, scenario.op.ExtractSamples(scenario.in)...) + } + if len(actual) != len(scenario.out) { + t.Fatalf("%d. expected length %d, got %d: %v", i, len(scenario.out), len(actual), actual) } for j, out := range scenario.out { if !out.Equal(actual[j]) { diff --git a/storage/metric/tiered.go b/storage/metric/tiered.go index a1bab4d0f..d2105f2b2 100644 --- a/storage/metric/tiered.go +++ b/storage/metric/tiered.go @@ -471,7 +471,7 @@ func (t *TieredStorage) renderView(viewJob viewJob) { currentChunk = currentChunk.TruncateBefore(*(op.CurrentTime())) - for op.CurrentTime() != nil && !op.CurrentTime().After(targetTime) { + for !op.Consumed() && !op.CurrentTime().After(targetTime) { out = op.ExtractSamples(Values(currentChunk)) // Append the extracted samples to the materialized view. @@ -482,7 +482,7 @@ func (t *TieredStorage) renderView(viewJob viewJob) { // Throw away standing ops which are finished. filteredOps := ops{} for _, op := range standingOps { - if op.CurrentTime() != nil { + if !op.Consumed() { filteredOps = append(filteredOps, op) } } diff --git a/storage/metric/view.go b/storage/metric/view.go index 80e2e9210..f2e1193e3 100644 --- a/storage/metric/view.go +++ b/storage/metric/view.go @@ -42,15 +42,15 @@ type viewRequestBuilder struct { } // Furnishes a ViewRequestBuilder for remarking what types of queries to perform. -func NewViewRequestBuilder() viewRequestBuilder { - return viewRequestBuilder{ +func NewViewRequestBuilder() *viewRequestBuilder { + return &viewRequestBuilder{ operations: make(map[clientmodel.Fingerprint]ops), } } // Gets for the given Fingerprint either the value at that time if there is an // match or the one or two values adjacent thereto. 
-func (v viewRequestBuilder) GetMetricAtTime(fingerprint *clientmodel.Fingerprint, time time.Time) { +func (v *viewRequestBuilder) GetMetricAtTime(fingerprint *clientmodel.Fingerprint, time time.Time) { ops := v.operations[*fingerprint] ops = append(ops, &getValuesAtTimeOp{ time: time, @@ -61,7 +61,7 @@ func (v viewRequestBuilder) GetMetricAtTime(fingerprint *clientmodel.Fingerprint // Gets for the given Fingerprint either the value at that interval from From // through Through if there is an match or the one or two values adjacent // for each point. -func (v viewRequestBuilder) GetMetricAtInterval(fingerprint *clientmodel.Fingerprint, from, through time.Time, interval time.Duration) { +func (v *viewRequestBuilder) GetMetricAtInterval(fingerprint *clientmodel.Fingerprint, from, through time.Time, interval time.Duration) { ops := v.operations[*fingerprint] ops = append(ops, &getValuesAtIntervalOp{ from: from, @@ -71,9 +71,9 @@ func (v viewRequestBuilder) GetMetricAtInterval(fingerprint *clientmodel.Fingerp v.operations[*fingerprint] = ops } -// Gets for the given Fingerprint either the values that occur inclusively from -// From through Through. -func (v viewRequestBuilder) GetMetricRange(fingerprint *clientmodel.Fingerprint, from, through time.Time) { +// Gets for the given Fingerprint the values that occur inclusively from From +// through Through. 
+func (v *viewRequestBuilder) GetMetricRange(fingerprint *clientmodel.Fingerprint, from, through time.Time) { ops := v.operations[*fingerprint] ops = append(ops, &getValuesAlongRangeOp{ from: from, @@ -82,16 +82,37 @@ func (v viewRequestBuilder) GetMetricRange(fingerprint *clientmodel.Fingerprint, v.operations[*fingerprint] = ops } +// Gets value ranges at intervals for the given Fingerprint: +// +// |----| |----| |----| |----| +// ^ ^ ^ ^ ^ ^ +// | \------------/ \----/ | +// from interval rangeDuration through +func (v *viewRequestBuilder) GetMetricRangeAtInterval(fingerprint *clientmodel.Fingerprint, from, through time.Time, interval, rangeDuration time.Duration) { + ops := v.operations[*fingerprint] + ops = append(ops, &getValueRangeAtIntervalOp{ + rangeFrom: from, + rangeThrough: from.Add(rangeDuration), + rangeDuration: rangeDuration, + interval: interval, + through: through, + }) + v.operations[*fingerprint] = ops +} + // Emits the optimized scans that will occur in the data store. This // effectively resets the ViewRequestBuilder back to a pristine state. -func (v viewRequestBuilder) ScanJobs() (j scanJobs) { +func (v *viewRequestBuilder) ScanJobs() (j scanJobs) { for fingerprint, operations := range v.operations { sort.Sort(startsAtSort{operations}) fpCopy := fingerprint j = append(j, scanJob{ fingerprint: &fpCopy, - operations: optimize(operations), + // BUG: Evaluate whether we need to implement an optimize() working also + // for getValueRangeAtIntervalOp and use it here instead of just passing + // through the list of ops as-is. + operations: operations, }) delete(v.operations, fingerprint)