update gortsplib

This commit is contained in:
aler9 2021-03-14 17:27:04 +01:00
parent c1862b3228
commit 6e64b4be22
7 changed files with 55 additions and 97 deletions

2
go.mod
View File

@ -5,7 +5,7 @@ go 1.15
require (
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect
github.com/aler9/gortsplib v0.0.0-20210313202643-32c10cfb66cd
github.com/aler9/gortsplib v0.0.0-20210314154849-d902b7da9320
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fsnotify/fsnotify v1.4.9
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51

4
go.sum
View File

@ -2,8 +2,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/aler9/gortsplib v0.0.0-20210313202643-32c10cfb66cd h1:+9AYNCIlkuZF3d3OOqDRC9D+bLdyrDPiVi8q+gmq8mQ=
github.com/aler9/gortsplib v0.0.0-20210313202643-32c10cfb66cd/go.mod h1:aj4kDzanb3JZ46sFywWShcsnqqXTLE/3PNjwDhQZGM0=
github.com/aler9/gortsplib v0.0.0-20210314154849-d902b7da9320 h1:/WOt00YtNY2eWQy+MZ+LkkP7+XMviepr4yLCrK5PzhE=
github.com/aler9/gortsplib v0.0.0-20210314154849-d902b7da9320/go.mod h1:aj4kDzanb3JZ46sFywWShcsnqqXTLE/3PNjwDhQZGM0=
github.com/aler9/rtmp v0.0.0-20210309202041-2d7177b7300d h1:LAX8pNvYpGgFpKdbPpEZWjNkHbmyvjMrT3vO7s7aaKU=
github.com/aler9/rtmp v0.0.0-20210309202041-2d7177b7300d/go.mod h1:vzuE21rowz+lT1NGsWbreIvYulgBpCGnQyeTyFblUHc=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=

View File

@ -254,7 +254,7 @@ func (c *Client) run() {
}, nil
}
onSetup := func(req *base.Request, th *headers.Transport, trackID int) (*base.Response, error) {
onSetup := func(req *base.Request, th *headers.Transport, reqPath string, trackID int) (*base.Response, error) {
if th.Protocol == gortsplib.StreamProtocolUDP {
if _, ok := c.protocols[gortsplib.StreamProtocolUDP]; !ok {
return &base.Response{
@ -271,26 +271,6 @@ func (c *Client) run() {
switch c.conn.State() {
case gortsplib.ServerConnStateInitial, gortsplib.ServerConnStatePrePlay: // play
pathAndQuery, ok := req.URL.RTSPPathAndQuery()
if !ok {
return &base.Response{
StatusCode: base.StatusBadRequest,
}, fmt.Errorf("invalid path (%s)", req.URL)
}
_, pathAndQuery, ok = base.PathSplitControlAttribute(pathAndQuery)
if !ok {
return &base.Response{
StatusCode: base.StatusBadRequest,
}, fmt.Errorf("invalid path (%s)", req.URL)
}
reqPath, _ := base.PathSplitQuery(pathAndQuery)
// path can end with a slash, remove it
// this is needed to support reading mpegts with ffmpeg
reqPath = strings.TrimSuffix(reqPath, "/")
if c.path != nil && reqPath != c.path.Name() {
return &base.Response{
StatusCode: base.StatusBadRequest,
@ -333,21 +313,6 @@ func (c *Client) run() {
StatusCode: base.StatusBadRequest,
}, fmt.Errorf("track %d does not exist", trackID)
}
default: // record
reqPathAndQuery, ok := req.URL.RTSPPathAndQuery()
if !ok {
return &base.Response{
StatusCode: base.StatusBadRequest,
}, fmt.Errorf("invalid path (%s)", req.URL)
}
if !strings.HasPrefix(reqPathAndQuery, c.path.Name()) {
return &base.Response{
StatusCode: base.StatusBadRequest,
}, fmt.Errorf("invalid path: must begin with '%s', but is '%s'",
c.path.Name(), reqPathAndQuery)
}
}
return &base.Response{

View File

@ -62,11 +62,8 @@ func (e *Cmd) run() {
return false
}
t := time.NewTimer(retryPause)
defer t.Stop()
select {
case <-t.C:
case <-time.After(retryPause):
return true
case <-e.terminate:
return false

View File

@ -38,10 +38,10 @@ type source interface {
IsSource()
}
// sourceExternal is implemented by all source*.
type sourceExternal interface {
// extSource is implemented by all external sources.
type extSource interface {
IsSource()
IsSourceExternal()
IsExtSource()
Close()
}
@ -49,6 +49,10 @@ type sourceRedirect struct{}
func (*sourceRedirect) IsSource() {}
type extSourceSetReadyReq struct {
tracks gortsplib.Tracks
}
type clientState int
const (
@ -100,16 +104,16 @@ type Path struct {
closeTimerStarted bool
// in
sourceSetReady chan struct{} // from source
sourceSetNotReady chan struct{} // from source
clientDescribe chan client.DescribeReq
clientSetupPlay chan client.SetupPlayReq
clientAnnounce chan client.AnnounceReq
clientPlay chan client.PlayReq
clientRecord chan client.RecordReq
clientPause chan client.PauseReq
clientRemove chan client.RemoveReq
terminate chan struct{}
extSourceSetReady chan extSourceSetReadyReq // from external source
extSourceSetNotReady chan struct{} // from external source
clientDescribe chan client.DescribeReq
clientSetupPlay chan client.SetupPlayReq
clientAnnounce chan client.AnnounceReq
clientPlay chan client.PlayReq
clientRecord chan client.RecordReq
clientPause chan client.PauseReq
clientRemove chan client.RemoveReq
terminate chan struct{}
}
// New allocates a Path.
@ -144,8 +148,8 @@ func New(
sourceCloseTimer: newEmptyTimer(),
runOnDemandCloseTimer: newEmptyTimer(),
closeTimer: newEmptyTimer(),
sourceSetReady: make(chan struct{}),
sourceSetNotReady: make(chan struct{}),
extSourceSetReady: make(chan extSourceSetReadyReq),
extSourceSetNotReady: make(chan struct{}),
clientDescribe: make(chan client.DescribeReq),
clientSetupPlay: make(chan client.SetupPlayReq),
clientAnnounce: make(chan client.AnnounceReq),
@ -213,7 +217,7 @@ outer:
case <-pa.sourceCloseTimer.C:
pa.sourceCloseTimerStarted = false
pa.source.(sourceExternal).Close()
pa.source.(extSource).Close()
pa.source = nil
pa.scheduleClose()
@ -232,10 +236,11 @@ outer:
<-pa.terminate
break outer
case <-pa.sourceSetReady:
case req := <-pa.extSourceSetReady:
pa.sourceTracks = req.tracks
pa.onSourceSetReady()
case <-pa.sourceSetNotReady:
case <-pa.extSourceSetNotReady:
pa.onSourceSetNotReady()
case req := <-pa.clientDescribe:
@ -290,7 +295,7 @@ outer:
onInitCmd.Close()
}
if source, ok := pa.source.(sourceExternal); ok {
if source, ok := pa.source.(extSource); ok {
source.Close()
}
pa.sourceWg.Wait()
@ -323,8 +328,8 @@ outer:
}
pa.clientsWg.Wait()
close(pa.sourceSetReady)
close(pa.sourceSetNotReady)
close(pa.extSourceSetReady)
close(pa.extSourceSetNotReady)
close(pa.clientDescribe)
close(pa.clientSetupPlay)
close(pa.clientAnnounce)
@ -338,12 +343,12 @@ func (pa *Path) exhaustChannels() {
go func() {
for {
select {
case _, ok := <-pa.sourceSetReady:
case _, ok := <-pa.extSourceSetReady:
if !ok {
return
}
case _, ok := <-pa.sourceSetNotReady:
case _, ok := <-pa.extSourceSetNotReady:
if !ok {
return
}
@ -487,13 +492,13 @@ func (pa *Path) removeClient(c client.Client) {
}
func (pa *Path) onSourceSetReady() {
pa.sourceState = sourceStateReady
if pa.sourceState == sourceStateWaitingDescribe {
pa.describeTimer.Stop()
pa.describeTimer = newEmptyTimer()
}
pa.sourceState = sourceStateReady
for _, req := range pa.describeRequests {
req.Res <- client.DescribeRes{pa.sourceTracks.Write(), "", nil} //nolint:govet
}
@ -792,15 +797,14 @@ func (pa *Path) Name() string {
return pa.name
}
// OnSourceSetReady is called by a source.
func (pa *Path) OnSourceSetReady(tracks gortsplib.Tracks) {
pa.sourceTracks = tracks
pa.sourceSetReady <- struct{}{}
// OnExtSourceSetReady is called by a external source.
func (pa *Path) OnExtSourceSetReady(tracks gortsplib.Tracks) {
pa.extSourceSetReady <- extSourceSetReadyReq{tracks}
}
// OnSourceSetNotReady is called by a source.
func (pa *Path) OnSourceSetNotReady() {
pa.sourceSetNotReady <- struct{}{}
// OnExtSourceSetNotReady is called by a external source.
func (pa *Path) OnExtSourceSetNotReady() {
pa.extSourceSetNotReady <- struct{}{}
}
// OnPathManDescribe is called by pathman.PathMan.

View File

@ -26,8 +26,8 @@ const (
// Parent is implemeneted by path.Path.
type Parent interface {
Log(logger.Level, string, ...interface{})
OnSourceSetReady(gortsplib.Tracks)
OnSourceSetNotReady()
OnExtSourceSetReady(gortsplib.Tracks)
OnExtSourceSetNotReady()
OnFrame(int, gortsplib.StreamType, []byte)
}
@ -76,8 +76,8 @@ func (s *Source) Close() {
// IsSource implements path.source.
func (s *Source) IsSource() {}
// IsSourceExternal implements path.sourceExternal.
func (s *Source) IsSourceExternal() {}
// IsExtSource implements path.extSource.
func (s *Source) IsExtSource() {}
func (s *Source) log(level logger.Level, format string, args ...interface{}) {
s.parent.Log(level, "[rtmp source] "+format, args...)
@ -93,11 +93,8 @@ func (s *Source) run() {
return false
}
t := time.NewTimer(retryPause)
defer t.Stop()
select {
case <-t.C:
case <-time.After(retryPause):
return true
case <-s.terminate:
return false
@ -176,8 +173,8 @@ func (s *Source) runInner() bool {
}
s.log(logger.Info, "ready")
s.parent.OnSourceSetReady(tracks)
defer s.parent.OnSourceSetNotReady()
s.parent.OnExtSourceSetReady(tracks)
defer s.parent.OnExtSourceSetNotReady()
readerDone := make(chan error)
go func() {

View File

@ -19,8 +19,8 @@ const (
// Parent is implemented by path.Path.
type Parent interface {
Log(logger.Level, string, ...interface{})
OnSourceSetReady(gortsplib.Tracks)
OnSourceSetNotReady()
OnExtSourceSetReady(gortsplib.Tracks)
OnExtSourceSetNotReady()
OnFrame(int, gortsplib.StreamType, []byte)
}
@ -81,8 +81,8 @@ func (s *Source) Close() {
// IsSource implements path.source.
func (s *Source) IsSource() {}
// IsSourceExternal implements path.sourceExternal.
func (s *Source) IsSourceExternal() {}
// IsExtSource implements path.extSource.
func (s *Source) IsExtSource() {}
func (s *Source) log(level logger.Level, format string, args ...interface{}) {
s.parent.Log(level, "[rtsp source] "+format, args...)
@ -98,11 +98,8 @@ func (s *Source) run() {
return false
}
t := time.NewTimer(retryPause)
defer t.Stop()
select {
case <-t.C:
case <-time.After(retryPause):
return true
case <-s.terminate:
return false
@ -150,11 +147,9 @@ func (s *Source) runInner() bool {
return true
}
tracks := conn.Tracks()
s.log(logger.Info, "ready")
s.parent.OnSourceSetReady(tracks)
defer s.parent.OnSourceSetNotReady()
s.parent.OnExtSourceSetReady(conn.Tracks())
defer s.parent.OnExtSourceSetNotReady()
done := conn.ReadFrames(func(trackID int, streamType gortsplib.StreamType, payload []byte) {
s.parent.OnFrame(trackID, streamType, payload)