diff --git a/api/api.go b/api/api.go index 7d9d309c..bdebb177 100644 --- a/api/api.go +++ b/api/api.go @@ -14,7 +14,10 @@ package api import ( + "errors" + "fmt" "net/http" + "runtime" "time" apiv1 "github.com/prometheus/alertmanager/api/v1" @@ -24,6 +27,7 @@ import ( "github.com/prometheus/alertmanager/provider" "github.com/prometheus/alertmanager/silence" "github.com/prometheus/alertmanager/types" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/common/route" @@ -32,31 +36,85 @@ import ( // API represents all APIs of Alertmanager. type API struct { - v1 *apiv1.API - v2 *apiv2.API + v1 *apiv1.API + v2 *apiv2.API + requestsInFlight prometheus.Gauge + concurrencyLimitExceeded prometheus.Counter + timeout time.Duration + inFlightSem chan struct{} +} + +// Options for the creation of an API object. Alerts, Silences, and StatusFunc +// are mandatory to set. The zero value for everything else is a safe default. +type Options struct { + // Alerts to be used by the API. Mandatory. + Alerts provider.Alerts + // Silences to be used by the API. Mandatory. + Silences *silence.Silences + // StatusFunc is used by the API to retrieve the AlertStatus of an + // alert. Mandatory. + StatusFunc func(model.Fingerprint) types.AlertStatus + // Peer from the gossip cluster. If nil, no clustering will be used. + Peer *cluster.Peer + // Timeout for all HTTP connections. The zero value (and negative + // values) result in no timeout. + Timeout time.Duration + // Concurrency limit for GET requests. The zero value (and negative + // values) result in a limit of GOMAXPROCS or 8, whichever is + // larger. Status code 503 is served for GET requests that would exceed + // the concurrency limit. + Concurrency int + // Logger is used for logging, if nil, no logging will happen. + Logger log.Logger + // Registry is used to register Prometheus metrics. If nil, no metrics + // registration will happen. 
+ Registry prometheus.Registerer +} + +func (o Options) validate() error { + if o.Alerts == nil { + return errors.New("mandatory field Alerts not set") + } + if o.Silences == nil { + return errors.New("mandatory field Silences not set") + } + if o.StatusFunc == nil { + return errors.New("mandatory field StatusFunc not set") + } + return nil } // New creates a new API object combining all API versions. -func New( - alerts provider.Alerts, - silences *silence.Silences, - sf func(model.Fingerprint) types.AlertStatus, - peer *cluster.Peer, - l log.Logger, -) (*API, error) { +func New(opts Options) (*API, error) { + if err := opts.validate(); err != nil { + return nil, fmt.Errorf("invalid API options: %s", err) + } + l := opts.Logger + if l == nil { + l = log.NewNopLogger() + } + concurrency := opts.Concurrency + if concurrency < 1 { + concurrency = runtime.GOMAXPROCS(0) + if concurrency < 8 { + concurrency = 8 + } + } + v1 := apiv1.New( - alerts, - silences, - sf, - peer, + opts.Alerts, + opts.Silences, + opts.StatusFunc, + opts.Peer, log.With(l, "version", "v1"), + opts.Registry, ) v2, err := apiv2.NewAPI( - alerts, - sf, - silences, - peer, + opts.Alerts, + opts.StatusFunc, + opts.Silences, + opts.Peer, log.With(l, "version", "v2"), ) @@ -64,24 +122,63 @@ func New( return nil, err } + // TODO(beorn7): For now, this hardcodes the method="get" label. Other + // methods should get the same instrumentation. 
+ requestsInFlight := prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "alertmanager_http_requests_in_flight", + Help: "Current number of HTTP requests being processed.", + ConstLabels: prometheus.Labels{"method": "get"}, + }) + concurrencyLimitExceeded := prometheus.NewCounter(prometheus.CounterOpts{ + Name: "alertmanager_http_concurrency_limit_exceeded_total", + Help: "Total number of times an HTTP request failed because the concurrency limit was reached.", + ConstLabels: prometheus.Labels{"method": "get"}, + }) + if opts.Registry != nil { + if err := opts.Registry.Register(requestsInFlight); err != nil { + return nil, err + } + if err := opts.Registry.Register(concurrencyLimitExceeded); err != nil { + return nil, err + } + } + return &API{ - v1: v1, - v2: v2, + v1: v1, + v2: v2, + requestsInFlight: requestsInFlight, + concurrencyLimitExceeded: concurrencyLimitExceeded, + timeout: opts.Timeout, + inFlightSem: make(chan struct{}, concurrency), }, nil } -// Register all APIs with the given router and return a mux. +// Register all APIs. It registers APIv1 with the provided router directly. As +// APIv2 works on the http.Handler level, this method also creates a new +// http.ServeMux and then uses it to register both the provided router (to +// handle "/") and APIv2 (to handle "/api/v2"). The method returns +// the newly created http.ServeMux. If a timeout has been set on construction of +// API, it is enforced for all HTTP requests going through this mux. The same is +// true for the concurrency limit, with the exception that it is only applied to +// GET requests. 
func (api *API) Register(r *route.Router, routePrefix string) *http.ServeMux { api.v1.Register(r.WithPrefix("/api/v1")) mux := http.NewServeMux() - mux.Handle("/", r) + mux.Handle("/", api.limitHandler(r)) apiPrefix := "" if routePrefix != "/" { apiPrefix = routePrefix } - mux.Handle(apiPrefix+"/api/v2/", http.StripPrefix(apiPrefix+"/api/v2", api.v2.Handler)) + // TODO(beorn7): HTTP instrumentation is only in place for Router. Since + // /api/v2 works on the Handler level, it is currently not instrumented + // at all (with the exception of requestsInFlight, which is handled in + // limitHandler below). + mux.Handle( + apiPrefix+"/api/v2/", + api.limitHandler(http.StripPrefix(apiPrefix+"/api/v2", api.v2.Handler)), + ) return mux } @@ -94,3 +191,31 @@ func (api *API) Update(cfg *config.Config, resolveTimeout time.Duration) error { return api.v2.Update(cfg, resolveTimeout) } + +func (api *API) limitHandler(h http.Handler) http.Handler { + concLimiter := http.HandlerFunc(func(rsp http.ResponseWriter, req *http.Request) { + if req.Method == http.MethodGet { // Only limit concurrency of GETs. + select { + case api.inFlightSem <- struct{}{}: // All good, carry on. 
+ api.requestsInFlight.Inc() + defer func() { + <-api.inFlightSem + api.requestsInFlight.Dec() + }() + default: + api.concurrencyLimitExceeded.Inc() + http.Error(rsp, fmt.Sprintf( + "Limit of concurrent GET requests reached (%d), try again later.\n", cap(api.inFlightSem), + ), http.StatusServiceUnavailable) + return + } + } + h.ServeHTTP(rsp, req) + }) + if api.timeout <= 0 { + return concLimiter + } + return http.TimeoutHandler(concLimiter, api.timeout, fmt.Sprintf( + "Exceeded configured timeout of %v.\n", api.timeout, + )) +} diff --git a/api/v1/api.go b/api/v1/api.go index a8e2cc52..56245996 100644 --- a/api/v1/api.go +++ b/api/v1/api.go @@ -41,28 +41,6 @@ import ( "github.com/prometheus/alertmanager/types" ) -var ( - numReceivedAlerts = prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: "alertmanager", - Name: "alerts_received_total", - Help: "The total number of received alerts.", - }, []string{"status"}) - - numInvalidAlerts = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: "alertmanager", - Name: "alerts_invalid_total", - Help: "The total number of received alerts that were invalid.", - }) -) - -func init() { - numReceivedAlerts.WithLabelValues("firing") - numReceivedAlerts.WithLabelValues("resolved") - - prometheus.MustRegister(numReceivedAlerts) - prometheus.MustRegister(numInvalidAlerts) -} - var corsHeaders = map[string]string{ "Access-Control-Allow-Headers": "Accept, Authorization, Content-Type, Origin", "Access-Control-Allow-Methods": "GET, DELETE, OPTIONS", @@ -98,6 +76,9 @@ type API struct { peer *cluster.Peer logger log.Logger + numReceivedAlerts *prometheus.CounterVec + numInvalidAlerts prometheus.Counter + getAlertStatus getAlertStatusFn mtx sync.RWMutex @@ -112,18 +93,38 @@ func New( sf getAlertStatusFn, peer *cluster.Peer, l log.Logger, + r prometheus.Registerer, ) *API { if l == nil { l = log.NewNopLogger() } + numReceivedAlerts := prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "alertmanager", + Name: 
"alerts_received_total", + Help: "The total number of received alerts.", + }, []string{"status"}) + + numInvalidAlerts := prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "alertmanager", + Name: "alerts_invalid_total", + Help: "The total number of received alerts that were invalid.", + }) + numReceivedAlerts.WithLabelValues("firing") + numReceivedAlerts.WithLabelValues("resolved") + if r != nil { + r.MustRegister(numReceivedAlerts, numInvalidAlerts) + } + return &API{ - alerts: alerts, - silences: silences, - getAlertStatus: sf, - uptime: time.Now(), - peer: peer, - logger: l, + alerts: alerts, + silences: silences, + getAlertStatus: sf, + uptime: time.Now(), + peer: peer, + logger: l, + numReceivedAlerts: numReceivedAlerts, + numInvalidAlerts: numInvalidAlerts, } } @@ -253,6 +254,7 @@ func (api *API) listAlerts(w http.ResponseWriter, r *http.Request) { // are no alerts present res = []*Alert{} matchers = []*labels.Matcher{} + ctx = r.Context() showActive, showInhibited bool showSilenced, showUnprocessed bool @@ -326,11 +328,13 @@ func (api *API) listAlerts(w http.ResponseWriter, r *http.Request) { defer alerts.Close() api.mtx.RLock() - // TODO(fabxc): enforce a sensible timeout. 
for a := range alerts.Next() { if err = alerts.Err(); err != nil { break } + if err = ctx.Err(); err != nil { + break + } routes := api.route.Match(a.Labels) receivers := make([]string, 0, len(routes)) @@ -449,9 +453,9 @@ func (api *API) insertAlerts(w http.ResponseWriter, r *http.Request, alerts ...* alert.EndsAt = now.Add(resolveTimeout) } if alert.EndsAt.After(time.Now()) { - numReceivedAlerts.WithLabelValues("firing").Inc() + api.numReceivedAlerts.WithLabelValues("firing").Inc() } else { - numReceivedAlerts.WithLabelValues("resolved").Inc() + api.numReceivedAlerts.WithLabelValues("resolved").Inc() } } @@ -465,7 +469,7 @@ func (api *API) insertAlerts(w http.ResponseWriter, r *http.Request, alerts ...* if err := a.Validate(); err != nil { validationErrs.Add(err) - numInvalidAlerts.Inc() + api.numInvalidAlerts.Inc() continue } validAlerts = append(validAlerts, a) diff --git a/api/v1/api_test.go b/api/v1/api_test.go index a2991ab4..b83ea007 100644 --- a/api/v1/api_test.go +++ b/api/v1/api_test.go @@ -132,7 +132,7 @@ func TestAddAlerts(t *testing.T) { } alertsProvider := newFakeAlerts([]*types.Alert{}, tc.err) - api := New(alertsProvider, nil, newGetAlertStatus(alertsProvider), nil, nil) + api := New(alertsProvider, nil, newGetAlertStatus(alertsProvider), nil, nil, nil) r, err := http.NewRequest("POST", "/api/v1/alerts", bytes.NewReader(b)) w := httptest.NewRecorder() @@ -259,7 +259,7 @@ func TestListAlerts(t *testing.T) { }, } { alertsProvider := newFakeAlerts(alerts, tc.err) - api := New(alertsProvider, nil, newGetAlertStatus(alertsProvider), nil, nil) + api := New(alertsProvider, nil, newGetAlertStatus(alertsProvider), nil, nil, nil) api.route = dispatch.NewRoute(&config.Route{Receiver: "def-receiver"}, nil) r, err := http.NewRequest("GET", "/api/v1/alerts", nil) diff --git a/api/v2/api.go b/api/v2/api.go index b0e619ed..ae633342 100644 --- a/api/v2/api.go +++ b/api/v2/api.go @@ -195,6 +195,7 @@ func (api *API) getAlertsHandler(params alert_ops.GetAlertsParams) 
middleware.Re // are no alerts present res = open_api_models.GettableAlerts{} matchers = []*labels.Matcher{} + ctx = params.HTTPRequest.Context() ) if params.Filter != nil { @@ -224,11 +225,13 @@ func (api *API) getAlertsHandler(params alert_ops.GetAlertsParams) middleware.Re defer alerts.Close() api.mtx.RLock() - // TODO(fabxc): enforce a sensible timeout. for a := range alerts.Next() { if err = alerts.Err(); err != nil { break } + if err = ctx.Err(); err != nil { + break + } routes := api.route.Match(a.Labels) receivers := make([]*open_api_models.Receiver, 0, len(routes)) diff --git a/cmd/alertmanager/main.go b/cmd/alertmanager/main.go index 9d1f31c7..b799edd7 100644 --- a/cmd/alertmanager/main.go +++ b/cmd/alertmanager/main.go @@ -96,10 +96,11 @@ func init() { } func instrumentHandler(handlerName string, handler http.HandlerFunc) http.HandlerFunc { + handlerLabel := prometheus.Labels{"handler": handlerName} return promhttp.InstrumentHandlerDuration( - requestDuration.MustCurryWith(prometheus.Labels{"handler": handlerName}), + requestDuration.MustCurryWith(handlerLabel), promhttp.InstrumentHandlerResponseSize( - responseSize.MustCurryWith(prometheus.Labels{"handler": handlerName}), + responseSize.MustCurryWith(handlerLabel), handler, ), ) @@ -123,9 +124,11 @@ func run() int { retention = kingpin.Flag("data.retention", "How long to keep data for.").Default("120h").Duration() alertGCInterval = kingpin.Flag("alerts.gc-interval", "Interval between alert GC.").Default("30m").Duration() - externalURL = kingpin.Flag("web.external-url", "The URL under which Alertmanager is externally reachable (for example, if Alertmanager is served via a reverse proxy). Used for generating relative and absolute links back to Alertmanager itself. If the URL has a path portion, it will be used to prefix all HTTP endpoints served by Alertmanager. 
If omitted, relevant URL components will be derived automatically.").String() - routePrefix = kingpin.Flag("web.route-prefix", "Prefix for the internal routes of web endpoints. Defaults to path of --web.external-url.").String() - listenAddress = kingpin.Flag("web.listen-address", "Address to listen on for the web interface and API.").Default(":9093").String() + externalURL = kingpin.Flag("web.external-url", "The URL under which Alertmanager is externally reachable (for example, if Alertmanager is served via a reverse proxy). Used for generating relative and absolute links back to Alertmanager itself. If the URL has a path portion, it will be used to prefix all HTTP endpoints served by Alertmanager. If omitted, relevant URL components will be derived automatically.").String() + routePrefix = kingpin.Flag("web.route-prefix", "Prefix for the internal routes of web endpoints. Defaults to path of --web.external-url.").String() + listenAddress = kingpin.Flag("web.listen-address", "Address to listen on for the web interface and API.").Default(":9093").String() + getConcurrency = kingpin.Flag("web.get-concurrency", "Maximum number of GET requests processed concurrently. If negative or zero, the limit is GOMAXPROC or 8, whichever is larger.").Default("0").Int() + httpTimeout = kingpin.Flag("web.timeout", "Timeout for HTTP requests. If negative or zero, no timeout is set.").Default("0").Duration() clusterBindAddr = kingpin.Flag("cluster.listen-address", "Listen address for cluster."). 
Default(defaultClusterAddr).String() @@ -267,13 +270,16 @@ func run() int { ) defer disp.Stop() - api, err := api.New( - alerts, - silences, - marker.Status, - peer, - log.With(logger, "component", "api"), - ) + api, err := api.New(api.Options{ + Alerts: alerts, + Silences: silences, + StatusFunc: marker.Status, + Peer: peer, + Timeout: *httpTimeout, + Concurrency: *getConcurrency, + Logger: log.With(logger, "component", "api"), + Registry: prometheus.DefaultRegisterer, + }) if err != nil { level.Error(logger).Log("err", fmt.Errorf("failed to create API: %v", err.Error()))