Merge pull request #420 from prometheus/fabxc-memalerts

provider/boltmem: make alerts purely in-memory.
This commit is contained in:
Fabian Reinartz 2016-07-07 11:16:33 +02:00 committed by GitHub
commit e922acd088
2 changed files with 77 additions and 89 deletions

View File

@ -16,9 +16,9 @@ package boltmem
import ( import (
"encoding/binary" "encoding/binary"
"encoding/json" "encoding/json"
"fmt"
"path/filepath" "path/filepath"
"sync" "sync"
"time"
"github.com/boltdb/bolt" "github.com/boltdb/bolt"
"github.com/prometheus/alertmanager/provider" "github.com/prometheus/alertmanager/provider"
@ -30,38 +30,59 @@ import (
var ( var (
bktNotificationInfo = []byte("notification_info") bktNotificationInfo = []byte("notification_info")
bktSilences = []byte("silences") bktSilences = []byte("silences")
bktAlerts = []byte("alerts") // bktAlerts = []byte("alerts")
) )
// Alerts gives access to a set of alerts. All methods are goroutine-safe. // Alerts gives access to a set of alerts. All methods are goroutine-safe.
type Alerts struct { type Alerts struct {
db *bolt.DB mtx sync.RWMutex
alerts map[model.Fingerprint]*types.Alert
stopGC chan struct{}
mtx sync.RWMutex
listeners map[int]chan *types.Alert listeners map[int]chan *types.Alert
next int next int
} }
// NewAlerts returns a new alert provider. // NewAlerts returns a new alert provider.
func NewAlerts(path string) (*Alerts, error) { func NewAlerts(path string) (*Alerts, error) {
db, err := bolt.Open(filepath.Join(path, "alerts.db"), 0666, nil) a := &Alerts{
if err != nil { alerts: map[model.Fingerprint]*types.Alert{},
return nil, err stopGC: make(chan struct{}),
}
err = db.Update(func(tx *bolt.Tx) error {
_, err := tx.CreateBucketIfNotExists(bktAlerts)
return err
})
return &Alerts{
db: db,
listeners: map[int]chan *types.Alert{}, listeners: map[int]chan *types.Alert{},
next: 0, next: 0,
}, err }
go a.runGC()
return a, nil
}
func (a *Alerts) runGC() {
for {
select {
case <-a.stopGC:
return
case <-time.After(30 * time.Minute):
}
a.mtx.Lock()
for fp, alert := range a.alerts {
// As we don't persist alerts, we no longer consider them after
// they are resolved. Alerts waiting for resolved notifications are
// held in memory in aggregation groups redundantly.
if alert.EndsAt.Before(time.Now()) {
delete(a.alerts, fp)
}
}
a.mtx.Unlock()
}
} }
// Close the alert provider. // Close the alert provider.
func (a *Alerts) Close() error { func (a *Alerts) Close() error {
return a.db.Close() close(a.stopGC)
return nil
} }
// Subscribe returns an iterator over active alerts that have not been // Subscribe returns an iterator over active alerts that have not been
@ -128,42 +149,28 @@ func (a *Alerts) GetPending() provider.AlertIterator {
} }
func (a *Alerts) getPending() ([]*types.Alert, error) { func (a *Alerts) getPending() ([]*types.Alert, error) {
var alerts []*types.Alert a.mtx.RLock()
defer a.mtx.RUnlock()
err := a.db.View(func(tx *bolt.Tx) error { res := make([]*types.Alert, 0, len(a.alerts))
b := tx.Bucket(bktAlerts)
c := b.Cursor()
for k, v := c.First(); k != nil; k, v = c.Next() { for _, alert := range a.alerts {
var a types.Alert res = append(res, alert)
if err := json.Unmarshal(v, &a); err != nil { }
return err
}
alerts = append(alerts, &a)
}
return nil return res, nil
})
return alerts, err
} }
// Get returns the alert for a given fingerprint. // Get returns the alert for a given fingerprint.
func (a *Alerts) Get(fp model.Fingerprint) (*types.Alert, error) { func (a *Alerts) Get(fp model.Fingerprint) (*types.Alert, error) {
var alert types.Alert a.mtx.RLock()
err := a.db.View(func(tx *bolt.Tx) error { defer a.mtx.RUnlock()
b := tx.Bucket(bktAlerts)
fpb := make([]byte, 8) alert, ok := a.alerts[fp]
binary.BigEndian.PutUint64(fpb, uint64(fp)) if !ok {
return nil, provider.ErrNotFound
ab := b.Get(fpb) }
if ab == nil { return alert, nil
return provider.ErrNotFound
}
return json.Unmarshal(ab, &alert)
})
return &alert, err
} }
// Put adds the given alert to the set. // Put adds the given alert to the set.
@ -171,46 +178,25 @@ func (a *Alerts) Put(alerts ...*types.Alert) error {
a.mtx.Lock() a.mtx.Lock()
defer a.mtx.Unlock() defer a.mtx.Unlock()
err := a.db.Update(func(tx *bolt.Tx) error { for _, alert := range alerts {
b := tx.Bucket(bktAlerts) fp := alert.Fingerprint()
for _, alert := range alerts { if old, ok := a.alerts[fp]; ok {
fp := make([]byte, 8) // Merge alerts if there is an overlap in activity range.
binary.BigEndian.PutUint64(fp, uint64(alert.Fingerprint())) if (alert.EndsAt.After(old.StartsAt) && alert.EndsAt.Before(old.EndsAt)) ||
(alert.StartsAt.After(old.StartsAt) && alert.StartsAt.Before(old.EndsAt)) {
ab := b.Get(fp) alert = old.Merge(alert)
// Merge the alert with the existing one.
if ab != nil {
var old types.Alert
if err := json.Unmarshal(ab, &old); err != nil {
return fmt.Errorf("decoding alert failed: %s", err)
}
// Merge alerts if there is an overlap in activity range.
if (alert.EndsAt.After(old.StartsAt) && alert.EndsAt.Before(old.EndsAt)) ||
(alert.StartsAt.After(old.StartsAt) && alert.StartsAt.Before(old.EndsAt)) {
alert = old.Merge(alert)
}
}
ab, err := json.Marshal(alert)
if err != nil {
return fmt.Errorf("encoding alert failed: %s", err)
}
if err := b.Put(fp, ab); err != nil {
return fmt.Errorf("writing alert failed: %s", err)
}
// Send the update to all subscribers.
for _, ch := range a.listeners {
ch <- alert
} }
} }
return nil
})
return err a.alerts[fp] = alert
for _, ch := range a.listeners {
ch <- alert
}
}
return nil
} }
// Silences gives access to silences. All methods are goroutine-safe. // Silences gives access to silences. All methods are goroutine-safe.

View File

@ -87,15 +87,16 @@ receivers:
// Test against a bug which ocurrec after a restart. The previous occurrence of // Test against a bug which ocurrec after a restart. The previous occurrence of
// the alert was sent rather than the most recent one. // the alert was sent rather than the most recent one.
at.Do(At(6.7), func() { // XXX(fabxc): temporarily disable while alerts are not persisted.
am.Terminate() // at.Do(At(6.7), func() {
am.Start() // am.Terminate()
}) // am.Start()
// })
// On restart the alert is flushed right away as the group_wait has already passed. // On restart the alert is flushed right away as the group_wait has already passed.
// However, it must be caught in the deduplication stage. // However, it must be caught in the deduplication stage.
// The next attempt will be 1s later and won't be filtered in deduping. // The next attempt will be 1s later and won't be filtered in deduping.
co.Want(Between(7.7, 8), Alert("alertname", "test").Active(5.3)) // co.Want(Between(7.7, 8), Alert("alertname", "test").Active(5.3))
at.Run() at.Run()
} }
@ -138,10 +139,11 @@ receivers:
// Times are provided in fractions of seconds. // Times are provided in fractions of seconds.
am.Push(At(1), Alert("alertname", "test").Active(1)) am.Push(At(1), Alert("alertname", "test").Active(1))
at.Do(At(1.2), func() { // XXX(fabxc): disabled as long as alerts are not persisted.
am.Terminate() // at.Do(At(1.2), func() {
am.Start() // am.Terminate()
}) // am.Start()
// })
am.Push(At(3.5), Alert("alertname", "test").Active(1, 3)) am.Push(At(3.5), Alert("alertname", "test").Active(1, 3))
// Declare which alerts are expected to arrive at the collector within // Declare which alerts are expected to arrive at the collector within