From e605727c78c9cc453269401e6e88476ec19b2880 Mon Sep 17 00:00:00 2001 From: Alessandro Ros Date: Mon, 28 Nov 2022 09:00:05 +0100 Subject: [PATCH] produce same absolute time in RTSP and HLS (#1249) * add a NTP timestamp to each data unit * use that NTP timestamp in all protocols --- internal/core/data.go | 16 +++++++++++++++ internal/core/hls_muxer.go | 4 ++-- internal/core/hls_source.go | 2 ++ internal/core/rpicamera_source.go | 1 + internal/core/rtmp_conn.go | 3 +++ internal/core/rtmp_source.go | 2 ++ internal/core/rtsp_session.go | 3 +++ internal/core/rtsp_source.go | 3 +++ internal/core/stream.go | 2 +- internal/hls/muxer.go | 8 ++++---- internal/hls/muxer_variant.go | 4 ++-- internal/hls/muxer_variant_fmp4.go | 8 ++++---- internal/hls/muxer_variant_fmp4_segmenter.go | 20 +++++++++++-------- internal/hls/muxer_variant_mpegts.go | 8 ++++---- .../hls/muxer_variant_mpegts_segmenter.go | 20 +++++++++---------- 15 files changed, 69 insertions(+), 35 deletions(-) diff --git a/internal/core/data.go b/internal/core/data.go index 4dadfc71..62bbdbaf 100644 --- a/internal/core/data.go +++ b/internal/core/data.go @@ -10,11 +10,13 @@ import ( type data interface { getTrackID() int getRTPPackets() []*rtp.Packet + getNTP() time.Time } type dataGeneric struct { trackID int rtpPackets []*rtp.Packet + ntp time.Time } func (d *dataGeneric) getTrackID() int { @@ -25,9 +27,14 @@ func (d *dataGeneric) getRTPPackets() []*rtp.Packet { return d.rtpPackets } +func (d *dataGeneric) getNTP() time.Time { + return d.ntp +} + type dataH264 struct { trackID int rtpPackets []*rtp.Packet + ntp time.Time pts time.Duration nalus [][]byte } @@ -40,9 +47,14 @@ func (d *dataH264) getRTPPackets() []*rtp.Packet { return d.rtpPackets } +func (d *dataH264) getNTP() time.Time { + return d.ntp +} + type dataMPEG4Audio struct { trackID int rtpPackets []*rtp.Packet + ntp time.Time pts time.Duration aus [][]byte } @@ -54,3 +66,7 @@ func (d *dataMPEG4Audio) getTrackID() int { func (d *dataMPEG4Audio) getRTPPackets() []*rtp.Packet { return d.rtpPackets } + +func (d *dataMPEG4Audio) getNTP() time.Time { + return d.ntp +} diff --git a/internal/core/hls_muxer.go b/internal/core/hls_muxer.go index 2b848f9b..de255499 100644 --- a/internal/core/hls_muxer.go +++ b/internal/core/hls_muxer.go @@ -433,7 +433,7 @@ func (m *hlsMuxer) runWriter( } pts := tdata.pts - videoStartPTS - err := m.muxer.WriteH264(time.Now(), pts, tdata.nalus) + err := m.muxer.WriteH264(tdata.ntp, pts, tdata.nalus) if err != nil { return fmt.Errorf("muxer error: %v", err) } @@ -452,7 +452,7 @@ func (m *hlsMuxer) runWriter( for i, au := range tdata.aus { err := m.muxer.WriteAAC( - time.Now(), + tdata.ntp, pts+time.Duration(i)*mpeg4audio.SamplesPerAccessUnit* time.Second/time.Duration(audioTrack.ClockRate()), au) diff --git a/internal/core/hls_source.go b/internal/core/hls_source.go index 2bf98d59..e0cdf32e 100644 --- a/internal/core/hls_source.go +++ b/internal/core/hls_source.go @@ -82,6 +82,7 @@ func (s *hlsSource) run(ctx context.Context) error { trackID: videoTrackID, pts: pts, nalus: nalus, + ntp: time.Now(), }) if err != nil { s.Log(logger.Warn, "%v", err) @@ -93,6 +94,7 @@ func (s *hlsSource) run(ctx context.Context) error { trackID: audioTrackID, pts: pts, aus: [][]byte{au}, + ntp: time.Now(), }) if err != nil { s.Log(logger.Warn, "%v", err) diff --git a/internal/core/rpicamera_source.go b/internal/core/rpicamera_source.go index de819c87..7a8581bf 100644 --- a/internal/core/rpicamera_source.go +++ b/internal/core/rpicamera_source.go @@ -62,6 +62,7 @@ func (s *rpiCameraSource) run(ctx context.Context) error { trackID: 0, pts: dts, nalus: nalus, + ntp: time.Now(), }) if err != nil { s.Log(logger.Warn, "%v", err) diff --git a/internal/core/rtmp_conn.go b/internal/core/rtmp_conn.go index 994006c1..627dcf36 100644 --- a/internal/core/rtmp_conn.go +++ b/internal/core/rtmp_conn.go @@ -562,6 +562,7 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error { trackID: videoTrackID, pts: tmsg.DTS + tmsg.PTSDelta, nalus: nalus, + ntp: time.Now(), }) if err != nil { c.log(logger.Warn, "%v", err) @@ -600,6 +601,7 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error { trackID: videoTrackID, pts: tmsg.DTS + tmsg.PTSDelta, nalus: validNALUs, + ntp: time.Now(), }) if err != nil { c.log(logger.Warn, "%v", err) @@ -616,6 +618,7 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error { trackID: audioTrackID, pts: tmsg.DTS, aus: [][]byte{tmsg.Payload}, + ntp: time.Now(), }) if err != nil { c.log(logger.Warn, "%v", err) diff --git a/internal/core/rtmp_source.go b/internal/core/rtmp_source.go index 4085acdb..1065b082 100644 --- a/internal/core/rtmp_source.go +++ b/internal/core/rtmp_source.go @@ -174,6 +174,7 @@ func (s *rtmpSource) run(ctx context.Context) error { trackID: videoTrackID, pts: tmsg.DTS + tmsg.PTSDelta, nalus: nalus, + ntp: time.Now(), }) if err != nil { s.Log(logger.Warn, "%v", err) @@ -190,6 +191,7 @@ func (s *rtmpSource) run(ctx context.Context) error { trackID: audioTrackID, pts: tmsg.DTS, aus: [][]byte{tmsg.Payload}, + ntp: time.Now(), }) if err != nil { s.Log(logger.Warn, "%v", err) diff --git a/internal/core/rtsp_session.go b/internal/core/rtsp_session.go index f5261b5a..e2915646 100644 --- a/internal/core/rtsp_session.go +++ b/internal/core/rtsp_session.go @@ -386,18 +386,21 @@ func (s *rtspSession) onPacketRTP(ctx *gortsplib.ServerHandlerOnPacketRTPCtx) { err = s.stream.writeData(&dataH264{ trackID: ctx.TrackID, rtpPackets: []*rtp.Packet{ctx.Packet}, + ntp: time.Now(), }) case *gortsplib.TrackMPEG4Audio: err = s.stream.writeData(&dataMPEG4Audio{ trackID: ctx.TrackID, rtpPackets: []*rtp.Packet{ctx.Packet}, + ntp: time.Now(), }) default: err = s.stream.writeData(&dataGeneric{ trackID: ctx.TrackID, rtpPackets: []*rtp.Packet{ctx.Packet}, + ntp: time.Now(), }) } diff --git a/internal/core/rtsp_source.go b/internal/core/rtsp_source.go index 3e722ea1..fe650f65 100644 --- a/internal/core/rtsp_source.go +++ b/internal/core/rtsp_source.go @@ -151,18 +151,21 @@ func (s *rtspSource) run(ctx context.Context) error { err = res.stream.writeData(&dataH264{ trackID: ctx.TrackID, rtpPackets: []*rtp.Packet{ctx.Packet}, + ntp: time.Now(), }) case *gortsplib.TrackMPEG4Audio: err = res.stream.writeData(&dataMPEG4Audio{ trackID: ctx.TrackID, rtpPackets: []*rtp.Packet{ctx.Packet}, + ntp: time.Now(), }) default: err = res.stream.writeData(&dataGeneric{ trackID: ctx.TrackID, rtpPackets: []*rtp.Packet{ctx.Packet}, + ntp: time.Now(), }) } diff --git a/internal/core/stream.go b/internal/core/stream.go index a98cdbbb..64c85dce 100644 --- a/internal/core/stream.go +++ b/internal/core/stream.go @@ -112,7 +112,7 @@ func (s *stream) writeData(data data) error { // forward RTP packets to RTSP readers for _, pkt := range data.getRTPPackets() { atomic.AddUint64(s.bytesReceived, uint64(pkt.MarshalSize())) - s.rtspStream.WritePacketRTP(data.getTrackID(), pkt) + s.rtspStream.WritePacketRTPWithNTP(data.getTrackID(), pkt, data.getNTP()) } // forward data to non-RTSP readers diff --git a/internal/hls/muxer.go b/internal/hls/muxer.go index d4ca8b01..1bd6456a 100644 --- a/internal/hls/muxer.go +++ b/internal/hls/muxer.go @@ -77,13 +77,13 @@ func (m *Muxer) Close() { } // WriteH264 writes H264 NALUs, grouped by timestamp. -func (m *Muxer) WriteH264(now time.Time, pts time.Duration, nalus [][]byte) error { - return m.variant.writeH264(now, pts, nalus) +func (m *Muxer) WriteH264(ntp time.Time, pts time.Duration, nalus [][]byte) error { + return m.variant.writeH264(ntp, pts, nalus) } // WriteAAC writes AAC AUs, grouped by timestamp. -func (m *Muxer) WriteAAC(now time.Time, pts time.Duration, au []byte) error { - return m.variant.writeAAC(now, pts, au) +func (m *Muxer) WriteAAC(ntp time.Time, pts time.Duration, au []byte) error { + return m.variant.writeAAC(ntp, pts, au) } // File returns a file reader. diff --git a/internal/hls/muxer_variant.go b/internal/hls/muxer_variant.go index a950a5c1..10456274 100644 --- a/internal/hls/muxer_variant.go +++ b/internal/hls/muxer_variant.go @@ -16,7 +16,7 @@ const ( type muxerVariant interface { close() - writeH264(now time.Time, pts time.Duration, nalus [][]byte) error - writeAAC(now time.Time, pts time.Duration, au []byte) error + writeH264(ntp time.Time, pts time.Duration, nalus [][]byte) error + writeAAC(ntp time.Time, pts time.Duration, au []byte) error file(name string, msn string, part string, skip string) *MuxerFileResponse } diff --git a/internal/hls/muxer_variant_fmp4.go b/internal/hls/muxer_variant_fmp4.go index 1108a1f4..9785049f 100644 --- a/internal/hls/muxer_variant_fmp4.go +++ b/internal/hls/muxer_variant_fmp4.go @@ -63,12 +63,12 @@ func (v *muxerVariantFMP4) close() { v.playlist.close() } -func (v *muxerVariantFMP4) writeH264(now time.Time, pts time.Duration, nalus [][]byte) error { - return v.segmenter.writeH264(now, pts, nalus) +func (v *muxerVariantFMP4) writeH264(ntp time.Time, pts time.Duration, nalus [][]byte) error { + return v.segmenter.writeH264(ntp, pts, nalus) } -func (v *muxerVariantFMP4) writeAAC(now time.Time, pts time.Duration, au []byte) error { - return v.segmenter.writeAAC(now, pts, au) +func (v *muxerVariantFMP4) writeAAC(ntp time.Time, pts time.Duration, au []byte) error { + return v.segmenter.writeAAC(ntp, pts, au) } func (v *muxerVariantFMP4) file(name string, msn string, part string, skip string) *MuxerFileResponse { diff --git a/internal/hls/muxer_variant_fmp4_segmenter.go b/internal/hls/muxer_variant_fmp4_segmenter.go index 312fc7a5..c20ec489 100644 --- a/internal/hls/muxer_variant_fmp4_segmenter.go +++ b/internal/hls/muxer_variant_fmp4_segmenter.go @@ -48,11 +48,13 @@ func findCompatiblePartDuration( type augmentedVideoSample struct { fmp4.PartSample dts time.Duration + ntp time.Time } type augmentedAudioSample struct { fmp4.PartSample dts time.Duration + ntp time.Time } type muxerVariantFMP4Segmenter struct { @@ -138,7 +140,7 @@ func (m *muxerVariantFMP4Segmenter) adjustPartDuration(du time.Duration) { } } -func (m *muxerVariantFMP4Segmenter) writeH264(now time.Time, pts time.Duration, nalus [][]byte) error { +func (m *muxerVariantFMP4Segmenter) writeH264(ntp time.Time, pts time.Duration, nalus [][]byte) error { idrPresent := false nonIDRPresent := false @@ -157,11 +159,11 @@ func (m *muxerVariantFMP4Segmenter) writeH264(now time.Time, pts time.Duration, return nil } - return m.writeH264Entry(now, pts, nalus, idrPresent) + return m.writeH264Entry(ntp, pts, nalus, idrPresent) } func (m *muxerVariantFMP4Segmenter) writeH264Entry( - now time.Time, + ntp time.Time, pts time.Duration, nalus [][]byte, idrPresent bool, @@ -210,6 +212,7 @@ func (m *muxerVariantFMP4Segmenter) writeH264Entry( Payload: avcc, }, dts: dts, + ntp: ntp, } // put samples into a queue in order to @@ -226,7 +229,7 @@ func (m *muxerVariantFMP4Segmenter) writeH264Entry( m.currentSegment = newMuxerVariantFMP4Segment( m.lowLatency, m.genSegmentID(), - now, + sample.ntp, sample.dts, m.segmentMaxSize, m.videoTrack, @@ -261,7 +264,7 @@ func (m *muxerVariantFMP4Segmenter) writeH264Entry( m.currentSegment = newMuxerVariantFMP4Segment( m.lowLatency, m.genSegmentID(), - now, + m.nextVideoSample.ntp, m.nextVideoSample.dts, m.segmentMaxSize, m.videoTrack, @@ -282,7 +285,7 @@ func (m *muxerVariantFMP4Segmenter) writeH264Entry( return nil } -func (m *muxerVariantFMP4Segmenter) writeAAC(now time.Time, dts time.Duration, au []byte) error { +func (m *muxerVariantFMP4Segmenter) writeAAC(ntp time.Time, dts time.Duration, au []byte) error { if m.videoTrack != nil { // wait for the video track if !m.videoFirstIDRReceived { @@ -300,6 +303,7 @@ func (m *muxerVariantFMP4Segmenter) writeAAC(now time.Time, dts time.Duration, a Payload: au, }, dts: dts, + ntp: ntp, } // put samples into a queue in order to compute the sample duration @@ -315,7 +319,7 @@ func (m *muxerVariantFMP4Segmenter) writeAAC(now time.Time, dts time.Duration, a m.currentSegment = newMuxerVariantFMP4Segment( m.lowLatency, m.genSegmentID(), - now, + sample.ntp, sample.dts, m.segmentMaxSize, m.videoTrack, @@ -350,7 +354,7 @@ func (m *muxerVariantFMP4Segmenter) writeAAC(now time.Time, dts time.Duration, a m.currentSegment = newMuxerVariantFMP4Segment( m.lowLatency, m.genSegmentID(), - now, + m.nextAudioSample.ntp, m.nextAudioSample.dts, m.segmentMaxSize, m.videoTrack, diff --git a/internal/hls/muxer_variant_mpegts.go b/internal/hls/muxer_variant_mpegts.go index 684e214a..1872a669 100644 --- a/internal/hls/muxer_variant_mpegts.go +++ b/internal/hls/muxer_variant_mpegts.go @@ -39,12 +39,12 @@ func (v *muxerVariantMPEGTS) close() { v.playlist.close() } -func (v *muxerVariantMPEGTS) writeH264(now time.Time, pts time.Duration, nalus [][]byte) error { - return v.segmenter.writeH264(now, pts, nalus) +func (v *muxerVariantMPEGTS) writeH264(ntp time.Time, pts time.Duration, nalus [][]byte) error { + return v.segmenter.writeH264(ntp, pts, nalus) } -func (v *muxerVariantMPEGTS) writeAAC(now time.Time, pts time.Duration, au []byte) error { - return v.segmenter.writeAAC(now, pts, au) +func (v *muxerVariantMPEGTS) writeAAC(ntp time.Time, pts time.Duration, au []byte) error { + return v.segmenter.writeAAC(ntp, pts, au) } func (v *muxerVariantMPEGTS) file(name string, msn string, part string, skip string) *MuxerFileResponse { diff --git a/internal/hls/muxer_variant_mpegts_segmenter.go b/internal/hls/muxer_variant_mpegts_segmenter.go index fa4102c1..e70b3d2a 100644 --- a/internal/hls/muxer_variant_mpegts_segmenter.go +++ b/internal/hls/muxer_variant_mpegts_segmenter.go @@ -56,7 +56,7 @@ func (m *muxerVariantMPEGTSSegmenter) genSegmentID() uint64 { return id } -func (m *muxerVariantMPEGTSSegmenter) writeH264(now time.Time, pts time.Duration, nalus [][]byte) error { +func (m *muxerVariantMPEGTSSegmenter) writeH264(ntp time.Time, pts time.Duration, nalus [][]byte) error { idrPresent := false nonIDRPresent := false @@ -87,7 +87,7 @@ func (m *muxerVariantMPEGTSSegmenter) writeH264(now time.Time, pts time.Duration return err } - m.startPCR = now + m.startPCR = ntp m.startDTS = dts dts = 0 pts -= m.startDTS @@ -95,7 +95,7 @@ func (m *muxerVariantMPEGTSSegmenter) writeH264(now time.Time, pts time.Duration // create first segment m.currentSegment = newMuxerVariantMPEGTSSegment( m.genSegmentID(), - now, + ntp, m.segmentMaxSize, m.videoTrack, m.audioTrack, @@ -121,7 +121,7 @@ func (m *muxerVariantMPEGTSSegmenter) writeH264(now time.Time, pts time.Duration m.onSegmentReady(m.currentSegment) m.currentSegment = newMuxerVariantMPEGTSSegment( m.genSegmentID(), - now, + ntp, m.segmentMaxSize, m.videoTrack, m.audioTrack, @@ -130,7 +130,7 @@ func (m *muxerVariantMPEGTSSegmenter) writeH264(now time.Time, pts time.Duration } err := m.currentSegment.writeH264( - now.Sub(m.startPCR), + ntp.Sub(m.startPCR), dts, pts, idrPresent, @@ -142,17 +142,17 @@ func (m *muxerVariantMPEGTSSegmenter) writeH264(now time.Time, pts time.Duration return nil } -func (m *muxerVariantMPEGTSSegmenter) writeAAC(now time.Time, pts time.Duration, au []byte) error { +func (m *muxerVariantMPEGTSSegmenter) writeAAC(ntp time.Time, pts time.Duration, au []byte) error { if m.videoTrack == nil { if m.currentSegment == nil { - m.startPCR = now + m.startPCR = ntp m.startDTS = pts pts = 0 // create first segment m.currentSegment = newMuxerVariantMPEGTSSegment( m.genSegmentID(), - now, + ntp, m.segmentMaxSize, m.videoTrack, m.audioTrack, @@ -167,7 +167,7 @@ func (m *muxerVariantMPEGTSSegmenter) writeAAC(now time.Time, pts time.Duration, m.onSegmentReady(m.currentSegment) m.currentSegment = newMuxerVariantMPEGTSSegment( m.genSegmentID(), - now, + ntp, m.segmentMaxSize, m.videoTrack, m.audioTrack, @@ -183,7 +183,7 @@ func (m *muxerVariantMPEGTSSegmenter) writeAAC(now time.Time, pts time.Duration, pts -= m.startDTS } - err := m.currentSegment.writeAAC(now.Sub(m.startPCR), pts, au) + err := m.currentSegment.writeAAC(ntp.Sub(m.startPCR), pts, au) if err != nil { return err }