mirror of
https://github.com/prometheus/prometheus
synced 2025-01-16 03:41:50 +00:00
9adad8ad30
Since we use ActiveQueryTracker to check for concurrency in
d992c36b3a
it does not make sense to keep
the MaxConcurrent value as an option of the PromQL engine.
This pull request removes it from the PromQL engine options, sets the
max concurrent metric to -1 if there is no active query tracker, and use
the value of the active query tracker otherwise.
It removes dead code and also will inform people who import the promql
package that we made that change, as it breaks the EngineOpts struct.
Signed-off-by: Julien Pivotto <roidelapluie@inuits.eu>
1251 lines
30 KiB
Go
1251 lines
30 KiB
Go
// Copyright 2016 The Prometheus Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package promql
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"io/ioutil"
|
|
"os"
|
|
"sort"
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/go-kit/kit/log"
|
|
|
|
"github.com/prometheus/prometheus/pkg/labels"
|
|
"github.com/prometheus/prometheus/storage"
|
|
"github.com/prometheus/prometheus/util/testutil"
|
|
)
|
|
|
|
func TestQueryConcurrency(t *testing.T) {
|
|
maxConcurrency := 10
|
|
|
|
dir, err := ioutil.TempDir("", "test_concurrency")
|
|
testutil.Ok(t, err)
|
|
defer os.RemoveAll(dir)
|
|
queryTracker := NewActiveQueryTracker(dir, maxConcurrency, nil)
|
|
|
|
opts := EngineOpts{
|
|
Logger: nil,
|
|
Reg: nil,
|
|
MaxSamples: 10,
|
|
Timeout: 100 * time.Second,
|
|
ActiveQueryTracker: queryTracker,
|
|
}
|
|
|
|
engine := NewEngine(opts)
|
|
ctx, cancelCtx := context.WithCancel(context.Background())
|
|
defer cancelCtx()
|
|
|
|
block := make(chan struct{})
|
|
processing := make(chan struct{})
|
|
|
|
f := func(context.Context) error {
|
|
processing <- struct{}{}
|
|
<-block
|
|
return nil
|
|
}
|
|
|
|
for i := 0; i < maxConcurrency; i++ {
|
|
q := engine.newTestQuery(f)
|
|
go q.Exec(ctx)
|
|
select {
|
|
case <-processing:
|
|
// Expected.
|
|
case <-time.After(20 * time.Millisecond):
|
|
t.Fatalf("Query within concurrency threshold not being executed")
|
|
}
|
|
}
|
|
|
|
q := engine.newTestQuery(f)
|
|
go q.Exec(ctx)
|
|
|
|
select {
|
|
case <-processing:
|
|
t.Fatalf("Query above concurrency threshold being executed")
|
|
case <-time.After(20 * time.Millisecond):
|
|
// Expected.
|
|
}
|
|
|
|
// Terminate a running query.
|
|
block <- struct{}{}
|
|
|
|
select {
|
|
case <-processing:
|
|
// Expected.
|
|
case <-time.After(20 * time.Millisecond):
|
|
t.Fatalf("Query within concurrency threshold not being executed")
|
|
}
|
|
|
|
// Terminate remaining queries.
|
|
for i := 0; i < maxConcurrency; i++ {
|
|
block <- struct{}{}
|
|
}
|
|
}
|
|
|
|
func TestQueryTimeout(t *testing.T) {
|
|
opts := EngineOpts{
|
|
Logger: nil,
|
|
Reg: nil,
|
|
MaxSamples: 10,
|
|
Timeout: 5 * time.Millisecond,
|
|
}
|
|
engine := NewEngine(opts)
|
|
ctx, cancelCtx := context.WithCancel(context.Background())
|
|
defer cancelCtx()
|
|
|
|
query := engine.newTestQuery(func(ctx context.Context) error {
|
|
time.Sleep(100 * time.Millisecond)
|
|
return contextDone(ctx, "test statement execution")
|
|
})
|
|
|
|
res := query.Exec(ctx)
|
|
testutil.NotOk(t, res.Err, "expected timeout error but got none")
|
|
|
|
var e ErrQueryTimeout
|
|
// TODO: when circleci-windows moves to go 1.13:
|
|
// testutil.Assert(t, errors.As(res.Err, &e), "expected timeout error but got: %s", res.Err)
|
|
testutil.Assert(t, strings.HasPrefix(res.Err.Error(), e.Error()), "expected timeout error but got: %s", res.Err)
|
|
}
|
|
|
|
const errQueryCanceled = ErrQueryCanceled("test statement execution")
|
|
|
|
func TestQueryCancel(t *testing.T) {
|
|
opts := EngineOpts{
|
|
Logger: nil,
|
|
Reg: nil,
|
|
MaxSamples: 10,
|
|
Timeout: 10 * time.Second,
|
|
}
|
|
engine := NewEngine(opts)
|
|
ctx, cancelCtx := context.WithCancel(context.Background())
|
|
defer cancelCtx()
|
|
|
|
// Cancel a running query before it completes.
|
|
block := make(chan struct{})
|
|
processing := make(chan struct{})
|
|
|
|
query1 := engine.newTestQuery(func(ctx context.Context) error {
|
|
processing <- struct{}{}
|
|
<-block
|
|
return contextDone(ctx, "test statement execution")
|
|
})
|
|
|
|
var res *Result
|
|
|
|
go func() {
|
|
res = query1.Exec(ctx)
|
|
processing <- struct{}{}
|
|
}()
|
|
|
|
<-processing
|
|
query1.Cancel()
|
|
block <- struct{}{}
|
|
<-processing
|
|
|
|
testutil.NotOk(t, res.Err, "expected cancellation error for query1 but got none")
|
|
testutil.Equals(t, errQueryCanceled, res.Err)
|
|
|
|
// Canceling a query before starting it must have no effect.
|
|
query2 := engine.newTestQuery(func(ctx context.Context) error {
|
|
return contextDone(ctx, "test statement execution")
|
|
})
|
|
|
|
query2.Cancel()
|
|
res = query2.Exec(ctx)
|
|
testutil.Ok(t, res.Err)
|
|
}
|
|
|
|
// errQuerier implements storage.Querier which always returns error.
|
|
type errQuerier struct {
|
|
err error
|
|
}
|
|
|
|
func (q *errQuerier) Select(*storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
|
|
return errSeriesSet{err: q.err}, nil, q.err
|
|
}
|
|
func (q *errQuerier) SelectSorted(*storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
|
|
return errSeriesSet{err: q.err}, nil, q.err
|
|
}
|
|
func (*errQuerier) LabelValues(name string) ([]string, storage.Warnings, error) { return nil, nil, nil }
|
|
func (*errQuerier) LabelNames() ([]string, storage.Warnings, error) { return nil, nil, nil }
|
|
func (*errQuerier) Close() error { return nil }
|
|
|
|
// errSeriesSet implements storage.SeriesSet which always returns error.
|
|
type errSeriesSet struct {
|
|
err error
|
|
}
|
|
|
|
func (errSeriesSet) Next() bool { return false }
|
|
func (errSeriesSet) At() storage.Series { return nil }
|
|
func (e errSeriesSet) Err() error { return e.err }
|
|
|
|
func TestQueryError(t *testing.T) {
|
|
opts := EngineOpts{
|
|
Logger: nil,
|
|
Reg: nil,
|
|
MaxSamples: 10,
|
|
Timeout: 10 * time.Second,
|
|
}
|
|
engine := NewEngine(opts)
|
|
errStorage := ErrStorage{errors.New("storage error")}
|
|
queryable := storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
|
|
return &errQuerier{err: errStorage}, nil
|
|
})
|
|
ctx, cancelCtx := context.WithCancel(context.Background())
|
|
defer cancelCtx()
|
|
|
|
vectorQuery, err := engine.NewInstantQuery(queryable, "foo", time.Unix(1, 0))
|
|
testutil.Ok(t, err)
|
|
|
|
res := vectorQuery.Exec(ctx)
|
|
testutil.NotOk(t, res.Err, "expected error on failed select but got none")
|
|
testutil.Equals(t, errStorage, res.Err)
|
|
|
|
matrixQuery, err := engine.NewInstantQuery(queryable, "foo[1m]", time.Unix(1, 0))
|
|
testutil.Ok(t, err)
|
|
|
|
res = matrixQuery.Exec(ctx)
|
|
testutil.NotOk(t, res.Err, "expected error on failed select but got none")
|
|
testutil.Equals(t, errStorage, res.Err)
|
|
}
|
|
|
|
// paramCheckerQuerier implements storage.Querier which checks the start and end times
|
|
// in params.
|
|
type paramCheckerQuerier struct {
|
|
start int64
|
|
end int64
|
|
grouping []string
|
|
by bool
|
|
selRange int64
|
|
function string
|
|
|
|
t *testing.T
|
|
}
|
|
|
|
func (q *paramCheckerQuerier) Select(sp *storage.SelectParams, m ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
|
|
return q.SelectSorted(sp, m...)
|
|
}
|
|
func (q *paramCheckerQuerier) SelectSorted(sp *storage.SelectParams, _ ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
|
|
testutil.Equals(q.t, q.start, sp.Start)
|
|
testutil.Equals(q.t, q.end, sp.End)
|
|
testutil.Equals(q.t, q.grouping, sp.Grouping)
|
|
testutil.Equals(q.t, q.by, sp.By)
|
|
testutil.Equals(q.t, q.selRange, sp.Range)
|
|
testutil.Equals(q.t, q.function, sp.Func)
|
|
|
|
return errSeriesSet{err: nil}, nil, nil
|
|
}
|
|
func (*paramCheckerQuerier) LabelValues(name string) ([]string, storage.Warnings, error) {
|
|
return nil, nil, nil
|
|
}
|
|
func (*paramCheckerQuerier) LabelNames() ([]string, storage.Warnings, error) { return nil, nil, nil }
|
|
func (*paramCheckerQuerier) Close() error { return nil }
|
|
|
|
func TestParamsSetCorrectly(t *testing.T) {
|
|
opts := EngineOpts{
|
|
Logger: nil,
|
|
Reg: nil,
|
|
MaxSamples: 10,
|
|
Timeout: 10 * time.Second,
|
|
}
|
|
|
|
// Set the lookback to be smaller and reset at the end.
|
|
currLookback := LookbackDelta
|
|
LookbackDelta = 5 * time.Second
|
|
defer func() {
|
|
LookbackDelta = currLookback
|
|
}()
|
|
|
|
cases := []struct {
|
|
query string
|
|
|
|
// All times are in seconds.
|
|
start int64
|
|
end int64
|
|
|
|
paramStart int64
|
|
paramEnd int64
|
|
|
|
paramGrouping []string
|
|
paramBy bool
|
|
paramRange int64
|
|
paramFunc string
|
|
}{{
|
|
query: "foo",
|
|
start: 10,
|
|
|
|
paramStart: 5,
|
|
paramEnd: 10,
|
|
}, {
|
|
query: "foo[2m]",
|
|
start: 200,
|
|
|
|
paramStart: 80, // 200 - 120
|
|
paramEnd: 200,
|
|
paramRange: 120000,
|
|
}, {
|
|
query: "foo[2m] offset 2m",
|
|
start: 300,
|
|
|
|
paramStart: 60,
|
|
paramEnd: 180,
|
|
paramRange: 120000,
|
|
}, {
|
|
query: "foo[2m:1s]",
|
|
start: 300,
|
|
|
|
paramStart: 175, // 300 - 120 - 5
|
|
paramEnd: 300,
|
|
}, {
|
|
query: "count_over_time(foo[2m:1s])",
|
|
start: 300,
|
|
|
|
paramStart: 175, // 300 - 120 - 5
|
|
paramEnd: 300,
|
|
paramFunc: "count_over_time",
|
|
}, {
|
|
query: "count_over_time(foo[2m:1s] offset 10s)",
|
|
start: 300,
|
|
|
|
paramStart: 165, // 300 - 120 - 5 - 10
|
|
paramEnd: 300,
|
|
paramFunc: "count_over_time",
|
|
}, {
|
|
query: "count_over_time((foo offset 10s)[2m:1s] offset 10s)",
|
|
start: 300,
|
|
|
|
paramStart: 155, // 300 - 120 - 5 - 10 - 10
|
|
paramEnd: 290,
|
|
paramFunc: "count_over_time",
|
|
}, {
|
|
// Range queries now.
|
|
query: "foo",
|
|
start: 10,
|
|
end: 20,
|
|
|
|
paramStart: 5,
|
|
paramEnd: 20,
|
|
}, {
|
|
query: "rate(foo[2m])",
|
|
start: 200,
|
|
end: 500,
|
|
|
|
paramStart: 80, // 200 - 120
|
|
paramEnd: 500,
|
|
paramRange: 120000,
|
|
paramFunc: "rate",
|
|
}, {
|
|
query: "rate(foo[2m] offset 2m)",
|
|
start: 300,
|
|
end: 500,
|
|
|
|
paramStart: 60,
|
|
paramEnd: 380,
|
|
paramRange: 120000,
|
|
paramFunc: "rate",
|
|
}, {
|
|
query: "rate(foo[2m:1s])",
|
|
start: 300,
|
|
end: 500,
|
|
|
|
paramStart: 175, // 300 - 120 - 5
|
|
paramEnd: 500,
|
|
paramFunc: "rate",
|
|
}, {
|
|
query: "count_over_time(foo[2m:1s])",
|
|
start: 300,
|
|
end: 500,
|
|
|
|
paramStart: 175, // 300 - 120 - 5
|
|
paramEnd: 500,
|
|
paramFunc: "count_over_time",
|
|
}, {
|
|
query: "count_over_time(foo[2m:1s] offset 10s)",
|
|
start: 300,
|
|
end: 500,
|
|
|
|
paramStart: 165, // 300 - 120 - 5 - 10
|
|
paramEnd: 500,
|
|
paramFunc: "count_over_time",
|
|
}, {
|
|
query: "count_over_time((foo offset 10s)[2m:1s] offset 10s)",
|
|
start: 300,
|
|
end: 500,
|
|
|
|
paramStart: 155, // 300 - 120 - 5 - 10 - 10
|
|
paramEnd: 490,
|
|
paramFunc: "count_over_time",
|
|
}, {
|
|
query: "sum by (dim1) (foo)",
|
|
start: 10,
|
|
|
|
paramStart: 5,
|
|
paramEnd: 10,
|
|
paramGrouping: []string{"dim1"},
|
|
paramBy: true,
|
|
paramFunc: "sum",
|
|
}, {
|
|
query: "sum without (dim1) (foo)",
|
|
start: 10,
|
|
|
|
paramStart: 5,
|
|
paramEnd: 10,
|
|
paramGrouping: []string{"dim1"},
|
|
paramBy: false,
|
|
paramFunc: "sum",
|
|
}, {
|
|
query: "sum by (dim1) (avg_over_time(foo[1s]))",
|
|
start: 10,
|
|
|
|
paramStart: 9,
|
|
paramEnd: 10,
|
|
paramGrouping: nil,
|
|
paramBy: false,
|
|
paramRange: 1000,
|
|
paramFunc: "avg_over_time",
|
|
}, {
|
|
query: "sum by (dim1) (max by (dim2) (foo))",
|
|
start: 10,
|
|
|
|
paramStart: 5,
|
|
paramEnd: 10,
|
|
paramGrouping: []string{"dim2"},
|
|
paramBy: true,
|
|
paramFunc: "max",
|
|
}, {
|
|
query: "(max by (dim1) (foo))[5s:1s]",
|
|
start: 10,
|
|
|
|
paramStart: 0,
|
|
paramEnd: 10,
|
|
paramGrouping: []string{"dim1"},
|
|
paramBy: true,
|
|
paramFunc: "max",
|
|
}}
|
|
|
|
for _, tc := range cases {
|
|
engine := NewEngine(opts)
|
|
queryable := storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
|
|
return ¶mCheckerQuerier{start: tc.paramStart * 1000, end: tc.paramEnd * 1000, grouping: tc.paramGrouping, by: tc.paramBy, selRange: tc.paramRange, function: tc.paramFunc, t: t}, nil
|
|
})
|
|
|
|
var (
|
|
query Query
|
|
err error
|
|
)
|
|
if tc.end == 0 {
|
|
query, err = engine.NewInstantQuery(queryable, tc.query, time.Unix(tc.start, 0))
|
|
} else {
|
|
query, err = engine.NewRangeQuery(queryable, tc.query, time.Unix(tc.start, 0), time.Unix(tc.end, 0), time.Second)
|
|
}
|
|
testutil.Ok(t, err)
|
|
|
|
res := query.Exec(context.Background())
|
|
testutil.Ok(t, res.Err)
|
|
}
|
|
}
|
|
|
|
func TestEngineShutdown(t *testing.T) {
|
|
opts := EngineOpts{
|
|
Logger: nil,
|
|
Reg: nil,
|
|
MaxSamples: 10,
|
|
Timeout: 10 * time.Second,
|
|
}
|
|
engine := NewEngine(opts)
|
|
ctx, cancelCtx := context.WithCancel(context.Background())
|
|
|
|
block := make(chan struct{})
|
|
processing := make(chan struct{})
|
|
|
|
// Shutdown engine on first handler execution. Should handler execution ever become
|
|
// concurrent this test has to be adjusted accordingly.
|
|
f := func(ctx context.Context) error {
|
|
processing <- struct{}{}
|
|
<-block
|
|
return contextDone(ctx, "test statement execution")
|
|
}
|
|
query1 := engine.newTestQuery(f)
|
|
|
|
// Stopping the engine must cancel the base context. While executing queries is
|
|
// still possible, their context is canceled from the beginning and execution should
|
|
// terminate immediately.
|
|
|
|
var res *Result
|
|
go func() {
|
|
res = query1.Exec(ctx)
|
|
processing <- struct{}{}
|
|
}()
|
|
|
|
<-processing
|
|
cancelCtx()
|
|
block <- struct{}{}
|
|
<-processing
|
|
|
|
testutil.NotOk(t, res.Err, "expected error on shutdown during query but got none")
|
|
testutil.Equals(t, errQueryCanceled, res.Err)
|
|
|
|
query2 := engine.newTestQuery(func(context.Context) error {
|
|
t.Fatalf("reached query execution unexpectedly")
|
|
return nil
|
|
})
|
|
|
|
// The second query is started after the engine shut down. It must
|
|
// be canceled immediately.
|
|
res2 := query2.Exec(ctx)
|
|
testutil.NotOk(t, res2.Err, "expected error on querying with canceled context but got none")
|
|
|
|
var e ErrQueryCanceled
|
|
// TODO: when circleci-windows moves to go 1.13:
|
|
// testutil.Assert(t, errors.As(res2.Err, &e), "expected cancellation error but got: %s", res2.Err)
|
|
testutil.Assert(t, strings.HasPrefix(res2.Err.Error(), e.Error()), "expected cancellation error but got: %s", res2.Err)
|
|
}
|
|
|
|
func TestEngineEvalStmtTimestamps(t *testing.T) {
|
|
test, err := NewTest(t, `
|
|
load 10s
|
|
metric 1 2
|
|
`)
|
|
testutil.Ok(t, err)
|
|
defer test.Close()
|
|
|
|
err = test.Run()
|
|
testutil.Ok(t, err)
|
|
|
|
cases := []struct {
|
|
Query string
|
|
Result Value
|
|
Start time.Time
|
|
End time.Time
|
|
Interval time.Duration
|
|
ShouldError bool
|
|
}{
|
|
// Instant queries.
|
|
{
|
|
Query: "1",
|
|
Result: Scalar{V: 1, T: 1000},
|
|
Start: time.Unix(1, 0),
|
|
},
|
|
{
|
|
Query: "metric",
|
|
Result: Vector{
|
|
Sample{Point: Point{V: 1, T: 1000},
|
|
Metric: labels.FromStrings("__name__", "metric")},
|
|
},
|
|
Start: time.Unix(1, 0),
|
|
},
|
|
{
|
|
Query: "metric[20s]",
|
|
Result: Matrix{Series{
|
|
Points: []Point{{V: 1, T: 0}, {V: 2, T: 10000}},
|
|
Metric: labels.FromStrings("__name__", "metric")},
|
|
},
|
|
Start: time.Unix(10, 0),
|
|
},
|
|
// Range queries.
|
|
{
|
|
Query: "1",
|
|
Result: Matrix{Series{
|
|
Points: []Point{{V: 1, T: 0}, {V: 1, T: 1000}, {V: 1, T: 2000}},
|
|
Metric: labels.FromStrings()},
|
|
},
|
|
Start: time.Unix(0, 0),
|
|
End: time.Unix(2, 0),
|
|
Interval: time.Second,
|
|
},
|
|
{
|
|
Query: "metric",
|
|
Result: Matrix{Series{
|
|
Points: []Point{{V: 1, T: 0}, {V: 1, T: 1000}, {V: 1, T: 2000}},
|
|
Metric: labels.FromStrings("__name__", "metric")},
|
|
},
|
|
Start: time.Unix(0, 0),
|
|
End: time.Unix(2, 0),
|
|
Interval: time.Second,
|
|
},
|
|
{
|
|
Query: "metric",
|
|
Result: Matrix{Series{
|
|
Points: []Point{{V: 1, T: 0}, {V: 1, T: 5000}, {V: 2, T: 10000}},
|
|
Metric: labels.FromStrings("__name__", "metric")},
|
|
},
|
|
Start: time.Unix(0, 0),
|
|
End: time.Unix(10, 0),
|
|
Interval: 5 * time.Second,
|
|
},
|
|
{
|
|
Query: `count_values("wrong label!", metric)`,
|
|
ShouldError: true,
|
|
},
|
|
}
|
|
|
|
for _, c := range cases {
|
|
var err error
|
|
var qry Query
|
|
if c.Interval == 0 {
|
|
qry, err = test.QueryEngine().NewInstantQuery(test.Queryable(), c.Query, c.Start)
|
|
} else {
|
|
qry, err = test.QueryEngine().NewRangeQuery(test.Queryable(), c.Query, c.Start, c.End, c.Interval)
|
|
}
|
|
testutil.Ok(t, err)
|
|
|
|
res := qry.Exec(test.Context())
|
|
if c.ShouldError {
|
|
testutil.NotOk(t, res.Err, "expected error for the query %q", c.Query)
|
|
continue
|
|
}
|
|
|
|
testutil.Ok(t, res.Err)
|
|
testutil.Equals(t, c.Result, res.Value)
|
|
}
|
|
|
|
}
|
|
|
|
func TestMaxQuerySamples(t *testing.T) {
|
|
test, err := NewTest(t, `
|
|
load 10s
|
|
metric 1 2
|
|
`)
|
|
testutil.Ok(t, err)
|
|
defer test.Close()
|
|
|
|
err = test.Run()
|
|
testutil.Ok(t, err)
|
|
|
|
cases := []struct {
|
|
Query string
|
|
MaxSamples int
|
|
Result Result
|
|
Start time.Time
|
|
End time.Time
|
|
Interval time.Duration
|
|
}{
|
|
// Instant queries.
|
|
{
|
|
Query: "1",
|
|
MaxSamples: 1,
|
|
Result: Result{
|
|
nil,
|
|
Scalar{V: 1, T: 1000},
|
|
nil},
|
|
Start: time.Unix(1, 0),
|
|
},
|
|
{
|
|
Query: "1",
|
|
MaxSamples: 0,
|
|
Result: Result{
|
|
ErrTooManySamples(env),
|
|
nil,
|
|
nil,
|
|
},
|
|
Start: time.Unix(1, 0),
|
|
},
|
|
{
|
|
Query: "metric",
|
|
MaxSamples: 0,
|
|
Result: Result{
|
|
ErrTooManySamples(env),
|
|
nil,
|
|
nil,
|
|
},
|
|
Start: time.Unix(1, 0),
|
|
},
|
|
{
|
|
Query: "metric",
|
|
MaxSamples: 1,
|
|
Result: Result{
|
|
nil,
|
|
Vector{
|
|
Sample{Point: Point{V: 1, T: 1000},
|
|
Metric: labels.FromStrings("__name__", "metric")},
|
|
},
|
|
nil,
|
|
},
|
|
Start: time.Unix(1, 0),
|
|
},
|
|
{
|
|
Query: "metric[20s]",
|
|
MaxSamples: 2,
|
|
Result: Result{
|
|
nil,
|
|
Matrix{Series{
|
|
Points: []Point{{V: 1, T: 0}, {V: 2, T: 10000}},
|
|
Metric: labels.FromStrings("__name__", "metric")},
|
|
},
|
|
nil,
|
|
},
|
|
Start: time.Unix(10, 0),
|
|
},
|
|
{
|
|
Query: "rate(metric[20s])",
|
|
MaxSamples: 3,
|
|
Result: Result{
|
|
nil,
|
|
Vector{
|
|
Sample{
|
|
Point: Point{V: 0.1, T: 10000},
|
|
Metric: labels.Labels{},
|
|
},
|
|
},
|
|
nil,
|
|
},
|
|
Start: time.Unix(10, 0),
|
|
},
|
|
{
|
|
Query: "metric[20s:5s]",
|
|
MaxSamples: 3,
|
|
Result: Result{
|
|
nil,
|
|
Matrix{Series{
|
|
Points: []Point{{V: 1, T: 0}, {V: 1, T: 5000}, {V: 2, T: 10000}},
|
|
Metric: labels.FromStrings("__name__", "metric")},
|
|
},
|
|
nil,
|
|
},
|
|
Start: time.Unix(10, 0),
|
|
},
|
|
{
|
|
Query: "metric[20s]",
|
|
MaxSamples: 0,
|
|
Result: Result{
|
|
ErrTooManySamples(env),
|
|
nil,
|
|
nil,
|
|
},
|
|
Start: time.Unix(10, 0),
|
|
},
|
|
// Range queries.
|
|
{
|
|
Query: "1",
|
|
MaxSamples: 3,
|
|
Result: Result{
|
|
nil,
|
|
Matrix{Series{
|
|
Points: []Point{{V: 1, T: 0}, {V: 1, T: 1000}, {V: 1, T: 2000}},
|
|
Metric: labels.FromStrings()},
|
|
},
|
|
nil,
|
|
},
|
|
Start: time.Unix(0, 0),
|
|
End: time.Unix(2, 0),
|
|
Interval: time.Second,
|
|
},
|
|
{
|
|
Query: "1",
|
|
MaxSamples: 0,
|
|
Result: Result{
|
|
ErrTooManySamples(env),
|
|
nil,
|
|
nil,
|
|
},
|
|
Start: time.Unix(0, 0),
|
|
End: time.Unix(2, 0),
|
|
Interval: time.Second,
|
|
},
|
|
{
|
|
Query: "metric",
|
|
MaxSamples: 3,
|
|
Result: Result{
|
|
nil,
|
|
Matrix{Series{
|
|
Points: []Point{{V: 1, T: 0}, {V: 1, T: 1000}, {V: 1, T: 2000}},
|
|
Metric: labels.FromStrings("__name__", "metric")},
|
|
},
|
|
nil,
|
|
},
|
|
Start: time.Unix(0, 0),
|
|
End: time.Unix(2, 0),
|
|
Interval: time.Second,
|
|
},
|
|
{
|
|
Query: "metric",
|
|
MaxSamples: 2,
|
|
Result: Result{
|
|
ErrTooManySamples(env),
|
|
nil,
|
|
nil,
|
|
},
|
|
Start: time.Unix(0, 0),
|
|
End: time.Unix(2, 0),
|
|
Interval: time.Second,
|
|
},
|
|
{
|
|
Query: "metric",
|
|
MaxSamples: 3,
|
|
Result: Result{
|
|
nil,
|
|
Matrix{Series{
|
|
Points: []Point{{V: 1, T: 0}, {V: 1, T: 5000}, {V: 2, T: 10000}},
|
|
Metric: labels.FromStrings("__name__", "metric")},
|
|
},
|
|
nil,
|
|
},
|
|
Start: time.Unix(0, 0),
|
|
End: time.Unix(10, 0),
|
|
Interval: 5 * time.Second,
|
|
},
|
|
{
|
|
Query: "metric",
|
|
MaxSamples: 2,
|
|
Result: Result{
|
|
ErrTooManySamples(env),
|
|
nil,
|
|
nil,
|
|
},
|
|
Start: time.Unix(0, 0),
|
|
End: time.Unix(10, 0),
|
|
Interval: 5 * time.Second,
|
|
},
|
|
}
|
|
|
|
engine := test.QueryEngine()
|
|
for _, c := range cases {
|
|
var err error
|
|
var qry Query
|
|
|
|
engine.maxSamplesPerQuery = c.MaxSamples
|
|
|
|
if c.Interval == 0 {
|
|
qry, err = engine.NewInstantQuery(test.Queryable(), c.Query, c.Start)
|
|
} else {
|
|
qry, err = engine.NewRangeQuery(test.Queryable(), c.Query, c.Start, c.End, c.Interval)
|
|
}
|
|
testutil.Ok(t, err)
|
|
|
|
res := qry.Exec(test.Context())
|
|
testutil.Equals(t, c.Result.Err, res.Err)
|
|
testutil.Equals(t, c.Result.Value, res.Value)
|
|
}
|
|
}
|
|
|
|
func TestRecoverEvaluatorRuntime(t *testing.T) {
|
|
ev := &evaluator{logger: log.NewNopLogger()}
|
|
|
|
var err error
|
|
defer ev.recover(&err)
|
|
|
|
// Cause a runtime panic.
|
|
var a []int
|
|
//nolint:govet
|
|
a[123] = 1
|
|
|
|
if err.Error() != "unexpected error" {
|
|
t.Fatalf("wrong error message: %q, expected %q", err, "unexpected error")
|
|
}
|
|
}
|
|
|
|
func TestRecoverEvaluatorError(t *testing.T) {
|
|
ev := &evaluator{logger: log.NewNopLogger()}
|
|
var err error
|
|
|
|
e := errors.New("custom error")
|
|
|
|
defer func() {
|
|
if err.Error() != e.Error() {
|
|
t.Fatalf("wrong error message: %q, expected %q", err, e)
|
|
}
|
|
}()
|
|
defer ev.recover(&err)
|
|
|
|
panic(e)
|
|
}
|
|
|
|
func TestSubquerySelector(t *testing.T) {
|
|
tests := []struct {
|
|
loadString string
|
|
cases []struct {
|
|
Query string
|
|
Result Result
|
|
Start time.Time
|
|
}
|
|
}{
|
|
{
|
|
loadString: `load 10s
|
|
metric 1 2`,
|
|
cases: []struct {
|
|
Query string
|
|
Result Result
|
|
Start time.Time
|
|
}{
|
|
{
|
|
Query: "metric[20s:10s]",
|
|
Result: Result{
|
|
nil,
|
|
Matrix{Series{
|
|
Points: []Point{{V: 1, T: 0}, {V: 2, T: 10000}},
|
|
Metric: labels.FromStrings("__name__", "metric")},
|
|
},
|
|
nil,
|
|
},
|
|
Start: time.Unix(10, 0),
|
|
},
|
|
{
|
|
Query: "metric[20s:5s]",
|
|
Result: Result{
|
|
nil,
|
|
Matrix{Series{
|
|
Points: []Point{{V: 1, T: 0}, {V: 1, T: 5000}, {V: 2, T: 10000}},
|
|
Metric: labels.FromStrings("__name__", "metric")},
|
|
},
|
|
nil,
|
|
},
|
|
Start: time.Unix(10, 0),
|
|
},
|
|
{
|
|
Query: "metric[20s:5s] offset 2s",
|
|
Result: Result{
|
|
nil,
|
|
Matrix{Series{
|
|
Points: []Point{{V: 1, T: 0}, {V: 1, T: 5000}, {V: 2, T: 10000}},
|
|
Metric: labels.FromStrings("__name__", "metric")},
|
|
},
|
|
nil,
|
|
},
|
|
Start: time.Unix(12, 0),
|
|
},
|
|
{
|
|
Query: "metric[20s:5s] offset 6s",
|
|
Result: Result{
|
|
nil,
|
|
Matrix{Series{
|
|
Points: []Point{{V: 1, T: 0}, {V: 1, T: 5000}, {V: 2, T: 10000}},
|
|
Metric: labels.FromStrings("__name__", "metric")},
|
|
},
|
|
nil,
|
|
},
|
|
Start: time.Unix(20, 0),
|
|
},
|
|
{
|
|
Query: "metric[20s:5s] offset 4s",
|
|
Result: Result{
|
|
nil,
|
|
Matrix{Series{
|
|
Points: []Point{{V: 2, T: 15000}, {V: 2, T: 20000}, {V: 2, T: 25000}, {V: 2, T: 30000}},
|
|
Metric: labels.FromStrings("__name__", "metric")},
|
|
},
|
|
nil,
|
|
},
|
|
Start: time.Unix(35, 0),
|
|
},
|
|
{
|
|
Query: "metric[20s:5s] offset 5s",
|
|
Result: Result{
|
|
nil,
|
|
Matrix{Series{
|
|
Points: []Point{{V: 2, T: 10000}, {V: 2, T: 15000}, {V: 2, T: 20000}, {V: 2, T: 25000}, {V: 2, T: 30000}},
|
|
Metric: labels.FromStrings("__name__", "metric")},
|
|
},
|
|
nil,
|
|
},
|
|
Start: time.Unix(35, 0),
|
|
},
|
|
{
|
|
Query: "metric[20s:5s] offset 6s",
|
|
Result: Result{
|
|
nil,
|
|
Matrix{Series{
|
|
Points: []Point{{V: 2, T: 10000}, {V: 2, T: 15000}, {V: 2, T: 20000}, {V: 2, T: 25000}},
|
|
Metric: labels.FromStrings("__name__", "metric")},
|
|
},
|
|
nil,
|
|
},
|
|
Start: time.Unix(35, 0),
|
|
},
|
|
{
|
|
Query: "metric[20s:5s] offset 7s",
|
|
Result: Result{
|
|
nil,
|
|
Matrix{Series{
|
|
Points: []Point{{V: 2, T: 10000}, {V: 2, T: 15000}, {V: 2, T: 20000}, {V: 2, T: 25000}},
|
|
Metric: labels.FromStrings("__name__", "metric")},
|
|
},
|
|
nil,
|
|
},
|
|
Start: time.Unix(35, 0),
|
|
},
|
|
},
|
|
},
|
|
{
|
|
loadString: `load 10s
|
|
http_requests{job="api-server", instance="0", group="production"} 0+10x1000 100+30x1000
|
|
http_requests{job="api-server", instance="1", group="production"} 0+20x1000 200+30x1000
|
|
http_requests{job="api-server", instance="0", group="canary"} 0+30x1000 300+80x1000
|
|
http_requests{job="api-server", instance="1", group="canary"} 0+40x2000`,
|
|
cases: []struct {
|
|
Query string
|
|
Result Result
|
|
Start time.Time
|
|
}{
|
|
{ // Normal selector.
|
|
Query: `http_requests{group=~"pro.*",instance="0"}[30s:10s]`,
|
|
Result: Result{
|
|
nil,
|
|
Matrix{Series{
|
|
Points: []Point{{V: 9990, T: 9990000}, {V: 10000, T: 10000000}, {V: 100, T: 10010000}, {V: 130, T: 10020000}},
|
|
Metric: labels.FromStrings("__name__", "http_requests", "job", "api-server", "instance", "0", "group", "production")},
|
|
},
|
|
nil,
|
|
},
|
|
Start: time.Unix(10020, 0),
|
|
},
|
|
{ // Default step.
|
|
Query: `http_requests{group=~"pro.*",instance="0"}[5m:]`,
|
|
Result: Result{
|
|
nil,
|
|
Matrix{Series{
|
|
Points: []Point{{V: 9840, T: 9840000}, {V: 9900, T: 9900000}, {V: 9960, T: 9960000}, {V: 130, T: 10020000}, {V: 310, T: 10080000}},
|
|
Metric: labels.FromStrings("__name__", "http_requests", "job", "api-server", "instance", "0", "group", "production")},
|
|
},
|
|
nil,
|
|
},
|
|
Start: time.Unix(10100, 0),
|
|
},
|
|
{ // Checking if high offset (>LookbackDelta) is being taken care of.
|
|
Query: `http_requests{group=~"pro.*",instance="0"}[5m:] offset 20m`,
|
|
Result: Result{
|
|
nil,
|
|
Matrix{Series{
|
|
Points: []Point{{V: 8640, T: 8640000}, {V: 8700, T: 8700000}, {V: 8760, T: 8760000}, {V: 8820, T: 8820000}, {V: 8880, T: 8880000}},
|
|
Metric: labels.FromStrings("__name__", "http_requests", "job", "api-server", "instance", "0", "group", "production")},
|
|
},
|
|
nil,
|
|
},
|
|
Start: time.Unix(10100, 0),
|
|
},
|
|
{
|
|
Query: `rate(http_requests[1m])[15s:5s]`,
|
|
Result: Result{
|
|
nil,
|
|
Matrix{
|
|
Series{
|
|
Points: []Point{{V: 3, T: 7985000}, {V: 3, T: 7990000}, {V: 3, T: 7995000}, {V: 3, T: 8000000}},
|
|
Metric: labels.FromStrings("job", "api-server", "instance", "0", "group", "canary"),
|
|
},
|
|
Series{
|
|
Points: []Point{{V: 4, T: 7985000}, {V: 4, T: 7990000}, {V: 4, T: 7995000}, {V: 4, T: 8000000}},
|
|
Metric: labels.FromStrings("job", "api-server", "instance", "1", "group", "canary"),
|
|
},
|
|
Series{
|
|
Points: []Point{{V: 1, T: 7985000}, {V: 1, T: 7990000}, {V: 1, T: 7995000}, {V: 1, T: 8000000}},
|
|
Metric: labels.FromStrings("job", "api-server", "instance", "0", "group", "production"),
|
|
},
|
|
Series{
|
|
Points: []Point{{V: 2, T: 7985000}, {V: 2, T: 7990000}, {V: 2, T: 7995000}, {V: 2, T: 8000000}},
|
|
Metric: labels.FromStrings("job", "api-server", "instance", "1", "group", "production"),
|
|
},
|
|
},
|
|
nil,
|
|
},
|
|
Start: time.Unix(8000, 0),
|
|
},
|
|
{
|
|
Query: `sum(http_requests{group=~"pro.*"})[30s:10s]`,
|
|
Result: Result{
|
|
nil,
|
|
Matrix{Series{
|
|
Points: []Point{{V: 270, T: 90000}, {V: 300, T: 100000}, {V: 330, T: 110000}, {V: 360, T: 120000}},
|
|
Metric: labels.Labels{}},
|
|
},
|
|
nil,
|
|
},
|
|
Start: time.Unix(120, 0),
|
|
},
|
|
{
|
|
Query: `sum(http_requests)[40s:10s]`,
|
|
Result: Result{
|
|
nil,
|
|
Matrix{Series{
|
|
Points: []Point{{V: 800, T: 80000}, {V: 900, T: 90000}, {V: 1000, T: 100000}, {V: 1100, T: 110000}, {V: 1200, T: 120000}},
|
|
Metric: labels.Labels{}},
|
|
},
|
|
nil,
|
|
},
|
|
Start: time.Unix(120, 0),
|
|
},
|
|
{
|
|
Query: `(sum(http_requests{group=~"p.*"})+sum(http_requests{group=~"c.*"}))[20s:5s]`,
|
|
Result: Result{
|
|
nil,
|
|
Matrix{Series{
|
|
Points: []Point{{V: 1000, T: 100000}, {V: 1000, T: 105000}, {V: 1100, T: 110000}, {V: 1100, T: 115000}, {V: 1200, T: 120000}},
|
|
Metric: labels.Labels{}},
|
|
},
|
|
nil,
|
|
},
|
|
Start: time.Unix(120, 0),
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
SetDefaultEvaluationInterval(1 * time.Minute)
|
|
for _, tst := range tests {
|
|
test, err := NewTest(t, tst.loadString)
|
|
testutil.Ok(t, err)
|
|
|
|
defer test.Close()
|
|
|
|
err = test.Run()
|
|
testutil.Ok(t, err)
|
|
|
|
engine := test.QueryEngine()
|
|
for _, c := range tst.cases {
|
|
var err error
|
|
var qry Query
|
|
|
|
qry, err = engine.NewInstantQuery(test.Queryable(), c.Query, c.Start)
|
|
testutil.Ok(t, err)
|
|
|
|
res := qry.Exec(test.Context())
|
|
testutil.Equals(t, c.Result.Err, res.Err)
|
|
mat := res.Value.(Matrix)
|
|
sort.Sort(mat)
|
|
testutil.Equals(t, c.Result.Value, mat)
|
|
}
|
|
}
|
|
}
|
|
|
|
type FakeQueryLogger struct {
|
|
closed bool
|
|
logs []interface{}
|
|
}
|
|
|
|
func NewFakeQueryLogger() *FakeQueryLogger {
|
|
return &FakeQueryLogger{
|
|
closed: false,
|
|
logs: make([]interface{}, 0),
|
|
}
|
|
}
|
|
|
|
func (f *FakeQueryLogger) Close() error {
|
|
f.closed = true
|
|
return nil
|
|
}
|
|
|
|
func (f *FakeQueryLogger) Log(l ...interface{}) error {
|
|
f.logs = append(f.logs, l...)
|
|
return nil
|
|
}
|
|
|
|
func TestQueryLogger_basic(t *testing.T) {
|
|
opts := EngineOpts{
|
|
Logger: nil,
|
|
Reg: nil,
|
|
MaxSamples: 10,
|
|
Timeout: 10 * time.Second,
|
|
}
|
|
engine := NewEngine(opts)
|
|
|
|
queryExec := func() {
|
|
ctx, cancelCtx := context.WithCancel(context.Background())
|
|
defer cancelCtx()
|
|
query := engine.newTestQuery(func(ctx context.Context) error {
|
|
return contextDone(ctx, "test statement execution")
|
|
})
|
|
res := query.Exec(ctx)
|
|
testutil.Ok(t, res.Err)
|
|
}
|
|
|
|
// Query works without query log initalized.
|
|
queryExec()
|
|
|
|
f1 := NewFakeQueryLogger()
|
|
engine.SetQueryLogger(f1)
|
|
queryExec()
|
|
for i, field := range []interface{}{"params", map[string]interface{}{"query": "test statement"}} {
|
|
testutil.Equals(t, field, f1.logs[i])
|
|
}
|
|
|
|
l := len(f1.logs)
|
|
queryExec()
|
|
testutil.Equals(t, 2*l, len(f1.logs))
|
|
|
|
// Test that we close the query logger when unsetting it.
|
|
testutil.Assert(t, !f1.closed, "expected f1 to be open, got closed")
|
|
engine.SetQueryLogger(nil)
|
|
testutil.Assert(t, f1.closed, "expected f1 to be closed, got open")
|
|
queryExec()
|
|
|
|
// Test that we close the query logger when swapping.
|
|
f2 := NewFakeQueryLogger()
|
|
f3 := NewFakeQueryLogger()
|
|
engine.SetQueryLogger(f2)
|
|
testutil.Assert(t, !f2.closed, "expected f2 to be open, got closed")
|
|
queryExec()
|
|
engine.SetQueryLogger(f3)
|
|
testutil.Assert(t, f2.closed, "expected f2 to be closed, got open")
|
|
testutil.Assert(t, !f3.closed, "expected f3 to be open, got closed")
|
|
queryExec()
|
|
}
|
|
|
|
func TestQueryLogger_fields(t *testing.T) {
|
|
opts := EngineOpts{
|
|
Logger: nil,
|
|
Reg: nil,
|
|
MaxSamples: 10,
|
|
Timeout: 10 * time.Second,
|
|
}
|
|
engine := NewEngine(opts)
|
|
|
|
f1 := NewFakeQueryLogger()
|
|
engine.SetQueryLogger(f1)
|
|
|
|
ctx, cancelCtx := context.WithCancel(context.Background())
|
|
ctx = NewOriginContext(ctx, map[string]interface{}{"foo": "bar"})
|
|
defer cancelCtx()
|
|
query := engine.newTestQuery(func(ctx context.Context) error {
|
|
return contextDone(ctx, "test statement execution")
|
|
})
|
|
|
|
res := query.Exec(ctx)
|
|
testutil.Ok(t, res.Err)
|
|
|
|
expected := []string{"foo", "bar"}
|
|
for i, field := range expected {
|
|
v := f1.logs[len(f1.logs)-len(expected)+i].(string)
|
|
testutil.Equals(t, field, v)
|
|
}
|
|
}
|
|
|
|
func TestQueryLogger_error(t *testing.T) {
|
|
opts := EngineOpts{
|
|
Logger: nil,
|
|
Reg: nil,
|
|
MaxSamples: 10,
|
|
Timeout: 10 * time.Second,
|
|
}
|
|
engine := NewEngine(opts)
|
|
|
|
f1 := NewFakeQueryLogger()
|
|
engine.SetQueryLogger(f1)
|
|
|
|
ctx, cancelCtx := context.WithCancel(context.Background())
|
|
ctx = NewOriginContext(ctx, map[string]interface{}{"foo": "bar"})
|
|
defer cancelCtx()
|
|
testErr := errors.New("failure")
|
|
query := engine.newTestQuery(func(ctx context.Context) error {
|
|
return testErr
|
|
})
|
|
|
|
res := query.Exec(ctx)
|
|
testutil.NotOk(t, res.Err, "query should have failed")
|
|
|
|
for i, field := range []interface{}{"params", map[string]interface{}{"query": "test statement"}, "error", testErr} {
|
|
testutil.Equals(t, f1.logs[i], field)
|
|
}
|
|
}
|