mediamtx/internal/hls/client_processor_mpegts.go

295 lines
6.7 KiB
Go
Raw Normal View History

2022-10-08 09:41:25 +00:00
package hls
import (
"bytes"
"context"
"fmt"
"strings"
"time"
"github.com/aler9/gortsplib"
"github.com/aler9/gortsplib/pkg/h264"
"github.com/aler9/gortsplib/pkg/mpeg4audio"
"github.com/asticode/go-astits"
"github.com/aler9/rtsp-simple-server/internal/hls/mpegtstimedec"
"github.com/aler9/rtsp-simple-server/internal/logger"
)
type clientProcessorMPEGTS struct {
segmentQueue *clientSegmentQueue
logger ClientLogger
rp *clientRoutinePool
onTracks func(*gortsplib.TrackH264, *gortsplib.TrackMPEG4Audio) error
onVideoData func(time.Duration, [][]byte)
onAudioData func(time.Duration, []byte)
tracksParsed bool
clockInitialized bool
timeDec *mpegtstimedec.Decoder
startDTS time.Duration
videoPID *uint16
audioPID *uint16
videoTrack *gortsplib.TrackH264
audioTrack *gortsplib.TrackMPEG4Audio
videoProc *clientProcessorMPEGTSTrack
audioProc *clientProcessorMPEGTSTrack
}
func newClientProcessorMPEGTS(
segmentQueue *clientSegmentQueue,
logger ClientLogger,
rp *clientRoutinePool,
onTracks func(*gortsplib.TrackH264, *gortsplib.TrackMPEG4Audio) error,
onVideoData func(time.Duration, [][]byte),
onAudioData func(time.Duration, []byte),
) *clientProcessorMPEGTS {
return &clientProcessorMPEGTS{
segmentQueue: segmentQueue,
logger: logger,
rp: rp,
timeDec: mpegtstimedec.New(),
onTracks: onTracks,
onVideoData: onVideoData,
onAudioData: onAudioData,
}
}
func (p *clientProcessorMPEGTS) run(ctx context.Context) error {
for {
seg, ok := p.segmentQueue.pull(ctx)
if !ok {
return fmt.Errorf("terminated")
}
err := p.processSegment(ctx, seg)
if err != nil {
return err
}
}
}
func (p *clientProcessorMPEGTS) processSegment(ctx context.Context, byts []byte) error {
p.logger.Log(logger.Debug, "processing segment")
dem := astits.NewDemuxer(context.Background(), bytes.NewReader(byts))
if !p.tracksParsed {
p.tracksParsed = true
err := p.parseTracks(dem)
if err != nil {
return err
}
// rewind demuxer in order to read again the audio packet that was used to create the track
if p.audioTrack != nil {
dem = astits.NewDemuxer(context.Background(), bytes.NewReader(byts))
}
}
for {
data, err := dem.NextData()
if err != nil {
if err == astits.ErrNoMorePackets {
return nil
}
if strings.HasPrefix(err.Error(), "astits: parsing PES data failed") {
continue
}
return err
}
if data.PES == nil {
continue
}
if data.PES.Header.OptionalHeader == nil ||
data.PES.Header.OptionalHeader.PTSDTSIndicator == astits.PTSDTSIndicatorNoPTSOrDTS ||
data.PES.Header.OptionalHeader.PTSDTSIndicator == astits.PTSDTSIndicatorIsForbidden {
return fmt.Errorf("PTS is missing")
}
pts := p.timeDec.Decode(data.PES.Header.OptionalHeader.PTS.Base)
if p.videoPID != nil && data.PID == *p.videoPID {
var dts time.Duration
if data.PES.Header.OptionalHeader.PTSDTSIndicator == astits.PTSDTSIndicatorBothPresent {
diff := time.Duration((data.PES.Header.OptionalHeader.PTS.Base-
data.PES.Header.OptionalHeader.DTS.Base)&0x1FFFFFFFF) *
time.Second / 90000
dts = pts - diff
} else {
dts = pts
}
if !p.clockInitialized {
p.clockInitialized = true
p.startDTS = dts
now := time.Now()
p.initializeTrackProcs(now)
}
pts -= p.startDTS
dts -= p.startDTS
p.videoProc.push(ctx, &clientProcessorMPEGTSTrackEntryVideo{
data: data.PES.Data,
pts: pts,
dts: dts,
})
} else if p.audioPID != nil && data.PID == *p.audioPID {
if !p.clockInitialized {
p.clockInitialized = true
p.startDTS = pts
now := time.Now()
p.initializeTrackProcs(now)
}
pts -= p.startDTS
p.audioProc.push(ctx, &clientProcessorMPEGTSTrackEntryAudio{
data: data.PES.Data,
pts: pts,
})
}
}
}
func (p *clientProcessorMPEGTS) parseTracks(dem *astits.Demuxer) error {
// find and parse PMT
for {
data, err := dem.NextData()
if err != nil {
return err
}
if data.PMT != nil {
for _, e := range data.PMT.ElementaryStreams {
switch e.StreamType {
case astits.StreamTypeH264Video:
if p.videoPID != nil {
return fmt.Errorf("multiple video/audio tracks are not supported")
}
v := e.ElementaryPID
p.videoPID = &v
case astits.StreamTypeAACAudio:
if p.audioPID != nil {
return fmt.Errorf("multiple video/audio tracks are not supported")
}
v := e.ElementaryPID
p.audioPID = &v
}
}
break
}
}
if p.videoPID == nil && p.audioPID == nil {
return fmt.Errorf("stream doesn't contain tracks with supported codecs (H264 or AAC)")
}
if p.videoPID != nil {
p.videoTrack = &gortsplib.TrackH264{
PayloadType: 96,
}
if p.audioPID == nil {
err := p.onTracks(p.videoTrack, nil)
if err != nil {
return err
}
}
}
// find and parse first audio packet
if p.audioPID != nil {
for {
data, err := dem.NextData()
if err != nil {
return err
}
if data.PES == nil || data.PID != *p.audioPID {
continue
}
var adtsPkts mpeg4audio.ADTSPackets
err = adtsPkts.Unmarshal(data.PES.Data)
if err != nil {
return fmt.Errorf("unable to decode ADTS: %s", err)
}
pkt := adtsPkts[0]
p.audioTrack = &gortsplib.TrackMPEG4Audio{
PayloadType: 96,
Config: &mpeg4audio.Config{
Type: pkt.Type,
SampleRate: pkt.SampleRate,
ChannelCount: pkt.ChannelCount,
},
SizeLength: 13,
IndexLength: 3,
IndexDeltaLength: 3,
}
err = p.onTracks(p.videoTrack, p.audioTrack)
if err != nil {
return err
}
break
}
}
return nil
}
func (p *clientProcessorMPEGTS) initializeTrackProcs(clockStartRTC time.Time) {
if p.videoTrack != nil {
p.videoProc = newClientProcessorMPEGTSTrack(
clockStartRTC,
func(e clientProcessorMPEGTSTrackEntry) error {
vd := e.(*clientProcessorMPEGTSTrackEntryVideo)
nalus, err := h264.AnnexBUnmarshal(vd.data)
if err != nil {
p.logger.Log(logger.Warn, "unable to decode Annex-B: %s", err)
return nil
}
p.onVideoData(vd.pts, nalus)
return nil
},
)
p.rp.add(p.videoProc.run)
}
if p.audioTrack != nil {
p.audioProc = newClientProcessorMPEGTSTrack(
clockStartRTC,
func(e clientProcessorMPEGTSTrackEntry) error {
ad := e.(*clientProcessorMPEGTSTrackEntryAudio)
var adtsPkts mpeg4audio.ADTSPackets
err := adtsPkts.Unmarshal(ad.data)
if err != nil {
return fmt.Errorf("unable to decode ADTS: %s", err)
}
for i, pkt := range adtsPkts {
p.onAudioData(
ad.pts+time.Duration(i)*mpeg4audio.SamplesPerAccessUnit*time.Second/time.Duration(pkt.SampleRate),
pkt.AU)
}
return nil
},
)
p.rp.add(p.audioProc.run)
}
}