diff --git a/inhibit.go b/inhibit.go index 1669d11a..7f5a9cf6 100644 --- a/inhibit.go +++ b/inhibit.go @@ -15,6 +15,7 @@ package main import ( "sync" + "time" "github.com/prometheus/common/log" "github.com/prometheus/common/model" @@ -31,7 +32,8 @@ type Inhibitor struct { rules []*InhibitRule marker types.Marker - mtx sync.RWMutex + mtx sync.RWMutex + stopc chan struct{} } // NewInhibitor returns a new Inhibitor. @@ -41,41 +43,87 @@ func NewInhibitor(ap provider.Alerts, rs []*config.InhibitRule, mk types.Marker) marker: mk, } for _, cr := range rs { - ih.rules = append(ih.rules, NewInhibitRule(cr)) + r := NewInhibitRule(cr) + ih.rules = append(ih.rules, r) } return ih } -// Mutes returns true iff the given label set is muted. -func (ih *Inhibitor) Mutes(lset model.LabelSet) bool { - alerts := ih.alerts.GetPending() - defer alerts.Close() - - // TODO(fabxc): improve erroring for iterators so it does not - // go silenced here. - - for alert := range alerts.Next() { - if err := alerts.Err(); err != nil { - log.Errorf("Error iterating alerts: %s", err) - continue +func (ih *Inhibitor) runGC() { + for { + select { + case <-time.After(15 * time.Minute): + for _, r := range ih.rules { + r.gc() + } + case <-ih.stopc: + return } - if alert.Resolved() { - continue - } - for _, rule := range ih.rules { - if rule.Mutes(alert.Labels, lset) { - ih.marker.SetInhibited(lset.Fingerprint(), true) - return true + } +} + +// Run the Inihibitor's background processing. +func (ih *Inhibitor) Run() { + ih.mtx.Lock() + ih.stopc = make(chan struct{}) + ih.mtx.Unlock() + + go ih.runGC() + + it := ih.alerts.Subscribe() + defer it.Close() + + for { + select { + case <-ih.stopc: + return + case a := <-it.Next(): + if err := it.Err(); err != nil { + log.Errorf("Error iterating alerts: %s", err) + continue + } + if a.Resolved() { + // As alerts can also time out without an update, we never + // handle new resolved alerts but invalidate the cache on read. + continue + } + // Populate the inhibition rules' cache. + for _, r := range ih.rules { + if r.SourceMatchers.Match(a.Labels) { + r.set(a) + } } } } - if err := alerts.Err(); err != nil { - log.Errorf("Error after iterating alerts: %s", err) +} + +// Stop the Inhibitor's background processing. +func (ih *Inhibitor) Stop() { + if ih == nil { + return } + ih.mtx.Lock() + defer ih.mtx.Unlock() - ih.marker.SetInhibited(lset.Fingerprint(), false) + if ih.stopc != nil { + close(ih.stopc) + ih.stopc = nil + } +} +// Mutes returns true iff the given label set is muted. +func (ih *Inhibitor) Mutes(lset model.LabelSet) bool { + fp := lset.Fingerprint() + + for _, r := range ih.rules { + if r.TargetMatchers.Match(lset) && r.hasEqual(lset) { + ih.marker.SetInhibited(fp, true) + return true + } + } + ih.marker.SetInhibited(fp, false) return false + } // An InhibitRule specifies that a class of (source) alerts should inhibit @@ -93,6 +141,10 @@ type InhibitRule struct { // A set of label names whose label values need to be identical in source and // target alerts in order for the inhibition to take effect. Equal map[model.LabelName]struct{} + + mtx sync.RWMutex + // Cache of alerts matching source labels. + scache map[model.Fingerprint]*types.Alert } // NewInhibitRule returns a new InihibtRule based on a configuration definition. @@ -125,20 +177,48 @@ func NewInhibitRule(cr *config.InhibitRule) *InhibitRule { SourceMatchers: sourcem, TargetMatchers: targetm, Equal: equal, + scache: map[model.Fingerprint]*types.Alert{}, } } -// Mutes returns true iff the Inhibition rule applies for the given -// source and target label set. -func (r *InhibitRule) Mutes(source, target model.LabelSet) bool { - if !r.TargetMatchers.Match(target) || !r.SourceMatchers.Match(source) { - return false +// set the alert in the source cache. +func (r *InhibitRule) set(a *types.Alert) { + r.mtx.Lock() + r.mtx.Unlock() + + r.scache[a.Fingerprint()] = a +} + +// hasEqual checks whether the source cache contains alerts matching +// the equal labels for the given label set. +func (r *InhibitRule) hasEqual(lset model.LabelSet) bool { + r.mtx.RLock() + defer r.mtx.RUnlock() + +Outer: + for _, a := range r.scache { + // The cache might be stale and contain resolved alerts. + if a.Resolved() { + continue + } + for n := range r.Equal { + if a.Labels[n] != lset[n] { + continue Outer + } + } + return true } - for ln := range r.Equal { - if source[ln] != target[ln] { - return false + return false +} + +// gc clears out resolved alerts from the source cache. +func (r *InhibitRule) gc() { + r.mtx.Lock() + defer r.mtx.Unlock() + + for fp, a := range r.scache { + if a.Resolved() { + delete(r.scache, fp) } } - - return true } diff --git a/inhibit_test.go b/inhibit_test.go new file mode 100644 index 00000000..f212471d --- /dev/null +++ b/inhibit_test.go @@ -0,0 +1,168 @@ +// Copyright 2016 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "reflect" + "testing" + "time" + + "github.com/kylelemons/godebug/pretty" + "github.com/prometheus/alertmanager/types" + "github.com/prometheus/common/model" +) + +func TestInhibitRuleHasEqual(t *testing.T) { + now := time.Now() + cases := []struct { + initial map[model.Fingerprint]*types.Alert + equal model.LabelNames + input model.LabelSet + result bool + }{ + { + // No source alerts at all. + initial: map[model.Fingerprint]*types.Alert{}, + input: model.LabelSet{"a": "b"}, + result: false, + }, + { + // No equal labels, any source alerts satisfies the requirement. + initial: map[model.Fingerprint]*types.Alert{1: &types.Alert{}}, + input: model.LabelSet{"a": "b"}, + result: true, + }, + { + // Matching but already resolved. + initial: map[model.Fingerprint]*types.Alert{ + 1: &types.Alert{ + Alert: model.Alert{ + Labels: model.LabelSet{"a": "b", "b": "f"}, + StartsAt: now.Add(-time.Minute), + EndsAt: now.Add(-time.Second), + }, + }, + 2: &types.Alert{ + Alert: model.Alert{ + Labels: model.LabelSet{"a": "b", "b": "c"}, + StartsAt: now.Add(-time.Minute), + EndsAt: now.Add(-time.Second), + }, + }, + }, + equal: model.LabelNames{"a", "b"}, + input: model.LabelSet{"a": "b", "b": "c"}, + result: false, + }, + { + // Matching but already resolved. + initial: map[model.Fingerprint]*types.Alert{ + 1: &types.Alert{ + Alert: model.Alert{ + Labels: model.LabelSet{"a": "b", "c": "d"}, + StartsAt: now.Add(-time.Minute), + EndsAt: now.Add(-time.Second), + }, + }, + 2: &types.Alert{ + Alert: model.Alert{ + Labels: model.LabelSet{"a": "b", "c": "f"}, + StartsAt: now.Add(-time.Minute), + EndsAt: now.Add(-time.Second), + }, + }, + }, + equal: model.LabelNames{"a"}, + input: model.LabelSet{"a": "b"}, + result: false, + }, + { + // Equal label does not match. + initial: map[model.Fingerprint]*types.Alert{ + 1: &types.Alert{ + Alert: model.Alert{ + Labels: model.LabelSet{"a": "c", "c": "d"}, + StartsAt: now.Add(-time.Minute), + EndsAt: now.Add(-time.Second), + }, + }, + 2: &types.Alert{ + Alert: model.Alert{ + Labels: model.LabelSet{"a": "c", "c": "f"}, + StartsAt: now.Add(-time.Minute), + EndsAt: now.Add(-time.Second), + }, + }, + }, + equal: model.LabelNames{"a"}, + input: model.LabelSet{"a": "b"}, + result: false, + }, + } + + for _, c := range cases { + r := &InhibitRule{ + Equal: map[model.LabelName]struct{}{}, + scache: map[model.Fingerprint]*types.Alert{}, + } + for _, ln := range c.equal { + r.Equal[ln] = struct{}{} + } + for k, v := range c.initial { + r.scache[k] = v + } + + if have := r.hasEqual(c.input); have != c.result { + t.Errorf("Unexpected result %q, expected %q", have, c.result) + } + if !reflect.DeepEqual(r.scache, c.initial) { + t.Errorf("Cache state unexpectedly changed") + t.Errorf(pretty.Compare(r.scache, c.initial)) + } + } +} + +func TestInhibitRuleGC(t *testing.T) { + // TODO(fabxc): add now() injection function to Resolved() to remove + // dependency on machine time in this test. + now := time.Now() + newAlert := func(start, end time.Duration) *types.Alert { + return &types.Alert{ + Alert: model.Alert{ + Labels: model.LabelSet{"a": "b"}, + StartsAt: now.Add(start * time.Minute), + EndsAt: now.Add(end * time.Minute), + }, + } + } + + before := map[model.Fingerprint]*types.Alert{ + 0: newAlert(-10, -5), + 1: newAlert(10, 20), + 2: newAlert(-10, 10), + 3: newAlert(-10, -1), + } + after := map[model.Fingerprint]*types.Alert{ + 1: newAlert(10, 20), + 2: newAlert(-10, 10), + } + + r := &InhibitRule{scache: before} + r.gc() + + if !reflect.DeepEqual(r.scache, after) { + t.Errorf("Unexpected cache state after GC") + t.Errorf(pretty.Compare(r.scache, after)) + } +} diff --git a/main.go b/main.go index 52141e49..2ac77b3d 100644 --- a/main.go +++ b/main.go @@ -171,12 +171,14 @@ func main() { } tmpl.ExternalURL = amURL + inhibitor.Stop() disp.Stop() inhibitor = NewInhibitor(alerts, conf.InhibitRules, marker) disp = NewDispatcher(alerts, NewRoute(conf.Route, nil), build(conf.Receivers), marker) go disp.Run() + go inhibitor.Run() return nil }