diff --git a/manager/dispatch.go b/manager/dispatch.go
index 0054e34a..f49dbbec 100644
--- a/manager/dispatch.go
+++ b/manager/dispatch.go
@@ -36,22 +36,17 @@ func (d *Dispatcher) notify(name string, alerts ...*Alert) {
 
 func (d *Dispatcher) Run() {
 	for {
-		log.Infoln("waiting")
 		alert := d.state.Alert().Next()
 
-		log.Infoln("received:", alert)
-
 		conf, err := d.state.Config().Get()
 		if err != nil {
 			log.Error(err)
 			continue
 		}
-		log.Infoln("retrieved config")
 
 		for _, m := range conf.Routes.Match(alert.Labels) {
 			d.processAlert(alert, m)
 		}
-		log.Infoln("processing done")
 	}
 }
 
@@ -68,20 +63,16 @@ func (d *Dispatcher) processAlert(alert *Alert, opts *RouteOpts) {
 
 	ag, ok := d.aggrGroups[fp]
 	if !ok {
-		ag = &aggrGroup{
-			dispatcher: d,
-
-			labels:    group,
-			wait:      opts.GroupWait(),
-			waitTimer: time.NewTimer(time.Hour),
-			notify:    opts.SendTo,
-		}
+		ag = newAggrGroup(d, group, opts)
 		d.aggrGroups[fp] = ag
-
-		go ag.run()
 	}
 
 	ag.insert(alert)
+
+	if ag.empty() {
+		ag.stop()
+		delete(d.aggrGroups, fp)
+	}
 }
 
 // aggrGroup aggregates alerts into groups based on
@@ -101,18 +92,31 @@ type aggrGroup struct {
 	mtx sync.RWMutex
 }
 
-func (ag *aggrGroup) run() {
+// newAggrGroup returns a new aggregation group and starts background processing
+// that sends notifications about the contained alerts.
+func newAggrGroup(d *Dispatcher, labels model.LabelSet, opts *RouteOpts) *aggrGroup {
+	ag := &aggrGroup{
+		dispatcher: d,
 
-	ag.waitTimer.Stop()
-
-	if ag.wait > 0 {
-		ag.waitTimer.Reset(ag.wait)
+		labels:    labels,
+		wait:      opts.GroupWait(),
+		waitTimer: time.NewTimer(opts.GroupWait()),
+		notify:    opts.SendTo,
+		done:      make(chan bool),
+	}
+	if ag.wait == 0 {
+		ag.waitTimer.Stop()
 	}
 
+	go ag.run()
+
+	return ag
+}
+
+func (ag *aggrGroup) run() {
 	for {
 		select {
 		case <-ag.waitTimer.C:
-			log.Infoln("timer flush")
 			ag.flush()
 		case <-ag.done:
 			return
@@ -129,12 +133,18 @@ func (ag *aggrGroup) fingerprint() model.Fingerprint {
 	return ag.labels.Fingerprint()
 }
 
+// insert appends the alert to the group's new alerts and re-sorts them. It
+// returns no value; callers check for an empty group via empty().
 func (ag *aggrGroup) insert(alert *Alert) {
-	log.Infoln("insert:", alert)
+	ag.mtx.Lock()
 
 	ag.alertsNew = append(ag.alertsNew, alert)
 	sort.Sort(ag.alertsNew)
 
+	ag.mtx.Unlock()
+
+	// Immediately trigger a flush if the wait duration for this
+	// alert is already over.
 	if alert.Timestamp.Add(ag.wait).Before(time.Now()) {
 		ag.flush()
 	}
@@ -144,9 +154,16 @@ func (ag *aggrGroup) insert(alert *Alert) {
 	}
 }
 
-func (ag *aggrGroup) flush() {
-	log.Infoln("flush")
+// empty reports whether the group holds neither new nor old alerts.
+func (ag *aggrGroup) empty() bool {
+	ag.mtx.RLock()
+	defer ag.mtx.RUnlock()
 
+	return len(ag.alertsNew)+len(ag.alertsOld) == 0
+}
+
+// flush sends notifications for all new alerts.
+func (ag *aggrGroup) flush() {
 	ag.mtx.Lock()
 	defer ag.mtx.Unlock()
 
@@ -156,16 +173,9 @@ func (ag *aggrGroup) flush() {
 	ag.alertsNew = ag.alertsNew[:0]
 }
 
+// alertTimeline is a list of alerts sorted by their timestamp.
 type alertTimeline []*Alert
 
-func (at alertTimeline) Len() int {
-	return len(at)
-}
-
-func (at alertTimeline) Less(i, j int) bool {
-	return at[i].Timestamp.Before(at[j].Timestamp)
-}
-
-func (at alertTimeline) Swap(i, j int) {
-	at[i], at[j] = at[j], at[i]
-}
+func (at alertTimeline) Len() int           { return len(at) }
+func (at alertTimeline) Less(i, j int) bool { return at[i].Timestamp.Before(at[j].Timestamp) }
+func (at alertTimeline) Swap(i, j int)      { at[i], at[j] = at[j], at[i] }