add experimental alert state backed by CRDTs

This commit is contained in:
Fabian Reinartz 2015-07-07 09:47:09 +02:00
parent 73ecc0deff
commit 2533f63a2d
2 changed files with 116 additions and 17 deletions

View File

@ -69,15 +69,20 @@ func (d *Dispatcher) Run() {
}
}
now := time.Now()
// now := time.Now()
for _, a := range d.state.Alert().GetAll() {
if a.Resolved() && a.ResolvedAt.Before(now.Sub(ResolveTimeout)) {
if err := d.state.Alert().Del(a.Fingerprint()); err != nil {
log.Errorf("error cleaning resolved alerts: %s", err)
}
}
}
// list, err := d.state.Alert().GetAll()
// if err != nil {
// log.Error(err)
// }
// for _, a := range list {
// if a.Resolved() && a.ResolvedAt.Before(now.Sub(ResolveTimeout)) {
// if err := d.state.Alert().Del(a.Fingerprint()); err != nil {
// log.Errorf("error cleaning resolved alerts: %s", err)
// }
// }
// }
case alert := <-updates:
@ -96,16 +101,16 @@ func (d *Dispatcher) Run() {
a.ResolvedAt = alert.CreatedAt.Add(ResolveTimeout)
// After the constant timeout update the alert to be resolved.
go func(alert *Alert) {
go func(a Alert) {
now := time.Now()
if a.ResolvedAt.After(now) {
time.Sleep(now.Sub(a.ResolvedAt))
time.Sleep(a.ResolvedAt.Sub(now))
}
if err := d.state.Alert().Add(&a); err != nil {
log.Errorf("alert auto-resolve failed: %s", err)
}
}(alert)
}(a)
}
}
}

View File

@ -8,6 +8,8 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/log"
"github.com/prometheus/alertmanager/crdt"
)
// A State serves the Alertmanager's internal state about active silences.
@ -20,7 +22,6 @@ type State interface {
type AlertState interface {
Add(...*Alert) error
Del(model.Fingerprint) error
Get(model.Fingerprint) (*Alert, error)
GetAll() ([]*Alert, error)
@ -48,7 +49,7 @@ type SilenceState interface {
// simpleState implements the State interface based on in-memory storage.
type simpleState struct {
silences *memSilences
alerts *memAlerts
alerts *crdtAlerts
config *memConfig
}
@ -58,10 +59,11 @@ func NewSimpleState() State {
sils: map[string]*Silence{},
nextID: 1,
},
alerts: &memAlerts{
alerts: map[model.Fingerprint]*Alert{},
updates: make(chan *Alert, 100),
},
alerts: newCRDTAlerts(crdt.NewMemStorage()),
// alerts: &memAlerts{
// alerts: map[model.Fingerprint]*Alert{},
// updates: make(chan *Alert, 100),
// },
config: &memConfig{},
}
@ -102,6 +104,98 @@ func (c *memConfig) Get() (*Config, error) {
return c.config, nil
}
type crdtAlerts struct {
set crdt.Set
updates chan *Alert
subs []chan *Alert
mtx sync.RWMutex
}
func newCRDTAlerts(storage crdt.Storage) *crdtAlerts {
return &crdtAlerts{
set: crdt.NewLWW(storage),
updates: make(chan *Alert, 100),
}
}
func (s *crdtAlerts) run() {
for a := range s.updates {
s.mtx.RLock()
for _, sub := range s.subs {
select {
case <-time.After(100 * time.Millisecond):
log.Errorf("dropped alert %s for subscriber", a)
case sub <- a:
// Success
}
}
s.mtx.RUnlock()
}
}
func (s *crdtAlerts) Add(alerts ...*Alert) error {
for _, a := range alerts {
err := s.set.Add(a.Fingerprint().String(), uint64(a.Timestamp.UnixNano()/1e6), a)
if err != nil {
return err
}
s.updates <- a
}
return nil
}
func (s *crdtAlerts) Get(fp model.Fingerprint) (*Alert, error) {
e, err := s.set.Get(fp.String())
if err != nil {
return nil, err
}
alert := e.Value.(*Alert)
return alert, nil
}
func (s *crdtAlerts) GetAll() ([]*Alert, error) {
list, err := s.set.List()
if err != nil {
return nil, err
}
var alerts []*Alert
for _, e := range list {
alerts = append(alerts, e.Value.(*Alert))
}
return alerts, nil
}
func (s *crdtAlerts) Iter() <-chan *Alert {
ch := make(chan *Alert, 100)
// As we append the channel to the subcription channels
// before sending the current list of all alerts, no alert is lost.
// Handling the some alert twice is effectively a noop.
s.mtx.Lock()
s.subs = append(s.subs, ch)
s.mtx.Unlock()
prev, err := s.GetAll()
if err != nil {
log.Error(err)
}
go func() {
for _, alert := range prev {
ch <- alert
}
}()
return ch
}
type memAlerts struct {
alerts map[model.Fingerprint]*Alert
updates chan *Alert