Add pending alert iterator

This commit is contained in:
Fabian Reinartz 2015-09-29 10:45:58 +02:00
parent 54a8d6ea04
commit 3b401c413b
4 changed files with 95 additions and 52 deletions

18
api.go
View File

@ -61,15 +61,15 @@ func (e *apiError) Error() string {
}
func (api *API) listAlerts(w http.ResponseWriter, r *http.Request) {
alerts, err := api.alerts.All()
if err != nil {
respondError(w, apiError{
typ: errorBadData,
err: err,
}, nil)
return
}
respond(w, alerts)
// alerts, err := api.alerts.GetAll()
// if err != nil {
// respondError(w, apiError{
// typ: errorBadData,
// err: err,
// }, nil)
// return
// }
// respond(w, alerts)
}
func (api *API) addAlerts(w http.ResponseWriter, r *http.Request) {

View File

@ -3,6 +3,7 @@ package main
import (
"sync"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"github.com/prometheus/alertmanager/config"
@ -21,13 +22,14 @@ func (ih *Inhibitor) Mutes(lset model.LabelSet) bool {
ih.mtx.RLock()
defer ih.mtx.RUnlock()
alerts, err := ih.alerts.All()
if err != nil {
// TODO(fabxc): log error.
return false
}
alerts := ih.alerts.GetPending()
defer alerts.Close()
for _, alert := range alerts {
for alert := range alerts.Next() {
if err := alerts.Err(); err != nil {
log.Errorf("Error iterating alerts: %s", err)
continue
}
if alert.Resolved() {
continue
}

View File

@ -40,8 +40,8 @@ func NewMemData() *MemData {
}
type memAlertIterator struct {
ch <-chan *types.Alert
close func()
ch <-chan *types.Alert
done chan struct{}
}
func (ai memAlertIterator) Next() <-chan *types.Alert {
@ -49,7 +49,7 @@ func (ai memAlertIterator) Next() <-chan *types.Alert {
}
func (ai memAlertIterator) Err() error { return nil }
func (ai memAlertIterator) Close() { ai.close() }
func (ai memAlertIterator) Close() { close(ai.done) }
// MemAlerts implements an Alerts provider based on in-memory data.
type MemAlerts struct {
@ -71,6 +71,69 @@ func (a *MemAlerts) Subscribe() AlertIterator {
a.data.mtx.Lock()
defer a.data.mtx.Unlock()
var (
alerts = a.getPending()
ch = make(chan *types.Alert, 200)
done = make(chan struct{})
)
i := len(a.listeners)
a.listeners = append(a.listeners, ch)
go func() {
defer func() {
a.mtx.Lock()
a.listeners = append(a.listeners[:i], a.listeners[i+1:]...)
close(ch)
a.mtx.Unlock()
}()
for _, a := range alerts {
select {
case ch <- a:
case <-done:
return
}
}
}()
return memAlertIterator{
ch: ch,
done: done,
}
}
func (a *MemAlerts) GetPending() AlertIterator {
a.mtx.Lock()
defer a.mtx.Unlock()
a.data.mtx.Lock()
defer a.data.mtx.Unlock()
var (
alerts = a.getPending()
ch = make(chan *types.Alert, 200)
done = make(chan struct{})
)
go func() {
defer close(ch)
for _, a := range alerts {
select {
case ch <- a:
case <-done:
return
}
}
}()
return memAlertIterator{
ch: ch,
done: done,
}
}
func (a *MemAlerts) getPending() []*types.Alert {
// Get fingerprints for all alerts that have pending notifications.
fps := map[model.Fingerprint]struct{}{}
for _, ns := range a.data.notifies {
@ -90,37 +153,7 @@ func (a *MemAlerts) Subscribe() AlertIterator {
}
}
ch := make(chan *types.Alert)
go func() {
for _, a := range alerts {
ch <- a
}
}()
i := len(a.listeners)
a.listeners = append(a.listeners, ch)
return memAlertIterator{
ch: ch,
close: func() {
a.mtx.Lock()
a.listeners = append(a.listeners[:i], a.listeners[i+1:]...)
close(ch)
a.mtx.Unlock()
},
}
}
func (a *MemAlerts) All() ([]*types.Alert, error) {
a.data.mtx.RLock()
defer a.data.mtx.RUnlock()
var alerts []*types.Alert
for _, a := range a.data.alerts {
alerts = append(alerts, a)
}
return alerts, nil
return alerts
}
func (a *MemAlerts) Put(alerts ...*types.Alert) error {
@ -137,6 +170,14 @@ func (a *MemAlerts) Put(alerts ...*types.Alert) error {
}
}
ch := make(chan *types.Alert)
go func() {
for _, a := range alerts {
ch <- a
}
}()
return nil
}

View File

@ -36,9 +36,9 @@ type Alerts interface {
// resolved and successfully notified about.
// They are not guaranteed to be in chronological order.
Subscribe() AlertIterator
// All returns a list of all existing alerts.
// TODO(fabxc): this is not a scalable solution
All() ([]*types.Alert, error)
// GetPending returns an iterator over all alerts that have
// pending notifications.
GetPending() AlertIterator
// Get returns the alert for a given fingerprint.
Get(model.Fingerprint) (*types.Alert, error)
// Put adds the given alert to the set.