Improve `Mutes` performance for silences

Add version tracking of the silences state. Adding a silence to the state
increments the version. If the version hasn't changed since the last
time an alert was checked for being silenced, we only have to verify
that the relevant silences are still active rather than checking the
alert against all silences.

Signed-off-by: beorn7 <beorn@soundcloud.com>
This commit is contained in:
beorn7 2019-02-27 12:33:46 +01:00
parent 49ff877079
commit 3c981a92f7
6 changed files with 156 additions and 54 deletions

View File

@ -556,7 +556,7 @@ func (api *API) setSilence(w http.ResponseWriter, r *http.Request) {
func (api *API) getSilence(w http.ResponseWriter, r *http.Request) {
sid := route.Param(r.Context(), "sid")
sils, err := api.silences.Query(silence.QIDs(sid))
sils, _, err := api.silences.Query(silence.QIDs(sid))
if err != nil || len(sils) == 0 {
http.Error(w, fmt.Sprint("Error getting silence: ", err), http.StatusNotFound)
return
@ -587,7 +587,7 @@ func (api *API) delSilence(w http.ResponseWriter, r *http.Request) {
}
func (api *API) listSilences(w http.ResponseWriter, r *http.Request) {
psils, err := api.silences.Query()
psils, _, err := api.silences.Query()
if err != nil {
api.respondError(w, apiError{
typ: errorInternal,

View File

@ -489,7 +489,7 @@ func (api *API) getSilencesHandler(params silence_ops.GetSilencesParams) middlew
}
}
psils, err := api.silences.Query()
psils, _, err := api.silences.Query()
if err != nil {
level.Error(api.logger).Log("msg", "failed to get silences", "err", err)
return silence_ops.NewGetSilencesInternalServerError().WithPayload(err.Error())
@ -521,7 +521,7 @@ func gettableSilenceMatchesFilterLabels(s open_api_models.GettableSilence, match
}
func (api *API) getSilenceHandler(params silence_ops.GetSilenceParams) middleware.Responder {
sils, err := api.silences.Query(silence.QIDs(params.SilenceID.String()))
sils, _, err := api.silences.Query(silence.QIDs(params.SilenceID.String()))
if err != nil {
level.Error(api.logger).Log("msg", "failed to get silence by id", "err", err)
return silence_ops.NewGetSilenceInternalServerError().WithPayload(err.Error())

View File

@ -624,14 +624,15 @@ func TestMuteStage(t *testing.T) {
}
func TestMuteStageWithSilences(t *testing.T) {
silences, err := silence.New(silence.Options{})
silences, err := silence.New(silence.Options{Retention: time.Hour})
if err != nil {
t.Fatal(err)
}
if _, err := silences.Set(&silencepb.Silence{
silID, err := silences.Set(&silencepb.Silence{
EndsAt: utcNow().Add(time.Hour),
Matchers: []*silencepb.Matcher{{Name: "mute", Pattern: "me"}},
}); err != nil {
})
if err != nil {
t.Fatal(err)
}
@ -663,9 +664,9 @@ func TestMuteStageWithSilences(t *testing.T) {
})
}
// Set the second alert as previously silenced. It is expected to have
// the WasSilenced flag set to true afterwards.
marker.SetSilenced(inAlerts[1].Fingerprint(), "123")
// Set the second alert as previously silenced with an old version
// number. The alert is expected to get unsilenced by the stage.
marker.SetSilenced(inAlerts[1].Fingerprint(), 0, "123")
_, alerts, err := stage.Exec(context.Background(), log.NewNopLogger(), inAlerts...)
if err != nil {
@ -680,4 +681,37 @@ func TestMuteStageWithSilences(t *testing.T) {
if !reflect.DeepEqual(got, out) {
t.Fatalf("Muting failed, expected: %v\ngot %v", out, got)
}
// Do it again to exercise the version tracking of silences.
_, alerts, err = stage.Exec(context.Background(), log.NewNopLogger(), inAlerts...)
if err != nil {
t.Fatalf("Exec failed: %s", err)
}
got = got[:0]
for _, a := range alerts {
got = append(got, a.Labels)
}
if !reflect.DeepEqual(got, out) {
t.Fatalf("Muting failed, expected: %v\ngot %v", out, got)
}
// Expire the silence and verify that no alerts are silenced now.
if err := silences.Expire(silID); err != nil {
t.Fatal(err)
}
_, alerts, err = stage.Exec(context.Background(), log.NewNopLogger(), inAlerts...)
if err != nil {
t.Fatalf("Exec failed: %s", err)
}
got = got[:0]
for _, a := range alerts {
got = append(got, a.Labels)
}
if !reflect.DeepEqual(got, in) {
t.Fatalf("Unmuting failed, expected: %v\ngot %v", in, got)
}
}

View File

@ -23,6 +23,7 @@ import (
"os"
"reflect"
"regexp"
"sort"
"sync"
"time"
@ -111,22 +112,68 @@ func NewSilencer(s *Silences, m types.Marker, l log.Logger) *Silencer {
// Mutes implements the Muter interface.
func (s *Silencer) Mutes(lset model.LabelSet) bool {
sils, err := s.silences.Query(
QState(types.SilenceStateActive),
QMatches(lset),
fp := lset.Fingerprint()
ids, markerVersion, _ := s.marker.Silenced(fp)
var (
err error
sils []*pb.Silence
newVersion = markerVersion
)
if markerVersion == s.silences.Version() {
// No new silences added, just need to check which of the old
// silences are still relevant.
if len(ids) == 0 {
// Super fast path: No silences ever applied to this
// alert, none have been added. We are done.
return false
}
// This is still a quite fast path: No silences have been added,
// we only need to check which of the applicable silences are
// currently active. Note that newVersion is left at
// markerVersion because the Query call might already return a
// newer version, which is not the version our old list of
// applicable silences is based on.
sils, _, err = s.silences.Query(
QIDs(ids...),
QState(types.SilenceStateActive),
)
} else {
// New silences have been added, do a full query.
sils, newVersion, err = s.silences.Query(
QState(types.SilenceStateActive),
QMatches(lset),
)
}
if err != nil {
level.Error(s.logger).Log("msg", "Querying silences failed, alerts might not get silenced correctly", "err", err)
}
if len(sils) == 0 {
s.marker.SetSilenced(lset.Fingerprint())
s.marker.SetSilenced(fp, newVersion)
return false
}
ids := make([]string, len(sils))
for i, s := range sils {
ids[i] = s.Id
idsChanged := len(sils) != len(ids)
if !idsChanged {
// Length is the same, but is the content the same?
for i, s := range sils {
if ids[i] != s.Id {
idsChanged = true
break
}
}
}
if idsChanged {
// Need to recreate ids.
ids = make([]string, len(sils))
for i, s := range sils {
ids[i] = s.Id
}
sort.Strings(ids) // For comparability.
}
if idsChanged || newVersion != markerVersion {
// Update marker only if something changed.
s.marker.SetSilenced(fp, newVersion, ids...)
}
s.marker.SetSilenced(lset.Fingerprint(), ids...)
return true
}
@ -139,6 +186,7 @@ type Silences struct {
mtx sync.RWMutex
st state
version int // Increments whenever silences are added.
broadcast func([]byte)
mc matcherCache
}
@ -441,7 +489,9 @@ func (s *Silences) setSilence(sil *pb.Silence) error {
return err
}
s.st.merge(msil, s.now())
if s.st.merge(msil, s.now()) {
s.version++
}
s.broadcast(b)
return nil
@ -615,7 +665,7 @@ func QState(states ...types.SilenceState) QueryParam {
// QueryOne queries with the given parameters and returns the first result.
// Returns ErrNotFound if the query result is empty.
func (s *Silences) QueryOne(params ...QueryParam) (*pb.Silence, error) {
res, err := s.Query(params...)
res, _, err := s.Query(params...)
if err != nil {
return nil, err
}
@ -625,16 +675,17 @@ func (s *Silences) QueryOne(params ...QueryParam) (*pb.Silence, error) {
return res[0], nil
}
// Query for silences based on the given query parameters.
func (s *Silences) Query(params ...QueryParam) ([]*pb.Silence, error) {
// Query for silences based on the given query parameters. It returns the
// resulting silences and the state version the result is based on.
func (s *Silences) Query(params ...QueryParam) ([]*pb.Silence, int, error) {
start := time.Now()
s.metrics.queriesTotal.Inc()
sils, err := func() ([]*pb.Silence, error) {
sils, version, err := func() ([]*pb.Silence, int, error) {
q := &query{}
for _, p := range params {
if err := p(q); err != nil {
return nil, err
return nil, s.Version(), err
}
}
return s.query(q, s.now())
@ -643,23 +694,29 @@ func (s *Silences) Query(params ...QueryParam) ([]*pb.Silence, error) {
s.metrics.queryErrorsTotal.Inc()
}
s.metrics.queryDuration.Observe(time.Since(start).Seconds())
return sils, err
return sils, version, err
}
// Version returns the current version of the silence state, i.e. the value
// of s.version at the time of the call. The field is read while holding the
// read lock on s.mtx and the lock is released before returning, so the state
// may already have advanced to a newer version by the time the caller uses
// the result.
func (s *Silences) Version() int {
s.mtx.RLock()
defer s.mtx.RUnlock()
return s.version
}
// Count silences by state.
func (s *Silences) CountState(states ...types.SilenceState) (int, error) {
// This could probably be optimized.
sils, err := s.Query(QState(states...))
sils, _, err := s.Query(QState(states...))
if err != nil {
return -1, err
}
return len(sils), nil
}
func (s *Silences) query(q *query, now time.Time) ([]*pb.Silence, error) {
// If we have an ID constraint, all silences are our base set.
// This and the use of post-filter functions is the
// the trivial solution for now.
func (s *Silences) query(q *query, now time.Time) ([]*pb.Silence, int, error) {
// If we have no ID constraint, all silences are our base set. This and
// the use of post-filter functions is the trivial solution for now.
var res []*pb.Silence
s.mtx.Lock()
@ -683,7 +740,7 @@ func (s *Silences) query(q *query, now time.Time) ([]*pb.Silence, error) {
for _, f := range q.filters {
ok, err := f(sil, s, now)
if err != nil {
return nil, err
return nil, s.version, err
}
if !ok {
remove = true
@ -695,7 +752,7 @@ func (s *Silences) query(q *query, now time.Time) ([]*pb.Silence, error) {
}
}
return resf, nil
return resf, s.version, nil
}
// loadSnapshot loads a snapshot generated by Snapshot() into the state.
@ -716,6 +773,7 @@ func (s *Silences) loadSnapshot(r io.Reader) error {
}
s.mtx.Lock()
s.st = st
s.version++
s.mtx.Unlock()
return nil
@ -758,14 +816,17 @@ func (s *Silences) Merge(b []byte) error {
now := s.now()
for _, e := range st {
if merged := s.st.merge(e, now); merged && !cluster.OversizedMessage(b) {
// If this is the first we've seen the message and it's
// not oversized, gossip it to other nodes. We don't
// propagate oversized messages because they're sent to
// all nodes already.
s.broadcast(b)
s.metrics.propagatedMessagesTotal.Inc()
level.Debug(s.logger).Log("msg", "Gossiping new silence", "silence", e)
if merged := s.st.merge(e, now); merged {
s.version++
if !cluster.OversizedMessage(b) {
// If this is the first time we've seen the message and it's
// not oversized, gossip it to other nodes. We don't
// propagate oversized messages because they're sent to
// all nodes already.
s.broadcast(b)
s.metrics.propagatedMessagesTotal.Inc()
level.Debug(s.logger).Log("msg", "Gossiping new silence", "silence", e)
}
}
}
return nil

View File

@ -573,7 +573,7 @@ func TestSilencesQuery(t *testing.T) {
for _, c := range cases {
// Run default query of retrieving all silences.
res, err := s.query(c.q, time.Time{})
res, _, err := s.query(c.q, time.Time{})
require.NoError(t, err, "unexpected error on querying")
// Currently there are no sorting guarantees in the querying API.
@ -1130,7 +1130,7 @@ func benchmarkSilencesQuery(b *testing.B, numSilences int) {
}
// Run things once to populate the matcherCache.
sils, err := s.Query(
sils, _, err := s.Query(
QState(types.SilenceStateActive),
QMatches(lset),
)
@ -1139,7 +1139,7 @@ func benchmarkSilencesQuery(b *testing.B, numSilences int) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
sils, err := s.Query(
sils, _, err := s.Query(
QState(types.SilenceStateActive),
QMatches(lset),
)

View File

@ -42,6 +42,8 @@ type AlertStatus struct {
State AlertState `json:"state"`
SilencedBy []string `json:"silencedBy"`
InhibitedBy []string `json:"inhibitedBy"`
silencesVersion int
}
// Marker helps to mark alerts as silenced and/or inhibited.
@ -51,11 +53,12 @@ type Marker interface {
// SilencedBy and InhibitedBy entries.
SetActive(alert model.Fingerprint)
// SetSilenced replaces the previous SilencedBy by the provided IDs of
// silences. The set of provided IDs is supposed to represent the
// complete set of relevant silences. If no ID is provided and
// InhibitedBy is already empty, this call is equivalent
// SetActive. Otherwise, it sets AlertStateSuppressed.
SetSilenced(alert model.Fingerprint, silenceIDs ...string)
// silences, including the version number of the silences state. The set
// of provided IDs is supposed to represent the complete set of relevant
// silences. If no ID is provided and InhibitedBy is already empty, this
// call is equivalent to SetActive. Otherwise, it sets
// AlertStateSuppressed.
SetSilenced(alert model.Fingerprint, version int, silenceIDs ...string)
// SetInhibited replaces the previous InhibitedBy by the provided IDs of
// alerts. In contrast to SetSilenced, the set of provided IDs is not
// expected to represent the complete set of inhibiting alerts. (In
@ -76,10 +79,12 @@ type Marker interface {
// Various methods to inquire if the given alert is in a certain
// AlertState. Silenced also returns all the silencing silences, while
// Inhibited may return only a subset of inhibiting alerts.
// Inhibited may return only a subset of inhibiting alerts. Silenced
// also returns the version of the silences state the result is based
// on.
Unprocessed(model.Fingerprint) bool
Active(model.Fingerprint) bool
Silenced(model.Fingerprint) ([]string, bool)
Silenced(model.Fingerprint) ([]string, int, bool)
Inhibited(model.Fingerprint) ([]string, bool)
}
@ -143,7 +148,7 @@ func (m *memMarker) Count(states ...AlertState) int {
}
// SetSilenced implements Marker.
func (m *memMarker) SetSilenced(alert model.Fingerprint, ids ...string) {
func (m *memMarker) SetSilenced(alert model.Fingerprint, version int, ids ...string) {
m.mtx.Lock()
s, found := m.m[alert]
@ -151,6 +156,7 @@ func (m *memMarker) SetSilenced(alert model.Fingerprint, ids ...string) {
s = &AlertStatus{}
m.m[alert] = s
}
s.silencesVersion = version
// If there are any silence or alert IDs associated with the
// fingerprint, it is suppressed. Otherwise, set it to
@ -253,10 +259,11 @@ func (m *memMarker) Inhibited(alert model.Fingerprint) ([]string, bool) {
}
// Silenced returns whether the alert for the given Fingerprint is in the
// Silenced state and any associated silence IDs.
func (m *memMarker) Silenced(alert model.Fingerprint) ([]string, bool) {
// Silenced state, any associated silence IDs, and the silences state version
// the result is based on.
func (m *memMarker) Silenced(alert model.Fingerprint) ([]string, int, bool) {
s := m.Status(alert)
return s.SilencedBy,
return s.SilencedBy, s.silencesVersion,
s.State == AlertStateSuppressed && len(s.SilencedBy) > 0
}