hls: insert segments into playlist only after they're complete

In this way, EXT-X-TARGETDURATION and EXTINF are always filled correctly.

If no segments have been generated yet, the playlist is not returned
until a segment is inserted or the muxer is closed. This causes timeout
issues on iOS Safari, that are solved by waiting for a fetch() before starting
the video.
This commit is contained in:
aler9 2021-08-23 12:26:06 +02:00
parent 6b6d314a07
commit 92523c2a13
8 changed files with 291 additions and 266 deletions

View File

@ -47,7 +47,7 @@ const index = `<!DOCTYPE html>
</head>
<body>
<div id="video-wrapper"><video id="video" muted controls></video></div>
<div id="video-wrapper"><video id="video" muted controls autoplay></video></div>
<script src="https://cdn.jsdelivr.net/npm/hls.js@1.0.0"></script>
@ -57,8 +57,14 @@ const create = () => {
const video = document.getElementById('video');
if (video.canPlayType('application/vnd.apple.mpegurl')) {
video.src = 'index.m3u8';
video.play();
// since it's not possible to detect timeout errors in iOS,
// wait for the playlist to be available before starting the stream
fetch('stream.m3u8')
.then(() => {
video.src = 'index.m3u8';
video.play();
});
} else {
const hls = new Hls({
progressive: false,
@ -68,9 +74,7 @@ const create = () => {
if (data.fatal) {
hls.destroy();
setTimeout(() => {
create();
}, 2000);
setTimeout(create, 2000);
}
});
@ -79,8 +83,9 @@ const create = () => {
video.play();
}
}
create();
};
window.addEventListener('DOMContentLoaded', create);
</script>
@ -443,7 +448,7 @@ func (r *hlsMuxer) handleRequest(req hlsMuxerRequest) {
req.Res <- r.muxer.StreamPlaylist()
case strings.HasSuffix(req.File, ".ts"):
r := r.muxer.TSFile(req.File)
r := r.muxer.Segment(req.File)
if r == nil {
req.W.WriteHeader(http.StatusNotFound)
req.Res <- nil

View File

@ -1,68 +0,0 @@
package hls
import (
"bytes"
"io"
"sync"
)
type multiAccessBufferReader struct {
m *multiAccessBuffer
readPos int
}
func (r *multiAccessBufferReader) Read(p []byte) (int, error) {
r.m.mutex.Lock()
defer r.m.mutex.Unlock()
if r.m.closed && r.m.writePos == r.readPos {
return 0, io.EOF
}
for !r.m.closed && r.m.writePos == r.readPos {
r.m.cond.Wait()
}
buf := r.m.buf.Bytes()
n := copy(p, buf[r.readPos:])
r.readPos += n
return n, nil
}
type multiAccessBuffer struct {
buf bytes.Buffer
closed bool
writePos int
mutex sync.Mutex
cond *sync.Cond
}
func newMultiAccessBuffer() *multiAccessBuffer {
m := &multiAccessBuffer{}
m.cond = sync.NewCond(&m.mutex)
return m
}
func (m *multiAccessBuffer) Close() error {
m.mutex.Lock()
m.closed = true
m.mutex.Unlock()
m.cond.Broadcast()
return nil
}
func (m *multiAccessBuffer) Write(p []byte) (int, error) {
m.mutex.Lock()
n, _ := m.buf.Write(p)
m.writePos += n
m.mutex.Unlock()
m.cond.Broadcast()
return n, nil
}
func (m *multiAccessBuffer) NewReader() io.Reader {
return &multiAccessBufferReader{
m: m,
}
}

View File

@ -1,39 +0,0 @@
package hls
import (
"io"
"testing"
"github.com/stretchr/testify/require"
)
func TestMultiAccessBuffer(t *testing.T) {
m := newMultiAccessBuffer()
m.Write([]byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08})
r := m.NewReader()
buf := make([]byte, 4)
n, err := r.Read(buf)
require.NoError(t, err)
require.Equal(t, []byte{0x01, 0x02, 0x03, 0x04}, buf[:n])
buf = make([]byte, 10)
n, err = r.Read(buf)
require.NoError(t, err)
require.Equal(t, []byte{0x05, 0x06, 0x07, 0x08}, buf[:n])
m.Write([]byte{0x09, 0x0a, 0x0b, 0x0c})
m.Close()
buf = make([]byte, 10)
n, err = r.Read(buf)
require.NoError(t, err)
require.Equal(t, []byte{0x09, 0x0a, 0x0b, 0x0c}, buf[:n])
buf = make([]byte, 10)
_, err = r.Read(buf)
require.Equal(t, io.EOF, err)
}

View File

@ -1,13 +1,7 @@
package hls
import (
"bytes"
"encoding/hex"
"io"
"math"
"strconv"
"strings"
"sync"
"time"
"github.com/aler9/gortsplib"
@ -32,18 +26,16 @@ type Muxer struct {
videoTrack *gortsplib.Track
audioTrack *gortsplib.Track
h264SPS []byte
h264PPS []byte
aacConfig rtpaac.MPEG4AudioConfig
videoDTSEst *h264.DTSEstimator
audioAUCount int
tsCurrent *tsFile
tsQueue []*tsFile
tsByName map[string]*tsFile
tsDeleteCount int
mutex sync.RWMutex
startPCR time.Time
startPTS time.Duration
h264SPS []byte
h264PPS []byte
aacConfig rtpaac.MPEG4AudioConfig
videoDTSEst *h264.DTSEstimator
audioAUCount int
currentSegment *segment
startPCR time.Time
startPTS time.Duration
primaryPlaylist *primaryPlaylist
streamPlaylist *streamPlaylist
}
// NewMuxer allocates a Muxer.
@ -84,19 +76,17 @@ func NewMuxer(
h264PPS: h264PPS,
aacConfig: aacConfig,
videoDTSEst: h264.NewDTSEstimator(),
tsCurrent: newTSFile(videoTrack, audioTrack),
tsByName: make(map[string]*tsFile),
currentSegment: newSegment(videoTrack, audioTrack, h264SPS, h264PPS),
primaryPlaylist: newPrimaryPlaylist(videoTrack, audioTrack, h264SPS, h264PPS),
streamPlaylist: newStreamPlaylist(hlsSegmentCount),
}
m.tsByName[m.tsCurrent.name] = m.tsCurrent
m.tsQueue = append(m.tsQueue, m.tsCurrent)
return m, nil
}
// Close closes a Muxer.
func (m *Muxer) Close() {
m.tsCurrent.close()
m.streamPlaylist.close()
}
// WriteH264 writes H264 NALUs, grouped by PTS, into the muxer.
@ -112,38 +102,27 @@ func (m *Muxer) WriteH264(pts time.Duration, nalus [][]byte) error {
}()
// skip group silently until we find one with a IDR
if !m.tsCurrent.firstPacketWritten && !idrPresent {
if !m.currentSegment.firstPacketWritten && !idrPresent {
return nil
}
m.mutex.Lock()
defer m.mutex.Unlock()
if m.currentSegment.firstPacketWritten {
if idrPresent &&
m.currentSegment.duration() >= m.hlsSegmentDuration {
m.streamPlaylist.pushSegment(m.currentSegment)
if idrPresent &&
m.tsCurrent.firstPacketWritten &&
m.tsCurrent.duration() >= m.hlsSegmentDuration {
m.tsCurrent.close()
m.tsCurrent = newTSFile(m.videoTrack, m.audioTrack)
m.tsCurrent.setStartPCR(m.startPCR)
m.tsByName[m.tsCurrent.name] = m.tsCurrent
m.tsQueue = append(m.tsQueue, m.tsCurrent)
if len(m.tsQueue) > m.hlsSegmentCount {
delete(m.tsByName, m.tsQueue[0].name)
m.tsQueue = m.tsQueue[1:]
m.tsDeleteCount++
m.currentSegment = newSegment(m.videoTrack, m.audioTrack, m.h264SPS, m.h264PPS)
m.currentSegment.setStartPCR(m.startPCR)
}
} else if !m.tsCurrent.firstPacketWritten {
} else {
m.startPCR = time.Now()
m.startPTS = pts
m.tsCurrent.setStartPCR(m.startPCR)
m.currentSegment.setStartPCR(m.startPCR)
}
pts = pts + ptsOffset - m.startPTS
err := m.tsCurrent.writeH264(
m.h264SPS,
m.h264PPS,
err := m.currentSegment.writeH264(
m.videoDTSEst.Feed(pts),
pts,
idrPresent,
@ -157,34 +136,24 @@ func (m *Muxer) WriteH264(pts time.Duration, nalus [][]byte) error {
// WriteAAC writes AAC AUs, grouped by PTS, into the muxer.
func (m *Muxer) WriteAAC(pts time.Duration, aus [][]byte) error {
m.mutex.Lock()
defer m.mutex.Unlock()
if m.videoTrack == nil {
if m.audioAUCount >= segmentMinAUCount &&
m.tsCurrent.firstPacketWritten &&
m.tsCurrent.duration() >= m.hlsSegmentDuration {
m.tsCurrent.close()
if m.currentSegment.firstPacketWritten {
if m.audioAUCount >= segmentMinAUCount &&
m.currentSegment.duration() >= m.hlsSegmentDuration {
m.audioAUCount = 0
m.audioAUCount = 0
m.streamPlaylist.pushSegment(m.currentSegment)
m.tsCurrent = newTSFile(m.videoTrack, m.audioTrack)
m.tsCurrent.setStartPCR(m.startPCR)
m.tsByName[m.tsCurrent.name] = m.tsCurrent
m.tsQueue = append(m.tsQueue, m.tsCurrent)
if len(m.tsQueue) > m.hlsSegmentCount {
delete(m.tsByName, m.tsQueue[0].name)
m.tsQueue = m.tsQueue[1:]
m.tsDeleteCount++
m.currentSegment = newSegment(m.videoTrack, m.audioTrack, m.h264SPS, m.h264PPS)
m.currentSegment.setStartPCR(m.startPCR)
}
} else if !m.tsCurrent.firstPacketWritten {
} else {
m.startPCR = time.Now()
m.startPTS = pts
m.tsCurrent.setStartPCR(m.startPCR)
m.currentSegment.setStartPCR(m.startPCR)
}
} else {
if !m.tsCurrent.firstPacketWritten {
if !m.currentSegment.firstPacketWritten {
return nil
}
}
@ -194,7 +163,7 @@ func (m *Muxer) WriteAAC(pts time.Duration, aus [][]byte) error {
for i, au := range aus {
auPTS := pts + time.Duration(i)*1000*time.Second/time.Duration(m.aacConfig.SampleRate)
err := m.tsCurrent.writeAAC(
err := m.currentSegment.writeAAC(
m.aacConfig.SampleRate,
m.aacConfig.ChannelCount,
auPTS,
@ -211,68 +180,15 @@ func (m *Muxer) WriteAAC(pts time.Duration, aus [][]byte) error {
// PrimaryPlaylist returns a reader to read the primary playlist
func (m *Muxer) PrimaryPlaylist() io.Reader {
var codecs []string
if m.videoTrack != nil {
codecs = append(codecs, "avc1."+hex.EncodeToString(m.h264SPS[1:4]))
}
if m.audioTrack != nil {
codecs = append(codecs, "mp4a.40.2")
}
cnt := "#EXTM3U\n"
cnt += "#EXT-X-STREAM-INF:BANDWIDTH=200000,CODECS=\"" + strings.Join(codecs, ",") + "\"\n"
cnt += "stream.m3u8\n"
return bytes.NewReader([]byte(cnt))
return m.primaryPlaylist.reader()
}
// StreamPlaylist returns a reader to read the stream playlist.
func (m *Muxer) StreamPlaylist() io.Reader {
cnt := "#EXTM3U\n"
cnt += "#EXT-X-VERSION:3\n"
cnt += "#EXT-X-ALLOW-CACHE:NO\n"
m.mutex.RLock()
defer m.mutex.RUnlock()
targetDuration := func() uint {
ret := uint(math.Ceil(m.hlsSegmentDuration.Seconds()))
// EXTINF, when rounded to the nearest integer, must be <= EXT-X-TARGETDURATION
for _, f := range m.tsQueue {
v2 := uint(math.Round(f.duration().Seconds()))
if v2 > ret {
ret = v2
}
}
return ret
}()
cnt += "#EXT-X-TARGETDURATION:" + strconv.FormatUint(uint64(targetDuration), 10) + "\n"
cnt += "#EXT-X-MEDIA-SEQUENCE:" + strconv.FormatInt(int64(m.tsDeleteCount), 10) + "\n"
for _, f := range m.tsQueue {
cnt += "#EXTINF:" + strconv.FormatFloat(f.duration().Seconds(), 'f', -1, 64) + ",\n"
cnt += f.name + ".ts\n"
}
return bytes.NewReader([]byte(cnt))
return m.streamPlaylist.reader()
}
// TSFile returns a reader to read a MPEG-TS file.
func (m *Muxer) TSFile(fname string) io.Reader {
base := strings.TrimSuffix(fname, ".ts")
m.mutex.RLock()
f, ok := m.tsByName[base]
m.mutex.RUnlock()
if !ok {
return nil
}
return f.newReader()
// Segment returns a reader to read a segment.
func (m *Muxer) Segment(fname string) io.Reader {
return m.streamPlaylist.segment(fname)
}

View File

@ -80,13 +80,11 @@ func TestMuxer(t *testing.T) {
`#EXT-X-TARGETDURATION:2\n` +
`#EXT-X-MEDIA-SEQUENCE:0\n` +
`#EXTINF:2,\n` +
`([0-9]+\.ts)\n` +
`#EXTINF:0,\n` +
`([0-9]+\.ts)\n$`)
ma := re.FindStringSubmatch(string(byts))
require.NotEqual(t, nil, ma)
require.NotEqual(t, 0, len(ma))
byts, err = ioutil.ReadAll(m.TSFile(ma[1]))
byts, err = ioutil.ReadAll(m.Segment(ma[1]))
require.NoError(t, err)
checkTSPacket(t, byts, 0, 1)
@ -107,3 +105,29 @@ func TestMuxer(t *testing.T) {
byts[:24],
)
}
func TestMuxerCloseBeforeFirstSegment(t *testing.T) {
videoTrack, err := gortsplib.NewTrackH264(96, []byte{0x07, 0x01, 0x02, 0x03}, []byte{0x08})
require.NoError(t, err)
audioTrack, err := gortsplib.NewTrackAAC(97, []byte{17, 144})
require.NoError(t, err)
m, err := NewMuxer(3, 1*time.Second, videoTrack, audioTrack)
require.NoError(t, err)
// group with IDR
err = m.WriteH264(2*time.Second, [][]byte{
{5}, // IDR
{9}, // AUD
{8}, // PPS
{7}, // SPS
})
require.NoError(t, err)
m.Close()
byts, err := ioutil.ReadAll(m.StreamPlaylist())
require.NoError(t, err)
require.Equal(t, []byte{}, byts)
}

