mirror of
https://github.com/bluenviron/mediamtx
synced 2024-12-15 19:24:55 +00:00
hls: move muxer into dedicated object
This commit is contained in:
parent
f225363e7d
commit
c3c643c602
2
go.mod
2
go.mod
@ -5,7 +5,7 @@ go 1.16
|
||||
require (
|
||||
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
|
||||
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect
|
||||
github.com/aler9/gortsplib v0.0.0-20210626112538-649c63cf5b62
|
||||
github.com/aler9/gortsplib v0.0.0-20210724151831-dae5a1f04033
|
||||
github.com/asticode/go-astits v1.9.0
|
||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||
github.com/fsnotify/fsnotify v1.4.9
|
||||
|
4
go.sum
4
go.sum
@ -2,8 +2,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo
|
||||
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
|
||||
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E=
|
||||
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
|
||||
github.com/aler9/gortsplib v0.0.0-20210626112538-649c63cf5b62 h1:PPTqxgdDmDBQcDziEuLqS4VzmMTp5NSd7b3WZqQCtR4=
|
||||
github.com/aler9/gortsplib v0.0.0-20210626112538-649c63cf5b62/go.mod h1:s5FsbPRxJhU/YedvUKAKHVY+lQEdYsiJpuN2CHb89cI=
|
||||
github.com/aler9/gortsplib v0.0.0-20210724151831-dae5a1f04033 h1:Bf0hzdN1jUWsb5Mzezq1pd18EIBeKXxk5clIpHZJ1Lk=
|
||||
github.com/aler9/gortsplib v0.0.0-20210724151831-dae5a1f04033/go.mod h1:s5FsbPRxJhU/YedvUKAKHVY+lQEdYsiJpuN2CHb89cI=
|
||||
github.com/aler9/rtmp v0.0.0-20210403095203-3be4a5535927 h1:95mXJ5fUCYpBRdSOnLAQAdJHHKxxxJrVCiaqDi965YQ=
|
||||
github.com/aler9/rtmp v0.0.0-20210403095203-3be4a5535927/go.mod h1:vzuE21rowz+lT1NGsWbreIvYulgBpCGnQyeTyFblUHc=
|
||||
github.com/asticode/go-astikit v0.20.0 h1:+7N+J4E4lWx2QOkRdOf6DafWJMv6O4RRfgClwQokrH8=
|
||||
|
@ -5,10 +5,8 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
"net"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
@ -26,12 +24,6 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
// an offset is needed to
|
||||
// - avoid negative PTS values
|
||||
// - avoid PTS < DTS during startup
|
||||
hlsConverterPTSOffset = 2 * time.Second
|
||||
|
||||
segmentMinAUCount = 100
|
||||
closeCheckPeriod = 1 * time.Second
|
||||
closeAfterInactivity = 60 * time.Second
|
||||
)
|
||||
@ -116,15 +108,12 @@ type hlsConverter struct {
|
||||
pathMan hlsConverterPathMan
|
||||
parent hlsConverterParent
|
||||
|
||||
ctx context.Context
|
||||
ctxCancel func()
|
||||
path readPublisherPath
|
||||
ringBuffer *ringbuffer.RingBuffer
|
||||
tsQueue []*hls.TSFile
|
||||
tsByName map[string]*hls.TSFile
|
||||
tsDeleteCount int
|
||||
tsMutex sync.RWMutex
|
||||
lasthlsConverterRequestTime *int64
|
||||
ctx context.Context
|
||||
ctxCancel func()
|
||||
path readPublisherPath
|
||||
ringBuffer *ringbuffer.RingBuffer
|
||||
lastRequestTime *int64
|
||||
muxer *hls.Muxer
|
||||
|
||||
// in
|
||||
request chan hlsConverterRequest
|
||||
@ -153,12 +142,11 @@ func newHLSConverter(
|
||||
parent: parent,
|
||||
ctx: ctx,
|
||||
ctxCancel: ctxCancel,
|
||||
lasthlsConverterRequestTime: func() *int64 {
|
||||
lastRequestTime: func() *int64 {
|
||||
v := time.Now().Unix()
|
||||
return &v
|
||||
}(),
|
||||
tsByName: make(map[string]*hls.TSFile),
|
||||
request: make(chan hlsConverterRequest),
|
||||
request: make(chan hlsConverterRequest),
|
||||
}
|
||||
|
||||
c.log(logger.Info, "opened")
|
||||
@ -294,14 +282,19 @@ func (c *hlsConverter) runInner(innerCtx context.Context) error {
|
||||
return fmt.Errorf("unable to find a video or audio track")
|
||||
}
|
||||
|
||||
curTSFile := hls.NewTSFile(videoTrack, audioTrack)
|
||||
c.tsByName[curTSFile.Name()] = curTSFile
|
||||
c.tsQueue = append(c.tsQueue, curTSFile)
|
||||
|
||||
defer func() {
|
||||
curTSFile.Close()
|
||||
}()
|
||||
var err error
|
||||
c.muxer, err = hls.NewMuxer(
|
||||
c.hlsSegmentCount,
|
||||
c.hlsSegmentDuration,
|
||||
videoTrack,
|
||||
audioTrack,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer c.muxer.Close()
|
||||
|
||||
// start request handler only after muxer has been inizialized
|
||||
requestHandlerTerminate := make(chan struct{})
|
||||
requestHandlerDone := make(chan struct{})
|
||||
go c.runRequestHandler(requestHandlerTerminate, requestHandlerDone)
|
||||
@ -322,11 +315,7 @@ func (c *hlsConverter) runInner(innerCtx context.Context) error {
|
||||
writerDone := make(chan error)
|
||||
go func() {
|
||||
writerDone <- func() error {
|
||||
startPCR := time.Now()
|
||||
var videoBuf [][]byte
|
||||
videoDTSEst := h264.NewDTSEstimator()
|
||||
videoInitialized := false
|
||||
audioAUCount := 0
|
||||
|
||||
for {
|
||||
data, ok := c.ringBuffer.Pull()
|
||||
@ -343,21 +332,9 @@ func (c *hlsConverter) runInner(innerCtx context.Context) error {
|
||||
continue
|
||||
}
|
||||
|
||||
// skip packets that are part of frames sent before
|
||||
// the initialization of the converter
|
||||
if !videoInitialized {
|
||||
typ := pkt.Payload[0] & 0x1F
|
||||
start := pkt.Payload[1] >> 7
|
||||
if typ == 28 && start != 1 { // FU-A
|
||||
continue
|
||||
}
|
||||
|
||||
videoInitialized = true
|
||||
}
|
||||
|
||||
nalus, pts, err := h264Decoder.DecodeRTP(&pkt)
|
||||
if err != nil {
|
||||
if err != rtph264.ErrMorePacketsNeeded {
|
||||
if err != rtph264.ErrMorePacketsNeeded && err != rtph264.ErrNonStartingPacketAndNoPrevious {
|
||||
c.log(logger.Warn, "unable to decode video track: %v", err)
|
||||
}
|
||||
continue
|
||||
@ -382,70 +359,24 @@ func (c *hlsConverter) runInner(innerCtx context.Context) error {
|
||||
|
||||
// RTP marker means that all the NALUs with the same PTS have been received.
|
||||
// send them together.
|
||||
marker := (pair.buf[1] >> 7 & 0x1) > 0
|
||||
if marker {
|
||||
bufferHasIDR := func() bool {
|
||||
for _, nalu := range videoBuf {
|
||||
typ := h264.NALUType(nalu[0] & 0x1F)
|
||||
if typ == h264.NALUTypeIDR {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}()
|
||||
|
||||
// we received a marker packet but
|
||||
// - no IDR has been stored yet in current file
|
||||
// - there's no IDR in the buffer
|
||||
// data cannot be parsed, clear buffer
|
||||
if !bufferHasIDR && !curTSFile.FirstPacketWritten() {
|
||||
videoBuf = nil
|
||||
continue
|
||||
}
|
||||
|
||||
err := func() error {
|
||||
c.tsMutex.Lock()
|
||||
defer c.tsMutex.Unlock()
|
||||
|
||||
if bufferHasIDR {
|
||||
if curTSFile.FirstPacketWritten() &&
|
||||
curTSFile.Duration() >= c.hlsSegmentDuration {
|
||||
if curTSFile != nil {
|
||||
curTSFile.Close()
|
||||
}
|
||||
|
||||
curTSFile = hls.NewTSFile(videoTrack, audioTrack)
|
||||
|
||||
c.tsByName[curTSFile.Name()] = curTSFile
|
||||
c.tsQueue = append(c.tsQueue, curTSFile)
|
||||
if len(c.tsQueue) > c.hlsSegmentCount {
|
||||
delete(c.tsByName, c.tsQueue[0].Name())
|
||||
c.tsQueue = c.tsQueue[1:]
|
||||
c.tsDeleteCount++
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
curTSFile.SetPCR(time.Since(startPCR))
|
||||
err := curTSFile.WriteH264(
|
||||
videoDTSEst.Feed(pts+hlsConverterPTSOffset),
|
||||
pts+hlsConverterPTSOffset,
|
||||
bufferHasIDR,
|
||||
videoBuf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
videoBuf = nil
|
||||
return nil
|
||||
}()
|
||||
if pkt.Marker {
|
||||
err := c.muxer.WriteH264(pts, videoBuf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
videoBuf = nil
|
||||
}
|
||||
|
||||
} else if audioTrack != nil && pair.trackID == audioTrackID {
|
||||
aus, pts, err := aacDecoder.Decode(pair.buf)
|
||||
var pkt rtp.Packet
|
||||
err := pkt.Unmarshal(pair.buf)
|
||||
if err != nil {
|
||||
c.log(logger.Warn, "unable to decode RTP packet: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
aus, pts, err := aacDecoder.DecodeRTP(&pkt)
|
||||
if err != nil {
|
||||
if err != rtpaac.ErrMorePacketsNeeded {
|
||||
c.log(logger.Warn, "unable to decode audio track: %v", err)
|
||||
@ -453,52 +384,7 @@ func (c *hlsConverter) runInner(innerCtx context.Context) error {
|
||||
continue
|
||||
}
|
||||
|
||||
err = func() error {
|
||||
c.tsMutex.Lock()
|
||||
defer c.tsMutex.Unlock()
|
||||
|
||||
if videoTrack == nil {
|
||||
if curTSFile.FirstPacketWritten() &&
|
||||
curTSFile.Duration() >= c.hlsSegmentDuration &&
|
||||
audioAUCount >= segmentMinAUCount {
|
||||
|
||||
if curTSFile != nil {
|
||||
curTSFile.Close()
|
||||
}
|
||||
|
||||
audioAUCount = 0
|
||||
curTSFile = hls.NewTSFile(videoTrack, audioTrack)
|
||||
c.tsByName[curTSFile.Name()] = curTSFile
|
||||
c.tsQueue = append(c.tsQueue, curTSFile)
|
||||
if len(c.tsQueue) > c.hlsSegmentCount {
|
||||
delete(c.tsByName, c.tsQueue[0].Name())
|
||||
c.tsQueue = c.tsQueue[1:]
|
||||
c.tsDeleteCount++
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if !curTSFile.FirstPacketWritten() {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
for i, au := range aus {
|
||||
auPTS := pts + time.Duration(i)*1000*time.Second/time.Duration(aacConfig.SampleRate)
|
||||
|
||||
audioAUCount++
|
||||
curTSFile.SetPCR(time.Since(startPCR))
|
||||
err := curTSFile.WriteAAC(
|
||||
aacConfig.SampleRate,
|
||||
aacConfig.ChannelCount,
|
||||
auPTS+hlsConverterPTSOffset,
|
||||
au)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}()
|
||||
err = c.muxer.WriteAAC(pts, aus)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -513,7 +399,7 @@ func (c *hlsConverter) runInner(innerCtx context.Context) error {
|
||||
for {
|
||||
select {
|
||||
case <-closeCheckTicker.C:
|
||||
t := time.Unix(atomic.LoadInt64(c.lasthlsConverterRequestTime), 0)
|
||||
t := time.Unix(atomic.LoadInt64(c.lastRequestTime), 0)
|
||||
if time.Since(t) >= closeAfterInactivity {
|
||||
c.ringBuffer.Close()
|
||||
<-writerDone
|
||||
@ -542,7 +428,7 @@ func (c *hlsConverter) runRequestHandler(terminate chan struct{}, done chan stru
|
||||
case preq := <-c.request:
|
||||
req := preq
|
||||
|
||||
atomic.StoreInt64(c.lasthlsConverterRequestTime, time.Now().Unix())
|
||||
atomic.StoreInt64(c.lastRequestTime, time.Now().Unix())
|
||||
|
||||
conf := c.path.Conf()
|
||||
|
||||
@ -569,61 +455,26 @@ func (c *hlsConverter) runRequestHandler(terminate chan struct{}, done chan stru
|
||||
|
||||
switch {
|
||||
case req.File == "stream.m3u8":
|
||||
func() {
|
||||
c.tsMutex.RLock()
|
||||
defer c.tsMutex.RUnlock()
|
||||
r := c.muxer.Playlist()
|
||||
if r == nil {
|
||||
req.W.WriteHeader(http.StatusNotFound)
|
||||
req.Res <- nil
|
||||
continue
|
||||
}
|
||||
|
||||
if len(c.tsQueue) == 0 {
|
||||
req.W.WriteHeader(http.StatusNotFound)
|
||||
req.Res <- nil
|
||||
return
|
||||
}
|
||||
|
||||
cnt := "#EXTM3U\n"
|
||||
cnt += "#EXT-X-VERSION:3\n"
|
||||
cnt += "#EXT-X-ALLOW-CACHE:NO\n"
|
||||
|
||||
targetDuration := func() uint {
|
||||
ret := uint(math.Ceil(c.hlsSegmentDuration.Seconds()))
|
||||
|
||||
// EXTINF, when rounded to the nearest integer, must be <= EXT-X-TARGETDURATION
|
||||
for _, f := range c.tsQueue {
|
||||
v2 := uint(math.Round(f.Duration().Seconds()))
|
||||
if v2 > ret {
|
||||
ret = v2
|
||||
}
|
||||
}
|
||||
|
||||
return ret
|
||||
}()
|
||||
cnt += "#EXT-X-TARGETDURATION:" + strconv.FormatUint(uint64(targetDuration), 10) + "\n"
|
||||
|
||||
cnt += "#EXT-X-MEDIA-SEQUENCE:" + strconv.FormatInt(int64(c.tsDeleteCount), 10) + "\n"
|
||||
|
||||
for _, f := range c.tsQueue {
|
||||
cnt += "#EXTINF:" + strconv.FormatFloat(f.Duration().Seconds(), 'f', -1, 64) + ",\n"
|
||||
cnt += f.Name() + ".ts\n"
|
||||
}
|
||||
|
||||
req.W.Header().Set("Content-Type", `application/x-mpegURL`)
|
||||
req.Res <- bytes.NewReader([]byte(cnt))
|
||||
}()
|
||||
req.W.Header().Set("Content-Type", `application/x-mpegURL`)
|
||||
req.Res <- r
|
||||
|
||||
case strings.HasSuffix(req.File, ".ts"):
|
||||
base := strings.TrimSuffix(req.File, ".ts")
|
||||
|
||||
c.tsMutex.RLock()
|
||||
f, ok := c.tsByName[base]
|
||||
c.tsMutex.RUnlock()
|
||||
|
||||
if !ok {
|
||||
r := c.muxer.TSFile(req.File)
|
||||
if r == nil {
|
||||
req.W.WriteHeader(http.StatusNotFound)
|
||||
req.Res <- nil
|
||||
continue
|
||||
}
|
||||
|
||||
req.W.Header().Set("Content-Type", `video/MP2T`)
|
||||
req.Res <- f.NewReader()
|
||||
req.Res <- r
|
||||
|
||||
case req.File == "":
|
||||
req.Res <- bytes.NewReader([]byte(index))
|
||||
|
@ -17,6 +17,7 @@ import (
|
||||
"github.com/aler9/gortsplib/pkg/rtpaac"
|
||||
"github.com/aler9/gortsplib/pkg/rtph264"
|
||||
"github.com/notedit/rtmp/av"
|
||||
"github.com/pion/rtp"
|
||||
|
||||
"github.com/aler9/rtsp-simple-server/internal/externalcmd"
|
||||
"github.com/aler9/rtsp-simple-server/internal/h264"
|
||||
@ -290,9 +291,16 @@ func (c *rtmpConn) runRead(ctx context.Context) error {
|
||||
pair := data.(rtmpConnTrackIDPayloadPair)
|
||||
|
||||
if videoTrack != nil && pair.trackID == videoTrackID {
|
||||
nalus, pts, err := h264Decoder.Decode(pair.buf)
|
||||
var pkt rtp.Packet
|
||||
err := pkt.Unmarshal(pair.buf)
|
||||
if err != nil {
|
||||
if err != rtph264.ErrMorePacketsNeeded {
|
||||
c.log(logger.Warn, "unable to decode RTP packet: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
nalus, pts, err := h264Decoder.DecodeRTP(&pkt)
|
||||
if err != nil {
|
||||
if err != rtph264.ErrMorePacketsNeeded && err != rtph264.ErrNonStartingPacketAndNoPrevious {
|
||||
c.log(logger.Warn, "unable to decode video track: %v", err)
|
||||
}
|
||||
continue
|
||||
@ -311,8 +319,7 @@ func (c *rtmpConn) runRead(ctx context.Context) error {
|
||||
|
||||
// RTP marker means that all the NALUs with the same PTS have been received.
|
||||
// send them together.
|
||||
marker := (pair.buf[1] >> 7 & 0x1) > 0
|
||||
if marker {
|
||||
if pkt.Marker {
|
||||
data, err := h264.EncodeAVCC(videoBuf)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -334,7 +341,14 @@ func (c *rtmpConn) runRead(ctx context.Context) error {
|
||||
}
|
||||
|
||||
} else if audioTrack != nil && pair.trackID == audioTrackID {
|
||||
aus, pts, err := aacDecoder.Decode(pair.buf)
|
||||
var pkt rtp.Packet
|
||||
err := pkt.Unmarshal(pair.buf)
|
||||
if err != nil {
|
||||
c.log(logger.Warn, "unable to decode RTP packet: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
aus, pts, err := aacDecoder.DecodeRTP(&pkt)
|
||||
if err != nil {
|
||||
if err != rtpaac.ErrMorePacketsNeeded {
|
||||
c.log(logger.Warn, "unable to decode audio track: %v", err)
|
||||
|
@ -61,7 +61,7 @@ func (m *multiAccessBuffer) Write(p []byte) (int, error) {
|
||||
return n, nil
|
||||
}
|
||||
|
||||
func (m *multiAccessBuffer) NewReader() *multiAccessBufferReader {
|
||||
func (m *multiAccessBuffer) NewReader() io.Reader {
|
||||
return &multiAccessBufferReader{
|
||||
m: m,
|
||||
}
|
||||
|
238
internal/hls/muxer.go
Normal file
238
internal/hls/muxer.go
Normal file
@ -0,0 +1,238 @@
|
||||
package hls
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"io"
|
||||
"math"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/aler9/gortsplib"
|
||||
"github.com/aler9/gortsplib/pkg/rtpaac"
|
||||
|
||||
"github.com/aler9/rtsp-simple-server/internal/h264"
|
||||
)
|
||||
|
||||
const (
|
||||
// an offset is needed to
|
||||
// - avoid negative PTS values
|
||||
// - avoid PTS < DTS during startup
|
||||
ptsOffset = 2 * time.Second
|
||||
|
||||
segmentMinAUCount = 100
|
||||
)
|
||||
|
||||
// Muxer is a HLS muxer.
|
||||
type Muxer struct {
|
||||
hlsSegmentCount int
|
||||
hlsSegmentDuration time.Duration
|
||||
videoTrack *gortsplib.Track
|
||||
audioTrack *gortsplib.Track
|
||||
|
||||
aacConfig rtpaac.MPEG4AudioConfig
|
||||
startPCR time.Time
|
||||
videoDTSEst *h264.DTSEstimator
|
||||
audioAUCount int
|
||||
tsCurrent *tsFile
|
||||
tsQueue []*tsFile
|
||||
tsByName map[string]*tsFile
|
||||
tsDeleteCount int
|
||||
mutex sync.RWMutex
|
||||
}
|
||||
|
||||
// NewMuxer allocates a Muxer.
|
||||
func NewMuxer(
|
||||
hlsSegmentCount int,
|
||||
hlsSegmentDuration time.Duration,
|
||||
videoTrack *gortsplib.Track,
|
||||
audioTrack *gortsplib.Track) (*Muxer, error) {
|
||||
var aacConfig rtpaac.MPEG4AudioConfig
|
||||
if audioTrack != nil {
|
||||
byts, err := audioTrack.ExtractDataAAC()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = aacConfig.Decode(byts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
m := &Muxer{
|
||||
hlsSegmentCount: hlsSegmentCount,
|
||||
hlsSegmentDuration: hlsSegmentDuration,
|
||||
videoTrack: videoTrack,
|
||||
audioTrack: audioTrack,
|
||||
aacConfig: aacConfig,
|
||||
startPCR: time.Now(),
|
||||
videoDTSEst: h264.NewDTSEstimator(),
|
||||
tsCurrent: newTSFile(videoTrack != nil, audioTrack != nil),
|
||||
tsByName: make(map[string]*tsFile),
|
||||
}
|
||||
|
||||
m.tsByName[m.tsCurrent.name] = m.tsCurrent
|
||||
m.tsQueue = append(m.tsQueue, m.tsCurrent)
|
||||
|
||||
return m, nil
|
||||
}
|
||||
|
||||
// Close closes a Muxer.
|
||||
func (m *Muxer) Close() {
|
||||
m.tsCurrent.close()
|
||||
}
|
||||
|
||||
// WriteH264 writes H264 NALUs, grouped by PTS, into the muxer.
|
||||
func (m *Muxer) WriteH264(pts time.Duration, nalus [][]byte) error {
|
||||
idrPresent := func() bool {
|
||||
for _, nalu := range nalus {
|
||||
typ := h264.NALUType(nalu[0] & 0x1F)
|
||||
if typ == h264.NALUTypeIDR {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}()
|
||||
|
||||
// skip group silently until we find one with a IDR
|
||||
if !m.tsCurrent.firstPacketWritten && !idrPresent {
|
||||
return nil
|
||||
}
|
||||
|
||||
m.mutex.Lock()
|
||||
defer m.mutex.Unlock()
|
||||
|
||||
if idrPresent {
|
||||
if m.tsCurrent.firstPacketWritten &&
|
||||
m.tsCurrent.duration() >= m.hlsSegmentDuration {
|
||||
if m.tsCurrent != nil {
|
||||
m.tsCurrent.close()
|
||||
}
|
||||
|
||||
m.tsCurrent = newTSFile(m.videoTrack != nil, m.audioTrack != nil)
|
||||
|
||||
m.tsByName[m.tsCurrent.name] = m.tsCurrent
|
||||
m.tsQueue = append(m.tsQueue, m.tsCurrent)
|
||||
if len(m.tsQueue) > m.hlsSegmentCount {
|
||||
delete(m.tsByName, m.tsQueue[0].name)
|
||||
m.tsQueue = m.tsQueue[1:]
|
||||
m.tsDeleteCount++
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
m.tsCurrent.setPCR(time.Since(m.startPCR))
|
||||
err := m.tsCurrent.writeH264(
|
||||
m.videoDTSEst.Feed(pts+ptsOffset),
|
||||
pts+ptsOffset,
|
||||
idrPresent,
|
||||
nalus)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// WriteAAC writes AAC AUs, grouped by PTS, into the muxer.
|
||||
func (m *Muxer) WriteAAC(pts time.Duration, aus [][]byte) error {
|
||||
m.mutex.Lock()
|
||||
defer m.mutex.Unlock()
|
||||
|
||||
if m.videoTrack != nil {
|
||||
if m.tsCurrent.firstPacketWritten &&
|
||||
m.tsCurrent.duration() >= m.hlsSegmentDuration &&
|
||||
m.audioAUCount >= segmentMinAUCount {
|
||||
|
||||
if m.tsCurrent != nil {
|
||||
m.tsCurrent.close()
|
||||
}
|
||||
|
||||
m.audioAUCount = 0
|
||||
m.tsCurrent = newTSFile(m.videoTrack != nil, m.audioTrack != nil)
|
||||
m.tsByName[m.tsCurrent.name] = m.tsCurrent
|
||||
m.tsQueue = append(m.tsQueue, m.tsCurrent)
|
||||
if len(m.tsQueue) > m.hlsSegmentCount {
|
||||
delete(m.tsByName, m.tsQueue[0].name)
|
||||
m.tsQueue = m.tsQueue[1:]
|
||||
m.tsDeleteCount++
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if !m.tsCurrent.firstPacketWritten {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
for i, au := range aus {
|
||||
auPTS := pts + time.Duration(i)*1000*time.Second/time.Duration(m.aacConfig.SampleRate)
|
||||
|
||||
m.audioAUCount++
|
||||
m.tsCurrent.setPCR(time.Since(m.startPCR))
|
||||
err := m.tsCurrent.writeAAC(
|
||||
m.aacConfig.SampleRate,
|
||||
m.aacConfig.ChannelCount,
|
||||
auPTS+ptsOffset,
|
||||
au)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Playlist returns a reader to read the HLS playlist in M3U8 format.
|
||||
func (m *Muxer) Playlist() io.Reader {
|
||||
m.mutex.RLock()
|
||||
defer m.mutex.RUnlock()
|
||||
|
||||
if len(m.tsQueue) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
cnt := "#EXTM3U\n"
|
||||
cnt += "#EXT-X-VERSION:3\n"
|
||||
cnt += "#EXT-X-ALLOW-CACHE:NO\n"
|
||||
|
||||
targetDuration := func() uint {
|
||||
ret := uint(math.Ceil(m.hlsSegmentDuration.Seconds()))
|
||||
|
||||
// EXTINF, when rounded to the nearest integer, must be <= EXT-X-TARGETDURATION
|
||||
for _, f := range m.tsQueue {
|
||||
v2 := uint(math.Round(f.duration().Seconds()))
|
||||
if v2 > ret {
|
||||
ret = v2
|
||||
}
|
||||
}
|
||||
|
||||
return ret
|
||||
}()
|
||||
cnt += "#EXT-X-TARGETDURATION:" + strconv.FormatUint(uint64(targetDuration), 10) + "\n"
|
||||
|
||||
cnt += "#EXT-X-MEDIA-SEQUENCE:" + strconv.FormatInt(int64(m.tsDeleteCount), 10) + "\n"
|
||||
|
||||
for _, f := range m.tsQueue {
|
||||
cnt += "#EXTINF:" + strconv.FormatFloat(f.duration().Seconds(), 'f', -1, 64) + ",\n"
|
||||
cnt += f.name + ".ts\n"
|
||||
}
|
||||
|
||||
return bytes.NewReader([]byte(cnt))
|
||||
}
|
||||
|
||||
// TSFile returns a reader to read a given MPEG-TS file.
|
||||
func (m *Muxer) TSFile(fname string) io.Reader {
|
||||
base := strings.TrimSuffix(fname, ".ts")
|
||||
|
||||
m.mutex.RLock()
|
||||
f, ok := m.tsByName[base]
|
||||
m.mutex.RUnlock()
|
||||
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
return f.newReader()
|
||||
}
|
54
internal/hls/muxer_test.go
Normal file
54
internal/hls/muxer_test.go
Normal file
@ -0,0 +1,54 @@
|
||||
package hls
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/aler9/gortsplib"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestMuxer(t *testing.T) {
|
||||
videoTrack, err := gortsplib.NewTrackH264(96, []byte{0x01, 0x02, 0x03, 0x04}, []byte{0x01, 0x02, 0x03, 0x04})
|
||||
require.NoError(t, err)
|
||||
|
||||
audioTrack, err := gortsplib.NewTrackAAC(97, []byte{17, 144})
|
||||
require.NoError(t, err)
|
||||
|
||||
m, err := NewMuxer(3, 5*time.Second, videoTrack, audioTrack)
|
||||
require.NoError(t, err)
|
||||
defer m.Close()
|
||||
|
||||
// group without IDR
|
||||
err = m.WriteH264(1*time.Second, [][]byte{
|
||||
{0x06},
|
||||
{0x07},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
// group with IDR
|
||||
err = m.WriteH264(2*time.Second, [][]byte{
|
||||
{0x05},
|
||||
{0x06},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
err = m.WriteAAC(3*time.Second, [][]byte{
|
||||
{0x01, 0x02, 0x03, 0x04},
|
||||
{0x05, 0x06, 0x07, 0x08},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
// group without IDR
|
||||
err = m.WriteH264(4*time.Second, [][]byte{
|
||||
{0x06},
|
||||
{0x07},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
byts, err := ioutil.ReadAll(m.Playlist())
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Regexp(t, `^#EXTM3U\n#EXT-X-VERSION:3\n#EXT-X-ALLOW-CACHE:NO\n#EXT-X-TARGETDURATION:5\n#EXT-X-MEDIA-SEQUENCE:0\n#EXTINF:2,\n[0-9]+\.ts\n$`, string(byts))
|
||||
}
|
@ -6,15 +6,13 @@ import (
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/aler9/gortsplib"
|
||||
"github.com/asticode/go-astits"
|
||||
|
||||
"github.com/aler9/rtsp-simple-server/internal/aac"
|
||||
"github.com/aler9/rtsp-simple-server/internal/h264"
|
||||
)
|
||||
|
||||
// TSFile is a MPEG-TS file.
|
||||
type TSFile struct {
|
||||
type tsFile struct {
|
||||
name string
|
||||
buf *multiAccessBuffer
|
||||
mux *astits.Muxer
|
||||
@ -25,30 +23,29 @@ type TSFile struct {
|
||||
maxPTS time.Duration
|
||||
}
|
||||
|
||||
// NewTSFile allocates a TSFile.
|
||||
func NewTSFile(videoTrack *gortsplib.Track, audioTrack *gortsplib.Track) *TSFile {
|
||||
t := &TSFile{
|
||||
func newTSFile(hasVideoTrack bool, hasAudioTrack bool) *tsFile {
|
||||
t := &tsFile{
|
||||
buf: newMultiAccessBuffer(),
|
||||
name: strconv.FormatInt(time.Now().Unix(), 10),
|
||||
}
|
||||
|
||||
t.mux = astits.NewMuxer(context.Background(), t.buf)
|
||||
|
||||
if videoTrack != nil {
|
||||
if hasVideoTrack {
|
||||
t.mux.AddElementaryStream(astits.PMTElementaryStream{
|
||||
ElementaryPID: 256,
|
||||
StreamType: astits.StreamTypeH264Video,
|
||||
})
|
||||
}
|
||||
|
||||
if audioTrack != nil {
|
||||
if hasAudioTrack {
|
||||
t.mux.AddElementaryStream(astits.PMTElementaryStream{
|
||||
ElementaryPID: 257,
|
||||
StreamType: astits.StreamTypeAACAudio,
|
||||
})
|
||||
}
|
||||
|
||||
if videoTrack != nil {
|
||||
if hasVideoTrack {
|
||||
t.pcrTrackIsVideo = true
|
||||
t.mux.SetPCRPID(256)
|
||||
} else {
|
||||
@ -63,38 +60,23 @@ func NewTSFile(videoTrack *gortsplib.Track, audioTrack *gortsplib.Track) *TSFile
|
||||
return t
|
||||
}
|
||||
|
||||
// Close closes a TSFile.
|
||||
func (t *TSFile) Close() error {
|
||||
func (t *tsFile) close() error {
|
||||
return t.buf.Close()
|
||||
}
|
||||
|
||||
// Name returns the file name.
|
||||
func (t *TSFile) Name() string {
|
||||
return t.name
|
||||
}
|
||||
|
||||
// Duration returns the file duration.
|
||||
func (t *TSFile) Duration() time.Duration {
|
||||
func (t *tsFile) duration() time.Duration {
|
||||
return t.maxPTS - t.minPTS
|
||||
}
|
||||
|
||||
// FirstPacketWritten returns whether a packet ha been written into the file.
|
||||
func (t *TSFile) FirstPacketWritten() bool {
|
||||
return t.firstPacketWritten
|
||||
}
|
||||
|
||||
// SetPCR sets the PCR.
|
||||
func (t *TSFile) SetPCR(pcr time.Duration) {
|
||||
func (t *tsFile) setPCR(pcr time.Duration) {
|
||||
t.pcr = pcr
|
||||
}
|
||||
|
||||
// NewReader allocates a reader to read the file.
|
||||
func (t *TSFile) NewReader() io.Reader {
|
||||
func (t *tsFile) newReader() io.Reader {
|
||||
return t.buf.NewReader()
|
||||
}
|
||||
|
||||
// WriteH264 writes H264 NALUs into the file.
|
||||
func (t *TSFile) WriteH264(dts time.Duration, pts time.Duration, isIDR bool, nalus [][]byte) error {
|
||||
func (t *tsFile) writeH264(dts time.Duration, pts time.Duration, isIDR bool, nalus [][]byte) error {
|
||||
if t.pcrTrackIsVideo {
|
||||
if !t.firstPacketWritten {
|
||||
t.firstPacketWritten = true
|
||||
@ -143,8 +125,7 @@ func (t *TSFile) WriteH264(dts time.Duration, pts time.Duration, isIDR bool, nal
|
||||
return err
|
||||
}
|
||||
|
||||
// WriteAAC writes AAC AUs into the file.
|
||||
func (t *TSFile) WriteAAC(sampleRate int, channelCount int, pts time.Duration, au []byte) error {
|
||||
func (t *tsFile) writeAAC(sampleRate int, channelCount int, pts time.Duration, au []byte) error {
|
||||
if !t.pcrTrackIsVideo {
|
||||
if !t.firstPacketWritten {
|
||||
t.firstPacketWritten = true
|
||||
|
Loading…
Reference in New Issue
Block a user