From 7a272416de6eb66c8e606493e6924879fb28a02e Mon Sep 17 00:00:00 2001
From: Simon Pasquier
Date: Fri, 15 Jun 2018 18:08:12 +0200
Subject: [PATCH] cluster: prune the queue if it contains too many items
 (#1418)

* cluster: prune the queue if too large

Signed-off-by: Simon Pasquier

* Address review comments

Also increases the pruning interval to 15 minutes and the max queue size
to 4096 items (same value as used by Serf).

Signed-off-by: Simon Pasquier
---
 cluster/delegate.go | 37 +++++++++++++++++++++++++++++++++++--
 1 file changed, 35 insertions(+), 2 deletions(-)

diff --git a/cluster/delegate.go b/cluster/delegate.go
index 9333fa4b..6ac050ec 100644
--- a/cluster/delegate.go
+++ b/cluster/delegate.go
@@ -14,6 +14,8 @@
 package cluster
 
 import (
+	"time"
+
 	"github.com/go-kit/kit/log"
 	"github.com/go-kit/kit/log/level"
 	"github.com/gogo/protobuf/proto"
@@ -22,6 +24,9 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 )
 
+// Maximum number of messages to be held in the queue.
+const maxQueueSize = 4096
+
 // delegate implements memberlist.Delegate and memberlist.EventDelegate
 // and broadcasts its peer's state in the cluster.
 type delegate struct {
@@ -34,6 +39,7 @@ type delegate struct {
 	messagesReceivedSize *prometheus.CounterVec
 	messagesSent         *prometheus.CounterVec
 	messagesSentSize     *prometheus.CounterVec
+	messagesPruned       prometheus.Counter
 }
 
 func newDelegate(l log.Logger, reg prometheus.Registerer, p *Peer, retransmit int) *delegate {
@@ -57,6 +63,10 @@ func newDelegate(l log.Logger, reg prometheus.Registerer, p *Peer, retransmit in
 		Name: "alertmanager_cluster_messages_sent_size_total",
 		Help: "Total size of cluster messages sent.",
 	}, []string{"msg_type"})
+	messagesPruned := prometheus.NewCounter(prometheus.CounterOpts{
+		Name: "alertmanager_cluster_messages_pruned_total",
+		Help: "Total number of cluster messages pruned.",
+	})
 	gossipClusterMembers := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
 		Name: "alertmanager_cluster_members",
 		Help: "Number indicating current number of members in cluster.",
@@ -92,9 +102,9 @@ func newDelegate(l log.Logger, reg prometheus.Registerer, p *Peer, retransmit in
 	messagesSentSize.WithLabelValues("update")
 
 	reg.MustRegister(messagesReceived, messagesReceivedSize, messagesSent, messagesSentSize,
-		gossipClusterMembers, peerPosition, healthScore, messagesQueued)
+		gossipClusterMembers, peerPosition, healthScore, messagesQueued, messagesPruned)
 
-	return &delegate{
+	d := &delegate{
 		logger:               l,
 		Peer:                 p,
 		bcast:                bcast,
@@ -102,7 +112,12 @@ func newDelegate(l log.Logger, reg prometheus.Registerer, p *Peer, retransmit in
 		messagesReceivedSize: messagesReceivedSize,
 		messagesSent:         messagesSent,
 		messagesSentSize:     messagesSentSize,
+		messagesPruned:       messagesPruned,
 	}
+
+	go d.handleQueueDepth()
+
+	return d
 }
 
 // NodeMeta retrieves meta-data about the current node when broadcasting an alive message.
@@ -204,3 +219,21 @@ func (d *delegate) NotifyUpdate(n *memberlist.Node) {
 	level.Debug(d.logger).Log("received", "NotifyUpdate", "node", n.Name, "addr", n.Address())
 	d.Peer.peerUpdate(n)
 }
+
+// handleQueueDepth ensures that the queue doesn't grow unbounded by pruning
+// older messages at regular intervals.
+func (d *delegate) handleQueueDepth() {
+	for {
+		select {
+		case <-d.stopc:
+			return
+		case <-time.After(15 * time.Minute):
+			n := d.bcast.NumQueued()
+			if n > maxQueueSize {
+				level.Warn(d.logger).Log("msg", "dropping messages because too many are queued", "current", n, "limit", maxQueueSize)
+				d.bcast.Prune(maxQueueSize)
+				d.messagesPruned.Add(float64(n - maxQueueSize))
+			}
+		}
+	}
+}
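
The pruning above relies on hashicorp/memberlist's TransmitLimitedQueue API (NumQueued and Prune). The following standalone sketch is not part of this change; it only illustrates the same arithmetic in isolation, assuming a hypothetical three-node cluster and a synthetic backlog: the queue is overfilled, pruned back to maxQueueSize, and the difference is the value the new counter would record.

// Standalone sketch (not part of this patch): pruning arithmetic against
// memberlist's TransmitLimitedQueue.
package main

import (
	"fmt"

	"github.com/hashicorp/memberlist"
)

const maxQueueSize = 4096

// simpleBroadcast is a minimal memberlist.Broadcast used only for this sketch.
type simpleBroadcast []byte

func (b simpleBroadcast) Message() []byte                       { return []byte(b) }
func (b simpleBroadcast) Invalidates(memberlist.Broadcast) bool { return false }
func (b simpleBroadcast) Finished()                             {}

func main() {
	q := &memberlist.TransmitLimitedQueue{
		NumNodes:       func() int { return 3 }, // assumed cluster size for the sketch
		RetransmitMult: 3,
	}

	// Simulate a backlog larger than the limit.
	for i := 0; i < 5000; i++ {
		q.QueueBroadcast(simpleBroadcast(fmt.Sprintf("msg-%d", i)))
	}

	if n := q.NumQueued(); n > maxQueueSize {
		q.Prune(maxQueueSize)                    // keep at most maxQueueSize messages
		fmt.Println("pruned:", n-maxQueueSize)   // value added to the pruned counter
		fmt.Println("remaining:", q.NumQueued()) // expected: 4096
	}
}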