From 1711e72d1b91924402d6229672df409111ad0b6a Mon Sep 17 00:00:00 2001 From: Steve Simpson Date: Tue, 9 Mar 2021 14:25:34 +0100 Subject: [PATCH] Clustering: Change WaitReady to accept a Context. WaitReady is a blocking call and so should accept a Context in order to be responsive to cancellation of the notification pipeline for any reason. Signed-off-by: Steve Simpson --- cluster/cluster.go | 9 +++++++-- cluster/cluster_test.go | 11 ++++++++--- notify/notify.go | 6 ++++-- 3 files changed, 19 insertions(+), 7 deletions(-) diff --git a/cluster/cluster.go b/cluster/cluster.go index 04e66789..a12beb95 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -584,8 +584,13 @@ func (p *Peer) Ready() bool { } // Wait until Settle() has finished. -func (p *Peer) WaitReady() { - <-p.readyc +func (p *Peer) WaitReady(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + case <-p.readyc: + return nil + } } // Return a status string representing the peer state. diff --git a/cluster/cluster_test.go b/cluster/cluster_test.go index 36ed4203..cd7434d0 100644 --- a/cluster/cluster_test.go +++ b/cluster/cluster_test.go @@ -60,9 +60,14 @@ func testJoinLeave(t *testing.T) { ) require.NoError(t, err) require.False(t, p.Ready()) + { + ctx, cancel := context.WithCancel(context.Background()) + cancel() + require.Equal(t, context.Canceled, p.WaitReady(ctx)) + } require.Equal(t, p.Status(), "settling") go p.Settle(context.Background(), 0*time.Second) - p.WaitReady() + require.NoError(t, p.WaitReady(context.Background())) require.Equal(t, p.Status(), "ready") // Create the peer who joins the first. @@ -119,7 +124,7 @@ func testReconnect(t *testing.T) { ) require.NoError(t, err) go p.Settle(context.Background(), 0*time.Second) - p.WaitReady() + require.NoError(t, p.WaitReady(context.Background())) p2, err := Create( logger, @@ -142,7 +147,7 @@ func testReconnect(t *testing.T) { ) require.NoError(t, err) go p2.Settle(context.Background(), 0*time.Second) - p2.WaitReady() + require.NoError(t, p2.WaitReady(context.Background())) p.peerJoin(p2.Self()) p.peerLeave(p2.Self()) diff --git a/notify/notify.go b/notify/notify.go index 3dcc30f6..2f2205c9 100644 --- a/notify/notify.go +++ b/notify/notify.go @@ -44,7 +44,7 @@ type ResolvedSender interface { // Peer represents the cluster node from where we are the sending the notification. type Peer interface { // WaitReady waits until the node silences and notifications have settled before attempting to send a notification. - WaitReady() + WaitReady(context.Context) error } // MinTimeout is the minimum timeout that is set for the context of a call @@ -430,7 +430,9 @@ func NewGossipSettleStage(p Peer) *GossipSettleStage { func (n *GossipSettleStage) Exec(ctx context.Context, _ log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { if n.peer != nil { - n.peer.WaitReady() + if err := n.peer.WaitReady(ctx); err != nil { + return ctx, nil, err + } } return ctx, alerts, nil }