Add tracing spans to promql (#4436)
* Add spans to promql

  Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>

* Simplify timer and span tracking.

  Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>
This commit is contained in:
parent
0b4d22b245
commit
71855a22a4
|
@ -85,7 +85,7 @@ type Query interface {
|
||||||
// Statement returns the parsed statement of the query.
|
// Statement returns the parsed statement of the query.
|
||||||
Statement() Statement
|
Statement() Statement
|
||||||
// Stats returns statistics about the lifetime of the query.
|
// Stats returns statistics about the lifetime of the query.
|
||||||
Stats() *stats.TimerGroup
|
Stats() *stats.QueryTimers
|
||||||
// Cancel signals that a running query execution should be aborted.
|
// Cancel signals that a running query execution should be aborted.
|
||||||
Cancel()
|
Cancel()
|
||||||
}
|
}
|
||||||
|
@ -99,7 +99,7 @@ type query struct {
|
||||||
// Statement of the parsed query.
|
// Statement of the parsed query.
|
||||||
stmt Statement
|
stmt Statement
|
||||||
// Timer stats for the query execution.
|
// Timer stats for the query execution.
|
||||||
stats *stats.TimerGroup
|
stats *stats.QueryTimers
|
||||||
// Result matrix for reuse.
|
// Result matrix for reuse.
|
||||||
matrix Matrix
|
matrix Matrix
|
||||||
// Cancellation function for the query.
|
// Cancellation function for the query.
|
||||||
|
@ -115,7 +115,7 @@ func (q *query) Statement() Statement {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stats implements the Query interface.
|
// Stats implements the Query interface.
|
||||||
func (q *query) Stats() *stats.TimerGroup {
|
func (q *query) Stats() *stats.QueryTimers {
|
||||||
return q.stats
|
return q.stats
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -276,7 +276,7 @@ func (ng *Engine) newQuery(q storage.Queryable, expr Expr, start, end time.Time,
|
||||||
qry := &query{
|
qry := &query{
|
||||||
stmt: es,
|
stmt: es,
|
||||||
ng: ng,
|
ng: ng,
|
||||||
stats: stats.NewTimerGroup(),
|
stats: stats.NewQueryTimers(),
|
||||||
queryable: q,
|
queryable: q,
|
||||||
}
|
}
|
||||||
return qry
|
return qry
|
||||||
|
@ -294,7 +294,7 @@ func (ng *Engine) newTestQuery(f func(context.Context) error) Query {
|
||||||
q: "test statement",
|
q: "test statement",
|
||||||
stmt: testStmt(f),
|
stmt: testStmt(f),
|
||||||
ng: ng,
|
ng: ng,
|
||||||
stats: stats.NewTimerGroup(),
|
stats: stats.NewQueryTimers(),
|
||||||
}
|
}
|
||||||
return qry
|
return qry
|
||||||
}
|
}
|
||||||
|
@ -310,25 +310,25 @@ func (ng *Engine) exec(ctx context.Context, q *query) (Value, error) {
|
||||||
ctx, cancel := context.WithTimeout(ctx, ng.timeout)
|
ctx, cancel := context.WithTimeout(ctx, ng.timeout)
|
||||||
q.cancel = cancel
|
q.cancel = cancel
|
||||||
|
|
||||||
execTimer := q.stats.GetTimer(stats.ExecTotalTime).Start()
|
execSpanTimer, ctx := q.stats.GetSpanTimer(ctx, stats.ExecTotalTime)
|
||||||
defer execTimer.Stop()
|
defer execSpanTimer.Finish()
|
||||||
queueTimer := q.stats.GetTimer(stats.ExecQueueTime).Start()
|
|
||||||
|
queueSpanTimer, _ := q.stats.GetSpanTimer(ctx, stats.ExecQueueTime, ng.metrics.queryQueueTime)
|
||||||
|
|
||||||
if err := ng.gate.Start(ctx); err != nil {
|
if err := ng.gate.Start(ctx); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
defer ng.gate.Done()
|
defer ng.gate.Done()
|
||||||
|
|
||||||
queueTimer.Stop()
|
queueSpanTimer.Finish()
|
||||||
ng.metrics.queryQueueTime.Observe(queueTimer.ElapsedTime().Seconds())
|
|
||||||
|
|
||||||
// Cancel when execution is done or an error was raised.
|
// Cancel when execution is done or an error was raised.
|
||||||
defer q.cancel()
|
defer q.cancel()
|
||||||
|
|
||||||
const env = "query execution"
|
const env = "query execution"
|
||||||
|
|
||||||
evalTimer := q.stats.GetTimer(stats.EvalTotalTime).Start()
|
evalSpanTimer, ctx := q.stats.GetSpanTimer(ctx, stats.EvalTotalTime)
|
||||||
defer evalTimer.Stop()
|
defer evalSpanTimer.Finish()
|
||||||
|
|
||||||
// The base context might already be canceled on the first iteration (e.g. during shutdown).
|
// The base context might already be canceled on the first iteration (e.g. during shutdown).
|
||||||
if err := contextDone(ctx, env); err != nil {
|
if err := contextDone(ctx, env); err != nil {
|
||||||
|
@ -355,10 +355,9 @@ func durationMilliseconds(d time.Duration) int64 {
|
||||||
|
|
||||||
// execEvalStmt evaluates the expression of an evaluation statement for the given time range.
|
// execEvalStmt evaluates the expression of an evaluation statement for the given time range.
|
||||||
func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (Value, error) {
|
func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (Value, error) {
|
||||||
prepareTimer := query.stats.GetTimer(stats.QueryPreparationTime).Start()
|
prepareSpanTimer, ctxPrepare := query.stats.GetSpanTimer(ctx, stats.QueryPreparationTime, ng.metrics.queryPrepareTime)
|
||||||
querier, err := ng.populateSeries(ctx, query.queryable, s)
|
querier, err := ng.populateSeries(ctxPrepare, query.queryable, s)
|
||||||
prepareTimer.Stop()
|
prepareSpanTimer.Finish()
|
||||||
ng.metrics.queryPrepareTime.Observe(prepareTimer.ElapsedTime().Seconds())
|
|
||||||
|
|
||||||
// XXX(fabxc): the querier returned by populateSeries might be instantiated
|
// XXX(fabxc): the querier returned by populateSeries might be instantiated
|
||||||
// we must not return without closing irrespective of the error.
|
// we must not return without closing irrespective of the error.
|
||||||
|
@ -371,7 +370,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
evalTimer := query.stats.GetTimer(stats.InnerEvalTime).Start()
|
evalSpanTimer, _ := query.stats.GetSpanTimer(ctx, stats.InnerEvalTime, ng.metrics.queryInnerEval)
|
||||||
// Instant evaluation. This is executed as a range evaluation with one step.
|
// Instant evaluation. This is executed as a range evaluation with one step.
|
||||||
if s.Start == s.End && s.Interval == 0 {
|
if s.Start == s.End && s.Interval == 0 {
|
||||||
start := timeMilliseconds(s.Start)
|
start := timeMilliseconds(s.Start)
|
||||||
|
@ -387,8 +386,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
evalTimer.Stop()
|
evalSpanTimer.Finish()
|
||||||
ng.metrics.queryInnerEval.Observe(evalTimer.ElapsedTime().Seconds())
|
|
||||||
|
|
||||||
mat, ok := val.(Matrix)
|
mat, ok := val.(Matrix)
|
||||||
if !ok {
|
if !ok {
|
||||||
|
@ -427,8 +425,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
evalTimer.Stop()
|
evalSpanTimer.Finish()
|
||||||
ng.metrics.queryInnerEval.Observe(evalTimer.ElapsedTime().Seconds())
|
|
||||||
|
|
||||||
mat, ok := val.(Matrix)
|
mat, ok := val.(Matrix)
|
||||||
if !ok {
|
if !ok {
|
||||||
|
@ -442,11 +439,10 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (
|
||||||
|
|
||||||
// TODO(fabxc): order ensured by storage?
|
// TODO(fabxc): order ensured by storage?
|
||||||
// TODO(fabxc): where to ensure metric labels are a copy from the storage internals.
|
// TODO(fabxc): where to ensure metric labels are a copy from the storage internals.
|
||||||
sortTimer := query.stats.GetTimer(stats.ResultSortTime).Start()
|
sortSpanTimer, _ := query.stats.GetSpanTimer(ctx, stats.ResultSortTime, ng.metrics.queryResultSort)
|
||||||
sort.Sort(mat)
|
sort.Sort(mat)
|
||||||
sortTimer.Stop()
|
sortSpanTimer.Finish()
|
||||||
|
|
||||||
ng.metrics.queryResultSort.Observe(sortTimer.ElapsedTime().Seconds())
|
|
||||||
return mat, nil
|
return mat, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -13,6 +13,13 @@
|
||||||
|
|
||||||
package stats
|
package stats
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
opentracing "github.com/opentracing/opentracing-go"
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
)
|
||||||
|
|
||||||
// QueryTiming identifies the code area or functionality in which time is spent
|
// QueryTiming identifies the code area or functionality in which time is spent
|
||||||
// during a query.
|
// during a query.
|
||||||
type QueryTiming int
|
type QueryTiming int
|
||||||
|
@ -47,6 +54,26 @@ func (s QueryTiming) String() string {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SpanOperation returns the tracing span operation name for a QueryTiming.
|
||||||
|
func (s QueryTiming) SpanOperation() string {
|
||||||
|
switch s {
|
||||||
|
case EvalTotalTime:
|
||||||
|
return "promqlEval"
|
||||||
|
case ResultSortTime:
|
||||||
|
return "promqlSort"
|
||||||
|
case QueryPreparationTime:
|
||||||
|
return "promqlPrepare"
|
||||||
|
case InnerEvalTime:
|
||||||
|
return "promqlInnerEval"
|
||||||
|
case ExecQueueTime:
|
||||||
|
return "promqlExecQueue"
|
||||||
|
case ExecTotalTime:
|
||||||
|
return "promqlExec"
|
||||||
|
default:
|
||||||
|
return "Unknown query timing"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// queryTimings with all query timers mapped to durations.
|
// queryTimings with all query timers mapped to durations.
|
||||||
type queryTimings struct {
|
type queryTimings struct {
|
||||||
EvalTotalTime float64 `json:"evalTotalTime"`
|
EvalTotalTime float64 `json:"evalTotalTime"`
|
||||||
|
@ -64,10 +91,10 @@ type QueryStats struct {
|
||||||
|
|
||||||
// NewQueryStats makes a QueryStats struct with all QueryTimings found in the
|
// NewQueryStats makes a QueryStats struct with all QueryTimings found in the
|
||||||
// given TimerGroup.
|
// TimerGroup embedded in the given QueryTimers.
|
||||||
func NewQueryStats(tg *TimerGroup) *QueryStats {
|
func NewQueryStats(tg *QueryTimers) *QueryStats {
|
||||||
var qt queryTimings
|
var qt queryTimings
|
||||||
|
|
||||||
for s, timer := range tg.timers {
|
for s, timer := range tg.TimerGroup.timers {
|
||||||
switch s {
|
switch s {
|
||||||
case EvalTotalTime:
|
case EvalTotalTime:
|
||||||
qt.EvalTotalTime = timer.Duration()
|
qt.EvalTotalTime = timer.Duration()
|
||||||
|
@ -87,3 +114,44 @@ func NewQueryStats(tg *TimerGroup) *QueryStats {
|
||||||
qs := QueryStats{Timings: qt}
|
qs := QueryStats{Timings: qt}
|
||||||
return &qs
|
return &qs
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SpanTimer unifies tracing and timing, to reduce repetition.
|
||||||
|
type SpanTimer struct {
|
||||||
|
timer *Timer
|
||||||
|
observers []prometheus.Observer
|
||||||
|
|
||||||
|
span opentracing.Span
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewSpanTimer(ctx context.Context, operation string, timer *Timer, observers ...prometheus.Observer) (*SpanTimer, context.Context) {
|
||||||
|
span, ctx := opentracing.StartSpanFromContext(ctx, operation)
|
||||||
|
timer.Start()
|
||||||
|
|
||||||
|
return &SpanTimer{
|
||||||
|
timer: timer,
|
||||||
|
observers: observers,
|
||||||
|
|
||||||
|
span: span,
|
||||||
|
}, ctx
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SpanTimer) Finish() {
|
||||||
|
s.timer.Stop()
|
||||||
|
s.span.Finish()
|
||||||
|
|
||||||
|
for _, obs := range s.observers {
|
||||||
|
obs.Observe(s.timer.ElapsedTime().Seconds())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type QueryTimers struct {
|
||||||
|
*TimerGroup
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewQueryTimers() *QueryTimers {
|
||||||
|
return &QueryTimers{NewTimerGroup()}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (qs *QueryTimers) GetSpanTimer(ctx context.Context, qt QueryTiming, observers ...prometheus.Observer) (*SpanTimer, context.Context) {
|
||||||
|
return NewSpanTimer(ctx, qt.SpanOperation(), qs.TimerGroup.GetTimer(qt), observers...)
|
||||||
|
}
|
||||||
|
|
|
@ -39,13 +39,13 @@ func TestTimerGroupNewTimer(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestQueryStatsWithTimers(t *testing.T) {
|
func TestQueryStatsWithTimers(t *testing.T) {
|
||||||
tg := NewTimerGroup()
|
qt := NewQueryTimers()
|
||||||
timer := tg.GetTimer(ExecTotalTime)
|
timer := qt.GetTimer(ExecTotalTime)
|
||||||
timer.Start()
|
timer.Start()
|
||||||
time.Sleep(2 * time.Millisecond)
|
time.Sleep(2 * time.Millisecond)
|
||||||
timer.Stop()
|
timer.Stop()
|
||||||
|
|
||||||
qs := NewQueryStats(tg)
|
qs := NewQueryStats(qt)
|
||||||
actual, err := json.Marshal(qs)
|
actual, err := json.Marshal(qs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected error during serialization: %v", err)
|
t.Fatalf("Unexpected error during serialization: %v", err)
|
||||||
|
|
Loading…
Reference in New Issue