provider/mem: fix dropped alerts
Signed-off-by: Simon Pasquier <spasquie@redhat.com>
This commit is contained in:
parent
510cb2936f
commit
c78b449f4a
|
@ -181,7 +181,7 @@ func (d *Dispatcher) Groups(routeFilter func(*Route) bool, alertFilter func(*typ
|
|||
|
||||
alerts := ag.alerts.List()
|
||||
filteredAlerts := make([]*types.Alert, 0, len(alerts))
|
||||
for a := range alerts {
|
||||
for _, a := range alerts {
|
||||
if !alertFilter(a, now) {
|
||||
continue
|
||||
}
|
||||
|
@ -403,7 +403,7 @@ func (ag *aggrGroup) insert(alert *types.Alert) {
|
|||
}
|
||||
|
||||
func (ag *aggrGroup) empty() bool {
|
||||
return ag.alerts.Count() == 0
|
||||
return ag.alerts.Empty()
|
||||
}
|
||||
|
||||
// flush sends notifications for all new alerts.
|
||||
|
@ -414,10 +414,10 @@ func (ag *aggrGroup) flush(notify func(...*types.Alert) bool) {
|
|||
|
||||
var (
|
||||
alerts = ag.alerts.List()
|
||||
alertsSlice = make(types.AlertSlice, 0, ag.alerts.Count())
|
||||
alertsSlice = make(types.AlertSlice, 0, len(alerts))
|
||||
now = time.Now()
|
||||
)
|
||||
now := time.Now()
|
||||
for alert := range alerts {
|
||||
for _, alert := range alerts {
|
||||
a := *alert
|
||||
// Ensure that alerts don't resolve as time move forwards.
|
||||
if !a.ResolvedAt(now) {
|
||||
|
|
|
@ -204,7 +204,7 @@ func NewInhibitRule(cr *config.InhibitRule) *InhibitRule {
|
|||
// source and the target side of the rule are disregarded.
|
||||
func (r *InhibitRule) hasEqual(lset model.LabelSet, excludeTwoSidedMatch bool) (model.Fingerprint, bool) {
|
||||
Outer:
|
||||
for a := range r.scache.List() {
|
||||
for _, a := range r.scache.List() {
|
||||
// The cache might be stale and contain resolved alerts.
|
||||
if a.Resolved() {
|
||||
continue
|
||||
|
|
|
@ -31,10 +31,10 @@ const alertChannelLength = 200
|
|||
|
||||
// Alerts gives access to a set of alerts. All methods are goroutine-safe.
|
||||
type Alerts struct {
|
||||
alerts *store.Alerts
|
||||
cancel context.CancelFunc
|
||||
|
||||
mtx sync.Mutex
|
||||
alerts *store.Alerts
|
||||
listeners map[int]listeningAlerts
|
||||
next int
|
||||
|
||||
|
@ -99,25 +99,26 @@ func max(a, b int) int {
|
|||
// resolved and successfully notified about.
|
||||
// They are not guaranteed to be in chronological order.
|
||||
func (a *Alerts) Subscribe() provider.AlertIterator {
|
||||
a.mtx.Lock()
|
||||
defer a.mtx.Unlock()
|
||||
|
||||
var (
|
||||
ch = make(chan *types.Alert, max(a.alerts.Count(), alertChannelLength))
|
||||
done = make(chan struct{})
|
||||
done = make(chan struct{})
|
||||
alerts = a.alerts.List()
|
||||
ch = make(chan *types.Alert, max(len(alerts), alertChannelLength))
|
||||
)
|
||||
|
||||
for a := range a.alerts.List() {
|
||||
for _, a := range alerts {
|
||||
ch <- a
|
||||
}
|
||||
|
||||
a.mtx.Lock()
|
||||
i := a.next
|
||||
a.listeners[a.next] = listeningAlerts{alerts: ch, done: done}
|
||||
a.next++
|
||||
a.listeners[i] = listeningAlerts{alerts: ch, done: done}
|
||||
a.mtx.Unlock()
|
||||
|
||||
return provider.NewAlertIterator(ch, done, nil)
|
||||
}
|
||||
|
||||
// GetPending returns an iterator over all alerts that have
|
||||
// GetPending returns an iterator over all the alerts that have
|
||||
// pending notifications.
|
||||
func (a *Alerts) GetPending() provider.AlertIterator {
|
||||
var (
|
||||
|
@ -128,7 +129,7 @@ func (a *Alerts) GetPending() provider.AlertIterator {
|
|||
go func() {
|
||||
defer close(ch)
|
||||
|
||||
for a := range a.alerts.List() {
|
||||
for _, a := range a.alerts.List() {
|
||||
select {
|
||||
case ch <- a:
|
||||
case <-done:
|
||||
|
|
|
@ -122,24 +122,23 @@ func (a *Alerts) Delete(fp model.Fingerprint) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// List returns a buffered channel of Alerts currently held in memory.
|
||||
func (a *Alerts) List() <-chan *types.Alert {
|
||||
// List returns a slice of Alerts currently held in memory.
|
||||
func (a *Alerts) List() []*types.Alert {
|
||||
a.Lock()
|
||||
defer a.Unlock()
|
||||
|
||||
c := make(chan *types.Alert, len(a.c))
|
||||
alerts := make([]*types.Alert, 0, len(a.c))
|
||||
for _, alert := range a.c {
|
||||
c <- alert
|
||||
alerts = append(alerts, alert)
|
||||
}
|
||||
close(c)
|
||||
|
||||
return c
|
||||
return alerts
|
||||
}
|
||||
|
||||
// Count returns the number of items within the store.
|
||||
func (a *Alerts) Count() int {
|
||||
// Empty returns true if the store is empty.
|
||||
func (a *Alerts) Empty() bool {
|
||||
a.Lock()
|
||||
defer a.Unlock()
|
||||
|
||||
return len(a.c)
|
||||
return len(a.c) == 0
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue