Merge pull request #464 from prometheus/fabxc-stagectx

notify: include context in Stage interface
This commit is contained in:
Fabian Reinartz 2016-08-18 14:41:17 +02:00 committed by GitHub
commit a52992196b
3 changed files with 57 additions and 68 deletions

View File

@ -234,7 +234,7 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
groups[fp] = ag
go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool {
_, err := d.stage.Exec(ctx, alerts...)
_, _, err := d.stage.Exec(ctx, alerts...)
if err != nil {
log.Errorf("Notify for %d alerts failed: %s", len(alerts), err)
}

View File

@ -146,14 +146,14 @@ func Now(ctx context.Context) (time.Time, bool) {
// A Stage processes alerts under the constraints of the given context.
type Stage interface {
Exec(ctx context.Context, alerts ...*types.Alert) ([]*types.Alert, error)
Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error)
}
// StageFunc wraps a function to represent a Stage.
type StageFunc func(ctx context.Context, alerts ...*types.Alert) ([]*types.Alert, error)
type StageFunc func(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error)
// Exec implements Stage interface.
func (f StageFunc) Exec(ctx context.Context, alerts ...*types.Alert) ([]*types.Alert, error) {
func (f StageFunc) Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
return f(ctx, alerts...)
}
@ -204,15 +204,15 @@ func createStage(rc *config.Receiver, tmpl *template.Template, wait func() time.
type RoutingStage map[string]Stage
// Exec implements the Stage interface.
func (rs RoutingStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*types.Alert, error) {
func (rs RoutingStage) Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
receiver, ok := ReceiverName(ctx)
if !ok {
return nil, fmt.Errorf("receiver missing")
return ctx, nil, fmt.Errorf("receiver missing")
}
s, ok := rs[receiver]
if !ok {
return nil, fmt.Errorf("stage for receiver missing")
return ctx, nil, fmt.Errorf("stage for receiver missing")
}
return s.Exec(ctx, alerts...)
@ -222,19 +222,19 @@ func (rs RoutingStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*typ
type MultiStage []Stage
// Exec implements the Stage interface.
func (ms MultiStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*types.Alert, error) {
func (ms MultiStage) Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
var err error
for _, s := range ms {
if len(alerts) == 0 {
return nil, nil
return ctx, nil, nil
}
alerts, err = s.Exec(ctx, alerts...)
ctx, alerts, err = s.Exec(ctx, alerts...)
if err != nil {
return nil, err
return ctx, nil, err
}
}
return alerts, nil
return ctx, alerts, nil
}
// FanoutStage executes its stages concurrently
@ -242,7 +242,7 @@ type FanoutStage []Stage
// Exec attempts to execute all stages concurrently and discards the results.
// It returns its input alerts and a types.MultiError if one or more stages fail.
func (fs FanoutStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*types.Alert, error) {
func (fs FanoutStage) Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
var (
wg sync.WaitGroup
me types.MultiError
@ -251,7 +251,7 @@ func (fs FanoutStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*type
for _, s := range fs {
go func(s Stage) {
if _, err := s.Exec(ctx, alerts...); err != nil {
if _, _, err := s.Exec(ctx, alerts...); err != nil {
me.Add(err)
log.Errorf("Error on notify: %s", err)
}
@ -261,25 +261,9 @@ func (fs FanoutStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*type
wg.Wait()
if me.Len() > 0 {
return alerts, &me
return ctx, alerts, &me
}
return alerts, nil
}
// LogStage logs the passed alerts with a given logger.
type LogStage struct {
log log.Logger
}
func NewLogStage(log log.Logger) *LogStage {
return &LogStage{log: log}
}
// Exec implements the Stage interface.
func (l *LogStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*types.Alert, error) {
l.log.Debugf("notify %v", alerts)
return alerts, nil
return ctx, alerts, nil
}
// InhibitStage filters alerts through an inhibition muter.
@ -297,7 +281,7 @@ func NewInhibitStage(m types.Muter, mk types.Marker) *InhibitStage {
}
// Exec implements the Stage interface.
func (n *InhibitStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*types.Alert, error) {
func (n *InhibitStage) Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
var filtered []*types.Alert
for _, a := range alerts {
ok := n.marker.Inhibited(a.Fingerprint())
@ -311,7 +295,7 @@ func (n *InhibitStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*typ
}
}
return filtered, nil
return ctx, filtered, nil
}
// SilenceStage filters alerts through a silence muter.
@ -329,7 +313,7 @@ func NewSilenceStage(m types.Muter, mk types.Marker) *SilenceStage {
}
// Exec implements the Stage interface.
func (n *SilenceStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*types.Alert, error) {
func (n *SilenceStage) Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
var filtered []*types.Alert
for _, a := range alerts {
_, ok := n.marker.Silenced(a.Fingerprint())
@ -343,7 +327,7 @@ func (n *SilenceStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*typ
}
}
return filtered, nil
return ctx, filtered, nil
}
// WaitStage waits for a certain amount of time before continuing or until the
@ -360,13 +344,13 @@ func NewWaitStage(wait func() time.Duration) *WaitStage {
}
// Exec implements the Stage interface.
func (ws *WaitStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*types.Alert, error) {
func (ws *WaitStage) Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
select {
case <-time.After(ws.wait()):
case <-ctx.Done():
return nil, ctx.Err()
return ctx, nil, ctx.Err()
}
return alerts, nil
return ctx, alerts, nil
}
// FilterResolvedStage filters alerts based on a given notifierConfig. Either
@ -383,7 +367,7 @@ func NewFilterResolvedStage(conf notifierConfig) *FilterResolvedStage {
}
// Exec implements the Stage interface.
func (fr *FilterResolvedStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*types.Alert, error) {
func (fr *FilterResolvedStage) Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
var res []*types.Alert
if fr.conf.SendResolved() {
@ -396,7 +380,7 @@ func (fr *FilterResolvedStage) Exec(ctx context.Context, alerts ...*types.Alert)
}
}
return res, nil
return ctx, res, nil
}
// DedupStage filters alerts.
@ -438,15 +422,15 @@ func (n *DedupStage) hasUpdate(alert *types.Alert, last *types.NotificationInfo,
}
// Exec implements the Stage interface.
func (n *DedupStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*types.Alert, error) {
func (n *DedupStage) Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
repeatInterval, ok := RepeatInterval(ctx)
if !ok {
return nil, fmt.Errorf("repeat interval missing")
return ctx, nil, fmt.Errorf("repeat interval missing")
}
now, ok := Now(ctx)
if !ok {
return nil, fmt.Errorf("now time missing")
return ctx, nil, fmt.Errorf("now time missing")
}
var fps []model.Fingerprint
@ -456,18 +440,18 @@ func (n *DedupStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*types
notifyInfo, err := n.notifies.Get(n.recv.String(), fps...)
if err != nil {
return nil, err
return ctx, nil, err
}
// If we have to notify about any of the alerts, we send a notification
// for the entire batch.
for i, alert := range alerts {
if n.hasUpdate(alert, notifyInfo[i], now, repeatInterval) {
return alerts, nil
return ctx, alerts, nil
}
}
return nil, nil
return ctx, nil, nil
}
// RetryStage notifies via passed integration with exponential backoff until it
@ -484,7 +468,7 @@ func NewRetryStage(i Integration) *RetryStage {
}
// Exec implements the Stage interface.
func (r RetryStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*types.Alert, error) {
func (r RetryStage) Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
var (
i = 0
b = backoff.NewExponentialBackOff()
@ -497,7 +481,7 @@ func (r RetryStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*types.
// Always check the context first to not notify again.
select {
case <-ctx.Done():
return nil, ctx.Err()
return ctx, nil, ctx.Err()
default:
}
@ -508,10 +492,10 @@ func (r RetryStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*types.
log.Warnf("Notify attempt %d failed: %s", i, err)
} else {
numNotifications.WithLabelValues(r.integration.name).Inc()
return alerts, nil
return ctx, alerts, nil
}
case <-ctx.Done():
return nil, ctx.Err()
return ctx, nil, ctx.Err()
}
}
}
@ -532,10 +516,10 @@ func NewSetNotifiesStage(notifies provider.Notifies, recv *nflogpb.Receiver) *Se
}
// Exec implements the Stage interface.
func (n SetNotifiesStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*types.Alert, error) {
func (n SetNotifiesStage) Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
now, ok := Now(ctx)
if !ok {
return nil, fmt.Errorf("now time missing")
return ctx, nil, fmt.Errorf("now time missing")
}
var newNotifies []*types.NotificationInfo
@ -549,5 +533,5 @@ func (n SetNotifiesStage) Exec(ctx context.Context, alerts ...*types.Alert) ([]*
})
}
return alerts, n.notifies.Set(newNotifies...)
return ctx, alerts, n.notifies.Set(newNotifies...)
}

View File

@ -29,8 +29,8 @@ import (
type failStage struct{}
func (s failStage) Exec(ctx context.Context, as ...*types.Alert) ([]*types.Alert, error) {
return nil, fmt.Errorf("some error")
func (s failStage) Exec(ctx context.Context, as ...*types.Alert) (context.Context, []*types.Alert, error) {
return ctx, nil, fmt.Errorf("some error")
}
func TestDedupStageHasUpdate(t *testing.T) {
@ -164,21 +164,26 @@ func TestMultiStage(t *testing.T) {
)
stage := MultiStage{
StageFunc(func(ctx context.Context, alerts ...*types.Alert) ([]*types.Alert, error) {
StageFunc(func(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
if !reflect.DeepEqual(alerts, alerts1) {
t.Fatal("Input not equal to input of MultiStage")
}
return alerts2, nil
ctx = context.WithValue(ctx, "key", "value")
return ctx, alerts2, nil
}),
StageFunc(func(ctx context.Context, alerts ...*types.Alert) ([]*types.Alert, error) {
StageFunc(func(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
if !reflect.DeepEqual(alerts, alerts2) {
t.Fatal("Input not equal to output of previous stage")
}
return alerts3, nil
v, ok := ctx.Value("key").(string)
if !ok || v != "value" {
t.Fatalf("Expected value %q for key %q but got %q", "value", "key", v)
}
return ctx, alerts3, nil
}),
}
alerts, err := stage.Exec(context.Background(), alerts1...)
_, alerts, err := stage.Exec(context.Background(), alerts1...)
if err != nil {
t.Fatalf("Exec failed: %s", err)
}
@ -195,7 +200,7 @@ func TestMultiStageFailure(t *testing.T) {
stage = MultiStage{s1}
)
_, err := stage.Exec(ctx, nil)
_, _, err := stage.Exec(ctx, nil)
if err.Error() != "some error" {
t.Fatal("Errors were not propagated correctly by MultiStage")
}
@ -208,18 +213,18 @@ func TestRoutingStage(t *testing.T) {
)
stage := RoutingStage{
"name": StageFunc(func(ctx context.Context, alerts ...*types.Alert) ([]*types.Alert, error) {
"name": StageFunc(func(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
if !reflect.DeepEqual(alerts, alerts1) {
t.Fatal("Input not equal to input of RoutingStage")
}
return alerts2, nil
return ctx, alerts2, nil
}),
"not": failStage{},
}
ctx := WithReceiverName(context.Background(), "name")
alerts, err := stage.Exec(ctx, alerts1...)
_, alerts, err := stage.Exec(ctx, alerts1...)
if err != nil {
t.Fatalf("Exec failed: %s", err)
}
@ -270,7 +275,7 @@ func TestSetNotifiesStage(t *testing.T) {
t.Fatalf("Setting notifies failed: %s", err)
}
_, err := stage.Exec(ctx, alerts...)
_, _, err := stage.Exec(ctx, alerts...)
if err != nil {
t.Fatalf("Exec failed: %s", err)
}
@ -350,7 +355,7 @@ func TestSilenceStage(t *testing.T) {
// the WasSilenced flag set to true afterwards.
marker.SetSilenced(inAlerts[1].Fingerprint(), uuid.NewV4())
alerts, err := silencer.Exec(nil, inAlerts...)
_, alerts, err := silencer.Exec(nil, inAlerts...)
if err != nil {
t.Fatalf("Exec failed: %s", err)
}
@ -406,7 +411,7 @@ func TestInhibitStage(t *testing.T) {
// the WasInhibited flag set to true afterwards.
marker.SetInhibited(inAlerts[1].Fingerprint(), true)
alerts, err := inhibitor.Exec(nil, inAlerts...)
_, alerts, err := inhibitor.Exec(nil, inAlerts...)
if err != nil {
t.Fatalf("Exec failed: %s", err)
}