2015-01-21 19:07:45 +00:00
// Copyright 2013 The Prometheus Authors
2013-07-30 15:18:07 +00:00
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2016-03-01 11:37:22 +00:00
package notifier
2013-07-30 15:18:07 +00:00
import (
"bytes"
2017-10-25 04:21:42 +00:00
"context"
2013-07-30 15:18:07 +00:00
"encoding/json"
2015-12-10 15:31:50 +00:00
"fmt"
2019-04-18 08:50:37 +00:00
"io"
2013-07-30 15:18:07 +00:00
"net/http"
2016-11-23 16:03:22 +00:00
"net/url"
"path"
2015-09-01 20:17:02 +00:00
"sync"
2013-07-30 15:18:07 +00:00
"time"
2021-06-11 16:17:59 +00:00
"github.com/go-kit/log"
"github.com/go-kit/log/level"
2019-04-18 12:17:03 +00:00
"github.com/go-openapi/strfmt"
2020-07-30 07:45:42 +00:00
"github.com/prometheus/alertmanager/api/v2/models"
2014-09-24 14:52:00 +00:00
"github.com/prometheus/client_golang/prometheus"
2018-04-25 17:19:06 +00:00
config_util "github.com/prometheus/common/config"
2015-08-20 15:18:46 +00:00
"github.com/prometheus/common/model"
2023-09-01 02:00:25 +00:00
"github.com/prometheus/common/sigv4"
2018-11-23 14:49:49 +00:00
"github.com/prometheus/common/version"
2020-10-22 09:00:08 +00:00
"go.uber.org/atomic"
2015-09-01 20:17:02 +00:00
"github.com/prometheus/prometheus/config"
Refactor SD configuration to remove `config` dependency (#3629)
* refactor: move targetGroup struct and CheckOverflow() to their own package
* refactor: move auth and security related structs to a utility package, fix import error in utility package
* refactor: Azure SD, remove SD struct from config
* refactor: DNS SD, remove SD struct from config into dns package
* refactor: ec2 SD, move SD struct from config into the ec2 package
* refactor: file SD, move SD struct from config to file discovery package
* refactor: gce, move SD struct from config to gce discovery package
* refactor: move HTTPClientConfig and URL into util/config, fix import error in httputil
* refactor: consul, move SD struct from config into consul discovery package
* refactor: marathon, move SD struct from config into marathon discovery package
* refactor: triton, move SD struct from config to triton discovery package, fix test
* refactor: zookeeper, move SD structs from config to zookeeper discovery package
* refactor: openstack, remove SD struct from config, move into openstack discovery package
* refactor: kubernetes, move SD struct from config into kubernetes discovery package
* refactor: notifier, use targetgroup package instead of config
* refactor: tests for file, marathon, triton SD - use targetgroup package instead of config.TargetGroup
* refactor: retrieval, use targetgroup package instead of config.TargetGroup
* refactor: storage, use config util package
* refactor: discovery manager, use targetgroup package instead of config.TargetGroup
* refactor: use HTTPClient and TLS config from configUtil instead of config
* refactor: tests, use targetgroup package instead of config.TargetGroup
* refactor: fix tagetgroup.Group pointers that were removed by mistake
* refactor: openstack, kubernetes: drop prefixes
* refactor: remove import aliases forced due to vscode bug
* refactor: move main SD struct out of config into discovery/config
* refactor: rename configUtil to config_util
* refactor: rename yamlUtil to yaml_config
* refactor: kubernetes, remove prefixes
* refactor: move the TargetGroup package to discovery/
* refactor: fix order of imports
2017-12-29 20:01:34 +00:00
"github.com/prometheus/prometheus/discovery/targetgroup"
2021-11-08 14:23:17 +00:00
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
2013-07-30 15:18:07 +00:00
)
// contentTypeJSON is the Content-Type header value set on every alert
// payload posted to an Alertmanager.
const contentTypeJSON = "application/json"
2014-06-18 17:43:15 +00:00
// String constants for instrumentation.
const (
	// namespace and subsystem are the Namespace/Subsystem parts of the
	// notifier's metric names.
	namespace = "prometheus"
	subsystem = "notifications"
	// alertmanagerLabel is the label name that partitions the
	// per-Alertmanager metric vectors.
	alertmanagerLabel = "alertmanager"
)
2018-11-23 14:49:49 +00:00
var userAgent = fmt . Sprintf ( "Prometheus/%s" , version . Version )
2016-12-24 23:37:46 +00:00
// Alert is the notifier's representation of a single alert as it is queued
// and serialized for Alertmanager.
type Alert struct {
	// Labels identify the alert; they drive aggregation, matching and
	// dispatching. At minimum an "alertname" label is expected.
	Labels labels.Labels `json:"labels"`

	// Annotations carry additional key/value information that is not part
	// of the alert's identity.
	Annotations labels.Labels `json:"annotations"`

	// StartsAt and EndsAt bound the time range for which the alert is known
	// to be active. Either end may be left as the zero time.
	StartsAt time.Time `json:"startsAt,omitempty"`
	EndsAt   time.Time `json:"endsAt,omitempty"`

	// GeneratorURL is serialized as "generatorURL" and omitted when empty.
	GeneratorURL string `json:"generatorURL,omitempty"`
}
// Name returns the value of the alert's "alertname" label.
func (a *Alert) Name() string {
	name := a.Labels.Get(labels.AlertName)
	return name
}
// Hash returns the hash of the alert's label set; annotations and
// timestamps do not contribute to it.
func (a *Alert) Hash() uint64 {
	sum := a.Labels.Hash()
	return sum
}
// String renders the alert as "<name>[<first 7 hex digits of the hash>]"
// followed by "[resolved]" or "[active]" depending on Resolved.
func (a *Alert) String() string {
	digest := fmt.Sprintf("%016x", a.Hash())
	prefix := fmt.Sprintf("%s[%s]", a.Name(), digest[:7])

	state := "[active]"
	if a.Resolved() {
		state = "[resolved]"
	}
	return prefix + state
}
// Resolved reports whether the alert's activity interval has ended as of
// the current wall-clock time.
func (a *Alert) Resolved() bool {
	now := time.Now()
	return a.ResolvedAt(now)
}
2019-01-04 10:57:17 +00:00
// ResolvedAt returns true iff the activity interval ended before
2016-12-24 23:37:46 +00:00
// the given timestamp.
func ( a * Alert ) ResolvedAt ( ts time . Time ) bool {
if a . EndsAt . IsZero ( ) {
return false
}
return ! a . EndsAt . After ( ts )
}
2018-01-30 17:45:37 +00:00
// Manager is responsible for dispatching alert notifications to an
2015-12-10 15:31:50 +00:00
// alert manager service.
2018-01-30 17:45:37 +00:00
type Manager struct {
2016-12-24 23:37:46 +00:00
queue [ ] * Alert
2016-03-01 11:37:22 +00:00
opts * Options
2013-08-09 17:32:55 +00:00
2017-03-02 21:23:16 +00:00
metrics * alertMetrics
2024-06-26 10:32:04 +00:00
more chan struct { }
mtx sync . RWMutex
stopOnce * sync . Once
stopRequested chan struct { }
2013-08-09 17:32:55 +00:00
2017-12-30 17:27:50 +00:00
alertmanagers map [ string ] * alertmanagerSet
logger log . Logger
2013-07-30 15:18:07 +00:00
}
2016-05-11 12:20:36 +00:00
// Options are the configurable parameters of a Handler.
2016-03-01 11:37:22 +00:00
type Options struct {
2024-06-26 10:32:04 +00:00
QueueCapacity int
DrainOnShutdown bool
ExternalLabels labels . Labels
RelabelConfigs [ ] * relabel . Config
2017-02-27 19:31:16 +00:00
// Used for sending HTTP requests to the Alertmanager.
2018-11-19 11:31:16 +00:00
Do func ( ctx context . Context , client * http . Client , req * http . Request ) ( * http . Response , error )
2015-06-15 11:03:17 +00:00
2017-03-31 15:50:12 +00:00
Registerer prometheus . Registerer
2015-06-15 11:03:17 +00:00
}
2017-02-27 19:31:16 +00:00
2017-03-02 21:23:16 +00:00
type alertMetrics struct {
2017-05-04 11:54:08 +00:00
latency * prometheus . SummaryVec
errors * prometheus . CounterVec
sent * prometheus . CounterVec
dropped prometheus . Counter
queueLength prometheus . GaugeFunc
queueCapacity prometheus . Gauge
alertmanagersDiscovered prometheus . GaugeFunc
2017-03-02 21:23:16 +00:00
}
2014-06-18 17:43:15 +00:00
2017-05-04 11:54:08 +00:00
func newAlertMetrics ( r prometheus . Registerer , queueCap int , queueLen , alertmanagersDiscovered func ( ) float64 ) * alertMetrics {
2017-03-02 21:23:16 +00:00
m := & alertMetrics {
2016-06-02 12:25:19 +00:00
latency : prometheus . NewSummaryVec ( prometheus . SummaryOpts {
2019-06-12 00:03:13 +00:00
Namespace : namespace ,
Subsystem : subsystem ,
Name : "latency_seconds" ,
Help : "Latency quantiles for sending alert notifications." ,
Objectives : map [ float64 ] float64 { 0.5 : 0.05 , 0.9 : 0.01 , 0.99 : 0.001 } ,
2016-06-02 12:25:19 +00:00
} ,
[ ] string { alertmanagerLabel } ,
) ,
errors : prometheus . NewCounterVec ( prometheus . CounterOpts {
2015-01-13 17:33:35 +00:00
Namespace : namespace ,
Subsystem : subsystem ,
Name : "errors_total" ,
Help : "Total number of errors sending alert notifications." ,
2016-06-02 12:25:19 +00:00
} ,
[ ] string { alertmanagerLabel } ,
) ,
sent : prometheus . NewCounterVec ( prometheus . CounterOpts {
2015-12-10 15:31:50 +00:00
Namespace : namespace ,
Subsystem : subsystem ,
Name : "sent_total" ,
2018-02-19 14:40:49 +00:00
Help : "Total number of alerts sent." ,
2016-06-02 12:25:19 +00:00
} ,
[ ] string { alertmanagerLabel } ,
) ,
2015-12-10 15:31:50 +00:00
dropped : prometheus . NewCounter ( prometheus . CounterOpts {
2015-01-13 17:33:35 +00:00
Namespace : namespace ,
Subsystem : subsystem ,
Name : "dropped_total" ,
2017-01-13 23:36:00 +00:00
Help : "Total number of alerts dropped due to errors when sending to Alertmanager." ,
2015-01-13 17:33:35 +00:00
} ) ,
2017-03-31 18:44:30 +00:00
queueLength : prometheus . NewGaugeFunc ( prometheus . GaugeOpts {
2014-07-23 17:55:33 +00:00
Namespace : namespace ,
Subsystem : subsystem ,
Name : "queue_length" ,
Help : "The number of alert notifications in the queue." ,
2017-03-31 18:44:30 +00:00
} , queueLen ) ,
queueCapacity : prometheus . NewGauge ( prometheus . GaugeOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "queue_capacity" ,
Help : "The capacity of the alert notifications queue." ,
2014-07-23 17:55:33 +00:00
} ) ,
2017-05-04 11:54:08 +00:00
alertmanagersDiscovered : prometheus . NewGaugeFunc ( prometheus . GaugeOpts {
Name : "prometheus_notifications_alertmanagers_discovered" ,
Help : "The number of alertmanagers discovered and active." ,
} , alertmanagersDiscovered ) ,
2013-07-30 15:18:07 +00:00
}
2017-03-02 21:23:16 +00:00
2017-03-31 18:44:30 +00:00
m . queueCapacity . Set ( float64 ( queueCap ) )
2017-03-02 21:23:16 +00:00
if r != nil {
r . MustRegister (
m . latency ,
m . errors ,
m . sent ,
m . dropped ,
2017-03-31 18:44:30 +00:00
m . queueLength ,
m . queueCapacity ,
2017-05-04 11:54:08 +00:00
m . alertmanagersDiscovered ,
2017-03-02 21:23:16 +00:00
)
}
return m
}
2018-11-19 11:31:16 +00:00
// do is the default Options.Do implementation. It binds ctx to a copy of req
// and executes it with client, falling back to http.DefaultClient when
// client is nil.
func do(ctx context.Context, client *http.Client, req *http.Request) (*http.Response, error) {
	c := client
	if c == nil {
		c = http.DefaultClient
	}
	bound := req.WithContext(ctx)
	return c.Do(bound)
}
2018-01-30 17:45:37 +00:00
// NewManager is the manager constructor.
func NewManager ( o * Options , logger log . Logger ) * Manager {
2017-03-02 21:23:16 +00:00
if o . Do == nil {
2018-11-19 11:31:16 +00:00
o . Do = do
2017-03-02 21:23:16 +00:00
}
2017-08-11 18:45:52 +00:00
if logger == nil {
logger = log . NewNopLogger ( )
}
2017-03-02 21:23:16 +00:00
2018-01-30 17:45:37 +00:00
n := & Manager {
2024-06-26 10:32:04 +00:00
queue : make ( [ ] * Alert , 0 , o . QueueCapacity ) ,
more : make ( chan struct { } , 1 ) ,
stopRequested : make ( chan struct { } ) ,
stopOnce : & sync . Once { } ,
opts : o ,
logger : logger ,
2017-03-02 21:23:16 +00:00
}
2017-03-31 18:44:30 +00:00
queueLenFunc := func ( ) float64 { return float64 ( n . queueLen ( ) ) }
2017-05-04 11:54:08 +00:00
alertmanagersDiscoveredFunc := func ( ) float64 { return float64 ( len ( n . Alertmanagers ( ) ) ) }
2017-08-11 18:45:52 +00:00
n . metrics = newAlertMetrics (
o . Registerer ,
o . QueueCapacity ,
queueLenFunc ,
alertmanagersDiscoveredFunc ,
)
2017-03-31 18:44:30 +00:00
return n
2013-07-30 15:18:07 +00:00
}
2015-09-01 20:17:02 +00:00
// ApplyConfig updates the status state as the new config requires.
2018-01-30 17:45:37 +00:00
func ( n * Manager ) ApplyConfig ( conf * config . Config ) error {
2015-09-01 20:17:02 +00:00
n . mtx . Lock ( )
defer n . mtx . Unlock ( )
2015-12-10 15:31:50 +00:00
n . opts . ExternalLabels = conf . GlobalConfig . ExternalLabels
2016-08-09 11:09:36 +00:00
n . opts . RelabelConfigs = conf . AlertingConfig . AlertRelabelConfigs
2016-11-23 16:03:22 +00:00
2017-12-30 17:27:50 +00:00
amSets := make ( map [ string ] * alertmanagerSet )
2016-11-23 16:03:22 +00:00
2019-12-12 16:00:19 +00:00
for k , cfg := range conf . AlertingConfig . AlertmanagerConfigs . ToMap ( ) {
ams , err := newAlertmanagerSet ( cfg , n . logger , n . metrics )
2016-11-23 16:03:22 +00:00
if err != nil {
return err
}
2017-03-02 18:28:15 +00:00
2019-12-12 16:00:19 +00:00
amSets [ k ] = ams
2016-11-23 16:03:22 +00:00
}
n . alertmanagers = amSets
2016-07-11 14:24:54 +00:00
return nil
2015-09-01 20:17:02 +00:00
}
2015-12-10 15:31:50 +00:00
// maxBatchSize is the largest number of alerts nextBatch takes off the queue
// in one go.
const maxBatchSize = 64
2018-01-30 17:45:37 +00:00
func ( n * Manager ) queueLen ( ) int {
2015-09-01 20:17:02 +00:00
n . mtx . RLock ( )
defer n . mtx . RUnlock ( )
2015-12-10 15:31:50 +00:00
return len ( n . queue )
}
2013-07-30 15:18:07 +00:00
2018-01-30 17:45:37 +00:00
func ( n * Manager ) nextBatch ( ) [ ] * Alert {
2015-12-10 15:31:50 +00:00
n . mtx . Lock ( )
defer n . mtx . Unlock ( )
2016-12-24 23:37:46 +00:00
var alerts [ ] * Alert
2015-12-10 15:31:50 +00:00
if len ( n . queue ) > maxBatchSize {
2016-12-24 23:37:46 +00:00
alerts = append ( make ( [ ] * Alert , 0 , maxBatchSize ) , n . queue [ : maxBatchSize ] ... )
2015-12-10 15:31:50 +00:00
n . queue = n . queue [ maxBatchSize : ]
} else {
2016-12-24 23:37:46 +00:00
alerts = append ( make ( [ ] * Alert , 0 , len ( n . queue ) ) , n . queue ... )
2015-12-10 15:31:50 +00:00
n . queue = n . queue [ : 0 ]
2013-07-30 15:18:07 +00:00
}
2015-12-10 15:31:50 +00:00
return alerts
2013-07-30 15:18:07 +00:00
}
2024-06-26 10:32:04 +00:00
// Run dispatches notifications until Stop has been called and, if draining
// is enabled, the remaining queued notifications have been sent.
//
// Target group updates and notification sending run in separate goroutines
// so that neither can starve the other.
// Refer to https://github.com/prometheus/prometheus/issues/13676 for more details.
func (n *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) {
	var wg sync.WaitGroup

	wg.Add(1)
	go func() {
		defer wg.Done()
		n.targetUpdateLoop(tsets)
	}()

	wg.Add(1)
	go func() {
		defer wg.Done()
		// Once the send loop has been told to stop, deal with the leftovers.
		n.sendLoop()
		n.drainQueue()
	}()

	wg.Wait()
	level.Info(n.logger).Log("msg", "Notification manager stopped")
}
2024-06-03 16:09:51 +00:00
// sendLoop continuously consumes the notifications queue and sends alerts to
// the configured Alertmanagers.
func ( n * Manager ) sendLoop ( ) {
2015-12-10 15:31:50 +00:00
for {
2024-06-26 10:32:04 +00:00
// If we've been asked to stop, that takes priority over sending any further notifications.
2015-12-10 15:31:50 +00:00
select {
2024-06-26 10:32:04 +00:00
case <- n . stopRequested :
2015-12-10 15:31:50 +00:00
return
2024-06-26 10:32:04 +00:00
default :
select {
case <- n . stopRequested :
return
2015-12-10 15:31:50 +00:00
2024-06-26 10:32:04 +00:00
case <- n . more :
n . sendOneBatch ( )
// If the queue still has items left, kick off the next iteration.
if n . queueLen ( ) > 0 {
n . setMore ( )
}
}
2015-12-10 15:31:50 +00:00
}
2013-07-30 15:18:07 +00:00
}
2014-10-10 12:19:02 +00:00
}
2024-06-26 10:32:04 +00:00
// targetUpdateLoop receives updates of target groups and triggers a reload.
func ( n * Manager ) targetUpdateLoop ( tsets <- chan map [ string ] [ ] * targetgroup . Group ) {
2024-06-03 16:09:51 +00:00
for {
2024-06-26 10:32:04 +00:00
// If we've been asked to stop, that takes priority over processing any further target group updates.
2024-06-03 16:09:51 +00:00
select {
2024-06-26 10:32:04 +00:00
case <- n . stopRequested :
2024-06-03 16:09:51 +00:00
return
2024-06-26 10:32:04 +00:00
default :
select {
case <- n . stopRequested :
return
case ts := <- tsets :
n . reload ( ts )
}
}
}
}
// sendOneBatch takes the next batch off the queue and sends it. If sendAll
// reports failure, the whole batch is counted as dropped.
func (n *Manager) sendOneBatch() {
	batch := n.nextBatch()
	if n.sendAll(batch...) {
		return
	}
	n.metrics.dropped.Add(float64(len(batch)))
}
func ( n * Manager ) drainQueue ( ) {
if ! n . opts . DrainOnShutdown {
if n . queueLen ( ) > 0 {
level . Warn ( n . logger ) . Log ( "msg" , "Draining remaining notifications on shutdown is disabled, and some notifications have been dropped" , "count" , n . queueLen ( ) )
n . metrics . dropped . Add ( float64 ( n . queueLen ( ) ) )
2024-06-03 16:09:51 +00:00
}
2024-06-26 10:32:04 +00:00
return
}
level . Info ( n . logger ) . Log ( "msg" , "Draining any remaining notifications..." )
for n . queueLen ( ) > 0 {
n . sendOneBatch ( )
2024-06-03 16:09:51 +00:00
}
2024-06-26 10:32:04 +00:00
level . Info ( n . logger ) . Log ( "msg" , "Remaining notifications drained" )
2024-06-03 16:09:51 +00:00
}
2018-01-30 17:45:37 +00:00
func ( n * Manager ) reload ( tgs map [ string ] [ ] * targetgroup . Group ) {
2017-12-30 17:27:50 +00:00
n . mtx . Lock ( )
defer n . mtx . Unlock ( )
for id , tgroup := range tgs {
am , ok := n . alertmanagers [ id ]
if ! ok {
level . Error ( n . logger ) . Log ( "msg" , "couldn't sync alert manager set" , "err" , fmt . Sprintf ( "invalid id:%v" , id ) )
continue
}
am . sync ( tgroup )
}
}
2016-02-29 21:58:32 +00:00
// Send queues the given notification requests for processing.
2016-01-18 15:47:31 +00:00
// Panics if called on a handler that is not running.
2018-01-30 17:45:37 +00:00
func ( n * Manager ) Send ( alerts ... * Alert ) {
2015-12-10 15:31:50 +00:00
n . mtx . Lock ( )
defer n . mtx . Unlock ( )
2023-07-11 21:44:23 +00:00
alerts = relabelAlerts ( n . opts . RelabelConfigs , n . opts . ExternalLabels , alerts )
2019-05-06 07:02:40 +00:00
if len ( alerts ) == 0 {
return
}
2016-08-09 08:08:15 +00:00
2015-12-10 15:31:50 +00:00
// Queue capacity should be significantly larger than a single alert
// batch could be.
if d := len ( alerts ) - n . opts . QueueCapacity ; d > 0 {
alerts = alerts [ d : ]
2017-08-11 18:45:52 +00:00
level . Warn ( n . logger ) . Log ( "msg" , "Alert batch larger than queue capacity, dropping alerts" , "num_dropped" , d )
2017-03-02 21:23:16 +00:00
n . metrics . dropped . Add ( float64 ( d ) )
2015-12-10 15:31:50 +00:00
}
// If the queue is full, remove the oldest alerts in favor
// of newer ones.
if d := ( len ( n . queue ) + len ( alerts ) ) - n . opts . QueueCapacity ; d > 0 {
n . queue = n . queue [ d : ]
2017-08-11 18:45:52 +00:00
level . Warn ( n . logger ) . Log ( "msg" , "Alert notification queue full, dropping alerts" , "num_dropped" , d )
2017-03-02 21:23:16 +00:00
n . metrics . dropped . Add ( float64 ( d ) )
2015-12-10 15:31:50 +00:00
}
n . queue = append ( n . queue , alerts ... )
// Notify sending goroutine that there are alerts to be processed.
n . setMore ( )
}
2023-07-11 21:44:23 +00:00
func relabelAlerts ( relabelConfigs [ ] * relabel . Config , externalLabels labels . Labels , alerts [ ] * Alert ) [ ] * Alert {
2023-03-22 12:26:17 +00:00
lb := labels . NewBuilder ( labels . EmptyLabels ( ) )
2016-12-29 15:53:11 +00:00
var relabeledAlerts [ ] * Alert
2016-12-24 23:37:46 +00:00
2023-03-22 12:26:17 +00:00
for _ , a := range alerts {
lb . Reset ( a . Labels )
2023-07-22 22:37:30 +00:00
externalLabels . Range ( func ( l labels . Label ) {
2023-03-22 12:26:17 +00:00
if a . Labels . Get ( l . Name ) == "" {
lb . Set ( l . Name , l . Value )
}
} )
2023-07-11 21:44:23 +00:00
keep := relabel . ProcessBuilder ( lb , relabelConfigs ... )
2023-03-22 12:26:17 +00:00
if ! keep {
continue
2016-12-29 15:53:11 +00:00
}
2023-03-22 12:26:17 +00:00
a . Labels = lb . Labels ( )
relabeledAlerts = append ( relabeledAlerts , a )
2016-12-29 15:53:11 +00:00
}
return relabeledAlerts
2016-08-09 08:08:15 +00:00
}
2015-12-10 15:31:50 +00:00
// setMore signals that the alert queue has items.
2018-01-30 17:45:37 +00:00
func ( n * Manager ) setMore ( ) {
2015-12-10 15:31:50 +00:00
// If we cannot send on the channel, it means the signal already exists
// and has not been consumed yet.
select {
case n . more <- struct { } { } :
default :
}
}
2017-04-25 05:42:33 +00:00
// Alertmanagers returns a slice of Alertmanager URLs.
2018-01-30 17:45:37 +00:00
func ( n * Manager ) Alertmanagers ( ) [ ] * url . URL {
2016-11-23 17:23:09 +00:00
n . mtx . RLock ( )
amSets := n . alertmanagers
n . mtx . RUnlock ( )
2017-04-25 05:42:33 +00:00
var res [ ] * url . URL
2016-11-23 17:23:09 +00:00
for _ , ams := range amSets {
ams . mtx . RLock ( )
for _ , am := range ams . ams {
res = append ( res , am . url ( ) )
}
ams . mtx . RUnlock ( )
}
return res
}
2018-02-21 09:00:07 +00:00
// DroppedAlertmanagers returns the URLs of the Alertmanagers that were
// discovered but dropped, across all configured sets.
func (n *Manager) DroppedAlertmanagers() []*url.URL {
	// Only the map reference is read under the manager lock; each set has
	// its own lock for its endpoint lists.
	n.mtx.RLock()
	sets := n.alertmanagers
	n.mtx.RUnlock()

	var urls []*url.URL
	for _, set := range sets {
		set.mtx.RLock()
		for i := range set.droppedAms {
			urls = append(urls, set.droppedAms[i].url())
		}
		set.mtx.RUnlock()
	}
	return urls
}
2016-11-24 14:17:50 +00:00
// sendAll sends the alerts to all configured Alertmanagers concurrently.
2016-11-25 07:47:04 +00:00
// It returns true if the alerts could be sent successfully to at least one Alertmanager.
2018-01-30 17:45:37 +00:00
func ( n * Manager ) sendAll ( alerts ... * Alert ) bool {
2019-12-11 10:03:50 +00:00
if len ( alerts ) == 0 {
return true
}
2016-06-02 12:25:19 +00:00
begin := time . Now ( )
2019-04-18 12:17:03 +00:00
// v1Payload and v2Payload represent 'alerts' marshaled for Alertmanager API
// v1 or v2. Marshaling happens below. Reference here is for caching between
// for loop iterations.
var v1Payload , v2Payload [ ] byte
2016-06-02 12:25:19 +00:00
2016-11-23 16:03:22 +00:00
n . mtx . RLock ( )
amSets := n . alertmanagers
n . mtx . RUnlock ( )
2015-12-10 15:31:50 +00:00
2016-06-02 12:25:19 +00:00
var (
2016-11-23 16:03:22 +00:00
wg sync . WaitGroup
2020-07-30 07:45:42 +00:00
numSuccess atomic . Uint64
2016-06-02 12:25:19 +00:00
)
2016-11-23 16:03:22 +00:00
for _ , ams := range amSets {
2019-04-18 12:17:03 +00:00
var (
2023-07-11 21:44:23 +00:00
payload [ ] byte
err error
amAlerts = alerts
2019-04-18 12:17:03 +00:00
)
2016-11-23 16:03:22 +00:00
ams . mtx . RLock ( )
2024-06-10 19:26:36 +00:00
if len ( ams . ams ) == 0 {
ams . mtx . RUnlock ( )
continue
}
2024-06-03 16:09:51 +00:00
2023-07-11 21:44:23 +00:00
if len ( ams . cfg . AlertRelabelConfigs ) > 0 {
amAlerts = relabelAlerts ( ams . cfg . AlertRelabelConfigs , labels . Labels { } , alerts )
if len ( amAlerts ) == 0 {
2024-03-29 13:32:39 +00:00
ams . mtx . RUnlock ( )
2023-07-11 21:44:23 +00:00
continue
}
2024-03-08 09:14:22 +00:00
// We can't use the cached values from previous iteration.
v1Payload , v2Payload = nil , nil
2023-07-11 21:44:23 +00:00
}
2019-04-18 12:17:03 +00:00
switch ams . cfg . APIVersion {
case config . AlertmanagerAPIVersionV1 :
{
if v1Payload == nil {
2023-07-11 21:44:23 +00:00
v1Payload , err = json . Marshal ( amAlerts )
2019-04-18 12:17:03 +00:00
if err != nil {
level . Error ( n . logger ) . Log ( "msg" , "Encoding alerts for Alertmanager API v1 failed" , "err" , err )
2020-04-23 08:49:57 +00:00
ams . mtx . RUnlock ( )
2019-04-18 12:17:03 +00:00
return false
}
}
payload = v1Payload
}
case config . AlertmanagerAPIVersionV2 :
{
if v2Payload == nil {
2023-07-11 21:44:23 +00:00
openAPIAlerts := alertsToOpenAPIAlerts ( amAlerts )
2019-04-18 12:17:03 +00:00
v2Payload , err = json . Marshal ( openAPIAlerts )
if err != nil {
level . Error ( n . logger ) . Log ( "msg" , "Encoding alerts for Alertmanager API v2 failed" , "err" , err )
2020-04-23 08:49:57 +00:00
ams . mtx . RUnlock ( )
2019-04-18 12:17:03 +00:00
return false
}
}
payload = v2Payload
}
default :
{
level . Error ( n . logger ) . Log (
"msg" , fmt . Sprintf ( "Invalid Alertmanager API version '%v', expected one of '%v'" , ams . cfg . APIVersion , config . SupportedAlertmanagerAPIVersions ) ,
"err" , err ,
)
2020-04-23 08:49:57 +00:00
ams . mtx . RUnlock ( )
2019-04-18 12:17:03 +00:00
return false
}
}
2024-03-08 09:14:22 +00:00
if len ( ams . cfg . AlertRelabelConfigs ) > 0 {
// We can't use the cached values on the next iteration.
v1Payload , v2Payload = nil , nil
}
2016-11-23 16:03:22 +00:00
for _ , am := range ams . ams {
wg . Add ( 1 )
2024-06-26 10:32:04 +00:00
ctx , cancel := context . WithTimeout ( context . Background ( ) , time . Duration ( ams . cfg . Timeout ) )
2016-11-23 16:03:22 +00:00
defer cancel ( )
2024-03-08 09:20:36 +00:00
go func ( ctx context . Context , client * http . Client , url string , payload [ ] byte , count int ) {
2019-04-18 12:17:03 +00:00
if err := n . sendOne ( ctx , client , url , payload ) ; err != nil {
2024-03-08 09:20:36 +00:00
level . Error ( n . logger ) . Log ( "alertmanager" , url , "count" , count , "msg" , "Error sending alert" , "err" , err )
2019-04-18 12:17:03 +00:00
n . metrics . errors . WithLabelValues ( url ) . Inc ( )
2016-11-23 16:03:22 +00:00
} else {
2020-07-30 07:45:42 +00:00
numSuccess . Inc ( )
2016-11-23 16:03:22 +00:00
}
2019-04-18 12:17:03 +00:00
n . metrics . latency . WithLabelValues ( url ) . Observe ( time . Since ( begin ) . Seconds ( ) )
2023-07-11 21:44:23 +00:00
n . metrics . sent . WithLabelValues ( url ) . Add ( float64 ( len ( amAlerts ) ) )
2016-06-02 12:25:19 +00:00
2016-11-23 16:03:22 +00:00
wg . Done ( )
2024-03-08 09:20:36 +00:00
} ( ctx , ams . client , am . url ( ) . String ( ) , payload , len ( amAlerts ) )
2016-11-23 16:03:22 +00:00
}
2019-04-18 12:17:03 +00:00
2016-11-23 16:03:22 +00:00
ams . mtx . RUnlock ( )
2015-12-10 15:31:50 +00:00
}
2019-04-18 12:17:03 +00:00
2016-06-02 12:25:19 +00:00
wg . Wait ( )
2020-07-30 07:45:42 +00:00
return numSuccess . Load ( ) > 0
2014-10-10 12:19:02 +00:00
}
2019-04-18 12:17:03 +00:00
// alertsToOpenAPIAlerts converts alerts into the Alertmanager OpenAPI model,
// preserving order. The result is non-nil even when alerts is empty.
func alertsToOpenAPIAlerts(alerts []*Alert) models.PostableAlerts {
	converted := models.PostableAlerts{}
	for _, src := range alerts {
		postable := &models.PostableAlert{
			Annotations: labelsToOpenAPILabelSet(src.Annotations),
			EndsAt:      strfmt.DateTime(src.EndsAt),
			StartsAt:    strfmt.DateTime(src.StartsAt),
			Alert: models.Alert{
				GeneratorURL: strfmt.URI(src.GeneratorURL),
				Labels:       labelsToOpenAPILabelSet(src.Labels),
			},
		}
		converted = append(converted, postable)
	}
	return converted
}
func labelsToOpenAPILabelSet ( modelLabelSet labels . Labels ) models . LabelSet {
apiLabelSet := models . LabelSet { }
2022-03-09 22:21:36 +00:00
modelLabelSet . Range ( func ( label labels . Label ) {
2019-09-06 10:37:46 +00:00
apiLabelSet [ label . Name ] = label . Value
2022-03-09 22:21:36 +00:00
} )
2019-04-18 12:17:03 +00:00
return apiLabelSet
}
2018-01-30 17:45:37 +00:00
func ( n * Manager ) sendOne ( ctx context . Context , c * http . Client , url string , b [ ] byte ) error {
2024-04-08 19:26:23 +00:00
req , err := http . NewRequest ( http . MethodPost , url , bytes . NewReader ( b ) )
2017-02-27 19:31:16 +00:00
if err != nil {
return err
}
2018-11-23 14:49:49 +00:00
req . Header . Set ( "User-Agent" , userAgent )
2017-02-27 19:31:16 +00:00
req . Header . Set ( "Content-Type" , contentTypeJSON )
resp , err := n . opts . Do ( ctx , c , req )
2016-11-25 10:11:28 +00:00
if err != nil {
return err
}
2019-04-18 08:50:37 +00:00
defer func ( ) {
2022-04-27 09:24:36 +00:00
io . Copy ( io . Discard , resp . Body )
2019-04-18 08:50:37 +00:00
resp . Body . Close ( )
} ( )
2016-11-25 10:11:28 +00:00
// Any HTTP status 2xx is OK.
if resp . StatusCode / 100 != 2 {
2022-06-16 08:38:27 +00:00
return fmt . Errorf ( "bad response status %s" , resp . Status )
2016-11-25 10:11:28 +00:00
}
2019-04-18 12:17:03 +00:00
2019-09-06 10:38:47 +00:00
return nil
2016-11-25 10:11:28 +00:00
}
2024-06-26 10:32:04 +00:00
// Stop signals the notification manager to shut down and immediately returns.
//
// Run will return once the notification manager has successfully shut down.
//
// The manager will optionally drain any queued notifications before shutting down.
//
// Stop is safe to call multiple times.
2018-01-30 17:45:37 +00:00
func ( n * Manager ) Stop ( ) {
2018-01-17 18:14:24 +00:00
level . Info ( n . logger ) . Log ( "msg" , "Stopping notification manager..." )
2024-06-26 10:32:04 +00:00
n . stopOnce . Do ( func ( ) {
close ( n . stopRequested )
} )
2013-07-30 15:18:07 +00:00
}
2014-06-18 17:43:15 +00:00
2021-10-28 00:01:28 +00:00
// alertmanager holds Alertmanager endpoint information.
type alertmanager interface {
	// url returns the URL of the endpoint.
	url() *url.URL
}
2016-12-29 15:53:11 +00:00
// alertmanagerLabels is a label set describing one Alertmanager endpoint.
// Its url method builds the endpoint URL from the scheme, address and path
// labels, which makes it usable as an alertmanager.
type alertmanagerLabels struct { labels . Labels }
2016-11-25 10:11:28 +00:00
2016-11-23 16:03:22 +00:00
// pathLabel is the name of the internal label that carries the path alerts
// are posted to; it becomes the Path of an endpoint's URL.
const pathLabel = "__alerts_path__"
2017-04-25 05:42:33 +00:00
func ( a alertmanagerLabels ) url ( ) * url . URL {
return & url . URL {
2016-12-29 15:53:11 +00:00
Scheme : a . Get ( model . SchemeLabel ) ,
Host : a . Get ( model . AddressLabel ) ,
Path : a . Get ( pathLabel ) ,
2016-11-23 16:03:22 +00:00
}
}
// alertmanagerSet contains a set of Alertmanagers discovered via a group of service
// discovery definitions that have a common configuration on how alerts should be sent.
type alertmanagerSet struct {
2016-11-24 14:17:50 +00:00
cfg * config . AlertmanagerConfig
2016-11-23 16:03:22 +00:00
client * http . Client
2017-03-02 21:23:16 +00:00
metrics * alertMetrics
2018-02-21 09:00:07 +00:00
mtx sync . RWMutex
ams [ ] alertmanager
droppedAms [ ] alertmanager
logger log . Logger
2016-11-23 16:03:22 +00:00
}
2019-12-12 16:00:19 +00:00
func newAlertmanagerSet ( cfg * config . AlertmanagerConfig , logger log . Logger , metrics * alertMetrics ) ( * alertmanagerSet , error ) {
2021-09-26 21:16:12 +00:00
client , err := config_util . NewClientFromConfig ( cfg . HTTPClientConfig , "alertmanager" )
2016-11-23 16:03:22 +00:00
if err != nil {
return nil , err
}
2023-09-01 01:43:48 +00:00
t := client . Transport
if cfg . SigV4Config != nil {
t , err = sigv4 . NewSigV4RoundTripper ( cfg . SigV4Config , client . Transport )
if err != nil {
return nil , err
}
}
client . Transport = t
2016-11-23 16:03:22 +00:00
s := & alertmanagerSet {
2019-12-12 16:00:19 +00:00
client : client ,
cfg : cfg ,
logger : logger ,
metrics : metrics ,
2016-11-23 16:03:22 +00:00
}
return s , nil
}
2017-12-30 17:27:50 +00:00
// sync extracts a deduplicated set of Alertmanager endpoints from a list
2016-11-23 16:03:22 +00:00
// of target groups definitions.
2017-12-30 17:27:50 +00:00
func ( s * alertmanagerSet ) sync ( tgs [ ] * targetgroup . Group ) {
2018-02-21 09:00:07 +00:00
allAms := [ ] alertmanager { }
allDroppedAms := [ ] alertmanager { }
2016-11-23 16:03:22 +00:00
for _ , tg := range tgs {
2021-10-28 00:01:28 +00:00
ams , droppedAms , err := AlertmanagerFromGroup ( tg , s . cfg )
2016-11-23 16:03:22 +00:00
if err != nil {
2017-08-11 18:45:52 +00:00
level . Error ( s . logger ) . Log ( "msg" , "Creating discovered Alertmanagers failed" , "err" , err )
2016-11-23 16:03:22 +00:00
continue
}
2018-02-21 09:00:07 +00:00
allAms = append ( allAms , ams ... )
allDroppedAms = append ( allDroppedAms , droppedAms ... )
2016-11-23 16:03:22 +00:00
}
s . mtx . Lock ( )
defer s . mtx . Unlock ( )
// Set new Alertmanagers and deduplicate them along their unique URL.
s . ams = [ ] alertmanager { }
2018-02-21 09:00:07 +00:00
s . droppedAms = [ ] alertmanager { }
s . droppedAms = append ( s . droppedAms , allDroppedAms ... )
2016-11-23 16:03:22 +00:00
seen := map [ string ] struct { } { }
2018-02-21 09:00:07 +00:00
for _ , am := range allAms {
2017-04-25 05:42:33 +00:00
us := am . url ( ) . String ( )
2016-11-23 16:03:22 +00:00
if _ , ok := seen [ us ] ; ok {
continue
}
2018-11-27 16:44:29 +00:00
// This will initialize the Counters for the AM to 0.
2017-03-02 21:23:16 +00:00
s . metrics . sent . WithLabelValues ( us )
s . metrics . errors . WithLabelValues ( us )
2017-03-02 18:28:15 +00:00
2016-11-23 16:03:22 +00:00
seen [ us ] = struct { } { }
s . ams = append ( s . ams , am )
}
}
2019-04-18 12:17:03 +00:00
func postPath ( pre string , v config . AlertmanagerAPIVersion ) string {
alertPushEndpoint := fmt . Sprintf ( "/api/%v/alerts" , string ( v ) )
2016-11-23 16:03:22 +00:00
return path . Join ( "/" , pre , alertPushEndpoint )
}
2021-10-28 00:01:28 +00:00
// AlertmanagerFromGroup extracts a list of alertmanagers from a target group
2018-04-27 12:04:02 +00:00
// and an associated AlertmanagerConfig.
2021-10-28 00:01:28 +00:00
func AlertmanagerFromGroup ( tg * targetgroup . Group , cfg * config . AlertmanagerConfig ) ( [ ] alertmanager , [ ] alertmanager , error ) {
2016-11-23 16:03:22 +00:00
var res [ ] alertmanager
2018-02-21 09:00:07 +00:00
var droppedAlertManagers [ ] alertmanager
2023-03-22 12:11:49 +00:00
lb := labels . NewBuilder ( labels . EmptyLabels ( ) )
2016-11-23 16:03:22 +00:00
2016-12-29 15:53:11 +00:00
for _ , tlset := range tg . Targets {
2023-03-22 12:11:49 +00:00
lb . Reset ( labels . EmptyLabels ( ) )
2016-12-29 15:53:11 +00:00
for ln , lv := range tlset {
2023-03-22 12:11:49 +00:00
lb . Set ( string ( ln ) , string ( lv ) )
2016-12-29 15:53:11 +00:00
}
2016-11-23 16:03:22 +00:00
// Set configured scheme as the initial scheme label for overwrite.
2023-03-22 12:11:49 +00:00
lb . Set ( model . SchemeLabel , cfg . Scheme )
lb . Set ( pathLabel , postPath ( cfg . PathPrefix , cfg . APIVersion ) )
2016-11-23 16:03:22 +00:00
// Combine target labels with target group labels.
for ln , lv := range tg . Labels {
2016-12-29 15:53:11 +00:00
if _ , ok := tlset [ ln ] ; ! ok {
2023-03-22 12:11:49 +00:00
lb . Set ( string ( ln ) , string ( lv ) )
2016-11-23 16:03:22 +00:00
}
}
2016-12-29 15:53:11 +00:00
2023-03-22 12:11:49 +00:00
preRelabel := lb . Labels ( )
keep := relabel . ProcessBuilder ( lb , cfg . RelabelConfigs ... )
2022-03-09 22:21:36 +00:00
if ! keep {
2023-03-22 12:11:49 +00:00
droppedAlertManagers = append ( droppedAlertManagers , alertmanagerLabels { preRelabel } )
2016-11-23 16:03:22 +00:00
continue
}
2023-03-22 12:11:49 +00:00
addr := lb . Get ( model . AddressLabel )
2016-12-29 15:53:11 +00:00
if err := config . CheckTargetAddress ( model . LabelValue ( addr ) ) ; err != nil {
2018-02-21 09:00:07 +00:00
return nil , nil , err
2016-11-23 16:03:22 +00:00
}
2023-03-22 12:11:49 +00:00
res = append ( res , alertmanagerLabels { lb . Labels ( ) } )
2016-11-23 16:03:22 +00:00
}
2018-02-21 09:00:07 +00:00
return res , droppedAlertManagers , nil
2016-11-23 16:03:22 +00:00
}