cluster: make sure we don't miss the first pushPull (#1456)

* cluster: make sure we don't miss the first pushPull

During the join, memberlist initiates a pushPull to get initial data.
Unfortunately, at this point the nflog and silence listeners have not
been registered yet, so the first data arrives only after one full
pushPull cycle (one minute by default!).

Signed-off-by: Corentin Chary <c.chary@criteo.com>
Authored by Corentin Chary on 2018-07-09 11:16:04 +02:00; committed by Max Inden
parent f5a258dd1d
commit 42ea9a565b
4 changed files with 71 additions and 35 deletions
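In short: the old cluster.Join both created the memberlist and immediately joined the cluster, so the initial pushPull fired while the delegate's states map was still empty. The commit splits it into cluster.Create, which builds the Peer (resolving and remembering the peer addresses) without contacting anyone, and a new Peer.Join method, letting main() register the nflog and silence states in between. A rough sketch of the resulting startup order — the AddState/SetBroadcast wiring is an illustrative stand-in for registration code that is outside this diff, and the Create argument list is abbreviated:

    // 1. Build the gossip peer; nothing is sent to other peers yet.
    peer, err := cluster.Create(
        log.With(logger, "component", "cluster"),
        prometheus.DefaultRegisterer,
        *clusterBindAddr,
        // ...advertise address, known peers, and the timeout/interval
        // settings shown in the diff below...
    )
    if err != nil {
        level.Error(logger).Log("msg", "Unable to initialize gossip mesh", "err", err)
        os.Exit(1)
    }

    // 2. Register the cluster states so they sit in the delegate's states
    // map before any data can arrive (illustrative wiring).
    notificationLog.SetBroadcast(peer.AddState("nfl", notificationLog).Broadcast)
    silences.SetBroadcast(peer.AddState("sil", silences).Broadcast)

    // 3. Only now contact the cluster; memberlist's first pushPull
    // finds the listeners already registered.
    if err := peer.Join(*reconnectInterval, *peerReconnectTimeout); err != nil {
        level.Error(logger).Log("msg", "Unable to join gossip mesh", "err", err)
        os.Exit(1)
    }
    go peer.Settle(ctx, *pushPullInterval*10)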

--- a/cluster/cluster.go
+++ b/cluster/cluster.go

@@ -38,6 +38,8 @@ type Peer struct {
     mlist    *memberlist.Memberlist
     delegate *delegate
 
+    resolvedPeers []string
+
     mtx    sync.RWMutex
     states map[string]State
     stopc  chan struct{}
@@ -98,7 +100,7 @@ const (
     maxGossipPacketSize = 1400
 )
 
-func Join(
+func Create(
     l log.Logger,
     reg prometheus.Registerer,
     bindAddr string,
@@ -110,8 +112,6 @@ func Join(
     tcpTimeout time.Duration,
     probeTimeout time.Duration,
     probeInterval time.Duration,
-    reconnectInterval time.Duration,
-    reconnectTimeout time.Duration,
 ) (*Peer, error) {
     bindHost, bindPortStr, err := net.SplitHostPort(bindAddr)
     if err != nil {
@@ -164,11 +164,12 @@ func Join(
     }
 
     p := &Peer{
-        states: map[string]State{},
-        stopc:  make(chan struct{}),
-        readyc: make(chan struct{}),
-        logger: l,
-        peers:  map[string]peer{},
+        states:        map[string]State{},
+        stopc:         make(chan struct{}),
+        readyc:        make(chan struct{}),
+        logger:        l,
+        peers:         map[string]peer{},
+        resolvedPeers: resolvedPeers,
     }
 
     p.register(reg)
@@ -207,12 +208,17 @@ func Join(
         return nil, errors.Wrap(err, "create memberlist")
     }
     p.mlist = ml
+    return p, nil
+}
 
-    n, err := ml.Join(resolvedPeers)
+func (p *Peer) Join(
+    reconnectInterval time.Duration,
+    reconnectTimeout time.Duration) error {
+    n, err := p.mlist.Join(p.resolvedPeers)
     if err != nil {
-        level.Warn(l).Log("msg", "failed to join cluster", "err", err)
+        level.Warn(p.logger).Log("msg", "failed to join cluster", "err", err)
     } else {
-        level.Debug(l).Log("msg", "joined cluster", "peers", n)
+        level.Debug(p.logger).Log("msg", "joined cluster", "peers", n)
     }
 
     if reconnectInterval != 0 {
@@ -222,7 +228,7 @@ func Join(
         go p.handleReconnectTimeout(5*time.Minute, reconnectTimeout)
     }
 
-    return p, nil
+    return err
 }
 
 // All peers are initially added to the failed list. They will be removed from

--- a/cluster/cluster_test.go
+++ b/cluster/cluster_test.go

@@ -26,7 +26,7 @@ import (
 
 func TestJoinLeave(t *testing.T) {
     logger := log.NewNopLogger()
-    p, err := Join(
+    p, err := Create(
         logger,
         prometheus.NewRegistry(),
         "0.0.0.0:0",
@@ -38,11 +38,14 @@ func TestJoinLeave(t *testing.T) {
         DefaultTcpTimeout,
         DefaultProbeTimeout,
         DefaultProbeInterval,
+    )
+    require.NoError(t, err)
+    require.NotNil(t, p)
+    err = p.Join(
         DefaultReconnectInterval,
         DefaultReconnectTimeout,
     )
     require.NoError(t, err)
-    require.NotNil(t, p)
     require.False(t, p.Ready())
     require.Equal(t, p.Status(), "settling")
     go p.Settle(context.Background(), 0*time.Second)
@@ -50,7 +53,7 @@ func TestJoinLeave(t *testing.T) {
     require.Equal(t, p.Status(), "ready")
 
     // Create the peer who joins the first.
-    p2, err := Join(
+    p2, err := Create(
         logger,
         prometheus.NewRegistry(),
         "0.0.0.0:0",
@@ -62,11 +65,14 @@ func TestJoinLeave(t *testing.T) {
         DefaultTcpTimeout,
         DefaultProbeTimeout,
         DefaultProbeInterval,
+    )
+    require.NoError(t, err)
+    require.NotNil(t, p2)
+    err = p2.Join(
         DefaultReconnectInterval,
         DefaultReconnectTimeout,
     )
     require.NoError(t, err)
-    require.NotNil(t, p2)
     go p2.Settle(context.Background(), 0*time.Second)
 
     require.Equal(t, 2, p.ClusterSize())
@@ -79,7 +85,7 @@ func TestJoinLeave(t *testing.T) {
 
 func TestReconnect(t *testing.T) {
     logger := log.NewNopLogger()
-    p, err := Join(
+    p, err := Create(
         logger,
         prometheus.NewRegistry(),
         "0.0.0.0:0",
@@ -91,15 +97,18 @@ func TestReconnect(t *testing.T) {
         DefaultTcpTimeout,
         DefaultProbeTimeout,
         DefaultProbeInterval,
-        DefaultReconnectInterval,
-        DefaultReconnectTimeout,
     )
     require.NoError(t, err)
     require.NotNil(t, p)
+    err = p.Join(
+        DefaultReconnectInterval,
+        DefaultReconnectTimeout,
+    )
+    require.NoError(t, err)
     go p.Settle(context.Background(), 0*time.Second)
     p.WaitReady()
 
-    p2, err := Join(
+    p2, err := Create(
         logger,
         prometheus.NewRegistry(),
         "0.0.0.0:0",
@@ -111,11 +120,14 @@ func TestReconnect(t *testing.T) {
         DefaultTcpTimeout,
         DefaultProbeTimeout,
         DefaultProbeInterval,
+    )
+    require.NoError(t, err)
+    require.NotNil(t, p2)
+    err = p2.Join(
         DefaultReconnectInterval,
         DefaultReconnectTimeout,
     )
     require.NoError(t, err)
-    require.NotNil(t, p2)
     go p2.Settle(context.Background(), 0*time.Second)
     p2.WaitReady()
 
@@ -134,7 +146,7 @@ func TestReconnect(t *testing.T) {
 
 func TestRemoveFailedPeers(t *testing.T) {
     logger := log.NewNopLogger()
-    p, err := Join(
+    p, err := Create(
         logger,
         prometheus.NewRegistry(),
         "0.0.0.0:0",
@@ -146,11 +158,14 @@ func TestRemoveFailedPeers(t *testing.T) {
         DefaultTcpTimeout,
         DefaultProbeTimeout,
         DefaultProbeInterval,
+    )
+    require.NoError(t, err)
+    require.NotNil(t, p)
+    err = p.Join(
         DefaultReconnectInterval,
         DefaultReconnectTimeout,
     )
     require.NoError(t, err)
-    require.NotNil(t, p)
 
     n := p.Self()
     now := time.Now()
@@ -180,7 +195,7 @@ func TestInitiallyFailingPeers(t *testing.T) {
     logger := log.NewNopLogger()
     myAddr := "1.2.3.4:5000"
     peerAddrs := []string{myAddr, "2.3.4.5:5000", "3.4.5.6:5000", "foo.example.com:5000"}
-    p, err := Join(
+    p, err := Create(
         logger,
         prometheus.NewRegistry(),
         "0.0.0.0:0",
@@ -192,11 +207,14 @@ func TestInitiallyFailingPeers(t *testing.T) {
         DefaultTcpTimeout,
         DefaultProbeTimeout,
         DefaultProbeInterval,
+    )
+    require.NoError(t, err)
+    require.NotNil(t, p)
+    err = p.Join(
         DefaultReconnectInterval,
         DefaultReconnectTimeout,
     )
     require.NoError(t, err)
-    require.NotNil(t, p)
 
     p.setInitialFailed(peerAddrs, myAddr)
 

--- a/cluster/delegate.go
+++ b/cluster/delegate.go

@@ -135,6 +135,7 @@ func (d *delegate) NotifyMsg(b []byte) {
         level.Warn(d.logger).Log("msg", "decode broadcast", "err", err)
         return
     }
+
     s, ok := d.states[p.Key]
     if !ok {
         return
@@ -160,6 +161,7 @@ func (d *delegate) LocalState(_ bool) []byte {
     all := &clusterpb.FullState{
         Parts: make([]clusterpb.Part, 0, len(d.states)),
     }
+
     for key, s := range d.states {
         b, err := s.MarshalBinary()
         if err != nil {
@@ -189,10 +191,10 @@ func (d *delegate) MergeRemoteState(buf []byte, _ bool) {
     }
     d.mtx.RLock()
     defer d.mtx.RUnlock()
     for _, p := range fs.Parts {
         s, ok := d.states[p.Key]
         if !ok {
-            level.Warn(d.logger).Log("received", "unknown state key", "len", len(buf))
+            level.Warn(d.logger).Log("received", "unknown state key", "len", len(buf), "key", p.Key)
             continue
         }
         if err := s.Merge(p.Data); err != nil {
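For context on the delegate.go hunks above: the states map holds implementations of the cluster package's State interface — the hunks show its two operations, MarshalBinary (what LocalState ships during a pushPull) and Merge (what MergeRemoteState applies). A part that arrives for an unregistered key can only be logged and dropped, which is precisely what happened to the first pushPull before this change. A self-contained toy illustration; the State definition is assumed from the calls in the hunks, and kvState is invented for the example:

    package main

    import (
        "fmt"
        "sync"
    )

    // State mirrors the two-method interface the delegate exchanges
    // during pushPull (assumed from the hunks above).
    type State interface {
        MarshalBinary() ([]byte, error)
        Merge(b []byte) error
    }

    // kvState is a toy State: a last-writer-wins blob of bytes.
    type kvState struct {
        mtx  sync.Mutex
        data []byte
    }

    func (s *kvState) MarshalBinary() ([]byte, error) {
        s.mtx.Lock()
        defer s.mtx.Unlock()
        return append([]byte(nil), s.data...), nil
    }

    func (s *kvState) Merge(b []byte) error {
        s.mtx.Lock()
        defer s.mtx.Unlock()
        s.data = append([]byte(nil), b...) // toy merge: take the remote snapshot
        return nil
    }

    func main() {
        // Only "sil" is registered; "nfl" arrives too early, as before the fix.
        states := map[string]State{"sil": &kvState{}}
        incoming := map[string][]byte{
            "sil": []byte("remote silences"),
            "nfl": []byte("remote nflog"),
        }
        for key, data := range incoming {
            s, ok := states[key]
            if !ok {
                fmt.Println("unknown state key, dropping:", key) // the delegate's warning path
                continue
            }
            _ = s.Merge(data)
        }
    }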

--- a/cmd/alertmanager/main.go
+++ b/cmd/alertmanager/main.go

@@ -182,7 +182,7 @@ func main() {
 
     var peer *cluster.Peer
     if *clusterBindAddr != "" {
-        peer, err = cluster.Join(
+        peer, err = cluster.Create(
            log.With(logger, "component", "cluster"),
            prometheus.DefaultRegisterer,
            *clusterBindAddr,
@@ -194,19 +194,11 @@ func main() {
            *tcpTimeout,
            *probeTimeout,
            *probeInterval,
-           *reconnectInterval,
-           *peerReconnectTimeout,
        )
        if err != nil {
            level.Error(logger).Log("msg", "Unable to initialize gossip mesh", "err", err)
            os.Exit(1)
        }
-       ctx, cancel := context.WithTimeout(context.Background(), *settleTimeout)
-       defer func() {
-           cancel()
-           peer.Leave(10 * time.Second)
-       }()
-       go peer.Settle(ctx, *gossipInterval*10)
     }
 
     stopc := make(chan struct{})
@@ -263,6 +255,24 @@ func main() {
        wg.Wait()
    }()
 
+   // Peer state listeners have been registered, now we can join and get the initial state.
+   if peer != nil {
+       err = peer.Join(
+           *reconnectInterval,
+           *peerReconnectTimeout,
+       )
+       if err != nil {
+           level.Error(logger).Log("msg", "Unable to join gossip mesh", "err", err)
+           os.Exit(1)
+       }
+       ctx, cancel := context.WithTimeout(context.Background(), *settleTimeout)
+       defer func() {
+           cancel()
+           peer.Leave(10 * time.Second)
+       }()
+       go peer.Settle(ctx, *pushPullInterval*10)
+   }
+
    alerts, err := mem.NewAlerts(marker, *alertGCInterval)
    if err != nil {
        level.Error(logger).Log("err", err)