Merge pull request #1743 from prometheus/beorn7/api
Introduce concurrency limit for GET requests and a general timeout for HTTP
This commit is contained in:
commit
da6e2a88dd
169
api/api.go
169
api/api.go
|
@ -14,7 +14,10 @@
|
|||
package api
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"runtime"
|
||||
"time"
|
||||
|
||||
apiv1 "github.com/prometheus/alertmanager/api/v1"
|
||||
|
@ -24,6 +27,7 @@ import (
|
|||
"github.com/prometheus/alertmanager/provider"
|
||||
"github.com/prometheus/alertmanager/silence"
|
||||
"github.com/prometheus/alertmanager/types"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/common/model"
|
||||
"github.com/prometheus/common/route"
|
||||
|
||||
|
@ -32,31 +36,85 @@ import (
|
|||
|
||||
// API represents all APIs of Alertmanager.
|
||||
type API struct {
|
||||
v1 *apiv1.API
|
||||
v2 *apiv2.API
|
||||
v1 *apiv1.API
|
||||
v2 *apiv2.API
|
||||
requestsInFlight prometheus.Gauge
|
||||
concurrencyLimitExceeded prometheus.Counter
|
||||
timeout time.Duration
|
||||
inFlightSem chan struct{}
|
||||
}
|
||||
|
||||
// Options for the creation of an API object. Alerts, Silences, and StatusFunc
|
||||
// are mandatory to set. The zero value for everything else is a safe default.
|
||||
type Options struct {
|
||||
// Alerts to be used by the API. Mandatory.
|
||||
Alerts provider.Alerts
|
||||
// Silences to be used by the API. Mandatory.
|
||||
Silences *silence.Silences
|
||||
// StatusFunc is used be the API to retrieve the AlertStatus of an
|
||||
// alert. Mandatory.
|
||||
StatusFunc func(model.Fingerprint) types.AlertStatus
|
||||
// Peer from the gossip cluster. If nil, no clustering will be used.
|
||||
Peer *cluster.Peer
|
||||
// Timeout for all HTTP connections. The zero value (and negative
|
||||
// values) result in no timeout.
|
||||
Timeout time.Duration
|
||||
// Concurrency limit for GET requests. The zero value (and negative
|
||||
// values) result in a limit of GOMAXPROCS or 8, whichever is
|
||||
// larger. Status code 503 is served for GET requests that would exceed
|
||||
// the concurrency limit.
|
||||
Concurrency int
|
||||
// Logger is used for logging, if nil, no logging will happen.
|
||||
Logger log.Logger
|
||||
// Registry is used to register Prometheus metrics. If nil, no metrics
|
||||
// registration will happen.
|
||||
Registry prometheus.Registerer
|
||||
}
|
||||
|
||||
func (o Options) validate() error {
|
||||
if o.Alerts == nil {
|
||||
return errors.New("mandatory field Alerts not set")
|
||||
}
|
||||
if o.Silences == nil {
|
||||
return errors.New("mandatory field Silences not set")
|
||||
}
|
||||
if o.StatusFunc == nil {
|
||||
return errors.New("mandatory field StatusFunc not set")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// New creates a new API object combining all API versions.
|
||||
func New(
|
||||
alerts provider.Alerts,
|
||||
silences *silence.Silences,
|
||||
sf func(model.Fingerprint) types.AlertStatus,
|
||||
peer *cluster.Peer,
|
||||
l log.Logger,
|
||||
) (*API, error) {
|
||||
func New(opts Options) (*API, error) {
|
||||
if err := opts.validate(); err != nil {
|
||||
return nil, fmt.Errorf("invalid API options: %s", err)
|
||||
}
|
||||
l := opts.Logger
|
||||
if l == nil {
|
||||
l = log.NewNopLogger()
|
||||
}
|
||||
concurrency := opts.Concurrency
|
||||
if concurrency < 1 {
|
||||
concurrency = runtime.GOMAXPROCS(0)
|
||||
if concurrency < 8 {
|
||||
concurrency = 8
|
||||
}
|
||||
}
|
||||
|
||||
v1 := apiv1.New(
|
||||
alerts,
|
||||
silences,
|
||||
sf,
|
||||
peer,
|
||||
opts.Alerts,
|
||||
opts.Silences,
|
||||
opts.StatusFunc,
|
||||
opts.Peer,
|
||||
log.With(l, "version", "v1"),
|
||||
opts.Registry,
|
||||
)
|
||||
|
||||
v2, err := apiv2.NewAPI(
|
||||
alerts,
|
||||
sf,
|
||||
silences,
|
||||
peer,
|
||||
opts.Alerts,
|
||||
opts.StatusFunc,
|
||||
opts.Silences,
|
||||
opts.Peer,
|
||||
log.With(l, "version", "v2"),
|
||||
)
|
||||
|
||||
|
@ -64,24 +122,63 @@ func New(
|
|||
return nil, err
|
||||
}
|
||||
|
||||
// TODO(beorn7): For now, this hardcodes the method="get" label. Other
|
||||
// methods should get the same instrumentation.
|
||||
requestsInFlight := prometheus.NewGauge(prometheus.GaugeOpts{
|
||||
Name: "alertmanager_http_requests_in_flight",
|
||||
Help: "Current number of HTTP requests being processed.",
|
||||
ConstLabels: prometheus.Labels{"method": "get"},
|
||||
})
|
||||
concurrencyLimitExceeded := prometheus.NewCounter(prometheus.CounterOpts{
|
||||
Name: "alertmanager_http_concurrency_limit_exceeded_total",
|
||||
Help: "Total number of times an HTTP request failed because the concurrency limit was reached.",
|
||||
ConstLabels: prometheus.Labels{"method": "get"},
|
||||
})
|
||||
if opts.Registry != nil {
|
||||
if err := opts.Registry.Register(requestsInFlight); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := opts.Registry.Register(concurrencyLimitExceeded); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return &API{
|
||||
v1: v1,
|
||||
v2: v2,
|
||||
v1: v1,
|
||||
v2: v2,
|
||||
requestsInFlight: requestsInFlight,
|
||||
concurrencyLimitExceeded: concurrencyLimitExceeded,
|
||||
timeout: opts.Timeout,
|
||||
inFlightSem: make(chan struct{}, concurrency),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Register all APIs with the given router and return a mux.
|
||||
// Register all APIs. It registers APIv1 with the provided router directly. As
|
||||
// APIv2 works on the http.Handler level, this method also creates a new
|
||||
// http.ServeMux and then uses it to register both the provided router (to
|
||||
// handle "/") and APIv2 (to handle "<routePrefix>/api/v2"). The method returns
|
||||
// the newly created http.ServeMux. If a timeout has been set on construction of
|
||||
// API, it is enforced for all HTTP request going through this mux. The same is
|
||||
// true for the concurrency limit, with the exception that it is only applied to
|
||||
// GET requests.
|
||||
func (api *API) Register(r *route.Router, routePrefix string) *http.ServeMux {
|
||||
api.v1.Register(r.WithPrefix("/api/v1"))
|
||||
|
||||
mux := http.NewServeMux()
|
||||
mux.Handle("/", r)
|
||||
mux.Handle("/", api.limitHandler(r))
|
||||
|
||||
apiPrefix := ""
|
||||
if routePrefix != "/" {
|
||||
apiPrefix = routePrefix
|
||||
}
|
||||
mux.Handle(apiPrefix+"/api/v2/", http.StripPrefix(apiPrefix+"/api/v2", api.v2.Handler))
|
||||
// TODO(beorn7): HTTP instrumentation is only in place for Router. Since
|
||||
// /api/v2 works on the Handler level, it is currently not instrumented
|
||||
// at all (with the exception of requestsInFlight, which is handled in
|
||||
// limitHandler below).
|
||||
mux.Handle(
|
||||
apiPrefix+"/api/v2/",
|
||||
api.limitHandler(http.StripPrefix(apiPrefix+"/api/v2", api.v2.Handler)),
|
||||
)
|
||||
|
||||
return mux
|
||||
}
|
||||
|
@ -94,3 +191,31 @@ func (api *API) Update(cfg *config.Config, resolveTimeout time.Duration) error {
|
|||
|
||||
return api.v2.Update(cfg, resolveTimeout)
|
||||
}
|
||||
|
||||
func (api *API) limitHandler(h http.Handler) http.Handler {
|
||||
concLimiter := http.HandlerFunc(func(rsp http.ResponseWriter, req *http.Request) {
|
||||
if req.Method == http.MethodGet { // Only limit concurrency of GETs.
|
||||
select {
|
||||
case api.inFlightSem <- struct{}{}: // All good, carry on.
|
||||
api.requestsInFlight.Inc()
|
||||
defer func() {
|
||||
<-api.inFlightSem
|
||||
api.requestsInFlight.Dec()
|
||||
}()
|
||||
default:
|
||||
api.concurrencyLimitExceeded.Inc()
|
||||
http.Error(rsp, fmt.Sprintf(
|
||||
"Limit of concurrent GET requests reached (%d), try again later.\n", cap(api.inFlightSem),
|
||||
), http.StatusServiceUnavailable)
|
||||
return
|
||||
}
|
||||
}
|
||||
h.ServeHTTP(rsp, req)
|
||||
})
|
||||
if api.timeout <= 0 {
|
||||
return concLimiter
|
||||
}
|
||||
return http.TimeoutHandler(concLimiter, api.timeout, fmt.Sprintf(
|
||||
"Exceeded configured timeout of %v.\n", api.timeout,
|
||||
))
|
||||
}
|
||||
|
|
|
@ -41,28 +41,6 @@ import (
|
|||
"github.com/prometheus/alertmanager/types"
|
||||
)
|
||||
|
||||
var (
|
||||
numReceivedAlerts = prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: "alertmanager",
|
||||
Name: "alerts_received_total",
|
||||
Help: "The total number of received alerts.",
|
||||
}, []string{"status"})
|
||||
|
||||
numInvalidAlerts = prometheus.NewCounter(prometheus.CounterOpts{
|
||||
Namespace: "alertmanager",
|
||||
Name: "alerts_invalid_total",
|
||||
Help: "The total number of received alerts that were invalid.",
|
||||
})
|
||||
)
|
||||
|
||||
func init() {
|
||||
numReceivedAlerts.WithLabelValues("firing")
|
||||
numReceivedAlerts.WithLabelValues("resolved")
|
||||
|
||||
prometheus.MustRegister(numReceivedAlerts)
|
||||
prometheus.MustRegister(numInvalidAlerts)
|
||||
}
|
||||
|
||||
var corsHeaders = map[string]string{
|
||||
"Access-Control-Allow-Headers": "Accept, Authorization, Content-Type, Origin",
|
||||
"Access-Control-Allow-Methods": "GET, DELETE, OPTIONS",
|
||||
|
@ -98,6 +76,9 @@ type API struct {
|
|||
peer *cluster.Peer
|
||||
logger log.Logger
|
||||
|
||||
numReceivedAlerts *prometheus.CounterVec
|
||||
numInvalidAlerts prometheus.Counter
|
||||
|
||||
getAlertStatus getAlertStatusFn
|
||||
|
||||
mtx sync.RWMutex
|
||||
|
@ -112,18 +93,38 @@ func New(
|
|||
sf getAlertStatusFn,
|
||||
peer *cluster.Peer,
|
||||
l log.Logger,
|
||||
r prometheus.Registerer,
|
||||
) *API {
|
||||
if l == nil {
|
||||
l = log.NewNopLogger()
|
||||
}
|
||||
|
||||
numReceivedAlerts := prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: "alertmanager",
|
||||
Name: "alerts_received_total",
|
||||
Help: "The total number of received alerts.",
|
||||
}, []string{"status"})
|
||||
|
||||
numInvalidAlerts := prometheus.NewCounter(prometheus.CounterOpts{
|
||||
Namespace: "alertmanager",
|
||||
Name: "alerts_invalid_total",
|
||||
Help: "The total number of received alerts that were invalid.",
|
||||
})
|
||||
numReceivedAlerts.WithLabelValues("firing")
|
||||
numReceivedAlerts.WithLabelValues("resolved")
|
||||
if r != nil {
|
||||
r.MustRegister(numReceivedAlerts, numInvalidAlerts)
|
||||
}
|
||||
|
||||
return &API{
|
||||
alerts: alerts,
|
||||
silences: silences,
|
||||
getAlertStatus: sf,
|
||||
uptime: time.Now(),
|
||||
peer: peer,
|
||||
logger: l,
|
||||
alerts: alerts,
|
||||
silences: silences,
|
||||
getAlertStatus: sf,
|
||||
uptime: time.Now(),
|
||||
peer: peer,
|
||||
logger: l,
|
||||
numReceivedAlerts: numReceivedAlerts,
|
||||
numInvalidAlerts: numInvalidAlerts,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -253,6 +254,7 @@ func (api *API) listAlerts(w http.ResponseWriter, r *http.Request) {
|
|||
// are no alerts present
|
||||
res = []*Alert{}
|
||||
matchers = []*labels.Matcher{}
|
||||
ctx = r.Context()
|
||||
|
||||
showActive, showInhibited bool
|
||||
showSilenced, showUnprocessed bool
|
||||
|
@ -326,11 +328,13 @@ func (api *API) listAlerts(w http.ResponseWriter, r *http.Request) {
|
|||
defer alerts.Close()
|
||||
|
||||
api.mtx.RLock()
|
||||
// TODO(fabxc): enforce a sensible timeout.
|
||||
for a := range alerts.Next() {
|
||||
if err = alerts.Err(); err != nil {
|
||||
break
|
||||
}
|
||||
if err = ctx.Err(); err != nil {
|
||||
break
|
||||
}
|
||||
|
||||
routes := api.route.Match(a.Labels)
|
||||
receivers := make([]string, 0, len(routes))
|
||||
|
@ -449,9 +453,9 @@ func (api *API) insertAlerts(w http.ResponseWriter, r *http.Request, alerts ...*
|
|||
alert.EndsAt = now.Add(resolveTimeout)
|
||||
}
|
||||
if alert.EndsAt.After(time.Now()) {
|
||||
numReceivedAlerts.WithLabelValues("firing").Inc()
|
||||
api.numReceivedAlerts.WithLabelValues("firing").Inc()
|
||||
} else {
|
||||
numReceivedAlerts.WithLabelValues("resolved").Inc()
|
||||
api.numReceivedAlerts.WithLabelValues("resolved").Inc()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -465,7 +469,7 @@ func (api *API) insertAlerts(w http.ResponseWriter, r *http.Request, alerts ...*
|
|||
|
||||
if err := a.Validate(); err != nil {
|
||||
validationErrs.Add(err)
|
||||
numInvalidAlerts.Inc()
|
||||
api.numInvalidAlerts.Inc()
|
||||
continue
|
||||
}
|
||||
validAlerts = append(validAlerts, a)
|
||||
|
|
|
@ -132,7 +132,7 @@ func TestAddAlerts(t *testing.T) {
|
|||
}
|
||||
|
||||
alertsProvider := newFakeAlerts([]*types.Alert{}, tc.err)
|
||||
api := New(alertsProvider, nil, newGetAlertStatus(alertsProvider), nil, nil)
|
||||
api := New(alertsProvider, nil, newGetAlertStatus(alertsProvider), nil, nil, nil)
|
||||
|
||||
r, err := http.NewRequest("POST", "/api/v1/alerts", bytes.NewReader(b))
|
||||
w := httptest.NewRecorder()
|
||||
|
@ -259,7 +259,7 @@ func TestListAlerts(t *testing.T) {
|
|||
},
|
||||
} {
|
||||
alertsProvider := newFakeAlerts(alerts, tc.err)
|
||||
api := New(alertsProvider, nil, newGetAlertStatus(alertsProvider), nil, nil)
|
||||
api := New(alertsProvider, nil, newGetAlertStatus(alertsProvider), nil, nil, nil)
|
||||
api.route = dispatch.NewRoute(&config.Route{Receiver: "def-receiver"}, nil)
|
||||
|
||||
r, err := http.NewRequest("GET", "/api/v1/alerts", nil)
|
||||
|
|
|
@ -195,6 +195,7 @@ func (api *API) getAlertsHandler(params alert_ops.GetAlertsParams) middleware.Re
|
|||
// are no alerts present
|
||||
res = open_api_models.GettableAlerts{}
|
||||
matchers = []*labels.Matcher{}
|
||||
ctx = params.HTTPRequest.Context()
|
||||
)
|
||||
|
||||
if params.Filter != nil {
|
||||
|
@ -224,11 +225,13 @@ func (api *API) getAlertsHandler(params alert_ops.GetAlertsParams) middleware.Re
|
|||
defer alerts.Close()
|
||||
|
||||
api.mtx.RLock()
|
||||
// TODO(fabxc): enforce a sensible timeout.
|
||||
for a := range alerts.Next() {
|
||||
if err = alerts.Err(); err != nil {
|
||||
break
|
||||
}
|
||||
if err = ctx.Err(); err != nil {
|
||||
break
|
||||
}
|
||||
|
||||
routes := api.route.Match(a.Labels)
|
||||
receivers := make([]*open_api_models.Receiver, 0, len(routes))
|
||||
|
|
|
@ -96,10 +96,11 @@ func init() {
|
|||
}
|
||||
|
||||
func instrumentHandler(handlerName string, handler http.HandlerFunc) http.HandlerFunc {
|
||||
handlerLabel := prometheus.Labels{"handler": handlerName}
|
||||
return promhttp.InstrumentHandlerDuration(
|
||||
requestDuration.MustCurryWith(prometheus.Labels{"handler": handlerName}),
|
||||
requestDuration.MustCurryWith(handlerLabel),
|
||||
promhttp.InstrumentHandlerResponseSize(
|
||||
responseSize.MustCurryWith(prometheus.Labels{"handler": handlerName}),
|
||||
responseSize.MustCurryWith(handlerLabel),
|
||||
handler,
|
||||
),
|
||||
)
|
||||
|
@ -123,9 +124,11 @@ func run() int {
|
|||
retention = kingpin.Flag("data.retention", "How long to keep data for.").Default("120h").Duration()
|
||||
alertGCInterval = kingpin.Flag("alerts.gc-interval", "Interval between alert GC.").Default("30m").Duration()
|
||||
|
||||
externalURL = kingpin.Flag("web.external-url", "The URL under which Alertmanager is externally reachable (for example, if Alertmanager is served via a reverse proxy). Used for generating relative and absolute links back to Alertmanager itself. If the URL has a path portion, it will be used to prefix all HTTP endpoints served by Alertmanager. If omitted, relevant URL components will be derived automatically.").String()
|
||||
routePrefix = kingpin.Flag("web.route-prefix", "Prefix for the internal routes of web endpoints. Defaults to path of --web.external-url.").String()
|
||||
listenAddress = kingpin.Flag("web.listen-address", "Address to listen on for the web interface and API.").Default(":9093").String()
|
||||
externalURL = kingpin.Flag("web.external-url", "The URL under which Alertmanager is externally reachable (for example, if Alertmanager is served via a reverse proxy). Used for generating relative and absolute links back to Alertmanager itself. If the URL has a path portion, it will be used to prefix all HTTP endpoints served by Alertmanager. If omitted, relevant URL components will be derived automatically.").String()
|
||||
routePrefix = kingpin.Flag("web.route-prefix", "Prefix for the internal routes of web endpoints. Defaults to path of --web.external-url.").String()
|
||||
listenAddress = kingpin.Flag("web.listen-address", "Address to listen on for the web interface and API.").Default(":9093").String()
|
||||
getConcurrency = kingpin.Flag("web.get-concurrency", "Maximum number of GET requests processed concurrently. If negative or zero, the limit is GOMAXPROC or 8, whichever is larger.").Default("0").Int()
|
||||
httpTimeout = kingpin.Flag("web.timeout", "Timeout for HTTP requests. If negative or zero, no timeout is set.").Default("0").Duration()
|
||||
|
||||
clusterBindAddr = kingpin.Flag("cluster.listen-address", "Listen address for cluster.").
|
||||
Default(defaultClusterAddr).String()
|
||||
|
@ -267,13 +270,16 @@ func run() int {
|
|||
)
|
||||
defer disp.Stop()
|
||||
|
||||
api, err := api.New(
|
||||
alerts,
|
||||
silences,
|
||||
marker.Status,
|
||||
peer,
|
||||
log.With(logger, "component", "api"),
|
||||
)
|
||||
api, err := api.New(api.Options{
|
||||
Alerts: alerts,
|
||||
Silences: silences,
|
||||
StatusFunc: marker.Status,
|
||||
Peer: peer,
|
||||
Timeout: *httpTimeout,
|
||||
Concurrency: *getConcurrency,
|
||||
Logger: log.With(logger, "component", "api"),
|
||||
Registry: prometheus.DefaultRegisterer,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
level.Error(logger).Log("err", fmt.Errorf("failed to create API: %v", err.Error()))
|
||||
|
|
Loading…
Reference in New Issue