Extract notifiers from dispatcher to routed notifier

This commit is contained in:
Fabian Reinartz 2015-09-27 13:09:02 +02:00
parent 5d0f3ffa0e
commit e10ae037d0
3 changed files with 84 additions and 35 deletions

View File

@ -19,11 +19,11 @@ const ResolveTimeout = 30 * time.Second
// Dispatcher sorts incoming alerts into aggregation groups and
// assigns the correct notifiers to each.
type Dispatcher struct {
routes Routes
alerts provider.Alerts
routes Routes
alerts provider.Alerts
notifier Notifier
aggrGroups map[model.Fingerprint]*aggrGroup
notifiers map[string]Notifier
mtx sync.RWMutex
done chan struct{}
@ -32,8 +32,11 @@ type Dispatcher struct {
}
// NewDispatcher returns a new Dispatcher.
func NewDispatcher(ap provider.Alerts) *Dispatcher {
return &Dispatcher{alerts: ap}
func NewDispatcher(ap provider.Alerts, n Notifier) *Dispatcher {
return &Dispatcher{
alerts: ap,
notifier: n,
}
}
// ApplyConfig updates the dispatcher to match the new configuration.
@ -48,12 +51,6 @@ func (d *Dispatcher) ApplyConfig(conf *config.Config) {
}
d.routes = NewRoutes(conf.Routes)
d.notifiers = map[string]Notifier{}
// TODO(fabxc): build correct notifiers from new conf.NotificationConfigs.
for _, ncfg := range conf.NotificationConfigs {
d.notifiers[ncfg.Name] = &LogNotifier{ncfg.Name}
}
}
// Run starts dispatching alerts incoming via the updates channel.
@ -111,23 +108,6 @@ func (d *Dispatcher) Stop() {
// Returns false iff notifying failed.
type notifyFunc func(context.Context, ...*types.Alert) bool
// notifyFunc returns a function which performs a notification
// as required by the routing options.
func (d *Dispatcher) notifyFunc(dest string) notifyFunc {
d.mtx.Lock()
defer d.mtx.Unlock()
notifier := d.notifiers[dest]
return func(ctx context.Context, alerts ...*types.Alert) bool {
if err := notifier.Notify(ctx, alerts...); err != nil {
log.Errorf("Notify for %d alerts failed: %s", len(alerts), err)
return false
}
return true
}
}
// processAlert determins in which aggregation group the alert falls
// and insert it.
func (d *Dispatcher) processAlert(alert *types.Alert, opts *RouteOpts) {
@ -145,9 +125,15 @@ func (d *Dispatcher) processAlert(alert *types.Alert, opts *RouteOpts) {
ag, ok := d.aggrGroups[fp]
if !ok {
ag = newAggrGroup(d.ctx, group, opts)
go ag.run(d.notifyFunc(opts.SendTo))
d.aggrGroups[fp] = ag
go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool {
if err := d.notifier.Notify(ctx, alerts...); err != nil {
log.Errorf("Notify for %d alerts failed: %s", len(alerts), err)
return false
}
return true
})
}
ag.insert(alert)
@ -203,6 +189,9 @@ func (ag *aggrGroup) run(notify notifyFunc) {
// finish before terminating them.
ctx, _ := context.WithTimeout(ag.ctx, ag.opts.GroupInterval)
// Populate context with the destination name.
ctx = context.WithValue(ctx, notifyName, ag.opts.SendTo)
// Wait the configured interval before calling flush again.
ag.next.Reset(ag.opts.GroupInterval)

22
main.go
View File

@ -35,12 +35,30 @@ func main() {
}
memAlerts := provider.NewMemAlerts()
disp := NewDispatcher(memAlerts)
defer disp.Stop()
inhibitor := &Inhibitor{alerts: memAlerts}
inhibitor.ApplyConfig(conf)
routedNotifier := &routedNotifier{}
routedNotifier.ApplyConfig(conf)
var notifier Notifier
notifier = routedNotifier
notifier = &mutingNotifier{
Notifier: notifier,
silencer: inhibitor,
}
// TODO(fabxc)
// notifier = &mutingNotifier{
// Notifier: notifier,
// silencer: provider.Silences
// }
disp := NewDispatcher(memAlerts, notifier)
disp.ApplyConfig(conf)
go disp.Run()
defer disp.Stop()
router := route.New()

View File

@ -1,6 +1,7 @@
package main
import (
"fmt"
"sync"
"github.com/prometheus/common/model"
@ -12,6 +13,12 @@ import (
"github.com/prometheus/alertmanager/types"
)
type notifyKey int
const (
notifyName notifyKey = iota
)
type Notifier interface {
Notify(context.Context, ...*types.Alert) error
}
@ -29,15 +36,50 @@ func (ln *LogNotifier) Notify(ctx context.Context, alerts ...*types.Alert) error
return nil
}
// silencedNotifier wraps a notifier and applies a Silencer
// routedNotifier dispatches the alerts to one of a set of
// named notifiers based on the name value provided in the context.
type routedNotifier struct {
mtx sync.RWMutex
notifiers map[string]Notifier
}
func (n *routedNotifier) Notify(ctx context.Context, alerts ...*types.Alert) error {
name, ok := ctx.Value(notifyName).(string)
if !ok {
return fmt.Errorf("notifier name missing")
}
n.mtx.RLock()
defer n.mtx.RUnlock()
notifier, ok := n.notifiers[name]
if !ok {
return fmt.Errorf("notifier %q does not exist", name)
}
return notifier.Notify(ctx, alerts...)
}
func (n *routedNotifier) ApplyConfig(conf *config.Config) {
n.mtx.Lock()
defer n.mtx.Unlock()
n.notifiers = map[string]Notifier{}
for _, cn := range conf.NotificationConfigs {
// TODO(fabxc): create proper notifiers.
n.notifiers[cn.Name] = &LogNotifier{name: cn.Name}
}
}
// mutingNotifier wraps a notifier and applies a Silencer
// before sending out an alert.
type silencedNotifier struct {
type mutingNotifier struct {
Notifier
silencer types.Silencer
}
func (n *silencedNotifier) Notify(ctx context.Context, alerts ...*types.Alert) error {
func (n *mutingNotifier) Notify(ctx context.Context, alerts ...*types.Alert) error {
var filtered []*types.Alert
for _, a := range alerts {
// TODO(fabxc): increment total alerts counter.