alertmanager/notify/notify.go

685 lines
20 KiB
Go
Raw Normal View History

2015-10-11 15:24:49 +00:00
// Copyright 2015 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2015-09-29 12:45:38 +00:00
package notify
import (
"context"
2015-09-29 12:45:38 +00:00
"fmt"
"sort"
2015-09-29 13:12:31 +00:00
"sync"
"time"
2015-09-29 12:45:38 +00:00
"github.com/cenkalti/backoff"
"github.com/cespare/xxhash"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
2016-08-12 17:18:26 +00:00
"github.com/prometheus/client_golang/prometheus"
2015-09-29 13:12:31 +00:00
"github.com/prometheus/common/model"
2015-09-29 12:45:38 +00:00
"github.com/prometheus/alertmanager/cluster"
"github.com/prometheus/alertmanager/inhibit"
"github.com/prometheus/alertmanager/nflog"
"github.com/prometheus/alertmanager/nflog/nflogpb"
2016-08-30 09:58:27 +00:00
"github.com/prometheus/alertmanager/silence"
2015-09-29 12:45:38 +00:00
"github.com/prometheus/alertmanager/types"
)
2016-08-12 17:18:26 +00:00
var (
numNotifications = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "alertmanager",
Name: "notifications_total",
Help: "The total number of attempted notifications.",
}, []string{"integration"})
numFailedNotifications = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "alertmanager",
Name: "notifications_failed_total",
Help: "The total number of failed notifications.",
}, []string{"integration"})
notificationLatencySeconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "alertmanager",
Name: "notification_latency_seconds",
Help: "The latency of notifications in seconds.",
Buckets: []float64{1, 5, 10, 15, 20},
}, []string{"integration"})
2016-08-12 17:18:26 +00:00
)
func init() {
numNotifications.WithLabelValues("email")
numNotifications.WithLabelValues("hipchat")
numNotifications.WithLabelValues("pagerduty")
numNotifications.WithLabelValues("wechat")
numNotifications.WithLabelValues("pushover")
numNotifications.WithLabelValues("slack")
numNotifications.WithLabelValues("opsgenie")
numNotifications.WithLabelValues("webhook")
numNotifications.WithLabelValues("victorops")
numFailedNotifications.WithLabelValues("email")
numFailedNotifications.WithLabelValues("hipchat")
numFailedNotifications.WithLabelValues("pagerduty")
numFailedNotifications.WithLabelValues("wechat")
numFailedNotifications.WithLabelValues("pushover")
numFailedNotifications.WithLabelValues("slack")
numFailedNotifications.WithLabelValues("opsgenie")
numFailedNotifications.WithLabelValues("webhook")
numFailedNotifications.WithLabelValues("victorops")
notificationLatencySeconds.WithLabelValues("email")
notificationLatencySeconds.WithLabelValues("hipchat")
notificationLatencySeconds.WithLabelValues("pagerduty")
notificationLatencySeconds.WithLabelValues("wechat")
notificationLatencySeconds.WithLabelValues("pushover")
notificationLatencySeconds.WithLabelValues("slack")
notificationLatencySeconds.WithLabelValues("opsgenie")
notificationLatencySeconds.WithLabelValues("webhook")
notificationLatencySeconds.WithLabelValues("victorops")
prometheus.MustRegister(numNotifications)
prometheus.MustRegister(numFailedNotifications)
prometheus.MustRegister(notificationLatencySeconds)
2016-08-12 17:18:26 +00:00
}
// ResolvedSender is implemented by types that can tell whether resolved
// notifications should be sent.
type ResolvedSender interface {
	// SendResolved returns true if resolved notifications should be sent.
	SendResolved() bool
}
2015-11-12 12:18:36 +00:00
// MinTimeout is the lower bound for the timeout set on the context of a call
// into a notification pipeline.
const MinTimeout time.Duration = 10 * time.Second
2015-11-12 12:18:36 +00:00
// notifyKey is a dedicated type for the keys under which this package stores
// values in a context, so they cannot collide with keys of other types.
type notifyKey int

