diff --git a/api/api.go b/api/api.go index 9c8346e5..449b51c8 100644 --- a/api/api.go +++ b/api/api.go @@ -186,7 +186,7 @@ func (api *API) Register(r *route.Router, routePrefix string) *http.ServeMux { // Update config and resolve timeout of each API. APIv2 also needs // setAlertStatus to be updated. -func (api *API) Update(cfg *config.Config, setAlertStatus func(model.LabelSet) error) { +func (api *API) Update(cfg *config.Config, setAlertStatus func(model.LabelSet)) { api.v1.Update(cfg) api.v2.Update(cfg, setAlertStatus) } diff --git a/api/v2/api.go b/api/v2/api.go index 376f9c69..28349d1f 100644 --- a/api/v2/api.go +++ b/api/v2/api.go @@ -70,7 +70,7 @@ type API struct { } type getAlertStatusFn func(prometheus_model.Fingerprint) types.AlertStatus -type setAlertStatusFn func(prometheus_model.LabelSet) error +type setAlertStatusFn func(prometheus_model.LabelSet) // NewAPI returns a new Alertmanager API v2 func NewAPI( @@ -260,9 +260,8 @@ func (api *API) getAlertsHandler(params alert_ops.GetAlertsParams) middleware.Re } // Set alert's current status based on its label set. - if err := api.setAlertStatus(a.Labels); err != nil { - level.Error(api.logger).Log("msg", "set alert status failed", "err", err) - } + api.setAlertStatus(a.Labels) + // Get alert's current status after seeing if it is suppressed. status := api.getAlertStatus(a.Fingerprint()) diff --git a/cmd/alertmanager/main.go b/cmd/alertmanager/main.go index 4d70424e..7adee6e4 100644 --- a/cmd/alertmanager/main.go +++ b/cmd/alertmanager/main.go @@ -281,6 +281,7 @@ func run() int { var ( inhibitor *inhibit.Inhibitor + silencer *silence.Silencer tmpl *template.Template pipeline notify.Stage disp *dispatch.Dispatcher @@ -304,19 +305,22 @@ func run() int { disp.Stop() inhibitor = inhibit.NewInhibitor(alerts, conf.InhibitRules, marker, logger) + silencer = silence.NewSilencer(silences, marker, logger) pipeline = notify.BuildPipeline( conf.Receivers, tmpl, waitFunc, inhibitor, - silences, + silencer, notificationLog, - marker, peer, logger, ) - api.Update(conf, setAlertStatus(inhibitor, marker, silences)) + api.Update(conf, func(labels model.LabelSet) { + inhibitor.Mutes(labels) + silencer.Mutes(labels) + }) disp = dispatch.NewDispatcher(alerts, dispatch.NewRoute(conf.Route, nil), pipeline, marker, timeoutFunc, logger) @@ -435,28 +439,3 @@ func extURL(listen, external string) (*url.URL, error) { return u, nil } - -func setAlertStatus(inhibitor *inhibit.Inhibitor, marker types.Marker, silences *silence.Silences) func(model.LabelSet) error { - return func(labels model.LabelSet) error { - inhibitor.Mutes(labels) - // TODO(beorn7): The following code is almost exactly replicated in notify/notify.go. - sils, err := silences.Query( - silence.QState(types.SilenceStateActive), - silence.QMatches(labels), - ) - if err != nil { - return fmt.Errorf("failed to query silences: %v", err) - } - - if len(sils) > 0 { - ids := make([]string, len(sils)) - for i, s := range sils { - ids[i] = s.Id - } - marker.SetSilenced(labels.Fingerprint(), ids...) - } else { - marker.SetSilenced(labels.Fingerprint()) - } - return nil - } -} diff --git a/notify/notify.go b/notify/notify.go index 38277ebd..cfd81469 100644 --- a/notify/notify.go +++ b/notify/notify.go @@ -29,6 +29,7 @@ import ( "github.com/prometheus/alertmanager/cluster" "github.com/prometheus/alertmanager/config" + "github.com/prometheus/alertmanager/inhibit" "github.com/prometheus/alertmanager/nflog" "github.com/prometheus/alertmanager/nflog/nflogpb" "github.com/prometheus/alertmanager/silence" @@ -236,18 +237,17 @@ func BuildPipeline( confs []*config.Receiver, tmpl *template.Template, wait func() time.Duration, - muter types.Muter, - silences *silence.Silences, + inhibitor *inhibit.Inhibitor, + silencer *silence.Silencer, notificationLog NotificationLog, - marker types.Marker, peer *cluster.Peer, logger log.Logger, ) RoutingStage { rs := RoutingStage{} ms := NewGossipSettleStage(peer) - is := NewInhibitStage(muter) - ss := NewSilenceStage(silences, marker) + is := NewMuteStage(inhibitor) + ss := NewMuteStage(silencer) for _, rc := range confs { rs[rc.Name] = MultiStage{ms, is, ss, createStage(rc, tmpl, wait, notificationLog, logger)} @@ -359,72 +359,27 @@ func (n *GossipSettleStage) Exec(ctx context.Context, l log.Logger, alerts ...*t return ctx, alerts, nil } -// InhibitStage filters alerts through an inhibition muter. -type InhibitStage struct { +// MuteStage filters alerts through a Muter. +type MuteStage struct { muter types.Muter } -// NewInhibitStage return a new InhibitStage. -func NewInhibitStage(m types.Muter) *InhibitStage { - return &InhibitStage{muter: m} +// NewMuteStage return a new MuteStage. +func NewMuteStage(m types.Muter) *MuteStage { + return &MuteStage{muter: m} } // Exec implements the Stage interface. -func (n *InhibitStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { +func (n *MuteStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { var filtered []*types.Alert for _, a := range alerts { // TODO(fabxc): increment total alerts counter. - // Do not send the alert if the silencer mutes it. + // Do not send the alert if muted. if !n.muter.Mutes(a.Labels) { - // TODO(fabxc): increment muted alerts counter. filtered = append(filtered, a) } + // TODO(fabxc): increment muted alerts counter if muted. } - - return ctx, filtered, nil -} - -// SilenceStage filters alerts through a silence muter. -type SilenceStage struct { - silences *silence.Silences - marker types.Marker -} - -// NewSilenceStage returns a new SilenceStage. -func NewSilenceStage(s *silence.Silences, mk types.Marker) *SilenceStage { - return &SilenceStage{ - silences: s, - marker: mk, - } -} - -// Exec implements the Stage interface. -func (n *SilenceStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { - var filtered []*types.Alert - for _, a := range alerts { - // TODO(fabxc): increment total alerts counter. - // Do not send the alert if the silencer mutes it. - sils, err := n.silences.Query( - silence.QState(types.SilenceStateActive), - silence.QMatches(a.Labels), - ) - if err != nil { - level.Error(l).Log("msg", "Querying silences failed", "err", err) - } - - if len(sils) == 0 { - // TODO(fabxc): increment muted alerts counter. - filtered = append(filtered, a) - n.marker.SetSilenced(a.Labels.Fingerprint()) - } else { - ids := make([]string, len(sils)) - for i, s := range sils { - ids[i] = s.Id - } - n.marker.SetSilenced(a.Labels.Fingerprint(), ids...) - } - } - return ctx, filtered, nil } diff --git a/notify/notify_test.go b/notify/notify_test.go index d15e7cdf..b3e994f2 100644 --- a/notify/notify_test.go +++ b/notify/notify_test.go @@ -575,7 +575,55 @@ func TestSetNotifiesStage(t *testing.T) { require.NotNil(t, resctx) } -func TestSilenceStage(t *testing.T) { +func TestMuteStage(t *testing.T) { + // Mute all label sets that have a "mute" key. + muter := types.MuteFunc(func(lset model.LabelSet) bool { + _, ok := lset["mute"] + return ok + }) + + stage := NewMuteStage(muter) + + in := []model.LabelSet{ + {}, + {"test": "set"}, + {"mute": "me"}, + {"foo": "bar", "test": "set"}, + {"foo": "bar", "mute": "me"}, + {}, + {"not": "muted"}, + } + out := []model.LabelSet{ + {}, + {"test": "set"}, + {"foo": "bar", "test": "set"}, + {}, + {"not": "muted"}, + } + + var inAlerts []*types.Alert + for _, lset := range in { + inAlerts = append(inAlerts, &types.Alert{ + Alert: model.Alert{Labels: lset}, + }) + } + + _, alerts, err := stage.Exec(context.Background(), log.NewNopLogger(), inAlerts...) + if err != nil { + t.Fatalf("Exec failed: %s", err) + } + + var got []model.LabelSet + for _, a := range alerts { + got = append(got, a.Labels) + } + + if !reflect.DeepEqual(got, out) { + t.Fatalf("Muting failed, expected: %v\ngot %v", out, got) + } +} + +func TestMuteStageWithSilences(t *testing.T) { silences, err := silence.New(silence.Options{}) if err != nil { t.Fatal(err) @@ -588,7 +636,8 @@ func TestSilenceStage(t *testing.T) { } marker := types.NewMarker(prometheus.NewRegistry()) - silencer := NewSilenceStage(silences, marker) + silencer := silence.NewSilencer(silences, marker, log.NewNopLogger()) + stage := NewMuteStage(silencer) in := []model.LabelSet{ {}, @@ -618,55 +667,7 @@ func TestSilenceStage(t *testing.T) { // the WasSilenced flag set to true afterwards. marker.SetSilenced(inAlerts[1].Fingerprint(), "123") - _, alerts, err := silencer.Exec(context.Background(), log.NewNopLogger(), inAlerts...) - if err != nil { - t.Fatalf("Exec failed: %s", err) - } - - var got []model.LabelSet - for _, a := range alerts { - got = append(got, a.Labels) - } - - if !reflect.DeepEqual(got, out) { - t.Fatalf("Muting failed, expected: %v\ngot %v", out, got) - } -} - -func TestInhibitStage(t *testing.T) { - // Mute all label sets that have a "mute" key. - muter := types.MuteFunc(func(lset model.LabelSet) bool { - _, ok := lset["mute"] - return ok - }) - - inhibitor := NewInhibitStage(muter) - - in := []model.LabelSet{ - {}, - {"test": "set"}, - {"mute": "me"}, - {"foo": "bar", "test": "set"}, - {"foo": "bar", "mute": "me"}, - {}, - {"not": "muted"}, - } - out := []model.LabelSet{ - {}, - {"test": "set"}, - {"foo": "bar", "test": "set"}, - {}, - {"not": "muted"}, - } - - var inAlerts []*types.Alert - for _, lset := range in { - inAlerts = append(inAlerts, &types.Alert{ - Alert: model.Alert{Labels: lset}, - }) - } - - _, alerts, err := inhibitor.Exec(context.Background(), log.NewNopLogger(), inAlerts...) + _, alerts, err := stage.Exec(context.Background(), log.NewNopLogger(), inAlerts...) if err != nil { t.Fatalf("Exec failed: %s", err) } diff --git a/silence/silence.go b/silence/silence.go index 4b448170..3687c17a 100644 --- a/silence/silence.go +++ b/silence/silence.go @@ -92,6 +92,44 @@ func (c matcherCache) add(s *pb.Silence) (types.Matchers, error) { return ms, nil } +// Silencer binds together a Marker and a Silences to implement the Muter +// interface. +type Silencer struct { + silences *Silences + marker types.Marker + logger log.Logger +} + +// NewSilencer returns a new Silencer. +func NewSilencer(s *Silences, m types.Marker, l log.Logger) *Silencer { + return &Silencer{ + silences: s, + marker: m, + logger: l, + } +} + +// Mutes implements the Muter interface. +func (s *Silencer) Mutes(lset model.LabelSet) bool { + sils, err := s.silences.Query( + QState(types.SilenceStateActive), + QMatches(lset), + ) + if err != nil { + level.Error(s.logger).Log("msg", "Querying silences failed, alerts might not get silenced correctly", "err", err) + } + if len(sils) == 0 { + s.marker.SetSilenced(lset.Fingerprint()) + return false + } + ids := make([]string, len(sils)) + for i, s := range sils { + ids[i] = s.Id + } + s.marker.SetSilenced(lset.Fingerprint(), ids...) + return true +} + // Silences holds a silence state that can be modified, queried, and snapshot. type Silences struct { logger log.Logger @@ -727,7 +765,7 @@ func (s *Silences) Merge(b []byte) error { // all nodes already. s.broadcast(b) s.metrics.propagatedMessagesTotal.Inc() - level.Debug(s.logger).Log("msg", "gossiping new silence", "silence", e) + level.Debug(s.logger).Log("msg", "Gossiping new silence", "silence", e) } } return nil