hls: fix discontinuity of TS counters between segments

This commit is contained in:
aler9 2021-09-07 12:02:44 +02:00
parent 98bf53a1e8
commit ef4b925209
5 changed files with 253 additions and 215 deletions

View File

@ -5,8 +5,6 @@ import (
"time"
"github.com/aler9/gortsplib"
"github.com/aler9/rtsp-simple-server/internal/h264"
)
const (
@ -18,20 +16,9 @@ const (
// Muxer is a HLS muxer.
type Muxer struct {
hlsSegmentCount int
hlsSegmentDuration time.Duration
videoTrack *gortsplib.Track
audioTrack *gortsplib.Track
h264Conf *gortsplib.TrackConfigH264
aacConf *gortsplib.TrackConfigAAC
videoDTSEst *h264.DTSEstimator
audioAUCount int
currentSegment *segment
startPCR time.Time
startPTS time.Duration
primaryPlaylist *primaryPlaylist
streamPlaylist *streamPlaylist
primaryPlaylist *muxerPrimaryPlaylist
streamPlaylist *muxerStreamPlaylist
tsGenerator *muxerTSGenerator
}
// NewMuxer allocates a Muxer.
@ -58,16 +45,23 @@ func NewMuxer(
}
}
primaryPlaylist := newMuxerPrimaryPlaylist(videoTrack, audioTrack, h264Conf)
streamPlaylist := newMuxerStreamPlaylist(hlsSegmentCount)
tsGenerator := newMuxerTSGenerator(
hlsSegmentCount,
hlsSegmentDuration,
videoTrack,
audioTrack,
h264Conf,
aacConf,
streamPlaylist)
m := &Muxer{
hlsSegmentCount: hlsSegmentCount,
hlsSegmentDuration: hlsSegmentDuration,
videoTrack: videoTrack,
audioTrack: audioTrack,
h264Conf: h264Conf,
aacConf: aacConf,
currentSegment: newSegment(videoTrack, audioTrack, h264Conf, aacConf),
primaryPlaylist: newPrimaryPlaylist(videoTrack, audioTrack, h264Conf),
streamPlaylist: newStreamPlaylist(hlsSegmentCount),
primaryPlaylist: primaryPlaylist,
streamPlaylist: streamPlaylist,
tsGenerator: tsGenerator,
}
return m, nil
@ -80,93 +74,15 @@ func (m *Muxer) Close() {
// WriteH264 writes H264 NALUs, grouped by PTS, into the muxer.
func (m *Muxer) WriteH264(pts time.Duration, nalus [][]byte) error {
idrPresent := func() bool {
for _, nalu := range nalus {
typ := h264.NALUType(nalu[0] & 0x1F)
if typ == h264.NALUTypeIDR {
return true
}
}
return false
}()
// skip group silently until we find one with a IDR
if !m.currentSegment.firstPacketWritten && !idrPresent {
return nil
}
if m.currentSegment.firstPacketWritten {
if idrPresent &&
m.currentSegment.duration() >= m.hlsSegmentDuration {
m.streamPlaylist.pushSegment(m.currentSegment)
m.currentSegment = newSegment(m.videoTrack, m.audioTrack, m.h264Conf, m.aacConf)
m.currentSegment.setStartPCR(m.startPCR)
}
} else {
m.startPCR = time.Now()
m.startPTS = pts
m.currentSegment.setStartPCR(m.startPCR)
m.videoDTSEst = h264.NewDTSEstimator()
}
pts -= m.startPTS
err := m.currentSegment.writeH264(
m.videoDTSEst.Feed(pts)+pcrOffset,
pts+pcrOffset,
idrPresent,
nalus)
if err != nil {
return err
}
return nil
return m.tsGenerator.writeH264(pts, nalus)
}
// WriteAAC writes AAC AUs, grouped by PTS, into the muxer.
func (m *Muxer) WriteAAC(pts time.Duration, aus [][]byte) error {
if m.videoTrack == nil {
if m.currentSegment.firstPacketWritten {
if m.audioAUCount >= segmentMinAUCount &&
m.currentSegment.duration() >= m.hlsSegmentDuration {
m.audioAUCount = 0
m.streamPlaylist.pushSegment(m.currentSegment)
m.currentSegment = newSegment(m.videoTrack, m.audioTrack, m.h264Conf, m.aacConf)
m.currentSegment.setStartPCR(m.startPCR)
}
} else {
m.startPCR = time.Now()
m.startPTS = pts
m.currentSegment.setStartPCR(m.startPCR)
}
} else {
if !m.currentSegment.firstPacketWritten {
return nil
}
}
pts = pts - m.startPTS + pcrOffset
for _, au := range aus {
err := m.currentSegment.writeAAC(pts, au)
if err != nil {
return err
}
if m.videoTrack == nil {
m.audioAUCount++
}
pts += 1000 * time.Second / time.Duration(m.aacConf.SampleRate)
}
return nil
return m.tsGenerator.writeAAC(pts, aus)
}
// PrimaryPlaylist returns a reader to read the primary playlist
// PrimaryPlaylist returns a reader to read the primary playlist.
func (m *Muxer) PrimaryPlaylist() io.Reader {
return m.primaryPlaylist.reader()
}
@ -176,7 +92,7 @@ func (m *Muxer) StreamPlaylist() io.Reader {
return m.streamPlaylist.reader()
}
// Segment returns a reader to read a segment.
// Segment returns a reader to read a segment listed in the stream playlist.
func (m *Muxer) Segment(fname string) io.Reader {
return m.streamPlaylist.segment(fname)
}

View File

@ -9,7 +9,7 @@ import (
"github.com/aler9/gortsplib"
)
type primaryPlaylist struct {
type muxerPrimaryPlaylist struct {
videoTrack *gortsplib.Track
audioTrack *gortsplib.Track
h264Conf *gortsplib.TrackConfigH264
@ -17,12 +17,12 @@ type primaryPlaylist struct {
cnt []byte
}
func newPrimaryPlaylist(
func newMuxerPrimaryPlaylist(
videoTrack *gortsplib.Track,
audioTrack *gortsplib.Track,
h264Conf *gortsplib.TrackConfigH264,
) *primaryPlaylist {
p := &primaryPlaylist{
) *muxerPrimaryPlaylist {
p := &muxerPrimaryPlaylist{
videoTrack: videoTrack,
audioTrack: audioTrack,
h264Conf: h264Conf,
@ -45,6 +45,6 @@ func newPrimaryPlaylist(
return p
}
func (p *primaryPlaylist) reader() io.Reader {
func (p *muxerPrimaryPlaylist) reader() io.Reader {
return bytes.NewReader(p.cnt)
}

View File

@ -21,27 +21,27 @@ func (r *asyncReader) Read(buf []byte) (int, error) {
return r.reader.Read(buf)
}
type streamPlaylist struct {
type muxerStreamPlaylist struct {
hlsSegmentCount int
mutex sync.Mutex
cond *sync.Cond
closed bool
segments []*segment
segmentByName map[string]*segment
segments []*muxerTSSegment
segmentByName map[string]*muxerTSSegment
segmentDeleteCount int
}
func newStreamPlaylist(hlsSegmentCount int) *streamPlaylist {
p := &streamPlaylist{
func newMuxerStreamPlaylist(hlsSegmentCount int) *muxerStreamPlaylist {
p := &muxerStreamPlaylist{
hlsSegmentCount: hlsSegmentCount,
segmentByName: make(map[string]*segment),
segmentByName: make(map[string]*muxerTSSegment),
}
p.cond = sync.NewCond(&p.mutex)
return p
}
func (p *streamPlaylist) close() {
func (p *muxerStreamPlaylist) close() {
func() {
p.mutex.Lock()
defer p.mutex.Unlock()
@ -51,7 +51,7 @@ func (p *streamPlaylist) close() {
p.cond.Broadcast()
}
func (p *streamPlaylist) reader() io.Reader {
func (p *muxerStreamPlaylist) reader() io.Reader {
return &asyncReader{generator: func() []byte {
p.mutex.Lock()
defer p.mutex.Unlock()
@ -94,7 +94,7 @@ func (p *streamPlaylist) reader() io.Reader {
}}
}
func (p *streamPlaylist) segment(fname string) io.Reader {
func (p *muxerStreamPlaylist) segment(fname string) io.Reader {
base := strings.TrimSuffix(fname, ".ts")
p.mutex.Lock()
@ -108,7 +108,7 @@ func (p *streamPlaylist) segment(fname string) io.Reader {
return f.reader()
}
func (p *streamPlaylist) pushSegment(t *segment) {
func (p *muxerStreamPlaylist) pushSegment(t *muxerTSSegment) {
func() {
p.mutex.Lock()
defer p.mutex.Unlock()

View File

@ -0,0 +1,190 @@
package hls
import (
"context"
"time"
"github.com/aler9/gortsplib"
"github.com/asticode/go-astits"
"github.com/aler9/rtsp-simple-server/internal/aac"
"github.com/aler9/rtsp-simple-server/internal/h264"
)
type muxerTSGenerator struct {
hlsSegmentCount int
hlsSegmentDuration time.Duration
videoTrack *gortsplib.Track
audioTrack *gortsplib.Track
h264Conf *gortsplib.TrackConfigH264
aacConf *gortsplib.TrackConfigAAC
streamPlaylist *muxerStreamPlaylist
tm *astits.Muxer
currentSegment *muxerTSSegment
videoDTSEst *h264.DTSEstimator
audioAUCount int
startPCR time.Time
startPTS time.Duration
}
func newMuxerTSGenerator(
hlsSegmentCount int,
hlsSegmentDuration time.Duration,
videoTrack *gortsplib.Track,
audioTrack *gortsplib.Track,
h264Conf *gortsplib.TrackConfigH264,
aacConf *gortsplib.TrackConfigAAC,
streamPlaylist *muxerStreamPlaylist,
) *muxerTSGenerator {
m := &muxerTSGenerator{
hlsSegmentCount: hlsSegmentCount,
hlsSegmentDuration: hlsSegmentDuration,
videoTrack: videoTrack,
audioTrack: audioTrack,
streamPlaylist: streamPlaylist,
h264Conf: h264Conf,
aacConf: aacConf,
}
m.tm = astits.NewMuxer(context.Background(), m)
if videoTrack != nil {
m.tm.AddElementaryStream(astits.PMTElementaryStream{
ElementaryPID: 256,
StreamType: astits.StreamTypeH264Video,
})
}
if audioTrack != nil {
m.tm.AddElementaryStream(astits.PMTElementaryStream{
ElementaryPID: 257,
StreamType: astits.StreamTypeAACAudio,
})
}
if videoTrack != nil {
m.tm.SetPCRPID(256)
} else {
m.tm.SetPCRPID(257)
}
m.currentSegment = newMuxerTSSegment(m.videoTrack, m)
return m
}
func (m *muxerTSGenerator) Write(p []byte) (int, error) {
return m.currentSegment.write(p)
}
func (m *muxerTSGenerator) writeH264(pts time.Duration, nalus [][]byte) error {
idrPresent := func() bool {
for _, nalu := range nalus {
typ := h264.NALUType(nalu[0] & 0x1F)
if typ == h264.NALUTypeIDR {
return true
}
}
return false
}()
// skip group silently until we find one with a IDR
if !m.currentSegment.firstPacketWritten && !idrPresent {
return nil
}
// switch segment or initialize the first segment
if m.currentSegment.firstPacketWritten {
if idrPresent &&
m.currentSegment.duration() >= m.hlsSegmentDuration {
m.streamPlaylist.pushSegment(m.currentSegment)
m.currentSegment = newMuxerTSSegment(m.videoTrack, m)
}
} else {
m.startPCR = time.Now()
m.startPTS = pts
m.videoDTSEst = h264.NewDTSEstimator()
}
dts := m.videoDTSEst.Feed(pts-m.startPTS) + pcrOffset
pts = pts - m.startPTS + pcrOffset
filteredNALUs := [][]byte{
// prepend an AUD. This is required by video.js and iOS
{byte(h264.NALUTypeAccessUnitDelimiter), 240},
}
for _, nalu := range nalus {
// remove existing SPS, PPS, AUD
typ := h264.NALUType(nalu[0] & 0x1F)
switch typ {
case h264.NALUTypeSPS, h264.NALUTypePPS, h264.NALUTypeAccessUnitDelimiter:
continue
}
// add SPS and PPS before IDR
if typ == h264.NALUTypeIDR {
filteredNALUs = append(filteredNALUs, m.h264Conf.SPS)
filteredNALUs = append(filteredNALUs, m.h264Conf.PPS)
}
filteredNALUs = append(filteredNALUs, nalu)
}
enc, err := h264.EncodeAnnexB(filteredNALUs)
if err != nil {
return err
}
return m.currentSegment.writeH264(m.startPCR, dts, pts, idrPresent, enc)
}
func (m *muxerTSGenerator) writeAAC(pts time.Duration, aus [][]byte) error {
// switch segment or initialize the first segment
if m.videoTrack == nil {
if m.currentSegment.firstPacketWritten {
if m.audioAUCount >= segmentMinAUCount &&
m.currentSegment.duration() >= m.hlsSegmentDuration {
m.audioAUCount = 0
m.streamPlaylist.pushSegment(m.currentSegment)
m.currentSegment = newMuxerTSSegment(m.videoTrack, m)
}
} else {
m.startPCR = time.Now()
m.startPTS = pts
}
} else {
if !m.currentSegment.firstPacketWritten {
return nil
}
}
pts = pts - m.startPTS + pcrOffset
for _, au := range aus {
enc, err := aac.EncodeADTS([]*aac.ADTSPacket{
{
SampleRate: m.aacConf.SampleRate,
ChannelCount: m.aacConf.ChannelCount,
AU: au,
},
})
if err != nil {
return err
}
err = m.currentSegment.writeAAC(m.startPCR, pts, enc)
if err != nil {
return err
}
if m.videoTrack == nil {
m.audioAUCount++
}
pts += 1000 * time.Second / time.Duration(m.aacConf.SampleRate)
}
return nil
}

View File

@ -2,68 +2,36 @@ package hls
import (
"bytes"
"context"
"io"
"strconv"
"time"
"github.com/aler9/gortsplib"
"github.com/asticode/go-astits"
"github.com/aler9/rtsp-simple-server/internal/aac"
"github.com/aler9/rtsp-simple-server/internal/h264"
)
type segment struct {
type muxerTSSegment struct {
videoTrack *gortsplib.Track
h264Conf *gortsplib.TrackConfigH264
aacConf *gortsplib.TrackConfigAAC
tsgen *muxerTSGenerator
name string
buf bytes.Buffer
mux *astits.Muxer
firstPacketWritten bool
minPTS time.Duration
maxPTS time.Duration
startPCR time.Time
pcrSendCounter int
}
func newSegment(
func newMuxerTSSegment(
videoTrack *gortsplib.Track,
audioTrack *gortsplib.Track,
h264Conf *gortsplib.TrackConfigH264,
aacConf *gortsplib.TrackConfigAAC,
) *segment {
t := &segment{
tsgen *muxerTSGenerator,
) *muxerTSSegment {
t := &muxerTSSegment{
videoTrack: videoTrack,
h264Conf: h264Conf,
aacConf: aacConf,
tsgen: tsgen,
name: strconv.FormatInt(time.Now().Unix(), 10),
}
t.mux = astits.NewMuxer(context.Background(), &t.buf)
if videoTrack != nil {
t.mux.AddElementaryStream(astits.PMTElementaryStream{
ElementaryPID: 256,
StreamType: astits.StreamTypeH264Video,
})
}
if audioTrack != nil {
t.mux.AddElementaryStream(astits.PMTElementaryStream{
ElementaryPID: 257,
StreamType: astits.StreamTypeAACAudio,
})
}
if videoTrack != nil {
t.mux.SetPCRPID(256)
} else {
t.mux.SetPCRPID(257)
}
// WriteTable() is called automatically when WriteData() is called with
// - PID == PCRPID
// - AdaptationField != nil
@ -72,23 +40,24 @@ func newSegment(
return t
}
func (t *segment) duration() time.Duration {
func (t *muxerTSSegment) duration() time.Duration {
return t.maxPTS - t.minPTS
}
func (t *segment) setStartPCR(startPCR time.Time) {
t.startPCR = startPCR
func (t *muxerTSSegment) write(p []byte) (int, error) {
return t.buf.Write(p)
}
func (t *segment) reader() io.Reader {
func (t *muxerTSSegment) reader() io.Reader {
return bytes.NewReader(t.buf.Bytes())
}
func (t *segment) writeH264(
func (t *muxerTSSegment) writeH264(
startPCR time.Time,
dts time.Duration,
pts time.Duration,
isIDR bool,
nalus [][]byte) error {
idrPresent bool,
enc []byte) error {
if !t.firstPacketWritten {
t.firstPacketWritten = true
t.minPTS = pts
@ -102,36 +71,9 @@ func (t *segment) writeH264(
}
}
filteredNALUs := [][]byte{
// prepend an AUD. This is required by video.js and iOS
{byte(h264.NALUTypeAccessUnitDelimiter), 240},
}
for _, nalu := range nalus {
// remove existing SPS, PPS, AUD
typ := h264.NALUType(nalu[0] & 0x1F)
switch typ {
case h264.NALUTypeSPS, h264.NALUTypePPS, h264.NALUTypeAccessUnitDelimiter:
continue
}
// add SPS and PPS before IDR
if typ == h264.NALUTypeIDR {
filteredNALUs = append(filteredNALUs, t.h264Conf.SPS)
filteredNALUs = append(filteredNALUs, t.h264Conf.PPS)
}
filteredNALUs = append(filteredNALUs, nalu)
}
enc, err := h264.EncodeAnnexB(filteredNALUs)
if err != nil {
return err
}
var af *astits.PacketAdaptationField
if isIDR {
if idrPresent {
if af == nil {
af = &astits.PacketAdaptationField{}
}
@ -144,7 +86,7 @@ func (t *segment) writeH264(
af = &astits.PacketAdaptationField{}
}
af.HasPCR = true
af.PCR = &astits.ClockReference{Base: int64(time.Since(t.startPCR).Seconds() * 90000)}
af.PCR = &astits.ClockReference{Base: int64(time.Since(startPCR).Seconds() * 90000)}
t.pcrSendCounter = 3
}
t.pcrSendCounter--
@ -162,7 +104,7 @@ func (t *segment) writeH264(
oh.PTS = &astits.ClockReference{Base: int64(pts.Seconds() * 90000)}
}
_, err = t.mux.WriteData(&astits.MuxerData{
_, err := t.tsgen.tm.WriteData(&astits.MuxerData{
PID: 256,
AdaptationField: af,
PES: &astits.PESData{
@ -176,9 +118,10 @@ func (t *segment) writeH264(
return err
}
func (t *segment) writeAAC(
func (t *muxerTSSegment) writeAAC(
startPCR time.Time,
pts time.Duration,
au []byte) error {
enc []byte) error {
if t.videoTrack == nil {
if !t.firstPacketWritten {
t.firstPacketWritten = true
@ -194,17 +137,6 @@ func (t *segment) writeAAC(
}
}
adtsPkt, err := aac.EncodeADTS([]*aac.ADTSPacket{
{
SampleRate: t.aacConf.SampleRate,
ChannelCount: t.aacConf.ChannelCount,
AU: au,
},
})
if err != nil {
return err
}
af := &astits.PacketAdaptationField{
RandomAccessIndicator: true,
}
@ -214,12 +146,12 @@ func (t *segment) writeAAC(
// send PCR once in a while
if t.pcrSendCounter == 0 {
af.HasPCR = true
af.PCR = &astits.ClockReference{Base: int64(time.Since(t.startPCR).Seconds() * 90000)}
af.PCR = &astits.ClockReference{Base: int64(time.Since(startPCR).Seconds() * 90000)}
t.pcrSendCounter = 3
}
}
_, err = t.mux.WriteData(&astits.MuxerData{
_, err := t.tsgen.tm.WriteData(&astits.MuxerData{
PID: 257,
AdaptationField: af,
PES: &astits.PESData{
@ -229,10 +161,10 @@ func (t *segment) writeAAC(
PTSDTSIndicator: astits.PTSDTSIndicatorOnlyPTS,
PTS: &astits.ClockReference{Base: int64(pts.Seconds() * 90000)},
},
PacketLength: uint16(len(adtsPkt) + 8),
PacketLength: uint16(len(enc) + 8),
StreamID: 192, // = audio
},
Data: adtsPkt,
Data: enc,
},
})
return err