query/query_range should return eval timestamp
Query and query_range should return the timestamp at which an evaluation is performed, not the timestamp of the data. This is as that's what query range asked for, and we need to keep query consistent with that. Query for a matrix remains unchanged, returning the literal matrix.
This commit is contained in:
parent
517b81f927
commit
fcc88f0e1e
|
@ -363,8 +363,9 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (
|
|||
evalTimer := query.stats.GetTimer(stats.InnerEvalTime).Start()
|
||||
// Instant evaluation.
|
||||
if s.Start == s.End && s.Interval == 0 {
|
||||
start := timeMilliseconds(s.Start)
|
||||
evaluator := &evaluator{
|
||||
Timestamp: timeMilliseconds(s.Start),
|
||||
Timestamp: start,
|
||||
ctx: ctx,
|
||||
}
|
||||
val, err := evaluator.Eval(s.Expr)
|
||||
|
@ -374,6 +375,16 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (
|
|||
|
||||
evalTimer.Stop()
|
||||
queryInnerEval.Observe(evalTimer.ElapsedTime().Seconds())
|
||||
// Point might have a different timestamp, force it to the evaluation
|
||||
// timestamp as that is when we ran the evaluation.
|
||||
switch v := val.(type) {
|
||||
case Scalar:
|
||||
v.T = start
|
||||
case Vector:
|
||||
for i := range v {
|
||||
v[i].Point.T = start
|
||||
}
|
||||
}
|
||||
|
||||
return val, nil
|
||||
}
|
||||
|
@ -387,8 +398,9 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (
|
|||
return nil, err
|
||||
}
|
||||
|
||||
t := timeMilliseconds(ts)
|
||||
evaluator := &evaluator{
|
||||
Timestamp: timeMilliseconds(ts),
|
||||
Timestamp: t,
|
||||
ctx: ctx,
|
||||
}
|
||||
val, err := evaluator.Eval(s.Expr)
|
||||
|
@ -405,7 +417,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (
|
|||
ss = Series{Points: make([]Point, 0, numSteps)}
|
||||
Seriess[0] = ss
|
||||
}
|
||||
ss.Points = append(ss.Points, Point(v))
|
||||
ss.Points = append(ss.Points, Point{V: v.V, T: t})
|
||||
Seriess[0] = ss
|
||||
case Vector:
|
||||
for _, sample := range v {
|
||||
|
@ -418,6 +430,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (
|
|||
}
|
||||
Seriess[h] = ss
|
||||
}
|
||||
sample.Point.T = t
|
||||
ss.Points = append(ss.Points, sample.Point)
|
||||
Seriess[h] = ss
|
||||
}
|
||||
|
|
|
@ -15,10 +15,13 @@ package promql
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
|
||||
"github.com/prometheus/prometheus/pkg/labels"
|
||||
)
|
||||
|
||||
func TestQueryConcurrency(t *testing.T) {
|
||||
|
@ -194,6 +197,93 @@ func TestEngineShutdown(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestEngineEvalStmtTimestamps(t *testing.T) {
|
||||
test, err := NewTest(t, `
|
||||
load 10s
|
||||
metric 1 2
|
||||
`)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error creating test: %q", err)
|
||||
}
|
||||
err = test.Run()
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error initializing test: %q", err)
|
||||
}
|
||||
|
||||
cases := []struct {
|
||||
Query string
|
||||
Result Value
|
||||
Start time.Time
|
||||
End time.Time
|
||||
Interval time.Duration
|
||||
}{
|
||||
// Instant queries.
|
||||
{
|
||||
Query: "1",
|
||||
Result: Scalar{V: 1, T: 1000},
|
||||
Start: time.Unix(1, 0),
|
||||
},
|
||||
{
|
||||
Query: "metric",
|
||||
Result: Vector{
|
||||
Sample{Point: Point{V: 1, T: 1000},
|
||||
Metric: labels.FromStrings("__name__", "metric")},
|
||||
},
|
||||
Start: time.Unix(1, 0),
|
||||
},
|
||||
{
|
||||
Query: "metric[20s]",
|
||||
Result: Matrix{Series{
|
||||
Points: []Point{{V: 1, T: 0}, {V: 2, T: 10000}},
|
||||
Metric: labels.FromStrings("__name__", "metric")},
|
||||
},
|
||||
Start: time.Unix(10, 0),
|
||||
},
|
||||
// Range queries.
|
||||
{
|
||||
Query: "1",
|
||||
Result: Matrix{Series{
|
||||
Points: []Point{{V: 1, T: 0}, {V: 1, T: 1000}, {V: 1, T: 2000}},
|
||||
Metric: labels.FromStrings()},
|
||||
},
|
||||
Start: time.Unix(0, 0),
|
||||
End: time.Unix(2, 0),
|
||||
Interval: time.Second,
|
||||
},
|
||||
{
|
||||
Query: "metric",
|
||||
Result: Matrix{Series{
|
||||
Points: []Point{{V: 1, T: 0}, {V: 1, T: 1000}, {V: 2, T: 2000}},
|
||||
Metric: labels.FromStrings("__name__", "metric")},
|
||||
},
|
||||
Start: time.Unix(0, 0),
|
||||
End: time.Unix(2, 0),
|
||||
Interval: time.Second,
|
||||
},
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
var err error
|
||||
var qry Query
|
||||
if c.Interval == 0 {
|
||||
qry, err = test.QueryEngine().NewInstantQuery(c.Query, c.Start)
|
||||
} else {
|
||||
qry, err = test.QueryEngine().NewRangeQuery(c.Query, c.Start, c.End, c.Interval)
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error creating query: %q", err)
|
||||
}
|
||||
res := qry.Exec(test.Context())
|
||||
if res.Err != nil {
|
||||
t.Fatalf("unexpected error running query: %q", res.Err)
|
||||
}
|
||||
if !reflect.DeepEqual(res.Value, c.Result) {
|
||||
t.Fatalf("unexpected result for query %q: got %q wanted %q", c.Query, res.Value.String(), c.Result.String())
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestRecoverEvaluatorRuntime(t *testing.T) {
|
||||
var ev *evaluator
|
||||
var err error
|
||||
|
|
Loading…
Reference in New Issue