From 4aa5dcccf366c4d3a596a11622b8fb4e3757485b Mon Sep 17 00:00:00 2001
From: Fabian Reinartz
Date: Sat, 4 Jul 2015 12:52:53 +0200
Subject: [PATCH] fix aggregation group handling and state

---
 manager/api.go      |   4 +-
 manager/dispatch.go | 144 ++++++++++++++++++++++++++++++++------------
 manager/state.go    |  94 ++++++++++++++---------------
 3 files changed, 153 insertions(+), 89 deletions(-)

diff --git a/manager/api.go b/manager/api.go
index cfd0edcc..5156af4f 100644
--- a/manager/api.go
+++ b/manager/api.go
@@ -73,7 +73,9 @@ func (api *API) addAlerts(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 	for _, alert := range alerts {
-		alert.Timestamp = time.Now()
+		if alert.Timestamp.IsZero() {
+			alert.Timestamp = time.Now()
+		}
 	}
 
 	// TODO(fabxc): validate input.
diff --git a/manager/dispatch.go b/manager/dispatch.go
index f49dbbec..0f67e623 100644
--- a/manager/dispatch.go
+++ b/manager/dispatch.go
@@ -1,7 +1,6 @@
 package manager
 
 import (
-	"sort"
 	"sync"
 	"time"
 
 	"github.com/prometheus/log"
 )
 
+const ResolveTimeout = 15 * time.Second
+
 // Dispatcher dispatches alerts. It is based on the alerts' data
 // rather than the time they arrive. Thus it can recover its state
 // without persistence.
@@ -47,6 +48,33 @@ func (d *Dispatcher) Run() {
 		for _, m := range conf.Routes.Match(alert.Labels) {
 			d.processAlert(alert, m)
 		}
+
+		if !alert.Resolved {
+			// After the constant timeout, update the alert to be resolved.
+			go func(alert *Alert) {
+				for {
+					// TODO: get most recent version first.
+					time.Sleep(ResolveTimeout)
+
+					a := *alert
+					a.Resolved = true
+
+					if err := d.state.Alert().Add(&a); err != nil {
+						log.Error(err)
+						continue
+					}
+					return
+				}
+			}(alert)
+		}
+
+		// Cleanup routine.
+		for _, ag := range d.aggrGroups {
+			if ag.empty() {
+				ag.stop()
+				delete(d.aggrGroups, ag.fingerprint())
+			}
+		}
 	}
 }
@@ -63,16 +91,43 @@ func (d *Dispatcher) processAlert(alert *Alert, opts *RouteOpts) {
 	ag, ok := d.aggrGroups[fp]
 	if !ok {
-		ag = newAggrGroup(group, opts)
+		ag = newAggrGroup(d, group, opts)
 		d.aggrGroups[fp] = ag
 	}
 
 	ag.insert(alert)
+}
 
-	if ag.empty() {
-		ag.stop()
-		delete(d.aggrGroups, fp)
-	}
+// Alert models an action triggered by Prometheus.
+type Alert struct {
+	// Label value pairs for purpose of aggregation, matching, and disposition
+	// dispatching. This must minimally include an "alertname" label.
+	Labels model.LabelSet `json:"labels"`
+
+	// Extra key/value information which is not used for aggregation.
+	Payload map[string]string `json:"payload"`
+
+	// Short summary of alert.
+	Summary     string `json:"summary"`
+	Description string `json:"description"`
+	Runbook     string `json:"runbook"`
+
+	// When the alert was reported.
+	Timestamp time.Time `json:"timestamp"`
+
+	// Whether the alert with the given label set is resolved.
+	Resolved bool `json:"resolved"`
+}
+
+// Name returns the name of the alert. It is equivalent to the "alertname" label.
+func (a *Alert) Name() string {
+	return string(a.Labels[model.AlertNameLabel])
+}
+
+// Fingerprint returns a unique hash for the alert. It is equivalent to
+// the fingerprint of the alert's label set.
+func (a *Alert) Fingerprint() model.Fingerprint {
+	return a.Labels.Fingerprint()
 }
 
 // aggrGroup aggregates alerts into groups based on
@@ -80,32 +135,28 @@ func (d *Dispatcher) processAlert(alert *Alert, opts *RouteOpts) {
 type aggrGroup struct {
 	dispatcher *Dispatcher
 
-	labels    model.LabelSet
-	alertsOld alertTimeline
-	alertsNew alertTimeline
-	notify    string
+	labels model.LabelSet
+	opts   *RouteOpts
 
-	wait      time.Duration
-	waitTimer *time.Timer
+	next *time.Timer
+	done chan struct{}
 
-	done chan bool
-	mtx  sync.RWMutex
+	mtx     sync.RWMutex
+	alerts  map[model.Fingerprint]struct{}
+	hasSent bool
 }
 
 // newAggrGroup returns a new aggregation group and starts background processing
 // that sends notifications about the contained alerts.
-func newAggrGroup(labels model.LabelSet, opts *RouteOpts) *aggrGroup {
+func newAggrGroup(d *Dispatcher, labels model.LabelSet, opts *RouteOpts) *aggrGroup {
 	ag := &aggrGroup{
 		dispatcher: d,
-		labels:     group,
-		wait:       opts.GroupWait(),
-		waitTimer:  time.NewTimer(opts.GroupWait()),
-		notify:     opts.SendTo,
-		done:       make(chan bool),
-	}
-	if ag.wait == 0 {
-		ag.waitTimer.Stop()
+		labels: labels,
+		opts:   opts,
+
+		alerts: map[model.Fingerprint]struct{}{},
+		done:   make(chan struct{}),
 	}
 
 	go ag.run()
@@ -114,10 +165,19 @@ func newAggrGroup(labels model.LabelSet, opts *RouteOpts) *aggrGroup {
 }
 
 func (ag *aggrGroup) run() {
+	// Set an initial one-time wait before flushing
+	// the first batch of notifications.
+	ag.next = time.NewTimer(ag.opts.GroupWait)
+
+	defer ag.next.Stop()
+
 	for {
 		select {
-		case <-ag.waitTimer.C:
+		case <-ag.next.C:
 			ag.flush()
+			// Wait the configured interval before calling flush again.
+			ag.next.Reset(ag.opts.GroupInterval)
+
 		case <-ag.done:
 			return
 		}
 	}
@@ -125,7 +185,6 @@
 func (ag *aggrGroup) stop() {
-	ag.waitTimer.Stop()
 	close(ag.done)
 }
@@ -136,21 +195,16 @@ func (ag *aggrGroup) fingerprint() model.Fingerprint {
 	return ag.labels.Fingerprint()
 }
 
 // insert adds the alert to the aggregation group. If no notification has
 // been sent yet and the alert is already overdue, a flush is triggered immediately.
 func (ag *aggrGroup) insert(alert *Alert) {
+	fp := alert.Fingerprint()
+
 	ag.mtx.Lock()
-
-	ag.alertsNew = append(ag.alertsNew, alert)
-	sort.Sort(ag.alertsNew)
-
+	ag.alerts[fp] = struct{}{}
 	ag.mtx.Unlock()
 
 	// Immediately trigger a flush if the wait duration for this
 	// alert is already over.
-	if alert.Timestamp.Add(ag.wait).Before(time.Now()) {
-		ag.flush()
-	}
-
-	if ag.wait > 0 {
-		ag.waitTimer.Reset(ag.wait)
+	if !ag.hasSent && alert.Timestamp.Add(ag.opts.GroupWait).Before(time.Now()) {
+		ag.next.Reset(0)
 	}
 }
@@ -158,7 +212,7 @@ func (ag *aggrGroup) empty() bool {
 	ag.mtx.RLock()
 	defer ag.mtx.RUnlock()
 
-	return len(ag.alertsNew)+len(ag.alertsOld) == 0
+	return len(ag.alerts) == 0
 }
 
 // flush sends notifications for all new alerts.
@@ -166,10 +220,22 @@ func (ag *aggrGroup) flush() {
 	ag.mtx.Lock()
 	defer ag.mtx.Unlock()
 
-	ag.dispatcher.notify(ag.notify, ag.alertsNew...)
+	var alerts []*Alert
+	for fp := range ag.alerts {
+		a, err := ag.dispatcher.state.Alert().Get(fp)
+		if err != nil {
+			log.Error(err)
+			continue
+		}
+		// TODO(fabxc): only delete if notify successful.
+		if a.Resolved {
+			delete(ag.alerts, fp)
+		}
+		alerts = append(alerts, a)
+	}
 
-	ag.alertsOld = append(ag.alertsOld, ag.alertsNew...)
-	ag.alertsNew = ag.alertsNew[:0]
+	ag.dispatcher.notify(ag.opts.SendTo, alerts...)
+	ag.hasSent = true
 }
 
 // alertTimeline is a list of alerts sorted by their timestamp.
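For reference, the timer handling introduced in aggrGroup above is meant to work as follows: the first flush of a group happens GroupWait after the group is created (or immediately, via next.Reset(0), when an arriving alert is already older than GroupWait), and every subsequent flush is spaced by GroupInterval. The following minimal, self-contained Go sketch reproduces just that timer pattern; it is not part of the patch, and the durations are made up for illustration.

package main

import (
	"fmt"
	"time"
)

func main() {
	// Stand-ins for the aggregation group's route options
	// (RouteOpts.GroupWait and RouteOpts.GroupInterval in the patch).
	groupWait := 1 * time.Second
	groupInterval := 3 * time.Second

	// Initial one-time wait before the first flush, as in aggrGroup.run().
	next := time.NewTimer(groupWait)
	defer next.Stop()

	for i := 1; i <= 3; i++ {
		<-next.C
		fmt.Printf("flush #%d at %s\n", i, time.Now().Format("15:04:05.000"))

		// After a flush, wait the configured interval before flushing again.
		next.Reset(groupInterval)
	}
}

Running it prints the first flush after roughly one second and the next two at three-second intervals, which is the cadence the dispatcher's groups are expected to follow.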
diff --git a/manager/state.go b/manager/state.go
index 5ca08ab9..b0c1a67c 100644
--- a/manager/state.go
+++ b/manager/state.go
@@ -3,7 +3,6 @@ package manager
 import (
 	"fmt"
 	"sync"
-	"time"
 
 	"github.com/prometheus/common/model"
 	// "github.com/prometheus/log"
 )
@@ -19,6 +18,7 @@ type AlertState interface {
 	Add(...*Alert) error
+	Get(model.Fingerprint) (*Alert, error)
 	GetAll() ([]*Alert, error)
 
 	Next() *Alert
@@ -51,11 +51,12 @@ type simpleState struct {
 func NewSimpleState() State {
 	return &simpleState{
 		silences: &memSilences{
-			m:      map[string]*Silence{},
+			sils:   map[string]*Silence{},
 			nextID: 1,
 		},
 		alerts: &memAlerts{
-			ch: make(chan *Alert, 100),
+			alerts: map[model.Fingerprint]*Alert{},
+			ch:     make(chan *Alert, 100),
 		},
 		config: &memConfig{},
 	}
@@ -94,7 +95,7 @@ func (c *memConfig) Get() (*Config, error) {
 }
 
 type memAlerts struct {
-	alerts []*Alert
+	alerts map[model.Fingerprint]*Alert
 	ch     chan *Alert
 	mtx    sync.RWMutex
 }
@@ -104,7 +105,9 @@ func (s *memAlerts) GetAll() ([]*Alert, error) {
 	defer s.mtx.RUnlock()
 
-	alerts := make([]*Alert, len(s.alerts))
-	copy(alerts, s.alerts)
+	alerts := make([]*Alert, 0, len(s.alerts))
+	for _, a := range s.alerts {
+		alerts = append(alerts, a)
+	}
 	return alerts, nil
 }
@@ -113,23 +116,40 @@ func (s *memAlerts) Add(alerts ...*Alert) error {
 	s.mtx.Lock()
 	defer s.mtx.Unlock()
 
-	s.alerts = append(s.alerts, alerts...)
-
-	// TODO(fabxc): remove this as it blocks if the channel is full.
 	for _, alert := range alerts {
+		fp := alert.Fingerprint()
+
+		// Last write wins.
+		if prev, ok := s.alerts[fp]; !ok || alert.Timestamp.After(prev.Timestamp) {
+			s.alerts[fp] = alert
+		}
+
+		// TODO(fabxc): remove this as it blocks if the channel is full.
 		s.ch <- alert
 	}
+
 	return nil
 }
 
+func (s *memAlerts) Get(fp model.Fingerprint) (*Alert, error) {
+	s.mtx.Lock()
+	defer s.mtx.Unlock()
+
+	if a, ok := s.alerts[fp]; ok {
+		return a, nil
+	}
+
+	return nil, fmt.Errorf("alert with fingerprint %s does not exist", fp)
+}
+
 func (s *memAlerts) Next() *Alert {
 	return <-s.ch
 }
 
 type memSilences struct {
-	m   map[string]*Silence
-	mtx sync.RWMutex
+	sils map[string]*Silence
+	mtx  sync.RWMutex
 
 	nextID uint64
 }
@@ -140,13 +160,22 @@ func (s *memSilences) genID() string {
 }
 
 func (s *memSilences) Get(sid string) (*Silence, error) {
-	return nil, nil
+	s.mtx.RLock()
+	defer s.mtx.RUnlock()
+
+	if sil, ok := s.sils[sid]; ok {
+		return sil, nil
+	}
+
+	return nil, fmt.Errorf("silence with ID %s does not exist", sid)
 }
+
 func (s *memSilences) Del(sid string) error {
-	if _, ok := s.m[sid]; !ok {
+	if _, ok := s.sils[sid]; !ok {
 		return fmt.Errorf("silence with ID %s does not exist", sid)
 	}
-	delete(s.m, sid)
+
+	delete(s.sils, sid)
 	return nil
 }
@@ -154,8 +183,8 @@ func (s *memSilences) GetAll() ([]*Silence, error) {
 	s.mtx.Lock()
 	defer s.mtx.Unlock()
 
-	sils := make([]*Silence, 0, len(s.m))
-	for _, sil := range s.m {
+	sils := make([]*Silence, 0, len(s.sils))
+	for _, sil := range s.sils {
 		sils = append(sils, sil)
 	}
 	return sils, nil
@@ -169,39 +198,6 @@ func (s *memSilences) Set(sil *Silence) error {
 		sil.ID = s.genID()
 	}
 
-	s.m[sil.ID] = sil
+	s.sils[sil.ID] = sil
 	return nil
 }
-
-// Alert models an action triggered by Prometheus.
-type Alert struct {
-	// Label value pairs for purpose of aggregation, matching, and disposition
-	// dispatching. This must minimally include an "alertname" label.
-	Labels model.LabelSet `json:"labels"`
-
-	// Extra key/value information which is not used for aggregation.
-	Payload map[string]string `json:"payload"`
-
-	// Short summary of alert.
-	Summary string `json:"summary"`
-
-	// Long description of alert.
-	Description string `json:"description"`
-
-	// Runbook link or reference for the alert.
-	Runbook string `json:"runbook"`
-
-	// When the alert was reported.
-	Timestamp time.Time `json:"-"`
-}
-
-// Name returns the name of the alert. It is equivalent to the "alertname" label.
-func (a *Alert) Name() string {
-	return string(a.Labels[model.AlertNameLabel])
-}
-
-// Fingerprint returns a unique hash for the alert. It is equivalent to
-// the fingerprint of the alert's label set.
-func (a *Alert) Fingerprint() model.Fingerprint {
-	return a.Labels.Fingerprint()
-}
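To make the new alert-state semantics concrete: memAlerts now keeps one alert per fingerprint and, on Add, retains the version with the newest Timestamp ("last write wins"), which flush() later looks up again via Get. The standalone sketch below reproduces just that rule with simplified stand-in types; it is not part of the patch.

package main

import (
	"fmt"
	"time"
)

// alert is a simplified stand-in for the Alert type in the patch,
// keyed by a plain uint64 instead of model.Fingerprint.
type alert struct {
	fingerprint uint64
	timestamp   time.Time
	resolved    bool
}

// add applies the last-write-wins rule: a strictly newer timestamp
// replaces the stored alert, an older or equal one is ignored.
func add(store map[uint64]alert, a alert) {
	if prev, ok := store[a.fingerprint]; !ok || a.timestamp.After(prev.timestamp) {
		store[a.fingerprint] = a
	}
}

func main() {
	store := map[uint64]alert{}
	now := time.Now()

	add(store, alert{fingerprint: 42, timestamp: now})
	add(store, alert{fingerprint: 42, timestamp: now.Add(-time.Minute)})                // older: ignored
	add(store, alert{fingerprint: 42, timestamp: now.Add(time.Minute), resolved: true}) // newer: wins

	fmt.Printf("stored: %+v\n", store[42])
}

Because the comparison uses After, an update carrying a timestamp equal to the stored one does not replace it; only a strictly newer timestamp wins.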