// Copyright 2020 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Forked from https://github.com/mxinden/memberlist-tls-transport.

// Implements Transport interface so that all gossip communications occur via TLS over TCP.

package cluster

import (
	"context"
	"crypto/tls"
	"fmt"
	"net"
	"strings"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/hashicorp/go-sockaddr"
	"github.com/hashicorp/memberlist"
	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"
	common "github.com/prometheus/common/config"
	"github.com/prometheus/exporter-toolkit/web"
)

// metricNamespace and metricSubsystem are applied to every metric created by
// registerMetrics; network is the network type handed to tls.Listen.
const (
	metricNamespace = "alertmanager"
	metricSubsystem = "tls_transport"
	network         = "tcp"
)

// TLSTransport is a Transport implementation that uses TLS over TCP for both
// packet and stream operations.
type TLSTransport struct {
	// ctx is derived from the context given to NewTLSTransport; cancel
	// cancels it and is invoked by Shutdown.
	ctx    context.Context
	cancel context.CancelFunc
	logger log.Logger
	// bindAddr and bindPort are the values passed to NewTLSTransport.
	// bindPort may be 0; GetAutoBindPort reports the port actually in use.
	bindAddr string
	bindPort int
	// done is closed once the listen loop has returned; Shutdown waits on it.
	done     chan struct{}
	listener net.Listener
	// packetCh and streamCh carry incoming packets and stream connections;
	// they are exposed read-only through PacketCh and StreamCh.
	packetCh chan *memberlist.Packet
	streamCh chan net.Conn
	// connPool supplies the outgoing connections used by WriteTo.
	connPool *connectionPool
	// tlsServerCfg configures the listener; tlsClientCfg configures
	// outgoing connections (connection pool and DialTimeout).
	tlsServerCfg *tls.Config
	tlsClientCfg *tls.Config

	// packetsSent and packetsRcvd count packet payload bytes.
	packetsSent prometheus.Counter
	packetsRcvd prometheus.Counter
	// streamsSent and streamsRcvd count stream connections.
	streamsSent prometheus.Counter
	streamsRcvd prometheus.Counter
	// readErrs counts errors on the accept/read path.
	readErrs prometheus.Counter
	// writeErrs counts errors on outgoing connections, labelled by
	// connection_type ("packet" or "stream").
	writeErrs *prometheus.CounterVec
}

