Merge pull request #316 from prometheus/refactor/range-at-interval-op

Implement getValueRangeAtIntervalOp for faster range queries.
This commit is contained in:
juliusv 2013-06-26 09:13:17 -07:00
commit 0a69119f7b
5 changed files with 297 additions and 35 deletions

View File

@ -132,9 +132,10 @@ func viewAdapterForRangeQuery(node Node, start time.Time, end time.Time, interva
requestBuildTimer := queryStats.GetTimer(stats.ViewRequestBuildTime).Start()
viewBuilder := metric.NewViewRequestBuilder()
for fingerprint, rangeDuration := range analyzer.FullRanges {
// TODO: we should support GetMetricRangeAtInterval() or similar ops in the view builder.
for t := start; t.Before(end); t = t.Add(interval) {
viewBuilder.GetMetricRange(&fingerprint, t.Add(-rangeDuration), t)
if interval < rangeDuration {
viewBuilder.GetMetricRange(&fingerprint, start.Add(-rangeDuration), end)
} else {
viewBuilder.GetMetricRangeAtInterval(&fingerprint, start.Add(-rangeDuration), end, interval, rangeDuration)
}
}
for fingerprint := range analyzer.IntervalRanges {

View File

@ -25,7 +25,9 @@ type op interface {
// The time at which this operation starts.
StartsAt() time.Time
// Extract samples from stream of values and advance operation time.
ExtractSamples(in Values) (out Values)
ExtractSamples(Values) Values
// Return whether the operator has consumed all data it needs.
Consumed() bool
// Get current operation time or nil if no subsequent work associated with
// this operator remains.
CurrentTime() *time.Time
@ -124,11 +126,12 @@ func extractValuesAroundTime(t time.Time, in Values) (out Values) {
return
}
func (g getValuesAtTimeOp) CurrentTime() (currentTime *time.Time) {
if !g.consumed {
currentTime = &g.time
}
return
func (g getValuesAtTimeOp) CurrentTime() *time.Time {
return &g.time
}
func (g getValuesAtTimeOp) Consumed() bool {
return g.consumed
}
// Encapsulates getting values at a given interval over a duration.
@ -173,13 +176,14 @@ func (g *getValuesAtIntervalOp) ExtractSamples(in Values) (out Values) {
return
}
func (g *getValuesAtIntervalOp) CurrentTime() (currentTime *time.Time) {
if g.from.After(g.through) {
return
}
func (g *getValuesAtIntervalOp) CurrentTime() *time.Time {
return &g.from
}
func (g *getValuesAtIntervalOp) Consumed() bool {
return g.from.After(g.through)
}
func (g *getValuesAtIntervalOp) GreedierThan(op op) (superior bool) {
switch o := op.(type) {
case *getValuesAtTimeOp:
@ -193,6 +197,7 @@ func (g *getValuesAtIntervalOp) GreedierThan(op op) (superior bool) {
return
}
// Encapsulates getting all values in a given range.
type getValuesAlongRangeOp struct {
from time.Time
through time.Time
@ -243,13 +248,14 @@ func (g *getValuesAlongRangeOp) ExtractSamples(in Values) (out Values) {
return in[firstIdx:lastIdx]
}
func (g *getValuesAlongRangeOp) CurrentTime() (currentTime *time.Time) {
if g.from.After(g.through) {
return
}
func (g *getValuesAlongRangeOp) CurrentTime() *time.Time {
return &g.from
}
func (g *getValuesAlongRangeOp) Consumed() bool {
return g.from.After(g.through)
}
func (g *getValuesAlongRangeOp) GreedierThan(op op) (superior bool) {
switch o := op.(type) {
case *getValuesAtTimeOp:
@ -263,6 +269,86 @@ func (g *getValuesAlongRangeOp) GreedierThan(op op) (superior bool) {
return
}
// Encapsulates getting all values from ranges along intervals.
//
// Works just like getValuesAlongRangeOp, but when from > through, through is
// incremented by interval and from is reset to through-rangeDuration. Returns
// current time nil when from > totalThrough.
type getValueRangeAtIntervalOp struct {
rangeFrom time.Time
rangeThrough time.Time
rangeDuration time.Duration
interval time.Duration
through time.Time
}
func (o *getValueRangeAtIntervalOp) String() string {
return fmt.Sprintf("getValueRangeAtIntervalOp range %s from %s each %s through %s", o.rangeDuration, o.rangeFrom, o.interval, o.through)
}
func (g *getValueRangeAtIntervalOp) StartsAt() time.Time {
return g.rangeFrom
}
func (g *getValueRangeAtIntervalOp) Through() time.Time {
panic("not implemented")
}
func (g *getValueRangeAtIntervalOp) advanceToNextInterval() {
g.rangeThrough = g.rangeThrough.Add(g.interval)
g.rangeFrom = g.rangeThrough.Add(-g.rangeDuration)
}
func (g *getValueRangeAtIntervalOp) ExtractSamples(in Values) (out Values) {
if len(in) == 0 {
return
}
// Find the first sample where time >= g.from.
firstIdx := sort.Search(len(in), func(i int) bool {
return !in[i].Timestamp.Before(g.rangeFrom)
})
if firstIdx == len(in) {
// No samples at or after operator start time. This can only happen if we
// try applying the operator to a time after the last recorded sample. In
// this case, we're finished.
g.rangeFrom = g.through.Add(1)
return
}
// Find the first sample where time > g.rangeThrough.
lastIdx := sort.Search(len(in), func(i int) bool {
return in[i].Timestamp.After(g.rangeThrough)
})
// This only happens when there is only one sample and it is both after
// g.rangeFrom and after g.rangeThrough. In this case, both indexes are 0.
if lastIdx == firstIdx {
g.advanceToNextInterval()
return
}
lastSampleTime := in[lastIdx-1].Timestamp
// Sample times are stored with a maximum time resolution of one second, so
// we have to add exactly that to target the next chunk on the next op
// iteration.
g.rangeFrom = lastSampleTime.Add(time.Second)
if g.rangeFrom.After(g.rangeThrough) {
g.advanceToNextInterval()
}
return in[firstIdx:lastIdx]
}
func (g *getValueRangeAtIntervalOp) CurrentTime() *time.Time {
return &g.rangeFrom
}
func (g *getValueRangeAtIntervalOp) Consumed() bool {
return g.rangeFrom.After(g.through)
}
func (g *getValueRangeAtIntervalOp) GreedierThan(op op) bool {
panic("not implemented")
}
// Provides a collection of getMetricRangeOperation.
type getMetricRangeOperations []*getValuesAlongRangeOp

View File

@ -1633,17 +1633,15 @@ func TestGetValuesAtIntervalOp(t *testing.T) {
for i, scenario := range scenarios {
actual := scenario.op.ExtractSamples(scenario.in)
if len(actual) != len(scenario.out) {
t.Fatalf("%d. expected length %d, got %d: %v", i, len(scenario.out), len(actual), scenario.op)
t.Fatalf("%d. expected length %d, got %d", i, len(scenario.out), len(actual))
t.Fatalf("%d. expected length %d, got %d: %v", i, len(scenario.out), len(actual), actual)
}
if len(scenario.in) < 1 {
continue
}
opTime := scenario.op.CurrentTime()
lastExtractedTime := scenario.out[len(scenario.out)-1].Timestamp
if opTime != nil && opTime.Before(lastExtractedTime) {
t.Fatalf("%d. expected op.CurrentTime() to be nil or after current chunk, %v, %v", i, scenario.op.CurrentTime(), scenario.out)
if !scenario.op.Consumed() && scenario.op.CurrentTime().Before(lastExtractedTime) {
t.Fatalf("%d. expected op to be consumed or with CurrentTime() after current chunk, %v, %v", i, scenario.op.CurrentTime(), scenario.out)
}
for j, out := range scenario.out {
@ -1819,8 +1817,164 @@ func TestGetValuesAlongRangeOp(t *testing.T) {
for i, scenario := range scenarios {
actual := scenario.op.ExtractSamples(scenario.in)
if len(actual) != len(scenario.out) {
t.Fatalf("%d. expected length %d, got %d: %v", i, len(scenario.out), len(actual), scenario.op)
t.Fatalf("%d. expected length %d, got %d", i, len(scenario.out), len(actual))
t.Fatalf("%d. expected length %d, got %d: %v", i, len(scenario.out), len(actual), actual)
}
for j, out := range scenario.out {
if out != actual[j] {
t.Fatalf("%d. expected output %v, got %v", i, scenario.out, actual)
}
}
}
}
func TestGetValueRangeAtIntervalOp(t *testing.T) {
testOp := getValueRangeAtIntervalOp{
rangeFrom: testInstant.Add(-2 * time.Minute),
rangeThrough: testInstant,
rangeDuration: 2 * time.Minute,
interval: 10 * time.Minute,
through: testInstant.Add(20 * time.Minute),
}
var scenarios = []struct {
op getValueRangeAtIntervalOp
in model.Values
out model.Values
}{
// All values before the first range.
{
op: testOp,
in: model.Values{
{
Timestamp: testInstant.Add(-4 * time.Minute),
Value: 1,
},
{
Timestamp: testInstant.Add(-3 * time.Minute),
Value: 2,
},
},
out: model.Values{},
},
// Values starting before first range, ending after last.
{
op: testOp,
in: model.Values{
{
Timestamp: testInstant.Add(-4 * time.Minute),
Value: 1,
},
{
Timestamp: testInstant.Add(-3 * time.Minute),
Value: 2,
},
{
Timestamp: testInstant.Add(-2 * time.Minute),
Value: 3,
},
{
Timestamp: testInstant.Add(-1 * time.Minute),
Value: 4,
},
{
Timestamp: testInstant.Add(0 * time.Minute),
Value: 5,
},
{
Timestamp: testInstant.Add(5 * time.Minute),
Value: 6,
},
{
Timestamp: testInstant.Add(8 * time.Minute),
Value: 7,
},
{
Timestamp: testInstant.Add(9 * time.Minute),
Value: 8,
},
{
Timestamp: testInstant.Add(10 * time.Minute),
Value: 9,
},
{
Timestamp: testInstant.Add(15 * time.Minute),
Value: 10,
},
{
Timestamp: testInstant.Add(18 * time.Minute),
Value: 11,
},
{
Timestamp: testInstant.Add(19 * time.Minute),
Value: 12,
},
{
Timestamp: testInstant.Add(20 * time.Minute),
Value: 13,
},
{
Timestamp: testInstant.Add(21 * time.Minute),
Value: 14,
},
},
out: model.Values{
{
Timestamp: testInstant.Add(-2 * time.Minute),
Value: 3,
},
{
Timestamp: testInstant.Add(-1 * time.Minute),
Value: 4,
},
{
Timestamp: testInstant.Add(0 * time.Minute),
Value: 5,
},
{
Timestamp: testInstant.Add(8 * time.Minute),
Value: 7,
},
{
Timestamp: testInstant.Add(9 * time.Minute),
Value: 8,
},
{
Timestamp: testInstant.Add(10 * time.Minute),
Value: 9,
},
{
Timestamp: testInstant.Add(18 * time.Minute),
Value: 11,
},
{
Timestamp: testInstant.Add(19 * time.Minute),
Value: 12,
},
{
Timestamp: testInstant.Add(20 * time.Minute),
Value: 13,
},
},
},
// Values starting after last range.
{
op: testOp,
in: model.Values{
{
Timestamp: testInstant.Add(21 * time.Minute),
Value: 14,
},
},
out: model.Values{},
},
}
for i, scenario := range scenarios {
actual := model.Values{}
for !scenario.op.Consumed() {
actual = append(actual, scenario.op.ExtractSamples(scenario.in)...)
}
if len(actual) != len(scenario.out) {
t.Fatalf("%d. expected length %d, got %d: %v", i, len(scenario.out), len(actual), actual)
}
for j, out := range scenario.out {
if !out.Equal(actual[j]) {

View File

@ -471,7 +471,7 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
currentChunk = currentChunk.TruncateBefore(*(op.CurrentTime()))
for op.CurrentTime() != nil && !op.CurrentTime().After(targetTime) {
for !op.Consumed() && !op.CurrentTime().After(targetTime) {
out = op.ExtractSamples(Values(currentChunk))
// Append the extracted samples to the materialized view.
@ -482,7 +482,7 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
// Throw away standing ops which are finished.
filteredOps := ops{}
for _, op := range standingOps {
if op.CurrentTime() != nil {
if !op.Consumed() {
filteredOps = append(filteredOps, op)
}
}

View File

@ -42,15 +42,15 @@ type viewRequestBuilder struct {
}
// Furnishes a ViewRequestBuilder for remarking what types of queries to perform.
func NewViewRequestBuilder() viewRequestBuilder {
return viewRequestBuilder{
func NewViewRequestBuilder() *viewRequestBuilder {
return &viewRequestBuilder{
operations: make(map[clientmodel.Fingerprint]ops),
}
}
// Gets for the given Fingerprint either the value at that time if there is an
// match or the one or two values adjacent thereto.
func (v viewRequestBuilder) GetMetricAtTime(fingerprint *clientmodel.Fingerprint, time time.Time) {
func (v *viewRequestBuilder) GetMetricAtTime(fingerprint *clientmodel.Fingerprint, time time.Time) {
ops := v.operations[*fingerprint]
ops = append(ops, &getValuesAtTimeOp{
time: time,
@ -61,7 +61,7 @@ func (v viewRequestBuilder) GetMetricAtTime(fingerprint *clientmodel.Fingerprint
// Gets for the given Fingerprint either the value at that interval from From
// through Through if there is an match or the one or two values adjacent
// for each point.
func (v viewRequestBuilder) GetMetricAtInterval(fingerprint *clientmodel.Fingerprint, from, through time.Time, interval time.Duration) {
func (v *viewRequestBuilder) GetMetricAtInterval(fingerprint *clientmodel.Fingerprint, from, through time.Time, interval time.Duration) {
ops := v.operations[*fingerprint]
ops = append(ops, &getValuesAtIntervalOp{
from: from,
@ -71,9 +71,9 @@ func (v viewRequestBuilder) GetMetricAtInterval(fingerprint *clientmodel.Fingerp
v.operations[*fingerprint] = ops
}
// Gets for the given Fingerprint either the values that occur inclusively from
// From through Through.
func (v viewRequestBuilder) GetMetricRange(fingerprint *clientmodel.Fingerprint, from, through time.Time) {
// Gets for the given Fingerprint the values that occur inclusively from From
// through Through.
func (v *viewRequestBuilder) GetMetricRange(fingerprint *clientmodel.Fingerprint, from, through time.Time) {
ops := v.operations[*fingerprint]
ops = append(ops, &getValuesAlongRangeOp{
from: from,
@ -82,16 +82,37 @@ func (v viewRequestBuilder) GetMetricRange(fingerprint *clientmodel.Fingerprint,
v.operations[*fingerprint] = ops
}
// Gets value ranges at intervals for the given Fingerprint:
//
// |----| |----| |----| |----|
// ^ ^ ^ ^ ^ ^
// | \------------/ \----/ |
// from interval rangeDuration through
func (v *viewRequestBuilder) GetMetricRangeAtInterval(fingerprint *clientmodel.Fingerprint, from, through time.Time, interval, rangeDuration time.Duration) {
ops := v.operations[*fingerprint]
ops = append(ops, &getValueRangeAtIntervalOp{
rangeFrom: from,
rangeThrough: from.Add(rangeDuration),
rangeDuration: rangeDuration,
interval: interval,
through: through,
})
v.operations[*fingerprint] = ops
}
// Emits the optimized scans that will occur in the data store. This
// effectively resets the ViewRequestBuilder back to a pristine state.
func (v viewRequestBuilder) ScanJobs() (j scanJobs) {
func (v *viewRequestBuilder) ScanJobs() (j scanJobs) {
for fingerprint, operations := range v.operations {
sort.Sort(startsAtSort{operations})
fpCopy := fingerprint
j = append(j, scanJob{
fingerprint: &fpCopy,
operations: optimize(operations),
// BUG: Evaluate whether we need to implement an optimize() working also
// for getValueRangeAtIntervalOp and use it here instead of just passing
// through the list of ops as-is.
operations: operations,
})
delete(v.operations, fingerprint)