From 21de9ff88c7f2b312139715da676a7a71d084791 Mon Sep 17 00:00:00 2001 From: beorn7 Date: Tue, 12 Feb 2019 17:42:11 +0100 Subject: [PATCH] Various improvements after code review Most importantly, `api.New` now takes an `Options` struct as an argument, which allows some other things done here as well: - Timout and concurrency limit are now in the options, streamlining the registration and the implementation of the limiting middleware. - A local registry is used for metrics, and the metrics used so far inside any of the api packages are using it now. The 'in flight' metric now contains the 'get' as a method label. I have also added a TODO to instrument other methods in the same way (otherwise, the label doesn't reall make sense, semantically). I have also added an explicit error counter for requests rejected because of the concurrency limit. (They also show up as 503s in the generic HTTP instrumentation (or they would, if v2 were instrumented, too), but those 503s might have a number of reasons, while users might want to alert on concurrency limit problems explicitly). Signed-off-by: beorn7 --- api/api.go | 221 +++++++++++++++++++++++---------------- api/v1/api.go | 63 +++++------ api/v1/api_test.go | 4 +- cmd/alertmanager/main.go | 23 ++-- 4 files changed, 177 insertions(+), 134 deletions(-) diff --git a/api/api.go b/api/api.go index 8241f730..bdebb177 100644 --- a/api/api.go +++ b/api/api.go @@ -14,6 +14,7 @@ package api import ( + "errors" "fmt" "net/http" "runtime" @@ -33,44 +34,87 @@ import ( "github.com/go-kit/kit/log" ) -var ( - requestsInFlight = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "alertmanager_http_get_requests_in_flight", - Help: "Current number of HTTP GET requests being processed.", - }) -) - -func init() { - prometheus.MustRegister(requestsInFlight) -} - // API represents all APIs of Alertmanager. type API struct { - v1 *apiv1.API - v2 *apiv2.API + v1 *apiv1.API + v2 *apiv2.API + requestsInFlight prometheus.Gauge + concurrencyLimitExceeded prometheus.Counter + timeout time.Duration + inFlightSem chan struct{} +} + +// Options for the creation of an API object. Alerts, Silences, and StatusFunc +// are mandatory to set. The zero value for everything else is a safe default. +type Options struct { + // Alerts to be used by the API. Mandatory. + Alerts provider.Alerts + // Silences to be used by the API. Mandatory. + Silences *silence.Silences + // StatusFunc is used be the API to retrieve the AlertStatus of an + // alert. Mandatory. + StatusFunc func(model.Fingerprint) types.AlertStatus + // Peer from the gossip cluster. If nil, no clustering will be used. + Peer *cluster.Peer + // Timeout for all HTTP connections. The zero value (and negative + // values) result in no timeout. + Timeout time.Duration + // Concurrency limit for GET requests. The zero value (and negative + // values) result in a limit of GOMAXPROCS or 8, whichever is + // larger. Status code 503 is served for GET requests that would exceed + // the concurrency limit. + Concurrency int + // Logger is used for logging, if nil, no logging will happen. + Logger log.Logger + // Registry is used to register Prometheus metrics. If nil, no metrics + // registration will happen. + Registry prometheus.Registerer +} + +func (o Options) validate() error { + if o.Alerts == nil { + return errors.New("mandatory field Alerts not set") + } + if o.Silences == nil { + return errors.New("mandatory field Silences not set") + } + if o.StatusFunc == nil { + return errors.New("mandatory field StatusFunc not set") + } + return nil } // New creates a new API object combining all API versions. -func New( - alerts provider.Alerts, - silences *silence.Silences, - sf func(model.Fingerprint) types.AlertStatus, - peer *cluster.Peer, - l log.Logger, -) (*API, error) { +func New(opts Options) (*API, error) { + if err := opts.validate(); err != nil { + return nil, fmt.Errorf("invalid API options: %s", err) + } + l := opts.Logger + if l == nil { + l = log.NewNopLogger() + } + concurrency := opts.Concurrency + if concurrency < 1 { + concurrency = runtime.GOMAXPROCS(0) + if concurrency < 8 { + concurrency = 8 + } + } + v1 := apiv1.New( - alerts, - silences, - sf, - peer, + opts.Alerts, + opts.Silences, + opts.StatusFunc, + opts.Peer, log.With(l, "version", "v1"), + opts.Registry, ) v2, err := apiv2.NewAPI( - alerts, - sf, - silences, - peer, + opts.Alerts, + opts.StatusFunc, + opts.Silences, + opts.Peer, log.With(l, "version", "v2"), ) @@ -78,9 +122,34 @@ func New( return nil, err } + // TODO(beorn7): For now, this hardcodes the method="get" label. Other + // methods should get the same instrumentation. + requestsInFlight := prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "alertmanager_http_requests_in_flight", + Help: "Current number of HTTP requests being processed.", + ConstLabels: prometheus.Labels{"method": "get"}, + }) + concurrencyLimitExceeded := prometheus.NewCounter(prometheus.CounterOpts{ + Name: "alertmanager_http_concurrency_limit_exceeded_total", + Help: "Total number of times an HTTP request failed because the concurrency limit was reached.", + ConstLabels: prometheus.Labels{"method": "get"}, + }) + if opts.Registry != nil { + if err := opts.Registry.Register(requestsInFlight); err != nil { + return nil, err + } + if err := opts.Registry.Register(concurrencyLimitExceeded); err != nil { + return nil, err + } + } + return &API{ - v1: v1, - v2: v2, + v1: v1, + v2: v2, + requestsInFlight: requestsInFlight, + concurrencyLimitExceeded: concurrencyLimitExceeded, + timeout: opts.Timeout, + inFlightSem: make(chan struct{}, concurrency), }, nil } @@ -88,26 +157,15 @@ func New( // APIv2 works on the http.Handler level, this method also creates a new // http.ServeMux and then uses it to register both the provided router (to // handle "/") and APIv2 (to handle "/api/v2"). The method returns -// the newly created http.ServeMux. -// -// If the provided value for timeout is positive, it is enforced for all HTTP -// requests. (Negative or zero results in no timeout at all.) -// -// If the provided value for concurrency is positive, it limits the number of -// concurrently processed GET requests. Otherwise, the number of concurrently -// processed GET requests is limited to GOMAXPROCS or 8, whatever is -// larger. Status code 503 is served for GET requests that would exceed the -// concurrency limit. -func (api *API) Register( - r *route.Router, routePrefix string, - timeout time.Duration, concurrency int, -) *http.ServeMux { - limiter := makeLimiter(timeout, concurrency) - +// the newly created http.ServeMux. If a timeout has been set on construction of +// API, it is enforced for all HTTP request going through this mux. The same is +// true for the concurrency limit, with the exception that it is only applied to +// GET requests. +func (api *API) Register(r *route.Router, routePrefix string) *http.ServeMux { api.v1.Register(r.WithPrefix("/api/v1")) mux := http.NewServeMux() - mux.Handle("/", limiter(r)) + mux.Handle("/", api.limitHandler(r)) apiPrefix := "" if routePrefix != "/" { @@ -116,10 +174,10 @@ func (api *API) Register( // TODO(beorn7): HTTP instrumentation is only in place for Router. Since // /api/v2 works on the Handler level, it is currently not instrumented // at all (with the exception of requestsInFlight, which is handled in - // the Limiter below). + // limitHandler below). mux.Handle( apiPrefix+"/api/v2/", - limiter(http.StripPrefix(apiPrefix+"/api/v2", api.v2.Handler)), + api.limitHandler(http.StripPrefix(apiPrefix+"/api/v2", api.v2.Handler)), ) return mux @@ -134,49 +192,30 @@ func (api *API) Update(cfg *config.Config, resolveTimeout time.Duration) error { return api.v2.Update(cfg, resolveTimeout) } -// makeLimiter returns an HTTP middleware that sets a timeout for HTTP requests -// and also limits the number of concurrently processed GET requests to the -// given number. -// -// If timeout is < 1, no timeout is enforced. -// -// If concurrency is < 1, GOMAXPROCS is used as the concurrency limit but at least 8. -// -// The returned middleware serves http.StatusServiceUnavailable (503) for requests that -// would exceed the number. -func makeLimiter(timeout time.Duration, concurrency int) func(http.Handler) http.Handler { - if concurrency < 1 { - concurrency = runtime.GOMAXPROCS(0) - if concurrency < 8 { - concurrency = 8 - } - } - inFlightSem := make(chan struct{}, concurrency) - - return func(h http.Handler) http.Handler { - concLimiter := http.HandlerFunc(func(rsp http.ResponseWriter, req *http.Request) { - if req.Method == http.MethodGet { // Only limit concurrency of GETs. - select { - case inFlightSem <- struct{}{}: // All good, carry on. - requestsInFlight.Inc() - defer func() { - <-inFlightSem - requestsInFlight.Dec() - }() - default: - http.Error(rsp, fmt.Sprintf( - "Limit of concurrent GET requests reached (%d), try again later.\n", concurrency, - ), http.StatusServiceUnavailable) - return - } +func (api *API) limitHandler(h http.Handler) http.Handler { + concLimiter := http.HandlerFunc(func(rsp http.ResponseWriter, req *http.Request) { + if req.Method == http.MethodGet { // Only limit concurrency of GETs. + select { + case api.inFlightSem <- struct{}{}: // All good, carry on. + api.requestsInFlight.Inc() + defer func() { + <-api.inFlightSem + api.requestsInFlight.Dec() + }() + default: + api.concurrencyLimitExceeded.Inc() + http.Error(rsp, fmt.Sprintf( + "Limit of concurrent GET requests reached (%d), try again later.\n", cap(api.inFlightSem), + ), http.StatusServiceUnavailable) + return } - h.ServeHTTP(rsp, req) - }) - if timeout <= 0 { - return concLimiter } - return http.TimeoutHandler(concLimiter, timeout, fmt.Sprintf( - "Exceeded configured timeout of %v.\n", timeout, - )) + h.ServeHTTP(rsp, req) + }) + if api.timeout <= 0 { + return concLimiter } + return http.TimeoutHandler(concLimiter, api.timeout, fmt.Sprintf( + "Exceeded configured timeout of %v.\n", api.timeout, + )) } diff --git a/api/v1/api.go b/api/v1/api.go index 221781c3..56245996 100644 --- a/api/v1/api.go +++ b/api/v1/api.go @@ -41,28 +41,6 @@ import ( "github.com/prometheus/alertmanager/types" ) -var ( - numReceivedAlerts = prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: "alertmanager", - Name: "alerts_received_total", - Help: "The total number of received alerts.", - }, []string{"status"}) - - numInvalidAlerts = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: "alertmanager", - Name: "alerts_invalid_total", - Help: "The total number of received alerts that were invalid.", - }) -) - -func init() { - numReceivedAlerts.WithLabelValues("firing") - numReceivedAlerts.WithLabelValues("resolved") - - prometheus.MustRegister(numReceivedAlerts) - prometheus.MustRegister(numInvalidAlerts) -} - var corsHeaders = map[string]string{ "Access-Control-Allow-Headers": "Accept, Authorization, Content-Type, Origin", "Access-Control-Allow-Methods": "GET, DELETE, OPTIONS", @@ -98,6 +76,9 @@ type API struct { peer *cluster.Peer logger log.Logger + numReceivedAlerts *prometheus.CounterVec + numInvalidAlerts prometheus.Counter + getAlertStatus getAlertStatusFn mtx sync.RWMutex @@ -112,18 +93,38 @@ func New( sf getAlertStatusFn, peer *cluster.Peer, l log.Logger, + r prometheus.Registerer, ) *API { if l == nil { l = log.NewNopLogger() } + numReceivedAlerts := prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "alertmanager", + Name: "alerts_received_total", + Help: "The total number of received alerts.", + }, []string{"status"}) + + numInvalidAlerts := prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "alertmanager", + Name: "alerts_invalid_total", + Help: "The total number of received alerts that were invalid.", + }) + numReceivedAlerts.WithLabelValues("firing") + numReceivedAlerts.WithLabelValues("resolved") + if r != nil { + r.MustRegister(numReceivedAlerts, numInvalidAlerts) + } + return &API{ - alerts: alerts, - silences: silences, - getAlertStatus: sf, - uptime: time.Now(), - peer: peer, - logger: l, + alerts: alerts, + silences: silences, + getAlertStatus: sf, + uptime: time.Now(), + peer: peer, + logger: l, + numReceivedAlerts: numReceivedAlerts, + numInvalidAlerts: numInvalidAlerts, } } @@ -452,9 +453,9 @@ func (api *API) insertAlerts(w http.ResponseWriter, r *http.Request, alerts ...* alert.EndsAt = now.Add(resolveTimeout) } if alert.EndsAt.After(time.Now()) { - numReceivedAlerts.WithLabelValues("firing").Inc() + api.numReceivedAlerts.WithLabelValues("firing").Inc() } else { - numReceivedAlerts.WithLabelValues("resolved").Inc() + api.numReceivedAlerts.WithLabelValues("resolved").Inc() } } @@ -468,7 +469,7 @@ func (api *API) insertAlerts(w http.ResponseWriter, r *http.Request, alerts ...* if err := a.Validate(); err != nil { validationErrs.Add(err) - numInvalidAlerts.Inc() + api.numInvalidAlerts.Inc() continue } validAlerts = append(validAlerts, a) diff --git a/api/v1/api_test.go b/api/v1/api_test.go index a2991ab4..b83ea007 100644 --- a/api/v1/api_test.go +++ b/api/v1/api_test.go @@ -132,7 +132,7 @@ func TestAddAlerts(t *testing.T) { } alertsProvider := newFakeAlerts([]*types.Alert{}, tc.err) - api := New(alertsProvider, nil, newGetAlertStatus(alertsProvider), nil, nil) + api := New(alertsProvider, nil, newGetAlertStatus(alertsProvider), nil, nil, nil) r, err := http.NewRequest("POST", "/api/v1/alerts", bytes.NewReader(b)) w := httptest.NewRecorder() @@ -259,7 +259,7 @@ func TestListAlerts(t *testing.T) { }, } { alertsProvider := newFakeAlerts(alerts, tc.err) - api := New(alertsProvider, nil, newGetAlertStatus(alertsProvider), nil, nil) + api := New(alertsProvider, nil, newGetAlertStatus(alertsProvider), nil, nil, nil) api.route = dispatch.NewRoute(&config.Route{Receiver: "def-receiver"}, nil) r, err := http.NewRequest("GET", "/api/v1/alerts", nil) diff --git a/cmd/alertmanager/main.go b/cmd/alertmanager/main.go index a3784d4d..b799edd7 100644 --- a/cmd/alertmanager/main.go +++ b/cmd/alertmanager/main.go @@ -127,8 +127,8 @@ func run() int { externalURL = kingpin.Flag("web.external-url", "The URL under which Alertmanager is externally reachable (for example, if Alertmanager is served via a reverse proxy). Used for generating relative and absolute links back to Alertmanager itself. If the URL has a path portion, it will be used to prefix all HTTP endpoints served by Alertmanager. If omitted, relevant URL components will be derived automatically.").String() routePrefix = kingpin.Flag("web.route-prefix", "Prefix for the internal routes of web endpoints. Defaults to path of --web.external-url.").String() listenAddress = kingpin.Flag("web.listen-address", "Address to listen on for the web interface and API.").Default(":9093").String() - getConcurrency = kingpin.Flag("web.get-concurrency", "Maximum number of GET requests processed concurrently. If negative or zero, the limit is GOMAXPROC or 8, whatever is larger.").Default("0").Int() - getTimeout = kingpin.Flag("web.timeout", "Timeout for HTTP requests. If negative or zero, no timeout is set.").Default("0").Duration() + getConcurrency = kingpin.Flag("web.get-concurrency", "Maximum number of GET requests processed concurrently. If negative or zero, the limit is GOMAXPROC or 8, whichever is larger.").Default("0").Int() + httpTimeout = kingpin.Flag("web.timeout", "Timeout for HTTP requests. If negative or zero, no timeout is set.").Default("0").Duration() clusterBindAddr = kingpin.Flag("cluster.listen-address", "Listen address for cluster."). Default(defaultClusterAddr).String() @@ -270,13 +270,16 @@ func run() int { ) defer disp.Stop() - api, err := api.New( - alerts, - silences, - marker.Status, - peer, - log.With(logger, "component", "api"), - ) + api, err := api.New(api.Options{ + Alerts: alerts, + Silences: silences, + StatusFunc: marker.Status, + Peer: peer, + Timeout: *httpTimeout, + Concurrency: *getConcurrency, + Logger: log.With(logger, "component", "api"), + Registry: prometheus.DefaultRegisterer, + }) if err != nil { level.Error(logger).Log("err", fmt.Errorf("failed to create API: %v", err.Error())) @@ -376,7 +379,7 @@ func run() int { ui.Register(router, webReload, logger) - mux := api.Register(router, *routePrefix, *getTimeout, *getConcurrency) + mux := api.Register(router, *routePrefix) srv := http.Server{Addr: *listenAddress, Handler: mux} srvc := make(chan struct{})