add path prefix to path entities

This commit is contained in:
aler9 2021-07-31 18:32:00 +02:00
parent 5ab989250d
commit 1a7f26ce29

View File

@ -37,22 +37,22 @@ type sourceRedirect struct{}
func (*sourceRedirect) IsSource() {} func (*sourceRedirect) IsSource() {}
type readPublisherState int type pathReadPublisherState int
const ( const (
readPublisherStatePrePlay readPublisherState = iota pathReadPublisherStatePrePlay pathReadPublisherState = iota
readPublisherStatePlay pathReadPublisherStatePlay
readPublisherStatePreRecord pathReadPublisherStatePreRecord
readPublisherStateRecord pathReadPublisherStateRecord
readPublisherStatePreRemove pathReadPublisherStatePreRemove
) )
type sourceState int type pathSourceState int
const ( const (
sourceStateNotReady sourceState = iota pathSourceStateNotReady pathSourceState = iota
sourceStateWaitingDescribe pathSourceStateCreating
sourceStateReady pathSourceStateReady
) )
type pathReadersMap struct { type pathReadersMap struct {
@ -102,7 +102,7 @@ type path struct {
ctx context.Context ctx context.Context
ctxCancel func() ctxCancel func()
readPublishers map[readPublisher]readPublisherState readPublishers map[readPublisher]pathReadPublisherState
describeRequests []readPublisherDescribeReq describeRequests []readPublisherDescribeReq
setupPlayRequests []readPublisherSetupPlayReq setupPlayRequests []readPublisherSetupPlayReq
source source source source
@ -113,7 +113,7 @@ type path struct {
describeTimer *time.Timer describeTimer *time.Timer
sourceCloseTimer *time.Timer sourceCloseTimer *time.Timer
sourceCloseTimerStarted bool sourceCloseTimerStarted bool
sourceState sourceState sourceState pathSourceState
sourceWg sync.WaitGroup sourceWg sync.WaitGroup
runOnDemandCloseTimer *time.Timer runOnDemandCloseTimer *time.Timer
runOnDemandCloseTimerStarted bool runOnDemandCloseTimerStarted bool
@ -161,7 +161,7 @@ func newPath(
parent: parent, parent: parent,
ctx: ctx, ctx: ctx,
ctxCancel: ctxCancel, ctxCancel: ctxCancel,
readPublishers: make(map[readPublisher]readPublisherState), readPublishers: make(map[readPublisher]pathReadPublisherState),
nonRTSPReaders: newPathReadersMap(), nonRTSPReaders: newPathReadersMap(),
describeTimer: newEmptyTimer(), describeTimer: newEmptyTimer(),
sourceCloseTimer: newEmptyTimer(), sourceCloseTimer: newEmptyTimer(),
@ -192,6 +192,21 @@ func (pa *path) Log(level logger.Level, format string, args ...interface{}) {
pa.parent.Log(level, "[path "+pa.name+"] "+format, args...) pa.parent.Log(level, "[path "+pa.name+"] "+format, args...)
} }
// ConfName returns the configuration name of this path.
func (pa *path) ConfName() string {
return pa.confName
}
// Conf returns the configuration of this path.
func (pa *path) Conf() *conf.PathConf {
return pa.conf
}
// Name returns the name of this path.
func (pa *path) Name() string {
return pa.name
}
func (pa *path) run() { func (pa *path) run() {
defer pa.wg.Done() defer pa.wg.Done()
@ -226,7 +241,7 @@ outer:
pa.setupPlayRequests = nil pa.setupPlayRequests = nil
// set state after removeReadPublisher(), so schedule* works once // set state after removeReadPublisher(), so schedule* works once
pa.sourceState = sourceStateNotReady pa.sourceState = pathSourceStateNotReady
pa.scheduleSourceClose() pa.scheduleSourceClose()
pa.scheduleRunOnDemandClose() pa.scheduleRunOnDemandClose()
@ -283,7 +298,7 @@ outer:
continue continue
} }
if pa.readPublishers[req.Author] != readPublisherStatePreRemove { if pa.readPublishers[req.Author] != pathReadPublisherStatePreRemove {
pa.removeReadPublisher(req.Author) pa.removeReadPublisher(req.Author)
} }
@ -326,16 +341,16 @@ outer:
} }
for rp, state := range pa.readPublishers { for rp, state := range pa.readPublishers {
if state != readPublisherStatePreRemove { if state != pathReadPublisherStatePreRemove {
switch state { switch state {
case readPublisherStatePlay: case pathReadPublisherStatePlay:
atomic.AddInt64(pa.stats.CountReaders, -1) atomic.AddInt64(pa.stats.CountReaders, -1)
if _, ok := rp.(pathRTSPSession); !ok { if _, ok := rp.(pathRTSPSession); !ok {
pa.nonRTSPReaders.remove(rp) pa.nonRTSPReaders.remove(rp)
} }
case readPublisherStateRecord: case pathReadPublisherStateRecord:
atomic.AddInt64(pa.stats.CountPublishers, -1) atomic.AddInt64(pa.stats.CountPublishers, -1)
} }
rp.Close() rp.Close()
@ -381,7 +396,7 @@ func (pa *path) startExternalSource() {
func (pa *path) hasReadPublishers() bool { func (pa *path) hasReadPublishers() bool {
for _, state := range pa.readPublishers { for _, state := range pa.readPublishers {
if state != readPublisherStatePreRemove { if state != pathReadPublisherStatePreRemove {
return true return true
} }
} }
@ -390,30 +405,30 @@ func (pa *path) hasReadPublishers() bool {
func (pa *path) hasReadPublishersNotSources() bool { func (pa *path) hasReadPublishersNotSources() bool {
for c, state := range pa.readPublishers { for c, state := range pa.readPublishers {
if state != readPublisherStatePreRemove && c != pa.source { if state != pathReadPublisherStatePreRemove && c != pa.source {
return true return true
} }
} }
return false return false
} }
func (pa *path) addReadPublisher(c readPublisher, state readPublisherState) { func (pa *path) addReadPublisher(c readPublisher, state pathReadPublisherState) {
pa.readPublishers[c] = state pa.readPublishers[c] = state
} }
func (pa *path) removeReadPublisher(rp readPublisher) { func (pa *path) removeReadPublisher(rp readPublisher) {
state := pa.readPublishers[rp] state := pa.readPublishers[rp]
pa.readPublishers[rp] = readPublisherStatePreRemove pa.readPublishers[rp] = pathReadPublisherStatePreRemove
switch state { switch state {
case readPublisherStatePlay: case pathReadPublisherStatePlay:
atomic.AddInt64(pa.stats.CountReaders, -1) atomic.AddInt64(pa.stats.CountReaders, -1)
if _, ok := rp.(pathRTSPSession); !ok { if _, ok := rp.(pathRTSPSession); !ok {
pa.nonRTSPReaders.remove(rp) pa.nonRTSPReaders.remove(rp)
} }
case readPublisherStateRecord: case pathReadPublisherStateRecord:
atomic.AddInt64(pa.stats.CountPublishers, -1) atomic.AddInt64(pa.stats.CountPublishers, -1)
pa.onSourceSetNotReady() pa.onSourceSetNotReady()
} }
@ -425,7 +440,7 @@ func (pa *path) removeReadPublisher(rp readPublisher) {
// close all readPublishers that are reading or waiting to read // close all readPublishers that are reading or waiting to read
for orp, state := range pa.readPublishers { for orp, state := range pa.readPublishers {
if state != readPublisherStatePreRemove { if state != pathReadPublisherStatePreRemove {
pa.removeReadPublisher(orp) pa.removeReadPublisher(orp)
orp.Close() orp.Close()
} }
@ -438,12 +453,12 @@ func (pa *path) removeReadPublisher(rp readPublisher) {
} }
func (pa *path) onSourceSetReady() { func (pa *path) onSourceSetReady() {
if pa.sourceState == sourceStateWaitingDescribe { if pa.sourceState == pathSourceStateCreating {
pa.describeTimer.Stop() pa.describeTimer.Stop()
pa.describeTimer = newEmptyTimer() pa.describeTimer = newEmptyTimer()
} }
pa.sourceState = sourceStateReady pa.sourceState = pathSourceStateReady
for _, req := range pa.describeRequests { for _, req := range pa.describeRequests {
req.Res <- readPublisherDescribeRes{ req.Res <- readPublisherDescribeRes{
@ -465,7 +480,7 @@ func (pa *path) onSourceSetReady() {
} }
func (pa *path) onSourceSetNotReady() { func (pa *path) onSourceSetNotReady() {
pa.sourceState = sourceStateNotReady pa.sourceState = pathSourceStateNotReady
if pa.onPublishCmd != nil { if pa.onPublishCmd != nil {
pa.onPublishCmd.Close() pa.onPublishCmd.Close()
@ -474,7 +489,7 @@ func (pa *path) onSourceSetNotReady() {
// close all readPublishers that are reading or waiting to read // close all readPublishers that are reading or waiting to read
for c, state := range pa.readPublishers { for c, state := range pa.readPublishers {
if c != pa.source && state != readPublisherStatePreRemove { if c != pa.source && state != pathReadPublisherStatePreRemove {
pa.removeReadPublisher(c) pa.removeReadPublisher(c)
c.Close() c.Close()
} }
@ -487,9 +502,9 @@ func (pa *path) fixedPublisherStart() {
if pa.source == nil { if pa.source == nil {
pa.startExternalSource() pa.startExternalSource()
if pa.sourceState != sourceStateWaitingDescribe { if pa.sourceState != pathSourceStateCreating {
pa.describeTimer = time.NewTimer(pa.conf.SourceOnDemandStartTimeout) pa.describeTimer = time.NewTimer(pa.conf.SourceOnDemandStartTimeout)
pa.sourceState = sourceStateWaitingDescribe pa.sourceState = pathSourceStateCreating
} }
// reset timer // reset timer
@ -509,9 +524,9 @@ func (pa *path) fixedPublisherStart() {
Port: port, Port: port,
}) })
if pa.sourceState != sourceStateWaitingDescribe { if pa.sourceState != pathSourceStateCreating {
pa.describeTimer = time.NewTimer(pa.conf.RunOnDemandStartTimeout) pa.describeTimer = time.NewTimer(pa.conf.RunOnDemandStartTimeout)
pa.sourceState = sourceStateWaitingDescribe pa.sourceState = pathSourceStateCreating
} }
// reset timer // reset timer
@ -534,17 +549,17 @@ func (pa *path) onReadPublisherDescribe(req readPublisherDescribeReq) {
} }
switch pa.sourceState { switch pa.sourceState {
case sourceStateReady: case pathSourceStateReady:
req.Res <- readPublisherDescribeRes{ req.Res <- readPublisherDescribeRes{
Stream: pa.sourceStream, Stream: pa.sourceStream,
} }
return return
case sourceStateWaitingDescribe: case pathSourceStateCreating:
pa.describeRequests = append(pa.describeRequests, req) pa.describeRequests = append(pa.describeRequests, req)
return return
case sourceStateNotReady: case pathSourceStateNotReady:
if pa.conf.Fallback != "" { if pa.conf.Fallback != "" {
fallbackURL := func() string { fallbackURL := func() string {
if strings.HasPrefix(pa.conf.Fallback, "/") { if strings.HasPrefix(pa.conf.Fallback, "/") {
@ -572,15 +587,15 @@ func (pa *path) onReadPublisherSetupPlay(req readPublisherSetupPlayReq) {
pa.scheduleClose() pa.scheduleClose()
switch pa.sourceState { switch pa.sourceState {
case sourceStateReady: case pathSourceStateReady:
pa.onReadPublisherSetupPlayPost(req) pa.onReadPublisherSetupPlayPost(req)
return return
case sourceStateWaitingDescribe: case pathSourceStateCreating:
pa.setupPlayRequests = append(pa.setupPlayRequests, req) pa.setupPlayRequests = append(pa.setupPlayRequests, req)
return return
case sourceStateNotReady: case pathSourceStateNotReady:
req.Res <- readPublisherSetupPlayRes{Err: readPublisherErrNoOnePublishing{PathName: pa.name}} req.Res <- readPublisherSetupPlayRes{Err: readPublisherErrNoOnePublishing{PathName: pa.name}}
return return
} }
@ -600,7 +615,7 @@ func (pa *path) onReadPublisherSetupPlayPost(req readPublisherSetupPlayReq) {
pa.runOnDemandCloseTimerStarted = false pa.runOnDemandCloseTimerStarted = false
} }
pa.addReadPublisher(req.Author, readPublisherStatePrePlay) pa.addReadPublisher(req.Author, pathReadPublisherStatePrePlay)
} }
req.Res <- readPublisherSetupPlayRes{ req.Res <- readPublisherSetupPlayRes{
@ -611,7 +626,7 @@ func (pa *path) onReadPublisherSetupPlayPost(req readPublisherSetupPlayReq) {
func (pa *path) onReadPublisherPlay(req readPublisherPlayReq) { func (pa *path) onReadPublisherPlay(req readPublisherPlayReq) {
atomic.AddInt64(pa.stats.CountReaders, 1) atomic.AddInt64(pa.stats.CountReaders, 1)
pa.readPublishers[req.Author] = readPublisherStatePlay pa.readPublishers[req.Author] = pathReadPublisherStatePlay
if _, ok := req.Author.(pathRTSPSession); !ok { if _, ok := req.Author.(pathRTSPSession); !ok {
pa.nonRTSPReaders.add(req.Author) pa.nonRTSPReaders.add(req.Author)
@ -652,7 +667,7 @@ func (pa *path) onReadPublisherAnnounce(req readPublisherAnnounceReq) {
} }
} }
pa.addReadPublisher(req.Author, readPublisherStatePreRecord) pa.addReadPublisher(req.Author, pathReadPublisherStatePreRecord)
pa.source = req.Author pa.source = req.Author
pa.sourceStream = gortsplib.NewServerStream(req.Tracks) pa.sourceStream = gortsplib.NewServerStream(req.Tracks)
@ -660,13 +675,13 @@ func (pa *path) onReadPublisherAnnounce(req readPublisherAnnounceReq) {
} }
func (pa *path) onReadPublisherRecord(req readPublisherRecordReq) { func (pa *path) onReadPublisherRecord(req readPublisherRecordReq) {
if state, ok := pa.readPublishers[req.Author]; !ok || state != readPublisherStatePreRecord { if state, ok := pa.readPublishers[req.Author]; !ok || state != pathReadPublisherStatePreRecord {
req.Res <- readPublisherRecordRes{Err: fmt.Errorf("not recording anymore")} req.Res <- readPublisherRecordRes{Err: fmt.Errorf("not recording anymore")}
return return
} }
atomic.AddInt64(pa.stats.CountPublishers, 1) atomic.AddInt64(pa.stats.CountPublishers, 1)
pa.readPublishers[req.Author] = readPublisherStateRecord pa.readPublishers[req.Author] = pathReadPublisherStateRecord
req.Author.OnPublisherAccepted(len(pa.sourceStream.Tracks())) req.Author.OnPublisherAccepted(len(pa.sourceStream.Tracks()))
@ -690,17 +705,17 @@ func (pa *path) onReadPublisherPause(req readPublisherPauseReq) {
return return
} }
if state == readPublisherStatePlay { if state == pathReadPublisherStatePlay {
atomic.AddInt64(pa.stats.CountReaders, -1) atomic.AddInt64(pa.stats.CountReaders, -1)
pa.readPublishers[req.Author] = readPublisherStatePrePlay pa.readPublishers[req.Author] = pathReadPublisherStatePrePlay
if _, ok := req.Author.(pathRTSPSession); !ok { if _, ok := req.Author.(pathRTSPSession); !ok {
pa.nonRTSPReaders.remove(req.Author) pa.nonRTSPReaders.remove(req.Author)
} }
} else if state == readPublisherStateRecord { } else if state == pathReadPublisherStateRecord {
atomic.AddInt64(pa.stats.CountPublishers, -1) atomic.AddInt64(pa.stats.CountPublishers, -1)
pa.readPublishers[req.Author] = readPublisherStatePreRecord pa.readPublishers[req.Author] = pathReadPublisherStatePreRecord
pa.onSourceSetNotReady() pa.onSourceSetNotReady()
} }
@ -713,7 +728,7 @@ func (pa *path) scheduleSourceClose() {
} }
if pa.sourceCloseTimerStarted || if pa.sourceCloseTimerStarted ||
pa.sourceState == sourceStateWaitingDescribe || pa.sourceState == pathSourceStateCreating ||
pa.hasReadPublishers() { pa.hasReadPublishers() {
return return
} }
@ -729,7 +744,7 @@ func (pa *path) scheduleRunOnDemandClose() {
} }
if pa.runOnDemandCloseTimerStarted || if pa.runOnDemandCloseTimerStarted ||
pa.sourceState == sourceStateWaitingDescribe || pa.sourceState == pathSourceStateCreating ||
pa.hasReadPublishersNotSources() { pa.hasReadPublishersNotSources() {
return return
} }
@ -743,7 +758,7 @@ func (pa *path) scheduleClose() {
if pa.conf.Regexp != nil && if pa.conf.Regexp != nil &&
!pa.hasReadPublishers() && !pa.hasReadPublishers() &&
pa.source == nil && pa.source == nil &&
pa.sourceState != sourceStateWaitingDescribe && pa.sourceState != pathSourceStateCreating &&
!pa.sourceCloseTimerStarted && !pa.sourceCloseTimerStarted &&
!pa.runOnDemandCloseTimerStarted && !pa.runOnDemandCloseTimerStarted &&
!pa.closeTimerStarted { !pa.closeTimerStarted {
@ -754,21 +769,6 @@ func (pa *path) scheduleClose() {
} }
} }
// ConfName returns the configuration name of this path.
func (pa *path) ConfName() string {
return pa.confName
}
// Conf returns the configuration of this path.
func (pa *path) Conf() *conf.PathConf {
return pa.conf
}
// Name returns the name of this path.
func (pa *path) Name() string {
return pa.name
}
// OnSourceExternalSetReady is called by an external source. // OnSourceExternalSetReady is called by an external source.
func (pa *path) OnSourceExternalSetReady(req sourceExtSetReadyReq) { func (pa *path) OnSourceExternalSetReady(req sourceExtSetReadyReq) {
req.Res = make(chan sourceExtSetReadyRes) req.Res = make(chan sourceExtSetReadyRes)