* Gossip large messages via SendReliable

  For messages beyond half of the maximum gossip packet size, send the message
  to all peer nodes via TCP. The choice of "larger than half the max gossip
  size" is relatively arbitrary. From brief testing, the overhead from
  memberlist on a packet seemed to only use ~3 of the available 1400 bytes,
  and most gossip messages seem to be <<500 bytes.

* Add tests for oversized/normal message gossiping

* Make oversize metric names consistent

* Remove errant printf in test

* Correctly increment WaitGroup

* Add comment for OversizedMessage func

* Add metric for oversized messages dropped

  Code was added to drop oversized messages if the buffered channel they are
  sent on is full. This is a good thing to surface as a metric.

* Add counter for total oversized messages sent

* Change full queue log level to debug

  Was previously a warning, which isn't necessary now that there is a metric
  tracking it.

Signed-off-by: stuart nelson <stuartnelson3@gmail.com>
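The routing decision described in the first bullet can be sketched roughly as follows. This is an illustrative example only, not the actual Alertmanager implementation: the maxGossipPacketSize constant, the simpleBroadcast type, and the route function are hypothetical stand-ins based on the commit description.

// Sketch only: route a message either through the gossip broadcast queue or,
// when it exceeds half of an assumed maximum gossip packet size, through a
// buffered channel whose consumer would deliver it to every peer via
// memberlist's SendReliable (TCP). Names and sizes here are assumptions.
package main

import (
	"fmt"

	"github.com/hashicorp/memberlist"
)

const maxGossipPacketSize = 1400 // assumed packet ceiling, per the commit text

// simpleBroadcast satisfies memberlist.Broadcast for a raw byte payload.
type simpleBroadcast []byte

func (b simpleBroadcast) Message() []byte                       { return b }
func (b simpleBroadcast) Invalidates(memberlist.Broadcast) bool { return false }
func (b simpleBroadcast) Finished()                             {}

// oversized reports whether a message should skip UDP gossip and instead be
// sent to each peer over the reliable (TCP) channel.
func oversized(msg []byte) bool {
	return len(msg) > maxGossipPacketSize/2
}

// route queues small messages on the gossip broadcast queue and hands large
// ones to a buffered channel; if that channel is full the message is dropped,
// which is the condition the new "dropped" metric surfaces.
func route(msg []byte, bcast *memberlist.TransmitLimitedQueue, oversizedCh chan []byte) {
	if !oversized(msg) {
		bcast.QueueBroadcast(simpleBroadcast(msg))
		return
	}
	select {
	case oversizedCh <- msg:
		// a sender goroutine would call SendReliable(peer, msg) for each peer
	default:
		fmt.Println("oversized gossip message dropped: queue full")
	}
}

func main() {
	bcast := &memberlist.TransmitLimitedQueue{
		NumNodes:       func() int { return 1 },
		RetransmitMult: 3,
	}
	oversizedCh := make(chan []byte, 1)

	route(make([]byte, 200), bcast, oversizedCh)  // small: gossiped via UDP
	route(make([]byte, 2000), bcast, oversizedCh) // large: reliable TCP path
	fmt.Println("queued broadcasts:", bcast.NumQueued())
}

The delegate file that follows is not where that TCP path lives; it implements only the regular memberlist gossip callbacks and their metrics.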
// Copyright 2018 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cluster

import (
	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/log/level"
	"github.com/gogo/protobuf/proto"
	"github.com/hashicorp/memberlist"
	"github.com/prometheus/alertmanager/cluster/clusterpb"
	"github.com/prometheus/client_golang/prometheus"
)

// delegate implements memberlist.Delegate and memberlist.EventDelegate
// and broadcasts its peer's state in the cluster.
type delegate struct {
	*Peer

	logger log.Logger
	bcast  *memberlist.TransmitLimitedQueue

	messagesReceived     *prometheus.CounterVec
	messagesReceivedSize *prometheus.CounterVec
	messagesSent         *prometheus.CounterVec
	messagesSentSize     *prometheus.CounterVec
}

func newDelegate(l log.Logger, reg prometheus.Registerer, p *Peer, retransmit int) *delegate {
	bcast := &memberlist.TransmitLimitedQueue{
		NumNodes:       p.ClusterSize,
		RetransmitMult: retransmit,
	}
	messagesReceived := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "alertmanager_cluster_messages_received_total",
		Help: "Total number of cluster messages received.",
	}, []string{"msg_type"})
	messagesReceivedSize := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "alertmanager_cluster_messages_received_size_total",
		Help: "Total size of cluster messages received.",
	}, []string{"msg_type"})
	messagesSent := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "alertmanager_cluster_messages_sent_total",
		Help: "Total number of cluster messages sent.",
	}, []string{"msg_type"})
	messagesSentSize := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "alertmanager_cluster_messages_sent_size_total",
		Help: "Total size of cluster messages sent.",
	}, []string{"msg_type"})
	gossipClusterMembers := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
		Name: "alertmanager_cluster_members",
		Help: "Number indicating current number of members in cluster.",
	}, func() float64 {
		return float64(p.ClusterSize())
	})
	peerPosition := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
		Name: "alertmanager_peer_position",
		Help: "Position the Alertmanager instance believes it's in. The position determines a peer's behavior in the cluster.",
	}, func() float64 {
		return float64(p.Position())
	})
	healthScore := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
		Name: "alertmanager_cluster_health_score",
		Help: "Health score of the cluster. Lower values are better and zero means 'totally healthy'.",
	}, func() float64 {
		return float64(p.mlist.GetHealthScore())
	})
	messagesQueued := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
		Name: "alertmanager_cluster_messages_queued",
		Help: "Number of cluster messages which are queued.",
	}, func() float64 {
		return float64(bcast.NumQueued())
	})

	messagesReceived.WithLabelValues("full_state")
	messagesReceivedSize.WithLabelValues("full_state")
	messagesReceived.WithLabelValues("update")
	messagesReceivedSize.WithLabelValues("update")
	messagesSent.WithLabelValues("full_state")
	messagesSentSize.WithLabelValues("full_state")
	messagesSent.WithLabelValues("update")
	messagesSentSize.WithLabelValues("update")

	reg.MustRegister(messagesReceived, messagesReceivedSize, messagesSent, messagesSentSize,
		gossipClusterMembers, peerPosition, healthScore, messagesQueued)

	return &delegate{
		logger:               l,
		Peer:                 p,
		bcast:                bcast,
		messagesReceived:     messagesReceived,
		messagesReceivedSize: messagesReceivedSize,
		messagesSent:         messagesSent,
		messagesSentSize:     messagesSentSize,
	}
}

// NodeMeta retrieves meta-data about the current node when broadcasting an alive message.
func (d *delegate) NodeMeta(limit int) []byte {
	return []byte{}
}

// NotifyMsg is the callback invoked when a user-level gossip message is received.
func (d *delegate) NotifyMsg(b []byte) {
	d.messagesReceived.WithLabelValues("update").Inc()
	d.messagesReceivedSize.WithLabelValues("update").Add(float64(len(b)))

	var p clusterpb.Part
	if err := proto.Unmarshal(b, &p); err != nil {
		level.Warn(d.logger).Log("msg", "decode broadcast", "err", err)
		return
	}
	s, ok := d.states[p.Key]
	if !ok {
		return
	}
	if err := s.Merge(p.Data); err != nil {
		level.Warn(d.logger).Log("msg", "merge broadcast", "err", err, "key", p.Key)
		return
	}
}

// GetBroadcasts is called when user data messages can be broadcasted.
func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte {
	msgs := d.bcast.GetBroadcasts(overhead, limit)
	d.messagesSent.WithLabelValues("update").Add(float64(len(msgs)))
	for _, m := range msgs {
		d.messagesSentSize.WithLabelValues("update").Add(float64(len(m)))
	}
	return msgs
}

// LocalState is called when gossip fetches local state.
func (d *delegate) LocalState(_ bool) []byte {
	all := &clusterpb.FullState{
		Parts: make([]clusterpb.Part, 0, len(d.states)),
	}
	for key, s := range d.states {
		b, err := s.MarshalBinary()
		if err != nil {
			level.Warn(d.logger).Log("msg", "encode local state", "err", err, "key", key)
			return nil
		}
		all.Parts = append(all.Parts, clusterpb.Part{Key: key, Data: b})
	}
	b, err := proto.Marshal(all)
	if err != nil {
		level.Warn(d.logger).Log("msg", "encode local state", "err", err)
		return nil
	}
	d.messagesSent.WithLabelValues("full_state").Inc()
	d.messagesSentSize.WithLabelValues("full_state").Add(float64(len(b)))
	return b
}

// MergeRemoteState merges a peer's full state (received during a push/pull sync) into the local state.
func (d *delegate) MergeRemoteState(buf []byte, _ bool) {
	d.messagesReceived.WithLabelValues("full_state").Inc()
	d.messagesReceivedSize.WithLabelValues("full_state").Add(float64(len(buf)))

	var fs clusterpb.FullState
	if err := proto.Unmarshal(buf, &fs); err != nil {
		level.Warn(d.logger).Log("msg", "merge remote state", "err", err)
		return
	}
	d.mtx.RLock()
	defer d.mtx.RUnlock()

	for _, p := range fs.Parts {
		s, ok := d.states[p.Key]
		if !ok {
			continue
		}
		if err := s.Merge(p.Data); err != nil {
			level.Warn(d.logger).Log("msg", "merge remote state", "err", err, "key", p.Key)
			return
		}
	}
}

// NotifyJoin is called if a peer joins the cluster.
func (d *delegate) NotifyJoin(n *memberlist.Node) {
	level.Debug(d.logger).Log("received", "NotifyJoin", "node", n.Name, "addr", n.Address())
	d.Peer.peerJoin(n)
}

// NotifyLeave is called if a peer leaves the cluster.
func (d *delegate) NotifyLeave(n *memberlist.Node) {
	level.Debug(d.logger).Log("received", "NotifyLeave", "node", n.Name, "addr", n.Address())
	d.Peer.peerLeave(n)
}

// NotifyUpdate is called if a cluster peer gets updated.
func (d *delegate) NotifyUpdate(n *memberlist.Node) {
	level.Debug(d.logger).Log("received", "NotifyUpdate", "node", n.Name, "addr", n.Address())
	d.Peer.peerUpdate(n)
}
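For context, a combined Delegate/EventDelegate like the one above is handed to memberlist roughly as in the following sketch. This is hypothetical wiring for illustration, not Alertmanager's actual setup code; the package and function names are invented.

// Sketch only: wire one value that implements both memberlist.Delegate and
// memberlist.EventDelegate into a memberlist config, then create the member.
package example

import "github.com/hashicorp/memberlist"

// joinCluster creates a memberlist member whose gossip and membership
// callbacks are all served by d.
func joinCluster(d interface {
	memberlist.Delegate      // NodeMeta, NotifyMsg, GetBroadcasts, LocalState, MergeRemoteState
	memberlist.EventDelegate // NotifyJoin, NotifyLeave, NotifyUpdate
}, bindAddr string, bindPort int) (*memberlist.Memberlist, error) {
	cfg := memberlist.DefaultLANConfig()
	cfg.BindAddr = bindAddr
	cfg.BindPort = bindPort
	cfg.Delegate = d // user-data callbacks (state gossip)
	cfg.Events = d   // membership change callbacks
	return memberlist.Create(cfg)
}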