mirror of
https://github.com/prometheus/alertmanager
synced 2025-05-16 14:48:37 +00:00
*: Introduce config coordinator bundling config specific logic
Instead of handling all config specific logic inside Alertmangaer.main(), this patch introduces the config coordinator component. Tasks of the config coordinator: - Load and parse configuration - Notify subscribers on configuration changes - Register and manage configuration specific metrics Signed-off-by: Max Leonard Inden <IndenML@gmail.com>
This commit is contained in:
parent
f809c45f4e
commit
d0cd5a0f08
@ -186,12 +186,9 @@ func (api *API) Register(r *route.Router, routePrefix string) *http.ServeMux {
|
|||||||
|
|
||||||
// Update config and resolve timeout of each API. APIv2 also needs
|
// Update config and resolve timeout of each API. APIv2 also needs
|
||||||
// setAlertStatus to be updated.
|
// setAlertStatus to be updated.
|
||||||
func (api *API) Update(cfg *config.Config, resolveTimeout time.Duration, setAlertStatus func(model.LabelSet) error) error {
|
func (api *API) Update(cfg *config.Config, setAlertStatus func(model.LabelSet) error) {
|
||||||
if err := api.v1.Update(cfg, resolveTimeout); err != nil {
|
api.v1.Update(cfg)
|
||||||
return err
|
api.v2.Update(cfg, setAlertStatus)
|
||||||
}
|
|
||||||
|
|
||||||
return api.v2.Update(cfg, resolveTimeout, setAlertStatus)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (api *API) limitHandler(h http.Handler) http.Handler {
|
func (api *API) limitHandler(h http.Handler) http.Handler {
|
||||||
|
@ -71,7 +71,6 @@ type API struct {
|
|||||||
silences *silence.Silences
|
silences *silence.Silences
|
||||||
config *config.Config
|
config *config.Config
|
||||||
route *dispatch.Route
|
route *dispatch.Route
|
||||||
resolveTimeout time.Duration
|
|
||||||
uptime time.Time
|
uptime time.Time
|
||||||
peer *cluster.Peer
|
peer *cluster.Peer
|
||||||
logger log.Logger
|
logger log.Logger
|
||||||
@ -153,14 +152,12 @@ func (api *API) Register(r *route.Router) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Update sets the configuration string to a new value.
|
// Update sets the configuration string to a new value.
|
||||||
func (api *API) Update(cfg *config.Config, resolveTimeout time.Duration) error {
|
func (api *API) Update(cfg *config.Config) {
|
||||||
api.mtx.Lock()
|
api.mtx.Lock()
|
||||||
defer api.mtx.Unlock()
|
defer api.mtx.Unlock()
|
||||||
|
|
||||||
api.resolveTimeout = resolveTimeout
|
|
||||||
api.config = cfg
|
api.config = cfg
|
||||||
api.route = dispatch.NewRoute(cfg.Route, nil)
|
api.route = dispatch.NewRoute(cfg.Route, nil)
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type errorType string
|
type errorType string
|
||||||
@ -432,7 +429,7 @@ func (api *API) insertAlerts(w http.ResponseWriter, r *http.Request, alerts ...*
|
|||||||
now := time.Now()
|
now := time.Now()
|
||||||
|
|
||||||
api.mtx.RLock()
|
api.mtx.RLock()
|
||||||
resolveTimeout := api.resolveTimeout
|
resolveTimeout := time.Duration(api.config.Global.ResolveTimeout)
|
||||||
api.mtx.RUnlock()
|
api.mtx.RUnlock()
|
||||||
|
|
||||||
for _, alert := range alerts {
|
for _, alert := range alerts {
|
||||||
|
@ -133,6 +133,12 @@ func TestAddAlerts(t *testing.T) {
|
|||||||
|
|
||||||
alertsProvider := newFakeAlerts([]*types.Alert{}, tc.err)
|
alertsProvider := newFakeAlerts([]*types.Alert{}, tc.err)
|
||||||
api := New(alertsProvider, nil, newGetAlertStatus(alertsProvider), nil, nil, nil)
|
api := New(alertsProvider, nil, newGetAlertStatus(alertsProvider), nil, nil, nil)
|
||||||
|
defaultGlobalConfig := config.DefaultGlobalConfig()
|
||||||
|
route := config.Route{}
|
||||||
|
api.Update(&config.Config{
|
||||||
|
Global: &defaultGlobalConfig,
|
||||||
|
Route: &route,
|
||||||
|
})
|
||||||
|
|
||||||
r, err := http.NewRequest("POST", "/api/v1/alerts", bytes.NewReader(b))
|
r, err := http.NewRequest("POST", "/api/v1/alerts", bytes.NewReader(b))
|
||||||
w := httptest.NewRecorder()
|
w := httptest.NewRecorder()
|
||||||
|
@ -56,11 +56,10 @@ type API struct {
|
|||||||
getAlertStatus getAlertStatusFn
|
getAlertStatus getAlertStatusFn
|
||||||
uptime time.Time
|
uptime time.Time
|
||||||
|
|
||||||
// mtx protects resolveTimeout, alertmanagerConfig, setAlertStatus and route.
|
// mtx protects alertmanagerConfig, setAlertStatus and route.
|
||||||
mtx sync.RWMutex
|
mtx sync.RWMutex
|
||||||
// resolveTimeout represents the default resolve timeout that an alert is
|
// resolveTimeout represents the default resolve timeout that an alert is
|
||||||
// assigned if no end time is specified.
|
// assigned if no end time is specified.
|
||||||
resolveTimeout time.Duration
|
|
||||||
alertmanagerConfig *config.Config
|
alertmanagerConfig *config.Config
|
||||||
route *dispatch.Route
|
route *dispatch.Route
|
||||||
setAlertStatus setAlertStatusFn
|
setAlertStatus setAlertStatusFn
|
||||||
@ -123,15 +122,13 @@ func NewAPI(
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Update sets the API struct members that may change between reloads of alertmanager.
|
// Update sets the API struct members that may change between reloads of alertmanager.
|
||||||
func (api *API) Update(cfg *config.Config, resolveTimeout time.Duration, setAlertStatus setAlertStatusFn) error {
|
func (api *API) Update(cfg *config.Config, setAlertStatus setAlertStatusFn) {
|
||||||
api.mtx.Lock()
|
api.mtx.Lock()
|
||||||
defer api.mtx.Unlock()
|
defer api.mtx.Unlock()
|
||||||
|
|
||||||
api.resolveTimeout = resolveTimeout
|
|
||||||
api.alertmanagerConfig = cfg
|
api.alertmanagerConfig = cfg
|
||||||
api.route = dispatch.NewRoute(cfg.Route, nil)
|
api.route = dispatch.NewRoute(cfg.Route, nil)
|
||||||
api.setAlertStatus = setAlertStatus
|
api.setAlertStatus = setAlertStatus
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (api *API) getStatusHandler(params general_ops.GetStatusParams) middleware.Responder {
|
func (api *API) getStatusHandler(params general_ops.GetStatusParams) middleware.Responder {
|
||||||
@ -336,7 +333,7 @@ func (api *API) postAlertsHandler(params alert_ops.PostAlertsParams) middleware.
|
|||||||
now := time.Now()
|
now := time.Now()
|
||||||
|
|
||||||
api.mtx.RLock()
|
api.mtx.RLock()
|
||||||
resolveTimeout := api.resolveTimeout
|
resolveTimeout := time.Duration(api.alertmanagerConfig.Global.ResolveTimeout)
|
||||||
api.mtx.RUnlock()
|
api.mtx.RUnlock()
|
||||||
|
|
||||||
for _, alert := range alerts {
|
for _, alert := range alerts {
|
||||||
|
@ -15,8 +15,6 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"crypto/md5"
|
|
||||||
"encoding/binary"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
@ -56,18 +54,6 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
configHash = prometheus.NewGauge(prometheus.GaugeOpts{
|
|
||||||
Name: "alertmanager_config_hash",
|
|
||||||
Help: "Hash of the currently loaded alertmanager configuration.",
|
|
||||||
})
|
|
||||||
configSuccess = prometheus.NewGauge(prometheus.GaugeOpts{
|
|
||||||
Name: "alertmanager_config_last_reload_successful",
|
|
||||||
Help: "Whether the last configuration reload attempt was successful.",
|
|
||||||
})
|
|
||||||
configSuccessTime = prometheus.NewGauge(prometheus.GaugeOpts{
|
|
||||||
Name: "alertmanager_config_last_reload_success_timestamp_seconds",
|
|
||||||
Help: "Timestamp of the last successful configuration reload.",
|
|
||||||
})
|
|
||||||
requestDuration = prometheus.NewHistogramVec(
|
requestDuration = prometheus.NewHistogramVec(
|
||||||
prometheus.HistogramOpts{
|
prometheus.HistogramOpts{
|
||||||
Name: "alertmanager_http_request_duration_seconds",
|
Name: "alertmanager_http_request_duration_seconds",
|
||||||
@ -88,9 +74,6 @@ var (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
prometheus.MustRegister(configSuccess)
|
|
||||||
prometheus.MustRegister(configSuccessTime)
|
|
||||||
prometheus.MustRegister(configHash)
|
|
||||||
prometheus.MustRegister(requestDuration)
|
prometheus.MustRegister(requestDuration)
|
||||||
prometheus.MustRegister(responseSize)
|
prometheus.MustRegister(responseSize)
|
||||||
prometheus.MustRegister(version.NewCollector("alertmanager"))
|
prometheus.MustRegister(version.NewCollector("alertmanager"))
|
||||||
@ -263,14 +246,6 @@ func run() int {
|
|||||||
}
|
}
|
||||||
defer alerts.Close()
|
defer alerts.Close()
|
||||||
|
|
||||||
var (
|
|
||||||
inhibitor *inhibit.Inhibitor
|
|
||||||
tmpl *template.Template
|
|
||||||
pipeline notify.Stage
|
|
||||||
disp *dispatch.Dispatcher
|
|
||||||
)
|
|
||||||
defer disp.Stop()
|
|
||||||
|
|
||||||
api, err := api.New(api.Options{
|
api, err := api.New(api.Options{
|
||||||
Alerts: alerts,
|
Alerts: alerts,
|
||||||
Silences: silences,
|
Silences: silences,
|
||||||
@ -304,30 +279,24 @@ func run() int {
|
|||||||
return d + waitFunc()
|
return d + waitFunc()
|
||||||
}
|
}
|
||||||
|
|
||||||
var hash float64
|
var (
|
||||||
reload := func() (err error) {
|
inhibitor *inhibit.Inhibitor
|
||||||
level.Info(logger).Log("msg", "Loading configuration file", "file", *configFile)
|
tmpl *template.Template
|
||||||
defer func() {
|
pipeline notify.Stage
|
||||||
if err != nil {
|
disp *dispatch.Dispatcher
|
||||||
level.Error(logger).Log("msg", "Loading configuration file failed", "file", *configFile, "err", err)
|
)
|
||||||
configSuccess.Set(0)
|
|
||||||
} else {
|
|
||||||
configSuccess.Set(1)
|
|
||||||
configSuccessTime.Set(float64(time.Now().Unix()))
|
|
||||||
configHash.Set(hash)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
conf, plainCfg, err := config.LoadFile(*configFile)
|
defer disp.Stop()
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
hash = md5HashAsMetricValue(plainCfg)
|
|
||||||
|
|
||||||
|
configCoordinator := config.NewCoordinator(
|
||||||
|
*configFile,
|
||||||
|
prometheus.DefaultRegisterer,
|
||||||
|
log.With(logger, "component", "configuration"),
|
||||||
|
)
|
||||||
|
configCoordinator.Subscribe(func(conf *config.Config) error {
|
||||||
tmpl, err = template.FromGlobs(conf.Templates...)
|
tmpl, err = template.FromGlobs(conf.Templates...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("failed to parse templates: %v", err.Error())
|
||||||
}
|
}
|
||||||
tmpl.ExternalURL = amURL
|
tmpl.ExternalURL = amURL
|
||||||
|
|
||||||
@ -347,10 +316,7 @@ func run() int {
|
|||||||
logger,
|
logger,
|
||||||
)
|
)
|
||||||
|
|
||||||
err = api.Update(conf, time.Duration(conf.Global.ResolveTimeout), setAlertStatus(inhibitor, marker, silences))
|
api.Update(conf, setAlertStatus(inhibitor, marker, silences))
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
disp = dispatch.NewDispatcher(alerts, dispatch.NewRoute(conf.Route, nil), pipeline, marker, timeoutFunc, logger)
|
disp = dispatch.NewDispatcher(alerts, dispatch.NewRoute(conf.Route, nil), pipeline, marker, timeoutFunc, logger)
|
||||||
|
|
||||||
@ -358,9 +324,9 @@ func run() int {
|
|||||||
go inhibitor.Run()
|
go inhibitor.Run()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
})
|
||||||
|
|
||||||
if err := reload(); err != nil {
|
if err := configCoordinator.Reload(); err != nil {
|
||||||
return 1
|
return 1
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -413,9 +379,9 @@ func run() int {
|
|||||||
select {
|
select {
|
||||||
case <-hup:
|
case <-hup:
|
||||||
// ignore error, already logged in `reload()`
|
// ignore error, already logged in `reload()`
|
||||||
_ = reload()
|
_ = configCoordinator.Reload()
|
||||||
case errc := <-webReload:
|
case errc := <-webReload:
|
||||||
errc <- reload()
|
errc <- configCoordinator.Reload()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
@ -470,15 +436,6 @@ func extURL(listen, external string) (*url.URL, error) {
|
|||||||
return u, nil
|
return u, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func md5HashAsMetricValue(data []byte) float64 {
|
|
||||||
sum := md5.Sum(data)
|
|
||||||
// We only want 48 bits as a float64 only has a 53 bit mantissa.
|
|
||||||
smallSum := sum[0:6]
|
|
||||||
var bytes = make([]byte, 8)
|
|
||||||
copy(bytes, smallSum)
|
|
||||||
return float64(binary.LittleEndian.Uint64(bytes))
|
|
||||||
}
|
|
||||||
|
|
||||||
func setAlertStatus(inhibitor *inhibit.Inhibitor, marker types.Marker, silences *silence.Silences) func(model.LabelSet) error {
|
func setAlertStatus(inhibitor *inhibit.Inhibitor, marker types.Marker, silences *silence.Silences) func(model.LabelSet) error {
|
||||||
return func(labels model.LabelSet) error {
|
return func(labels model.LabelSet) error {
|
||||||
inhibitor.Mutes(labels)
|
inhibitor.Mutes(labels)
|
||||||
|
157
config/coordinator.go
Normal file
157
config/coordinator.go
Normal file
@ -0,0 +1,157 @@
|
|||||||
|
// Copyright 2019 Prometheus Team
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package config
|
||||||
|
|
||||||
|
import (
|
||||||
|
"crypto/md5"
|
||||||
|
"encoding/binary"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/go-kit/kit/log"
|
||||||
|
"github.com/go-kit/kit/log/level"
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Coordinator coordinates Alertmanager configurations beyond the lifetime of a
|
||||||
|
// single configuration.
|
||||||
|
type Coordinator struct {
|
||||||
|
configFilePath string
|
||||||
|
logger log.Logger
|
||||||
|
|
||||||
|
// Protects config and subscribers
|
||||||
|
mutex sync.Mutex
|
||||||
|
config *Config
|
||||||
|
subscribers []func(*Config) error
|
||||||
|
|
||||||
|
configHashMetric prometheus.Gauge
|
||||||
|
configSuccessMetric prometheus.Gauge
|
||||||
|
configSuccessTimeMetric prometheus.Gauge
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewCoordinator returns a new coordinator with the given configuration file
|
||||||
|
// path. It does not yet load the configuration from file. This is done in
|
||||||
|
// `Reload()`.
|
||||||
|
func NewCoordinator(configFilePath string, r prometheus.Registerer, l log.Logger) *Coordinator {
|
||||||
|
c := &Coordinator{
|
||||||
|
configFilePath: configFilePath,
|
||||||
|
logger: l,
|
||||||
|
}
|
||||||
|
|
||||||
|
c.registerMetrics(r)
|
||||||
|
|
||||||
|
return c
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Coordinator) registerMetrics(r prometheus.Registerer) {
|
||||||
|
configHash := prometheus.NewGauge(prometheus.GaugeOpts{
|
||||||
|
Name: "alertmanager_config_hash",
|
||||||
|
Help: "Hash of the currently loaded alertmanager configuration.",
|
||||||
|
})
|
||||||
|
configSuccess := prometheus.NewGauge(prometheus.GaugeOpts{
|
||||||
|
Name: "alertmanager_config_last_reload_successful",
|
||||||
|
Help: "Whether the last configuration reload attempt was successful.",
|
||||||
|
})
|
||||||
|
configSuccessTime := prometheus.NewGauge(prometheus.GaugeOpts{
|
||||||
|
Name: "alertmanager_config_last_reload_success_timestamp_seconds",
|
||||||
|
Help: "Timestamp of the last successful configuration reload.",
|
||||||
|
})
|
||||||
|
|
||||||
|
r.MustRegister(configHash, configSuccess, configSuccessTime)
|
||||||
|
|
||||||
|
c.configHashMetric = configHash
|
||||||
|
c.configSuccessMetric = configSuccess
|
||||||
|
c.configSuccessTimeMetric = configSuccessTime
|
||||||
|
}
|
||||||
|
|
||||||
|
// Subscribe subscribes the given Subscribers to configuration changes.
|
||||||
|
func (c *Coordinator) Subscribe(ss ...func(*Config) error) {
|
||||||
|
c.mutex.Lock()
|
||||||
|
defer c.mutex.Unlock()
|
||||||
|
|
||||||
|
c.subscribers = append(c.subscribers, ss...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Coordinator) notifySubscribers() error {
|
||||||
|
for _, s := range c.subscribers {
|
||||||
|
if err := s(c.config); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// loadFromFile triggers a configuration load, discarding the old configuration.
|
||||||
|
func (c *Coordinator) loadFromFile() error {
|
||||||
|
level.Info(c.logger).Log(
|
||||||
|
"msg", "Loading configuration file",
|
||||||
|
"file", c.configFilePath,
|
||||||
|
)
|
||||||
|
|
||||||
|
conf, plainConfig, err := LoadFile(c.configFilePath)
|
||||||
|
if err != nil {
|
||||||
|
c.configSuccessMetric.Set(0)
|
||||||
|
level.Error(c.logger).Log(
|
||||||
|
"msg", "Loading configuration file failed",
|
||||||
|
"file", c.configFilePath,
|
||||||
|
"err", err,
|
||||||
|
)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
c.config = conf
|
||||||
|
c.configSuccessMetric.Set(1)
|
||||||
|
c.configSuccessTimeMetric.Set(float64(time.Now().Unix()))
|
||||||
|
hash := md5HashAsMetricValue(plainConfig)
|
||||||
|
c.configHashMetric.Set(hash)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reload triggers a configuration reload from file and notifies all
|
||||||
|
// configuration change subscribers.
|
||||||
|
func (c *Coordinator) Reload() error {
|
||||||
|
c.mutex.Lock()
|
||||||
|
defer c.mutex.Unlock()
|
||||||
|
|
||||||
|
if err := c.loadFromFile(); err != nil {
|
||||||
|
c.logger.Log(
|
||||||
|
"msg", "loading configuration file failed",
|
||||||
|
"file", c.configFilePath,
|
||||||
|
"err", err,
|
||||||
|
)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := c.notifySubscribers(); err != nil {
|
||||||
|
c.logger.Log(
|
||||||
|
"msg", "one or more config change subscribers failed to apply new config",
|
||||||
|
"file", c.configFilePath,
|
||||||
|
"err", err,
|
||||||
|
)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func md5HashAsMetricValue(data []byte) float64 {
|
||||||
|
sum := md5.Sum(data)
|
||||||
|
// We only want 48 bits as a float64 only has a 53 bit mantissa.
|
||||||
|
smallSum := sum[0:6]
|
||||||
|
var bytes = make([]byte, 8)
|
||||||
|
copy(bytes, smallSum)
|
||||||
|
return float64(binary.LittleEndian.Uint64(bytes))
|
||||||
|
}
|
83
config/coordinator_test.go
Normal file
83
config/coordinator_test.go
Normal file
@ -0,0 +1,83 @@
|
|||||||
|
// Copyright 2019 Prometheus Team
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package config
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/go-kit/kit/log"
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
)
|
||||||
|
|
||||||
|
type fakeRegisterer struct {
|
||||||
|
registeredCollectors []prometheus.Collector
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *fakeRegisterer) Register(prometheus.Collector) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *fakeRegisterer) MustRegister(c ...prometheus.Collector) {
|
||||||
|
r.registeredCollectors = append(r.registeredCollectors, c...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *fakeRegisterer) Unregister(prometheus.Collector) bool {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCoordinatorRegistersMetrics(t *testing.T) {
|
||||||
|
fr := fakeRegisterer{}
|
||||||
|
NewCoordinator("testdata/conf.good.yml", &fr, log.NewNopLogger())
|
||||||
|
|
||||||
|
if len(fr.registeredCollectors) == 0 {
|
||||||
|
t.Error("expected NewCoordinator to register metrics on the given registerer")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCoordinatorNotifiesSubscribers(t *testing.T) {
|
||||||
|
callBackCalled := false
|
||||||
|
c := NewCoordinator("testdata/conf.good.yml", prometheus.NewRegistry(), log.NewNopLogger())
|
||||||
|
c.Subscribe(func(*Config) error {
|
||||||
|
callBackCalled = true
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
err := c.Reload()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !callBackCalled {
|
||||||
|
t.Fatal("expected coordinator.Reload() to call subscribers")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCoordinatorFailReloadWhenSubscriberFails(t *testing.T) {
|
||||||
|
errMessage := "something happened"
|
||||||
|
c := NewCoordinator("testdata/conf.good.yml", prometheus.NewRegistry(), log.NewNopLogger())
|
||||||
|
|
||||||
|
c.Subscribe(func(*Config) error {
|
||||||
|
return errors.New(errMessage)
|
||||||
|
})
|
||||||
|
|
||||||
|
err := c.Reload()
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("expected reload to throw an error")
|
||||||
|
}
|
||||||
|
|
||||||
|
if err.Error() != errMessage {
|
||||||
|
t.Fatalf("expected error message %q but got %q", errMessage, err)
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user