diff --git a/internal/hls/client_processor_mpegts_track.go b/internal/hls/client_processor_mpegts_track.go index 32de38f3..5831ed25 100644 --- a/internal/hls/client_processor_mpegts_track.go +++ b/internal/hls/client_processor_mpegts_track.go @@ -54,5 +54,10 @@ func (t *clientProcessorMPEGTSTrack) processEntry(ctx context.Context, pes *asti return err } + // silently discard packets prior to the first packet of the leading track + if pts < 0 { + return nil + } + return t.onEntry(pts, pes.Data) } diff --git a/internal/hls/client_timesync_mpegts.go b/internal/hls/client_timesync_mpegts.go index f49e0ffa..9ffcc0b1 100644 --- a/internal/hls/client_timesync_mpegts.go +++ b/internal/hls/client_timesync_mpegts.go @@ -3,31 +3,30 @@ package hls import ( "context" "fmt" + "sync" "time" - "github.com/aler9/rtsp-simple-server/internal/hls/mpegts" + "github.com/aler9/rtsp-simple-server/internal/hls/mpegtstimedec" ) type clientTimeSyncMPEGTS struct { startRTC time.Time - startDTS int64 - td *mpegts.TimeDecoder + td *mpegtstimedec.Decoder + mutex sync.Mutex } func newClientTimeSyncMPEGTS(startDTS int64) *clientTimeSyncMPEGTS { return &clientTimeSyncMPEGTS{ startRTC: time.Now(), - startDTS: startDTS, - td: mpegts.NewTimeDecoder(), + td: mpegtstimedec.New(startDTS), } } func (ts *clientTimeSyncMPEGTS) convertAndSync(ctx context.Context, rawDTS int64, rawPTS int64) (time.Duration, error) { - rawDTS = (rawDTS - ts.startDTS) & 0x1FFFFFFFF - rawPTS = (rawPTS - ts.startDTS) & 0x1FFFFFFFF - + ts.mutex.Lock() dts := ts.td.Decode(rawDTS) pts := ts.td.Decode(rawPTS) + ts.mutex.Unlock() elapsed := time.Since(ts.startRTC) if dts > elapsed { diff --git a/internal/hls/mpegts/timedecoder.go b/internal/hls/mpegts/timedecoder.go deleted file mode 100644 index 0879cddd..00000000 --- a/internal/hls/mpegts/timedecoder.go +++ /dev/null @@ -1,55 +0,0 @@ -package mpegts - -import ( - "sync" - "time" -) - -const ( - maximum = 0x1FFFFFFFF // 33 bits - negativeThreshold = 0x1FFFFFFFF / 2 - clockRate = 90000 -) - -// TimeDecoder is a MPEG-TS timestamp decoder. -type TimeDecoder struct { - initialized bool - tsOverall time.Duration - tsPrev int64 - mutex sync.Mutex -} - -// NewTimeDecoder allocates a TimeDecoder. -func NewTimeDecoder() *TimeDecoder { - return &TimeDecoder{} -} - -// Decode decodes a MPEG-TS timestamp. -func (d *TimeDecoder) Decode(ts int64) time.Duration { - d.mutex.Lock() - defer d.mutex.Unlock() - - if !d.initialized { - d.initialized = true - d.tsPrev = ts - return 0 - } - - diff := (ts - d.tsPrev) & maximum - - // negative difference - if diff > negativeThreshold { - diff = (d.tsPrev - ts) & maximum - d.tsPrev = ts - d.tsOverall -= time.Duration(diff) - } else { - d.tsPrev = ts - d.tsOverall += time.Duration(diff) - } - - // avoid an int64 overflow and preserve resolution by splitting division into two parts: - // first add the integer part, then the decimal part. - secs := d.tsOverall / clockRate - dec := d.tsOverall % clockRate - return secs*time.Second + dec*time.Second/clockRate -} diff --git a/internal/hls/mpegts/timedecoder_test.go b/internal/hls/mpegts/timedecoder_test.go deleted file mode 100644 index 8bf48fac..00000000 --- a/internal/hls/mpegts/timedecoder_test.go +++ /dev/null @@ -1,79 +0,0 @@ -package mpegts - -import ( - "testing" - "time" - - "github.com/stretchr/testify/require" -) - -func TestTimeDecoderNegativeDiff(t *testing.T) { - d := NewTimeDecoder() - - i := int64(0) - pts := d.Decode(i) - require.Equal(t, time.Duration(0), pts) - - i += 90000 * 2 - pts = d.Decode(i) - require.Equal(t, 2*time.Second, pts) - - i -= 90000 * 1 - pts = d.Decode(i) - require.Equal(t, 1*time.Second, pts) - - i += 90000 * 2 - pts = d.Decode(i) - require.Equal(t, 3*time.Second, pts) -} - -func TestTimeDecoderOverflow(t *testing.T) { - d := NewTimeDecoder() - - i := int64(0x1FFFFFFFF - 20) - secs := time.Duration(0) - pts := d.Decode(i) - require.Equal(t, time.Duration(0), pts) - - const stride = 150 - lim := int64(uint64(0x1FFFFFFFF - (stride * 90000))) - - for n := 0; n < 100; n++ { - // overflow - i += 90000 * stride - secs += stride - pts = d.Decode(i) - require.Equal(t, secs*time.Second, pts) - - // reach 2^32 slowly - secs += stride - i += 90000 * stride - for ; i < lim; i += 90000 * stride { - pts = d.Decode(i) - require.Equal(t, secs*time.Second, pts) - secs += stride - } - } -} - -func TestTimeDecoderOverflowAndBack(t *testing.T) { - d := NewTimeDecoder() - - pts := d.Decode(0x1FFFFFFFF - 90000 + 1) - require.Equal(t, time.Duration(0), pts) - - pts = d.Decode(90000) - require.Equal(t, 2*time.Second, pts) - - pts = d.Decode(0x1FFFFFFFF - 90000 + 1) - require.Equal(t, time.Duration(0), pts) - - pts = d.Decode(0x1FFFFFFFF - 90000*2 + 1) - require.Equal(t, -1*time.Second, pts) - - pts = d.Decode(0x1FFFFFFFF - 90000 + 1) - require.Equal(t, time.Duration(0), pts) - - pts = d.Decode(90000) - require.Equal(t, 2*time.Second, pts) -} diff --git a/internal/hls/mpegtstimedec/decoder.go b/internal/hls/mpegtstimedec/decoder.go new file mode 100644 index 00000000..6edbbd6d --- /dev/null +++ b/internal/hls/mpegtstimedec/decoder.go @@ -0,0 +1,46 @@ +// Package mpegtstimedec contains a MPEG-TS timestamp decoder. +package mpegtstimedec + +import ( + "time" +) + +const ( + maximum = 0x1FFFFFFFF // 33 bits + negativeThreshold = 0x1FFFFFFFF / 2 + clockRate = 90000 +) + +// Decoder is a MPEG-TS timestamp decoder. +type Decoder struct { + overall time.Duration + prev int64 +} + +// New allocates a Decoder. +func New(start int64) *Decoder { + return &Decoder{ + prev: start, + } +} + +// Decode decodes a MPEG-TS timestamp. +func (d *Decoder) Decode(ts int64) time.Duration { + diff := (ts - d.prev) & maximum + + // negative difference + if diff > negativeThreshold { + diff = (d.prev - ts) & maximum + d.prev = ts + d.overall -= time.Duration(diff) + } else { + d.prev = ts + d.overall += time.Duration(diff) + } + + // avoid an int64 overflow and preserve resolution by splitting division into two parts: + // first add the integer part, then the decimal part. + secs := d.overall / clockRate + dec := d.overall % clockRate + return secs*time.Second + dec*time.Second/clockRate +} diff --git a/internal/hls/mpegtstimedec/decoder_test.go b/internal/hls/mpegtstimedec/decoder_test.go new file mode 100644 index 00000000..5e3c4444 --- /dev/null +++ b/internal/hls/mpegtstimedec/decoder_test.go @@ -0,0 +1,72 @@ +package mpegtstimedec + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestNegativeDiff(t *testing.T) { + d := New(64523434) + + ts := d.Decode(64523434 - 90000) + require.Equal(t, -1*time.Second, ts) + + ts = d.Decode(64523434) + require.Equal(t, time.Duration(0), ts) + + ts = d.Decode(64523434 + 90000*2) + require.Equal(t, 2*time.Second, ts) + + ts = d.Decode(64523434 + 90000) + require.Equal(t, 1*time.Second, ts) +} + +func TestOverflow(t *testing.T) { + d := New(0x1FFFFFFFF - 20) + + i := int64(0x1FFFFFFFF - 20) + secs := time.Duration(0) + const stride = 150 + lim := int64(uint64(0x1FFFFFFFF - (stride * 90000))) + + for n := 0; n < 100; n++ { + // overflow + i += 90000 * stride + secs += stride + ts := d.Decode(i) + require.Equal(t, secs*time.Second, ts) + + // reach 2^32 slowly + secs += stride + i += 90000 * stride + for ; i < lim; i += 90000 * stride { + ts = d.Decode(i) + require.Equal(t, secs*time.Second, ts) + secs += stride + } + } +} + +func TestOverflowAndBack(t *testing.T) { + d := New(0x1FFFFFFFF - 90000 + 1) + + ts := d.Decode(0x1FFFFFFFF - 90000 + 1) + require.Equal(t, time.Duration(0), ts) + + ts = d.Decode(90000) + require.Equal(t, 2*time.Second, ts) + + ts = d.Decode(0x1FFFFFFFF - 90000 + 1) + require.Equal(t, time.Duration(0), ts) + + ts = d.Decode(0x1FFFFFFFF - 90000*2 + 1) + require.Equal(t, -1*time.Second, ts) + + ts = d.Decode(0x1FFFFFFFF - 90000 + 1) + require.Equal(t, time.Duration(0), ts) + + ts = d.Decode(90000) + require.Equal(t, 2*time.Second, ts) +}