diff --git a/internal/conf/path.go b/internal/conf/path.go index cc8dc480..35282059 100644 --- a/internal/conf/path.go +++ b/internal/conf/path.go @@ -306,6 +306,10 @@ func (pconf *PathConf) fillAndCheck(name string) error { return fmt.Errorf("'runOnPublish' is useless when source is not 'record', since the stream is not provided by a publisher, but by a fixed source") } + if pconf.RunOnDemand != "" && pconf.Source != "record" { + return fmt.Errorf("'runOnDemand' can be used only when source is 'record'") + } + if pconf.RunOnDemandStartTimeout == 0 { pconf.RunOnDemandStartTimeout = 10 * time.Second } diff --git a/internal/core/path.go b/internal/core/path.go index e3e9e91b..efd93216 100644 --- a/internal/core/path.go +++ b/internal/core/path.go @@ -72,12 +72,13 @@ const ( pathReaderStatePlay ) -type pathSourceState int +type pathOnDemandState int const ( - pathSourceStateNotReady pathSourceState = iota - pathSourceStateCreating - pathSourceStateReady + pathOnDemandStateInitial pathOnDemandState = iota + pathOnDemandStateWaitingReady + pathOnDemandStateReady + pathOnDemandStateClosing ) type pathSourceStaticSetReadyReq struct { @@ -86,7 +87,8 @@ type pathSourceStaticSetReadyReq struct { } type pathSourceStaticSetNotReadyReq struct { - Res chan struct{} + Source sourceStatic + Res chan struct{} } type pathReaderRemoveReq struct { @@ -211,25 +213,21 @@ type path struct { stats *stats parent pathParent - ctx context.Context - ctxCancel func() - readers map[reader]pathReaderState - describeRequests []pathDescribeReq - setupPlayRequests []pathReaderSetupPlayReq - source source - sourceStaticWg sync.WaitGroup - stream *gortsplib.ServerStream - nonRTSPReaders *pathReadersMap - onDemandCmd *externalcmd.Cmd - onPublishCmd *externalcmd.Cmd - describeTimer *time.Timer - sourceCloseTimer *time.Timer - sourceCloseTimerStarted bool - sourceState pathSourceState - runOnDemandCloseTimer *time.Timer - runOnDemandCloseTimerStarted bool - closeTimer *time.Timer - closeTimerStarted bool + ctx context.Context + ctxCancel func() + source source + sourceReady bool + sourceStaticWg sync.WaitGroup + stream *gortsplib.ServerStream + readers map[reader]pathReaderState + describeRequests []pathDescribeReq + setupPlayRequests []pathReaderSetupPlayReq + nonRTSPReaders *pathReadersMap + onDemandCmd *externalcmd.Cmd + onPublishCmd *externalcmd.Cmd + onDemandReadyTimer *time.Timer + onDemandCloseTimer *time.Timer + onDemandState pathOnDemandState // in sourceStaticSetReady chan pathSourceStaticSetReadyReq @@ -276,10 +274,8 @@ func newPath( ctxCancel: ctxCancel, readers: make(map[reader]pathReaderState), nonRTSPReaders: newPathReadersMap(), - describeTimer: newEmptyTimer(), - sourceCloseTimer: newEmptyTimer(), - runOnDemandCloseTimer: newEmptyTimer(), - closeTimer: newEmptyTimer(), + onDemandReadyTimer: newEmptyTimer(), + onDemandCloseTimer: newEmptyTimer(), sourceStaticSetReady: make(chan pathSourceStaticSetReadyReq), sourceStaticSetNotReady: make(chan pathSourceStaticSetNotReadyReq), describe: make(chan pathDescribeReq), @@ -328,8 +324,8 @@ func (pa *path) run() { if pa.conf.Source == "redirect" { pa.source = &sourceRedirect{} - } else if pa.hasStaticSource() && !pa.conf.SourceOnDemand { - pa.startStaticSource() + } else if !pa.conf.SourceOnDemand && pa.hasStaticSource() { + pa.staticSourceCreate() } var onInitCmd *externalcmd.Cmd @@ -345,57 +341,55 @@ func (pa *path) run() { outer: for { select { - case <-pa.describeTimer.C: + case <-pa.onDemandReadyTimer.C: for _, req := range pa.describeRequests { - req.Res <- pathDescribeRes{Err: fmt.Errorf("publisher of path '%s' has timed out", pa.name)} + req.Res <- pathDescribeRes{Err: fmt.Errorf("source of path '%s' has timed out", pa.name)} } pa.describeRequests = nil for _, req := range pa.setupPlayRequests { - req.Res <- pathReaderSetupPlayRes{Err: fmt.Errorf("publisher of path '%s' has timed out", pa.name)} + req.Res <- pathReaderSetupPlayRes{Err: fmt.Errorf("source of path '%s' has timed out", pa.name)} } pa.setupPlayRequests = nil - // set state after removeReader(), so schedule* works once - pa.sourceState = pathSourceStateNotReady + pa.onDemandCloseSource() - pa.scheduleSourceClose() - pa.scheduleRunOnDemandClose() - pa.scheduleClose() + if pa.conf.Regexp != nil { + break outer + } - case <-pa.sourceCloseTimer.C: - pa.sourceCloseTimerStarted = false - pa.source.(sourceStatic).Close() - pa.source = nil + case <-pa.onDemandCloseTimer.C: + pa.onDemandCloseSource() - pa.scheduleClose() - - case <-pa.runOnDemandCloseTimer.C: - pa.runOnDemandCloseTimerStarted = false - pa.Log(logger.Info, "on demand command stopped") - pa.onDemandCmd.Close() - pa.onDemandCmd = nil - - pa.scheduleClose() - - case <-pa.closeTimer.C: - break outer + if pa.conf.Regexp != nil { + break outer + } case req := <-pa.sourceStaticSetReady: pa.stream = gortsplib.NewServerStream(req.Tracks) - pa.onSourceSetReady() + pa.sourceSetReady() close(req.Res) case req := <-pa.sourceStaticSetNotReady: - pa.onSourceSetNotReady() + if req.Source == pa.source { + pa.sourceSetNotReady() + } close(req.Res) + if pa.source == nil && pa.conf.Regexp != nil { + break outer + } + case req := <-pa.describe: pa.onDescribe(req) case req := <-pa.publisherRemove: pa.onPublisherRemove(req) + if pa.source == nil && pa.conf.Regexp != nil { + break outer + } + case req := <-pa.publisherAnnounce: pa.onPublisherAnnounce(req) @@ -405,6 +399,10 @@ outer: case req := <-pa.publisherPause: pa.onPublisherPause(req) + if pa.source == nil && pa.conf.Regexp != nil { + break outer + } + case req := <-pa.readerRemove: pa.onReaderRemove(req) @@ -424,10 +422,8 @@ outer: pa.ctxCancel() - pa.describeTimer.Stop() - pa.sourceCloseTimer.Stop() - pa.runOnDemandCloseTimer.Stop() - pa.closeTimer.Stop() + pa.onDemandReadyTimer.Stop() + pa.onDemandCloseTimer.Stop() if onInitCmd != nil { pa.Log(logger.Info, "on init command stopped") @@ -463,7 +459,7 @@ outer: source.Close() pa.sourceStaticWg.Wait() } else if source, ok := pa.source.(publisher); ok { - if pa.sourceState == pathSourceStateReady { + if pa.sourceReady { atomic.AddInt64(pa.stats.CountPublishers, -1) } source.Close() @@ -479,7 +475,111 @@ func (pa *path) hasStaticSource() bool { strings.HasPrefix(pa.conf.Source, "rtmp://") } -func (pa *path) startStaticSource() { +func (pa *path) isOnDemand() bool { + return (pa.hasStaticSource() && pa.conf.SourceOnDemand) || pa.conf.RunOnDemand != "" +} + +func (pa *path) onDemandStartSource() { + pa.onDemandReadyTimer.Stop() + if pa.hasStaticSource() { + pa.staticSourceCreate() + pa.onDemandReadyTimer = time.NewTimer(pa.conf.SourceOnDemandStartTimeout) + + } else { + pa.Log(logger.Info, "on demand command started") + _, port, _ := net.SplitHostPort(pa.rtspAddress) + pa.onDemandCmd = externalcmd.New(pa.conf.RunOnDemand, pa.conf.RunOnDemandRestart, externalcmd.Environment{ + Path: pa.name, + Port: port, + }) + pa.onDemandReadyTimer = time.NewTimer(pa.conf.RunOnDemandStartTimeout) + } + + pa.onDemandState = pathOnDemandStateWaitingReady +} + +func (pa *path) onDemandScheduleClose() { + pa.onDemandCloseTimer.Stop() + if pa.hasStaticSource() { + pa.onDemandCloseTimer = time.NewTimer(pa.conf.SourceOnDemandCloseAfter) + } else { + pa.onDemandCloseTimer = time.NewTimer(pa.conf.RunOnDemandCloseAfter) + } + + pa.onDemandState = pathOnDemandStateClosing +} + +func (pa *path) onDemandCloseSource() { + if pa.onDemandState == pathOnDemandStateClosing { + pa.onDemandCloseTimer.Stop() + pa.onDemandCloseTimer = newEmptyTimer() + } + + // set state before doPublisherRemove() + pa.onDemandState = pathOnDemandStateInitial + + if pa.hasStaticSource() { + pa.staticSourceDelete() + } else { + pa.Log(logger.Info, "on demand command stopped") + pa.onDemandCmd.Close() + pa.onDemandCmd = nil + + if pa.source != nil { + pa.source.(publisher).Close() + pa.doPublisherRemove() + } + } +} + +func (pa *path) sourceSetReady() { + pa.sourceReady = true + + if pa.isOnDemand() { + pa.onDemandReadyTimer.Stop() + pa.onDemandReadyTimer = newEmptyTimer() + + for _, req := range pa.describeRequests { + req.Res <- pathDescribeRes{ + Stream: pa.stream, + } + } + pa.describeRequests = nil + + for _, req := range pa.setupPlayRequests { + pa.onReaderSetupPlayPost(req) + } + pa.setupPlayRequests = nil + + if len(pa.readers) > 0 { + pa.onDemandState = pathOnDemandStateReady + } else { + pa.onDemandScheduleClose() + } + } + + pa.parent.OnPathSourceReady(pa) +} + +func (pa *path) sourceSetNotReady() { + pa.sourceReady = false + + if pa.isOnDemand() && pa.onDemandState != pathOnDemandStateInitial { + pa.onDemandCloseSource() + } + + if pa.onPublishCmd != nil { + pa.onPublishCmd.Close() + pa.onPublishCmd = nil + } + + for r := range pa.readers { + pa.doReaderRemove(r) + r.Close() + } +} + +func (pa *path) staticSourceCreate() { if strings.HasPrefix(pa.conf.Source, "rtsp://") || strings.HasPrefix(pa.conf.Source, "rtsps://") { pa.source = newRTSPSource( @@ -507,7 +607,17 @@ func (pa *path) startStaticSource() { } } -func (pa *path) removeReader(r reader) { +func (pa *path) staticSourceDelete() { + pa.sourceReady = false + + pa.source.(sourceStatic).Close() + pa.source = nil + + pa.stream.Close() + pa.stream = nil +} + +func (pa *path) doReaderRemove(r reader) { state := pa.readers[r] if state == pathReaderStatePlay { @@ -519,16 +629,12 @@ func (pa *path) removeReader(r reader) { } delete(pa.readers, r) - - pa.scheduleSourceClose() - pa.scheduleRunOnDemandClose() - pa.scheduleClose() } -func (pa *path) removePublisher(p publisher) { - if pa.sourceState == pathSourceStateReady { +func (pa *path) doPublisherRemove() { + if pa.sourceReady { atomic.AddInt64(pa.stats.CountPublishers, -1) - pa.onSourceSetNotReady() + pa.sourceSetNotReady() } pa.source = nil @@ -536,101 +642,12 @@ func (pa *path) removePublisher(p publisher) { pa.stream = nil for r := range pa.readers { - pa.removeReader(r) - r.Close() - } - - pa.scheduleSourceClose() - pa.scheduleRunOnDemandClose() - pa.scheduleClose() -} - -func (pa *path) fixedPublisherStart() { - if pa.hasStaticSource() { - // start on-demand source - if pa.source == nil { - pa.startStaticSource() - - if pa.sourceState != pathSourceStateCreating { - pa.describeTimer = time.NewTimer(pa.conf.SourceOnDemandStartTimeout) - pa.sourceState = pathSourceStateCreating - } - - // reset timer - } else if pa.sourceCloseTimerStarted { - pa.sourceCloseTimer.Stop() - pa.sourceCloseTimer = time.NewTimer(pa.conf.SourceOnDemandCloseAfter) - } - } - - if pa.conf.RunOnDemand != "" { - // start on-demand command - if pa.onDemandCmd == nil { - pa.Log(logger.Info, "on demand command started") - _, port, _ := net.SplitHostPort(pa.rtspAddress) - pa.onDemandCmd = externalcmd.New(pa.conf.RunOnDemand, pa.conf.RunOnDemandRestart, externalcmd.Environment{ - Path: pa.name, - Port: port, - }) - - if pa.sourceState != pathSourceStateCreating { - pa.describeTimer = time.NewTimer(pa.conf.RunOnDemandStartTimeout) - pa.sourceState = pathSourceStateCreating - } - - // reset timer - } else if pa.runOnDemandCloseTimerStarted { - pa.runOnDemandCloseTimer.Stop() - pa.runOnDemandCloseTimer = time.NewTimer(pa.conf.RunOnDemandCloseAfter) - } - } -} - -func (pa *path) onSourceSetReady() { - if pa.sourceState == pathSourceStateCreating { - pa.describeTimer.Stop() - pa.describeTimer = newEmptyTimer() - } - - pa.sourceState = pathSourceStateReady - - for _, req := range pa.describeRequests { - req.Res <- pathDescribeRes{ - Stream: pa.stream, - } - } - pa.describeRequests = nil - - for _, req := range pa.setupPlayRequests { - pa.onReaderSetupPlayPost(req) - } - pa.setupPlayRequests = nil - - pa.scheduleSourceClose() - pa.scheduleRunOnDemandClose() - pa.scheduleClose() - - pa.parent.OnPathSourceReady(pa) -} - -func (pa *path) onSourceSetNotReady() { - pa.sourceState = pathSourceStateNotReady - - if pa.onPublishCmd != nil { - pa.onPublishCmd.Close() - pa.onPublishCmd = nil - } - - for r := range pa.readers { - pa.removeReader(r) + pa.doReaderRemove(r) r.Close() } } func (pa *path) onDescribe(req pathDescribeReq) { - pa.fixedPublisherStart() - pa.scheduleClose() - if _, ok := pa.source.(*sourceRedirect); ok { req.Res <- pathDescribeRes{ Redirect: pa.conf.SourceRedirect, @@ -638,43 +655,44 @@ func (pa *path) onDescribe(req pathDescribeReq) { return } - switch pa.sourceState { - case pathSourceStateReady: + if pa.sourceReady { req.Res <- pathDescribeRes{ Stream: pa.stream, } return + } - case pathSourceStateCreating: + if pa.isOnDemand() { + if pa.onDemandState == pathOnDemandStateInitial { + pa.onDemandStartSource() + } pa.describeRequests = append(pa.describeRequests, req) return + } - case pathSourceStateNotReady: - if pa.conf.Fallback != "" { - fallbackURL := func() string { - if strings.HasPrefix(pa.conf.Fallback, "/") { - ur := base.URL{ - Scheme: req.URL.Scheme, - User: req.URL.User, - Host: req.URL.Host, - Path: pa.conf.Fallback, - } - return ur.String() + if pa.conf.Fallback != "" { + fallbackURL := func() string { + if strings.HasPrefix(pa.conf.Fallback, "/") { + ur := base.URL{ + Scheme: req.URL.Scheme, + User: req.URL.User, + Host: req.URL.Host, + Path: pa.conf.Fallback, } - return pa.conf.Fallback - }() - req.Res <- pathDescribeRes{Redirect: fallbackURL} - return - } - - req.Res <- pathDescribeRes{Err: pathErrNoOnePublishing{PathName: pa.name}} + return ur.String() + } + return pa.conf.Fallback + }() + req.Res <- pathDescribeRes{Redirect: fallbackURL} return } + + req.Res <- pathDescribeRes{Err: pathErrNoOnePublishing{PathName: pa.name}} } func (pa *path) onPublisherRemove(req pathPublisherRemoveReq) { if pa.source == req.Author { - pa.removePublisher(req.Author) + pa.doPublisherRemove() } close(req.Res) } @@ -692,20 +710,13 @@ func (pa *path) onPublisherAnnounce(req pathPublisherAnnounceReq) { } pa.Log(logger.Info, "closing existing publisher") - curPub := pa.source.(publisher) - pa.removePublisher(curPub) - curPub.Close() - - // prevent path closure - if pa.closeTimerStarted { - pa.closeTimer.Stop() - pa.closeTimer = newEmptyTimer() - pa.closeTimerStarted = false - } + pa.source.(publisher).Close() + pa.doPublisherRemove() } pa.source = req.Author pa.stream = gortsplib.NewServerStream(req.Tracks) + req.Res <- pathPublisherAnnounceRes{Path: pa} } @@ -719,7 +730,7 @@ func (pa *path) onPublisherRecord(req pathPublisherRecordReq) { req.Author.OnPublisherAccepted(len(pa.stream.Tracks())) - pa.onSourceSetReady() + pa.sourceSetReady() if pa.conf.RunOnPublish != "" { _, port, _ := net.SplitHostPort(pa.rtspAddress) @@ -733,54 +744,50 @@ func (pa *path) onPublisherRecord(req pathPublisherRecordReq) { } func (pa *path) onPublisherPause(req pathPublisherPauseReq) { - if req.Author == pa.source && pa.sourceState == pathSourceStateReady { + if req.Author == pa.source && pa.sourceReady { atomic.AddInt64(pa.stats.CountPublishers, -1) - pa.onSourceSetNotReady() + pa.sourceSetNotReady() } close(req.Res) } func (pa *path) onReaderRemove(req pathReaderRemoveReq) { if _, ok := pa.readers[req.Author]; ok { - pa.removeReader(req.Author) + pa.doReaderRemove(req.Author) } close(req.Res) + + if pa.isOnDemand() && + len(pa.readers) == 0 && + pa.onDemandState == pathOnDemandStateReady { + pa.onDemandScheduleClose() + } } func (pa *path) onReaderSetupPlay(req pathReaderSetupPlayReq) { - pa.fixedPublisherStart() - pa.scheduleClose() - - switch pa.sourceState { - case pathSourceStateReady: + if pa.sourceReady { pa.onReaderSetupPlayPost(req) return + } - case pathSourceStateCreating: + if pa.isOnDemand() { + if pa.onDemandState == pathOnDemandStateInitial { + pa.onDemandStartSource() + } pa.setupPlayRequests = append(pa.setupPlayRequests, req) return - - case pathSourceStateNotReady: - req.Res <- pathReaderSetupPlayRes{Err: pathErrNoOnePublishing{PathName: pa.name}} - return } + + req.Res <- pathReaderSetupPlayRes{Err: pathErrNoOnePublishing{PathName: pa.name}} } func (pa *path) onReaderSetupPlayPost(req pathReaderSetupPlayReq) { - if _, ok := pa.readers[req.Author]; !ok { - // prevent on-demand source from closing - if pa.sourceCloseTimerStarted { - pa.sourceCloseTimer = newEmptyTimer() - pa.sourceCloseTimerStarted = false - } + pa.readers[req.Author] = pathReaderStatePrePlay - // prevent on-demand command from closing - if pa.runOnDemandCloseTimerStarted { - pa.runOnDemandCloseTimer = newEmptyTimer() - pa.runOnDemandCloseTimerStarted = false - } - - pa.readers[req.Author] = pathReaderStatePrePlay + if pa.isOnDemand() && pa.onDemandState == pathOnDemandStateClosing { + pa.onDemandState = pathOnDemandStateReady + pa.onDemandCloseTimer.Stop() + pa.onDemandCloseTimer = newEmptyTimer() } req.Res <- pathReaderSetupPlayRes{ @@ -814,54 +821,6 @@ func (pa *path) onReaderPause(req pathReaderPauseReq) { close(req.Res) } -func (pa *path) scheduleSourceClose() { - if !pa.hasStaticSource() || !pa.conf.SourceOnDemand || pa.source == nil { - return - } - - if pa.sourceCloseTimerStarted || - pa.sourceState == pathSourceStateCreating || - len(pa.readers) > 0 || - pa.source != nil { - return - } - - pa.sourceCloseTimer.Stop() - pa.sourceCloseTimer = time.NewTimer(pa.conf.SourceOnDemandCloseAfter) - pa.sourceCloseTimerStarted = true -} - -func (pa *path) scheduleRunOnDemandClose() { - if pa.conf.RunOnDemand == "" || pa.onDemandCmd == nil { - return - } - - if pa.runOnDemandCloseTimerStarted || - pa.sourceState == pathSourceStateCreating || - len(pa.readers) > 0 { - return - } - - pa.runOnDemandCloseTimer.Stop() - pa.runOnDemandCloseTimer = time.NewTimer(pa.conf.RunOnDemandCloseAfter) - pa.runOnDemandCloseTimerStarted = true -} - -func (pa *path) scheduleClose() { - if pa.conf.Regexp != nil && - len(pa.readers) == 0 && - pa.source == nil && - pa.sourceState != pathSourceStateCreating && - !pa.sourceCloseTimerStarted && - !pa.runOnDemandCloseTimerStarted && - !pa.closeTimerStarted { - - pa.closeTimer.Stop() - pa.closeTimer = time.NewTimer(0) - pa.closeTimerStarted = true - } -} - // OnSourceStaticSetReady is called by a sourceStatic. func (pa *path) OnSourceStaticSetReady(req pathSourceStaticSetReadyReq) { req.Res = make(chan struct{}) diff --git a/internal/core/rtmp_source.go b/internal/core/rtmp_source.go index 04eafa91..61df3e20 100644 --- a/internal/core/rtmp_source.go +++ b/internal/core/rtmp_source.go @@ -173,7 +173,7 @@ func (s *rtmpSource) runInner() bool { }) defer func() { - s.parent.OnSourceStaticSetNotReady(pathSourceStaticSetNotReadyReq{}) + s.parent.OnSourceStaticSetNotReady(pathSourceStaticSetNotReadyReq{Source: s}) }() rtcpSenders := rtcpsenderset.New(tracks, s.parent.OnSourceFrame) diff --git a/internal/core/rtsp_source.go b/internal/core/rtsp_source.go index a33992d2..8efa2909 100644 --- a/internal/core/rtsp_source.go +++ b/internal/core/rtsp_source.go @@ -192,7 +192,7 @@ func (s *rtspSource) runInner() bool { }) defer func() { - s.parent.OnSourceStaticSetNotReady(pathSourceStaticSetNotReadyReq{}) + s.parent.OnSourceStaticSetNotReady(pathSourceStaticSetNotReadyReq{Source: s}) }() readErr := make(chan error)