From 6a82b58efe096d4b0a8b4e47efd5f1d0eab7e3a0 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Sat, 4 Jul 2015 14:59:52 +0200 Subject: [PATCH] allow multiple alert subscribers, improve cleanup --- manager/dispatch.go | 75 +++++++++++++++++++++++++-------------------- manager/state.go | 60 ++++++++++++++++++++++++++++-------- 2 files changed, 89 insertions(+), 46 deletions(-) diff --git a/manager/dispatch.go b/manager/dispatch.go index ae626b85..5d0086aa 100644 --- a/manager/dispatch.go +++ b/manager/dispatch.go @@ -54,43 +54,50 @@ func (d *Dispatcher) notify(name string, alerts ...*Alert) error { } func (d *Dispatcher) Run() { + + updates := d.state.Alert().Iter() + cleanup := time.Tick(30 * time.Second) + for { - alert := d.state.Alert().Next() - - conf, err := d.state.Config().Get() - if err != nil { - log.Error(err) - continue - } - - for _, m := range conf.Routes.Match(alert.Labels) { - d.processAlert(alert, m) - } - - if !alert.Resolved() { - // After the constant timeout update the alert to be resolved. - go func(alert *Alert) { - for { - // TODO: get most recent version first. - time.Sleep(ResolveTimeout) - - a := *alert - a.ResolvedAt = time.Now() - - if err := d.state.Alert().Add(&a); err != nil { - log.Error(err) - continue - } - return + select { + case <-cleanup: + // Cleanup routine. + for _, ag := range d.aggrGroups { + if ag.empty() { + ag.stop() + delete(d.aggrGroups, ag.fingerprint()) } - }(alert) - } + } - // Cleanup routine. - for _, ag := range d.aggrGroups { - if ag.empty() { - ag.stop() - delete(d.aggrGroups, ag.fingerprint()) + case alert := <-updates: + + conf, err := d.state.Config().Get() + if err != nil { + log.Error(err) + continue + } + + for _, m := range conf.Routes.Match(alert.Labels) { + d.processAlert(alert, m) + } + + if !alert.Resolved() { + // After the constant timeout update the alert to be resolved. + go func(alert *Alert) { + for { + // TODO: get most recent version first. + time.Sleep(ResolveTimeout) + + a := *alert + a.ResolvedAt = time.Now() + + if err := d.state.Alert().Add(&a); err != nil { + log.Error(err) + continue + } + return + } + }(alert) } } } diff --git a/manager/state.go b/manager/state.go index f75ee3d4..330f1ed0 100644 --- a/manager/state.go +++ b/manager/state.go @@ -4,9 +4,10 @@ import ( "fmt" "sort" "sync" + "time" "github.com/prometheus/common/model" - // "github.com/prometheus/log" + "github.com/prometheus/log" ) // A State serves the Alertmanager's internal state about active silences. @@ -22,7 +23,7 @@ type AlertState interface { Get(model.Fingerprint) (*Alert, error) GetAll() ([]*Alert, error) - Next() *Alert + Iter() <-chan *Alert } type ConfigState interface { @@ -50,17 +51,21 @@ type simpleState struct { } func NewSimpleState() State { - return &simpleState{ + state := &simpleState{ silences: &memSilences{ sils: map[string]*Silence{}, nextID: 1, }, alerts: &memAlerts{ - alerts: map[model.Fingerprint]*Alert{}, - ch: make(chan *Alert, 100), + alerts: map[model.Fingerprint]*Alert{}, + updates: make(chan *Alert, 100), }, config: &memConfig{}, } + + go state.alerts.run() + + return state } func (s *simpleState) Alert() AlertState { @@ -96,9 +101,27 @@ func (c *memConfig) Get() (*Config, error) { } type memAlerts struct { - alerts map[model.Fingerprint]*Alert - ch chan *Alert - mtx sync.RWMutex + alerts map[model.Fingerprint]*Alert + updates chan *Alert + subs []chan *Alert + mtx sync.RWMutex +} + +func (s *memAlerts) run() { + for a := range s.updates { + s.mtx.RLock() + + for _, sub := range s.subs { + select { + case <-time.After(100 * time.Millisecond): + log.Errorf("dropped alert %s for subscriber", a) + case sub <- a: + // Success + } + } + + s.mtx.RUnlock() + } } func (s *memAlerts) GetAll() ([]*Alert, error) { @@ -129,8 +152,7 @@ func (s *memAlerts) Add(alerts ...*Alert) error { s.alerts[fp] = alert } - // TODO(fabxc): remove this as it blocks if the channel is full. - s.ch <- alert + s.updates <- alert } return nil @@ -147,8 +169,22 @@ func (s *memAlerts) Get(fp model.Fingerprint) (*Alert, error) { return nil, fmt.Errorf("alert with fingerprint %s does not exist", fp) } -func (s *memAlerts) Next() *Alert { - return <-s.ch +func (s *memAlerts) Iter() <-chan *Alert { + ch := make(chan *Alert, 100) + + s.mtx.Lock() + s.subs = append(s.subs, ch) + s.mtx.Unlock() + + go func() { + prev, _ := s.GetAll() + + for _, alert := range prev { + ch <- alert + } + }() + + return ch } type memSilences struct {