// Context keys used by the With* setters and their matching extractors.
const (
	// keyReceiverName is the key for the receiver name.
	keyReceiverName notifyKey = iota
	// keyRepeatInterval is the key for the repeat interval.
	keyRepeatInterval
	// keyGroupLabels is the key for the grouping labels.
	keyGroupLabels
	// keyGroupKey is the key for the group key.
	keyGroupKey
	// keyFiringAlerts is the key for the firing alerts.
	keyFiringAlerts
	// keyResolvedAlerts is the key for the resolved alerts.
	keyResolvedAlerts
	// keyNow is the key for the now timestamp.
	keyNow
)
// WithReceiverName returns a copy of ctx that carries the receiver name rcv.
func WithReceiverName(ctx context.Context, rcv string) context.Context {
	return context.WithValue(ctx, keyReceiverName, rcv)
}
2015-11-12 12:18:36 +00:00
// WithGroupKey returns a copy of ctx that carries the group key s.
func WithGroupKey(ctx context.Context, s string) context.Context {
	return context.WithValue(ctx, keyGroupKey, s)
}
// WithFiringAlerts returns a copy of ctx that carries the given slice of
// firing alerts.
func WithFiringAlerts(ctx context.Context, alerts []uint64) context.Context {
	return context.WithValue(ctx, keyFiringAlerts, alerts)
}
// WithResolvedAlerts returns a copy of ctx that carries the given slice of
// resolved alerts.
func WithResolvedAlerts(ctx context.Context, alerts []uint64) context.Context {
	return context.WithValue(ctx, keyResolvedAlerts, alerts)
}
2015-11-12 12:18:36 +00:00
// WithGroupLabels returns a copy of ctx that carries the grouping labels lset.
func WithGroupLabels(ctx context.Context, lset model.LabelSet) context.Context {
	return context.WithValue(ctx, keyGroupLabels, lset)
}
2015-11-12 12:18:36 +00:00
// WithNow returns a copy of ctx that carries the now timestamp t.
func WithNow(ctx context.Context, t time.Time) context.Context {
	return context.WithValue(ctx, keyNow, t)
}
// WithRepeatInterval returns a copy of ctx that carries the repeat interval t.
func WithRepeatInterval(ctx context.Context, t time.Duration) context.Context {
	return context.WithValue(ctx, keyRepeatInterval, t)
}
// RepeatInterval returns the repeat interval stored in ctx. The boolean is
// false iff ctx holds no time.Duration under the repeat-interval key.
func RepeatInterval(ctx context.Context) (time.Duration, bool) {
	interval, found := ctx.Value(keyRepeatInterval).(time.Duration)
	return interval, found
}
// ReceiverName returns the receiver name stored in ctx. The boolean is false
// iff ctx holds no string under the receiver-name key.
func ReceiverName(ctx context.Context) (string, bool) {
	name, found := ctx.Value(keyReceiverName).(string)
	return name, found
}
func receiverName(ctx context.Context, l log.Logger) string {
recv, ok := ReceiverName(ctx)
2015-11-26 17:19:46 +00:00
if !ok {
level.Error(l).Log("msg", "Missing receiver")
2015-11-26 17:19:46 +00:00
}
return recv
}
2015-11-12 12:18:36 +00:00
// GroupKey returns the group key stored in ctx. The boolean is false iff ctx
// holds no string under the group-key key.
func GroupKey(ctx context.Context) (string, bool) {
	key, found := ctx.Value(keyGroupKey).(string)
	return key, found
}
func groupLabels(ctx context.Context, l log.Logger) model.LabelSet {
2015-11-25 14:49:26 +00:00
groupLabels, ok := GroupLabels(ctx)
if !ok {
level.Error(l).Log("msg", "Missing group labels")
2015-11-25 14:49:26 +00:00
}
return groupLabels
}
2015-11-12 12:18:36 +00:00
// GroupLabels returns the grouping label set stored in ctx. The boolean is
// false iff ctx holds no model.LabelSet under the group-labels key.
func GroupLabels(ctx context.Context) (model.LabelSet, bool) {
	lset, found := ctx.Value(keyGroupLabels).(model.LabelSet)
	return lset, found
}
2015-11-12 12:18:36 +00:00
// Now returns the now timestamp stored in ctx. The boolean is false iff ctx
// holds no time.Time under the now key.
func Now(ctx context.Context) (time.Time, bool) {
	ts, found := ctx.Value(keyNow).(time.Time)
	return ts, found
}
// FiringAlerts returns the slice of firing alerts stored in ctx. The boolean
// is false iff ctx holds no []uint64 under the firing-alerts key.
func FiringAlerts(ctx context.Context) ([]uint64, bool) {
	firing, found := ctx.Value(keyFiringAlerts).([]uint64)
	return firing, found
}
// ResolvedAlerts extracts a slice of resolved alerts from the context.
// Iff none exists, the second argument is false.
func ResolvedAlerts(ctx context.Context) ([]uint64, bool) {
	v, ok := ctx.Value(keyResolvedAlerts).([]uint64)
	return v, ok
}
2016-08-12 17:18:26 +00:00
// A Stage processes alerts under the constraints of the given context.
2016-08-12 13:22:17 +00:00
type Stage interface {
Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error)
}
2016-08-12 17:18:26 +00:00
// StageFunc wraps a function to represent a Stage.
type StageFunc func(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error)
2016-08-12 13:22:17 +00:00
2016-08-12 17:18:26 +00:00
// Exec implements Stage interface.
func (f StageFunc) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
return f(ctx, l, alerts...)
2016-08-12 13:22:17 +00:00
}
2018-02-07 15:36:47 +00:00
// NotificationLog is the subset of notification-log operations used by the
// stages in this package.
type NotificationLog interface {
	// Log records the firing and resolved alerts for receiver r and group
	// key gkey.
	Log(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64) error
	// Query returns the log entries selected by params.
	Query(params ...nflog.QueryParam) ([]*nflogpb.Entry, error)
}
2016-08-12 17:18:26 +00:00
// BuildPipeline builds a map of receivers to Stages.
func BuildPipeline(
receivers map[string][]Integration,
wait func() time.Duration,
inhibitor *inhibit.Inhibitor,
silencer *silence.Silencer,
2018-02-07 15:36:47 +00:00
notificationLog NotificationLog,
peer *cluster.Peer,
2016-08-12 17:18:26 +00:00
) RoutingStage {
rs := make(RoutingStage, len(receivers))
2016-08-12 17:18:26 +00:00
ms := NewGossipSettleStage(peer)
is := NewMuteStage(inhibitor)
ss := NewMuteStage(silencer)
for name := range receivers {
st := createReceiverStage(name, receivers[name], wait, notificationLog)
rs[name] = MultiStage{ms, is, ss, st}
2016-08-12 17:18:26 +00:00
}
return rs
}
// createReceiverStage creates a pipeline of stages for a receiver.
func createReceiverStage(name string, integrations []Integration, wait func() time.Duration, notificationLog NotificationLog) Stage {
var fs FanoutStage
for i := range integrations {
recv := &nflogpb.Receiver{
GroupName: name,
Integration: integrations[i].Name(),
Idx: uint32(integrations[i].Index()),
}
2016-08-12 13:22:17 +00:00
var s MultiStage
s = append(s, NewWaitStage(wait))
s = append(s, NewDedupStage(&integrations[i], notificationLog, recv))
s = append(s, NewRetryStage(integrations[i], name))
s = append(s, NewSetNotifiesStage(notificationLog, recv))
2016-08-12 13:22:17 +00:00
fs = append(fs, s)
}
return fs
2016-08-12 13:22:17 +00:00
}
2016-08-12 17:18:26 +00:00
// RoutingStage executes the inner stages based on the receiver specified in
// the context.
type RoutingStage map[string]Stage
2016-08-12 13:22:17 +00:00
2016-08-12 17:18:26 +00:00
// Exec implements the Stage interface.
func (rs RoutingStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
receiver, ok := ReceiverName(ctx)
2016-08-12 17:18:26 +00:00
if !ok {
return ctx, nil, fmt.Errorf("receiver missing")
2016-08-12 13:22:17 +00:00
}
2016-08-12 17:18:26 +00:00
s, ok := rs[receiver]
2016-08-12 13:22:17 +00:00
if !ok {
return ctx, nil, fmt.Errorf("stage for receiver missing")
}
return s.Exec(ctx, l, alerts...)
}
// A MultiStage executes a series of stages sequentially.
2016-08-12 17:18:26 +00:00
type MultiStage []Stage
2016-08-12 17:18:26 +00:00
// Exec implements the Stage interface.
func (ms MultiStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
2016-08-12 17:18:26 +00:00
var err error
for _, s := range ms {
if len(alerts) == 0 {
return ctx, nil, nil
2016-08-12 17:18:26 +00:00
}
ctx, alerts, err = s.Exec(ctx, l, alerts...)
if err != nil {
return ctx, nil, err
}
}
return ctx, alerts, nil
}
2016-08-12 17:18:26 +00:00
// FanoutStage executes its stages concurrently
type FanoutStage []Stage
2016-08-12 17:18:26 +00:00
// Exec attempts to execute all stages concurrently and discards the results.
// It returns its input alerts and a types.MultiError if one or more stages fail.
func (fs FanoutStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
2015-10-11 10:40:43 +00:00
var (
wg sync.WaitGroup
me types.MultiError
)
2016-08-12 13:22:17 +00:00
wg.Add(len(fs))
2015-09-29 13:12:31 +00:00
for _, s := range fs {
2016-08-12 13:22:17 +00:00
go func(s Stage) {
if _, _, err := s.Exec(ctx, l, alerts...); err != nil {
me.Add(err)
lvl := level.Error(l)
if ctx.Err() == context.Canceled {
// It is expected for the context to be canceled on
// configuration reload or shutdown. In this case, the
// message should only be logged at the debug level.
lvl = level.Debug(l)
}
lvl.Log("msg", "Error on notify", "err", err)
2015-09-29 13:12:31 +00:00
}
wg.Done()
2016-08-12 13:22:17 +00:00
}(s)
2015-09-29 13:12:31 +00:00
}
wg.Wait()
if me.Len() > 0 {
return ctx, alerts, &me
2015-10-11 14:54:31 +00:00
}
return ctx, alerts, nil
2016-08-12 17:18:26 +00:00
}
// GossipSettleStage holds back alerts until the Gossip has settled.
type GossipSettleStage struct {
	peer *cluster.Peer
}

