diff --git a/rules/manager.go b/rules/manager.go index b3ed28f8f2..7fcc579090 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -129,8 +129,8 @@ type Group struct { name string interval time.Duration rules []Rule + seriesInPreviousEval []map[string]labels.Labels // One per Rule. opts *ManagerOptions - seriesInPreviousEval map[string]labels.Labels done chan struct{} terminated chan struct{} @@ -143,7 +143,7 @@ func NewGroup(name string, interval time.Duration, rules []Rule, opts *ManagerOp interval: interval, rules: rules, opts: opts, - seriesInPreviousEval: map[string]labels.Labels{}, + seriesInPreviousEval: make([]map[string]labels.Labels, len(rules)), done: make(chan struct{}), terminated: make(chan struct{}), } @@ -259,17 +259,15 @@ func typeForRule(r Rule) ruleType { // rule dependency. func (g *Group) Eval(ts time.Time) { var ( - wg sync.WaitGroup - mu sync.Mutex - seriesReturned = make(map[string]labels.Labels, len(g.seriesInPreviousEval)) + wg sync.WaitGroup ) - for _, rule := range g.rules { + for i, rule := range g.rules { rtyp := string(typeForRule(rule)) wg.Add(1) // BUG(julius): Look at fixing thundering herd. - go func(rule Rule) { + go func(i int, rule Rule) { defer wg.Done() defer func(t time.Time) { @@ -303,6 +301,7 @@ func (g *Group) Eval(ts time.Time) { return } + seriesReturned := make(map[string]labels.Labels, len(g.seriesInPreviousEval[i])) for _, s := range vector { if _, err := app.Add(s.Metric, s.T, s.V); err != nil { switch err { @@ -316,9 +315,7 @@ func (g *Group) Eval(ts time.Time) { log.With("sample", s).With("err", err).Warn("Rule evaluation result discarded") } } else { - mu.Lock() seriesReturned[s.Metric.String()] = s.Metric - mu.Unlock() } } if numOutOfOrder > 0 { @@ -327,41 +324,29 @@ func (g *Group) Eval(ts time.Time) { if numDuplicates > 0 { log.With("numDropped", numDuplicates).Warn("Error on ingesting results from rule evaluation with different value but same timestamp") } - if err := app.Commit(); err != nil { - log.With("err", err).Warn("rule sample appending failed") - } - }(rule) - } - wg.Wait() - // TODO(bbrazil): This should apply per-rule. - app, err := g.opts.Appendable.Appender() - if err != nil { - log.With("err", err).Warn("creating appender failed") - return - } - for metric, lset := range g.seriesInPreviousEval { - if _, ok := seriesReturned[metric]; !ok { - // Series no longer exposed, mark it stale. - _, err = app.Add(lset, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN)) - switch err { - case nil: - case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp: - // Do not count these in logging, as this is expected if series - // is exposed from a different group. - continue - default: - log.With("sample", metric).With("err", err).Warn("adding stale sample failed") - if err := app.Rollback(); err != nil { - log.With("err", err).Warn("rule stale sample rollback failed") + for metric, lset := range g.seriesInPreviousEval[i] { + if _, ok := seriesReturned[metric]; !ok { + // Series no longer exposed, mark it stale. + _, err = app.Add(lset, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN)) + switch err { + case nil: + case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp: + // Do not count these in logging, as this is expected if series + // is exposed from a different rule. + default: + log.With("sample", metric).With("err", err).Warn("adding stale sample failed") + } } } - } + if err := app.Commit(); err != nil { + log.With("err", err).Warn("rule sample appending failed") + } else { + g.seriesInPreviousEval[i] = seriesReturned + } + }(i, rule) } - if err := app.Commit(); err != nil { - log.With("err", err).Warn("rule stale sample appending failed") - } - g.seriesInPreviousEval = seriesReturned + wg.Wait() } // sendAlerts sends alert notifications for the given rule.