dispatch: don't garbage-collect alerts from store

The aggregation group is already responsible for removing resolved
alerts from its store. Running the garbage collection in parallel
introduces a race, and resolved notifications may eventually be dropped.

Signed-off-by: Simon Pasquier <spasquie@redhat.com>
commit 4535311c34
parent dea2829849
Author: Simon Pasquier <spasquie@redhat.com>
Date:   2019-09-18 09:29:34 +02:00

6 changed files with 38 additions and 37 deletions
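
For context, a minimal, self-contained sketch of how a caller uses the reworked store API after this change. It is not part of the commit; the import paths and surrounding setup are assumed from the usual Alertmanager layout. The point it illustrates: the GC interval moves from store.NewAlerts to (*Alerts).Run, so the component that owns the store decides whether and how often resolved alerts are collected, and the dispatcher's aggregation groups simply never start the loop.

package main

import (
    "context"
    "log"
    "time"

    "github.com/prometheus/alertmanager/store"
    "github.com/prometheus/alertmanager/types"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // After this change, NewAlerts no longer takes a GC interval.
    s := store.NewAlerts()

    // The callback still receives each batch of resolved alerts removed by the GC.
    s.SetGCCallback(func(resolved []*types.Alert) {
        log.Printf("garbage-collected %d resolved alerts", len(resolved))
    })

    // The GC loop only runs if the owner starts it explicitly, passing the
    // interval here instead of at construction time (as the inhibitor and the
    // in-memory provider now do).
    s.Run(ctx, 15*time.Minute)
}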


@@ -313,12 +313,10 @@ func newAggrGroup(ctx context.Context, labels model.LabelSet, r *Route, to func(
     routeKey: r.Key(),
     opts: &r.RouteOpts,
     timeout: to,
-    alerts: store.NewAlerts(15 * time.Minute),
+    alerts: store.NewAlerts(),
     done: make(chan struct{}),
 }
 ag.ctx, ag.cancel = context.WithCancel(ctx)
-ag.alerts.Run(ag.ctx)
 ag.logger = log.With(logger, "aggrGroup", ag)
@@ -438,14 +436,13 @@ func (ag *aggrGroup) flush(notify func(...*types.Alert) bool) {
             fp := a.Fingerprint()
             got, err := ag.alerts.Get(fp)
             if err != nil {
-                // This should only happen if the Alert was
-                // deleted from the store during the flush.
-                level.Error(ag.logger).Log("msg", "failed to get alert", "err", err)
+                // This should never happen.
+                level.Error(ag.logger).Log("msg", "failed to get alert", "err", err, "alert", a.String())
                 continue
             }
             if a.Resolved() && got.UpdatedAt == a.UpdatedAt {
                 if err := ag.alerts.Delete(fp); err != nil {
-                    level.Error(ag.logger).Log("msg", "error on delete alert", "err", err)
+                    level.Error(ag.logger).Log("msg", "error on delete alert", "err", err, "alert", a.String())
                 }
             }
         }


@@ -94,7 +94,7 @@ func (ih *Inhibitor) Run() {
     runCtx, runCancel := context.WithCancel(ctx)
     for _, rule := range ih.rules {
-        rule.scache.Run(runCtx)
+        rule.scache.Run(runCtx, 15*time.Minute)
     }
     g.Add(func() error {
@@ -194,7 +194,7 @@ func NewInhibitRule(cr *config.InhibitRule) *InhibitRule {
         SourceMatchers: sourcem,
         TargetMatchers: targetm,
         Equal: equal,
-        scache: store.NewAlerts(15 * time.Minute),
+        scache: store.NewAlerts(),
     }
 }


@@ -122,7 +122,7 @@ func TestInhibitRuleHasEqual(t *testing.T) {
     for _, c := range cases {
         r := &InhibitRule{
             Equal: map[model.LabelName]struct{}{},
-            scache: store.NewAlerts(5 * time.Minute),
+            scache: store.NewAlerts(),
         }
         for _, ln := range c.equal {
             r.Equal[ln] = struct{}{}
@@ -170,9 +170,9 @@ func TestInhibitRuleMatches(t *testing.T) {
         },
     }
-    ih.rules[0].scache = store.NewAlerts(5 * time.Minute)
+    ih.rules[0].scache = store.NewAlerts()
     ih.rules[0].scache.Set(sourceAlert1)
-    ih.rules[1].scache = store.NewAlerts(5 * time.Minute)
+    ih.rules[1].scache = store.NewAlerts()
     ih.rules[1].scache.Set(sourceAlert2)
     cases := []struct {


@@ -50,7 +50,7 @@ type listeningAlerts struct {
 func NewAlerts(ctx context.Context, m types.Marker, intervalGC time.Duration, l log.Logger) (*Alerts, error) {
     ctx, cancel := context.WithCancel(ctx)
     a := &Alerts{
-        alerts: store.NewAlerts(intervalGC),
+        alerts: store.NewAlerts(),
         cancel: cancel,
         listeners: map[int]listeningAlerts{},
         next: 0,
@@ -76,7 +76,7 @@ func NewAlerts(ctx context.Context, m types.Marker, intervalGC time.Duration, l
         }
         a.mtx.Unlock()
     })
-    a.alerts.Run(ctx)
+    a.alerts.Run(ctx, intervalGC)
     return a, nil
 }


@@ -33,23 +33,16 @@
 // gcInterval. An optional callback can be set which receives a slice of all
 // resolved alerts that have been removed.
 type Alerts struct {
-    gcInterval time.Duration
     sync.Mutex
     c map[model.Fingerprint]*types.Alert
     cb func([]*types.Alert)
 }
 // NewAlerts returns a new Alerts struct.
-func NewAlerts(gcInterval time.Duration) *Alerts {
-    if gcInterval == 0 {
-        gcInterval = time.Minute
-    }
+func NewAlerts() *Alerts {
     a := &Alerts{
-        c:          make(map[model.Fingerprint]*types.Alert),
-        cb:         func(_ []*types.Alert) {},
-        gcInterval: gcInterval,
+        c:  make(map[model.Fingerprint]*types.Alert),
+        cb: func(_ []*types.Alert) {},
     }
     return a
@@ -63,8 +56,8 @@ func (a *Alerts) SetGCCallback(cb func([]*types.Alert)) {
     a.cb = cb
 }
-// Run starts the GC loop.
-func (a *Alerts) Run(ctx context.Context) {
+// Run starts the GC loop. The interval must be greater than zero; if not, the function will panic.
+func (a *Alerts) Run(ctx context.Context, interval time.Duration) {
     go func(t *time.Ticker) {
         for {
             select {
@@ -74,7 +67,7 @@ func (a *Alerts) Run(ctx context.Context) {
                 a.gc()
             }
         }
-    }(time.NewTicker(a.gcInterval))
+    }(time.NewTicker(interval))
 }
 func (a *Alerts) gc() {


@@ -28,8 +28,8 @@ func TestSetGet(t *testing.T) {
     ctx, cancel := context.WithCancel(context.Background())
     defer cancel()
-    a := NewAlerts(d)
-    a.Run(ctx)
+    a := NewAlerts()
+    a.Run(ctx, d)
     alert := &types.Alert{
         UpdatedAt: time.Now(),
     }
@@ -46,8 +46,8 @@ func TestDelete(t *testing.T) {
     ctx, cancel := context.WithCancel(context.Background())
     defer cancel()
-    a := NewAlerts(d)
-    a.Run(ctx)
+    a := NewAlerts()
+    a.Run(ctx, d)
     alert := &types.Alert{
         UpdatedAt: time.Now(),
     }
@@ -82,18 +82,29 @@ func TestGC(t *testing.T) {
         newAlert("a", -10, -5),
         newAlert("d", -10, -1),
     }
-    s := NewAlerts(5 * time.Minute)
-    var n int
+    s := NewAlerts()
+    var (
+        n int
+        done = make(chan struct{})
+    )
     s.SetGCCallback(func(a []*types.Alert) {
-        for range a {
-            n++
+        n += len(a)
+        if n >= len(resolved) {
+            close(done)
         }
     })
     for _, alert := range append(active, resolved...) {
         require.NoError(t, s.Set(alert))
     }
-    s.gc()
+    ctx, cancel := context.WithCancel(context.Background())
+    s.Run(ctx, 10*time.Millisecond)
+    select {
+    case <-done:
+        cancel()
+        break
+    case <-time.After(1 * time.Second):
+        t.Fatal("garbage collection didn't complete in time")
+    }
     for _, alert := range active {
         if _, err := s.Get(alert.Fingerprint()); err != nil {