diff --git a/internal/core/path.go b/internal/core/path.go index 87749a51..3ba165fa 100644 --- a/internal/core/path.go +++ b/internal/core/path.go @@ -37,22 +37,22 @@ type sourceRedirect struct{} func (*sourceRedirect) IsSource() {} -type readPublisherState int +type pathReadPublisherState int const ( - readPublisherStatePrePlay readPublisherState = iota - readPublisherStatePlay - readPublisherStatePreRecord - readPublisherStateRecord - readPublisherStatePreRemove + pathReadPublisherStatePrePlay pathReadPublisherState = iota + pathReadPublisherStatePlay + pathReadPublisherStatePreRecord + pathReadPublisherStateRecord + pathReadPublisherStatePreRemove ) -type sourceState int +type pathSourceState int const ( - sourceStateNotReady sourceState = iota - sourceStateWaitingDescribe - sourceStateReady + pathSourceStateNotReady pathSourceState = iota + pathSourceStateCreating + pathSourceStateReady ) type pathReadersMap struct { @@ -102,7 +102,7 @@ type path struct { ctx context.Context ctxCancel func() - readPublishers map[readPublisher]readPublisherState + readPublishers map[readPublisher]pathReadPublisherState describeRequests []readPublisherDescribeReq setupPlayRequests []readPublisherSetupPlayReq source source @@ -113,7 +113,7 @@ type path struct { describeTimer *time.Timer sourceCloseTimer *time.Timer sourceCloseTimerStarted bool - sourceState sourceState + sourceState pathSourceState sourceWg sync.WaitGroup runOnDemandCloseTimer *time.Timer runOnDemandCloseTimerStarted bool @@ -161,7 +161,7 @@ func newPath( parent: parent, ctx: ctx, ctxCancel: ctxCancel, - readPublishers: make(map[readPublisher]readPublisherState), + readPublishers: make(map[readPublisher]pathReadPublisherState), nonRTSPReaders: newPathReadersMap(), describeTimer: newEmptyTimer(), sourceCloseTimer: newEmptyTimer(), @@ -192,6 +192,21 @@ func (pa *path) Log(level logger.Level, format string, args ...interface{}) { pa.parent.Log(level, "[path "+pa.name+"] "+format, args...) } +// ConfName returns the configuration name of this path. +func (pa *path) ConfName() string { + return pa.confName +} + +// Conf returns the configuration of this path. +func (pa *path) Conf() *conf.PathConf { + return pa.conf +} + +// Name returns the name of this path. +func (pa *path) Name() string { + return pa.name +} + func (pa *path) run() { defer pa.wg.Done() @@ -226,7 +241,7 @@ outer: pa.setupPlayRequests = nil // set state after removeReadPublisher(), so schedule* works once - pa.sourceState = sourceStateNotReady + pa.sourceState = pathSourceStateNotReady pa.scheduleSourceClose() pa.scheduleRunOnDemandClose() @@ -283,7 +298,7 @@ outer: continue } - if pa.readPublishers[req.Author] != readPublisherStatePreRemove { + if pa.readPublishers[req.Author] != pathReadPublisherStatePreRemove { pa.removeReadPublisher(req.Author) } @@ -326,16 +341,16 @@ outer: } for rp, state := range pa.readPublishers { - if state != readPublisherStatePreRemove { + if state != pathReadPublisherStatePreRemove { switch state { - case readPublisherStatePlay: + case pathReadPublisherStatePlay: atomic.AddInt64(pa.stats.CountReaders, -1) if _, ok := rp.(pathRTSPSession); !ok { pa.nonRTSPReaders.remove(rp) } - case readPublisherStateRecord: + case pathReadPublisherStateRecord: atomic.AddInt64(pa.stats.CountPublishers, -1) } rp.Close() @@ -381,7 +396,7 @@ func (pa *path) startExternalSource() { func (pa *path) hasReadPublishers() bool { for _, state := range pa.readPublishers { - if state != readPublisherStatePreRemove { + if state != pathReadPublisherStatePreRemove { return true } } @@ -390,30 +405,30 @@ func (pa *path) hasReadPublishers() bool { func (pa *path) hasReadPublishersNotSources() bool { for c, state := range pa.readPublishers { - if state != readPublisherStatePreRemove && c != pa.source { + if state != pathReadPublisherStatePreRemove && c != pa.source { return true } } return false } -func (pa *path) addReadPublisher(c readPublisher, state readPublisherState) { +func (pa *path) addReadPublisher(c readPublisher, state pathReadPublisherState) { pa.readPublishers[c] = state } func (pa *path) removeReadPublisher(rp readPublisher) { state := pa.readPublishers[rp] - pa.readPublishers[rp] = readPublisherStatePreRemove + pa.readPublishers[rp] = pathReadPublisherStatePreRemove switch state { - case readPublisherStatePlay: + case pathReadPublisherStatePlay: atomic.AddInt64(pa.stats.CountReaders, -1) if _, ok := rp.(pathRTSPSession); !ok { pa.nonRTSPReaders.remove(rp) } - case readPublisherStateRecord: + case pathReadPublisherStateRecord: atomic.AddInt64(pa.stats.CountPublishers, -1) pa.onSourceSetNotReady() } @@ -425,7 +440,7 @@ func (pa *path) removeReadPublisher(rp readPublisher) { // close all readPublishers that are reading or waiting to read for orp, state := range pa.readPublishers { - if state != readPublisherStatePreRemove { + if state != pathReadPublisherStatePreRemove { pa.removeReadPublisher(orp) orp.Close() } @@ -438,12 +453,12 @@ func (pa *path) removeReadPublisher(rp readPublisher) { } func (pa *path) onSourceSetReady() { - if pa.sourceState == sourceStateWaitingDescribe { + if pa.sourceState == pathSourceStateCreating { pa.describeTimer.Stop() pa.describeTimer = newEmptyTimer() } - pa.sourceState = sourceStateReady + pa.sourceState = pathSourceStateReady for _, req := range pa.describeRequests { req.Res <- readPublisherDescribeRes{ @@ -465,7 +480,7 @@ func (pa *path) onSourceSetReady() { } func (pa *path) onSourceSetNotReady() { - pa.sourceState = sourceStateNotReady + pa.sourceState = pathSourceStateNotReady if pa.onPublishCmd != nil { pa.onPublishCmd.Close() @@ -474,7 +489,7 @@ func (pa *path) onSourceSetNotReady() { // close all readPublishers that are reading or waiting to read for c, state := range pa.readPublishers { - if c != pa.source && state != readPublisherStatePreRemove { + if c != pa.source && state != pathReadPublisherStatePreRemove { pa.removeReadPublisher(c) c.Close() } @@ -487,9 +502,9 @@ func (pa *path) fixedPublisherStart() { if pa.source == nil { pa.startExternalSource() - if pa.sourceState != sourceStateWaitingDescribe { + if pa.sourceState != pathSourceStateCreating { pa.describeTimer = time.NewTimer(pa.conf.SourceOnDemandStartTimeout) - pa.sourceState = sourceStateWaitingDescribe + pa.sourceState = pathSourceStateCreating } // reset timer @@ -509,9 +524,9 @@ func (pa *path) fixedPublisherStart() { Port: port, }) - if pa.sourceState != sourceStateWaitingDescribe { + if pa.sourceState != pathSourceStateCreating { pa.describeTimer = time.NewTimer(pa.conf.RunOnDemandStartTimeout) - pa.sourceState = sourceStateWaitingDescribe + pa.sourceState = pathSourceStateCreating } // reset timer @@ -534,17 +549,17 @@ func (pa *path) onReadPublisherDescribe(req readPublisherDescribeReq) { } switch pa.sourceState { - case sourceStateReady: + case pathSourceStateReady: req.Res <- readPublisherDescribeRes{ Stream: pa.sourceStream, } return - case sourceStateWaitingDescribe: + case pathSourceStateCreating: pa.describeRequests = append(pa.describeRequests, req) return - case sourceStateNotReady: + case pathSourceStateNotReady: if pa.conf.Fallback != "" { fallbackURL := func() string { if strings.HasPrefix(pa.conf.Fallback, "/") { @@ -572,15 +587,15 @@ func (pa *path) onReadPublisherSetupPlay(req readPublisherSetupPlayReq) { pa.scheduleClose() switch pa.sourceState { - case sourceStateReady: + case pathSourceStateReady: pa.onReadPublisherSetupPlayPost(req) return - case sourceStateWaitingDescribe: + case pathSourceStateCreating: pa.setupPlayRequests = append(pa.setupPlayRequests, req) return - case sourceStateNotReady: + case pathSourceStateNotReady: req.Res <- readPublisherSetupPlayRes{Err: readPublisherErrNoOnePublishing{PathName: pa.name}} return } @@ -600,7 +615,7 @@ func (pa *path) onReadPublisherSetupPlayPost(req readPublisherSetupPlayReq) { pa.runOnDemandCloseTimerStarted = false } - pa.addReadPublisher(req.Author, readPublisherStatePrePlay) + pa.addReadPublisher(req.Author, pathReadPublisherStatePrePlay) } req.Res <- readPublisherSetupPlayRes{ @@ -611,7 +626,7 @@ func (pa *path) onReadPublisherSetupPlayPost(req readPublisherSetupPlayReq) { func (pa *path) onReadPublisherPlay(req readPublisherPlayReq) { atomic.AddInt64(pa.stats.CountReaders, 1) - pa.readPublishers[req.Author] = readPublisherStatePlay + pa.readPublishers[req.Author] = pathReadPublisherStatePlay if _, ok := req.Author.(pathRTSPSession); !ok { pa.nonRTSPReaders.add(req.Author) @@ -652,7 +667,7 @@ func (pa *path) onReadPublisherAnnounce(req readPublisherAnnounceReq) { } } - pa.addReadPublisher(req.Author, readPublisherStatePreRecord) + pa.addReadPublisher(req.Author, pathReadPublisherStatePreRecord) pa.source = req.Author pa.sourceStream = gortsplib.NewServerStream(req.Tracks) @@ -660,13 +675,13 @@ func (pa *path) onReadPublisherAnnounce(req readPublisherAnnounceReq) { } func (pa *path) onReadPublisherRecord(req readPublisherRecordReq) { - if state, ok := pa.readPublishers[req.Author]; !ok || state != readPublisherStatePreRecord { + if state, ok := pa.readPublishers[req.Author]; !ok || state != pathReadPublisherStatePreRecord { req.Res <- readPublisherRecordRes{Err: fmt.Errorf("not recording anymore")} return } atomic.AddInt64(pa.stats.CountPublishers, 1) - pa.readPublishers[req.Author] = readPublisherStateRecord + pa.readPublishers[req.Author] = pathReadPublisherStateRecord req.Author.OnPublisherAccepted(len(pa.sourceStream.Tracks())) @@ -690,17 +705,17 @@ func (pa *path) onReadPublisherPause(req readPublisherPauseReq) { return } - if state == readPublisherStatePlay { + if state == pathReadPublisherStatePlay { atomic.AddInt64(pa.stats.CountReaders, -1) - pa.readPublishers[req.Author] = readPublisherStatePrePlay + pa.readPublishers[req.Author] = pathReadPublisherStatePrePlay if _, ok := req.Author.(pathRTSPSession); !ok { pa.nonRTSPReaders.remove(req.Author) } - } else if state == readPublisherStateRecord { + } else if state == pathReadPublisherStateRecord { atomic.AddInt64(pa.stats.CountPublishers, -1) - pa.readPublishers[req.Author] = readPublisherStatePreRecord + pa.readPublishers[req.Author] = pathReadPublisherStatePreRecord pa.onSourceSetNotReady() } @@ -713,7 +728,7 @@ func (pa *path) scheduleSourceClose() { } if pa.sourceCloseTimerStarted || - pa.sourceState == sourceStateWaitingDescribe || + pa.sourceState == pathSourceStateCreating || pa.hasReadPublishers() { return } @@ -729,7 +744,7 @@ func (pa *path) scheduleRunOnDemandClose() { } if pa.runOnDemandCloseTimerStarted || - pa.sourceState == sourceStateWaitingDescribe || + pa.sourceState == pathSourceStateCreating || pa.hasReadPublishersNotSources() { return } @@ -743,7 +758,7 @@ func (pa *path) scheduleClose() { if pa.conf.Regexp != nil && !pa.hasReadPublishers() && pa.source == nil && - pa.sourceState != sourceStateWaitingDescribe && + pa.sourceState != pathSourceStateCreating && !pa.sourceCloseTimerStarted && !pa.runOnDemandCloseTimerStarted && !pa.closeTimerStarted { @@ -754,21 +769,6 @@ func (pa *path) scheduleClose() { } } -// ConfName returns the configuration name of this path. -func (pa *path) ConfName() string { - return pa.confName -} - -// Conf returns the configuration of this path. -func (pa *path) Conf() *conf.PathConf { - return pa.conf -} - -// Name returns the name of this path. -func (pa *path) Name() string { - return pa.name -} - // OnSourceExternalSetReady is called by an external source. func (pa *path) OnSourceExternalSetReady(req sourceExtSetReadyReq) { req.Res = make(chan sourceExtSetReadyRes)