hls muxer: fix race condition

This commit is contained in:
aler9 2023-01-08 21:16:20 +01:00
parent 20448ec6a8
commit d5dfce16ea
5 changed files with 66 additions and 66 deletions

View File

@ -39,10 +39,10 @@ type hlsMuxerResponse struct {
}
type hlsMuxerRequest struct {
dir string
path string
file string
ctx *gin.Context
res chan hlsMuxerResponse
res chan *hlsMuxerResponse
}
type hlsMuxerPathManager interface {
@ -128,14 +128,10 @@ func newHLSMuxer(
return &v
}(),
bytesSent: new(uint64),
chRequest: make(chan *hlsMuxerRequest),
chRequest: make(chan *hlsMuxerRequest, 1),
chAPIHLSMuxersList: make(chan hlsServerAPIMuxersListSubReq),
}
if req != nil {
m.requests = append(m.requests, req)
}
m.log(logger.Info, "created %s", func() string {
if remoteAddr == "" {
return "automatically"
@ -196,12 +192,17 @@ func (m *hlsMuxer) run() {
return errors.New("terminated")
case req := <-m.chRequest:
if isReady {
req.res <- hlsMuxerResponse{
switch {
case isRecreating:
req.res <- nil
case isReady:
req.res <- &hlsMuxerResponse{
muxer: m,
cb: m.handleRequest(req),
}
} else {
default:
m.requests = append(m.requests, req)
}
@ -216,7 +217,7 @@ func (m *hlsMuxer) run() {
case <-innerReady:
isReady = true
for _, req := range m.requests {
req.res <- hlsMuxerResponse{
req.res <- &hlsMuxerResponse{
muxer: m,
cb: m.handleRequest(req),
}
@ -254,13 +255,9 @@ func (m *hlsMuxer) run() {
func (m *hlsMuxer) clearQueuedRequests() {
for _, req := range m.requests {
req.res <- hlsMuxerResponse{
muxer: m,
cb: func() *hls.MuxerFileResponse {
return &hls.MuxerFileResponse{Status: http.StatusNotFound}
},
}
req.res <- nil
}
m.requests = nil
}
func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) error {
@ -616,17 +613,12 @@ func (m *hlsMuxer) addSentBytes(n uint64) {
atomic.AddUint64(m.bytesSent, n)
}
// request is called by hlsserver.Server (forwarded from ServeHTTP).
func (m *hlsMuxer) request(req *hlsMuxerRequest) {
// processRequest is called by hlsserver.Server (forwarded from ServeHTTP).
func (m *hlsMuxer) processRequest(req *hlsMuxerRequest) {
select {
case m.chRequest <- req:
case <-m.ctx.Done():
req.res <- hlsMuxerResponse{
muxer: m,
cb: func() *hls.MuxerFileResponse {
return &hls.MuxerFileResponse{Status: http.StatusInternalServerError}
},
}
req.res <- nil
}
}

View File

@ -204,7 +204,7 @@ outer:
select {
case pa := <-s.chPathSourceReady:
if s.alwaysRemux {
s.findOrCreateMuxer(pa.Name(), "", nil)
s.createMuxer(pa.Name(), "", nil)
}
case pa := <-s.chPathSourceNotReady:
@ -217,7 +217,18 @@ outer:
}
case req := <-s.request:
s.findOrCreateMuxer(req.dir, req.ctx.ClientIP(), req)
r, ok := s.muxers[req.path]
switch {
case ok:
r.processRequest(req)
case s.alwaysRemux:
req.res <- nil
default:
r := s.createMuxer(req.path, req.ctx.ClientIP(), req)
r.processRequest(req)
}
case c := <-s.chMuxerClose:
if c2, ok := s.muxers[c.PathName()]; !ok || c2 != c {
@ -301,56 +312,53 @@ func (s *hlsServer) onRequest(ctx *gin.Context) {
dir = strings.TrimSuffix(dir, "/")
hreq := &hlsMuxerRequest{
dir: dir,
path: dir,
file: fname,
ctx: ctx,
res: make(chan hlsMuxerResponse),
res: make(chan *hlsMuxerResponse),
}
select {
case s.request <- hreq:
res1 := <-hreq.res
res := res1.cb()
if res1 != nil {
res := res1.cb()
for k, v := range res.Header {
ctx.Writer.Header().Set(k, v)
}
for k, v := range res.Header {
ctx.Writer.Header().Set(k, v)
}
ctx.Writer.WriteHeader(res.Status)
ctx.Writer.WriteHeader(res.Status)
if res.Body != nil {
n, _ := io.Copy(ctx.Writer, res.Body)
res1.muxer.addSentBytes(uint64(n))
if res.Body != nil {
n, _ := io.Copy(ctx.Writer, res.Body)
res1.muxer.addSentBytes(uint64(n))
}
}
case <-s.ctx.Done():
}
}
func (s *hlsServer) findOrCreateMuxer(pathName string, remoteAddr string, req *hlsMuxerRequest) *hlsMuxer {
r, ok := s.muxers[pathName]
if !ok {
r = newHLSMuxer(
s.ctx,
pathName,
remoteAddr,
s.externalAuthenticationURL,
s.alwaysRemux,
s.variant,
s.segmentCount,
s.segmentDuration,
s.partDuration,
s.segmentMaxSize,
s.readBufferCount,
req,
&s.wg,
pathName,
s.pathManager,
s)
s.muxers[pathName] = r
} else if req != nil {
r.request(req)
}
func (s *hlsServer) createMuxer(pathName string, remoteAddr string, req *hlsMuxerRequest) *hlsMuxer {
r := newHLSMuxer(
s.ctx,
pathName,
remoteAddr,
s.externalAuthenticationURL,
s.alwaysRemux,
s.variant,
s.segmentCount,
s.segmentDuration,
s.partDuration,
s.segmentMaxSize,
s.readBufferCount,
req,
&s.wg,
pathName,
s.pathManager,
s)
s.muxers[pathName] = r
return r
}

View File

@ -146,7 +146,7 @@ func (v *muxerVariantFMP4) file(name string, msn string, part string, skip strin
var err error
v.initContent, err = init.Marshal()
if err != nil {
return &MuxerFileResponse{Status: http.StatusInternalServerError}
return &MuxerFileResponse{Status: http.StatusNotFound}
}
}

View File

@ -215,7 +215,7 @@ func (p *muxerVariantFMP4Playlist) playlistReader(msn string, part string, skip
}
if p.closed {
return &MuxerFileResponse{Status: http.StatusInternalServerError}
return &MuxerFileResponse{Status: http.StatusNotFound}
}
return &MuxerFileResponse{
@ -241,7 +241,7 @@ func (p *muxerVariantFMP4Playlist) playlistReader(msn string, part string, skip
}
if p.closed {
return &MuxerFileResponse{Status: http.StatusInternalServerError}
return &MuxerFileResponse{Status: http.StatusNotFound}
}
return &MuxerFileResponse{
@ -416,7 +416,7 @@ func (p *muxerVariantFMP4Playlist) segmentReader(fname string) *MuxerFileRespons
}
if p.closed {
return &MuxerFileResponse{Status: http.StatusInternalServerError}
return &MuxerFileResponse{Status: http.StatusNotFound}
}
return &MuxerFileResponse{

View File

@ -94,7 +94,7 @@ func (p *muxerVariantMPEGTSPlaylist) playlistReader() *MuxerFileResponse {
}
if p.closed {
return &MuxerFileResponse{Status: http.StatusInternalServerError}
return &MuxerFileResponse{Status: http.StatusNotFound}
}
return &MuxerFileResponse{