From 42ea9a565b4d64b1466b54106ce1456cb4dd5df6 Mon Sep 17 00:00:00 2001 From: Corentin Chary Date: Mon, 9 Jul 2018 11:16:04 +0200 Subject: [PATCH] cluster: make sure we don't miss the first pushPull (#1456) * cluster: make sure we don't miss the first pushPull During the join, memberlist initiates a pushPull to get initial data. Unfortunately, at this point the nflog and silence listener have not been registered yet, so the first data arrives only after one pushPull cycle (1min by default !). Signed-off-by: Corentin Chary --- cluster/cluster.go | 30 ++++++++++++++++----------- cluster/cluster_test.go | 44 ++++++++++++++++++++++++++++------------ cluster/delegate.go | 4 +++- cmd/alertmanager/main.go | 28 +++++++++++++++++-------- 4 files changed, 71 insertions(+), 35 deletions(-) diff --git a/cluster/cluster.go b/cluster/cluster.go index fb2abe6a..f44091cb 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -38,6 +38,8 @@ type Peer struct { mlist *memberlist.Memberlist delegate *delegate + resolvedPeers []string + mtx sync.RWMutex states map[string]State stopc chan struct{} @@ -98,7 +100,7 @@ const ( maxGossipPacketSize = 1400 ) -func Join( +func Create( l log.Logger, reg prometheus.Registerer, bindAddr string, @@ -110,8 +112,6 @@ func Join( tcpTimeout time.Duration, probeTimeout time.Duration, probeInterval time.Duration, - reconnectInterval time.Duration, - reconnectTimeout time.Duration, ) (*Peer, error) { bindHost, bindPortStr, err := net.SplitHostPort(bindAddr) if err != nil { @@ -164,11 +164,12 @@ func Join( } p := &Peer{ - states: map[string]State{}, - stopc: make(chan struct{}), - readyc: make(chan struct{}), - logger: l, - peers: map[string]peer{}, + states: map[string]State{}, + stopc: make(chan struct{}), + readyc: make(chan struct{}), + logger: l, + peers: map[string]peer{}, + resolvedPeers: resolvedPeers, } p.register(reg) @@ -207,12 +208,17 @@ func Join( return nil, errors.Wrap(err, "create memberlist") } p.mlist = ml + return p, nil +} - n, err := ml.Join(resolvedPeers) +func (p *Peer) Join( + reconnectInterval time.Duration, + reconnectTimeout time.Duration) error { + n, err := p.mlist.Join(p.resolvedPeers) if err != nil { - level.Warn(l).Log("msg", "failed to join cluster", "err", err) + level.Warn(p.logger).Log("msg", "failed to join cluster", "err", err) } else { - level.Debug(l).Log("msg", "joined cluster", "peers", n) + level.Debug(p.logger).Log("msg", "joined cluster", "peers", n) } if reconnectInterval != 0 { @@ -222,7 +228,7 @@ func Join( go p.handleReconnectTimeout(5*time.Minute, reconnectTimeout) } - return p, nil + return err } // All peers are initially added to the failed list. They will be removed from diff --git a/cluster/cluster_test.go b/cluster/cluster_test.go index 8333ac09..d313dfba 100644 --- a/cluster/cluster_test.go +++ b/cluster/cluster_test.go @@ -26,7 +26,7 @@ import ( func TestJoinLeave(t *testing.T) { logger := log.NewNopLogger() - p, err := Join( + p, err := Create( logger, prometheus.NewRegistry(), "0.0.0.0:0", @@ -38,11 +38,14 @@ func TestJoinLeave(t *testing.T) { DefaultTcpTimeout, DefaultProbeTimeout, DefaultProbeInterval, + ) + require.NoError(t, err) + require.NotNil(t, p) + err = p.Join( DefaultReconnectInterval, DefaultReconnectTimeout, ) require.NoError(t, err) - require.NotNil(t, p) require.False(t, p.Ready()) require.Equal(t, p.Status(), "settling") go p.Settle(context.Background(), 0*time.Second) @@ -50,7 +53,7 @@ func TestJoinLeave(t *testing.T) { require.Equal(t, p.Status(), "ready") // Create the peer who joins the first. - p2, err := Join( + p2, err := Create( logger, prometheus.NewRegistry(), "0.0.0.0:0", @@ -62,11 +65,14 @@ func TestJoinLeave(t *testing.T) { DefaultTcpTimeout, DefaultProbeTimeout, DefaultProbeInterval, + ) + require.NoError(t, err) + require.NotNil(t, p2) + err = p2.Join( DefaultReconnectInterval, DefaultReconnectTimeout, ) require.NoError(t, err) - require.NotNil(t, p2) go p2.Settle(context.Background(), 0*time.Second) require.Equal(t, 2, p.ClusterSize()) @@ -79,7 +85,7 @@ func TestJoinLeave(t *testing.T) { func TestReconnect(t *testing.T) { logger := log.NewNopLogger() - p, err := Join( + p, err := Create( logger, prometheus.NewRegistry(), "0.0.0.0:0", @@ -91,15 +97,18 @@ func TestReconnect(t *testing.T) { DefaultTcpTimeout, DefaultProbeTimeout, DefaultProbeInterval, - DefaultReconnectInterval, - DefaultReconnectTimeout, ) require.NoError(t, err) require.NotNil(t, p) + err = p.Join( + DefaultReconnectInterval, + DefaultReconnectTimeout, + ) + require.NoError(t, err) go p.Settle(context.Background(), 0*time.Second) p.WaitReady() - p2, err := Join( + p2, err := Create( logger, prometheus.NewRegistry(), "0.0.0.0:0", @@ -111,11 +120,14 @@ func TestReconnect(t *testing.T) { DefaultTcpTimeout, DefaultProbeTimeout, DefaultProbeInterval, + ) + require.NoError(t, err) + require.NotNil(t, p2) + err = p2.Join( DefaultReconnectInterval, DefaultReconnectTimeout, ) require.NoError(t, err) - require.NotNil(t, p2) go p2.Settle(context.Background(), 0*time.Second) p2.WaitReady() @@ -134,7 +146,7 @@ func TestReconnect(t *testing.T) { func TestRemoveFailedPeers(t *testing.T) { logger := log.NewNopLogger() - p, err := Join( + p, err := Create( logger, prometheus.NewRegistry(), "0.0.0.0:0", @@ -146,11 +158,14 @@ func TestRemoveFailedPeers(t *testing.T) { DefaultTcpTimeout, DefaultProbeTimeout, DefaultProbeInterval, + ) + require.NoError(t, err) + require.NotNil(t, p) + err = p.Join( DefaultReconnectInterval, DefaultReconnectTimeout, ) require.NoError(t, err) - require.NotNil(t, p) n := p.Self() now := time.Now() @@ -180,7 +195,7 @@ func TestInitiallyFailingPeers(t *testing.T) { logger := log.NewNopLogger() myAddr := "1.2.3.4:5000" peerAddrs := []string{myAddr, "2.3.4.5:5000", "3.4.5.6:5000", "foo.example.com:5000"} - p, err := Join( + p, err := Create( logger, prometheus.NewRegistry(), "0.0.0.0:0", @@ -192,11 +207,14 @@ func TestInitiallyFailingPeers(t *testing.T) { DefaultTcpTimeout, DefaultProbeTimeout, DefaultProbeInterval, + ) + require.NoError(t, err) + require.NotNil(t, p) + err = p.Join( DefaultReconnectInterval, DefaultReconnectTimeout, ) require.NoError(t, err) - require.NotNil(t, p) p.setInitialFailed(peerAddrs, myAddr) diff --git a/cluster/delegate.go b/cluster/delegate.go index 6ac050ec..9c00318d 100644 --- a/cluster/delegate.go +++ b/cluster/delegate.go @@ -135,6 +135,7 @@ func (d *delegate) NotifyMsg(b []byte) { level.Warn(d.logger).Log("msg", "decode broadcast", "err", err) return } + s, ok := d.states[p.Key] if !ok { return @@ -160,6 +161,7 @@ func (d *delegate) LocalState(_ bool) []byte { all := &clusterpb.FullState{ Parts: make([]clusterpb.Part, 0, len(d.states)), } + for key, s := range d.states { b, err := s.MarshalBinary() if err != nil { @@ -189,10 +191,10 @@ func (d *delegate) MergeRemoteState(buf []byte, _ bool) { } d.mtx.RLock() defer d.mtx.RUnlock() - for _, p := range fs.Parts { s, ok := d.states[p.Key] if !ok { + level.Warn(d.logger).Log("received", "unknown state key", "len", len(buf), "key", p.Key) continue } if err := s.Merge(p.Data); err != nil { diff --git a/cmd/alertmanager/main.go b/cmd/alertmanager/main.go index a242da2d..e331b566 100644 --- a/cmd/alertmanager/main.go +++ b/cmd/alertmanager/main.go @@ -182,7 +182,7 @@ func main() { var peer *cluster.Peer if *clusterBindAddr != "" { - peer, err = cluster.Join( + peer, err = cluster.Create( log.With(logger, "component", "cluster"), prometheus.DefaultRegisterer, *clusterBindAddr, @@ -194,19 +194,11 @@ func main() { *tcpTimeout, *probeTimeout, *probeInterval, - *reconnectInterval, - *peerReconnectTimeout, ) if err != nil { level.Error(logger).Log("msg", "Unable to initialize gossip mesh", "err", err) os.Exit(1) } - ctx, cancel := context.WithTimeout(context.Background(), *settleTimeout) - defer func() { - cancel() - peer.Leave(10 * time.Second) - }() - go peer.Settle(ctx, *gossipInterval*10) } stopc := make(chan struct{}) @@ -263,6 +255,24 @@ func main() { wg.Wait() }() + // Peer state listeners have been registered, now we can join and get the initial state. + if peer != nil { + err = peer.Join( + *reconnectInterval, + *peerReconnectTimeout, + ) + if err != nil { + level.Error(logger).Log("msg", "Unable to join gossip mesh", "err", err) + os.Exit(1) + } + ctx, cancel := context.WithTimeout(context.Background(), *settleTimeout) + defer func() { + cancel() + peer.Leave(10 * time.Second) + }() + go peer.Settle(ctx, *pushPullInterval*10) + } + alerts, err := mem.NewAlerts(marker, *alertGCInterval) if err != nil { level.Error(logger).Log("err", err)