alertmanager/notify/notify.go

604 lines
17 KiB
Go

// Copyright 2015 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package notify
import (
"bytes"
"crypto/sha256"
"encoding/binary"
"fmt"
"sync"
"time"
"github.com/cenkalti/backoff"
"github.com/golang/protobuf/ptypes"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"golang.org/x/net/context"
"github.com/prometheus/alertmanager/config"
"github.com/prometheus/alertmanager/inhibit"
"github.com/prometheus/alertmanager/nflog"
"github.com/prometheus/alertmanager/nflog/nflogpb"
"github.com/prometheus/alertmanager/silence"
"github.com/prometheus/alertmanager/template"
"github.com/prometheus/alertmanager/types"
)
var (
numNotifications = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "alertmanager",
Name: "notifications_total",
Help: "The total number of attempted notifications.",
}, []string{"integration"})
numFailedNotifications = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "alertmanager",
Name: "notifications_failed_total",
Help: "The total number of failed notifications.",
}, []string{"integration"})
)
func init() {
prometheus.Register(numNotifications)
prometheus.Register(numFailedNotifications)
}
// MinTimeout is the minimum timeout that is set for the context of a call
// to a notification pipeline.
const MinTimeout = 10 * time.Second
// notifyKey defines a custom type with which a context is populated to
// avoid accidental collisions.
type notifyKey int
const (
keyReceiverName notifyKey = iota
keyRepeatInterval
keyGroupLabels
keyGroupKey
keyNotificationHash
keyNow
)
// WithReceiverName populates a context with a receiver name.
func WithReceiverName(ctx context.Context, rcv string) context.Context {
return context.WithValue(ctx, keyReceiverName, rcv)
}
// WithGroupKey populates a context with a group key.
func WithGroupKey(ctx context.Context, fp model.Fingerprint) context.Context {
return context.WithValue(ctx, keyGroupKey, fp)
}
// WithNotificationHash populates a context with a notification hash.
func WithNotificationHash(ctx context.Context, hash []byte) context.Context {
return context.WithValue(ctx, keyNotificationHash, hash)
}
// WithGroupLabels populates a context with grouping labels.
func WithGroupLabels(ctx context.Context, lset model.LabelSet) context.Context {
return context.WithValue(ctx, keyGroupLabels, lset)
}
// WithNow populates a context with a now timestamp.
func WithNow(ctx context.Context, t time.Time) context.Context {
return context.WithValue(ctx, keyNow, t)
}
// WithRepeatInterval populates a context with a repeat interval.
func WithRepeatInterval(ctx context.Context, t time.Duration) context.Context {
return context.WithValue(ctx, keyRepeatInterval, t)
}
// RepeatInterval extracts a repeat interval from the context. Iff none exists, the
// second argument is false.
func RepeatInterval(ctx context.Context) (time.Duration, bool) {
v, ok := ctx.Value(keyRepeatInterval).(time.Duration)
return v, ok
}
// ReceiverName extracts a receiver name from the context. Iff none exists, the
// second argument is false.
func ReceiverName(ctx context.Context) (string, bool) {
v, ok := ctx.Value(keyReceiverName).(string)
return v, ok
}
func receiverName(ctx context.Context) string {
recv, ok := ReceiverName(ctx)
if !ok {
log.Error("missing receiver")
}
return recv
}
// GroupKey extracts a group key from the context. Iff none exists, the
// second argument is false.
func GroupKey(ctx context.Context) (model.Fingerprint, bool) {
v, ok := ctx.Value(keyGroupKey).(model.Fingerprint)
return v, ok
}
func groupLabels(ctx context.Context) model.LabelSet {
groupLabels, ok := GroupLabels(ctx)
if !ok {
log.Error("missing group labels")
}
return groupLabels
}
// GroupLabels extracts grouping label set from the context. Iff none exists, the
// second argument is false.
func GroupLabels(ctx context.Context) (model.LabelSet, bool) {
v, ok := ctx.Value(keyGroupLabels).(model.LabelSet)
return v, ok
}
// Now extracts a now timestamp from the context. Iff none exists, the
// second argument is false.
func Now(ctx context.Context) (time.Time, bool) {
v, ok := ctx.Value(keyNow).(time.Time)
return v, ok
}
// NotificationHash extracts a notification hash from the context. Iff none exists,
// the second argument is false.
func NotificationHash(ctx context.Context) ([]byte, bool) {
v, ok := ctx.Value(keyNotificationHash).([]byte)
return v, ok
}
// A Stage processes alerts under the constraints of the given context.
type Stage interface {
Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error)
}
// StageFunc wraps a function to represent a Stage.
type StageFunc func(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error)
// Exec implements Stage interface.
func (f StageFunc) Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
return f(ctx, alerts...)
}
// BuildPipeline builds a map of receivers to Stages.
func BuildPipeline(
confs []*config.Receiver,
tmpl *template.Template,
wait func() time.Duration,
inhibitor *inhibit.Inhibitor,
silences *silence.Silences,
notificationLog nflog.Log,
marker types.Marker,
) RoutingStage {
rs := RoutingStage{}
is := NewInhibitStage(inhibitor, marker)
ss := NewSilenceStage(silences, marker)
for _, rc := range confs {
rs[rc.Name] = MultiStage{is, ss, createStage(rc, tmpl, wait, notificationLog)}
}
return rs
}
// createStage creates a pipeline of stages for a receiver.
func createStage(rc *config.Receiver, tmpl *template.Template, wait func() time.Duration, notificationLog nflog.Log) Stage {
var fs FanoutStage
for _, i := range BuildReceiverIntegrations(rc, tmpl) {
recv := &nflogpb.Receiver{
GroupName: rc.Name,
Integration: i.name,
Idx: uint32(i.idx),
}
var s MultiStage
s = append(s, NewWaitStage(wait))
s = append(s, NewDedupStage(notificationLog, recv))
s = append(s, NewRetryStage(i))
s = append(s, NewSetNotifiesStage(notificationLog, recv))
fs = append(fs, s)
}
return fs
}
// RoutingStage executes the inner stages based on the receiver specified in
// the context.
type RoutingStage map[string]Stage
// Exec implements the Stage interface.
func (rs RoutingStage) Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
receiver, ok := ReceiverName(ctx)
if !ok {
return ctx, nil, fmt.Errorf("receiver missing")
}
s, ok := rs[receiver]
if !ok {
return ctx, nil, fmt.Errorf("stage for receiver missing")
}
return s.Exec(ctx, alerts...)
}
// A MultiStage executes a series of stages sequencially.
type MultiStage []Stage
// Exec implements the Stage interface.
func (ms MultiStage) Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
var err error
for _, s := range ms {
if len(alerts) == 0 {
return ctx, nil, nil
}
ctx, alerts, err = s.Exec(ctx, alerts...)
if err != nil {
return ctx, nil, err
}
}
return ctx, alerts, nil
}
// FanoutStage executes its stages concurrently
type FanoutStage []Stage
// Exec attempts to execute all stages concurrently and discards the results.
// It returns its input alerts and a types.MultiError if one or more stages fail.
func (fs FanoutStage) Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
var (
wg sync.WaitGroup
me types.MultiError
)
wg.Add(len(fs))
for _, s := range fs {
go func(s Stage) {
if _, _, err := s.Exec(ctx, alerts...); err != nil {
me.Add(err)
log.Errorf("Error on notify: %s", err)
}
wg.Done()
}(s)
}
wg.Wait()
if me.Len() > 0 {
return ctx, alerts, &me
}
return ctx, alerts, nil
}
// InhibitStage filters alerts through an inhibition muter.
type InhibitStage struct {
muter types.Muter
marker types.Marker
}
// NewInhibitStage return a new InhibitStage.
func NewInhibitStage(m types.Muter, mk types.Marker) *InhibitStage {
return &InhibitStage{
muter: m,
marker: mk,
}
}
// Exec implements the Stage interface.
func (n *InhibitStage) Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
var filtered []*types.Alert
for _, a := range alerts {
ok := n.marker.Inhibited(a.Fingerprint())
// TODO(fabxc): increment total alerts counter.
// Do not send the alert if the silencer mutes it.
if !n.muter.Mutes(a.Labels) {
// TODO(fabxc): increment muted alerts counter.
filtered = append(filtered, a)
// Store whether a previously inhibited alert is firing again.
a.WasInhibited = ok
}
}
return ctx, filtered, nil
}
// SilenceStage filters alerts through a silence muter.
type SilenceStage struct {
silences *silence.Silences
marker types.Marker
}
// NewSilenceStage returns a new SilenceStage.
func NewSilenceStage(s *silence.Silences, mk types.Marker) *SilenceStage {
return &SilenceStage{
silences: s,
marker: mk,
}
}
// Exec implements the Stage interface.
func (n *SilenceStage) Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
var filtered []*types.Alert
for _, a := range alerts {
_, ok := n.marker.Silenced(a.Fingerprint())
// TODO(fabxc): increment total alerts counter.
// Do not send the alert if the silencer mutes it.
sils, err := n.silences.Query(
silence.QState(silence.StateActive),
silence.QMatches(a.Labels),
)
if err != nil {
log.Errorf("Querying silences failed: %s", err)
}
if len(sils) == 0 {
// TODO(fabxc): increment muted alerts counter.
filtered = append(filtered, a)
n.marker.SetSilenced(a.Labels.Fingerprint())
// Store whether a previously silenced alert is firing again.
a.WasSilenced = ok
} else {
n.marker.SetSilenced(a.Labels.Fingerprint(), sils[0].Id)
}
}
return ctx, filtered, nil
}
// WaitStage waits for a certain amount of time before continuing or until the
// context is done.
type WaitStage struct {
wait func() time.Duration
}
// NewWaitStage returns a new WaitStage.
func NewWaitStage(wait func() time.Duration) *WaitStage {
return &WaitStage{
wait: wait,
}
}
// Exec implements the Stage interface.
func (ws *WaitStage) Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
select {
case <-time.After(ws.wait()):
case <-ctx.Done():
return ctx, nil, ctx.Err()
}
return ctx, alerts, nil
}
// DedupStage filters alerts.
// Filtering happens based on a notification log.
type DedupStage struct {
nflog nflog.Log
recv *nflogpb.Receiver
// TODO(fabxc): consider creating an AlertBatch type received
// by stages that implements these functions.
// This can then also handle caching so we can skip passing
// the hash around as a context.
hash func([]*types.Alert) []byte
resolved func([]*types.Alert) bool
now func() time.Time
}
// NewDedupStage wraps a DedupStage that runs against the given notification log.
func NewDedupStage(l nflog.Log, recv *nflogpb.Receiver) *DedupStage {
return &DedupStage{
nflog: l,
recv: recv,
hash: hashAlerts,
resolved: allAlertsResolved,
now: utcNow,
}
}
func utcNow() time.Time {
return time.Now().UTC()
}
// TODO(fabxc): this could get slow, but is fine for now. We may want to
// have something mor sophisticated at some point.
// Alternatives are FNV64a as in fingerprints or xxhash.
func hashAlerts(alerts []*types.Alert) []byte {
// The xor'd sum so we don't have to sort the alerts.
// XXX(fabxc): this approach caused collision issues with FNV64a in
// the past. However, sha256 should not suffer from the bit cancelation
// in in small input changes.
xsum := [sha256.Size]byte{}
for _, a := range alerts {
b := make([]byte, 9)
binary.BigEndian.PutUint64(b, uint64(a.Fingerprint()))
// Resolved status is part of the identity.
if a.Resolved() {
b[8] = 1
}
for i, b := range sha256.Sum256(b) {
xsum[i] ^= b
}
}
return xsum[:]
}
func allAlertsResolved(alerts []*types.Alert) bool {
for _, a := range alerts {
if !a.Resolved() {
return false
}
}
return true
}
func (n *DedupStage) needsUpdate(entry *nflogpb.Entry, hash []byte, resolved bool, repeat time.Duration) (bool, error) {
// If we haven't notified about the alert group before, notify right away
// unless we only have resolved alerts.
if entry == nil {
return !resolved, nil
}
// Check whether the contents have changed.
if !bytes.Equal(entry.GroupHash, hash) {
return true, nil
}
// Nothing changed, only notify if the repeat interval has passed.
ts, err := ptypes.Timestamp(entry.Timestamp)
if err != nil {
return false, err
}
return ts.Before(n.now().Add(-repeat)), nil
}
// Exec implements the Stage interface.
func (n *DedupStage) Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
// TODO(fabxc): GroupKey will turn into []byte eventually.
gkey, ok := GroupKey(ctx)
if !ok {
return ctx, nil, fmt.Errorf("group key missing")
}
gkeyb := make([]byte, 8)
binary.BigEndian.PutUint64(gkeyb, uint64(gkey))
repeatInterval, ok := RepeatInterval(ctx)
if !ok {
return ctx, nil, fmt.Errorf("repeat interval missing")
}
hash := n.hash(alerts)
resolved := n.resolved(alerts)
ctx = WithNotificationHash(ctx, hash)
entries, err := n.nflog.Query(nflog.QGroupKey(gkeyb), nflog.QReceiver(n.recv))
if err != nil && err != nflog.ErrNotFound {
return ctx, nil, err
}
var entry *nflogpb.Entry
switch len(entries) {
case 0:
case 1:
entry = entries[0]
case 2:
return ctx, nil, fmt.Errorf("Unexpected entry result size %d", len(entries))
}
if ok, err := n.needsUpdate(entry, hash, resolved, repeatInterval); err != nil {
return ctx, nil, err
} else if ok {
return ctx, alerts, nil
}
return ctx, nil, nil
}
// RetryStage notifies via passed integration with exponential backoff until it
// succeeds. It aborts if the context is canceled or timed out.
type RetryStage struct {
integration Integration
}
// NewRetryStage returns a new instance of a RetryStage.
func NewRetryStage(i Integration) *RetryStage {
return &RetryStage{
integration: i,
}
}
// Exec implements the Stage interface.
func (r RetryStage) Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
var (
i = 0
b = backoff.NewExponentialBackOff()
tick = backoff.NewTicker(b)
iErr error
)
defer tick.Stop()
for {
i++
// Always check the context first to not notify again.
select {
case <-ctx.Done():
if iErr != nil {
return ctx, nil, iErr
}
return ctx, nil, ctx.Err()
default:
}
select {
case <-tick.C:
if retry, err := r.integration.Notify(ctx, alerts...); err != nil {
numFailedNotifications.WithLabelValues(r.integration.name).Inc()
log.Debugf("Notify attempt %d failed: %s", i, err)
if !retry {
return ctx, alerts, fmt.Errorf("Cancelling notify retry due to unrecoverable error: %s", err)
}
// Save this error to be able to return the last seen error by an
// integration upon context timeout.
iErr = err
} else {
numNotifications.WithLabelValues(r.integration.name).Inc()
return ctx, alerts, nil
}
case <-ctx.Done():
if iErr != nil {
return ctx, nil, iErr
}
return ctx, nil, ctx.Err()
}
}
}
// SetNotifiesStage sets the notification information about passed alerts. The
// passed alerts should have already been sent to the receivers.
type SetNotifiesStage struct {
nflog nflog.Log
recv *nflogpb.Receiver
resolved func([]*types.Alert) bool
}
// NewSetNotifiesStage returns a new instance of a SetNotifiesStage.
func NewSetNotifiesStage(l nflog.Log, recv *nflogpb.Receiver) *SetNotifiesStage {
return &SetNotifiesStage{
nflog: l,
recv: recv,
resolved: allAlertsResolved,
}
}
// Exec implements the Stage interface.
func (n SetNotifiesStage) Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
hash, ok := NotificationHash(ctx)
if !ok {
return ctx, nil, fmt.Errorf("notification hash missing")
}
gkey, ok := GroupKey(ctx)
if !ok {
return ctx, nil, fmt.Errorf("group key missing")
}
gkeyb := make([]byte, 8)
binary.BigEndian.PutUint64(gkeyb, uint64(gkey))
if n.resolved(alerts) {
return ctx, alerts, n.nflog.LogResolved(n.recv, gkeyb, hash)
}
return ctx, alerts, n.nflog.LogActive(n.recv, gkeyb, hash)
}