Metrics: Notification log maintenance success and failure (#3286)

* Metrics: Notification log maintenance success and failure

For various reasons, we've observed different kinds of errors in this area, from read-only disks to silly code bugs. Errors during maintenance are effectively data loss, so we should encourage proper monitoring of this area.

Similar to #3285
---------

Signed-off-by: gotjosh <josue.abreu@gmail.com>
gotjosh 2023-03-08 10:29:05 +00:00 committed by GitHub
parent 72b0cb0f40
commit 28925efbd8
2 changed files with 62 additions and 20 deletions

nflog/nflog.go

@@ -101,6 +101,8 @@ type metrics struct {
 	queryErrorsTotal        prometheus.Counter
 	queryDuration           prometheus.Histogram
 	propagatedMessagesTotal prometheus.Counter
+	maintenanceTotal        prometheus.Counter
+	maintenanceErrorsTotal  prometheus.Counter
 }
 
 func newMetrics(r prometheus.Registerer) *metrics {
@@ -120,6 +122,14 @@ func newMetrics(r prometheus.Registerer) *metrics {
 		Name: "alertmanager_nflog_snapshot_size_bytes",
 		Help: "Size of the last notification log snapshot in bytes.",
 	})
+	m.maintenanceTotal = prometheus.NewCounter(prometheus.CounterOpts{
+		Name: "alertmanager_nflog_maintenance_total",
+		Help: "How many maintenances were executed for the notification log.",
+	})
+	m.maintenanceErrorsTotal = prometheus.NewCounter(prometheus.CounterOpts{
+		Name: "alertmanager_nflog_maintenance_errors_total",
+		Help: "How many maintenances were executed for the notification log that failed.",
+	})
 	m.queriesTotal = prometheus.NewCounter(prometheus.CounterOpts{
 		Name: "alertmanager_nflog_queries_total",
 		Help: "Number of notification log queries were received.",
@@ -146,6 +156,8 @@ func newMetrics(r prometheus.Registerer) *metrics {
 			m.queryErrorsTotal,
 			m.queryDuration,
 			m.propagatedMessagesTotal,
+			m.maintenanceTotal,
+			m.maintenanceErrorsTotal,
 		)
 	}
 	return m
@@ -317,13 +329,18 @@ func (l *Log) Maintenance(interval time.Duration, snapf string, stopc <-chan str
 	}
 
 	runMaintenance := func(do func() (int64, error)) error {
+		l.metrics.maintenanceTotal.Inc()
 		start := l.now().UTC()
 		level.Debug(l.logger).Log("msg", "Running maintenance")
 		size, err := do()
-		level.Debug(l.logger).Log("msg", "Maintenance done", "duration", l.now().Sub(start), "size", size)
 		l.metrics.snapshotSize.Set(float64(size))
-		return err
+		if err != nil {
+			l.metrics.maintenanceErrorsTotal.Inc()
+			return err
+		}
+		level.Debug(l.logger).Log("msg", "Maintenance done", "duration", l.now().Sub(start), "size", size)
+		return nil
 	}
 
 Loop:
 	for {

nflog/nflog_test.go

@@ -18,7 +18,6 @@ import (
 	"io"
 	"os"
 	"path/filepath"
-	"runtime"
 	"sync"
 	"testing"
 	"time"
@@ -27,7 +26,9 @@ import (
 	"github.com/benbjohnson/clock"
 	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/testutil"
 	"github.com/stretchr/testify/require"
+	"go.uber.org/atomic"
 )
 
 func TestLogGC(t *testing.T) {
@@ -133,35 +134,53 @@ func TestLogSnapshot(t *testing.T) {
 func TestWithMaintenance_SupportsCustomCallback(t *testing.T) {
 	f, err := os.CreateTemp("", "snapshot")
 	require.NoError(t, err, "creating temp file failed")
 
 	stopc := make(chan struct{})
-	var mtx sync.Mutex
-	var mc int
+	reg := prometheus.NewPedanticRegistry()
 	opts := Options{
-		Metrics:      prometheus.NewPedanticRegistry(),
+		Metrics:      reg,
 		SnapshotFile: f.Name(),
 	}
 
 	l, err := New(opts)
-	mockClock := clock.NewMock()
-	l.clock = mockClock
+	clock := clock.NewMock()
+	l.clock = clock
 	require.NoError(t, err)
 
-	go l.Maintenance(100*time.Millisecond, f.Name(), stopc, func() (int64, error) {
-		mtx.Lock()
-		mc++
-		mtx.Unlock()
-
-		return 0, nil
-	})
-	runtime.Gosched() // ensure that the ticker is running.
+	var calls atomic.Int32
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		l.Maintenance(100*time.Millisecond, f.Name(), stopc, func() (int64, error) {
+			calls.Add(1)
+			return 0, nil
+		})
+	}()
+	gosched()
 
-	mockClock.Add(200 * time.Millisecond)
+	// Before the first tick, no maintenance executed.
+	clock.Add(99 * time.Millisecond)
+	require.EqualValues(t, 0, calls.Load())
+
+	// Tick once.
+	clock.Add(1 * time.Millisecond)
+	require.EqualValues(t, 1, calls.Load())
+
+	// Stop the maintenance loop. We should get exactly one more execution of the maintenance func.
 	close(stopc)
+	wg.Wait()
+	require.EqualValues(t, 2, calls.Load())
 
-	mtx.Lock()
-	defer mtx.Unlock()
-	require.Equal(t, 2, mc)
+	// Check the maintenance metrics.
+	require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
+# HELP alertmanager_nflog_maintenance_errors_total How many maintenances were executed for the notification log that failed.
+# TYPE alertmanager_nflog_maintenance_errors_total counter
+alertmanager_nflog_maintenance_errors_total 0
+# HELP alertmanager_nflog_maintenance_total How many maintenances were executed for the notification log.
+# TYPE alertmanager_nflog_maintenance_total counter
+alertmanager_nflog_maintenance_total 2
+`), "alertmanager_nflog_maintenance_total", "alertmanager_nflog_maintenance_errors_total"))
 }
 
 func TestReplaceFile(t *testing.T) {
@@ -356,3 +375,9 @@ func TestStateDecodingError(t *testing.T) {
 	_, err = decodeState(bytes.NewReader(msg))
 	require.Equal(t, ErrInvalidState, err)
 }
+
+// runtime.Gosched() does not "suspend" the current goroutine so there's no guarantee that the main goroutine won't
+// be able to continue. For more see https://pkg.go.dev/runtime#Gosched.
+func gosched() {
+	time.Sleep(1 * time.Millisecond)
+}
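Not part of this commit: below is a minimal sketch of how the failure path could be exercised with the same building blocks, mirroring the timing of the success-path test above. The test name, the injected error, and the expected counter values are illustrative; it assumes the sketch sits in the same test file, with "errors" added to the imports.

// Hypothetical companion test (not in this commit): make the maintenance
// callback fail and assert that both counters move. Assumes the imports of
// nflog_test.go above, plus the standard library "errors" package.
func TestWithMaintenance_CountsFailures(t *testing.T) {
	f, err := os.CreateTemp("", "snapshot")
	require.NoError(t, err, "creating temp file failed")

	stopc := make(chan struct{})
	reg := prometheus.NewPedanticRegistry()
	opts := Options{
		Metrics:      reg,
		SnapshotFile: f.Name(),
	}

	l, err := New(opts)
	require.NoError(t, err)
	clock := clock.NewMock()
	l.clock = clock

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		l.Maintenance(100*time.Millisecond, f.Name(), stopc, func() (int64, error) {
			// Simulate a failing maintenance run, e.g. a read-only disk.
			return 0, errors.New("snapshot failed")
		})
	}()
	gosched()

	// One tick plus the final run triggered by closing stopc: two failed runs.
	clock.Add(100 * time.Millisecond)
	close(stopc)
	wg.Wait()

	require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
# HELP alertmanager_nflog_maintenance_errors_total How many maintenances were executed for the notification log that failed.
# TYPE alertmanager_nflog_maintenance_errors_total counter
alertmanager_nflog_maintenance_errors_total 2
# HELP alertmanager_nflog_maintenance_total How many maintenances were executed for the notification log.
# TYPE alertmanager_nflog_maintenance_total counter
alertmanager_nflog_maintenance_total 2
`), "alertmanager_nflog_maintenance_total", "alertmanager_nflog_maintenance_errors_total"))
}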