alertmanager/manager/dispatch.go

351 lines
7.3 KiB
Go
Raw Normal View History

package manager
import (
"fmt"
"sync"
"time"
"github.com/prometheus/common/model"
"github.com/prometheus/log"
)
const ResolveTimeout = 35 * time.Second
// Dispatcher dispatches alerts. It is absed on the alert's data
// rather than the time they arrive. Thus it can recover it's state
// without persistence.
type Dispatcher struct {
state State
aggrGroups map[model.Fingerprint]*aggrGroup
notifiers map[string]Notifier
mtx sync.RWMutex
}
func NewDispatcher(state State, notifiers []Notifier) *Dispatcher {
disp := &Dispatcher{
state: state,
aggrGroups: map[model.Fingerprint]*aggrGroup{},
notifiers: map[string]Notifier{},
}
for _, n := range notifiers {
disp.notifiers[n.Name()] = n
}
return disp
}
func (d *Dispatcher) filter(alerts ...*Alert) ([]*Alert, error) {
silences, err := d.state.Silence().List()
if err != nil {
return nil, err
}
var sentAlerts []*Alert
for _, alert := range alerts {
add := true
// None of the existing silences must match the alert.
for _, sil := range silences {
if sil.Match(alert.Labels) {
add = false
break
}
}
if !add {
continue
}
// Filter out alerts that have already been sent.
ni, err := d.state.Notify().Get(alert.Fingerprint())
// Always try to send on error as the safest option.
if err == nil && ni.LastSent.Before(alert.ResolvedAt) && ni.LastResolved == alert.Resolved() {
continue
}
sentAlerts = append(sentAlerts, alert)
}
return sentAlerts, nil
}
func (d *Dispatcher) notify(name string, alerts ...*Alert) error {
alerts, err := d.filter(alerts...)
if err != nil {
return err
}
if len(alerts) == 0 {
return nil
}
d.mtx.RLock()
notifier, ok := d.notifiers[name]
d.mtx.RUnlock()
if !ok {
return fmt.Errorf("notifier %q does not exist", name)
}
if err = notifier.Send(alerts...); err != nil {
return err
}
for _, alert := range alerts {
_ = d.state.Notify().Set(alert.Fingerprint(), &NotifyInfo{
LastSent: time.Now(),
LastResolved: alert.Resolved(),
})
}
return nil
}
func (d *Dispatcher) Run() {
var (
updates = d.state.Alert().Iter()
cleanup = time.Tick(30 * time.Second)
)
for {
select {
case <-cleanup:
// Cleanup routine.
for _, ag := range d.aggrGroups {
if ag.empty() {
ag.stop()
delete(d.aggrGroups, ag.fingerprint())
}
}
case alert := <-updates:
conf, err := d.state.Config().Get()
if err != nil {
log.Error(err)
continue
}
for _, m := range conf.Routes.Match(alert.Labels) {
d.processAlert(alert, m)
}
if alert.ResolvedAt.IsZero() {
alert.ResolvedAt = alert.CreatedAt.Add(ResolveTimeout)
}
}
}
}
func (d *Dispatcher) processAlert(alert *Alert, opts *RouteOpts) {
group := model.LabelSet{}
for ln, lv := range alert.Labels {
if _, ok := opts.GroupBy[ln]; ok {
group[ln] = lv
}
}
fp := group.Fingerprint()
ag, ok := d.aggrGroups[fp]
if !ok {
ag = newAggrGroup(d, group, opts)
d.aggrGroups[fp] = ag
}
ag.insert(alert)
}
type Silence struct {
Matchers Matchers
// The numeric ID of the silence.
ID string
// Name/email of the silence creator.
CreatedBy string
// When the silence was first created (Unix timestamp).
CreatedAt, EndsAt time.Time
// Additional comment about the silence.
Comment string
}
func (sil *Silence) Match(lset model.LabelSet) bool {
now := time.Now()
if now.Before(sil.CreatedAt) || now.After(sil.EndsAt) {
return false
}
return sil.Matchers.Match(lset)
}
// Alert models an action triggered by Prometheus.
type Alert struct {
// Label value pairs for purpose of aggregation, matching, and disposition
// dispatching. This must minimally include an "alertname" label.
Labels model.LabelSet `json:"labels"`
// Extra key/value information which is not used for aggregation.
Payload map[string]string `json:"payload,omitempty"`
Summary string `json:"summary,omitempty"`
Description string `json:"description,omitempty"`
Runbook string `json:"runbook,omitempty"`
CreatedAt time.Time `json:"created_at,omitempty"`
ResolvedAt time.Time `json:"resolved_at,omitempty"`
// The authoritative timestamp.
Timestamp time.Time `json:"timestamp"`
}
// Name returns the name of the alert. It is equivalent to the "alertname" label.
func (a *Alert) Name() string {
return string(a.Labels[model.AlertNameLabel])
}
// Fingerprint returns a unique hash for the alert. It is equivalent to
// the fingerprint of the alert's label set.
func (a *Alert) Fingerprint() model.Fingerprint {
return a.Labels.Fingerprint()
}
func (a *Alert) String() string {
s := fmt.Sprintf("%s[%s]", a.Name(), a.Fingerprint())
if a.Resolved() {
return s + "[resolved]"
}
return s + "[active]"
}
func (a *Alert) Resolved() bool {
if a.ResolvedAt.IsZero() {
return false
}
return !a.ResolvedAt.After(time.Now())
}
// aggrGroup aggregates alerts into groups based on
// common values for a set of labels.
type aggrGroup struct {
dispatcher *Dispatcher
labels model.LabelSet
opts *RouteOpts
next *time.Timer
done chan struct{}
mtx sync.RWMutex
alerts map[model.Fingerprint]struct{}
hasSent bool
}
// newAggrGroup returns a new aggregation group and starts background processing
// that sends notifications about the contained alerts.
func newAggrGroup(d *Dispatcher, labels model.LabelSet, opts *RouteOpts) *aggrGroup {
ag := &aggrGroup{
dispatcher: d,
labels: labels,
opts: opts,
alerts: map[model.Fingerprint]struct{}{},
done: make(chan struct{}),
// Set an initial one-time wait before flushing
// the first batch of notifications.
next: time.NewTimer(opts.GroupWait),
}
go ag.run()
return ag
}
func (ag *aggrGroup) run() {
defer ag.next.Stop()
for {
select {
case <-ag.next.C:
ag.flush()
// Wait the configured interval before calling flush again.
ag.next.Reset(ag.opts.GroupInterval)
case <-ag.done:
return
}
}
}
func (ag *aggrGroup) stop() {
close(ag.done)
}
func (ag *aggrGroup) fingerprint() model.Fingerprint {
return ag.labels.Fingerprint()
}
// insert the alert into the aggregation group. If the aggregation group
// is empty afterwards, true is returned.
func (ag *aggrGroup) insert(alert *Alert) {
fp := alert.Fingerprint()
ag.mtx.Lock()
ag.alerts[fp] = struct{}{}
ag.mtx.Unlock()
// Immediately trigger a flush if the wait duration for this
// alert is already over.
if !ag.hasSent && alert.Timestamp.Add(ag.opts.GroupWait).Before(time.Now()) {
ag.next.Reset(0)
}
log.Infof("inserted %s", alert)
}
func (ag *aggrGroup) empty() bool {
ag.mtx.RLock()
defer ag.mtx.RUnlock()
return len(ag.alerts) == 0
}
// flush sends notifications for all new alerts.
func (ag *aggrGroup) flush() {
ag.mtx.Lock()
defer ag.mtx.Unlock()
var alerts []*Alert
for fp := range ag.alerts {
a, err := ag.dispatcher.state.Alert().Get(fp)
if err != nil {
log.Error(err)
continue
}
// TODO(fabxc): only delete if notify successful.
if a.Resolved() {
delete(ag.alerts, fp)
}
alerts = append(alerts, a)
}
ag.dispatcher.notify(ag.opts.SendTo, alerts...)
ag.hasSent = true
}
// alertTimeline is a list of alerts sorted by their timestamp.
type alertTimeline []*Alert
func (at alertTimeline) Len() int { return len(at) }
func (at alertTimeline) Less(i, j int) bool { return at[i].Timestamp.Before(at[j].Timestamp) }
func (at alertTimeline) Swap(i, j int) { at[i], at[j] = at[j], at[i] }