Mirror of https://github.com/prometheus/alertmanager, synced 2025-02-16 02:27:01 +00:00
memberlist reconnect (#1384)
* initial impl
* Add reconnectTimeout
* Fix locking
* Remove unused PeerStatuses
* Add metrics
* Actually use peerJoinCounter
* Clean up peers map on peer timeout
* Add reconnect test
* Test removing failed peers
* Use peer address as map key: if a peer is restarted, it rejoins with the same IP but a different ULID. The node rejoins the cluster, but its peers would never remove it from their internal list of failed nodes because its ULID has changed.
* Add failed peers from creation
* Remove warnIfAlone()
* Update metric names
* Address comments

Signed-off-by: stuart nelson <stuartnelson3@gmail.com>
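The commit message above describes the core idea: peers that leave or fail are remembered by address, a background loop periodically tries to re-join them, and entries that have been failed for longer than a reconnect timeout are dropped. Below is a minimal, self-contained sketch of that bookkeeping, for illustration only; it is not the Alertmanager implementation, and the type names (failedPeer, tracker) and the join callback are invented for this example.

package main

import (
    "fmt"
    "sync"
    "time"
)

// failedPeer records when a peer was last seen leaving the cluster.
// Keyed by address (not by ULID/name) so that a restarted peer with a
// new identity but the same address still matches its old entry.
type failedPeer struct {
    addr      string
    leaveTime time.Time
}

type tracker struct {
    mu     sync.Mutex
    failed map[string]failedPeer
    join   func(addr string) error // e.g. a wrapper around a memberlist join call
}

// markFailed is called when a peer leaves or is detected as dead.
func (t *tracker) markFailed(addr string) {
    t.mu.Lock()
    defer t.mu.Unlock()
    t.failed[addr] = failedPeer{addr: addr, leaveTime: time.Now()}
}

// reconnectLoop periodically retries failed peers and drops entries
// that have been failed for longer than timeout.
func (t *tracker) reconnectLoop(interval, timeout time.Duration, stop <-chan struct{}) {
    tick := time.NewTicker(interval)
    defer tick.Stop()
    for {
        select {
        case <-stop:
            return
        case <-tick.C:
            t.mu.Lock()
            for addr, fp := range t.failed {
                if time.Since(fp.leaveTime) > timeout {
                    delete(t.failed, addr) // give up on long-dead peers
                    continue
                }
                if err := t.join(addr); err == nil {
                    delete(t.failed, addr) // reconnected
                }
            }
            t.mu.Unlock()
        }
    }
}

func main() {
    t := &tracker{
        failed: map[string]failedPeer{},
        join:   func(addr string) error { fmt.Println("dialing", addr); return nil },
    }
    t.markFailed("10.0.0.2:9094")
    stop := make(chan struct{})
    go t.reconnectLoop(time.Second, time.Hour, stop)
    time.Sleep(2 * time.Second)
    close(stop)
}

The real change follows the same shape, but keys the map by memberlist node address, splits the two timers into handleReconnect and handleReconnectTimeout, and counts successes and failures in Prometheus metrics.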
This commit is contained in:
parent 402564055b
commit db4af95ea0
@@ -15,7 +15,7 @@ package cluster

import (
    "context"
    "io/ioutil"
    "fmt"
    "math/rand"
    "net"
    "sort"
@@ -45,15 +45,58 @@ type Peer struct {
    stopc  chan struct{}
    readyc chan struct{}

    peerLock    sync.RWMutex
    peers       map[string]peer
    failedPeers []peer

    failedReconnectionsCounter prometheus.Counter
    reconnectionsCounter       prometheus.Counter
    peerLeaveCounter           prometheus.Counter
    peerUpdateCounter          prometheus.Counter
    peerJoinCounter            prometheus.Counter

    logger log.Logger
}

// peer is an internal type used for bookkeeping. It holds the state of peers
// in the cluster.
type peer struct {
    status    PeerStatus
    leaveTime time.Time

    *memberlist.Node
}

// PeerStatus is the state that a peer is in.
type PeerStatus int

const (
    DefaultPushPullInterval = 60 * time.Second
    DefaultGossipInterval   = 200 * time.Millisecond
    DefaultTcpTimeout       = 10 * time.Second
    DefaultProbeTimeout     = 500 * time.Millisecond
    DefaultProbeInterval    = 1 * time.Second
    StatusNone PeerStatus = iota
    StatusAlive
    StatusFailed
)

func (s PeerStatus) String() string {
    switch s {
    case StatusNone:
        return "none"
    case StatusAlive:
        return "alive"
    case StatusFailed:
        return "failed"
    default:
        panic(fmt.Sprintf("unknown PeerStatus: %d", s))
    }
}

const (
    DefaultPushPullInterval  = 60 * time.Second
    DefaultGossipInterval    = 200 * time.Millisecond
    DefaultTcpTimeout        = 10 * time.Second
    DefaultProbeTimeout      = 500 * time.Millisecond
    DefaultProbeInterval     = 1 * time.Second
    DefaultReconnectInterval = 10 * time.Second
    DefaultReconnectTimeout  = 6 * time.Hour
)

func Join(
@@ -68,6 +111,8 @@ func Join(
    tcpTimeout time.Duration,
    probeTimeout time.Duration,
    probeInterval time.Duration,
    reconnectInterval time.Duration,
    reconnectTimeout time.Duration,
) (*Peer, error) {
    bindHost, bindPortStr, err := net.SplitHostPort(bindAddr)
    if err != nil {
@@ -119,7 +164,11 @@ func Join(
        stopc:  make(chan struct{}),
        readyc: make(chan struct{}),
        logger: l,
        peers:  map[string]peer{},
    }

    p.register(reg)

    p.delegate = newDelegate(l, reg, p)

    cfg := memberlist.DefaultLANConfig()
@@ -133,7 +182,7 @@ func Join(
    cfg.TCPTimeout = tcpTimeout
    cfg.ProbeTimeout = probeTimeout
    cfg.ProbeInterval = probeInterval
    cfg.LogOutput = ioutil.Discard
    cfg.LogOutput = &logWriter{l: l}

    if advertiseAddr != "" {
        cfg.AdvertiseAddr = advertiseHost
@@ -146,6 +195,8 @@ func Join(
    }
    p.mlist = ml

    p.setInitialFailed(resolvedPeers)

    n, err := ml.Join(resolvedPeers)
    if err != nil {
        level.Warn(l).Log("msg", "failed to join cluster", "err", err)
@@ -153,13 +204,80 @@ func Join(
        level.Debug(l).Log("msg", "joined cluster", "peers", n)
    }

    if n > 0 {
        go p.warnIfAlone(l, 10*time.Second)
    if reconnectInterval != 0 {
        go p.handleReconnect(reconnectInterval)
    }
    if reconnectTimeout != 0 {
        go p.handleReconnectTimeout(5*time.Minute, reconnectTimeout)
    }

    return p, nil
}

func (p *Peer) warnIfAlone(logger log.Logger, d time.Duration) {
// All peers are initially added to the failed list. They will be removed from
// this list in peerJoin when making their initial connection.
func (p *Peer) setInitialFailed(peers []string) {
    if len(peers) == 0 {
        return
    }

    now := time.Now()
    for _, peerAddr := range peers {
        pr := peer{
            status:    StatusNone,
            leaveTime: now,
        }
        p.failedPeers = append(p.failedPeers, pr)
        p.peers[peerAddr] = pr
    }
}

type logWriter struct {
    l log.Logger
}

func (l *logWriter) Write(b []byte) (int, error) {
    return len(b), level.Debug(l.l).Log("memberlist", string(b))
}

func (p *Peer) register(reg prometheus.Registerer) {
    clusterFailedPeers := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
        Name: "alertmanager_cluster_failed_peers",
        Help: "Number indicating the current number of failed peers in the cluster.",
    }, func() float64 {
        p.peerLock.RLock()
        defer p.peerLock.RUnlock()

        return float64(len(p.failedPeers))
    })
    p.failedReconnectionsCounter = prometheus.NewCounter(prometheus.CounterOpts{
        Name: "alertmanager_cluster_reconnections_failed_total",
        Help: "A counter of the number of failed cluster peer reconnection attempts.",
    })

    p.reconnectionsCounter = prometheus.NewCounter(prometheus.CounterOpts{
        Name: "alertmanager_cluster_reconnections_total",
        Help: "A counter of the number of cluster peer reconnections.",
    })

    p.peerLeaveCounter = prometheus.NewCounter(prometheus.CounterOpts{
        Name: "alertmanager_cluster_peers_left_total",
        Help: "A counter of the number of peers that have left.",
    })
    p.peerUpdateCounter = prometheus.NewCounter(prometheus.CounterOpts{
        Name: "alertmanager_cluster_peers_update_total",
        Help: "A counter of the number of peers that have updated metadata.",
    })
    p.peerJoinCounter = prometheus.NewCounter(prometheus.CounterOpts{
        Name: "alertmanager_cluster_peers_joined_total",
        Help: "A counter of the number of peers that have joined.",
    })

    reg.MustRegister(clusterFailedPeers, p.failedReconnectionsCounter, p.reconnectionsCounter,
        p.peerLeaveCounter, p.peerUpdateCounter, p.peerJoinCounter)
}

func (p *Peer) handleReconnectTimeout(d time.Duration, timeout time.Duration) {
    tick := time.NewTicker(d)
    defer tick.Stop()

@@ -168,13 +286,130 @@ func (p *Peer) warnIfAlone(logger log.Logger, d time.Duration) {
        case <-p.stopc:
            return
        case <-tick.C:
            if n := p.mlist.NumMembers(); n <= 1 {
                level.Warn(logger).Log("NumMembers", n, "msg", "I appear to be alone in the cluster")
            }
            p.removeFailedPeers(timeout)
        }
    }
}

func (p *Peer) removeFailedPeers(timeout time.Duration) {
    p.peerLock.Lock()
    defer p.peerLock.Unlock()

    now := time.Now()

    keep := make([]peer, 0, len(p.failedPeers))
    for _, pr := range p.failedPeers {
        if pr.leaveTime.Add(timeout).After(now) {
            keep = append(keep, pr)
        } else {
            level.Debug(p.logger).Log("msg", "failed peer has timed out", "peer", pr.Node, "addr", pr.Address())
            delete(p.peers, pr.Name)
        }
    }

    p.failedPeers = keep
}

func (p *Peer) handleReconnect(d time.Duration) {
    tick := time.NewTicker(d)
    defer tick.Stop()

    for {
        select {
        case <-p.stopc:
            return
        case <-tick.C:
            p.reconnect()
        }
    }
}

func (p *Peer) reconnect() {
    p.peerLock.RLock()
    failedPeers := p.failedPeers
    p.peerLock.RUnlock()

    logger := log.With(p.logger, "msg", "reconnect")
    for _, pr := range failedPeers {
        // No need to do book keeping on failedPeers here. If a
        // reconnect is successful, they will be announced in
        // peerJoin().
        if _, err := p.mlist.Join([]string{pr.Address()}); err != nil {
            p.failedReconnectionsCounter.Inc()
            level.Debug(logger).Log("result", "failure", "peer", pr.Node, "addr", pr.Address())
        } else {
            p.reconnectionsCounter.Inc()
            level.Debug(logger).Log("result", "success", "peer", pr.Node, "addr", pr.Address())
        }
    }
}

func (p *Peer) peerJoin(n *memberlist.Node) {
    p.peerLock.Lock()
    defer p.peerLock.Unlock()

    var oldStatus PeerStatus
    pr, ok := p.peers[n.Address()]
    if !ok {
        oldStatus = StatusNone
        pr = peer{
            status: StatusAlive,
            Node:   n,
        }
    } else {
        oldStatus = pr.status
        pr.Node = n
        pr.status = StatusAlive
        pr.leaveTime = time.Time{}
    }

    p.peers[n.Address()] = pr
    p.peerJoinCounter.Inc()

    if oldStatus == StatusFailed {
        level.Debug(p.logger).Log("msg", "peer rejoined", "peer", pr.Node)
        p.failedPeers = removeOldPeer(p.failedPeers, pr.Name)
    }
}

func (p *Peer) peerLeave(n *memberlist.Node) {
    p.peerLock.Lock()
    defer p.peerLock.Unlock()

    pr, ok := p.peers[n.Address()]
    if !ok {
        // Why are we receiving a leave notification from a node that
        // never joined?
        return
    }

    pr.status = StatusFailed
    pr.leaveTime = time.Now()
    p.failedPeers = append(p.failedPeers, pr)
    p.peers[n.Address()] = pr

    p.peerLeaveCounter.Inc()
    level.Debug(p.logger).Log("msg", "peer left", "peer", pr.Node)
}

func (p *Peer) peerUpdate(n *memberlist.Node) {
    p.peerLock.Lock()
    defer p.peerLock.Unlock()

    pr, ok := p.peers[n.Address()]
    if !ok {
        // Why are we receiving an update from a node that never
        // joined?
        return
    }

    pr.Node = n
    p.peers[n.Address()] = pr

    p.peerUpdateCounter.Inc()
    level.Debug(p.logger).Log("msg", "peer updated", "peer", pr.Node)
}

// AddState adds a new state that will be gossiped. It returns a channel to which
// broadcast messages for the state can be sent.
func (p *Peer) AddState(key string, s State) *Channel {
@@ -185,6 +420,7 @@ func (p *Peer) AddState(key string, s State) *Channel {
// Leave the cluster, waiting up to timeout.
func (p *Peer) Leave(timeout time.Duration) error {
    close(p.stopc)
    level.Debug(p.logger).Log("msg", "leaving cluster")
    return p.mlist.Leave(timeout)
}

@@ -336,184 +572,6 @@ func (c *Channel) Broadcast(b []byte) {

// delegate implements memberlist.Delegate and memberlist.EventDelegate
// and broadcasts its peer's state in the cluster.
type delegate struct {
    *Peer

    logger log.Logger
    bcast  *memberlist.TransmitLimitedQueue

    messagesReceived     *prometheus.CounterVec
    messagesReceivedSize *prometheus.CounterVec
    messagesSent         *prometheus.CounterVec
    messagesSentSize     *prometheus.CounterVec
}

func newDelegate(l log.Logger, reg prometheus.Registerer, p *Peer) *delegate {
    bcast := &memberlist.TransmitLimitedQueue{
        NumNodes:       p.ClusterSize,
        RetransmitMult: 3,
    }
    messagesReceived := prometheus.NewCounterVec(prometheus.CounterOpts{
        Name: "alertmanager_cluster_messages_received_total",
        Help: "Total number of cluster messsages received.",
    }, []string{"msg_type"})
    messagesReceivedSize := prometheus.NewCounterVec(prometheus.CounterOpts{
        Name: "alertmanager_cluster_messages_received_size_total",
        Help: "Total size of cluster messages received.",
    }, []string{"msg_type"})
    messagesSent := prometheus.NewCounterVec(prometheus.CounterOpts{
        Name: "alertmanager_cluster_messages_sent_total",
        Help: "Total number of cluster messsages sent.",
    }, []string{"msg_type"})
    messagesSentSize := prometheus.NewCounterVec(prometheus.CounterOpts{
        Name: "alertmanager_cluster_messages_sent_size_total",
        Help: "Total size of cluster messages sent.",
    }, []string{"msg_type"})
    gossipClusterMembers := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
        Name: "alertmanager_cluster_members",
        Help: "Number indicating current number of members in cluster.",
    }, func() float64 {
        return float64(p.ClusterSize())
    })
    peerPosition := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
        Name: "alertmanager_peer_position",
        Help: "Position the Alertmanager instance believes it's in. The position determines a peer's behavior in the cluster.",
    }, func() float64 {
        return float64(p.Position())
    })
    healthScore := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
        Name: "alertmanager_cluster_health_score",
        Help: "Health score of the cluster. Lower values are better and zero means 'totally healthy'.",
    }, func() float64 {
        return float64(p.mlist.GetHealthScore())
    })
    messagesQueued := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
        Name: "alertmanager_cluster_messages_queued",
        Help: "Number of cluster messsages which are queued.",
    }, func() float64 {
        return float64(bcast.NumQueued())
    })

    messagesReceived.WithLabelValues("full_state")
    messagesReceivedSize.WithLabelValues("full_state")
    messagesReceived.WithLabelValues("update")
    messagesReceivedSize.WithLabelValues("update")
    messagesSent.WithLabelValues("full_state")
    messagesSentSize.WithLabelValues("full_state")
    messagesSent.WithLabelValues("update")
    messagesSentSize.WithLabelValues("update")

    reg.MustRegister(messagesReceived, messagesReceivedSize, messagesSent, messagesSentSize,
        gossipClusterMembers, peerPosition, healthScore, messagesQueued)

    return &delegate{
        logger:               l,
        Peer:                 p,
        bcast:                bcast,
        messagesReceived:     messagesReceived,
        messagesReceivedSize: messagesReceivedSize,
        messagesSent:         messagesSent,
        messagesSentSize:     messagesSentSize,
    }
}

// NodeMeta retrieves meta-data about the current node when broadcasting an alive message.
func (d *delegate) NodeMeta(limit int) []byte {
    return []byte{}
}

// NotifyMsg is the callback invoked when a user-level gossip message is received.
func (d *delegate) NotifyMsg(b []byte) {
    d.messagesReceived.WithLabelValues("update").Inc()
    d.messagesReceivedSize.WithLabelValues("update").Add(float64(len(b)))

    var p clusterpb.Part
    if err := proto.Unmarshal(b, &p); err != nil {
        level.Warn(d.logger).Log("msg", "decode broadcast", "err", err)
        return
    }
    s, ok := d.states[p.Key]
    if !ok {
        return
    }
    if err := s.Merge(p.Data); err != nil {
        level.Warn(d.logger).Log("msg", "merge broadcast", "err", err, "key", p.Key)
        return
    }
}

// GetBroadcasts is called when user data messages can be broadcasted.
func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte {
    msgs := d.bcast.GetBroadcasts(overhead, limit)
    d.messagesSent.WithLabelValues("update").Add(float64(len(msgs)))
    for _, m := range msgs {
        d.messagesSentSize.WithLabelValues("update").Add(float64(len(m)))
    }
    return msgs
}

// LocalState is called when gossip fetches local state.
func (d *delegate) LocalState(_ bool) []byte {
    all := &clusterpb.FullState{
        Parts: make([]clusterpb.Part, 0, len(d.states)),
    }
    for key, s := range d.states {
        b, err := s.MarshalBinary()
        if err != nil {
            level.Warn(d.logger).Log("msg", "encode local state", "err", err, "key", key)
            return nil
        }
        all.Parts = append(all.Parts, clusterpb.Part{Key: key, Data: b})
    }
    b, err := proto.Marshal(all)
    if err != nil {
        level.Warn(d.logger).Log("msg", "encode local state", "err", err)
        return nil
    }
    d.messagesSent.WithLabelValues("full_state").Inc()
    d.messagesSentSize.WithLabelValues("full_state").Add(float64(len(b)))
    return b
}

func (d *delegate) MergeRemoteState(buf []byte, _ bool) {
    d.messagesReceived.WithLabelValues("full_state").Inc()
    d.messagesReceivedSize.WithLabelValues("full_state").Add(float64(len(buf)))

    var fs clusterpb.FullState
    if err := proto.Unmarshal(buf, &fs); err != nil {
        level.Warn(d.logger).Log("msg", "merge remote state", "err", err)
        return
    }
    d.mtx.RLock()
    defer d.mtx.RUnlock()

    for _, p := range fs.Parts {
        s, ok := d.states[p.Key]
        if !ok {
            continue
        }
        if err := s.Merge(p.Data); err != nil {
            level.Warn(d.logger).Log("msg", "merge remote state", "err", err, "key", p.Key)
            return
        }
    }
}

// NotifyJoin is called if a peer joins the cluster.
func (d *delegate) NotifyJoin(n *memberlist.Node) {
    level.Debug(d.logger).Log("received", "NotifyJoin", "node", n.Name, "addr", n.Address())
}

// NotifyLeave is called if a peer leaves the cluster.
func (d *delegate) NotifyLeave(n *memberlist.Node) {
    level.Debug(d.logger).Log("received", "NotifyLeave", "node", n.Name, "addr", n.Address())
}

// NotifyUpdate is called if a cluster peer gets updated.
func (d *delegate) NotifyUpdate(n *memberlist.Node) {
    level.Debug(d.logger).Log("received", "NotifyUpdate", "node", n.Name, "addr", n.Address())
}

func resolvePeers(ctx context.Context, peers []string, myAddress string, res net.Resolver, waitIfEmpty bool) ([]string, error) {
    var resolvedPeers []string

@@ -626,3 +684,14 @@ func retry(interval time.Duration, stopc <-chan struct{}, f func() error) error
        }
    }
}

func removeOldPeer(old []peer, name string) []peer {
    new := make([]peer, 0, len(old))
    for _, p := range old {
        if p.Name != name {
            new = append(new, p)
        }
    }

    return new
}
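For reference, the hunks above extend Join with two trailing durations, reconnectInterval and reconnectTimeout. Below is a hedged sketch of how an external caller might wire up the new signature; the bind address, empty peer list, and error handling are placeholders, and the per-argument comments are my reading of the parameters based on the tests that follow and the func main() hunks at the end of this page.

package main

import (
    "github.com/go-kit/kit/log"
    "github.com/prometheus/alertmanager/cluster"
    "github.com/prometheus/client_golang/prometheus"
)

func main() {
    logger := log.NewNopLogger()
    p, err := cluster.Join(
        log.With(logger, "component", "cluster"),
        prometheus.NewRegistry(),
        "0.0.0.0:9094", // bind address (placeholder)
        "",             // advertise address (empty in the tests)
        []string{},     // initial peers to join
        true,           // presumably waitIfEmpty, as passed in the tests
        cluster.DefaultPushPullInterval,
        cluster.DefaultGossipInterval,
        cluster.DefaultTcpTimeout,
        cluster.DefaultProbeTimeout,
        cluster.DefaultProbeInterval,
        cluster.DefaultReconnectInterval, // new in this commit
        cluster.DefaultReconnectTimeout,  // new in this commit
    )
    if err != nil {
        panic(err)
    }
    defer p.Leave(0)
}

Passing 0 for either of the two new durations disables the corresponding background goroutine, as the if reconnectInterval != 0 / if reconnectTimeout != 0 guards in Join show.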
@@ -24,25 +24,185 @@ import (
    "github.com/prometheus/client_golang/prometheus"
)

func TestJoin(t *testing.T) {
func TestJoinLeave(t *testing.T) {
    logger := log.NewNopLogger()
    p, err := Join(logger,
        prometheus.DefaultRegisterer,
    p, err := Join(
        logger,
        prometheus.NewRegistry(),
        "0.0.0.0:0",
        "",
        []string{},
        true,
        0*time.Second,
        0*time.Second,
        0*time.Second,
        0*time.Second,
        0*time.Second,
        DefaultPushPullInterval,
        DefaultGossipInterval,
        DefaultTcpTimeout,
        DefaultProbeTimeout,
        DefaultProbeInterval,
        DefaultReconnectInterval,
        DefaultReconnectTimeout,
    )
    require.NoError(t, err)
    require.False(t, p == nil)
    require.NotNil(t, p)
    require.False(t, p.Ready())
    require.Equal(t, p.Status(), "settling")
    go p.Settle(context.Background(), 0*time.Second)
    p.WaitReady()
    require.Equal(t, p.Status(), "ready")

    // Create the peer who joins the first.
    p2, err := Join(
        logger,
        prometheus.NewRegistry(),
        "0.0.0.0:0",
        "",
        []string{p.Self().Address()},
        true,
        DefaultPushPullInterval,
        DefaultGossipInterval,
        DefaultTcpTimeout,
        DefaultProbeTimeout,
        DefaultProbeInterval,
        DefaultReconnectInterval,
        DefaultReconnectTimeout,
    )
    require.NoError(t, err)
    require.NotNil(t, p2)
    go p2.Settle(context.Background(), 0*time.Second)

    require.Equal(t, 2, p.ClusterSize())
    p2.Leave(0 * time.Second)
    require.Equal(t, 1, p.ClusterSize())
    require.Equal(t, 1, len(p.failedPeers))
    require.Equal(t, p2.Self().Address(), p.peers[p2.Self().Address()].Node.Address())
    require.Equal(t, p2.Name(), p.failedPeers[0].Name)
}

func TestReconnect(t *testing.T) {
    logger := log.NewNopLogger()
    p, err := Join(
        logger,
        prometheus.NewRegistry(),
        "0.0.0.0:0",
        "",
        []string{},
        true,
        DefaultPushPullInterval,
        DefaultGossipInterval,
        DefaultTcpTimeout,
        DefaultProbeTimeout,
        DefaultProbeInterval,
        DefaultReconnectInterval,
        DefaultReconnectTimeout,
    )
    require.NoError(t, err)
    require.NotNil(t, p)
    go p.Settle(context.Background(), 0*time.Second)
    p.WaitReady()

    p2, err := Join(
        logger,
        prometheus.NewRegistry(),
        "0.0.0.0:0",
        "",
        []string{},
        true,
        DefaultPushPullInterval,
        DefaultGossipInterval,
        DefaultTcpTimeout,
        DefaultProbeTimeout,
        DefaultProbeInterval,
        DefaultReconnectInterval,
        DefaultReconnectTimeout,
    )
    require.NoError(t, err)
    require.NotNil(t, p2)
    go p2.Settle(context.Background(), 0*time.Second)
    p2.WaitReady()

    p.peerJoin(p2.Self())
    p.peerLeave(p2.Self())

    require.Equal(t, 1, p.ClusterSize())
    require.Equal(t, 1, len(p.failedPeers))

    p.reconnect()

    require.Equal(t, 2, p.ClusterSize())
    require.Equal(t, 0, len(p.failedPeers))
    require.Equal(t, StatusAlive, p.peers[p2.Self().Address()].status)
}

func TestRemoveFailedPeers(t *testing.T) {
    logger := log.NewNopLogger()
    p, err := Join(
        logger,
        prometheus.NewRegistry(),
        "0.0.0.0:0",
        "",
        []string{},
        true,
        DefaultPushPullInterval,
        DefaultGossipInterval,
        DefaultTcpTimeout,
        DefaultProbeTimeout,
        DefaultProbeInterval,
        DefaultReconnectInterval,
        DefaultReconnectTimeout,
    )
    require.NoError(t, err)
    require.NotNil(t, p)
    n := p.Self()

    now := time.Now()
    p1 := peer{
        status:    StatusFailed,
        leaveTime: now,
        Node:      n,
    }
    p2 := peer{
        status:    StatusFailed,
        leaveTime: now.Add(-time.Hour),
        Node:      n,
    }
    p3 := peer{
        status:    StatusFailed,
        leaveTime: now.Add(30 * -time.Minute),
        Node:      n,
    }
    p.failedPeers = []peer{p1, p2, p3}

    p.removeFailedPeers(30 * time.Minute)
    require.Equal(t, 1, len(p.failedPeers))
    require.Equal(t, p1, p.failedPeers[0])
}

func TestInitiallyFailingPeers(t *testing.T) {
    logger := log.NewNopLogger()
    peerAddrs := []string{"1.2.3.4:5000", "2.3.4.5:5000", "3.4.5.6:5000"}
    p, err := Join(
        logger,
        prometheus.NewRegistry(),
        "0.0.0.0:0",
        "",
        []string{},
        true,
        DefaultPushPullInterval,
        DefaultGossipInterval,
        DefaultTcpTimeout,
        DefaultProbeTimeout,
        DefaultProbeInterval,
        DefaultReconnectInterval,
        DefaultReconnectTimeout,
    )
    require.NoError(t, err)
    require.NotNil(t, p)

    p.setInitialFailed(peerAddrs)

    require.Equal(t, len(peerAddrs), len(p.failedPeers))
    for _, addr := range peerAddrs {
        pr, ok := p.peers[addr]
        require.True(t, ok)
        require.Equal(t, StatusNone, pr.status)
    }
}
cluster/delegate.go (new file, 193 lines)
@@ -0,0 +1,193 @@
package cluster

import (
    "github.com/go-kit/kit/log"
    "github.com/go-kit/kit/log/level"
    "github.com/gogo/protobuf/proto"
    "github.com/hashicorp/memberlist"
    "github.com/prometheus/alertmanager/cluster/clusterpb"
    "github.com/prometheus/client_golang/prometheus"
)

type delegate struct {
    *Peer

    logger log.Logger
    bcast  *memberlist.TransmitLimitedQueue

    messagesReceived     *prometheus.CounterVec
    messagesReceivedSize *prometheus.CounterVec
    messagesSent         *prometheus.CounterVec
    messagesSentSize     *prometheus.CounterVec
}

func newDelegate(l log.Logger, reg prometheus.Registerer, p *Peer) *delegate {
    bcast := &memberlist.TransmitLimitedQueue{
        NumNodes:       p.ClusterSize,
        RetransmitMult: 3,
    }
    messagesReceived := prometheus.NewCounterVec(prometheus.CounterOpts{
        Name: "alertmanager_cluster_messages_received_total",
        Help: "Total number of cluster messsages received.",
    }, []string{"msg_type"})
    messagesReceivedSize := prometheus.NewCounterVec(prometheus.CounterOpts{
        Name: "alertmanager_cluster_messages_received_size_total",
        Help: "Total size of cluster messages received.",
    }, []string{"msg_type"})
    messagesSent := prometheus.NewCounterVec(prometheus.CounterOpts{
        Name: "alertmanager_cluster_messages_sent_total",
        Help: "Total number of cluster messsages sent.",
    }, []string{"msg_type"})
    messagesSentSize := prometheus.NewCounterVec(prometheus.CounterOpts{
        Name: "alertmanager_cluster_messages_sent_size_total",
        Help: "Total size of cluster messages sent.",
    }, []string{"msg_type"})
    gossipClusterMembers := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
        Name: "alertmanager_cluster_members",
        Help: "Number indicating current number of members in cluster.",
    }, func() float64 {
        return float64(p.ClusterSize())
    })
    peerPosition := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
        Name: "alertmanager_peer_position",
        Help: "Position the Alertmanager instance believes it's in. The position determines a peer's behavior in the cluster.",
    }, func() float64 {
        return float64(p.Position())
    })
    healthScore := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
        Name: "alertmanager_cluster_health_score",
        Help: "Health score of the cluster. Lower values are better and zero means 'totally healthy'.",
    }, func() float64 {
        return float64(p.mlist.GetHealthScore())
    })
    messagesQueued := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
        Name: "alertmanager_cluster_messages_queued",
        Help: "Number of cluster messsages which are queued.",
    }, func() float64 {
        return float64(bcast.NumQueued())
    })

    messagesReceived.WithLabelValues("full_state")
    messagesReceivedSize.WithLabelValues("full_state")
    messagesReceived.WithLabelValues("update")
    messagesReceivedSize.WithLabelValues("update")
    messagesSent.WithLabelValues("full_state")
    messagesSentSize.WithLabelValues("full_state")
    messagesSent.WithLabelValues("update")
    messagesSentSize.WithLabelValues("update")

    reg.MustRegister(messagesReceived, messagesReceivedSize, messagesSent, messagesSentSize,
        gossipClusterMembers, peerPosition, healthScore, messagesQueued)

    return &delegate{
        logger:               l,
        Peer:                 p,
        bcast:                bcast,
        messagesReceived:     messagesReceived,
        messagesReceivedSize: messagesReceivedSize,
        messagesSent:         messagesSent,
        messagesSentSize:     messagesSentSize,
    }
}

// NodeMeta retrieves meta-data about the current node when broadcasting an alive message.
func (d *delegate) NodeMeta(limit int) []byte {
    return []byte{}
}

// NotifyMsg is the callback invoked when a user-level gossip message is received.
// NOTE: This is where a node could notify others of its intent to leave, and
// avoid being marked as failed.
func (d *delegate) NotifyMsg(b []byte) {
    d.messagesReceived.WithLabelValues("update").Inc()
    d.messagesReceivedSize.WithLabelValues("update").Add(float64(len(b)))

    var p clusterpb.Part
    if err := proto.Unmarshal(b, &p); err != nil {
        level.Warn(d.logger).Log("msg", "decode broadcast", "err", err)
        return
    }
    s, ok := d.states[p.Key]
    if !ok {
        return
    }
    if err := s.Merge(p.Data); err != nil {
        level.Warn(d.logger).Log("msg", "merge broadcast", "err", err, "key", p.Key)
        return
    }
}

// GetBroadcasts is called when user data messages can be broadcasted.
func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte {
    msgs := d.bcast.GetBroadcasts(overhead, limit)
    d.messagesSent.WithLabelValues("update").Add(float64(len(msgs)))
    for _, m := range msgs {
        d.messagesSentSize.WithLabelValues("update").Add(float64(len(m)))
    }
    return msgs
}

// LocalState is called when gossip fetches local state.
func (d *delegate) LocalState(_ bool) []byte {
    all := &clusterpb.FullState{
        Parts: make([]clusterpb.Part, 0, len(d.states)),
    }
    for key, s := range d.states {
        b, err := s.MarshalBinary()
        if err != nil {
            level.Warn(d.logger).Log("msg", "encode local state", "err", err, "key", key)
            return nil
        }
        all.Parts = append(all.Parts, clusterpb.Part{Key: key, Data: b})
    }
    b, err := proto.Marshal(all)
    if err != nil {
        level.Warn(d.logger).Log("msg", "encode local state", "err", err)
        return nil
    }
    d.messagesSent.WithLabelValues("full_state").Inc()
    d.messagesSentSize.WithLabelValues("full_state").Add(float64(len(b)))
    return b
}

func (d *delegate) MergeRemoteState(buf []byte, _ bool) {
    d.messagesReceived.WithLabelValues("full_state").Inc()
    d.messagesReceivedSize.WithLabelValues("full_state").Add(float64(len(buf)))

    var fs clusterpb.FullState
    if err := proto.Unmarshal(buf, &fs); err != nil {
        level.Warn(d.logger).Log("msg", "merge remote state", "err", err)
        return
    }
    d.mtx.RLock()
    defer d.mtx.RUnlock()

    for _, p := range fs.Parts {
        s, ok := d.states[p.Key]
        if !ok {
            continue
        }
        if err := s.Merge(p.Data); err != nil {
            level.Warn(d.logger).Log("msg", "merge remote state", "err", err, "key", p.Key)
            return
        }
    }
}

// NotifyJoin is called if a peer joins the cluster.
func (d *delegate) NotifyJoin(n *memberlist.Node) {
    level.Debug(d.logger).Log("received", "NotifyJoin", "node", n.Name, "addr", n.Address())
    d.Peer.peerJoin(n)
}

// NotifyLeave is called if a peer leaves the cluster.
func (d *delegate) NotifyLeave(n *memberlist.Node) {
    level.Debug(d.logger).Log("received", "NotifyLeave", "node", n.Name, "addr", n.Address())
    d.Peer.peerLeave(n)
}

// NotifyUpdate is called if a cluster peer gets updated.
func (d *delegate) NotifyUpdate(n *memberlist.Node) {
    level.Debug(d.logger).Log("received", "NotifyUpdate", "node", n.Name, "addr", n.Address())
    d.Peer.peerUpdate(n)
}
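cluster/delegate.go implements both memberlist.Delegate (NodeMeta, NotifyMsg, GetBroadcasts, LocalState, MergeRemoteState) and memberlist.EventDelegate (NotifyJoin, NotifyLeave, NotifyUpdate); the event callbacks are what now feed peerJoin/peerLeave/peerUpdate and therefore the failed-peer list used for reconnects. The place where the delegate is attached to the memberlist config is not part of the hunks shown on this page, so the following is only a generic sketch of how an event delegate is wired into hashicorp/memberlist; the eventLogger type is invented for the example and stands in for the package's unexported delegate.

package main

import (
    "log"

    "github.com/hashicorp/memberlist"
)

// eventLogger is a stand-in for the delegate type above; it implements only
// memberlist.EventDelegate here to keep the sketch short.
type eventLogger struct{}

func (eventLogger) NotifyJoin(n *memberlist.Node)   { log.Println("join", n.Name, n.Address()) }
func (eventLogger) NotifyLeave(n *memberlist.Node)  { log.Println("leave", n.Name, n.Address()) }
func (eventLogger) NotifyUpdate(n *memberlist.Node) { log.Println("update", n.Name, n.Address()) }

func main() {
    cfg := memberlist.DefaultLANConfig()
    cfg.Events = eventLogger{} // memberlist invokes NotifyJoin/NotifyLeave/NotifyUpdate through this field
    ml, err := memberlist.Create(cfg)
    if err != nil {
        log.Fatal(err)
    }
    defer ml.Shutdown()
}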
@@ -160,6 +160,8 @@ func main() {
        probeTimeout = kingpin.Flag("cluster.probe-timeout", "Timeout to wait for an ack from a probed node before assuming it is unhealthy. This should be set to 99-percentile of RTT (round-trip time) on your network.").Default(cluster.DefaultProbeTimeout.String()).Duration()
        probeInterval = kingpin.Flag("cluster.probe-interval", "Interval between random node probes. Setting this lower (more frequent) will cause the cluster to detect failed nodes more quickly at the expense of increased bandwidth usage.").Default(cluster.DefaultProbeInterval.String()).Duration()
        settleTimeout = kingpin.Flag("cluster.settle-timeout", "Maximum time to wait for cluster connections to settle before evaluating notifications.").Default(cluster.DefaultPushPullInterval.String()).Duration()
        reconnectInterval = kingpin.Flag("cluster.reconnect-interval", "Interval between attempting to reconnect to lost peers.").Default(cluster.DefaultReconnectInterval.String()).Duration()
        peerReconnectTimeout = kingpin.Flag("cluster.reconnect-timeout", "Length of time to attempt to reconnect to a lost peer.").Default(cluster.DefaultReconnectTimeout.String()).Duration()
    )

    kingpin.Version(version.Print("alertmanager"))
@@ -180,7 +182,9 @@ func main() {

    var peer *cluster.Peer
    if *clusterBindAddr != "" {
        peer, err = cluster.Join(log.With(logger, "component", "cluster"), prometheus.DefaultRegisterer,
        peer, err = cluster.Join(
            log.With(logger, "component", "cluster"),
            prometheus.DefaultRegisterer,
            *clusterBindAddr,
            *clusterAdvertiseAddr,
            *peers,
@@ -190,6 +194,8 @@ func main() {
            *tcpTimeout,
            *probeTimeout,
            *probeInterval,
            *reconnectInterval,
            *peerReconnectTimeout,
        )
        if err != nil {
            level.Error(logger).Log("msg", "Unable to initialize gossip mesh", "err", err)
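The two new kingpin flags above expose the reconnect behavior on the command line, with defaults taken from the DefaultReconnectInterval and DefaultReconnectTimeout constants added earlier (10s and 6h). As a usage illustration only (not output from this change), an operator could override them when starting Alertmanager, for example:

alertmanager --config.file=alertmanager.yml --cluster.reconnect-interval=30s --cluster.reconnect-timeout=1h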