improve dispatcher code and fix concurrency

This commit is contained in:
Fabian Reinartz 2015-07-02 20:48:21 +02:00
parent 2bb6233dd8
commit 41c9dcc383
1 changed file with 44 additions and 35 deletions

View File

@ -36,22 +36,17 @@ func (d *Dispatcher) notify(name string, alerts ...*Alert) {
// Run is the dispatcher's main loop and never returns. Each iteration
// fetches the next alert, loads the current configuration and hands the
// alert to processAlert once for every route match on the alert's labels.
// If the configuration cannot be retrieved, the error is logged and the
// alert is skipped.
func (d *Dispatcher) Run() {
	for {
		log.Infoln("waiting")
		incoming := d.state.Alert().Next()
		log.Infoln("received:", incoming)

		cfg, err := d.state.Config().Get()
		if err != nil {
			log.Error(err)
			continue
		}
		log.Infoln("retrieved config")

		matched := cfg.Routes.Match(incoming.Labels)
		for _, routeOpts := range matched {
			d.processAlert(incoming, routeOpts)
		}
		log.Infoln("processing done")
	}
}
@ -68,20 +63,16 @@ func (d *Dispatcher) processAlert(alert *Alert, opts *RouteOpts) {
ag, ok := d.aggrGroups[fp]
if !ok {
ag = &aggrGroup{
dispatcher: d,
labels: group,
wait: opts.GroupWait(),
waitTimer: time.NewTimer(time.Hour),
notify: opts.SendTo,
}
ag = newAggrGroup(group, opts)
d.aggrGroups[fp] = ag
go ag.run()
}
ag.insert(alert)
if ag.empty() {
ag.stop()
delete(d.aggrGroups, fp)
}
}
// aggrGroup aggregates alerts into groups based on
@ -101,18 +92,31 @@ type aggrGroup struct {
mtx sync.RWMutex
}
func (ag *aggrGroup) run() {
// newAggrGroup returns a new aggregation group and starts background processing
// that sends notifications about the contained alerts.
//
// The wait duration and the notification target are taken from opts
// (opts.GroupWait() and opts.SendTo). The goroutine executing ag.run() is
// started before the group is handed back to the caller.
//
// NOTE(review): this text comes from a rendered diff in which removed and
// added lines are interleaved. The `dispatcher: d` and `labels: group` entries
// and the Stop/Reset statements inside the literal refer to names that are not
// parameters of this function and look like removed-side residue — confirm
// against the real file before relying on them.
func newAggrGroup(labels model.LabelSet, opts *RouteOpts) *aggrGroup {
ag := &aggrGroup{
dispatcher: d,
ag.waitTimer.Stop()
if ag.wait > 0 {
ag.waitTimer.Reset(ag.wait)
labels: group,
wait: opts.GroupWait(),
// The timer is created already armed with the group wait duration.
waitTimer: time.NewTimer(opts.GroupWait()),
notify: opts.SendTo,
// run() returns once a value can be received from done.
done: make(chan bool),
}
// With a zero wait duration the freshly created timer is stopped right away.
// NOTE(review): a timer created with a zero duration may already have fired
// when Stop is called — confirm run() tolerates a pending tick.
if ag.wait == 0 {
ag.waitTimer.Stop()
}
go ag.run()
return ag
}
func (ag *aggrGroup) run() {
for {
select {
case <-ag.waitTimer.C:
log.Infoln("timer flush")
ag.flush()
case <-ag.done:
return
@ -129,12 +133,18 @@ func (ag *aggrGroup) fingerprint() model.Fingerprint {
return ag.labels.Fingerprint()
}
// insert adds the alert to the aggregation group's list of new alerts.
// It returns nothing; whether the group is empty is checked separately
// via empty().
func (ag *aggrGroup) insert(alert *Alert) {
log.Infoln("insert:", alert)
ag.mtx.Lock()
ag.alertsNew = append(ag.alertsNew, alert)
sort.Sort(ag.alertsNew)
ag.mtx.Unlock()
// Immediately trigger a flush if the wait duration for this
// alert is already over.
if alert.Timestamp.Add(ag.wait).Before(time.Now()) {
ag.flush()
}
@ -144,9 +154,15 @@ func (ag *aggrGroup) insert(alert *Alert) {
}
}
func (ag *aggrGroup) flush() {
log.Infoln("flush")
// empty reports whether the group holds no alerts at all, neither in
// alertsNew nor in alertsOld. The read lock is held while both lists are
// inspected.
func (ag *aggrGroup) empty() bool {
	ag.mtx.RLock()
	defer ag.mtx.RUnlock()

	if len(ag.alertsNew) > 0 {
		return false
	}
	return len(ag.alertsOld) == 0
}
// flush sends notifications for all new alerts.
func (ag *aggrGroup) flush() {
ag.mtx.Lock()
defer ag.mtx.Unlock()
@ -156,16 +172,9 @@ func (ag *aggrGroup) flush() {
ag.alertsNew = ag.alertsNew[:0]
}
// alertTimeline is a list of alerts sorted by their timestamp.
// Its Len, Less and Swap methods match the method set of sort.Interface.
type alertTimeline []*Alert

// Len reports how many alerts the timeline holds.
func (tl alertTimeline) Len() int { return len(tl) }

// Less reports whether the alert at index i has an earlier timestamp than
// the alert at index j.
func (tl alertTimeline) Less(i, j int) bool { return tl[i].Timestamp.Before(tl[j].Timestamp) }

// Swap exchanges the alerts at indices i and j.
func (tl alertTimeline) Swap(i, j int) { tl[i], tl[j] = tl[j], tl[i] }
// Len reports the number of alerts in the timeline.
func (tl alertTimeline) Len() int {
	return len(tl)
}

// Less reports whether the alert at index i carries an earlier timestamp
// than the alert at index j.
func (tl alertTimeline) Less(i, j int) bool {
	return tl[i].Timestamp.Before(tl[j].Timestamp)
}

// Swap exchanges the alerts stored at indices i and j.
func (tl alertTimeline) Swap(i, j int) {
	tl[i], tl[j] = tl[j], tl[i]
}