hls muxer / source: route AAC units singularly

This aligns the HLS implementation with the rest of the server. In case
of HLS/MPEGTS, the server now generates an ADTS packet for each AU,
without grouping multiple AUs into a single ADTS packet.
This commit is contained in:
aler9 2022-08-14 12:16:39 +02:00
parent 3f0771bb31
commit a8822b9f15
13 changed files with 55 additions and 67 deletions

View File

@ -384,9 +384,11 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{})
continue
}
err = m.muxer.WriteAAC(pts, aus)
if err != nil {
return fmt.Errorf("muxer error: %v", err)
for _, au := range aus {
err = m.muxer.WriteAAC(pts, au)
if err != nil {
return fmt.Errorf("muxer error: %v", err)
}
}
}
}

View File

@ -119,12 +119,12 @@ func (s *hlsSource) run(ctx context.Context) error {
}
}
onAudioData := func(pts time.Duration, aus [][]byte) {
onAudioData := func(pts time.Duration, au []byte) {
if stream == nil {
return
}
pkts, err := audioEnc.Encode(aus, pts)
pkts, err := audioEnc.Encode([][]byte{au}, pts)
if err != nil {
return
}

View File

@ -48,7 +48,7 @@ type ClientLogger interface {
type Client struct {
onTracks func(*gortsplib.TrackH264, *gortsplib.TrackMPEG4Audio) error
onVideoData func(time.Duration, [][]byte)
onAudioData func(time.Duration, [][]byte)
onAudioData func(time.Duration, []byte)
logger ClientLogger
ctx context.Context
@ -85,7 +85,7 @@ func NewClient(
fingerprint string,
onTracks func(*gortsplib.TrackH264, *gortsplib.TrackMPEG4Audio) error,
onVideoData func(time.Duration, [][]byte),
onAudioData func(time.Duration, [][]byte),
onAudioData func(time.Duration, []byte),
logger ClientLogger,
) (*Client, error) {
primaryPlaylistURL, err := url.Parse(primaryPlaylistURLStr)
@ -549,8 +549,8 @@ func (c *Client) onAudioProcessorTrack(track *gortsplib.TrackMPEG4Audio) error {
return nil
}
func (c *Client) onAudioProcessorData(pts time.Duration, aus [][]byte) {
func (c *Client) onAudioProcessorData(pts time.Duration, au []byte) {
c.tracksMutex.RLock()
defer c.tracksMutex.RUnlock()
c.onAudioData(pts, aus)
c.onAudioData(pts, au)
}

View File

@ -17,7 +17,7 @@ type clientAudioProcessorData struct {
type clientAudioProcessor struct {
ctx context.Context
onTrack func(*gortsplib.TrackMPEG4Audio) error
onData func(time.Duration, [][]byte)
onData func(time.Duration, []byte)
trackInitialized bool
queue chan clientAudioProcessorData
@ -27,7 +27,7 @@ type clientAudioProcessor struct {
func newClientAudioProcessor(
ctx context.Context,
onTrack func(*gortsplib.TrackMPEG4Audio) error,
onData func(time.Duration, [][]byte),
onData func(time.Duration, []byte),
) *clientAudioProcessor {
p := &clientAudioProcessor{
ctx: ctx,
@ -58,14 +58,6 @@ func (p *clientAudioProcessor) doProcess(
data []byte,
pts time.Duration,
) error {
var adtsPkts mpeg4audio.ADTSPackets
err := adtsPkts.Unmarshal(data)
if err != nil {
return err
}
aus := make([][]byte, 0, len(adtsPkts))
elapsed := time.Since(p.clockStartRTC)
if pts > elapsed {
select {
@ -75,7 +67,13 @@ func (p *clientAudioProcessor) doProcess(
}
}
for _, pkt := range adtsPkts {
var adtsPkts mpeg4audio.ADTSPackets
err := adtsPkts.Unmarshal(data)
if err != nil {
return err
}
for i, pkt := range adtsPkts {
if !p.trackInitialized {
p.trackInitialized = true
@ -97,10 +95,11 @@ func (p *clientAudioProcessor) doProcess(
}
}
aus = append(aus, pkt.AU)
p.onData(
pts+time.Duration(i)*mpeg4audio.SamplesPerAccessUnit*time.Second/time.Duration(pkt.SampleRate),
pkt.AU)
}
p.onData(pts, aus)
return nil
}

View File

@ -225,7 +225,7 @@ func TestClient(t *testing.T) {
}, nalus)
close(packetRecv)
},
func(pts time.Duration, aus [][]byte) {
func(pts time.Duration, au []byte) {
},
testLogger{},
)

View File

@ -81,8 +81,8 @@ func (m *Muxer) WriteH264(pts time.Duration, nalus [][]byte) error {
}
// WriteAAC writes AAC AUs, grouped by timestamp.
func (m *Muxer) WriteAAC(pts time.Duration, aus [][]byte) error {
return m.variant.writeAAC(pts, aus)
func (m *Muxer) WriteAAC(pts time.Duration, au []byte) error {
return m.variant.writeAAC(pts, au)
}
// File returns a file reader.

View File

@ -60,9 +60,8 @@ func TestMuxerVideoAudio(t *testing.T) {
})
require.NoError(t, err)
err = m.WriteAAC(3*time.Second, [][]byte{
{0x01, 0x02, 0x03, 0x04},
{0x05, 0x06, 0x07, 0x08},
err = m.WriteAAC(3*time.Second, []byte{
0x01, 0x02, 0x03, 0x04,
})
require.NoError(t, err)
@ -175,8 +174,8 @@ func TestMuxerVideoAudio(t *testing.T) {
require.NoError(t, err)
require.Equal(t, &astits.Packet{
AdaptationField: &astits.PacketAdaptationField{
Length: 147,
StuffingLength: 146,
Length: 158,
StuffingLength: 157,
RandomAccessIndicator: true,
},
Header: &astits.PacketHeader{
@ -186,11 +185,10 @@ func TestMuxerVideoAudio(t *testing.T) {
PID: 257,
},
Payload: []byte{
0x00, 0x00, 0x01, 0xc0, 0x00, 0x1e, 0x80, 0x80,
0x00, 0x00, 0x01, 0xc0, 0x00, 0x13, 0x80, 0x80,
0x05, 0x21, 0x00, 0x07, 0xd8, 0x5f, 0xff, 0xf1,
0x50, 0x80, 0x01, 0x7f, 0xfc, 0x01, 0x02, 0x03,
0x04, 0xff, 0xf1, 0x50, 0x80, 0x01, 0x7f, 0xfc,
0x05, 0x06, 0x07, 0x08,
0x04,
},
}, pkt)
}
@ -299,21 +297,19 @@ func TestMuxerAudioOnly(t *testing.T) {
defer m.Close()
for i := 0; i < 100; i++ {
err = m.WriteAAC(1*time.Second, [][]byte{
{0x01, 0x02, 0x03, 0x04},
err = m.WriteAAC(1*time.Second, []byte{
0x01, 0x02, 0x03, 0x04,
})
require.NoError(t, err)
}
err = m.WriteAAC(2*time.Second, [][]byte{
{0x01, 0x02, 0x03, 0x04},
{0x05, 0x06, 0x07, 0x08},
err = m.WriteAAC(2*time.Second, []byte{
0x01, 0x02, 0x03, 0x04,
})
require.NoError(t, err)
err = m.WriteAAC(3*time.Second, [][]byte{
{0x01, 0x02, 0x03, 0x04},
{0x05, 0x06, 0x07, 0x08},
err = m.WriteAAC(3*time.Second, []byte{
0x01, 0x02, 0x03, 0x04,
})
require.NoError(t, err)

View File

@ -17,6 +17,6 @@ const (
type muxerVariant interface {
close()
writeH264(pts time.Duration, nalus [][]byte) error
writeAAC(pts time.Duration, aus [][]byte) error
writeAAC(pts time.Duration, au []byte) error
file(name string, msn string, part string, skip string) *MuxerFileResponse
}

View File

@ -92,8 +92,8 @@ func (v *muxerVariantFMP4) writeH264(pts time.Duration, nalus [][]byte) error {
return v.segmenter.writeH264(pts, nalus)
}
func (v *muxerVariantFMP4) writeAAC(pts time.Duration, aus [][]byte) error {
return v.segmenter.writeAAC(pts, aus)
func (v *muxerVariantFMP4) writeAAC(pts time.Duration, au []byte) error {
return v.segmenter.writeAAC(pts, au)
}
func (v *muxerVariantFMP4) file(name string, msn string, part string, skip string) *MuxerFileResponse {

View File

@ -6,7 +6,6 @@ import (
"github.com/aler9/gortsplib"
"github.com/aler9/gortsplib/pkg/h264"
"github.com/aler9/gortsplib/pkg/mpeg4audio"
)
func partDurationIsCompatible(partDuration time.Duration, sampleDuration time.Duration) bool {
@ -258,17 +257,11 @@ func (m *muxerVariantFMP4Segmenter) writeH264Entry(sample *fmp4VideoSample) erro
return nil
}
func (m *muxerVariantFMP4Segmenter) writeAAC(pts time.Duration, aus [][]byte) error {
for i, au := range aus {
err := m.writeAACEntry(&fmp4AudioSample{
pts: pts + time.Duration(i)*mpeg4audio.SamplesPerAccessUnit*time.Second/time.Duration(m.audioTrack.ClockRate()),
au: au,
})
if err != nil {
return err
}
}
return nil
func (m *muxerVariantFMP4Segmenter) writeAAC(pts time.Duration, au []byte) error {
return m.writeAACEntry(&fmp4AudioSample{
pts: pts,
au: au,
})
}
func (m *muxerVariantFMP4Segmenter) writeAACEntry(sample *fmp4AudioSample) error {

View File

@ -43,8 +43,8 @@ func (v *muxerVariantMPEGTS) writeH264(pts time.Duration, nalus [][]byte) error
return v.segmenter.writeH264(pts, nalus)
}
func (v *muxerVariantMPEGTS) writeAAC(pts time.Duration, aus [][]byte) error {
return v.segmenter.writeAAC(pts, aus)
func (v *muxerVariantMPEGTS) writeAAC(pts time.Duration, au []byte) error {
return v.segmenter.writeAAC(pts, au)
}
func (v *muxerVariantMPEGTS) file(name string, msn string, part string, skip string) *MuxerFileResponse {

View File

@ -145,17 +145,15 @@ func (t *muxerVariantMPEGTSSegment) writeH264(
func (t *muxerVariantMPEGTSSegment) writeAAC(
pcr time.Duration,
pts time.Duration,
aus [][]byte,
au []byte,
) error {
pkts := make(mpeg4audio.ADTSPackets, len(aus))
for i, au := range aus {
pkts[i] = &mpeg4audio.ADTSPacket{
pkts := mpeg4audio.ADTSPackets{
{
Type: t.audioTrack.Config.Type,
SampleRate: t.audioTrack.Config.SampleRate,
ChannelCount: t.audioTrack.Config.ChannelCount,
AU: au,
}
},
}
enc, err := pkts.Marshal()
@ -198,7 +196,7 @@ func (t *muxerVariantMPEGTSSegment) writeAAC(
}
if t.videoTrack == nil {
t.audioAUCount += len(aus)
t.audioAUCount++
if t.startDTS == nil {
t.startDTS = &pts

View File

@ -155,7 +155,7 @@ func (m *muxerVariantMPEGTSSegmenter) writeH264(pts time.Duration, nalus [][]byt
return nil
}
func (m *muxerVariantMPEGTSSegmenter) writeAAC(pts time.Duration, aus [][]byte) error {
func (m *muxerVariantMPEGTSSegmenter) writeAAC(pts time.Duration, au []byte) error {
now := time.Now()
if m.videoTrack == nil {
@ -188,7 +188,7 @@ func (m *muxerVariantMPEGTSSegmenter) writeAAC(pts time.Duration, aus [][]byte)
pts -= m.startDTS
}
err := m.currentSegment.writeAAC(now.Sub(m.startPCR), pts, aus)
err := m.currentSegment.writeAAC(now.Sub(m.startPCR), pts, au)
if err != nil {
return err
}