From 39d008f94fe44b3b2efe2ac9bbd2a1198e67cdf8 Mon Sep 17 00:00:00 2001
From: "m.nabokikh"
Date: Tue, 11 Jul 2023 23:44:23 +0200
Subject: [PATCH] Route different alerts to different alertmanagers

Signed-off-by: m.nabokikh
---
 config/config.go          |   8 ++
 notifier/notifier.go      |  44 +++++++----
 notifier/notifier_test.go | 157 +++++++++++++++++++++++++++++---------
 3 files changed, 158 insertions(+), 51 deletions(-)

diff --git a/config/config.go b/config/config.go
index d32fcc33c..b77bbd691 100644
--- a/config/config.go
+++ b/config/config.go
@@ -823,6 +823,8 @@ type AlertmanagerConfig struct {
 
 	// List of Alertmanager relabel configurations.
 	RelabelConfigs []*relabel.Config `yaml:"relabel_configs,omitempty"`
+	// Relabel alerts before sending them to this specific Alertmanager.
+	AlertRelabelConfigs []*relabel.Config `yaml:"alert_relabel_configs,omitempty"`
 }
 
 // SetDirectory joins any relative file paths with dir.
@@ -858,6 +860,12 @@ func (c *AlertmanagerConfig) UnmarshalYAML(unmarshal func(interface{}) error) er
 		}
 	}
 
+	for _, rlcfg := range c.AlertRelabelConfigs {
+		if rlcfg == nil {
+			return errors.New("empty or null Alertmanager alert relabeling rule")
+		}
+	}
+
 	return nil
 }
 
diff --git a/notifier/notifier.go b/notifier/notifier.go
index 891372c43..630cbdf1e 100644
--- a/notifier/notifier.go
+++ b/notifier/notifier.go
@@ -349,7 +349,7 @@ func (n *Manager) Send(alerts ...*Alert) {
 	n.mtx.Lock()
 	defer n.mtx.Unlock()
 
-	alerts = n.relabelAlerts(alerts)
+	alerts = relabelAlerts(n.opts.RelabelConfigs, n.opts.ExternalLabels, alerts)
 	if len(alerts) == 0 {
 		return
 	}
@@ -377,20 +377,21 @@ func (n *Manager) Send(alerts ...*Alert) {
 	n.setMore()
 }
 
-// Attach external labels and process relabelling rules.
-func (n *Manager) relabelAlerts(alerts []*Alert) []*Alert {
+func relabelAlerts(relabelConfigs []*relabel.Config, externalLabels labels.Labels, alerts []*Alert) []*Alert {
 	lb := labels.NewBuilder(labels.EmptyLabels())
 	var relabeledAlerts []*Alert
 	for _, a := range alerts {
 		lb.Reset(a.Labels)
 
-		n.opts.ExternalLabels.Range(func(l labels.Label) {
-			if a.Labels.Get(l.Name) == "" {
-				lb.Set(l.Name, l.Value)
-			}
-		})
+		if externalLabels.Len() > 0 {
+			externalLabels.Range(func(l labels.Label) {
+				if a.Labels.Get(l.Name) == "" {
+					lb.Set(l.Name, l.Value)
+				}
+			})
+		}
 
-		keep := relabel.ProcessBuilder(lb, n.opts.RelabelConfigs...)
+		keep := relabel.ProcessBuilder(lb, relabelConfigs...)
 		if !keep {
 			continue
 		}
@@ -472,17 +473,30 @@ func (n *Manager) sendAll(alerts ...*Alert) bool {
 	)
 	for _, ams := range amSets {
 		var (
-			payload []byte
-			err     error
+			payload  []byte
+			err      error
+			amAlerts = alerts
 		)
 
 		ams.mtx.RLock()
 
+		if len(ams.cfg.AlertRelabelConfigs) > 0 {
+			amAlerts = relabelAlerts(ams.cfg.AlertRelabelConfigs, labels.EmptyLabels(), alerts)
+			// TODO(nabokihms): figure out the right way to cache marshalled alerts.
+			// For now, this works well only for the happy path.
+			v1Payload = nil
+			v2Payload = nil
+
+			if len(amAlerts) == 0 {
+				continue
+			}
+		}
+
 		switch ams.cfg.APIVersion {
 		case config.AlertmanagerAPIVersionV1:
 			{
 				if v1Payload == nil {
-					v1Payload, err = json.Marshal(alerts)
+					v1Payload, err = json.Marshal(amAlerts)
 					if err != nil {
 						level.Error(n.logger).Log("msg", "Encoding alerts for Alertmanager API v1 failed", "err", err)
 						ams.mtx.RUnlock()
@@ -495,7 +509,7 @@ func (n *Manager) sendAll(alerts ...*Alert) bool {
 		case config.AlertmanagerAPIVersionV2:
 			{
 				if v2Payload == nil {
-					openAPIAlerts := alertsToOpenAPIAlerts(alerts)
+					openAPIAlerts := alertsToOpenAPIAlerts(amAlerts)
 
 					v2Payload, err = json.Marshal(openAPIAlerts)
 					if err != nil {
@@ -526,13 +540,13 @@ func (n *Manager) sendAll(alerts ...*Alert) bool {
 
 			go func(client *http.Client, url string) {
 				if err := n.sendOne(ctx, client, url, payload); err != nil {
-					level.Error(n.logger).Log("alertmanager", url, "count", len(alerts), "msg", "Error sending alert", "err", err)
+					level.Error(n.logger).Log("alertmanager", url, "count", len(amAlerts), "msg", "Error sending alert", "err", err)
 					n.metrics.errors.WithLabelValues(url).Inc()
 				} else {
 					numSuccess.Inc()
 				}
 				n.metrics.latency.WithLabelValues(url).Observe(time.Since(begin).Seconds())
-				n.metrics.sent.WithLabelValues(url).Add(float64(len(alerts)))
+				n.metrics.sent.WithLabelValues(url).Add(float64(len(amAlerts)))
 
 				wg.Done()
 			}(ams.client, am.url().String())
diff --git a/notifier/notifier_test.go b/notifier/notifier_test.go
index 66ee45c6e..79a3157d7 100644
--- a/notifier/notifier_test.go
+++ b/notifier/notifier_test.go
@@ -98,6 +98,41 @@ func alertsEqual(a, b []*Alert) error {
 	return nil
 }
 
+func newTestHTTPServerBuilder(expected *[]*Alert, errc chan<- error, u, p string, status *atomic.Int32) *httptest.Server {
+	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		var err error
+		defer func() {
+			if err == nil {
+				return
+			}
+			select {
+			case errc <- err:
+			default:
+			}
+		}()
+		user, pass, _ := r.BasicAuth()
+		if user != u || pass != p {
+			err = fmt.Errorf("unexpected user/password: %s/%s != %s/%s", user, pass, u, p)
+			w.WriteHeader(http.StatusInternalServerError)
+			return
+		}
+
+		b, err := io.ReadAll(r.Body)
+		if err != nil {
+			err = fmt.Errorf("error reading body: %w", err)
+			w.WriteHeader(http.StatusInternalServerError)
+			return
+		}
+
+		var alerts []*Alert
+		err = json.Unmarshal(b, &alerts)
+		if err == nil {
+			err = alertsEqual(*expected, alerts)
+		}
+		w.WriteHeader(int(status.Load()))
+	}))
+}
+
 func TestHandlerSendAll(t *testing.T) {
 	var (
 		errc             = make(chan error, 1)
@@ -107,42 +142,8 @@ func TestHandlerSendAll(t *testing.T) {
 	status1.Store(int32(http.StatusOK))
 	status2.Store(int32(http.StatusOK))
 
-	newHTTPServer := func(u, p string, status *atomic.Int32) *httptest.Server {
-		return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-			var err error
-			defer func() {
-				if err == nil {
-					return
-				}
-				select {
-				case errc <- err:
-				default:
-				}
-			}()
-			user, pass, _ := r.BasicAuth()
-			if user != u || pass != p {
-				err = fmt.Errorf("unexpected user/password: %s/%s != %s/%s", user, pass, u, p)
-				w.WriteHeader(http.StatusInternalServerError)
-				return
-			}
-
-			b, err := io.ReadAll(r.Body)
-			if err != nil {
-				err = fmt.Errorf("error reading body: %w", err)
-				w.WriteHeader(http.StatusInternalServerError)
-				return
-			}
-
-			var alerts []*Alert
-			err = json.Unmarshal(b, &alerts)
-			if err == nil {
-				err = alertsEqual(expected, alerts)
-			}
-			w.WriteHeader(int(status.Load()))
-		}))
-	}
-	server1 := newHTTPServer("prometheus", "testing_password", &status1)
newHTTPServer("prometheus", "testing_password", &status1) - server2 := newHTTPServer("", "", &status2) + server1 := newTestHTTPServerBuilder(&expected, errc, "prometheus", "testing_password", &status1) + server2 := newTestHTTPServerBuilder(&expected, errc, "", "", &status2) defer server1.Close() defer server2.Close() @@ -213,6 +214,90 @@ func TestHandlerSendAll(t *testing.T) { checkNoErr() } +func TestHandlerSendAllRemapPerAm(t *testing.T) { + var ( + errc = make(chan error, 1) + expected1 = make([]*Alert, 0, maxBatchSize) + expected2 = make([]*Alert, 0, maxBatchSize) + + status1, status2 atomic.Int32 + ) + status1.Store(int32(http.StatusOK)) + status2.Store(int32(http.StatusOK)) + + server1 := newTestHTTPServerBuilder(&expected1, errc, "", "", &status1) + server2 := newTestHTTPServerBuilder(&expected2, errc, "", "", &status2) + + defer server1.Close() + defer server2.Close() + + h := NewManager(&Options{}, nil) + h.alertmanagers = make(map[string]*alertmanagerSet) + + am1Cfg := config.DefaultAlertmanagerConfig + am1Cfg.Timeout = model.Duration(time.Second) + + am2Cfg := config.DefaultAlertmanagerConfig + am2Cfg.Timeout = model.Duration(time.Second) + am2Cfg.AlertRelabelConfigs = []*relabel.Config{ + { + SourceLabels: model.LabelNames{"alertnamedrop"}, + Action: "drop", + Regex: relabel.MustNewRegexp(".+"), + }, + } + + h.alertmanagers["1"] = &alertmanagerSet{ + ams: []alertmanager{ + alertmanagerMock{ + urlf: func() string { return server1.URL }, + }, + }, + cfg: &am1Cfg, + } + + h.alertmanagers["2"] = &alertmanagerSet{ + ams: []alertmanager{ + alertmanagerMock{ + urlf: func() string { return server2.URL }, + }, + }, + cfg: &am2Cfg, + } + + for i := range make([]struct{}, maxBatchSize/2) { + h.queue = append(h.queue, &Alert{ + Labels: labels.FromStrings("alertname", fmt.Sprintf("%d", i)), + }) + h.queue = append(h.queue, &Alert{ + Labels: labels.FromStrings("alertnamedrop", fmt.Sprintf("%d", i)), + }) + + expected1 = append(expected1, &Alert{ + Labels: labels.FromStrings("alertname", fmt.Sprintf("%d", i)), + }) + expected1 = append(expected1, &Alert{ + Labels: labels.FromStrings("alertnamedrop", fmt.Sprintf("%d", i)), + }) + + expected2 = append(expected2, &Alert{ + Labels: labels.FromStrings("alertname", fmt.Sprintf("%d", i)), + }) + } + + checkNoErr := func() { + t.Helper() + select { + case err := <-errc: + require.NoError(t, err) + default: + } + } + + require.True(t, h.sendAll(h.queue...), "all sends failed unexpectedly") + checkNoErr() +} + func TestCustomDo(t *testing.T) { const testURL = "http://testurl.com/" const testBody = "testbody"