diff --git a/cluster/cluster.go b/cluster/cluster.go index 5881a7e7..676c3b3e 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -49,8 +49,13 @@ type Peer struct { peers map[string]peer failedPeers []peer + knownPeers []string + advertiseAddr string + failedReconnectionsCounter prometheus.Counter reconnectionsCounter prometheus.Counter + failedRefreshCounter prometheus.Counter + refreshCounter prometheus.Counter peerLeaveCounter prometheus.Counter peerUpdateCounter prometheus.Counter peerJoinCounter prometheus.Counter @@ -97,6 +102,7 @@ const ( DefaultProbeInterval = 1 * time.Second DefaultReconnectInterval = 10 * time.Second DefaultReconnectTimeout = 6 * time.Hour + DefaultRefreshInterval = 15 * time.Second maxGossipPacketSize = 1400 ) @@ -170,6 +176,7 @@ func Create( logger: l, peers: map[string]peer{}, resolvedPeers: resolvedPeers, + knownPeers: knownPeers, } p.register(reg) @@ -230,6 +237,7 @@ func (p *Peer) Join( if reconnectTimeout != 0 { go p.handleReconnectTimeout(5*time.Minute, reconnectTimeout) } + go p.handleRefresh(DefaultRefreshInterval) return err } @@ -307,6 +315,15 @@ func (p *Peer) register(reg prometheus.Registerer) { Help: "A counter of the number of cluster peer reconnections.", }) + p.failedRefreshCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "alertmanager_cluster_refresh_join_failed_total", + Help: "A counter of the number of failed cluster peer joined attempts via refresh.", + }) + p.refreshCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "alertmanager_cluster_refresh_join_total", + Help: "A counter of the number of cluster peer joined via refresh.", + }) + p.peerLeaveCounter = prometheus.NewCounter(prometheus.CounterOpts{ Name: "alertmanager_cluster_peers_left_total", Help: "A counter of the number of peers that have left.", @@ -321,7 +338,7 @@ func (p *Peer) register(reg prometheus.Registerer) { }) reg.MustRegister(clusterFailedPeers, p.failedReconnectionsCounter, p.reconnectionsCounter, - p.peerLeaveCounter, p.peerUpdateCounter, p.peerJoinCounter) + p.peerLeaveCounter, p.peerUpdateCounter, p.peerJoinCounter, p.refreshCounter, p.failedRefreshCounter) } func (p *Peer) handleReconnectTimeout(d time.Duration, timeout time.Duration) { @@ -391,6 +408,51 @@ func (p *Peer) reconnect() { } } +func (p *Peer) handleRefresh(d time.Duration) { + tick := time.NewTicker(d) + defer tick.Stop() + + for { + select { + case <-p.stopc: + return + case <-tick.C: + p.refresh() + } + } +} + +func (p *Peer) refresh() { + logger := log.With(p.logger, "msg", "refresh") + + resolvedPeers, err := resolvePeers(context.Background(), p.knownPeers, p.advertiseAddr, net.Resolver{}, false) + if err != nil { + level.Debug(logger).Log("peers", p.knownPeers, "err", err) + return + } + + members := p.mlist.Members() + for _, peer := range resolvedPeers { + var isPeerFound bool + for _, member := range members { + if member.Address() == peer { + isPeerFound = true + break + } + } + + if !isPeerFound { + if _, err := p.mlist.Join([]string{peer}); err != nil { + p.failedRefreshCounter.Inc() + level.Warn(logger).Log("result", "failure", "addr", peer) + } else { + p.refreshCounter.Inc() + level.Debug(logger).Log("result", "success", "addr", peer) + } + } + } +} + func (p *Peer) peerJoin(n *memberlist.Node) { p.peerLock.Lock() defer p.peerLock.Unlock()