From c59f39557b21b74f58d5ba354e96882e27ccd6ad Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Thu, 2 Jun 2016 19:29:52 +0200 Subject: [PATCH] notify,main: implement peer based notify backoff This commit implements a wait period before actually dispatching notifications. The backoff linearly depends on the UID order of participating peers. This gives the gossip state time to catch up and avoids duplicate notifications while ensuring that every peer notifies eventually. --- main.go | 33 +++++++++++++++++++++++++++++++++ notify/notify.go | 21 +++++++++++++++++++++ 2 files changed, 54 insertions(+) diff --git a/main.go b/main.go index 73d522b0..fc50244f 100644 --- a/main.go +++ b/main.go @@ -137,6 +137,8 @@ func main() { n = notify.Log(n, log.With("step", "retry")) n = notify.Dedup(ni, n) n = notify.Log(n, log.With("step", "dedup")) + n = notify.Wait(meshWait(mrouter, 5*time.Second), n) + n = notify.Log(n, log.With("step", "wait")) fo[i] = n } @@ -235,6 +237,37 @@ func main() { log.Infoln("Received SIGTERM, exiting gracefully...") } +type peerDescSlice []mesh.PeerDescription + +func (s peerDescSlice) Len() int { return len(s) } +func (s peerDescSlice) Less(i, j int) bool { return s[i].UID < s[j].UID } +func (s peerDescSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] } + +// meshWait returns a function that inspects the current peer state and returns +// a duration of one base timeout for each peer with a lower UID than ourselves. 
+func meshWait(r *mesh.Router, timeout time.Duration) func() time.Duration { + return func() time.Duration { + self := r.Peers.Fetch(r.Ourself.Name) + + var peers peerDescSlice + for _, desc := range r.Peers.Descriptions() { + peers = append(peers, desc) + } + sort.Sort(peers) + + k := 0 + for _, desc := range peers { + if desc.Name == self.Name { + break + } + k++ + } + log.Warnf("timeout multiplier: %d", k) + + return time.Duration(k) * timeout + } +} + func initMesh(addr, hwaddr, nickname string) *mesh.Router { host, portStr, err := net.SplitHostPort(addr) diff --git a/notify/notify.go b/notify/notify.go index c458961c..a18a8f6f 100644 --- a/notify/notify.go +++ b/notify/notify.go @@ -300,6 +300,27 @@ func (n *DedupingNotifier) Notify(ctx context.Context, alerts ...*types.Alert) e return n.notifies.Set(newNotifies...) } +type WaitNotifier struct { + wait func() time.Duration + notifier Notifier +} + +func Wait(f func() time.Duration, n Notifier) *WaitNotifier { + return &WaitNotifier{ + wait: f, + notifier: n, + } +} + +func (n *WaitNotifier) Notify(ctx context.Context, alerts ...*types.Alert) error { + select { + case <-time.After(n.wait()): + case <-ctx.Done(): + return ctx.Err() + } + return n.notifier.Notify(ctx, alerts...) +} + // Router dispatches the alerts to one of a set of // named notifiers based on the name value provided in the context. type Router map[string]Notifier