alertmanager/notify/notify.go
Frederic Branczyk dcf2b3afcb
notify: move resolved alert filtering to integration
Resolved alerts, even when filtered, have to end up in the
SetNotifiesStage, otherwise when an alert fires again it is ambiguous
whether it was resolved in between or not.

fixes #523
2016-10-05 17:45:35 +02:00

604 lines
17 KiB
Go

// Copyright 2015 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package notify
import (
"bytes"
"crypto/sha256"
"encoding/binary"
"fmt"
"sync"
"time"
"github.com/cenkalti/backoff"
"github.com/golang/protobuf/ptypes"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"golang.org/x/net/context"
"github.com/prometheus/alertmanager/config"
"github.com/prometheus/alertmanager/inhibit"
"github.com/prometheus/alertmanager/nflog"
"github.com/prometheus/alertmanager/nflog/nflogpb"
"github.com/prometheus/alertmanager/silence"
"github.com/prometheus/alertmanager/template"
"github.com/prometheus/alertmanager/types"
)
// Notification metrics. Both counter vectors carry a single "integration"
// label.
var (
// numNotifications is incremented by RetryStage.Exec after a Notify call
// returns without error.
// NOTE(review): the Help text says "attempted", but failed attempts are not
// counted here — confirm which meaning is intended.
numNotifications = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "alertmanager",
Name: "notifications_total",
Help: "The total number of attempted notifications.",
}, []string{"integration"})
// numFailedNotifications is incremented by RetryStage.Exec each time a
// Notify call returns an error, whether or not the attempt is retried.
numFailedNotifications = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "alertmanager",
Name: "notifications_failed_total",
Help: "The total number of failed notifications.",
}, []string{"integration"})
)
// init registers the notification counters through prometheus.Register.
// Whatever Register returns is discarded.
func init() {
	for _, c := range []prometheus.Collector{numNotifications, numFailedNotifications} {
		prometheus.Register(c)
	}
}
// MinTimeout is the minimum timeout (10 seconds) that is set for the context
// of a call to a notification pipeline.
// NOTE(review): none of the stages defined here read this value; presumably
// the caller that builds the context applies it — confirm.
const MinTimeout = 10 * time.Second
// notifyKey defines a custom type with which a context is populated to
// avoid accidental collisions.
type notifyKey int
// Context keys used by the With* setters and their matching getters.
const (
// keyReceiverName stores the receiver name (string).
keyReceiverName notifyKey = iota
// keyRepeatInterval stores the repeat interval (time.Duration).
keyRepeatInterval
// keyGroupLabels stores the grouping labels (model.LabelSet).
keyGroupLabels
// keyGroupKey stores the group key (model.Fingerprint).
keyGroupKey
// keyNotificationHash stores the notification hash ([]byte).
keyNotificationHash
// keyNow stores the "now" timestamp (time.Time).
keyNow
)
// WithReceiverName returns a context derived from ctx that carries rcv as
// the receiver name. The value is read back with ReceiverName.
func WithReceiverName(ctx context.Context, rcv string) context.Context {
return context.WithValue(ctx, keyReceiverName, rcv)
}
// WithGroupKey returns a context derived from ctx that carries fp as the
// group key. The value is read back with GroupKey.
func WithGroupKey(ctx context.Context, fp model.Fingerprint) context.Context {
return context.WithValue(ctx, keyGroupKey, fp)
}
// WithNotificationHash returns a context derived from ctx that carries hash
// as the notification hash. The slice is stored as given, not copied. The
// value is read back with NotificationHash.
func WithNotificationHash(ctx context.Context, hash []byte) context.Context {
return context.WithValue(ctx, keyNotificationHash, hash)
}
// WithGroupLabels returns a context derived from ctx that carries lset as
// the grouping labels. The value is read back with GroupLabels.
func WithGroupLabels(ctx context.Context, lset model.LabelSet) context.Context {
return context.WithValue(ctx, keyGroupLabels, lset)
}
// WithNow returns a context derived from ctx that carries t as the "now"
// timestamp. The value is read back with Now.
func WithNow(ctx context.Context, t time.Time) context.Context {
return context.WithValue(ctx, keyNow, t)
}
// WithRepeatInterval returns a context derived from ctx that carries t as
// the repeat interval. The value is read back with RepeatInterval.
func WithRepeatInterval(ctx context.Context, t time.Duration) context.Context {
return context.WithValue(ctx, keyRepeatInterval, t)
}
// RepeatInterval returns the repeat interval stored in ctx. The boolean is
// false if no time.Duration is stored under the repeat interval key.
func RepeatInterval(ctx context.Context) (time.Duration, bool) {
	interval, found := ctx.Value(keyRepeatInterval).(time.Duration)
	return interval, found
}
// ReceiverName returns the receiver name stored in ctx. The boolean is false
// if no string is stored under the receiver name key.
func ReceiverName(ctx context.Context) (string, bool) {
	name, found := ctx.Value(keyReceiverName).(string)
	return name, found
}
// receiverName returns the receiver name stored in ctx. If none is present
// it logs an error and returns the empty string.
func receiverName(ctx context.Context) string {
	name, found := ReceiverName(ctx)
	if found {
		return name
	}
	log.Error("missing receiver")
	return name
}
// GroupKey returns the group key stored in ctx. The boolean is false if no
// model.Fingerprint is stored under the group key.
func GroupKey(ctx context.Context) (model.Fingerprint, bool) {
	key, found := ctx.Value(keyGroupKey).(model.Fingerprint)
	return key, found
}
// groupLabels returns the grouping labels stored in ctx. If none are present
// it logs an error and returns the zero-value label set.
func groupLabels(ctx context.Context) model.LabelSet {
	lset, found := GroupLabels(ctx)
	if found {
		return lset
	}
	log.Error("missing group labels")
	return lset
}
// GroupLabels returns the grouping label set stored in ctx. The boolean is
// false if no model.LabelSet is stored under the group labels key.
func GroupLabels(ctx context.Context) (model.LabelSet, bool) {
	lset, found := ctx.Value(keyGroupLabels).(model.LabelSet)
	return lset, found
}
// Now returns the "now" timestamp stored in ctx. The boolean is false if no
// time.Time is stored under the now key.
func Now(ctx context.Context) (time.Time, bool) {
	ts, found := ctx.Value(keyNow).(time.Time)
	return ts, found
}
// NotificationHash returns the notification hash stored in ctx. The boolean
// is false if no []byte is stored under the notification hash key.
func NotificationHash(ctx context.Context) ([]byte, bool) {
	sum, found := ctx.Value(keyNotificationHash).([]byte)
	return sum, found
}
// A Stage processes alerts under the constraints of the given context.
//
// Exec returns the context to hand to the next stage, the alerts that
// remain after processing, and an error. The stages in this file return a nil
// alert slice when they filter everything out or fail.
type Stage interface {
Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error)
}
// StageFunc wraps a function to represent a Stage, so a plain function with
// the Exec signature can be used wherever a Stage is expected.
type StageFunc func(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error)
// Exec implements Stage interface by calling f with the given context and
// alerts and returning its results unchanged.
func (f StageFunc) Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
return f(ctx, alerts...)
}
// BuildPipeline builds a RoutingStage that maps every receiver name in confs
// to its pipeline: inhibition, then silencing, then the receiver's fan-out of
// integrations. One inhibit stage and one silence stage are shared by all
// receivers.
func BuildPipeline(
	confs []*config.Receiver,
	tmpl *template.Template,
	wait func() time.Duration,
	inhibitor *inhibit.Inhibitor,
	silences *silence.Silences,
	notificationLog nflog.Log,
	marker types.Marker,
) RoutingStage {
	var (
		pipeline     = RoutingStage{}
		inhibitStage = NewInhibitStage(inhibitor, marker)
		silenceStage = NewSilenceStage(silences, marker)
	)
	for _, receiver := range confs {
		pipeline[receiver.Name] = MultiStage{
			inhibitStage,
			silenceStage,
			createStage(receiver, tmpl, wait, notificationLog),
		}
	}
	return pipeline
}
// createStage creates the fan-out stage for a receiver. Every integration
// returned by BuildReceiverIntegrations gets its own sequence of
// wait -> dedup -> retry -> set-notifies stages, and the dedup and
// set-notifies stages share one notification log receiver descriptor.
func createStage(rc *config.Receiver, tmpl *template.Template, wait func() time.Duration, notificationLog nflog.Log) Stage {
	var fanout FanoutStage
	for _, integration := range BuildReceiverIntegrations(rc, tmpl) {
		recv := &nflogpb.Receiver{
			GroupName:   rc.Name,
			Integration: integration.name,
			Idx:         uint32(integration.idx),
		}
		fanout = append(fanout, MultiStage{
			NewWaitStage(wait),
			NewDedupStage(notificationLog, recv),
			NewRetryStage(integration),
			NewSetNotifiesStage(notificationLog, recv),
		})
	}
	return fanout
}
// RoutingStage executes the inner stage registered for the receiver named in
// the context. Keys are receiver names.
type RoutingStage map[string]Stage

