notify: embed nflogpb.Receiver in stage

This commit directly adds the nflogpb.Receiver object to stage
objects at stage creation time. Hence, we no longer rely on a value from
within the context.
This commit is contained in:
Fabian Reinartz 2016-08-16 14:32:24 +02:00
parent 998a9ce38e
commit ed4f295c70
2 changed files with 54 additions and 79 deletions

View File

@ -26,6 +26,7 @@ import (
"github.com/prometheus/alertmanager/config"
"github.com/prometheus/alertmanager/inhibit"
"github.com/prometheus/alertmanager/nflog/nflogpb"
"github.com/prometheus/alertmanager/provider"
meshprov "github.com/prometheus/alertmanager/provider/mesh"
"github.com/prometheus/alertmanager/template"
@ -92,6 +93,13 @@ func WithNow(ctx context.Context, t time.Time) context.Context {
return context.WithValue(ctx, keyNow, t)
}
// ReceiverName extracts a receiver name from the context. Iff none exists, the
// second argument is false.
func ReceiverName(ctx context.Context) (string, bool) {
v, ok := ctx.Value(keyReceiverName).(string)
return v, ok
}
func receiverName(ctx context.Context) string {
recv, ok := ReceiverName(ctx)
if !ok {
@ -100,13 +108,6 @@ func receiverName(ctx context.Context) string {
return recv
}
// ReceiverName extracts a receiver name from the context. Iff none exists, the
// second argument is false.
func ReceiverName(ctx context.Context) (string, bool) {
v, ok := ctx.Value(keyReceiverName).(string)
return v, ok
}
// RepeatInterval extracts a repeat interval from the context. Iff none exists, the
// second argument is false.
func RepeatInterval(ctx context.Context) (time.Duration, bool) {
@ -160,7 +161,7 @@ func (f StageFunc) Exec(ctx context.Context, alerts ...*types.Alert) ([]*types.A
func BuildPipeline(
confs []*config.Receiver,
tmpl *template.Template,
meshWait func() time.Duration,
wait func() time.Duration,
inhibitor *inhibit.Inhibitor,
silences *meshprov.Silences,
ni *meshprov.NotificationInfos,
@ -168,45 +169,34 @@ func BuildPipeline(
) RoutingStage {
rs := RoutingStage{}
for _, rc := range confs {
rs[rc.Name] = createStage(BuildReceiverIntegrations(rc, tmpl), meshWait, inhibitor, silences, ni, marker)
}
is := NewInhibitStage(inhibitor, marker)
ss := NewSilenceStage(silences, marker)
for _, rc := range confs {
rs[rc.Name] = MultiStage{is, ss, createStage(rc, tmpl, wait, ni)}
}
return rs
}
// createStage creates a pipeline of stages for a receiver.
func createStage(
receiverIntegrations []Integration,
meshWait func() time.Duration,
inhibitor *inhibit.Inhibitor,
silences *meshprov.Silences,
ni *meshprov.NotificationInfos,
marker types.Marker,
) Stage {
var ms MultiStage
ms = append(ms, NewLogStage(log.With("step", "inhibit")))
ms = append(ms, NewInhibitStage(inhibitor, marker))
ms = append(ms, NewLogStage(log.With("step", "silence")))
ms = append(ms, NewSilenceStage(silences, marker))
var fs = FanoutStage{}
for _, i := range receiverIntegrations {
func createStage(rc *config.Receiver, tmpl *template.Template, wait func() time.Duration, ni *meshprov.NotificationInfos) Stage {
var fs FanoutStage
for _, i := range BuildReceiverIntegrations(rc, tmpl) {
recv := &nflogpb.Receiver{
GroupName: rc.Name,
Integration: i.name,
Idx: uint32(i.idx),
}
var s MultiStage
s = append(s, NewLogStage(log.With("step", "wait")))
s = append(s, NewWaitStage(meshWait))
s = append(s, NewLogStage(log.With("step", "filterResolved")))
s = append(s, NewWaitStage(wait))
s = append(s, NewFilterResolvedStage(i.conf))
s = append(s, NewLogStage(log.With("step", "dedup")))
s = append(s, NewDedupStage(ni))
s = append(s, NewLogStage(log.With("step", "integration")))
s = append(s, NewDedupStage(ni, recv))
s = append(s, NewRetryStage(i))
s = append(s, NewLogStage(log.With("step", "newNotifies")))
s = append(s, NewSetNotifiesStage(ni))
fs[fmt.Sprintf("%s/%d", i.name, i.idx)] = s
}
s = append(s, NewSetNotifiesStage(ni, recv))
return append(ms, fs)
fs = append(fs, s)
}
return fs
}
// RoutingStage executes the inner stages based on the receiver specified in
@ -248,10 +238,10 @@ func (ms MultiStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*types
}
// FanoutStage executes its stages concurrently
type FanoutStage map[string]Stage
type FanoutStage []Stage
// Exec attempts to execute all stages concurrently. It returns a
// types.MultiError if any of them fails.
// Exec attempts to execute all stages concurrently and discards the results.
// It returns its input alerts and a types.MultiError if one or more stages fail.
func (fs FanoutStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*types.Alert, error) {
var (
wg sync.WaitGroup
@ -259,31 +249,20 @@ func (fs FanoutStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*type
)
wg.Add(len(fs))
receiver, ok := ReceiverName(ctx)
if !ok {
return nil, fmt.Errorf("receiver missing")
}
for suffix, s := range fs {
// Suffix the receiver with the unique key for the fanout.
foCtx := WithReceiverName(ctx, fmt.Sprintf("%s/%s", receiver, suffix))
for _, s := range fs {
go func(s Stage) {
_, err := s.Exec(foCtx, alerts...)
if err != nil {
if _, err := s.Exec(ctx, alerts...); err != nil {
me.Add(err)
log.Errorf("Error on notify: %s", err)
}
wg.Done()
}(s)
}
wg.Wait()
if me.Len() > 0 {
return nil, &me
return alerts, &me
}
return alerts, nil
}
@ -424,11 +403,15 @@ func (fr *FilterResolvedStage) Exec(ctx context.Context, alerts ...*types.Alert)
// Filtering happens based on a provider of NotifyInfos.
type DedupStage struct {
notifies provider.Notifies
recv *nflogpb.Receiver
}
// NewDedupStage wraps a DedupStage that runs against the given NotifyInfo provider.
func NewDedupStage(notifies provider.Notifies) *DedupStage {
return &DedupStage{notifies}
func NewDedupStage(notifies provider.Notifies, recv *nflogpb.Receiver) *DedupStage {
return &DedupStage{
notifies: notifies,
recv: recv,
}
}
// hasUpdates checks an alert against the last notification that was made
@ -456,11 +439,6 @@ func (n *DedupStage) hasUpdate(alert *types.Alert, last *types.NotificationInfo,
// Exec implements the Stage interface.
func (n *DedupStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*types.Alert, error) {
name, ok := ReceiverName(ctx)
if !ok {
return nil, fmt.Errorf("notifier name missing")
}
repeatInterval, ok := RepeatInterval(ctx)
if !ok {
return nil, fmt.Errorf("repeat interval missing")
@ -476,7 +454,7 @@ func (n *DedupStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*types
fps = append(fps, a.Fingerprint())
}
notifyInfo, err := n.notifies.Get(name, fps...)
notifyInfo, err := n.notifies.Get(n.recv.String(), fps...)
if err != nil {
return nil, err
}
@ -542,23 +520,19 @@ func (r RetryStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*types.
// passed alerts should have already been sent to the receivers.
type SetNotifiesStage struct {
notifies provider.Notifies
recv *nflogpb.Receiver
}
// NewSetNotifiesStage returns a new instance of a SetNotifiesStage.
func NewSetNotifiesStage(notifies provider.Notifies) *SetNotifiesStage {
func NewSetNotifiesStage(notifies provider.Notifies, recv *nflogpb.Receiver) *SetNotifiesStage {
return &SetNotifiesStage{
notifies: notifies,
recv: recv,
}
}
// Exec implements the Stage interface.
func (n SetNotifiesStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*types.Alert, error) {
name, ok := ReceiverName(ctx)
if !ok {
return nil, fmt.Errorf("notifier name missing")
}
log.Errorln(name)
now, ok := Now(ctx)
if !ok {
return nil, fmt.Errorf("now time missing")
@ -569,7 +543,7 @@ func (n SetNotifiesStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*
for _, a := range alerts {
newNotifies = append(newNotifies, &types.NotificationInfo{
Alert: a.Fingerprint(),
Receiver: name,
Receiver: n.recv.String(),
Resolved: a.Resolved(),
Timestamp: now,
})

View File

@ -22,6 +22,7 @@ import (
"github.com/prometheus/common/model"
"golang.org/x/net/context"
"github.com/prometheus/alertmanager/nflog/nflogpb"
"github.com/prometheus/alertmanager/types"
"github.com/satori/go.uuid"
)
@ -32,7 +33,7 @@ func (s failStage) Exec(ctx context.Context, as ...*types.Alert) ([]*types.Alert
return nil, fmt.Errorf("some error")
}
func TestDedupingNotifierHasUpdate(t *testing.T) {
func TestDedupStageHasUpdate(t *testing.T) {
var (
n = &DedupStage{}
now = time.Now()
@ -228,15 +229,15 @@ func TestRoutingStage(t *testing.T) {
}
}
func TestDedupStage(t *testing.T) {
func TestSetNotifiesStage(t *testing.T) {
var (
notifies = newTestInfos()
stage = NewSetNotifiesStage(notifies)
recv = &nflogpb.Receiver{GroupName: "name"}
stage = NewSetNotifiesStage(notifies, recv)
ctx = context.Background()
)
now := time.Now()
ctx = WithReceiverName(ctx, "name")
ctx = WithRepeatInterval(ctx, time.Duration(100*time.Minute))
ctx = WithNow(ctx, now)
@ -259,7 +260,7 @@ func TestDedupStage(t *testing.T) {
nil,
{
Alert: alerts[1].Fingerprint(),
Receiver: "name",
Receiver: recv.String(),
Resolved: false,
Timestamp: now.Add(-10 * time.Minute),
},
@ -274,7 +275,7 @@ func TestDedupStage(t *testing.T) {
t.Fatalf("Exec failed: %s", err)
}
nsCur, err := notifies.Get("name", alerts[0].Fingerprint(), alerts[1].Fingerprint())
nsCur, err := notifies.Get(recv.String(), alerts[0].Fingerprint(), alerts[1].Fingerprint())
if err != nil {
t.Fatalf("Error getting notifies: %s", err)
}
@ -282,13 +283,13 @@ func TestDedupStage(t *testing.T) {
nsAfter := []*types.NotificationInfo{
{
Alert: alerts[0].Fingerprint(),
Receiver: "name",
Receiver: recv.String(),
Resolved: false,
Timestamp: now,
},
{
Alert: alerts[1].Fingerprint(),
Receiver: "name",
Receiver: recv.String(),
Resolved: true,
Timestamp: now,
},