Merge pull request #489 from prometheus/timeout

*: consider mesh wait in notification timeouts
This commit is contained in:
Fabian Reinartz 2016-09-06 14:17:53 +02:00 committed by GitHub
commit bf6d47934e
3 changed files with 42 additions and 27 deletions

View File

@ -180,6 +180,14 @@ func main() {
log.Fatal(err) log.Fatal(err)
} }
waitFunc := meshWait(mrouter, 5*time.Second)
timeoutFunc := func(d time.Duration) time.Duration {
if d < notify.MinTimeout {
d = notify.MinTimeout
}
return d + waitFunc()
}
reload := func() (err error) { reload := func() (err error) {
log.With("file", *configFile).Infof("Loading configuration file") log.With("file", *configFile).Infof("Loading configuration file")
defer func() { defer func() {
@ -212,13 +220,13 @@ func main() {
pipeline = notify.BuildPipeline( pipeline = notify.BuildPipeline(
conf.Receivers, conf.Receivers,
tmpl, tmpl,
meshWait(mrouter, 5*time.Second), waitFunc,
inhibitor, inhibitor,
silences, silences,
notificationLog, notificationLog,
marker, marker,
) )
disp = dispatch.NewDispatcher(alerts, dispatch.NewRoute(conf.Route, nil), pipeline, marker) disp = dispatch.NewDispatcher(alerts, dispatch.NewRoute(conf.Route, nil), pipeline, marker, timeoutFunc)
go disp.Run() go disp.Run()
go inhibitor.Run() go inhibitor.Run()

View File

@ -23,6 +23,7 @@ type Dispatcher struct {
stage notify.Stage stage notify.Stage
marker types.Marker marker types.Marker
timeout func(time.Duration) time.Duration
aggrGroups map[*Route]map[model.Fingerprint]*aggrGroup aggrGroups map[*Route]map[model.Fingerprint]*aggrGroup
mtx sync.RWMutex mtx sync.RWMutex
@ -35,12 +36,19 @@ type Dispatcher struct {
} }
// NewDispatcher returns a new Dispatcher. // NewDispatcher returns a new Dispatcher.
func NewDispatcher(ap provider.Alerts, r *Route, s notify.Stage, mk types.Marker) *Dispatcher { func NewDispatcher(
ap provider.Alerts,
r *Route,
s notify.Stage,
mk types.Marker,
to func(time.Duration) time.Duration,
) *Dispatcher {
disp := &Dispatcher{ disp := &Dispatcher{
alerts: ap, alerts: ap,
stage: s, stage: s,
route: r, route: r,
marker: mk, marker: mk,
timeout: to,
log: log.With("component", "dispatcher"), log: log.With("component", "dispatcher"),
} }
return disp return disp
@ -229,7 +237,7 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
// If the group does not exist, create it. // If the group does not exist, create it.
ag, ok := groups[fp] ag, ok := groups[fp]
if !ok { if !ok {
ag = newAggrGroup(d.ctx, group, &route.RouteOpts) ag = newAggrGroup(d.ctx, group, &route.RouteOpts, d.timeout)
groups[fp] = ag groups[fp] = ag
go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool { go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool {
@ -257,6 +265,7 @@ type aggrGroup struct {
cancel func() cancel func()
done chan struct{} done chan struct{}
next *time.Timer next *time.Timer
timeout func(time.Duration) time.Duration
mtx sync.RWMutex mtx sync.RWMutex
alerts map[model.Fingerprint]*types.Alert alerts map[model.Fingerprint]*types.Alert
@ -264,10 +273,14 @@ type aggrGroup struct {
} }
// newAggrGroup returns a new aggregation group. // newAggrGroup returns a new aggregation group.
func newAggrGroup(ctx context.Context, labels model.LabelSet, opts *RouteOpts) *aggrGroup { func newAggrGroup(ctx context.Context, labels model.LabelSet, opts *RouteOpts, to func(time.Duration) time.Duration) *aggrGroup {
if to == nil {
to = func(d time.Duration) time.Duration { return d }
}
ag := &aggrGroup{ ag := &aggrGroup{
labels: labels, labels: labels,
opts: opts, opts: opts,
timeout: to,
alerts: map[model.Fingerprint]*types.Alert{}, alerts: map[model.Fingerprint]*types.Alert{},
} }
ag.ctx, ag.cancel = context.WithCancel(ctx) ag.ctx, ag.cancel = context.WithCancel(ctx)
@ -302,18 +315,12 @@ func (ag *aggrGroup) run(nf notifyFunc) {
defer close(ag.done) defer close(ag.done)
defer ag.next.Stop() defer ag.next.Stop()
timeout := ag.opts.GroupInterval
if timeout < notify.MinTimeout {
timeout = notify.MinTimeout
}
for { for {
select { select {
case now := <-ag.next.C: case now := <-ag.next.C:
// Give the notifications time until the next flush to // Give the notifications time until the next flush to
// finish before terminating them. // finish before terminating them.
ctx, cancel := context.WithTimeout(ag.ctx, timeout) ctx, cancel := context.WithTimeout(ag.ctx, ag.timeout(ag.opts.GroupInterval))
// The now time we retrieve from the ticker is the only reliable // The now time we retrieve from the ticker is the only reliable
// point of time reference for the subsequent notification pipeline. // point of time reference for the subsequent notification pipeline.

View File

@ -98,7 +98,7 @@ func TestAggrGroup(t *testing.T) {
} }
// Test regular situation where we wait for group_wait to send out alerts. // Test regular situation where we wait for group_wait to send out alerts.
ag := newAggrGroup(context.Background(), lset, opts) ag := newAggrGroup(context.Background(), lset, opts, nil)
go ag.run(ntfy) go ag.run(ntfy)
ag.insert(a1) ag.insert(a1)
@ -146,7 +146,7 @@ func TestAggrGroup(t *testing.T) {
// immediate flushing. // immediate flushing.
// Finally, set all alerts to be resolved. After successful notify the aggregation group // Finally, set all alerts to be resolved. After successful notify the aggregation group
// should empty itself. // should empty itself.
ag = newAggrGroup(context.Background(), lset, opts) ag = newAggrGroup(context.Background(), lset, opts, nil)
go ag.run(ntfy) go ag.run(ntfy)
ag.insert(a1) ag.insert(a1)