Avoid listener blocking (#1482)

Signed-off-by: wangyue <wangyue@actiontech.com>
This commit is contained in:
wangYue 2018-08-06 19:24:21 +08:00 committed by stuart nelson
parent 6d0edbe630
commit 0fc0ff8e71

View File

@ -29,9 +29,13 @@ type Alerts struct {
marker types.Marker
intervalGC time.Duration
stopGC chan struct{}
listeners map[int]listeningAlerts
next int
}
listeners map[int]chan *types.Alert
next int
type listeningAlerts struct {
alerts chan *types.Alert
done chan struct{}
}
// NewAlerts returns a new alert provider.
@ -41,7 +45,7 @@ func NewAlerts(m types.Marker, intervalGC time.Duration) (*Alerts, error) {
marker: m,
intervalGC: intervalGC,
stopGC: make(chan struct{}),
listeners: map[int]chan *types.Alert{},
listeners: map[int]listeningAlerts{},
next: 0,
}
go a.runGC()
@ -92,7 +96,7 @@ func (a *Alerts) Subscribe() provider.AlertIterator {
a.mtx.Lock()
i := a.next
a.next++
a.listeners[i] = ch
a.listeners[i] = listeningAlerts{alerts: ch, done: done}
a.mtx.Unlock()
go func() {
@ -185,8 +189,11 @@ func (a *Alerts) Put(alerts ...*types.Alert) error {
a.alerts[fp] = alert
for _, ch := range a.listeners {
ch <- alert
for _, l := range a.listeners {
select {
case l.alerts <- alert:
case <-l.done:
}
}
}