diff --git a/cmd/alertmanager/main.go b/cmd/alertmanager/main.go index f6718104..338c7d2d 100644 --- a/cmd/alertmanager/main.go +++ b/cmd/alertmanager/main.go @@ -183,6 +183,14 @@ func main() { log.Fatal(err) } + waitFunc := meshWait(mrouter, 5*time.Second) + timeoutFunc := func(d time.Duration) time.Duration { + if d < notify.MinTimeout { + d = notify.MinTimeout + } + return d + waitFunc() + } + reload := func() (err error) { log.With("file", *configFile).Infof("Loading configuration file") defer func() { @@ -215,13 +223,13 @@ func main() { pipeline = notify.BuildPipeline( conf.Receivers, tmpl, - meshWait(mrouter, 5*time.Second), + waitFunc, inhibitor, silences, notificationLog, marker, ) - disp = dispatch.NewDispatcher(alerts, dispatch.NewRoute(conf.Route, nil), pipeline, marker) + disp = dispatch.NewDispatcher(alerts, dispatch.NewRoute(conf.Route, nil), pipeline, marker, timeoutFunc) go disp.Run() go inhibitor.Run() diff --git a/dispatch/dispatch.go b/dispatch/dispatch.go index d9253942..e504922a 100644 --- a/dispatch/dispatch.go +++ b/dispatch/dispatch.go @@ -22,7 +22,8 @@ type Dispatcher struct { alerts provider.Alerts stage notify.Stage - marker types.Marker + marker types.Marker + timeout func(time.Duration) time.Duration aggrGroups map[*Route]map[model.Fingerprint]*aggrGroup mtx sync.RWMutex @@ -35,13 +36,20 @@ type Dispatcher struct { } // NewDispatcher returns a new Dispatcher. -func NewDispatcher(ap provider.Alerts, r *Route, s notify.Stage, mk types.Marker) *Dispatcher { +func NewDispatcher( + ap provider.Alerts, + r *Route, + s notify.Stage, + mk types.Marker, + to func(time.Duration) time.Duration, +) *Dispatcher { disp := &Dispatcher{ - alerts: ap, - stage: s, - route: r, - marker: mk, - log: log.With("component", "dispatcher"), + alerts: ap, + stage: s, + route: r, + marker: mk, + timeout: to, + log: log.With("component", "dispatcher"), } return disp } @@ -229,7 +237,7 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) { // If the group does not exist, create it. ag, ok := groups[fp] if !ok { - ag = newAggrGroup(d.ctx, group, &route.RouteOpts) + ag = newAggrGroup(d.ctx, group, &route.RouteOpts, d.timeout) groups[fp] = ag go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool { @@ -253,10 +261,11 @@ type aggrGroup struct { routeFP model.Fingerprint log log.Logger - ctx context.Context - cancel func() - done chan struct{} - next *time.Timer + ctx context.Context + cancel func() + done chan struct{} + next *time.Timer + timeout func(time.Duration) time.Duration mtx sync.RWMutex alerts map[model.Fingerprint]*types.Alert @@ -264,11 +273,15 @@ type aggrGroup struct { } // newAggrGroup returns a new aggregation group. -func newAggrGroup(ctx context.Context, labels model.LabelSet, opts *RouteOpts) *aggrGroup { +func newAggrGroup(ctx context.Context, labels model.LabelSet, opts *RouteOpts, to func(time.Duration) time.Duration) *aggrGroup { + if to == nil { + to = func(d time.Duration) time.Duration { return d } + } ag := &aggrGroup{ - labels: labels, - opts: opts, - alerts: map[model.Fingerprint]*types.Alert{}, + labels: labels, + opts: opts, + timeout: to, + alerts: map[model.Fingerprint]*types.Alert{}, } ag.ctx, ag.cancel = context.WithCancel(ctx) @@ -302,18 +315,12 @@ func (ag *aggrGroup) run(nf notifyFunc) { defer close(ag.done) defer ag.next.Stop() - timeout := ag.opts.GroupInterval - - if timeout < notify.MinTimeout { - timeout = notify.MinTimeout - } - for { select { case now := <-ag.next.C: // Give the notifcations time until the next flush to // finish before terminating them. - ctx, cancel := context.WithTimeout(ag.ctx, timeout) + ctx, cancel := context.WithTimeout(ag.ctx, ag.timeout(ag.opts.GroupInterval)) // The now time we retrieve from the ticker is the only reliable // point of time reference for the subsequent notification pipeline. diff --git a/dispatch/dispatch_test.go b/dispatch/dispatch_test.go index 59df9f36..b1f0597a 100644 --- a/dispatch/dispatch_test.go +++ b/dispatch/dispatch_test.go @@ -98,7 +98,7 @@ func TestAggrGroup(t *testing.T) { } // Test regular situation where we wait for group_wait to send out alerts. - ag := newAggrGroup(context.Background(), lset, opts) + ag := newAggrGroup(context.Background(), lset, opts, nil) go ag.run(ntfy) ag.insert(a1) @@ -146,7 +146,7 @@ func TestAggrGroup(t *testing.T) { // immediate flushing. // Finally, set all alerts to be resolved. After successful notify the aggregation group // should empty itself. - ag = newAggrGroup(context.Background(), lset, opts) + ag = newAggrGroup(context.Background(), lset, opts, nil) go ag.run(ntfy) ag.insert(a1)