From 0c3c75edb38e2899f83a13280189a514e2a96ad5 Mon Sep 17 00:00:00 2001 From: Julius Volz Date: Fri, 26 Jul 2013 02:05:52 +0200 Subject: [PATCH] Change Aggregator from channel-based to mutex-based. This removes >100 lines of boilerplate code in the Aggregator alone. --- main.go | 9 +- manager/aggregator.go | 184 ++++++++----------------------------- manager/aggregator_test.go | 11 +-- 3 files changed, 44 insertions(+), 160 deletions(-) diff --git a/main.go b/main.go index 15e75d65..f215446e 100644 --- a/main.go +++ b/main.go @@ -28,13 +28,10 @@ func main() { suppressor := manager.NewSuppressor() defer suppressor.Close() - log.Println("Starting event aggregator...") - aggregator := manager.NewAggregator() - defer aggregator.Close() - summarizer := manager.NewSummaryDispatcher() - go aggregator.Dispatch(summarizer) - log.Println("Done.") + + aggregator := manager.NewAggregator(summarizer) + defer aggregator.Close() webService := &web.WebService{ AlertManagerService: &api.AlertManagerService{ diff --git a/manager/aggregator.go b/manager/aggregator.go index 22dab706..a15428db 100644 --- a/manager/aggregator.go +++ b/manager/aggregator.go @@ -16,6 +16,7 @@ package manager import ( "errors" "log" + "sync" "time" ) @@ -84,9 +85,6 @@ func (r *AggregationInstance) SendNotification(s SummaryReceiver) { } func (r *AggregationInstance) resendNotificationAfter(d time.Duration, s SummaryReceiver) { - // BUG: we can't just call SendNotification whenever the timer ends without - // any synchronisation. The timer should instead feed into a channel which is - // served by the main Dispatch() loop. r.notificationResendTimer = time.AfterFunc(d, func() { r.SendNotification(s) }) @@ -104,83 +102,45 @@ func (r *AggregationInstance) Close() { type AggregationRules []*AggregationRule type Aggregator struct { - Rules AggregationRules - Aggregates map[EventFingerprint]*AggregationInstance + Rules AggregationRules + Aggregates map[EventFingerprint]*AggregationInstance + SummaryReceiver SummaryReceiver - aggRequests chan *aggregateEventsRequest - getAggregatesRequests chan *getAggregatesRequest - removeAggregateRequests chan EventFingerprint - rulesRequests chan *aggregatorResetRulesRequest - closeRequests chan *closeRequest + // Mutex to protect the above. + mu sync.Mutex } -func NewAggregator() *Aggregator { +func NewAggregator(s SummaryReceiver) *Aggregator { return &Aggregator{ - Aggregates: make(map[EventFingerprint]*AggregationInstance), - - aggRequests: make(chan *aggregateEventsRequest), - getAggregatesRequests: make(chan *getAggregatesRequest), - removeAggregateRequests: make(chan EventFingerprint), - rulesRequests: make(chan *aggregatorResetRulesRequest), - closeRequests: make(chan *closeRequest), + Aggregates: make(map[EventFingerprint]*AggregationInstance), + SummaryReceiver: s, } } func (a *Aggregator) Close() { - req := &closeRequest{ - done: make(chan bool), + a.mu.Lock() + defer a.mu.Unlock() + + for _, agg := range a.Aggregates { + agg.Close() } - a.closeRequests <- req - <-req.done } -func (a *Aggregator) closeInternal() { - close(a.rulesRequests) - close(a.aggRequests) - close(a.getAggregatesRequests) - close(a.removeAggregateRequests) - close(a.closeRequests) -} +func (a *Aggregator) Receive(events Events) error { + a.mu.Lock() + defer a.mu.Unlock() -type aggregateEventsResponse struct { - Err error -} - -type aggregateEventsRequest struct { - Events Events - - Response chan *aggregateEventsResponse -} - -type getAggregatesResponse struct { - Aggregates AggregationInstances -} - -type getAggregatesRequest struct { - Response chan getAggregatesResponse -} - -type closeRequest struct { - done chan bool -} - -func (a *Aggregator) aggregate(req *aggregateEventsRequest, s SummaryReceiver) { if len(a.Rules) == 0 { - req.Response <- &aggregateEventsResponse{ - Err: errors.New("No aggregation rules"), - } - close(req.Response) - return + return errors.New("No aggregation rules") } - log.Println("aggregating", *req) - for _, event := range req.Events { + for _, e := range events { for _, r := range a.Rules { - if r.Handles(event) { - fp := event.Fingerprint() + if r.Handles(e) { + fp := e.Fingerprint() aggregation, ok := a.Aggregates[fp] if !ok { expTimer := time.AfterFunc(minimumRefreshPeriod, func() { - a.removeAggregateRequests <- fp + a.removeAggregate(fp) }) aggregation = &AggregationInstance{ @@ -192,53 +152,34 @@ func (a *Aggregator) aggregate(req *aggregateEventsRequest, s SummaryReceiver) { a.Aggregates[fp] = aggregation } - aggregation.Ingest(event) - aggregation.SendNotification(s) + aggregation.Ingest(e) + aggregation.SendNotification(a.SummaryReceiver) break } } } - - req.Response <- new(aggregateEventsResponse) - close(req.Response) + return nil } -type aggregatorResetRulesResponse struct{} +func (a *Aggregator) SetRules(rules AggregationRules) { + a.mu.Lock() + defer a.mu.Unlock() -type aggregatorResetRulesRequest struct { - Rules AggregationRules + log.Println("Replacing", len(rules), "aggregator rules...") - Response chan *aggregatorResetRulesResponse -} - -func (a *Aggregator) replaceRules(r *aggregatorResetRulesRequest) { - log.Println("Replacing", len(r.Rules), "aggregator rules...") - - for _, rule := range r.Rules { + for _, rule := range rules { if rule.RepeatRate < minimumRepeatRate { log.Println("Rule repeat rate too low, setting to minimum value") rule.RepeatRate = minimumRepeatRate } } - a.Rules = r.Rules - - r.Response <- new(aggregatorResetRulesResponse) - close(r.Response) + a.Rules = rules } func (a *Aggregator) AlertAggregates() AggregationInstances { - req := &getAggregatesRequest{ - Response: make(chan getAggregatesResponse), - } + a.mu.Lock() + defer a.mu.Unlock() - a.getAggregatesRequests <- req - - result := <-req.Response - - return result.Aggregates -} - -func (a *Aggregator) aggregates() AggregationInstances { aggs := make(AggregationInstances, 0, len(a.Aggregates)) for _, agg := range a.Aggregates { aggs = append(aggs, agg) @@ -246,58 +187,11 @@ func (a *Aggregator) aggregates() AggregationInstances { return aggs } -func (a *Aggregator) Receive(e Events) error { - req := &aggregateEventsRequest{ - Events: e, - Response: make(chan *aggregateEventsResponse), - } +func (a *Aggregator) removeAggregate(fp EventFingerprint) { + a.mu.Lock() + defer a.mu.Unlock() - a.aggRequests <- req - - result := <-req.Response - - return result.Err -} - -func (a *Aggregator) SetRules(r AggregationRules) error { - req := &aggregatorResetRulesRequest{ - Rules: r, - Response: make(chan *aggregatorResetRulesResponse), - } - - a.rulesRequests <- req - - _ = <-req.Response - - return nil -} - -func (a *Aggregator) Dispatch(s SummaryReceiver) { - for { - select { - case req := <-a.aggRequests: - a.aggregate(req, s) - - case rules := <-a.rulesRequests: - a.replaceRules(rules) - - case req := <-a.getAggregatesRequests: - aggs := a.aggregates() - req.Response <- getAggregatesResponse{ - Aggregates: aggs, - } - close(req.Response) - - case fp := <-a.removeAggregateRequests: - log.Println("Deleting expired aggregation instance", a) - a.Aggregates[fp].Close() - delete(a.Aggregates, fp) - - case req := <-a.closeRequests: - a.closeInternal() - req.done <- true - // BUG: Simply returning here will prevent proper draining. Fix this. - return - } - } + log.Println("Deleting expired aggregation instance", a) + a.Aggregates[fp].Close() + delete(a.Aggregates, fp) } diff --git a/manager/aggregator_test.go b/manager/aggregator_test.go index fbf39a96..b13b9582 100644 --- a/manager/aggregator_test.go +++ b/manager/aggregator_test.go @@ -30,15 +30,8 @@ type testAggregatorScenario struct { } func (s *testAggregatorScenario) test(i int, t *testing.T) { - a := NewAggregator() - go a.Dispatch(&dummyReceiver{}) - - done := make(chan bool) - go func() { - a.SetRules(s.rules) - done <- true - }() - <-done + a := NewAggregator(&dummyReceiver{}) + a.SetRules(s.rules) if len(s.inMatch) > 0 { err := a.Receive(s.inMatch)