diff --git a/manager/aggregator.go b/manager/aggregator.go index 481d7d24..16ae4bcb 100644 --- a/manager/aggregator.go +++ b/manager/aggregator.go @@ -110,7 +110,7 @@ type Aggregator struct { getAggregatesRequests chan *getAggregatesRequest removeAggregateRequests chan EventFingerprint rulesRequests chan *aggregatorResetRulesRequest - closed chan bool + closeRequests chan *closeRequest } func NewAggregator() *Aggregator { @@ -121,18 +121,24 @@ func NewAggregator() *Aggregator { getAggregatesRequests: make(chan *getAggregatesRequest), removeAggregateRequests: make(chan EventFingerprint), rulesRequests: make(chan *aggregatorResetRulesRequest), - closed: make(chan bool), + closeRequests: make(chan *closeRequest), } } func (a *Aggregator) Close() { + req := &closeRequest{ + done: make(chan bool), + } + a.closeRequests <- req + <-req.done +} + +func (a *Aggregator) closeInternal() { close(a.rulesRequests) close(a.aggRequests) close(a.getAggregatesRequests) close(a.removeAggregateRequests) - - <-a.closed - close(a.closed) + close(a.closeRequests) } type aggregateEventsResponse struct { @@ -153,6 +159,10 @@ type getAggregatesRequest struct { Response chan getAggregatesResponse } +type closeRequest struct { + done chan bool +} + func (a *Aggregator) aggregate(req *aggregateEventsRequest, s SummaryReceiver) { if len(a.Rules) == 0 { req.Response <- &aggregateEventsResponse{ @@ -259,24 +269,14 @@ func (a *Aggregator) Dispatch(s SummaryReceiver) { t := time.NewTicker(time.Second) defer t.Stop() - closed := 0 - - for closed < 2 { + for { select { - case req, open := <-a.aggRequests: + case req := <-a.aggRequests: a.aggregate(req, s) - if !open { - closed++ - } - - case rules, open := <-a.rulesRequests: + case rules := <-a.rulesRequests: a.replaceRules(rules) - if !open { - closed++ - } - case req := <-a.getAggregatesRequests: aggs := a.aggregates() req.Response <- getAggregatesResponse{ @@ -288,8 +288,11 @@ func (a *Aggregator) Dispatch(s SummaryReceiver) { log.Println("Deleting expired aggregation instance", a) a.Aggregates[fp].Close() delete(a.Aggregates, fp) + + case req := <-a.closeRequests: + a.closeInternal() + req.done <- true + return } } - - a.closed <- true }