Add mute time stage and pipeline

Signed-off-by: Ben Ridley <benridley29@gmail.com>
This commit is contained in:
ben 2020-10-06 21:07:42 +11:00 committed by Ben Ridley
parent 44d8cb43d2
commit d1f5e07909
3 changed files with 67 additions and 1 deletions

View File

@ -28,6 +28,7 @@ import (
"syscall" "syscall"
"time" "time"
"github.com/benridley/gotime"
"github.com/go-kit/kit/log" "github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level" "github.com/go-kit/kit/log/level"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -413,6 +414,12 @@ func run() int {
integrationsNum += len(integrations) integrationsNum += len(integrations)
} }
// Build the map of time interval names to mute time definitions
muteTimes := make(map[string][]gotime.TimeInterval)
for _, ti := range conf.MuteTimeIntervals {
muteTimes[ti.Name] = ti.TimeIntervals
}
inhibitor.Stop() inhibitor.Stop()
disp.Stop() disp.Stop()
@ -423,6 +430,7 @@ func run() int {
waitFunc, waitFunc,
inhibitor, inhibitor,
silencer, silencer,
muteTimes,
notificationLog, notificationLog,
peer, peer,
) )

View File

@ -404,6 +404,7 @@ func (ag *aggrGroup) run(nf notifyFunc) {
ctx = notify.WithGroupLabels(ctx, ag.labels) ctx = notify.WithGroupLabels(ctx, ag.labels)
ctx = notify.WithReceiverName(ctx, ag.opts.Receiver) ctx = notify.WithReceiverName(ctx, ag.opts.Receiver)
ctx = notify.WithRepeatInterval(ctx, ag.opts.RepeatInterval) ctx = notify.WithRepeatInterval(ctx, ag.opts.RepeatInterval)
ctx = notify.WithMuteTimes(ctx, ag.opts.MuteTimes)
// Wait the configured interval before calling flush again. // Wait the configured interval before calling flush again.
ag.mtx.Lock() ag.mtx.Lock()

View File

@ -20,6 +20,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/benridley/gotime"
"github.com/cenkalti/backoff/v4" "github.com/cenkalti/backoff/v4"
"github.com/cespare/xxhash" "github.com/cespare/xxhash"
"github.com/go-kit/kit/log" "github.com/go-kit/kit/log"
@ -108,6 +109,7 @@ const (
keyFiringAlerts keyFiringAlerts
keyResolvedAlerts keyResolvedAlerts
keyNow keyNow
keyMuteTimes
) )
// WithReceiverName populates a context with a receiver name. // WithReceiverName populates a context with a receiver name.
@ -145,6 +147,11 @@ func WithRepeatInterval(ctx context.Context, t time.Duration) context.Context {
return context.WithValue(ctx, keyRepeatInterval, t) return context.WithValue(ctx, keyRepeatInterval, t)
} }
// WithMuteTimes populates a context with a slice of mute time names.
func WithMuteTimes(ctx context.Context, mt []string) context.Context {
return context.WithValue(ctx, keyMuteTimes, mt)
}
// RepeatInterval extracts a repeat interval from the context. Iff none exists, the // RepeatInterval extracts a repeat interval from the context. Iff none exists, the
// second argument is false. // second argument is false.
func RepeatInterval(ctx context.Context) (time.Duration, bool) { func RepeatInterval(ctx context.Context) (time.Duration, bool) {
@ -194,6 +201,13 @@ func ResolvedAlerts(ctx context.Context) ([]uint64, bool) {
return v, ok return v, ok
} }
// MuteTimeNames extracts a slice of mute time names from the context. Iff none exists, the
// second argument is false.
func MuteTimeNames(ctx context.Context) ([]string, bool) {
v, ok := ctx.Value(keyMuteTimes).([]string)
return v, ok
}
// A Stage processes alerts under the constraints of the given context. // A Stage processes alerts under the constraints of the given context.
type Stage interface { type Stage interface {
Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error)
@ -289,6 +303,7 @@ func (pb *PipelineBuilder) New(
wait func() time.Duration, wait func() time.Duration,
inhibitor *inhibit.Inhibitor, inhibitor *inhibit.Inhibitor,
silencer *silence.Silencer, silencer *silence.Silencer,
muteTimes map[string][]gotime.TimeInterval,
notificationLog NotificationLog, notificationLog NotificationLog,
peer *cluster.Peer, peer *cluster.Peer,
) RoutingStage { ) RoutingStage {
@ -297,10 +312,11 @@ func (pb *PipelineBuilder) New(
ms := NewGossipSettleStage(peer) ms := NewGossipSettleStage(peer)
is := NewMuteStage(inhibitor) is := NewMuteStage(inhibitor)
ss := NewMuteStage(silencer) ss := NewMuteStage(silencer)
mts := NewTimeMuteStage(muteTimes)
for name := range receivers { for name := range receivers {
st := createReceiverStage(name, receivers[name], wait, notificationLog, pb.metrics) st := createReceiverStage(name, receivers[name], wait, notificationLog, pb.metrics)
rs[name] = MultiStage{ms, is, ss, st} rs[name] = MultiStage{ms, is, mts, ss, st}
} }
return rs return rs
} }
@ -755,3 +771,44 @@ func (n SetNotifiesStage) Exec(ctx context.Context, l log.Logger, alerts ...*typ
return ctx, alerts, n.nflog.Log(n.recv, gkey, firing, resolved) return ctx, alerts, n.nflog.Log(n.recv, gkey, firing, resolved)
} }
type TimeMuteStage struct {
muteTimes map[string][]gotime.TimeInterval
}
func NewTimeMuteStage(mt map[string][]gotime.TimeInterval) *TimeMuteStage {
return &TimeMuteStage{mt}
}
// Exec implements the stage interface for TimeMuteStage
// TimeMuteStage is responsible for muting alerts whose route is not in an active time
func (mts TimeMuteStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
muteTimeNames, ok := MuteTimeNames(ctx)
if !ok {
return ctx, alerts, nil
}
now, ok := Now(ctx)
if !ok {
return ctx, alerts, errors.New("missing now timestamp")
}
muted := false
for _, mtName := range muteTimeNames {
mt, ok := mts.muteTimes[mtName]
if !ok {
return ctx, alerts, errors.Errorf("mute time %s doesn't exist in config", mtName)
}
for _, ti := range mt {
if ti.ContainsTime(now) {
muted = true
}
}
}
// If the current time is inside a mute time, all alerts are removed from the pipeline
if muted {
lvl := level.Warn(l)
lvl.Log("Mail not sent due to being outside time interval")
return ctx, nil, nil
}
return ctx, alerts, nil
}