From cc5662f1e83bfc4b3119e87210024eb92d543c9b Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Tue, 6 Oct 2015 21:07:16 +0200 Subject: [PATCH] Integrate alert provider with notify persistence --- provider/mem.go | 3 ++- provider/sql.go | 52 ++++++++++++++++++++++++++++++++++++++++++------- 2 files changed, 47 insertions(+), 8 deletions(-) diff --git a/provider/mem.go b/provider/mem.go index e90f30c2..dd0dfd34 100644 --- a/provider/mem.go +++ b/provider/mem.go @@ -42,13 +42,14 @@ func NewMemData() *MemData { type memAlertIterator struct { ch <-chan *types.Alert done chan struct{} + err error } func (ai memAlertIterator) Next() <-chan *types.Alert { return ai.ch } -func (ai memAlertIterator) Err() error { return nil } +func (ai memAlertIterator) Err() error { return ai.err } func (ai memAlertIterator) Close() { close(ai.done) } // MemAlerts implements an Alerts provider based on in-memory data. diff --git a/provider/sql.go b/provider/sql.go index f820527c..9535ef25 100644 --- a/provider/sql.go +++ b/provider/sql.go @@ -176,9 +176,6 @@ func (a *SQLAlerts) Subscribe() AlertIterator { done = make(chan struct{}) ) alerts, err := a.getPending() - if err != nil { - panic(err) - } i := a.next a.next++ @@ -207,17 +204,45 @@ func (a *SQLAlerts) Subscribe() AlertIterator { return memAlertIterator{ ch: ch, done: done, + err: err, } } func (a *SQLAlerts) GetPending() AlertIterator { - return nil + var ( + ch = make(chan *types.Alert, 200) + done = make(chan struct{}) + ) + + alerts, err := a.getPending() + + go func() { + defer close(ch) + + for _, a := range alerts { + select { + case ch <- a: + case <-done: + return + } + } + }() + + return memAlertIterator{ + ch: ch, + done: done, + err: err, + } } func (a *SQLAlerts) getPending() ([]*types.Alert, error) { rows, err := a.db.Query(` - SELECT labels, annotations, starts_at, ends_at, updated_at, timeout - FROM alerts + SELECT a.labels, a.annotations, a.starts_at, a.ends_at, a.updated_at, a.timeout + FROM + alerts AS a LEFT OUTER JOIN notify_info AS n + ON a.fingerprint == n.fingerprint + WHERE + NOT (n.delivered AND n.resolved) `) if err != nil { return nil, err @@ -267,6 +292,13 @@ func (a *SQLAlerts) Put(alerts ...*types.Alert) error { return err } + // The insert invariant requires that there are no two alerts with the same + // fingerprint that have overlapping activity range ([StartsAt:EndsAt]). + // Such alerts are merged into a single one with the union of both intervals + // as its new activity interval. + // The exact merge procedure is defined on the Alert structure. Here, we just + // care about finding intersecting alerts for each new inserts, deleting them + // if existant, and insert the new alert we retrieved by merging. overlap, err := tx.Prepare(` SELECT id(), annotations, starts_at, ends_at, updated_at, timeout FROM alerts WHERE fingerprint = $1 AND $2 =< ends_at OR $3 >= starts_at @@ -299,7 +331,7 @@ func (a *SQLAlerts) Put(alerts ...*types.Alert) error { for _, alert := range alerts { fp := alert.Fingerprint() - // Retrieve all overlapping alerts and delete them. + // Retrieve all intersecting alerts and delete them. olaps, err := overlap.Query(fp, alert.StartsAt, alert.EndsAt) if err != nil { tx.Rollback() @@ -377,6 +409,12 @@ func (a *SQLAlerts) Put(alerts ...*types.Alert) error { tx.Rollback() return err } + + a.mtx.RLock() + for _, ch := range a.listeners { + ch <- alert + } + a.mtx.RUnlock() } tx.Commit()