diff --git a/Makefile b/Makefile index 6d9c3663..1be10f2f 100644 --- a/Makefile +++ b/Makefile @@ -64,6 +64,7 @@ test-nodocker: test-internal test-root test: echo "$$DOCKERFILE_TEST" | docker build -q . -f - -t temp docker run --rm \ + --network=host \ -v /var/run/docker.sock:/var/run/docker.sock:ro \ temp \ make test-nodocker diff --git a/README.md b/README.md index 24307701..b71e1fbd 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,7 @@ _rtsp-simple-server_ is a simple, ready-to-use and zero-dependency RTSP / RTMP / Features: * Publish live streams with RTSP (UDP, TCP or TLS mode) or RTMP -* Read live streams with RTSP, RTMP or HLS +* Read live streams with RTSP (UDP, UDP-multicast, TCP or TLS mode), RTMP or HLS * Pull and serve streams from other RTSP or RTMP servers or cameras, always or on-demand (RTSP proxy) * Streams are automatically converted from a protocol to another (for instance, it's possible to publish with RTSP and read with HLS) * Each stream can have multiple video and audio tracks, encoded with any codec (including H264, H265, VP8, VP9, MPEG2, MP3, AAC, Opus, PCM, JPEG) diff --git a/go.mod b/go.mod index 29dc1cda..07d86524 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.15 require ( github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect - github.com/aler9/gortsplib v0.0.0-20210603214139-363871d65898 + github.com/aler9/gortsplib v0.0.0-20210617144524-db28e87ecb0c github.com/asticode/go-astits v0.0.0-00010101000000-000000000000 github.com/davecgh/go-spew v1.1.1 // indirect github.com/fsnotify/fsnotify v1.4.9 diff --git a/go.sum b/go.sum index a7882e5f..4b0b34c3 100644 --- a/go.sum +++ b/go.sum @@ -4,13 +4,12 @@ github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2c github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= github.com/aler9/go-astits v0.0.0-20210423195926-582b09ed7c04 h1:CXgQLsU4uxWAmsXNOjGLbj0A+0IlRcpZpMgI13fmVwo= github.com/aler9/go-astits v0.0.0-20210423195926-582b09ed7c04/go.mod h1:DkOWmBNQpnr9mv24KfZjq4JawCFX1FCqjLVGvO0DygQ= -github.com/aler9/gortsplib v0.0.0-20210603214139-363871d65898 h1:Qw3xa+fdWVF0eHhZ/ntET1q24y5uynLFIllYAWbkeTU= -github.com/aler9/gortsplib v0.0.0-20210603214139-363871d65898/go.mod h1:zVCg+TQX445hh1pC5QgAuuBvvXZMWLY1XYz626dGFqY= +github.com/aler9/gortsplib v0.0.0-20210617144524-db28e87ecb0c h1:IqV2N1yifhnVPafY8SknenVL6k66gGa5jhrujcbjl5Q= +github.com/aler9/gortsplib v0.0.0-20210617144524-db28e87ecb0c/go.mod h1:ozu0NvgZMhb4AT6VdyV6OfmgPviSiZImRkaTwW1nEKc= github.com/aler9/rtmp v0.0.0-20210403095203-3be4a5535927 h1:95mXJ5fUCYpBRdSOnLAQAdJHHKxxxJrVCiaqDi965YQ= github.com/aler9/rtmp v0.0.0-20210403095203-3be4a5535927/go.mod h1:vzuE21rowz+lT1NGsWbreIvYulgBpCGnQyeTyFblUHc= github.com/asticode/go-astikit v0.20.0 h1:+7N+J4E4lWx2QOkRdOf6DafWJMv6O4RRfgClwQokrH8= github.com/asticode/go-astikit v0.20.0/go.mod h1:h4ly7idim1tNhaVkdVBeXQZEE3L0xblP7fCWbgwipF0= -github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -28,7 +27,6 @@ github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA= github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8= github.com/pion/rtcp v1.2.4 h1:NT3H5LkUGgaEapvp0HGik+a+CpflRF7KTD7H+o7OWIM= github.com/pion/rtcp v1.2.4/go.mod h1:52rMNPWFsjr39z9B9MhnkqhPLoeHTv1aN63o/42bWE0= -github.com/pion/rtp v1.6.1 h1:2Y2elcVBrahYnHKN2X7rMHX/r1R4TEBMP1LaVu/wNhk= github.com/pion/rtp v1.6.1/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko= github.com/pion/rtp v1.6.2 h1:iGBerLX6JiDjB9NXuaPzHyxHFG9JsIEdgwTC0lp5n/U= github.com/pion/rtp v1.6.2/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko= @@ -38,7 +36,6 @@ github.com/pkg/profile v1.4.0/go.mod h1:NWz/XGvpEW1FyYQ7fCx4dqYBLlfTcE+A9FLAkNKq github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= @@ -48,20 +45,25 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad h1:DN0cp81fZ3njFcrLCytUHRSUkqBjfTo4Tx9RJTWs0EY= golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20210610132358-84b48f89b13b h1:k+E048sYJHyVnsr1GDrRZWQ32D2C7lWs9JRc0bel53A= +golang.org/x/net v0.0.0-20210610132358-84b48f89b13b/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9 h1:L2auWcuQIvxz9xSEqzESnV/QN/gNRXNApHi3fYwl2w0= golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20191026070338-33540a1f6037 h1:YyJpGZS1sBuBCzLAR1VEpK193GlqGZbnPFnPV/5Rsb4= golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44 h1:Bli41pIlzTzf3KEY06n+xnzK/BESIg2ze4Pgfh/aI8c= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da h1:b3NXsE2LusjYGGjL5bxEVZZORm/YEFFrWFjR8eFrw/c= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/internal/conf/conf.go b/internal/conf/conf.go index 2aeb8976..77724b72 100644 --- a/internal/conf/conf.go +++ b/internal/conf/conf.go @@ -7,7 +7,6 @@ import ( "os" "time" - "github.com/aler9/gortsplib" "github.com/aler9/gortsplib/pkg/headers" "golang.org/x/crypto/nacl/secretbox" "gopkg.in/yaml.v2" @@ -26,6 +25,16 @@ const ( EncryptionStrict ) +// Protocol is a RTSP protocol +type Protocol int + +// RTSP protocols. +const ( + ProtocolUDP Protocol = iota + ProtocolMulticast + ProtocolTCP +) + func decrypt(key string, byts []byte) ([]byte, error) { enc, err := base64.StdEncoding.DecodeString(string(byts)) if err != nil { @@ -64,20 +73,20 @@ type Conf struct { RunOnConnectRestart bool `yaml:"runOnConnectRestart"` // rtsp - RTSPDisable bool `yaml:"rtspDisable"` - Protocols []string `yaml:"protocols"` - ProtocolsParsed map[gortsplib.StreamProtocol]struct{} `yaml:"-" json:"-"` - Encryption string `yaml:"encryption"` - EncryptionParsed Encryption `yaml:"-" json:"-"` - RTSPAddress string `yaml:"rtspAddress"` - RTSPSAddress string `yaml:"rtspsAddress"` - RTPAddress string `yaml:"rtpAddress"` - RTCPAddress string `yaml:"rtcpAddress"` - ServerKey string `yaml:"serverKey"` - ServerCert string `yaml:"serverCert"` - AuthMethods []string `yaml:"authMethods"` - AuthMethodsParsed []headers.AuthMethod `yaml:"-" json:"-"` - ReadBufferSize int `yaml:"readBufferSize"` + RTSPDisable bool `yaml:"rtspDisable"` + Protocols []string `yaml:"protocols"` + ProtocolsParsed map[Protocol]struct{} `yaml:"-" json:"-"` + Encryption string `yaml:"encryption"` + EncryptionParsed Encryption `yaml:"-" json:"-"` + RTSPAddress string `yaml:"rtspAddress"` + RTSPSAddress string `yaml:"rtspsAddress"` + RTPAddress string `yaml:"rtpAddress"` + RTCPAddress string `yaml:"rtcpAddress"` + ServerKey string `yaml:"serverKey"` + ServerCert string `yaml:"serverCert"` + AuthMethods []string `yaml:"authMethods"` + AuthMethodsParsed []headers.AuthMethod `yaml:"-" json:"-"` + ReadBufferSize int `yaml:"readBufferSize"` // rtmp RTMPDisable bool `yaml:"rtmpDisable"` @@ -153,16 +162,19 @@ func (conf *Conf) fillAndCheck() error { } if len(conf.Protocols) == 0 { - conf.Protocols = []string{"udp", "tcp"} + conf.Protocols = []string{"udp", "multicast", "tcp"} } - conf.ProtocolsParsed = make(map[gortsplib.StreamProtocol]struct{}) + conf.ProtocolsParsed = make(map[Protocol]struct{}) for _, proto := range conf.Protocols { switch proto { case "udp": - conf.ProtocolsParsed[gortsplib.StreamProtocolUDP] = struct{}{} + conf.ProtocolsParsed[ProtocolUDP] = struct{}{} + + case "multicast": + conf.ProtocolsParsed[ProtocolMulticast] = struct{}{} case "tcp": - conf.ProtocolsParsed[gortsplib.StreamProtocolTCP] = struct{}{} + conf.ProtocolsParsed[ProtocolTCP] = struct{}{} default: return fmt.Errorf("unsupported protocol: %s", proto) @@ -185,7 +197,7 @@ func (conf *Conf) fillAndCheck() error { case "strict", "yes", "true": conf.EncryptionParsed = EncryptionStrict - if _, ok := conf.ProtocolsParsed[gortsplib.StreamProtocolUDP]; ok { + if _, ok := conf.ProtocolsParsed[ProtocolUDP]; ok { return fmt.Errorf("encryption can't be used with the UDP stream protocol") } diff --git a/internal/conf/path.go b/internal/conf/path.go index 024d1998..ccd601b3 100644 --- a/internal/conf/path.go +++ b/internal/conf/path.go @@ -9,7 +9,6 @@ import ( "strings" "time" - "github.com/aler9/gortsplib" "github.com/aler9/gortsplib/pkg/base" ) @@ -69,17 +68,17 @@ type PathConf struct { Regexp *regexp.Regexp `yaml:"-" json:"-"` // source - Source string `yaml:"source"` - SourceProtocol string `yaml:"sourceProtocol"` - SourceProtocolParsed *gortsplib.StreamProtocol `yaml:"-" json:"-"` - SourceAnyPortEnable bool `yaml:"sourceAnyPortEnable"` - SourceFingerprint string `yaml:"sourceFingerprint" json:"sourceFingerprint"` - SourceOnDemand bool `yaml:"sourceOnDemand"` - SourceOnDemandStartTimeout time.Duration `yaml:"sourceOnDemandStartTimeout"` - SourceOnDemandCloseAfter time.Duration `yaml:"sourceOnDemandCloseAfter"` - SourceRedirect string `yaml:"sourceRedirect"` - DisablePublisherOverride bool `yaml:"disablePublisherOverride"` - Fallback string `yaml:"fallback"` + Source string `yaml:"source"` + SourceProtocol string `yaml:"sourceProtocol"` + SourceProtocolParsed *base.StreamProtocol `yaml:"-" json:"-"` + SourceAnyPortEnable bool `yaml:"sourceAnyPortEnable"` + SourceFingerprint string `yaml:"sourceFingerprint" json:"sourceFingerprint"` + SourceOnDemand bool `yaml:"sourceOnDemand"` + SourceOnDemandStartTimeout time.Duration `yaml:"sourceOnDemandStartTimeout"` + SourceOnDemandCloseAfter time.Duration `yaml:"sourceOnDemandCloseAfter"` + SourceRedirect string `yaml:"sourceRedirect"` + DisablePublisherOverride bool `yaml:"disablePublisherOverride"` + Fallback string `yaml:"fallback"` // authentication PublishUser string `yaml:"publishUser"` @@ -149,11 +148,11 @@ func (pconf *PathConf) fillAndCheck(name string) error { switch pconf.SourceProtocol { case "udp": - v := gortsplib.StreamProtocolUDP + v := base.StreamProtocolUDP pconf.SourceProtocolParsed = &v case "tcp": - v := gortsplib.StreamProtocolTCP + v := base.StreamProtocolTCP pconf.SourceProtocolParsed = &v case "automatic": diff --git a/internal/hlsconverter/converter.go b/internal/hlsconverter/converter.go index 35279ab8..079d277f 100644 --- a/internal/hlsconverter/converter.go +++ b/internal/hlsconverter/converter.go @@ -267,7 +267,7 @@ func (c *Converter) runInner(innerCtx context.Context) error { var aacConfig rtpaac.MPEG4AudioConfig var aacDecoder *rtpaac.Decoder - for i, t := range res.Tracks { + for i, t := range res.Stream.Tracks() { if t.IsH264() { if videoTrack != nil { return fmt.Errorf("can't read track %d with HLS: too many tracks", i+1) diff --git a/internal/path/path.go b/internal/path/path.go index 0adbbe71..4a6909d4 100644 --- a/internal/path/path.go +++ b/internal/path/path.go @@ -20,7 +20,6 @@ import ( "github.com/aler9/rtsp-simple-server/internal/rtspsource" "github.com/aler9/rtsp-simple-server/internal/source" "github.com/aler9/rtsp-simple-server/internal/stats" - "github.com/aler9/rtsp-simple-server/internal/streamproc" ) func newEmptyTimer() *time.Timer { @@ -35,6 +34,10 @@ type Parent interface { OnPathClose(*Path) } +type rtspSession interface { + IsRTSPSession() +} + type sourceRedirect struct{} func (*sourceRedirect) IsSource() {} @@ -77,9 +80,8 @@ type Path struct { describeRequests []readpublisher.DescribeReq setupPlayRequests []readpublisher.SetupPlayReq source source.Source - sourceTracks gortsplib.Tracks - sp *streamproc.StreamProc - readers *readersMap + sourceStream *gortsplib.ServerStream + nonRTSPReaders *readersMap onDemandCmd *externalcmd.Cmd describeTimer *time.Timer sourceCloseTimer *time.Timer @@ -134,7 +136,7 @@ func New( ctx: ctx, ctxCancel: ctxCancel, readPublishers: make(map[readpublisher.ReadPublisher]readPublisherState), - readers: newReadersMap(), + nonRTSPReaders: newReadersMap(), describeTimer: newEmptyTimer(), sourceCloseTimer: newEmptyTimer(), runOnDemandCloseTimer: newEmptyTimer(), @@ -224,10 +226,9 @@ outer: break outer case req := <-pa.extSourceSetReady: - pa.sourceTracks = req.Tracks - pa.sp = streamproc.New(pa, len(req.Tracks)) + pa.sourceStream = gortsplib.NewServerStream(req.Tracks) pa.onSourceSetReady() - req.Res <- source.ExtSetReadyRes{SP: pa.sp} + req.Res <- source.ExtSetReadyRes{} case req := <-pa.extSourceSetNotReady: pa.onSourceSetNotReady() @@ -299,17 +300,20 @@ outer: req.Res <- readpublisher.SetupPlayRes{Err: fmt.Errorf("terminated")} } - for c, state := range pa.readPublishers { + for rp, state := range pa.readPublishers { if state != readPublisherStatePreRemove { switch state { case readPublisherStatePlay: atomic.AddInt64(pa.stats.CountReaders, -1) - pa.readers.remove(c) + + if _, ok := rp.(rtspSession); !ok { + pa.nonRTSPReaders.remove(rp) + } case readPublisherStateRecord: atomic.AddInt64(pa.stats.CountPublishers, -1) } - c.Close() + rp.Close() } } @@ -372,28 +376,33 @@ func (pa *Path) addReadPublisher(c readpublisher.ReadPublisher, state readPublis pa.readPublishers[c] = state } -func (pa *Path) removeReadPublisher(c readpublisher.ReadPublisher) { - state := pa.readPublishers[c] - pa.readPublishers[c] = readPublisherStatePreRemove +func (pa *Path) removeReadPublisher(rp readpublisher.ReadPublisher) { + state := pa.readPublishers[rp] + pa.readPublishers[rp] = readPublisherStatePreRemove switch state { case readPublisherStatePlay: atomic.AddInt64(pa.stats.CountReaders, -1) - pa.readers.remove(c) + + if _, ok := rp.(rtspSession); !ok { + pa.nonRTSPReaders.remove(rp) + } case readPublisherStateRecord: atomic.AddInt64(pa.stats.CountPublishers, -1) pa.onSourceSetNotReady() } - if pa.source == c { + if pa.source == rp { pa.source = nil + pa.sourceStream.Close() + pa.sourceStream = nil // close all readPublishers that are reading or waiting to read - for oc, state := range pa.readPublishers { + for orp, state := range pa.readPublishers { if state != readPublisherStatePreRemove { - pa.removeReadPublisher(oc) - oc.Close() + pa.removeReadPublisher(orp) + orp.Close() } } } @@ -412,7 +421,9 @@ func (pa *Path) onSourceSetReady() { pa.sourceState = sourceStateReady for _, req := range pa.describeRequests { - req.Res <- readpublisher.DescribeRes{pa.sourceTracks.Write(), "", nil} //nolint:govet + req.Res <- readpublisher.DescribeRes{ + Stream: pa.sourceStream, + } } pa.describeRequests = nil @@ -484,13 +495,17 @@ func (pa *Path) onReadPublisherDescribe(req readpublisher.DescribeReq) { pa.scheduleClose() if _, ok := pa.source.(*sourceRedirect); ok { - req.Res <- readpublisher.DescribeRes{nil, pa.conf.SourceRedirect, nil} //nolint:govet + req.Res <- readpublisher.DescribeRes{ + Redirect: pa.conf.SourceRedirect, + } return } switch pa.sourceState { case sourceStateReady: - req.Res <- readpublisher.DescribeRes{pa.sourceTracks.Write(), "", nil} //nolint:govet + req.Res <- readpublisher.DescribeRes{ + Stream: pa.sourceStream, + } return case sourceStateWaitingDescribe: @@ -556,24 +571,21 @@ func (pa *Path) onReadPublisherSetupPlayPost(req readpublisher.SetupPlayReq) { pa.addReadPublisher(req.Author, readPublisherStatePrePlay) } - var ti []streamproc.TrackInfo - if pa.sp != nil { - ti = pa.sp.TrackInfos() - } - req.Res <- readpublisher.SetupPlayRes{ - Path: pa, - Tracks: pa.sourceTracks, - TrackInfos: ti, + Path: pa, + Stream: pa.sourceStream, } } func (pa *Path) onReadPublisherPlay(req readpublisher.PlayReq) { atomic.AddInt64(pa.stats.CountReaders, 1) pa.readPublishers[req.Author] = readPublisherStatePlay - pa.readers.add(req.Author) - req.Res <- readpublisher.PlayRes{TrackInfos: pa.sp.TrackInfos()} + if _, ok := req.Author.(rtspSession); !ok { + pa.nonRTSPReaders.add(req.Author) + } + + req.Res <- readpublisher.PlayRes{} } func (pa *Path) onReadPublisherAnnounce(req readpublisher.AnnounceReq) { @@ -609,7 +621,7 @@ func (pa *Path) onReadPublisherAnnounce(req readpublisher.AnnounceReq) { pa.addReadPublisher(req.Author, readPublisherStatePreRecord) pa.source = req.Author - pa.sourceTracks = req.Tracks + pa.sourceStream = gortsplib.NewServerStream(req.Tracks) req.Res <- readpublisher.AnnounceRes{pa, nil} //nolint:govet } @@ -623,9 +635,7 @@ func (pa *Path) onReadPublisherRecord(req readpublisher.RecordReq) { pa.readPublishers[req.Author] = readPublisherStateRecord pa.onSourceSetReady() - pa.sp = streamproc.New(pa, len(pa.sourceTracks)) - - req.Res <- readpublisher.RecordRes{SP: pa.sp, Err: nil} + req.Res <- readpublisher.RecordRes{} } func (pa *Path) onReadPublisherPause(req readpublisher.PauseReq) { @@ -638,7 +648,10 @@ func (pa *Path) onReadPublisherPause(req readpublisher.PauseReq) { if state == readPublisherStatePlay { atomic.AddInt64(pa.stats.CountReaders, -1) pa.readPublishers[req.Author] = readPublisherStatePrePlay - pa.readers.remove(req.Author) + + if _, ok := req.Author.(rtspSession); !ok { + pa.nonRTSPReaders.remove(req.Author) + } } else if state == readPublisherStateRecord { atomic.AddInt64(pa.stats.CountPublishers, -1) @@ -792,7 +805,9 @@ func (pa *Path) OnReadPublisherRemove(req readpublisher.RemoveReq) { } } -// OnSPFrame is called by streamproc.StreamProc. -func (pa *Path) OnSPFrame(trackID int, streamType gortsplib.StreamType, payload []byte) { - pa.readers.forwardFrame(trackID, streamType, payload) +// OnFrame is called by a readpublisher +func (pa *Path) OnFrame(trackID int, streamType gortsplib.StreamType, payload []byte) { + pa.sourceStream.WriteFrame(trackID, streamType, payload) + + pa.nonRTSPReaders.forwardFrame(trackID, streamType, payload) } diff --git a/internal/pathman/pathman.go b/internal/pathman/pathman.go index f320d6d1..90dbbe13 100644 --- a/internal/pathman/pathman.go +++ b/internal/pathman/pathman.go @@ -319,11 +319,7 @@ func (pm *PathManager) OnReadPublisherDescribe(req readpublisher.DescribeReq) { select { case pm.rpDescribe <- req: case <-pm.ctx.Done(): - req.Res <- readpublisher.DescribeRes{ - SDP: nil, - Redirect: "", - Err: fmt.Errorf("terminated"), - } + req.Res <- readpublisher.DescribeRes{Err: fmt.Errorf("terminated")} } } @@ -332,10 +328,7 @@ func (pm *PathManager) OnReadPublisherAnnounce(req readpublisher.AnnounceReq) { select { case pm.rpAnnounce <- req: case <-pm.ctx.Done(): - req.Res <- readpublisher.AnnounceRes{ - Path: nil, - Err: fmt.Errorf("terminated"), - } + req.Res <- readpublisher.AnnounceRes{Err: fmt.Errorf("terminated")} } } @@ -344,11 +337,7 @@ func (pm *PathManager) OnReadPublisherSetupPlay(req readpublisher.SetupPlayReq) select { case pm.rpSetupPlay <- req: case <-pm.ctx.Done(): - req.Res <- readpublisher.SetupPlayRes{ - Path: nil, - Tracks: nil, - Err: fmt.Errorf("terminated"), - } + req.Res <- readpublisher.SetupPlayRes{Err: fmt.Errorf("terminated")} } } diff --git a/internal/readpublisher/readpublisher.go b/internal/readpublisher/readpublisher.go index bc865e58..6c4664ff 100644 --- a/internal/readpublisher/readpublisher.go +++ b/internal/readpublisher/readpublisher.go @@ -9,7 +9,6 @@ import ( "github.com/aler9/gortsplib/pkg/headers" "github.com/aler9/rtsp-simple-server/internal/conf" - "github.com/aler9/rtsp-simple-server/internal/streamproc" ) // Path is implemented by path.Path. @@ -20,6 +19,7 @@ type Path interface { OnReadPublisherPlay(PlayReq) OnReadPublisherRecord(RecordReq) OnReadPublisherPause(PauseReq) + OnFrame(int, gortsplib.StreamType, []byte) } // ErrNoOnePublishing is a "no one is publishing" error. @@ -63,7 +63,7 @@ type ReadPublisher interface { // DescribeRes is a describe response. type DescribeRes struct { - SDP []byte + Stream *gortsplib.ServerStream Redirect string Err error } @@ -79,10 +79,9 @@ type DescribeReq struct { // SetupPlayRes is a setup/play response. type SetupPlayRes struct { - Path Path - Tracks gortsplib.Tracks - TrackInfos []streamproc.TrackInfo - Err error + Path Path + Stream *gortsplib.ServerStream + Err error } // SetupPlayReq is a setup/play request. @@ -117,9 +116,7 @@ type RemoveReq struct { } // PlayRes is a play response. -type PlayRes struct { - TrackInfos []streamproc.TrackInfo -} +type PlayRes struct{} // PlayReq is a play request. type PlayReq struct { @@ -129,7 +126,6 @@ type PlayReq struct { // RecordRes is a record response. type RecordRes struct { - SP *streamproc.StreamProc Err error } diff --git a/internal/rtmpconn/conn.go b/internal/rtmpconn/conn.go index 17b4759b..717af4a3 100644 --- a/internal/rtmpconn/conn.go +++ b/internal/rtmpconn/conn.go @@ -238,7 +238,7 @@ func (c *Conn) runRead(ctx context.Context) error { var audioClockRate int var aacDecoder *rtpaac.Decoder - for i, t := range res.Tracks { + for i, t := range res.Stream.Tracks() { if t.IsH264() { if videoTrack != nil { return fmt.Errorf("can't read track %d with RTMP: too many tracks", i+1) @@ -450,12 +450,12 @@ func (c *Conn) runPublish(ctx context.Context) error { } }(c.path) - rtcpSenders := rtcpsenderset.New(tracks, rres.SP.OnFrame) + rtcpSenders := rtcpsenderset.New(tracks, c.path.OnFrame) defer rtcpSenders.Close() onFrame := func(trackID int, payload []byte) { rtcpSenders.OnFrame(trackID, gortsplib.StreamTypeRTP, payload) - rres.SP.OnFrame(trackID, gortsplib.StreamTypeRTP, payload) + c.path.OnFrame(trackID, gortsplib.StreamTypeRTP, payload) } for { diff --git a/internal/rtmpsource/source.go b/internal/rtmpsource/source.go index 84d388c5..a62b787f 100644 --- a/internal/rtmpsource/source.go +++ b/internal/rtmpsource/source.go @@ -29,6 +29,7 @@ type Parent interface { Log(logger.Level, string, ...interface{}) OnExtSourceSetReady(req source.ExtSetReadyReq) OnExtSourceSetNotReady(req source.ExtSetNotReadyReq) + OnFrame(int, gortsplib.StreamType, []byte) } // Source is a RTMP external source. @@ -177,7 +178,7 @@ func (s *Source) runInner() bool { Tracks: tracks, Res: cres, }) - res := <-cres + <-cres defer func() { res := make(chan struct{}) @@ -187,12 +188,12 @@ func (s *Source) runInner() bool { <-res }() - rtcpSenders := rtcpsenderset.New(tracks, res.SP.OnFrame) + rtcpSenders := rtcpsenderset.New(tracks, s.parent.OnFrame) defer rtcpSenders.Close() onFrame := func(trackID int, payload []byte) { rtcpSenders.OnFrame(trackID, gortsplib.StreamTypeRTP, payload) - res.SP.OnFrame(trackID, gortsplib.StreamTypeRTP, payload) + s.parent.OnFrame(trackID, gortsplib.StreamTypeRTP, payload) } for { diff --git a/internal/rtspconn/conn.go b/internal/rtspconn/conn.go index 8a74ec67..ae5587e7 100644 --- a/internal/rtspconn/conn.go +++ b/internal/rtspconn/conn.go @@ -131,7 +131,7 @@ func (c *Conn) OnResponse(res *base.Response) { } // OnDescribe is called by rtspserver.Server. -func (c *Conn) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, []byte, error) { +func (c *Conn) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, *gortsplib.ServerStream, error) { resc := make(chan readpublisher.DescribeRes) c.pathMan.OnReadPublisherDescribe(readpublisher.DescribeReq{ PathName: ctx.Path, @@ -178,7 +178,7 @@ func (c *Conn) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Resp return &base.Response{ StatusCode: base.StatusOK, - }, res.SDP, nil + }, res.Stream, nil } // ValidateCredentials allows to validate the credentials of a path. diff --git a/internal/rtspserver/server.go b/internal/rtspserver/server.go index fefd2cf1..886ad68e 100644 --- a/internal/rtspserver/server.go +++ b/internal/rtspserver/server.go @@ -12,6 +12,7 @@ import ( "github.com/aler9/gortsplib" "github.com/aler9/gortsplib/pkg/base" + "github.com/aler9/rtsp-simple-server/internal/conf" "github.com/aler9/rtsp-simple-server/internal/logger" "github.com/aler9/rtsp-simple-server/internal/pathman" "github.com/aler9/rtsp-simple-server/internal/rtspconn" @@ -53,7 +54,7 @@ type Server struct { readTimeout time.Duration isTLS bool rtspAddress string - protocols map[base.StreamProtocol]struct{} + protocols map[conf.Protocol]struct{} runOnConnect string runOnConnectRestart bool stats *stats.Stats @@ -84,7 +85,7 @@ func New( serverCert string, serverKey string, rtspAddress string, - protocols map[base.StreamProtocol]struct{}, + protocols map[conf.Protocol]struct{}, runOnConnect string, runOnConnectRestart bool, stats *stats.Stats, @@ -274,7 +275,7 @@ func (s *Server) OnSessionClose(ctx *gortsplib.ServerHandlerOnSessionCloseCtx) { } // OnDescribe implements gortsplib.ServerHandlerOnDescribe. -func (s *Server) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, []byte, error) { +func (s *Server) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, *gortsplib.ServerStream, error) { s.mutex.RLock() c := s.conns[ctx.Conn] s.mutex.RUnlock() @@ -291,7 +292,7 @@ func (s *Server) OnAnnounce(ctx *gortsplib.ServerHandlerOnAnnounceCtx) (*base.Re } // OnSetup implements gortsplib.ServerHandlerOnSetup. -func (s *Server) OnSetup(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *uint32, error) { +func (s *Server) OnSetup(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *gortsplib.ServerStream, error) { s.mutex.RLock() c := s.conns[ctx.Conn] se := s.sessions[ctx.Session] diff --git a/internal/rtspsession/session.go b/internal/rtspsession/session.go index 0e479a4e..a63e5f5c 100644 --- a/internal/rtspsession/session.go +++ b/internal/rtspsession/session.go @@ -4,18 +4,17 @@ import ( "errors" "fmt" "net" - "strconv" "time" "github.com/aler9/gortsplib" "github.com/aler9/gortsplib/pkg/base" "github.com/aler9/gortsplib/pkg/headers" + "github.com/aler9/rtsp-simple-server/internal/conf" "github.com/aler9/rtsp-simple-server/internal/externalcmd" "github.com/aler9/rtsp-simple-server/internal/logger" "github.com/aler9/rtsp-simple-server/internal/readpublisher" "github.com/aler9/rtsp-simple-server/internal/rtspconn" - "github.com/aler9/rtsp-simple-server/internal/streamproc" ) const ( @@ -36,7 +35,7 @@ type Parent interface { // Session is a RTSP server-side session. type Session struct { rtspAddress string - protocols map[gortsplib.StreamProtocol]struct{} + protocols map[conf.Protocol]struct{} visualID string ss *gortsplib.ServerSession pathMan PathMan @@ -45,14 +44,13 @@ type Session struct { path readpublisher.Path setuppedTracks map[int]*gortsplib.Track // read onReadCmd *externalcmd.Cmd // read - sp *streamproc.StreamProc // publish onPublishCmd *externalcmd.Cmd // publish } // New allocates a Session. func New( rtspAddress string, - protocols map[gortsplib.StreamProtocol]struct{}, + protocols map[conf.Protocol]struct{}, visualID string, ss *gortsplib.ServerSession, sc *gortsplib.ServerConn, @@ -107,11 +105,21 @@ func (s *Session) IsReadPublisher() {} // IsSource implements source.Source. func (s *Session) IsSource() {} +// IsRTSPSession implements path.rtspSession. +func (s *Session) IsRTSPSession() {} + // VisualID returns the visual ID of the session. func (s *Session) VisualID() string { return s.visualID } +func (s *Session) displayedProtocol() string { + if *s.ss.SetuppedDelivery() == base.StreamDeliveryMulticast { + return "UDP-multicast" + } + return s.ss.SetuppedProtocol().String() +} + func (s *Session) log(level logger.Level, format string, args ...interface{}) { s.parent.Log(level, "[session %s] "+format, append([]interface{}{s.visualID}, args...)...) } @@ -157,19 +165,25 @@ func (s *Session) OnAnnounce(c *rtspconn.Conn, ctx *gortsplib.ServerHandlerOnAnn } // OnSetup is called by rtspserver.Server. -func (s *Session) OnSetup(c *rtspconn.Conn, ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *uint32, error) { - if ctx.Transport.Protocol == gortsplib.StreamProtocolUDP { - if _, ok := s.protocols[gortsplib.StreamProtocolUDP]; !ok { +func (s *Session) OnSetup(c *rtspconn.Conn, ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *gortsplib.ServerStream, error) { + if ctx.Transport.Protocol == base.StreamProtocolUDP { + if _, ok := s.protocols[conf.ProtocolUDP]; !ok { return &base.Response{ StatusCode: base.StatusUnsupportedTransport, }, nil, nil } - } else { - if _, ok := s.protocols[gortsplib.StreamProtocolTCP]; !ok { - return &base.Response{ - StatusCode: base.StatusUnsupportedTransport, - }, nil, nil + + if ctx.Transport.Delivery != nil && *ctx.Transport.Delivery == base.StreamDeliveryMulticast { + if _, ok := s.protocols[conf.ProtocolMulticast]; !ok { + return &base.Response{ + StatusCode: base.StatusUnsupportedTransport, + }, nil, nil + } } + } else if _, ok := s.protocols[conf.ProtocolTCP]; !ok { + return &base.Response{ + StatusCode: base.StatusUnsupportedTransport, + }, nil, nil } switch s.ss.State() { @@ -211,7 +225,7 @@ func (s *Session) OnSetup(c *rtspconn.Conn, ctx *gortsplib.ServerHandlerOnSetupC s.path = res.Path - if ctx.TrackID >= len(res.Tracks) { + if ctx.TrackID >= len(res.Stream.Tracks()) { return &base.Response{ StatusCode: base.StatusBadRequest, }, nil, fmt.Errorf("track %d does not exist", ctx.TrackID) @@ -220,21 +234,17 @@ func (s *Session) OnSetup(c *rtspconn.Conn, ctx *gortsplib.ServerHandlerOnSetupC if s.setuppedTracks == nil { s.setuppedTracks = make(map[int]*gortsplib.Track) } - s.setuppedTracks[ctx.TrackID] = res.Tracks[ctx.TrackID] - - var ssrc *uint32 - if res.TrackInfos != nil && res.TrackInfos[ctx.TrackID].LastSSRC != 0 { - ssrc = &res.TrackInfos[ctx.TrackID].LastSSRC - } + s.setuppedTracks[ctx.TrackID] = res.Stream.Tracks()[ctx.TrackID] return &base.Response{ StatusCode: base.StatusOK, - }, ssrc, nil - } + }, res.Stream, nil - return &base.Response{ - StatusCode: base.StatusOK, - }, nil, nil + default: // record + return &base.Response{ + StatusCode: base.StatusOK, + }, nil, nil + } } // OnPlay is called by rtspserver.Server. @@ -250,7 +260,7 @@ func (s *Session) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, resc := make(chan readpublisher.PlayRes) s.path.OnReadPublisherPlay(readpublisher.PlayReq{s, resc}) //nolint:govet - res := <-resc + <-resc tracksLen := len(s.ss.SetuppedTracks()) @@ -263,7 +273,7 @@ func (s *Session) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, } return "tracks" }(), - *s.ss.StreamProtocol()) + s.displayedProtocol()) if s.path.Conf().RunOnRead != "" { _, port, _ := net.SplitHostPort(s.rtspAddress) @@ -272,40 +282,6 @@ func (s *Session) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, Port: port, }) } - - // add RTP-Info - var ri headers.RTPInfo - for trackID, ti := range res.TrackInfos { - if ti.LastTimeNTP == 0 { - continue - } - - track, ok := s.setuppedTracks[trackID] - if !ok { - continue - } - - u := &base.URL{ - Scheme: ctx.Req.URL.Scheme, - User: ctx.Req.URL.User, - Host: ctx.Req.URL.Host, - Path: "/" + s.path.Name() + "/trackID=" + strconv.FormatInt(int64(trackID), 10), - } - - clockRate, _ := track.ClockRate() - ts := uint32(uint64(ti.LastTimeRTP) + - uint64(time.Since(time.Unix(ti.LastTimeNTP, 0)).Seconds()*float64(clockRate))) - lsn := ti.LastSequenceNumber - - ri = append(ri, &headers.RTPInfoEntry{ - URL: u.String(), - SequenceNumber: &lsn, - Timestamp: &ts, - }) - } - if len(ri) > 0 { - h["RTP-Info"] = ri.Write() - } } return &base.Response{ @@ -332,8 +308,6 @@ func (s *Session) OnRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.Respo }, res.Err } - s.sp = res.SP - tracksLen := len(s.ss.AnnouncedTracks()) s.log(logger.Info, "is publishing to path '%s', %d %s with %s", @@ -345,7 +319,7 @@ func (s *Session) OnRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.Respo } return "tracks" }(), - *s.ss.StreamProtocol()) + s.displayedProtocol()) if s.path.Conf().RunOnPublish != "" { _, port, _ := net.SplitHostPort(s.rtspAddress) @@ -389,10 +363,6 @@ func (s *Session) OnPause(ctx *gortsplib.ServerHandlerOnPauseCtx) (*base.Respons // OnFrame implements path.Reader. func (s *Session) OnFrame(trackID int, streamType gortsplib.StreamType, payload []byte) { - if _, ok := s.ss.SetuppedTracks()[trackID]; !ok { - return - } - s.ss.WriteFrame(trackID, streamType, payload) } @@ -402,5 +372,5 @@ func (s *Session) OnIncomingFrame(ctx *gortsplib.ServerHandlerOnFrameCtx) { return } - s.sp.OnFrame(ctx.TrackID, ctx.StreamType, ctx.Payload) + s.path.OnFrame(ctx.TrackID, ctx.StreamType, ctx.Payload) } diff --git a/internal/rtspsource/source.go b/internal/rtspsource/source.go index b312d41d..94256d47 100644 --- a/internal/rtspsource/source.go +++ b/internal/rtspsource/source.go @@ -28,12 +28,13 @@ type Parent interface { Log(logger.Level, string, ...interface{}) OnExtSourceSetReady(req source.ExtSetReadyReq) OnExtSourceSetNotReady(req source.ExtSetNotReadyReq) + OnFrame(int, gortsplib.StreamType, []byte) } // Source is a RTSP external source. type Source struct { ur string - proto *gortsplib.StreamProtocol + proto *base.StreamProtocol anyPortEnable bool fingerprint string readTimeout time.Duration @@ -52,7 +53,7 @@ type Source struct { func New( ctxParent context.Context, ur string, - proto *gortsplib.StreamProtocol, + proto *base.StreamProtocol, anyPortEnable bool, fingerprint string, readTimeout time.Duration, @@ -197,7 +198,7 @@ func (s *Source) runInner() bool { Tracks: conn.Tracks(), Res: cres, }) - res := <-cres + <-cres defer func() { res := make(chan struct{}) @@ -210,7 +211,7 @@ func (s *Source) runInner() bool { readErr := make(chan error) go func() { readErr <- conn.ReadFrames(func(trackID int, streamType gortsplib.StreamType, payload []byte) { - res.SP.OnFrame(trackID, streamType, payload) + s.parent.OnFrame(trackID, streamType, payload) }) }() diff --git a/internal/source/source.go b/internal/source/source.go index 707f13a2..15947547 100644 --- a/internal/source/source.go +++ b/internal/source/source.go @@ -16,15 +16,8 @@ type ExtSource interface { Close() } -// StreamProc is implemented by streamproc.StreamProc. -type StreamProc interface { - OnFrame(int, gortsplib.StreamType, []byte) -} - // ExtSetReadyRes is a set ready response. -type ExtSetReadyRes struct { - SP StreamProc -} +type ExtSetReadyRes struct{} // ExtSetReadyReq is a set ready request. type ExtSetReadyReq struct { diff --git a/internal/streamproc/streamproc.go b/internal/streamproc/streamproc.go deleted file mode 100644 index 4aa01dd5..00000000 --- a/internal/streamproc/streamproc.go +++ /dev/null @@ -1,82 +0,0 @@ -package streamproc - -import ( - "encoding/binary" - "sync/atomic" - "time" - - "github.com/aler9/gortsplib" -) - -// Path is implemented by path.path. -type Path interface { - OnSPFrame(int, gortsplib.StreamType, []byte) -} - -// TrackInfo contains infos about a track. -type TrackInfo struct { - LastSequenceNumber uint16 - LastTimeRTP uint32 - LastTimeNTP int64 - LastSSRC uint32 -} - -type track struct { - lastSequenceNumber uint32 - lastTimeRTP uint32 - lastTimeNTP int64 - lastSSRC uint32 -} - -// StreamProc is a stream processor, an intermediate layer between a source and a path. -type StreamProc struct { - path Path - tracks []*track -} - -// New allocates a StreamProc. -func New(path Path, tracksLen int) *StreamProc { - sp := &StreamProc{ - path: path, - } - - sp.tracks = make([]*track, tracksLen) - for i := range sp.tracks { - sp.tracks[i] = &track{} - } - - return sp -} - -// TrackInfos returns infos about the tracks of the stream. -func (sp *StreamProc) TrackInfos() []TrackInfo { - ret := make([]TrackInfo, len(sp.tracks)) - for trackID, track := range sp.tracks { - ret[trackID] = TrackInfo{ - LastSequenceNumber: uint16(atomic.LoadUint32(&track.lastSequenceNumber)), - LastTimeRTP: atomic.LoadUint32(&track.lastTimeRTP), - LastTimeNTP: atomic.LoadInt64(&track.lastTimeNTP), - LastSSRC: atomic.LoadUint32(&track.lastSSRC), - } - } - return ret -} - -// OnFrame processes a frame. -func (sp *StreamProc) OnFrame(trackID int, streamType gortsplib.StreamType, payload []byte) { - if streamType == gortsplib.StreamTypeRTP && len(payload) >= 8 { - track := sp.tracks[trackID] - - sequenceNumber := binary.BigEndian.Uint16(payload[2:4]) - atomic.StoreUint32(&track.lastSequenceNumber, uint32(sequenceNumber)) - - timestamp := binary.BigEndian.Uint32(payload[4:8]) - atomic.StoreUint32(&track.lastTimeRTP, timestamp) - atomic.StoreInt64(&track.lastTimeNTP, time.Now().Unix()) - - ssrc := binary.BigEndian.Uint32(payload[8:12]) - atomic.StoreUint32(&track.lastSSRC, ssrc) - } - - sp.path.OnSPFrame(trackID, streamType, payload) -} diff --git a/main.go b/main.go index 6b3c3847..cc59944e 100644 --- a/main.go +++ b/main.go @@ -7,7 +7,6 @@ import ( "reflect" "sync/atomic" - "github.com/aler9/gortsplib" "gopkg.in/alecthomas/kingpin.v2" "github.com/aler9/rtsp-simple-server/internal/conf" @@ -215,7 +214,7 @@ func (p *program) createResources(initial bool) error { (p.conf.EncryptionParsed == conf.EncryptionNo || p.conf.EncryptionParsed == conf.EncryptionOptional) { if p.serverRTSPPlain == nil { - _, useUDP := p.conf.ProtocolsParsed[gortsplib.StreamProtocolUDP] + _, useUDP := p.conf.ProtocolsParsed[conf.ProtocolUDP] p.serverRTSPPlain, err = rtspserver.New( p.ctx, p.conf.RTSPAddress, diff --git a/main_hlsreader_test.go b/main_hlsreader_test.go index a86503a8..33564838 100644 --- a/main_hlsreader_test.go +++ b/main_hlsreader_test.go @@ -18,7 +18,7 @@ func TestClientHLSRead(t *testing.T) { "-i", "emptyvideo.mkv", "-c", "copy", "-f", "rtsp", - "rtsp://" + ownDockerIP + ":8554/test/stream", + "rtsp://localhost:8554/test/stream", }) require.NoError(t, err) defer cnt1.close() @@ -26,7 +26,7 @@ func TestClientHLSRead(t *testing.T) { time.Sleep(1 * time.Second) cnt2, err := newContainer("ffmpeg", "dest", []string{ - "-i", "http://" + ownDockerIP + ":8888/test/stream/stream.m3u8", + "-i", "http://localhost:8888/test/stream/stream.m3u8", "-vframes", "1", "-f", "image2", "-y", "/dev/null", @@ -42,7 +42,7 @@ func TestClientHLSReadAuth(t *testing.T) { " all:\n" + " readUser: testuser\n" + " readPass: testpass\n" + - " readIps: [172.17.0.0/16]\n") + " readIps: [127.0.0.0/16]\n") require.Equal(t, true, ok) defer p.close() @@ -52,7 +52,7 @@ func TestClientHLSReadAuth(t *testing.T) { "-i", "emptyvideo.mkv", "-c", "copy", "-f", "rtsp", - "rtsp://" + ownDockerIP + ":8554/teststream", + "rtsp://localhost:8554/teststream", }) require.NoError(t, err) defer cnt1.close() @@ -60,7 +60,7 @@ func TestClientHLSReadAuth(t *testing.T) { time.Sleep(1 * time.Second) cnt2, err := newContainer("ffmpeg", "dest", []string{ - "-i", "http://testuser:testpass@" + ownDockerIP + ":8888/teststream/stream.m3u8", + "-i", "http://testuser:testpass@127.0.0.1:8888/teststream/stream.m3u8", "-vframes", "1", "-f", "image2", "-y", "/dev/null", diff --git a/main_rtmpreadpub_test.go b/main_rtmpreadpub_test.go index ad3c2096..79bada5f 100644 --- a/main_rtmpreadpub_test.go +++ b/main_rtmpreadpub_test.go @@ -23,7 +23,7 @@ func TestClientRTMPPublish(t *testing.T) { "-i", "empty" + source + ".mkv", "-c", "copy", "-f", "flv", - "rtmp://" + ownDockerIP + ":1935/test1/test2", + "rtmp://localhost:1935/test1/test2", }) require.NoError(t, err) defer cnt1.close() @@ -32,7 +32,7 @@ func TestClientRTMPPublish(t *testing.T) { cnt2, err := newContainer("ffmpeg", "dest", []string{ "-rtsp_transport", "udp", - "-i", "rtsp://" + ownDockerIP + ":8554/test1/test2", + "-i", "rtsp://localhost:8554/test1/test2", "-vframes", "1", "-f", "image2", "-y", "/dev/null", @@ -55,7 +55,7 @@ func TestClientRTMPRead(t *testing.T) { "-i", "emptyvideo.mkv", "-c", "copy", "-f", "rtsp", - "rtsp://" + ownDockerIP + ":8554/teststream", + "rtsp://localhost:8554/teststream", }) require.NoError(t, err) defer cnt1.close() @@ -63,7 +63,7 @@ func TestClientRTMPRead(t *testing.T) { time.Sleep(1 * time.Second) cnt2, err := newContainer("ffmpeg", "dest", []string{ - "-i", "rtmp://" + ownDockerIP + ":1935/teststream", + "-i", "rtmp://localhost:1935/teststream", "-vframes", "1", "-f", "image2", "-y", "/dev/null", @@ -81,7 +81,7 @@ func TestClientRTMPAuth(t *testing.T) { " all:\n" + " publishUser: testuser\n" + " publishPass: testpass\n" + - " readIps: [172.17.0.0/16]\n") + " readIps: [127.0.0.0/16]\n") require.Equal(t, true, ok) defer p.close() @@ -91,7 +91,7 @@ func TestClientRTMPAuth(t *testing.T) { "-i", "emptyvideo.mkv", "-c", "copy", "-f", "flv", - "rtmp://" + ownDockerIP + "/teststream?user=testuser&pass=testpass", + "rtmp://localhost/teststream?user=testuser&pass=testpass", }) require.NoError(t, err) defer cnt1.close() @@ -99,7 +99,7 @@ func TestClientRTMPAuth(t *testing.T) { time.Sleep(1 * time.Second) cnt2, err := newContainer("ffmpeg", "dest", []string{ - "-i", "rtmp://" + ownDockerIP + "/teststream", + "-i", "rtmp://127.0.0.1/teststream", "-vframes", "1", "-f", "image2", "-y", "/dev/null", @@ -116,7 +116,7 @@ func TestClientRTMPAuth(t *testing.T) { " all:\n" + " readUser: testuser\n" + " readPass: testpass\n" + - " readIps: [172.17.0.0/16]\n") + " readIps: [127.0.0.0/16]\n") require.Equal(t, true, ok) defer p.close() @@ -126,7 +126,7 @@ func TestClientRTMPAuth(t *testing.T) { "-i", "emptyvideo.mkv", "-c", "copy", "-f", "flv", - "rtmp://" + ownDockerIP + "/teststream", + "rtmp://localhost/teststream", }) require.NoError(t, err) defer cnt1.close() @@ -134,7 +134,7 @@ func TestClientRTMPAuth(t *testing.T) { time.Sleep(1 * time.Second) cnt2, err := newContainer("ffmpeg", "dest", []string{ - "-i", "rtmp://" + ownDockerIP + "/teststream?user=testuser&pass=testpass", + "-i", "rtmp://127.0.0.1/teststream?user=testuser&pass=testpass", "-vframes", "1", "-f", "image2", "-y", "/dev/null", @@ -162,7 +162,7 @@ func TestClientRTMPAuthFail(t *testing.T) { "-i", "emptyvideo.mkv", "-c", "copy", "-f", "flv", - "rtmp://" + ownDockerIP + "/teststream?user=testuser&pass=testpass", + "rtmp://localhost/teststream?user=testuser&pass=testpass", }) require.NoError(t, err) defer cnt1.close() @@ -170,7 +170,7 @@ func TestClientRTMPAuthFail(t *testing.T) { time.Sleep(1 * time.Second) cnt2, err := newContainer("ffmpeg", "dest", []string{ - "-i", "rtmp://" + ownDockerIP + "/teststream", + "-i", "rtmp://localhost/teststream", "-vframes", "1", "-f", "image2", "-y", "/dev/null", @@ -196,7 +196,7 @@ func TestClientRTMPAuthFail(t *testing.T) { "-i", "emptyvideo.mkv", "-c", "copy", "-f", "flv", - "rtmp://" + ownDockerIP + "/teststream", + "rtmp://localhost/teststream", }) require.NoError(t, err) defer cnt1.close() @@ -204,7 +204,7 @@ func TestClientRTMPAuthFail(t *testing.T) { time.Sleep(1 * time.Second) cnt2, err := newContainer("ffmpeg", "dest", []string{ - "-i", "rtmp://" + ownDockerIP + "/teststream?user=testuser&pass=testpass", + "-i", "rtmp://localhost/teststream?user=testuser&pass=testpass", "-vframes", "1", "-f", "image2", "-y", "/dev/null", diff --git a/main_rtmpsource_test.go b/main_rtmpsource_test.go index 69015f4a..b60493db 100644 --- a/main_rtmpsource_test.go +++ b/main_rtmpsource_test.go @@ -33,9 +33,10 @@ func TestSourceRTMP(t *testing.T) { time.Sleep(1 * time.Second) p, ok := testProgram("hlsDisable: yes\n" + + "rtmpDisable: yes\n" + "paths:\n" + " proxied:\n" + - " source: rtmp://" + cnt1.ip() + "/stream/test\n" + + " source: rtmp://localhost/stream/test\n" + " sourceOnDemand: yes\n") require.Equal(t, true, ok) defer p.close() @@ -45,7 +46,7 @@ func TestSourceRTMP(t *testing.T) { cnt3, err := newContainer("ffmpeg", "dest", []string{ "-rtsp_transport", "udp", - "-i", "rtsp://" + ownDockerIP + ":8554/proxied", + "-i", "rtsp://localhost:8554/proxied", "-vframes", "1", "-f", "image2", "-y", "/dev/null", diff --git a/main_rtspreadpub_test.go b/main_rtspreadpub_test.go index bac4d124..13e4d147 100644 --- a/main_rtspreadpub_test.go +++ b/main_rtspreadpub_test.go @@ -13,7 +13,6 @@ import ( "github.com/aler9/gortsplib" "github.com/aler9/gortsplib/pkg/base" "github.com/aler9/gortsplib/pkg/headers" - "github.com/pion/rtp" "github.com/stretchr/testify/require" ) @@ -27,39 +26,31 @@ func mustParseURL(s string) *base.URL { func TestClientRTSPPublishRead(t *testing.T) { for _, ca := range []struct { - encrypted bool publisherSoft string publisherProto string readerSoft string readerProto string }{ - {false, "ffmpeg", "udp", "ffmpeg", "udp"}, - {false, "ffmpeg", "udp", "ffmpeg", "tcp"}, - {false, "ffmpeg", "udp", "gstreamer", "udp"}, - {false, "ffmpeg", "udp", "gstreamer", "tcp"}, - {false, "ffmpeg", "udp", "vlc", "udp"}, - {false, "ffmpeg", "udp", "vlc", "tcp"}, - - {false, "ffmpeg", "tcp", "ffmpeg", "udp"}, - {false, "gstreamer", "udp", "ffmpeg", "udp"}, - {false, "gstreamer", "tcp", "ffmpeg", "udp"}, - - {true, "ffmpeg", "tcp", "ffmpeg", "tcp"}, - {true, "ffmpeg", "tcp", "gstreamer", "tcp"}, - {true, "gstreamer", "tcp", "ffmpeg", "tcp"}, + {"ffmpeg", "udp", "ffmpeg", "udp"}, + {"ffmpeg", "udp", "ffmpeg", "multicast"}, + {"ffmpeg", "udp", "ffmpeg", "tcp"}, + {"ffmpeg", "udp", "gstreamer", "udp"}, + {"ffmpeg", "udp", "gstreamer", "multicast"}, + {"ffmpeg", "udp", "gstreamer", "tcp"}, + {"ffmpeg", "udp", "vlc", "udp"}, + {"ffmpeg", "udp", "vlc", "tcp"}, + {"ffmpeg", "tcp", "ffmpeg", "udp"}, + {"gstreamer", "udp", "ffmpeg", "udp"}, + {"gstreamer", "tcp", "ffmpeg", "udp"}, + {"ffmpeg", "tls", "ffmpeg", "tls"}, + {"ffmpeg", "tls", "gstreamer", "tls"}, + {"gstreamer", "tls", "ffmpeg", "tls"}, } { - encryptedStr := func() string { - if ca.encrypted { - return "encrypted" - } - return "plain" - }() - - t.Run(encryptedStr+"_"+ca.publisherSoft+"_"+ca.publisherProto+"_"+ + t.Run(ca.publisherSoft+"_"+ca.publisherProto+"_"+ ca.readerSoft+"_"+ca.readerProto, func(t *testing.T) { var proto string var port string - if !ca.encrypted { + if ca.publisherProto != "tls" { proto = "rtsp" port = "8554" @@ -94,14 +85,25 @@ func TestClientRTSPPublishRead(t *testing.T) { switch ca.publisherSoft { case "ffmpeg": + ps := func() string { + switch ca.publisherProto { + case "udp", "tcp": + return ca.publisherProto + + default: // tls + return "tcp" + } + }() + cnt1, err := newContainer("ffmpeg", "source", []string{ "-re", "-stream_loop", "-1", "-i", "emptyvideo.mkv", "-c", "copy", "-f", "rtsp", - "-rtsp_transport", ca.publisherProto, - proto + "://" + ownDockerIP + ":" + port + "/teststream", + "-rtsp_transport", + ps, + proto + "://localhost:" + port + "/teststream", }) require.NoError(t, err) defer cnt1.close() @@ -109,10 +111,20 @@ func TestClientRTSPPublishRead(t *testing.T) { time.Sleep(1 * time.Second) case "gstreamer": + ps := func() string { + switch ca.publisherProto { + case "udp", "tcp": + return ca.publisherProto + + default: // tls + return "tcp" + } + }() + cnt1, err := newContainer("gstreamer", "source", []string{ "filesrc location=emptyvideo.mkv ! matroskademux ! video/x-h264 ! rtspclientsink " + - "location=" + proto + "://" + ownDockerIP + ":" + port + "/teststream " + - "protocols=" + ca.publisherProto + " tls-validation-flags=0 latency=0 timeout=0 rtx-time=0", + "location=" + proto + "://localhost:" + port + "/teststream " + + "protocols=" + ps + " tls-validation-flags=0 latency=0 timeout=0 rtx-time=0", }) require.NoError(t, err) defer cnt1.close() @@ -124,9 +136,22 @@ func TestClientRTSPPublishRead(t *testing.T) { switch ca.readerSoft { case "ffmpeg": + ps := func() string { + switch ca.readerProto { + case "udp", "tcp": + return ca.publisherProto + + case "multicast": + return "udp_multicast" + + default: // tls + return "tcp" + } + }() + cnt2, err := newContainer("ffmpeg", "dest", []string{ - "-rtsp_transport", ca.readerProto, - "-i", proto + "://" + ownDockerIP + ":" + port + "/teststream", + "-rtsp_transport", ps, + "-i", proto + "://localhost:" + port + "/teststream", "-vframes", "1", "-f", "image2", "-y", "/dev/null", @@ -136,8 +161,21 @@ func TestClientRTSPPublishRead(t *testing.T) { require.Equal(t, 0, cnt2.wait()) case "gstreamer": + ps := func() string { + switch ca.readerProto { + case "udp", "tcp": + return ca.publisherProto + + case "multicast": + return "udp-mcast" + + default: // tls + return "tcp" + } + }() + cnt2, err := newContainer("gstreamer", "read", []string{ - "rtspsrc location=" + proto + "://" + ownDockerIP + ":" + port + "/teststream protocols=tcp tls-validation-flags=0 latency=0 " + + "rtspsrc location=" + proto + "://127.0.0.1:" + port + "/teststream protocols=" + ps + " tls-validation-flags=0 latency=0 " + "! application/x-rtp,media=video ! decodebin ! exitafterframe ! fakesink", }) require.NoError(t, err) @@ -149,7 +187,7 @@ func TestClientRTSPPublishRead(t *testing.T) { if ca.readerProto == "tcp" { args = append(args, "--rtsp-tcp") } - args = append(args, proto+"://"+ownDockerIP+":"+port+"/teststream") + args = append(args, proto+"://localhost:"+port+"/teststream") cnt2, err := newContainer("vlc", "dest", args) require.NoError(t, err) defer cnt2.close() @@ -167,7 +205,7 @@ func TestClientRTSPAuth(t *testing.T) { " all:\n" + " publishUser: testuser\n" + " publishPass: test!$()*+.;<=>[]^_-{}\n" + - " publishIps: [172.17.0.0/16]\n") + " publishIps: [127.0.0.0/16]\n") require.Equal(t, true, ok) defer p.close() @@ -178,7 +216,7 @@ func TestClientRTSPAuth(t *testing.T) { "-c", "copy", "-f", "rtsp", "-rtsp_transport", "udp", - "rtsp://testuser:test!$()*+.;<=>[]^_-{}@" + ownDockerIP + ":8554/test/stream", + "rtsp://testuser:test!$()*+.;<=>[]^_-{}@127.0.0.1:8554/test/stream", }) require.NoError(t, err) defer cnt1.close() @@ -187,7 +225,7 @@ func TestClientRTSPAuth(t *testing.T) { cnt2, err := newContainer("ffmpeg", "dest", []string{ "-rtsp_transport", "udp", - "-i", "rtsp://" + ownDockerIP + ":8554/test/stream", + "-i", "rtsp://localhost:8554/test/stream", "-vframes", "1", "-f", "image2", "-y", "/dev/null", @@ -208,7 +246,7 @@ func TestClientRTSPAuth(t *testing.T) { " all:\n" + " readUser: testuser\n" + " readPass: test!$()*+.;<=>[]^_-{}\n" + - " readIps: [172.17.0.0/16]\n") + " readIps: [127.0.0.0/16]\n") require.Equal(t, true, ok) defer p.close() @@ -219,7 +257,7 @@ func TestClientRTSPAuth(t *testing.T) { "-c", "copy", "-f", "rtsp", "-rtsp_transport", "udp", - "rtsp://" + ownDockerIP + ":8554/test/stream", + "rtsp://localhost:8554/test/stream", }) require.NoError(t, err) defer cnt1.close() @@ -229,7 +267,7 @@ func TestClientRTSPAuth(t *testing.T) { if soft == "ffmpeg" { cnt2, err := newContainer("ffmpeg", "dest", []string{ "-rtsp_transport", "udp", - "-i", "rtsp://testuser:test!$()*+.;<=>[]^_-{}@" + ownDockerIP + ":8554/test/stream", + "-i", "rtsp://testuser:test!$()*+.;<=>[]^_-{}@127.0.0.1:8554/test/stream", "-vframes", "1", "-f", "image2", "-y", "/dev/null", @@ -240,7 +278,7 @@ func TestClientRTSPAuth(t *testing.T) { } else { cnt2, err := newContainer("vlc", "dest", []string{ - "rtsp://testuser:test!$()*+.;<=>[]^_-{}@" + ownDockerIP + ":8554/test/stream", + "rtsp://testuser:test!$()*+.;<=>[]^_-{}@localhost:8554/test/stream", }) require.NoError(t, err) defer cnt2.close() @@ -266,14 +304,14 @@ func TestClientRTSPAuth(t *testing.T) { "-c", "copy", "-f", "rtsp", "-rtsp_transport", "udp", - "rtsp://" + ownDockerIP + ":8554/test/stream", + "rtsp://localhost:8554/test/stream", }) require.NoError(t, err) defer cnt1.close() cnt2, err := newContainer("ffmpeg", "dest", []string{ "-rtsp_transport", "udp", - "-i", "rtsp://testuser:testpass@" + ownDockerIP + ":8554/test/stream", + "-i", "rtsp://testuser:testpass@localhost:8554/test/stream", "-vframes", "1", "-f", "image2", "-y", "/dev/null", @@ -320,7 +358,7 @@ func TestClientRTSPAuthFail(t *testing.T) { require.NoError(t, err) _, err = gortsplib.DialPublish( - "rtsp://"+ca.user+":"+ca.pass+"@"+ownDockerIP+":8554/test/stream", + "rtsp://"+ca.user+":"+ca.pass+"@localhost:8554/test/stream", gortsplib.Tracks{track}, ) require.Equal(t, "wrong status code: 401 (Unauthorized)", err.Error()) @@ -359,7 +397,7 @@ func TestClientRTSPAuthFail(t *testing.T) { defer p.close() _, err := gortsplib.DialRead( - "rtsp://" + ca.user + ":" + ca.pass + "@" + ownDockerIP + ":8554/test/stream", + "rtsp://" + ca.user + ":" + ca.pass + "@localhost:8554/test/stream", ) require.Equal(t, "wrong status code: 401 (Unauthorized)", err.Error()) }) @@ -370,7 +408,7 @@ func TestClientRTSPAuthFail(t *testing.T) { "hlsDisable: yes\n" + "paths:\n" + " all:\n" + - " publishIps: [127.0.0.1/32]\n") + " publishIps: [128.0.0.1/32]\n") require.Equal(t, true, ok) defer p.close() @@ -378,7 +416,7 @@ func TestClientRTSPAuthFail(t *testing.T) { require.NoError(t, err) _, err = gortsplib.DialPublish( - "rtsp://"+ownDockerIP+":8554/test/stream", + "rtsp://localhost:8554/test/stream", gortsplib.Tracks{track}, ) require.Equal(t, "wrong status code: 401 (Unauthorized)", err.Error()) @@ -403,14 +441,14 @@ func TestClientRTSPAutomaticProtocol(t *testing.T) { "-i", "emptyvideo.mkv", "-c", "copy", "-f", "rtsp", - "rtsp://" + ownDockerIP + ":8554/teststream", + "rtsp://localhost:8554/teststream", }) require.NoError(t, err) defer cnt1.close() } cnt2, err := newContainer("ffmpeg", "dest", []string{ - "-i", "rtsp://" + ownDockerIP + ":8554/teststream", + "-i", "rtsp://localhost:8554/teststream", "-vframes", "1", "-f", "image2", "-y", "/dev/null", @@ -442,12 +480,12 @@ func TestClientRTSPPublisherOverride(t *testing.T) { track, err := gortsplib.NewTrackH264(68, []byte{0x01, 0x02, 0x03, 0x04}, []byte{0x01, 0x02, 0x03, 0x04}) require.NoError(t, err) - s1, err := gortsplib.DialPublish("rtsp://"+ownDockerIP+":8554/teststream", + s1, err := gortsplib.DialPublish("rtsp://localhost:8554/teststream", gortsplib.Tracks{track}) require.NoError(t, err) defer s1.Close() - s2, err := gortsplib.DialPublish("rtsp://"+ownDockerIP+":8554/teststream", + s2, err := gortsplib.DialPublish("rtsp://localhost:8554/teststream", gortsplib.Tracks{track}) if ca == "enabled" { require.NoError(t, err) @@ -456,7 +494,7 @@ func TestClientRTSPPublisherOverride(t *testing.T) { require.Error(t, err) } - d1, err := gortsplib.DialRead("rtsp://" + ownDockerIP + ":8554/teststream") + d1, err := gortsplib.DialRead("rtsp://localhost:8554/teststream") require.NoError(t, err) defer d1.Close() @@ -465,12 +503,14 @@ func TestClientRTSPPublisherOverride(t *testing.T) { go func() { defer close(readDone) d1.ReadFrames(func(trackID int, streamType base.StreamType, payload []byte) { - if ca == "enabled" { - require.Equal(t, []byte{0x05, 0x06, 0x07, 0x08}, payload) - } else { - require.Equal(t, []byte{0x01, 0x02, 0x03, 0x04}, payload) + if streamType == gortsplib.StreamTypeRTP { + if ca == "enabled" { + require.Equal(t, []byte{0x05, 0x06, 0x07, 0x08}, payload) + } else { + require.Equal(t, []byte{0x01, 0x02, 0x03, 0x04}, payload) + } + close(frameRecv) } - close(frameRecv) }) }() @@ -508,19 +548,19 @@ func TestClientRTSPNonCompliantFrameSize(t *testing.T) { require.NoError(t, err) client := &gortsplib.Client{ - StreamProtocol: func() *gortsplib.StreamProtocol { - v := gortsplib.StreamProtocolTCP + StreamProtocol: func() *base.StreamProtocol { + v := base.StreamProtocolTCP return &v }(), ReadBufferSize: 4500, } - source, err := client.DialPublish("rtsp://"+ownDockerIP+":8554/teststream", + source, err := client.DialPublish("rtsp://localhost:8554/teststream", gortsplib.Tracks{track}) require.NoError(t, err) defer source.Close() - dest, err := client.DialRead("rtsp://" + ownDockerIP + ":8554/teststream") + dest, err := client.DialRead("rtsp://localhost:8554/teststream") require.NoError(t, err) defer dest.Close() @@ -558,14 +598,14 @@ func TestClientRTSPNonCompliantFrameSize(t *testing.T) { require.NoError(t, err) client := &gortsplib.Client{ - StreamProtocol: func() *gortsplib.StreamProtocol { - v := gortsplib.StreamProtocolTCP + StreamProtocol: func() *base.StreamProtocol { + v := base.StreamProtocolTCP return &v }(), ReadBufferSize: 4500, } - source, err := client.DialPublish("rtsp://"+ownDockerIP+":8554/teststream", + source, err := client.DialPublish("rtsp://localhost:8554/teststream", gortsplib.Tracks{track}) require.NoError(t, err) defer source.Close() @@ -577,14 +617,14 @@ func TestClientRTSPNonCompliantFrameSize(t *testing.T) { "rtspAddress: :8555\n" + "paths:\n" + " teststream:\n" + - " source: rtsp://" + ownDockerIP + ":8554/teststream\n" + + " source: rtsp://localhost:8554/teststream\n" + " sourceProtocol: tcp\n") require.Equal(t, true, ok) defer p2.close() time.Sleep(100 * time.Millisecond) - dest, err := client.DialRead("rtsp://" + ownDockerIP + ":8555/teststream") + dest, err := client.DialRead("rtsp://localhost:8555/teststream") require.NoError(t, err) defer dest.Close() @@ -611,176 +651,13 @@ func TestClientRTSPNonCompliantFrameSize(t *testing.T) { }) } -func TestClientRTSPAdditionalInfos(t *testing.T) { - getInfos := func() (*headers.RTPInfo, []*uint32, error) { - u, err := base.ParseURL("rtsp://" + ownDockerIP + ":8554/teststream") - if err != nil { - return nil, nil, err - } - - conn, err := gortsplib.Dial(u.Scheme, u.Host) - if err != nil { - return nil, nil, err - } - defer conn.Close() - - tracks, _, err := conn.Describe(u) - if err != nil { - return nil, nil, err - } - - ssrcs := make([]*uint32, len(tracks)) - - for i, t := range tracks { - res, err := conn.Setup(headers.TransportModePlay, t, 0, 0) - if err != nil { - return nil, nil, err - } - - var th headers.Transport - err = th.Read(res.Header["Transport"]) - if err != nil { - return nil, nil, err - } - - ssrcs[i] = th.SSRC - } - - res, err := conn.Play(nil) - if err != nil { - return nil, nil, err - } - - var ri headers.RTPInfo - err = ri.Read(res.Header["RTP-Info"]) - if err != nil { - return nil, nil, err - } - - return &ri, ssrcs, nil - } - - p, ok := testProgram("rtmpDisable: yes\n" + - "hlsDisable: yes\n") - require.Equal(t, true, ok) - defer p.close() - - track1, err := gortsplib.NewTrackH264(96, []byte("123456"), []byte("123456")) - require.NoError(t, err) - - track2, err := gortsplib.NewTrackH264(96, []byte("123456"), []byte("123456")) - require.NoError(t, err) - - source, err := gortsplib.DialPublish("rtsp://"+ownDockerIP+":8554/teststream", - gortsplib.Tracks{track1, track2}) - require.NoError(t, err) - defer source.Close() - - pkt := rtp.Packet{ - Header: rtp.Header{ - Version: 0x80, - PayloadType: 96, - SequenceNumber: 556, - Timestamp: 984512368, - SSRC: 96342362, - }, - Payload: []byte{0x01, 0x02, 0x03, 0x04}, - } - - buf, err := pkt.Marshal() - require.NoError(t, err) - - err = source.WriteFrame(track1.ID, gortsplib.StreamTypeRTP, buf) - require.NoError(t, err) - - rtpInfo, ssrcs, err := getInfos() - require.NoError(t, err) - require.Equal(t, &headers.RTPInfo{ - &headers.RTPInfoEntry{ - URL: (&base.URL{ - Scheme: "rtsp", - Host: ownDockerIP + ":8554", - Path: "/teststream/trackID=0", - }).String(), - SequenceNumber: func() *uint16 { - v := uint16(556) - return &v - }(), - Timestamp: (*rtpInfo)[0].Timestamp, - }, - }, rtpInfo) - require.Equal(t, []*uint32{ - func() *uint32 { - v := uint32(96342362) - return &v - }(), - nil, - }, ssrcs) - - pkt = rtp.Packet{ - Header: rtp.Header{ - Version: 0x80, - PayloadType: 96, - SequenceNumber: 87, - Timestamp: 756436454, - SSRC: 536474323, - }, - Payload: []byte{0x01, 0x02, 0x03, 0x04}, - } - - buf, err = pkt.Marshal() - require.NoError(t, err) - - err = source.WriteFrame(track2.ID, gortsplib.StreamTypeRTP, buf) - require.NoError(t, err) - - rtpInfo, ssrcs, err = getInfos() - require.NoError(t, err) - require.Equal(t, &headers.RTPInfo{ - &headers.RTPInfoEntry{ - URL: (&base.URL{ - Scheme: "rtsp", - Host: ownDockerIP + ":8554", - Path: "/teststream/trackID=0", - }).String(), - SequenceNumber: func() *uint16 { - v := uint16(556) - return &v - }(), - Timestamp: (*rtpInfo)[0].Timestamp, - }, - &headers.RTPInfoEntry{ - URL: (&base.URL{ - Scheme: "rtsp", - Host: ownDockerIP + ":8554", - Path: "/teststream/trackID=1", - }).String(), - SequenceNumber: func() *uint16 { - v := uint16(87) - return &v - }(), - Timestamp: (*rtpInfo)[1].Timestamp, - }, - }, rtpInfo) - require.Equal(t, []*uint32{ - func() *uint32 { - v := uint32(96342362) - return &v - }(), - func() *uint32 { - v := uint32(536474323) - return &v - }(), - }, ssrcs) -} - func TestClientRTSPRedirect(t *testing.T) { p1, ok := testProgram("rtmpDisable: yes\n" + "hlsDisable: yes\n" + "paths:\n" + " path1:\n" + " source: redirect\n" + - " sourceRedirect: rtsp://" + ownDockerIP + ":8554/path2\n" + + " sourceRedirect: rtsp://localhost:8554/path2\n" + " path2:\n") require.Equal(t, true, ok) defer p1.close() @@ -792,7 +669,7 @@ func TestClientRTSPRedirect(t *testing.T) { "-c", "copy", "-f", "rtsp", "-rtsp_transport", "udp", - "rtsp://" + ownDockerIP + ":8554/path2", + "rtsp://localhost:8554/path2", }) require.NoError(t, err) defer cnt1.close() @@ -801,7 +678,7 @@ func TestClientRTSPRedirect(t *testing.T) { cnt2, err := newContainer("ffmpeg", "dest", []string{ "-rtsp_transport", "udp", - "-i", "rtsp://" + ownDockerIP + ":8554/path1", + "-i", "rtsp://localhost:8554/path1", "-vframes", "1", "-f", "image2", "-y", "/dev/null", @@ -819,7 +696,7 @@ func TestClientRTSPFallback(t *testing.T) { t.Run(ca, func(t *testing.T) { val := func() string { if ca == "absolute" { - return "rtsp://" + ownDockerIP + ":8554/path2" + return "rtsp://localhost:8554/path2" } return "/path2" }() @@ -840,7 +717,7 @@ func TestClientRTSPFallback(t *testing.T) { "-c", "copy", "-f", "rtsp", "-rtsp_transport", "udp", - "rtsp://" + ownDockerIP + ":8554/path2", + "rtsp://localhost:8554/path2", }) require.NoError(t, err) defer cnt1.close() @@ -849,7 +726,7 @@ func TestClientRTSPFallback(t *testing.T) { cnt2, err := newContainer("ffmpeg", "dest", []string{ "-rtsp_transport", "udp", - "-i", "rtsp://" + ownDockerIP + ":8554/path1", + "-i", "rtsp://localhost:8554/path1", "-vframes", "1", "-f", "image2", "-y", "/dev/null", @@ -954,7 +831,7 @@ wait Header: base.Header{ "CSeq": base.HeaderValue{"2"}, "Transport": headers.Transport{ - Protocol: gortsplib.StreamProtocolTCP, + Protocol: base.StreamProtocolTCP, Delivery: func() *base.StreamDelivery { v := base.StreamDeliveryUnicast return &v @@ -1007,7 +884,7 @@ wait Header: base.Header{ "CSeq": base.HeaderValue{"1"}, "Transport": headers.Transport{ - Protocol: gortsplib.StreamProtocolTCP, + Protocol: base.StreamProtocolTCP, Delivery: func() *base.StreamDelivery { v := base.StreamDeliveryUnicast return &v diff --git a/main_rtspsource_test.go b/main_rtspsource_test.go index ef18e11f..86512b47 100644 --- a/main_rtspsource_test.go +++ b/main_rtspsource_test.go @@ -16,10 +16,12 @@ type testServer struct { user string pass string authValidator *auth.Validator - done chan struct{} + stream *gortsplib.ServerStream + + done chan struct{} } -func (sh *testServer) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, []byte, error) { +func (sh *testServer) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, *gortsplib.ServerStream, error) { if sh.authValidator == nil { sh.authValidator = auth.NewValidator(sh.user, sh.pass, nil) } @@ -35,26 +37,27 @@ func (sh *testServer) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*ba } track, _ := gortsplib.NewTrackH264(96, []byte{0x01, 0x02, 0x03, 0x04}, []byte{0x05, 0x06}) + sh.stream = gortsplib.NewServerStream(gortsplib.Tracks{track}) return &base.Response{ StatusCode: base.StatusOK, - }, gortsplib.Tracks{track}.Write(), nil + }, sh.stream, nil } -func (sh *testServer) OnSetup(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *uint32, error) { +func (sh *testServer) OnSetup(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *gortsplib.ServerStream, error) { if sh.done != nil { close(sh.done) } return &base.Response{ StatusCode: base.StatusOK, - }, nil, nil + }, sh.stream, nil } func (sh *testServer) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, error) { go func() { time.Sleep(1 * time.Second) - ctx.Session.WriteFrame(0, gortsplib.StreamTypeRTP, []byte{0x01, 0x02, 0x03, 0x04}) + sh.stream.WriteFrame(0, gortsplib.StreamTypeRTP, []byte{0x01, 0x02, 0x03, 0x04}) }() return &base.Response{ @@ -125,8 +128,10 @@ func TestSourceRTSP(t *testing.T) { go func() { defer close(readDone) conn.ReadFrames(func(trackID int, streamType gortsplib.StreamType, payload []byte) { - require.Equal(t, []byte{0x01, 0x02, 0x03, 0x04}, payload) - close(received) + if streamType == gortsplib.StreamTypeRTP { + require.Equal(t, []byte{0x01, 0x02, 0x03, 0x04}, payload) + close(received) + } }) }() diff --git a/main_test.go b/main_test.go index eeda88c3..3e48c6bc 100644 --- a/main_test.go +++ b/main_test.go @@ -2,7 +2,6 @@ package main import ( "io/ioutil" - "net" "os" "os/exec" "path/filepath" @@ -13,41 +12,6 @@ import ( "github.com/stretchr/testify/require" ) -var ownDockerIP = func() string { - out, err := exec.Command("docker", "network", "inspect", "bridge", - "-f", "{{range .IPAM.Config}}{{.Subnet}}{{end}}").Output() - if err != nil { - panic(err) - } - - _, ipnet, err := net.ParseCIDR(string(out[:len(out)-1])) - if err != nil { - panic(err) - } - - ifaces, err := net.Interfaces() - if err != nil { - panic(err) - } - - for _, i := range ifaces { - addrs, err := i.Addrs() - if err != nil { - continue - } - - for _, addr := range addrs { - if v, ok := addr.(*net.IPNet); ok { - if ipnet.Contains(v.IP) { - return v.IP.String() - } - } - } - } - - panic("IP not found") -}() - type container struct { name string } @@ -60,8 +24,10 @@ func newContainer(image string, name string, args []string) (*container, error) exec.Command("docker", "kill", "rtsp-simple-server-test-"+name).Run() exec.Command("docker", "wait", "rtsp-simple-server-test-"+name).Run() + // --network=host is needed to test multicast cmd := []string{ "docker", "run", + "--network=host", "--name=rtsp-simple-server-test-" + name, "rtsp-simple-server-test-" + image, } @@ -203,7 +169,7 @@ func TestHotReloading(t *testing.T) { func() { cnt1, err := newContainer("ffmpeg", "dest", []string{ - "-i", "rtsp://" + ownDockerIP + ":8554/test1", + "-i", "rtsp://localhost:8554/test1", "-vframes", "1", "-f", "image2", "-y", "/dev/null", @@ -226,7 +192,7 @@ func TestHotReloading(t *testing.T) { func() { cnt1, err := newContainer("ffmpeg", "dest", []string{ - "-i", "rtsp://" + ownDockerIP + ":8554/test1", + "-i", "rtsp://localhost:8554/test1", "-vframes", "1", "-f", "image2", "-y", "/dev/null", @@ -238,7 +204,7 @@ func TestHotReloading(t *testing.T) { func() { cnt1, err := newContainer("ffmpeg", "dest", []string{ - "-i", "rtsp://" + ownDockerIP + ":8554/test2", + "-i", "rtsp://localhost:8554/test2", "-vframes", "1", "-f", "image2", "-y", "/dev/null", diff --git a/rtsp-simple-server.yml b/rtsp-simple-server.yml index bed3e5c8..af480f0a 100644 --- a/rtsp-simple-server.yml +++ b/rtsp-simple-server.yml @@ -43,9 +43,10 @@ rtspDisable: no # supported RTSP stream protocols. # UDP is the most performant, but can cause problems if there's a NAT between # server and clients, and doesn't support encryption. +# UDP-multicast allows to save bandwidth when clients are all in the same LAN. # TCP is the most versatile, and does support encryption. # The handshake is always performed with TCP. -protocols: [udp, tcp] +protocols: [udp, multicast, tcp] # encrypt handshake and TCP streams with TLS (RTSPS). # available values are "no", "strict", "optional". encryption: no