diff --git a/cmd/alertmanager/main.go b/cmd/alertmanager/main.go index ce1879af..bd6b273e 100644 --- a/cmd/alertmanager/main.go +++ b/cmd/alertmanager/main.go @@ -120,6 +120,7 @@ func main() { nflog.WithRetention(*retention), nflog.WithSnapshot(filepath.Join(*dataDir, "nflog")), nflog.WithMaintenance(15*time.Minute, stopc, wg.Done), + nflog.WithMetrics(prometheus.DefaultRegisterer), nflog.WithLogger(logger.With("component", "nflog")), ) if err != nil { diff --git a/nflog/nflog.go b/nflog/nflog.go index 07e37fa4..18e8c1e8 100644 --- a/nflog/nflog.go +++ b/nflog/nflog.go @@ -30,6 +30,7 @@ import ( "github.com/golang/protobuf/ptypes" "github.com/matttproud/golang_protobuf_extensions/pbutil" pb "github.com/prometheus/alertmanager/nflog/nflogpb" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/log" "github.com/weaveworks/mesh" ) @@ -94,6 +95,7 @@ func QGroupKey(gk []byte) QueryParam { type nlog struct { logger log.Logger + metrics *metrics now func() time.Time retention time.Duration @@ -113,6 +115,50 @@ type nlog struct { st gossipData } +type metrics struct { + gcDuration prometheus.Summary + snapshotDuration prometheus.Summary + queriesTotal prometheus.Counter + queryErrorsTotal prometheus.Counter + queryDuration prometheus.Histogram +} + +func newMetrics(r prometheus.Registerer) *metrics { + m := &metrics{} + + m.gcDuration = prometheus.NewSummary(prometheus.SummaryOpts{ + Name: "alertmanager_nflog_gc_duration_seconds", + Help: "Duration of the last notification log garbage collection cycle.", + }) + m.snapshotDuration = prometheus.NewSummary(prometheus.SummaryOpts{ + Name: "alertmanager_nflog_snapshot_duration_seconds", + Help: "Duration of the last notification log snapshot.", + }) + m.queriesTotal = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "alertmanager_nflog_queries_total", + Help: "Number of notification log queries were received.", + }) + m.queryErrorsTotal = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "alertmanager_nflog_query_errors_total", + Help: "Number notification log received queries that failed.", + }) + m.queryDuration = prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: "alertmanager_nflog_query_duration_seconds", + Help: "Duration of notification log query evaluation.", + }) + + if r != nil { + r.MustRegister( + m.gcDuration, + m.snapshotDuration, + m.queriesTotal, + m.queryErrorsTotal, + m.queryDuration, + ) + } + return m +} + // Option configures a new Log implementation. type Option func(*nlog) error @@ -151,6 +197,14 @@ func WithLogger(logger log.Logger) Option { } } +// WithMetrics registers metrics for the notification log. +func WithMetrics(r prometheus.Registerer) Option { + return func(l *nlog) error { + l.metrics = newMetrics(r) + return nil + } +} + // WithMaintenance configures the Log to run garbage collection // and snapshotting, if configured, at the given interval. // @@ -195,6 +249,10 @@ func New(opts ...Option) (Log, error) { return nil, err } } + if l.metrics == nil { + l.metrics = newMetrics(nil) + } + if l.snapf != "" { if f, err := os.Open(l.snapf); !os.IsNotExist(err) { if err != nil { @@ -207,6 +265,7 @@ func New(opts ...Option) (Log, error) { } } } + go l.run() return l, nil @@ -331,6 +390,9 @@ func (l *nlog) log(r *pb.Receiver, gkey, ghash []byte, resolved bool) error { // GC implements the Log interface. func (l *nlog) GC() (int, error) { + start := time.Now() + defer func() { l.metrics.gcDuration.Observe(time.Since(start).Seconds()) }() + now := l.now() var n int @@ -351,27 +413,37 @@ func (l *nlog) GC() (int, error) { // Query implements the Log interface. func (l *nlog) Query(params ...QueryParam) ([]*pb.Entry, error) { - q := &query{} - for _, p := range params { - if err := p(q); err != nil { - return nil, err + start := time.Now() + l.metrics.queriesTotal.Inc() + + entries, err := func() ([]*pb.Entry, error) { + q := &query{} + for _, p := range params { + if err := p(q); err != nil { + return nil, err + } + } + // TODO(fabxc): For now our only query mode is the most recent entry for a + // receiver/group_key combination. + if q.recv == nil || q.groupKey == nil { + // TODO(fabxc): allow more complex queries in the future. + // How to enable pagination? + return nil, errors.New("no query parameters specified") } - } - // TODO(fabxc): For now our only query mode is the most recent entry for a - // receiver/group_key combination. - if q.recv == nil || q.groupKey == nil { - // TODO(fabxc): allow more complex queries in the future. - // How to enable pagination? - return nil, errors.New("no query parameters specified") - } - l.mtx.RLock() - defer l.mtx.RUnlock() + l.mtx.RLock() + defer l.mtx.RUnlock() - if le, ok := l.st[stateKey(q.groupKey, q.recv)]; ok { - return []*pb.Entry{le.Entry}, nil + if le, ok := l.st[stateKey(q.groupKey, q.recv)]; ok { + return []*pb.Entry{le.Entry}, nil + } + return nil, ErrNotFound + }() + if err != nil { + l.metrics.queryErrorsTotal.Inc() } - return nil, ErrNotFound + l.metrics.queryDuration.Observe(time.Since(start).Seconds()) + return entries, err } // loadSnapshot loads a snapshot generated by Snapshot() into the state. @@ -398,6 +470,9 @@ func (l *nlog) loadSnapshot(r io.Reader) error { // Snapshot implements the Log interface. func (l *nlog) Snapshot(w io.Writer) (int, error) { + start := time.Now() + defer func() { l.metrics.snapshotDuration.Observe(time.Since(start).Seconds()) }() + l.mtx.RLock() defer l.mtx.RUnlock() diff --git a/nflog/nflog_test.go b/nflog/nflog_test.go index 4886a4ff..2addd05c 100644 --- a/nflog/nflog_test.go +++ b/nflog/nflog_test.go @@ -41,7 +41,8 @@ func TestNlogGC(t *testing.T) { "a2": newEntry(now.Add(time.Second)), "a3": newEntry(now.Add(-time.Second)), }, - now: func() time.Time { return now }, + now: func() time.Time { return now }, + metrics: newMetrics(nil), } n, err := l.GC() require.NoError(t, err, "unexpected error in garbage collection") @@ -98,7 +99,10 @@ func TestNlogSnapshot(t *testing.T) { f, err := ioutil.TempFile("", "snapshot") require.NoError(t, err, "creating temp file failed") - l1 := &nlog{st: gossipData{}} + l1 := &nlog{ + st: gossipData{}, + metrics: newMetrics(nil), + } // Setup internal state manually. for _, e := range c.entries { l1.st[stateKey(e.Entry.GroupKey, e.Entry.Receiver)] = e