// Exec implements the Stage interface. It fails if the context carries no
// receiver name or if no stage is registered under that name; otherwise it
// delegates to the registered stage.
func (rs RoutingStage) Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
	name, ok := ReceiverName(ctx)
	if !ok {
		return ctx, nil, fmt.Errorf("receiver missing")
	}
	if stage, found := rs[name]; found {
		return stage.Exec(ctx, alerts...)
	}
	return ctx, nil, fmt.Errorf("stage for receiver missing")
}
// A MultiStage executes a series of stages sequentially.
type MultiStage []Stage

// Exec implements the Stage interface. Each stage receives the context and
// alerts produced by the previous one. Execution stops early, without error,
// as soon as no alerts are left, and stops with the stage's error if a stage
// fails.
func (ms MultiStage) Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
	for i := range ms {
		if len(alerts) == 0 {
			return ctx, nil, nil
		}
		nextCtx, remaining, err := ms[i].Exec(ctx, alerts...)
		if err != nil {
			return nextCtx, nil, err
		}
		ctx, alerts = nextCtx, remaining
	}
	return ctx, alerts, nil
}
// FanoutStage executes its stages concurrently.
type FanoutStage []Stage

// Exec runs every stage in its own goroutine with the same context and
// alerts, waits for all of them to finish and discards their results.
// It returns its input context and alerts, plus a *types.MultiError holding
// the collected errors if one or more stages failed.
func (fs FanoutStage) Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
	var (
		pending sync.WaitGroup
		errs    types.MultiError
	)
	for _, stage := range fs {
		pending.Add(1)
		go func(s Stage) {
			defer pending.Done()
			_, _, err := s.Exec(ctx, alerts...)
			if err == nil {
				return
			}
			// NOTE(review): errs.Add is reached from several goroutines at
			// once; confirm that types.MultiError synchronizes internally.
			errs.Add(err)
			log.Errorf("Error on notify: %s", err)
		}(stage)
	}
	pending.Wait()
	if errs.Len() == 0 {
		return ctx, alerts, nil
	}
	return ctx, alerts, &errs
}
// InhibitStage filters alerts through an inhibition muter.
type InhibitStage struct {
	// muter decides whether an alert's label set is currently muted.
	muter types.Muter
	// marker is queried for the previously recorded inhibition status.
	marker types.Marker
}

