Expires notify log sooner when possible

It seems useless to keep the notifications in the nflog for longer than
twice the repeat interval. This should help reduce memory usage of
clustered alertmanagers.

Signed-off-by: Julien Pivotto <roidelapluie@o11y.eu>
This commit is contained in:
Julien Pivotto 2022-07-05 12:09:18 +02:00
parent d034f116d5
commit b0443021dc
4 changed files with 24 additions and 10 deletions

View File

@ -399,7 +399,7 @@ func stateKey(k string, r *pb.Receiver) string {
return fmt.Sprintf("%s:%s", k, receiverKey(r))
}
func (l *Log) Log(r *pb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64) error {
func (l *Log) Log(r *pb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, expiry time.Duration) error {
// Write all st with the same timestamp.
now := l.now()
key := stateKey(gkey, r)
@ -415,6 +415,11 @@ func (l *Log) Log(r *pb.Receiver, gkey string, firingAlerts, resolvedAlerts []ui
}
}
expiresAt := now.Add(l.retention)
if expiry > 0 && l.retention > expiry {
expiresAt = now.Add(expiry)
}
e := &pb.MeshEntry{
Entry: &pb.Entry{
Receiver: r,
@ -423,7 +428,7 @@ func (l *Log) Log(r *pb.Receiver, gkey string, firingAlerts, resolvedAlerts []ui
FiringAlerts: firingAlerts,
ResolvedAlerts: resolvedAlerts,
},
ExpiresAt: now.Add(l.retention),
ExpiresAt: expiresAt,
}
b, err := marshalMeshEntry(e)

View File

@ -322,7 +322,7 @@ func TestQuery(t *testing.T) {
firingAlerts := []uint64{1, 2, 3}
resolvedAlerts := []uint64{4, 5}
err = nl.Log(recv, "key", firingAlerts, resolvedAlerts)
err = nl.Log(recv, "key", firingAlerts, resolvedAlerts, 0)
require.NoError(t, err, "logging notification failed")
entries, err := nl.Query(QGroupKey("key"), QReceiver(recv))

View File

@ -239,7 +239,7 @@ func (f StageFunc) Exec(ctx context.Context, l log.Logger, alerts ...*types.Aler
}
type NotificationLog interface {
Log(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64) error
Log(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, expiry time.Duration) error
Query(params ...nflog.QueryParam) ([]*nflogpb.Entry, error)
}
@ -785,7 +785,13 @@ func (n SetNotifiesStage) Exec(ctx context.Context, l log.Logger, alerts ...*typ
return ctx, nil, errors.New("resolved alerts missing")
}
return ctx, alerts, n.nflog.Log(n.recv, gkey, firing, resolved)
repeat, ok := RepeatInterval(ctx)
if !ok {
return ctx, nil, errors.New("repeat interval missing")
}
expiry := 2 * repeat
return ctx, alerts, n.nflog.Log(n.recv, gkey, firing, resolved, expiry)
}
type timeStage struct {

View File

@ -58,15 +58,15 @@ type testNflog struct {
qres []*nflogpb.Entry
qerr error
logFunc func(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64) error
logFunc func(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, expiry time.Duration) error
}
func (l *testNflog) Query(p ...nflog.QueryParam) ([]*nflogpb.Entry, error) {
return l.qres, l.qerr
}
func (l *testNflog) Log(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64) error {
return l.logFunc(r, gkey, firingAlerts, resolvedAlerts)
func (l *testNflog) Log(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, expiry time.Duration) error {
return l.logFunc(r, gkey, firingAlerts, resolvedAlerts, expiry)
}
func (l *testNflog) GC() (int, error) {
@ -553,12 +553,14 @@ func TestSetNotifiesStage(t *testing.T) {
require.NotNil(t, resctx)
ctx = WithResolvedAlerts(ctx, []uint64{})
ctx = WithRepeatInterval(ctx, time.Hour)
tnflog.logFunc = func(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64) error {
tnflog.logFunc = func(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, expiry time.Duration) error {
require.Equal(t, s.recv, r)
require.Equal(t, "1", gkey)
require.Equal(t, []uint64{0, 1, 2}, firingAlerts)
require.Equal(t, []uint64{}, resolvedAlerts)
require.Equal(t, 2*time.Hour, expiry)
return nil
}
resctx, res, err = s.Exec(ctx, log.NewNopLogger(), alerts...)
@ -569,11 +571,12 @@ func TestSetNotifiesStage(t *testing.T) {
ctx = WithFiringAlerts(ctx, []uint64{})
ctx = WithResolvedAlerts(ctx, []uint64{0, 1, 2})
tnflog.logFunc = func(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64) error {
tnflog.logFunc = func(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, expiry time.Duration) error {
require.Equal(t, s.recv, r)
require.Equal(t, "1", gkey)
require.Equal(t, []uint64{}, firingAlerts)
require.Equal(t, []uint64{0, 1, 2}, resolvedAlerts)
require.Equal(t, 2*time.Hour, expiry)
return nil
}
resctx, res, err = s.Exec(ctx, log.NewNopLogger(), alerts...)