Very basic staleness handling for rules.

This commit is contained in:
Brian Brazil 2017-05-18 16:26:36 +01:00
parent 9aa8f822c1
commit 0400f3cfd2
1 changed file with 56 additions and 14 deletions

View File

@@ -16,6 +16,7 @@ package rules
import ( import (
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"math"
"net/url" "net/url"
"path/filepath" "path/filepath"
"sync" "sync"
@@ -30,6 +31,9 @@ import (
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/notifier" "github.com/prometheus/prometheus/notifier"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/prometheus/prometheus/pkg/value"
"github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/strutil" "github.com/prometheus/prometheus/util/strutil"
@@ -122,10 +126,11 @@ type Rule interface {
// Group is a set of rules that have a logical relation. // Group is a set of rules that have a logical relation.
type Group struct { type Group struct {
name string name string
interval time.Duration interval time.Duration
rules []Rule rules []Rule
opts *ManagerOptions opts *ManagerOptions
seriesInPreviousEval map[string]labels.Labels
done chan struct{} done chan struct{}
terminated chan struct{} terminated chan struct{}
@@ -134,12 +139,13 @@ type Group struct {
// NewGroup makes a new Group with the given name, options, and rules. // NewGroup makes a new Group with the given name, options, and rules.
func NewGroup(name string, interval time.Duration, rules []Rule, opts *ManagerOptions) *Group { func NewGroup(name string, interval time.Duration, rules []Rule, opts *ManagerOptions) *Group {
return &Group{ return &Group{
name: name, name: name,
interval: interval, interval: interval,
rules: rules, rules: rules,
opts: opts, opts: opts,
done: make(chan struct{}), seriesInPreviousEval: map[string]labels.Labels{},
terminated: make(chan struct{}), done: make(chan struct{}),
terminated: make(chan struct{}),
} }
} }
@@ -214,8 +220,9 @@ func (g *Group) offset() time.Duration {
return time.Duration(next - now) return time.Duration(next - now)
} }
// copyState copies the alerting rule state from the given group. // copyState copies the alerting rule and staleness related state from the given group.
func (g *Group) copyState(from *Group) { func (g *Group) copyState(from *Group) {
g.seriesInPreviousEval = from.seriesInPreviousEval
for _, fromRule := range from.rules { for _, fromRule := range from.rules {
far, ok := fromRule.(*AlertingRule) far, ok := fromRule.(*AlertingRule)
if !ok { if !ok {
@@ -252,8 +259,10 @@ func typeForRule(r Rule) ruleType {
// rule dependency. // rule dependency.
func (g *Group) Eval() { func (g *Group) Eval() {
var ( var (
now = time.Now() now = time.Now()
wg sync.WaitGroup wg sync.WaitGroup
mu sync.Mutex
seriesReturned = make(map[string]labels.Labels, len(g.seriesInPreviousEval))
) )
for _, rule := range g.rules { for _, rule := range g.rules {
@@ -307,6 +316,10 @@ func (g *Group) Eval() {
default: default:
log.With("sample", s).With("err", err).Warn("Rule evaluation result discarded") log.With("sample", s).With("err", err).Warn("Rule evaluation result discarded")
} }
} else {
mu.Lock()
seriesReturned[s.Metric.String()] = s.Metric
mu.Unlock()
} }
} }
if numOutOfOrder > 0 { if numOutOfOrder > 0 {
@@ -321,6 +334,35 @@ func (g *Group) Eval() {
}(rule) }(rule)
} }
wg.Wait() wg.Wait()
// TODO(bbrazil): This should apply per-rule.
app, err := g.opts.Appendable.Appender()
if err != nil {
log.With("err", err).Warn("creating appender failed")
return
}
for metric, lset := range g.seriesInPreviousEval {
if _, ok := seriesReturned[metric]; !ok {
// Series no longer exposed, mark it stale.
_, err = app.Add(lset, timestamp.FromTime(now), math.Float64frombits(value.StaleNaN))
switch err {
case nil:
case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
// Do not count these in logging, as this is expected if series
// is exposed from a different group.
continue
default:
log.With("sample", metric).With("err", err).Warn("adding stale sample failed")
if err := app.Rollback(); err != nil {
log.With("err", err).Warn("rule stale sample rollback failed")
}
}
}
}
if err := app.Commit(); err != nil {
log.With("err", err).Warn("rule stale sample appending failed")
}
g.seriesInPreviousEval = seriesReturned
} }
// sendAlerts sends alert notifications for the given rule. // sendAlerts sends alert notifications for the given rule.
@@ -433,7 +475,7 @@ func (m *Manager) ApplyConfig(conf *config.Config) error {
wg.Add(1) wg.Add(1)
// If there is an old group with the same identifier, stop it and wait for // If there is an old group with the same identifier, stop it and wait for
// it to finish the current iteration. Then copy its into the new group. // it to finish the current iteration. Then copy it into the new group.
oldg, ok := m.groups[newg.name] oldg, ok := m.groups[newg.name]
delete(m.groups, newg.name) delete(m.groups, newg.name)