From f35816613e2d5579c97799233ca8126937828f20 Mon Sep 17 00:00:00 2001
From: Goutham Veeramachaneni
Date: Fri, 3 Mar 2017 02:53:16 +0530
Subject: [PATCH] Refactored Notifier to use Registerer

* Brought metrics back into Notifier

Notifier still implements a Collector. Check if that is needed.
---
 cmd/prometheus/main.go    |   2 +-
 notifier/notifier.go      | 179 ++++++++++++++++++++------------
 notifier/notifier_test.go |  11 +--
 3 files changed, 102 insertions(+), 90 deletions(-)

diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go
index c59295861..f51333863 100644
--- a/cmd/prometheus/main.go
+++ b/cmd/prometheus/main.go
@@ -97,7 +97,7 @@ func Main() int {
 	reloadables = append(reloadables, remoteStorage)
 
 	var (
-		notifier       = notifier.New(&cfg.notifier)
+		notifier       = notifier.New(&cfg.notifier, prometheus.DefaultRegisterer)
 		targetManager  = retrieval.NewTargetManager(sampleAppender)
 		queryEngine    = promql.NewEngine(localStorage, &cfg.queryEngine)
 		ctx, cancelCtx = context.WithCancel(context.Background())
diff --git a/notifier/notifier.go b/notifier/notifier.go
index cc256ed4f..abff02a90 100644
--- a/notifier/notifier.go
+++ b/notifier/notifier.go
@@ -50,62 +50,19 @@ const (
 	alertmanagerLabel = "alertmanager"
 )
 
-var (
-	alertLatency = prometheus.NewSummaryVec(prometheus.SummaryOpts{
-		Namespace: namespace,
-		Subsystem: subsystem,
-		Name:      "latency_seconds",
-		Help:      "Latency quantiles for sending alert notifications (not including dropped notifications).",
-	},
-		[]string{alertmanagerLabel},
-	)
-
-	alertErrors = prometheus.NewCounterVec(prometheus.CounterOpts{
-		Namespace: namespace,
-		Subsystem: subsystem,
-		Name:      "errors_total",
-		Help:      "Total number of errors sending alert notifications.",
-	},
-		[]string{alertmanagerLabel},
-	)
-
-	alertSent = prometheus.NewCounterVec(prometheus.CounterOpts{
-		Namespace: namespace,
-		Subsystem: subsystem,
-		Name:      "sent_total",
-		Help:      "Total number of alerts sent.",
-	},
-		[]string{alertmanagerLabel},
-	)
-
-	alertDropped = prometheus.NewCounter(prometheus.CounterOpts{
-		Namespace: namespace,
-		Subsystem: subsystem,
-		Name:      "dropped_total",
-		Help:      "Total number of alerts dropped due to errors when sending to Alertmanager.",
-	})
-
-	alertQueueLength = prometheus.NewGauge(prometheus.GaugeOpts{
-		Namespace: namespace,
-		Subsystem: subsystem,
-		Name:      "queue_length",
-		Help:      "The number of alert notifications in the queue.",
-	})
-)
-
 // Notifier is responsible for dispatching alert notifications to an
 // alert manager service.
 type Notifier struct {
 	queue model.Alerts
 	opts  *Options
 
+	metrics *alertMetrics
+
 	more   chan struct{}
 	mtx    sync.RWMutex
 	ctx    context.Context
 	cancel func()
 
-	queueCapacity prometheus.Metric
-
 	alertmanagers   []*alertmanagerSet
 	cancelDiscovery func()
 }
@@ -119,21 +76,53 @@ type Options struct {
 
 	Do func(ctx context.Context, client *http.Client, req *http.Request) (*http.Response, error)
 }
 
-// New constructs a new Notifier.
-func New(o *Options) *Notifier {
-	ctx, cancel := context.WithCancel(context.Background())
-
-	if o.Do == nil {
-		o.Do = ctxhttp.Do
-	}
-
-	return &Notifier{
-		queue:  make(model.Alerts, 0, o.QueueCapacity),
-		ctx:    ctx,
-		cancel: cancel,
-		more:   make(chan struct{}, 1),
-		opts:   o,
+type alertMetrics struct {
+	latency       *prometheus.SummaryVec
+	errors        *prometheus.CounterVec
+	sent          *prometheus.CounterVec
+	dropped       prometheus.Counter
+	queueLength   prometheus.Gauge
+	queueCapacity prometheus.Metric
+}
+func newAlertMetrics(r prometheus.Registerer, o *Options) *alertMetrics {
+	m := &alertMetrics{
+		latency: prometheus.NewSummaryVec(prometheus.SummaryOpts{
+			Namespace: namespace,
+			Subsystem: subsystem,
+			Name:      "latency_seconds",
+			Help:      "Latency quantiles for sending alert notifications (not including dropped notifications).",
+		},
+			[]string{alertmanagerLabel},
+		),
+		errors: prometheus.NewCounterVec(prometheus.CounterOpts{
+			Namespace: namespace,
+			Subsystem: subsystem,
+			Name:      "errors_total",
+			Help:      "Total number of errors sending alert notifications.",
+		},
+			[]string{alertmanagerLabel},
+		),
+		sent: prometheus.NewCounterVec(prometheus.CounterOpts{
+			Namespace: namespace,
+			Subsystem: subsystem,
+			Name:      "sent_total",
+			Help:      "Total number of alerts successfully sent.",
+		},
+			[]string{alertmanagerLabel},
+		),
+		dropped: prometheus.NewCounter(prometheus.CounterOpts{
+			Namespace: namespace,
+			Subsystem: subsystem,
+			Name:      "dropped_total",
+			Help:      "Total number of alerts dropped due to errors when sending to Alertmanager.",
+		}),
+		queueLength: prometheus.NewGauge(prometheus.GaugeOpts{
+			Namespace: namespace,
+			Subsystem: subsystem,
+			Name:      "queue_length",
+			Help:      "The number of alert notifications in the queue.",
+		}),
 		queueCapacity: prometheus.MustNewConstMetric(
 			prometheus.NewDesc(
 				prometheus.BuildFQName(namespace, subsystem, "queue_capacity"),
@@ -144,6 +133,35 @@ func New(o *Options) *Notifier {
 			float64(o.QueueCapacity),
 		),
 	}
+
+	if r != nil {
+		r.MustRegister(
+			m.latency,
+			m.errors,
+			m.sent,
+			m.dropped,
+		)
+	}
+
+	return m
+}
+
+// New constructs a new Notifier.
+func New(o *Options, r prometheus.Registerer) *Notifier {
+	ctx, cancel := context.WithCancel(context.Background())
+
+	if o.Do == nil {
+		o.Do = ctxhttp.Do
+	}
+
+	return &Notifier{
+		queue:   make(model.Alerts, 0, o.QueueCapacity),
+		ctx:     ctx,
+		cancel:  cancel,
+		more:    make(chan struct{}, 1),
+		opts:    o,
+		metrics: newAlertMetrics(r, o),
+	}
 }
 
 // ApplyConfig updates the status state as the new config requires.
@@ -164,9 +182,10 @@ func (n *Notifier) ApplyConfig(conf *config.Config) error {
 		}
 
 		for _, am := range ams.ams {
-			alertErrors.WithLabelValues(am.url())
-			alertSent.WithLabelValues(am.url())
+			n.metrics.errors.WithLabelValues(am.url())
+			n.metrics.sent.WithLabelValues(am.url())
 		}
+		ams.metrics = n.metrics
 
 		amSets = append(amSets, ams)
 	}
@@ -224,7 +243,7 @@ func (n *Notifier) Run() {
 		alerts := n.nextBatch()
 
 		if !n.sendAll(alerts...) {
-			alertDropped.Add(float64(len(alerts)))
+			n.metrics.dropped.Add(float64(len(alerts)))
 		}
 		// If the queue still has items left, kick off the next iteration.
 		if n.queueLen() > 0 {
@@ -256,7 +275,7 @@ func (n *Notifier) Send(alerts ...*model.Alert) {
 		alerts = alerts[d:]
 
 		log.Warnf("Alert batch larger than queue capacity, dropping %d alerts", d)
-		alertDropped.Add(float64(d))
+		n.metrics.dropped.Add(float64(d))
 	}
 
 	// If the queue is full, remove the oldest alerts in favor
@@ -265,7 +284,7 @@ func (n *Notifier) Send(alerts ...*model.Alert) {
 		n.queue = n.queue[d:]
 
 		log.Warnf("Alert notification queue full, dropping %d alerts", d)
-		alertDropped.Add(float64(d))
+		n.metrics.dropped.Add(float64(d))
 	}
 
 	n.queue = append(n.queue, alerts...)
@@ -347,12 +366,12 @@ func (n *Notifier) sendAll(alerts ...*model.Alert) bool {
 
 			if err := n.sendOne(ctx, ams.client, u, b); err != nil {
 				log.With("alertmanager", u).With("count", len(alerts)).Errorf("Error sending alerts: %s", err)
-				alertErrors.WithLabelValues(u).Inc()
+				n.metrics.errors.WithLabelValues(u).Inc()
 			} else {
 				atomic.AddUint64(&numSuccess, 1)
 			}
-			alertLatency.WithLabelValues(u).Observe(time.Since(begin).Seconds())
-			alertSent.WithLabelValues(u).Add(float64(len(alerts)))
+			n.metrics.latency.WithLabelValues(u).Observe(time.Since(begin).Seconds())
+			n.metrics.sent.WithLabelValues(u).Add(float64(len(alerts)))
 
 			wg.Done()
 		}(am)
@@ -391,26 +410,16 @@ func (n *Notifier) Stop() {
 
 // Describe implements prometheus.Collector.
 func (n *Notifier) Describe(ch chan<- *prometheus.Desc) {
-	alertLatency.Describe(ch)
-	alertErrors.Describe(ch)
-	alertSent.Describe(ch)
-
-	ch <- alertDropped.Desc()
-	ch <- alertQueueLength.Desc()
-	ch <- n.queueCapacity.Desc()
+	ch <- n.metrics.queueCapacity.Desc()
+	ch <- n.metrics.queueLength.Desc()
 }
 
 // Collect implements prometheus.Collector.
 func (n *Notifier) Collect(ch chan<- prometheus.Metric) {
-	alertQueueLength.Set(float64(n.queueLen()))
+	n.metrics.queueLength.Set(float64(n.queueLen()))
 
-	alertLatency.Collect(ch)
-	alertErrors.Collect(ch)
-	alertSent.Collect(ch)
-
-	ch <- alertDropped
-	ch <- alertQueueLength
-	ch <- n.queueCapacity
+	ch <- n.metrics.queueLength
+	ch <- n.metrics.queueCapacity
 }
 
 // alertmanager holds Alertmanager endpoint information.
@@ -438,6 +447,8 @@ type alertmanagerSet struct {
 	cfg    *config.AlertmanagerConfig
 	client *http.Client
 
+	metrics *alertMetrics
+
 	mtx sync.RWMutex
 	ams []alertmanager
 }
@@ -482,8 +493,8 @@ func (s *alertmanagerSet) Sync(tgs []*config.TargetGroup) {
 			continue
 		}
 
-		alertSent.WithLabelValues(us)
-		alertErrors.WithLabelValues(us)
+		s.metrics.sent.WithLabelValues(us)
+		s.metrics.errors.WithLabelValues(us)
 
 		seen[us] = struct{}{}
 		s.ams = append(s.ams, am)
diff --git a/notifier/notifier_test.go b/notifier/notifier_test.go
index 3587a1a8a..b34f6fda4 100644
--- a/notifier/notifier_test.go
+++ b/notifier/notifier_test.go
@@ -25,6 +25,7 @@ import (
 
 	"golang.org/x/net/context"
 
+	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/config"
 )
@@ -62,7 +63,7 @@ func TestPostPath(t *testing.T) {
 }
 
 func TestHandlerNextBatch(t *testing.T) {
-	h := New(&Options{})
+	h := New(&Options{}, prometheus.DefaultRegisterer)
 
 	for i := range make([]struct{}, 2*maxBatchSize+1) {
 		h.queue = append(h.queue, &model.Alert{
@@ -149,7 +150,7 @@ func TestHandlerSendAll(t *testing.T) {
 	defer server1.Close()
 	defer server2.Close()
 
-	h := New(&Options{})
+	h := New(&Options{}, prometheus.DefaultRegisterer)
 	h.alertmanagers = append(h.alertmanagers, &alertmanagerSet{
 		ams: []alertmanager{
 			alertmanagerMock{
@@ -216,7 +217,7 @@ func TestCustomDo(t *testing.T) {
 				Body: ioutil.NopCloser(nil),
 			}, nil
 		},
-	})
+	}, prometheus.DefaultRegisterer)
 
 	h.sendOne(context.Background(), nil, testURL, []byte(testBody))
 
@@ -238,7 +239,7 @@ func TestExternalLabels(t *testing.T) {
 				Replacement: "c",
 			},
 		},
-	})
+	}, prometheus.DefaultRegisterer)
 
 	// This alert should get the external label attached.
 	h.Send(&model.Alert{
@@ -356,7 +357,7 @@ func TestHandlerQueueing(t *testing.T) {
 		cfg: &config.AlertmanagerConfig{
 			Timeout: time.Second,
 		},
-	})
+	}, prometheus.DefaultRegisterer)
 
 	var alerts model.Alerts
 	for i := range make([]struct{}, 20*maxBatchSize) {
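
Not part of the patch: a minimal usage sketch of the new constructor signature,
assuming the notifier package import path shown below. Options.QueueCapacity, the
nil-Registerer handling in newAlertMetrics, and Run/Stop are taken from the diff
above; the registry variable and capacity value are illustrative only.

package main

import (
	"github.com/prometheus/client_golang/prometheus"

	"github.com/prometheus/prometheus/notifier"
)

func main() {
	// A private registry keeps the notification metrics (latency_seconds,
	// errors_total, sent_total, dropped_total) out of the global
	// prometheus.DefaultRegisterer, e.g. for tests or embedded use.
	reg := prometheus.NewRegistry()

	n := notifier.New(&notifier.Options{QueueCapacity: 10000}, reg)
	go n.Run()     // dispatch queued alerts until Stop is called
	defer n.Stop()

	// Passing a nil Registerer skips registration entirely:
	// newAlertMetrics only calls MustRegister when r != nil.
	_ = notifier.New(&notifier.Options{}, nil)
}

As in the updated cmd/prometheus/main.go above, passing prometheus.DefaultRegisterer
keeps the previous behaviour of registering the metrics on the global default registry.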