// NewInhibitStage returns a new InhibitStage that uses m as muter and mk as
// marker.
func NewInhibitStage(m types.Muter, mk types.Marker) *InhibitStage {
	return &InhibitStage{muter: m, marker: mk}
}
// Exec implements the Stage interface. It drops every alert whose labels are
// muted by the muter and returns the rest. On each alert that passes,
// WasInhibited is set to the inhibition status the marker reported before the
// muter was consulted.
func (n *InhibitStage) Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
	var passed []*types.Alert
	for _, alert := range alerts {
		// Read the previous status first; the muter is consulted afterwards.
		wasInhibited := n.marker.Inhibited(alert.Fingerprint())
		// TODO(fabxc): increment total alerts counter.
		// Do not send the alert if the inhibitor mutes it.
		if n.muter.Mutes(alert.Labels) {
			// TODO(fabxc): increment muted alerts counter.
			continue
		}
		// Store whether a previously inhibited alert is firing again.
		alert.WasInhibited = wasInhibited
		passed = append(passed, alert)
	}
	return ctx, passed, nil
}
// SilenceStage filters alerts through a silence muter.
type SilenceStage struct {
	// silences is queried for active silences matching an alert's labels.
	silences *silence.Silences
	// marker records, per alert fingerprint, the silence status.
	marker types.Marker
}

