hls muxer: when hlsAlwaysRemux is on, automatically recreate muxers in

case of errors
This commit is contained in:
aler9 2022-07-24 13:05:15 +02:00
parent c769088e6b
commit 8a4743fe9a
4 changed files with 83 additions and 41 deletions

View File

@ -112,8 +112,8 @@ type hlsMuxerParent interface {
type hlsMuxer struct { type hlsMuxer struct {
name string name string
remoteAddr string
externalAuthenticationURL string externalAuthenticationURL string
hlsAlwaysRemux bool
hlsVariant conf.HLSVariant hlsVariant conf.HLSVariant
hlsSegmentCount int hlsSegmentCount int
hlsSegmentDuration conf.StringDuration hlsSegmentDuration conf.StringDuration
@ -143,7 +143,6 @@ func newHLSMuxer(
name string, name string,
remoteAddr string, remoteAddr string,
externalAuthenticationURL string, externalAuthenticationURL string,
hlsAlwaysRemux bool,
hlsVariant conf.HLSVariant, hlsVariant conf.HLSVariant,
hlsSegmentCount int, hlsSegmentCount int,
hlsSegmentDuration conf.StringDuration, hlsSegmentDuration conf.StringDuration,
@ -160,8 +159,8 @@ func newHLSMuxer(
m := &hlsMuxer{ m := &hlsMuxer{
name: name, name: name,
remoteAddr: remoteAddr,
externalAuthenticationURL: externalAuthenticationURL, externalAuthenticationURL: externalAuthenticationURL,
hlsAlwaysRemux: hlsAlwaysRemux,
hlsVariant: hlsVariant, hlsVariant: hlsVariant,
hlsSegmentCount: hlsSegmentCount, hlsSegmentCount: hlsSegmentCount,
hlsSegmentDuration: hlsSegmentDuration, hlsSegmentDuration: hlsSegmentDuration,
@ -398,7 +397,7 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{})
select { select {
case <-closeCheckTicker.C: case <-closeCheckTicker.C:
t := time.Unix(atomic.LoadInt64(m.lastRequestTime), 0) t := time.Unix(atomic.LoadInt64(m.lastRequestTime), 0)
if !m.hlsAlwaysRemux && time.Since(t) >= closeAfterInactivity { if m.remoteAddr != "" && time.Since(t) >= closeAfterInactivity {
m.ringBuffer.Close() m.ringBuffer.Close()
<-writerDone <-writerDone
return fmt.Errorf("not used anymore") return fmt.Errorf("not used anymore")

View File

@ -76,10 +76,11 @@ type hlsServer struct {
muxers map[string]*hlsMuxer muxers map[string]*hlsMuxer
// in // in
pathSourceReady chan *path pathSourceReady chan *path
request chan *hlsMuxerRequest pathSourceNotReady chan *path
muxerClose chan *hlsMuxer request chan *hlsMuxerRequest
apiMuxersList chan hlsServerAPIMuxersListReq muxerClose chan *hlsMuxer
apiMuxersList chan hlsServerAPIMuxersListReq
} }
func newHLSServer( func newHLSServer(
@ -142,6 +143,7 @@ func newHLSServer(
tlsConfig: tlsConfig, tlsConfig: tlsConfig,
muxers: make(map[string]*hlsMuxer), muxers: make(map[string]*hlsMuxer),
pathSourceReady: make(chan *path), pathSourceReady: make(chan *path),
pathSourceNotReady: make(chan *path),
request: make(chan *hlsMuxerRequest), request: make(chan *hlsMuxerRequest),
muxerClose: make(chan *hlsMuxer), muxerClose: make(chan *hlsMuxer),
apiMuxersList: make(chan hlsServerAPIMuxersListReq), apiMuxersList: make(chan hlsServerAPIMuxersListReq),
@ -204,6 +206,15 @@ outer:
s.findOrCreateMuxer(pa.Name(), "", nil) s.findOrCreateMuxer(pa.Name(), "", nil)
} }
case pa := <-s.pathSourceNotReady:
if s.hlsAlwaysRemux {
c, ok := s.muxers[pa.Name()]
if ok {
c.close()
delete(s.muxers, pa.Name())
}
}
case req := <-s.request: case req := <-s.request:
s.findOrCreateMuxer(req.dir, req.ctx.ClientIP(), req) s.findOrCreateMuxer(req.dir, req.ctx.ClientIP(), req)
@ -213,6 +224,10 @@ outer:
} }
delete(s.muxers, c.PathName()) delete(s.muxers, c.PathName())
if s.hlsAlwaysRemux && c.remoteAddr == "" {
s.findOrCreateMuxer(c.PathName(), "", nil)
}
case req := <-s.apiMuxersList: case req := <-s.apiMuxersList:
muxers := make(map[string]*hlsMuxer) muxers := make(map[string]*hlsMuxer)
@ -331,7 +346,6 @@ func (s *hlsServer) findOrCreateMuxer(pathName string, remoteAddr string, req *h
pathName, pathName,
remoteAddr, remoteAddr,
s.externalAuthenticationURL, s.externalAuthenticationURL,
s.hlsAlwaysRemux,
s.hlsVariant, s.hlsVariant,
s.hlsSegmentCount, s.hlsSegmentCount,
s.hlsSegmentDuration, s.hlsSegmentDuration,
@ -358,7 +372,7 @@ func (s *hlsServer) onMuxerClose(c *hlsMuxer) {
} }
} }
// onPathSourceReady is called by core. // onPathSourceReady is called by pathManager.
func (s *hlsServer) onPathSourceReady(pa *path) { func (s *hlsServer) onPathSourceReady(pa *path) {
select { select {
case s.pathSourceReady <- pa: case s.pathSourceReady <- pa:
@ -366,6 +380,14 @@ func (s *hlsServer) onPathSourceReady(pa *path) {
} }
} }
// onPathSourceNotReady is called by pathManager.
func (s *hlsServer) onPathSourceNotReady(pa *path) {
select {
case s.pathSourceNotReady <- pa:
case <-s.ctx.Done():
}
}
// onAPIHLSMuxersList is called by api. // onAPIHLSMuxersList is called by api.
func (s *hlsServer) onAPIHLSMuxersList(req hlsServerAPIMuxersListReq) hlsServerAPIMuxersListRes { func (s *hlsServer) onAPIHLSMuxersList(req hlsServerAPIMuxersListReq) hlsServerAPIMuxersListRes {
req.res = make(chan hlsServerAPIMuxersListRes) req.res = make(chan hlsServerAPIMuxersListRes)

View File

@ -62,6 +62,7 @@ func (pathErrAuthCritical) Error() string {
type pathParent interface { type pathParent interface {
log(logger.Level, string, ...interface{}) log(logger.Level, string, ...interface{})
onPathSourceReady(*path) onPathSourceReady(*path)
onPathSourceNotReady(*path)
onPathClose(*path) onPathClose(*path)
} }
@ -533,7 +534,9 @@ func (pa *path) run() {
req.res <- pathReaderSetupPlayRes{err: fmt.Errorf("terminated")} req.res <- pathReaderSetupPlayRes{err: fmt.Errorf("terminated")}
} }
pa.sourceSetNotReady() if pa.sourceReady {
pa.sourceSetNotReady()
}
if pa.source != nil { if pa.source != nil {
if source, ok := pa.source.(sourceStatic); ok { if source, ok := pa.source.(sourceStatic); ok {
@ -655,8 +658,6 @@ func (pa *path) sourceSetReady(tracks gortsplib.Tracks) {
pa.sourceReady = true pa.sourceReady = true
pa.stream = newStream(tracks) pa.stream = newStream(tracks)
pa.parent.onPathSourceReady(pa)
if pa.conf.RunOnReady != "" { if pa.conf.RunOnReady != "" {
pa.log(logger.Info, "runOnReady command started") pa.log(logger.Info, "runOnReady command started")
pa.onReadyCmd = externalcmd.NewCmd( pa.onReadyCmd = externalcmd.NewCmd(
@ -668,9 +669,13 @@ func (pa *path) sourceSetReady(tracks gortsplib.Tracks) {
pa.log(logger.Info, "runOnReady command exited with code %d", co) pa.log(logger.Info, "runOnReady command exited with code %d", co)
}) })
} }
pa.parent.onPathSourceReady(pa)
} }
func (pa *path) sourceSetNotReady() { func (pa *path) sourceSetNotReady() {
pa.parent.onPathSourceNotReady(pa)
for r := range pa.readers { for r := range pa.readers {
pa.doReaderRemove(r) pa.doReaderRemove(r)
r.close() r.close()

View File

@ -11,7 +11,8 @@ import (
) )
type pathManagerHLSServer interface { type pathManagerHLSServer interface {
onPathSourceReady(pa *path) onPathSourceReady(*path)
onPathSourceNotReady(*path)
} }
type pathManagerParent interface { type pathManagerParent interface {
@ -35,14 +36,15 @@ type pathManager struct {
paths map[string]*path paths map[string]*path
// in // in
confReload chan map[string]*conf.PathConf confReload chan map[string]*conf.PathConf
pathClose chan *path pathClose chan *path
pathSourceReady chan *path pathSourceReady chan *path
describe chan pathDescribeReq pathSourceNotReady chan *path
readerSetupPlay chan pathReaderSetupPlayReq describe chan pathDescribeReq
publisherAnnounce chan pathPublisherAnnounceReq readerSetupPlay chan pathReaderSetupPlayReq
hlsServerSet chan pathManagerHLSServer publisherAnnounce chan pathPublisherAnnounceReq
apiPathsList chan pathAPIPathsListReq hlsServerSet chan pathManagerHLSServer
apiPathsList chan pathAPIPathsListReq
} }
func newPathManager( func newPathManager(
@ -59,25 +61,26 @@ func newPathManager(
ctx, ctxCancel := context.WithCancel(parentCtx) ctx, ctxCancel := context.WithCancel(parentCtx)
pm := &pathManager{ pm := &pathManager{
rtspAddress: rtspAddress, rtspAddress: rtspAddress,
readTimeout: readTimeout, readTimeout: readTimeout,
writeTimeout: writeTimeout, writeTimeout: writeTimeout,
readBufferCount: readBufferCount, readBufferCount: readBufferCount,
pathConfs: pathConfs, pathConfs: pathConfs,
externalCmdPool: externalCmdPool, externalCmdPool: externalCmdPool,
metrics: metrics, metrics: metrics,
parent: parent, parent: parent,
ctx: ctx, ctx: ctx,
ctxCancel: ctxCancel, ctxCancel: ctxCancel,
paths: make(map[string]*path), paths: make(map[string]*path),
confReload: make(chan map[string]*conf.PathConf), confReload: make(chan map[string]*conf.PathConf),
pathClose: make(chan *path), pathClose: make(chan *path),
pathSourceReady: make(chan *path), pathSourceReady: make(chan *path),
describe: make(chan pathDescribeReq), pathSourceNotReady: make(chan *path),
readerSetupPlay: make(chan pathReaderSetupPlayReq), describe: make(chan pathDescribeReq),
publisherAnnounce: make(chan pathPublisherAnnounceReq), readerSetupPlay: make(chan pathReaderSetupPlayReq),
hlsServerSet: make(chan pathManagerHLSServer), publisherAnnounce: make(chan pathPublisherAnnounceReq),
apiPathsList: make(chan pathAPIPathsListReq), hlsServerSet: make(chan pathManagerHLSServer),
apiPathsList: make(chan pathAPIPathsListReq),
} }
for pathConfName, pathConf := range pm.pathConfs { for pathConfName, pathConf := range pm.pathConfs {
@ -165,6 +168,11 @@ outer:
pm.hlsServer.onPathSourceReady(pa) pm.hlsServer.onPathSourceReady(pa)
} }
case pa := <-pm.pathSourceNotReady:
if pm.hlsServer != nil {
pm.hlsServer.onPathSourceNotReady(pa)
}
case req := <-pm.describe: case req := <-pm.describe:
pathConfName, pathConf, pathMatches, err := pm.findPathConf(req.pathName) pathConfName, pathConf, pathMatches, err := pm.findPathConf(req.pathName)
if err != nil { if err != nil {
@ -323,6 +331,14 @@ func (pm *pathManager) onPathSourceReady(pa *path) {
} }
} }
// onPathSourceNotReady is called by path.
func (pm *pathManager) onPathSourceNotReady(pa *path) {
select {
case pm.pathSourceNotReady <- pa:
case <-pm.ctx.Done():
}
}
// onPathClose is called by path. // onPathClose is called by path.
func (pm *pathManager) onPathClose(pa *path) { func (pm *pathManager) onPathClose(pa *path) {
select { select {