diff --git a/provider/mem/mem.go b/provider/mem/mem.go index cfc3bfc3..1f59ccbe 100644 --- a/provider/mem/mem.go +++ b/provider/mem/mem.go @@ -34,10 +34,11 @@ const alertChannelLength = 200 type Alerts struct { cancel context.CancelFunc + mtx sync.Mutex + alerts *store.Alerts marker types.AlertMarker - mtx sync.Mutex listeners map[int]listeningAlerts next int @@ -100,37 +101,53 @@ func NewAlerts(ctx context.Context, m types.AlertMarker, intervalGC time.Duratio logger: log.With(l, "component", "provider"), callback: alertCallback, } - a.alerts.SetGCCallback(func(alerts []types.Alert) { - for _, alert := range alerts { - // As we don't persist alerts, we no longer consider them after - // they are resolved. Alerts waiting for resolved notifications are - // held in memory in aggregation groups redundantly. - m.Delete(alert.Fingerprint()) - a.callback.PostDelete(&alert) - } - - a.mtx.Lock() - for i, l := range a.listeners { - select { - case <-l.done: - delete(a.listeners, i) - close(l.alerts) - default: - // listener is not closed yet, hence proceed. - } - } - a.mtx.Unlock() - }) if r != nil { a.registerMetrics(r) } - go a.alerts.Run(ctx, intervalGC) + go a.gcLoop(ctx, intervalGC) return a, nil } +func (a *Alerts) gcLoop(ctx context.Context, interval time.Duration) { + t := time.NewTicker(interval) + defer t.Stop() + for { + select { + case <-ctx.Done(): + return + case <-t.C: + a.gc() + } + } +} + +func (a *Alerts) gc() { + a.mtx.Lock() + defer a.mtx.Unlock() + + deleted := a.alerts.GC() + for _, alert := range deleted { + // As we don't persist alerts, we no longer consider them after + // they are resolved. Alerts waiting for resolved notifications are + // held in memory in aggregation groups redundantly. + a.marker.Delete(alert.Fingerprint()) + a.callback.PostDelete(&alert) + } + + for i, l := range a.listeners { + select { + case <-l.done: + delete(a.listeners, i) + close(l.alerts) + default: + // listener is not closed yet, hence proceed. + } + } +} + // Close the alert provider. func (a *Alerts) Close() { if a.cancel != nil { @@ -174,11 +191,13 @@ func (a *Alerts) GetPending() provider.AlertIterator { ch = make(chan *types.Alert, alertChannelLength) done = make(chan struct{}) ) + a.mtx.Lock() + defer a.mtx.Unlock() + alerts := a.alerts.List() go func() { defer close(ch) - - for _, a := range a.alerts.List() { + for _, a := range alerts { select { case ch <- a: case <-done: @@ -192,11 +211,16 @@ func (a *Alerts) GetPending() provider.AlertIterator { // Get returns the alert for a given fingerprint. func (a *Alerts) Get(fp model.Fingerprint) (*types.Alert, error) { + a.mtx.Lock() + defer a.mtx.Unlock() return a.alerts.Get(fp) } // Put adds the given alert to the set. func (a *Alerts) Put(alerts ...*types.Alert) error { + a.mtx.Lock() + defer a.mtx.Unlock() + for _, alert := range alerts { fp := alert.Fingerprint() @@ -226,14 +250,12 @@ func (a *Alerts) Put(alerts ...*types.Alert) error { a.callback.PostStore(alert, existing) - a.mtx.Lock() for _, l := range a.listeners { select { case l.alerts <- alert: case <-l.done: } } - a.mtx.Unlock() } return nil @@ -241,6 +263,9 @@ func (a *Alerts) Put(alerts ...*types.Alert) error { // count returns the number of non-resolved alerts we currently have stored filtered by the provided state. func (a *Alerts) count(state types.AlertState) int { + a.mtx.Lock() + defer a.mtx.Unlock() + var count int for _, alert := range a.alerts.List() { if alert.Resolved() { diff --git a/provider/mem/mem_test.go b/provider/mem/mem_test.go index 85aa97e6..c000cbbf 100644 --- a/provider/mem/mem_test.go +++ b/provider/mem/mem_test.go @@ -15,6 +15,7 @@ package mem import ( "context" + "errors" "fmt" "reflect" "strconv" @@ -561,3 +562,62 @@ func (l *limitCountCallback) PostStore(_ *types.Alert, existing bool) { func (l *limitCountCallback) PostDelete(_ *types.Alert) { l.alerts.Dec() } + +func TestAlertsConcurrently(t *testing.T) { + callback := &limitCountCallback{limit: 100} + a, err := NewAlerts(context.Background(), types.NewMarker(prometheus.NewRegistry()), time.Millisecond, callback, log.NewNopLogger(), nil) + require.NoError(t, err) + + stopc := make(chan struct{}) + failc := make(chan struct{}) + go func() { + time.Sleep(2 * time.Second) + close(stopc) + }() + expire := 10 * time.Millisecond + wg := sync.WaitGroup{} + for i := 0; i < 100; i++ { + wg.Add(1) + go func() { + defer wg.Done() + + j := 0 + for { + select { + case <-failc: + return + case <-stopc: + return + default: + } + now := time.Now() + err := a.Put(&types.Alert{ + Alert: model.Alert{ + Labels: model.LabelSet{"bar": model.LabelValue(strconv.Itoa(j))}, + StartsAt: now, + EndsAt: now.Add(expire), + }, + UpdatedAt: now, + }) + if err != nil && !errors.Is(err, errTooManyAlerts) { + close(failc) + return + } + j++ + } + }() + } + wg.Wait() + select { + case <-failc: + t.Fatalf("unexpected error happened") + default: + } + + time.Sleep(expire) + require.Eventually(t, func() bool { + // When the alert will eventually expire and is considered resolved - it won't count. + return a.count(types.AlertStateActive) == 0 + }, 2*expire, expire) + require.Equal(t, int32(0), callback.alerts.Load()) +} diff --git a/store/store.go b/store/store.go index 5de0c3ee..e4275aa1 100644 --- a/store/store.go +++ b/store/store.go @@ -64,12 +64,13 @@ func (a *Alerts) Run(ctx context.Context, interval time.Duration) { case <-ctx.Done(): return case <-t.C: - a.gc() + a.GC() } } } -func (a *Alerts) gc() { +// GC deletes resolved alerts and returns them. +func (a *Alerts) GC() []types.Alert { a.Lock() var resolved []types.Alert for fp, alert := range a.c { @@ -90,6 +91,7 @@ func (a *Alerts) gc() { } a.Unlock() a.cb(resolved) + return resolved } // Get returns the Alert with the matching fingerprint, or an error if it is