cluster: Add cluster peers DNS refresh job (#1428)

Adds a job that runs periodically and refreshes the cluster.peer DNS records, joining any newly discovered peers.

The problem is that when you restart all of the Alertmanager instances in an environment like Kubernetes, DNS may still contain the old instance IPs but, at startup (when Join() happens), none of the new ones. Because DNS is not empty at that point, resolvePeers (called with waitIfEmpty=true) returns immediately with the stale addresses, and "islands" of single Alertmanager instances form.
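For illustration, here is a minimal, self-contained Go sketch of that startup behavior. The names waitForPeers and alertmanager.example.svc are invented for this sketch; this is not the Alertmanager implementation. The point it shows: a wait-if-empty loop blocks only while DNS returns no records at all, so an answer consisting entirely of stale IPs is accepted as-is, and the subsequent Join() reaches nobody.

package main

import (
	"context"
	"fmt"
	"net"
	"time"
)

// waitForPeers mimics the startup resolution described above: with
// waitIfEmpty=true it blocks only while DNS returns *no* records, so an
// answer made up entirely of stale IPs is accepted without question.
func waitForPeers(ctx context.Context, name string, waitIfEmpty bool) ([]string, error) {
	for {
		addrs, err := net.DefaultResolver.LookupHost(ctx, name)
		if err == nil && (len(addrs) > 0 || !waitIfEmpty) {
			return addrs, nil // stale records count as "non-empty" too
		}
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case <-time.After(time.Second):
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	// "alertmanager.example.svc" is a placeholder headless-service name.
	peers, err := waitForPeers(ctx, "alertmanager.example.svc", true)
	if err != nil {
		fmt.Println("resolve failed:", err)
		return
	}
	// Right after a rolling restart these may all be stale addresses.
	fmt.Println("joining:", peers)
}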

Signed-off-by: Povilas Versockas <p.versockas@gmail.com>
Povilas Versockas, 2018-11-23 10:47:13 +02:00; committed by Max Inden
parent 96fce3e8ab
commit 7f34cb4716
1 changed file with 63 additions and 1 deletion


@@ -49,8 +49,13 @@ type Peer struct {
 	peers       map[string]peer
 	failedPeers []peer
+	knownPeers    []string
+	advertiseAddr string
+
 	failedReconnectionsCounter prometheus.Counter
 	reconnectionsCounter       prometheus.Counter
+	failedRefreshCounter       prometheus.Counter
+	refreshCounter             prometheus.Counter
 	peerLeaveCounter  prometheus.Counter
 	peerUpdateCounter prometheus.Counter
 	peerJoinCounter   prometheus.Counter
@@ -97,6 +102,7 @@ const (
 	DefaultProbeInterval     = 1 * time.Second
 	DefaultReconnectInterval = 10 * time.Second
 	DefaultReconnectTimeout  = 6 * time.Hour
+	DefaultRefreshInterval   = 15 * time.Second
 
 	maxGossipPacketSize = 1400
 )
@@ -170,6 +176,7 @@ func Create(
 		logger:        l,
 		peers:         map[string]peer{},
 		resolvedPeers: resolvedPeers,
+		knownPeers:    knownPeers,
 	}
 
 	p.register(reg)
@@ -230,6 +237,7 @@ func (p *Peer) Join(
 	if reconnectTimeout != 0 {
 		go p.handleReconnectTimeout(5*time.Minute, reconnectTimeout)
 	}
+	go p.handleRefresh(DefaultRefreshInterval)
 
 	return err
 }
@@ -307,6 +315,15 @@ func (p *Peer) register(reg prometheus.Registerer) {
 		Help: "A counter of the number of cluster peer reconnections.",
 	})
+
+	p.failedRefreshCounter = prometheus.NewCounter(prometheus.CounterOpts{
+		Name: "alertmanager_cluster_refresh_join_failed_total",
+		Help: "A counter of the number of failed cluster peer join attempts via refresh.",
+	})
+	p.refreshCounter = prometheus.NewCounter(prometheus.CounterOpts{
+		Name: "alertmanager_cluster_refresh_join_total",
+		Help: "A counter of the number of cluster peers joined via refresh.",
+	})
 	p.peerLeaveCounter = prometheus.NewCounter(prometheus.CounterOpts{
 		Name: "alertmanager_cluster_peers_left_total",
 		Help: "A counter of the number of peers that have left.",
@@ -321,7 +338,7 @@ func (p *Peer) register(reg prometheus.Registerer) {
 	})
 	reg.MustRegister(clusterFailedPeers, p.failedReconnectionsCounter, p.reconnectionsCounter,
-		p.peerLeaveCounter, p.peerUpdateCounter, p.peerJoinCounter)
+		p.peerLeaveCounter, p.peerUpdateCounter, p.peerJoinCounter, p.refreshCounter, p.failedRefreshCounter)
 }
 
 func (p *Peer) handleReconnectTimeout(d time.Duration, timeout time.Duration) {
@@ -391,6 +408,51 @@ func (p *Peer) reconnect() {
 	}
 }
 
+func (p *Peer) handleRefresh(d time.Duration) {
+	tick := time.NewTicker(d)
+	defer tick.Stop()
+
+	for {
+		select {
+		case <-p.stopc:
+			return
+		case <-tick.C:
+			p.refresh()
+		}
+	}
+}
+
+func (p *Peer) refresh() {
+	logger := log.With(p.logger, "msg", "refresh")
+
+	resolvedPeers, err := resolvePeers(context.Background(), p.knownPeers, p.advertiseAddr, net.Resolver{}, false)
+	if err != nil {
+		level.Debug(logger).Log("peers", p.knownPeers, "err", err)
+		return
+	}
+
+	members := p.mlist.Members()
+	for _, peer := range resolvedPeers {
+		var isPeerFound bool
+		for _, member := range members {
+			if member.Address() == peer {
+				isPeerFound = true
+				break
+			}
+		}
+
+		if !isPeerFound {
+			if _, err := p.mlist.Join([]string{peer}); err != nil {
+				p.failedRefreshCounter.Inc()
+				level.Warn(logger).Log("result", "failure", "addr", peer, "err", err)
+			} else {
+				p.refreshCounter.Inc()
+				level.Debug(logger).Log("result", "success", "addr", peer)
+			}
+		}
+	}
+}
+
 func (p *Peer) peerJoin(n *memberlist.Node) {
 	p.peerLock.Lock()
 	defer p.peerLock.Unlock()
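For readers who want the shape of the new job in isolation, here is a minimal runnable sketch of the same pattern: a ticker loop stoppable via a channel, plus a join of only the addresses that are not yet members. The names currentMembers and resolveAddrs are invented stand-ins for p.mlist.Members() and resolvePeers(); this is an illustration, not the committed code.

package main

import (
	"fmt"
	"time"
)

// currentMembers stands in for p.mlist.Members() in the real code.
func currentMembers() map[string]bool {
	return map[string]bool{"10.0.0.1:9094": true}
}

// resolveAddrs stands in for resolvePeers(); it returns one known member
// and one newly appeared peer.
func resolveAddrs() []string {
	return []string{"10.0.0.1:9094", "10.0.0.7:9094"}
}

// refresh joins only the resolved addresses that are not already members,
// mirroring the set-difference loop in refresh() above.
func refresh() {
	members := currentMembers()
	for _, addr := range resolveAddrs() {
		if !members[addr] {
			// The real code calls p.mlist.Join([]string{addr}) here and
			// increments the refresh / failed-refresh counters.
			fmt.Println("joining new peer:", addr)
		}
	}
}

func main() {
	stopc := make(chan struct{})
	tick := time.NewTicker(200 * time.Millisecond) // DefaultRefreshInterval in the real code
	defer tick.Stop()

	// Stop the loop after a second so the sketch terminates on its own.
	go func() {
		time.Sleep(time.Second)
		close(stopc)
	}()

	for {
		select {
		case <-stopc:
			return
		case <-tick.C:
			refresh()
		}
	}
}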