refactor ondemand system

This commit is contained in:
aler9 2021-08-03 22:40:47 +02:00
parent fd27ed941e
commit e516d265d6
4 changed files with 232 additions and 269 deletions

View File

@ -306,6 +306,10 @@ func (pconf *PathConf) fillAndCheck(name string) error {
return fmt.Errorf("'runOnPublish' is useless when source is not 'record', since the stream is not provided by a publisher, but by a fixed source")
}
if pconf.RunOnDemand != "" && pconf.Source != "record" {
return fmt.Errorf("'runOnDemand' can be used only when source is 'record'")
}
if pconf.RunOnDemandStartTimeout == 0 {
pconf.RunOnDemandStartTimeout = 10 * time.Second
}

View File

@ -72,12 +72,13 @@ const (
pathReaderStatePlay
)
type pathSourceState int
type pathOnDemandState int
const (
pathSourceStateNotReady pathSourceState = iota
pathSourceStateCreating
pathSourceStateReady
pathOnDemandStateInitial pathOnDemandState = iota
pathOnDemandStateWaitingReady
pathOnDemandStateReady
pathOnDemandStateClosing
)
type pathSourceStaticSetReadyReq struct {
@ -86,7 +87,8 @@ type pathSourceStaticSetReadyReq struct {
}
type pathSourceStaticSetNotReadyReq struct {
Res chan struct{}
Source sourceStatic
Res chan struct{}
}
type pathReaderRemoveReq struct {
@ -211,25 +213,21 @@ type path struct {
stats *stats
parent pathParent
ctx context.Context
ctxCancel func()
readers map[reader]pathReaderState
describeRequests []pathDescribeReq
setupPlayRequests []pathReaderSetupPlayReq
source source
sourceStaticWg sync.WaitGroup
stream *gortsplib.ServerStream
nonRTSPReaders *pathReadersMap
onDemandCmd *externalcmd.Cmd
onPublishCmd *externalcmd.Cmd
describeTimer *time.Timer
sourceCloseTimer *time.Timer
sourceCloseTimerStarted bool
sourceState pathSourceState
runOnDemandCloseTimer *time.Timer
runOnDemandCloseTimerStarted bool
closeTimer *time.Timer
closeTimerStarted bool
ctx context.Context
ctxCancel func()
source source
sourceReady bool
sourceStaticWg sync.WaitGroup
stream *gortsplib.ServerStream
readers map[reader]pathReaderState
describeRequests []pathDescribeReq
setupPlayRequests []pathReaderSetupPlayReq
nonRTSPReaders *pathReadersMap
onDemandCmd *externalcmd.Cmd
onPublishCmd *externalcmd.Cmd
onDemandReadyTimer *time.Timer
onDemandCloseTimer *time.Timer
onDemandState pathOnDemandState
// in
sourceStaticSetReady chan pathSourceStaticSetReadyReq
@ -276,10 +274,8 @@ func newPath(
ctxCancel: ctxCancel,
readers: make(map[reader]pathReaderState),
nonRTSPReaders: newPathReadersMap(),
describeTimer: newEmptyTimer(),
sourceCloseTimer: newEmptyTimer(),
runOnDemandCloseTimer: newEmptyTimer(),
closeTimer: newEmptyTimer(),
onDemandReadyTimer: newEmptyTimer(),
onDemandCloseTimer: newEmptyTimer(),
sourceStaticSetReady: make(chan pathSourceStaticSetReadyReq),
sourceStaticSetNotReady: make(chan pathSourceStaticSetNotReadyReq),
describe: make(chan pathDescribeReq),
@ -328,8 +324,8 @@ func (pa *path) run() {
if pa.conf.Source == "redirect" {
pa.source = &sourceRedirect{}
} else if pa.hasStaticSource() && !pa.conf.SourceOnDemand {
pa.startStaticSource()
} else if !pa.conf.SourceOnDemand && pa.hasStaticSource() {
pa.staticSourceCreate()
}
var onInitCmd *externalcmd.Cmd
@ -345,57 +341,55 @@ func (pa *path) run() {
outer:
for {
select {
case <-pa.describeTimer.C:
case <-pa.onDemandReadyTimer.C:
for _, req := range pa.describeRequests {
req.Res <- pathDescribeRes{Err: fmt.Errorf("publisher of path '%s' has timed out", pa.name)}
req.Res <- pathDescribeRes{Err: fmt.Errorf("source of path '%s' has timed out", pa.name)}
}
pa.describeRequests = nil
for _, req := range pa.setupPlayRequests {
req.Res <- pathReaderSetupPlayRes{Err: fmt.Errorf("publisher of path '%s' has timed out", pa.name)}
req.Res <- pathReaderSetupPlayRes{Err: fmt.Errorf("source of path '%s' has timed out", pa.name)}
}
pa.setupPlayRequests = nil
// set state after removeReader(), so schedule* works once
pa.sourceState = pathSourceStateNotReady
pa.onDemandCloseSource()
pa.scheduleSourceClose()
pa.scheduleRunOnDemandClose()
pa.scheduleClose()
if pa.conf.Regexp != nil {
break outer
}
case <-pa.sourceCloseTimer.C:
pa.sourceCloseTimerStarted = false
pa.source.(sourceStatic).Close()
pa.source = nil
case <-pa.onDemandCloseTimer.C:
pa.onDemandCloseSource()
pa.scheduleClose()
case <-pa.runOnDemandCloseTimer.C:
pa.runOnDemandCloseTimerStarted = false
pa.Log(logger.Info, "on demand command stopped")
pa.onDemandCmd.Close()
pa.onDemandCmd = nil
pa.scheduleClose()
case <-pa.closeTimer.C:
break outer
if pa.conf.Regexp != nil {
break outer
}
case req := <-pa.sourceStaticSetReady:
pa.stream = gortsplib.NewServerStream(req.Tracks)
pa.onSourceSetReady()
pa.sourceSetReady()
close(req.Res)
case req := <-pa.sourceStaticSetNotReady:
pa.onSourceSetNotReady()
if req.Source == pa.source {
pa.sourceSetNotReady()
}
close(req.Res)
if pa.source == nil && pa.conf.Regexp != nil {
break outer
}
case req := <-pa.describe:
pa.onDescribe(req)
case req := <-pa.publisherRemove:
pa.onPublisherRemove(req)
if pa.source == nil && pa.conf.Regexp != nil {
break outer
}
case req := <-pa.publisherAnnounce:
pa.onPublisherAnnounce(req)
@ -405,6 +399,10 @@ outer:
case req := <-pa.publisherPause:
pa.onPublisherPause(req)
if pa.source == nil && pa.conf.Regexp != nil {
break outer
}
case req := <-pa.readerRemove:
pa.onReaderRemove(req)
@ -424,10 +422,8 @@ outer:
pa.ctxCancel()
pa.describeTimer.Stop()
pa.sourceCloseTimer.Stop()
pa.runOnDemandCloseTimer.Stop()
pa.closeTimer.Stop()
pa.onDemandReadyTimer.Stop()
pa.onDemandCloseTimer.Stop()
if onInitCmd != nil {
pa.Log(logger.Info, "on init command stopped")
@ -463,7 +459,7 @@ outer:
source.Close()
pa.sourceStaticWg.Wait()
} else if source, ok := pa.source.(publisher); ok {
if pa.sourceState == pathSourceStateReady {
if pa.sourceReady {
atomic.AddInt64(pa.stats.CountPublishers, -1)
}
source.Close()
@ -479,7 +475,111 @@ func (pa *path) hasStaticSource() bool {
strings.HasPrefix(pa.conf.Source, "rtmp://")
}
func (pa *path) startStaticSource() {
func (pa *path) isOnDemand() bool {
return (pa.hasStaticSource() && pa.conf.SourceOnDemand) || pa.conf.RunOnDemand != ""
}
func (pa *path) onDemandStartSource() {
pa.onDemandReadyTimer.Stop()
if pa.hasStaticSource() {
pa.staticSourceCreate()
pa.onDemandReadyTimer = time.NewTimer(pa.conf.SourceOnDemandStartTimeout)
} else {
pa.Log(logger.Info, "on demand command started")
_, port, _ := net.SplitHostPort(pa.rtspAddress)
pa.onDemandCmd = externalcmd.New(pa.conf.RunOnDemand, pa.conf.RunOnDemandRestart, externalcmd.Environment{
Path: pa.name,
Port: port,
})
pa.onDemandReadyTimer = time.NewTimer(pa.conf.RunOnDemandStartTimeout)
}
pa.onDemandState = pathOnDemandStateWaitingReady
}
func (pa *path) onDemandScheduleClose() {
pa.onDemandCloseTimer.Stop()
if pa.hasStaticSource() {
pa.onDemandCloseTimer = time.NewTimer(pa.conf.SourceOnDemandCloseAfter)
} else {
pa.onDemandCloseTimer = time.NewTimer(pa.conf.RunOnDemandCloseAfter)
}
pa.onDemandState = pathOnDemandStateClosing
}
func (pa *path) onDemandCloseSource() {
if pa.onDemandState == pathOnDemandStateClosing {
pa.onDemandCloseTimer.Stop()
pa.onDemandCloseTimer = newEmptyTimer()
}
// set state before doPublisherRemove()
pa.onDemandState = pathOnDemandStateInitial
if pa.hasStaticSource() {
pa.staticSourceDelete()
} else {
pa.Log(logger.Info, "on demand command stopped")
pa.onDemandCmd.Close()
pa.onDemandCmd = nil
if pa.source != nil {
pa.source.(publisher).Close()
pa.doPublisherRemove()
}
}
}
func (pa *path) sourceSetReady() {
pa.sourceReady = true
if pa.isOnDemand() {
pa.onDemandReadyTimer.Stop()
pa.onDemandReadyTimer = newEmptyTimer()
for _, req := range pa.describeRequests {
req.Res <- pathDescribeRes{
Stream: pa.stream,
}
}
pa.describeRequests = nil
for _, req := range pa.setupPlayRequests {
pa.onReaderSetupPlayPost(req)
}
pa.setupPlayRequests = nil
if len(pa.readers) > 0 {
pa.onDemandState = pathOnDemandStateReady
} else {
pa.onDemandScheduleClose()
}
}
pa.parent.OnPathSourceReady(pa)
}
func (pa *path) sourceSetNotReady() {
pa.sourceReady = false
if pa.isOnDemand() && pa.onDemandState != pathOnDemandStateInitial {
pa.onDemandCloseSource()
}
if pa.onPublishCmd != nil {
pa.onPublishCmd.Close()
pa.onPublishCmd = nil
}
for r := range pa.readers {
pa.doReaderRemove(r)
r.Close()
}
}
func (pa *path) staticSourceCreate() {
if strings.HasPrefix(pa.conf.Source, "rtsp://") ||
strings.HasPrefix(pa.conf.Source, "rtsps://") {
pa.source = newRTSPSource(
@ -507,7 +607,17 @@ func (pa *path) startStaticSource() {
}
}
func (pa *path) removeReader(r reader) {
func (pa *path) staticSourceDelete() {
pa.sourceReady = false
pa.source.(sourceStatic).Close()
pa.source = nil
pa.stream.Close()
pa.stream = nil
}
func (pa *path) doReaderRemove(r reader) {
state := pa.readers[r]
if state == pathReaderStatePlay {
@ -519,16 +629,12 @@ func (pa *path) removeReader(r reader) {
}
delete(pa.readers, r)
pa.scheduleSourceClose()
pa.scheduleRunOnDemandClose()
pa.scheduleClose()
}
func (pa *path) removePublisher(p publisher) {
if pa.sourceState == pathSourceStateReady {
func (pa *path) doPublisherRemove() {
if pa.sourceReady {
atomic.AddInt64(pa.stats.CountPublishers, -1)
pa.onSourceSetNotReady()
pa.sourceSetNotReady()
}
pa.source = nil
@ -536,101 +642,12 @@ func (pa *path) removePublisher(p publisher) {
pa.stream = nil
for r := range pa.readers {
pa.removeReader(r)
r.Close()
}
pa.scheduleSourceClose()
pa.scheduleRunOnDemandClose()
pa.scheduleClose()
}
func (pa *path) fixedPublisherStart() {
if pa.hasStaticSource() {
// start on-demand source
if pa.source == nil {
pa.startStaticSource()
if pa.sourceState != pathSourceStateCreating {
pa.describeTimer = time.NewTimer(pa.conf.SourceOnDemandStartTimeout)
pa.sourceState = pathSourceStateCreating
}
// reset timer
} else if pa.sourceCloseTimerStarted {
pa.sourceCloseTimer.Stop()
pa.sourceCloseTimer = time.NewTimer(pa.conf.SourceOnDemandCloseAfter)
}
}
if pa.conf.RunOnDemand != "" {
// start on-demand command
if pa.onDemandCmd == nil {
pa.Log(logger.Info, "on demand command started")
_, port, _ := net.SplitHostPort(pa.rtspAddress)
pa.onDemandCmd = externalcmd.New(pa.conf.RunOnDemand, pa.conf.RunOnDemandRestart, externalcmd.Environment{
Path: pa.name,
Port: port,
})
if pa.sourceState != pathSourceStateCreating {
pa.describeTimer = time.NewTimer(pa.conf.RunOnDemandStartTimeout)
pa.sourceState = pathSourceStateCreating
}
// reset timer
} else if pa.runOnDemandCloseTimerStarted {
pa.runOnDemandCloseTimer.Stop()
pa.runOnDemandCloseTimer = time.NewTimer(pa.conf.RunOnDemandCloseAfter)
}
}
}
func (pa *path) onSourceSetReady() {
if pa.sourceState == pathSourceStateCreating {
pa.describeTimer.Stop()
pa.describeTimer = newEmptyTimer()
}
pa.sourceState = pathSourceStateReady
for _, req := range pa.describeRequests {
req.Res <- pathDescribeRes{
Stream: pa.stream,
}
}
pa.describeRequests = nil
for _, req := range pa.setupPlayRequests {
pa.onReaderSetupPlayPost(req)
}
pa.setupPlayRequests = nil
pa.scheduleSourceClose()
pa.scheduleRunOnDemandClose()
pa.scheduleClose()
pa.parent.OnPathSourceReady(pa)
}
func (pa *path) onSourceSetNotReady() {
pa.sourceState = pathSourceStateNotReady
if pa.onPublishCmd != nil {
pa.onPublishCmd.Close()
pa.onPublishCmd = nil
}
for r := range pa.readers {
pa.removeReader(r)
pa.doReaderRemove(r)
r.Close()
}
}
func (pa *path) onDescribe(req pathDescribeReq) {
pa.fixedPublisherStart()
pa.scheduleClose()
if _, ok := pa.source.(*sourceRedirect); ok {
req.Res <- pathDescribeRes{
Redirect: pa.conf.SourceRedirect,
@ -638,43 +655,44 @@ func (pa *path) onDescribe(req pathDescribeReq) {
return
}
switch pa.sourceState {
case pathSourceStateReady:
if pa.sourceReady {
req.Res <- pathDescribeRes{
Stream: pa.stream,
}
return
}
case pathSourceStateCreating:
if pa.isOnDemand() {
if pa.onDemandState == pathOnDemandStateInitial {
pa.onDemandStartSource()
}
pa.describeRequests = append(pa.describeRequests, req)
return
}
case pathSourceStateNotReady:
if pa.conf.Fallback != "" {
fallbackURL := func() string {
if strings.HasPrefix(pa.conf.Fallback, "/") {
ur := base.URL{
Scheme: req.URL.Scheme,
User: req.URL.User,
Host: req.URL.Host,
Path: pa.conf.Fallback,
}
return ur.String()
if pa.conf.Fallback != "" {
fallbackURL := func() string {
if strings.HasPrefix(pa.conf.Fallback, "/") {
ur := base.URL{
Scheme: req.URL.Scheme,
User: req.URL.User,
Host: req.URL.Host,
Path: pa.conf.Fallback,
}
return pa.conf.Fallback
}()
req.Res <- pathDescribeRes{Redirect: fallbackURL}
return
}
req.Res <- pathDescribeRes{Err: pathErrNoOnePublishing{PathName: pa.name}}
return ur.String()
}
return pa.conf.Fallback
}()
req.Res <- pathDescribeRes{Redirect: fallbackURL}
return
}
req.Res <- pathDescribeRes{Err: pathErrNoOnePublishing{PathName: pa.name}}
}
func (pa *path) onPublisherRemove(req pathPublisherRemoveReq) {
if pa.source == req.Author {
pa.removePublisher(req.Author)
pa.doPublisherRemove()
}
close(req.Res)
}
@ -692,20 +710,13 @@ func (pa *path) onPublisherAnnounce(req pathPublisherAnnounceReq) {
}
pa.Log(logger.Info, "closing existing publisher")
curPub := pa.source.(publisher)
pa.removePublisher(curPub)
curPub.Close()
// prevent path closure
if pa.closeTimerStarted {
pa.closeTimer.Stop()
pa.closeTimer = newEmptyTimer()
pa.closeTimerStarted = false
}
pa.source.(publisher).Close()
pa.doPublisherRemove()
}
pa.source = req.Author
pa.stream = gortsplib.NewServerStream(req.Tracks)
req.Res <- pathPublisherAnnounceRes{Path: pa}
}
@ -719,7 +730,7 @@ func (pa *path) onPublisherRecord(req pathPublisherRecordReq) {
req.Author.OnPublisherAccepted(len(pa.stream.Tracks()))
pa.onSourceSetReady()
pa.sourceSetReady()
if pa.conf.RunOnPublish != "" {
_, port, _ := net.SplitHostPort(pa.rtspAddress)
@ -733,54 +744,50 @@ func (pa *path) onPublisherRecord(req pathPublisherRecordReq) {
}
func (pa *path) onPublisherPause(req pathPublisherPauseReq) {
if req.Author == pa.source && pa.sourceState == pathSourceStateReady {
if req.Author == pa.source && pa.sourceReady {
atomic.AddInt64(pa.stats.CountPublishers, -1)
pa.onSourceSetNotReady()
pa.sourceSetNotReady()
}
close(req.Res)
}
func (pa *path) onReaderRemove(req pathReaderRemoveReq) {
if _, ok := pa.readers[req.Author]; ok {
pa.removeReader(req.Author)
pa.doReaderRemove(req.Author)
}
close(req.Res)
if pa.isOnDemand() &&
len(pa.readers) == 0 &&
pa.onDemandState == pathOnDemandStateReady {
pa.onDemandScheduleClose()
}
}
func (pa *path) onReaderSetupPlay(req pathReaderSetupPlayReq) {
pa.fixedPublisherStart()
pa.scheduleClose()
switch pa.sourceState {
case pathSourceStateReady:
if pa.sourceReady {
pa.onReaderSetupPlayPost(req)
return
}
case pathSourceStateCreating:
if pa.isOnDemand() {
if pa.onDemandState == pathOnDemandStateInitial {
pa.onDemandStartSource()
}
pa.setupPlayRequests = append(pa.setupPlayRequests, req)
return
case pathSourceStateNotReady:
req.Res <- pathReaderSetupPlayRes{Err: pathErrNoOnePublishing{PathName: pa.name}}
return
}
req.Res <- pathReaderSetupPlayRes{Err: pathErrNoOnePublishing{PathName: pa.name}}
}
func (pa *path) onReaderSetupPlayPost(req pathReaderSetupPlayReq) {
if _, ok := pa.readers[req.Author]; !ok {
// prevent on-demand source from closing
if pa.sourceCloseTimerStarted {
pa.sourceCloseTimer = newEmptyTimer()
pa.sourceCloseTimerStarted = false
}
pa.readers[req.Author] = pathReaderStatePrePlay
// prevent on-demand command from closing
if pa.runOnDemandCloseTimerStarted {
pa.runOnDemandCloseTimer = newEmptyTimer()
pa.runOnDemandCloseTimerStarted = false
}
pa.readers[req.Author] = pathReaderStatePrePlay
if pa.isOnDemand() && pa.onDemandState == pathOnDemandStateClosing {
pa.onDemandState = pathOnDemandStateReady
pa.onDemandCloseTimer.Stop()
pa.onDemandCloseTimer = newEmptyTimer()
}
req.Res <- pathReaderSetupPlayRes{
@ -814,54 +821,6 @@ func (pa *path) onReaderPause(req pathReaderPauseReq) {
close(req.Res)
}
func (pa *path) scheduleSourceClose() {
if !pa.hasStaticSource() || !pa.conf.SourceOnDemand || pa.source == nil {
return
}
if pa.sourceCloseTimerStarted ||
pa.sourceState == pathSourceStateCreating ||
len(pa.readers) > 0 ||
pa.source != nil {
return
}
pa.sourceCloseTimer.Stop()
pa.sourceCloseTimer = time.NewTimer(pa.conf.SourceOnDemandCloseAfter)
pa.sourceCloseTimerStarted = true
}
func (pa *path) scheduleRunOnDemandClose() {
if pa.conf.RunOnDemand == "" || pa.onDemandCmd == nil {
return
}
if pa.runOnDemandCloseTimerStarted ||
pa.sourceState == pathSourceStateCreating ||
len(pa.readers) > 0 {
return
}
pa.runOnDemandCloseTimer.Stop()
pa.runOnDemandCloseTimer = time.NewTimer(pa.conf.RunOnDemandCloseAfter)
pa.runOnDemandCloseTimerStarted = true
}
func (pa *path) scheduleClose() {
if pa.conf.Regexp != nil &&
len(pa.readers) == 0 &&
pa.source == nil &&
pa.sourceState != pathSourceStateCreating &&
!pa.sourceCloseTimerStarted &&
!pa.runOnDemandCloseTimerStarted &&
!pa.closeTimerStarted {
pa.closeTimer.Stop()
pa.closeTimer = time.NewTimer(0)
pa.closeTimerStarted = true
}
}
// OnSourceStaticSetReady is called by a sourceStatic.
func (pa *path) OnSourceStaticSetReady(req pathSourceStaticSetReadyReq) {
req.Res = make(chan struct{})

View File

@ -173,7 +173,7 @@ func (s *rtmpSource) runInner() bool {
})
defer func() {
s.parent.OnSourceStaticSetNotReady(pathSourceStaticSetNotReadyReq{})
s.parent.OnSourceStaticSetNotReady(pathSourceStaticSetNotReadyReq{Source: s})
}()
rtcpSenders := rtcpsenderset.New(tracks, s.parent.OnSourceFrame)

View File

@ -192,7 +192,7 @@ func (s *rtspSource) runInner() bool {
})
defer func() {
s.parent.OnSourceStaticSetNotReady(pathSourceStaticSetNotReadyReq{})
s.parent.OnSourceStaticSetNotReady(pathSourceStaticSetNotReadyReq{Source: s})
}()
readErr := make(chan error)