From 7f34cb471671abb3df6e3ef5e918cfa7fc3c4958 Mon Sep 17 00:00:00 2001 From: Povilas Versockas Date: Fri, 23 Nov 2018 10:47:13 +0200 Subject: [PATCH] cluster: Add cluster peers DNS refresh job (#1428) Adds a job which runs periodically and refreshes cluster.peer dns records. The problem is that when you restart all of the alertmanager instances in an environment like Kubernetes, DNS may contain old alertmanager instance IPs, but on startup (when Join() happens) none of the new instance IPs. As at the start DNS is not empty resolvePeers waitIfEmpty=true, will return and "islands" of 1 alertmanager instances will form. Signed-off-by: Povilas Versockas --- cluster/cluster.go | 64 +++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 63 insertions(+), 1 deletion(-) diff --git a/cluster/cluster.go b/cluster/cluster.go index 5881a7e7..676c3b3e 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -49,8 +49,13 @@ type Peer struct { peers map[string]peer failedPeers []peer + knownPeers []string + advertiseAddr string + failedReconnectionsCounter prometheus.Counter reconnectionsCounter prometheus.Counter + failedRefreshCounter prometheus.Counter + refreshCounter prometheus.Counter peerLeaveCounter prometheus.Counter peerUpdateCounter prometheus.Counter peerJoinCounter prometheus.Counter @@ -97,6 +102,7 @@ const ( DefaultProbeInterval = 1 * time.Second DefaultReconnectInterval = 10 * time.Second DefaultReconnectTimeout = 6 * time.Hour + DefaultRefreshInterval = 15 * time.Second maxGossipPacketSize = 1400 ) @@ -170,6 +176,7 @@ func Create( logger: l, peers: map[string]peer{}, resolvedPeers: resolvedPeers, + knownPeers: knownPeers, } p.register(reg) @@ -230,6 +237,7 @@ func (p *Peer) Join( if reconnectTimeout != 0 { go p.handleReconnectTimeout(5*time.Minute, reconnectTimeout) } + go p.handleRefresh(DefaultRefreshInterval) return err } @@ -307,6 +315,15 @@ func (p *Peer) register(reg prometheus.Registerer) { Help: "A counter of the number of cluster peer reconnections.", }) + p.failedRefreshCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "alertmanager_cluster_refresh_join_failed_total", + Help: "A counter of the number of failed cluster peer joined attempts via refresh.", + }) + p.refreshCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "alertmanager_cluster_refresh_join_total", + Help: "A counter of the number of cluster peer joined via refresh.", + }) + p.peerLeaveCounter = prometheus.NewCounter(prometheus.CounterOpts{ Name: "alertmanager_cluster_peers_left_total", Help: "A counter of the number of peers that have left.", @@ -321,7 +338,7 @@ func (p *Peer) register(reg prometheus.Registerer) { }) reg.MustRegister(clusterFailedPeers, p.failedReconnectionsCounter, p.reconnectionsCounter, - p.peerLeaveCounter, p.peerUpdateCounter, p.peerJoinCounter) + p.peerLeaveCounter, p.peerUpdateCounter, p.peerJoinCounter, p.refreshCounter, p.failedRefreshCounter) } func (p *Peer) handleReconnectTimeout(d time.Duration, timeout time.Duration) { @@ -391,6 +408,51 @@ func (p *Peer) reconnect() { } } +func (p *Peer) handleRefresh(d time.Duration) { + tick := time.NewTicker(d) + defer tick.Stop() + + for { + select { + case <-p.stopc: + return + case <-tick.C: + p.refresh() + } + } +} + +func (p *Peer) refresh() { + logger := log.With(p.logger, "msg", "refresh") + + resolvedPeers, err := resolvePeers(context.Background(), p.knownPeers, p.advertiseAddr, net.Resolver{}, false) + if err != nil { + level.Debug(logger).Log("peers", p.knownPeers, "err", err) + return + } + + members := p.mlist.Members() + for _, peer := range resolvedPeers { + var isPeerFound bool + for _, member := range members { + if member.Address() == peer { + isPeerFound = true + break + } + } + + if !isPeerFound { + if _, err := p.mlist.Join([]string{peer}); err != nil { + p.failedRefreshCounter.Inc() + level.Warn(logger).Log("result", "failure", "addr", peer) + } else { + p.refreshCounter.Inc() + level.Debug(logger).Log("result", "success", "addr", peer) + } + } + } +} + func (p *Peer) peerJoin(n *memberlist.Node) { p.peerLock.Lock() defer p.peerLock.Unlock()