From 690b5f1575e2f592d96d9018a95067c5da419b6a Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Mon, 10 Aug 2015 14:21:24 +0200 Subject: [PATCH] Remove multi-statement queries This commit removes the possibility to have multi-statement queries which had no full support anyway. This makes the caller responsible for multi-statement semantics. Multiple tests are no longer timing-dependent. --- promql/engine.go | 56 ++++++++++------------- promql/engine_test.go | 101 ++++++++++++++++++++++++------------------ 2 files changed, 82 insertions(+), 75 deletions(-) diff --git a/promql/engine.go b/promql/engine.go index f32f373eb..44aab0677 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -196,8 +196,8 @@ func (e ErrQueryCanceled) Error() string { return fmt.Sprintf("query was cancele type Query interface { // Exec processes the query and Exec() *Result - // Statements returns the parsed statements of the query. - Statements() Statements + // Statement returns the parsed statement of the query. + Statement() Statement // Stats returns statistics about the lifetime of the query. Stats() *stats.TimerGroup // Cancel signals that a running query execution should be aborted. @@ -208,8 +208,8 @@ type Query interface { type query struct { // The original query string. q string - // Statements of the parsed query. - stmts Statements + // Statement of the parsed query. + stmt Statement // Timer stats for the query execution. stats *stats.TimerGroup // Cancelation function for the query. @@ -219,9 +219,9 @@ type query struct { ng *Engine } -// Statements implements the Query interface. -func (q *query) Statements() Statements { - return q.stmts +// Statement implements the Query interface. +func (q *query) Statement() Statement { + return q.stmt } // Stats implements the Query interface. @@ -343,7 +343,7 @@ func (ng *Engine) newQuery(expr Expr, start, end clientmodel.Timestamp, interval Interval: interval, } qry := &query{ - stmts: Statements{es}, + stmt: es, ng: ng, stats: stats.NewTimerGroup(), } @@ -358,10 +358,10 @@ func (testStmt) String() string { return "test statement" } func (testStmt) DotGraph() string { return "test statement" } func (testStmt) stmt() {} -func (ng *Engine) newTestQuery(stmts ...Statement) Query { +func (ng *Engine) newTestQuery(f func(context.Context) error) Query { qry := &query{ q: "test statement", - stmts: Statements(stmts), + stmt: testStmt(f), ng: ng, stats: stats.NewTimerGroup(), } @@ -373,8 +373,6 @@ func (ng *Engine) newTestQuery(stmts ...Statement) Query { // At this point per query only one EvalStmt is evaluated. Alert and record // statements are not handled by the Engine. func (ng *Engine) exec(q *query) (Value, error) { - const env = "query execution" - ctx, cancel := context.WithTimeout(q.ng.baseCtx, ng.options.Timeout) q.cancel = cancel @@ -390,30 +388,24 @@ func (ng *Engine) exec(q *query) (Value, error) { // Cancel when execution is done or an error was raised. defer q.cancel() + const env = "query execution" + evalTimer := q.stats.GetTimer(stats.TotalEvalTime).Start() defer evalTimer.Stop() - for _, stmt := range q.stmts { - // The base context might already be canceled on the first iteration (e.g. during shutdown). - if err := contextDone(ctx, env); err != nil { - return nil, err - } - - switch s := stmt.(type) { - case *EvalStmt: - // Currently, only one execution statement per query is allowed. - return ng.execEvalStmt(ctx, q, s) - - case testStmt: - if err := s(ctx); err != nil { - return nil, err - } - - default: - panic(fmt.Errorf("promql.Engine.exec: unhandled statement of type %T", stmt)) - } + // The base context might already be canceled on the first iteration (e.g. during shutdown). + if err := contextDone(ctx, env); err != nil { + return nil, err } - return nil, nil + + switch s := q.Statement().(type) { + case *EvalStmt: + return ng.execEvalStmt(ctx, q, s) + case testStmt: + return nil, s(ctx) + } + + panic(fmt.Errorf("promql.Engine.exec: unhandled statement of type %T", q.Statement())) } // execEvalStmt evaluates the expression of an evaluation statement for the given time range. diff --git a/promql/engine_test.go b/promql/engine_test.go index 6636d5c5a..1572cbfb1 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -1,7 +1,6 @@ package promql import ( - "sync" "testing" "time" @@ -18,14 +17,15 @@ func TestQueryConcurreny(t *testing.T) { block := make(chan struct{}) processing := make(chan struct{}) - f1 := testStmt(func(context.Context) error { + + f := func(context.Context) error { processing <- struct{}{} <-block return nil - }) + } for i := 0; i < DefaultEngineOptions.MaxConcurrentQueries; i++ { - q := engine.newTestQuery(f1) + q := engine.newTestQuery(f) go q.Exec() select { case <-processing: @@ -35,7 +35,7 @@ func TestQueryConcurreny(t *testing.T) { } } - q := engine.newTestQuery(f1) + q := engine.newTestQuery(f) go q.Exec() select { @@ -68,15 +68,11 @@ func TestQueryTimeout(t *testing.T) { }) defer engine.Stop() - f1 := testStmt(func(context.Context) error { + query := engine.newTestQuery(func(ctx context.Context) error { time.Sleep(10 * time.Millisecond) - return nil + return contextDone(ctx, "test statement execution") }) - // Timeouts are not exact but checked in designated places. For example between - // invoking test statements. - query := engine.newTestQuery(f1, f1) - res := query.Exec() if res.Err == nil { t.Fatalf("expected timeout error but got none") @@ -90,37 +86,40 @@ func TestQueryCancel(t *testing.T) { engine := NewEngine(nil, nil) defer engine.Stop() - // As for timeouts, cancellation is only checked at designated points. We ensure - // that we reach one of those points using the same method. - f1 := testStmt(func(context.Context) error { - time.Sleep(2 * time.Millisecond) - return nil + // Cancel a running query before it completes. + block := make(chan struct{}) + processing := make(chan struct{}) + + query1 := engine.newTestQuery(func(ctx context.Context) error { + processing <- struct{}{} + <-block + return contextDone(ctx, "test statement execution") }) - query1 := engine.newTestQuery(f1, f1) - query2 := engine.newTestQuery(f1, f1) - - // Cancel query after starting it. - var wg sync.WaitGroup var res *Result - wg.Add(1) go func() { res = query1.Exec() - wg.Done() + processing <- struct{}{} }() - time.Sleep(1 * time.Millisecond) + + <-processing query1.Cancel() - wg.Wait() + block <- struct{}{} + <-processing if res.Err == nil { t.Fatalf("expected cancellation error for query1 but got none") } - if _, ok := res.Err.(ErrQueryCanceled); res.Err != nil && !ok { - t.Fatalf("expected cancellation error for query1 but got: %s", res.Err) + if ee := ErrQueryCanceled("test statement execution"); res.Err != ee { + t.Fatalf("expected error %q, got %q") } - // Canceling query before starting it must have no effect. + // Canceling a query before starting it must have no effect. + query2 := engine.newTestQuery(func(ctx context.Context) error { + return contextDone(ctx, "test statement execution") + }) + query2.Cancel() res = query2.Exec() if res.Err != nil { @@ -131,36 +130,52 @@ func TestQueryCancel(t *testing.T) { func TestEngineShutdown(t *testing.T) { engine := NewEngine(nil, nil) - handlerExecutions := 0 + block := make(chan struct{}) + processing := make(chan struct{}) + // Shutdown engine on first handler execution. Should handler execution ever become // concurrent this test has to be adjusted accordingly. - f1 := testStmt(func(context.Context) error { - handlerExecutions++ - engine.Stop() - time.Sleep(10 * time.Millisecond) - return nil - }) - query1 := engine.newTestQuery(f1, f1) - query2 := engine.newTestQuery(f1, f1) + f := func(ctx context.Context) error { + processing <- struct{}{} + <-block + return contextDone(ctx, "test statement execution") + } + query1 := engine.newTestQuery(f) // Stopping the engine must cancel the base context. While executing queries is // still possible, their context is canceled from the beginning and execution should // terminate immediately. - res := query1.Exec() + var res *Result + go func() { + res = query1.Exec() + processing <- struct{}{} + }() + + <-processing + engine.Stop() + block <- struct{}{} + <-processing + if res.Err == nil { t.Fatalf("expected error on shutdown during query but got none") } - if handlerExecutions != 1 { - t.Fatalf("expected only one handler to be executed before query cancellation but got %d executions", handlerExecutions) + if ee := ErrQueryCanceled("test statement execution"); res.Err != ee { + t.Fatalf("expected error %q, got %q", ee, res.Err) } + query2 := engine.newTestQuery(func(context.Context) error { + t.Fatalf("reached query execution unexpectedly") + return nil + }) + + // The second query is started after the engine shut down. It must + // be canceled immediately. res2 := query2.Exec() if res2.Err == nil { t.Fatalf("expected error on querying shutdown engine but got none") } - if handlerExecutions != 1 { - t.Fatalf("expected no handler execution for query after engine shutdown") + if _, ok := res2.Err.(ErrQueryCanceled); !ok { + t.Fatalf("expected cancelation error, got %q", res2.Err) } - }