Split notifier select in 2 to ensure newer targets are used. (#10948)

* Split notifier select in 2 to ensure newer targets are used.

Signed-off-by: Julien Pivotto <roidelapluie@o11y.eu>
Julien Pivotto 2022-07-01 14:23:23 +02:00 committed by GitHub
parent 4aa693da99
commit 02f3297719
2 changed files with 122 additions and 1 deletion

notifier/notifier.go

@@ -302,12 +302,22 @@ func (n *Manager) nextBatch() []*Alert {
 // Run dispatches notifications continuously.
 func (n *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) {
 	for {
+		// The select is split in two parts so that we first try to read new
+		// alertmanager targets if they are available, before sending new
+		// alerts.
 		select {
 		case <-n.ctx.Done():
 			return
 		case ts := <-tsets:
 			n.reload(ts)
-		case <-n.more:
+		default:
+			select {
+			case <-n.ctx.Done():
+				return
+			case ts := <-tsets:
+				n.reload(ts)
+			case <-n.more:
+			}
 		}
 		alerts := n.nextBatch()
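
For readers less familiar with the pattern, here is a minimal, self-contained sketch of the two-level select introduced above (not from the Prometheus code base; the channel names updates and work are hypothetical). The outer select carries a default case, so it never blocks and always drains a pending update first; only when no update is ready does the inner select wait on everything, including the work channel.

package main

import (
	"context"
	"fmt"
	"time"
)

// prioritizedReceive prefers updates over work: the first select never blocks
// (it has a default case), so a pending update always wins over pending work;
// only when no update is ready does the second select wait on both channels.
func prioritizedReceive(ctx context.Context, updates <-chan string, work <-chan int) {
	for {
		select {
		case <-ctx.Done():
			return
		case u := <-updates:
			fmt.Println("applied update:", u)
		default:
			select {
			case <-ctx.Done():
				return
			case u := <-updates:
				fmt.Println("applied update:", u)
			case w := <-work:
				fmt.Println("processed work item:", w)
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	updates := make(chan string, 1)
	work := make(chan int, 1)
	updates <- "new targets"
	work <- 42

	// Although both channels are ready, the update is consumed first.
	prioritizedReceive(ctx, updates, work)
}

Note that cancellation (n.ctx.Done() in the patch, ctx.Done() in the sketch) appears at both levels, so shutdown is never delayed by the priority handling.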

notifier/notifier_test.go

@@ -22,6 +22,7 @@ import (
 	"net/http"
 	"net/http/httptest"
 	"net/url"
+	"strconv"
 	"testing"
 	"time"
@@ -571,3 +572,113 @@ func makeInputTargetGroup() *targetgroup.Group {
 func TestLabelsToOpenAPILabelSet(t *testing.T) {
 	require.Equal(t, models.LabelSet{"aaa": "111", "bbb": "222"}, labelsToOpenAPILabelSet(labels.Labels{{Name: "aaa", Value: "111"}, {Name: "bbb", Value: "222"}}))
 }
+
+// TestHangingNotifier validates that target updates happen even when there are
+// queued alerts.
+func TestHangingNotifier(t *testing.T) {
+	// Note: When targets are not updated in time, this test is flaky because Go
+	// selects are not deterministic. Therefore we run 10 subtests to run into the issue.
+	for i := 0; i < 10; i++ {
+		t.Run(strconv.Itoa(i), func(t *testing.T) {
+			var (
+				done    = make(chan struct{})
+				changed = make(chan struct{})
+				syncCh  = make(chan map[string][]*targetgroup.Group)
+			)
+			defer func() {
+				close(done)
+			}()
+
+			// Setting up a bad server. This server hangs for 2 seconds.
+			badServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+				select {
+				case <-done:
+				case <-time.After(2 * time.Second):
+				}
+			}))
+			badURL, err := url.Parse(badServer.URL)
+			require.NoError(t, err)
+			badAddress := badURL.Host // Used for the __address__ label in targets.
+
+			// Setting up a good server. This server returns quickly, signaling a
+			// request by closing the changed channel.
+			goodServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+				close(changed)
+			}))
+			goodURL, err := url.Parse(goodServer.URL)
+			require.NoError(t, err)
+			goodAddress := goodURL.Host // Used for the __address__ label in targets.
+
+			h := NewManager(
+				&Options{
+					QueueCapacity: 20 * maxBatchSize,
+				},
+				nil,
+			)
+
+			h.alertmanagers = make(map[string]*alertmanagerSet)
+
+			am1Cfg := config.DefaultAlertmanagerConfig
+			am1Cfg.Timeout = model.Duration(200 * time.Millisecond)
+
+			h.alertmanagers["config-0"] = &alertmanagerSet{
+				ams:     []alertmanager{},
+				cfg:     &am1Cfg,
+				metrics: h.metrics,
+			}
+
+			go h.Run(syncCh)
+			defer h.Stop()
+
+			var alerts []*Alert
+			for i := range make([]struct{}, 20*maxBatchSize) {
+				alerts = append(alerts, &Alert{
+					Labels: labels.FromStrings("alertname", fmt.Sprintf("%d", i)),
+				})
+			}
+
+			// Injecting the hanging server URL.
+			syncCh <- map[string][]*targetgroup.Group{
+				"config-0": {
+					{
+						Targets: []model.LabelSet{
+							{
+								model.AddressLabel: model.LabelValue(badAddress),
+							},
+						},
+					},
+				},
+			}
+
+			// Queueing the alerts.
+			h.Send(alerts...)
+
+			// Updating with a working alertmanager target.
+			go func() {
+				select {
+				case syncCh <- map[string][]*targetgroup.Group{
+					"config-0": {
+						{
+							Targets: []model.LabelSet{
+								{
+									model.AddressLabel: model.LabelValue(goodAddress),
+								},
+							},
+						},
+					},
+				}:
+				case <-done:
+				}
+			}()
+
+			select {
+			case <-time.After(300 * time.Millisecond):
+				t.Fatalf("Timeout after 300 milliseconds, targets not synced in time.")
+			case <-changed:
+				// The good server has been hit in less than 300 milliseconds, so the
+				// targets have been updated before a second call could be made to the
+				// bad server.
+			}
+		})
+	}
+}
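
As background for the test's note about non-deterministic selects: the Go specification says that when several cases of a select are ready, one is chosen uniformly at pseudo-random, which is why a single run could miss the starvation bug and the test repeats itself ten times. A tiny standalone snippet (not part of the commit) illustrating that behavior:

package main

import "fmt"

func main() {
	a := make(chan struct{}, 1)
	b := make(chan struct{}, 1)

	counts := map[string]int{}
	for i := 0; i < 1000; i++ {
		// Make both cases ready before selecting.
		a <- struct{}{}
		b <- struct{}{}
		select {
		case <-a:
			counts["a"]++
			<-b // drain the other channel for the next round
		case <-b:
			counts["b"]++
			<-a // drain the other channel for the next round
		}
	}
	// Prints a roughly even split, e.g. map[a:497 b:503].
	fmt.Println(counts)
}

The new test can be run on its own with go test -run TestHangingNotifier ./notifier/.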