alertmanager/vendor/github.com/weaveworks/mesh/router.go

309 lines
8.9 KiB
Go

package mesh
import (
"bytes"
"encoding/gob"
"fmt"
"math"
"net"
"sync"
"time"
)
var (
// Port is the port used for all mesh communication.
Port = 6783
// ChannelSize is the buffer size used by so-called actor goroutines
// throughout mesh.
ChannelSize = 16
)
const (
tcpHeartbeat = 30 * time.Second
gossipInterval = 30 * time.Second
maxDuration = time.Duration(math.MaxInt64)
acceptMaxTokens = 100
acceptTokenDelay = 100 * time.Millisecond // [2]
)
// Config defines dimensions of configuration for the router.
// TODO(pb): provide usable defaults in NewRouter
type Config struct {
Host string
Port int
ProtocolMinVersion byte
Password []byte
ConnLimit int
PeerDiscovery bool
TrustedSubnets []*net.IPNet
}
// Router manages communication between this peer and the rest of the mesh.
// Router implements Gossiper.
type Router struct {
Config
Overlay Overlay
Ourself *localPeer
Peers *Peers
Routes *routes
ConnectionMaker *connectionMaker
gossipLock sync.RWMutex
gossipChannels gossipChannels
topologyGossip Gossip
acceptLimiter *tokenBucket
logger Logger
}
// NewRouter returns a new router. It must be started.
func NewRouter(config Config, name PeerName, nickName string, overlay Overlay, logger Logger) *Router {
router := &Router{Config: config, gossipChannels: make(gossipChannels)}
if overlay == nil {
overlay = NullOverlay{}
}
router.Overlay = overlay
router.Ourself = newLocalPeer(name, nickName, router)
router.Peers = newPeers(router.Ourself)
router.Peers.OnGC(func(peer *Peer) {
logger.Printf("Removed unreachable peer %s", peer)
})
router.Routes = newRoutes(router.Ourself, router.Peers)
router.ConnectionMaker = newConnectionMaker(router.Ourself, router.Peers, net.JoinHostPort(router.Host, "0"), router.Port, router.PeerDiscovery, logger)
router.topologyGossip = router.NewGossip("topology", router)
router.acceptLimiter = newTokenBucket(acceptMaxTokens, acceptTokenDelay)
router.logger = logger
return router
}
// Start listening for TCP connections. This is separate from NewRouter so
// that gossipers can register before we start forming connections.
func (router *Router) Start() {
router.listenTCP()
}
// Stop shuts down the router. In theory.
func (router *Router) Stop() error {
// TODO: perform graceful shutdown...
return nil
}
func (router *Router) usingPassword() bool {
return router.Password != nil
}
func (router *Router) listenTCP() {
localAddr, err := net.ResolveTCPAddr("tcp4", net.JoinHostPort(router.Host, fmt.Sprint(router.Port)))
if err != nil {
panic(err)
}
ln, err := net.ListenTCP("tcp4", localAddr)
if err != nil {
panic(err)
}
go func() {
defer ln.Close()
for {
tcpConn, err := ln.AcceptTCP()
if err != nil {
router.logger.Printf("%v", err)
continue
}
router.acceptTCP(tcpConn)
router.acceptLimiter.wait()
}
}()
}
func (router *Router) acceptTCP(tcpConn *net.TCPConn) {
remoteAddrStr := tcpConn.RemoteAddr().String()
router.logger.Printf("->[%s] connection accepted", remoteAddrStr)
connRemote := newRemoteConnection(router.Ourself.Peer, nil, remoteAddrStr, false, false)
startLocalConnection(connRemote, tcpConn, router, true, router.logger)
}
// NewGossip returns a usable GossipChannel from the router.
//
// TODO(pb): rename?
func (router *Router) NewGossip(channelName string, g Gossiper) Gossip {
channel := newGossipChannel(channelName, router.Ourself, router.Routes, g, router.logger)
router.gossipLock.Lock()
defer router.gossipLock.Unlock()
if _, found := router.gossipChannels[channelName]; found {
panic(fmt.Sprintf("[gossip] duplicate channel %s", channelName))
}
router.gossipChannels[channelName] = channel
return channel
}
func (router *Router) gossipChannel(channelName string) *gossipChannel {
router.gossipLock.RLock()
channel, found := router.gossipChannels[channelName]
router.gossipLock.RUnlock()
if found {
return channel
}
router.gossipLock.Lock()
defer router.gossipLock.Unlock()
if channel, found = router.gossipChannels[channelName]; found {
return channel
}
channel = newGossipChannel(channelName, router.Ourself, router.Routes, &surrogateGossiper{}, router.logger)
channel.logf("created surrogate channel")
router.gossipChannels[channelName] = channel
return channel
}
func (router *Router) gossipChannelSet() map[*gossipChannel]struct{} {
channels := make(map[*gossipChannel]struct{})
router.gossipLock.RLock()
defer router.gossipLock.RUnlock()
for _, channel := range router.gossipChannels {
channels[channel] = struct{}{}
}
return channels
}
func (router *Router) handleGossip(tag protocolTag, payload []byte) error {
decoder := gob.NewDecoder(bytes.NewReader(payload))
var channelName string
if err := decoder.Decode(&channelName); err != nil {
return err
}
channel := router.gossipChannel(channelName)
var srcName PeerName
if err := decoder.Decode(&srcName); err != nil {
return err
}
switch tag {
case ProtocolGossipUnicast:
return channel.deliverUnicast(srcName, payload, decoder)
case ProtocolGossipBroadcast:
return channel.deliverBroadcast(srcName, payload, decoder)
case ProtocolGossip:
return channel.deliver(srcName, payload, decoder)
}
return nil
}
// Relay all pending gossip data for each channel via random neighbours.
func (router *Router) sendAllGossip() {
for channel := range router.gossipChannelSet() {
if gossip := channel.gossiper.Gossip(); gossip != nil {
channel.Send(gossip)
}
}
}
// Relay all pending gossip data for each channel via conn.
func (router *Router) sendAllGossipDown(conn Connection) {
for channel := range router.gossipChannelSet() {
if gossip := channel.gossiper.Gossip(); gossip != nil {
channel.SendDown(conn, gossip)
}
}
}
// for testing
func (router *Router) sendPendingGossip() bool {
sentSomething := false
for conn := range router.Ourself.getConnections() {
sentSomething = conn.(gossipConnection).gossipSenders().Flush() || sentSomething
}
return sentSomething
}
// BroadcastTopologyUpdate is invoked whenever there is a change to the mesh
// topology, and broadcasts the new set of peers to the mesh.
func (router *Router) broadcastTopologyUpdate(update []*Peer) {
names := make(peerNameSet)
for _, p := range update {
names[p.Name] = struct{}{}
}
router.topologyGossip.GossipBroadcast(&topologyGossipData{peers: router.Peers, update: names})
}
// OnGossipUnicast implements Gossiper, but always returns an error, as a
// router should only receive gossip broadcasts of TopologyGossipData.
func (router *Router) OnGossipUnicast(sender PeerName, msg []byte) error {
return fmt.Errorf("unexpected topology gossip unicast: %v", msg)
}
// OnGossipBroadcast receives broadcasts of TopologyGossipData.
// It returns the received update unchanged.
func (router *Router) OnGossipBroadcast(_ PeerName, update []byte) (GossipData, error) {
origUpdate, _, err := router.applyTopologyUpdate(update)
if err != nil || len(origUpdate) == 0 {
return nil, err
}
return &topologyGossipData{peers: router.Peers, update: origUpdate}, nil
}
// Gossip yields the current topology as GossipData.
func (router *Router) Gossip() GossipData {
return &topologyGossipData{peers: router.Peers, update: router.Peers.names()}
}
// OnGossip receives broadcasts of TopologyGossipData.
// It returns an "improved" version of the received update.
// See peers.ApplyUpdate.
func (router *Router) OnGossip(update []byte) (GossipData, error) {
_, newUpdate, err := router.applyTopologyUpdate(update)
if err != nil || len(newUpdate) == 0 {
return nil, err
}
return &topologyGossipData{peers: router.Peers, update: newUpdate}, nil
}
func (router *Router) applyTopologyUpdate(update []byte) (peerNameSet, peerNameSet, error) {
origUpdate, newUpdate, err := router.Peers.applyUpdate(update)
if err != nil {
return nil, nil, err
}
if len(newUpdate) > 0 {
router.ConnectionMaker.refresh()
router.Routes.recalculate()
}
return origUpdate, newUpdate, nil
}
func (router *Router) trusts(remote *remoteConnection) bool {
if tcpAddr, err := net.ResolveTCPAddr("tcp4", remote.remoteTCPAddr); err == nil {
for _, trustedSubnet := range router.TrustedSubnets {
if trustedSubnet.Contains(tcpAddr.IP) {
return true
}
}
} else {
// Should not happen as remoteTCPAddr was obtained from TCPConn
router.logger.Printf("Unable to parse remote TCP addr: %s", err)
}
return false
}
// The set of peers in the mesh network.
// Gossiped just like anything else.
type topologyGossipData struct {
peers *Peers
update peerNameSet
}
// Merge implements GossipData.
func (d *topologyGossipData) Merge(other GossipData) GossipData {
names := make(peerNameSet)
for name := range d.update {
names[name] = struct{}{}
}
for name := range other.(*topologyGossipData).update {
names[name] = struct{}{}
}
return &topologyGossipData{peers: d.peers, update: names}
}
// Encode implements GossipData.
func (d *topologyGossipData) Encode() [][]byte {
return [][]byte{d.peers.encodePeers(d.update)}
}