TSDB: Extract parts out of populateSeries (#6983)
This addresses fabxc's TODO. More importantly, it now properly defers the querier.Close(). Previously, if a panic happened after creation of the querier within the populateSeries function, querier.Close() was never called. The latter was responsible for #6977. Signed-off-by: beorn7 <beorn@grafana.com>
This commit is contained in:
parent
3128875ff4
commit
a28fa010ee
|
@ -502,15 +502,16 @@ func durationMilliseconds(d time.Duration) int64 {
|
|||
// execEvalStmt evaluates the expression of an evaluation statement for the given time range.
|
||||
func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.EvalStmt) (parser.Value, storage.Warnings, error) {
|
||||
prepareSpanTimer, ctxPrepare := query.stats.GetSpanTimer(ctx, stats.QueryPreparationTime, ng.metrics.queryPrepareTime)
|
||||
querier, warnings, err := ng.populateSeries(ctxPrepare, query.queryable, s)
|
||||
prepareSpanTimer.Finish()
|
||||
|
||||
// XXX(fabxc): the querier returned by populateSeries might be instantiated
|
||||
// we must not return without closing irrespective of the error.
|
||||
// TODO: make this semantically saner.
|
||||
if querier != nil {
|
||||
defer querier.Close()
|
||||
mint := ng.findMinTime(s)
|
||||
querier, err := query.queryable.Querier(ctxPrepare, timestamp.FromTime(mint), timestamp.FromTime(s.End))
|
||||
if err != nil {
|
||||
prepareSpanTimer.Finish()
|
||||
return nil, nil, err
|
||||
}
|
||||
defer querier.Close()
|
||||
|
||||
warnings, err := ng.populateSeries(ctxPrepare, querier, s)
|
||||
prepareSpanTimer.Finish()
|
||||
|
||||
if err != nil {
|
||||
return nil, warnings, err
|
||||
|
@ -616,7 +617,7 @@ func (ng *Engine) cumulativeSubqueryOffset(path []parser.Node) time.Duration {
|
|||
return subqOffset
|
||||
}
|
||||
|
||||
func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *parser.EvalStmt) (storage.Querier, storage.Warnings, error) {
|
||||
func (ng *Engine) findMinTime(s *parser.EvalStmt) time.Time {
|
||||
var maxOffset time.Duration
|
||||
parser.Inspect(s.Expr, func(node parser.Node, path []parser.Node) error {
|
||||
subqOffset := ng.cumulativeSubqueryOffset(path)
|
||||
|
@ -638,20 +639,18 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *pa
|
|||
}
|
||||
return nil
|
||||
})
|
||||
return s.Start.Add(-maxOffset)
|
||||
}
|
||||
|
||||
mint := s.Start.Add(-maxOffset)
|
||||
|
||||
querier, err := q.Querier(ctx, timestamp.FromTime(mint), timestamp.FromTime(s.End))
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
var warnings storage.Warnings
|
||||
|
||||
// Whenever a MatrixSelector is evaluated this variable is set to the corresponding range.
|
||||
// The evaluation of the VectorSelector inside then evaluates the given range and unsets
|
||||
// the variable.
|
||||
var evalRange time.Duration
|
||||
func (ng *Engine) populateSeries(ctx context.Context, querier storage.Querier, s *parser.EvalStmt) (storage.Warnings, error) {
|
||||
var (
|
||||
// Whenever a MatrixSelector is evaluated, evalRange is set to the corresponding range.
|
||||
// The evaluation of the VectorSelector inside then evaluates the given range and unsets
|
||||
// the variable.
|
||||
evalRange time.Duration
|
||||
warnings storage.Warnings
|
||||
err error
|
||||
)
|
||||
|
||||
parser.Inspect(s.Expr, func(node parser.Node, path []parser.Node) error {
|
||||
var set storage.SeriesSet
|
||||
|
@ -703,7 +702,7 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *pa
|
|||
}
|
||||
return nil
|
||||
})
|
||||
return querier, warnings, err
|
||||
return warnings, err
|
||||
}
|
||||
|
||||
// extractFuncFromPath walks up the path and searches for the first instance of
|
||||
|
|
Loading…
Reference in New Issue