Fix race conditions in the memory alerts store (#3648)

* Fix race conditions in the memory alerts store

Signed-off-by: Xiaochao Dong (@damnever) <the.xcdong@gmail.com>

* Expose the GC method from store.Alerts

Signed-off-by: Xiaochao Dong (@damnever) <the.xcdong@gmail.com>

* Use RLock/Unlock on read path

Signed-off-by: Xiaochao Dong (@damnever) <the.xcdong@gmail.com>

* Resolve conflicts

Signed-off-by: Xiaochao Dong (@damnever) <the.xcdong@gmail.com>

* release locks by using the defer

Signed-off-by: Xiaochao Dong (@damnever) <the.xcdong@gmail.com>

* Revert the RWMutex back to Mutex

Signed-off-by: Xiaochao Dong (@damnever) <the.xcdong@gmail.com>

---------

Signed-off-by: Xiaochao Dong (@damnever) <the.xcdong@gmail.com>
This commit is contained in:
Xiaochao Dong 2024-05-16 18:25:21 +08:00 committed by GitHub
parent df7041114d
commit 91a94f00f9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 116 additions and 29 deletions

View File

@ -34,10 +34,11 @@ const alertChannelLength = 200
type Alerts struct { type Alerts struct {
cancel context.CancelFunc cancel context.CancelFunc
mtx sync.Mutex
alerts *store.Alerts alerts *store.Alerts
marker types.AlertMarker marker types.AlertMarker
mtx sync.Mutex
listeners map[int]listeningAlerts listeners map[int]listeningAlerts
next int next int
@ -100,16 +101,42 @@ func NewAlerts(ctx context.Context, m types.AlertMarker, intervalGC time.Duratio
logger: log.With(l, "component", "provider"), logger: log.With(l, "component", "provider"),
callback: alertCallback, callback: alertCallback,
} }
a.alerts.SetGCCallback(func(alerts []types.Alert) {
for _, alert := range alerts { if r != nil {
a.registerMetrics(r)
}
go a.gcLoop(ctx, intervalGC)
return a, nil
}
func (a *Alerts) gcLoop(ctx context.Context, interval time.Duration) {
t := time.NewTicker(interval)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
a.gc()
}
}
}
func (a *Alerts) gc() {
a.mtx.Lock()
defer a.mtx.Unlock()
deleted := a.alerts.GC()
for _, alert := range deleted {
// As we don't persist alerts, we no longer consider them after // As we don't persist alerts, we no longer consider them after
// they are resolved. Alerts waiting for resolved notifications are // they are resolved. Alerts waiting for resolved notifications are
// held in memory in aggregation groups redundantly. // held in memory in aggregation groups redundantly.
m.Delete(alert.Fingerprint()) a.marker.Delete(alert.Fingerprint())
a.callback.PostDelete(&alert) a.callback.PostDelete(&alert)
} }
a.mtx.Lock()
for i, l := range a.listeners { for i, l := range a.listeners {
select { select {
case <-l.done: case <-l.done:
@ -119,16 +146,6 @@ func NewAlerts(ctx context.Context, m types.AlertMarker, intervalGC time.Duratio
// listener is not closed yet, hence proceed. // listener is not closed yet, hence proceed.
} }
} }
a.mtx.Unlock()
})
if r != nil {
a.registerMetrics(r)
}
go a.alerts.Run(ctx, intervalGC)
return a, nil
} }
// Close the alert provider. // Close the alert provider.
@ -174,11 +191,13 @@ func (a *Alerts) GetPending() provider.AlertIterator {
ch = make(chan *types.Alert, alertChannelLength) ch = make(chan *types.Alert, alertChannelLength)
done = make(chan struct{}) done = make(chan struct{})
) )
a.mtx.Lock()
defer a.mtx.Unlock()
alerts := a.alerts.List()
go func() { go func() {
defer close(ch) defer close(ch)
for _, a := range alerts {
for _, a := range a.alerts.List() {
select { select {
case ch <- a: case ch <- a:
case <-done: case <-done:
@ -192,11 +211,16 @@ func (a *Alerts) GetPending() provider.AlertIterator {
// Get returns the alert for a given fingerprint. // Get returns the alert for a given fingerprint.
func (a *Alerts) Get(fp model.Fingerprint) (*types.Alert, error) { func (a *Alerts) Get(fp model.Fingerprint) (*types.Alert, error) {
a.mtx.Lock()
defer a.mtx.Unlock()
return a.alerts.Get(fp) return a.alerts.Get(fp)
} }
// Put adds the given alert to the set. // Put adds the given alert to the set.
func (a *Alerts) Put(alerts ...*types.Alert) error { func (a *Alerts) Put(alerts ...*types.Alert) error {
a.mtx.Lock()
defer a.mtx.Unlock()
for _, alert := range alerts { for _, alert := range alerts {
fp := alert.Fingerprint() fp := alert.Fingerprint()
@ -226,14 +250,12 @@ func (a *Alerts) Put(alerts ...*types.Alert) error {
a.callback.PostStore(alert, existing) a.callback.PostStore(alert, existing)
a.mtx.Lock()
for _, l := range a.listeners { for _, l := range a.listeners {
select { select {
case l.alerts <- alert: case l.alerts <- alert:
case <-l.done: case <-l.done:
} }
} }
a.mtx.Unlock()
} }
return nil return nil
@ -241,6 +263,9 @@ func (a *Alerts) Put(alerts ...*types.Alert) error {
// count returns the number of non-resolved alerts we currently have stored filtered by the provided state. // count returns the number of non-resolved alerts we currently have stored filtered by the provided state.
func (a *Alerts) count(state types.AlertState) int { func (a *Alerts) count(state types.AlertState) int {
a.mtx.Lock()
defer a.mtx.Unlock()
var count int var count int
for _, alert := range a.alerts.List() { for _, alert := range a.alerts.List() {
if alert.Resolved() { if alert.Resolved() {

View File

@ -15,6 +15,7 @@ package mem
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"reflect" "reflect"
"strconv" "strconv"
@ -561,3 +562,62 @@ func (l *limitCountCallback) PostStore(_ *types.Alert, existing bool) {
func (l *limitCountCallback) PostDelete(_ *types.Alert) { func (l *limitCountCallback) PostDelete(_ *types.Alert) {
l.alerts.Dec() l.alerts.Dec()
} }
func TestAlertsConcurrently(t *testing.T) {
callback := &limitCountCallback{limit: 100}
a, err := NewAlerts(context.Background(), types.NewMarker(prometheus.NewRegistry()), time.Millisecond, callback, log.NewNopLogger(), nil)
require.NoError(t, err)
stopc := make(chan struct{})
failc := make(chan struct{})
go func() {
time.Sleep(2 * time.Second)
close(stopc)
}()
expire := 10 * time.Millisecond
wg := sync.WaitGroup{}
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
j := 0
for {
select {
case <-failc:
return
case <-stopc:
return
default:
}
now := time.Now()
err := a.Put(&types.Alert{
Alert: model.Alert{
Labels: model.LabelSet{"bar": model.LabelValue(strconv.Itoa(j))},
StartsAt: now,
EndsAt: now.Add(expire),
},
UpdatedAt: now,
})
if err != nil && !errors.Is(err, errTooManyAlerts) {
close(failc)
return
}
j++
}
}()
}
wg.Wait()
select {
case <-failc:
t.Fatalf("unexpected error happened")
default:
}
time.Sleep(expire)
require.Eventually(t, func() bool {
// When the alert will eventually expire and is considered resolved - it won't count.
return a.count(types.AlertStateActive) == 0
}, 2*expire, expire)
require.Equal(t, int32(0), callback.alerts.Load())
}

View File

@ -64,12 +64,13 @@ func (a *Alerts) Run(ctx context.Context, interval time.Duration) {
case <-ctx.Done(): case <-ctx.Done():
return return
case <-t.C: case <-t.C:
a.gc() a.GC()
} }
} }
} }
func (a *Alerts) gc() { // GC deletes resolved alerts and returns them.
func (a *Alerts) GC() []types.Alert {
a.Lock() a.Lock()
var resolved []types.Alert var resolved []types.Alert
for fp, alert := range a.c { for fp, alert := range a.c {
@ -90,6 +91,7 @@ func (a *Alerts) gc() {
} }
a.Unlock() a.Unlock()
a.cb(resolved) a.cb(resolved)
return resolved
} }
// Get returns the Alert with the matching fingerprint, or an error if it is // Get returns the Alert with the matching fingerprint, or an error if it is