From 2abd78cbb727f9a432cce1bc2056f40e2e455ac4 Mon Sep 17 00:00:00 2001 From: Simon Pasquier Date: Fri, 7 Jun 2019 10:37:49 +0200 Subject: [PATCH] *: use persistent HTTP clients (#1904) Signed-off-by: Simon Pasquier --- cmd/alertmanager/main.go | 21 ++- notify/impl.go | 283 +++++++++++++++++++--------------- notify/impl_test.go | 113 ++++++++++++-- notify/notify.go | 52 +++---- notify/notify_test.go | 18 +-- scripts/errcheck_excludes.txt | 2 + 6 files changed, 304 insertions(+), 185 deletions(-) diff --git a/cmd/alertmanager/main.go b/cmd/alertmanager/main.go index 146d9e0d..69a7e2aa 100644 --- a/cmd/alertmanager/main.go +++ b/cmd/alertmanager/main.go @@ -289,9 +289,7 @@ func run() int { var ( inhibitor *inhibit.Inhibitor - silencer *silence.Silencer tmpl *template.Template - pipeline notify.Stage ) configCoordinator := config.NewCoordinator( @@ -306,20 +304,29 @@ func run() int { } tmpl.ExternalURL = amURL + // Build the map of receiver to integrations. + receivers := make(map[string][]notify.Integration, len(conf.Receivers)) + for _, rcv := range conf.Receivers { + integrations, err := notify.BuildReceiverIntegrations(rcv, tmpl, logger) + if err != nil { + return err + } + // rcv.Name is guaranteed to be unique across all receivers. 
+ receivers[rcv.Name] = integrations + } + inhibitor.Stop() disp.Stop() inhibitor = inhibit.NewInhibitor(alerts, conf.InhibitRules, marker, logger) - silencer = silence.NewSilencer(silences, marker, logger) - pipeline = notify.BuildPipeline( - conf.Receivers, - tmpl, + silencer := silence.NewSilencer(silences, marker, logger) + pipeline := notify.BuildPipeline( + receivers, waitFunc, inhibitor, silencer, notificationLog, peer, - logger, ) api.Update(conf, func(labels model.LabelSet) { diff --git a/notify/impl.go b/notify/impl.go index 4e73e6aa..a6df557f 100644 --- a/notify/impl.go +++ b/notify/impl.go @@ -38,18 +38,18 @@ import ( "github.com/prometheus/alertmanager/types" ) -// A Notifier notifies about alerts under constraints of the given context. -// It returns an error if unsuccessful and a flag whether the error is +// Notifier notifies about alerts under constraints of the given context. It +// returns an error if unsuccessful and a flag whether the error is // recoverable. This information is useful for a retry logic. type Notifier interface { Notify(context.Context, ...*types.Alert) (bool, error) } -// An Integration wraps a notifier and its config to be uniquely identified by -// name and index from its origin in the configuration. +// Integration wraps a notifier and its configuration to be uniquely identified +// by name and index from its origin in the configuration. type Integration struct { notifier Notifier - conf notifierConfig + rs ResolvedSender name string idx int } @@ -59,15 +59,36 @@ func (i *Integration) Notify(ctx context.Context, alerts ...*types.Alert) (bool, return i.notifier.Notify(ctx, alerts...) } +// SendResolved implements the ResolvedSender interface. +func (i *Integration) SendResolved() bool { + return i.rs.SendResolved() +} + +// Name returns the name of the integration. +func (i *Integration) Name() string { + return i.name +} + +// Index returns the index of the integration. 
+func (i *Integration) Index() int { + return i.idx +} + // BuildReceiverIntegrations builds a list of integration notifiers off of a -// receivers config. -func BuildReceiverIntegrations(nc *config.Receiver, tmpl *template.Template, logger log.Logger) []Integration { +// receiver config. +func BuildReceiverIntegrations(nc *config.Receiver, tmpl *template.Template, logger log.Logger) ([]Integration, error) { var ( + errs types.MultiError integrations []Integration - add = func(name string, i int, n Notifier, nc notifierConfig) { + add = func(name string, i int, rs ResolvedSender, f func(l log.Logger) (Notifier, error)) { + n, err := f(log.With(logger, "integration", name)) + if err != nil { + errs.Add(err) + return + } integrations = append(integrations, Integration{ notifier: n, - conf: nc, + rs: rs, name: name, idx: i, }) @@ -75,42 +96,36 @@ func BuildReceiverIntegrations(nc *config.Receiver, tmpl *template.Template, log ) for i, c := range nc.WebhookConfigs { - n := NewWebhook(c, tmpl, logger) - add("webhook", i, n, c) + add("webhook", i, c, func(l log.Logger) (Notifier, error) { return NewWebhook(c, tmpl, l) }) } for i, c := range nc.EmailConfigs { - n := NewEmail(c, tmpl, logger) - add("email", i, n, c) + add("email", i, c, func(l log.Logger) (Notifier, error) { return NewEmail(c, tmpl, l), nil }) } for i, c := range nc.PagerdutyConfigs { - n := NewPagerDuty(c, tmpl, logger) - add("pagerduty", i, n, c) + add("pagerduty", i, c, func(l log.Logger) (Notifier, error) { return NewPagerDuty(c, tmpl, l) }) } for i, c := range nc.OpsGenieConfigs { - n := NewOpsGenie(c, tmpl, logger) - add("opsgenie", i, n, c) + add("opsgenie", i, c, func(l log.Logger) (Notifier, error) { return NewOpsGenie(c, tmpl, l) }) } for i, c := range nc.WechatConfigs { - n := NewWechat(c, tmpl, logger) - add("wechat", i, n, c) + add("wechat", i, c, func(l log.Logger) (Notifier, error) { return NewWechat(c, tmpl, l) }) } for i, c := range nc.SlackConfigs { - n := NewSlack(c, tmpl, logger) - 
add("slack", i, n, c) + add("slack", i, c, func(l log.Logger) (Notifier, error) { return NewSlack(c, tmpl, l) }) } for i, c := range nc.HipchatConfigs { - n := NewHipchat(c, tmpl, logger) - add("hipchat", i, n, c) + add("hipchat", i, c, func(l log.Logger) (Notifier, error) { return NewHipchat(c, tmpl, l) }) } for i, c := range nc.VictorOpsConfigs { - n := NewVictorOps(c, tmpl, logger) - add("victorops", i, n, c) + add("victorops", i, c, func(l log.Logger) (Notifier, error) { return NewVictorOps(c, tmpl, l) }) } for i, c := range nc.PushoverConfigs { - n := NewPushover(c, tmpl, logger) - add("pushover", i, n, c) + add("pushover", i, c, func(l log.Logger) (Notifier, error) { return NewPushover(c, tmpl, l) }) } - return integrations + if errs.Len() > 0 { + return nil, &errs + } + return integrations, nil } const contentTypeJSON = "application/json" @@ -122,11 +137,21 @@ type Webhook struct { conf *config.WebhookConfig tmpl *template.Template logger log.Logger + client *http.Client } // NewWebhook returns a new Webhook. -func NewWebhook(conf *config.WebhookConfig, t *template.Template, l log.Logger) *Webhook { - return &Webhook{conf: conf, tmpl: t, logger: l} +func NewWebhook(conf *config.WebhookConfig, t *template.Template, l log.Logger) (*Webhook, error) { + client, err := commoncfg.NewClientFromConfig(*conf.HTTPConfig, "webhook") + if err != nil { + return nil, err + } + return &Webhook{ + conf: conf, + tmpl: t, + logger: l, + client: client, + }, nil } // WebhookMessage defines the JSON object send to webhook endpoints. 
@@ -165,16 +190,11 @@ func (w *Webhook) Notify(ctx context.Context, alerts ...*types.Alert) (bool, err req.Header.Set("Content-Type", contentTypeJSON) req.Header.Set("User-Agent", userAgentHeader) - c, err := commoncfg.NewClientFromConfig(*w.conf.HTTPConfig, "webhook") - if err != nil { - return false, err - } - - resp, err := c.Do(req.WithContext(ctx)) + resp, err := w.client.Do(req.WithContext(ctx)) if err != nil { return true, err } - resp.Body.Close() + drainResponse(resp) return w.retry(resp.StatusCode) } @@ -195,15 +215,20 @@ type PagerDuty struct { tmpl *template.Template logger log.Logger apiV1 string // for tests. + client *http.Client } // NewPagerDuty returns a new PagerDuty notifier. -func NewPagerDuty(c *config.PagerdutyConfig, t *template.Template, l log.Logger) *PagerDuty { - n := &PagerDuty{conf: c, tmpl: t, logger: l} +func NewPagerDuty(c *config.PagerdutyConfig, t *template.Template, l log.Logger) (*PagerDuty, error) { + client, err := commoncfg.NewClientFromConfig(*c.HTTPConfig, "pagerduty") + if err != nil { + return nil, err + } + n := &PagerDuty{conf: c, tmpl: t, logger: l, client: client} if c.ServiceKey != "" { n.apiV1 = "https://events.pagerduty.com/generic/2010-04-15/create_event.json" } - return n + return n, nil } const ( @@ -251,7 +276,6 @@ type pagerDutyPayload struct { func (n *PagerDuty) notifyV1( ctx context.Context, - c *http.Client, eventType, key string, data *template.Data, details map[string]string, @@ -282,18 +306,17 @@ func (n *PagerDuty) notifyV1( return false, err } - resp, err := post(ctx, c, n.apiV1, contentTypeJSON, &buf) + resp, err := post(ctx, n.client, n.apiV1, contentTypeJSON, &buf) if err != nil { return true, err } - defer resp.Body.Close() + defer drainResponse(resp) return n.retryV1(resp) } func (n *PagerDuty) notifyV2( ctx context.Context, - c *http.Client, eventType, key string, data *template.Data, details map[string]string, @@ -351,11 +374,11 @@ func (n *PagerDuty) notifyV2( return false, fmt.Errorf("failed 
to encode PagerDuty v2 message: %v", err) } - resp, err := post(ctx, c, n.conf.URL.String(), contentTypeJSON, &buf) + resp, err := post(ctx, n.client, n.conf.URL.String(), contentTypeJSON, &buf) if err != nil { return true, fmt.Errorf("failed to post message to PagerDuty: %v", err) } - defer resp.Body.Close() + defer drainResponse(resp) return n.retryV2(resp) } @@ -369,7 +392,6 @@ func (n *PagerDuty) Notify(ctx context.Context, as ...*types.Alert) (bool, error return false, fmt.Errorf("group key missing") } - var err error var ( alerts = types.Alerts(as...) data = n.tmpl.Data(receiverName(ctx, n.logger), groupLabels(ctx, n.logger), as...) @@ -379,7 +401,7 @@ func (n *PagerDuty) Notify(ctx context.Context, as ...*types.Alert) (bool, error eventType = pagerDutyEventResolve } - level.Debug(n.logger).Log("msg", "Notifying PagerDuty", "incident", key, "eventType", eventType) + level.Debug(n.logger).Log("incident", key, "eventType", eventType) details := make(map[string]string, len(n.conf.Details)) for k, v := range n.conf.Details { @@ -390,15 +412,10 @@ func (n *PagerDuty) Notify(ctx context.Context, as ...*types.Alert) (bool, error details[k] = detail } - c, err := commoncfg.NewClientFromConfig(*n.conf.HTTPConfig, "pagerduty") - if err != nil { - return false, err - } - if n.apiV1 != "" { - return n.notifyV1(ctx, c, eventType, key, data, details, as...) + return n.notifyV1(ctx, eventType, key, data, details, as...) } - return n.notifyV2(ctx, c, eventType, key, data, details, as...) + return n.notifyV2(ctx, eventType, key, data, details, as...) } func pagerDutyErr(status int, body io.Reader) error { @@ -448,15 +465,22 @@ type Slack struct { conf *config.SlackConfig tmpl *template.Template logger log.Logger + client *http.Client } // NewSlack returns a new Slack notification handler. 
-func NewSlack(c *config.SlackConfig, t *template.Template, l log.Logger) *Slack { +func NewSlack(c *config.SlackConfig, t *template.Template, l log.Logger) (*Slack, error) { + client, err := commoncfg.NewClientFromConfig(*c.HTTPConfig, "slack") + if err != nil { + return nil, err + } + return &Slack{ conf: c, tmpl: t, logger: l, - } + client: client, + }, nil } // slackReq is the request for sending a slack notification. @@ -575,17 +599,12 @@ func (n *Slack) Notify(ctx context.Context, as ...*types.Alert) (bool, error) { return false, err } - c, err := commoncfg.NewClientFromConfig(*n.conf.HTTPConfig, "slack") - if err != nil { - return false, err - } - u := n.conf.APIURL.String() - resp, err := post(ctx, c, u, contentTypeJSON, &buf) + resp, err := post(ctx, n.client, u, contentTypeJSON, &buf) if err != nil { return true, redactURL(err) } - resp.Body.Close() + drainResponse(resp) return n.retry(resp.StatusCode) } @@ -606,15 +625,21 @@ type Hipchat struct { conf *config.HipchatConfig tmpl *template.Template logger log.Logger + client *http.Client } // NewHipchat returns a new Hipchat notification handler. 
-func NewHipchat(c *config.HipchatConfig, t *template.Template, l log.Logger) *Hipchat { +func NewHipchat(c *config.HipchatConfig, t *template.Template, l log.Logger) (*Hipchat, error) { + client, err := commoncfg.NewClientFromConfig(*c.HTTPConfig, "hipchat") + if err != nil { + return nil, err + } return &Hipchat{ conf: c, tmpl: t, logger: l, - } + client: client, + }, nil } type hipchatReq struct { @@ -663,17 +688,11 @@ func (n *Hipchat) Notify(ctx context.Context, as ...*types.Alert) (bool, error) return false, err } - c, err := commoncfg.NewClientFromConfig(*n.conf.HTTPConfig, "hipchat") - if err != nil { - return false, err - } - - resp, err := post(ctx, c, apiURL.String(), contentTypeJSON, &buf) + resp, err := post(ctx, n.client, apiURL.String(), contentTypeJSON, &buf) if err != nil { return true, redactURL(err) } - - defer resp.Body.Close() + defer drainResponse(resp) return n.retry(resp.StatusCode) } @@ -694,6 +713,7 @@ type Wechat struct { conf *config.WechatConfig tmpl *template.Template logger log.Logger + client *http.Client accessToken string accessTokenAt time.Time @@ -724,8 +744,13 @@ type weChatResponse struct { } // NewWechat returns a new Wechat notifier. -func NewWechat(c *config.WechatConfig, t *template.Template, l log.Logger) *Wechat { - return &Wechat{conf: c, tmpl: t, logger: l} +func NewWechat(c *config.WechatConfig, t *template.Template, l log.Logger) (*Wechat, error) { + client, err := commoncfg.NewClientFromConfig(*c.HTTPConfig, "wechat") + if err != nil { + return nil, err + } + + return &Wechat{conf: c, tmpl: t, logger: l, client: client}, nil } // Notify implements the Notifier interface. @@ -735,7 +760,7 @@ func (n *Wechat) Notify(ctx context.Context, as ...*types.Alert) (bool, error) { return false, fmt.Errorf("group key missing") } - level.Debug(n.logger).Log("msg", "Notifying Wechat", "incident", key) + level.Debug(n.logger).Log("incident", key) data := n.tmpl.Data(receiverName(ctx, n.logger), groupLabels(ctx, n.logger), as...) 
var err error @@ -744,11 +769,6 @@ func (n *Wechat) Notify(ctx context.Context, as ...*types.Alert) (bool, error) { return false, err } - c, err := commoncfg.NewClientFromConfig(*n.conf.HTTPConfig, "wechat") - if err != nil { - return false, err - } - // Refresh AccessToken over 2 hours if n.accessToken == "" || time.Since(n.accessTokenAt) > 2*time.Hour { parameters := url.Values{} @@ -769,11 +789,11 @@ func (n *Wechat) Notify(ctx context.Context, as ...*types.Alert) (bool, error) { req.Header.Set("Content-Type", contentTypeJSON) - resp, err := c.Do(req.WithContext(ctx)) + resp, err := n.client.Do(req.WithContext(ctx)) if err != nil { return true, redactURL(err) } - defer resp.Body.Close() + defer drainResponse(resp) var wechatToken WechatToken if err := json.NewDecoder(resp.Body).Decode(&wechatToken); err != nil { @@ -820,17 +840,17 @@ func (n *Wechat) Notify(ctx context.Context, as ...*types.Alert) (bool, error) { return true, err } - resp, err := c.Do(req.WithContext(ctx)) + resp, err := n.client.Do(req.WithContext(ctx)) if err != nil { return true, redactURL(err) } - defer resp.Body.Close() + defer drainResponse(resp) body, err := ioutil.ReadAll(resp.Body) if err != nil { return true, err } - level.Debug(n.logger).Log("msg", "response: "+string(body), "incident", key) + level.Debug(n.logger).Log("response", string(body), "incident", key) if resp.StatusCode != 200 { return true, fmt.Errorf("unexpected status code %v", resp.StatusCode) @@ -860,11 +880,16 @@ type OpsGenie struct { conf *config.OpsGenieConfig tmpl *template.Template logger log.Logger + client *http.Client } // NewOpsGenie returns a new OpsGenie notifier. 
-func NewOpsGenie(c *config.OpsGenieConfig, t *template.Template, l log.Logger) *OpsGenie { - return &OpsGenie{conf: c, tmpl: t, logger: l} +func NewOpsGenie(c *config.OpsGenieConfig, t *template.Template, l log.Logger) (*OpsGenie, error) { + client, err := commoncfg.NewClientFromConfig(*c.HTTPConfig, "opsgenie") + if err != nil { + return nil, err + } + return &OpsGenie{conf: c, tmpl: t, logger: l, client: client}, nil } type opsGenieCreateMessage struct { @@ -897,17 +922,11 @@ func (n *OpsGenie) Notify(ctx context.Context, as ...*types.Alert) (bool, error) return retry, err } - c, err := commoncfg.NewClientFromConfig(*n.conf.HTTPConfig, "opsgenie") - if err != nil { - return false, err - } - - resp, err := c.Do(req.WithContext(ctx)) - + resp, err := n.client.Do(req) if err != nil { return true, err } - defer resp.Body.Close() + defer drainResponse(resp) return n.retry(resp.StatusCode) } @@ -932,7 +951,7 @@ func (n *OpsGenie) createRequest(ctx context.Context, as ...*types.Alert) (*http } data := n.tmpl.Data(receiverName(ctx, n.logger), groupLabels(ctx, n.logger), as...) 
- level.Debug(n.logger).Log("msg", "Notifying OpsGenie", "incident", key) + level.Debug(n.logger).Log("incident", key) var err error tmpl := tmplText(n.tmpl, data, &err) @@ -958,7 +977,7 @@ func (n *OpsGenie) createRequest(ctx context.Context, as ...*types.Alert) (*http default: message, truncated := truncate(tmpl(n.conf.Message), 130) if truncated { - level.Debug(n.logger).Log("msg", "truncated message due to OpsGenie message limit", "truncated_message", message, "incident", key) + level.Debug(n.logger).Log("msg", "truncated message", "truncated_message", message, "incident", key) } apiURL.Path += "v2/alerts" @@ -1011,7 +1030,7 @@ func (n *OpsGenie) createRequest(ctx context.Context, as ...*types.Alert) (*http } req.Header.Set("Content-Type", contentTypeJSON) req.Header.Set("Authorization", fmt.Sprintf("GenieKey %s", apiKey)) - return req, true, nil + return req.WithContext(ctx), true, nil } func (n *OpsGenie) retry(statusCode int) (bool, error) { @@ -1031,15 +1050,21 @@ type VictorOps struct { conf *config.VictorOpsConfig tmpl *template.Template logger log.Logger + client *http.Client } // NewVictorOps returns a new VictorOps notifier. -func NewVictorOps(c *config.VictorOpsConfig, t *template.Template, l log.Logger) *VictorOps { +func NewVictorOps(c *config.VictorOpsConfig, t *template.Template, l log.Logger) (*VictorOps, error) { + client, err := commoncfg.NewClientFromConfig(*c.HTTPConfig, "victorops") + if err != nil { + return nil, err + } return &VictorOps{ conf: c, tmpl: t, logger: l, - } + client: client, + }, nil } const ( @@ -1058,22 +1083,16 @@ func (n *VictorOps) Notify(ctx context.Context, as ...*types.Alert) (bool, error ) apiURL.Path += fmt.Sprintf("%s/%s", n.conf.APIKey, tmpl(n.conf.RoutingKey)) - c, err := commoncfg.NewClientFromConfig(*n.conf.HTTPConfig, "victorops") - if err != nil { - return false, err - } - buf, err := n.createVictorOpsPayload(ctx, as...) 
if err != nil { return true, err } - resp, err := post(ctx, c, apiURL.String(), contentTypeJSON, buf) + resp, err := post(ctx, n.client, apiURL.String(), contentTypeJSON, buf) if err != nil { return true, redactURL(err) } - - defer resp.Body.Close() + defer drainResponse(resp) return n.retry(resp.StatusCode) } @@ -1111,7 +1130,7 @@ func (n *VictorOps) createVictorOpsPayload(ctx context.Context, as ...*types.Ale stateMessage, truncated := truncate(stateMessage, 20480) if truncated { - level.Debug(n.logger).Log("msg", "truncated stateMessage due to VictorOps stateMessage limit", "truncated_state_message", stateMessage, "incident", key) + level.Debug(n.logger).Log("msg", "truncated stateMessage", "truncated_state_message", stateMessage, "incident", key) } msg := map[string]string{ @@ -1158,12 +1177,23 @@ type Pushover struct { conf *config.PushoverConfig tmpl *template.Template logger log.Logger + client *http.Client apiURL string // for tests. } // NewPushover returns a new Pushover notifier. -func NewPushover(c *config.PushoverConfig, t *template.Template, l log.Logger) *Pushover { - return &Pushover{conf: c, tmpl: t, logger: l, apiURL: "https://api.pushover.net/1/messages.json"} +func NewPushover(c *config.PushoverConfig, t *template.Template, l log.Logger) (*Pushover, error) { + client, err := commoncfg.NewClientFromConfig(*c.HTTPConfig, "pushover") + if err != nil { + return nil, err + } + return &Pushover{ + conf: c, + tmpl: t, + logger: l, + client: client, + apiURL: "https://api.pushover.net/1/messages.json", + }, nil } // Notify implements the Notifier interface. @@ -1174,7 +1204,7 @@ func (n *Pushover) Notify(ctx context.Context, as ...*types.Alert) (bool, error) } data := n.tmpl.Data(receiverName(ctx, n.logger), groupLabels(ctx, n.logger), as...) 
- level.Debug(n.logger).Log("msg", "Notifying Pushover", "incident", key) + level.Debug(n.logger).Log("incident", key) var ( err error @@ -1189,7 +1219,7 @@ func (n *Pushover) Notify(ctx context.Context, as ...*types.Alert) (bool, error) title, truncated := truncate(tmpl(n.conf.Title), 250) if truncated { - level.Debug(n.logger).Log("msg", "Truncated title due to Pushover title limit", "truncated_title", title, "incident", key) + level.Debug(n.logger).Log("msg", "Truncated title", "truncated_title", title, "incident", key) } parameters.Add("title", title) @@ -1202,7 +1232,7 @@ func (n *Pushover) Notify(ctx context.Context, as ...*types.Alert) (bool, error) message, truncated = truncate(message, 1024) if truncated { - level.Debug(n.logger).Log("msg", "Truncated message due to Pushover message limit", "truncated_message", message, "incident", key) + level.Debug(n.logger).Log("msg", "Truncated message", "truncated_message", message, "incident", key) } message = strings.TrimSpace(message) if message == "" { @@ -1213,7 +1243,7 @@ func (n *Pushover) Notify(ctx context.Context, as ...*types.Alert) (bool, error) supplementaryURL, truncated := truncate(tmpl(n.conf.URL), 512) if truncated { - level.Debug(n.logger).Log("msg", "Truncated URL due to Pushover url limit", "truncated_url", supplementaryURL, "incident", key) + level.Debug(n.logger).Log("msg", "Truncated URL", "truncated_url", supplementaryURL, "incident", key) } parameters.Add("url", supplementaryURL) parameters.Add("url_title", tmpl(n.conf.URLTitle)) @@ -1232,18 +1262,12 @@ func (n *Pushover) Notify(ctx context.Context, as ...*types.Alert) (bool, error) } u.RawQuery = parameters.Encode() // Don't log the URL as it contains secret data (see #1825). 
- level.Debug(n.logger).Log("msg", "Sending Pushover message", "incident", key) - - c, err := commoncfg.NewClientFromConfig(*n.conf.HTTPConfig, "pushover") - if err != nil { - return false, err - } - - resp, err := post(ctx, c, u.String(), "text/plain", nil) + level.Debug(n.logger).Log("msg", "Sending message", "incident", key) + resp, err := post(ctx, n.client, u.String(), "text/plain", nil) if err != nil { return true, redactURL(err) } - defer resp.Body.Close() + defer drainResponse(resp) return n.retry(resp.StatusCode) } @@ -1314,6 +1338,13 @@ func post(ctx context.Context, client *http.Client, url string, bodyType string, return client.Do(req.WithContext(ctx)) } +// drainResponse consumes and closes the response's body to make sure that the +// HTTP client can reuse existing connections. +func drainResponse(resp *http.Response) { + io.Copy(ioutil.Discard, resp.Body) + resp.Body.Close() +} + func truncate(s string, n int) (string, bool) { r := []rune(s) if len(r) <= n { diff --git a/notify/impl_test.go b/notify/impl_test.go index 2ecc28fa..4e1b2476 100644 --- a/notify/impl_test.go +++ b/notify/impl_test.go @@ -91,6 +91,74 @@ func assertNotifyLeaksNoSecret(t *testing.T, ctx context.Context, n Notifier, se require.True(t, ok) } +func TestBuildReceiverIntegrations(t *testing.T) { + for _, tc := range []struct { + receiver *config.Receiver + err bool + exp []Integration + }{ + { + receiver: &config.Receiver{ + Name: "foo", + WebhookConfigs: []*config.WebhookConfig{ + &config.WebhookConfig{ + HTTPConfig: &commoncfg.HTTPClientConfig{}, + }, + &config.WebhookConfig{ + HTTPConfig: &commoncfg.HTTPClientConfig{}, + NotifierConfig: config.NotifierConfig{ + VSendResolved: true, + }, + }, + }, + }, + exp: []Integration{ + Integration{ + name: "webhook", + idx: 0, + rs: sendResolved(false), + }, + Integration{ + name: "webhook", + idx: 1, + rs: sendResolved(true), + }, + }, + }, + { + receiver: &config.Receiver{ + Name: "foo", + WebhookConfigs: []*config.WebhookConfig{ + 
&config.WebhookConfig{ + HTTPConfig: &commoncfg.HTTPClientConfig{ + TLSConfig: commoncfg.TLSConfig{ + CAFile: "not_existing", + }, + }, + }, + }, + }, + err: true, + }, + } { + tc := tc + t.Run("", func(t *testing.T) { + integrations, err := BuildReceiverIntegrations(tc.receiver, nil, nil) + if tc.err { + require.Error(t, err) + return + } + require.NoError(t, err) + require.Len(t, integrations, len(tc.exp)) + for i := range tc.exp { + require.Equal(t, tc.exp[i].SendResolved(), integrations[i].SendResolved()) + require.Equal(t, tc.exp[i].Name(), integrations[i].Name()) + require.Equal(t, tc.exp[i].Index(), integrations[i].Index()) + } + }) + } +} + func TestWebhookRetry(t *testing.T) { u, err := url.Parse("http://example.com") if err != nil { @@ -134,7 +202,7 @@ func TestPagerDutyRedactedURLV1(t *testing.T) { defer fn() key := "01234567890123456789012345678901" - notifier := NewPagerDuty( + notifier, err := NewPagerDuty( &config.PagerdutyConfig{ ServiceKey: config.Secret(key), HTTPConfig: &commoncfg.HTTPClientConfig{}, @@ -142,6 +210,7 @@ func TestPagerDutyRedactedURLV1(t *testing.T) { createTmpl(t), log.NewNopLogger(), ) + require.NoError(t, err) notifier.apiV1 = u.String() assertNotifyLeaksNoSecret(t, ctx, notifier, key) @@ -152,7 +221,7 @@ func TestPagerDutyRedactedURLV2(t *testing.T) { defer fn() key := "01234567890123456789012345678901" - notifier := NewPagerDuty( + notifier, err := NewPagerDuty( &config.PagerdutyConfig{ URL: &config.URL{URL: u}, RoutingKey: config.Secret(key), @@ -161,6 +230,7 @@ func TestPagerDutyRedactedURLV2(t *testing.T) { createTmpl(t), log.NewNopLogger(), ) + require.NoError(t, err) assertNotifyLeaksNoSecret(t, ctx, notifier, key) } @@ -219,7 +289,7 @@ func TestSlackRedactedURL(t *testing.T) { ctx, u, fn := getContextWithCancelingURL() defer fn() - notifier := NewSlack( + notifier, err := NewSlack( &config.SlackConfig{ APIURL: &config.SecretURL{URL: u}, HTTPConfig: &commoncfg.HTTPClientConfig{}, @@ -227,6 +297,7 @@ func 
TestSlackRedactedURL(t *testing.T) { createTmpl(t), log.NewNopLogger(), ) + require.NoError(t, err) assertNotifyLeaksNoSecret(t, ctx, notifier, u.String()) } @@ -245,7 +316,7 @@ func TestHipchatRedactedURL(t *testing.T) { defer fn() token := "secret_token" - notifier := NewHipchat( + notifier, err := NewHipchat( &config.HipchatConfig{ APIURL: &config.URL{URL: u}, AuthToken: config.Secret(token), @@ -254,6 +325,7 @@ func TestHipchatRedactedURL(t *testing.T) { createTmpl(t), log.NewNopLogger(), ) + require.NoError(t, err) assertNotifyLeaksNoSecret(t, ctx, notifier, token) } @@ -273,7 +345,7 @@ func TestOpsGenieRedactedURL(t *testing.T) { defer fn() key := "key" - notifier := NewOpsGenie( + notifier, err := NewOpsGenie( &config.OpsGenieConfig{ APIURL: &config.URL{URL: u}, APIKey: config.Secret(key), @@ -282,6 +354,7 @@ func TestOpsGenieRedactedURL(t *testing.T) { createTmpl(t), log.NewNopLogger(), ) + require.NoError(t, err) assertNotifyLeaksNoSecret(t, ctx, notifier, key) } @@ -299,7 +372,7 @@ func TestVictorOpsRedactedURL(t *testing.T) { defer fn() secret := "secret" - notifier := NewVictorOps( + notifier, err := NewVictorOps( &config.VictorOpsConfig{ APIURL: &config.URL{URL: u}, APIKey: config.Secret(secret), @@ -308,6 +381,7 @@ func TestVictorOpsRedactedURL(t *testing.T) { createTmpl(t), log.NewNopLogger(), ) + require.NoError(t, err) assertNotifyLeaksNoSecret(t, ctx, notifier, secret) } @@ -325,7 +399,7 @@ func TestPushoverRedactedURL(t *testing.T) { defer fn() key, token := "user_key", "token" - notifier := NewPushover( + notifier, err := NewPushover( &config.PushoverConfig{ UserKey: config.Secret(key), Token: config.Secret(token), @@ -334,6 +408,7 @@ func TestPushoverRedactedURL(t *testing.T) { createTmpl(t), log.NewNopLogger(), ) + require.NoError(t, err) notifier.apiURL = u.String() assertNotifyLeaksNoSecret(t, ctx, notifier, key, token) @@ -471,13 +546,15 @@ func TestOpsGenie(t *testing.T) { Type: `{{ .CommonLabels.ResponderType2 }}`, }, }, - Tags: `{{ 
.CommonLabels.Tags }}`, - Note: `{{ .CommonLabels.Note }}`, - Priority: `{{ .CommonLabels.Priority }}`, - APIKey: `{{ .ExternalURL }}`, - APIURL: &config.URL{URL: u}, + Tags: `{{ .CommonLabels.Tags }}`, + Note: `{{ .CommonLabels.Note }}`, + Priority: `{{ .CommonLabels.Priority }}`, + APIKey: `{{ .ExternalURL }}`, + APIURL: &config.URL{URL: u}, + HTTPConfig: &commoncfg.HTTPClientConfig{}, } - notifier := NewOpsGenie(conf, tmpl, logger) + notifier, err := NewOpsGenie(conf, tmpl, logger) + require.NoError(t, err) ctx := context.Background() ctx = WithGroupKey(ctx, "1") @@ -552,9 +629,11 @@ func TestVictorOpsCustomFields(t *testing.T) { CustomFields: map[string]string{ "Field_A": "{{ .CommonLabels.Message }}", }, + HTTPConfig: &commoncfg.HTTPClientConfig{}, } - notifier := NewVictorOps(conf, tmpl, logger) + notifier, err := NewVictorOps(conf, tmpl, logger) + require.NoError(t, err) ctx := context.Background() ctx = WithGroupKey(ctx, "1") @@ -586,7 +665,7 @@ func TestWechatRedactedURLOnInitialAuthentication(t *testing.T) { defer fn() secret := "secret_key" - notifier := NewWechat( + notifier, err := NewWechat( &config.WechatConfig{ APIURL: &config.URL{URL: u}, HTTPConfig: &commoncfg.HTTPClientConfig{}, @@ -596,6 +675,7 @@ func TestWechatRedactedURLOnInitialAuthentication(t *testing.T) { createTmpl(t), log.NewNopLogger(), ) + require.NoError(t, err) assertNotifyLeaksNoSecret(t, ctx, notifier, secret) } @@ -607,7 +687,7 @@ func TestWechatRedactedURLOnNotify(t *testing.T) { }) defer fn() - notifier := NewWechat( + notifier, err := NewWechat( &config.WechatConfig{ APIURL: &config.URL{URL: u}, HTTPConfig: &commoncfg.HTTPClientConfig{}, @@ -617,6 +697,7 @@ func TestWechatRedactedURLOnNotify(t *testing.T) { createTmpl(t), log.NewNopLogger(), ) + require.NoError(t, err) assertNotifyLeaksNoSecret(t, ctx, notifier, secret, token) } diff --git a/notify/notify.go b/notify/notify.go index 864eda6d..4f945d89 100644 --- a/notify/notify.go +++ b/notify/notify.go @@ -28,12 +28,10 @@ 
import ( "github.com/prometheus/common/model" "github.com/prometheus/alertmanager/cluster" - "github.com/prometheus/alertmanager/config" "github.com/prometheus/alertmanager/inhibit" "github.com/prometheus/alertmanager/nflog" "github.com/prometheus/alertmanager/nflog/nflogpb" "github.com/prometheus/alertmanager/silence" - "github.com/prometheus/alertmanager/template" "github.com/prometheus/alertmanager/types" ) @@ -92,7 +90,8 @@ func init() { prometheus.MustRegister(notificationLatencySeconds) } -type notifierConfig interface { +// ResolvedSender returns true if resolved notifications should be sent. +type ResolvedSender interface { SendResolved() bool } @@ -234,40 +233,39 @@ type NotificationLog interface { // BuildPipeline builds a map of receivers to Stages. func BuildPipeline( - confs []*config.Receiver, - tmpl *template.Template, + receivers map[string][]Integration, wait func() time.Duration, inhibitor *inhibit.Inhibitor, silencer *silence.Silencer, notificationLog NotificationLog, peer *cluster.Peer, - logger log.Logger, ) RoutingStage { - rs := RoutingStage{} + rs := make(RoutingStage, len(receivers)) ms := NewGossipSettleStage(peer) is := NewMuteStage(inhibitor) ss := NewMuteStage(silencer) - for _, rc := range confs { - rs[rc.Name] = MultiStage{ms, is, ss, createStage(rc, tmpl, wait, notificationLog, logger)} + for name := range receivers { + st := createReceiverStage(name, receivers[name], wait, notificationLog) + rs[name] = MultiStage{ms, is, ss, st} } return rs } -// createStage creates a pipeline of stages for a receiver. -func createStage(rc *config.Receiver, tmpl *template.Template, wait func() time.Duration, notificationLog NotificationLog, logger log.Logger) Stage { +// createReceiverStage creates a pipeline of stages for a receiver. 
+func createReceiverStage(name string, integrations []Integration, wait func() time.Duration, notificationLog NotificationLog) Stage { var fs FanoutStage - for _, i := range BuildReceiverIntegrations(rc, tmpl, logger) { + for i := range integrations { recv := &nflogpb.Receiver{ - GroupName: rc.Name, - Integration: i.name, - Idx: uint32(i.idx), + GroupName: name, + Integration: integrations[i].Name(), + Idx: uint32(integrations[i].Index()), } var s MultiStage s = append(s, NewWaitStage(wait)) - s = append(s, NewDedupStage(i, notificationLog, recv)) - s = append(s, NewRetryStage(i, rc.Name)) + s = append(s, NewDedupStage(&integrations[i], notificationLog, recv)) + s = append(s, NewRetryStage(integrations[i], name)) s = append(s, NewSetNotifiesStage(notificationLog, recv)) fs = append(fs, s) @@ -416,20 +414,20 @@ func (ws *WaitStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Al // DedupStage filters alerts. // Filtering happens based on a notification log. type DedupStage struct { + rs ResolvedSender nflog NotificationLog recv *nflogpb.Receiver - conf notifierConfig now func() time.Time hash func(*types.Alert) uint64 } // NewDedupStage wraps a DedupStage that runs against the given notification log. 
-func NewDedupStage(i Integration, l NotificationLog, recv *nflogpb.Receiver) *DedupStage { +func NewDedupStage(rs ResolvedSender, l NotificationLog, recv *nflogpb.Receiver) *DedupStage { return &DedupStage{ + rs: rs, nflog: l, recv: recv, - conf: i.conf, now: utcNow, hash: hashAlert, } @@ -502,7 +500,7 @@ func (n *DedupStage) needsUpdate(entry *nflogpb.Entry, firing, resolved map[uint return len(entry.FiringAlerts) > 0 } - if n.conf.SendResolved() && !entry.IsResolvedSubset(resolved) { + if n.rs.SendResolved() && !entry.IsResolvedSubset(resolved) { return true } @@ -583,7 +581,7 @@ func (r RetryStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Ale // If we shouldn't send notifications for resolved alerts, but there are only // resolved alerts, report them all as successfully notified (we still want the // notification log to log them for the next run of DedupStage). - if !r.integration.conf.SendResolved() { + if !r.integration.SendResolved() { firing, ok := FiringAlerts(ctx) if !ok { return ctx, nil, fmt.Errorf("firing alerts missing") @@ -625,13 +623,13 @@ func (r RetryStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Ale case <-tick.C: now := time.Now() retry, err := r.integration.Notify(ctx, sent...) 
- notificationLatencySeconds.WithLabelValues(r.integration.name).Observe(time.Since(now).Seconds()) - numNotifications.WithLabelValues(r.integration.name).Inc() + notificationLatencySeconds.WithLabelValues(r.integration.Name()).Observe(time.Since(now).Seconds()) + numNotifications.WithLabelValues(r.integration.Name()).Inc() if err != nil { - numFailedNotifications.WithLabelValues(r.integration.name).Inc() - level.Debug(l).Log("msg", "Notify attempt failed", "attempt", i, "integration", r.integration.name, "receiver", r.groupName, "err", err) + numFailedNotifications.WithLabelValues(r.integration.Name()).Inc() + level.Debug(l).Log("msg", "Notify attempt failed", "attempt", i, "integration", r.integration.Name(), "receiver", r.groupName, "err", err) if !retry { - return ctx, alerts, fmt.Errorf("cancelling notify retry for %q due to unrecoverable error: %s", r.integration.name, err) + return ctx, alerts, fmt.Errorf("cancelling notify retry for %q due to unrecoverable error: %s", r.integration.Name(), err) } // Save this error to be able to return the last seen error by an diff --git a/notify/notify_test.go b/notify/notify_test.go index 71e0501f..32611130 100644 --- a/notify/notify_test.go +++ b/notify/notify_test.go @@ -33,10 +33,10 @@ import ( "github.com/prometheus/alertmanager/types" ) -type notifierConfigFunc func() bool +type sendResolved bool -func (f notifierConfigFunc) SendResolved() bool { - return f() +func (s sendResolved) SendResolved() bool { + return bool(s) } type notifierFunc func(ctx context.Context, alerts ...*types.Alert) (bool, error) @@ -201,8 +201,8 @@ func TestDedupStageNeedsUpdate(t *testing.T) { t.Log("case", i) s := &DedupStage{ - now: func() time.Time { return now }, - conf: notifierConfigFunc(func() bool { return c.resolve }), + now: func() time.Time { return now }, + rs: sendResolved(c.resolve), } res := s.needsUpdate(c.entry, c.firingAlerts, c.resolvedAlerts, c.repeat) require.Equal(t, c.res, res) @@ -221,7 +221,7 @@ func TestDedupStage(t 
*testing.T) { now: func() time.Time { return now }, - conf: notifierConfigFunc(func() bool { return false }), + rs: sendResolved(false), } ctx := context.Background() @@ -384,7 +384,7 @@ func TestRetryStageWithError(t *testing.T) { sent = append(sent, alerts...) return false, nil }), - conf: notifierConfigFunc(func() bool { return false }), + rs: sendResolved(false), } r := RetryStage{ integration: i, @@ -424,7 +424,7 @@ func TestRetryStageNoResolved(t *testing.T) { sent = append(sent, alerts...) return false, nil }), - conf: notifierConfigFunc(func() bool { return false }), + rs: sendResolved(false), } r := RetryStage{ integration: i, @@ -477,7 +477,7 @@ func TestRetryStageSendResolved(t *testing.T) { sent = append(sent, alerts...) return false, nil }), - conf: notifierConfigFunc(func() bool { return true }), + rs: sendResolved(true), } r := RetryStage{ integration: i, diff --git a/scripts/errcheck_excludes.txt b/scripts/errcheck_excludes.txt index e759ce68..b47feb22 100644 --- a/scripts/errcheck_excludes.txt +++ b/scripts/errcheck_excludes.txt @@ -1,2 +1,4 @@ +// Don't flag lines such as "io.Copy(ioutil.Discard, resp.Body)". +io.Copy // Never check for logger errors. (github.com/go-kit/kit/log.Logger).Log