Change Aggregator from channel-based to mutex-based.

This removes >100 lines of boilerplate code in the Aggregator alone.
This commit is contained in:
Julius Volz 2013-07-26 02:05:52 +02:00
parent 00efa4a4a5
commit 0c3c75edb3
3 changed files with 44 additions and 160 deletions

View File

@ -28,13 +28,10 @@ func main() {
suppressor := manager.NewSuppressor()
defer suppressor.Close()
log.Println("Starting event aggregator...")
aggregator := manager.NewAggregator()
defer aggregator.Close()
summarizer := manager.NewSummaryDispatcher()
go aggregator.Dispatch(summarizer)
log.Println("Done.")
aggregator := manager.NewAggregator(summarizer)
defer aggregator.Close()
webService := &web.WebService{
AlertManagerService: &api.AlertManagerService{

View File

@ -16,6 +16,7 @@ package manager
import (
"errors"
"log"
"sync"
"time"
)
@ -84,9 +85,6 @@ func (r *AggregationInstance) SendNotification(s SummaryReceiver) {
}
func (r *AggregationInstance) resendNotificationAfter(d time.Duration, s SummaryReceiver) {
// BUG: we can't just call SendNotification whenever the timer ends without
// any synchronisation. The timer should instead feed into a channel which is
// served by the main Dispatch() loop.
r.notificationResendTimer = time.AfterFunc(d, func() {
r.SendNotification(s)
})
@ -104,83 +102,45 @@ func (r *AggregationInstance) Close() {
type AggregationRules []*AggregationRule
type Aggregator struct {
Rules AggregationRules
Aggregates map[EventFingerprint]*AggregationInstance
Rules AggregationRules
Aggregates map[EventFingerprint]*AggregationInstance
SummaryReceiver SummaryReceiver
aggRequests chan *aggregateEventsRequest
getAggregatesRequests chan *getAggregatesRequest
removeAggregateRequests chan EventFingerprint
rulesRequests chan *aggregatorResetRulesRequest
closeRequests chan *closeRequest
// Mutex to protect the above.
mu sync.Mutex
}
func NewAggregator() *Aggregator {
func NewAggregator(s SummaryReceiver) *Aggregator {
return &Aggregator{
Aggregates: make(map[EventFingerprint]*AggregationInstance),
aggRequests: make(chan *aggregateEventsRequest),
getAggregatesRequests: make(chan *getAggregatesRequest),
removeAggregateRequests: make(chan EventFingerprint),
rulesRequests: make(chan *aggregatorResetRulesRequest),
closeRequests: make(chan *closeRequest),
Aggregates: make(map[EventFingerprint]*AggregationInstance),
SummaryReceiver: s,
}
}
func (a *Aggregator) Close() {
req := &closeRequest{
done: make(chan bool),
a.mu.Lock()
defer a.mu.Unlock()
for _, agg := range a.Aggregates {
agg.Close()
}
a.closeRequests <- req
<-req.done
}
func (a *Aggregator) closeInternal() {
close(a.rulesRequests)
close(a.aggRequests)
close(a.getAggregatesRequests)
close(a.removeAggregateRequests)
close(a.closeRequests)
}
func (a *Aggregator) Receive(events Events) error {
a.mu.Lock()
defer a.mu.Unlock()
type aggregateEventsResponse struct {
Err error
}
type aggregateEventsRequest struct {
Events Events
Response chan *aggregateEventsResponse
}
type getAggregatesResponse struct {
Aggregates AggregationInstances
}
type getAggregatesRequest struct {
Response chan getAggregatesResponse
}
type closeRequest struct {
done chan bool
}
func (a *Aggregator) aggregate(req *aggregateEventsRequest, s SummaryReceiver) {
if len(a.Rules) == 0 {
req.Response <- &aggregateEventsResponse{
Err: errors.New("No aggregation rules"),
}
close(req.Response)
return
return errors.New("No aggregation rules")
}
log.Println("aggregating", *req)
for _, event := range req.Events {
for _, e := range events {
for _, r := range a.Rules {
if r.Handles(event) {
fp := event.Fingerprint()
if r.Handles(e) {
fp := e.Fingerprint()
aggregation, ok := a.Aggregates[fp]
if !ok {
expTimer := time.AfterFunc(minimumRefreshPeriod, func() {
a.removeAggregateRequests <- fp
a.removeAggregate(fp)
})
aggregation = &AggregationInstance{
@ -192,53 +152,34 @@ func (a *Aggregator) aggregate(req *aggregateEventsRequest, s SummaryReceiver) {
a.Aggregates[fp] = aggregation
}
aggregation.Ingest(event)
aggregation.SendNotification(s)
aggregation.Ingest(e)
aggregation.SendNotification(a.SummaryReceiver)
break
}
}
}
req.Response <- new(aggregateEventsResponse)
close(req.Response)
return nil
}
type aggregatorResetRulesResponse struct{}
func (a *Aggregator) SetRules(rules AggregationRules) {
a.mu.Lock()
defer a.mu.Unlock()
type aggregatorResetRulesRequest struct {
Rules AggregationRules
log.Println("Replacing", len(rules), "aggregator rules...")
Response chan *aggregatorResetRulesResponse
}
func (a *Aggregator) replaceRules(r *aggregatorResetRulesRequest) {
log.Println("Replacing", len(r.Rules), "aggregator rules...")
for _, rule := range r.Rules {
for _, rule := range rules {
if rule.RepeatRate < minimumRepeatRate {
log.Println("Rule repeat rate too low, setting to minimum value")
rule.RepeatRate = minimumRepeatRate
}
}
a.Rules = r.Rules
r.Response <- new(aggregatorResetRulesResponse)
close(r.Response)
a.Rules = rules
}
func (a *Aggregator) AlertAggregates() AggregationInstances {
req := &getAggregatesRequest{
Response: make(chan getAggregatesResponse),
}
a.mu.Lock()
defer a.mu.Unlock()
a.getAggregatesRequests <- req
result := <-req.Response
return result.Aggregates
}
func (a *Aggregator) aggregates() AggregationInstances {
aggs := make(AggregationInstances, 0, len(a.Aggregates))
for _, agg := range a.Aggregates {
aggs = append(aggs, agg)
@ -246,58 +187,11 @@ func (a *Aggregator) aggregates() AggregationInstances {
return aggs
}
func (a *Aggregator) Receive(e Events) error {
req := &aggregateEventsRequest{
Events: e,
Response: make(chan *aggregateEventsResponse),
}
func (a *Aggregator) removeAggregate(fp EventFingerprint) {
a.mu.Lock()
defer a.mu.Unlock()
a.aggRequests <- req
result := <-req.Response
return result.Err
}
func (a *Aggregator) SetRules(r AggregationRules) error {
req := &aggregatorResetRulesRequest{
Rules: r,
Response: make(chan *aggregatorResetRulesResponse),
}
a.rulesRequests <- req
_ = <-req.Response
return nil
}
func (a *Aggregator) Dispatch(s SummaryReceiver) {
for {
select {
case req := <-a.aggRequests:
a.aggregate(req, s)
case rules := <-a.rulesRequests:
a.replaceRules(rules)
case req := <-a.getAggregatesRequests:
aggs := a.aggregates()
req.Response <- getAggregatesResponse{
Aggregates: aggs,
}
close(req.Response)
case fp := <-a.removeAggregateRequests:
log.Println("Deleting expired aggregation instance", a)
a.Aggregates[fp].Close()
delete(a.Aggregates, fp)
case req := <-a.closeRequests:
a.closeInternal()
req.done <- true
// BUG: Simply returning here will prevent proper draining. Fix this.
return
}
}
log.Println("Deleting expired aggregation instance", a)
a.Aggregates[fp].Close()
delete(a.Aggregates, fp)
}

View File

@ -30,15 +30,8 @@ type testAggregatorScenario struct {
}
func (s *testAggregatorScenario) test(i int, t *testing.T) {
a := NewAggregator()
go a.Dispatch(&dummyReceiver{})
done := make(chan bool)
go func() {
a.SetRules(s.rules)
done <- true
}()
<-done
a := NewAggregator(&dummyReceiver{})
a.SetRules(s.rules)
if len(s.inMatch) > 0 {
err := a.Receive(s.inMatch)