reimplement inhibitor datastructures to improve performance
Signed-off-by: Ethan Hunter <ehunter@hudson-trading.com>
This commit is contained in:
parent
82b89dc769
commit
fdef56676f
|
@ -25,7 +25,6 @@ import (
|
|||
"github.com/prometheus/alertmanager/config"
|
||||
"github.com/prometheus/alertmanager/pkg/labels"
|
||||
"github.com/prometheus/alertmanager/provider"
|
||||
"github.com/prometheus/alertmanager/store"
|
||||
"github.com/prometheus/alertmanager/types"
|
||||
)
|
||||
|
||||
|
@ -72,9 +71,7 @@ func (ih *Inhibitor) run(ctx context.Context) {
|
|||
// Update the inhibition rules' cache.
|
||||
for _, r := range ih.rules {
|
||||
if r.SourceMatchers.Matches(a.Labels) {
|
||||
if err := r.scache.Set(a); err != nil {
|
||||
ih.logger.Error("error on set alert", "err", err)
|
||||
}
|
||||
r.set(a)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -93,8 +90,27 @@ func (ih *Inhibitor) Run() {
|
|||
ih.mtx.Unlock()
|
||||
runCtx, runCancel := context.WithCancel(ctx)
|
||||
|
||||
for _, rule := range ih.rules {
|
||||
go rule.scache.Run(runCtx, 15*time.Minute)
|
||||
for _, r := range ih.rules {
	// Each rule gets its own background goroutine that periodically evicts
	// resolved alerts from the rule's cache. It runs until runCtx is
	// cancelled.
	go func(r *InhibitRule) {
		ticker := time.NewTicker(15 * time.Minute)
		// Release the ticker's resources once the goroutine exits.
		defer ticker.Stop()

		// The select must sit inside a loop: without it the goroutine would
		// return after handling the first tick and the cache would never be
		// cleaned again.
		for {
			select {
			case <-runCtx.Done():
				return
			case <-ticker.C:
				r.mtx.Lock()
				for icacheKey, cacheEntry := range r.icache {
					for fp, cachedAlert := range cacheEntry {
						if cachedAlert.alert.Resolved() {
							delete(cacheEntry, fp)
						}
					}
					// Drop buckets that became empty so the outer map does
					// not accumulate dead keys.
					if len(cacheEntry) == 0 {
						delete(r.icache, icacheKey)
					}
				}
				r.mtx.Unlock()
			}
		}
	}(r)
}
|
||||
|
||||
g.Add(func() error {
|
||||
|
@ -126,24 +142,40 @@ func (ih *Inhibitor) Stop() {
|
|||
// interface.
|
||||
func (ih *Inhibitor) Mutes(lset model.LabelSet) bool {
|
||||
fp := lset.Fingerprint()
|
||||
now := time.Now()
|
||||
|
||||
for _, r := range ih.rules {
|
||||
if !r.TargetMatchers.Matches(lset) {
|
||||
// If target side of rule doesn't match, we don't need to look any further.
|
||||
continue
|
||||
}
|
||||
// If we are here, the target side matches. If the source side matches, too, we
|
||||
// need to exclude inhibiting alerts for which the same is true.
|
||||
if inhibitedByFP, eq := r.hasEqual(lset, r.SourceMatchers.Matches(lset)); eq {
|
||||
ih.marker.SetInhibited(fp, inhibitedByFP.String())
|
||||
// we know that the target side matches, but we don't know if this alert
|
||||
// is actually inhibited yet - let the InhibitRule figure that out.
|
||||
if inhibiting, matches := r.findInhibitor(lset, now); matches {
|
||||
ih.marker.SetInhibited(fp, inhibiting.String())
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
ih.marker.SetInhibited(fp)
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// cachedAlert is the value stored in an iCacheEntry: an alert together with a
// flag that is computed once, when the entry is created by newCachedAlert.
type cachedAlert struct {
|
||||
// alert is the alert that was cached.
alert *types.Alert
|
||||
// matchesSourceAndTarget holds the result of evaluating the target matchers
// against the alert's labels at caching time.
// NOTE(review): entries appear to be added only for alerts that already
// matched the source matchers, hence the name - confirm against callers.
matchesSourceAndTarget bool
|
||||
}
|
||||
|
||||
func newCachedAlert(a *types.Alert, targetMatchers labels.Matchers) cachedAlert {
|
||||
return cachedAlert{
|
||||
alert: a,
|
||||
matchesSourceAndTarget: targetMatchers.Matches(a.Labels),
|
||||
}
|
||||
}
|
||||
|
||||
// iCacheEntry is one bucket of an InhibitRule's cache: the cached alerts that
// share the same cache key, indexed by alert fingerprint.
type iCacheEntry map[model.Fingerprint]cachedAlert
|
||||
|
||||
// An InhibitRule specifies that a class of (source) alerts should inhibit
|
||||
// notifications for another class of (target) alerts if all specified matching
|
||||
// labels are equal between the two alerts. This may be used to inhibit alerts
|
||||
|
@ -161,7 +193,9 @@ type InhibitRule struct {
|
|||
Equal map[model.LabelName]struct{}
|
||||
|
||||
// Cache of alerts matching source labels.
|
||||
scache *store.Alerts
|
||||
icache map[model.Fingerprint]iCacheEntry
|
||||
|
||||
mtx *sync.RWMutex
|
||||
}
|
||||
|
||||
// NewInhibitRule returns a new InhibitRule based on a configuration definition.
|
||||
|
@ -221,30 +255,65 @@ func NewInhibitRule(cr config.InhibitRule) *InhibitRule {
|
|||
SourceMatchers: sourcem,
|
||||
TargetMatchers: targetm,
|
||||
Equal: equal,
|
||||
scache: store.NewAlerts(),
|
||||
icache: make(map[model.Fingerprint]iCacheEntry),
|
||||
mtx: &sync.RWMutex{},
|
||||
}
|
||||
}
|
||||
|
||||
// hasEqual checks whether the source cache contains alerts matching the equal
|
||||
// labels for the given label set. If so, the fingerprint of one of those alerts
|
||||
// is returned. If excludeTwoSidedMatch is true, alerts that match both the
|
||||
// source and the target side of the rule are disregarded.
|
||||
func (r *InhibitRule) hasEqual(lset model.LabelSet, excludeTwoSidedMatch bool) (model.Fingerprint, bool) {
|
||||
Outer:
|
||||
for _, a := range r.scache.List() {
|
||||
// The cache might be stale and contain resolved alerts.
|
||||
if a.Resolved() {
|
||||
continue
|
||||
}
|
||||
for n := range r.Equal {
|
||||
if a.Labels[n] != lset[n] {
|
||||
continue Outer
|
||||
}
|
||||
}
|
||||
if excludeTwoSidedMatch && r.TargetMatchers.Matches(a.Labels) {
|
||||
continue Outer
|
||||
}
|
||||
return a.Fingerprint(), true
|
||||
func (r *InhibitRule) set(a *types.Alert) {
|
||||
// these two operations are by far the most expensive part of the method
|
||||
// since they don't require hilding the mutex, call them here as a tiny
|
||||
// optimization
|
||||
icacheKey := r.icacheKey(a.Labels)
|
||||
fp := a.Fingerprint()
|
||||
|
||||
r.mtx.Lock()
|
||||
defer r.mtx.Unlock()
|
||||
|
||||
cacheEntry, ok := r.icache[icacheKey]
|
||||
if !ok {
|
||||
cacheEntry = make(iCacheEntry)
|
||||
r.icache[icacheKey] = cacheEntry
|
||||
}
|
||||
|
||||
cacheEntry[fp] = newCachedAlert(a, r.TargetMatchers)
|
||||
}
|
||||
|
||||
func (r *InhibitRule) icacheKey(lset model.LabelSet) model.Fingerprint {
|
||||
equalLabels := model.LabelSet{}
|
||||
for label := range r.Equal {
|
||||
equalLabels[label] = lset[label]
|
||||
}
|
||||
return equalLabels.Fingerprint()
|
||||
}
|
||||
|
||||
// findInhibitor determines if any alert inhibits an lset that matches the target
|
||||
// matchers. The fingerprint of the first matching result is returned.
|
||||
func (r *InhibitRule) findInhibitor(lset model.LabelSet, now time.Time) (model.Fingerprint, bool) {
|
||||
r.mtx.RLock()
|
||||
defer r.mtx.RUnlock()
|
||||
|
||||
var sourceMatchersEvaluated, lsetMatchesSource bool
|
||||
if cacheEntry, ok := r.icache[r.icacheKey(lset)]; ok {
|
||||
for fp, cachedAlert := range cacheEntry {
|
||||
if cachedAlert.alert.ResolvedAt(now) {
|
||||
continue
|
||||
}
|
||||
|
||||
if cachedAlert.matchesSourceAndTarget {
|
||||
if !sourceMatchersEvaluated {
|
||||
lsetMatchesSource = r.SourceMatchers.Matches(lset)
|
||||
sourceMatchersEvaluated = true
|
||||
}
|
||||
if lsetMatchesSource {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
return fp, true
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return model.Fingerprint(0), false
|
||||
}
|
||||
|
|
|
@ -14,17 +14,18 @@
|
|||
package inhibit
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/common/model"
|
||||
"github.com/prometheus/common/promslog"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/prometheus/alertmanager/config"
|
||||
"github.com/prometheus/alertmanager/pkg/labels"
|
||||
"github.com/prometheus/alertmanager/provider"
|
||||
"github.com/prometheus/alertmanager/store"
|
||||
"github.com/prometheus/alertmanager/types"
|
||||
)
|
||||
|
||||
|
@ -122,18 +123,27 @@ func TestInhibitRuleHasEqual(t *testing.T) {
|
|||
|
||||
for _, c := range cases {
|
||||
r := &InhibitRule{
|
||||
Equal: map[model.LabelName]struct{}{},
|
||||
scache: store.NewAlerts(),
|
||||
Equal: map[model.LabelName]struct{}{},
|
||||
mtx: &sync.RWMutex{},
|
||||
TargetMatchers: make(labels.Matchers, 0),
|
||||
SourceMatchers: make(labels.Matchers, 0),
|
||||
icache: make(map[model.Fingerprint]iCacheEntry),
|
||||
}
|
||||
for _, ln := range c.equal {
|
||||
r.Equal[ln] = struct{}{}
|
||||
}
|
||||
for _, v := range c.initial {
|
||||
r.scache.Set(v)
|
||||
r.set(v)
|
||||
}
|
||||
|
||||
if _, have := r.hasEqual(c.input, false); have != c.result {
|
||||
t.Errorf("Unexpected result %t, expected %t", have, c.result)
|
||||
matcher, err := labels.NewMatcher(labels.MatchEqual, "notareallabel", "notarealvalue")
|
||||
require.NoError(t, err)
|
||||
r.SourceMatchers = append(r.SourceMatchers, matcher)
|
||||
|
||||
_, hasMatch := r.findInhibitor(c.input, time.Now())
|
||||
|
||||
if hasMatch != c.result {
|
||||
t.Errorf("Unexpected result %t, expected %t", hasMatch, c.result)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -172,10 +182,10 @@ func TestInhibitRuleMatches(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
ih.rules[0].scache = store.NewAlerts()
|
||||
ih.rules[0].scache.Set(sourceAlert1)
|
||||
ih.rules[1].scache = store.NewAlerts()
|
||||
ih.rules[1].scache.Set(sourceAlert2)
|
||||
ih.rules[0].icache = make(map[model.Fingerprint]iCacheEntry)
|
||||
ih.rules[0].set(sourceAlert1)
|
||||
ih.rules[1].icache = make(map[model.Fingerprint]iCacheEntry)
|
||||
ih.rules[1].set(sourceAlert2)
|
||||
|
||||
cases := []struct {
|
||||
target model.LabelSet
|
||||
|
@ -268,10 +278,10 @@ func TestInhibitRuleMatchers(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
ih.rules[0].scache = store.NewAlerts()
|
||||
ih.rules[0].scache.Set(sourceAlert1)
|
||||
ih.rules[1].scache = store.NewAlerts()
|
||||
ih.rules[1].scache.Set(sourceAlert2)
|
||||
ih.rules[0].icache = make(map[model.Fingerprint]iCacheEntry)
|
||||
ih.rules[0].set(sourceAlert1)
|
||||
ih.rules[1].icache = make(map[model.Fingerprint]iCacheEntry)
|
||||
ih.rules[1].set(sourceAlert2)
|
||||
|
||||
cases := []struct {
|
||||
target model.LabelSet
|
||||
|
|
Loading…
Reference in New Issue