From 6e1193c9a2afe78ab96630ac95884e3dcc0fd0e2 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Fri, 25 Sep 2015 13:44:00 +0200 Subject: [PATCH] Dispatcher config reloading and restarting --- manager/dispatch.go | 31 +++++++++++++++++++------------ 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/manager/dispatch.go b/manager/dispatch.go index 363952a1..6cc624dd 100644 --- a/manager/dispatch.go +++ b/manager/dispatch.go @@ -17,8 +17,8 @@ const ResolveTimeout = 30 * time.Second // Dispatcher sorts incoming alerts into aggregation groups and // assigns the correct notifiers to each. type Dispatcher struct { - routes Routes - alertProvider provider.Alerts + routes Routes + alerts provider.Alerts aggrGroups map[model.Fingerprint]*aggrGroup notifiers map[string]Notifier @@ -30,13 +30,8 @@ type Dispatcher struct { } // NewDispatcher returns a new Dispatcher. -func NewDispatcher(ctx context.Context, state State, notifier Notifier) *Dispatcher { - d := &Dispatcher{ - aggrGroups: map[model.Fingerprint]*aggrGroup{}, - } - d.ctx, d.cancel = context.WithCancel(ctx) - - return d +func NewDispatcher(ap provider.Alerts) *Dispatcher { + return &Dispatcher{alerts: ap} } // ApplyConfig updates the dispatcher to match the new configuration. @@ -44,6 +39,8 @@ func (d *Dispatcher) ApplyConfig(conf *Config) { d.mtx.Lock() defer d.mtx.Unlock() + d.Stop() + d.routes = conf.Routes d.notifiers = map[string]Notifier{} @@ -51,15 +48,26 @@ func (d *Dispatcher) ApplyConfig(conf *Config) { for _, ncfg := range conf.NotificationConfigs { d.notifiers[ncfg.Name] = &LogNotifier{ncfg.Name} } + + go d.Run() } // Run starts dispatching alerts incoming via the updates channel. -func (d *Dispatcher) Run(updates <-chan *Alert) { +func (d *Dispatcher) Run() { d.done = make(chan struct{}) + d.aggrGroups = map[model.Fingerprint]*aggrGroup{} + + d.ctx, d.cancel = context.WithCancel(context.Background()) + + updates := d.alertProvider.IterActive() defer close(d.done) defer close(updates) + d.run(updates) +} + +func (d *Dispatcher) run(updates <-chan *Alert) { cleanup := time.Tick(30 * time.Second) for { @@ -74,7 +82,6 @@ func (d *Dispatcher) Run(updates <-chan *Alert) { } case <-cleanup: - // Cleanup routine. for _, ag := range d.aggrGroups { if ag.empty() { ag.stop() @@ -109,7 +116,7 @@ func (d *Dispatcher) notifyFunc(dest string) notifyFunc { notifier := d.notifiers[dest] return func(ctx context.Context, fp model.Fingerprint) bool { - alert := d.alertProvider.Get(fp) + alert := d.alerts.Get(fp) if err := notifier.Notify(ctx, alert); err != nil { log.Errorf("Notify for %v failed: %s", alert, err)