promql: refactor: extract fn to wait on concurrency limit

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
This commit is contained in:
Bryan Boreham 2023-06-01 17:17:04 +00:00
parent a8772a4178
commit 1f3821379c
1 changed files with 16 additions and 11 deletions

View File

@ -589,18 +589,11 @@ func (ng *Engine) exec(ctx context.Context, q *query) (v parser.Value, ws storag
execSpanTimer, ctx := q.stats.GetSpanTimer(ctx, stats.ExecTotalTime)
defer execSpanTimer.Finish()
queueSpanTimer, _ := q.stats.GetSpanTimer(ctx, stats.ExecQueueTime, ng.metrics.queryQueueTime)
// Log query in active log. The active log guarantees that we don't run over
// MaxConcurrent queries.
if ng.activeQueryTracker != nil {
queryIndex, err := ng.activeQueryTracker.Insert(ctx, q.q)
if err != nil {
queueSpanTimer.Finish()
return nil, nil, contextErr(err, "query queue")
}
defer ng.activeQueryTracker.Delete(queryIndex)
if finish, err := ng.queueActive(ctx, q); err != nil {
return nil, nil, err
} else {
defer finish()
}
queueSpanTimer.Finish()
// Cancel when execution is done or an error was raised.
defer q.cancel()
@ -623,6 +616,18 @@ func (ng *Engine) exec(ctx context.Context, q *query) (v parser.Value, ws storag
panic(fmt.Errorf("promql.Engine.exec: unhandled statement of type %T", q.Statement()))
}
// Log query in active log. The active log guarantees that we don't run over
// MaxConcurrent queries.
func (ng *Engine) queueActive(ctx context.Context, q *query) (func(), error) {
if ng.activeQueryTracker == nil {
return func() {}, nil
}
queueSpanTimer, _ := q.stats.GetSpanTimer(ctx, stats.ExecQueueTime, ng.metrics.queryQueueTime)
queryIndex, err := ng.activeQueryTracker.Insert(ctx, q.q)
queueSpanTimer.Finish()
return func() { ng.activeQueryTracker.Delete(queryIndex) }, err
}
func timeMilliseconds(t time.Time) int64 {
return t.UnixNano() / int64(time.Millisecond/time.Nanosecond)
}