From 4535311c34f13ddcc492e3c1e769eec85c18dde2 Mon Sep 17 00:00:00 2001 From: Simon Pasquier Date: Wed, 18 Sep 2019 09:29:34 +0200 Subject: [PATCH] dispatch: don't garbage-collect alerts from store The aggregation group is already responsible for removing the resolved alerts. Running the garbage collection in parallel introduces a race and eventually resolved notifications may be dropped. Signed-off-by: Simon Pasquier --- dispatch/dispatch.go | 11 ++++------- inhibit/inhibit.go | 4 ++-- inhibit/inhibit_test.go | 6 +++--- provider/mem/mem.go | 4 ++-- store/store.go | 19 ++++++------------- store/store_test.go | 31 +++++++++++++++++++++---------- 6 files changed, 38 insertions(+), 37 deletions(-) diff --git a/dispatch/dispatch.go b/dispatch/dispatch.go index 24a390ae..377cfcda 100644 --- a/dispatch/dispatch.go +++ b/dispatch/dispatch.go @@ -313,12 +313,10 @@ func newAggrGroup(ctx context.Context, labels model.LabelSet, r *Route, to func( routeKey: r.Key(), opts: &r.RouteOpts, timeout: to, - alerts: store.NewAlerts(15 * time.Minute), + alerts: store.NewAlerts(), done: make(chan struct{}), } - ag.ctx, ag.cancel = context.WithCancel(ctx) - ag.alerts.Run(ag.ctx) ag.logger = log.With(logger, "aggrGroup", ag) @@ -438,14 +436,13 @@ func (ag *aggrGroup) flush(notify func(...*types.Alert) bool) { fp := a.Fingerprint() got, err := ag.alerts.Get(fp) if err != nil { - // This should only happen if the Alert was - // deleted from the store during the flush. - level.Error(ag.logger).Log("msg", "failed to get alert", "err", err) + // This should never happen. + level.Error(ag.logger).Log("msg", "failed to get alert", "err", err, "alert", a.String()) continue } if a.Resolved() && got.UpdatedAt == a.UpdatedAt { if err := ag.alerts.Delete(fp); err != nil { - level.Error(ag.logger).Log("msg", "error on delete alert", "err", err) + level.Error(ag.logger).Log("msg", "error on delete alert", "err", err, "alert", a.String()) } } } diff --git a/inhibit/inhibit.go b/inhibit/inhibit.go index b4d00fd2..2cb68581 100644 --- a/inhibit/inhibit.go +++ b/inhibit/inhibit.go @@ -94,7 +94,7 @@ func (ih *Inhibitor) Run() { runCtx, runCancel := context.WithCancel(ctx) for _, rule := range ih.rules { - rule.scache.Run(runCtx) + rule.scache.Run(runCtx, 15*time.Minute) } g.Add(func() error { @@ -194,7 +194,7 @@ func NewInhibitRule(cr *config.InhibitRule) *InhibitRule { SourceMatchers: sourcem, TargetMatchers: targetm, Equal: equal, - scache: store.NewAlerts(15 * time.Minute), + scache: store.NewAlerts(), } } diff --git a/inhibit/inhibit_test.go b/inhibit/inhibit_test.go index 825ff539..646c1e9c 100644 --- a/inhibit/inhibit_test.go +++ b/inhibit/inhibit_test.go @@ -122,7 +122,7 @@ func TestInhibitRuleHasEqual(t *testing.T) { for _, c := range cases { r := &InhibitRule{ Equal: map[model.LabelName]struct{}{}, - scache: store.NewAlerts(5 * time.Minute), + scache: store.NewAlerts(), } for _, ln := range c.equal { r.Equal[ln] = struct{}{} @@ -170,9 +170,9 @@ func TestInhibitRuleMatches(t *testing.T) { }, } - ih.rules[0].scache = store.NewAlerts(5 * time.Minute) + ih.rules[0].scache = store.NewAlerts() ih.rules[0].scache.Set(sourceAlert1) - ih.rules[1].scache = store.NewAlerts(5 * time.Minute) + ih.rules[1].scache = store.NewAlerts() ih.rules[1].scache.Set(sourceAlert2) cases := []struct { diff --git a/provider/mem/mem.go b/provider/mem/mem.go index 9cae5487..4bae2905 100644 --- a/provider/mem/mem.go +++ b/provider/mem/mem.go @@ -50,7 +50,7 @@ type listeningAlerts struct { func NewAlerts(ctx context.Context, m types.Marker, intervalGC time.Duration, l log.Logger) (*Alerts, error) { ctx, cancel := context.WithCancel(ctx) a := &Alerts{ - alerts: store.NewAlerts(intervalGC), + alerts: store.NewAlerts(), cancel: cancel, listeners: map[int]listeningAlerts{}, next: 0, @@ -76,7 +76,7 @@ func NewAlerts(ctx context.Context, m types.Marker, intervalGC time.Duration, l } a.mtx.Unlock() }) - a.alerts.Run(ctx) + a.alerts.Run(ctx, intervalGC) return a, nil } diff --git a/store/store.go b/store/store.go index 16f04571..bed77474 100644 --- a/store/store.go +++ b/store/store.go @@ -33,23 +33,16 @@ var ( // gcInterval. An optional callback can be set which receives a slice of all // resolved alerts that have been removed. type Alerts struct { - gcInterval time.Duration - sync.Mutex c map[model.Fingerprint]*types.Alert cb func([]*types.Alert) } // NewAlerts returns a new Alerts struct. -func NewAlerts(gcInterval time.Duration) *Alerts { - if gcInterval == 0 { - gcInterval = time.Minute - } - +func NewAlerts() *Alerts { a := &Alerts{ - c: make(map[model.Fingerprint]*types.Alert), - cb: func(_ []*types.Alert) {}, - gcInterval: gcInterval, + c: make(map[model.Fingerprint]*types.Alert), + cb: func(_ []*types.Alert) {}, } return a @@ -63,8 +56,8 @@ func (a *Alerts) SetGCCallback(cb func([]*types.Alert)) { a.cb = cb } -// Run starts the GC loop. -func (a *Alerts) Run(ctx context.Context) { +// Run starts the GC loop. The interval must be greater than zero; if not, the function will panic. +func (a *Alerts) Run(ctx context.Context, interval time.Duration) { go func(t *time.Ticker) { for { select { @@ -74,7 +67,7 @@ func (a *Alerts) Run(ctx context.Context) { a.gc() } } - }(time.NewTicker(a.gcInterval)) + }(time.NewTicker(interval)) } func (a *Alerts) gc() { diff --git a/store/store_test.go b/store/store_test.go index c525ba53..67f21a3b 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -28,8 +28,8 @@ func TestSetGet(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - a := NewAlerts(d) - a.Run(ctx) + a := NewAlerts() + a.Run(ctx, d) alert := &types.Alert{ UpdatedAt: time.Now(), } @@ -46,8 +46,8 @@ func TestDelete(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - a := NewAlerts(d) - a.Run(ctx) + a := NewAlerts() + a.Run(ctx, d) alert := &types.Alert{ UpdatedAt: time.Now(), } @@ -82,18 +82,29 @@ func TestGC(t *testing.T) { newAlert("a", -10, -5), newAlert("d", -10, -1), } - s := NewAlerts(5 * time.Minute) - var n int + s := NewAlerts() + var ( + n int + done = make(chan struct{}) + ) s.SetGCCallback(func(a []*types.Alert) { - for range a { - n++ + n += len(a) + if n >= len(resolved) { + close(done) } }) for _, alert := range append(active, resolved...) { require.NoError(t, s.Set(alert)) } - - s.gc() + ctx, cancel := context.WithCancel(context.Background()) + s.Run(ctx, 10*time.Millisecond) + select { + case <-done: + cancel() + break + case <-time.After(1 * time.Second): + t.Fatal("garbage collection didn't complete in time") + } for _, alert := range active { if _, err := s.Get(alert.Fingerprint()); err != nil {