notify,main: implement peer based notify backoff
This commit implements a wait period before actually dispatching notifications. The backoff linearly depends on the UID order of participating peers. This gives the gossip state time to catch up and avoids duplicate notifications while ensuring that every peer notifies eventually.
This commit is contained in:
parent
e51770ce21
commit
c59f39557b
33
main.go
33
main.go
|
@ -137,6 +137,8 @@ func main() {
|
|||
n = notify.Log(n, log.With("step", "retry"))
|
||||
n = notify.Dedup(ni, n)
|
||||
n = notify.Log(n, log.With("step", "dedup"))
|
||||
n = notify.Wait(meshWait(mrouter, 5*time.Second), n)
|
||||
n = notify.Log(n, log.With("step", "wait"))
|
||||
|
||||
fo[i] = n
|
||||
}
|
||||
|
@ -235,6 +237,37 @@ func main() {
|
|||
log.Infoln("Received SIGTERM, exiting gracefully...")
|
||||
}
|
||||
|
||||
type peerDescSlice []mesh.PeerDescription
|
||||
|
||||
func (s peerDescSlice) Len() int { return len(s) }
|
||||
func (s peerDescSlice) Less(i, j int) bool { return s[i].UID < s[j].UID }
|
||||
func (s peerDescSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
|
||||
|
||||
// meshWait returns a function that inspects the current peer state and returns
|
||||
// a duration of one base timeout for each peer with a higher ID than ourselves.
|
||||
func meshWait(r *mesh.Router, timeout time.Duration) func() time.Duration {
|
||||
return func() time.Duration {
|
||||
self := r.Peers.Fetch(r.Ourself.Name)
|
||||
|
||||
var peers peerDescSlice
|
||||
for _, desc := range r.Peers.Descriptions() {
|
||||
peers = append(peers, desc)
|
||||
}
|
||||
sort.Sort(peers)
|
||||
|
||||
k := 0
|
||||
for _, desc := range peers {
|
||||
if desc.Name == self.Name {
|
||||
break
|
||||
}
|
||||
k++
|
||||
}
|
||||
log.Warnf("timeout multiplier: %d", k)
|
||||
|
||||
return time.Duration(k) * timeout
|
||||
}
|
||||
}
|
||||
|
||||
func initMesh(addr, hwaddr, nickname string) *mesh.Router {
|
||||
host, portStr, err := net.SplitHostPort(addr)
|
||||
|
||||
|
|
|
@ -300,6 +300,27 @@ func (n *DedupingNotifier) Notify(ctx context.Context, alerts ...*types.Alert) e
|
|||
return n.notifies.Set(newNotifies...)
|
||||
}
|
||||
|
||||
type WaitNotifier struct {
|
||||
wait func() time.Duration
|
||||
notifier Notifier
|
||||
}
|
||||
|
||||
func Wait(f func() time.Duration, n Notifier) *WaitNotifier {
|
||||
return &WaitNotifier{
|
||||
wait: f,
|
||||
notifier: n,
|
||||
}
|
||||
}
|
||||
|
||||
func (n *WaitNotifier) Notify(ctx context.Context, alerts ...*types.Alert) error {
|
||||
select {
|
||||
case <-time.After(n.wait()):
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
return n.notifier.Notify(ctx, alerts...)
|
||||
}
|
||||
|
||||
// Router dispatches the alerts to one of a set of
|
||||
// named notifiers based on the name value provided in the context.
|
||||
type Router map[string]Notifier
|
||||
|
|
Loading…
Reference in New Issue