Merge pull request #1744 from mxinden/introduce-config-coordinator

*: Introduce config coordinator bundling config specific logic
This commit is contained in:
Max Inden 2019-02-25 12:01:46 +01:00 committed by GitHub
commit 6d555302fc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 280 additions and 86 deletions

View File

@ -186,12 +186,9 @@ func (api *API) Register(r *route.Router, routePrefix string) *http.ServeMux {
// Update config and resolve timeout of each API. APIv2 also needs
// setAlertStatus to be updated.
func (api *API) Update(cfg *config.Config, resolveTimeout time.Duration, setAlertStatus func(model.LabelSet) error) error {
if err := api.v1.Update(cfg, resolveTimeout); err != nil {
return err
}
return api.v2.Update(cfg, resolveTimeout, setAlertStatus)
func (api *API) Update(cfg *config.Config, setAlertStatus func(model.LabelSet) error) {
api.v1.Update(cfg)
api.v2.Update(cfg, setAlertStatus)
}
func (api *API) limitHandler(h http.Handler) http.Handler {

View File

@ -67,14 +67,13 @@ func setCORS(w http.ResponseWriter) {
// API provides registration of handlers for API routes.
type API struct {
alerts provider.Alerts
silences *silence.Silences
config *config.Config
route *dispatch.Route
resolveTimeout time.Duration
uptime time.Time
peer *cluster.Peer
logger log.Logger
alerts provider.Alerts
silences *silence.Silences
config *config.Config
route *dispatch.Route
uptime time.Time
peer *cluster.Peer
logger log.Logger
numReceivedAlerts *prometheus.CounterVec
numInvalidAlerts prometheus.Counter
@ -153,14 +152,12 @@ func (api *API) Register(r *route.Router) {
}
// Update sets the configuration string to a new value.
func (api *API) Update(cfg *config.Config, resolveTimeout time.Duration) error {
func (api *API) Update(cfg *config.Config) {
api.mtx.Lock()
defer api.mtx.Unlock()
api.resolveTimeout = resolveTimeout
api.config = cfg
api.route = dispatch.NewRoute(cfg.Route, nil)
return nil
}
type errorType string
@ -432,7 +429,7 @@ func (api *API) insertAlerts(w http.ResponseWriter, r *http.Request, alerts ...*
now := time.Now()
api.mtx.RLock()
resolveTimeout := api.resolveTimeout
resolveTimeout := time.Duration(api.config.Global.ResolveTimeout)
api.mtx.RUnlock()
for _, alert := range alerts {

View File

@ -133,6 +133,12 @@ func TestAddAlerts(t *testing.T) {
alertsProvider := newFakeAlerts([]*types.Alert{}, tc.err)
api := New(alertsProvider, nil, newGetAlertStatus(alertsProvider), nil, nil, nil)
defaultGlobalConfig := config.DefaultGlobalConfig()
route := config.Route{}
api.Update(&config.Config{
Global: &defaultGlobalConfig,
Route: &route,
})
r, err := http.NewRequest("POST", "/api/v1/alerts", bytes.NewReader(b))
w := httptest.NewRecorder()

View File

@ -56,11 +56,10 @@ type API struct {
getAlertStatus getAlertStatusFn
uptime time.Time
// mtx protects resolveTimeout, alertmanagerConfig, setAlertStatus and route.
// mtx protects alertmanagerConfig, setAlertStatus and route.
mtx sync.RWMutex
// resolveTimeout represents the default resolve timeout that an alert is
// assigned if no end time is specified.
resolveTimeout time.Duration
alertmanagerConfig *config.Config
route *dispatch.Route
setAlertStatus setAlertStatusFn
@ -124,15 +123,13 @@ func NewAPI(
}
// Update sets the API struct members that may change between reloads of alertmanager.
func (api *API) Update(cfg *config.Config, resolveTimeout time.Duration, setAlertStatus setAlertStatusFn) error {
func (api *API) Update(cfg *config.Config, setAlertStatus setAlertStatusFn) {
api.mtx.Lock()
defer api.mtx.Unlock()
api.resolveTimeout = resolveTimeout
api.alertmanagerConfig = cfg
api.route = dispatch.NewRoute(cfg.Route, nil)
api.setAlertStatus = setAlertStatus
return nil
}
func (api *API) getStatusHandler(params general_ops.GetStatusParams) middleware.Responder {
@ -337,7 +334,7 @@ func (api *API) postAlertsHandler(params alert_ops.PostAlertsParams) middleware.
now := time.Now()
api.mtx.RLock()
resolveTimeout := api.resolveTimeout
resolveTimeout := time.Duration(api.alertmanagerConfig.Global.ResolveTimeout)
api.mtx.RUnlock()
for _, alert := range alerts {

View File

@ -15,8 +15,6 @@ package main
import (
"context"
"crypto/md5"
"encoding/binary"
"fmt"
"net"
"net/http"
@ -56,18 +54,6 @@ import (
)
var (
configHash = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "alertmanager_config_hash",
Help: "Hash of the currently loaded alertmanager configuration.",
})
configSuccess = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "alertmanager_config_last_reload_successful",
Help: "Whether the last configuration reload attempt was successful.",
})
configSuccessTime = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "alertmanager_config_last_reload_success_timestamp_seconds",
Help: "Timestamp of the last successful configuration reload.",
})
requestDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "alertmanager_http_request_duration_seconds",
@ -88,9 +74,6 @@ var (
)
func init() {
prometheus.MustRegister(configSuccess)
prometheus.MustRegister(configSuccessTime)
prometheus.MustRegister(configHash)
prometheus.MustRegister(requestDuration)
prometheus.MustRegister(responseSize)
prometheus.MustRegister(version.NewCollector("alertmanager"))
@ -263,14 +246,6 @@ func run() int {
}
defer alerts.Close()
var (
inhibitor *inhibit.Inhibitor
tmpl *template.Template
pipeline notify.Stage
disp *dispatch.Dispatcher
)
defer disp.Stop()
api, err := api.New(api.Options{
Alerts: alerts,
Silences: silences,
@ -304,30 +279,24 @@ func run() int {
return d + waitFunc()
}
var hash float64
reload := func() (err error) {
level.Info(logger).Log("msg", "Loading configuration file", "file", *configFile)
defer func() {
if err != nil {
level.Error(logger).Log("msg", "Loading configuration file failed", "file", *configFile, "err", err)
configSuccess.Set(0)
} else {
configSuccess.Set(1)
configSuccessTime.Set(float64(time.Now().Unix()))
configHash.Set(hash)
}
}()
var (
inhibitor *inhibit.Inhibitor
tmpl *template.Template
pipeline notify.Stage
disp *dispatch.Dispatcher
)
conf, plainCfg, err := config.LoadFile(*configFile)
if err != nil {
return err
}
hash = md5HashAsMetricValue(plainCfg)
defer disp.Stop()
configCoordinator := config.NewCoordinator(
*configFile,
prometheus.DefaultRegisterer,
log.With(logger, "component", "configuration"),
)
configCoordinator.Subscribe(func(conf *config.Config) error {
tmpl, err = template.FromGlobs(conf.Templates...)
if err != nil {
return err
return fmt.Errorf("failed to parse templates: %v", err.Error())
}
tmpl.ExternalURL = amURL
@ -347,10 +316,7 @@ func run() int {
logger,
)
err = api.Update(conf, time.Duration(conf.Global.ResolveTimeout), setAlertStatus(inhibitor, marker, silences))
if err != nil {
return err
}
api.Update(conf, setAlertStatus(inhibitor, marker, silences))
disp = dispatch.NewDispatcher(alerts, dispatch.NewRoute(conf.Route, nil), pipeline, marker, timeoutFunc, logger)
@ -358,9 +324,9 @@ func run() int {
go inhibitor.Run()
return nil
}
})
if err := reload(); err != nil {
if err := configCoordinator.Reload(); err != nil {
return 1
}
@ -413,9 +379,9 @@ func run() int {
select {
case <-hup:
// ignore error, already logged in `configCoordinator.Reload()`
_ = reload()
_ = configCoordinator.Reload()
case errc := <-webReload:
errc <- reload()
errc <- configCoordinator.Reload()
}
}
}()
@ -470,15 +436,6 @@ func extURL(listen, external string) (*url.URL, error) {
return u, nil
}
func md5HashAsMetricValue(data []byte) float64 {
sum := md5.Sum(data)
// We only want 48 bits as a float64 only has a 53 bit mantissa.
smallSum := sum[0:6]
var bytes = make([]byte, 8)
copy(bytes, smallSum)
return float64(binary.LittleEndian.Uint64(bytes))
}
func setAlertStatus(inhibitor *inhibit.Inhibitor, marker types.Marker, silences *silence.Silences) func(model.LabelSet) error {
return func(labels model.LabelSet) error {
inhibitor.Mutes(labels)

157
config/coordinator.go Normal file
View File

@ -0,0 +1,157 @@
// Copyright 2019 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package config
import (
"crypto/md5"
"encoding/binary"
"sync"
"time"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
)
// Coordinator coordinates Alertmanager configurations beyond the lifetime of a
// single configuration.
//
// It remembers where the configuration file lives, keeps the most recently
// loaded configuration, passes it to the registered subscribers and exposes
// the reload state through Prometheus gauges.
type Coordinator struct {
// configFilePath is the path of the configuration file that is read on
// every load.
configFilePath string
// logger receives the coordinator's log output.
logger log.Logger
// Protects config and subscribers
mutex sync.Mutex
// config is the configuration stored by the last successful file load.
config *Config
// subscribers are the callbacks that are handed config when subscribers
// are notified, in the order they were added.
subscribers []func(*Config) error
// configHashMetric is the gauge exported as alertmanager_config_hash.
configHashMetric prometheus.Gauge
// configSuccessMetric is the gauge exported as
// alertmanager_config_last_reload_successful.
configSuccessMetric prometheus.Gauge
// configSuccessTimeMetric is the gauge exported as
// alertmanager_config_last_reload_success_timestamp_seconds.
configSuccessTimeMetric prometheus.Gauge
}
// NewCoordinator builds a Coordinator for the configuration file at
// configFilePath. Its metrics are registered on r and it logs to l.
//
// The configuration file is not read here; call Reload to load it.
func NewCoordinator(configFilePath string, r prometheus.Registerer, l log.Logger) *Coordinator {
	coordinator := new(Coordinator)
	coordinator.configFilePath = configFilePath
	coordinator.logger = l

	coordinator.registerMetrics(r)

	return coordinator
}
// registerMetrics creates the three configuration gauges (hash, last reload
// successful, last successful reload timestamp), registers them on r via
// MustRegister and then stores them on the coordinator.
func (c *Coordinator) registerMetrics(r prometheus.Registerer) {
	newGauge := func(name, help string) prometheus.Gauge {
		return prometheus.NewGauge(prometheus.GaugeOpts{Name: name, Help: help})
	}

	var (
		hashGauge = newGauge(
			"alertmanager_config_hash",
			"Hash of the currently loaded alertmanager configuration.",
		)
		successGauge = newGauge(
			"alertmanager_config_last_reload_successful",
			"Whether the last configuration reload attempt was successful.",
		)
		successTimeGauge = newGauge(
			"alertmanager_config_last_reload_success_timestamp_seconds",
			"Timestamp of the last successful configuration reload.",
		)
	)

	// Register first; the fields are only populated once registration went
	// through.
	r.MustRegister(hashGauge, successGauge, successTimeGauge)

	c.configHashMetric = hashGauge
	c.configSuccessMetric = successGauge
	c.configSuccessTimeMetric = successTimeGauge
}
// Subscribe appends the given callbacks to the list of subscribers that are
// notified about configuration changes. The list is modified while holding
// the coordinator's mutex.
func (c *Coordinator) Subscribe(ss ...func(*Config) error) {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	extended := append(c.subscribers, ss...)
	c.subscribers = extended
}
// notifySubscribers calls every subscriber, in subscription order, with the
// coordinator's current configuration. It stops at the first subscriber that
// returns an error and returns that error; subscribers after it are not
// called. It returns nil when all subscribers succeed.
func (c *Coordinator) notifySubscribers() error {
	subs := c.subscribers
	for i := 0; i < len(subs); i++ {
		notify := subs[i]
		err := notify(c.config)
		if err != nil {
			return err
		}
	}

	return nil
}
// loadFromFile reads and parses the configuration file at c.configFilePath.
//
// On success the parsed configuration replaces c.config and the config hash
// gauge is updated to describe it. On failure the previous configuration is
// kept, the error is logged and the "last reload successful" gauge is set
// to 0.
//
// It does not mark the reload as successful: a reload has only succeeded once
// the subscribers have applied the configuration too, which is decided in
// Reload. It writes c.config without locking, so the caller has to hold
// c.mutex.
func (c *Coordinator) loadFromFile() error {
	level.Info(c.logger).Log(
		"msg", "Loading configuration file",
		"file", c.configFilePath,
	)

	conf, plainConfig, err := LoadFile(c.configFilePath)
	if err != nil {
		c.configSuccessMetric.Set(0)
		level.Error(c.logger).Log(
			"msg", "Loading configuration file failed",
			"file", c.configFilePath,
			"err", err,
		)
		return err
	}

	c.config = conf
	c.configHashMetric.Set(md5HashAsMetricValue(plainConfig))

	return nil
}

// Reload triggers a configuration reload from file and notifies all
// configuration change subscribers.
//
// The "last reload successful" gauge is set to 1 and the success timestamp is
// updated only after the file was loaded and every subscriber accepted the
// new configuration. If loading fails or a subscriber returns an error, the
// gauge is set to 0, the error is logged once and returned.
func (c *Coordinator) Reload() error {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	// loadFromFile already logs the failure and clears the success gauge, so
	// the error is only passed on here.
	if err := c.loadFromFile(); err != nil {
		return err
	}

	if err := c.notifySubscribers(); err != nil {
		// The file parsed fine, but the configuration could not be applied:
		// the reload as a whole failed.
		c.configSuccessMetric.Set(0)
		level.Error(c.logger).Log(
			"msg", "one or more config change subscribers failed to apply new config",
			"file", c.configFilePath,
			"err", err,
		)
		return err
	}

	c.configSuccessMetric.Set(1)
	c.configSuccessTimeMetric.Set(float64(time.Now().Unix()))

	return nil
}
// md5HashAsMetricValue turns data into a number usable as a gauge value: it
// takes the first six bytes of the MD5 digest of data, interprets them as a
// little-endian unsigned integer and returns that integer as a float64.
func md5HashAsMetricValue(data []byte) float64 {
	digest := md5.Sum(data)

	// Only 48 bits are used: a float64 has a 53 bit mantissa, so the value is
	// represented exactly. The two high bytes of the buffer stay zero.
	var padded [8]byte
	copy(padded[:], digest[:6])

	return float64(binary.LittleEndian.Uint64(padded[:]))
}

View File

@ -0,0 +1,83 @@
// Copyright 2019 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package config
import (
"errors"
"testing"
"github.com/go-kit/kit/log"
"github.com/prometheus/client_golang/prometheus"
)
// fakeRegisterer is a test double for prometheus.Registerer that records the
// collectors handed to it instead of registering them anywhere.
type fakeRegisterer struct {
	// registeredCollectors holds every collector passed to Register or
	// MustRegister, in call order.
	registeredCollectors []prometheus.Collector
}

// Compile-time check that the fake can be used wherever a
// prometheus.Registerer is expected.
var _ prometheus.Registerer = (*fakeRegisterer)(nil)

// Register records c and always reports success, so that tests see the
// collector regardless of whether the code under test uses Register or
// MustRegister.
func (r *fakeRegisterer) Register(c prometheus.Collector) error {
	r.registeredCollectors = append(r.registeredCollectors, c)
	return nil
}

// MustRegister records all given collectors.
func (r *fakeRegisterer) MustRegister(c ...prometheus.Collector) {
	r.registeredCollectors = append(r.registeredCollectors, c...)
}

// Unregister does nothing and always returns false; recorded collectors are
// kept.
func (r *fakeRegisterer) Unregister(prometheus.Collector) bool {
	return false
}
// TestCoordinatorRegistersMetrics checks that constructing a coordinator
// registers at least one collector on the registerer it is given.
func TestCoordinatorRegistersMetrics(t *testing.T) {
	registerer := &fakeRegisterer{}

	NewCoordinator("testdata/conf.good.yml", registerer, log.NewNopLogger())

	if registered := len(registerer.registeredCollectors); registered == 0 {
		t.Error("expected NewCoordinator to register metrics on the given registerer")
	}
}
func TestCoordinatorNotifiesSubscribers(t *testing.T) {
callBackCalled := false
c := NewCoordinator("testdata/conf.good.yml", prometheus.NewRegistry(), log.NewNopLogger())
c.Subscribe(func(*Config) error {
callBackCalled = true
return nil
})
err := c.Reload()
if err != nil {
t.Fatal(err)
}
if !callBackCalled {
t.Fatal("expected coordinator.Reload() to call subscribers")
}
}
// TestCoordinatorFailReloadWhenSubscriberFails checks that Reload returns the
// error of a failing subscriber, with its message unchanged.
func TestCoordinatorFailReloadWhenSubscriberFails(t *testing.T) {
	const errMessage = "something happened"

	coordinator := NewCoordinator("testdata/conf.good.yml", prometheus.NewRegistry(), log.NewNopLogger())
	coordinator.Subscribe(func(*Config) error {
		return errors.New(errMessage)
	})

	err := coordinator.Reload()
	if err == nil {
		t.Fatal("expected reload to throw an error")
	}

	if got := err.Error(); got != errMessage {
		t.Fatalf("expected error message %q but got %q", errMessage, got)
	}
}