nflog: add metrics (#518)
This commit is contained in:
parent
b6851a5421
commit
1e01b2bdba
|
@ -120,6 +120,7 @@ func main() {
|
|||
nflog.WithRetention(*retention),
|
||||
nflog.WithSnapshot(filepath.Join(*dataDir, "nflog")),
|
||||
nflog.WithMaintenance(15*time.Minute, stopc, wg.Done),
|
||||
nflog.WithMetrics(prometheus.DefaultRegisterer),
|
||||
nflog.WithLogger(logger.With("component", "nflog")),
|
||||
)
|
||||
if err != nil {
|
||||
|
|
109
nflog/nflog.go
109
nflog/nflog.go
|
@ -30,6 +30,7 @@ import (
|
|||
"github.com/golang/protobuf/ptypes"
|
||||
"github.com/matttproud/golang_protobuf_extensions/pbutil"
|
||||
pb "github.com/prometheus/alertmanager/nflog/nflogpb"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/common/log"
|
||||
"github.com/weaveworks/mesh"
|
||||
)
|
||||
|
@ -94,6 +95,7 @@ func QGroupKey(gk []byte) QueryParam {
|
|||
|
||||
type nlog struct {
|
||||
logger log.Logger
|
||||
metrics *metrics
|
||||
now func() time.Time
|
||||
retention time.Duration
|
||||
|
||||
|
@ -113,6 +115,50 @@ type nlog struct {
|
|||
st gossipData
|
||||
}
|
||||
|
||||
type metrics struct {
|
||||
gcDuration prometheus.Summary
|
||||
snapshotDuration prometheus.Summary
|
||||
queriesTotal prometheus.Counter
|
||||
queryErrorsTotal prometheus.Counter
|
||||
queryDuration prometheus.Histogram
|
||||
}
|
||||
|
||||
func newMetrics(r prometheus.Registerer) *metrics {
|
||||
m := &metrics{}
|
||||
|
||||
m.gcDuration = prometheus.NewSummary(prometheus.SummaryOpts{
|
||||
Name: "alertmanager_nflog_gc_duration_seconds",
|
||||
Help: "Duration of the last notification log garbage collection cycle.",
|
||||
})
|
||||
m.snapshotDuration = prometheus.NewSummary(prometheus.SummaryOpts{
|
||||
Name: "alertmanager_nflog_snapshot_duration_seconds",
|
||||
Help: "Duration of the last notification log snapshot.",
|
||||
})
|
||||
m.queriesTotal = prometheus.NewCounter(prometheus.CounterOpts{
|
||||
Name: "alertmanager_nflog_queries_total",
|
||||
Help: "Number of notification log queries were received.",
|
||||
})
|
||||
m.queryErrorsTotal = prometheus.NewCounter(prometheus.CounterOpts{
|
||||
Name: "alertmanager_nflog_query_errors_total",
|
||||
Help: "Number notification log received queries that failed.",
|
||||
})
|
||||
m.queryDuration = prometheus.NewHistogram(prometheus.HistogramOpts{
|
||||
Name: "alertmanager_nflog_query_duration_seconds",
|
||||
Help: "Duration of notification log query evaluation.",
|
||||
})
|
||||
|
||||
if r != nil {
|
||||
r.MustRegister(
|
||||
m.gcDuration,
|
||||
m.snapshotDuration,
|
||||
m.queriesTotal,
|
||||
m.queryErrorsTotal,
|
||||
m.queryDuration,
|
||||
)
|
||||
}
|
||||
return m
|
||||
}
|
||||
|
||||
// Option configures a new Log implementation.
|
||||
type Option func(*nlog) error
|
||||
|
||||
|
@ -151,6 +197,14 @@ func WithLogger(logger log.Logger) Option {
|
|||
}
|
||||
}
|
||||
|
||||
// WithMetrics registers metrics for the notification log.
|
||||
func WithMetrics(r prometheus.Registerer) Option {
|
||||
return func(l *nlog) error {
|
||||
l.metrics = newMetrics(r)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithMaintenance configures the Log to run garbage collection
|
||||
// and snapshotting, if configured, at the given interval.
|
||||
//
|
||||
|
@ -195,6 +249,10 @@ func New(opts ...Option) (Log, error) {
|
|||
return nil, err
|
||||
}
|
||||
}
|
||||
if l.metrics == nil {
|
||||
l.metrics = newMetrics(nil)
|
||||
}
|
||||
|
||||
if l.snapf != "" {
|
||||
if f, err := os.Open(l.snapf); !os.IsNotExist(err) {
|
||||
if err != nil {
|
||||
|
@ -207,6 +265,7 @@ func New(opts ...Option) (Log, error) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
go l.run()
|
||||
|
||||
return l, nil
|
||||
|
@ -331,6 +390,9 @@ func (l *nlog) log(r *pb.Receiver, gkey, ghash []byte, resolved bool) error {
|
|||
|
||||
// GC implements the Log interface.
|
||||
func (l *nlog) GC() (int, error) {
|
||||
start := time.Now()
|
||||
defer func() { l.metrics.gcDuration.Observe(time.Since(start).Seconds()) }()
|
||||
|
||||
now := l.now()
|
||||
var n int
|
||||
|
||||
|
@ -351,27 +413,37 @@ func (l *nlog) GC() (int, error) {
|
|||
|
||||
// Query implements the Log interface.
|
||||
func (l *nlog) Query(params ...QueryParam) ([]*pb.Entry, error) {
|
||||
q := &query{}
|
||||
for _, p := range params {
|
||||
if err := p(q); err != nil {
|
||||
return nil, err
|
||||
start := time.Now()
|
||||
l.metrics.queriesTotal.Inc()
|
||||
|
||||
entries, err := func() ([]*pb.Entry, error) {
|
||||
q := &query{}
|
||||
for _, p := range params {
|
||||
if err := p(q); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
// TODO(fabxc): For now our only query mode is the most recent entry for a
|
||||
// receiver/group_key combination.
|
||||
if q.recv == nil || q.groupKey == nil {
|
||||
// TODO(fabxc): allow more complex queries in the future.
|
||||
// How to enable pagination?
|
||||
return nil, errors.New("no query parameters specified")
|
||||
}
|
||||
}
|
||||
// TODO(fabxc): For now our only query mode is the most recent entry for a
|
||||
// receiver/group_key combination.
|
||||
if q.recv == nil || q.groupKey == nil {
|
||||
// TODO(fabxc): allow more complex queries in the future.
|
||||
// How to enable pagination?
|
||||
return nil, errors.New("no query parameters specified")
|
||||
}
|
||||
|
||||
l.mtx.RLock()
|
||||
defer l.mtx.RUnlock()
|
||||
l.mtx.RLock()
|
||||
defer l.mtx.RUnlock()
|
||||
|
||||
if le, ok := l.st[stateKey(q.groupKey, q.recv)]; ok {
|
||||
return []*pb.Entry{le.Entry}, nil
|
||||
if le, ok := l.st[stateKey(q.groupKey, q.recv)]; ok {
|
||||
return []*pb.Entry{le.Entry}, nil
|
||||
}
|
||||
return nil, ErrNotFound
|
||||
}()
|
||||
if err != nil {
|
||||
l.metrics.queryErrorsTotal.Inc()
|
||||
}
|
||||
return nil, ErrNotFound
|
||||
l.metrics.queryDuration.Observe(time.Since(start).Seconds())
|
||||
return entries, err
|
||||
}
|
||||
|
||||
// loadSnapshot loads a snapshot generated by Snapshot() into the state.
|
||||
|
@ -398,6 +470,9 @@ func (l *nlog) loadSnapshot(r io.Reader) error {
|
|||
|
||||
// Snapshot implements the Log interface.
|
||||
func (l *nlog) Snapshot(w io.Writer) (int, error) {
|
||||
start := time.Now()
|
||||
defer func() { l.metrics.snapshotDuration.Observe(time.Since(start).Seconds()) }()
|
||||
|
||||
l.mtx.RLock()
|
||||
defer l.mtx.RUnlock()
|
||||
|
||||
|
|
|
@ -41,7 +41,8 @@ func TestNlogGC(t *testing.T) {
|
|||
"a2": newEntry(now.Add(time.Second)),
|
||||
"a3": newEntry(now.Add(-time.Second)),
|
||||
},
|
||||
now: func() time.Time { return now },
|
||||
now: func() time.Time { return now },
|
||||
metrics: newMetrics(nil),
|
||||
}
|
||||
n, err := l.GC()
|
||||
require.NoError(t, err, "unexpected error in garbage collection")
|
||||
|
@ -98,7 +99,10 @@ func TestNlogSnapshot(t *testing.T) {
|
|||
f, err := ioutil.TempFile("", "snapshot")
|
||||
require.NoError(t, err, "creating temp file failed")
|
||||
|
||||
l1 := &nlog{st: gossipData{}}
|
||||
l1 := &nlog{
|
||||
st: gossipData{},
|
||||
metrics: newMetrics(nil),
|
||||
}
|
||||
// Setup internal state manually.
|
||||
for _, e := range c.entries {
|
||||
l1.st[stateKey(e.Entry.GroupKey, e.Entry.Receiver)] = e
|
||||
|
|
Loading…
Reference in New Issue