From 2533f63a2d2330b49f2bd01f9351ca1d5b12fc1b Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Tue, 7 Jul 2015 09:47:09 +0200 Subject: [PATCH] add experimental alert state backed by CRDTs --- manager/dispatch.go | 27 ++++++----- manager/state.go | 106 +++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 116 insertions(+), 17 deletions(-) diff --git a/manager/dispatch.go b/manager/dispatch.go index 255787ac..19624c52 100644 --- a/manager/dispatch.go +++ b/manager/dispatch.go @@ -69,15 +69,20 @@ func (d *Dispatcher) Run() { } } - now := time.Now() + // now := time.Now() - for _, a := range d.state.Alert().GetAll() { - if a.Resolved() && a.ResolvedAt.Before(now.Sub(ResolveTimeout)) { - if err := d.state.Alert().Del(a.Fingerprint()); err != nil { - log.Errorf("error cleaning resolved alerts: %s", err) - } - } - } + // list, err := d.state.Alert().GetAll() + // if err != nil { + // log.Error(err) + // } + + // for _, a := range list { + // if a.Resolved() && a.ResolvedAt.Before(now.Sub(ResolveTimeout)) { + // if err := d.state.Alert().Del(a.Fingerprint()); err != nil { + // log.Errorf("error cleaning resolved alerts: %s", err) + // } + // } + // } case alert := <-updates: @@ -96,16 +101,16 @@ func (d *Dispatcher) Run() { a.ResolvedAt = alert.CreatedAt.Add(ResolveTimeout) // After the constant timeout update the alert to be resolved. - go func(alert *Alert) { + go func(a Alert) { now := time.Now() if a.ResolvedAt.After(now) { - time.Sleep(now.Sub(a.ResolvedAt)) + time.Sleep(a.ResolvedAt.Sub(now)) } if err := d.state.Alert().Add(&a); err != nil { log.Errorf("alert auto-resolve failed: %s", err) } - }(alert) + }(a) } } } diff --git a/manager/state.go b/manager/state.go index c603e105..e2400bbd 100644 --- a/manager/state.go +++ b/manager/state.go @@ -8,6 +8,8 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/log" + + "github.com/prometheus/alertmanager/crdt" ) // A State serves the Alertmanager's internal state about active silences. @@ -20,7 +22,6 @@ type State interface { type AlertState interface { Add(...*Alert) error - Del(model.Fingerprint) error Get(model.Fingerprint) (*Alert, error) GetAll() ([]*Alert, error) @@ -48,7 +49,7 @@ type SilenceState interface { // simpleState implements the State interface based on in-memory storage. type simpleState struct { silences *memSilences - alerts *memAlerts + alerts *crdtAlerts config *memConfig } @@ -58,10 +59,11 @@ func NewSimpleState() State { sils: map[string]*Silence{}, nextID: 1, }, - alerts: &memAlerts{ - alerts: map[model.Fingerprint]*Alert{}, - updates: make(chan *Alert, 100), - }, + alerts: newCRDTAlerts(crdt.NewMemStorage()), + // alerts: &memAlerts{ + // alerts: map[model.Fingerprint]*Alert{}, + // updates: make(chan *Alert, 100), + // }, config: &memConfig{}, } @@ -102,6 +104,98 @@ func (c *memConfig) Get() (*Config, error) { return c.config, nil } +type crdtAlerts struct { + set crdt.Set + + updates chan *Alert + subs []chan *Alert + mtx sync.RWMutex +} + +func newCRDTAlerts(storage crdt.Storage) *crdtAlerts { + return &crdtAlerts{ + set: crdt.NewLWW(storage), + updates: make(chan *Alert, 100), + } +} + +func (s *crdtAlerts) run() { + for a := range s.updates { + s.mtx.RLock() + + for _, sub := range s.subs { + select { + case <-time.After(100 * time.Millisecond): + log.Errorf("dropped alert %s for subscriber", a) + case sub <- a: + // Success + } + } + + s.mtx.RUnlock() + } +} + +func (s *crdtAlerts) Add(alerts ...*Alert) error { + for _, a := range alerts { + err := s.set.Add(a.Fingerprint().String(), uint64(a.Timestamp.UnixNano()/1e6), a) + if err != nil { + return err + } + + s.updates <- a + } + return nil +} + +func (s *crdtAlerts) Get(fp model.Fingerprint) (*Alert, error) { + e, err := s.set.Get(fp.String()) + if err != nil { + return nil, err + } + + alert := e.Value.(*Alert) + + return alert, nil +} + +func (s *crdtAlerts) GetAll() ([]*Alert, error) { + list, err := s.set.List() + if err != nil { + return nil, err + } + + var alerts []*Alert + for _, e := range list { + alerts = append(alerts, e.Value.(*Alert)) + } + return alerts, nil +} + +func (s *crdtAlerts) Iter() <-chan *Alert { + ch := make(chan *Alert, 100) + + // As we append the channel to the subcription channels + // before sending the current list of all alerts, no alert is lost. + // Handling the some alert twice is effectively a noop. + s.mtx.Lock() + s.subs = append(s.subs, ch) + s.mtx.Unlock() + + prev, err := s.GetAll() + if err != nil { + log.Error(err) + } + + go func() { + for _, alert := range prev { + ch <- alert + } + }() + + return ch +} + type memAlerts struct { alerts map[model.Fingerprint]*Alert updates chan *Alert