View File

@ -0,0 +1,55 @@
package hls
import (
"bytes"
"encoding/hex"
"io"
"strings"
"github.com/aler9/gortsplib"
)
type primaryPlaylist struct {
videoTrack *gortsplib.Track
audioTrack *gortsplib.Track
h264SPS []byte
h264PPS []byte
breader *bytes.Reader
}
func newPrimaryPlaylist(
videoTrack *gortsplib.Track,
audioTrack *gortsplib.Track,
h264SPS []byte,
h264PPS []byte,
) *primaryPlaylist {
p := &primaryPlaylist{
videoTrack: videoTrack,
audioTrack: audioTrack,
h264SPS: h264SPS,
h264PPS: h264PPS,
}
var codecs []string
if p.videoTrack != nil {
codecs = append(codecs, "avc1."+hex.EncodeToString(p.h264SPS[1:4]))
}
if p.audioTrack != nil {
codecs = append(codecs, "mp4a.40.2")
}
cnt := "#EXTM3U\n"
cnt += "#EXT-X-STREAM-INF:BANDWIDTH=200000,CODECS=\"" + strings.Join(codecs, ",") + "\"\n"
cnt += "stream.m3u8\n"
p.breader = bytes.NewReader([]byte(cnt))
return p
}
func (p *primaryPlaylist) reader() io.Reader {
return p.breader
}

