diff --git a/dispatch.go b/dispatch.go index aad742df..046c251e 100644 --- a/dispatch.go +++ b/dispatch.go @@ -19,11 +19,11 @@ const ResolveTimeout = 30 * time.Second // Dispatcher sorts incoming alerts into aggregation groups and // assigns the correct notifiers to each. type Dispatcher struct { - routes Routes - alerts provider.Alerts + routes Routes + alerts provider.Alerts + notifier Notifier aggrGroups map[model.Fingerprint]*aggrGroup - notifiers map[string]Notifier mtx sync.RWMutex done chan struct{} @@ -32,8 +32,11 @@ type Dispatcher struct { } // NewDispatcher returns a new Dispatcher. -func NewDispatcher(ap provider.Alerts) *Dispatcher { - return &Dispatcher{alerts: ap} +func NewDispatcher(ap provider.Alerts, n Notifier) *Dispatcher { + return &Dispatcher{ + alerts: ap, + notifier: n, + } } // ApplyConfig updates the dispatcher to match the new configuration. @@ -48,12 +51,6 @@ func (d *Dispatcher) ApplyConfig(conf *config.Config) { } d.routes = NewRoutes(conf.Routes) - d.notifiers = map[string]Notifier{} - - // TODO(fabxc): build correct notifiers from new conf.NotificationConfigs. - for _, ncfg := range conf.NotificationConfigs { - d.notifiers[ncfg.Name] = &LogNotifier{ncfg.Name} - } } // Run starts dispatching alerts incoming via the updates channel. @@ -111,23 +108,6 @@ func (d *Dispatcher) Stop() { // Returns false iff notifying failed. type notifyFunc func(context.Context, ...*types.Alert) bool -// notifyFunc returns a function which performs a notification -// as required by the routing options. -func (d *Dispatcher) notifyFunc(dest string) notifyFunc { - d.mtx.Lock() - defer d.mtx.Unlock() - - notifier := d.notifiers[dest] - - return func(ctx context.Context, alerts ...*types.Alert) bool { - if err := notifier.Notify(ctx, alerts...); err != nil { - log.Errorf("Notify for %d alerts failed: %s", len(alerts), err) - return false - } - return true - } -} - // processAlert determins in which aggregation group the alert falls // and insert it. func (d *Dispatcher) processAlert(alert *types.Alert, opts *RouteOpts) { @@ -145,9 +125,15 @@ func (d *Dispatcher) processAlert(alert *types.Alert, opts *RouteOpts) { ag, ok := d.aggrGroups[fp] if !ok { ag = newAggrGroup(d.ctx, group, opts) - go ag.run(d.notifyFunc(opts.SendTo)) - d.aggrGroups[fp] = ag + + go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool { + if err := d.notifier.Notify(ctx, alerts...); err != nil { + log.Errorf("Notify for %d alerts failed: %s", len(alerts), err) + return false + } + return true + }) } ag.insert(alert) @@ -203,6 +189,9 @@ func (ag *aggrGroup) run(notify notifyFunc) { // finish before terminating them. ctx, _ := context.WithTimeout(ag.ctx, ag.opts.GroupInterval) + // Populate context with the destination name. + ctx = context.WithValue(ctx, notifyName, ag.opts.SendTo) + // Wait the configured interval before calling flush again. ag.next.Reset(ag.opts.GroupInterval) diff --git a/main.go b/main.go index 7cf4667f..4f522197 100644 --- a/main.go +++ b/main.go @@ -35,12 +35,30 @@ func main() { } memAlerts := provider.NewMemAlerts() - disp := NewDispatcher(memAlerts) - defer disp.Stop() + inhibitor := &Inhibitor{alerts: memAlerts} + inhibitor.ApplyConfig(conf) + + routedNotifier := &routedNotifier{} + routedNotifier.ApplyConfig(conf) + + var notifier Notifier + notifier = routedNotifier + notifier = &mutingNotifier{ + Notifier: notifier, + silencer: inhibitor, + } + // TODO(fabxc) + // notifier = &mutingNotifier{ + // Notifier: notifier, + // silencer: provider.Silences + // } + + disp := NewDispatcher(memAlerts, notifier) disp.ApplyConfig(conf) go disp.Run() + defer disp.Stop() router := route.New() diff --git a/notify.go b/notify.go index 19a1b5c3..0b436795 100644 --- a/notify.go +++ b/notify.go @@ -1,6 +1,7 @@ package main import ( + "fmt" "sync" "github.com/prometheus/common/model" @@ -12,6 +13,12 @@ import ( "github.com/prometheus/alertmanager/types" ) +type notifyKey int + +const ( + notifyName notifyKey = iota +) + type Notifier interface { Notify(context.Context, ...*types.Alert) error } @@ -29,15 +36,50 @@ func (ln *LogNotifier) Notify(ctx context.Context, alerts ...*types.Alert) error return nil } -// silencedNotifier wraps a notifier and applies a Silencer +// routedNotifier dispatches the alerts to one of a set of +// named notifiers based on the name value provided in the context. +type routedNotifier struct { + mtx sync.RWMutex + notifiers map[string]Notifier +} + +func (n *routedNotifier) Notify(ctx context.Context, alerts ...*types.Alert) error { + name, ok := ctx.Value(notifyName).(string) + if !ok { + return fmt.Errorf("notifier name missing") + } + + n.mtx.RLock() + defer n.mtx.RUnlock() + + notifier, ok := n.notifiers[name] + if !ok { + return fmt.Errorf("notifier %q does not exist", name) + } + + return notifier.Notify(ctx, alerts...) +} + +func (n *routedNotifier) ApplyConfig(conf *config.Config) { + n.mtx.Lock() + defer n.mtx.Unlock() + + n.notifiers = map[string]Notifier{} + for _, cn := range conf.NotificationConfigs { + // TODO(fabxc): create proper notifiers. + n.notifiers[cn.Name] = &LogNotifier{name: cn.Name} + } +} + +// mutingNotifier wraps a notifier and applies a Silencer // before sending out an alert. -type silencedNotifier struct { +type mutingNotifier struct { Notifier silencer types.Silencer } -func (n *silencedNotifier) Notify(ctx context.Context, alerts ...*types.Alert) error { +func (n *mutingNotifier) Notify(ctx context.Context, alerts ...*types.Alert) error { var filtered []*types.Alert for _, a := range alerts { // TODO(fabxc): increment total alerts counter.