// NewGossipSettleStage returns a new GossipSettleStage for the peer p.
func NewGossipSettleStage(p *cluster.Peer) *GossipSettleStage {
	stage := new(GossipSettleStage)
	stage.peer = p
	return stage
}
// Exec implements the Stage interface. If a peer is set, it calls the peer's
// WaitReady before passing the context and alerts through unchanged; without
// a peer it passes them through immediately.
func (n *GossipSettleStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
	if n.peer == nil {
		return ctx, alerts, nil
	}
	n.peer.WaitReady()
	return ctx, alerts, nil
}
// MuteStage filters alerts through a Muter.
type MuteStage struct {
2018-03-03 10:07:47 +00:00
muter types.Muter
2016-08-12 17:18:26 +00:00
}
// NewMuteStage return a new MuteStage.
func NewMuteStage(m types.Muter) *MuteStage {
return &MuteStage{muter: m}
2016-08-12 17:18:26 +00:00
}
// Exec implements the Stage interface.
func (n *MuteStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
2016-08-12 17:18:26 +00:00
var filtered []*types.Alert
for _, a := range alerts {
// TODO(fabxc): increment total alerts counter.
// Do not send the alert if muted.
2016-08-12 17:18:26 +00:00
if !n.muter.Mutes(a.Labels) {
filtered = append(filtered, a)
}
// TODO(fabxc): increment muted alerts counter if muted.
2016-08-12 17:18:26 +00:00
}
return ctx, filtered, nil
2016-08-12 17:18:26 +00:00
}
// WaitStage delays the pipeline for a certain amount of time, or until the
// context is done, whichever comes first.
type WaitStage struct {
	// wait returns the delay to apply; it is called on every Exec.
	wait func() time.Duration
}

