From c78b449f4a24ed9e8f412dddcb3599d7a8c4f267 Mon Sep 17 00:00:00 2001 From: Simon Pasquier Date: Fri, 19 Apr 2019 14:01:41 +0200 Subject: [PATCH] provider/mem: fix dropped alerts Signed-off-by: Simon Pasquier --- dispatch/dispatch.go | 10 +++++----- inhibit/inhibit.go | 2 +- provider/mem/mem.go | 21 +++++++++++---------- store/store.go | 17 ++++++++--------- 4 files changed, 25 insertions(+), 25 deletions(-) diff --git a/dispatch/dispatch.go b/dispatch/dispatch.go index f6360fd4..2a8c491c 100644 --- a/dispatch/dispatch.go +++ b/dispatch/dispatch.go @@ -181,7 +181,7 @@ func (d *Dispatcher) Groups(routeFilter func(*Route) bool, alertFilter func(*typ alerts := ag.alerts.List() filteredAlerts := make([]*types.Alert, 0, len(alerts)) - for a := range alerts { + for _, a := range alerts { if !alertFilter(a, now) { continue } @@ -403,7 +403,7 @@ func (ag *aggrGroup) insert(alert *types.Alert) { } func (ag *aggrGroup) empty() bool { - return ag.alerts.Count() == 0 + return ag.alerts.Empty() } // flush sends notifications for all new alerts. @@ -414,10 +414,10 @@ func (ag *aggrGroup) flush(notify func(...*types.Alert) bool) { var ( alerts = ag.alerts.List() - alertsSlice = make(types.AlertSlice, 0, ag.alerts.Count()) + alertsSlice = make(types.AlertSlice, 0, len(alerts)) + now = time.Now() ) - now := time.Now() - for alert := range alerts { + for _, alert := range alerts { a := *alert // Ensure that alerts don't resolve as time move forwards. if !a.ResolvedAt(now) { diff --git a/inhibit/inhibit.go b/inhibit/inhibit.go index e49eb42a..60027b52 100644 --- a/inhibit/inhibit.go +++ b/inhibit/inhibit.go @@ -204,7 +204,7 @@ func NewInhibitRule(cr *config.InhibitRule) *InhibitRule { // source and the target side of the rule are disregarded. func (r *InhibitRule) hasEqual(lset model.LabelSet, excludeTwoSidedMatch bool) (model.Fingerprint, bool) { Outer: - for a := range r.scache.List() { + for _, a := range r.scache.List() { // The cache might be stale and contain resolved alerts. if a.Resolved() { continue diff --git a/provider/mem/mem.go b/provider/mem/mem.go index 0b909797..9cae5487 100644 --- a/provider/mem/mem.go +++ b/provider/mem/mem.go @@ -31,10 +31,10 @@ const alertChannelLength = 200 // Alerts gives access to a set of alerts. All methods are goroutine-safe. type Alerts struct { - alerts *store.Alerts cancel context.CancelFunc mtx sync.Mutex + alerts *store.Alerts listeners map[int]listeningAlerts next int @@ -99,25 +99,26 @@ func max(a, b int) int { // resolved and successfully notified about. // They are not guaranteed to be in chronological order. func (a *Alerts) Subscribe() provider.AlertIterator { + a.mtx.Lock() + defer a.mtx.Unlock() + var ( - ch = make(chan *types.Alert, max(a.alerts.Count(), alertChannelLength)) - done = make(chan struct{}) + done = make(chan struct{}) + alerts = a.alerts.List() + ch = make(chan *types.Alert, max(len(alerts), alertChannelLength)) ) - for a := range a.alerts.List() { + for _, a := range alerts { ch <- a } - a.mtx.Lock() - i := a.next + a.listeners[a.next] = listeningAlerts{alerts: ch, done: done} a.next++ - a.listeners[i] = listeningAlerts{alerts: ch, done: done} - a.mtx.Unlock() return provider.NewAlertIterator(ch, done, nil) } -// GetPending returns an iterator over all alerts that have +// GetPending returns an iterator over all the alerts that have // pending notifications. func (a *Alerts) GetPending() provider.AlertIterator { var ( @@ -128,7 +129,7 @@ func (a *Alerts) GetPending() provider.AlertIterator { go func() { defer close(ch) - for a := range a.alerts.List() { + for _, a := range a.alerts.List() { select { case ch <- a: case <-done: diff --git a/store/store.go b/store/store.go index 5e84ba54..8897c36e 100644 --- a/store/store.go +++ b/store/store.go @@ -122,24 +122,23 @@ func (a *Alerts) Delete(fp model.Fingerprint) error { return nil } -// List returns a buffered channel of Alerts currently held in memory. -func (a *Alerts) List() <-chan *types.Alert { +// List returns a slice of Alerts currently held in memory. +func (a *Alerts) List() []*types.Alert { a.Lock() defer a.Unlock() - c := make(chan *types.Alert, len(a.c)) + alerts := make([]*types.Alert, 0, len(a.c)) for _, alert := range a.c { - c <- alert + alerts = append(alerts, alert) } - close(c) - return c + return alerts } -// Count returns the number of items within the store. -func (a *Alerts) Count() int { +// Empty returns true if the store is empty. +func (a *Alerts) Empty() bool { a.Lock() defer a.Unlock() - return len(a.c) + return len(a.c) == 0 }