diff --git a/promql/bench_test.go b/promql/bench_test.go index 6818498bf..c6a528f7b 100644 --- a/promql/bench_test.go +++ b/promql/bench_test.go @@ -186,6 +186,10 @@ func rangeQueryCases() []benchCase { expr: "count({__name__!=\"\",l=\"\"})", steps: 1, }, + // Functions which have special handling inside eval() + { + expr: "timestamp(a_X)", + }, } // X in an expr will be replaced by different metric sizes. diff --git a/promql/engine.go b/promql/engine.go index e2092a800..cafbab4e3 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -1387,15 +1387,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) { unwrapParenExpr(&arg) vs, ok := arg.(*parser.VectorSelector) if ok { - return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, storage.Warnings) { - if vs.Timestamp != nil { - // This is a special case only for "timestamp" since the offset - // needs to be adjusted for every point. - vs.Offset = time.Duration(enh.Ts-*vs.Timestamp) * time.Millisecond - } - val, ws := ev.vectorSelector(vs, enh.Ts) - return call([]parser.Value{val}, e.Args, enh), ws - }) + return ev.rangeEvalTimestampFunctionOverVectorSelector(vs, call, e) } } @@ -1833,38 +1825,48 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) { panic(fmt.Errorf("unhandled expression of type: %T", expr)) } -// vectorSelector evaluates a *parser.VectorSelector expression. -func (ev *evaluator) vectorSelector(node *parser.VectorSelector, ts int64) (Vector, storage.Warnings) { - ws, err := checkAndExpandSeriesSet(ev.ctx, node) +func (ev *evaluator) rangeEvalTimestampFunctionOverVectorSelector(vs *parser.VectorSelector, call FunctionCall, e *parser.Call) (parser.Value, storage.Warnings) { + ws, err := checkAndExpandSeriesSet(ev.ctx, vs) if err != nil { ev.error(errWithWarnings{fmt.Errorf("expanding series: %w", err), ws}) } - vec := make(Vector, 0, len(node.Series)) - it := storage.NewMemoizedEmptyIterator(durationMilliseconds(ev.lookbackDelta)) - var chkIter chunkenc.Iterator - for i, s := range node.Series { - chkIter = s.Iterator(chkIter) - it.Reset(chkIter) - t, f, h, ok := ev.vectorSelectorSingle(it, node, ts) - if ok { - vec = append(vec, Sample{ - Metric: node.Series[i].Labels(), - T: t, - F: f, - H: h, - }) + seriesIterators := make([]*storage.MemoizedSeriesIterator, len(vs.Series)) + for i, s := range vs.Series { + it := s.Iterator(nil) + seriesIterators[i] = storage.NewMemoizedIterator(it, durationMilliseconds(ev.lookbackDelta)) + } - ev.currentSamples++ - ev.samplesStats.IncrementSamplesAtTimestamp(ts, 1) - if ev.currentSamples > ev.maxSamples { - ev.error(ErrTooManySamples(env)) - } + return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, storage.Warnings) { + if vs.Timestamp != nil { + // This is a special case for "timestamp()" when the @ modifier is used, to ensure that + // we return a point for each time step in this case. + // See https://github.com/prometheus/prometheus/issues/8433. + vs.Offset = time.Duration(enh.Ts-*vs.Timestamp) * time.Millisecond } - } - ev.samplesStats.UpdatePeak(ev.currentSamples) - return vec, ws + vec := make(Vector, 0, len(vs.Series)) + for i, s := range vs.Series { + it := seriesIterators[i] + t, f, h, ok := ev.vectorSelectorSingle(it, vs, enh.Ts) + if ok { + vec = append(vec, Sample{ + Metric: s.Labels(), + T: t, + F: f, + H: h, + }) + + ev.currentSamples++ + ev.samplesStats.IncrementSamplesAtTimestamp(enh.Ts, 1) + if ev.currentSamples > ev.maxSamples { + ev.error(ErrTooManySamples(env)) + } + } + } + ev.samplesStats.UpdatePeak(ev.currentSamples) + return call([]parser.Value{vec}, e.Args, enh), ws + }) } // vectorSelectorSingle evaluates an instant vector for the iterator of one time series. diff --git a/promql/engine_test.go b/promql/engine_test.go index ca4a022e0..54567d154 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -1977,6 +1977,50 @@ func TestSubquerySelector(t *testing.T) { } } +func TestTimestampFunction_StepsMoreOftenThanSamples(t *testing.T) { + test, err := NewTest(t, ` +load 1m + metric 0+1x1000 +`) + require.NoError(t, err) + defer test.Close() + + err = test.Run() + require.NoError(t, err) + + query := "timestamp(metric)" + start := time.Unix(0, 0) + end := time.Unix(61, 0) + interval := time.Second + + // We expect the value to be 0 for t=0s to t=59s (inclusive), then 60 for t=60s and t=61s. + expectedPoints := []FPoint{} + + for t := 0; t <= 59; t++ { + expectedPoints = append(expectedPoints, FPoint{F: 0, T: int64(t * 1000)}) + } + + expectedPoints = append( + expectedPoints, + FPoint{F: 60, T: 60_000}, + FPoint{F: 60, T: 61_000}, + ) + + expectedResult := Matrix{ + Series{ + Floats: expectedPoints, + Metric: labels.EmptyLabels(), + }, + } + + qry, err := test.QueryEngine().NewRangeQuery(test.context, test.Queryable(), nil, query, start, end, interval) + require.NoError(t, err) + + res := qry.Exec(test.Context()) + require.NoError(t, res.Err) + require.Equal(t, expectedResult, res.Value) +} + type FakeQueryLogger struct { closed bool logs []interface{}