*: integrate new silence package

This commit is contained in:
Fabian Reinartz 2016-08-30 11:58:27 +02:00
parent 3b5dfa8aba
commit a4e8703567
11 changed files with 211 additions and 88 deletions

View File

@ -20,16 +20,18 @@ import (
"sync"
"time"
"github.com/golang/protobuf/ptypes"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"github.com/prometheus/common/route"
"github.com/prometheus/common/version"
"github.com/satori/go.uuid"
"golang.org/x/net/context"
"github.com/prometheus/alertmanager/dispatch"
"github.com/prometheus/alertmanager/provider"
"github.com/prometheus/alertmanager/silence"
"github.com/prometheus/alertmanager/silence/silencepb"
"github.com/prometheus/alertmanager/types"
)
@ -55,7 +57,7 @@ func init() {
// API provides registration of handlers for API routes.
type API struct {
alerts provider.Alerts
silences provider.Silences
silences *silence.Silences
config string
resolveTimeout time.Duration
uptime time.Time
@ -68,7 +70,7 @@ type API struct {
}
// New returns a new API.
func New(alerts provider.Alerts, silences provider.Silences, gf func() dispatch.AlertOverview) *API {
func New(alerts provider.Alerts, silences *silence.Silences, gf func() dispatch.AlertOverview) *API {
return &API{
context: route.Context,
alerts: alerts,
@ -292,16 +294,20 @@ func (api *API) addSilence(w http.ResponseWriter, r *http.Request) {
}, nil)
return
}
if err := sil.Init(); err != nil {
psil, err := silenceToProto(&sil)
if err != nil {
respondError(w, apiError{
typ: errorBadData,
err: err,
}, nil)
return
}
// Drop start time for new silences so we default to now.
if sil.ID == "" && sil.StartsAt.Before(time.Now()) {
psil.StartsAt = nil
}
sid, err := api.silences.Set(&sil)
sid, err := api.silences.Create(psil)
if err != nil {
respondError(w, apiError{
typ: errorInternal,
@ -311,46 +317,38 @@ func (api *API) addSilence(w http.ResponseWriter, r *http.Request) {
}
respond(w, struct {
SilenceID uuid.UUID `json:"silenceId"`
SilenceID string `json:"silenceId"`
}{
SilenceID: sid,
})
}
func (api *API) getSilence(w http.ResponseWriter, r *http.Request) {
sids := route.Param(api.context(r), "sid")
sid, err := uuid.FromString(sids)
if err != nil {
respondError(w, apiError{
typ: errorBadData,
err: err,
}, nil)
return
}
sid := route.Param(api.context(r), "sid")
sil, err := api.silences.Get(sid)
if err != nil {
sils, err := api.silences.Query(silence.QIDs(sid))
if err != nil || len(sils) == 0 {
http.Error(w, fmt.Sprint("Error getting silence: ", err), http.StatusNotFound)
return
}
respond(w, &sil)
}
func (api *API) delSilence(w http.ResponseWriter, r *http.Request) {
sids := route.Param(api.context(r), "sid")
sid, err := uuid.FromString(sids)
sil, err := silenceFromProto(sils[0])
if err != nil {
respondError(w, apiError{
typ: errorBadData,
typ: errorInternal,
err: err,
}, nil)
return
}
if err := api.silences.Del(sid); err != nil {
respond(w, sil)
}
func (api *API) delSilence(w http.ResponseWriter, r *http.Request) {
sid := route.Param(api.context(r), "sid")
if err := api.silences.Expire(sid); err != nil {
respondError(w, apiError{
typ: errorInternal,
typ: errorBadData,
err: err,
}, nil)
return
@ -359,7 +357,7 @@ func (api *API) delSilence(w http.ResponseWriter, r *http.Request) {
}
func (api *API) listSilences(w http.ResponseWriter, r *http.Request) {
sils, err := api.silences.All()
psils, err := api.silences.Query()
if err != nil {
respondError(w, apiError{
typ: errorInternal,
@ -367,9 +365,102 @@ func (api *API) listSilences(w http.ResponseWriter, r *http.Request) {
}, nil)
return
}
var sils []*types.Silence
for _, ps := range psils {
s, err := silenceFromProto(ps)
if err != nil {
respondError(w, apiError{
typ: errorInternal,
err: err,
}, nil)
return
}
sils = append(sils, s)
}
respond(w, sils)
}
func silenceToProto(s *types.Silence) (*silencepb.Silence, error) {
startsAt, err := ptypes.TimestampProto(s.StartsAt)
if err != nil {
return nil, err
}
endsAt, err := ptypes.TimestampProto(s.EndsAt)
if err != nil {
return nil, err
}
updatedAt, err := ptypes.TimestampProto(s.UpdatedAt)
if err != nil {
return nil, err
}
sil := &silencepb.Silence{
Id: s.ID,
StartsAt: startsAt,
EndsAt: endsAt,
UpdatedAt: updatedAt,
}
for _, m := range s.Matchers {
matcher := &silencepb.Matcher{
Name: m.Name,
Pattern: m.Value,
Type: silencepb.Matcher_EQUAL,
}
if m.IsRegex {
matcher.Type = silencepb.Matcher_REGEXP
}
sil.Matchers = append(sil.Matchers, matcher)
}
sil.Comments = append(sil.Comments, &silencepb.Comment{
Timestamp: updatedAt,
Author: s.CreatedBy,
Comment: s.Comment,
})
return sil, nil
}
func silenceFromProto(s *silencepb.Silence) (*types.Silence, error) {
startsAt, err := ptypes.Timestamp(s.StartsAt)
if err != nil {
return nil, err
}
endsAt, err := ptypes.Timestamp(s.EndsAt)
if err != nil {
return nil, err
}
updatedAt, err := ptypes.Timestamp(s.UpdatedAt)
if err != nil {
return nil, err
}
sil := &types.Silence{
ID: s.Id,
StartsAt: startsAt,
EndsAt: endsAt,
UpdatedAt: updatedAt,
}
for _, m := range s.Matchers {
matcher := &types.Matcher{
Name: m.Name,
Value: m.Pattern,
}
switch m.Type {
case silencepb.Matcher_EQUAL:
case silencepb.Matcher_REGEXP:
matcher.IsRegex = true
default:
return nil, fmt.Errorf("unknown matcher type")
}
sil.Matchers = append(sil.Matchers, matcher)
}
if len(s.Comments) > 0 {
sil.CreatedBy = s.Comments[0].Author
sil.Comment = s.Comments[0].Comment
}
return sil, nil
}
type status string
const (

View File

@ -40,7 +40,7 @@ import (
"github.com/prometheus/alertmanager/nflog"
"github.com/prometheus/alertmanager/notify"
"github.com/prometheus/alertmanager/provider/mem"
meshprov "github.com/prometheus/alertmanager/provider/mesh"
"github.com/prometheus/alertmanager/silence"
"github.com/prometheus/alertmanager/template"
"github.com/prometheus/alertmanager/types"
"github.com/prometheus/alertmanager/ui"
@ -102,6 +102,9 @@ func main() {
log.Fatal(err)
}
logger := kitlog.NewLogfmtLogger(kitlog.NewSyncWriter(os.Stderr))
logger = kitlog.NewContext(logger).With("ts", kitlog.DefaultTimestampUTC, "caller", kitlog.DefaultCaller)
mrouter := initMesh(*meshListen, *hwaddr, *nickname)
stopc := make(chan struct{})
@ -115,7 +118,7 @@ func main() {
nflog.WithRetention(*retention),
nflog.WithSnapshot(filepath.Join(*dataDir, "nflog")),
nflog.WithMaintenance(15*time.Minute, stopc, wg.Done),
nflog.WithLogger(kitlog.NewLogfmtLogger(os.Stdout)),
nflog.WithLogger(kitlog.NewContext(logger).With("component", "nflog")),
)
if err != nil {
log.Fatal(err)
@ -123,15 +126,24 @@ func main() {
marker := types.NewMarker()
silences, err := meshprov.NewSilences(marker, log.Base(), *retention, filepath.Join(*dataDir, "silences"))
silences, err := silence.New(silence.Options{
SnapshotFile: filepath.Join(*dataDir, "silences"),
Retention: *retention,
Logger: kitlog.NewContext(logger).With("component", "silences"),
Gossip: func(g mesh.Gossiper) mesh.Gossip {
return mrouter.NewGossip("silences", g)
},
})
if err != nil {
log.Fatal(err)
}
silences.Register(mrouter.NewGossip("silences", silences))
// Start providers before router potentially sends updates.
go silences.Run()
wg.Add(1)
go func() {
silences.Maintenance(15*time.Minute, filepath.Join(*dataDir, "silences"), stopc)
wg.Done()
}()
mrouter.Start()
@ -139,11 +151,6 @@ func main() {
close(stopc)
// Stop receiving updates from router before shutting down.
mrouter.Stop()
go func() {
silences.Stop()
wg.Done()
}()
wg.Wait()
}()

View File

@ -8,7 +8,6 @@ import (
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"github.com/satori/go.uuid"
"golang.org/x/net/context"
"github.com/prometheus/alertmanager/notify"
@ -73,8 +72,8 @@ type AlertBlock struct {
type APIAlert struct {
*model.Alert
Inhibited bool `json:"inhibited"`
Silenced *uuid.UUID `json:"silenced,omitempty"`
Inhibited bool `json:"inhibited"`
Silenced string `json:"silenced,omitempty"`
}
// AlertGroup is a list of alert blocks grouped by the same label set.
@ -121,7 +120,7 @@ func (d *Dispatcher) Groups() AlertOverview {
Inhibited: d.marker.Inhibited(a.Fingerprint()),
}
if sid, ok := d.marker.Silenced(a.Fingerprint()); ok {
aa.Silenced = &sid
aa.Silenced = sid
}
apiAlerts = append(apiAlerts, aa)
}

View File

@ -32,7 +32,7 @@ import (
"github.com/prometheus/alertmanager/inhibit"
"github.com/prometheus/alertmanager/nflog"
"github.com/prometheus/alertmanager/nflog/nflogpb"
meshprov "github.com/prometheus/alertmanager/provider/mesh"
"github.com/prometheus/alertmanager/silence"
"github.com/prometheus/alertmanager/template"
"github.com/prometheus/alertmanager/types"
)
@ -180,7 +180,7 @@ func BuildPipeline(
tmpl *template.Template,
wait func() time.Duration,
inhibitor *inhibit.Inhibitor,
silences *meshprov.Silences,
silences *silence.Silences,
notificationLog nflog.Log,
marker types.Marker,
) RoutingStage {
@ -317,18 +317,26 @@ func (n *InhibitStage) Exec(ctx context.Context, alerts ...*types.Alert) (contex
// SilenceStage filters alerts through a silence muter.
type SilenceStage struct {
muter types.Muter
marker types.Marker
silences *silence.Silences
marker types.Marker
}
// NewSilenceStage returns a new SilenceStage.
func NewSilenceStage(m types.Muter, mk types.Marker) *SilenceStage {
func NewSilenceStage(s *silence.Silences, mk types.Marker) *SilenceStage {
return &SilenceStage{
muter: m,
marker: mk,
silences: s,
marker: mk,
}
}
func lsetToStrings(ls model.LabelSet) map[string]string {
m := make(map[string]string, len(ls))
for k, v := range ls {
m[string(k)] = string(v)
}
return m
}
// Exec implements the Stage interface.
func (n *SilenceStage) Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
var filtered []*types.Alert
@ -336,11 +344,21 @@ func (n *SilenceStage) Exec(ctx context.Context, alerts ...*types.Alert) (contex
_, ok := n.marker.Silenced(a.Fingerprint())
// TODO(fabxc): increment total alerts counter.
// Do not send the alert if the silencer mutes it.
if !n.muter.Mutes(a.Labels) {
sils, err := n.silences.Query(
silence.QState(silence.StateActive),
silence.QMatches(lsetToStrings(a.Labels)),
)
if err != nil {
log.Errorf("Querying silences failed: %s", err)
}
if len(sils) == 0 {
// TODO(fabxc): increment muted alerts counter.
filtered = append(filtered, a)
n.marker.SetSilenced(a.Labels.Fingerprint())
// Store whether a previously silenced alert is firing again.
a.WasSilenced = ok
} else {
n.marker.SetSilenced(a.Labels.Fingerprint(), sils[0].Id)
}
}

View File

@ -28,8 +28,9 @@ import (
"github.com/prometheus/alertmanager/nflog"
"github.com/prometheus/alertmanager/nflog/nflogpb"
"github.com/prometheus/alertmanager/silence"
"github.com/prometheus/alertmanager/silence/silencepb"
"github.com/prometheus/alertmanager/types"
"github.com/satori/go.uuid"
)
type failStage struct{}
@ -339,14 +340,19 @@ func TestSetNotifiesStage(t *testing.T) {
}
func TestSilenceStage(t *testing.T) {
// Mute all label sets that have a "mute" key.
muter := types.MuteFunc(func(lset model.LabelSet) bool {
_, ok := lset["mute"]
return ok
})
silences, err := silence.New(silence.Options{})
if err != nil {
t.Fatal(err)
}
if _, err := silences.Create(&silencepb.Silence{
EndsAt: mustTimestampProto(utcNow().Add(time.Hour)),
Matchers: []*silencepb.Matcher{{Name: "mute", Pattern: "me"}},
}); err != nil {
t.Fatal(err)
}
marker := types.NewMarker()
silencer := NewSilenceStage(muter, marker)
silencer := NewSilenceStage(silences, marker)
in := []model.LabelSet{
{},
@ -374,7 +380,7 @@ func TestSilenceStage(t *testing.T) {
// Set the second alert als previously silenced. It is expected to have
// the WasSilenced flag set to true afterwards.
marker.SetSilenced(inAlerts[1].Fingerprint(), uuid.NewV4())
marker.SetSilenced(inAlerts[1].Fingerprint(), "123")
_, alerts, err := silencer.Exec(nil, inAlerts...)
if err != nil {

View File

@ -90,9 +90,12 @@ func New(o Options) (*Silences, error) {
return nil, err
}
if o.SnapshotFile != "" {
var err error
if o.SnapshotReader, err = os.Open(o.SnapshotFile); err != nil {
return nil, err
if r, err := os.Open(o.SnapshotFile); err != nil {
if !os.IsNotExist(err) {
return nil, err
}
} else {
o.SnapshotReader = r
}
}
s := &Silences{
@ -317,6 +320,7 @@ func (s *Silences) Create(sil *pb.Silence) (id string, err error) {
if err := s.setSilence(sil); err != nil {
return "", err
}
s.logger.Log("created silence", sil.Id)
return sil.Id, nil
}
@ -502,12 +506,13 @@ func QState(states ...SilenceState) QueryParam {
return func(q *query) error {
f := func(sil *pb.Silence, now *timestamp.Timestamp) (bool, error) {
s := getState(sil, now)
for _, ps := range states {
if s == ps {
return false, nil
return true, nil
}
}
return true, nil
return false, nil
}
q.filters = append(q.filters, f)
return nil

View File

@ -318,7 +318,7 @@ func TestQState(t *testing.T) {
cases := []struct {
sil *pb.Silence
states []SilenceState
drop bool
keep bool
}{
{
sil: &pb.Silence{
@ -326,7 +326,7 @@ func TestQState(t *testing.T) {
EndsAt: mustTimeProto(now.Add(time.Hour)),
},
states: []SilenceState{StateActive, StateExpired},
drop: true,
keep: false,
},
{
sil: &pb.Silence{
@ -334,7 +334,7 @@ func TestQState(t *testing.T) {
EndsAt: mustTimeProto(now.Add(time.Hour)),
},
states: []SilenceState{StatePending},
drop: false,
keep: true,
},
{
sil: &pb.Silence{
@ -342,7 +342,7 @@ func TestQState(t *testing.T) {
EndsAt: mustTimeProto(now.Add(time.Hour)),
},
states: []SilenceState{StateExpired, StatePending},
drop: false,
keep: true,
},
}
for i, c := range cases {
@ -350,9 +350,9 @@ func TestQState(t *testing.T) {
QState(c.states...)(q)
f := q.filters[0]
drop, err := f(c.sil, mustTimeProto(now))
keep, err := f(c.sil, mustTimeProto(now))
require.NoError(t, err)
require.Equal(t, c.drop, drop, "unexpected filter result for case %d", i)
require.Equal(t, c.keep, keep, "unexpected filter result for case %d", i)
}
}

View File

@ -30,7 +30,6 @@ import (
"github.com/prometheus/client_golang/api/alertmanager"
"github.com/prometheus/common/model"
"github.com/satori/go.uuid"
"golang.org/x/net/context"
)
@ -329,10 +328,10 @@ func (am *Alertmanager) SetSilence(at float64, sil *TestSilence) {
var v struct {
Status string `json:"status"`
Data struct {
SilenceID uuid.UUID `json:"silenceId"`
SilenceID string `json:"silenceId"`
} `json:"data"`
}
if err := json.Unmarshal(b, &v); err != nil {
if err := json.Unmarshal(b, &v); err != nil || resp.StatusCode/100 != 2 {
am.t.Errorf("error setting silence %v: %s", sil, err)
return
}
@ -349,8 +348,8 @@ func (am *Alertmanager) DelSilence(at float64, sil *TestSilence) {
return
}
_, err = http.DefaultClient.Do(req)
if err != nil {
resp, err := http.DefaultClient.Do(req)
if err != nil || resp.StatusCode/100 != 2 {
am.t.Errorf("Error deleting silence %v: %s", sil, err)
return
}

View File

@ -58,7 +58,7 @@ receivers:
)
// Add a silence that affects the first alert.
am.SetSilence(At(2.5), Silence(2, 4.5).Match("alertname", "test1"))
am.SetSilence(At(2.3), Silence(2.5, 4.5).Match("alertname", "test1"))
co.Want(Between(3, 3.5), Alert("alertname", "test2").Active(1))
co.Want(Between(4, 4.5), Alert("alertname", "test2").Active(1))
@ -106,7 +106,7 @@ receivers:
// Silence everything for a long time and delete the silence after
// two iterations.
sil := Silence(1, 100).MatchRE("alertname", ".*")
sil := Silence(1.5, 100).MatchRE("alertname", ".*")
am.SetSilence(At(1.3), sil)
am.DelSilence(At(3.5), sil)

View File

@ -25,7 +25,6 @@ import (
"github.com/prometheus/alertmanager/notify"
"github.com/prometheus/alertmanager/types"
"github.com/satori/go.uuid"
)
// At is a convenience method to allow for declarative syntax of Acceptance
@ -54,7 +53,7 @@ func Between(start, end float64) Interval {
// TestSilence models a model.Silence with relative times.
type TestSilence struct {
ID uuid.UUID
ID string
match []string
matchRE []string
startsAt, endsAt float64

View File

@ -21,16 +21,15 @@ import (
"time"
"github.com/prometheus/common/model"
"github.com/satori/go.uuid"
)
// Marker helps to mark alerts as silenced and/or inhibited.
// All methods are goroutine-safe.
type Marker interface {
SetInhibited(alert model.Fingerprint, b bool)
SetSilenced(alert model.Fingerprint, sil ...uuid.UUID)
SetSilenced(alert model.Fingerprint, sil ...string)
Silenced(alert model.Fingerprint) (uuid.UUID, bool)
Silenced(alert model.Fingerprint) (string, bool)
Inhibited(alert model.Fingerprint) bool
}
@ -38,13 +37,13 @@ type Marker interface {
func NewMarker() Marker {
return &memMarker{
inhibited: map[model.Fingerprint]struct{}{},
silenced: map[model.Fingerprint]uuid.UUID{},
silenced: map[model.Fingerprint]string{},
}
}
type memMarker struct {
inhibited map[model.Fingerprint]struct{}
silenced map[model.Fingerprint]uuid.UUID
silenced map[model.Fingerprint]string
mtx sync.RWMutex
}
@ -57,7 +56,7 @@ func (m *memMarker) Inhibited(alert model.Fingerprint) bool {
return ok
}
func (m *memMarker) Silenced(alert model.Fingerprint) (uuid.UUID, bool) {
func (m *memMarker) Silenced(alert model.Fingerprint) (string, bool) {
m.mtx.RLock()
defer m.mtx.RUnlock()
@ -76,7 +75,7 @@ func (m *memMarker) SetInhibited(alert model.Fingerprint, b bool) {
}
}
func (m *memMarker) SetSilenced(alert model.Fingerprint, sil ...uuid.UUID) {
func (m *memMarker) SetSilenced(alert model.Fingerprint, sil ...string) {
m.mtx.Lock()
defer m.mtx.Unlock()
@ -206,7 +205,7 @@ func (f MuteFunc) Mutes(lset model.LabelSet) bool { return f(lset) }
// A Silence determines whether a given label set is muted.
type Silence struct {
// A unique identifier across all connected instances.
ID uuid.UUID `json:"id"`
ID string `json:"id"`
// A set of matchers determining if a label set is affect
// by the silence.
Matchers Matchers `json:"matchers"`
@ -237,7 +236,7 @@ type Silence struct {
// Validate returns true iff all fields of the silence have valid values.
func (s *Silence) Validate() error {
if s.ID == uuid.Nil {
if s.ID == "" {
return fmt.Errorf("ID missing")
}
if len(s.Matchers) == 0 {