fix aggregation group handling and state
This commit is contained in:
parent
dba2b85318
commit
4aa5dcccf3
|
@ -73,8 +73,10 @@ func (api *API) addAlerts(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
for _, alert := range alerts {
|
||||
if alert.Timestamp.IsZero() {
|
||||
alert.Timestamp = time.Now()
|
||||
}
|
||||
}
|
||||
|
||||
// TODO(fabxc): validate input.
|
||||
if err := api.state.Alert().Add(alerts...); err != nil {
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package manager
|
||||
|
||||
import (
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
@ -9,6 +8,8 @@ import (
|
|||
"github.com/prometheus/log"
|
||||
)
|
||||
|
||||
const ResolveTimeout = 15 * time.Second
|
||||
|
||||
// Dispatcher dispatches alerts. It is based on the alert's data
|
||||
// rather than the time they arrive. Thus it can recover its state
|
||||
// without persistence.
|
||||
|
@ -47,6 +48,33 @@ func (d *Dispatcher) Run() {
|
|||
for _, m := range conf.Routes.Match(alert.Labels) {
|
||||
d.processAlert(alert, m)
|
||||
}
|
||||
|
||||
if !alert.Resolved {
|
||||
// After the constant timeout update the alert to be resolved.
|
||||
go func(alert *Alert) {
|
||||
for {
|
||||
// TODO: get most recent version first.
|
||||
time.Sleep(ResolveTimeout)
|
||||
|
||||
a := *alert
|
||||
a.Resolved = true
|
||||
|
||||
if err := d.state.Alert().Add(&a); err != nil {
|
||||
log.Error(err)
|
||||
continue
|
||||
}
|
||||
return
|
||||
}
|
||||
}(alert)
|
||||
}
|
||||
|
||||
// Cleanup routine.
|
||||
for _, ag := range d.aggrGroups {
|
||||
if ag.empty() {
|
||||
ag.stop()
|
||||
delete(d.aggrGroups, ag.fingerprint())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -63,16 +91,43 @@ func (d *Dispatcher) processAlert(alert *Alert, opts *RouteOpts) {
|
|||
|
||||
ag, ok := d.aggrGroups[fp]
|
||||
if !ok {
|
||||
ag = newAggrGroup(group, opts)
|
||||
ag = newAggrGroup(d, group, opts)
|
||||
d.aggrGroups[fp] = ag
|
||||
}
|
||||
|
||||
ag.insert(alert)
|
||||
}
|
||||
|
||||
if ag.empty() {
|
||||
ag.stop()
|
||||
delete(d.aggrGroups, fp)
|
||||
}
|
||||
// Alert models an action triggered by Prometheus.
|
||||
type Alert struct {
|
||||
// Label value pairs for purpose of aggregation, matching, and disposition
|
||||
// dispatching. This must minimally include an "alertname" label.
|
||||
Labels model.LabelSet `json:"labels"`
|
||||
|
||||
// Extra key/value information which is not used for aggregation.
|
||||
Payload map[string]string `json:"payload"`
|
||||
|
||||
// Short summary of alert.
|
||||
Summary string `json:"summary"`
|
||||
Description string `json:"description"`
|
||||
Runbook string `json:"runbook"`
|
||||
|
||||
// When the alert was reported.
|
||||
Timestamp time.Time `json:"timestamp"`
|
||||
|
||||
// Whether the alert with the given label set is resolved.
|
||||
Resolved bool `json:"resolved"`
|
||||
}
|
||||
|
||||
// Name returns the name of the alert. It is equivalent to the "alertname" label.
|
||||
func (a *Alert) Name() string {
|
||||
return string(a.Labels[model.AlertNameLabel])
|
||||
}
|
||||
|
||||
// Fingerprint returns a unique hash for the alert. It is equivalent to
|
||||
// the fingerprint of the alert's label set.
|
||||
func (a *Alert) Fingerprint() model.Fingerprint {
|
||||
return a.Labels.Fingerprint()
|
||||
}
|
||||
|
||||
// aggrGroup aggregates alerts into groups based on
|
||||
|
@ -81,31 +136,27 @@ type aggrGroup struct {
|
|||
dispatcher *Dispatcher
|
||||
|
||||
labels model.LabelSet
|
||||
alertsOld alertTimeline
|
||||
alertsNew alertTimeline
|
||||
notify string
|
||||
opts *RouteOpts
|
||||
|
||||
wait time.Duration
|
||||
waitTimer *time.Timer
|
||||
next *time.Timer
|
||||
done chan struct{}
|
||||
|
||||
done chan bool
|
||||
mtx sync.RWMutex
|
||||
alerts map[model.Fingerprint]struct{}
|
||||
hasSent bool
|
||||
}
|
||||
|
||||
// newAggrGroup returns a new aggregation group and starts background processing
|
||||
// that sends notifications about the contained alerts.
|
||||
func newAggrGroup(labels model.LabelSet, opts *RouteOpts) *aggrGroup {
|
||||
func newAggrGroup(d *Dispatcher, labels model.LabelSet, opts *RouteOpts) *aggrGroup {
|
||||
ag := &aggrGroup{
|
||||
dispatcher: d,
|
||||
|
||||
labels: group,
|
||||
wait: opts.GroupWait(),
|
||||
waitTimer: time.NewTimer(opts.GroupWait()),
|
||||
notify: opts.SendTo,
|
||||
done: make(chan bool),
|
||||
}
|
||||
if ag.wait == 0 {
|
||||
ag.waitTimer.Stop()
|
||||
labels: labels,
|
||||
opts: opts,
|
||||
|
||||
alerts: map[model.Fingerprint]struct{}{},
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
|
||||
go ag.run()
|
||||
|
@ -114,10 +165,19 @@ func newAggrGroup(labels model.LabelSet, opts *RouteOpts) *aggrGroup {
|
|||
}
|
||||
|
||||
func (ag *aggrGroup) run() {
|
||||
// Set an initial one-time wait before flushing
|
||||
// the first batch of notifications.
|
||||
ag.next = time.NewTimer(ag.opts.GroupWait)
|
||||
|
||||
defer ag.next.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ag.waitTimer.C:
|
||||
case <-ag.next.C:
|
||||
ag.flush()
|
||||
// Wait the configured interval before calling flush again.
|
||||
ag.next.Reset(ag.opts.GroupInterval)
|
||||
|
||||
case <-ag.done:
|
||||
return
|
||||
}
|
||||
|
@ -125,7 +185,6 @@ func (ag *aggrGroup) run() {
|
|||
}
|
||||
|
||||
func (ag *aggrGroup) stop() {
|
||||
ag.waitTimer.Stop()
|
||||
close(ag.done)
|
||||
}
|
||||
|
||||
|
@ -136,21 +195,16 @@ func (ag *aggrGroup) fingerprint() model.Fingerprint {
|
|||
// insert adds the alert to the aggregation group. It returns nothing;
|
||||
// callers check whether the group is empty separately via empty().
|
||||
func (ag *aggrGroup) insert(alert *Alert) {
|
||||
fp := alert.Fingerprint()
|
||||
|
||||
ag.mtx.Lock()
|
||||
|
||||
ag.alertsNew = append(ag.alertsNew, alert)
|
||||
sort.Sort(ag.alertsNew)
|
||||
|
||||
ag.alerts[fp] = struct{}{}
|
||||
ag.mtx.Unlock()
|
||||
|
||||
// Immediately trigger a flush if the wait duration for this
|
||||
// alert is already over.
|
||||
if alert.Timestamp.Add(ag.wait).Before(time.Now()) {
|
||||
ag.flush()
|
||||
}
|
||||
|
||||
if ag.wait > 0 {
|
||||
ag.waitTimer.Reset(ag.wait)
|
||||
if !ag.hasSent && alert.Timestamp.Add(ag.opts.GroupWait).Before(time.Now()) {
|
||||
ag.next.Reset(0)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -158,7 +212,7 @@ func (ag *aggrGroup) empty() bool {
|
|||
ag.mtx.RLock()
|
||||
defer ag.mtx.RUnlock()
|
||||
|
||||
return len(ag.alertsNew)+len(ag.alertsOld) == 0
|
||||
return len(ag.alerts) == 0
|
||||
}
|
||||
|
||||
// flush sends notifications for all new alerts.
|
||||
|
@ -166,10 +220,22 @@ func (ag *aggrGroup) flush() {
|
|||
ag.mtx.Lock()
|
||||
defer ag.mtx.Unlock()
|
||||
|
||||
ag.dispatcher.notify(ag.notify, ag.alertsNew...)
|
||||
var alerts []*Alert
|
||||
for fp := range ag.alerts {
|
||||
a, err := ag.dispatcher.state.Alert().Get(fp)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
continue
|
||||
}
|
||||
// TODO(fabxc): only delete if notify successful.
|
||||
if a.Resolved {
|
||||
delete(ag.alerts, fp)
|
||||
}
|
||||
alerts = append(alerts, a)
|
||||
}
|
||||
|
||||
ag.alertsOld = append(ag.alertsOld, ag.alertsNew...)
|
||||
ag.alertsNew = ag.alertsNew[:0]
|
||||
ag.dispatcher.notify(ag.opts.SendTo, alerts...)
|
||||
ag.hasSent = true
|
||||
}
|
||||
|
||||
// alertTimeline is a list of alerts sorted by their timestamp.
|
||||
|
|
|
@ -3,7 +3,6 @@ package manager
|
|||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/common/model"
|
||||
// "github.com/prometheus/log"
|
||||
|
@ -19,6 +18,7 @@ type State interface {
|
|||
|
||||
type AlertState interface {
|
||||
Add(...*Alert) error
|
||||
Get(model.Fingerprint) (*Alert, error)
|
||||
GetAll() ([]*Alert, error)
|
||||
|
||||
Next() *Alert
|
||||
|
@ -51,10 +51,11 @@ type simpleState struct {
|
|||
func NewSimpleState() State {
|
||||
return &simpleState{
|
||||
silences: &memSilences{
|
||||
m: map[string]*Silence{},
|
||||
sils: map[string]*Silence{},
|
||||
nextID: 1,
|
||||
},
|
||||
alerts: &memAlerts{
|
||||
alerts: map[model.Fingerprint]*Alert{},
|
||||
ch: make(chan *Alert, 100),
|
||||
},
|
||||
config: &memConfig{},
|
||||
|
@ -94,7 +95,7 @@ func (c *memConfig) Get() (*Config, error) {
|
|||
}
|
||||
|
||||
type memAlerts struct {
|
||||
alerts []*Alert
|
||||
alerts map[model.Fingerprint]*Alert
|
||||
ch chan *Alert
|
||||
mtx sync.RWMutex
|
||||
}
|
||||
|
@ -104,7 +105,9 @@ func (s *memAlerts) GetAll() ([]*Alert, error) {
|
|||
defer s.mtx.RUnlock()
|
||||
|
||||
alerts := make([]*Alert, len(s.alerts))
|
||||
copy(alerts, s.alerts)
|
||||
for i, a := range s.alerts {
|
||||
alerts[i] = a
|
||||
}
|
||||
|
||||
return alerts, nil
|
||||
}
|
||||
|
@ -113,23 +116,40 @@ func (s *memAlerts) Add(alerts ...*Alert) error {
|
|||
s.mtx.Lock()
|
||||
defer s.mtx.Unlock()
|
||||
|
||||
s.alerts = append(s.alerts, alerts...)
|
||||
for _, alert := range alerts {
|
||||
fp := alert.Fingerprint()
|
||||
|
||||
// Last write wins.
|
||||
if prev, ok := s.alerts[fp]; !ok || alert.Timestamp.After(prev.Timestamp) {
|
||||
s.alerts[fp] = alert
|
||||
}
|
||||
|
||||
// TODO(fabxc): remove this as it blocks if the channel is full.
|
||||
for _, alert := range alerts {
|
||||
s.ch <- alert
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *memAlerts) Get(fp model.Fingerprint) (*Alert, error) {
|
||||
s.mtx.Lock()
|
||||
defer s.mtx.Unlock()
|
||||
|
||||
if a, ok := s.alerts[fp]; ok {
|
||||
return a, nil
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("alert with fingerprint %s does not exist", fp)
|
||||
}
|
||||
|
||||
func (s *memAlerts) Next() *Alert {
|
||||
return <-s.ch
|
||||
}
|
||||
|
||||
type memSilences struct {
|
||||
m map[string]*Silence
|
||||
mtx sync.RWMutex
|
||||
sils map[string]*Silence
|
||||
|
||||
mtx sync.RWMutex
|
||||
nextID uint64
|
||||
}
|
||||
|
||||
|
@ -140,13 +160,22 @@ func (s *memSilences) genID() string {
|
|||
}
|
||||
|
||||
func (s *memSilences) Get(sid string) (*Silence, error) {
|
||||
return nil, nil
|
||||
s.mtx.RLock()
|
||||
defer s.mtx.RUnlock()
|
||||
|
||||
if sil, ok := s.sils[sid]; ok {
|
||||
return sil, nil
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("silence with ID %s does not exist", sid)
|
||||
}
|
||||
|
||||
func (s *memSilences) Del(sid string) error {
|
||||
if _, ok := s.m[sid]; !ok {
|
||||
if _, ok := s.sils[sid]; !ok {
|
||||
return fmt.Errorf("silence with ID %s does not exist", sid)
|
||||
}
|
||||
delete(s.m, sid)
|
||||
|
||||
delete(s.sils, sid)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -154,8 +183,8 @@ func (s *memSilences) GetAll() ([]*Silence, error) {
|
|||
s.mtx.Lock()
|
||||
defer s.mtx.Unlock()
|
||||
|
||||
sils := make([]*Silence, 0, len(s.m))
|
||||
for _, sil := range s.m {
|
||||
sils := make([]*Silence, 0, len(s.sils))
|
||||
for _, sil := range s.sils {
|
||||
sils = append(sils, sil)
|
||||
}
|
||||
return sils, nil
|
||||
|
@ -169,39 +198,6 @@ func (s *memSilences) Set(sil *Silence) error {
|
|||
sil.ID = s.genID()
|
||||
}
|
||||
|
||||
s.m[sil.ID] = sil
|
||||
s.sils[sil.ID] = sil
|
||||
return nil
|
||||
}
|
||||
|
||||
// Alert models an action triggered by Prometheus.
|
||||
type Alert struct {
|
||||
// Label value pairs for purpose of aggregation, matching, and disposition
|
||||
// dispatching. This must minimally include an "alertname" label.
|
||||
Labels model.LabelSet `json:"labels"`
|
||||
|
||||
// Extra key/value information which is not used for aggregation.
|
||||
Payload map[string]string `json:"payload"`
|
||||
|
||||
// Short summary of alert.
|
||||
Summary string `json:"summary"`
|
||||
|
||||
// Long description of alert.
|
||||
Description string `json:"description"`
|
||||
|
||||
// Runbook link or reference for the alert.
|
||||
Runbook string `json:"runbook"`
|
||||
|
||||
// When the alert was reported.
|
||||
Timestamp time.Time `json:"-"`
|
||||
}
|
||||
|
||||
// Name returns the name of the alert. It is equivalent to the "alertname" label.
|
||||
func (a *Alert) Name() string {
|
||||
return string(a.Labels[model.AlertNameLabel])
|
||||
}
|
||||
|
||||
// Fingerprint returns a unique hash for the alert. It is equivalent to
|
||||
// the fingerprint of the alert's label set.
|
||||
func (a *Alert) Fingerprint() model.Fingerprint {
|
||||
return a.Labels.Fingerprint()
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue