hls client: skip packets received before the 1st packet of the leading

track; make sure that the initial DTS is zero
This commit is contained in:
aler9 2022-10-26 17:37:54 +02:00
parent 8cec54c980
commit 7981522bf6
6 changed files with 130 additions and 142 deletions

View File

@ -54,5 +54,10 @@ func (t *clientProcessorMPEGTSTrack) processEntry(ctx context.Context, pes *asti
return err return err
} }
// silently discard packets prior to the first packet of the leading track
if pts < 0 {
return nil
}
return t.onEntry(pts, pes.Data) return t.onEntry(pts, pes.Data)
} }

View File

@ -3,31 +3,30 @@ package hls
import ( import (
"context" "context"
"fmt" "fmt"
"sync"
"time" "time"
"github.com/aler9/rtsp-simple-server/internal/hls/mpegts" "github.com/aler9/rtsp-simple-server/internal/hls/mpegtstimedec"
) )
type clientTimeSyncMPEGTS struct { type clientTimeSyncMPEGTS struct {
startRTC time.Time startRTC time.Time
startDTS int64 td *mpegtstimedec.Decoder
td *mpegts.TimeDecoder mutex sync.Mutex
} }
func newClientTimeSyncMPEGTS(startDTS int64) *clientTimeSyncMPEGTS { func newClientTimeSyncMPEGTS(startDTS int64) *clientTimeSyncMPEGTS {
return &clientTimeSyncMPEGTS{ return &clientTimeSyncMPEGTS{
startRTC: time.Now(), startRTC: time.Now(),
startDTS: startDTS, td: mpegtstimedec.New(startDTS),
td: mpegts.NewTimeDecoder(),
} }
} }
func (ts *clientTimeSyncMPEGTS) convertAndSync(ctx context.Context, rawDTS int64, rawPTS int64) (time.Duration, error) { func (ts *clientTimeSyncMPEGTS) convertAndSync(ctx context.Context, rawDTS int64, rawPTS int64) (time.Duration, error) {
rawDTS = (rawDTS - ts.startDTS) & 0x1FFFFFFFF ts.mutex.Lock()
rawPTS = (rawPTS - ts.startDTS) & 0x1FFFFFFFF
dts := ts.td.Decode(rawDTS) dts := ts.td.Decode(rawDTS)
pts := ts.td.Decode(rawPTS) pts := ts.td.Decode(rawPTS)
ts.mutex.Unlock()
elapsed := time.Since(ts.startRTC) elapsed := time.Since(ts.startRTC)
if dts > elapsed { if dts > elapsed {

View File

@ -1,55 +0,0 @@
package mpegts
import (
"sync"
"time"
)
const (
maximum = 0x1FFFFFFFF // 33 bits
negativeThreshold = 0x1FFFFFFFF / 2
clockRate = 90000
)
// TimeDecoder is a MPEG-TS timestamp decoder.
type TimeDecoder struct {
initialized bool
tsOverall time.Duration
tsPrev int64
mutex sync.Mutex
}
// NewTimeDecoder allocates a TimeDecoder.
func NewTimeDecoder() *TimeDecoder {
return &TimeDecoder{}
}
// Decode decodes a MPEG-TS timestamp.
func (d *TimeDecoder) Decode(ts int64) time.Duration {
d.mutex.Lock()
defer d.mutex.Unlock()
if !d.initialized {
d.initialized = true
d.tsPrev = ts
return 0
}
diff := (ts - d.tsPrev) & maximum
// negative difference
if diff > negativeThreshold {
diff = (d.tsPrev - ts) & maximum
d.tsPrev = ts
d.tsOverall -= time.Duration(diff)
} else {
d.tsPrev = ts
d.tsOverall += time.Duration(diff)
}
// avoid an int64 overflow and preserve resolution by splitting division into two parts:
// first add the integer part, then the decimal part.
secs := d.tsOverall / clockRate
dec := d.tsOverall % clockRate
return secs*time.Second + dec*time.Second/clockRate
}

View File

@ -1,79 +0,0 @@
package mpegts
import (
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestTimeDecoderNegativeDiff(t *testing.T) {
d := NewTimeDecoder()
i := int64(0)
pts := d.Decode(i)
require.Equal(t, time.Duration(0), pts)
i += 90000 * 2
pts = d.Decode(i)
require.Equal(t, 2*time.Second, pts)
i -= 90000 * 1
pts = d.Decode(i)
require.Equal(t, 1*time.Second, pts)
i += 90000 * 2
pts = d.Decode(i)
require.Equal(t, 3*time.Second, pts)
}
func TestTimeDecoderOverflow(t *testing.T) {
d := NewTimeDecoder()
i := int64(0x1FFFFFFFF - 20)
secs := time.Duration(0)
pts := d.Decode(i)
require.Equal(t, time.Duration(0), pts)
const stride = 150
lim := int64(uint64(0x1FFFFFFFF - (stride * 90000)))
for n := 0; n < 100; n++ {
// overflow
i += 90000 * stride
secs += stride
pts = d.Decode(i)
require.Equal(t, secs*time.Second, pts)
// reach 2^32 slowly
secs += stride
i += 90000 * stride
for ; i < lim; i += 90000 * stride {
pts = d.Decode(i)
require.Equal(t, secs*time.Second, pts)
secs += stride
}
}
}
func TestTimeDecoderOverflowAndBack(t *testing.T) {
d := NewTimeDecoder()
pts := d.Decode(0x1FFFFFFFF - 90000 + 1)
require.Equal(t, time.Duration(0), pts)
pts = d.Decode(90000)
require.Equal(t, 2*time.Second, pts)
pts = d.Decode(0x1FFFFFFFF - 90000 + 1)
require.Equal(t, time.Duration(0), pts)
pts = d.Decode(0x1FFFFFFFF - 90000*2 + 1)
require.Equal(t, -1*time.Second, pts)
pts = d.Decode(0x1FFFFFFFF - 90000 + 1)
require.Equal(t, time.Duration(0), pts)
pts = d.Decode(90000)
require.Equal(t, 2*time.Second, pts)
}

View File

@ -0,0 +1,46 @@
// Package mpegtstimedec contains a MPEG-TS timestamp decoder.
package mpegtstimedec
import (
"time"
)
const (
maximum = 0x1FFFFFFFF // 33 bits
negativeThreshold = 0x1FFFFFFFF / 2
clockRate = 90000
)
// Decoder is a MPEG-TS timestamp decoder.
type Decoder struct {
overall time.Duration
prev int64
}
// New allocates a Decoder.
func New(start int64) *Decoder {
return &Decoder{
prev: start,
}
}
// Decode decodes a MPEG-TS timestamp.
func (d *Decoder) Decode(ts int64) time.Duration {
diff := (ts - d.prev) & maximum
// negative difference
if diff > negativeThreshold {
diff = (d.prev - ts) & maximum
d.prev = ts
d.overall -= time.Duration(diff)
} else {
d.prev = ts
d.overall += time.Duration(diff)
}
// avoid an int64 overflow and preserve resolution by splitting division into two parts:
// first add the integer part, then the decimal part.
secs := d.overall / clockRate
dec := d.overall % clockRate
return secs*time.Second + dec*time.Second/clockRate
}

View File

@ -0,0 +1,72 @@
package mpegtstimedec
import (
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestNegativeDiff(t *testing.T) {
d := New(64523434)
ts := d.Decode(64523434 - 90000)
require.Equal(t, -1*time.Second, ts)
ts = d.Decode(64523434)
require.Equal(t, time.Duration(0), ts)
ts = d.Decode(64523434 + 90000*2)
require.Equal(t, 2*time.Second, ts)
ts = d.Decode(64523434 + 90000)
require.Equal(t, 1*time.Second, ts)
}
func TestOverflow(t *testing.T) {
d := New(0x1FFFFFFFF - 20)
i := int64(0x1FFFFFFFF - 20)
secs := time.Duration(0)
const stride = 150
lim := int64(uint64(0x1FFFFFFFF - (stride * 90000)))
for n := 0; n < 100; n++ {
// overflow
i += 90000 * stride
secs += stride
ts := d.Decode(i)
require.Equal(t, secs*time.Second, ts)
// reach 2^32 slowly
secs += stride
i += 90000 * stride
for ; i < lim; i += 90000 * stride {
ts = d.Decode(i)
require.Equal(t, secs*time.Second, ts)
secs += stride
}
}
}
func TestOverflowAndBack(t *testing.T) {
d := New(0x1FFFFFFFF - 90000 + 1)
ts := d.Decode(0x1FFFFFFFF - 90000 + 1)
require.Equal(t, time.Duration(0), ts)
ts = d.Decode(90000)
require.Equal(t, 2*time.Second, ts)
ts = d.Decode(0x1FFFFFFFF - 90000 + 1)
require.Equal(t, time.Duration(0), ts)
ts = d.Decode(0x1FFFFFFFF - 90000*2 + 1)
require.Equal(t, -1*time.Second, ts)
ts = d.Decode(0x1FFFFFFFF - 90000 + 1)
require.Equal(t, time.Duration(0), ts)
ts = d.Decode(90000)
require.Equal(t, 2*time.Second, ts)
}