diff --git a/Makefile b/Makefile index 963bd19a..30d26156 100644 --- a/Makefile +++ b/Makefile @@ -75,11 +75,11 @@ paths: # readPass: tast # proxied: -# source: rtsp://192.168.10.1/unicast -# sourceProtocol: udp +# source: rtsp://localhost:8554/mystream +# sourceProtocol: tcp - original: - runOnPublish: ffmpeg -i rtsp://localhost:8554/original -b:a 64k -c:v libx264 -preset ultrafast -b:v 500k -max_muxing_queue_size 1024 -f rtsp rtsp://localhost:8554/compressed +# original: +# runOnPublish: ffmpeg -i rtsp://localhost:8554/original -b:a 64k -c:v libx264 -preset ultrafast -b:v 500k -max_muxing_queue_size 1024 -f rtsp rtsp://localhost:8554/compressed endef export CONFIG_RUN diff --git a/go.mod b/go.mod index 7be3b3e1..8939e88d 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.13 require ( github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect - github.com/aler9/gortsplib v0.0.0-20200719162513-a119764b9644 + github.com/aler9/gortsplib v0.0.0-20200719202520-de32b1f15ecb github.com/aler9/sdp/v3 v3.0.0-20200719093237-2c3d108a7436 github.com/stretchr/testify v1.6.1 gopkg.in/alecthomas/kingpin.v2 v2.2.6 diff --git a/go.sum b/go.sum index 329b627c..3ec57227 100644 --- a/go.sum +++ b/go.sum @@ -2,8 +2,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= -github.com/aler9/gortsplib v0.0.0-20200719162513-a119764b9644 h1:14g114ATdvGCrnKOHRLklmPwtchF2LQAjdyORVBEzoQ= -github.com/aler9/gortsplib v0.0.0-20200719162513-a119764b9644/go.mod h1:kBMvjIdOHRjLdV+oT28JD72JUPpJuwxOc9u72GG8GpY= +github.com/aler9/gortsplib v0.0.0-20200719202520-de32b1f15ecb h1:R9F835QLbnfLQrOoHZULCrASRC23287Lb6v5LpOt0TY= +github.com/aler9/gortsplib v0.0.0-20200719202520-de32b1f15ecb/go.mod h1:kBMvjIdOHRjLdV+oT28JD72JUPpJuwxOc9u72GG8GpY= github.com/aler9/sdp/v3 v3.0.0-20200719093237-2c3d108a7436 h1:W0iNErWKvSAyJBNVx+qQoyFrWOFVgS6f/WEME/D3EZc= github.com/aler9/sdp/v3 v3.0.0-20200719093237-2c3d108a7436/go.mod h1:OnlEK3QI7YtM+ShZWtGajmOHLZ3bjU80AcIS5e34i1U= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= diff --git a/source-udpl.go b/source-udpl.go deleted file mode 100644 index 9f90a4a2..00000000 --- a/source-udpl.go +++ /dev/null @@ -1,91 +0,0 @@ -package main - -import ( - "net" - "time" - - "github.com/aler9/gortsplib" -) - -type sourceUdpListener struct { - p *program - source *source - trackId int - streamType gortsplib.StreamType - publisherIp net.IP - publisherPort int - nconn *net.UDPConn - running bool - readBuf *doubleBuffer - - writeChan chan *udpAddrBufPair - done chan struct{} -} - -func newSourceUdpListener(p *program, port int, source *source, - trackId int, streamType gortsplib.StreamType, publisherIp net.IP) (*sourceUdpListener, error) { - nconn, err := net.ListenUDP("udp", &net.UDPAddr{ - Port: port, - }) - if err != nil { - return nil, err - } - - l := &sourceUdpListener{ - p: p, - source: source, - trackId: trackId, - streamType: streamType, - publisherIp: publisherIp, - nconn: nconn, - readBuf: newDoubleBuffer(2048), - writeChan: make(chan *udpAddrBufPair), - done: make(chan struct{}), - } - - return l, nil -} - -func (l *sourceUdpListener) close() { - l.nconn.Close() // close twice -} - -func (l *sourceUdpListener) start() { - go l.run() -} - -func (l *sourceUdpListener) stop() { - l.nconn.Close() - <-l.done -} - -func (l *sourceUdpListener) run() { - writeDone := make(chan struct{}) - go func() { - defer close(writeDone) - for w := range l.writeChan { - l.nconn.SetWriteDeadline(time.Now().Add(l.p.conf.WriteTimeout)) - l.nconn.WriteTo(w.buf, w.addr) - } - }() - - for { - buf := l.readBuf.swap() - n, addr, err := l.nconn.ReadFromUDP(buf) - if err != nil { - break - } - - if !l.publisherIp.Equal(addr.IP) || addr.Port != l.publisherPort { - continue - } - - l.source.rtcpReceivers[l.trackId].OnFrame(l.streamType, buf[:n]) - l.p.events <- programEventStreamerFrame{l.source, l.trackId, l.streamType, buf[:n]} - } - - close(l.writeChan) - <-writeDone - - close(l.done) -} diff --git a/source.go b/source.go index d5ee05e8..285bc66f 100644 --- a/source.go +++ b/source.go @@ -5,6 +5,8 @@ import ( "math/rand" "net" "net/url" + "os" + "sync" "time" "github.com/aler9/gortsplib" @@ -12,17 +14,9 @@ import ( ) const ( - sourceRetryInterval = 5 * time.Second - sourceCheckStreamInterval = 5 * time.Second - sourceKeepaliveInterval = 60 * time.Second - sourceReceiverReportInterval = 10 * time.Second + sourceRetryInterval = 5 * time.Second ) -type sourceUdpListenerPair struct { - rtpl *sourceUdpListener - rtcpl *sourceUdpListener -} - type source struct { p *program path string @@ -32,7 +26,6 @@ type source struct { tracks []*gortsplib.Track serverSdpText []byte serverSdpParsed *sdp.SessionDescription - rtcpReceivers []*gortsplib.RtcpReceiver readBuf *doubleBuffer terminate chan struct{} @@ -123,11 +116,15 @@ func (s *source) run() { func (s *source) do() bool { s.log("initializing with protocol %s", s.proto) - var nconn net.Conn + var conn *gortsplib.ConnClient var err error dialDone := make(chan struct{}) go func() { - nconn, err = net.DialTimeout("tcp", s.u.Host, s.p.conf.ReadTimeout) + conn, err = gortsplib.NewConnClient(gortsplib.ConnClientConf{ + Host: s.u.Host, + ReadTimeout: s.p.conf.ReadTimeout, + WriteTimeout: s.p.conf.WriteTimeout, + }) close(dialDone) }() @@ -141,13 +138,8 @@ func (s *source) do() bool { s.log("ERR: %s", err) return true } - defer nconn.Close() - conn := gortsplib.NewConnClient(gortsplib.ConnClientConf{ - Conn: nconn, - ReadTimeout: s.p.conf.ReadTimeout, - WriteTimeout: s.p.conf.WriteTimeout, - }) + defer conn.Close() _, err = conn.Options(s.u) if err != nil { @@ -176,59 +168,42 @@ func (s *source) do() bool { } func (s *source) runUdp(conn *gortsplib.ConnClient) bool { - publisherIp := conn.NetConn().RemoteAddr().(*net.TCPAddr).IP + type trackListenerPair struct { + rtpl *gortsplib.ConnClientUdpListener + rtcpl *gortsplib.ConnClientUdpListener + } + var listeners []*trackListenerPair - var sourceUdpListenerPairs []sourceUdpListenerPair + for _, track := range s.tracks { + var rtpl *gortsplib.ConnClientUdpListener + var rtcpl *gortsplib.ConnClientUdpListener + var err error - defer func() { - for _, pair := range sourceUdpListenerPairs { - pair.rtpl.close() - pair.rtcpl.close() - } - }() + for { + // choose two consecutive ports in range 65536-10000 + // rtp must be pair and rtcp odd + rtpPort := (rand.Intn((65535-10000)/2) * 2) + 10000 + rtcpPort := rtpPort + 1 - for i, track := range s.tracks { - var rtpPort int - var rtcpPort int - var rtpl *sourceUdpListener - var rtcpl *sourceUdpListener - func() { - for { - // choose two consecutive ports in range 65536-10000 - // rtp must be pair and rtcp odd - rtpPort = (rand.Intn((65535-10000)/2) * 2) + 10000 - rtcpPort = rtpPort + 1 - - var err error - rtpl, err = newSourceUdpListener(s.p, rtpPort, s, i, - gortsplib.StreamTypeRtp, publisherIp) - if err != nil { - continue + rtpl, rtcpl, _, err = conn.SetupUdp(s.u, track, rtpPort, rtcpPort) + if err != nil { + // retry if it's a bind error + if nerr, ok := err.(*net.OpError); ok { + if serr, ok := nerr.Err.(*os.SyscallError); ok { + if serr.Syscall == "bind" { + continue + } + } } - rtcpl, err = newSourceUdpListener(s.p, rtcpPort, s, i, - gortsplib.StreamTypeRtcp, publisherIp) - if err != nil { - rtpl.close() - continue - } - - return + s.log("ERR: %s", err) + return true } - }() - rtpServerPort, rtcpServerPort, _, err := conn.SetupUdp(s.u, track, rtpPort, rtcpPort) - if err != nil { - s.log("ERR: %s", err) - rtpl.close() - rtcpl.close() - return true + break } - rtpl.publisherPort = rtpServerPort - rtcpl.publisherPort = rtcpServerPort - - sourceUdpListenerPairs = append(sourceUdpListenerPairs, sourceUdpListenerPair{ + listeners = append(listeners, &trackListenerPair{ rtpl: rtpl, rtcpl: rtcpl, }) @@ -240,26 +215,53 @@ func (s *source) runUdp(conn *gortsplib.ConnClient) bool { return true } - s.rtcpReceivers = make([]*gortsplib.RtcpReceiver, len(s.tracks)) - for trackId := range s.tracks { - s.rtcpReceivers[trackId] = gortsplib.NewRtcpReceiver() - } - - for _, pair := range sourceUdpListenerPairs { - pair.rtpl.start() - pair.rtcpl.start() - } - s.p.events <- programEventStreamerReady{s} + var wg sync.WaitGroup + + for trackId, lp := range listeners { + wg.Add(2) + + // receive RTP packets + go func(trackId int, l *gortsplib.ConnClientUdpListener) { + defer wg.Done() + + doubleBuf := newDoubleBuffer(2048) + for { + buf := doubleBuf.swap() + + n, err := l.Read(buf) + if err != nil { + break + } + + s.p.events <- programEventStreamerFrame{s, trackId, gortsplib.StreamTypeRtp, buf[:n]} + } + }(trackId, lp.rtpl) + + // receive RTCP packets + go func(trackId int, l *gortsplib.ConnClientUdpListener) { + defer wg.Done() + + doubleBuf := newDoubleBuffer(2048) + for { + buf := doubleBuf.swap() + + n, err := l.Read(buf) + if err != nil { + break + } + + s.p.events <- programEventStreamerFrame{s, trackId, gortsplib.StreamTypeRtcp, buf[:n]} + } + }(trackId, lp.rtcpl) + } + tcpConnDone := make(chan error) go func() { tcpConnDone <- conn.LoopUDP(s.u) }() - checkStreamTicker := time.NewTicker(sourceCheckStreamInterval) - receiverReportTicker := time.NewTicker(sourceReceiverReportInterval) - var ret bool outer: @@ -275,46 +277,16 @@ outer: s.log("ERR: %s", err) ret = true break outer - - case <-checkStreamTicker.C: - for trackId := range s.tracks { - if time.Since(s.rtcpReceivers[trackId].LastFrameTime()) >= s.p.conf.ReadTimeout { - s.log("ERR: stream is dead") - conn.NetConn().Close() - <-tcpConnDone - ret = true - break outer - } - } - - case <-receiverReportTicker.C: - for trackId := range s.tracks { - frame := s.rtcpReceivers[trackId].Report() - sourceUdpListenerPairs[trackId].rtcpl.writeChan <- &udpAddrBufPair{ - addr: &net.UDPAddr{ - IP: conn.NetConn().RemoteAddr().(*net.TCPAddr).IP, - Zone: conn.NetConn().RemoteAddr().(*net.TCPAddr).Zone, - Port: sourceUdpListenerPairs[trackId].rtcpl.publisherPort, - }, - buf: frame, - } - } } } - checkStreamTicker.Stop() - receiverReportTicker.Stop() - s.p.events <- programEventStreamerNotReady{s} - for _, pair := range sourceUdpListenerPairs { - pair.rtpl.stop() - pair.rtcpl.stop() - } - - for trackId := range s.tracks { - s.rtcpReceivers[trackId].Close() + for _, lp := range listeners { + lp.rtpl.Close() + lp.rtcpl.Close() } + wg.Wait() return ret } @@ -334,11 +306,6 @@ func (s *source) runTcp(conn *gortsplib.ConnClient) bool { return true } - s.rtcpReceivers = make([]*gortsplib.RtcpReceiver, len(s.tracks)) - for trackId := range s.tracks { - s.rtcpReceivers[trackId] = gortsplib.NewRtcpReceiver() - } - s.p.events <- programEventStreamerReady{s} frame := &gortsplib.InterleavedFrame{} @@ -355,15 +322,10 @@ func (s *source) runTcp(conn *gortsplib.ConnClient) bool { return } - s.rtcpReceivers[frame.TrackId].OnFrame(frame.StreamType, frame.Content) s.p.events <- programEventStreamerFrame{s, frame.TrackId, frame.StreamType, frame.Content} } }() - // a ticker to check the stream is not needed since there's already a deadline - // on the RTSP reads - receiverReportTicker := time.NewTicker(sourceReceiverReportInterval) - var ret bool outer: @@ -379,28 +341,11 @@ outer: s.log("ERR: %s", err) ret = true break outer - - case <-receiverReportTicker.C: - for trackId := range s.tracks { - frame := s.rtcpReceivers[trackId].Report() - - conn.WriteFrame(&gortsplib.InterleavedFrame{ - TrackId: trackId, - StreamType: gortsplib.StreamTypeRtcp, - Content: frame, - }) - } } } - receiverReportTicker.Stop() - s.p.events <- programEventStreamerNotReady{s} - for trackId := range s.tracks { - s.rtcpReceivers[trackId].Close() - } - return ret }