Fix compile issues
This commit is contained in:
parent
80970a2c44
commit
89e8d82a1b
229
api.go
229
api.go
|
@ -1,37 +1,34 @@
|
|||
package manager
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"time"
|
||||
// "time"
|
||||
|
||||
"github.com/prometheus/common/route"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
type API struct {
|
||||
state State
|
||||
|
||||
// context is an indirection for testing.
|
||||
context func(r *http.Request) context.Context
|
||||
}
|
||||
|
||||
func NewAPI(r *route.Router, s State) *API {
|
||||
func NewAPI(r *route.Router) *API {
|
||||
api := &API{
|
||||
state: s,
|
||||
context: route.Context,
|
||||
}
|
||||
|
||||
r.Get("/alerts", api.listAlerts)
|
||||
r.Post("/alerts", api.addAlerts)
|
||||
// r.Get("/alerts", api.listAlerts)
|
||||
// r.Post("/alerts", api.addAlerts)
|
||||
|
||||
r.Get("/silences", api.listSilences)
|
||||
r.Post("/silences", api.addSilence)
|
||||
// r.Get("/silences", api.listSilences)
|
||||
// r.Post("/silences", api.addSilence)
|
||||
|
||||
r.Get("/silence/:sid", api.getSilence)
|
||||
r.Put("/silence/:sid", api.setSilence)
|
||||
r.Del("/silence/:sid", api.delSilence)
|
||||
// r.Get("/silence/:sid", api.getSilence)
|
||||
// r.Put("/silence/:sid", api.setSilence)
|
||||
// r.Del("/silence/:sid", api.delSilence)
|
||||
|
||||
return api
|
||||
}
|
||||
|
@ -54,123 +51,123 @@ func (e *apiError) Error() string {
|
|||
return fmt.Sprintf("%s: %s", e.typ, e.err)
|
||||
}
|
||||
|
||||
func (api *API) listAlerts(w http.ResponseWriter, r *http.Request) {
|
||||
alerts, err := api.state.Alert().GetAll()
|
||||
if err != nil {
|
||||
respondError(w, apiError{
|
||||
typ: errorBadData,
|
||||
err: err,
|
||||
}, nil)
|
||||
return
|
||||
}
|
||||
respond(w, alerts)
|
||||
}
|
||||
// func (api *API) listAlerts(w http.ResponseWriter, r *http.Request) {
|
||||
// alerts, err := api.state.Alert().GetAll()
|
||||
// if err != nil {
|
||||
// respondError(w, apiError{
|
||||
// typ: errorBadData,
|
||||
// err: err,
|
||||
// }, nil)
|
||||
// return
|
||||
// }
|
||||
// respond(w, alerts)
|
||||
// }
|
||||
|
||||
func (api *API) addAlerts(w http.ResponseWriter, r *http.Request) {
|
||||
var alerts []*Alert
|
||||
if err := receive(r, &alerts); err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
for _, alert := range alerts {
|
||||
now := time.Now()
|
||||
// func (api *API) addAlerts(w http.ResponseWriter, r *http.Request) {
|
||||
// var alerts []*Alert
|
||||
// if err := receive(r, &alerts); err != nil {
|
||||
// http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
// return
|
||||
// }
|
||||
// for _, alert := range alerts {
|
||||
// now := time.Now()
|
||||
|
||||
if alert.Timestamp.IsZero() {
|
||||
alert.Timestamp = now
|
||||
}
|
||||
if alert.CreatedAt.IsZero() {
|
||||
alert.CreatedAt = now
|
||||
}
|
||||
if alert.ResolvedAt.IsZero() {
|
||||
alert.ResolvedAt = alert.CreatedAt.Add(ResolveTimeout)
|
||||
}
|
||||
}
|
||||
// if alert.Timestamp.IsZero() {
|
||||
// alert.Timestamp = now
|
||||
// }
|
||||
// if alert.CreatedAt.IsZero() {
|
||||
// alert.CreatedAt = now
|
||||
// }
|
||||
// if alert.ResolvedAt.IsZero() {
|
||||
// alert.ResolvedAt = alert.CreatedAt.Add(ResolveTimeout)
|
||||
// }
|
||||
// }
|
||||
|
||||
// TODO(fabxc): validate input.
|
||||
if err := api.state.Alert().Add(alerts...); err != nil {
|
||||
respondError(w, apiError{
|
||||
typ: errorBadData,
|
||||
err: err,
|
||||
}, nil)
|
||||
return
|
||||
}
|
||||
// // TODO(fabxc): validate input.
|
||||
// if err := api.state.Alert().Add(alerts...); err != nil {
|
||||
// respondError(w, apiError{
|
||||
// typ: errorBadData,
|
||||
// err: err,
|
||||
// }, nil)
|
||||
// return
|
||||
// }
|
||||
|
||||
respond(w, nil)
|
||||
}
|
||||
// respond(w, nil)
|
||||
// }
|
||||
|
||||
func (api *API) addSilence(w http.ResponseWriter, r *http.Request) {
|
||||
var sil Silence
|
||||
if err := receive(r, &sil); err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
// TODO(fabxc): validate input.
|
||||
if err := api.state.Silence().Set(&sil); err != nil {
|
||||
respondError(w, apiError{
|
||||
typ: errorBadData,
|
||||
err: err,
|
||||
}, nil)
|
||||
return
|
||||
}
|
||||
// func (api *API) addSilence(w http.ResponseWriter, r *http.Request) {
|
||||
// var sil Silence
|
||||
// if err := receive(r, &sil); err != nil {
|
||||
// http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
// return
|
||||
// }
|
||||
// // TODO(fabxc): validate input.
|
||||
// if err := api.state.Silence().Set(&sil); err != nil {
|
||||
// respondError(w, apiError{
|
||||
// typ: errorBadData,
|
||||
// err: err,
|
||||
// }, nil)
|
||||
// return
|
||||
// }
|
||||
|
||||
respond(w, nil)
|
||||
}
|
||||
// respond(w, nil)
|
||||
// }
|
||||
|
||||
func (api *API) getSilence(w http.ResponseWriter, r *http.Request) {
|
||||
sid := route.Param(api.context(r), "sid")
|
||||
// func (api *API) getSilence(w http.ResponseWriter, r *http.Request) {
|
||||
// sid := route.Param(api.context(r), "sid")
|
||||
|
||||
sil, err := api.state.Silence().Get(sid)
|
||||
if err != nil {
|
||||
http.Error(w, fmt.Sprint("Error getting silence: ", err), http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
// sil, err := api.state.Silence().Get(sid)
|
||||
// if err != nil {
|
||||
// http.Error(w, fmt.Sprint("Error getting silence: ", err), http.StatusNotFound)
|
||||
// return
|
||||
// }
|
||||
|
||||
respond(w, &sil)
|
||||
}
|
||||
// respond(w, &sil)
|
||||
// }
|
||||
|
||||
func (api *API) setSilence(w http.ResponseWriter, r *http.Request) {
|
||||
var sil Silence
|
||||
if err := receive(r, &sil); err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
// TODO(fabxc): validate input.
|
||||
sil.ID = route.Param(api.context(r), "sid")
|
||||
// func (api *API) setSilence(w http.ResponseWriter, r *http.Request) {
|
||||
// var sil Silence
|
||||
// if err := receive(r, &sil); err != nil {
|
||||
// http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
// return
|
||||
// }
|
||||
// // TODO(fabxc): validate input.
|
||||
// sil.ID = route.Param(api.context(r), "sid")
|
||||
|
||||
if err := api.state.Silence().Set(&sil); err != nil {
|
||||
respondError(w, apiError{
|
||||
typ: errorBadData,
|
||||
err: err,
|
||||
}, &sil)
|
||||
return
|
||||
}
|
||||
respond(w, nil)
|
||||
}
|
||||
// if err := api.state.Silence().Set(&sil); err != nil {
|
||||
// respondError(w, apiError{
|
||||
// typ: errorBadData,
|
||||
// err: err,
|
||||
// }, &sil)
|
||||
// return
|
||||
// }
|
||||
// respond(w, nil)
|
||||
// }
|
||||
|
||||
func (api *API) delSilence(w http.ResponseWriter, r *http.Request) {
|
||||
sid := route.Param(api.context(r), "sid")
|
||||
// func (api *API) delSilence(w http.ResponseWriter, r *http.Request) {
|
||||
// sid := route.Param(api.context(r), "sid")
|
||||
|
||||
if err := api.state.Silence().Del(sid); err != nil {
|
||||
respondError(w, apiError{
|
||||
typ: errorBadData,
|
||||
err: err,
|
||||
}, nil)
|
||||
return
|
||||
}
|
||||
respond(w, nil)
|
||||
}
|
||||
// if err := api.state.Silence().Del(sid); err != nil {
|
||||
// respondError(w, apiError{
|
||||
// typ: errorBadData,
|
||||
// err: err,
|
||||
// }, nil)
|
||||
// return
|
||||
// }
|
||||
// respond(w, nil)
|
||||
// }
|
||||
|
||||
func (api *API) listSilences(w http.ResponseWriter, r *http.Request) {
|
||||
sils, err := api.state.Silence().List()
|
||||
if err != nil {
|
||||
respondError(w, apiError{
|
||||
typ: errorBadData,
|
||||
err: err,
|
||||
}, nil)
|
||||
return
|
||||
}
|
||||
respond(w, sils)
|
||||
}
|
||||
// func (api *API) listSilences(w http.ResponseWriter, r *http.Request) {
|
||||
// sils, err := api.state.Silence().List()
|
||||
// if err != nil {
|
||||
// respondError(w, apiError{
|
||||
// typ: errorBadData,
|
||||
// err: err,
|
||||
// }, nil)
|
||||
// return
|
||||
// }
|
||||
// respond(w, sils)
|
||||
// }
|
||||
|
||||
type status string
|
||||
|
||||
|
|
68
dispatch.go
68
dispatch.go
|
@ -1,7 +1,6 @@
|
|||
package manager
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
@ -9,6 +8,7 @@ import (
|
|||
"github.com/prometheus/log"
|
||||
"golang.org/x/net/context"
|
||||
|
||||
"github.com/prometheus/alertmanager/config"
|
||||
"github.com/prometheus/alertmanager/provider"
|
||||
"github.com/prometheus/alertmanager/types"
|
||||
)
|
||||
|
@ -36,21 +36,23 @@ func NewDispatcher(ap provider.Alerts) *Dispatcher {
|
|||
}
|
||||
|
||||
// ApplyConfig updates the dispatcher to match the new configuration.
|
||||
func (d *Dispatcher) ApplyConfig(conf *Config) {
|
||||
func (d *Dispatcher) ApplyConfig(conf *config.Config) {
|
||||
d.mtx.Lock()
|
||||
defer d.mtx.Unlock()
|
||||
|
||||
d.Stop()
|
||||
// If a cancelation function is set, the dispatcher is running.
|
||||
if d.cancel != nil {
|
||||
d.Stop()
|
||||
defer func() { go d.Run() }()
|
||||
}
|
||||
|
||||
d.routes = conf.Routes
|
||||
d.routes = NewRoutes(conf.Routes)
|
||||
d.notifiers = map[string]Notifier{}
|
||||
|
||||
// TODO(fabxc): build correct notifiers from new conf.NotificationConfigs.
|
||||
for _, ncfg := range conf.NotificationConfigs {
|
||||
d.notifiers[ncfg.Name] = &LogNotifier{ncfg.Name}
|
||||
}
|
||||
|
||||
go d.Run()
|
||||
}
|
||||
|
||||
// Run starts dispatching alerts incoming via the updates channel.
|
||||
|
@ -60,10 +62,10 @@ func (d *Dispatcher) Run() {
|
|||
|
||||
d.ctx, d.cancel = context.WithCancel(context.Background())
|
||||
|
||||
updates := d.alertProvider.IterActive()
|
||||
updates := d.alerts.IterActive()
|
||||
|
||||
defer close(d.done)
|
||||
defer close(updates)
|
||||
// TODO(fabxc): updates channel is never closed!!!
|
||||
|
||||
d.run(updates)
|
||||
}
|
||||
|
@ -99,6 +101,7 @@ func (d *Dispatcher) run(updates <-chan *types.Alert) {
|
|||
// Stop the dispatcher.
|
||||
func (d *Dispatcher) Stop() {
|
||||
d.cancel()
|
||||
d.cancel = nil
|
||||
<-d.done
|
||||
}
|
||||
|
||||
|
@ -106,7 +109,7 @@ func (d *Dispatcher) Stop() {
|
|||
// with the given fingerprint. It aborts on context cancelation.
|
||||
// It returns whether the alert has successfully been communiated as
|
||||
// resolved.
|
||||
type notifyFunc func(context.Context, model.Fingerprint) bool
|
||||
type notifyFunc func(context.Context, *types.Alert) bool
|
||||
|
||||
// notifyFunc returns a function which performs a notification
|
||||
// as required by the routing options.
|
||||
|
@ -116,9 +119,7 @@ func (d *Dispatcher) notifyFunc(dest string) notifyFunc {
|
|||
|
||||
notifier := d.notifiers[dest]
|
||||
|
||||
return func(ctx context.Context, fp model.Fingerprint) bool {
|
||||
alert := d.alerts.Get(fp)
|
||||
|
||||
return func(ctx context.Context, alert *types.Alert) bool {
|
||||
if err := notifier.Notify(ctx, alert); err != nil {
|
||||
log.Errorf("Notify for %v failed: %s", alert, err)
|
||||
return false
|
||||
|
@ -143,8 +144,8 @@ func (d *Dispatcher) processAlert(alert *types.Alert, opts *RouteOpts) {
|
|||
// If the group does not exist, create it.
|
||||
ag, ok := d.aggrGroups[fp]
|
||||
if !ok {
|
||||
ag = newAggrGroup(d, group, opts)
|
||||
ag.run(ag.notifyFunc(opts.SendTo))
|
||||
ag = newAggrGroup(d.ctx, group, opts)
|
||||
ag.run(d.notifyFunc(opts.SendTo))
|
||||
|
||||
d.aggrGroups[fp] = ag
|
||||
}
|
||||
|
@ -162,11 +163,11 @@ type aggrGroup struct {
|
|||
ctx context.Context
|
||||
cancel func()
|
||||
done chan struct{}
|
||||
next *time.Timer
|
||||
|
||||
mtx sync.RWMutex
|
||||
alerts map[model.Fingerprint]struct{}
|
||||
alerts map[model.Fingerprint]*types.Alert
|
||||
hasSent bool
|
||||
curRev int
|
||||
}
|
||||
|
||||
// newAggrGroup returns a new aggregation group.
|
||||
|
@ -174,7 +175,7 @@ func newAggrGroup(ctx context.Context, labels model.LabelSet, opts *RouteOpts) *
|
|||
ag := &aggrGroup{
|
||||
labels: labels,
|
||||
opts: opts,
|
||||
alerts: map[model.Fingerprint]struct{}{},
|
||||
alerts: map[model.Fingerprint]*types.Alert{},
|
||||
}
|
||||
ag.ctx, ag.cancel = context.WithCancel(ctx)
|
||||
|
||||
|
@ -186,7 +187,7 @@ func (ag *aggrGroup) run(notify notifyFunc) {
|
|||
|
||||
// Set an initial one-time wait before flushing
|
||||
// the first batch of notifications.
|
||||
next := time.NewTimer(opts.GroupWait)
|
||||
ag.next = time.NewTimer(ag.opts.GroupWait)
|
||||
|
||||
defer close(ag.done)
|
||||
defer ag.next.Stop()
|
||||
|
@ -199,10 +200,10 @@ func (ag *aggrGroup) run(notify notifyFunc) {
|
|||
ctx, _ := context.WithTimeout(ag.ctx, ag.opts.RepeatInterval*2/3)
|
||||
|
||||
// Wait the configured interval before calling flush again.
|
||||
next.Reset(ag.opts.RepeatInterval)
|
||||
ag.next.Reset(ag.opts.RepeatInterval)
|
||||
|
||||
ag.flush(func(fp model.Fingerprint) bool {
|
||||
notify(ctx, fp)
|
||||
ag.flush(func(a *types.Alert) bool {
|
||||
return notify(ctx, a)
|
||||
})
|
||||
|
||||
case <-ag.ctx.Done():
|
||||
|
@ -224,12 +225,11 @@ func (ag *aggrGroup) fingerprint() model.Fingerprint {
|
|||
|
||||
// insert the alert into the aggregation group. If the aggregation group
|
||||
// is empty afterwards, true is returned.
|
||||
func (ag *aggrGroup) insert(fp model.Fingerprint) {
|
||||
func (ag *aggrGroup) insert(alert *types.Alert) {
|
||||
ag.mtx.Lock()
|
||||
defer ag.mtx.Unlock()
|
||||
|
||||
ag.curRev++
|
||||
ag.alerts[fp] = ag.curRev
|
||||
ag.alerts[alert.Fingerprint()] = alert
|
||||
|
||||
// Immediately trigger a flush if the wait duration for this
|
||||
// alert is already over.
|
||||
|
@ -246,12 +246,12 @@ func (ag *aggrGroup) empty() bool {
|
|||
}
|
||||
|
||||
// flush sends notifications for all new alerts.
|
||||
func (ag *aggrGroup) flush(notify func(model.Fingerprint) bool) {
|
||||
func (ag *aggrGroup) flush(notify func(*types.Alert) bool) {
|
||||
ag.mtx.Lock()
|
||||
|
||||
alerts := make(map[model.Fingerprint]int, len(ag.alerts))
|
||||
for fp, rev := range ag.alerts {
|
||||
alerts[fp] = rev
|
||||
alerts := make(map[model.Fingerprint]*types.Alert, len(ag.alerts))
|
||||
for fp, alert := range ag.alerts {
|
||||
alerts[fp] = alert
|
||||
}
|
||||
|
||||
ag.mtx.Unlock()
|
||||
|
@ -259,21 +259,21 @@ func (ag *aggrGroup) flush(notify func(model.Fingerprint) bool) {
|
|||
var wg sync.WaitGroup
|
||||
wg.Add(len(alerts))
|
||||
|
||||
for fp, rev := range alerts {
|
||||
go func(fp model.Fingerprint) {
|
||||
for fp, a := range alerts {
|
||||
go func(fp model.Fingerprint, a *types.Alert) {
|
||||
// notify returns whether the alert can be deleted
|
||||
// afterwards.
|
||||
if notify(fp) {
|
||||
if notify(a) {
|
||||
ag.mtx.Lock()
|
||||
// Only delete if the fingerprint has not been inserted
|
||||
// again since we notified about it.
|
||||
if ag.alerts[fp] == rev {
|
||||
if ag.alerts[fp] == a {
|
||||
delete(alerts, fp)
|
||||
}
|
||||
ag.mtx.Unlock()
|
||||
}
|
||||
wg.Done()
|
||||
}(fp)
|
||||
}(fp, a)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
|
|
@ -1,13 +0,0 @@
|
|||
package manager
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/prometheus/common/mode"
|
||||
)
|
||||
|
||||
func TestAggrGroupInsert(t *testing.T) {
|
||||
ag := newAggrGroup(nil, model.LabelSet{
|
||||
model.AlertNameLabel: "test",
|
||||
}, opts)
|
||||
}
|
7
main.go
7
main.go
|
@ -20,6 +20,7 @@ import (
|
|||
"github.com/prometheus/common/route"
|
||||
"github.com/prometheus/log"
|
||||
|
||||
"github.com/prometheus/alertmanager/config"
|
||||
"github.com/prometheus/alertmanager/provider"
|
||||
)
|
||||
|
||||
|
@ -28,7 +29,7 @@ var (
|
|||
)
|
||||
|
||||
func main() {
|
||||
conf, err := manager.LoadFile(*configFile)
|
||||
conf, err := config.LoadFile(*configFile)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
@ -36,11 +37,13 @@ func main() {
|
|||
memAlerts := provider.NewMemAlerts()
|
||||
disp := NewDispatcher(memAlerts)
|
||||
|
||||
defer disp.Stop()
|
||||
|
||||
disp.ApplyConfig(conf)
|
||||
|
||||
router := route.New()
|
||||
|
||||
manager.NewAPI(router.WithPrefix("/api"), state)
|
||||
NewAPI(router.WithPrefix("/api"))
|
||||
|
||||
http.ListenAndServe(":9091", router)
|
||||
}
|
||||
|
|
79
notify.go
79
notify.go
|
@ -1,70 +1,45 @@
|
|||
package manager
|
||||
package main
|
||||
|
||||
import (
|
||||
"github.com/prometheus/common/model"
|
||||
"github.com/prometheus/log"
|
||||
"golang.org/x/net/context"
|
||||
|
||||
"github.com/prometheus/alertmanager/types"
|
||||
)
|
||||
|
||||
type Notifier interface {
|
||||
Notify(context.Context, *Alert) error
|
||||
Notify(context.Context, *types.Alert) error
|
||||
}
|
||||
|
||||
type LogNotifier struct {
|
||||
name string
|
||||
}
|
||||
|
||||
func (ln *LogNotifier) Notify(ctx context.Context, a *Alert) error {
|
||||
func (ln *LogNotifier) Notify(ctx context.Context, a *types.Alert) error {
|
||||
log.Infof("notify %q", ln.name)
|
||||
|
||||
for _, a := range alerts {
|
||||
log.Infof(" - %v", a)
|
||||
}
|
||||
// for _, a := range alerts {
|
||||
log.Infof(" - %v", a)
|
||||
// }
|
||||
return nil
|
||||
}
|
||||
|
||||
// routedNotifier forwards alerts to notifiers matching the alert in
|
||||
// a routing tree.
|
||||
type routedNotifier struct {
|
||||
notifiers map[string]Notifier
|
||||
}
|
||||
|
||||
func (n *routedNotifier) Notify(alert *Alert) error {
|
||||
|
||||
}
|
||||
|
||||
// A Silencer determines whether a given label set is muted.
|
||||
type Silencer interface {
|
||||
Mutes(model.LabelSet) bool
|
||||
}
|
||||
|
||||
// A Silence determines whether a given label set is muted
|
||||
// at the current time.
|
||||
type Silence struct {
|
||||
ID model.Fingerprint
|
||||
|
||||
// A set of matchers determining if an alert is
|
||||
Matchers Matchers
|
||||
// Name/email of the silence creator.
|
||||
CreatedBy string
|
||||
// When the silence was first created (Unix timestamp).
|
||||
CreatedAt, EndsAt time.Time
|
||||
|
||||
// Additional comment about the silence.
|
||||
Comment string
|
||||
|
||||
// timeFunc provides the time against which to evaluate
|
||||
// the silence.
|
||||
timeFunc func() time.Time
|
||||
}
|
||||
|
||||
func (sil *Silence) Mutes(lset model.LabelSet) bool {
|
||||
t := sil.timeFunc()
|
||||
|
||||
if t.Before(sil.CreatedAt) || t.After(sil.EndsAt) {
|
||||
return false
|
||||
}
|
||||
|
||||
return sil.Matchers.Match(lset)
|
||||
// An InhibitRule specifies that a class of (source) alerts should inhibit
|
||||
// notifications for another class of (target) alerts if all specified matching
|
||||
// labels are equal between the two alerts. This may be used to inhibit alerts
|
||||
// from sending notifications if their meaning is logically a subset of a
|
||||
// higher-level alert.
|
||||
type InhibitRule struct {
|
||||
// The set of Filters which define the group of source alerts (which inhibit
|
||||
// the target alerts).
|
||||
SourceMatchers types.Matchers
|
||||
// The set of Filters which define the group of target alerts (which are
|
||||
// inhibited by the source alerts).
|
||||
TargetMatchers types.Matchers
|
||||
// A set of label names whose label values need to be identical in source and
|
||||
// target alerts in order for the inhibition to take effect.
|
||||
Equal model.LabelNames
|
||||
}
|
||||
|
||||
// silencedNotifier wraps a notifier and applies a Silencer
|
||||
|
@ -72,10 +47,10 @@ func (sil *Silence) Mutes(lset model.LabelSet) bool {
|
|||
type silencedNotifier struct {
|
||||
Notifier
|
||||
|
||||
silencer Silencer
|
||||
silencer types.Silencer
|
||||
}
|
||||
|
||||
func (n *silencedNotifier) Notify(alert *Alert) error {
|
||||
func (n *silencedNotifier) Notify(ctx context.Context, alert *types.Alert) error {
|
||||
// TODO(fabxc): increment total alerts counter.
|
||||
// Do not send the alert if the silencer mutes it.
|
||||
if n.silencer.Mutes(alert.Labels) {
|
||||
|
@ -83,7 +58,7 @@ func (n *silencedNotifier) Notify(alert *Alert) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
return n.Notifier.Send(alert)
|
||||
return n.Notifier.Notify(ctx, alert)
|
||||
}
|
||||
|
||||
type Inhibitor interface {
|
||||
|
|
|
@ -33,13 +33,13 @@ type MemAlerts struct {
|
|||
listeners []chan *types.Alert
|
||||
}
|
||||
|
||||
func NewMemAlert() *MemAlerts {
|
||||
func NewMemAlerts() *MemAlerts {
|
||||
return &MemAlerts{
|
||||
alerts: map[model.Fingerprint]*types.Alert,
|
||||
alerts: map[model.Fingerprint]*types.Alert{},
|
||||
}
|
||||
}
|
||||
|
||||
func (a *MemAlerts) IterActive() <-chan *Alert {
|
||||
func (a *MemAlerts) IterActive() <-chan *types.Alert {
|
||||
a.mtx.Lock()
|
||||
defer a.mtx.Unlock()
|
||||
|
||||
|
@ -54,7 +54,7 @@ func (a *MemAlerts) IterActive() <-chan *Alert {
|
|||
return ch
|
||||
}
|
||||
|
||||
func (a *MemAlerts) Put(alert *Alert) error {
|
||||
func (a *MemAlerts) Put(alert *types.Alert) error {
|
||||
a.mtx.RLock()
|
||||
defer a.mtx.RUnlock()
|
||||
|
||||
|
|
|
@ -11,7 +11,7 @@
|
|||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package manager
|
||||
package provider
|
||||
|
||||
import (
|
||||
"github.com/prometheus/common/model"
|
||||
|
@ -37,16 +37,16 @@ type Silences interface {
|
|||
// The Silences provider must implement the Silencer interface
|
||||
// for all its silences. The data provider may have access to an
|
||||
// optimized view of the data to perform this evaluation.
|
||||
Silencer
|
||||
types.Silencer
|
||||
|
||||
// All returns all existing silences.
|
||||
All() []*Silence
|
||||
All() []*types.Silence
|
||||
// Set a new silence.
|
||||
Set(*Silence) error
|
||||
Set(*types.Silence) error
|
||||
// Del removes a silence.
|
||||
Del(model.Fingerprint) error
|
||||
// Get a silence associated with a fingerprint.
|
||||
Get(model.Fingerprint) (*Silence, error)
|
||||
Get(model.Fingerprint) (*types.Silence, error)
|
||||
}
|
||||
|
||||
// Reloadable is a component that can change its state based
|
||||
|
|
129
route.go
129
route.go
|
@ -1,15 +1,18 @@
|
|||
package manager
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/common/model"
|
||||
|
||||
"github.com/prometheus/alertmanager/config"
|
||||
"github.com/prometheus/alertmanager/types"
|
||||
)
|
||||
|
||||
var DefaultRouteOpts = RouteOpts{
|
||||
GroupWait: 10 * time.Second,
|
||||
GroupInterval: 10 * time.Second,
|
||||
GroupWait: 10 * time.Second,
|
||||
RepeatInterval: 10 * time.Second,
|
||||
}
|
||||
|
||||
type Routes []*Route
|
||||
|
@ -29,7 +32,7 @@ type Route struct {
|
|||
|
||||
// Equality or regex matchers an alert has to fulfill to match
|
||||
// this route.
|
||||
Matchers Matchers
|
||||
Matchers types.Matchers
|
||||
|
||||
// If true, an alert matches further routes on the same level.
|
||||
Continue bool
|
||||
|
@ -38,6 +41,55 @@ type Route struct {
|
|||
Routes Routes
|
||||
}
|
||||
|
||||
func NewRoute(cr *config.Route) *Route {
|
||||
groupBy := map[model.LabelName]struct{}{}
|
||||
for _, ln := range cr.GroupBy {
|
||||
groupBy[ln] = struct{}{}
|
||||
}
|
||||
|
||||
opts := RouteOpts{
|
||||
SendTo: cr.SendTo,
|
||||
GroupBy: groupBy,
|
||||
hasWait: cr.GroupWait != nil,
|
||||
hasInterval: cr.RepeatInterval != nil,
|
||||
}
|
||||
if opts.hasWait {
|
||||
opts.GroupWait = time.Duration(*cr.GroupWait)
|
||||
}
|
||||
if opts.hasInterval {
|
||||
opts.RepeatInterval = time.Duration(*cr.RepeatInterval)
|
||||
}
|
||||
|
||||
var matchers types.Matchers
|
||||
|
||||
for ln, lv := range cr.Match {
|
||||
matchers = append(matchers, types.NewMatcher(model.LabelName(ln), lv))
|
||||
}
|
||||
for ln, lv := range cr.MatchRE {
|
||||
m, err := types.NewRegexMatcher(model.LabelName(ln), lv.String())
|
||||
if err != nil {
|
||||
// Must have been sanitized during config validation.
|
||||
panic(err)
|
||||
}
|
||||
matchers = append(matchers, m)
|
||||
}
|
||||
|
||||
return &Route{
|
||||
RouteOpts: opts,
|
||||
Matchers: matchers,
|
||||
Continue: cr.Continue,
|
||||
Routes: NewRoutes(cr.Routes),
|
||||
}
|
||||
}
|
||||
|
||||
func NewRoutes(croutes []*config.Route) Routes {
|
||||
res := Routes{}
|
||||
for _, cr := range croutes {
|
||||
res = append(res, NewRoute(cr))
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
||||
// Match does a depth-first left-to-right search through the route tree
|
||||
// and returns the flattened configuration for the reached node.
|
||||
func (r *Route) Match(lset model.LabelSet) []*RouteOpts {
|
||||
|
@ -68,73 +120,6 @@ func (r *Route) Match(lset model.LabelSet) []*RouteOpts {
|
|||
return all
|
||||
}
|
||||
|
||||
// UnmarshalYAML implements the yaml.Unmarshaler interface.
|
||||
func (r *Route) UnmarshalYAML(unmarshal func(interface{}) error) error {
|
||||
type route struct {
|
||||
SendTo string `yaml:"send_to,omitempty"`
|
||||
GroupBy []model.LabelName `yaml:"group_by,omitempty"`
|
||||
GroupWait *model.Duration `yaml:"group_wait,omitempty"`
|
||||
RepeatInterval *model.Duration `yaml:"repeat_interval,omitempty"`
|
||||
|
||||
Match map[string]string `yaml:"match,omitempty"`
|
||||
MatchRE map[string]string `yaml:"match_re,omitempty"`
|
||||
Continue bool `yaml:"continue,omitempty"`
|
||||
Routes []*Route `yaml:"routes,omitempty"`
|
||||
|
||||
// Catches all undefined fields and must be empty after parsing.
|
||||
XXX map[string]interface{} `yaml:",inline"`
|
||||
}
|
||||
var v route
|
||||
if err := unmarshal(&v); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for k, val := range v.Match {
|
||||
if !model.LabelNameRE.MatchString(k) {
|
||||
fmt.Errorf("invalid label name %q", k)
|
||||
}
|
||||
ln := model.LabelName(k)
|
||||
r.Matchers = append(r.Matchers, NewMatcher(ln, val))
|
||||
}
|
||||
|
||||
for k, val := range v.MatchRE {
|
||||
if !model.LabelNameRE.MatchString(k) {
|
||||
fmt.Errorf("invalid label name %q", k)
|
||||
}
|
||||
ln := model.LabelName(k)
|
||||
|
||||
m, err := NewRegexMatcher(ln, val)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
r.Matchers = append(r.Matchers, m)
|
||||
}
|
||||
|
||||
r.RouteOpts.GroupBy = make(map[model.LabelName]struct{}, len(v.GroupBy))
|
||||
|
||||
for _, ln := range v.GroupBy {
|
||||
if _, ok := r.RouteOpts.GroupBy[ln]; ok {
|
||||
return fmt.Errorf("duplicated label %q in group_by", ln)
|
||||
}
|
||||
r.RouteOpts.GroupBy[ln] = struct{}{}
|
||||
}
|
||||
|
||||
if v.GroupWait != nil {
|
||||
r.RouteOpts.GroupWait = time.Duration(*v.GroupWait)
|
||||
r.RouteOpts.hasWait = true
|
||||
}
|
||||
if v.RepeatInterval != nil {
|
||||
r.RouteOpts.RepeatInterval = time.Duration(*v.RepeatInterval)
|
||||
r.RouteOpts.hasInterval = true
|
||||
}
|
||||
r.RouteOpts.SendTo = v.SendTo
|
||||
|
||||
r.Continue = v.Continue
|
||||
r.Routes = v.Routes
|
||||
|
||||
return checkOverflow(v.XXX, "route")
|
||||
}
|
||||
|
||||
type RouteOpts struct {
|
||||
// The identifier of the associated notification configuration
|
||||
SendTo string
|
||||
|
@ -155,7 +140,7 @@ func (ro *RouteOpts) String() string {
|
|||
for ln := range ro.GroupBy {
|
||||
labels = append(labels, ln)
|
||||
}
|
||||
return fmt.Sprintf("<RouteOpts send_to:%q group_by:%q group_wait:%q>", ro.SendTo, labels, ro.GroupWait)
|
||||
return fmt.Sprintf("<RouteOpts send_to:%q group_by:%q group_wait:%q %q %q>", ro.SendTo, labels, ro.GroupWait)
|
||||
}
|
||||
|
||||
func (ro *RouteOpts) populateDefault(parent *RouteOpts) {
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
package manager
|
||||
package main
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
|
@ -7,6 +7,8 @@ import (
|
|||
|
||||
"github.com/prometheus/common/model"
|
||||
"gopkg.in/yaml.v2"
|
||||
|
||||
"github.com/prometheus/alertmanager/config"
|
||||
)
|
||||
|
||||
func TestRouteMatch(t *testing.T) {
|
||||
|
@ -49,11 +51,13 @@ routes:
|
|||
send_to: 'notify-BC'
|
||||
`
|
||||
|
||||
var tree Route
|
||||
if err := yaml.Unmarshal([]byte(in), &tree); err != nil {
|
||||
var ctree config.Route
|
||||
if err := yaml.Unmarshal([]byte(in), &ctree); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
tree := NewRoute(&ctree)
|
||||
|
||||
lset := func(labels ...string) map[model.LabelName]struct{} {
|
||||
s := map[model.LabelName]struct{}{}
|
||||
for _, ls := range labels {
|
||||
|
@ -62,8 +66,6 @@ routes:
|
|||
return s
|
||||
}
|
||||
|
||||
gwait := func(d time.Duration) *time.Duration { return &d }
|
||||
|
||||
tests := []struct {
|
||||
input model.LabelSet
|
||||
result []*RouteOpts
|
||||
|
@ -99,7 +101,8 @@ routes:
|
|||
{
|
||||
SendTo: "notify-BC",
|
||||
GroupBy: lset("foo", "bar"),
|
||||
groupWait: gwait(2 * time.Minute),
|
||||
GroupWait: 2 * time.Minute,
|
||||
hasWait: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
|
@ -124,12 +127,14 @@ routes:
|
|||
{
|
||||
SendTo: "notify-productionA",
|
||||
GroupBy: lset(),
|
||||
groupWait: gwait(1 * time.Minute),
|
||||
GroupWait: 1 * time.Minute,
|
||||
hasWait: true,
|
||||
},
|
||||
{
|
||||
SendTo: "notify-productionB",
|
||||
GroupBy: lset("job"),
|
||||
groupWait: gwait(10 * time.Minute),
|
||||
GroupWait: 10 * time.Minute,
|
||||
hasWait: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
|
@ -139,7 +144,7 @@ routes:
|
|||
matches := tree.Match(test.input)
|
||||
|
||||
if !reflect.DeepEqual(matches, test.result) {
|
||||
t.Errorf("expected:\n%v\n\ngot:\n%v", test.result, matches)
|
||||
t.Errorf("\nexpected:\n%v\ngot:\n%v", test.result, matches)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -15,7 +15,7 @@
|
|||
|
||||
set -e
|
||||
|
||||
repo_path="github.com/prometheus/prometheus"
|
||||
repo_path="github.com/prometheus/alertmanager"
|
||||
|
||||
version=$( cat version/VERSION )
|
||||
revision=$( git rev-parse --short HEAD 2> /dev/null || echo 'unknown' )
|
||||
|
@ -38,10 +38,7 @@ ldflags="
|
|||
|
||||
export GO15VENDOREXPERIMENT="1"
|
||||
|
||||
echo " > prometheus"
|
||||
go build -ldflags "${ldflags}" -o prometheus${ext} ${repo_path}/cmd/prometheus
|
||||
|
||||
echo " > promtool"
|
||||
go build -ldflags "${ldflags}" -o promtool${ext} ${repo_path}/cmd/promtool
|
||||
echo " > alertmanager"
|
||||
go build -ldflags "${ldflags}" -o alertmanager${ext} ${repo_path}
|
||||
|
||||
exit 0
|
||||
|
|
|
@ -11,7 +11,7 @@
|
|||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package manager
|
||||
package types
|
||||
|
||||
import (
|
||||
"fmt"
|
|
@ -1,5 +1,12 @@
|
|||
package types
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/common/model"
|
||||
)
|
||||
|
||||
type Alert struct {
|
||||
// Label value pairs for purpose of aggregation, matching, and disposition
|
||||
// dispatching. This must minimally include an "alertname" label.
|
||||
|
@ -51,3 +58,38 @@ type alertTimeline []*Alert
|
|||
func (at alertTimeline) Len() int { return len(at) }
|
||||
func (at alertTimeline) Less(i, j int) bool { return at[i].Timestamp.Before(at[j].Timestamp) }
|
||||
func (at alertTimeline) Swap(i, j int) { at[i], at[j] = at[j], at[i] }
|
||||
|
||||
// A Silencer determines whether a given label set is muted.
|
||||
type Silencer interface {
|
||||
Mutes(model.LabelSet) bool
|
||||
}
|
||||
|
||||
// A Silence determines whether a given label set is muted
|
||||
// at the current time.
|
||||
type Silence struct {
|
||||
ID model.Fingerprint
|
||||
|
||||
// A set of matchers determining if an alert is
|
||||
Matchers Matchers
|
||||
// Name/email of the silence creator.
|
||||
CreatedBy string
|
||||
// When the silence was first created (Unix timestamp).
|
||||
CreatedAt, EndsAt time.Time
|
||||
|
||||
// Additional comment about the silence.
|
||||
Comment string
|
||||
|
||||
// timeFunc provides the time against which to evaluate
|
||||
// the silence.
|
||||
timeFunc func() time.Time
|
||||
}
|
||||
|
||||
func (sil *Silence) Mutes(lset model.LabelSet) bool {
|
||||
t := sil.timeFunc()
|
||||
|
||||
if t.Before(sil.CreatedAt) || t.After(sil.EndsAt) {
|
||||
return false
|
||||
}
|
||||
|
||||
return sil.Matchers.Match(lset)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue