// Copyright 2015 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package notify

import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"sort"
	"sync"
	"time"

	"github.com/cenkalti/backoff/v4"
	"github.com/cespare/xxhash/v2"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/model"

	"github.com/prometheus/alertmanager/featurecontrol"
	"github.com/prometheus/alertmanager/inhibit"
	"github.com/prometheus/alertmanager/nflog"
	"github.com/prometheus/alertmanager/nflog/nflogpb"
	"github.com/prometheus/alertmanager/silence"
	"github.com/prometheus/alertmanager/timeinterval"
	"github.com/prometheus/alertmanager/types"
)

// ResolvedSender returns true if resolved notifications should be sent.
type ResolvedSender interface {
	SendResolved() bool
}

// Peer represents the cluster node from which we are sending the notification.
type Peer interface {
	// WaitReady waits until the node silences and notifications have settled before attempting to send a notification.
	WaitReady(context.Context) error
}

// MinTimeout is the minimum timeout that is set for the context of a call
// to a notification pipeline.
const MinTimeout = 10 * time.Second

// Notifier notifies about alerts under constraints of the given context. It
// returns an error if unsuccessful and a flag indicating whether the error is
// recoverable. This information is useful for retry logic.
type Notifier interface {
	Notify(context.Context, ...*types.Alert) (bool, error)
}

// Integration wraps a notifier and its configuration to be uniquely identified
// by name and index from its origin in the configuration.
type Integration struct {
	notifier     Notifier
	rs           ResolvedSender
	name         string
	idx          int
	receiverName string
}

// NewIntegration returns a new integration.
func NewIntegration(notifier Notifier, rs ResolvedSender, name string, idx int, receiverName string) Integration {
	return Integration{
		notifier:     notifier,
		rs:           rs,
		name:         name,
		idx:          idx,
		receiverName: receiverName,
	}
}

// Notify implements the Notifier interface.
func (i *Integration) Notify(ctx context.Context, alerts ...*types.Alert) (bool, error) {
	return i.notifier.Notify(ctx, alerts...)
}

// SendResolved implements the ResolvedSender interface.
func (i *Integration) SendResolved() bool {
	return i.rs.SendResolved()
}

// Name returns the name of the integration.
func (i *Integration) Name() string {
	return i.name
}

// Index returns the index of the integration.
func (i *Integration) Index() int {
	return i.idx
}

// String implements the Stringer interface.
func (i *Integration) String() string {
	return fmt.Sprintf("%s[%d]", i.name, i.idx)
}

// notifyKey defines a custom type with which a context is populated to
// avoid accidental collisions.
type notifyKey int

const (
	keyReceiverName notifyKey = iota
	keyRepeatInterval
	keyGroupLabels
	keyGroupKey
	keyFiringAlerts
	keyResolvedAlerts
	keyNow
	keyMuteTimeIntervals
	keyActiveTimeIntervals
	keyRouteID
)

// WithReceiverName populates a context with a receiver name.
func WithReceiverName(ctx context.Context, rcv string) context.Context {
	return context.WithValue(ctx, keyReceiverName, rcv)
}

// WithGroupKey populates a context with a group key.
func WithGroupKey(ctx context.Context, s string) context.Context {
	return context.WithValue(ctx, keyGroupKey, s)
}

// WithFiringAlerts populates a context with a slice of firing alerts.
func WithFiringAlerts(ctx context.Context, alerts []uint64) context.Context {
	return context.WithValue(ctx, keyFiringAlerts, alerts)
}

// WithResolvedAlerts populates a context with a slice of resolved alerts.
func WithResolvedAlerts(ctx context.Context, alerts []uint64) context.Context {
	return context.WithValue(ctx, keyResolvedAlerts, alerts)
}

// WithGroupLabels populates a context with grouping labels.
func WithGroupLabels(ctx context.Context, lset model.LabelSet) context.Context {
	return context.WithValue(ctx, keyGroupLabels, lset)
}

// WithNow populates a context with a now timestamp.
func WithNow(ctx context.Context, t time.Time) context.Context {
	return context.WithValue(ctx, keyNow, t)
}

// WithRepeatInterval populates a context with a repeat interval.
func WithRepeatInterval(ctx context.Context, t time.Duration) context.Context {
	return context.WithValue(ctx, keyRepeatInterval, t)
}

// WithMuteTimeIntervals populates a context with a slice of mute time names.
func WithMuteTimeIntervals(ctx context.Context, mt []string) context.Context {
	return context.WithValue(ctx, keyMuteTimeIntervals, mt)
}

// WithActiveTimeIntervals populates a context with a slice of active time names.
func WithActiveTimeIntervals(ctx context.Context, at []string) context.Context {
	return context.WithValue(ctx, keyActiveTimeIntervals, at)
}

// WithRouteID populates a context with a route ID.
func WithRouteID(ctx context.Context, routeID string) context.Context {
	return context.WithValue(ctx, keyRouteID, routeID)
}

// RepeatInterval extracts a repeat interval from the context. Iff none exists, the
// second argument is false.
func RepeatInterval(ctx context.Context) (time.Duration, bool) {
	v, ok := ctx.Value(keyRepeatInterval).(time.Duration)
	return v, ok
}

// ReceiverName extracts a receiver name from the context. Iff none exists, the
// second argument is false.
func ReceiverName(ctx context.Context) (string, bool) {
	v, ok := ctx.Value(keyReceiverName).(string)
	return v, ok
}

// GroupKey extracts a group key from the context. Iff none exists, the
// second argument is false.
func GroupKey(ctx context.Context) (string, bool) {
	v, ok := ctx.Value(keyGroupKey).(string)
	return v, ok
}

// GroupLabels extracts grouping label set from the context. Iff none exists, the
// second argument is false.
func GroupLabels(ctx context.Context) (model.LabelSet, bool) {
	v, ok := ctx.Value(keyGroupLabels).(model.LabelSet)
	return v, ok
}

// Now extracts a now timestamp from the context. Iff none exists, the
// second argument is false.
func Now(ctx context.Context) (time.Time, bool) {
	v, ok := ctx.Value(keyNow).(time.Time)
	return v, ok
}

// FiringAlerts extracts a slice of firing alerts from the context.
// Iff none exists, the second argument is false.
func FiringAlerts(ctx context.Context) ([]uint64, bool) {
	v, ok := ctx.Value(keyFiringAlerts).([]uint64)
	return v, ok
}

// ResolvedAlerts extracts a slice of resolved alerts from the context.
// Iff none exists, the second argument is false.
func ResolvedAlerts(ctx context.Context) ([]uint64, bool) {
	v, ok := ctx.Value(keyResolvedAlerts).([]uint64)
	return v, ok
}
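
// The helpers above are the only way values enter and leave the notification
// context. As an illustrative sketch (the concrete values are made up; in
// practice the dispatcher populates the context before running a pipeline):
//
//	ctx := context.Background()
//	ctx = WithReceiverName(ctx, "team-pager")
//	ctx = WithGroupKey(ctx, "example-group-key")
//	ctx = WithGroupLabels(ctx, model.LabelSet{"alertname": "HighLatency"})
//	ctx = WithNow(ctx, time.Now())
//	ctx = WithRepeatInterval(ctx, 4*time.Hour)
//
//	// Each extractor reports via its second return value whether the value was set.
//	if d, ok := RepeatInterval(ctx); ok {
//		_ = d // 4h
//	}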

// MuteTimeIntervalNames extracts a slice of mute time names from the context. Iff none exists, the
// second argument is false.
func MuteTimeIntervalNames(ctx context.Context) ([]string, bool) {
	v, ok := ctx.Value(keyMuteTimeIntervals).([]string)
	return v, ok
}

// ActiveTimeIntervalNames extracts a slice of active time names from the context. Iff none exists, the
// second argument is false.
func ActiveTimeIntervalNames(ctx context.Context) ([]string, bool) {
	v, ok := ctx.Value(keyActiveTimeIntervals).([]string)
	return v, ok
}

// RouteID extracts a route ID from the context. Iff none exists, the
// second argument is false.
func RouteID(ctx context.Context) (string, bool) {
	v, ok := ctx.Value(keyRouteID).(string)
	return v, ok
}

// A Stage processes alerts under the constraints of the given context.
type Stage interface {
	Exec(ctx context.Context, l *slog.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error)
}

// StageFunc wraps a function to represent a Stage.
type StageFunc func(ctx context.Context, l *slog.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error)

// Exec implements Stage interface.
func (f StageFunc) Exec(ctx context.Context, l *slog.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
	return f(ctx, l, alerts...)
}

type NotificationLog interface {
	Log(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, expiry time.Duration) error
	Query(params ...nflog.QueryParam) ([]*nflogpb.Entry, error)
}

type Metrics struct {
	numNotifications                   *prometheus.CounterVec
	numTotalFailedNotifications        *prometheus.CounterVec
	numNotificationRequestsTotal       *prometheus.CounterVec
	numNotificationRequestsFailedTotal *prometheus.CounterVec
	numNotificationSuppressedTotal     *prometheus.CounterVec
	notificationLatencySeconds         *prometheus.HistogramVec

	ff featurecontrol.Flagger
}

func NewMetrics(r prometheus.Registerer, ff featurecontrol.Flagger) *Metrics {
	labels := []string{"integration"}

	if ff.EnableReceiverNamesInMetrics() {
		labels = append(labels, "receiver_name")
	}

	m := &Metrics{
		numNotifications: prometheus.NewCounterVec(prometheus.CounterOpts{
			Namespace: "alertmanager",
			Name:      "notifications_total",
			Help:      "The total number of attempted notifications.",
		}, labels),
		numTotalFailedNotifications: prometheus.NewCounterVec(prometheus.CounterOpts{
			Namespace: "alertmanager",
			Name:      "notifications_failed_total",
			Help:      "The total number of failed notifications.",
		}, append(labels, "reason")),
		numNotificationRequestsTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
			Namespace: "alertmanager",
			Name:      "notification_requests_total",
			Help:      "The total number of attempted notification requests.",
		}, labels),
		numNotificationRequestsFailedTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
			Namespace: "alertmanager",
			Name:      "notification_requests_failed_total",
			Help:      "The total number of failed notification requests.",
		}, labels),
		numNotificationSuppressedTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
			Namespace: "alertmanager",
			Name:      "notifications_suppressed_total",
			Help:      "The total number of notifications suppressed for being silenced, inhibited, outside of active time intervals or within muted time intervals.",
		}, []string{"reason"}),
		notificationLatencySeconds: prometheus.NewHistogramVec(prometheus.HistogramOpts{
			Namespace:                       "alertmanager",
			Name:                            "notification_latency_seconds",
			Help:                            "The latency of notifications in seconds.",
			Buckets:                         []float64{1, 5, 10, 15, 20},
			NativeHistogramBucketFactor:     1.1,
			NativeHistogramMaxBucketNumber:  100,
			NativeHistogramMinResetDuration: 1 * time.Hour,
		}, labels),
		ff: ff,
	}

	r.MustRegister(
		m.numNotifications,
		m.numTotalFailedNotifications,
		m.numNotificationRequestsTotal,
		m.numNotificationRequestsFailedTotal,
		m.numNotificationSuppressedTotal,
		m.notificationLatencySeconds,
	)

	return m
}

// InitializeFor pre-populates the metric vectors with the known integration
// (and, when enabled, receiver) label combinations so that the series exist
// before the first notification is attempted.
func (m *Metrics) InitializeFor(receiver map[string][]Integration) {
	if m.ff.EnableReceiverNamesInMetrics() {
		// Reset the vectors to take into account receiver names changing after hot reloads.
		m.numNotifications.Reset()
		m.numNotificationRequestsTotal.Reset()
		m.numNotificationRequestsFailedTotal.Reset()
		m.notificationLatencySeconds.Reset()
		m.numTotalFailedNotifications.Reset()

		for name, integrations := range receiver {
			for _, integration := range integrations {
				m.numNotifications.WithLabelValues(integration.Name(), name)
				m.numNotificationRequestsTotal.WithLabelValues(integration.Name(), name)
				m.numNotificationRequestsFailedTotal.WithLabelValues(integration.Name(), name)
				m.notificationLatencySeconds.WithLabelValues(integration.Name(), name)

				for _, reason := range possibleFailureReasonCategory {
					m.numTotalFailedNotifications.WithLabelValues(integration.Name(), name, reason)
				}
			}
		}

		return
	}

	// When the feature flag is not enabled, we just carry on registering _all_ the integrations.
	for _, integration := range []string{
		"email",
		"pagerduty",
		"wechat",
		"pushover",
		"slack",
		"opsgenie",
		"webhook",
		"victorops",
		"sns",
		"telegram",
		"discord",
		"webex",
		"msteams",
		"msteamsv2",
		"jira",
		"rocketchat",
	} {
		m.numNotifications.WithLabelValues(integration)
		m.numNotificationRequestsTotal.WithLabelValues(integration)
		m.numNotificationRequestsFailedTotal.WithLabelValues(integration)
		m.notificationLatencySeconds.WithLabelValues(integration)

		for _, reason := range possibleFailureReasonCategory {
			m.numTotalFailedNotifications.WithLabelValues(integration, reason)
		}
	}
}

// PipelineBuilder builds notification pipelines for receivers.
type PipelineBuilder struct {
	metrics *Metrics
	ff      featurecontrol.Flagger
}

// NewPipelineBuilder returns a new PipelineBuilder whose metrics are registered with r.
func NewPipelineBuilder(r prometheus.Registerer, ff featurecontrol.Flagger) *PipelineBuilder {
	return &PipelineBuilder{
		metrics: NewMetrics(r, ff),
		ff:      ff,
	}
}

// New returns a map of receivers to Stages.
func (pb *PipelineBuilder) New(
	receivers map[string][]Integration,
	wait func() time.Duration,
	inhibitor *inhibit.Inhibitor,
	silencer *silence.Silencer,
	intervener *timeinterval.Intervener,
	marker types.GroupMarker,
	notificationLog NotificationLog,
	peer Peer,
) RoutingStage {
	rs := make(RoutingStage, len(receivers))

	ms := NewGossipSettleStage(peer)
	is := NewMuteStage(inhibitor, pb.metrics)
	tas := NewTimeActiveStage(intervener, marker, pb.metrics)
	tms := NewTimeMuteStage(intervener, marker, pb.metrics)
	ss := NewMuteStage(silencer, pb.metrics)

	for name := range receivers {
		st := createReceiverStage(name, receivers[name], wait, notificationLog, pb.metrics)
		rs[name] = MultiStage{ms, is, tas, tms, ss, st}
	}

	pb.metrics.InitializeFor(receivers)

	return rs
}

// createReceiverStage creates a pipeline of stages for a receiver.
func createReceiverStage(
	name string,
	integrations []Integration,
	wait func() time.Duration,
	notificationLog NotificationLog,
	metrics *Metrics,
) Stage {
	var fs FanoutStage
	for i := range integrations {
		recv := &nflogpb.Receiver{
			GroupName:   name,
			Integration: integrations[i].Name(),
			Idx:         uint32(integrations[i].Index()),
		}
		var s MultiStage
		s = append(s, NewWaitStage(wait))
		s = append(s, NewDedupStage(&integrations[i], notificationLog, recv))
		s = append(s, NewRetryStage(integrations[i], name, metrics))
		s = append(s, NewSetNotifiesStage(notificationLog, recv))

		fs = append(fs, s)
	}
	return fs
}
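
// For one receiver, the stages built by New and createReceiverStage compose
// roughly as follows (illustrative sketch; the names refer to the local
// variables used above):
//
//	MultiStage{
//		ms,  // GossipSettleStage: wait for cluster gossip to settle
//		is,  // MuteStage(inhibitor): drop inhibited alerts
//		tas, // TimeActiveStage: drop alerts while outside active_time_intervals
//		tms, // TimeMuteStage: drop alerts while inside mute_time_intervals
//		ss,  // MuteStage(silencer): drop silenced alerts
//		st,  // FanoutStage: per integration, MultiStage{WaitStage, DedupStage, RetryStage, SetNotifiesStage}
//	}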

// RoutingStage executes the inner stages based on the receiver specified in
// the context.
type RoutingStage map[string]Stage

// Exec implements the Stage interface.
func (rs RoutingStage) Exec(ctx context.Context, l *slog.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
	receiver, ok := ReceiverName(ctx)
	if !ok {
		return ctx, nil, errors.New("receiver missing")
	}

	s, ok := rs[receiver]
	if !ok {
		return ctx, nil, errors.New("stage for receiver missing")
	}

	return s.Exec(ctx, l, alerts...)
}

// A MultiStage executes a series of stages sequentially.
type MultiStage []Stage

// Exec implements the Stage interface.
func (ms MultiStage) Exec(ctx context.Context, l *slog.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
	var err error
	for _, s := range ms {
		if len(alerts) == 0 {
			return ctx, nil, nil
		}

		ctx, alerts, err = s.Exec(ctx, l, alerts...)
		if err != nil {
			return ctx, nil, err
		}
	}
	return ctx, alerts, nil
}

// FanoutStage executes its stages concurrently.
type FanoutStage []Stage

// Exec attempts to execute all stages concurrently and discards the results.
// It returns its input alerts and a types.MultiError if one or more stages fail.
func (fs FanoutStage) Exec(ctx context.Context, l *slog.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
	var (
		wg sync.WaitGroup
		me types.MultiError
	)
	wg.Add(len(fs))

	for _, s := range fs {
		go func(s Stage) {
			if _, _, err := s.Exec(ctx, l, alerts...); err != nil {
				me.Add(err)
			}
			wg.Done()
		}(s)
	}
	wg.Wait()

	if me.Len() > 0 {
		return ctx, alerts, &me
	}
	return ctx, alerts, nil
}

// GossipSettleStage waits until the Gossip has settled to forward alerts.
type GossipSettleStage struct {
	peer Peer
}

// NewGossipSettleStage returns a new GossipSettleStage.
func NewGossipSettleStage(p Peer) *GossipSettleStage {
	return &GossipSettleStage{peer: p}
}

// Exec implements the Stage interface.
func (n *GossipSettleStage) Exec(ctx context.Context, _ *slog.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
	if n.peer != nil {
		if err := n.peer.WaitReady(ctx); err != nil {
			return ctx, nil, err
		}
	}
	return ctx, alerts, nil
}

const (
	SuppressedReasonSilence            = "silence"
	SuppressedReasonInhibition         = "inhibition"
	SuppressedReasonMuteTimeInterval   = "mute_time_interval"
	SuppressedReasonActiveTimeInterval = "active_time_interval"
)

// MuteStage filters alerts through a Muter.
type MuteStage struct {
	muter   types.Muter
	metrics *Metrics
}

// NewMuteStage returns a new MuteStage.
func NewMuteStage(m types.Muter, metrics *Metrics) *MuteStage {
	return &MuteStage{muter: m, metrics: metrics}
}

// Exec implements the Stage interface.
func (n *MuteStage) Exec(ctx context.Context, logger *slog.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
	var (
		filtered []*types.Alert
		muted    []*types.Alert
	)
	for _, a := range alerts {
		// TODO(fabxc): increment total alerts counter.
		// Do not send the alert if muted.
		if n.muter.Mutes(a.Labels) {
			muted = append(muted, a)
		} else {
			filtered = append(filtered, a)
		}
		// TODO(fabxc): increment muted alerts counter if muted.
	}

	if len(muted) > 0 {
		var reason string
		switch n.muter.(type) {
		case *silence.Silencer:
			reason = SuppressedReasonSilence
		case *inhibit.Inhibitor:
			reason = SuppressedReasonInhibition
		default:
		}
		n.metrics.numNotificationSuppressedTotal.WithLabelValues(reason).Add(float64(len(muted)))
		logger.Debug("Notifications will not be sent for muted alerts", "alerts", fmt.Sprintf("%v", muted), "reason", reason)
	}

	return ctx, filtered, nil
}

// WaitStage waits for a certain amount of time before continuing or until the
// context is done.
type WaitStage struct {
	wait func() time.Duration
}

// NewWaitStage returns a new WaitStage.
func NewWaitStage(wait func() time.Duration) *WaitStage {
	return &WaitStage{
		wait: wait,
	}
}

// Exec implements the Stage interface.
func (ws *WaitStage) Exec(ctx context.Context, _ *slog.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
	select {
	case <-time.After(ws.wait()):
	case <-ctx.Done():
		return ctx, nil, ctx.Err()
	}
	return ctx, alerts, nil
}

// DedupStage filters alerts.
// Filtering happens based on a notification log.
type DedupStage struct {
	rs    ResolvedSender
	nflog NotificationLog
	recv  *nflogpb.Receiver

	now  func() time.Time
	hash func(*types.Alert) uint64
}

// NewDedupStage returns a new DedupStage that runs against the given notification log.
func NewDedupStage(rs ResolvedSender, l NotificationLog, recv *nflogpb.Receiver) *DedupStage {
	return &DedupStage{
		rs:    rs,
		nflog: l,
		recv:  recv,
		now:   utcNow,
		hash:  hashAlert,
	}
}

func utcNow() time.Time { return time.Now().UTC() }

// hashBuffer wraps a slice in a struct so we can store a pointer in sync.Pool.
type hashBuffer struct {
	buf []byte
}

var hashBuffers = sync.Pool{
	New: func() interface{} { return &hashBuffer{buf: make([]byte, 0, 1024)} },
}

// hashAlert returns a stable hash over the alert's sorted label set.
func hashAlert(a *types.Alert) uint64 {
	const sep = '\xff'

	hb := hashBuffers.Get().(*hashBuffer)
	defer hashBuffers.Put(hb)
	b := hb.buf[:0]

	names := make(model.LabelNames, 0, len(a.Labels))
	for ln := range a.Labels {
		names = append(names, ln)
	}
	sort.Sort(names)

	for _, ln := range names {
		b = append(b, string(ln)...)
		b = append(b, sep)
		b = append(b, string(a.Labels[ln])...)
		b = append(b, sep)
	}

	hash := xxhash.Sum64(b)

	return hash
}

func (n *DedupStage) needsUpdate(entry *nflogpb.Entry, firing, resolved map[uint64]struct{}, repeat time.Duration) bool {
	// If we haven't notified about the alert group before, notify right away
	// unless we only have resolved alerts.
	if entry == nil {
		return len(firing) > 0
	}

	if !entry.IsFiringSubset(firing) {
		return true
	}

	// Notify about all alerts being resolved.
	// This is done irrespective of the send_resolved flag to make sure that
	// the firing alerts are cleared from the notification log.
	if len(firing) == 0 {
		// If the current alert group and last notification contain no firing
		// alert, it means that some alerts have been fired and resolved during the
		// last interval. In this case, there is no need to notify the receiver
		// since it doesn't know about them.
		return len(entry.FiringAlerts) > 0
	}

	if n.rs.SendResolved() && !entry.IsResolvedSubset(resolved) {
		return true
	}

	// Nothing changed, only notify if the repeat interval has passed.
	return entry.Timestamp.Before(n.now().Add(-repeat))
}
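
// Summarizing needsUpdate: given the previous notification log entry for the
// group, a new notification is sent when any of the following holds:
//
//   - there is no prior entry and at least one alert is firing;
//   - the current firing set contains alerts that were not in the last notification;
//   - everything is resolved now while the previous entry still had firing alerts
//     (so that the log entry gets cleared);
//   - resolved notifications are enabled and the resolved set is not a subset of
//     the previously notified one;
//   - nothing changed, but the repeat interval has elapsed since the entry's timestamp.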

// Exec implements the Stage interface.
func (n *DedupStage) Exec(ctx context.Context, _ *slog.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
	gkey, ok := GroupKey(ctx)
	if !ok {
		return ctx, nil, errors.New("group key missing")
	}

	repeatInterval, ok := RepeatInterval(ctx)
	if !ok {
		return ctx, nil, errors.New("repeat interval missing")
	}

	firingSet := map[uint64]struct{}{}
	resolvedSet := map[uint64]struct{}{}
	firing := []uint64{}
	resolved := []uint64{}

	var hash uint64
	for _, a := range alerts {
		hash = n.hash(a)
		if a.Resolved() {
			resolved = append(resolved, hash)
			resolvedSet[hash] = struct{}{}
		} else {
			firing = append(firing, hash)
			firingSet[hash] = struct{}{}
		}
	}

	ctx = WithFiringAlerts(ctx, firing)
	ctx = WithResolvedAlerts(ctx, resolved)

	entries, err := n.nflog.Query(nflog.QGroupKey(gkey), nflog.QReceiver(n.recv))
	if err != nil && !errors.Is(err, nflog.ErrNotFound) {
		return ctx, nil, err
	}

	var entry *nflogpb.Entry
	switch len(entries) {
	case 0:
	case 1:
		entry = entries[0]
	default:
		return ctx, nil, fmt.Errorf("unexpected entry result size %d", len(entries))
	}

	if n.needsUpdate(entry, firingSet, resolvedSet, repeatInterval) {
		return ctx, alerts, nil
	}
	return ctx, nil, nil
}

// RetryStage notifies via passed integration with exponential backoff until it
// succeeds. It aborts if the context is canceled or timed out.
type RetryStage struct {
	integration Integration
	groupName   string
	metrics     *Metrics
	labelValues []string
}

// NewRetryStage returns a new instance of a RetryStage.
func NewRetryStage(i Integration, groupName string, metrics *Metrics) *RetryStage {
	labelValues := []string{i.Name()}

	if metrics.ff.EnableReceiverNamesInMetrics() {
		labelValues = append(labelValues, i.receiverName)
	}

	return &RetryStage{
		integration: i,
		groupName:   groupName,
		metrics:     metrics,
		labelValues: labelValues,
	}
}

func (r RetryStage) Exec(ctx context.Context, l *slog.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
	r.metrics.numNotifications.WithLabelValues(r.labelValues...).Inc()
	ctx, alerts, err := r.exec(ctx, l, alerts...)

	failureReason := DefaultReason.String()
	if err != nil {
		var e *ErrorWithReason
		if errors.As(err, &e) {
			failureReason = e.Reason.String()
		}
		r.metrics.numTotalFailedNotifications.WithLabelValues(append(r.labelValues, failureReason)...).Inc()
	}
	return ctx, alerts, err
}

func (r RetryStage) exec(ctx context.Context, l *slog.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
	var sent []*types.Alert

	// If we shouldn't send notifications for resolved alerts, but there are only
	// resolved alerts, report them all as successfully notified (we still want the
	// notification log to log them for the next run of DedupStage).
	if !r.integration.SendResolved() {
		firing, ok := FiringAlerts(ctx)
		if !ok {
			return ctx, nil, errors.New("firing alerts missing")
		}
		if len(firing) == 0 {
			return ctx, alerts, nil
		}
		for _, a := range alerts {
			if a.Status() != model.AlertResolved {
				sent = append(sent, a)
			}
		}
	} else {
		sent = alerts
	}

	b := backoff.NewExponentialBackOff()
	b.MaxElapsedTime = 0 // Always retry.

	tick := backoff.NewTicker(b)
	defer tick.Stop()

	var (
		i    = 0
		iErr error
	)

	l = l.With("receiver", r.groupName, "integration", r.integration.String())
	if groupKey, ok := GroupKey(ctx); ok {
		l = l.With("aggrGroup", groupKey)
	}

	for {
		i++
		// Always check the context first to not notify again.
		select {
		case <-ctx.Done():
			if iErr == nil {
				iErr = ctx.Err()
				if errors.Is(iErr, context.Canceled) {
					iErr = NewErrorWithReason(ContextCanceledReason, iErr)
				} else if errors.Is(iErr, context.DeadlineExceeded) {
					iErr = NewErrorWithReason(ContextDeadlineExceededReason, iErr)
				}
			}
			if iErr != nil {
				return ctx, nil, fmt.Errorf("%s/%s: notify retry canceled after %d attempts: %w", r.groupName, r.integration.String(), i, iErr)
			}
			return ctx, nil, nil
		default:
		}

		select {
		case <-tick.C:
			now := time.Now()
			retry, err := r.integration.Notify(ctx, sent...)
			dur := time.Since(now)
			r.metrics.notificationLatencySeconds.WithLabelValues(r.labelValues...).Observe(dur.Seconds())
			r.metrics.numNotificationRequestsTotal.WithLabelValues(r.labelValues...).Inc()
			if err != nil {
				r.metrics.numNotificationRequestsFailedTotal.WithLabelValues(r.labelValues...).Inc()
				if !retry {
					return ctx, alerts, fmt.Errorf("%s/%s: notify retry canceled due to unrecoverable error after %d attempts: %w", r.groupName, r.integration.String(), i, err)
				}
				if ctx.Err() == nil {
					if iErr == nil || err.Error() != iErr.Error() {
						// Log the error if the context isn't done and the error isn't the same as before.
						l.Warn("Notify attempt failed, will retry later", "attempts", i, "err", err)
					}
					// Save this error to be able to return the last seen error by an
					// integration upon context timeout.
					iErr = err
				}
			} else {
				l := l.With("attempts", i, "duration", dur)
				if i <= 1 {
					l = l.With("alerts", fmt.Sprintf("%v", alerts))
					l.Debug("Notify success")
				} else {
					l.Info("Notify success")
				}

				return ctx, alerts, nil
			}
		case <-ctx.Done():
			if iErr == nil {
				iErr = ctx.Err()
				if errors.Is(iErr, context.Canceled) {
					iErr = NewErrorWithReason(ContextCanceledReason, iErr)
				} else if errors.Is(iErr, context.DeadlineExceeded) {
					iErr = NewErrorWithReason(ContextDeadlineExceededReason, iErr)
				}
			}
			if iErr != nil {
				return ctx, nil, fmt.Errorf("%s/%s: notify retry canceled after %d attempts: %w", r.groupName, r.integration.String(), i, iErr)
			}
			return ctx, nil, nil
		}
	}
}

// SetNotifiesStage sets the notification information about passed alerts. The
// passed alerts should have already been sent to the receivers.
type SetNotifiesStage struct {
	nflog NotificationLog
	recv  *nflogpb.Receiver
}

// NewSetNotifiesStage returns a new instance of a SetNotifiesStage.
func NewSetNotifiesStage(l NotificationLog, recv *nflogpb.Receiver) *SetNotifiesStage {
	return &SetNotifiesStage{
		nflog: l,
		recv:  recv,
	}
}

// Exec implements the Stage interface.
func (n SetNotifiesStage) Exec(ctx context.Context, l *slog.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
	gkey, ok := GroupKey(ctx)
	if !ok {
		return ctx, nil, errors.New("group key missing")
	}

	firing, ok := FiringAlerts(ctx)
	if !ok {
		return ctx, nil, errors.New("firing alerts missing")
	}

	resolved, ok := ResolvedAlerts(ctx)
	if !ok {
		return ctx, nil, errors.New("resolved alerts missing")
	}

	repeat, ok := RepeatInterval(ctx)
	if !ok {
		return ctx, nil, errors.New("repeat interval missing")
	}
	expiry := 2 * repeat

	return ctx, alerts, n.nflog.Log(n.recv, gkey, firing, resolved, expiry)
}

type timeStage struct {
	muter   types.TimeMuter
	marker  types.GroupMarker
	metrics *Metrics
}

type TimeMuteStage timeStage

func NewTimeMuteStage(muter types.TimeMuter, marker types.GroupMarker, metrics *Metrics) *TimeMuteStage {
	return &TimeMuteStage{muter, marker, metrics}
}

// Exec implements the stage interface for TimeMuteStage.
// TimeMuteStage is responsible for muting alerts whose route is currently within a mute time interval.
func (tms TimeMuteStage) Exec(ctx context.Context, l *slog.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
	routeID, ok := RouteID(ctx)
	if !ok {
		return ctx, nil, errors.New("route ID missing")
	}

	gkey, ok := GroupKey(ctx)
	if !ok {
		return ctx, nil, errors.New("group key missing")
	}

	muteTimeIntervalNames, ok := MuteTimeIntervalNames(ctx)
	if !ok {
		return ctx, alerts, nil
	}
	now, ok := Now(ctx)
	if !ok {
		return ctx, alerts, errors.New("missing now timestamp")
	}

	// Skip this stage if there are no mute timings.
	if len(muteTimeIntervalNames) == 0 {
		return ctx, alerts, nil
	}

	muted, mutedBy, err := tms.muter.Mutes(muteTimeIntervalNames, now)
	if err != nil {
		return ctx, alerts, err
	}
	// If muted is false then mutedBy is nil and the muted marker is removed.
	tms.marker.SetMuted(routeID, gkey, mutedBy)

	// If the current time is inside a mute time, all alerts are removed from the pipeline.
	if muted {
		tms.metrics.numNotificationSuppressedTotal.WithLabelValues(SuppressedReasonMuteTimeInterval).Add(float64(len(alerts)))
		l.Debug("Notifications not sent, route is within mute time", "alerts", len(alerts))
		return ctx, nil, nil
	}

	return ctx, alerts, nil
}

type TimeActiveStage timeStage

func NewTimeActiveStage(muter types.TimeMuter, marker types.GroupMarker, metrics *Metrics) *TimeActiveStage {
	return &TimeActiveStage{muter, marker, metrics}
}

// Exec implements the stage interface for TimeActiveStage.
// TimeActiveStage is responsible for muting alerts whose route is not in an active time.
func (tas TimeActiveStage) Exec(ctx context.Context, l *slog.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
	routeID, ok := RouteID(ctx)
	if !ok {
		return ctx, nil, errors.New("route ID missing")
	}

	gkey, ok := GroupKey(ctx)
	if !ok {
		return ctx, nil, errors.New("group key missing")
	}

	activeTimeIntervalNames, ok := ActiveTimeIntervalNames(ctx)
	if !ok {
		return ctx, alerts, nil
	}

	// If we don't have any active time intervals at all, the route is always active.
	if len(activeTimeIntervalNames) == 0 {
		return ctx, alerts, nil
	}

	now, ok := Now(ctx)
	if !ok {
		return ctx, alerts, errors.New("missing now timestamp")
	}

	active, _, err := tas.muter.Mutes(activeTimeIntervalNames, now)
	if err != nil {
		return ctx, alerts, err
	}

	var mutedBy []string
	if !active {
		// If the group is muted, then it must be muted by all active time intervals.
		// Otherwise, the group must be in at least one active time interval for it
		// to be active.
		mutedBy = activeTimeIntervalNames
	}
	tas.marker.SetMuted(routeID, gkey, mutedBy)

	// If the current time is not inside an active time, all alerts are removed from the pipeline.
	if !active {
		tas.metrics.numNotificationSuppressedTotal.WithLabelValues(SuppressedReasonActiveTimeInterval).Add(float64(len(alerts)))
		l.Debug("Notifications not sent, route is not within active time", "alerts", len(alerts))
		return ctx, nil, nil
	}

	return ctx, alerts, nil
}
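
// StageFunc makes it straightforward to plug ad hoc logic into a MultiStage.
// A minimal sketch, assuming a hypothetical filter (not part of this package)
// that drops alerts carrying a "test" label:
//
//	dropTest := StageFunc(func(ctx context.Context, l *slog.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
//		var kept []*types.Alert
//		for _, a := range alerts {
//			if _, ok := a.Labels["test"]; !ok {
//				kept = append(kept, a)
//			}
//		}
//		return ctx, kept, nil
//	})
//	_ = MultiStage{dropTest /* , further stages */}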