From 3c981a92f7478df1d51ce50e7dec07df63a03e15 Mon Sep 17 00:00:00 2001 From: beorn7 Date: Wed, 27 Feb 2019 12:33:46 +0100 Subject: [PATCH] Improve `Mutes` performance for silences Add version tracking of silences states. Adding a silence to the state increments the version. If the version hasn't changed since the last time an alert was checked for being silenced, we only have to verify that the relevant silences are still active rather than checking the alert against all silences. Signed-off-by: beorn7 --- api/v1/api.go | 4 +- api/v2/api.go | 4 +- notify/notify_test.go | 46 +++++++++++++-- silence/silence.go | 121 ++++++++++++++++++++++++++++++---------- silence/silence_test.go | 6 +- types/types.go | 29 ++++++---- 6 files changed, 156 insertions(+), 54 deletions(-) diff --git a/api/v1/api.go b/api/v1/api.go index f9d98c1f..b0b5b85d 100644 --- a/api/v1/api.go +++ b/api/v1/api.go @@ -556,7 +556,7 @@ func (api *API) setSilence(w http.ResponseWriter, r *http.Request) { func (api *API) getSilence(w http.ResponseWriter, r *http.Request) { sid := route.Param(r.Context(), "sid") - sils, err := api.silences.Query(silence.QIDs(sid)) + sils, _, err := api.silences.Query(silence.QIDs(sid)) if err != nil || len(sils) == 0 { http.Error(w, fmt.Sprint("Error getting silence: ", err), http.StatusNotFound) return @@ -587,7 +587,7 @@ func (api *API) delSilence(w http.ResponseWriter, r *http.Request) { } func (api *API) listSilences(w http.ResponseWriter, r *http.Request) { - psils, err := api.silences.Query() + psils, _, err := api.silences.Query() if err != nil { api.respondError(w, apiError{ typ: errorInternal, diff --git a/api/v2/api.go b/api/v2/api.go index 28349d1f..95174c1e 100644 --- a/api/v2/api.go +++ b/api/v2/api.go @@ -489,7 +489,7 @@ func (api *API) getSilencesHandler(params silence_ops.GetSilencesParams) middlew } } - psils, err := api.silences.Query() + psils, _, err := api.silences.Query() if err != nil { level.Error(api.logger).Log("msg", "failed to get silences", "err", err) return silence_ops.NewGetSilencesInternalServerError().WithPayload(err.Error()) @@ -521,7 +521,7 @@ func gettableSilenceMatchesFilterLabels(s open_api_models.GettableSilence, match } func (api *API) getSilenceHandler(params silence_ops.GetSilenceParams) middleware.Responder { - sils, err := api.silences.Query(silence.QIDs(params.SilenceID.String())) + sils, _, err := api.silences.Query(silence.QIDs(params.SilenceID.String())) if err != nil { level.Error(api.logger).Log("msg", "failed to get silence by id", "err", err) return silence_ops.NewGetSilenceInternalServerError().WithPayload(err.Error()) diff --git a/notify/notify_test.go b/notify/notify_test.go index b3e994f2..71e0501f 100644 --- a/notify/notify_test.go +++ b/notify/notify_test.go @@ -624,14 +624,15 @@ func TestMuteStage(t *testing.T) { } func TestMuteStageWithSilences(t *testing.T) { - silences, err := silence.New(silence.Options{}) + silences, err := silence.New(silence.Options{Retention: time.Hour}) if err != nil { t.Fatal(err) } - if _, err := silences.Set(&silencepb.Silence{ + silID, err := silences.Set(&silencepb.Silence{ EndsAt: utcNow().Add(time.Hour), Matchers: []*silencepb.Matcher{{Name: "mute", Pattern: "me"}}, - }); err != nil { + }) + if err != nil { t.Fatal(err) } @@ -663,9 +664,9 @@ func TestMuteStageWithSilences(t *testing.T) { }) } - // Set the second alert as previously silenced. It is expected to have - // the WasSilenced flag set to true afterwards. - marker.SetSilenced(inAlerts[1].Fingerprint(), "123") + // Set the second alert as previously silenced with an old version + // number. This is expected to get unsilenced by the stage. + marker.SetSilenced(inAlerts[1].Fingerprint(), 0, "123") _, alerts, err := stage.Exec(context.Background(), log.NewNopLogger(), inAlerts...) if err != nil { @@ -680,4 +681,37 @@ func TestMuteStageWithSilences(t *testing.T) { if !reflect.DeepEqual(got, out) { t.Fatalf("Muting failed, expected: %v\ngot %v", out, got) } + + // Do it again to exercise the version tracking of silences. + _, alerts, err = stage.Exec(context.Background(), log.NewNopLogger(), inAlerts...) + if err != nil { + t.Fatalf("Exec failed: %s", err) + } + + got = got[:0] + for _, a := range alerts { + got = append(got, a.Labels) + } + + if !reflect.DeepEqual(got, out) { + t.Fatalf("Muting failed, expected: %v\ngot %v", out, got) + } + + // Expire the silence and verify that no alerts are silenced now. + if err := silences.Expire(silID); err != nil { + t.Fatal(err) + } + + _, alerts, err = stage.Exec(context.Background(), log.NewNopLogger(), inAlerts...) + if err != nil { + t.Fatalf("Exec failed: %s", err) + } + got = got[:0] + for _, a := range alerts { + got = append(got, a.Labels) + } + + if !reflect.DeepEqual(got, in) { + t.Fatalf("Unmuting failed, expected: %v\ngot %v", in, got) + } } diff --git a/silence/silence.go b/silence/silence.go index 3687c17a..938e89d4 100644 --- a/silence/silence.go +++ b/silence/silence.go @@ -23,6 +23,7 @@ import ( "os" "reflect" "regexp" + "sort" "sync" "time" @@ -111,22 +112,68 @@ func NewSilencer(s *Silences, m types.Marker, l log.Logger) *Silencer { // Mutes implements the Muter interface. func (s *Silencer) Mutes(lset model.LabelSet) bool { - sils, err := s.silences.Query( - QState(types.SilenceStateActive), - QMatches(lset), + fp := lset.Fingerprint() + ids, markerVersion, _ := s.marker.Silenced(fp) + + var ( + err error + sils []*pb.Silence + newVersion = markerVersion ) + if markerVersion == s.silences.Version() { + // No new silences added, just need to check which of the old + // silences are still revelant. + if len(ids) == 0 { + // Super fast path: No silences ever applied to this + // alert, none have been added. We are done. + return false + } + // This is still a quite fast path: No silences have been added, + // we only need to check which of the applicable silences are + // currently active. Note that newVersion is left at + // markerVersion because the Query call might already return a + // newer version, which is not the version our old list of + // applicable silences is based on. + sils, _, err = s.silences.Query( + QIDs(ids...), + QState(types.SilenceStateActive), + ) + } else { + // New silences have been added, do a full query. + sils, newVersion, err = s.silences.Query( + QState(types.SilenceStateActive), + QMatches(lset), + ) + } if err != nil { level.Error(s.logger).Log("msg", "Querying silences failed, alerts might not get silenced correctly", "err", err) } if len(sils) == 0 { - s.marker.SetSilenced(lset.Fingerprint()) + s.marker.SetSilenced(fp, newVersion) return false } - ids := make([]string, len(sils)) - for i, s := range sils { - ids[i] = s.Id + idsChanged := len(sils) != len(ids) + if !idsChanged { + // Length is the same, but is the content the same? + for i, s := range sils { + if ids[i] != s.Id { + idsChanged = true + break + } + } + } + if idsChanged { + // Need to recreate ids. + ids = make([]string, len(sils)) + for i, s := range sils { + ids[i] = s.Id + } + sort.Strings(ids) // For comparability. + } + if idsChanged || newVersion != markerVersion { + // Update marker only if something changed. + s.marker.SetSilenced(fp, newVersion, ids...) } - s.marker.SetSilenced(lset.Fingerprint(), ids...) return true } @@ -139,6 +186,7 @@ type Silences struct { mtx sync.RWMutex st state + version int // Increments whenever silences are added. broadcast func([]byte) mc matcherCache } @@ -441,7 +489,9 @@ func (s *Silences) setSilence(sil *pb.Silence) error { return err } - s.st.merge(msil, s.now()) + if s.st.merge(msil, s.now()) { + s.version++ + } s.broadcast(b) return nil @@ -615,7 +665,7 @@ func QState(states ...types.SilenceState) QueryParam { // QueryOne queries with the given parameters and returns the first result. // Returns ErrNotFound if the query result is empty. func (s *Silences) QueryOne(params ...QueryParam) (*pb.Silence, error) { - res, err := s.Query(params...) + res, _, err := s.Query(params...) if err != nil { return nil, err } @@ -625,16 +675,17 @@ func (s *Silences) QueryOne(params ...QueryParam) (*pb.Silence, error) { return res[0], nil } -// Query for silences based on the given query parameters. -func (s *Silences) Query(params ...QueryParam) ([]*pb.Silence, error) { +// Query for silences based on the given query parameters. It returns the +// resulting silences and the state version the result is based on. +func (s *Silences) Query(params ...QueryParam) ([]*pb.Silence, int, error) { start := time.Now() s.metrics.queriesTotal.Inc() - sils, err := func() ([]*pb.Silence, error) { + sils, version, err := func() ([]*pb.Silence, int, error) { q := &query{} for _, p := range params { if err := p(q); err != nil { - return nil, err + return nil, s.Version(), err } } return s.query(q, s.now()) @@ -643,23 +694,29 @@ func (s *Silences) Query(params ...QueryParam) ([]*pb.Silence, error) { s.metrics.queryErrorsTotal.Inc() } s.metrics.queryDuration.Observe(time.Since(start).Seconds()) - return sils, err + return sils, version, err +} + +// Version of the silence state. +func (s *Silences) Version() int { + s.mtx.RLock() + defer s.mtx.RUnlock() + return s.version } // Count silences by state. func (s *Silences) CountState(states ...types.SilenceState) (int, error) { // This could probably be optimized. - sils, err := s.Query(QState(states...)) + sils, _, err := s.Query(QState(states...)) if err != nil { return -1, err } return len(sils), nil } -func (s *Silences) query(q *query, now time.Time) ([]*pb.Silence, error) { - // If we have an ID constraint, all silences are our base set. - // This and the use of post-filter functions is the - // the trivial solution for now. +func (s *Silences) query(q *query, now time.Time) ([]*pb.Silence, int, error) { + // If we have no ID constraint, all silences are our base set. This and + // the use of post-filter functions is the trivial solution for now. var res []*pb.Silence s.mtx.Lock() @@ -683,7 +740,7 @@ func (s *Silences) query(q *query, now time.Time) ([]*pb.Silence, error) { for _, f := range q.filters { ok, err := f(sil, s, now) if err != nil { - return nil, err + return nil, s.version, err } if !ok { remove = true @@ -695,7 +752,7 @@ func (s *Silences) query(q *query, now time.Time) ([]*pb.Silence, error) { } } - return resf, nil + return resf, s.version, nil } // loadSnapshot loads a snapshot generated by Snapshot() into the state. @@ -716,6 +773,7 @@ func (s *Silences) loadSnapshot(r io.Reader) error { } s.mtx.Lock() s.st = st + s.version++ s.mtx.Unlock() return nil @@ -758,14 +816,17 @@ func (s *Silences) Merge(b []byte) error { now := s.now() for _, e := range st { - if merged := s.st.merge(e, now); merged && !cluster.OversizedMessage(b) { - // If this is the first we've seen the message and it's - // not oversized, gossip it to other nodes. We don't - // propagate oversized messages because they're sent to - // all nodes already. - s.broadcast(b) - s.metrics.propagatedMessagesTotal.Inc() - level.Debug(s.logger).Log("msg", "Gossiping new silence", "silence", e) + if merged := s.st.merge(e, now); merged { + s.version++ + if !cluster.OversizedMessage(b) { + // If this is the first we've seen the message and it's + // not oversized, gossip it to other nodes. We don't + // propagate oversized messages because they're sent to + // all nodes already. + s.broadcast(b) + s.metrics.propagatedMessagesTotal.Inc() + level.Debug(s.logger).Log("msg", "Gossiping new silence", "silence", e) + } } } return nil diff --git a/silence/silence_test.go b/silence/silence_test.go index f39fc259..71dcbb2b 100644 --- a/silence/silence_test.go +++ b/silence/silence_test.go @@ -573,7 +573,7 @@ func TestSilencesQuery(t *testing.T) { for _, c := range cases { // Run default query of retrieving all silences. - res, err := s.query(c.q, time.Time{}) + res, _, err := s.query(c.q, time.Time{}) require.NoError(t, err, "unexpected error on querying") // Currently there are no sorting guarantees in the querying API. @@ -1130,7 +1130,7 @@ func benchmarkSilencesQuery(b *testing.B, numSilences int) { } // Run things once to populate the matcherCache. - sils, err := s.Query( + sils, _, err := s.Query( QState(types.SilenceStateActive), QMatches(lset), ) @@ -1139,7 +1139,7 @@ func benchmarkSilencesQuery(b *testing.B, numSilences int) { b.ResetTimer() for i := 0; i < b.N; i++ { - sils, err := s.Query( + sils, _, err := s.Query( QState(types.SilenceStateActive), QMatches(lset), ) diff --git a/types/types.go b/types/types.go index 3c1e7a0d..00d6f9be 100644 --- a/types/types.go +++ b/types/types.go @@ -42,6 +42,8 @@ type AlertStatus struct { State AlertState `json:"state"` SilencedBy []string `json:"silencedBy"` InhibitedBy []string `json:"inhibitedBy"` + + silencesVersion int } // Marker helps to mark alerts as silenced and/or inhibited. @@ -51,11 +53,12 @@ type Marker interface { // SilencedBy and InhibitedBy entries. SetActive(alert model.Fingerprint) // SetSilenced replaces the previous SilencedBy by the provided IDs of - // silences. The set of provided IDs is supposed to represent the - // complete set of relevant silences. If no ID is provided and - // InhibitedBy is already empty, this call is equivalent - // SetActive. Otherwise, it sets AlertStateSuppressed. - SetSilenced(alert model.Fingerprint, silenceIDs ...string) + // silences, including the version number of the silences state. The set + // of provided IDs is supposed to represent the complete set of relevant + // silences. If no ID is provided and InhibitedBy is already empty, this + // call is equivalent to SetActive. Otherwise, it sets + // AlertStateSuppressed. + SetSilenced(alert model.Fingerprint, version int, silenceIDs ...string) // SetInhibited replaces the previous InhibitedBy by the provided IDs of // alerts. In contrast to SetSilenced, the set of provided IDs is not // expected to represent the complete set of inhibiting alerts. (In @@ -76,10 +79,12 @@ type Marker interface { // Various methods to inquire if the given alert is in a certain // AlertState. Silenced also returns all the silencing silences, while - // Inhibited may return only a subset of inhibiting alerts. + // Inhibited may return only a subset of inhibiting alerts. Silenced + // also returns the version of the silences state the result is based + // on. Unprocessed(model.Fingerprint) bool Active(model.Fingerprint) bool - Silenced(model.Fingerprint) ([]string, bool) + Silenced(model.Fingerprint) ([]string, int, bool) Inhibited(model.Fingerprint) ([]string, bool) } @@ -143,7 +148,7 @@ func (m *memMarker) Count(states ...AlertState) int { } // SetSilenced implements Marker. -func (m *memMarker) SetSilenced(alert model.Fingerprint, ids ...string) { +func (m *memMarker) SetSilenced(alert model.Fingerprint, version int, ids ...string) { m.mtx.Lock() s, found := m.m[alert] @@ -151,6 +156,7 @@ func (m *memMarker) SetSilenced(alert model.Fingerprint, ids ...string) { s = &AlertStatus{} m.m[alert] = s } + s.silencesVersion = version // If there are any silence or alert IDs associated with the // fingerprint, it is suppressed. Otherwise, set it to @@ -253,10 +259,11 @@ func (m *memMarker) Inhibited(alert model.Fingerprint) ([]string, bool) { } // Silenced returns whether the alert for the given Fingerprint is in the -// Silenced state and any associated silence IDs. -func (m *memMarker) Silenced(alert model.Fingerprint) ([]string, bool) { +// Silenced state, any associated silence IDs, and the silences state version +// the result is based on. +func (m *memMarker) Silenced(alert model.Fingerprint) ([]string, int, bool) { s := m.Status(alert) - return s.SilencedBy, + return s.SilencedBy, s.silencesVersion, s.State == AlertStateSuppressed && len(s.SilencedBy) > 0 }