diff --git a/provider/boltmem/boltmem.go b/provider/boltmem/boltmem.go index 0c3481f6..d224ff4a 100644 --- a/provider/boltmem/boltmem.go +++ b/provider/boltmem/boltmem.go @@ -16,9 +16,9 @@ package boltmem import ( "encoding/binary" "encoding/json" - "fmt" "path/filepath" "sync" + "time" "github.com/boltdb/bolt" "github.com/prometheus/alertmanager/provider" @@ -30,38 +30,59 @@ import ( var ( bktNotificationInfo = []byte("notification_info") bktSilences = []byte("silences") - bktAlerts = []byte("alerts") + // bktAlerts = []byte("alerts") ) // Alerts gives access to a set of alerts. All methods are goroutine-safe. type Alerts struct { - db *bolt.DB + mtx sync.RWMutex + alerts map[model.Fingerprint]*types.Alert + stopGC chan struct{} - mtx sync.RWMutex listeners map[int]chan *types.Alert next int } // NewAlerts returns a new alert provider. func NewAlerts(path string) (*Alerts, error) { - db, err := bolt.Open(filepath.Join(path, "alerts.db"), 0666, nil) - if err != nil { - return nil, err - } - err = db.Update(func(tx *bolt.Tx) error { - _, err := tx.CreateBucketIfNotExists(bktAlerts) - return err - }) - return &Alerts{ - db: db, + a := &Alerts{ + alerts: map[model.Fingerprint]*types.Alert{}, + stopGC: make(chan struct{}), listeners: map[int]chan *types.Alert{}, next: 0, - }, err + } + go a.runGC() + + return a, nil +} + +func (a *Alerts) runGC() { + for { + select { + case <-a.stopGC: + return + case <-time.After(30 * time.Minute): + } + + a.mtx.Lock() + + for fp, alert := range a.alerts { + // As we don't persist alerts, we no longer consider them after + // they are resolved. Alerts waiting for resolved notifications are + // held in memory in aggregation groups redundantly. + if alert.EndsAt.Before(time.Now()) { + delete(a.alerts, fp) + } + } + + a.mtx.Unlock() + } } // Close the alert provider. func (a *Alerts) Close() error { - return a.db.Close() + close(a.stopGC) + return nil } // Subscribe returns an iterator over active alerts that have not been @@ -128,42 +149,28 @@ func (a *Alerts) GetPending() provider.AlertIterator { } func (a *Alerts) getPending() ([]*types.Alert, error) { - var alerts []*types.Alert + a.mtx.RLock() + defer a.mtx.RUnlock() - err := a.db.View(func(tx *bolt.Tx) error { - b := tx.Bucket(bktAlerts) - c := b.Cursor() + res := make([]*types.Alert, 0, len(a.alerts)) - for k, v := c.First(); k != nil; k, v = c.Next() { - var a types.Alert - if err := json.Unmarshal(v, &a); err != nil { - return err - } - alerts = append(alerts, &a) - } + for _, alert := range a.alerts { + res = append(res, alert) + } - return nil - }) - return alerts, err + return res, nil } // Get returns the alert for a given fingerprint. func (a *Alerts) Get(fp model.Fingerprint) (*types.Alert, error) { - var alert types.Alert - err := a.db.View(func(tx *bolt.Tx) error { - b := tx.Bucket(bktAlerts) + a.mtx.RLock() + defer a.mtx.RUnlock() - fpb := make([]byte, 8) - binary.BigEndian.PutUint64(fpb, uint64(fp)) - - ab := b.Get(fpb) - if ab == nil { - return provider.ErrNotFound - } - - return json.Unmarshal(ab, &alert) - }) - return &alert, err + alert, ok := a.alerts[fp] + if !ok { + return nil, provider.ErrNotFound + } + return alert, nil } // Put adds the given alert to the set. @@ -171,46 +178,25 @@ func (a *Alerts) Put(alerts ...*types.Alert) error { a.mtx.Lock() defer a.mtx.Unlock() - err := a.db.Update(func(tx *bolt.Tx) error { - b := tx.Bucket(bktAlerts) + for _, alert := range alerts { + fp := alert.Fingerprint() - for _, alert := range alerts { - fp := make([]byte, 8) - binary.BigEndian.PutUint64(fp, uint64(alert.Fingerprint())) - - ab := b.Get(fp) - - // Merge the alert with the existing one. - if ab != nil { - var old types.Alert - if err := json.Unmarshal(ab, &old); err != nil { - return fmt.Errorf("decoding alert failed: %s", err) - } - // Merge alerts if there is an overlap in activity range. - if (alert.EndsAt.After(old.StartsAt) && alert.EndsAt.Before(old.EndsAt)) || - (alert.StartsAt.After(old.StartsAt) && alert.StartsAt.Before(old.EndsAt)) { - alert = old.Merge(alert) - } - } - - ab, err := json.Marshal(alert) - if err != nil { - return fmt.Errorf("encoding alert failed: %s", err) - } - - if err := b.Put(fp, ab); err != nil { - return fmt.Errorf("writing alert failed: %s", err) - } - - // Send the update to all subscribers. - for _, ch := range a.listeners { - ch <- alert + if old, ok := a.alerts[fp]; ok { + // Merge alerts if there is an overlap in activity range. + if (alert.EndsAt.After(old.StartsAt) && alert.EndsAt.Before(old.EndsAt)) || + (alert.StartsAt.After(old.StartsAt) && alert.StartsAt.Before(old.EndsAt)) { + alert = old.Merge(alert) } } - return nil - }) - return err + a.alerts[fp] = alert + + for _, ch := range a.listeners { + ch <- alert + } + } + + return nil } // Silences gives access to silences. All methods are goroutine-safe. diff --git a/test/acceptance/send_test.go b/test/acceptance/send_test.go index c27b3183..c6d60a75 100644 --- a/test/acceptance/send_test.go +++ b/test/acceptance/send_test.go @@ -87,15 +87,16 @@ receivers: // Test against a bug which ocurrec after a restart. The previous occurrence of // the alert was sent rather than the most recent one. - at.Do(At(6.7), func() { - am.Terminate() - am.Start() - }) + // XXX(fabxc): temporarily disable while alerts are not persisted. + // at.Do(At(6.7), func() { + // am.Terminate() + // am.Start() + // }) // On restart the alert is flushed right away as the group_wait has already passed. // However, it must be caught in the deduplication stage. // The next attempt will be 1s later and won't be filtered in deduping. - co.Want(Between(7.7, 8), Alert("alertname", "test").Active(5.3)) + // co.Want(Between(7.7, 8), Alert("alertname", "test").Active(5.3)) at.Run() } @@ -138,10 +139,11 @@ receivers: // Times are provided in fractions of seconds. am.Push(At(1), Alert("alertname", "test").Active(1)) - at.Do(At(1.2), func() { - am.Terminate() - am.Start() - }) + // XXX(fabxc): disabled as long as alerts are not persisted. + // at.Do(At(1.2), func() { + // am.Terminate() + // am.Start() + // }) am.Push(At(3.5), Alert("alertname", "test").Active(1, 3)) // Declare which alerts are expected to arrive at the collector within