Synchronize Close(), fix race conditions.

Close() was not synced through the main dispatcher loop, so it could close
channels that were currently being written to by methods called from said
dispatcher loop. This leads to a crash. Instead, Close() now writes a
closeRequest, which is handled in the dispatcher.
This commit is contained in:
Julius Volz 2013-07-19 13:29:52 +02:00
parent f362b04f61
commit 648a79a3e1
1 changed files with 23 additions and 20 deletions

View File

@ -110,7 +110,7 @@ type Aggregator struct {
getAggregatesRequests chan *getAggregatesRequest
removeAggregateRequests chan EventFingerprint
rulesRequests chan *aggregatorResetRulesRequest
closed chan bool
closeRequests chan *closeRequest
}
func NewAggregator() *Aggregator {
@ -121,18 +121,24 @@ func NewAggregator() *Aggregator {
getAggregatesRequests: make(chan *getAggregatesRequest),
removeAggregateRequests: make(chan EventFingerprint),
rulesRequests: make(chan *aggregatorResetRulesRequest),
closed: make(chan bool),
closeRequests: make(chan *closeRequest),
}
}
func (a *Aggregator) Close() {
req := &closeRequest{
done: make(chan bool),
}
a.closeRequests <- req
<-req.done
}
func (a *Aggregator) closeInternal() {
close(a.rulesRequests)
close(a.aggRequests)
close(a.getAggregatesRequests)
close(a.removeAggregateRequests)
<-a.closed
close(a.closed)
close(a.closeRequests)
}
type aggregateEventsResponse struct {
@ -153,6 +159,10 @@ type getAggregatesRequest struct {
Response chan getAggregatesResponse
}
type closeRequest struct {
done chan bool
}
func (a *Aggregator) aggregate(req *aggregateEventsRequest, s SummaryReceiver) {
if len(a.Rules) == 0 {
req.Response <- &aggregateEventsResponse{
@ -259,24 +269,14 @@ func (a *Aggregator) Dispatch(s SummaryReceiver) {
t := time.NewTicker(time.Second)
defer t.Stop()
closed := 0
for closed < 2 {
for {
select {
case req, open := <-a.aggRequests:
case req := <-a.aggRequests:
a.aggregate(req, s)
if !open {
closed++
}
case rules, open := <-a.rulesRequests:
case rules := <-a.rulesRequests:
a.replaceRules(rules)
if !open {
closed++
}
case req := <-a.getAggregatesRequests:
aggs := a.aggregates()
req.Response <- getAggregatesResponse{
@ -288,8 +288,11 @@ func (a *Aggregator) Dispatch(s SummaryReceiver) {
log.Println("Deleting expired aggregation instance", a)
a.Aggregates[fp].Close()
delete(a.Aggregates, fp)
case req := <-a.closeRequests:
a.closeInternal()
req.done <- true
return
}
}
a.closed <- true
}