diff --git a/api/v1/api.go b/api/v1/api.go index f9d98c1f..b0b5b85d 100644 --- a/api/v1/api.go +++ b/api/v1/api.go @@ -556,7 +556,7 @@ func (api *API) setSilence(w http.ResponseWriter, r *http.Request) { func (api *API) getSilence(w http.ResponseWriter, r *http.Request) { sid := route.Param(r.Context(), "sid") - sils, err := api.silences.Query(silence.QIDs(sid)) + sils, _, err := api.silences.Query(silence.QIDs(sid)) if err != nil || len(sils) == 0 { http.Error(w, fmt.Sprint("Error getting silence: ", err), http.StatusNotFound) return @@ -587,7 +587,7 @@ func (api *API) delSilence(w http.ResponseWriter, r *http.Request) { } func (api *API) listSilences(w http.ResponseWriter, r *http.Request) { - psils, err := api.silences.Query() + psils, _, err := api.silences.Query() if err != nil { api.respondError(w, apiError{ typ: errorInternal, diff --git a/api/v2/api.go b/api/v2/api.go index 28349d1f..95174c1e 100644 --- a/api/v2/api.go +++ b/api/v2/api.go @@ -489,7 +489,7 @@ func (api *API) getSilencesHandler(params silence_ops.GetSilencesParams) middlew } } - psils, err := api.silences.Query() + psils, _, err := api.silences.Query() if err != nil { level.Error(api.logger).Log("msg", "failed to get silences", "err", err) return silence_ops.NewGetSilencesInternalServerError().WithPayload(err.Error()) @@ -521,7 +521,7 @@ func gettableSilenceMatchesFilterLabels(s open_api_models.GettableSilence, match } func (api *API) getSilenceHandler(params silence_ops.GetSilenceParams) middleware.Responder { - sils, err := api.silences.Query(silence.QIDs(params.SilenceID.String())) + sils, _, err := api.silences.Query(silence.QIDs(params.SilenceID.String())) if err != nil { level.Error(api.logger).Log("msg", "failed to get silence by id", "err", err) return silence_ops.NewGetSilenceInternalServerError().WithPayload(err.Error()) diff --git a/notify/notify_test.go b/notify/notify_test.go index b3e994f2..71e0501f 100644 --- a/notify/notify_test.go +++ b/notify/notify_test.go @@ -624,14 
+624,15 @@ func TestMuteStage(t *testing.T) { } func TestMuteStageWithSilences(t *testing.T) { - silences, err := silence.New(silence.Options{}) + silences, err := silence.New(silence.Options{Retention: time.Hour}) if err != nil { t.Fatal(err) } - if _, err := silences.Set(&silencepb.Silence{ + silID, err := silences.Set(&silencepb.Silence{ EndsAt: utcNow().Add(time.Hour), Matchers: []*silencepb.Matcher{{Name: "mute", Pattern: "me"}}, - }); err != nil { + }) + if err != nil { t.Fatal(err) } @@ -663,9 +664,9 @@ func TestMuteStageWithSilences(t *testing.T) { }) } - // Set the second alert as previously silenced. It is expected to have - // the WasSilenced flag set to true afterwards. - marker.SetSilenced(inAlerts[1].Fingerprint(), "123") + // Set the second alert as previously silenced with an old version + // number. This is expected to get unsilenced by the stage. + marker.SetSilenced(inAlerts[1].Fingerprint(), 0, "123") _, alerts, err := stage.Exec(context.Background(), log.NewNopLogger(), inAlerts...) if err != nil { @@ -680,4 +681,37 @@ func TestMuteStageWithSilences(t *testing.T) { if !reflect.DeepEqual(got, out) { t.Fatalf("Muting failed, expected: %v\ngot %v", out, got) } + + // Do it again to exercise the version tracking of silences. + _, alerts, err = stage.Exec(context.Background(), log.NewNopLogger(), inAlerts...) + if err != nil { + t.Fatalf("Exec failed: %s", err) + } + + got = got[:0] + for _, a := range alerts { + got = append(got, a.Labels) + } + + if !reflect.DeepEqual(got, out) { + t.Fatalf("Muting failed, expected: %v\ngot %v", out, got) + } + + // Expire the silence and verify that no alerts are silenced now. + if err := silences.Expire(silID); err != nil { + t.Fatal(err) + } + + _, alerts, err = stage.Exec(context.Background(), log.NewNopLogger(), inAlerts...) 
+ if err != nil { + t.Fatalf("Exec failed: %s", err) + } + got = got[:0] + for _, a := range alerts { + got = append(got, a.Labels) + } + + if !reflect.DeepEqual(got, in) { + t.Fatalf("Unmuting failed, expected: %v\ngot %v", in, got) + } } diff --git a/silence/silence.go b/silence/silence.go index 3687c17a..938e89d4 100644 --- a/silence/silence.go +++ b/silence/silence.go @@ -23,6 +23,7 @@ import ( "os" "reflect" "regexp" + "sort" "sync" "time" @@ -111,22 +112,68 @@ func NewSilencer(s *Silences, m types.Marker, l log.Logger) *Silencer { // Mutes implements the Muter interface. func (s *Silencer) Mutes(lset model.LabelSet) bool { - sils, err := s.silences.Query( - QState(types.SilenceStateActive), - QMatches(lset), + fp := lset.Fingerprint() + ids, markerVersion, _ := s.marker.Silenced(fp) + + var ( + err error + sils []*pb.Silence + newVersion = markerVersion ) + if markerVersion == s.silences.Version() { + // No new silences added, just need to check which of the old + // silences are still relevant. + if len(ids) == 0 { + // Super fast path: No silences ever applied to this + // alert, none have been added. We are done. + return false + } + // This is still a quite fast path: No silences have been added, + // we only need to check which of the applicable silences are + // currently active. Note that newVersion is left at + // markerVersion because the Query call might already return a + // newer version, which is not the version our old list of + // applicable silences is based on. + sils, _, err = s.silences.Query( + QIDs(ids...), + QState(types.SilenceStateActive), + ) + } else { + // New silences have been added, do a full query.
+ sils, newVersion, err = s.silences.Query( + QState(types.SilenceStateActive), + QMatches(lset), + ) + } if err != nil { level.Error(s.logger).Log("msg", "Querying silences failed, alerts might not get silenced correctly", "err", err) } if len(sils) == 0 { - s.marker.SetSilenced(lset.Fingerprint()) + s.marker.SetSilenced(fp, newVersion) return false } - ids := make([]string, len(sils)) - for i, s := range sils { - ids[i] = s.Id + idsChanged := len(sils) != len(ids) + if !idsChanged { + // Length is the same, but is the content the same? + for i, s := range sils { + if ids[i] != s.Id { + idsChanged = true + break + } + } + } + if idsChanged { + // Need to recreate ids. + ids = make([]string, len(sils)) + for i, s := range sils { + ids[i] = s.Id + } + sort.Strings(ids) // For comparability. + } + if idsChanged || newVersion != markerVersion { + // Update marker only if something changed. + s.marker.SetSilenced(fp, newVersion, ids...) } - s.marker.SetSilenced(lset.Fingerprint(), ids...) return true } @@ -139,6 +186,7 @@ type Silences struct { mtx sync.RWMutex st state + version int // Increments whenever silences are added. broadcast func([]byte) mc matcherCache } @@ -441,7 +489,9 @@ func (s *Silences) setSilence(sil *pb.Silence) error { return err } - s.st.merge(msil, s.now()) + if s.st.merge(msil, s.now()) { + s.version++ + } s.broadcast(b) return nil @@ -615,7 +665,7 @@ func QState(states ...types.SilenceState) QueryParam { // QueryOne queries with the given parameters and returns the first result. // Returns ErrNotFound if the query result is empty. func (s *Silences) QueryOne(params ...QueryParam) (*pb.Silence, error) { - res, err := s.Query(params...) + res, _, err := s.Query(params...) if err != nil { return nil, err } @@ -625,16 +675,17 @@ func (s *Silences) QueryOne(params ...QueryParam) (*pb.Silence, error) { return res[0], nil } -// Query for silences based on the given query parameters. 
-func (s *Silences) Query(params ...QueryParam) ([]*pb.Silence, error) { +// Query for silences based on the given query parameters. It returns the +// resulting silences and the state version the result is based on. +func (s *Silences) Query(params ...QueryParam) ([]*pb.Silence, int, error) { start := time.Now() s.metrics.queriesTotal.Inc() - sils, err := func() ([]*pb.Silence, error) { + sils, version, err := func() ([]*pb.Silence, int, error) { q := &query{} for _, p := range params { if err := p(q); err != nil { - return nil, err + return nil, s.Version(), err } } return s.query(q, s.now()) @@ -643,23 +694,29 @@ func (s *Silences) Query(params ...QueryParam) ([]*pb.Silence, error) { s.metrics.queryErrorsTotal.Inc() } s.metrics.queryDuration.Observe(time.Since(start).Seconds()) - return sils, err + return sils, version, err +} + +// Version of the silence state. +func (s *Silences) Version() int { + s.mtx.RLock() + defer s.mtx.RUnlock() + return s.version } // Count silences by state. func (s *Silences) CountState(states ...types.SilenceState) (int, error) { // This could probably be optimized. - sils, err := s.Query(QState(states...)) + sils, _, err := s.Query(QState(states...)) if err != nil { return -1, err } return len(sils), nil } -func (s *Silences) query(q *query, now time.Time) ([]*pb.Silence, error) { - // If we have an ID constraint, all silences are our base set. - // This and the use of post-filter functions is the - // the trivial solution for now. +func (s *Silences) query(q *query, now time.Time) ([]*pb.Silence, int, error) { + // If we have no ID constraint, all silences are our base set. This and + // the use of post-filter functions is the trivial solution for now. 
var res []*pb.Silence s.mtx.Lock() @@ -683,7 +740,7 @@ func (s *Silences) query(q *query, now time.Time) ([]*pb.Silence, error) { for _, f := range q.filters { ok, err := f(sil, s, now) if err != nil { - return nil, err + return nil, s.version, err } if !ok { remove = true @@ -695,7 +752,7 @@ func (s *Silences) query(q *query, now time.Time) ([]*pb.Silence, error) { } } - return resf, nil + return resf, s.version, nil } // loadSnapshot loads a snapshot generated by Snapshot() into the state. @@ -716,6 +773,7 @@ func (s *Silences) loadSnapshot(r io.Reader) error { } s.mtx.Lock() s.st = st + s.version++ s.mtx.Unlock() return nil @@ -758,14 +816,17 @@ func (s *Silences) Merge(b []byte) error { now := s.now() for _, e := range st { - if merged := s.st.merge(e, now); merged && !cluster.OversizedMessage(b) { - // If this is the first we've seen the message and it's - // not oversized, gossip it to other nodes. We don't - // propagate oversized messages because they're sent to - // all nodes already. - s.broadcast(b) - s.metrics.propagatedMessagesTotal.Inc() - level.Debug(s.logger).Log("msg", "Gossiping new silence", "silence", e) + if merged := s.st.merge(e, now); merged { + s.version++ + if !cluster.OversizedMessage(b) { + // If this is the first we've seen the message and it's + // not oversized, gossip it to other nodes. We don't + // propagate oversized messages because they're sent to + // all nodes already. + s.broadcast(b) + s.metrics.propagatedMessagesTotal.Inc() + level.Debug(s.logger).Log("msg", "Gossiping new silence", "silence", e) + } } } return nil diff --git a/silence/silence_test.go b/silence/silence_test.go index f39fc259..71dcbb2b 100644 --- a/silence/silence_test.go +++ b/silence/silence_test.go @@ -573,7 +573,7 @@ func TestSilencesQuery(t *testing.T) { for _, c := range cases { // Run default query of retrieving all silences. 
- res, err := s.query(c.q, time.Time{}) + res, _, err := s.query(c.q, time.Time{}) require.NoError(t, err, "unexpected error on querying") // Currently there are no sorting guarantees in the querying API. @@ -1130,7 +1130,7 @@ func benchmarkSilencesQuery(b *testing.B, numSilences int) { } // Run things once to populate the matcherCache. - sils, err := s.Query( + sils, _, err := s.Query( QState(types.SilenceStateActive), QMatches(lset), ) @@ -1139,7 +1139,7 @@ func benchmarkSilencesQuery(b *testing.B, numSilences int) { b.ResetTimer() for i := 0; i < b.N; i++ { - sils, err := s.Query( + sils, _, err := s.Query( QState(types.SilenceStateActive), QMatches(lset), ) diff --git a/types/types.go b/types/types.go index 3c1e7a0d..00d6f9be 100644 --- a/types/types.go +++ b/types/types.go @@ -42,6 +42,8 @@ type AlertStatus struct { State AlertState `json:"state"` SilencedBy []string `json:"silencedBy"` InhibitedBy []string `json:"inhibitedBy"` + + silencesVersion int } // Marker helps to mark alerts as silenced and/or inhibited. @@ -51,11 +53,12 @@ type Marker interface { // SilencedBy and InhibitedBy entries. SetActive(alert model.Fingerprint) // SetSilenced replaces the previous SilencedBy by the provided IDs of - // silences. The set of provided IDs is supposed to represent the - // complete set of relevant silences. If no ID is provided and - // InhibitedBy is already empty, this call is equivalent - // SetActive. Otherwise, it sets AlertStateSuppressed. - SetSilenced(alert model.Fingerprint, silenceIDs ...string) + // silences, including the version number of the silences state. The set + // of provided IDs is supposed to represent the complete set of relevant + // silences. If no ID is provided and InhibitedBy is already empty, this + // call is equivalent to SetActive. Otherwise, it sets + // AlertStateSuppressed. + SetSilenced(alert model.Fingerprint, version int, silenceIDs ...string) // SetInhibited replaces the previous InhibitedBy by the provided IDs of // alerts. 
In contrast to SetSilenced, the set of provided IDs is not // expected to represent the complete set of inhibiting alerts. (In @@ -76,10 +79,12 @@ type Marker interface { // Various methods to inquire if the given alert is in a certain // AlertState. Silenced also returns all the silencing silences, while - // Inhibited may return only a subset of inhibiting alerts. + // Inhibited may return only a subset of inhibiting alerts. Silenced + // also returns the version of the silences state the result is based + // on. Unprocessed(model.Fingerprint) bool Active(model.Fingerprint) bool - Silenced(model.Fingerprint) ([]string, bool) + Silenced(model.Fingerprint) ([]string, int, bool) Inhibited(model.Fingerprint) ([]string, bool) } @@ -143,7 +148,7 @@ func (m *memMarker) Count(states ...AlertState) int { } // SetSilenced implements Marker. -func (m *memMarker) SetSilenced(alert model.Fingerprint, ids ...string) { +func (m *memMarker) SetSilenced(alert model.Fingerprint, version int, ids ...string) { m.mtx.Lock() s, found := m.m[alert] @@ -151,6 +156,7 @@ func (m *memMarker) SetSilenced(alert model.Fingerprint, ids ...string) { s = &AlertStatus{} m.m[alert] = s } + s.silencesVersion = version // If there are any silence or alert IDs associated with the // fingerprint, it is suppressed. Otherwise, set it to @@ -253,10 +259,11 @@ func (m *memMarker) Inhibited(alert model.Fingerprint) ([]string, bool) { } // Silenced returns whether the alert for the given Fingerprint is in the -// Silenced state and any associated silence IDs. -func (m *memMarker) Silenced(alert model.Fingerprint) ([]string, bool) { +// Silenced state, any associated silence IDs, and the silences state version +// the result is based on. +func (m *memMarker) Silenced(alert model.Fingerprint) ([]string, int, bool) { s := m.Status(alert) - return s.SilencedBy, + return s.SilencedBy, s.silencesVersion, s.State == AlertStateSuppressed && len(s.SilencedBy) > 0 }