Initial interface cleanups.

This commit is contained in:
Matt T. Proud 2013-07-16 23:17:42 +02:00
parent ebbef79014
commit 48d0ba2b9b
4 changed files with 234 additions and 148 deletions

View File

@ -14,9 +14,7 @@
package main
import (
"container/heap"
"log"
"sort"
"time"
)
@ -27,33 +25,17 @@ const (
aggEmitting
)
// AggregationRule creates and manages the scope for received events.
type AggregationRule struct {
Filters *Filters
Filters Filters
// BUG(matt): Unsupported.
RepeatRate time.Duration
fingerprint uint64
}
func NewAggregationRule(filters ...*Filter) *AggregationRule {
f := new(Filters)
heap.Init(f)
for _, filter := range filters {
heap.Push(f, filter)
}
return &AggregationRule{
Filters: f,
fingerprint: f.fingerprint(),
}
}
type AggregationInstance struct {
Rule *AggregationRule
Events Events
// BUG(matt): Unsupported.
EndsAt time.Time
state aggregationState
@ -91,7 +73,7 @@ func (r *AggregationInstance) Tidy() {
r.Events = events
}
func (r *AggregationInstance) Summarize(s chan<- EventSummary) {
func (r *AggregationInstance) Summarize(s SummaryReceiver) {
if r.state != aggIdle {
return
}
@ -101,48 +83,53 @@ func (r *AggregationInstance) Summarize(s chan<- EventSummary) {
r.state = aggEmitting
s <- EventSummary{
s.Receive(&EventSummary{
Rule: r.Rule,
Events: r.Events,
}
})
}
type AggregationRules []*AggregationRule
func (r AggregationRules) Len() int {
return len(r)
}
func (r AggregationRules) Less(i, j int) bool {
return r[i].fingerprint < r[j].fingerprint
}
func (r AggregationRules) Swap(i, j int) {
r[i], r[j] = r[i], r[j]
}
type Aggregator struct {
Rules AggregationRules
Aggregates map[uint64]*AggregationInstance
aggRequests chan *aggregateEventsRequest
rulesRequests chan *aggregatorResetRulesRequest
closed chan bool
}
func NewAggregator() *Aggregator {
return &Aggregator{
Aggregates: make(map[uint64]*AggregationInstance),
aggRequests: make(chan *aggregateEventsRequest),
rulesRequests: make(chan *aggregatorResetRulesRequest),
closed: make(chan bool),
}
}
type AggregateEventsResponse struct {
func (a *Aggregator) Close() {
close(a.rulesRequests)
close(a.aggRequests)
<-a.closed
close(a.closed)
}
type aggregateEventsResponse struct {
Err error
}
type AggregateEventsRequest struct {
type aggregateEventsRequest struct {
Events Events
Response chan *AggregateEventsResponse
Response chan *aggregateEventsResponse
}
func (a *Aggregator) aggregate(r *AggregateEventsRequest, s chan<- EventSummary) {
func (a *Aggregator) aggregate(r *aggregateEventsRequest, s SummaryReceiver) {
log.Println("aggregating", *r)
for _, element := range r.Events {
fp := element.Fingerprint()
@ -165,47 +152,70 @@ func (a *Aggregator) aggregate(r *AggregateEventsRequest, s chan<- EventSummary)
}
}
r.Response <- new(AggregateEventsResponse)
r.Response <- new(aggregateEventsResponse)
close(r.Response)
}
type AggregatorResetRulesResponse struct {
Err error
}
type AggregatorResetRulesRequest struct {
type aggregatorResetRulesResponse struct{}
type aggregatorResetRulesRequest struct {
Rules AggregationRules
Response chan *AggregatorResetRulesResponse
Response chan *aggregatorResetRulesResponse
}
func (a *Aggregator) replaceRules(r *AggregatorResetRulesRequest) {
newRules := AggregationRules{}
for _, rule := range r.Rules {
newRules = append(newRules, rule)
}
sort.Sort(newRules)
func (a *Aggregator) replaceRules(r *aggregatorResetRulesRequest) {
log.Println("Replacing", len(r.Rules), "aggregator rules...")
newRules := make(AggregationRules, len(r.Rules))
copy(newRules, r.Rules)
a.Rules = newRules
r.Response <- new(AggregatorResetRulesResponse)
r.Response <- new(aggregatorResetRulesResponse)
close(r.Response)
}
func (a *Aggregator) Dispatch(reqs <-chan *AggregateEventsRequest, rules <-chan *AggregatorResetRulesRequest, s chan<- EventSummary) {
func (a *Aggregator) Receive(e Events) error {
req := &aggregateEventsRequest{
Events: e,
Response: make(chan *aggregateEventsResponse),
}
a.aggRequests <- req
result := <-req.Response
return result.Err
}
func (a *Aggregator) SetRules(r AggregationRules) error {
req := &aggregatorResetRulesRequest{
Rules: r,
Response: make(chan *aggregatorResetRulesResponse),
}
a.rulesRequests <- req
_ = <-req.Response
return nil
}
func (a *Aggregator) Dispatch(s SummaryReceiver) {
t := time.NewTicker(time.Second)
defer t.Stop()
closed := 0
for closed < 1 {
for closed < 2 {
select {
case req, open := <-reqs:
case req, open := <-a.aggRequests:
a.aggregate(req, s)
if !open {
closed++
}
case rules, open := <-rules:
case rules, open := <-a.rulesRequests:
a.replaceRules(rules)
if !open {
@ -218,4 +228,6 @@ func (a *Aggregator) Dispatch(reqs <-chan *AggregateEventsRequest, rules <-chan
}
}
}
a.closed <- true
}

View File

@ -14,7 +14,6 @@
package main
import (
"log"
"strings"
)
@ -40,23 +39,97 @@ type EventSummary struct {
Destination string
}
type EventSummaries []EventSummary
type SummaryDispatcher struct {
summaryReqs chan *summaryDispatchRequest
type SummaryDispatcher struct{}
func (d *SummaryDispatcher) dispatchSummary(s EventSummary, i chan<- *IsInhibitedRequest) {
log.Println("dispatching summary", s)
r := &IsInhibitedRequest{
Response: make(chan IsInhibitedResponse),
}
i <- r
resp := <-r.Response
log.Println(resp)
closed chan bool
}
func (d *SummaryDispatcher) Dispatch(s <-chan EventSummary, i chan<- *IsInhibitedRequest) {
for summary := range s {
d.dispatchSummary(summary, i)
// fmt.Println("Summary for", summary.Rule, "with", summary.Events, "@", len(summary.Events))
type summaryDispatchRequest struct {
Summary *EventSummary
Response chan *summaryDispatchResponse
}
type Disposition int
const (
UNHANDLED Disposition = iota
DISPATCHED
SUPPRESSED
)
type summaryDispatchResponse struct {
Disposition Disposition
Err RemoteError
}
func (s *SummaryDispatcher) Close() {
close(s.summaryReqs)
<-s.closed
}
func NewSummaryDispatcher() *SummaryDispatcher {
return &SummaryDispatcher{
summaryReqs: make(chan *summaryDispatchRequest),
closed: make(chan bool),
}
}
type RemoteError interface {
error
Retryable() bool
}
type remoteError struct {
error
retryable bool
}
func (e *remoteError) Retryable() bool {
return e.retryable
}
func NewRemoteError(err error, retryable bool) RemoteError {
return &remoteError{
err,
retryable,
}
}
type SummaryReceiver interface {
Receive(*EventSummary) RemoteError
}
func (d *SummaryDispatcher) Receive(s *EventSummary) RemoteError {
req := &summaryDispatchRequest{
Summary: s,
Response: make(chan *summaryDispatchResponse),
}
d.summaryReqs <- req
resp := <-req.Response
return resp.Err
}
func (d *SummaryDispatcher) dispatchSummary(r *summaryDispatchRequest, i IsInhibitedInterrogator) {
if i.IsInhibited(r.Summary.Events[0]) {
r.Response <- &summaryDispatchResponse{
Disposition: SUPPRESSED,
}
return
}
// BUG: Perform sending of summaries.
}
func (d *SummaryDispatcher) Dispatch(i IsInhibitedInterrogator) {
for req := range d.summaryReqs {
d.dispatchSummary(req, i)
}
d.closed <- true
}

70
main.go
View File

@ -17,81 +17,45 @@ import (
"log"
)
type Main struct {
SuppressionRequests chan SuppressionRequest
InhibitQueries chan *IsInhibitedRequest
Summaries chan SuppressionSummaryRequest
AggregateEvents chan *AggregateEventsRequest
EventSummary chan EventSummary
Rules chan *AggregatorResetRulesRequest
}
func (m *Main) close() {
close(m.SuppressionRequests)
close(m.InhibitQueries)
close(m.Summaries)
close(m.AggregateEvents)
close(m.EventSummary)
close(m.Rules)
}
func main() {
main := &Main{
SuppressionRequests: make(chan SuppressionRequest),
InhibitQueries: make(chan *IsInhibitedRequest),
Summaries: make(chan SuppressionSummaryRequest),
AggregateEvents: make(chan *AggregateEventsRequest),
EventSummary: make(chan EventSummary),
Rules: make(chan *AggregatorResetRulesRequest),
}
defer main.close()
log.Print("Starting event suppressor...")
suppressor := &Suppressor{
Suppressions: new(Suppressions),
}
go suppressor.Dispatch(main.SuppressionRequests, main.InhibitQueries, main.Summaries)
suppressor := NewSuppressor()
defer suppressor.Close()
go suppressor.Dispatch()
log.Println("Done.")
log.Println("Starting event aggregator...")
aggregator := NewAggregator()
go aggregator.Dispatch(main.AggregateEvents, main.Rules, main.EventSummary)
defer aggregator.Close()
summarizer := new(SummaryDispatcher)
go aggregator.Dispatch(summarizer)
log.Println("Done.")
done := make(chan bool)
go func() {
ar := make(chan *AggregatorResetRulesResponse)
agg := &AggregatorResetRulesRequest{
Rules: AggregationRules{
NewAggregationRule(NewFilter("service", "discovery")),
rules := AggregationRules{
&AggregationRule{
Filters: Filters{NewFilter("service", "discovery")},
},
Response: ar,
}
main.Rules <- agg
log.Println("aggResult", <-ar)
aggregator.SetRules(rules)
r := make(chan *AggregateEventsResponse)
aer := &AggregateEventsRequest{
Events: Events{
&Event{
Payload: map[string]string{
"service": "discovery",
},
events := Events{
&Event{
Payload: map[string]string{
"service": "discovery",
},
},
Response: r,
}
main.AggregateEvents <- aer
log.Println("Response", r)
aggregator.Receive(events)
done <- true
}()
<-done
log.Println("Running summary dispatcher...")
summarizer := new(SummaryDispatcher)
summarizer.Dispatch(main.EventSummary, main.InhibitQueries)
summarizer.Dispatch(suppressor)
}

View File

@ -33,51 +33,64 @@ type Suppression struct {
CreatedAt time.Time
}
type SuppressionRequest struct {
type suppressionRequest struct {
Suppression Suppression
Response chan SuppressionResponse
Response chan *suppressionResponse
}
type SuppressionResponse struct {
type suppressionResponse struct {
Err error
}
type IsInhibitedRequest struct {
Event Event
type isInhibitedRequest struct {
Event *Event
Response chan IsInhibitedResponse
Response chan *isInhibitedResponse
}
type IsInhibitedResponse struct {
type isInhibitedResponse struct {
Err error
Inhibited bool
InhibitingSuppression *Suppression
}
type SuppressionSummaryResponse struct {
type suppressionSummaryResponse struct {
Err error
Suppressions Suppressions
}
type SuppressionSummaryRequest struct {
type suppressionSummaryRequest struct {
MatchCandidates map[string]string
Response chan<- SuppressionSummaryResponse
Response chan *suppressionSummaryResponse
}
type Suppressor struct {
Suppressions *Suppressions
suppressionReqs chan *suppressionRequest
suppressionSummaryReqs chan *suppressionSummaryRequest
isInhibitedReqs chan *isInhibitedRequest
}
type IsInhibitedInterrogator interface {
IsInhibited(*Event) bool
}
func NewSuppressor() *Suppressor {
suppressions := new(Suppressions)
heap.Init(suppressions)
return &Suppressor{
Suppressions: suppressions,
suppressionReqs: make(chan *suppressionRequest),
suppressionSummaryReqs: make(chan *suppressionSummaryRequest),
isInhibitedReqs: make(chan *isInhibitedRequest),
}
}
@ -107,11 +120,12 @@ func (s *Suppressions) Pop() interface{} {
return item
}
func (s *Suppressor) dispatchSuppression(r SuppressionRequest) {
func (s *Suppressor) dispatchSuppression(r *suppressionRequest) {
log.Println("dispatching suppression", r)
heap.Push(s.Suppressions, r.Suppression)
r.Response <- SuppressionResponse{}
r.Response <- &suppressionResponse{}
close(r.Response)
}
func (s *Suppressor) reapSuppressions(t time.Time) {
@ -127,22 +141,36 @@ func (s *Suppressor) reapSuppressions(t time.Time) {
heap.Init(s.Suppressions)
}
func (s *Suppressor) generateSummary(r SuppressionSummaryRequest) {
func (s *Suppressor) generateSummary(r *suppressionSummaryRequest) {
log.Println("Generating summary", r)
response := SuppressionSummaryResponse{}
response := new(suppressionSummaryResponse)
for _, suppression := range *s.Suppressions {
response.Suppressions = append(response.Suppressions, suppression)
}
r.Response <- response
close(r.Response)
}
func (s *Suppressor) queryInhibit(q *IsInhibitedRequest) {
response := IsInhibitedResponse{}
func (s *Suppressor) IsInhibited(e *Event) bool {
req := &isInhibitedRequest{
Event: e,
Response: make(chan *isInhibitedResponse),
}
s.isInhibitedReqs <- req
resp := <-req.Response
return resp.Inhibited
}
func (s *Suppressor) queryInhibit(q *isInhibitedRequest) {
response := new(isInhibitedResponse)
for _, s := range *s.Suppressions {
if s.Filters.Handle(&q.Event) {
if s.Filters.Handle(q.Event) {
response.Inhibited = true
response.InhibitingSuppression = &s
@ -151,9 +179,18 @@ func (s *Suppressor) queryInhibit(q *IsInhibitedRequest) {
}
q.Response <- response
close(q.Response)
}
func (s *Suppressor) Dispatch(suppressions <-chan SuppressionRequest, inhibitQuery <-chan *IsInhibitedRequest, summaries <-chan SuppressionSummaryRequest) {
func (s *Suppressor) Close() {
close(s.suppressionReqs)
close(s.suppressionSummaryReqs)
close(s.isInhibitedReqs)
}
func (s *Suppressor) Dispatch() {
// BUG: Accomplish this more intelligently by creating a timer for the least-
// likely-to-tenure item.
reaper := time.NewTicker(30 * time.Second)
defer reaper.Stop()
@ -161,21 +198,21 @@ func (s *Suppressor) Dispatch(suppressions <-chan SuppressionRequest, inhibitQue
for closed < 2 {
select {
case suppression, open := <-suppressions:
case suppression, open := <-s.suppressionReqs:
s.dispatchSuppression(suppression)
if !open {
closed++
}
case query, open := <-inhibitQuery:
case query, open := <-s.isInhibitedReqs:
s.queryInhibit(query)
if !open {
closed++
}
case summary, open := <-summaries:
case summary, open := <-s.suppressionSummaryReqs:
s.generateSummary(summary)
if !open {