diff --git a/cluster/cluster.go b/cluster/cluster.go index ec02cc7b..20dd3148 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -232,12 +232,21 @@ func (p *Peer) Join( } if reconnectInterval != 0 { - go p.handleReconnect(reconnectInterval) + go p.runPeriodicTask( + reconnectInterval, + p.reconnect, + ) } if reconnectTimeout != 0 { - go p.handleReconnectTimeout(5*time.Minute, reconnectTimeout) + go p.runPeriodicTask( + 5*time.Minute, + func() { p.removeFailedPeers(reconnectTimeout) }, + ) } - go p.handleRefresh(DefaultRefreshInterval) + go p.runPeriodicTask( + DefaultRefreshInterval, + p.refresh, + ) return err } @@ -341,7 +350,7 @@ func (p *Peer) register(reg prometheus.Registerer) { p.peerLeaveCounter, p.peerUpdateCounter, p.peerJoinCounter, p.refreshCounter, p.failedRefreshCounter) } -func (p *Peer) handleReconnectTimeout(d time.Duration, timeout time.Duration) { +func (p *Peer) runPeriodicTask(d time.Duration, f func()) { tick := time.NewTicker(d) defer tick.Stop() @@ -350,7 +359,7 @@ func (p *Peer) handleReconnectTimeout(d time.Duration, timeout time.Duration) { case <-p.stopc: return case <-tick.C: - p.removeFailedPeers(timeout) + f() } } } @@ -374,20 +383,6 @@ func (p *Peer) removeFailedPeers(timeout time.Duration) { p.failedPeers = keep } -func (p *Peer) handleReconnect(d time.Duration) { - tick := time.NewTicker(d) - defer tick.Stop() - - for { - select { - case <-p.stopc: - return - case <-tick.C: - p.reconnect() - } - } -} - func (p *Peer) reconnect() { p.peerLock.RLock() failedPeers := p.failedPeers @@ -408,20 +403,6 @@ func (p *Peer) reconnect() { } } -func (p *Peer) handleRefresh(d time.Duration) { - tick := time.NewTicker(d) - defer tick.Stop() - - for { - select { - case <-p.stopc: - return - case <-tick.C: - p.refresh() - } - } -} - func (p *Peer) refresh() { logger := log.With(p.logger, "msg", "refresh")