diff --git a/promql/bench_test.go b/promql/bench_test.go index cd748f81a..4bdddb13e 100644 --- a/promql/bench_test.go +++ b/promql/bench_test.go @@ -82,6 +82,10 @@ func BenchmarkRangeQuery(b *testing.B) { steps int } cases := []benchCase{ + // Plain retrieval. + { + expr: "a_X", + }, // Simple rate. { expr: "rate(a_X[1m])", diff --git a/promql/engine.go b/promql/engine.go index d20292134..d9b9b5ce3 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -831,6 +831,10 @@ func (ev *evaluator) eval(expr Expr) Value { mat := make(Matrix, 0, len(sel.series)) // Output matrix. offset := durationMilliseconds(sel.Offset) selRange := durationMilliseconds(sel.Range) + stepRange := selRange + if stepRange > ev.interval { + stepRange = ev.interval + } // Reuse objects across steps to save memory allocations. points := getPointSlice(16) inMatrix := make(Matrix, 1) @@ -839,6 +843,7 @@ func (ev *evaluator) eval(expr Expr) Value { // Process all the calls for one time series at a time. it := storage.NewBuffer(selRange) for i, s := range sel.series { + points = points[:0] it.Reset(s.Iterator()) ss := Series{ // For all range vector functions, the only change to the @@ -861,7 +866,7 @@ func (ev *evaluator) eval(expr Expr) Value { maxt := ts - offset mint := maxt - selRange // Evaluate the matrix selector for this series for this step. - points = ev.matrixIterSlice(it, mint, maxt, points[:0]) + points = ev.matrixIterSlice(it, mint, maxt, points) if len(points) == 0 { continue } @@ -873,6 +878,8 @@ func (ev *evaluator) eval(expr Expr) Value { if len(outVec) > 0 { ss.Points = append(ss.Points, Point{V: outVec[0].Point.V, T: ts}) } + // Only buffer stepRange milliseconds from the second step on. + it.ReduceDelta(stepRange) } if len(ss.Points) > 0 { mat = append(mat, ss) @@ -1067,8 +1074,32 @@ func (ev *evaluator) matrixSelector(node *MatrixSelector) Matrix { return matrix } -// matrixIterSlice evaluates a matrix vector for the iterator of one time series. +// matrixIterSlice populates a matrix vector covering the requested range for a +// single time series, with points retrieved from an iterator. +// +// As an optimization, the matrix vector may already contain points of the same +// time series from the evaluation of an earlier step (with lower mint and maxt +// values). Any such points falling before mint are discarded; points that fall +// into the [mint, maxt] range are retained; only points with later timestamps +// are populated from the iterator. func (ev *evaluator) matrixIterSlice(it *storage.BufferedSeriesIterator, mint, maxt int64, out []Point) []Point { + if len(out) > 0 && out[len(out)-1].T >= mint { + // There is an overlap between previous and current ranges, retain common + // points. In most such cases: + // (a) the overlap is significantly larger than the eval step; and/or + // (b) the number of samples is relatively small. + // so a linear search will be as fast as a binary search. + var drop int + for drop = 0; out[drop].T < mint; drop++ { + } + copy(out, out[drop:]) + out = out[:len(out)-drop] + // Only append points with timestamps after the last timestamp we have. + mint = out[len(out)-1].T + 1 + } else { + out = out[:0] + } + ok := it.Seek(maxt) if !ok { if it.Err() != nil { diff --git a/storage/buffer.go b/storage/buffer.go index b45b55265..55a0c604e 100644 --- a/storage/buffer.go +++ b/storage/buffer.go @@ -19,8 +19,9 @@ import ( // BufferedSeriesIterator wraps an iterator with a look-back buffer. type BufferedSeriesIterator struct { - it SeriesIterator - buf *sampleRing + it SeriesIterator + buf *sampleRing + delta int64 lastTime int64 ok bool @@ -37,22 +38,34 @@ func NewBuffer(delta int64) *BufferedSeriesIterator { // time range of the current element and the duration of delta before. func NewBufferIterator(it SeriesIterator, delta int64) *BufferedSeriesIterator { bit := &BufferedSeriesIterator{ - buf: newSampleRing(delta, 16), + buf: newSampleRing(delta, 16), + delta: delta, } bit.Reset(it) return bit } -// Reset re-uses the buffer with a new iterator. +// Reset re-uses the buffer with a new iterator, resetting the buffered time +// delta to its original value. func (b *BufferedSeriesIterator) Reset(it SeriesIterator) { b.it = it b.lastTime = math.MinInt64 b.ok = true b.buf.reset() + b.buf.delta = b.delta it.Next() } +// ReduceDelta lowers the buffered time delta, for the current SeriesIterator only. +func (b *BufferedSeriesIterator) ReduceDelta(delta int64) bool { + if delta > b.buf.delta { + return false + } + b.buf.delta = delta + return true +} + // PeekBack returns the nth previous element of the iterator. If there is none buffered, // ok is false. func (b *BufferedSeriesIterator) PeekBack(n int) (t int64, v float64, ok bool) {