diff --git a/notify/notify.go b/notify/notify.go index a137f461..48758a18 100644 --- a/notify/notify.go +++ b/notify/notify.go @@ -26,6 +26,7 @@ import ( "github.com/prometheus/alertmanager/config" "github.com/prometheus/alertmanager/inhibit" + "github.com/prometheus/alertmanager/nflog/nflogpb" "github.com/prometheus/alertmanager/provider" meshprov "github.com/prometheus/alertmanager/provider/mesh" "github.com/prometheus/alertmanager/template" @@ -92,6 +93,13 @@ func WithNow(ctx context.Context, t time.Time) context.Context { return context.WithValue(ctx, keyNow, t) } +// ReceiverName extracts a receiver name from the context. Iff none exists, the +// second argument is false. +func ReceiverName(ctx context.Context) (string, bool) { + v, ok := ctx.Value(keyReceiverName).(string) + return v, ok +} + func receiverName(ctx context.Context) string { recv, ok := ReceiverName(ctx) if !ok { @@ -100,13 +108,6 @@ func receiverName(ctx context.Context) string { return recv } -// ReceiverName extracts a receiver name from the context. Iff none exists, the -// second argument is false. -func ReceiverName(ctx context.Context) (string, bool) { - v, ok := ctx.Value(keyReceiverName).(string) - return v, ok -} - // RepeatInterval extracts a repeat interval from the context. Iff none exists, the // second argument is false. func RepeatInterval(ctx context.Context) (time.Duration, bool) { @@ -160,7 +161,7 @@ func (f StageFunc) Exec(ctx context.Context, alerts ...*types.Alert) ([]*types.A func BuildPipeline( confs []*config.Receiver, tmpl *template.Template, - meshWait func() time.Duration, + wait func() time.Duration, inhibitor *inhibit.Inhibitor, silences *meshprov.Silences, ni *meshprov.NotificationInfos, @@ -168,45 +169,34 @@ func BuildPipeline( ) RoutingStage { rs := RoutingStage{} - for _, rc := range confs { - rs[rc.Name] = createStage(BuildReceiverIntegrations(rc, tmpl), meshWait, inhibitor, silences, ni, marker) - } + is := NewInhibitStage(inhibitor, marker) + ss := NewSilenceStage(silences, marker) + for _, rc := range confs { + rs[rc.Name] = MultiStage{is, ss, createStage(rc, tmpl, wait, ni)} + } return rs } // createStage creates a pipeline of stages for a receiver. -func createStage( - receiverIntegrations []Integration, - meshWait func() time.Duration, - inhibitor *inhibit.Inhibitor, - silences *meshprov.Silences, - ni *meshprov.NotificationInfos, - marker types.Marker, -) Stage { - var ms MultiStage - ms = append(ms, NewLogStage(log.With("step", "inhibit"))) - ms = append(ms, NewInhibitStage(inhibitor, marker)) - ms = append(ms, NewLogStage(log.With("step", "silence"))) - ms = append(ms, NewSilenceStage(silences, marker)) - - var fs = FanoutStage{} - for _, i := range receiverIntegrations { +func createStage(rc *config.Receiver, tmpl *template.Template, wait func() time.Duration, ni *meshprov.NotificationInfos) Stage { + var fs FanoutStage + for _, i := range BuildReceiverIntegrations(rc, tmpl) { + recv := &nflogpb.Receiver{ + GroupName: rc.Name, + Integration: i.name, + Idx: uint32(i.idx), + } var s MultiStage - s = append(s, NewLogStage(log.With("step", "wait"))) - s = append(s, NewWaitStage(meshWait)) - s = append(s, NewLogStage(log.With("step", "filterResolved"))) + s = append(s, NewWaitStage(wait)) s = append(s, NewFilterResolvedStage(i.conf)) - s = append(s, NewLogStage(log.With("step", "dedup"))) - s = append(s, NewDedupStage(ni)) - s = append(s, NewLogStage(log.With("step", "integration"))) + s = append(s, NewDedupStage(ni, recv)) s = append(s, NewRetryStage(i)) - s = append(s, NewLogStage(log.With("step", "newNotifies"))) - s = append(s, NewSetNotifiesStage(ni)) - fs[fmt.Sprintf("%s/%d", i.name, i.idx)] = s - } + s = append(s, NewSetNotifiesStage(ni, recv)) - return append(ms, fs) + fs = append(fs, s) + } + return fs } // RoutingStage executes the inner stages based on the receiver specified in @@ -248,10 +238,10 @@ func (ms MultiStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*types } // FanoutStage executes its stages concurrently -type FanoutStage map[string]Stage +type FanoutStage []Stage -// Exec attempts to execute all stages concurrently. It returns a -// types.MultiError if any of them fails. +// Exec attempts to execute all stages concurrently and discards the results. +// It returns its input alerts and a types.MultiError if one or more stages fail. func (fs FanoutStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*types.Alert, error) { var ( wg sync.WaitGroup @@ -259,31 +249,20 @@ func (fs FanoutStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*type ) wg.Add(len(fs)) - receiver, ok := ReceiverName(ctx) - if !ok { - return nil, fmt.Errorf("receiver missing") - } - - for suffix, s := range fs { - // Suffix the receiver with the unique key for the fanout. - foCtx := WithReceiverName(ctx, fmt.Sprintf("%s/%s", receiver, suffix)) - + for _, s := range fs { go func(s Stage) { - _, err := s.Exec(foCtx, alerts...) - if err != nil { + if _, err := s.Exec(ctx, alerts...); err != nil { me.Add(err) log.Errorf("Error on notify: %s", err) } wg.Done() }(s) } - wg.Wait() if me.Len() > 0 { - return nil, &me + return alerts, &me } - return alerts, nil } @@ -424,11 +403,15 @@ func (fr *FilterResolvedStage) Exec(ctx context.Context, alerts ...*types.Alert) // Filtering happens based on a provider of NotifyInfos. type DedupStage struct { notifies provider.Notifies + recv *nflogpb.Receiver } // NewDedupStage wraps a DedupStage that runs against the given NotifyInfo provider. -func NewDedupStage(notifies provider.Notifies) *DedupStage { - return &DedupStage{notifies} +func NewDedupStage(notifies provider.Notifies, recv *nflogpb.Receiver) *DedupStage { + return &DedupStage{ + notifies: notifies, + recv: recv, + } } // hasUpdates checks an alert against the last notification that was made @@ -456,11 +439,6 @@ func (n *DedupStage) hasUpdate(alert *types.Alert, last *types.NotificationInfo, // Exec implements the Stage interface. func (n *DedupStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*types.Alert, error) { - name, ok := ReceiverName(ctx) - if !ok { - return nil, fmt.Errorf("notifier name missing") - } - repeatInterval, ok := RepeatInterval(ctx) if !ok { return nil, fmt.Errorf("repeat interval missing") @@ -476,7 +454,7 @@ func (n *DedupStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*types fps = append(fps, a.Fingerprint()) } - notifyInfo, err := n.notifies.Get(name, fps...) + notifyInfo, err := n.notifies.Get(n.recv.String(), fps...) if err != nil { return nil, err } @@ -542,23 +520,19 @@ func (r RetryStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*types. // passed alerts should have already been sent to the receivers. type SetNotifiesStage struct { notifies provider.Notifies + recv *nflogpb.Receiver } // NewSetNotifiesStage returns a new instance of a SetNotifiesStage. -func NewSetNotifiesStage(notifies provider.Notifies) *SetNotifiesStage { +func NewSetNotifiesStage(notifies provider.Notifies, recv *nflogpb.Receiver) *SetNotifiesStage { return &SetNotifiesStage{ notifies: notifies, + recv: recv, } } // Exec implements the Stage interface. func (n SetNotifiesStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*types.Alert, error) { - name, ok := ReceiverName(ctx) - if !ok { - return nil, fmt.Errorf("notifier name missing") - } - log.Errorln(name) - now, ok := Now(ctx) if !ok { return nil, fmt.Errorf("now time missing") @@ -569,7 +543,7 @@ func (n SetNotifiesStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]* for _, a := range alerts { newNotifies = append(newNotifies, &types.NotificationInfo{ Alert: a.Fingerprint(), - Receiver: name, + Receiver: n.recv.String(), Resolved: a.Resolved(), Timestamp: now, }) diff --git a/notify/notify_test.go b/notify/notify_test.go index ab65b135..3304d936 100644 --- a/notify/notify_test.go +++ b/notify/notify_test.go @@ -22,6 +22,7 @@ import ( "github.com/prometheus/common/model" "golang.org/x/net/context" + "github.com/prometheus/alertmanager/nflog/nflogpb" "github.com/prometheus/alertmanager/types" "github.com/satori/go.uuid" ) @@ -32,7 +33,7 @@ func (s failStage) Exec(ctx context.Context, as ...*types.Alert) ([]*types.Alert return nil, fmt.Errorf("some error") } -func TestDedupingNotifierHasUpdate(t *testing.T) { +func TestDedupStageHasUpdate(t *testing.T) { var ( n = &DedupStage{} now = time.Now() @@ -228,15 +229,15 @@ func TestRoutingStage(t *testing.T) { } } -func TestDedupStage(t *testing.T) { +func TestSetNotifiesStage(t *testing.T) { var ( notifies = newTestInfos() - stage = NewSetNotifiesStage(notifies) + recv = &nflogpb.Receiver{GroupName: "name"} + stage = NewSetNotifiesStage(notifies, recv) ctx = context.Background() ) now := time.Now() - ctx = WithReceiverName(ctx, "name") ctx = WithRepeatInterval(ctx, time.Duration(100*time.Minute)) ctx = WithNow(ctx, now) @@ -259,7 +260,7 @@ func TestDedupStage(t *testing.T) { nil, { Alert: alerts[1].Fingerprint(), - Receiver: "name", + Receiver: recv.String(), Resolved: false, Timestamp: now.Add(-10 * time.Minute), }, @@ -274,7 +275,7 @@ func TestDedupStage(t *testing.T) { t.Fatalf("Exec failed: %s", err) } - nsCur, err := notifies.Get("name", alerts[0].Fingerprint(), alerts[1].Fingerprint()) + nsCur, err := notifies.Get(recv.String(), alerts[0].Fingerprint(), alerts[1].Fingerprint()) if err != nil { t.Fatalf("Error getting notifies: %s", err) } @@ -282,13 +283,13 @@ func TestDedupStage(t *testing.T) { nsAfter := []*types.NotificationInfo{ { Alert: alerts[0].Fingerprint(), - Receiver: "name", + Receiver: recv.String(), Resolved: false, Timestamp: now, }, { Alert: alerts[1].Fingerprint(), - Receiver: "name", + Receiver: recv.String(), Resolved: true, Timestamp: now, },