Enable support for custom callbacks as part of maintenance (#2689)

* Enable support for custom callbacks as part of maintenance

This enables support for custom Maintenance callbacks as part of the periodic maintenance of silences and notification logs.
Effectively a no-op for the Alertmanager but allows downstream implementation to inject custom logic as part of it.

Signed-off-by: gotjosh <josue.abreu@gmail.com>

* Add tests

Signed-off-by: gotjosh <josue.abreu@gmail.com>

* Fix tests and remove whitespace

Signed-off-by: gotjosh <josue.abreu@gmail.com>

* Address review comments

Signed-off-by: gotjosh <josue.abreu@gmail.com>

* run go fmt

Signed-off-by: gotjosh <josue.abreu@gmail.com>

* Fix import ordering

Signed-off-by: gotjosh <josue.abreu@gmail.com>
This commit is contained in:
gotjosh 2021-09-06 11:49:39 +01:00 committed by GitHub
parent d6e758ad37
commit 8da517524a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 128 additions and 46 deletions

View File

@ -267,7 +267,7 @@ func run() int {
notificationLogOpts := []nflog.Option{ notificationLogOpts := []nflog.Option{
nflog.WithRetention(*retention), nflog.WithRetention(*retention),
nflog.WithSnapshot(filepath.Join(*dataDir, "nflog")), nflog.WithSnapshot(filepath.Join(*dataDir, "nflog")),
nflog.WithMaintenance(15*time.Minute, stopc, wg.Done), nflog.WithMaintenance(15*time.Minute, stopc, wg.Done, nil),
nflog.WithMetrics(prometheus.DefaultRegisterer), nflog.WithMetrics(prometheus.DefaultRegisterer),
nflog.WithLogger(log.With(logger, "component", "nflog")), nflog.WithLogger(log.With(logger, "component", "nflog")),
} }
@ -304,7 +304,7 @@ func run() int {
// Start providers before router potentially sends updates. // Start providers before router potentially sends updates.
wg.Add(1) wg.Add(1)
go func() { go func() {
silences.Maintenance(15*time.Minute, filepath.Join(*dataDir, "silences"), stopc) silences.Maintenance(15*time.Minute, filepath.Join(*dataDir, "silences"), stopc, nil)
wg.Done() wg.Done()
}() }()

View File

@ -85,11 +85,16 @@ type Log struct {
// For now we only store the most recently added log entry. // For now we only store the most recently added log entry.
// The key is a serialized concatenation of group key and receiver. // The key is a serialized concatenation of group key and receiver.
mtx sync.RWMutex mtx sync.RWMutex
st state st state
broadcast func([]byte) broadcast func([]byte)
maintenanceOverride MaintenanceFunc
} }
// MaintenanceFunc represents the function to run as part of the periodic maintenance for the nflog.
// It returns the size of the snapshot taken or an error if it failed.
type MaintenanceFunc func() (int64, error)
type metrics struct { type metrics struct {
gcDuration prometheus.Summary gcDuration prometheus.Summary
snapshotDuration prometheus.Summary snapshotDuration prometheus.Summary
@ -190,7 +195,8 @@ func WithMetrics(r prometheus.Registerer) Option {
// //
// The maintenance terminates on receiving from the provided channel. // The maintenance terminates on receiving from the provided channel.
// The done function is called after the final snapshot was completed. // The done function is called after the final snapshot was completed.
func WithMaintenance(d time.Duration, stopc chan struct{}, done func()) Option { // If not nil, the last argument is an override for what to do as part of the maintenance - for advanced usage.
func WithMaintenance(d time.Duration, stopc chan struct{}, done func(), maintenanceOverride MaintenanceFunc) Option {
return func(l *Log) error { return func(l *Log) error {
if d == 0 { if d == 0 {
return errors.New("maintenance interval must not be 0") return errors.New("maintenance interval must not be 0")
@ -198,6 +204,7 @@ func WithMaintenance(d time.Duration, stopc chan struct{}, done func()) Option {
l.runInterval = d l.runInterval = d
l.stopc = stopc l.stopc = stopc
l.done = done l.done = done
l.maintenanceOverride = maintenanceOverride
return nil return nil
} }
} }
@ -325,34 +332,40 @@ func (l *Log) run() {
t := time.NewTicker(l.runInterval) t := time.NewTicker(l.runInterval)
defer t.Stop() defer t.Stop()
var doMaintenance MaintenanceFunc
doMaintenance = func() (int64, error) {
var size int64
if _, err := l.GC(); err != nil {
return size, err
}
if l.snapf == "" {
return size, nil
}
f, err := openReplace(l.snapf)
if err != nil {
return size, err
}
if size, err = l.Snapshot(f); err != nil {
return size, err
}
return size, f.Close()
}
if l.maintenanceOverride != nil {
doMaintenance = l.maintenanceOverride
}
if l.done != nil { if l.done != nil {
defer l.done() defer l.done()
} }
f := func() error { runMaintenance := func(do func() (int64, error)) error {
start := l.now() start := l.now()
var size int64
level.Debug(l.logger).Log("msg", "Running maintenance") level.Debug(l.logger).Log("msg", "Running maintenance")
defer func() { size, err := do()
level.Debug(l.logger).Log("msg", "Maintenance done", "duration", l.now().Sub(start), "size", size) level.Debug(l.logger).Log("msg", "Maintenance done", "duration", l.now().Sub(start), "size", size)
l.metrics.snapshotSize.Set(float64(size)) l.metrics.snapshotSize.Set(float64(size))
}() return err
if _, err := l.GC(); err != nil {
return err
}
if l.snapf == "" {
return nil
}
f, err := openReplace(l.snapf)
if err != nil {
return err
}
if size, err = l.Snapshot(f); err != nil {
return err
}
return f.Close()
} }
Loop: Loop:
@ -361,7 +374,7 @@ Loop:
case <-l.stopc: case <-l.stopc:
break Loop break Loop
case <-t.C: case <-t.C:
if err := f(); err != nil { if err := runMaintenance(doMaintenance); err != nil {
level.Error(l.logger).Log("msg", "Running maintenance failed", "err", err) level.Error(l.logger).Log("msg", "Running maintenance failed", "err", err)
} }
} }
@ -370,7 +383,7 @@ Loop:
if l.snapf == "" { if l.snapf == "" {
return return
} }
if err := f(); err != nil { if err := runMaintenance(doMaintenance); err != nil {
level.Error(l.logger).Log("msg", "Creating shutdown snapshot failed", "err", err) level.Error(l.logger).Log("msg", "Creating shutdown snapshot failed", "err", err)
} }
} }

View File

@ -18,10 +18,13 @@ import (
"io/ioutil" "io/ioutil"
"os" "os"
"path/filepath" "path/filepath"
"sync"
"testing" "testing"
"time" "time"
pb "github.com/prometheus/alertmanager/nflog/nflogpb" pb "github.com/prometheus/alertmanager/nflog/nflogpb"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -123,6 +126,33 @@ func TestLogSnapshot(t *testing.T) {
} }
} }
func TestWithMaintenance_SupportsCustomCallback(t *testing.T) {
f, err := ioutil.TempFile("", "snapshot")
require.NoError(t, err, "creating temp file failed")
stopc := make(chan struct{})
var mtx sync.Mutex
var mc int
l, err := New(WithMetrics(prometheus.NewPedanticRegistry()), WithSnapshot(f.Name()), WithMaintenance(100*time.Millisecond, stopc, nil, func() (int64, error) {
mtx.Lock()
mc++
mtx.Unlock()
return 0, nil
}))
require.NoError(t, err)
go l.run()
time.Sleep(200 * time.Millisecond)
close(stopc)
require.Eventually(t, func() bool {
mtx.Lock()
defer mtx.Unlock()
return mc >= 2
}, 500*time.Millisecond, 100*time.Millisecond)
}
func TestReplaceFile(t *testing.T) { func TestReplaceFile(t *testing.T) {
dir, err := ioutil.TempDir("", "replace_file") dir, err := ioutil.TempDir("", "replace_file")
require.NoError(t, err, "creating temp dir failed") require.NoError(t, err, "creating temp dir failed")

View File

@ -201,6 +201,10 @@ type Silences struct {
mc matcherCache mc matcherCache
} }
// MaintenanceFunc represents the function to run as part of the periodic maintenance for silences.
// It returns the size of the snapshot taken or an error if it failed.
type MaintenanceFunc func() (int64, error)
type metrics struct { type metrics struct {
gcDuration prometheus.Summary gcDuration prometheus.Summary
snapshotDuration prometheus.Summary snapshotDuration prometheus.Summary
@ -349,34 +353,42 @@ func New(o Options) (*Silences, error) {
// Maintenance garbage collects the silence state at the given interval. If the snapshot // Maintenance garbage collects the silence state at the given interval. If the snapshot
// file is set, a snapshot is written to it afterwards. // file is set, a snapshot is written to it afterwards.
// Terminates on receiving from stopc. // Terminates on receiving from stopc.
func (s *Silences) Maintenance(interval time.Duration, snapf string, stopc <-chan struct{}) { // If not nil, the last argument is an override for what to do as part of the maintenance - for advanced usage.
func (s *Silences) Maintenance(interval time.Duration, snapf string, stopc <-chan struct{}, override MaintenanceFunc) {
t := time.NewTicker(interval) t := time.NewTicker(interval)
defer t.Stop() defer t.Stop()
f := func() error { var doMaintenance MaintenanceFunc
start := s.now() doMaintenance = func() (int64, error) {
var size int64 var size int64
level.Debug(s.logger).Log("msg", "Running maintenance")
defer func() {
level.Debug(s.logger).Log("msg", "Maintenance done", "duration", s.now().Sub(start), "size", size)
s.metrics.snapshotSize.Set(float64(size))
}()
if _, err := s.GC(); err != nil { if _, err := s.GC(); err != nil {
return err return size, err
} }
if snapf == "" { if snapf == "" {
return nil return size, nil
} }
f, err := openReplace(snapf) f, err := openReplace(snapf)
if err != nil { if err != nil {
return err return size, err
} }
if size, err = s.Snapshot(f); err != nil { if size, err = s.Snapshot(f); err != nil {
return err return size, err
} }
return f.Close() return size, f.Close()
}
if doMaintenance != nil {
doMaintenance = override
}
runMaintenance := func(do MaintenanceFunc) error {
start := s.now()
level.Debug(s.logger).Log("msg", "Running maintenance")
size, err := do()
level.Debug(s.logger).Log("msg", "Maintenance done", "duration", s.now().Sub(start), "size", size)
s.metrics.snapshotSize.Set(float64(size))
return err
} }
Loop: Loop:
@ -385,7 +397,7 @@ Loop:
case <-stopc: case <-stopc:
break Loop break Loop
case <-t.C: case <-t.C:
if err := f(); err != nil { if err := runMaintenance(doMaintenance); err != nil {
level.Info(s.logger).Log("msg", "Running maintenance failed", "err", err) level.Info(s.logger).Log("msg", "Running maintenance failed", "err", err)
} }
} }
@ -394,7 +406,7 @@ Loop:
if snapf == "" { if snapf == "" {
return return
} }
if err := f(); err != nil { if err := runMaintenance(doMaintenance); err != nil {
level.Info(s.logger).Log("msg", "Creating shutdown snapshot failed", "err", err) level.Info(s.logger).Log("msg", "Creating shutdown snapshot failed", "err", err)
} }
} }

View File

@ -19,6 +19,7 @@ import (
"io/ioutil" "io/ioutil"
"os" "os"
"sort" "sort"
"sync"
"testing" "testing"
"time" "time"
@ -179,6 +180,32 @@ func TestSilencesSnapshot(t *testing.T) {
} }
} }
func TestSilences_Maintenance_SupportsCustomCallback(t *testing.T) {
f, err := ioutil.TempFile("", "snapshot")
require.NoError(t, err, "creating temp file failed")
s := &Silences{st: state{}, logger: log.NewNopLogger(), now: utcNow, metrics: newMetrics(nil, nil)}
stopc := make(chan struct{})
var mtx sync.Mutex
var mc int
go s.Maintenance(100*time.Millisecond, f.Name(), stopc, func() (int64, error) {
mtx.Lock()
mc++
mtx.Unlock()
return 0, nil
})
time.Sleep(200 * time.Millisecond)
close(stopc)
require.Eventually(t, func() bool {
mtx.Lock()
defer mtx.Unlock()
return mc >= 2 // At least, one for the regular schedule and one at shutdown.
}, 500*time.Millisecond, 100*time.Millisecond)
}
func TestSilencesSetSilence(t *testing.T) { func TestSilencesSetSilence(t *testing.T) {
s, err := New(Options{ s, err := New(Options{
Retention: time.Minute, Retention: time.Minute,