Change notify interface to multiple alerts

This commit is contained in:
Fabian Reinartz 2015-09-26 14:12:55 +02:00
parent 3b7f880f60
commit 79eac01917
2 changed files with 43 additions and 43 deletions

View File

@ -71,9 +71,11 @@ func (d *Dispatcher) Run() {
}
func (d *Dispatcher) run(updates <-chan *types.Alert) {
cleanup := time.Tick(15 * time.Second)
cleanup := time.NewTicker(15 * time.Second)
defer cleanup.Stop()
for {
fmt.Println("run")
select {
case alert := <-updates:
fmt.Println("update", alert)
@ -85,7 +87,7 @@ func (d *Dispatcher) run(updates <-chan *types.Alert) {
d.processAlert(alert, r)
}
case <-cleanup:
case <-cleanup.C:
fmt.Println("cleanup")
for _, ag := range d.aggrGroups {
if ag.empty() {
@ -112,7 +114,7 @@ func (d *Dispatcher) Stop() {
// with the given fingerprint. It aborts on context cancelation.
// It returns whether the alert has successfully been communiated as
// resolved.
type notifyFunc func(context.Context, *types.Alert) bool
type notifyFunc func(context.Context, ...*types.Alert) bool
// notifyFunc returns a function which performs a notification
// as required by the routing options.
@ -122,12 +124,12 @@ func (d *Dispatcher) notifyFunc(dest string) notifyFunc {
notifier := d.notifiers[dest]
return func(ctx context.Context, alert *types.Alert) bool {
if err := notifier.Notify(ctx, alert); err != nil {
log.Errorf("Notify for %v failed: %s", alert, err)
return func(ctx context.Context, alerts ...*types.Alert) bool {
if err := notifier.Notify(ctx, alerts...); err != nil {
log.Errorf("Notify for %d alerts failed: %s", len(alerts), err)
return false
}
return alert.Resolved()
return true
}
}
@ -136,6 +138,7 @@ func (d *Dispatcher) notifyFunc(dest string) notifyFunc {
func (d *Dispatcher) processAlert(alert *types.Alert, opts *RouteOpts) {
group := model.LabelSet{}
fmt.Println("processing", alert)
defer fmt.Println("proecssing done", alert)
for ln, lv := range alert.Labels {
if _, ok := opts.GroupBy[ln]; ok {
@ -149,7 +152,7 @@ func (d *Dispatcher) processAlert(alert *types.Alert, opts *RouteOpts) {
ag, ok := d.aggrGroups[fp]
if !ok {
ag = newAggrGroup(d.ctx, group, opts)
ag.run(d.notifyFunc(opts.SendTo))
go ag.run(d.notifyFunc(opts.SendTo))
d.aggrGroups[fp] = ag
}
@ -206,8 +209,8 @@ func (ag *aggrGroup) run(notify notifyFunc) {
// Wait the configured interval before calling flush again.
ag.next.Reset(ag.opts.RepeatInterval)
ag.flush(func(a *types.Alert) bool {
return notify(ctx, a)
ag.flush(func(alerts ...*types.Alert) bool {
return notify(ctx, alerts...)
})
case <-ag.ctx.Done():
@ -250,38 +253,32 @@ func (ag *aggrGroup) empty() bool {
}
// flush sends notifications for all new alerts.
func (ag *aggrGroup) flush(notify func(*types.Alert) bool) {
func (ag *aggrGroup) flush(notify func(...*types.Alert) bool) {
ag.mtx.Lock()
fmt.Println("flushing", ag)
alerts := make(map[model.Fingerprint]*types.Alert, len(ag.alerts))
var (
alerts = make(map[model.Fingerprint]*types.Alert, len(ag.alerts))
alertsSlice = make([]*types.Alert, 0, len(ag.alerts))
)
for fp, alert := range ag.alerts {
alerts[fp] = alert
alertsSlice = append(alertsSlice, alert)
}
ag.mtx.Unlock()
var wg sync.WaitGroup
wg.Add(len(alerts))
for fp, a := range alerts {
go func(fp model.Fingerprint, a *types.Alert) {
// notify returns whether the alert can be deleted
// afterwards.
if notify(a) {
ag.mtx.Lock()
// Only delete if the fingerprint has not been inserted
// again since we notified about it.
if ag.alerts[fp] == a {
delete(alerts, fp)
}
ag.mtx.Unlock()
if notify(alertsSlice...) {
ag.mtx.Lock()
for fp, a := range alerts {
// Only delete if the fingerprint has not been inserted
// again since we notified about it.
if a.Resolved() && ag.alerts[fp] == a {
delete(alerts, fp)
}
wg.Done()
}(fp, a)
}
ag.mtx.Unlock()
}
wg.Wait()
ag.hasSent = true
}

View File

@ -9,19 +9,19 @@ import (
)
type Notifier interface {
Notify(context.Context, *types.Alert) error
Notify(context.Context, ...*types.Alert) error
}
type LogNotifier struct {
name string
}
func (ln *LogNotifier) Notify(ctx context.Context, a *types.Alert) error {
func (ln *LogNotifier) Notify(ctx context.Context, alerts ...*types.Alert) error {
log.Infof("notify %q", ln.name)
// for _, a := range alerts {
log.Infof(" - %v", a)
// }
for _, a := range alerts {
log.Infof(" - %v", a)
}
return nil
}
@ -50,15 +50,18 @@ type silencedNotifier struct {
silencer types.Silencer
}
func (n *silencedNotifier) Notify(ctx context.Context, alert *types.Alert) error {
// TODO(fabxc): increment total alerts counter.
// Do not send the alert if the silencer mutes it.
if n.silencer.Mutes(alert.Labels) {
// TODO(fabxc): increment muted alerts counter.
return nil
func (n *silencedNotifier) Notify(ctx context.Context, alerts ...*types.Alert) error {
var filtered []*types.Alert
for _, a := range alerts {
// TODO(fabxc): increment total alerts counter.
// Do not send the alert if the silencer mutes it.
if !n.silencer.Mutes(a.Labels) {
// TODO(fabxc): increment muted alerts counter.
filtered = append(filtered, a)
}
}
return n.Notifier.Notify(ctx, alert)
return n.Notifier.Notify(ctx, filtered...)
}
type Inhibitor interface {