From 3c7a16a8382c03114195c4222ded00629bdbd64c Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Mon, 30 May 2016 11:51:28 -0700 Subject: [PATCH] provider/mesh: add notification gossip peer --- provider/mesh/peer.go | 92 ++++++++++++++++++++++++++++++++++++++++++ provider/mesh/state.go | 12 ++++++ 2 files changed, 104 insertions(+) create mode 100644 provider/mesh/peer.go diff --git a/provider/mesh/peer.go b/provider/mesh/peer.go new file mode 100644 index 00000000..c0805a9a --- /dev/null +++ b/provider/mesh/peer.go @@ -0,0 +1,92 @@ +package mesh + +import ( + "fmt" + + "github.com/prometheus/alertmanager/types" + "github.com/prometheus/common/log" + "github.com/prometheus/common/model" + "github.com/weaveworks/mesh" +) + +type NotificationInfos struct { + st *notificationState + send mesh.Gossip + logger log.Logger +} + +func NewNotificationInfos(logger log.Logger) *NotificationInfos { + return &NotificationInfos{ + logger: logger, + st: newNotificationState(), + } +} + +func (ni *NotificationInfos) Gossip() mesh.GossipData { + return ni.st.copy() +} + +func (ni *NotificationInfos) OnGossip(b []byte) (mesh.GossipData, error) { + set, err := decodeNotificationSet(b) + if err != nil { + return nil, err + } + d := ni.st.mergeDelta(set) + // The delta is newly created and we are the only one holding it so far. + // Thus, we can access without locking. + if len(d.set) == 0 { + return nil, nil // per OnGossip contract + } + return d, nil +} + +func (ni *NotificationInfos) OnGossipBroadcast(_ mesh.PeerName, b []byte) (mesh.GossipData, error) { + set, err := decodeNotificationSet(b) + if err != nil { + return nil, err + } + return ni.st.mergeDelta(set), nil +} + +func (ni *NotificationInfos) OnGossipUnicast(_ mesh.PeerName, b []byte) error { + set, err := decodeNotificationSet(b) + if err != nil { + return err + } + ni.st.mergeComplete(set) + return nil +} + +func (ni *NotificationInfos) Set(ns ...*types.NotifyInfo) error { + set := map[string]notificationEntry{} + for _, n := range ns { + k := fmt.Sprintf("%x:%s", n.Alert, n.Receiver) + set[k] = notificationEntry{ + Resolved: n.Resolved, + Timestamp: n.Timestamp, + } + } + update := ¬ificationState{set: set} + + ni.st.Merge(update) + ni.send.GossipBroadcast(update) + return nil +} + +func (ni *NotificationInfos) Get(dest string, fps ...model.Fingerprint) ([]*types.NotifyInfo, error) { + res := make([]*types.NotifyInfo, 0, len(fps)) + for _, fp := range fps { + k := fmt.Sprintf("%x:%s", fp, dest) + if e, ok := ni.st.set[k]; ok { + res = append(res, &types.NotifyInfo{ + Alert: fp, + Receiver: dest, + Resolved: e.Resolved, + Timestamp: e.Timestamp, + }) + } else { + res = append(res, nil) + } + } + return res, nil +} diff --git a/provider/mesh/state.go b/provider/mesh/state.go index c67e3b36..e50ce7d0 100644 --- a/provider/mesh/state.go +++ b/provider/mesh/state.go @@ -21,6 +21,18 @@ type notificationState struct { set map[string]notificationEntry } +func newNotificationState() *notificationState { + return ¬ificationState{ + set: map[string]notificationEntry{}, + } +} + +func decodeNotificationSet(b []byte) (map[string]notificationEntry, error) { + var v map[string]notificationEntry + err := gob.NewDecoder(bytes.NewReader(b)).Decode(&v) + return v, err +} + // copy returns a deep copy of the notification state. func (s *notificationState) copy() *notificationState { s.mtx.RLock()