// NewSilenceStage returns a new SilenceStage backed by s and mk.
func NewSilenceStage(s *silence.Silences, mk types.Marker) *SilenceStage {
	return &SilenceStage{silences: s, marker: mk}
}
// Exec implements the Stage interface. For every alert it queries the active
// silences matching the alert's labels. Alerts with at least one match are
// dropped and marked with the ID of the first match; all others are kept and
// have their marker cleared. A failed query is only logged, and the alert is
// then handled according to whatever result slice the query returned.
func (n *SilenceStage) Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
	var kept []*types.Alert
	for _, alert := range alerts {
		_, wasSilenced := n.marker.Silenced(alert.Fingerprint())
		// TODO(fabxc): increment total alerts counter.
		// Do not send the alert if the silencer mutes it.
		matching, err := n.silences.Query(
			silence.QState(silence.StateActive),
			silence.QMatches(alert.Labels),
		)
		if err != nil {
			log.Errorf("Querying silences failed: %s", err)
		}
		if len(matching) != 0 {
			n.marker.SetSilenced(alert.Labels.Fingerprint(), matching[0].Id)
			continue
		}
		// TODO(fabxc): increment muted alerts counter.
		kept = append(kept, alert)
		n.marker.SetSilenced(alert.Labels.Fingerprint())
		// Store whether a previously silenced alert is firing again.
		alert.WasSilenced = wasSilenced
	}
	return ctx, kept, nil
}
// WaitStage waits for a certain amount of time before continuing or until the
// context is done.
type WaitStage struct {
	// wait returns how long Exec should block; it is called on every Exec.
	wait func() time.Duration
}

// NewWaitStage returns a new WaitStage that asks wait for its delay.
func NewWaitStage(wait func() time.Duration) *WaitStage {
	return &WaitStage{wait: wait}
}
// Exec implements the Stage interface. It blocks for the duration returned by
// the stage's wait function and then passes the alerts through unchanged. If
// the context is done first, it returns no alerts and the context's error.
func (ws *WaitStage) Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
	// Use an explicit timer so it can be released as soon as the context is
	// done; a time.After timer would stay pending until it fires.
	timer := time.NewTimer(ws.wait())
	defer timer.Stop()

	select {
	case <-timer.C:
		return ctx, alerts, nil
	case <-ctx.Done():
		return ctx, nil, ctx.Err()
	}
}
// DedupStage filters alerts.
// Filtering happens based on a notification log.
type DedupStage struct {
	// nflog is the notification log queried for previous notifications.
	nflog nflog.Log
	// recv identifies the receiver integration whose log entries are read.
	recv *nflogpb.Receiver

	// TODO(fabxc): consider creating an AlertBatch type received
	// by stages that implements these functions.
	// This can then also handle caching so we can skip passing
	// the hash around as a context.

	// hash computes the hash of an alert batch.
	hash func([]*types.Alert) []byte
	// resolved reports whether an alert batch counts as resolved.
	resolved func([]*types.Alert) bool
	// now returns the current time.
	now func() time.Time
}