View File

@ -1,6 +1,7 @@
package hls
import (
"bytes"
"context"
"io"
"strconv"
@ -13,10 +14,13 @@ import (
"github.com/aler9/rtsp-simple-server/internal/h264"
)
type tsFile struct {
videoTrack *gortsplib.Track
type segment struct {
videoTrack *gortsplib.Track
h264SPS []byte
h264PPS []byte
name string
buf *multiAccessBuffer
buf bytes.Buffer
mux *astits.Muxer
firstPacketWritten bool
minPTS time.Duration
@ -25,14 +29,20 @@ type tsFile struct {
pcrSendCounter int
}
func newTSFile(videoTrack *gortsplib.Track, audioTrack *gortsplib.Track) *tsFile {
t := &tsFile{
func newSegment(
videoTrack *gortsplib.Track,
audioTrack *gortsplib.Track,
h264SPS []byte,
h264PPS []byte,
) *segment {
t := &segment{
videoTrack: videoTrack,
h264SPS: h264SPS,
h264PPS: h264PPS,
name: strconv.FormatInt(time.Now().Unix(), 10),
buf: newMultiAccessBuffer(),
}
t.mux = astits.NewMuxer(context.Background(), t.buf)
t.mux = astits.NewMuxer(context.Background(), &t.buf)
if videoTrack != nil {
t.mux.AddElementaryStream(astits.PMTElementaryStream{
@ -62,25 +72,19 @@ func newTSFile(videoTrack *gortsplib.Track, audioTrack *gortsplib.Track) *tsFile
return t
}
func (t *tsFile) close() error {
return t.buf.Close()
}
func (t *tsFile) duration() time.Duration {
func (t *segment) duration() time.Duration {
return t.maxPTS - t.minPTS
}
func (t *tsFile) setStartPCR(startPCR time.Time) {
func (t *segment) setStartPCR(startPCR time.Time) {
t.startPCR = startPCR
}
func (t *tsFile) newReader() io.Reader {
return t.buf.NewReader()
func (t *segment) reader() io.Reader {
return bytes.NewReader(t.buf.Bytes())
}
func (t *tsFile) writeH264(
h264SPS []byte,
h264PPS []byte,
func (t *segment) writeH264(
dts time.Duration,
pts time.Duration,
isIDR bool,
@ -113,8 +117,8 @@ func (t *tsFile) writeH264(
// add SPS and PPS before IDR
if typ == h264.NALUTypeIDR {
filteredNALUs = append(filteredNALUs, h264SPS)
filteredNALUs = append(filteredNALUs, h264PPS)
filteredNALUs = append(filteredNALUs, t.h264SPS)
filteredNALUs = append(filteredNALUs, t.h264PPS)
}
filteredNALUs = append(filteredNALUs, nalu)
@ -172,7 +176,7 @@ func (t *tsFile) writeH264(
return err
}
func (t *tsFile) writeAAC(sampleRate int, channelCount int, pts time.Duration, au []byte) error {
func (t *segment) writeAAC(sampleRate int, channelCount int, pts time.Duration, au []byte) error {
if t.videoTrack == nil {
if !t.firstPacketWritten {
t.firstPacketWritten = true

View File

@ -0,0 +1,128 @@
package hls
import (
"bytes"
"io"
"math"
"strconv"
"strings"
"sync"
)
type readerFunc struct {
wrapped func() []byte
reader *bytes.Reader
}
func (r *readerFunc) Read(buf []byte) (int, error) {
if r.reader == nil {
cnt := r.wrapped()
r.reader = bytes.NewReader(cnt)
}
return r.reader.Read(buf)
}
type streamPlaylist struct {
hlsSegmentCount int
mutex sync.Mutex
cond *sync.Cond
closed bool
segments []*segment
segmentByName map[string]*segment
segmentDeleteCount int
}
func newStreamPlaylist(hlsSegmentCount int) *streamPlaylist {
p := &streamPlaylist{
hlsSegmentCount: hlsSegmentCount,
segmentByName: make(map[string]*segment),
}
p.cond = sync.NewCond(&p.mutex)
return p
}
func (p *streamPlaylist) close() {
func() {
p.mutex.Lock()
defer p.mutex.Unlock()
p.closed = true
}()
p.cond.Broadcast()
}
func (p *streamPlaylist) reader() io.Reader {
return &readerFunc{wrapped: func() []byte {
p.mutex.Lock()
defer p.mutex.Unlock()
if !p.closed && len(p.segments) == 0 {
p.cond.Wait()
}
if p.closed {
return nil
}
cnt := "#EXTM3U\n"
cnt += "#EXT-X-VERSION:3\n"
cnt += "#EXT-X-ALLOW-CACHE:NO\n"
targetDuration := func() uint {
ret := uint(0)
// EXTINF, when rounded to the nearest integer, must be <= EXT-X-TARGETDURATION
for _, f := range p.segments {
v2 := uint(math.Round(f.duration().Seconds()))
if v2 > ret {
ret = v2
}
}
return ret
}()
cnt += "#EXT-X-TARGETDURATION:" + strconv.FormatUint(uint64(targetDuration), 10) + "\n"
cnt += "#EXT-X-MEDIA-SEQUENCE:" + strconv.FormatInt(int64(p.segmentDeleteCount), 10) + "\n"
for _, f := range p.segments {
cnt += "#EXTINF:" + strconv.FormatFloat(f.duration().Seconds(), 'f', -1, 64) + ",\n"
cnt += f.name + ".ts\n"
}
return []byte(cnt)
}}
}
func (p *streamPlaylist) segment(fname string) io.Reader {
base := strings.TrimSuffix(fname, ".ts")
p.mutex.Lock()
f, ok := p.segmentByName[base]
p.mutex.Unlock()
if !ok {
return nil
}
return f.reader()
}
func (p *streamPlaylist) pushSegment(t *segment) {
func() {
p.mutex.Lock()
defer p.mutex.Unlock()
p.segmentByName[t.name] = t
p.segments = append(p.segments, t)
if len(p.segments) > p.hlsSegmentCount {
delete(p.segmentByName, p.segments[0].name)
p.segments = p.segments[1:]
p.segmentDeleteCount++
}
}()
p.cond.Broadcast()
}