From d2a556b269753321fc8f51ca155b9030e28a3753 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Wed, 17 Aug 2016 10:54:17 +0200 Subject: [PATCH] notify: include context in Stage interface This adds context.Context to the return arguments of a Stage. This is necessary to propagate modified contexts. --- dispatch/dispatch.go | 2 +- notify/notify.go | 90 ++++++++++++++++++------------------------- notify/notify_test.go | 33 +++++++++------- 3 files changed, 57 insertions(+), 68 deletions(-) diff --git a/dispatch/dispatch.go b/dispatch/dispatch.go index 697936a7..9e952506 100644 --- a/dispatch/dispatch.go +++ b/dispatch/dispatch.go @@ -234,7 +234,7 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) { groups[fp] = ag go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool { - _, err := d.stage.Exec(ctx, alerts...) + _, _, err := d.stage.Exec(ctx, alerts...) if err != nil { log.Errorf("Notify for %d alerts failed: %s", len(alerts), err) } diff --git a/notify/notify.go b/notify/notify.go index 48758a18..704b5f0f 100644 --- a/notify/notify.go +++ b/notify/notify.go @@ -146,14 +146,14 @@ func Now(ctx context.Context) (time.Time, bool) { // A Stage processes alerts under the constraints of the given context. type Stage interface { - Exec(ctx context.Context, alerts ...*types.Alert) ([]*types.Alert, error) + Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) } // StageFunc wraps a function to represent a Stage. -type StageFunc func(ctx context.Context, alerts ...*types.Alert) ([]*types.Alert, error) +type StageFunc func(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) // Exec implements Stage interface. -func (f StageFunc) Exec(ctx context.Context, alerts ...*types.Alert) ([]*types.Alert, error) { +func (f StageFunc) Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { return f(ctx, alerts...) } @@ -204,15 +204,15 @@ func createStage(rc *config.Receiver, tmpl *template.Template, wait func() time. type RoutingStage map[string]Stage // Exec implements the Stage interface. -func (rs RoutingStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*types.Alert, error) { +func (rs RoutingStage) Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { receiver, ok := ReceiverName(ctx) if !ok { - return nil, fmt.Errorf("receiver missing") + return ctx, nil, fmt.Errorf("receiver missing") } s, ok := rs[receiver] if !ok { - return nil, fmt.Errorf("stage for receiver missing") + return ctx, nil, fmt.Errorf("stage for receiver missing") } return s.Exec(ctx, alerts...) @@ -222,19 +222,19 @@ func (rs RoutingStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*typ type MultiStage []Stage // Exec implements the Stage interface. -func (ms MultiStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*types.Alert, error) { +func (ms MultiStage) Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { var err error for _, s := range ms { if len(alerts) == 0 { - return nil, nil + return ctx, nil, nil } - alerts, err = s.Exec(ctx, alerts...) + ctx, alerts, err = s.Exec(ctx, alerts...) if err != nil { - return nil, err + return ctx, nil, err } } - return alerts, nil + return ctx, alerts, nil } // FanoutStage executes its stages concurrently @@ -242,7 +242,7 @@ type FanoutStage []Stage // Exec attempts to execute all stages concurrently and discards the results. // It returns its input alerts and a types.MultiError if one or more stages fail. -func (fs FanoutStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*types.Alert, error) { +func (fs FanoutStage) Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { var ( wg sync.WaitGroup me types.MultiError @@ -251,7 +251,7 @@ func (fs FanoutStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*type for _, s := range fs { go func(s Stage) { - if _, err := s.Exec(ctx, alerts...); err != nil { + if _, _, err := s.Exec(ctx, alerts...); err != nil { me.Add(err) log.Errorf("Error on notify: %s", err) } @@ -261,25 +261,9 @@ func (fs FanoutStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*type wg.Wait() if me.Len() > 0 { - return alerts, &me + return ctx, alerts, &me } - return alerts, nil -} - -// LogStage logs the passed alerts with a given logger. -type LogStage struct { - log log.Logger -} - -func NewLogStage(log log.Logger) *LogStage { - return &LogStage{log: log} -} - -// Exec implements the Stage interface. -func (l *LogStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*types.Alert, error) { - l.log.Debugf("notify %v", alerts) - - return alerts, nil + return ctx, alerts, nil } // InhibitStage filters alerts through an inhibition muter. @@ -297,7 +281,7 @@ func NewInhibitStage(m types.Muter, mk types.Marker) *InhibitStage { } // Exec implements the Stage interface. -func (n *InhibitStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*types.Alert, error) { +func (n *InhibitStage) Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { var filtered []*types.Alert for _, a := range alerts { ok := n.marker.Inhibited(a.Fingerprint()) @@ -311,7 +295,7 @@ func (n *InhibitStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*typ } } - return filtered, nil + return ctx, filtered, nil } // SilenceStage filters alerts through a silence muter. @@ -329,7 +313,7 @@ func NewSilenceStage(m types.Muter, mk types.Marker) *SilenceStage { } // Exec implements the Stage interface. -func (n *SilenceStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*types.Alert, error) { +func (n *SilenceStage) Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { var filtered []*types.Alert for _, a := range alerts { _, ok := n.marker.Silenced(a.Fingerprint()) @@ -343,7 +327,7 @@ func (n *SilenceStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*typ } } - return filtered, nil + return ctx, filtered, nil } // WaitStage waits for a certain amount of time before continuing or until the @@ -360,13 +344,13 @@ func NewWaitStage(wait func() time.Duration) *WaitStage { } // Exec implements the Stage interface. -func (ws *WaitStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*types.Alert, error) { +func (ws *WaitStage) Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { select { case <-time.After(ws.wait()): case <-ctx.Done(): - return nil, ctx.Err() + return ctx, nil, ctx.Err() } - return alerts, nil + return ctx, alerts, nil } // FilterResolvedStage filters alerts based on a given notifierConfig. Either @@ -383,7 +367,7 @@ func NewFilterResolvedStage(conf notifierConfig) *FilterResolvedStage { } // Exec implements the Stage interface. -func (fr *FilterResolvedStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*types.Alert, error) { +func (fr *FilterResolvedStage) Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { var res []*types.Alert if fr.conf.SendResolved() { @@ -396,7 +380,7 @@ func (fr *FilterResolvedStage) Exec(ctx context.Context, alerts ...*types.Alert) } } - return res, nil + return ctx, res, nil } // DedupStage filters alerts. @@ -438,15 +422,15 @@ func (n *DedupStage) hasUpdate(alert *types.Alert, last *types.NotificationInfo, } // Exec implements the Stage interface. -func (n *DedupStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*types.Alert, error) { +func (n *DedupStage) Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { repeatInterval, ok := RepeatInterval(ctx) if !ok { - return nil, fmt.Errorf("repeat interval missing") + return ctx, nil, fmt.Errorf("repeat interval missing") } now, ok := Now(ctx) if !ok { - return nil, fmt.Errorf("now time missing") + return ctx, nil, fmt.Errorf("now time missing") } var fps []model.Fingerprint @@ -456,18 +440,18 @@ func (n *DedupStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*types notifyInfo, err := n.notifies.Get(n.recv.String(), fps...) if err != nil { - return nil, err + return ctx, nil, err } // If we have to notify about any of the alerts, we send a notification // for the entire batch. for i, alert := range alerts { if n.hasUpdate(alert, notifyInfo[i], now, repeatInterval) { - return alerts, nil + return ctx, alerts, nil } } - return nil, nil + return ctx, nil, nil } // RetryStage notifies via passed integration with exponential backoff until it @@ -484,7 +468,7 @@ func NewRetryStage(i Integration) *RetryStage { } // Exec implements the Stage interface. -func (r RetryStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*types.Alert, error) { +func (r RetryStage) Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { var ( i = 0 b = backoff.NewExponentialBackOff() @@ -497,7 +481,7 @@ func (r RetryStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*types. // Always check the context first to not notify again. select { case <-ctx.Done(): - return nil, ctx.Err() + return ctx, nil, ctx.Err() default: } @@ -508,10 +492,10 @@ func (r RetryStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*types. log.Warnf("Notify attempt %d failed: %s", i, err) } else { numNotifications.WithLabelValues(r.integration.name).Inc() - return alerts, nil + return ctx, alerts, nil } case <-ctx.Done(): - return nil, ctx.Err() + return ctx, nil, ctx.Err() } } } @@ -532,10 +516,10 @@ func NewSetNotifiesStage(notifies provider.Notifies, recv *nflogpb.Receiver) *Se } // Exec implements the Stage interface. -func (n SetNotifiesStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*types.Alert, error) { +func (n SetNotifiesStage) Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { now, ok := Now(ctx) if !ok { - return nil, fmt.Errorf("now time missing") + return ctx, nil, fmt.Errorf("now time missing") } var newNotifies []*types.NotificationInfo @@ -549,5 +533,5 @@ func (n SetNotifiesStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]* }) } - return alerts, n.notifies.Set(newNotifies...) + return ctx, alerts, n.notifies.Set(newNotifies...) } diff --git a/notify/notify_test.go b/notify/notify_test.go index 3304d936..975cf66e 100644 --- a/notify/notify_test.go +++ b/notify/notify_test.go @@ -29,8 +29,8 @@ import ( type failStage struct{} -func (s failStage) Exec(ctx context.Context, as ...*types.Alert) ([]*types.Alert, error) { - return nil, fmt.Errorf("some error") +func (s failStage) Exec(ctx context.Context, as ...*types.Alert) (context.Context, []*types.Alert, error) { + return ctx, nil, fmt.Errorf("some error") } func TestDedupStageHasUpdate(t *testing.T) { @@ -164,21 +164,26 @@ func TestMultiStage(t *testing.T) { ) stage := MultiStage{ - StageFunc(func(ctx context.Context, alerts ...*types.Alert) ([]*types.Alert, error) { + StageFunc(func(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { if !reflect.DeepEqual(alerts, alerts1) { t.Fatal("Input not equal to input of MultiStage") } - return alerts2, nil + ctx = context.WithValue(ctx, "key", "value") + return ctx, alerts2, nil }), - StageFunc(func(ctx context.Context, alerts ...*types.Alert) ([]*types.Alert, error) { + StageFunc(func(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { if !reflect.DeepEqual(alerts, alerts2) { t.Fatal("Input not equal to output of previous stage") } - return alerts3, nil + v, ok := ctx.Value("key").(string) + if !ok || v != "value" { + t.Fatalf("Expected value %q for key %q but got %q", "value", "key", v) + } + return ctx, alerts3, nil }), } - alerts, err := stage.Exec(context.Background(), alerts1...) + _, alerts, err := stage.Exec(context.Background(), alerts1...) if err != nil { t.Fatalf("Exec failed: %s", err) } @@ -195,7 +200,7 @@ func TestMultiStageFailure(t *testing.T) { stage = MultiStage{s1} ) - _, err := stage.Exec(ctx, nil) + _, _, err := stage.Exec(ctx, nil) if err.Error() != "some error" { t.Fatal("Errors were not propagated correctly by MultiStage") } @@ -208,18 +213,18 @@ func TestRoutingStage(t *testing.T) { ) stage := RoutingStage{ - "name": StageFunc(func(ctx context.Context, alerts ...*types.Alert) ([]*types.Alert, error) { + "name": StageFunc(func(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { if !reflect.DeepEqual(alerts, alerts1) { t.Fatal("Input not equal to input of RoutingStage") } - return alerts2, nil + return ctx, alerts2, nil }), "not": failStage{}, } ctx := WithReceiverName(context.Background(), "name") - alerts, err := stage.Exec(ctx, alerts1...) + _, alerts, err := stage.Exec(ctx, alerts1...) if err != nil { t.Fatalf("Exec failed: %s", err) } @@ -270,7 +275,7 @@ func TestSetNotifiesStage(t *testing.T) { t.Fatalf("Setting notifies failed: %s", err) } - _, err := stage.Exec(ctx, alerts...) + _, _, err := stage.Exec(ctx, alerts...) if err != nil { t.Fatalf("Exec failed: %s", err) } @@ -350,7 +355,7 @@ func TestSilenceStage(t *testing.T) { // the WasSilenced flag set to true afterwards. marker.SetSilenced(inAlerts[1].Fingerprint(), uuid.NewV4()) - alerts, err := silencer.Exec(nil, inAlerts...) + _, alerts, err := silencer.Exec(nil, inAlerts...) if err != nil { t.Fatalf("Exec failed: %s", err) } @@ -406,7 +411,7 @@ func TestInhibitStage(t *testing.T) { // the WasInhibited flag set to true afterwards. marker.SetInhibited(inAlerts[1].Fingerprint(), true) - alerts, err := inhibitor.Exec(nil, inAlerts...) + _, alerts, err := inhibitor.Exec(nil, inAlerts...) if err != nil { t.Fatalf("Exec failed: %s", err) }