rework building of stage pipelines

This commit is contained in:
Frederic Branczyk 2016-08-12 19:18:26 +02:00
parent 840dd7d2f5
commit 7bc851e894
5 changed files with 525 additions and 401 deletions

View File

@ -150,8 +150,8 @@ func main() {
var (
inhibitor *inhibit.Inhibitor
tmpl *template.Template
pipeline notify.Stage
disp *dispatch.Dispatcher
pipeline *notify.Pipeline
)
defer disp.Stop()
@ -193,7 +193,7 @@ func main() {
disp.Stop()
inhibitor = inhibit.NewInhibitor(alerts, conf.InhibitRules, marker)
pipeline = notify.NewPipeline(conf.Receivers, tmpl, meshWait(mrouter, 5*time.Second), inhibitor, silences, ni, marker)
pipeline = notify.BuildPipeline(conf.Receivers, tmpl, meshWait(mrouter, 5*time.Second), inhibitor, silences, ni, marker)
disp = dispatch.NewDispatcher(alerts, dispatch.NewRoute(conf.Route, nil), pipeline, marker)
go disp.Run()

View File

@ -19,9 +19,9 @@ import (
// Dispatcher sorts incoming alerts into aggregation groups and
// assigns the correct notifiers to each.
type Dispatcher struct {
route *Route
alerts provider.Alerts
notifier notify.Notifier
route *Route
alerts provider.Alerts
stage notify.Stage
marker types.Marker
@ -36,13 +36,13 @@ type Dispatcher struct {
}
// NewDispatcher returns a new Dispatcher.
func NewDispatcher(ap provider.Alerts, r *Route, n notify.Notifier, mk types.Marker) *Dispatcher {
func NewDispatcher(ap provider.Alerts, r *Route, s notify.Stage, mk types.Marker) *Dispatcher {
disp := &Dispatcher{
alerts: ap,
notifier: n,
route: r,
marker: mk,
log: log.With("component", "dispatcher"),
alerts: ap,
stage: s,
route: r,
marker: mk,
log: log.With("component", "dispatcher"),
}
return disp
}
@ -234,7 +234,7 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
groups[fp] = ag
go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool {
err := d.notifier.Notify(ctx, alerts...)
_, err := d.stage.Exec(ctx, alerts...)
if err != nil {
log.Errorf("Notify for %d alerts failed: %s", len(alerts), err)
}

View File

@ -30,7 +30,6 @@ import (
"strings"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"golang.org/x/net/context"
@ -41,113 +40,77 @@ import (
"github.com/prometheus/alertmanager/types"
)
var (
numNotifications = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "alertmanager",
Name: "notifications_total",
Help: "The total number of attempted notifications.",
}, []string{"integration"})
numFailedNotifications = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "alertmanager",
Name: "notifications_failed_total",
Help: "The total number of failed notifications.",
}, []string{"integration"})
)
func init() {
prometheus.Register(numNotifications)
prometheus.Register(numFailedNotifications)
}
type notifierConfig interface {
SendResolved() bool
}
type NotifierFunc func(context.Context, ...*types.Alert) error
func (f NotifierFunc) Notify(ctx context.Context, alerts ...*types.Alert) error {
return f(ctx, alerts...)
// A Notifier notifies about alerts under constraints of the given context.
type Notifier interface {
Notify(context.Context, ...*types.Alert) error
}
type integration interface {
Notifier
name() string
// An Integration wraps a notifier and its config to be uniquely identified by
// name and index from its origin in the configuration.
type Integration struct {
notifier Notifier
conf notifierConfig
name string
idx int
}
// BuildReceiverTree builds a tree of receivers and a unique id of notification providers.
func BuildReceiverTree(confs []*config.Receiver, tmpl *template.Template) map[string]map[string]Notifier {
res := make(map[string]map[string]Notifier)
// Notify implements the Notifier interface.
func (i *Integration) Notify(ctx context.Context, alerts ...*types.Alert) error {
return i.notifier.Notify(ctx, alerts...)
}
filter := func(n integration, c notifierConfig) Notifier {
return NotifierFunc(func(ctx context.Context, alerts ...*types.Alert) error {
var res []*types.Alert
// BuildReceiverIntegrations builds a list of integration notifiers off of a
// receivers config.
func BuildReceiverIntegrations(nc *config.Receiver, tmpl *template.Template) []Integration {
var (
integrations []Integration
add = func(name string, i int, n Notifier, nc notifierConfig) {
integrations = append(integrations, Integration{
notifier: n,
conf: nc,
name: name,
idx: i,
})
}
)
if c.SendResolved() {
res = alerts
} else {
for _, a := range alerts {
if a.Status() != model.AlertResolved {
res = append(res, a)
}
}
}
if len(res) == 0 {
return nil
}
err := n.Notify(ctx, res...)
if err != nil {
numFailedNotifications.WithLabelValues(n.name()).Inc()
}
numNotifications.WithLabelValues(n.name()).Inc()
return err
})
for i, c := range nc.WebhookConfigs {
n := NewWebhook(c, tmpl)
add("webhook", i, n, c)
}
for _, nc := range confs {
var (
integrations = make(map[string]Notifier)
add = func(i int, on integration, n Notifier) { integrations[fmt.Sprintf("%s/%d", on.name(), i)] = n }
)
for i, c := range nc.WebhookConfigs {
n := NewWebhook(c, tmpl)
add(i, n, filter(n, c))
}
for i, c := range nc.EmailConfigs {
n := NewEmail(c, tmpl)
add(i, n, filter(n, c))
}
for i, c := range nc.PagerdutyConfigs {
n := NewPagerDuty(c, tmpl)
add(i, n, filter(n, c))
}
for i, c := range nc.OpsGenieConfigs {
n := NewOpsGenie(c, tmpl)
add(i, n, filter(n, c))
}
for i, c := range nc.SlackConfigs {
n := NewSlack(c, tmpl)
add(i, n, filter(n, c))
}
for i, c := range nc.HipchatConfigs {
n := NewHipchat(c, tmpl)
add(i, n, filter(n, c))
}
for i, c := range nc.VictorOpsConfigs {
n := NewVictorOps(c, tmpl)
add(i, n, filter(n, c))
}
for i, c := range nc.PushoverConfigs {
n := NewPushover(c, tmpl)
add(i, n, filter(n, c))
}
res[nc.Name] = integrations
for i, c := range nc.EmailConfigs {
n := NewEmail(c, tmpl)
add("email", i, n, c)
}
return res
for i, c := range nc.PagerdutyConfigs {
n := NewPagerDuty(c, tmpl)
add("pagerduty", i, n, c)
}
for i, c := range nc.OpsGenieConfigs {
n := NewOpsGenie(c, tmpl)
add("opsgenie", i, n, c)
}
for i, c := range nc.SlackConfigs {
n := NewSlack(c, tmpl)
add("slack", i, n, c)
}
for i, c := range nc.HipchatConfigs {
n := NewHipchat(c, tmpl)
add("hipchat", i, n, c)
}
for i, c := range nc.VictorOpsConfigs {
n := NewVictorOps(c, tmpl)
add("victorops", i, n, c)
}
for i, c := range nc.PushoverConfigs {
n := NewPushover(c, tmpl)
add("pushover", i, n, c)
}
return integrations
}
const contentTypeJSON = "application/json"
@ -164,8 +127,6 @@ func NewWebhook(conf *config.WebhookConfig, t *template.Template) *Webhook {
return &Webhook{URL: conf.URL, tmpl: t}
}
func (*Webhook) name() string { return "webhook" }
// WebhookMessage defines the JSON object send to webhook endpoints.
type WebhookMessage struct {
*template.Data
@ -228,8 +189,6 @@ func NewEmail(c *config.EmailConfig, t *template.Template) *Email {
return &Email{conf: c, tmpl: t}
}
func (*Email) name() string { return "email" }
// auth resolves a string of authentication mechanisms.
func (n *Email) auth(mechs string) (smtp.Auth, error) {
username := n.conf.AuthUsername
@ -377,8 +336,6 @@ func NewPagerDuty(c *config.PagerdutyConfig, t *template.Template) *PagerDuty {
return &PagerDuty{conf: c, tmpl: t}
}
func (*PagerDuty) name() string { return "pagerduty" }
const (
pagerDutyEventTrigger = "trigger"
pagerDutyEventResolve = "resolve"
@ -467,8 +424,6 @@ func NewSlack(conf *config.SlackConfig, tmpl *template.Template) *Slack {
}
}
func (*Slack) name() string { return "slack" }
// slackReq is the request for sending a slack notification.
type slackReq struct {
Channel string `json:"channel,omitempty"`
@ -558,8 +513,6 @@ func NewHipchat(conf *config.HipchatConfig, tmpl *template.Template) *Hipchat {
}
}
func (*Hipchat) name() string { return "hipchat" }
type hipchatReq struct {
From string `json:"from"`
Notify bool `json:"notify"`
@ -626,8 +579,6 @@ func NewOpsGenie(c *config.OpsGenieConfig, t *template.Template) *OpsGenie {
return &OpsGenie{conf: c, tmpl: t}
}
func (*OpsGenie) name() string { return "opsgenie" }
type opsGenieMessage struct {
APIKey string `json:"apiKey"`
Alias model.Fingerprint `json:"alias"`
@ -750,8 +701,6 @@ func NewVictorOps(c *config.VictorOpsConfig, t *template.Template) *VictorOps {
}
}
func (*VictorOps) name() string { return "victorops" }
const (
victorOpsEventTrigger = "CRITICAL"
victorOpsEventResolve = "RECOVERY"
@ -849,8 +798,6 @@ func NewPushover(c *config.PushoverConfig, t *template.Template) *Pushover {
return &Pushover{conf: c, tmpl: t}
}
func (*Pushover) name() string { return "pushover" }
// Notify implements the Notifier interface.
func (n *Pushover) Notify(ctx context.Context, as ...*types.Alert) error {
key, ok := GroupKey(ctx)

View File

@ -19,6 +19,7 @@ import (
"time"
"github.com/cenkalti/backoff"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"golang.org/x/net/context"
@ -31,6 +32,25 @@ import (
"github.com/prometheus/alertmanager/types"
)
var (
numNotifications = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "alertmanager",
Name: "notifications_total",
Help: "The total number of attempted notifications.",
}, []string{"integration"})
numFailedNotifications = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "alertmanager",
Name: "notifications_failed_total",
Help: "The total number of failed notifications.",
}, []string{"integration"})
)
func init() {
prometheus.Register(numNotifications)
prometheus.Register(numFailedNotifications)
}
// MinTimeout is the minimum timeout that is set for the context of a call
// to a notification pipeline.
const MinTimeout = 10 * time.Second
@ -123,21 +143,102 @@ func Now(ctx context.Context) (time.Time, bool) {
return v, ok
}
// A Notifier is a type which notifies about alerts under constraints of the
// given context.
type Notifier interface {
Notify(context.Context, ...*types.Alert) error
}
// A Stage processes alerts under the constraints of the given context.
type Stage interface {
Exec(ctx context.Context, alerts ...*types.Alert) ([]*types.Alert, error)
}
// StageFunc wraps a function to represent a Stage.
type StageFunc func(ctx context.Context, alerts ...*types.Alert) ([]*types.Alert, error)
// Exec implements Stage interface.
func (f StageFunc) Exec(ctx context.Context, alerts ...*types.Alert) ([]*types.Alert, error) {
return f(ctx, alerts...)
}
// BuildPipeline builds a map of receivers to Stages.
func BuildPipeline(
confs []*config.Receiver,
tmpl *template.Template,
meshWait func() time.Duration,
inhibitor *inhibit.Inhibitor,
silences *meshprov.Silences,
ni *meshprov.NotificationInfos,
marker types.Marker,
) RoutingStage {
rs := RoutingStage{}
for _, rc := range confs {
rs[rc.Name] = createStage(BuildReceiverIntegrations(rc, tmpl), meshWait, inhibitor, silences, ni, marker)
}
return rs
}
// createStage creates a pipeline of stages for a receiver.
func createStage(
receiverIntegrations []Integration,
meshWait func() time.Duration,
inhibitor *inhibit.Inhibitor,
silences *meshprov.Silences,
ni *meshprov.NotificationInfos,
marker types.Marker,
) Stage {
var ms MultiStage
ms = append(ms, NewLogStage(log.With("step", "inhibit")))
ms = append(ms, NewInhibitStage(inhibitor, marker))
ms = append(ms, NewLogStage(log.With("step", "silence")))
ms = append(ms, NewSilenceStage(silences, marker))
var fs = FanoutStage{}
for _, i := range receiverIntegrations {
var s MultiStage
s = append(s, NewLogStage(log.With("step", "wait")))
s = append(s, NewWaitStage(meshWait))
s = append(s, NewLogStage(log.With("step", "filterResolved")))
s = append(s, NewFilterResolvedStage(i.conf))
s = append(s, NewLogStage(log.With("step", "dedup")))
s = append(s, NewDedupStage(ni))
s = append(s, NewLogStage(log.With("step", "integration")))
s = append(s, NewRetryStage(i))
s = append(s, NewLogStage(log.With("step", "newNotifies")))
s = append(s, NewSetNotifiesStage(ni))
fs[fmt.Sprintf("%s/%d", i.name, i.idx)] = s
}
return append(ms, fs)
}
// RoutingStage executes the inner stages based on the receiver specified in
// the context.
type RoutingStage map[string]Stage
// Exec implements the Stage interface.
func (rs RoutingStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*types.Alert, error) {
receiver, ok := Receiver(ctx)
if !ok {
return nil, fmt.Errorf("receiver missing")
}
s, ok := rs[receiver]
if !ok {
return nil, fmt.Errorf("stage for receiver missing")
}
return s.Exec(ctx, alerts...)
}
// A MultiStage executes a series of stages sequencially.
type MultiStage []Stage
// Exec implements the Stage interface.
func (ms MultiStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*types.Alert, error) {
var err error
for _, s := range ms {
if len(alerts) == 0 {
return nil, nil
}
alerts, err = s.Exec(ctx, alerts...)
if err != nil {
return nil, err
@ -146,83 +247,11 @@ func (ms MultiStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*types
return alerts, nil
}
// FanoutStage executes its stages concurrently
type FanoutStage map[string]Stage
func createStage(receiverNotifiers map[string]Notifier, receiverName string, meshWait func() time.Duration, inhibitor *inhibit.Inhibitor, silences *meshprov.Silences, ni *meshprov.NotificationInfos, marker types.Marker) Stage {
var ms MultiStage
ms = append(ms, NewLogStage(log.With("step", "inhibit")))
ms = append(ms, NewInhibitStage(inhibitor, marker))
ms = append(ms, NewLogStage(log.With("step", "silence")))
ms = append(ms, NewSilenceStage(silences, marker))
var fs = make(FanoutStage)
for integrationId, integration := range receiverNotifiers {
var s MultiStage
s = append(s, NewLogStage(log.With("step", "wait")))
s = append(s, NewWaitStage(meshWait))
s = append(s, NewLogStage(log.With("step", "integration")))
s = append(s, NewIntegrationStage(integration, ni))
fs[integrationId] = s
}
return append(ms, fs)
}
func NewPipeline(rcvs []*config.Receiver, tmpl *template.Template, meshWait func() time.Duration, inhibitor *inhibit.Inhibitor, silences *meshprov.Silences, ni *meshprov.NotificationInfos, marker types.Marker) *Pipeline {
receiverConfigs := BuildReceiverTree(rcvs, tmpl)
receiverPipelines := make(map[string]Stage)
for receiver, notifiers := range receiverConfigs {
receiverPipelines[receiver] = createStage(notifiers, receiver, meshWait, inhibitor, silences, ni, marker)
}
return &Pipeline{receiverPipelines: receiverPipelines}
}
type Pipeline struct {
receiverPipelines map[string]Stage
}
func (p *Pipeline) Notify(ctx context.Context, alerts ...*types.Alert) error {
r, ok := Receiver(ctx)
if !ok {
return fmt.Errorf("receiver missing")
}
_, err := p.receiverPipelines[r].Exec(ctx, alerts...)
return err
}
type IntegrationStage struct {
notificationInfos *meshprov.NotificationInfos
notifier Notifier
}
func NewIntegrationStage(n Notifier, ni *meshprov.NotificationInfos) *IntegrationStage {
return &IntegrationStage{
notificationInfos: ni,
notifier: n,
}
}
func (i IntegrationStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*types.Alert, error) {
newNotifies, err := Dedup(i.notificationInfos).ExtractNewNotifies(ctx, alerts...)
if err != nil {
return nil, err
}
if newNotifies != nil {
err = Retry(i.notifier, ctx, alerts...)
if err != nil {
return nil, err
}
}
return nil, i.notificationInfos.Set(newNotifies...)
}
// Exec attempts to notify all Notifiers concurrently. It returns a types.MultiError
// if any of them fails.
// Exec attempts to execute all stages concurrently. It returns a
// types.MultiError if any of them fails.
func (fs FanoutStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*types.Alert, error) {
var (
wg sync.WaitGroup
@ -255,182 +284,23 @@ func (fs FanoutStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*type
return nil, &me
}
return nil, nil
}
// Retry calls the passed notifier with exponential backoff until it succeeds.
// It aborts if the context is canceled or timed out.
func Retry(n Notifier, ctx context.Context, alerts ...*types.Alert) error {
var (
i = 0
b = backoff.NewExponentialBackOff()
tick = backoff.NewTicker(b)
)
defer tick.Stop()
for {
i++
// Always check the context first to not notify again.
select {
case <-ctx.Done():
return ctx.Err()
default:
}
select {
case <-tick.C:
if err := n.Notify(ctx, alerts...); err != nil {
log.Warnf("Notify attempt %d failed: %s", i, err)
} else {
return nil
}
case <-ctx.Done():
return ctx.Err()
}
}
}
// Deduplicator filters alerts.
// Filtering happens based on a provider of NotifyInfos.
type Deduplicator struct {
notifies provider.Notifies
}
// Dedup wraps a Deduplicator that runs against the given NotifyInfo provider.
func Dedup(notifies provider.Notifies) *Deduplicator {
return &Deduplicator{notifies: notifies}
}
// hasUpdates checks an alert against the last notification that was made
// about it.
func (n *Deduplicator) hasUpdate(alert *types.Alert, last *types.NotificationInfo, now time.Time, interval time.Duration) bool {
if last != nil {
if alert.Resolved() {
if last.Resolved {
return false
}
} else if !last.Resolved {
// Do not send again if last was delivered unless
// the repeat interval has already passed.
if !now.After(last.Timestamp.Add(interval)) {
return false
}
}
} else if alert.Resolved() {
// If the alert is resolved but we never notified about it firing,
// there is nothing to do.
return false
}
return true
}
// ExtractNewNotifies filters out the notifications that shall be sent
func (n *Deduplicator) ExtractNewNotifies(ctx context.Context, alerts ...*types.Alert) ([]*types.NotificationInfo, error) {
name, ok := Receiver(ctx)
if !ok {
return nil, fmt.Errorf("notifier name missing")
}
repeatInterval, ok := RepeatInterval(ctx)
if !ok {
return nil, fmt.Errorf("repeat interval missing")
}
now, ok := Now(ctx)
if !ok {
return nil, fmt.Errorf("now time missing")
}
var fps []model.Fingerprint
for _, a := range alerts {
fps = append(fps, a.Fingerprint())
}
notifyInfo, err := n.notifies.Get(name, fps...)
if err != nil {
return nil, err
}
// If we have to notify about any of the alerts, we send a notification
// for the entire batch.
var send bool
for i, alert := range alerts {
if n.hasUpdate(alert, notifyInfo[i], now, repeatInterval) {
send = true
break
}
}
if !send {
return nil, nil
}
var newNotifies []*types.NotificationInfo
for _, a := range alerts {
newNotifies = append(newNotifies, &types.NotificationInfo{
Alert: a.Fingerprint(),
Receiver: name,
Resolved: a.Resolved(),
Timestamp: now,
})
}
return newNotifies, nil
}
// WaitStage waits for a certain amount of time before continuing or until the
// context is done.
type WaitStage struct {
wait func() time.Duration
}
// NewWaitStage returns a new WaitStage.
func NewWaitStage(wait func() time.Duration) *WaitStage {
return &WaitStage{
wait: wait,
}
}
// Exec implements the Stage interface.
func (ws *WaitStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*types.Alert, error) {
select {
case <-time.After(ws.wait()):
case <-ctx.Done():
return nil, ctx.Err()
}
return alerts, nil
}
// SilenceStage filters alerts through a silence muter.
type SilenceStage struct {
muter types.Muter
marker types.Marker
// LogStage logs the passed alerts with a given logger.
type LogStage struct {
log log.Logger
}
// NewSilenceStage returns a new SilenceStage.
func NewSilenceStage(m types.Muter, mk types.Marker) *SilenceStage {
return &SilenceStage{
muter: m,
marker: mk,
}
func NewLogStage(log log.Logger) *LogStage {
return &LogStage{log: log}
}
// Exec implements the Stage interface.
func (n *SilenceStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*types.Alert, error) {
var filtered []*types.Alert
for _, a := range alerts {
_, ok := n.marker.Silenced(a.Fingerprint())
// TODO(fabxc): increment total alerts counter.
// Do not send the alert if the silencer mutes it.
if !n.muter.Mutes(a.Labels) {
// TODO(fabxc): increment muted alerts counter.
filtered = append(filtered, a)
// Store whether a previously silenced alert is firing again.
a.WasSilenced = ok
}
}
func (l *LogStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*types.Alert, error) {
l.log.Debugf("notify %v", alerts)
return filtered, nil
return alerts, nil
}
// InhibitStage filters alerts through an inhibition muter.
@ -465,16 +335,244 @@ func (n *InhibitStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*typ
return filtered, nil
}
type LogStage struct {
log log.Logger
// SilenceStage filters alerts through a silence muter.
type SilenceStage struct {
muter types.Muter
marker types.Marker
}
func NewLogStage(log log.Logger) *LogStage {
return &LogStage{log: log}
// NewSilenceStage returns a new SilenceStage.
func NewSilenceStage(m types.Muter, mk types.Marker) *SilenceStage {
return &SilenceStage{
muter: m,
marker: mk,
}
}
func (l *LogStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*types.Alert, error) {
l.log.Debugf("notify %v", alerts)
// Exec implements the Stage interface.
func (n *SilenceStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*types.Alert, error) {
var filtered []*types.Alert
for _, a := range alerts {
_, ok := n.marker.Silenced(a.Fingerprint())
// TODO(fabxc): increment total alerts counter.
// Do not send the alert if the silencer mutes it.
if !n.muter.Mutes(a.Labels) {
// TODO(fabxc): increment muted alerts counter.
filtered = append(filtered, a)
// Store whether a previously silenced alert is firing again.
a.WasSilenced = ok
}
}
return filtered, nil
}
// WaitStage waits for a certain amount of time before continuing or until the
// context is done.
type WaitStage struct {
wait func() time.Duration
}
// NewWaitStage returns a new WaitStage.
func NewWaitStage(wait func() time.Duration) *WaitStage {
return &WaitStage{
wait: wait,
}
}
// Exec implements the Stage interface.
func (ws *WaitStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*types.Alert, error) {
select {
case <-time.After(ws.wait()):
case <-ctx.Done():
return nil, ctx.Err()
}
return alerts, nil
}
// FilterResolvedStage filters alerts based on a given notifierConfig. Either
// returns all alerts or only those that are not resolved.
type FilterResolvedStage struct {
conf notifierConfig
}
// NewFilterRecolvedStage returns a new instance of a FilterResolvedStage.
func NewFilterResolvedStage(conf notifierConfig) *FilterResolvedStage {
return &FilterResolvedStage{
conf: conf,
}
}
// Exec implements the Stage interface.
func (fr *FilterResolvedStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*types.Alert, error) {
var res []*types.Alert
if fr.conf.SendResolved() {
res = alerts
} else {
for _, a := range alerts {
if a.Status() != model.AlertResolved {
res = append(res, a)
}
}
}
return res, nil
}
// DedupStage filters alerts.
// Filtering happens based on a provider of NotifyInfos.
type DedupStage struct {
notifies provider.Notifies
}
// NewDedupStage wraps a DedupStage that runs against the given NotifyInfo provider.
func NewDedupStage(notifies provider.Notifies) *DedupStage {
return &DedupStage{notifies}
}
// hasUpdates checks an alert against the last notification that was made
// about it.
func (n *DedupStage) hasUpdate(alert *types.Alert, last *types.NotificationInfo, now time.Time, interval time.Duration) bool {
if last != nil {
if alert.Resolved() {
if last.Resolved {
return false
}
} else if !last.Resolved {
// Do not send again if last was delivered unless
// the repeat interval has already passed.
if !now.After(last.Timestamp.Add(interval)) {
return false
}
}
} else if alert.Resolved() {
// If the alert is resolved but we never notified about it firing,
// there is nothing to do.
return false
}
return true
}
// Exec implements the Stage interface.
func (n *DedupStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*types.Alert, error) {
name, ok := Receiver(ctx)
if !ok {
return nil, fmt.Errorf("notifier name missing")
}
repeatInterval, ok := RepeatInterval(ctx)
if !ok {
return nil, fmt.Errorf("repeat interval missing")
}
now, ok := Now(ctx)
if !ok {
return nil, fmt.Errorf("now time missing")
}
var fps []model.Fingerprint
for _, a := range alerts {
fps = append(fps, a.Fingerprint())
}
notifyInfo, err := n.notifies.Get(name, fps...)
if err != nil {
return nil, err
}
// If we have to notify about any of the alerts, we send a notification
// for the entire batch.
for i, alert := range alerts {
if n.hasUpdate(alert, notifyInfo[i], now, repeatInterval) {
return alerts, nil
}
}
return nil, nil
}
// RetryStage notifies via passed integration with exponential backoff until it
// succeeds. It aborts if the context is canceled or timed out.
type RetryStage struct {
integration Integration
}
// NewRetryStage returns a new instance of a RetryStage.
func NewRetryStage(i Integration) *RetryStage {
return &RetryStage{
integration: i,
}
}
// Exec implements the Stage interface.
func (r RetryStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*types.Alert, error) {
var (
i = 0
b = backoff.NewExponentialBackOff()
tick = backoff.NewTicker(b)
)
defer tick.Stop()
for {
i++
// Always check the context first to not notify again.
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
select {
case <-tick.C:
if err := r.integration.Notify(ctx, alerts...); err != nil {
numFailedNotifications.WithLabelValues(r.integration.name).Inc()
log.Warnf("Notify attempt %d failed: %s", i, err)
} else {
numNotifications.WithLabelValues(r.integration.name).Inc()
return alerts, nil
}
case <-ctx.Done():
return nil, ctx.Err()
}
}
}
// SetNotifiesStage sets the notification information about passed alerts. The
// passed alerts should have already been sent to the receivers.
type SetNotifiesStage struct {
notifies provider.Notifies
}
// NewSetNotifiesStage returns a new instance of a SetNotifiesStage.
func NewSetNotifiesStage(notifies provider.Notifies) *SetNotifiesStage {
return &SetNotifiesStage{
notifies: notifies,
}
}
// Exec implements the Stage interface.
func (n SetNotifiesStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*types.Alert, error) {
name, ok := Receiver(ctx)
if !ok {
return nil, fmt.Errorf("notifier name missing")
}
now, ok := Now(ctx)
if !ok {
return nil, fmt.Errorf("now time missing")
}
var newNotifies []*types.NotificationInfo
for _, a := range alerts {
newNotifies = append(newNotifies, &types.NotificationInfo{
Alert: a.Fingerprint(),
Receiver: name,
Resolved: a.Resolved(),
Timestamp: now,
})
}
return alerts, n.notifies.Set(newNotifies...)
}

View File

@ -26,26 +26,15 @@ import (
"github.com/satori/go.uuid"
)
type recordNotifier struct {
ctx context.Context
alerts []*types.Alert
}
type failStage struct{}
func (n *recordNotifier) Notify(ctx context.Context, as ...*types.Alert) error {
n.ctx = ctx
n.alerts = append(n.alerts, as...)
return nil
}
type failNotifier struct{}
func (n *failNotifier) Notify(ctx context.Context, as ...*types.Alert) error {
return fmt.Errorf("some error")
func (s failStage) Exec(ctx context.Context, as ...*types.Alert) ([]*types.Alert, error) {
return nil, fmt.Errorf("some error")
}
func TestDedupingNotifierHasUpdate(t *testing.T) {
var (
n = &Deduplicator{}
n = &DedupStage{}
now = time.Now()
interval = 100 * time.Minute
)
@ -166,10 +155,83 @@ func TestDedupingNotifierHasUpdate(t *testing.T) {
}
}
func TestDedupingNewNotifiesExtraction(t *testing.T) {
func TestMultiStage(t *testing.T) {
var (
alerts1 = []*types.Alert{{}}
alerts2 = []*types.Alert{{}, {}}
alerts3 = []*types.Alert{{}, {}, {}}
)
stage := MultiStage{
StageFunc(func(ctx context.Context, alerts ...*types.Alert) ([]*types.Alert, error) {
if !reflect.DeepEqual(alerts, alerts1) {
t.Fatal("Input not equal to input of MultiStage")
}
return alerts2, nil
}),
StageFunc(func(ctx context.Context, alerts ...*types.Alert) ([]*types.Alert, error) {
if !reflect.DeepEqual(alerts, alerts2) {
t.Fatal("Input not equal to output of previous stage")
}
return alerts3, nil
}),
}
alerts, err := stage.Exec(context.Background(), alerts1...)
if err != nil {
t.Fatalf("Exec failed: %s", err)
}
if !reflect.DeepEqual(alerts, alerts3) {
t.Fatal("Output of MultiStage is not equal to the output of the last stage")
}
}
func TestMultiStageFailure(t *testing.T) {
var (
ctx = context.Background()
s1 = failStage{}
stage = MultiStage{s1}
)
_, err := stage.Exec(ctx, nil)
if err.Error() != "some error" {
t.Fatal("Errors were not propagated correctly by MultiStage")
}
}
func TestRoutingStage(t *testing.T) {
var (
alerts1 = []*types.Alert{{}}
alerts2 = []*types.Alert{{}, {}}
)
stage := RoutingStage{
"name": StageFunc(func(ctx context.Context, alerts ...*types.Alert) ([]*types.Alert, error) {
if !reflect.DeepEqual(alerts, alerts1) {
t.Fatal("Input not equal to input of RoutingStage")
}
return alerts2, nil
}),
"not": failStage{},
}
ctx := WithReceiver(context.Background(), "name")
alerts, err := stage.Exec(ctx, alerts1...)
if err != nil {
t.Fatalf("Exec failed: %s", err)
}
if !reflect.DeepEqual(alerts, alerts2) {
t.Fatal("Output of RoutingStage is not equal to the output of the inner stage")
}
}
func TestDedupStage(t *testing.T) {
var (
notifies = newTestInfos()
deduper = Dedup(notifies)
stage = NewSetNotifiesStage(notifies)
ctx = context.Background()
)
now := time.Now()
@ -207,9 +269,14 @@ func TestDedupingNewNotifiesExtraction(t *testing.T) {
t.Fatalf("Setting notifies failed: %s", err)
}
newNotifies, err := deduper.ExtractNewNotifies(ctx, alerts...)
_, err := stage.Exec(ctx, alerts...)
if err != nil {
t.Fatalf("Notify failed: %s", err)
t.Fatalf("Exec failed: %s", err)
}
nsCur, err := notifies.Get("name", alerts[0].Fingerprint(), alerts[1].Fingerprint())
if err != nil {
t.Fatalf("Error getting notifies: %s", err)
}
nsAfter := []*types.NotificationInfo{
@ -227,8 +294,20 @@ func TestDedupingNewNotifiesExtraction(t *testing.T) {
},
}
if !reflect.DeepEqual(nsAfter, newNotifies) {
t.Errorf("Unexpected notifies, expected: %v, got: %v", nsAfter, newNotifies)
for i, after := range nsAfter {
cur := nsCur[i]
// Hack correct timestamps back in if they are sane.
if cur != nil && after.Timestamp.IsZero() {
if cur.Timestamp.Before(now) {
t.Fatalf("Wrong timestamp for notify %v", cur)
}
after.Timestamp = cur.Timestamp
}
if !reflect.DeepEqual(after, cur) {
t.Errorf("Unexpected notifies, expected: %v, got: %v", after, cur)
}
}
}
@ -272,7 +351,7 @@ func TestSilenceStage(t *testing.T) {
alerts, err := silencer.Exec(nil, inAlerts...)
if err != nil {
t.Fatalf("Notifying failed: %s", err)
t.Fatalf("Exec failed: %s", err)
}
var got []model.LabelSet
@ -328,7 +407,7 @@ func TestInhibitStage(t *testing.T) {
alerts, err := inhibitor.Exec(nil, inAlerts...)
if err != nil {
t.Fatalf("Notifying failed: %s", err)
t.Fatalf("Exec failed: %s", err)
}
var got []model.LabelSet