// NewTLSTransport returns a TLS transport with the given configuration.
// On successful initialization, a tls listener will be created and listening.
// A valid bindAddr is required.
If bindPort == 0, the system will assign // a free port automatically. func NewTLSTransport( ctx context.Context, logger log.Logger, reg prometheus.Registerer, bindAddr string, bindPort int, cfg *TLSTransportConfig, ) (*TLSTransport, error) { if cfg == nil { return nil, errors.New("must specify TLSTransportConfig") } tlsServerCfg, err := web.ConfigToTLSConfig(cfg.TLSServerConfig) if err != nil { return nil, errors.Wrap(err, "invalid TLS server config") } tlsClientCfg, err := common.NewTLSConfig(cfg.TLSClientConfig) if err != nil { return nil, errors.Wrap(err, "invalid TLS client config") } ip := net.ParseIP(bindAddr) if ip == nil { return nil, fmt.Errorf("invalid bind address \"%s\"", bindAddr) } addr := &net.TCPAddr{IP: ip, Port: bindPort} listener, err := tls.Listen(network, addr.String(), tlsServerCfg) if err != nil { return nil, errors.Wrap(err, fmt.Sprintf("failed to start TLS listener on %q port %d", bindAddr, bindPort)) } connPool, err := newConnectionPool(tlsClientCfg) if err != nil { return nil, errors.Wrap(err, "failed to initialize tls transport connection pool") } ctx, cancel := context.WithCancel(ctx) t := &TLSTransport{ ctx: ctx, cancel: cancel, logger: logger, bindAddr: bindAddr, bindPort: bindPort, done: make(chan struct{}), listener: listener, packetCh: make(chan *memberlist.Packet), streamCh: make(chan net.Conn), connPool: connPool, tlsServerCfg: tlsServerCfg, tlsClientCfg: tlsClientCfg, } t.registerMetrics(reg) go func() { t.listen() close(t.done) }() return t, nil } // FinalAdvertiseAddr is given the user's configured values (which // might be empty) and returns the desired IP and port to advertise to // the rest of the cluster. 
func (t *TLSTransport) FinalAdvertiseAddr(ip string, port int) (net.IP, int, error) { var advertiseAddr net.IP var advertisePort int if ip != "" { advertiseAddr = net.ParseIP(ip) if advertiseAddr == nil { return nil, 0, fmt.Errorf("failed to parse advertise address %q", ip) } if ip4 := advertiseAddr.To4(); ip4 != nil { advertiseAddr = ip4 } advertisePort = port } else { if t.bindAddr == "0.0.0.0" { // Otherwise, if we're not bound to a specific IP, let's // use a suitable private IP address. var err error ip, err = sockaddr.GetPrivateIP() if err != nil { return nil, 0, fmt.Errorf("failed to get interface addresses: %v", err) } if ip == "" { return nil, 0, fmt.Errorf("no private IP address found, and explicit IP not provided") } advertiseAddr = net.ParseIP(ip) if advertiseAddr == nil { return nil, 0, fmt.Errorf("failed to parse advertise address: %q", ip) } } else { advertiseAddr = t.listener.Addr().(*net.TCPAddr).IP } advertisePort = t.GetAutoBindPort() } return advertiseAddr, advertisePort, nil } // PacketCh returns a channel that can be read to receive incoming // packets from other peers. func (t *TLSTransport) PacketCh() <-chan *memberlist.Packet { return t.packetCh } // StreamCh returns a channel that can be read to handle incoming stream // connections from other peers. func (t *TLSTransport) StreamCh() <-chan net.Conn { return t.streamCh } // Shutdown is called when memberlist is shutting down; this gives the // TLS Transport a chance to clean up the listener and other goroutines. func (t *TLSTransport) Shutdown() error { level.Debug(t.logger).Log("msg", "shutting down tls transport") t.cancel() err := t.listener.Close() t.connPool.shutdown() <-t.done return err } // WriteTo is a packet-oriented interface that borrows a connection // from the pool, and writes to it. It also returns a timestamp of when // the packet was written. 
func (t *TLSTransport) WriteTo(b []byte, addr string) (time.Time, error) { conn, err := t.connPool.borrowConnection(addr, DefaultTcpTimeout) if err != nil { t.writeErrs.WithLabelValues("packet").Inc() return time.Now(), errors.Wrap(err, "failed to dial") } fromAddr := t.listener.Addr().String() err = conn.writePacket(fromAddr, b) if err != nil { t.writeErrs.WithLabelValues("packet").Inc() return time.Now(), errors.Wrap(err, "failed to write packet") } t.packetsSent.Add(float64(len(b))) return time.Now(), nil } // DialTimeout is used to create a connection that allows memberlist // to perform two-way communications with a peer. func (t *TLSTransport) DialTimeout(addr string, timeout time.Duration) (net.Conn, error) { conn, err := dialTLSConn(addr, timeout, t.tlsClientCfg) if err != nil { t.writeErrs.WithLabelValues("stream").Inc() return nil, errors.Wrap(err, "failed to dial") } err = conn.writeStream() netConn := conn.getRawConn() if err != nil { t.writeErrs.WithLabelValues("stream").Inc() return netConn, errors.Wrap(err, "failed to create stream connection") } t.streamsSent.Inc() return netConn, nil } // GetAutoBindPort returns the bind port that was automatically given by the system // if a bindPort of 0 was specified during instantiation. func (t *TLSTransport) GetAutoBindPort() int { return t.listener.Addr().(*net.TCPAddr).Port } // listen starts up multiple handlers accepting concurrent connections. func (t *TLSTransport) listen() { for { select { case <-t.ctx.Done(): return default: conn, err := t.listener.Accept() if err != nil { // The error "use of closed network connection" is returned when the listener is closed. // It is not exported in a more reasonable way. See https://github.com/golang/go/issues/4373. 
if strings.Contains(err.Error(), "use of closed network connection") { return } t.readErrs.Inc() level.Debug(t.logger).Log("msg", "error accepting connection", "err", err) } else { go t.handle(conn) } } } } func (t *TLSTransport) handle(conn net.Conn) { for { packet, err := rcvTLSConn(conn).read() if err != nil { level.Debug(t.logger).Log("msg", "error reading from connection", "err", err) t.readErrs.Inc() return } select { case <-t.ctx.Done(): return default: if packet != nil { n := len(packet.Buf) t.packetCh <- packet t.packetsRcvd.Add(float64(n)) } else { t.streamCh <- conn t.streamsRcvd.Inc() return } } } } func (t *TLSTransport) registerMetrics(reg prometheus.Registerer) { t.packetsSent = prometheus.NewCounter( prometheus.CounterOpts{ Namespace: metricNamespace, Subsystem: metricSubsystem, Name: "packet_bytes_sent_total", Help: "The number of packet bytes sent to outgoing connections (excluding internal metadata).", }, ) t.packetsRcvd = prometheus.NewCounter( prometheus.CounterOpts{ Namespace: metricNamespace, Subsystem: metricSubsystem, Name: "packet_bytes_received_total", Help: "The number of packet bytes received from incoming connections (excluding internal metadata).", }, ) t.streamsSent = prometheus.NewCounter( prometheus.CounterOpts{ Namespace: metricNamespace, Subsystem: metricSubsystem, Name: "stream_connections_sent_total", Help: "The number of stream connections sent.", }, ) t.streamsRcvd = prometheus.NewCounter( prometheus.CounterOpts{ Namespace: metricNamespace, Subsystem: metricSubsystem, Name: "stream_connections_received_total", Help: "The number of stream connections received.", }, ) t.readErrs = prometheus.NewCounter( prometheus.CounterOpts{ Namespace: metricNamespace, Subsystem: metricSubsystem, Name: "read_errors_total", Help: "The number of errors encountered while reading from incoming connections.", }, ) t.writeErrs = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: metricNamespace, Subsystem: metricSubsystem, Name: 
"write_errors_total", Help: "The number of errors encountered while writing to outgoing connections.", }, []string{"connection_type"}, ) if reg != nil { reg.MustRegister(t.packetsSent) reg.MustRegister(t.packetsRcvd) reg.MustRegister(t.streamsSent) reg.MustRegister(t.streamsRcvd) reg.MustRegister(t.readErrs) reg.MustRegister(t.writeErrs) } }