Simplify initialization
No longer update components based on a new configuration. Generally, destroying and recreating has no performance impact and is less error-prone. This also removes the Reloadable interface and simplifies the entire startup contraption.
This commit is contained in:
parent
9c5468786d
commit
ce74f8363b
42
dispatch.go
42
dispatch.go
|
@ -26,7 +26,6 @@ type Dispatcher struct {
|
|||
|
||||
aggrGroups map[model.Fingerprint]*aggrGroup
|
||||
|
||||
mtx sync.RWMutex
|
||||
done chan struct{}
|
||||
ctx context.Context
|
||||
cancel func()
|
||||
|
@ -35,26 +34,14 @@ type Dispatcher struct {
|
|||
}
|
||||
|
||||
// NewDispatcher returns a new Dispatcher.
|
||||
func NewDispatcher(ap provider.Alerts, n notify.Notifier) *Dispatcher {
|
||||
return &Dispatcher{
|
||||
func NewDispatcher(ap provider.Alerts, r []*config.Route, n notify.Notifier) *Dispatcher {
|
||||
disp := &Dispatcher{
|
||||
alerts: ap,
|
||||
notifier: n,
|
||||
routes: NewRoutes(r, nil),
|
||||
log: log.With("component", "dispatcher"),
|
||||
}
|
||||
}
|
||||
|
||||
// ApplyConfig updates the dispatcher to match the new configuration.
|
||||
func (d *Dispatcher) ApplyConfig(conf *config.Config) {
|
||||
d.mtx.Lock()
|
||||
defer d.mtx.Unlock()
|
||||
|
||||
// If a cancelation function is set, the dispatcher is running.
|
||||
if d.cancel != nil {
|
||||
d.Stop()
|
||||
defer func() { go d.Run() }()
|
||||
}
|
||||
|
||||
d.routes = NewRoutes(conf.Routes, nil)
|
||||
return disp
|
||||
}
|
||||
|
||||
// Run starts dispatching alerts incoming via the updates channel.
|
||||
|
@ -65,10 +52,11 @@ func (d *Dispatcher) Run() {
|
|||
d.ctx, d.cancel = context.WithCancel(context.Background())
|
||||
|
||||
d.run(d.alerts.Subscribe())
|
||||
close(d.done)
|
||||
}
|
||||
|
||||
func (d *Dispatcher) run(it provider.AlertIterator) {
|
||||
cleanup := time.NewTicker(15 * time.Second)
|
||||
cleanup := time.NewTicker(5 * time.Minute)
|
||||
defer cleanup.Stop()
|
||||
|
||||
defer it.Close()
|
||||
|
@ -78,16 +66,13 @@ func (d *Dispatcher) run(it provider.AlertIterator) {
|
|||
case alert := <-it.Next():
|
||||
d.log.With("alert", alert).Debug("Received alert")
|
||||
|
||||
// Log errors but keep trying
|
||||
// Log errors but keep trying.
|
||||
if err := it.Err(); err != nil {
|
||||
log.Errorf("Error on alert update: %s", err)
|
||||
continue
|
||||
}
|
||||
d.mtx.RLock()
|
||||
routes := d.routes.Match(alert.Labels)
|
||||
d.mtx.RUnlock()
|
||||
|
||||
for _, r := range routes {
|
||||
for _, r := range d.routes.Match(alert.Labels) {
|
||||
d.processAlert(alert, r)
|
||||
}
|
||||
|
||||
|
@ -107,6 +92,9 @@ func (d *Dispatcher) run(it provider.AlertIterator) {
|
|||
|
||||
// Stop the dispatcher.
|
||||
func (d *Dispatcher) Stop() {
|
||||
if d == nil || d.cancel == nil {
|
||||
return
|
||||
}
|
||||
d.cancel()
|
||||
d.cancel = nil
|
||||
|
||||
|
@ -140,11 +128,11 @@ func (d *Dispatcher) processAlert(alert *types.Alert, opts *RouteOpts) {
|
|||
ag.log = log.With("aggrGroup", ag)
|
||||
|
||||
go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool {
|
||||
if err := d.notifier.Notify(ctx, alerts...); err != nil {
|
||||
log.Errorf("Notify for %d alerts failed: %s", len(alerts), err)
|
||||
return false
|
||||
err := d.notifier.Notify(ctx, alerts...)
|
||||
if err != nil {
|
||||
log.Errorf("Notify for %d alerts failed: %s %T", len(alerts), err, err)
|
||||
}
|
||||
return true
|
||||
return err != nil
|
||||
})
|
||||
}
|
||||
|
||||
|
|
23
inhibit.go
23
inhibit.go
|
@ -18,10 +18,17 @@ type Inhibitor struct {
|
|||
mtx sync.RWMutex
|
||||
}
|
||||
|
||||
func (ih *Inhibitor) Mutes(lset model.LabelSet) bool {
|
||||
ih.mtx.RLock()
|
||||
defer ih.mtx.RUnlock()
|
||||
func NewInhibitor(ap provider.Alerts, rs []*config.InhibitRule) *Inhibitor {
|
||||
ih := &Inhibitor{
|
||||
alerts: ap,
|
||||
}
|
||||
for _, cr := range rs {
|
||||
ih.rules = append(ih.rules, NewInhibitRule(cr))
|
||||
}
|
||||
return ih
|
||||
}
|
||||
|
||||
func (ih *Inhibitor) Mutes(lset model.LabelSet) bool {
|
||||
alerts := ih.alerts.GetPending()
|
||||
defer alerts.Close()
|
||||
|
||||
|
@ -45,16 +52,6 @@ func (ih *Inhibitor) Mutes(lset model.LabelSet) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
func (ih *Inhibitor) ApplyConfig(conf *config.Config) {
|
||||
ih.mtx.Lock()
|
||||
defer ih.mtx.Unlock()
|
||||
|
||||
ih.rules = []*InhibitRule{}
|
||||
for _, cr := range conf.InhibitRules {
|
||||
ih.rules = append(ih.rules, NewInhibitRule(cr))
|
||||
}
|
||||
}
|
||||
|
||||
// An InhibitRule specifies that a class of (source) alerts should inhibit
|
||||
// notifications for another class of (target) alerts if all specified matching
|
||||
// labels are equal between the two alerts. This may be used to inhibit alerts
|
||||
|
|
182
main.go
182
main.go
|
@ -16,7 +16,6 @@ package main
|
|||
import (
|
||||
"database/sql"
|
||||
"flag"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
|
@ -25,13 +24,11 @@ import (
|
|||
|
||||
"github.com/prometheus/common/log"
|
||||
"github.com/prometheus/common/route"
|
||||
"golang.org/x/net/context"
|
||||
|
||||
"github.com/prometheus/alertmanager/config"
|
||||
"github.com/prometheus/alertmanager/notify"
|
||||
"github.com/prometheus/alertmanager/provider"
|
||||
"github.com/prometheus/alertmanager/template"
|
||||
"github.com/prometheus/alertmanager/types"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -49,8 +46,6 @@ func main() {
|
|||
}
|
||||
defer db.Close()
|
||||
|
||||
tmpl := &template.Template{}
|
||||
|
||||
alerts, err := provider.NewSQLAlerts(db)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
|
@ -64,103 +59,73 @@ func main() {
|
|||
log.Fatal(err)
|
||||
}
|
||||
|
||||
inhibitor := &Inhibitor{alerts: alerts}
|
||||
|
||||
routedNotifier := ¬ify.RoutedNotifier{}
|
||||
|
||||
// Connect the pipeline of notifiers. Notifications will be sent
|
||||
// through them in inverted order.
|
||||
var notifier notify.Notifier
|
||||
notifier = ¬ify.LogNotifier{
|
||||
Log: log.With("notifier", "routed"),
|
||||
Notifier: routedNotifier,
|
||||
}
|
||||
|
||||
notifier = notify.NewDedupingNotifier(notifies, notifier)
|
||||
notifier = ¬ify.LogNotifier{
|
||||
Log: log.With("notifier", "dedup"),
|
||||
Notifier: notifier,
|
||||
}
|
||||
|
||||
notifier = ¬ify.MutingNotifier{
|
||||
Notifier: notifier,
|
||||
Muter: inhibitor,
|
||||
}
|
||||
notifier = ¬ify.LogNotifier{
|
||||
Log: log.With("notifier", "inhibit"),
|
||||
Notifier: notifier,
|
||||
}
|
||||
|
||||
notifier = ¬ify.MutingNotifier{
|
||||
Notifier: notifier,
|
||||
Muter: silences,
|
||||
}
|
||||
notifier = ¬ify.LogNotifier{
|
||||
Log: log.With("notifier", "silencer"),
|
||||
Notifier: notifier,
|
||||
}
|
||||
|
||||
build := func(conf *config.Config) {
|
||||
|
||||
res := map[string]notify.Notifier{}
|
||||
|
||||
for _, nc := range conf.NotificationConfigs {
|
||||
var all notify.Notifiers
|
||||
|
||||
for _, wc := range nc.WebhookConfigs {
|
||||
all = append(all, ¬ify.LogNotifier{
|
||||
Log: log.With("notifier", "webhook"),
|
||||
Notifier: notify.NewWebhook(wc),
|
||||
})
|
||||
}
|
||||
for _, ec := range nc.EmailConfigs {
|
||||
all = append(all, ¬ify.LogNotifier{
|
||||
Log: log.With("notifier", "email"),
|
||||
Notifier: notify.NewEmail(ec, tmpl),
|
||||
})
|
||||
}
|
||||
|
||||
for i, nv := range all {
|
||||
n := nv
|
||||
|
||||
n = ¬ify.RetryNotifier{Notifier: n}
|
||||
n = notify.NewDedupingNotifier(notifies, n)
|
||||
nn := notify.NotifyFunc(func(ctx context.Context, alerts ...*types.Alert) error {
|
||||
|
||||
dest, ok := notify.Destination(ctx)
|
||||
if !ok {
|
||||
return fmt.Errorf("missing destination name")
|
||||
}
|
||||
dest = fmt.Sprintf("%s/%s/%d", dest, nc.Name, i)
|
||||
|
||||
log.Debugln("destination new", dest)
|
||||
|
||||
ctx = notify.WithDestination(ctx, dest)
|
||||
return n.Notify(ctx, alerts...)
|
||||
})
|
||||
|
||||
all[i] = nn
|
||||
}
|
||||
|
||||
res[nc.Name] = all
|
||||
}
|
||||
|
||||
routedNotifier.Lock()
|
||||
routedNotifier.Notifiers = res
|
||||
routedNotifier.Unlock()
|
||||
}
|
||||
|
||||
disp := NewDispatcher(alerts, notifier)
|
||||
|
||||
if err := reloadConfig(*configFile, disp, types.ReloadFunc(build), inhibitor, tmpl); err != nil {
|
||||
log.Fatalf("Couldn't load configuration (-config.file=%s): %v", *configFile, err)
|
||||
}
|
||||
|
||||
go disp.Run()
|
||||
var (
|
||||
inhibitor *Inhibitor
|
||||
tmpl *template.Template
|
||||
disp *Dispatcher
|
||||
)
|
||||
defer disp.Stop()
|
||||
|
||||
router := route.New()
|
||||
build := func(nconf []*config.NotificationConfig) notify.Notifier {
|
||||
var (
|
||||
router = notify.Router{}
|
||||
fanouts = notify.Build(nconf, tmpl)
|
||||
)
|
||||
for name, fo := range fanouts {
|
||||
for i, n := range fo {
|
||||
n = notify.Retry(n)
|
||||
n = notify.Log(n, log.With("step", "retry"))
|
||||
n = notify.Dedup(notifies, n)
|
||||
n = notify.Log(n, log.With("step", "dedup"))
|
||||
|
||||
fo[i] = n
|
||||
}
|
||||
router[name] = fo
|
||||
}
|
||||
var n notify.Notifier = router
|
||||
|
||||
n = notify.Log(n, log.With("step", "route"))
|
||||
n = notify.Mute(silences, n)
|
||||
n = notify.Log(n, log.With("step", "silence"))
|
||||
n = notify.Mute(inhibitor, n)
|
||||
n = notify.Log(n, log.With("step", "inhibit"))
|
||||
|
||||
return n
|
||||
}
|
||||
|
||||
reload := func() (err error) {
|
||||
log.With("file", *configFile).Infof("Loading configuration file")
|
||||
defer func() {
|
||||
if err != nil {
|
||||
log.With("file", *configFile).Errorf("Loading configuration file failed")
|
||||
}
|
||||
}()
|
||||
|
||||
conf, err := config.LoadFile(*configFile)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
tmpl, err = template.FromGlobs(conf.Templates...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
disp.Stop()
|
||||
|
||||
inhibitor = NewInhibitor(alerts, conf.InhibitRules)
|
||||
disp = NewDispatcher(alerts, conf.Routes, build(conf.NotificationConfigs))
|
||||
|
||||
go disp.Run()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := reload(); err != nil {
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
router := route.New()
|
||||
NewAPI(router.WithPrefix("/api/v1"), alerts, silences)
|
||||
|
||||
go http.ListenAndServe(*listenAddress, router)
|
||||
|
@ -174,28 +139,11 @@ func main() {
|
|||
|
||||
go func() {
|
||||
for range hup {
|
||||
if err := reloadConfig(*configFile, disp, types.ReloadFunc(build), inhibitor, tmpl); err != nil {
|
||||
log.Errorf("Couldn't load configuration (-config.file=%s): %v", *configFile, err)
|
||||
}
|
||||
reload()
|
||||
}
|
||||
}()
|
||||
|
||||
<-term
|
||||
|
||||
log.Infoln("Received SIGTERM, exiting gracefully...")
|
||||
os.Exit(0)
|
||||
}
|
||||
|
||||
func reloadConfig(filename string, rls ...types.Reloadable) error {
|
||||
log.Infof("Loading configuration file %s", filename)
|
||||
|
||||
conf, err := config.LoadFile(filename)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, rl := range rls {
|
||||
rl.ApplyConfig(conf)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -68,8 +68,3 @@ type Notifies interface {
|
|||
// Set several notifies at once. All or none must succeed.
|
||||
Set(ns ...*types.Notify) error
|
||||
}
|
||||
|
||||
type Config interface {
|
||||
// Reload initiates a configuration reload.
|
||||
Reload(...types.Reloadable) error
|
||||
}
|
||||
|
|
|
@ -1,14 +1,12 @@
|
|||
package template
|
||||
|
||||
import (
|
||||
text_tmpl "html/template"
|
||||
html_tmpl "text/template"
|
||||
|
||||
"bytes"
|
||||
"io"
|
||||
"strings"
|
||||
|
||||
"github.com/prometheus/alertmanager/config"
|
||||
text_tmpl "html/template"
|
||||
html_tmpl "text/template"
|
||||
)
|
||||
|
||||
type Template struct {
|
||||
|
@ -16,6 +14,26 @@ type Template struct {
|
|||
html *html_tmpl.Template
|
||||
}
|
||||
|
||||
func FromGlobs(paths ...string) (*Template, error) {
|
||||
t := &Template{
|
||||
text: text_tmpl.New(""),
|
||||
html: html_tmpl.New(""),
|
||||
}
|
||||
var err error
|
||||
|
||||
for _, tp := range paths {
|
||||
if t.text, err = t.text.ParseGlob(tp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if t.html, err = t.html.ParseGlob(tp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
t.funcs(DefaultFuncs)
|
||||
return t, nil
|
||||
}
|
||||
|
||||
type FuncMap map[string]interface{}
|
||||
|
||||
var DefaultFuncs = FuncMap{
|
||||
|
@ -48,20 +66,3 @@ func (t *Template) ExecuteHTMLString(name string, data interface{}) (string, err
|
|||
func (t *Template) ExecuteHTML(w io.Writer, name string, data interface{}) error {
|
||||
return t.html.ExecuteTemplate(w, name, data)
|
||||
}
|
||||
|
||||
func (t *Template) ApplyConfig(conf *config.Config) {
|
||||
var (
|
||||
tt = text_tmpl.New("")
|
||||
ht = html_tmpl.New("")
|
||||
)
|
||||
|
||||
for _, tf := range conf.Templates {
|
||||
tt = text_tmpl.Must(tt.ParseGlob(tf))
|
||||
ht = html_tmpl.Must(ht.ParseGlob(tf))
|
||||
}
|
||||
|
||||
t.text = tt
|
||||
t.html = ht
|
||||
|
||||
t.funcs(DefaultFuncs)
|
||||
}
|
||||
|
|
|
@ -7,8 +7,6 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/prometheus/common/model"
|
||||
|
||||
"github.com/prometheus/alertmanager/config"
|
||||
)
|
||||
|
||||
type MultiError []error
|
||||
|
@ -21,18 +19,6 @@ func (e MultiError) Error() string {
|
|||
return strings.Join(es, "; ")
|
||||
}
|
||||
|
||||
// Reloadable is a component that can change its state based
|
||||
// on a new configuration.
|
||||
type Reloadable interface {
|
||||
ApplyConfig(*config.Config)
|
||||
}
|
||||
|
||||
type ReloadFunc func(*config.Config)
|
||||
|
||||
func (f ReloadFunc) ApplyConfig(cfg *config.Config) {
|
||||
f(cfg)
|
||||
}
|
||||
|
||||
// Alert wraps a model.Alert with additional information relevant
|
||||
// to internal of the Alertmanager.
|
||||
// The type is never exposed to external communication and the
|
||||
|
|
Loading…
Reference in New Issue