// NewWaitStage returns a new WaitStage that delays by the duration returned
// from wait.
func NewWaitStage(wait func() time.Duration) *WaitStage {
	stage := new(WaitStage)
	stage.wait = wait
	return stage
}
// Exec implements the Stage interface.
func (ws *WaitStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
2016-08-12 17:18:26 +00:00
select {
case <-time.After(ws.wait()):
case <-ctx.Done():
return ctx, nil, ctx.Err()
2016-08-12 17:18:26 +00:00
}
return ctx, alerts, nil
2016-08-12 17:18:26 +00:00
}
// DedupStage filters alerts.
// Filtering happens based on a notification log.
2016-08-12 17:18:26 +00:00
type DedupStage struct {
rs ResolvedSender
2018-02-07 15:36:47 +00:00
nflog NotificationLog
recv *nflogpb.Receiver
now func() time.Time
hash func(*types.Alert) uint64
2015-09-29 12:45:38 +00:00
}
// NewDedupStage wraps a DedupStage that runs against the given notification log.
func NewDedupStage(rs ResolvedSender, l NotificationLog, recv *nflogpb.Receiver) *DedupStage {
return &DedupStage{
rs: rs,
nflog: l,
recv: recv,
now: utcNow,
hash: hashAlert,
}
2015-09-29 12:45:38 +00:00
}
// utcNow returns the current time in UTC.
func utcNow() time.Time {
	current := time.Now()
	return current.UTC()
}
// hashBuffers pools the scratch byte slices used for hashing alerts. When the
// pool has nothing to hand out, a fresh empty slice of capacity 1024 is made.
var hashBuffers = sync.Pool{
	New: func() interface{} {
		return make([]byte, 0, 1024)
	},
}

