Merge pull request #481 from prometheus/fabxc-meshsil

*: integrate new silence package
This commit is contained in:
Fabian Reinartz 2016-08-30 16:53:34 +02:00 committed by GitHub
commit 8d88d9e05b
18 changed files with 211 additions and 1533 deletions

View File

@ -20,16 +20,18 @@ import (
"sync"
"time"
"github.com/golang/protobuf/ptypes"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"github.com/prometheus/common/route"
"github.com/prometheus/common/version"
"github.com/satori/go.uuid"
"golang.org/x/net/context"
"github.com/prometheus/alertmanager/dispatch"
"github.com/prometheus/alertmanager/provider"
"github.com/prometheus/alertmanager/silence"
"github.com/prometheus/alertmanager/silence/silencepb"
"github.com/prometheus/alertmanager/types"
)
@ -55,7 +57,7 @@ func init() {
// API provides registration of handlers for API routes.
type API struct {
alerts provider.Alerts
silences provider.Silences
silences *silence.Silences
config string
resolveTimeout time.Duration
uptime time.Time
@ -68,7 +70,7 @@ type API struct {
}
// New returns a new API.
func New(alerts provider.Alerts, silences provider.Silences, gf func() dispatch.AlertOverview) *API {
func New(alerts provider.Alerts, silences *silence.Silences, gf func() dispatch.AlertOverview) *API {
return &API{
context: route.Context,
alerts: alerts,
@ -292,16 +294,20 @@ func (api *API) addSilence(w http.ResponseWriter, r *http.Request) {
}, nil)
return
}
if err := sil.Init(); err != nil {
psil, err := silenceToProto(&sil)
if err != nil {
respondError(w, apiError{
typ: errorBadData,
err: err,
}, nil)
return
}
// Drop start time for new silences so we default to now.
if sil.ID == "" && sil.StartsAt.Before(time.Now()) {
psil.StartsAt = nil
}
sid, err := api.silences.Set(&sil)
sid, err := api.silences.Create(psil)
if err != nil {
respondError(w, apiError{
typ: errorInternal,
@ -311,46 +317,38 @@ func (api *API) addSilence(w http.ResponseWriter, r *http.Request) {
}
respond(w, struct {
SilenceID uuid.UUID `json:"silenceId"`
SilenceID string `json:"silenceId"`
}{
SilenceID: sid,
})
}
func (api *API) getSilence(w http.ResponseWriter, r *http.Request) {
sids := route.Param(api.context(r), "sid")
sid, err := uuid.FromString(sids)
if err != nil {
respondError(w, apiError{
typ: errorBadData,
err: err,
}, nil)
return
}
sid := route.Param(api.context(r), "sid")
sil, err := api.silences.Get(sid)
if err != nil {
sils, err := api.silences.Query(silence.QIDs(sid))
if err != nil || len(sils) == 0 {
http.Error(w, fmt.Sprint("Error getting silence: ", err), http.StatusNotFound)
return
}
respond(w, &sil)
}
func (api *API) delSilence(w http.ResponseWriter, r *http.Request) {
sids := route.Param(api.context(r), "sid")
sid, err := uuid.FromString(sids)
sil, err := silenceFromProto(sils[0])
if err != nil {
respondError(w, apiError{
typ: errorBadData,
typ: errorInternal,
err: err,
}, nil)
return
}
if err := api.silences.Del(sid); err != nil {
respond(w, sil)
}
func (api *API) delSilence(w http.ResponseWriter, r *http.Request) {
sid := route.Param(api.context(r), "sid")
if err := api.silences.Expire(sid); err != nil {
respondError(w, apiError{
typ: errorInternal,
typ: errorBadData,
err: err,
}, nil)
return
@ -359,7 +357,7 @@ func (api *API) delSilence(w http.ResponseWriter, r *http.Request) {
}
func (api *API) listSilences(w http.ResponseWriter, r *http.Request) {
sils, err := api.silences.All()
psils, err := api.silences.Query()
if err != nil {
respondError(w, apiError{
typ: errorInternal,
@ -367,9 +365,102 @@ func (api *API) listSilences(w http.ResponseWriter, r *http.Request) {
}, nil)
return
}
var sils []*types.Silence
for _, ps := range psils {
s, err := silenceFromProto(ps)
if err != nil {
respondError(w, apiError{
typ: errorInternal,
err: err,
}, nil)
return
}
sils = append(sils, s)
}
respond(w, sils)
}
func silenceToProto(s *types.Silence) (*silencepb.Silence, error) {
startsAt, err := ptypes.TimestampProto(s.StartsAt)
if err != nil {
return nil, err
}
endsAt, err := ptypes.TimestampProto(s.EndsAt)
if err != nil {
return nil, err
}
updatedAt, err := ptypes.TimestampProto(s.UpdatedAt)
if err != nil {
return nil, err
}
sil := &silencepb.Silence{
Id: s.ID,
StartsAt: startsAt,
EndsAt: endsAt,
UpdatedAt: updatedAt,
}
for _, m := range s.Matchers {
matcher := &silencepb.Matcher{
Name: m.Name,
Pattern: m.Value,
Type: silencepb.Matcher_EQUAL,
}
if m.IsRegex {
matcher.Type = silencepb.Matcher_REGEXP
}
sil.Matchers = append(sil.Matchers, matcher)
}
sil.Comments = append(sil.Comments, &silencepb.Comment{
Timestamp: updatedAt,
Author: s.CreatedBy,
Comment: s.Comment,
})
return sil, nil
}
func silenceFromProto(s *silencepb.Silence) (*types.Silence, error) {
startsAt, err := ptypes.Timestamp(s.StartsAt)
if err != nil {
return nil, err
}
endsAt, err := ptypes.Timestamp(s.EndsAt)
if err != nil {
return nil, err
}
updatedAt, err := ptypes.Timestamp(s.UpdatedAt)
if err != nil {
return nil, err
}
sil := &types.Silence{
ID: s.Id,
StartsAt: startsAt,
EndsAt: endsAt,
UpdatedAt: updatedAt,
}
for _, m := range s.Matchers {
matcher := &types.Matcher{
Name: m.Name,
Value: m.Pattern,
}
switch m.Type {
case silencepb.Matcher_EQUAL:
case silencepb.Matcher_REGEXP:
matcher.IsRegex = true
default:
return nil, fmt.Errorf("unknown matcher type")
}
sil.Matchers = append(sil.Matchers, matcher)
}
if len(s.Comments) > 0 {
sil.CreatedBy = s.Comments[0].Author
sil.Comment = s.Comments[0].Comment
}
return sil, nil
}
type status string
const (

View File

@ -40,7 +40,7 @@ import (
"github.com/prometheus/alertmanager/nflog"
"github.com/prometheus/alertmanager/notify"
"github.com/prometheus/alertmanager/provider/mem"
meshprov "github.com/prometheus/alertmanager/provider/mesh"
"github.com/prometheus/alertmanager/silence"
"github.com/prometheus/alertmanager/template"
"github.com/prometheus/alertmanager/types"
"github.com/prometheus/alertmanager/ui"
@ -106,6 +106,9 @@ func main() {
log.Fatal(err)
}
logger := kitlog.NewLogfmtLogger(kitlog.NewSyncWriter(os.Stderr))
logger = kitlog.NewContext(logger).With("ts", kitlog.DefaultTimestampUTC, "caller", kitlog.DefaultCaller)
mrouter := initMesh(*meshListen, *hwaddr, *nickname)
stopc := make(chan struct{})
@ -119,7 +122,7 @@ func main() {
nflog.WithRetention(*retention),
nflog.WithSnapshot(filepath.Join(*dataDir, "nflog")),
nflog.WithMaintenance(15*time.Minute, stopc, wg.Done),
nflog.WithLogger(kitlog.NewLogfmtLogger(os.Stdout)),
nflog.WithLogger(kitlog.NewContext(logger).With("component", "nflog")),
)
if err != nil {
log.Fatal(err)
@ -127,15 +130,24 @@ func main() {
marker := types.NewMarker()
silences, err := meshprov.NewSilences(marker, log.Base(), *retention, filepath.Join(*dataDir, "silences"))
silences, err := silence.New(silence.Options{
SnapshotFile: filepath.Join(*dataDir, "silences"),
Retention: *retention,
Logger: kitlog.NewContext(logger).With("component", "silences"),
Gossip: func(g mesh.Gossiper) mesh.Gossip {
return mrouter.NewGossip("silences", g)
},
})
if err != nil {
log.Fatal(err)
}
silences.Register(mrouter.NewGossip("silences", silences))
// Start providers before router potentially sends updates.
go silences.Run()
wg.Add(1)
go func() {
silences.Maintenance(15*time.Minute, filepath.Join(*dataDir, "silences"), stopc)
wg.Done()
}()
mrouter.Start()
@ -143,11 +155,6 @@ func main() {
close(stopc)
// Stop receiving updates from router before shutting down.
mrouter.Stop()
go func() {
silences.Stop()
wg.Done()
}()
wg.Wait()
}()

View File

@ -8,7 +8,6 @@ import (
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"github.com/satori/go.uuid"
"golang.org/x/net/context"
"github.com/prometheus/alertmanager/notify"
@ -73,8 +72,8 @@ type AlertBlock struct {
type APIAlert struct {
*model.Alert
Inhibited bool `json:"inhibited"`
Silenced *uuid.UUID `json:"silenced,omitempty"`
Inhibited bool `json:"inhibited"`
Silenced string `json:"silenced,omitempty"`
}
// AlertGroup is a list of alert blocks grouped by the same label set.
@ -121,7 +120,7 @@ func (d *Dispatcher) Groups() AlertOverview {
Inhibited: d.marker.Inhibited(a.Fingerprint()),
}
if sid, ok := d.marker.Silenced(a.Fingerprint()); ok {
aa.Silenced = &sid
aa.Silenced = sid
}
apiAlerts = append(apiAlerts, aa)
}

View File

@ -32,7 +32,7 @@ import (
"github.com/prometheus/alertmanager/inhibit"
"github.com/prometheus/alertmanager/nflog"
"github.com/prometheus/alertmanager/nflog/nflogpb"
meshprov "github.com/prometheus/alertmanager/provider/mesh"
"github.com/prometheus/alertmanager/silence"
"github.com/prometheus/alertmanager/template"
"github.com/prometheus/alertmanager/types"
)
@ -180,7 +180,7 @@ func BuildPipeline(
tmpl *template.Template,
wait func() time.Duration,
inhibitor *inhibit.Inhibitor,
silences *meshprov.Silences,
silences *silence.Silences,
notificationLog nflog.Log,
marker types.Marker,
) RoutingStage {
@ -317,18 +317,26 @@ func (n *InhibitStage) Exec(ctx context.Context, alerts ...*types.Alert) (contex
// SilenceStage filters alerts through a silence muter.
type SilenceStage struct {
muter types.Muter
marker types.Marker
silences *silence.Silences
marker types.Marker
}
// NewSilenceStage returns a new SilenceStage.
func NewSilenceStage(m types.Muter, mk types.Marker) *SilenceStage {
func NewSilenceStage(s *silence.Silences, mk types.Marker) *SilenceStage {
return &SilenceStage{
muter: m,
marker: mk,
silences: s,
marker: mk,
}
}
func lsetToStrings(ls model.LabelSet) map[string]string {
m := make(map[string]string, len(ls))
for k, v := range ls {
m[string(k)] = string(v)
}
return m
}
// Exec implements the Stage interface.
func (n *SilenceStage) Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
var filtered []*types.Alert
@ -336,11 +344,21 @@ func (n *SilenceStage) Exec(ctx context.Context, alerts ...*types.Alert) (contex
_, ok := n.marker.Silenced(a.Fingerprint())
// TODO(fabxc): increment total alerts counter.
// Do not send the alert if the silencer mutes it.
if !n.muter.Mutes(a.Labels) {
sils, err := n.silences.Query(
silence.QState(silence.StateActive),
silence.QMatches(lsetToStrings(a.Labels)),
)
if err != nil {
log.Errorf("Querying silences failed: %s", err)
}
if len(sils) == 0 {
// TODO(fabxc): increment muted alerts counter.
filtered = append(filtered, a)
n.marker.SetSilenced(a.Labels.Fingerprint())
// Store whether a previously silenced alert is firing again.
a.WasSilenced = ok
} else {
n.marker.SetSilenced(a.Labels.Fingerprint(), sils[0].Id)
}
}

View File

@ -28,8 +28,9 @@ import (
"github.com/prometheus/alertmanager/nflog"
"github.com/prometheus/alertmanager/nflog/nflogpb"
"github.com/prometheus/alertmanager/silence"
"github.com/prometheus/alertmanager/silence/silencepb"
"github.com/prometheus/alertmanager/types"
"github.com/satori/go.uuid"
)
type failStage struct{}
@ -339,14 +340,19 @@ func TestSetNotifiesStage(t *testing.T) {
}
func TestSilenceStage(t *testing.T) {
// Mute all label sets that have a "mute" key.
muter := types.MuteFunc(func(lset model.LabelSet) bool {
_, ok := lset["mute"]
return ok
})
silences, err := silence.New(silence.Options{})
if err != nil {
t.Fatal(err)
}
if _, err := silences.Create(&silencepb.Silence{
EndsAt: mustTimestampProto(utcNow().Add(time.Hour)),
Matchers: []*silencepb.Matcher{{Name: "mute", Pattern: "me"}},
}); err != nil {
t.Fatal(err)
}
marker := types.NewMarker()
silencer := NewSilenceStage(muter, marker)
silencer := NewSilenceStage(silences, marker)
in := []model.LabelSet{
{},
@ -374,7 +380,7 @@ func TestSilenceStage(t *testing.T) {
// Set the second alert als previously silenced. It is expected to have
// the WasSilenced flag set to true afterwards.
marker.SetSilenced(inAlerts[1].Fingerprint(), uuid.NewV4())
marker.SetSilenced(inAlerts[1].Fingerprint(), "123")
_, alerts, err := silencer.Exec(nil, inAlerts...)
if err != nil {

View File

@ -1,20 +0,0 @@
syntax = "proto3";
package msg;
message Timestamp {
int64 seconds = 1;
int32 nanoseconds = 2;
}
message NotificationInfo {
string receiver = 1;
uint64 alert = 2;
bool resolved = 3;
Timestamp timestamp = 4;
Timestamp expiresAt = 5;
}
message NotificationInfoSet {
repeated NotificationInfo infos = 1;
}

View File

@ -1,111 +0,0 @@
// Code generated by protoc-gen-go.
// source: msg.proto
// DO NOT EDIT!
/*
Package msg is a generated protocol buffer package.
It is generated from these files:
msg.proto
It has these top-level messages:
Timestamp
NotificationInfo
NotificationInfoSet
*/
package msg
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type Timestamp struct {
Seconds int64 `protobuf:"varint,1,opt,name=seconds" json:"seconds,omitempty"`
Nanoseconds int32 `protobuf:"varint,2,opt,name=nanoseconds" json:"nanoseconds,omitempty"`
}
func (m *Timestamp) Reset() { *m = Timestamp{} }
func (m *Timestamp) String() string { return proto.CompactTextString(m) }
func (*Timestamp) ProtoMessage() {}
func (*Timestamp) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
type NotificationInfo struct {
Receiver string `protobuf:"bytes,1,opt,name=receiver" json:"receiver,omitempty"`
Alert uint64 `protobuf:"varint,2,opt,name=alert" json:"alert,omitempty"`
Resolved bool `protobuf:"varint,3,opt,name=resolved" json:"resolved,omitempty"`
Timestamp *Timestamp `protobuf:"bytes,4,opt,name=timestamp" json:"timestamp,omitempty"`
ExpiresAt *Timestamp `protobuf:"bytes,5,opt,name=expiresAt" json:"expiresAt,omitempty"`
}
func (m *NotificationInfo) Reset() { *m = NotificationInfo{} }
func (m *NotificationInfo) String() string { return proto.CompactTextString(m) }
func (*NotificationInfo) ProtoMessage() {}
func (*NotificationInfo) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
func (m *NotificationInfo) GetTimestamp() *Timestamp {
if m != nil {
return m.Timestamp
}
return nil
}
func (m *NotificationInfo) GetExpiresAt() *Timestamp {
if m != nil {
return m.ExpiresAt
}
return nil
}
type NotificationInfoSet struct {
Infos []*NotificationInfo `protobuf:"bytes,1,rep,name=infos" json:"infos,omitempty"`
}
func (m *NotificationInfoSet) Reset() { *m = NotificationInfoSet{} }
func (m *NotificationInfoSet) String() string { return proto.CompactTextString(m) }
func (*NotificationInfoSet) ProtoMessage() {}
func (*NotificationInfoSet) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} }
func (m *NotificationInfoSet) GetInfos() []*NotificationInfo {
if m != nil {
return m.Infos
}
return nil
}
func init() {
proto.RegisterType((*Timestamp)(nil), "msg.Timestamp")
proto.RegisterType((*NotificationInfo)(nil), "msg.NotificationInfo")
proto.RegisterType((*NotificationInfoSet)(nil), "msg.NotificationInfoSet")
}
func init() { proto.RegisterFile("msg.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{
// 231 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x6c, 0x90, 0xc1, 0x4a, 0x03, 0x31,
0x10, 0x86, 0x59, 0xb7, 0xd1, 0x66, 0x0a, 0x22, 0x51, 0x21, 0x78, 0x2a, 0x7b, 0x12, 0x94, 0x1e,
0xf4, 0x09, 0xf4, 0x22, 0x5e, 0x3c, 0x44, 0x5f, 0x20, 0x6e, 0xa7, 0x12, 0xe8, 0x66, 0x96, 0x64,
0x28, 0x3e, 0x9c, 0x0f, 0x67, 0x36, 0xcb, 0x46, 0x59, 0x7a, 0x9b, 0x7f, 0xfe, 0x6f, 0x7e, 0x66,
0x06, 0x64, 0x17, 0xbf, 0x36, 0x7d, 0x20, 0x26, 0x55, 0xa7, 0xb2, 0x79, 0x01, 0xf9, 0xe1, 0x3a,
0x8c, 0x6c, 0xbb, 0x5e, 0x69, 0x38, 0x8b, 0xd8, 0x92, 0xdf, 0x46, 0x5d, 0xad, 0xab, 0xdb, 0xda,
0x4c, 0x52, 0xad, 0x61, 0xe5, 0xad, 0xa7, 0xc9, 0x3d, 0x49, 0xae, 0x30, 0xff, 0x5b, 0xcd, 0x4f,
0x05, 0x17, 0x6f, 0xc4, 0x6e, 0xe7, 0x5a, 0xcb, 0x8e, 0xfc, 0xab, 0xdf, 0x91, 0xba, 0x81, 0x65,
0xc0, 0x16, 0xdd, 0x01, 0x43, 0x4e, 0x94, 0xa6, 0x68, 0x75, 0x05, 0xc2, 0xee, 0x31, 0x70, 0x0e,
0x5b, 0x98, 0x51, 0x8c, 0x13, 0x91, 0xf6, 0x07, 0xdc, 0xea, 0x3a, 0x19, 0x4b, 0x53, 0xb4, 0xba,
0x07, 0xc9, 0xd3, 0xae, 0x7a, 0x91, 0xcc, 0xd5, 0xc3, 0xf9, 0x66, 0xb8, 0xa7, 0x5c, 0x60, 0xfe,
0x80, 0x81, 0xc6, 0xef, 0xde, 0xa5, 0xe9, 0x27, 0xd6, 0xe2, 0x38, 0x5d, 0x80, 0xe6, 0x19, 0x2e,
0xe7, 0xdb, 0xbf, 0x23, 0xab, 0x3b, 0x10, 0x2e, 0x95, 0xc3, 0x3f, 0xea, 0x14, 0x70, 0x9d, 0x03,
0xe6, 0xa0, 0x19, 0x99, 0xcf, 0xd3, 0xfc, 0xd7, 0xc7, 0xdf, 0x00, 0x00, 0x00, 0xff, 0xff, 0xaf,
0x6f, 0x04, 0x82, 0x64, 0x01, 0x00, 0x00,
}

View File

@ -1,246 +0,0 @@
package mesh
import (
"fmt"
"os"
"time"
"github.com/prometheus/alertmanager/provider"
"github.com/prometheus/alertmanager/types"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"github.com/satori/go.uuid"
"github.com/weaveworks/mesh"
)
// replaceFile wraps a file that is moved to another filename on closing.
type replaceFile struct {
*os.File
filename string
}
func (f *replaceFile) Close() error {
if err := f.File.Sync(); err != nil {
return err
}
if err := f.File.Close(); err != nil {
return err
}
return os.Rename(f.File.Name(), f.filename)
}
// openReplace opens a new temporary file that is moved to filename on closing.
func openReplace(filename string) (*replaceFile, error) {
tmpFilename := fmt.Sprintf("%s.%s", filename, utcNow().Format(time.RFC3339Nano))
f, err := os.Create(tmpFilename)
if err != nil {
return nil, err
}
rf := &replaceFile{
File: f,
filename: filename,
}
return rf, nil
}
// TODO(fabxc): consider making this a flag.
const maintenanceInterval = 15 * time.Minute
type Silences struct {
st *silenceState
mk types.Marker
send mesh.Gossip
stopc chan struct{}
logger log.Logger
retention time.Duration
snapfile string
}
// NewSilences creates a new Silences object.
func NewSilences(mk types.Marker, logger log.Logger, retention time.Duration, snapfile string) (*Silences, error) {
s := &Silences{
st: newSilenceState(),
mk: mk,
stopc: make(chan struct{}),
logger: logger,
retention: retention,
snapfile: snapfile,
}
f, err := os.Open(snapfile)
if os.IsNotExist(err) {
return s, nil
}
if err != nil {
return nil, err
}
defer f.Close()
return s, s.st.loadSnapshot(f)
}
// Register a gossip channel over which silences are shared.
func (s *Silences) Register(g mesh.Gossip) {
s.send = g
}
// Run blocking background processing. Cannot be run more than once.
func (s *Silences) Run() {
for {
select {
case <-s.stopc:
return
case <-time.After(maintenanceInterval):
s.st.gc(s.retention)
if err := s.snapshot(); err != nil {
s.logger.With("err", err).Errorf("Snapshotting failed")
}
}
}
}
func (s *Silences) snapshot() error {
s.logger.Warnf("creating snapshot")
f, err := openReplace(s.snapfile)
if err != nil {
return err
}
if err := s.st.snapshot(f); err != nil {
return err
}
s.logger.Warnf("snapshot created")
return f.Close()
}
// Stop signals the background processing to be stopped.
func (s *Silences) Stop() {
log.Errorf("stopping silences")
close(s.stopc)
if err := s.snapshot(); err != nil {
s.logger.With("err", err).Errorf("Snapshotting failed")
}
}
// Mutes returns true iff any of the known silences mutes the provided label set.
func (s *Silences) Mutes(lset model.LabelSet) bool {
s.st.mtx.RLock()
defer s.st.mtx.RUnlock()
for _, sil := range s.st.m {
if sil.Mutes(lset) {
s.mk.SetSilenced(lset.Fingerprint(), sil.ID)
return true
}
}
s.mk.SetSilenced(lset.Fingerprint())
return false
}
// All returns a list of all known silences.
func (s *Silences) All() ([]*types.Silence, error) {
s.st.mtx.RLock()
defer s.st.mtx.RUnlock()
res := make([]*types.Silence, 0, len(s.st.m))
for _, sil := range s.st.m {
if !sil.Deleted() {
res = append(res, sil)
}
}
return res, nil
}
// Set overwrites the given silence or creates a new one if it doesn't exist yet.
// The new information is spread via the registered gossip channel.
func (s *Silences) Set(sil *types.Silence) (uuid.UUID, error) {
if sil.ID == uuid.Nil {
sil.ID = uuid.NewV4()
}
if err := s.st.set(sil); err != nil {
return uuid.Nil, err
}
s.send.GossipBroadcast(&silenceState{
m: map[uuid.UUID]*types.Silence{
sil.ID: sil,
},
})
return sil.ID, nil
}
// Del removes the silence with the given ID. The new information is spread via
// the registered gossip channel.
// Active silences are not deleted but their end time is set to now.
//
// TODO(fabxc): consider actually deleting silences that haven't started yet.
func (s *Silences) Del(id uuid.UUID) error {
sil, err := s.st.del(id)
if err != nil {
return err
}
update := &silenceState{
m: map[uuid.UUID]*types.Silence{
sil.ID: sil,
},
}
s.send.GossipBroadcast(update)
return nil
}
// Get the silence with the given ID.
func (s *Silences) Get(id uuid.UUID) (*types.Silence, error) {
s.st.mtx.RLock()
defer s.st.mtx.RUnlock()
sil, ok := s.st.m[id]
if !ok || sil.Deleted() {
return nil, provider.ErrNotFound
}
// TODO(fabxc): ensure that silence objects are never modified; just replaced.
return sil, nil
}
// Gossip implements the mesh.Gossiper interface.
func (s *Silences) Gossip() mesh.GossipData {
return s.st.copy()
}
// OnGossip implements the mesh.Gossiper interface.
func (s *Silences) OnGossip(b []byte) (mesh.GossipData, error) {
set, err := decodeSilenceSet(b)
if err != nil {
return nil, err
}
d := s.st.mergeDelta(set)
// The delta is newly created and we are the only one holding it so far.
// Thus, we can access without locking.
if len(d.m) == 0 {
return nil, nil // per OnGossip contract
}
return d, nil
}
// OnGossipBroadcast implements the mesh.Gossiper interface.
func (s *Silences) OnGossipBroadcast(_ mesh.PeerName, b []byte) (mesh.GossipData, error) {
set, err := decodeSilenceSet(b)
if err != nil {
return nil, err
}
d := s.st.mergeDelta(set)
return d, nil
}
// OnGossipUnicast implements the mesh.Gossiper interface.
func (s *Silences) OnGossipUnicast(_ mesh.PeerName, b []byte) error {
set, err := decodeSilenceSet(b)
if err != nil {
return err
}
s.st.mergeComplete(set)
return nil
}

View File

@ -1,151 +0,0 @@
package mesh
import (
"io/ioutil"
"os"
"path/filepath"
"reflect"
"testing"
"time"
"github.com/kylelemons/godebug/pretty"
"github.com/prometheus/alertmanager/types"
"github.com/prometheus/common/log"
"github.com/satori/go.uuid"
"github.com/weaveworks/mesh"
)
func TestReplaceFile(t *testing.T) {
dir, err := ioutil.TempDir("", "replace_file")
if err != nil {
t.Fatal(err)
}
origFilename := filepath.Join(dir, "testfile")
of, err := os.Create(origFilename)
if err != nil {
t.Fatal(err)
}
nf, err := openReplace(filepath.Join(dir, "testfile"))
if err != nil {
t.Fatalf("Creating test file failed: %s", err)
}
if _, err := nf.Write([]byte("test")); err != nil {
t.Fatalf("Writing replace file failed: %s", err)
}
if nf.Name() == of.Name() {
t.Fatalf("Replacement file must not have same name as original")
}
if err := nf.Close(); err != nil {
t.Fatalf("Closing replace file failed: %s", err)
}
of.Close()
ofr, err := os.Open(origFilename)
if err != nil {
t.Fatal(err)
}
defer ofr.Close()
res, err := ioutil.ReadAll(ofr)
if err != nil {
t.Fatal(err)
}
if string(res) != "test" {
t.Fatalf("File contents do not match; got %q, expected %q", string(res), "test")
}
}
func TestSilencesSet(t *testing.T) {
var (
now = utcNow()
id1 = uuid.NewV4()
matchers = types.NewMatchers(types.NewMatcher("a", "b"))
)
cases := []struct {
input *types.Silence
update map[uuid.UUID]*types.Silence
fail bool
}{
{
// Set an invalid silence.
input: &types.Silence{},
fail: true,
},
{
// Set a silence including ID.
input: &types.Silence{
ID: id1,
Matchers: matchers,
StartsAt: now.Add(time.Minute),
EndsAt: now.Add(time.Hour),
CreatedBy: "x",
Comment: "x",
},
update: map[uuid.UUID]*types.Silence{
id1: &types.Silence{
ID: id1,
Matchers: matchers,
StartsAt: now.Add(time.Minute),
EndsAt: now.Add(time.Hour),
UpdatedAt: now,
CreatedBy: "x",
Comment: "x",
},
},
},
}
for i, c := range cases {
t.Logf("Test case %d", i)
s, err := NewSilences(nil, log.Base(), time.Hour, "")
if err != nil {
t.Fatal(err)
}
tg := &testGossip{}
s.Register(tg)
s.st.now = func() time.Time { return now }
beforeID := c.input.ID
uid, err := s.Set(c.input)
if err != nil {
if c.fail {
continue
}
t.Errorf("Unexpected error: %s", err)
continue
}
if c.fail {
t.Errorf("Expected error but got none")
continue
}
if beforeID != uuid.Nil && uid != beforeID {
t.Errorf("Silence ID unexpectedly changed: before %q, after %q", beforeID, uid)
continue
}
// Verify the update propagated.
if have := tg.updates[0].(*silenceState).m; !reflect.DeepEqual(have, c.update) {
t.Errorf("Update did not match")
t.Errorf("%s", pretty.Compare(have, c.update))
}
}
}
// testGossip implements the mesh.Gossip interface. Received broadcast
// updates are appended to a list.
type testGossip struct {
updates []mesh.GossipData
}
func (g *testGossip) GossipUnicast(dst mesh.PeerName, msg []byte) error {
panic("not implemented")
}
func (g *testGossip) GossipBroadcast(update mesh.GossipData) {
g.updates = append(g.updates, update)
}

View File

@ -1,254 +0,0 @@
package mesh
import (
"bytes"
"encoding/gob"
"fmt"
"io"
"sync"
"time"
"github.com/prometheus/alertmanager/provider"
"github.com/prometheus/alertmanager/types"
"github.com/satori/go.uuid"
"github.com/weaveworks/mesh"
)
func utcNow() time.Time { return time.Now().UTC() }
type silenceState struct {
mtx sync.RWMutex
m map[uuid.UUID]*types.Silence
now func() time.Time // now function for test injection
}
func newSilenceState() *silenceState {
return &silenceState{
m: map[uuid.UUID]*types.Silence{},
now: utcNow,
}
}
func (st *silenceState) gc(retention time.Duration) {
st.mtx.Lock()
defer st.mtx.Unlock()
t := st.now().Add(-retention)
for k, v := range st.m {
if v.EndsAt.Before(t) {
delete(st.m, k)
}
}
}
func (st *silenceState) snapshot(w io.Writer) error {
st.mtx.RLock()
defer st.mtx.RUnlock()
enc := gob.NewEncoder(w)
for _, s := range st.m {
if err := enc.Encode(s); err != nil {
return err
}
}
return nil
}
func (st *silenceState) loadSnapshot(r io.Reader) error {
st.mtx.Lock()
defer st.mtx.Unlock()
dec := gob.NewDecoder(r)
for {
var s types.Silence
if err := dec.Decode(&s); err != nil {
if err == io.EOF {
break
}
return err
}
if err := s.Init(); err != nil {
return fmt.Errorf("iniializing silence failed: %s", err)
}
st.m[s.ID] = &s
}
return nil
}
func decodeSilenceSet(b []byte) (map[uuid.UUID]*types.Silence, error) {
var v map[uuid.UUID]*types.Silence
err := gob.NewDecoder(bytes.NewReader(b)).Decode(&v)
return v, err
}
func (st *silenceState) Encode() [][]byte {
st.mtx.RLock()
defer st.mtx.RUnlock()
var buf bytes.Buffer
if err := gob.NewEncoder(&buf).Encode(&st.m); err != nil {
panic(err)
}
return [][]byte{buf.Bytes()}
}
const timestampTolerance = time.Second
// silenceModAllowed checks whether silence a may be changed to silence b.
// Returns an error stating the reason if not.
// The silences are guaranteed to be valid. Silence a may be nil if b is a new.
func silenceModAllowed(a, b *types.Silence, now time.Time) error {
if a == nil {
if b.StartsAt.Before(now) {
// From b being valid it follows that EndsAt will also be
// in the future.
return fmt.Errorf("new silence may not start in the past")
}
return nil
}
if a.ID != b.ID {
return fmt.Errorf("IDs do not match")
}
almostEqual := func(s, t time.Time) bool {
d := s.Sub(t)
return d <= timestampTolerance && d >= -timestampTolerance
}
if almostEqual(a.StartsAt, b.StartsAt) {
// Always pick original timestamp so we cannot drift the time
// by spamming edits.
b.StartsAt = a.StartsAt
} else {
if a.StartsAt.Before(now) {
return fmt.Errorf("start time of active silence must not be modified")
}
if b.StartsAt.Before(now) {
return fmt.Errorf("start time cannot be moved into the past")
}
}
if almostEqual(a.EndsAt, b.EndsAt) {
// Always pick original timestamp so we cannot drift the time
// by spamming edits.
b.EndsAt = a.EndsAt
} else {
if a.EndsAt.Before(now) {
return fmt.Errorf("end time must not be modified for elapsed silence")
}
if b.EndsAt.Before(now) {
return fmt.Errorf("end time must not be set into the past")
}
}
if !a.Matchers.Equal(b.Matchers) {
return fmt.Errorf("matchers must not be modified")
}
return nil
}
func (st *silenceState) set(s *types.Silence) error {
st.mtx.Lock()
defer st.mtx.Unlock()
s.StartsAt = s.StartsAt.UTC()
s.EndsAt = s.EndsAt.UTC()
now := st.now()
s.UpdatedAt = now
prev, ok := st.m[s.ID]
// Silence start for new silences must not be before now.
// Simplest solution is to reset it here if necessary.
if !ok && s.StartsAt.Before(now) {
s.StartsAt = now
}
if err := s.Validate(); err != nil {
return err
}
if err := silenceModAllowed(prev, s, now); err != nil {
return err
}
st.m[s.ID] = s
return nil
}
func (st *silenceState) del(id uuid.UUID) (*types.Silence, error) {
st.mtx.Lock()
defer st.mtx.Unlock()
prev, ok := st.m[id]
if !ok {
return nil, provider.ErrNotFound
}
now := st.now()
// Silences are immutable by contract so we create a
// shallow copy.
sil := *prev
sil.UpdatedAt = now
// If silence hasn't started yet, terminate it at
// its starting time.
if sil.StartsAt.After(now) {
sil.EndsAt = sil.StartsAt
} else {
sil.EndsAt = now
}
if err := sil.Validate(); err != nil {
return nil, err
}
if err := silenceModAllowed(prev, &sil, now); err != nil {
return nil, err
}
st.m[sil.ID] = &sil
return &sil, nil
}
func (st *silenceState) Merge(other mesh.GossipData) mesh.GossipData {
o := other.(*silenceState)
o.mtx.RLock()
defer o.mtx.RUnlock()
return st.mergeComplete(o.m)
}
func (st *silenceState) mergeComplete(set map[uuid.UUID]*types.Silence) *silenceState {
st.mtx.Lock()
defer st.mtx.Unlock()
for k, v := range set {
if prev, ok := st.m[k]; !ok || prev.UpdatedAt.Before(v.UpdatedAt) {
st.m[k] = v
}
}
return st
}
func (st *silenceState) mergeDelta(set map[uuid.UUID]*types.Silence) *silenceState {
st.mtx.Lock()
defer st.mtx.Unlock()
d := map[uuid.UUID]*types.Silence{}
for k, v := range set {
if prev, ok := st.m[k]; !ok || prev.UpdatedAt.Before(v.UpdatedAt) {
st.m[k] = v
d[k] = v
}
}
return &silenceState{m: d}
}
func (st *silenceState) copy() *silenceState {
st.mtx.RLock()
defer st.mtx.RUnlock()
res := &silenceState{
m: make(map[uuid.UUID]*types.Silence, len(st.m)),
}
for k, v := range st.m {
res.m[k] = v
}
return res
}

View File

@ -1,645 +0,0 @@
package mesh
import (
"bytes"
"reflect"
"regexp"
"strings"
"testing"
"time"
"github.com/kylelemons/godebug/pretty"
"github.com/prometheus/alertmanager/provider"
"github.com/prometheus/alertmanager/types"
"github.com/satori/go.uuid"
)
func TestSilenceStateGC(t *testing.T) {
var (
now = utcNow()
id1 = uuid.NewV4()
id2 = uuid.NewV4()
id3 = uuid.NewV4()
id4 = uuid.NewV4()
id5 = uuid.NewV4()
)
silence := func(id uuid.UUID, t time.Time) *types.Silence {
return &types.Silence{
ID: id,
Matchers: types.NewMatchers(types.NewMatcher("a", "c")),
StartsAt: now.Add(-100 * time.Hour),
EndsAt: t,
UpdatedAt: now,
CreatedBy: "x",
Comment: "x",
}
}
initial := map[uuid.UUID]*types.Silence{
id1: silence(id1, now.Add(10*time.Minute)),
id2: silence(id2, now),
id3: silence(id3, now.Add(-10*time.Minute)),
id4: silence(id4, now.Add(-1*time.Hour)),
id5: silence(id5, now.Add(-2*time.Hour)),
}
final := map[uuid.UUID]*types.Silence{
id1: silence(id1, now.Add(10*time.Minute)),
id2: silence(id2, now),
id3: silence(id3, now.Add(-10*time.Minute)),
id4: silence(id4, now.Add(-1*time.Hour)),
}
st := newSilenceState()
st.now = func() time.Time { return now }
st.m = initial
st.gc(time.Hour)
if !reflect.DeepEqual(st.m, final) {
t.Errorf("Unexpected state after GC")
t.Errorf("%s", pretty.Compare(st.m, final))
}
}
func TestSilenceStateSnapshot(t *testing.T) {
var (
now = utcNow()
id1 = uuid.NewV4()
id2 = uuid.NewV4()
matchers = types.NewMatchers(
types.NewMatcher("a", "b"),
types.NewRegexMatcher("label", regexp.MustCompile("abc[^a].+")),
)
)
initial := map[uuid.UUID]*types.Silence{
id1: &types.Silence{
ID: id1,
Matchers: matchers,
StartsAt: now.Add(time.Minute),
EndsAt: now.Add(time.Hour),
UpdatedAt: now,
CreatedBy: "x",
Comment: "x",
},
id2: &types.Silence{
ID: id2,
Matchers: matchers,
StartsAt: now.Add(-time.Hour),
EndsAt: now.Add(time.Minute),
UpdatedAt: now,
CreatedBy: "creator X",
Comment: "comment comment comment",
},
}
st := newSilenceState()
st.now = func() time.Time { return now }
st.m = initial
var buf bytes.Buffer
if err := st.snapshot(&buf); err != nil {
t.Fatalf("Snapshotting failed: %s", err)
}
st = newSilenceState()
if err := st.loadSnapshot(&buf); err != nil {
t.Fatalf("Loading snapshot failed: %s", err)
}
if !reflect.DeepEqual(st.m, initial) {
t.Errorf("Loaded snapshot did not match")
t.Errorf("%s", pretty.Compare(st.m, initial))
}
}
func TestSilenceStateSet(t *testing.T) {
var (
now = utcNow()
id1 = uuid.NewV4()
matchers = types.NewMatchers(types.NewMatcher("a", "b"))
)
cases := []struct {
initial map[uuid.UUID]*types.Silence
final map[uuid.UUID]*types.Silence
input *types.Silence
err string
}{
{
initial: map[uuid.UUID]*types.Silence{},
final: map[uuid.UUID]*types.Silence{},
// Provide an invalid silence (no matchers).
input: &types.Silence{
ID: id1,
StartsAt: now,
EndsAt: now.Add(time.Minute),
CreatedBy: "x",
Comment: "x",
},
err: "matcher",
}, {
initial: map[uuid.UUID]*types.Silence{},
final: map[uuid.UUID]*types.Silence{
id1: &types.Silence{
ID: id1,
Matchers: matchers,
StartsAt: now.Add(time.Minute),
EndsAt: now.Add(time.Hour),
UpdatedAt: now,
CreatedBy: "x",
Comment: "x",
},
},
input: &types.Silence{
ID: id1,
Matchers: matchers,
// Different input timezones must be normalized to UTC.
StartsAt: now.Add(time.Minute).In(time.FixedZone("test", 100000)),
EndsAt: now.Add(time.Hour).In(time.FixedZone("test", 10000000)),
CreatedBy: "x",
Comment: "x",
},
}, {
initial: map[uuid.UUID]*types.Silence{},
final: map[uuid.UUID]*types.Silence{
id1: &types.Silence{
ID: id1,
Matchers: matchers,
// StartsAt should be reset to now if it's before
// now for a new silence.
StartsAt: now,
EndsAt: now.Add(time.Hour),
UpdatedAt: now,
CreatedBy: "x",
Comment: "x",
},
},
input: &types.Silence{
ID: id1,
Matchers: matchers,
StartsAt: now.Add(-time.Second),
EndsAt: now.Add(time.Hour),
CreatedBy: "x",
Comment: "x",
},
}, {
initial: map[uuid.UUID]*types.Silence{
id1: &types.Silence{
ID: id1,
Matchers: matchers,
StartsAt: now.Add(-time.Hour),
EndsAt: now.Add(-time.Minute),
UpdatedAt: now.Add(-10 * time.Minute),
CreatedBy: "x",
Comment: "x",
},
},
final: map[uuid.UUID]*types.Silence{
id1: &types.Silence{
ID: id1,
StartsAt: now.Add(-time.Hour),
EndsAt: now.Add(-time.Minute),
UpdatedAt: now.Add(-10 * time.Minute),
CreatedBy: "x",
Comment: "x",
},
},
// Do an invalid modification (silence already elapsed).
input: &types.Silence{
ID: id1,
Matchers: matchers,
StartsAt: now.Add(-time.Hour),
EndsAt: now.Add(time.Minute),
CreatedBy: "x",
Comment: "x",
},
err: "elapsed",
}, {
initial: map[uuid.UUID]*types.Silence{
id1: &types.Silence{
ID: id1,
Matchers: matchers,
StartsAt: now.Add(-time.Minute),
EndsAt: now.Add(time.Hour),
UpdatedAt: now.Add(-time.Minute),
CreatedBy: "x",
Comment: "x",
},
},
final: map[uuid.UUID]*types.Silence{
id1: &types.Silence{
ID: id1,
Matchers: matchers,
StartsAt: now.Add(-time.Minute),
EndsAt: now.Add(time.Minute),
UpdatedAt: now,
CreatedBy: "x",
Comment: "x",
},
},
// Do valid modification (alter end time).
input: &types.Silence{
ID: id1,
Matchers: matchers,
StartsAt: now.Add(-time.Minute),
EndsAt: now.Add(time.Minute),
CreatedBy: "x",
Comment: "x",
},
},
}
for i, c := range cases {
t.Logf("Test case %d", i)
s := newSilenceState()
s.m = c.initial
s.now = func() time.Time { return now }
if err := s.set(c.input); err != nil {
if len(c.err) > 0 {
if strings.Contains(err.Error(), c.err) {
continue
}
t.Errorf("Expected error containing %q, got %q", c.err, err)
continue
}
t.Errorf("Setting failed: %s", err)
continue
}
if !reflect.DeepEqual(s.m, c.final) {
t.Errorf("Unexpected final state")
t.Errorf("%s", pretty.Compare(s.m, c.final))
continue
}
}
}
func TestSilenceStateDel(t *testing.T) {
var (
now = utcNow()
id1 = uuid.NewV4()
matchers = types.NewMatchers(types.NewMatcher("a", "b"))
)
cases := []struct {
initial map[uuid.UUID]*types.Silence
final map[uuid.UUID]*types.Silence
input uuid.UUID
err string
}{
{
initial: map[uuid.UUID]*types.Silence{},
final: map[uuid.UUID]*types.Silence{},
// Provide a non-existant ID.
input: id1,
err: provider.ErrNotFound.Error(),
}, {
initial: map[uuid.UUID]*types.Silence{
id1: &types.Silence{
ID: id1,
Matchers: matchers,
StartsAt: now.Add(time.Minute),
EndsAt: now.Add(2 * time.Minute),
UpdatedAt: now.Add(-time.Minute),
CreatedBy: "x",
Comment: "x",
},
},
// Deleting unstarted silence sets end timestamp to start time.
final: map[uuid.UUID]*types.Silence{
id1: &types.Silence{
ID: id1,
Matchers: matchers,
StartsAt: now.Add(time.Minute),
EndsAt: now.Add(time.Minute),
UpdatedAt: now,
CreatedBy: "x",
Comment: "x",
},
},
input: id1,
},
{
initial: map[uuid.UUID]*types.Silence{
id1: &types.Silence{
ID: id1,
Matchers: matchers,
StartsAt: now.Add(-time.Minute),
EndsAt: now.Add(time.Minute),
UpdatedAt: now.Add(-time.Minute),
CreatedBy: "x",
Comment: "x",
},
},
final: map[uuid.UUID]*types.Silence{
id1: &types.Silence{
ID: id1,
Matchers: matchers,
StartsAt: now.Add(-time.Minute),
EndsAt: now,
UpdatedAt: now,
CreatedBy: "x",
Comment: "x",
},
},
input: id1,
}, {
// Attempt deleting an elapsed silence.
initial: map[uuid.UUID]*types.Silence{
id1: &types.Silence{
ID: id1,
Matchers: matchers,
StartsAt: now.Add(-10 * time.Minute),
EndsAt: now.Add(-5 * time.Minute),
UpdatedAt: now.Add(-10 * time.Minute),
CreatedBy: "x",
Comment: "x",
},
},
final: map[uuid.UUID]*types.Silence{
id1: &types.Silence{
ID: id1,
Matchers: matchers,
StartsAt: now.Add(-10 * time.Minute),
EndsAt: now.Add(-5 * time.Minute),
UpdatedAt: now.Add(-10 * time.Minute),
CreatedBy: "x",
Comment: "x",
},
},
input: id1,
err: "end time",
},
}
for i, c := range cases {
t.Logf("Test case %d", i)
s := newSilenceState()
s.m = c.initial
s.now = func() time.Time { return now }
sil, err := s.del(c.input)
if err != nil {
if len(c.err) > 0 {
if strings.Contains(err.Error(), c.err) {
continue
}
t.Errorf("Expected error containing %q, got %q", c.err, err)
continue
}
t.Errorf("Setting failed: %s", err)
continue
}
if !reflect.DeepEqual(s.m, c.final) {
t.Errorf("Unexpected final state")
t.Errorf("%s", pretty.Compare(s.m, c.final))
continue
}
if !reflect.DeepEqual(sil, s.m[c.input]) {
t.Errorf("Returned silence doesn't match stored silence")
}
}
}
func TestSilenceModAllowed(t *testing.T) {
var (
now = utcNow()
id1 = uuid.NewV4()
matchers = types.NewMatchers(types.NewMatcher("a", "b"))
)
cases := []struct {
a, b *types.Silence
err string
}{
{
a: nil,
b: &types.Silence{
ID: id1,
Matchers: matchers,
StartsAt: now.Add(1 * time.Minute),
EndsAt: now.Add(5 * time.Minute),
UpdatedAt: now,
CreatedBy: "x",
Comment: "x",
},
},
{
// Modify silence comment and creator and set not-yet started
// end time into future.
a: &types.Silence{
ID: id1,
Matchers: matchers,
StartsAt: now.Add(-10 * time.Minute),
EndsAt: now.Add(5 * time.Minute),
UpdatedAt: now.Add(-10 * time.Minute),
CreatedBy: "x",
Comment: "x",
},
b: &types.Silence{
ID: id1,
Matchers: matchers,
StartsAt: now.Add(-10 * time.Minute),
EndsAt: now.Add(100 * time.Minute),
UpdatedAt: now,
CreatedBy: "y",
Comment: "y",
},
},
{
a: &types.Silence{
ID: id1,
Matchers: matchers,
StartsAt: now.Add(-10 * time.Minute),
EndsAt: now.Add(-5 * time.Minute),
UpdatedAt: now.Add(-10 * time.Minute),
CreatedBy: "x",
Comment: "x",
},
b: &types.Silence{
ID: id1,
Matchers: matchers,
StartsAt: now.Add(-10 * time.Minute),
EndsAt: now.Add(-5 * time.Minute),
UpdatedAt: now,
CreatedBy: "y",
Comment: "y",
},
},
{
a: &types.Silence{
ID: id1,
Matchers: matchers,
StartsAt: now.Add(-10 * time.Minute),
EndsAt: now.Add(-5 * time.Minute),
UpdatedAt: now.Add(-10 * time.Minute),
CreatedBy: "x",
Comment: "x",
},
// Timestamp tolerance must be respected.
b: &types.Silence{
ID: id1,
Matchers: matchers,
StartsAt: now.Add(-10*time.Minute + timestampTolerance),
EndsAt: now.Add(-5*time.Minute - timestampTolerance),
UpdatedAt: now,
CreatedBy: "x",
Comment: "x",
},
},
{
a: nil,
b: &types.Silence{
ID: id1,
Matchers: matchers,
StartsAt: now.Add(-10 * time.Minute),
EndsAt: now.Add(5 * time.Minute),
UpdatedAt: now,
CreatedBy: "x",
Comment: "x",
},
err: "start in the past",
},
{
a: &types.Silence{
ID: uuid.NewV4(),
Matchers: matchers,
StartsAt: now.Add(-10 * time.Minute),
EndsAt: now.Add(-5 * time.Minute),
UpdatedAt: now.Add(-10 * time.Minute),
CreatedBy: "x",
Comment: "x",
},
b: &types.Silence{
ID: uuid.NewV4(),
Matchers: matchers,
StartsAt: now.Add(-10 * time.Minute),
EndsAt: now.Add(-5 * time.Minute),
UpdatedAt: now.Add(-10 * time.Minute),
CreatedBy: "x",
Comment: "x",
},
err: "IDs do not match",
},
{
a: &types.Silence{
ID: id1,
Matchers: matchers,
StartsAt: now.Add(-10 * time.Minute),
EndsAt: now.Add(5 * time.Minute),
UpdatedAt: now.Add(-10 * time.Minute),
CreatedBy: "x",
Comment: "x",
},
b: &types.Silence{
ID: id1,
Matchers: matchers,
StartsAt: now.Add(1 * time.Minute),
EndsAt: now.Add(5 * time.Minute),
UpdatedAt: now,
CreatedBy: "x",
Comment: "x",
},
err: "start time of active silence",
},
{
a: &types.Silence{
ID: id1,
Matchers: matchers,
StartsAt: now.Add(1 * time.Minute),
EndsAt: now.Add(5 * time.Minute),
UpdatedAt: now.Add(-10 * time.Minute),
CreatedBy: "x",
Comment: "x",
},
b: &types.Silence{
ID: id1,
Matchers: matchers,
StartsAt: now.Add(-1 * time.Minute),
EndsAt: now.Add(5 * time.Minute),
UpdatedAt: now,
CreatedBy: "x",
Comment: "x",
},
err: "start time cannot be moved into the past",
},
{
a: &types.Silence{
ID: id1,
Matchers: matchers,
StartsAt: now.Add(-5 * time.Minute),
EndsAt: now.Add(-1 * time.Minute),
UpdatedAt: now.Add(-10 * time.Minute),
CreatedBy: "x",
Comment: "x",
},
b: &types.Silence{
ID: id1,
Matchers: matchers,
StartsAt: now.Add(-5 * time.Minute),
EndsAt: now.Add(5 * time.Minute),
UpdatedAt: now,
CreatedBy: "x",
Comment: "x",
},
err: "end time must not be modified for elapsed silence",
},
{
a: &types.Silence{
ID: id1,
Matchers: matchers,
StartsAt: now.Add(-5 * time.Minute),
EndsAt: now.Add(5 * time.Minute),
UpdatedAt: now.Add(-10 * time.Minute),
CreatedBy: "x",
Comment: "x",
},
b: &types.Silence{
ID: id1,
Matchers: matchers,
StartsAt: now.Add(-5 * time.Minute),
EndsAt: now.Add(-1 * time.Minute),
UpdatedAt: now,
CreatedBy: "x",
Comment: "x",
},
err: "end time must not be set into the past",
},
{
a: &types.Silence{
ID: id1,
Matchers: types.NewMatchers(types.NewMatcher("a", "b")),
StartsAt: now.Add(-5 * time.Minute),
EndsAt: now.Add(5 * time.Minute),
UpdatedAt: now.Add(-10 * time.Minute),
CreatedBy: "x",
Comment: "x",
},
b: &types.Silence{
ID: id1,
Matchers: types.NewMatchers(types.NewMatcher("a", "c")),
StartsAt: now.Add(-5 * time.Minute),
EndsAt: now.Add(5 * time.Minute),
UpdatedAt: now,
CreatedBy: "x",
Comment: "x",
},
err: "matchers must not be modified",
},
}
for _, c := range cases {
got := silenceModAllowed(c.a, c.b, now)
if got == nil {
if c.err != "" {
t.Errorf("Expected error containing %q but got none", c.err)
}
continue
}
if c.err == "" {
t.Errorf("Expected no error but got %q", got)
} else if !strings.Contains(got.Error(), c.err) {
t.Errorf("Expected error containing %q but got %q", c.err, got)
}
}
}

View File

@ -17,7 +17,6 @@ import (
"fmt"
"github.com/prometheus/common/model"
"github.com/satori/go.uuid"
"github.com/prometheus/alertmanager/types"
)
@ -87,20 +86,3 @@ type Alerts interface {
// Put adds the given alert to the set.
Put(...*types.Alert) error
}
// Silences gives access to silences. All methods are goroutine-safe.
type Silences interface {
// The Silences provider must implement the Muter interface
// for all its silences. The data provider may have access to an
// optimized view of the data to perform this evaluation.
types.Muter
// All returns all existing silences.
All() ([]*types.Silence, error)
// Set a new silence.
Set(*types.Silence) (uuid.UUID, error)
// Del removes a silence.
Del(uuid.UUID) error
// Get a silence associated with a fingerprint.
Get(uuid.UUID) (*types.Silence, error)
}

View File

@ -91,9 +91,12 @@ func New(o Options) (*Silences, error) {
return nil, err
}
if o.SnapshotFile != "" {
var err error
if o.SnapshotReader, err = os.Open(o.SnapshotFile); err != nil {
return nil, err
if r, err := os.Open(o.SnapshotFile); err != nil {
if !os.IsNotExist(err) {
return nil, err
}
} else {
o.SnapshotReader = r
}
}
s := &Silences{
@ -318,6 +321,7 @@ func (s *Silences) Create(sil *pb.Silence) (id string, err error) {
if err := s.setSilence(sil); err != nil {
return "", err
}
s.logger.Log("created silence", sil.Id)
return sil.Id, nil
}
@ -503,12 +507,13 @@ func QState(states ...SilenceState) QueryParam {
return func(q *query) error {
f := func(sil *pb.Silence, now *timestamp.Timestamp) (bool, error) {
s := getState(sil, now)
for _, ps := range states {
if s == ps {
return false, nil
return true, nil
}
}
return true, nil
return false, nil
}
q.filters = append(q.filters, f)
return nil

View File

@ -318,7 +318,7 @@ func TestQState(t *testing.T) {
cases := []struct {
sil *pb.Silence
states []SilenceState
drop bool
keep bool
}{
{
sil: &pb.Silence{
@ -326,7 +326,7 @@ func TestQState(t *testing.T) {
EndsAt: mustTimeProto(now.Add(time.Hour)),
},
states: []SilenceState{StateActive, StateExpired},
drop: true,
keep: false,
},
{
sil: &pb.Silence{
@ -334,7 +334,7 @@ func TestQState(t *testing.T) {
EndsAt: mustTimeProto(now.Add(time.Hour)),
},
states: []SilenceState{StatePending},
drop: false,
keep: true,
},
{
sil: &pb.Silence{
@ -342,7 +342,7 @@ func TestQState(t *testing.T) {
EndsAt: mustTimeProto(now.Add(time.Hour)),
},
states: []SilenceState{StateExpired, StatePending},
drop: false,
keep: true,
},
}
for i, c := range cases {
@ -350,9 +350,9 @@ func TestQState(t *testing.T) {
QState(c.states...)(q)
f := q.filters[0]
drop, err := f(c.sil, mustTimeProto(now))
keep, err := f(c.sil, mustTimeProto(now))
require.NoError(t, err)
require.Equal(t, c.drop, drop, "unexpected filter result for case %d", i)
require.Equal(t, c.keep, keep, "unexpected filter result for case %d", i)
}
}

View File

@ -30,7 +30,6 @@ import (
"github.com/prometheus/client_golang/api/alertmanager"
"github.com/prometheus/common/model"
"github.com/satori/go.uuid"
"golang.org/x/net/context"
)
@ -329,10 +328,10 @@ func (am *Alertmanager) SetSilence(at float64, sil *TestSilence) {
var v struct {
Status string `json:"status"`
Data struct {
SilenceID uuid.UUID `json:"silenceId"`
SilenceID string `json:"silenceId"`
} `json:"data"`
}
if err := json.Unmarshal(b, &v); err != nil {
if err := json.Unmarshal(b, &v); err != nil || resp.StatusCode/100 != 2 {
am.t.Errorf("error setting silence %v: %s", sil, err)
return
}
@ -349,8 +348,8 @@ func (am *Alertmanager) DelSilence(at float64, sil *TestSilence) {
return
}
_, err = http.DefaultClient.Do(req)
if err != nil {
resp, err := http.DefaultClient.Do(req)
if err != nil || resp.StatusCode/100 != 2 {
am.t.Errorf("Error deleting silence %v: %s", sil, err)
return
}

View File

@ -58,7 +58,7 @@ receivers:
)
// Add a silence that affects the first alert.
am.SetSilence(At(2.5), Silence(2, 4.5).Match("alertname", "test1"))
am.SetSilence(At(2.3), Silence(2.5, 4.5).Match("alertname", "test1"))
co.Want(Between(3, 3.5), Alert("alertname", "test2").Active(1))
co.Want(Between(4, 4.5), Alert("alertname", "test2").Active(1))
@ -106,7 +106,7 @@ receivers:
// Silence everything for a long time and delete the silence after
// two iterations.
sil := Silence(1, 100).MatchRE("alertname", ".*")
sil := Silence(1.5, 100).MatchRE("alertname", ".*")
am.SetSilence(At(1.3), sil)
am.DelSilence(At(3.5), sil)

View File

@ -25,7 +25,6 @@ import (
"github.com/prometheus/alertmanager/notify"
"github.com/prometheus/alertmanager/types"
"github.com/satori/go.uuid"
)
// At is a convenience method to allow for declarative syntax of Acceptance
@ -54,7 +53,7 @@ func Between(start, end float64) Interval {
// TestSilence models a model.Silence with relative times.
type TestSilence struct {
ID uuid.UUID
ID string
match []string
matchRE []string
startsAt, endsAt float64

View File

@ -21,16 +21,15 @@ import (
"time"
"github.com/prometheus/common/model"
"github.com/satori/go.uuid"
)
// Marker helps to mark alerts as silenced and/or inhibited.
// All methods are goroutine-safe.
type Marker interface {
SetInhibited(alert model.Fingerprint, b bool)
SetSilenced(alert model.Fingerprint, sil ...uuid.UUID)
SetSilenced(alert model.Fingerprint, sil ...string)
Silenced(alert model.Fingerprint) (uuid.UUID, bool)
Silenced(alert model.Fingerprint) (string, bool)
Inhibited(alert model.Fingerprint) bool
}
@ -38,13 +37,13 @@ type Marker interface {
func NewMarker() Marker {
return &memMarker{
inhibited: map[model.Fingerprint]struct{}{},
silenced: map[model.Fingerprint]uuid.UUID{},
silenced: map[model.Fingerprint]string{},
}
}
type memMarker struct {
inhibited map[model.Fingerprint]struct{}
silenced map[model.Fingerprint]uuid.UUID
silenced map[model.Fingerprint]string
mtx sync.RWMutex
}
@ -57,7 +56,7 @@ func (m *memMarker) Inhibited(alert model.Fingerprint) bool {
return ok
}
func (m *memMarker) Silenced(alert model.Fingerprint) (uuid.UUID, bool) {
func (m *memMarker) Silenced(alert model.Fingerprint) (string, bool) {
m.mtx.RLock()
defer m.mtx.RUnlock()
@ -76,7 +75,7 @@ func (m *memMarker) SetInhibited(alert model.Fingerprint, b bool) {
}
}
func (m *memMarker) SetSilenced(alert model.Fingerprint, sil ...uuid.UUID) {
func (m *memMarker) SetSilenced(alert model.Fingerprint, sil ...string) {
m.mtx.Lock()
defer m.mtx.Unlock()
@ -206,7 +205,7 @@ func (f MuteFunc) Mutes(lset model.LabelSet) bool { return f(lset) }
// A Silence determines whether a given label set is muted.
type Silence struct {
// A unique identifier across all connected instances.
ID uuid.UUID `json:"id"`
ID string `json:"id"`
// A set of matchers determining if a label set is affect
// by the silence.
Matchers Matchers `json:"matchers"`
@ -237,7 +236,7 @@ type Silence struct {
// Validate returns true iff all fields of the silence have valid values.
func (s *Silence) Validate() error {
if s.ID == uuid.Nil {
if s.ID == "" {
return fmt.Errorf("ID missing")
}
if len(s.Matchers) == 0 {