From 4b58d30f4d509b4f93dce2ae05cb0a1f34805fc4 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Sat, 26 Sep 2015 11:12:47 +0200 Subject: [PATCH] Add safer AlertIterator interface --- dispatch.go | 13 +++++++--- provider/mem.go | 60 ++++++++++++++++++++++++++++++++++++-------- provider/provider.go | 18 +++++++++---- route.go | 4 +-- 4 files changed, 74 insertions(+), 21 deletions(-) diff --git a/dispatch.go b/dispatch.go index e8210340..77b551d7 100644 --- a/dispatch.go +++ b/dispatch.go @@ -1,6 +1,7 @@ package main import ( + "fmt" "sync" "time" @@ -64,18 +65,18 @@ func (d *Dispatcher) Run() { updates := d.alerts.IterActive() - defer close(d.done) - // TODO(fabxc): updates channel is never closed!!! + defer updates.Close() - d.run(updates) + d.run(updates.Next()) } func (d *Dispatcher) run(updates <-chan *types.Alert) { - cleanup := time.Tick(30 * time.Second) + cleanup := time.Tick(15 * time.Second) for { select { case alert := <-updates: + fmt.Println("update", alert) d.mtx.RLock() routes := d.routes.Match(alert.Labels) d.mtx.RUnlock() @@ -85,6 +86,7 @@ func (d *Dispatcher) run(updates <-chan *types.Alert) { } case <-cleanup: + fmt.Println("cleanup") for _, ag := range d.aggrGroups { if ag.empty() { ag.stop() @@ -102,6 +104,7 @@ func (d *Dispatcher) run(updates <-chan *types.Alert) { func (d *Dispatcher) Stop() { d.cancel() d.cancel = nil + <-d.done } @@ -132,6 +135,7 @@ func (d *Dispatcher) notifyFunc(dest string) notifyFunc { // and insert it. func (d *Dispatcher) processAlert(alert *types.Alert, opts *RouteOpts) { group := model.LabelSet{} + fmt.Println("processing", alert) for ln, lv := range alert.Labels { if _, ok := opts.GroupBy[ln]; ok { @@ -248,6 +252,7 @@ func (ag *aggrGroup) empty() bool { // flush sends notifications for all new alerts. func (ag *aggrGroup) flush(notify func(*types.Alert) bool) { ag.mtx.Lock() + fmt.Println("flushing", ag) alerts := make(map[model.Fingerprint]*types.Alert, len(ag.alerts)) for fp, alert := range ag.alerts { diff --git a/provider/mem.go b/provider/mem.go index 79135c1f..5d9c0234 100644 --- a/provider/mem.go +++ b/provider/mem.go @@ -26,6 +26,18 @@ var ( ErrNotFound = fmt.Errorf("item not found") ) +type memAlertIterator struct { + ch <-chan *types.Alert + close func() +} + +func (ai memAlertIterator) Next() <-chan *types.Alert { + return ai.ch +} + +func (ai memAlertIterator) Err() error { return nil } +func (ai memAlertIterator) Close() { ai.close() } + // MemAlerts implements an Alerts provider based on in-memory data. type MemAlerts struct { mtx sync.RWMutex @@ -39,29 +51,57 @@ func NewMemAlerts() *MemAlerts { } } -func (a *MemAlerts) IterActive() <-chan *types.Alert { +func (a *MemAlerts) IterActive() AlertIterator { a.mtx.Lock() defer a.mtx.Unlock() - ch := make(chan *types.Alert) - - for _, alert := range a.alerts { - ch <- alert + var alerts []*types.Alert + for _, a := range a.alerts { + if !a.Resolved() { + alerts = append(alerts, a) + } } + ch := make(chan *types.Alert) + + go func() { + for _, a := range alerts { + ch <- a + } + }() + + i := len(a.listeners) a.listeners = append(a.listeners, ch) - return ch + return memAlertIterator{ + ch: ch, + close: func() { + a.mtx.Lock() + a.listeners = append(a.listeners[:i], a.listeners[i+1:]...) + close(ch) + a.mtx.Unlock() + }, + } } -func (a *MemAlerts) Put(alert *types.Alert) error { +func (a *MemAlerts) All() ([]*types.Alert, error) { + var alerts []*types.Alert + for _, a := range a.alerts { + alerts = append(alerts, a) + } + return alerts, nil +} + +func (a *MemAlerts) Put(alerts ...*types.Alert) error { a.mtx.RLock() defer a.mtx.RUnlock() - a.alerts[alert.Fingerprint()] = alert + for _, alert := range alerts { + a.alerts[alert.Fingerprint()] = alert - for _, ch := range a.listeners { - ch <- alert + for _, ch := range a.listeners { + ch <- alert + } } return nil diff --git a/provider/provider.go b/provider/provider.go index 141e0470..69b80e24 100644 --- a/provider/provider.go +++ b/provider/provider.go @@ -20,16 +20,24 @@ import ( "github.com/prometheus/alertmanager/types" ) +type AlertIterator interface { + Next() <-chan *types.Alert + Err() error + Close() +} + // Alerts gives access to a set of alerts. type Alerts interface { - // Iter returns a channel on which all active alerts from the - // beginning of time are sent. They are not guaranteed to be in - // chronological order. - IterActive() <-chan *types.Alert + // IterActive returns an iterator over active alerts from the + // beginning of time. They are not guaranteed to be in chronological order. + IterActive() AlertIterator + // All returns a list of all existing alerts. + // TODO(fabxc): this is not a scalable solution + All() ([]*types.Alert, error) // Get returns the alert for a given fingerprint. Get(model.Fingerprint) (*types.Alert, error) // Put adds the given alert to the set. - Put(*types.Alert) error + Put(...*types.Alert) error } // Silences gives access to silences. diff --git a/route.go b/route.go index c9837824..b996865c 100644 --- a/route.go +++ b/route.go @@ -11,7 +11,7 @@ import ( ) var DefaultRouteOpts = RouteOpts{ - GroupWait: 10 * time.Second, + GroupWait: 1 * time.Second, RepeatInterval: 10 * time.Second, } @@ -140,7 +140,7 @@ func (ro *RouteOpts) String() string { for ln := range ro.GroupBy { labels = append(labels, ln) } - return fmt.Sprintf("", ro.SendTo, labels, ro.GroupWait) + return fmt.Sprintf("", ro.SendTo, labels, ro.GroupWait) } func (ro *RouteOpts) populateDefault(parent *RouteOpts) {