diff --git a/provider/sql.go b/provider/sql.go index dd8ac0a8..8ee58aac 100644 --- a/provider/sql.go +++ b/provider/sql.go @@ -3,8 +3,10 @@ package provider import ( "database/sql" "encoding/json" + "sync" "github.com/cznic/ql" + "github.com/prometheus/common/log" "github.com/prometheus/common/model" "github.com/prometheus/alertmanager/types" @@ -14,6 +16,250 @@ func init() { ql.RegisterDriver() } +type SQLAlerts struct { + db *sql.DB + + mtx sync.RWMutex + listeners map[int]chan *types.Alert + next int +} + +func NewSQLAlerts(file string) (*SQLAlerts, error) { + db, err := sql.Open("ql", file) + if err != nil { + return nil, err + } + + tx, err := db.Begin() + if err != nil { + return nil, err + } + if _, err := tx.Exec(createAlertsTable); err != nil { + tx.Rollback() + return nil, err + } + tx.Commit() + + return &SQLAlerts{ + db: db, + listeners: map[int]chan *types.Alert{}, + }, nil +} + +const createAlertsTable = ` +CREATE TABLE IF NOT EXISTS alerts ( + fingerprint uint64 + labels string + annotations string + starts_at time + ends_at time + updated_at time + timeout bool +); +CREATE INDEX IF NOT EXISTS alerts_start ON alerts (starts_at); +CREATE INDEX IF NOT EXISTS alerts_end ON alerts (ends_at); +` + +func (a *SQLAlerts) Subscribe() AlertIterator { + var ( + ch = make(chan *types.Alert, 200) + done = make(chan struct{}) + ) + alerts, err := a.getPending() + if err != nil { + panic(err) + } + + i := a.next + a.next++ + + a.listeners[i] = ch + + go func() { + defer func() { + a.mtx.Lock() + delete(a.listeners, i) + close(ch) + a.mtx.Unlock() + }() + + for _, a := range alerts { + select { + case ch <- a: + case <-done: + return + } + } + + <-done + }() + + return memAlertIterator{ + ch: ch, + done: done, + } +} + +func (a *SQLAlerts) GetPending() AlertIterator { + return nil +} + +func (a *SQLAlerts) getPending() ([]*types.Alert, error) { + rows, err := a.db.Query(` + SELECT labels, annotations, starts_at, ends_at, updated_at, timeout + FROM alerts + `) + if err != nil { + return nil, err + } + + var alerts []*types.Alert + for rows.Next() { + var ( + labels string + annotations string + al types.Alert + ) + if err := rows.Scan(&labels, &annotations, &al.StartsAt, &al.EndsAt, &al.UpdatedAt, &al.Timeout); err != nil { + return nil, err + } + + if err := json.Unmarshal([]byte(labels), &al.Labels); err != nil { + return nil, err + } + if err := json.Unmarshal([]byte(annotations), &al.Annotations); err != nil { + return nil, err + } + + alerts = append(alerts, &al) + } + + if err := rows.Err(); err != nil { + return nil, err + } + return alerts, nil +} + +func (a *SQLAlerts) Get(model.Fingerprint) (*types.Alert, error) { + return nil, nil +} + +func (a *SQLAlerts) Put(alerts ...*types.Alert) error { + tx, err := a.db.Begin() + if err != nil { + return err + } + + overlap, err := tx.Prepare(` + SELECT id(), annotations, starts_at, ends_at, updated_at, timeout FROM alerts + WHERE fingerprint = $1 AND $2 =< ends_at OR $3 >= starts_at + `) + if err != nil { + return err + } + defer overlap.Close() + + delOverlap, err := tx.Prepare(` + DELETE FROM alerts WHERE id() IN ( + SELECT id() FROM alerts + WHERE fingerprint = $1 AND $2 =< ends_at OR $3 >= starts_at + ) + `) + if err != nil { + return err + } + defer delOverlap.Close() + + insert, err := tx.Prepare(` + INSERT INTO alerts(fingerprint, labels, annotations, starts_at, ends_at, updated_at, timeout) + VALUES ($1, $2, $3, $4, $5, $6, $7) + `) + if err != nil { + return err + } + defer insert.Close() + + for _, alert := range alerts { + fp := alert.Fingerprint() + + // Retrieve all overlapping alerts and delete them. + olaps, err := overlap.Query(fp, alert.StartsAt, alert.EndsAt) + if err != nil { + tx.Rollback() + return err + } + + var ( + overlapIDs []int64 + merges []*types.Alert + ) + for olaps.Next() { + var ( + id int64 + na types.Alert + ann string + ) + if err := olaps.Scan(&id, &ann, &na.StartsAt, &na.EndsAt, &na.UpdatedAt, &na.Timeout); err != nil { + tx.Rollback() + return err + } + if err := json.Unmarshal([]byte(ann), &na.Annotations); err != nil { + tx.Rollback() + return err + } + na.Labels = alert.Labels + + merges = append(merges, &na) + overlapIDs = append(overlapIDs, id) + } + if err := olaps.Err(); err != nil { + tx.Rollback() + return err + } + + // Merge them. + for _, ma := range merges { + alert = alert.Merge(ma) + } + + // Delete the old ones. + if _, err := delOverlap.Exec(fp, alert.StartsAt, alert.EndsAt); err != nil { + tx.Rollback() + return err + } + + // Insert the final alert. + labels, err := json.Marshal(alert.Labels) + if err != nil { + tx.Rollback() + return err + } + annotations, err := json.Marshal(alert.Annotations) + if err != nil { + tx.Rollback() + return err + } + + _, err = insert.Exec( + fp, + string(labels), + string(annotations), + alert.StartsAt, + alert.EndsAt, + alert.UpdatedAt, + alert.Timeout, + ) + if err != nil { + tx.Rollback() + return err + } + } + + tx.Commit() + + return nil +} + type SQLSilences struct { db *sql.DB } @@ -72,7 +318,11 @@ func (s *SQLSilences) Mutes(lset model.LabelSet) bool { } func (s *SQLSilences) All() ([]*types.Silence, error) { - rows, err := s.db.Query(`SELECT id(), matchers, starts_at, ends_at, created_at, created_by, comment FROM silences ORDER BY starts_at DESC`) + rows, err := s.db.Query(` + SELECT id(), matchers, starts_at, ends_at, created_at, created_by, comment + FROM silences + ORDER BY starts_at DESC + `) if err != nil { return nil, err } @@ -114,7 +364,10 @@ func (s *SQLSilences) Set(sil *types.Silence) (uint64, error) { return 0, err } - res, err := tx.Exec(`INSERT INTO silences VALUES ($1, $2, $3, $4, $5, $6)`, + res, err := tx.Exec(` + INSERT INTO silences(matchers, starts_at, ends_at, created_at, created_by. comment) + VALUES ($1, $2, $3, $4, $5, $6) + `, string(mb), sil.StartsAt, sil.EndsAt, @@ -154,7 +407,11 @@ func (s *SQLSilences) Del(sid uint64) error { } func (s *SQLSilences) Get(sid uint64) (*types.Silence, error) { - row := s.db.QueryRow(`SELECT id(), matchers, starts_at, ends_at, created_at, created_by, comment FROM silences WHERE id() == $1`, sid) + row := s.db.QueryRow(` + SELECT id(), matchers, starts_at, ends_at, created_at, created_by, comment + FROM silences + WHERE id() == $1 + `, sid) var ( sil model.Silence