2018-05-14 12:36:24 +00:00
// Copyright 2018 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2016-08-09 09:04:01 +00:00
package dispatch
2015-07-02 16:38:05 +00:00
import (
2018-11-09 09:00:23 +00:00
"context"
2015-09-26 09:12:47 +00:00
"fmt"
2015-11-10 13:52:04 +00:00
"sort"
2015-07-02 16:38:05 +00:00
"sync"
"time"
2021-07-30 08:11:43 +00:00
"github.com/go-kit/log"
"github.com/go-kit/log/level"
2019-11-26 08:04:56 +00:00
"github.com/prometheus/client_golang/prometheus"
2015-07-02 16:38:05 +00:00
"github.com/prometheus/common/model"
2015-09-25 11:12:51 +00:00
2015-09-29 13:12:31 +00:00
"github.com/prometheus/alertmanager/notify"
2015-09-25 11:12:51 +00:00
"github.com/prometheus/alertmanager/provider"
2018-09-03 12:52:53 +00:00
"github.com/prometheus/alertmanager/store"
2015-09-25 12:38:57 +00:00
"github.com/prometheus/alertmanager/types"
2015-07-02 16:38:05 +00:00
)
2019-11-26 08:04:56 +00:00
// DispatcherMetrics represents metrics associated to a dispatcher.
type DispatcherMetrics struct {
2021-05-05 15:26:37 +00:00
aggrGroups prometheus . Gauge
processingDuration prometheus . Summary
aggrGroupLimitReached prometheus . Counter
2019-11-26 08:04:56 +00:00
}
// NewDispatcherMetrics returns a new registered DispatchMetrics.
2021-05-20 06:49:16 +00:00
func NewDispatcherMetrics ( registerLimitMetrics bool , r prometheus . Registerer ) * DispatcherMetrics {
2019-11-26 08:04:56 +00:00
m := DispatcherMetrics {
aggrGroups : prometheus . NewGauge (
prometheus . GaugeOpts {
Name : "alertmanager_dispatcher_aggregation_groups" ,
Help : "Number of active aggregation groups" ,
} ,
) ,
processingDuration : prometheus . NewSummary (
prometheus . SummaryOpts {
Name : "alertmanager_dispatcher_alert_processing_duration_seconds" ,
Help : "Summary of latencies for the processing of alerts." ,
} ,
) ,
2021-05-05 15:26:37 +00:00
aggrGroupLimitReached : prometheus . NewCounter (
prometheus . CounterOpts {
Name : "alertmanager_dispatcher_aggregation_group_limit_reached_total" ,
Help : "Number of times when dispatcher failed to create new aggregation group due to limit." ,
} ,
) ,
2019-11-26 08:04:56 +00:00
}
2020-03-06 14:09:30 +00:00
if r != nil {
2021-05-20 06:49:16 +00:00
r . MustRegister ( m . aggrGroups , m . processingDuration )
if registerLimitMetrics {
r . MustRegister ( m . aggrGroupLimitReached )
}
2020-03-06 14:09:30 +00:00
}
2019-11-26 08:04:56 +00:00
return & m
}
2015-09-24 22:15:27 +00:00
// Dispatcher sorts incoming alerts into aggregation groups and
// assigns the correct notifiers to each.
2015-07-02 16:38:05 +00:00
type Dispatcher struct {
2019-11-26 08:04:56 +00:00
route * Route
alerts provider . Alerts
stage notify . Stage
metrics * DispatcherMetrics
2021-05-05 15:26:37 +00:00
limits Limits
2015-07-02 16:38:05 +00:00
2016-09-05 11:19:14 +00:00
timeout func ( time . Duration ) time . Duration
2015-11-09 13:34:57 +00:00
2021-05-05 15:26:37 +00:00
mtx sync . RWMutex
aggrGroupsPerRoute map [ * Route ] map [ model . Fingerprint ] * aggrGroup
aggrGroupsNum int
2015-07-04 12:41:10 +00:00
2015-09-24 22:15:27 +00:00
done chan struct { }
ctx context . Context
cancel func ( )
2015-09-29 09:58:30 +00:00
2017-10-22 05:59:33 +00:00
logger log . Logger
2015-07-02 16:38:05 +00:00
}
2021-05-05 15:26:37 +00:00
// Limits is the set of limits a Dispatcher consults while processing alerts.
type Limits interface {
	// MaxNumberOfAggregationGroups reports the maximum number of
	// aggregation groups the dispatcher may hold at once. A value of zero
	// or below means there is no limit. Once the limit is hit the
	// dispatcher does not create further groups and logs an error instead.
	MaxNumberOfAggregationGroups() int
}
2015-09-24 22:15:27 +00:00
// NewDispatcher returns a new Dispatcher.
2016-09-05 11:19:14 +00:00
func NewDispatcher (
ap provider . Alerts ,
r * Route ,
s notify . Stage ,
mk types . Marker ,
to func ( time . Duration ) time . Duration ,
2021-05-05 15:26:37 +00:00
lim Limits ,
2017-10-22 05:59:33 +00:00
l log . Logger ,
2019-11-26 08:04:56 +00:00
m * DispatcherMetrics ,
2016-09-05 11:19:14 +00:00
) * Dispatcher {
2021-05-05 15:26:37 +00:00
if lim == nil {
lim = nilLimits { }
}
2015-10-11 14:54:39 +00:00
disp := & Dispatcher {
2016-09-05 11:19:14 +00:00
alerts : ap ,
stage : s ,
route : r ,
timeout : to ,
2017-10-22 05:59:33 +00:00
logger : log . With ( l , "component" , "dispatcher" ) ,
2019-11-26 08:04:56 +00:00
metrics : m ,
2021-05-05 15:26:37 +00:00
limits : lim ,
2015-09-27 11:09:02 +00:00
}
2015-10-11 14:54:39 +00:00
return disp
2015-07-09 13:01:38 +00:00
}
2015-09-24 22:15:27 +00:00
// Run starts dispatching alerts incoming via the updates channel.
2015-09-25 11:44:00 +00:00
func ( d * Dispatcher ) Run ( ) {
2015-09-24 22:15:27 +00:00
d . done = make ( chan struct { } )
2015-11-19 12:47:31 +00:00
d . mtx . Lock ( )
2021-05-05 15:26:37 +00:00
d . aggrGroupsPerRoute = map [ * Route ] map [ model . Fingerprint ] * aggrGroup { }
d . aggrGroupsNum = 0
2019-11-26 08:04:56 +00:00
d . metrics . aggrGroups . Set ( 0 )
2015-09-25 11:44:00 +00:00
d . ctx , d . cancel = context . WithCancel ( context . Background ( ) )
2020-03-19 14:32:37 +00:00
d . mtx . Unlock ( )
2015-09-25 11:44:00 +00:00
2015-09-29 08:00:02 +00:00
d . run ( d . alerts . Subscribe ( ) )
2015-10-11 14:54:39 +00:00
close ( d . done )
2015-09-25 11:44:00 +00:00
}
2015-09-29 08:00:02 +00:00
func ( d * Dispatcher ) run ( it provider . AlertIterator ) {
2015-10-16 15:45:15 +00:00
cleanup := time . NewTicker ( 30 * time . Second )
2015-09-26 12:12:55 +00:00
defer cleanup . Stop ( )
2015-07-02 16:38:05 +00:00
2015-09-29 08:00:02 +00:00
defer it . Close ( )
2015-07-04 12:59:52 +00:00
for {
select {
2015-11-20 14:10:38 +00:00
case alert , ok := <- it . Next ( ) :
if ! ok {
// Iterator exhausted for some reason.
if err := it . Err ( ) ; err != nil {
2017-10-22 05:59:33 +00:00
level . Error ( d . logger ) . Log ( "msg" , "Error on alert update" , "err" , err )
2015-11-20 14:10:38 +00:00
}
return
}
2017-10-22 05:59:33 +00:00
level . Debug ( d . logger ) . Log ( "msg" , "Received alert" , "alert" , alert )
2015-09-29 09:58:30 +00:00
2015-10-11 14:54:39 +00:00
// Log errors but keep trying.
2015-09-29 08:00:02 +00:00
if err := it . Err ( ) ; err != nil {
2017-10-22 05:59:33 +00:00
level . Error ( d . logger ) . Log ( "msg" , "Error on alert update" , "err" , err )
2015-09-29 08:00:02 +00:00
continue
}
2015-09-24 22:15:27 +00:00
2019-11-26 08:04:56 +00:00
now := time . Now ( )
2015-10-19 14:17:15 +00:00
for _ , r := range d . route . Match ( alert . Labels ) {
2015-09-24 22:15:27 +00:00
d . processAlert ( alert , r )
}
2019-11-26 08:04:56 +00:00
d . metrics . processingDuration . Observe ( time . Since ( now ) . Seconds ( ) )
2015-09-24 22:15:27 +00:00
2015-09-26 12:12:55 +00:00
case <- cleanup . C :
2015-12-04 13:55:42 +00:00
d . mtx . Lock ( )
2015-11-19 12:47:31 +00:00
2021-05-05 15:26:37 +00:00
for _ , groups := range d . aggrGroupsPerRoute {
2015-10-16 14:55:56 +00:00
for _ , ag := range groups {
if ag . empty ( ) {
ag . stop ( )
delete ( groups , ag . fingerprint ( ) )
2021-05-05 15:26:37 +00:00
d . aggrGroupsNum --
2019-11-26 08:04:56 +00:00
d . metrics . aggrGroups . Dec ( )
2015-10-16 14:55:56 +00:00
}
2015-07-04 12:59:52 +00:00
}
}
2015-07-04 10:52:53 +00:00
2015-12-04 13:55:42 +00:00
d . mtx . Unlock ( )
2015-11-19 12:47:31 +00:00
2015-09-24 22:15:27 +00:00
case <- d . ctx . Done ( ) :
return
}
}
}
2015-07-04 10:52:53 +00:00
2019-03-07 16:18:18 +00:00
// AlertGroup represents how alerts exist within an aggrGroup.
type AlertGroup struct {
2019-07-24 15:12:37 +00:00
Alerts types . AlertSlice
2019-03-07 16:18:18 +00:00
Labels model . LabelSet
Receiver string
}
type AlertGroups [ ] * AlertGroup
2019-07-24 15:12:37 +00:00
func ( ag AlertGroups ) Swap ( i , j int ) { ag [ i ] , ag [ j ] = ag [ j ] , ag [ i ] }
func ( ag AlertGroups ) Less ( i , j int ) bool {
if ag [ i ] . Labels . Equal ( ag [ j ] . Labels ) {
return ag [ i ] . Receiver < ag [ j ] . Receiver
}
return ag [ i ] . Labels . Before ( ag [ j ] . Labels )
}
func ( ag AlertGroups ) Len ( ) int { return len ( ag ) }
2019-03-07 16:18:18 +00:00
// Groups returns a slice of AlertGroups from the dispatcher's internal state.
func ( d * Dispatcher ) Groups ( routeFilter func ( * Route ) bool , alertFilter func ( * types . Alert , time . Time ) bool ) ( AlertGroups , map [ model . Fingerprint ] [ ] string ) {
groups := AlertGroups { }
d . mtx . RLock ( )
defer d . mtx . RUnlock ( )
// Keep a list of receivers for an alert to prevent checking each alert
// again against all routes. The alert has already matched against this
// route on ingestion.
receivers := map [ model . Fingerprint ] [ ] string { }
2019-07-24 15:12:37 +00:00
now := time . Now ( )
2021-05-05 15:26:37 +00:00
for route , ags := range d . aggrGroupsPerRoute {
2019-03-07 16:18:18 +00:00
if ! routeFilter ( route ) {
continue
}
for _ , ag := range ags {
receiver := route . RouteOpts . Receiver
2019-07-24 15:12:37 +00:00
alertGroup := & AlertGroup {
Labels : ag . labels ,
Receiver : receiver ,
2019-03-07 16:18:18 +00:00
}
alerts := ag . alerts . List ( )
filteredAlerts := make ( [ ] * types . Alert , 0 , len ( alerts ) )
2019-04-19 12:01:41 +00:00
for _ , a := range alerts {
2019-03-07 16:18:18 +00:00
if ! alertFilter ( a , now ) {
continue
}
fp := a . Fingerprint ( )
if r , ok := receivers [ fp ] ; ok {
// Receivers slice already exists. Add
// the current receiver to the slice.
receivers [ fp ] = append ( r , receiver )
} else {
// First time we've seen this alert fingerprint.
// Initialize a new receivers slice.
receivers [ fp ] = [ ] string { receiver }
}
filteredAlerts = append ( filteredAlerts , a )
}
if len ( filteredAlerts ) == 0 {
continue
}
alertGroup . Alerts = filteredAlerts
groups = append ( groups , alertGroup )
}
}
sort . Sort ( groups )
2019-07-24 15:12:37 +00:00
for i := range groups {
sort . Sort ( groups [ i ] . Alerts )
}
for i := range receivers {
sort . Strings ( receivers [ i ] )
}
2019-03-07 16:18:18 +00:00
return groups , receivers
}
2015-09-24 22:15:27 +00:00
// Stop the dispatcher.
func ( d * Dispatcher ) Stop ( ) {
2020-03-19 14:32:37 +00:00
if d == nil {
return
}
d . mtx . Lock ( )
if d . cancel == nil {
2021-04-27 08:44:18 +00:00
d . mtx . Unlock ( )
2015-10-11 14:54:39 +00:00
return
}
2015-09-24 22:15:27 +00:00
d . cancel ( )
2015-09-25 16:14:46 +00:00
d . cancel = nil
2021-04-27 08:44:18 +00:00
d . mtx . Unlock ( )
2015-09-26 09:12:47 +00:00
2015-09-24 22:15:27 +00:00
<- d . done
}
2015-07-04 10:52:53 +00:00
2019-02-08 13:57:08 +00:00
// notifyFunc is a function that performs notification for the alert
2015-09-24 22:15:27 +00:00
// with the given fingerprint. It aborts on context cancelation.
2015-09-26 16:12:56 +00:00
// Returns false iff notifying failed.
2015-09-26 12:12:55 +00:00
type notifyFunc func ( context . Context , ... * types . Alert ) bool
2015-09-24 22:15:27 +00:00
2016-06-06 11:18:55 +00:00
// processAlert determines in which aggregation group the alert falls
2017-11-01 14:03:53 +00:00
// and inserts it.
2015-11-07 13:30:21 +00:00
func ( d * Dispatcher ) processAlert ( alert * types . Alert , route * Route ) {
2018-11-29 11:31:14 +00:00
groupLabels := getGroupLabels ( alert , route )
2015-07-02 16:38:05 +00:00
2017-11-01 14:03:53 +00:00
fp := groupLabels . Fingerprint ( )
2015-07-02 16:38:05 +00:00
2015-11-19 12:47:31 +00:00
d . mtx . Lock ( )
2018-07-10 15:13:41 +00:00
defer d . mtx . Unlock ( )
2021-05-05 15:26:37 +00:00
routeGroups , ok := d . aggrGroupsPerRoute [ route ]
2015-10-16 14:55:56 +00:00
if ! ok {
2021-05-05 15:26:37 +00:00
routeGroups = map [ model . Fingerprint ] * aggrGroup { }
d . aggrGroupsPerRoute [ route ] = routeGroups
2015-10-16 14:55:56 +00:00
}
2021-05-05 15:26:37 +00:00
ag , ok := routeGroups [ fp ]
2021-04-30 08:11:10 +00:00
if ok {
ag . insert ( alert )
return
2015-07-02 16:38:05 +00:00
}
2021-05-05 15:26:37 +00:00
// If the group does not exist, create it. But check the limit first.
if limit := d . limits . MaxNumberOfAggregationGroups ( ) ; limit > 0 && d . aggrGroupsNum >= limit {
d . metrics . aggrGroupLimitReached . Inc ( )
level . Error ( d . logger ) . Log ( "msg" , "Too many aggregation groups, cannot create new group for alert" , "groups" , d . aggrGroupsNum , "limit" , limit , "alert" , alert . Name ( ) )
return
}
2021-04-30 08:11:10 +00:00
ag = newAggrGroup ( d . ctx , groupLabels , route , d . timeout , d . logger )
2021-05-05 15:26:37 +00:00
routeGroups [ fp ] = ag
d . aggrGroupsNum ++
2021-04-30 08:11:10 +00:00
d . metrics . aggrGroups . Inc ( )
// Insert the 1st alert in the group before starting the group's run()
// function, to make sure that when the run() will be executed the 1st
// alert is already there.
2015-07-02 16:38:05 +00:00
ag . insert ( alert )
2021-04-30 08:11:10 +00:00
go ag . run ( func ( ctx context . Context , alerts ... * types . Alert ) bool {
_ , _ , err := d . stage . Exec ( ctx , d . logger , alerts ... )
if err != nil {
lvl := level . Error ( d . logger )
if ctx . Err ( ) == context . Canceled {
// It is expected for the context to be canceled on
// configuration reload or shutdown. In this case, the
// message should only be logged at the debug level.
lvl = level . Debug ( d . logger )
}
lvl . Log ( "msg" , "Notify for alerts failed" , "num_alerts" , len ( alerts ) , "err" , err )
}
return err == nil
} )
2015-07-04 10:52:53 +00:00
}
2015-07-02 18:48:21 +00:00
2018-11-29 11:31:14 +00:00
// getGroupLabels returns the subset of the alert's labels that route groups
// by: every label when the route groups by all labels, otherwise only the
// labels named in the route's GroupBy set.
func getGroupLabels(alert *types.Alert, route *Route) model.LabelSet {
	opts := &route.RouteOpts
	selected := make(model.LabelSet)
	for name, value := range alert.Labels {
		_, listed := opts.GroupBy[name]
		if !listed && !opts.GroupByAll {
			continue
		}
		selected[name] = value
	}
	return selected
}
2015-09-24 22:15:27 +00:00
// aggrGroup aggregates alert fingerprints into groups to which a
// common set of routing options applies.
// It emits notifications in the specified intervals.
type aggrGroup struct {
2017-04-21 09:43:12 +00:00
labels model . LabelSet
opts * RouteOpts
2017-10-22 05:59:33 +00:00
logger log . Logger
2017-04-21 09:43:12 +00:00
routeKey string
2015-07-02 16:38:05 +00:00
2018-09-03 12:52:53 +00:00
alerts * store . Alerts
2016-09-05 11:19:14 +00:00
ctx context . Context
cancel func ( )
done chan struct { }
next * time . Timer
timeout func ( time . Duration ) time . Duration
2015-07-02 16:38:05 +00:00
2018-03-29 10:22:49 +00:00
mtx sync . RWMutex
hasFlushed bool
2015-07-02 16:38:05 +00:00
}
2015-09-24 22:15:27 +00:00
// newAggrGroup returns a new aggregation group.
2017-10-22 05:59:33 +00:00
func newAggrGroup ( ctx context . Context , labels model . LabelSet , r * Route , to func ( time . Duration ) time . Duration , logger log . Logger ) * aggrGroup {
2016-09-05 11:19:14 +00:00
if to == nil {
to = func ( d time . Duration ) time . Duration { return d }
}
2015-07-02 18:48:21 +00:00
ag := & aggrGroup {
2017-04-21 09:43:12 +00:00
labels : labels ,
routeKey : r . Key ( ) ,
opts : & r . RouteOpts ,
timeout : to ,
2019-09-18 07:29:34 +00:00
alerts : store . NewAlerts ( ) ,
2019-06-25 08:11:45 +00:00
done : make ( chan struct { } ) ,
2015-07-02 18:48:21 +00:00
}
2015-09-24 22:15:27 +00:00
ag . ctx , ag . cancel = context . WithCancel ( ctx )
2015-07-02 16:38:05 +00:00
2017-10-22 05:59:33 +00:00
ag . logger = log . With ( logger , "aggrGroup" , ag )
2015-10-27 17:24:09 +00:00
2015-10-07 14:18:55 +00:00
// Set an initial one-time wait before flushing
// the first batch of notifications.
ag . next = time . NewTimer ( ag . opts . GroupWait )
2015-07-02 18:48:21 +00:00
return ag
}
2015-07-02 16:38:05 +00:00
2017-04-21 09:43:12 +00:00
func ( ag * aggrGroup ) fingerprint ( ) model . Fingerprint {
return ag . labels . Fingerprint ( )
}
func ( ag * aggrGroup ) GroupKey ( ) string {
return fmt . Sprintf ( "%s:%s" , ag . routeKey , ag . labels )
}
2015-09-26 15:54:49 +00:00
func ( ag * aggrGroup ) String ( ) string {
2017-04-21 09:43:12 +00:00
return ag . GroupKey ( )
2015-09-26 15:54:49 +00:00
}
2015-09-29 13:12:31 +00:00
func ( ag * aggrGroup ) run ( nf notifyFunc ) {
2015-09-24 22:15:27 +00:00
defer close ( ag . done )
2015-07-04 10:52:53 +00:00
defer ag . next . Stop ( )
2015-07-02 16:38:05 +00:00
for {
select {
2015-10-09 06:26:41 +00:00
case now := <- ag . next . C :
2018-01-11 21:45:59 +00:00
// Give the notifications time until the next flush to
2015-09-26 16:03:54 +00:00
// finish before terminating them.
2016-09-05 11:19:14 +00:00
ctx , cancel := context . WithTimeout ( ag . ctx , ag . timeout ( ag . opts . GroupInterval ) )
2015-09-24 22:15:27 +00:00
2015-10-09 06:26:41 +00:00
// The now time we retrieve from the ticker is the only reliable
// point of time reference for the subsequent notification pipeline.
2015-10-09 06:58:44 +00:00
// Calculating the current time directly is prone to flaky behavior,
2015-10-09 06:26:41 +00:00
// which usually only becomes apparent in tests.
2015-10-09 06:43:39 +00:00
ctx = notify . WithNow ( ctx , now )
2015-10-09 06:26:41 +00:00
2015-10-09 06:43:39 +00:00
// Populate context with information needed along the pipeline.
2017-04-21 09:43:12 +00:00
ctx = notify . WithGroupKey ( ctx , ag . GroupKey ( ) )
2015-10-16 14:55:56 +00:00
ctx = notify . WithGroupLabels ( ctx , ag . labels )
2016-08-16 12:22:47 +00:00
ctx = notify . WithReceiverName ( ctx , ag . opts . Receiver )
2015-10-09 06:43:39 +00:00
ctx = notify . WithRepeatInterval ( ctx , ag . opts . RepeatInterval )
2020-11-24 04:02:07 +00:00
ctx = notify . WithMuteTimeIntervals ( ctx , ag . opts . MuteTimeIntervals )
2022-03-04 14:24:29 +00:00
ctx = notify . WithActiveTimeIntervals ( ctx , ag . opts . ActiveTimeIntervals )
2015-10-08 08:50:37 +00:00
2015-07-04 10:52:53 +00:00
// Wait the configured interval before calling flush again.
2015-11-19 12:47:31 +00:00
ag . mtx . Lock ( )
2015-09-26 16:03:54 +00:00
ag . next . Reset ( ag . opts . GroupInterval )
2018-03-29 10:22:49 +00:00
ag . hasFlushed = true
2015-11-19 12:47:31 +00:00
ag . mtx . Unlock ( )
2015-07-04 10:52:53 +00:00
2015-09-26 12:12:55 +00:00
ag . flush ( func ( alerts ... * types . Alert ) bool {
2015-09-29 13:12:31 +00:00
return nf ( ctx , alerts ... )
2015-09-24 22:15:27 +00:00
} )
2015-10-09 06:58:44 +00:00
cancel ( )
2015-09-24 22:15:27 +00:00
case <- ag . ctx . Done ( ) :
2015-07-02 16:38:05 +00:00
return
}
}
}
func ( ag * aggrGroup ) stop ( ) {
2015-09-24 22:15:27 +00:00
// Calling cancel will terminate all in-process notifications
// and the run() loop.
ag . cancel ( )
<- ag . done
2015-07-02 16:38:05 +00:00
}
2018-01-18 10:12:17 +00:00
// insert inserts the alert into the aggregation group.
2015-09-25 16:14:46 +00:00
func ( ag * aggrGroup ) insert ( alert * types . Alert ) {
2018-09-03 12:52:53 +00:00
if err := ag . alerts . Set ( alert ) ; err != nil {
level . Error ( ag . logger ) . Log ( "msg" , "error on set alert" , "err" , err )
}
2015-07-02 18:48:21 +00:00
// Immediately trigger a flush if the wait duration for this
// alert is already over.
2018-09-03 12:52:53 +00:00
ag . mtx . Lock ( )
defer ag . mtx . Unlock ( )
2018-03-29 10:22:49 +00:00
if ! ag . hasFlushed && alert . StartsAt . Add ( ag . opts . GroupWait ) . Before ( time . Now ( ) ) {
2015-07-04 10:52:53 +00:00
ag . next . Reset ( 0 )
2015-07-02 16:38:05 +00:00
}
}
2015-07-02 18:48:21 +00:00
func ( ag * aggrGroup ) empty ( ) bool {
2019-04-19 12:01:41 +00:00
return ag . alerts . Empty ( )
2015-07-02 18:48:21 +00:00
}
2015-07-02 16:38:05 +00:00
2015-07-02 18:48:21 +00:00
// flush sends notifications for all new alerts.
2015-09-26 12:12:55 +00:00
func ( ag * aggrGroup ) flush ( notify func ( ... * types . Alert ) bool ) {
2015-09-27 17:50:41 +00:00
if ag . empty ( ) {
return
}
2015-07-02 16:38:05 +00:00
2015-09-26 12:12:55 +00:00
var (
2018-09-03 12:52:53 +00:00
alerts = ag . alerts . List ( )
2019-04-19 12:01:41 +00:00
alertsSlice = make ( types . AlertSlice , 0 , len ( alerts ) )
now = time . Now ( )
2015-09-26 12:12:55 +00:00
)
2019-04-19 12:01:41 +00:00
for _ , alert := range alerts {
2019-01-04 15:52:20 +00:00
a := * alert
// Ensure that alerts don't resolve as time move forwards.
if ! a . ResolvedAt ( now ) {
a . EndsAt = time . Time { }
}
alertsSlice = append ( alertsSlice , & a )
2015-07-04 10:52:53 +00:00
}
2018-06-14 13:54:33 +00:00
sort . Stable ( alertsSlice )
2018-03-22 19:06:37 +00:00
2018-09-03 12:52:53 +00:00
level . Debug ( ag . logger ) . Log ( "msg" , "flushing" , "alerts" , fmt . Sprintf ( "%v" , alertsSlice ) )
2015-09-30 12:53:52 +00:00
2015-09-26 12:12:55 +00:00
if notify ( alertsSlice ... ) {
2018-09-03 12:52:53 +00:00
for _ , a := range alertsSlice {
2015-09-26 12:12:55 +00:00
// Only delete if the fingerprint has not been inserted
// again since we notified about it.
2018-09-03 12:52:53 +00:00
fp := a . Fingerprint ( )
got , err := ag . alerts . Get ( fp )
if err != nil {
2019-09-18 07:29:34 +00:00
// This should never happen.
level . Error ( ag . logger ) . Log ( "msg" , "failed to get alert" , "err" , err , "alert" , a . String ( ) )
2018-09-03 12:52:53 +00:00
continue
}
2019-01-04 15:52:20 +00:00
if a . Resolved ( ) && got . UpdatedAt == a . UpdatedAt {
2018-09-03 12:52:53 +00:00
if err := ag . alerts . Delete ( fp ) ; err != nil {
2019-09-18 07:29:34 +00:00
level . Error ( ag . logger ) . Log ( "msg" , "error on delete alert" , "err" , err , "alert" , a . String ( ) )
2018-09-03 12:52:53 +00:00
}
2015-09-24 22:15:27 +00:00
}
2015-09-26 12:12:55 +00:00
}
2015-07-10 17:25:56 +00:00
}
2015-09-24 22:15:27 +00:00
}
2021-05-05 15:26:37 +00:00
// nilLimits is a Limits implementation that imposes no limits.
type nilLimits struct{}

// MaxNumberOfAggregationGroups always returns 0, which means unlimited.
func (nilLimits) MaxNumberOfAggregationGroups() int {
	const unlimited = 0
	return unlimited
}