alertmanager/api/api.go
gotjosh 9a2ae39430
Clustering: Interface for Peers in other packages
A Peer, as defined by the `cluster` package, represents a node in the
cluster. Other packages use it to learn the status of all cluster members
and how long to wait before checking whether a notification has already fired.

In Cortex, we'd like to implement a slightly different way of
clustering (using gRPC for communication and a
hash ring for node discovery).

This is a small change to support that: the other packages now consume
the peer through an interface instead of the concrete type.

Silences and Notification channels don't need an interface as they take
a `func([]byte) error` as a parameter.

Signed-off-by: gotjosh <josue@grafana.com>
2021-02-19 19:07:41 +00:00

231 lines
7.5 KiB
Go

// Copyright 2019 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package api
import (
"errors"
"fmt"
"net/http"
"runtime"
"time"
apiv1 "github.com/prometheus/alertmanager/api/v1"
apiv2 "github.com/prometheus/alertmanager/api/v2"
"github.com/prometheus/alertmanager/cluster"
"github.com/prometheus/alertmanager/config"
"github.com/prometheus/alertmanager/dispatch"
"github.com/prometheus/alertmanager/provider"
"github.com/prometheus/alertmanager/silence"
"github.com/prometheus/alertmanager/types"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/common/route"
"github.com/go-kit/kit/log"
)
// API represents all APIs of Alertmanager. It bundles the v1 and v2 API
// implementations with the state that limitHandler uses to enforce the
// request timeout and the concurrency limit on GET requests.
type API struct {
// v1 and v2 are the versioned API implementations constructed in New.
v1 *apiv1.API
v2 *apiv2.API
// requestsInFlight is incremented while a GET request holds a slot in
// inFlightSem and decremented when the slot is released.
requestsInFlight prometheus.Gauge
// concurrencyLimitExceeded counts GET requests answered with 503 because
// no slot in inFlightSem was available.
concurrencyLimitExceeded prometheus.Counter
// timeout is enforced through http.TimeoutHandler; values <= 0 disable it.
timeout time.Duration
// inFlightSem is a buffered channel used as a semaphore for GET requests;
// its capacity is the concurrency limit.
inFlightSem chan struct{}
}
// Options for the creation of an API object. Alerts, Silences, StatusFunc,
// and GroupFunc are mandatory to set (validate rejects a nil value for any of
// them). The zero value for everything else is a safe default.
type Options struct {
// Alerts to be used by the API. Mandatory.
Alerts provider.Alerts
// Silences to be used by the API. Mandatory.
Silences *silence.Silences
// StatusFunc is used by the API to retrieve the AlertStatus of an
// alert. Mandatory.
StatusFunc func(model.Fingerprint) types.AlertStatus
// Peer from the gossip cluster. If nil, no clustering will be used.
Peer cluster.ClusterPeer
// Timeout for all HTTP connections. The zero value (and negative
// values) result in no timeout.
Timeout time.Duration
// Concurrency limit for GET requests. The zero value (and negative
// values) result in a limit of GOMAXPROCS or 8, whichever is
// larger. Status code 503 is served for GET requests that would exceed
// the concurrency limit.
Concurrency int
// Logger is used for logging, if nil, no logging will happen.
Logger log.Logger
// Registry is used to register Prometheus metrics. If nil, no metrics
// registration will happen.
Registry prometheus.Registerer
// GroupFunc returns a list of alert groups. The alerts are grouped
// according to the current active configuration. Alerts returned are
// filtered by the arguments provided to the function. Mandatory.
GroupFunc func(func(*dispatch.Route) bool, func(*types.Alert, time.Time) bool) (dispatch.AlertGroups, map[model.Fingerprint][]string)
}
// validate reports the first mandatory Options field that is unset. The fields
// are checked in the order Alerts, Silences, StatusFunc, GroupFunc; nil is
// returned when all of them are set.
func (o Options) validate() error {
	switch {
	case o.Alerts == nil:
		return errors.New("mandatory field Alerts not set")
	case o.Silences == nil:
		return errors.New("mandatory field Silences not set")
	case o.StatusFunc == nil:
		return errors.New("mandatory field StatusFunc not set")
	case o.GroupFunc == nil:
		return errors.New("mandatory field GroupFunc not set")
	}
	return nil
}
// New creates a new API object combining all API versions. Note that an Update
// call is also needed to get the APIs into an operational state.
//
// An error is returned if opts fails validation, if the v2 API cannot be
// constructed, or if registering the HTTP metrics with opts.Registry fails.
// The validation error is wrapped (not flattened into a string), so callers
// can still inspect the underlying cause with errors.Is / errors.As.
func New(opts Options) (*API, error) {
	if err := opts.validate(); err != nil {
		return nil, fmt.Errorf("invalid API options: %w", err)
	}

	// Fall back to a no-op logger so the versioned APIs always get a usable one.
	l := opts.Logger
	if l == nil {
		l = log.NewNopLogger()
	}

	// Zero and negative values select the default limit: GOMAXPROCS, but
	// never less than 8.
	concurrency := opts.Concurrency
	if concurrency < 1 {
		concurrency = runtime.GOMAXPROCS(0)
		if concurrency < 8 {
			concurrency = 8
		}
	}

	v1 := apiv1.New(
		opts.Alerts,
		opts.Silences,
		opts.StatusFunc,
		opts.Peer,
		log.With(l, "version", "v1"),
		opts.Registry,
	)

	v2, err := apiv2.NewAPI(
		opts.Alerts,
		opts.GroupFunc,
		opts.StatusFunc,
		opts.Silences,
		opts.Peer,
		log.With(l, "version", "v2"),
		opts.Registry,
	)
	if err != nil {
		return nil, err
	}

	// TODO(beorn7): For now, this hardcodes the method="get" label. Other
	// methods should get the same instrumentation.
	requestsInFlight := prometheus.NewGauge(prometheus.GaugeOpts{
		Name:        "alertmanager_http_requests_in_flight",
		Help:        "Current number of HTTP requests being processed.",
		ConstLabels: prometheus.Labels{"method": "get"},
	})
	concurrencyLimitExceeded := prometheus.NewCounter(prometheus.CounterOpts{
		Name:        "alertmanager_http_concurrency_limit_exceeded_total",
		Help:        "Total number of times an HTTP request failed because the concurrency limit was reached.",
		ConstLabels: prometheus.Labels{"method": "get"},
	})
	// The metrics are always created (limitHandler updates them
	// unconditionally) but only registered when a registry was provided.
	if opts.Registry != nil {
		if err := opts.Registry.Register(requestsInFlight); err != nil {
			return nil, err
		}
		if err := opts.Registry.Register(concurrencyLimitExceeded); err != nil {
			return nil, err
		}
	}

	return &API{
		v1:                       v1,
		v2:                       v2,
		requestsInFlight:         requestsInFlight,
		concurrencyLimitExceeded: concurrencyLimitExceeded,
		timeout:                  opts.Timeout,
		// The channel capacity is the concurrency limit enforced by limitHandler.
		inFlightSem: make(chan struct{}, concurrency),
	}, nil
}
// Register wires every API version into HTTP routing and returns the
// http.ServeMux that serves them.
//
// APIv1 is attached directly to the given router under "/api/v1". APIv2
// operates on the http.Handler level, so a fresh http.ServeMux is created that
// serves the router at "/" and APIv2 at "<routePrefix>/api/v2/" (a routePrefix
// of "/" contributes no prefix). Both entries are wrapped by limitHandler, so
// the timeout configured at construction applies to every request going
// through the mux, while the concurrency limit applies to GET requests only.
func (api *API) Register(r *route.Router, routePrefix string) *http.ServeMux {
	api.v1.Register(r.WithPrefix("/api/v1"))

	// Base path of APIv2, without the trailing slash.
	v2Base := "/api/v2"
	if routePrefix != "/" {
		v2Base = routePrefix + v2Base
	}

	mux := http.NewServeMux()
	mux.Handle("/", api.limitHandler(r))

	// TODO(beorn7): HTTP instrumentation is only in place for Router. Since
	// /api/v2 works on the Handler level, it is currently not instrumented
	// at all (with the exception of requestsInFlight, which is handled in
	// limitHandler below).
	v2Handler := http.StripPrefix(v2Base, api.v2.Handler)
	mux.Handle(v2Base+"/", api.limitHandler(v2Handler))

	return mux
}
// Update config and resolve timeout of each API. cfg is forwarded to both
// APIv1 and APIv2; setAlertStatus is passed on to APIv2 only, which also
// needs it to be updated.
func (api *API) Update(cfg *config.Config, setAlertStatus func(model.LabelSet)) {
api.v1.Update(cfg)
api.v2.Update(cfg, setAlertStatus)
}
// limitHandler wraps h with the API's concurrency limit and, if a positive
// timeout is configured, with http.TimeoutHandler.
//
// Only GET requests are subject to the concurrency limit: each one must take a
// slot in inFlightSem without blocking. When no slot is free the request is
// answered with 503 and concurrencyLimitExceeded is incremented. All other
// methods are passed straight through to h.
func (api *API) limitHandler(h http.Handler) http.Handler {
	limited := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// Only limit concurrency of GETs.
		if r.Method != http.MethodGet {
			h.ServeHTTP(w, r)
			return
		}

		select {
		case api.inFlightSem <- struct{}{}:
			// Slot acquired, carry on below.
		default:
			api.concurrencyLimitExceeded.Inc()
			msg := fmt.Sprintf(
				"Limit of concurrent GET requests reached (%d), try again later.\n",
				cap(api.inFlightSem),
			)
			http.Error(w, msg, http.StatusServiceUnavailable)
			return
		}

		api.requestsInFlight.Inc()
		// Release the slot first, then drop the gauge, once h has returned.
		defer func() {
			<-api.inFlightSem
			api.requestsInFlight.Dec()
		}()

		h.ServeHTTP(w, r)
	})

	if api.timeout > 0 {
		timeoutMsg := fmt.Sprintf("Exceeded configured timeout of %v.\n", api.timeout)
		return http.TimeoutHandler(limited, api.timeout, timeoutMsg)
	}
	return limited
}