cluster: prune the queue if it contains too many items (#1418)
* cluster: prune the queue if too large

Signed-off-by: Simon Pasquier <spasquie@redhat.com>

* Address review comments

Also increases the pruning interval to 15 minutes and the max queue
size to 4096 items (same value as used by Serf).

Signed-off-by: Simon Pasquier <spasquie@redhat.com>
parent 445fbdf1a8
commit 7a272416de
@@ -14,6 +14,8 @@
 package cluster
 
 import (
+	"time"
+
 	"github.com/go-kit/kit/log"
 	"github.com/go-kit/kit/log/level"
 	"github.com/gogo/protobuf/proto"
@@ -22,6 +24,9 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 )
 
+// Maximum number of messages to be held in the queue.
+const maxQueueSize = 4096
+
 // delegate implements memberlist.Delegate and memberlist.EventDelegate
 // and broadcasts its peer's state in the cluster.
 type delegate struct {
@@ -34,6 +39,7 @@ type delegate struct {
 	messagesReceivedSize *prometheus.CounterVec
 	messagesSent         *prometheus.CounterVec
 	messagesSentSize     *prometheus.CounterVec
+	messagesPruned       prometheus.Counter
 }
 
 func newDelegate(l log.Logger, reg prometheus.Registerer, p *Peer, retransmit int) *delegate {
@@ -57,6 +63,10 @@ func newDelegate(l log.Logger, reg prometheus.Registerer, p *Peer, retransmit in
 		Name: "alertmanager_cluster_messages_sent_size_total",
 		Help: "Total size of cluster messages sent.",
 	}, []string{"msg_type"})
+	messagesPruned := prometheus.NewCounter(prometheus.CounterOpts{
+		Name: "alertmanager_cluster_messages_pruned_total",
+		Help: "Total number of cluster messages pruned.",
+	})
 	gossipClusterMembers := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
 		Name: "alertmanager_cluster_members",
 		Help: "Number indicating current number of members in cluster.",
@@ -92,9 +102,9 @@ func newDelegate(l log.Logger, reg prometheus.Registerer, p *Peer, retransmit in
 	messagesSentSize.WithLabelValues("update")
 
 	reg.MustRegister(messagesReceived, messagesReceivedSize, messagesSent, messagesSentSize,
-		gossipClusterMembers, peerPosition, healthScore, messagesQueued)
+		gossipClusterMembers, peerPosition, healthScore, messagesQueued, messagesPruned)
 
-	return &delegate{
+	d := &delegate{
 		logger: l,
 		Peer:   p,
 		bcast:  bcast,
@@ -102,7 +112,12 @@ func newDelegate(l log.Logger, reg prometheus.Registerer, p *Peer, retransmit in
 		messagesReceivedSize: messagesReceivedSize,
 		messagesSent:         messagesSent,
 		messagesSentSize:     messagesSentSize,
+		messagesPruned:       messagesPruned,
 	}
+
+	go d.handleQueueDepth()
+
+	return d
 }
 
 // NodeMeta retrieves meta-data about the current node when broadcasting an alive message.
@@ -204,3 +219,21 @@ func (d *delegate) NotifyUpdate(n *memberlist.Node) {
 	level.Debug(d.logger).Log("received", "NotifyUpdate", "node", n.Name, "addr", n.Address())
 	d.Peer.peerUpdate(n)
 }
+
+// handleQueueDepth ensures that the queue doesn't grow unbounded by pruning
+// older messages at regular intervals.
+func (d *delegate) handleQueueDepth() {
+	for {
+		select {
+		case <-d.stopc:
+			return
+		case <-time.After(15 * time.Minute):
+			n := d.bcast.NumQueued()
+			if n > maxQueueSize {
+				level.Warn(d.logger).Log("msg", "dropping messages because too many are queued", "current", n, "limit", maxQueueSize)
+				d.bcast.Prune(maxQueueSize)
+				d.messagesPruned.Add(float64(n - maxQueueSize))
+			}
+		}
+	}
+}
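
For context: d.bcast is a memberlist.TransmitLimitedQueue from hashicorp/memberlist, and Prune(maxRetain) keeps at most maxRetain queued broadcasts, dropping the rest. Below is a minimal standalone sketch of the queue behavior this commit relies on; the simpleBroadcast type, the node count, and the message payloads are illustrative assumptions, not part of this commit.

package main

import (
	"fmt"

	"github.com/hashicorp/memberlist"
)

// simpleBroadcast is an illustrative memberlist.Broadcast implementation
// (not part of this commit); it never invalidates other queued broadcasts.
type simpleBroadcast []byte

func (b simpleBroadcast) Message() []byte                       { return b }
func (b simpleBroadcast) Invalidates(memberlist.Broadcast) bool { return false }
func (b simpleBroadcast) Finished()                             {}

func main() {
	const maxQueueSize = 4096 // same limit the commit introduces

	q := &memberlist.TransmitLimitedQueue{
		NumNodes:       func() int { return 3 }, // assumed cluster size
		RetransmitMult: 3,
	}

	// Overfill the queue past the limit, as a busy cluster might.
	for i := 0; i < maxQueueSize+100; i++ {
		q.QueueBroadcast(simpleBroadcast(fmt.Sprintf("msg-%d", i)))
	}

	if n := q.NumQueued(); n > maxQueueSize {
		q.Prune(maxQueueSize) // retain at most maxQueueSize broadcasts
		fmt.Printf("pruned %d messages, %d still queued\n", n-q.NumQueued(), q.NumQueued())
	}
}

Because handleQueueDepth only checks every 15 minutes, the new counter advances by n - maxQueueSize per pruning cycle rather than once per dropped message.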