Merge pull request #3963 from mz-techops/fix-query-err-scope
promql: propagate storage errors
This commit is contained in:
commit
e87c6c8b28
|
@ -497,6 +497,7 @@ func (ng *Engine) populateIterators(ctx context.Context, q storage.Queryable, s
|
|||
}
|
||||
|
||||
Inspect(s.Expr, func(node Node, path []Node) bool {
|
||||
var set storage.SeriesSet
|
||||
params := &storage.SelectParams{
|
||||
Step: int64(s.Interval / time.Millisecond),
|
||||
}
|
||||
|
@ -505,7 +506,7 @@ func (ng *Engine) populateIterators(ctx context.Context, q storage.Queryable, s
|
|||
case *VectorSelector:
|
||||
params.Func = extractFuncFromPath(path)
|
||||
|
||||
set, err := querier.Select(params, n.LabelMatchers...)
|
||||
set, err = querier.Select(params, n.LabelMatchers...)
|
||||
if err != nil {
|
||||
level.Error(ng.logger).Log("msg", "error selecting series set", "err", err)
|
||||
return false
|
||||
|
@ -524,7 +525,7 @@ func (ng *Engine) populateIterators(ctx context.Context, q storage.Queryable, s
|
|||
case *MatrixSelector:
|
||||
params.Func = extractFuncFromPath(path)
|
||||
|
||||
set, err := querier.Select(params, n.LabelMatchers...)
|
||||
set, err = querier.Select(params, n.LabelMatchers...)
|
||||
if err != nil {
|
||||
level.Error(ng.logger).Log("msg", "error selecting series set", "err", err)
|
||||
return false
|
||||
|
|
|
@ -22,6 +22,7 @@ import (
|
|||
|
||||
"github.com/go-kit/kit/log"
|
||||
"github.com/prometheus/prometheus/pkg/labels"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
)
|
||||
|
||||
func TestQueryConcurrency(t *testing.T) {
|
||||
|
@ -142,6 +143,60 @@ func TestQueryCancel(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// errQuerier implements storage.Querier which always returns error.
|
||||
type errQuerier struct {
|
||||
err error
|
||||
}
|
||||
|
||||
func (q *errQuerier) Select(*storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, error) {
|
||||
return errSeriesSet{err: q.err}, q.err
|
||||
}
|
||||
func (*errQuerier) LabelValues(name string) ([]string, error) { return nil, nil }
|
||||
func (*errQuerier) Close() error { return nil }
|
||||
|
||||
// errSeriesSet implements storage.SeriesSet which always returns error.
|
||||
type errSeriesSet struct {
|
||||
err error
|
||||
}
|
||||
|
||||
func (errSeriesSet) Next() bool { return false }
|
||||
func (errSeriesSet) At() storage.Series { return nil }
|
||||
func (e errSeriesSet) Err() error { return e.err }
|
||||
|
||||
func TestQueryError(t *testing.T) {
|
||||
engine := NewEngine(nil, nil, 10, 10*time.Second)
|
||||
errStorage := ErrStorage(fmt.Errorf("storage error"))
|
||||
queryable := storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
|
||||
return &errQuerier{err: errStorage}, nil
|
||||
})
|
||||
ctx, cancelCtx := context.WithCancel(context.Background())
|
||||
defer cancelCtx()
|
||||
|
||||
vectorQuery, err := engine.NewInstantQuery(queryable, "foo", time.Unix(1, 0))
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error creating query: %q", err)
|
||||
}
|
||||
res := vectorQuery.Exec(ctx)
|
||||
if res.Err == nil {
|
||||
t.Fatalf("expected error on failed select but got none")
|
||||
}
|
||||
if res.Err != errStorage {
|
||||
t.Fatalf("expected error %q, got %q", errStorage, res.Err)
|
||||
}
|
||||
|
||||
matrixQuery, err := engine.NewInstantQuery(queryable, "foo[1m]", time.Unix(1, 0))
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error creating query: %q", err)
|
||||
}
|
||||
res = matrixQuery.Exec(ctx)
|
||||
if res.Err == nil {
|
||||
t.Fatalf("expected error on failed select but got none")
|
||||
}
|
||||
if res.Err != errStorage {
|
||||
t.Fatalf("expected error %q, got %q", errStorage, res.Err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEngineShutdown(t *testing.T) {
|
||||
engine := NewEngine(nil, nil, 10, 10*time.Second)
|
||||
ctx, cancelCtx := context.WithCancel(context.Background())
|
||||
|
|
Loading…
Reference in New Issue