hls: fix freeze when sourceOnDemand is yes and multiple sources are requested at the same time (#493)

This commit is contained in:
aler9 2021-08-10 15:00:07 +02:00
parent 1987572b67
commit 3872b42434

View File

@ -111,6 +111,7 @@ type hlsRemuxer struct {
ringBuffer *ringbuffer.RingBuffer
lastRequestTime *int64
muxer *hls.Muxer
requests []hlsRemuxerRequest
// in
request chan hlsRemuxerRequest
@ -177,22 +178,44 @@ func (r *hlsRemuxer) PathName() string {
func (r *hlsRemuxer) run() {
defer r.wg.Done()
innerCtx, innerCtxCancel := context.WithCancel(context.Background())
runErr := make(chan error)
remuxerCtx, remuxerCtxCancel := context.WithCancel(context.Background())
remuxerReady := make(chan struct{})
remuxerErr := make(chan error)
go func() {
runErr <- r.runInner(innerCtx)
remuxerErr <- r.runRemuxer(remuxerCtx, remuxerReady)
}()
select {
case err := <-runErr:
innerCtxCancel()
if err != nil {
r.log(logger.Info, "ERR: %s", err)
}
isReady := false
case <-r.ctx.Done():
innerCtxCancel()
<-runErr
outer:
for {
select {
case <-r.ctx.Done():
remuxerCtxCancel()
<-remuxerErr
break outer
case req := <-r.request:
if isReady {
r.handleRequest(req)
} else {
r.requests = append(r.requests, req)
}
case <-remuxerReady:
isReady = true
for _, req := range r.requests {
r.handleRequest(req)
}
r.requests = nil
case err := <-remuxerErr:
remuxerCtxCancel()
if err != nil {
r.log(logger.Info, "ERR: %s", err)
}
break outer
}
}
r.ctxCancel()
@ -200,14 +223,13 @@ func (r *hlsRemuxer) run() {
r.parent.OnRemuxerClose(r)
}
func (r *hlsRemuxer) runInner(innerCtx context.Context) error {
func (r *hlsRemuxer) runRemuxer(remuxerCtx context.Context, remuxerReady chan struct{}) error {
res := r.pathManager.OnReaderSetupPlay(pathReaderSetupPlayReq{
Author: r,
PathName: r.pathName,
IP: nil,
ValidateCredentials: nil,
})
if res.Err != nil {
return res.Err
}
@ -283,21 +305,11 @@ func (r *hlsRemuxer) runInner(innerCtx context.Context) error {
}
defer r.muxer.Close()
// start request handler only after muxer has been inizialized
requestHandlerTerminate := make(chan struct{})
requestHandlerDone := make(chan struct{})
go r.runRequestHandler(requestHandlerTerminate, requestHandlerDone)
defer func() {
close(requestHandlerTerminate)
<-requestHandlerDone
}()
remuxerReady <- struct{}{}
r.ringBuffer = ringbuffer.New(uint64(r.readBufferCount))
r.path.OnReaderPlay(pathReaderPlayReq{
Author: r,
})
r.path.OnReaderPlay(pathReaderPlayReq{Author: r})
writerDone := make(chan error)
go func() {
@ -396,7 +408,7 @@ func (r *hlsRemuxer) runInner(innerCtx context.Context) error {
case err := <-writerDone:
return err
case <-innerCtx.Done():
case <-remuxerCtx.Done():
r.ringBuffer.Close()
<-writerDone
return nil
@ -404,74 +416,62 @@ func (r *hlsRemuxer) runInner(innerCtx context.Context) error {
}
}
func (r *hlsRemuxer) runRequestHandler(terminate chan struct{}, done chan struct{}) {
defer close(done)
func (r *hlsRemuxer) handleRequest(req hlsRemuxerRequest) {
atomic.StoreInt64(r.lastRequestTime, time.Now().Unix())
for {
select {
case <-terminate:
conf := r.path.Conf()
if conf.ReadIPsParsed != nil {
tmp, _, _ := net.SplitHostPort(req.Req.RemoteAddr)
ip := net.ParseIP(tmp)
if !ipEqualOrInRange(ip, conf.ReadIPsParsed) {
r.log(logger.Info, "ERR: ip '%s' not allowed", ip)
req.W.WriteHeader(http.StatusUnauthorized)
req.Res <- nil
return
case preq := <-r.request:
req := preq
atomic.StoreInt64(r.lastRequestTime, time.Now().Unix())
conf := r.path.Conf()
if conf.ReadIPsParsed != nil {
tmp, _, _ := net.SplitHostPort(req.Req.RemoteAddr)
ip := net.ParseIP(tmp)
if !ipEqualOrInRange(ip, conf.ReadIPsParsed) {
r.log(logger.Info, "ERR: ip '%s' not allowed", ip)
req.W.WriteHeader(http.StatusUnauthorized)
req.Res <- nil
continue
}
}
if conf.ReadUser != "" {
user, pass, ok := req.Req.BasicAuth()
if !ok || user != conf.ReadUser || pass != conf.ReadPass {
req.W.Header().Set("WWW-Authenticate", `Basic realm="rtsp-simple-server"`)
req.W.WriteHeader(http.StatusUnauthorized)
req.Res <- nil
continue
}
}
switch {
case req.File == "stream.m3u8":
r := r.muxer.Playlist()
if r == nil {
req.W.WriteHeader(http.StatusNotFound)
req.Res <- nil
continue
}
req.W.Header().Set("Content-Type", `application/x-mpegURL`)
req.Res <- r
case strings.HasSuffix(req.File, ".ts"):
r := r.muxer.TSFile(req.File)
if r == nil {
req.W.WriteHeader(http.StatusNotFound)
req.Res <- nil
continue
}
req.W.Header().Set("Content-Type", `video/MP2T`)
req.Res <- r
case req.File == "":
req.Res <- bytes.NewReader([]byte(index))
default:
req.W.WriteHeader(http.StatusNotFound)
req.Res <- nil
}
}
}
if conf.ReadUser != "" {
user, pass, ok := req.Req.BasicAuth()
if !ok || user != conf.ReadUser || pass != conf.ReadPass {
req.W.Header().Set("WWW-Authenticate", `Basic realm="rtsp-simple-server"`)
req.W.WriteHeader(http.StatusUnauthorized)
req.Res <- nil
return
}
}
switch {
case req.File == "stream.m3u8":
r := r.muxer.Playlist()
if r == nil {
req.W.WriteHeader(http.StatusNotFound)
req.Res <- nil
return
}
req.W.Header().Set("Content-Type", `application/x-mpegURL`)
req.Res <- r
case strings.HasSuffix(req.File, ".ts"):
r := r.muxer.TSFile(req.File)
if r == nil {
req.W.WriteHeader(http.StatusNotFound)
req.Res <- nil
return
}
req.W.Header().Set("Content-Type", `video/MP2T`)
req.Res <- r
case req.File == "":
req.Res <- bytes.NewReader([]byte(index))
default:
req.W.WriteHeader(http.StatusNotFound)
req.Res <- nil
}
}
// OnRequest is called by hlsserver.Server (forwarded from ServeHTTP).