Remove GC and callback from store.go
This commit removes the GC and callback function from store.go to address a number of data races that have occurred in the past (#2040 and #3648). The store is no longer responsible for removing resolved alerts after some elapsed period of time; that responsibility is instead deferred to the consumer of the store (as done in #2040 and #3648). Signed-off-by: George Robinson <george.robinson@grafana.com>
This commit is contained in:
parent
fd37ce9c95
commit
b9ee89e764
|
@ -94,8 +94,24 @@ func (ih *Inhibitor) Run() {
|
|||
ih.mtx.Unlock()
|
||||
runCtx, runCancel := context.WithCancel(ctx)
|
||||
|
||||
// Run a periodic maintenance for each inhibition rule to remove
|
||||
// resolved alerts from their local cache.
|
||||
for _, rule := range ih.rules {
|
||||
go rule.scache.Run(runCtx, 15*time.Minute)
|
||||
rule := rule // Create a local rule variable for the goroutine.
|
||||
g.Add(func() error {
|
||||
ticker := time.NewTicker(15 * time.Minute)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
rule.scache.DeleteResolved()
|
||||
case <-runCtx.Done():
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}, func(err error) {
|
||||
runCancel()
|
||||
})
|
||||
}
|
||||
|
||||
g.Add(func() error {
|
||||
|
|
|
@ -106,48 +106,22 @@ func NewAlerts(ctx context.Context, m types.AlertMarker, intervalGC time.Duratio
|
|||
a.registerMetrics(r)
|
||||
}
|
||||
|
||||
go a.gcLoop(ctx, intervalGC)
|
||||
go func() {
|
||||
ticker := time.NewTicker(intervalGC)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
a.doMaintenance()
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return a, nil
|
||||
}
|
||||
|
||||
func (a *Alerts) gcLoop(ctx context.Context, interval time.Duration) {
|
||||
t := time.NewTicker(interval)
|
||||
defer t.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-t.C:
|
||||
a.gc()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (a *Alerts) gc() {
|
||||
a.mtx.Lock()
|
||||
defer a.mtx.Unlock()
|
||||
|
||||
deleted := a.alerts.GC()
|
||||
for _, alert := range deleted {
|
||||
// As we don't persist alerts, we no longer consider them after
|
||||
// they are resolved. Alerts waiting for resolved notifications are
|
||||
// held in memory in aggregation groups redundantly.
|
||||
a.marker.Delete(alert.Fingerprint())
|
||||
a.callback.PostDelete(&alert)
|
||||
}
|
||||
|
||||
for i, l := range a.listeners {
|
||||
select {
|
||||
case <-l.done:
|
||||
delete(a.listeners, i)
|
||||
close(l.alerts)
|
||||
default:
|
||||
// listener is not closed yet, hence proceed.
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Close the alert provider.
|
||||
func (a *Alerts) Close() {
|
||||
if a.cancel != nil {
|
||||
|
@ -283,6 +257,32 @@ func (a *Alerts) count(state types.AlertState) int {
|
|||
return count
|
||||
}
|
||||
|
||||
func (a *Alerts) doMaintenance() {
|
||||
a.mtx.Lock()
|
||||
defer a.mtx.Unlock()
|
||||
for _, alert := range a.alerts.List() {
|
||||
if alert.Resolved() {
|
||||
// TODO(grobinson-grafana): See if we can use a single method instead of calling List() and then Delete().
|
||||
a.alerts.Delete(alert.Fingerprint())
|
||||
// As we don't persist alerts, we no longer consider them after
|
||||
// they are resolved. Alerts waiting for resolved notifications are
|
||||
// held in memory in aggregation groups redundantly.
|
||||
a.marker.Delete(alert.Fingerprint())
|
||||
a.callback.PostDelete(alert)
|
||||
}
|
||||
}
|
||||
|
||||
for i, l := range a.listeners {
|
||||
select {
|
||||
case <-l.done:
|
||||
delete(a.listeners, i)
|
||||
close(l.alerts)
|
||||
default:
|
||||
// listener is not closed yet, hence proceed.
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type noopCallback struct{}
|
||||
|
||||
func (n noopCallback) PreStore(_ *types.Alert, _ bool) error { return nil }
|
||||
|
|
|
@ -14,10 +14,8 @@
|
|||
package store
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/common/model"
|
||||
|
||||
|
@ -28,70 +26,17 @@ import (
|
|||
var ErrNotFound = errors.New("alert not found")
|
||||
|
||||
// Alerts provides lock-coordinated access to an in-memory map of alerts, keyed by
|
||||
// their fingerprint. Resolved alerts are removed from the map based on
|
||||
// gcInterval. An optional callback can be set which receives a slice of all
|
||||
// resolved alerts that have been removed.
|
||||
// their fingerprint.
|
||||
type Alerts struct {
|
||||
sync.Mutex
|
||||
c map[model.Fingerprint]*types.Alert
|
||||
cb func([]types.Alert)
|
||||
c map[model.Fingerprint]*types.Alert
|
||||
}
|
||||
|
||||
// NewAlerts returns a new Alerts struct.
|
||||
func NewAlerts() *Alerts {
|
||||
a := &Alerts{
|
||||
c: make(map[model.Fingerprint]*types.Alert),
|
||||
cb: func(_ []types.Alert) {},
|
||||
return &Alerts{
|
||||
c: make(map[model.Fingerprint]*types.Alert),
|
||||
}
|
||||
|
||||
return a
|
||||
}
|
||||
|
||||
// SetGCCallback sets a GC callback to be executed after each GC.
|
||||
func (a *Alerts) SetGCCallback(cb func([]types.Alert)) {
|
||||
a.Lock()
|
||||
defer a.Unlock()
|
||||
|
||||
a.cb = cb
|
||||
}
|
||||
|
||||
// Run starts the GC loop. The interval must be greater than zero; if not, the function will panic.
|
||||
func (a *Alerts) Run(ctx context.Context, interval time.Duration) {
|
||||
t := time.NewTicker(interval)
|
||||
defer t.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-t.C:
|
||||
a.GC()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// GC deletes resolved alerts and returns them.
|
||||
func (a *Alerts) GC() []types.Alert {
|
||||
a.Lock()
|
||||
var resolved []types.Alert
|
||||
for fp, alert := range a.c {
|
||||
if alert.Resolved() {
|
||||
delete(a.c, fp)
|
||||
resolved = append(resolved, types.Alert{
|
||||
Alert: model.Alert{
|
||||
Labels: alert.Labels.Clone(),
|
||||
Annotations: alert.Annotations.Clone(),
|
||||
StartsAt: alert.StartsAt,
|
||||
EndsAt: alert.EndsAt,
|
||||
GeneratorURL: alert.GeneratorURL,
|
||||
},
|
||||
UpdatedAt: alert.UpdatedAt,
|
||||
Timeout: alert.Timeout,
|
||||
})
|
||||
}
|
||||
}
|
||||
a.Unlock()
|
||||
a.cb(resolved)
|
||||
return resolved
|
||||
}
|
||||
|
||||
// Get returns the Alert with the matching fingerprint, or an error if it is
|
||||
|
@ -116,6 +61,12 @@ func (a *Alerts) Set(alert *types.Alert) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (a *Alerts) Delete(fp model.Fingerprint) {
|
||||
a.Lock()
|
||||
defer a.Unlock()
|
||||
delete(a.c, fp)
|
||||
}
|
||||
|
||||
// DeleteIfNotModified deletes the slice of Alerts from the store if not
|
||||
// modified.
|
||||
func (a *Alerts) DeleteIfNotModified(alerts types.AlertSlice) error {
|
||||
|
@ -130,6 +81,17 @@ func (a *Alerts) DeleteIfNotModified(alerts types.AlertSlice) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// DeleteResolved deletes all resolved alerts.
|
||||
func (a *Alerts) DeleteResolved() {
|
||||
a.Lock()
|
||||
defer a.Unlock()
|
||||
for fp, alert := range a.c {
|
||||
if alert.Resolved() {
|
||||
delete(a.c, fp)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// List returns a slice of Alerts currently held in memory.
|
||||
func (a *Alerts) List() []*types.Alert {
|
||||
a.Lock()
|
||||
|
|
|
@ -14,7 +14,6 @@
|
|||
package store
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -138,60 +137,42 @@ func TestDeleteIfNotModified(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestGC(t *testing.T) {
|
||||
now := time.Now()
|
||||
newAlert := func(key string, start, end time.Duration) *types.Alert {
|
||||
return &types.Alert{
|
||||
func TestDeleteResolved(t *testing.T) {
|
||||
t.Run("active alert should not be deleted", func(t *testing.T) {
|
||||
a := NewAlerts()
|
||||
a1 := &types.Alert{
|
||||
Alert: model.Alert{
|
||||
Labels: model.LabelSet{model.LabelName(key): "b"},
|
||||
StartsAt: now.Add(start * time.Minute),
|
||||
EndsAt: now.Add(end * time.Minute),
|
||||
Labels: model.LabelSet{
|
||||
"foo": "bar",
|
||||
},
|
||||
StartsAt: time.Now(),
|
||||
EndsAt: time.Now().Add(5 * time.Minute),
|
||||
},
|
||||
}
|
||||
}
|
||||
active := []*types.Alert{
|
||||
newAlert("b", 10, 20),
|
||||
newAlert("c", -10, 10),
|
||||
}
|
||||
resolved := []*types.Alert{
|
||||
newAlert("a", -10, -5),
|
||||
newAlert("d", -10, -1),
|
||||
}
|
||||
s := NewAlerts()
|
||||
var (
|
||||
n int
|
||||
done = make(chan struct{})
|
||||
ctx, cancel = context.WithCancel(context.Background())
|
||||
)
|
||||
s.SetGCCallback(func(a []types.Alert) {
|
||||
n += len(a)
|
||||
if n >= len(resolved) {
|
||||
cancel()
|
||||
}
|
||||
require.NoError(t, a.Set(a1))
|
||||
a.DeleteResolved()
|
||||
// a1 should not have been deleted.
|
||||
got, err := a.Get(a1.Fingerprint())
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, a1, got)
|
||||
})
|
||||
for _, alert := range append(active, resolved...) {
|
||||
require.NoError(t, s.Set(alert))
|
||||
}
|
||||
go func() {
|
||||
s.Run(ctx, 10*time.Millisecond)
|
||||
close(done)
|
||||
}()
|
||||
select {
|
||||
case <-done:
|
||||
break
|
||||
case <-time.After(1 * time.Second):
|
||||
t.Fatal("garbage collection didn't complete in time")
|
||||
}
|
||||
|
||||
for _, alert := range active {
|
||||
if _, err := s.Get(alert.Fingerprint()); err != nil {
|
||||
t.Errorf("alert %v should not have been gc'd", alert)
|
||||
t.Run("resolved alert should not be deleted", func(t *testing.T) {
|
||||
a := NewAlerts()
|
||||
a1 := &types.Alert{
|
||||
Alert: model.Alert{
|
||||
Labels: model.LabelSet{
|
||||
"foo": "bar",
|
||||
},
|
||||
StartsAt: time.Now().Add(-5 * time.Minute),
|
||||
EndsAt: time.Now().Add(-time.Second),
|
||||
},
|
||||
}
|
||||
}
|
||||
for _, alert := range resolved {
|
||||
if _, err := s.Get(alert.Fingerprint()); err == nil {
|
||||
t.Errorf("alert %v should have been gc'd", alert)
|
||||
}
|
||||
}
|
||||
require.Len(t, resolved, n)
|
||||
require.NoError(t, a.Set(a1))
|
||||
a.DeleteResolved()
|
||||
// a1 should have been deleted.
|
||||
got, err := a.Get(a1.Fingerprint())
|
||||
require.Equal(t, ErrNotFound, err)
|
||||
require.Nil(t, got)
|
||||
})
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue