diff --git a/api/api.go b/api/api.go index 46c5caa3..9c8346e5 100644 --- a/api/api.go +++ b/api/api.go @@ -186,12 +186,9 @@ func (api *API) Register(r *route.Router, routePrefix string) *http.ServeMux { // Update config and resolve timeout of each API. APIv2 also needs // setAlertStatus to be updated. -func (api *API) Update(cfg *config.Config, resolveTimeout time.Duration, setAlertStatus func(model.LabelSet) error) error { - if err := api.v1.Update(cfg, resolveTimeout); err != nil { - return err - } - - return api.v2.Update(cfg, resolveTimeout, setAlertStatus) +func (api *API) Update(cfg *config.Config, setAlertStatus func(model.LabelSet) error) { + api.v1.Update(cfg) + api.v2.Update(cfg, setAlertStatus) } func (api *API) limitHandler(h http.Handler) http.Handler { diff --git a/api/v1/api.go b/api/v1/api.go index 56245996..f9d98c1f 100644 --- a/api/v1/api.go +++ b/api/v1/api.go @@ -67,14 +67,13 @@ func setCORS(w http.ResponseWriter) { // API provides registration of handlers for API routes. type API struct { - alerts provider.Alerts - silences *silence.Silences - config *config.Config - route *dispatch.Route - resolveTimeout time.Duration - uptime time.Time - peer *cluster.Peer - logger log.Logger + alerts provider.Alerts + silences *silence.Silences + config *config.Config + route *dispatch.Route + uptime time.Time + peer *cluster.Peer + logger log.Logger numReceivedAlerts *prometheus.CounterVec numInvalidAlerts prometheus.Counter @@ -153,14 +152,12 @@ func (api *API) Register(r *route.Router) { } // Update sets the configuration string to a new value. -func (api *API) Update(cfg *config.Config, resolveTimeout time.Duration) error { +func (api *API) Update(cfg *config.Config) { api.mtx.Lock() defer api.mtx.Unlock() - api.resolveTimeout = resolveTimeout api.config = cfg api.route = dispatch.NewRoute(cfg.Route, nil) - return nil } type errorType string @@ -432,7 +429,7 @@ func (api *API) insertAlerts(w http.ResponseWriter, r *http.Request, alerts ...* now := time.Now() api.mtx.RLock() - resolveTimeout := api.resolveTimeout + resolveTimeout := time.Duration(api.config.Global.ResolveTimeout) api.mtx.RUnlock() for _, alert := range alerts { diff --git a/api/v1/api_test.go b/api/v1/api_test.go index 1fba64e9..5ba85cd9 100644 --- a/api/v1/api_test.go +++ b/api/v1/api_test.go @@ -133,6 +133,12 @@ func TestAddAlerts(t *testing.T) { alertsProvider := newFakeAlerts([]*types.Alert{}, tc.err) api := New(alertsProvider, nil, newGetAlertStatus(alertsProvider), nil, nil, nil) + defaultGlobalConfig := config.DefaultGlobalConfig() + route := config.Route{} + api.Update(&config.Config{ + Global: &defaultGlobalConfig, + Route: &route, + }) r, err := http.NewRequest("POST", "/api/v1/alerts", bytes.NewReader(b)) w := httptest.NewRecorder() diff --git a/api/v2/api.go b/api/v2/api.go index 5bb5142a..376f9c69 100644 --- a/api/v2/api.go +++ b/api/v2/api.go @@ -56,11 +56,10 @@ type API struct { getAlertStatus getAlertStatusFn uptime time.Time - // mtx protects resolveTimeout, alertmanagerConfig, setAlertStatus and route. + // mtx protects alertmanagerConfig, setAlertStatus and route. mtx sync.RWMutex // resolveTimeout represents the default resolve timeout that an alert is // assigned if no end time is specified. - resolveTimeout time.Duration alertmanagerConfig *config.Config route *dispatch.Route setAlertStatus setAlertStatusFn @@ -124,15 +123,13 @@ func NewAPI( } // Update sets the API struct members that may change between reloads of alertmanager. -func (api *API) Update(cfg *config.Config, resolveTimeout time.Duration, setAlertStatus setAlertStatusFn) error { +func (api *API) Update(cfg *config.Config, setAlertStatus setAlertStatusFn) { api.mtx.Lock() defer api.mtx.Unlock() - api.resolveTimeout = resolveTimeout api.alertmanagerConfig = cfg api.route = dispatch.NewRoute(cfg.Route, nil) api.setAlertStatus = setAlertStatus - return nil } func (api *API) getStatusHandler(params general_ops.GetStatusParams) middleware.Responder { @@ -337,7 +334,7 @@ func (api *API) postAlertsHandler(params alert_ops.PostAlertsParams) middleware. now := time.Now() api.mtx.RLock() - resolveTimeout := api.resolveTimeout + resolveTimeout := time.Duration(api.alertmanagerConfig.Global.ResolveTimeout) api.mtx.RUnlock() for _, alert := range alerts { diff --git a/cmd/alertmanager/main.go b/cmd/alertmanager/main.go index b59962ad..4d70424e 100644 --- a/cmd/alertmanager/main.go +++ b/cmd/alertmanager/main.go @@ -15,8 +15,6 @@ package main import ( "context" - "crypto/md5" - "encoding/binary" "fmt" "net" "net/http" @@ -56,18 +54,6 @@ import ( ) var ( - configHash = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "alertmanager_config_hash", - Help: "Hash of the currently loaded alertmanager configuration.", - }) - configSuccess = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "alertmanager_config_last_reload_successful", - Help: "Whether the last configuration reload attempt was successful.", - }) - configSuccessTime = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "alertmanager_config_last_reload_success_timestamp_seconds", - Help: "Timestamp of the last successful configuration reload.", - }) requestDuration = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Name: "alertmanager_http_request_duration_seconds", @@ -88,9 +74,6 @@ var ( ) func init() { - prometheus.MustRegister(configSuccess) - prometheus.MustRegister(configSuccessTime) - prometheus.MustRegister(configHash) prometheus.MustRegister(requestDuration) prometheus.MustRegister(responseSize) prometheus.MustRegister(version.NewCollector("alertmanager")) @@ -263,14 +246,6 @@ func run() int { } defer alerts.Close() - var ( - inhibitor *inhibit.Inhibitor - tmpl *template.Template - pipeline notify.Stage - disp *dispatch.Dispatcher - ) - defer disp.Stop() - api, err := api.New(api.Options{ Alerts: alerts, Silences: silences, @@ -304,30 +279,24 @@ func run() int { return d + waitFunc() } - var hash float64 - reload := func() (err error) { - level.Info(logger).Log("msg", "Loading configuration file", "file", *configFile) - defer func() { - if err != nil { - level.Error(logger).Log("msg", "Loading configuration file failed", "file", *configFile, "err", err) - configSuccess.Set(0) - } else { - configSuccess.Set(1) - configSuccessTime.Set(float64(time.Now().Unix())) - configHash.Set(hash) - } - }() + var ( + inhibitor *inhibit.Inhibitor + tmpl *template.Template + pipeline notify.Stage + disp *dispatch.Dispatcher + ) - conf, plainCfg, err := config.LoadFile(*configFile) - if err != nil { - return err - } - - hash = md5HashAsMetricValue(plainCfg) + defer disp.Stop() + configCoordinator := config.NewCoordinator( + *configFile, + prometheus.DefaultRegisterer, + log.With(logger, "component", "configuration"), + ) + configCoordinator.Subscribe(func(conf *config.Config) error { tmpl, err = template.FromGlobs(conf.Templates...) if err != nil { - return err + return fmt.Errorf("failed to parse templates: %v", err.Error()) } tmpl.ExternalURL = amURL @@ -347,10 +316,7 @@ func run() int { logger, ) - err = api.Update(conf, time.Duration(conf.Global.ResolveTimeout), setAlertStatus(inhibitor, marker, silences)) - if err != nil { - return err - } + api.Update(conf, setAlertStatus(inhibitor, marker, silences)) disp = dispatch.NewDispatcher(alerts, dispatch.NewRoute(conf.Route, nil), pipeline, marker, timeoutFunc, logger) @@ -358,9 +324,9 @@ func run() int { go inhibitor.Run() return nil - } + }) - if err := reload(); err != nil { + if err := configCoordinator.Reload(); err != nil { return 1 } @@ -413,9 +379,9 @@ func run() int { select { case <-hup: // ignore error, already logged in `reload()` - _ = reload() + _ = configCoordinator.Reload() case errc := <-webReload: - errc <- reload() + errc <- configCoordinator.Reload() } } }() @@ -470,15 +436,6 @@ func extURL(listen, external string) (*url.URL, error) { return u, nil } -func md5HashAsMetricValue(data []byte) float64 { - sum := md5.Sum(data) - // We only want 48 bits as a float64 only has a 53 bit mantissa. - smallSum := sum[0:6] - var bytes = make([]byte, 8) - copy(bytes, smallSum) - return float64(binary.LittleEndian.Uint64(bytes)) -} - func setAlertStatus(inhibitor *inhibit.Inhibitor, marker types.Marker, silences *silence.Silences) func(model.LabelSet) error { return func(labels model.LabelSet) error { inhibitor.Mutes(labels) diff --git a/config/coordinator.go b/config/coordinator.go new file mode 100644 index 00000000..35c2049a --- /dev/null +++ b/config/coordinator.go @@ -0,0 +1,157 @@ +// Copyright 2019 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import ( + "crypto/md5" + "encoding/binary" + "sync" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/prometheus/client_golang/prometheus" +) + +// Coordinator coordinates Alertmanager configurations beyond the lifetime of a +// single configuration. +type Coordinator struct { + configFilePath string + logger log.Logger + + // Protects config and subscribers + mutex sync.Mutex + config *Config + subscribers []func(*Config) error + + configHashMetric prometheus.Gauge + configSuccessMetric prometheus.Gauge + configSuccessTimeMetric prometheus.Gauge +} + +// NewCoordinator returns a new coordinator with the given configuration file +// path. It does not yet load the configuration from file. This is done in +// `Reload()`. +func NewCoordinator(configFilePath string, r prometheus.Registerer, l log.Logger) *Coordinator { + c := &Coordinator{ + configFilePath: configFilePath, + logger: l, + } + + c.registerMetrics(r) + + return c +} + +func (c *Coordinator) registerMetrics(r prometheus.Registerer) { + configHash := prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "alertmanager_config_hash", + Help: "Hash of the currently loaded alertmanager configuration.", + }) + configSuccess := prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "alertmanager_config_last_reload_successful", + Help: "Whether the last configuration reload attempt was successful.", + }) + configSuccessTime := prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "alertmanager_config_last_reload_success_timestamp_seconds", + Help: "Timestamp of the last successful configuration reload.", + }) + + r.MustRegister(configHash, configSuccess, configSuccessTime) + + c.configHashMetric = configHash + c.configSuccessMetric = configSuccess + c.configSuccessTimeMetric = configSuccessTime +} + +// Subscribe subscribes the given Subscribers to configuration changes. +func (c *Coordinator) Subscribe(ss ...func(*Config) error) { + c.mutex.Lock() + defer c.mutex.Unlock() + + c.subscribers = append(c.subscribers, ss...) +} + +func (c *Coordinator) notifySubscribers() error { + for _, s := range c.subscribers { + if err := s(c.config); err != nil { + return err + } + } + + return nil +} + +// loadFromFile triggers a configuration load, discarding the old configuration. +func (c *Coordinator) loadFromFile() error { + level.Info(c.logger).Log( + "msg", "Loading configuration file", + "file", c.configFilePath, + ) + + conf, plainConfig, err := LoadFile(c.configFilePath) + if err != nil { + c.configSuccessMetric.Set(0) + level.Error(c.logger).Log( + "msg", "Loading configuration file failed", + "file", c.configFilePath, + "err", err, + ) + return err + } + + c.config = conf + c.configSuccessMetric.Set(1) + c.configSuccessTimeMetric.Set(float64(time.Now().Unix())) + hash := md5HashAsMetricValue(plainConfig) + c.configHashMetric.Set(hash) + + return nil +} + +// Reload triggers a configuration reload from file and notifies all +// configuration change subscribers. +func (c *Coordinator) Reload() error { + c.mutex.Lock() + defer c.mutex.Unlock() + + if err := c.loadFromFile(); err != nil { + c.logger.Log( + "msg", "loading configuration file failed", + "file", c.configFilePath, + "err", err, + ) + return err + } + + if err := c.notifySubscribers(); err != nil { + c.logger.Log( + "msg", "one or more config change subscribers failed to apply new config", + "file", c.configFilePath, + "err", err, + ) + return err + } + + return nil +} + +func md5HashAsMetricValue(data []byte) float64 { + sum := md5.Sum(data) + // We only want 48 bits as a float64 only has a 53 bit mantissa. + smallSum := sum[0:6] + var bytes = make([]byte, 8) + copy(bytes, smallSum) + return float64(binary.LittleEndian.Uint64(bytes)) +} diff --git a/config/coordinator_test.go b/config/coordinator_test.go new file mode 100644 index 00000000..a754ded2 --- /dev/null +++ b/config/coordinator_test.go @@ -0,0 +1,83 @@ +// Copyright 2019 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import ( + "errors" + "testing" + + "github.com/go-kit/kit/log" + "github.com/prometheus/client_golang/prometheus" +) + +type fakeRegisterer struct { + registeredCollectors []prometheus.Collector +} + +func (r *fakeRegisterer) Register(prometheus.Collector) error { + return nil +} + +func (r *fakeRegisterer) MustRegister(c ...prometheus.Collector) { + r.registeredCollectors = append(r.registeredCollectors, c...) +} + +func (r *fakeRegisterer) Unregister(prometheus.Collector) bool { + return false +} + +func TestCoordinatorRegistersMetrics(t *testing.T) { + fr := fakeRegisterer{} + NewCoordinator("testdata/conf.good.yml", &fr, log.NewNopLogger()) + + if len(fr.registeredCollectors) == 0 { + t.Error("expected NewCoordinator to register metrics on the given registerer") + } +} + +func TestCoordinatorNotifiesSubscribers(t *testing.T) { + callBackCalled := false + c := NewCoordinator("testdata/conf.good.yml", prometheus.NewRegistry(), log.NewNopLogger()) + c.Subscribe(func(*Config) error { + callBackCalled = true + return nil + }) + + err := c.Reload() + if err != nil { + t.Fatal(err) + } + + if !callBackCalled { + t.Fatal("expected coordinator.Reload() to call subscribers") + } +} + +func TestCoordinatorFailReloadWhenSubscriberFails(t *testing.T) { + errMessage := "something happened" + c := NewCoordinator("testdata/conf.good.yml", prometheus.NewRegistry(), log.NewNopLogger()) + + c.Subscribe(func(*Config) error { + return errors.New(errMessage) + }) + + err := c.Reload() + if err == nil { + t.Fatal("expected reload to throw an error") + } + + if err.Error() != errMessage { + t.Fatalf("expected error message %q but got %q", errMessage, err) + } +}