Merge pull request #1514 from s-urbaniak/concurrency

provider/mem: cleanup closed listener in GC
This commit is contained in:
Max Inden 2018-08-13 23:17:09 +02:00 committed by GitHub
commit 2b4598c6d1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -75,6 +75,16 @@ func (a *Alerts) runGC() {
}
}
for i, l := range a.listeners {
select {
case <-l.done:
delete(a.listeners, i)
close(l.alerts)
default:
// listener is not closed yet, hence proceed.
}
}
a.mtx.Unlock()
}
}
@ -85,15 +95,27 @@ func (a *Alerts) Close() error {
return nil
}
func max(a, b int) int {
if a > b {
return a
}
return b
}
// Subscribe returns an iterator over active alerts that have not been
// resolved and successfully notified about.
// They are not guaranteed to be in chronological order.
func (a *Alerts) Subscribe() provider.AlertIterator {
alerts, err := a.getPending()
var (
ch = make(chan *types.Alert, alertChannelLength)
ch = make(chan *types.Alert, max(len(alerts), alertChannelLength))
done = make(chan struct{})
)
alerts, err := a.getPending()
for _, a := range alerts {
ch <- a
}
a.mtx.Lock()
i := a.next
@ -101,25 +123,6 @@ func (a *Alerts) Subscribe() provider.AlertIterator {
a.listeners[i] = listeningAlerts{alerts: ch, done: done}
a.mtx.Unlock()
go func() {
defer func() {
a.mtx.Lock()
delete(a.listeners, i)
close(ch)
a.mtx.Unlock()
}()
for _, a := range alerts {
select {
case ch <- a:
case <-done:
return
}
}
<-done
}()
return provider.NewAlertIterator(ch, done, err)
}