Integrate alert provider with notify persistence

This commit is contained in:
Fabian Reinartz 2015-10-06 21:07:16 +02:00
parent e7d45c6a64
commit cc5662f1e8
2 changed files with 47 additions and 8 deletions

View File

@ -42,13 +42,14 @@ func NewMemData() *MemData {
type memAlertIterator struct {
ch <-chan *types.Alert
done chan struct{}
err error
}
func (ai memAlertIterator) Next() <-chan *types.Alert {
return ai.ch
}
func (ai memAlertIterator) Err() error { return nil }
func (ai memAlertIterator) Err() error { return ai.err }
func (ai memAlertIterator) Close() { close(ai.done) }
// MemAlerts implements an Alerts provider based on in-memory data.

View File

@ -176,9 +176,6 @@ func (a *SQLAlerts) Subscribe() AlertIterator {
done = make(chan struct{})
)
alerts, err := a.getPending()
if err != nil {
panic(err)
}
i := a.next
a.next++
@ -207,17 +204,45 @@ func (a *SQLAlerts) Subscribe() AlertIterator {
return memAlertIterator{
ch: ch,
done: done,
err: err,
}
}
func (a *SQLAlerts) GetPending() AlertIterator {
return nil
var (
ch = make(chan *types.Alert, 200)
done = make(chan struct{})
)
alerts, err := a.getPending()
go func() {
defer close(ch)
for _, a := range alerts {
select {
case ch <- a:
case <-done:
return
}
}
}()
return memAlertIterator{
ch: ch,
done: done,
err: err,
}
}
func (a *SQLAlerts) getPending() ([]*types.Alert, error) {
rows, err := a.db.Query(`
SELECT labels, annotations, starts_at, ends_at, updated_at, timeout
FROM alerts
SELECT a.labels, a.annotations, a.starts_at, a.ends_at, a.updated_at, a.timeout
FROM
alerts AS a LEFT OUTER JOIN notify_info AS n
ON a.fingerprint == n.fingerprint
WHERE
NOT (n.delivered AND n.resolved)
`)
if err != nil {
return nil, err
@ -267,6 +292,13 @@ func (a *SQLAlerts) Put(alerts ...*types.Alert) error {
return err
}
// The insert invariant requires that there are no two alerts with the same
// fingerprint that have overlapping activity range ([StartsAt:EndsAt]).
// Such alerts are merged into a single one with the union of both intervals
// as its new activity interval.
// The exact merge procedure is defined on the Alert structure. Here, we just
// care about finding intersecting alerts for each new inserts, deleting them
// if existant, and insert the new alert we retrieved by merging.
overlap, err := tx.Prepare(`
SELECT id(), annotations, starts_at, ends_at, updated_at, timeout FROM alerts
WHERE fingerprint = $1 AND $2 =< ends_at OR $3 >= starts_at
@ -299,7 +331,7 @@ func (a *SQLAlerts) Put(alerts ...*types.Alert) error {
for _, alert := range alerts {
fp := alert.Fingerprint()
// Retrieve all overlapping alerts and delete them.
// Retrieve all intersecting alerts and delete them.
olaps, err := overlap.Query(fp, alert.StartsAt, alert.EndsAt)
if err != nil {
tx.Rollback()
@ -377,6 +409,12 @@ func (a *SQLAlerts) Put(alerts ...*types.Alert) error {
tx.Rollback()
return err
}
a.mtx.RLock()
for _, ch := range a.listeners {
ch <- alert
}
a.mtx.RUnlock()
}
tx.Commit()