Clustering: Change WaitReady to accept a Context.
WaitReady is a blocking call and so should accept a Context in order to be responsive to cancellation of the notification pipeline for any reason. Signed-off-by: Steve Simpson <steve.simpson@grafana.com>
This commit is contained in:
parent
0fef08028b
commit
1711e72d1b
|
@ -584,8 +584,13 @@ func (p *Peer) Ready() bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait until Settle() has finished.
|
// Wait until Settle() has finished.
|
||||||
func (p *Peer) WaitReady() {
|
func (p *Peer) WaitReady(ctx context.Context) error {
|
||||||
<-p.readyc
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
case <-p.readyc:
|
||||||
|
return nil
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Return a status string representing the peer state.
|
// Return a status string representing the peer state.
|
||||||
|
|
|
@ -60,9 +60,14 @@ func testJoinLeave(t *testing.T) {
|
||||||
)
|
)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.False(t, p.Ready())
|
require.False(t, p.Ready())
|
||||||
|
{
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
cancel()
|
||||||
|
require.Equal(t, context.Canceled, p.WaitReady(ctx))
|
||||||
|
}
|
||||||
require.Equal(t, p.Status(), "settling")
|
require.Equal(t, p.Status(), "settling")
|
||||||
go p.Settle(context.Background(), 0*time.Second)
|
go p.Settle(context.Background(), 0*time.Second)
|
||||||
p.WaitReady()
|
require.NoError(t, p.WaitReady(context.Background()))
|
||||||
require.Equal(t, p.Status(), "ready")
|
require.Equal(t, p.Status(), "ready")
|
||||||
|
|
||||||
// Create the peer who joins the first.
|
// Create the peer who joins the first.
|
||||||
|
@ -119,7 +124,7 @@ func testReconnect(t *testing.T) {
|
||||||
)
|
)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
go p.Settle(context.Background(), 0*time.Second)
|
go p.Settle(context.Background(), 0*time.Second)
|
||||||
p.WaitReady()
|
require.NoError(t, p.WaitReady(context.Background()))
|
||||||
|
|
||||||
p2, err := Create(
|
p2, err := Create(
|
||||||
logger,
|
logger,
|
||||||
|
@ -142,7 +147,7 @@ func testReconnect(t *testing.T) {
|
||||||
)
|
)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
go p2.Settle(context.Background(), 0*time.Second)
|
go p2.Settle(context.Background(), 0*time.Second)
|
||||||
p2.WaitReady()
|
require.NoError(t, p2.WaitReady(context.Background()))
|
||||||
|
|
||||||
p.peerJoin(p2.Self())
|
p.peerJoin(p2.Self())
|
||||||
p.peerLeave(p2.Self())
|
p.peerLeave(p2.Self())
|
||||||
|
|
|
@ -44,7 +44,7 @@ type ResolvedSender interface {
|
||||||
// Peer represents the cluster node from where we are the sending the notification.
|
// Peer represents the cluster node from where we are the sending the notification.
|
||||||
type Peer interface {
|
type Peer interface {
|
||||||
// WaitReady waits until the node silences and notifications have settled before attempting to send a notification.
|
// WaitReady waits until the node silences and notifications have settled before attempting to send a notification.
|
||||||
WaitReady()
|
WaitReady(context.Context) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// MinTimeout is the minimum timeout that is set for the context of a call
|
// MinTimeout is the minimum timeout that is set for the context of a call
|
||||||
|
@ -430,7 +430,9 @@ func NewGossipSettleStage(p Peer) *GossipSettleStage {
|
||||||
|
|
||||||
func (n *GossipSettleStage) Exec(ctx context.Context, _ log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
|
func (n *GossipSettleStage) Exec(ctx context.Context, _ log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
|
||||||
if n.peer != nil {
|
if n.peer != nil {
|
||||||
n.peer.WaitReady()
|
if err := n.peer.WaitReady(ctx); err != nil {
|
||||||
|
return ctx, nil, err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return ctx, alerts, nil
|
return ctx, alerts, nil
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue