Dispatcher config reloading and restarting
This commit is contained in:
parent
0ffdd6fa2f
commit
6e1193c9a2
|
@ -17,8 +17,8 @@ const ResolveTimeout = 30 * time.Second
|
||||||
// Dispatcher sorts incoming alerts into aggregation groups and
|
// Dispatcher sorts incoming alerts into aggregation groups and
|
||||||
// assigns the correct notifiers to each.
|
// assigns the correct notifiers to each.
|
||||||
type Dispatcher struct {
|
type Dispatcher struct {
|
||||||
routes Routes
|
routes Routes
|
||||||
alertProvider provider.Alerts
|
alerts provider.Alerts
|
||||||
|
|
||||||
aggrGroups map[model.Fingerprint]*aggrGroup
|
aggrGroups map[model.Fingerprint]*aggrGroup
|
||||||
notifiers map[string]Notifier
|
notifiers map[string]Notifier
|
||||||
|
@ -30,13 +30,8 @@ type Dispatcher struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewDispatcher returns a new Dispatcher.
|
// NewDispatcher returns a new Dispatcher.
|
||||||
func NewDispatcher(ctx context.Context, state State, notifier Notifier) *Dispatcher {
|
func NewDispatcher(ap provider.Alerts) *Dispatcher {
|
||||||
d := &Dispatcher{
|
return &Dispatcher{alerts: ap}
|
||||||
aggrGroups: map[model.Fingerprint]*aggrGroup{},
|
|
||||||
}
|
|
||||||
d.ctx, d.cancel = context.WithCancel(ctx)
|
|
||||||
|
|
||||||
return d
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ApplyConfig updates the dispatcher to match the new configuration.
|
// ApplyConfig updates the dispatcher to match the new configuration.
|
||||||
|
@ -44,6 +39,8 @@ func (d *Dispatcher) ApplyConfig(conf *Config) {
|
||||||
d.mtx.Lock()
|
d.mtx.Lock()
|
||||||
defer d.mtx.Unlock()
|
defer d.mtx.Unlock()
|
||||||
|
|
||||||
|
d.Stop()
|
||||||
|
|
||||||
d.routes = conf.Routes
|
d.routes = conf.Routes
|
||||||
d.notifiers = map[string]Notifier{}
|
d.notifiers = map[string]Notifier{}
|
||||||
|
|
||||||
|
@ -51,15 +48,26 @@ func (d *Dispatcher) ApplyConfig(conf *Config) {
|
||||||
for _, ncfg := range conf.NotificationConfigs {
|
for _, ncfg := range conf.NotificationConfigs {
|
||||||
d.notifiers[ncfg.Name] = &LogNotifier{ncfg.Name}
|
d.notifiers[ncfg.Name] = &LogNotifier{ncfg.Name}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
go d.Run()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run starts dispatching alerts incoming via the updates channel.
|
// Run starts dispatching alerts incoming via the updates channel.
|
||||||
func (d *Dispatcher) Run(updates <-chan *Alert) {
|
func (d *Dispatcher) Run() {
|
||||||
d.done = make(chan struct{})
|
d.done = make(chan struct{})
|
||||||
|
d.aggrGroups = map[model.Fingerprint]*aggrGroup{}
|
||||||
|
|
||||||
|
d.ctx, d.cancel = context.WithCancel(context.Background())
|
||||||
|
|
||||||
|
updates := d.alertProvider.IterActive()
|
||||||
|
|
||||||
defer close(d.done)
|
defer close(d.done)
|
||||||
defer close(updates)
|
defer close(updates)
|
||||||
|
|
||||||
|
d.run(updates)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *Dispatcher) run(updates <-chan *Alert) {
|
||||||
cleanup := time.Tick(30 * time.Second)
|
cleanup := time.Tick(30 * time.Second)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
|
@ -74,7 +82,6 @@ func (d *Dispatcher) Run(updates <-chan *Alert) {
|
||||||
}
|
}
|
||||||
|
|
||||||
case <-cleanup:
|
case <-cleanup:
|
||||||
// Cleanup routine.
|
|
||||||
for _, ag := range d.aggrGroups {
|
for _, ag := range d.aggrGroups {
|
||||||
if ag.empty() {
|
if ag.empty() {
|
||||||
ag.stop()
|
ag.stop()
|
||||||
|
@ -109,7 +116,7 @@ func (d *Dispatcher) notifyFunc(dest string) notifyFunc {
|
||||||
notifier := d.notifiers[dest]
|
notifier := d.notifiers[dest]
|
||||||
|
|
||||||
return func(ctx context.Context, fp model.Fingerprint) bool {
|
return func(ctx context.Context, fp model.Fingerprint) bool {
|
||||||
alert := d.alertProvider.Get(fp)
|
alert := d.alerts.Get(fp)
|
||||||
|
|
||||||
if err := notifier.Notify(ctx, alert); err != nil {
|
if err := notifier.Notify(ctx, alert); err != nil {
|
||||||
log.Errorf("Notify for %v failed: %s", alert, err)
|
log.Errorf("Notify for %v failed: %s", alert, err)
|
||||||
|
|
Loading…
Reference in New Issue