Add safer AlertIterator interface

This commit is contained in:
Fabian Reinartz 2015-09-26 11:12:47 +02:00
parent 89e8d82a1b
commit 4b58d30f4d
4 changed files with 74 additions and 21 deletions

View File

@ -1,6 +1,7 @@
package main package main
import ( import (
"fmt"
"sync" "sync"
"time" "time"
@ -64,18 +65,18 @@ func (d *Dispatcher) Run() {
updates := d.alerts.IterActive() updates := d.alerts.IterActive()
defer close(d.done) defer updates.Close()
// TODO(fabxc): updates channel is never closed!!!
d.run(updates) d.run(updates.Next())
} }
func (d *Dispatcher) run(updates <-chan *types.Alert) { func (d *Dispatcher) run(updates <-chan *types.Alert) {
cleanup := time.Tick(30 * time.Second) cleanup := time.Tick(15 * time.Second)
for { for {
select { select {
case alert := <-updates: case alert := <-updates:
fmt.Println("update", alert)
d.mtx.RLock() d.mtx.RLock()
routes := d.routes.Match(alert.Labels) routes := d.routes.Match(alert.Labels)
d.mtx.RUnlock() d.mtx.RUnlock()
@ -85,6 +86,7 @@ func (d *Dispatcher) run(updates <-chan *types.Alert) {
} }
case <-cleanup: case <-cleanup:
fmt.Println("cleanup")
for _, ag := range d.aggrGroups { for _, ag := range d.aggrGroups {
if ag.empty() { if ag.empty() {
ag.stop() ag.stop()
@ -102,6 +104,7 @@ func (d *Dispatcher) run(updates <-chan *types.Alert) {
func (d *Dispatcher) Stop() { func (d *Dispatcher) Stop() {
d.cancel() d.cancel()
d.cancel = nil d.cancel = nil
<-d.done <-d.done
} }
@ -132,6 +135,7 @@ func (d *Dispatcher) notifyFunc(dest string) notifyFunc {
// and insert it. // and insert it.
func (d *Dispatcher) processAlert(alert *types.Alert, opts *RouteOpts) { func (d *Dispatcher) processAlert(alert *types.Alert, opts *RouteOpts) {
group := model.LabelSet{} group := model.LabelSet{}
fmt.Println("processing", alert)
for ln, lv := range alert.Labels { for ln, lv := range alert.Labels {
if _, ok := opts.GroupBy[ln]; ok { if _, ok := opts.GroupBy[ln]; ok {
@ -248,6 +252,7 @@ func (ag *aggrGroup) empty() bool {
// flush sends notifications for all new alerts. // flush sends notifications for all new alerts.
func (ag *aggrGroup) flush(notify func(*types.Alert) bool) { func (ag *aggrGroup) flush(notify func(*types.Alert) bool) {
ag.mtx.Lock() ag.mtx.Lock()
fmt.Println("flushing", ag)
alerts := make(map[model.Fingerprint]*types.Alert, len(ag.alerts)) alerts := make(map[model.Fingerprint]*types.Alert, len(ag.alerts))
for fp, alert := range ag.alerts { for fp, alert := range ag.alerts {

View File

@ -26,6 +26,18 @@ var (
ErrNotFound = fmt.Errorf("item not found") ErrNotFound = fmt.Errorf("item not found")
) )
type memAlertIterator struct {
ch <-chan *types.Alert
close func()
}
func (ai memAlertIterator) Next() <-chan *types.Alert {
return ai.ch
}
func (ai memAlertIterator) Err() error { return nil }
func (ai memAlertIterator) Close() { ai.close() }
// MemAlerts implements an Alerts provider based on in-memory data. // MemAlerts implements an Alerts provider based on in-memory data.
type MemAlerts struct { type MemAlerts struct {
mtx sync.RWMutex mtx sync.RWMutex
@ -39,30 +51,58 @@ func NewMemAlerts() *MemAlerts {
} }
} }
func (a *MemAlerts) IterActive() <-chan *types.Alert { func (a *MemAlerts) IterActive() AlertIterator {
a.mtx.Lock() a.mtx.Lock()
defer a.mtx.Unlock() defer a.mtx.Unlock()
var alerts []*types.Alert
for _, a := range a.alerts {
if !a.Resolved() {
alerts = append(alerts, a)
}
}
ch := make(chan *types.Alert) ch := make(chan *types.Alert)
for _, alert := range a.alerts { go func() {
ch <- alert for _, a := range alerts {
ch <- a
} }
}()
i := len(a.listeners)
a.listeners = append(a.listeners, ch) a.listeners = append(a.listeners, ch)
return ch return memAlertIterator{
ch: ch,
close: func() {
a.mtx.Lock()
a.listeners = append(a.listeners[:i], a.listeners[i+1:]...)
close(ch)
a.mtx.Unlock()
},
}
} }
func (a *MemAlerts) Put(alert *types.Alert) error { func (a *MemAlerts) All() ([]*types.Alert, error) {
var alerts []*types.Alert
for _, a := range a.alerts {
alerts = append(alerts, a)
}
return alerts, nil
}
func (a *MemAlerts) Put(alerts ...*types.Alert) error {
a.mtx.RLock() a.mtx.RLock()
defer a.mtx.RUnlock() defer a.mtx.RUnlock()
for _, alert := range alerts {
a.alerts[alert.Fingerprint()] = alert a.alerts[alert.Fingerprint()] = alert
for _, ch := range a.listeners { for _, ch := range a.listeners {
ch <- alert ch <- alert
} }
}
return nil return nil
} }

View File

@ -20,16 +20,24 @@ import (
"github.com/prometheus/alertmanager/types" "github.com/prometheus/alertmanager/types"
) )
type AlertIterator interface {
Next() <-chan *types.Alert
Err() error
Close()
}
// Alerts gives access to a set of alerts. // Alerts gives access to a set of alerts.
type Alerts interface { type Alerts interface {
// Iter returns a channel on which all active alerts from the // IterActive returns an iterator over active alerts from the
// beginning of time are sent. They are not guaranteed to be in // beginning of time. They are not guaranteed to be in chronological order.
// chronological order. IterActive() AlertIterator
IterActive() <-chan *types.Alert // All returns a list of all existing alerts.
// TODO(fabxc): this is not a scalable solution
All() ([]*types.Alert, error)
// Get returns the alert for a given fingerprint. // Get returns the alert for a given fingerprint.
Get(model.Fingerprint) (*types.Alert, error) Get(model.Fingerprint) (*types.Alert, error)
// Put adds the given alert to the set. // Put adds the given alert to the set.
Put(*types.Alert) error Put(...*types.Alert) error
} }
// Silences gives access to silences. // Silences gives access to silences.

View File

@ -11,7 +11,7 @@ import (
) )
var DefaultRouteOpts = RouteOpts{ var DefaultRouteOpts = RouteOpts{
GroupWait: 10 * time.Second, GroupWait: 1 * time.Second,
RepeatInterval: 10 * time.Second, RepeatInterval: 10 * time.Second,
} }
@ -140,7 +140,7 @@ func (ro *RouteOpts) String() string {
for ln := range ro.GroupBy { for ln := range ro.GroupBy {
labels = append(labels, ln) labels = append(labels, ln)
} }
return fmt.Sprintf("<RouteOpts send_to:%q group_by:%q group_wait:%q %q %q>", ro.SendTo, labels, ro.GroupWait) return fmt.Sprintf("<RouteOpts send_to:%q group_by:%q group_wait:%q>", ro.SendTo, labels, ro.GroupWait)
} }
func (ro *RouteOpts) populateDefault(parent *RouteOpts) { func (ro *RouteOpts) populateDefault(parent *RouteOpts) {