cluster: reduce code duplication

Signed-off-by: Simon Pasquier <spasquie@redhat.com>
This commit is contained in:
Simon Pasquier 2019-04-17 16:20:41 +02:00
parent b694eef820
commit 6592692907
1 changed files with 14 additions and 33 deletions

View File

@ -232,12 +232,21 @@ func (p *Peer) Join(
} }
if reconnectInterval != 0 { if reconnectInterval != 0 {
go p.handleReconnect(reconnectInterval) go p.runPeriodicTask(
reconnectInterval,
p.reconnect,
)
} }
if reconnectTimeout != 0 { if reconnectTimeout != 0 {
go p.handleReconnectTimeout(5*time.Minute, reconnectTimeout) go p.runPeriodicTask(
5*time.Minute,
func() { p.removeFailedPeers(reconnectTimeout) },
)
} }
go p.handleRefresh(DefaultRefreshInterval) go p.runPeriodicTask(
DefaultRefreshInterval,
p.refresh,
)
return err return err
} }
@ -341,7 +350,7 @@ func (p *Peer) register(reg prometheus.Registerer) {
p.peerLeaveCounter, p.peerUpdateCounter, p.peerJoinCounter, p.refreshCounter, p.failedRefreshCounter) p.peerLeaveCounter, p.peerUpdateCounter, p.peerJoinCounter, p.refreshCounter, p.failedRefreshCounter)
} }
func (p *Peer) handleReconnectTimeout(d time.Duration, timeout time.Duration) { func (p *Peer) runPeriodicTask(d time.Duration, f func()) {
tick := time.NewTicker(d) tick := time.NewTicker(d)
defer tick.Stop() defer tick.Stop()
@ -350,7 +359,7 @@ func (p *Peer) handleReconnectTimeout(d time.Duration, timeout time.Duration) {
case <-p.stopc: case <-p.stopc:
return return
case <-tick.C: case <-tick.C:
p.removeFailedPeers(timeout) f()
} }
} }
} }
@ -374,20 +383,6 @@ func (p *Peer) removeFailedPeers(timeout time.Duration) {
p.failedPeers = keep p.failedPeers = keep
} }
func (p *Peer) handleReconnect(d time.Duration) {
tick := time.NewTicker(d)
defer tick.Stop()
for {
select {
case <-p.stopc:
return
case <-tick.C:
p.reconnect()
}
}
}
func (p *Peer) reconnect() { func (p *Peer) reconnect() {
p.peerLock.RLock() p.peerLock.RLock()
failedPeers := p.failedPeers failedPeers := p.failedPeers
@ -408,20 +403,6 @@ func (p *Peer) reconnect() {
} }
} }
func (p *Peer) handleRefresh(d time.Duration) {
tick := time.NewTicker(d)
defer tick.Stop()
for {
select {
case <-p.stopc:
return
case <-tick.C:
p.refresh()
}
}
}
func (p *Peer) refresh() { func (p *Peer) refresh() {
logger := log.With(p.logger, "msg", "refresh") logger := log.With(p.logger, "msg", "refresh")