Merge pull request #1843 from simonpasquier/fix-dropped-alerts

Alerts might be dropped
This commit is contained in:
stuart nelson 2019-04-20 10:33:16 +02:00 committed by GitHub
commit 1d9a35257f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 77 additions and 86 deletions

View File

@ -181,7 +181,7 @@ func (d *Dispatcher) Groups(routeFilter func(*Route) bool, alertFilter func(*typ
alerts := ag.alerts.List() alerts := ag.alerts.List()
filteredAlerts := make([]*types.Alert, 0, len(alerts)) filteredAlerts := make([]*types.Alert, 0, len(alerts))
for a := range alerts { for _, a := range alerts {
if !alertFilter(a, now) { if !alertFilter(a, now) {
continue continue
} }
@ -403,7 +403,7 @@ func (ag *aggrGroup) insert(alert *types.Alert) {
} }
func (ag *aggrGroup) empty() bool { func (ag *aggrGroup) empty() bool {
return ag.alerts.Count() == 0 return ag.alerts.Empty()
} }
// flush sends notifications for all new alerts. // flush sends notifications for all new alerts.
@ -414,10 +414,10 @@ func (ag *aggrGroup) flush(notify func(...*types.Alert) bool) {
var ( var (
alerts = ag.alerts.List() alerts = ag.alerts.List()
alertsSlice = make(types.AlertSlice, 0, ag.alerts.Count()) alertsSlice = make(types.AlertSlice, 0, len(alerts))
now = time.Now()
) )
now := time.Now() for _, alert := range alerts {
for alert := range alerts {
a := *alert a := *alert
// Ensure that alerts don't resolve as time move forwards. // Ensure that alerts don't resolve as time move forwards.
if !a.ResolvedAt(now) { if !a.ResolvedAt(now) {

View File

@ -204,7 +204,7 @@ func NewInhibitRule(cr *config.InhibitRule) *InhibitRule {
// source and the target side of the rule are disregarded. // source and the target side of the rule are disregarded.
func (r *InhibitRule) hasEqual(lset model.LabelSet, excludeTwoSidedMatch bool) (model.Fingerprint, bool) { func (r *InhibitRule) hasEqual(lset model.LabelSet, excludeTwoSidedMatch bool) (model.Fingerprint, bool) {
Outer: Outer:
for a := range r.scache.List() { for _, a := range r.scache.List() {
// The cache might be stale and contain resolved alerts. // The cache might be stale and contain resolved alerts.
if a.Resolved() { if a.Resolved() {
continue continue

View File

@ -31,10 +31,10 @@ const alertChannelLength = 200
// Alerts gives access to a set of alerts. All methods are goroutine-safe. // Alerts gives access to a set of alerts. All methods are goroutine-safe.
type Alerts struct { type Alerts struct {
alerts *store.Alerts
cancel context.CancelFunc cancel context.CancelFunc
mtx sync.Mutex mtx sync.Mutex
alerts *store.Alerts
listeners map[int]listeningAlerts listeners map[int]listeningAlerts
next int next int
@ -99,25 +99,26 @@ func max(a, b int) int {
// resolved and successfully notified about. // resolved and successfully notified about.
// They are not guaranteed to be in chronological order. // They are not guaranteed to be in chronological order.
func (a *Alerts) Subscribe() provider.AlertIterator { func (a *Alerts) Subscribe() provider.AlertIterator {
a.mtx.Lock()
defer a.mtx.Unlock()
var ( var (
ch = make(chan *types.Alert, max(a.alerts.Count(), alertChannelLength)) done = make(chan struct{})
done = make(chan struct{}) alerts = a.alerts.List()
ch = make(chan *types.Alert, max(len(alerts), alertChannelLength))
) )
for a := range a.alerts.List() { for _, a := range alerts {
ch <- a ch <- a
} }
a.mtx.Lock() a.listeners[a.next] = listeningAlerts{alerts: ch, done: done}
i := a.next
a.next++ a.next++
a.listeners[i] = listeningAlerts{alerts: ch, done: done}
a.mtx.Unlock()
return provider.NewAlertIterator(ch, done, nil) return provider.NewAlertIterator(ch, done, nil)
} }
// GetPending returns an iterator over all alerts that have // GetPending returns an iterator over all the alerts that have
// pending notifications. // pending notifications.
func (a *Alerts) GetPending() provider.AlertIterator { func (a *Alerts) GetPending() provider.AlertIterator {
var ( var (
@ -128,7 +129,7 @@ func (a *Alerts) GetPending() provider.AlertIterator {
go func() { go func() {
defer close(ch) defer close(ch)
for a := range a.alerts.List() { for _, a := range a.alerts.List() {
select { select {
case ch <- a: case ch <- a:
case <-done: case <-done:

View File

@ -161,91 +161,82 @@ func TestAlertsPut(t *testing.T) {
func TestAlertsSubscribe(t *testing.T) { func TestAlertsSubscribe(t *testing.T) {
marker := types.NewMarker(prometheus.NewRegistry()) marker := types.NewMarker(prometheus.NewRegistry())
alerts, err := NewAlerts(context.Background(), marker, 30*time.Minute, log.NewNopLogger())
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
alerts, err := NewAlerts(ctx, marker, 30*time.Minute, log.NewNopLogger())
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
// add alert1 to validate if pending alerts will be sent // Add alert1 to validate if pending alerts will be sent.
if err := alerts.Put(alert1); err != nil { if err := alerts.Put(alert1); err != nil {
t.Fatalf("Insert failed: %s", err) t.Fatalf("Insert failed: %s", err)
} }
var wg sync.WaitGroup expectedAlerts := map[model.Fingerprint]*types.Alert{
wg.Add(2) alert1.Fingerprint(): alert1,
fatalc := make(chan string, 2) alert2.Fingerprint(): alert2,
alert3.Fingerprint(): alert3,
}
iterator1 := alerts.Subscribe() // Start many consumers and make sure that each receives all the subsequent alerts.
iterator2 := alerts.Subscribe() var (
nb = 100
fatalc = make(chan string, nb)
wg sync.WaitGroup
)
wg.Add(nb)
for i := 0; i < nb; i++ {
go func(i int) {
defer wg.Done()
go func() { it := alerts.Subscribe()
defer wg.Done() defer it.Close()
expectedAlerts := map[model.Fingerprint]*types.Alert{
alert1.Fingerprint(): alert1,
alert2.Fingerprint(): alert2,
alert3.Fingerprint(): alert3,
}
for i := 0; i < 3; i++ { received := make(map[model.Fingerprint]struct{})
actual := <-iterator1.Next() for {
expected := expectedAlerts[actual.Fingerprint()] select {
if !alertsEqual(actual, expected) { case got, ok := <-it.Next():
fatalc <- fmt.Sprintf("Unexpected alert (iterator1)\n%s", pretty.Compare(actual, expected)) if !ok {
return fatalc <- fmt.Sprintf("Iterator %d closed", i)
return
}
if it.Err() != nil {
fatalc <- fmt.Sprintf("Iterator %d: %v", i, it.Err())
return
}
expected := expectedAlerts[got.Fingerprint()]
if !alertsEqual(got, expected) {
fatalc <- fmt.Sprintf("Unexpected alert (iterator %d)\n%s", i, pretty.Compare(got, expected))
return
}
received[got.Fingerprint()] = struct{}{}
if len(received) == len(expectedAlerts) {
return
}
case <-time.After(5 * time.Second):
fatalc <- fmt.Sprintf("Unexpected number of alerts for iterator %d, got: %d, expected: %d", i, len(received), len(expectedAlerts))
return
}
} }
}(i)
}
delete(expectedAlerts, actual.Fingerprint()) // Add more alerts that should be received by the subscribers.
}
if len(expectedAlerts) != 0 {
fatalc <- fmt.Sprintf("Unexpected number of alerts (iterator1): %d", len(expectedAlerts))
}
}()
go func() {
defer wg.Done()
expectedAlerts := map[model.Fingerprint]*types.Alert{
alert1.Fingerprint(): alert1,
alert2.Fingerprint(): alert2,
alert3.Fingerprint(): alert3,
}
for i := 0; i < 3; i++ {
actual := <-iterator2.Next()
expected := expectedAlerts[actual.Fingerprint()]
if !alertsEqual(actual, expected) {
t.Errorf("Unexpected alert")
fatalc <- fmt.Sprintf("Unexpected alert (iterator2)\n%s", pretty.Compare(actual, expected))
}
delete(expectedAlerts, actual.Fingerprint())
}
if len(expectedAlerts) != 0 {
fatalc <- fmt.Sprintf("Unexpected number of alerts (iterator2): %d", len(expectedAlerts))
}
}()
go func() {
wg.Wait()
close(fatalc)
}()
if err := alerts.Put(alert2); err != nil { if err := alerts.Put(alert2); err != nil {
t.Fatalf("Insert failed: %s", err) t.Fatalf("Insert failed: %s", err)
} }
if err := alerts.Put(alert3); err != nil { if err := alerts.Put(alert3); err != nil {
t.Fatalf("Insert failed: %s", err) t.Fatalf("Insert failed: %s", err)
} }
wg.Wait()
close(fatalc)
fatal, ok := <-fatalc fatal, ok := <-fatalc
if ok { if ok {
t.Fatalf(fatal) t.Fatalf(fatal)
} }
iterator1.Close()
iterator2.Close()
} }
func TestAlertsGetPending(t *testing.T) { func TestAlertsGetPending(t *testing.T) {

View File

@ -122,24 +122,23 @@ func (a *Alerts) Delete(fp model.Fingerprint) error {
return nil return nil
} }
// List returns a buffered channel of Alerts currently held in memory. // List returns a slice of Alerts currently held in memory.
func (a *Alerts) List() <-chan *types.Alert { func (a *Alerts) List() []*types.Alert {
a.Lock() a.Lock()
defer a.Unlock() defer a.Unlock()
c := make(chan *types.Alert, len(a.c)) alerts := make([]*types.Alert, 0, len(a.c))
for _, alert := range a.c { for _, alert := range a.c {
c <- alert alerts = append(alerts, alert)
} }
close(c)
return c return alerts
} }
// Count returns the number of items within the store. // Empty returns true if the store is empty.
func (a *Alerts) Count() int { func (a *Alerts) Empty() bool {
a.Lock() a.Lock()
defer a.Unlock() defer a.Unlock()
return len(a.c) return len(a.c) == 0
} }