diff --git a/aggregation.go b/aggregator.go similarity index 59% rename from aggregation.go rename to aggregator.go index f7eb61ec..fb630371 100644 --- a/aggregation.go +++ b/aggregator.go @@ -14,9 +14,7 @@ package main import ( - "container/heap" "log" - "sort" "time" ) @@ -27,33 +25,17 @@ const ( aggEmitting ) +// AggregationRule creates and manages the scope for received events. type AggregationRule struct { - Filters *Filters + Filters Filters - // BUG(matt): Unsupported. RepeatRate time.Duration - - fingerprint uint64 -} - -func NewAggregationRule(filters ...*Filter) *AggregationRule { - f := new(Filters) - heap.Init(f) - for _, filter := range filters { - heap.Push(f, filter) - } - - return &AggregationRule{ - Filters: f, - fingerprint: f.fingerprint(), - } } type AggregationInstance struct { Rule *AggregationRule Events Events - // BUG(matt): Unsupported. EndsAt time.Time state aggregationState @@ -91,7 +73,7 @@ func (r *AggregationInstance) Tidy() { r.Events = events } -func (r *AggregationInstance) Summarize(s chan<- EventSummary) { +func (r *AggregationInstance) Summarize(s SummaryReceiver) { if r.state != aggIdle { return } @@ -101,48 +83,53 @@ func (r *AggregationInstance) Summarize(s chan<- EventSummary) { r.state = aggEmitting - s <- EventSummary{ + s.Receive(&EventSummary{ Rule: r.Rule, Events: r.Events, - } + }) + } type AggregationRules []*AggregationRule -func (r AggregationRules) Len() int { - return len(r) -} - -func (r AggregationRules) Less(i, j int) bool { - return r[i].fingerprint < r[j].fingerprint -} - -func (r AggregationRules) Swap(i, j int) { - r[i], r[j] = r[i], r[j] -} - type Aggregator struct { Rules AggregationRules Aggregates map[uint64]*AggregationInstance + + aggRequests chan *aggregateEventsRequest + rulesRequests chan *aggregatorResetRulesRequest + closed chan bool } func NewAggregator() *Aggregator { return &Aggregator{ Aggregates: make(map[uint64]*AggregationInstance), + + aggRequests: make(chan *aggregateEventsRequest), + rulesRequests: make(chan *aggregatorResetRulesRequest), + closed: make(chan bool), } } -type AggregateEventsResponse struct { +func (a *Aggregator) Close() { + close(a.rulesRequests) + close(a.aggRequests) + + <-a.closed + close(a.closed) +} + +type aggregateEventsResponse struct { Err error } -type AggregateEventsRequest struct { +type aggregateEventsRequest struct { Events Events - Response chan *AggregateEventsResponse + Response chan *aggregateEventsResponse } -func (a *Aggregator) aggregate(r *AggregateEventsRequest, s chan<- EventSummary) { +func (a *Aggregator) aggregate(r *aggregateEventsRequest, s SummaryReceiver) { log.Println("aggregating", *r) for _, element := range r.Events { fp := element.Fingerprint() @@ -165,47 +152,70 @@ func (a *Aggregator) aggregate(r *AggregateEventsRequest, s chan<- EventSummary) } } - r.Response <- new(AggregateEventsResponse) + r.Response <- new(aggregateEventsResponse) + close(r.Response) } -type AggregatorResetRulesResponse struct { - Err error -} -type AggregatorResetRulesRequest struct { +type aggregatorResetRulesResponse struct{} + +type aggregatorResetRulesRequest struct { Rules AggregationRules - Response chan *AggregatorResetRulesResponse + Response chan *aggregatorResetRulesResponse } -func (a *Aggregator) replaceRules(r *AggregatorResetRulesRequest) { - newRules := AggregationRules{} - for _, rule := range r.Rules { - newRules = append(newRules, rule) - } - - sort.Sort(newRules) - +func (a *Aggregator) replaceRules(r *aggregatorResetRulesRequest) { + log.Println("Replacing", len(r.Rules), "aggregator rules...") + newRules := make(AggregationRules, len(r.Rules)) + copy(newRules, r.Rules) a.Rules = newRules - r.Response <- new(AggregatorResetRulesResponse) + r.Response <- new(aggregatorResetRulesResponse) + close(r.Response) } -func (a *Aggregator) Dispatch(reqs <-chan *AggregateEventsRequest, rules <-chan *AggregatorResetRulesRequest, s chan<- EventSummary) { +func (a *Aggregator) Receive(e Events) error { + req := &aggregateEventsRequest{ + Events: e, + Response: make(chan *aggregateEventsResponse), + } + + a.aggRequests <- req + + result := <-req.Response + + return result.Err +} + +func (a *Aggregator) SetRules(r AggregationRules) error { + req := &aggregatorResetRulesRequest{ + Rules: r, + Response: make(chan *aggregatorResetRulesResponse), + } + + a.rulesRequests <- req + + _ = <-req.Response + + return nil +} + +func (a *Aggregator) Dispatch(s SummaryReceiver) { t := time.NewTicker(time.Second) defer t.Stop() closed := 0 - for closed < 1 { + for closed < 2 { select { - case req, open := <-reqs: + case req, open := <-a.aggRequests: a.aggregate(req, s) if !open { closed++ } - case rules, open := <-rules: + case rules, open := <-a.rulesRequests: a.replaceRules(rules) if !open { @@ -218,4 +228,6 @@ func (a *Aggregator) Dispatch(reqs <-chan *AggregateEventsRequest, rules <-chan } } } + + a.closed <- true } diff --git a/dispatcher.go b/dispatcher.go index babc3064..63ceedad 100644 --- a/dispatcher.go +++ b/dispatcher.go @@ -14,7 +14,6 @@ package main import ( - "log" "strings" ) @@ -40,23 +39,97 @@ type EventSummary struct { Destination string } -type EventSummaries []EventSummary +type SummaryDispatcher struct { + summaryReqs chan *summaryDispatchRequest -type SummaryDispatcher struct{} - -func (d *SummaryDispatcher) dispatchSummary(s EventSummary, i chan<- *IsInhibitedRequest) { - log.Println("dispatching summary", s) - r := &IsInhibitedRequest{ - Response: make(chan IsInhibitedResponse), - } - i <- r - resp := <-r.Response - log.Println(resp) + closed chan bool } -func (d *SummaryDispatcher) Dispatch(s <-chan EventSummary, i chan<- *IsInhibitedRequest) { - for summary := range s { - d.dispatchSummary(summary, i) - // fmt.Println("Summary for", summary.Rule, "with", summary.Events, "@", len(summary.Events)) +type summaryDispatchRequest struct { + Summary *EventSummary + + Response chan *summaryDispatchResponse +} + +type Disposition int + +const ( + UNHANDLED Disposition = iota + DISPATCHED + SUPPRESSED +) + +type summaryDispatchResponse struct { + Disposition Disposition + Err RemoteError +} + +func (s *SummaryDispatcher) Close() { + close(s.summaryReqs) + <-s.closed +} + +func NewSummaryDispatcher() *SummaryDispatcher { + return &SummaryDispatcher{ + summaryReqs: make(chan *summaryDispatchRequest), + closed: make(chan bool), } } + +type RemoteError interface { + error + + Retryable() bool +} + +type remoteError struct { + error + + retryable bool +} + +func (e *remoteError) Retryable() bool { + return e.retryable +} + +func NewRemoteError(err error, retryable bool) RemoteError { + return &remoteError{ + err, + retryable, + } +} + +type SummaryReceiver interface { + Receive(*EventSummary) RemoteError +} + +func (d *SummaryDispatcher) Receive(s *EventSummary) RemoteError { + req := &summaryDispatchRequest{ + Summary: s, + Response: make(chan *summaryDispatchResponse), + } + + d.summaryReqs <- req + resp := <-req.Response + + return resp.Err +} + +func (d *SummaryDispatcher) dispatchSummary(r *summaryDispatchRequest, i IsInhibitedInterrogator) { + if i.IsInhibited(r.Summary.Events[0]) { + r.Response <- &summaryDispatchResponse{ + Disposition: SUPPRESSED, + } + return + } + + // BUG: Perform sending of summaries. +} + +func (d *SummaryDispatcher) Dispatch(i IsInhibitedInterrogator) { + for req := range d.summaryReqs { + d.dispatchSummary(req, i) + } + + d.closed <- true +} diff --git a/main.go b/main.go index 8d0ffad2..fd1ba7af 100644 --- a/main.go +++ b/main.go @@ -17,81 +17,45 @@ import ( "log" ) -type Main struct { - SuppressionRequests chan SuppressionRequest - InhibitQueries chan *IsInhibitedRequest - Summaries chan SuppressionSummaryRequest - AggregateEvents chan *AggregateEventsRequest - EventSummary chan EventSummary - Rules chan *AggregatorResetRulesRequest -} - -func (m *Main) close() { - close(m.SuppressionRequests) - close(m.InhibitQueries) - close(m.Summaries) - close(m.AggregateEvents) - close(m.EventSummary) - close(m.Rules) -} - func main() { - main := &Main{ - SuppressionRequests: make(chan SuppressionRequest), - InhibitQueries: make(chan *IsInhibitedRequest), - Summaries: make(chan SuppressionSummaryRequest), - AggregateEvents: make(chan *AggregateEventsRequest), - EventSummary: make(chan EventSummary), - Rules: make(chan *AggregatorResetRulesRequest), - } - defer main.close() - log.Print("Starting event suppressor...") - suppressor := &Suppressor{ - Suppressions: new(Suppressions), - } - go suppressor.Dispatch(main.SuppressionRequests, main.InhibitQueries, main.Summaries) + suppressor := NewSuppressor() + defer suppressor.Close() + go suppressor.Dispatch() log.Println("Done.") log.Println("Starting event aggregator...") aggregator := NewAggregator() - go aggregator.Dispatch(main.AggregateEvents, main.Rules, main.EventSummary) + defer aggregator.Close() + + summarizer := new(SummaryDispatcher) + go aggregator.Dispatch(summarizer) log.Println("Done.") done := make(chan bool) go func() { - ar := make(chan *AggregatorResetRulesResponse) - agg := &AggregatorResetRulesRequest{ - Rules: AggregationRules{ - NewAggregationRule(NewFilter("service", "discovery")), + rules := AggregationRules{ + &AggregationRule{ + Filters: Filters{NewFilter("service", "discovery")}, }, - Response: ar, } - main.Rules <- agg - log.Println("aggResult", <-ar) + aggregator.SetRules(rules) - r := make(chan *AggregateEventsResponse) - aer := &AggregateEventsRequest{ - Events: Events{ - &Event{ - Payload: map[string]string{ - "service": "discovery", - }, + events := Events{ + &Event{ + Payload: map[string]string{ + "service": "discovery", }, }, - Response: r, } - main.AggregateEvents <- aer - - log.Println("Response", r) + aggregator.Receive(events) done <- true }() <-done log.Println("Running summary dispatcher...") - summarizer := new(SummaryDispatcher) - summarizer.Dispatch(main.EventSummary, main.InhibitQueries) + summarizer.Dispatch(suppressor) } diff --git a/suppression.go b/suppressor.go similarity index 60% rename from suppression.go rename to suppressor.go index efe70d83..5bd5140b 100644 --- a/suppression.go +++ b/suppressor.go @@ -33,51 +33,64 @@ type Suppression struct { CreatedAt time.Time } -type SuppressionRequest struct { +type suppressionRequest struct { Suppression Suppression - Response chan SuppressionResponse + Response chan *suppressionResponse } -type SuppressionResponse struct { +type suppressionResponse struct { Err error } -type IsInhibitedRequest struct { - Event Event +type isInhibitedRequest struct { + Event *Event - Response chan IsInhibitedResponse + Response chan *isInhibitedResponse } -type IsInhibitedResponse struct { +type isInhibitedResponse struct { Err error Inhibited bool InhibitingSuppression *Suppression } -type SuppressionSummaryResponse struct { +type suppressionSummaryResponse struct { Err error Suppressions Suppressions } -type SuppressionSummaryRequest struct { +type suppressionSummaryRequest struct { MatchCandidates map[string]string - Response chan<- SuppressionSummaryResponse + Response chan *suppressionSummaryResponse } type Suppressor struct { Suppressions *Suppressions + + suppressionReqs chan *suppressionRequest + suppressionSummaryReqs chan *suppressionSummaryRequest + isInhibitedReqs chan *isInhibitedRequest +} + +type IsInhibitedInterrogator interface { + IsInhibited(*Event) bool } func NewSuppressor() *Suppressor { suppressions := new(Suppressions) + heap.Init(suppressions) return &Suppressor{ Suppressions: suppressions, + + suppressionReqs: make(chan *suppressionRequest), + suppressionSummaryReqs: make(chan *suppressionSummaryRequest), + isInhibitedReqs: make(chan *isInhibitedRequest), } } @@ -107,11 +120,12 @@ func (s *Suppressions) Pop() interface{} { return item } -func (s *Suppressor) dispatchSuppression(r SuppressionRequest) { +func (s *Suppressor) dispatchSuppression(r *suppressionRequest) { log.Println("dispatching suppression", r) heap.Push(s.Suppressions, r.Suppression) - r.Response <- SuppressionResponse{} + r.Response <- &suppressionResponse{} + close(r.Response) } func (s *Suppressor) reapSuppressions(t time.Time) { @@ -127,22 +141,36 @@ func (s *Suppressor) reapSuppressions(t time.Time) { heap.Init(s.Suppressions) } -func (s *Suppressor) generateSummary(r SuppressionSummaryRequest) { +func (s *Suppressor) generateSummary(r *suppressionSummaryRequest) { log.Println("Generating summary", r) - response := SuppressionSummaryResponse{} + response := new(suppressionSummaryResponse) for _, suppression := range *s.Suppressions { response.Suppressions = append(response.Suppressions, suppression) } r.Response <- response + close(r.Response) } -func (s *Suppressor) queryInhibit(q *IsInhibitedRequest) { - response := IsInhibitedResponse{} +func (s *Suppressor) IsInhibited(e *Event) bool { + req := &isInhibitedRequest{ + Event: e, + Response: make(chan *isInhibitedResponse), + } + + s.isInhibitedReqs <- req + + resp := <-req.Response + + return resp.Inhibited +} + +func (s *Suppressor) queryInhibit(q *isInhibitedRequest) { + response := new(isInhibitedResponse) for _, s := range *s.Suppressions { - if s.Filters.Handle(&q.Event) { + if s.Filters.Handle(q.Event) { response.Inhibited = true response.InhibitingSuppression = &s @@ -151,9 +179,18 @@ func (s *Suppressor) queryInhibit(q *IsInhibitedRequest) { } q.Response <- response + close(q.Response) } -func (s *Suppressor) Dispatch(suppressions <-chan SuppressionRequest, inhibitQuery <-chan *IsInhibitedRequest, summaries <-chan SuppressionSummaryRequest) { +func (s *Suppressor) Close() { + close(s.suppressionReqs) + close(s.suppressionSummaryReqs) + close(s.isInhibitedReqs) +} + +func (s *Suppressor) Dispatch() { + // BUG: Accomplish this more intelligently by creating a timer for the least- + // likely-to-tenure item. reaper := time.NewTicker(30 * time.Second) defer reaper.Stop() @@ -161,21 +198,21 @@ func (s *Suppressor) Dispatch(suppressions <-chan SuppressionRequest, inhibitQue for closed < 2 { select { - case suppression, open := <-suppressions: + case suppression, open := <-s.suppressionReqs: s.dispatchSuppression(suppression) if !open { closed++ } - case query, open := <-inhibitQuery: + case query, open := <-s.isInhibitedReqs: s.queryInhibit(query) if !open { closed++ } - case summary, open := <-summaries: + case summary, open := <-s.suppressionSummaryReqs: s.generateSummary(summary) if !open {