cleanup dispatcher notifications, fix default route opts

This commit is contained in:
Fabian Reinartz 2015-07-04 14:41:10 +02:00
parent b4755b0046
commit d5ff099d05
5 changed files with 53 additions and 21 deletions

View File

@ -39,7 +39,10 @@ func main() {
log.Fatal(err)
}
disp := manager.NewDispatcher(state)
disp := manager.NewDispatcher(state, []manager.Notifier{
manager.NewLogNotifier("default"),
})
router := route.New()
go disp.Run()

View File

@ -73,9 +73,13 @@ func (api *API) addAlerts(w http.ResponseWriter, r *http.Request) {
return
}
for _, alert := range alerts {
now := time.Now()
if alert.Timestamp.IsZero() {
alert.Timestamp = time.Now()
alert.CreatedAt = alert.Timestamp
alert.Timestamp = now
}
if alert.CreatedAt.IsZero() {
alert.CreatedAt = now
}
}

View File

@ -18,25 +18,39 @@ type Dispatcher struct {
state State
aggrGroups map[model.Fingerprint]*aggrGroup
notifiers map[string]Notifier
mtx sync.RWMutex
}
func NewDispatcher(state State) *Dispatcher {
return &Dispatcher{
func NewDispatcher(state State, notifiers []Notifier) *Dispatcher {
disp := &Dispatcher{
state: state,
aggrGroups: map[model.Fingerprint]*aggrGroup{},
notifiers: map[string]Notifier{},
}
for _, n := range notifiers {
disp.notifiers[n.Name()] = n
}
return disp
}
func (d *Dispatcher) notify(name string, alerts ...*Alert) error {
if len(alerts) == 0 {
return
return nil
}
n := &LogNotifier{}
i := []interface{}{name, "::"}
for _, a := range alerts {
i = append(i, a)
d.mtx.RLock()
notifier, ok := d.notifiers[name]
d.mtx.RUnlock()
if !ok {
return fmt.Errorf("notifier %q does not exist", name)
}
n.Send(i...)
return notifier.Send(alerts...)
}
func (d *Dispatcher) Run() {
@ -171,6 +185,10 @@ func newAggrGroup(d *Dispatcher, labels model.LabelSet, opts *RouteOpts) *aggrGr
alerts: map[model.Fingerprint]struct{}{},
done: make(chan struct{}),
// Set an initial one-time wait before flushing
// the first batch of notifications.
next: time.NewTimer(opts.GroupWait),
}
go ag.run()
@ -179,9 +197,6 @@ func newAggrGroup(d *Dispatcher, labels model.LabelSet, opts *RouteOpts) *aggrGr
}
func (ag *aggrGroup) run() {
// Set an initial one-time wait before flushing
// the first batch of notifications.
ag.next = time.NewTimer(ag.opts.GroupWait)
defer ag.next.Stop()

View File

@ -6,16 +6,26 @@ import (
type Notifier interface {
Name() string
Send(...interface{})
Send(...*Alert) error
}
type LogNotifier struct {
name string
}
func (*LogNotifier) Name() string {
return "default"
func NewLogNotifier(name string) Notifier {
return &LogNotifier{name}
}
func (*LogNotifier) Send(v ...interface{}) {
log.Infoln(v...)
func (ln *LogNotifier) Name() string {
return ln.name
}
func (ln *LogNotifier) Send(alerts ...*Alert) error {
log.Infof("notify %q", ln.name)
for _, a := range alerts {
log.Infof(" - %v", a)
}
return nil
}

View File

@ -167,10 +167,10 @@ func (ro *RouteOpts) populateDefault(parent *RouteOpts) {
if ro.SendTo == "" {
ro.SendTo = parent.SendTo
}
if ro.hasWait {
if !ro.hasWait {
ro.GroupWait = parent.GroupWait
}
if ro.hasInterval {
if !ro.hasInterval {
ro.GroupInterval = parent.GroupInterval
}
}