*: use persistent HTTP clients (#1904)

Signed-off-by: Simon Pasquier <spasquie@redhat.com>
This commit is contained in:
Simon Pasquier 2019-06-07 10:37:49 +02:00 committed by GitHub
parent e33edd6f13
commit 2abd78cbb7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 304 additions and 185 deletions

View File

@ -289,9 +289,7 @@ func run() int {
var (
inhibitor *inhibit.Inhibitor
silencer *silence.Silencer
tmpl *template.Template
pipeline notify.Stage
)
configCoordinator := config.NewCoordinator(
@ -306,20 +304,29 @@ func run() int {
}
tmpl.ExternalURL = amURL
// Build the map of receiver to integrations.
receivers := make(map[string][]notify.Integration, len(conf.Receivers))
for _, rcv := range conf.Receivers {
integrations, err := notify.BuildReceiverIntegrations(rcv, tmpl, logger)
if err != nil {
return err
}
// rcv.Name is guaranteed to be unique across all receivers.
receivers[rcv.Name] = integrations
}
inhibitor.Stop()
disp.Stop()
inhibitor = inhibit.NewInhibitor(alerts, conf.InhibitRules, marker, logger)
silencer = silence.NewSilencer(silences, marker, logger)
pipeline = notify.BuildPipeline(
conf.Receivers,
tmpl,
silencer := silence.NewSilencer(silences, marker, logger)
pipeline := notify.BuildPipeline(
receivers,
waitFunc,
inhibitor,
silencer,
notificationLog,
peer,
logger,
)
api.Update(conf, func(labels model.LabelSet) {

View File

@ -38,18 +38,18 @@ import (
"github.com/prometheus/alertmanager/types"
)
// A Notifier notifies about alerts under constraints of the given context.
// It returns an error if unsuccessful and a flag whether the error is
// Notifier notifies about alerts under constraints of the given context. It
// returns an error if unsuccessful and a flag whether the error is
// recoverable. This information is useful for a retry logic.
type Notifier interface {
Notify(context.Context, ...*types.Alert) (bool, error)
}
// An Integration wraps a notifier and its config to be uniquely identified by
// name and index from its origin in the configuration.
// Integration wraps a notifier and its configuration to be uniquely identified
// by name and index from its origin in the configuration.
type Integration struct {
notifier Notifier
conf notifierConfig
rs ResolvedSender
name string
idx int
}
@ -59,15 +59,36 @@ func (i *Integration) Notify(ctx context.Context, alerts ...*types.Alert) (bool,
return i.notifier.Notify(ctx, alerts...)
}
// SendResolved implements the ResolvedSender interface.
func (i *Integration) SendResolved() bool {
return i.rs.SendResolved()
}
// Name returns the name of the integration.
func (i *Integration) Name() string {
return i.name
}
// Name returns the index of the integration.
func (i *Integration) Index() int {
return i.idx
}
// BuildReceiverIntegrations builds a list of integration notifiers off of a
// receivers config.
func BuildReceiverIntegrations(nc *config.Receiver, tmpl *template.Template, logger log.Logger) []Integration {
// receiver config.
func BuildReceiverIntegrations(nc *config.Receiver, tmpl *template.Template, logger log.Logger) ([]Integration, error) {
var (
errs types.MultiError
integrations []Integration
add = func(name string, i int, n Notifier, nc notifierConfig) {
add = func(name string, i int, rs ResolvedSender, f func(l log.Logger) (Notifier, error)) {
n, err := f(log.With(logger, "integration", name))
if err != nil {
errs.Add(err)
return
}
integrations = append(integrations, Integration{
notifier: n,
conf: nc,
rs: rs,
name: name,
idx: i,
})
@ -75,42 +96,36 @@ func BuildReceiverIntegrations(nc *config.Receiver, tmpl *template.Template, log
)
for i, c := range nc.WebhookConfigs {
n := NewWebhook(c, tmpl, logger)
add("webhook", i, n, c)
add("webhook", i, c, func(l log.Logger) (Notifier, error) { return NewWebhook(c, tmpl, l) })
}
for i, c := range nc.EmailConfigs {
n := NewEmail(c, tmpl, logger)
add("email", i, n, c)
add("email", i, c, func(l log.Logger) (Notifier, error) { return NewEmail(c, tmpl, l), nil })
}
for i, c := range nc.PagerdutyConfigs {
n := NewPagerDuty(c, tmpl, logger)
add("pagerduty", i, n, c)
add("pagerduty", i, c, func(l log.Logger) (Notifier, error) { return NewPagerDuty(c, tmpl, l) })
}
for i, c := range nc.OpsGenieConfigs {
n := NewOpsGenie(c, tmpl, logger)
add("opsgenie", i, n, c)
add("opsgenie", i, c, func(l log.Logger) (Notifier, error) { return NewOpsGenie(c, tmpl, l) })
}
for i, c := range nc.WechatConfigs {
n := NewWechat(c, tmpl, logger)
add("wechat", i, n, c)
add("wechat", i, c, func(l log.Logger) (Notifier, error) { return NewWechat(c, tmpl, l) })
}
for i, c := range nc.SlackConfigs {
n := NewSlack(c, tmpl, logger)
add("slack", i, n, c)
add("slack", i, c, func(l log.Logger) (Notifier, error) { return NewSlack(c, tmpl, l) })
}
for i, c := range nc.HipchatConfigs {
n := NewHipchat(c, tmpl, logger)
add("hipchat", i, n, c)
add("hipchat", i, c, func(l log.Logger) (Notifier, error) { return NewHipchat(c, tmpl, l) })
}
for i, c := range nc.VictorOpsConfigs {
n := NewVictorOps(c, tmpl, logger)
add("victorops", i, n, c)
add("victorops", i, c, func(l log.Logger) (Notifier, error) { return NewVictorOps(c, tmpl, l) })
}
for i, c := range nc.PushoverConfigs {
n := NewPushover(c, tmpl, logger)
add("pushover", i, n, c)
add("pushover", i, c, func(l log.Logger) (Notifier, error) { return NewPushover(c, tmpl, l) })
}
return integrations
if errs.Len() > 0 {
return nil, &errs
}
return integrations, nil
}
const contentTypeJSON = "application/json"
@ -122,11 +137,21 @@ type Webhook struct {
conf *config.WebhookConfig
tmpl *template.Template
logger log.Logger
client *http.Client
}
// NewWebhook returns a new Webhook.
func NewWebhook(conf *config.WebhookConfig, t *template.Template, l log.Logger) *Webhook {
return &Webhook{conf: conf, tmpl: t, logger: l}
func NewWebhook(conf *config.WebhookConfig, t *template.Template, l log.Logger) (*Webhook, error) {
client, err := commoncfg.NewClientFromConfig(*conf.HTTPConfig, "webhook")
if err != nil {
return nil, err
}
return &Webhook{
conf: conf,
tmpl: t,
logger: l,
client: client,
}, nil
}
// WebhookMessage defines the JSON object send to webhook endpoints.
@ -165,16 +190,11 @@ func (w *Webhook) Notify(ctx context.Context, alerts ...*types.Alert) (bool, err
req.Header.Set("Content-Type", contentTypeJSON)
req.Header.Set("User-Agent", userAgentHeader)
c, err := commoncfg.NewClientFromConfig(*w.conf.HTTPConfig, "webhook")
if err != nil {
return false, err
}
resp, err := c.Do(req.WithContext(ctx))
resp, err := w.client.Do(req.WithContext(ctx))
if err != nil {
return true, err
}
resp.Body.Close()
drainResponse(resp)
return w.retry(resp.StatusCode)
}
@ -195,15 +215,20 @@ type PagerDuty struct {
tmpl *template.Template
logger log.Logger
apiV1 string // for tests.
client *http.Client
}
// NewPagerDuty returns a new PagerDuty notifier.
func NewPagerDuty(c *config.PagerdutyConfig, t *template.Template, l log.Logger) *PagerDuty {
n := &PagerDuty{conf: c, tmpl: t, logger: l}
func NewPagerDuty(c *config.PagerdutyConfig, t *template.Template, l log.Logger) (*PagerDuty, error) {
client, err := commoncfg.NewClientFromConfig(*c.HTTPConfig, "pagerduty")
if err != nil {
return nil, err
}
n := &PagerDuty{conf: c, tmpl: t, logger: l, client: client}
if c.ServiceKey != "" {
n.apiV1 = "https://events.pagerduty.com/generic/2010-04-15/create_event.json"
}
return n
return n, nil
}
const (
@ -251,7 +276,6 @@ type pagerDutyPayload struct {
func (n *PagerDuty) notifyV1(
ctx context.Context,
c *http.Client,
eventType, key string,
data *template.Data,
details map[string]string,
@ -282,18 +306,17 @@ func (n *PagerDuty) notifyV1(
return false, err
}
resp, err := post(ctx, c, n.apiV1, contentTypeJSON, &buf)
resp, err := post(ctx, n.client, n.apiV1, contentTypeJSON, &buf)
if err != nil {
return true, err
}
defer resp.Body.Close()
defer drainResponse(resp)
return n.retryV1(resp)
}
func (n *PagerDuty) notifyV2(
ctx context.Context,
c *http.Client,
eventType, key string,
data *template.Data,
details map[string]string,
@ -351,11 +374,11 @@ func (n *PagerDuty) notifyV2(
return false, fmt.Errorf("failed to encode PagerDuty v2 message: %v", err)
}
resp, err := post(ctx, c, n.conf.URL.String(), contentTypeJSON, &buf)
resp, err := post(ctx, n.client, n.conf.URL.String(), contentTypeJSON, &buf)
if err != nil {
return true, fmt.Errorf("failed to post message to PagerDuty: %v", err)
}
defer resp.Body.Close()
defer drainResponse(resp)
return n.retryV2(resp)
}
@ -369,7 +392,6 @@ func (n *PagerDuty) Notify(ctx context.Context, as ...*types.Alert) (bool, error
return false, fmt.Errorf("group key missing")
}
var err error
var (
alerts = types.Alerts(as...)
data = n.tmpl.Data(receiverName(ctx, n.logger), groupLabels(ctx, n.logger), as...)
@ -379,7 +401,7 @@ func (n *PagerDuty) Notify(ctx context.Context, as ...*types.Alert) (bool, error
eventType = pagerDutyEventResolve
}
level.Debug(n.logger).Log("msg", "Notifying PagerDuty", "incident", key, "eventType", eventType)
level.Debug(n.logger).Log("incident", key, "eventType", eventType)
details := make(map[string]string, len(n.conf.Details))
for k, v := range n.conf.Details {
@ -390,15 +412,10 @@ func (n *PagerDuty) Notify(ctx context.Context, as ...*types.Alert) (bool, error
details[k] = detail
}
c, err := commoncfg.NewClientFromConfig(*n.conf.HTTPConfig, "pagerduty")
if err != nil {
return false, err
}
if n.apiV1 != "" {
return n.notifyV1(ctx, c, eventType, key, data, details, as...)
return n.notifyV1(ctx, eventType, key, data, details, as...)
}
return n.notifyV2(ctx, c, eventType, key, data, details, as...)
return n.notifyV2(ctx, eventType, key, data, details, as...)
}
func pagerDutyErr(status int, body io.Reader) error {
@ -448,15 +465,22 @@ type Slack struct {
conf *config.SlackConfig
tmpl *template.Template
logger log.Logger
client *http.Client
}
// NewSlack returns a new Slack notification handler.
func NewSlack(c *config.SlackConfig, t *template.Template, l log.Logger) *Slack {
func NewSlack(c *config.SlackConfig, t *template.Template, l log.Logger) (*Slack, error) {
client, err := commoncfg.NewClientFromConfig(*c.HTTPConfig, "slack")
if err != nil {
return nil, err
}
return &Slack{
conf: c,
tmpl: t,
logger: l,
}
client: client,
}, nil
}
// slackReq is the request for sending a slack notification.
@ -575,17 +599,12 @@ func (n *Slack) Notify(ctx context.Context, as ...*types.Alert) (bool, error) {
return false, err
}
c, err := commoncfg.NewClientFromConfig(*n.conf.HTTPConfig, "slack")
if err != nil {
return false, err
}
u := n.conf.APIURL.String()
resp, err := post(ctx, c, u, contentTypeJSON, &buf)
resp, err := post(ctx, n.client, u, contentTypeJSON, &buf)
if err != nil {
return true, redactURL(err)
}
resp.Body.Close()
drainResponse(resp)
return n.retry(resp.StatusCode)
}
@ -606,15 +625,21 @@ type Hipchat struct {
conf *config.HipchatConfig
tmpl *template.Template
logger log.Logger
client *http.Client
}
// NewHipchat returns a new Hipchat notification handler.
func NewHipchat(c *config.HipchatConfig, t *template.Template, l log.Logger) *Hipchat {
func NewHipchat(c *config.HipchatConfig, t *template.Template, l log.Logger) (*Hipchat, error) {
client, err := commoncfg.NewClientFromConfig(*c.HTTPConfig, "hipchat")
if err != nil {
return nil, err
}
return &Hipchat{
conf: c,
tmpl: t,
logger: l,
}
client: client,
}, nil
}
type hipchatReq struct {
@ -663,17 +688,11 @@ func (n *Hipchat) Notify(ctx context.Context, as ...*types.Alert) (bool, error)
return false, err
}
c, err := commoncfg.NewClientFromConfig(*n.conf.HTTPConfig, "hipchat")
if err != nil {
return false, err
}
resp, err := post(ctx, c, apiURL.String(), contentTypeJSON, &buf)
resp, err := post(ctx, n.client, apiURL.String(), contentTypeJSON, &buf)
if err != nil {
return true, redactURL(err)
}
defer resp.Body.Close()
defer drainResponse(resp)
return n.retry(resp.StatusCode)
}
@ -694,6 +713,7 @@ type Wechat struct {
conf *config.WechatConfig
tmpl *template.Template
logger log.Logger
client *http.Client
accessToken string
accessTokenAt time.Time
@ -724,8 +744,13 @@ type weChatResponse struct {
}
// NewWechat returns a new Wechat notifier.
func NewWechat(c *config.WechatConfig, t *template.Template, l log.Logger) *Wechat {
return &Wechat{conf: c, tmpl: t, logger: l}
func NewWechat(c *config.WechatConfig, t *template.Template, l log.Logger) (*Wechat, error) {
client, err := commoncfg.NewClientFromConfig(*c.HTTPConfig, "wechat")
if err != nil {
return nil, err
}
return &Wechat{conf: c, tmpl: t, logger: l, client: client}, nil
}
// Notify implements the Notifier interface.
@ -735,7 +760,7 @@ func (n *Wechat) Notify(ctx context.Context, as ...*types.Alert) (bool, error) {
return false, fmt.Errorf("group key missing")
}
level.Debug(n.logger).Log("msg", "Notifying Wechat", "incident", key)
level.Debug(n.logger).Log("incident", key)
data := n.tmpl.Data(receiverName(ctx, n.logger), groupLabels(ctx, n.logger), as...)
var err error
@ -744,11 +769,6 @@ func (n *Wechat) Notify(ctx context.Context, as ...*types.Alert) (bool, error) {
return false, err
}
c, err := commoncfg.NewClientFromConfig(*n.conf.HTTPConfig, "wechat")
if err != nil {
return false, err
}
// Refresh AccessToken over 2 hours
if n.accessToken == "" || time.Since(n.accessTokenAt) > 2*time.Hour {
parameters := url.Values{}
@ -769,11 +789,11 @@ func (n *Wechat) Notify(ctx context.Context, as ...*types.Alert) (bool, error) {
req.Header.Set("Content-Type", contentTypeJSON)
resp, err := c.Do(req.WithContext(ctx))
resp, err := n.client.Do(req.WithContext(ctx))
if err != nil {
return true, redactURL(err)
}
defer resp.Body.Close()
defer drainResponse(resp)
var wechatToken WechatToken
if err := json.NewDecoder(resp.Body).Decode(&wechatToken); err != nil {
@ -820,17 +840,17 @@ func (n *Wechat) Notify(ctx context.Context, as ...*types.Alert) (bool, error) {
return true, err
}
resp, err := c.Do(req.WithContext(ctx))
resp, err := n.client.Do(req.WithContext(ctx))
if err != nil {
return true, redactURL(err)
}
defer resp.Body.Close()
defer drainResponse(resp)
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return true, err
}
level.Debug(n.logger).Log("msg", "response: "+string(body), "incident", key)
level.Debug(n.logger).Log("response", string(body), "incident", key)
if resp.StatusCode != 200 {
return true, fmt.Errorf("unexpected status code %v", resp.StatusCode)
@ -860,11 +880,16 @@ type OpsGenie struct {
conf *config.OpsGenieConfig
tmpl *template.Template
logger log.Logger
client *http.Client
}
// NewOpsGenie returns a new OpsGenie notifier.
func NewOpsGenie(c *config.OpsGenieConfig, t *template.Template, l log.Logger) *OpsGenie {
return &OpsGenie{conf: c, tmpl: t, logger: l}
func NewOpsGenie(c *config.OpsGenieConfig, t *template.Template, l log.Logger) (*OpsGenie, error) {
client, err := commoncfg.NewClientFromConfig(*c.HTTPConfig, "opsgenie")
if err != nil {
return nil, err
}
return &OpsGenie{conf: c, tmpl: t, logger: l, client: client}, nil
}
type opsGenieCreateMessage struct {
@ -897,17 +922,11 @@ func (n *OpsGenie) Notify(ctx context.Context, as ...*types.Alert) (bool, error)
return retry, err
}
c, err := commoncfg.NewClientFromConfig(*n.conf.HTTPConfig, "opsgenie")
if err != nil {
return false, err
}
resp, err := c.Do(req.WithContext(ctx))
resp, err := n.client.Do(req)
if err != nil {
return true, err
}
defer resp.Body.Close()
defer drainResponse(resp)
return n.retry(resp.StatusCode)
}
@ -932,7 +951,7 @@ func (n *OpsGenie) createRequest(ctx context.Context, as ...*types.Alert) (*http
}
data := n.tmpl.Data(receiverName(ctx, n.logger), groupLabels(ctx, n.logger), as...)
level.Debug(n.logger).Log("msg", "Notifying OpsGenie", "incident", key)
level.Debug(n.logger).Log("incident", key)
var err error
tmpl := tmplText(n.tmpl, data, &err)
@ -958,7 +977,7 @@ func (n *OpsGenie) createRequest(ctx context.Context, as ...*types.Alert) (*http
default:
message, truncated := truncate(tmpl(n.conf.Message), 130)
if truncated {
level.Debug(n.logger).Log("msg", "truncated message due to OpsGenie message limit", "truncated_message", message, "incident", key)
level.Debug(n.logger).Log("msg", "truncated message", "truncated_message", message, "incident", key)
}
apiURL.Path += "v2/alerts"
@ -1011,7 +1030,7 @@ func (n *OpsGenie) createRequest(ctx context.Context, as ...*types.Alert) (*http
}
req.Header.Set("Content-Type", contentTypeJSON)
req.Header.Set("Authorization", fmt.Sprintf("GenieKey %s", apiKey))
return req, true, nil
return req.WithContext(ctx), true, nil
}
func (n *OpsGenie) retry(statusCode int) (bool, error) {
@ -1031,15 +1050,21 @@ type VictorOps struct {
conf *config.VictorOpsConfig
tmpl *template.Template
logger log.Logger
client *http.Client
}
// NewVictorOps returns a new VictorOps notifier.
func NewVictorOps(c *config.VictorOpsConfig, t *template.Template, l log.Logger) *VictorOps {
func NewVictorOps(c *config.VictorOpsConfig, t *template.Template, l log.Logger) (*VictorOps, error) {
client, err := commoncfg.NewClientFromConfig(*c.HTTPConfig, "victorops")
if err != nil {
return nil, err
}
return &VictorOps{
conf: c,
tmpl: t,
logger: l,
}
client: client,
}, nil
}
const (
@ -1058,22 +1083,16 @@ func (n *VictorOps) Notify(ctx context.Context, as ...*types.Alert) (bool, error
)
apiURL.Path += fmt.Sprintf("%s/%s", n.conf.APIKey, tmpl(n.conf.RoutingKey))
c, err := commoncfg.NewClientFromConfig(*n.conf.HTTPConfig, "victorops")
if err != nil {
return false, err
}
buf, err := n.createVictorOpsPayload(ctx, as...)
if err != nil {
return true, err
}
resp, err := post(ctx, c, apiURL.String(), contentTypeJSON, buf)
resp, err := post(ctx, n.client, apiURL.String(), contentTypeJSON, buf)
if err != nil {
return true, redactURL(err)
}
defer resp.Body.Close()
defer drainResponse(resp)
return n.retry(resp.StatusCode)
}
@ -1111,7 +1130,7 @@ func (n *VictorOps) createVictorOpsPayload(ctx context.Context, as ...*types.Ale
stateMessage, truncated := truncate(stateMessage, 20480)
if truncated {
level.Debug(n.logger).Log("msg", "truncated stateMessage due to VictorOps stateMessage limit", "truncated_state_message", stateMessage, "incident", key)
level.Debug(n.logger).Log("msg", "truncated stateMessage", "truncated_state_message", stateMessage, "incident", key)
}
msg := map[string]string{
@ -1158,12 +1177,23 @@ type Pushover struct {
conf *config.PushoverConfig
tmpl *template.Template
logger log.Logger
client *http.Client
apiURL string // for tests.
}
// NewPushover returns a new Pushover notifier.
func NewPushover(c *config.PushoverConfig, t *template.Template, l log.Logger) *Pushover {
return &Pushover{conf: c, tmpl: t, logger: l, apiURL: "https://api.pushover.net/1/messages.json"}
func NewPushover(c *config.PushoverConfig, t *template.Template, l log.Logger) (*Pushover, error) {
client, err := commoncfg.NewClientFromConfig(*c.HTTPConfig, "pushover")
if err != nil {
return nil, err
}
return &Pushover{
conf: c,
tmpl: t,
logger: l,
client: client,
apiURL: "https://api.pushover.net/1/messages.json",
}, nil
}
// Notify implements the Notifier interface.
@ -1174,7 +1204,7 @@ func (n *Pushover) Notify(ctx context.Context, as ...*types.Alert) (bool, error)
}
data := n.tmpl.Data(receiverName(ctx, n.logger), groupLabels(ctx, n.logger), as...)
level.Debug(n.logger).Log("msg", "Notifying Pushover", "incident", key)
level.Debug(n.logger).Log("incident", key)
var (
err error
@ -1189,7 +1219,7 @@ func (n *Pushover) Notify(ctx context.Context, as ...*types.Alert) (bool, error)
title, truncated := truncate(tmpl(n.conf.Title), 250)
if truncated {
level.Debug(n.logger).Log("msg", "Truncated title due to Pushover title limit", "truncated_title", title, "incident", key)
level.Debug(n.logger).Log("msg", "Truncated title", "truncated_title", title, "incident", key)
}
parameters.Add("title", title)
@ -1202,7 +1232,7 @@ func (n *Pushover) Notify(ctx context.Context, as ...*types.Alert) (bool, error)
message, truncated = truncate(message, 1024)
if truncated {
level.Debug(n.logger).Log("msg", "Truncated message due to Pushover message limit", "truncated_message", message, "incident", key)
level.Debug(n.logger).Log("msg", "Truncated message", "truncated_message", message, "incident", key)
}
message = strings.TrimSpace(message)
if message == "" {
@ -1213,7 +1243,7 @@ func (n *Pushover) Notify(ctx context.Context, as ...*types.Alert) (bool, error)
supplementaryURL, truncated := truncate(tmpl(n.conf.URL), 512)
if truncated {
level.Debug(n.logger).Log("msg", "Truncated URL due to Pushover url limit", "truncated_url", supplementaryURL, "incident", key)
level.Debug(n.logger).Log("msg", "Truncated URL", "truncated_url", supplementaryURL, "incident", key)
}
parameters.Add("url", supplementaryURL)
parameters.Add("url_title", tmpl(n.conf.URLTitle))
@ -1232,18 +1262,12 @@ func (n *Pushover) Notify(ctx context.Context, as ...*types.Alert) (bool, error)
}
u.RawQuery = parameters.Encode()
// Don't log the URL as it contains secret data (see #1825).
level.Debug(n.logger).Log("msg", "Sending Pushover message", "incident", key)
c, err := commoncfg.NewClientFromConfig(*n.conf.HTTPConfig, "pushover")
if err != nil {
return false, err
}
resp, err := post(ctx, c, u.String(), "text/plain", nil)
level.Debug(n.logger).Log("msg", "Sending message", "incident", key)
resp, err := post(ctx, n.client, u.String(), "text/plain", nil)
if err != nil {
return true, redactURL(err)
}
defer resp.Body.Close()
defer drainResponse(resp)
return n.retry(resp.StatusCode)
}
@ -1314,6 +1338,13 @@ func post(ctx context.Context, client *http.Client, url string, bodyType string,
return client.Do(req.WithContext(ctx))
}
// drainResponse consumes and closes the response's body to make sure that the
// HTTP client can reuse existing connections.
func drainResponse(resp *http.Response) {
io.Copy(ioutil.Discard, resp.Body)
resp.Body.Close()
}
func truncate(s string, n int) (string, bool) {
r := []rune(s)
if len(r) <= n {

View File

@ -91,6 +91,74 @@ func assertNotifyLeaksNoSecret(t *testing.T, ctx context.Context, n Notifier, se
require.True(t, ok)
}
func TestBuildReceiverIntegrations(t *testing.T) {
for _, tc := range []struct {
receiver *config.Receiver
err bool
exp []Integration
}{
{
receiver: &config.Receiver{
Name: "foo",
WebhookConfigs: []*config.WebhookConfig{
&config.WebhookConfig{
HTTPConfig: &commoncfg.HTTPClientConfig{},
},
&config.WebhookConfig{
HTTPConfig: &commoncfg.HTTPClientConfig{},
NotifierConfig: config.NotifierConfig{
VSendResolved: true,
},
},
},
},
exp: []Integration{
Integration{
name: "webhook",
idx: 0,
rs: sendResolved(false),
},
Integration{
name: "webhook",
idx: 1,
rs: sendResolved(true),
},
},
},
{
receiver: &config.Receiver{
Name: "foo",
WebhookConfigs: []*config.WebhookConfig{
&config.WebhookConfig{
HTTPConfig: &commoncfg.HTTPClientConfig{
TLSConfig: commoncfg.TLSConfig{
CAFile: "not_existing",
},
},
},
},
},
err: true,
},
} {
tc := tc
t.Run("", func(t *testing.T) {
integrations, err := BuildReceiverIntegrations(tc.receiver, nil, nil)
if tc.err {
require.Error(t, err)
return
}
require.NoError(t, err)
require.Len(t, integrations, len(tc.exp))
for i := range tc.exp {
require.Equal(t, tc.exp[i].SendResolved(), integrations[i].SendResolved())
require.Equal(t, tc.exp[i].Name(), integrations[i].Name())
require.Equal(t, tc.exp[i].Index(), integrations[i].Index())
}
})
}
}
func TestWebhookRetry(t *testing.T) {
u, err := url.Parse("http://example.com")
if err != nil {
@ -134,7 +202,7 @@ func TestPagerDutyRedactedURLV1(t *testing.T) {
defer fn()
key := "01234567890123456789012345678901"
notifier := NewPagerDuty(
notifier, err := NewPagerDuty(
&config.PagerdutyConfig{
ServiceKey: config.Secret(key),
HTTPConfig: &commoncfg.HTTPClientConfig{},
@ -142,6 +210,7 @@ func TestPagerDutyRedactedURLV1(t *testing.T) {
createTmpl(t),
log.NewNopLogger(),
)
require.NoError(t, err)
notifier.apiV1 = u.String()
assertNotifyLeaksNoSecret(t, ctx, notifier, key)
@ -152,7 +221,7 @@ func TestPagerDutyRedactedURLV2(t *testing.T) {
defer fn()
key := "01234567890123456789012345678901"
notifier := NewPagerDuty(
notifier, err := NewPagerDuty(
&config.PagerdutyConfig{
URL: &config.URL{URL: u},
RoutingKey: config.Secret(key),
@ -161,6 +230,7 @@ func TestPagerDutyRedactedURLV2(t *testing.T) {
createTmpl(t),
log.NewNopLogger(),
)
require.NoError(t, err)
assertNotifyLeaksNoSecret(t, ctx, notifier, key)
}
@ -219,7 +289,7 @@ func TestSlackRedactedURL(t *testing.T) {
ctx, u, fn := getContextWithCancelingURL()
defer fn()
notifier := NewSlack(
notifier, err := NewSlack(
&config.SlackConfig{
APIURL: &config.SecretURL{URL: u},
HTTPConfig: &commoncfg.HTTPClientConfig{},
@ -227,6 +297,7 @@ func TestSlackRedactedURL(t *testing.T) {
createTmpl(t),
log.NewNopLogger(),
)
require.NoError(t, err)
assertNotifyLeaksNoSecret(t, ctx, notifier, u.String())
}
@ -245,7 +316,7 @@ func TestHipchatRedactedURL(t *testing.T) {
defer fn()
token := "secret_token"
notifier := NewHipchat(
notifier, err := NewHipchat(
&config.HipchatConfig{
APIURL: &config.URL{URL: u},
AuthToken: config.Secret(token),
@ -254,6 +325,7 @@ func TestHipchatRedactedURL(t *testing.T) {
createTmpl(t),
log.NewNopLogger(),
)
require.NoError(t, err)
assertNotifyLeaksNoSecret(t, ctx, notifier, token)
}
@ -273,7 +345,7 @@ func TestOpsGenieRedactedURL(t *testing.T) {
defer fn()
key := "key"
notifier := NewOpsGenie(
notifier, err := NewOpsGenie(
&config.OpsGenieConfig{
APIURL: &config.URL{URL: u},
APIKey: config.Secret(key),
@ -282,6 +354,7 @@ func TestOpsGenieRedactedURL(t *testing.T) {
createTmpl(t),
log.NewNopLogger(),
)
require.NoError(t, err)
assertNotifyLeaksNoSecret(t, ctx, notifier, key)
}
@ -299,7 +372,7 @@ func TestVictorOpsRedactedURL(t *testing.T) {
defer fn()
secret := "secret"
notifier := NewVictorOps(
notifier, err := NewVictorOps(
&config.VictorOpsConfig{
APIURL: &config.URL{URL: u},
APIKey: config.Secret(secret),
@ -308,6 +381,7 @@ func TestVictorOpsRedactedURL(t *testing.T) {
createTmpl(t),
log.NewNopLogger(),
)
require.NoError(t, err)
assertNotifyLeaksNoSecret(t, ctx, notifier, secret)
}
@ -325,7 +399,7 @@ func TestPushoverRedactedURL(t *testing.T) {
defer fn()
key, token := "user_key", "token"
notifier := NewPushover(
notifier, err := NewPushover(
&config.PushoverConfig{
UserKey: config.Secret(key),
Token: config.Secret(token),
@ -334,6 +408,7 @@ func TestPushoverRedactedURL(t *testing.T) {
createTmpl(t),
log.NewNopLogger(),
)
require.NoError(t, err)
notifier.apiURL = u.String()
assertNotifyLeaksNoSecret(t, ctx, notifier, key, token)
@ -471,13 +546,15 @@ func TestOpsGenie(t *testing.T) {
Type: `{{ .CommonLabels.ResponderType2 }}`,
},
},
Tags: `{{ .CommonLabels.Tags }}`,
Note: `{{ .CommonLabels.Note }}`,
Priority: `{{ .CommonLabels.Priority }}`,
APIKey: `{{ .ExternalURL }}`,
APIURL: &config.URL{URL: u},
Tags: `{{ .CommonLabels.Tags }}`,
Note: `{{ .CommonLabels.Note }}`,
Priority: `{{ .CommonLabels.Priority }}`,
APIKey: `{{ .ExternalURL }}`,
APIURL: &config.URL{URL: u},
HTTPConfig: &commoncfg.HTTPClientConfig{},
}
notifier := NewOpsGenie(conf, tmpl, logger)
notifier, err := NewOpsGenie(conf, tmpl, logger)
require.NoError(t, err)
ctx := context.Background()
ctx = WithGroupKey(ctx, "1")
@ -552,9 +629,11 @@ func TestVictorOpsCustomFields(t *testing.T) {
CustomFields: map[string]string{
"Field_A": "{{ .CommonLabels.Message }}",
},
HTTPConfig: &commoncfg.HTTPClientConfig{},
}
notifier := NewVictorOps(conf, tmpl, logger)
notifier, err := NewVictorOps(conf, tmpl, logger)
require.NoError(t, err)
ctx := context.Background()
ctx = WithGroupKey(ctx, "1")
@ -586,7 +665,7 @@ func TestWechatRedactedURLOnInitialAuthentication(t *testing.T) {
defer fn()
secret := "secret_key"
notifier := NewWechat(
notifier, err := NewWechat(
&config.WechatConfig{
APIURL: &config.URL{URL: u},
HTTPConfig: &commoncfg.HTTPClientConfig{},
@ -596,6 +675,7 @@ func TestWechatRedactedURLOnInitialAuthentication(t *testing.T) {
createTmpl(t),
log.NewNopLogger(),
)
require.NoError(t, err)
assertNotifyLeaksNoSecret(t, ctx, notifier, secret)
}
@ -607,7 +687,7 @@ func TestWechatRedactedURLOnNotify(t *testing.T) {
})
defer fn()
notifier := NewWechat(
notifier, err := NewWechat(
&config.WechatConfig{
APIURL: &config.URL{URL: u},
HTTPConfig: &commoncfg.HTTPClientConfig{},
@ -617,6 +697,7 @@ func TestWechatRedactedURLOnNotify(t *testing.T) {
createTmpl(t),
log.NewNopLogger(),
)
require.NoError(t, err)
assertNotifyLeaksNoSecret(t, ctx, notifier, secret, token)
}

View File

@ -28,12 +28,10 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/alertmanager/cluster"
"github.com/prometheus/alertmanager/config"
"github.com/prometheus/alertmanager/inhibit"
"github.com/prometheus/alertmanager/nflog"
"github.com/prometheus/alertmanager/nflog/nflogpb"
"github.com/prometheus/alertmanager/silence"
"github.com/prometheus/alertmanager/template"
"github.com/prometheus/alertmanager/types"
)
@ -92,7 +90,8 @@ func init() {
prometheus.MustRegister(notificationLatencySeconds)
}
type notifierConfig interface {
// ResolvedSender returns true if resolved notifications should be sent.
type ResolvedSender interface {
SendResolved() bool
}
@ -234,40 +233,39 @@ type NotificationLog interface {
// BuildPipeline builds a map of receivers to Stages.
func BuildPipeline(
confs []*config.Receiver,
tmpl *template.Template,
receivers map[string][]Integration,
wait func() time.Duration,
inhibitor *inhibit.Inhibitor,
silencer *silence.Silencer,
notificationLog NotificationLog,
peer *cluster.Peer,
logger log.Logger,
) RoutingStage {
rs := RoutingStage{}
rs := make(RoutingStage, len(receivers))
ms := NewGossipSettleStage(peer)
is := NewMuteStage(inhibitor)
ss := NewMuteStage(silencer)
for _, rc := range confs {
rs[rc.Name] = MultiStage{ms, is, ss, createStage(rc, tmpl, wait, notificationLog, logger)}
for name := range receivers {
st := createReceiverStage(name, receivers[name], wait, notificationLog)
rs[name] = MultiStage{ms, is, ss, st}
}
return rs
}
// createStage creates a pipeline of stages for a receiver.
func createStage(rc *config.Receiver, tmpl *template.Template, wait func() time.Duration, notificationLog NotificationLog, logger log.Logger) Stage {
// createReceiverStage creates a pipeline of stages for a receiver.
func createReceiverStage(name string, integrations []Integration, wait func() time.Duration, notificationLog NotificationLog) Stage {
var fs FanoutStage
for _, i := range BuildReceiverIntegrations(rc, tmpl, logger) {
for i := range integrations {
recv := &nflogpb.Receiver{
GroupName: rc.Name,
Integration: i.name,
Idx: uint32(i.idx),
GroupName: name,
Integration: integrations[i].Name(),
Idx: uint32(integrations[i].Index()),
}
var s MultiStage
s = append(s, NewWaitStage(wait))
s = append(s, NewDedupStage(i, notificationLog, recv))
s = append(s, NewRetryStage(i, rc.Name))
s = append(s, NewDedupStage(&integrations[i], notificationLog, recv))
s = append(s, NewRetryStage(integrations[i], name))
s = append(s, NewSetNotifiesStage(notificationLog, recv))
fs = append(fs, s)
@ -416,20 +414,20 @@ func (ws *WaitStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Al
// DedupStage filters alerts.
// Filtering happens based on a notification log.
type DedupStage struct {
rs ResolvedSender
nflog NotificationLog
recv *nflogpb.Receiver
conf notifierConfig
now func() time.Time
hash func(*types.Alert) uint64
}
// NewDedupStage wraps a DedupStage that runs against the given notification log.
func NewDedupStage(i Integration, l NotificationLog, recv *nflogpb.Receiver) *DedupStage {
func NewDedupStage(rs ResolvedSender, l NotificationLog, recv *nflogpb.Receiver) *DedupStage {
return &DedupStage{
rs: rs,
nflog: l,
recv: recv,
conf: i.conf,
now: utcNow,
hash: hashAlert,
}
@ -502,7 +500,7 @@ func (n *DedupStage) needsUpdate(entry *nflogpb.Entry, firing, resolved map[uint
return len(entry.FiringAlerts) > 0
}
if n.conf.SendResolved() && !entry.IsResolvedSubset(resolved) {
if n.rs.SendResolved() && !entry.IsResolvedSubset(resolved) {
return true
}
@ -583,7 +581,7 @@ func (r RetryStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Ale
// If we shouldn't send notifications for resolved alerts, but there are only
// resolved alerts, report them all as successfully notified (we still want the
// notification log to log them for the next run of DedupStage).
if !r.integration.conf.SendResolved() {
if !r.integration.SendResolved() {
firing, ok := FiringAlerts(ctx)
if !ok {
return ctx, nil, fmt.Errorf("firing alerts missing")
@ -625,13 +623,13 @@ func (r RetryStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Ale
case <-tick.C:
now := time.Now()
retry, err := r.integration.Notify(ctx, sent...)
notificationLatencySeconds.WithLabelValues(r.integration.name).Observe(time.Since(now).Seconds())
numNotifications.WithLabelValues(r.integration.name).Inc()
notificationLatencySeconds.WithLabelValues(r.integration.Name()).Observe(time.Since(now).Seconds())
numNotifications.WithLabelValues(r.integration.Name()).Inc()
if err != nil {
numFailedNotifications.WithLabelValues(r.integration.name).Inc()
level.Debug(l).Log("msg", "Notify attempt failed", "attempt", i, "integration", r.integration.name, "receiver", r.groupName, "err", err)
numFailedNotifications.WithLabelValues(r.integration.Name()).Inc()
level.Debug(l).Log("msg", "Notify attempt failed", "attempt", i, "integration", r.integration.Name(), "receiver", r.groupName, "err", err)
if !retry {
return ctx, alerts, fmt.Errorf("cancelling notify retry for %q due to unrecoverable error: %s", r.integration.name, err)
return ctx, alerts, fmt.Errorf("cancelling notify retry for %q due to unrecoverable error: %s", r.integration.Name(), err)
}
// Save this error to be able to return the last seen error by an

View File

@ -33,10 +33,10 @@ import (
"github.com/prometheus/alertmanager/types"
)
type notifierConfigFunc func() bool
type sendResolved bool
func (f notifierConfigFunc) SendResolved() bool {
return f()
func (s sendResolved) SendResolved() bool {
return bool(s)
}
type notifierFunc func(ctx context.Context, alerts ...*types.Alert) (bool, error)
@ -201,8 +201,8 @@ func TestDedupStageNeedsUpdate(t *testing.T) {
t.Log("case", i)
s := &DedupStage{
now: func() time.Time { return now },
conf: notifierConfigFunc(func() bool { return c.resolve }),
now: func() time.Time { return now },
rs: sendResolved(c.resolve),
}
res := s.needsUpdate(c.entry, c.firingAlerts, c.resolvedAlerts, c.repeat)
require.Equal(t, c.res, res)
@ -221,7 +221,7 @@ func TestDedupStage(t *testing.T) {
now: func() time.Time {
return now
},
conf: notifierConfigFunc(func() bool { return false }),
rs: sendResolved(false),
}
ctx := context.Background()
@ -384,7 +384,7 @@ func TestRetryStageWithError(t *testing.T) {
sent = append(sent, alerts...)
return false, nil
}),
conf: notifierConfigFunc(func() bool { return false }),
rs: sendResolved(false),
}
r := RetryStage{
integration: i,
@ -424,7 +424,7 @@ func TestRetryStageNoResolved(t *testing.T) {
sent = append(sent, alerts...)
return false, nil
}),
conf: notifierConfigFunc(func() bool { return false }),
rs: sendResolved(false),
}
r := RetryStage{
integration: i,
@ -477,7 +477,7 @@ func TestRetryStageSendResolved(t *testing.T) {
sent = append(sent, alerts...)
return false, nil
}),
conf: notifierConfigFunc(func() bool { return true }),
rs: sendResolved(true),
}
r := RetryStage{
integration: i,

View File

@ -1,2 +1,4 @@
// Don't flag lines such as "io.Copy(ioutil.Discard, resp.Body)".
io.Copy
// Never check for logger errors.
(github.com/go-kit/kit/log.Logger).Log