diff --git a/cmd/alertmanager/main.go b/cmd/alertmanager/main.go index ab51a741..cf2e3b6d 100644 --- a/cmd/alertmanager/main.go +++ b/cmd/alertmanager/main.go @@ -267,7 +267,7 @@ func run() int { notificationLogOpts := []nflog.Option{ nflog.WithRetention(*retention), nflog.WithSnapshot(filepath.Join(*dataDir, "nflog")), - nflog.WithMaintenance(15*time.Minute, stopc, wg.Done), + nflog.WithMaintenance(15*time.Minute, stopc, wg.Done, nil), nflog.WithMetrics(prometheus.DefaultRegisterer), nflog.WithLogger(log.With(logger, "component", "nflog")), } @@ -304,7 +304,7 @@ func run() int { // Start providers before router potentially sends updates. wg.Add(1) go func() { - silences.Maintenance(15*time.Minute, filepath.Join(*dataDir, "silences"), stopc) + silences.Maintenance(15*time.Minute, filepath.Join(*dataDir, "silences"), stopc, nil) wg.Done() }() diff --git a/nflog/nflog.go b/nflog/nflog.go index 5fe34f29..be2b47db 100644 --- a/nflog/nflog.go +++ b/nflog/nflog.go @@ -85,11 +85,16 @@ type Log struct { // For now we only store the most recently added log entry. // The key is a serialized concatenation of group key and receiver. - mtx sync.RWMutex - st state - broadcast func([]byte) + mtx sync.RWMutex + st state + broadcast func([]byte) + maintenanceOverride MaintenanceFunc } +// MaintenanceFunc represents the function to run as part of the periodic maintenance for the nflog. +// It returns the size of the snapshot taken or an error if it failed. +type MaintenanceFunc func() (int64, error) + type metrics struct { gcDuration prometheus.Summary snapshotDuration prometheus.Summary @@ -190,7 +195,8 @@ func WithMetrics(r prometheus.Registerer) Option { // // The maintenance terminates on receiving from the provided channel. // The done function is called after the final snapshot was completed. 
-func WithMaintenance(d time.Duration, stopc chan struct{}, done func()) Option { +// If not nil, the last argument is an override for what to do as part of the maintenance - for advanced usage. +func WithMaintenance(d time.Duration, stopc chan struct{}, done func(), maintenanceOverride MaintenanceFunc) Option { return func(l *Log) error { if d == 0 { return errors.New("maintenance interval must not be 0") @@ -198,6 +204,7 @@ func WithMaintenance(d time.Duration, stopc chan struct{}, done func()) Option { l.runInterval = d l.stopc = stopc l.done = done + l.maintenanceOverride = maintenanceOverride return nil } } @@ -325,34 +332,40 @@ func (l *Log) run() { t := time.NewTicker(l.runInterval) defer t.Stop() + var doMaintenance MaintenanceFunc + doMaintenance = func() (int64, error) { + var size int64 + if _, err := l.GC(); err != nil { + return size, err + } + if l.snapf == "" { + return size, nil + } + f, err := openReplace(l.snapf) + if err != nil { + return size, err + } + if size, err = l.Snapshot(f); err != nil { + return size, err + } + return size, f.Close() + } + + if l.maintenanceOverride != nil { + doMaintenance = l.maintenanceOverride + } + if l.done != nil { defer l.done() } - f := func() error { + runMaintenance := func(do func() (int64, error)) error { start := l.now() - var size int64 - level.Debug(l.logger).Log("msg", "Running maintenance") - defer func() { - level.Debug(l.logger).Log("msg", "Maintenance done", "duration", l.now().Sub(start), "size", size) - l.metrics.snapshotSize.Set(float64(size)) - }() - - if _, err := l.GC(); err != nil { - return err - } - if l.snapf == "" { - return nil - } - f, err := openReplace(l.snapf) - if err != nil { - return err - } - if size, err = l.Snapshot(f); err != nil { - return err - } - return f.Close() + size, err := do() + level.Debug(l.logger).Log("msg", "Maintenance done", "duration", l.now().Sub(start), "size", size) + l.metrics.snapshotSize.Set(float64(size)) + return err } Loop: @@ -361,7 +374,7 @@ Loop: 
case <-l.stopc: break Loop case <-t.C: - if err := f(); err != nil { + if err := runMaintenance(doMaintenance); err != nil { level.Error(l.logger).Log("msg", "Running maintenance failed", "err", err) } } @@ -370,7 +383,7 @@ Loop: if l.snapf == "" { return } - if err := f(); err != nil { + if err := runMaintenance(doMaintenance); err != nil { level.Error(l.logger).Log("msg", "Creating shutdown snapshot failed", "err", err) } } diff --git a/nflog/nflog_test.go b/nflog/nflog_test.go index eb0fdac6..97c208da 100644 --- a/nflog/nflog_test.go +++ b/nflog/nflog_test.go @@ -18,10 +18,13 @@ import ( "io/ioutil" "os" "path/filepath" + "sync" "testing" "time" pb "github.com/prometheus/alertmanager/nflog/nflogpb" + + "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" ) @@ -123,6 +126,33 @@ func TestLogSnapshot(t *testing.T) { } } +func TestWithMaintenance_SupportsCustomCallback(t *testing.T) { + f, err := ioutil.TempFile("", "snapshot") + require.NoError(t, err, "creating temp file failed") + + stopc := make(chan struct{}) + var mtx sync.Mutex + var mc int + l, err := New(WithMetrics(prometheus.NewPedanticRegistry()), WithSnapshot(f.Name()), WithMaintenance(100*time.Millisecond, stopc, nil, func() (int64, error) { + mtx.Lock() + mc++ + mtx.Unlock() + + return 0, nil + })) + require.NoError(t, err) + + go l.run() + time.Sleep(200 * time.Millisecond) + close(stopc) + + require.Eventually(t, func() bool { + mtx.Lock() + defer mtx.Unlock() + return mc >= 2 + }, 500*time.Millisecond, 100*time.Millisecond) +} + func TestReplaceFile(t *testing.T) { dir, err := ioutil.TempDir("", "replace_file") require.NoError(t, err, "creating temp dir failed") diff --git a/silence/silence.go b/silence/silence.go index 643e1de7..c61dac50 100644 --- a/silence/silence.go +++ b/silence/silence.go @@ -201,6 +201,10 @@ type Silences struct { mc matcherCache } +// MaintenanceFunc represents the function to run as part of the periodic maintenance for silences. 
+// It returns the size of the snapshot taken or an error if it failed. +type MaintenanceFunc func() (int64, error) + type metrics struct { gcDuration prometheus.Summary snapshotDuration prometheus.Summary @@ -349,34 +353,42 @@ func New(o Options) (*Silences, error) { // Maintenance garbage collects the silence state at the given interval. If the snapshot // file is set, a snapshot is written to it afterwards. // Terminates on receiving from stopc. -func (s *Silences) Maintenance(interval time.Duration, snapf string, stopc <-chan struct{}) { +// If override is not nil, it is called in place of the default maintenance (GC, then a snapshot when snapf is set) - for advanced usage. +func (s *Silences) Maintenance(interval time.Duration, snapf string, stopc <-chan struct{}, override MaintenanceFunc) { t := time.NewTicker(interval) defer t.Stop() - f := func() error { - start := s.now() + var doMaintenance MaintenanceFunc + doMaintenance = func() (int64, error) { var size int64 - level.Debug(s.logger).Log("msg", "Running maintenance") - defer func() { - level.Debug(s.logger).Log("msg", "Maintenance done", "duration", s.now().Sub(start), "size", size) - s.metrics.snapshotSize.Set(float64(size)) - }() - if _, err := s.GC(); err != nil { - return err + return size, err } if snapf == "" { - return nil + return size, nil } f, err := openReplace(snapf) if err != nil { - return err + return size, err } if size, err = s.Snapshot(f); err != nil { - return err + return size, err } - return f.Close() + return size, f.Close() + } + + if override != nil { + doMaintenance = override + } + + runMaintenance := func(do MaintenanceFunc) error { + start := s.now() + level.Debug(s.logger).Log("msg", "Running maintenance") + size, err := do() + level.Debug(s.logger).Log("msg", "Maintenance done", "duration", s.now().Sub(start), "size", size) + s.metrics.snapshotSize.Set(float64(size)) + return err } Loop: @@ -385,7 +397,7 @@ Loop: case <-stopc: break Loop case <-t.C: - if err := f(); err != nil { + if err := 
runMaintenance(doMaintenance); err != nil { level.Info(s.logger).Log("msg", "Running maintenance failed", "err", err) } } @@ -394,7 +406,7 @@ Loop: if snapf == "" { return } - if err := f(); err != nil { + if err := runMaintenance(doMaintenance); err != nil { level.Info(s.logger).Log("msg", "Creating shutdown snapshot failed", "err", err) } } diff --git a/silence/silence_test.go b/silence/silence_test.go index 60758793..1e86a9b8 100644 --- a/silence/silence_test.go +++ b/silence/silence_test.go @@ -19,6 +19,7 @@ import ( "io/ioutil" "os" "sort" + "sync" "testing" "time" @@ -179,6 +180,32 @@ func TestSilencesSnapshot(t *testing.T) { } } +func TestSilences_Maintenance_SupportsCustomCallback(t *testing.T) { + f, err := ioutil.TempFile("", "snapshot") + require.NoError(t, err, "creating temp file failed") + s := &Silences{st: state{}, logger: log.NewNopLogger(), now: utcNow, metrics: newMetrics(nil, nil)} + stopc := make(chan struct{}) + var mtx sync.Mutex + var mc int + + go s.Maintenance(100*time.Millisecond, f.Name(), stopc, func() (int64, error) { + mtx.Lock() + mc++ + mtx.Unlock() + + return 0, nil + }) + + time.Sleep(200 * time.Millisecond) + close(stopc) + + require.Eventually(t, func() bool { + mtx.Lock() + defer mtx.Unlock() + return mc >= 2 // At least one for the regular schedule and one at shutdown. + }, 500*time.Millisecond, 100*time.Millisecond) +} + func TestSilencesSetSilence(t *testing.T) { s, err := New(Options{ Retention: time.Minute,