Mirror of https://github.com/prometheus/alertmanager (synced 2025-02-16 18:47:10 +00:00)
Adds a job which runs periodically and refreshes the cluster.peer DNS records. The problem: when you restart all of the Alertmanager instances in an environment like Kubernetes, DNS may still contain the old instance IPs but, at startup (when Join() happens), none of the new instance IPs. Because DNS is not empty at that point, resolvePeers (with waitIfEmpty=true) returns immediately, and "islands" of single Alertmanager instances form. Signed-off-by: Povilas Versockas <p.versockas@gmail.com>

// Copyright 2018 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cluster

import (
	"context"
	"fmt"
	"math/rand"
	"net"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/log/level"
	"github.com/hashicorp/memberlist"
	"github.com/oklog/ulid"
	"github.com/pkg/errors"

	"github.com/prometheus/client_golang/prometheus"
)

// Peer is a single peer in a gossip cluster.
type Peer struct {
	mlist    *memberlist.Memberlist
	delegate *delegate

	resolvedPeers []string

	mtx    sync.RWMutex
	states map[string]State
	stopc  chan struct{}
	readyc chan struct{}

	peerLock    sync.RWMutex
	peers       map[string]peer
	failedPeers []peer

	knownPeers    []string
	advertiseAddr string

	failedReconnectionsCounter prometheus.Counter
	reconnectionsCounter       prometheus.Counter
	failedRefreshCounter       prometheus.Counter
	refreshCounter             prometheus.Counter
	peerLeaveCounter           prometheus.Counter
	peerUpdateCounter          prometheus.Counter
	peerJoinCounter            prometheus.Counter

	logger log.Logger
}

// peer is an internal type used for bookkeeping. It holds the state of peers
// in the cluster.
type peer struct {
	status    PeerStatus
	leaveTime time.Time

	*memberlist.Node
}

// PeerStatus is the state that a peer is in.
type PeerStatus int

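// A peer that has never been seen is StatusNone, a live cluster member is
// StatusAlive, and a peer that has left (or has not connected yet) is
// StatusFailed.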
const (
	StatusNone PeerStatus = iota
	StatusAlive
	StatusFailed
)

func (s PeerStatus) String() string {
	switch s {
	case StatusNone:
		return "none"
	case StatusAlive:
		return "alive"
	case StatusFailed:
		return "failed"
	default:
		panic(fmt.Sprintf("unknown PeerStatus: %d", s))
	}
}

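// Defaults for the memberlist configuration and for the cluster maintenance
// loops; maxGossipPacketSize is used as the memberlist UDP buffer size.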
const (
	DefaultPushPullInterval  = 60 * time.Second
	DefaultGossipInterval    = 200 * time.Millisecond
	DefaultTcpTimeout        = 10 * time.Second
	DefaultProbeTimeout      = 500 * time.Millisecond
	DefaultProbeInterval     = 1 * time.Second
	DefaultReconnectInterval = 10 * time.Second
	DefaultReconnectTimeout  = 6 * time.Hour
	DefaultRefreshInterval   = 15 * time.Second
	maxGossipPacketSize      = 1400
)

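// Create resolves the configured peers, builds the memberlist configuration
// and creates the local memberlist node. It does not join the cluster; call
// Join afterwards to connect to the peers and start the maintenance loops.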
func Create(
	l log.Logger,
	reg prometheus.Registerer,
	bindAddr string,
	advertiseAddr string,
	knownPeers []string,
	waitIfEmpty bool,
	pushPullInterval time.Duration,
	gossipInterval time.Duration,
	tcpTimeout time.Duration,
	probeTimeout time.Duration,
	probeInterval time.Duration,
) (*Peer, error) {
	bindHost, bindPortStr, err := net.SplitHostPort(bindAddr)
	if err != nil {
		return nil, err
	}
	bindPort, err := strconv.Atoi(bindPortStr)
	if err != nil {
		return nil, errors.Wrap(err, "invalid listen address")
	}

	var advertiseHost string
	var advertisePort int
	if advertiseAddr != "" {
		var advertisePortStr string
		advertiseHost, advertisePortStr, err = net.SplitHostPort(advertiseAddr)
		if err != nil {
			return nil, errors.Wrap(err, "invalid advertise address")
		}
		advertisePort, err = strconv.Atoi(advertisePortStr)
		if err != nil {
			return nil, errors.Wrap(err, "invalid advertise address, wrong port")
		}
	}

	resolvedPeers, err := resolvePeers(context.Background(), knownPeers, advertiseAddr, net.Resolver{}, waitIfEmpty)
	if err != nil {
		return nil, errors.Wrap(err, "resolve peers")
	}
	level.Debug(l).Log("msg", "resolved peers to following addresses", "peers", strings.Join(resolvedPeers, ","))

	// Initial validation of user-specified advertise address.
	addr, err := calculateAdvertiseAddress(bindHost, advertiseHost)
	if err != nil {
		level.Warn(l).Log("err", "couldn't deduce an advertise address: "+err.Error())
	} else if hasNonlocal(resolvedPeers) && isUnroutable(addr.String()) {
		level.Warn(l).Log("err", "this node advertises itself on an unroutable address", "addr", addr.String())
		level.Warn(l).Log("err", "this node will be unreachable in the cluster")
		level.Warn(l).Log("err", "provide --cluster.advertise-address as a routable IP address or hostname")
	} else if isAny(bindAddr) && advertiseHost == "" {
		// memberlist doesn't advertise properly when the bind address is empty or unspecified.
		level.Info(l).Log("msg", "setting advertise address explicitly", "addr", addr.String(), "port", bindPort)
		advertiseHost = addr.String()
		advertisePort = bindPort
	}

	// TODO(fabxc): generate human-readable but random names?
	name, err := ulid.New(ulid.Now(), rand.New(rand.NewSource(time.Now().UnixNano())))
	if err != nil {
		return nil, err
	}

	p := &Peer{
		states:        map[string]State{},
		stopc:         make(chan struct{}),
		readyc:        make(chan struct{}),
		logger:        l,
		peers:         map[string]peer{},
		resolvedPeers: resolvedPeers,
		knownPeers:    knownPeers,
	}

	p.register(reg)

	retransmit := len(knownPeers) / 2
	if retransmit < 3 {
		retransmit = 3
	}
	p.delegate = newDelegate(l, reg, p, retransmit)

	cfg := memberlist.DefaultLANConfig()
	cfg.Name = name.String()
	cfg.BindAddr = bindHost
	cfg.BindPort = bindPort
	cfg.Delegate = p.delegate
	cfg.Events = p.delegate
	cfg.GossipInterval = gossipInterval
	cfg.PushPullInterval = pushPullInterval
	cfg.TCPTimeout = tcpTimeout
	cfg.ProbeTimeout = probeTimeout
	cfg.ProbeInterval = probeInterval
	cfg.LogOutput = &logWriter{l: l}
	cfg.GossipNodes = retransmit
	cfg.UDPBufferSize = maxGossipPacketSize

	if advertiseHost != "" {
		cfg.AdvertiseAddr = advertiseHost
		cfg.AdvertisePort = advertisePort
		p.setInitialFailed(resolvedPeers, fmt.Sprintf("%s:%d", advertiseHost, advertisePort))
	} else {
		p.setInitialFailed(resolvedPeers, bindAddr)
	}

	ml, err := memberlist.Create(cfg)
	if err != nil {
		return nil, errors.Wrap(err, "create memberlist")
	}
	p.mlist = ml
	return p, nil
}

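// Join attempts to join the cluster using the peers resolved in Create, then
// starts the DNS-refresh loop and, if the corresponding intervals are
// non-zero, the reconnect and reconnect-timeout loops. The result of the
// initial join attempt is returned after the loops have been started.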
func (p *Peer) Join(
	reconnectInterval time.Duration,
	reconnectTimeout time.Duration) error {
	n, err := p.mlist.Join(p.resolvedPeers)
	if err != nil {
		level.Warn(p.logger).Log("msg", "failed to join cluster", "err", err)
		if reconnectInterval != 0 {
			level.Info(p.logger).Log("msg", fmt.Sprintf("will retry joining cluster every %v", reconnectInterval.String()))
		}
	} else {
		level.Debug(p.logger).Log("msg", "joined cluster", "peers", n)
	}

	if reconnectInterval != 0 {
		go p.handleReconnect(reconnectInterval)
	}
	if reconnectTimeout != 0 {
		go p.handleReconnectTimeout(5*time.Minute, reconnectTimeout)
	}
	go p.handleRefresh(DefaultRefreshInterval)

	return err
}

// All peers are initially added to the failed list. They will be removed from
// this list in peerJoin when making their initial connection.
func (p *Peer) setInitialFailed(peers []string, myAddr string) {
	if len(peers) == 0 {
		return
	}

	p.peerLock.RLock()
	defer p.peerLock.RUnlock()

	now := time.Now()
	for _, peerAddr := range peers {
		if peerAddr == myAddr {
			// Don't add ourselves to the initially failing list;
			// we don't connect to ourselves.
			continue
		}
		host, port, err := net.SplitHostPort(peerAddr)
		if err != nil {
			continue
		}
		ip := net.ParseIP(host)
		if ip == nil {
			// Don't add textual addresses since memberlist only advertises
			// dotted decimal or IPv6 addresses.
			continue
		}
		portUint, err := strconv.ParseUint(port, 10, 16)
		if err != nil {
			continue
		}

		pr := peer{
			status:    StatusFailed,
			leaveTime: now,
			Node: &memberlist.Node{
				Addr: ip,
				Port: uint16(portUint),
			},
		}
		p.failedPeers = append(p.failedPeers, pr)
		p.peers[peerAddr] = pr
	}
}

type logWriter struct {
	l log.Logger
}

func (l *logWriter) Write(b []byte) (int, error) {
	return len(b), level.Debug(l.l).Log("memberlist", string(b))
}

func (p *Peer) register(reg prometheus.Registerer) {
	clusterFailedPeers := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
		Name: "alertmanager_cluster_failed_peers",
		Help: "Number indicating the current number of failed peers in the cluster.",
	}, func() float64 {
		p.peerLock.RLock()
		defer p.peerLock.RUnlock()

		return float64(len(p.failedPeers))
	})
	p.failedReconnectionsCounter = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "alertmanager_cluster_reconnections_failed_total",
		Help: "A counter of the number of failed cluster peer reconnection attempts.",
	})

	p.reconnectionsCounter = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "alertmanager_cluster_reconnections_total",
		Help: "A counter of the number of cluster peer reconnections.",
	})

	p.failedRefreshCounter = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "alertmanager_cluster_refresh_join_failed_total",
		Help: "A counter of the number of failed cluster peer join attempts via refresh.",
	})
	p.refreshCounter = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "alertmanager_cluster_refresh_join_total",
		Help: "A counter of the number of cluster peers joined via refresh.",
	})

	p.peerLeaveCounter = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "alertmanager_cluster_peers_left_total",
		Help: "A counter of the number of peers that have left.",
	})
	p.peerUpdateCounter = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "alertmanager_cluster_peers_update_total",
		Help: "A counter of the number of peers that have updated metadata.",
	})
	p.peerJoinCounter = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "alertmanager_cluster_peers_joined_total",
		Help: "A counter of the number of peers that have joined.",
	})

	reg.MustRegister(clusterFailedPeers, p.failedReconnectionsCounter, p.reconnectionsCounter,
		p.peerLeaveCounter, p.peerUpdateCounter, p.peerJoinCounter, p.refreshCounter, p.failedRefreshCounter)
}

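// handleReconnectTimeout periodically drops failed peers that have been in the
// failed state for longer than the given timeout.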
func (p *Peer) handleReconnectTimeout(d time.Duration, timeout time.Duration) {
	tick := time.NewTicker(d)
	defer tick.Stop()

	for {
		select {
		case <-p.stopc:
			return
		case <-tick.C:
			p.removeFailedPeers(timeout)
		}
	}
}

func (p *Peer) removeFailedPeers(timeout time.Duration) {
	p.peerLock.Lock()
	defer p.peerLock.Unlock()

	now := time.Now()

	keep := make([]peer, 0, len(p.failedPeers))
	for _, pr := range p.failedPeers {
		if pr.leaveTime.Add(timeout).After(now) {
			keep = append(keep, pr)
		} else {
			level.Debug(p.logger).Log("msg", "failed peer has timed out", "peer", pr.Node, "addr", pr.Address())
			delete(p.peers, pr.Name)
		}
	}

	p.failedPeers = keep
}

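// handleReconnect periodically tries to re-join every peer currently marked as
// failed.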
func (p *Peer) handleReconnect(d time.Duration) {
	tick := time.NewTicker(d)
	defer tick.Stop()

	for {
		select {
		case <-p.stopc:
			return
		case <-tick.C:
			p.reconnect()
		}
	}
}

func (p *Peer) reconnect() {
	p.peerLock.RLock()
	failedPeers := p.failedPeers
	p.peerLock.RUnlock()

	logger := log.With(p.logger, "msg", "reconnect")
	for _, pr := range failedPeers {
		// No need to do bookkeeping on failedPeers here. If a
		// reconnect is successful, they will be announced in
		// peerJoin().
		if _, err := p.mlist.Join([]string{pr.Address()}); err != nil {
			p.failedReconnectionsCounter.Inc()
			level.Debug(logger).Log("result", "failure", "peer", pr.Node, "addr", pr.Address())
		} else {
			p.reconnectionsCounter.Inc()
			level.Debug(logger).Log("result", "success", "peer", pr.Node, "addr", pr.Address())
		}
	}
}

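// handleRefresh periodically re-resolves the configured peers via DNS and
// joins every resolved address that is not yet a cluster member. This handles
// the case where all instances were restarted at once and DNS initially
// returned only stale addresses.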
func (p *Peer) handleRefresh(d time.Duration) {
	tick := time.NewTicker(d)
	defer tick.Stop()

	for {
		select {
		case <-p.stopc:
			return
		case <-tick.C:
			p.refresh()
		}
	}
}

func (p *Peer) refresh() {
	logger := log.With(p.logger, "msg", "refresh")

	resolvedPeers, err := resolvePeers(context.Background(), p.knownPeers, p.advertiseAddr, net.Resolver{}, false)
	if err != nil {
		level.Debug(logger).Log("peers", p.knownPeers, "err", err)
		return
	}

	members := p.mlist.Members()
	for _, peer := range resolvedPeers {
		var isPeerFound bool
		for _, member := range members {
			if member.Address() == peer {
				isPeerFound = true
				break
			}
		}

		if !isPeerFound {
			if _, err := p.mlist.Join([]string{peer}); err != nil {
				p.failedRefreshCounter.Inc()
				level.Warn(logger).Log("result", "failure", "addr", peer)
			} else {
				p.refreshCounter.Inc()
				level.Debug(logger).Log("result", "success", "addr", peer)
			}
		}
	}
}

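// peerJoin is called (via the memberlist event delegate) when a node joins the
// cluster. It records the peer as alive and removes it from the failed list if
// it had previously failed.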
func (p *Peer) peerJoin(n *memberlist.Node) {
	p.peerLock.Lock()
	defer p.peerLock.Unlock()

	var oldStatus PeerStatus
	pr, ok := p.peers[n.Address()]
	if !ok {
		oldStatus = StatusNone
		pr = peer{
			status: StatusAlive,
			Node:   n,
		}
	} else {
		oldStatus = pr.status
		pr.Node = n
		pr.status = StatusAlive
		pr.leaveTime = time.Time{}
	}

	p.peers[n.Address()] = pr
	p.peerJoinCounter.Inc()

	if oldStatus == StatusFailed {
		level.Debug(p.logger).Log("msg", "peer rejoined", "peer", pr.Node)
		p.failedPeers = removeOldPeer(p.failedPeers, pr.Address())
	}
}

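// peerLeave marks a peer as failed when memberlist reports that it left, so
// that the reconnect loop keeps trying to reach it.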
func (p *Peer) peerLeave(n *memberlist.Node) {
	p.peerLock.Lock()
	defer p.peerLock.Unlock()

	pr, ok := p.peers[n.Address()]
	if !ok {
		// Why are we receiving a leave notification from a node that
		// never joined?
		return
	}

	pr.status = StatusFailed
	pr.leaveTime = time.Now()
	p.failedPeers = append(p.failedPeers, pr)
	p.peers[n.Address()] = pr

	p.peerLeaveCounter.Inc()
	level.Debug(p.logger).Log("msg", "peer left", "peer", pr.Node)
}

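// peerUpdate stores the updated node metadata for an already known peer.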
func (p *Peer) peerUpdate(n *memberlist.Node) {
	p.peerLock.Lock()
	defer p.peerLock.Unlock()

	pr, ok := p.peers[n.Address()]
	if !ok {
		// Why are we receiving an update from a node that never
		// joined?
		return
	}

	pr.Node = n
	p.peers[n.Address()] = pr

	p.peerUpdateCounter.Inc()
	level.Debug(p.logger).Log("msg", "peer updated", "peer", pr.Node)
}

// AddState adds a new state that will be gossiped. It returns a channel to which
// broadcast messages for the state can be sent.
func (p *Peer) AddState(key string, s State, reg prometheus.Registerer) *Channel {
	p.states[key] = s
	send := func(b []byte) {
		p.delegate.bcast.QueueBroadcast(simpleBroadcast(b))
	}
	peers := func() []*memberlist.Node {
		nodes := p.Peers()
		for i, n := range nodes {
			if n.Name == p.Self().Name {
				nodes = append(nodes[:i], nodes[i+1:]...)
				break
			}
		}
		return nodes
	}
	sendOversize := func(n *memberlist.Node, b []byte) error {
		return p.mlist.SendReliable(n, b)
	}
	return NewChannel(key, send, peers, sendOversize, p.logger, p.stopc, reg)
}

// Leave the cluster, waiting up to timeout.
func (p *Peer) Leave(timeout time.Duration) error {
	close(p.stopc)
	level.Debug(p.logger).Log("msg", "leaving cluster")
	return p.mlist.Leave(timeout)
}

// Name returns the unique ID of this peer in the cluster.
func (p *Peer) Name() string {
	return p.mlist.LocalNode().Name
}

// ClusterSize returns the current number of alive members in the cluster.
func (p *Peer) ClusterSize() int {
	return p.mlist.NumMembers()
}

// Ready returns true once the gossip has settled (Settle has completed).
func (p *Peer) Ready() bool {
	select {
	case <-p.readyc:
		return true
	default:
	}
	return false
}

// WaitReady blocks until Settle() has finished.
func (p *Peer) WaitReady() {
	<-p.readyc
}

// Status returns a status string representing the peer state: "ready" once the
// gossip has settled, "settling" before that.
func (p *Peer) Status() string {
	if p.Ready() {
		return "ready"
	} else {
		return "settling"
	}
}

// Info returns a JSON-serializable dump of cluster state.
// Useful for debugging.
func (p *Peer) Info() map[string]interface{} {
	p.mtx.RLock()
	defer p.mtx.RUnlock()

	return map[string]interface{}{
		"self":    p.mlist.LocalNode(),
		"members": p.mlist.Members(),
	}
}

// Self returns the node information about the peer itself.
func (p *Peer) Self() *memberlist.Node {
	return p.mlist.LocalNode()
}

// Peers returns the peers in the cluster.
func (p *Peer) Peers() []*memberlist.Node {
	return p.mlist.Members()
}

// Position returns the position of the peer in the cluster, based on the
// lexical order of peer names.
func (p *Peer) Position() int {
	all := p.Peers()
	sort.Slice(all, func(i, j int) bool {
		return all[i].Name < all[j].Name
	})

	k := 0
	for _, n := range all {
		if n.Name == p.Self().Name {
			break
		}
		k++
	}
	return k
}

// Settle waits until the mesh is ready (and sets the appropriate internal state when it is).
// The idea is that we don't want to start "working" before we get a chance to know most of the alerts and/or silences.
// Inspired by https://github.com/apache/cassandra/blob/7a40abb6a5108688fb1b10c375bb751cbb782ea4/src/java/org/apache/cassandra/gms/Gossiper.java
// This is clearly not perfect or strictly correct, but it should prevent the Alertmanager from sending notifications while it is obviously not ready.
// This is especially important for those that do not have persistent storage.
func (p *Peer) Settle(ctx context.Context, interval time.Duration) {
	const NumOkayRequired = 3
	level.Info(p.logger).Log("msg", "Waiting for gossip to settle...", "interval", interval)
	start := time.Now()
	nPeers := 0
	nOkay := 0
	totalPolls := 0
	for {
		select {
		case <-ctx.Done():
			elapsed := time.Since(start)
			level.Info(p.logger).Log("msg", "gossip not settled but continuing anyway", "polls", totalPolls, "elapsed", elapsed)
			close(p.readyc)
			return
		case <-time.After(interval):
		}
		elapsed := time.Since(start)
		n := len(p.Peers())
		if nOkay >= NumOkayRequired {
			level.Info(p.logger).Log("msg", "gossip settled; proceeding", "elapsed", elapsed)
			break
		}
		if n == nPeers {
			nOkay++
			level.Debug(p.logger).Log("msg", "gossip looks settled", "elapsed", elapsed)
		} else {
			nOkay = 0
			level.Info(p.logger).Log("msg", "gossip not settled", "polls", totalPolls, "before", nPeers, "now", n, "elapsed", elapsed)
		}
		nPeers = n
		totalPolls++
	}
	close(p.readyc)
}

// State is a piece of state that can be serialized and merged with other
// serialized state.
type State interface {
	// MarshalBinary serializes the underlying state.
	MarshalBinary() ([]byte, error)

	// Merge merges serialized state into the underlying state.
	Merge(b []byte) error
}

// We use a simple broadcast implementation in which items are never invalidated by others.
type simpleBroadcast []byte

func (b simpleBroadcast) Message() []byte                       { return []byte(b) }
func (b simpleBroadcast) Invalidates(memberlist.Broadcast) bool { return false }
func (b simpleBroadcast) Finished()                             {}

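// resolvePeers resolves the host part of each peer address via DNS; addresses
// that fail to resolve are kept verbatim. When waitIfEmpty is true, empty
// lookup results (excluding myAddress) are retried until at least one address
// is found or the context is cancelled.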
func resolvePeers(ctx context.Context, peers []string, myAddress string, res net.Resolver, waitIfEmpty bool) ([]string, error) {
	var resolvedPeers []string

	for _, peer := range peers {
		host, port, err := net.SplitHostPort(peer)
		if err != nil {
			return nil, errors.Wrapf(err, "split host/port for peer %s", peer)
		}

		retryCtx, cancel := context.WithCancel(ctx)

		ips, err := res.LookupIPAddr(ctx, host)
		if err != nil {
			// Assume direct address.
			resolvedPeers = append(resolvedPeers, peer)
			continue
		}

		if len(ips) == 0 {
			var lookupErrSpotted bool

			err := retry(2*time.Second, retryCtx.Done(), func() error {
				if lookupErrSpotted {
					// Invoke cancel on the retry run after a lookup error so that the
					// retry loop stops and the LookupIPAddr error is preserved.
					cancel()
				}

				ips, err = res.LookupIPAddr(retryCtx, host)
				if err != nil {
					lookupErrSpotted = true
					return errors.Wrapf(err, "IP Addr lookup for peer %s", peer)
				}

				ips = removeMyAddr(ips, port, myAddress)
				if len(ips) == 0 {
					if !waitIfEmpty {
						return nil
					}
					return errors.New("empty IPAddr result. Retrying")
				}

				return nil
			})
			if err != nil {
				return nil, err
			}
		}

		for _, ip := range ips {
			resolvedPeers = append(resolvedPeers, net.JoinHostPort(ip.String(), port))
		}
	}

	return resolvedPeers, nil
}

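// removeMyAddr drops lookup results that, combined with targetPort, equal this
// instance's own address.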
func removeMyAddr(ips []net.IPAddr, targetPort string, myAddr string) []net.IPAddr {
	var result []net.IPAddr

	for _, ip := range ips {
		if net.JoinHostPort(ip.String(), targetPort) == myAddr {
			continue
		}
		result = append(result, ip)
	}

	return result
}

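// hasNonlocal reports whether any of the given cluster peers is an address
// other than a loopback IP or "localhost".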
func hasNonlocal(clusterPeers []string) bool {
	for _, peer := range clusterPeers {
		if host, _, err := net.SplitHostPort(peer); err == nil {
			peer = host
		}
		if ip := net.ParseIP(peer); ip != nil && !ip.IsLoopback() {
			return true
		} else if ip == nil && strings.ToLower(peer) != "localhost" {
			return true
		}
	}
	return false
}

func isUnroutable(addr string) bool {
	if host, _, err := net.SplitHostPort(addr); err == nil {
		addr = host
	}
	if ip := net.ParseIP(addr); ip != nil && (ip.IsUnspecified() || ip.IsLoopback()) {
		return true // typically 0.0.0.0 or localhost
	} else if ip == nil && strings.ToLower(addr) == "localhost" {
		return true
	}
	return false
}

func isAny(addr string) bool {
	if host, _, err := net.SplitHostPort(addr); err == nil {
		addr = host
	}
	return addr == "" || net.ParseIP(addr).IsUnspecified()
}

// retry executes f at every interval until f returns nil or stopc is closed,
// in which case the last error from f is returned.
func retry(interval time.Duration, stopc <-chan struct{}, f func() error) error {
	tick := time.NewTicker(interval)
	defer tick.Stop()

	var err error
	for {
		if err = f(); err == nil {
			return nil
		}
		select {
		case <-stopc:
			return err
		case <-tick.C:
		}
	}
}

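// removeOldPeer returns the given peer list without any entry whose address
// equals addr.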
func removeOldPeer(old []peer, addr string) []peer {
	new := make([]peer, 0, len(old))
	for _, p := range old {
		if p.Address() != addr {
			new = append(new, p)
		}
	}

	return new
}