Convert Alertmanager to use non-global go-kit loggers

Fixes https://github.com/prometheus/alertmanager/issues/1040
This commit is contained in:
Julius Volz 2017-10-21 22:59:33 -07:00
parent 4369eb3244
commit 947970af44
11 changed files with 288 additions and 222 deletions

View File

@ -22,8 +22,9 @@ import (
"sync"
"time"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"github.com/prometheus/common/route"
"github.com/prometheus/common/version"
@ -81,6 +82,7 @@ type API struct {
resolveTimeout time.Duration
uptime time.Time
mrouter *mesh.Router
logger log.Logger
groups groupsFn
getAlertStatus getAlertStatusFn
@ -92,7 +94,7 @@ type groupsFn func([]*labels.Matcher) dispatch.AlertOverview
type getAlertStatusFn func(model.Fingerprint) types.AlertStatus
// New returns a new API.
func New(alerts provider.Alerts, silences *silence.Silences, gf groupsFn, sf getAlertStatusFn, router *mesh.Router) *API {
func New(alerts provider.Alerts, silences *silence.Silences, gf groupsFn, sf getAlertStatusFn, router *mesh.Router, l log.Logger) *API {
return &API{
alerts: alerts,
silences: silences,
@ -100,6 +102,7 @@ func New(alerts provider.Alerts, silences *silence.Silences, gf groupsFn, sf get
getAlertStatus: sf,
uptime: time.Now(),
mrouter: router,
logger: l,
}
}
@ -171,7 +174,7 @@ func (api *API) receivers(w http.ResponseWriter, req *http.Request) {
receivers = append(receivers, r.Name)
}
respond(w, receivers)
api.respond(w, receivers)
}
func (api *API) status(w http.ResponseWriter, req *http.Request) {
@ -200,7 +203,7 @@ func (api *API) status(w http.ResponseWriter, req *http.Request) {
api.mtx.RUnlock()
respond(w, status)
api.respond(w, status)
}
type meshStatus struct {
@ -245,7 +248,7 @@ func (api *API) alertGroups(w http.ResponseWriter, r *http.Request) {
if filter := r.FormValue("filter"); filter != "" {
matchers, err = parse.Matchers(filter)
if err != nil {
respondError(w, apiError{
api.respondError(w, apiError{
typ: errorBadData,
err: err,
}, nil)
@ -255,7 +258,7 @@ func (api *API) alertGroups(w http.ResponseWriter, r *http.Request) {
groups := api.groups(matchers)
respond(w, groups)
api.respond(w, groups)
}
func (api *API) listAlerts(w http.ResponseWriter, r *http.Request) {
@ -273,7 +276,7 @@ func (api *API) listAlerts(w http.ResponseWriter, r *http.Request) {
if filter := r.FormValue("filter"); filter != "" {
matchers, err = parse.Matchers(filter)
if err != nil {
respondError(w, apiError{
api.respondError(w, apiError{
typ: errorBadData,
err: err,
}, nil)
@ -285,7 +288,7 @@ func (api *API) listAlerts(w http.ResponseWriter, r *http.Request) {
if silencedParam == "false" {
showSilenced = false
} else if silencedParam != "true" {
respondError(w, apiError{
api.respondError(w, apiError{
typ: errorBadData,
err: fmt.Errorf(
"parameter 'silenced' can either be 'true' or 'false', not '%v'",
@ -300,7 +303,7 @@ func (api *API) listAlerts(w http.ResponseWriter, r *http.Request) {
if inhibitedParam == "false" {
showInhibited = false
} else if inhibitedParam != "true" {
respondError(w, apiError{
api.respondError(w, apiError{
typ: errorBadData,
err: fmt.Errorf(
"parameter 'inhibited' can either be 'true' or 'false', not '%v'",
@ -314,7 +317,7 @@ func (api *API) listAlerts(w http.ResponseWriter, r *http.Request) {
if receiverParam := r.FormValue("receiver"); receiverParam != "" {
re, err = regexp.Compile("^(?:" + receiverParam + ")$")
if err != nil {
respondError(w, apiError{
api.respondError(w, apiError{
typ: errorBadData,
err: fmt.Errorf(
"failed to parse receiver param: %s",
@ -374,7 +377,7 @@ func (api *API) listAlerts(w http.ResponseWriter, r *http.Request) {
}
if err != nil {
respondError(w, apiError{
api.respondError(w, apiError{
typ: errorInternal,
err: err,
}, nil)
@ -383,7 +386,7 @@ func (api *API) listAlerts(w http.ResponseWriter, r *http.Request) {
sort.Slice(res, func(i, j int) bool {
return res[i].Fingerprint < res[j].Fingerprint
})
respond(w, res)
api.respond(w, res)
}
func regexpAny(re *regexp.Regexp, ss []string) bool {
@ -414,7 +417,7 @@ func (api *API) legacyAddAlerts(w http.ResponseWriter, r *http.Request) {
Labels model.LabelSet `json:"labels"`
Payload model.LabelSet `json:"payload"`
}{}
if err := receive(r, &legacyAlerts); err != nil {
if err := api.receive(r, &legacyAlerts); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
@ -442,8 +445,8 @@ func (api *API) legacyAddAlerts(w http.ResponseWriter, r *http.Request) {
func (api *API) addAlerts(w http.ResponseWriter, r *http.Request) {
var alerts []*types.Alert
if err := receive(r, &alerts); err != nil {
respondError(w, apiError{
if err := api.receive(r, &alerts); err != nil {
api.respondError(w, apiError{
typ: errorBadData,
err: err,
}, nil)
@ -489,7 +492,7 @@ func (api *API) insertAlerts(w http.ResponseWriter, r *http.Request, alerts ...*
validAlerts = append(validAlerts, a)
}
if err := api.alerts.Put(validAlerts...); err != nil {
respondError(w, apiError{
api.respondError(w, apiError{
typ: errorInternal,
err: err,
}, nil)
@ -497,20 +500,20 @@ func (api *API) insertAlerts(w http.ResponseWriter, r *http.Request, alerts ...*
}
if validationErrs.Len() > 0 {
respondError(w, apiError{
api.respondError(w, apiError{
typ: errorBadData,
err: validationErrs,
}, nil)
return
}
respond(w, nil)
api.respond(w, nil)
}
func (api *API) setSilence(w http.ResponseWriter, r *http.Request) {
var sil types.Silence
if err := receive(r, &sil); err != nil {
respondError(w, apiError{
if err := api.receive(r, &sil); err != nil {
api.respondError(w, apiError{
typ: errorBadData,
err: err,
}, nil)
@ -518,7 +521,7 @@ func (api *API) setSilence(w http.ResponseWriter, r *http.Request) {
}
psil, err := silenceToProto(&sil)
if err != nil {
respondError(w, apiError{
api.respondError(w, apiError{
typ: errorBadData,
err: err,
}, nil)
@ -527,14 +530,14 @@ func (api *API) setSilence(w http.ResponseWriter, r *http.Request) {
sid, err := api.silences.Set(psil)
if err != nil {
respondError(w, apiError{
api.respondError(w, apiError{
typ: errorBadData,
err: err,
}, nil)
return
}
respond(w, struct {
api.respond(w, struct {
SilenceID string `json:"silenceId"`
}{
SilenceID: sid,
@ -551,33 +554,33 @@ func (api *API) getSilence(w http.ResponseWriter, r *http.Request) {
}
sil, err := silenceFromProto(sils[0])
if err != nil {
respondError(w, apiError{
api.respondError(w, apiError{
typ: errorInternal,
err: err,
}, nil)
return
}
respond(w, sil)
api.respond(w, sil)
}
func (api *API) delSilence(w http.ResponseWriter, r *http.Request) {
sid := route.Param(r.Context(), "sid")
if err := api.silences.Expire(sid); err != nil {
respondError(w, apiError{
api.respondError(w, apiError{
typ: errorBadData,
err: err,
}, nil)
return
}
respond(w, nil)
api.respond(w, nil)
}
func (api *API) listSilences(w http.ResponseWriter, r *http.Request) {
psils, err := api.silences.Query()
if err != nil {
respondError(w, apiError{
api.respondError(w, apiError{
typ: errorInternal,
err: err,
}, nil)
@ -588,7 +591,7 @@ func (api *API) listSilences(w http.ResponseWriter, r *http.Request) {
if filter := r.FormValue("filter"); filter != "" {
matchers, err = parse.Matchers(filter)
if err != nil {
respondError(w, apiError{
api.respondError(w, apiError{
typ: errorBadData,
err: err,
}, nil)
@ -600,7 +603,7 @@ func (api *API) listSilences(w http.ResponseWriter, r *http.Request) {
for _, ps := range psils {
s, err := silenceFromProto(ps)
if err != nil {
respondError(w, apiError{
api.respondError(w, apiError{
typ: errorInternal,
err: err,
}, nil)
@ -640,7 +643,7 @@ func (api *API) listSilences(w http.ResponseWriter, r *http.Request) {
silences = append(silences, pending...)
silences = append(silences, expired...)
respond(w, silences)
api.respond(w, silences)
}
func matchesFilterLabels(s *types.Silence, matchers []*labels.Matcher) bool {
@ -724,7 +727,7 @@ type response struct {
Error string `json:"error,omitempty"`
}
func respond(w http.ResponseWriter, data interface{}) {
func (api *API) respond(w http.ResponseWriter, data interface{}) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(200)
@ -733,13 +736,13 @@ func respond(w http.ResponseWriter, data interface{}) {
Data: data,
})
if err != nil {
log.Errorf("errorr: %v", err)
level.Error(api.logger).Log("msg", "Error marshalling JSON", "err", err)
return
}
w.Write(b)
}
func respondError(w http.ResponseWriter, apiErr apiError, data interface{}) {
func (api *API) respondError(w http.ResponseWriter, apiErr apiError, data interface{}) {
w.Header().Set("Content-Type", "application/json")
switch apiErr.typ {
@ -760,18 +763,18 @@ func respondError(w http.ResponseWriter, apiErr apiError, data interface{}) {
if err != nil {
return
}
log.Errorf("api error: %v", apiErr.Error())
level.Error(api.logger).Log("msg", "API error", "err", apiErr.Error())
w.Write(b)
}
func receive(r *http.Request, v interface{}) error {
func (api *API) receive(r *http.Request, v interface{}) error {
dec := json.NewDecoder(r.Body)
defer r.Body.Close()
err := dec.Decode(v)
if err != nil {
log.Debugf("Decoding request failed: %v", err)
level.Debug(api.logger).Log("msg", "Decoding request failed", "err", err)
}
return err
}

View File

@ -31,6 +31,8 @@ import (
"syscall"
"time"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/alertmanager/api"
"github.com/prometheus/alertmanager/config"
"github.com/prometheus/alertmanager/dispatch"
@ -43,7 +45,7 @@ import (
"github.com/prometheus/alertmanager/types"
"github.com/prometheus/alertmanager/ui"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log"
"github.com/prometheus/common/promlog"
"github.com/prometheus/common/route"
"github.com/prometheus/common/version"
"github.com/prometheus/prometheus/pkg/labels"
@ -101,7 +103,6 @@ func newMarkerMetrics(marker types.Marker) {
}
func main() {
peers := &stringset{}
var (
showVersion = flag.Bool("version", false, "Print version information.")
@ -113,20 +114,31 @@ func main() {
routePrefix = flag.String("web.route-prefix", "", "Prefix for the internal routes of web endpoints. Defaults to path of -web.external-url.")
listenAddress = flag.String("web.listen-address", ":9093", "Address to listen on for the web interface and API.")
meshListen = flag.String("mesh.listen-address", net.JoinHostPort("0.0.0.0", strconv.Itoa(mesh.Port)), "mesh listen address. Pass an empty string to disable.")
hwaddr = flag.String("mesh.peer-id", "", "mesh peer ID (default: MAC address)")
nickname = flag.String("mesh.nickname", mustHostname(), "mesh peer nickname")
password = flag.String("mesh.password", "", "password to join the peer network (empty password disables encryption)")
meshListen = flag.String("mesh.listen-address", net.JoinHostPort("0.0.0.0", strconv.Itoa(mesh.Port)), "Mesh listen address. Pass an empty string to disable.")
hwaddr = flag.String("mesh.peer-id", "", "Mesh peer ID (default: MAC address).")
nickname = flag.String("mesh.nickname", mustHostname(), "Mesh peer nickname.")
password = flag.String("mesh.password", "", "Password to join the peer network (empty password disables encryption).")
)
flag.Var(peers, "mesh.peer", "initial peers (may be repeated)")
peers := &stringset{}
flag.Var(peers, "mesh.peer", "Initial peers (may be repeated)")
logLevel := &promlog.AllowedLevel{}
if err := logLevel.Set("info"); err != nil {
panic(err)
}
flag.Var(logLevel, "log.level", "Only log messages with the given severity or above. One of: [debug, info, warn, error]")
flag.Parse()
logger := promlog.New(*logLevel)
if *hwaddr == "" {
*hwaddr = mustHardwareAddr()
}
if len(flag.Args()) > 0 {
log.Fatalln("Received unexpected and unparsed arguments: ", strings.Join(flag.Args(), ", "))
level.Error(logger).Log("msg", "Received unexpected and unparsed arguments", "arguments", strings.Join(flag.Args(), ", "))
os.Exit(1)
}
if *showVersion {
@ -134,19 +146,21 @@ func main() {
os.Exit(0)
}
log.Infoln("Starting alertmanager", version.Info())
log.Infoln("Build context", version.BuildContext())
level.Info(logger).Log("msg", "Starting Alertmanager", "version", version.Info())
level.Info(logger).Log("build_context", version.BuildContext())
err := os.MkdirAll(*dataDir, 0777)
if err != nil {
log.Fatal(err)
level.Error(logger).Log("msg", "Unable to create data directory", "err", err)
os.Exit(1)
}
var mrouter *mesh.Router
if *meshListen != "" {
mrouter, err = initMesh(*meshListen, *hwaddr, *nickname, *password, log.With("component", "mesh"))
mrouter, err = initMesh(*meshListen, *hwaddr, *nickname, *password, log.With(logger, "component", "mesh"))
if err != nil {
log.Fatal(err)
level.Error(logger).Log("msg", "Unable to initialize gossip mesh", "err", err)
os.Exit(1)
}
}
@ -159,20 +173,22 @@ func main() {
nflog.WithSnapshot(filepath.Join(*dataDir, "nflog")),
nflog.WithMaintenance(15*time.Minute, stopc, wg.Done),
nflog.WithMetrics(prometheus.DefaultRegisterer),
nflog.WithLogger(log.Base().With("component", "nflog")),
nflog.WithLogger(log.With(logger, "component", "nflog")),
}
if *meshListen != "" {
notificationLogOpts = append(notificationLogOpts, nflog.WithMesh(func(g mesh.Gossiper) mesh.Gossip {
res, err := mrouter.NewGossip("nflog", g)
if err != nil {
log.Fatal(err)
level.Error(logger).Log("err", err)
os.Exit(1)
}
return res
}))
}
notificationLog, err := nflog.New(notificationLogOpts...)
if err != nil {
log.Fatal(err)
level.Error(logger).Log("err", err)
os.Exit(1)
}
marker := types.NewMarker()
@ -181,14 +197,15 @@ func main() {
silenceOpts := silence.Options{
SnapshotFile: filepath.Join(*dataDir, "silences"),
Retention: *retention,
Logger: log.Base().With("component", "silences"),
Logger: log.With(logger, "component", "silences"),
Metrics: prometheus.DefaultRegisterer,
}
if *meshListen != "" {
silenceOpts.Gossip = func(g mesh.Gossiper) mesh.Gossip {
res, err := mrouter.NewGossip("silences", g)
if err != nil {
log.Fatal(err)
level.Error(logger).Log("err", err)
os.Exit(1)
}
return res
}
@ -196,7 +213,8 @@ func main() {
silences, err := silence.New(silenceOpts)
if err != nil {
log.Fatal(err)
level.Error(logger).Log("err", err)
os.Exit(1)
}
// Start providers before router potentially sends updates.
@ -221,7 +239,8 @@ func main() {
alerts, err := mem.NewAlerts(marker, 30*time.Minute, *dataDir)
if err != nil {
log.Fatal(err)
level.Error(logger).Log("err", err)
os.Exit(1)
}
defer alerts.Close()
@ -241,11 +260,13 @@ func main() {
},
marker.Status,
mrouter,
logger,
)
amURL, err := extURL(*listenAddress, *externalURL)
if err != nil {
log.Fatal(err)
level.Error(logger).Log("err", err)
os.Exit(1)
}
waitFunc := func() time.Duration { return 0 }
@ -261,10 +282,10 @@ func main() {
var hash float64
reload := func() (err error) {
log.With("file", *configFile).Infof("Loading configuration file")
level.Info(logger).Log("msg", "Loading configuration file", "file", *configFile)
defer func() {
if err != nil {
log.With("file", *configFile).Errorf("Loading configuration file failed: %s", err)
level.Error(logger).Log("msg", "Loading configuration file failed", "file", *configFile, "err", err)
configSuccess.Set(0)
} else {
configSuccess.Set(1)
@ -294,7 +315,7 @@ func main() {
inhibitor.Stop()
disp.Stop()
inhibitor = inhibit.NewInhibitor(alerts, conf.InhibitRules, marker)
inhibitor = inhibit.NewInhibitor(alerts, conf.InhibitRules, marker, logger)
pipeline = notify.BuildPipeline(
conf.Receivers,
tmpl,
@ -303,8 +324,9 @@ func main() {
silences,
notificationLog,
marker,
logger,
)
disp = dispatch.NewDispatcher(alerts, dispatch.NewRoute(conf.Route, nil), pipeline, marker, timeoutFunc)
disp = dispatch.NewDispatcher(alerts, dispatch.NewRoute(conf.Route, nil), pipeline, marker, timeoutFunc, logger)
go disp.Run()
go inhibitor.Run()
@ -331,12 +353,12 @@ func main() {
webReload := make(chan struct{})
ui.Register(router, webReload)
ui.Register(router, webReload, logger)
apiv.Register(router.WithPrefix("/api"))
log.Infoln("Listening on", *listenAddress)
go listen(*listenAddress, router)
level.Info(logger).Log("msg", "Listening", "address", *listenAddress)
go listen(*listenAddress, router, logger)
var (
hup = make(chan os.Signal)
@ -362,7 +384,7 @@ func main() {
<-term
log.Infoln("Received SIGTERM, exiting gracefully...")
level.Info(logger).Log("msg", "Received SIGTERM, exiting gracefully...")
}
type peerDescSlice []mesh.PeerDescription
@ -397,16 +419,19 @@ func initMesh(addr, hwaddr, nickname, pw string, logger log.Logger) (*mesh.Route
host, portStr, err := net.SplitHostPort(addr)
if err != nil {
log.Fatalf("mesh address: %s: %v", addr, err)
level.Error(logger).Log("msg", "Invalid mesh address", "address", addr, "err", err)
os.Exit(1)
}
port, err := strconv.Atoi(portStr)
if err != nil {
log.Fatalf("mesh address: %s: %v", addr, err)
level.Error(logger).Log("msg", "Invalid mesh address", "address", addr, "err", err)
os.Exit(1)
}
name, err := mesh.PeerNameFromString(hwaddr)
if err != nil {
log.Fatalf("invalid hardware address %q: %v", hwaddr, err)
level.Error(logger).Log("msg", "Invalid hardware address", "address", hwaddr, "err", err)
os.Exit(1)
}
password := []byte(pw)
@ -432,7 +457,7 @@ type printfLogger struct {
}
func (l printfLogger) Printf(f string, args ...interface{}) {
l.Debugf(f, args...)
level.Debug(l).Log(fmt.Sprintf(f, args...))
}
func extURL(listen, external string) (*url.URL, error) {
@ -463,9 +488,10 @@ func extURL(listen, external string) (*url.URL, error) {
return u, nil
}
func listen(listen string, router *route.Router) {
func listen(listen string, router *route.Router, logger log.Logger) {
if err := http.ListenAndServe(listen, router); err != nil {
log.Fatal(err)
level.Error(logger).Log("msg", "Listen error", "err", err)
os.Exit(1)
}
}

View File

@ -6,7 +6,8 @@ import (
"sync"
"time"
"github.com/prometheus/common/log"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"golang.org/x/net/context"
@ -33,7 +34,7 @@ type Dispatcher struct {
ctx context.Context
cancel func()
log log.Logger
logger log.Logger
}
// NewDispatcher returns a new Dispatcher.
@ -43,6 +44,7 @@ func NewDispatcher(
s notify.Stage,
mk types.Marker,
to func(time.Duration) time.Duration,
l log.Logger,
) *Dispatcher {
disp := &Dispatcher{
alerts: ap,
@ -50,7 +52,7 @@ func NewDispatcher(
route: r,
marker: mk,
timeout: to,
log: log.With("component", "dispatcher"),
logger: log.With(l, "component", "dispatcher"),
}
return disp
}
@ -178,16 +180,16 @@ func (d *Dispatcher) run(it provider.AlertIterator) {
if !ok {
// Iterator exhausted for some reason.
if err := it.Err(); err != nil {
log.Errorf("Error on alert update: %s", err)
level.Error(d.logger).Log("msg", "Error on alert update", "err", err)
}
return
}
d.log.With("alert", alert).Debug("Received alert")
level.Debug(d.logger).Log("msg", "Received alert", "alert", alert)
// Log errors but keep trying.
if err := it.Err(); err != nil {
log.Errorf("Error on alert update: %s", err)
level.Error(d.logger).Log("msg", "Error on alert update", "err", err)
continue
}
@ -255,13 +257,13 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
// If the group does not exist, create it.
ag, ok := groups[fp]
if !ok {
ag = newAggrGroup(d.ctx, group, route, d.timeout)
ag = newAggrGroup(d.ctx, group, route, d.timeout, d.logger)
groups[fp] = ag
go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool {
_, _, err := d.stage.Exec(ctx, alerts...)
_, _, err := d.stage.Exec(ctx, d.logger, alerts...)
if err != nil {
log.Errorf("Notify for %d alerts failed: %s", len(alerts), err)
level.Error(d.logger).Log("msg", "Notify for alerts failed", "num_alerts", len(alerts), "err", err)
}
return err == nil
})
@ -276,7 +278,7 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
type aggrGroup struct {
labels model.LabelSet
opts *RouteOpts
log log.Logger
logger log.Logger
routeKey string
ctx context.Context
@ -291,7 +293,7 @@ type aggrGroup struct {
}
// newAggrGroup returns a new aggregation group.
func newAggrGroup(ctx context.Context, labels model.LabelSet, r *Route, to func(time.Duration) time.Duration) *aggrGroup {
func newAggrGroup(ctx context.Context, labels model.LabelSet, r *Route, to func(time.Duration) time.Duration, logger log.Logger) *aggrGroup {
if to == nil {
to = func(d time.Duration) time.Duration { return d }
}
@ -304,7 +306,7 @@ func newAggrGroup(ctx context.Context, labels model.LabelSet, r *Route, to func(
}
ag.ctx, ag.cancel = context.WithCancel(ctx)
ag.log = log.With("aggrGroup", ag)
ag.logger = log.With(logger, "aggrGroup", ag)
// Set an initial one-time wait before flushing
// the first batch of notifications.
@ -425,7 +427,7 @@ func (ag *aggrGroup) flush(notify func(...*types.Alert) bool) {
ag.mtx.Unlock()
ag.log.Debugln("flushing", alertsSlice)
level.Debug(ag.logger).Log("msg", "Flushing", "alerts", fmt.Sprintf("%v", alertsSlice))
if notify(alertsSlice...) {
ag.mtx.Lock()

View File

@ -7,6 +7,7 @@ import (
"testing"
"time"
"github.com/go-kit/kit/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"golang.org/x/net/context"
@ -180,7 +181,7 @@ func TestAggrGroup(t *testing.T) {
}
// Test regular situation where we wait for group_wait to send out alerts.
ag := newAggrGroup(context.Background(), lset, route, nil)
ag := newAggrGroup(context.Background(), lset, route, nil, log.NewNopLogger())
go ag.run(ntfy)
ag.insert(a1)
@ -228,7 +229,7 @@ func TestAggrGroup(t *testing.T) {
// immediate flushing.
// Finally, set all alerts to be resolved. After successful notify the aggregation group
// should empty itself.
ag = newAggrGroup(context.Background(), lset, route, nil)
ag = newAggrGroup(context.Background(), lset, route, nil, log.NewNopLogger())
go ag.run(ntfy)
ag.insert(a1)

View File

@ -19,8 +19,9 @@ import (
"sync"
"time"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/oklog/oklog/pkg/group"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"github.com/prometheus/alertmanager/config"
@ -34,16 +35,18 @@ type Inhibitor struct {
alerts provider.Alerts
rules []*InhibitRule
marker types.Marker
logger log.Logger
mtx sync.RWMutex
cancel func()
}
// NewInhibitor returns a new Inhibitor.
func NewInhibitor(ap provider.Alerts, rs []*config.InhibitRule, mk types.Marker) *Inhibitor {
func NewInhibitor(ap provider.Alerts, rs []*config.InhibitRule, mk types.Marker, logger log.Logger) *Inhibitor {
ih := &Inhibitor{
alerts: ap,
marker: mk,
logger: logger,
}
for _, cr := range rs {
r := NewInhibitRule(cr)
@ -75,7 +78,7 @@ func (ih *Inhibitor) run(ctx context.Context) {
return
case a := <-it.Next():
if err := it.Err(); err != nil {
log.Errorf("Error iterating alerts: %s", err)
level.Error(ih.logger).Log("msg", "Error iterating alerts", "err", err)
continue
}
if a.Resolved() {

View File

@ -27,10 +27,11 @@ import (
"sync"
"time"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/matttproud/golang_protobuf_extensions/pbutil"
pb "github.com/prometheus/alertmanager/nflog/nflogpb"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log"
"github.com/weaveworks/mesh"
)
@ -283,8 +284,8 @@ func (l *nlog) run() {
f := func() error {
start := l.now()
l.logger.Info("running maintenance")
defer l.logger.With("duration", l.now().Sub(start)).Info("maintenance done")
level.Info(l.logger).Log("msg", "Running maintenance")
defer level.Info(l.logger).Log("msg", "Maintenance done", "duration", l.now().Sub(start))
if _, err := l.GC(); err != nil {
return err
@ -310,7 +311,7 @@ Loop:
break Loop
case <-t.C:
if err := f(); err != nil {
l.logger.With("err", err).Error("running maintenance failed")
level.Error(l.logger).Log("msg", "Running maintenance failed", "err", err)
}
}
}
@ -319,7 +320,7 @@ Loop:
return
}
if err := f(); err != nil {
l.logger.With("err", err).Error("creating shutdown snapshot failed")
level.Error(l.logger).Log("msg", "Creating shutdown snapshot failed", "err", err)
}
}

View File

@ -27,11 +27,13 @@ import (
"net/http"
"net/mail"
"net/smtp"
"net/textproto"
"net/url"
"strings"
"time"
"github.com/prometheus/common/log"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/common/model"
"github.com/prometheus/common/version"
"golang.org/x/net/context"
@ -40,7 +42,6 @@ import (
"github.com/prometheus/alertmanager/config"
"github.com/prometheus/alertmanager/template"
"github.com/prometheus/alertmanager/types"
"net/textproto"
)
type notifierConfig interface {
@ -87,7 +88,7 @@ func (i *Integration) Notify(ctx context.Context, alerts ...*types.Alert) (bool,
// BuildReceiverIntegrations builds a list of integration notifiers off of a
// receivers config.
func BuildReceiverIntegrations(nc *config.Receiver, tmpl *template.Template) []Integration {
func BuildReceiverIntegrations(nc *config.Receiver, tmpl *template.Template, logger log.Logger) []Integration {
var (
integrations []Integration
add = func(name string, i int, n Notifier, nc notifierConfig) {
@ -101,35 +102,35 @@ func BuildReceiverIntegrations(nc *config.Receiver, tmpl *template.Template) []I
)
for i, c := range nc.WebhookConfigs {
n := NewWebhook(c, tmpl)
n := NewWebhook(c, tmpl, logger)
add("webhook", i, n, c)
}
for i, c := range nc.EmailConfigs {
n := NewEmail(c, tmpl)
n := NewEmail(c, tmpl, logger)
add("email", i, n, c)
}
for i, c := range nc.PagerdutyConfigs {
n := NewPagerDuty(c, tmpl)
n := NewPagerDuty(c, tmpl, logger)
add("pagerduty", i, n, c)
}
for i, c := range nc.OpsGenieConfigs {
n := NewOpsGenie(c, tmpl)
n := NewOpsGenie(c, tmpl, logger)
add("opsgenie", i, n, c)
}
for i, c := range nc.SlackConfigs {
n := NewSlack(c, tmpl)
n := NewSlack(c, tmpl, logger)
add("slack", i, n, c)
}
for i, c := range nc.HipchatConfigs {
n := NewHipchat(c, tmpl)
n := NewHipchat(c, tmpl, logger)
add("hipchat", i, n, c)
}
for i, c := range nc.VictorOpsConfigs {
n := NewVictorOps(c, tmpl)
n := NewVictorOps(c, tmpl, logger)
add("victorops", i, n, c)
}
for i, c := range nc.PushoverConfigs {
n := NewPushover(c, tmpl)
n := NewPushover(c, tmpl, logger)
add("pushover", i, n, c)
}
return integrations
@ -142,13 +143,14 @@ var userAgentHeader = fmt.Sprintf("Alertmanager/%s", version.Version)
// Webhook implements a Notifier for generic webhooks.
type Webhook struct {
// The URL to which notifications are sent.
URL string
tmpl *template.Template
URL string
tmpl *template.Template
logger log.Logger
}
// NewWebhook returns a new Webhook.
func NewWebhook(conf *config.WebhookConfig, t *template.Template) *Webhook {
return &Webhook{URL: conf.URL, tmpl: t}
func NewWebhook(conf *config.WebhookConfig, t *template.Template, l log.Logger) *Webhook {
return &Webhook{URL: conf.URL, tmpl: t, logger: l}
}
// WebhookMessage defines the JSON object send to webhook endpoints.
@ -162,11 +164,11 @@ type WebhookMessage struct {
// Notify implements the Notifier interface.
func (w *Webhook) Notify(ctx context.Context, alerts ...*types.Alert) (bool, error) {
data := w.tmpl.Data(receiverName(ctx), groupLabels(ctx), alerts...)
data := w.tmpl.Data(receiverName(ctx, w.logger), groupLabels(ctx, w.logger), alerts...)
groupKey, ok := GroupKey(ctx)
if !ok {
log.Errorf("group key missing")
level.Error(w.logger).Log("msg", "group key missing")
}
msg := &WebhookMessage{
@ -208,12 +210,13 @@ func (w *Webhook) retry(statusCode int) (bool, error) {
// Email implements a Notifier for email notifications.
type Email struct {
conf *config.EmailConfig
tmpl *template.Template
conf *config.EmailConfig
tmpl *template.Template
logger log.Logger
}
// NewEmail returns a new Email notifier.
func NewEmail(c *config.EmailConfig, t *template.Template) *Email {
func NewEmail(c *config.EmailConfig, t *template.Template, l log.Logger) *Email {
if _, ok := c.Headers["Subject"]; !ok {
c.Headers["Subject"] = config.DefaultEmailSubject
}
@ -223,7 +226,7 @@ func NewEmail(c *config.EmailConfig, t *template.Template) *Email {
if _, ok := c.Headers["From"]; !ok {
c.Headers["From"] = c.From
}
return &Email{conf: c, tmpl: t}
return &Email{conf: c, tmpl: t, logger: l}
}
// auth resolves a string of authentication mechanisms.
@ -322,7 +325,7 @@ func (n *Email) Notify(ctx context.Context, as ...*types.Alert) (bool, error) {
}
var (
data = n.tmpl.Data(receiverName(ctx), groupLabels(ctx), as...)
data = n.tmpl.Data(receiverName(ctx, n.logger), groupLabels(ctx, n.logger), as...)
tmpl = tmplText(n.tmpl, data, &err)
from = tmpl(n.conf.From)
to = tmpl(n.conf.To)
@ -419,13 +422,14 @@ func (n *Email) Notify(ctx context.Context, as ...*types.Alert) (bool, error) {
// PagerDuty implements a Notifier for PagerDuty notifications.
type PagerDuty struct {
conf *config.PagerdutyConfig
tmpl *template.Template
conf *config.PagerdutyConfig
tmpl *template.Template
logger log.Logger
}
// NewPagerDuty returns a new PagerDuty notifier.
func NewPagerDuty(c *config.PagerdutyConfig, t *template.Template) *PagerDuty {
return &PagerDuty{conf: c, tmpl: t}
func NewPagerDuty(c *config.PagerdutyConfig, t *template.Template, l log.Logger) *PagerDuty {
return &PagerDuty{conf: c, tmpl: t, logger: l}
}
const (
@ -455,7 +459,7 @@ func (n *PagerDuty) Notify(ctx context.Context, as ...*types.Alert) (bool, error
var err error
var (
alerts = types.Alerts(as...)
data = n.tmpl.Data(receiverName(ctx), groupLabels(ctx), as...)
data = n.tmpl.Data(receiverName(ctx, n.logger), groupLabels(ctx, n.logger), as...)
tmpl = tmplText(n.tmpl, data, &err)
eventType = pagerDutyEventTrigger
)
@ -463,7 +467,7 @@ func (n *PagerDuty) Notify(ctx context.Context, as ...*types.Alert) (bool, error
eventType = pagerDutyEventResolve
}
log.With("incident", key).With("eventType", eventType).Debugln("notifying PagerDuty")
level.Debug(n.logger).Log("msg", "Notifying PagerDuty", "incident", key, "eventType", eventType)
details := make(map[string]string, len(n.conf.Details))
for k, v := range n.conf.Details {
@ -512,15 +516,17 @@ func (n *PagerDuty) retry(statusCode int) (bool, error) {
// Slack implements a Notifier for Slack notifications.
type Slack struct {
conf *config.SlackConfig
tmpl *template.Template
conf *config.SlackConfig
tmpl *template.Template
logger log.Logger
}
// NewSlack returns a new Slack notification handler.
func NewSlack(conf *config.SlackConfig, tmpl *template.Template) *Slack {
func NewSlack(c *config.SlackConfig, t *template.Template, l log.Logger) *Slack {
return &Slack{
conf: conf,
tmpl: tmpl,
conf: c,
tmpl: t,
logger: l,
}
}
@ -557,7 +563,7 @@ type slackAttachmentField struct {
func (n *Slack) Notify(ctx context.Context, as ...*types.Alert) (bool, error) {
var err error
var (
data = n.tmpl.Data(receiverName(ctx), groupLabels(ctx), as...)
data = n.tmpl.Data(receiverName(ctx, n.logger), groupLabels(ctx, n.logger), as...)
tmplText = tmplText(n.tmpl, data, &err)
)
@ -609,15 +615,17 @@ func (n *Slack) retry(statusCode int) (bool, error) {
// Hipchat implements a Notifier for Hipchat notifications.
type Hipchat struct {
conf *config.HipchatConfig
tmpl *template.Template
conf *config.HipchatConfig
tmpl *template.Template
logger log.Logger
}
// NewHipchat returns a new Hipchat notification handler.
func NewHipchat(conf *config.HipchatConfig, tmpl *template.Template) *Hipchat {
func NewHipchat(c *config.HipchatConfig, t *template.Template, l log.Logger) *Hipchat {
return &Hipchat{
conf: conf,
tmpl: tmpl,
conf: c,
tmpl: t,
logger: l,
}
}
@ -634,7 +642,7 @@ func (n *Hipchat) Notify(ctx context.Context, as ...*types.Alert) (bool, error)
var err error
var msg string
var (
data = n.tmpl.Data(receiverName(ctx), groupLabels(ctx), as...)
data = n.tmpl.Data(receiverName(ctx, n.logger), groupLabels(ctx, n.logger), as...)
tmplText = tmplText(n.tmpl, data, &err)
tmplHTML = tmplHTML(n.tmpl, data, &err)
url = fmt.Sprintf("%sv2/room/%s/notification?auth_token=%s", n.conf.APIURL, n.conf.RoomID, n.conf.AuthToken)
@ -685,13 +693,14 @@ func (n *Hipchat) retry(statusCode int) (bool, error) {
// OpsGenie implements a Notifier for OpsGenie notifications.
type OpsGenie struct {
conf *config.OpsGenieConfig
tmpl *template.Template
conf *config.OpsGenieConfig
tmpl *template.Template
logger log.Logger
}
// NewOpsGenie returns a new OpsGenie notifier.
func NewOpsGenie(c *config.OpsGenieConfig, t *template.Template) *OpsGenie {
return &OpsGenie{conf: c, tmpl: t}
func NewOpsGenie(c *config.OpsGenieConfig, t *template.Template, l log.Logger) *OpsGenie {
return &OpsGenie{conf: c, tmpl: t, logger: l}
}
type opsGenieMessage struct {
@ -726,9 +735,9 @@ func (n *OpsGenie) Notify(ctx context.Context, as ...*types.Alert) (bool, error)
if !ok {
return false, fmt.Errorf("group key missing")
}
data := n.tmpl.Data(receiverName(ctx), groupLabels(ctx), as...)
data := n.tmpl.Data(receiverName(ctx, n.logger), groupLabels(ctx, n.logger), as...)
log.With("incident", key).Debugln("notifying OpsGenie")
level.Debug(n.logger).Log("msg", "Notifying OpsGenie", "incident", key)
var err error
tmpl := tmplText(n.tmpl, data, &err)
@ -801,8 +810,14 @@ func (n *OpsGenie) Notify(ctx context.Context, as ...*types.Alert) (bool, error)
return false, fmt.Errorf("unexpected status code %v", resp.StatusCode)
} else if resp.StatusCode/100 != 2 {
body, _ := ioutil.ReadAll(resp.Body)
log.With("incident", key).Debugf("unexpected OpsGenie response from %s (POSTed %s), %s: %s",
apiURL, msg, resp.Status, body)
level.Debug(n.logger).Log(
"msg", "Unexpected OpsGenie response",
"incident", key,
"url", apiURL,
"posted_message", msg,
"status", resp.Status,
"response_body", body,
)
return false, fmt.Errorf("unexpected status code %v", resp.StatusCode)
}
return false, nil
@ -810,15 +825,17 @@ func (n *OpsGenie) Notify(ctx context.Context, as ...*types.Alert) (bool, error)
// VictorOps implements a Notifier for VictorOps notifications.
type VictorOps struct {
conf *config.VictorOpsConfig
tmpl *template.Template
conf *config.VictorOpsConfig
tmpl *template.Template
logger log.Logger
}
// NewVictorOps returns a new VictorOps notifier.
func NewVictorOps(c *config.VictorOpsConfig, t *template.Template) *VictorOps {
func NewVictorOps(c *config.VictorOpsConfig, t *template.Template, l log.Logger) *VictorOps {
return &VictorOps{
conf: c,
tmpl: t,
conf: c,
tmpl: t,
logger: l,
}
}
@ -856,7 +873,7 @@ func (n *VictorOps) Notify(ctx context.Context, as ...*types.Alert) (bool, error
var err error
var (
alerts = types.Alerts(as...)
data = n.tmpl.Data(receiverName(ctx), groupLabels(ctx), as...)
data = n.tmpl.Data(receiverName(ctx, n.logger), groupLabels(ctx, n.logger), as...)
tmpl = tmplText(n.tmpl, data, &err)
apiURL = fmt.Sprintf("%s%s/%s", n.conf.APIURL, n.conf.APIKey, n.conf.RoutingKey)
messageType = tmpl(n.conf.MessageType)
@ -913,7 +930,14 @@ func (n *VictorOps) Notify(ctx context.Context, as ...*types.Alert) (bool, error
return false, fmt.Errorf("could not parse error response %q", body)
}
log.With("incident", key).Debugf("unexpected VictorOps response from %s (POSTed %s), %s: %s", apiURL, msg, resp.Status, body)
level.Debug(n.logger).Log(
"msg", "Unexpected VictorOps response",
"incident", key,
"url", apiURL,
"posted_message", msg,
"status", resp.Status,
"response_body", body,
)
return false, fmt.Errorf("error when posting alert: result %q, message %q",
responseMessage.Result, responseMessage.Message)
@ -924,13 +948,14 @@ func (n *VictorOps) Notify(ctx context.Context, as ...*types.Alert) (bool, error
// Pushover implements a Notifier for Pushover notifications.
type Pushover struct {
conf *config.PushoverConfig
tmpl *template.Template
conf *config.PushoverConfig
tmpl *template.Template
logger log.Logger
}
// NewPushover returns a new Pushover notifier.
func NewPushover(c *config.PushoverConfig, t *template.Template) *Pushover {
return &Pushover{conf: c, tmpl: t}
func NewPushover(c *config.PushoverConfig, t *template.Template, l log.Logger) *Pushover {
return &Pushover{conf: c, tmpl: t, logger: l}
}
// Notify implements the Notifier interface.
@ -939,9 +964,9 @@ func (n *Pushover) Notify(ctx context.Context, as ...*types.Alert) (bool, error)
if !ok {
return false, fmt.Errorf("group key missing")
}
data := n.tmpl.Data(receiverName(ctx), groupLabels(ctx), as...)
data := n.tmpl.Data(receiverName(ctx, n.logger), groupLabels(ctx, n.logger), as...)
log.With("incident", key).Debugln("notifying Pushover")
level.Debug(n.logger).Log("msg", "Notifying Pushover", "incident", key)
var err error
tmpl := tmplText(n.tmpl, data, &err)
@ -954,11 +979,11 @@ func (n *Pushover) Notify(ctx context.Context, as ...*types.Alert) (bool, error)
parameters.Add("title", title)
if len(title) > 512 {
title = title[:512]
log.With("incident", key).Debugf("Truncated title to %q due to Pushover message limit", title)
level.Debug(n.logger).Log("msg", "Truncated title due to Pushover message limit", "truncated_title", title, "incident", key)
}
if len(title)+len(message) > 512 {
message = message[:512-len(title)]
log.With("incident", key).Debugf("Truncated message to %q due to Pushover message limit", message)
level.Debug(n.logger).Log("msg", "Truncated message due to Pushover message limit", "truncated_message", message, "incident", key)
}
message = strings.TrimSpace(message)
if message == "" {
@ -980,7 +1005,7 @@ func (n *Pushover) Notify(ctx context.Context, as ...*types.Alert) (bool, error)
return false, err
}
u.RawQuery = parameters.Encode()
log.With("incident", key).Debugf("Pushover URL = %q", u.String())
level.Debug(n.logger).Log("msg", "Sending Pushover message", "incident", key, "url", u.String())
resp, err := ctxhttp.Post(ctx, http.DefaultClient, u.String(), "text/plain", nil)
if err != nil {

View File

@ -21,8 +21,9 @@ import (
"github.com/cenkalti/backoff"
"github.com/cespare/xxhash"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"golang.org/x/net/context"
@ -138,10 +139,10 @@ func ReceiverName(ctx context.Context) (string, bool) {
return v, ok
}
func receiverName(ctx context.Context) string {
func receiverName(ctx context.Context, l log.Logger) string {
recv, ok := ReceiverName(ctx)
if !ok {
log.Error("missing receiver")
level.Error(l).Log("msg", "Missing receiver")
}
return recv
}
@ -153,10 +154,10 @@ func GroupKey(ctx context.Context) (string, bool) {
return v, ok
}
func groupLabels(ctx context.Context) model.LabelSet {
func groupLabels(ctx context.Context, l log.Logger) model.LabelSet {
groupLabels, ok := GroupLabels(ctx)
if !ok {
log.Error("missing group labels")
level.Error(l).Log("msg", "Missing group labels")
}
return groupLabels
}
@ -191,15 +192,15 @@ func ResolvedAlerts(ctx context.Context) ([]uint64, bool) {
// A Stage processes alerts under the constraints of the given context.
type Stage interface {
Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error)
Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error)
}
// StageFunc wraps a function to represent a Stage.
type StageFunc func(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error)
type StageFunc func(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error)
// Exec implements Stage interface.
func (f StageFunc) Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
return f(ctx, alerts...)
func (f StageFunc) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
return f(ctx, l, alerts...)
}
// BuildPipeline builds a map of receivers to Stages.
@ -211,6 +212,7 @@ func BuildPipeline(
silences *silence.Silences,
notificationLog nflog.Log,
marker types.Marker,
logger log.Logger,
) RoutingStage {
rs := RoutingStage{}
@ -218,15 +220,15 @@ func BuildPipeline(
ss := NewSilenceStage(silences, marker)
for _, rc := range confs {
rs[rc.Name] = MultiStage{is, ss, createStage(rc, tmpl, wait, notificationLog)}
rs[rc.Name] = MultiStage{is, ss, createStage(rc, tmpl, wait, notificationLog, logger)}
}
return rs
}
// createStage creates a pipeline of stages for a receiver.
func createStage(rc *config.Receiver, tmpl *template.Template, wait func() time.Duration, notificationLog nflog.Log) Stage {
func createStage(rc *config.Receiver, tmpl *template.Template, wait func() time.Duration, notificationLog nflog.Log, logger log.Logger) Stage {
var fs FanoutStage
for _, i := range BuildReceiverIntegrations(rc, tmpl) {
for _, i := range BuildReceiverIntegrations(rc, tmpl, logger) {
recv := &nflogpb.Receiver{
GroupName: rc.Name,
Integration: i.name,
@ -248,7 +250,7 @@ func createStage(rc *config.Receiver, tmpl *template.Template, wait func() time.
type RoutingStage map[string]Stage
// Exec implements the Stage interface.
func (rs RoutingStage) Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
func (rs RoutingStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
receiver, ok := ReceiverName(ctx)
if !ok {
return ctx, nil, fmt.Errorf("receiver missing")
@ -259,21 +261,21 @@ func (rs RoutingStage) Exec(ctx context.Context, alerts ...*types.Alert) (contex
return ctx, nil, fmt.Errorf("stage for receiver missing")
}
return s.Exec(ctx, alerts...)
return s.Exec(ctx, l, alerts...)
}
// A MultiStage executes a series of stages sequencially.
type MultiStage []Stage
// Exec implements the Stage interface.
func (ms MultiStage) Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
func (ms MultiStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
var err error
for _, s := range ms {
if len(alerts) == 0 {
return ctx, nil, nil
}
ctx, alerts, err = s.Exec(ctx, alerts...)
ctx, alerts, err = s.Exec(ctx, l, alerts...)
if err != nil {
return ctx, nil, err
}
@ -286,7 +288,7 @@ type FanoutStage []Stage
// Exec attempts to execute all stages concurrently and discards the results.
// It returns its input alerts and a types.MultiError if one or more stages fail.
func (fs FanoutStage) Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
func (fs FanoutStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
var (
wg sync.WaitGroup
me types.MultiError
@ -295,9 +297,9 @@ func (fs FanoutStage) Exec(ctx context.Context, alerts ...*types.Alert) (context
for _, s := range fs {
go func(s Stage) {
if _, _, err := s.Exec(ctx, alerts...); err != nil {
if _, _, err := s.Exec(ctx, l, alerts...); err != nil {
me.Add(err)
log.Errorf("Error on notify: %s", err)
level.Error(l).Log("msg", "Error on notify", "err", err)
}
wg.Done()
}(s)
@ -325,7 +327,7 @@ func NewInhibitStage(m types.Muter, mk types.Marker) *InhibitStage {
}
// Exec implements the Stage interface.
func (n *InhibitStage) Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
func (n *InhibitStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
var filtered []*types.Alert
for _, a := range alerts {
// TODO(fabxc): increment total alerts counter.
@ -354,7 +356,7 @@ func NewSilenceStage(s *silence.Silences, mk types.Marker) *SilenceStage {
}
// Exec implements the Stage interface.
func (n *SilenceStage) Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
func (n *SilenceStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
var filtered []*types.Alert
for _, a := range alerts {
// TODO(fabxc): increment total alerts counter.
@ -364,7 +366,7 @@ func (n *SilenceStage) Exec(ctx context.Context, alerts ...*types.Alert) (contex
silence.QMatches(a.Labels),
)
if err != nil {
log.Errorf("Querying silences failed: %s", err)
level.Error(l).Log("msg", "Querying silences failed", "err", err)
}
if len(sils) == 0 {
@ -397,7 +399,7 @@ func NewWaitStage(wait func() time.Duration) *WaitStage {
}
// Exec implements the Stage interface.
func (ws *WaitStage) Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
func (ws *WaitStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
select {
case <-time.After(ws.wait()):
case <-ctx.Done():
@ -501,7 +503,7 @@ func (n *DedupStage) needsUpdate(entry *nflogpb.Entry, firing, resolved map[uint
}
// Exec implements the Stage interface.
func (n *DedupStage) Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
func (n *DedupStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
gkey, ok := GroupKey(ctx)
if !ok {
return ctx, nil, fmt.Errorf("group key missing")
@ -567,7 +569,7 @@ func NewRetryStage(i Integration) *RetryStage {
}
// Exec implements the Stage interface.
func (r RetryStage) Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
func (r RetryStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
var (
i = 0
b = backoff.NewExponentialBackOff()
@ -593,7 +595,7 @@ func (r RetryStage) Exec(ctx context.Context, alerts ...*types.Alert) (context.C
case <-tick.C:
if retry, err := r.integration.Notify(ctx, alerts...); err != nil {
numFailedNotifications.WithLabelValues(r.integration.name).Inc()
log.Debugf("Notify attempt %d for %q failed: %s", i, r.integration.name, err)
level.Debug(l).Log("msg", "Notify attempt failed", "attempt", i, "integration", r.integration.name, "err", err)
if !retry {
return ctx, alerts, fmt.Errorf("Cancelling notify retry for %q due to unrecoverable error: %s", r.integration.name, err)
}
@ -631,7 +633,7 @@ func NewSetNotifiesStage(l nflog.Log, recv *nflogpb.Receiver) *SetNotifiesStage
}
// Exec implements the Stage interface.
func (n SetNotifiesStage) Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
func (n SetNotifiesStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
gkey, ok := GroupKey(ctx)
if !ok {
return ctx, nil, fmt.Errorf("group key missing")

View File

@ -20,6 +20,7 @@ import (
"testing"
"time"
"github.com/go-kit/kit/log"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/timestamp"
"github.com/prometheus/common/model"
@ -47,7 +48,7 @@ func (f notifierFunc) Notify(ctx context.Context, alerts ...*types.Alert) (bool,
type failStage struct{}
func (s failStage) Exec(ctx context.Context, as ...*types.Alert) (context.Context, []*types.Alert, error) {
func (s failStage) Exec(ctx context.Context, l log.Logger, as ...*types.Alert) (context.Context, []*types.Alert, error) {
return ctx, nil, fmt.Errorf("some error")
}
@ -168,12 +169,12 @@ func TestDedupStage(t *testing.T) {
ctx := context.Background()
_, _, err := s.Exec(ctx)
_, _, err := s.Exec(ctx, log.NewNopLogger())
require.EqualError(t, err, "group key missing")
ctx = WithGroupKey(ctx, "1")
_, _, err = s.Exec(ctx)
_, _, err = s.Exec(ctx, log.NewNopLogger())
require.EqualError(t, err, "repeat interval missing")
ctx = WithRepeatInterval(ctx, time.Hour)
@ -184,14 +185,14 @@ func TestDedupStage(t *testing.T) {
s.nflog = &testNflog{
qerr: errors.New("bad things"),
}
ctx, res, err := s.Exec(ctx, alerts...)
ctx, res, err := s.Exec(ctx, log.NewNopLogger(), alerts...)
require.EqualError(t, err, "bad things")
// ... but skip ErrNotFound.
s.nflog = &testNflog{
qerr: nflog.ErrNotFound,
}
ctx, res, err = s.Exec(ctx, alerts...)
ctx, res, err = s.Exec(ctx, log.NewNopLogger(), alerts...)
require.NoError(t, err, "unexpected error on not found log entry")
require.Equal(t, alerts, res, "input alerts differ from result alerts")
@ -202,7 +203,7 @@ func TestDedupStage(t *testing.T) {
{FiringAlerts: []uint64{1, 2, 3}},
},
}
ctx, res, err = s.Exec(ctx, alerts...)
ctx, res, err = s.Exec(ctx, log.NewNopLogger(), alerts...)
require.Contains(t, err.Error(), "result size")
// Must return no error and no alerts no need to update.
@ -216,7 +217,7 @@ func TestDedupStage(t *testing.T) {
},
},
}
ctx, res, err = s.Exec(ctx, alerts...)
ctx, res, err = s.Exec(ctx, log.NewNopLogger(), alerts...)
require.NoError(t, err)
require.Nil(t, res, "unexpected alerts returned")
@ -231,7 +232,7 @@ func TestDedupStage(t *testing.T) {
},
},
}
ctx, res, err = s.Exec(ctx, alerts...)
ctx, res, err = s.Exec(ctx, log.NewNopLogger(), alerts...)
require.NoError(t, err)
require.Equal(t, alerts, res, "unexpected alerts returned")
}
@ -244,14 +245,14 @@ func TestMultiStage(t *testing.T) {
)
stage := MultiStage{
StageFunc(func(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
StageFunc(func(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
if !reflect.DeepEqual(alerts, alerts1) {
t.Fatal("Input not equal to input of MultiStage")
}
ctx = context.WithValue(ctx, "key", "value")
return ctx, alerts2, nil
}),
StageFunc(func(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
StageFunc(func(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
if !reflect.DeepEqual(alerts, alerts2) {
t.Fatal("Input not equal to output of previous stage")
}
@ -263,7 +264,7 @@ func TestMultiStage(t *testing.T) {
}),
}
_, alerts, err := stage.Exec(context.Background(), alerts1...)
_, alerts, err := stage.Exec(context.Background(), log.NewNopLogger(), alerts1...)
if err != nil {
t.Fatalf("Exec failed: %s", err)
}
@ -280,7 +281,7 @@ func TestMultiStageFailure(t *testing.T) {
stage = MultiStage{s1}
)
_, _, err := stage.Exec(ctx, nil)
_, _, err := stage.Exec(ctx, log.NewNopLogger(), nil)
if err.Error() != "some error" {
t.Fatal("Errors were not propagated correctly by MultiStage")
}
@ -293,7 +294,7 @@ func TestRoutingStage(t *testing.T) {
)
stage := RoutingStage{
"name": StageFunc(func(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
"name": StageFunc(func(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
if !reflect.DeepEqual(alerts, alerts1) {
t.Fatal("Input not equal to input of RoutingStage")
}
@ -304,7 +305,7 @@ func TestRoutingStage(t *testing.T) {
ctx := WithReceiverName(context.Background(), "name")
_, alerts, err := stage.Exec(ctx, alerts1...)
_, alerts, err := stage.Exec(ctx, log.NewNopLogger(), alerts1...)
if err != nil {
t.Fatalf("Exec failed: %s", err)
}
@ -379,21 +380,21 @@ func TestSetNotifiesStage(t *testing.T) {
alerts := []*types.Alert{{}, {}, {}}
ctx := context.Background()
resctx, res, err := s.Exec(ctx, alerts...)
resctx, res, err := s.Exec(ctx, log.NewNopLogger(), alerts...)
require.EqualError(t, err, "group key missing")
require.Nil(t, res)
require.NotNil(t, resctx)
ctx = WithGroupKey(ctx, "1")
resctx, res, err = s.Exec(ctx, alerts...)
resctx, res, err = s.Exec(ctx, log.NewNopLogger(), alerts...)
require.EqualError(t, err, "firing alerts missing")
require.Nil(t, res)
require.NotNil(t, resctx)
ctx = WithFiringAlerts(ctx, []uint64{0, 1, 2})
resctx, res, err = s.Exec(ctx, alerts...)
resctx, res, err = s.Exec(ctx, log.NewNopLogger(), alerts...)
require.EqualError(t, err, "resolved alerts missing")
require.Nil(t, res)
require.NotNil(t, resctx)
@ -407,7 +408,7 @@ func TestSetNotifiesStage(t *testing.T) {
require.Equal(t, []uint64{}, resolvedAlerts)
return nil
}
resctx, res, err = s.Exec(ctx, alerts...)
resctx, res, err = s.Exec(ctx, log.NewNopLogger(), alerts...)
require.Nil(t, err)
require.Equal(t, alerts, res)
require.NotNil(t, resctx)
@ -422,7 +423,7 @@ func TestSetNotifiesStage(t *testing.T) {
require.Equal(t, []uint64{0, 1, 2}, resolvedAlerts)
return nil
}
resctx, res, err = s.Exec(ctx, alerts...)
resctx, res, err = s.Exec(ctx, log.NewNopLogger(), alerts...)
require.Nil(t, err)
require.Equal(t, alerts, res)
require.NotNil(t, resctx)
@ -471,7 +472,7 @@ func TestSilenceStage(t *testing.T) {
// the WasSilenced flag set to true afterwards.
marker.SetSilenced(inAlerts[1].Fingerprint(), "123")
_, alerts, err := silencer.Exec(nil, inAlerts...)
_, alerts, err := silencer.Exec(nil, log.NewNopLogger(), inAlerts...)
if err != nil {
t.Fatalf("Exec failed: %s", err)
}
@ -524,7 +525,7 @@ func TestInhibitStage(t *testing.T) {
// the WasInhibited flag set to true afterwards.
marker.SetInhibited(inAlerts[1].Fingerprint(), "123")
_, alerts, err := inhibitor.Exec(nil, inAlerts...)
_, alerts, err := inhibitor.Exec(nil, log.NewNopLogger(), inAlerts...)
if err != nil {
t.Fatalf("Exec failed: %s", err)
}

View File

@ -26,12 +26,13 @@ import (
"sync"
"time"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/matttproud/golang_protobuf_extensions/pbutil"
"github.com/pkg/errors"
pb "github.com/prometheus/alertmanager/silence/silencepb"
"github.com/prometheus/alertmanager/types"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"github.com/satori/go.uuid"
"github.com/weaveworks/mesh"
@ -128,7 +129,7 @@ func newSilenceMetricByState(s *Silences, st SilenceState) prometheus.GaugeFunc
func() float64 {
count, err := s.CountState(st)
if err != nil {
s.logger.With("err", err).Error("counting silences failed")
level.Error(s.logger).Log("msg", "Counting silences failed", "err", err)
}
return float64(count)
},
@ -258,8 +259,8 @@ func (s *Silences) Maintenance(interval time.Duration, snapf string, stopc <-cha
f := func() error {
start := s.now()
s.logger.Info("running maintenance")
defer s.logger.With("duration", s.now().Sub(start)).Info("maintenance done")
level.Info(s.logger).Log("msg", "Running maintenance")
defer level.Info(s.logger).Log("msg", "Maintenance done", "duration", s.now().Sub(start))
if _, err := s.GC(); err != nil {
return err
@ -285,7 +286,7 @@ Loop:
break Loop
case <-t.C:
if err := f(); err != nil {
s.logger.With("err", err).Error("running maintenance failed")
level.Info(s.logger).Log("msg", "Running maintenance failed", "err", err)
}
}
}
@ -294,7 +295,7 @@ Loop:
return
}
if err := f(); err != nil {
s.logger.With("err", err).Info("msg", "creating shutdown snapshot failed")
level.Info(s.logger).Log("msg", "Creating shutdown snapshot failed", "err", err)
}
}

View File

@ -20,22 +20,23 @@ import (
_ "net/http/pprof" // Comment this line to disable pprof endpoint.
"path/filepath"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log"
"github.com/prometheus/common/route"
)
func serveAsset(w http.ResponseWriter, req *http.Request, fp string) {
func serveAsset(w http.ResponseWriter, req *http.Request, fp string, logger log.Logger) {
info, err := AssetInfo(fp)
if err != nil {
log.Warn("Could not get file: ", err)
level.Warn(logger).Log("msg", "Could not get file", "err", err)
w.WriteHeader(http.StatusNotFound)
return
}
file, err := Asset(fp)
if err != nil {
if err != io.EOF {
log.With("file", fp).Warn("Could not get file: ", err)
level.Warn(logger).Log("msg", "Could not get file", "file", fp, "err", err)
}
w.WriteHeader(http.StatusNotFound)
return
@ -46,27 +47,27 @@ func serveAsset(w http.ResponseWriter, req *http.Request, fp string) {
}
// Register registers handlers to serve files for the web interface.
func Register(r *route.Router, reloadCh chan<- struct{}) {
func Register(r *route.Router, reloadCh chan<- struct{}, logger log.Logger) {
ihf := prometheus.InstrumentHandlerFunc
r.Get("/metrics", prometheus.Handler().ServeHTTP)
r.Get("/", ihf("index", func(w http.ResponseWriter, req *http.Request) {
serveAsset(w, req, "ui/app/index.html")
serveAsset(w, req, "ui/app/index.html", logger)
}))
r.Get("/script.js", ihf("app", func(w http.ResponseWriter, req *http.Request) {
serveAsset(w, req, "ui/app/script.js")
serveAsset(w, req, "ui/app/script.js", logger)
}))
r.Get("/favicon.ico", ihf("app", func(w http.ResponseWriter, req *http.Request) {
serveAsset(w, req, "ui/app/favicon.ico")
serveAsset(w, req, "ui/app/favicon.ico", logger)
}))
r.Get("/lib/*filepath", ihf("lib_files",
func(w http.ResponseWriter, req *http.Request) {
fp := route.Param(req.Context(), "filepath")
serveAsset(w, req, filepath.Join("ui/lib", fp))
serveAsset(w, req, filepath.Join("ui/lib", fp), logger)
},
))