diff --git a/.circleci/config.yml b/.circleci/config.yml index 1011b5c8..68f5f3f5 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -106,9 +106,10 @@ jobs: steps: - checkout - - run: go install github.com/monitoring-mixins/mixtool/cmd/mixtool@latest - - run: go install github.com/google/go-jsonnet/cmd/jsonnetfmt@latest - - run: make -C doc/alertmanager-mixin lint + - run: cd doc/alertmanager-mixin; go install github.com/monitoring-mixins/mixtool/cmd/mixtool@latest + - run: cd doc/alertmanager-mixin; go install github.com/google/go-jsonnet/cmd/jsonnetfmt@latest + - run: cd doc/alertmanager-mixin; go install github.com/jsonnet-bundler/jsonnet-bundler/cmd/jb@latest + - run: cd doc/alertmanager-mixin; make lint workflows: version: 2 diff --git a/CHANGELOG.md b/CHANGELOG.md index 93dc0aff..4d979022 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,32 @@ +## 0.22.0 / 2021-05-21 + +* [CHANGE] Amtool and Alertmanager binaries help now prints to stdout. #2505 +* [CHANGE] Use path relative to the configuration file for certificates and password files. #2502 +* [CHANGE] Display Silence and Alert dates in ISO8601 format. #2363 +* [FEATURE] Add date picker to silence form views. #2262 +* [FEATURE] Add support for negative matchers. #2434 #2460 and many more. +* [FEATURE] Add time-based muting to routing tree. #2393 +* [FEATURE] Support TLS and basic authentication on the web server. #2446 +* [FEATURE] Add OAuth 2.0 client support in HTTP client. #2560 +* [ENHANCEMENT] Add composite durations in the configuration (e.g. 2h20m). #2353 +* [ENHANCEMENT] Add follow_redirect option to disable following redirects. #2551 +* [ENHANCEMENT] Add metric for permanently failed notifications. #2383 +* [ENHANCEMENT] Add support for custom authorization scheme. #2499 +* [ENHANCEMENT] Add support for not following HTTP redirects. #2499 +* [ENHANCEMENT] Add support to set the Slack URL from a file. #2534 +* [ENHANCEMENT] amtool: Add alert status to extended and simple output. #2324 +* [ENHANCEMENT] Do not omit false booleans in the configuration page. #2317 +* [ENHANCEMENT] OpsGenie: Propagate labels to Opsgenie details. #2276 +* [ENHANCEMENT] PagerDuty: Filter out empty images and links. #2379 +* [ENHANCEMENT] WeChat: add markdown support. #2309 +* [BUGFIX] Fix a possible deadlock on shutdown. #2558 +* [BUGFIX] UI: Fix extended printing of regex sign. #2445 +* [BUGFIX] UI: Fix the favicon when using a path prefix. #2392 +* [BUGFIX] Make filter labels consistent with Prometheus. #2403 +* [BUGFIX] alertmanager_config_last_reload_successful takes templating failures into account. #2373 +* [BUGFIX] amtool: avoid nil dereference in silence update. #2427 +* [BUGFIX] VictorOps: Catch routing_key templating errors. #2467 + ## 0.21.0 / 2020-06-16 This release removes the HipChat integration as it is discontinued by Atlassian on June 30th 2020. 
diff --git a/README.md b/README.md index 85e6cc9e..7feab975 100644 --- a/README.md +++ b/README.md @@ -283,7 +283,7 @@ $ amtool silence query instance=~".+0" ID Matchers Ends At Created By Comment e48cb58a-0b17-49ba-b734-3585139b1d25 alertname=Test_Alert instance=~.+0 2017-08-02 22:41:39 UTC kellel -$ amtool silence expire $(amtool silence -q query instance=~".+0") +$ amtool silence expire $(amtool silence query -q instance=~".+0") $ amtool silence query instance=~".+0" diff --git a/VERSION b/VERSION index 88541566..21574090 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.21.0 +0.22.0 diff --git a/cmd/alertmanager/main.go b/cmd/alertmanager/main.go index e365f09b..506fc559 100644 --- a/cmd/alertmanager/main.go +++ b/cmd/alertmanager/main.go @@ -325,7 +325,7 @@ func run() int { go peer.Settle(ctx, *gossipInterval*10) } - alerts, err := mem.NewAlerts(context.Background(), marker, *alertGCInterval, logger) + alerts, err := mem.NewAlerts(context.Background(), marker, *alertGCInterval, nil, logger) if err != nil { level.Error(logger).Log("err", err) return 1 @@ -339,11 +339,19 @@ func run() int { return disp.Groups(routeFilter, alertFilter) } + // An interface value that holds a nil concrete value is non-nil. + // Therefore we explicly pass an empty interface, to detect if the + // cluster is not enabled in notify. + var clusterPeer cluster.ClusterPeer + if peer != nil { + clusterPeer = peer + } + api, err := api.New(api.Options{ Alerts: alerts, Silences: silences, StatusFunc: marker.Status, - Peer: peer, + Peer: clusterPeer, Timeout: *httpTimeout, Concurrency: *getConcurrency, Logger: log.With(logger, "component", "api"), @@ -379,7 +387,7 @@ func run() int { tmpl *template.Template ) - dispMetrics := dispatch.NewDispatcherMetrics(prometheus.DefaultRegisterer) + dispMetrics := dispatch.NewDispatcherMetrics(false, prometheus.DefaultRegisterer) pipelineBuilder := notify.NewPipelineBuilder(prometheus.DefaultRegisterer) configLogger := log.With(logger, "component", "configuration") configCoordinator := config.NewCoordinator( @@ -430,6 +438,15 @@ func run() int { inhibitor = inhibit.NewInhibitor(alerts, conf.InhibitRules, marker, logger) silencer := silence.NewSilencer(silences, marker, logger) + + // An interface value that holds a nil concrete value is non-nil. + // Therefore we explicly pass an empty interface, to detect if the + // cluster is not enabled in notify. + var pipelinePeer notify.Peer + if peer != nil { + pipelinePeer = peer + } + pipeline := pipelineBuilder.New( receivers, waitFunc, @@ -437,7 +454,7 @@ func run() int { silencer, muteTimes, notificationLog, - peer, + pipelinePeer, ) configuredReceivers.Set(float64(len(activeReceivers))) configuredIntegrations.Set(float64(integrationsNum)) @@ -447,7 +464,7 @@ func run() int { silencer.Mutes(labels) }) - disp = dispatch.NewDispatcher(alerts, routes, pipeline, marker, timeoutFunc, logger, dispMetrics) + disp = dispatch.NewDispatcher(alerts, routes, pipeline, marker, timeoutFunc, nil, logger, dispMetrics) routes.Walk(func(r *dispatch.Route) { if r.RouteOpts.RepeatInterval > *retention { level.Warn(configLogger).Log( diff --git a/config/config.go b/config/config.go index 1691622a..e8c4287f 100644 --- a/config/config.go +++ b/config/config.go @@ -106,7 +106,7 @@ func (u URL) MarshalJSON() ([]byte, error) { if u.URL != nil { return json.Marshal(u.URL.String()) } - return nil, nil + return []byte("null"), nil } // UnmarshalJSON implements the json.Marshaler interface for URL. 
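The comments added in `cmd/alertmanager/main.go` above guard against a well-known Go pitfall: an interface variable that holds a typed nil pointer compares as non-nil, so a downstream `peer == nil` check would not detect a disabled cluster. A minimal, self-contained illustration; the `Peer` interface and `fakePeer` type below are hypothetical stand-ins, not types from this patch:

```go
package main

import "fmt"

// Peer is a stand-in for the cluster/notify peer interfaces used in main.go.
type Peer interface{ Name() string }

// fakePeer plays the role of the concrete peer, whose pointer is nil when
// clustering is disabled.
type fakePeer struct{}

func (p *fakePeer) Name() string { return "peer-0" }

func main() {
	var p *fakePeer // typed nil pointer, e.g. when clustering is disabled

	var direct Peer = p        // interface now holds (type *fakePeer, value nil)
	fmt.Println(direct == nil) // false: the interface itself is not nil

	// The pattern used in the patch: assign only when the pointer is non-nil,
	// so the interface stays truly nil and downstream nil checks work.
	var guarded Peer
	if p != nil {
		guarded = p
	}
	fmt.Println(guarded == nil) // true
}
```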
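The `config.URL.MarshalJSON` change above (and the matching `Regexp` and `Matchers` changes that follow) stops returning `nil, nil` because `encoding/json` rejects an empty result from a custom `MarshalJSON` and fails the surrounding marshal. A small sketch of that standard-library behavior, using throwaway `legacy` and `fixed` types rather than the real config structs:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// legacy mimics the old behavior: return no bytes when the value is unset.
type legacy struct{}

func (legacy) MarshalJSON() ([]byte, error) { return nil, nil }

// fixed mimics the new behavior: emit an explicit JSON null instead.
type fixed struct{}

func (fixed) MarshalJSON() ([]byte, error) { return []byte("null"), nil }

func main() {
	_, err := json.Marshal(struct{ URL legacy }{})
	fmt.Println(err) // non-nil: the empty MarshalJSON result is not valid JSON

	out, err := json.Marshal(struct{ URL fixed }{})
	fmt.Println(string(out), err) // {"URL":null} <nil>
}
```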
@@ -876,7 +876,7 @@ func (re Regexp) MarshalJSON() ([]byte, error) { if re.original != "" { return json.Marshal(re.original) } - return nil, nil + return []byte("null"), nil } // Matchers is label.Matchers with an added UnmarshalYAML method to implement the yaml.Unmarshaler interface @@ -929,7 +929,7 @@ func (m *Matchers) UnmarshalJSON(data []byte) error { // MarshalJSON implements the json.Marshaler interface for Matchers. func (m Matchers) MarshalJSON() ([]byte, error) { if len(m) == 0 { - return nil, nil + return []byte("[]"), nil } result := make([]string, len(m)) for i, matcher := range m { diff --git a/config/config_test.go b/config/config_test.go index 6687c7bd..d37860e7 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -540,23 +540,75 @@ func TestUnmarshalSecretURL(t *testing.T) { } func TestMarshalURL(t *testing.T) { - urlp, err := url.Parse("http://example.com/") - if err != nil { - t.Fatal(err) - } - u := &URL{urlp} + for name, tc := range map[string]struct { + input *URL + expectedJSON string + expectedYAML string + }{ + "url": { + input: mustParseURL("http://example.com/"), + expectedJSON: "\"http://example.com/\"", + expectedYAML: "http://example.com/\n", + }, - c, err := json.Marshal(u) - if err != nil { - t.Fatal(err) - } - require.Equal(t, "\"http://example.com/\"", string(c), "URL not properly marshaled in JSON.") + "wrapped nil value": { + input: &URL{}, + expectedJSON: "null", + expectedYAML: "null\n", + }, - c, err = yaml.Marshal(u) - if err != nil { - t.Fatal(err) + "wrapped empty URL": { + input: &URL{&url.URL{}}, + expectedJSON: "\"\"", + expectedYAML: "\"\"\n", + }, + } { + t.Run(name, func(t *testing.T) { + j, err := json.Marshal(tc.input) + require.NoError(t, err) + require.Equal(t, tc.expectedJSON, string(j), "URL not properly marshaled into JSON.") + + y, err := yaml.Marshal(tc.input) + require.NoError(t, err) + require.Equal(t, tc.expectedYAML, string(y), "URL not properly marshaled into YAML.") + }) + } +} + +func TestUnmarshalNilURL(t *testing.T) { + b := []byte(`null`) + + { + var u URL + err := json.Unmarshal(b, &u) + require.Error(t, err, "unsupported scheme \"\" for URL") + require.Nil(t, nil, u.URL) + } + + { + var u URL + err := yaml.Unmarshal(b, &u) + require.NoError(t, err) + require.Nil(t, nil, u.URL) // UnmarshalYAML is not even called when unmarshalling "null". 
+ } +} + +func TestUnmarshalEmptyURL(t *testing.T) { + b := []byte(`""`) + + { + var u URL + err := json.Unmarshal(b, &u) + require.Error(t, err, "unsupported scheme \"\" for URL") + require.Equal(t, (*url.URL)(nil), u.URL) + } + + { + var u URL + err := yaml.Unmarshal(b, &u) + require.Error(t, err, "unsupported scheme \"\" for URL") + require.Equal(t, (*url.URL)(nil), u.URL) } - require.Equal(t, "http://example.com/\n", string(c), "URL not properly marshaled in YAML.") } func TestUnmarshalURL(t *testing.T) { @@ -612,6 +664,70 @@ func TestUnmarshalRelativeURL(t *testing.T) { } } +func TestMarshalRegexpWithNilValue(t *testing.T) { + r := &Regexp{} + + out, err := json.Marshal(r) + require.NoError(t, err) + require.Equal(t, "null", string(out)) + + out, err = yaml.Marshal(r) + require.NoError(t, err) + require.Equal(t, "null\n", string(out)) +} + +func TestUnmarshalEmptyRegexp(t *testing.T) { + b := []byte(`""`) + + { + var re Regexp + err := json.Unmarshal(b, &re) + require.NoError(t, err) + require.Equal(t, regexp.MustCompile("^(?:)$"), re.Regexp) + require.Equal(t, "", re.original) + } + + { + var re Regexp + err := yaml.Unmarshal(b, &re) + require.NoError(t, err) + require.Equal(t, regexp.MustCompile("^(?:)$"), re.Regexp) + require.Equal(t, "", re.original) + } +} + +func TestUnmarshalNullRegexp(t *testing.T) { + input := []byte(`null`) + + { + var re Regexp + err := json.Unmarshal(input, &re) + require.NoError(t, err) + require.Nil(t, nil, re.Regexp) + require.Equal(t, "", re.original) + } + + { + var re Regexp + err := yaml.Unmarshal(input, &re) // Interestingly enough, unmarshalling `null` in YAML doesn't even call UnmarshalYAML. + require.NoError(t, err) + require.Nil(t, re.Regexp) + require.Equal(t, "", re.original) + } +} + +func TestMarshalEmptyMatchers(t *testing.T) { + r := Matchers{} + + out, err := json.Marshal(r) + require.NoError(t, err) + require.Equal(t, "[]", string(out)) + + out, err = yaml.Marshal(r) + require.NoError(t, err) + require.Equal(t, "[]\n", string(out)) +} + func TestJSONUnmarshal(t *testing.T) { c, err := LoadFile("testdata/conf.good.yml") if err != nil { diff --git a/dispatch/dispatch.go b/dispatch/dispatch.go index a3357857..a31b869b 100644 --- a/dispatch/dispatch.go +++ b/dispatch/dispatch.go @@ -33,12 +33,13 @@ import ( // DispatcherMetrics represents metrics associated to a dispatcher. type DispatcherMetrics struct { - aggrGroups prometheus.Gauge - processingDuration prometheus.Summary + aggrGroups prometheus.Gauge + processingDuration prometheus.Summary + aggrGroupLimitReached prometheus.Counter } // NewDispatcherMetrics returns a new registered DispatchMetrics. 
-func NewDispatcherMetrics(r prometheus.Registerer) *DispatcherMetrics { +func NewDispatcherMetrics(registerLimitMetrics bool, r prometheus.Registerer) *DispatcherMetrics { m := DispatcherMetrics{ aggrGroups: prometheus.NewGauge( prometheus.GaugeOpts{ @@ -52,10 +53,19 @@ func NewDispatcherMetrics(r prometheus.Registerer) *DispatcherMetrics { Help: "Summary of latencies for the processing of alerts.", }, ), + aggrGroupLimitReached: prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "alertmanager_dispatcher_aggregation_group_limit_reached_total", + Help: "Number of times when dispatcher failed to create new aggregation group due to limit.", + }, + ), } if r != nil { r.MustRegister(m.aggrGroups, m.processingDuration) + if registerLimitMetrics { + r.MustRegister(m.aggrGroupLimitReached) + } } return &m @@ -68,12 +78,14 @@ type Dispatcher struct { alerts provider.Alerts stage notify.Stage metrics *DispatcherMetrics + limits Limits marker types.Marker timeout func(time.Duration) time.Duration - aggrGroups map[*Route]map[model.Fingerprint]*aggrGroup - mtx sync.RWMutex + mtx sync.RWMutex + aggrGroupsPerRoute map[*Route]map[model.Fingerprint]*aggrGroup + aggrGroupsNum int done chan struct{} ctx context.Context @@ -82,6 +94,14 @@ type Dispatcher struct { logger log.Logger } +// Limits describes limits used by Dispatcher. +type Limits interface { + // MaxNumberOfAggregationGroups returns max number of aggregation groups that dispatcher can have. + // 0 or negative value = unlimited. + // If dispatcher hits this limit, it will not create additional groups, but will log an error instead. + MaxNumberOfAggregationGroups() int +} + // NewDispatcher returns a new Dispatcher. func NewDispatcher( ap provider.Alerts, @@ -89,9 +109,14 @@ func NewDispatcher( s notify.Stage, mk types.Marker, to func(time.Duration) time.Duration, + lim Limits, l log.Logger, m *DispatcherMetrics, ) *Dispatcher { + if lim == nil { + lim = nilLimits{} + } + disp := &Dispatcher{ alerts: ap, stage: s, @@ -100,6 +125,7 @@ func NewDispatcher( timeout: to, logger: log.With(l, "component", "dispatcher"), metrics: m, + limits: lim, } return disp } @@ -109,7 +135,8 @@ func (d *Dispatcher) Run() { d.done = make(chan struct{}) d.mtx.Lock() - d.aggrGroups = map[*Route]map[model.Fingerprint]*aggrGroup{} + d.aggrGroupsPerRoute = map[*Route]map[model.Fingerprint]*aggrGroup{} + d.aggrGroupsNum = 0 d.metrics.aggrGroups.Set(0) d.ctx, d.cancel = context.WithCancel(context.Background()) d.mtx.Unlock() @@ -152,11 +179,12 @@ func (d *Dispatcher) run(it provider.AlertIterator) { case <-cleanup.C: d.mtx.Lock() - for _, groups := range d.aggrGroups { + for _, groups := range d.aggrGroupsPerRoute { for _, ag := range groups { if ag.empty() { ag.stop() delete(groups, ag.fingerprint()) + d.aggrGroupsNum-- d.metrics.aggrGroups.Dec() } } @@ -201,7 +229,7 @@ func (d *Dispatcher) Groups(routeFilter func(*Route) bool, alertFilter func(*typ receivers := map[model.Fingerprint][]string{} now := time.Now() - for route, ags := range d.aggrGroups { + for route, ags := range d.aggrGroupsPerRoute { if !routeFilter(route) { continue } @@ -284,21 +312,28 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) { d.mtx.Lock() defer d.mtx.Unlock() - group, ok := d.aggrGroups[route] + routeGroups, ok := d.aggrGroupsPerRoute[route] if !ok { - group = map[model.Fingerprint]*aggrGroup{} - d.aggrGroups[route] = group + routeGroups = map[model.Fingerprint]*aggrGroup{} + d.aggrGroupsPerRoute[route] = routeGroups } - ag, ok := group[fp] + ag, ok := routeGroups[fp] 
if ok { ag.insert(alert) return } - // If the group does not exist, create it. + // If the group does not exist, create it. But check the limit first. + if limit := d.limits.MaxNumberOfAggregationGroups(); limit > 0 && d.aggrGroupsNum >= limit { + d.metrics.aggrGroupLimitReached.Inc() + level.Error(d.logger).Log("msg", "Too many aggregation groups, cannot create new group for alert", "groups", d.aggrGroupsNum, "limit", limit, "alert", alert.Name()) + return + } + ag = newAggrGroup(d.ctx, groupLabels, route, d.timeout, d.logger) - group[fp] = ag + routeGroups[fp] = ag + d.aggrGroupsNum++ d.metrics.aggrGroups.Inc() // Insert the 1st alert in the group before starting the group's run() @@ -499,3 +534,7 @@ func (ag *aggrGroup) flush(notify func(...*types.Alert) bool) { } } } + +type nilLimits struct{} + +func (n nilLimits) MaxNumberOfAggregationGroups() int { return 0 } diff --git a/dispatch/dispatch_test.go b/dispatch/dispatch_test.go index 45662970..efacda1b 100644 --- a/dispatch/dispatch_test.go +++ b/dispatch/dispatch_test.go @@ -24,6 +24,7 @@ import ( "github.com/go-kit/kit/log" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" @@ -365,7 +366,7 @@ route: logger := log.NewNopLogger() route := NewRoute(conf.Route, nil) marker := types.NewMarker(prometheus.NewRegistry()) - alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, logger) + alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, nil, logger) if err != nil { t.Fatal(err) } @@ -373,7 +374,7 @@ route: timeout := func(d time.Duration) time.Duration { return time.Duration(0) } recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)} - dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, logger, NewDispatcherMetrics(prometheus.NewRegistry())) + dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, nil, logger, NewDispatcherMetrics(false, prometheus.NewRegistry())) go dispatcher.Run() defer dispatcher.Stop() @@ -411,65 +412,162 @@ route: &AlertGroup{ Alerts: []*types.Alert{inputAlerts[0]}, Labels: model.LabelSet{ - model.LabelName("alertname"): model.LabelValue("OtherAlert"), + "alertname": "OtherAlert", }, Receiver: "prod", }, &AlertGroup{ Alerts: []*types.Alert{inputAlerts[1]}, Labels: model.LabelSet{ - model.LabelName("alertname"): model.LabelValue("TestingAlert"), - model.LabelName("service"): model.LabelValue("api"), + "alertname": "TestingAlert", + "service": "api", }, Receiver: "testing", }, &AlertGroup{ Alerts: []*types.Alert{inputAlerts[2], inputAlerts[3]}, Labels: model.LabelSet{ - model.LabelName("alertname"): model.LabelValue("HighErrorRate"), - model.LabelName("service"): model.LabelValue("api"), - model.LabelName("cluster"): model.LabelValue("aa"), + "alertname": "HighErrorRate", + "service": "api", + "cluster": "aa", }, Receiver: "prod", }, &AlertGroup{ Alerts: []*types.Alert{inputAlerts[4]}, Labels: model.LabelSet{ - model.LabelName("alertname"): model.LabelValue("HighErrorRate"), - model.LabelName("service"): model.LabelValue("api"), - model.LabelName("cluster"): model.LabelValue("bb"), + "alertname": "HighErrorRate", + "service": "api", + "cluster": "bb", }, Receiver: "prod", }, &AlertGroup{ Alerts: []*types.Alert{inputAlerts[5]}, Labels: model.LabelSet{ - model.LabelName("alertname"): model.LabelValue("HighLatency"), - model.LabelName("service"): model.LabelValue("db"), - 
model.LabelName("cluster"): model.LabelValue("bb"), + "alertname": "HighLatency", + "service": "db", + "cluster": "bb", }, Receiver: "kafka", }, &AlertGroup{ Alerts: []*types.Alert{inputAlerts[5]}, Labels: model.LabelSet{ - model.LabelName("alertname"): model.LabelValue("HighLatency"), - model.LabelName("service"): model.LabelValue("db"), - model.LabelName("cluster"): model.LabelValue("bb"), + "alertname": "HighLatency", + "service": "db", + "cluster": "bb", }, Receiver: "prod", }, }, alertGroups) require.Equal(t, map[model.Fingerprint][]string{ - inputAlerts[0].Fingerprint(): []string{"prod"}, - inputAlerts[1].Fingerprint(): []string{"testing"}, - inputAlerts[2].Fingerprint(): []string{"prod"}, - inputAlerts[3].Fingerprint(): []string{"prod"}, - inputAlerts[4].Fingerprint(): []string{"prod"}, - inputAlerts[5].Fingerprint(): []string{"kafka", "prod"}, + inputAlerts[0].Fingerprint(): {"prod"}, + inputAlerts[1].Fingerprint(): {"testing"}, + inputAlerts[2].Fingerprint(): {"prod"}, + inputAlerts[3].Fingerprint(): {"prod"}, + inputAlerts[4].Fingerprint(): {"prod"}, + inputAlerts[5].Fingerprint(): {"kafka", "prod"}, }, receivers) } +func TestGroupsWithLimits(t *testing.T) { + confData := `receivers: +- name: 'kafka' +- name: 'prod' +- name: 'testing' + +route: + group_by: ['alertname'] + group_wait: 10ms + group_interval: 10ms + receiver: 'prod' + routes: + - match: + env: 'testing' + receiver: 'testing' + group_by: ['alertname', 'service'] + - match: + env: 'prod' + receiver: 'prod' + group_by: ['alertname', 'service', 'cluster'] + continue: true + - match: + kafka: 'yes' + receiver: 'kafka' + group_by: ['alertname', 'service', 'cluster']` + conf, err := config.Load(confData) + if err != nil { + t.Fatal(err) + } + + logger := log.NewNopLogger() + route := NewRoute(conf.Route, nil) + marker := types.NewMarker(prometheus.NewRegistry()) + alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, nil, logger) + if err != nil { + t.Fatal(err) + } + defer alerts.Close() + + timeout := func(d time.Duration) time.Duration { return time.Duration(0) } + recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)} + lim := limits{groups: 6} + m := NewDispatcherMetrics(true, prometheus.NewRegistry()) + dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, lim, logger, m) + go dispatcher.Run() + defer dispatcher.Stop() + + // Create alerts. the dispatcher will automatically create the groups. + inputAlerts := []*types.Alert{ + // Matches the parent route. + newAlert(model.LabelSet{"alertname": "OtherAlert", "cluster": "cc", "service": "dd"}), + // Matches the first sub-route. + newAlert(model.LabelSet{"env": "testing", "alertname": "TestingAlert", "service": "api", "instance": "inst1"}), + // Matches the second sub-route. + newAlert(model.LabelSet{"env": "prod", "alertname": "HighErrorRate", "cluster": "aa", "service": "api", "instance": "inst1"}), + newAlert(model.LabelSet{"env": "prod", "alertname": "HighErrorRate", "cluster": "aa", "service": "api", "instance": "inst2"}), + // Matches the second sub-route. + newAlert(model.LabelSet{"env": "prod", "alertname": "HighErrorRate", "cluster": "bb", "service": "api", "instance": "inst1"}), + // Matches the second and third sub-route. + newAlert(model.LabelSet{"env": "prod", "alertname": "HighLatency", "cluster": "bb", "service": "db", "kafka": "yes", "instance": "inst3"}), + } + err = alerts.Put(inputAlerts...) + if err != nil { + t.Fatal(err) + } + + // Let alerts get processed. 
+ for i := 0; len(recorder.Alerts()) != 7 && i < 10; i++ { + time.Sleep(200 * time.Millisecond) + } + require.Equal(t, 7, len(recorder.Alerts())) + + routeFilter := func(*Route) bool { return true } + alertFilter := func(*types.Alert, time.Time) bool { return true } + + alertGroups, _ := dispatcher.Groups(routeFilter, alertFilter) + require.Len(t, alertGroups, 6) + + require.Equal(t, 0.0, testutil.ToFloat64(m.aggrGroupLimitReached)) + + // Try to store new alert. This time, we will hit limit for number of groups. + err = alerts.Put(newAlert(model.LabelSet{"env": "prod", "alertname": "NewAlert", "cluster": "new-cluster", "service": "db"})) + if err != nil { + t.Fatal(err) + } + + // Let alert get processed. + for i := 0; testutil.ToFloat64(m.aggrGroupLimitReached) == 0 && i < 10; i++ { + time.Sleep(200 * time.Millisecond) + } + require.Equal(t, 1.0, testutil.ToFloat64(m.aggrGroupLimitReached)) + + // Verify there are still only 6 groups. + alertGroups, _ = dispatcher.Groups(routeFilter, alertFilter) + require.Len(t, alertGroups, 6) +} + type recordStage struct { mtx sync.RWMutex alerts map[string]map[model.Fingerprint]*types.Alert @@ -527,24 +625,24 @@ func newAlert(labels model.LabelSet) *types.Alert { func TestDispatcherRace(t *testing.T) { logger := log.NewNopLogger() marker := types.NewMarker(prometheus.NewRegistry()) - alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, logger) + alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, nil, logger) if err != nil { t.Fatal(err) } defer alerts.Close() timeout := func(d time.Duration) time.Duration { return time.Duration(0) } - dispatcher := NewDispatcher(alerts, nil, nil, marker, timeout, logger, NewDispatcherMetrics(prometheus.NewRegistry())) + dispatcher := NewDispatcher(alerts, nil, nil, marker, timeout, nil, logger, NewDispatcherMetrics(false, prometheus.NewRegistry())) go dispatcher.Run() dispatcher.Stop() } func TestDispatcherRaceOnFirstAlertNotDeliveredWhenGroupWaitIsZero(t *testing.T) { - const numAlerts = 8000 + const numAlerts = 5000 logger := log.NewNopLogger() marker := types.NewMarker(prometheus.NewRegistry()) - alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, logger) + alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, nil, logger) if err != nil { t.Fatal(err) } @@ -562,7 +660,7 @@ func TestDispatcherRaceOnFirstAlertNotDeliveredWhenGroupWaitIsZero(t *testing.T) timeout := func(d time.Duration) time.Duration { return d } recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)} - dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, logger, NewDispatcherMetrics(prometheus.NewRegistry())) + dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, nil, logger, NewDispatcherMetrics(false, prometheus.NewRegistry())) go dispatcher.Run() defer dispatcher.Stop() @@ -585,3 +683,11 @@ func TestDispatcherRaceOnFirstAlertNotDeliveredWhenGroupWaitIsZero(t *testing.T) // We expect all alerts to be notified immediately, since they all belong to different groups. 
require.Equal(t, numAlerts, len(recorder.Alerts())) } + +type limits struct { + groups int +} + +func (l limits) MaxNumberOfAggregationGroups() int { + return l.groups +} diff --git a/doc/alertmanager-mixin/.gitignore b/doc/alertmanager-mixin/.gitignore new file mode 100644 index 00000000..09527a31 --- /dev/null +++ b/doc/alertmanager-mixin/.gitignore @@ -0,0 +1,2 @@ +vendor +dashboards_out diff --git a/doc/alertmanager-mixin/Makefile b/doc/alertmanager-mixin/Makefile index 360ddf2d..314e4b38 100644 --- a/doc/alertmanager-mixin/Makefile +++ b/doc/alertmanager-mixin/Makefile @@ -1,10 +1,13 @@ -JSONNET_FMT := jsonnetfmt -n 2 --max-blank-lines 2 --string-style s --comment-style s +JSONNET_FMT := jsonnetfmt -n 2 --max-blank-lines 1 --string-style s --comment-style s ALERTMANAGER_ALERTS := alertmanager_alerts.yaml -default: build +default: vendor build dashboards_out all: fmt build +vendor: + jb install + fmt: find . -name 'vendor' -prune -o -name '*.libsonnet' -print -o -name '*.jsonnet' -print | \ xargs -n 1 -- $(JSONNET_FMT) -i @@ -17,7 +20,12 @@ lint: build mixtool lint mixin.libsonnet -build: + +dashboards_out: mixin.libsonnet config.libsonnet $(wildcard dashboards/*) + @mkdir -p dashboards_out + jsonnet -J vendor -m dashboards_out dashboards.jsonnet + +build: vendor mixtool generate alerts mixin.libsonnet > $(ALERTMANAGER_ALERTS) clean: diff --git a/doc/alertmanager-mixin/config.libsonnet b/doc/alertmanager-mixin/config.libsonnet index a9d8b558..d0a28923 100644 --- a/doc/alertmanager-mixin/config.libsonnet +++ b/doc/alertmanager-mixin/config.libsonnet @@ -1,5 +1,6 @@ { _config+:: { + local c = self, // alertmanagerSelector is inserted as part of the label selector in // PromQL queries to identify metrics collected from Alertmanager // servers. @@ -12,13 +13,17 @@ // to keep for resulting cluster-level alerts. alertmanagerClusterLabels: 'job', - // alertmanagerName is inserted into annotations to name the Alertmanager - // instance affected by the alert. - alertmanagerName: '{{$labels.instance}}', + // alertmanagerNameLabels is a string with comma-separated + // labels used to identify different alertmanagers within the same + // Alertmanager HA cluster. // If you run Alertmanager on Kubernetes with the Prometheus // Operator, you can make use of the configured target labels for // nicer naming: - // alertmanagerName: '{{$labels.namespace}}/{{$labels.pod}}' + // alertmanagerNameLabels: 'namespace,pod' + alertmanagerNameLabels: 'instance', + + // alertmanagerName is an identifier for alerts. By default, it is built from 'alertmanagerNameLabels'. + alertmanagerName: std.join('/', ['{{$labels.%s}}' % [label] for label in std.split(c.alertmanagerNameLabels, ',')]), // alertmanagerClusterName is inserted into annotations to name an // Alertmanager cluster. All labels used here must also be present @@ -32,5 +37,8 @@ // integration that is itself not used for critical alerts. 
// Example: @'pagerduty|webhook' alertmanagerCriticalIntegrationsRegEx: @'.*', + + dashboardNamePrefix: 'Alertmanager / ', + dashboardTags: ['alertmanager-mixin'], }, } diff --git a/doc/alertmanager-mixin/dashboards.jsonnet b/doc/alertmanager-mixin/dashboards.jsonnet new file mode 100644 index 00000000..9d913ed3 --- /dev/null +++ b/doc/alertmanager-mixin/dashboards.jsonnet @@ -0,0 +1,6 @@ +local dashboards = (import 'mixin.libsonnet').grafanaDashboards; + +{ + [name]: dashboards[name] + for name in std.objectFields(dashboards) +} diff --git a/doc/alertmanager-mixin/dashboards.libsonnet b/doc/alertmanager-mixin/dashboards.libsonnet new file mode 100644 index 00000000..e4cc41ca --- /dev/null +++ b/doc/alertmanager-mixin/dashboards.libsonnet @@ -0,0 +1 @@ +(import './dashboards/overview.libsonnet') diff --git a/doc/alertmanager-mixin/dashboards/overview.libsonnet b/doc/alertmanager-mixin/dashboards/overview.libsonnet new file mode 100644 index 00000000..fe98d637 --- /dev/null +++ b/doc/alertmanager-mixin/dashboards/overview.libsonnet @@ -0,0 +1,149 @@ +local grafana = import 'github.com/grafana/grafonnet-lib/grafonnet/grafana.libsonnet'; +local dashboard = grafana.dashboard; +local row = grafana.row; +local prometheus = grafana.prometheus; +local template = grafana.template; +local graphPanel = grafana.graphPanel; + +{ + grafanaDashboards+:: { + + local amQuerySelector = std.join(',', ['%s="$%s"' % [label, label] for label in std.split($._config.alertmanagerClusterLabels, ',')]), + local amNameDashboardLegend = std.join('/', ['{{%s}}' % [label] for label in std.split($._config.alertmanagerNameLabels, ',')]), + + local alertmanagerClusterSelectorTemplates = + [ + template.new( + name=label, + datasource='$datasource', + query='label_values(alertmanager_alerts, %s)' % label, + current='', + refresh=2, + includeAll=false, + sort=1 + ) + for label in std.split($._config.alertmanagerClusterLabels, ',') + ], + + local integrationTemplate = + template.new( + name='integration', + datasource='$datasource', + query='label_values(alertmanager_notifications_total{integration=~"%s"}, integration)' % $._config.alertmanagerCriticalIntegrationsRegEx, + current='all', + hide='2', // Always hide + refresh=2, + includeAll=true, + sort=1 + ), + + 'alertmanager-overview.json': + local alerts = + graphPanel.new( + 'Alerts', + datasource='$datasource', + span=6, + format='none', + stack=true, + fill=1, + legend_show=false, + ) + .addTarget(prometheus.target('sum(alertmanager_alerts{%(amQuerySelector)s}) by (%(alertmanagerClusterLabels)s,%(alertmanagerNameLabels)s)' % $._config { amQuerySelector: amQuerySelector }, legendFormat='%(amNameDashboardLegend)s' % $._config { amNameDashboardLegend: amNameDashboardLegend })); + + local alertsRate = + graphPanel.new( + 'Alerts receive rate', + datasource='$datasource', + span=6, + format='ops', + stack=true, + fill=1, + legend_show=false, + ) + .addTarget(prometheus.target('sum(rate(alertmanager_alerts_received_total{%(amQuerySelector)s}[5m])) by (%(alertmanagerClusterLabels)s,%(alertmanagerNameLabels)s)' % $._config { amQuerySelector: amQuerySelector }, legendFormat='%(amNameDashboardLegend)s Received' % $._config { amNameDashboardLegend: amNameDashboardLegend })) + .addTarget(prometheus.target('sum(rate(alertmanager_alerts_invalid_total{%(amQuerySelector)s}[5m])) by (%(alertmanagerClusterLabels)s,%(alertmanagerNameLabels)s)' % $._config { amQuerySelector: amQuerySelector }, legendFormat='%(amNameDashboardLegend)s Invalid' % $._config { amNameDashboardLegend: 
amNameDashboardLegend })); + + local notifications = + graphPanel.new( + '$integration: Notifications Send Rate', + datasource='$datasource', + format='ops', + stack=true, + fill=1, + legend_show=false, + repeat='integration' + ) + .addTarget(prometheus.target('sum(rate(alertmanager_notifications_total{%(amQuerySelector)s, integration="$integration"}[5m])) by (integration,%(alertmanagerClusterLabels)s,%(alertmanagerNameLabels)s)' % $._config { amQuerySelector: amQuerySelector }, legendFormat='%(amNameDashboardLegend)s Total' % $._config { amNameDashboardLegend: amNameDashboardLegend })) + .addTarget(prometheus.target('sum(rate(alertmanager_notifications_failed_total{%(amQuerySelector)s, integration="$integration"}[5m])) by (integration,%(alertmanagerClusterLabels)s,%(alertmanagerNameLabels)s)' % $._config { amQuerySelector: amQuerySelector }, legendFormat='%(amNameDashboardLegend)s Failed' % $._config { amNameDashboardLegend: amNameDashboardLegend })); + + local notificationDuration = + graphPanel.new( + '$integration: Notification Duration', + datasource='$datasource', + format='s', + stack=false, + fill=1, + legend_show=false, + repeat='integration' + ) + .addTarget(prometheus.target( + ||| + histogram_quantile(0.99, + sum(rate(alertmanager_notification_latency_seconds_bucket{%(amQuerySelector)s, integration="$integration"}[5m])) by (le,%(alertmanagerClusterLabels)s,%(alertmanagerNameLabels)s) + ) + ||| % $._config { amQuerySelector: amQuerySelector }, legendFormat='%(amNameDashboardLegend)s 99th Percentile' % $._config { amNameDashboardLegend: amNameDashboardLegend } + )) + .addTarget(prometheus.target( + ||| + histogram_quantile(0.50, + sum(rate(alertmanager_notification_latency_seconds_bucket{%(amQuerySelector)s, integration="$integration"}[5m])) by (le,%(alertmanagerClusterLabels)s,%(alertmanagerNameLabels)s) + ) + ||| % $._config { amQuerySelector: amQuerySelector }, legendFormat='%(amNameDashboardLegend)s Median' % $._config { amNameDashboardLegend: amNameDashboardLegend } + )) + .addTarget(prometheus.target( + ||| + sum(rate(alertmanager_notification_latency_seconds_sum{%(amQuerySelector)s, integration="$integration"}[5m])) by (%(alertmanagerClusterLabels)s,%(alertmanagerNameLabels)s) + / + sum(rate(alertmanager_notification_latency_seconds_count{%(amQuerySelector)s, integration="$integration"}[5m])) by (%(alertmanagerClusterLabels)s,%(alertmanagerNameLabels)s) + ||| % $._config { amQuerySelector: amQuerySelector }, legendFormat='%(amNameDashboardLegend)s Average' % $._config { amNameDashboardLegend: amNameDashboardLegend } + )); + + dashboard.new( + '%sOverview' % $._config.dashboardNamePrefix, + time_from='now-1h', + tags=($._config.dashboardTags), + timezone='utc', + refresh='30s', + graphTooltip='shared_crosshair', + uid='alertmanager-overview' + ) + .addTemplate( + { + current: { + text: 'Prometheus', + value: 'Prometheus', + }, + hide: 0, + label: null, + name: 'datasource', + options: [], + query: 'prometheus', + refresh: 1, + regex: '', + type: 'datasource', + }, + ) + .addTemplates(alertmanagerClusterSelectorTemplates) + .addTemplate(integrationTemplate) + .addRow( + row.new('Alerts') + .addPanel(alerts) + .addPanel(alertsRate) + ) + .addRow( + row.new('Notifications') + .addPanel(notifications) + .addPanel(notificationDuration) + ), + }, +} diff --git a/doc/alertmanager-mixin/jsonnetfile.json b/doc/alertmanager-mixin/jsonnetfile.json new file mode 100644 index 00000000..650733a0 --- /dev/null +++ b/doc/alertmanager-mixin/jsonnetfile.json @@ -0,0 +1,15 @@ +{ + "version": 
1, + "dependencies": [ + { + "source": { + "git": { + "remote": "https://github.com/grafana/grafonnet-lib.git", + "subdir": "grafonnet" + } + }, + "version": "master" + } + ], + "legacyImports": false +} diff --git a/doc/alertmanager-mixin/jsonnetfile.lock.json b/doc/alertmanager-mixin/jsonnetfile.lock.json new file mode 100644 index 00000000..803febc8 --- /dev/null +++ b/doc/alertmanager-mixin/jsonnetfile.lock.json @@ -0,0 +1,16 @@ +{ + "version": 1, + "dependencies": [ + { + "source": { + "git": { + "remote": "https://github.com/grafana/grafonnet-lib.git", + "subdir": "grafonnet" + } + }, + "version": "55cf4ee53ced2b6d3ce96ecce9fb813b4465be98", + "sum": "4/sUV0Kk+o8I+wlYxL9R6EPhL/NiLfYHk+NXlU64RUk=" + } + ], + "legacyImports": false +} diff --git a/doc/alertmanager-mixin/mixin.libsonnet b/doc/alertmanager-mixin/mixin.libsonnet index 95efe331..22db15c9 100644 --- a/doc/alertmanager-mixin/mixin.libsonnet +++ b/doc/alertmanager-mixin/mixin.libsonnet @@ -1,2 +1,3 @@ (import 'config.libsonnet') + -(import 'alerts.libsonnet') +(import 'alerts.libsonnet') + +(import 'dashboards.libsonnet') diff --git a/doc/examples/simple.yml b/doc/examples/simple.yml index 02063546..1cd24b9c 100644 --- a/doc/examples/simple.yml +++ b/doc/examples/simple.yml @@ -47,39 +47,39 @@ route: routes: # This routes performs a regular expression match on alert labels to # catch alerts that are related to a list of services. - - match_re: - service: ^(foo1|foo2|baz)$ + - matchers: + - service=~"foo1|foo2|baz" receiver: team-X-mails # The service has a sub-route for critical alerts, any alerts # that do not match, i.e. severity != critical, fall-back to the # parent node and are sent to 'team-X-mails' routes: - - match: - severity: critical + - matchers: + - severity="critical" receiver: team-X-pager - - match: - service: files + - matchers: + - service="files" receiver: team-Y-mails routes: - - match: - severity: critical + - matchers: + - severity="critical" receiver: team-Y-pager # This route handles all alerts coming from a database service. If there's # no team to handle it, it defaults to the DB team. - - match: - service: database + - matchers: + - service="database" receiver: team-DB-pager # Also group alerts by affected database. group_by: [alertname, cluster, database] routes: - - match: - owner: team-X + - matchers: + - owner="team-X" receiver: team-X-pager continue: true - - match: - owner: team-Y + - matchers: + - owner="team-Y" receiver: team-Y-pager @@ -88,16 +88,14 @@ route: # We use this to mute any warning-level notifications if the same alert is # already critical. inhibit_rules: -- source_match: - severity: 'critical' - target_match: - severity: 'warning' +- source_matchers: [ severity="critical" ] + target_matchers: [ severity="warning" ] # Apply inhibition if the alertname is the same. # CAUTION: # If all label names listed in `equal` are missing # from both the source and target alerts, # the inhibition rule will apply! - equal: ['alertname', 'cluster', 'service'] + equal: [ alertname, cluster, service ] receivers: diff --git a/docs/configuration.md b/docs/configuration.md index b5b76240..fcf932a5 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -211,15 +211,15 @@ route: # are dispatched to the database pager. - receiver: 'database-pager' group_wait: 10s - match_re: - service: mysql|cassandra + matchers: + - service=~"mysql|cassandra" # All alerts with the team=frontend label match this sub-route. # They are grouped by product and environment rather than cluster # and alertname. 
- receiver: 'frontend-pager' group_by: [product, environment] - match: - team: frontend + matchers: + - team="frontend" ``` ## `` @@ -264,7 +264,7 @@ immediately before 24:00. They are specified like so: - start_time: HH:MM end_time: HH:MM -`weeekday_range`: A list of days of the week, where the week begins on Sunday and ends on Saturday. +`weekday_range`: A list of days of the week, where the week begins on Sunday and ends on Saturday. Days should be specified by name (e.g. ‘Sunday’). For convenience, ranges are also accepted of the form : and are inclusive on both ends. For example: `[‘monday:wednesday','saturday', 'sunday']` @@ -326,7 +326,7 @@ source_match_re: # A list of matchers for which one or more alerts have # to exist for the inhibition to take effect. source_matchers: - [ - ... ] + [ - ... ] # Labels that must have an equal value in the source and target # alert for the inhibition to take effect. @@ -583,6 +583,8 @@ Pushover notifications are sent via the [Pushover API](https://pushover.net/api) user_key: # Your registered application’s API token, see https://pushover.net/apps +# You can also register a token by cloning this Prometheus app: +# https://pushover.net/apps/clone/prometheus token: # Notification title. @@ -760,12 +762,12 @@ The 3rd token may be the empty string. Within the 3rd token, OpenMetrics escapin In the configuration, multiple matchers are combined in a YAML list. However, it is also possible to combine multiple matchers within a single YAML string, again using syntax inspired by PromQL. In such a string, a leading `{` and/or a trailing `}` is optional and will be trimmed before further parsing. Individual matchers are separated by commas outside of quoted parts of the string. Those commas may be surrounded by whitespace. Parts of the string inside unescaped double quotes `"…"` are considered quoted (and commas don't act as separators there). If double quotes are escaped with a single backslash `\`, they are ignored for the purpose of identifying quoted parts of the input string. If the input string, after trimming the optional trailing `}`, ends with a comma, followed by optional whitespace, this comma and whitespace will be trimmed. -Here are some examples of valid string matchers : +Here are some examples of valid string matchers: 1. Shown below are two equality matchers combined in a long form YAML list. ```yaml - matchers : + matchers: - foo = bar - dings !=bums ``` @@ -967,4 +969,4 @@ API](http://admin.wechat.com/wiki/index.php?title=Customer_Service_Messages). [ to_user: | default = '{{ template "wechat.default.to_user" . }}' ] [ to_party: | default = '{{ template "wechat.default.to_party" . }}' ] [ to_tag: | default = '{{ template "wechat.default.to_tag" . }}' ] -``` \ No newline at end of file +``` diff --git a/docs/https.md b/docs/https.md index e4749010..7493652d 100644 --- a/docs/https.md +++ b/docs/https.md @@ -8,7 +8,7 @@ sort_rank: 11 Alertmanager supports basic authentication and TLS. This is **experimental** and might change in the future. -Currentlu TLS is only supported for the HTTP traffic. Gossip traffic does not +Currently TLS is only supported for the HTTP traffic. Gossip traffic does not support encryption yet. To specify which web configuration file to load, use the `--web.config.file` flag. 
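The documentation changes above migrate the examples from `match`/`match_re` to the new `matchers` syntax. The same strings can also be parsed programmatically; a short sketch, assuming the `ParseMatchers` helper exported from `github.com/prometheus/alertmanager/pkg/labels` (the parser whose brace trimming and comma handling the documentation describes):

```go
package main

import (
	"fmt"
	"log"

	"github.com/prometheus/alertmanager/pkg/labels"
)

func main() {
	// The same string form accepted by the new `matchers` configuration fields.
	ms, err := labels.ParseMatchers(`{severity="critical", service=~"foo1|foo2|baz"}`)
	if err != nil {
		log.Fatal(err)
	}
	for _, m := range ms {
		// Each matcher carries a label name, a match type (=, !=, =~, !~) and a value.
		fmt.Println(m.Name, m.Type, m.Value)
	}
}
```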
diff --git a/go.mod b/go.mod index 77fcbbbd..5b1e95b6 100644 --- a/go.mod +++ b/go.mod @@ -31,6 +31,7 @@ require ( github.com/shurcooL/vfsgen v0.0.0-20200824052919-0d455de96546 github.com/stretchr/testify v1.7.0 github.com/xlab/treeprint v1.1.0 + go.uber.org/atomic v1.5.0 golang.org/x/net v0.0.0-20210421230115-4e50805a0758 golang.org/x/tools v0.1.0 gopkg.in/alecthomas/kingpin.v2 v2.2.6 diff --git a/go.sum b/go.sum index c1dd7fbd..604168ac 100644 --- a/go.sum +++ b/go.sum @@ -537,6 +537,7 @@ go.opencensus.io v0.20.1/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk= go.opencensus.io v0.20.2/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= +go.uber.org/atomic v1.5.0 h1:OI5t8sDa1Or+q8AeE+yKeB/SDYioSHAgcVljj9JIETY= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= @@ -564,6 +565,7 @@ golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTk golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190301231843-5614ed5bae6f/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs= golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= diff --git a/provider/mem/mem.go b/provider/mem/mem.go index 575ad0d2..d45ce86d 100644 --- a/provider/mem/mem.go +++ b/provider/mem/mem.go @@ -38,16 +38,36 @@ type Alerts struct { listeners map[int]listeningAlerts next int + callback AlertStoreCallback + logger log.Logger } +type AlertStoreCallback interface { + // PreStore is called before alert is stored into the store. If this method returns error, + // alert is not stored. + // Existing flag indicates whether alert has existed before (and is only updated) or not. + // If alert has existed before, then alert passed to PreStore is result of merging existing alert with new alert. + PreStore(alert *types.Alert, existing bool) error + + // PostStore is called after alert has been put into store. + PostStore(alert *types.Alert, existing bool) + + // PostDelete is called after alert has been removed from the store due to alert garbage collection. + PostDelete(alert *types.Alert) +} + type listeningAlerts struct { alerts chan *types.Alert done chan struct{} } // NewAlerts returns a new alert provider. 
-func NewAlerts(ctx context.Context, m types.Marker, intervalGC time.Duration, l log.Logger) (*Alerts, error) { +func NewAlerts(ctx context.Context, m types.Marker, intervalGC time.Duration, alertCallback AlertStoreCallback, l log.Logger) (*Alerts, error) { + if alertCallback == nil { + alertCallback = noopCallback{} + } + ctx, cancel := context.WithCancel(ctx) a := &Alerts{ alerts: store.NewAlerts(), @@ -55,6 +75,7 @@ func NewAlerts(ctx context.Context, m types.Marker, intervalGC time.Duration, l listeners: map[int]listeningAlerts{}, next: 0, logger: log.With(l, "component", "provider"), + callback: alertCallback, } a.alerts.SetGCCallback(func(alerts []*types.Alert) { for _, alert := range alerts { @@ -62,6 +83,7 @@ func NewAlerts(ctx context.Context, m types.Marker, intervalGC time.Duration, l // they are resolved. Alerts waiting for resolved notifications are // held in memory in aggregation groups redundantly. m.Delete(alert.Fingerprint()) + a.callback.PostDelete(alert) } a.mtx.Lock() @@ -148,13 +170,16 @@ func (a *Alerts) Get(fp model.Fingerprint) (*types.Alert, error) { // Put adds the given alert to the set. func (a *Alerts) Put(alerts ...*types.Alert) error { - for _, alert := range alerts { fp := alert.Fingerprint() + existing := false + // Check that there's an alert existing within the store before // trying to merge. if old, err := a.alerts.Get(fp); err == nil { + existing = true + // Merge alerts if there is an overlap in activity range. if (alert.EndsAt.After(old.StartsAt) && alert.EndsAt.Before(old.EndsAt)) || (alert.StartsAt.After(old.StartsAt) && alert.StartsAt.Before(old.EndsAt)) { @@ -162,11 +187,18 @@ func (a *Alerts) Put(alerts ...*types.Alert) error { } } + if err := a.callback.PreStore(alert, existing); err != nil { + level.Error(a.logger).Log("msg", "pre-store callback returned error on set alert", "err", err) + continue + } + if err := a.alerts.Set(alert); err != nil { level.Error(a.logger).Log("msg", "error on set alert", "err", err) continue } + a.callback.PostStore(alert, existing) + a.mtx.Lock() for _, l := range a.listeners { select { @@ -179,3 +211,9 @@ func (a *Alerts) Put(alerts ...*types.Alert) error { return nil } + +type noopCallback struct{} + +func (n noopCallback) PreStore(_ *types.Alert, _ bool) error { return nil } +func (n noopCallback) PostStore(_ *types.Alert, _ bool) {} +func (n noopCallback) PostDelete(_ *types.Alert) {} diff --git a/provider/mem/mem_test.go b/provider/mem/mem_test.go index fa8a1ee8..3e24a56d 100644 --- a/provider/mem/mem_test.go +++ b/provider/mem/mem_test.go @@ -25,11 +25,13 @@ import ( "github.com/go-kit/kit/log" "github.com/kylelemons/godebug/pretty" - "github.com/prometheus/alertmanager/store" - "github.com/prometheus/alertmanager/types" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" + "go.uber.org/atomic" + + "github.com/prometheus/alertmanager/store" + "github.com/prometheus/alertmanager/types" ) var ( @@ -85,7 +87,7 @@ func init() { // a listener can not unsubscribe as the lock is hold by `alerts.Lock`. 
func TestAlertsSubscribePutStarvation(t *testing.T) { marker := types.NewMarker(prometheus.NewRegistry()) - alerts, err := NewAlerts(context.Background(), marker, 30*time.Minute, log.NewNopLogger()) + alerts, err := NewAlerts(context.Background(), marker, 30*time.Minute, noopCallback{}, log.NewNopLogger()) if err != nil { t.Fatal(err) } @@ -136,7 +138,7 @@ func TestAlertsSubscribePutStarvation(t *testing.T) { func TestAlertsPut(t *testing.T) { marker := types.NewMarker(prometheus.NewRegistry()) - alerts, err := NewAlerts(context.Background(), marker, 30*time.Minute, log.NewNopLogger()) + alerts, err := NewAlerts(context.Background(), marker, 30*time.Minute, noopCallback{}, log.NewNopLogger()) if err != nil { t.Fatal(err) } @@ -164,7 +166,7 @@ func TestAlertsSubscribe(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - alerts, err := NewAlerts(ctx, marker, 30*time.Minute, log.NewNopLogger()) + alerts, err := NewAlerts(ctx, marker, 30*time.Minute, noopCallback{}, log.NewNopLogger()) if err != nil { t.Fatal(err) } @@ -241,7 +243,7 @@ func TestAlertsSubscribe(t *testing.T) { func TestAlertsGetPending(t *testing.T) { marker := types.NewMarker(prometheus.NewRegistry()) - alerts, err := NewAlerts(context.Background(), marker, 30*time.Minute, log.NewNopLogger()) + alerts, err := NewAlerts(context.Background(), marker, 30*time.Minute, noopCallback{}, log.NewNopLogger()) if err != nil { t.Fatal(err) } @@ -284,7 +286,7 @@ func TestAlertsGetPending(t *testing.T) { func TestAlertsGC(t *testing.T) { marker := types.NewMarker(prometheus.NewRegistry()) - alerts, err := NewAlerts(context.Background(), marker, 200*time.Millisecond, log.NewNopLogger()) + alerts, err := NewAlerts(context.Background(), marker, 200*time.Millisecond, noopCallback{}, log.NewNopLogger()) if err != nil { t.Fatal(err) } @@ -316,6 +318,71 @@ func TestAlertsGC(t *testing.T) { } } +func TestAlertsStoreCallback(t *testing.T) { + cb := &limitCountCallback{limit: 3} + + marker := types.NewMarker(prometheus.NewRegistry()) + alerts, err := NewAlerts(context.Background(), marker, 200*time.Millisecond, cb, log.NewNopLogger()) + if err != nil { + t.Fatal(err) + } + + err = alerts.Put(alert1, alert2, alert3) + if err != nil { + t.Fatal(err) + } + if num := cb.alerts.Load(); num != 3 { + t.Fatalf("unexpected number of alerts in the store, expected %v, got %v", 3, num) + } + + alert1Mod := *alert1 + alert1Mod.Annotations = model.LabelSet{"foo": "bar", "new": "test"} // Update annotations for alert1 + + alert4 := &types.Alert{ + Alert: model.Alert{ + Labels: model.LabelSet{"bar4": "foo4"}, + Annotations: model.LabelSet{"foo4": "bar4"}, + StartsAt: t0, + EndsAt: t1, + GeneratorURL: "http://example.com/prometheus", + }, + UpdatedAt: t0, + Timeout: false, + } + + err = alerts.Put(&alert1Mod, alert4) + // Verify that we failed to put new alert into store (not reported via error, only checked using Load) + if err != nil { + t.Fatalf("unexpected error %v", err) + } + + if num := cb.alerts.Load(); num != 3 { + t.Fatalf("unexpected number of alerts in the store, expected %v, got %v", 3, num) + } + + // But we still managed to update alert1, since callback doesn't report error when updating existing alert. + a, err := alerts.Get(alert1.Fingerprint()) + if err != nil { + t.Fatal(err) + } + if !alertsEqual(a, &alert1Mod) { + t.Errorf("Unexpected alert") + t.Fatalf(pretty.Compare(a, &alert1Mod)) + } + + // Now wait until existing alerts are GC-ed, and make sure that callback was called. 
+ time.Sleep(300 * time.Millisecond) + + if num := cb.alerts.Load(); num != 0 { + t.Fatalf("unexpected number of alerts in the store, expected %v, got %v", 0, num) + } + + err = alerts.Put(alert4) + if err != nil { + t.Fatal(err) + } +} + func alertsEqual(a1, a2 *types.Alert) bool { if a1 == nil || a2 == nil { return false @@ -340,3 +407,32 @@ func alertsEqual(a1, a2 *types.Alert) bool { } return a1.Timeout == a2.Timeout } + +type limitCountCallback struct { + alerts atomic.Int32 + limit int +} + +var errTooManyAlerts = fmt.Errorf("too many alerts") + +func (l *limitCountCallback) PreStore(_ *types.Alert, existing bool) error { + if existing { + return nil + } + + if int(l.alerts.Load())+1 > l.limit { + return errTooManyAlerts + } + + return nil +} + +func (l *limitCountCallback) PostStore(_ *types.Alert, existing bool) { + if !existing { + l.alerts.Inc() + } +} + +func (l *limitCountCallback) PostDelete(_ *types.Alert) { + l.alerts.Dec() +} diff --git a/provider/provider.go b/provider/provider.go index 1ab5fbe9..91b5a741 100644 --- a/provider/provider.go +++ b/provider/provider.go @@ -83,6 +83,6 @@ type Alerts interface { GetPending() AlertIterator // Get returns the alert for a given fingerprint. Get(model.Fingerprint) (*types.Alert, error) - // Put adds the given alert to the set. + // Put adds the given set of alerts to the set. Put(...*types.Alert) error }
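Taken together, the new `dispatch.Limits` and `mem.AlertStoreCallback` extension points let embedders cap aggregation groups and intercept alert storage. A rough sketch of how a caller might wire both into the constructor signatures introduced by this diff; `fixedLimits` and `allowAllCallback` are illustrative types, and the nil route and stage only keep the sketch short:

```go
package main

import (
	"context"
	"time"

	"github.com/go-kit/kit/log"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/prometheus/alertmanager/dispatch"
	"github.com/prometheus/alertmanager/provider/mem"
	"github.com/prometheus/alertmanager/types"
)

// fixedLimits caps how many aggregation groups the dispatcher may create.
type fixedLimits struct{ groups int }

func (l fixedLimits) MaxNumberOfAggregationGroups() int { return l.groups }

// allowAllCallback is a do-nothing AlertStoreCallback; a real implementation
// could reject alerts in PreStore, as the limitCountCallback test helper does.
type allowAllCallback struct{}

func (allowAllCallback) PreStore(_ *types.Alert, _ bool) error { return nil }
func (allowAllCallback) PostStore(_ *types.Alert, _ bool)      {}
func (allowAllCallback) PostDelete(_ *types.Alert)             {}

func main() {
	logger := log.NewNopLogger()
	reg := prometheus.NewRegistry()
	marker := types.NewMarker(reg)

	// The alert store now takes a callback as its fifth argument.
	alerts, err := mem.NewAlerts(context.Background(), marker, 30*time.Minute, allowAllCallback{}, logger)
	if err != nil {
		panic(err)
	}
	defer alerts.Close()

	// The dispatcher now takes a Limits value and a flag that controls whether
	// the aggregation-group-limit counter is registered. Route and stage are
	// nil only for brevity; a real caller passes the configured route and
	// notification pipeline.
	metrics := dispatch.NewDispatcherMetrics(true, reg)
	timeout := func(d time.Duration) time.Duration { return d }
	disp := dispatch.NewDispatcher(alerts, nil, nil, marker, timeout, fixedLimits{groups: 100}, logger, metrics)
	go disp.Run()
	defer disp.Stop()
}
```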