mirror of
https://github.com/prometheus/prometheus
synced 2025-01-14 10:52:10 +00:00
9ab1f6c690
A high number of concurrent queries can slow each other down so that none of them is reasonably responsive. This commit limits the number of queries being executed concurrently.
170 lines
4.0 KiB
Go
170 lines
4.0 KiB
Go
package promql
|
|
|
|
import (
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"golang.org/x/net/context"
|
|
)
|
|
|
|
var noop = testStmt(func(context.Context) error {
|
|
return nil
|
|
})
|
|
|
|
func TestQueryConcurreny(t *testing.T) {
|
|
engine := NewEngine(nil)
|
|
defer engine.Stop()
|
|
|
|
block := make(chan struct{})
|
|
processing := make(chan struct{})
|
|
f1 := testStmt(func(context.Context) error {
|
|
processing <- struct{}{}
|
|
<-block
|
|
return nil
|
|
})
|
|
|
|
for i := 0; i < *maxConcurrentQueries; i++ {
|
|
q := engine.newTestQuery(f1)
|
|
go q.Exec()
|
|
select {
|
|
case <-processing:
|
|
// Expected.
|
|
case <-time.After(5 * time.Millisecond):
|
|
t.Fatalf("Query within concurrency threshold not being executed")
|
|
}
|
|
}
|
|
|
|
q := engine.newTestQuery(f1)
|
|
go q.Exec()
|
|
|
|
select {
|
|
case <-processing:
|
|
t.Fatalf("Query above concurrency threhosld being executed")
|
|
case <-time.After(5 * time.Millisecond):
|
|
// Expected.
|
|
}
|
|
|
|
// Terminate a running query.
|
|
block <- struct{}{}
|
|
|
|
select {
|
|
case <-processing:
|
|
// Expected.
|
|
case <-time.After(5 * time.Millisecond):
|
|
t.Fatalf("Query within concurrency threshold not being executed")
|
|
}
|
|
|
|
// Terminate remaining queries.
|
|
for i := 0; i < *maxConcurrentQueries; i++ {
|
|
block <- struct{}{}
|
|
}
|
|
}
|
|
|
|
func TestQueryTimeout(t *testing.T) {
|
|
*defaultQueryTimeout = 5 * time.Millisecond
|
|
defer func() {
|
|
// Restore default query timeout
|
|
*defaultQueryTimeout = 2 * time.Minute
|
|
}()
|
|
|
|
engine := NewEngine(nil)
|
|
defer engine.Stop()
|
|
|
|
f1 := testStmt(func(context.Context) error {
|
|
time.Sleep(10 * time.Millisecond)
|
|
return nil
|
|
})
|
|
|
|
// Timeouts are not exact but checked in designated places. For example between
|
|
// invoking test statements.
|
|
query := engine.newTestQuery(f1, f1)
|
|
|
|
res := query.Exec()
|
|
if res.Err == nil {
|
|
t.Fatalf("expected timeout error but got none")
|
|
}
|
|
if _, ok := res.Err.(ErrQueryTimeout); res.Err != nil && !ok {
|
|
t.Fatalf("expected timeout error but got: %s", res.Err)
|
|
}
|
|
}
|
|
|
|
func TestQueryCancel(t *testing.T) {
|
|
engine := NewEngine(nil)
|
|
defer engine.Stop()
|
|
|
|
// As for timeouts, cancellation is only checked at designated points. We ensure
|
|
// that we reach one of those points using the same method.
|
|
f1 := testStmt(func(context.Context) error {
|
|
time.Sleep(2 * time.Millisecond)
|
|
return nil
|
|
})
|
|
|
|
query1 := engine.newTestQuery(f1, f1)
|
|
query2 := engine.newTestQuery(f1, f1)
|
|
|
|
// Cancel query after starting it.
|
|
var wg sync.WaitGroup
|
|
var res *Result
|
|
|
|
wg.Add(1)
|
|
go func() {
|
|
res = query1.Exec()
|
|
wg.Done()
|
|
}()
|
|
time.Sleep(1 * time.Millisecond)
|
|
query1.Cancel()
|
|
wg.Wait()
|
|
|
|
if res.Err == nil {
|
|
t.Fatalf("expected cancellation error for query1 but got none")
|
|
}
|
|
if _, ok := res.Err.(ErrQueryCanceled); res.Err != nil && !ok {
|
|
t.Fatalf("expected cancellation error for query1 but got: %s", res.Err)
|
|
}
|
|
|
|
// Canceling query before starting it must have no effect.
|
|
query2.Cancel()
|
|
res = query2.Exec()
|
|
if res.Err != nil {
|
|
t.Fatalf("unexpeceted error on executing query2: %s", res.Err)
|
|
}
|
|
}
|
|
|
|
func TestEngineShutdown(t *testing.T) {
|
|
engine := NewEngine(nil)
|
|
|
|
handlerExecutions := 0
|
|
// Shutdown engine on first handler execution. Should handler execution ever become
|
|
// concurrent this test has to be adjusted accordingly.
|
|
f1 := testStmt(func(context.Context) error {
|
|
handlerExecutions++
|
|
engine.Stop()
|
|
time.Sleep(10 * time.Millisecond)
|
|
return nil
|
|
})
|
|
query1 := engine.newTestQuery(f1, f1)
|
|
query2 := engine.newTestQuery(f1, f1)
|
|
|
|
// Stopping the engine must cancel the base context. While executing queries is
|
|
// still possible, their context is canceled from the beginning and execution should
|
|
// terminate immediately.
|
|
|
|
res := query1.Exec()
|
|
if res.Err == nil {
|
|
t.Fatalf("expected error on shutdown during query but got none")
|
|
}
|
|
if handlerExecutions != 1 {
|
|
t.Fatalf("expected only one handler to be executed before query cancellation but got %d executions", handlerExecutions)
|
|
}
|
|
|
|
res2 := query2.Exec()
|
|
if res2.Err == nil {
|
|
t.Fatalf("expected error on querying shutdown engine but got none")
|
|
}
|
|
if handlerExecutions != 1 {
|
|
t.Fatalf("expected no handler execution for query after engine shutdown")
|
|
}
|
|
|
|
}
|