provider/mem: cleanup closed listener in GC

... rather than in the Subscribe method. Currently the cleanup for a
given Alert subscription is done in a blocking goroutine, started in
the Subscribe method.

This simplifies it by moving the cleanup to the GC.

Additionally it simplifies the subscribe method by setting up the
buffered channel big enough to fill it up with all pending alerts
preventing the necessity to start a goroutine in Subscribe at all.

Signed-off-by: Sergiusz Urbaniak <sergiusz.urbaniak@gmail.com>
This commit is contained in:
Sergiusz Urbaniak 2018-08-13 08:14:19 +02:00
parent ec263489e9
commit f9896e0162
No known key found for this signature in database
GPG Key ID: 3F7273C2B5DA406E

View File

@ -75,6 +75,16 @@ func (a *Alerts) runGC() {
}
}
for i, l := range a.listeners {
select {
case <-l.done:
delete(a.listeners, i)
close(l.alerts)
default:
// listener is not closed yet, hence proceed.
}
}
a.mtx.Unlock()
}
}
@ -85,15 +95,27 @@ func (a *Alerts) Close() error {
return nil
}
func max(a, b int) int {
if a > b {
return a
}
return b
}
// Subscribe returns an iterator over active alerts that have not been
// resolved and successfully notified about.
// They are not guaranteed to be in chronological order.
func (a *Alerts) Subscribe() provider.AlertIterator {
alerts, err := a.getPending()
var (
ch = make(chan *types.Alert, alertChannelLength)
ch = make(chan *types.Alert, max(len(alerts), alertChannelLength))
done = make(chan struct{})
)
alerts, err := a.getPending()
for _, a := range alerts {
ch <- a
}
a.mtx.Lock()
i := a.next
@ -101,25 +123,6 @@ func (a *Alerts) Subscribe() provider.AlertIterator {
a.listeners[i] = listeningAlerts{alerts: ch, done: done}
a.mtx.Unlock()
go func() {
defer func() {
a.mtx.Lock()
delete(a.listeners, i)
close(ch)
a.mtx.Unlock()
}()
for _, a := range alerts {
select {
case ch <- a:
case <-done:
return
}
}
<-done
}()
return provider.NewAlertIterator(ch, done, err)
}