provider: remove NotificationInfos provider

This commit is contained in:
Fabian Reinartz 2016-08-23 13:56:48 +02:00
parent d6713c8eeb
commit 1baf98fb1a
6 changed files with 0 additions and 752 deletions

View File

@ -45,170 +45,9 @@ func openReplace(filename string) (*replaceFile, error) {
return rf, nil
}
// NotificationInfos provides gossiped information about which
// receivers have been notified about which alerts.
type NotificationInfos struct {
st *notificationState
send mesh.Gossip
retention time.Duration
snapfile string
logger log.Logger
stopc chan struct{}
}
// NewNotificationInfos returns a new NotificationInfos object.
func NewNotificationInfos(logger log.Logger, retention time.Duration, snapfile string) (*NotificationInfos, error) {
ni := &NotificationInfos{
logger: logger,
stopc: make(chan struct{}),
st: newNotificationState(),
retention: retention,
snapfile: snapfile,
}
f, err := os.Open(snapfile)
if os.IsNotExist(err) {
return ni, nil
}
if err != nil {
return nil, err
}
defer f.Close()
return ni, ni.st.loadSnapshot(f)
}
// Register the given gossip channel.
func (ni *NotificationInfos) Register(g mesh.Gossip) {
ni.send = g
}
// TODO(fabxc): consider making this a flag.
const maintenanceInterval = 15 * time.Minute
// Run starts blocking background processing of the NotificationInfos.
// Cannot be run more than once.
func (ni *NotificationInfos) Run() {
for {
select {
case <-ni.stopc:
return
case <-time.After(maintenanceInterval):
ni.st.gc()
if err := ni.snapshot(); err != nil {
ni.logger.With("err", err).Errorf("Snapshotting failed")
}
}
}
}
func (ni *NotificationInfos) snapshot() error {
f, err := openReplace(ni.snapfile)
if err != nil {
return err
}
if err := ni.st.snapshot(f); err != nil {
return err
}
return f.Close()
}
// Stop signals the background processing to terminate.
func (ni *NotificationInfos) Stop() {
close(ni.stopc)
if err := ni.snapshot(); err != nil {
ni.logger.With("err", err).Errorf("Snapshotting failed")
}
}
// Gossip implements the mesh.Gossiper interface.
func (ni *NotificationInfos) Gossip() mesh.GossipData {
return ni.st.copy()
}
// OnGossip implements the mosh.Gossiper interface.
func (ni *NotificationInfos) OnGossip(b []byte) (mesh.GossipData, error) {
set, err := decodeNotificationSet(b)
if err != nil {
return nil, err
}
d := ni.st.mergeDelta(set)
// The delta is newly created and we are the only one holding it so far.
// Thus, we can access without locking.
if len(d.set) == 0 {
return nil, nil // per OnGossip contract
}
return d, nil
}
// OnGossipBroadcast implements the mesh.Gossiper interface.
func (ni *NotificationInfos) OnGossipBroadcast(_ mesh.PeerName, b []byte) (mesh.GossipData, error) {
set, err := decodeNotificationSet(b)
if err != nil {
return nil, err
}
return ni.st.mergeDelta(set), nil
}
// OnGossipUnicast implements the mesh.Gossiper interface.
func (ni *NotificationInfos) OnGossipUnicast(_ mesh.PeerName, b []byte) error {
set, err := decodeNotificationSet(b)
if err != nil {
return err
}
ni.st.mergeComplete(set)
return nil
}
// Set the provided notification information.
// The expiration timestamp is set to the timestamp plus the configured retention time.
func (ni *NotificationInfos) Set(ns ...*types.NotificationInfo) error {
set := map[notificationKey]notificationEntry{}
for _, n := range ns {
k := notificationKey{
Receiver: n.Receiver,
Alert: n.Alert,
}
set[k] = notificationEntry{
Resolved: n.Resolved,
Timestamp: n.Timestamp,
// The expiration timestamp is set at creation time
// to avoid synchronization artifacts in garbage collection.
ExpiresAt: n.Timestamp.Add(ni.retention),
}
}
update := &notificationState{set: set}
ni.st.Merge(update)
ni.send.GossipBroadcast(update)
return nil
}
// Get the notification information for the given receiver about alerts
// with the given fingerprints. Returns a slice in order of the input fingerprints.
// Result entries are nil if nothing was found.
func (ni *NotificationInfos) Get(recv string, fps ...model.Fingerprint) ([]*types.NotificationInfo, error) {
var (
res = make([]*types.NotificationInfo, 0, len(fps))
k = notificationKey{Receiver: recv}
now = ni.st.now()
)
for _, fp := range fps {
k.Alert = fp
if e, ok := ni.st.set[k]; ok && e.ExpiresAt.After(now) {
res = append(res, &types.NotificationInfo{
Alert: fp,
Receiver: recv,
Resolved: e.Resolved,
Timestamp: e.Timestamp,
})
} else {
res = append(res, nil)
}
}
return res, nil
}
// Silences provides gossiped silences.
type Silences struct {
st *silenceState
mk types.Marker

View File

@ -11,7 +11,6 @@ import (
"github.com/kylelemons/godebug/pretty"
"github.com/prometheus/alertmanager/types"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"github.com/satori/go.uuid"
"github.com/weaveworks/mesh"
)
@ -59,314 +58,6 @@ func TestReplaceFile(t *testing.T) {
}
}
func TestNotificationInfosOnGossip(t *testing.T) {
var (
now = utcNow()
)
cases := []struct {
initial map[notificationKey]notificationEntry
msg map[notificationKey]notificationEntry
delta map[notificationKey]notificationEntry
final map[notificationKey]notificationEntry
}{
{
initial: map[notificationKey]notificationEntry{},
msg: map[notificationKey]notificationEntry{
{"recv1", 123}: {true, now, time.Time{}},
},
delta: map[notificationKey]notificationEntry{
{"recv1", 123}: {true, now, time.Time{}},
},
final: map[notificationKey]notificationEntry{
{"recv1", 123}: {true, now, time.Time{}},
},
}, {
initial: map[notificationKey]notificationEntry{
{"recv1", 123}: {true, now, time.Time{}},
},
msg: map[notificationKey]notificationEntry{
{"recv1", 123}: {false, now.Add(time.Minute), time.Time{}},
},
delta: map[notificationKey]notificationEntry{
{"recv1", 123}: {false, now.Add(time.Minute), time.Time{}},
},
final: map[notificationKey]notificationEntry{
{"recv1", 123}: {false, now.Add(time.Minute), time.Time{}},
},
}, {
initial: map[notificationKey]notificationEntry{
{"recv1", 123}: {true, now.Add(time.Minute), time.Time{}},
},
msg: map[notificationKey]notificationEntry{
{"recv1", 123}: {false, now, time.Time{}},
},
delta: map[notificationKey]notificationEntry{},
final: map[notificationKey]notificationEntry{
{"recv1", 123}: {true, now.Add(time.Minute), time.Time{}},
},
},
}
for _, c := range cases {
ni, err := NewNotificationInfos(log.Base(), time.Hour, "")
if err != nil {
t.Fatal(err)
}
for k, v := range c.initial {
ni.st.set[k] = v
}
b, err := encodeNotificationSet(c.msg)
if err != nil {
t.Fatal(err)
}
// OnGossip expects the delta but an empty set to be replaced with nil.
d, err := ni.OnGossip(b)
if err != nil {
t.Errorf("%v OnGossip %v: %s", c.initial, c.msg, err)
continue
}
want := c.final
if have := ni.st.set; !reflect.DeepEqual(want, have) {
t.Errorf("%v OnGossip %v: want %v, have %v", c.initial, c.msg, want, have)
}
want = c.delta
if len(c.delta) == 0 {
want = nil
}
if d != nil {
if have := d.(*notificationState).set; !reflect.DeepEqual(want, have) {
t.Errorf("%v OnGossip %v: want %v, have %v", c.initial, c.msg, want, have)
}
} else if want != nil {
t.Errorf("%v OnGossip %v: want nil", c.initial, c.msg)
}
}
for _, c := range cases {
ni, err := NewNotificationInfos(log.Base(), time.Hour, "")
if err != nil {
t.Fatal(err)
}
for k, v := range c.initial {
ni.st.set[k] = v
}
b, err := encodeNotificationSet(c.msg)
if err != nil {
t.Fatal(err)
}
// OnGossipBroadcast expects the provided delta as is.
d, err := ni.OnGossipBroadcast(mesh.UnknownPeerName, b)
if err != nil {
t.Errorf("%v OnGossipBroadcast %v: %s", c.initial, c.msg, err)
continue
}
want := c.final
if have := ni.st.set; !reflect.DeepEqual(want, have) {
t.Errorf("%v OnGossip %v: want %v, have %v", c.initial, c.msg, want, have)
}
want = c.delta
if have := d.(*notificationState).set; !reflect.DeepEqual(want, have) {
t.Errorf("%v OnGossipBroadcast %v: want %v, have %v", c.initial, c.msg, want, have)
}
}
for _, c := range cases {
ni, err := NewNotificationInfos(log.Base(), time.Hour, "")
if err != nil {
t.Fatal(err)
}
for k, v := range c.initial {
ni.st.set[k] = v
}
b, err := encodeNotificationSet(c.msg)
if err != nil {
t.Fatal(err)
}
// OnGossipUnicast always expects the full state back.
err = ni.OnGossipUnicast(mesh.UnknownPeerName, b)
if err != nil {
t.Errorf("%v OnGossip %v: %s", c.initial, c.msg, err)
continue
}
want := c.final
if have := ni.st.set; !reflect.DeepEqual(want, have) {
t.Errorf("%v OnGossip %v: want %v, have %v", c.initial, c.msg, want, have)
}
}
}
func TestNotificationInfosSet(t *testing.T) {
var (
now = utcNow()
retention = time.Hour
)
cases := []struct {
initial map[notificationKey]notificationEntry
input []*types.NotificationInfo
update map[notificationKey]notificationEntry
final map[notificationKey]notificationEntry
}{
{
initial: map[notificationKey]notificationEntry{},
input: []*types.NotificationInfo{
{
Alert: 0x10,
Receiver: "recv1",
Resolved: false,
Timestamp: now,
},
},
update: map[notificationKey]notificationEntry{
{"recv1", 0x10}: {false, now, now.Add(retention)},
},
final: map[notificationKey]notificationEntry{
{"recv1", 0x10}: {false, now, now.Add(retention)},
},
},
{
// In this testcase we the second input update is already state
// respective to the current state. We currently do not prune it
// from the update as it's not a common occurrence.
// The update is okay to propagate but the final state must correctly
// drop it.
initial: map[notificationKey]notificationEntry{
{"recv1", 0x10}: {false, now, now.Add(retention)},
{"recv2", 0x10}: {false, now.Add(10 * time.Minute), now.Add(retention).Add(10 * time.Minute)},
},
input: []*types.NotificationInfo{
{
Alert: 0x10,
Receiver: "recv1",
Resolved: true,
Timestamp: now.Add(10 * time.Minute),
},
{
Alert: 0x10,
Receiver: "recv2",
Resolved: true,
Timestamp: now,
},
{
Alert: 0x20,
Receiver: "recv2",
Resolved: false,
Timestamp: now,
},
},
update: map[notificationKey]notificationEntry{
{"recv1", 0x10}: {true, now.Add(10 * time.Minute), now.Add(retention).Add(10 * time.Minute)},
{"recv2", 0x10}: {true, now, now.Add(retention)},
{"recv2", 0x20}: {false, now, now.Add(retention)},
},
final: map[notificationKey]notificationEntry{
{"recv1", 0x10}: {true, now.Add(10 * time.Minute), now.Add(retention).Add(10 * time.Minute)},
{"recv2", 0x10}: {false, now.Add(10 * time.Minute), now.Add(retention).Add(10 * time.Minute)},
{"recv2", 0x20}: {false, now, now.Add(retention)},
},
},
}
for _, c := range cases {
ni, err := NewNotificationInfos(log.Base(), retention, "")
if err != nil {
t.Fatal(err)
}
tg := &testGossip{}
ni.Register(tg)
ni.st = &notificationState{set: c.initial}
if err := ni.Set(c.input...); err != nil {
t.Errorf("Insert failed: %s", err)
continue
}
// Verify the correct state afterwards.
if have := ni.st.set; !reflect.DeepEqual(have, c.final) {
t.Errorf("Wrong final state %v, expected %v", have, c.final)
continue
}
// Verify that we gossiped the correct update.
if have := tg.updates[0].(*notificationState).set; !reflect.DeepEqual(have, c.update) {
t.Errorf("Wrong gossip update %v, expected %v", have, c.update)
continue
}
}
}
func TestNotificationInfosGet(t *testing.T) {
var (
now = utcNow()
retention = time.Hour
)
type query struct {
recv string
fps []model.Fingerprint
want []*types.NotificationInfo
}
cases := []struct {
state map[notificationKey]notificationEntry
queries []query
}{
{
state: map[notificationKey]notificationEntry{
{"recv1", 0x10}: {true, now.Add(time.Minute), now.Add(retention)},
{"recv1", 0x30}: {true, now.Add(time.Minute), now.Add(retention)},
{"recv2", 0x10}: {false, now.Add(time.Minute), now.Add(retention)},
{"recv2", 0x20}: {false, now, now.Add(retention)},
// Expired results must be filtered.
{"recv1", 0x30}: {false, now.Add(2 * retention), now.Add(-retention)},
},
queries: []query{
{
recv: "recv1",
fps: []model.Fingerprint{0x1000, 0x10, 0x20, 0x30},
want: []*types.NotificationInfo{
nil,
{
Alert: 0x10,
Receiver: "recv1",
Resolved: true,
Timestamp: now.Add(time.Minute),
},
nil,
nil,
},
},
{
recv: "unknown",
fps: []model.Fingerprint{0x10, 0x1000},
want: []*types.NotificationInfo{nil, nil},
},
},
},
}
for _, c := range cases {
ni, err := NewNotificationInfos(log.Base(), retention, "")
if err != nil {
t.Fatal(err)
}
ni.st = &notificationState{
set: c.state,
now: func() time.Time { return now },
}
for _, q := range c.queries {
have, err := ni.Get(q.recv, q.fps...)
if err != nil {
t.Errorf("Unexpected error: %s", err)
}
if !reflect.DeepEqual(have, q.want) {
t.Errorf("%v %v expected result %v, got %v", q.recv, q.fps, q.want, have)
}
}
}
}
func TestSilencesSet(t *testing.T) {
var (
now = utcNow()

View File

@ -8,209 +8,14 @@ import (
"sync"
"time"
"github.com/golang/protobuf/proto"
"github.com/prometheus/alertmanager/provider"
"github.com/prometheus/alertmanager/provider/mesh/msg"
"github.com/prometheus/alertmanager/types"
"github.com/prometheus/common/model"
"github.com/satori/go.uuid"
"github.com/weaveworks/mesh"
)
func utcNow() time.Time { return time.Now().UTC() }
type notificationKey struct {
Receiver string
Alert model.Fingerprint
}
type notificationEntry struct {
Resolved bool
Timestamp time.Time
ExpiresAt time.Time // Scheduled deletion time.
}
type notificationState struct {
mtx sync.RWMutex
set map[notificationKey]notificationEntry
now func() time.Time // test injection hook
}
func newNotificationState() *notificationState {
return &notificationState{
set: map[notificationKey]notificationEntry{},
now: utcNow,
}
}
func decodeNotificationSet(b []byte) (v msg.NotificationInfoSet, err error) {
return v, proto.Unmarshal(b, &v)
}
func encodeNotificationSet(set map[notificationKey]notificationEntry) ([]byte, error) {
infos := make([]*msg.NotificationInfo, 0, len(set))
for k, v := range set {
infos = append(infos, &msg.NotificationInfo{
Receiver: k.Receiver,
Alert: uint64(k.Alert),
Resolved: v.Resolved,
Timestamp: &msg.Timestamp{
Seconds: v.Timestamp.Unix(),
Nanoseconds: int32(v.Timestamp.Nanosecond()),
},
ExpiresAt: &msg.Timestamp{
Seconds: v.ExpiresAt.Unix(),
Nanoseconds: int32(v.ExpiresAt.Nanosecond()),
},
})
}
return proto.Marshal(&msg.NotificationInfoSet{Infos: infos})
}
func (st *notificationState) gc() {
st.mtx.Lock()
defer st.mtx.Unlock()
now := st.now()
for k, v := range st.set {
if !v.ExpiresAt.After(now) {
delete(st.set, k)
}
}
}
func (st *notificationState) snapshot(w io.Writer) error {
st.mtx.RLock()
defer st.mtx.RUnlock()
enc := gob.NewEncoder(w)
for k, n := range st.set {
if err := enc.Encode(k); err != nil {
return err
}
if err := enc.Encode(n); err != nil {
return err
}
}
return nil
}
func (st *notificationState) loadSnapshot(r io.Reader) error {
st.mtx.Lock()
defer st.mtx.Unlock()
dec := gob.NewDecoder(r)
for {
var k notificationKey
var n notificationEntry
if err := dec.Decode(&k); err != nil {
// Only EOF at the start of new pair is correct.
if err == io.EOF {
break
}
return err
}
if err := dec.Decode(&n); err != nil {
return err
}
st.set[k] = n
}
return nil
}
// copy returns a deep copy of the notification state.
func (st *notificationState) copy() *notificationState {
st.mtx.RLock()
defer st.mtx.RUnlock()
res := &notificationState{
set: make(map[notificationKey]notificationEntry, len(st.set)),
}
for k, v := range st.set {
res.set[k] = v
}
return res
}
// Encode the notification state into a single byte slices.
func (st *notificationState) Encode() [][]byte {
st.mtx.RLock()
defer st.mtx.RUnlock()
// TODO(fabxc): split this into chunks of multiple byte slices
// to handle transfer of large state (mesh has a 10MB hard message limit).
b, err := encodeNotificationSet(st.set)
if err != nil {
panic(err)
}
return [][]byte{b}
}
// Merge the notification set with gossip data and return a new notification
// state. The original state remains unchanged.
// The state is based in LWW manner using the timestamp.
func (st *notificationState) Merge(other mesh.GossipData) mesh.GossipData {
o := other.(*notificationState)
o.mtx.RLock()
defer o.mtx.RUnlock()
st.mtx.Lock()
defer st.mtx.Unlock()
for k, v := range o.set {
if prev, ok := st.set[k]; !ok || prev.Timestamp.Before(v.Timestamp) {
st.set[k] = v
}
}
return st
}
func (st *notificationState) mergeComplete(set msg.NotificationInfoSet) *notificationState {
st.mtx.Lock()
defer st.mtx.Unlock()
for _, v := range set.Infos {
ts := time.Unix(v.Timestamp.Seconds, int64(v.Timestamp.Nanoseconds)).UTC()
k := notificationKey{
Receiver: v.Receiver,
Alert: model.Fingerprint(v.Alert),
}
if prev, ok := st.set[k]; !ok || prev.Timestamp.Before(ts) {
st.set[k] = notificationEntry{
Resolved: v.Resolved,
Timestamp: ts,
}
}
}
// XXX(fabxc): from what I understand we merge into the receiver and what
// we return should be exactly that.
// As all operations are locked, this should be fine.
return st
}
func (st *notificationState) mergeDelta(set msg.NotificationInfoSet) *notificationState {
st.mtx.Lock()
defer st.mtx.Unlock()
d := map[notificationKey]notificationEntry{}
for _, v := range set.Infos {
ts := time.Unix(v.Timestamp.Seconds, int64(v.Timestamp.Nanoseconds)).UTC()
k := notificationKey{
Receiver: v.Receiver,
Alert: model.Fingerprint(v.Alert),
}
if prev, ok := st.set[k]; !ok || prev.Timestamp.Before(ts) {
e := notificationEntry{
Resolved: v.Resolved,
Timestamp: ts,
}
st.set[k] = e
d[k] = e
}
}
return &notificationState{set: d}
}
type silenceState struct {
mtx sync.RWMutex
m map[uuid.UUID]*types.Silence

View File

@ -14,61 +14,6 @@ import (
"github.com/satori/go.uuid"
)
func TestNotificationStateGC(t *testing.T) {
now := utcNow()
initial := map[notificationKey]notificationEntry{
{"", 1}: {true, now, now.Add(10 * time.Second)},
{"", 2}: {true, now.Add(30 * time.Minute), now.Add(time.Second)},
{"", 3}: {true, now.Add(-30 * time.Minute), now},
{"", 5}: {true, now.Add(-61 * time.Minute), now.Add(-time.Second)},
}
final := map[notificationKey]notificationEntry{
{"", 1}: {true, now, now.Add(10 * time.Second)},
{"", 2}: {true, now.Add(30 * time.Minute), now.Add(time.Second)},
}
st := newNotificationState()
st.now = func() time.Time { return now }
st.set = initial
st.gc()
if !reflect.DeepEqual(st.set, final) {
t.Errorf("Unexpected state after GC")
t.Errorf("%s", pretty.Compare(st.set, final))
}
}
func TestNotificationStateSnapshot(t *testing.T) {
now := utcNow()
initial := map[notificationKey]notificationEntry{
{"abc", 123}: {false, now.Add(30 * time.Minute), now.Add(time.Hour)},
{"xyz", 789}: {false, now, now.Add(time.Second)},
}
st := newNotificationState()
st.now = func() time.Time { return now }
st.set = initial
var buf bytes.Buffer
if err := st.snapshot(&buf); err != nil {
t.Fatalf("Snapshotting failed: %s", err)
}
st = newNotificationState()
if err := st.loadSnapshot(&buf); err != nil {
t.Fatalf("Loading snapshot failed: %s", err)
}
if !reflect.DeepEqual(st.set, initial) {
t.Errorf("Loaded snapshot did not match")
t.Errorf("%s", pretty.Compare(st.set, initial))
}
}
func TestSilenceStateGC(t *testing.T) {
var (
now = utcNow()

View File

@ -104,11 +104,3 @@ type Silences interface {
// Get a silence associated with a fingerprint.
Get(uuid.UUID) (*types.Silence, error)
}
// Notifies provides information about pending and successful
// notifications. All methods are goroutine-safe.
type Notifies interface {
Get(dest string, fps ...model.Fingerprint) ([]*types.NotificationInfo, error)
// Set several notifies at once. All or none must succeed.
Set(ns ...*types.NotificationInfo) error
}

View File

@ -15,7 +15,6 @@ package types
import (
"fmt"
"hash/fnv"
"sort"
"strings"
"sync"
@ -303,26 +302,3 @@ func (s *Silence) Mutes(lset model.LabelSet) bool {
func (s *Silence) Deleted() bool {
return s.StartsAt.Equal(s.EndsAt)
}
// NotifcationInfo holds information about the last successful notification
// of an alert to a receiver.
type NotificationInfo struct {
Alert model.Fingerprint
Receiver string
Resolved bool
Timestamp time.Time
}
func (n *NotificationInfo) String() string {
return fmt.Sprintf("<Notify:%q@%s to=%v res=%v>", n.Alert, n.Timestamp, n.Receiver, n.Resolved)
}
// Fingerprint returns a quasi-unique fingerprint for the NotifyInfo.
func (n *NotificationInfo) Fingerprint() model.Fingerprint {
h := fnv.New64a()
h.Write([]byte(n.Receiver))
fp := model.Fingerprint(h.Sum64())
return fp ^ n.Alert
}