From 9a89828c1fcb71d8bc0f3513774d55ac81145450 Mon Sep 17 00:00:00 2001
From: gotjosh
Date: Wed, 8 Mar 2023 15:10:28 +0000
Subject: [PATCH] Test Flake: Fix Cluster join/leave flake

Signed-off-by: gotjosh
---
 cluster/cluster.go      | 13 ++++++++-----
 cluster/cluster_test.go | 32 ++++++++++++++++++++++++++++++--
 2 files changed, 38 insertions(+), 7 deletions(-)

diff --git a/cluster/cluster.go b/cluster/cluster.go
index ba7fdb9e..6298ce67 100644
--- a/cluster/cluster.go
+++ b/cluster/cluster.go
@@ -24,6 +24,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/benbjohnson/clock"
 	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
 	"github.com/hashicorp/memberlist"
@@ -57,6 +58,7 @@ type ClusterChannel interface {
 
 // Peer is a single peer in a gossip cluster.
 type Peer struct {
+	clock    clock.Clock
 	mlist    *memberlist.Memberlist
 	delegate *delegate
 
@@ -194,6 +196,7 @@ func Create(
 	}
 
 	p := &Peer{
+		clock:         clock.New(),
 		states:        map[string]State{},
 		stopc:         make(chan struct{}),
 		readyc:        make(chan struct{}),
@@ -394,7 +397,7 @@ func (p *Peer) register(reg prometheus.Registerer, name string) {
 }
 
 func (p *Peer) runPeriodicTask(d time.Duration, f func()) {
-	tick := time.NewTicker(d)
+	tick := p.clock.Ticker(d)
 	defer tick.Stop()
 
 	for {
@@ -679,20 +682,20 @@ func (p *Peer) Position() int {
 func (p *Peer) Settle(ctx context.Context, interval time.Duration) {
 	const NumOkayRequired = 3
 	level.Info(p.logger).Log("msg", "Waiting for gossip to settle...", "interval", interval)
-	start := time.Now()
+	start := p.clock.Now()
 	nPeers := 0
 	nOkay := 0
 	totalPolls := 0
 	for {
 		select {
 		case <-ctx.Done():
-			elapsed := time.Since(start)
+			elapsed := p.clock.Since(start)
 			level.Info(p.logger).Log("msg", "gossip not settled but continuing anyway", "polls", totalPolls, "elapsed", elapsed)
 			close(p.readyc)
 			return
-		case <-time.After(interval):
+		case <-p.clock.After(interval):
 		}
-		elapsed := time.Since(start)
+		elapsed := p.clock.Since(start)
 		n := len(p.Peers())
 		if nOkay >= NumOkayRequired {
 			level.Info(p.logger).Log("msg", "gossip settled; proceeding", "elapsed", elapsed)
diff --git a/cluster/cluster_test.go b/cluster/cluster_test.go
index 87c31d13..da7044f3 100644
--- a/cluster/cluster_test.go
+++ b/cluster/cluster_test.go
@@ -15,9 +15,11 @@ package cluster
 
 import (
 	"context"
+	"sync"
 	"testing"
 	"time"
 
+	"github.com/benbjohnson/clock"
 	"github.com/go-kit/log"
 	"github.com/hashicorp/go-sockaddr"
 	"github.com/stretchr/testify/require"
@@ -57,10 +59,13 @@ func testJoinLeave(t *testing.T) {
 	)
 	require.NoError(t, err)
 	require.NotNil(t, p)
+	c1 := clock.NewMock()
+	p.clock = c1
 	err = p.Join(
 		DefaultReconnectInterval,
 		DefaultReconnectTimeout,
 	)
+	c1.Add(1 * time.Minute)
 	require.NoError(t, err)
 	require.False(t, p.Ready())
 	{
@@ -69,7 +74,17 @@ func testJoinLeave(t *testing.T) {
 		require.Equal(t, context.Canceled, p.WaitReady(ctx))
 	}
 	require.Equal(t, p.Status(), "settling")
-	go p.Settle(context.Background(), 0*time.Second)
+
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		p.Settle(context.Background(), 1*time.Second)
+	}()
+	time.Sleep(1 * time.Millisecond)
+	c1.Add(10 * time.Second)
+	wg.Wait()
+
 	require.NoError(t, p.WaitReady(context.Background()))
 	require.Equal(t, p.Status(), "ready")
 
@@ -89,14 +104,27 @@ func testJoinLeave(t *testing.T) {
 		nil,
 		false,
 	)
+	c2 := clock.NewMock()
+	p2.clock = c2
 	require.NoError(t, err)
 	require.NotNil(t, p2)
 	err = p2.Join(
 		DefaultReconnectInterval,
 		DefaultReconnectTimeout,
 	)
+	c2.Add(1 * time.Minute)
 	require.NoError(t, err)
-	go p2.Settle(context.Background(), 0*time.Second)
+
+	var wg2 sync.WaitGroup
+	wg2.Add(1)
+	go func() {
+		defer wg2.Done()
+		p2.Settle(context.Background(), 1*time.Second)
+	}()
+	time.Sleep(1 * time.Millisecond)
+	c2.Add(10 * time.Second)
+	wg2.Wait()
+
 	require.NoError(t, p2.WaitReady(context.Background()))
 
 	require.Equal(t, 2, p.ClusterSize())