add retry flag for notify providers

The retry flag allows an integration to specify whether an error can
potentially be resolved by retrying or is unlikely to recover.
For example, invalid authentication is likely caused by a wrong
configuration, so a retry would not make sense, while a server error is
likely a temporary problem that may be resolved by the next retry.
This commit is contained in:
Frederic Branczyk 2016-09-05 17:51:03 +02:00
parent d263b7ab9a
commit 92acfbd449
2 changed files with 148 additions and 76 deletions

View File

@ -45,8 +45,10 @@ type notifierConfig interface {
}
// A Notifier notifies about alerts under constraints of the given context.
// It returns an error if unsuccessful, together with a flag indicating
// whether the error is recoverable. Retry logic can use this flag to decide
// whether another attempt is worthwhile.
type Notifier interface {
Notify(context.Context, ...*types.Alert) error
Notify(context.Context, ...*types.Alert) (bool, error)
}
// An Integration wraps a notifier and its config to be uniquely identified by
@ -59,7 +61,7 @@ type Integration struct {
}
// Notify implements the Notifier interface by delegating to the wrapped
// notifier and passing its result through unchanged.
func (i *Integration) Notify(ctx context.Context, alerts ...*types.Alert) error {
func (i *Integration) Notify(ctx context.Context, alerts ...*types.Alert) (bool, error) {
return i.notifier.Notify(ctx, alerts...)
}
@ -137,7 +139,7 @@ type WebhookMessage struct {
}
// Notify implements the Notifier interface.
func (w *Webhook) Notify(ctx context.Context, alerts ...*types.Alert) error {
func (w *Webhook) Notify(ctx context.Context, alerts ...*types.Alert) (bool, error) {
data := w.tmpl.Data(receiverName(ctx), groupLabels(ctx), alerts...)
groupKey, ok := GroupKey(ctx)
@ -153,20 +155,26 @@ func (w *Webhook) Notify(ctx context.Context, alerts ...*types.Alert) error {
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(msg); err != nil {
return err
return false, err
}
resp, err := ctxhttp.Post(ctx, http.DefaultClient, w.URL, contentTypeJSON, &buf)
if err != nil {
return err
return true, err
}
resp.Body.Close()
if resp.StatusCode/100 != 2 {
return fmt.Errorf("unexpected status code %v from %s", resp.StatusCode, w.URL)
return w.retry(resp.StatusCode)
}
// retry classifies the HTTP status code of a webhook response. For a non-2xx
// code it returns an error naming the code and the webhook URL, along with a
// flag that is true only for 5xx codes; for a 2xx code it returns no error.
func (w *Webhook) retry(statusCode int) (bool, error) {
// Webhooks are assumed to respond with 2xx response codes on a successful
// request and 5xx response codes are assumed to be recoverable.
if statusCode/100 != 2 {
return (statusCode/100 == 5), fmt.Errorf("unexpected status code %v from %s", statusCode, w.URL)
}
return nil
return false, nil
}
// Email implements a Notifier for email notifications.
@ -227,38 +235,38 @@ func (n *Email) auth(mechs string) (smtp.Auth, error) {
}
// Notify implements the Notifier interface.
func (n *Email) Notify(ctx context.Context, as ...*types.Alert) error {
func (n *Email) Notify(ctx context.Context, as ...*types.Alert) (bool, error) {
// Connect to the SMTP smarthost.
c, err := smtp.Dial(n.conf.Smarthost)
if err != nil {
return err
return true, err
}
defer c.Quit()
// We need to know the hostname for both auth and TLS.
host, _, err := net.SplitHostPort(n.conf.Smarthost)
if err != nil {
return fmt.Errorf("invalid address: %s", err)
return false, fmt.Errorf("invalid address: %s", err)
}
if n.conf.RequireTLS {
if ok, _ := c.Extension("STARTTLS"); !ok {
return fmt.Errorf("require_tls: true (default), but %q does not advertise the STARTTLS extension", n.conf.Smarthost)
return true, fmt.Errorf("require_tls: true (default), but %q does not advertise the STARTTLS extension", n.conf.Smarthost)
}
tlsConf := &tls.Config{ServerName: host}
if err := c.StartTLS(tlsConf); err != nil {
return fmt.Errorf("starttls failed: %s", err)
return true, fmt.Errorf("starttls failed: %s", err)
}
}
if ok, mech := c.Extension("AUTH"); ok {
auth, err := n.auth(mech)
if err != nil {
return err
return true, err
}
if auth != nil {
if err := c.Auth(auth); err != nil {
return fmt.Errorf("%T failed: %s", auth, err)
return true, fmt.Errorf("%T failed: %s", auth, err)
}
}
}
@ -270,40 +278,40 @@ func (n *Email) Notify(ctx context.Context, as ...*types.Alert) error {
to = tmpl(n.conf.To)
)
if err != nil {
return err
return false, err
}
addrs, err := mail.ParseAddressList(from)
if err != nil {
return fmt.Errorf("parsing from addresses: %s", err)
return false, fmt.Errorf("parsing from addresses: %s", err)
}
if len(addrs) != 1 {
return fmt.Errorf("must be exactly one from address")
return false, fmt.Errorf("must be exactly one from address")
}
if err := c.Mail(addrs[0].Address); err != nil {
return fmt.Errorf("sending mail from: %s", err)
return true, fmt.Errorf("sending mail from: %s", err)
}
addrs, err = mail.ParseAddressList(to)
if err != nil {
return fmt.Errorf("parsing to addresses: %s", err)
return false, fmt.Errorf("parsing to addresses: %s", err)
}
for _, addr := range addrs {
if err := c.Rcpt(addr.Address); err != nil {
return fmt.Errorf("sending rcpt to: %s", err)
return true, fmt.Errorf("sending rcpt to: %s", err)
}
}
// Send the email body.
wc, err := c.Data()
if err != nil {
return err
return true, err
}
defer wc.Close()
for header, t := range n.conf.Headers {
value, err := n.tmpl.ExecuteTextString(t, data)
if err != nil {
return fmt.Errorf("executing %q header template: %s", header, err)
return false, fmt.Errorf("executing %q header template: %s", header, err)
}
fmt.Fprintf(wc, "%s: %s\r\n", header, mime.QEncoding.Encode("utf-8", value))
}
@ -318,11 +326,14 @@ func (n *Email) Notify(ctx context.Context, as ...*types.Alert) error {
// TODO(fabxc): do a multipart write that considers the plain template.
body, err := n.tmpl.ExecuteHTMLString(n.conf.HTML, data)
if err != nil {
return fmt.Errorf("executing email html template: %s", err)
return false, fmt.Errorf("executing email html template: %s", err)
}
_, err = io.WriteString(wc, body)
if err != nil {
return true, err
}
return err
return false, nil
}
// PagerDuty implements a Notifier for PagerDuty notifications.
@ -354,10 +365,10 @@ type pagerDutyMessage struct {
// Notify implements the Notifier interface.
//
// http://developer.pagerduty.com/documentation/integration/events/trigger
func (n *PagerDuty) Notify(ctx context.Context, as ...*types.Alert) error {
func (n *PagerDuty) Notify(ctx context.Context, as ...*types.Alert) (bool, error) {
key, ok := GroupKey(ctx)
if !ok {
return fmt.Errorf("group key missing")
return false, fmt.Errorf("group key missing")
}
var err error
@ -390,24 +401,32 @@ func (n *PagerDuty) Notify(ctx context.Context, as ...*types.Alert) error {
msg.ClientURL = tmpl(n.conf.ClientURL)
}
if err != nil {
return err
return false, err
}
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(msg); err != nil {
return err
return false, err
}
resp, err := ctxhttp.Post(ctx, http.DefaultClient, n.conf.URL, contentTypeJSON, &buf)
if err != nil {
return err
return true, err
}
resp.Body.Close()
if resp.StatusCode/100 != 2 {
return fmt.Errorf("unexpected status code %v", resp.StatusCode)
return n.retry(resp.StatusCode)
}
// retry classifies the HTTP status code of a PagerDuty response. For a
// non-2xx code it returns an error naming the code, along with a flag that is
// true for 403 and 5xx codes; for a 2xx code it returns no error.
func (n *PagerDuty) retry(statusCode int) (bool, error) {
// Retrying can solve the issue on 403 (rate limiting) and 5xx response codes.
// 2xx response codes indicate a successful request.
// https://v2.developer.pagerduty.com/docs/trigger-events
if statusCode/100 != 2 {
return (statusCode == 403 || statusCode/100 == 5), fmt.Errorf("unexpected status code %v", statusCode)
}
return nil
return false, nil
}
// Slack implements a Notifier for Slack notifications.
@ -453,7 +472,7 @@ type slackAttachmentField struct {
}
// Notify implements the Notifier interface.
func (n *Slack) Notify(ctx context.Context, as ...*types.Alert) error {
func (n *Slack) Notify(ctx context.Context, as ...*types.Alert) (bool, error) {
var err error
var (
data = n.tmpl.Data(receiverName(ctx), groupLabels(ctx), as...)
@ -477,26 +496,32 @@ func (n *Slack) Notify(ctx context.Context, as ...*types.Alert) error {
Attachments: []slackAttachment{*attachment},
}
if err != nil {
return err
return false, err
}
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(req); err != nil {
return err
return false, err
}
resp, err := ctxhttp.Post(ctx, http.DefaultClient, string(n.conf.APIURL), contentTypeJSON, &buf)
if err != nil {
return err
return true, err
}
// TODO(fabxc): is 2xx status code really indicator for success for Slack API?
resp.Body.Close()
if resp.StatusCode/100 != 2 {
return fmt.Errorf("unexpected status code %v", resp.StatusCode)
return n.retry(resp.StatusCode)
}
// retry classifies the HTTP status code of a Slack response. For a non-2xx
// code it returns an error naming the code, along with a flag that is true
// only for 5xx codes; for a 2xx code it returns no error.
func (n *Slack) retry(statusCode int) (bool, error) {
// Only 5xx response codes are recoverable and 2xx codes are successful.
// https://api.slack.com/incoming-webhooks#handling_errors
// https://api.slack.com/changelog/2016-05-17-changes-to-errors-for-incoming-webhooks
if statusCode/100 != 2 {
return (statusCode/100 == 5), fmt.Errorf("unexpected status code %v", statusCode)
}
return nil
return false, nil
}
// Hipchat implements a Notifier for Hipchat notifications.
@ -522,7 +547,7 @@ type hipchatReq struct {
}
// Notify implements the Notifier interface.
func (n *Hipchat) Notify(ctx context.Context, as ...*types.Alert) error {
func (n *Hipchat) Notify(ctx context.Context, as ...*types.Alert) (bool, error) {
var err error
var msg string
var (
@ -546,26 +571,33 @@ func (n *Hipchat) Notify(ctx context.Context, as ...*types.Alert) error {
Color: tmplText(n.conf.Color),
}
if err != nil {
return err
return false, err
}
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(req); err != nil {
return err
return false, err
}
resp, err := ctxhttp.Post(ctx, http.DefaultClient, url, contentTypeJSON, &buf)
if err != nil {
return err
return true, err
}
defer resp.Body.Close()
if resp.StatusCode/100 != 2 {
return fmt.Errorf("unexpected status code %v", resp.StatusCode)
return n.retry(resp.StatusCode)
}
// retry classifies the HTTP status code of a Hipchat response. For a non-2xx
// code it returns an error naming the code, along with a flag that is true
// for 429 and 5xx codes; for a 2xx code it returns no error.
func (n *Hipchat) retry(statusCode int) (bool, error) {
// Response codes 429 (rate limiting) and 5xx can potentially recover. 2xx
// response codes indicate successful requests.
// https://developer.atlassian.com/hipchat/guide/hipchat-rest-api/api-response-codes
if statusCode/100 != 2 {
return (statusCode == 429 || statusCode/100 == 5), fmt.Errorf("unexpected status code %v", statusCode)
}
return nil
return false, nil
}
// OpsGenie implements a Notifier for OpsGenie notifications.
@ -606,10 +638,10 @@ type opsGenieErrorResponse struct {
}
// Notify implements the Notifier interface.
func (n *OpsGenie) Notify(ctx context.Context, as ...*types.Alert) error {
func (n *OpsGenie) Notify(ctx context.Context, as ...*types.Alert) (bool, error) {
key, ok := GroupKey(ctx)
if !ok {
return fmt.Errorf("group key missing")
return false, fmt.Errorf("group key missing")
}
data := n.tmpl.Data(receiverName(ctx), groupLabels(ctx), as...)
@ -651,40 +683,46 @@ func (n *OpsGenie) Notify(ctx context.Context, as ...*types.Alert) error {
}
}
if err != nil {
return fmt.Errorf("templating error: %s", err)
return false, fmt.Errorf("templating error: %s", err)
}
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(msg); err != nil {
return err
return false, err
}
resp, err := ctxhttp.Post(ctx, http.DefaultClient, apiURL, contentTypeJSON, &buf)
if err != nil {
return err
return true, err
}
defer resp.Body.Close()
if resp.StatusCode == 400 && alerts.Status() == model.AlertResolved {
// Missing documentation therefore assuming only 5xx response codes are
// recoverable.
if resp.StatusCode/100 == 5 {
return true, fmt.Errorf("unexpected status code %v", resp.StatusCode)
} else if resp.StatusCode == 400 && alerts.Status() == model.AlertResolved {
body, _ := ioutil.ReadAll(resp.Body)
var responseMessage opsGenieErrorResponse
if err := json.Unmarshal(body, &responseMessage); err != nil {
return fmt.Errorf("could not parse error response %q", body)
return false, fmt.Errorf("could not parse error response %q", body)
}
const alreadyClosedError = 5
if responseMessage.Code == alreadyClosedError {
return nil
return false, nil
}
return fmt.Errorf("error when closing alert: code %d, error %q",
return false, fmt.Errorf("error when closing alert: code %d, error %q",
responseMessage.Code, responseMessage.Error)
} else if resp.StatusCode/100 == 4 {
return false, fmt.Errorf("unexpected status code %v", resp.StatusCode)
} else if resp.StatusCode/100 != 2 {
body, _ := ioutil.ReadAll(resp.Body)
log.With("incident", key).Debugf("unexpected OpsGenie response from %s (POSTed %s), %s: %s",
apiURL, msg, resp.Status, body)
return fmt.Errorf("unexpected status code %v", resp.StatusCode)
return false, fmt.Errorf("unexpected status code %v", resp.StatusCode)
}
return nil
return false, nil
}
// VictorOps implements a Notifier for VictorOps notifications.
@ -719,7 +757,7 @@ type victorOpsErrorResponse struct {
}
// Notify implements the Notifier interface.
func (n *VictorOps) Notify(ctx context.Context, as ...*types.Alert) error {
func (n *VictorOps) Notify(ctx context.Context, as ...*types.Alert) (bool, error) {
victorOpsAllowedEvents := map[string]bool{
"INFO": true,
"WARNING": true,
@ -728,7 +766,7 @@ func (n *VictorOps) Notify(ctx context.Context, as ...*types.Alert) error {
key, ok := GroupKey(ctx)
if !ok {
return fmt.Errorf("group key missing")
return false, fmt.Errorf("group key missing")
}
var err error
@ -756,35 +794,42 @@ func (n *VictorOps) Notify(ctx context.Context, as ...*types.Alert) error {
}
if err != nil {
return fmt.Errorf("templating error: %s", err)
return false, fmt.Errorf("templating error: %s", err)
}
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(msg); err != nil {
return err
return false, err
}
resp, err := ctxhttp.Post(ctx, http.DefaultClient, apiURL, contentTypeJSON, &buf)
if err != nil {
return err
return true, err
}
defer resp.Body.Close()
// Missing documentation therefore assuming only 5xx response codes are
// recoverable.
if resp.StatusCode/100 == 5 {
return true, fmt.Errorf("unexpected status code %v", resp.StatusCode)
}
if resp.StatusCode/100 != 2 {
body, _ := ioutil.ReadAll(resp.Body)
var responseMessage victorOpsErrorResponse
if err := json.Unmarshal(body, &responseMessage); err != nil {
return fmt.Errorf("could not parse error response %q", body)
return false, fmt.Errorf("could not parse error response %q", body)
}
log.With("incident", key).Debugf("unexpected VictorOps response from %s (POSTed %s), %s: %s", apiURL, msg, resp.Status, body)
return fmt.Errorf("error when posting alert: result %q, message %q",
return false, fmt.Errorf("error when posting alert: result %q, message %q",
responseMessage.Result, responseMessage.Message)
}
return nil
return false, nil
}
// Pushover implements a Notifier for Pushover notifications.
@ -799,10 +844,10 @@ func NewPushover(c *config.PushoverConfig, t *template.Template) *Pushover {
}
// Notify implements the Notifier interface.
func (n *Pushover) Notify(ctx context.Context, as ...*types.Alert) error {
func (n *Pushover) Notify(ctx context.Context, as ...*types.Alert) (bool, error) {
key, ok := GroupKey(ctx)
if !ok {
return fmt.Errorf("group key missing")
return false, fmt.Errorf("group key missing")
}
data := n.tmpl.Data(receiverName(ctx), groupLabels(ctx), as...)
@ -835,29 +880,40 @@ func (n *Pushover) Notify(ctx context.Context, as ...*types.Alert) error {
parameters.Add("priority", tmpl(n.conf.Priority))
parameters.Add("retry", fmt.Sprintf("%d", int64(time.Duration(n.conf.Retry).Seconds())))
parameters.Add("expire", fmt.Sprintf("%d", int64(time.Duration(n.conf.Expire).Seconds())))
if err != nil {
return false, err
}
apiURL := "https://api.pushover.net/1/messages.json"
u, err := url.Parse(apiURL)
if err != nil {
return err
return false, err
}
u.RawQuery = parameters.Encode()
log.With("incident", key).Debugf("Pushover URL = %q", u.String())
resp, err := ctxhttp.Post(ctx, http.DefaultClient, u.String(), "text/plain", nil)
if err != nil {
return err
return true, err
}
defer resp.Body.Close()
// Only documented behaviour is that 2xx response codes are successful and
// 4xx are unsuccessful, therefore assuming only 5xx are recoverable.
// https://pushover.net/api#response
if resp.StatusCode/100 == 5 {
return true, fmt.Errorf("unexpected status code %v", resp.StatusCode)
}
if resp.StatusCode/100 != 2 {
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
return false, err
}
return fmt.Errorf("unexpected status code %v (body: %s)", resp.StatusCode, string(body))
return false, fmt.Errorf("unexpected status code %v (body: %s)", resp.StatusCode, string(body))
}
return nil
return false, nil
}
func tmplText(tmpl *template.Template, data *template.Data, err *error) func(string) string {

View File

@ -561,6 +561,7 @@ func (r RetryStage) Exec(ctx context.Context, alerts ...*types.Alert) (context.C
i = 0
b = backoff.NewExponentialBackOff()
tick = backoff.NewTicker(b)
iErr error
)
defer tick.Stop()
@ -569,20 +570,35 @@ func (r RetryStage) Exec(ctx context.Context, alerts ...*types.Alert) (context.C
// Always check the context first to not notify again.
select {
case <-ctx.Done():
if iErr != nil {
return ctx, nil, iErr
}
return ctx, nil, ctx.Err()
default:
}
select {
case <-tick.C:
if err := r.integration.Notify(ctx, alerts...); err != nil {
if retry, err := r.integration.Notify(ctx, alerts...); err != nil {
numFailedNotifications.WithLabelValues(r.integration.name).Inc()
log.Warnf("Notify attempt %d failed: %s", i, err)
log.Debugf("Notify attempt %d failed: %s", i, err)
if !retry {
return ctx, alerts, fmt.Errorf("Cancelling notify retry due to unrecoverable error: %s", err)
}
// Save this error to be able to return the last seen error by an
// integration upon context timeout.
iErr = err
} else {
numNotifications.WithLabelValues(r.integration.name).Inc()
return ctx, alerts, nil
}
case <-ctx.Done():
if iErr != nil {
return ctx, nil, iErr
}
return ctx, nil, ctx.Err()
}
}