diff --git a/cmd/prometheus/config.go b/cmd/prometheus/config.go
index 879210827..e8302a03a 100644
--- a/cmd/prometheus/config.go
+++ b/cmd/prometheus/config.go
@@ -47,6 +47,7 @@ var cfg = struct {
 	storage            local.MemorySeriesStorageOptions
 	localStorageEngine string
 	notifier           notifier.Options
+	notifierTimeout    time.Duration
 	queryEngine        promql.EngineOptions
 	web                web.Options
 	remote             remote.Options
@@ -228,7 +229,7 @@ func init() {
 		"The capacity of the queue for pending alert manager notifications.",
 	)
 	cfg.fs.DurationVar(
-		&cfg.notifier.Timeout, "alertmanager.timeout", 10*time.Second,
+		&cfg.notifierTimeout, "alertmanager.timeout", 10*time.Second,
 		"Alert manager HTTP API timeout.",
 	)
 
@@ -283,7 +284,6 @@ func parse(args []string) error {
 		if err := validateAlertmanagerURL(u); err != nil {
 			return err
 		}
-		cfg.notifier.AlertmanagerURLs = cfg.alertmanagerURLs.slice()
 	}
 
 	cfg.remote.InfluxdbPassword = os.Getenv("INFLUXDB_PW")
diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go
index 363ffe44b..07608ee3e 100644
--- a/cmd/prometheus/main.go
+++ b/cmd/prometheus/main.go
@@ -18,6 +18,7 @@ import (
 	"flag"
 	"fmt"
 	_ "net/http/pprof" // Comment this line to disable pprof endpoint.
+	"net/url"
 	"os"
 	"os/signal"
 	"syscall"
@@ -25,6 +26,7 @@ import (
 
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/common/log"
+	"github.com/prometheus/common/model"
 	"github.com/prometheus/common/version"
 	"golang.org/x/net/context"
 
@@ -259,6 +261,31 @@ func reloadConfig(filename string, rls ...Reloadable) (err error) {
 		return fmt.Errorf("couldn't load configuration (-config.file=%s): %v", filename, err)
 	}
 
+	// Add AlertmanagerConfigs for legacy Alertmanager URL flags.
+	for us := range cfg.alertmanagerURLs {
+		u, err := url.Parse(us)
+		if err != nil {
+			return err
+		}
+		acfg := &config.AlertmanagersConfig{
+			Scheme:     u.Scheme,
+			PathPrefix: u.Path,
+			Timeout:    cfg.notifierTimeout,
+			ServiceDiscoveryConfig: config.ServiceDiscoveryConfig{
+				StaticConfigs: []*config.TargetGroup{
+					{
+						Targets: []model.LabelSet{
+							{
+								model.AddressLabel: model.LabelValue(u.Host),
+							},
+						},
+					},
+				},
+			},
+		}
+		conf.AlertingConfig.AlertmanagersConfigs = append(conf.AlertingConfig.AlertmanagersConfigs, acfg)
+	}
+
 	failed := false
 	for _, rl := range rls {
 		if err := rl.ApplyConfig(conf); err != nil {
diff --git a/notifier/notifier.go b/notifier/notifier.go
index 8f37cc8a5..d34072fd7 100644
--- a/notifier/notifier.go
+++ b/notifier/notifier.go
@@ -17,7 +17,10 @@ import (
 	"bytes"
 	"encoding/json"
 	"fmt"
+	"net"
 	"net/http"
+	"net/url"
+	"path"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -30,7 +33,9 @@ import (
 	"golang.org/x/net/context/ctxhttp"
 
 	"github.com/prometheus/prometheus/config"
+	"github.com/prometheus/prometheus/discovery"
 	"github.com/prometheus/prometheus/relabel"
+	"github.com/prometheus/prometheus/retrieval"
 )
 
 const (
@@ -62,15 +67,16 @@ type Notifier struct {
 	dropped       prometheus.Counter
 	queueLength   prometheus.Gauge
 	queueCapacity prometheus.Metric
+
+	alertmanagers   []*alertmanagerSet
+	cancelDiscovery func()
 }
 
 // Options are the configurable parameters of a Handler.
 type Options struct {
-	AlertmanagerURLs []string
-	QueueCapacity    int
-	Timeout          time.Duration
-	ExternalLabels   model.LabelSet
-	RelabelConfigs   []*config.RelabelConfig
+	QueueCapacity  int
+	ExternalLabels model.LabelSet
+	RelabelConfigs []*config.RelabelConfig
 }
 
 // New constructs a new Notifier.
@@ -139,6 +145,31 @@ func (n *Notifier) ApplyConfig(conf *config.Config) error {
 
 	n.opts.ExternalLabels = conf.GlobalConfig.ExternalLabels
 	n.opts.RelabelConfigs = conf.AlertingConfig.AlertRelabelConfigs
+
+	amSets := []*alertmanagerSet{}
+	ctx, cancel := context.WithCancel(n.ctx)
+
+	for _, cfg := range conf.AlertingConfig.AlertmanagersConfigs {
+		ams, err := newAlertmanagerSet(cfg)
+		if err != nil {
+			return err
+		}
+		amSets = append(amSets, ams)
+	}
+
+	// After all sets were created successfully, start them and cancel the
+	// old ones.
+	for _, ams := range amSets {
+		go ams.ts.Run(ctx)
+		ams.ts.UpdateProviders(discovery.ProvidersFromConfig(ams.cfg.ServiceDiscoveryConfig))
+	}
+	if n.cancelDiscovery != nil {
+		n.cancelDiscovery()
+	}
+
+	n.cancelDiscovery = cancel
+	n.alertmanagers = amSets
+
 	return nil
 }
 
@@ -170,13 +201,6 @@ func (n *Notifier) nextBatch() []*model.Alert {
 
 // Run dispatches notifications continuously.
 func (n *Notifier) Run() {
-	numAMs := len(n.opts.AlertmanagerURLs)
-	// Just warn once in the beginning to prevent noisy logs.
-	if numAMs == 0 {
-		log.Warnf("No AlertManagers configured, not dispatching any alerts")
-		return
-	}
-
 	for {
 		select {
 		case <-n.ctx.Done():
@@ -185,17 +209,7 @@ func (n *Notifier) Run() {
 		}
 		alerts := n.nextBatch()
 
-		if numAMs > 0 {
-
-			if len(alerts) > 0 {
-				numErrors := n.sendAll(alerts...)
-				// Increment the dropped counter if we could not send
-				// successfully to a single AlertManager.
-				if numErrors == numAMs {
-					n.dropped.Add(float64(len(alerts)))
-				}
-			}
-		} else {
+		if !n.sendAll(alerts...) {
 			n.dropped.Add(float64(len(alerts)))
 		}
 		// If the queue still has items left, kick off the next iteration.
@@ -267,58 +281,54 @@ func (n *Notifier) setMore() {
 	}
 }
 
-func postURL(u string) string {
-	return strings.TrimRight(u, "/") + alertPushEndpoint
-}
-
 // sendAll sends the alerts to all configured Alertmanagers at concurrently.
-// It returns the number of sends that have failed.
-func (n *Notifier) sendAll(alerts ...*model.Alert) int {
+// It returns true if the alerts could be sent successfully to at least one Alertmanager.
+func (n *Notifier) sendAll(alerts ...*model.Alert) bool {
 	begin := time.Now()
 
 	b, err := json.Marshal(alerts)
 	if err != nil {
 		log.Errorf("Encoding alerts failed: %s", err)
-		return len(n.opts.AlertmanagerURLs)
+		return false
 	}
 
-	ctx, _ := context.WithTimeout(context.Background(), n.opts.Timeout)
-	send := func(u string) error {
-		resp, err := ctxhttp.Post(ctx, http.DefaultClient, postURL(u), contentTypeJSON, bytes.NewReader(b))
-		if err != nil {
-			return err
-		}
-		defer resp.Body.Close()
-
-		// Any HTTP status 2xx is OK.
-		if resp.StatusCode/100 != 2 {
-			return fmt.Errorf("bad response status %v", resp.Status)
-		}
-		return err
-	}
+	n.mtx.RLock()
+	amSets := n.alertmanagers
+	n.mtx.RUnlock()
 
 	var (
-		wg        sync.WaitGroup
-		numErrors uint64
+		wg         sync.WaitGroup
+		numSuccess uint64
 	)
-	for _, u := range n.opts.AlertmanagerURLs {
-		wg.Add(1)
+	for _, ams := range amSets {
+		ams.mtx.RLock()
 
-		go func(u string) {
-			if err := send(u); err != nil {
-				log.With("alertmanager", u).With("count", fmt.Sprintf("%d", len(alerts))).Errorf("Error sending alerts: %s", err)
-				n.errors.WithLabelValues(u).Inc()
-				atomic.AddUint64(&numErrors, 1)
-			}
-			n.latency.WithLabelValues(u).Observe(time.Since(begin).Seconds())
-			n.sent.WithLabelValues(u).Add(float64(len(alerts)))
+		for _, am := range ams.ams {
+			wg.Add(1)
 
-			wg.Done()
-		}(u)
+			ctx, cancel := context.WithTimeout(n.ctx, ams.cfg.Timeout)
+			defer cancel()
+
+			go func(am alertmanager) {
+				u := am.url()
+
+				if err := am.send(ctx, ams.client, b); err != nil {
+					log.With("alertmanager", u).With("count", len(alerts)).Errorf("Error sending alerts: %s", err)
+					n.errors.WithLabelValues(u).Inc()
+				} else {
+					atomic.AddUint64(&numSuccess, 1)
+				}
+				n.latency.WithLabelValues(u).Observe(time.Since(begin).Seconds())
+				n.sent.WithLabelValues(u).Add(float64(len(alerts)))
+
+				wg.Done()
+			}(am)
+		}
+		ams.mtx.RUnlock()
 	}
 	wg.Wait()
 
-	return int(numErrors)
+	return numSuccess > 0
 }
 
 // Stop shuts down the notification handler.
@@ -350,3 +360,161 @@ func (n *Notifier) Collect(ch chan<- prometheus.Metric) {
 	ch <- n.queueLength
 	ch <- n.queueCapacity
 }
+
+// alertmanager holds all necessary information to send alerts
+// to an Alertmanager endpoint.
+type alertmanager struct {
+	plainURL string // test injection hook
+	labels   model.LabelSet
+}
+
+const pathLabel = "__alerts_path__"
+
+func (a alertmanager) url() string {
+	if a.plainURL != "" {
+		return a.plainURL
+	}
+	u := &url.URL{
+		Scheme: string(a.labels[model.SchemeLabel]),
+		Host:   string(a.labels[model.AddressLabel]),
+		Path:   string(a.labels[pathLabel]),
+	}
+	return u.String()
+}
+
+func (a alertmanager) send(ctx context.Context, c *http.Client, b []byte) error {
+	resp, err := ctxhttp.Post(ctx, c, a.url(), contentTypeJSON, bytes.NewReader(b))
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+
+	// Any HTTP status 2xx is OK.
+	if resp.StatusCode/100 != 2 {
+		return fmt.Errorf("bad response status %v", resp.Status)
+	}
+	return err
+}
+
+// alertmanagerSet contains a set of Alertmanagers discovered via a group of service
+// discovery definitions that have a common configuration on how alerts should be sent.
+type alertmanagerSet struct {
+	ts     *discovery.TargetSet
+	cfg    *config.AlertmanagersConfig
+	client *http.Client
+
+	mtx sync.RWMutex
+	ams []alertmanager
+}
+
+func newAlertmanagerSet(cfg *config.AlertmanagersConfig) (*alertmanagerSet, error) {
+	client, err := retrieval.NewHTTPClient(cfg.HTTPClientConfig)
+	if err != nil {
+		return nil, err
+	}
+	s := &alertmanagerSet{
+		client: client,
+		cfg:    cfg,
+	}
+	s.ts = discovery.NewTargetSet(s)
+
+	return s, nil
+}
+
+// Sync extracts a deduplicated set of Alertmanager endpoints from a list
+// of target group definitions.
+func (s *alertmanagerSet) Sync(tgs []*config.TargetGroup) {
+	all := []alertmanager{}
+
+	for _, tg := range tgs {
+		ams, err := alertmanagerFromGroup(tg, s.cfg)
+		if err != nil {
+			log.With("err", err).Error("generating discovered Alertmanagers failed")
+			continue
+		}
+		all = append(all, ams...)
+	}
+
+	s.mtx.Lock()
+	defer s.mtx.Unlock()
+	// Set new Alertmanagers and deduplicate them along their unique URL.
+	s.ams = []alertmanager{}
+	seen := map[string]struct{}{}
+
+	for _, am := range all {
+		us := am.url()
+		if _, ok := seen[us]; ok {
+			continue
+		}
+
+		seen[us] = struct{}{}
+		s.ams = append(s.ams, am)
+	}
+}
+
+func postPath(pre string) string {
+	return path.Join("/", pre, alertPushEndpoint)
+}
+
+// alertmanagerFromGroup extracts a list of alertmanagers from a target group and an associated
+// AlertmanagersConfig.
+func alertmanagerFromGroup(tg *config.TargetGroup, cfg *config.AlertmanagersConfig) ([]alertmanager, error) {
+	var res []alertmanager
+
+	for _, lset := range tg.Targets {
+		// Set configured scheme as the initial scheme label for overwrite.
+		lset[model.SchemeLabel] = model.LabelValue(cfg.Scheme)
+		lset[pathLabel] = model.LabelValue(postPath(cfg.PathPrefix))
+
+		// Combine target labels with target group labels.
+		for ln, lv := range tg.Labels {
+			if _, ok := lset[ln]; !ok {
+				lset[ln] = lv
+			}
+		}
+		lset := relabel.Process(lset, cfg.RelabelConfigs...)
+		if lset == nil {
+			continue
+		}
+
+		// addPort checks whether we should add a default port to the address.
+		// If the address is not valid, we don't append a port either.
+		addPort := func(s string) bool {
+			// If we can split, a port exists and we don't have to add one.
+			if _, _, err := net.SplitHostPort(s); err == nil {
+				return false
+			}
+			// If adding a port makes it valid, the previous error
+			// was not due to an invalid address and we can append a port.
+			_, _, err := net.SplitHostPort(s + ":1234")
+			return err == nil
+		}
+		// If it's an address with no trailing port, infer it based on the used scheme.
+		if addr := string(lset[model.AddressLabel]); addPort(addr) {
+			// Addresses reaching this point are already wrapped in [] if necessary.
+			switch lset[model.SchemeLabel] {
+			case "http", "":
+				addr = addr + ":80"
+			case "https":
+				addr = addr + ":443"
+			default:
+				return nil, fmt.Errorf("invalid scheme: %q", cfg.Scheme)
+			}
+			lset[model.AddressLabel] = model.LabelValue(addr)
+		}
+		if err := config.CheckTargetAddress(lset[model.AddressLabel]); err != nil {
+			return nil, err
+		}
+
+		// Meta labels are deleted after relabelling. Other internal labels propagate to
+		// the target which decides whether they will be part of their label set.
+		for ln := range lset {
+			if strings.HasPrefix(string(ln), model.MetaLabelPrefix) {
+				delete(lset, ln)
+			}
+		}
+
+		res = append(res, alertmanager{labels: lset})
+	}
+	return res, nil
+}
diff --git a/notifier/notifier_test.go b/notifier/notifier_test.go
index efe248a9e..7664ee3fe 100644
--- a/notifier/notifier_test.go
+++ b/notifier/notifier_test.go
@@ -27,34 +27,34 @@ import (
 	"github.com/prometheus/prometheus/config"
 )
 
-func TestPostURL(t *testing.T) {
+func TestPostPath(t *testing.T) {
 	var cases = []struct {
 		in, out string
 	}{
 		{
-			in:  "http://localhost:9093",
-			out: "http://localhost:9093/api/v1/alerts",
+			in:  "",
+			out: "/api/v1/alerts",
 		},
 		{
-			in:  "http://localhost:9093/",
-			out: "http://localhost:9093/api/v1/alerts",
+			in:  "/",
+			out: "/api/v1/alerts",
 		},
 		{
-			in:  "http://localhost:9093/prefix",
-			out: "http://localhost:9093/prefix/api/v1/alerts",
+			in:  "/prefix",
+			out: "/prefix/api/v1/alerts",
 		},
 		{
-			in:  "http://localhost:9093/prefix//",
-			out: "http://localhost:9093/prefix/api/v1/alerts",
+			in:  "/prefix//",
+			out: "/prefix/api/v1/alerts",
 		},
 		{
-			in:  "http://localhost:9093/prefix//",
-			out: "http://localhost:9093/prefix/api/v1/alerts",
+			in:  "prefix//",
+			out: "/prefix/api/v1/alerts",
 		},
 	}
 	for _, c := range cases {
-		if res := postURL(c.in); res != c.out {
-			t.Errorf("Expected post URL %q for %q but got %q", c.out, c.in, res)
+		if res := postPath(c.in); res != c.out {
+			t.Errorf("Expected post path %q for %q but got %q", c.out, c.in, res)
 		}
 	}
 }
@@ -123,9 +123,6 @@ func TestHandlerSendAll(t *testing.T) {
 	)
 
 	f := func(w http.ResponseWriter, r *http.Request) {
-		if r.URL.Path != alertPushEndpoint {
-			t.Fatalf("Bad endpoint %q used, expected %q", r.URL.Path, alertPushEndpoint)
-		}
 		defer r.Body.Close()
 
 		var alerts model.Alerts
@@ -150,9 +147,15 @@ func TestHandlerSendAll(t *testing.T) {
 	defer server1.Close()
 	defer server2.Close()
 
-	h := New(&Options{
-		AlertmanagerURLs: []string{server1.URL, server2.URL},
-		Timeout:          time.Minute,
+	h := New(&Options{})
+	h.alertmanagers = append(h.alertmanagers, &alertmanagerSet{
+		ams: []alertmanager{
+			{plainURL: server1.URL},
+			{plainURL: server2.URL},
+		},
+		cfg: &config.AlertmanagersConfig{
+			Timeout: time.Second,
+		},
 	})
 
 	for i := range make([]struct{}, maxBatchSize) {
@@ -170,18 +173,18 @@ func TestHandlerSendAll(t *testing.T) {
 
 	status1 = http.StatusOK
 	status2 = http.StatusOK
-	if ne := h.sendAll(h.queue...); ne != 0 {
-		t.Fatalf("Unexpected number of failed sends: %d", ne)
+	if !h.sendAll(h.queue...) {
+		t.Fatalf("all sends failed unexpectedly")
 	}
 
 	status1 = http.StatusNotFound
-	if ne := h.sendAll(h.queue...); ne != 1 {
-		t.Fatalf("Unexpected number of failed sends: %d", ne)
+	if !h.sendAll(h.queue...) {
+		t.Fatalf("all sends failed unexpectedly")
 	}
 
 	status2 = http.StatusInternalServerError
-	if ne := h.sendAll(h.queue...); ne != 2 {
-		t.Fatalf("Unexpected number of failed sends: %d", ne)
+	if h.sendAll(h.queue...) {
+		t.Fatalf("all sends succeeded unexpectedly")
 	}
 }
 
@@ -305,9 +308,15 @@ func TestHandlerQueueing(t *testing.T) {
 	}))
 
 	h := New(&Options{
-		AlertmanagerURLs: []string{server.URL},
-		Timeout:          time.Second,
-		QueueCapacity:    3 * maxBatchSize,
+		QueueCapacity: 3 * maxBatchSize,
+	})
+	h.alertmanagers = append(h.alertmanagers, &alertmanagerSet{
+		ams: []alertmanager{
+			{plainURL: server.URL},
+		},
+		cfg: &config.AlertmanagersConfig{
+			Timeout: time.Second,
+		},
 	})
 
 	var alerts model.Alerts