From cf783971072e8b8d5da8441148f5b81e63dc6abe Mon Sep 17 00:00:00 2001 From: Julius Volz Date: Thu, 18 Jul 2013 14:49:37 +0200 Subject: [PATCH] Change model to be more state- and less event-focussed. --- main.go | 22 ++++- manager/aggregator.go | 172 +++++++++++++++++++++++--------------- manager/dispatcher.go | 7 +- manager/event.go | 17 ++-- web/alerts.go | 14 +++- web/silences.go | 2 +- web/status.go | 2 +- web/templates/alerts.html | 27 +++--- web/web.go | 6 +- 9 files changed, 165 insertions(+), 104 deletions(-) diff --git a/main.go b/main.go index 70f4aef0..10d84789 100644 --- a/main.go +++ b/main.go @@ -32,7 +32,7 @@ func main() { aggregator := manager.NewAggregator() defer aggregator.Close() - summarizer := new(manager.SummaryDispatcher) + summarizer := manager.NewSummaryDispatcher() go aggregator.Dispatch(summarizer) log.Println("Done.") @@ -40,10 +40,28 @@ func main() { AlertManagerService: &api.AlertManagerService{ Aggregator: aggregator, }, - AlertsHandler: nil, + AlertsHandler: &web.AlertsHandler{ + Aggregator: aggregator, + }, } go webService.ServeForever() + // BEGIN EXAMPLE CODE - replace with config loading later. + done := make(chan bool) + go func() { + rules := manager.AggregationRules{ + &manager.AggregationRule{ + Filters: manager.Filters{manager.NewFilter("service", "discovery")}, + }, + } + + aggregator.SetRules(rules) + + done <- true + }() + <-done + // END EXAMPLE CODE + log.Println("Running summary dispatcher...") summarizer.Dispatch(suppressor) } diff --git a/manager/aggregator.go b/manager/aggregator.go index 2772ae34..481d7d24 100644 --- a/manager/aggregator.go +++ b/manager/aggregator.go @@ -19,11 +19,9 @@ import ( "time" ) -type aggDispatchState int - const ( - aggUnsent aggDispatchState = iota - aggSent + minimumRefreshPeriod = 5 * time.Minute + notificationRetryPeriod = 1 * time.Minute ) // AggregationRule creates and manages the scope for received events. @@ -33,13 +31,25 @@ type AggregationRule struct { RepeatRate time.Duration } +type AggregationInstances []*AggregationInstance + type AggregationInstance struct { - Rule *AggregationRule - Events Events + Rule *AggregationRule + Event *Event - EndsAt time.Time + // When was this AggregationInstance created? + Created time.Time + // When was the last refresh received into this AggregationInstance? + LastRefreshed time.Time - state aggDispatchState + // When was the last successful notification sent out for this + // AggregationInstance? + lastNotificationSent time.Time + // Timer used to trigger a notification retry/resend. + notificationResendTimer *time.Timer + // Timer used to trigger the deletion of the AggregationInstance after it + // hasn't been refreshed for too long. + expiryTimer *time.Timer } func (r *AggregationRule) Handles(e *Event) bool { @@ -47,90 +57,79 @@ func (r *AggregationRule) Handles(e *Event) bool { } func (r *AggregationInstance) Ingest(e *Event) { - r.Events = append(r.Events, e) -} + r.Event = e + r.LastRefreshed = time.Now() -func (r *AggregationInstance) Tidy() { - // BUG(matt): Drop this in favor of having the entire AggregationInstance - // being dropped when too old. - log.Println("Tidying...") - if len(r.Events) == 0 { - return - } - - events := Events{} - - t := time.Now() - for _, e := range r.Events { - if t.Before(e.CreatedAt) { - events = append(events, e) - } - } - - if len(events) == 0 { - r.state = aggSent - } - - r.Events = events + r.expiryTimer.Reset(minimumRefreshPeriod) } func (r *AggregationInstance) SendNotification(s SummaryReceiver) { - if r.state == aggSent { + if time.Since(r.lastNotificationSent) < r.Rule.RepeatRate { return } err := s.Receive(&EventSummary{ - Rule: r.Rule, - Events: r.Events, + Rule: r.Rule, + Event: r.Event, }) if err != nil { - if err.Retryable() { - return - } - log.Println("Unretryable error while sending notification:", err) + log.Printf("Error while sending notification: %s, retrying in %v", err, notificationRetryPeriod) + r.resendNotificationAfter(notificationRetryPeriod, s) + return } - r.state = aggSent + r.resendNotificationAfter(r.Rule.RepeatRate, s) + r.lastNotificationSent = time.Now() +} + +func (r *AggregationInstance) resendNotificationAfter(d time.Duration, s SummaryReceiver) { + // BUG: we can't just call SendNotification whenever the timer ends without + // any synchronisation. The timer should instead feed into a channel which is + // served by the main Dispatch() loop. + r.notificationResendTimer = time.AfterFunc(d, func() { + r.SendNotification(s) + }) +} + +func (r *AggregationInstance) Close() { + if r.notificationResendTimer != nil { + r.notificationResendTimer.Stop() + } + if r.expiryTimer != nil { + r.expiryTimer.Stop() + } } type AggregationRules []*AggregationRule type Aggregator struct { Rules AggregationRules - // Used for O(1) lookup and removal of aggregations when new ones come into the system. - Aggregates map[uint64]*AggregationInstance - // TODO: Add priority queue sorted by expiration time.Time (newest, oldest). - // When a new element comes into this queue and the last head is not equal to - // current head, cancel the existing internal timer and create a new timer for - // expiry.Sub(time.Now) and have that (<- chan time.Time) funnel into the - // event into the dispatch loop where the present tidy call is made. Delete - // tidy, and just shift the head element of the priority queue off and remove - // it from the O(1) membership index above. + Aggregates map[EventFingerprint]*AggregationInstance - // TODO?: Build a new priority queue type that uses an internal wrapper container for - // the AggregationInstance it decorates to note the last dispatch time. The - // queue uses higher-level add and remove methods. - - // SHORTFALL: Needing to garbage collect aggregations across three containers? - - aggRequests chan *aggregateEventsRequest - rulesRequests chan *aggregatorResetRulesRequest - closed chan bool + aggRequests chan *aggregateEventsRequest + getAggregatesRequests chan *getAggregatesRequest + removeAggregateRequests chan EventFingerprint + rulesRequests chan *aggregatorResetRulesRequest + closed chan bool } func NewAggregator() *Aggregator { return &Aggregator{ - Aggregates: make(map[uint64]*AggregationInstance), + Aggregates: make(map[EventFingerprint]*AggregationInstance), - aggRequests: make(chan *aggregateEventsRequest), - rulesRequests: make(chan *aggregatorResetRulesRequest), - closed: make(chan bool), + aggRequests: make(chan *aggregateEventsRequest), + getAggregatesRequests: make(chan *getAggregatesRequest), + removeAggregateRequests: make(chan EventFingerprint), + rulesRequests: make(chan *aggregatorResetRulesRequest), + closed: make(chan bool), } } func (a *Aggregator) Close() { close(a.rulesRequests) close(a.aggRequests) + close(a.getAggregatesRequests) + close(a.removeAggregateRequests) <-a.closed close(a.closed) @@ -146,6 +145,14 @@ type aggregateEventsRequest struct { Response chan *aggregateEventsResponse } +type getAggregatesResponse struct { + Aggregates AggregationInstances +} + +type getAggregatesRequest struct { + Response chan getAggregatesResponse +} + func (a *Aggregator) aggregate(req *aggregateEventsRequest, s SummaryReceiver) { if len(a.Rules) == 0 { req.Response <- &aggregateEventsResponse{ @@ -162,8 +169,14 @@ func (a *Aggregator) aggregate(req *aggregateEventsRequest, s SummaryReceiver) { fp := element.Fingerprint() aggregation, ok := a.Aggregates[fp] if !ok { + expTimer := time.AfterFunc(minimumRefreshPeriod, func() { + a.removeAggregateRequests <- fp + }) + aggregation = &AggregationInstance{ - Rule: r, + Rule: r, + Created: time.Now(), + expiryTimer: expTimer, } a.Aggregates[fp] = aggregation @@ -196,6 +209,26 @@ func (a *Aggregator) replaceRules(r *aggregatorResetRulesRequest) { close(r.Response) } +func (a *Aggregator) AlertAggregates() AggregationInstances { + req := &getAggregatesRequest{ + Response: make(chan getAggregatesResponse), + } + + a.getAggregatesRequests <- req + + result := <-req.Response + + return result.Aggregates +} + +func (a *Aggregator) aggregates() AggregationInstances { + aggs := make(AggregationInstances, 0, len(a.Aggregates)) + for _, agg := range a.Aggregates { + aggs = append(aggs, agg) + } + return aggs +} + func (a *Aggregator) Receive(e Events) error { req := &aggregateEventsRequest{ Events: e, @@ -244,10 +277,17 @@ func (a *Aggregator) Dispatch(s SummaryReceiver) { closed++ } - case <-t.C: - for _, a := range a.Aggregates { - a.Tidy() + case req := <-a.getAggregatesRequests: + aggs := a.aggregates() + req.Response <- getAggregatesResponse{ + Aggregates: aggs, } + close(req.Response) + + case fp := <-a.removeAggregateRequests: + log.Println("Deleting expired aggregation instance", a) + a.Aggregates[fp].Close() + delete(a.Aggregates, fp) } } diff --git a/manager/dispatcher.go b/manager/dispatcher.go index cd4b0072..d2db9ce3 100644 --- a/manager/dispatcher.go +++ b/manager/dispatcher.go @@ -34,7 +34,7 @@ func DispatcherFor(destination string) DestinationDispatcher { type EventSummary struct { Rule *AggregationRule - Events Events + Event *Event Destination string } @@ -116,7 +116,7 @@ func (d *SummaryDispatcher) Receive(s *EventSummary) RemoteError { } func (d *SummaryDispatcher) dispatchSummary(r *summaryDispatchRequest, i IsInhibitedInterrogator) { - if i.IsInhibited(r.Summary.Events[0]) { + if i.IsInhibited(r.Summary.Event) { r.Response <- &summaryDispatchResponse{ Disposition: SUPPRESSED, } @@ -124,6 +124,9 @@ func (d *SummaryDispatcher) dispatchSummary(r *summaryDispatchRequest, i IsInhib } // BUG: Perform sending of summaries. + r.Response <- &summaryDispatchResponse{ + Disposition: DISPATCHED, + } } func (d *SummaryDispatcher) Dispatch(i IsInhibitedInterrogator) { diff --git a/manager/event.go b/manager/event.go index 6c961068..bc7cd546 100644 --- a/manager/event.go +++ b/manager/event.go @@ -17,28 +17,23 @@ import ( "fmt" "hash/fnv" "sort" - "time" ) +type EventFingerprint uint64 + // Event models an action triggered by Prometheus. type Event struct { // Label value pairs for purpose of aggregation, matching, and disposition // dispatching. This must minimally include a "name" label. Labels map[string]string - - // CreatedAt indicates when the event was created. - CreatedAt time.Time - - // ExpiresAt is the allowed lifetime for this event before it is reaped. - ExpiresAt time.Time - + // Extra key/value information which is not used for aggregation. Payload map[string]string } -func (e Event) Fingerprint() uint64 { +func (e Event) Fingerprint() EventFingerprint { keys := []string{} - for k := range e.Payload { + for k := range e.Labels { keys = append(keys, k) } @@ -50,7 +45,7 @@ func (e Event) Fingerprint() uint64 { fmt.Fprintf(summer, k, e.Labels[k]) } - return summer.Sum64() + return EventFingerprint(summer.Sum64()) } type Events []*Event diff --git a/web/alerts.go b/web/alerts.go index 1ee77b24..e9e46833 100644 --- a/web/alerts.go +++ b/web/alerts.go @@ -14,11 +14,21 @@ package web import ( + "github.com/prometheus/alert_manager/manager" "net/http" ) -type AlertsHandler struct {} +type AlertStatus struct { + AlertAggregates []*manager.AggregationInstance +} + +type AlertsHandler struct { + Aggregator *manager.Aggregator +} func (h *AlertsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - executeTemplate(w, "alerts", nil) + alertStatus := &AlertStatus{ + AlertAggregates: h.Aggregator.AlertAggregates(), + } + executeTemplate(w, "alerts", alertStatus) } diff --git a/web/silences.go b/web/silences.go index f87e2058..82bcd2e4 100644 --- a/web/silences.go +++ b/web/silences.go @@ -17,7 +17,7 @@ import ( "net/http" ) -type SilencesHandler struct {} +type SilencesHandler struct{} func (h *SilencesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { executeTemplate(w, "silences", nil) diff --git a/web/status.go b/web/status.go index 72ad44bf..6ed058db 100644 --- a/web/status.go +++ b/web/status.go @@ -17,7 +17,7 @@ import ( "net/http" ) -type StatusHandler struct {} +type StatusHandler struct{} func (h *StatusHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { executeTemplate(w, "status", nil) diff --git a/web/templates/alerts.html b/web/templates/alerts.html index 9e7402c1..be9bcd20 100644 --- a/web/templates/alerts.html +++ b/web/templates/alerts.html @@ -16,25 +16,31 @@ Alert Labels Active Since + Last Refreshed Surpressed + {{range .AlertAggregates}} + + {{.Event}} + {{.Event.Labels}} + {{.Created}} + {{.LastRefreshed}} + No + + {{end}} TheTaxesAreTooDamnHigh {foo="bar",baz="biz"} ... + ... No TheTaxesAreTooDamnHigh {foo="bar",baz="biz"} ... - No - - - TheTaxesAreTooDamnHigh - {foo="bar",baz="biz"} ... No @@ -42,17 +48,6 @@ TheTaxesAreTooDamnHigh {foo="bar",baz="biz"} ... - No - - - TheTaxesAreTooDamnHigh - {foo="bar",baz="biz"} - ... - No - - - TheTaxesAreTooDamnHigh - {foo="bar",baz="biz"} ... No diff --git a/web/web.go b/web/web.go index 1326a3cb..4f035c02 100644 --- a/web/web.go +++ b/web/web.go @@ -38,9 +38,9 @@ var ( type WebService struct { AlertManagerService *api.AlertManagerService - AlertsHandler *AlertsHandler - SilencesHandler *SilencesHandler - StatusHandler *StatusHandler + AlertsHandler *AlertsHandler + SilencesHandler *SilencesHandler + StatusHandler *StatusHandler } func (w WebService) ServeForever() error {