From 8a4743fe9a2b7c97a83d46ad9ce91598c2253ce5 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Sun, 24 Jul 2022 13:05:15 +0200 Subject: [PATCH] hls muxer: when hlsAlwaysRemux is on, automatically recreate muxers in case of errors --- internal/core/hls_muxer.go | 7 ++-- internal/core/hls_server.go | 34 ++++++++++++++--- internal/core/path.go | 11 ++++-- internal/core/path_manager.go | 72 +++++++++++++++++++++-------------- 4 files changed, 83 insertions(+), 41 deletions(-) diff --git a/internal/core/hls_muxer.go b/internal/core/hls_muxer.go index 712ef98c..1ad5dcaa 100644 --- a/internal/core/hls_muxer.go +++ b/internal/core/hls_muxer.go @@ -112,8 +112,8 @@ type hlsMuxerParent interface { type hlsMuxer struct { name string + remoteAddr string externalAuthenticationURL string - hlsAlwaysRemux bool hlsVariant conf.HLSVariant hlsSegmentCount int hlsSegmentDuration conf.StringDuration @@ -143,7 +143,6 @@ func newHLSMuxer( name string, remoteAddr string, externalAuthenticationURL string, - hlsAlwaysRemux bool, hlsVariant conf.HLSVariant, hlsSegmentCount int, hlsSegmentDuration conf.StringDuration, @@ -160,8 +159,8 @@ func newHLSMuxer( m := &hlsMuxer{ name: name, + remoteAddr: remoteAddr, externalAuthenticationURL: externalAuthenticationURL, - hlsAlwaysRemux: hlsAlwaysRemux, hlsVariant: hlsVariant, hlsSegmentCount: hlsSegmentCount, hlsSegmentDuration: hlsSegmentDuration, @@ -398,7 +397,7 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) select { case <-closeCheckTicker.C: t := time.Unix(atomic.LoadInt64(m.lastRequestTime), 0) - if !m.hlsAlwaysRemux && time.Since(t) >= closeAfterInactivity { + if m.remoteAddr != "" && time.Since(t) >= closeAfterInactivity { m.ringBuffer.Close() <-writerDone return fmt.Errorf("not used anymore") diff --git a/internal/core/hls_server.go b/internal/core/hls_server.go index 414942e2..34b59cf2 100644 --- a/internal/core/hls_server.go +++ b/internal/core/hls_server.go @@ -76,10 +76,11 @@ type hlsServer struct { muxers map[string]*hlsMuxer // in - pathSourceReady chan *path - request chan *hlsMuxerRequest - muxerClose chan *hlsMuxer - apiMuxersList chan hlsServerAPIMuxersListReq + pathSourceReady chan *path + pathSourceNotReady chan *path + request chan *hlsMuxerRequest + muxerClose chan *hlsMuxer + apiMuxersList chan hlsServerAPIMuxersListReq } func newHLSServer( @@ -142,6 +143,7 @@ func newHLSServer( tlsConfig: tlsConfig, muxers: make(map[string]*hlsMuxer), pathSourceReady: make(chan *path), + pathSourceNotReady: make(chan *path), request: make(chan *hlsMuxerRequest), muxerClose: make(chan *hlsMuxer), apiMuxersList: make(chan hlsServerAPIMuxersListReq), @@ -204,6 +206,15 @@ outer: s.findOrCreateMuxer(pa.Name(), "", nil) } + case pa := <-s.pathSourceNotReady: + if s.hlsAlwaysRemux { + c, ok := s.muxers[pa.Name()] + if ok { + c.close() + delete(s.muxers, pa.Name()) + } + } + case req := <-s.request: s.findOrCreateMuxer(req.dir, req.ctx.ClientIP(), req) @@ -213,6 +224,10 @@ outer: } delete(s.muxers, c.PathName()) + if s.hlsAlwaysRemux && c.remoteAddr == "" { + s.findOrCreateMuxer(c.PathName(), "", nil) + } + case req := <-s.apiMuxersList: muxers := make(map[string]*hlsMuxer) @@ -331,7 +346,6 @@ func (s *hlsServer) findOrCreateMuxer(pathName string, remoteAddr string, req *h pathName, remoteAddr, s.externalAuthenticationURL, - s.hlsAlwaysRemux, s.hlsVariant, s.hlsSegmentCount, s.hlsSegmentDuration, @@ -358,7 +372,7 @@ func (s *hlsServer) onMuxerClose(c *hlsMuxer) { } } -// onPathSourceReady is called by core. +// onPathSourceReady is called by pathManager. func (s *hlsServer) onPathSourceReady(pa *path) { select { case s.pathSourceReady <- pa: @@ -366,6 +380,14 @@ func (s *hlsServer) onPathSourceReady(pa *path) { } } +// onPathSourceNotReady is called by pathManager. +func (s *hlsServer) onPathSourceNotReady(pa *path) { + select { + case s.pathSourceNotReady <- pa: + case <-s.ctx.Done(): + } +} + // onAPIHLSMuxersList is called by api. func (s *hlsServer) onAPIHLSMuxersList(req hlsServerAPIMuxersListReq) hlsServerAPIMuxersListRes { req.res = make(chan hlsServerAPIMuxersListRes) diff --git a/internal/core/path.go b/internal/core/path.go index 58981424..4facd883 100644 --- a/internal/core/path.go +++ b/internal/core/path.go @@ -62,6 +62,7 @@ func (pathErrAuthCritical) Error() string { type pathParent interface { log(logger.Level, string, ...interface{}) onPathSourceReady(*path) + onPathSourceNotReady(*path) onPathClose(*path) } @@ -533,7 +534,9 @@ func (pa *path) run() { req.res <- pathReaderSetupPlayRes{err: fmt.Errorf("terminated")} } - pa.sourceSetNotReady() + if pa.sourceReady { + pa.sourceSetNotReady() + } if pa.source != nil { if source, ok := pa.source.(sourceStatic); ok { @@ -655,8 +658,6 @@ func (pa *path) sourceSetReady(tracks gortsplib.Tracks) { pa.sourceReady = true pa.stream = newStream(tracks) - pa.parent.onPathSourceReady(pa) - if pa.conf.RunOnReady != "" { pa.log(logger.Info, "runOnReady command started") pa.onReadyCmd = externalcmd.NewCmd( @@ -668,9 +669,13 @@ func (pa *path) sourceSetReady(tracks gortsplib.Tracks) { pa.log(logger.Info, "runOnReady command exited with code %d", co) }) } + + pa.parent.onPathSourceReady(pa) } func (pa *path) sourceSetNotReady() { + pa.parent.onPathSourceNotReady(pa) + for r := range pa.readers { pa.doReaderRemove(r) r.close() diff --git a/internal/core/path_manager.go b/internal/core/path_manager.go index b20d6ed3..da6003b4 100644 --- a/internal/core/path_manager.go +++ b/internal/core/path_manager.go @@ -11,7 +11,8 @@ import ( ) type pathManagerHLSServer interface { - onPathSourceReady(pa *path) + onPathSourceReady(*path) + onPathSourceNotReady(*path) } type pathManagerParent interface { @@ -35,14 +36,15 @@ type pathManager struct { paths map[string]*path // in - confReload chan map[string]*conf.PathConf - pathClose chan *path - pathSourceReady chan *path - describe chan pathDescribeReq - readerSetupPlay chan pathReaderSetupPlayReq - publisherAnnounce chan pathPublisherAnnounceReq - hlsServerSet chan pathManagerHLSServer - apiPathsList chan pathAPIPathsListReq + confReload chan map[string]*conf.PathConf + pathClose chan *path + pathSourceReady chan *path + pathSourceNotReady chan *path + describe chan pathDescribeReq + readerSetupPlay chan pathReaderSetupPlayReq + publisherAnnounce chan pathPublisherAnnounceReq + hlsServerSet chan pathManagerHLSServer + apiPathsList chan pathAPIPathsListReq } func newPathManager( @@ -59,25 +61,26 @@ func newPathManager( ctx, ctxCancel := context.WithCancel(parentCtx) pm := &pathManager{ - rtspAddress: rtspAddress, - readTimeout: readTimeout, - writeTimeout: writeTimeout, - readBufferCount: readBufferCount, - pathConfs: pathConfs, - externalCmdPool: externalCmdPool, - metrics: metrics, - parent: parent, - ctx: ctx, - ctxCancel: ctxCancel, - paths: make(map[string]*path), - confReload: make(chan map[string]*conf.PathConf), - pathClose: make(chan *path), - pathSourceReady: make(chan *path), - describe: make(chan pathDescribeReq), - readerSetupPlay: make(chan pathReaderSetupPlayReq), - publisherAnnounce: make(chan pathPublisherAnnounceReq), - hlsServerSet: make(chan pathManagerHLSServer), - apiPathsList: make(chan pathAPIPathsListReq), + rtspAddress: rtspAddress, + readTimeout: readTimeout, + writeTimeout: writeTimeout, + readBufferCount: readBufferCount, + pathConfs: pathConfs, + externalCmdPool: externalCmdPool, + metrics: metrics, + parent: parent, + ctx: ctx, + ctxCancel: ctxCancel, + paths: make(map[string]*path), + confReload: make(chan map[string]*conf.PathConf), + pathClose: make(chan *path), + pathSourceReady: make(chan *path), + pathSourceNotReady: make(chan *path), + describe: make(chan pathDescribeReq), + readerSetupPlay: make(chan pathReaderSetupPlayReq), + publisherAnnounce: make(chan pathPublisherAnnounceReq), + hlsServerSet: make(chan pathManagerHLSServer), + apiPathsList: make(chan pathAPIPathsListReq), } for pathConfName, pathConf := range pm.pathConfs { @@ -165,6 +168,11 @@ outer: pm.hlsServer.onPathSourceReady(pa) } + case pa := <-pm.pathSourceNotReady: + if pm.hlsServer != nil { + pm.hlsServer.onPathSourceNotReady(pa) + } + case req := <-pm.describe: pathConfName, pathConf, pathMatches, err := pm.findPathConf(req.pathName) if err != nil { @@ -323,6 +331,14 @@ func (pm *pathManager) onPathSourceReady(pa *path) { } } +// onPathSourceNotReady is called by path. +func (pm *pathManager) onPathSourceNotReady(pa *path) { + select { + case pm.pathSourceNotReady <- pa: + case <-pm.ctx.Done(): + } +} + // onPathClose is called by path. func (pm *pathManager) onPathClose(pa *path) { select {