mirror of
https://github.com/prometheus/alertmanager
synced 2025-01-16 02:54:37 +00:00
54a8d6ea04
Rename IterActive to Subscribe. The returned iterator now returns all alerts with pending notifications which may also include resolved alerts.
277 lines
6.1 KiB
Go
277 lines
6.1 KiB
Go
package main
|
|
|
|
import (
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/prometheus/common/log"
|
|
"github.com/prometheus/common/model"
|
|
"golang.org/x/net/context"
|
|
|
|
"github.com/prometheus/alertmanager/config"
|
|
"github.com/prometheus/alertmanager/provider"
|
|
"github.com/prometheus/alertmanager/types"
|
|
)
|
|
|
|
// ResolveTimeout is a fixed duration of 30 seconds.
// NOTE(review): it is not referenced in this part of the file; presumably it
// is the time after which an alert that is no longer reported is treated as
// resolved — confirm at its call sites.
const ResolveTimeout time.Duration = 30 * time.Second
|
|
|
|
// Dispatcher sorts incoming alerts into aggregation groups and
// assigns the correct notifiers to each.
type Dispatcher struct {
	routes   Routes          // installed by ApplyConfig; read by the run loop under mtx
	alerts   provider.Alerts // alert source; Run subscribes to it
	notifier Notifier        // receives the alert batches flushed by aggregation groups

	// aggrGroups maps the fingerprint of a group's grouping labels to the
	// group. Run recreates it; in this file it is only accessed from the
	// run loop (directly and via processAlert), without locking.
	aggrGroups map[model.Fingerprint]*aggrGroup

	mtx    sync.RWMutex    // guards routes
	done   chan struct{}   // Stop blocks until this channel is closed
	ctx    context.Context // parent context of every aggregation group
	cancel func()          // non-nil while the dispatcher runs; cancels ctx
}
|
|
|
|
// NewDispatcher returns a new Dispatcher.
|
|
func NewDispatcher(ap provider.Alerts, n Notifier) *Dispatcher {
|
|
return &Dispatcher{
|
|
alerts: ap,
|
|
notifier: n,
|
|
}
|
|
}
|
|
|
|
// ApplyConfig updates the dispatcher to match the new configuration.
|
|
func (d *Dispatcher) ApplyConfig(conf *config.Config) {
|
|
d.mtx.Lock()
|
|
defer d.mtx.Unlock()
|
|
|
|
// If a cancelation function is set, the dispatcher is running.
|
|
if d.cancel != nil {
|
|
d.Stop()
|
|
defer func() { go d.Run() }()
|
|
}
|
|
|
|
d.routes = NewRoutes(conf.Routes)
|
|
}
|
|
|
|
// Run starts dispatching alerts incoming via the updates channel.
|
|
func (d *Dispatcher) Run() {
|
|
d.done = make(chan struct{})
|
|
d.aggrGroups = map[model.Fingerprint]*aggrGroup{}
|
|
|
|
d.ctx, d.cancel = context.WithCancel(context.Background())
|
|
|
|
d.run(d.alerts.Subscribe())
|
|
}
|
|
|
|
func (d *Dispatcher) run(it provider.AlertIterator) {
|
|
cleanup := time.NewTicker(15 * time.Second)
|
|
defer cleanup.Stop()
|
|
|
|
defer it.Close()
|
|
|
|
for {
|
|
select {
|
|
case alert := <-it.Next():
|
|
// Log errors but keep trying
|
|
if err := it.Err(); err != nil {
|
|
log.Errorf("Error on alert update: %s", err)
|
|
continue
|
|
}
|
|
d.mtx.RLock()
|
|
routes := d.routes.Match(alert.Labels)
|
|
d.mtx.RUnlock()
|
|
|
|
for _, r := range routes {
|
|
d.processAlert(alert, r)
|
|
}
|
|
|
|
case <-cleanup.C:
|
|
for _, ag := range d.aggrGroups {
|
|
if ag.empty() {
|
|
ag.stop()
|
|
delete(d.aggrGroups, ag.fingerprint())
|
|
}
|
|
}
|
|
|
|
case <-d.ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// Stop the dispatcher.
|
|
func (d *Dispatcher) Stop() {
|
|
d.cancel()
|
|
d.cancel = nil
|
|
|
|
<-d.done
|
|
}
|
|
|
|
// notifyFunc is a function that performs notifcation for the alert
|
|
// with the given fingerprint. It aborts on context cancelation.
|
|
// Returns false iff notifying failed.
|
|
type notifyFunc func(context.Context, ...*types.Alert) bool
|
|
|
|
// processAlert determins in which aggregation group the alert falls
|
|
// and insert it.
|
|
func (d *Dispatcher) processAlert(alert *types.Alert, opts *RouteOpts) {
|
|
group := model.LabelSet{}
|
|
|
|
for ln, lv := range alert.Labels {
|
|
if _, ok := opts.GroupBy[ln]; ok {
|
|
group[ln] = lv
|
|
}
|
|
}
|
|
|
|
fp := group.Fingerprint()
|
|
|
|
// If the group does not exist, create it.
|
|
ag, ok := d.aggrGroups[fp]
|
|
if !ok {
|
|
ag = newAggrGroup(d.ctx, group, opts)
|
|
d.aggrGroups[fp] = ag
|
|
|
|
go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool {
|
|
if err := d.notifier.Notify(ctx, alerts...); err != nil {
|
|
log.Errorf("Notify for %d alerts failed: %s", len(alerts), err)
|
|
return false
|
|
}
|
|
return true
|
|
})
|
|
}
|
|
|
|
ag.insert(alert)
|
|
}
|
|
|
|
// aggrGroup aggregates alert fingerprints into groups to which a
// common set of routing options applies.
// It emits notifications in the specified intervals.
type aggrGroup struct {
	labels model.LabelSet // grouping labels; their fingerprint identifies the group
	opts   *RouteOpts     // routing options used by the group (GroupWait, GroupInterval, SendTo)

	ctx    context.Context // derived from the ctx passed to newAggrGroup; cancelled by stop
	cancel func()          // cancels ctx
	done   chan struct{}   // closed when run returns; stop waits on it
	next   *time.Timer     // fires when the next flush is due

	mtx     sync.RWMutex
	alerts  map[model.Fingerprint]*types.Alert // current alerts keyed by alert fingerprint; accessed under mtx
	hasSent bool                               // set once flush has run on a non-empty group (regardless of notify's result)
}
|
|
|
|
// newAggrGroup returns a new aggregation group.
|
|
func newAggrGroup(ctx context.Context, labels model.LabelSet, opts *RouteOpts) *aggrGroup {
|
|
ag := &aggrGroup{
|
|
labels: labels,
|
|
opts: opts,
|
|
alerts: map[model.Fingerprint]*types.Alert{},
|
|
}
|
|
ag.ctx, ag.cancel = context.WithCancel(ctx)
|
|
|
|
return ag
|
|
}
|
|
|
|
func (ag *aggrGroup) String() string {
|
|
return fmt.Sprintf("%x", ag.fingerprint())
|
|
}
|
|
|
|
func (ag *aggrGroup) run(notify notifyFunc) {
|
|
ag.done = make(chan struct{})
|
|
|
|
// Set an initial one-time wait before flushing
|
|
// the first batch of notifications.
|
|
ag.next = time.NewTimer(ag.opts.GroupWait)
|
|
|
|
defer close(ag.done)
|
|
defer ag.next.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ag.next.C:
|
|
// Give the notifcations time until the next flush to
|
|
// finish before terminating them.
|
|
ctx, _ := context.WithTimeout(ag.ctx, ag.opts.GroupInterval)
|
|
|
|
// Populate context with the destination name.
|
|
ctx = context.WithValue(ctx, notifyName, ag.opts.SendTo)
|
|
|
|
// Wait the configured interval before calling flush again.
|
|
ag.next.Reset(ag.opts.GroupInterval)
|
|
|
|
ag.flush(func(alerts ...*types.Alert) bool {
|
|
return notify(ctx, alerts...)
|
|
})
|
|
|
|
case <-ag.ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (ag *aggrGroup) stop() {
|
|
// Calling cancel will terminate all in-process notifications
|
|
// and the run() loop.
|
|
ag.cancel()
|
|
<-ag.done
|
|
}
|
|
|
|
func (ag *aggrGroup) fingerprint() model.Fingerprint {
|
|
return ag.labels.Fingerprint()
|
|
}
|
|
|
|
// insert the alert into the aggregation group. If the aggregation group
|
|
// is empty afterwards, true is returned.
|
|
func (ag *aggrGroup) insert(alert *types.Alert) {
|
|
ag.mtx.Lock()
|
|
defer ag.mtx.Unlock()
|
|
|
|
ag.alerts[alert.Fingerprint()] = alert
|
|
|
|
// Immediately trigger a flush if the wait duration for this
|
|
// alert is already over.
|
|
if !ag.hasSent && alert.Timestamp.Add(ag.opts.GroupWait).Before(time.Now()) {
|
|
ag.next.Reset(0)
|
|
}
|
|
}
|
|
|
|
func (ag *aggrGroup) empty() bool {
|
|
ag.mtx.RLock()
|
|
defer ag.mtx.RUnlock()
|
|
|
|
return len(ag.alerts) == 0
|
|
}
|
|
|
|
// flush sends notifications for all new alerts.
|
|
func (ag *aggrGroup) flush(notify func(...*types.Alert) bool) {
|
|
if ag.empty() {
|
|
return
|
|
}
|
|
ag.mtx.Lock()
|
|
|
|
var (
|
|
alerts = make(map[model.Fingerprint]*types.Alert, len(ag.alerts))
|
|
alertsSlice = make([]*types.Alert, 0, len(ag.alerts))
|
|
)
|
|
for fp, alert := range ag.alerts {
|
|
alerts[fp] = alert
|
|
alertsSlice = append(alertsSlice, alert)
|
|
}
|
|
|
|
ag.mtx.Unlock()
|
|
|
|
if notify(alertsSlice...) {
|
|
ag.mtx.Lock()
|
|
for fp, a := range alerts {
|
|
// Only delete if the fingerprint has not been inserted
|
|
// again since we notified about it.
|
|
if a.Resolved() && ag.alerts[fp] == a {
|
|
delete(ag.alerts, fp)
|
|
}
|
|
}
|
|
ag.mtx.Unlock()
|
|
}
|
|
|
|
ag.hasSent = true
|
|
}
|