diff --git a/rules/manager.go b/rules/manager.go index 7fe6b05ec..993302637 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -16,6 +16,7 @@ package rules import ( "fmt" "io/ioutil" + "math" "net/url" "path/filepath" "sync" @@ -30,6 +31,9 @@ import ( "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/notifier" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/timestamp" + "github.com/prometheus/prometheus/pkg/value" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/util/strutil" @@ -122,10 +126,11 @@ type Rule interface { // Group is a set of rules that have a logical relation. type Group struct { - name string - interval time.Duration - rules []Rule - opts *ManagerOptions + name string + interval time.Duration + rules []Rule + opts *ManagerOptions + seriesInPreviousEval map[string]labels.Labels done chan struct{} terminated chan struct{} @@ -134,12 +139,13 @@ type Group struct { // NewGroup makes a new Group with the given name, options, and rules. func NewGroup(name string, interval time.Duration, rules []Rule, opts *ManagerOptions) *Group { return &Group{ - name: name, - interval: interval, - rules: rules, - opts: opts, - done: make(chan struct{}), - terminated: make(chan struct{}), + name: name, + interval: interval, + rules: rules, + opts: opts, + seriesInPreviousEval: map[string]labels.Labels{}, + done: make(chan struct{}), + terminated: make(chan struct{}), } } @@ -214,8 +220,9 @@ func (g *Group) offset() time.Duration { return time.Duration(next - now) } -// copyState copies the alerting rule state from the given group. +// copyState copies the alerting rule and staleness related state from the given group. func (g *Group) copyState(from *Group) { + g.seriesInPreviousEval = from.seriesInPreviousEval for _, fromRule := range from.rules { far, ok := fromRule.(*AlertingRule) if !ok { @@ -252,8 +259,10 @@ func typeForRule(r Rule) ruleType { // rule dependency. func (g *Group) Eval() { var ( - now = time.Now() - wg sync.WaitGroup + now = time.Now() + wg sync.WaitGroup + mu sync.Mutex + seriesReturned = make(map[string]labels.Labels, len(g.seriesInPreviousEval)) ) for _, rule := range g.rules { @@ -307,6 +316,10 @@ func (g *Group) Eval() { default: log.With("sample", s).With("err", err).Warn("Rule evaluation result discarded") } + } else { + mu.Lock() + seriesReturned[s.Metric.String()] = s.Metric + mu.Unlock() } } if numOutOfOrder > 0 { @@ -321,6 +334,35 @@ func (g *Group) Eval() { }(rule) } wg.Wait() + + // TODO(bbrazil): This should apply per-rule. + app, err := g.opts.Appendable.Appender() + if err != nil { + log.With("err", err).Warn("creating appender failed") + return + } + for metric, lset := range g.seriesInPreviousEval { + if _, ok := seriesReturned[metric]; !ok { + // Series no longer exposed, mark it stale. + _, err = app.Add(lset, timestamp.FromTime(now), math.Float64frombits(value.StaleNaN)) + switch err { + case nil: + case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp: + // Do not count these in logging, as this is expected if series + // is exposed from a different group. + continue + default: + log.With("sample", metric).With("err", err).Warn("adding stale sample failed") + if err := app.Rollback(); err != nil { + log.With("err", err).Warn("rule stale sample rollback failed") + } + } + } + } + if err := app.Commit(); err != nil { + log.With("err", err).Warn("rule stale sample appending failed") + } + g.seriesInPreviousEval = seriesReturned } // sendAlerts sends alert notifications for the given rule. @@ -433,7 +475,7 @@ func (m *Manager) ApplyConfig(conf *config.Config) error { wg.Add(1) // If there is an old group with the same identifier, stop it and wait for - // it to finish the current iteration. Then copy its into the new group. + // it to finish the current iteration. Then copy it into the new group. oldg, ok := m.groups[newg.name] delete(m.groups, newg.name)