// NewDedupStage wraps a DedupStage that runs against the given notification
// log. The stage hashes with hashAlerts, checks resolution with
// allAlertsResolved and reads the clock through utcNow.
func NewDedupStage(l nflog.Log, recv *nflogpb.Receiver) *DedupStage {
	stage := &DedupStage{nflog: l, recv: recv}
	stage.hash = hashAlerts
	stage.resolved = allAlertsResolved
	stage.now = utcNow
	return stage
}
// utcNow returns the current time with its location set to UTC.
func utcNow() time.Time {
	now := time.Now()
	return now.UTC()
}
// hashAlerts returns an order-independent hash of the given alerts. Each
// alert contributes the SHA-256 of its 8-byte big-endian fingerprint followed
// by one status byte (1 if resolved, 0 otherwise); the digests are XORed
// together. An empty input yields sha256.Size zero bytes.
//
// TODO(fabxc): this could get slow, but is fine for now. We may want to
// have something more sophisticated at some point.
// Alternatives are FNV64a as in fingerprints or xxhash.
func hashAlerts(alerts []*types.Alert) []byte {
	// XOR the digests so we don't have to sort the alerts.
	// XXX(fabxc): this approach caused collision issues with FNV64a in
	// the past. However, sha256 should not suffer from the bit cancellation
	// in small input changes.
	var acc [sha256.Size]byte
	for _, alert := range alerts {
		var buf [9]byte
		binary.BigEndian.PutUint64(buf[:8], uint64(alert.Fingerprint()))
		// Resolved status is part of the identity.
		if alert.Resolved() {
			buf[8] = 1
		}
		digest := sha256.Sum256(buf[:])
		for i := range acc {
			acc[i] ^= digest[i]
		}
	}
	return acc[:]
}
// allAlertsResolved reports whether every alert in the slice is resolved.
// It returns true for an empty slice.
func allAlertsResolved(alerts []*types.Alert) bool {
	for i := range alerts {
		if !alerts[i].Resolved() {
			return false
		}
	}
	return true
}
// needsUpdate decides whether a notification has to be sent for a group.
// entry is the previous log entry for the group (nil if there is none), hash
// is the hash of the current alerts, resolved tells whether all current
// alerts are resolved, and repeat is the repeat interval. An error is returned
// only if the entry's timestamp cannot be converted.
func (n *DedupStage) needsUpdate(entry *nflogpb.Entry, hash []byte, resolved bool, repeat time.Duration) (bool, error) {
	switch {
	case entry == nil:
		// If we haven't notified about the alert group before, notify right
		// away unless we only have resolved alerts.
		return !resolved, nil
	case !bytes.Equal(entry.GroupHash, hash):
		// The contents have changed.
		return true, nil
	}
	// Nothing changed, only notify if the repeat interval has passed.
	last, err := ptypes.Timestamp(entry.Timestamp)
	if err != nil {
		return false, err
	}
	cutoff := n.now().Add(-repeat)
	return last.Before(cutoff), nil
}
// Exec implements the Stage interface. It hashes the alerts, stores the hash
// in the returned context, and looks up the notification log entry for the
// group key and receiver. The alerts are passed on only if needsUpdate says a
// notification is due; otherwise a nil slice is returned.
//
// It returns an error if the context lacks a group key or repeat interval,
// if the log query fails with anything other than nflog.ErrNotFound, or if
// the query yields more than one entry.
func (n *DedupStage) Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
	// TODO(fabxc): GroupKey will turn into []byte eventually.
	gkey, ok := GroupKey(ctx)
	if !ok {
		return ctx, nil, fmt.Errorf("group key missing")
	}
	gkeyb := make([]byte, 8)
	binary.BigEndian.PutUint64(gkeyb, uint64(gkey))

	repeatInterval, ok := RepeatInterval(ctx)
	if !ok {
		return ctx, nil, fmt.Errorf("repeat interval missing")
	}

	hash := n.hash(alerts)
	resolved := n.resolved(alerts)

	// Later stages read the hash from the context.
	ctx = WithNotificationHash(ctx, hash)

	entries, err := n.nflog.Query(nflog.QGroupKey(gkeyb), nflog.QReceiver(n.recv))
	if err != nil && err != nflog.ErrNotFound {
		return ctx, nil, err
	}

	var entry *nflogpb.Entry
	switch len(entries) {
	case 0:
		// No previous notification; entry stays nil.
	case 1:
		entry = entries[0]
	default:
		// Any count above one is unexpected, not only exactly two.
		return ctx, nil, fmt.Errorf("Unexpected entry result size %d", len(entries))
	}

	notify, err := n.needsUpdate(entry, hash, resolved, repeatInterval)
	if err != nil {
		return ctx, nil, err
	}
	if !notify {
		return ctx, nil, nil
	}
	return ctx, alerts, nil
}
// RetryStage notifies via passed integration with exponential backoff until it
// succeeds. It aborts if the context is canceled or timed out.
type RetryStage struct {
	// integration is the notification target invoked on every attempt.
	integration Integration
}

