Merge pull request #379 from prometheus/fabxc-inhibit-fix
inhibit: reduce O(n^2) complexity problem
This commit is contained in:
commit
e6892e1063
148
inhibit.go
148
inhibit.go
|
@ -15,6 +15,7 @@ package main
|
|||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/common/log"
|
||||
"github.com/prometheus/common/model"
|
||||
|
@ -31,7 +32,8 @@ type Inhibitor struct {
|
|||
rules []*InhibitRule
|
||||
marker types.Marker
|
||||
|
||||
mtx sync.RWMutex
|
||||
mtx sync.RWMutex
|
||||
stopc chan struct{}
|
||||
}
|
||||
|
||||
// NewInhibitor returns a new Inhibitor.
|
||||
|
@ -41,41 +43,87 @@ func NewInhibitor(ap provider.Alerts, rs []*config.InhibitRule, mk types.Marker)
|
|||
marker: mk,
|
||||
}
|
||||
for _, cr := range rs {
|
||||
ih.rules = append(ih.rules, NewInhibitRule(cr))
|
||||
r := NewInhibitRule(cr)
|
||||
ih.rules = append(ih.rules, r)
|
||||
}
|
||||
return ih
|
||||
}
|
||||
|
||||
// Mutes returns true iff the given label set is muted.
|
||||
func (ih *Inhibitor) Mutes(lset model.LabelSet) bool {
|
||||
alerts := ih.alerts.GetPending()
|
||||
defer alerts.Close()
|
||||
|
||||
// TODO(fabxc): improve erroring for iterators so it does not
|
||||
// go silenced here.
|
||||
|
||||
for alert := range alerts.Next() {
|
||||
if err := alerts.Err(); err != nil {
|
||||
log.Errorf("Error iterating alerts: %s", err)
|
||||
continue
|
||||
func (ih *Inhibitor) runGC() {
|
||||
for {
|
||||
select {
|
||||
case <-time.After(15 * time.Minute):
|
||||
for _, r := range ih.rules {
|
||||
r.gc()
|
||||
}
|
||||
case <-ih.stopc:
|
||||
return
|
||||
}
|
||||
if alert.Resolved() {
|
||||
continue
|
||||
}
|
||||
for _, rule := range ih.rules {
|
||||
if rule.Mutes(alert.Labels, lset) {
|
||||
ih.marker.SetInhibited(lset.Fingerprint(), true)
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
// Run the Inihibitor's background processing.
|
||||
func (ih *Inhibitor) Run() {
|
||||
ih.mtx.Lock()
|
||||
ih.stopc = make(chan struct{})
|
||||
ih.mtx.Unlock()
|
||||
|
||||
go ih.runGC()
|
||||
|
||||
it := ih.alerts.Subscribe()
|
||||
defer it.Close()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ih.stopc:
|
||||
return
|
||||
case a := <-it.Next():
|
||||
if err := it.Err(); err != nil {
|
||||
log.Errorf("Error iterating alerts: %s", err)
|
||||
continue
|
||||
}
|
||||
if a.Resolved() {
|
||||
// As alerts can also time out without an update, we never
|
||||
// handle new resolved alerts but invalidate the cache on read.
|
||||
continue
|
||||
}
|
||||
// Populate the inhibition rules' cache.
|
||||
for _, r := range ih.rules {
|
||||
if r.SourceMatchers.Match(a.Labels) {
|
||||
r.set(a)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if err := alerts.Err(); err != nil {
|
||||
log.Errorf("Error after iterating alerts: %s", err)
|
||||
}
|
||||
|
||||
// Stop the Inhibitor's background processing.
|
||||
func (ih *Inhibitor) Stop() {
|
||||
if ih == nil {
|
||||
return
|
||||
}
|
||||
ih.mtx.Lock()
|
||||
defer ih.mtx.Unlock()
|
||||
|
||||
ih.marker.SetInhibited(lset.Fingerprint(), false)
|
||||
if ih.stopc != nil {
|
||||
close(ih.stopc)
|
||||
ih.stopc = nil
|
||||
}
|
||||
}
|
||||
|
||||
// Mutes returns true iff the given label set is muted.
|
||||
func (ih *Inhibitor) Mutes(lset model.LabelSet) bool {
|
||||
fp := lset.Fingerprint()
|
||||
|
||||
for _, r := range ih.rules {
|
||||
if r.TargetMatchers.Match(lset) && r.hasEqual(lset) {
|
||||
ih.marker.SetInhibited(fp, true)
|
||||
return true
|
||||
}
|
||||
}
|
||||
ih.marker.SetInhibited(fp, false)
|
||||
return false
|
||||
|
||||
}
|
||||
|
||||
// An InhibitRule specifies that a class of (source) alerts should inhibit
|
||||
|
@ -93,6 +141,10 @@ type InhibitRule struct {
|
|||
// A set of label names whose label values need to be identical in source and
|
||||
// target alerts in order for the inhibition to take effect.
|
||||
Equal map[model.LabelName]struct{}
|
||||
|
||||
mtx sync.RWMutex
|
||||
// Cache of alerts matching source labels.
|
||||
scache map[model.Fingerprint]*types.Alert
|
||||
}
|
||||
|
||||
// NewInhibitRule returns a new InihibtRule based on a configuration definition.
|
||||
|
@ -125,20 +177,48 @@ func NewInhibitRule(cr *config.InhibitRule) *InhibitRule {
|
|||
SourceMatchers: sourcem,
|
||||
TargetMatchers: targetm,
|
||||
Equal: equal,
|
||||
scache: map[model.Fingerprint]*types.Alert{},
|
||||
}
|
||||
}
|
||||
|
||||
// Mutes returns true iff the Inhibition rule applies for the given
|
||||
// source and target label set.
|
||||
func (r *InhibitRule) Mutes(source, target model.LabelSet) bool {
|
||||
if !r.TargetMatchers.Match(target) || !r.SourceMatchers.Match(source) {
|
||||
return false
|
||||
// set the alert in the source cache.
|
||||
func (r *InhibitRule) set(a *types.Alert) {
|
||||
r.mtx.Lock()
|
||||
r.mtx.Unlock()
|
||||
|
||||
r.scache[a.Fingerprint()] = a
|
||||
}
|
||||
|
||||
// hasEqual checks whether the source cache contains alerts matching
|
||||
// the equal labels for the given label set.
|
||||
func (r *InhibitRule) hasEqual(lset model.LabelSet) bool {
|
||||
r.mtx.RLock()
|
||||
defer r.mtx.RUnlock()
|
||||
|
||||
Outer:
|
||||
for _, a := range r.scache {
|
||||
// The cache might be stale and contain resolved alerts.
|
||||
if a.Resolved() {
|
||||
continue
|
||||
}
|
||||
for n := range r.Equal {
|
||||
if a.Labels[n] != lset[n] {
|
||||
continue Outer
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
for ln := range r.Equal {
|
||||
if source[ln] != target[ln] {
|
||||
return false
|
||||
return false
|
||||
}
|
||||
|
||||
// gc clears out resolved alerts from the source cache.
|
||||
func (r *InhibitRule) gc() {
|
||||
r.mtx.Lock()
|
||||
defer r.mtx.Unlock()
|
||||
|
||||
for fp, a := range r.scache {
|
||||
if a.Resolved() {
|
||||
delete(r.scache, fp)
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
|
|
@ -0,0 +1,168 @@
|
|||
// Copyright 2016 Prometheus Team
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/kylelemons/godebug/pretty"
|
||||
"github.com/prometheus/alertmanager/types"
|
||||
"github.com/prometheus/common/model"
|
||||
)
|
||||
|
||||
func TestInhibitRuleHasEqual(t *testing.T) {
|
||||
now := time.Now()
|
||||
cases := []struct {
|
||||
initial map[model.Fingerprint]*types.Alert
|
||||
equal model.LabelNames
|
||||
input model.LabelSet
|
||||
result bool
|
||||
}{
|
||||
{
|
||||
// No source alerts at all.
|
||||
initial: map[model.Fingerprint]*types.Alert{},
|
||||
input: model.LabelSet{"a": "b"},
|
||||
result: false,
|
||||
},
|
||||
{
|
||||
// No equal labels, any source alerts satisfies the requirement.
|
||||
initial: map[model.Fingerprint]*types.Alert{1: &types.Alert{}},
|
||||
input: model.LabelSet{"a": "b"},
|
||||
result: true,
|
||||
},
|
||||
{
|
||||
// Matching but already resolved.
|
||||
initial: map[model.Fingerprint]*types.Alert{
|
||||
1: &types.Alert{
|
||||
Alert: model.Alert{
|
||||
Labels: model.LabelSet{"a": "b", "b": "f"},
|
||||
StartsAt: now.Add(-time.Minute),
|
||||
EndsAt: now.Add(-time.Second),
|
||||
},
|
||||
},
|
||||
2: &types.Alert{
|
||||
Alert: model.Alert{
|
||||
Labels: model.LabelSet{"a": "b", "b": "c"},
|
||||
StartsAt: now.Add(-time.Minute),
|
||||
EndsAt: now.Add(-time.Second),
|
||||
},
|
||||
},
|
||||
},
|
||||
equal: model.LabelNames{"a", "b"},
|
||||
input: model.LabelSet{"a": "b", "b": "c"},
|
||||
result: false,
|
||||
},
|
||||
{
|
||||
// Matching but already resolved.
|
||||
initial: map[model.Fingerprint]*types.Alert{
|
||||
1: &types.Alert{
|
||||
Alert: model.Alert{
|
||||
Labels: model.LabelSet{"a": "b", "c": "d"},
|
||||
StartsAt: now.Add(-time.Minute),
|
||||
EndsAt: now.Add(-time.Second),
|
||||
},
|
||||
},
|
||||
2: &types.Alert{
|
||||
Alert: model.Alert{
|
||||
Labels: model.LabelSet{"a": "b", "c": "f"},
|
||||
StartsAt: now.Add(-time.Minute),
|
||||
EndsAt: now.Add(-time.Second),
|
||||
},
|
||||
},
|
||||
},
|
||||
equal: model.LabelNames{"a"},
|
||||
input: model.LabelSet{"a": "b"},
|
||||
result: false,
|
||||
},
|
||||
{
|
||||
// Equal label does not match.
|
||||
initial: map[model.Fingerprint]*types.Alert{
|
||||
1: &types.Alert{
|
||||
Alert: model.Alert{
|
||||
Labels: model.LabelSet{"a": "c", "c": "d"},
|
||||
StartsAt: now.Add(-time.Minute),
|
||||
EndsAt: now.Add(-time.Second),
|
||||
},
|
||||
},
|
||||
2: &types.Alert{
|
||||
Alert: model.Alert{
|
||||
Labels: model.LabelSet{"a": "c", "c": "f"},
|
||||
StartsAt: now.Add(-time.Minute),
|
||||
EndsAt: now.Add(-time.Second),
|
||||
},
|
||||
},
|
||||
},
|
||||
equal: model.LabelNames{"a"},
|
||||
input: model.LabelSet{"a": "b"},
|
||||
result: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
r := &InhibitRule{
|
||||
Equal: map[model.LabelName]struct{}{},
|
||||
scache: map[model.Fingerprint]*types.Alert{},
|
||||
}
|
||||
for _, ln := range c.equal {
|
||||
r.Equal[ln] = struct{}{}
|
||||
}
|
||||
for k, v := range c.initial {
|
||||
r.scache[k] = v
|
||||
}
|
||||
|
||||
if have := r.hasEqual(c.input); have != c.result {
|
||||
t.Errorf("Unexpected result %q, expected %q", have, c.result)
|
||||
}
|
||||
if !reflect.DeepEqual(r.scache, c.initial) {
|
||||
t.Errorf("Cache state unexpectedly changed")
|
||||
t.Errorf(pretty.Compare(r.scache, c.initial))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestInhibitRuleGC(t *testing.T) {
|
||||
// TODO(fabxc): add now() injection function to Resolved() to remove
|
||||
// dependency on machine time in this test.
|
||||
now := time.Now()
|
||||
newAlert := func(start, end time.Duration) *types.Alert {
|
||||
return &types.Alert{
|
||||
Alert: model.Alert{
|
||||
Labels: model.LabelSet{"a": "b"},
|
||||
StartsAt: now.Add(start * time.Minute),
|
||||
EndsAt: now.Add(end * time.Minute),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
before := map[model.Fingerprint]*types.Alert{
|
||||
0: newAlert(-10, -5),
|
||||
1: newAlert(10, 20),
|
||||
2: newAlert(-10, 10),
|
||||
3: newAlert(-10, -1),
|
||||
}
|
||||
after := map[model.Fingerprint]*types.Alert{
|
||||
1: newAlert(10, 20),
|
||||
2: newAlert(-10, 10),
|
||||
}
|
||||
|
||||
r := &InhibitRule{scache: before}
|
||||
r.gc()
|
||||
|
||||
if !reflect.DeepEqual(r.scache, after) {
|
||||
t.Errorf("Unexpected cache state after GC")
|
||||
t.Errorf(pretty.Compare(r.scache, after))
|
||||
}
|
||||
}
|
2
main.go
2
main.go
|
@ -171,12 +171,14 @@ func main() {
|
|||
}
|
||||
tmpl.ExternalURL = amURL
|
||||
|
||||
inhibitor.Stop()
|
||||
disp.Stop()
|
||||
|
||||
inhibitor = NewInhibitor(alerts, conf.InhibitRules, marker)
|
||||
disp = NewDispatcher(alerts, NewRoute(conf.Route, nil), build(conf.Receivers), marker)
|
||||
|
||||
go disp.Run()
|
||||
go inhibitor.Run()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue