mirror of
https://github.com/bluenviron/mediamtx
synced 2025-02-20 21:46:56 +00:00
hls: rename remuxer into muxer
This commit is contained in:
parent
ca499a27c3
commit
4fdd42fa58
@ -254,7 +254,7 @@ components:
|
||||
properties:
|
||||
type:
|
||||
type: string
|
||||
enum: [hlsmuxer]
|
||||
enum: [hlsMuxer]
|
||||
|
||||
RTSPSession:
|
||||
type: object
|
||||
|
@ -88,7 +88,7 @@ create();
|
||||
</html>
|
||||
`
|
||||
|
||||
type hlsRemuxerRequest struct {
|
||||
type hlsMuxerRequest struct {
|
||||
Dir string
|
||||
File string
|
||||
Req *http.Request
|
||||
@ -96,29 +96,29 @@ type hlsRemuxerRequest struct {
|
||||
Res chan io.Reader
|
||||
}
|
||||
|
||||
type hlsRemuxerTrackIDPayloadPair struct {
|
||||
type hlsMuxerTrackIDPayloadPair struct {
|
||||
trackID int
|
||||
buf []byte
|
||||
}
|
||||
|
||||
type hlsRemuxerPathManager interface {
|
||||
type hlsMuxerPathManager interface {
|
||||
OnReaderSetupPlay(req pathReaderSetupPlayReq) pathReaderSetupPlayRes
|
||||
}
|
||||
|
||||
type hlsRemuxerParent interface {
|
||||
type hlsMuxerParent interface {
|
||||
Log(logger.Level, string, ...interface{})
|
||||
OnRemuxerClose(*hlsRemuxer)
|
||||
OnMuxerClose(*hlsMuxer)
|
||||
}
|
||||
|
||||
type hlsRemuxer struct {
|
||||
type hlsMuxer struct {
|
||||
hlsAlwaysRemux bool
|
||||
hlsSegmentCount int
|
||||
hlsSegmentDuration time.Duration
|
||||
readBufferCount int
|
||||
wg *sync.WaitGroup
|
||||
pathName string
|
||||
pathManager hlsRemuxerPathManager
|
||||
parent hlsRemuxerParent
|
||||
pathManager hlsMuxerPathManager
|
||||
parent hlsMuxerParent
|
||||
|
||||
ctx context.Context
|
||||
ctxCancel func()
|
||||
@ -126,13 +126,13 @@ type hlsRemuxer struct {
|
||||
ringBuffer *ringbuffer.RingBuffer
|
||||
lastRequestTime *int64
|
||||
muxer *hls.Muxer
|
||||
requests []hlsRemuxerRequest
|
||||
requests []hlsMuxerRequest
|
||||
|
||||
// in
|
||||
request chan hlsRemuxerRequest
|
||||
request chan hlsMuxerRequest
|
||||
}
|
||||
|
||||
func newHLSRemuxer(
|
||||
func newHLSMuxer(
|
||||
parentCtx context.Context,
|
||||
hlsAlwaysRemux bool,
|
||||
hlsSegmentCount int,
|
||||
@ -140,11 +140,11 @@ func newHLSRemuxer(
|
||||
readBufferCount int,
|
||||
wg *sync.WaitGroup,
|
||||
pathName string,
|
||||
pathManager hlsRemuxerPathManager,
|
||||
parent hlsRemuxerParent) *hlsRemuxer {
|
||||
pathManager hlsMuxerPathManager,
|
||||
parent hlsMuxerParent) *hlsMuxer {
|
||||
ctx, ctxCancel := context.WithCancel(parentCtx)
|
||||
|
||||
r := &hlsRemuxer{
|
||||
r := &hlsMuxer{
|
||||
hlsAlwaysRemux: hlsAlwaysRemux,
|
||||
hlsSegmentCount: hlsSegmentCount,
|
||||
hlsSegmentDuration: hlsSegmentDuration,
|
||||
@ -159,7 +159,7 @@ func newHLSRemuxer(
|
||||
v := time.Now().Unix()
|
||||
return &v
|
||||
}(),
|
||||
request: make(chan hlsRemuxerRequest),
|
||||
request: make(chan hlsMuxerRequest),
|
||||
}
|
||||
|
||||
r.log(logger.Info, "created")
|
||||
@ -170,28 +170,28 @@ func newHLSRemuxer(
|
||||
return r
|
||||
}
|
||||
|
||||
func (r *hlsRemuxer) Close() {
|
||||
func (r *hlsMuxer) Close() {
|
||||
r.ctxCancel()
|
||||
}
|
||||
|
||||
func (r *hlsRemuxer) log(level logger.Level, format string, args ...interface{}) {
|
||||
r.parent.Log(level, "[remuxer %s] "+format, append([]interface{}{r.pathName}, args...)...)
|
||||
func (r *hlsMuxer) log(level logger.Level, format string, args ...interface{}) {
|
||||
r.parent.Log(level, "[muxer %s] "+format, append([]interface{}{r.pathName}, args...)...)
|
||||
}
|
||||
|
||||
// PathName returns the path name.
|
||||
func (r *hlsRemuxer) PathName() string {
|
||||
func (r *hlsMuxer) PathName() string {
|
||||
return r.pathName
|
||||
}
|
||||
|
||||
func (r *hlsRemuxer) run() {
|
||||
func (r *hlsMuxer) run() {
|
||||
defer r.wg.Done()
|
||||
defer r.log(logger.Info, "destroyed")
|
||||
|
||||
remuxerCtx, remuxerCtxCancel := context.WithCancel(context.Background())
|
||||
remuxerReady := make(chan struct{})
|
||||
remuxerErr := make(chan error)
|
||||
innerCtx, innerCtxCancel := context.WithCancel(context.Background())
|
||||
innerReady := make(chan struct{})
|
||||
innerErr := make(chan error)
|
||||
go func() {
|
||||
remuxerErr <- r.runRemuxer(remuxerCtx, remuxerReady)
|
||||
innerErr <- r.runInner(innerCtx, innerReady)
|
||||
}()
|
||||
|
||||
isReady := false
|
||||
@ -200,8 +200,8 @@ outer:
|
||||
for {
|
||||
select {
|
||||
case <-r.ctx.Done():
|
||||
remuxerCtxCancel()
|
||||
<-remuxerErr
|
||||
innerCtxCancel()
|
||||
<-innerErr
|
||||
break outer
|
||||
|
||||
case req := <-r.request:
|
||||
@ -211,15 +211,15 @@ outer:
|
||||
r.requests = append(r.requests, req)
|
||||
}
|
||||
|
||||
case <-remuxerReady:
|
||||
case <-innerReady:
|
||||
isReady = true
|
||||
for _, req := range r.requests {
|
||||
r.handleRequest(req)
|
||||
}
|
||||
r.requests = nil
|
||||
|
||||
case err := <-remuxerErr:
|
||||
remuxerCtxCancel()
|
||||
case err := <-innerErr:
|
||||
innerCtxCancel()
|
||||
if err != nil {
|
||||
r.log(logger.Info, "ERR: %s", err)
|
||||
}
|
||||
@ -234,10 +234,10 @@ outer:
|
||||
req.Res <- nil
|
||||
}
|
||||
|
||||
r.parent.OnRemuxerClose(r)
|
||||
r.parent.OnMuxerClose(r)
|
||||
}
|
||||
|
||||
func (r *hlsRemuxer) runRemuxer(remuxerCtx context.Context, remuxerReady chan struct{}) error {
|
||||
func (r *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) error {
|
||||
res := r.pathManager.OnReaderSetupPlay(pathReaderSetupPlayReq{
|
||||
Author: r,
|
||||
PathName: r.pathName,
|
||||
@ -311,7 +311,7 @@ func (r *hlsRemuxer) runRemuxer(remuxerCtx context.Context, remuxerReady chan st
|
||||
}
|
||||
defer r.muxer.Close()
|
||||
|
||||
remuxerReady <- struct{}{}
|
||||
innerReady <- struct{}{}
|
||||
|
||||
r.ringBuffer = ringbuffer.New(uint64(r.readBufferCount))
|
||||
|
||||
@ -327,7 +327,7 @@ func (r *hlsRemuxer) runRemuxer(remuxerCtx context.Context, remuxerReady chan st
|
||||
if !ok {
|
||||
return fmt.Errorf("terminated")
|
||||
}
|
||||
pair := data.(hlsRemuxerTrackIDPayloadPair)
|
||||
pair := data.(hlsMuxerTrackIDPayloadPair)
|
||||
|
||||
if videoTrack != nil && pair.trackID == videoTrackID {
|
||||
var pkt rtp.Packet
|
||||
@ -399,7 +399,7 @@ func (r *hlsRemuxer) runRemuxer(remuxerCtx context.Context, remuxerReady chan st
|
||||
case err := <-writerDone:
|
||||
return err
|
||||
|
||||
case <-remuxerCtx.Done():
|
||||
case <-innerCtx.Done():
|
||||
r.ringBuffer.Close()
|
||||
<-writerDone
|
||||
return nil
|
||||
@ -407,7 +407,7 @@ func (r *hlsRemuxer) runRemuxer(remuxerCtx context.Context, remuxerReady chan st
|
||||
}
|
||||
}
|
||||
|
||||
func (r *hlsRemuxer) handleRequest(req hlsRemuxerRequest) {
|
||||
func (r *hlsMuxer) handleRequest(req hlsMuxerRequest) {
|
||||
atomic.StoreInt64(r.lastRequestTime, time.Now().Unix())
|
||||
|
||||
conf := r.path.Conf()
|
||||
@ -463,7 +463,7 @@ func (r *hlsRemuxer) handleRequest(req hlsRemuxerRequest) {
|
||||
}
|
||||
|
||||
// OnRequest is called by hlsserver.Server (forwarded from ServeHTTP).
|
||||
func (r *hlsRemuxer) OnRequest(req hlsRemuxerRequest) {
|
||||
func (r *hlsMuxer) OnRequest(req hlsMuxerRequest) {
|
||||
select {
|
||||
case r.request <- req:
|
||||
case <-r.ctx.Done():
|
||||
@ -473,20 +473,20 @@ func (r *hlsRemuxer) OnRequest(req hlsRemuxerRequest) {
|
||||
}
|
||||
|
||||
// OnReaderAccepted implements reader.
|
||||
func (r *hlsRemuxer) OnReaderAccepted() {
|
||||
r.log(logger.Info, "is remuxing into HLS")
|
||||
func (r *hlsMuxer) OnReaderAccepted() {
|
||||
r.log(logger.Info, "is converting into HLS")
|
||||
}
|
||||
|
||||
// OnReaderFrame implements reader.
|
||||
func (r *hlsRemuxer) OnReaderFrame(trackID int, streamType gortsplib.StreamType, payload []byte) {
|
||||
func (r *hlsMuxer) OnReaderFrame(trackID int, streamType gortsplib.StreamType, payload []byte) {
|
||||
if streamType == gortsplib.StreamTypeRTP {
|
||||
r.ringBuffer.Push(hlsRemuxerTrackIDPayloadPair{trackID, payload})
|
||||
r.ringBuffer.Push(hlsMuxerTrackIDPayloadPair{trackID, payload})
|
||||
}
|
||||
}
|
||||
|
||||
// OnReaderAPIDescribe implements reader.
|
||||
func (r *hlsRemuxer) OnReaderAPIDescribe() interface{} {
|
||||
func (r *hlsMuxer) OnReaderAPIDescribe() interface{} {
|
||||
return struct {
|
||||
Type string `json:"type"`
|
||||
}{"hlsremuxer"}
|
||||
}{"hlsMuxer"}
|
||||
}
|
@ -30,12 +30,12 @@ type hlsServer struct {
|
||||
ctxCancel func()
|
||||
wg sync.WaitGroup
|
||||
ln net.Listener
|
||||
remuxers map[string]*hlsRemuxer
|
||||
muxers map[string]*hlsMuxer
|
||||
|
||||
// in
|
||||
pathSourceReady chan *path
|
||||
request chan hlsRemuxerRequest
|
||||
remuxerClose chan *hlsRemuxer
|
||||
request chan hlsMuxerRequest
|
||||
muxerClose chan *hlsMuxer
|
||||
}
|
||||
|
||||
func newHLSServer(
|
||||
@ -67,10 +67,10 @@ func newHLSServer(
|
||||
ctx: ctx,
|
||||
ctxCancel: ctxCancel,
|
||||
ln: ln,
|
||||
remuxers: make(map[string]*hlsRemuxer),
|
||||
muxers: make(map[string]*hlsMuxer),
|
||||
pathSourceReady: make(chan *path),
|
||||
request: make(chan hlsRemuxerRequest),
|
||||
remuxerClose: make(chan *hlsRemuxer),
|
||||
request: make(chan hlsMuxerRequest),
|
||||
muxerClose: make(chan *hlsMuxer),
|
||||
}
|
||||
|
||||
s.Log(logger.Info, "listener opened on "+address)
|
||||
@ -105,18 +105,18 @@ outer:
|
||||
select {
|
||||
case pa := <-s.pathSourceReady:
|
||||
if s.hlsAlwaysRemux {
|
||||
s.findOrCreateRemuxer(pa.Name())
|
||||
s.findOrCreateMuxer(pa.Name())
|
||||
}
|
||||
|
||||
case req := <-s.request:
|
||||
r := s.findOrCreateRemuxer(req.Dir)
|
||||
r := s.findOrCreateMuxer(req.Dir)
|
||||
r.OnRequest(req)
|
||||
|
||||
case c := <-s.remuxerClose:
|
||||
if c2, ok := s.remuxers[c.PathName()]; !ok || c2 != c {
|
||||
case c := <-s.muxerClose:
|
||||
if c2, ok := s.muxers[c.PathName()]; !ok || c2 != c {
|
||||
continue
|
||||
}
|
||||
delete(s.remuxers, c.PathName())
|
||||
delete(s.muxers, c.PathName())
|
||||
|
||||
case <-s.ctx.Done():
|
||||
break outer
|
||||
@ -176,7 +176,7 @@ func (s *hlsServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
dir = strings.TrimSuffix(dir, "/")
|
||||
|
||||
cres := make(chan io.Reader)
|
||||
hreq := hlsRemuxerRequest{
|
||||
hreq := hlsMuxerRequest{
|
||||
Dir: dir,
|
||||
File: fname,
|
||||
Req: r,
|
||||
@ -209,10 +209,10 @@ func (s *hlsServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *hlsServer) findOrCreateRemuxer(pathName string) *hlsRemuxer {
|
||||
r, ok := s.remuxers[pathName]
|
||||
func (s *hlsServer) findOrCreateMuxer(pathName string) *hlsMuxer {
|
||||
r, ok := s.muxers[pathName]
|
||||
if !ok {
|
||||
r = newHLSRemuxer(
|
||||
r = newHLSMuxer(
|
||||
s.ctx,
|
||||
s.hlsAlwaysRemux,
|
||||
s.hlsSegmentCount,
|
||||
@ -222,15 +222,15 @@ func (s *hlsServer) findOrCreateRemuxer(pathName string) *hlsRemuxer {
|
||||
pathName,
|
||||
s.pathManager,
|
||||
s)
|
||||
s.remuxers[pathName] = r
|
||||
s.muxers[pathName] = r
|
||||
}
|
||||
return r
|
||||
}
|
||||
|
||||
// OnRemuxerClose is called by hlsRemuxer.
|
||||
func (s *hlsServer) OnRemuxerClose(c *hlsRemuxer) {
|
||||
// OnMuxerClose is called by hlsMuxer.
|
||||
func (s *hlsServer) OnMuxerClose(c *hlsMuxer) {
|
||||
select {
|
||||
case s.remuxerClose <- c:
|
||||
case s.muxerClose <- c:
|
||||
case <-s.ctx.Done():
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user