Various improvements after code review

Most importantly, `api.New` now takes an `Options` struct as an
argument, which allows some other things done here as well:

- Timout and concurrency limit are now in the options, streamlining
  the registration and the implementation of the limiting middleware.

- A local registry is used for metrics, and the metrics used so far
  inside any of the api packages are using it now.

The 'in flight' metric now contains the 'get' as a method label. I
have also added a TODO to instrument other methods in the same way
(otherwise, the label doesn't reall make sense, semantically). I have
also added an explicit error counter for requests rejected because of
the concurrency limit. (They also show up as 503s in the generic HTTP
instrumentation (or they would, if v2 were instrumented, too), but
those 503s might have a number of reasons, while users might want to
alert on concurrency limit problems explicitly).

Signed-off-by: beorn7 <beorn@soundcloud.com>
This commit is contained in:
beorn7 2019-02-12 17:42:11 +01:00
parent 3382a0e949
commit 21de9ff88c
4 changed files with 177 additions and 134 deletions

View File

@ -14,6 +14,7 @@
package api
import (
"errors"
"fmt"
"net/http"
"runtime"
@ -33,44 +34,87 @@ import (
"github.com/go-kit/kit/log"
)
var (
requestsInFlight = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "alertmanager_http_get_requests_in_flight",
Help: "Current number of HTTP GET requests being processed.",
})
)
func init() {
prometheus.MustRegister(requestsInFlight)
}
// API represents all APIs of Alertmanager.
type API struct {
v1 *apiv1.API
v2 *apiv2.API
v1 *apiv1.API
v2 *apiv2.API
requestsInFlight prometheus.Gauge
concurrencyLimitExceeded prometheus.Counter
timeout time.Duration
inFlightSem chan struct{}
}
// Options for the creation of an API object. Alerts, Silences, and StatusFunc
// are mandatory to set. The zero value for everything else is a safe default.
type Options struct {
// Alerts to be used by the API. Mandatory.
Alerts provider.Alerts
// Silences to be used by the API. Mandatory.
Silences *silence.Silences
// StatusFunc is used be the API to retrieve the AlertStatus of an
// alert. Mandatory.
StatusFunc func(model.Fingerprint) types.AlertStatus
// Peer from the gossip cluster. If nil, no clustering will be used.
Peer *cluster.Peer
// Timeout for all HTTP connections. The zero value (and negative
// values) result in no timeout.
Timeout time.Duration
// Concurrency limit for GET requests. The zero value (and negative
// values) result in a limit of GOMAXPROCS or 8, whichever is
// larger. Status code 503 is served for GET requests that would exceed
// the concurrency limit.
Concurrency int
// Logger is used for logging, if nil, no logging will happen.
Logger log.Logger
// Registry is used to register Prometheus metrics. If nil, no metrics
// registration will happen.
Registry prometheus.Registerer
}
func (o Options) validate() error {
if o.Alerts == nil {
return errors.New("mandatory field Alerts not set")
}
if o.Silences == nil {
return errors.New("mandatory field Silences not set")
}
if o.StatusFunc == nil {
return errors.New("mandatory field StatusFunc not set")
}
return nil
}
// New creates a new API object combining all API versions.
func New(
alerts provider.Alerts,
silences *silence.Silences,
sf func(model.Fingerprint) types.AlertStatus,
peer *cluster.Peer,
l log.Logger,
) (*API, error) {
func New(opts Options) (*API, error) {
if err := opts.validate(); err != nil {
return nil, fmt.Errorf("invalid API options: %s", err)
}
l := opts.Logger
if l == nil {
l = log.NewNopLogger()
}
concurrency := opts.Concurrency
if concurrency < 1 {
concurrency = runtime.GOMAXPROCS(0)
if concurrency < 8 {
concurrency = 8
}
}
v1 := apiv1.New(
alerts,
silences,
sf,
peer,
opts.Alerts,
opts.Silences,
opts.StatusFunc,
opts.Peer,
log.With(l, "version", "v1"),
opts.Registry,
)
v2, err := apiv2.NewAPI(
alerts,
sf,
silences,
peer,
opts.Alerts,
opts.StatusFunc,
opts.Silences,
opts.Peer,
log.With(l, "version", "v2"),
)
@ -78,9 +122,34 @@ func New(
return nil, err
}
// TODO(beorn7): For now, this hardcodes the method="get" label. Other
// methods should get the same instrumentation.
requestsInFlight := prometheus.NewGauge(prometheus.GaugeOpts{
Name: "alertmanager_http_requests_in_flight",
Help: "Current number of HTTP requests being processed.",
ConstLabels: prometheus.Labels{"method": "get"},
})
concurrencyLimitExceeded := prometheus.NewCounter(prometheus.CounterOpts{
Name: "alertmanager_http_concurrency_limit_exceeded_total",
Help: "Total number of times an HTTP request failed because the concurrency limit was reached.",
ConstLabels: prometheus.Labels{"method": "get"},
})
if opts.Registry != nil {
if err := opts.Registry.Register(requestsInFlight); err != nil {
return nil, err
}
if err := opts.Registry.Register(concurrencyLimitExceeded); err != nil {
return nil, err
}
}
return &API{
v1: v1,
v2: v2,
v1: v1,
v2: v2,
requestsInFlight: requestsInFlight,
concurrencyLimitExceeded: concurrencyLimitExceeded,
timeout: opts.Timeout,
inFlightSem: make(chan struct{}, concurrency),
}, nil
}
@ -88,26 +157,15 @@ func New(
// APIv2 works on the http.Handler level, this method also creates a new
// http.ServeMux and then uses it to register both the provided router (to
// handle "/") and APIv2 (to handle "<routePrefix>/api/v2"). The method returns
// the newly created http.ServeMux.
//
// If the provided value for timeout is positive, it is enforced for all HTTP
// requests. (Negative or zero results in no timeout at all.)
//
// If the provided value for concurrency is positive, it limits the number of
// concurrently processed GET requests. Otherwise, the number of concurrently
// processed GET requests is limited to GOMAXPROCS or 8, whatever is
// larger. Status code 503 is served for GET requests that would exceed the
// concurrency limit.
func (api *API) Register(
r *route.Router, routePrefix string,
timeout time.Duration, concurrency int,
) *http.ServeMux {
limiter := makeLimiter(timeout, concurrency)
// the newly created http.ServeMux. If a timeout has been set on construction of
// API, it is enforced for all HTTP request going through this mux. The same is
// true for the concurrency limit, with the exception that it is only applied to
// GET requests.
func (api *API) Register(r *route.Router, routePrefix string) *http.ServeMux {
api.v1.Register(r.WithPrefix("/api/v1"))
mux := http.NewServeMux()
mux.Handle("/", limiter(r))
mux.Handle("/", api.limitHandler(r))
apiPrefix := ""
if routePrefix != "/" {
@ -116,10 +174,10 @@ func (api *API) Register(
// TODO(beorn7): HTTP instrumentation is only in place for Router. Since
// /api/v2 works on the Handler level, it is currently not instrumented
// at all (with the exception of requestsInFlight, which is handled in
// the Limiter below).
// limitHandler below).
mux.Handle(
apiPrefix+"/api/v2/",
limiter(http.StripPrefix(apiPrefix+"/api/v2", api.v2.Handler)),
api.limitHandler(http.StripPrefix(apiPrefix+"/api/v2", api.v2.Handler)),
)
return mux
@ -134,49 +192,30 @@ func (api *API) Update(cfg *config.Config, resolveTimeout time.Duration) error {
return api.v2.Update(cfg, resolveTimeout)
}
// makeLimiter returns an HTTP middleware that sets a timeout for HTTP requests
// and also limits the number of concurrently processed GET requests to the
// given number.
//
// If timeout is < 1, no timeout is enforced.
//
// If concurrency is < 1, GOMAXPROCS is used as the concurrency limit but at least 8.
//
// The returned middleware serves http.StatusServiceUnavailable (503) for requests that
// would exceed the number.
func makeLimiter(timeout time.Duration, concurrency int) func(http.Handler) http.Handler {
if concurrency < 1 {
concurrency = runtime.GOMAXPROCS(0)
if concurrency < 8 {
concurrency = 8
}
}
inFlightSem := make(chan struct{}, concurrency)
return func(h http.Handler) http.Handler {
concLimiter := http.HandlerFunc(func(rsp http.ResponseWriter, req *http.Request) {
if req.Method == http.MethodGet { // Only limit concurrency of GETs.
select {
case inFlightSem <- struct{}{}: // All good, carry on.
requestsInFlight.Inc()
defer func() {
<-inFlightSem
requestsInFlight.Dec()
}()
default:
http.Error(rsp, fmt.Sprintf(
"Limit of concurrent GET requests reached (%d), try again later.\n", concurrency,
), http.StatusServiceUnavailable)
return
}
func (api *API) limitHandler(h http.Handler) http.Handler {
concLimiter := http.HandlerFunc(func(rsp http.ResponseWriter, req *http.Request) {
if req.Method == http.MethodGet { // Only limit concurrency of GETs.
select {
case api.inFlightSem <- struct{}{}: // All good, carry on.
api.requestsInFlight.Inc()
defer func() {
<-api.inFlightSem
api.requestsInFlight.Dec()
}()
default:
api.concurrencyLimitExceeded.Inc()
http.Error(rsp, fmt.Sprintf(
"Limit of concurrent GET requests reached (%d), try again later.\n", cap(api.inFlightSem),
), http.StatusServiceUnavailable)
return
}
h.ServeHTTP(rsp, req)
})
if timeout <= 0 {
return concLimiter
}
return http.TimeoutHandler(concLimiter, timeout, fmt.Sprintf(
"Exceeded configured timeout of %v.\n", timeout,
))
h.ServeHTTP(rsp, req)
})
if api.timeout <= 0 {
return concLimiter
}
return http.TimeoutHandler(concLimiter, api.timeout, fmt.Sprintf(
"Exceeded configured timeout of %v.\n", api.timeout,
))
}

View File

@ -41,28 +41,6 @@ import (
"github.com/prometheus/alertmanager/types"
)
var (
numReceivedAlerts = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "alertmanager",
Name: "alerts_received_total",
Help: "The total number of received alerts.",
}, []string{"status"})
numInvalidAlerts = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "alertmanager",
Name: "alerts_invalid_total",
Help: "The total number of received alerts that were invalid.",
})
)
func init() {
numReceivedAlerts.WithLabelValues("firing")
numReceivedAlerts.WithLabelValues("resolved")
prometheus.MustRegister(numReceivedAlerts)
prometheus.MustRegister(numInvalidAlerts)
}
var corsHeaders = map[string]string{
"Access-Control-Allow-Headers": "Accept, Authorization, Content-Type, Origin",
"Access-Control-Allow-Methods": "GET, DELETE, OPTIONS",
@ -98,6 +76,9 @@ type API struct {
peer *cluster.Peer
logger log.Logger
numReceivedAlerts *prometheus.CounterVec
numInvalidAlerts prometheus.Counter
getAlertStatus getAlertStatusFn
mtx sync.RWMutex
@ -112,18 +93,38 @@ func New(
sf getAlertStatusFn,
peer *cluster.Peer,
l log.Logger,
r prometheus.Registerer,
) *API {
if l == nil {
l = log.NewNopLogger()
}
numReceivedAlerts := prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "alertmanager",
Name: "alerts_received_total",
Help: "The total number of received alerts.",
}, []string{"status"})
numInvalidAlerts := prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "alertmanager",
Name: "alerts_invalid_total",
Help: "The total number of received alerts that were invalid.",
})
numReceivedAlerts.WithLabelValues("firing")
numReceivedAlerts.WithLabelValues("resolved")
if r != nil {
r.MustRegister(numReceivedAlerts, numInvalidAlerts)
}
return &API{
alerts: alerts,
silences: silences,
getAlertStatus: sf,
uptime: time.Now(),
peer: peer,
logger: l,
alerts: alerts,
silences: silences,
getAlertStatus: sf,
uptime: time.Now(),
peer: peer,
logger: l,
numReceivedAlerts: numReceivedAlerts,
numInvalidAlerts: numInvalidAlerts,
}
}
@ -452,9 +453,9 @@ func (api *API) insertAlerts(w http.ResponseWriter, r *http.Request, alerts ...*
alert.EndsAt = now.Add(resolveTimeout)
}
if alert.EndsAt.After(time.Now()) {
numReceivedAlerts.WithLabelValues("firing").Inc()
api.numReceivedAlerts.WithLabelValues("firing").Inc()
} else {
numReceivedAlerts.WithLabelValues("resolved").Inc()
api.numReceivedAlerts.WithLabelValues("resolved").Inc()
}
}
@ -468,7 +469,7 @@ func (api *API) insertAlerts(w http.ResponseWriter, r *http.Request, alerts ...*
if err := a.Validate(); err != nil {
validationErrs.Add(err)
numInvalidAlerts.Inc()
api.numInvalidAlerts.Inc()
continue
}
validAlerts = append(validAlerts, a)

View File

@ -132,7 +132,7 @@ func TestAddAlerts(t *testing.T) {
}
alertsProvider := newFakeAlerts([]*types.Alert{}, tc.err)
api := New(alertsProvider, nil, newGetAlertStatus(alertsProvider), nil, nil)
api := New(alertsProvider, nil, newGetAlertStatus(alertsProvider), nil, nil, nil)
r, err := http.NewRequest("POST", "/api/v1/alerts", bytes.NewReader(b))
w := httptest.NewRecorder()
@ -259,7 +259,7 @@ func TestListAlerts(t *testing.T) {
},
} {
alertsProvider := newFakeAlerts(alerts, tc.err)
api := New(alertsProvider, nil, newGetAlertStatus(alertsProvider), nil, nil)
api := New(alertsProvider, nil, newGetAlertStatus(alertsProvider), nil, nil, nil)
api.route = dispatch.NewRoute(&config.Route{Receiver: "def-receiver"}, nil)
r, err := http.NewRequest("GET", "/api/v1/alerts", nil)

View File

@ -127,8 +127,8 @@ func run() int {
externalURL = kingpin.Flag("web.external-url", "The URL under which Alertmanager is externally reachable (for example, if Alertmanager is served via a reverse proxy). Used for generating relative and absolute links back to Alertmanager itself. If the URL has a path portion, it will be used to prefix all HTTP endpoints served by Alertmanager. If omitted, relevant URL components will be derived automatically.").String()
routePrefix = kingpin.Flag("web.route-prefix", "Prefix for the internal routes of web endpoints. Defaults to path of --web.external-url.").String()
listenAddress = kingpin.Flag("web.listen-address", "Address to listen on for the web interface and API.").Default(":9093").String()
getConcurrency = kingpin.Flag("web.get-concurrency", "Maximum number of GET requests processed concurrently. If negative or zero, the limit is GOMAXPROC or 8, whatever is larger.").Default("0").Int()
getTimeout = kingpin.Flag("web.timeout", "Timeout for HTTP requests. If negative or zero, no timeout is set.").Default("0").Duration()
getConcurrency = kingpin.Flag("web.get-concurrency", "Maximum number of GET requests processed concurrently. If negative or zero, the limit is GOMAXPROC or 8, whichever is larger.").Default("0").Int()
httpTimeout = kingpin.Flag("web.timeout", "Timeout for HTTP requests. If negative or zero, no timeout is set.").Default("0").Duration()
clusterBindAddr = kingpin.Flag("cluster.listen-address", "Listen address for cluster.").
Default(defaultClusterAddr).String()
@ -270,13 +270,16 @@ func run() int {
)
defer disp.Stop()
api, err := api.New(
alerts,
silences,
marker.Status,
peer,
log.With(logger, "component", "api"),
)
api, err := api.New(api.Options{
Alerts: alerts,
Silences: silences,
StatusFunc: marker.Status,
Peer: peer,
Timeout: *httpTimeout,
Concurrency: *getConcurrency,
Logger: log.With(logger, "component", "api"),
Registry: prometheus.DefaultRegisterer,
})
if err != nil {
level.Error(logger).Log("err", fmt.Errorf("failed to create API: %v", err.Error()))
@ -376,7 +379,7 @@ func run() int {
ui.Register(router, webReload, logger)
mux := api.Register(router, *routePrefix, *getTimeout, *getConcurrency)
mux := api.Register(router, *routePrefix)
srv := http.Server{Addr: *listenAddress, Handler: mux}
srvc := make(chan struct{})