mirror of
https://github.com/prometheus/alertmanager
synced 2025-01-14 10:05:56 +00:00
42ea9a565b
* cluster: make sure we don't miss the first pushPull During the join, memberlist initiates a pushPull to get initial data. Unfortunately, at this point the nflog and silence listener have not been registered yet, so the first data arrives only after one pushPull cycle (1min by default !). Signed-off-by: Corentin Chary <c.chary@criteo.com>
242 lines
8.0 KiB
Go
242 lines
8.0 KiB
Go
// Copyright 2018 Prometheus Team
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package cluster
|
|
|
|
import (
|
|
"time"
|
|
|
|
"github.com/go-kit/kit/log"
|
|
"github.com/go-kit/kit/log/level"
|
|
"github.com/gogo/protobuf/proto"
|
|
"github.com/hashicorp/memberlist"
|
|
"github.com/prometheus/alertmanager/cluster/clusterpb"
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
)
|
|
|
|
// Maximum number of messages to be held in the queue.
|
|
const maxQueueSize = 4096
|
|
|
|
// delegate implements memberlist.Delegate and memberlist.EventDelegate
|
|
// and broadcasts its peer's state in the cluster.
|
|
type delegate struct {
|
|
*Peer
|
|
|
|
logger log.Logger
|
|
bcast *memberlist.TransmitLimitedQueue
|
|
|
|
messagesReceived *prometheus.CounterVec
|
|
messagesReceivedSize *prometheus.CounterVec
|
|
messagesSent *prometheus.CounterVec
|
|
messagesSentSize *prometheus.CounterVec
|
|
messagesPruned prometheus.Counter
|
|
}
|
|
|
|
func newDelegate(l log.Logger, reg prometheus.Registerer, p *Peer, retransmit int) *delegate {
|
|
bcast := &memberlist.TransmitLimitedQueue{
|
|
NumNodes: p.ClusterSize,
|
|
RetransmitMult: retransmit,
|
|
}
|
|
messagesReceived := prometheus.NewCounterVec(prometheus.CounterOpts{
|
|
Name: "alertmanager_cluster_messages_received_total",
|
|
Help: "Total number of cluster messsages received.",
|
|
}, []string{"msg_type"})
|
|
messagesReceivedSize := prometheus.NewCounterVec(prometheus.CounterOpts{
|
|
Name: "alertmanager_cluster_messages_received_size_total",
|
|
Help: "Total size of cluster messages received.",
|
|
}, []string{"msg_type"})
|
|
messagesSent := prometheus.NewCounterVec(prometheus.CounterOpts{
|
|
Name: "alertmanager_cluster_messages_sent_total",
|
|
Help: "Total number of cluster messsages sent.",
|
|
}, []string{"msg_type"})
|
|
messagesSentSize := prometheus.NewCounterVec(prometheus.CounterOpts{
|
|
Name: "alertmanager_cluster_messages_sent_size_total",
|
|
Help: "Total size of cluster messages sent.",
|
|
}, []string{"msg_type"})
|
|
messagesPruned := prometheus.NewCounter(prometheus.CounterOpts{
|
|
Name: "alertmanager_cluster_messages_pruned_total",
|
|
Help: "Total number of cluster messsages pruned.",
|
|
})
|
|
gossipClusterMembers := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
|
Name: "alertmanager_cluster_members",
|
|
Help: "Number indicating current number of members in cluster.",
|
|
}, func() float64 {
|
|
return float64(p.ClusterSize())
|
|
})
|
|
peerPosition := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
|
Name: "alertmanager_peer_position",
|
|
Help: "Position the Alertmanager instance believes it's in. The position determines a peer's behavior in the cluster.",
|
|
}, func() float64 {
|
|
return float64(p.Position())
|
|
})
|
|
healthScore := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
|
Name: "alertmanager_cluster_health_score",
|
|
Help: "Health score of the cluster. Lower values are better and zero means 'totally healthy'.",
|
|
}, func() float64 {
|
|
return float64(p.mlist.GetHealthScore())
|
|
})
|
|
messagesQueued := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
|
Name: "alertmanager_cluster_messages_queued",
|
|
Help: "Number of cluster messsages which are queued.",
|
|
}, func() float64 {
|
|
return float64(bcast.NumQueued())
|
|
})
|
|
|
|
messagesReceived.WithLabelValues("full_state")
|
|
messagesReceivedSize.WithLabelValues("full_state")
|
|
messagesReceived.WithLabelValues("update")
|
|
messagesReceivedSize.WithLabelValues("update")
|
|
messagesSent.WithLabelValues("full_state")
|
|
messagesSentSize.WithLabelValues("full_state")
|
|
messagesSent.WithLabelValues("update")
|
|
messagesSentSize.WithLabelValues("update")
|
|
|
|
reg.MustRegister(messagesReceived, messagesReceivedSize, messagesSent, messagesSentSize,
|
|
gossipClusterMembers, peerPosition, healthScore, messagesQueued, messagesPruned)
|
|
|
|
d := &delegate{
|
|
logger: l,
|
|
Peer: p,
|
|
bcast: bcast,
|
|
messagesReceived: messagesReceived,
|
|
messagesReceivedSize: messagesReceivedSize,
|
|
messagesSent: messagesSent,
|
|
messagesSentSize: messagesSentSize,
|
|
messagesPruned: messagesPruned,
|
|
}
|
|
|
|
go d.handleQueueDepth()
|
|
|
|
return d
|
|
}
|
|
|
|
// NodeMeta retrieves meta-data about the current node when broadcasting an alive message.
|
|
func (d *delegate) NodeMeta(limit int) []byte {
|
|
return []byte{}
|
|
}
|
|
|
|
// NotifyMsg is the callback invoked when a user-level gossip message is received.
|
|
func (d *delegate) NotifyMsg(b []byte) {
|
|
d.messagesReceived.WithLabelValues("update").Inc()
|
|
d.messagesReceivedSize.WithLabelValues("update").Add(float64(len(b)))
|
|
|
|
var p clusterpb.Part
|
|
if err := proto.Unmarshal(b, &p); err != nil {
|
|
level.Warn(d.logger).Log("msg", "decode broadcast", "err", err)
|
|
return
|
|
}
|
|
|
|
s, ok := d.states[p.Key]
|
|
if !ok {
|
|
return
|
|
}
|
|
if err := s.Merge(p.Data); err != nil {
|
|
level.Warn(d.logger).Log("msg", "merge broadcast", "err", err, "key", p.Key)
|
|
return
|
|
}
|
|
}
|
|
|
|
// GetBroadcasts is called when user data messages can be broadcasted.
|
|
func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte {
|
|
msgs := d.bcast.GetBroadcasts(overhead, limit)
|
|
d.messagesSent.WithLabelValues("update").Add(float64(len(msgs)))
|
|
for _, m := range msgs {
|
|
d.messagesSentSize.WithLabelValues("update").Add(float64(len(m)))
|
|
}
|
|
return msgs
|
|
}
|
|
|
|
// LocalState is called when gossip fetches local state.
|
|
func (d *delegate) LocalState(_ bool) []byte {
|
|
all := &clusterpb.FullState{
|
|
Parts: make([]clusterpb.Part, 0, len(d.states)),
|
|
}
|
|
|
|
for key, s := range d.states {
|
|
b, err := s.MarshalBinary()
|
|
if err != nil {
|
|
level.Warn(d.logger).Log("msg", "encode local state", "err", err, "key", key)
|
|
return nil
|
|
}
|
|
all.Parts = append(all.Parts, clusterpb.Part{Key: key, Data: b})
|
|
}
|
|
b, err := proto.Marshal(all)
|
|
if err != nil {
|
|
level.Warn(d.logger).Log("msg", "encode local state", "err", err)
|
|
return nil
|
|
}
|
|
d.messagesSent.WithLabelValues("full_state").Inc()
|
|
d.messagesSentSize.WithLabelValues("full_state").Add(float64(len(b)))
|
|
return b
|
|
}
|
|
|
|
func (d *delegate) MergeRemoteState(buf []byte, _ bool) {
|
|
d.messagesReceived.WithLabelValues("full_state").Inc()
|
|
d.messagesReceivedSize.WithLabelValues("full_state").Add(float64(len(buf)))
|
|
|
|
var fs clusterpb.FullState
|
|
if err := proto.Unmarshal(buf, &fs); err != nil {
|
|
level.Warn(d.logger).Log("msg", "merge remote state", "err", err)
|
|
return
|
|
}
|
|
d.mtx.RLock()
|
|
defer d.mtx.RUnlock()
|
|
for _, p := range fs.Parts {
|
|
s, ok := d.states[p.Key]
|
|
if !ok {
|
|
level.Warn(d.logger).Log("received", "unknown state key", "len", len(buf), "key", p.Key)
|
|
continue
|
|
}
|
|
if err := s.Merge(p.Data); err != nil {
|
|
level.Warn(d.logger).Log("msg", "merge remote state", "err", err, "key", p.Key)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// NotifyJoin is called if a peer joins the cluster.
|
|
func (d *delegate) NotifyJoin(n *memberlist.Node) {
|
|
level.Debug(d.logger).Log("received", "NotifyJoin", "node", n.Name, "addr", n.Address())
|
|
d.Peer.peerJoin(n)
|
|
}
|
|
|
|
// NotifyLeave is called if a peer leaves the cluster.
|
|
func (d *delegate) NotifyLeave(n *memberlist.Node) {
|
|
level.Debug(d.logger).Log("received", "NotifyLeave", "node", n.Name, "addr", n.Address())
|
|
d.Peer.peerLeave(n)
|
|
}
|
|
|
|
// NotifyUpdate is called if a cluster peer gets updated.
|
|
func (d *delegate) NotifyUpdate(n *memberlist.Node) {
|
|
level.Debug(d.logger).Log("received", "NotifyUpdate", "node", n.Name, "addr", n.Address())
|
|
d.Peer.peerUpdate(n)
|
|
}
|
|
|
|
// handleQueueDepth ensures that the queue doesn't grow unbounded by pruning
|
|
// older messages at regular interval.
|
|
func (d *delegate) handleQueueDepth() {
|
|
for {
|
|
select {
|
|
case <-d.stopc:
|
|
return
|
|
case <-time.After(15 * time.Minute):
|
|
n := d.bcast.NumQueued()
|
|
if n > maxQueueSize {
|
|
level.Warn(d.logger).Log("msg", "dropping messages because too many are queued", "current", n, "limit", maxQueueSize)
|
|
d.bcast.Prune(maxQueueSize)
|
|
d.messagesPruned.Add(float64(n - maxQueueSize))
|
|
}
|
|
}
|
|
}
|
|
}
|