From 575b2257b1f89a556bc9228f9831e45a51755cc5 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Fri, 25 Sep 2015 00:15:27 +0200 Subject: [PATCH] Separate concerns from dispatcher and aggregation groups --- manager/dispatch.go | 369 ++++++++++++++++++-------------------------- 1 file changed, 154 insertions(+), 215 deletions(-) diff --git a/manager/dispatch.go b/manager/dispatch.go index 927c0c0e..2f3c6a15 100644 --- a/manager/dispatch.go +++ b/manager/dispatch.go @@ -7,151 +7,70 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/log" + "golang.org/x/net/context" ) -const ResolveTimeout = 35 * time.Second +const ResolveTimeout = 30 * time.Second -// Dispatcher dispatches alerts. It is absed on the alert's data -// rather than the time they arrive. Thus it can recover it's state -// without persistence. +// Dispatcher sorts incoming alerts into aggregation groups and +// assigns the correct notifiers to each. type Dispatcher struct { - state State + routes Routes + alertProvider AlertProvider aggrGroups map[model.Fingerprint]*aggrGroup notifiers map[string]Notifier - mtx sync.RWMutex + mtx sync.RWMutex + done chan struct{} + ctx context.Context + cancel func() } -func NewDispatcher(state State, notifiers []Notifier) *Dispatcher { - disp := &Dispatcher{ - state: state, +// NewDispatcher returns a new Dispatcher. +func NewDispatcher(ctx context.Context, state State, notifier Notifier) *Dispatcher { + d := &Dispatcher{ aggrGroups: map[model.Fingerprint]*aggrGroup{}, - notifiers: map[string]Notifier{}, } + d.ctx, d.cancel = context.WithCancel(ctx) - for _, n := range notifiers { - disp.notifiers[n.Name()] = n - } - - return disp + return d } -func (d *Dispatcher) filter(alerts ...*Alert) ([]*Alert, error) { +// ApplyConfig updates the dispatcher to match the new configuration. 
+func (d *Dispatcher) ApplyConfig(conf *Config) { + d.mtx.Lock() + defer d.mtx.Unlock() - conf, err := d.state.Config().Get() - if err != nil { - return nil, err + d.routes = conf.Routes + d.notifiers = map[string]Notifier{} + + // TODO(fabxc): build correct notifiers from new conf.NotificationConfigs. + for _, ncfg := range conf.NotificationConfigs { + d.notifiers[ncfg.Name] = &LogNotifier{ncfg.Name} } - notifies, err := d.state.Notify().List() - if err != nil { - return nil, err - } - - log.Infoln("check inhibit") - - var blaAlerts []*Alert - - for _, alert := range alerts { - inhibited := false - for _, ir := range conf.InhibitRules { - log.Infoln(ir, "against", alert) - if !ir.TargetMatchers.Match(alert.Labels) { - continue - } - - for _, n := range notifies { - if !n.LastResolved && ir.SourceMatchers.Match(n.Labels) { - inhibited = true - break - } - } - if inhibited { - break - } - } - if !inhibited { - log.Infoln("not inhibited", alert) - blaAlerts = append(blaAlerts, alert) - } else { - log.Infoln("inhibited", alert) - } - } - - silences, err := d.state.Silence().List() - if err != nil { - return nil, err - } - - var sentAlerts []*Alert - - for _, alert := range alerts { - add := true - // None of the existing silences must match the alert. - for _, sil := range silences { - if sil.Match(alert.Labels) { - add = false - break - } - } - if !add { - continue - } - - // Filter out alerts that have already been sent. - ni, err := d.state.Notify().Get(alert.Fingerprint()) - // Always try to send on error as the safest option. - if err == nil && ni.LastSent.Before(alert.ResolvedAt) && ni.LastResolved == alert.Resolved() { - continue - } - - sentAlerts = append(sentAlerts, alert) - } - - return sentAlerts, nil } -func (d *Dispatcher) notify(name string, alerts ...*Alert) error { - alerts, err := d.filter(alerts...) - if err != nil { - return err - } +// Run starts dispatching alerts incoming via the updates channel. 
+func (d *Dispatcher) Run(updates <-chan *Alert) { + d.done = make(chan struct{}) - if len(alerts) == 0 { - return nil - } + defer close(d.done) + defer close(updates) - d.mtx.RLock() - notifier, ok := d.notifiers[name] - d.mtx.RUnlock() - - if !ok { - return fmt.Errorf("notifier %q does not exist", name) - } - - if err = notifier.Send(alerts...); err != nil { - return err - } - - for _, alert := range alerts { - _ = d.state.Notify().Set(alert.Fingerprint(), &NotifyInfo{ - LastSent: time.Now(), - LastResolved: alert.Resolved(), - Labels: alert.Labels, - }) - } - - return nil -} - -func (d *Dispatcher) Run() { - var ( - updates = d.state.Alert().Iter() - cleanup = time.Tick(30 * time.Second) - ) + cleanup := time.Tick(30 * time.Second) for { select { + case alert := <-updates: + d.mtx.RLock() + routes := d.routes.Match(alert.Labels) + d.mtx.RUnlock() + + for _, r := range routes { + d.processAlert(alert, r) + } + case <-cleanup: // Cleanup routine. for _, ag := range d.aggrGroups { @@ -161,21 +80,45 @@ func (d *Dispatcher) Run() { } } - case alert := <-updates: - - conf, err := d.state.Config().Get() - if err != nil { - log.Error(err) - continue - } - - for _, m := range conf.Routes.Match(alert.Labels) { - d.processAlert(alert, m) - } + case <-d.ctx.Done(): + return } } } +// Stop the dispatcher. +func (d *Dispatcher) Stop() { + d.cancel() + <-d.done +} + +// notifyFunc is a function that performs notification for the alert +// with the given fingerprint. It aborts on context cancelation. +// It returns whether the alert has successfully been communicated as +// resolved. +type notifyFunc func(context.Context, model.Fingerprint) bool + +// notifyFunc returns a function which performs a notification +// as required by the routing options. 
+func (d *Dispatcher) notifyFunc(dest string) notifyFunc { + d.mtx.Lock() + defer d.mtx.Unlock() + + notifier := d.notifiers[dest] + + return func(ctx context.Context, fp model.Fingerprint) bool { + alert := d.alertProvider.Get(fp) + + if err := notifier.Notify(ctx, alert); err != nil { + log.Errorf("Notify for %v failed: %s", alert, err) + return false + } + return alert.Resolved() + } +} + +// processAlert determines in which aggregation group the alert falls +// and inserts it. func (d *Dispatcher) processAlert(alert *Alert, opts *RouteOpts) { group := model.LabelSet{} @@ -187,51 +130,25 @@ func (d *Dispatcher) processAlert(alert *Alert, opts *RouteOpts) { fp := group.Fingerprint() + // If the group does not exist, create it. ag, ok := d.aggrGroups[fp] if !ok { ag = newAggrGroup(d, group, opts) + ag.run(ag.notifyFunc(opts.SendTo)) + d.aggrGroups[fp] = ag } ag.insert(alert) } -type Silence struct { - Matchers Matchers - - // The numeric ID of the silence. - ID string - - // Name/email of the silence creator. - CreatedBy string - // When the silence was first created (Unix timestamp). - CreatedAt, EndsAt time.Time - - // Additional comment about the silence. - Comment string -} - -func (sil *Silence) Match(lset model.LabelSet) bool { - now := time.Now() - - if now.Before(sil.CreatedAt) || now.After(sil.EndsAt) { - return false - } - - return sil.Matchers.Match(lset) -} - -// Alert models an action triggered by Prometheus. type Alert struct { // Label value pairs for purpose of aggregation, matching, and disposition // dispatching. This must minimally include an "alertname" label. Labels model.LabelSet `json:"labels"` // Extra key/value information which is not used for aggregation. 
- Payload map[string]string `json:"payload,omitempty"` - Summary string `json:"summary,omitempty"` - Description string `json:"description,omitempty"` - Runbook string `json:"runbook,omitempty"` + Payload map[string]string `json:"payload,omitempty"` CreatedAt time.Time `json:"created_at,omitempty"` ResolvedAt time.Time `json:"resolved_at,omitempty"` @@ -245,6 +162,10 @@ func (a *Alert) Name() string { return string(a.Labels[model.AlertNameLabel]) } +// func (a *Alert) Merge(o *Alert) bool { + +// } + // Fingerprint returns a unique hash for the alert. It is equivalent to // the fingerprint of the alert's label set. func (a *Alert) Fingerprint() model.Fingerprint { @@ -266,63 +187,77 @@ func (a *Alert) Resolved() bool { return !a.ResolvedAt.After(time.Now()) } -// aggrGroup aggregates alerts into groups based on -// common values for a set of labels. -type aggrGroup struct { - dispatcher *Dispatcher +// alertTimeline is a list of alerts sorted by their timestamp. +type alertTimeline []*Alert +func (at alertTimeline) Len() int { return len(at) } +func (at alertTimeline) Less(i, j int) bool { return at[i].Timestamp.Before(at[j].Timestamp) } +func (at alertTimeline) Swap(i, j int) { at[i], at[j] = at[j], at[i] } + +// aggrGroup aggregates alert fingerprints into groups to which a +// common set of routing options applies. +// It emits notifications in the specified intervals. +type aggrGroup struct { labels model.LabelSet opts *RouteOpts - next *time.Timer - done chan struct{} + ctx context.Context + cancel func() + done chan struct{} mtx sync.RWMutex alerts map[model.Fingerprint]struct{} hasSent bool + curRev int } -// newAggrGroup returns a new aggregation group and starts background processing -// that sends notifications about the contained alerts. -func newAggrGroup(d *Dispatcher, labels model.LabelSet, opts *RouteOpts) *aggrGroup { +// newAggrGroup returns a new aggregation group. 
+func newAggrGroup(ctx context.Context, labels model.LabelSet, opts *RouteOpts) *aggrGroup { ag := &aggrGroup{ - dispatcher: d, - labels: labels, opts: opts, - alerts: map[model.Fingerprint]struct{}{}, - done: make(chan struct{}), - - // Set an initial one-time wait before flushing - // the first batch of notifications. - next: time.NewTimer(opts.GroupWait), } - - go ag.run() + ag.ctx, ag.cancel = context.WithCancel(ctx) return ag } -func (ag *aggrGroup) run() { +func (ag *aggrGroup) run(notify notifyFunc) { + ag.done = make(chan struct{}) + // Set an initial one-time wait before flushing + // the first batch of notifications. + next := time.NewTimer(opts.GroupWait) + + defer close(ag.done) defer ag.next.Stop() for { select { case <-ag.next.C: - ag.flush() - // Wait the configured interval before calling flush again. - ag.next.Reset(ag.opts.GroupInterval) + // Give the notifications 2/3 the time of the repeat interval + // to finish before terminating them. + ctx, _ := context.WithTimeout(ag.ctx, ag.opts.RepeatInterval*2/3) - case <-ag.done: + // Wait the configured interval before calling flush again. + next.Reset(ag.opts.RepeatInterval) + + ag.flush(func(fp model.Fingerprint) bool { + notify(ctx, fp) + }) + + case <-ag.ctx.Done(): return } } } func (ag *aggrGroup) stop() { - close(ag.done) + // Calling cancel will terminate all in-process notifications + // and the run() loop. + ag.cancel() + <-ag.done } func (ag *aggrGroup) fingerprint() model.Fingerprint { @@ -331,12 +266,12 @@ func (ag *aggrGroup) fingerprint() model.Fingerprint { // insert the alert into the aggregation group. If the aggregation group // is empty afterwards, true is returned. 
-func (ag *aggrGroup) insert(alert *Alert) { - fp := alert.Fingerprint() - +func (ag *aggrGroup) insert(fp model.Fingerprint) { ag.mtx.Lock() - ag.alerts[fp] = struct{}{} - ag.mtx.Unlock() + defer ag.mtx.Unlock() + + ag.curRev++ + ag.alerts[fp] = ag.curRev // Immediately trigger a flush if the wait duration for this // alert is already over. @@ -353,33 +288,37 @@ func (ag *aggrGroup) empty() bool { } // flush sends notifications for all new alerts. -func (ag *aggrGroup) flush() { +func (ag *aggrGroup) flush(notify func(model.Fingerprint) bool) { ag.mtx.Lock() - defer ag.mtx.Unlock() - var alerts []*Alert - for fp := range ag.alerts { - a, err := ag.dispatcher.state.Alert().Get(fp) - if err != nil { - log.Error(err) - continue - } - // TODO(fabxc): only delete if notify successful. - if a.Resolved() { - delete(ag.alerts, fp) - } - alerts = append(alerts, a) + alerts := make(map[model.Fingerprint]int, len(ag.alerts)) + for fp, rev := range ag.alerts { + alerts[fp] = rev } - if err := ag.dispatcher.notify(ag.opts.SendTo, alerts...); err != nil { - log.Error(err) + ag.mtx.Unlock() + + var wg sync.WaitGroup + wg.Add(len(alerts)) + + for fp, rev := range alerts { + go func(fp model.Fingerprint) { + // notify returns whether the alert can be deleted + // afterwards. + if notify(fp) { + ag.mtx.Lock() + // Only delete if the fingerprint has not been inserted + // again since we notified about it. + if ag.alerts[fp] == rev { + delete(alerts, fp) + } + ag.mtx.Unlock() + } + wg.Done() + }(fp) } + + wg.Wait() + ag.hasSent = true } - -// alertTimeline is a list of alerts sorted by their timestamp. -type alertTimeline []*Alert - -func (at alertTimeline) Len() int { return len(at) } -func (at alertTimeline) Less(i, j int) bool { return at[i].Timestamp.Before(at[j].Timestamp) } -func (at alertTimeline) Swap(i, j int) { at[i], at[j] = at[j], at[i] }