allow multiple alert subscribers, improve cleanup

This commit is contained in:
Fabian Reinartz 2015-07-04 14:59:52 +02:00
parent d5ff099d05
commit 6a82b58efe
2 changed files with 89 additions and 46 deletions

View File

@ -54,43 +54,50 @@ func (d *Dispatcher) notify(name string, alerts ...*Alert) error {
}
func (d *Dispatcher) Run() {
updates := d.state.Alert().Iter()
cleanup := time.Tick(30 * time.Second)
for {
alert := d.state.Alert().Next()
conf, err := d.state.Config().Get()
if err != nil {
log.Error(err)
continue
}
for _, m := range conf.Routes.Match(alert.Labels) {
d.processAlert(alert, m)
}
if !alert.Resolved() {
// After the constant timeout update the alert to be resolved.
go func(alert *Alert) {
for {
// TODO: get most recent version first.
time.Sleep(ResolveTimeout)
a := *alert
a.ResolvedAt = time.Now()
if err := d.state.Alert().Add(&a); err != nil {
log.Error(err)
continue
}
return
select {
case <-cleanup:
// Cleanup routine.
for _, ag := range d.aggrGroups {
if ag.empty() {
ag.stop()
delete(d.aggrGroups, ag.fingerprint())
}
}(alert)
}
}
// Cleanup routine.
for _, ag := range d.aggrGroups {
if ag.empty() {
ag.stop()
delete(d.aggrGroups, ag.fingerprint())
case alert := <-updates:
conf, err := d.state.Config().Get()
if err != nil {
log.Error(err)
continue
}
for _, m := range conf.Routes.Match(alert.Labels) {
d.processAlert(alert, m)
}
if !alert.Resolved() {
// After the constant timeout update the alert to be resolved.
go func(alert *Alert) {
for {
// TODO: get most recent version first.
time.Sleep(ResolveTimeout)
a := *alert
a.ResolvedAt = time.Now()
if err := d.state.Alert().Add(&a); err != nil {
log.Error(err)
continue
}
return
}
}(alert)
}
}
}

View File

@ -4,9 +4,10 @@ import (
"fmt"
"sort"
"sync"
"time"
"github.com/prometheus/common/model"
// "github.com/prometheus/log"
"github.com/prometheus/log"
)
// A State serves the Alertmanager's internal state about active silences.
@ -22,7 +23,7 @@ type AlertState interface {
Get(model.Fingerprint) (*Alert, error)
GetAll() ([]*Alert, error)
Next() *Alert
Iter() <-chan *Alert
}
type ConfigState interface {
@ -50,17 +51,21 @@ type simpleState struct {
}
func NewSimpleState() State {
return &simpleState{
state := &simpleState{
silences: &memSilences{
sils: map[string]*Silence{},
nextID: 1,
},
alerts: &memAlerts{
alerts: map[model.Fingerprint]*Alert{},
ch: make(chan *Alert, 100),
alerts: map[model.Fingerprint]*Alert{},
updates: make(chan *Alert, 100),
},
config: &memConfig{},
}
go state.alerts.run()
return state
}
func (s *simpleState) Alert() AlertState {
@ -96,9 +101,27 @@ func (c *memConfig) Get() (*Config, error) {
}
type memAlerts struct {
alerts map[model.Fingerprint]*Alert
ch chan *Alert
mtx sync.RWMutex
alerts map[model.Fingerprint]*Alert
updates chan *Alert
subs []chan *Alert
mtx sync.RWMutex
}
func (s *memAlerts) run() {
for a := range s.updates {
s.mtx.RLock()
for _, sub := range s.subs {
select {
case <-time.After(100 * time.Millisecond):
log.Errorf("dropped alert %s for subscriber", a)
case sub <- a:
// Success
}
}
s.mtx.RUnlock()
}
}
func (s *memAlerts) GetAll() ([]*Alert, error) {
@ -129,8 +152,7 @@ func (s *memAlerts) Add(alerts ...*Alert) error {
s.alerts[fp] = alert
}
// TODO(fabxc): remove this as it blocks if the channel is full.
s.ch <- alert
s.updates <- alert
}
return nil
@ -147,8 +169,22 @@ func (s *memAlerts) Get(fp model.Fingerprint) (*Alert, error) {
return nil, fmt.Errorf("alert with fingerprint %s does not exist", fp)
}
func (s *memAlerts) Next() *Alert {
return <-s.ch
func (s *memAlerts) Iter() <-chan *Alert {
ch := make(chan *Alert, 100)
s.mtx.Lock()
s.subs = append(s.subs, ch)
s.mtx.Unlock()
go func() {
prev, _ := s.GetAll()
for _, alert := range prev {
ch <- alert
}
}()
return ch
}
type memSilences struct {