Merge pull request #2819 from simonpasquier/detect-fifo-topic
notify/sns: detect FIFO topic based on the rendered value
This commit is contained in:
commit
78a2d27984
|
@ -43,7 +43,6 @@ type Notifier struct {
|
||||||
logger log.Logger
|
logger log.Logger
|
||||||
client *http.Client
|
client *http.Client
|
||||||
retrier *notify.Retrier
|
retrier *notify.Retrier
|
||||||
isFifo *bool
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// New returns a new SNS notification handler.
|
// New returns a new SNS notification handler.
|
||||||
|
@ -68,7 +67,7 @@ func (n *Notifier) Notify(ctx context.Context, alert ...*types.Alert) (bool, err
|
||||||
tmpl = notify.TmplText(n.tmpl, data, &err)
|
tmpl = notify.TmplText(n.tmpl, data, &err)
|
||||||
)
|
)
|
||||||
|
|
||||||
client, err := createSNSClient(n.client, n, tmpl)
|
client, err := n.createSNSClient(tmpl)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if e, ok := err.(awserr.RequestFailure); ok {
|
if e, ok := err.(awserr.RequestFailure); ok {
|
||||||
return n.retrier.Check(e.StatusCode(), strings.NewReader(e.Message()))
|
return n.retrier.Check(e.StatusCode(), strings.NewReader(e.Message()))
|
||||||
|
@ -77,7 +76,7 @@ func (n *Notifier) Notify(ctx context.Context, alert ...*types.Alert) (bool, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
publishInput, err := createPublishInput(ctx, n, tmpl)
|
publishInput, err := n.createPublishInput(ctx, tmpl)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return true, err
|
return true, err
|
||||||
}
|
}
|
||||||
|
@ -96,7 +95,7 @@ func (n *Notifier) Notify(ctx context.Context, alert ...*types.Alert) (bool, err
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func createSNSClient(httpClient *http.Client, n *Notifier, tmpl func(string) string) (*sns.SNS, error) {
|
func (n *Notifier) createSNSClient(tmpl func(string) string) (*sns.SNS, error) {
|
||||||
var creds *credentials.Credentials = nil
|
var creds *credentials.Credentials = nil
|
||||||
// If there are provided sigV4 credentials we want to use those to create a session.
|
// If there are provided sigV4 credentials we want to use those to create a session.
|
||||||
if n.conf.Sigv4.AccessKey != "" && n.conf.Sigv4.SecretKey != "" {
|
if n.conf.Sigv4.AccessKey != "" && n.conf.Sigv4.SecretKey != "" {
|
||||||
|
@ -133,7 +132,7 @@ func createSNSClient(httpClient *http.Client, n *Notifier, tmpl func(string) str
|
||||||
creds = stscreds.NewCredentials(stsSess, n.conf.Sigv4.RoleARN)
|
creds = stscreds.NewCredentials(stsSess, n.conf.Sigv4.RoleARN)
|
||||||
}
|
}
|
||||||
// Use our generated session with credentials to create the SNS Client.
|
// Use our generated session with credentials to create the SNS Client.
|
||||||
client := sns.New(sess, &aws.Config{Credentials: creds, HTTPClient: httpClient})
|
client := sns.New(sess, &aws.Config{Credentials: creds, HTTPClient: n.client})
|
||||||
// We will always need a region to be set by either the local config or the environment.
|
// We will always need a region to be set by either the local config or the environment.
|
||||||
if aws.StringValue(sess.Config.Region) == "" {
|
if aws.StringValue(sess.Config.Region) == "" {
|
||||||
return nil, fmt.Errorf("region not configured in sns.sigv4.region or in default credentials chain")
|
return nil, fmt.Errorf("region not configured in sns.sigv4.region or in default credentials chain")
|
||||||
|
@ -141,19 +140,16 @@ func createSNSClient(httpClient *http.Client, n *Notifier, tmpl func(string) str
|
||||||
return client, nil
|
return client, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func createPublishInput(ctx context.Context, n *Notifier, tmpl func(string) string) (*sns.PublishInput, error) {
|
func (n *Notifier) createPublishInput(ctx context.Context, tmpl func(string) string) (*sns.PublishInput, error) {
|
||||||
publishInput := &sns.PublishInput{}
|
publishInput := &sns.PublishInput{}
|
||||||
messageAttributes := createMessageAttributes(n, tmpl)
|
messageAttributes := n.createMessageAttributes(tmpl)
|
||||||
// Max message size for a message in a SNS publish request is 256KB, except for SMS messages where the limit is 1600 characters/runes.
|
// Max message size for a message in a SNS publish request is 256KB, except for SMS messages where the limit is 1600 characters/runes.
|
||||||
messageSizeLimit := 256 * 1024
|
messageSizeLimit := 256 * 1024
|
||||||
if n.conf.TopicARN != "" {
|
if n.conf.TopicARN != "" {
|
||||||
topicTmpl := tmpl(n.conf.TopicARN)
|
topicARN := tmpl(n.conf.TopicARN)
|
||||||
publishInput.SetTopicArn(topicTmpl)
|
publishInput.SetTopicArn(topicARN)
|
||||||
if n.isFifo == nil {
|
// If we are using a topic ARN, it could be a FIFO topic specified by the topic's suffix ".fifo".
|
||||||
// If we are using a topic ARN it could be a FIFO topic specified by the topic postfix .fifo.
|
if strings.HasSuffix(topicARN, ".fifo") {
|
||||||
n.isFifo = aws.Bool(n.conf.TopicARN[len(n.conf.TopicARN)-5:] == ".fifo")
|
|
||||||
}
|
|
||||||
if *n.isFifo {
|
|
||||||
// Deduplication key and Message Group ID are only added if it's a FIFO SNS Topic.
|
// Deduplication key and Message Group ID are only added if it's a FIFO SNS Topic.
|
||||||
key, err := notify.ExtractGroupKey(ctx)
|
key, err := notify.ExtractGroupKey(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -204,7 +200,7 @@ func validateAndTruncateMessage(message string, maxMessageSizeInBytes int) (stri
|
||||||
return string(truncated), true, nil
|
return string(truncated), true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func createMessageAttributes(n *Notifier, tmpl func(string) string) map[string]*sns.MessageAttributeValue {
|
func (n *Notifier) createMessageAttributes(tmpl func(string) string) map[string]*sns.MessageAttributeValue {
|
||||||
// Convert the given attributes map into the AWS Message Attributes Format.
|
// Convert the given attributes map into the AWS Message Attributes Format.
|
||||||
attributes := make(map[string]*sns.MessageAttributeValue, len(n.conf.Attributes))
|
attributes := make(map[string]*sns.MessageAttributeValue, len(n.conf.Attributes))
|
||||||
for k, v := range n.conf.Attributes {
|
for k, v := range n.conf.Attributes {
|
||||||
|
|
Loading…
Reference in New Issue