From 6e64b4be222ddb91d24cb934259018fff8bff5f1 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Sun, 14 Mar 2021 17:27:04 +0100 Subject: [PATCH] update gortsplib --- go.mod | 2 +- go.sum | 4 +-- internal/clientrtsp/client.go | 37 +------------------ internal/externalcmd/cmd.go | 5 +-- internal/path/path.go | 68 ++++++++++++++++++----------------- internal/sourcertmp/source.go | 17 ++++----- internal/sourcertsp/source.go | 19 ++++------ 7 files changed, 55 insertions(+), 97 deletions(-) diff --git a/go.mod b/go.mod index 23283e56..46ff8c1c 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.15 require ( github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect - github.com/aler9/gortsplib v0.0.0-20210313202643-32c10cfb66cd + github.com/aler9/gortsplib v0.0.0-20210314154849-d902b7da9320 github.com/davecgh/go-spew v1.1.1 // indirect github.com/fsnotify/fsnotify v1.4.9 github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 diff --git a/go.sum b/go.sum index 63e86835..c48e94d1 100644 --- a/go.sum +++ b/go.sum @@ -2,8 +2,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= -github.com/aler9/gortsplib v0.0.0-20210313202643-32c10cfb66cd h1:+9AYNCIlkuZF3d3OOqDRC9D+bLdyrDPiVi8q+gmq8mQ= -github.com/aler9/gortsplib v0.0.0-20210313202643-32c10cfb66cd/go.mod h1:aj4kDzanb3JZ46sFywWShcsnqqXTLE/3PNjwDhQZGM0= +github.com/aler9/gortsplib v0.0.0-20210314154849-d902b7da9320 h1:/WOt00YtNY2eWQy+MZ+LkkP7+XMviepr4yLCrK5PzhE= +github.com/aler9/gortsplib v0.0.0-20210314154849-d902b7da9320/go.mod h1:aj4kDzanb3JZ46sFywWShcsnqqXTLE/3PNjwDhQZGM0= github.com/aler9/rtmp v0.0.0-20210309202041-2d7177b7300d h1:LAX8pNvYpGgFpKdbPpEZWjNkHbmyvjMrT3vO7s7aaKU= github.com/aler9/rtmp v0.0.0-20210309202041-2d7177b7300d/go.mod h1:vzuE21rowz+lT1NGsWbreIvYulgBpCGnQyeTyFblUHc= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= diff --git a/internal/clientrtsp/client.go b/internal/clientrtsp/client.go index cbf71d45..ffeacbf1 100644 --- a/internal/clientrtsp/client.go +++ b/internal/clientrtsp/client.go @@ -254,7 +254,7 @@ func (c *Client) run() { }, nil } - onSetup := func(req *base.Request, th *headers.Transport, trackID int) (*base.Response, error) { + onSetup := func(req *base.Request, th *headers.Transport, reqPath string, trackID int) (*base.Response, error) { if th.Protocol == gortsplib.StreamProtocolUDP { if _, ok := c.protocols[gortsplib.StreamProtocolUDP]; !ok { return &base.Response{ @@ -271,26 +271,6 @@ func (c *Client) run() { switch c.conn.State() { case gortsplib.ServerConnStateInitial, gortsplib.ServerConnStatePrePlay: // play - pathAndQuery, ok := req.URL.RTSPPathAndQuery() - if !ok { - return &base.Response{ - StatusCode: base.StatusBadRequest, - }, fmt.Errorf("invalid path (%s)", req.URL) - } - - _, pathAndQuery, ok = base.PathSplitControlAttribute(pathAndQuery) - if !ok { - return &base.Response{ - StatusCode: base.StatusBadRequest, - }, fmt.Errorf("invalid path (%s)", req.URL) - } - - reqPath, _ := base.PathSplitQuery(pathAndQuery) - - // path can end with a slash, remove it - // this is needed to support reading mpegts with ffmpeg - reqPath = strings.TrimSuffix(reqPath, "/") - if c.path != nil && reqPath != c.path.Name() { return &base.Response{ StatusCode: base.StatusBadRequest, @@ -333,21 +313,6 @@ func (c *Client) run() { StatusCode: base.StatusBadRequest, }, fmt.Errorf("track %d does not exist", trackID) } - - default: // record - reqPathAndQuery, ok := req.URL.RTSPPathAndQuery() - if !ok { - return &base.Response{ - StatusCode: base.StatusBadRequest, - }, fmt.Errorf("invalid path (%s)", req.URL) - } - - if !strings.HasPrefix(reqPathAndQuery, c.path.Name()) { - return &base.Response{ - StatusCode: base.StatusBadRequest, - }, fmt.Errorf("invalid path: must begin with '%s', but is '%s'", - c.path.Name(), reqPathAndQuery) - } } return &base.Response{ diff --git a/internal/externalcmd/cmd.go b/internal/externalcmd/cmd.go index c0ae38af..ccc503ff 100644 --- a/internal/externalcmd/cmd.go +++ b/internal/externalcmd/cmd.go @@ -62,11 +62,8 @@ func (e *Cmd) run() { return false } - t := time.NewTimer(retryPause) - defer t.Stop() - select { - case <-t.C: + case <-time.After(retryPause): return true case <-e.terminate: return false diff --git a/internal/path/path.go b/internal/path/path.go index 82c50ad0..51315e58 100644 --- a/internal/path/path.go +++ b/internal/path/path.go @@ -38,10 +38,10 @@ type source interface { IsSource() } -// sourceExternal is implemented by all source*. -type sourceExternal interface { +// extSource is implemented by all external sources. +type extSource interface { IsSource() - IsSourceExternal() + IsExtSource() Close() } @@ -49,6 +49,10 @@ type sourceRedirect struct{} func (*sourceRedirect) IsSource() {} +type extSourceSetReadyReq struct { + tracks gortsplib.Tracks +} + type clientState int const ( @@ -100,16 +104,16 @@ type Path struct { closeTimerStarted bool // in - sourceSetReady chan struct{} // from source - sourceSetNotReady chan struct{} // from source - clientDescribe chan client.DescribeReq - clientSetupPlay chan client.SetupPlayReq - clientAnnounce chan client.AnnounceReq - clientPlay chan client.PlayReq - clientRecord chan client.RecordReq - clientPause chan client.PauseReq - clientRemove chan client.RemoveReq - terminate chan struct{} + extSourceSetReady chan extSourceSetReadyReq // from external source + extSourceSetNotReady chan struct{} // from external source + clientDescribe chan client.DescribeReq + clientSetupPlay chan client.SetupPlayReq + clientAnnounce chan client.AnnounceReq + clientPlay chan client.PlayReq + clientRecord chan client.RecordReq + clientPause chan client.PauseReq + clientRemove chan client.RemoveReq + terminate chan struct{} } // New allocates a Path. @@ -144,8 +148,8 @@ func New( sourceCloseTimer: newEmptyTimer(), runOnDemandCloseTimer: newEmptyTimer(), closeTimer: newEmptyTimer(), - sourceSetReady: make(chan struct{}), - sourceSetNotReady: make(chan struct{}), + extSourceSetReady: make(chan extSourceSetReadyReq), + extSourceSetNotReady: make(chan struct{}), clientDescribe: make(chan client.DescribeReq), clientSetupPlay: make(chan client.SetupPlayReq), clientAnnounce: make(chan client.AnnounceReq), @@ -213,7 +217,7 @@ outer: case <-pa.sourceCloseTimer.C: pa.sourceCloseTimerStarted = false - pa.source.(sourceExternal).Close() + pa.source.(extSource).Close() pa.source = nil pa.scheduleClose() @@ -232,10 +236,11 @@ outer: <-pa.terminate break outer - case <-pa.sourceSetReady: + case req := <-pa.extSourceSetReady: + pa.sourceTracks = req.tracks pa.onSourceSetReady() - case <-pa.sourceSetNotReady: + case <-pa.extSourceSetNotReady: pa.onSourceSetNotReady() case req := <-pa.clientDescribe: @@ -290,7 +295,7 @@ outer: onInitCmd.Close() } - if source, ok := pa.source.(sourceExternal); ok { + if source, ok := pa.source.(extSource); ok { source.Close() } pa.sourceWg.Wait() @@ -323,8 +328,8 @@ outer: } pa.clientsWg.Wait() - close(pa.sourceSetReady) - close(pa.sourceSetNotReady) + close(pa.extSourceSetReady) + close(pa.extSourceSetNotReady) close(pa.clientDescribe) close(pa.clientSetupPlay) close(pa.clientAnnounce) @@ -338,12 +343,12 @@ func (pa *Path) exhaustChannels() { go func() { for { select { - case _, ok := <-pa.sourceSetReady: + case _, ok := <-pa.extSourceSetReady: if !ok { return } - case _, ok := <-pa.sourceSetNotReady: + case _, ok := <-pa.extSourceSetNotReady: if !ok { return } @@ -487,13 +492,13 @@ func (pa *Path) removeClient(c client.Client) { } func (pa *Path) onSourceSetReady() { + pa.sourceState = sourceStateReady + if pa.sourceState == sourceStateWaitingDescribe { pa.describeTimer.Stop() pa.describeTimer = newEmptyTimer() } - pa.sourceState = sourceStateReady - for _, req := range pa.describeRequests { req.Res <- client.DescribeRes{pa.sourceTracks.Write(), "", nil} //nolint:govet } @@ -792,15 +797,14 @@ func (pa *Path) Name() string { return pa.name } -// OnSourceSetReady is called by a source. -func (pa *Path) OnSourceSetReady(tracks gortsplib.Tracks) { - pa.sourceTracks = tracks - pa.sourceSetReady <- struct{}{} +// OnExtSourceSetReady is called by a external source. +func (pa *Path) OnExtSourceSetReady(tracks gortsplib.Tracks) { + pa.extSourceSetReady <- extSourceSetReadyReq{tracks} } -// OnSourceSetNotReady is called by a source. -func (pa *Path) OnSourceSetNotReady() { - pa.sourceSetNotReady <- struct{}{} +// OnExtSourceSetNotReady is called by a external source. +func (pa *Path) OnExtSourceSetNotReady() { + pa.extSourceSetNotReady <- struct{}{} } // OnPathManDescribe is called by pathman.PathMan. diff --git a/internal/sourcertmp/source.go b/internal/sourcertmp/source.go index 47ce02c2..2c8eb735 100644 --- a/internal/sourcertmp/source.go +++ b/internal/sourcertmp/source.go @@ -26,8 +26,8 @@ const ( // Parent is implemeneted by path.Path. type Parent interface { Log(logger.Level, string, ...interface{}) - OnSourceSetReady(gortsplib.Tracks) - OnSourceSetNotReady() + OnExtSourceSetReady(gortsplib.Tracks) + OnExtSourceSetNotReady() OnFrame(int, gortsplib.StreamType, []byte) } @@ -76,8 +76,8 @@ func (s *Source) Close() { // IsSource implements path.source. func (s *Source) IsSource() {} -// IsSourceExternal implements path.sourceExternal. -func (s *Source) IsSourceExternal() {} +// IsExtSource implements path.extSource. +func (s *Source) IsExtSource() {} func (s *Source) log(level logger.Level, format string, args ...interface{}) { s.parent.Log(level, "[rtmp source] "+format, args...) @@ -93,11 +93,8 @@ func (s *Source) run() { return false } - t := time.NewTimer(retryPause) - defer t.Stop() - select { - case <-t.C: + case <-time.After(retryPause): return true case <-s.terminate: return false @@ -176,8 +173,8 @@ func (s *Source) runInner() bool { } s.log(logger.Info, "ready") - s.parent.OnSourceSetReady(tracks) - defer s.parent.OnSourceSetNotReady() + s.parent.OnExtSourceSetReady(tracks) + defer s.parent.OnExtSourceSetNotReady() readerDone := make(chan error) go func() { diff --git a/internal/sourcertsp/source.go b/internal/sourcertsp/source.go index 3fea9c0d..7ce7afd5 100644 --- a/internal/sourcertsp/source.go +++ b/internal/sourcertsp/source.go @@ -19,8 +19,8 @@ const ( // Parent is implemented by path.Path. type Parent interface { Log(logger.Level, string, ...interface{}) - OnSourceSetReady(gortsplib.Tracks) - OnSourceSetNotReady() + OnExtSourceSetReady(gortsplib.Tracks) + OnExtSourceSetNotReady() OnFrame(int, gortsplib.StreamType, []byte) } @@ -81,8 +81,8 @@ func (s *Source) Close() { // IsSource implements path.source. func (s *Source) IsSource() {} -// IsSourceExternal implements path.sourceExternal. -func (s *Source) IsSourceExternal() {} +// IsExtSource implements path.extSource. +func (s *Source) IsExtSource() {} func (s *Source) log(level logger.Level, format string, args ...interface{}) { s.parent.Log(level, "[rtsp source] "+format, args...) @@ -98,11 +98,8 @@ func (s *Source) run() { return false } - t := time.NewTimer(retryPause) - defer t.Stop() - select { - case <-t.C: + case <-time.After(retryPause): return true case <-s.terminate: return false @@ -150,11 +147,9 @@ func (s *Source) runInner() bool { return true } - tracks := conn.Tracks() - s.log(logger.Info, "ready") - s.parent.OnSourceSetReady(tracks) - defer s.parent.OnSourceSetNotReady() + s.parent.OnExtSourceSetReady(conn.Tracks()) + defer s.parent.OnExtSourceSetNotReady() done := conn.ReadFrames(func(trackID int, streamType gortsplib.StreamType, payload []byte) { s.parent.OnFrame(trackID, streamType, payload)