// getHashBuffer returns an empty scratch buffer taken from the pool.
func getHashBuffer() []byte {
	return hashBuffers.Get().([]byte)
}

// putHashBuffer truncates b to length zero and hands it back to the pool.
func putHashBuffer(b []byte) {
	//lint:ignore SA6002 relax staticcheck verification.
	hashBuffers.Put(b[:0])
}
// hashAlert returns the xxhash of the alert's label set. The label names are
// sorted first, then every name and every value is written followed by a 0xff
// separator byte, so the result does not depend on map iteration order.
func hashAlert(a *types.Alert) uint64 {
	const sep = '\xff'

	b := getHashBuffer()
	// Defer through a closure so that the slice handed back to the pool is
	// the final value of b. A plain `defer putHashBuffer(b)` would capture
	// the slice as it is right now, and a buffer that had to grow during the
	// appends below would never be reused.
	defer func() { putHashBuffer(b) }()

	names := make(model.LabelNames, 0, len(a.Labels))
	for ln := range a.Labels {
		names = append(names, ln)
	}
	sort.Sort(names)

	for _, ln := range names {
		b = append(b, string(ln)...)
		b = append(b, sep)
		b = append(b, string(a.Labels[ln])...)
		b = append(b, sep)
	}

	return xxhash.Sum64(b)
}
// needsUpdate reports whether a notification has to be sent, given the last
// notification log entry (nil if there is none), the sets of currently firing
// and resolved alert hashes, and the repeat interval. The cases are checked
// in order and the first one that applies decides.
func (n *DedupStage) needsUpdate(entry *nflogpb.Entry, firing, resolved map[uint64]struct{}, repeat time.Duration) bool {
	switch {
	case entry == nil:
		// If we haven't notified about the alert group before, notify right
		// away unless we only have resolved alerts.
		return len(firing) > 0

	case !entry.IsFiringSubset(firing):
		return true

	case len(firing) == 0:
		// Notify about all alerts being resolved.
		// This is done irrespective of the send_resolved flag to make sure
		// that the firing alerts are cleared from the notification log.
		//
		// If the current alert group and last notification contain no firing
		// alert, it means that some alerts have been fired and resolved
		// during the last interval. In this case, there is no need to notify
		// the receiver since it doesn't know about them.
		return len(entry.FiringAlerts) > 0

	case n.rs.SendResolved() && !entry.IsResolvedSubset(resolved):
		return true
	}

	// Nothing changed, only notify if the repeat interval has passed.
	threshold := n.now().Add(-repeat)
	return entry.Timestamp.Before(threshold)
}
2016-08-12 17:18:26 +00:00
// Exec implements the Stage interface.
func (n *DedupStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
gkey, ok := GroupKey(ctx)
2015-09-29 13:12:31 +00:00
if !ok {
return ctx, nil, fmt.Errorf("group key missing")
2015-09-29 12:45:38 +00:00
}
2015-09-29 13:12:31 +00:00
repeatInterval, ok := RepeatInterval(ctx)
if !ok {
return ctx, nil, fmt.Errorf("repeat interval missing")
}
firingSet := map[uint64]struct{}{}
resolvedSet := map[uint64]struct{}{}
firing := []uint64{}
resolved := []uint64{}
var hash uint64
for _, a := range alerts {
hash = n.hash(a)
if a.Resolved() {
resolved = append(resolved, hash)
resolvedSet[hash] = struct{}{}
} else {
firing = append(firing, hash)
firingSet[hash] = struct{}{}
}
}
2015-09-29 12:45:38 +00:00
ctx = WithFiringAlerts(ctx, firing)
ctx = WithResolvedAlerts(ctx, resolved)
entries, err := n.nflog.Query(nflog.QGroupKey(gkey), nflog.QReceiver(n.recv))
if err != nil && err != nflog.ErrNotFound {
return ctx, nil, err
2015-09-29 12:45:38 +00:00
}
var entry *nflogpb.Entry
switch len(entries) {
case 0:
case 1:
entry = entries[0]
case 2:
2017-11-01 22:08:34 +00:00
return ctx, nil, fmt.Errorf("unexpected entry result size %d", len(entries))
}
if n.needsUpdate(entry, firingSet, resolvedSet, repeatInterval) {
return ctx, alerts, nil
}
return ctx, nil, nil
}
2016-08-12 17:18:26 +00:00
// RetryStage notifies via passed integration with exponential backoff until it
// succeeds. It aborts if the context is canceled or timed out.
type RetryStage struct {
integration Integration
groupName string
}
2016-08-12 17:18:26 +00:00
// NewRetryStage returns a new instance of a RetryStage.
func NewRetryStage(i Integration, groupName string) *RetryStage {
2016-08-12 17:18:26 +00:00
return &RetryStage{
integration: i,
groupName: groupName,
2015-09-29 13:12:31 +00:00
}
2016-08-12 13:22:17 +00:00
}
2015-09-29 13:12:31 +00:00
2016-08-12 13:22:17 +00:00
// Exec implements the Stage interface.
func (r RetryStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
var sent []*types.Alert
// If we shouldn't send notifications for resolved alerts, but there are only
// resolved alerts, report them all as successfully notified (we still want the
// notification log to log them for the next run of DedupStage).
if !r.integration.SendResolved() {
firing, ok := FiringAlerts(ctx)
if !ok {
return ctx, nil, fmt.Errorf("firing alerts missing")
}
if len(firing) == 0 {
return ctx, alerts, nil
}
for _, a := range alerts {
if a.Status() != model.AlertResolved {
sent = append(sent, a)
}
}
} else {
sent = alerts
}
2016-08-12 17:18:26 +00:00
var (
i = 0
b = backoff.NewExponentialBackOff()
tick = backoff.NewTicker(b)
iErr error
2016-08-12 17:18:26 +00:00
)
defer tick.Stop()
2015-10-11 14:54:31 +00:00
2016-08-12 17:18:26 +00:00
for {
i++
// Always check the context first to not notify again.
select {
case <-ctx.Done():
if iErr != nil {
return ctx, nil, iErr
}
return ctx, nil, ctx.Err()
2016-08-12 17:18:26 +00:00
default:
}
2015-09-29 13:12:31 +00:00
2016-08-12 17:18:26 +00:00
select {
case <-tick.C:
now := time.Now()
retry, err := r.integration.Notify(ctx, sent...)
notificationLatencySeconds.WithLabelValues(r.integration.Name()).Observe(time.Since(now).Seconds())
numNotifications.WithLabelValues(r.integration.Name()).Inc()
if err != nil {
numFailedNotifications.WithLabelValues(r.integration.Name()).Inc()
level.Debug(l).Log("msg", "Notify attempt failed", "attempt", i, "integration", r.integration.Name(), "receiver", r.groupName, "err", err)
if !retry {
return ctx, alerts, fmt.Errorf("cancelling notify retry for %q due to unrecoverable error: %s", r.integration.Name(), err)
}
// Save this error to be able to return the last seen error by an
// integration upon context timeout.
iErr = err
2016-08-12 17:18:26 +00:00
} else {
return ctx, alerts, nil
2016-08-12 17:18:26 +00:00
}
case <-ctx.Done():
if iErr != nil {
return ctx, nil, iErr
}
return ctx, nil, ctx.Err()
2015-12-03 16:27:36 +00:00
}
}
}
2016-08-12 17:18:26 +00:00
// SetNotifiesStage sets the notification information about passed alerts. The
// passed alerts should have already been sent to the receivers.
type SetNotifiesStage struct {
2018-02-07 15:36:47 +00:00
nflog NotificationLog
recv *nflogpb.Receiver
2015-12-03 16:27:36 +00:00
}
2016-08-12 17:18:26 +00:00
// NewSetNotifiesStage returns a new instance of a SetNotifiesStage.
2018-02-07 15:36:47 +00:00
func NewSetNotifiesStage(l NotificationLog, recv *nflogpb.Receiver) *SetNotifiesStage {
2016-08-12 17:18:26 +00:00
return &SetNotifiesStage{
nflog: l,
recv: recv,
2015-12-03 16:27:36 +00:00
}
}
2016-08-12 13:22:17 +00:00
// Exec implements the Stage interface.
func (n SetNotifiesStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
gkey, ok := GroupKey(ctx)
if !ok {
return ctx, nil, fmt.Errorf("group key missing")
2016-08-12 17:18:26 +00:00
}
2016-08-12 13:22:17 +00:00
firing, ok := FiringAlerts(ctx)
if !ok {
return ctx, nil, fmt.Errorf("firing alerts missing")
}
resolved, ok := ResolvedAlerts(ctx)
if !ok {
return ctx, nil, fmt.Errorf("resolved alerts missing")
}
return ctx, alerts, n.nflog.Log(n.recv, gkey, firing, resolved)
2016-08-12 13:22:17 +00:00
}