2015-07-02 16:38:05 +00:00
|
|
|
package manager
|
|
|
|
|
|
|
|
import (
|
|
|
|
"sync"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/prometheus/common/model"
|
|
|
|
"github.com/prometheus/log"
|
|
|
|
)
|
|
|
|
|
2015-07-04 10:52:53 +00:00
|
|
|
const ResolveTimeout = 15 * time.Second
|
|
|
|
|
2015-07-02 16:38:05 +00:00
|
|
|
// Dispatcher dispatches alerts. It is absed on the alert's data
|
|
|
|
// rather than the time they arrive. Thus it can recover it's state
|
|
|
|
// without persistence.
|
|
|
|
type Dispatcher struct {
|
|
|
|
state State
|
|
|
|
|
|
|
|
aggrGroups map[model.Fingerprint]*aggrGroup
|
|
|
|
}
|
|
|
|
|
|
|
|
func NewDispatcher(state State) *Dispatcher {
|
|
|
|
return &Dispatcher{
|
|
|
|
state: state,
|
|
|
|
aggrGroups: map[model.Fingerprint]*aggrGroup{},
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (d *Dispatcher) notify(name string, alerts ...*Alert) {
|
|
|
|
n := &LogNotifier{}
|
|
|
|
i := []interface{}{name, "::"}
|
|
|
|
for _, a := range alerts {
|
|
|
|
i = append(i, a)
|
|
|
|
}
|
|
|
|
n.Send(i...)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (d *Dispatcher) Run() {
|
|
|
|
for {
|
|
|
|
alert := d.state.Alert().Next()
|
|
|
|
|
|
|
|
conf, err := d.state.Config().Get()
|
|
|
|
if err != nil {
|
|
|
|
log.Error(err)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, m := range conf.Routes.Match(alert.Labels) {
|
|
|
|
d.processAlert(alert, m)
|
|
|
|
}
|
2015-07-04 10:52:53 +00:00
|
|
|
|
|
|
|
if !alert.Resolved {
|
|
|
|
// After the constant timeout update the alert to be resolved.
|
|
|
|
go func(alert *Alert) {
|
|
|
|
for {
|
|
|
|
// TODO: get most recent version first.
|
|
|
|
time.Sleep(ResolveTimeout)
|
|
|
|
|
|
|
|
a := *alert
|
|
|
|
a.Resolved = true
|
|
|
|
|
|
|
|
if err := d.state.Alert().Add(&a); err != nil {
|
|
|
|
log.Error(err)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}(alert)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Cleanup routine.
|
|
|
|
for _, ag := range d.aggrGroups {
|
|
|
|
if ag.empty() {
|
|
|
|
ag.stop()
|
|
|
|
delete(d.aggrGroups, ag.fingerprint())
|
|
|
|
}
|
|
|
|
}
|
2015-07-02 16:38:05 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (d *Dispatcher) processAlert(alert *Alert, opts *RouteOpts) {
|
|
|
|
group := model.LabelSet{}
|
|
|
|
|
|
|
|
for ln, lv := range alert.Labels {
|
|
|
|
if _, ok := opts.GroupBy[ln]; ok {
|
|
|
|
group[ln] = lv
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fp := group.Fingerprint()
|
|
|
|
|
|
|
|
ag, ok := d.aggrGroups[fp]
|
|
|
|
if !ok {
|
2015-07-04 10:52:53 +00:00
|
|
|
ag = newAggrGroup(d, group, opts)
|
2015-07-02 16:38:05 +00:00
|
|
|
d.aggrGroups[fp] = ag
|
|
|
|
}
|
|
|
|
|
|
|
|
ag.insert(alert)
|
2015-07-04 10:52:53 +00:00
|
|
|
}
|
2015-07-02 18:48:21 +00:00
|
|
|
|
2015-07-04 10:52:53 +00:00
|
|
|
// Alert models an action triggered by Prometheus.
|
|
|
|
type Alert struct {
|
|
|
|
// Label value pairs for purpose of aggregation, matching, and disposition
|
|
|
|
// dispatching. This must minimally include an "alertname" label.
|
|
|
|
Labels model.LabelSet `json:"labels"`
|
|
|
|
|
|
|
|
// Extra key/value information which is not used for aggregation.
|
|
|
|
Payload map[string]string `json:"payload"`
|
|
|
|
|
|
|
|
// Short summary of alert.
|
|
|
|
Summary string `json:"summary"`
|
|
|
|
Description string `json:"description"`
|
|
|
|
Runbook string `json:"runbook"`
|
|
|
|
|
|
|
|
// When the alert was reported.
|
|
|
|
Timestamp time.Time `json:"timestamp"`
|
|
|
|
|
|
|
|
// Whether the alert with the given label set is resolved.
|
|
|
|
Resolved bool `json:"resolved"`
|
|
|
|
}
|
|
|
|
|
|
|
|
// Name returns the name of the alert. It is equivalent to the "alertname" label.
|
|
|
|
func (a *Alert) Name() string {
|
|
|
|
return string(a.Labels[model.AlertNameLabel])
|
|
|
|
}
|
|
|
|
|
|
|
|
// Fingerprint returns a unique hash for the alert. It is equivalent to
|
|
|
|
// the fingerprint of the alert's label set.
|
|
|
|
func (a *Alert) Fingerprint() model.Fingerprint {
|
|
|
|
return a.Labels.Fingerprint()
|
2015-07-02 16:38:05 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// aggrGroup aggregates alerts into groups based on
|
|
|
|
// common values for a set of labels.
|
|
|
|
type aggrGroup struct {
|
|
|
|
dispatcher *Dispatcher
|
|
|
|
|
2015-07-04 10:52:53 +00:00
|
|
|
labels model.LabelSet
|
|
|
|
opts *RouteOpts
|
2015-07-02 16:38:05 +00:00
|
|
|
|
2015-07-04 10:52:53 +00:00
|
|
|
next *time.Timer
|
|
|
|
done chan struct{}
|
2015-07-02 16:38:05 +00:00
|
|
|
|
2015-07-04 10:52:53 +00:00
|
|
|
mtx sync.RWMutex
|
|
|
|
alerts map[model.Fingerprint]struct{}
|
|
|
|
hasSent bool
|
2015-07-02 16:38:05 +00:00
|
|
|
}
|
|
|
|
|
2015-07-02 18:48:21 +00:00
|
|
|
// newAggrGroup returns a new aggregation group and starts background processing
|
|
|
|
// that sends notifications about the contained alerts.
|
2015-07-04 10:52:53 +00:00
|
|
|
func newAggrGroup(d *Dispatcher, labels model.LabelSet, opts *RouteOpts) *aggrGroup {
|
2015-07-02 18:48:21 +00:00
|
|
|
ag := &aggrGroup{
|
|
|
|
dispatcher: d,
|
|
|
|
|
2015-07-04 10:52:53 +00:00
|
|
|
labels: labels,
|
|
|
|
opts: opts,
|
|
|
|
|
|
|
|
alerts: map[model.Fingerprint]struct{}{},
|
|
|
|
done: make(chan struct{}),
|
2015-07-02 18:48:21 +00:00
|
|
|
}
|
2015-07-02 16:38:05 +00:00
|
|
|
|
2015-07-02 18:48:21 +00:00
|
|
|
go ag.run()
|
2015-07-02 16:38:05 +00:00
|
|
|
|
2015-07-02 18:48:21 +00:00
|
|
|
return ag
|
|
|
|
}
|
2015-07-02 16:38:05 +00:00
|
|
|
|
2015-07-02 18:48:21 +00:00
|
|
|
func (ag *aggrGroup) run() {
|
2015-07-04 10:52:53 +00:00
|
|
|
// Set an initial one-time wait before flushing
|
|
|
|
// the first batch of notifications.
|
|
|
|
ag.next = time.NewTimer(ag.opts.GroupWait)
|
|
|
|
|
|
|
|
defer ag.next.Stop()
|
|
|
|
|
2015-07-02 16:38:05 +00:00
|
|
|
for {
|
|
|
|
select {
|
2015-07-04 10:52:53 +00:00
|
|
|
case <-ag.next.C:
|
2015-07-02 16:38:05 +00:00
|
|
|
ag.flush()
|
2015-07-04 10:52:53 +00:00
|
|
|
// Wait the configured interval before calling flush again.
|
|
|
|
ag.next.Reset(ag.opts.GroupInterval)
|
|
|
|
|
2015-07-02 16:38:05 +00:00
|
|
|
case <-ag.done:
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (ag *aggrGroup) stop() {
|
|
|
|
close(ag.done)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (ag *aggrGroup) fingerprint() model.Fingerprint {
|
|
|
|
return ag.labels.Fingerprint()
|
|
|
|
}
|
|
|
|
|
2015-07-02 18:48:21 +00:00
|
|
|
// insert the alert into the aggregation group. If the aggregation group
|
|
|
|
// is empty afterwards, true is returned.
|
2015-07-02 16:38:05 +00:00
|
|
|
func (ag *aggrGroup) insert(alert *Alert) {
|
2015-07-04 10:52:53 +00:00
|
|
|
fp := alert.Fingerprint()
|
2015-07-02 16:38:05 +00:00
|
|
|
|
2015-07-04 10:52:53 +00:00
|
|
|
ag.mtx.Lock()
|
|
|
|
ag.alerts[fp] = struct{}{}
|
2015-07-02 18:48:21 +00:00
|
|
|
ag.mtx.Unlock()
|
|
|
|
|
|
|
|
// Immediately trigger a flush if the wait duration for this
|
|
|
|
// alert is already over.
|
2015-07-04 10:52:53 +00:00
|
|
|
if !ag.hasSent && alert.Timestamp.Add(ag.opts.GroupWait).Before(time.Now()) {
|
|
|
|
ag.next.Reset(0)
|
2015-07-02 16:38:05 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2015-07-02 18:48:21 +00:00
|
|
|
func (ag *aggrGroup) empty() bool {
|
|
|
|
ag.mtx.RLock()
|
|
|
|
defer ag.mtx.RUnlock()
|
|
|
|
|
2015-07-04 10:52:53 +00:00
|
|
|
return len(ag.alerts) == 0
|
2015-07-02 18:48:21 +00:00
|
|
|
}
|
2015-07-02 16:38:05 +00:00
|
|
|
|
2015-07-02 18:48:21 +00:00
|
|
|
// flush sends notifications for all new alerts.
|
|
|
|
func (ag *aggrGroup) flush() {
|
2015-07-02 16:38:05 +00:00
|
|
|
ag.mtx.Lock()
|
|
|
|
defer ag.mtx.Unlock()
|
|
|
|
|
2015-07-04 10:52:53 +00:00
|
|
|
var alerts []*Alert
|
|
|
|
for fp := range ag.alerts {
|
|
|
|
a, err := ag.dispatcher.state.Alert().Get(fp)
|
|
|
|
if err != nil {
|
|
|
|
log.Error(err)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
// TODO(fabxc): only delete if notify successful.
|
|
|
|
if a.Resolved {
|
|
|
|
delete(ag.alerts, fp)
|
|
|
|
}
|
|
|
|
alerts = append(alerts, a)
|
|
|
|
}
|
2015-07-02 16:38:05 +00:00
|
|
|
|
2015-07-04 10:52:53 +00:00
|
|
|
ag.dispatcher.notify(ag.opts.SendTo, alerts...)
|
|
|
|
ag.hasSent = true
|
2015-07-02 16:38:05 +00:00
|
|
|
}
|
|
|
|
|
2015-07-02 18:48:21 +00:00
|
|
|
// alertTimeline is a list of alerts sorted by their timestamp.
|
2015-07-02 16:38:05 +00:00
|
|
|
type alertTimeline []*Alert
|
|
|
|
|
2015-07-02 18:48:21 +00:00
|
|
|
func (at alertTimeline) Len() int { return len(at) }
|
|
|
|
func (at alertTimeline) Less(i, j int) bool { return at[i].Timestamp.Before(at[j].Timestamp) }
|
|
|
|
func (at alertTimeline) Swap(i, j int) { at[i], at[j] = at[j], at[i] }
|