// NewRetryStage returns a new instance of a RetryStage for integration i.
func NewRetryStage(i Integration) *RetryStage {
	return &RetryStage{integration: i}
}
// Exec implements the Stage interface. It calls the integration's Notify on
// every tick of an exponential backoff ticker until a call succeeds, Notify
// reports the error as not retryable, or the context is done.
//
// On success it returns the alerts unchanged. On a non-retryable error it
// returns the alerts together with a wrapping error. When the context is
// done it returns no alerts and the last error seen from the integration, or
// the context's error if no attempt has failed yet.
func (r RetryStage) Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
var (
// i numbers the attempts; it is only used in the debug log message.
i = 0
b = backoff.NewExponentialBackOff()
tick = backoff.NewTicker(b)
// iErr holds the most recent retryable error returned by Notify.
iErr error
)
defer tick.Stop()
for {
i++
// Always check the context first to not notify again.
// This select has a default case, so it does not block.
select {
case <-ctx.Done():
if iErr != nil {
return ctx, nil, iErr
}
return ctx, nil, ctx.Err()
default:
}
// Block until either the next backoff tick or the end of the context.
select {
case <-tick.C:
if retry, err := r.integration.Notify(ctx, alerts...); err != nil {
// Every failed attempt is counted, retryable or not.
numFailedNotifications.WithLabelValues(r.integration.name).Inc()
log.Debugf("Notify attempt %d failed: %s", i, err)
if !retry {
return ctx, alerts, fmt.Errorf("Cancelling notify retry due to unrecoverable error: %s", err)
}
// Save this error to be able to return the last seen error by an
// integration upon context timeout.
iErr = err
} else {
// Only a successful attempt increments this counter.
numNotifications.WithLabelValues(r.integration.name).Inc()
return ctx, alerts, nil
}
case <-ctx.Done():
if iErr != nil {
return ctx, nil, iErr
}
return ctx, nil, ctx.Err()
}
}
}
// SetNotifiesStage sets the notification information about passed alerts. The
// passed alerts should have already been sent to the receivers.
type SetNotifiesStage struct {
	// nflog is the notification log that is written to.
	nflog nflog.Log
	// recv identifies the receiver integration the entries are logged for.
	recv *nflogpb.Receiver
	// resolved reports whether an alert batch counts as resolved.
	resolved func([]*types.Alert) bool
}

// NewSetNotifiesStage returns a new instance of a SetNotifiesStage that
// writes to l for recv and checks resolution with allAlertsResolved.
func NewSetNotifiesStage(l nflog.Log, recv *nflogpb.Receiver) *SetNotifiesStage {
	stage := &SetNotifiesStage{nflog: l, recv: recv}
	stage.resolved = allAlertsResolved
	return stage
}
// Exec implements the Stage interface. It reads the notification hash and
// group key from the context and records the notification in the log:
// as resolved if all alerts are resolved, as active otherwise. The alerts are
// returned unchanged together with the log call's error. It fails without
// logging if the hash or group key is missing from the context.
func (n SetNotifiesStage) Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
	hash, haveHash := NotificationHash(ctx)
	if !haveHash {
		return ctx, nil, fmt.Errorf("notification hash missing")
	}
	gkey, haveKey := GroupKey(ctx)
	if !haveKey {
		return ctx, nil, fmt.Errorf("group key missing")
	}

	// The log is keyed by the big-endian encoding of the group fingerprint.
	var keyBuf [8]byte
	binary.BigEndian.PutUint64(keyBuf[:], uint64(gkey))
	gkeyb := keyBuf[:]

	var err error
	if n.resolved(alerts) {
		err = n.nflog.LogResolved(n.recv, gkeyb, hash)
	} else {
		err = n.nflog.LogActive(n.recv, gkeyb, hash)
	}
	return ctx, alerts, err
}