diff --git a/provider/mem/mem.go b/provider/mem/mem.go index fc9e1093..7e0875c6 100644 --- a/provider/mem/mem.go +++ b/provider/mem/mem.go @@ -75,6 +75,16 @@ func (a *Alerts) runGC() { } } + for i, l := range a.listeners { + select { + case <-l.done: + delete(a.listeners, i) + close(l.alerts) + default: + // listener is not closed yet, hence proceed. + } + } + a.mtx.Unlock() } } @@ -85,15 +95,27 @@ func (a *Alerts) Close() error { return nil } +func max(a, b int) int { + if a > b { + return a + } + return b +} + // Subscribe returns an iterator over active alerts that have not been // resolved and successfully notified about. // They are not guaranteed to be in chronological order. func (a *Alerts) Subscribe() provider.AlertIterator { + alerts, err := a.getPending() + var ( - ch = make(chan *types.Alert, alertChannelLength) + ch = make(chan *types.Alert, max(len(alerts), alertChannelLength)) done = make(chan struct{}) ) - alerts, err := a.getPending() + + for _, a := range alerts { + ch <- a + } a.mtx.Lock() i := a.next @@ -101,25 +123,6 @@ func (a *Alerts) Subscribe() provider.AlertIterator { a.listeners[i] = listeningAlerts{alerts: ch, done: done} a.mtx.Unlock() - go func() { - defer func() { - a.mtx.Lock() - delete(a.listeners, i) - close(ch) - a.mtx.Unlock() - }() - - for _, a := range alerts { - select { - case ch <- a: - case <-done: - return - } - } - - <-done - }() - return provider.NewAlertIterator(ch, done, err) }