Create a `Muter` implementation for silences

This encapsulates the logic of querying and marking silenced
alerts. It removes the code duplication flagged earlier.

I removed the error returned by the setAlertStatus function as we were
only logging it, and that's already done anyway when the error is
received from the `silence.Query` call (now in the `Mutes` method).

Signed-off-by: beorn7 <beorn@soundcloud.com>
This commit is contained in:
beorn7 2019-02-26 11:59:14 +01:00
parent 891f368c51
commit f3d9c89bbc
6 changed files with 115 additions and 143 deletions

View File

@ -186,7 +186,7 @@ func (api *API) Register(r *route.Router, routePrefix string) *http.ServeMux {
// Update config and resolve timeout of each API. APIv2 also needs
// setAlertStatus to be updated.
func (api *API) Update(cfg *config.Config, setAlertStatus func(model.LabelSet) error) {
func (api *API) Update(cfg *config.Config, setAlertStatus func(model.LabelSet)) {
api.v1.Update(cfg)
api.v2.Update(cfg, setAlertStatus)
}

View File

@ -70,7 +70,7 @@ type API struct {
}
type getAlertStatusFn func(prometheus_model.Fingerprint) types.AlertStatus
type setAlertStatusFn func(prometheus_model.LabelSet) error
type setAlertStatusFn func(prometheus_model.LabelSet)
// NewAPI returns a new Alertmanager API v2
func NewAPI(
@ -260,9 +260,8 @@ func (api *API) getAlertsHandler(params alert_ops.GetAlertsParams) middleware.Re
}
// Set alert's current status based on its label set.
if err := api.setAlertStatus(a.Labels); err != nil {
level.Error(api.logger).Log("msg", "set alert status failed", "err", err)
}
api.setAlertStatus(a.Labels)
// Get alert's current status after seeing if it is suppressed.
status := api.getAlertStatus(a.Fingerprint())

View File

@ -281,6 +281,7 @@ func run() int {
var (
inhibitor *inhibit.Inhibitor
silencer *silence.Silencer
tmpl *template.Template
pipeline notify.Stage
disp *dispatch.Dispatcher
@ -304,19 +305,22 @@ func run() int {
disp.Stop()
inhibitor = inhibit.NewInhibitor(alerts, conf.InhibitRules, marker, logger)
silencer = silence.NewSilencer(silences, marker, logger)
pipeline = notify.BuildPipeline(
conf.Receivers,
tmpl,
waitFunc,
inhibitor,
silences,
silencer,
notificationLog,
marker,
peer,
logger,
)
api.Update(conf, setAlertStatus(inhibitor, marker, silences))
api.Update(conf, func(labels model.LabelSet) {
inhibitor.Mutes(labels)
silencer.Mutes(labels)
})
disp = dispatch.NewDispatcher(alerts, dispatch.NewRoute(conf.Route, nil), pipeline, marker, timeoutFunc, logger)
@ -435,28 +439,3 @@ func extURL(listen, external string) (*url.URL, error) {
return u, nil
}
// setAlertStatus builds the callback that refreshes the status of an alert
// identified by its label set. The callback first runs the labels through the
// inhibitor (the result is discarded), then queries the active silences
// matching the labels and records their IDs on the marker. It returns an error
// only if the silence query fails; in that case the marker is left untouched.
func setAlertStatus(inhibitor *inhibit.Inhibitor, marker types.Marker, silences *silence.Silences) func(model.LabelSet) error {
	return func(labels model.LabelSet) error {
		inhibitor.Mutes(labels)
		// TODO(beorn7): The following code is almost exactly replicated in notify/notify.go.
		active, err := silences.Query(
			silence.QState(types.SilenceStateActive),
			silence.QMatches(labels),
		)
		if err != nil {
			return fmt.Errorf("failed to query silences: %v", err)
		}

		// No matching silence: call SetSilenced without IDs.
		if len(active) == 0 {
			marker.SetSilenced(labels.Fingerprint())
			return nil
		}

		ids := make([]string, 0, len(active))
		for _, sil := range active {
			ids = append(ids, sil.Id)
		}
		marker.SetSilenced(labels.Fingerprint(), ids...)
		return nil
	}
}

View File

@ -29,6 +29,7 @@ import (
"github.com/prometheus/alertmanager/cluster"
"github.com/prometheus/alertmanager/config"
"github.com/prometheus/alertmanager/inhibit"
"github.com/prometheus/alertmanager/nflog"
"github.com/prometheus/alertmanager/nflog/nflogpb"
"github.com/prometheus/alertmanager/silence"
@ -236,18 +237,17 @@ func BuildPipeline(
confs []*config.Receiver,
tmpl *template.Template,
wait func() time.Duration,
muter types.Muter,
silences *silence.Silences,
inhibitor *inhibit.Inhibitor,
silencer *silence.Silencer,
notificationLog NotificationLog,
marker types.Marker,
peer *cluster.Peer,
logger log.Logger,
) RoutingStage {
rs := RoutingStage{}
ms := NewGossipSettleStage(peer)
is := NewInhibitStage(muter)
ss := NewSilenceStage(silences, marker)
is := NewMuteStage(inhibitor)
ss := NewMuteStage(silencer)
for _, rc := range confs {
rs[rc.Name] = MultiStage{ms, is, ss, createStage(rc, tmpl, wait, notificationLog, logger)}
@ -359,72 +359,27 @@ func (n *GossipSettleStage) Exec(ctx context.Context, l log.Logger, alerts ...*t
return ctx, alerts, nil
}
// InhibitStage filters alerts through an inhibition muter.
type InhibitStage struct {
// MuteStage filters alerts through a Muter.
type MuteStage struct {
muter types.Muter
}
// NewInhibitStage return a new InhibitStage.
func NewInhibitStage(m types.Muter) *InhibitStage {
return &InhibitStage{muter: m}
// NewMuteStage returns a new MuteStage.
func NewMuteStage(m types.Muter) *MuteStage {
return &MuteStage{muter: m}
}
// Exec implements the Stage interface.
func (n *InhibitStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
func (n *MuteStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
var filtered []*types.Alert
for _, a := range alerts {
// TODO(fabxc): increment total alerts counter.
// Do not send the alert if the silencer mutes it.
// Do not send the alert if muted.
if !n.muter.Mutes(a.Labels) {
// TODO(fabxc): increment muted alerts counter.
filtered = append(filtered, a)
}
// TODO(fabxc): increment muted alerts counter if muted.
}
return ctx, filtered, nil
}
// SilenceStage filters alerts through a silence muter. It pairs the silence
// store that is queried for each alert with the marker on which the outcome
// of that query is recorded.
type SilenceStage struct {
// silences is queried for active silences matching an alert's label set.
silences *silence.Silences
// marker receives a SetSilenced call for every alert the stage processes.
marker types.Marker
}
// NewSilenceStage returns a new SilenceStage that queries s and records the
// results on mk.
func NewSilenceStage(s *silence.Silences, mk types.Marker) *SilenceStage {
	stage := new(SilenceStage)
	stage.silences = s
	stage.marker = mk
	return stage
}
// Exec implements the Stage interface. For every alert it queries the active
// silences matching the alert's labels, records the matching silence IDs (or
// none) on the marker, and passes on only the alerts without a matching
// silence. A failed query is logged and the alert is then handled according to
// whatever result the query returned. The returned error is always nil.
func (n *SilenceStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
	var passed []*types.Alert
	for _, alert := range alerts {
		// TODO(fabxc): increment total alerts counter.
		// Do not send the alert if the silencer mutes it.
		matching, err := n.silences.Query(
			silence.QState(types.SilenceStateActive),
			silence.QMatches(alert.Labels),
		)
		if err != nil {
			level.Error(l).Log("msg", "Querying silences failed", "err", err)
		}

		if len(matching) > 0 {
			// Silenced: record the IDs of all matching silences and drop the alert.
			ids := make([]string, 0, len(matching))
			for _, sil := range matching {
				ids = append(ids, sil.Id)
			}
			n.marker.SetSilenced(alert.Labels.Fingerprint(), ids...)
			continue
		}

		// TODO(fabxc): increment muted alerts counter.
		passed = append(passed, alert)
		n.marker.SetSilenced(alert.Labels.Fingerprint())
	}
	return ctx, passed, nil
}

View File

@ -575,7 +575,55 @@ func TestSetNotifiesStage(t *testing.T) {
require.NotNil(t, resctx)
}
func TestSilenceStage(t *testing.T) {
func TestMuteStage(t *testing.T) {
// Mute all label sets that have a "mute" key.
muter := types.MuteFunc(func(lset model.LabelSet) bool {
_, ok := lset["mute"]
return ok
})
stage := NewMuteStage(muter)
in := []model.LabelSet{
{},
{"test": "set"},
{"mute": "me"},
{"foo": "bar", "test": "set"},
{"foo": "bar", "mute": "me"},
{},
{"not": "muted"},
}
out := []model.LabelSet{
{},
{"test": "set"},
{"foo": "bar", "test": "set"},
{},
{"not": "muted"},
}
var inAlerts []*types.Alert
for _, lset := range in {
inAlerts = append(inAlerts, &types.Alert{
Alert: model.Alert{Labels: lset},
})
}
_, alerts, err := stage.Exec(context.Background(), log.NewNopLogger(), inAlerts...)
if err != nil {
t.Fatalf("Exec failed: %s", err)
}
var got []model.LabelSet
for _, a := range alerts {
got = append(got, a.Labels)
}
if !reflect.DeepEqual(got, out) {
t.Fatalf("Muting failed, expected: %v\ngot %v", out, got)
}
}
func TestMuteStageWithSilences(t *testing.T) {
silences, err := silence.New(silence.Options{})
if err != nil {
t.Fatal(err)
@ -588,7 +636,8 @@ func TestSilenceStage(t *testing.T) {
}
marker := types.NewMarker(prometheus.NewRegistry())
silencer := NewSilenceStage(silences, marker)
silencer := silence.NewSilencer(silences, marker, log.NewNopLogger())
stage := NewMuteStage(silencer)
in := []model.LabelSet{
{},
@ -618,55 +667,7 @@ func TestSilenceStage(t *testing.T) {
// the WasSilenced flag set to true afterwards.
marker.SetSilenced(inAlerts[1].Fingerprint(), "123")
_, alerts, err := silencer.Exec(context.Background(), log.NewNopLogger(), inAlerts...)
if err != nil {
t.Fatalf("Exec failed: %s", err)
}
var got []model.LabelSet
for _, a := range alerts {
got = append(got, a.Labels)
}
if !reflect.DeepEqual(got, out) {
t.Fatalf("Muting failed, expected: %v\ngot %v", out, got)
}
}
func TestInhibitStage(t *testing.T) {
// Mute all label sets that have a "mute" key.
muter := types.MuteFunc(func(lset model.LabelSet) bool {
_, ok := lset["mute"]
return ok
})
inhibitor := NewInhibitStage(muter)
in := []model.LabelSet{
{},
{"test": "set"},
{"mute": "me"},
{"foo": "bar", "test": "set"},
{"foo": "bar", "mute": "me"},
{},
{"not": "muted"},
}
out := []model.LabelSet{
{},
{"test": "set"},
{"foo": "bar", "test": "set"},
{},
{"not": "muted"},
}
var inAlerts []*types.Alert
for _, lset := range in {
inAlerts = append(inAlerts, &types.Alert{
Alert: model.Alert{Labels: lset},
})
}
_, alerts, err := inhibitor.Exec(context.Background(), log.NewNopLogger(), inAlerts...)
_, alerts, err := stage.Exec(context.Background(), log.NewNopLogger(), inAlerts...)
if err != nil {
t.Fatalf("Exec failed: %s", err)
}

View File

@ -92,6 +92,44 @@ func (c matcherCache) add(s *pb.Silence) (types.Matchers, error) {
return ms, nil
}
// Silencer binds together a Marker and a Silences to implement the Muter
// interface.
type Silencer struct {
// silences is queried for active silences matching a label set.
silences *Silences
// marker receives a SetSilenced call on every Mutes invocation.
marker types.Marker
// logger is used to report failed silence queries.
logger log.Logger
}
// NewSilencer returns a new Silencer that queries s, records results on m and
// logs query failures to l.
func NewSilencer(s *Silences, m types.Marker, l log.Logger) *Silencer {
	silencer := new(Silencer)
	silencer.silences = s
	silencer.marker = m
	silencer.logger = l
	return silencer
}
// Mutes implements the Muter interface. It queries the active silences that
// match lset and records the outcome on the marker: the IDs of all matching
// silences, or no IDs if nothing matched. It returns true if at least one
// silence matched. A failed query is only logged; the method then proceeds
// with whatever result the query returned.
func (s *Silencer) Mutes(lset model.LabelSet) bool {
	active, err := s.silences.Query(
		QState(types.SilenceStateActive),
		QMatches(lset),
	)
	if err != nil {
		level.Error(s.logger).Log("msg", "Querying silences failed, alerts might not get silenced correctly", "err", err)
	}

	if len(active) > 0 {
		// Collect the IDs of every matching silence for the marker.
		ids := make([]string, 0, len(active))
		for _, sil := range active {
			ids = append(ids, sil.Id)
		}
		s.marker.SetSilenced(lset.Fingerprint(), ids...)
		return true
	}

	// Nothing matched: call SetSilenced without IDs.
	s.marker.SetSilenced(lset.Fingerprint())
	return false
}
// Silences holds a silence state that can be modified, queried, and snapshot.
type Silences struct {
logger log.Logger
@ -727,7 +765,7 @@ func (s *Silences) Merge(b []byte) error {
// all nodes already.
s.broadcast(b)
s.metrics.propagatedMessagesTotal.Inc()
level.Debug(s.logger).Log("msg", "gossiping new silence", "silence", e)
level.Debug(s.logger).Log("msg", "Gossiping new silence", "silence", e)
}
}
return nil