dispatch: add metrics (#2113)
Signed-off-by: Simon Pasquier <spasquie@redhat.com>
This commit is contained in:
parent
21e99dcb63
commit
4f45457b9c
|
@ -371,6 +371,7 @@ func run() int {
|
||||||
tmpl *template.Template
|
tmpl *template.Template
|
||||||
)
|
)
|
||||||
|
|
||||||
|
dispMetrics := dispatch.NewDispatcherMetrics(prometheus.DefaultRegisterer)
|
||||||
pipelineBuilder := notify.NewPipelineBuilder(prometheus.DefaultRegisterer)
|
pipelineBuilder := notify.NewPipelineBuilder(prometheus.DefaultRegisterer)
|
||||||
configCoordinator := config.NewCoordinator(
|
configCoordinator := config.NewCoordinator(
|
||||||
*configFile,
|
*configFile,
|
||||||
|
@ -415,7 +416,7 @@ func run() int {
|
||||||
})
|
})
|
||||||
|
|
||||||
routes := dispatch.NewRoute(conf.Route, nil)
|
routes := dispatch.NewRoute(conf.Route, nil)
|
||||||
disp = dispatch.NewDispatcher(alerts, routes, pipeline, marker, timeoutFunc, logger)
|
disp = dispatch.NewDispatcher(alerts, routes, pipeline, marker, timeoutFunc, logger, dispMetrics)
|
||||||
walkRoute(routes, func(r *dispatch.Route) {
|
walkRoute(routes, func(r *dispatch.Route) {
|
||||||
if r.RouteOpts.RepeatInterval > *retention {
|
if r.RouteOpts.RepeatInterval > *retention {
|
||||||
level.Warn(log.With(logger, "component", "configuration")).Log(
|
level.Warn(log.With(logger, "component", "configuration")).Log(
|
||||||
|
|
|
@ -22,6 +22,7 @@ import (
|
||||||
|
|
||||||
"github.com/go-kit/kit/log"
|
"github.com/go-kit/kit/log"
|
||||||
"github.com/go-kit/kit/log/level"
|
"github.com/go-kit/kit/log/level"
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/prometheus/common/model"
|
"github.com/prometheus/common/model"
|
||||||
|
|
||||||
"github.com/prometheus/alertmanager/notify"
|
"github.com/prometheus/alertmanager/notify"
|
||||||
|
@ -30,12 +31,39 @@ import (
|
||||||
"github.com/prometheus/alertmanager/types"
|
"github.com/prometheus/alertmanager/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// DispatcherMetrics groups the Prometheus collectors associated with a
// dispatcher. Instances are built by NewDispatcherMetrics and handed to
// NewDispatcher.
|
||||||
|
type DispatcherMetrics struct {
|
||||||
|
// aggrGroups is the gauge exported as
// "alertmanager_dispatcher_aggregation_groups". The dispatcher sets it to 0
// in Run, increments it when processAlert creates a new aggregation group,
// and decrements it when the cleanup pass deletes an empty group.
aggrGroups prometheus.Gauge
|
||||||
|
// processingDuration is the summary exported as
// "alertmanager_dispatcher_alert_processing_duration_seconds". The run loop
// observes, in seconds, the time spent matching one alert against the
// routes and calling processAlert for each match.
processingDuration prometheus.Summary
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewDispatcherMetrics returns a new registered DispatchMetrics.
|
||||||
|
func NewDispatcherMetrics(r prometheus.Registerer) *DispatcherMetrics {
|
||||||
|
m := DispatcherMetrics{
|
||||||
|
aggrGroups: prometheus.NewGauge(
|
||||||
|
prometheus.GaugeOpts{
|
||||||
|
Name: "alertmanager_dispatcher_aggregation_groups",
|
||||||
|
Help: "Number of active aggregation groups",
|
||||||
|
},
|
||||||
|
),
|
||||||
|
processingDuration: prometheus.NewSummary(
|
||||||
|
prometheus.SummaryOpts{
|
||||||
|
Name: "alertmanager_dispatcher_alert_processing_duration_seconds",
|
||||||
|
Help: "Summary of latencies for the processing of alerts.",
|
||||||
|
},
|
||||||
|
),
|
||||||
|
}
|
||||||
|
prometheus.MustRegister(m.aggrGroups, m.processingDuration)
|
||||||
|
return &m
|
||||||
|
}
|
||||||
|
|
||||||
// Dispatcher sorts incoming alerts into aggregation groups and
|
// Dispatcher sorts incoming alerts into aggregation groups and
|
||||||
// assigns the correct notifiers to each.
|
// assigns the correct notifiers to each.
|
||||||
type Dispatcher struct {
|
type Dispatcher struct {
|
||||||
route *Route
|
route *Route
|
||||||
alerts provider.Alerts
|
alerts provider.Alerts
|
||||||
stage notify.Stage
|
stage notify.Stage
|
||||||
|
metrics *DispatcherMetrics
|
||||||
|
|
||||||
marker types.Marker
|
marker types.Marker
|
||||||
timeout func(time.Duration) time.Duration
|
timeout func(time.Duration) time.Duration
|
||||||
|
@ -58,6 +86,7 @@ func NewDispatcher(
|
||||||
mk types.Marker,
|
mk types.Marker,
|
||||||
to func(time.Duration) time.Duration,
|
to func(time.Duration) time.Duration,
|
||||||
l log.Logger,
|
l log.Logger,
|
||||||
|
m *DispatcherMetrics,
|
||||||
) *Dispatcher {
|
) *Dispatcher {
|
||||||
disp := &Dispatcher{
|
disp := &Dispatcher{
|
||||||
alerts: ap,
|
alerts: ap,
|
||||||
|
@ -66,6 +95,7 @@ func NewDispatcher(
|
||||||
marker: mk,
|
marker: mk,
|
||||||
timeout: to,
|
timeout: to,
|
||||||
logger: log.With(l, "component", "dispatcher"),
|
logger: log.With(l, "component", "dispatcher"),
|
||||||
|
metrics: m,
|
||||||
}
|
}
|
||||||
return disp
|
return disp
|
||||||
}
|
}
|
||||||
|
@ -76,6 +106,7 @@ func (d *Dispatcher) Run() {
|
||||||
|
|
||||||
d.mtx.Lock()
|
d.mtx.Lock()
|
||||||
d.aggrGroups = map[*Route]map[model.Fingerprint]*aggrGroup{}
|
d.aggrGroups = map[*Route]map[model.Fingerprint]*aggrGroup{}
|
||||||
|
d.metrics.aggrGroups.Set(0)
|
||||||
d.mtx.Unlock()
|
d.mtx.Unlock()
|
||||||
|
|
||||||
d.ctx, d.cancel = context.WithCancel(context.Background())
|
d.ctx, d.cancel = context.WithCancel(context.Background())
|
||||||
|
@ -109,9 +140,11 @@ func (d *Dispatcher) run(it provider.AlertIterator) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
now := time.Now()
|
||||||
for _, r := range d.route.Match(alert.Labels) {
|
for _, r := range d.route.Match(alert.Labels) {
|
||||||
d.processAlert(alert, r)
|
d.processAlert(alert, r)
|
||||||
}
|
}
|
||||||
|
d.metrics.processingDuration.Observe(time.Since(now).Seconds())
|
||||||
|
|
||||||
case <-cleanup.C:
|
case <-cleanup.C:
|
||||||
d.mtx.Lock()
|
d.mtx.Lock()
|
||||||
|
@ -121,6 +154,7 @@ func (d *Dispatcher) run(it provider.AlertIterator) {
|
||||||
if ag.empty() {
|
if ag.empty() {
|
||||||
ag.stop()
|
ag.stop()
|
||||||
delete(groups, ag.fingerprint())
|
delete(groups, ag.fingerprint())
|
||||||
|
d.metrics.aggrGroups.Dec()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -252,6 +286,7 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
|
||||||
if !ok {
|
if !ok {
|
||||||
ag = newAggrGroup(d.ctx, groupLabels, route, d.timeout, d.logger)
|
ag = newAggrGroup(d.ctx, groupLabels, route, d.timeout, d.logger)
|
||||||
group[fp] = ag
|
group[fp] = ag
|
||||||
|
d.metrics.aggrGroups.Inc()
|
||||||
|
|
||||||
go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool {
|
go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool {
|
||||||
_, _, err := d.stage.Exec(ctx, d.logger, alerts...)
|
_, _, err := d.stage.Exec(ctx, d.logger, alerts...)
|
||||||
|
|
|
@ -372,7 +372,7 @@ route:
|
||||||
|
|
||||||
timeout := func(d time.Duration) time.Duration { return time.Duration(0) }
|
timeout := func(d time.Duration) time.Duration { return time.Duration(0) }
|
||||||
recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)}
|
recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)}
|
||||||
dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, logger)
|
dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, logger, NewDispatcherMetrics(prometheus.NewRegistry()))
|
||||||
go dispatcher.Run()
|
go dispatcher.Run()
|
||||||
defer dispatcher.Stop()
|
defer dispatcher.Stop()
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue