297 lines
7.8 KiB
Go
297 lines
7.8 KiB
Go
package mesh
|
|
|
|
import (
|
|
"encoding/gob"
|
|
"fmt"
|
|
"net"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// localPeer is the only "active" peer in the mesh. It extends Peer with
|
|
// additional behaviors, mostly to retrieve and manage connection state.
|
|
type localPeer struct {
|
|
sync.RWMutex
|
|
*Peer
|
|
router *Router
|
|
actionChan chan<- localPeerAction
|
|
}
|
|
|
|
// The actor closure used by localPeer.
|
|
type localPeerAction func()
|
|
|
|
// newLocalPeer returns a usable LocalPeer.
|
|
func newLocalPeer(name PeerName, nickName string, router *Router) *localPeer {
|
|
actionChan := make(chan localPeerAction, ChannelSize)
|
|
peer := &localPeer{
|
|
Peer: newPeer(name, nickName, randomPeerUID(), 0, randomPeerShortID()),
|
|
router: router,
|
|
actionChan: actionChan,
|
|
}
|
|
go peer.actorLoop(actionChan)
|
|
return peer
|
|
}
|
|
|
|
// Connections returns all the connections that the local peer is aware of.
|
|
func (peer *localPeer) getConnections() connectionSet {
|
|
connections := make(connectionSet)
|
|
peer.RLock()
|
|
defer peer.RUnlock()
|
|
for _, conn := range peer.connections {
|
|
connections[conn] = struct{}{}
|
|
}
|
|
return connections
|
|
}
|
|
|
|
// ConnectionTo returns the connection to the named peer, if any.
|
|
//
|
|
// TODO(pb): Weave Net invokes router.Ourself.ConnectionTo;
|
|
// it may be better to provide that on Router directly.
|
|
func (peer *localPeer) ConnectionTo(name PeerName) (Connection, bool) {
|
|
peer.RLock()
|
|
defer peer.RUnlock()
|
|
conn, found := peer.connections[name]
|
|
return conn, found // yes, you really can't inline that. FFS.
|
|
}
|
|
|
|
// ConnectionsTo returns all known connections to the named peers.
|
|
//
|
|
// TODO(pb): Weave Net invokes router.Ourself.ConnectionsTo;
|
|
// it may be better to provide that on Router directly.
|
|
func (peer *localPeer) ConnectionsTo(names []PeerName) []Connection {
|
|
if len(names) == 0 {
|
|
return nil
|
|
}
|
|
conns := make([]Connection, 0, len(names))
|
|
peer.RLock()
|
|
defer peer.RUnlock()
|
|
for _, name := range names {
|
|
conn, found := peer.connections[name]
|
|
// Again, !found could just be due to a race.
|
|
if found {
|
|
conns = append(conns, conn)
|
|
}
|
|
}
|
|
return conns
|
|
}
|
|
|
|
// createConnection creates a new connection, originating from
|
|
// localAddr, to peerAddr. If acceptNewPeer is false, peerAddr must
|
|
// already be a member of the mesh.
|
|
func (peer *localPeer) createConnection(localAddr string, peerAddr string, acceptNewPeer bool, logger Logger) error {
|
|
if err := peer.checkConnectionLimit(); err != nil {
|
|
return err
|
|
}
|
|
localTCPAddr, err := net.ResolveTCPAddr("tcp4", localAddr)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
remoteTCPAddr, err := net.ResolveTCPAddr("tcp4", peerAddr)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
tcpConn, err := net.DialTCP("tcp4", localTCPAddr, remoteTCPAddr)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
connRemote := newRemoteConnection(peer.Peer, nil, peerAddr, true, false)
|
|
startLocalConnection(connRemote, tcpConn, peer.router, acceptNewPeer, logger)
|
|
return nil
|
|
}
|
|
|
|
// ACTOR client API
|
|
|
|
// Synchronous.
|
|
func (peer *localPeer) doAddConnection(conn ourConnection, isRestartedPeer bool) error {
|
|
resultChan := make(chan error)
|
|
peer.actionChan <- func() {
|
|
resultChan <- peer.handleAddConnection(conn, isRestartedPeer)
|
|
}
|
|
return <-resultChan
|
|
}
|
|
|
|
// Asynchronous.
|
|
func (peer *localPeer) doConnectionEstablished(conn ourConnection) {
|
|
peer.actionChan <- func() {
|
|
peer.handleConnectionEstablished(conn)
|
|
}
|
|
}
|
|
|
|
// Synchronous.
|
|
func (peer *localPeer) doDeleteConnection(conn ourConnection) {
|
|
resultChan := make(chan interface{})
|
|
peer.actionChan <- func() {
|
|
peer.handleDeleteConnection(conn)
|
|
resultChan <- nil
|
|
}
|
|
<-resultChan
|
|
}
|
|
|
|
func (peer *localPeer) encode(enc *gob.Encoder) {
|
|
peer.RLock()
|
|
defer peer.RUnlock()
|
|
peer.Peer.encode(enc)
|
|
}
|
|
|
|
// ACTOR server
|
|
|
|
func (peer *localPeer) actorLoop(actionChan <-chan localPeerAction) {
|
|
gossipTimer := time.Tick(gossipInterval)
|
|
for {
|
|
select {
|
|
case action := <-actionChan:
|
|
action()
|
|
case <-gossipTimer:
|
|
peer.router.sendAllGossip()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (peer *localPeer) handleAddConnection(conn ourConnection, isRestartedPeer bool) error {
|
|
if peer.Peer != conn.getLocal() {
|
|
panic("Attempt made to add connection to peer where peer is not the source of connection")
|
|
}
|
|
if conn.Remote() == nil {
|
|
panic("Attempt made to add connection to peer with unknown remote peer")
|
|
}
|
|
toName := conn.Remote().Name
|
|
dupErr := fmt.Errorf("Multiple connections to %s added to %s", conn.Remote(), peer.String())
|
|
// deliberately non symmetrical
|
|
if dupConn, found := peer.connections[toName]; found {
|
|
if dupConn == conn {
|
|
return nil
|
|
}
|
|
dupOurConn := dupConn.(ourConnection)
|
|
switch conn.breakTie(dupOurConn) {
|
|
case tieBreakWon:
|
|
dupOurConn.shutdown(dupErr)
|
|
peer.handleDeleteConnection(dupOurConn)
|
|
case tieBreakLost:
|
|
return dupErr
|
|
case tieBreakTied:
|
|
// oh good grief. Sod it, just kill both of them.
|
|
dupOurConn.shutdown(dupErr)
|
|
peer.handleDeleteConnection(dupOurConn)
|
|
return dupErr
|
|
}
|
|
}
|
|
if err := peer.checkConnectionLimit(); err != nil {
|
|
return err
|
|
}
|
|
_, isConnectedPeer := peer.router.Routes.Unicast(toName)
|
|
peer.addConnection(conn)
|
|
switch {
|
|
case isRestartedPeer:
|
|
conn.logf("connection added (restarted peer)")
|
|
peer.router.sendAllGossipDown(conn)
|
|
case isConnectedPeer:
|
|
conn.logf("connection added")
|
|
default:
|
|
conn.logf("connection added (new peer)")
|
|
peer.router.sendAllGossipDown(conn)
|
|
}
|
|
|
|
peer.router.Routes.recalculate()
|
|
peer.broadcastPeerUpdate(conn.Remote())
|
|
|
|
return nil
|
|
}
|
|
|
|
func (peer *localPeer) handleConnectionEstablished(conn ourConnection) {
|
|
if peer.Peer != conn.getLocal() {
|
|
panic("Peer informed of active connection where peer is not the source of connection")
|
|
}
|
|
if dupConn, found := peer.connections[conn.Remote().Name]; !found || conn != dupConn {
|
|
conn.shutdown(fmt.Errorf("Cannot set unknown connection active"))
|
|
return
|
|
}
|
|
peer.connectionEstablished(conn)
|
|
conn.logf("connection fully established")
|
|
|
|
peer.router.Routes.recalculate()
|
|
peer.broadcastPeerUpdate()
|
|
}
|
|
|
|
func (peer *localPeer) handleDeleteConnection(conn ourConnection) {
|
|
if peer.Peer != conn.getLocal() {
|
|
panic("Attempt made to delete connection from peer where peer is not the source of connection")
|
|
}
|
|
if conn.Remote() == nil {
|
|
panic("Attempt made to delete connection to peer with unknown remote peer")
|
|
}
|
|
toName := conn.Remote().Name
|
|
if connFound, found := peer.connections[toName]; !found || connFound != conn {
|
|
return
|
|
}
|
|
peer.deleteConnection(conn)
|
|
conn.logf("connection deleted")
|
|
// Must do garbage collection first to ensure we don't send out an
|
|
// update with unreachable peers (can cause looping)
|
|
peer.router.Peers.GarbageCollect()
|
|
peer.router.Routes.recalculate()
|
|
peer.broadcastPeerUpdate()
|
|
}
|
|
|
|
// helpers
|
|
|
|
func (peer *localPeer) broadcastPeerUpdate(peers ...*Peer) {
|
|
// Some tests run without a router. This should be fixed so
|
|
// that the relevant part of Router can be easily run in the
|
|
// context of a test, but that will involve significant
|
|
// reworking of tests.
|
|
if peer.router != nil {
|
|
peer.router.broadcastTopologyUpdate(append(peers, peer.Peer))
|
|
}
|
|
}
|
|
|
|
func (peer *localPeer) checkConnectionLimit() error {
|
|
limit := peer.router.ConnLimit
|
|
if 0 != limit && peer.connectionCount() >= limit {
|
|
return fmt.Errorf("Connection limit reached (%v)", limit)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (peer *localPeer) addConnection(conn Connection) {
|
|
peer.Lock()
|
|
defer peer.Unlock()
|
|
peer.connections[conn.Remote().Name] = conn
|
|
peer.Version++
|
|
}
|
|
|
|
func (peer *localPeer) deleteConnection(conn Connection) {
|
|
peer.Lock()
|
|
defer peer.Unlock()
|
|
delete(peer.connections, conn.Remote().Name)
|
|
peer.Version++
|
|
}
|
|
|
|
func (peer *localPeer) connectionEstablished(conn Connection) {
|
|
peer.Lock()
|
|
defer peer.Unlock()
|
|
peer.Version++
|
|
}
|
|
|
|
func (peer *localPeer) connectionCount() int {
|
|
peer.RLock()
|
|
defer peer.RUnlock()
|
|
return len(peer.connections)
|
|
}
|
|
|
|
func (peer *localPeer) setShortID(shortID PeerShortID) {
|
|
peer.Lock()
|
|
defer peer.Unlock()
|
|
peer.ShortID = shortID
|
|
peer.Version++
|
|
}
|
|
|
|
func (peer *localPeer) setVersionBeyond(version uint64) bool {
|
|
peer.Lock()
|
|
defer peer.Unlock()
|
|
if version >= peer.Version {
|
|
peer.Version = version + 1
|
|
return true
|
|
}
|
|
return false
|
|
}
|