mirror of
https://github.com/prometheus/prometheus
synced 2025-03-04 20:49:51 +00:00
Reuse (copy) overlapping matrix samples between range evaluation steps (#4315)
* Reuse (copy) overlapping matrix samples between range evaluation steps. Signed-off-by: Alin Sinpalean <alin.sinpalean@gmail.com>
This commit is contained in:
parent
0be25f92e2
commit
372e7652b7
@ -82,6 +82,10 @@ func BenchmarkRangeQuery(b *testing.B) {
|
|||||||
steps int
|
steps int
|
||||||
}
|
}
|
||||||
cases := []benchCase{
|
cases := []benchCase{
|
||||||
|
// Plain retrieval.
|
||||||
|
{
|
||||||
|
expr: "a_X",
|
||||||
|
},
|
||||||
// Simple rate.
|
// Simple rate.
|
||||||
{
|
{
|
||||||
expr: "rate(a_X[1m])",
|
expr: "rate(a_X[1m])",
|
||||||
|
@ -831,6 +831,10 @@ func (ev *evaluator) eval(expr Expr) Value {
|
|||||||
mat := make(Matrix, 0, len(sel.series)) // Output matrix.
|
mat := make(Matrix, 0, len(sel.series)) // Output matrix.
|
||||||
offset := durationMilliseconds(sel.Offset)
|
offset := durationMilliseconds(sel.Offset)
|
||||||
selRange := durationMilliseconds(sel.Range)
|
selRange := durationMilliseconds(sel.Range)
|
||||||
|
stepRange := selRange
|
||||||
|
if stepRange > ev.interval {
|
||||||
|
stepRange = ev.interval
|
||||||
|
}
|
||||||
// Reuse objects across steps to save memory allocations.
|
// Reuse objects across steps to save memory allocations.
|
||||||
points := getPointSlice(16)
|
points := getPointSlice(16)
|
||||||
inMatrix := make(Matrix, 1)
|
inMatrix := make(Matrix, 1)
|
||||||
@ -839,6 +843,7 @@ func (ev *evaluator) eval(expr Expr) Value {
|
|||||||
// Process all the calls for one time series at a time.
|
// Process all the calls for one time series at a time.
|
||||||
it := storage.NewBuffer(selRange)
|
it := storage.NewBuffer(selRange)
|
||||||
for i, s := range sel.series {
|
for i, s := range sel.series {
|
||||||
|
points = points[:0]
|
||||||
it.Reset(s.Iterator())
|
it.Reset(s.Iterator())
|
||||||
ss := Series{
|
ss := Series{
|
||||||
// For all range vector functions, the only change to the
|
// For all range vector functions, the only change to the
|
||||||
@ -861,7 +866,7 @@ func (ev *evaluator) eval(expr Expr) Value {
|
|||||||
maxt := ts - offset
|
maxt := ts - offset
|
||||||
mint := maxt - selRange
|
mint := maxt - selRange
|
||||||
// Evaluate the matrix selector for this series for this step.
|
// Evaluate the matrix selector for this series for this step.
|
||||||
points = ev.matrixIterSlice(it, mint, maxt, points[:0])
|
points = ev.matrixIterSlice(it, mint, maxt, points)
|
||||||
if len(points) == 0 {
|
if len(points) == 0 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@ -873,6 +878,8 @@ func (ev *evaluator) eval(expr Expr) Value {
|
|||||||
if len(outVec) > 0 {
|
if len(outVec) > 0 {
|
||||||
ss.Points = append(ss.Points, Point{V: outVec[0].Point.V, T: ts})
|
ss.Points = append(ss.Points, Point{V: outVec[0].Point.V, T: ts})
|
||||||
}
|
}
|
||||||
|
// Only buffer stepRange milliseconds from the second step on.
|
||||||
|
it.ReduceDelta(stepRange)
|
||||||
}
|
}
|
||||||
if len(ss.Points) > 0 {
|
if len(ss.Points) > 0 {
|
||||||
mat = append(mat, ss)
|
mat = append(mat, ss)
|
||||||
@ -1067,8 +1074,32 @@ func (ev *evaluator) matrixSelector(node *MatrixSelector) Matrix {
|
|||||||
return matrix
|
return matrix
|
||||||
}
|
}
|
||||||
|
|
||||||
// matrixIterSlice evaluates a matrix vector for the iterator of one time series.
|
// matrixIterSlice populates a matrix vector covering the requested range for a
|
||||||
|
// single time series, with points retrieved from an iterator.
|
||||||
|
//
|
||||||
|
// As an optimization, the matrix vector may already contain points of the same
|
||||||
|
// time series from the evaluation of an earlier step (with lower mint and maxt
|
||||||
|
// values). Any such points falling before mint are discarded; points that fall
|
||||||
|
// into the [mint, maxt] range are retained; only points with later timestamps
|
||||||
|
// are populated from the iterator.
|
||||||
func (ev *evaluator) matrixIterSlice(it *storage.BufferedSeriesIterator, mint, maxt int64, out []Point) []Point {
|
func (ev *evaluator) matrixIterSlice(it *storage.BufferedSeriesIterator, mint, maxt int64, out []Point) []Point {
|
||||||
|
if len(out) > 0 && out[len(out)-1].T >= mint {
|
||||||
|
// There is an overlap between previous and current ranges, retain common
|
||||||
|
// points. In most such cases:
|
||||||
|
// (a) the overlap is significantly larger than the eval step; and/or
|
||||||
|
// (b) the number of samples is relatively small.
|
||||||
|
// so a linear search will be as fast as a binary search.
|
||||||
|
var drop int
|
||||||
|
for drop = 0; out[drop].T < mint; drop++ {
|
||||||
|
}
|
||||||
|
copy(out, out[drop:])
|
||||||
|
out = out[:len(out)-drop]
|
||||||
|
// Only append points with timestamps after the last timestamp we have.
|
||||||
|
mint = out[len(out)-1].T + 1
|
||||||
|
} else {
|
||||||
|
out = out[:0]
|
||||||
|
}
|
||||||
|
|
||||||
ok := it.Seek(maxt)
|
ok := it.Seek(maxt)
|
||||||
if !ok {
|
if !ok {
|
||||||
if it.Err() != nil {
|
if it.Err() != nil {
|
||||||
|
@ -19,8 +19,9 @@ import (
|
|||||||
|
|
||||||
// BufferedSeriesIterator wraps an iterator with a look-back buffer.
|
// BufferedSeriesIterator wraps an iterator with a look-back buffer.
|
||||||
type BufferedSeriesIterator struct {
|
type BufferedSeriesIterator struct {
|
||||||
it SeriesIterator
|
it SeriesIterator
|
||||||
buf *sampleRing
|
buf *sampleRing
|
||||||
|
delta int64
|
||||||
|
|
||||||
lastTime int64
|
lastTime int64
|
||||||
ok bool
|
ok bool
|
||||||
@ -37,22 +38,34 @@ func NewBuffer(delta int64) *BufferedSeriesIterator {
|
|||||||
// time range of the current element and the duration of delta before.
|
// time range of the current element and the duration of delta before.
|
||||||
func NewBufferIterator(it SeriesIterator, delta int64) *BufferedSeriesIterator {
|
func NewBufferIterator(it SeriesIterator, delta int64) *BufferedSeriesIterator {
|
||||||
bit := &BufferedSeriesIterator{
|
bit := &BufferedSeriesIterator{
|
||||||
buf: newSampleRing(delta, 16),
|
buf: newSampleRing(delta, 16),
|
||||||
|
delta: delta,
|
||||||
}
|
}
|
||||||
bit.Reset(it)
|
bit.Reset(it)
|
||||||
|
|
||||||
return bit
|
return bit
|
||||||
}
|
}
|
||||||
|
|
||||||
// Reset re-uses the buffer with a new iterator.
|
// Reset re-uses the buffer with a new iterator, resetting the buffered time
|
||||||
|
// delta to its original value.
|
||||||
func (b *BufferedSeriesIterator) Reset(it SeriesIterator) {
|
func (b *BufferedSeriesIterator) Reset(it SeriesIterator) {
|
||||||
b.it = it
|
b.it = it
|
||||||
b.lastTime = math.MinInt64
|
b.lastTime = math.MinInt64
|
||||||
b.ok = true
|
b.ok = true
|
||||||
b.buf.reset()
|
b.buf.reset()
|
||||||
|
b.buf.delta = b.delta
|
||||||
it.Next()
|
it.Next()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ReduceDelta lowers the buffered time delta, for the current SeriesIterator only.
|
||||||
|
func (b *BufferedSeriesIterator) ReduceDelta(delta int64) bool {
|
||||||
|
if delta > b.buf.delta {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
b.buf.delta = delta
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
// PeekBack returns the nth previous element of the iterator. If there is none buffered,
|
// PeekBack returns the nth previous element of the iterator. If there is none buffered,
|
||||||
// ok is false.
|
// ok is false.
|
||||||
func (b *BufferedSeriesIterator) PeekBack(n int) (t int64, v float64, ok bool) {
|
func (b *BufferedSeriesIterator) PeekBack(n int) (t int64, v float64, ok bool) {
|
||||||
|
Loading…
Reference in New Issue
Block a user