Stop routing rule statements through the engine.

This commit is contained in:
Fabian Reinartz 2015-04-29 11:08:56 +02:00
parent 8d7c479fed
commit fe935179cd
4 changed files with 94 additions and 323 deletions

13
main.go
View File

@ -167,16 +167,9 @@ func NewPrometheus() *prometheus {
PrometheusURL: web.MustBuildServerURL(*pathPrefix),
PathPrefix: *pathPrefix,
})
for _, rf := range conf.Global.GetRuleFile() {
query, err := queryEngine.NewQueryFromFile(rf)
if err != nil {
glog.Errorf("Error loading rule file %q: %s", rf, err)
os.Exit(1)
}
if res := query.Exec(); res.Err != nil {
glog.Errorf("Error initializing rules: %s", res.Err)
os.Exit(1)
}
if err := ruleManager.LoadRuleFiles(conf.Global.GetRuleFile()...); err != nil {
glog.Errorf("Error loading rule files: %s", err)
os.Exit(1)
}
flags := map[string]string{}

View File

@ -16,11 +16,9 @@ package promql
import (
"flag"
"fmt"
"io/ioutil"
"math"
"runtime"
"sort"
"sync"
"time"
"golang.org/x/net/context"
@ -165,14 +163,10 @@ type (
ErrQueryTimeout string
// ErrQueryCanceled is returned if a query was canceled during processing.
ErrQueryCanceled string
// ErrNoHandlers is returned if no handlers were registered for the
// execution of a statement.
ErrNoHandlers string
)
func (e ErrQueryTimeout) Error() string { return fmt.Sprintf("query timed out in %s", e) }
func (e ErrQueryCanceled) Error() string { return fmt.Sprintf("query was canceled in %s", e) }
func (e ErrNoHandlers) Error() string { return fmt.Sprintf("no handlers registered to process %s", e) }
// A Query is derived from an a raw query string and can be run against an engine
// it is associated with.
@ -193,9 +187,6 @@ type query struct {
q string
// Statements of the parsed query.
stmts Statements
// On finished execution two bools indicating success of the execution
// are sent on the channel.
done chan bool
// Timer stats for the query execution.
stats *stats.TimerGroup
// Cancelation function for the query.
@ -231,15 +222,6 @@ func (q *query) Exec() *Result {
return &Result{Err: err, Value: res}
}
type (
// AlertHandlers can be registered with an engine and are called on
// each executed alert statement.
AlertHandler func(context.Context, *AlertStmt) error
// RecordHandlers can be registered with an engine and are called on
// each executed record statement.
RecordHandler func(context.Context, *RecordStmt) error
)
// contextDone returns an error if the context was canceled or timed out.
func contextDone(ctx context.Context, env string) error {
select {
@ -258,32 +240,24 @@ func contextDone(ctx context.Context, env string) error {
}
}
// Engine handles the liftetime of queries from beginning to end. It is connected
// to a storage.
// Engine handles the liftetime of queries from beginning to end.
// It is connected to a storage.
type Engine struct {
sync.RWMutex
// The storage on which the engine operates.
storage local.Storage
// The base context for all queries and its cancellation function.
baseCtx context.Context
cancelQueries func()
// Handlers for the statements.
alertHandlers map[string]AlertHandler
recordHandlers map[string]RecordHandler
}
// NewEngine returns a new engine.
func NewEngine(storage local.Storage) *Engine {
ctx, cancel := context.WithCancel(context.Background())
return &Engine{
storage: storage,
baseCtx: ctx,
cancelQueries: cancel,
alertHandlers: map[string]AlertHandler{},
recordHandlers: map[string]RecordHandler{},
storage: storage,
baseCtx: ctx,
cancelQueries: cancel,
}
}
@ -292,31 +266,6 @@ func (ng *Engine) Stop() {
ng.cancelQueries()
}
// NewQuery returns a new query of the given query string.
func (ng *Engine) NewQuery(qs string) (Query, error) {
stmts, err := ParseStmts(qs)
if err != nil {
return nil, err
}
query := &query{
q: qs,
stmts: stmts,
ng: ng,
done: make(chan bool, 2),
stats: stats.NewTimerGroup(),
}
return query, nil
}
// NewQueryFromFile reads a file and returns a query of statements it contains.
func (ng *Engine) NewQueryFromFile(filename string) (Query, error) {
content, err := ioutil.ReadFile(filename)
if err != nil {
return nil, err
}
return ng.NewQuery(string(content))
}
// NewInstantQuery returns an evaluation query for the given expression at the given time.
func (ng *Engine) NewInstantQuery(es string, ts clientmodel.Timestamp) (Query, error) {
return ng.NewRangeQuery(es, ts, ts, 0)
@ -336,77 +285,64 @@ func (ng *Engine) NewRangeQuery(qs string, start, end clientmodel.Timestamp, int
Interval: interval,
}
query := &query{
qry := &query{
q: qs,
stmts: Statements{es},
ng: ng,
done: make(chan bool, 2),
stats: stats.NewTimerGroup(),
}
return query, nil
return qry, nil
}
// exec executes all statements in the query. For evaluation statements only
// one statement per query is allowed, after which the execution returns.
// testStmt is an internal helper statement that allows execution
// of an arbitrary function during handling. It is used to test the Engine.
type testStmt func(context.Context) error
func (testStmt) String() string { return "test statement" }
func (testStmt) DotGraph() string { return "test statement" }
func (testStmt) stmt() {}
func (ng *Engine) newTestQuery(stmts ...Statement) Query {
qry := &query{
q: "test statement",
stmts: Statements(stmts),
ng: ng,
stats: stats.NewTimerGroup(),
}
return qry
}
// exec executes the query.
//
// At this point per query only one EvalStmt is evaluated. Alert and record
// statements are not handled by the Engine.
func (ng *Engine) exec(ctx context.Context, q *query) (Value, error) {
const env = "query execution"
// Cancel when execution is done or an error was raised.
defer q.cancel()
// The base context might already be canceled (e.g. during shutdown).
if err := contextDone(ctx, env); err != nil {
return nil, err
}
evalTimer := q.stats.GetTimer(stats.TotalEvalTime).Start()
defer evalTimer.Stop()
ng.RLock()
alertHandlers := []AlertHandler{}
for _, h := range ng.alertHandlers {
alertHandlers = append(alertHandlers, h)
}
recordHandlers := []RecordHandler{}
for _, h := range ng.recordHandlers {
recordHandlers = append(recordHandlers, h)
}
ng.RUnlock()
for _, stmt := range q.stmts {
// The base context might already be canceled on the first iteration (e.g. during shutdown).
if err := contextDone(ctx, env); err != nil {
return nil, err
}
switch s := stmt.(type) {
case *AlertStmt:
if len(alertHandlers) == 0 {
return nil, ErrNoHandlers("alert statement")
}
for _, h := range alertHandlers {
if err := contextDone(ctx, env); err != nil {
return nil, err
}
err := h(ctx, s)
if err != nil {
return nil, err
}
}
case *RecordStmt:
if len(recordHandlers) == 0 {
return nil, ErrNoHandlers("record statement")
}
for _, h := range recordHandlers {
if err := contextDone(ctx, env); err != nil {
return nil, err
}
err := h(ctx, s)
if err != nil {
return nil, err
}
}
case *EvalStmt:
// Currently, only one execution statement per query is allowed.
return ng.execEvalStmt(ctx, q, s)
case testStmt:
if err := s(ctx); err != nil {
return nil, err
}
default:
panic(fmt.Errorf("statement of unknown type %T", stmt))
panic(fmt.Errorf("promql.Engine.exec: unhandled statement of type %T", stmt))
}
}
return nil, nil
@ -1050,34 +986,6 @@ func (ev *evaluator) aggregation(op itemType, grouping clientmodel.LabelNames, k
return resultVector
}
// RegisterAlertHandler registers a new alert handler of the given name.
func (ng *Engine) RegisterAlertHandler(name string, h AlertHandler) {
ng.Lock()
ng.alertHandlers[name] = h
ng.Unlock()
}
// RegisterRecordHandler registers a new record handler of the given name.
func (ng *Engine) RegisterRecordHandler(name string, h RecordHandler) {
ng.Lock()
ng.recordHandlers[name] = h
ng.Unlock()
}
// UnregisterAlertHandler removes the alert handler with the given name.
func (ng *Engine) UnregisterAlertHandler(name string) {
ng.Lock()
delete(ng.alertHandlers, name)
ng.Unlock()
}
// UnregisterRecordHandler removes the record handler with the given name.
func (ng *Engine) UnregisterRecordHandler(name string) {
ng.Lock()
delete(ng.recordHandlers, name)
ng.Unlock()
}
// btos returns 1 if b is true, 0 otherwise.
func btos(b bool) clientmodel.SampleValue {
if b {

View File

@ -1,7 +1,6 @@
package promql
import (
"reflect"
"sync"
"testing"
"time"
@ -11,6 +10,10 @@ import (
"github.com/prometheus/prometheus/storage/local"
)
var noop = testStmt(func(context.Context) error {
return nil
})
func TestQueryTimeout(t *testing.T) {
*defaultQueryTimeout = 5 * time.Millisecond
defer func() {
@ -24,23 +27,14 @@ func TestQueryTimeout(t *testing.T) {
engine := NewEngine(storage)
defer engine.Stop()
query, err := engine.NewQuery("foo = bar")
if err != nil {
t.Fatalf("error parsing query: %s", err)
}
f1 := testStmt(func(context.Context) error {
time.Sleep(10 * time.Millisecond)
return nil
})
// Timeouts are not exact but checked in designated places. For example between
// invoking handlers. Thus, we reigster two handlers that take some time to ensure we check
// after exceeding the timeout.
// Should the implementation of this area change, the test might have to be adjusted.
engine.RegisterRecordHandler("test", func(context.Context, *RecordStmt) error {
time.Sleep(10 * time.Millisecond)
return nil
})
engine.RegisterRecordHandler("test2", func(context.Context, *RecordStmt) error {
time.Sleep(10 * time.Millisecond)
return nil
})
// invoking test statements.
query := engine.newTestQuery(f1, f1)
res := query.Exec()
if res.Err == nil {
@ -58,26 +52,16 @@ func TestQueryCancel(t *testing.T) {
engine := NewEngine(storage)
defer engine.Stop()
query1, err := engine.NewQuery("foo = bar")
if err != nil {
t.Fatalf("error parsing query: %s", err)
}
query2, err := engine.NewQuery("foo = baz")
if err != nil {
t.Fatalf("error parsing query: %s", err)
}
// As for timeouts, cancellation is only checked at designated points. We ensure
// that we reach one of those points using the same method.
engine.RegisterRecordHandler("test1", func(context.Context, *RecordStmt) error {
<-time.After(2 * time.Millisecond)
return nil
})
engine.RegisterRecordHandler("test2", func(context.Context, *RecordStmt) error {
<-time.After(2 * time.Millisecond)
f1 := testStmt(func(context.Context) error {
time.Sleep(2 * time.Millisecond)
return nil
})
query1 := engine.newTestQuery(f1, f1)
query2 := engine.newTestQuery(f1, f1)
// Cancel query after starting it.
var wg sync.WaitGroup
var res *Result
@ -87,7 +71,7 @@ func TestQueryCancel(t *testing.T) {
res = query1.Exec()
wg.Done()
}()
<-time.After(1 * time.Millisecond)
time.Sleep(1 * time.Millisecond)
query1.Cancel()
wg.Wait()
@ -112,34 +96,20 @@ func TestEngineShutdown(t *testing.T) {
engine := NewEngine(storage)
query1, err := engine.NewQuery("foo = bar")
if err != nil {
t.Fatalf("error parsing query: %s", err)
}
query2, err := engine.NewQuery("foo = baz")
if err != nil {
t.Fatalf("error parsing query: %s", err)
}
handlerExecutions := 0
// Shutdown engine on first handler execution. Should handler execution ever become
// concurrent this test has to be adjusted accordingly.
engine.RegisterRecordHandler("test", func(context.Context, *RecordStmt) error {
handlerExecutions++
engine.Stop()
time.Sleep(10 * time.Millisecond)
return nil
})
engine.RegisterRecordHandler("test2", func(context.Context, *RecordStmt) error {
f1 := testStmt(func(context.Context) error {
handlerExecutions++
engine.Stop()
time.Sleep(10 * time.Millisecond)
return nil
})
query1 := engine.newTestQuery(f1, f1)
query2 := engine.newTestQuery(f1, f1)
// Stopping the engine should cancel the base context. While setting up queries is
// still possible their context is canceled from the beginning and execution should
// Stopping the engine must cancel the base context. While executing queries is
// still possible, their context is canceled from the beginning and execution should
// terminate immediately.
res := query1.Exec()
@ -147,7 +117,7 @@ func TestEngineShutdown(t *testing.T) {
t.Fatalf("expected error on shutdown during query but got none")
}
if handlerExecutions != 1 {
t.Fatalf("expected only one handler to be executed before query cancellation but got %d executons", handlerExecutions)
t.Fatalf("expected only one handler to be executed before query cancellation but got %d executions", handlerExecutions)
}
res2 := query2.Exec()
@ -159,114 +129,3 @@ func TestEngineShutdown(t *testing.T) {
}
}
func TestAlertHandler(t *testing.T) {
storage, closer := local.NewTestStorage(t, 1)
defer closer.Close()
engine := NewEngine(storage)
defer engine.Stop()
qs := `ALERT Foo IF bar FOR 5m WITH {a="b"} SUMMARY "sum" DESCRIPTION "desc"`
doQuery := func(expectFailure bool) *AlertStmt {
query, err := engine.NewQuery(qs)
if err != nil {
t.Fatalf("error parsing query: %s", err)
}
res := query.Exec()
if expectFailure && res.Err == nil {
t.Fatalf("expected error but got none.")
}
if res.Err != nil && !expectFailure {
t.Fatalf("error on executing alert query: %s", res.Err)
}
// That this alert statement is correct is tested elsewhere.
return query.Statements()[0].(*AlertStmt)
}
// We expect an error if nothing is registered to handle the query.
alertStmt := doQuery(true)
receivedCalls := 0
// Ensure that we receive the correct statement.
engine.RegisterAlertHandler("test", func(ctx context.Context, as *AlertStmt) error {
if !reflect.DeepEqual(alertStmt, as) {
t.Errorf("received alert statement did not match input: %q", qs)
t.Fatalf("no match\n\nexpected:\n%s\ngot: \n%s\n", Tree(alertStmt), Tree(as))
}
receivedCalls++
return nil
})
for i := 0; i < 10; i++ {
doQuery(false)
if receivedCalls != i+1 {
t.Fatalf("alert handler was not called on query execution")
}
}
engine.UnregisterAlertHandler("test")
// We must receive no further calls after unregistering.
doQuery(true)
if receivedCalls != 10 {
t.Fatalf("received calls after unregistering alert handler")
}
}
func TestRecordHandler(t *testing.T) {
storage, closer := local.NewTestStorage(t, 1)
defer closer.Close()
engine := NewEngine(storage)
defer engine.Stop()
qs := `foo = bar`
doQuery := func(expectFailure bool) *RecordStmt {
query, err := engine.NewQuery(qs)
if err != nil {
t.Fatalf("error parsing query: %s", err)
}
res := query.Exec()
if expectFailure && res.Err == nil {
t.Fatalf("expected error but got none.")
}
if res.Err != nil && !expectFailure {
t.Fatalf("error on executing record query: %s", res.Err)
}
return query.Statements()[0].(*RecordStmt)
}
// We expect an error if nothing is registered to handle the query.
recordStmt := doQuery(true)
receivedCalls := 0
// Ensure that we receive the correct statement.
engine.RegisterRecordHandler("test", func(ctx context.Context, rs *RecordStmt) error {
if !reflect.DeepEqual(recordStmt, rs) {
t.Errorf("received record statement did not match input: %q", qs)
t.Fatalf("no match\n\nexpected:\n%s\ngot: \n%s\n", Tree(recordStmt), Tree(rs))
}
receivedCalls++
return nil
})
for i := 0; i < 10; i++ {
doQuery(false)
if receivedCalls != i+1 {
t.Fatalf("record handler was not called on query execution")
}
}
engine.UnregisterRecordHandler("test")
// We must receive no further calls after unregistering.
doQuery(true)
if receivedCalls != 10 {
t.Fatalf("received calls after unregistering record handler")
}
}

View File

@ -15,12 +15,12 @@ package rules
import (
"fmt"
"io/ioutil"
"sync"
"time"
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/net/context"
clientmodel "github.com/prometheus/client_golang/model"
@ -113,9 +113,6 @@ func NewManager(o *ManagerOptions) *Manager {
notificationHandler: o.NotificationHandler,
prometheusURL: o.PrometheusURL,
}
manager.queryEngine.RegisterAlertHandler("rule_manager", manager.AddAlertingRule)
manager.queryEngine.RegisterRecordHandler("rule_manager", manager.AddRecordingRule)
return manager
}
@ -258,24 +255,37 @@ func (m *Manager) runIteration() {
wg.Wait()
}
func (m *Manager) AddAlertingRule(ctx context.Context, r *promql.AlertStmt) error {
rule := NewAlertingRule(r.Name, r.Expr, r.Duration, r.Labels, r.Summary, r.Description)
// LoadRuleFiles loads alerting and recording rules from the given files.
func (m *Manager) LoadRuleFiles(filenames ...string) error {
m.Lock()
m.rules = append(m.rules, rule)
m.Unlock()
return nil
}
func (m *Manager) AddRecordingRule(ctx context.Context, r *promql.RecordStmt) error {
rule := &RecordingRule{r.Name, r.Expr, r.Labels}
m.Lock()
m.rules = append(m.rules, rule)
m.Unlock()
defer m.Unlock()
for _, fn := range filenames {
content, err := ioutil.ReadFile(fn)
if err != nil {
return err
}
stmts, err := promql.ParseStmts(string(content))
if err != nil {
return fmt.Errorf("error parsing %s: %s", fn, err)
}
for _, stmt := range stmts {
switch r := stmt.(type) {
case *promql.AlertStmt:
rule := NewAlertingRule(r.Name, r.Expr, r.Duration, r.Labels, r.Summary, r.Description)
m.rules = append(m.rules, rule)
case *promql.RecordStmt:
rule := &RecordingRule{r.Name, r.Expr, r.Labels}
m.rules = append(m.rules, rule)
default:
panic("retrieval.Manager.LoadRuleFiles: unknown statement type")
}
}
}
return nil
}
// Rules returns the list of the manager's rules.
func (m *Manager) Rules() []Rule {
m.Lock()
defer m.Unlock()
@ -285,6 +295,7 @@ func (m *Manager) Rules() []Rule {
return rules
}
// AlertingRules returns the list of the manager's alerting rules.
func (m *Manager) AlertingRules() []*AlertingRule {
m.Lock()
defer m.Unlock()