mirror of
https://github.com/prometheus/alertmanager
synced 2024-12-27 16:42:14 +00:00
Refactor nflog configuration options to make it similar to Silences. (#3220)
* Refactor nflog configuration options to make it similar to Silences. The Notification Log is a similar component to Silences. They're the only two things that are shared between nodes when running in HA and they both hold some sort of internal state that needs to be cleaned up on an interval. To simplify the code and make it a bit more understandable (among other benefits such as improved testability) - I've refactored the notification log configuration and `run` to be similar to the silences.
This commit is contained in:
parent
0f7d21fd9c
commit
f59460bfd4
@ -279,15 +279,14 @@ func run() int {
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
|
||||
notificationLogOpts := []nflog.Option{
|
||||
nflog.WithRetention(*retention),
|
||||
nflog.WithSnapshot(filepath.Join(*dataDir, "nflog")),
|
||||
nflog.WithMaintenance(*maintenanceInterval, stopc, wg.Done, nil),
|
||||
nflog.WithMetrics(prometheus.DefaultRegisterer),
|
||||
nflog.WithLogger(log.With(logger, "component", "nflog")),
|
||||
notificationLogOpts := nflog.Options{
|
||||
SnapshotFile: filepath.Join(*dataDir, "nflog"),
|
||||
Retention: *retention,
|
||||
Logger: log.With(logger, "component", "nflog"),
|
||||
Metrics: prometheus.DefaultRegisterer,
|
||||
}
|
||||
|
||||
notificationLog, err := nflog.New(notificationLogOpts...)
|
||||
notificationLog, err := nflog.New(notificationLogOpts)
|
||||
if err != nil {
|
||||
level.Error(logger).Log("err", err)
|
||||
return 1
|
||||
@ -297,6 +296,12 @@ func run() int {
|
||||
notificationLog.SetBroadcast(c.Broadcast)
|
||||
}
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
notificationLog.Maintenance(*maintenanceInterval, filepath.Join(*dataDir, "nflog"), stopc, nil)
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
marker := types.NewMarker(prometheus.DefaultRegisterer)
|
||||
|
||||
silenceOpts := silence.Options{
|
||||
|
185
nflog/nflog.go
185
nflog/nflog.go
@ -27,6 +27,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/benbjohnson/clock"
|
||||
"github.com/go-kit/log"
|
||||
"github.com/go-kit/log/level"
|
||||
"github.com/matttproud/golang_protobuf_extensions/pbutil"
|
||||
@ -73,23 +74,19 @@ func QGroupKey(gk string) QueryParam {
|
||||
}
|
||||
}
|
||||
|
||||
// Log holds the notification log state for alerts that have been notified.
|
||||
type Log struct {
|
||||
clock clock.Clock
|
||||
|
||||
logger log.Logger
|
||||
metrics *metrics
|
||||
now func() time.Time
|
||||
retention time.Duration
|
||||
|
||||
runInterval time.Duration
|
||||
snapf string
|
||||
stopc chan struct{}
|
||||
done func()
|
||||
|
||||
// For now we only store the most recently added log entry.
|
||||
// The key is a serialized concatenation of group key and receiver.
|
||||
mtx sync.RWMutex
|
||||
st state
|
||||
broadcast func([]byte)
|
||||
maintenanceOverride MaintenanceFunc
|
||||
mtx sync.RWMutex
|
||||
st state
|
||||
broadcast func([]byte)
|
||||
}
|
||||
|
||||
// MaintenanceFunc represents the function to run as part of the periodic maintenance for the nflog.
|
||||
@ -154,76 +151,6 @@ func newMetrics(r prometheus.Registerer) *metrics {
|
||||
return m
|
||||
}
|
||||
|
||||
// Option configures a new Log implementation.
|
||||
type Option func(*Log) error
|
||||
|
||||
// WithRetention sets the retention time for log st.
|
||||
func WithRetention(d time.Duration) Option {
|
||||
return func(l *Log) error {
|
||||
l.retention = d
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithNow overwrites the function used to retrieve a timestamp
|
||||
// for the current point in time.
|
||||
// This is generally useful for injection during tests.
|
||||
func WithNow(f func() time.Time) Option {
|
||||
return func(l *Log) error {
|
||||
l.now = f
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithLogger configures a logger for the notification log.
|
||||
func WithLogger(logger log.Logger) Option {
|
||||
return func(l *Log) error {
|
||||
l.logger = logger
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithMetrics registers metrics for the notification log.
|
||||
func WithMetrics(r prometheus.Registerer) Option {
|
||||
return func(l *Log) error {
|
||||
l.metrics = newMetrics(r)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithMaintenance configures the Log to run garbage collection
|
||||
// and snapshotting, if configured, at the given interval.
|
||||
//
|
||||
// The maintenance terminates on receiving from the provided channel.
|
||||
// The done function is called after the final snapshot was completed.
|
||||
// If not nil, the last argument is an override for what to do as part of the maintenance - for advanced usage.
|
||||
func WithMaintenance(d time.Duration, stopc chan struct{}, done func(), maintenanceOverride MaintenanceFunc) Option {
|
||||
return func(l *Log) error {
|
||||
if d == 0 {
|
||||
return errors.New("maintenance interval must not be 0")
|
||||
}
|
||||
l.runInterval = d
|
||||
l.stopc = stopc
|
||||
l.done = done
|
||||
l.maintenanceOverride = maintenanceOverride
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithSnapshot configures the log to be initialized from a given snapshot file.
|
||||
// If maintenance is configured, a snapshot will be saved periodically and on
|
||||
// shutdown as well.
|
||||
func WithSnapshot(sf string) Option {
|
||||
return func(l *Log) error {
|
||||
l.snapf = sf
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func utcNow() time.Time {
|
||||
return time.Now().UTC()
|
||||
}
|
||||
|
||||
type state map[string]*pb.MeshEntry
|
||||
|
||||
func (s state) clone() state {
|
||||
@ -289,48 +216,80 @@ func marshalMeshEntry(e *pb.MeshEntry) ([]byte, error) {
|
||||
return buf.Bytes(), nil
|
||||
}
|
||||
|
||||
// Options configures a new Log implementation.
|
||||
type Options struct {
|
||||
SnapshotReader io.Reader
|
||||
SnapshotFile string
|
||||
|
||||
Retention time.Duration
|
||||
|
||||
Logger log.Logger
|
||||
Metrics prometheus.Registerer
|
||||
}
|
||||
|
||||
func (o *Options) validate() error {
|
||||
if o.SnapshotFile != "" && o.SnapshotReader != nil {
|
||||
return errors.New("only one of SnapshotFile and SnapshotReader must be set")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// New creates a new notification log based on the provided options.
|
||||
// The snapshot is loaded into the Log if it is set.
|
||||
func New(opts ...Option) (*Log, error) {
|
||||
func New(o Options) (*Log, error) {
|
||||
if err := o.validate(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
l := &Log{
|
||||
clock: clock.New(),
|
||||
retention: o.Retention,
|
||||
logger: log.NewNopLogger(),
|
||||
now: utcNow,
|
||||
st: state{},
|
||||
broadcast: func([]byte) {},
|
||||
}
|
||||
for _, o := range opts {
|
||||
if err := o(l); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if l.metrics == nil {
|
||||
l.metrics = newMetrics(nil)
|
||||
metrics: newMetrics(o.Metrics),
|
||||
}
|
||||
|
||||
if l.snapf != "" {
|
||||
if f, err := os.Open(l.snapf); !os.IsNotExist(err) {
|
||||
if err != nil {
|
||||
return l, err
|
||||
}
|
||||
defer f.Close()
|
||||
if o.Logger != nil {
|
||||
l.logger = o.Logger
|
||||
}
|
||||
|
||||
if err := l.loadSnapshot(f); err != nil {
|
||||
return l, err
|
||||
if o.SnapshotFile != "" {
|
||||
if r, err := os.Open(o.SnapshotFile); err != nil {
|
||||
if !os.IsNotExist(err) {
|
||||
return nil, err
|
||||
}
|
||||
level.Debug(l.logger).Log("msg", "notification log snapshot file doesn't exist", "err", err)
|
||||
} else {
|
||||
o.SnapshotReader = r
|
||||
defer r.Close()
|
||||
}
|
||||
}
|
||||
|
||||
go l.run()
|
||||
if o.SnapshotReader != nil {
|
||||
if err := l.loadSnapshot(o.SnapshotReader); err != nil {
|
||||
return l, err
|
||||
}
|
||||
}
|
||||
|
||||
return l, nil
|
||||
}
|
||||
|
||||
// run periodic background maintenance.
|
||||
func (l *Log) run() {
|
||||
if l.runInterval == 0 || l.stopc == nil {
|
||||
func (l *Log) now() time.Time {
|
||||
return l.clock.Now()
|
||||
}
|
||||
|
||||
// Maintenance garbage collects the notification log state at the given interval. If the snapshot
|
||||
// file is set, a snapshot is written to it afterwards.
|
||||
// Terminates on receiving from stopc.
|
||||
// If not nil, the last argument is an override for what to do as part of the maintenance - for advanced usage.
|
||||
func (l *Log) Maintenance(interval time.Duration, snapf string, stopc <-chan struct{}, override MaintenanceFunc) {
|
||||
if interval == 0 || stopc == nil {
|
||||
level.Error(l.logger).Log("msg", "interval or stop signal are missing - not running maintenance")
|
||||
return
|
||||
}
|
||||
t := time.NewTicker(l.runInterval)
|
||||
t := l.clock.Ticker(interval)
|
||||
defer t.Stop()
|
||||
|
||||
var doMaintenance MaintenanceFunc
|
||||
@ -339,29 +298,26 @@ func (l *Log) run() {
|
||||
if _, err := l.GC(); err != nil {
|
||||
return size, err
|
||||
}
|
||||
if l.snapf == "" {
|
||||
if snapf == "" {
|
||||
return size, nil
|
||||
}
|
||||
f, err := openReplace(l.snapf)
|
||||
f, err := openReplace(snapf)
|
||||
if err != nil {
|
||||
return size, err
|
||||
}
|
||||
if size, err = l.Snapshot(f); err != nil {
|
||||
f.Close()
|
||||
return size, err
|
||||
}
|
||||
return size, f.Close()
|
||||
}
|
||||
|
||||
if l.maintenanceOverride != nil {
|
||||
doMaintenance = l.maintenanceOverride
|
||||
}
|
||||
|
||||
if l.done != nil {
|
||||
defer l.done()
|
||||
if override != nil {
|
||||
doMaintenance = override
|
||||
}
|
||||
|
||||
runMaintenance := func(do func() (int64, error)) error {
|
||||
start := l.now()
|
||||
start := l.now().UTC()
|
||||
level.Debug(l.logger).Log("msg", "Running maintenance")
|
||||
size, err := do()
|
||||
level.Debug(l.logger).Log("msg", "Maintenance done", "duration", l.now().Sub(start), "size", size)
|
||||
@ -372,7 +328,7 @@ func (l *Log) run() {
|
||||
Loop:
|
||||
for {
|
||||
select {
|
||||
case <-l.stopc:
|
||||
case <-stopc:
|
||||
break Loop
|
||||
case <-t.C:
|
||||
if err := runMaintenance(doMaintenance); err != nil {
|
||||
@ -380,8 +336,9 @@ Loop:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// No need to run final maintenance if we don't want to snapshot.
|
||||
if l.snapf == "" {
|
||||
if snapf == "" {
|
||||
return
|
||||
}
|
||||
if err := runMaintenance(doMaintenance); err != nil {
|
||||
|
@ -18,18 +18,21 @@ import (
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
pb "github.com/prometheus/alertmanager/nflog/nflogpb"
|
||||
|
||||
"github.com/benbjohnson/clock"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestLogGC(t *testing.T) {
|
||||
now := utcNow()
|
||||
mockClock := clock.NewMock()
|
||||
now := mockClock.Now()
|
||||
// We only care about key names and expiration timestamps.
|
||||
newEntry := func(ts time.Time) *pb.MeshEntry {
|
||||
return &pb.MeshEntry{
|
||||
@ -43,7 +46,7 @@ func TestLogGC(t *testing.T) {
|
||||
"a2": newEntry(now.Add(time.Second)),
|
||||
"a3": newEntry(now.Add(-time.Second)),
|
||||
},
|
||||
now: func() time.Time { return now },
|
||||
clock: mockClock,
|
||||
metrics: newMetrics(nil),
|
||||
}
|
||||
n, err := l.GC()
|
||||
@ -58,7 +61,8 @@ func TestLogGC(t *testing.T) {
|
||||
|
||||
func TestLogSnapshot(t *testing.T) {
|
||||
// Check whether storing and loading the snapshot is symmetric.
|
||||
now := utcNow()
|
||||
mockClock := clock.NewMock()
|
||||
now := mockClock.Now().UTC()
|
||||
|
||||
cases := []struct {
|
||||
entries []*pb.MeshEntry
|
||||
@ -133,24 +137,31 @@ func TestWithMaintenance_SupportsCustomCallback(t *testing.T) {
|
||||
stopc := make(chan struct{})
|
||||
var mtx sync.Mutex
|
||||
var mc int
|
||||
l, err := New(WithMetrics(prometheus.NewPedanticRegistry()), WithSnapshot(f.Name()), WithMaintenance(100*time.Millisecond, stopc, nil, func() (int64, error) {
|
||||
opts := Options{
|
||||
Metrics: prometheus.NewPedanticRegistry(),
|
||||
SnapshotFile: f.Name(),
|
||||
}
|
||||
|
||||
l, err := New(opts)
|
||||
mockClock := clock.NewMock()
|
||||
l.clock = mockClock
|
||||
require.NoError(t, err)
|
||||
|
||||
go l.Maintenance(100*time.Millisecond, f.Name(), stopc, func() (int64, error) {
|
||||
mtx.Lock()
|
||||
mc++
|
||||
mtx.Unlock()
|
||||
|
||||
return 0, nil
|
||||
}))
|
||||
require.NoError(t, err)
|
||||
})
|
||||
runtime.Gosched() // ensure that the ticker is running.
|
||||
|
||||
go l.run()
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
mockClock.Add(200 * time.Millisecond)
|
||||
close(stopc)
|
||||
|
||||
require.Eventually(t, func() bool {
|
||||
mtx.Lock()
|
||||
defer mtx.Unlock()
|
||||
return mc >= 2
|
||||
}, 500*time.Millisecond, 100*time.Millisecond)
|
||||
mtx.Lock()
|
||||
defer mtx.Unlock()
|
||||
require.Equal(t, 2, mc)
|
||||
}
|
||||
|
||||
func TestReplaceFile(t *testing.T) {
|
||||
@ -182,7 +193,8 @@ func TestReplaceFile(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestStateMerge(t *testing.T) {
|
||||
now := utcNow()
|
||||
mockClock := clock.NewMock()
|
||||
now := mockClock.Now()
|
||||
|
||||
// We only care about key names and timestamps for the
|
||||
// merging logic.
|
||||
@ -243,7 +255,8 @@ func TestStateMerge(t *testing.T) {
|
||||
|
||||
func TestStateDataCoding(t *testing.T) {
|
||||
// Check whether encoding and decoding the data is symmetric.
|
||||
now := utcNow()
|
||||
mockClock := clock.NewMock()
|
||||
now := mockClock.Now().UTC()
|
||||
|
||||
cases := []struct {
|
||||
entries []*pb.MeshEntry
|
||||
@ -299,7 +312,8 @@ func TestStateDataCoding(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestQuery(t *testing.T) {
|
||||
nl, err := New(WithRetention(time.Second))
|
||||
opts := Options{Retention: time.Second}
|
||||
nl, err := New(opts)
|
||||
if err != nil {
|
||||
require.NoError(t, err, "constructing nflog failed")
|
||||
}
|
||||
|
@ -319,16 +319,6 @@ func New(o Options) (*Silences, error) {
|
||||
if err := o.validate(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if o.SnapshotFile != "" {
|
||||
if r, err := os.Open(o.SnapshotFile); err != nil {
|
||||
if !os.IsNotExist(err) {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
o.SnapshotReader = r
|
||||
defer r.Close()
|
||||
}
|
||||
}
|
||||
s := &Silences{
|
||||
clock: clock.New(),
|
||||
mc: matcherCache{},
|
||||
@ -342,6 +332,19 @@ func New(o Options) (*Silences, error) {
|
||||
if o.Logger != nil {
|
||||
s.logger = o.Logger
|
||||
}
|
||||
|
||||
if o.SnapshotFile != "" {
|
||||
if r, err := os.Open(o.SnapshotFile); err != nil {
|
||||
if !os.IsNotExist(err) {
|
||||
return nil, err
|
||||
}
|
||||
level.Debug(s.logger).Log("msg", "silences snapshot file doesn't exist", "err", err)
|
||||
} else {
|
||||
o.SnapshotReader = r
|
||||
defer r.Close()
|
||||
}
|
||||
}
|
||||
|
||||
if o.SnapshotReader != nil {
|
||||
if err := s.loadSnapshot(o.SnapshotReader); err != nil {
|
||||
return s, err
|
||||
@ -359,6 +362,10 @@ func (s *Silences) nowUTC() time.Time {
|
||||
// Terminates on receiving from stopc.
|
||||
// If not nil, the last argument is an override for what to do as part of the maintenance - for advanced usage.
|
||||
func (s *Silences) Maintenance(interval time.Duration, snapf string, stopc <-chan struct{}, override MaintenanceFunc) {
|
||||
if interval == 0 || stopc == nil {
|
||||
level.Error(s.logger).Log("msg", "interval or stop signal are missing - not running maintenance")
|
||||
return
|
||||
}
|
||||
t := s.clock.Ticker(interval)
|
||||
defer t.Stop()
|
||||
|
||||
@ -377,6 +384,7 @@ func (s *Silences) Maintenance(interval time.Duration, snapf string, stopc <-cha
|
||||
return size, err
|
||||
}
|
||||
if size, err = s.Snapshot(f); err != nil {
|
||||
f.Close()
|
||||
return size, err
|
||||
}
|
||||
return size, f.Close()
|
||||
@ -406,6 +414,7 @@ Loop:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// No need for final maintenance if we don't want to snapshot.
|
||||
if snapf == "" {
|
||||
return
|
||||
|
Loading…
Reference in New Issue
Block a user